Skip to content

Commit

Permalink
continue uncompleted operations
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Aug 13, 2024
1 parent ffd7120 commit 96777a7
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 8 deletions.
12 changes: 11 additions & 1 deletion turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl TurboTasksBackend {
}

if strongy_consistent {
todo!("Handle strongly consistent read: {task:#?}");
// todo!("Handle strongly consistent read: {task:#?}");
}

if let Some(output) = get!(task, Output) {
Expand Down Expand Up @@ -362,6 +362,16 @@ impl TurboTasksBackend {

impl Backend for TurboTasksBackend {
fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
// Continue all uncompleted operations
// They can't be interrupted by a snapshot since the snapshotting job has not been scheduled
// yet.
let uncompleted_operations = self.backing_storage.uncompleted_operations();
let ctx = self.execute_context(turbo_tasks);
for op in uncompleted_operations {
op.execute(&ctx);
}

// Schedule the snapshot job
turbo_tasks.schedule_backend_background_job(BackendJobId::from(1));
}

Expand Down
15 changes: 15 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,21 @@ pub enum AnyOperation {
Nested(Vec<AnyOperation>),
}

impl AnyOperation {
pub fn execute(self, ctx: &ExecuteContext<'_>) {
match self {
AnyOperation::ConnectChild(op) => op.execute(ctx),
AnyOperation::Invalidate(op) => op.execute(ctx),
AnyOperation::CleanupOldEdges(op) => op.execute(ctx),
AnyOperation::Nested(ops) => {
for op in ops {
op.execute(ctx);
}
}
}
}
}

impl_operation!(ConnectChild connect_child::ConnectChildOperation);
impl_operation!(Invalidate invalidate::InvalidateOperation);
impl_operation!(CleanupOldEdges cleanup_old_edges::CleanupOldEdgesOperation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{

pub trait BackingStorage {
fn next_free_task_id(&self) -> TaskId;
fn uncompleted_operations(&self) -> Vec<AnyOperation>;
fn save_snapshot(
&self,
operations: Vec<Arc<AnyOperation>>,
Expand Down
24 changes: 17 additions & 7 deletions turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,23 @@ impl LmdbBackingStorage {

impl BackingStorage for LmdbBackingStorage {
fn next_free_task_id(&self) -> TaskId {
let Ok(tx) = self.env.begin_ro_txn() else {
return TaskId::from(1);
};
let next_free_task_id =
as_u32(tx.get(self.meta_db, &IntKey::new(META_KEY_NEXT_FREE_TASK_ID))).unwrap_or(1);
let _ = tx.commit();
TaskId::from(next_free_task_id)
fn get(this: &LmdbBackingStorage) -> Result<u32> {
let tx = this.env.begin_rw_txn()?;
let next_free_task_id =
as_u32(tx.get(this.meta_db, &IntKey::new(META_KEY_NEXT_FREE_TASK_ID)))?;
Ok(next_free_task_id)
}
TaskId::from(get(self).unwrap_or(1))
}

fn uncompleted_operations(&self) -> Vec<AnyOperation> {
fn get(this: &LmdbBackingStorage) -> Result<Vec<AnyOperation>> {
let tx = this.env.begin_ro_txn()?;
let operations = tx.get(this.meta_db, &IntKey::new(META_KEY_OPERATIONS))?;
let operations = bincode::deserialize(operations)?;
Ok(operations)
}
get(self).unwrap_or_default()
}

fn save_snapshot(
Expand Down

0 comments on commit 96777a7

Please sign in to comment.