diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d62df86..8b55472 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,6 +42,13 @@ jobs: if: startsWith(matrix.rust, 'nightly') run: cargo check -Z features=dev_dep - run: cargo test + - run: cargo test --no-default-features + - name: Install cargo-hack + uses: taiki-e/install-action@cargo-hack + - run: rustup target add thumbv7m-none-eabi + - name: Run cargo check (without dev-dependencies to catch missing feature flags) + run: cargo hack build --all --no-dev-deps + - run: cargo hack build --all --target thumbv7m-none-eabi --no-default-features --no-dev-deps msrv: runs-on: ubuntu-latest @@ -49,12 +56,13 @@ jobs: matrix: # When updating this, the reminder to update the minimum supported # Rust version in Cargo.toml. - rust: ['1.45'] + rust: ['1.59'] steps: - uses: actions/checkout@v3 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - run: cargo build + - run: cargo build --no-default-features clippy: runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index 5c4eea8..ed92ba1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ name = "async-channel" version = "1.9.0" authors = ["Stjepan Glavina "] edition = "2018" -rust-version = "1.45" +rust-version = "1.59" description = "Async multi-producer multi-consumer channel" license = "Apache-2.0 OR MIT" repository = "https://github.com/smol-rs/async-channel" @@ -15,10 +15,16 @@ categories = ["asynchronous", "concurrency"] exclude = ["/.*"] [dependencies] -concurrent-queue = "2" -event-listener = "2.4.0" -futures-core = "0.3.5" +concurrent-queue = { version = "2", default-features = false } +event-listener = { version = "3.0.0", default-features = false } +event-listener-strategy = { version = "0.2.0", default-features = false } +futures-core = { version = "0.3.5", default-features = false } +pin-project-lite = "0.2.11" [dev-dependencies] easy-parallel = "3" futures-lite = "1" + +[features] +default = ["std"] +std = ["concurrent-queue/std", "event-listener/std", "event-listener-strategy/std"] diff --git a/src/lib.rs b/src/lib.rs index a92f38b..0820a88 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ //! # }); //! ``` +#![cfg_attr(not(feature = "std"), no_std)] #![forbid(unsafe_code)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![doc( @@ -35,18 +36,21 @@ html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] -use std::error; -use std::fmt; -use std::future::Future; -use std::pin::Pin; -use std::process; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::usize; +extern crate alloc; + +use core::fmt; +use core::future::Future; +use core::pin::Pin; +use core::sync::atomic::{AtomicUsize, Ordering}; +use core::task::{Context, Poll}; +use core::usize; + +use alloc::sync::Arc; use concurrent_queue::{ConcurrentQueue, PopError, PushError}; use event_listener::{Event, EventListener}; +use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy}; +use futures_core::ready; use futures_core::stream::Stream; struct Channel { @@ -128,8 +132,8 @@ pub fn bounded(cap: usize) -> (Sender, Receiver) { channel: channel.clone(), }; let r = Receiver { + listener: EventListener::new(&channel.stream_ops), channel, - listener: None, }; (s, r) } @@ -168,8 +172,8 @@ pub fn unbounded() -> (Sender, Receiver) { channel: channel.clone(), }; let r = Receiver { + listener: EventListener::new(&channel.stream_ops), channel, - listener: None, }; (s, r) } @@ -240,11 +244,11 @@ impl Sender { /// # }); /// ``` pub fn send(&self, msg: T) -> Send<'_, T> { - Send { + Send::_new(SendInner { sender: self, - listener: None, msg: Some(msg), - } + listener: EventListener::new(&self.channel.send_ops), + }) } /// Sends a message into this channel using the blocking strategy. @@ -272,6 +276,7 @@ impl Sender { /// drop(r); /// assert_eq!(s.send_blocking(2), Err(SendError(2))); /// ``` + #[cfg(feature = "std")] pub fn send_blocking(&self, msg: T) -> Result<(), SendError> { self.send(msg).wait() } @@ -463,7 +468,7 @@ impl Clone for Sender { // Make sure the count never overflows, even if lots of sender clones are leaked. if count > usize::MAX / 2 { - process::abort(); + abort(); } Sender { @@ -472,20 +477,34 @@ impl Clone for Sender { } } -/// The receiving side of a channel. -/// -/// Receivers can be cloned and shared among threads. When all receivers associated with a channel -/// are dropped, the channel becomes closed. -/// -/// The channel can also be closed manually by calling [`Receiver::close()`]. -/// -/// Receivers implement the [`Stream`] trait. -pub struct Receiver { - /// Inner channel state. - channel: Arc>, +pin_project_lite::pin_project! { + /// The receiving side of a channel. + /// + /// Receivers can be cloned and shared among threads. When all receivers associated with a channel + /// are dropped, the channel becomes closed. + /// + /// The channel can also be closed manually by calling [`Receiver::close()`]. + /// + /// Receivers implement the [`Stream`] trait. + pub struct Receiver { + // Inner channel state. + channel: Arc>, + + // Listens for a send or close event to unblock this stream. + #[pin] + listener: EventListener, + } - /// Listens for a send or close event to unblock this stream. - listener: Option, + impl PinnedDrop for Receiver { + fn drop(this: Pin<&mut Self>) { + let this = this.project(); + + // Decrement the receiver count and close the channel if it drops down to zero. + if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { + this.channel.close(); + } + } + } } impl Receiver { @@ -546,10 +565,10 @@ impl Receiver { /// # }); /// ``` pub fn recv(&self) -> Recv<'_, T> { - Recv { + Recv::_new(RecvInner { receiver: self, - listener: None, - } + listener: EventListener::new(&self.channel.recv_ops), + }) } /// Receives a message from the channel using the blocking strategy. @@ -580,6 +599,7 @@ impl Receiver { /// assert_eq!(r.recv_blocking(), Ok(1)); /// assert_eq!(r.recv_blocking(), Err(RecvError)); /// ``` + #[cfg(feature = "std")] pub fn recv_blocking(&self) -> Result { self.recv().wait() } @@ -750,15 +770,6 @@ impl Receiver { } } -impl Drop for Receiver { - fn drop(&mut self) { - // Decrement the receiver count and close the channel if it drops down to zero. - if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 { - self.channel.close(); - } - } -} - impl fmt::Debug for Receiver { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Receiver {{ .. }}") @@ -771,12 +782,12 @@ impl Clone for Receiver { // Make sure the count never overflows, even if lots of receiver clones are leaked. if count > usize::MAX / 2 { - process::abort(); + abort(); } Receiver { channel: self.channel.clone(), - listener: None, + listener: EventListener::new(&self.channel.stream_ops), } } } @@ -787,9 +798,11 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { // If this stream is listening for events, first wait for a notification. - if let Some(listener) = self.listener.as_mut() { - futures_core::ready!(Pin::new(listener).poll(cx)); - self.listener = None; + { + let this = self.as_mut().project(); + if this.listener.is_listening() { + ready!(this.listener.poll(cx)); + } } loop { @@ -797,27 +810,30 @@ impl Stream for Receiver { match self.try_recv() { Ok(msg) => { // The stream is not blocked on an event - drop the listener. - self.listener = None; + let mut this = self.project(); + this.listener + .as_mut() + .set(EventListener::new(&this.channel.stream_ops)); return Poll::Ready(Some(msg)); } Err(TryRecvError::Closed) => { // The stream is not blocked on an event - drop the listener. - self.listener = None; + let mut this = self.project(); + this.listener + .as_mut() + .set(EventListener::new(&this.channel.stream_ops)); return Poll::Ready(None); } Err(TryRecvError::Empty) => {} } // Receiving failed - now start listening for notifications or wait for one. - match self.listener.as_mut() { - None => { - // Create a listener and try sending the message again. - self.listener = Some(self.channel.stream_ops.listen()); - } - Some(_) => { - // Go back to the outer loop to poll the listener. - break; - } + let mut this = self.as_mut().project(); + if this.listener.is_listening() { + // Go back to the outer loop to wait for a notification. + break; + } else { + this.listener.as_mut().listen(); } } } @@ -852,7 +868,7 @@ impl WeakSender { Err(_) => None, Ok(new_value) if new_value > usize::MAX / 2 => { // Make sure the count never overflows, even if lots of sender clones are leaked. - process::abort(); + abort(); } Ok(_) => Some(Sender { channel: self.channel.clone(), @@ -898,11 +914,11 @@ impl WeakReceiver { Err(_) => None, Ok(new_value) if new_value > usize::MAX / 2 => { // Make sure the count never overflows, even if lots of receiver clones are leaked. - process::abort(); + abort(); } Ok(_) => Some(Receiver { channel: self.channel.clone(), - listener: None, + listener: EventListener::new(&self.channel.stream_ops), }), } } @@ -936,7 +952,8 @@ impl SendError { } } -impl error::Error for SendError {} +#[cfg(feature = "std")] +impl std::error::Error for SendError {} impl fmt::Debug for SendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -986,7 +1003,8 @@ impl TrySendError { } } -impl error::Error for TrySendError {} +#[cfg(feature = "std")] +impl std::error::Error for TrySendError {} impl fmt::Debug for TrySendError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -1012,7 +1030,8 @@ impl fmt::Display for TrySendError { #[derive(PartialEq, Eq, Clone, Copy, Debug)] pub struct RecvError; -impl error::Error for RecvError {} +#[cfg(feature = "std")] +impl std::error::Error for RecvError {} impl fmt::Display for RecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -1048,7 +1067,8 @@ impl TryRecvError { } } -impl error::Error for TryRecvError {} +#[cfg(feature = "std")] +impl std::error::Error for TryRecvError {} impl fmt::Display for TryRecvError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -1059,158 +1079,117 @@ impl fmt::Display for TryRecvError { } } -/// A future returned by [`Sender::send()`]. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Send<'a, T> { - sender: &'a Sender, - listener: Option, - msg: Option, +easy_wrapper! { + /// A future returned by [`Sender::send()`]. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Send<'a, T>(SendInner<'a, T> => Result<(), SendError>); + #[cfg(feature = "std")] + pub(crate) wait(); +} + +pin_project_lite::pin_project! { + #[derive(Debug)] + struct SendInner<'a, T> { + sender: &'a Sender, + msg: Option, + #[pin] + listener: EventListener, + } } -impl<'a, T> Send<'a, T> { +impl<'a, T> EventListenerFuture for SendInner<'a, T> { + type Output = Result<(), SendError>; + /// Run this future with the given `Strategy`. - fn run_with_strategy( - &mut self, - cx: &mut S::Context, + fn poll_with_strategy<'x, S: Strategy<'x>>( + self: Pin<&'x mut Self>, + strategy: &mut S, + context: &mut S::Context, ) -> Poll>> { + let mut this = self.project(); + loop { - let msg = self.msg.take().unwrap(); + let msg = this.msg.take().unwrap(); // Attempt to send a message. - match self.sender.try_send(msg) { + match this.sender.try_send(msg) { Ok(()) => return Poll::Ready(Ok(())), Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))), - Err(TrySendError::Full(m)) => self.msg = Some(m), + Err(TrySendError::Full(m)) => *this.msg = Some(m), } // Sending failed - now start listening for notifications or wait for one. - match self.listener.take() { - None => { - // Start listening and then try sending again. - self.listener = Some(self.sender.channel.send_ops.listen()); - } - Some(l) => { - // Poll using the given strategy - if let Err(l) = S::poll(l, cx) { - self.listener = Some(l); - return Poll::Pending; - } - } + if this.listener.is_listening() { + // Poll using the given strategy + ready!(S::poll(strategy, this.listener.as_mut(), context)); + } else { + this.listener.as_mut().listen(); } } } - - /// Run using the blocking strategy. - fn wait(mut self) -> Result<(), SendError> { - match self.run_with_strategy::(&mut ()) { - Poll::Ready(res) => res, - Poll::Pending => unreachable!(), - } - } } -impl<'a, T> Unpin for Send<'a, T> {} - -impl<'a, T> Future for Send<'a, T> { - type Output = Result<(), SendError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.run_with_strategy::>(cx) - } +easy_wrapper! { + /// A future returned by [`Receiver::recv()`]. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Recv<'a, T>(RecvInner<'a, T> => Result); + #[cfg(feature = "std")] + pub(crate) wait(); } -/// A future returned by [`Receiver::recv()`]. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Recv<'a, T> { - receiver: &'a Receiver, - listener: Option, +pin_project_lite::pin_project! { + #[derive(Debug)] + struct RecvInner<'a, T> { + receiver: &'a Receiver, + #[pin] + listener: EventListener, + } } -impl<'a, T> Unpin for Recv<'a, T> {} +impl<'a, T> EventListenerFuture for RecvInner<'a, T> { + type Output = Result; -impl<'a, T> Recv<'a, T> { /// Run this future with the given `Strategy`. - fn run_with_strategy( - &mut self, + fn poll_with_strategy<'x, S: Strategy<'x>>( + self: Pin<&'x mut Self>, + strategy: &mut S, cx: &mut S::Context, ) -> Poll> { + let mut this = self.project(); + loop { // Attempt to receive a message. - match self.receiver.try_recv() { + match this.receiver.try_recv() { Ok(msg) => return Poll::Ready(Ok(msg)), Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)), Err(TryRecvError::Empty) => {} } // Receiving failed - now start listening for notifications or wait for one. - match self.listener.take() { - None => { - // Start listening and then try receiving again. - self.listener = Some(self.receiver.channel.recv_ops.listen()); - } - Some(l) => { - // Poll using the given strategy. - if let Err(l) = S::poll(l, cx) { - self.listener = Some(l); - return Poll::Pending; - } - } + if this.listener.is_listening() { + // Poll using the given strategy + ready!(S::poll(strategy, this.listener.as_mut(), cx)); + } else { + this.listener.as_mut().listen(); } } } - - /// Run with the blocking strategy. - fn wait(mut self) -> Result { - match self.run_with_strategy::(&mut ()) { - Poll::Ready(res) => res, - Poll::Pending => unreachable!(), - } - } -} - -impl<'a, T> Future for Recv<'a, T> { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.run_with_strategy::>(cx) - } } -/// A strategy used to poll an `EventListener`. -trait Strategy { - /// Context needed to be provided to the `poll` method. - type Context; +#[cfg(feature = "std")] +use std::process::abort; - /// Polls the given `EventListener`. - /// - /// Returns the `EventListener` back if it was not completed; otherwise, - /// returns `Ok(())`. - fn poll(evl: EventListener, cx: &mut Self::Context) -> Result<(), EventListener>; -} - -/// Non-blocking strategy for use in asynchronous code. -struct NonBlocking<'a>(&'a mut ()); - -impl<'a> Strategy for NonBlocking<'a> { - type Context = Context<'a>; +#[cfg(not(feature = "std"))] +fn abort() -> ! { + struct PanicOnDrop; - fn poll(mut evl: EventListener, cx: &mut Context<'a>) -> Result<(), EventListener> { - match Pin::new(&mut evl).poll(cx) { - Poll::Ready(()) => Ok(()), - Poll::Pending => Err(evl), + impl Drop for PanicOnDrop { + fn drop(&mut self) { + panic!("Panic while panicking to abort"); } } -} - -/// Blocking strategy for use in synchronous code. -struct Blocking; -impl Strategy for Blocking { - type Context = (); - - fn poll(evl: EventListener, _cx: &mut ()) -> Result<(), EventListener> { - evl.wait(); - Ok(()) - } + let _bomb = PanicOnDrop; + panic!("Panic while panicking to abort") } diff --git a/tests/bounded.rs b/tests/bounded.rs index 0ae4890..b525dfb 100644 --- a/tests/bounded.rs +++ b/tests/bounded.rs @@ -25,6 +25,7 @@ fn smoke() { assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); } +#[cfg(feature = "std")] #[test] fn smoke_blocking() { let (s, r) = bounded(1); @@ -332,9 +333,10 @@ fn forget_blocked_sender() { .add(move || { assert!(future::block_on(s1.send(3)).is_ok()); assert!(future::block_on(s1.send(7)).is_ok()); - let mut s1_fut = s1.send(13); + let s1_fut = s1.send(13); + futures_lite::pin!(s1_fut); // Poll but keep the future alive. - assert_eq!(future::block_on(future::poll_once(&mut s1_fut)), None); + assert_eq!(future::block_on(future::poll_once(s1_fut)), None); sleep(ms(500)); }) .add(move || { @@ -358,8 +360,9 @@ fn forget_blocked_receiver() { Parallel::new() .add(move || { - let mut r1_fut = r1.recv(); + let r1_fut = r1.recv(); // Poll but keep the future alive. + futures_lite::pin!(r1_fut); assert_eq!(future::block_on(future::poll_once(&mut r1_fut)), None); sleep(ms(500)); }) @@ -436,8 +439,9 @@ fn mpmc_stream() { Parallel::new() .each(0..THREADS, { - let mut r = r; + let r = r; move |_| { + futures_lite::pin!(r); for _ in 0..COUNT { let n = future::block_on(r.next()).unwrap(); v[n].fetch_add(1, Ordering::SeqCst); @@ -456,6 +460,7 @@ fn mpmc_stream() { } } +#[cfg(feature = "std")] #[test] fn weak() { let (s, r) = bounded::(3); diff --git a/tests/unbounded.rs b/tests/unbounded.rs index e239d34..90395a3 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -24,6 +24,7 @@ fn smoke() { assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); } +#[cfg(feature = "std")] #[test] fn smoke_blocking() { let (s, r) = unbounded(); @@ -295,8 +296,9 @@ fn mpmc_stream() { Parallel::new() .each(0..THREADS, { - let mut r = r.clone(); + let r = r.clone(); move |_| { + futures_lite::pin!(r); for _ in 0..COUNT { let n = future::block_on(r.next()).unwrap(); v[n].fetch_add(1, Ordering::SeqCst); @@ -317,6 +319,7 @@ fn mpmc_stream() { } } +#[cfg(feature = "std")] #[test] fn weak() { let (s, r) = unbounded::();