Skip to content

Commit

Permalink
Make the Wasmtime CLI use async (#9184)
Browse files Browse the repository at this point in the history
* Make the Wasmtime CLI use async

This means that interrupting a running Wasm program will now work
correctly, even when the program is blocked on I/O or waiting on a timeout or
some such.

This also involved making `wasi-threads` async-compatible.

Co-Authored-By: Alex Crichton <alex@alexcrichton.com>

* rustfmt

* Make `run` command enable the `tokio` feature

* Add a test for CLI, timeouts, and sleeping forever

* Fix warning

---------

Co-authored-by: Alex Crichton <alex@alexcrichton.com>
  • Loading branch information
fitzgen and alexcrichton authored Aug 29, 2024
1 parent acbcc21 commit 4005a81
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 68 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ wasmtime-cranelift = { workspace = true, optional = true }
wasmtime-environ = { workspace = true }
wasmtime-explorer = { workspace = true, optional = true }
wasmtime-wast = { workspace = true, optional = true }
wasi-common = { workspace = true, default-features = true, features = ["exit"], optional = true }
wasi-common = { workspace = true, default-features = true, features = ["exit", "tokio"], optional = true }
wasmtime-wasi = { workspace = true, default-features = true, optional = true }
wasmtime-wasi-nn = { workspace = true, optional = true }
wasmtime-wasi-runtime-config = { workspace = true, optional = true }
Expand Down Expand Up @@ -439,7 +439,7 @@ explore = ["dep:wasmtime-explorer", "dep:tempfile"]
wast = ["dep:wasmtime-wast"]
config = ["cache"]
compile = ["cranelift"]
run = ["dep:wasmtime-wasi", "wasmtime/runtime", "dep:listenfd", "dep:wasi-common"]
run = ["dep:wasmtime-wasi", "wasmtime/runtime", "dep:listenfd", "dep:wasi-common", "dep:tokio"]

[[test]]
name = "host_segfault"
Expand Down
5 changes: 5 additions & 0 deletions crates/test-programs/src/bin/cli_sleep_forever.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use std::time::Duration;

fn main() {
std::thread::sleep(Duration::from_nanos(u64::MAX));
}
1 change: 1 addition & 0 deletions crates/wasi-threads/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ log = { workspace = true }
rand = "0.8"
wasi-common = { workspace = true, features = ["exit"]}
wasmtime = { workspace = true, features = ['threads'] }
wasmtime-wasi = { workspace = true }
19 changes: 17 additions & 2 deletions crates/wasi-threads/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,14 @@ impl<T: Clone + Send + 'static> WasiThreadsCtx<T> {
let result = catch_unwind(AssertUnwindSafe(|| {
// Each new instance is created in its own store.
let mut store = Store::new(&instance_pre.module().engine(), host);
let instance = instance_pre.instantiate(&mut store).unwrap();

let instance = if instance_pre.module().engine().is_async() {
wasmtime_wasi::runtime::in_tokio(instance_pre.instantiate_async(&mut store))
} else {
instance_pre.instantiate(&mut store)
}
.unwrap();

let thread_entry_point = instance
.get_typed_func::<(i32, i32), ()>(&mut store, WASI_ENTRY_POINT)
.unwrap();
Expand All @@ -77,7 +84,15 @@ impl<T: Clone + Send + 'static> WasiThreadsCtx<T> {
WASI_ENTRY_POINT,
thread_start_arg
);
match thread_entry_point.call(&mut store, (wasi_thread_id, thread_start_arg)) {
let res = if instance_pre.module().engine().is_async() {
wasmtime_wasi::runtime::in_tokio(
thread_entry_point
.call_async(&mut store, (wasi_thread_id, thread_start_arg)),
)
} else {
thread_entry_point.call(&mut store, (wasi_thread_id, thread_start_arg))
};
match res {
Ok(_) => log::trace!("exiting thread id = {} normally", wasi_thread_id),
Err(e) => {
log::trace!("exiting thread id = {} due to error", wasi_thread_id);
Expand Down
7 changes: 7 additions & 0 deletions crates/wasmtime/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ impl Engine {
Arc::ptr_eq(&a.inner, &b.inner)
}

/// Returns whether the engine is configured to support async functions.
#[cfg(feature = "async")]
#[inline]
pub fn is_async(&self) -> bool {
self.config().async_support
}

/// Detects whether the bytes provided are a precompiled object produced by
/// Wasmtime.
///
Expand Down
152 changes: 89 additions & 63 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl RunCommand {
self.run.common.init_logging()?;

let mut config = self.run.common.config(None, None)?;
config.async_support(true);

if self.run.common.wasm.timeout.is_some() {
config.epoch_interruption(true);
Expand Down Expand Up @@ -149,61 +150,78 @@ impl RunCommand {
store.set_fuel(fuel)?;
}

// Load the preload wasm modules.
let mut modules = Vec::new();
if let RunTarget::Core(m) = &main {
modules.push((String::new(), m.clone()));
}
for (name, path) in self.preloads.iter() {
// Read the wasm module binary either as `*.wat` or a raw binary
let module = match self.run.load_module(&engine, path)? {
RunTarget::Core(m) => m,
#[cfg(feature = "component-model")]
RunTarget::Component(_) => bail!("components cannot be loaded with `--preload`"),
};
modules.push((name.clone(), module.clone()));
// Always run the module asynchronously to ensure that the module can be
// interrupted, even if it is blocking on I/O or a timeout or something.
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_time()
.enable_io()
.build()?;

// Add the module's functions to the linker.
match &mut linker {
#[cfg(feature = "cranelift")]
CliLinker::Core(linker) => {
linker.module(&mut store, name, &module).context(format!(
"failed to process preload `{}` at `{}`",
name,
path.display()
))?;
}
#[cfg(not(feature = "cranelift"))]
CliLinker::Core(_) => {
bail!("support for --preload disabled at compile time");
let dur = self
.run
.common
.wasm
.timeout
.unwrap_or(std::time::Duration::MAX);
let result = runtime.block_on(async {
tokio::time::timeout(dur, async {
// Load the preload wasm modules.
let mut modules = Vec::new();
if let RunTarget::Core(m) = &main {
modules.push((String::new(), m.clone()));
}
#[cfg(feature = "component-model")]
CliLinker::Component(_) => {
bail!("--preload cannot be used with components");
for (name, path) in self.preloads.iter() {
// Read the wasm module binary either as `*.wat` or a raw binary
let module = match self.run.load_module(&engine, path)? {
RunTarget::Core(m) => m,
#[cfg(feature = "component-model")]
RunTarget::Component(_) => {
bail!("components cannot be loaded with `--preload`")
}
};
modules.push((name.clone(), module.clone()));

// Add the module's functions to the linker.
match &mut linker {
#[cfg(feature = "cranelift")]
CliLinker::Core(linker) => {
linker
.module_async(&mut store, name, &module)
.await
.context(format!(
"failed to process preload `{}` at `{}`",
name,
path.display()
))?;
}
#[cfg(not(feature = "cranelift"))]
CliLinker::Core(_) => {
bail!("support for --preload disabled at compile time");
}
#[cfg(feature = "component-model")]
CliLinker::Component(_) => {
bail!("--preload cannot be used with components");
}
}
}
}
}

// Pre-emptively initialize and install a Tokio runtime ambiently in the
// environment when executing the module. Without this whenever a WASI
// call is made that needs to block on a future a Tokio runtime is
// configured and entered, and this appears to be slower than simply
// picking an existing runtime out of the environment and using that.
// The goal of this is to improve the performance of WASI-related
// operations that block in the CLI since the CLI doesn't use async to
// invoke WebAssembly.
let result = wasmtime_wasi::runtime::with_ambient_tokio_runtime(|| {
self.load_main_module(&mut store, &mut linker, &main, modules)
.with_context(|| {
format!(
"failed to run main module `{}`",
self.module_and_args[0].to_string_lossy()
)
})
self.load_main_module(&mut store, &mut linker, &main, modules)
.await
.with_context(|| {
format!(
"failed to run main module `{}`",
self.module_and_args[0].to_string_lossy()
)
})
})
.await
});

// Load the main wasm module.
match result {
match result.unwrap_or_else(|elapsed| {
Err(anyhow::Error::from(wasmtime::Trap::Interrupt))
.with_context(|| format!("timed out after {elapsed}"))
}) {
Ok(()) => (),
Err(e) => {
// Exit the process if Wasmtime understands the error;
Expand Down Expand Up @@ -367,7 +385,7 @@ impl RunCommand {
});
}

fn load_main_module(
async fn load_main_module(
&self,
store: &mut Store<Host>,
linker: &mut CliLinker,
Expand Down Expand Up @@ -403,15 +421,20 @@ impl RunCommand {
let result = match linker {
CliLinker::Core(linker) => {
let module = module.unwrap_core();
let instance = linker.instantiate(&mut *store, &module).context(format!(
"failed to instantiate {:?}",
self.module_and_args[0]
))?;
let instance = linker
.instantiate_async(&mut *store, &module)
.await
.context(format!(
"failed to instantiate {:?}",
self.module_and_args[0]
))?;

// If `_initialize` is present, meaning a reactor, then invoke
// the function.
if let Some(func) = instance.get_func(&mut *store, "_initialize") {
func.typed::<(), ()>(&store)?.call(&mut *store, ())?;
func.typed::<(), ()>(&store)?
.call_async(&mut *store, ())
.await?;
}

// Look for the specific function provided or otherwise look for
Expand All @@ -429,7 +452,7 @@ impl RunCommand {
};

match func {
Some(func) => self.invoke_func(store, func),
Some(func) => self.invoke_func(store, func).await,
None => Ok(()),
}
}
Expand All @@ -441,14 +464,16 @@ impl RunCommand {

let component = module.unwrap_component();

let command = wasmtime_wasi::bindings::sync::Command::instantiate(
let command = wasmtime_wasi::bindings::Command::instantiate_async(
&mut *store,
component,
linker,
)?;
)
.await?;
let result = command
.wasi_cli_run()
.call_run(&mut *store)
.await
.context("failed to invoke `run` function")
.map_err(|e| self.handle_core_dump(&mut *store, e));

Expand All @@ -465,7 +490,7 @@ impl RunCommand {
result
}

fn invoke_func(&self, store: &mut Store<Host>, func: Func) -> Result<()> {
async fn invoke_func(&self, store: &mut Store<Host>, func: Func) -> Result<()> {
let ty = func.ty(&store);
if ty.params().len() > 0 {
eprintln!(
Expand Down Expand Up @@ -505,7 +530,8 @@ impl RunCommand {
// out, if there are any.
let mut results = vec![Val::null_func_ref(); ty.results().len()];
let invoke_res = func
.call(&mut *store, &values, &mut results)
.call_async(&mut *store, &values, &mut results)
.await
.with_context(|| {
if let Some(name) = &self.invoke {
format!("failed to invoke `{name}`")
Expand Down Expand Up @@ -600,7 +626,7 @@ impl RunCommand {
// are enabled, then use the historical preview1
// implementation.
(Some(false), _) | (None, Some(true)) => {
wasi_common::sync::add_to_linker(linker, |host| {
wasi_common::tokio::add_to_linker(linker, |host| {
host.preview1_ctx.as_mut().unwrap()
})?;
self.set_preview1_ctx(store)?;
Expand All @@ -613,11 +639,11 @@ impl RunCommand {
// default-disabled in the future.
(Some(true), _) | (None, Some(false) | None) => {
if self.run.common.wasi.preview0 != Some(false) {
wasmtime_wasi::preview0::add_to_linker_sync(linker, |t| {
wasmtime_wasi::preview0::add_to_linker_async(linker, |t| {
t.preview2_ctx()
})?;
}
wasmtime_wasi::preview1::add_to_linker_sync(linker, |t| {
wasmtime_wasi::preview1::add_to_linker_async(linker, |t| {
t.preview2_ctx()
})?;
self.set_preview2_ctx(store)?;
Expand All @@ -626,7 +652,7 @@ impl RunCommand {
}
#[cfg(feature = "component-model")]
CliLinker::Component(linker) => {
wasmtime_wasi::add_to_linker_sync(linker)?;
wasmtime_wasi::add_to_linker_async(linker)?;
self.set_preview2_ctx(store)?;
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,11 @@ impl RunCommon {
// the program as the CLI. This helps improve the performance of some
// blocking operations in WASI, for example, by skipping the
// back-and-forth between sync and async.
builder.allow_blocking_current_thread(true);
//
// However, do not set this if a timeout is configured, as that would
// cause the timeout to be ignored if the guest does, for example,
// something like `sleep(FOREVER)`.
builder.allow_blocking_current_thread(self.common.wasm.timeout.is_none());

if self.common.wasi.inherit_env == Some(true) {
for (k, v) in std::env::vars() {
Expand Down
22 changes: 22 additions & 0 deletions tests/all/cli_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,28 @@ mod test_programs {
Ok(())
}

#[test]
fn cli_sleep_forever() -> Result<()> {
for timeout in [
// Tests still pass when we race with going to sleep.
"-Wtimeout=1ns",
// Tests pass when we wait till the Wasm has (likely) gone to sleep.
"-Wtimeout=250ms",
] {
let e = run_wasmtime(&["run", timeout, CLI_SLEEP_FOREVER]).unwrap_err();
let e = e.to_string();
println!("Got error: {e}");
assert!(e.contains("interrupt"));

let e = run_wasmtime(&["run", timeout, CLI_SLEEP_FOREVER_COMPONENT]).unwrap_err();
let e = e.to_string();
println!("Got error: {e}");
assert!(e.contains("interrupt"));
}

Ok(())
}

/// Helper structure to manage an invocation of `wasmtime serve`
struct WasmtimeServe {
child: Option<Child>,
Expand Down

0 comments on commit 4005a81

Please sign in to comment.