From 8a4f54e71d1140cf9e9d25f1462e08fd0ba5c2c4 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 5 May 2024 12:07:45 -0700 Subject: [PATCH] feat: Add a force send function Closes #44 by adding a "force_send" method. This method can replace an existing element in the list, in which case that element is returned. This can be used to make "limited capacity" channels. Signed-off-by: John Nunley --- src/lib.rs | 44 ++++++++++++++++++++++++++++++++++++++++++-- tests/bounded.rs | 24 ++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9c9e3b9..2f5ce5d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,11 +44,10 @@ use core::marker::PhantomPinned; use core::pin::Pin; use core::sync::atomic::{AtomicUsize, Ordering}; use core::task::{Context, Poll}; -use core::usize; use alloc::sync::Arc; -use concurrent_queue::{ConcurrentQueue, PopError, PushError}; +use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError}; use event_listener::{Event, EventListener}; use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy}; use futures_core::ready; @@ -286,6 +285,47 @@ impl Sender { self.send(msg).wait() } + /// Forcefully push a message into this channel. + /// + /// If the channel is full, this method will replace an existing message in the + /// channel and return it as `Ok(Some(value))`. If the channel is closed, this + /// method will return an error. + /// + /// # Examples + /// + /// ``` + /// # futures_lite::future::block_on(async { + /// use async_channel::{bounded, SendError}; + /// + /// let (s, r) = bounded(3); + /// + /// assert_eq!(s.send(1).await, Ok(())); + /// assert_eq!(s.send(2).await, Ok(())); + /// assert_eq!(s.force_send(3), Ok(None)); + /// assert_eq!(s.force_send(4), Ok(Some(1))); + /// + /// assert_eq!(r.recv().await, Ok(2)); + /// assert_eq!(r.recv().await, Ok(3)); + /// assert_eq!(r.recv().await, Ok(4)); + /// # }); + /// ``` + pub fn force_send(&self, msg: T) -> Result, SendError> { + match self.channel.queue.force_push(msg) { + Ok(backlog) => { + // Notify a blocked receive operation. If the notified operation gets canceled, + // it will notify another blocked receive operation. + self.channel.recv_ops.notify_additional(1); + + // Notify all blocked streams. + self.channel.stream_ops.notify(usize::MAX); + + Ok(backlog) + } + + Err(ForcePushError(reject)) => Err(SendError(reject)), + } + } + /// Closes the channel. /// /// Returns `true` if this call has closed the channel and it was not closed already. diff --git a/tests/bounded.rs b/tests/bounded.rs index 4b691f7..460cb55 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -184,6 +184,30 @@ fn send() { .run(); } +#[cfg(not(target_family = "wasm"))] +#[test] +fn force_send() { + let (s, r) = bounded(1); + + Parallel::new() + .add(|| { + s.force_send(7).unwrap(); + sleep(ms(1000)); + s.force_send(8).unwrap(); + sleep(ms(1000)); + s.force_send(9).unwrap(); + sleep(ms(1000)); + s.force_send(10).unwrap(); + }) + .add(|| { + sleep(ms(1500)); + assert_eq!(future::block_on(r.recv()), Ok(8)); + assert_eq!(future::block_on(r.recv()), Ok(9)); + assert_eq!(future::block_on(r.recv()), Ok(10)); + }) + .run(); +} + #[cfg(not(target_family = "wasm"))] #[test] fn send_after_close() {