Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default task pools and method for configuring them #2

Merged
merged 3 commits into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/bevy_app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ keywords = ["bevy"]
# bevy
bevy_derive = { path = "../bevy_derive", version = "0.1" }
bevy_ecs = { path = "../bevy_ecs", version = "0.1" }
bevy_tasks = { path = "../bevy_tasks" }
num_cpus = "1"

# other
libloading = "0.6"
Expand Down
7 changes: 6 additions & 1 deletion crates/bevy_app/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::app_builder::AppBuilder;
use crate::DefaultTaskPoolOptions;
use bevy_ecs::{ParallelExecutor, Resources, Schedule, World};

#[allow(clippy::needless_doctest_main)]
Expand Down Expand Up @@ -63,7 +64,11 @@ impl App {
}

pub fn run(mut self) {
ParallelExecutor::initialize_pools(&mut self.resources);
// Setup the default bevy task pools
self.resources
.get_cloned::<DefaultTaskPoolOptions>()
.unwrap_or_else(DefaultTaskPoolOptions::default)
.create_default_pools(&mut self.resources);

self.startup_schedule.initialize(&mut self.resources);
self.startup_executor.run(
Expand Down
2 changes: 2 additions & 0 deletions crates/bevy_app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ mod app_builder;
mod event;
mod plugin;
mod schedule_runner;
mod task_pool_options;

pub use app::*;
pub use app_builder::*;
pub use bevy_derive::DynamicPlugin;
pub use event::*;
pub use plugin::*;
pub use schedule_runner::*;
pub use task_pool_options::*;

pub mod prelude {
pub use crate::{
Expand Down
157 changes: 157 additions & 0 deletions crates/bevy_app/src/task_pool_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use bevy_ecs::Resources;
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool, TaskPoolBuilder};

fn clamp_usize(value: usize, min: usize, max: usize) -> usize {
if value > max {
max
} else if value < min {
min
} else {
value
}
}

/// Defines a simple way to determine how many threads to use given the number of remaining cores
/// and number of total cores
#[derive(Clone)]
pub struct TaskPoolThreadAssignmentPolicy {
/// Force using at least this many threads
pub min_threads: usize,
/// Under no circumstance use more than this many threads for this pool
pub max_threads: usize,
/// Target using this percentage of total cores, clamped by min_threads and max_threads. It is
/// permitted to use 1.0 to try to use all remaining threads
pub percent: f32,
}

impl TaskPoolThreadAssignmentPolicy {
/// Determine the number of threads to use for this task pool
fn get_number_of_threads(&self, remaining_threads: usize, total_threads: usize) -> usize {
assert!(self.percent >= 0.0);
let mut desired = (total_threads as f32 * self.percent).round() as usize;
aclysma marked this conversation as resolved.
Show resolved Hide resolved

// Limit ourselves to the number of cores available
desired = desired.min(remaining_threads);

// Clamp by min_threads, max_threads. (This may result in us using more threads than are
// available, this is intended. An example case where this might happen is a device with
// <= 2 threads.
clamp_usize(desired, self.min_threads, self.max_threads)
}
}

/// Helper for configuring and creating the default task pools. For end-users who want full control,
/// insert the default task pools into the resource map manually. If the pools are already inserted,
/// this helper will do nothing.
#[derive(Clone)]
pub struct DefaultTaskPoolOptions {
/// If the number of physical cores is less than min_total_threads, force using min_total_threads
pub min_total_threads: usize,
/// If the number of physical cores is grater than max_total_threads, force using max_total_threads
pub max_total_threads: usize,

/// Used to determine number of IO threads to allocate
pub io: TaskPoolThreadAssignmentPolicy,
/// Used to determine number of async compute threads to allocate
pub async_compute: TaskPoolThreadAssignmentPolicy,
/// Used to determine number of compute threads to allocate
pub compute: TaskPoolThreadAssignmentPolicy,
}

impl Default for DefaultTaskPoolOptions {
fn default() -> Self {
DefaultTaskPoolOptions {
// By default, use however many cores are available on the system
min_total_threads: 1,
max_total_threads: std::usize::MAX,

// Use 25% of cores for IO, at least 1, no more than 4
io: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
},

// Use 25% of cores for async compute, at least 1, no more than 4
async_compute: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
},

// Use all remaining cores for compute (at least 1)
compute: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: std::usize::MAX,
percent: 1.0, // This 1.0 here means "whatever is left over"
},
}
}
}

impl DefaultTaskPoolOptions {
/// Create a configuration that forces using the given number of threads.
pub fn with_num_threads(thread_count: usize) -> Self {
let mut options = Self::default();
options.min_total_threads = thread_count;
options.max_total_threads = thread_count;

options
}

/// Inserts the default thread pools into the given resource map based on the configured values
pub fn create_default_pools(&self, resources: &mut Resources) {
aclysma marked this conversation as resolved.
Show resolved Hide resolved
let total_threads = clamp_usize(
num_cpus::get(),
self.min_total_threads,
self.max_total_threads,
);

let mut remaining_threads = total_threads;

if !resources.contains::<IOTaskPool>() {
// Determine the number of IO threads we will use
let io_threads = self
.io
.get_number_of_threads(remaining_threads, total_threads);
remaining_threads -= io_threads;

resources.insert(IOTaskPool(
TaskPoolBuilder::default()
.num_threads(io_threads)
.thread_name("IO Task Pool".to_string())
.build(),
));
}

if !resources.contains::<AsyncComputeTaskPool>() {
// Determine the number of async compute threads we will use
let async_compute_threads = self
.async_compute
.get_number_of_threads(remaining_threads, total_threads);
remaining_threads -= async_compute_threads;

resources.insert(AsyncComputeTaskPool(
TaskPoolBuilder::default()
.num_threads(async_compute_threads)
.thread_name("Async Compute Task Pool".to_string())
.build(),
));
}

if !resources.contains::<ComputeTaskPool>() {
// Determine the number of compute threads we will use
// This is intentionally last so that an end user can specify 1.0 as the percent
let compute_threads = self
.compute
.get_number_of_threads(remaining_threads, total_threads);

resources.insert(ComputeTaskPool(
TaskPoolBuilder::default()
.num_threads(compute_threads)
.thread_name("Compute Task Pool".to_string())
.build(),
));
}
}
}
2 changes: 2 additions & 0 deletions crates/bevy_ecs/src/resource/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ impl Resources {
self.get_resource_mut(ResourceIndex::Global)
}

/// Returns a clone of the underlying resource, this is helpful when borrowing something
aclysma marked this conversation as resolved.
Show resolved Hide resolved
/// cloneable (like a task pool) without taking a borrow on the resource map
pub fn get_cloned<T: Resource + Clone>(&self) -> Option<T> {
self.get::<T>().map(|r| (*r).clone())
}
Expand Down
67 changes: 3 additions & 64 deletions crates/bevy_ecs/src/schedule/parallel_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,6 @@ impl Default for ParallelExecutor {
}

impl ParallelExecutor {
pub fn initialize_pools(resources: &mut Resources) {
let compute_pool: bevy_tasks::ComputePool = resources
.get::<ParallelExecutorOptions>()
.map(|options| (*options).clone())
.unwrap_or_else(ParallelExecutorOptions::default)
.create_builder()
.build();

// For now, bevy_ecs only uses the global task pool so it is sufficient to configure it once here.
resources.insert(compute_pool);
}

pub fn without_tracker_clears() -> Self {
Self {
clear_trackers: false,
Expand Down Expand Up @@ -77,52 +65,6 @@ impl ParallelExecutor {
}
}

/// This can be added as an app resource to control the global `bevy_tasks::TaskPool` used by ecs.
// Dev internal note: We cannot directly expose a ThreadPoolBuilder here as it does not implement Send and Sync.
#[derive(Debug, Default, Clone)]
pub struct ParallelExecutorOptions {
/// If some value, we'll set up the thread pool to use at most n threads. See `bevy_tasks::TaskPoolBuilder::num_threads`.
num_threads: Option<usize>,
/// If some value, we'll set up the thread pool's' workers to the given stack size. See `bevy_tasks::TaskPoolBuilder::stack_size`.
stack_size: Option<usize>,
// TODO: Do we also need/want to expose other features (*_handler, etc.)
}

impl ParallelExecutorOptions {
/// Creates a new ParallelExecutorOptions instance
pub fn new() -> Self {
Self::default()
}

/// Sets the num_threads option, using the builder pattern
pub fn with_num_threads(mut self, num_threads: Option<usize>) -> Self {
self.num_threads = num_threads;
self
}

/// Sets the stack_size option, using the builder pattern. WARNING: Only use this if you know what you're doing,
/// otherwise your application may run into stability and performance issues.
pub fn with_stack_size(mut self, stack_size: Option<usize>) -> Self {
self.stack_size = stack_size;
self
}

/// Creates a new ThreadPoolBuilder based on the current options.
pub(crate) fn create_builder(&self) -> bevy_tasks::TaskPoolBuilder {
let mut builder = bevy_tasks::TaskPoolBuilder::new();

if let Some(num_threads) = self.num_threads {
builder = builder.num_threads(num_threads);
}

if let Some(stack_size) = self.stack_size {
builder = builder.stack_size(stack_size);
}

builder
}
}

#[derive(Debug, Clone)]
pub struct ExecutorStage {
/// each system's set of dependencies
Expand Down Expand Up @@ -325,11 +267,6 @@ impl ExecutorStage {
system.run(world, resources);
sender.send(system_index).unwrap();
});
// scope.spawn_fifo(move |_| {
// let mut system = system.lock();
// system.run(world, resources);
// sender.send(system_index).unwrap();
// });

systems_currently_running = true;
}
Expand All @@ -345,7 +282,9 @@ impl ExecutorStage {
systems: &[Arc<Mutex<Box<dyn System>>>],
schedule_changed: bool,
) {
let compute_pool = resources.get_cloned::<bevy_tasks::ComputePool>().unwrap();
let compute_pool = resources
.get_cloned::<bevy_tasks::ComputeTaskPool>()
.unwrap();

// if the schedule has changed, clear executor state / fill it with new defaults
if schedule_changed {
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_tasks/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bevy_tasks"
version = "0.1.0"
version = "0.1.3"
authors = [
"Bevy Contributors <bevyengine@gmail.com>",
"Lachlan Sneff <lachlan.sneff@gmail.com>",
Expand Down
Loading