Skip to content

Commit

Permalink
refactor(turbo-tasks) Add a higher-level task-local state API for the…
Browse files Browse the repository at this point in the history
… Backend trait (#68996)

This replaces the `Backend::execution_scope` API, which wouldn't work
for detached tasks or tasks using `local_cells`, as we need to share
this backend state across multiple spawned futures.

This higher-level API gives the manager control over where the state is
stored.

## Testing

```
cargo nextest r -p turbo-tasks-memory
```
  • Loading branch information
bgw authored Sep 4, 2024
1 parent 568df76 commit 38d8301
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 75 deletions.
29 changes: 15 additions & 14 deletions turbopack/crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
borrow::{Borrow, Cow},
cell::RefCell,
future::Future,
hash::{BuildHasher, BuildHasherDefault, Hash},
num::NonZeroU32,
Expand All @@ -16,7 +15,6 @@ use anyhow::{anyhow, bail, Result};
use auto_hash_map::AutoMap;
use dashmap::{mapref::entry::Entry, DashMap};
use rustc_hash::FxHasher;
use tokio::task::futures::TaskLocalFuture;
use tracing::trace_span;
use turbo_prehash::{BuildHasherExt, PassThroughHash, PreHashed};
use turbo_tasks::{
Expand All @@ -37,14 +35,20 @@ use crate::{
PERCENTAGE_MIN_IDLE_TARGET_MEMORY, PERCENTAGE_MIN_TARGET_MEMORY,
},
output::Output,
task::{ReadCellError, Task, TaskType, DEPENDENCIES_TO_TRACK},
task::{ReadCellError, Task, TaskType},
task_statistics::TaskStatisticsApi,
};

/// Pre-hashes `task_type` with a default-constructed `BuildHasherDefault<FxHasher>` and returns
/// the resulting [`PreHashed`] wrapper.
fn prehash_task_type(task_type: CachedTaskType) -> PreHashed<CachedTaskType> {
    let build_hasher = BuildHasherDefault::<FxHasher>::default();
    build_hasher.prehash(task_type)
}

/// Task-local state kept by the memory backend while a task executes.
///
/// A fresh value (with an empty edge set) is produced by `new_task_state` in the `Backend`
/// implementation for `MemoryBackend`.
pub struct TaskState {
/// Cells/Outputs/Collectibles that are read during task execution. These will be stored as
/// dependencies when the execution has finished.
///
/// Edges are inserted by `Task::add_dependency_to_current` and the whole set is moved out with
/// `std::mem::take` once the execution has completed.
pub dependencies_to_track: TaskEdgesSet,
}

pub struct MemoryBackend {
persistent_tasks: NoMoveVec<Task, 13>,
transient_tasks: NoMoveVec<Task, 10>,
Expand Down Expand Up @@ -436,14 +440,11 @@ impl Backend for MemoryBackend {
self.with_task(task, |task| task.get_description())
}

type ExecutionScopeFuture<T: Future<Output = Result<()>> + Send + 'static> =
TaskLocalFuture<RefCell<TaskEdgesSet>, T>;
fn execution_scope<T: Future<Output = Result<()>> + Send + 'static>(
&self,
_task: TaskId,
future: T,
) -> Self::ExecutionScopeFuture<T> {
DEPENDENCIES_TO_TRACK.scope(RefCell::new(TaskEdgesSet::new()), future)
type TaskState = TaskState;
fn new_task_state(&self, _task: TaskId) -> Self::TaskState {
TaskState {
dependencies_to_track: TaskEdgesSet::new(),
}
}

fn try_start_task_execution<'a>(
Expand Down Expand Up @@ -529,7 +530,7 @@ impl Backend for MemoryBackend {
move || format!("reading task output from {reader}"),
turbo_tasks,
|output| {
Task::add_dependency_to_current(TaskEdge::Output(task));
Task::add_dependency_to_current(TaskEdge::Output(task), turbo_tasks);
output.read(reader)
},
)
Expand Down Expand Up @@ -564,7 +565,7 @@ impl Backend for MemoryBackend {
})
.into_typed(index.type_id)))
} else {
Task::add_dependency_to_current(TaskEdge::Cell(task_id, index));
Task::add_dependency_to_current(TaskEdge::Cell(task_id, index), turbo_tasks);
self.with_task(task_id, |task| {
match task.read_cell(
index,
Expand Down Expand Up @@ -623,7 +624,7 @@ impl Backend for MemoryBackend {
reader: TaskId,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> TaskCollectiblesMap {
Task::add_dependency_to_current(TaskEdge::Collectibles(id, trait_id));
Task::add_dependency_to_current(TaskEdge::Collectibles(id, trait_id), turbo_tasks);
Task::read_collectibles(id, trait_id, reader, self, turbo_tasks)
}

Expand Down
25 changes: 10 additions & 15 deletions turbopack/crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
borrow::Cow,
cell::RefCell,
fmt::{self, Debug, Display, Formatter},
future::Future,
hash::{BuildHasherDefault, Hash},
Expand All @@ -17,14 +16,13 @@ use either::Either;
use parking_lot::{Mutex, RwLock};
use rustc_hash::FxHasher;
use smallvec::SmallVec;
use tokio::task_local;
use tracing::Span;
use turbo_prehash::PreHashed;
use turbo_tasks::{
backend::{CachedTaskType, CellContent, TaskCollectiblesMap, TaskExecutionSpec},
event::{Event, EventListener},
get_invalidator, registry, CellId, Invalidator, RawVc, ReadConsistency, TaskId, TaskIdSet,
TraitTypeId, TurboTasksBackendApi, ValueTypeId,
TraitTypeId, TurboTasksBackendApi, TurboTasksBackendApiExt, ValueTypeId,
};

use crate::{
Expand All @@ -45,12 +43,6 @@ pub type NativeTaskFn = Box<dyn Fn() -> NativeTaskFuture + Send + Sync>;
mod aggregation;
mod meta_state;

task_local! {
/// Cells/Outputs/Collectibles that are read during task execution
/// These will be stored as dependencies when the execution has finished
pub(crate) static DEPENDENCIES_TO_TRACK: RefCell<TaskEdgesSet>;
}

type OnceTaskFn = Mutex<Option<Pin<Box<dyn Future<Output = Result<RawVc>> + Send + 'static>>>>;

/// Different Task types
Expand Down Expand Up @@ -966,7 +958,8 @@ impl Task {
let mut change_job = None;
let mut remove_job = None;
let mut drained_cells = SmallVec::<[Cell; 8]>::new();
let dependencies = DEPENDENCIES_TO_TRACK.with(|deps| deps.take());
let dependencies = turbo_tasks
.write_task_state(|deps| std::mem::take(&mut deps.dependencies_to_track));
{
let mut state = self.full_state_mut();

Expand Down Expand Up @@ -1343,11 +1336,13 @@ impl Task {
}
}

pub(crate) fn add_dependency_to_current(dep: TaskEdge) {
DEPENDENCIES_TO_TRACK.with(|list| {
let mut list = list.borrow_mut();
list.insert(dep);
})
pub(crate) fn add_dependency_to_current(
dep: TaskEdge,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) {
turbo_tasks.write_task_state(|ts| {
ts.dependencies_to_track.insert(dep);
});
}

/// Get an [Invalidator] that can be used to invalidate the current [Task]
Expand Down
131 changes: 110 additions & 21 deletions turbopack/crates/turbo-tasks-testing/tests/detached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,142 @@

use tokio::{
sync::{watch, Notify},
time::{timeout, Duration},
time::{sleep, timeout, Duration},
};
use turbo_tasks::{turbo_tasks, Completion, TransientInstance, Vc};
use turbo_tasks::{turbo_tasks, State, TransientInstance, Vc};
use turbo_tasks_testing::{register, run, Registration};

static REGISTRATION: Registration = register!();

#[tokio::test]
async fn test_spawns_detached() -> anyhow::Result<()> {
run(&REGISTRATION, || async {
let notify = TransientInstance::new(Notify::new());
let (tx, mut rx) = watch::channel(None);
// timeout: prevent the test from hanging, and fail instead if this is broken
timeout(Duration::from_secs(5), async {
let notify = TransientInstance::new(Notify::new());
let (tx, mut rx) = watch::channel(None);
let tx = TransientInstance::new(tx);

// create the task
let out_vc = spawns_detached(notify.clone(), TransientInstance::new(tx));
// create the task
let out_vc = spawns_detached(notify.clone(), tx.clone());

// see that the task does not exit yet
timeout(Duration::from_millis(100), out_vc.strongly_consistent())
.await
.expect_err("should wait on the detached task");
// see that the task does not exit yet
timeout(Duration::from_millis(100), out_vc.strongly_consistent())
.await
.expect_err("should wait on the detached task");

// let the detached future exit
notify.notify_waiters();
// let the detached future exit
notify.notify_waiters();

// it should send us back a cell
let detached_vc: Vc<u32> = rx.wait_for(|opt| opt.is_some()).await.unwrap().unwrap();
assert_eq!(*detached_vc.await.unwrap(), 42);
// it should send us back a cell
let detached_vc: Vc<u32> = rx.wait_for(|opt| opt.is_some()).await?.unwrap();
assert_eq!(*detached_vc.strongly_consistent().await?, 42);

// the parent task should now be able to exit
out_vc.strongly_consistent().await.unwrap();
// the parent task should now be able to exit
out_vc.strongly_consistent().await?;

Ok(())
Ok(())
})
.await?
})
.await
}

#[turbo_tasks::function]
fn spawns_detached(
async fn spawns_detached(
notify: TransientInstance<Notify>,
sender: TransientInstance<watch::Sender<Option<Vc<u32>>>>,
) -> Vc<Completion> {
) -> Vc<()> {
tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(async move {
notify.notified().await;
// creating cells after the normal lifetime of the task should be okay, as the parent task
// is waiting on us before exiting!
sender.send(Some(Vc::cell(42))).unwrap();
Ok(())
})));
Completion::new()
Vc::cell(())
}

/// Checks that a task whose inputs are read both directly and from nested detached futures is
/// executed again when either input changes.
#[tokio::test]
async fn test_spawns_detached_changing() -> anyhow::Result<()> {
    run(&REGISTRATION, || async {
        // Cap the scenario at five seconds so that a regression fails instead of hanging.
        timeout(Duration::from_secs(5), async {
            let (raw_sender, mut receiver) = watch::channel(None);
            let sender = TransientInstance::new(raw_sender);

            // input that is consumed inside the detached future
            let detached_input = ChangingInput {
                state: State::new(42),
            }
            .cell();

            // input that is consumed directly by the outer task
            let outer_input = ChangingInput {
                state: State::new(0),
            }
            .cell();

            // start the task under test
            let task_vc = spawns_detached_changing(sender.clone(), detached_input, outer_input);

            // the detached future is expected to hand a cell back over the channel
            let cell_from_detached: Vc<u32> = receiver
                .wait_for(|slot| slot.is_some())
                .await
                .unwrap()
                .unwrap();
            assert_eq!(*cell_from_detached.strongly_consistent().await.unwrap(), 42);

            // at this point the parent task should be able to finish
            task_vc.strongly_consistent().await.unwrap();

            // updating either input should invalidate the task and make it run again
            detached_input.await.unwrap().state.set(43);
            task_vc.strongly_consistent().await.unwrap();
            assert_eq!(*cell_from_detached.strongly_consistent().await.unwrap(), 43);

            outer_input.await.unwrap().state.set(44);
            assert_eq!(*task_vc.strongly_consistent().await.unwrap(), 44);

            Ok(())
        })
        .await?
    })
    .await
}

/// Test fixture: a value wrapping a mutable [`State`] so that a test can change a task's input
/// after the task has already run.
#[turbo_tasks::value]
struct ChangingInput {
/// Current input value; read through `read_changing_input` and updated by the test with
/// `state.set(..)`.
state: State<u32>,
}

/// Test task that spawns a detached future which, in turn, spawns a second detached future.
///
/// The innermost future reads `changing_input_detached` through `read_changing_input`, puts the
/// value into a new cell and sends that cell through `sender`. The task itself returns a cell
/// holding the value read from `changing_input_outer`.
#[turbo_tasks::function]
async fn spawns_detached_changing(
sender: TransientInstance<watch::Sender<Option<Vc<u32>>>>,
changing_input_detached: Vc<ChangingInput>,
changing_input_outer: Vc<ChangingInput>,
) -> Vc<u32> {
let tt = turbo_tasks();
tokio::spawn(tt.clone().detached_for_testing(Box::pin(async move {
sleep(Duration::from_millis(100)).await;
// nested detached_for_testing calls should work
tokio::spawn(tt.clone().detached_for_testing(Box::pin(async move {
sleep(Duration::from_millis(100)).await;
// creating cells after the normal lifetime of the task should be okay, as the parent
// task is waiting on us before exiting!
sender
.send(Some(Vc::cell(
*read_changing_input(changing_input_detached).await.unwrap(),
)))
.unwrap();
Ok(())
})));
Ok(())
})));
// The outer input is read directly by the task body (not from a detached future).
Vc::cell(*read_changing_input(changing_input_outer).await.unwrap())
}

// `spawns_detached_changing` should take a dependency on this function for each input
#[turbo_tasks::function]
async fn read_changing_input(changing_input: Vc<ChangingInput>) -> Vc<u32> {
    let input = changing_input.await.unwrap();
    // when `state.set` is called on the input, it will trigger an invalidator for this task
    let current = *input.state.get();
    Vc::cell(current)
}
29 changes: 21 additions & 8 deletions turbopack/crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,15 +441,28 @@ pub trait Backend: Sync + Send {

fn get_task_description(&self, task: TaskId) -> String;

type ExecutionScopeFuture<T: Future<Output = Result<()>> + Send + 'static>: Future<Output = Result<()>>
+ Send
+ 'static;
/// Task-local state that is stored inside of [`TurboTasksBackendApi`]. Constructed with
/// [`Self::new_task_state`].
///
/// This value can later be written to or read from using
/// [`crate::TurboTasksBackendApiExt::write_task_state`] or
/// [`crate::TurboTasksBackendApiExt::read_task_state`].
///
/// This data may be shared across multiple threads (must be `Sync`) in order to support
/// detached futures ([`crate::TurboTasksApi::detached_for_testing`]) and [pseudo-tasks using
/// `local_cells`][crate::function]. A [`RwLock`][std::sync::RwLock] is used to provide
/// concurrent access.
type TaskState: Send + Sync + 'static;

fn execution_scope<T: Future<Output = Result<()>> + Send + 'static>(
&self,
task: TaskId,
future: T,
) -> Self::ExecutionScopeFuture<T>;
/// Constructs a new task-local [`Self::TaskState`] for the given `task_id`.
///
/// If a task is re-executed (e.g. because it is invalidated), this function will be called
/// again with the same [`TaskId`].
///
/// This value can be written to or read from using
/// [`crate::TurboTasksBackendApiExt::write_task_state`] and
/// [`crate::TurboTasksBackendApiExt::read_task_state`]
fn new_task_state(&self, task: TaskId) -> Self::TaskState;

fn try_start_task_execution<'a>(
&'a self,
Expand Down
3 changes: 2 additions & 1 deletion turbopack/crates/turbo-tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ pub use manager::{
dynamic_call, dynamic_this_call, emit, get_invalidator, mark_finished, mark_stateful,
prevent_gc, run_once, run_once_with_reason, spawn_blocking, spawn_thread, trait_call,
turbo_tasks, CurrentCellRef, Invalidator, ReadConsistency, TaskPersistence, TurboTasks,
TurboTasksApi, TurboTasksBackendApi, TurboTasksCallApi, Unused, UpdateInfo,
TurboTasksApi, TurboTasksBackendApi, TurboTasksBackendApiExt, TurboTasksCallApi, Unused,
UpdateInfo,
};
pub use native_function::{FunctionMeta, NativeFunction};
pub use raw_vc::{CellId, RawVc, ReadRawVcFuture, ResolveTypeError};
Expand Down
Loading

0 comments on commit 38d8301

Please sign in to comment.