Connect shard throughput #5151

Closed
wants to merge 2 commits
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/Cargo.toml
@@ -13,6 +13,7 @@ license.workspace = true
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
bytesize = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
49 changes: 39 additions & 10 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -31,9 +31,10 @@ use itertools::Itertools;
use quickwit_proto::control_plane::{RebuildPlanRequest, RebuildPlanResponse};
use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
PIPELINE_THROUGHTPUT,
};
use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::{NodeId, ShardId};
use quickwit_proto::types::NodeId;
use scheduling::{SourceToSchedule, SourceToScheduleType};
use serde::Serialize;
use tracing::{debug, info, warn};
@@ -42,7 +43,7 @@ use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::change_tracker::{NotifyChangeOnDrop, RebuildNotifier};
use crate::indexing_scheduler::scheduling::build_physical_indexing_plan;
use crate::metrics::ShardLocalityMetrics;
use crate::model::{ControlPlaneModel, ShardLocations};
use crate::model::{ControlPlaneModel, ShardEntry, ShardLocations};
use crate::{IndexerNodeInfo, IndexerPool};

pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
@@ -121,6 +122,32 @@ impl fmt::Debug for IndexingScheduler {
}
}

/// Computes the CPU load associated with a single shard of a given index.
///
/// The slice passed in contains all of the data we have about the shards of the index.
/// This function averages their statistics.
///
/// For the moment, this function only takes into account the measured throughput,
/// and assumes a constant CPU cost of 4 vCPUs per 20 MB/s of throughput.
///
/// It does not take into account the variation that could arise from differences in
/// doc mapping, the nature of the data, etc.
fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
let num_shards = shard_entries.len().max(1) as u64;
let average_throughput_per_shard_bytes: u64 = shard_entries
.iter()
.map(|shard_entry| shard_entry.ingestion_rate.0 as u64 * bytesize::MIB)
.sum::<u64>()
.div_ceil(num_shards)
// A shard throughput cannot exceed PIPELINE_THROUGHPUT in the long term (this is enforced
// by the configuration).
.min(PIPELINE_THROUGHTPUT.as_u64());
let num_cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() as u64 * average_throughput_per_shard_bytes)
/ PIPELINE_THROUGHTPUT.as_u64();
const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
}
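As a rough worked example of the conversion above (hypothetical numbers, not code from this PR): `ingestion_rate` is expressed in MiB/s, hence the `bytesize::MIB` factor, while `PIPELINE_THROUGHTPUT` is 20 MB.

```rust
// Standalone sketch of the formula in `compute_load_per_shard` (hypothetical values).
fn main() {
    let pipeline_capacity_cpu_millis: u64 = 4_000; // PIPELINE_FULL_CAPACITY
    let pipeline_throughput_bytes: u64 = 20_000_000; // PIPELINE_THROUGHTPUT = ByteSize::mb(20)
    let shard_throughput_bytes: u64 = 5 * 1_048_576; // a shard averaging 5 MiB/s
    let load_cpu_millis =
        pipeline_capacity_cpu_millis * shard_throughput_bytes / pipeline_throughput_bytes;
    // ~1 vCPU out of the 4 vCPUs budgeted for one full pipeline.
    assert_eq!(load_cpu_millis, 1_048);
    // Very quiet shards are floored at MIN_CPU_LOAD_PER_SHARD (50 CPU millis).
    assert_eq!(load_cpu_millis.max(50), 1_048);
}
```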

fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
let mut sources = Vec::new();

@@ -147,22 +174,24 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
// Expect: the source should exist since we just read it from `get_source_configs`.
// Note that we keep all shards, including Closed shards:
// A closed shard still needs to be indexed.
let shard_ids: Vec<ShardId> = model
let shard_entries: Vec<&ShardEntry> = model
.get_shards_for_source(&source_uid)
.expect("source should exist")
.keys()
.cloned()
.values()
.collect();
if shard_ids.is_empty() {
if shard_entries.is_empty() {
continue;
}
let shard_ids = shard_entries
.iter()
.map(|shard_entry| shard_entry.shard_id().clone())
.collect();
let load_per_shard = compute_load_per_shard(&shard_entries[..]);
sources.push(SourceToSchedule {
source_uid,
source_type: SourceToScheduleType::Sharded {
shard_ids,
// FIXME
load_per_shard: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4)
.unwrap(),
load_per_shard,
},
});
}
@@ -562,7 +591,7 @@ mod tests {
use proptest::{prop_compose, proptest};
use quickwit_config::{IndexConfig, KafkaSourceParams, SourceConfig, SourceParams};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::types::{IndexUid, PipelineUid, SourceUid};
use quickwit_proto::types::{IndexUid, PipelineUid, ShardId, SourceUid};

use super::*;
use crate::model::ShardLocations;
@@ -566,10 +566,26 @@ fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) {
else {
return;
};

// We first artificially scale down the node capacities.
//
// The node capacity is an estimate of the amount of CPU available on a given indexer node.
// It has two purposes:
// - under a lot of load, indexers will receive work proportional to their relative capacity.
// - under low load, the absolute magnitude is used by the scheduler to decide whether
// to prefer having a balanced workload over other criteria (all pipelines from the same
// index on the same node, indexing local shards, etc.).
//
// The default CPU capacity is detected from the OS. Using these values directly leads to
// a non-uniform distribution of the load, which is very confusing for users, so we
// artificially scale down the indexer capacities.
problem.scale_node_capacities(0.3f32);

let min_indexer_capacity = (0..problem.num_indexers())
.map(|indexer_ord| problem.indexer_cpu_capacity(indexer_ord))
.min()
.expect("At least one indexer is required");

assert_ne!(min_indexer_capacity.cpu_millis(), 0);
if min_indexer_capacity.cpu_millis() < largest_shard_load.get() {
let scaling_factor =
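A small sketch of the scale-down / inflate-if-necessary logic described in the comment above (hypothetical numbers, not code from this PR):

```rust
// Sketch: capacities are first scaled down to 30%, then inflated again if the
// largest shard load would not fit on the smallest indexer.
fn main() {
    let detected_cpu_millis: u32 = 8_000; // an indexer detected with 8 vCPUs
    let scaled = (detected_cpu_millis as f32 * 0.3f32) as u32; // the planner only uses 30%
    assert_eq!(scaled, 2_400);
    let largest_shard_load: u32 = 4_000;
    if scaled < largest_shard_load {
        // Scale capacities back up so that the largest shard fits somewhere.
        let scaling_factor = largest_shard_load as f32 / scaled as f32;
        let inflated = (scaled as f32 * scaling_factor).round() as u32;
        assert_eq!(inflated, largest_shard_load);
    }
}
```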
@@ -163,7 +163,7 @@ fn assert_remove_extraneous_shards_post_condition(
// Relieve sources from the nodes that are exceeding their maximum load.

fn enforce_indexers_cpu_capacity(problem: &SchedulingProblem, solution: &mut SchedulingSolution) {
for indexer_assignment in solution.indexer_assignments.iter_mut() {
for indexer_assignment in &mut solution.indexer_assignments {
let indexer_cpu_capacity: CpuCapacity =
problem.indexer_cpu_capacity(indexer_assignment.indexer_ord);
enforce_indexer_cpu_capacity(problem, indexer_cpu_capacity, indexer_assignment);
@@ -753,6 +753,35 @@ mod tests {
assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
}

#[test]
fn test_problem_unbalanced_simple() {
let mut problem = SchedulingProblem::with_indexer_cpu_capacities(vec![
CpuCapacity::from_cpu_millis(1),
CpuCapacity::from_cpu_millis(1),
]);
problem.add_source(1, NonZeroU32::new(10).unwrap());
for _ in 0..10 {
problem.add_source(1, NonZeroU32::new(1).unwrap());
}
let previous_solution = problem.new_solution();
let solution = solve(problem.clone(), previous_solution);
let available_capacities: Vec<u32> = solution
.indexer_assignments
.iter()
.map(|indexer_assignment: &IndexerAssignment| {
indexer_assignment.total_cpu_load(&problem)
})
.collect();
assert_eq!(available_capacities.len(), 2);
let (min, max) = available_capacities
.into_iter()
.minmax()
.into_option()
.unwrap();
assert_eq!(min, 10);
assert_eq!(max, 10);
}

proptest! {
#[test]
fn test_proptest_post_conditions((problem, solution) in problem_solution_strategy()) {
@@ -80,7 +80,7 @@ impl Source {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone)]
pub struct SchedulingProblem {
sources: Vec<Source>,
indexer_cpu_capacities: Vec<CpuCapacity>,
11 changes: 10 additions & 1 deletion quickwit/quickwit-proto/src/indexing/mod.rs
@@ -22,6 +22,7 @@ use std::fmt::{Display, Formatter};
use std::hash::Hash;
use std::ops::{Add, Mul, Sub};

use bytesize::ByteSize;
use quickwit_actors::AskError;
use quickwit_common::pubsub::Event;
use quickwit_common::tower::{MakeLoadShedError, RpcName};
@@ -176,9 +177,17 @@ impl Display for PipelineMetrics {
}

/// One full pipeline (including merging) is assumed to consume 4 CPU threads.
/// The actual number is somewhere between 3 and 4.
/// The actual number is somewhere between 3 and 4. Quickwit is not super sensitive to this number.
///
/// It simply impacts the point where we prefer to work on balancing the load over the different
/// indexers and the point where we prefer improving other features of the system (shard locality,
/// grouping pipelines associated with a given index on the same node, etc.).
pub const PIPELINE_FULL_CAPACITY: CpuCapacity = CpuCapacity::from_cpu_millis(4_000u32);

/// One full pipeline (including merging) is supposed to have the capacity to index at least 20 MB/s.
/// This is a defensive value: in reality, it is typically above 30 MB/s.
pub const PIPELINE_THROUGHTPUT: ByteSize = ByteSize::mb(20);
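As a hedged illustration of how the two constants above combine (a sketch, not part of the diff; `cpu_for_throughput` is a hypothetical helper):

```rust
// Assumes a linear relation between throughput and CPU cost, as the scheduler does.
fn cpu_for_throughput(throughput: ByteSize) -> CpuCapacity {
    let cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() as u64 * throughput.as_u64())
        / PIPELINE_THROUGHTPUT.as_u64();
    CpuCapacity::from_cpu_millis(cpu_millis as u32)
}
// cpu_for_throughput(ByteSize::mb(10)) -> 2_000 CPU millis (half a pipeline's capacity)
// cpu_for_throughput(ByteSize::mb(20)) -> 4_000 CPU millis (a full pipeline)
```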

/// The CpuCapacity represents an amount of CPU resource available.
///
/// It is usually expressed in CPU millis (For instance, one full CPU thread is