add TaskPool::spawn_pollable so that the consumer does not need a blo… #4102

Open · wants to merge 3 commits into base: main
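For context, consumer-side usage of the API added here might look roughly like the sketch below. In Bevy the check would live in a system that runs every frame; a plain loop with `yield_now` stands in for that here, and the computed value is arbitrary.

```rust
use bevy_tasks::TaskPool;

fn main() {
    let pool = TaskPool::new();

    // spawn_pollable returns immediately with a PollableTask instead of a Task future.
    let pollable = pool.spawn_pollable(async { 21 * 2 });

    // Stand-in for a per-frame system: check readiness without ever blocking.
    loop {
        match pollable.poll() {
            // Some(result) once the spawned future has finished.
            Some(result) => {
                println!("task finished with {}", result);
                break;
            }
            // None means "still running"; the caller carries on with the frame.
            None => std::thread::yield_now(),
        }
    }
}
```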
2 changes: 2 additions & 0 deletions crates/bevy_tasks/Cargo.toml
@@ -9,6 +9,8 @@ license = "MIT OR Apache-2.0"
keywords = ["bevy"]

[dependencies]
bevy_utils = { path = "../bevy_utils", version = "0.8.0-dev" }

futures-lite = "1.4.0"
event-listener = "2.5.2"
async-executor = "1.3.0"
3 changes: 3 additions & 0 deletions crates/bevy_tasks/src/lib.rs
@@ -7,6 +7,9 @@ pub use slice::{ParallelSlice, ParallelSliceMut};
mod task;
pub use task::Task;

mod pollable_task;
pub use pollable_task::PollableTask;

#[cfg(not(target_arch = "wasm32"))]
mod task_pool;
#[cfg(not(target_arch = "wasm32"))]
33 changes: 33 additions & 0 deletions crates/bevy_tasks/src/pollable_task.rs
@@ -0,0 +1,33 @@
use crate::Task;
use async_channel::{Receiver, TryRecvError};

/// A pollable task whose result readiness can be checked in system functions
/// on every frame update without blocking on a future
#[derive(Debug)]
pub struct PollableTask<T> {
receiver: Receiver<T>,
Member:
Is using channels actually preferable to calling future::block_on? Has anyone run benchmarks? Will this work on wasm? (Pretty sure future::block_on doesn't, but it's still worth checking.)
Are there alternatives that don't involve async channels (which will allocate and use atomics)?

Member:
That looks like #2650.

I haven't run the numbers on it, though. We need to get data into the ECS from outside the ECS, which means it needs to live somewhere with a stable address so that both sides can talk to each other. Currently that's where the Task lives; in this PR it's in the channel internals.

Adding new items can also happen at any time relative to the system code running, which means we definitely need atomics in some form.

Contributor:
There were these benches from the original author, hanabi1224:

https://github.com/hanabi1224/bevy/blob/accee1bb4e1b3ea8c628199c0fd4ec02badc594f/examples/async_tasks/async_bench.rs

Looking at them, they should probably be ported to criterion, and a bench needs to be added for the approach in this PR.
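As a rough idea of what the suggested criterion port could look like, the sketch below compares the channel-backed spawn_pollable against busy-polling a plain Task. The bench names and the trivial workload are illustrative, and criterion is assumed as a dev-dependency.

```rust
use bevy_tasks::TaskPool;
use criterion::{criterion_group, criterion_main, Criterion};
use futures_lite::future;

fn bench_polling(c: &mut Criterion) {
    let pool = TaskPool::new();

    // Channel-based approach from this PR: poll the PollableTask until ready.
    c.bench_function("spawn_pollable_poll", |b| {
        b.iter(|| {
            let pollable = pool.spawn_pollable(async { 1 + 1 });
            loop {
                if let Some(v) = pollable.poll() {
                    break v;
                }
            }
        })
    });

    // Baseline: keep the Task and poll it in place via block_on + poll_once.
    c.bench_function("task_block_on_poll_once", |b| {
        b.iter(|| {
            let mut task = pool.spawn(async { 1 + 1 });
            loop {
                if let Some(v) = future::block_on(future::poll_once(&mut task)) {
                    break v;
                }
            }
        })
    });
}

criterion_group!(benches, bench_polling);
criterion_main!(benches);
```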

Comment:
I think another option is to use, or implement something similar to, FutureExt::now_or_never. It would have almost no overhead compared to using future::block_on.
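For reference, the non-channel pattern being discussed (checking a stored Task in place without waiting for it) can be sketched with futures_lite, similar in spirit to now_or_never. This uses the existing TaskPool::spawn rather than the API added in this PR.

```rust
use bevy_tasks::TaskPool;
use futures_lite::future;

fn main() {
    let pool = TaskPool::new();
    // An ordinary Task, kept around instead of a PollableTask.
    let mut task = pool.spawn(async { 40 + 2 });

    // Stand-in for a per-frame check.
    loop {
        // poll_once drives the future a single step and is immediately ready itself,
        // so block_on never parks: Some(result) when finished, None while running.
        if let Some(result) = future::block_on(future::poll_once(&mut task)) {
            println!("finished: {}", result);
            break;
        }
        std::thread::yield_now();
    }
}
```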

Contributor:
I can confirm the example doesn't currently compile on wasm, since the single-threaded task pool doesn't have the spawn_pollable API. You might be able to get it to work on wasm by detaching the task instead of storing it, and then copying the spawn_pollable code over to the single-threaded task pool.

// this is to keep the task alive
_task: Task<()>,
}

impl<T> PollableTask<T> {
pub(crate) fn new(receiver: Receiver<T>, task: Task<()>) -> Self {
Self {
receiver,
_task: task,
}
}

/// poll to see whether the task finished
pub fn poll(&self) -> Option<T> {
match self.receiver.try_recv() {
Ok(value) => Some(value),
Err(try_error) => match try_error {
TryRecvError::Empty => None,
TryRecvError::Closed => {
panic!("Polling on the task failed because the connection was already closed.")
Member:
Is this panic-worthy? I'm not sure exactly when the actual spawned future is dropped, but I think it would be reasonable for the executor to drop it once it reaches Ready.

I'd say it might be better to add e.g. an AtomicBool to this object and print a warning in this case, at most once for each task. I'm not sure of the exact details of the executor, though.

Contributor (author):
I added the panic due to this discussion: #4102 (comment)

Member:
Hmm, it seems quite harsh to stop the world just to run the check after the task has ended.

I don't want to block on this; I don't feel very strongly either way.

}
},
}
}
}
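The review thread above debates panicking versus warning when the channel is closed. A warn-once variant along the lines the reviewer suggests could look like the sketch below; the field name, message, and use of Relaxed ordering are illustrative, not part of this PR.

```rust
use crate::Task;
use async_channel::{Receiver, TryRecvError};
use bevy_utils::tracing::warn;
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug)]
pub struct PollableTask<T> {
    receiver: Receiver<T>,
    // Set after the first Closed error so the warning is emitted at most once per task.
    closed_reported: AtomicBool,
    // this is to keep the task alive
    _task: Task<()>,
}

impl<T> PollableTask<T> {
    pub(crate) fn new(receiver: Receiver<T>, task: Task<()>) -> Self {
        Self {
            receiver,
            closed_reported: AtomicBool::new(false),
            _task: task,
        }
    }

    /// Poll to see whether the task has finished.
    pub fn poll(&self) -> Option<T> {
        match self.receiver.try_recv() {
            Ok(value) => Some(value),
            Err(TryRecvError::Empty) => None,
            Err(TryRecvError::Closed) => {
                // Warn instead of panicking, and only on the first occurrence.
                if !self.closed_reported.swap(true, Ordering::Relaxed) {
                    warn!("polled a PollableTask whose result channel was already closed");
                }
                None
            }
        }
    }
}
```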
66 changes: 61 additions & 5 deletions crates/bevy_tasks/src/task_pool.rs
@@ -1,14 +1,17 @@
use async_channel::bounded;
use std::{
any::type_name,
future::Future,
mem,
pin::Pin,
sync::Arc,
thread::{self, JoinHandle},
};

use bevy_utils::tracing::error;
use futures_lite::{future, pin};

-use crate::Task;
+use crate::{PollableTask, Task};

/// Used to create a [`TaskPool`]
#[derive(Debug, Default, Clone)]
@@ -228,6 +231,28 @@ impl TaskPool {
Task::new(self.executor.spawn(future))
}

/// Spawns a static future onto the thread pool. The returned `PollableTask` is not a future,
/// but can be polled in system functions on every frame update without blocking on a future
pub fn spawn_pollable<F, T>(&self, future: F) -> PollableTask<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + Sync + 'static,
{
let (sender, receiver) = bounded(1);
let task = self.spawn(async move {
let result = future.await;
match sender.send(result).await {
Ok(()) => {}
Err(_) => error!(
"Sending result for future {future_name} (`Future<Output={return_name}>`) failed, because the receiving `PollableTask` was dropped",
future_name=type_name::<F>(),
return_name=type_name::<T>(),
),
}
});
PollableTask::new(receiver, task)
}

/// Spawns a static future on the thread-local async executor for the current thread. The task
/// will run entirely on the thread the task was spawned on. The returned Task is a future.
/// It can also be cancelled and "detached" allowing it to continue running without having
@@ -301,9 +326,13 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> {
#[allow(clippy::blacklisted_name)]
mod tests {
use super::*;
-use std::sync::{
-    atomic::{AtomicBool, AtomicI32, Ordering},
-    Barrier,
+use std::{
+    ops::Range,
+    sync::{
+        atomic::{AtomicBool, AtomicI32, Ordering},
+        Barrier,
+    },
+    time::Duration,
};

#[test]
@@ -405,7 +434,7 @@ mod tests {
scope.spawn_local(async move {
inner_count_clone.fetch_add(1, Ordering::Release);
if std::thread::current().id() != spawner {
-// NOTE: This check is using an atomic rather than simply panicing the
+// NOTE: This check is using an atomic rather than simply panicking the
// thread to avoid deadlocking the barrier on failure
inner_thread_check_failed.store(true, Ordering::Release);
}
@@ -418,4 +447,31 @@
assert!(!thread_check_failed.load(Ordering::Acquire));
assert_eq!(count.load(Ordering::Acquire), 200);
}

#[test]
fn test_spawn_pollable() {
let transform_fn = |i| i + 1;

let pool = TaskPool::new();
let nums: Range<u8> = 0..10;

let pollable_tasks = nums
.clone()
.into_iter()
.map(|i| pool.spawn_pollable(async move { transform_fn(i) }))
.collect::<Vec<_>>();

for _ in 0..100 {
for (pollable_task, number) in pollable_tasks.iter().zip(nums.clone()) {
match pollable_task.poll() {
None => continue,
Some(actual) => assert_eq!(transform_fn(number), actual),
}
return;
}

std::thread::sleep(Duration::from_secs_f32(1. / 100.));
Member:
Sleeping in tests is very much not ideal. I think we could instead check the output from the relevant channel for each task, i.e. write this test relatively async, using e.g. TaskPool::scope.

Contributor (author):
I don't quite get how I should use TaskPool::scope here. I want to test that a PollableTask will return something. Should I wrap those pollable tasks in a scope to do the waiting there?

}
panic!("Tasks did not finish in time.");
}
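One way to write the scope-based test the reviewer suggests is to await every result inside TaskPool::scope instead of sleeping on the main thread. The sketch below is illustrative and not part of this PR; the yield-based polling loop is just one possible way to do the waiting.

```rust
#[test]
fn test_spawn_pollable_with_scope() {
    let transform_fn = |i: u8| i + 1;
    let pool = TaskPool::new();
    let nums: Range<u8> = 0..10;

    let pollable_tasks = nums
        .clone()
        .map(|i| pool.spawn_pollable(async move { transform_fn(i) }))
        .collect::<Vec<_>>();

    // Wait inside a scope; yielding keeps the executor busy instead of sleeping a thread.
    let mut results = pool.scope(|scope| {
        for pollable_task in &pollable_tasks {
            scope.spawn(async move {
                loop {
                    if let Some(value) = pollable_task.poll() {
                        break value;
                    }
                    futures_lite::future::yield_now().await;
                }
            });
        }
    });

    results.sort_unstable();
    assert_eq!(results, nums.map(transform_fn).collect::<Vec<_>>());
}
```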
}