From 47baaa65beaf3208926e5805b185024706b48fa9 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 8 Aug 2024 16:42:50 +0200 Subject: [PATCH] add lmdb persisting and restoring --- Cargo.lock | 50 ++--- .../crates/turbo-tasks-backend/Cargo.toml | 2 + .../turbo-tasks-backend/src/backend/mod.rs | 130 ++++++++++-- .../src/backend/operation/connect_child.rs | 12 +- .../src/backend/operation/mod.rs | 27 ++- .../src/backend/operation/update_cell.rs | 2 +- .../src/backend/storage.rs | 48 ++++- .../src/backing_storage.rs | 23 ++ .../crates/turbo-tasks-backend/src/data.rs | 16 +- .../crates/turbo-tasks-backend/src/lib.rs | 3 + .../src/lmdb_backing_storage.rs | 199 ++++++++++++++++++ .../src/utils/chunked_vec.rs | 21 +- .../src/utils/ptr_eq_arc.rs | 4 + .../turbo-tasks-backend/tests/test_config.trs | 19 +- .../crates/turbo-tasks-testing/src/lib.rs | 4 + .../crates/turbo-tasks-testing/src/run.rs | 1 + .../crates/turbo-tasks-testing/tests/basic.rs | 16 +- turbopack/crates/turbo-tasks/src/backend.rs | 2 +- turbopack/crates/turbo-tasks/src/lib.rs | 2 +- turbopack/crates/turbo-tasks/src/manager.rs | 10 + turbopack/crates/turbo-tasks/src/task/mod.rs | 2 +- 21 files changed, 519 insertions(+), 74 deletions(-) create mode 100644 turbopack/crates/turbo-tasks-backend/src/backing_storage.rs create mode 100644 turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs diff --git a/Cargo.lock b/Cargo.lock index e6c68df4839b0..11598c703aafe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1651,7 +1651,7 @@ dependencies = [ "cssparser-macros", "dtoa-short", "itoa", - "phf 0.10.1", + "phf 0.11.2", "serde", "smallvec", ] @@ -3482,6 +3482,28 @@ version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" +[[package]] +name = "lmdb" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0908efb5d6496aa977d96f91413da2635a902e5e31dbef0bfb88986c248539" +dependencies = [ + "bitflags 1.3.2", + "libc", + "lmdb-sys", +] + +[[package]] +name = "lmdb-sys" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5b392838cfe8858e86fac37cf97a0e8c55cc60ba0a18365cadc33092f128ce9" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "lock_api" version = "0.4.10" @@ -4617,9 +4639,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fabbf1ead8a5bcbc20f5f8b939ee3f5b0f6f281b6ad3468b84656b658b455259" dependencies = [ - "phf_macros 0.10.0", "phf_shared 0.10.0", - "proc-macro-hack", ] [[package]] @@ -4628,7 +4648,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" dependencies = [ - "phf_macros 0.11.2", + "phf_macros", "phf_shared 0.11.2", ] @@ -4662,20 +4682,6 @@ dependencies = [ "rand", ] -[[package]] -name = "phf_macros" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58fdf3184dd560f160dd73922bea2d5cd6e8f064bf4b13110abd81b03697b4e0" -dependencies = [ - "phf_generator 0.10.0", - "phf_shared 0.10.0", - "proc-macro-hack", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "phf_macros" version = "0.11.2" @@ -4939,12 +4945,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro2" version = "1.0.79" @@ -8390,8 +8390,10 @@ dependencies = [ "anyhow", "async-trait", "auto-hash-map", + "bincode", "dashmap", "indexmap 1.9.3", + "lmdb", "once_cell", "parking_lot", "rand", diff --git a/turbopack/crates/turbo-tasks-backend/Cargo.toml b/turbopack/crates/turbo-tasks-backend/Cargo.toml index ff458ac2808b2..30a13b255d9a6 100644 --- a/turbopack/crates/turbo-tasks-backend/Cargo.toml +++ b/turbopack/crates/turbo-tasks-backend/Cargo.toml @@ -16,8 +16,10 @@ workspace = true anyhow = { workspace = true } async-trait = { workspace = true } auto-hash-map = { workspace = true } +bincode = "1.3.3" dashmap = { workspace = true } indexmap = { workspace = true } +lmdb = "0.8.0" once_cell = { workspace = true } parking_lot = { workspace = true } rand = { workspace = true } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index e0618aaa19cd5..dd854c33ac573 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -4,15 +4,16 @@ mod storage; use std::{ borrow::Cow, - collections::HashSet, + collections::{HashMap, HashSet}, future::Future, hash::BuildHasherDefault, + mem::take, pin::Pin, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, }, - time::Duration, + time::{Duration, Instant}, }; use anyhow::Result; @@ -36,6 +37,7 @@ use turbo_tasks::{ use self::{operation::ExecuteContext, storage::Storage}; use crate::{ + backing_storage::BackingStorage, data::{ CachedDataItem, CachedDataItemKey, CachedDataItemValue, CachedDataUpdate, CellRef, InProgressState, OutputValue, RootType, @@ -61,6 +63,8 @@ impl SnapshotRequest { } pub struct TurboTasksBackend { + start_time: Instant, + persisted_task_id_factory: IdFactoryWithReuse, transient_task_id_factory: IdFactoryWithReuse, @@ -86,19 +90,18 @@ pub struct TurboTasksBackend { /// Condition Variable that is triggered when a snapshot is completed and /// operations can continue. snapshot_completed: Condvar, -} + /// The timestamp of the last started snapshot. + last_snapshot: AtomicU64, -impl Default for TurboTasksBackend { - fn default() -> Self { - Self::new() - } + backing_storage: Arc, } impl TurboTasksBackend { - pub fn new() -> Self { + pub fn new(backing_storage: Arc) -> Self { Self { + start_time: Instant::now(), persisted_task_id_factory: IdFactoryWithReuse::new_with_range( - 1, + *backing_storage.next_free_task_id() as u64, (TRANSIENT_TASK_BIT - 1) as u64, ), transient_task_id_factory: IdFactoryWithReuse::new_with_range( @@ -114,6 +117,8 @@ impl TurboTasksBackend { snapshot_request: Mutex::new(SnapshotRequest::new()), operations_suspended: Condvar::new(), snapshot_completed: Condvar::new(), + last_snapshot: AtomicU64::new(0), + backing_storage, } } @@ -288,14 +293,78 @@ impl TurboTasksBackend { reader_task.add(CachedDataItem::CellDependency { target, value: () }); } } - return Ok(Ok(CellContent(Some(content)).into_typed(cell.type_id))); + return Ok(Ok(TypedCellContent( + cell.type_id, + CellContent(Some(content.1)), + ))); } todo!("Cell {cell:?} is not available, recompute task or error: {task:#?}"); } + + fn snapshot(&self) -> Option { + let mut snapshot_request = self.snapshot_request.lock(); + snapshot_request.snapshot_requested = true; + let active_operations = self + .in_progress_operations + .fetch_or(SNAPSHOT_REQUESTED_BIT, std::sync::atomic::Ordering::Relaxed); + if active_operations != 0 { + self.operations_suspended + .wait_while(&mut snapshot_request, |_| { + self.in_progress_operations + .load(std::sync::atomic::Ordering::Relaxed) + != SNAPSHOT_REQUESTED_BIT + }); + } + let suspended_operations = snapshot_request + .suspended_operations + .iter() + .map(|op| op.arc().clone()) + .collect::>(); + drop(snapshot_request); + let persisted_storage_log = take(&mut *self.persisted_storage_log.lock()); + let persisted_task_cache_log = take(&mut *self.persisted_task_cache_log.lock()); + let mut snapshot_request = self.snapshot_request.lock(); + snapshot_request.snapshot_requested = false; + self.in_progress_operations + .fetch_sub(SNAPSHOT_REQUESTED_BIT, std::sync::atomic::Ordering::Relaxed); + self.snapshot_completed.notify_all(); + let snapshot_time = Instant::now(); + drop(snapshot_request); + + let mut counts: HashMap = HashMap::new(); + for CachedDataUpdate { task, .. } in persisted_storage_log.iter() { + *counts.entry(*task).or_default() += 1; + } + + if !persisted_task_cache_log.is_empty() || !persisted_storage_log.is_empty() { + if let Err(err) = self.backing_storage.save_snapshot( + suspended_operations, + persisted_task_cache_log, + persisted_storage_log, + ) { + println!("Persising failed: {:#?}", err); + return None; + } + println!("Snapshot saved"); + } + + for (task_id, count) in counts { + self.storage + .access_mut(task_id) + .persistance_state + .finish_persisting_items(count); + } + + Some(snapshot_time) + } } impl Backend for TurboTasksBackend { + fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi) { + turbo_tasks.schedule_backend_background_job(BackendJobId::from(1)); + } + fn get_or_create_persistent_task( &self, task_type: CachedTaskType, @@ -307,6 +376,12 @@ impl Backend for TurboTasksBackend { return task_id; } + if let Some(task_id) = self.backing_storage.forward_lookup_task_cache(&task_type) { + let _ = self.task_cache.try_insert(Arc::new(task_type), task_id); + self.connect_child(parent_task, task_id, turbo_tasks); + return task_id; + } + let task_type = Arc::new(task_type); let task_id = self.persisted_task_id_factory.get(); if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) { @@ -652,12 +727,31 @@ impl Backend for TurboTasksBackend { stale } - fn run_backend_job( - &self, - _: BackendJobId, - _: &dyn TurboTasksBackendApi, - ) -> Pin + Send + 'static)>> { - todo!() + fn run_backend_job<'a>( + &'a self, + id: BackendJobId, + turbo_tasks: &'a dyn TurboTasksBackendApi, + ) -> Pin + Send + 'a>> { + Box::pin(async move { + if *id == 1 { + const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(1); + + let last_snapshot = self.last_snapshot.load(Ordering::Relaxed); + let last_snapshot = self.start_time + Duration::from_millis(last_snapshot); + let elapsed = last_snapshot.elapsed(); + if elapsed < SNAPSHOT_INTERVAL { + tokio::time::sleep(SNAPSHOT_INTERVAL - elapsed).await; + } + + if let Some(last_snapshot) = self.snapshot() { + let last_snapshot = last_snapshot.duration_since(self.start_time); + self.last_snapshot + .store(last_snapshot.as_millis() as u64, Ordering::Relaxed); + + turbo_tasks.schedule_backend_background_job(id); + } + } + }) } fn try_read_task_output( @@ -707,7 +801,7 @@ impl Backend for TurboTasksBackend { let ctx = self.execute_context(turbo_tasks); let task = ctx.task(task_id); if let Some(content) = get!(task, CellData { cell }) { - Ok(CellContent(Some(content.clone())).into_typed(cell.type_id)) + Ok(CellContent(Some(content.1.clone())).into_typed(cell.type_id)) } else { Ok(CellContent(None).into_typed(cell.type_id)) } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs index 8bd3e39b7ce59..b5eccffc4c938 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use turbo_tasks::TaskId; use super::{ExecuteContext, Operation}; -use crate::data::CachedDataItem; +use crate::data::{CachedDataItem, CachedDataItemKey}; #[derive(Serialize, Deserialize, Clone, Default)] pub enum ConnectChildOperation { @@ -37,11 +37,17 @@ impl Operation for ConnectChildOperation { ctx.operation_suspend_point(&self); match self { ConnectChildOperation::ScheduleTask { task_id } => { + let mut should_schedule; { let mut task = ctx.task(task_id); - task.add(CachedDataItem::new_scheduled(task_id)); + should_schedule = !task.has_key(&CachedDataItemKey::Output {}); + if should_schedule { + should_schedule = task.add(CachedDataItem::new_scheduled(task_id)); + } + } + if should_schedule { + ctx.schedule(task_id); } - ctx.schedule(task_id); self = ConnectChildOperation::Done; continue; diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs index ef4c2399b0004..8de4d1b5fb9b8 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs @@ -50,8 +50,25 @@ impl<'a> ExecuteContext<'a> { } pub fn task(&self, task_id: TaskId) -> TaskGuard<'a> { + let mut task = self.backend.storage.access_mut(task_id); + if !task.persistance_state.is_restored() { + if task_id.is_transient() { + task.persistance_state.set_restored(); + } else { + // Avoid holding the lock too long since this can also affect other tasks + drop(task); + let items = self.backend.backing_storage.lookup_data(task_id); + task = self.backend.storage.access_mut(task_id); + if !task.persistance_state.is_restored() { + for item in items { + task.add(item); + } + task.persistance_state.set_restored(); + } + } + } TaskGuard { - task: self.backend.storage.access_mut(task_id), + task, task_id, backend: self.backend, } @@ -126,6 +143,7 @@ impl<'a> TaskGuard<'a> { self.task.add(item) } else if self.task.add(item.clone()) { let (key, value) = item.into_key_and_value(); + self.task.persistance_state.add_persisting_item(); self.backend .persisted_storage_log .lock() @@ -150,6 +168,7 @@ impl<'a> TaskGuard<'a> { key.clone(), value.clone(), )); + self.task.persistance_state.add_persisting_item(); self.backend .persisted_storage_log .lock() @@ -163,6 +182,7 @@ impl<'a> TaskGuard<'a> { let item = CachedDataItem::from_key_and_value(key.clone(), value); if let Some(old) = self.task.insert(item) { if old.is_persistent() { + self.task.persistance_state.add_persisting_item(); self.backend .persisted_storage_log .lock() @@ -184,6 +204,7 @@ impl<'a> TaskGuard<'a> { if let Some(value) = old_value { if key.is_persistent() && value.is_persistent() { let key = key.clone(); + self.task.persistance_state.add_persisting_item(); self.backend .persisted_storage_log .lock() @@ -203,6 +224,10 @@ impl<'a> TaskGuard<'a> { self.task.get(key) } + pub fn has_key(&self, key: &CachedDataItemKey) -> bool { + self.task.has_key(key) + } + pub fn iter(&self) -> impl Iterator { self.task.iter() } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs index 664f3fdc3cfbd..22750fc8b7186 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs @@ -11,7 +11,7 @@ impl UpdateCellOperation { let old_content = if let CellContent(Some(new_content)) = content { task.insert(CachedDataItem::CellData { cell, - value: new_content, + value: new_content.into_typed(cell.type_id), }) } else { task.remove(&CachedDataItemKey::CellData { cell }) diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs index c8f9702585096..1b4a3c58fef9b 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs @@ -10,27 +10,51 @@ use dashmap::{mapref::one::RefMut, DashMap}; use rustc_hash::FxHasher; use turbo_tasks::KeyValuePair; -enum PersistanceState { - /// We know that all state of the object is only in the cache and nothing is - /// stored in the persistent cache. - CacheOnly, - /// We know that some state of the object is stored in the persistent cache. - Persisted, - /// We have never checked the persistent cache for the state of the object. - Unknown, +const UNRESTORED: u32 = u32::MAX; + +pub struct PersistanceState { + value: u32, +} + +impl Default for PersistanceState { + fn default() -> Self { + Self { value: UNRESTORED } + } +} + +impl PersistanceState { + pub fn set_restored(&mut self) { + self.value = 0; + } + + pub fn add_persisting_item(&mut self) { + self.value += 1; + } + + pub fn finish_persisting_items(&mut self, count: u32) { + self.value -= count; + } + + pub fn is_restored(&self) -> bool { + self.value != UNRESTORED + } + + pub fn is_fully_persisted(&self) -> bool { + self.value == 0 + } } pub struct InnerStorage { // TODO consider adding some inline storage map: AutoMap, - persistance_state: PersistanceState, + pub persistance_state: PersistanceState, } impl InnerStorage { fn new() -> Self { Self { map: AutoMap::new(), - persistance_state: PersistanceState::Unknown, + persistance_state: PersistanceState::default(), } } @@ -58,6 +82,10 @@ impl InnerStorage { self.map.get(key) } + pub fn has_key(&self, key: &T::Key) -> bool { + self.map.contains_key(key) + } + pub fn iter(&self) -> impl Iterator { self.map.iter() } diff --git a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs new file mode 100644 index 0000000000000..6b6707c476e51 --- /dev/null +++ b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs @@ -0,0 +1,23 @@ +use std::sync::Arc; + +use anyhow::Result; +use turbo_tasks::{backend::CachedTaskType, TaskId}; + +use crate::{ + backend::AnyOperation, + data::{CachedDataItem, CachedDataUpdate}, + utils::chunked_vec::ChunkedVec, +}; + +pub trait BackingStorage { + fn next_free_task_id(&self) -> TaskId; + fn save_snapshot( + &self, + operations: Vec>, + task_cache_updates: ChunkedVec<(Arc, TaskId)>, + data_updates: ChunkedVec, + ) -> Result<()>; + fn forward_lookup_task_cache(&self, key: &CachedTaskType) -> Option; + fn reverse_lookup_task_cache(&self, task_id: TaskId) -> Option>; + fn lookup_data(&self, task_id: TaskId) -> Vec; +} diff --git a/turbopack/crates/turbo-tasks-backend/src/data.rs b/turbopack/crates/turbo-tasks-backend/src/data.rs index 52952432c6fb5..c079b035247e1 100644 --- a/turbopack/crates/turbo-tasks-backend/src/data.rs +++ b/turbopack/crates/turbo-tasks-backend/src/data.rs @@ -1,12 +1,15 @@ -use turbo_tasks::{event::Event, util::SharedError, CellId, KeyValuePair, SharedReference, TaskId}; +use serde::{Deserialize, Serialize}; +use turbo_tasks::{ + event::Event, util::SharedError, CellId, KeyValuePair, TaskId, TypedSharedReference, +}; -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] pub struct CellRef { pub task: TaskId, pub cell: CellId, } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum OutputValue { Cell(CellRef), Output(TaskId), @@ -51,7 +54,7 @@ impl Clone for InProgressState { } } -#[derive(Debug, Clone, KeyValuePair)] +#[derive(Debug, Clone, KeyValuePair, Serialize, Deserialize)] pub enum CachedDataItem { // Output Output { @@ -79,7 +82,7 @@ pub enum CachedDataItem { // Cells CellData { cell: CellId, - value: SharedReference, + value: TypedSharedReference, }, // Dependencies @@ -130,11 +133,13 @@ pub enum CachedDataItem { }, // Transient Root Type + #[serde(skip)] AggregateRootType { value: RootType, }, // Transient In Progress state + #[serde(skip)] InProgress { value: InProgressState, }, @@ -156,6 +161,7 @@ pub enum CachedDataItem { }, // Transient Error State + #[serde(skip)] Error { value: SharedError, }, diff --git a/turbopack/crates/turbo-tasks-backend/src/lib.rs b/turbopack/crates/turbo-tasks-backend/src/lib.rs index 5fc95b21a44d9..293c5d6c105b9 100644 --- a/turbopack/crates/turbo-tasks-backend/src/lib.rs +++ b/turbopack/crates/turbo-tasks-backend/src/lib.rs @@ -1,5 +1,8 @@ mod backend; +mod backing_storage; mod data; +mod lmdb_backing_storage; mod utils; pub use backend::TurboTasksBackend; +pub use lmdb_backing_storage::LmdbBackingStorage; diff --git a/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs new file mode 100644 index 0000000000000..5239eb18dda82 --- /dev/null +++ b/turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs @@ -0,0 +1,199 @@ +use std::{ + collections::{hash_map::Entry, HashMap}, + error::Error, + path::Path, + sync::Arc, + thread::available_parallelism, +}; + +use anyhow::Result; +use lmdb::{Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, WriteFlags}; +use turbo_tasks::{backend::CachedTaskType, KeyValuePair, TaskId}; + +use crate::{ + backend::AnyOperation, + backing_storage::BackingStorage, + data::{CachedDataItem, CachedDataItemKey, CachedDataItemValue, CachedDataUpdate}, + utils::chunked_vec::ChunkedVec, +}; + +const META_KEY_OPERATIONS: u32 = 0; +const META_KEY_NEXT_FREE_TASK_ID: u32 = 1; + +struct IntKey([u8; 4]); + +impl IntKey { + fn new(value: u32) -> Self { + Self(value.to_be_bytes()) + } +} + +impl AsRef<[u8]> for IntKey { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +fn as_u32(result: Result<&[u8], E>) -> Result { + let bytes = result?; + let n = bytes.try_into().map(u32::from_be_bytes)?; + Ok(n) +} + +pub struct LmdbBackingStorage { + env: Environment, + meta_db: Database, + data_db: Database, + forward_task_cache_db: Database, + reverse_task_cache_db: Database, +} + +impl LmdbBackingStorage { + pub fn new(path: &Path) -> Result { + println!("opening lmdb {:?}", path); + let env = Environment::new() + .set_flags(EnvironmentFlags::WRITE_MAP | EnvironmentFlags::NO_META_SYNC) + .set_max_readers((available_parallelism().map_or(16, |v| v.get()) * 8) as u32) + .set_max_dbs(4) + .set_map_size(20 * 1024 * 1024 * 1024) + .open(path)?; + let meta_db = env.create_db(Some("meta"), DatabaseFlags::INTEGER_KEY)?; + let data_db = env.create_db(Some("data"), DatabaseFlags::INTEGER_KEY)?; + let forward_task_cache_db = + env.create_db(Some("forward_task_cache"), DatabaseFlags::empty())?; + let reverse_task_cache_db = + env.create_db(Some("reverse_task_cache"), DatabaseFlags::INTEGER_KEY)?; + Ok(Self { + env, + meta_db, + data_db, + forward_task_cache_db, + reverse_task_cache_db, + }) + } +} + +impl BackingStorage for LmdbBackingStorage { + fn next_free_task_id(&self) -> TaskId { + let Ok(tx) = self.env.begin_ro_txn() else { + return TaskId::from(1); + }; + let next_free_task_id = + as_u32(tx.get(self.meta_db, &IntKey::new(META_KEY_NEXT_FREE_TASK_ID))).unwrap_or(1); + let _ = tx.commit(); + TaskId::from(next_free_task_id) + } + + fn save_snapshot( + &self, + operations: Vec>, + task_cache_updates: ChunkedVec<(Arc, TaskId)>, + data_updates: ChunkedVec, + ) -> Result<()> { + let mut tx = self.env.begin_rw_txn()?; + let mut next_task_id = + as_u32(tx.get(self.meta_db, &IntKey::new(META_KEY_NEXT_FREE_TASK_ID))).unwrap_or(1); + for (task_type, task_id) in task_cache_updates.iter() { + let task_id = **task_id; + let task_type = bincode::serialize(&task_type)?; + tx.put( + self.forward_task_cache_db, + &task_type, + &task_id.to_be_bytes(), + WriteFlags::empty(), + )?; + tx.put( + self.reverse_task_cache_db, + &IntKey::new(task_id), + &task_type, + WriteFlags::empty(), + )?; + next_task_id = next_task_id.max(task_id + 1); + } + tx.put( + self.meta_db, + &IntKey::new(META_KEY_NEXT_FREE_TASK_ID), + &next_task_id.to_be_bytes(), + WriteFlags::empty(), + )?; + let operations = bincode::serialize(&operations)?; + tx.put( + self.meta_db, + &IntKey::new(META_KEY_OPERATIONS), + &operations, + WriteFlags::empty(), + )?; + let mut updated_items: HashMap> = + HashMap::new(); + for CachedDataUpdate { task, key, value } in data_updates.into_iter() { + let data = match updated_items.entry(task) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => { + let mut map = HashMap::new(); + if let Ok(old_data) = tx.get(self.data_db, &IntKey::new(*task)) { + let old_data: Vec = bincode::deserialize(old_data)?; + for item in old_data { + let (key, value) = item.into_key_and_value(); + map.insert(key, value); + } + } + entry.insert(map) + } + }; + if let Some(value) = value { + data.insert(key, value); + } else { + data.remove(&key); + } + } + for (task_id, data) in updated_items { + let vec: Vec = data + .into_iter() + .map(|(key, value)| CachedDataItem::from_key_and_value(key, value)) + .collect(); + let value = bincode::serialize(&vec)?; + tx.put( + self.data_db, + &IntKey::new(*task_id), + &value, + WriteFlags::empty(), + )?; + } + tx.commit()?; + Ok(()) + } + + fn forward_lookup_task_cache(&self, task_type: &CachedTaskType) -> Option { + let tx = self.env.begin_ro_txn().ok()?; + let task_type = bincode::serialize(task_type).ok()?; + let result = tx + .get(self.forward_task_cache_db, &task_type) + .ok() + .and_then(|v| v.try_into().ok()) + .map(|v| TaskId::from(u32::from_be_bytes(v))); + tx.commit().ok()?; + result + } + + fn reverse_lookup_task_cache(&self, task_id: TaskId) -> Option> { + let tx = self.env.begin_ro_txn().ok()?; + let result = tx + .get(self.reverse_task_cache_db, &(*task_id).to_be_bytes()) + .ok() + .and_then(|v| v.try_into().ok()) + .and_then(|v: [u8; 4]| bincode::deserialize(&v).ok()); + tx.commit().ok()?; + result + } + + fn lookup_data(&self, task_id: TaskId) -> Vec { + fn lookup(this: &LmdbBackingStorage, task_id: TaskId) -> Result> { + let tx = this.env.begin_ro_txn()?; + let bytes = tx.get(this.data_db, &IntKey::new(*task_id))?; + let result = bincode::deserialize(bytes)?; + tx.commit()?; + Ok(result) + } + lookup(self, task_id).unwrap_or_default() + } +} diff --git a/turbopack/crates/turbo-tasks-backend/src/utils/chunked_vec.rs b/turbopack/crates/turbo-tasks-backend/src/utils/chunked_vec.rs index 46292f79e5e72..c5e2014715e29 100644 --- a/turbopack/crates/turbo-tasks-backend/src/utils/chunked_vec.rs +++ b/turbopack/crates/turbo-tasks-backend/src/utils/chunked_vec.rs @@ -2,18 +2,25 @@ pub struct ChunkedVec { chunks: Vec>, } +impl Default for ChunkedVec { + fn default() -> Self { + Self::new() + } +} + impl ChunkedVec { pub fn new() -> Self { Self { chunks: Vec::new() } } pub fn len(&self) -> usize { - if let Some(last) = self.chunks.last() { - let free = last.capacity() - self.len(); - cummulative_chunk_size(self.chunks.len() - 1) - free - } else { - 0 + for (i, chunk) in self.chunks.iter().enumerate().rev() { + if !chunk.is_empty() { + let free = chunk.capacity() - chunk.len(); + return cummulative_chunk_size(i) - free; + } } + 0 } pub fn push(&mut self, item: T) { @@ -42,6 +49,10 @@ impl ChunkedVec { len: self.len(), } } + + pub fn is_empty(&self) -> bool { + self.chunks.first().map_or(true, |chunk| chunk.is_empty()) + } } fn chunk_size(chunk_index: usize) -> usize { diff --git a/turbopack/crates/turbo-tasks-backend/src/utils/ptr_eq_arc.rs b/turbopack/crates/turbo-tasks-backend/src/utils/ptr_eq_arc.rs index 87543c0399031..139c40d7b8f94 100644 --- a/turbopack/crates/turbo-tasks-backend/src/utils/ptr_eq_arc.rs +++ b/turbopack/crates/turbo-tasks-backend/src/utils/ptr_eq_arc.rs @@ -10,6 +10,10 @@ impl PtrEqArc { pub fn new(value: T) -> Self { Self(Arc::new(value)) } + + pub fn arc(&self) -> &Arc { + &self.0 + } } impl From> for PtrEqArc { diff --git a/turbopack/crates/turbo-tasks-backend/tests/test_config.trs b/turbopack/crates/turbo-tasks-backend/tests/test_config.trs index 7387c44aaf3dd..71510aae294a4 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/test_config.trs +++ b/turbopack/crates/turbo-tasks-backend/tests/test_config.trs @@ -1,3 +1,18 @@ -|_name, _initial| { - turbo_tasks::TurboTasks::new(turbo_tasks_backend::TurboTasksBackend::new()) +|_name, initial | { + let path = std::path::PathBuf::from(concat!( + env!("OUT_DIR"), + "/.cache/", + module_path!(), + )); + if initial { + let _ = std::fs::remove_dir_all(&path); + } + std::fs::create_dir_all(&path).unwrap(); + turbo_tasks::TurboTasks::new( + turbo_tasks_backend::TurboTasksBackend::new( + Arc::new(turbo_tasks_backend::LmdbBackingStorage::new( + &path.as_path() + ).unwrap()) + ) + ) } diff --git a/turbopack/crates/turbo-tasks-testing/src/lib.rs b/turbopack/crates/turbo-tasks-testing/src/lib.rs index 62116f98444d4..e7412c3f8f861 100644 --- a/turbopack/crates/turbo-tasks-testing/src/lib.rs +++ b/turbopack/crates/turbo-tasks-testing/src/lib.rs @@ -297,6 +297,10 @@ impl TurboTasksApi for VcStorage { ) -> std::pin::Pin> + Send + 'static>> { unimplemented!() } + + fn stop_and_wait(&self) -> std::pin::Pin + Send + 'static>> { + Box::pin(async {}) + } } impl VcStorage { diff --git a/turbopack/crates/turbo-tasks-testing/src/run.rs b/turbopack/crates/turbo-tasks-testing/src/run.rs index c01c52b2b4f3a..dc0195ad3cf1a 100644 --- a/turbopack/crates/turbo-tasks-testing/src/run.rs +++ b/turbopack/crates/turbo-tasks-testing/src/run.rs @@ -102,6 +102,7 @@ where let tt = registration.create_turbo_tasks(&name, false); println!("Run #3 (with persistent cache if available, new TurboTasks instance)"); let third = run_once(tt.clone(), fut()).await?; + tt.stop_and_wait().await; assert_eq!(first, third); Ok(()) } diff --git a/turbopack/crates/turbo-tasks-testing/tests/basic.rs b/turbopack/crates/turbo-tasks-testing/tests/basic.rs index 84a56237e3193..28606acd3c563 100644 --- a/turbopack/crates/turbo-tasks-testing/tests/basic.rs +++ b/turbopack/crates/turbo-tasks-testing/tests/basic.rs @@ -13,9 +13,12 @@ async fn basic() { assert_eq!(output1.await?.value, 123); let input = Value { value: 42 }.cell(); - let output2 = func(input); + let output2 = func_transient(input); assert_eq!(output2.await?.value, 42); + let output3 = func_persistent(output1); + assert_eq!(output3.await?.value, 123); + anyhow::Ok(()) }) .await @@ -28,13 +31,22 @@ struct Value { } #[turbo_tasks::function] -async fn func(input: Vc) -> Result> { +async fn func_transient(input: Vc) -> Result> { + println!("func_transient"); + let value = input.await?.value; + Ok(Value { value }.cell()) +} + +#[turbo_tasks::function] +async fn func_persistent(input: Vc) -> Result> { + println!("func_persistent"); let value = input.await?.value; Ok(Value { value }.cell()) } #[turbo_tasks::function] async fn func_without_args() -> Result> { + println!("func_without_args"); let value = 123; Ok(Value { value }.cell()) } diff --git a/turbopack/crates/turbo-tasks/src/backend.rs b/turbopack/crates/turbo-tasks/src/backend.rs index a0d92eb16bcba..33ffa0bbb7f06 100644 --- a/turbopack/crates/turbo-tasks/src/backend.rs +++ b/turbopack/crates/turbo-tasks/src/backend.rs @@ -248,7 +248,7 @@ mod ser { s.serialize_element::(&0)?; s.serialize_element(&FunctionAndArg::Borrowed { fn_type: *fn_type, - arg, + arg: &**arg, })?; s.serialize_element(this)?; s.end() diff --git a/turbopack/crates/turbo-tasks/src/lib.rs b/turbopack/crates/turbo-tasks/src/lib.rs index d0a8787338040..9f867391fa357 100644 --- a/turbopack/crates/turbo-tasks/src/lib.rs +++ b/turbopack/crates/turbo-tasks/src/lib.rs @@ -100,7 +100,7 @@ pub use raw_vc::{CellId, RawVc, ReadRawVcFuture, ResolveTypeError}; pub use read_ref::ReadRef; use rustc_hash::FxHasher; pub use state::State; -pub use task::{task_input::TaskInput, SharedReference}; +pub use task::{task_input::TaskInput, SharedReference, TypedSharedReference}; pub use trait_ref::{IntoTraitRef, TraitRef}; pub use turbo_tasks_macros::{function, value, value_impl, value_trait, KeyValuePair, TaskInput}; pub use value::{TransientInstance, TransientValue, Value}; diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index e91ce81042ec2..ec980a93de4db 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -161,6 +161,8 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send { &self, f: Pin> + Send + 'static>>, ) -> Pin> + Send + 'static>>; + + fn stop_and_wait(&self) -> Pin + Send>>; } /// A wrapper around a value that is unused. @@ -1181,6 +1183,13 @@ impl TurboTasksApi for TurboTasks { ), )) } + + fn stop_and_wait(&self) -> Pin + Send + 'static>> { + let this = self.pin(); + Box::pin(async move { + this.stop_and_wait().await; + }) + } } impl TurboTasksBackendApi for TurboTasks { @@ -1197,6 +1206,7 @@ impl TurboTasksBackendApi for TurboTasks { this.backend.run_backend_job(id, &*this).await; }) } + #[track_caller] fn schedule_backend_foreground_job(&self, id: BackendJobId) { self.schedule_foreground_job(move |this| async move { diff --git a/turbopack/crates/turbo-tasks/src/task/mod.rs b/turbopack/crates/turbo-tasks/src/task/mod.rs index 963a3a74a99d1..971eb28623611 100644 --- a/turbopack/crates/turbo-tasks/src/task/mod.rs +++ b/turbopack/crates/turbo-tasks/src/task/mod.rs @@ -4,6 +4,6 @@ pub(crate) mod task_input; pub(crate) mod task_output; pub use function::{AsyncFunctionMode, FunctionMode, IntoTaskFn, TaskFn}; -pub use shared_reference::SharedReference; +pub use shared_reference::{SharedReference, TypedSharedReference}; pub use task_input::TaskInput; pub use task_output::TaskOutput;