-
-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add TaskPool::spawn_pollable so that the consumer does not need a blo… #4102
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
use crate::Task; | ||
use async_channel::{Receiver, TryRecvError}; | ||
|
||
/// A pollable task whose result readiness can be checked in system functions | ||
/// on every frame update without blocking on a future | ||
#[derive(Debug)] | ||
pub struct PollableTask<T> { | ||
receiver: Receiver<T>, | ||
// this is to keep the task alive | ||
_task: Task<()>, | ||
} | ||
|
||
impl<T> PollableTask<T> { | ||
pub(crate) fn new(receiver: Receiver<T>, task: Task<()>) -> Self { | ||
Self { | ||
receiver, | ||
_task: task, | ||
} | ||
} | ||
|
||
/// poll to see whether the task finished | ||
pub fn poll(&self) -> Option<T> { | ||
match self.receiver.try_recv() { | ||
Ok(value) => Some(value), | ||
Err(try_error) => match try_error { | ||
TryRecvError::Empty => None, | ||
TryRecvError::Closed => { | ||
panic!("Polling on the task failed because the connection was already closed.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this panic-worthy? I'm not sure exactly when the actual spawned future is dropped, but I think it would be reasonable for the executor to drop it once it reaches I'd say it might be better to add e.g. an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the panic due to this discussion: #4102 (comment) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, it seems quite harsh to stop the world for running the check after the task has ended. I don't want to block on this; I don't feel very strongly either way. |
||
} | ||
}, | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,17 @@ | ||
use async_channel::bounded; | ||
use std::{ | ||
any::type_name, | ||
future::Future, | ||
mem, | ||
pin::Pin, | ||
sync::Arc, | ||
thread::{self, JoinHandle}, | ||
}; | ||
|
||
use bevy_utils::tracing::error; | ||
use futures_lite::{future, pin}; | ||
|
||
use crate::Task; | ||
use crate::{PollableTask, Task}; | ||
|
||
/// Used to create a [`TaskPool`] | ||
#[derive(Debug, Default, Clone)] | ||
|
@@ -228,6 +231,28 @@ impl TaskPool { | |
Task::new(self.executor.spawn(future)) | ||
} | ||
|
||
/// Spawns a static future onto the thread pool. The returned `PollableTask` is not a future, | ||
/// but can be polled in system functions on every frame update without being blocked on | ||
pub fn spawn_pollable<F, T>(&self, future: F) -> PollableTask<T> | ||
where | ||
F: Future<Output = T> + Send + 'static, | ||
T: Send + Sync + 'static, | ||
{ | ||
let (sender, receiver) = bounded(1); | ||
let task = self.spawn(async move { | ||
let result = future.await; | ||
match sender.send(result).await { | ||
Ok(()) => {} | ||
Err(_) => error!( | ||
"Sending result for future {future_name} (`Future<Output={return_name}>`) failed, because the receiving `PollableTask` was dropped", | ||
future_name=type_name::<F>(), | ||
return_name=type_name::<T>(), | ||
), | ||
} | ||
}); | ||
PollableTask::new(receiver, task) | ||
} | ||
|
||
/// Spawns a static future on the thread-local async executor for the current thread. The task | ||
/// will run entirely on the thread the task was spawned on. The returned Task is a future. | ||
/// It can also be cancelled and "detached" allowing it to continue running without having | ||
|
@@ -301,9 +326,13 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { | |
#[allow(clippy::blacklisted_name)] | ||
mod tests { | ||
use super::*; | ||
use std::sync::{ | ||
atomic::{AtomicBool, AtomicI32, Ordering}, | ||
Barrier, | ||
use std::{ | ||
ops::Range, | ||
sync::{ | ||
atomic::{AtomicBool, AtomicI32, Ordering}, | ||
Barrier, | ||
}, | ||
time::Duration, | ||
}; | ||
|
||
#[test] | ||
|
@@ -405,7 +434,7 @@ mod tests { | |
scope.spawn_local(async move { | ||
inner_count_clone.fetch_add(1, Ordering::Release); | ||
if std::thread::current().id() != spawner { | ||
// NOTE: This check is using an atomic rather than simply panicing the | ||
// NOTE: This check is using an atomic rather than simply panicking the | ||
// thread to avoid deadlocking the barrier on failure | ||
inner_thread_check_failed.store(true, Ordering::Release); | ||
} | ||
|
@@ -418,4 +447,31 @@ mod tests { | |
assert!(!thread_check_failed.load(Ordering::Acquire)); | ||
assert_eq!(count.load(Ordering::Acquire), 200); | ||
} | ||
|
||
#[test] | ||
fn test_spawn_pollable() { | ||
let transform_fn = |i| i + 1; | ||
|
||
let pool = TaskPool::new(); | ||
let nums: Range<u8> = 0..10; | ||
|
||
let pollable_tasks = nums | ||
.clone() | ||
.into_iter() | ||
.map(|i| pool.spawn_pollable(async move { transform_fn(i) })) | ||
.collect::<Vec<_>>(); | ||
|
||
for _ in 0..100 { | ||
for (pollable_task, number) in pollable_tasks.iter().zip(nums.clone()) { | ||
match pollable_task.poll() { | ||
None => continue, | ||
Some(actual) => assert_eq!(transform_fn(number), actual), | ||
} | ||
return; | ||
} | ||
|
||
std::thread::sleep(Duration::from_secs_f32(1. / 100.)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sleeping in tests is very much not ideal. I think we would check the output from the relevant channel from each task I.e. write this test relatviely There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't quite get how I should use |
||
} | ||
panic!("Tasks did not finish in time."); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is using channels actually preferable to calling
future::block_on
? Has anyone run benchmarks? Will this work onwasm
? (pretty surefuture::block_on
doesn't, but its still worth checking)Are there alternatives that don't involve async channels (which will allocate and use atomics)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That looks like #2650.
I haven't run the numbers on it though. We need to get data into the ecs from outside the ecs, which means that it needs to live somewhere with a stable address so that both sides can talk to each other. Currently, that's where the
Task
lives, in this PR it's in the channel internals.Adding new items can also happen at any time relative to the system code happening, which means we definitely need atomics in some form.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There were these benches from the original author, hanabi1224.
https://github.com/hanabi1224/bevy/blob/accee1bb4e1b3ea8c628199c0fd4ec02badc594f/examples/async_tasks/async_bench.rs
Looking at it, they probably should be ported to criterion and a bench needs to be added for the approach in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think another option is to use or implement something similar to
FuturesExt::now_or_never
. It would have almost no overhead compared to usingfuture::block_on
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can confirm the example doesn't currently compile on wasm, since the single threaded task pool doesn't have the spawn_pollable api. You might be able to get it to work on wasm by detaching the task instead of storing it and then copying the code for spawn pollable over to the single threaded task pool.