Skip to content

Commit

Permalink
fix: fix ce event tx logs
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <themanforfree@gmail.com>
  • Loading branch information
themanforfree authored and mergify[bot] committed Dec 18, 2023
1 parent 7ee48b6 commit 8055282
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 18 deletions.
3 changes: 2 additions & 1 deletion curp/src/server/cmd_worker/conflict_checked_mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,14 +544,15 @@ pub(in crate::server) fn channel<C: 'static + Command, CE: 'static + CommandExec
let (filter_tx, recv_rx) = flume::unbounded();
// recv from user to mark a msg done
let (done_tx, done_rx) = flume::unbounded::<(Task<C>, bool)>();
let ce_event_tx = CEEventTx(send_tx, shutdown_trigger.subscribe());
let _ig = tokio::spawn(conflict_checked_mpmc_task(
filter_tx,
filter_rx,
ce,
shutdown_trigger,
done_rx,
));
(CEEventTx(send_tx), recv_rx, done_tx)
(ce_event_tx, recv_rx, done_tx)
}

/// Conflict checked mpmc task
Expand Down
37 changes: 21 additions & 16 deletions curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use clippy_utilities::NumericCast;
#[cfg(test)]
use mockall::automock;
use tokio::sync::oneshot;
use tracing::{debug, error};
use tracing::{debug, error, info};
use utils::shutdown;

use self::conflict_checked_mpmc::Task;
Expand Down Expand Up @@ -278,7 +278,7 @@ async fn worker_snapshot<

/// Send event to background command executor workers
#[derive(Debug, Clone)]
pub(super) struct CEEventTx<C: Command>(flume::Sender<CEEvent<C>>);
pub(super) struct CEEventTx<C: Command>(flume::Sender<CEEvent<C>>, shutdown::Listener);

/// Recv cmds that need to be executed
#[derive(Clone)]
Expand All @@ -300,36 +300,41 @@ pub(super) trait CEEventTxApi<C: Command + 'static>: Send + Sync + 'static {
fn send_snapshot(&self, meta: SnapshotMeta) -> oneshot::Receiver<Snapshot>;
}

impl<C: Command + 'static> CEEventTxApi<C> for CEEventTx<C> {
fn send_sp_exe(&self, entry: Arc<LogEntry<C>>) {
let event = CEEvent::SpecExeReady(Arc::clone(&entry));
impl<C: Command + 'static> CEEventTx<C> {
/// Send ce event
fn send_event(&self, event: CEEvent<C>) {
if let Err(e) = self.0.send(event) {
if self.1.is_shutdown() {
info!("send event after current node shutdown");
return;
}
error!("failed to send cmd exe event to background cmd worker, {e}");
}
}
}

impl<C: Command + 'static> CEEventTxApi<C> for CEEventTx<C> {
fn send_sp_exe(&self, entry: Arc<LogEntry<C>>) {
let event = CEEvent::SpecExeReady(Arc::clone(&entry));
self.send_event(event);
}

fn send_after_sync(&self, entry: Arc<LogEntry<C>>) {
let event = CEEvent::ASReady(Arc::clone(&entry));
if let Err(e) = self.0.send(event) {
error!("failed to send cmd as event to background cmd worker, {e}");
}
self.send_event(event);
}

fn send_reset(&self, snapshot: Option<Snapshot>) -> oneshot::Receiver<()> {
let (tx, rx) = oneshot::channel();
let msg = CEEvent::Reset(snapshot, tx);
if let Err(e) = self.0.send(msg) {
error!("failed to send reset event to background cmd worker, {e}");
}
let event = CEEvent::Reset(snapshot, tx);
self.send_event(event);
rx
}

fn send_snapshot(&self, meta: SnapshotMeta) -> oneshot::Receiver<Snapshot> {
let (tx, rx) = oneshot::channel();
let msg = CEEvent::Snapshot(meta, tx);
if let Err(e) = self.0.send(msg) {
error!("failed to send snapshot event to background cmd worker, {e}");
}
let event = CEEvent::Snapshot(meta, tx);
self.send_event(event);
rx
}
}
Expand Down
3 changes: 2 additions & 1 deletion utils/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ impl Listener {

/// Check if the shutdown signal has been sent.
#[inline]
pub fn is_shutdown(&mut self) -> bool {
#[must_use]
pub fn is_shutdown(&self) -> bool {
!matches!(*self.listener.borrow(), Signal::Running)
}
}
Expand Down

0 comments on commit 8055282

Please sign in to comment.