-
Notifications
You must be signed in to change notification settings - Fork 5.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(cli): remove possible deadlock in test channel #22662
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,16 +5,21 @@ use super::TestStdioStream; | |
use deno_core::futures::future::poll_fn; | ||
use deno_core::parking_lot; | ||
use deno_core::parking_lot::lock_api::RawMutex; | ||
use deno_core::parking_lot::lock_api::RawMutexTimed; | ||
use deno_runtime::deno_io::pipe; | ||
use deno_runtime::deno_io::AsyncPipeRead; | ||
use deno_runtime::deno_io::PipeRead; | ||
use deno_runtime::deno_io::PipeWrite; | ||
use std::fmt::Display; | ||
use std::future::Future; | ||
use std::io::Write; | ||
use std::pin::Pin; | ||
use std::sync::atomic::AtomicUsize; | ||
use std::sync::atomic::Ordering; | ||
use std::sync::Arc; | ||
use std::task::ready; | ||
use std::task::Poll; | ||
use std::time::Duration; | ||
use tokio::io::AsyncRead; | ||
use tokio::io::AsyncReadExt; | ||
use tokio::io::ReadBuf; | ||
|
@@ -143,17 +148,23 @@ impl TestStream { | |
self.read_opt.is_some() | ||
} | ||
|
||
/// Cancellation-safe. | ||
#[inline] | ||
fn pipe(&mut self) -> impl Future<Output = ()> + '_ { | ||
poll_fn(|cx| self.poll_pipe(cx)) | ||
} | ||
|
||
/// Attempt to read from a given stream, pushing all of the data in it into the given | ||
/// [`UnboundedSender`] before returning. | ||
async fn pipe(&mut self) { | ||
fn poll_pipe(&mut self, cx: &mut std::task::Context) -> Poll<()> { | ||
let mut buffer = [0_u8; BUFFER_SIZE]; | ||
let mut buf = ReadBuf::new(&mut buffer); | ||
let res = { | ||
// No more stream, so just return. | ||
// No more stream, we shouldn't hit this case. | ||
let Some(stream) = &mut self.read_opt else { | ||
return; | ||
unreachable!(); | ||
}; | ||
poll_fn(|cx| Pin::new(&mut *stream).poll_read(cx, &mut buf)).await | ||
ready!(Pin::new(&mut *stream).poll_read(cx, &mut buf)) | ||
}; | ||
match res { | ||
Ok(_) => { | ||
|
@@ -173,6 +184,7 @@ impl TestStream { | |
self.read_opt.take(); | ||
} | ||
} | ||
Poll::Ready(()) | ||
} | ||
|
||
/// Read and "block" until the sync markers have been read. | ||
|
@@ -249,11 +261,21 @@ impl TestEventSenderFactory { | |
let mut test_stderr = | ||
TestStream::new(id, TestStdioStream::Stderr, stderr_reader, sender)?; | ||
|
||
// This ensures that the stdout and stderr streams in the select! loop below cannot starve each | ||
// other. | ||
let mut alternate_stream_priority = false; | ||
|
||
// This function will be woken whenever a stream or the receiver is ready | ||
loop { | ||
alternate_stream_priority = !alternate_stream_priority; | ||
let (a, b) = if alternate_stream_priority { | ||
(&mut test_stdout, &mut test_stderr) | ||
} else { | ||
(&mut test_stderr, &mut test_stdout) | ||
}; | ||
|
||
tokio::select! { | ||
_ = test_stdout.pipe(), if test_stdout.is_alive() => {}, | ||
_ = test_stderr.pipe(), if test_stdout.is_alive() => {}, | ||
biased; // We actually want to poll the channel first | ||
recv = sync_receiver.recv() => { | ||
match recv { | ||
// If the channel closed, we assume that all important data from the streams was synced, | ||
|
@@ -273,6 +295,10 @@ impl TestEventSenderFactory { | |
} | ||
} | ||
} | ||
// Poll stdout first if `alternate_stream_priority` is true, otherwise poll stderr first. | ||
// This is necessary because of the `biased` flag above to avoid starvation. | ||
_ = a.pipe(), if a.is_alive() => {}, | ||
_ = b.pipe(), if b.is_alive() => {}, | ||
} | ||
} | ||
|
||
|
@@ -377,7 +403,12 @@ impl TestEventSender { | |
let mutex = parking_lot::RawMutex::INIT; | ||
mutex.lock(); | ||
self.sync_sender.send(SendMutex(&mutex as _))?; | ||
mutex.lock(); | ||
if !mutex.try_lock_for(Duration::from_secs(30)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just in case |
||
panic!( | ||
"Test flush deadlock, sender closed = {}", | ||
self.sync_sender.is_closed() | ||
); | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
@@ -444,10 +475,9 @@ mod tests { | |
} | ||
|
||
/// Test that flushing a large number of times doesn't hang. | ||
#[ignore] | ||
#[tokio::test] | ||
async fn test_flush_lots() { | ||
test_util::timeout!(60); | ||
test_util::timeout!(240); | ||
let (mut worker, mut receiver) = create_single_test_event_channel(); | ||
let recv_handle = spawn(async move { | ||
let mut queue = vec![]; | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could lead to starvation as we just quietly returned "ready" and tokio could select us again.