Skip to content

Commit

Permalink
Fix race condition in AsyncWriteStream (bytecodealliance#7144)
Browse files Browse the repository at this point in the history
* Fix race condition in `AsyncWriteStream`

This commit fixes a synchronization issue with `AsyncWriteStream` where
the writer task would get wedged and not make any further progress. The
underlying reason is that `notify_waiters` was used which did not buffer
its notification, meaning that if a waiter wasn't actually waiting then
it would miss the notification and block forever on the next call to
`notified`. By using `notify_one` instead this will buffer up a single
notification for the other end for when it gets to waiting for work.

Additionally this removes creation of the `Notified` future
ahead-of-time as that's no longer necessary.

* Fix http bodies too
  • Loading branch information
alexcrichton authored and eduardomourar committed Oct 4, 2023
1 parent 0e262a5 commit 8e75472
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 13 deletions.
11 changes: 5 additions & 6 deletions crates/wasi-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,12 +416,11 @@ impl Worker {
state.error = Some(e.into());
state.flush_pending = false;
}
self.write_ready_changed.notify_waiters();
self.write_ready_changed.notify_one();
}

async fn work(&self, writer: mpsc::Sender<Bytes>) {
loop {
let notified = self.new_work.notified();
while let Some(job) = self.pop() {
match job {
Job::Flush => {
Expand All @@ -446,10 +445,10 @@ impl Worker {
}
}

self.write_ready_changed.notify_waiters();
self.write_ready_changed.notify_one();
}

notified.await;
self.new_work.notified().await;
}
}
}
Expand Down Expand Up @@ -493,15 +492,15 @@ impl HostOutputStream for BodyWriteStream {
None => return Err(OutputStreamError::Trap(anyhow!("write exceeded budget"))),
}
drop(state);
self.worker.new_work.notify_waiters();
self.worker.new_work.notify_one();
Ok(())
}
fn flush(&mut self) -> Result<(), OutputStreamError> {
let mut state = self.worker.state();
state.check_error()?;

state.flush_pending = true;
self.worker.new_work.notify_waiters();
self.worker.new_work.notify_one();

Ok(())
}
Expand Down
12 changes: 5 additions & 7 deletions crates/wasi/src/preview2/write_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,11 @@ impl Worker {
state.error = Some(e.into());
state.flush_pending = false;
}
self.write_ready_changed.notify_waiters();
self.write_ready_changed.notify_one();
}
async fn work<T: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static>(&self, mut writer: T) {
use tokio::io::AsyncWriteExt;
loop {
let notified = self.new_work.notified();
while let Some(job) = self.pop() {
match job {
Job::Flush => {
Expand Down Expand Up @@ -130,10 +129,9 @@ impl Worker {
}
}

self.write_ready_changed.notify_waiters();
self.write_ready_changed.notify_one();
}

notified.await;
self.new_work.notified().await;
}
}
}
Expand Down Expand Up @@ -180,15 +178,15 @@ impl HostOutputStream for AsyncWriteStream {
None => return Err(OutputStreamError::Trap(anyhow!("write exceeded budget"))),
}
drop(state);
self.worker.new_work.notify_waiters();
self.worker.new_work.notify_one();
Ok(())
}
fn flush(&mut self) -> Result<(), OutputStreamError> {
let mut state = self.worker.state();
state.check_error()?;

state.flush_pending = true;
self.worker.new_work.notify_waiters();
self.worker.new_work.notify_one();

Ok(())
}
Expand Down

0 comments on commit 8e75472

Please sign in to comment.