Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump to event-listener v3.0.0 #43

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ name = "broadcast_bench"
[features]

[dependencies]
event-listener = "2.5.2"
event-listener = "3"
event-listener-strategy = "0.1.0"
futures-core = "0.3.21"

[dev-dependencies]
Expand Down
133 changes: 107 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@
//! [tbc]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html
//! [tom]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html#lagging
//!
#![forbid(unsafe_code, future_incompatible, rust_2018_idioms)]
#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, rustdoc::missing_doc_code_examples, unreachable_pub)]
#![forbid(unsafe_code)]
#![deny(missing_debug_implementations, nonstandard_style, rust_2018_idioms)]
#![warn(rustdoc::missing_doc_code_examples, unreachable_pub)]

Check warning on line 96 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, nightly)

unknown lint: `rustdoc::missing_doc_code_examples`

Check warning on line 96 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, beta)

unknown lint: `rustdoc::missing_doc_code_examples`

Check warning on line 96 in src/lib.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable)

unknown lint: `rustdoc::missing_doc_code_examples`
#![doc(
html_favicon_url = "https://github.com/raw/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
Expand All @@ -116,6 +116,7 @@
use std::task::{Context, Poll};

use event_listener::{Event, EventListener};
use event_listener_strategy::{easy_wrapper, EventListenerFuture};
use futures_core::{ready, stream::Stream};

/// Create a new broadcast channel.
Expand Down Expand Up @@ -649,6 +650,10 @@
///
/// If the channel is closed, this method returns an error.
///
/// The future returned by this function is pinned to the heap. If the future being `Unpin` is
/// not important to you, or if you just `.await` this future, use the [`broadcast_direct`] method
/// instead.
///
/// # Examples
///
/// ```
Expand All @@ -662,12 +667,35 @@
/// assert_eq!(s.broadcast(2).await, Err(SendError(2)));
/// # });
/// ```
pub fn broadcast(&self, msg: T) -> Send<'_, T> {
Send {
pub fn broadcast(&self, msg: T) -> Pin<Box<Send<'_, T>>> {
Box::pin(self.broadcast_direct(msg))
}

/// Broadcasts a message on the channel without pinning the future to the heap.
///
/// The future returned by this method is not `Unpin` and must be pinned before use. This is
/// the desired behavior if you just `.await` on the future. For other uses cases, use the
zeenix marked this conversation as resolved.
Show resolved Hide resolved
/// [`broadcast`] method instead.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_broadcast::{broadcast, SendError};
///
/// let (s, r) = broadcast(1);
///
/// assert_eq!(s.broadcast_direct(1).await, Ok(None));
/// drop(r);
/// assert_eq!(s.broadcast_direct(2).await, Err(SendError(2)));
/// # });
/// ```
pub fn broadcast_direct(&self, msg: T) -> Send<'_, T> {
Send::_new(SendInner {
sender: self,
listener: None,
msg: Some(msg),
}
})
}

/// Attempts to broadcast a message on the channel.
Expand Down Expand Up @@ -757,7 +785,7 @@
pos: u64,

/// Listens for a send or close event to unblock this stream.
listener: Option<EventListener>,
listener: Option<Pin<Box<EventListener>>>,
}

impl<T> Receiver<T> {
Expand Down Expand Up @@ -1103,6 +1131,10 @@
/// this method returns an error and readjusts its cursor to point to the first available
/// message.
///
/// The future returned by this function is pinned to the heap. If the future being `Unpin` is
/// not important to you, or if you just `.await` this future, use the [`recv_direct`] method
/// instead.
///
/// # Examples
///
/// ```
Expand All @@ -1121,11 +1153,39 @@
/// assert_eq!(r2.recv().await, Err(RecvError::Closed));
/// # });
/// ```
pub fn recv(&mut self) -> Recv<'_, T> {
Recv {
pub fn recv(&mut self) -> Pin<Box<Recv<'_, T>>> {
Box::pin(self.recv_direct())
}

/// Receives a message from the channel without pinning the future to the heap.
///
/// The future returned by this method is not `Unpin` and must be pinned before use. This is
/// the desired behavior if you just `.await` on the future. For other uses cases, use the
/// [`recv`] method instead.
///
/// # Examples
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_broadcast::{broadcast, RecvError};
///
/// let (s, mut r1) = broadcast(1);
/// let mut r2 = r1.clone();
///
/// assert_eq!(s.broadcast(1).await, Ok(None));
/// drop(s);
///
/// assert_eq!(r1.recv_direct().await, Ok(1));
/// assert_eq!(r1.recv_direct().await, Err(RecvError::Closed));
/// assert_eq!(r2.recv_direct().await, Ok(1));
/// assert_eq!(r2.recv_direct().await, Err(RecvError::Closed));
/// # });
/// ```
pub fn recv_direct(&mut self) -> Recv<'_, T> {
Recv::_new(RecvInner {
receiver: self,
listener: None,
}
})
}

/// Attempts to receive a message from the channel.
Expand Down Expand Up @@ -1528,21 +1588,32 @@
}
}

/// A future returned by [`Sender::broadcast()`].
easy_wrapper! {
/// A future returned by [`Sender::broadcast()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Send<'a, T: Clone>(SendInner<'a, T> => Result<Option<T>, SendError<T>>);
pub(crate) wait();
}

#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Send<'a, T> {
struct SendInner<'a, T> {
sender: &'a Sender<T>,
listener: Option<EventListener>,
// TODO: Remove the Pin<Box<>> at the next breaking release and make this type !Unpin
zeenix marked this conversation as resolved.
Show resolved Hide resolved
listener: Option<Pin<Box<EventListener>>>,
msg: Option<T>,
}

impl<'a, T> Unpin for Send<'a, T> {}
impl<'a, T> Unpin for SendInner<'a, T> {}

impl<'a, T: Clone> Future for Send<'a, T> {
impl<'a, T: Clone> EventListenerFuture for SendInner<'a, T> {
type Output = Result<Option<T>, SendError<T>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
self: Pin<&'x mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
let mut this = Pin::new(self);

loop {
Expand Down Expand Up @@ -1578,28 +1649,38 @@
}
Some(l) => {
// Wait for a notification.
ready!(Pin::new(l).poll(cx));
ready!(strategy.poll(l.as_mut(), context));
this.listener = None;
}
}
}
}
}

/// A future returned by [`Receiver::recv()`].
easy_wrapper! {
/// A future returned by [`Receiver::recv()`].
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Recv<'a, T: Clone>(RecvInner<'a, T> => Result<T, RecvError>);
pub(crate) wait();
}

#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Recv<'a, T> {
struct RecvInner<'a, T> {
receiver: &'a mut Receiver<T>,
listener: Option<EventListener>,
listener: Option<Pin<Box<EventListener>>>,
}

impl<'a, T> Unpin for Recv<'a, T> {}
impl<'a, T> Unpin for RecvInner<'a, T> {}

impl<'a, T: Clone> Future for Recv<'a, T> {
impl<'a, T: Clone> EventListenerFuture for RecvInner<'a, T> {
type Output = Result<T, RecvError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>(
self: Pin<&'x mut Self>,
strategy: &mut S,
context: &mut S::Context,
) -> Poll<Self::Output> {
let mut this = Pin::new(self);

loop {
Expand All @@ -1624,7 +1705,7 @@
}
Some(l) => {
// Wait for a notification.
ready!(Pin::new(l).poll(cx));
ready!(strategy.poll(l.as_mut(), context));
this.listener = None;
}
}
Expand Down
Loading