From 15950e6afa0cd0ea59aaf45b3aa27217f3495cd4 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 15 Aug 2024 11:28:30 +0200 Subject: [PATCH] remove exceeding cells --- .../turbo-tasks-backend/src/backend/mod.rs | 109 +++++++++++++++-- .../backend/operation/cleanup_old_edges.rs | 5 + .../src/backend/operation/invalidate.rs | 112 +++++++++--------- .../src/backend/storage.rs | 9 ++ .../crates/turbo-tasks-backend/src/data.rs | 57 ++++++++- 5 files changed, 228 insertions(+), 64 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 3f296e1f07af5..7e843900d6b64 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -16,7 +16,7 @@ use std::{ time::{Duration, Instant}, }; -use anyhow::Result; +use anyhow::{bail, Result}; use auto_hash_map::{AutoMap, AutoSet}; use dashmap::DashMap; pub use operation::AnyOperation; @@ -41,9 +41,9 @@ use crate::{ backing_storage::BackingStorage, data::{ CachedDataItem, CachedDataItemKey, CachedDataItemValue, CachedDataUpdate, CellRef, - InProgressState, OutputValue, RootType, + InProgressCellState, InProgressState, OutputValue, RootType, }, - get, remove, + get, get_many, remove, utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc}, }; @@ -261,7 +261,14 @@ impl TurboTasksBackend { } } - todo!("Output of is not available, recompute task: {task:#?}"); + // Output doesn't exist. We need to schedule the task to compute it. + let dirty = task.has_key(&CachedDataItemKey::Dirty {}); + let (item, listener) = CachedDataItem::new_scheduled_with_listener(task_id, !dirty); + if task.add(item) { + turbo_tasks.schedule(task_id); + } + + Ok(Err(listener)) } fn try_read_task_cell( @@ -300,7 +307,45 @@ impl TurboTasksBackend { ))); } - todo!("Cell {cell:?} is not available, recompute task or error: {task:#?}"); + // Check cell index range (cell might not exist at all) + let Some(max_id) = get!( + task, + CellTypeMaxIndex { + cell_type: cell.type_id + } + ) else { + bail!( + "Cell {cell:?} no longer exists in task {task_id:?} (no cell of this type exists)" + ); + }; + if cell.index > *max_id { + bail!("Cell {cell:?} no longer exists in task {task_id:?} (index out of bounds)"); + } + + // Cell should exist, but data was dropped or is not serializable. We need to recompute the + // task the get the cell content. + + // Register event listener for cell computation + if let Some(in_progress) = get!(task, InProgressCell { cell }) { + // Someone else is already computing the cell + let listener = in_progress.event.listen(); + return Ok(Err(listener)); + } + + // We create the event and potentially schedule the task + let in_progress = InProgressCellState::new(task_id, cell); + let listener = in_progress.event.listen(); + task.add(CachedDataItem::InProgressCell { + cell, + value: in_progress, + }); + + // Schedule the task + if task.add(CachedDataItem::new_scheduled(task_id)) { + turbo_tasks.schedule(task_id); + } + + Ok(Err(listener)) } fn lookup_task_type(&self, task_id: TaskId) -> Option> { @@ -745,9 +790,6 @@ impl Backend for TurboTasksBackend { panic!("Task execution completed, but task is not in progress: {task:#?}"); }; - // TODO handle cell counters - let _ = cell_counters; - // TODO handle stateful let _ = stateful; @@ -762,6 +804,48 @@ impl Backend for TurboTasksBackend { drop(task); drop(ctx); } else { + // handle cell counters: update max index and remove cells that are no longer used + let mut removed_cells = HashMap::new(); + let mut old_counters: HashMap<_, _> = + get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, max_index)); + for (&cell_type, &max_index) in cell_counters.iter() { + if let Some(old_max_index) = old_counters.remove(&cell_type) { + if old_max_index != max_index { + task.insert(CachedDataItem::CellTypeMaxIndex { + cell_type, + value: max_index, + }); + if old_max_index > max_index { + removed_cells.insert(cell_type, max_index + 1..=old_max_index); + } + } + } else { + task.add(CachedDataItem::CellTypeMaxIndex { + cell_type, + value: max_index, + }); + } + } + for (cell_type, old_max_index) in old_counters { + task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type }); + removed_cells.insert(cell_type, 0..=old_max_index); + } + let mut removed_data = Vec::new(); + for (&cell_type, range) in removed_cells.iter() { + for index in range.clone() { + removed_data.extend( + task.remove(&CachedDataItemKey::CellData { + cell: CellId { + type_id: cell_type, + index, + }, + }) + .into_iter(), + ); + } + } + + // find all outdated data items (removed cells, outdated edges) let old_edges = task .iter() .filter_map(|(key, _)| match *key { @@ -772,6 +856,13 @@ impl Backend for TurboTasksBackend { CachedDataItemKey::OutdatedOutputDependency { target } => { Some(OutdatedEdge::OutputDependency(target)) } + CachedDataItemKey::CellDependent { cell, task } + if removed_cells + .get(&cell.type_id) + .map_or(false, |range| range.contains(&cell.index)) => + { + Some(OutdatedEdge::RemovedCellDependent(task)) + } _ => None, }) .collect::>(); @@ -782,6 +873,8 @@ impl Backend for TurboTasksBackend { drop(task); CleanupOldEdgesOperation::run(task_id, old_edges, ctx); + + drop(removed_data) } stale diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs index e4dc93f6a6c2c..ecd71c9df6fff 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/cleanup_old_edges.rs @@ -5,6 +5,7 @@ use turbo_tasks::TaskId; use super::{ aggregation_update::{AggregationUpdateJob, AggregationUpdateQueue}, + invalidate::make_task_dirty, ExecuteContext, Operation, }; use crate::{ @@ -32,6 +33,7 @@ pub enum OutdatedEdge { Child(TaskId), CellDependency(CellRef), OutputDependency(TaskId), + RemovedCellDependent(TaskId), } impl CleanupOldEdgesOperation { @@ -101,6 +103,9 @@ impl Operation for CleanupOldEdgesOperation { }); } } + OutdatedEdge::RemovedCellDependent(task_id) => { + make_task_dirty(task_id, queue, ctx); + } } } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs index 07fb67e5fe180..c4c42415c2e32 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs @@ -39,60 +39,7 @@ impl Operation for InvalidateOperation { InvalidateOperation::MakeDirty { task_ids } => { let mut queue = AggregationUpdateQueue::new(); for task_id in task_ids { - let mut task = ctx.task(task_id); - - if task.add(CachedDataItem::Dirty { value: () }) { - let in_progress = match get!(task, InProgress) { - Some(InProgressState::Scheduled { clean, .. }) => { - if *clean { - update!(task, InProgress, |in_progress| { - let Some(InProgressState::Scheduled { - clean: _, - done_event, - start_event, - }) = in_progress - else { - unreachable!(); - }; - Some(InProgressState::Scheduled { - clean: false, - done_event, - start_event, - }) - }); - } - true - } - Some(InProgressState::InProgress { clean, stale, .. }) => { - if *clean || !*stale { - update!(task, InProgress, |in_progress| { - let Some(InProgressState::InProgress { - clean: _, - stale: _, - done_event, - }) = in_progress - else { - unreachable!(); - }; - Some(InProgressState::InProgress { - clean: false, - stale: true, - done_event, - }) - }); - } - true - } - None => false, - }; - if !in_progress && task.add(CachedDataItem::new_scheduled(task_id)) { - ctx.turbo_tasks.schedule(task_id) - } - queue.push(AggregationUpdateJob::DataUpdate { - task_id, - update: AggregatedDataUpdate::dirty_task(task_id), - }) - } + make_task_dirty(task_id, &mut queue, ctx); } if queue.is_empty() { self = InvalidateOperation::Done @@ -113,3 +60,60 @@ impl Operation for InvalidateOperation { } } } + +pub fn make_task_dirty(task_id: TaskId, queue: &mut AggregationUpdateQueue, ctx: &ExecuteContext) { + let mut task = ctx.task(task_id); + + if task.add(CachedDataItem::Dirty { value: () }) { + let in_progress = match get!(task, InProgress) { + Some(InProgressState::Scheduled { clean, .. }) => { + if *clean { + update!(task, InProgress, |in_progress| { + let Some(InProgressState::Scheduled { + clean: _, + done_event, + start_event, + }) = in_progress + else { + unreachable!(); + }; + Some(InProgressState::Scheduled { + clean: false, + done_event, + start_event, + }) + }); + } + true + } + Some(InProgressState::InProgress { clean, stale, .. }) => { + if *clean || !*stale { + update!(task, InProgress, |in_progress| { + let Some(InProgressState::InProgress { + clean: _, + stale: _, + done_event, + }) = in_progress + else { + unreachable!(); + }; + Some(InProgressState::InProgress { + clean: false, + stale: true, + done_event, + }) + }); + } + true + } + None => false, + }; + if !in_progress && task.add(CachedDataItem::new_scheduled(task_id)) { + ctx.turbo_tasks.schedule(task_id) + } + queue.push(AggregationUpdateJob::DataUpdate { + task_id, + update: AggregatedDataUpdate::dirty_task(task_id), + }) + } +} diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index 4832f29ffbc59..15f5b9b1058d0 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -207,6 +207,15 @@ macro_rules! get_many { }) .collect() }; + ($task:ident, $key:ident $input:tt $value_ident:ident => $value:expr) => { + $task + .iter() + .filter_map(|(key, value)| match (key, value) { + (&CachedDataItemKey::$key $input, &CachedDataItemValue::$key { value: $value_ident }) => Some($value), + _ => None, + }) + .collect() + }; ($task:ident, $key1:ident $input1:tt => $value1:ident, $key2:ident $input2:tt => $value2:ident) => { $task .iter() diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 782b09f423ee5..2841d1042fcae 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -1,7 +1,9 @@ use serde::{Deserialize, Serialize}; use turbo_tasks::{ - event::Event, registry, util::SharedError, CellId, KeyValuePair, TaskId, TypedSharedReference, - ValueTypeId, + event::{Event, EventListener}, + registry, + util::SharedError, + CellId, KeyValuePair, TaskId, TypedSharedReference, ValueTypeId, }; #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] @@ -44,11 +46,13 @@ pub enum RootType { #[derive(Debug)] pub enum InProgressState { Scheduled { + // TODO remove in favor of Dirty clean: bool, done_event: Event, start_event: Event, }, InProgress { + // TODO remove in favor of Dirty clean: bool, stale: bool, done_event: Event, @@ -61,6 +65,27 @@ impl Clone for InProgressState { } } +#[derive(Debug)] +pub struct InProgressCellState { + pub event: Event, +} + +impl Clone for InProgressCellState { + fn clone(&self) -> Self { + panic!("InProgressCell cannot be cloned"); + } +} + +impl InProgressCellState { + pub fn new(task_id: TaskId, cell: CellId) -> Self { + InProgressCellState { + event: Event::new(move || { + format!("InProgressCellState::event ({} {:?})", task_id, cell) + }), + } + } +} + #[derive(Debug, Clone, KeyValuePair, Serialize, Deserialize)] pub enum CachedDataItem { // Output @@ -91,6 +116,10 @@ pub enum CachedDataItem { cell: CellId, value: TypedSharedReference, }, + CellTypeMaxIndex { + cell_type: ValueTypeId, + value: u32, + }, // Dependencies OutputDependency { @@ -159,6 +188,11 @@ pub enum CachedDataItem { InProgress { value: InProgressState, }, + #[serde(skip)] + InProgressCell { + cell: CellId, + value: InProgressCellState, + }, OutdatedCollectible { collectible: CellRef, value: (), @@ -192,6 +226,7 @@ impl CachedDataItem { CachedDataItem::DirtyWhenPersisted { .. } => true, CachedDataItem::Child { task, .. } => !task.is_transient(), CachedDataItem::CellData { .. } => true, + CachedDataItem::CellTypeMaxIndex { .. } => true, CachedDataItem::OutputDependency { target, .. } => !target.is_transient(), CachedDataItem::CellDependency { target, .. } => !target.task.is_transient(), CachedDataItem::CollectiblesDependency { target, .. } => !target.task.is_transient(), @@ -208,6 +243,7 @@ impl CachedDataItem { CachedDataItem::AggregatedUnfinishedTasks { .. } => true, CachedDataItem::AggregateRootType { .. } => false, CachedDataItem::InProgress { .. } => false, + CachedDataItem::InProgressCell { .. } => false, CachedDataItem::OutdatedCollectible { .. } => false, CachedDataItem::OutdatedOutputDependency { .. } => false, CachedDataItem::OutdatedCellDependency { .. } => false, @@ -232,6 +268,21 @@ impl CachedDataItem { }, } } + + pub fn new_scheduled_with_listener(task_id: TaskId, clean: bool) -> (Self, EventListener) { + let done_event = Event::new(move || format!("{} done_event", task_id)); + let listener = done_event.listen(); + ( + CachedDataItem::InProgress { + value: InProgressState::Scheduled { + clean, + done_event, + start_event: Event::new(move || format!("{} start_event", task_id)), + }, + }, + listener, + ) + } } impl CachedDataItemKey { @@ -243,6 +294,7 @@ impl CachedDataItemKey { CachedDataItemKey::DirtyWhenPersisted { .. } => true, CachedDataItemKey::Child { task, .. } => !task.is_transient(), CachedDataItemKey::CellData { .. } => true, + CachedDataItemKey::CellTypeMaxIndex { .. } => true, CachedDataItemKey::OutputDependency { target, .. } => !target.is_transient(), CachedDataItemKey::CellDependency { target, .. } => !target.task.is_transient(), CachedDataItemKey::CollectiblesDependency { target, .. } => !target.task.is_transient(), @@ -259,6 +311,7 @@ impl CachedDataItemKey { CachedDataItemKey::AggregatedUnfinishedTasks { .. } => true, CachedDataItemKey::AggregateRootType { .. } => false, CachedDataItemKey::InProgress { .. } => false, + CachedDataItemKey::InProgressCell { .. } => false, CachedDataItemKey::OutdatedCollectible { .. } => false, CachedDataItemKey::OutdatedOutputDependency { .. } => false, CachedDataItemKey::OutdatedCellDependency { .. } => false,