Using the shard throughput information in the scheduling logic.
fulmicoton committed Jun 24, 2024
1 parent 9fddb68 commit edd6787
Showing 4 changed files with 81 additions and 12 deletions.
44 changes: 34 additions & 10 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -33,7 +33,7 @@ use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
};
use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::{NodeId, ShardId};
use quickwit_proto::types::NodeId;
use scheduling::{SourceToSchedule, SourceToScheduleType};
use serde::Serialize;
use tracing::{debug, info, warn};
@@ -42,7 +42,7 @@ use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::change_tracker::{NotifyChangeOnDrop, RebuildNotifier};
use crate::indexing_scheduler::scheduling::build_physical_indexing_plan;
use crate::metrics::ShardLocalityMetrics;
use crate::model::{ControlPlaneModel, ShardLocations};
use crate::model::{ControlPlaneModel, ShardEntry, ShardLocations};
use crate::{IndexerNodeInfo, IndexerPool};

pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration =
@@ -121,6 +121,28 @@ impl fmt::Debug for IndexingScheduler {
}
}

/// Computes the CPU load associated with a single shard of a given index.
///
/// The slice passed in contains all of the data we have about the shards of the index.
/// This function averages their statistics.
///
/// For the moment, this function only takes into account the measured throughput,
/// and assumes a constant CPU cost of 4 vCPUs per 20 MB/s of throughput.
///
/// It does not take into account the variation that could arise from differences in
/// doc mapping, the nature of the data, etc.
fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
let num_shards = shard_entries.len().max(1) as u32;
let average_throughput_per_shard: u32 = shard_entries
.iter()
.map(|shard_entry| u32::from(shard_entry.ingestion_rate.0))
.sum::<u32>()
.div_ceil(num_shards);
let num_cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() * average_throughput_per_shard) / 20;
const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
NonZeroU32::new(num_cpu_millis.max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
}
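
For concreteness, here is a worked example of the arithmetic above (an editor's sketch, not part of the commit: the literal 4_000 stands in for PIPELINE_FULL_CAPACITY.cpu_millis(), and the ingestion rates are made up).

fn main() {
    // Measured ingestion rates of a source's shards, in MB/s (made-up values).
    let rates_mb_s: [u32; 3] = [2, 4, 6];
    let num_shards = rates_mb_s.len().max(1) as u32;

    // Average throughput per shard, rounded up: (2 + 4 + 6) / 3 = 4 MB/s.
    let avg_throughput = rates_mb_s.iter().sum::<u32>().div_ceil(num_shards);

    // 4 vCPUs (4_000 CPU millis) are assumed to sustain 20 MB/s,
    // with a floor of 50 CPU millis so idle shards are not free.
    let load = ((4_000 * avg_throughput) / 20).max(50);
    assert_eq!(load, 800); // i.e. 0.8 vCPU per shard
}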

fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
let mut sources = Vec::new();

@@ -147,22 +169,24 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
// Expect: the source should exist since we just read it from `get_source_configs`.
// Note that we keep all shards, including Closed shards:
// A closed shard still needs to be indexed.
let shard_ids: Vec<ShardId> = model
let shard_entries: Vec<&ShardEntry> = model
.get_shards_for_source(&source_uid)
.expect("source should exist")
.keys()
.cloned()
.values()
.collect();
if shard_ids.is_empty() {
if shard_entries.is_empty() {
continue;
}
let shard_ids = shard_entries
.iter()
.map(|shard_entry| shard_entry.shard_id().clone())
.collect();
let load_per_shard = compute_load_per_shard(&shard_entries[..]);
sources.push(SourceToSchedule {
source_uid,
source_type: SourceToScheduleType::Sharded {
shard_ids,
// FIXME
load_per_shard: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4)
.unwrap(),
load_per_shard,
},
});
}
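
Before this change, every shard was billed a fixed quarter of a pipeline (the FIXME above). A rough comparison of the two estimates (an editor's sketch with hypothetical throughputs; 4_000 again stands in for PIPELINE_FULL_CAPACITY.cpu_millis()):

fn main() {
    // Old estimate: a fixed quarter of a full pipeline, whatever the traffic.
    let old_load = 4_000u32 / 4; // 1_000 CPU millis per shard

    // New estimate: proportional to the measured throughput (MB/s),
    // with the 50 CPU-millis floor for idle shards.
    let new_load = |throughput_mb_s: u32| ((4_000 * throughput_mb_s) / 20).max(50);

    assert_eq!(new_load(0), 50); // idle shard: 20x cheaper than before
    assert_eq!(new_load(5), old_load); // 5 MB/s matches the old fixed estimate
    assert_eq!(new_load(20), 4_000); // 20 MB/s saturates a full pipeline
}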
@@ -562,7 +586,7 @@ mod tests {
use proptest::{prop_compose, proptest};
use quickwit_config::{IndexConfig, KafkaSourceParams, SourceConfig, SourceParams};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::types::{IndexUid, PipelineUid, SourceUid};
use quickwit_proto::types::{IndexUid, PipelineUid, ShardId, SourceUid};

use super::*;
use crate::model::ShardLocations;
@@ -566,10 +566,26 @@ fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) {
else {
return;
};

// We first artificially scale down the node capacities.
//
// The node capacity is an estimate of the amount of CPU available on a given indexer node.
// It serves two purposes:
// - under heavy load, indexers will receive work proportional to their relative capacity;
// - under low load, the absolute magnitude is used by the scheduler to decide whether to
//   prefer a balanced workload over other criteria (all pipelines from the same index on
//   the same node, indexing local shards, etc.).
//
// The default CPU capacity is detected from the OS. Using these values directly leads to
// a non-uniform distribution of the load, which is very confusing for users, so we
// artificially scale down the indexer capacities.
problem.scale_node_capacities(0.3f32);

let min_indexer_capacity = (0..problem.num_indexers())
.map(|indexer_ord| problem.indexer_cpu_capacity(indexer_ord))
.min()
.expect("At least one indexer is required");

assert_ne!(min_indexer_capacity.cpu_millis(), 0);
if min_indexer_capacity.cpu_millis() < largest_shard_load.get() {
let scaling_factor =
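To make the two-step adjustment concrete, a small sketch (an editor's illustration with made-up capacities and shard load, not the actual solver code):

fn main() {
    // CPU capacity of each indexer, in CPU millis (hypothetical values).
    let mut capacities: Vec<u32> = vec![8_000, 4_000];

    // Step 1: artificially scale every capacity down by 0.3.
    for c in &mut capacities {
        *c = (*c as f32 * 0.3) as u32;
    }
    assert_eq!(capacities, vec![2_400, 1_200]);

    // Step 2: if the smallest node can no longer host the largest shard,
    // inflate all capacities by the same factor so that it can.
    let largest_shard_load = 2_400u32;
    let min_capacity = *capacities.iter().min().expect("at least one indexer");
    if min_capacity < largest_shard_load {
        let scaling_factor = largest_shard_load as f32 / min_capacity as f32; // 2.0
        for c in &mut capacities {
            *c = (*c as f32 * scaling_factor) as u32;
        }
    }
    assert_eq!(capacities, vec![4_800, 2_400]);
}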
@@ -163,7 +163,7 @@ fn assert_remove_extraneous_shards_post_condition(
// Relieve sources from the nodes that are exceeding their maximum load.

fn enforce_indexers_cpu_capacity(problem: &SchedulingProblem, solution: &mut SchedulingSolution) {
for indexer_assignment in solution.indexer_assignments.iter_mut() {
for indexer_assignment in &mut solution.indexer_assignments {
let indexer_cpu_capacity: CpuCapacity =
problem.indexer_cpu_capacity(indexer_assignment.indexer_ord);
enforce_indexer_cpu_capacity(problem, indexer_cpu_capacity, indexer_assignment);
@@ -753,6 +753,35 @@ mod tests {
assert_eq!(solution.indexer_assignments[0].num_shards(0), 1);
}

#[test]
fn test_problem_unbalanced_simple() {
let mut problem = SchedulingProblem::with_indexer_cpu_capacities(vec![
CpuCapacity::from_cpu_millis(1),
CpuCapacity::from_cpu_millis(1),
]);
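        // One heavy source (load 10) plus ten light sources (load 1 each):
        // 20 units of load that two identical indexers should split 10/10.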
problem.add_source(1, NonZeroU32::new(10).unwrap());
for _ in 0..10 {
problem.add_source(1, NonZeroU32::new(1).unwrap());
}
let previous_solution = problem.new_solution();
let solution = solve(problem.clone(), previous_solution);
let cpu_loads: Vec<u32> = solution
.indexer_assignments
.iter()
.map(|indexer_assignment: &IndexerAssignment| {
indexer_assignment.total_cpu_load(&problem)
})
.collect();
assert_eq!(cpu_loads.len(), 2);
let (min, max) = cpu_loads
.into_iter()
.minmax()
.into_option()
.unwrap();
assert_eq!(min, 10);
assert_eq!(max, 10);
}

proptest! {
#[test]
fn test_proptest_post_conditions((problem, solution) in problem_solution_strategy()) {
@@ -80,7 +80,7 @@ impl Source {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone)]
pub struct SchedulingProblem {
sources: Vec<Source>,
indexer_cpu_capacities: Vec<CpuCapacity>,
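
The new Clone derive supports test_problem_unbalanced_simple above: the test passes a clone of the problem to solve() by value and keeps the original around to read back each indexer's total CPU load.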