diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml
index ab96ae00af39..a8c143f768a8 100644
--- a/ci/workflows/pull-request.yml
+++ b/ci/workflows/pull-request.yml
@@ -519,7 +519,7 @@ steps:
     retry: *auto-retry

   - label: "recovery test (deterministic simulation)"
-    command: "TEST_NUM=8 KILL_RATE=0.5 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh"
+    command: "TEST_NUM=8 KILL_RATE=0.4 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh"
    if: |
      !(build.pull_request.labels includes "ci/skip-ci") && build.env("CI_STEPS") == null
      || build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 466c6f259fd1..a59eb00c723e 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -14,20 +14,22 @@
 use std::assert_matches::assert_matches;
 use std::collections::hash_map::Entry;
-use std::collections::{HashMap, HashSet, VecDeque};
-use std::mem::take;
+use std::collections::{BTreeMap, HashMap, HashSet};
+use std::future::pending;
+use std::mem::{replace, take};
 use std::sync::Arc;
 use std::time::Duration;

+use anyhow::anyhow;
 use arc_swap::ArcSwap;
 use fail::fail_point;
 use itertools::Itertools;
 use prometheus::HistogramTimer;
-use risingwave_common::bail;
 use risingwave_common::catalog::TableId;
 use risingwave_common::system_param::reader::SystemParamsRead;
 use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
 use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
+use risingwave_common::{bail, must_match};
 use risingwave_hummock_sdk::table_watermark::{
     merge_multiple_new_table_watermarks, TableWatermarks,
 };
@@ -38,12 +40,13 @@ use risingwave_pb::ddl_service::DdlProgress;
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
 use risingwave_pb::meta::PausedReason;
 use risingwave_pb::stream_plan::barrier::BarrierKind;
+use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
 use risingwave_pb::stream_service::BarrierCompleteResponse;
 use thiserror_ext::AsReport;
 use tokio::sync::oneshot::{Receiver, Sender};
 use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
-use tracing::{info, warn, Instrument};
+use tracing::{error, info, warn, Instrument};

 use self::command::CommandContext;
 use self::notifier::Notifier;
@@ -53,7 +56,6 @@ use crate::barrier::notifier::BarrierInfo;
 use crate::barrier::progress::CreateMviewProgressTracker;
 use crate::barrier::rpc::BarrierRpcManager;
 use crate::barrier::state::BarrierManagerState;
-use crate::barrier::BarrierEpochState::{Completed, InFlight};
 use crate::hummock::{CommitEpochInfo, HummockManagerRef};
 use crate::manager::sink_coordination::SinkCoordinatorManager;
 use crate::manager::{
@@ -194,7 +196,12 @@ pub struct GlobalBarrierManager {

 /// Controls the concurrent execution of commands.
 struct CheckpointControl {
     /// Save the state and message of barrier in order.
-    command_ctx_queue: VecDeque<EpochNode>,
+    /// Key is the `prev_epoch`.
+    command_ctx_queue: BTreeMap<u64, EpochNode>,
+
+    /// Command that has been collected but is still completing.
+    /// The join handle of the completing future is stored.
+    completing_command: CompletingCommand,

     context: GlobalBarrierManagerContext,
 }
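Note on the hunk above: the queue moves from a `VecDeque` to a `BTreeMap` keyed by `prev_epoch`, so barriers whose responses arrive out of order can be marked in place while completion still proceeds from the earliest epoch. A minimal, self-contained sketch of that access pattern (toy `State` type and epoch values, not the real `EpochNode`):

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
enum State {
    InFlight,
    Collected,
}

fn main() {
    // Toy stand-in for `command_ctx_queue: BTreeMap<u64, EpochNode>`:
    // the key is the barrier's prev_epoch, so iteration order == epoch order.
    let mut queue: BTreeMap<u64, State> = BTreeMap::new();
    for epoch in [100, 200, 300] {
        queue.insert(epoch, State::InFlight);
    }

    // Barriers may be collected out of order...
    queue.insert(300, State::Collected);
    queue.insert(100, State::Collected);

    // ...but completion still consumes the earliest epoch first, and stops
    // at the first barrier that is still in flight.
    while let Some((_, state)) = queue.first_key_value() {
        if *state != State::Collected {
            break;
        }
        let (epoch, _) = queue.pop_first().unwrap();
        println!("completing barrier with prev_epoch {epoch}");
    }
    assert_eq!(queue.len(), 2); // 200 (in flight) blocks 300 (collected)
}
```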
@@ -203,114 +210,176 @@ impl CheckpointControl {
     fn new(context: GlobalBarrierManagerContext) -> Self {
         Self {
             command_ctx_queue: Default::default(),
+            completing_command: CompletingCommand::None,
             context,
         }
     }

+    fn total_command_num(&self) -> usize {
+        self.command_ctx_queue.len()
+            + match &self.completing_command {
+                CompletingCommand::Completing { .. } => 1,
+                _ => 0,
+            }
+    }
+
     /// Update the metrics of barrier nums.
     fn update_barrier_nums_metrics(&self) {
         self.context.metrics.in_flight_barrier_nums.set(
             self.command_ctx_queue
-                .iter()
-                .filter(|x| matches!(x.state, InFlight))
+                .values()
+                .filter(|x| matches!(x.state, BarrierEpochState::InFlight))
                 .count() as i64,
         );
         self.context
             .metrics
             .all_barrier_nums
-            .set(self.command_ctx_queue.len() as i64);
+            .set(self.total_command_num() as i64);
     }

     /// Enqueue a barrier command, and init its state to `InFlight`.
     fn enqueue_command(&mut self, command_ctx: Arc<CommandContext>, notifiers: Vec<Notifier>) {
         let timer = self.context.metrics.barrier_latency.start_timer();
-        self.command_ctx_queue.push_back(EpochNode {
-            timer: Some(timer),
-            wait_commit_timer: None,
-
-            state: InFlight,
-            command_ctx,
-            notifiers,
-        });
+        if let Some((_, node)) = self.command_ctx_queue.last_key_value() {
+            assert_eq!(
+                command_ctx.prev_epoch.value(),
+                node.command_ctx.curr_epoch.value()
+            );
+        }
+        self.command_ctx_queue.insert(
+            command_ctx.prev_epoch.value().0,
+            EpochNode {
+                enqueue_time: timer,
+                state: BarrierEpochState::InFlight,
+                command_ctx,
+                notifiers,
+            },
+        );
     }

-    /// Change the state of this `prev_epoch` to `Completed`. Return continuous nodes
-    /// with `Completed` starting from first node [`Completed`..`InFlight`) and remove them.
-    fn barrier_completed(
-        &mut self,
-        prev_epoch: u64,
-        result: Vec<BarrierCompleteResponse>,
-    ) -> Vec<EpochNode> {
-        // change state to complete, and wait for nodes with the smaller epoch to commit
-        let wait_commit_timer = self
-            .context
-            .metrics
-            .barrier_wait_commit_latency
-            .start_timer();
-        if let Some(node) = self
-            .command_ctx_queue
-            .iter_mut()
-            .find(|x| x.command_ctx.prev_epoch.value().0 == prev_epoch)
-        {
-            assert!(matches!(node.state, InFlight));
-            node.wait_commit_timer = Some(wait_commit_timer);
-            node.state = Completed(result);
-        };
-        // Find all continuous nodes with 'Complete' starting from first node
-        let index = self
-            .command_ctx_queue
-            .iter()
-            .position(|x| !matches!(x.state, Completed(_)))
-            .unwrap_or(self.command_ctx_queue.len());
-        let complete_nodes = self.command_ctx_queue.drain(..index).collect_vec();
-        complete_nodes
-    }
-
-    /// Remove all nodes from queue and return them.
-    fn barrier_failed(&mut self) -> Vec<EpochNode> {
-        self.command_ctx_queue.drain(..).collect_vec()
+    /// Change the state of this `prev_epoch` to `Collected`.
+    fn barrier_collected(&mut self, prev_epoch: u64, result: Vec<BarrierCompleteResponse>) {
+        if let Some(node) = self.command_ctx_queue.get_mut(&prev_epoch) {
+            assert!(matches!(node.state, BarrierEpochState::InFlight));
+            node.state = BarrierEpochState::Collected(result);
+        } else {
+            panic!(
+                "received barrier complete response for an unknown epoch: {}",
+                prev_epoch
+            );
+        }
     }
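The new `enqueue_command` asserts that barriers form a contiguous epoch chain: each barrier's `prev_epoch` must equal the previous barrier's `curr_epoch`. A rough standalone illustration of that invariant (hypothetical `Barrier` struct and plain `u64` epochs; the real code compares `TracedEpoch` values):

```rust
use std::collections::BTreeMap;

// Toy barrier descriptor mirroring the (prev_epoch, curr_epoch) pair.
struct Barrier {
    prev_epoch: u64,
    curr_epoch: u64,
}

fn enqueue(queue: &mut BTreeMap<u64, Barrier>, barrier: Barrier) {
    // The same chaining invariant the real `enqueue_command` asserts:
    // the new barrier must start where the last enqueued one ended.
    if let Some((_, last)) = queue.last_key_value() {
        assert_eq!(barrier.prev_epoch, last.curr_epoch, "epoch chain broken");
    }
    queue.insert(barrier.prev_epoch, barrier);
}

fn main() {
    let mut queue = BTreeMap::new();
    enqueue(&mut queue, Barrier { prev_epoch: 100, curr_epoch: 200 });
    enqueue(&mut queue, Barrier { prev_epoch: 200, curr_epoch: 300 });
    // enqueue(&mut queue, Barrier { prev_epoch: 250, curr_epoch: 300 }); // would panic
}
```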

     /// Pause injecting barriers until this returns `true`.
     fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
         let in_flight_not_full = self
             .command_ctx_queue
-            .iter()
-            .filter(|x| matches!(x.state, InFlight))
+            .values()
+            .filter(|x| matches!(x.state, BarrierEpochState::InFlight))
             .count()
             < in_flight_barrier_nums;

         // Whether some command requires pausing concurrent barrier. If so, it must be the last one.
         let should_pause = self
             .command_ctx_queue
-            .back()
-            .map(|x| x.command_ctx.command.should_pause_inject_barrier())
+            .last_key_value()
+            .map(|(_, x)| &x.command_ctx)
+            .or(match &self.completing_command {
+                CompletingCommand::None | CompletingCommand::Err(_) => None,
+                CompletingCommand::Completing { command_ctx, .. } => Some(command_ctx),
+            })
+            .map(|command_ctx| command_ctx.command.should_pause_inject_barrier())
             .unwrap_or(false);
         debug_assert_eq!(
             self.command_ctx_queue
-                .iter()
-                .any(|x| x.command_ctx.command.should_pause_inject_barrier()),
+                .values()
+                .map(|node| &node.command_ctx)
+                .chain(
+                    match &self.completing_command {
+                        CompletingCommand::None | CompletingCommand::Err(_) => None,
+                        CompletingCommand::Completing { command_ctx, .. } => Some(command_ctx),
+                    }
+                    .into_iter()
+                )
+                .any(|command_ctx| command_ctx.command.should_pause_inject_barrier()),
             should_pause
         );

         in_flight_not_full && !should_pause
     }

-    /// Check whether the target epoch is managed by `CheckpointControl`.
-    pub fn contains_epoch(&self, epoch: u64) -> bool {
-        self.command_ctx_queue
-            .iter()
-            .any(|x| x.command_ctx.prev_epoch.value().0 == epoch)
+    /// We need to make sure there are no changes when doing recovery.
+    pub async fn clear_on_err(&mut self, err: &MetaError) {
+        // Join the spawned completing command so that it finishes, no matter whether it
+        // succeeds or not.
+        let is_err = match replace(&mut self.completing_command, CompletingCommand::None) {
+            CompletingCommand::None => false,
+            CompletingCommand::Completing {
+                command_ctx,
+                join_handle,
+            } => {
+                info!(
+                    prev_epoch = ?command_ctx.prev_epoch,
+                    curr_epoch = ?command_ctx.curr_epoch,
+                    "waiting for completing command to finish in recovery"
+                );
+                match join_handle.await {
+                    Err(e) => {
+                        warn!(err = ?e.as_report(), "failed to join completing task");
+                        true
+                    }
+                    Ok(Err(e)) => {
+                        warn!(err = ?e.as_report(), "failed to complete barrier during clear");
+                        true
+                    }
+                    Ok(Ok(_)) => false,
+                }
+            }
+            CompletingCommand::Err(_) => true,
+        };
+        if !is_err {
+            // Continue to finish the already-collected barriers at the front of the queue.
+            while let Some((
+                _,
+                EpochNode {
+                    state: BarrierEpochState::Collected(_),
+                    ..
+                },
+            )) = self.command_ctx_queue.first_key_value()
+            {
+                let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty");
+                let command_ctx = node.command_ctx.clone();
+                if let Err(e) = self.context.clone().complete_barrier(node).await {
+                    error!(
+                        prev_epoch = ?command_ctx.prev_epoch,
+                        curr_epoch = ?command_ctx.curr_epoch,
+                        err = ?e.as_report(),
+                        "failed to complete barrier during recovery"
+                    );
+                    break;
+                } else {
+                    info!(
+                        prev_epoch = ?command_ctx.prev_epoch,
+                        curr_epoch = ?command_ctx.curr_epoch,
+                        "succeeded in completing barrier during recovery"
+                    )
+                }
+            }
+        }
+        for (_, node) in take(&mut self.command_ctx_queue) {
+            for notifier in node.notifiers {
+                notifier.notify_failed(err.clone());
+            }
+            node.enqueue_time.observe_duration();
+        }
     }
 }

 /// The state and message of this barrier, a node for concurrent checkpoint.
 pub struct EpochNode {
     /// Timer for recording barrier latency, taken after `complete_barriers`.
-    timer: Option<HistogramTimer>,
-    /// The timer of `barrier_wait_commit_latency`
-    wait_commit_timer: Option<HistogramTimer>,
+    enqueue_time: HistogramTimer,

     /// Whether this barrier is in-flight or completed.
     state: BarrierEpochState,
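For context on `clear_on_err` above: recovery first awaits any spawned completing task, then finishes the prefix of barriers that are already collected, and finally fails everything still queued. A simplified model of that drain order (toy `State` enum and string error; `println!` stands in for the real completion and notifier calls):

```rust
use std::collections::BTreeMap;

#[derive(Debug)]
enum State {
    InFlight,
    Collected,
}

// Toy version of `clear_on_err`: complete the collected prefix, then notify
// everything still queued as failed. The prints stand in for
// `complete_barrier` and `Notifier::notify_failed`.
fn clear_on_err(queue: &mut BTreeMap<u64, State>, err: &str) {
    while let Some((_, State::Collected)) = queue.first_key_value() {
        let (epoch, _) = queue.pop_first().expect("non-empty");
        println!("completing already-collected barrier {epoch} before recovery");
    }
    for (epoch, _) in std::mem::take(queue) {
        println!("notifying barrier {epoch} as failed: {err}");
    }
}

fn main() {
    let mut queue = BTreeMap::new();
    queue.insert(100, State::Collected);
    queue.insert(200, State::InFlight);
    queue.insert(300, State::Collected); // blocked behind 200, so it fails too
    clear_on_err(&mut queue, "recovery triggered");
    assert!(queue.is_empty());
}
```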
@@ -325,13 +394,26 @@ enum BarrierEpochState {
     /// This barrier is current in-flight on the stream graph of compute nodes.
     InFlight,

-    /// This barrier is completed or failed.
-    Completed(Vec<BarrierCompleteResponse>),
+    /// This barrier is collected.
+    Collected(Vec<BarrierCompleteResponse>),
 }

-/// The result of barrier completion.
+enum CompletingCommand {
+    None,
+    Completing {
+        command_ctx: Arc<CommandContext>,
+
+        // The join handle of a spawned task that completes the barrier.
+        // The return value indicates whether there is some create streaming job command
+        // that has finished but has not yet been checkpointed. If there is any, we will
+        // force a checkpoint on the next barrier.
+        join_handle: JoinHandle<MetaResult<BarrierCompleteOutput>>,
+    },
+    Err(MetaError),
+}
+
+/// The result of barrier collection.
 #[derive(Debug)]
-struct BarrierCompletion {
+struct BarrierCollectResult {
     prev_epoch: u64,
     result: MetaResult<Vec<BarrierCompleteResponse>>,
 }
@@ -571,12 +653,32 @@ impl GlobalBarrierManager {
                 }
             }
             // Barrier completes.
-            completion = self.rpc_manager.next_complete_barrier() => {
-                self.handle_barrier_complete(
-                    completion,
-                )
-                .await;
+            collect_result = self.rpc_manager.next_collected_barrier() => {
+                match collect_result.result {
+                    Ok(resps) => {
+                        self.checkpoint_control.barrier_collected(collect_result.prev_epoch, resps);
+                    },
+                    Err(e) => {
+                        fail_point!("inject_barrier_err_success");
+                        self.failure_recovery(e).await;
+                    }
+                }
             }
+            complete_result = self.checkpoint_control.next_completed_barrier() => {
+                match complete_result {
+                    Ok(output) => {
+                        // If there are remaining commands (that require a checkpoint to finish),
+                        // we force the next barrier to be a checkpoint.
+                        if output.require_next_checkpoint {
+                            assert_matches!(output.command_ctx.kind, BarrierKind::Barrier);
+                            self.scheduled_barriers.force_checkpoint_in_next_barrier();
+                        }
+                    }
+                    Err(e) => {
+                        self.failure_recovery(e).await;
+                    }
+                }
+            },
             scheduled = self.scheduled_barriers.next_barrier(),
                 if self
                     .checkpoint_control
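The new `next_completed_barrier` arm of this `select!` relies on returning `std::future::pending()` when nothing is completing, so that arm simply never fires while the loop stays responsive to other events. A self-contained sketch of the pattern (toy epoch values; `next_completed` is a hypothetical stand-in for the real method):

```rust
use std::future::pending;
use tokio::sync::mpsc;

// Only yields when there is actually a completing task to drive; otherwise
// parks this select! arm forever.
async fn next_completed(task: &mut Option<tokio::task::JoinHandle<u64>>) -> u64 {
    match task {
        Some(handle) => {
            let epoch = handle.await.expect("join failed");
            *task = None; // reset so the finished handle is never polled again
            epoch
        }
        None => pending().await,
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(8);
    let mut completing: Option<tokio::task::JoinHandle<u64>> = None;
    tx.send(100).await.unwrap();
    loop {
        tokio::select! {
            Some(epoch) = rx.recv() => {
                // A collected barrier starts completing in a spawned task.
                completing = Some(tokio::spawn(async move { epoch }));
            }
            epoch = next_completed(&mut completing) => {
                println!("barrier {epoch} completed");
                break;
            }
        }
    }
}
```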
@@ -647,72 +749,10 @@ impl GlobalBarrierManager {
             .enqueue_command(command_ctx.clone(), notifiers);
     }

-    /// Changes the state to `Complete`, and try to commit all epoch that state is `Complete` in
-    /// order. If commit is err, all nodes will be handled.
-    async fn handle_barrier_complete(&mut self, completion: BarrierCompletion) {
-        let BarrierCompletion { prev_epoch, result } = completion;
-
-        assert!(
-            self.checkpoint_control.contains_epoch(prev_epoch),
-            "received barrier complete response for an unknown epoch: {}",
-            prev_epoch
-        );
-
-        if let Err(err) = result {
-            // FIXME: If it is a connector source error occurred in the init barrier, we should pass
-            // back to frontend
-            fail_point!("inject_barrier_err_success");
-            let fail_node = self.checkpoint_control.barrier_failed();
-            warn!(%prev_epoch, error = %err.as_report(), "Failed to complete epoch");
-            self.failure_recovery(err, fail_node).await;
-            return;
-        }
-        // change the state to Complete
-        let mut complete_nodes = self
-            .checkpoint_control
-            .barrier_completed(prev_epoch, result.unwrap());
-        // try commit complete nodes
-        let (mut index, mut err_msg) = (0, None);
-        for (i, node) in complete_nodes.iter_mut().enumerate() {
-            assert!(matches!(node.state, Completed(_)));
-            let span = node.command_ctx.span.clone();
-            if let Err(err) = self.complete_barrier(node).instrument(span).await {
-                index = i;
-                err_msg = Some(err);
-                break;
-            }
-        }
-        // Handle the error node and the nodes after it
-        if let Some(err) = err_msg {
-            let fail_nodes = complete_nodes
-                .drain(index..)
-                .chain(self.checkpoint_control.barrier_failed().into_iter())
-                .collect_vec();
-            warn!(%prev_epoch, error = %err.as_report(), "Failed to commit epoch");
-            self.failure_recovery(err, fail_nodes).await;
-        }
-    }
-
-    async fn failure_recovery(
-        &mut self,
-        err: MetaError,
-        fail_nodes: impl IntoIterator<Item = EpochNode>,
-    ) {
+    async fn failure_recovery(&mut self, err: MetaError) {
         self.context.tracker.lock().await.abort_all(&err);
         self.rpc_manager.clear();
-
-        for node in fail_nodes {
-            if let Some(timer) = node.timer {
-                timer.observe_duration();
-            }
-            if let Some(wait_commit_timer) = node.wait_commit_timer {
-                wait_commit_timer.observe_duration();
-            }
-            node.notifiers.into_iter().for_each(|notifier|
-                // some of the fail nodes may be notified as collected before, we should notify them
-                // as failed using the specified error.
-                notifier.notify_failed(err.clone()));
-        }
+        self.checkpoint_control.clear_on_err(&err).await;

         if self.enable_recovery {
             self.context
@@ -735,18 +775,55 @@ impl GlobalBarrierManager {
             panic!("failed to execute barrier: {}", err.as_report());
         }
     }
+}

+impl GlobalBarrierManagerContext {
     /// Try to commit this node. If an error occurs, return it.
-    async fn complete_barrier(&mut self, node: &mut EpochNode) -> MetaResult<()> {
-        let prev_epoch = node.command_ctx.prev_epoch.value().0;
-        match &mut node.state {
-            Completed(resps) => {
+    async fn complete_barrier(self, node: EpochNode) -> MetaResult<BarrierCompleteOutput> {
+        let EpochNode {
+            command_ctx,
+            mut notifiers,
+            enqueue_time,
+            state,
+            ..
+        } = node;
+        let resps = must_match!(state, BarrierEpochState::Collected(resps) => resps);
+        let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer();
+        let (commit_info, create_mview_progress) = collect_commit_epoch_info(resps);
+        if let Err(e) = self.update_snapshot(&command_ctx, commit_info).await {
+            for notifier in notifiers {
+                notifier.notify_collection_failed(e.clone());
+            }
+            return Err(e);
+        };
+        notifiers.iter_mut().for_each(|notifier| {
+            notifier.notify_collected();
+        });
+        let has_remaining = self
+            .update_tracking_jobs(notifiers, command_ctx.clone(), create_mview_progress)
+            .await?;
+        let duration_sec = enqueue_time.stop_and_record();
+        self.report_complete_event(duration_sec, &command_ctx);
+        wait_commit_timer.observe_duration();
+        Ok(BarrierCompleteOutput {
+            command_ctx,
+            require_next_checkpoint: has_remaining,
+        })
+    }
+
+    async fn update_snapshot(
+        &self,
+        command_ctx: &CommandContext,
+        commit_info: CommitEpochInfo,
+    ) -> MetaResult<()> {
+        {
+            {
+                let prev_epoch = command_ctx.prev_epoch.value().0;
                 // We must ensure all epochs are committed in ascending order,
                 // because the storage engine will query from new to old in the order in which
                 // the L0 layer files are generated.
                 // See https://github.com/risingwave-labs/risingwave/issues/1251
-                let kind = node.command_ctx.kind;
-                let commit_info = collect_commit_epoch_info(resps);
+                let kind = command_ctx.kind;

                 // hummock_manager commit epoch.
                 let mut new_snapshot = None;
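`update_snapshot` depends on epochs reaching the storage layer in ascending order, because the L0 layer files are queried from new to old. A toy guard expressing that contract (hypothetical `EpochCommitter`, not the Hummock API):

```rust
// Toy model of the ordering contract `update_snapshot` relies on: epochs
// must be committed in strictly ascending order.
struct EpochCommitter {
    last_committed: u64,
}

impl EpochCommitter {
    fn commit_epoch(&mut self, epoch: u64) {
        assert!(
            epoch > self.last_committed,
            "epoch {epoch} committed out of order (last was {})",
            self.last_committed
        );
        self.last_committed = epoch;
    }
}

fn main() {
    let mut committer = EpochCommitter { last_committed: 0 };
    committer.commit_epoch(100);
    committer.commit_epoch(200);
    // committer.commit_epoch(150); // would panic: out of order
}
```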
@@ -758,25 +835,20 @@ impl GlobalBarrierManager {
                     ),
                     BarrierKind::Checkpoint => {
                         new_snapshot = self
-                            .context
                             .hummock_manager
-                            .commit_epoch(node.command_ctx.prev_epoch.value().0, commit_info)
+                            .commit_epoch(command_ctx.prev_epoch.value().0, commit_info)
                             .await?;
                     }
                     BarrierKind::Barrier => {
-                        new_snapshot = Some(
-                            self.context
-                                .hummock_manager
-                                .update_current_epoch(prev_epoch),
-                        );
+                        new_snapshot = Some(self.hummock_manager.update_current_epoch(prev_epoch));
                         // if we collect a barrier(checkpoint = false),
                         // we need to ensure that command is Plain and the notifier's checkpoint is
                         // false
-                        assert!(!node.command_ctx.command.need_checkpoint());
+                        assert!(!command_ctx.command.need_checkpoint());
                     }
                 }

-                node.command_ctx.post_collect().await?;
+                command_ctx.post_collect().await?;
                 // Notify new snapshot after fragment_mapping changes have been notified in
                 // `post_collect`.
                 if let Some(snapshot) = new_snapshot {
@@ -787,16 +859,22 @@ impl GlobalBarrierManager {
                         Info::HummockSnapshot(snapshot),
                     );
                 }
+                Ok(())
+            }
+        }
+    }

+    async fn update_tracking_jobs(
+        &self,
+        notifiers: Vec<Notifier>,
+        command_ctx: Arc<CommandContext>,
+        create_mview_progress: Vec<CreateMviewProgress>,
+    ) -> MetaResult<bool> {
+        {
+            {
                 // Notify about collected.
-                let mut notifiers = take(&mut node.notifiers);
-                notifiers.iter_mut().for_each(|notifier| {
-                    notifier.notify_collected();
-                });
-
-                // Notify about collected.
-                let version_stats = self.context.hummock_manager.get_version_stats().await;
-                let mut tracker = self.context.tracker.lock().await;
+                let version_stats = self.hummock_manager.get_version_stats().await;
+                let mut tracker = self.tracker.lock().await;

                 // Save `finished_commands` for Create MVs.
                 let finished_commands = {
@@ -804,7 +882,7 @@ impl GlobalBarrierManager {
                     // Add the command to tracker.
                     if let Some(command) = tracker.add(
                         TrackingCommand {
-                            context: node.command_ctx.clone(),
+                            context: command_ctx.clone(),
                             notifiers,
                         },
                         &version_stats,
@@ -813,9 +891,9 @@ impl GlobalBarrierManager {
                         commands.push(command);
                     }
                     // Update the progress of all commands.
-                    for progress in resps.iter().flat_map(|r| &r.create_mview_progress) {
+                    for progress in create_mview_progress {
                         // Those with actors complete can be finished immediately.
-                        if let Some(command) = tracker.update(progress, &version_stats) {
+                        if let Some(command) = tracker.update(&progress, &version_stats) {
                             tracing::trace!(?progress, "finish progress");
                             commands.push(command);
                         } else {
@@ -829,41 +907,88 @@ impl GlobalBarrierManager {
                     tracker.stash_command_to_finish(command);
                 }

-                if let Some(table_id) = node.command_ctx.table_to_cancel() {
+                if let Some(table_id) = command_ctx.table_to_cancel() {
                     // the cancelled command is possibly stashed in `finished_commands` and waiting
                     // for checkpoint, we should also clear it.
                     tracker.cancel_command(table_id);
                 }

-                let remaining = tracker.finish_jobs(kind.is_checkpoint()).await?;
-                // If there are remaining commands (that requires checkpoint to finish), we force
-                // the next barrier to be a checkpoint.
-                if remaining {
-                    assert_matches!(kind, BarrierKind::Barrier);
-                    self.scheduled_barriers.force_checkpoint_in_next_barrier();
-                }
+                let has_remaining_job = tracker
+                    .finish_jobs(command_ctx.kind.is_checkpoint())
+                    .await?;

-                let duration_sec = node.timer.take().unwrap().stop_and_record();
-                node.wait_commit_timer.take().unwrap().observe_duration();
+                Ok(has_remaining_job)
+            }
+        }
+    }
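`update_tracking_jobs` now returns whether any tracked job still needs a checkpoint to finish; the event loop turns that flag into `force_checkpoint_in_next_barrier()`. A minimal model of that contract (hypothetical `finish_jobs` helper with string job names):

```rust
// Toy model of the `update_tracking_jobs` contract: a non-checkpoint barrier
// may leave jobs that can only finish at a checkpoint, in which case the
// caller forces the next barrier to be a checkpoint.
fn finish_jobs(is_checkpoint: bool, jobs_waiting_checkpoint: &mut Vec<&str>) -> bool {
    if is_checkpoint {
        for job in jobs_waiting_checkpoint.drain(..) {
            println!("finished {job} at checkpoint");
        }
    }
    // `true` means "something is still pending a checkpoint".
    !jobs_waiting_checkpoint.is_empty()
}

fn main() {
    let mut jobs = vec!["create_mview_a"];
    let has_remaining = finish_jobs(false, &mut jobs); // plain barrier
    if has_remaining {
        println!("force checkpoint in next barrier");
    }
    assert!(!finish_jobs(true, &mut jobs)); // checkpoint drains the queue
}
```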

+    fn report_complete_event(&self, duration_sec: f64, command_ctx: &CommandContext) {
+        {
+            {
                 {
                     // Record barrier latency in event log.
                     use risingwave_pb::meta::event_log;
                     let event = event_log::EventBarrierComplete {
-                        prev_epoch: node.command_ctx.prev_epoch.value().0,
-                        cur_epoch: node.command_ctx.curr_epoch.value().0,
+                        prev_epoch: command_ctx.prev_epoch.value().0,
+                        cur_epoch: command_ctx.curr_epoch.value().0,
                         duration_sec,
-                        command: node.command_ctx.command.to_string(),
-                        barrier_kind: node.command_ctx.kind.as_str_name().to_string(),
+                        command: command_ctx.command.to_string(),
+                        barrier_kind: command_ctx.kind.as_str_name().to_string(),
                     };
                     self.env
                         .event_log_manager_ref()
                         .add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
                 }
+            }
+        }
+    }
+}

-                Ok(())
+struct BarrierCompleteOutput {
+    command_ctx: Arc<CommandContext>,
+    require_next_checkpoint: bool,
+}
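`next_completed_barrier` below joins the spawned completion task and flattens the resulting `Result<MetaResult<_>, JoinError>` into a single result. The same join-and-flatten shape in a standalone sketch (using `anyhow::Error` in place of `MetaError`):

```rust
use anyhow::anyhow;

// Join a spawned completion task and flatten the nested result, mirroring the
// `map_err(..).and_then(..)` chain in `next_completed_barrier`.
async fn join_completion(
    handle: tokio::task::JoinHandle<Result<u64, anyhow::Error>>,
) -> Result<u64, anyhow::Error> {
    handle
        .await
        .map_err(|e| anyhow!("failed to join completing task: {e}"))
        .and_then(|result| result)
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let handle = tokio::spawn(async { Ok::<_, anyhow::Error>(100u64) });
    let epoch = join_completion(handle).await?;
    println!("completed barrier with prev_epoch {epoch}");
    Ok(())
}
```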
+
+impl CheckpointControl {
+    pub(super) async fn next_completed_barrier(&mut self) -> MetaResult<BarrierCompleteOutput> {
+        if matches!(&self.completing_command, CompletingCommand::None) {
+            // If there is no completing barrier, try to start completing the earliest barrier if
+            // it has been collected.
+            if let Some((
+                _,
+                EpochNode {
+                    state: BarrierEpochState::Collected(_),
+                    ..
+                },
+            )) = self.command_ctx_queue.first_key_value()
+            {
+                let (_, node) = self.command_ctx_queue.pop_first().expect("non-empty");
+                let command_ctx = node.command_ctx.clone();
+                let join_handle = tokio::spawn(self.context.clone().complete_barrier(node));
+                self.completing_command = CompletingCommand::Completing {
+                    command_ctx,
+                    join_handle,
+                };
             }
-            InFlight => unreachable!(),
+        }
+
+        if let CompletingCommand::Completing { join_handle, .. } = &mut self.completing_command {
+            let join_result = join_handle
+                .await
+                .map_err(|e| {
+                    anyhow!("failed to join completing command: {:?}", e.as_report()).into()
+                })
+                .and_then(|result| result);
+            // It's important to reset `completing_command` after the await, no matter whether the
+            // result is an error: otherwise the already-finished join handle would be polled again.
+            if let Err(e) = &join_result {
+                self.completing_command = CompletingCommand::Err(e.clone());
+            } else {
+                self.completing_command = CompletingCommand::None;
+            }
+            join_result
+        } else {
+            pending().await
+        }
     }
 }
@@ -955,37 +1080,39 @@ impl GlobalBarrierManagerContext {

 pub type BarrierManagerRef = GlobalBarrierManagerContext;

-fn collect_commit_epoch_info(resps: &mut [BarrierCompleteResponse]) -> CommitEpochInfo {
+fn collect_commit_epoch_info(
+    resps: Vec<BarrierCompleteResponse>,
+) -> (CommitEpochInfo, Vec<CreateMviewProgress>) {
     let mut sst_to_worker: HashMap<HummockSstableObjectId, WorkerId> = HashMap::new();
     let mut synced_ssts: Vec<ExtendedSstableInfo> = vec![];
-    for resp in &mut *resps {
-        let mut t: Vec<ExtendedSstableInfo> = resp
-            .synced_sstables
-            .iter_mut()
-            .map(|grouped| {
-                let sst_info = std::mem::take(&mut grouped.sst).expect("field not None");
-                sst_to_worker.insert(sst_info.get_object_id(), resp.worker_id);
-                ExtendedSstableInfo::new(
-                    grouped.compaction_group_id,
-                    sst_info,
-                    std::mem::take(&mut grouped.table_stats_map),
-                )
-            })
-            .collect_vec();
-        synced_ssts.append(&mut t);
+    let mut table_watermarks = Vec::with_capacity(resps.len());
+    let mut progresses = Vec::new();
+    for resp in resps {
+        let ssts_iter = resp.synced_sstables.into_iter().map(|grouped| {
+            let sst_info = grouped.sst.expect("field not None");
+            sst_to_worker.insert(sst_info.get_object_id(), resp.worker_id);
+            ExtendedSstableInfo::new(
+                grouped.compaction_group_id,
+                sst_info,
+                grouped.table_stats_map,
+            )
+        });
+        synced_ssts.extend(ssts_iter);
+        table_watermarks.push(resp.table_watermarks);
+        progresses.extend(resp.create_mview_progress);
     }
-    CommitEpochInfo::new(
+    let info = CommitEpochInfo::new(
         synced_ssts,
         merge_multiple_new_table_watermarks(
-            resps
-                .iter()
-                .map(|resp| {
-                    resp.table_watermarks
-                        .iter()
+            table_watermarks
+                .into_iter()
+                .map(|watermarks| {
+                    watermarks
+                        .into_iter()
                         .map(|(table_id, watermarks)| {
                             (
-                                TableId::new(*table_id),
-                                TableWatermarks::from_protobuf(watermarks),
+                                TableId::new(table_id),
+                                TableWatermarks::from_protobuf(&watermarks),
                             )
                         })
                         .collect()
@@ -993,5 +1120,6 @@ fn collect_commit_epoch_info(resps: &mut [BarrierCompleteResponse]) -> CommitEpo
                 .collect_vec(),
         ),
         sst_to_worker,
-    )
+    );
+    (info, progresses)
 }
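The rewrite of `collect_commit_epoch_info` takes the responses by value, so SSTs and progress entries can be moved out directly instead of `std::mem::take`-ing through `&mut` references. A toy version of the by-value pattern (hypothetical `Response` type):

```rust
struct Response {
    worker_id: u32,
    synced_sstables: Vec<String>,
}

fn collect(resps: Vec<Response>) -> Vec<(u32, String)> {
    let mut out = Vec::new();
    for resp in resps {
        let worker_id = resp.worker_id;
        // `resp` is owned here, so the SST list can be moved out directly;
        // the old `&mut [Response]` version needed `std::mem::take` instead.
        out.extend(resp.synced_sstables.into_iter().map(|sst| (worker_id, sst)));
    }
    out
}

fn main() {
    let resps = vec![Response { worker_id: 1, synced_sstables: vec!["sst-1".into()] }];
    assert_eq!(collect(resps), vec![(1, "sst-1".to_string())]);
}
```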
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index 755d8f4c2061..dfe9ada44a47 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -40,7 +40,7 @@ use tracing::Instrument;
 use uuid::Uuid;

 use super::command::CommandContext;
-use super::{BarrierCompletion, GlobalBarrierManagerContext};
+use super::{BarrierCollectResult, GlobalBarrierManagerContext};
 use crate::manager::{MetaSrvEnv, WorkerId};
 use crate::{MetaError, MetaResult};

@@ -48,7 +48,7 @@ pub(super) struct BarrierRpcManager {
     context: GlobalBarrierManagerContext,

     /// Futures that await on the collection of barriers.
-    injected_in_progress_barrier: FuturesUnordered<BarrierCompletionFuture>,
+    injected_in_progress_barrier: FuturesUnordered<BarrierCollectFuture>,

     prev_injecting_barrier: Option<oneshot::Receiver<()>>,
 }
@@ -80,12 +80,12 @@ impl BarrierRpcManager {
             .push(await_complete_future);
     }

-    pub(super) async fn next_complete_barrier(&mut self) -> BarrierCompletion {
+    pub(super) async fn next_collected_barrier(&mut self) -> BarrierCollectResult {
         pending_on_none(self.injected_in_progress_barrier.next()).await
     }
 }

-pub(super) type BarrierCompletionFuture = impl Future<Output = BarrierCompletion> + Send + 'static;
+pub(super) type BarrierCollectFuture = impl Future<Output = BarrierCollectResult> + Send + 'static;

 impl GlobalBarrierManagerContext {
     /// Inject a barrier to all CNs and spawn a task to collect it
@@ -94,7 +94,7 @@ impl GlobalBarrierManagerContext {
         command_context: Arc<CommandContext>,
         inject_tx: Option<oneshot::Sender<()>>,
         prev_inject_rx: Option<oneshot::Receiver<()>>,
-    ) -> BarrierCompletionFuture {
+    ) -> BarrierCollectFuture {
         let (tx, rx) = oneshot::channel();
         let prev_epoch = command_context.prev_epoch.value().0;
         let stream_rpc_manager = self.stream_rpc_manager.clone();
@@ -103,7 +103,7 @@ impl GlobalBarrierManagerContext {
         let span = command_context.span.clone();
         if let Some(prev_inject_rx) = prev_inject_rx {
             if prev_inject_rx.await.is_err() {
-                let _ = tx.send(BarrierCompletion {
+                let _ = tx.send(BarrierCollectResult {
                     prev_epoch,
                     result: Err(anyhow!("prev barrier failed to be injected").into()),
                 });
@@ -125,7 +125,7 @@ impl GlobalBarrierManagerContext {
                     .await;
                 }
                 Err(e) => {
-                    let _ = tx.send(BarrierCompletion {
+                    let _ = tx.send(BarrierCollectResult {
                         prev_epoch,
                         result: Err(e),
                     });
@@ -134,7 +134,7 @@ impl GlobalBarrierManagerContext {
         });
         rx.map(move |result| match result {
             Ok(completion) => completion,
-            Err(_e) => BarrierCompletion {
+            Err(_e) => BarrierCollectResult {
                 prev_epoch,
                 result: Err(anyhow!("failed to receive barrier completion result").into()),
             },
@@ -222,7 +222,7 @@ impl StreamRpcManager {
         &self,
         node_need_collect: HashMap<WorkerId, bool>,
         command_context: Arc<CommandContext>,
-        barrier_complete_tx: oneshot::Sender<BarrierCompletion>,
+        barrier_collect_tx: oneshot::Sender<BarrierCollectResult>,
     ) {
         let prev_epoch = command_context.prev_epoch.value().0;
         let tracing_context =
@@ -272,8 +272,8 @@ impl StreamRpcManager {
                 .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
             })
             .map_err(Into::into);
-        let _ = barrier_complete_tx
-            .send(BarrierCompletion { prev_epoch, result })
+        let _ = barrier_collect_tx
+            .send(BarrierCollectResult { prev_epoch, result })
             .inspect_err(|_| tracing::warn!(prev_epoch, "failed to notify barrier completion"));
     }
 }
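The renames in rpc.rs separate barrier *collection* (all compute nodes have responded) from *completion* (the epoch is committed to storage). A minimal sketch of the oneshot hand-off the collector uses, including mapping a dropped sender into an error the way the `rx.map` above does (toy `CollectResult` type and string error, not the real `MetaResult`):

```rust
use tokio::sync::oneshot;

// Minimal mirror of the rpc.rs flow: a collector task reports a
// `BarrierCollectResult`-like value through a oneshot channel, and the
// receiver turns a dropped sender into an error instead of panicking.
#[derive(Debug)]
struct CollectResult {
    prev_epoch: u64,
    result: Result<(), String>,
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let prev_epoch = 100;
    tokio::spawn(async move {
        let _ = tx.send(CollectResult { prev_epoch, result: Ok(()) });
    });
    let collected = rx.await.unwrap_or(CollectResult {
        prev_epoch,
        result: Err("failed to receive barrier collection result".into()),
    });
    println!("{collected:?}");
}
```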