From 46c0db337512ce7024935905b52ffdf476fd1c12 Mon Sep 17 00:00:00 2001 From: re0312 Date: Sat, 20 Jan 2024 19:37:56 +0800 Subject: [PATCH 01/13] rayon --- crates/bevy_tasks/examples/busy_behavior.rs | 1 + crates/bevy_tasks/examples/vec_example.rs | 7 + crates/bevy_tasks/src/iter/adapters.rs | 207 ---- crates/bevy_tasks/src/iter/collect.rs | 282 +++++ crates/bevy_tasks/src/iter/extend.rs | 29 + crates/bevy_tasks/src/iter/for_each.rs | 75 ++ crates/bevy_tasks/src/iter/from_par_iter.rs | 25 + crates/bevy_tasks/src/iter/mod.rs | 1068 +++++++++++-------- crates/bevy_tasks/src/iter/noop.rs | 8 + crates/bevy_tasks/src/lib.rs | 6 +- crates/bevy_tasks/src/slice.rs | 339 +++--- crates/bevy_tasks/src/task_pool.rs | 7 +- crates/bevy_tasks/src/vec.rs | 0 13 files changed, 1201 insertions(+), 853 deletions(-) create mode 100644 crates/bevy_tasks/examples/vec_example.rs delete mode 100644 crates/bevy_tasks/src/iter/adapters.rs create mode 100644 crates/bevy_tasks/src/iter/collect.rs create mode 100644 crates/bevy_tasks/src/iter/extend.rs create mode 100644 crates/bevy_tasks/src/iter/for_each.rs create mode 100644 crates/bevy_tasks/src/iter/from_par_iter.rs create mode 100644 crates/bevy_tasks/src/iter/noop.rs create mode 100644 crates/bevy_tasks/src/vec.rs diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs index 23a43de017a59..c9f8fcf6494b4 100644 --- a/crates/bevy_tasks/examples/busy_behavior.rs +++ b/crates/bevy_tasks/examples/busy_behavior.rs @@ -12,6 +12,7 @@ fn main() { .build(); let t0 = Instant::now(); + let a =vec![1,2,3]; pool.scope(|s| { for i in 0..40 { s.spawn(async move { diff --git a/crates/bevy_tasks/examples/vec_example.rs b/crates/bevy_tasks/examples/vec_example.rs new file mode 100644 index 0000000000000..9e6d6b5ee03c9 --- /dev/null +++ b/crates/bevy_tasks/examples/vec_example.rs @@ -0,0 +1,7 @@ +use bevy_tasks::{IntoParallelRefMutIterator, ParallelIterator}; + +fn main() { + let mut a = vec![2, 3, 4]; + let c 
= a.par_iter_mut(); + c.for_each(|a| {}); +} diff --git a/crates/bevy_tasks/src/iter/adapters.rs b/crates/bevy_tasks/src/iter/adapters.rs deleted file mode 100644 index c880167a62631..0000000000000 --- a/crates/bevy_tasks/src/iter/adapters.rs +++ /dev/null @@ -1,207 +0,0 @@ -use crate::iter::ParallelIterator; - -#[derive(Debug)] -pub struct Chain { - pub(crate) left: T, - pub(crate) right: U, - pub(crate) left_in_progress: bool, -} - -impl ParallelIterator for Chain -where - B: Iterator + Send, - T: ParallelIterator, - U: ParallelIterator, -{ - fn next_batch(&mut self) -> Option { - if self.left_in_progress { - match self.left.next_batch() { - b @ Some(_) => return b, - None => self.left_in_progress = false, - } - } - self.right.next_batch() - } -} - -#[derive(Debug)] -pub struct Map { - pub(crate) iter: P, - pub(crate) f: F, -} - -impl ParallelIterator> for Map -where - B: Iterator + Send, - U: ParallelIterator, - F: FnMut(B::Item) -> T + Send + Clone, -{ - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.map(self.f.clone())) - } -} - -#[derive(Debug)] -pub struct Filter { - pub(crate) iter: P, - pub(crate) predicate: F, -} - -impl ParallelIterator> for Filter -where - B: Iterator + Send, - P: ParallelIterator, - F: FnMut(&B::Item) -> bool + Send + Clone, -{ - fn next_batch(&mut self) -> Option> { - self.iter - .next_batch() - .map(|b| b.filter(self.predicate.clone())) - } -} - -#[derive(Debug)] -pub struct FilterMap { - pub(crate) iter: P, - pub(crate) f: F, -} - -impl ParallelIterator> for FilterMap -where - B: Iterator + Send, - P: ParallelIterator, - F: FnMut(B::Item) -> Option + Send + Clone, -{ - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.filter_map(self.f.clone())) - } -} - -#[derive(Debug)] -pub struct FlatMap { - pub(crate) iter: P, - pub(crate) f: F, -} - -impl ParallelIterator> for FlatMap -where - B: Iterator + Send, - P: ParallelIterator, - F: FnMut(B::Item) -> U + Send + Clone, - U: IntoIterator, 
- U::IntoIter: Send, -{ - // This extends each batch using the flat map. The other option is - // to turn each IntoIter into its own batch. - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.flat_map(self.f.clone())) - } -} - -#[derive(Debug)] -pub struct Flatten

{ - pub(crate) iter: P, -} - -impl ParallelIterator> for Flatten

-where - B: Iterator + Send, - P: ParallelIterator, - B::Item: IntoIterator, - ::IntoIter: Send, -{ - // This extends each batch using the flatten. The other option is to - // turn each IntoIter into its own batch. - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.flatten()) - } -} - -#[derive(Debug)] -pub struct Fuse

{ - pub(crate) iter: Option

, -} - -impl ParallelIterator for Fuse

-where - B: Iterator + Send, - P: ParallelIterator, -{ - fn next_batch(&mut self) -> Option { - match &mut self.iter { - Some(iter) => iter.next_batch().or_else(|| { - self.iter = None; - None - }), - None => None, - } - } -} - -#[derive(Debug)] -pub struct Inspect { - pub(crate) iter: P, - pub(crate) f: F, -} - -impl ParallelIterator> for Inspect -where - B: Iterator + Send, - P: ParallelIterator, - F: FnMut(&B::Item) + Send + Clone, -{ - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.inspect(self.f.clone())) - } -} - -#[derive(Debug)] -pub struct Copied

{ - pub(crate) iter: P, -} - -impl<'a, B, P, T> ParallelIterator> for Copied

-where - B: Iterator + Send, - P: ParallelIterator, - T: 'a + Copy, -{ - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.copied()) - } -} - -#[derive(Debug)] -pub struct Cloned

{ - pub(crate) iter: P, -} - -impl<'a, B, P, T> ParallelIterator> for Cloned

-where - B: Iterator + Send, - P: ParallelIterator, - T: 'a + Copy, -{ - fn next_batch(&mut self) -> Option> { - self.iter.next_batch().map(|b| b.cloned()) - } -} - -#[derive(Debug)] -pub struct Cycle

{ - pub(crate) iter: P, - pub(crate) curr: Option

, -} - -impl ParallelIterator for Cycle

-where - B: Iterator + Send, - P: ParallelIterator + Clone, -{ - fn next_batch(&mut self) -> Option { - self.curr.as_mut().and_then(|c| c.next_batch()).or_else(|| { - self.curr = Some(self.iter.clone()); - self.next_batch() - }) - } -} diff --git a/crates/bevy_tasks/src/iter/collect.rs b/crates/bevy_tasks/src/iter/collect.rs new file mode 100644 index 0000000000000..2f280c5f16940 --- /dev/null +++ b/crates/bevy_tasks/src/iter/collect.rs @@ -0,0 +1,282 @@ +use std::{marker::PhantomData, ptr, slice}; +use super::{Consumer, Folder, Reducer, UnindexedConsumer, ParallelIterator}; + +/// We need to transmit raw pointers across threads. It is possible to do this +/// without any unsafe code by converting pointers to usize or to AtomicPtr +/// then back to a raw pointer for use. We prefer this approach because code +/// that uses this type is more explicit. +/// +/// Unsafe code is still required to dereference the pointer, so this type is +/// not unsound on its own, although it does partly lift the unconditional +/// !Send and !Sync on raw pointers. As always, dereference with care. +struct SendPtr(*mut T); + +// SAFETY: !Send for raw pointers is not for safety, just as a lint +unsafe impl Send for SendPtr {} + +// SAFETY: !Sync for raw pointers is not for safety, just as a lint +unsafe impl Sync for SendPtr {} + +impl SendPtr { + // Helper to avoid disjoint captures of `send_ptr.0` + fn get(self) -> *mut T { + self.0 + } +} + +// Implement Clone without the T: Clone bound from the derive +impl Clone for SendPtr { + fn clone(&self) -> Self { + *self + } +} + +// Implement Copy without the T: Copy bound from the derive +impl Copy for SendPtr {} + +pub(super) struct CollectConsumer<'c, T: Send> { + /// See `CollectResult` for explanation of why this is not a slice + start: SendPtr, + len: usize, + marker: PhantomData<&'c mut T>, +} + +impl CollectConsumer<'_, T> { + /// Create a collector for `len` items in the unused capacity of the vector. 
+ pub(super) fn appender(vec: &mut Vec, len: usize) -> CollectConsumer<'_, T> { + let start = vec.len(); + assert!(vec.capacity() - start >= len); + + // SAFETY: We already made sure to have the additional space allocated. + // The pointer is derived from `Vec` directly, not through a `Deref`, + // so it has provenance over the whole allocation. + unsafe { CollectConsumer::new(vec.as_mut_ptr().add(start), len) } + } +} + +impl<'c, T: Send + 'c> CollectConsumer<'c, T> { + /// The target memory is considered uninitialized, and will be + /// overwritten without reading or dropping existing values. + unsafe fn new(start: *mut T, len: usize) -> Self { + CollectConsumer { + start: SendPtr(start), + len, + marker: PhantomData, + } + } +} + +impl<'c, T: Send + 'c> UnindexedConsumer for CollectConsumer<'c, T> { + fn split_off_left(&self) -> Self { + unreachable!("CollectConsumer must be indexed!") + } + fn to_reducer(&self) -> Self::Reducer { + CollectReducer + } +} + +impl<'c, T: Send + 'c> Consumer for CollectConsumer<'c, T> { + type Folder = CollectResult<'c, T>; + type Reducer = CollectReducer; + type Result = CollectResult<'c, T>; + + fn split_at(self, index: usize) -> (Self, Self, CollectReducer) { + let CollectConsumer { start, len, .. } = self; + + // Produce new consumers. + // SAFETY: This assert checks that `index` is a valid offset for `start` + unsafe { + assert!(index <= len); + ( + CollectConsumer::new(start.0, index), + CollectConsumer::new(start.0.add(index), len - index), + CollectReducer, + ) + } + } + + fn full(&self) -> bool { + false + } + + fn into_folder(self) -> Self::Folder { + // Create a result/folder that consumes values and writes them + // into the region after start. The initial result has length 0. 
+ CollectResult { + start: self.start, + total_len: self.len, + initialized_len: 0, + invariant_lifetime: PhantomData, + } + } +} + +pub(super) struct CollectReducer; + +impl<'c, T> Reducer> for CollectReducer { + fn reduce( + self, + mut left: CollectResult<'c, T>, + right: CollectResult<'c, T>, + ) -> CollectResult<'c, T> { + // Merge if the CollectResults are adjacent and in left to right order + // else: drop the right piece now and total length will end up short in the end, + // when the correctness of the collected result is asserted. + unsafe { + let left_end = left.start.0.add(left.initialized_len); + if left_end == right.start.0 { + left.total_len += right.total_len; + left.initialized_len += right.release_ownership(); + } + left + } + } +} + +/// CollectResult represents an initialized part of the target slice. +/// +/// This is a proxy owner of the elements in the slice; when it drops, +/// the elements will be dropped, unless its ownership is released before then. +#[must_use] +pub(super) struct CollectResult<'c, T> { + /// This pointer and length has the same representation as a slice, + /// but retains the provenance of the entire array so that we can merge + /// these regions together in `CollectReducer`. + start: SendPtr, + total_len: usize, + /// The current initialized length after `start` + initialized_len: usize, + /// Lifetime invariance guarantees that the data flows from consumer to result, + /// especially for the `scope_fn` callback in `Collect::with_consumer`. 
+ invariant_lifetime: PhantomData<&'c mut &'c mut [T]>, +} + +impl<'c, T> CollectResult<'c, T> { + /// The current length of the collect result + pub(super) fn len(&self) -> usize { + self.initialized_len + } + + /// Release ownership of the slice of elements, and return the length + pub(super) fn release_ownership(mut self) -> usize { + let ret = self.initialized_len; + self.initialized_len = 0; + ret + } +} + +unsafe impl<'c, T> Send for CollectResult<'c, T> where T: Send {} + +impl<'c, T> Drop for CollectResult<'c, T> { + fn drop(&mut self) { + // Drop the first `self.initialized_len` elements, which have been recorded + // to be initialized by the folder. + unsafe { + ptr::drop_in_place(slice::from_raw_parts_mut( + self.start.0, + self.initialized_len, + )); + } + } +} + +impl<'c, T: Send + 'c> Folder for CollectResult<'c, T> { + type Result = Self; + + fn consume(mut self, item: T) -> Self { + assert!( + self.initialized_len < self.total_len, + "too many values pushed to consumer" + ); + + // SAFETY: The assert above is a bounds check for this write, and we + // avoid assignment here so we do not drop an uninitialized T. + unsafe { + // Write item and increase the initialized length + self.start.0.add(self.initialized_len).write(item); + self.initialized_len += 1; + } + + self + } + + fn complete(self) -> Self::Result { + // NB: We don't explicitly check that the local writes were complete, + // but Collect will assert the total result length in the end. + self + } + + fn full(&self) -> bool { + false + } +} + +/// Collects the results of the iterator into the specified vector. +/// +/// Technically, this only works for `IndexedParallelIterator`, but we're faking a +/// bit of specialization here until Rust can do that natively. Callers are +/// using `opt_len` to find the length before calling this, and only exact +/// iterators will return anything but `None` there. 
+/// +/// Since the type system doesn't understand that contract, we have to allow +/// *any* `ParallelIterator` here, and `CollectConsumer` has to also implement +/// `UnindexedConsumer`. That implementation panics `unreachable!` in case +/// there's a bug where we actually do try to use this unindexed. +pub(crate) fn special_extend(pi: I, len: usize, v: &mut Vec) +where + I: ParallelIterator, + T: Send, +{ + collect_with_consumer(v, len, |consumer| pi.drive_unindexed(consumer)); +} + +/// Create a consumer on the slice of memory we are collecting into. +/// +/// The consumer needs to be used inside the scope function, and the +/// complete collect result passed back. +/// +/// This method will verify the collect result, and panic if the slice +/// was not fully written into. Otherwise, in the successful case, +/// the vector is complete with the collected result. +fn collect_with_consumer(vec: &mut Vec, len: usize, scope_fn: F) +where + T: Send, + F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>, +{ + // Reserve space for `len` more elements in the vector, + vec.reserve(len); + + // Create the consumer and run the callback for collection. + let result = scope_fn(CollectConsumer::appender(vec, len)); + + // The `CollectResult` represents a contiguous part of the slice, that has + // been written to. On unwind here, the `CollectResult` will be dropped. If + // some producers on the way did not produce enough elements, partial + // `CollectResult`s may have been dropped without being reduced to the final + // result, and we will see that as the length coming up short. + // + // Here, we assert that added length is fully initialized. This is checked + // by the following assert, which verifies if a complete `CollectResult` + // was produced; if the length is correct, it is necessarily covering the + // target slice. 
Since we know that the consumer cannot have escaped from + // `drive` (by parametricity, essentially), we know that any stores that + // will happen, have happened. Unless some code is buggy, that means we + // should have seen `len` total writes. + let actual_writes = result.len(); + assert!( + actual_writes == len, + "expected {} total writes, but got {}", + len, + actual_writes + ); + + // Release the result's mutable borrow and "proxy ownership" + // of the elements, before the vector takes it over. + result.release_ownership(); + + let new_len = vec.len() + len; + + unsafe { + vec.set_len(new_len); + } +} diff --git a/crates/bevy_tasks/src/iter/extend.rs b/crates/bevy_tasks/src/iter/extend.rs new file mode 100644 index 0000000000000..2c1c348150967 --- /dev/null +++ b/crates/bevy_tasks/src/iter/extend.rs @@ -0,0 +1,29 @@ +use super::{ParallelExtend, IntoParallelIterator, ParallelIterator}; + +/// Extends a vector with items from a parallel iterator. +impl ParallelExtend for Vec +where + T: Send, +{ + fn par_extend(&mut self, par_iter: I) + where + I: IntoParallelIterator, + { + // See the vec_collect benchmarks in rayon-demo for different strategies. + let par_iter = par_iter.into_par_iter(); + match par_iter.opt_len() { + Some(len) => { + // When Rust gets specialization, we can get here for indexed iterators + // without relying on `opt_len`. Until then, `special_extend()` fakes + // an unindexed mode on the promise that `opt_len()` is accurate. + crate::iter::collect::special_extend(par_iter, len, self); + } + None => { + todo!(); + // This works like `extend`, but `Vec::append` is more efficient. 
+ // let list = par_iter.drive_unindexed(ListVecConsumer); + // vec_append(self, list); + } + } + } +} diff --git a/crates/bevy_tasks/src/iter/for_each.rs b/crates/bevy_tasks/src/iter/for_each.rs new file mode 100644 index 0000000000000..e577469cfdc28 --- /dev/null +++ b/crates/bevy_tasks/src/iter/for_each.rs @@ -0,0 +1,75 @@ +use super::{noop::NoopReducer, Consumer, Folder, UnindexedConsumer, ParallelIterator}; + +pub(super) fn for_each(pi: I, op: &F) +where + I: ParallelIterator, + F: Fn(T) + Sync, + T: Send, +{ + let consumer = ForEachConsumer { op }; + pi.drive_unindexed(consumer) +} + +struct ForEachConsumer<'f, F> { + op: &'f F, +} + +impl<'f, F, T> Consumer for ForEachConsumer<'f, F> +where + F: Fn(T) + Sync, +{ + type Folder = ForEachConsumer<'f, F>; + type Reducer = NoopReducer; + type Result = (); + + fn split_at(self, _index: usize) -> (Self, Self, NoopReducer) { + (self.split_off_left(), self, NoopReducer) + } + + fn full(&self) -> bool { + false + } + + fn into_folder(self) -> Self::Folder { + self + } +} + +impl<'f, F, T> UnindexedConsumer for ForEachConsumer<'f, F> +where + F: Fn(T) + Sync, +{ + fn split_off_left(&self) -> Self { + ForEachConsumer { op: self.op } + } + + fn to_reducer(&self) -> NoopReducer { + NoopReducer + } +} + +impl<'f, F, T> Folder for ForEachConsumer<'f, F> +where + F: Fn(T) + Sync, +{ + type Result = (); + + fn consume(self, item: T) -> Self { + (self.op)(item); + self + } + + fn consume_iter(self, iter: I) -> Self + where + I: IntoIterator, + { + iter.into_iter().for_each(self.op); + self + } + + fn complete(self) {} + + fn full(&self) -> bool { + false + } +} diff --git a/crates/bevy_tasks/src/iter/from_par_iter.rs b/crates/bevy_tasks/src/iter/from_par_iter.rs new file mode 100644 index 0000000000000..70bf6b82ba746 --- /dev/null +++ b/crates/bevy_tasks/src/iter/from_par_iter.rs @@ -0,0 +1,25 @@ +use crate::iter::{IntoParallelIterator, FromParallelIterator, ParallelExtend}; + +/// Creates an empty default collection and 
extends it. +fn collect_extended(par_iter: I) -> C +where + I: IntoParallelIterator, + C: ParallelExtend + Default, +{ + let mut collection = C::default(); + collection.par_extend(par_iter); + collection +} + +/// Collects items from a parallel iterator into a vector. +impl FromParallelIterator for Vec +where + T: Send, +{ + fn from_par_iter(par_iter: I) -> Self + where + I: IntoParallelIterator, + { + collect_extended(par_iter) + } +} diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index 6887fa05440a1..f3b37ca75802c 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -1,510 +1,728 @@ -use crate::TaskPool; +use std::cmp; -mod adapters; -pub use adapters::*; +use crate::task_pool::compute_task_pool_thread_num; -/// [`ParallelIterator`] closely emulates the `std::iter::Iterator` -/// interface. However, it uses `bevy_task` to compute batches in parallel. +mod collect; +mod extend; +mod for_each; +mod from_par_iter; +mod noop; + +// pub use self::collect::special_extend; +/// This helper function is used to "connect" a parallel iterator to a +/// consumer. It will convert the `par_iter` into a producer P and +/// then pull items from P and feed them to `consumer`, splitting and +/// creating parallel threads as needed. +/// +/// This is useful when you are implementing your own parallel +/// iterators: it is often used as the definition of the +/// [`drive_unindexed`] or [`drive`] methods. /// -/// Note that the overhead of [`ParallelIterator`] is high relative to some -/// workloads. In particular, if the batch size is too small or task being -/// run in parallel is inexpensive, *a [`ParallelIterator`] could take longer -/// than a normal [`Iterator`]*. Therefore, you should profile your code before -/// using [`ParallelIterator`]. 
-pub trait ParallelIterator +/// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed +/// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive +pub fn bridge(par_iter: I, consumer: C) -> C::Result where - BatchIter: Iterator + Send, - Self: Sized + Send, + I: IndexedParallelIterator, + C: Consumer, { - /// Returns the next batch of items for processing. - /// - /// Each batch is an iterator with items of the same type as the - /// [`ParallelIterator`]. Returns `None` when there are no batches left. - fn next_batch(&mut self) -> Option; - - /// Returns the bounds on the remaining number of items in the - /// parallel iterator. - /// - /// See [`Iterator::size_hint()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.size_hint) - fn size_hint(&self) -> (usize, Option) { - (0, None) - } - - /// Consumes the parallel iterator and returns the number of items. - /// - /// See [`Iterator::count()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.count) - fn count(mut self, pool: &TaskPool) -> usize { - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - s.spawn(async move { batch.count() }); - } - }) - .iter() - .sum() - } - - /// Consumes the parallel iterator and returns the last item. - /// - /// See [`Iterator::last()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.last) - fn last(mut self, _pool: &TaskPool) -> Option { - let mut last_item = None; - while let Some(batch) = self.next_batch() { - last_item = batch.last(); - } - last_item - } + let len = par_iter.len(); + return par_iter.with_producer(Callback { len, consumer }); - /// Consumes the parallel iterator and returns the nth item. 
- /// - /// See [`Iterator::nth()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.nth) - // TODO: Optimize with size_hint on each batch - fn nth(mut self, _pool: &TaskPool, n: usize) -> Option { - let mut i = 0; - while let Some(batch) = self.next_batch() { - for item in batch { - if i == n { - return Some(item); - } - i += 1; - } - } - None + struct Callback { + len: usize, + consumer: C, } - /// Takes two parallel iterators and returns a parallel iterators over - /// both in sequence. - /// - /// See [`Iterator::chain()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.chain) - // TODO: Use IntoParallelIterator for U - fn chain(self, other: U) -> Chain + impl ProducerCallback for Callback where - U: ParallelIterator, + C: Consumer, { - Chain { - left: self, - right: other, - left_in_progress: true, + type Output = C::Result; + fn callback

(self, producer: P) -> C::Result + where + P: Producer, + { + bridge_producer_consumer(self.len, producer, self.consumer) } } +} - /// Takes a closure and creates a parallel iterator which calls that - /// closure on each item. - /// - /// See [`Iterator::map()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.map) - fn map(self, f: F) -> Map +/// todo +pub fn bridge_producer_consumer(len: usize, producer: P, consumer: C) -> C::Result +where + P: Producer, + C: Consumer, +{ + let splitter = LengthSplitter::new(producer.min_len(), producer.max_len(), len); + return helper(len, false, splitter, producer, consumer); + + fn helper( + len: usize, + migrated: bool, + mut splitter: LengthSplitter, + producer: P, + consumer: C, + ) -> C::Result where - F: FnMut(BatchIter::Item) -> T + Send + Clone, + P: Producer, + C: Consumer, { - Map { iter: self, f } + println!("222"); + todo!(); + // if consumer.full() { + // consumer.into_folder().complete() + // } else if splitter.try_split(len, migrated) { + // let mid = len / 2; + // let (left_producer, right_producer) = producer.split_at(mid); + // let (left_consumer, right_consumer, reducer) = consumer.split_at(mid); + // let (left_result, right_result) = join_context( + // |context| { + // helper( + // mid, + // context.migrated(), + // splitter, + // left_producer, + // left_consumer, + // ) + // }, + // |context| { + // helper( + // len - mid, + // context.migrated(), + // splitter, + // right_producer, + // right_consumer, + // ) + // }, + // ); + // reducer.reduce(left_result, right_result) + // } else { + // producer.fold_with(consumer.into_folder()).complete() + // } } +} + +/// The `Folder` trait encapsulates [the standard fold +/// operation][fold]. It can be fed many items using the `consume` +/// method. At the end, once all items have been consumed, it can then +/// be converted (using `complete`) into a final value. 
+/// +/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold +pub trait Folder: Sized { + /// The type of result that will ultimately be produced by the folder. + type Result; + + /// Consume next item and return new sequential state. + fn consume(self, item: Item) -> Self; - /// Calls a closure on each item of a parallel iterator. + /// Consume items from the iterator until full, and return new sequential state. /// - /// See [`Iterator::for_each()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.for_each) - fn for_each(mut self, pool: &TaskPool, f: F) + /// This method is **optional**. The default simply iterates over + /// `iter`, invoking `consume` and checking after each iteration + /// whether `full` returns false. + /// + /// The main reason to override it is if you can provide a more + /// specialized, efficient implementation. + fn consume_iter(mut self, iter: I) -> Self where - F: FnMut(BatchIter::Item) + Send + Clone + Sync, + I: IntoIterator, { - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - let newf = f.clone(); - s.spawn(async move { - batch.for_each(newf); - }); + for item in iter { + self = self.consume(item); + if self.full() { + break; } - }); + } + self } - /// Creates a parallel iterator which uses a closure to determine - /// if an element should be yielded. - /// - /// See [`Iterator::filter()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.filter) - fn filter(self, predicate: F) -> Filter - where - F: FnMut(&BatchIter::Item) -> bool, - { - Filter { - iter: self, - predicate, - } + /// Finish consuming items, produce final result. + fn complete(self) -> Self::Result; + + /// Hint whether this `Folder` would like to stop processing + /// further items, e.g. if a search has been completed. + fn full(&self) -> bool; +} + +pub trait Producer: Send + Sized { + /// The type of item that will be produced by this producer once + /// it is converted into an iterator. 
+ type Item; + + /// The type of iterator we will become. + type IntoIter: Iterator + DoubleEndedIterator + ExactSizeIterator; + + /// Convert `self` into an iterator; at this point, no more parallel splits + /// are possible. + fn into_iter(self) -> Self::IntoIter; + + /// The minimum number of items that we will process + /// sequentially. Defaults to 1, which means that we will split + /// all the way down to a single item. This can be raised higher + /// using the [`with_min_len`] method, which will force us to + /// create sequential tasks at a larger granularity. Note that + /// Rayon automatically normally attempts to adjust the size of + /// parallel splits to reduce overhead, so this should not be + /// needed. + /// + /// [`with_min_len`]: ../trait.IndexedParallelIterator.html#method.with_min_len + fn min_len(&self) -> usize { + 1 } - /// Creates a parallel iterator that both filters and maps. - /// - /// See [`Iterator::filter_map()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.filter_map) - fn filter_map(self, f: F) -> FilterMap - where - F: FnMut(BatchIter::Item) -> Option, - { - FilterMap { iter: self, f } + /// The maximum number of items that we will process + /// sequentially. Defaults to MAX, which means that we can choose + /// not to split at all. This can be lowered using the + /// [`with_max_len`] method, which will force us to create more + /// parallel tasks. Note that Rayon automatically normally + /// attempts to adjust the size of parallel splits to reduce + /// overhead, so this should not be needed. + /// + /// [`with_max_len`]: ../trait.IndexedParallelIterator.html#method.with_max_len + fn max_len(&self) -> usize { + usize::MAX } - /// Creates a parallel iterator that works like map, but flattens - /// nested structure. + /// Split into two producers; one produces items `0..index`, the + /// other `index..N`. Index must be less than or equal to `N`. 
+ fn split_at(self, index: usize) -> (Self, Self); + + /// Iterate the producer, feeding each element to `folder`, and + /// stop when the folder is full (or all elements have been consumed). /// - /// See [`Iterator::flat_map()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.flat_map) - fn flat_map(self, f: F) -> FlatMap + /// The provided implementation is sufficient for most iterables. + fn fold_with(self, folder: F) -> F where - F: FnMut(BatchIter::Item) -> U, - U: IntoIterator, + F: Folder, { - FlatMap { iter: self, f } + folder.consume_iter(self.into_iter()) } +} - /// Creates a parallel iterator that flattens nested structure. - /// - /// See [`Iterator::flatten()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.flatten) - fn flatten(self) -> Flatten +/// The `ProducerCallback` trait is a kind of generic closure, +/// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in +/// the plumbing README][r] for more details. +/// +/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback +/// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html +pub trait ProducerCallback { + /// The type of value returned by this callback. Analogous to + /// [`Output` from the `FnOnce` trait][Output]. + /// + /// [Output]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html#associatedtype.Output + type Output; + + /// Invokes the callback with the given producer as argument. The + /// key point of this trait is that this method is generic over + /// `P`, and hence implementors must be defined for any producer. + fn callback

(self, producer: P) -> Self::Output where - BatchIter::Item: IntoIterator, - { - Flatten { iter: self } - } + P: Producer; +} - /// Creates a parallel iterator which ends after the first None. - /// - /// See [`Iterator::fuse()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fuse) - fn fuse(self) -> Fuse { - Fuse { iter: Some(self) } - } +pub trait Reducer { + /// Reduce two final results into one; this is executed after a + /// split. + fn reduce(self, left: Result, right: Result) -> Result; +} - /// Does something with each item of a parallel iterator, passing - /// the value on. - /// - /// See [`Iterator::inspect()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.inspect) - fn inspect(self, f: F) -> Inspect - where - F: FnMut(&BatchIter::Item), - { - Inspect { iter: self, f } - } +pub trait Consumer: Send + Sized { + /// The type of folder that this consumer can be converted into. + type Folder: Folder; - /// Borrows a parallel iterator, rather than consuming it. - /// - /// See [`Iterator::by_ref()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.by_ref) - fn by_ref(&mut self) -> &mut Self { - self - } + /// The type of reducer that is produced if this consumer is split. + type Reducer: Reducer; - /// Transforms a parallel iterator into a collection. - /// - /// See [`Iterator::collect()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.collect) - // TODO: Investigate optimizations for less copying - fn collect(mut self, pool: &TaskPool) -> C - where - C: FromIterator, - BatchIter::Item: Send + 'static, - { - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - s.spawn(async move { batch.collect::>() }); - } - }) - .into_iter() - .flatten() - .collect() - } + /// The type of result that this consumer will ultimately produce. + type Result: Send; - /// Consumes a parallel iterator, creating two collections from it. 
- /// - /// See [`Iterator::partition()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.partition) - // TODO: Investigate optimizations for less copying - fn partition(mut self, pool: &TaskPool, f: F) -> (C, C) - where - C: Default + Extend + Send, - F: FnMut(&BatchIter::Item) -> bool + Send + Sync + Clone, - BatchIter::Item: Send + 'static, - { - let (mut a, mut b) = <(C, C)>::default(); - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - let newf = f.clone(); - s.spawn(async move { batch.partition::, F>(newf) }); - } - }) - .into_iter() - .for_each(|(c, d)| { - a.extend(c); - b.extend(d); - }); - (a, b) - } + /// Divide the consumer into two consumers, one processing items + /// `0..index` and one processing items from `index..`. Also + /// produces a reducer that can be used to reduce the results at + /// the end. + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer); + + /// Convert the consumer into a folder that can consume items + /// sequentially, eventually producing a final result. + fn into_folder(self) -> Self::Folder; + + /// Hint whether this `Consumer` would like to stop processing + /// further items, e.g. if a search has been completed. + fn full(&self) -> bool; +} + +/// A stateless consumer can be freely copied. These consumers can be +/// used like regular consumers, but they also support a +/// `split_off_left` method that does not take an index to split, but +/// simply splits at some arbitrary point (`for_each`, for example, +/// produces an unindexed consumer). +pub trait UnindexedConsumer: Consumer { + /// Splits off a "left" consumer and returns it. The `self` + /// consumer should then be used to consume the "right" portion of + /// the data. (The ordering matters for methods like find_first -- + /// values produced by the returned value are given precedence + /// over values produced by `self`.) 
Once the left and right + /// halves have been fully consumed, you should reduce the results + /// with the result of `to_reducer`. + fn split_off_left(&self) -> Self; + + /// Creates a reducer that can be used to combine the results from + /// a split consumer. + fn to_reducer(&self) -> Self::Reducer; +} - /// Repeatedly applies a function to items of each batch of a parallel - /// iterator, producing a Vec of final values. +pub trait ParallelIterator: Sized + Send { + /// The type of item that this parallel iterator produces. + /// For example, if you use the [`for_each`] method, this is the type of + /// item that your closure will be invoked with. /// - /// *Note that this folds each batch independently and returns a Vec of - /// results (in batch order).* + /// [`for_each`]: #method.for_each + type Item: Send; + /// Executes `OP` on each item produced by the iterator, in parallel. /// - /// See [`Iterator::fold()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold) - fn fold(mut self, pool: &TaskPool, init: C, f: F) -> Vec - where - F: FnMut(C, BatchIter::Item) -> C + Send + Sync + Clone, - C: Clone + Send + Sync + 'static, - { - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - let newf = f.clone(); - let newi = init.clone(); - s.spawn(async move { batch.fold(newi, newf) }); - } - }) - } - - /// Tests if every element of the parallel iterator matches a predicate. 
+ /// # Examples /// - /// *Note that all is **not** short circuiting.* + /// ``` + /// use rayon::prelude::*; /// - /// See [`Iterator::all()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.all) - fn all(mut self, pool: &TaskPool, f: F) -> bool + /// (0..100).into_par_iter().for_each(|x| println!("{:?}", x)); + /// ``` + fn for_each(self, op: OP) where - F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone, + OP: Fn(Self::Item) + Sync + Send, { - pool.scope(|s| { - while let Some(mut batch) = self.next_batch() { - let newf = f.clone(); - s.spawn(async move { batch.all(newf) }); - } - }) - .into_iter() - .all(std::convert::identity) + for_each::for_each(self, &op) } - /// Tests if any element of the parallel iterator matches a predicate. - /// - /// *Note that any is **not** short circuiting.* - /// - /// See [`Iterator::any()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.any) - fn any(mut self, pool: &TaskPool, f: F) -> bool + fn collect(self) -> C where - F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone, + C: FromParallelIterator, { - pool.scope(|s| { - while let Some(mut batch) = self.next_batch() { - let newf = f.clone(); - s.spawn(async move { batch.any(newf) }); - } - }) - .into_iter() - .any(std::convert::identity) + C::from_par_iter(self) } - /// Searches for an element in a parallel iterator, returning its index. + /// Internal method used to define the behavior of this parallel + /// iterator. You should not need to call this directly. + /// + /// This method causes the iterator `self` to start producing + /// items and to feed them to the consumer `consumer` one by one. + /// It may split the consumer before doing so to create the + /// opportunity to produce in parallel. /// - /// *Note that position consumes the whole iterator.* + /// See the [README] for more details on the internals of parallel + /// iterators. 
/// - /// See [`Iterator::position()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.position) - // TODO: Investigate optimizations for less copying - fn position(mut self, pool: &TaskPool, f: F) -> Option + /// [README]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md + fn drive_unindexed(self, consumer: C) -> C::Result where - F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone, - { - let poses = pool.scope(|s| { - while let Some(batch) = self.next_batch() { - let mut newf = f.clone(); - s.spawn(async move { - let mut len = 0; - let mut pos = None; - for item in batch { - if pos.is_none() && newf(item) { - pos = Some(len); - } - len += 1; - } - (len, pos) - }); - } - }); - let mut start = 0; - for (len, pos) in poses { - if let Some(pos) = pos { - return Some(start + pos); - } - start += len; - } + C: UnindexedConsumer; + + /// Internal method used to define the behavior of this parallel + /// iterator. You should not need to call this directly. + /// + /// Returns the number of items produced by this iterator, if known + /// statically. This can be used by consumers to trigger special fast + /// paths. Therefore, if `Some(_)` is returned, this iterator must only + /// use the (indexed) `Consumer` methods when driving a consumer, such + /// as `split_at()`. Calling `UnindexedConsumer::split_off_left()` or + /// other `UnindexedConsumer` methods -- or returning an inaccurate + /// value -- may result in panics. + /// + /// This method is currently used to optimize `collect` for want + /// of true Rust specialization; it may be removed when + /// specialization is stable. + fn opt_len(&self) -> Option { None } +} - /// Returns the maximum item of a parallel iterator. +pub trait IndexedParallelIterator: ParallelIterator { + /// Produces an exact count of how many items this iterator will + /// produce, presuming no panic occurs. 
/// - /// See [`Iterator::max()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.max) - fn max(mut self, pool: &TaskPool) -> Option - where - BatchIter::Item: Ord + Send + 'static, - { - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - s.spawn(async move { batch.max() }); - } - }) - .into_iter() - .flatten() - .max() - } - - /// Returns the minimum item of a parallel iterator. + /// # Examples /// - /// See [`Iterator::min()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.min) - fn min(mut self, pool: &TaskPool) -> Option - where - BatchIter::Item: Ord + Send + 'static, - { - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - s.spawn(async move { batch.min() }); - } - }) - .into_iter() - .flatten() - .min() - } + /// ``` + /// use rayon::prelude::*; + /// + /// let par_iter = (0..100).into_par_iter().zip(vec![0; 10]); + /// assert_eq!(par_iter.len(), 10); + /// + /// let vec: Vec<_> = par_iter.collect(); + /// assert_eq!(vec.len(), 10); + /// ``` + fn len(&self) -> usize; - /// Returns the item that gives the maximum value from the specified function. + /// Internal method used to define the behavior of this parallel + /// iterator. You should not need to call this directly. /// - /// See [`Iterator::max_by_key()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.max_by_key) - fn max_by_key(mut self, pool: &TaskPool, f: F) -> Option - where - R: Ord, - F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone, - BatchIter::Item: Send + 'static, - { - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - let newf = f.clone(); - s.spawn(async move { batch.max_by_key(newf) }); - } - }) - .into_iter() - .flatten() - .max_by_key(f) - } + /// This method causes the iterator `self` to start producing + /// items and to feed them to the consumer `consumer` one by one. + /// It may split the consumer before doing so to create the + /// opportunity to produce in parallel. 
If a split does happen, it + /// will inform the consumer of the index where the split should + /// occur (unlike `ParallelIterator::drive_unindexed()`). + /// + /// See the [README] for more details on the internals of parallel + /// iterators. + /// + /// [README]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md + fn drive>(self, consumer: C) -> C::Result; - /// Returns the item that gives the maximum value with respect to the specified comparison - /// function. + /// Internal method used to define the behavior of this parallel + /// iterator. You should not need to call this directly. + /// + /// This method converts the iterator into a producer P and then + /// invokes `callback.callback()` with P. Note that the type of + /// this producer is not defined as part of the API, since + /// `callback` must be defined generically for all producers. This + /// allows the producer type to contain references; it also means + /// that parallel iterators can adjust that type without causing a + /// breaking change. /// - /// See [`Iterator::max_by()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.max_by) - fn max_by(mut self, pool: &TaskPool, f: F) -> Option + /// See the [README] for more details on the internals of parallel + /// iterators. + /// + /// [README]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md + fn with_producer>(self, callback: CB) -> CB::Output; +} + +/// `FromParallelIterator` implements the creation of a collection +/// from a [`ParallelIterator`]. By implementing +/// `FromParallelIterator` for a given type, you define how it will be +/// created from an iterator. +/// +/// `FromParallelIterator` is used through [`ParallelIterator`]'s [`collect()`] method. 
+/// +/// [`ParallelIterator`]: trait.ParallelIterator.html +/// [`collect()`]: trait.ParallelIterator.html#method.collect +/// +/// # Examples +/// +/// Implementing `FromParallelIterator` for your type: +/// +/// ``` +/// use rayon::prelude::*; +/// use std::mem; +/// +/// struct BlackHole { +/// mass: usize, +/// } +/// +/// impl FromParallelIterator for BlackHole { +/// fn from_par_iter(par_iter: I) -> Self +/// where I: IntoParallelIterator +/// { +/// let par_iter = par_iter.into_par_iter(); +/// BlackHole { +/// mass: par_iter.count() * mem::size_of::(), +/// } +/// } +/// } +/// +/// let bh: BlackHole = (0i32..1000).into_par_iter().collect(); +/// assert_eq!(bh.mass, 4000); +/// ``` +pub trait FromParallelIterator +where + T: Send, +{ + /// Creates an instance of the collection from the parallel iterator `par_iter`. + /// + /// If your collection is not naturally parallel, the easiest (and + /// fastest) way to do this is often to collect `par_iter` into a + /// [`LinkedList`] or other intermediate data structure and then + /// sequentially extend your collection. However, a more 'native' + /// technique is to use the [`par_iter.fold`] or + /// [`par_iter.fold_with`] methods to create the collection. + /// Alternatively, if your collection is 'natively' parallel, you + /// can use `par_iter.for_each` to process each element in turn. 
+ /// + /// [`LinkedList`]: https://doc.rust-lang.org/std/collections/struct.LinkedList.html + /// [`par_iter.fold`]: trait.ParallelIterator.html#method.fold + /// [`par_iter.fold_with`]: trait.ParallelIterator.html#method.fold_with + /// [`par_iter.for_each`]: trait.ParallelIterator.html#method.for_each + fn from_par_iter(par_iter: I) -> Self where - F: FnMut(&BatchIter::Item, &BatchIter::Item) -> std::cmp::Ordering + Send + Sync + Clone, - BatchIter::Item: Send + 'static, - { - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - let newf = f.clone(); - s.spawn(async move { batch.max_by(newf) }); - } - }) - .into_iter() - .flatten() - .max_by(f) - } + I: IntoParallelIterator; +} - /// Returns the item that gives the minimum value from the specified function. +/// `IntoParallelIterator` implements the conversion to a [`ParallelIterator`]. +/// +/// By implementing `IntoParallelIterator` for a type, you define how it will +/// be transformed into an iterator. This is a parallel version of the standard +/// library's [`std::iter::IntoIterator`] trait. +/// +/// [`ParallelIterator`]: trait.ParallelIterator.html +/// [`std::iter::IntoIterator`]: https://doc.rust-lang.org/std/iter/trait.IntoIterator.html +pub trait IntoParallelIterator { + /// The parallel iterator type that will be created. + type Iter: ParallelIterator; + + /// The type of item that the parallel iterator will produce. + type Item: Send; + + /// Converts `self` into a parallel iterator.
/// - /// See [`Iterator::min_by_key()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.min_by_key) - fn min_by_key(mut self, pool: &TaskPool, f: F) -> Option - where - R: Ord, - F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone, - BatchIter::Item: Send + 'static, - { - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - let newf = f.clone(); - s.spawn(async move { batch.min_by_key(newf) }); - } - }) - .into_iter() - .flatten() - .min_by_key(f) + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// println!("counting in parallel:"); + /// (0..100).into_par_iter() + /// .for_each(|i| println!("{}", i)); + /// ``` + /// + /// This conversion is often implicit for arguments to methods like [`zip`]. + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let v: Vec<_> = (0..5).into_par_iter().zip(5..10).collect(); + /// assert_eq!(v, [(0, 5), (1, 6), (2, 7), (3, 8), (4, 9)]); + /// ``` + /// + /// [`zip`]: trait.IndexedParallelIterator.html#method.zip + fn into_par_iter(self) -> Self::Iter; +} + +impl IntoParallelIterator for T { + type Iter = T; + type Item = T::Item; + + fn into_par_iter(self) -> T { + self } +} - /// Returns the item that gives the minimum value with respect to the specified comparison - /// function. +/// `IntoParallelRefIterator` implements the conversion to a +/// [`ParallelIterator`], providing shared references to the data. +/// +/// This is a parallel version of the `iter()` method +/// defined by various collections. +/// +/// This trait is automatically implemented +/// `for I where &I: IntoParallelIterator`. In most cases, users +/// will want to implement [`IntoParallelIterator`] rather than implement +/// this trait directly. +/// +/// [`ParallelIterator`]: trait.ParallelIterator.html +/// [`IntoParallelIterator`]: trait.IntoParallelIterator.html +pub trait IntoParallelRefIterator<'data> { + /// The type of the parallel iterator that will be returned. 
+ type Iter: ParallelIterator; + + /// The type of item that the parallel iterator will produce. + /// This will typically be an `&'data T` reference type. + type Item: Send + 'data; + + /// Converts `self` into a parallel iterator. /// - /// See [`Iterator::min_by()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.min_by) - fn min_by(mut self, pool: &TaskPool, f: F) -> Option - where - F: FnMut(&BatchIter::Item, &BatchIter::Item) -> std::cmp::Ordering + Send + Sync + Clone, - BatchIter::Item: Send + 'static, - { - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - let newf = f.clone(); - s.spawn(async move { batch.min_by(newf) }); - } - }) - .into_iter() - .flatten() - .min_by(f) + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let v: Vec<_> = (0..100).collect(); + /// assert_eq!(v.par_iter().sum::(), 100 * 99 / 2); + /// + /// // `v.par_iter()` is shorthand for `(&v).into_par_iter()`, + /// // producing the exact same references. + /// assert!(v.par_iter().zip(&v) + /// .all(|(a, b)| std::ptr::eq(a, b))); + /// ``` + fn par_iter(&'data self) -> Self::Iter; +} + +impl<'data, I: 'data + ?Sized> IntoParallelRefIterator<'data> for I +where + &'data I: IntoParallelIterator, +{ + type Iter = <&'data I as IntoParallelIterator>::Iter; + type Item = <&'data I as IntoParallelIterator>::Item; + + fn par_iter(&'data self) -> Self::Iter { + self.into_par_iter() } +} + +pub trait IntoParallelRefMutIterator<'data> { + /// The type of iterator that will be created. + type Iter: ParallelIterator; - /// Creates a parallel iterator which copies all of its items. + /// The type of item that will be produced; this is typically an + /// `&'data mut T` reference. + type Item: Send + 'data; + + /// Creates the parallel iterator from `self`. 
/// - /// See [`Iterator::copied()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.copied) - fn copied<'a, T>(self) -> Copied - where - Self: ParallelIterator, - T: 'a + Copy, - { - Copied { iter: self } + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let mut v = vec![0usize; 5]; + /// v.par_iter_mut().enumerate().for_each(|(i, x)| *x = i); + /// assert_eq!(v, [0, 1, 2, 3, 4]); + /// ``` + fn par_iter_mut(&'data mut self) -> Self::Iter; +} + +impl<'data, I: 'data + ?Sized> IntoParallelRefMutIterator<'data> for I +where + &'data mut I: IntoParallelIterator, +{ + type Iter = <&'data mut I as IntoParallelIterator>::Iter; + type Item = <&'data mut I as IntoParallelIterator>::Item; + + fn par_iter_mut(&'data mut self) -> Self::Iter { + self.into_par_iter() } +} - /// Creates a parallel iterator which clones all of its items. +/// `ParallelExtend` extends an existing collection with items from a [`ParallelIterator`]. +/// +/// [`ParallelIterator`]: trait.ParallelIterator.html +/// +/// # Examples +/// +/// Implementing `ParallelExtend` for your type: +/// +/// ``` +/// use rayon::prelude::*; +/// use std::mem; +/// +/// struct BlackHole { +/// mass: usize, +/// } +/// +/// impl ParallelExtend for BlackHole { +/// fn par_extend(&mut self, par_iter: I) +/// where I: IntoParallelIterator +/// { +/// let par_iter = par_iter.into_par_iter(); +/// self.mass += par_iter.count() * mem::size_of::(); +/// } +/// } +/// +/// let mut bh = BlackHole { mass: 0 }; +/// bh.par_extend(0i32..1000); +/// assert_eq!(bh.mass, 4000); +/// bh.par_extend(0i64..10); +/// assert_eq!(bh.mass, 4080); +/// ``` +pub trait ParallelExtend +where + T: Send, +{ + /// Extends an instance of the collection with the elements drawn + /// from the parallel iterator `par_iter`. 
+ /// + /// # Examples /// - /// See [`Iterator::cloned()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.cloned) - fn cloned<'a, T>(self) -> Cloned + /// ``` + /// use rayon::prelude::*; + /// + /// let mut vec = vec![]; + /// vec.par_extend(0..5); + /// vec.par_extend((0..5).into_par_iter().map(|i| i * i)); + /// assert_eq!(vec, [0, 1, 2, 3, 4, 0, 1, 4, 9, 16]); + /// ``` + fn par_extend(&mut self, par_iter: I) where - Self: ParallelIterator, - T: 'a + Copy, - { - Cloned { iter: self } + I: IntoParallelIterator; +} + +/// A splitter controls the policy for splitting into smaller work items. +/// +/// Thief-splitting is an adaptive policy that starts by splitting into +/// enough jobs for every worker thread, and then resets itself whenever a +/// job is actually stolen into a different thread. +#[derive(Clone, Copy)] +struct Splitter { + /// The `splits` tell us approximately how many remaining times we'd + /// like to split this job. We always just divide it by two though, so + /// the effective number of pieces will be `next_power_of_two()`. + splits: usize, +} + +impl Splitter { + #[inline] + fn new() -> Splitter { + Splitter { + splits: crate::task_pool::compute_task_pool_thread_num(), + } } - /// Repeats a parallel iterator endlessly. - /// - /// See [`Iterator::cycle()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.cycle) - fn cycle(self) -> Cycle - where - Self: Clone, - { - Cycle { - iter: self, - curr: None, + #[inline] + fn try_split(&mut self, stolen: bool) -> bool { + let Splitter { splits } = *self; + + if stolen { + // This job was stolen! Reset the number of desired splits to the + // thread count, if that's more than we had remaining anyway. + self.splits = cmp::max( + crate::task_pool::compute_task_pool_thread_num(), + self.splits / 2, + ); + true + } else if splits > 0 { + // We have splits remaining, make it so. + self.splits /= 2; + true + } else { + // Not stolen, and no more splits -- we're done! 
+ false } } +} - /// Sums the items of a parallel iterator. - /// - /// See [`Iterator::sum()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.sum) - fn sum(mut self, pool: &TaskPool) -> R - where - S: std::iter::Sum + Send + 'static, - R: std::iter::Sum, - { - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - s.spawn(async move { batch.sum() }); - } - }) - .into_iter() - .sum() +/// The length splitter is built on thief-splitting, but additionally takes +/// into account the remaining length of the iterator. +#[derive(Clone, Copy)] +struct LengthSplitter { + inner: Splitter, + + /// The smallest we're willing to divide into. Usually this is just 1, + /// but you can choose a larger working size with `with_min_len()`. + min: usize, +} + +impl LengthSplitter { + /// Creates a new splitter based on lengths. + /// + /// The `min` is a hard lower bound. We'll never split below that, but + /// of course an iterator might start out smaller already. + /// + /// The `max` is an upper bound on the working size, used to determine + /// the minimum number of times we need to split to get under that limit. + /// The adaptive algorithm may very well split even further, but never + /// smaller than the `min`. + #[inline] + fn new(min: usize, max: usize, len: usize) -> LengthSplitter { + let mut splitter = LengthSplitter { + inner: Splitter::new(), + min: cmp::max(min, 1), + }; + + // Divide the given length by the max working length to get the minimum + // number of splits we need to get under that max. This rounds down, + // but the splitter actually gives `next_power_of_two()` pieces anyway. + // e.g. len 12345 / max 100 = 123 min_splits -> 128 pieces. + let min_splits = len / cmp::max(max, 1); + + // Only update the value if it's not splitting enough already. + if min_splits > splitter.inner.splits { + splitter.inner.splits = min_splits; + } + + splitter } - /// Multiplies all the items of a parallel iterator. 
- /// - /// See [`Iterator::product()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.product) - fn product(mut self, pool: &TaskPool) -> R - where - S: std::iter::Product + Send + 'static, - R: std::iter::Product, - { - pool.scope(|s| { - while let Some(batch) = self.next_batch() { - s.spawn(async move { batch.product() }); - } - }) - .into_iter() - .product() + #[inline] + fn try_split(&mut self, len: usize, stolen: bool) -> bool { + // If splitting wouldn't make us too small, try the inner splitter. + len / 2 >= self.min && self.inner.try_split(stolen) } } diff --git a/crates/bevy_tasks/src/iter/noop.rs b/crates/bevy_tasks/src/iter/noop.rs new file mode 100644 index 0000000000000..23acf8d92c099 --- /dev/null +++ b/crates/bevy_tasks/src/iter/noop.rs @@ -0,0 +1,8 @@ +use super::Reducer; + + +pub(super) struct NoopReducer; + +impl Reducer<()> for NoopReducer { + fn reduce(self, _left: (), _right: ()) {} +} diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 7b998c0db3909..9cce0f22d0fac 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -2,7 +2,6 @@ #![doc = include_str!("../README.md")] mod slice; -pub use slice::{ParallelSlice, ParallelSliceMut}; mod task; pub use task::Task; @@ -33,17 +32,14 @@ pub use async_io::block_on; pub use futures_lite::future::block_on; mod iter; -pub use iter::ParallelIterator; - pub use futures_lite; +pub use iter::*; #[allow(missing_docs)] pub mod prelude { #[doc(hidden)] pub use crate::{ block_on, - iter::ParallelIterator, - slice::{ParallelSlice, ParallelSliceMut}, usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}, }; } diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs index 8410478322ee0..cc54147830269 100644 --- a/crates/bevy_tasks/src/slice.rs +++ b/crates/bevy_tasks/src/slice.rs @@ -1,240 +1,149 @@ -use super::TaskPool; - -/// Provides functions for mapping read-only slices across a provided [`TaskPool`]. 
-pub trait ParallelSlice: AsRef<[T]> { - /// Splits the slice in chunks of size `chunks_size` or less and maps the chunks - /// in parallel across the provided `task_pool`. One task is spawned in the task pool - /// for every chunk. - /// - /// Returns a `Vec` of the mapped results in the same order as the input. - /// - /// # Example - /// - /// ``` - /// # use bevy_tasks::prelude::*; - /// # use bevy_tasks::TaskPool; - /// let task_pool = TaskPool::new(); - /// let counts = (0..10000).collect::>(); - /// let incremented = counts.par_chunk_map(&task_pool, 100, |chunk| { - /// let mut results = Vec::new(); - /// for count in chunk { - /// results.push(*count + 2); - /// } - /// results - /// }); - /// # let flattened: Vec<_> = incremented.into_iter().flatten().collect(); - /// # assert_eq!(flattened, (2..10002).collect::>()); - /// ``` - /// - /// # See Also - /// - /// - [`ParallelSliceMut::par_chunk_map_mut`] for mapping mutable slices. - /// - [`ParallelSlice::par_splat_map`] for mapping when a specific chunk size is unknown. - fn par_chunk_map(&self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec - where - F: Fn(&[T]) -> R + Send + Sync, - R: Send + 'static, - { - let slice = self.as_ref(); - let f = &f; - task_pool.scope(|scope| { - for chunk in slice.chunks(chunk_size) { - scope.spawn(async move { f(chunk) }); - } - }) - } - - /// Splits the slice into a maximum of `max_tasks` chunks, and maps the chunks in parallel - /// across the provided `task_pool`. One task is spawned in the task pool for every chunk. - /// - /// If `max_tasks` is `None`, this function will attempt to use one chunk per thread in - /// `task_pool`. - /// - /// Returns a `Vec` of the mapped results in the same order as the input. 
- /// - /// # Example - /// - /// ``` - /// # use bevy_tasks::prelude::*; - /// # use bevy_tasks::TaskPool; - /// let task_pool = TaskPool::new(); - /// let counts = (0..10000).collect::>(); - /// let incremented = counts.par_splat_map(&task_pool, None, |chunk| { - /// let mut results = Vec::new(); - /// for count in chunk { - /// results.push(*count + 2); - /// } - /// results - /// }); - /// # let flattened: Vec<_> = incremented.into_iter().flatten().collect(); - /// # assert_eq!(flattened, (2..10002).collect::>()); - /// ``` - /// - /// # See Also - /// - /// [`ParallelSliceMut::par_splat_map_mut`] for mapping mutable slices. - /// [`ParallelSlice::par_chunk_map`] for mapping when a specific chunk size is desirable. - fn par_splat_map(&self, task_pool: &TaskPool, max_tasks: Option, f: F) -> Vec +use crate::iter::{ + bridge, Consumer, IndexedParallelIterator, IntoParallelIterator, Producer, ProducerCallback, + UnindexedConsumer, ParallelIterator, +}; + +impl<'data, T: Sync + 'data> IntoParallelIterator for &'data [T] { + type Item = &'data T; + type Iter = Iter<'data, T>; + + fn into_par_iter(self) -> Self::Iter { + Iter { slice: self } + } +} + +impl<'data, T: Send + 'data> IntoParallelIterator for &'data mut [T] { + type Item = &'data mut T; + type Iter = IterMut<'data, T>; + + fn into_par_iter(self) -> Self::Iter { + IterMut { slice: self } + } +} + +/// Parallel iterator over immutable items in a slice +#[derive(Debug)] +pub struct Iter<'data, T: Sync> { + slice: &'data [T], +} + +impl<'data, T: Sync> Clone for Iter<'data, T> { + fn clone(&self) -> Self { + Iter { ..*self } + } +} + +impl<'data, T: Sync + 'data> ParallelIterator for Iter<'data, T> { + type Item = &'data T; + + fn drive_unindexed(self, consumer: C) -> C::Result where - F: Fn(&[T]) -> R + Send + Sync, - R: Send + 'static, + C: UnindexedConsumer, { - let slice = self.as_ref(); - let chunk_size = std::cmp::max( - 1, - std::cmp::max( - slice.len() / task_pool.thread_num(), - slice.len() / 
max_tasks.unwrap_or(usize::MAX), - ), - ); + bridge(self, consumer) + } - slice.par_chunk_map(task_pool, chunk_size, f) + fn opt_len(&self) -> Option { + Some(self.len()) } } -impl ParallelSlice for S where S: AsRef<[T]> {} - -/// Provides functions for mapping mutable slices across a provided [`TaskPool`]. -pub trait ParallelSliceMut: AsMut<[T]> { - /// Splits the slice in chunks of size `chunks_size` or less and maps the chunks - /// in parallel across the provided `task_pool`. One task is spawned in the task pool - /// for every chunk. - /// - /// Returns a `Vec` of the mapped results in the same order as the input. - /// - /// # Example - /// - /// ``` - /// # use bevy_tasks::prelude::*; - /// # use bevy_tasks::TaskPool; - /// let task_pool = TaskPool::new(); - /// let mut counts = (0..10000).collect::>(); - /// let incremented = counts.par_chunk_map_mut(&task_pool, 100, |chunk| { - /// let mut results = Vec::new(); - /// for count in chunk { - /// *count += 5; - /// results.push(*count - 2); - /// } - /// results - /// }); - /// - /// assert_eq!(counts, (5..10005).collect::>()); - /// # let flattened: Vec<_> = incremented.into_iter().flatten().collect(); - /// # assert_eq!(flattened, (3..10003).collect::>()); - /// ``` - /// - /// # See Also - /// - /// [`ParallelSlice::par_chunk_map`] for mapping immutable slices. - /// [`ParallelSliceMut::par_splat_map_mut`] for mapping when a specific chunk size is unknown. 
- fn par_chunk_map_mut(&mut self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec +impl<'data, T: Sync + 'data> IndexedParallelIterator for Iter<'data, T> { + fn drive(self, consumer: C) -> C::Result where - F: Fn(&mut [T]) -> R + Send + Sync, - R: Send + 'static, + C: Consumer, { - let slice = self.as_mut(); - let f = &f; - task_pool.scope(|scope| { - for chunk in slice.chunks_mut(chunk_size) { - scope.spawn(async move { f(chunk) }); - } - }) - } - - /// Splits the slice into a maximum of `max_tasks` chunks, and maps the chunks in parallel - /// across the provided `task_pool`. One task is spawned in the task pool for every chunk. - /// - /// If `max_tasks` is `None`, this function will attempt to use one chunk per thread in - /// `task_pool`. - /// - /// Returns a `Vec` of the mapped results in the same order as the input. - /// - /// # Example - /// - /// ``` - /// # use bevy_tasks::prelude::*; - /// # use bevy_tasks::TaskPool; - /// let task_pool = TaskPool::new(); - /// let mut counts = (0..10000).collect::>(); - /// let incremented = counts.par_splat_map_mut(&task_pool, None, |chunk| { - /// let mut results = Vec::new(); - /// for count in chunk { - /// *count += 5; - /// results.push(*count - 2); - /// } - /// results - /// }); - /// - /// assert_eq!(counts, (5..10005).collect::>()); - /// # let flattened: Vec<_> = incremented.into_iter().flatten().collect::>(); - /// # assert_eq!(flattened, (3..10003).collect::>()); - /// ``` - /// - /// # See Also - /// - /// [`ParallelSlice::par_splat_map`] for mapping immutable slices. - /// [`ParallelSliceMut::par_chunk_map_mut`] for mapping when a specific chunk size is desirable. 
- fn par_splat_map_mut( - &mut self, - task_pool: &TaskPool, - max_tasks: Option, - f: F, - ) -> Vec + bridge(self, consumer) + } + + fn len(&self) -> usize { + self.slice.len() + } + + fn with_producer(self, callback: CB) -> CB::Output where - F: Fn(&mut [T]) -> R + Send + Sync, - R: Send + 'static, + CB: ProducerCallback, { - let mut slice = self.as_mut(); - let chunk_size = std::cmp::max( - 1, - std::cmp::max( - slice.len() / task_pool.thread_num(), - slice.len() / max_tasks.unwrap_or(usize::MAX), - ), - ); + callback.callback(IterProducer { slice: self.slice }) + } +} + +struct IterProducer<'data, T: Sync> { + slice: &'data [T], +} + +impl<'data, T: 'data + Sync> Producer for IterProducer<'data, T> { + type Item = &'data T; + type IntoIter = ::std::slice::Iter<'data, T>; + + fn into_iter(self) -> Self::IntoIter { + self.slice.iter() + } - slice.par_chunk_map_mut(task_pool, chunk_size, f) + fn split_at(self, index: usize) -> (Self, Self) { + let (left, right) = self.slice.split_at(index); + (IterProducer { slice: left }, IterProducer { slice: right }) } } -impl ParallelSliceMut for S where S: AsMut<[T]> {} +/// Parallel iterator over mutable items in a slice +#[derive(Debug)] +pub struct IterMut<'data, T: Send> { + slice: &'data mut [T], +} -#[cfg(test)] -mod tests { - use crate::*; +impl<'data, T: Send + 'data> ParallelIterator for IterMut<'data, T> { + type Item = &'data mut T; - #[test] - fn test_par_chunks_map() { - let v = vec![42; 1000]; - let task_pool = TaskPool::new(); - let outputs = v.par_splat_map(&task_pool, None, |numbers| -> i32 { numbers.iter().sum() }); + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + bridge(self, consumer) + } - let mut sum = 0; - for output in outputs { - sum += output; - } + fn opt_len(&self) -> Option { + Some(self.len()) + } +} - assert_eq!(sum, 1000 * 42); +impl<'data, T: Send + 'data> IndexedParallelIterator for IterMut<'data, T> { + fn drive(self, consumer: C) -> C::Result + 
where + C: Consumer, + { + bridge(self, consumer) } - #[test] - fn test_par_chunks_map_mut() { - let mut v = vec![42; 1000]; - let task_pool = TaskPool::new(); + fn len(&self) -> usize { + self.slice.len() + } - let outputs = v.par_splat_map_mut(&task_pool, None, |numbers| -> i32 { - for number in numbers.iter_mut() { - *number *= 2; - } - numbers.iter().sum() - }); + fn with_producer(self, callback: CB) -> CB::Output + where + CB: ProducerCallback, + { + callback.callback(IterMutProducer { slice: self.slice }) + } +} + +struct IterMutProducer<'data, T: Send> { + slice: &'data mut [T], +} - let mut sum = 0; - for output in outputs { - sum += output; - } +impl<'data, T: 'data + Send> Producer for IterMutProducer<'data, T> { + type Item = &'data mut T; + type IntoIter = ::std::slice::IterMut<'data, T>; + + fn into_iter(self) -> Self::IntoIter { + self.slice.iter_mut() + } - assert_eq!(sum, 1000 * 42 * 2); - assert_eq!(v[0], 84); + fn split_at(self, index: usize) -> (Self, Self) { + let (left, right) = self.slice.split_at_mut(index); + ( + IterMutProducer { slice: left }, + IterMutProducer { slice: right }, + ) } } diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index d55a9476b0076..c3da4520f5c8b 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -14,7 +14,7 @@ use futures_lite::FutureExt; use crate::{ block_on, thread_executor::{ThreadExecutor, ThreadExecutorTicker}, - Task, + ComputeTaskPool, Task, }; struct CallOnDrop(Option>); @@ -675,6 +675,11 @@ where } } +pub fn compute_task_pool_thread_num() -> usize { + ComputeTaskPool::try_get() + .map(|p| p.thread_num()) + .unwrap_or(1) +} #[cfg(test)] #[allow(clippy::disallowed_types)] mod tests { diff --git a/crates/bevy_tasks/src/vec.rs b/crates/bevy_tasks/src/vec.rs new file mode 100644 index 0000000000000..e69de29bb2d1d From c9d94df8d22796c768211b38d19c7f4aa30c66b0 Mon Sep 17 00:00:00 2001 From: re0312 Date: Sun, 21 Jan 2024 09:01:26 +0800 
Subject: [PATCH 02/13] pool --- crates/bevy_tasks/examples/busy_behavior.rs | 3 +- crates/bevy_tasks/examples/vec_example.rs | 29 ++++++-- crates/bevy_tasks/src/iter/collect.rs | 2 +- crates/bevy_tasks/src/iter/mod.rs | 76 ++++++++++----------- crates/bevy_tasks/src/lib.rs | 3 +- crates/bevy_tasks/src/task_pool.rs | 5 -- crates/bevy_tasks/src/usages.rs | 7 ++ crates/bevy_tasks/src/vec.rs | 22 ++++++ 8 files changed, 96 insertions(+), 51 deletions(-) diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs index c9f8fcf6494b4..c2fc2aabe5e99 100644 --- a/crates/bevy_tasks/examples/busy_behavior.rs +++ b/crates/bevy_tasks/examples/busy_behavior.rs @@ -8,11 +8,10 @@ use web_time::{Duration, Instant}; fn main() { let pool = TaskPoolBuilder::new() .thread_name("Busy Behavior ThreadPool".to_string()) - .num_threads(4) + .num_threads(20) .build(); let t0 = Instant::now(); - let a =vec![1,2,3]; pool.scope(|s| { for i in 0..40 { s.spawn(async move { diff --git a/crates/bevy_tasks/examples/vec_example.rs b/crates/bevy_tasks/examples/vec_example.rs index 9e6d6b5ee03c9..0a6faf38b6559 100644 --- a/crates/bevy_tasks/examples/vec_example.rs +++ b/crates/bevy_tasks/examples/vec_example.rs @@ -1,7 +1,28 @@ -use bevy_tasks::{IntoParallelRefMutIterator, ParallelIterator}; +use bevy_tasks::{ + ComputeTaskPool, IntoParallelRefIterator, ParallelIterator, TaskPool, TaskPoolBuilder, +}; +use web_time::{Duration, Instant}; fn main() { - let mut a = vec![2, 3, 4]; - let c = a.par_iter_mut(); - c.for_each(|a| {}); + ComputeTaskPool::get_or_init(|| { + TaskPoolBuilder::default() + .num_threads(20) + .thread_name("Compute Task Pool".to_string()) + .build() + }); + let a: Vec<_> = (0..40).collect(); + let t0 = Instant::now(); + a.par_iter().for_each(|v| { + let now = Instant::now(); + while Instant::now() - now < Duration::from_millis(100) { + // spin, simulating work being done + } + println!( + "Thread {:?} index {} finished", + 
std::thread::current().id(), + v + ); + }); + let t1 = Instant::now(); + println!("all tasks finished in {} secs", (t1 - t0).as_secs_f32()); } diff --git a/crates/bevy_tasks/src/iter/collect.rs b/crates/bevy_tasks/src/iter/collect.rs index 2f280c5f16940..a9dc6c4f43ea7 100644 --- a/crates/bevy_tasks/src/iter/collect.rs +++ b/crates/bevy_tasks/src/iter/collect.rs @@ -1,5 +1,5 @@ +use super::{Consumer, Folder, ParallelIterator, Reducer, UnindexedConsumer}; use std::{marker::PhantomData, ptr, slice}; -use super::{Consumer, Folder, Reducer, UnindexedConsumer, ParallelIterator}; /// We need to transmit raw pointers across threads. It is possible to do this /// without any unsafe code by converting pointers to usize or to AtomicPtr diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index f3b37ca75802c..0c9cbf38a4ad9 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -1,6 +1,6 @@ use std::cmp; -use crate::task_pool::compute_task_pool_thread_num; +use crate::{compute_task_pool_thread_num, ComputeTaskPool, TaskPool}; mod collect; mod extend; @@ -47,6 +47,26 @@ where } } +fn join(pool: &TaskPool, op_a: A, op_b: B) -> (RA, RB) +where + A: FnOnce() -> RA + Send, + B: FnOnce() -> RB + Send, + RA: Send, + RB: Send, +{ + let mut ra = None; + let mut rb = None; + pool.scope(|s| { + s.spawn(async { + ra = Some(op_a()); + }); + s.spawn(async { + rb = Some(op_b()); + }); + }); + (ra.unwrap(), rb.unwrap()) +} + /// todo pub fn bridge_producer_consumer(len: usize, producer: P, consumer: C) -> C::Result where @@ -67,38 +87,21 @@ where P: Producer, C: Consumer, { - println!("222"); - todo!(); - // if consumer.full() { - // consumer.into_folder().complete() - // } else if splitter.try_split(len, migrated) { - // let mid = len / 2; - // let (left_producer, right_producer) = producer.split_at(mid); - // let (left_consumer, right_consumer, reducer) = consumer.split_at(mid); - // let (left_result, right_result) = 
join_context( - // |context| { - // helper( - // mid, - // context.migrated(), - // splitter, - // left_producer, - // left_consumer, - // ) - // }, - // |context| { - // helper( - // len - mid, - // context.migrated(), - // splitter, - // right_producer, - // right_consumer, - // ) - // }, - // ); - // reducer.reduce(left_result, right_result) - // } else { - // producer.fold_with(consumer.into_folder()).complete() - // } + if consumer.full() { + consumer.into_folder().complete() + } else if splitter.try_split(len, migrated) { + let mid = len / 2; + let (left_producer, right_producer) = producer.split_at(mid); + let (left_consumer, right_consumer, reducer) = consumer.split_at(mid); + let (left_result, right_result) = join( + ComputeTaskPool::get(), + || helper(mid, false, splitter, left_producer, left_consumer), + || helper(len - mid, false, splitter, right_producer, right_consumer), + ); + reducer.reduce(left_result, right_result) + } else { + producer.fold_with(consumer.into_folder()).complete() + } } } @@ -651,7 +654,7 @@ impl Splitter { #[inline] fn new() -> Splitter { Splitter { - splits: crate::task_pool::compute_task_pool_thread_num(), + splits: compute_task_pool_thread_num(), } } @@ -662,10 +665,7 @@ impl Splitter { if stolen { // This job was stolen! Reset the number of desired splits to the // thread count, if that's more than we had remaining anyway. - self.splits = cmp::max( - crate::task_pool::compute_task_pool_thread_num(), - self.splits / 2, - ); + self.splits = cmp::max(compute_task_pool_thread_num(), self.splits / 2); true } else if splits > 0 { // We have splits remaining, make it so. 
diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 9cce0f22d0fac..fbf4ed8934819 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -19,7 +19,7 @@ pub use single_threaded_task_pool::{FakeTask, Scope, TaskPool, TaskPoolBuilder, mod usages; #[cfg(not(target_arch = "wasm32"))] pub use usages::tick_global_task_pools_on_main_thread; -pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; +pub use usages::{compute_task_pool_thread_num, AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; #[cfg(all(not(target_arch = "wasm32"), feature = "multi-threaded"))] mod thread_executor; @@ -31,6 +31,7 @@ pub use async_io::block_on; #[cfg(not(feature = "async-io"))] pub use futures_lite::future::block_on; +pub mod vec; mod iter; pub use futures_lite; pub use iter::*; diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index c3da4520f5c8b..0838d28d57325 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -675,11 +675,6 @@ where } } -pub fn compute_task_pool_thread_num() -> usize { - ComputeTaskPool::try_get() - .map(|p| p.thread_num()) - .unwrap_or(1) -} #[cfg(test)] #[allow(clippy::disallowed_types)] mod tests { diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index fda3092b8ebc8..3421ae67d9124 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -74,6 +74,13 @@ taskpool! { (IO_TASK_POOL, IoTaskPool) } + +pub fn compute_task_pool_thread_num() -> usize { + ComputeTaskPool::try_get() + .map(|p| p.thread_num()) + .unwrap_or(1) +} + /// A function used by `bevy_core` to tick the global tasks pools on the main thread. /// This will run a maximum of 100 local tasks per executor per call to this function. 
/// diff --git a/crates/bevy_tasks/src/vec.rs b/crates/bevy_tasks/src/vec.rs index e69de29bb2d1d..465c2c0df20ef 100644 --- a/crates/bevy_tasks/src/vec.rs +++ b/crates/bevy_tasks/src/vec.rs @@ -0,0 +1,22 @@ +use crate::{ + slice::{Iter, IterMut}, + IntoParallelIterator, +}; + +impl<'data, T: Sync + 'data> IntoParallelIterator for &'data Vec { + type Item = &'data T; + type Iter = Iter<'data, T>; + + fn into_par_iter(self) -> Self::Iter { + <&[T]>::into_par_iter(self) + } +} + +impl<'data, T: Send + 'data> IntoParallelIterator for &'data mut Vec { + type Item = &'data mut T; + type Iter = IterMut<'data, T>; + + fn into_par_iter(self) -> Self::Iter { + <&mut [T]>::into_par_iter(self) + } +} From 0e8c0258e26aeac1898591d846272156f133be3f Mon Sep 17 00:00:00 2001 From: re0312 Date: Mon, 22 Jan 2024 02:55:10 +0800 Subject: [PATCH 03/13] helper --- crates/bevy_tasks/src/iter/extend.rs | 91 ++++++++++++++++++++++++++-- crates/bevy_tasks/src/iter/mod.rs | 29 +++------ crates/bevy_tasks/src/lib.rs | 2 +- 3 files changed, 97 insertions(+), 25 deletions(-) diff --git a/crates/bevy_tasks/src/iter/extend.rs b/crates/bevy_tasks/src/iter/extend.rs index 2c1c348150967..60d502b226680 100644 --- a/crates/bevy_tasks/src/iter/extend.rs +++ b/crates/bevy_tasks/src/iter/extend.rs @@ -1,4 +1,8 @@ -use super::{ParallelExtend, IntoParallelIterator, ParallelIterator}; +use std::collections::LinkedList; + +use crate::{Consumer, Folder, Reducer, UnindexedConsumer}; + +use super::{IntoParallelIterator, ParallelExtend, ParallelIterator}; /// Extends a vector with items from a parallel iterator. impl ParallelExtend for Vec @@ -19,11 +23,90 @@ where crate::iter::collect::special_extend(par_iter, len, self); } None => { - todo!(); // This works like `extend`, but `Vec::append` is more efficient. 
- // let list = par_iter.drive_unindexed(ListVecConsumer); - // vec_append(self, list); + let list = par_iter.drive_unindexed(ListVecConsumer); + vec_append(self, list); } } } } + +fn len(list: &LinkedList>) -> usize { + list.iter().map(Vec::len).sum() +} + +fn vec_append(vec: &mut Vec, list: LinkedList>) { + vec.reserve(len(&list)); + for mut other in list { + vec.append(&mut other); + } +} + +struct ListVecConsumer; +struct ListVecFolder { + vec: Vec, +} +struct ListReducer; + +impl Reducer> for ListReducer { + fn reduce(self, mut left: LinkedList, mut right: LinkedList) -> LinkedList { + left.append(&mut right); + left + } +} +impl Consumer for ListVecConsumer { + type Folder = ListVecFolder; + type Reducer = ListReducer; + type Result = LinkedList>; + + fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { + (Self, Self, ListReducer) + } + + fn into_folder(self) -> Self::Folder { + ListVecFolder { vec: Vec::new() } + } + + fn full(&self) -> bool { + false + } +} + +impl UnindexedConsumer for ListVecConsumer { + fn split_off_left(&self) -> Self { + Self + } + + fn to_reducer(&self) -> Self::Reducer { + ListReducer + } +} + +impl Folder for ListVecFolder { + type Result = LinkedList>; + + fn consume(mut self, item: T) -> Self { + self.vec.push(item); + self + } + + fn consume_iter(mut self, iter: I) -> Self + where + I: IntoIterator, + { + self.vec.extend(iter); + self + } + + fn complete(self) -> Self::Result { + let mut list = LinkedList::new(); + if !self.vec.is_empty() { + list.push_back(self.vec); + } + list + } + + fn full(&self) -> bool { + false + } +} diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index 0c9cbf38a4ad9..3e1aaf546f87e 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -74,29 +74,23 @@ where C: Consumer, { let splitter = LengthSplitter::new(producer.min_len(), producer.max_len(), len); - return helper(len, false, splitter, producer, consumer); + return 
helper(len, splitter, producer, consumer); - fn helper( - len: usize, - migrated: bool, - mut splitter: LengthSplitter, - producer: P, - consumer: C, - ) -> C::Result + fn helper(len: usize, mut splitter: LengthSplitter, producer: P, consumer: C) -> C::Result where P: Producer, C: Consumer, { if consumer.full() { consumer.into_folder().complete() - } else if splitter.try_split(len, migrated) { + } else if splitter.try_split(len) { let mid = len / 2; let (left_producer, right_producer) = producer.split_at(mid); let (left_consumer, right_consumer, reducer) = consumer.split_at(mid); let (left_result, right_result) = join( ComputeTaskPool::get(), - || helper(mid, false, splitter, left_producer, left_consumer), - || helper(len - mid, false, splitter, right_producer, right_consumer), + || helper(mid, splitter, left_producer, left_consumer), + || helper(len - mid, splitter, right_producer, right_consumer), ); reducer.reduce(left_result, right_result) } else { @@ -659,15 +653,10 @@ impl Splitter { } #[inline] - fn try_split(&mut self, stolen: bool) -> bool { + fn try_split(&mut self) -> bool { let Splitter { splits } = *self; - if stolen { - // This job was stolen! Reset the number of desired splits to the - // thread count, if that's more than we had remaining anyway. - self.splits = cmp::max(compute_task_pool_thread_num(), self.splits / 2); - true - } else if splits > 0 { + if splits > 0 { // We have splits remaining, make it so. self.splits /= 2; true @@ -721,8 +710,8 @@ impl LengthSplitter { } #[inline] - fn try_split(&mut self, len: usize, stolen: bool) -> bool { + fn try_split(&mut self, len: usize) -> bool { // If splitting wouldn't make us too small, try the inner splitter. 
- len / 2 >= self.min && self.inner.try_split(stolen) + len / 2 >= self.min && self.inner.try_split() } } diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index fbf4ed8934819..1f1b8b4055cea 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -31,8 +31,8 @@ pub use async_io::block_on; #[cfg(not(feature = "async-io"))] pub use futures_lite::future::block_on; -pub mod vec; mod iter; +mod vec; pub use futures_lite; pub use iter::*; From 96bf85eb151ef26f81cc53717fac638ba4989307 Mon Sep 17 00:00:00 2001 From: re0312 Date: Mon, 22 Jan 2024 06:18:22 +0800 Subject: [PATCH 04/13] map --- crates/bevy_tasks/examples/busy_behavior.rs | 39 +-- crates/bevy_tasks/examples/vec_example.rs | 43 ++-- crates/bevy_tasks/src/iter/map.rs | 251 ++++++++++++++++++++ crates/bevy_tasks/src/iter/mod.rs | 35 ++- 4 files changed, 334 insertions(+), 34 deletions(-) create mode 100644 crates/bevy_tasks/src/iter/map.rs diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs index c2fc2aabe5e99..767006fca99b5 100644 --- a/crates/bevy_tasks/examples/busy_behavior.rs +++ b/crates/bevy_tasks/examples/busy_behavior.rs @@ -8,27 +8,30 @@ use web_time::{Duration, Instant}; fn main() { let pool = TaskPoolBuilder::new() .thread_name("Busy Behavior ThreadPool".to_string()) - .num_threads(20) + .num_threads(5) .build(); + println!("main {:?}", std::thread::current().id()); + const iter_count: usize = 1000; let t0 = Instant::now(); - pool.scope(|s| { - for i in 0..40 { - s.spawn(async move { - let now = Instant::now(); - while Instant::now() - now < Duration::from_millis(100) { - // spin, simulating work being done - } + for _ in 0..iter_count { + pool.scope(|s| { + for i in 0..20 { + s.spawn(async move { + let now = Instant::now(); + while Instant::now() - now < Duration::from_micros(1000) { + // spin, simulating work being done + } - println!( - "Thread {:?} index {} finished", - std::thread::current().id(), - i 
- ); - }); - } - }); + // println!( + // "Thread {:?} index {} finished", + // std::thread::current().id(), + // i + // ); + }); + } + }); + } - let t1 = Instant::now(); - println!("all tasks finished in {} secs", (t1 - t0).as_secs_f32()); + println!(" par {:?} elapsed", t0.elapsed() / iter_count as u32); } diff --git a/crates/bevy_tasks/examples/vec_example.rs b/crates/bevy_tasks/examples/vec_example.rs index 0a6faf38b6559..4b8daa7a921f3 100644 --- a/crates/bevy_tasks/examples/vec_example.rs +++ b/crates/bevy_tasks/examples/vec_example.rs @@ -1,28 +1,41 @@ +use std::hint::black_box; + use bevy_tasks::{ - ComputeTaskPool, IntoParallelRefIterator, ParallelIterator, TaskPool, TaskPoolBuilder, + compute_task_pool_thread_num, ComputeTaskPool, IntoParallelRefIterator, ParallelIterator, + TaskPool, TaskPoolBuilder, }; use web_time::{Duration, Instant}; +pub fn heavy_compute(v: i32) -> i32 { + let now = Instant::now(); + while Instant::now() - now < Duration::from_micros(1) { + // spin, simulating work being done + } + v +} fn main() { ComputeTaskPool::get_or_init(|| { TaskPoolBuilder::default() - .num_threads(20) + .num_threads(5) .thread_name("Compute Task Pool".to_string()) .build() }); - let a: Vec<_> = (0..40).collect(); + println!("main {:?}", std::thread::current().id()); + let a: Vec<_> = (0..20000).collect(); let t0 = Instant::now(); - a.par_iter().for_each(|v| { - let now = Instant::now(); - while Instant::now() - now < Duration::from_millis(100) { - // spin, simulating work being done - } - println!( - "Thread {:?} index {} finished", - std::thread::current().id(), - v - ); + a.iter().for_each(|v| { + black_box(v); + black_box(heavy_compute(*v)); }); - let t1 = Instant::now(); - println!("all tasks finished in {} secs", (t1 - t0).as_secs_f32()); + println!(" iter {:?} elapsed", t0.elapsed()); + const iter_count: usize = 1000; + let t0 = Instant::now(); + for _ in 0..iter_count { + a.par_iter().for_each(|v| { + // println!("Thread {:?} finished", 
std::thread::current().id(),); + black_box(v); + black_box(heavy_compute(*v)); + }); + } + println!(" par {:?} elapsed", t0.elapsed() / iter_count as u32); } diff --git a/crates/bevy_tasks/src/iter/map.rs b/crates/bevy_tasks/src/iter/map.rs new file mode 100644 index 0000000000000..2f71e23efda9a --- /dev/null +++ b/crates/bevy_tasks/src/iter/map.rs @@ -0,0 +1,251 @@ +use std::iter; + +use crate::{ParallelIterator, UnindexedConsumer, IndexedParallelIterator, Consumer, ProducerCallback, Producer, Folder}; + +/// `Map` is an iterator that transforms the elements of an underlying iterator. +/// +/// This struct is created by the [`map()`] method on [`ParallelIterator`] +/// +/// [`map()`]: trait.ParallelIterator.html#method.map +/// [`ParallelIterator`]: trait.ParallelIterator.html +#[must_use = "iterator adaptors are lazy and do nothing unless consumed"] +#[derive(Clone)] +pub struct Map { + base: I, + map_op: F, +} + +impl Map +where + I: ParallelIterator, +{ + /// Creates a new `Map` iterator. 
+ pub(super) fn new(base: I, map_op: F) -> Self { + Map { base, map_op } + } +} + +impl ParallelIterator for Map +where + I: ParallelIterator, + F: Fn(I::Item) -> R + Sync + Send, + R: Send, +{ + type Item = F::Output; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + let consumer1 = MapConsumer::new(consumer, &self.map_op); + self.base.drive_unindexed(consumer1) + } + + fn opt_len(&self) -> Option { + self.base.opt_len() + } +} + +impl IndexedParallelIterator for Map +where + I: IndexedParallelIterator, + F: Fn(I::Item) -> R + Sync + Send, + R: Send, +{ + fn drive(self, consumer: C) -> C::Result + where + C: Consumer, + { + let consumer1 = MapConsumer::new(consumer, &self.map_op); + self.base.drive(consumer1) + } + + fn len(&self) -> usize { + self.base.len() + } + + fn with_producer(self, callback: CB) -> CB::Output + where + CB: ProducerCallback, + { + return self.base.with_producer(Callback { + callback, + map_op: self.map_op, + }); + + struct Callback { + callback: CB, + map_op: F, + } + + impl ProducerCallback for Callback + where + CB: ProducerCallback, + F: Fn(T) -> R + Sync, + R: Send, + { + type Output = CB::Output; + + fn callback

(self, base: P) -> CB::Output + where + P: Producer, + { + let producer = MapProducer { + base, + map_op: &self.map_op, + }; + self.callback.callback(producer) + } + } + } +} + +/// //////////////////////////////////////////////////////////////////////// + +struct MapProducer<'f, P, F> { + base: P, + map_op: &'f F, +} + +impl<'f, P, F, R> Producer for MapProducer<'f, P, F> +where + P: Producer, + F: Fn(P::Item) -> R + Sync, + R: Send, +{ + type Item = F::Output; + type IntoIter = iter::Map; + + fn into_iter(self) -> Self::IntoIter { + self.base.into_iter().map(self.map_op) + } + + fn min_len(&self) -> usize { + self.base.min_len() + } + fn max_len(&self) -> usize { + self.base.max_len() + } + + fn split_at(self, index: usize) -> (Self, Self) { + let (left, right) = self.base.split_at(index); + ( + MapProducer { + base: left, + map_op: self.map_op, + }, + MapProducer { + base: right, + map_op: self.map_op, + }, + ) + } + + fn fold_with(self, folder: G) -> G + where + G: Folder, + { + let folder1 = MapFolder { + base: folder, + map_op: self.map_op, + }; + self.base.fold_with(folder1).base + } +} + +/// //////////////////////////////////////////////////////////////////////// +/// Consumer implementation + +struct MapConsumer<'f, C, F> { + base: C, + map_op: &'f F, +} + +impl<'f, C, F> MapConsumer<'f, C, F> { + fn new(base: C, map_op: &'f F) -> Self { + MapConsumer { base, map_op } + } +} + +impl<'f, T, R, C, F> Consumer for MapConsumer<'f, C, F> +where + C: Consumer, + F: Fn(T) -> R + Sync, + R: Send, +{ + type Folder = MapFolder<'f, C::Folder, F>; + type Reducer = C::Reducer; + type Result = C::Result; + + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { + let (left, right, reducer) = self.base.split_at(index); + ( + MapConsumer::new(left, self.map_op), + MapConsumer::new(right, self.map_op), + reducer, + ) + } + + fn into_folder(self) -> Self::Folder { + MapFolder { + base: self.base.into_folder(), + map_op: self.map_op, + } + } + + fn full(&self) -> 
bool { + self.base.full() + } +} + +impl<'f, T, R, C, F> UnindexedConsumer for MapConsumer<'f, C, F> +where + C: UnindexedConsumer, + F: Fn(T) -> R + Sync, + R: Send, +{ + fn split_off_left(&self) -> Self { + MapConsumer::new(self.base.split_off_left(), self.map_op) + } + + fn to_reducer(&self) -> Self::Reducer { + self.base.to_reducer() + } +} + +struct MapFolder<'f, C, F> { + base: C, + map_op: &'f F, +} + +impl<'f, T, R, C, F> Folder for MapFolder<'f, C, F> +where + C: Folder, + F: Fn(T) -> R, +{ + type Result = C::Result; + + fn consume(self, item: T) -> Self { + let mapped_item = (self.map_op)(item); + MapFolder { + base: self.base.consume(mapped_item), + map_op: self.map_op, + } + } + + fn consume_iter(mut self, iter: I) -> Self + where + I: IntoIterator, + { + self.base = self.base.consume_iter(iter.into_iter().map(self.map_op)); + self + } + + fn complete(self) -> C::Result { + self.base.complete() + } + + fn full(&self) -> bool { + self.base.full() + } +} diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index 3e1aaf546f87e..e8e8bf9c230a0 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -2,10 +2,13 @@ use std::cmp; use crate::{compute_task_pool_thread_num, ComputeTaskPool, TaskPool}; +use self::map::Map; + mod collect; mod extend; mod for_each; mod from_par_iter; +mod map; mod noop; // pub use self::collect::special_extend; @@ -291,6 +294,30 @@ pub trait ParallelIterator: Sized + Send { for_each::for_each(self, &op) } + /// Applies `map_op` to each item of this iterator, producing a new + /// iterator with the results. 
+ /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// + /// let mut par_iter = (0..5).into_par_iter().map(|x| x * 2); + /// + /// let doubles: Vec<_> = par_iter.collect(); + /// + /// assert_eq!(&doubles[..], &[0, 2, 4, 6, 8]); + /// ``` + fn map(self, map_op: F) -> Map + where + F: Fn(Self::Item) -> R + Sync + Send, + R: Send, + { + Map::new(self, map_op) + } + + /// Creates a fresh collection containing all the elements produced + /// by this parallel iterator. fn collect(self) -> C where C: FromParallelIterator, @@ -333,6 +360,12 @@ pub trait ParallelIterator: Sized + Send { } } +/// An iterator that supports "random access" to its data, meaning +/// that you can split it at arbitrary indices and draw data from +/// those points. +/// +/// **Note:** Not implemented for `u64`, `i64`, `u128`, or `i128` ranges +// Waiting for `ExactSizeIterator::is_empty` to be stabilized. See rust-lang/rust#35428 pub trait IndexedParallelIterator: ParallelIterator { /// Produces an exact count of how many items this iterator will /// produce, presuming no panic occurs. 
@@ -648,7 +681,7 @@ impl Splitter { #[inline] fn new() -> Splitter { Splitter { - splits: compute_task_pool_thread_num(), + splits: compute_task_pool_thread_num() * 2, } } From 8bab5896efab583cb2949d1231db5e76ca883a54 Mon Sep 17 00:00:00 2001 From: re0312 Date: Mon, 22 Jan 2024 15:18:42 +0800 Subject: [PATCH 05/13] clean up --- crates/bevy_tasks/examples/busy_behavior.rs | 1 + crates/bevy_tasks/examples/vec_example.rs | 22 +- crates/bevy_tasks/src/iter/extend.rs | 1 - crates/bevy_tasks/src/iter/map.rs | 4 +- crates/bevy_tasks/src/iter/mod.rs | 262 +++----------------- 5 files changed, 60 insertions(+), 230 deletions(-) diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs index 767006fca99b5..91e5d7bc22596 100644 --- a/crates/bevy_tasks/examples/busy_behavior.rs +++ b/crates/bevy_tasks/examples/busy_behavior.rs @@ -14,6 +14,7 @@ fn main() { println!("main {:?}", std::thread::current().id()); const iter_count: usize = 1000; let t0 = Instant::now(); + let a: Vec<_> = (0..20000).collect(); for _ in 0..iter_count { pool.scope(|s| { for i in 0..20 { diff --git a/crates/bevy_tasks/examples/vec_example.rs b/crates/bevy_tasks/examples/vec_example.rs index 4b8daa7a921f3..226a33382c13e 100644 --- a/crates/bevy_tasks/examples/vec_example.rs +++ b/crates/bevy_tasks/examples/vec_example.rs @@ -1,8 +1,8 @@ -use std::hint::black_box; +use std::{hint::black_box, iter}; use bevy_tasks::{ - compute_task_pool_thread_num, ComputeTaskPool, IntoParallelRefIterator, ParallelIterator, - TaskPool, TaskPoolBuilder, + compute_task_pool_thread_num, ComputeTaskPool, FromParallelIterator, IntoParallelRefIterator, + ParallelIterator, TaskPool, TaskPoolBuilder, }; use web_time::{Duration, Instant}; @@ -13,20 +13,34 @@ pub fn heavy_compute(v: i32) -> i32 { } v } +struct Test { + mass: usize, +} +impl FromParallelIterator for Test { + fn from_par_iter(par_iter: I) -> Self + where + I: bevy_tasks::IntoParallelIterator, + { + let par_iter = 
par_iter.into_par_iter(); + Test { mass: 1 } + } +} fn main() { ComputeTaskPool::get_or_init(|| { TaskPoolBuilder::default() - .num_threads(5) + .num_threads(10) .thread_name("Compute Task Pool".to_string()) .build() }); println!("main {:?}", std::thread::current().id()); let a: Vec<_> = (0..20000).collect(); let t0 = Instant::now(); + // for _ in 0..iter_count { a.iter().for_each(|v| { black_box(v); black_box(heavy_compute(*v)); }); + // } println!(" iter {:?} elapsed", t0.elapsed()); const iter_count: usize = 1000; let t0 = Instant::now(); diff --git a/crates/bevy_tasks/src/iter/extend.rs b/crates/bevy_tasks/src/iter/extend.rs index 60d502b226680..143357d894adc 100644 --- a/crates/bevy_tasks/src/iter/extend.rs +++ b/crates/bevy_tasks/src/iter/extend.rs @@ -13,7 +13,6 @@ where where I: IntoParallelIterator, { - // See the vec_collect benchmarks in rayon-demo for different strategies. let par_iter = par_iter.into_par_iter(); match par_iter.opt_len() { Some(len) => { diff --git a/crates/bevy_tasks/src/iter/map.rs b/crates/bevy_tasks/src/iter/map.rs index 2f71e23efda9a..be584eb4cec2c 100644 --- a/crates/bevy_tasks/src/iter/map.rs +++ b/crates/bevy_tasks/src/iter/map.rs @@ -5,9 +5,7 @@ use crate::{ParallelIterator, UnindexedConsumer, IndexedParallelIterator, Consum /// `Map` is an iterator that transforms the elements of an underlying iterator. 
/// /// This struct is created by the [`map()`] method on [`ParallelIterator`] -/// -/// [`map()`]: trait.ParallelIterator.html#method.map -/// [`ParallelIterator`]: trait.ParallelIterator.html + #[must_use = "iterator adaptors are lazy and do nothing unless consumed"] #[derive(Clone)] pub struct Map { diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index e8e8bf9c230a0..7ac7d38dee831 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -11,7 +11,6 @@ mod from_par_iter; mod map; mod noop; -// pub use self::collect::special_extend; /// This helper function is used to "connect" a parallel iterator to a /// consumer. It will convert the `par_iter` into a producer P and /// then pull items from P and feed them to `consumer`, splitting and @@ -20,9 +19,6 @@ mod noop; /// This is useful when you are implementing your own parallel /// iterators: it is often used as the definition of the /// [`drive_unindexed`] or [`drive`] methods. -/// -/// [`drive_unindexed`]: ../trait.ParallelIterator.html#tymethod.drive_unindexed -/// [`drive`]: ../trait.IndexedParallelIterator.html#tymethod.drive pub fn bridge(par_iter: I, consumer: C) -> C::Result where I: IndexedParallelIterator, @@ -50,6 +46,7 @@ where } } +/// TODO: optimize it fn join(pool: &TaskPool, op_a: A, op_b: B) -> (RA, RB) where A: FnOnce() -> RA + Send, @@ -70,7 +67,15 @@ where (ra.unwrap(), rb.unwrap()) } -/// todo +/// This helper function is used to "connect" a producer and a +/// consumer. You may prefer to call [`bridge`], which wraps this +/// function. This function will draw items from `producer` and feed +/// them to `consumer`, splitting and creating parallel tasks when +/// needed. +/// +/// This is useful when you are implementing your own parallel +/// iterators: it is often used as the definition of the +/// [`drive_unindexed`] or [`drive`] methods.
pub fn bridge_producer_consumer(len: usize, producer: P, consumer: C) -> C::Result where P: Producer, @@ -87,6 +92,11 @@ where if consumer.full() { consumer.into_folder().complete() } else if splitter.try_split(len) { + // TODO: optimize it + // Increasing the thread number may not necessarily enhance performance due to the split method. + // Additional benefits can only be realized when the number of threads reaches the next power of 2. + // Rayon may split tasks into smaller slices in some cases, but Bevy's executor suffers from overhead + // when spawning a large number of small tasks. let mid = len / 2; let (left_producer, right_producer) = producer.split_at(mid); let (left_consumer, right_consumer, reducer) = consumer.split_at(mid); @@ -144,6 +154,27 @@ pub trait Folder: Sized { fn full(&self) -> bool; } +/// A `Producer` is effectively a "splittable `IntoIterator`". That +/// is, a producer is a value which can be converted into an iterator +/// at any time: at that point, it simply produces items on demand, +/// like any iterator. But what makes a `Producer` special is that, +/// *before* we convert to an iterator, we can also **split** it at a +/// particular point using the `split_at` method. This will yield up +/// two producers, one producing the items before that point, and one +/// producing the items after that point (these two producers can then +/// independently be split further, or be converted into iterators). +/// In Rayon, this splitting is used to divide between threads. +/// See [the `plumbing` README][r] for further details. +/// +/// Note that each producer will always produce a fixed number of +/// items N. However, this number N is not queryable through the API; +/// the consumer is expected to track it. +/// +/// NB. You might expect `Producer` to extend the `IntoIterator` +/// trait. 
However, [rust-lang/rust#20671][20671] prevents us from +/// declaring the DoubleEndedIterator and ExactSizeIterator +/// constraints on a required IntoIterator trait, so we inline +/// IntoIterator here until that issue is fixed. pub trait Producer: Send + Sized { /// The type of item that will be produced by this producer once /// it is converted into an iterator. @@ -160,12 +191,7 @@ pub trait Producer: Send + Sized { /// sequentially. Defaults to 1, which means that we will split /// all the way down to a single item. This can be raised higher /// using the [`with_min_len`] method, which will force us to - /// create sequential tasks at a larger granularity. Note that - /// Rayon automatically normally attempts to adjust the size of - /// parallel splits to reduce overhead, so this should not be - /// needed. - /// - /// [`with_min_len`]: ../trait.IndexedParallelIterator.html#method.with_min_len + /// create sequential tasks at a larger granularity. fn min_len(&self) -> usize { 1 } @@ -174,11 +200,7 @@ pub trait Producer: Send + Sized { /// sequentially. Defaults to MAX, which means that we can choose /// not to split at all. This can be lowered using the /// [`with_max_len`] method, which will force us to create more - /// parallel tasks. Note that Rayon automatically normally - /// attempts to adjust the size of parallel splits to reduce - /// overhead, so this should not be needed. - /// - /// [`with_max_len`]: ../trait.IndexedParallelIterator.html#method.with_max_len + /// parallel tasks. fn max_len(&self) -> usize { usize::MAX } @@ -200,11 +222,7 @@ pub trait Producer: Send + Sized { } /// The `ProducerCallback` trait is a kind of generic closure, -/// [analogous to `FnOnce`][FnOnce]. See [the corresponding section in -/// the plumbing README][r] for more details. 
-/// -/// [r]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md#producer-callback -/// [FnOnce]: https://doc.rust-lang.org/std/ops/trait.FnOnce.html +/// [analogous to `FnOnce`][FnOnce]. pub trait ProducerCallback { /// The type of value returned by this callback. Analogous to /// [`Output` from the `FnOnce` trait][Output]. @@ -279,14 +297,6 @@ pub trait ParallelIterator: Sized + Send { /// [`for_each`]: #method.for_each type Item: Send; /// Executes `OP` on each item produced by the iterator, in parallel. - /// - /// # Examples - /// - /// ``` - /// use rayon::prelude::*; - /// - /// (0..100).into_par_iter().for_each(|x| println!("{:?}", x)); - /// ``` fn for_each(self, op: OP) where OP: Fn(Self::Item) + Sync + Send, @@ -296,18 +306,6 @@ pub trait ParallelIterator: Sized + Send { /// Applies `map_op` to each item of this iterator, producing a new /// iterator with the results. - /// - /// # Examples - /// - /// ``` - /// use rayon::prelude::*; - /// - /// let mut par_iter = (0..5).into_par_iter().map(|x| x * 2); - /// - /// let doubles: Vec<_> = par_iter.collect(); - /// - /// assert_eq!(&doubles[..], &[0, 2, 4, 6, 8]); - /// ``` fn map(self, map_op: F) -> Map where F: Fn(Self::Item) -> R + Sync + Send, @@ -327,16 +325,6 @@ pub trait ParallelIterator: Sized + Send { /// Internal method used to define the behavior of this parallel /// iterator. You should not need to call this directly. - /// - /// This method causes the iterator `self` to start producing - /// items and to feed them to the consumer `consumer` one by one. - /// It may split the consumer before doing so to create the - /// opportunity to produce in parallel. - /// - /// See the [README] for more details on the internals of parallel - /// iterators. 
- /// - /// [README]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md fn drive_unindexed(self, consumer: C) -> C::Result where C: UnindexedConsumer; @@ -369,18 +357,6 @@ pub trait ParallelIterator: Sized + Send { pub trait IndexedParallelIterator: ParallelIterator { /// Produces an exact count of how many items this iterator will /// produce, presuming no panic occurs. - /// - /// # Examples - /// - /// ``` - /// use rayon::prelude::*; - /// - /// let par_iter = (0..100).into_par_iter().zip(vec![0; 10]); - /// assert_eq!(par_iter.len(), 10); - /// - /// let vec: Vec<_> = par_iter.collect(); - /// assert_eq!(vec.len(), 10); - /// ``` fn len(&self) -> usize; /// Internal method used to define the behavior of this parallel @@ -392,11 +368,6 @@ pub trait IndexedParallelIterator: ParallelIterator { /// opportunity to produce in parallel. If a split does happen, it /// will inform the consumer of the index where the split should /// occur (unlike `ParallelIterator::drive_unindexed()`). - /// - /// See the [README] for more details on the internals of parallel - /// iterators. - /// - /// [README]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md fn drive>(self, consumer: C) -> C::Result; /// Internal method used to define the behavior of this parallel @@ -409,11 +380,6 @@ pub trait IndexedParallelIterator: ParallelIterator { /// allows the producer type to contain references; it also means /// that parallel iterators can adjust that type without causing a /// breaking change. - /// - /// See the [README] for more details on the internals of parallel - /// iterators. - /// - /// [README]: https://github.com/rayon-rs/rayon/blob/master/src/iter/plumbing/README.md fn with_producer>(self, callback: CB) -> CB::Output; } @@ -423,68 +389,18 @@ pub trait IndexedParallelIterator: ParallelIterator { /// created from an iterator. /// /// `FromParallelIterator` is used through [`ParallelIterator`]'s [`collect()`] method. 
-/// -/// [`ParallelIterator`]: trait.ParallelIterator.html -/// [`collect()`]: trait.ParallelIterator.html#method.collect -/// -/// # Examples -/// -/// Implementing `FromParallelIterator` for your type: -/// -/// ``` -/// use rayon::prelude::*; -/// use std::mem; -/// -/// struct BlackHole { -/// mass: usize, -/// } -/// -/// impl FromParallelIterator for BlackHole { -/// fn from_par_iter(par_iter: I) -> Self -/// where I: IntoParallelIterator -/// { -/// let par_iter = par_iter.into_par_iter(); -/// BlackHole { -/// mass: par_iter.count() * mem::size_of::(), -/// } -/// } -/// } -/// -/// let bh: BlackHole = (0i32..1000).into_par_iter().collect(); -/// assert_eq!(bh.mass, 4000); -/// ``` pub trait FromParallelIterator where T: Send, { /// Creates an instance of the collection from the parallel iterator `par_iter`. /// - /// If your collection is not naturally parallel, the easiest (and - /// fastest) way to do this is often to collect `par_iter` into a - /// [`LinkedList`] or other intermediate data structure and then - /// sequentially extend your collection. However, a more 'native' - /// technique is to use the [`par_iter.fold`] or - /// [`par_iter.fold_with`] methods to create the collection. - /// Alternatively, if your collection is 'natively' parallel, you - /// can use `par_iter.for_each` to process each element in turn. - /// - /// [`LinkedList`]: https://doc.rust-lang.org/std/collections/struct.LinkedList.html - /// [`par_iter.fold`]: trait.ParallelIterator.html#method.fold - /// [`par_iter.fold_with`]: trait.ParallelIterator.html#method.fold_with - /// [`par_iter.for_each`]: trait.ParallelIterator.html#method.for_each fn from_par_iter(par_iter: I) -> Self where I: IntoParallelIterator; } /// `IntoParallelIterator` implements the conversion to a [`ParallelIterator`]. -/// -/// By implementing `IntoParallelIterator` for a type, you define how it will -/// transformed into an iterator. 
This is a parallel version of the standard -/// library's [`std::iter::IntoIterator`] trait. -/// -/// [`ParallelIterator`]: trait.ParallelIterator.html -/// [`std::iter::IntoIterator`]: https://doc.rust-lang.org/std/iter/trait.IntoIterator.html pub trait IntoParallelIterator { /// The parallel iterator type that will be created. type Iter: ParallelIterator; @@ -493,27 +409,6 @@ pub trait IntoParallelIterator { type Item: Send; /// Converts `self` into a parallel iterator. - /// - /// # Examples - /// - /// ``` - /// use rayon::prelude::*; - /// - /// println!("counting in parallel:"); - /// (0..100).into_par_iter() - /// .for_each(|i| println!("{}", i)); - /// ``` - /// - /// This conversion is often implicit for arguments to methods like [`zip`]. - /// - /// ``` - /// use rayon::prelude::*; - /// - /// let v: Vec<_> = (0..5).into_par_iter().zip(5..10).collect(); - /// assert_eq!(v, [(0, 5), (1, 6), (2, 7), (3, 8), (4, 9)]); - /// ``` - /// - /// [`zip`]: trait.IndexedParallelIterator.html#method.zip fn into_par_iter(self) -> Self::Iter; } @@ -528,17 +423,6 @@ impl IntoParallelIterator for T { /// `IntoParallelRefIterator` implements the conversion to a /// [`ParallelIterator`], providing shared references to the data. -/// -/// This is a parallel version of the `iter()` method -/// defined by various collections. -/// -/// This trait is automatically implemented -/// `for I where &I: IntoParallelIterator`. In most cases, users -/// will want to implement [`IntoParallelIterator`] rather than implement -/// this trait directly. -/// -/// [`ParallelIterator`]: trait.ParallelIterator.html -/// [`IntoParallelIterator`]: trait.IntoParallelIterator.html pub trait IntoParallelRefIterator<'data> { /// The type of the parallel iterator that will be returned. type Iter: ParallelIterator; @@ -548,20 +432,6 @@ pub trait IntoParallelRefIterator<'data> { type Item: Send + 'data; /// Converts `self` into a parallel iterator. 
- /// - /// # Examples - /// - /// ``` - /// use rayon::prelude::*; - /// - /// let v: Vec<_> = (0..100).collect(); - /// assert_eq!(v.par_iter().sum::(), 100 * 99 / 2); - /// - /// // `v.par_iter()` is shorthand for `(&v).into_par_iter()`, - /// // producing the exact same references. - /// assert!(v.par_iter().zip(&v) - /// .all(|(a, b)| std::ptr::eq(a, b))); - /// ``` fn par_iter(&'data self) -> Self::Iter; } @@ -586,16 +456,6 @@ pub trait IntoParallelRefMutIterator<'data> { type Item: Send + 'data; /// Creates the parallel iterator from `self`. - /// - /// # Examples - /// - /// ``` - /// use rayon::prelude::*; - /// - /// let mut v = vec![0usize; 5]; - /// v.par_iter_mut().enumerate().for_each(|(i, x)| *x = i); - /// assert_eq!(v, [0, 1, 2, 3, 4]); - /// ``` fn par_iter_mut(&'data mut self) -> Self::Iter; } @@ -612,53 +472,12 @@ where } /// `ParallelExtend` extends an existing collection with items from a [`ParallelIterator`]. -/// -/// [`ParallelIterator`]: trait.ParallelIterator.html -/// -/// # Examples -/// -/// Implementing `ParallelExtend` for your type: -/// -/// ``` -/// use rayon::prelude::*; -/// use std::mem; -/// -/// struct BlackHole { -/// mass: usize, -/// } -/// -/// impl ParallelExtend for BlackHole { -/// fn par_extend(&mut self, par_iter: I) -/// where I: IntoParallelIterator -/// { -/// let par_iter = par_iter.into_par_iter(); -/// self.mass += par_iter.count() * mem::size_of::(); -/// } -/// } -/// -/// let mut bh = BlackHole { mass: 0 }; -/// bh.par_extend(0i32..1000); -/// assert_eq!(bh.mass, 4000); -/// bh.par_extend(0i64..10); -/// assert_eq!(bh.mass, 4080); -/// ``` pub trait ParallelExtend where T: Send, { /// Extends an instance of the collection with the elements drawn /// from the parallel iterator `par_iter`. 
- /// - /// # Examples - /// - /// ``` - /// use rayon::prelude::*; - /// - /// let mut vec = vec![]; - /// vec.par_extend(0..5); - /// vec.par_extend((0..5).into_par_iter().map(|i| i * i)); - /// assert_eq!(vec, [0, 1, 2, 3, 4, 0, 1, 4, 9, 16]); - /// ``` fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator; @@ -681,7 +500,7 @@ impl Splitter { #[inline] fn new() -> Splitter { Splitter { - splits: compute_task_pool_thread_num() * 2, + splits: compute_task_pool_thread_num(), } } @@ -694,7 +513,6 @@ impl Splitter { self.splits /= 2; true } else { - // Not stolen, and no more splits -- we're done! false } } From 3ac0d70344369eb91ccd52da57d96c40c2b71229 Mon Sep 17 00:00:00 2001 From: re0312 Date: Mon, 22 Jan 2024 15:31:59 +0800 Subject: [PATCH 06/13] document --- crates/bevy_tasks/src/iter/collect.rs | 7 ---- crates/bevy_tasks/src/iter/mod.rs | 46 ++++++++++++++++++--------- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/crates/bevy_tasks/src/iter/collect.rs b/crates/bevy_tasks/src/iter/collect.rs index a9dc6c4f43ea7..7733c5e878fac 100644 --- a/crates/bevy_tasks/src/iter/collect.rs +++ b/crates/bevy_tasks/src/iter/collect.rs @@ -17,13 +17,6 @@ unsafe impl Send for SendPtr {} // SAFETY: !Sync for raw pointers is not for safety, just as a lint unsafe impl Sync for SendPtr {} -impl SendPtr { - // Helper to avoid disjoint captures of `send_ptr.0` - fn get(self) -> *mut T { - self.0 - } -} - // Implement Clone without the T: Clone bound from the derive impl Clone for SendPtr { fn clone(&self) -> Self { diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index 7ac7d38dee831..6476710ef09fc 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -93,7 +93,7 @@ where consumer.into_folder().complete() } else if splitter.try_split(len) { // TODO: optimize it - // Increasing the thread number may not necessarily enhance performance due to the split method. 
+ // Increasing thread number may not necessarily enhance performance due to the split method. // Additional benefits can only be realized when the number of threads reaches the next power of 2. // Rayon may split tasks into smaller slices in some cases, but Bevy's executor suffers from overhead // when spawning a large number of small tasks. @@ -116,8 +116,6 @@ where /// operation][fold]. It can be fed many items using the `consume` /// method. At the end, once all items have been consumed, it can then /// be converted (using `complete`) into a final value. -/// -/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold pub trait Folder: Sized { /// The type of result that will ultimately be produced by the folder. type Result; @@ -163,18 +161,6 @@ pub trait Folder: Sized { /// two producers, one producing the items before that point, and one /// producing the items after that point (these two producers can then /// independently be split further, or be converted into iterators). -/// In Rayon, this splitting is used to divide between threads. -/// See [the `plumbing` README][r] for further details. -/// -/// Note that each producer will always produce a fixed number of -/// items N. However, this number N is not queryable through the API; -/// the consumer is expected to track it. -/// -/// NB. You might expect `Producer` to extend the `IntoIterator` -/// trait. However, [rust-lang/rust#20671][20671] prevents us from -/// declaring the DoubleEndedIterator and ExactSizeIterator -/// constraints on a required IntoIterator trait, so we inline -/// IntoIterator here until that issue is fixed. pub trait Producer: Send + Sized { /// The type of item that will be produced by this producer once /// it is converted into an iterator. 
@@ -238,12 +224,24 @@ pub trait ProducerCallback { P: Producer; } +/// The reducer is the final step of a `Consumer` -- after a consumer +/// has been split into two parts, and each of those parts has been +/// fully processed, we are left with two results. The reducer is then +/// used to combine those two results into one. pub trait Reducer { /// Reduce two final results into one; this is executed after a /// split. fn reduce(self, left: Result, right: Result) -> Result; } +/// A consumer is effectively a [generalized "fold" operation][fold], +/// and in fact each consumer will eventually be converted into a +/// [`Folder`]. What makes a consumer special is that, like a +/// [`Producer`], it can be **split** into multiple consumers using +/// the `split_at` method. When a consumer is split, it produces two +/// consumers, as well as a **reducer**. The two consumers can be fed +/// items independently, and when they are done the reducer is used to +/// combine their two results into one. pub trait Consumer: Send + Sized { /// The type of folder that this consumer can be converted into. type Folder: Folder; @@ -289,6 +287,14 @@ pub trait UnindexedConsumer: Consumer { fn to_reducer(&self) -> Self::Reducer; } +/// Parallel version of the standard iterator trait. +/// +/// The combinators on this trait are available on **all** parallel +/// iterators. Additional methods can be found on the +/// [`IndexedParallelIterator`] trait: those methods are only +/// available for parallel iterators where the number of items is +/// known in advance (so, e.g., after invoking `filter`, those methods +/// become unavailable). pub trait ParallelIterator: Sized + Send { /// The type of item that this parallel iterator produces. /// For example, if you use the [`for_each`] method, this is the type of @@ -447,6 +453,16 @@ where } } +/// `IntoParallelRefMutIterator` implements the conversion to a +/// [`ParallelIterator`], providing mutable references to the data. 
+/// +/// This is a parallel version of the `iter_mut()` method +/// defined by various collections. +/// +/// This trait is automatically implemented +/// `for I where &mut I: IntoParallelIterator`. In most cases, users +/// will want to implement [`IntoParallelIterator`] rather than implement +/// this trait directly. pub trait IntoParallelRefMutIterator<'data> { /// The type of iterator that will be created. type Iter: ParallelIterator; From 872ea50fb78fcff2501a431cc53dc4a81408839c Mon Sep 17 00:00:00 2001 From: re0312 Date: Tue, 23 Jan 2024 16:29:04 +0800 Subject: [PATCH 07/13] cleanup --- crates/bevy_tasks/examples/busy_behavior.rs | 40 +++++++-------- crates/bevy_tasks/examples/parallel.rs | 30 +++++++++++ crates/bevy_tasks/examples/vec_example.rs | 55 --------------------- crates/bevy_tasks/src/iter/mod.rs | 6 +-- crates/bevy_tasks/src/usages.rs | 4 +- 5 files changed, 52 insertions(+), 83 deletions(-) create mode 100644 crates/bevy_tasks/examples/parallel.rs delete mode 100644 crates/bevy_tasks/examples/vec_example.rs diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs index 91e5d7bc22596..23a43de017a59 100644 --- a/crates/bevy_tasks/examples/busy_behavior.rs +++ b/crates/bevy_tasks/examples/busy_behavior.rs @@ -8,31 +8,27 @@ use web_time::{Duration, Instant}; fn main() { let pool = TaskPoolBuilder::new() .thread_name("Busy Behavior ThreadPool".to_string()) - .num_threads(5) + .num_threads(4) .build(); - println!("main {:?}", std::thread::current().id()); - const iter_count: usize = 1000; let t0 = Instant::now(); - let a: Vec<_> = (0..20000).collect(); - for _ in 0..iter_count { - pool.scope(|s| { - for i in 0..20 { - s.spawn(async move { - let now = Instant::now(); - while Instant::now() - now < Duration::from_micros(1000) { - // spin, simulating work being done - } + pool.scope(|s| { + for i in 0..40 { + s.spawn(async move { + let now = Instant::now(); + while Instant::now() - now < 
Duration::from_millis(100) { + // spin, simulating work being done + } - // println!( - // "Thread {:?} index {} finished", - // std::thread::current().id(), - // i - // ); - }); - } - }); - } + println!( + "Thread {:?} index {} finished", + std::thread::current().id(), + i + ); + }); + } + }); - println!(" par {:?} elapsed", t0.elapsed() / iter_count as u32); + let t1 = Instant::now(); + println!("all tasks finished in {} secs", (t1 - t0).as_secs_f32()); } diff --git a/crates/bevy_tasks/examples/parallel.rs b/crates/bevy_tasks/examples/parallel.rs new file mode 100644 index 0000000000000..5d0a15f984234 --- /dev/null +++ b/crates/bevy_tasks/examples/parallel.rs @@ -0,0 +1,30 @@ +use std::hint::black_box; + +use bevy_tasks::{ComputeTaskPool, IntoParallelRefIterator, ParallelIterator, TaskPoolBuilder}; +use web_time::{Duration, Instant}; + +pub fn heavy_compute(v: i32) -> i32 { + let now = Instant::now(); + while Instant::now() - now < Duration::from_micros(4) { + // spin, simulating work being done + } + v +} + +// 1,000,000 tasks that each spin for 4us. It's expected to take about one second to run (assuming the machine has >= 4 logical +// cores) +fn main() { + ComputeTaskPool::get_or_init(|| { + TaskPoolBuilder::default() + .num_threads(4) + .thread_name("Compute Task Pool".to_string()) + .build() + }); + + let a: Vec<_> = (0..1000000).collect(); + let t0 = Instant::now(); + a.par_iter().for_each(|v| { + black_box(heavy_compute(*v)); + }); + println!("foreach: {:?} elapsed", t0.elapsed()); +} diff --git a/crates/bevy_tasks/examples/vec_example.rs b/crates/bevy_tasks/examples/vec_example.rs deleted file mode 100644 index 226a33382c13e..0000000000000 --- a/crates/bevy_tasks/examples/vec_example.rs +++ /dev/null @@ -1,55 +0,0 @@ -use std::{hint::black_box, iter}; - -use bevy_tasks::{ - compute_task_pool_thread_num, ComputeTaskPool, FromParallelIterator, IntoParallelRefIterator, - ParallelIterator, TaskPool, TaskPoolBuilder, -}; -use web_time::{Duration, Instant}; - -pub
fn heavy_compute(v: i32) -> i32 { - let now = Instant::now(); - while Instant::now() - now < Duration::from_micros(1) { - // spin, simulating work being done - } - v -} -struct Test { - mass: usize, -} -impl FromParallelIterator for Test { - fn from_par_iter(par_iter: I) -> Self - where - I: bevy_tasks::IntoParallelIterator, - { - let par_iter = par_iter.into_par_iter(); - Test { mass: 1 } - } -} -fn main() { - ComputeTaskPool::get_or_init(|| { - TaskPoolBuilder::default() - .num_threads(10) - .thread_name("Compute Task Pool".to_string()) - .build() - }); - println!("main {:?}", std::thread::current().id()); - let a: Vec<_> = (0..20000).collect(); - let t0 = Instant::now(); - // for _ in 0..iter_count { - a.iter().for_each(|v| { - black_box(v); - black_box(heavy_compute(*v)); - }); - // } - println!(" iter {:?} elapsed", t0.elapsed()); - const iter_count: usize = 1000; - let t0 = Instant::now(); - for _ in 0..iter_count { - a.par_iter().for_each(|v| { - // println!("Thread {:?} finished", std::thread::current().id(),); - black_box(v); - black_box(heavy_compute(*v)); - }); - } - println!(" par {:?} elapsed", t0.elapsed() / iter_count as u32); -} diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index 6476710ef09fc..322f4892f1ffd 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -57,12 +57,10 @@ where let mut ra = None; let mut rb = None; pool.scope(|s| { - s.spawn(async { - ra = Some(op_a()); - }); s.spawn(async { rb = Some(op_b()); }); + ra = Some(op_a()); }); (ra.unwrap(), rb.unwrap()) } @@ -167,7 +165,7 @@ pub trait Producer: Send + Sized { type Item; /// The type of iterator we will become. - type IntoIter: Iterator + DoubleEndedIterator + ExactSizeIterator; + type IntoIter: Iterator; /// Convert `self` into an iterator; at this point, no more parallel splits /// are possible. 
diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index 3421ae67d9124..f60cd29a1b83f 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -74,11 +74,11 @@ taskpool! { (IO_TASK_POOL, IoTaskPool) } - +/// Returns the number of threads in Bevy's compute task pool pub fn compute_task_pool_thread_num() -> usize { ComputeTaskPool::try_get() .map(|p| p.thread_num()) - .unwrap_or(1) + .unwrap_or(0) } /// A function used by `bevy_core` to tick the global tasks pools on the main thread. From 927dfc39121707f912662e6eae13a172d15329af Mon Sep 17 00:00:00 2001 From: re0312 Date: Tue, 23 Jan 2024 16:30:01 +0800 Subject: [PATCH 08/13] remove --- crates/bevy_tasks/src/task_pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 0838d28d57325..d55a9476b0076 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -14,7 +14,7 @@ use futures_lite::FutureExt; use crate::{ block_on, thread_executor::{ThreadExecutor, ThreadExecutorTicker}, - ComputeTaskPool, Task, + Task, }; struct CallOnDrop(Option>); From f4c04542b4ef900560b375afc303a9370a677a08 Mon Sep 17 00:00:00 2001 From: re0312 Date: Tue, 23 Jan 2024 21:31:54 +0800 Subject: [PATCH 09/13] fmt --- crates/bevy_tasks/src/iter/for_each.rs | 2 +- crates/bevy_tasks/src/iter/from_par_iter.rs | 2 +- crates/bevy_tasks/src/iter/map.rs | 5 ++++- crates/bevy_tasks/src/iter/noop.rs | 1 - crates/bevy_tasks/src/slice.rs | 4 ++-- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/bevy_tasks/src/iter/for_each.rs b/crates/bevy_tasks/src/iter/for_each.rs index e577469cfdc28..21758819a1aa0 100644 --- a/crates/bevy_tasks/src/iter/for_each.rs +++ b/crates/bevy_tasks/src/iter/for_each.rs @@ -1,4 +1,4 @@ -use super::{noop::NoopReducer, Consumer, Folder, UnindexedConsumer, ParallelIterator}; +use super::{noop::NoopReducer, Consumer, Folder, ParallelIterator,
UnindexedConsumer}; pub(super) fn for_each(pi: I, op: &F) where diff --git a/crates/bevy_tasks/src/iter/from_par_iter.rs b/crates/bevy_tasks/src/iter/from_par_iter.rs index 70bf6b82ba746..5e5b79c2af31c 100644 --- a/crates/bevy_tasks/src/iter/from_par_iter.rs +++ b/crates/bevy_tasks/src/iter/from_par_iter.rs @@ -1,4 +1,4 @@ -use crate::iter::{IntoParallelIterator, FromParallelIterator, ParallelExtend}; +use crate::iter::{FromParallelIterator, IntoParallelIterator, ParallelExtend}; /// Creates an empty default collection and extends it. fn collect_extended(par_iter: I) -> C diff --git a/crates/bevy_tasks/src/iter/map.rs b/crates/bevy_tasks/src/iter/map.rs index be584eb4cec2c..2677b6358b4ef 100644 --- a/crates/bevy_tasks/src/iter/map.rs +++ b/crates/bevy_tasks/src/iter/map.rs @@ -1,6 +1,9 @@ use std::iter; -use crate::{ParallelIterator, UnindexedConsumer, IndexedParallelIterator, Consumer, ProducerCallback, Producer, Folder}; +use crate::{ + Consumer, Folder, IndexedParallelIterator, ParallelIterator, Producer, ProducerCallback, + UnindexedConsumer, +}; /// `Map` is an iterator that transforms the elements of an underlying iterator. 
/// diff --git a/crates/bevy_tasks/src/iter/noop.rs b/crates/bevy_tasks/src/iter/noop.rs index 23acf8d92c099..8241b4cef5d2b 100644 --- a/crates/bevy_tasks/src/iter/noop.rs +++ b/crates/bevy_tasks/src/iter/noop.rs @@ -1,6 +1,5 @@ use super::Reducer; - pub(super) struct NoopReducer; impl Reducer<()> for NoopReducer { diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs index cc54147830269..a145cd21ded69 100644 --- a/crates/bevy_tasks/src/slice.rs +++ b/crates/bevy_tasks/src/slice.rs @@ -1,6 +1,6 @@ use crate::iter::{ - bridge, Consumer, IndexedParallelIterator, IntoParallelIterator, Producer, ProducerCallback, - UnindexedConsumer, ParallelIterator, + bridge, Consumer, IndexedParallelIterator, IntoParallelIterator, ParallelIterator, Producer, + ProducerCallback, UnindexedConsumer, }; impl<'data, T: Sync + 'data> IntoParallelIterator for &'data [T] { From 81acf5dcb77ce40812966c3c6cfc02b78cbf04c7 Mon Sep 17 00:00:00 2001 From: re0312 Date: Wed, 24 Jan 2024 09:44:14 +0800 Subject: [PATCH 10/13] fix --- crates/bevy_tasks/src/iter/collect.rs | 8 ++++++-- crates/bevy_tasks/src/iter/for_each.rs | 2 +- crates/bevy_tasks/src/iter/map.rs | 3 --- crates/bevy_tasks/src/iter/mod.rs | 3 ++- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/crates/bevy_tasks/src/iter/collect.rs b/crates/bevy_tasks/src/iter/collect.rs index 7733c5e878fac..84073d39c4b91 100644 --- a/crates/bevy_tasks/src/iter/collect.rs +++ b/crates/bevy_tasks/src/iter/collect.rs @@ -2,7 +2,7 @@ use super::{Consumer, Folder, ParallelIterator, Reducer, UnindexedConsumer}; use std::{marker::PhantomData, ptr, slice}; /// We need to transmit raw pointers across threads. It is possible to do this -/// without any unsafe code by converting pointers to usize or to AtomicPtr +/// without any unsafe code by converting pointers to usize or to `AtomicPtr` /// then back to a raw pointer for use. We prefer this approach because code /// that uses this type is more explicit. 
/// @@ -115,6 +115,7 @@ impl<'c, T> Reducer> for CollectReducer { // Merge if the CollectResults are adjacent and in left to right order // else: drop the right piece now and total length will end up short in the end, // when the correctness of the collected result is asserted. + // SAFTY: left and right should be Continuous unsafe { let left_end = left.start.0.add(left.initialized_len); if left_end == right.start.0 { @@ -126,7 +127,7 @@ impl<'c, T> Reducer> for CollectReducer { } } -/// CollectResult represents an initialized part of the target slice. +/// `CollectResult` represents an initialized part of the target slice. /// /// This is a proxy owner of the elements in the slice; when it drops, /// the elements will be dropped, unless its ownership is released before then. @@ -158,12 +159,14 @@ impl<'c, T> CollectResult<'c, T> { } } +// SAFETY: CollectResult<'c,T> can be safely sent across threads as long as its generic type `T` is also `Send`. unsafe impl<'c, T> Send for CollectResult<'c, T> where T: Send {} impl<'c, T> Drop for CollectResult<'c, T> { fn drop(&mut self) { // Drop the first `self.initialized_len` elements, which have been recorded // to be initialized by the folder. 
+ // SAFETY: Caller assures that `release_ownership` has been called unsafe { ptr::drop_in_place(slice::from_raw_parts_mut( self.start.0, @@ -269,6 +272,7 @@ where let new_len = vec.len() + len; + // SAFETY: The assert checks `new_len` is valid unsafe { vec.set_len(new_len); } diff --git a/crates/bevy_tasks/src/iter/for_each.rs b/crates/bevy_tasks/src/iter/for_each.rs index 21758819a1aa0..2764c10155fd9 100644 --- a/crates/bevy_tasks/src/iter/for_each.rs +++ b/crates/bevy_tasks/src/iter/for_each.rs @@ -7,7 +7,7 @@ where T: Send, { let consumer = ForEachConsumer { op }; - pi.drive_unindexed(consumer) + pi.drive_unindexed(consumer); } struct ForEachConsumer<'f, F> { diff --git a/crates/bevy_tasks/src/iter/map.rs b/crates/bevy_tasks/src/iter/map.rs index 2677b6358b4ef..a1478587200f2 100644 --- a/crates/bevy_tasks/src/iter/map.rs +++ b/crates/bevy_tasks/src/iter/map.rs @@ -6,9 +6,6 @@ use crate::{ }; /// `Map` is an iterator that transforms the elements of an underlying iterator. -/// -/// This struct is created by the [`map()`] method on [`ParallelIterator`] - #[must_use = "iterator adaptors are lazy and do nothing unless consumed"] #[derive(Clone)] pub struct Map { diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index 322f4892f1ffd..b20b4a07a6db7 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -305,7 +305,7 @@ pub trait ParallelIterator: Sized + Send { where OP: Fn(Self::Item) + Sync + Send, { - for_each::for_each(self, &op) + for_each::for_each(self, &op); } /// Applies `map_op` to each item of this iterator, producing a new @@ -361,6 +361,7 @@ pub trait ParallelIterator: Sized + Send { pub trait IndexedParallelIterator: ParallelIterator { /// Produces an exact count of how many items this iterator will /// produce, presuming no panic occurs. 
+ #[allow(clippy::len_without_is_empty)] fn len(&self) -> usize; /// Internal method used to define the behavior of this parallel From b3d22574dc46289bc7e9a83d53b2b624e50e225e Mon Sep 17 00:00:00 2001 From: re0312 Date: Wed, 24 Jan 2024 09:49:38 +0800 Subject: [PATCH 11/13] doc --- crates/bevy_tasks/src/iter/mod.rs | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index b20b4a07a6db7..253e5e3ffbe26 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -15,10 +15,6 @@ mod noop; /// consumer. It will convert the `par_iter` into a producer P and /// then pull items from P and feed them to `consumer`, splitting and /// creating parallel threads as needed. -/// -/// This is useful when you are implementing your own parallel -/// iterators: it is often used as the definition of the -/// [`drive_unindexed`] or [`drive`] methods. pub fn bridge(par_iter: I, consumer: C) -> C::Result where I: IndexedParallelIterator, @@ -70,10 +66,6 @@ where /// function. This function will draw items from `producer` and feed /// them to `consumer`, splitting and creating parallel tasks when /// needed. -/// -/// This is useful when you are implementing your own parallel -/// iterators: it is often used as the definition of the -/// [`drive_unindexed`] or [`drive`] methods. pub fn bridge_producer_consumer(len: usize, producer: P, consumer: C) -> C::Result where P: Producer, @@ -114,6 +106,8 @@ where /// operation][fold]. It can be fed many items using the `consume` /// method. At the end, once all items have been consumed, it can then /// be converted (using `complete`) into a final value. +/// +/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold pub trait Folder: Sized { /// The type of result that will ultimately be produced by the folder. 
type Result; @@ -173,18 +167,14 @@ pub trait Producer: Send + Sized { /// The minimum number of items that we will process /// sequentially. Defaults to 1, which means that we will split - /// all the way down to a single item. This can be raised higher - /// using the [`with_min_len`] method, which will force us to - /// create sequential tasks at a larger granularity. + /// all the way down to a single item. fn min_len(&self) -> usize { 1 } /// The maximum number of items that we will process /// sequentially. Defaults to MAX, which means that we can choose - /// not to split at all. This can be lowered using the - /// [`with_max_len`] method, which will force us to create more - /// parallel tasks. + /// not to split at all. fn max_len(&self) -> usize { usize::MAX } @@ -240,6 +230,8 @@ pub trait Reducer { /// consumers, as well as a **reducer**. The two consumers can be fed /// items independently, and when they are done the reducer is used to /// combine their two results into one. +/// +/// [fold]: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.fold pub trait Consumer: Send + Sized { /// The type of folder that this consumer can be converted into. type Folder: Folder; @@ -393,7 +385,7 @@ pub trait IndexedParallelIterator: ParallelIterator { /// `FromParallelIterator` for a given type, you define how it will be /// created from an iterator. /// -/// `FromParallelIterator` is used through [`ParallelIterator`]'s [`collect()`] method. +/// `FromParallelIterator` is used through [`ParallelIterator`]'s [`mod@collect`] method. 
pub trait FromParallelIterator where T: Send, From 07e7307f38b41ae2e07cf2fd297dc7771d19d2da Mon Sep 17 00:00:00 2001 From: re0312 Date: Wed, 24 Jan 2024 09:53:55 +0800 Subject: [PATCH 12/13] fmt --- crates/bevy_tasks/src/iter/collect.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/iter/collect.rs b/crates/bevy_tasks/src/iter/collect.rs index 84073d39c4b91..ad661d614c182 100644 --- a/crates/bevy_tasks/src/iter/collect.rs +++ b/crates/bevy_tasks/src/iter/collect.rs @@ -272,7 +272,7 @@ where let new_len = vec.len() + len; - // SAFETY: The assert checks `new_len` is valid + // SAFETY: The assert checks `new_len` is valid unsafe { vec.set_len(new_len); } From eccf23a2463d6b052b6395d41ce6b4d5911e0eed Mon Sep 17 00:00:00 2001 From: re0312 Date: Wed, 24 Jan 2024 10:04:41 +0800 Subject: [PATCH 13/13] fix cli --- crates/bevy_tasks/src/iter/collect.rs | 4 ++-- crates/bevy_tasks/src/iter/mod.rs | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/bevy_tasks/src/iter/collect.rs b/crates/bevy_tasks/src/iter/collect.rs index ad661d614c182..e0689519704a9 100644 --- a/crates/bevy_tasks/src/iter/collect.rs +++ b/crates/bevy_tasks/src/iter/collect.rs @@ -2,7 +2,7 @@ use super::{Consumer, Folder, ParallelIterator, Reducer, UnindexedConsumer}; use std::{marker::PhantomData, ptr, slice}; /// We need to transmit raw pointers across threads. It is possible to do this -/// without any unsafe code by converting pointers to usize or to `AtomicPtr` +/// without any unsafe code by converting pointers to usize or to `AtomicPtr` /// then back to a raw pointer for use. We prefer this approach because code /// that uses this type is more explicit. /// @@ -115,7 +115,7 @@ impl<'c, T> Reducer> for CollectReducer { // Merge if the CollectResults are adjacent and in left to right order // else: drop the right piece now and total length will end up short in the end, // when the correctness of the collected result is asserted. 
- // SAFTY: left and right should be Continuous + // SAFETY: left and right should be Continuous unsafe { let left_end = left.start.0.add(left.initialized_len); if left_end == right.start.0 { diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index 253e5e3ffbe26..cabee9fd51c66 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -42,7 +42,7 @@ where } } -/// ToDO:optimize it +/// TODO: optimize it fn join(pool: &TaskPool, op_a: A, op_b: B) -> (RA, RB) where A: FnOnce() -> RA + Send, @@ -265,7 +265,7 @@ pub trait Consumer: Send + Sized { pub trait UnindexedConsumer: Consumer { /// Splits off a "left" consumer and returns it. The `self` /// consumer should then be used to consume the "right" portion of - /// the data. (The ordering matters for methods like find_first -- + /// the data. (The ordering matters for methods like `find_first` -- /// values produced by the returned value are given precedence /// over values produced by `self`.) Once the left and right /// halves have been fully consumed, you should reduce the results @@ -350,10 +350,10 @@ pub trait ParallelIterator: Sized + Send { /// /// **Note:** Not implemented for `u64`, `i64`, `u128`, or `i128` ranges // Waiting for `ExactSizeIterator::is_empty` to be stabilized. See rust-lang/rust#35428 +#[allow(clippy::len_without_is_empty)] pub trait IndexedParallelIterator: ParallelIterator { /// Produces an exact count of how many items this iterator will /// produce, presuming no panic occurs. - #[allow(clippy::len_without_is_empty)] fn len(&self) -> usize; /// Internal method used to define the behavior of this parallel @@ -385,7 +385,7 @@ pub trait IndexedParallelIterator: ParallelIterator { /// `FromParallelIterator` for a given type, you define how it will be /// created from an iterator. /// -/// `FromParallelIterator` is used through [`ParallelIterator`]'s [`mod@collect`] method. 
+/// `FromParallelIterator` is used through [`ParallelIterator`]'s `collect()` method. pub trait FromParallelIterator where T: Send,