Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cli): remove possible deadlock in test channel #22662

Merged
merged 2 commits into from
Mar 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions cli/tools/test/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ use super::TestStdioStream;
use deno_core::futures::future::poll_fn;
use deno_core::parking_lot;
use deno_core::parking_lot::lock_api::RawMutex;
use deno_core::parking_lot::lock_api::RawMutexTimed;
use deno_runtime::deno_io::pipe;
use deno_runtime::deno_io::AsyncPipeRead;
use deno_runtime::deno_io::PipeRead;
use deno_runtime::deno_io::PipeWrite;
use std::fmt::Display;
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::ready;
use std::task::Poll;
use std::time::Duration;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::ReadBuf;
Expand Down Expand Up @@ -143,17 +148,23 @@ impl TestStream {
self.read_opt.is_some()
}

/// Cancellation-safe.
#[inline]
fn pipe(&mut self) -> impl Future<Output = ()> + '_ {
poll_fn(|cx| self.poll_pipe(cx))
}

/// Attempt to read from a given stream, pushing all of the data in it into the given
/// [`UnboundedSender`] before returning.
async fn pipe(&mut self) {
fn poll_pipe(&mut self, cx: &mut std::task::Context) -> Poll<()> {
let mut buffer = [0_u8; BUFFER_SIZE];
let mut buf = ReadBuf::new(&mut buffer);
let res = {
// No more stream, so just return.
// No more stream, we shouldn't hit this case.
let Some(stream) = &mut self.read_opt else {
return;
Copy link
Contributor Author

@mmastrac mmastrac Mar 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could lead to starvation as we just quietly returned "ready" and tokio could select us again.

unreachable!();
};
poll_fn(|cx| Pin::new(&mut *stream).poll_read(cx, &mut buf)).await
ready!(Pin::new(&mut *stream).poll_read(cx, &mut buf))
};
match res {
Ok(_) => {
Expand All @@ -173,6 +184,7 @@ impl TestStream {
self.read_opt.take();
}
}
Poll::Ready(())
}

/// Read and "block" until the sync markers have been read.
Expand Down Expand Up @@ -249,11 +261,21 @@ impl TestEventSenderFactory {
let mut test_stderr =
TestStream::new(id, TestStdioStream::Stderr, stderr_reader, sender)?;

// This ensures that the stdout and stderr streams in the select! loop below cannot starve each
// other.
let mut alternate_stream_priority = false;

// This function will be woken whenever a stream or the receiver is ready
loop {
alternate_stream_priority = !alternate_stream_priority;
let (a, b) = if alternate_stream_priority {
(&mut test_stdout, &mut test_stderr)
} else {
(&mut test_stderr, &mut test_stdout)
};

tokio::select! {
_ = test_stdout.pipe(), if test_stdout.is_alive() => {},
_ = test_stderr.pipe(), if test_stdout.is_alive() => {},
biased; // We actually want to poll the channel first
recv = sync_receiver.recv() => {
match recv {
// If the channel closed, we assume that all important data from the streams was synced,
Expand All @@ -273,6 +295,10 @@ impl TestEventSenderFactory {
}
}
}
// Poll stdout first if `alternate_stream_priority` is true, otherwise poll stderr first.
// This is necessary because of the `biased` flag above to avoid starvation.
_ = a.pipe(), if a.is_alive() => {},
_ = b.pipe(), if b.is_alive() => {},
}
}

Expand Down Expand Up @@ -377,7 +403,12 @@ impl TestEventSender {
let mutex = parking_lot::RawMutex::INIT;
mutex.lock();
self.sync_sender.send(SendMutex(&mutex as _))?;
mutex.lock();
if !mutex.try_lock_for(Duration::from_secs(30)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just in case

panic!(
"Test flush deadlock, sender closed = {}",
self.sync_sender.is_closed()
);
}
Ok(())
}
}
Expand Down Expand Up @@ -444,10 +475,9 @@ mod tests {
}

/// Test that flushing a large number of times doesn't hang.
#[ignore]
#[tokio::test]
async fn test_flush_lots() {
test_util::timeout!(60);
test_util::timeout!(240);
let (mut worker, mut receiver) = create_single_test_event_channel();
let recv_handle = spawn(async move {
let mut queue = vec![];
Expand Down