From 14113a349884f65cff8e129a343681433c1eb589 Mon Sep 17 00:00:00 2001 From: Benjamin Woodruff Date: Thu, 15 Aug 2024 13:54:53 -0700 Subject: [PATCH] Split local and global task state --- turbopack/crates/turbo-tasks/src/manager.rs | 263 ++++++++++---------- 1 file changed, 137 insertions(+), 126 deletions(-) diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index 18aa99080789c..15de2eaa84b52 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -309,18 +309,17 @@ pub struct TurboTasks { program_start: Instant, } -struct CurrentTaskState { +/// Information about a "global" task. A global task can contain multiple "local" tasks (see +/// [`CurrentLocalTaskState`]), which all share the same global state. +/// +/// A global task is one that: +/// +/// - Has a unique task id. +/// - Is potentially cached. +/// - The backend is aware of. +struct CurrentGlobalTaskState { task_id: TaskId, - /// A unique identifier created for each unique `CurrentTaskState`. Used to - /// check that [`CurrentTaskState::local_cells`] are valid for the current - /// `RawVc::LocalCell`. - execution_id: ExecutionId, - - /// The function's metadata if this is a persistent task. Contains information about arguments - /// passed to the `#[turbo_tasks::function(...)]` macro. - function_meta: Option<&'static FunctionMeta>, - /// Affected tasks, that are tracked during task execution. These tasks will /// be invalidated when the execution finishes or before reading a cell /// value. 
@@ -344,16 +343,10 @@ struct CurrentTaskState { local_task_tracker: TaskTracker, } -impl CurrentTaskState { - fn new( - task_id: TaskId, - execution_id: ExecutionId, - function_meta: Option<&'static FunctionMeta>, - ) -> Self { +impl CurrentGlobalTaskState { + fn new(task_id: TaskId) -> Self { Self { task_id, - execution_id, - function_meta, tasks_to_notify: Vec::new(), stateful: false, cell_counters: Some(AutoMap::default()), @@ -363,12 +356,38 @@ impl CurrentTaskState { } } +/// Information specific to the current "local" task. A local task re-uses its parent global task's +/// [`CurrentGlobalTaskState`]. +/// +/// Even if a task itself isn't local, it will have a `CurrentLocalTaskState` representing the root +/// of the global task. +#[derive(Clone)] +struct CurrentLocalTaskState { + /// A unique identifier created for each unique [`CurrentLocalTaskState`]. Used to check that + /// [`CurrentGlobalTaskState::local_cells`] are valid for the current [`RawVc::LocalCell`]. + execution_id: ExecutionId, + + /// The function's metadata if this is a persistent task. Contains information about arguments + /// passed to the `#[turbo_tasks::function(...)]` macro. + function_meta: Option<&'static FunctionMeta>, +} + +impl CurrentLocalTaskState { + fn new(execution_id: ExecutionId, function_meta: Option<&'static FunctionMeta>) -> Self { + Self { + execution_id, + function_meta, + } + } +} + // TODO implement our own thread pool and make these thread locals instead task_local! 
{ /// The current TurboTasks instance static TURBO_TASKS: Arc; - static CURRENT_TASK_STATE: Arc>; + static CURRENT_GLOBAL_TASK_STATE: Arc>; + static CURRENT_LOCAL_TASK_STATE: CurrentLocalTaskState; } impl TurboTasks { @@ -670,61 +689,65 @@ impl TurboTasks { let future = async move { let mut schedule_again = true; while schedule_again { - let task_state = Arc::new(RwLock::new(CurrentTaskState::new( - task_id, + let global_task_state = Arc::new(RwLock::new(CurrentGlobalTaskState::new(task_id))); + let local_task_state = CurrentLocalTaskState::new( this.execution_id_factory.get(), this.backend .try_get_function_id(task_id) .map(|func_id| &get_function(func_id).function_meta), - ))); - schedule_again = CURRENT_TASK_STATE - .scope(task_state, async { - if this.stopped.load(Ordering::Acquire) { - return false; - } + ); + let single_execution_future = async { + if this.stopped.load(Ordering::Acquire) { + return false; + } - let Some(TaskExecutionSpec { future, span }) = - this.backend.try_start_task_execution(task_id, &*this) - else { - return false; - }; - - async { - let (result, duration, memory_usage) = - CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()).await; - - // wait for all spawned local tasks using `local_cells` to finish - let ltt = CURRENT_TASK_STATE - .with(|ts| ts.read().unwrap().local_task_tracker.clone()); - ltt.close(); - ltt.wait().await; - - let result = result.map_err(|any| match any.downcast::() { - Ok(owned) => Some(Cow::Owned(*owned)), - Err(any) => match any.downcast::<&'static str>() { - Ok(str) => Some(Cow::Borrowed(*str)), - Err(_) => None, - }, - }); - this.backend.task_execution_result(task_id, result, &*this); - let stateful = this.finish_current_task_state(); - let cell_counters = CURRENT_TASK_STATE - .with(|ts| ts.write().unwrap().cell_counters.take().unwrap()); - let schedule_again = this.backend.task_execution_completed( - task_id, - duration, - memory_usage, - &cell_counters, - stateful, - &*this, - ); - // 
task_execution_completed might need to notify tasks - this.notify_scheduled_tasks(); - schedule_again - } - .instrument(span) - .await - }) + let Some(TaskExecutionSpec { future, span }) = + this.backend.try_start_task_execution(task_id, &*this) + else { + return false; + }; + + async { + let (result, duration, memory_usage) = + CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()).await; + + // wait for all spawned local tasks using `local_cells` to finish + let ltt = CURRENT_GLOBAL_TASK_STATE + .with(|ts| ts.read().unwrap().local_task_tracker.clone()); + ltt.close(); + ltt.wait().await; + + let result = result.map_err(|any| match any.downcast::() { + Ok(owned) => Some(Cow::Owned(*owned)), + Err(any) => match any.downcast::<&'static str>() { + Ok(str) => Some(Cow::Borrowed(*str)), + Err(_) => None, + }, + }); + this.backend.task_execution_result(task_id, result, &*this); + let stateful = this.finish_current_task_state(); + let cell_counters = CURRENT_GLOBAL_TASK_STATE + .with(|ts| ts.write().unwrap().cell_counters.take().unwrap()); + let schedule_again = this.backend.task_execution_completed( + task_id, + duration, + memory_usage, + &cell_counters, + stateful, + &*this, + ); + // task_execution_completed might need to notify tasks + this.notify_scheduled_tasks(); + schedule_again + } + .instrument(span) + .await + }; + schedule_again = CURRENT_GLOBAL_TASK_STATE + .scope( + global_task_state, + CURRENT_LOCAL_TASK_STATE.scope(local_task_state, single_execution_future), + ) .await; } this.finish_primary_job(); @@ -1037,8 +1060,8 @@ impl TurboTasks { } fn finish_current_task_state(&self) -> bool { - let (stateful, tasks) = CURRENT_TASK_STATE.with(|cell| { - let CurrentTaskState { + let (stateful, tasks) = CURRENT_GLOBAL_TASK_STATE.with(|cell| { + let CurrentGlobalTaskState { tasks_to_notify, stateful, .. 
@@ -1165,9 +1188,9 @@ impl TurboTasksApi for TurboTasks { } fn notify_scheduled_tasks(&self) { - let _ = CURRENT_TASK_STATE.try_with(|cell| { + let _ = CURRENT_GLOBAL_TASK_STATE.try_with(|cell| { let tasks = { - let CurrentTaskState { + let CurrentGlobalTaskState { tasks_to_notify, .. } = &mut *cell.write().unwrap(); take(tasks_to_notify) @@ -1291,20 +1314,24 @@ impl TurboTasksApi for TurboTasks { ) -> Pin> + Send + 'static>> { // this is similar to what happens for a local task, except that we keep the local task's // state as well. - let task_state = CURRENT_TASK_STATE.with(|ts| ts.clone()); + let global_task_state = CURRENT_GLOBAL_TASK_STATE.with(|ts| ts.clone()); + let local_task_state = CURRENT_LOCAL_TASK_STATE.with(|ts| ts.clone()); let (task_id, fut) = { - let ts = task_state.read().unwrap(); + let ts = global_task_state.read().unwrap(); (ts.task_id, ts.local_task_tracker.track_future(fut)) }; Box::pin(TURBO_TASKS.scope( turbo_tasks(), - CURRENT_TASK_STATE.scope( - task_state, - // TODO(bgw): This will create a new task-local in the backend, which is not - // what we want. Instead we should replace `execution_scope` with a more - // limited API that allows storing thread-local state in a way the manager can - // control. - self.backend.execution_scope(task_id, fut), + CURRENT_GLOBAL_TASK_STATE.scope( + global_task_state, + CURRENT_LOCAL_TASK_STATE.scope( + local_task_state, + // TODO(bgw): This will create a new task-local in the backend, which is not + // what we want. Instead we should replace `execution_scope` with a more + // limited API that allows storing thread-local state in a way the manager can + // control. + self.backend.execution_scope(task_id, fut), + ), ), )) } @@ -1370,8 +1397,8 @@ impl TurboTasksBackendApi for TurboTasks { /// Enqueues tasks for notification of changed dependencies. This will /// eventually call `dependent_cell_updated()` on all tasks. 
fn schedule_notify_tasks(&self, tasks: &[TaskId]) { - let result = CURRENT_TASK_STATE.try_with(|cell| { - let CurrentTaskState { + let result = CURRENT_GLOBAL_TASK_STATE.try_with(|cell| { + let CurrentGlobalTaskState { tasks_to_notify, .. } = &mut *cell.write().unwrap(); tasks_to_notify.extend(tasks.iter()); @@ -1385,8 +1412,8 @@ impl TurboTasksBackendApi for TurboTasks { /// Enqueues tasks for notification of changed dependencies. This will /// eventually call `dependent_cell_updated()` on all tasks. fn schedule_notify_tasks_set(&self, tasks: &TaskIdSet) { - let result = CURRENT_TASK_STATE.try_with(|cell| { - let CurrentTaskState { + let result = CURRENT_GLOBAL_TASK_STATE.try_with(|cell| { + let CurrentGlobalTaskState { tasks_to_notify, .. } = &mut *cell.write().unwrap(); tasks_to_notify.extend(tasks.iter()); @@ -1426,7 +1453,7 @@ impl TurboTasksBackendApi for TurboTasks { } pub(crate) fn current_task(from: &str) -> TaskId { - match CURRENT_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) { + match CURRENT_GLOBAL_TASK_STATE.try_with(|ts| ts.read().unwrap().task_id) { Ok(id) => id, Err(_) => panic!( "{} can only be used in the context of turbo_tasks task execution", @@ -1638,13 +1665,9 @@ pub fn with_turbo_tasks_for_testing( ) -> impl Future { TURBO_TASKS.scope( tt, - CURRENT_TASK_STATE.scope( - Arc::new(RwLock::new(CurrentTaskState::new( - current_task, - execution_id, - None, - ))), - f, + CURRENT_GLOBAL_TASK_STATE.scope( + Arc::new(RwLock::new(CurrentGlobalTaskState::new(current_task))), + CURRENT_LOCAL_TASK_STATE.scope(CurrentLocalTaskState::new(execution_id, None), f), ), ) } @@ -1658,7 +1681,7 @@ pub fn spawn_detached_for_testing(f: impl Future> + Send + ' } pub fn current_task_for_testing() -> TaskId { - CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().task_id) + CURRENT_GLOBAL_TASK_STATE.with(|ts| ts.read().unwrap().task_id) } /// Get an [`Invalidator`] that can be used to invalidate the current task @@ -1683,8 +1706,8 @@ pub fn mark_finished() { /// 
Marks the current task as stateful. This prevents the tasks from being /// dropped without persisting the state. pub fn mark_stateful() { - CURRENT_TASK_STATE.with(|cell| { - let CurrentTaskState { stateful, .. } = &mut *cell.write().unwrap(); + CURRENT_GLOBAL_TASK_STATE.with(|cell| { + let CurrentGlobalTaskState { stateful, .. } = &mut *cell.write().unwrap(); *stateful = true; }) } @@ -1940,7 +1963,7 @@ impl From for RawVc { } pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef { - CURRENT_TASK_STATE.with(|ts| { + CURRENT_GLOBAL_TASK_STATE.with(|ts| { let current_task = current_task("celling turbo_tasks values"); let mut ts = ts.write().unwrap(); let map = ts.cell_counters.as_mut().unwrap(); @@ -1955,20 +1978,16 @@ pub fn find_cell_by_type(ty: ValueTypeId) -> CurrentCellRef { } pub(crate) fn try_get_function_meta() -> Option<&'static FunctionMeta> { - CURRENT_TASK_STATE.with(|ts| ts.read().unwrap().function_meta) + CURRENT_LOCAL_TASK_STATE.with(|ts| ts.function_meta) } pub(crate) fn create_local_cell(value: TypedSharedReference) -> (ExecutionId, LocalCellId) { - let (execution_id, raw_local_cell_id) = CURRENT_TASK_STATE.with(|ts| { - let CurrentTaskState { - execution_id, - local_cells, - .. - } = &mut *ts.write().unwrap(); - + let execution_id = CURRENT_LOCAL_TASK_STATE.with(|ts| ts.execution_id); + let raw_local_cell_id = CURRENT_GLOBAL_TASK_STATE.with(|ts| { + let CurrentGlobalTaskState { local_cells, .. } = &mut *ts.write().unwrap(); // store in the task-local arena local_cells.push(value); - (*execution_id, local_cells.len()) + local_cells.len() }); // generate a one-indexed id let local_cell_id = if cfg!(debug_assertions) { @@ -1992,13 +2011,9 @@ pub(crate) fn read_local_cell( execution_id: ExecutionId, local_cell_id: LocalCellId, ) -> TypedSharedReference { - CURRENT_TASK_STATE.with(|ts| { - let CurrentTaskState { - execution_id: expected_execution_id, - local_cells, - .. 
- } = &*ts.write().unwrap(); - assert_eq_local_cell(execution_id, *expected_execution_id); + assert_execution_id(execution_id); + CURRENT_GLOBAL_TASK_STATE.with(|ts| { + let CurrentGlobalTaskState { local_cells, .. } = &*ts.write().unwrap(); // local cell ids are one-indexed (they use NonZeroU32) local_cells[(*local_cell_id as usize) - 1].clone() }) @@ -2007,19 +2022,15 @@ pub(crate) fn read_local_cell( /// Panics if the [`ExecutionId`] does not match the current task's /// `execution_id`. pub(crate) fn assert_execution_id(execution_id: ExecutionId) { - CURRENT_TASK_STATE.with(|ts| { - let CurrentTaskState { + CURRENT_LOCAL_TASK_STATE.with(|ts| { + let CurrentLocalTaskState { execution_id: expected_execution_id, .. - } = &*ts.read().unwrap(); - assert_eq_local_cell(execution_id, *expected_execution_id); + } = ts; + assert_eq!( + &execution_id, expected_execution_id, + "This Vc is local. Local Vcs must only be accessed within their own task. Resolve the \ + Vc to convert it into a non-local version." + ); }) } - -fn assert_eq_local_cell(actual: ExecutionId, expected: ExecutionId) { - assert_eq!( - actual, expected, - "This Vc is local. Local Vcs must only be accessed within their own task. Resolve the Vc \ - to convert it into a non-local version." - ); -}