From f51e6399842857f6ed89b7e0323cc89e06f45844 Mon Sep 17 00:00:00 2001 From: pubrrr Date: Fri, 4 Mar 2022 15:54:56 +0100 Subject: [PATCH 1/3] add TaskPool::spawn_pollable so that the consumer does not need a block_on method --- crates/bevy_tasks/Cargo.toml | 2 + crates/bevy_tasks/src/lib.rs | 3 ++ crates/bevy_tasks/src/pollable_task.rs | 33 ++++++++++++++ crates/bevy_tasks/src/task_pool.rs | 59 +++++++++++++++++++++++--- 4 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 crates/bevy_tasks/src/pollable_task.rs diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 06a0da456931a..72ca31b17950b 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,6 +9,8 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [dependencies] +bevy_utils = { path = "../bevy_utils", version = "0.6.0" } + futures-lite = "1.4.0" event-listener = "2.5.2" async-executor = "1.3.0" diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 7345d775ee968..1d0afbc001986 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -7,6 +7,9 @@ pub use slice::{ParallelSlice, ParallelSliceMut}; mod task; pub use task::Task; +mod pollable_task; +pub use pollable_task::PollableTask; + #[cfg(not(target_arch = "wasm32"))] mod task_pool; #[cfg(not(target_arch = "wasm32"))] diff --git a/crates/bevy_tasks/src/pollable_task.rs b/crates/bevy_tasks/src/pollable_task.rs new file mode 100644 index 0000000000000..d80e082eee0c9 --- /dev/null +++ b/crates/bevy_tasks/src/pollable_task.rs @@ -0,0 +1,33 @@ +use crate::Task; +use async_channel::{Receiver, TryRecvError}; + +/// A pollable task whose result readiness can be checked in system functions +/// on every frame update without blocking on a future +#[derive(Debug)] +pub struct PollableTask { + receiver: Receiver, + // this is to keep the task alive + _task: Task<()>, +} + +impl PollableTask { + pub(crate) fn new(receiver: Receiver, task: Task<()>) -> Self { + Self { + receiver, + _task: task, + } + } + + /// poll to see whether the task finished + pub fn poll(&self) -> Option { + match self.receiver.try_recv() { + Ok(value) => Some(value), + Err(try_error) => match try_error { + TryRecvError::Empty => None, + TryRecvError::Closed => { + panic!("Polling on the task failed because the connection was already closed.") + } + }, + } + } +} diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 1d0f86e7cb5ed..677069821a7b0 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -1,3 +1,4 @@ +use async_channel::bounded; use std::{ future::Future, mem, @@ -6,9 +7,10 @@ use std::{ thread::{self, JoinHandle}, }; +use bevy_utils::tracing::warn; use futures_lite::{future, pin}; -use crate::Task; +use crate::{PollableTask, Task}; /// Used to create a [`TaskPool`] #[derive(Debug, Default, Clone)] @@ -228,6 +230,26 @@ impl TaskPool { Task::new(self.executor.spawn(future)) } + /// Spawns a static future onto the thread pool. The returned `PollableTask` is not a future, + /// but can be polled in system functions on every frame update without being blocked on + pub fn spawn_pollable( + &self, + future: impl Future + Send + 'static, + ) -> PollableTask + where + T: Send + Sync + 'static, + { + let (sender, receiver) = bounded(1); + let task = self.spawn(async move { + let result = future.await; + match sender.send(result).await { + Ok(_) => {} + Err(_) => warn!("Could not send result of task to receiver"), + } + }); + PollableTask::new(receiver, task) + } + /// Spawns a static future on the thread-local async executor for the current thread. The task /// will run entirely on the thread the task was spawned on. The returned Task is a future. /// It can also be cancelled and "detached" allowing it to continue running without having @@ -301,9 +323,13 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { #[allow(clippy::blacklisted_name)] mod tests { use super::*; - use std::sync::{ - atomic::{AtomicBool, AtomicI32, Ordering}, - Barrier, + use std::{ + ops::Range, + sync::{ + atomic::{AtomicBool, AtomicI32, Ordering}, + Barrier, + }, + time::Duration, }; #[test] @@ -405,7 +431,7 @@ mod tests { scope.spawn_local(async move { inner_count_clone.fetch_add(1, Ordering::Release); if std::thread::current().id() != spawner { - // NOTE: This check is using an atomic rather than simply panicing the + // NOTE: This check is using an atomic rather than simply panicking the // thread to avoid deadlocking the barrier on failure inner_thread_check_failed.store(true, Ordering::Release); } @@ -418,4 +444,27 @@ mod tests { assert!(!thread_check_failed.load(Ordering::Acquire)); assert_eq!(count.load(Ordering::Acquire), 200); } + + #[test] + fn test_spawn_pollable() { + let transform_fn = |i| i + 1; + + let pool = TaskPool::new(); + let nums: Range = 0..10; + + let pollable_tasks = nums + .clone() + .into_iter() + .map(|i| pool.spawn_pollable(async move { transform_fn(i) })) + .collect::>(); + + std::thread::sleep(Duration::from_secs_f32(1.0 / 30.0)); + + for (pollable_task, number) in pollable_tasks.iter().zip(nums) { + let poll_result = pollable_task.poll(); + + let expected = transform_fn(number); + assert_eq!(Some(expected), poll_result); + } + } } From 550a0f1c84d0299bbda69b6d5076dad10e1bdcd3 Mon Sep 17 00:00:00 2001 From: pubrrr Date: Mon, 7 Mar 2022 23:07:38 +0100 Subject: [PATCH 2/3] make test more robust --- crates/bevy_tasks/src/task_pool.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 677069821a7b0..0956117248476 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -458,13 +458,17 @@ mod tests { .map(|i| pool.spawn_pollable(async move { transform_fn(i) })) .collect::>(); - std::thread::sleep(Duration::from_secs_f32(1.0 / 30.0)); - - for (pollable_task, number) in pollable_tasks.iter().zip(nums) { - let poll_result = pollable_task.poll(); + for _ in 0..100 { + for (pollable_task, number) in pollable_tasks.iter().zip(nums.clone()) { + match pollable_task.poll() { + None => continue, + Some(actual) => assert_eq!(transform_fn(number), actual), + } + return; + } - let expected = transform_fn(number); - assert_eq!(Some(expected), poll_result); + std::thread::sleep(Duration::from_secs_f32(1. / 100.)); } + panic!("Tasks did not finish in time."); } } From 8163fdec347b24b730c383dee1a143cd276cd526 Mon Sep 17 00:00:00 2001 From: pubrrr Date: Sun, 12 Jun 2022 13:24:52 +0200 Subject: [PATCH 3/3] improve error message when sending the result fails --- crates/bevy_tasks/Cargo.toml | 2 +- crates/bevy_tasks/src/task_pool.rs | 17 ++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 72ca31b17950b..98cae6d4d4b3a 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,7 +9,7 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [dependencies] -bevy_utils = { path = "../bevy_utils", version = "0.6.0" } +bevy_utils = { path = "../bevy_utils", version = "0.8.0-dev" } futures-lite = "1.4.0" event-listener = "2.5.2" diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 0956117248476..99bc4631767ea 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -1,5 +1,6 @@ use async_channel::bounded; use std::{ + any::type_name, future::Future, mem, pin::Pin, @@ -7,7 +8,7 @@ use std::{ thread::{self, JoinHandle}, }; -use bevy_utils::tracing::warn; +use bevy_utils::tracing::error; use futures_lite::{future, pin}; use crate::{PollableTask, Task}; @@ -232,19 +233,21 @@ impl TaskPool { /// Spawns a static future onto the thread pool. The returned `PollableTask` is not a future, /// but can be polled in system functions on every frame update without being blocked on - pub fn spawn_pollable( - &self, - future: impl Future + Send + 'static, - ) -> PollableTask + pub fn spawn_pollable(&self, future: F) -> PollableTask where + F: Future + Send + 'static, T: Send + Sync + 'static, { let (sender, receiver) = bounded(1); let task = self.spawn(async move { let result = future.await; match sender.send(result).await { - Ok(_) => {} - Err(_) => warn!("Could not send result of task to receiver"), + Ok(()) => {} + Err(_) => error!( + "Sending result for future {future_name} (`Future`) failed, because the receiving `PollableTask` was dropped", + future_name=type_name::(), + return_name=type_name::(), + ), } }); PollableTask::new(receiver, task)