
feat: poll barrier complete rpc in barrier manager worker loop (#14419)
wenym1 committed Jan 9, 2024
1 parent a30583f commit d20632d
Showing 3 changed files with 260 additions and 195 deletions.
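
Before this change, the worker loop received barrier completions over an unbounded mpsc channel: `inject_barrier` took a clone of the sender and handed it to a spawned `collect_barrier` task, and the loop called `barrier_complete_rx.recv().await.unwrap()`. After it, the loop polls `self.rpc_manager.next_complete_barrier()` directly, so ownership of in-flight completions stays inside `GlobalBarrierManager` and can be dropped wholesale on recovery via `rpc_manager.clear()`. The sketch below is a minimal, self-contained illustration of that polling pattern only — it assumes the `tokio` (full features) and `futures` crates, all names are illustrative, and none of it is the actual RisingWave code:

```rust
use std::time::Duration;

use futures::stream::{FuturesOrdered, StreamExt};

#[tokio::main]
async fn main() {
    // Queue of in-flight "barriers": each future resolves with the epoch of
    // the barrier it tracks once the (simulated) collect round trip is done.
    let mut in_flight = FuturesOrdered::new();
    let mut epoch = 0u64;
    let mut min_interval = tokio::time::interval(Duration::from_millis(10));

    for _ in 0..6 {
        tokio::select! {
            // "Barrier completes": poll the queue directly instead of
            // receiving from an mpsc channel filled by spawned tasks.
            Some(prev_epoch) = in_flight.next() => {
                println!("completed barrier, prev_epoch = {prev_epoch}");
            }
            // "Minimum interval reached": inject the next barrier.
            _ = min_interval.tick() => {
                let prev = epoch;
                epoch += 1;
                in_flight.push_back(async move {
                    // Stand-in for the inject + collect RPC round trip.
                    tokio::time::sleep(Duration::from_millis(5)).await;
                    prev
                });
            }
        }
    }
}
```

One consequence visible in the diff below: since `next_complete_barrier()` yields a `BarrierCompletion` rather than an `Option`, the loop's `completion.unwrap()` becomes a plain `completion`.
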
210 changes: 22 additions & 188 deletions src/meta/src/barrier/mod.rs
@@ -16,19 +16,16 @@ use std::assert_matches::assert_matches;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
use std::mem::take;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

use fail::fail_point;
use futures::future::try_join_all;
use itertools::Itertools;
use prometheus::HistogramTimer;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
use risingwave_common::util::tracing::TracingContext;
use risingwave_hummock_sdk::table_watermark::{
merge_multiple_new_table_watermarks, TableWatermarks,
};
@@ -40,24 +37,19 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::{Barrier, BarrierMutation};
use risingwave_pb::stream_service::{
BarrierCompleteRequest, BarrierCompleteResponse, InjectBarrierRequest,
};
use risingwave_rpc_client::StreamClientPoolRef;
use tokio::sync::mpsc::UnboundedSender;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::Instrument;
use uuid::Uuid;

use self::command::CommandContext;
use self::info::BarrierActorInfo;
use self::notifier::Notifier;
use self::progress::TrackingCommand;
use crate::barrier::notifier::BarrierInfo;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingJob};
use crate::barrier::rpc::BarrierRpcManager;
use crate::barrier::BarrierEpochState::{Completed, InFlight};
use crate::hummock::{CommitEpochInfo, HummockManagerRef};
use crate::manager::sink_coordination::SinkCoordinatorManager;
@@ -72,6 +64,7 @@ mod info;
mod notifier;
mod progress;
mod recovery;
mod rpc;
mod schedule;
mod trace;

@@ -135,6 +128,7 @@ struct Scheduled {
/// Whether to choose a checkpoint barrier (`checkpoint == true`) for this scheduled barrier
checkpoint: bool,
}

/// Changes to the actors to be sent or collected after this command is committed.
///
/// Since the checkpoints might be concurrent, the meta store of table fragments is only updated
@@ -211,6 +205,8 @@ pub struct GlobalBarrierManager {
state: BarrierManagerState,

checkpoint_control: CheckpointControl,

rpc_manager: BarrierRpcManager,
}

/// Controls the concurrent execution of commands.
@@ -605,6 +601,8 @@ impl GlobalBarrierManager {
env: env.clone(),
};

let rpc_manager = BarrierRpcManager::new(context.clone());

Self {
enable_recovery,
scheduled_barriers,
@@ -613,6 +611,7 @@
env,
state: initial_invalid_state,
checkpoint_control,
rpc_manager,
}
}

@@ -717,7 +716,6 @@

let mut min_interval = tokio::time::interval(interval);
min_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let (barrier_complete_tx, mut barrier_complete_rx) = tokio::sync::mpsc::unbounded_channel();
let (local_notification_tx, mut local_notification_rx) =
tokio::sync::mpsc::unbounded_channel();
self.env
@@ -750,32 +748,29 @@
}
}
// Barrier completes.
completion = barrier_complete_rx.recv() => {
completion = self.rpc_manager.next_complete_barrier() => {
self.handle_barrier_complete(
completion.unwrap(),
completion,
)
.await;
}

// There's barrier scheduled.
_ = self.scheduled_barriers.wait_one(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
min_interval.reset(); // Reset the interval as we have a new barrier.
self.handle_new_barrier(&barrier_complete_tx).await;
self.handle_new_barrier().await;
}
// Minimum interval reached.
_ = min_interval.tick(), if self.checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {
self.handle_new_barrier(&barrier_complete_tx).await;
self.handle_new_barrier().await;
}
}
self.checkpoint_control.update_barrier_nums_metrics();
}
}

/// Handle the new barrier from the scheduled queue and inject it.
async fn handle_new_barrier(
&mut self,
barrier_complete_tx: &UnboundedSender<BarrierCompletion>,
) {
async fn handle_new_barrier(&mut self) {
assert!(self
.checkpoint_control
.can_inject_barrier(self.in_flight_barrier_nums));
@@ -821,8 +816,8 @@

send_latency_timer.observe_duration();

self.context
.inject_barrier(command_ctx.clone(), barrier_complete_tx)
self.rpc_manager
.inject_barrier(command_ctx.clone())
.instrument(span)
.await;

@@ -850,17 +845,11 @@
async fn handle_barrier_complete(&mut self, completion: BarrierCompletion) {
let BarrierCompletion { prev_epoch, result } = completion;

// Received barrier complete responses with an epoch that is not managed by checkpoint
// control, which means a recovery has been triggered. We should ignore it because
// trying to complete and commit the epoch is not necessary and could cause
// meaningless recovery again.
if !self.checkpoint_control.contains_epoch(prev_epoch) {
tracing::warn!(
"received barrier complete response for an unknown epoch: {}",
prev_epoch
);
return;
}
assert!(
self.checkpoint_control.contains_epoch(prev_epoch),
"received barrier complete response for an unknown epoch: {}",
prev_epoch
);

if let Err(err) = result {
// FIXME: If it is a connector source error occurred in the init barrier, we should pass
@@ -903,6 +892,7 @@
fail_nodes: impl IntoIterator<Item = EpochNode>,
) {
self.checkpoint_control.clear_changes();
self.rpc_manager.clear();

for node in fail_nodes {
if let Some(timer) = node.timer {
@@ -1109,162 +1099,6 @@ impl GlobalBarrierManagerContext {
*status = new_status;
}

/// Inject a barrier to all CNs and spawn a task to collect it
async fn inject_barrier(
&self,
command_context: Arc<CommandContext>,
barrier_complete_tx: &UnboundedSender<BarrierCompletion>,
) {
let prev_epoch = command_context.prev_epoch.value().0;
let result = self.inject_barrier_inner(command_context.clone()).await;
match result {
Ok(node_need_collect) => {
// todo: the collect handler should be abort when recovery.
tokio::spawn(Self::collect_barrier(
self.env.clone(),
node_need_collect,
self.env.stream_client_pool_ref(),
command_context,
barrier_complete_tx.clone(),
));
}
Err(e) => {
let _ = barrier_complete_tx.send(BarrierCompletion {
prev_epoch,
result: Err(e),
});
}
}
}

/// Send inject-barrier-rpc to the stream service and wait for its response before returning.
async fn inject_barrier_inner(
&self,
command_context: Arc<CommandContext>,
) -> MetaResult<HashMap<WorkerId, bool>> {
fail_point!("inject_barrier_err", |_| bail!("inject_barrier_err"));
let mutation = command_context.to_mutation().await?;
let info = command_context.info.clone();
let mut node_need_collect = HashMap::new();
let inject_futures = info.node_map.iter().filter_map(|(node_id, node)| {
let actor_ids_to_send = info.actor_ids_to_send(node_id).collect_vec();
let actor_ids_to_collect = info.actor_ids_to_collect(node_id).collect_vec();
if actor_ids_to_collect.is_empty() {
// No need to send or collect barrier for this node.
assert!(actor_ids_to_send.is_empty());
node_need_collect.insert(*node_id, false);
None
} else {
node_need_collect.insert(*node_id, true);
let mutation = mutation.clone();
let request_id = Uuid::new_v4().to_string();
let barrier = Barrier {
epoch: Some(risingwave_pb::data::Epoch {
curr: command_context.curr_epoch.value().0,
prev: command_context.prev_epoch.value().0,
}),
mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
tracing_context: TracingContext::from_span(command_context.curr_epoch.span())
.to_protobuf(),
kind: command_context.kind as i32,
passed_actors: vec![],
};
async move {
let client = self.env.stream_client_pool().get(node).await?;

let request = InjectBarrierRequest {
request_id,
barrier: Some(barrier),
actor_ids_to_send,
actor_ids_to_collect,
};
tracing::debug!(
target: "events::meta::barrier::inject_barrier",
?request, "inject barrier request"
);

// This RPC returns only if this worker node has injected this barrier.
client.inject_barrier(request).await
}
.into()
}
});
try_join_all(inject_futures).await.inspect_err(|e| {
// Record failure in event log.
use risingwave_pb::meta::event_log;
use thiserror_ext::AsReport;
let event = event_log::EventInjectBarrierFail {
prev_epoch: command_context.prev_epoch.value().0,
cur_epoch: command_context.curr_epoch.value().0,
error: e.to_report_string(),
};
self.env
.event_log_manager_ref()
.add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
})?;
Ok(node_need_collect)
}

/// Send barrier-complete-rpc and wait for responses from all CNs
async fn collect_barrier(
env: MetaSrvEnv,
node_need_collect: HashMap<WorkerId, bool>,
client_pool_ref: StreamClientPoolRef,
command_context: Arc<CommandContext>,
barrier_complete_tx: UnboundedSender<BarrierCompletion>,
) {
let prev_epoch = command_context.prev_epoch.value().0;
let tracing_context =
TracingContext::from_span(command_context.prev_epoch.span()).to_protobuf();

let info = command_context.info.clone();
let client_pool = client_pool_ref.deref();
let collect_futures = info.node_map.iter().filter_map(|(node_id, node)| {
if !*node_need_collect.get(node_id).unwrap() {
// No need to send or collect barrier for this node.
None
} else {
let request_id = Uuid::new_v4().to_string();
let tracing_context = tracing_context.clone();
async move {
let client = client_pool.get(node).await?;
let request = BarrierCompleteRequest {
request_id,
prev_epoch,
tracing_context,
};
tracing::debug!(
target: "events::meta::barrier::barrier_complete",
?request, "barrier complete"
);

// This RPC returns only if this worker node has collected this barrier.
client.barrier_complete(request).await
}
.into()
}
});

let result = try_join_all(collect_futures)
.await
.inspect_err(|e| {
// Record failure in event log.
use risingwave_pb::meta::event_log;
use thiserror_ext::AsReport;
let event = event_log::EventCollectBarrierFail {
prev_epoch: command_context.prev_epoch.value().0,
cur_epoch: command_context.curr_epoch.value().0,
error: e.to_report_string(),
};
env.event_log_manager_ref()
.add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
})
.map_err(Into::into);
let _ = barrier_complete_tx
.send(BarrierCompletion { prev_epoch, result })
.inspect_err(|_| tracing::warn!(prev_epoch, "failed to notify barrier completion"));
}

/// Resolve actor information from the cluster, the fragment manager, and `ChangedTableId`.
/// We use `changed_table_id` to modify the actors to be sent or collected, because these
/// actors will be created or dropped before this barrier flows through them.
10 changes: 3 additions & 7 deletions src/meta/src/barrier/recovery.rs
@@ -432,13 +432,9 @@ impl GlobalBarrierManagerContext {
if mce != INVALID_EPOCH {
command_ctx.wait_epoch_commit(mce).await?;
}
}

let (barrier_complete_tx, mut barrier_complete_rx) =
tokio::sync::mpsc::unbounded_channel();
self.inject_barrier(command_ctx.clone(), &barrier_complete_tx)
.await;
let res = match barrier_complete_rx.recv().await.unwrap().result {
};
let await_barrier_complete = self.inject_barrier(command_ctx.clone()).await;
let res = match await_barrier_complete.await.result {
Ok(response) => {
if let Err(err) = command_ctx.post_collect().await {
warn!(err = ?err, "post_collect failed");
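
The recovery path changes in the same direction: instead of creating a fresh channel pair per attempt and calling `barrier_complete_rx.recv().await.unwrap()`, it awaits `inject_barrier` once to perform the injection and then awaits the returned handle for collection (the `await_barrier_complete` lines above). A minimal sketch of that two-stage calling convention, with stand-in types — none of these signatures are confirmed by the commit:

```rust
use std::future::Future;

// Stand-in for the crate's completion type; illustrative only.
struct BarrierCompletion {
    prev_epoch: u64,
    result: Result<(), String>,
}

// Mirrors the shape used above: the outer `async` performs the injection
// RPCs, and the *returned* future resolves once the barrier has been
// collected from every node. The signature is an assumption.
async fn inject_barrier(prev_epoch: u64) -> impl Future<Output = BarrierCompletion> {
    // ... fan out inject-barrier RPCs here ...
    async move {
        // ... wait for barrier-complete from all nodes here ...
        BarrierCompletion { prev_epoch, result: Ok(()) }
    }
}

#[tokio::main]
async fn main() {
    // Two awaits, exactly as in recovery.rs: one to inject, one to collect.
    let await_barrier_complete = inject_barrier(42).await;
    let completion = await_barrier_complete.await;
    if let Err(err) = completion.result {
        eprintln!("barrier {} failed to complete: {err}", completion.prev_epoch);
    }
}
```

Compared with the old one-shot channel handshake, there is no sender to thread through, no `unwrap()` on a possibly closed channel, and the caller cannot forget to drain the receiver.
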
(The diff for the third changed file did not render here. mod.rs and recovery.rs account for 25 of the 260 additions and all 195 deletions, so the remaining 235 additions belong to a brand-new file — presumably src/meta/src/barrier/rpc.rs, given the `mod rpc;` declaration above.)
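
Reconstructed purely from the call sites visible above (`BarrierRpcManager::new`, `inject_barrier`, `next_complete_barrier`, `clear`), the core of the new `rpc` module plausibly resembles the following sketch. Every field and type name here is invented for illustration, the real module also carries the inject/collect RPC fan-out moved out of mod.rs, and nothing below should be read as the actual implementation:

```rust
use futures::future::{pending, BoxFuture};
use futures::stream::{FuturesOrdered, StreamExt};

// Stand-in for the crate's completion type; illustrative only.
pub struct BarrierCompletion {
    pub prev_epoch: u64,
    pub result: Result<(), String>,
}

/// Hypothetical core of the new rpc module, inferred from this diff's
/// call sites.
pub struct BarrierRpcManager {
    // Completion futures of injected-but-uncollected barriers.
    in_progress: FuturesOrdered<BoxFuture<'static, BarrierCompletion>>,
}

impl BarrierRpcManager {
    pub fn new() -> Self {
        Self { in_progress: FuturesOrdered::new() }
    }

    /// Drop every in-flight completion future, e.g. when recovery starts,
    /// so stale completions can never reach the worker loop.
    pub fn clear(&mut self) {
        self.in_progress = FuturesOrdered::new();
    }

    /// Inject a barrier and enqueue the future that resolves once all
    /// nodes have collected it (the RPC fan-out is elided in this sketch).
    pub fn inject_barrier(&mut self, prev_epoch: u64) {
        self.in_progress.push_back(Box::pin(async move {
            BarrierCompletion { prev_epoch, result: Ok(()) }
        }));
    }

    /// Yield the next completed barrier; stays pending while nothing is
    /// in flight so it can sit in a `tokio::select!` arm.
    pub async fn next_complete_barrier(&mut self) -> BarrierCompletion {
        if self.in_progress.is_empty() {
            return pending().await;
        }
        self.in_progress.next().await.expect("non-empty checked above")
    }
}
```
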
