Skip to content

Commit

Permalink
Make CURRENT_TASK_STATE shareable across multiple tokio tasks / threa…
Browse files Browse the repository at this point in the history
…ds, wait for detached work to finish before exiting a turbo task
  • Loading branch information
bgw committed Aug 16, 2024
1 parent 12f2fa7 commit 3a78b7a
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 77 deletions.
44 changes: 17 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ tempfile = "3.3.0"
thiserror = "1.0.48"
tiny-gradient = "0.1.0"
tokio = "1.25.0"
tokio-util = { version = "0.7.7", features = ["io"] }
tokio-util = { version = "0.7.11", features = ["io", "rt"] }
tracing = "0.1.37"
tracing-subscriber = "0.3.16"
triomphe = { git = "https://github.com/sokra/triomphe", branch = "sokra/unstable" }
Expand Down
1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks-memory/tests/detached.rs
54 changes: 54 additions & 0 deletions turbopack/crates/turbo-tasks-testing/tests/detached.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#![feature(arbitrary_self_types)]

use tokio::{
sync::{watch, Notify},
time::{timeout, Duration},
};
use turbo_tasks::{turbo_tasks, Completion, TransientInstance, Vc};
use turbo_tasks_testing::{register, run, Registration};

static REGISTRATION: Registration = register!();

#[tokio::test]
async fn test_spawns_detached() -> anyhow::Result<()> {
run(&REGISTRATION, || async {
let notify = TransientInstance::new(Notify::new());
let (tx, mut rx) = watch::channel(None);

// create the task
let out_vc = spawns_detached(notify.clone(), TransientInstance::new(tx));

// see that the task does not exit yet
timeout(Duration::from_millis(100), out_vc.strongly_consistent())
.await
.expect_err("should wait on the detached task");

// let the detached future exit
notify.notify_waiters();

// it should send us back a cell
let detached_vc: Vc<u32> = rx.wait_for(|opt| opt.is_some()).await.unwrap().unwrap();
assert_eq!(*detached_vc.await.unwrap(), 42);

// the parent task should now be able to exit
out_vc.strongly_consistent().await.unwrap();

Ok(())
})
.await
}

#[turbo_tasks::function]
fn spawns_detached(
notify: TransientInstance<Notify>,
sender: TransientInstance<watch::Sender<Option<Vc<u32>>>>,
) -> Vc<Completion> {
tokio::spawn(turbo_tasks().detached_for_testing(Box::pin(async move {
notify.notified().await;
// creating cells after the normal lifetime of the task should be okay, as the parent task
// is waiting on us before exiting!
sender.send(Some(Vc::cell(42))).unwrap();
Ok(())
})));
Completion::new()
}
1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ serde_json = { workspace = true }
serde_regex = "1.1.0"
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
triomphe = { workspace = true, features = ["unsize", "unstable"] }
turbo-tasks-hash = { workspace = true }
Expand Down
Loading

0 comments on commit 3a78b7a

Please sign in to comment.