Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Span parenting so that system spans are correctly linked in multithreaded executor #2423

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions crates/bevy_app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub struct App {
pub world: World,
pub runner: Box<dyn Fn(App)>,
pub schedule: Schedule,
#[cfg(feature = "trace")]
frame_count: u32,
}

impl Default for App {
Expand Down Expand Up @@ -71,14 +73,21 @@ impl App {
world: Default::default(),
schedule: Default::default(),
runner: Box::new(run_once),
#[cfg(feature = "trace")]
frame_count: 0,
}
}

pub fn update(&mut self) {
#[cfg(feature = "trace")]
let bevy_frame_update_span = info_span!("frame");
#[cfg(feature = "trace")]
let _bevy_frame_update_guard = bevy_frame_update_span.enter();
{
self.frame_count = self.frame_count.wrapping_add(1);
let bevy_frame_update_span = info_span!("frame", frame_count = self.frame_count);
let _bevy_frame_update_guard = bevy_frame_update_span.enter();
self.schedule
.run_in_span(&mut self.world, Some(&bevy_frame_update_span));
}
#[cfg(not(feature = "trace"))]
self.schedule.run(&mut self.world);
}

Expand Down
32 changes: 29 additions & 3 deletions crates/bevy_ecs/src/schedule/executor.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
use crate::{archetype::ArchetypeGeneration, schedule::ParallelSystemContainer, world::World};
use bevy_utils::tracing::Span;
use downcast_rs::{impl_downcast, Downcast};

pub trait ParallelSystemExecutor: Downcast + Send + Sync {
/// Called by `SystemStage` whenever `systems` have been changed.
fn rebuild_cached_data(&mut self, systems: &[ParallelSystemContainer]);

fn run_systems(&mut self, systems: &mut [ParallelSystemContainer], world: &mut World);
fn run_systems_in_span(
&mut self,
systems: &mut [ParallelSystemContainer],
world: &mut World,
span: Option<&Span>,
);

fn run_systems(&mut self, systems: &mut [ParallelSystemContainer], world: &mut World) {
self.run_systems_in_span(systems, world, None);
}
}

impl_downcast!(ParallelSystemExecutor);
Expand All @@ -25,13 +35,29 @@ impl Default for SingleThreadedExecutor {
impl ParallelSystemExecutor for SingleThreadedExecutor {
fn rebuild_cached_data(&mut self, _: &[ParallelSystemContainer]) {}

fn run_systems(&mut self, systems: &mut [ParallelSystemContainer], world: &mut World) {
#[allow(unused)]
fn run_systems_in_span(
&mut self,
systems: &mut [ParallelSystemContainer],
world: &mut World,
span: Option<&Span>,
) {
self.update_archetypes(systems, world);

for system in systems {
if system.should_run() {
#[cfg(feature = "trace")]
let system_span = bevy_utils::tracing::info_span!("system", name = &*system.name());
let system_span = {
if let Some(span) = span {
bevy_utils::tracing::info_span!(
parent: span,
"system",
name = &*system.name()
)
} else {
bevy_utils::tracing::info_span!("system", name = &*system.name())
}
};
#[cfg(feature = "trace")]
let _system_guard = system_span.enter();
system.system_mut().run((), world);
Expand Down
25 changes: 21 additions & 4 deletions crates/bevy_ecs/src/schedule/executor_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
};
use async_channel::{Receiver, Sender};
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool};
use bevy_utils::tracing::Span;
use fixedbitset::FixedBitSet;

#[cfg(test)]
Expand Down Expand Up @@ -104,7 +105,12 @@ impl ParallelSystemExecutor for ParallelExecutor {
}
}

fn run_systems(&mut self, systems: &mut [ParallelSystemContainer], world: &mut World) {
fn run_systems_in_span(
&mut self,
systems: &mut [ParallelSystemContainer],
world: &mut World,
span: Option<&Span>,
) {
#[cfg(test)]
if self.events_sender.is_none() {
let (sender, receiver) = async_channel::unbounded::<SchedulingEvent>();
Expand All @@ -118,7 +124,7 @@ impl ParallelSystemExecutor for ParallelExecutor {
.get_resource_or_insert_with(|| ComputeTaskPool(TaskPool::default()))
.clone();
compute_pool.scope(|scope| {
self.prepare_systems(scope, systems, world);
self.prepare_systems(scope, systems, world, span);
scope.spawn(async {
// All systems have been ran if there are no queued or running systems.
while 0 != self.queued.count_ones(..) + self.running.count_ones(..) {
Expand Down Expand Up @@ -168,11 +174,13 @@ impl ParallelExecutor {

/// Populates `should_run` bitset, spawns tasks for systems that should run this iteration,
/// queues systems with no dependencies to run (or skip) at next opportunity.
#[allow(unused)]
fn prepare_systems<'scope>(
&mut self,
scope: &mut Scope<'scope, ()>,
systems: &'scope [ParallelSystemContainer],
world: &'scope World,
span: Option<&'scope Span>,
) {
self.should_run.clear();
for (index, system_data) in self.system_metadata.iter_mut().enumerate() {
Expand All @@ -188,8 +196,17 @@ impl ParallelExecutor {
.await
.unwrap_or_else(|error| unreachable!(error));
#[cfg(feature = "trace")]
let system_span =
bevy_utils::tracing::info_span!("system", name = &*system.name());
let system_span = {
if let Some(span) = span {
bevy_utils::tracing::info_span!(
parent: span,
"system",
name = &*system.name()
)
} else {
bevy_utils::tracing::info_span!("system", name = &*system.name())
}
};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can see this is an exact copy of the SingleThreadedExecutor's code to calculate this - it might be worth factoring it out?

#[cfg(feature = "trace")]
let system_guard = system_span.enter();
unsafe { system.run_unsafe((), world) };
Expand Down
33 changes: 24 additions & 9 deletions crates/bevy_ecs/src/schedule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use system_set::*;
use std::fmt::Debug;

use crate::{system::System, world::World};
use bevy_utils::HashMap;
use bevy_utils::{tracing::Span, HashMap};

#[derive(Default)]
pub struct Schedule {
Expand Down Expand Up @@ -195,13 +195,28 @@ impl Schedule {
}

pub fn run_once(&mut self, world: &mut World) {
self.run_once_in_span(world, None);
}

#[allow(unused)]
pub fn run_once_in_span(&mut self, world: &mut World, span: Option<&Span>) {
for label in self.stage_order.iter() {
#[cfg(feature = "trace")]
let stage_span =
bevy_utils::tracing::info_span!("stage", name = &format!("{:?}", label) as &str);
#[cfg(feature = "trace")]
let _stage_guard = stage_span.enter();
let stage = self.stages.get_mut(label).unwrap();
#[cfg(feature = "trace")]
{
let stage_span = if let Some(span) = span {
bevy_utils::tracing::info_span!(
parent: span,
"stage",
name = &format!("{:?}", label) as &str
)
} else {
bevy_utils::tracing::info_span!("stage", name = &format!("{:?}", label) as &str)
};
let _stage_guard = stage_span.enter();
stage.run_in_span(world, Some(&stage_span));
}
#[cfg(not(feature = "trace"))]
stage.run(world);
}
}
Expand All @@ -215,16 +230,16 @@ impl Schedule {
}

impl Stage for Schedule {
fn run(&mut self, world: &mut World) {
fn run_in_span(&mut self, world: &mut World, span: Option<&Span>) {
loop {
match self.run_criteria.should_run(world) {
ShouldRun::No => return,
ShouldRun::Yes => {
self.run_once(world);
self.run_once_in_span(world, span);
return;
}
ShouldRun::YesAndCheckAgain => {
self.run_once(world);
self.run_once_in_span(world, span);
}
ShouldRun::NoAndCheckAgain => {
panic!("`NoAndCheckAgain` would loop infinitely in this situation.")
Expand Down
16 changes: 12 additions & 4 deletions crates/bevy_ecs/src/schedule/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use crate::{
},
world::{World, WorldId},
};
use bevy_utils::{tracing::info, HashMap, HashSet};
use bevy_utils::{
tracing::{info, Span},
HashMap, HashSet,
};
use downcast_rs::{impl_downcast, Downcast};
use fixedbitset::FixedBitSet;
use std::fmt::Debug;
Expand All @@ -21,7 +24,11 @@ use super::IntoSystemDescriptor;
pub trait Stage: Downcast + Send + Sync {
/// Runs the stage; this happens once per update.
/// Implementors must initialize all of their state and systems before running the first time.
fn run(&mut self, world: &mut World);
fn run(&mut self, world: &mut World) {
self.run_in_span(world, None);
}

fn run_in_span(&mut self, world: &mut World, span: Option<&Span>);
}

impl_downcast!(Stage);
Expand Down Expand Up @@ -738,7 +745,7 @@ fn find_ambiguities(systems: &[impl SystemContainer]) -> Vec<(usize, usize, Vec<
}

impl Stage for SystemStage {
fn run(&mut self, world: &mut World) {
fn run_in_span(&mut self, world: &mut World, span: Option<&Span>) {
if let Some(world_id) = self.world_id {
assert!(
world.id() == world_id,
Expand Down Expand Up @@ -821,7 +828,8 @@ impl Stage for SystemStage {
container.should_run =
should_run(container, &self.run_criteria, default_should_run);
}
self.executor.run_systems(&mut self.parallel, world);
self.executor
.run_systems_in_span(&mut self.parallel, world, span);

// Run systems that want to be between parallel systems and their command buffers.
for container in &mut self.exclusive_before_commands {
Expand Down