diff --git a/Cargo.toml b/Cargo.toml
index 40432586f8df0..e18cdb2c78f5b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -59,6 +59,7 @@ bevy_text = { path = "crates/bevy_text", version = "0.1" }
 bevy_ui = { path = "crates/bevy_ui", version = "0.1" }
 bevy_utils = { path = "crates/bevy_utils", version = "0.1" }
 bevy_window = { path = "crates/bevy_window", version = "0.1" }
+bevy_tasks = { path = "crates/bevy_tasks", version = "0.1" }

 # bevy (optional)
 bevy_audio = { path = "crates/bevy_audio", optional = true, version = "0.1" }
diff --git a/crates/bevy_app/Cargo.toml b/crates/bevy_app/Cargo.toml
index 912b747953947..713560ba553fd 100644
--- a/crates/bevy_app/Cargo.toml
+++ b/crates/bevy_app/Cargo.toml
@@ -13,8 +13,10 @@ keywords = ["bevy"]
 # bevy
 bevy_derive = { path = "../bevy_derive", version = "0.1" }
 bevy_ecs = { path = "../bevy_ecs", version = "0.1" }
+bevy_tasks = { path = "../bevy_tasks", version = "0.1" }
+bevy_math = { path = "../bevy_math", version = "0.1" }

 # other
 libloading = "0.6"
 log = { version = "0.4", features = ["release_max_level_info"] }
-serde = { version = "1.0", features = ["derive"]}
\ No newline at end of file
+serde = { version = "1.0", features = ["derive"]}
diff --git a/crates/bevy_app/src/app.rs b/crates/bevy_app/src/app.rs
index d5d1cfbc3c466..c62cdea8e7460 100644
--- a/crates/bevy_app/src/app.rs
+++ b/crates/bevy_app/src/app.rs
@@ -1,4 +1,4 @@
-use crate::app_builder::AppBuilder;
+use crate::{app_builder::AppBuilder, DefaultTaskPoolOptions};
 use bevy_ecs::{ParallelExecutor, Resources, Schedule, World};

 #[allow(clippy::needless_doctest_main)]
@@ -63,6 +63,12 @@ impl App {
     }

     pub fn run(mut self) {
+        // Setup the default bevy task pools
+        self.resources
+            .get_cloned::<DefaultTaskPoolOptions>()
+            .unwrap_or_else(DefaultTaskPoolOptions::default)
+            .create_default_pools(&mut self.resources);
+
         self.startup_schedule.initialize(&mut self.resources);
         self.startup_executor.run(
             &mut self.startup_schedule,
diff --git a/crates/bevy_app/src/lib.rs b/crates/bevy_app/src/lib.rs
index d2e241616b55d..a409edbfafae3 100644
--- a/crates/bevy_app/src/lib.rs
+++ b/crates/bevy_app/src/lib.rs
@@ -8,6 +8,7 @@ mod app_builder;
 mod event;
 mod plugin;
 mod schedule_runner;
+mod task_pool_options;

 pub use app::*;
 pub use app_builder::*;
@@ -15,6 +16,7 @@ pub use bevy_derive::DynamicPlugin;
 pub use event::*;
 pub use plugin::*;
 pub use schedule_runner::*;
+pub use task_pool_options::*;

 pub mod prelude {
     pub use crate::{
diff --git a/crates/bevy_app/src/task_pool_options.rs b/crates/bevy_app/src/task_pool_options.rs
new file mode 100644
index 0000000000000..fc5edbfc0e36f
--- /dev/null
+++ b/crates/bevy_app/src/task_pool_options.rs
@@ -0,0 +1,147 @@
+use bevy_ecs::Resources;
+use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool, TaskPoolBuilder};
+
+/// Defines a simple way to determine how many threads to use given the number of remaining cores
+/// and number of total cores
+#[derive(Clone)]
+pub struct TaskPoolThreadAssignmentPolicy {
+    /// Force using at least this many threads
+    pub min_threads: usize,
+    /// Under no circumstance use more than this many threads for this pool
+    pub max_threads: usize,
+    /// Target using this percentage of total cores, clamped by min_threads and max_threads. It is
+    /// permitted to use 1.0 to try to use all remaining threads
+    pub percent: f32,
+}
+
+impl TaskPoolThreadAssignmentPolicy {
+    /// Determine the number of threads to use for this task pool
+    fn get_number_of_threads(&self, remaining_threads: usize, total_threads: usize) -> usize {
+        assert!(self.percent >= 0.0);
+        let mut desired = (total_threads as f32 * self.percent).round() as usize;
+
+        // Limit ourselves to the number of cores available
+        desired = desired.min(remaining_threads);
+
+        // Clamp by min_threads, max_threads. (This may result in us using more threads than are
+        // available, this is intended. An example case where this might happen is a device with
+        // <= 2 threads.)
+        bevy_math::clamp(desired, self.min_threads, self.max_threads)
+    }
+}
+
+/// Helper for configuring and creating the default task pools. For end-users who want full control,
+/// insert the default task pools into the resource map manually. If the pools are already inserted,
+/// this helper will do nothing.
+#[derive(Clone)]
+pub struct DefaultTaskPoolOptions {
+    /// If the number of physical cores is less than min_total_threads, force using min_total_threads
+    pub min_total_threads: usize,
+    /// If the number of physical cores is greater than max_total_threads, force using max_total_threads
+    pub max_total_threads: usize,
+
+    /// Used to determine number of IO threads to allocate
+    pub io: TaskPoolThreadAssignmentPolicy,
+    /// Used to determine number of async compute threads to allocate
+    pub async_compute: TaskPoolThreadAssignmentPolicy,
+    /// Used to determine number of compute threads to allocate
+    pub compute: TaskPoolThreadAssignmentPolicy,
+}
+
+impl Default for DefaultTaskPoolOptions {
+    fn default() -> Self {
+        DefaultTaskPoolOptions {
+            // By default, use however many cores are available on the system
+            min_total_threads: 1,
+            max_total_threads: std::usize::MAX,
+
+            // Use 25% of cores for IO, at least 1, no more than 4
+            io: TaskPoolThreadAssignmentPolicy {
+                min_threads: 1,
+                max_threads: 4,
+                percent: 0.25,
+            },
+
+            // Use 25% of cores for async compute, at least 1, no more than 4
+            async_compute: TaskPoolThreadAssignmentPolicy {
+                min_threads: 1,
+                max_threads: 4,
+                percent: 0.25,
+            },
+
+            // Use all remaining cores for compute (at least 1)
+            compute: TaskPoolThreadAssignmentPolicy {
+                min_threads: 1,
+                max_threads: std::usize::MAX,
+                percent: 1.0, // This 1.0 here means "whatever is left over"
+            },
+        }
+    }
+}
+
+impl DefaultTaskPoolOptions {
+    /// Create a configuration that forces using the given number of threads.
+    pub fn with_num_threads(thread_count: usize) -> Self {
+        let mut options = Self::default();
+        options.min_total_threads = thread_count;
+        options.max_total_threads = thread_count;
+
+        options
+    }
+
+    /// Inserts the default thread pools into the given resource map based on the configured values
+    pub fn create_default_pools(&self, resources: &mut Resources) {
+        let total_threads = bevy_math::clamp(
+            bevy_tasks::logical_core_count(),
+            self.min_total_threads,
+            self.max_total_threads,
+        );
+
+        let mut remaining_threads = total_threads;
+
+        if !resources.contains::<IOTaskPool>() {
+            // Determine the number of IO threads we will use
+            let io_threads = self
+                .io
+                .get_number_of_threads(remaining_threads, total_threads);
+            remaining_threads -= io_threads;
+
+            resources.insert(IOTaskPool(
+                TaskPoolBuilder::default()
+                    .num_threads(io_threads)
+                    .thread_name("IO Task Pool".to_string())
+                    .build(),
+            ));
+        }
+
+        if !resources.contains::<AsyncComputeTaskPool>() {
+            // Determine the number of async compute threads we will use
+            let async_compute_threads = self
+                .async_compute
+                .get_number_of_threads(remaining_threads, total_threads);
+            remaining_threads -= async_compute_threads;
+
+            resources.insert(AsyncComputeTaskPool(
+                TaskPoolBuilder::default()
+                    .num_threads(async_compute_threads)
+                    .thread_name("Async Compute Task Pool".to_string())
+                    .build(),
+            ));
+        }
+
+        if !resources.contains::<ComputeTaskPool>() {
+            // Determine the number of compute threads we will use
+            // This is intentionally last so that an end user can specify 1.0 as the percent
+            let compute_threads = self
+                .compute
+                .get_number_of_threads(remaining_threads, total_threads);
+
+            resources.insert(ComputeTaskPool(
+                TaskPoolBuilder::default()
+                    .num_threads(compute_threads)
+                    .thread_name("Compute Task Pool".to_string())
+                    .build(),
+            ));
+        }
+    }
+}
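Reviewer note, not part of the diff: to make the policy math concrete, on an 8-logical-core machine the defaults give IO `clamp(min(round(8 × 0.25), 8), 1, 4) = 2` threads, async compute another 2, and compute the remaining 4. A minimal sketch of overriding the defaults (the exact field values here are arbitrary, only the types come from the code above); the resulting value would be registered with `add_resource` before `run()`, exactly like the example at the bottom of this diff:

```rust
use bevy_app::{DefaultTaskPoolOptions, TaskPoolThreadAssignmentPolicy};

fn pool_options() -> DefaultTaskPoolOptions {
    DefaultTaskPoolOptions {
        // Hypothetical split: cap IO at a single thread and keep every
        // other policy at its default value.
        io: TaskPoolThreadAssignmentPolicy {
            min_threads: 1,
            max_threads: 1,
            percent: 0.1,
        },
        ..DefaultTaskPoolOptions::default()
    }
}
```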
diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml
index 6d07ca56fda36..72e4fb354d10e 100644
--- a/crates/bevy_ecs/Cargo.toml
+++ b/crates/bevy_ecs/Cargo.toml
@@ -15,9 +15,9 @@ profiler = []

 [dependencies]
 bevy_hecs = { path = "hecs", features = ["macros", "serialize"], version = "0.1" }
+bevy_tasks = { path = "../bevy_tasks", version = "0.1" }
 bevy_utils = { path = "../bevy_utils", version = "0.1" }
 rand = "0.7.2"
-rayon = "1.3"
 crossbeam-channel = "0.4.2"
 fixedbitset = "0.3.0"
 downcast-rs = "1.1.1"
diff --git a/crates/bevy_ecs/src/resource/resources.rs b/crates/bevy_ecs/src/resource/resources.rs
index 6612b7d1b1342..c9aae4bdb3c55 100644
--- a/crates/bevy_ecs/src/resource/resources.rs
+++ b/crates/bevy_ecs/src/resource/resources.rs
@@ -43,6 +43,12 @@ impl Resources {
         self.get_resource_mut(ResourceIndex::Global)
     }

+    /// Returns a clone of the underlying resource, this is helpful when borrowing something
+    /// cloneable (like a task pool) without taking a borrow on the resource map
+    pub fn get_cloned<T: Resource + Clone>(&self) -> Option<T> {
+        self.get::<T>().map(|r| (*r).clone())
+    }
+
     #[allow(clippy::needless_lifetimes)]
     pub fn get_local<'a, T: Resource>(&'a self, id: SystemId) -> Option<Ref<'a, T>> {
         self.get_resource(ResourceIndex::System(id))
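For orientation (not part of the diff), a sketch of why `get_cloned` exists; `MyPool` is a hypothetical cloneable resource standing in for a task pool:

```rust
use bevy_ecs::Resources;

#[derive(Clone)]
struct MyPool(u32);

fn main() {
    let mut resources = Resources::default();
    resources.insert(MyPool(4));

    // `get_cloned` returns an owned clone, so no borrow of `resources`
    // is held afterwards...
    let pool = resources.get_cloned::<MyPool>().unwrap();

    // ...which means the resource map can be mutated while `pool` is alive.
    resources.insert(MyPool(pool.0 + 1));
}
```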
diff --git a/crates/bevy_ecs/src/schedule/parallel_executor.rs b/crates/bevy_ecs/src/schedule/parallel_executor.rs
index a51ea4cff0f2d..3b434db503b6f 100644
--- a/crates/bevy_ecs/src/schedule/parallel_executor.rs
+++ b/crates/bevy_ecs/src/schedule/parallel_executor.rs
@@ -7,7 +7,6 @@ use bevy_hecs::{ArchetypesGeneration, World};
 use crossbeam_channel::{Receiver, Sender};
 use fixedbitset::FixedBitSet;
 use parking_lot::Mutex;
-use rayon::ScopeFifo;
 use std::{ops::Range, sync::Arc};

 /// Executes each schedule stage in parallel by analyzing system dependencies.
@@ -66,52 +65,6 @@ impl ParallelExecutor {
     }
 }

-/// This can be added as an app resource to control the global `rayon::ThreadPool` used by ecs.
-// Dev internal note: We cannot directly expose a ThreadPoolBuilder here as it does not implement Send and Sync.
-#[derive(Debug, Default, Clone)]
-pub struct ParallelExecutorOptions {
-    /// If some value, we'll set up the thread pool to use at most n threads. See `rayon::ThreadPoolBuilder::num_threads`.
-    num_threads: Option<usize>,
-    /// If some value, we'll set up the thread pool's workers to the given stack size. See `rayon::ThreadPoolBuilder::stack_size`.
-    stack_size: Option<usize>,
-    // TODO: Do we also need/want to expose other features (*_handler, etc.)
-}
-
-impl ParallelExecutorOptions {
-    /// Creates a new ParallelExecutorOptions instance
-    pub fn new() -> Self {
-        Self::default()
-    }
-
-    /// Sets the num_threads option, using the builder pattern
-    pub fn with_num_threads(mut self, num_threads: Option<usize>) -> Self {
-        self.num_threads = num_threads;
-        self
-    }
-
-    /// Sets the stack_size option, using the builder pattern. WARNING: Only use this if you know what you're doing,
-    /// otherwise your application may run into stability and performance issues.
-    pub fn with_stack_size(mut self, stack_size: Option<usize>) -> Self {
-        self.stack_size = stack_size;
-        self
-    }
-
-    /// Creates a new ThreadPoolBuilder based on the current options.
-    pub(crate) fn create_builder(&self) -> rayon::ThreadPoolBuilder {
-        let mut builder = rayon::ThreadPoolBuilder::new();
-
-        if let Some(num_threads) = self.num_threads {
-            builder = builder.num_threads(num_threads);
-        }
-
-        if let Some(stack_size) = self.stack_size {
-            builder = builder.stack_size(stack_size);
-        }
-
-        builder
-    }
-}
-
 #[derive(Debug, Clone)]
 pub struct ExecutorStage {
     /// each system's set of dependencies
@@ -262,7 +215,7 @@ impl ExecutorStage {
         &mut self,
         systems: &[Arc<Mutex<Box<dyn System>>>],
         run_ready_type: RunReadyType,
-        scope: &ScopeFifo<'run>,
+        scope: &mut bevy_tasks::Scope<'run, ()>,
         world: &'run World,
         resources: &'run Resources,
     ) -> RunReadyResult {
@@ -308,7 +261,8 @@ impl ExecutorStage {
                 // handle multi-threaded system
                 let sender = self.sender.clone();
                 self.running_systems.insert(system_index);
-                scope.spawn_fifo(move |_| {
+
+                scope.spawn(async move {
                     let mut system = system.lock();
                     system.run(world, resources);
                     sender.send(system_index).unwrap();
@@ -328,6 +282,10 @@ impl ExecutorStage {
         systems: &[Arc<Mutex<Box<dyn System>>>],
         schedule_changed: bool,
     ) {
+        let compute_pool = resources
+            .get_cloned::<bevy_tasks::ComputeTaskPool>()
+            .unwrap();
+
         // if the schedule has changed, clear executor state / fill it with new defaults
         if schedule_changed {
             self.system_dependencies.clear();
@@ -364,7 +322,8 @@ impl ExecutorStage {
             // if there are no upcoming thread local systems, run everything right now
             0..systems.len()
         };
-        rayon::scope_fifo(|scope| {
+
+        compute_pool.scope(|scope| {
             run_ready_result = self.run_ready_systems(
                 systems,
                 RunReadyType::Range(run_ready_system_index_range),
@@ -373,6 +332,7 @@ impl ExecutorStage {
                 resources,
             );
         });
+
         loop {
             // if all systems in the stage are finished, break out of the loop
             if self.finished_systems.count_ones(..) == systems.len() {
@@ -393,7 +353,7 @@ impl ExecutorStage {
                 run_ready_result = RunReadyResult::Ok;
             } else {
                 // wait for a system to finish, then run its dependents
-                rayon::scope_fifo(|scope| {
+                compute_pool.scope(|scope| {
                     loop {
                         // if all systems in the stage are finished, break out of the loop
                         if self.finished_systems.count_ones(..) == systems.len() {
@@ -410,7 +370,7 @@ impl ExecutorStage {
                             resources,
                         );

-                        // if the next ready system is thread local, break out of this loop/rayon scope so it can be run
+                        // if the next ready system is thread local, break out of this loop/bevy_tasks scope so it can be run
                         if let RunReadyResult::ThreadLocalReady(_) = run_ready_result {
                             break;
                         }
@@ -442,6 +402,7 @@ mod tests {
         Commands,
     };
     use bevy_hecs::{Entity, World};
+    use bevy_tasks::{ComputeTaskPool, TaskPool};
    use fixedbitset::FixedBitSet;
     use parking_lot::Mutex;
     use std::sync::Arc;
@@ -455,6 +416,8 @@ mod tests {
     fn cross_stage_archetype_change_prepare() {
         let mut world = World::new();
         let mut resources = Resources::default();
+        resources.insert(ComputeTaskPool(TaskPool::default()));
+
         let mut schedule = Schedule::default();
         schedule.add_stage("PreArchetypeChange");
         schedule.add_stage("PostArchetypeChange");
@@ -484,6 +447,8 @@ mod tests {
     fn intra_stage_archetype_change_prepare() {
         let mut world = World::new();
         let mut resources = Resources::default();
+        resources.insert(ComputeTaskPool(TaskPool::default()));
+
         let mut schedule = Schedule::default();
         schedule.add_stage("update");

@@ -512,6 +477,7 @@ mod tests {
     fn schedule() {
         let mut world = World::new();
         let mut resources = Resources::default();
+        resources.insert(ComputeTaskPool(TaskPool::default()));
         resources.insert(Counter::default());
         resources.insert(1.0f64);
         resources.insert(2isize);
diff --git a/crates/bevy_ecs/src/schedule/schedule.rs b/crates/bevy_ecs/src/schedule/schedule.rs
index c889f8f1d4cdb..fe5f62b7abb8e 100644
--- a/crates/bevy_ecs/src/schedule/schedule.rs
+++ b/crates/bevy_ecs/src/schedule/schedule.rs
@@ -1,6 +1,5 @@
 use crate::{
     resource::Resources,
-    schedule::ParallelExecutorOptions,
     system::{System, SystemId, ThreadLocalExecution},
 };
 use bevy_hecs::World;
@@ -168,15 +167,6 @@ impl Schedule {
             return;
         }

-        let thread_pool_builder = resources
-            .get::<ParallelExecutorOptions>()
-            .map(|options| (*options).clone())
-            .unwrap_or_else(ParallelExecutorOptions::default)
-            .create_builder();
-        // For now, bevy_ecs only uses the global thread pool so it is sufficient to configure it once here.
-        // Don't call .unwrap() as the function is called twice.
-        let _ = thread_pool_builder.build_global();
-
        for stage in self.stages.values_mut() {
             for system in stage.iter_mut() {
                 let mut system = system.lock();
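The executor change above boils down to swapping rayon's `scope_fifo` for a fork-join scope on the cloned `ComputeTaskPool`. A minimal standalone sketch of that pattern (names follow the diff; the "system" work here is a placeholder):

```rust
use bevy_tasks::TaskPool;

fn main() {
    let pool = TaskPool::new();

    // Fork: one task per "system"; join: `scope` only returns once all
    // spawned tasks have completed, collecting outputs in spawn order.
    let results = pool.scope(|scope| {
        for system_index in 0..4 {
            scope.spawn(async move {
                // a real executor would lock and run the system here
                system_index * 2
            });
        }
    });

    assert_eq!(results, vec![0, 2, 4, 6]);
}
```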
diff --git a/crates/bevy_math/src/clamp.rs b/crates/bevy_math/src/clamp.rs
new file mode 100644
index 0000000000000..92807e54f179e
--- /dev/null
+++ b/crates/bevy_math/src/clamp.rs
@@ -0,0 +1,19 @@
+/// A value bounded by a minimum and a maximum
+///
+/// If input is less than min then this returns min.
+/// If input is greater than max then this returns max.
+/// Otherwise this returns input.
+///
+/// **Panics** in debug mode if `!(min <= max)`.
+///
+/// Original implementation from num-traits licensed as MIT
+pub fn clamp<T: PartialOrd>(input: T, min: T, max: T) -> T {
+    debug_assert!(min <= max, "min must be less than or equal to max");
+    if input < min {
+        min
+    } else if input > max {
+        max
+    } else {
+        input
+    }
+}
diff --git a/crates/bevy_math/src/lib.rs b/crates/bevy_math/src/lib.rs
index 3dcb0614dc565..121932725b725 100644
--- a/crates/bevy_math/src/lib.rs
+++ b/crates/bevy_math/src/lib.rs
@@ -1,6 +1,8 @@
+mod clamp;
 mod face_toward;
 mod geometry;

+pub use clamp::*;
 pub use face_toward::*;
 pub use geometry::*;
 pub use glam::*;
diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml
new file mode 100644
index 0000000000000..38ee408ee2682
--- /dev/null
+++ b/crates/bevy_tasks/Cargo.toml
@@ -0,0 +1,17 @@
+[package]
+name = "bevy_tasks"
+version = "0.1.3"
+authors = [
+    "Bevy Contributors",
+    "Lachlan Sneff",
+    "Philip Degarmo"
+]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+multitask = "0.2"
+num_cpus = "1"
+parking = "1"
+pollster = "0.2"
diff --git a/crates/bevy_tasks/README.md b/crates/bevy_tasks/README.md
new file mode 100644
index 0000000000000..67aebf87c4bea
--- /dev/null
+++ b/crates/bevy_tasks/README.md
@@ -0,0 +1,32 @@
+# bevy_tasks
+
+A refreshingly simple task executor for bevy. :)
+
+This is a simple threadpool with minimal dependencies. The main use case is a scoped fork-join, i.e. spawning tasks from
+a single thread and having that thread await the completion of those tasks. This is intended specifically for
+[`bevy`][bevy] as a lighter alternative to [`rayon`][rayon] for this specific use case. There are also utilities for
+generating the tasks from a slice of data. This library is intended for games and makes no attempt to ensure fairness
+or ordering of spawned tasks.
+
+It is based on [`multitask`][multitask], a lightweight executor that allows the end user to manage their own threads.
+`multitask` is based on async-task, a core piece of async-std.
+
+[bevy]: https://bevyengine.org
+[rayon]: https://github.com/rayon-rs/rayon
+[multitask]: https://github.com/stjepang/multitask
+
+## Dependencies
+
+A very small dependency list is a key feature of this module
+
+```
+├── multitask
+│   ├── async-task
+│   ├── concurrent-queue
+│   │   └── cache-padded
+│   └── fastrand
+├── num_cpus
+│   └── libc
+├── parking
+└── pollster
+```
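Not part of the diff: a minimal sketch of the scoped fork-join the README describes, using only APIs introduced in this PR. The point is that the scope may borrow non-`'static` data:

```rust
use bevy_tasks::TaskPool;

fn main() {
    let pool = TaskPool::new();
    let data = vec![1u32, 2, 3, 4];
    let data = &data; // stack-local data, borrowed inside the scope

    // Fork-join: `scope` returns once every spawned task has completed,
    // yielding the task outputs in spawn order.
    let sums = pool.scope(|s| {
        for chunk in data.chunks(2) {
            s.spawn(async move { chunk.iter().sum::<u32>() });
        }
    });

    assert_eq!(sums, vec![3, 7]);
}
```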
diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs
new file mode 100644
index 0000000000000..26dfaeac285d9
--- /dev/null
+++ b/crates/bevy_tasks/examples/busy_behavior.rs
@@ -0,0 +1,33 @@
+use bevy_tasks::TaskPoolBuilder;
+
+// This sample demonstrates creating a thread pool with 4 threads and spawning 40 tasks that spin
+// for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical
+// cores)
+
+fn main() {
+    let pool = TaskPoolBuilder::new()
+        .thread_name("Busy Behavior ThreadPool".to_string())
+        .num_threads(4)
+        .build();
+
+    let t0 = std::time::Instant::now();
+    pool.scope(|s| {
+        for i in 0..40 {
+            s.spawn(async move {
+                let now = std::time::Instant::now();
+                while std::time::Instant::now() - now < std::time::Duration::from_millis(100) {
+                    // spin, simulating work being done
+                }
+
+                println!(
+                    "Thread {:?} index {} finished",
+                    std::thread::current().id(),
+                    i
+                );
+            })
+        }
+    });
+
+    let t1 = std::time::Instant::now();
+    println!("all tasks finished in {} secs", (t1 - t0).as_secs_f32());
+}
diff --git a/crates/bevy_tasks/examples/idle_behavior.rs b/crates/bevy_tasks/examples/idle_behavior.rs
new file mode 100644
index 0000000000000..4a392cb2e6da9
--- /dev/null
+++ b/crates/bevy_tasks/examples/idle_behavior.rs
@@ -0,0 +1,31 @@
+use bevy_tasks::TaskPoolBuilder;
+
+// This sample demonstrates a thread pool with one thread per logical core and only one task
+// spinning. Other than the one thread, the system should remain idle, demonstrating good behavior
+// for small workloads.
+
+fn main() {
+    let pool = TaskPoolBuilder::new()
+        .thread_name("Idle Behavior ThreadPool".to_string())
+        .build();
+
+    pool.scope(|s| {
+        for i in 0..1 {
+            s.spawn(async move {
+                println!("Blocking for 10 seconds");
+                let now = std::time::Instant::now();
+                while std::time::Instant::now() - now < std::time::Duration::from_millis(10000) {
+                    // spin, simulating work being done
+                }
+
+                println!(
+                    "Thread {:?} index {} finished",
+                    std::thread::current().id(),
+                    i
+                );
+            })
+        }
+    });
+
+    println!("all tasks finished");
+}
diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs
new file mode 100644
index 0000000000000..8ac79b3e1cf37
--- /dev/null
+++ b/crates/bevy_tasks/src/lib.rs
@@ -0,0 +1,26 @@
+mod slice;
+pub use slice::{ParallelSlice, ParallelSliceMut};
+
+mod task;
+pub use task::Task;
+
+mod task_pool;
+pub use task_pool::{Scope, TaskPool, TaskPoolBuilder};
+
+mod usages;
+pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool};
+
+pub mod prelude {
+    pub use crate::{
+        slice::{ParallelSlice, ParallelSliceMut},
+        usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool},
+    };
+}
+
+pub fn logical_core_count() -> usize {
+    num_cpus::get()
+}
+
+pub fn physical_core_count() -> usize {
+    num_cpus::get_physical()
+}
diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs
new file mode 100644
index 0000000000000..a04c9af10a28f
--- /dev/null
+++ b/crates/bevy_tasks/src/slice.rs
@@ -0,0 +1,116 @@
+use super::TaskPool;
+
+pub trait ParallelSlice<T: Sync>: AsRef<[T]> {
+    fn par_chunk_map<F, R>(&self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec<R>
+    where
+        F: Fn(&[T]) -> R + Send + Sync,
+        R: Send + 'static,
+    {
+        let slice = self.as_ref();
+        let f = &f;
+        task_pool.scope(|scope| {
+            for chunk in slice.chunks(chunk_size) {
+                scope.spawn(async move { f(chunk) });
+            }
+        })
+    }
+
+    fn par_splat_map<F, R>(&self, task_pool: &TaskPool, max_tasks: Option<usize>, f: F) -> Vec<R>
+    where
+        F: Fn(&[T]) -> R + Send + Sync,
+        R: Send + 'static,
+    {
+        let slice = self.as_ref();
+        let chunk_size = std::cmp::max(
+            1,
+            std::cmp::max(
+                slice.len() / task_pool.thread_num(),
+                slice.len() / max_tasks.unwrap_or(usize::MAX),
+            ),
+        );
+
+        slice.par_chunk_map(task_pool, chunk_size, f)
+    }
+}
+
+impl<S, T: Sync> ParallelSlice<T> for S where S: AsRef<[T]> {}
+
+pub trait ParallelSliceMut<T: Send>: AsMut<[T]> {
+    fn par_chunk_map_mut<F, R>(&mut self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec<R>
+    where
+        F: Fn(&mut [T]) -> R + Send + Sync,
+        R: Send + 'static,
+    {
+        let slice = self.as_mut();
+        let f = &f;
+        task_pool.scope(|scope| {
+            for chunk in slice.chunks_mut(chunk_size) {
+                scope.spawn(async move { f(chunk) });
+            }
+        })
+    }
+
+    fn par_splat_map_mut<F, R>(
+        &mut self,
+        task_pool: &TaskPool,
+        max_tasks: Option<usize>,
+        f: F,
+    ) -> Vec<R>
+    where
+        F: Fn(&mut [T]) -> R + Send + Sync,
+        R: Send + 'static,
+    {
+        let mut slice = self.as_mut();
+        let chunk_size = std::cmp::max(
+            1,
+            std::cmp::max(
+                slice.len() / task_pool.thread_num(),
+                slice.len() / max_tasks.unwrap_or(usize::MAX),
+            ),
+        );
+
+        slice.par_chunk_map_mut(task_pool, chunk_size, f)
+    }
+}
+
+impl<S, T: Send> ParallelSliceMut<T> for S where S: AsMut<[T]> {}
+
+#[cfg(test)]
+mod tests {
+    use crate::*;
+
+    #[test]
+    fn test_par_chunks_map() {
+        let v = vec![42; 1000];
+        let task_pool = TaskPool::new();
+        let outputs = v.par_splat_map(&task_pool, None, |numbers| -> i32 { numbers.iter().sum() });
+
+        let mut sum = 0;
+        for output in outputs {
+            sum += output;
+        }
+
+        assert_eq!(sum, 1000 * 42);
+    }
+
+    #[test]
+    fn test_par_chunks_map_mut() {
+        let mut v = vec![42; 1000];
+        let task_pool = TaskPool::new();
+
+        let outputs = v.par_splat_map_mut(&task_pool, None, |numbers| -> i32 {
+            for number in numbers.iter_mut() {
+                *number *= 2;
+            }
+            numbers.iter().sum()
+        });
+
+        let mut sum = 0;
+        for output in outputs {
+            sum += output;
+        }
+
+        assert_eq!(sum, 1000 * 42 * 2);
+        assert_eq!(v[0], 84);
+    }
+}
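A quick sketch (not in the diff) of calling the slice helpers directly; the data and the `Some(8)` task cap are arbitrary. `par_splat_map` derives the chunk size from the slice length, the pool's thread count, and the cap:

```rust
use bevy_tasks::{ParallelSlice, TaskPool};

fn main() {
    let pool = TaskPool::new();
    let values = vec![1u64; 10_000];

    // At most 8 tasks (fewer if the pool has fewer threads); each task
    // sums one chunk of the slice on the pool.
    let partial_sums = values.par_splat_map(&pool, Some(8), |chunk| -> u64 {
        chunk.iter().sum()
    });

    assert_eq!(partial_sums.into_iter().sum::<u64>(), 10_000);
}
```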
diff --git a/crates/bevy_tasks/src/task.rs b/crates/bevy_tasks/src/task.rs
new file mode 100644
index 0000000000000..27d8fc4e38021
--- /dev/null
+++ b/crates/bevy_tasks/src/task.rs
@@ -0,0 +1,45 @@
+use std::{
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+/// Wraps `multitask::Task`, a spawned future.
+///
+/// Tasks are also futures themselves and yield the output of the spawned future.
+///
+/// When a task is dropped, it gets canceled and won't be polled again. To cancel a task a bit
+/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method.
+///
+/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
+pub struct Task<T>(multitask::Task<T>);
+
+impl<T> Task<T> {
+    /// Detaches the task to let it keep running in the background. See `multitask::Task::detach`
+    pub fn detach(self) {
+        self.0.detach();
+    }
+
+    /// Cancels the task and waits for it to stop running.
+    ///
+    /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
+    /// it didn't complete.
+    ///
+    /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
+    /// canceling because it also waits for the task to stop running.
+    ///
+    /// See `multitask::Task::cancel`
+    pub async fn cancel(self) -> Option<T> {
+        self.0.cancel().await
+    }
+}
+
+impl<T> Future for Task<T> {
+    type Output = T;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
+        // Safe because Task is pinned and contains multitask::Task by value
+        unsafe { self.map_unchecked_mut(|x| &mut x.0).poll(cx) }
+    }
+}
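For orientation (not part of the diff), a sketch of driving a detached-style pool task to completion from a synchronous context. In this PR `TaskPool::spawn` returns the executor's future directly, and `pollster` (already a bevy_tasks dependency) can block on it; depending on `pollster` as a consumer is an assumption here:

```rust
use bevy_tasks::TaskPool;

fn main() {
    let pool = TaskPool::new();

    // `spawn` hands a 'static future to the pool and returns a future
    // yielding its output.
    let task = pool.spawn(async { 21 * 2 });

    // Block on it from this thread; pollster is the same tiny executor
    // bevy_tasks uses internally for `scope`.
    assert_eq!(pollster::block_on(task), 42);
}
```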
diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs
new file mode 100644
index 0000000000000..546a8c1a081a2
--- /dev/null
+++ b/crates/bevy_tasks/src/task_pool.rs
@@ -0,0 +1,285 @@
+use parking::Unparker;
+use std::{
+    future::Future,
+    mem,
+    pin::Pin,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
+    thread::{self, JoinHandle},
+};
+
+/// Used to create a TaskPool
+#[derive(Debug, Default, Clone)]
+pub struct TaskPoolBuilder {
+    /// If set, we'll set up the thread pool to use at most n threads. Otherwise use
+    /// the logical core count of the system
+    num_threads: Option<usize>,
+    /// If set, we'll use the given stack size rather than the system default
+    stack_size: Option<usize>,
+    /// Allows customizing the name of the threads - helpful for debugging. If set, threads will
+    /// be named "<thread_name> (<thread_index>)", i.e. "MyThreadPool (2)"
+    thread_name: Option<String>,
+}
+
+impl TaskPoolBuilder {
+    /// Creates a new TaskPoolBuilder instance
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Override the number of threads created for the pool. If unset, we default to the number
+    /// of logical cores of the system
+    pub fn num_threads(mut self, num_threads: usize) -> Self {
+        self.num_threads = Some(num_threads);
+        self
+    }
+
+    /// Override the stack size of the threads created for the pool
+    pub fn stack_size(mut self, stack_size: usize) -> Self {
+        self.stack_size = Some(stack_size);
+        self
+    }
+
+    /// Override the name of the threads created for the pool. If set, threads will
+    /// be named "<thread_name> (<thread_index>)", i.e. "MyThreadPool (2)"
+    pub fn thread_name(mut self, thread_name: String) -> Self {
+        self.thread_name = Some(thread_name);
+        self
+    }
+
+    /// Creates a new TaskPool based on the current options.
+    pub fn build(self) -> TaskPool {
+        TaskPool::new_internal(
+            self.num_threads,
+            self.stack_size,
+            self.thread_name.as_deref(),
+        )
+    }
+}
+
+struct TaskPoolInner {
+    threads: Vec<(JoinHandle<()>, Arc<Unparker>)>,
+    shutdown_flag: Arc<AtomicBool>,
+}
+
+impl Drop for TaskPoolInner {
+    fn drop(&mut self) {
+        self.shutdown_flag.store(true, Ordering::Release);
+
+        for (_, unparker) in &self.threads {
+            unparker.unpark();
+        }
+        for (join_handle, _) in self.threads.drain(..) {
+            join_handle
+                .join()
+                .expect("task thread panicked while executing");
+        }
+    }
+}
+
+/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by
+/// the pool on threads owned by the pool.
+#[derive(Clone)]
+pub struct TaskPool {
+    /// The executor for the pool
+    ///
+    /// This has to be separate from TaskPoolInner because we have to create an Arc<Executor> to
+    /// pass into the worker threads, and we must create the worker threads before we can create
+    /// the Vec<Task<T>> contained within TaskPoolInner
+    executor: Arc<multitask::Executor>,
+
+    /// Inner state of the pool
+    inner: Arc<TaskPoolInner>,
+}
+
+impl TaskPool {
+    /// Create a `TaskPool` with the default configuration.
+    pub fn new() -> Self {
+        TaskPoolBuilder::new().build()
+    }
+
+    fn new_internal(
+        num_threads: Option<usize>,
+        stack_size: Option<usize>,
+        thread_name: Option<&str>,
+    ) -> Self {
+        let executor = Arc::new(multitask::Executor::new());
+        let shutdown_flag = Arc::new(AtomicBool::new(false));
+
+        let num_threads = num_threads.unwrap_or_else(num_cpus::get);
+
+        let threads = (0..num_threads)
+            .map(|i| {
+                let ex = Arc::clone(&executor);
+                let flag = Arc::clone(&shutdown_flag);
+                let (p, u) = parking::pair();
+                let unparker = Arc::new(u);
+                let u = Arc::clone(&unparker);
+                // Run an executor thread.
+
+                let thread_name = if let Some(thread_name) = thread_name {
+                    format!("{} ({})", thread_name, i)
+                } else {
+                    format!("TaskPool ({})", i)
+                };
+
+                let mut thread_builder = thread::Builder::new().name(thread_name);
+
+                if let Some(stack_size) = stack_size {
+                    thread_builder = thread_builder.stack_size(stack_size);
+                }
+
+                let handle = thread_builder
+                    .spawn(move || {
+                        let ticker = ex.ticker(move || u.unpark());
+                        loop {
+                            if flag.load(Ordering::Acquire) {
+                                break;
+                            }
+
+                            if !ticker.tick() {
+                                p.park();
+                            }
+                        }
+                    })
+                    .expect("failed to spawn thread");
+
+                (handle, unparker)
+            })
+            .collect();
+
+        Self {
+            executor,
+            inner: Arc::new(TaskPoolInner {
+                threads,
+                shutdown_flag,
+            }),
+        }
+    }
+
+    /// Return the number of threads owned by the task pool
+    pub fn thread_num(&self) -> usize {
+        self.inner.threads.len()
+    }
+
+    /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback,
+    /// passing a scope object into it. The scope object provided to the callback can be used
+    /// to spawn tasks. This function will await the completion of all tasks before returning.
+    ///
+    /// This is similar to `rayon::scope` and `crossbeam::scope`
+    pub fn scope<'scope, F, T>(&self, f: F) -> Vec<T>
+    where
+        F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send,
+        T: Send + 'static,
+    {
+        // SAFETY: This function blocks until all futures complete, so this future must return
+        // before this function returns. However, rust has no way of knowing
+        // this so we must convert to 'static here to appease the compiler as it is unable to
+        // validate safety.
+        let executor: &multitask::Executor = &*self.executor;
+        let executor: &'scope multitask::Executor = unsafe { mem::transmute(executor) };
+
+        let fut = async move {
+            let mut scope = Scope {
+                executor,
+                spawned: Vec::new(),
+            };
+
+            f(&mut scope);
+
+            let mut results = Vec::with_capacity(scope.spawned.len());
+            for task in scope.spawned {
+                results.push(task.await);
+            }
+
+            results
+        };
+
+        // Move the value to ensure that it is owned
+        let mut fut = fut;
+
+        // Shadow the original binding so that it can't be directly accessed
+        // ever again.
+        let fut = unsafe { Pin::new_unchecked(&mut fut) };
+
+        // SAFETY: This function blocks until all futures complete, so we do not read/write the
+        // data from futures outside of the 'scope lifetime. However, rust has no way of knowing
+        // this so we must convert to 'static here to appease the compiler as it is unable to
+        // validate safety.
+        let fut: Pin<&mut (dyn Future<Output = Vec<T>> + Send)> = fut;
+        let fut: Pin<&'static mut (dyn Future<Output = Vec<T>> + Send + 'static)> =
+            unsafe { mem::transmute(fut) };
+
+        pollster::block_on(self.executor.spawn(fut))
+    }
+
+    /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
+    /// cancelled and "detached" allowing it to continue running without having to be polled by the
+    /// end-user.
+    pub fn spawn<T>(
+        &self,
+        future: impl Future<Output = T> + Send + 'static,
+    ) -> impl Future<Output = T> + Send
+    where
+        T: Send + 'static,
+    {
+        self.executor.spawn(future)
+    }
+}
+
+impl Default for TaskPool {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+pub struct Scope<'scope, T> {
+    executor: &'scope multitask::Executor,
+    spawned: Vec<multitask::Task<T>>,
+}
+
+impl<'scope, T: Send + 'static> Scope<'scope, T> {
+    pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&mut self, f: Fut) {
+        // SAFETY: This function blocks until all futures complete, so we do not read/write the
+        // data from futures outside of the 'scope lifetime. However, rust has no way of knowing
+        // this so we must convert to 'static here to appease the compiler as it is unable to
+        // validate safety.
+        let fut: Pin<Box<dyn Future<Output = T> + 'scope + Send>> = Box::pin(f);
+        let fut: Pin<Box<dyn Future<Output = T> + 'static + Send>> = unsafe { mem::transmute(fut) };
+
+        let task = self.executor.spawn(fut);
+        self.spawned.push(task);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    pub fn test_spawn() {
+        let pool = TaskPool::new();
+
+        let foo = Box::new(42);
+        let foo = &*foo;
+
+        let outputs = pool.scope(|scope| {
+            for i in 0..100 {
+                scope.spawn(async move {
+                    println!("task {}", i);
+                    if *foo != 42 {
+                        panic!("not 42!?!?")
+                    } else {
+                        *foo
+                    }
+                });
+            }
+        });
+
+        for output in outputs {
+            assert_eq!(output, 42);
+        }
+    }
+}
diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs
new file mode 100644
index 0000000000000..f604aae70a73f
--- /dev/null
+++ b/crates/bevy_tasks/src/usages.rs
@@ -0,0 +1,52 @@
+//! Definitions for a few common task pools that we want. Generally the determining factor for what
+//! kind of work should go in each pool is latency requirements.
+//!
+//! For CPU-intensive work (tasks that generally spin until completion) we have a standard Compute
+//! pool and an AsyncCompute pool. Work that does not need to be completed to present the next
+//! frame should go to the AsyncCompute pool.
+//!
+//! For IO-intensive work (tasks that spend very little time in a "woken" state) we have an IO
+//! task pool. The tasks here are expected to complete very quickly. Generally they should just
+//! await receiving data from somewhere (i.e. disk) and signal other systems when the data is ready
+//! for consumption. (likely via channels)
+
+use super::TaskPool;
+use std::ops::Deref;
+
+/// A newtype for a task pool for CPU-intensive work that must be completed to deliver the next
+/// frame
+#[derive(Clone)]
+pub struct ComputeTaskPool(pub TaskPool);
+
+impl Deref for ComputeTaskPool {
+    type Target = TaskPool;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+/// A newtype for a task pool for CPU-intensive work that may span across multiple frames
+#[derive(Clone)]
+pub struct AsyncComputeTaskPool(pub TaskPool);
+
+impl Deref for AsyncComputeTaskPool {
+    type Target = TaskPool;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+/// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a
+/// "woken" state)
+#[derive(Clone)]
+pub struct IOTaskPool(pub TaskPool);
+
+impl Deref for IOTaskPool {
+    type Target = TaskPool;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
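To illustrate how these newtypes are meant to be consumed (not part of the diff; the helper function here is hypothetical): since each newtype derefs to `TaskPool`, the slice utilities apply to them directly.

```rust
use bevy_tasks::{ComputeTaskPool, ParallelSliceMut, TaskPool};

// Hypothetical helper: double every value in place on the compute pool.
fn double_all(pool: &ComputeTaskPool, values: &mut Vec<i32>) {
    // `&ComputeTaskPool` deref-coerces to `&TaskPool` at the call site.
    values.par_chunk_map_mut(pool, 100, |chunk| {
        for value in chunk.iter_mut() {
            *value *= 2;
        }
    });
}

fn main() {
    let pool = ComputeTaskPool(TaskPool::new());
    let mut values = vec![1; 1000];
    double_all(&pool, &mut values);
    assert!(values.iter().all(|v| *v == 2));
}
```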
diff --git a/examples/app/thread_pool_resources.rs b/examples/app/thread_pool_resources.rs
index 99f3016236748..27ce33e2670c4 100644
--- a/examples/app/thread_pool_resources.rs
+++ b/examples/app/thread_pool_resources.rs
@@ -1,10 +1,11 @@
-use bevy::{ecs::ParallelExecutorOptions, prelude::*};
+use bevy::prelude::*;
+use bevy_app::DefaultTaskPoolOptions;

 /// This example illustrates how to customize the thread pool used internally (e.g. to only use a
 /// certain number of threads).
 fn main() {
     App::build()
-        .add_resource(ParallelExecutorOptions::new().with_num_threads(Some(4)))
+        .add_resource(DefaultTaskPoolOptions::with_num_threads(4))
         .add_default_plugins()
         .run();
 }
diff --git a/src/lib.rs b/src/lib.rs
index ae29f1b089ca6..b0a00b83fabf3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -53,6 +53,7 @@ pub use bevy_property as property;
 pub use bevy_render as render;
 pub use bevy_scene as scene;
 pub use bevy_sprite as sprite;
+pub use bevy_tasks as tasks;
 pub use bevy_text as text;
 pub use bevy_transform as transform;
 pub use bevy_type_registry as type_registry;