From c92b523aaa4dc17868b834dfa92771c70590abde Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 9 Aug 2024 11:48:36 +0200 Subject: [PATCH] continue uncompleted operations --- .../turbo-tasks-backend/src/backend/mod.rs | 12 +++++++++- .../src/backend/operation/mod.rs | 14 +++++++++++ .../src/backing_storage.rs | 1 + .../src/lmdb_backing_storage.rs | 24 +++++++++++++------ 4 files changed, 43 insertions(+), 8 deletions(-) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 5900d37d1d092a..e8b549ecf33e1a 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -230,7 +230,7 @@ impl TurboTasksBackend { } if strongy_consistent { - todo!("Handle strongly consistent read: {task:#?}"); + // todo!("Handle strongly consistent read: {task:#?}"); } if let Some(output) = get!(task, Output) { @@ -359,6 +359,16 @@ impl TurboTasksBackend { impl Backend for TurboTasksBackend { fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi) { + // Continue all uncompleted operations + // They can't be interrupted by a snapshot since the snapshotting job has not been scheduled + // yet. + let uncompleted_operations = self.backing_storage.uncompleted_operations(); + let ctx = self.execute_context(turbo_tasks); + for op in uncompleted_operations { + op.execute(&ctx); + } + + // Schedule the snapshot job turbo_tasks.schedule_backend_background_job(BackendJobId::from(1)); } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index 79bbee881b9c5b..d4ae100fd9aa09 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -262,6 +262,20 @@ pub enum AnyOperation { Nested(Vec), } +impl AnyOperation { + pub fn execute(self, ctx: &ExecuteContext<'_>) { + match self { + AnyOperation::ConnectChild(op) => op.execute(ctx), + AnyOperation::Invalidate(op) => op.execute(ctx), + AnyOperation::Nested(ops) => { + for op in ops { + op.execute(ctx); + } + } + } + } +} + impl_operation!(ConnectChild connect_child::ConnectChildOperation); impl_operation!(Invalidate invalidate::InvalidateOperation); diff --git a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs index 6b6707c476e512..cb1b2da8a4129b 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs @@ -11,6 +11,7 @@ use crate::{ pub trait BackingStorage { fn next_free_task_id(&self) -> TaskId; + fn uncompleted_operations(&self) -> Vec; fn save_snapshot( &self, operations: Vec>, diff --git a/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs index 5239eb18dda829..5562243069a593 100644 --- a/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs @@ -75,13 +75,23 @@ impl LmdbBackingStorage { impl BackingStorage for LmdbBackingStorage { fn next_free_task_id(&self) -> TaskId { - let Ok(tx) = self.env.begin_ro_txn() else { - return TaskId::from(1); - }; - let next_free_task_id = - as_u32(tx.get(self.meta_db, &IntKey::new(META_KEY_NEXT_FREE_TASK_ID))).unwrap_or(1); - let _ = tx.commit(); - TaskId::from(next_free_task_id) + fn get(this: &LmdbBackingStorage) -> Result { + let tx = this.env.begin_rw_txn()?; + let next_free_task_id = + as_u32(tx.get(this.meta_db, &IntKey::new(META_KEY_NEXT_FREE_TASK_ID)))?; + Ok(next_free_task_id) + } + TaskId::from(get(self).unwrap_or(1)) + } + + fn uncompleted_operations(&self) -> Vec { + fn get(this: &LmdbBackingStorage) -> Result> { + let tx = this.env.begin_ro_txn()?; + let operations = tx.get(this.meta_db, &IntKey::new(META_KEY_OPERATIONS))?; + let operations = bincode::deserialize(operations)?; + Ok(operations) + } + get(self).unwrap_or_default() } fn save_snapshot(