From ffeeb4e598dbc2de950124cb3138094d54c75326 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 9 Feb 2024 21:26:27 +0800 Subject: [PATCH 01/15] feat(meta): make complete barrier non-async in worker loop --- src/meta/src/barrier/mod.rs | 548 ++++++++++++++++--------------- src/meta/src/barrier/progress.rs | 5 + src/meta/src/barrier/rpc.rs | 50 +-- 3 files changed, 304 insertions(+), 299 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 3bad3cbd15a2..b5525783127e 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -15,11 +15,14 @@ use std::assert_matches::assert_matches; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet, VecDeque}; -use std::mem::take; +use std::future::{poll_fn, Future}; use std::sync::Arc; +use std::task::{ready, Poll}; use std::time::Duration; +use anyhow::anyhow; use fail::fail_point; +use futures::FutureExt; use itertools::Itertools; use prometheus::HistogramTimer; use risingwave_common::bail; @@ -37,6 +40,7 @@ use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; +use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; use risingwave_pb::stream_service::BarrierCompleteResponse; use thiserror_ext::AsReport; use tokio::sync::oneshot::{Receiver, Sender}; @@ -50,9 +54,8 @@ use self::progress::TrackingCommand; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::BarrierInfo; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob}; -use crate::barrier::rpc::BarrierRpcManager; +use crate::barrier::rpc::BarrierCollectFuture; use crate::barrier::state::BarrierManagerState; -use crate::barrier::BarrierEpochState::{Completed, InFlight}; use crate::hummock::{CommitEpochInfo, HummockManagerRef}; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{ @@ -151,7 +154,7 @@ pub struct GlobalBarrierManagerContext { sink_manager: SinkCoordinatorManager, - metrics: Arc, + pub(super) metrics: Arc, stream_rpc_manager: StreamRpcManager, @@ -185,31 +188,32 @@ pub struct GlobalBarrierManager { checkpoint_control: CheckpointControl, - rpc_manager: BarrierRpcManager, - active_streaming_nodes: ActiveStreamingWorkerNodes, } /// Controls the concurrent execution of commands. struct CheckpointControl { /// Save the state and message of barrier in order. - command_ctx_queue: VecDeque, + inflight_command_ctx_queue: VecDeque, - metrics: Arc, + /// Command that has been collected but is still completing. + /// The join handle of the completing future is stored. + completing_command: Option, - /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) - finished_jobs: Vec, + context: GlobalBarrierManagerContext, } impl CheckpointControl { - fn new(metrics: Arc) -> Self { + fn new(context: GlobalBarrierManagerContext) -> Self { Self { - command_ctx_queue: Default::default(), - metrics, - finished_jobs: Default::default(), + inflight_command_ctx_queue: Default::default(), + completing_command: None, + context, } } +} +impl CreateMviewProgressTracker { /// Stash a command to finish later. 
fn stash_command_to_finish(&mut self, finished_job: TrackingJob) { self.finished_jobs.push(finished_job); @@ -231,149 +235,139 @@ impl CheckpointControl { } Ok(!self.finished_jobs.is_empty()) } +} +impl CheckpointControl { fn cancel_command(&mut self, cancelled_job: TrackingJob) { if let TrackingJob::New(cancelled_command) = cancelled_job { - if let Some(index) = self.command_ctx_queue.iter().position(|x| { - x.command_ctx.prev_epoch.value() == cancelled_command.context.prev_epoch.value() + if let Some(index) = self.inflight_command_ctx_queue.iter().position(|command| { + command.command_ctx.prev_epoch.value() + == cancelled_command.context.prev_epoch.value() }) { - self.command_ctx_queue.remove(index); + self.inflight_command_ctx_queue.remove(index); } } else { // Recovered jobs do not need to be cancelled since only `RUNNING` actors will get recovered. } } +} +impl CreateMviewProgressTracker { fn cancel_stashed_command(&mut self, id: TableId) { self.finished_jobs .retain(|x| x.table_to_create() != Some(id)); } +} +impl CheckpointControl { /// Update the metrics of barrier nums. fn update_barrier_nums_metrics(&self) { - self.metrics.in_flight_barrier_nums.set( - self.command_ctx_queue - .iter() - .filter(|x| matches!(x.state, InFlight)) - .count() as i64, - ); - self.metrics + self.context + .metrics + .in_flight_barrier_nums + .set(self.inflight_command_ctx_queue.len() as i64); + self.context + .metrics .all_barrier_nums - .set(self.command_ctx_queue.len() as i64); + .set(self.inflight_command_ctx_queue.len() as i64); } /// Enqueue a barrier command, and init its state to `InFlight`. - fn enqueue_command(&mut self, command_ctx: Arc, notifiers: Vec) { - let timer = self.metrics.barrier_latency.start_timer(); + fn enqueue_inflight_command( + &mut self, + command_ctx: Arc, + notifiers: Vec, + barrier_collect_future: BarrierCollectFuture, + ) { + let enqueue_time = self.context.metrics.barrier_latency.start_timer(); - self.command_ctx_queue.push_back(EpochNode { - timer: Some(timer), - wait_commit_timer: None, + if let Some(command) = self.inflight_command_ctx_queue.back() { + assert_eq!( + command.command_ctx.curr_epoch.value(), + command_ctx.prev_epoch.value() + ); + } - state: InFlight, + self.inflight_command_ctx_queue.push_back(InflightCommand { command_ctx, + barrier_collect_future, + enqueue_time, notifiers, }); } - /// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes - /// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them. - fn barrier_completed( - &mut self, - prev_epoch: u64, - result: Vec, - ) -> Vec { - // change state to complete, and wait for nodes with the smaller epoch to commit - let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); - if let Some(node) = self - .command_ctx_queue - .iter_mut() - .find(|x| x.command_ctx.prev_epoch.value().0 == prev_epoch) - { - assert!(matches!(node.state, InFlight)); - node.wait_commit_timer = Some(wait_commit_timer); - node.state = Completed(result); - }; - // Find all continuous nodes with 'Complete' starting from first node - let index = self - .command_ctx_queue - .iter() - .position(|x| !matches!(x.state, Completed(_))) - .unwrap_or(self.command_ctx_queue.len()); - let complete_nodes = self.command_ctx_queue.drain(..index).collect_vec(); - complete_nodes - } - - /// Remove all nodes from queue and return them. - fn barrier_failed(&mut self) -> Vec { - self.command_ctx_queue.drain(..).collect_vec() - } - /// Pause inject barrier until True. 
fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool { - let in_flight_not_full = self - .command_ctx_queue - .iter() - .filter(|x| matches!(x.state, InFlight)) - .count() - < in_flight_barrier_nums; + let in_flight_not_full = self.inflight_command_ctx_queue.len() < in_flight_barrier_nums; // Whether some command requires pausing concurrent barrier. If so, it must be the last one. let should_pause = self - .command_ctx_queue + .inflight_command_ctx_queue .back() - .map(|x| x.command_ctx.command.should_pause_inject_barrier()) + .map(|command| command.command_ctx.command.should_pause_inject_barrier()) .unwrap_or(false); debug_assert_eq!( - self.command_ctx_queue + self.inflight_command_ctx_queue .iter() - .any(|x| x.command_ctx.command.should_pause_inject_barrier()), + .any(|command| command.command_ctx.command.should_pause_inject_barrier()), should_pause ); in_flight_not_full && !should_pause } - /// Check whether the target epoch is managed by `CheckpointControl`. - pub fn contains_epoch(&self, epoch: u64) -> bool { - self.command_ctx_queue - .iter() - .any(|x| x.command_ctx.prev_epoch.value().0 == epoch) - } - /// We need to make sure there are no changes when doing recovery - pub fn clear_changes(&mut self) { - self.finished_jobs.clear(); + pub async fn clear_and_fail_all_nodes(&mut self, err: &MetaError) { + // join spawned completing command to finish no matter it succeeds or not. + if let Some(command) = self.completing_command.take() { + info!( + prev_epoch = ?command.command_ctx.prev_epoch, + curr_epoch = ?command.command_ctx.curr_epoch, + "waiting for completing command to finish in recovery" + ); + match command.join_handle.await { + Err(e) => { + warn!(err = ?e.as_report(), "failed to join completing task"); + } + Ok(Err(e)) => { + warn!(err = ?e.as_report(), "failed to complete barrier during clear"); + } + Ok(Ok(_)) => {} + }; + } + for command in self.inflight_command_ctx_queue.drain(..) { + for notifier in command.notifiers { + notifier.notify_collection_failed(err.clone()); + } + command.enqueue_time.observe_duration(); + } } } -/// The state and message of this barrier, a node for concurrent checkpoint. -pub struct EpochNode { - /// Timer for recording barrier latency, taken after `complete_barriers`. - timer: Option, - /// The timer of `barrier_wait_commit_latency` - wait_commit_timer: Option, - - /// Whether this barrier is in-flight or completed. - state: BarrierEpochState, - /// Context of this command to generate barrier and do some post jobs. +struct InflightCommand { command_ctx: Arc, - /// Notifiers of this barrier. + + barrier_collect_future: BarrierCollectFuture, + + enqueue_time: HistogramTimer, + notifiers: Vec, } -/// The state of barrier. -enum BarrierEpochState { - /// This barrier is current in-flight on the stream graph of compute nodes. - InFlight, +struct CompleteBarrierOutput { + cancelled_job: Option, + has_remaining_job: bool, +} + +struct CompletingCommand { + command_ctx: Arc, - /// This barrier is completed or failed. - Completed(Vec), + join_handle: JoinHandle>, } -/// The result of barrier completion. +/// The result of barrier collect. 
#[derive(Debug)] -struct BarrierCompletion { +struct BarrierCollectResult { prev_epoch: u64, result: MetaResult>, } @@ -400,7 +394,6 @@ impl GlobalBarrierManager { InflightActorInfo::default(), None, ); - let checkpoint_control = CheckpointControl::new(metrics.clone()); let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized(); @@ -419,7 +412,7 @@ impl GlobalBarrierManager { env: env.clone(), }; - let rpc_manager = BarrierRpcManager::new(context.clone()); + let checkpoint_control = CheckpointControl::new(context.clone()); Self { enable_recovery, @@ -429,7 +422,6 @@ impl GlobalBarrierManager { env, state: initial_invalid_state, checkpoint_control, - rpc_manager, active_streaming_nodes, } } @@ -619,14 +611,21 @@ impl GlobalBarrierManager { .set_checkpoint_frequency(p.checkpoint_frequency() as usize) } } - // Barrier completes. - completion = self.rpc_manager.next_complete_barrier() => { - self.handle_barrier_complete( - completion, - ) - .await; - } - + complete_result = self.checkpoint_control.next_completed_barrier() => { + match complete_result { + Ok((command_context, remaining)) => { + // If there are remaining commands (that requires checkpoint to finish), we force + // the next barrier to be a checkpoint. + if remaining { + assert_matches!(command_context.kind, BarrierKind::Barrier); + self.scheduled_barriers.force_checkpoint_in_next_barrier(); + } + } + Err(e) => { + self.failure_recovery(e).await; + } + } + }, // There's barrier scheduled. _ = self.scheduled_barriers.wait_one(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => { min_interval.reset(); // Reset the interval as we have a new barrier. @@ -681,10 +680,7 @@ impl GlobalBarrierManager { send_latency_timer.observe_duration(); - self.rpc_manager - .inject_barrier(command_ctx.clone()) - .instrument(span) - .await; + let await_collect_future = self.context.inject_barrier(command_ctx.clone()).await; // Notify about the injection. let prev_paused_reason = self.state.paused_reason(); @@ -701,75 +697,15 @@ impl GlobalBarrierManager { // Update the paused state after the barrier is injected. self.state.set_paused_reason(curr_paused_reason); // Record the in-flight barrier. - self.checkpoint_control - .enqueue_command(command_ctx.clone(), notifiers); - } - - /// Changes the state to `Complete`, and try to commit all epoch that state is `Complete` in - /// order. If commit is err, all nodes will be handled. 
- async fn handle_barrier_complete(&mut self, completion: BarrierCompletion) { - let BarrierCompletion { prev_epoch, result } = completion; - - assert!( - self.checkpoint_control.contains_epoch(prev_epoch), - "received barrier complete response for an unknown epoch: {}", - prev_epoch + self.checkpoint_control.enqueue_inflight_command( + command_ctx.clone(), + notifiers, + await_collect_future, ); - - if let Err(err) = result { - // FIXME: If it is a connector source error occurred in the init barrier, we should pass - // back to frontend - fail_point!("inject_barrier_err_success"); - let fail_node = self.checkpoint_control.barrier_failed(); - warn!(%prev_epoch, error = %err.as_report(), "Failed to complete epoch"); - self.failure_recovery(err, fail_node).await; - return; - } - // change the state to Complete - let mut complete_nodes = self - .checkpoint_control - .barrier_completed(prev_epoch, result.unwrap()); - // try commit complete nodes - let (mut index, mut err_msg) = (0, None); - for (i, node) in complete_nodes.iter_mut().enumerate() { - assert!(matches!(node.state, Completed(_))); - let span = node.command_ctx.span.clone(); - if let Err(err) = self.complete_barrier(node).instrument(span).await { - index = i; - err_msg = Some(err); - break; - } - } - // Handle the error node and the nodes after it - if let Some(err) = err_msg { - let fail_nodes = complete_nodes - .drain(index..) - .chain(self.checkpoint_control.barrier_failed().into_iter()) - .collect_vec(); - warn!(%prev_epoch, error = %err.as_report(), "Failed to commit epoch"); - self.failure_recovery(err, fail_nodes).await; - } } - async fn failure_recovery( - &mut self, - err: MetaError, - fail_nodes: impl IntoIterator, - ) { - self.checkpoint_control.clear_changes(); - self.rpc_manager.clear(); - - for node in fail_nodes { - if let Some(timer) = node.timer { - timer.observe_duration(); - } - if let Some(wait_commit_timer) = node.wait_commit_timer { - wait_commit_timer.observe_duration(); - } - node.notifiers - .into_iter() - .for_each(|notifier| notifier.notify_collection_failed(err.clone())); - } + async fn failure_recovery(&mut self, err: MetaError) { + self.checkpoint_control.clear_and_fail_all_nodes(&err).await; if self.enable_recovery { self.context @@ -793,18 +729,50 @@ impl GlobalBarrierManager { panic!("failed to execute barrier: {}", err.as_report()); } } +} +impl GlobalBarrierManagerContext { /// Try to commit this node. 
If err, returns - async fn complete_barrier(&mut self, node: &mut EpochNode) -> MetaResult<()> { - let prev_epoch = node.command_ctx.prev_epoch.value().0; - match &mut node.state { - Completed(resps) => { + async fn complete_barrier( + self, + command_ctx: Arc, + resps: Vec, + mut notifiers: Vec, + enqueue_time: HistogramTimer, + ) -> MetaResult { + let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); + let (commit_info, create_mview_progress) = collect_commit_epoch_info(resps); + if let Err(e) = self.update_snapshot(&command_ctx, commit_info).await { + for notifier in notifiers { + notifier.notify_collection_failed(e.clone()); + } + return Err(e); + }; + notifiers.iter_mut().for_each(|notifier| { + notifier.notify_collected(); + }); + let result = self + .update_tracking_jobs(notifiers, command_ctx.clone(), create_mview_progress) + .await; + let duration_sec = enqueue_time.stop_and_record(); + self.report_complete_event(duration_sec, &command_ctx); + wait_commit_timer.observe_duration(); + result + } + + async fn update_snapshot( + &self, + command_ctx: &CommandContext, + commit_info: CommitEpochInfo, + ) -> MetaResult<()> { + { + { + let prev_epoch = command_ctx.prev_epoch.value().0; // We must ensure all epochs are committed in ascending order, // because the storage engine will query from new to old in the order in which // the L0 layer files are generated. // See https://github.com/risingwave-labs/risingwave/issues/1251 - let kind = node.command_ctx.kind; - let commit_info = collect_commit_epoch_info(resps); + let kind = command_ctx.kind; // hummock_manager commit epoch. let mut new_snapshot = None; @@ -816,25 +784,20 @@ impl GlobalBarrierManager { ), BarrierKind::Checkpoint => { new_snapshot = self - .context .hummock_manager - .commit_epoch(node.command_ctx.prev_epoch.value().0, commit_info) + .commit_epoch(command_ctx.prev_epoch.value().0, commit_info) .await?; } BarrierKind::Barrier => { - new_snapshot = Some( - self.context - .hummock_manager - .update_current_epoch(prev_epoch), - ); + new_snapshot = Some(self.hummock_manager.update_current_epoch(prev_epoch)); // if we collect a barrier(checkpoint = false), // we need to ensure that command is Plain and the notifier's checkpoint is // false - assert!(!node.command_ctx.command.need_checkpoint()); + assert!(!command_ctx.command.need_checkpoint()); } } - node.command_ctx.post_collect().await?; + command_ctx.post_collect().await?; // Notify new snapshot after fragment_mapping changes have been notified in // `post_collect`. if let Some(snapshot) = new_snapshot { @@ -845,17 +808,27 @@ impl GlobalBarrierManager { Info::HummockSnapshot(snapshot), ); } + Ok(()) + } + } + } + async fn update_tracking_jobs( + &self, + notifiers: Vec, + command_ctx: Arc, + create_mview_progress: Vec, + ) -> MetaResult { + { + { // Notify about collected. - let mut notifiers = take(&mut node.notifiers); - notifiers.iter_mut().for_each(|notifier| { - notifier.notify_collected(); - }); + let version_stats = self.hummock_manager.get_version_stats().await; + + let mut tracker = self.tracker.lock().await; // Save `cancelled_command` for Create MVs. 
- let actors_to_cancel = node.command_ctx.actors_to_cancel(); - let cancelled_command = if !actors_to_cancel.is_empty() { - let mut tracker = self.context.tracker.lock().await; + let actors_to_cancel = command_ctx.actors_to_cancel(); + let cancelled_job = if !actors_to_cancel.is_empty() { tracker.find_cancelled_command(actors_to_cancel) } else { None @@ -864,12 +837,10 @@ impl GlobalBarrierManager { // Save `finished_commands` for Create MVs. let finished_commands = { let mut commands = vec![]; - let version_stats = self.context.hummock_manager.get_version_stats().await; - let mut tracker = self.context.tracker.lock().await; // Add the command to tracker. if let Some(command) = tracker.add( TrackingCommand { - context: node.command_ctx.clone(), + context: command_ctx.clone(), notifiers, }, &version_stats, @@ -878,9 +849,9 @@ impl GlobalBarrierManager { commands.push(command); } // Update the progress of all commands. - for progress in resps.iter().flat_map(|r| &r.create_mview_progress) { + for progress in create_mview_progress { // Those with actors complete can be finished immediately. - if let Some(command) = tracker.update(progress, &version_stats) { + if let Some(command) = tracker.update(&progress, &version_stats) { tracing::trace!(?progress, "finish progress"); commands.push(command); } else { @@ -891,53 +862,111 @@ impl GlobalBarrierManager { }; for command in finished_commands { - self.checkpoint_control.stash_command_to_finish(command); + tracker.stash_command_to_finish(command); } - if let Some(command) = cancelled_command { - self.checkpoint_control.cancel_command(command); - } else if let Some(table_id) = node.command_ctx.table_to_cancel() { + if let Some(table_id) = command_ctx.table_to_cancel() { // the cancelled command is possibly stashed in `finished_commands` and waiting // for checkpoint, we should also clear it. - self.checkpoint_control.cancel_stashed_command(table_id); + tracker.cancel_stashed_command(table_id); } - let remaining = self - .checkpoint_control - .finish_jobs(kind.is_checkpoint()) + let has_remaining_job = tracker + .finish_jobs(command_ctx.kind.is_checkpoint()) .await?; - // If there are remaining commands (that requires checkpoint to finish), we force - // the next barrier to be a checkpoint. - if remaining { - assert_matches!(kind, BarrierKind::Barrier); - self.scheduled_barriers.force_checkpoint_in_next_barrier(); - } - let duration_sec = node.timer.take().unwrap().stop_and_record(); - node.wait_commit_timer.take().unwrap().observe_duration(); + Ok(CompleteBarrierOutput { + cancelled_job, + has_remaining_job, + }) + } + } + } + fn report_complete_event(&self, duration_sec: f64, command_ctx: &CommandContext) { + { + { { // Record barrier latency in event log. 
use risingwave_pb::meta::event_log; let event = event_log::EventBarrierComplete { - prev_epoch: node.command_ctx.prev_epoch.value().0, - cur_epoch: node.command_ctx.curr_epoch.value().0, + prev_epoch: command_ctx.prev_epoch.value().0, + cur_epoch: command_ctx.curr_epoch.value().0, duration_sec, - command: node.command_ctx.command.to_string(), - barrier_kind: node.command_ctx.kind.as_str_name().to_string(), + command: command_ctx.command.to_string(), + barrier_kind: command_ctx.kind.as_str_name().to_string(), }; self.env .event_log_manager_ref() .add_event_logs(vec![event_log::Event::BarrierComplete(event)]); } - - Ok(()) } - InFlight => unreachable!(), } } } +impl CheckpointControl { + pub(super) fn next_completed_barrier( + &mut self, + ) -> impl Future, bool)>> + '_ { + poll_fn(|cx| { + let completing_command = match &mut self.completing_command { + Some(command) => command, + None => { + // If there is no completing barrier, try to start completing the earliest barrier if + // it has been collected. + if let Some(inflight_command) = self.inflight_command_ctx_queue.front_mut() { + // If the earliest barrier has been collected (i.e. return Poll::Ready), start completing + // the barrier is the collection is ok. When it is still pending, the current poll cannot + // make any progress and will return Poll::Pending. + let BarrierCollectResult { prev_epoch, result } = + ready!(inflight_command.barrier_collect_future.poll_unpin(cx)); + let resps = match result { + Ok(resps) => resps, + Err(err) => { + // FIXME: If it is a connector source error occurred in the init barrier, we should pass + // back to frontend + fail_point!("inject_barrier_err_success"); + warn!(%prev_epoch, error = %err.as_report(), "Failed to collect epoch"); + return Poll::Ready(Err(err)); + } + }; + // We only pop the command on successful collect. On error, no need to pop the node out, + // because the error will trigger an error immediately after return, which clears the failed node. + let earliest_inflight_command = + self.inflight_command_ctx_queue.pop_front().unwrap(); + let join_handle = tokio::spawn(self.context.clone().complete_barrier( + earliest_inflight_command.command_ctx.clone(), + resps, + earliest_inflight_command.notifiers, + earliest_inflight_command.enqueue_time, + )); + self.completing_command.insert(CompletingCommand { + command_ctx: earliest_inflight_command.command_ctx, + join_handle, + }) + } else { + return Poll::Pending; + } + } + }; + let join_result = ready!(completing_command.join_handle.poll_unpin(cx)) + .map_err(|e| anyhow!("failed to join completing command: {:?}", e).into()) + .and_then(|result| result); + let completed_command = self.completing_command.take().expect("non-empty"); + let result = join_result.map(|output| { + let command_ctx = completed_command.command_ctx.clone(); + if let Some(job) = output.cancelled_job { + self.cancel_command(job); + } + (command_ctx, output.has_remaining_job) + }); + + Poll::Ready(result) + }) + } +} + impl GlobalBarrierManagerContext { /// Check the status of barrier manager, return error if it is not `Running`. 
pub async fn check_status_running(&self) -> MetaResult<()> { @@ -1026,37 +1055,39 @@ impl GlobalBarrierManagerContext { pub type BarrierManagerRef = GlobalBarrierManagerContext; -fn collect_commit_epoch_info(resps: &mut [BarrierCompleteResponse]) -> CommitEpochInfo { +fn collect_commit_epoch_info( + resps: Vec, +) -> (CommitEpochInfo, Vec) { let mut sst_to_worker: HashMap = HashMap::new(); let mut synced_ssts: Vec = vec![]; - for resp in &mut *resps { - let mut t: Vec = resp - .synced_sstables - .iter_mut() - .map(|grouped| { - let sst_info = std::mem::take(&mut grouped.sst).expect("field not None"); - sst_to_worker.insert(sst_info.get_object_id(), resp.worker_id); - ExtendedSstableInfo::new( - grouped.compaction_group_id, - sst_info, - std::mem::take(&mut grouped.table_stats_map), - ) - }) - .collect_vec(); - synced_ssts.append(&mut t); + let mut table_watermarks = Vec::with_capacity(resps.len()); + let mut progresses = Vec::new(); + for resp in resps { + let ssts_iter = resp.synced_sstables.into_iter().map(|grouped| { + let sst_info = grouped.sst.expect("field not None"); + sst_to_worker.insert(sst_info.get_object_id(), resp.worker_id); + ExtendedSstableInfo::new( + grouped.compaction_group_id, + sst_info, + grouped.table_stats_map, + ) + }); + synced_ssts.extend(ssts_iter); + table_watermarks.push(resp.table_watermarks); + progresses.extend(resp.create_mview_progress); } - CommitEpochInfo::new( + let info = CommitEpochInfo::new( synced_ssts, merge_multiple_new_table_watermarks( - resps - .iter() - .map(|resp| { - resp.table_watermarks - .iter() + table_watermarks + .into_iter() + .map(|watermarks| { + watermarks + .into_iter() .map(|(table_id, watermarks)| { ( - TableId::new(*table_id), - TableWatermarks::from_protobuf(watermarks), + TableId::new(table_id), + TableWatermarks::from_protobuf(&watermarks), ) }) .collect() @@ -1064,5 +1095,6 @@ fn collect_commit_epoch_info(resps: &mut [BarrierCompleteResponse]) -> CommitEpo .collect_vec(), ), sst_to_worker, - ) + ); + (info, progresses) } diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 0c753a3c3f02..543c7974da0f 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -254,6 +254,9 @@ pub(super) struct CreateMviewProgressTracker { /// Find the epoch of the create-mview DDL by the actor containing the backfill executors. 
actor_map: HashMap, + + /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) + pub(super) finished_jobs: Vec, } impl CreateMviewProgressTracker { @@ -313,6 +316,7 @@ impl CreateMviewProgressTracker { Self { progress_map, actor_map, + finished_jobs: Vec::new(), } } @@ -320,6 +324,7 @@ impl CreateMviewProgressTracker { Self { progress_map: Default::default(), actor_map: Default::default(), + finished_jobs: Vec::new(), } } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 670ee7cf1092..b08f2c7247c7 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -19,8 +19,7 @@ use std::sync::Arc; use anyhow::anyhow; use fail::fail_point; use futures::future::try_join_all; -use futures::stream::FuturesUnordered; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::hash::ActorId; @@ -33,53 +32,22 @@ use risingwave_pb::stream_service::{ }; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::StreamClient; -use rw_futures_util::pending_on_none; use tokio::sync::oneshot; use uuid::Uuid; use super::command::CommandContext; -use super::{BarrierCompletion, GlobalBarrierManagerContext}; +use super::{BarrierCollectResult, GlobalBarrierManagerContext}; use crate::manager::{MetaSrvEnv, WorkerId}; use crate::MetaResult; -pub(super) struct BarrierRpcManager { - context: GlobalBarrierManagerContext, - - /// Futures that await on the completion of barrier. - injected_in_progress_barrier: FuturesUnordered, -} - -impl BarrierRpcManager { - pub(super) fn new(context: GlobalBarrierManagerContext) -> Self { - Self { - context, - injected_in_progress_barrier: FuturesUnordered::new(), - } - } - - pub(super) fn clear(&mut self) { - self.injected_in_progress_barrier = FuturesUnordered::new(); - } - - pub(super) async fn inject_barrier(&mut self, command_context: Arc) { - let await_complete_future = self.context.inject_barrier(command_context).await; - self.injected_in_progress_barrier - .push(await_complete_future); - } - - pub(super) async fn next_complete_barrier(&mut self) -> BarrierCompletion { - pending_on_none(self.injected_in_progress_barrier.next()).await - } -} - -pub(super) type BarrierCompletionFuture = impl Future + Send + 'static; +pub(super) type BarrierCollectFuture = impl Future + Send + 'static; impl GlobalBarrierManagerContext { /// Inject a barrier to all CNs and spawn a task to collect it pub(super) async fn inject_barrier( &self, command_context: Arc, - ) -> BarrierCompletionFuture { + ) -> BarrierCollectFuture { let (tx, rx) = oneshot::channel(); let prev_epoch = command_context.prev_epoch.value().0; let result = self @@ -99,7 +67,7 @@ impl GlobalBarrierManagerContext { }); } Err(e) => { - let _ = tx.send(BarrierCompletion { + let _ = tx.send(BarrierCollectResult { prev_epoch, result: Err(e), }); @@ -107,7 +75,7 @@ impl GlobalBarrierManagerContext { } rx.map(move |result| match result { Ok(completion) => completion, - Err(_e) => BarrierCompletion { + Err(_e) => BarrierCollectResult { prev_epoch, result: Err(anyhow!("failed to receive barrier completion result").into()), }, @@ -195,7 +163,7 @@ impl StreamRpcManager { &self, node_need_collect: HashMap, command_context: Arc, - barrier_complete_tx: oneshot::Sender, + barrier_collect_tx: oneshot::Sender, ) { let prev_epoch = command_context.prev_epoch.value().0; let tracing_context = @@ -245,8 +213,8 @@ impl StreamRpcManager { 
.add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]); }) .map_err(Into::into); - let _ = barrier_complete_tx - .send(BarrierCompletion { prev_epoch, result }) + let _ = barrier_collect_tx + .send(BarrierCollectResult { prev_epoch, result }) .inspect_err(|_| tracing::warn!(prev_epoch, "failed to notify barrier completion")); } } From 9ec1655e5dcad92b664dd31699ff6df44e73c6ea Mon Sep 17 00:00:00 2001 From: William Wen Date: Sun, 18 Feb 2024 16:04:51 +0800 Subject: [PATCH 02/15] make dylint happy --- src/meta/src/barrier/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index b5525783127e..7329f216347e 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -951,7 +951,9 @@ impl CheckpointControl { } }; let join_result = ready!(completing_command.join_handle.poll_unpin(cx)) - .map_err(|e| anyhow!("failed to join completing command: {:?}", e).into()) + .map_err(|e| { + anyhow!("failed to join completing command: {:?}", e.as_report()).into() + }) .and_then(|result| result); let completed_command = self.completing_command.take().expect("non-empty"); let result = join_result.map(|output| { From 6b85b4f666a76e2285c13d6b2893bc462647c3ec Mon Sep 17 00:00:00 2001 From: William Wen Date: Sun, 18 Feb 2024 17:23:26 +0800 Subject: [PATCH 03/15] refactor(meta): track finished create mv job in tracker --- src/meta/src/barrier/command.rs | 9 ---- src/meta/src/barrier/mod.rs | 80 ++++++++++++-------------------- src/meta/src/barrier/progress.rs | 35 ++++---------- 3 files changed, 38 insertions(+), 86 deletions(-) diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 6b1b73d6ca69..9e22f5debd0e 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -709,15 +709,6 @@ impl CommandContext { } } - /// For `CancelStreamingJob`, returns the actors of the `StreamScan` nodes. For other commands, - /// returns an empty set. - pub fn actors_to_cancel(&self) -> HashSet { - match &self.command { - Command::CancelStreamingJob(table_fragments) => table_fragments.backfill_actor_ids(), - _ => Default::default(), - } - } - /// For `CancelStreamingJob`, returns the table id of the target table. pub fn table_to_cancel(&self) -> Option { match &self.command { diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 3bad3cbd15a2..7e24f4564df4 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -151,7 +151,7 @@ pub struct GlobalBarrierManagerContext { sink_manager: SinkCoordinatorManager, - metrics: Arc, + pub(super) metrics: Arc, stream_rpc_manager: StreamRpcManager, @@ -195,21 +195,19 @@ struct CheckpointControl { /// Save the state and message of barrier in order. command_ctx_queue: VecDeque, - metrics: Arc, - - /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) - finished_jobs: Vec, + context: GlobalBarrierManagerContext, } impl CheckpointControl { - fn new(metrics: Arc) -> Self { + fn new(context: GlobalBarrierManagerContext) -> Self { Self { command_ctx_queue: Default::default(), - metrics, - finished_jobs: Default::default(), + context, } } +} +impl CreateMviewProgressTracker { /// Stash a command to finish later. 
fn stash_command_to_finish(&mut self, finished_job: TrackingJob) { self.finished_jobs.push(finished_job); @@ -232,39 +230,32 @@ impl CheckpointControl { Ok(!self.finished_jobs.is_empty()) } - fn cancel_command(&mut self, cancelled_job: TrackingJob) { - if let TrackingJob::New(cancelled_command) = cancelled_job { - if let Some(index) = self.command_ctx_queue.iter().position(|x| { - x.command_ctx.prev_epoch.value() == cancelled_command.context.prev_epoch.value() - }) { - self.command_ctx_queue.remove(index); - } - } else { - // Recovered jobs do not need to be cancelled since only `RUNNING` actors will get recovered. - } - } - - fn cancel_stashed_command(&mut self, id: TableId) { + fn cancel_command(&mut self, id: TableId) { + let _ = self.progress_map.remove(&id); self.finished_jobs .retain(|x| x.table_to_create() != Some(id)); + self.actor_map.retain(|_, table_id| *table_id != id); } +} +impl CheckpointControl { /// Update the metrics of barrier nums. fn update_barrier_nums_metrics(&self) { - self.metrics.in_flight_barrier_nums.set( + self.context.metrics.in_flight_barrier_nums.set( self.command_ctx_queue .iter() .filter(|x| matches!(x.state, InFlight)) .count() as i64, ); - self.metrics + self.context + .metrics .all_barrier_nums .set(self.command_ctx_queue.len() as i64); } /// Enqueue a barrier command, and init its state to `InFlight`. fn enqueue_command(&mut self, command_ctx: Arc, notifiers: Vec) { - let timer = self.metrics.barrier_latency.start_timer(); + let timer = self.context.metrics.barrier_latency.start_timer(); self.command_ctx_queue.push_back(EpochNode { timer: Some(timer), @@ -284,7 +275,11 @@ impl CheckpointControl { result: Vec, ) -> Vec { // change state to complete, and wait for nodes with the smaller epoch to commit - let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); + let wait_commit_timer = self + .context + .metrics + .barrier_wait_commit_latency + .start_timer(); if let Some(node) = self .command_ctx_queue .iter_mut() @@ -340,11 +335,6 @@ impl CheckpointControl { .iter() .any(|x| x.command_ctx.prev_epoch.value().0 == epoch) } - - /// We need to make sure there are no changes when doing recovery - pub fn clear_changes(&mut self) { - self.finished_jobs.clear(); - } } /// The state and message of this barrier, a node for concurrent checkpoint. @@ -400,7 +390,6 @@ impl GlobalBarrierManager { InflightActorInfo::default(), None, ); - let checkpoint_control = CheckpointControl::new(metrics.clone()); let active_streaming_nodes = ActiveStreamingWorkerNodes::uninitialized(); @@ -419,6 +408,8 @@ impl GlobalBarrierManager { env: env.clone(), }; + let checkpoint_control = CheckpointControl::new(context.clone()); + let rpc_manager = BarrierRpcManager::new(context.clone()); Self { @@ -756,7 +747,6 @@ impl GlobalBarrierManager { err: MetaError, fail_nodes: impl IntoIterator, ) { - self.checkpoint_control.clear_changes(); self.rpc_manager.clear(); for node in fail_nodes { @@ -852,20 +842,13 @@ impl GlobalBarrierManager { notifier.notify_collected(); }); - // Save `cancelled_command` for Create MVs. - let actors_to_cancel = node.command_ctx.actors_to_cancel(); - let cancelled_command = if !actors_to_cancel.is_empty() { - let mut tracker = self.context.tracker.lock().await; - tracker.find_cancelled_command(actors_to_cancel) - } else { - None - }; + // Notify about collected. + let version_stats = self.context.hummock_manager.get_version_stats().await; + let mut tracker = self.context.tracker.lock().await; // Save `finished_commands` for Create MVs. 
let finished_commands = { let mut commands = vec![]; - let version_stats = self.context.hummock_manager.get_version_stats().await; - let mut tracker = self.context.tracker.lock().await; // Add the command to tracker. if let Some(command) = tracker.add( TrackingCommand { @@ -891,21 +874,16 @@ impl GlobalBarrierManager { }; for command in finished_commands { - self.checkpoint_control.stash_command_to_finish(command); + tracker.stash_command_to_finish(command); } - if let Some(command) = cancelled_command { - self.checkpoint_control.cancel_command(command); - } else if let Some(table_id) = node.command_ctx.table_to_cancel() { + if let Some(table_id) = node.command_ctx.table_to_cancel() { // the cancelled command is possibly stashed in `finished_commands` and waiting // for checkpoint, we should also clear it. - self.checkpoint_control.cancel_stashed_command(table_id); + tracker.cancel_command(table_id); } - let remaining = self - .checkpoint_control - .finish_jobs(kind.is_checkpoint()) - .await?; + let remaining = tracker.finish_jobs(kind.is_checkpoint()).await?; // If there are remaining commands (that requires checkpoint to finish), we force // the next barrier to be a checkpoint. if remaining { diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index f22c5a2bbb21..53ba3f511b64 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -16,7 +16,6 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::Epoch; use risingwave_pb::ddl_service::DdlProgress; @@ -44,7 +43,7 @@ enum BackfillState { /// Progress of all actors containing backfill executors while creating mview. #[derive(Debug)] -struct Progress { +pub(super) struct Progress { states: HashMap, done_count: usize, @@ -250,10 +249,13 @@ pub(super) struct TrackingCommand { /// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`. pub(super) struct CreateMviewProgressTracker { /// Progress of the create-mview DDL indicated by the TableId. - progress_map: HashMap, + pub(super) progress_map: HashMap, /// Find the epoch of the create-mview DDL by the actor containing the backfill executors. - actor_map: HashMap, + pub(super) actor_map: HashMap, + + /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) + pub(super) finished_jobs: Vec, } impl CreateMviewProgressTracker { @@ -313,6 +315,7 @@ impl CreateMviewProgressTracker { Self { progress_map, actor_map, + finished_jobs: Vec::new(), } } @@ -320,6 +323,7 @@ impl CreateMviewProgressTracker { Self { progress_map: Default::default(), actor_map: Default::default(), + finished_jobs: Vec::new(), } } @@ -338,27 +342,6 @@ impl CreateMviewProgressTracker { .collect() } - /// Try to find the target create-streaming-job command from track. - /// - /// Return the target command as it should be cancelled based on the input actors. - pub fn find_cancelled_command( - &mut self, - actors_to_cancel: HashSet, - ) -> Option { - let epochs = actors_to_cancel - .into_iter() - .map(|actor_id| self.actor_map.get(&actor_id)) - .collect_vec(); - assert!(epochs.iter().all_equal()); - // If the target command found in progress map, return and remove it. Note that the command - // should have finished if not found. 
- if let Some(Some(epoch)) = epochs.first() { - Some(self.progress_map.remove(epoch).unwrap().1) - } else { - None - } - } - /// Add a new create-mview DDL command to track. /// /// If the actors to track is empty, return the given command as it can be finished immediately. @@ -496,7 +479,7 @@ impl CreateMviewProgressTracker { table_id ); - // Clean-up the mapping from actors to DDL epoch. + // Clean-up the mapping from actors to DDL table_id. for actor in o.get().0.actors() { self.actor_map.remove(&actor); } From 3e885e9db9e79ba0d99bdfb5017c7df46769d3e9 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 19 Feb 2024 14:22:17 +0800 Subject: [PATCH 04/15] move code --- src/meta/src/barrier/mod.rs | 33 +----------------------------- src/meta/src/barrier/progress.rs | 35 +++++++++++++++++++++++++++++--- 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 101085f40cb6..9b3fa3a4d65b 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -50,7 +50,7 @@ use self::notifier::Notifier; use self::progress::TrackingCommand; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::BarrierInfo; -use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob}; +use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::BarrierRpcManager; use crate::barrier::state::BarrierManagerState; use crate::barrier::BarrierEpochState::{Completed, InFlight}; @@ -208,37 +208,6 @@ impl CheckpointControl { } } -impl CreateMviewProgressTracker { - /// Stash a command to finish later. - fn stash_command_to_finish(&mut self, finished_job: TrackingJob) { - self.finished_jobs.push(finished_job); - } - - /// Finish stashed jobs. - /// If checkpoint, means all jobs can be finished. - /// If not checkpoint, jobs which do not require checkpoint can be finished. - /// - /// Returns whether there are still remaining stashed jobs to finish. - async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult { - for job in self - .finished_jobs - .extract_if(|job| checkpoint || !job.is_checkpoint_required()) - { - // The command is ready to finish. We can now call `pre_finish`. - job.pre_finish().await?; - job.notify_finished(); - } - Ok(!self.finished_jobs.is_empty()) - } - - fn cancel_command(&mut self, id: TableId) { - let _ = self.progress_map.remove(&id); - self.finished_jobs - .retain(|x| x.table_to_create() != Some(id)); - self.actor_map.retain(|_, table_id| *table_id != id); - } -} - impl CheckpointControl { /// Update the metrics of barrier nums. fn update_barrier_nums_metrics(&self) { diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 53ba3f511b64..5c1e701e6fc8 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -249,13 +249,13 @@ pub(super) struct TrackingCommand { /// 4. With `actor_map` we can use an actor's `ActorId` to find the ID of the `StreamJob`. pub(super) struct CreateMviewProgressTracker { /// Progress of the create-mview DDL indicated by the TableId. - pub(super) progress_map: HashMap, + progress_map: HashMap, /// Find the epoch of the create-mview DDL by the actor containing the backfill executors. 
- pub(super) actor_map: HashMap, + actor_map: HashMap, /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) - pub(super) finished_jobs: Vec, + finished_jobs: Vec, } impl CreateMviewProgressTracker { @@ -342,6 +342,35 @@ impl CreateMviewProgressTracker { .collect() } + /// Stash a command to finish later. + pub(super) fn stash_command_to_finish(&mut self, finished_job: TrackingJob) { + self.finished_jobs.push(finished_job); + } + + /// Finish stashed jobs. + /// If checkpoint, means all jobs can be finished. + /// If not checkpoint, jobs which do not require checkpoint can be finished. + /// + /// Returns whether there are still remaining stashed jobs to finish. + pub(super) async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult { + for job in self + .finished_jobs + .extract_if(|job| checkpoint || !job.is_checkpoint_required()) + { + // The command is ready to finish. We can now call `pre_finish`. + job.pre_finish().await?; + job.notify_finished(); + } + Ok(!self.finished_jobs.is_empty()) + } + + pub(super) fn cancel_command(&mut self, id: TableId) { + let _ = self.progress_map.remove(&id); + self.finished_jobs + .retain(|x| x.table_to_create() != Some(id)); + self.actor_map.retain(|_, table_id| *table_id != id); + } + /// Add a new create-mview DDL command to track. /// /// If the actors to track is empty, return the given command as it can be finished immediately. From b02a7b447fa137a1383c2da278bb54f7b6eadd01 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 19 Feb 2024 14:24:08 +0800 Subject: [PATCH 05/15] minor --- src/meta/src/barrier/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 9b3fa3a4d65b..7e0726fa99fc 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -206,9 +206,7 @@ impl CheckpointControl { context, } } -} -impl CheckpointControl { /// Update the metrics of barrier nums. fn update_barrier_nums_metrics(&self) { self.context.metrics.in_flight_barrier_nums.set( From 77e14b84178099c2980d37875181accb04439dbe Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 23 Feb 2024 16:19:53 +0800 Subject: [PATCH 06/15] add comment --- src/meta/src/barrier/mod.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index eb905c5ad1e8..f6890b8118f6 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -215,16 +215,29 @@ impl CheckpointControl { } } + fn inflight_command_num(&self) -> usize { + self.inflight_command_ctx_queue.len() + } + + fn total_command_num(&self) -> usize { + self.inflight_command_ctx_queue.len() + + if self.completing_command.is_some() { + 1 + } else { + 0 + } + } + /// Update the metrics of barrier nums. fn update_barrier_nums_metrics(&self) { self.context .metrics .in_flight_barrier_nums - .set(self.inflight_command_ctx_queue.len() as i64); + .set(self.inflight_command_num() as i64); self.context .metrics .all_barrier_nums - .set(self.inflight_command_ctx_queue.len() as i64); + .set(self.total_command_num() as i64); } /// Enqueue a barrier command, and init its state to `InFlight`. @@ -253,7 +266,7 @@ impl CheckpointControl { /// Pause inject barrier until True. 
fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool { - let in_flight_not_full = self.inflight_command_ctx_queue.len() < in_flight_barrier_nums; + let in_flight_not_full = self.inflight_command_num() < in_flight_barrier_nums; // Whether some command requires pausing concurrent barrier. If so, it must be the last one. let should_pause = self @@ -312,6 +325,9 @@ struct InflightCommand { struct CompletingCommand { command_ctx: Arc, + // The join handle of a spawned task that completes the barrier. + // The return value indicate whether there is some create streaming job command + // that has finished but not checkpointed. If there is any, we will force checkpoint on the next barrier join_handle: JoinHandle>, } From 5cf1b1e2aabb13b313a5e71f08b8bbb71050b22f Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 23 Feb 2024 18:30:58 +0800 Subject: [PATCH 07/15] reduce diff --- src/meta/src/barrier/mod.rs | 234 +++++++++++++++++++----------------- src/meta/src/barrier/rpc.rs | 45 ++++++- 2 files changed, 167 insertions(+), 112 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index f6890b8118f6..fcfc7dcf0b05 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -26,11 +26,11 @@ use fail::fail_point; use futures::FutureExt; use itertools::Itertools; use prometheus::HistogramTimer; -use risingwave_common::bail; use risingwave_common::catalog::TableId; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY; use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; +use risingwave_common::{bail, must_match}; use risingwave_hummock_sdk::table_watermark::{ merge_multiple_new_table_watermarks, TableWatermarks, }; @@ -45,7 +45,7 @@ use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgres use risingwave_pb::stream_service::BarrierCompleteResponse; use thiserror_ext::AsReport; use tokio::sync::oneshot::{Receiver, Sender}; -use tokio::sync::{oneshot, Mutex}; +use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::{info, warn, Instrument}; @@ -55,7 +55,7 @@ use self::progress::TrackingCommand; use crate::barrier::info::InflightActorInfo; use crate::barrier::notifier::BarrierInfo; use crate::barrier::progress::CreateMviewProgressTracker; -use crate::barrier::rpc::BarrierCollectFuture; +use crate::barrier::rpc::BarrierRpcManager; use crate::barrier::state::BarrierManagerState; use crate::hummock::{CommitEpochInfo, HummockManagerRef}; use crate::manager::sink_coordination::SinkCoordinatorManager; @@ -189,15 +189,15 @@ pub struct GlobalBarrierManager { checkpoint_control: CheckpointControl, - active_streaming_nodes: ActiveStreamingWorkerNodes, + rpc_manager: BarrierRpcManager, - prev_injecting_barrier: Option>, + active_streaming_nodes: ActiveStreamingWorkerNodes, } /// Controls the concurrent execution of commands. struct CheckpointControl { /// Save the state and message of barrier in order. - inflight_command_ctx_queue: VecDeque, + command_ctx_queue: VecDeque, /// Command that has been collected but is still completing. /// The join handle of the completing future is stored. 
@@ -209,18 +209,14 @@ struct CheckpointControl { impl CheckpointControl { fn new(context: GlobalBarrierManagerContext) -> Self { Self { - inflight_command_ctx_queue: Default::default(), + command_ctx_queue: Default::default(), completing_command: None, context, } } - fn inflight_command_num(&self) -> usize { - self.inflight_command_ctx_queue.len() - } - fn total_command_num(&self) -> usize { - self.inflight_command_ctx_queue.len() + self.command_ctx_queue.len() + if self.completing_command.is_some() { 1 } else { @@ -230,10 +226,12 @@ impl CheckpointControl { /// Update the metrics of barrier nums. fn update_barrier_nums_metrics(&self) { - self.context - .metrics - .in_flight_barrier_nums - .set(self.inflight_command_num() as i64); + self.context.metrics.in_flight_barrier_nums.set( + self.command_ctx_queue + .iter() + .filter(|x| matches!(x.state, BarrierEpochState::InFlight)) + .count() as i64, + ); self.context .metrics .all_barrier_nums @@ -241,43 +239,54 @@ impl CheckpointControl { } /// Enqueue a barrier command, and init its state to `InFlight`. - fn enqueue_inflight_command( - &mut self, - command_ctx: Arc, - notifiers: Vec, - barrier_collect_future: BarrierCollectFuture, - ) { - let enqueue_time = self.context.metrics.barrier_latency.start_timer(); + fn enqueue_command(&mut self, command_ctx: Arc, notifiers: Vec) { + let timer = self.context.metrics.barrier_latency.start_timer(); - if let Some(command) = self.inflight_command_ctx_queue.back() { - assert_eq!( - command.command_ctx.curr_epoch.value(), - command_ctx.prev_epoch.value() - ); - } - - self.inflight_command_ctx_queue.push_back(InflightCommand { + self.command_ctx_queue.push_back(EpochNode { + enqueue_time: timer, + state: BarrierEpochState::InFlight, command_ctx, - barrier_collect_future, - enqueue_time, notifiers, }); } + /// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes + /// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them. + fn barrier_collected(&mut self, prev_epoch: u64, result: Vec) { + if let Some(node) = self + .command_ctx_queue + .iter_mut() + .find(|x| x.command_ctx.prev_epoch.value().0 == prev_epoch) + { + assert!(matches!(node.state, BarrierEpochState::InFlight)); + node.state = BarrierEpochState::Completed(result); + } else { + panic!( + "received barrier complete response for an unknown epoch: {}", + prev_epoch + ); + } + } + /// Pause inject barrier until True. fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool { - let in_flight_not_full = self.inflight_command_num() < in_flight_barrier_nums; + let in_flight_not_full = self + .command_ctx_queue + .iter() + .filter(|x| matches!(x.state, BarrierEpochState::InFlight)) + .count() + < in_flight_barrier_nums; // Whether some command requires pausing concurrent barrier. If so, it must be the last one. let should_pause = self - .inflight_command_ctx_queue + .command_ctx_queue .back() - .map(|command| command.command_ctx.command.should_pause_inject_barrier()) + .map(|x| x.command_ctx.command.should_pause_inject_barrier()) .unwrap_or(false); debug_assert_eq!( - self.inflight_command_ctx_queue + self.command_ctx_queue .iter() - .any(|command| command.command_ctx.command.should_pause_inject_barrier()), + .any(|x| x.command_ctx.command.should_pause_inject_barrier()), should_pause ); @@ -303,7 +312,7 @@ impl CheckpointControl { Ok(Ok(_)) => {} }; } - for command in self.inflight_command_ctx_queue.drain(..) { + for command in self.command_ctx_queue.drain(..) 
{ for notifier in command.notifiers { notifier.notify_collection_failed(err.clone()); } @@ -312,16 +321,28 @@ impl CheckpointControl { } } -struct InflightCommand { - command_ctx: Arc, - - barrier_collect_future: BarrierCollectFuture, - +/// The state and message of this barrier, a node for concurrent checkpoint. +pub struct EpochNode { + /// Timer for recording barrier latency, taken after `complete_barriers`. enqueue_time: HistogramTimer, + /// Whether this barrier is in-flight or completed. + state: BarrierEpochState, + /// Context of this command to generate barrier and do some post jobs. + command_ctx: Arc, + /// Notifiers of this barrier. notifiers: Vec, } +/// The state of barrier. +enum BarrierEpochState { + /// This barrier is current in-flight on the stream graph of compute nodes. + InFlight, + + /// This barrier is completed or failed. + Completed(Vec), +} + struct CompletingCommand { command_ctx: Arc, @@ -380,6 +401,8 @@ impl GlobalBarrierManager { let checkpoint_control = CheckpointControl::new(context.clone()); + let rpc_manager = BarrierRpcManager::new(context.clone()); + Self { enable_recovery, scheduled_barriers, @@ -388,8 +411,8 @@ impl GlobalBarrierManager { env, state: initial_invalid_state, checkpoint_control, + rpc_manager, active_streaming_nodes, - prev_injecting_barrier: None, } } @@ -570,6 +593,18 @@ impl GlobalBarrierManager { .set_checkpoint_frequency(p.checkpoint_frequency() as usize) } } + // Barrier completes. + collect_result = self.rpc_manager.next_collected_barrier() => { + match collect_result.result { + Ok(resps) => { + self.checkpoint_control.barrier_collected(collect_result.prev_epoch, resps); + }, + Err(e) => { + fail_point!("inject_barrier_err_success"); + self.failure_recovery(e).await; + } + } + } complete_result = self.checkpoint_control.next_completed_barrier() => { match complete_result { Ok((command_context, remaining)) => { @@ -632,14 +667,7 @@ impl GlobalBarrierManager { send_latency_timer.observe_duration(); - // this is to notify that the barrier has been injected so that the next - // barrier can be injected to avoid out of order barrier injection. - // TODO: can be removed when bidi-stream control in implemented. - let (inject_tx, inject_rx) = oneshot::channel(); - let prev_inject_rx = self.prev_injecting_barrier.replace(inject_rx); - let await_collect_future = - self.context - .inject_barrier(command_ctx.clone(), Some(inject_tx), prev_inject_rx); + self.rpc_manager.inject_barrier(command_ctx.clone()); // Notify about the injection. let prev_paused_reason = self.state.paused_reason(); @@ -656,15 +684,12 @@ impl GlobalBarrierManager { // Update the paused state after the barrier is injected. self.state.set_paused_reason(curr_paused_reason); // Record the in-flight barrier. - self.checkpoint_control.enqueue_inflight_command( - command_ctx.clone(), - notifiers, - await_collect_future, - ); + self.checkpoint_control + .enqueue_command(command_ctx.clone(), notifiers); } async fn failure_recovery(&mut self, err: MetaError) { - self.prev_injecting_barrier = None; + self.rpc_manager.clear(); self.checkpoint_control.clear_and_fail_all_nodes(&err).await; if self.enable_recovery { @@ -856,59 +881,46 @@ impl CheckpointControl { pub(super) fn next_completed_barrier( &mut self, ) -> impl Future, bool)>> + '_ { + if self.completing_command.is_none() { + // If there is no completing barrier, try to start completing the earliest barrier if + // it has been collected. + if let Some(EpochNode { + state: BarrierEpochState::Completed(_), + .. 
+ }) = self.command_ctx_queue.front_mut() + { + let node = self.command_ctx_queue.pop_front().expect("non-empty"); + let resps = must_match!(node.state, BarrierEpochState::Completed(resps) => resps); + let join_handle = tokio::spawn(self.context.clone().complete_barrier( + node.command_ctx.clone(), + resps, + node.notifiers, + node.enqueue_time, + )); + let _ = self.completing_command.insert(CompletingCommand { + command_ctx: node.command_ctx, + join_handle, + }); + } + } + poll_fn(|cx| { - let completing_command = match &mut self.completing_command { - Some(command) => command, - None => { - // If there is no completing barrier, try to start completing the earliest barrier if - // it has been collected. - if let Some(inflight_command) = self.inflight_command_ctx_queue.front_mut() { - // If the earliest barrier has been collected (i.e. return Poll::Ready), start completing - // the barrier is the collection is ok. When it is still pending, the current poll cannot - // make any progress and will return Poll::Pending. - let BarrierCollectResult { prev_epoch, result } = - ready!(inflight_command.barrier_collect_future.poll_unpin(cx)); - let resps = match result { - Ok(resps) => resps, - Err(err) => { - // FIXME: If it is a connector source error occurred in the init barrier, we should pass - // back to frontend - fail_point!("inject_barrier_err_success"); - warn!(%prev_epoch, error = %err.as_report(), "Failed to collect epoch"); - return Poll::Ready(Err(err)); - } - }; - // We only pop the command on successful collect. On error, no need to pop the node out, - // because the error will trigger an error immediately after return, which clears the failed node. - let earliest_inflight_command = - self.inflight_command_ctx_queue.pop_front().unwrap(); - let join_handle = tokio::spawn(self.context.clone().complete_barrier( - earliest_inflight_command.command_ctx.clone(), - resps, - earliest_inflight_command.notifiers, - earliest_inflight_command.enqueue_time, - )); - self.completing_command.insert(CompletingCommand { - command_ctx: earliest_inflight_command.command_ctx, - join_handle, - }) - } else { - return Poll::Pending; - } - } - }; - let join_result = ready!(completing_command.join_handle.poll_unpin(cx)) - .map_err(|e| { - anyhow!("failed to join completing command: {:?}", e.as_report()).into() - }) - .and_then(|result| result); - let completed_command = self.completing_command.take().expect("non-empty"); - let result = join_result.map(|has_remaining| { - let command_ctx = completed_command.command_ctx.clone(); - (command_ctx, has_remaining) - }); - - Poll::Ready(result) + if let Some(completing_command) = &mut self.completing_command { + let join_result = ready!(completing_command.join_handle.poll_unpin(cx)) + .map_err(|e| { + anyhow!("failed to join completing command: {:?}", e.as_report()).into() + }) + .and_then(|result| result); + let completed_command = self.completing_command.take().expect("non-empty"); + let result = join_result.map(|has_remaining| { + let command_ctx = completed_command.command_ctx.clone(); + (command_ctx, has_remaining) + }); + + Poll::Ready(result) + } else { + Poll::Pending + } }) } } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index b258cbec5952..c4c8e40b1f4e 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -19,7 +19,8 @@ use std::sync::Arc; use anyhow::anyhow; use fail::fail_point; use futures::future::try_join_all; -use futures::FutureExt; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, 
StreamExt}; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::hash::ActorId; @@ -32,6 +33,7 @@ use risingwave_pb::stream_service::{ }; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::StreamClient; +use rw_futures_util::pending_on_none; use tokio::sync::oneshot; use tracing::Instrument; use uuid::Uuid; @@ -41,6 +43,47 @@ use super::{BarrierCollectResult, GlobalBarrierManagerContext}; use crate::manager::{MetaSrvEnv, WorkerId}; use crate::MetaResult; +pub(super) struct BarrierRpcManager { + context: GlobalBarrierManagerContext, + + /// Futures that await on the completion of barrier. + injected_in_progress_barrier: FuturesUnordered, + + prev_injecting_barrier: Option>, +} + +impl BarrierRpcManager { + pub(super) fn new(context: GlobalBarrierManagerContext) -> Self { + Self { + context, + injected_in_progress_barrier: FuturesUnordered::new(), + prev_injecting_barrier: None, + } + } + + pub(super) fn clear(&mut self) { + self.injected_in_progress_barrier = FuturesUnordered::new(); + self.prev_injecting_barrier = None; + } + + pub(super) fn inject_barrier(&mut self, command_context: Arc) { + // this is to notify that the barrier has been injected so that the next + // barrier can be injected to avoid out of order barrier injection. + // TODO: can be removed when bidi-stream control in implemented. + let (inject_tx, inject_rx) = oneshot::channel(); + let prev_inject_rx = self.prev_injecting_barrier.replace(inject_rx); + let await_complete_future = + self.context + .inject_barrier(command_context, Some(inject_tx), prev_inject_rx); + self.injected_in_progress_barrier + .push(await_complete_future); + } + + pub(super) async fn next_collected_barrier(&mut self) -> BarrierCollectResult { + pending_on_none(self.injected_in_progress_barrier.next()).await + } +} + pub(super) type BarrierCollectFuture = impl Future + Send + 'static; impl GlobalBarrierManagerContext { From a00a500e2303a03d7ad6cfbb9b9dc8f4b7a88610 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 26 Feb 2024 15:45:06 +0800 Subject: [PATCH 08/15] continue to complete collected barrier in recovery --- src/meta/src/barrier/mod.rs | 163 ++++++++++++++++++++++-------------- 1 file changed, 102 insertions(+), 61 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index fcfc7dcf0b05..80770224168c 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -16,6 +16,7 @@ use std::assert_matches::assert_matches; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet, VecDeque}; use std::future::{poll_fn, Future}; +use std::mem::replace; use std::sync::Arc; use std::task::{ready, Poll}; use std::time::Duration; @@ -47,7 +48,7 @@ use thiserror_ext::AsReport; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::Mutex; use tokio::task::JoinHandle; -use tracing::{info, warn, Instrument}; +use tracing::{error, info, warn, Instrument}; use self::command::CommandContext; use self::notifier::Notifier; @@ -201,7 +202,7 @@ struct CheckpointControl { /// Command that has been collected but is still completing. /// The join handle of the completing future is stored. 
- completing_command: Option, + completing_command: CompletingCommand, context: GlobalBarrierManagerContext, } @@ -210,17 +211,16 @@ impl CheckpointControl { fn new(context: GlobalBarrierManagerContext) -> Self { Self { command_ctx_queue: Default::default(), - completing_command: None, + completing_command: CompletingCommand::None, context, } } fn total_command_num(&self) -> usize { self.command_ctx_queue.len() - + if self.completing_command.is_some() { - 1 - } else { - 0 + + match &self.completing_command { + CompletingCommand::Completing { .. } => 1, + _ => 0, } } @@ -259,7 +259,7 @@ impl CheckpointControl { .find(|x| x.command_ctx.prev_epoch.value().0 == prev_epoch) { assert!(matches!(node.state, BarrierEpochState::InFlight)); - node.state = BarrierEpochState::Completed(result); + node.state = BarrierEpochState::Collected(result); } else { panic!( "received barrier complete response for an unknown epoch: {}", @@ -294,29 +294,63 @@ impl CheckpointControl { } /// We need to make sure there are no changes when doing recovery - pub async fn clear_and_fail_all_nodes(&mut self, err: &MetaError) { + pub async fn clear_on_err(&mut self, err: &MetaError) { // join spawned completing command to finish no matter it succeeds or not. - if let Some(command) = self.completing_command.take() { - info!( - prev_epoch = ?command.command_ctx.prev_epoch, - curr_epoch = ?command.command_ctx.curr_epoch, - "waiting for completing command to finish in recovery" - ); - match command.join_handle.await { - Err(e) => { - warn!(err = ?e.as_report(), "failed to join completing task"); - } - Ok(Err(e)) => { - warn!(err = ?e.as_report(), "failed to complete barrier during clear"); + let is_err = match replace(&mut self.completing_command, CompletingCommand::None) { + CompletingCommand::None => false, + CompletingCommand::Completing { + command_ctx, + join_handle, + } => { + info!( + prev_epoch = ?command_ctx.prev_epoch, + curr_epoch = ?command_ctx.curr_epoch, + "waiting for completing command to finish in recovery" + ); + match join_handle.await { + Err(e) => { + warn!(err = ?e.as_report(), "failed to join completing task"); + } + Ok(Err(e)) => { + warn!(err = ?e.as_report(), "failed to complete barrier during clear"); + } + Ok(Ok(_)) => {} + }; + false + } + CompletingCommand::Err(_) => true, + }; + if !is_err { + // continue to finish the pending collected barrier. + while let Some(EpochNode { + state: BarrierEpochState::Collected(_), + .. + }) = self.command_ctx_queue.front() + { + let node = self.command_ctx_queue.pop_front().expect("non-empty"); + let command_ctx = node.command_ctx.clone(); + if let Err(e) = self.context.clone().complete_barrier(node).await { + error!( + prev_epoch = ?command_ctx.prev_epoch, + curr_epoch = ?command_ctx.curr_epoch, + err = ?e.as_report(), + "failed to complete barrier during recovery" + ); + break; + } else { + info!( + prev_epoch = ?command_ctx.prev_epoch, + curr_epoch = ?command_ctx.curr_epoch, + "succeed to complete barrier during recovery" + ) } - Ok(Ok(_)) => {} - }; + } } - for command in self.command_ctx_queue.drain(..) { - for notifier in command.notifiers { + for node in self.command_ctx_queue.drain(..) { + for notifier in node.notifiers { notifier.notify_collection_failed(err.clone()); } - command.enqueue_time.observe_duration(); + node.enqueue_time.observe_duration(); } } } @@ -339,17 +373,21 @@ enum BarrierEpochState { /// This barrier is current in-flight on the stream graph of compute nodes. InFlight, - /// This barrier is completed or failed. 
- Completed(Vec), + /// This barrier is collected. + Collected(Vec), } -struct CompletingCommand { - command_ctx: Arc, +enum CompletingCommand { + None, + Completing { + command_ctx: Arc, - // The join handle of a spawned task that completes the barrier. - // The return value indicate whether there is some create streaming job command - // that has finished but not checkpointed. If there is any, we will force checkpoint on the next barrier - join_handle: JoinHandle>, + // The join handle of a spawned task that completes the barrier. + // The return value indicate whether there is some create streaming job command + // that has finished but not checkpointed. If there is any, we will force checkpoint on the next barrier + join_handle: JoinHandle>, + }, + Err(MetaError), } /// The result of barrier collect. @@ -690,7 +728,7 @@ impl GlobalBarrierManager { async fn failure_recovery(&mut self, err: MetaError) { self.rpc_manager.clear(); - self.checkpoint_control.clear_and_fail_all_nodes(&err).await; + self.checkpoint_control.clear_on_err(&err).await; if self.enable_recovery { self.context @@ -717,13 +755,15 @@ impl GlobalBarrierManager { impl GlobalBarrierManagerContext { /// Try to commit this node. If err, returns - async fn complete_barrier( - self, - command_ctx: Arc, - resps: Vec, - mut notifiers: Vec, - enqueue_time: HistogramTimer, - ) -> MetaResult { + async fn complete_barrier(self, node: EpochNode) -> MetaResult { + let EpochNode { + command_ctx, + mut notifiers, + enqueue_time, + state, + .. + } = node; + let resps = must_match!(state, BarrierEpochState::Collected(resps) => resps); let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer(); let (commit_info, create_mview_progress) = collect_commit_epoch_info(resps); if let Err(e) = self.update_snapshot(&command_ctx, commit_info).await { @@ -881,41 +921,42 @@ impl CheckpointControl { pub(super) fn next_completed_barrier( &mut self, ) -> impl Future, bool)>> + '_ { - if self.completing_command.is_none() { + if matches!(&self.completing_command, CompletingCommand::None) { // If there is no completing barrier, try to start completing the earliest barrier if // it has been collected. if let Some(EpochNode { - state: BarrierEpochState::Completed(_), + state: BarrierEpochState::Collected(_), .. 
}) = self.command_ctx_queue.front_mut() { let node = self.command_ctx_queue.pop_front().expect("non-empty"); - let resps = must_match!(node.state, BarrierEpochState::Completed(resps) => resps); - let join_handle = tokio::spawn(self.context.clone().complete_barrier( - node.command_ctx.clone(), - resps, - node.notifiers, - node.enqueue_time, - )); - let _ = self.completing_command.insert(CompletingCommand { - command_ctx: node.command_ctx, + let command_ctx = node.command_ctx.clone(); + let join_handle = tokio::spawn(self.context.clone().complete_barrier(node)); + self.completing_command = CompletingCommand::Completing { + command_ctx, join_handle, - }); + }; } } poll_fn(|cx| { - if let Some(completing_command) = &mut self.completing_command { - let join_result = ready!(completing_command.join_handle.poll_unpin(cx)) + if let CompletingCommand::Completing { + join_handle, + command_ctx, + } = &mut self.completing_command + { + let join_result = ready!(join_handle.poll_unpin(cx)) .map_err(|e| { anyhow!("failed to join completing command: {:?}", e.as_report()).into() }) .and_then(|result| result); - let completed_command = self.completing_command.take().expect("non-empty"); - let result = join_result.map(|has_remaining| { - let command_ctx = completed_command.command_ctx.clone(); - (command_ctx, has_remaining) - }); + let command_ctx = command_ctx.clone(); + if let Err(e) = &join_result { + self.completing_command = CompletingCommand::Err(e.clone()); + } else { + self.completing_command = CompletingCommand::None; + } + let result = join_result.map(move |has_remaining| (command_ctx, has_remaining)); Poll::Ready(result) } else { From 81d85e3c1bfa297c5eb8d4aa21fc7bd64418347f Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 26 Feb 2024 16:34:07 +0800 Subject: [PATCH 09/15] use btreemap as queue --- src/meta/src/barrier/mod.rs | 76 ++++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 30 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 80770224168c..1992f9e05f1d 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -14,9 +14,9 @@ use std::assert_matches::assert_matches; use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::future::{poll_fn, Future}; -use std::mem::replace; +use std::mem::{replace, take}; use std::sync::Arc; use std::task::{ready, Poll}; use std::time::Duration; @@ -198,7 +198,7 @@ pub struct GlobalBarrierManager { /// Controls the concurrent execution of commands. struct CheckpointControl { /// Save the state and message of barrier in order. - command_ctx_queue: VecDeque, + command_ctx_queue: BTreeMap, /// Command that has been collected but is still completing. /// The join handle of the completing future is stored. 
@@ -228,7 +228,7 @@ impl CheckpointControl { fn update_barrier_nums_metrics(&self) { self.context.metrics.in_flight_barrier_nums.set( self.command_ctx_queue - .iter() + .values() .filter(|x| matches!(x.state, BarrierEpochState::InFlight)) .count() as i64, ); @@ -242,22 +242,27 @@ impl CheckpointControl { fn enqueue_command(&mut self, command_ctx: Arc, notifiers: Vec) { let timer = self.context.metrics.barrier_latency.start_timer(); - self.command_ctx_queue.push_back(EpochNode { - enqueue_time: timer, - state: BarrierEpochState::InFlight, - command_ctx, - notifiers, - }); + if let Some((_, node)) = self.command_ctx_queue.last_key_value() { + assert_eq!( + command_ctx.prev_epoch.value(), + node.command_ctx.curr_epoch.value() + ); + } + self.command_ctx_queue.insert( + command_ctx.prev_epoch.value().0, + EpochNode { + enqueue_time: timer, + state: BarrierEpochState::InFlight, + command_ctx, + notifiers, + }, + ); } /// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes /// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them. fn barrier_collected(&mut self, prev_epoch: u64, result: Vec) { - if let Some(node) = self - .command_ctx_queue - .iter_mut() - .find(|x| x.command_ctx.prev_epoch.value().0 == prev_epoch) - { + if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) { assert!(matches!(node.state, BarrierEpochState::InFlight)); node.state = BarrierEpochState::Collected(result); } else { @@ -272,7 +277,7 @@ impl CheckpointControl { fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool { let in_flight_not_full = self .command_ctx_queue - .iter() + .values() .filter(|x| matches!(x.state, BarrierEpochState::InFlight)) .count() < in_flight_barrier_nums; @@ -280,12 +285,17 @@ impl CheckpointControl { // Whether some command requires pausing concurrent barrier. If so, it must be the last one. let should_pause = self .command_ctx_queue - .back() - .map(|x| x.command_ctx.command.should_pause_inject_barrier()) + .last_key_value() + .map(|(_, x)| &x.command_ctx) + .or(match &self.completing_command { + CompletingCommand::None | CompletingCommand::Err(_) => None, + CompletingCommand::Completing { command_ctx, .. } => Some(command_ctx), + }) + .map(|command_ctx| command_ctx.command.should_pause_inject_barrier()) .unwrap_or(false); debug_assert_eq!( self.command_ctx_queue - .iter() + .values() .any(|x| x.command_ctx.command.should_pause_inject_barrier()), should_pause ); @@ -322,12 +332,15 @@ impl CheckpointControl { }; if !is_err { // continue to finish the pending collected barrier. - while let Some(EpochNode { - state: BarrierEpochState::Collected(_), - .. - }) = self.command_ctx_queue.front() + while let Some(( + _, + EpochNode { + state: BarrierEpochState::Collected(_), + .. + }, + )) = self.command_ctx_queue.first_key_value() { - let node = self.command_ctx_queue.pop_front().expect("non-empty"); + let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); let command_ctx = node.command_ctx.clone(); if let Err(e) = self.context.clone().complete_barrier(node).await { error!( @@ -346,7 +359,7 @@ impl CheckpointControl { } } } - for node in self.command_ctx_queue.drain(..) 
{ + for (_, node) in take(&mut self.command_ctx_queue) { for notifier in node.notifiers { notifier.notify_collection_failed(err.clone()); } @@ -924,12 +937,15 @@ impl CheckpointControl { if matches!(&self.completing_command, CompletingCommand::None) { // If there is no completing barrier, try to start completing the earliest barrier if // it has been collected. - if let Some(EpochNode { - state: BarrierEpochState::Collected(_), - .. - }) = self.command_ctx_queue.front_mut() + if let Some(( + _, + EpochNode { + state: BarrierEpochState::Collected(_), + .. + }, + )) = self.command_ctx_queue.first_key_value() { - let node = self.command_ctx_queue.pop_front().expect("non-empty"); + let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty"); let command_ctx = node.command_ctx.clone(); let join_handle = tokio::spawn(self.context.clone().complete_barrier(node)); self.completing_command = CompletingCommand::Completing { From b4fc60a1ed44c1a41a940d9b7bd8f26f44d0a276 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 26 Feb 2024 16:35:41 +0800 Subject: [PATCH 10/15] increase timeout --- ci/workflows/pull-request.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index a67f915d943c..6e412d511766 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -537,7 +537,7 @@ steps: # - test-collector#v1.0.0: # files: "*-junit.xml" # format: "junit" - timeout_in_minutes: 25 + timeout_in_minutes: 30 cancel_on_build_failing: true retry: *auto-retry From b501998492aebe285ce855e127efd2b9bb6d9604 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 26 Feb 2024 17:08:56 +0800 Subject: [PATCH 11/15] fix assert --- src/meta/src/barrier/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 1992f9e05f1d..8befcebcfeb0 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -296,6 +296,14 @@ impl CheckpointControl { debug_assert_eq!( self.command_ctx_queue .values() + .map(|node| &node.command_ctx) + .chain( + match &self.completing_command { + CompletingCommand::None | CompletingCommand::Err(_) => None, + CompletingCommand::Completing { command_ctx, .. } => Some(command_ctx), + } + .iter() + ) .any(|x| x.command_ctx.command.should_pause_inject_barrier()), should_pause ); From 78aa1558cab0f638fc64c0623a6ff404d1f7b00c Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 26 Feb 2024 17:12:17 +0800 Subject: [PATCH 12/15] fix assert --- src/meta/src/barrier/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 8befcebcfeb0..d9494e538cb2 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -302,9 +302,9 @@ impl CheckpointControl { CompletingCommand::None | CompletingCommand::Err(_) => None, CompletingCommand::Completing { command_ctx, .. 
} => Some(command_ctx), } - .iter() + .into_iter() ) - .any(|x| x.command_ctx.command.should_pause_inject_barrier()), + .any(|command_ctx| command_ctx.command.should_pause_inject_barrier()), should_pause ); From 34db7885e9f04a3d1821de58960675214ab55e83 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 27 Feb 2024 14:16:40 +0800 Subject: [PATCH 13/15] reduce kill rate --- ci/workflows/pull-request.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 6e412d511766..caf90218a6ec 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -518,7 +518,7 @@ steps: retry: *auto-retry - label: "recovery test (deterministic simulation)" - command: "TEST_NUM=8 KILL_RATE=0.5 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh" + command: "TEST_NUM=8 KILL_RATE=0.4 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh" if: | !(build.pull_request.labels includes "ci/skip-ci") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation" @@ -537,7 +537,7 @@ steps: # - test-collector#v1.0.0: # files: "*-junit.xml" # format: "junit" - timeout_in_minutes: 30 + timeout_in_minutes: 25 cancel_on_build_failing: true retry: *auto-retry From f97e6a91ccd6a0caf7d47fd8224836f321af4f49 Mon Sep 17 00:00:00 2001 From: William Wen Date: Sun, 3 Mar 2024 16:02:24 +0800 Subject: [PATCH 14/15] fix comment --- src/meta/src/barrier/mod.rs | 71 ++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 36 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index e71e559070d1..d37bdb021a65 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -15,16 +15,14 @@ use std::assert_matches::assert_matches; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; -use std::future::{poll_fn, Future}; +use std::future::pending; use std::mem::{replace, take}; use std::sync::Arc; -use std::task::{ready, Poll}; use std::time::Duration; use anyhow::anyhow; use arc_swap::ArcSwap; use fail::fail_point; -use futures::FutureExt; use itertools::Itertools; use prometheus::HistogramTimer; use risingwave_common::catalog::TableId; @@ -198,6 +196,7 @@ pub struct GlobalBarrierManager { /// Controls the concurrent execution of commands. struct CheckpointControl { /// Save the state and message of barrier in order. + /// Key is the prev_epoch. command_ctx_queue: BTreeMap, /// Command that has been collected but is still completing. @@ -406,7 +405,7 @@ enum CompletingCommand { // The join handle of a spawned task that completes the barrier. // The return value indicate whether there is some create streaming job command // that has finished but not checkpointed. If there is any, we will force checkpoint on the next barrier - join_handle: JoinHandle>, + join_handle: JoinHandle>, }, Err(MetaError), } @@ -666,11 +665,11 @@ impl GlobalBarrierManager { } complete_result = self.checkpoint_control.next_completed_barrier() => { match complete_result { - Ok((command_context, remaining)) => { + Ok(output) => { // If there are remaining commands (that requires checkpoint to finish), we force // the next barrier to be a checkpoint. 
- if remaining { - assert_matches!(command_context.kind, BarrierKind::Barrier); + if output.require_next_checkpoint { + assert_matches!(output.command_ctx.kind, BarrierKind::Barrier); self.scheduled_barriers.force_checkpoint_in_next_barrier(); } } @@ -778,7 +777,7 @@ impl GlobalBarrierManager { impl GlobalBarrierManagerContext { /// Try to commit this node. If err, returns - async fn complete_barrier(self, node: EpochNode) -> MetaResult { + async fn complete_barrier(self, node: EpochNode) -> MetaResult { let EpochNode { command_ctx, mut notifiers, @@ -798,13 +797,16 @@ impl GlobalBarrierManagerContext { notifiers.iter_mut().for_each(|notifier| { notifier.notify_collected(); }); - let result = self + let has_remaining = self .update_tracking_jobs(notifiers, command_ctx.clone(), create_mview_progress) - .await; + .await?; let duration_sec = enqueue_time.stop_and_record(); self.report_complete_event(duration_sec, &command_ctx); wait_commit_timer.observe_duration(); - result + Ok(BarrierCompleteOutput { + command_ctx, + require_next_checkpoint: has_remaining, + }) } async fn update_snapshot( @@ -940,10 +942,13 @@ impl GlobalBarrierManagerContext { } } +struct BarrierCompleteOutput { + command_ctx: Arc, + require_next_checkpoint: bool, +} + impl CheckpointControl { - pub(super) fn next_completed_barrier( - &mut self, - ) -> impl Future, bool)>> + '_ { + pub(super) async fn next_completed_barrier(&mut self) -> MetaResult { if matches!(&self.completing_command, CompletingCommand::None) { // If there is no completing barrier, try to start completing the earliest barrier if // it has been collected. @@ -965,30 +970,24 @@ impl CheckpointControl { } } - poll_fn(|cx| { - if let CompletingCommand::Completing { - join_handle, - command_ctx, - } = &mut self.completing_command - { - let join_result = ready!(join_handle.poll_unpin(cx)) - .map_err(|e| { - anyhow!("failed to join completing command: {:?}", e.as_report()).into() - }) - .and_then(|result| result); - let command_ctx = command_ctx.clone(); - if let Err(e) = &join_result { - self.completing_command = CompletingCommand::Err(e.clone()); - } else { - self.completing_command = CompletingCommand::None; - } - let result = join_result.map(move |has_remaining| (command_ctx, has_remaining)); - - Poll::Ready(result) + if let CompletingCommand::Completing { join_handle, .. } = &mut self.completing_command { + let join_result = join_handle + .await + .map_err(|e| { + anyhow!("failed to join completing command: {:?}", e.as_report()).into() + }) + .and_then(|result| result); + // It's important to reset the completing_command after await no matter the result is err + // or not, and otherwise the join handle will be polled again after ready. 
+ if let Err(e) = &join_result { + self.completing_command = CompletingCommand::Err(e.clone()); } else { - Poll::Pending + self.completing_command = CompletingCommand::None; } - }) + join_result + } else { + pending().await + } } } From 66f916ef3ec0475cf0ea2ccd2f4886499526b0a0 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 4 Mar 2024 14:08:07 +0800 Subject: [PATCH 15/15] break on join err --- src/meta/src/barrier/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 22ce3f628b29..a59eb00c723e 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -327,13 +327,14 @@ impl CheckpointControl { match join_handle.await { Err(e) => { warn!(err = ?e.as_report(), "failed to join completing task"); + true } Ok(Err(e)) => { warn!(err = ?e.as_report(), "failed to complete barrier during clear"); + true } - Ok(Ok(_)) => {} - }; - false + Ok(Ok(_)) => false, + } } CompletingCommand::Err(_) => true, };
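
For readers following the series, here is a minimal self-contained sketch of the collection pattern used by `BarrierRpcManager` above: every injected barrier becomes a future in a `FuturesUnordered`, injection order is preserved by chaining a oneshot channel from each barrier to the next, and an empty set parks the caller instead of yielding, so the corresponding `select!` arm stays quiet when nothing is in flight. All names (`RpcManagerSketch`, `CollectResult`, the simulated collection) are illustrative stand-ins rather than the real meta-service types, and it assumes only the `tokio` and `futures` crates.

// Sketch only (assumed deps: `tokio` with rt/macros/sync, `futures`);
// stand-in names, not the real meta-service types.
use std::future::pending;

use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use tokio::sync::oneshot;

/// Stand-in for `BarrierCollectResult`.
struct CollectResult {
    prev_epoch: u64,
}

struct RpcManagerSketch {
    /// One future per injected barrier, resolving when that barrier is collected.
    in_progress: FuturesUnordered<BoxFuture<'static, CollectResult>>,
    /// Signalled once the previous barrier has been injected, so injections
    /// stay in order (the chained-oneshot trick from the patch).
    prev_injected: Option<oneshot::Receiver<()>>,
}

impl RpcManagerSketch {
    fn new() -> Self {
        Self {
            in_progress: FuturesUnordered::new(),
            prev_injected: None,
        }
    }

    fn inject_barrier(&mut self, prev_epoch: u64) {
        let (inject_tx, inject_rx) = oneshot::channel();
        let prev_rx = self.prev_injected.replace(inject_rx);
        self.in_progress.push(
            async move {
                // Wait until the previous barrier has been injected...
                if let Some(prev_rx) = prev_rx {
                    let _ = prev_rx.await;
                }
                // ...then "inject" this one and let the next barrier proceed.
                let _ = inject_tx.send(());
                // Simulated collection from the compute nodes.
                CollectResult { prev_epoch }
            }
            .boxed(),
        );
    }

    /// Next collected barrier; pends forever when nothing is in flight, so the
    /// corresponding `select!` arm never fires spuriously.
    async fn next_collected_barrier(&mut self) -> CollectResult {
        match self.in_progress.next().await {
            Some(result) => result,
            None => pending().await,
        }
    }
}

#[tokio::main]
async fn main() {
    let mut manager = RpcManagerSketch::new();
    manager.inject_barrier(1);
    manager.inject_barrier(2);
    let first = manager.next_collected_barrier().await;
    let second = manager.next_collected_barrier().await;
    assert_eq!((first.prev_epoch, second.prev_epoch), (1, 2));
    println!("collected barriers in order");
}

The park-on-empty behavior mirrors what `pending_on_none` from `rw_futures_util` provides in the actual code; without it, `FuturesUnordered::next` would keep returning `None` as soon as the set is empty and the main loop would have to special-case that.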
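
The `BTreeMap` queue introduced in PATCH 09 is easiest to reason about through the invariant it enforces: entries are keyed by `prev_epoch`, a collect response may be recorded against any in-flight epoch, but completion only ever pops the smallest key, and only once that entry has reached the `Collected` state. Below is a small sketch of just that invariant, with `QueueSketch`, `EpochState` and a `Vec<String>` payload standing in for `CheckpointControl`, `BarrierEpochState` and `Vec<BarrierCompleteResponse>`.

// Sketch only: illustrates the in-order completion invariant behind the
// `BTreeMap` queue; all types here are stand-ins for the real ones.
use std::collections::BTreeMap;

enum EpochState {
    /// Barrier injected, still flowing through the compute nodes.
    InFlight,
    /// All collect responses received; ready to be completed (committed).
    Collected(Vec<String>),
}

#[derive(Default)]
struct QueueSketch {
    /// Keyed by `prev_epoch`: ordered iteration for completion, keyed lookup
    /// when a collect response arrives for an arbitrary in-flight epoch.
    queue: BTreeMap<u64, EpochState>,
}

impl QueueSketch {
    fn enqueue(&mut self, prev_epoch: u64) {
        // Barriers are injected (and therefore enqueued) in increasing epoch order.
        if let Some((&last, _)) = self.queue.last_key_value() {
            assert!(prev_epoch > last);
        }
        self.queue.insert(prev_epoch, EpochState::InFlight);
    }

    /// Collect responses may be observed out of order across epochs.
    fn on_collected(&mut self, prev_epoch: u64, resps: Vec<String>) {
        let state = self
            .queue
            .get_mut(&prev_epoch)
            .expect("collected an unknown epoch");
        assert!(matches!(state, EpochState::InFlight));
        *state = EpochState::Collected(resps);
    }

    /// Completion only ever pops the smallest epoch, and only once it has
    /// actually been collected, so epochs are committed strictly in order.
    fn next_to_complete(&mut self) -> Option<(u64, Vec<String>)> {
        if let Some((_, EpochState::Collected(_))) = self.queue.first_key_value() {
            let (epoch, state) = self.queue.pop_first().expect("non-empty");
            if let EpochState::Collected(resps) = state {
                return Some((epoch, resps));
            }
            unreachable!("state was checked above");
        }
        None
    }
}

fn main() {
    let mut queue = QueueSketch::default();
    queue.enqueue(1);
    queue.enqueue(2);
    // Epoch 2 is collected first, but must not be completed before epoch 1.
    queue.on_collected(2, vec!["cn-1".into()]);
    assert!(queue.next_to_complete().is_none());
    queue.on_collected(1, vec!["cn-1".into()]);
    assert_eq!(queue.next_to_complete().map(|(e, _)| e), Some(1));
    assert_eq!(queue.next_to_complete().map(|(e, _)| e), Some(2));
    println!("completed epochs in order");
}

Keeping the queue ordered by epoch is what allows the main loop to hand the popped node to a spawned `complete_barrier` task (or, during recovery, to finish pending collected barriers inline in `clear_on_err`) without ever committing epochs out of order.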
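
Finally, a sketch of the completing-command handling that PATCH 14 settles on: the join handle of the spawned completion task lives in the checkpoint-control state rather than in the returned future, and that state is reset as soon as the handle resolves, whether it succeeded or failed. The types below (`Completing`, `ControlSketch`) are stand-ins, and `anyhow::Error` replaces `MetaError` (kept as a `String` in the `Err` variant, since `anyhow::Error` is not `Clone`); it assumes the `tokio` and `anyhow` crates.

// Sketch only: the "completing command" pattern, with stand-in types.
use anyhow::anyhow;
use tokio::task::JoinHandle;

enum Completing {
    None,
    /// A spawned completion task; the `bool` says whether the next barrier
    /// should be forced to a checkpoint.
    Completing(JoinHandle<anyhow::Result<bool>>),
    Err(String),
}

struct ControlSketch {
    completing: Completing,
}

impl ControlSketch {
    /// Await the in-flight completion, if any. Because the `JoinHandle` lives in
    /// `self`, dropping this future (e.g. when another `select!` arm wins) is
    /// harmless: the next call simply resumes waiting on the same handle.
    async fn next_completed(&mut self) -> anyhow::Result<bool> {
        if let Completing::Completing(join_handle) = &mut self.completing {
            let result = join_handle
                .await
                .map_err(|e| anyhow!("failed to join completing task: {e}"))
                .and_then(|r| r);
            // Reset the state before returning, whatever the outcome: a finished
            // `JoinHandle` must never be polled again.
            self.completing = match &result {
                Ok(_) => Completing::None,
                Err(e) => Completing::Err(e.to_string()),
            };
            result
        } else {
            std::future::pending().await
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut control = ControlSketch {
        completing: Completing::Completing(tokio::spawn(async { anyhow::Ok(true) })),
    };
    let force_checkpoint = control.next_completed().await?;
    assert!(force_checkpoint);
    assert!(matches!(control.completing, Completing::None));
    println!("completion joined and state reset");
    Ok(())
}

Recording the failure in `Completing::Err` instead of silently falling back to `None` is what lets `clear_on_err` tell a failed completion (stop and fail the remaining barriers) apart from a clean state (keep finishing already-collected barriers) during recovery.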