From 80552827db12d0400c9510d9894d23bf5e32bbdb Mon Sep 17 00:00:00 2001 From: themanforfree Date: Tue, 12 Dec 2023 22:43:37 +0800 Subject: [PATCH] fix: fix ce event tx logs Signed-off-by: themanforfree --- .../cmd_worker/conflict_checked_mpmc.rs | 3 +- curp/src/server/cmd_worker/mod.rs | 37 +++++++++++-------- utils/src/shutdown.rs | 3 +- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/curp/src/server/cmd_worker/conflict_checked_mpmc.rs b/curp/src/server/cmd_worker/conflict_checked_mpmc.rs index 644aeb25f..a3dc7e287 100644 --- a/curp/src/server/cmd_worker/conflict_checked_mpmc.rs +++ b/curp/src/server/cmd_worker/conflict_checked_mpmc.rs @@ -544,6 +544,7 @@ pub(in crate::server) fn channel, bool)>(); + let ce_event_tx = CEEventTx(send_tx, shutdown_trigger.subscribe()); let _ig = tokio::spawn(conflict_checked_mpmc_task( filter_tx, filter_rx, @@ -551,7 +552,7 @@ pub(in crate::server) fn channel(flume::Sender>); +pub(super) struct CEEventTx(flume::Sender>, shutdown::Listener); /// Recv cmds that need to be executed #[derive(Clone)] @@ -300,36 +300,41 @@ pub(super) trait CEEventTxApi: Send + Sync + 'static { fn send_snapshot(&self, meta: SnapshotMeta) -> oneshot::Receiver; } -impl CEEventTxApi for CEEventTx { - fn send_sp_exe(&self, entry: Arc>) { - let event = CEEvent::SpecExeReady(Arc::clone(&entry)); +impl CEEventTx { + /// Send ce event + fn send_event(&self, event: CEEvent) { if let Err(e) = self.0.send(event) { + if self.1.is_shutdown() { + info!("send event after current node shutdown"); + return; + } error!("failed to send cmd exe event to background cmd worker, {e}"); } } +} + +impl CEEventTxApi for CEEventTx { + fn send_sp_exe(&self, entry: Arc>) { + let event = CEEvent::SpecExeReady(Arc::clone(&entry)); + self.send_event(event); + } fn send_after_sync(&self, entry: Arc>) { let event = CEEvent::ASReady(Arc::clone(&entry)); - if let Err(e) = self.0.send(event) { - error!("failed to send cmd as event to background cmd worker, {e}"); - } + self.send_event(event); } fn send_reset(&self, snapshot: Option) -> oneshot::Receiver<()> { let (tx, rx) = oneshot::channel(); - let msg = CEEvent::Reset(snapshot, tx); - if let Err(e) = self.0.send(msg) { - error!("failed to send reset event to background cmd worker, {e}"); - } + let event = CEEvent::Reset(snapshot, tx); + self.send_event(event); rx } fn send_snapshot(&self, meta: SnapshotMeta) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); - let msg = CEEvent::Snapshot(meta, tx); - if let Err(e) = self.0.send(msg) { - error!("failed to send snapshot event to background cmd worker, {e}"); - } + let event = CEEvent::Snapshot(meta, tx); + self.send_event(event); rx } } diff --git a/utils/src/shutdown.rs b/utils/src/shutdown.rs index f82a827d4..dfbb740e5 100644 --- a/utils/src/shutdown.rs +++ b/utils/src/shutdown.rs @@ -182,7 +182,8 @@ impl Listener { /// Check if the shutdown signal has been sent. #[inline] - pub fn is_shutdown(&mut self) -> bool { + #[must_use] + pub fn is_shutdown(&self) -> bool { !matches!(*self.listener.borrow(), Signal::Running) } }