add lmdb persisting and restoring
sokra committed Aug 13, 2024
1 parent 5103d42 commit 47baaa6
Showing 21 changed files with 519 additions and 74 deletions.
50 changes: 26 additions & 24 deletions Cargo.lock

(Generated lockfile; diff not rendered.)

2 changes: 2 additions & 0 deletions turbopack/crates/turbo-tasks-backend/Cargo.toml
@@ -16,8 +16,10 @@ workspace = true
anyhow = { workspace = true }
async-trait = { workspace = true }
auto-hash-map = { workspace = true }
+bincode = "1.3.3"
dashmap = { workspace = true }
indexmap = { workspace = true }
+lmdb = "0.8.0"
once_cell = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
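The two new dependencies carry the feature: `lmdb` provides the memory-mapped key-value store that backs persistence, and `bincode` encodes the cached structures into the byte slices LMDB stores. A minimal sketch of how the pair composes, with an illustrative database name and key/value shape rather than this backend's actual schema (assumes `serde` with the derive feature and `anyhow` alongside):

```rust
// Sketch: persist and restore one value through lmdb + bincode.
// The database name, key, and TaskData shape are illustrative only.
use std::{fs, path::Path};

use lmdb::{DatabaseFlags, Environment, Transaction, WriteFlags};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct TaskData {
    output: Option<String>,
}

fn main() -> anyhow::Result<()> {
    // LMDB maps a fixed-size file; the directory must exist up front.
    fs::create_dir_all(".cache")?;
    let env = Environment::new()
        .set_max_dbs(4)
        .set_map_size(1024 * 1024 * 1024)
        .open(Path::new(".cache"))?;
    let db = env.create_db(Some("tasks"), DatabaseFlags::empty())?;

    // Persist: bincode-encode the value and write it under a binary key.
    let key = 42u64.to_be_bytes();
    let value = bincode::serialize(&TaskData {
        output: Some("ok".into()),
    })?;
    let mut txn = env.begin_rw_txn()?;
    txn.put(db, &key, &value, WriteFlags::empty())?;
    txn.commit()?;

    // Restore: read the bytes back and decode them.
    let txn = env.begin_ro_txn()?;
    let restored: TaskData = bincode::deserialize(txn.get(db, &key)?)?;
    println!("restored: {restored:?}");
    Ok(())
}
```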
130 changes: 112 additions & 18 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -4,15 +4,16 @@ mod storage

use std::{
    borrow::Cow,
-    collections::HashSet,
+    collections::{HashMap, HashSet},
    future::Future,
    hash::BuildHasherDefault,
    mem::take,
    pin::Pin,
    sync::{
-        atomic::{AtomicUsize, Ordering},
+        atomic::{AtomicU64, AtomicUsize, Ordering},
        Arc,
    },
-    time::Duration,
+    time::{Duration, Instant},
};

use anyhow::Result;
@@ -36,6 +37,7 @@ use turbo_tasks::{

use self::{operation::ExecuteContext, storage::Storage};
use crate::{
+    backing_storage::BackingStorage,
    data::{
        CachedDataItem, CachedDataItemKey, CachedDataItemValue, CachedDataUpdate, CellRef,
        InProgressState, OutputValue, RootType,
@@ -61,6 +63,8 @@ impl SnapshotRequest {
}

pub struct TurboTasksBackend {
+    start_time: Instant,
+
    persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
    transient_task_id_factory: IdFactoryWithReuse<TaskId>,

@@ -86,19 +90,18 @@ pub struct TurboTasksBackend {
    /// Condition Variable that is triggered when a snapshot is completed and
    /// operations can continue.
    snapshot_completed: Condvar,
-}
+    /// The timestamp of the last started snapshot.
+    last_snapshot: AtomicU64,

-impl Default for TurboTasksBackend {
-    fn default() -> Self {
-        Self::new()
-    }
+    backing_storage: Arc<dyn BackingStorage + Sync + Send>,
}

impl TurboTasksBackend {
-    pub fn new() -> Self {
+    pub fn new(backing_storage: Arc<dyn BackingStorage + Sync + Send>) -> Self {
        Self {
+            start_time: Instant::now(),
            persisted_task_id_factory: IdFactoryWithReuse::new_with_range(
-                1,
+                *backing_storage.next_free_task_id() as u64,
                (TRANSIENT_TASK_BIT - 1) as u64,
            ),
            transient_task_id_factory: IdFactoryWithReuse::new_with_range(
@@ -114,6 +117,8 @@ impl TurboTasksBackend {
            snapshot_request: Mutex::new(SnapshotRequest::new()),
            operations_suspended: Condvar::new(),
            snapshot_completed: Condvar::new(),
+            last_snapshot: AtomicU64::new(0),
+            backing_storage,
        }
    }
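Two constructor details make restoring possible: the persisted id factory now starts at whatever id the backing storage handed out last, so restored tasks keep their ids across runs, while transient ids live in a disjoint range above TRANSIENT_TASK_BIT. A standalone sketch of that split; the bit value here is an assumption for illustration, not read from the crate:

```rust
// Assumed value: one high bit partitions the 32-bit task-id space.
const TRANSIENT_TASK_BIT: u32 = 1 << 31;

fn is_transient(task_id: u32) -> bool {
    task_id & TRANSIENT_TASK_BIT != 0
}

fn main() {
    // Persisted ids resume below the bit, picking up where the last
    // run stopped; transient ids restart above it on every run.
    let next_free_task_id = 100; // e.g. restored from backing storage
    assert!(!is_transient(next_free_task_id));
    assert!(is_transient(TRANSIENT_TASK_BIT | 1));
    println!("id ranges partitioned");
}
```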

@@ -288,14 +293,78 @@ impl TurboTasksBackend {
                    reader_task.add(CachedDataItem::CellDependency { target, value: () });
                }
            }
-            return Ok(Ok(CellContent(Some(content)).into_typed(cell.type_id)));
+            return Ok(Ok(TypedCellContent(
+                cell.type_id,
+                CellContent(Some(content.1)),
+            )));
        }

        todo!("Cell {cell:?} is not available, recompute task or error: {task:#?}");
    }

+    fn snapshot(&self) -> Option<Instant> {
+        let mut snapshot_request = self.snapshot_request.lock();
+        snapshot_request.snapshot_requested = true;
+        let active_operations = self
+            .in_progress_operations
+            .fetch_or(SNAPSHOT_REQUESTED_BIT, std::sync::atomic::Ordering::Relaxed);
+        if active_operations != 0 {
+            self.operations_suspended
+                .wait_while(&mut snapshot_request, |_| {
+                    self.in_progress_operations
+                        .load(std::sync::atomic::Ordering::Relaxed)
+                        != SNAPSHOT_REQUESTED_BIT
+                });
+        }
+        let suspended_operations = snapshot_request
+            .suspended_operations
+            .iter()
+            .map(|op| op.arc().clone())
+            .collect::<Vec<_>>();
+        drop(snapshot_request);
+        let persisted_storage_log = take(&mut *self.persisted_storage_log.lock());
+        let persisted_task_cache_log = take(&mut *self.persisted_task_cache_log.lock());
+        let mut snapshot_request = self.snapshot_request.lock();
+        snapshot_request.snapshot_requested = false;
+        self.in_progress_operations
+            .fetch_sub(SNAPSHOT_REQUESTED_BIT, std::sync::atomic::Ordering::Relaxed);
+        self.snapshot_completed.notify_all();
+        let snapshot_time = Instant::now();
+        drop(snapshot_request);
+
+        let mut counts: HashMap<TaskId, u32> = HashMap::new();
+        for CachedDataUpdate { task, .. } in persisted_storage_log.iter() {
+            *counts.entry(*task).or_default() += 1;
+        }
+
+        if !persisted_task_cache_log.is_empty() || !persisted_storage_log.is_empty() {
+            if let Err(err) = self.backing_storage.save_snapshot(
+                suspended_operations,
+                persisted_task_cache_log,
+                persisted_storage_log,
+            ) {
+                println!("Persisting failed: {:#?}", err);
+                return None;
+            }
+            println!("Snapshot saved");
+        }
+
+        for (task_id, count) in counts {
+            self.storage
+                .access_mut(task_id)
+                .persistance_state
+                .finish_persisting_items(count);
+        }
+
+        Some(snapshot_time)
+    }
}
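`snapshot` coordinates with running operations through a single atomic word: the operation count and the high `SNAPSHOT_REQUESTED_BIT` share it, and the snapshotting thread waits on a condvar until the counter drains to exactly the bit. Below is a self-contained sketch of that handshake using `std` primitives (the real code uses `parking_lot`, and suspends operations at safe points rather than rejecting them; names and the bit value are illustrative):

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Condvar, Mutex,
};

// High bit doubles as the "snapshot requested" flag; the low bits
// count in-flight operations.
const SNAPSHOT_REQUESTED_BIT: usize = 1 << (usize::BITS - 1);

struct Coordinator {
    in_progress: AtomicUsize,
    lock: Mutex<()>,
    all_suspended: Condvar,
}

impl Coordinator {
    // Worker side: back off if a snapshot is pending.
    fn try_start_operation(&self) -> bool {
        let prev = self.in_progress.fetch_add(1, Ordering::Relaxed);
        if prev & SNAPSHOT_REQUESTED_BIT != 0 {
            self.finish_operation(); // undo our increment
            return false;
        }
        true
    }

    fn finish_operation(&self) {
        if self.in_progress.fetch_sub(1, Ordering::Relaxed) - 1 == SNAPSHOT_REQUESTED_BIT {
            // Last operation out: wake the snapshotter. Taking the lock
            // first closes the gap between its check and its wait.
            let _guard = self.lock.lock().unwrap();
            self.all_suspended.notify_all();
        }
    }

    // Snapshot side: set the bit, then wait until only the bit remains.
    fn quiesce(&self) {
        self.in_progress.fetch_or(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        let mut guard = self.lock.lock().unwrap();
        while self.in_progress.load(Ordering::Relaxed) != SNAPSHOT_REQUESTED_BIT {
            guard = self.all_suspended.wait(guard).unwrap();
        }
        // ... snapshot the quiescent state here ...
        self.in_progress.fetch_sub(SNAPSHOT_REQUESTED_BIT, Ordering::Relaxed);
        drop(guard);
    }
}

fn main() {
    let c = Coordinator {
        in_progress: AtomicUsize::new(0),
        lock: Mutex::new(()),
        all_suspended: Condvar::new(),
    };
    assert!(c.try_start_operation());
    c.finish_operation();
    c.quiesce(); // trivially succeeds: nothing in flight
}
```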

impl Backend for TurboTasksBackend {
+    fn startup(&self, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
+        turbo_tasks.schedule_backend_background_job(BackendJobId::from(1));
+    }
+
    fn get_or_create_persistent_task(
        &self,
        task_type: CachedTaskType,
@@ -307,6 +376,12 @@
            return task_id;
        }

+        if let Some(task_id) = self.backing_storage.forward_lookup_task_cache(&task_type) {
+            let _ = self.task_cache.try_insert(Arc::new(task_type), task_id);
+            self.connect_child(parent_task, task_id, turbo_tasks);
+            return task_id;
+        }

        let task_type = Arc::new(task_type);
        let task_id = self.persisted_task_id_factory.get();
        if let Err(existing_task_id) = self.task_cache.try_insert(task_type.clone(), task_id) {
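The block added above makes task lookup a three-step resolution: the in-memory `task_cache` first, then the persisted cache via `forward_lookup_task_cache` (memoizing a hit back into memory), and only then a freshly allocated persistent id. A simplified sketch of that order, with stand-in types and a hypothetical `persisted_lookup` in place of the backing-storage call:

```rust
use std::{collections::HashMap, sync::Mutex};

type TaskId = u64;
type TaskType = String; // stand-in for CachedTaskType

struct Backend {
    memory_cache: Mutex<HashMap<TaskType, TaskId>>,
    next_id: Mutex<TaskId>,
}

impl Backend {
    // Hypothetical stand-in for backing_storage.forward_lookup_task_cache.
    fn persisted_lookup(&self, _ty: &TaskType) -> Option<TaskId> {
        None
    }

    fn get_or_create(&self, ty: TaskType) -> TaskId {
        // 1. In-memory cache: the common, cheap path.
        if let Some(&id) = self.memory_cache.lock().unwrap().get(&ty) {
            return id;
        }
        // 2. Persisted cache: a hit is memoized so the next lookup
        //    stays in memory.
        if let Some(id) = self.persisted_lookup(&ty) {
            self.memory_cache.lock().unwrap().insert(ty, id);
            return id;
        }
        // 3. Miss everywhere: allocate a fresh id and publish it
        //    (a concurrent insert of the same type would win instead).
        let mut next = self.next_id.lock().unwrap();
        let id = *next;
        *next += 1;
        *self.memory_cache.lock().unwrap().entry(ty).or_insert(id)
    }
}

fn main() {
    let backend = Backend {
        memory_cache: Mutex::new(HashMap::new()),
        next_id: Mutex::new(0),
    };
    let a = backend.get_or_create("read file".into());
    let b = backend.get_or_create("read file".into());
    assert_eq!(a, b); // cached on the second lookup
}
```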
@@ -652,12 +727,31 @@
        stale
    }

-    fn run_backend_job(
-        &self,
-        _: BackendJobId,
-        _: &dyn TurboTasksBackendApi<Self>,
-    ) -> Pin<Box<(dyn Future<Output = ()> + Send + 'static)>> {
-        todo!()
+    fn run_backend_job<'a>(
+        &'a self,
+        id: BackendJobId,
+        turbo_tasks: &'a dyn TurboTasksBackendApi<Self>,
+    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
+        Box::pin(async move {
+            if *id == 1 {
+                const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(1);
+
+                let last_snapshot = self.last_snapshot.load(Ordering::Relaxed);
+                let last_snapshot = self.start_time + Duration::from_millis(last_snapshot);
+                let elapsed = last_snapshot.elapsed();
+                if elapsed < SNAPSHOT_INTERVAL {
+                    tokio::time::sleep(SNAPSHOT_INTERVAL - elapsed).await;
+                }
+
+                if let Some(last_snapshot) = self.snapshot() {
+                    let last_snapshot = last_snapshot.duration_since(self.start_time);
+                    self.last_snapshot
+                        .store(last_snapshot.as_millis() as u64, Ordering::Relaxed);
+
+                    turbo_tasks.schedule_backend_background_job(id);
+                }
+            }
+        })
    }
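The job re-schedules itself after each successful snapshot, which yields the one-second cadence without a dedicated timer thread. Because an `Instant` cannot be stored atomically, the code keeps milliseconds-since-`start_time` in an `AtomicU64` and rebuilds the `Instant` on read; a minimal sketch of that conversion pattern:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

struct Clock {
    start_time: Instant,
    // Milliseconds since `start_time` of the last recorded event.
    last_event_ms: AtomicU64,
}

impl Clock {
    fn new() -> Self {
        Self {
            start_time: Instant::now(),
            last_event_ms: AtomicU64::new(0),
        }
    }

    fn record(&self, when: Instant) {
        let ms = when.duration_since(self.start_time).as_millis() as u64;
        self.last_event_ms.store(ms, Ordering::Relaxed);
    }

    fn last_event(&self) -> Instant {
        let ms = self.last_event_ms.load(Ordering::Relaxed);
        self.start_time + Duration::from_millis(ms)
    }
}

fn main() {
    let clock = Clock::new();
    clock.record(Instant::now());
    println!("since last event: {:?}", clock.last_event().elapsed());
}
```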

    fn try_read_task_output(
@@ -707,7 +801,7 @@ impl Backend for TurboTasksBackend {
        let ctx = self.execute_context(turbo_tasks);
        let task = ctx.task(task_id);
        if let Some(content) = get!(task, CellData { cell }) {
-            Ok(CellContent(Some(content.clone())).into_typed(cell.type_id))
+            Ok(CellContent(Some(content.1.clone())).into_typed(cell.type_id))
        } else {
            Ok(CellContent(None).into_typed(cell.type_id))
        }
@@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use turbo_tasks::TaskId;

use super::{ExecuteContext, Operation};
-use crate::data::CachedDataItem;
+use crate::data::{CachedDataItem, CachedDataItemKey};

#[derive(Serialize, Deserialize, Clone, Default)]
pub enum ConnectChildOperation {
@@ -37,11 +37,17 @@ impl Operation for ConnectChildOperation {
            ctx.operation_suspend_point(&self);
            match self {
                ConnectChildOperation::ScheduleTask { task_id } => {
+                    let mut should_schedule;
                    {
                        let mut task = ctx.task(task_id);
-                        task.add(CachedDataItem::new_scheduled(task_id));
+                        should_schedule = !task.has_key(&CachedDataItemKey::Output {});
+                        if should_schedule {
+                            should_schedule = task.add(CachedDataItem::new_scheduled(task_id));
+                        }
                    }
-                    ctx.schedule(task_id);
+                    if should_schedule {
+                        ctx.schedule(task_id);
+                    }

                    self = ConnectChildOperation::Done;
                    continue;
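The rewritten arm schedules a child only when two guards pass: the task has no cached `Output` yet (a finished task's result is simply reused), and the scheduled marker was actually newly inserted, assuming `task.add` returns `true` only when the item was not already present, so an already-queued task is never scheduled twice. A condensed sketch of the guard:

```rust
// `has_output` and `mark_scheduled` are illustrative stand-ins for
// has_key(&CachedDataItemKey::Output {}) and add(new_scheduled(..)).
fn should_schedule(has_output: bool, mark_scheduled: impl FnOnce() -> bool) -> bool {
    // Finished tasks are reused, and `mark_scheduled` reports whether
    // the scheduled flag was newly set.
    !has_output && mark_scheduled()
}

fn main() {
    assert!(!should_schedule(true, || true)); // output cached: reuse
    assert!(should_schedule(false, || true)); // first request: run it
    assert!(!should_schedule(false, || false)); // already queued: skip
}
```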
(Diffs for the remaining changed files not loaded.)
