Skip to content

Commit

Permalink
Update quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Jul 1, 2024
1 parent edd6787 commit 76cc3d9
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 9 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ license.workspace = true
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
bytesize = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
Expand Down
23 changes: 15 additions & 8 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytesize::ByteSize;
use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
use quickwit_proto::control_plane::{RebuildPlanRequest, RebuildPlanResponse};
use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
PIPELINE_THROUGHTPUT,
};
use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::NodeId;
Expand Down Expand Up @@ -127,20 +129,25 @@ impl fmt::Debug for IndexingScheduler {
/// This function averages their statistics.
///
/// For the moment, this function only takes in account the measured throughput,
/// and assumes a constant CPU usage of 4 vCPU = 30mb/s.
/// and assumes a constant CPU usage of 4 vCPU = 20mb/s.
///
/// It does not take in account the variation that could raise from the different
/// doc mapping / nature of the data, etc.
fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
let num_shards = shard_entries.len().max(1) as u32;
let average_throughput_per_shard: u32 = shard_entries
let num_shards = shard_entries.len().max(1) as u64;
let average_throughput_per_shard_bytes: u64 = shard_entries
.iter()
.map(|shard_entry| u32::from(shard_entry.ingestion_rate.0))
.sum::<u32>()
.div_ceil(num_shards);
let num_cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() * average_throughput_per_shard) / 20;
.map(|shard_entry| shard_entry.ingestion_rate.0 as u64 * bytesize::MIB)
.sum::<u64>()
.div_ceil(num_shards)
// A shard throughput cannot exceed PIPELINE_THROUGHPUT in the long term (this is enforced
// by the configuration).
.min(PIPELINE_THROUGHTPUT.as_u64());
let num_cpu_millis = ((PIPELINE_FULL_CAPACITY.cpu_millis() as u64
* average_throughput_per_shard_bytes) as u64)
/ PIPELINE_THROUGHTPUT.as_u64();
const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
NonZeroU32::new(num_cpu_millis.max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
}

fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
Expand Down
11 changes: 10 additions & 1 deletion quickwit/quickwit-proto/src/indexing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::ops::{Add, Mul, Sub};

use bytesize::ByteSize;
use quickwit_actors::AskError;
use quickwit_common::pubsub::Event;
use quickwit_common::tower::{MakeLoadShedError, RpcName};
Expand Down Expand Up @@ -176,9 +177,17 @@ impl Display for PipelineMetrics {
}

/// One full pipeline (including merging) is assumed to consume 4 CPU threads.
/// The actual number somewhere between 3 and 4.
/// The actual number somewhere between 3 and 4. Quickwit is not super sensitive to this number.
///
/// It simply impacts the point where we prefer to work on balancing the load over the different
/// indexers and the point where we prefer improving other feature of the system (shard locality,
/// grouping pipelines associated to a given index on the same node, etc.).
pub const PIPELINE_FULL_CAPACITY: CpuCapacity = CpuCapacity::from_cpu_millis(4_000u32);

/// One full pipeline (including merging) is supposed to have the capacity to index at least 20mb/s.
/// This is a defensive value: In reality, this is typically above 30mb/s.
pub const PIPELINE_THROUGHTPUT: ByteSize = ByteSize::mb(20);

/// The CpuCapacity represents an amount of CPU resource available.
///
/// It is usually expressed in CPU millis (For instance, one full CPU thread is
Expand Down

0 comments on commit 76cc3d9

Please sign in to comment.