diff --git a/examples/mutex.rs b/examples/mutex.rs index 3faa341..550bed0 100644 --- a/examples/mutex.rs +++ b/examples/mutex.rs @@ -90,9 +90,7 @@ impl Mutex { } Some(mut l) => { // Wait until a notification is received. - if !l.as_mut().wait_deadline(deadline) { - return None; - } + l.as_mut().wait_deadline(deadline)?; } } } diff --git a/src/lib.rs b/src/lib.rs index b393970..ebdf9f1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -97,7 +97,12 @@ use std::time::{Duration, Instant}; use sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; use sync::{Arc, WithMut}; -pub use notify::{IntoNotification, Notification, Notify, NotifyAdditional, Tag, TagWith}; +pub use notify::{Additional, IntoNotification, Notification, Notify, Tag, TagWith}; + +/// Useful trait for listeners. +pub mod prelude { + pub use crate::{IntoNotification, Notification}; +} /// 1.39-compatible replacement for `matches!` macro_rules! matches { @@ -110,7 +115,7 @@ macro_rules! matches { } /// Inner state of [`Event`]. -struct Inner { +struct Inner { /// The number of notified entries, or `usize::MAX` if all of them have been notified. /// /// If there are no entries, this value is set to `usize::MAX`. @@ -121,10 +126,10 @@ struct Inner { /// On `std` platforms, this is an intrusive linked list. On `no_std` platforms, this is a /// more traditional `Vec` of listeners, with an atomic queue used as a backup for high /// contention. - list: sys::List, + list: sys::List, } -impl Inner { +impl Inner { fn new() -> Self { Self { notified: AtomicUsize::new(core::usize::MAX), @@ -152,48 +157,48 @@ impl Inner { /// kind of notification was delivered. /// /// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness. -pub struct Event { +pub struct Event { /// A pointer to heap-allocated inner state. /// /// This pointer is initially null and gets lazily initialized on first use. Semantically, it /// is an `Arc` so it's important to keep in mind that it contributes to the [`Arc`]'s /// reference count. - inner: AtomicPtr, + inner: AtomicPtr>, } -unsafe impl Send for Event {} -unsafe impl Sync for Event {} +unsafe impl Send for Event {} +unsafe impl Sync for Event {} #[cfg(feature = "std")] -impl std::panic::UnwindSafe for Event {} +impl std::panic::UnwindSafe for Event {} #[cfg(feature = "std")] -impl std::panic::RefUnwindSafe for Event {} +impl std::panic::RefUnwindSafe for Event {} -impl fmt::Debug for Event { +impl fmt::Debug for Event { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("Pad { .. }") + f.write_str("Event { .. }") } } -impl Default for Event { +impl Default for Event { #[inline] fn default() -> Self { - Self::new() + Self::with_tag() } } -impl Event { - /// Creates a new [`Event`]. +impl Event { + /// Creates a new `Event` with a tag type. /// /// # Examples /// /// ``` /// use event_listener::Event; /// - /// let event = Event::new(); + /// let event = Event::::with_tag(); /// ``` #[inline] - pub const fn new() -> Self { + pub const fn with_tag() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), } @@ -214,7 +219,7 @@ impl Event { /// let listener = event.listen(); /// ``` #[cold] - pub fn listen(&self) -> Pin> { + pub fn listen(&self) -> Pin>> { let mut listener = Box::pin(EventListener::new(self)); listener.as_mut().listen(); listener @@ -224,13 +229,16 @@ impl Event { /// /// The number is allowed to be zero or exceed the current number of listeners. /// - /// In contrast to [`Event::notify_additional()`], this method only makes sure *at least* `n` - /// listeners among the active ones are notified. + /// The [`Notification`] trait is used to define what kind of notification is delivered. + /// The default implementation (implemented on `usize`) is a notification that only notifies + /// *at least* the specified number of listeners. /// - /// This method emits a `SeqCst` fence before notifying listeners. + /// In certain cases, this function emits a `SeqCst` fence before notifying listeners. /// /// # Examples /// + /// Use the default notification strategy: + /// /// ``` /// use event_listener::Event; /// @@ -249,20 +257,169 @@ impl Event { /// // get notified here since they start listening before `listener3`. /// event.notify(2); /// ``` + /// + /// Notify without emitting a `SeqCst` fence. This uses the [`relaxed`] notification strategy. + /// This is equivalent to calling [`Event::notify_relaxed()`]. + /// + /// [`relaxed`]: IntoNotification::relaxed + /// + /// ``` + /// use event_listener::{prelude::*, Event}; + /// use std::sync::atomic::{self, Ordering}; + /// + /// let event = Event::new(); + /// + /// // This notification gets lost because there are no listeners. + /// event.notify(1.relaxed()); + /// + /// let listener1 = event.listen(); + /// let listener2 = event.listen(); + /// let listener3 = event.listen(); + /// + /// // We should emit a fence manually when using relaxed notifications. + /// atomic::fence(Ordering::SeqCst); + /// + /// // Notifies two listeners. + /// // + /// // Listener queueing is fair, which means `listener1` and `listener2` + /// // get notified here since they start listening before `listener3`. + /// event.notify(2.relaxed()); + /// ``` + /// + /// Notify additional listeners. In contrast to [`Event::notify()`], this method will notify `n` + /// *additional* listeners that were previously unnotified. This uses the [`additional`] + /// notification strategy. This is equivalent to calling [`Event::notify_additional()`]. + /// + /// [`additional`]: IntoNotification::additional + /// + /// ``` + /// use event_listener::{prelude::*, Event}; + /// + /// let event = Event::new(); + /// + /// // This notification gets lost because there are no listeners. + /// event.notify(1.additional()); + /// + /// let listener1 = event.listen(); + /// let listener2 = event.listen(); + /// let listener3 = event.listen(); + /// + /// // Notifies two listeners. + /// // + /// // Listener queueing is fair, which means `listener1` and `listener2` + /// // get notified here since they start listening before `listener3`. + /// event.notify(1.additional()); + /// event.notify(1.additional()); + /// ``` + /// + /// Notifies with the [`additional`] and [`relaxed`] strategies at the same time. This is + /// equivalent to calling [`Event::notify_additional_relaxed()`]. + /// + /// ``` + /// use event_listener::{prelude::*, Event}; + /// use std::sync::atomic::{self, Ordering}; + /// + /// let event = Event::new(); + /// + /// // This notification gets lost because there are no listeners. + /// event.notify(1.additional().relaxed()); + /// + /// let listener1 = event.listen(); + /// let listener2 = event.listen(); + /// let listener3 = event.listen(); + /// + /// // We should emit a fence manually when using relaxed notifications. + /// atomic::fence(Ordering::SeqCst); + /// + /// // Notifies two listeners. + /// // + /// // Listener queueing is fair, which means `listener1` and `listener2` + /// // get notified here since they start listening before `listener3`. + /// event.notify(1.additional().relaxed()); + /// event.notify(1.additional().relaxed()); + /// ``` #[inline] - pub fn notify(&self, n: usize) { + pub fn notify(&self, notify: impl IntoNotification) { + let notify = notify.into_notification(); + // Make sure the notification comes after whatever triggered it. - notify::full_fence(); + notify.fence(); if let Some(inner) = self.try_inner() { + let limit = if notify.is_additional() { + usize::MAX + } else { + notify.count() + }; + // Notify if there is at least one unnotified listener and the number of notified - // listeners is less than `n`. - if inner.notified.load(Ordering::Acquire) < n { - inner.notify(n, false); + // listeners is less than `limit`. + if inner.notified.load(Ordering::Acquire) < limit { + inner.notify(notify); } } } + /// Return a reference to the inner state if it has been initialized. + #[inline] + fn try_inner(&self) -> Option<&Inner> { + let inner = self.inner.load(Ordering::Acquire); + unsafe { inner.as_ref() } + } + + /// Returns a raw, initialized pointer to the inner state. + /// + /// This returns a raw pointer instead of reference because `from_raw` + /// requires raw/mut provenance: . + fn inner(&self) -> *const Inner { + let mut inner = self.inner.load(Ordering::Acquire); + + // If this is the first use, initialize the state. + if inner.is_null() { + // Allocate the state on the heap. + let new = Arc::new(Inner::::new()); + + // Convert the state to a raw pointer. + let new = Arc::into_raw(new) as *mut Inner; + + // Replace the null pointer with the new state pointer. + inner = self + .inner + .compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire) + .unwrap_or_else(|x| x); + + // Check if the old pointer value was indeed null. + if inner.is_null() { + // If yes, then use the new state pointer. + inner = new; + } else { + // If not, that means a concurrent operation has initialized the state. + // In that case, use the old pointer and deallocate the new one. + unsafe { + drop(Arc::from_raw(new)); + } + } + } + + inner + } +} + +impl Event<()> { + /// Creates a new [`Event`]. + /// + /// # Examples + /// + /// ``` + /// use event_listener::Event; + /// + /// let event = Event::new(); + /// ``` + #[inline] + pub const fn new() -> Self { + Self::with_tag() + } + /// Notifies a number of active listeners without emitting a `SeqCst` fence. /// /// The number is allowed to be zero or exceed the current number of listeners. @@ -272,6 +429,20 @@ impl Event { /// /// Unlike [`Event::notify()`], this method does not emit a `SeqCst` fence. /// + /// This method only works for untagged events. In other cases, it is recommended to instead + /// use [`Event::notify()`] like so: + /// + /// ``` + /// use event_listener::{prelude::*, Event}; + /// let event = Event::new(); + /// + /// // Old way: + /// event.notify_relaxed(1); + /// + /// // New way: + /// event.notify(1.relaxed()); + /// ``` + /// /// # Examples /// /// ``` @@ -281,7 +452,7 @@ impl Event { /// let event = Event::new(); /// /// // This notification gets lost because there are no listeners. - /// event.notify(1); + /// event.notify_relaxed(1); /// /// let listener1 = event.listen(); /// let listener2 = event.listen(); @@ -294,17 +465,11 @@ impl Event { /// // /// // Listener queueing is fair, which means `listener1` and `listener2` /// // get notified here since they start listening before `listener3`. - /// event.notify(2); + /// event.notify_relaxed(2); /// ``` #[inline] pub fn notify_relaxed(&self, n: usize) { - if let Some(inner) = self.try_inner() { - // Notify if there is at least one unnotified listener and the number of notified - // listeners is less than `n`. - if inner.notified.load(Ordering::Acquire) < n { - inner.notify(n, true); - } - } + self.notify(n.relaxed()) } /// Notifies a number of active and still unnotified listeners. @@ -316,6 +481,20 @@ impl Event { /// /// This method emits a `SeqCst` fence before notifying listeners. /// + /// This method only works for untagged events. In other cases, it is recommended to instead + /// use [`Event::notify()`] like so: + /// + /// ``` + /// use event_listener::{prelude::*, Event}; + /// let event = Event::new(); + /// + /// // Old way: + /// event.notify_additional(1); + /// + /// // New way: + /// event.notify(1.additional()); + /// ``` + /// /// # Examples /// /// ``` @@ -324,7 +503,7 @@ impl Event { /// let event = Event::new(); /// /// // This notification gets lost because there are no listeners. - /// event.notify(1); + /// event.notify_additional(1); /// /// let listener1 = event.listen(); /// let listener2 = event.listen(); @@ -339,15 +518,7 @@ impl Event { /// ``` #[inline] pub fn notify_additional(&self, n: usize) { - // Make sure the notification comes after whatever triggered it. - notify::full_fence(); - - if let Some(inner) = self.try_inner() { - // Notify if there is at least one unnotified listener. - if inner.notified.load(Ordering::Acquire) < core::usize::MAX { - inner.notify(n, true); - } - } + self.notify(n.additional()) } /// Notifies a number of active and still unnotified listeners without emitting a `SeqCst` @@ -360,6 +531,20 @@ impl Event { /// /// Unlike [`Event::notify_additional()`], this method does not emit a `SeqCst` fence. /// + /// This method only works for untagged events. In other cases, it is recommended to instead + /// use [`Event::notify()`] like so: + /// + /// ``` + /// use event_listener::{prelude::*, Event}; + /// let event = Event::new(); + /// + /// // Old way: + /// event.notify_additional_relaxed(1); + /// + /// // New way: + /// event.notify(1.additional().relaxed()); + /// ``` + /// /// # Examples /// /// ``` @@ -387,60 +572,11 @@ impl Event { /// ``` #[inline] pub fn notify_additional_relaxed(&self, n: usize) { - if let Some(inner) = self.try_inner() { - // Notify if there is at least one unnotified listener. - if inner.notified.load(Ordering::Acquire) < core::usize::MAX { - inner.notify(n, true); - } - } - } - - /// Return a reference to the inner state if it has been initialized. - #[inline] - fn try_inner(&self) -> Option<&Inner> { - let inner = self.inner.load(Ordering::Acquire); - unsafe { inner.as_ref() } - } - - /// Returns a raw, initialized pointer to the inner state. - /// - /// This returns a raw pointer instead of reference because `from_raw` - /// requires raw/mut provenance: . - fn inner(&self) -> *const Inner { - let mut inner = self.inner.load(Ordering::Acquire); - - // If this is the first use, initialize the state. - if inner.is_null() { - // Allocate the state on the heap. - let new = Arc::new(Inner::new()); - - // Convert the state to a raw pointer. - let new = Arc::into_raw(new) as *mut Inner; - - // Replace the null pointer with the new state pointer. - inner = self - .inner - .compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire) - .unwrap_or_else(|x| x); - - // Check if the old pointer value was indeed null. - if inner.is_null() { - // If yes, then use the new state pointer. - inner = new; - } else { - // If not, that means a concurrent operation has initialized the state. - // In that case, use the old pointer and deallocate the new one. - unsafe { - drop(Arc::from_raw(new)); - } - } - } - - inner + self.notify(n.additional().relaxed()) } } -impl Drop for Event { +impl Drop for Event { #[inline] fn drop(&mut self) { self.inner.with_mut(|&mut inner| { @@ -464,17 +600,17 @@ impl Drop for Event { /// If a notified listener is dropped without receiving a notification, dropping will notify /// another active listener. Whether one *additional* listener will be notified depends on what /// kind of notification was delivered. -pub struct EventListener(Listener>); +pub struct EventListener(Listener>>); -impl fmt::Debug for EventListener { +impl fmt::Debug for EventListener { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("EventListener { .. }") } } -impl EventListener { +impl EventListener { /// Create a new `EventListener` that will wait for a notification from the given [`Event`]. - pub fn new(event: &Event) -> Self { + pub fn new(event: &Event) -> Self { let inner = event.inner(); let listener = Listener { @@ -514,8 +650,8 @@ impl EventListener { /// listener.as_mut().wait(); /// ``` #[cfg(feature = "std")] - pub fn wait(self: Pin<&mut Self>) { - self.listener().wait_internal(None); + pub fn wait(self: Pin<&mut Self>) -> T { + self.listener().wait_internal(None).unwrap() } /// Blocks until a notification is received or a timeout is reached. @@ -532,10 +668,10 @@ impl EventListener { /// let mut listener = event.listen(); /// /// // There are no notification so this times out. - /// assert!(!listener.as_mut().wait_timeout(Duration::from_secs(1))); + /// assert!(listener.as_mut().wait_timeout(Duration::from_secs(1)).is_none()); /// ``` #[cfg(feature = "std")] - pub fn wait_timeout(self: Pin<&mut Self>, timeout: Duration) -> bool { + pub fn wait_timeout(self: Pin<&mut Self>, timeout: Duration) -> Option { self.listener() .wait_internal(Instant::now().checked_add(timeout)) } @@ -554,10 +690,10 @@ impl EventListener { /// let mut listener = event.listen(); /// /// // There are no notification so this times out. - /// assert!(!listener.as_mut().wait_deadline(Instant::now() + Duration::from_secs(1))); + /// assert!(listener.as_mut().wait_deadline(Instant::now() + Duration::from_secs(1)).is_none()); /// ``` #[cfg(feature = "std")] - pub fn wait_deadline(self: Pin<&mut Self>, deadline: Instant) -> bool { + pub fn wait_deadline(self: Pin<&mut Self>, deadline: Instant) -> Option { self.listener().wait_internal(Some(deadline)) } @@ -596,8 +732,8 @@ impl EventListener { /// assert!(listener.listens_to(&event)); /// ``` #[inline] - pub fn listens_to(&self, event: &Event) -> bool { - ptr::eq::(&**self.inner(), event.inner.load(Ordering::Acquire)) + pub fn listens_to(&self, event: &Event) -> bool { + ptr::eq::>(&**self.inner(), event.inner.load(Ordering::Acquire)) } /// Returns `true` if both listeners listen to the same `Event`. @@ -613,44 +749,44 @@ impl EventListener { /// /// assert!(listener1.same_event(&listener2)); /// ``` - pub fn same_event(&self, other: &EventListener) -> bool { - ptr::eq::(&**self.inner(), &**other.inner()) + pub fn same_event(&self, other: &EventListener) -> bool { + ptr::eq::>(&**self.inner(), &**other.inner()) } - fn listener(self: Pin<&mut Self>) -> Pin<&mut Listener>> { + fn listener(self: Pin<&mut Self>) -> Pin<&mut Listener>>> { unsafe { self.map_unchecked_mut(|this| &mut this.0) } } - fn inner(&self) -> &Arc { + fn inner(&self) -> &Arc> { &self.0.event } } -impl Future for EventListener { - type Output = (); +impl Future for EventListener { + type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.listener().poll_internal(cx) } } -struct Listener + Unpin> { +struct Listener> + Unpin> { /// The reference to the original event. event: B, /// The inner state of the listener. - listener: Option, + listener: Option>, /// Enforce pinning. _pin: PhantomPinned, } -unsafe impl + Unpin + Send> Send for Listener {} -unsafe impl + Unpin + Sync> Sync for Listener {} +unsafe impl> + Unpin + Send> Send for Listener {} +unsafe impl> + Unpin + Sync> Sync for Listener {} -impl + Unpin> Listener { +impl> + Unpin> Listener { /// Pin-project this listener. - fn project(self: Pin<&mut Self>) -> (&Inner, Pin<&mut Option>) { + fn project(self: Pin<&mut Self>) -> (&Inner, Pin<&mut Option>>) { // SAFETY: `event` is `Unpin`, and `listener`'s pin status is preserved unsafe { let Listener { @@ -669,7 +805,7 @@ impl + Unpin> Listener { /// Wait until the provided deadline. #[cfg(feature = "std")] - fn wait_internal(mut self: Pin<&mut Self>, deadline: Option) -> bool { + fn wait_internal(mut self: Pin<&mut Self>, deadline: Option) -> Option { use std::cell::RefCell; std::thread_local! { @@ -708,24 +844,13 @@ impl + Unpin> Listener { deadline: Option, parker: &Parker, unparker: TaskRef<'_>, - ) -> bool { + ) -> Option { let (inner, mut listener) = self.project(); // Set the listener's state to `Task`. - match inner.register(listener.as_mut(), unparker) { - Some(true) => { - // We were already notified, so we don't need to park. - return true; - } - - Some(false) => { - // We're now waiting for a notification. - } - - None => { - // We were never inserted into the list. - panic!("listener was never inserted into the list"); - } + if let Some(tag) = inner.register(listener.as_mut(), unparker).notified() { + // We were already notified, so we don't need to park. + return Some(tag); } // Wait until a notification is received or the timeout is reached. @@ -741,17 +866,14 @@ impl + Unpin> Listener { return inner .remove(listener, false) .expect("We never removed ourself from the list") - .is_notified(); + .notified(); } } } // See if we were notified. - if inner - .register(listener.as_mut(), unparker) - .expect("We never removed ourself from the list") - { - return true; + if let Some(tag) = inner.register(listener.as_mut(), unparker).notified() { + return Some(tag); } } } @@ -767,30 +889,28 @@ impl + Unpin> Listener { } /// Poll this listener for a notification. - fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + fn poll_internal(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (inner, mut listener) = self.project(); // Try to register the listener. - match inner.register(listener.as_mut(), TaskRef::Waker(cx.waker())) { - Some(true) => { + match inner + .register(listener.as_mut(), TaskRef::Waker(cx.waker())) + .notified() + { + Some(tag) => { // We were already notified, so we don't need to park. - Poll::Ready(()) + Poll::Ready(tag) } - Some(false) => { + None => { // We're now waiting for a notification. Poll::Pending } - - None => { - // We were never inserted into the list. - panic!("listener was never inserted into the list"); - } } } } -impl + Unpin> Drop for Listener { +impl> + Unpin> Drop for Listener { fn drop(&mut self) { // If we're being dropped, we need to remove ourself from the list. let (inner, listener) = unsafe { Pin::new_unchecked(self).project() }; @@ -801,14 +921,20 @@ impl + Unpin> Drop for Listener { /// The state of a listener. #[derive(Debug, PartialEq)] -enum State { +enum State { /// The listener was just created. Created, /// The listener has received a notification. /// /// The `bool` is `true` if this was an "additional" notification. - Notified(bool), + Notified { + /// Whether or not this is an "additional" notification. + additional: bool, + + /// The tag associated with the notification. + tag: T, + }, /// A task is waiting for a notification. Task(Task), @@ -817,9 +943,44 @@ enum State { NotifiedTaken, } -impl State { +impl State { fn is_notified(&self) -> bool { - matches!(self, Self::Notified(_) | Self::NotifiedTaken) + matches!(self, Self::Notified { .. } | Self::NotifiedTaken) + } + + /// If this state was notified, return the tag associated with the notification. + fn notified(self) -> Option { + match self { + Self::Notified { tag, .. } => Some(tag), + Self::NotifiedTaken => panic!("listener was already notified but taken"), + _ => None, + } + } +} + +/// The result of registering a listener. +#[derive(Debug, PartialEq)] +enum RegisterResult { + /// The listener was already notified. + Notified(T), + + /// The listener has been registered. + Registered, + + /// The listener was never inserted into the list. + NeverInserted, +} + +impl RegisterResult { + /// Whether or not the listener was notified. + /// + /// Panics if the listener was never inserted into the list. + fn notified(self) -> Option { + match self { + Self::Notified(tag) => Some(tag), + Self::Registered => None, + Self::NeverInserted => panic!("listener was never inserted into the list"), + } } } diff --git a/src/no_std.rs b/src/no_std.rs index 36727d2..a22c701 100644 --- a/src/no_std.rs +++ b/src/no_std.rs @@ -18,12 +18,14 @@ mod queue; use node::{Node, TaskWaiting}; use queue::Queue; +use crate::notify::{GenericNotify, Notification}; use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sync::cell::{Cell, UnsafeCell}; use crate::sync::Arc; -use crate::{State, Task, TaskRef}; +use crate::{RegisterResult, State, Task, TaskRef}; use core::fmt; +use core::marker::PhantomData; use core::mem; use core::num::NonZeroUsize; use core::ops; @@ -31,9 +33,9 @@ use core::pin::Pin; use alloc::vec::Vec; -impl crate::Inner { +impl crate::Inner { /// Locks the list. - fn try_lock(&self) -> Option> { + fn try_lock(&self) -> Option> { self.list.inner.try_lock().map(|guard| ListGuard { inner: self, guard: Some(guard), @@ -43,7 +45,7 @@ impl crate::Inner { /// Add a new listener to the list. /// /// Does nothing if the list is already registered. - pub(crate) fn insert(&self, mut listener: Pin<&mut Option>) { + pub(crate) fn insert(&self, mut listener: Pin<&mut Option>>) { if listener.as_ref().as_pin_ref().is_some() { // Already inserted. return; @@ -67,9 +69,9 @@ impl crate::Inner { /// Remove a listener from the list. pub(crate) fn remove( &self, - mut listener: Pin<&mut Option>, + mut listener: Pin<&mut Option>>, propogate: bool, - ) -> Option { + ) -> Option> { let state = match listener.as_mut().take() { Some(Listener::HasNode(key)) => { match self.try_lock() { @@ -99,6 +101,8 @@ impl crate::Inner { } None => None, + + _ => unreachable!(), }; state @@ -106,19 +110,47 @@ impl crate::Inner { /// Notifies a number of entries. #[cold] - pub(crate) fn notify(&self, n: usize, additional: bool) { + pub(crate) fn notify(&self, mut notify: impl Notification) { match self.try_lock() { Some(mut guard) => { // Notify the listeners. - guard.notify(n, additional); + guard.notify(notify); } None => { // Push it to the queue. - let node = Node::Notify { - count: n, - additional, - }; + let node = Node::Notify(GenericNotify::new( + notify.count(), + notify.is_additional(), + { + // Collect every tag we need. + let mut tags = { + let count = notify.count(); + let mut tags = Vec::with_capacity(count); + for _ in 0..count { + tags.push(notify.next_tag()); + } + + // Convert into an iterator. + tags.into_iter() + }; + + // Function that iterates over the tags. + let tags = Box::new(move || tags.next().unwrap()); + + // SAFETY: The generic `GenericNotify` expects a `Box` that is `Send` and + // `Sync`. The `tags` function is `Send` if `T` is `Send`, and `Sync` if `T` + // is `Sync`. However, the end result (the `Event`) is `Send` and `Sync` if + // `T` is `Send` and `Sync`, so we can safely assume that the `Box` is + // going to be handled safely. + // + // This also works out lifetime wise, since the Box does not + // outlive the `Event`. + unsafe { + mem::transmute:: T>, node::GenericTags>(tags) + } + }, + )); self.list.queue.push(node); } @@ -131,9 +163,9 @@ impl crate::Inner { /// isn't inserted, returns `None`. pub(crate) fn register( &self, - mut listener: Pin<&mut Option>, + mut listener: Pin<&mut Option>>, task: TaskRef<'_>, - ) -> Option { + ) -> RegisterResult { loop { match listener.as_mut().take() { Some(Listener::HasNode(key)) => { @@ -148,7 +180,7 @@ impl crate::Inner { // Wait for the lock. let node = Node::Waiting(task.into_task()); self.list.queue.push(node); - return Some(false); + return RegisterResult::Registered; } } } @@ -165,27 +197,29 @@ impl crate::Inner { // We're still queued, so register the task. task_waiting.register(task.into_task()); *listener = Some(Listener::Queued(task_waiting)); - return None; + return RegisterResult::Registered; } } } - _ => return None, + None => return RegisterResult::NeverInserted, + + _ => unreachable!(), } } } } -pub(crate) struct List { +pub(crate) struct List { /// The inner list. - inner: Mutex, + inner: Mutex>, /// The queue of pending operations. - queue: Queue, + queue: Queue, } -impl List { - pub(super) fn new() -> List { +impl List { + pub(super) fn new() -> List { List { inner: Mutex::new(ListenerSlab::new()), queue: Queue::new(), @@ -194,21 +228,21 @@ impl List { } /// The guard returned by [`Inner::lock`]. -pub(crate) struct ListGuard<'a> { +pub(crate) struct ListGuard<'a, T: Unpin> { /// Reference to the inner state. - pub(crate) inner: &'a crate::Inner, + pub(crate) inner: &'a crate::Inner, /// The locked list. - pub(crate) guard: Option>, + pub(crate) guard: Option>>, } -impl ListGuard<'_> { +impl ListGuard<'_, T> { #[cold] fn process_nodes_slow( &mut self, - start_node: Node, + start_node: Node, tasks: &mut Vec, - guard: &mut MutexGuard<'_, ListenerSlab>, + guard: &mut MutexGuard<'_, ListenerSlab>, ) { // Process the start node. tasks.extend(start_node.apply(guard)); @@ -220,21 +254,21 @@ impl ListGuard<'_> { } } -impl ops::Deref for ListGuard<'_> { - type Target = ListenerSlab; +impl ops::Deref for ListGuard<'_, T> { + type Target = ListenerSlab; fn deref(&self) -> &Self::Target { self.guard.as_ref().unwrap() } } -impl ops::DerefMut for ListGuard<'_> { +impl ops::DerefMut for ListGuard<'_, T> { fn deref_mut(&mut self) -> &mut Self::Target { self.guard.as_mut().unwrap() } } -impl Drop for ListGuard<'_> { +impl Drop for ListGuard<'_, T> { fn drop(&mut self) { let Self { inner, guard } = self; let mut list = guard.take().unwrap(); @@ -267,11 +301,11 @@ impl Drop for ListGuard<'_> { } /// An entry representing a registered listener. -enum Entry { +enum Entry { /// Contains the listener state. Listener { /// The state of the listener. - state: Cell, + state: Cell>, /// The previous listener in the list. prev: Cell>, @@ -287,38 +321,38 @@ enum Entry { Sentinel, } -struct TakenState<'a> { - slot: &'a Cell, - state: State, +struct TakenState<'a, T> { + slot: &'a Cell>, + state: State, } -impl Drop for TakenState<'_> { +impl Drop for TakenState<'_, T> { fn drop(&mut self) { self.slot .set(mem::replace(&mut self.state, State::NotifiedTaken)); } } -impl fmt::Debug for TakenState<'_> { +impl fmt::Debug for TakenState<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&self.state, f) } } -impl PartialEq for TakenState<'_> { +impl PartialEq for TakenState<'_, T> { fn eq(&self, other: &Self) -> bool { self.state == other.state } } -impl<'a> TakenState<'a> { - fn new(slot: &'a Cell) -> Self { +impl<'a, T> TakenState<'a, T> { + fn new(slot: &'a Cell>) -> Self { let state = slot.replace(State::NotifiedTaken); Self { slot, state } } } -impl fmt::Debug for Entry { +impl fmt::Debug for Entry { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Entry::Listener { state, next, prev } => f @@ -333,8 +367,8 @@ impl fmt::Debug for Entry { } } -impl PartialEq for Entry { - fn eq(&self, other: &Entry) -> bool { +impl PartialEq for Entry { + fn eq(&self, other: &Entry) -> bool { match (self, other) { ( Self::Listener { @@ -361,8 +395,8 @@ impl PartialEq for Entry { } } -impl Entry { - fn state(&self) -> &Cell { +impl Entry { + fn state(&self) -> &Cell> { match self { Entry::Listener { state, .. } => state, _ => unreachable!(), @@ -385,9 +419,9 @@ impl Entry { } /// A linked list of entries. -pub(crate) struct ListenerSlab { +pub(crate) struct ListenerSlab { /// The raw list of entries. - listeners: Vec, + listeners: Vec>, /// First entry in the list. head: Option, @@ -409,7 +443,7 @@ pub(crate) struct ListenerSlab { first_empty: NonZeroUsize, } -impl ListenerSlab { +impl ListenerSlab { /// Create a new, empty list. pub(crate) fn new() -> Self { Self { @@ -424,7 +458,7 @@ impl ListenerSlab { } /// Inserts a new entry into the list. - pub(crate) fn insert(&mut self, state: State) -> NonZeroUsize { + pub(crate) fn insert(&mut self, state: State) -> NonZeroUsize { // Add the new entry into the list. let key = { let entry = Entry::Listener { @@ -476,7 +510,7 @@ impl ListenerSlab { } /// Removes an entry from the list and returns its state. - pub(crate) fn remove(&mut self, key: NonZeroUsize, propogate: bool) -> Option { + pub(crate) fn remove(&mut self, key: NonZeroUsize, propogate: bool) -> Option> { let entry = &self.listeners[key.get()]; let prev = entry.prev().get(); let next = entry.next().get(); @@ -505,7 +539,7 @@ impl ListenerSlab { ); self.first_empty = key; - let state = match entry { + let mut state = match entry { Entry::Listener { state, .. } => state.into_inner(), _ => unreachable!(), }; @@ -516,8 +550,15 @@ impl ListenerSlab { if propogate { // Propogate the notification to the next entry. - if let State::Notified(additional) = state { - self.notify(1, additional); + let state = mem::replace(&mut state, State::NotifiedTaken); + if let State::Notified { tag, additional } = state { + let tags = { + let mut tag = Some(tag); + + move || tag.take().expect("called more than once") + }; + + self.notify(GenericNotify::new(1, additional, tags)); } } } @@ -528,8 +569,10 @@ impl ListenerSlab { /// Notifies a number of listeners. #[cold] - pub(crate) fn notify(&mut self, mut n: usize, additional: bool) { - if !additional { + pub(crate) fn notify(&mut self, mut notify: impl Notification) { + let mut n = notify.count(); + let is_additional = notify.is_additional(); + if !is_additional { // Make sure we're not notifying more than we have. if n <= self.notified { return; @@ -550,7 +593,11 @@ impl ListenerSlab { self.start = entry.next().get(); // Set the state to `Notified` and notify. - if let State::Task(task) = entry.state().replace(State::Notified(additional)) { + let tag = notify.next_tag(); + if let State::Task(task) = entry.state().replace(State::Notified { + tag, + additional: is_additional, + }) { task.wake(); } @@ -567,23 +614,23 @@ impl ListenerSlab { /// isn't inserted, returns `None`. pub(crate) fn register( &mut self, - mut listener: Pin<&mut Option>, + mut listener: Pin<&mut Option>>, task: TaskRef<'_>, - ) -> Option { + ) -> RegisterResult { let key = match *listener { Some(Listener::HasNode(key)) => key, - _ => return None, + _ => return RegisterResult::NeverInserted, }; let entry = &self.listeners[key.get()]; // Take the state out and check it. match entry.state().replace(State::NotifiedTaken) { - State::Notified(_) | State::NotifiedTaken => { + State::Notified { tag, .. } => { // The listener was already notified, so we don't need to do anything. - self.remove(key, false)?; + self.remove(key, false); *listener = None; - Some(true) + RegisterResult::Notified(tag) } State::Task(other_task) => { @@ -594,28 +641,31 @@ impl ListenerSlab { entry.state().set(State::Task(task.into_task())); } - Some(false) + RegisterResult::Registered } _ => { // Register the task. entry.state().set(State::Task(task.into_task())); - Some(false) + RegisterResult::Registered } } } } #[derive(Debug)] -pub(crate) enum Listener { +pub(crate) enum Listener { /// The listener has a node inside of the linked list. HasNode(NonZeroUsize), /// The listener has an entry in the queue that may or may not have a task waiting. Queued(Arc), + + /// Eat the lifetime for consistency. + _EatLifetime(PhantomData), } -impl PartialEq for Listener { +impl PartialEq for Listener { fn eq(&self, other: &Self) -> bool { match (self, other) { (Self::HasNode(a), Self::HasNode(b)) => a == b, @@ -735,7 +785,7 @@ mod tests { #[test] fn smoke_listener_slab() { - let mut listeners = ListenerSlab::new(); + let mut listeners = ListenerSlab::<()>::new(); // Insert a few listeners. let key1 = listeners.insert(State::Created); @@ -816,7 +866,7 @@ mod tests { let key3 = listeners.insert(State::Created); // Notify one. - listeners.notify(1, true); + listeners.notify(GenericNotify::new(1, true, || ())); assert_eq!(listeners.len, 3); assert_eq!(listeners.notified, 1); @@ -828,7 +878,10 @@ mod tests { assert_eq!( listeners.listeners[1], Entry::Listener { - state: Cell::new(State::Notified(true)), + state: Cell::new(State::Notified { + additional: true, + tag: () + }), prev: Cell::new(None), next: Cell::new(Some(key2)), } @@ -851,7 +904,13 @@ mod tests { ); // Remove the notified listener. - assert_eq!(listeners.remove(key1, false), Some(State::Notified(true))); + assert_eq!( + listeners.remove(key1, false), + Some(State::Notified { + additional: true, + tag: () + }) + ); assert_eq!(listeners.len, 2); assert_eq!(listeners.notified, 0); @@ -903,7 +962,7 @@ mod tests { Pin::new(&mut Some(Listener::HasNode(key2))), TaskRef::Waker(&waker) ), - Some(false) + RegisterResult::Registered ); assert_eq!(listeners.len, 3); @@ -939,7 +998,7 @@ mod tests { ); // Notify the listener. - listeners.notify(2, false); + listeners.notify(GenericNotify::new(2, false, || ())); assert_eq!(listeners.len, 3); assert_eq!(listeners.notified, 2); @@ -951,7 +1010,10 @@ mod tests { assert_eq!( listeners.listeners[1], Entry::Listener { - state: Cell::new(State::Notified(false)), + state: Cell::new(State::Notified { + additional: false, + tag: (), + }), prev: Cell::new(None), next: Cell::new(Some(key2)), } @@ -959,7 +1021,10 @@ mod tests { assert_eq!( listeners.listeners[2], Entry::Listener { - state: Cell::new(State::Notified(false)), + state: Cell::new(State::Notified { + additional: false, + tag: (), + }), prev: Cell::new(Some(key1)), next: Cell::new(Some(key3)), } @@ -979,7 +1044,7 @@ mod tests { Pin::new(&mut Some(Listener::HasNode(key2))), TaskRef::Waker(&waker) ), - Some(true) + RegisterResult::Notified(()) ); } @@ -1004,7 +1069,7 @@ mod tests { Pin::new(&mut Some(Listener::HasNode(key2))), TaskRef::Waker(&waker) ), - Some(false) + RegisterResult::Registered ); assert_eq!(listeners.len, 3); @@ -1040,7 +1105,7 @@ mod tests { ); // Notify the first listener. - listeners.notify(1, false); + listeners.notify(GenericNotify::new(1, false, || ())); assert_eq!(listeners.len, 3); assert_eq!(listeners.notified, 1); @@ -1052,7 +1117,10 @@ mod tests { assert_eq!( listeners.listeners[1], Entry::Listener { - state: Cell::new(State::Notified(false)), + state: Cell::new(State::Notified { + additional: false, + tag: (), + }), prev: Cell::new(None), next: Cell::new(Some(key2)), } @@ -1075,7 +1143,7 @@ mod tests { ); // Calling notify again should not change anything. - listeners.notify(1, false); + listeners.notify(GenericNotify::new(1, false, || ())); assert_eq!(listeners.len, 3); assert_eq!(listeners.notified, 1); @@ -1087,7 +1155,10 @@ mod tests { assert_eq!( listeners.listeners[1], Entry::Listener { - state: Cell::new(State::Notified(false)), + state: Cell::new(State::Notified { + additional: false, + tag: (), + }), prev: Cell::new(None), next: Cell::new(Some(key2)), } @@ -1110,7 +1181,13 @@ mod tests { ); // Remove the first listener. - assert_eq!(listeners.remove(key1, false), Some(State::Notified(false))); + assert_eq!( + listeners.remove(key1, false), + Some(State::Notified { + additional: false, + tag: () + }) + ); assert_eq!(listeners.len, 2); assert_eq!(listeners.notified, 0); @@ -1141,7 +1218,7 @@ mod tests { ); // Notify the second listener. - listeners.notify(1, false); + listeners.notify(GenericNotify::new(1, false, || ())); assert!(woken.load(Ordering::SeqCst)); assert_eq!(listeners.len, 2); @@ -1158,7 +1235,10 @@ mod tests { assert_eq!( listeners.listeners[2], Entry::Listener { - state: Cell::new(State::Notified(false)), + state: Cell::new(State::Notified { + additional: false, + tag: (), + }), prev: Cell::new(None), next: Cell::new(Some(key3)), } @@ -1173,7 +1253,7 @@ mod tests { ); // Remove and propogate the second listener. - assert_eq!(listeners.remove(key2, true), Some(State::Notified(false))); + assert_eq!(listeners.remove(key2, true), Some(State::NotifiedTaken)); // The third listener should be notified. assert_eq!(listeners.len, 1); @@ -1194,14 +1274,23 @@ mod tests { assert_eq!( listeners.listeners[3], Entry::Listener { - state: Cell::new(State::Notified(false)), + state: Cell::new(State::Notified { + additional: false, + tag: (), + }), prev: Cell::new(None), next: Cell::new(None), } ); // Remove the third listener. - assert_eq!(listeners.remove(key3, false), Some(State::Notified(false))); + assert_eq!( + listeners.remove(key3, false), + Some(State::Notified { + additional: false, + tag: () + }) + ); } #[test] @@ -1231,34 +1320,34 @@ mod tests { }); assert_eq!( inner.register(Pin::new(&mut listener2), TaskRef::Waker(&waker)), - Some(false) + RegisterResult::Registered ); // Notify the first listener. - inner.notify(1, false); + inner.notify(GenericNotify::new(1, false, || ())); assert!(!woken.load(Ordering::SeqCst)); // Another notify should do nothing. - inner.notify(1, false); + inner.notify(GenericNotify::new(1, false, || ())); assert!(!woken.load(Ordering::SeqCst)); // Receive the notification. assert_eq!( inner.register(Pin::new(&mut listener1), TaskRef::Waker(&waker)), - Some(true) + RegisterResult::Notified(()) ); // First listener is already removed. assert!(listener1.is_none()); // Notify the second listener. - inner.notify(1, false); + inner.notify(GenericNotify::new(1, false, || ())); assert!(woken.load(Ordering::SeqCst)); // Remove the second listener and propogate the notification. assert_eq!( inner.remove(Pin::new(&mut listener2), true), - Some(State::Notified(false)) + Some(State::NotifiedTaken) ); // Second listener is already removed. @@ -1267,7 +1356,7 @@ mod tests { // Third listener should be notified. assert_eq!( inner.register(Pin::new(&mut listener3), TaskRef::Waker(&waker)), - Some(true) + RegisterResult::Notified(()) ); } } diff --git a/src/no_std/node.rs b/src/no_std/node.rs index feba69c..8057a01 100644 --- a/src/no_std/node.rs +++ b/src/no_std/node.rs @@ -2,6 +2,7 @@ //! The node that makes up queues. +use crate::notify::GenericNotify; use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; use crate::sync::Arc; use crate::sys::ListenerSlab; @@ -12,8 +13,10 @@ use alloc::boxed::Box; use core::num::NonZeroUsize; use core::ptr; +pub(super) type GenericTags = Box T + Send + Sync + 'static>; + /// A node in the backup queue. -pub(crate) enum Node { +pub(crate) enum Node { /// This node is requesting to add a listener. // For some reason, the MSRV build says this variant is never constructed. #[allow(dead_code)] @@ -23,13 +26,7 @@ pub(crate) enum Node { }, /// This node is notifying a listener. - Notify { - /// The number of listeners to notify. - count: usize, - - /// Whether to wake up notified listeners. - additional: bool, - }, + Notify(GenericNotify>), /// This node is removing a listener. RemoveListener { @@ -55,7 +52,7 @@ pub(crate) struct TaskWaiting { entry_id: AtomicUsize, } -impl Node { +impl Node { pub(crate) fn listener() -> (Self, Arc) { // Create a new `TaskWaiting` structure. let task_waiting = Arc::new(TaskWaiting { @@ -72,7 +69,7 @@ impl Node { } /// Apply the node to the list. - pub(super) fn apply(self, list: &mut ListenerSlab) -> Option { + pub(super) fn apply(self, list: &mut ListenerSlab) -> Option { match self { Node::AddListener { task_waiting } => { // Add a new entry to the list. @@ -83,9 +80,9 @@ impl Node { return task_waiting.task.take().map(|t| *t); } - Node::Notify { count, additional } => { + Node::Notify(notify) => { // Notify the next `count` listeners. - list.notify(count, additional); + list.notify(notify); } Node::RemoveListener { listener, diff --git a/src/no_std/queue.rs b/src/no_std/queue.rs index 1d676a1..e63c045 100644 --- a/src/no_std/queue.rs +++ b/src/no_std/queue.rs @@ -7,23 +7,23 @@ use alloc::boxed::Box; use core::ptr; /// An naive atomic queue of operations to process. -pub(super) struct Queue { +pub(super) struct Queue { /// The head of the queue. - head: AtomicPtr, + head: AtomicPtr>, /// The tail of the queue. - tail: AtomicPtr, + tail: AtomicPtr>, } -struct Link { +struct Link { /// The inner node. - node: Node, + node: Node, /// The next node in the queue. - next: AtomicPtr, + next: AtomicPtr>, } -impl Queue { +impl Queue { /// Create a new, empty queue. pub(super) fn new() -> Self { Self { @@ -33,7 +33,7 @@ impl Queue { } /// Push a new node onto the queue. - pub(super) fn push(&self, node: Node) { + pub(super) fn push(&self, node: Node) { // Allocate a new link. let link = Box::into_raw(Box::new(Link { node, @@ -86,7 +86,7 @@ impl Queue { } /// Pop a node from the queue. - pub(super) fn pop(&self) -> Option { + pub(super) fn pop(&self) -> Option> { // Pop the head of the queue. let mut head = self.head.load(Ordering::Acquire); loop { @@ -120,7 +120,7 @@ impl Queue { } } -impl Drop for Queue { +impl Drop for Queue { fn drop(&mut self) { // Pop all nodes from the queue. while self.pop().is_some() {} @@ -129,21 +129,17 @@ impl Drop for Queue { #[cfg(test)] mod tests { + use crate::notify::{GenericNotify, Notification}; + use super::*; - fn node_from_num(num: usize) -> Node { - Node::Notify { - count: num, - additional: true, - } + fn node_from_num(num: usize) -> Node<()> { + Node::Notify(GenericNotify::new(num, true, Box::new(|| ()))) } - fn node_to_num(node: Node) -> usize { + fn node_to_num(node: Node<()>) -> usize { match node { - Node::Notify { - count, - additional: true, - } => count, + Node::Notify(notify) => notify.count(), _ => panic!("unexpected node"), } } diff --git a/src/notify.rs b/src/notify.rs index e3e0910..0bfd8be 100644 --- a/src/notify.rs +++ b/src/notify.rs @@ -324,6 +324,11 @@ macro_rules! impl_for_numeric_types { type Notify = Notify; fn into_notification(self) -> Self::Notify { + #[allow(unused_comparisons)] + if self < 0 { + panic!("negative notification count"); + } + use core::convert::TryInto; Notify::new(self.try_into().expect("overflow")) } diff --git a/src/std.rs b/src/std.rs index f317c18..059e9e2 100644 --- a/src/std.rs +++ b/src/std.rs @@ -2,10 +2,11 @@ //! //! This implementation crates an intrusive linked list of listeners. +use crate::notify::{GenericNotify, Notification}; use crate::sync::atomic::Ordering; use crate::sync::cell::{Cell, UnsafeCell}; use crate::sync::{Mutex, MutexGuard}; -use crate::{State, TaskRef}; +use crate::{RegisterResult, State, TaskRef}; use core::marker::PhantomPinned; use core::mem; @@ -13,17 +14,17 @@ use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::ptr::NonNull; -pub(super) struct List(Mutex); +pub(super) struct List(Mutex>); -struct Inner { +struct Inner { /// The head of the linked list. - head: Option>, + head: Option>>, /// The tail of the linked list. - tail: Option>, + tail: Option>>, /// The first unnotified listener. - next: Option>, + next: Option>>, /// Total number of listeners. len: usize, @@ -32,7 +33,7 @@ struct Inner { notified: usize, } -impl List { +impl List { /// Create a new, empty event listener list. pub(super) fn new() -> Self { Self(Mutex::new(Inner { @@ -45,8 +46,8 @@ impl List { } } -impl crate::Inner { - fn lock(&self) -> ListLock<'_, '_> { +impl crate::Inner { + fn lock(&self) -> ListLock<'_, '_, T> { ListLock { inner: self, lock: self.list.0.lock().unwrap_or_else(|e| e.into_inner()), @@ -56,7 +57,7 @@ impl crate::Inner { /// Add a new listener to the list. /// /// Does nothing is the listener is already registered. - pub(crate) fn insert(&self, listener: Pin<&mut Option>) { + pub(crate) fn insert(&self, listener: Pin<&mut Option>>) { let mut inner = self.lock(); // SAFETY: We are locked, so we can access the inner `link`. @@ -101,16 +102,16 @@ impl crate::Inner { /// Remove a listener from the list. pub(crate) fn remove( &self, - listener: Pin<&mut Option>, + listener: Pin<&mut Option>>, propogate: bool, - ) -> Option { + ) -> Option> { self.lock().remove(listener, propogate) } /// Notifies a number of entries. #[cold] - pub(crate) fn notify(&self, n: usize, additional: bool) { - self.lock().notify(n, additional) + pub(crate) fn notify(&self, notify: impl Notification) { + self.lock().notify(notify) } /// Register a task to be notified when the event is triggered. @@ -119,24 +120,27 @@ impl crate::Inner { /// isn't inserted, returns `None`. pub(crate) fn register( &self, - mut listener: Pin<&mut Option>, + mut listener: Pin<&mut Option>>, task: TaskRef<'_>, - ) -> Option { + ) -> RegisterResult { let mut inner = self.lock(); // SAFETY: We are locked, so we can access the inner `link`. let entry = unsafe { // SAFETY: We never move out the `link` field. - let listener = listener.as_mut().get_unchecked_mut().as_mut()?; + let listener = match listener.as_mut().get_unchecked_mut().as_mut() { + Some(listener) => listener, + None => return RegisterResult::NeverInserted, + }; &*listener.link.get() }; // Take out the state and check it. match entry.state.replace(State::NotifiedTaken) { - State::Notified(_) => { + State::Notified { tag, .. } => { // We have been notified, remove the listener. inner.remove(listener, false); - Some(true) + RegisterResult::Notified(tag) } State::Task(other_task) => { @@ -149,24 +153,24 @@ impl crate::Inner { } })); - Some(false) + RegisterResult::Registered } _ => { // We have not been notified, register the task. entry.state.set(State::Task(task.into_task())); - Some(false) + RegisterResult::Registered } } } } -impl Inner { +impl Inner { fn remove( &mut self, - mut listener: Pin<&mut Option>, + mut listener: Pin<&mut Option>>, propogate: bool, - ) -> Option { + ) -> Option> { let entry = unsafe { // SAFETY: We never move out the `link` field. let listener = listener.as_mut().get_unchecked_mut().as_mut()?; @@ -209,15 +213,20 @@ impl Inner { .into_inner() }; - let state = entry.state.into_inner(); + let mut state = entry.state.into_inner(); // Update the notified count. if state.is_notified() { self.notified -= 1; if propogate { - if let State::Notified(additional) = state { - self.notify(1, additional); + let state = mem::replace(&mut state, State::NotifiedTaken); + if let State::Notified { additional, tag } = state { + let tags = { + let mut tag = Some(tag); + move || tag.take().expect("tag already taken") + }; + self.notify(GenericNotify::new(1, additional, tags)); } } } @@ -227,10 +236,12 @@ impl Inner { } #[cold] - fn notify(&mut self, mut n: usize, additional: bool) { - if !additional { - // Make sure we're not notifying more than we have. - if n <= self.notified { + fn notify(&mut self, mut notify: impl Notification) { + let mut n = notify.count(); + let is_additional = notify.is_additional(); + + if !is_additional { + if n < self.notified { return; } n -= self.notified; @@ -249,7 +260,11 @@ impl Inner { self.next = entry.next.get(); // Set the state to `Notified` and notify. - if let State::Task(task) = entry.state.replace(State::Notified(additional)) { + let tag = notify.next_tag(); + if let State::Task(task) = entry.state.replace(State::Notified { + additional: is_additional, + tag, + }) { task.wake(); } @@ -261,26 +276,26 @@ impl Inner { } } -struct ListLock<'a, 'b> { - lock: MutexGuard<'a, Inner>, - inner: &'b crate::Inner, +struct ListLock<'a, 'b, T> { + lock: MutexGuard<'a, Inner>, + inner: &'b crate::Inner, } -impl Deref for ListLock<'_, '_> { - type Target = Inner; +impl Deref for ListLock<'_, '_, T> { + type Target = Inner; fn deref(&self) -> &Self::Target { &self.lock } } -impl DerefMut for ListLock<'_, '_> { +impl DerefMut for ListLock<'_, '_, T> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.lock } } -impl Drop for ListLock<'_, '_> { +impl Drop for ListLock<'_, '_, T> { fn drop(&mut self) { let list = &mut **self; @@ -295,27 +310,27 @@ impl Drop for ListLock<'_, '_> { } } -pub(crate) struct Listener { +pub(crate) struct Listener { /// The inner link in the linked list. /// /// # Safety /// /// This can only be accessed while the central mutex is locked. - link: UnsafeCell, + link: UnsafeCell>, /// This listener cannot be moved after being pinned. _pin: PhantomPinned, } -struct Link { +struct Link { /// The current state of the listener. - state: Cell, + state: Cell>, /// The previous link in the linked list. - prev: Cell>>, + prev: Cell>>>, /// The next link in the linked list. - next: Cell>>, + next: Cell>>>, } #[cfg(test)] @@ -326,7 +341,7 @@ mod tests { macro_rules! make_listeners { ($($id:ident),*) => { $( - let $id = Option::::None; + let $id = Option::>::None; pin!($id); )* }; @@ -364,7 +379,7 @@ mod tests { inner.insert(listen3.as_mut()); // Notify one. - inner.notify(1, false); + inner.notify(GenericNotify::new(1, false, || ())); // Remove one. inner.remove(listen3, true);