Skip to content

Commit

Permalink
feat: Add no_std implementation
Browse files Browse the repository at this point in the history
This commit adds an implementation of the event-listener algorithm built
without locks. This is a replacement of the old no_std backend. It is
written without concurrent-queue and therefore closes #109.

The idea behind this implementation is to store the listeners in "slots"
in an infinitely large list. Then, assign each listener a number and
then use that to wait on each listener.

The list used is similar to the one used by the thread-local crate. It
consists of a list of "buckets" that hold slots. The number of slots
increases in an amortized way. The first slot holds 1, the second slot
holds 2, the third slot holds 4... all the way up to usize::MAX slots.

Indexes are done by having a list of reusable indexes and using those
when possible, only increasing the max index when necessary. This part
of the code could use some work; under contention it's possible to
make some slots unusuable. This can happen under two cases:

- If there is contention on the indexes list when the listener is being
  freed.
- If the slot is still being notified when it is attempted to be reused.

Both of these cases are probably fixable, and should be fixed before
release. Otherwise long running server processes using this code will
run out of memory under heavy loads.

From here the rest of the implementation is an atomic linked list based
on the above primitives. It functions very similarly to the std variant.
The main difference is that the Link structure's waker functions very
similarly to AtomicWaker from the atomic-waker crate. Aside from that
the code isn't very interesting on its own.

Signed-off-by: John Nunley <dev@notgull.net>
  • Loading branch information
notgull committed Apr 16, 2024
1 parent 19ef495 commit dbfbb3e
Show file tree
Hide file tree
Showing 9 changed files with 1,061 additions and 203 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ jobs:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- run: cargo build --all --all-targets
- run: cargo build --all --no-default-features --all-targets
- run: cargo build --all --all-features --all-targets
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
if: startsWith(matrix.rust, 'nightly')
Expand All @@ -48,12 +50,13 @@ jobs:
matrix:
# When updating this, the reminder to update the minimum supported
# Rust version in Cargo.toml.
rust: ['1.39']
rust: ['1.59']
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- run: cargo build
- run: cargo build --no-default-features

clippy:
runs-on: ubuntu-latest
Expand All @@ -62,6 +65,8 @@ jobs:
- name: Install Rust
run: rustup update stable
- run: cargo clippy --all-features --all-targets
- run: cargo clippy --all-targets
- run: cargo clippy --no-default-features --all-targets

fmt:
runs-on: ubuntu-latest
Expand Down
5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "event-listener"
version = "2.5.3"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2021"
rust-version = "1.56"
rust-version = "1.59"
description = "Notify async tasks or threads"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/event-listener"
Expand All @@ -21,9 +21,8 @@ portable-atomic = ["portable-atomic-crate", "portable-atomic-util"]

[dependencies]
portable-atomic-crate = { version = "1.6.0", package = "portable-atomic", optional = true }
portable-atomic-util = { version = "0.1.5", optional = true }
portable-atomic-util = { version = "0.1.5", features = ["alloc"], optional = true }

[dev-dependencies]
futures = { version = "0.3", default-features = false, features = ["std"] }
futures-lite = "2.3.0"
waker-fn = "1"
282 changes: 147 additions & 135 deletions examples/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,182 +4,194 @@

#![allow(dead_code)]

use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};

use event_listener::{Event, Listener};

/// A simple mutex.
struct Mutex<T> {
/// Set to `true` when the mutex is locked.
locked: AtomicBool,

/// Blocked lock operations.
lock_ops: Event,

/// The inner protected data.
data: UnsafeCell<T>,
}
#[cfg(feature = "std")]
mod ex {
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, Instant};

use event_listener::{Event, Listener};

/// A simple mutex.
struct Mutex<T> {
/// Set to `true` when the mutex is locked.
locked: AtomicBool,

/// Blocked lock operations.
lock_ops: Event,

/// The inner protected data.
data: UnsafeCell<T>,
}

unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}

impl<T> Mutex<T> {
/// Creates a mutex.
fn new(t: T) -> Mutex<T> {
Mutex {
locked: AtomicBool::new(false),
lock_ops: Event::new(),
data: UnsafeCell::new(t),
impl<T> Mutex<T> {
/// Creates a mutex.
fn new(t: T) -> Mutex<T> {
Mutex {
locked: AtomicBool::new(false),
lock_ops: Event::new(),
data: UnsafeCell::new(t),
}
}
}

/// Attempts to acquire a lock.
fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
if !self.locked.swap(true, Ordering::Acquire) {
Some(MutexGuard(self))
} else {
None
/// Attempts to acquire a lock.
fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
if !self.locked.swap(true, Ordering::Acquire) {
Some(MutexGuard(self))
} else {
None
}
}
}

/// Blocks until a lock is acquired.
fn lock(&self) -> MutexGuard<'_, T> {
let mut listener = None;

loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
/// Blocks until a lock is acquired.
fn lock(&self) -> MutexGuard<'_, T> {
let mut listener = None;

// Set up an event listener or wait for a notification.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
Some(l) => {
// Wait until a notification is received.
l.wait();

// Set up an event listener or wait for a notification.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.wait();
}
}
}
}
}

/// Blocks until a lock is acquired or the timeout is reached.
fn lock_timeout(&self, timeout: Duration) -> Option<MutexGuard<'_, T>> {
let deadline = Instant::now() + timeout;
let mut listener = None;

loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return Some(guard);
}
/// Blocks until a lock is acquired or the timeout is reached.
fn lock_timeout(&self, timeout: Duration) -> Option<MutexGuard<'_, T>> {
let deadline = Instant::now() + timeout;
let mut listener = None;

// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return Some(guard);
}
Some(l) => {
// Wait until a notification is received.
if l.wait_deadline(deadline).is_none() {
return None;

// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.wait_deadline(deadline)?;
}
}
}
}
}

/// Acquires a lock asynchronously.
async fn lock_async(&self) -> MutexGuard<'_, T> {
let mut listener = None;

loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
/// Acquires a lock asynchronously.
async fn lock_async(&self) -> MutexGuard<'_, T> {
let mut listener = None;

// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
loop {
// Attempt grabbing a lock.
if let Some(guard) = self.try_lock() {
return guard;
}
Some(l) => {
// Wait until a notification is received.
l.await;

// Set up an event listener or wait for an event.
match listener.take() {
None => {
// Start listening and then try locking again.
listener = Some(self.lock_ops.listen());
}
Some(l) => {
// Wait until a notification is received.
l.await;
}
}
}
}
}
}

/// A guard holding a lock.
struct MutexGuard<'a, T>(&'a Mutex<T>);
/// A guard holding a lock.
struct MutexGuard<'a, T>(&'a Mutex<T>);

unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}

impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
self.0.locked.store(false, Ordering::Release);
self.0.lock_ops.notify(1);
impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
self.0.locked.store(false, Ordering::Release);
self.0.lock_ops.notify(1);
}
}
}

impl<T> Deref for MutexGuard<'_, T> {
type Target = T;
impl<T> Deref for MutexGuard<'_, T> {
type Target = T;

fn deref(&self) -> &T {
unsafe { &*self.0.data.get() }
fn deref(&self) -> &T {
unsafe { &*self.0.data.get() }
}
}
}

impl<T> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.0.data.get() }
impl<T> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.0.data.get() }
}
}
}

fn main() {
const N: usize = 10;
pub(crate) fn entry() {
const N: usize = 10;

// A shared counter.
let counter = Arc::new(Mutex::new(0));
// A shared counter.
let counter = Arc::new(Mutex::new(0));

// A channel that signals when all threads are done.
let (tx, rx) = mpsc::channel();
// A channel that signals when all threads are done.
let (tx, rx) = mpsc::channel();

// Spawn a bunch of threads incrementing the counter.
for _ in 0..N {
let counter = counter.clone();
let tx = tx.clone();
// Spawn a bunch of threads incrementing the counter.
for _ in 0..N {
let counter = counter.clone();
let tx = tx.clone();

thread::spawn(move || {
let mut counter = counter.lock();
*counter += 1;
thread::spawn(move || {
let mut counter = counter.lock();
*counter += 1;

// If this is the last increment, signal that we're done.
if *counter == N {
tx.send(()).unwrap();
}
});
}
// If this is the last increment, signal that we're done.
if *counter == N {
tx.send(()).unwrap();
}
});
}

// Wait until the last thread increments the counter.
rx.recv().unwrap();
// Wait until the last thread increments the counter.
rx.recv().unwrap();

// The counter must equal the number of threads.
assert_eq!(*counter.lock(), N);
// The counter must equal the number of threads.
assert_eq!(*counter.lock(), N);

println!("Done!");
println!("Done!");
}
}

#[cfg(not(feature = "std"))]
mod ex {
pub(crate) fn entry() {
eprintln!("this example requires the 'std' feature")
}
}

fn main() {
ex::entry();
}
Loading

0 comments on commit dbfbb3e

Please sign in to comment.