Skip to content

Commit

Permalink
remove excess cells
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Sep 4, 2024
1 parent 92aff18 commit 15950e6
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 64 deletions.
109 changes: 101 additions & 8 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{
time::{Duration, Instant},
};

use anyhow::Result;
use anyhow::{bail, Result};
use auto_hash_map::{AutoMap, AutoSet};
use dashmap::DashMap;
pub use operation::AnyOperation;
Expand All @@ -41,9 +41,9 @@ use crate::{
backing_storage::BackingStorage,
data::{
CachedDataItem, CachedDataItemKey, CachedDataItemValue, CachedDataUpdate, CellRef,
InProgressState, OutputValue, RootType,
InProgressCellState, InProgressState, OutputValue, RootType,
},
get, remove,
get, get_many, remove,
utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc},
};

Expand Down Expand Up @@ -261,7 +261,14 @@ impl TurboTasksBackend {
}
}

todo!("Output of is not available, recompute task: {task:#?}");
// Output doesn't exist. We need to schedule the task to compute it.
let dirty = task.has_key(&CachedDataItemKey::Dirty {});
let (item, listener) = CachedDataItem::new_scheduled_with_listener(task_id, !dirty);
if task.add(item) {
turbo_tasks.schedule(task_id);
}

Ok(Err(listener))
}

fn try_read_task_cell(
Expand Down Expand Up @@ -300,7 +307,45 @@ impl TurboTasksBackend {
)));
}

todo!("Cell {cell:?} is not available, recompute task or error: {task:#?}");
// Check cell index range (cell might not exist at all)
let Some(max_id) = get!(
task,
CellTypeMaxIndex {
cell_type: cell.type_id
}
) else {
bail!(
"Cell {cell:?} no longer exists in task {task_id:?} (no cell of this type exists)"
);
};
if cell.index > *max_id {
bail!("Cell {cell:?} no longer exists in task {task_id:?} (index out of bounds)");
}

// Cell should exist, but data was dropped or is not serializable. We need to recompute the
// task to get the cell content.

// Register event listener for cell computation
if let Some(in_progress) = get!(task, InProgressCell { cell }) {
// Someone else is already computing the cell
let listener = in_progress.event.listen();
return Ok(Err(listener));
}

// We create the event and potentially schedule the task
let in_progress = InProgressCellState::new(task_id, cell);
let listener = in_progress.event.listen();
task.add(CachedDataItem::InProgressCell {
cell,
value: in_progress,
});

// Schedule the task
if task.add(CachedDataItem::new_scheduled(task_id)) {
turbo_tasks.schedule(task_id);
}

Ok(Err(listener))
}

fn lookup_task_type(&self, task_id: TaskId) -> Option<Arc<CachedTaskType>> {
Expand Down Expand Up @@ -745,9 +790,6 @@ impl Backend for TurboTasksBackend {
panic!("Task execution completed, but task is not in progress: {task:#?}");
};

// TODO handle cell counters
let _ = cell_counters;

// TODO handle stateful
let _ = stateful;

Expand All @@ -762,6 +804,48 @@ impl Backend for TurboTasksBackend {
drop(task);
drop(ctx);
} else {
// handle cell counters: update max index and remove cells that are no longer used
let mut removed_cells = HashMap::new();
let mut old_counters: HashMap<_, _> =
get_many!(task, CellTypeMaxIndex { cell_type } max_index => (cell_type, max_index));
for (&cell_type, &max_index) in cell_counters.iter() {
if let Some(old_max_index) = old_counters.remove(&cell_type) {
if old_max_index != max_index {
task.insert(CachedDataItem::CellTypeMaxIndex {
cell_type,
value: max_index,
});
if old_max_index > max_index {
removed_cells.insert(cell_type, max_index + 1..=old_max_index);
}
}
} else {
task.add(CachedDataItem::CellTypeMaxIndex {
cell_type,
value: max_index,
});
}
}
for (cell_type, old_max_index) in old_counters {
task.remove(&CachedDataItemKey::CellTypeMaxIndex { cell_type });
removed_cells.insert(cell_type, 0..=old_max_index);
}
let mut removed_data = Vec::new();
for (&cell_type, range) in removed_cells.iter() {
for index in range.clone() {
removed_data.extend(
task.remove(&CachedDataItemKey::CellData {
cell: CellId {
type_id: cell_type,
index,
},
})
.into_iter(),
);
}
}

// find all outdated data items (removed cells, outdated edges)
let old_edges = task
.iter()
.filter_map(|(key, _)| match *key {
Expand All @@ -772,6 +856,13 @@ impl Backend for TurboTasksBackend {
CachedDataItemKey::OutdatedOutputDependency { target } => {
Some(OutdatedEdge::OutputDependency(target))
}
CachedDataItemKey::CellDependent { cell, task }
if removed_cells
.get(&cell.type_id)
.map_or(false, |range| range.contains(&cell.index)) =>
{
Some(OutdatedEdge::RemovedCellDependent(task))
}
_ => None,
})
.collect::<Vec<_>>();
Expand All @@ -782,6 +873,8 @@ impl Backend for TurboTasksBackend {
drop(task);

CleanupOldEdgesOperation::run(task_id, old_edges, ctx);

drop(removed_data)
}

stale
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use turbo_tasks::TaskId;

use super::{
aggregation_update::{AggregationUpdateJob, AggregationUpdateQueue},
invalidate::make_task_dirty,
ExecuteContext, Operation,
};
use crate::{
Expand Down Expand Up @@ -32,6 +33,7 @@ pub enum OutdatedEdge {
Child(TaskId),
CellDependency(CellRef),
OutputDependency(TaskId),
RemovedCellDependent(TaskId),
}

impl CleanupOldEdgesOperation {
Expand Down Expand Up @@ -101,6 +103,9 @@ impl Operation for CleanupOldEdgesOperation {
});
}
}
OutdatedEdge::RemovedCellDependent(task_id) => {
make_task_dirty(task_id, queue, ctx);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,60 +39,7 @@ impl Operation for InvalidateOperation {
InvalidateOperation::MakeDirty { task_ids } => {
let mut queue = AggregationUpdateQueue::new();
for task_id in task_ids {
let mut task = ctx.task(task_id);

if task.add(CachedDataItem::Dirty { value: () }) {
let in_progress = match get!(task, InProgress) {
Some(InProgressState::Scheduled { clean, .. }) => {
if *clean {
update!(task, InProgress, |in_progress| {
let Some(InProgressState::Scheduled {
clean: _,
done_event,
start_event,
}) = in_progress
else {
unreachable!();
};
Some(InProgressState::Scheduled {
clean: false,
done_event,
start_event,
})
});
}
true
}
Some(InProgressState::InProgress { clean, stale, .. }) => {
if *clean || !*stale {
update!(task, InProgress, |in_progress| {
let Some(InProgressState::InProgress {
clean: _,
stale: _,
done_event,
}) = in_progress
else {
unreachable!();
};
Some(InProgressState::InProgress {
clean: false,
stale: true,
done_event,
})
});
}
true
}
None => false,
};
if !in_progress && task.add(CachedDataItem::new_scheduled(task_id)) {
ctx.turbo_tasks.schedule(task_id)
}
queue.push(AggregationUpdateJob::DataUpdate {
task_id,
update: AggregatedDataUpdate::dirty_task(task_id),
})
}
make_task_dirty(task_id, &mut queue, ctx);
}
if queue.is_empty() {
self = InvalidateOperation::Done
Expand All @@ -113,3 +60,60 @@ impl Operation for InvalidateOperation {
}
}
}

/// Marks `task_id` as dirty and makes sure it will be (re-)executed.
///
/// Adds the `Dirty` marker to the task's storage. If the marker was newly
/// added, the task's current in-progress state decides how re-execution is
/// arranged:
/// - `Scheduled { clean: true, .. }`: the pending run is downgraded to
///   `clean: false` (events are preserved) so the upcoming execution reflects
///   this invalidation; no extra scheduling is needed.
/// - `InProgress { .. }`: the running execution is marked `stale: true` and
///   `clean: false`, presumably so it is re-run after the current execution
///   finishes — TODO(review) confirm against the executor's stale handling.
/// - no in-progress state: the task is scheduled here.
///
/// Finally, an aggregated "dirty task" data update for `task_id` is pushed
/// onto `queue` so the aggregation tree above the task is informed.
pub fn make_task_dirty(task_id: TaskId, queue: &mut AggregationUpdateQueue, ctx: &ExecuteContext) {
    let mut task = ctx.task(task_id);

    // `add` returns false when the Dirty marker already existed; in that case
    // the bookkeeping below has already happened and nothing more is done.
    if task.add(CachedDataItem::Dirty { value: () }) {
        // `in_progress` == true means a (re-)execution is already accounted
        // for by the existing state, so no new scheduling happens below.
        let in_progress = match get!(task, InProgress) {
            Some(InProgressState::Scheduled { clean, .. }) => {
                if *clean {
                    // Rewrite the state with `clean: false`, keeping the
                    // done/start events intact.
                    update!(task, InProgress, |in_progress| {
                        let Some(InProgressState::Scheduled {
                            clean: _,
                            done_event,
                            start_event,
                        }) = in_progress
                        else {
                            // We just read `Scheduled` above while holding the
                            // task lock, so any other variant is impossible.
                            unreachable!();
                        };
                        Some(InProgressState::Scheduled {
                            clean: false,
                            done_event,
                            start_event,
                        })
                    });
                }
                true
            }
            Some(InProgressState::InProgress { clean, stale, .. }) => {
                // Only rewrite when something actually changes (was clean, or
                // not yet marked stale).
                if *clean || !*stale {
                    update!(task, InProgress, |in_progress| {
                        let Some(InProgressState::InProgress {
                            clean: _,
                            stale: _,
                            done_event,
                        }) = in_progress
                        else {
                            // Same reasoning as above: state was just observed
                            // as `InProgress`.
                            unreachable!();
                        };
                        Some(InProgressState::InProgress {
                            clean: false,
                            stale: true,
                            done_event,
                        })
                    });
                }
                true
            }
            None => false,
        };
        // Not in progress and not already scheduled: schedule the task now.
        if !in_progress && task.add(CachedDataItem::new_scheduled(task_id)) {
            ctx.turbo_tasks.schedule(task_id)
        }
        // Propagate the dirtiness upward through the aggregation tree.
        queue.push(AggregationUpdateJob::DataUpdate {
            task_id,
            update: AggregatedDataUpdate::dirty_task(task_id),
        })
    }
}
9 changes: 9 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,15 @@ macro_rules! get_many {
})
.collect()
};
($task:ident, $key:ident $input:tt $value_ident:ident => $value:expr) => {
$task
.iter()
.filter_map(|(key, value)| match (key, value) {
(&CachedDataItemKey::$key $input, &CachedDataItemValue::$key { value: $value_ident }) => Some($value),
_ => None,
})
.collect()
};
($task:ident, $key1:ident $input1:tt => $value1:ident, $key2:ident $input2:tt => $value2:ident) => {
$task
.iter()
Expand Down
Loading

0 comments on commit 15950e6

Please sign in to comment.