Added rebuild plan REST debug handler. (#5150)
fulmicoton authored Jun 24, 2024
1 parent 48e7a66 commit 9fddb68
Showing 10 changed files with 473 additions and 31 deletions.
26 changes: 14 additions & 12 deletions quickwit/quickwit-control-plane/src/control_plane.rs
@@ -43,6 +43,7 @@ use quickwit_metastore::{CreateIndexRequestExt, CreateIndexResponseExt, IndexMet
use quickwit_proto::control_plane::{
AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneError, ControlPlaneResult,
GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSubrequest,
RebuildPlanRequest, RebuildPlanResponse,
};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::metastore::{
@@ -77,9 +78,6 @@ const REBUILD_PLAN_COOLDOWN_PERIOD: Duration = Duration::from_secs(2);
#[derive(Debug)]
struct ControlPlanLoop;

#[derive(Debug, Default)]
struct RebuildPlan;

pub struct ControlPlane {
cluster_config: ClusterConfig,
cluster_change_stream_opt: Option<ClusterChangeStream>,
@@ -384,22 +382,24 @@ impl ControlPlane {
.next_rebuild_tracker
.next_rebuild_waiter();
self.rebuild_plan_debouncer
.self_send_with_cooldown::<RebuildPlan>(ctx);
.self_send_with_cooldown::<RebuildPlanRequest>(ctx);
next_rebuild_waiter
}
}

#[async_trait]
impl Handler<RebuildPlan> for ControlPlane {
type Reply = ();
impl Handler<RebuildPlanRequest> for ControlPlane {
type Reply = ControlPlaneResult<RebuildPlanResponse>;

async fn handle(
&mut self,
_message: RebuildPlan,
rebuild_plan_request: RebuildPlanRequest,
_ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.indexing_scheduler.rebuild_plan(&self.model);
Ok(())
) -> Result<ControlPlaneResult<RebuildPlanResponse>, ActorExitStatus> {
let rebuild_plan_response = self
.indexing_scheduler
.rebuild_plan(&self.model, rebuild_plan_request);
Ok(Ok(rebuild_plan_response))
}
}

@@ -925,7 +925,8 @@ impl Handler<IndexerJoined> for ControlPlane {
{
return convert_metastore_error::<()>(metastore_error).map(|_| ());
}
self.indexing_scheduler.rebuild_plan(&self.model);
self.indexing_scheduler
.rebuild_plan(&self.model, RebuildPlanRequest::default());
Ok(())
}
}
@@ -955,7 +956,8 @@ impl Handler<IndexerLeft> for ControlPlane {
{
return convert_metastore_error::<()>(metastore_error).map(|_| ());
}
self.indexing_scheduler.rebuild_plan(&self.model);
self.indexing_scheduler
.rebuild_plan(&self.model, RebuildPlanRequest::default());
Ok(())
}
}
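A note on the new handler above: since Handler<RebuildPlanRequest> now replies with ControlPlaneResult<RebuildPlanResponse>, callers can await the rebuilt plan instead of firing and forgetting. A minimal sketch, assuming quickwit-actors' Mailbox::ask_for_res and an anyhow error wrapper (the helper name is hypothetical):

use quickwit_actors::Mailbox;
use quickwit_control_plane::control_plane::ControlPlane;
use quickwit_proto::control_plane::{RebuildPlanRequest, RebuildPlanResponse};

// Hypothetical helper: asks the control plane actor to rebuild the plan from
// scratch and return the solver's debug state.
async fn rebuild_plan_debug(
    control_plane_mailbox: &Mailbox<ControlPlane>,
) -> anyhow::Result<RebuildPlanResponse> {
    let request = RebuildPlanRequest {
        reset: true, // ignore the previously applied physical plan
        debug: true, // serialize the scheduling problem and solutions as JSON
    };
    control_plane_mailbox
        .ask_for_res(request)
        .await
        .map_err(|error| anyhow::anyhow!("rebuild plan request failed: {error:?}"))
}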
45 changes: 36 additions & 9 deletions quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -28,6 +28,7 @@ use std::time::{Duration, Instant};

use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
use quickwit_proto::control_plane::{RebuildPlanRequest, RebuildPlanResponse};
use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
};
@@ -205,7 +206,12 @@ impl IndexingScheduler {
//
// Prefer not calling this method directly, and instead call
// `ControlPlane::rebuild_indexing_plan_debounced`.
pub(crate) fn rebuild_plan(&mut self, model: &ControlPlaneModel) {
pub(crate) fn rebuild_plan(
&mut self,
model: &ControlPlaneModel,
rebuild_plan_request: RebuildPlanRequest,
) -> RebuildPlanResponse {
let RebuildPlanRequest { reset, debug } = rebuild_plan_request;
crate::metrics::CONTROL_PLANE_METRICS.schedule_total.inc();

let notify_on_drop = self.next_rebuild_tracker.start_rebuild();
@@ -214,6 +220,8 @@

let indexers: Vec<IndexerNodeInfo> = self.get_indexers_from_indexer_pool();

let mut rebuild_plan_response = RebuildPlanResponse::default();

let indexer_id_to_cpu_capacities: FnvHashMap<String, CpuCapacity> = indexers
.iter()
.filter_map(|indexer| {
@@ -229,15 +237,28 @@
if !sources.is_empty() {
warn!("no indexing capacity available, cannot schedule an indexing plan");
}
return;
return rebuild_plan_response;
};

let shard_locations = model.shard_locations();
let previous_applied_plan = if reset {
None
} else {
self.state.last_applied_physical_plan.as_ref()
};

let debug_output: Option<&mut RebuildPlanResponse> = if debug {
Some(&mut rebuild_plan_response)
} else {
None
};

let new_physical_plan = build_physical_indexing_plan(
&sources,
&indexer_id_to_cpu_capacities,
self.state.last_applied_physical_plan.as_ref(),
previous_applied_plan,
&shard_locations,
debug_output,
);
let shard_locality_metrics =
get_shard_locality_metrics(&new_physical_plan, &shard_locations);
@@ -249,11 +270,12 @@
);
// No need to apply the new plan as it is the same as the old one.
if plans_diff.is_empty() {
return;
return rebuild_plan_response;
}
}
self.apply_physical_indexing_plan(&indexers, new_physical_plan, Some(notify_on_drop));
self.state.num_schedule_indexing_plan += 1;
rebuild_plan_response
}

/// Checks if the last applied plan corresponds to the running indexing tasks present in the
@@ -268,7 +290,7 @@
// If there is no plan, the node is probably starting and the scheduler did not find
// indexers yet. In this case, we want to schedule as soon as possible to find new
// indexers.
self.rebuild_plan(model);
self.rebuild_plan(model, RebuildPlanRequest::default());
return;
};
if let Some(last_applied_plan_timestamp) = self.state.last_applied_plan_timestamp {
@@ -290,7 +312,7 @@
);
if !indexing_plans_diff.has_same_nodes() {
info!(plans_diff=?indexing_plans_diff, "running plan and last applied plan node IDs differ: schedule an indexing plan");
self.rebuild_plan(model);
let _ = self.rebuild_plan(model, RebuildPlanRequest::default());
} else if !indexing_plans_diff.has_same_tasks() {
// Some nodes may not have received their tasks; apply the plan again.
info!(plans_diff=?indexing_plans_diff, "running tasks and last applied tasks differ: reapply last plan");
@@ -833,8 +855,13 @@ mod tests {
indexer_max_loads.insert("indexer1".to_string(), mcpu(3_000));
indexer_max_loads.insert("indexer2".to_string(), mcpu(3_000));
let shard_locations = ShardLocations::default();
let physical_plan =
build_physical_indexing_plan(&sources[..], &indexer_max_loads, None, &shard_locations);
let physical_plan = build_physical_indexing_plan(
&sources[..],
&indexer_max_loads,
None,
&shard_locations,
None,
);
assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 2);
let indexing_tasks_1 = physical_plan.indexer("indexer1").unwrap();
assert_eq!(indexing_tasks_1.len(), 2);
@@ -865,7 +892,7 @@
indexer_max_loads.insert(indexer_id, mcpu(4_000));
}
let shard_locations = ShardLocations::default();
let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations);
let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations, None);
}
}

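To recap the signature change above: rebuild_plan now consumes a RebuildPlanRequest and returns a RebuildPlanResponse. The reset flag discards last_applied_physical_plan so the solver starts from a blank solution, and the debug flag fills the response with the solver's serialized state. A minimal sketch of driving both flags, assuming a mutable IndexingScheduler (scheduler) and a populated ControlPlaneModel (model) set up as in the existing tests:

// Hedged sketch: rebuild from scratch and inspect the debug payload.
let request = RebuildPlanRequest { reset: true, debug: true };
let response = scheduler.rebuild_plan(&model, request);
// With `debug: true`, the solver's inputs and outputs come back as JSON strings.
assert!(!response.problem_json.is_empty());
assert!(!response.new_solution_json.is_empty());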
quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
@@ -25,6 +25,7 @@ use std::num::NonZeroU32;

use fnv::{FnvHashMap, FnvHashSet};
use quickwit_common::rate_limited_debug;
use quickwit_proto::control_plane::RebuildPlanResponse;
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::types::{PipelineUid, ShardId, SourceUid};
use scheduling_logic_model::{IndexerOrd, SourceOrd};
@@ -609,6 +610,7 @@ pub fn build_physical_indexing_plan(
indexer_id_to_cpu_capacities: &FnvHashMap<String, CpuCapacity>,
previous_plan_opt: Option<&PhysicalIndexingPlan>,
shard_locations: &ShardLocations,
mut debug_output_opt: Option<&mut RebuildPlanResponse>,
) -> PhysicalIndexingPlan {
// Asserts that the sources are valid.
check_sources(sources);
@@ -623,13 +625,23 @@

// Populate the previous solution, if any.
let mut previous_solution = problem.new_solution();

if let Some(previous_plan) = previous_plan_opt {
convert_physical_plan_to_solution(previous_plan, &id_to_ord_map, &mut previous_solution);
}

if let Some(debug_output) = &mut debug_output_opt {
debug_output.previous_solution_json = serde_json::to_string(&previous_solution).unwrap();
debug_output.problem_json = serde_json::to_string(&problem).unwrap();
}

// Compute the new scheduling solution using a heuristic.
let new_solution = scheduling_logic::solve(problem, previous_solution);

if let Some(debug_output) = debug_output_opt {
debug_output.new_solution_json = serde_json::to_string(&new_solution).unwrap();
}

// Convert the new scheduling solution back to a physical plan.
let new_physical_plan = convert_scheduling_solution_to_physical_plan(
&new_solution,
@@ -778,6 +790,7 @@ mod tests {
&indexer_id_to_cpu_capacities,
None,
&shard_locations,
None,
);
assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2);

@@ -848,6 +861,7 @@
&indexer_id_to_cpu_capacities,
None,
&shard_locations,
None,
);
assert_eq!(plan.indexing_tasks_per_indexer().len(), num_indexers);
let metrics = get_shard_locality_metrics(&plan, &shard_locations);
@@ -876,8 +890,13 @@
{
indexer_max_loads.insert(indexer1.clone(), mcpu(1_999));
// This tests what happens when there isn't enough capacity on the cluster.
let physical_plan =
build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations);
let physical_plan = build_physical_indexing_plan(
&sources,
&indexer_max_loads,
None,
&shard_locations,
None,
);
assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1);
let expected_tasks = physical_plan.indexer(&indexer1).unwrap();
assert_eq!(expected_tasks.len(), 2);
@@ -886,8 +905,13 @@
{
indexer_max_loads.insert(indexer1.clone(), mcpu(2_000));
// This tests what happens when there isn't enough capacity on the cluster.
let physical_plan =
build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations);
let physical_plan = build_physical_indexing_plan(
&sources,
&indexer_max_loads,
None,
&shard_locations,
None,
);
assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1);
let expected_tasks = physical_plan.indexer(&indexer1).unwrap();
assert_eq!(expected_tasks.len(), 2);
@@ -955,6 +979,7 @@ mod tests {
&indexer_id_to_cpu_capacities,
Some(&indexing_plan),
&shard_locations,
None,
);
let indexing_tasks = new_plan.indexer("node1").unwrap();
assert_eq!(indexing_tasks.len(), 2);
@@ -995,6 +1020,7 @@
&indexer_id_to_cpu_capacities,
Some(&indexing_plan),
&shard_locations,
None,
);
let mut indexing_tasks = new_plan.indexer(NODE).unwrap().to_vec();
for indexing_task in &mut indexing_tasks {
@@ -1177,7 +1203,13 @@
let mut capacities = FnvHashMap::default();
capacities.insert("indexer-1".to_string(), CpuCapacity::from_cpu_millis(8000));
let shard_locations = ShardLocations::default();
build_physical_indexing_plan(&sources_to_schedule, &capacities, None, &shard_locations);
build_physical_indexing_plan(
&sources_to_schedule,
&capacities,
None,
&shard_locations,
None,
);
}

#[test]
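The plumbing above threads an optional &mut RebuildPlanResponse through build_physical_indexing_plan so the solver can expose its state without changing the happy path. A minimal sketch of capturing that debug output directly, assuming sources, indexer_id_to_cpu_capacities, and shard_locations are built as in the tests above:

let mut debug_response = RebuildPlanResponse::default();
let plan = build_physical_indexing_plan(
    &sources,
    &indexer_id_to_cpu_capacities,
    None,                       // no previous plan: solve from scratch
    &shard_locations,
    Some(&mut debug_response),  // capture the problem and both solutions as JSON
);
// The plan is usable as before; the response carries the serialized solver state.
println!("scheduling problem: {}", debug_response.problem_json);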
quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs
@@ -22,11 +22,12 @@ use std::collections::BTreeMap;
use std::num::NonZeroU32;

use quickwit_proto::indexing::CpuCapacity;
use serde::Serialize;

pub type SourceOrd = u32;
pub type IndexerOrd = usize;

#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct Source {
pub source_ord: SourceOrd,
pub load_per_shard: NonZeroU32,
@@ -79,7 +80,7 @@ impl Source {
}
}

#[derive(Debug)]
#[derive(Debug, Serialize)]
pub struct SchedulingProblem {
sources: Vec<Source>,
indexer_cpu_capacities: Vec<CpuCapacity>,
@@ -174,7 +175,7 @@ impl SchedulingProblem {
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct IndexerAssignment {
pub indexer_ord: IndexerOrd,
pub num_shards_per_source: BTreeMap<SourceOrd, u32>,
@@ -232,7 +233,7 @@ impl IndexerAssignment {
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct SchedulingSolution {
pub indexer_assignments: Vec<IndexerAssignment>,
}
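The Serialize derives above are what let build_physical_indexing_plan dump the solver state into RebuildPlanResponse. A small sketch of the serialization they enable (field values are illustrative):

use std::collections::BTreeMap;

// With the new derive, any piece of the solver state renders to JSON.
let assignment = IndexerAssignment {
    indexer_ord: 0,
    num_shards_per_source: BTreeMap::from([(0, 4)]),
};
let json = serde_json::to_string(&assignment).unwrap();
assert_eq!(json, r#"{"indexer_ord":0,"num_shards_per_source":{"0":4}}"#);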
17 changes: 17 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/control_plane.proto
@@ -68,6 +68,9 @@ service ControlPlaneService {

// Asks the control plane whether the shards listed in the request should be deleted or truncated.
rpc AdviseResetShards(AdviseResetShardsRequest) returns (AdviseResetShardsResponse);

/// Rebuilds the indexing plan. If `reset` is set, the previously applied plan is ignored
/// and the plan is recomputed from scratch; if `debug` is set, the response carries the
/// scheduling problem and solutions as JSON.
rpc RebuildPlan(RebuildPlanRequest) returns (RebuildPlanResponse);
}

// Shard API
@@ -122,3 +125,17 @@
repeated quickwit.ingest.ShardIds shards_to_delete = 1;
repeated quickwit.ingest.ShardIdPositions shards_to_truncate = 2;
}

/// Careful here! The Default implementation is used in different places of the code.
/// If you modify this message, make sure its default value aligns with the previous
/// behavior.
message RebuildPlanRequest {
bool reset = 1;
bool debug = 2;
}

message RebuildPlanResponse {
string previous_solution_json = 1;
string problem_json = 2;
string new_solution_json = 3;
}
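The REST handler itself is in another of the commit's changed files, not shown above. Through the generated service client, a call might look like the following sketch (the rebuild_plan client method name is assumed from Quickwit's codegen convention of snake-casing the RPC name; treat the exact client API as an assumption):

use quickwit_proto::control_plane::{ControlPlaneServiceClient, RebuildPlanRequest};

// Hypothetical: fetch the solver's debug state without resetting the plan.
async fn dump_plan_debug_state(mut client: ControlPlaneServiceClient) -> anyhow::Result<()> {
    let request = RebuildPlanRequest { reset: false, debug: true };
    let response = client.rebuild_plan(request).await?;
    println!("previous solution: {}", response.previous_solution_json);
    println!("new solution: {}", response.new_solution_json);
    Ok(())
}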