Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Drain all the pending messages in the channel when TracingUnboundedRe…
Browse files Browse the repository at this point in the history
…ceiver is dropped (#13917)

Signed-off-by: linning <linningde25@gmail.com>
  • Loading branch information
NingLin-P committed Apr 14, 2023
1 parent c2f664e commit a9f67d0
Showing 1 changed file with 24 additions and 0 deletions.
24 changes: 24 additions & 0 deletions client/utils/src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ impl<T> TracingUnboundedReceiver<T> {

impl<T> Drop for TracingUnboundedReceiver<T> {
fn drop(&mut self) {
// Close the channel to prevent any further messages to be sent into the channel
self.close();
// the number of messages about to be dropped
let count = self.inner.len();
Expand All @@ -150,6 +151,10 @@ impl<T> Drop for TracingUnboundedReceiver<T> {
.with_label_values(&[self.name, "dropped"])
.inc_by(count.saturated_into());
}
// Drain all the pending messages in the channel since they can never be accessed,
// this can be removed once https://github.com/smol-rs/async-channel/issues/23 is
// resolved
while let Ok(_) = self.inner.try_recv() {}
}
}

Expand Down Expand Up @@ -177,3 +182,22 @@ impl<T> FusedStream for TracingUnboundedReceiver<T> {
self.inner.is_terminated()
}
}

#[cfg(test)]
mod tests {
use super::tracing_unbounded;
use async_channel::{self, RecvError, TryRecvError};

#[test]
fn test_tracing_unbounded_receiver_drop() {
let (tracing_unbounded_sender, tracing_unbounded_receiver) =
tracing_unbounded("test-receiver-drop", 10);
let (tx, rx) = async_channel::unbounded::<usize>();

tracing_unbounded_sender.unbounded_send(tx).unwrap();
drop(tracing_unbounded_receiver);

assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
assert_eq!(rx.recv_blocking(), Err(RecvError));
}
}

0 comments on commit a9f67d0

Please sign in to comment.