Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tokio as an optional bevy_tasks backend #6762

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/bevy_tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ repository = "https://github.com/bevyengine/bevy"
license = "MIT OR Apache-2.0"
keywords = ["bevy"]

[features]
default = ["tokio"]
tokio = ["dep:tokio"]

[dependencies]
futures-lite = "1.4.0"
async-executor = "1.3.0"
Expand All @@ -19,5 +23,8 @@ concurrent-queue = "1.2.2"
[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.22", optional = true, features = ["rt-multi-thread"]}

[dev-dependencies]
instant = { version = "0.1", features = ["wasm-bindgen"] }
15 changes: 13 additions & 2 deletions crates/bevy_tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,25 @@
mod slice;
pub use slice::{ParallelSlice, ParallelSliceMut};

#[cfg(any(target_arch = "wasm32", not(feature = "tokio")))]
mod task;
#[cfg(any(target_arch = "wasm32", not(feature = "tokio")))]
pub use task::Task;
#[cfg(all(not(target_arch = "wasm32"), feature = "tokio"))]
mod tokio_task;
#[cfg(all(not(target_arch = "wasm32"), feature = "tokio"))]
pub use tokio_task::Task;

#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(not(target_arch = "wasm32"), not(feature = "tokio")))]
mod task_pool;
#[cfg(not(target_arch = "wasm32"))]
#[cfg(all(not(target_arch = "wasm32"), not(feature = "tokio")))]
pub use task_pool::{Scope, TaskPool, TaskPoolBuilder};

#[cfg(all(not(target_arch = "wasm32"), feature = "tokio"))]
mod tokio_task_pool;
#[cfg(all(not(target_arch = "wasm32"), feature = "tokio"))]
pub use tokio_task_pool::{Scope, TaskPool, TaskPoolBuilder};

#[cfg(target_arch = "wasm32")]
mod single_threaded_task_pool;
#[cfg(target_arch = "wasm32")]
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_tasks/src/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ pub struct Scope<'scope, 'env: 'scope, T> {
env: PhantomData<&'env mut &'env ()>,
}

impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> {
impl<'scope, 'env, T: Send + 'static> Scope<'scope, 'env, T> {
/// Spawns a scoped future onto the thread pool. The scope *must* outlive
/// the provided future. The results of the future will be returned as a part of
/// [`TaskPool::scope`]'s return value.
Expand Down
81 changes: 81 additions & 0 deletions crates/bevy_tasks/src/tokio_task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures_lite::FutureExt;
use tokio::task::JoinHandle;

/// Wraps `async_executor::Task`, a spawned future.
///
/// Tasks are also futures themselves and yield the output of the spawned future.
///
/// When a task is dropped, its gets canceled and won't be polled again. To cancel a task a bit
/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method.
///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
/// Wraps `async_executor::Task`
#[derive(Debug)]
#[must_use = "Tasks are canceled when dropped, use `.detach()` to run them in the background."]
pub struct Task<T>(Option<JoinHandle<T>>);

impl<T> Task<T> {
/// Creates a new task from a given `async_executor::Task`
pub fn new(task: JoinHandle<T>) -> Self {
Self(Some(task))
}

/// Detaches the task to let it keep running in the background. See
/// `async_executor::Task::detach`
pub fn detach(mut self) {
drop(self.0.take());
}

/// Cancels the task and waits for it to stop running.
///
/// Returns the task's output if it was completed just before it got canceled, or [`None`] if
/// it didn't complete.
///
/// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
/// canceling because it also waits for the task to stop running.
///
/// See `async_executor::Task::cancel`
pub async fn cancel(mut self) -> Option<T> {
self.0.take()?
.await
.ok()
}

/// Returns `true` if the current task is finished.
///
///
/// Unlike poll, it doesn't resolve the final value, it just checks if the task has finished.
/// Note that in a multithreaded environment, this task can be finished immediately after calling this function.
pub fn is_finished(&self) -> bool {
self.0.as_ref().map(|handle| handle.is_finished()).unwrap_or(true)
}
}

impl<T> Future for Task<T> {
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(handle) = self.0.as_mut() {
match handle.poll(cx) {
Poll::Ready(Ok(result)) => Poll::Ready(result),
Poll::Ready(Err(err)) => panic!("Task has failed: {}", err),
Poll::Pending => Poll::Pending,
}
} else {
unreachable!("Polling dropped task");
}
}
}

impl<T> Drop for Task<T> {
fn drop(&mut self) {
if let Some(handle) = self.0.take() {
handle.abort();
}
}
}
Loading