Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the Wasmtime CLI use async #9184

Merged
merged 5 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ wasmtime-cranelift = { workspace = true, optional = true }
wasmtime-environ = { workspace = true }
wasmtime-explorer = { workspace = true, optional = true }
wasmtime-wast = { workspace = true, optional = true }
wasi-common = { workspace = true, default-features = true, features = ["exit"], optional = true }
wasi-common = { workspace = true, default-features = true, features = ["exit", "tokio"], optional = true }
wasmtime-wasi = { workspace = true, default-features = true, optional = true }
wasmtime-wasi-nn = { workspace = true, optional = true }
wasmtime-wasi-runtime-config = { workspace = true, optional = true }
Expand Down Expand Up @@ -439,7 +439,7 @@ explore = ["dep:wasmtime-explorer", "dep:tempfile"]
wast = ["dep:wasmtime-wast"]
config = ["cache"]
compile = ["cranelift"]
run = ["dep:wasmtime-wasi", "wasmtime/runtime", "dep:listenfd", "dep:wasi-common"]
run = ["dep:wasmtime-wasi", "wasmtime/runtime", "dep:listenfd", "dep:wasi-common", "dep:tokio"]

[[test]]
name = "host_segfault"
Expand Down
5 changes: 5 additions & 0 deletions crates/test-programs/src/bin/cli_sleep_forever.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use std::time::Duration;

fn main() {
std::thread::sleep(Duration::from_nanos(u64::MAX));
}
1 change: 1 addition & 0 deletions crates/wasi-threads/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ log = { workspace = true }
rand = "0.8"
wasi-common = { workspace = true, features = ["exit"]}
wasmtime = { workspace = true, features = ['threads'] }
wasmtime-wasi = { workspace = true }
19 changes: 17 additions & 2 deletions crates/wasi-threads/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,14 @@ impl<T: Clone + Send + 'static> WasiThreadsCtx<T> {
let result = catch_unwind(AssertUnwindSafe(|| {
// Each new instance is created in its own store.
let mut store = Store::new(&instance_pre.module().engine(), host);
let instance = instance_pre.instantiate(&mut store).unwrap();

let instance = if instance_pre.module().engine().is_async() {
wasmtime_wasi::runtime::in_tokio(instance_pre.instantiate_async(&mut store))
} else {
instance_pre.instantiate(&mut store)
}
.unwrap();

let thread_entry_point = instance
.get_typed_func::<(i32, i32), ()>(&mut store, WASI_ENTRY_POINT)
.unwrap();
Expand All @@ -77,7 +84,15 @@ impl<T: Clone + Send + 'static> WasiThreadsCtx<T> {
WASI_ENTRY_POINT,
thread_start_arg
);
match thread_entry_point.call(&mut store, (wasi_thread_id, thread_start_arg)) {
let res = if instance_pre.module().engine().is_async() {
wasmtime_wasi::runtime::in_tokio(
thread_entry_point
.call_async(&mut store, (wasi_thread_id, thread_start_arg)),
)
} else {
thread_entry_point.call(&mut store, (wasi_thread_id, thread_start_arg))
};
match res {
Ok(_) => log::trace!("exiting thread id = {} normally", wasi_thread_id),
Err(e) => {
log::trace!("exiting thread id = {} due to error", wasi_thread_id);
Expand Down
7 changes: 7 additions & 0 deletions crates/wasmtime/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ impl Engine {
Arc::ptr_eq(&a.inner, &b.inner)
}

/// Returns whether the engine is configured to support async functions.
#[cfg(feature = "async")]
#[inline]
pub fn is_async(&self) -> bool {
self.config().async_support
}

/// Detects whether the bytes provided are a precompiled object produced by
/// Wasmtime.
///
Expand Down
152 changes: 89 additions & 63 deletions src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl RunCommand {
self.run.common.init_logging()?;

let mut config = self.run.common.config(None, None)?;
config.async_support(true);

if self.run.common.wasm.timeout.is_some() {
config.epoch_interruption(true);
Expand Down Expand Up @@ -149,61 +150,78 @@ impl RunCommand {
store.set_fuel(fuel)?;
}

// Load the preload wasm modules.
let mut modules = Vec::new();
if let RunTarget::Core(m) = &main {
modules.push((String::new(), m.clone()));
}
for (name, path) in self.preloads.iter() {
// Read the wasm module binary either as `*.wat` or a raw binary
let module = match self.run.load_module(&engine, path)? {
RunTarget::Core(m) => m,
#[cfg(feature = "component-model")]
RunTarget::Component(_) => bail!("components cannot be loaded with `--preload`"),
};
modules.push((name.clone(), module.clone()));
// Always run the module asynchronously to ensure that the module can be
// interrupted, even if it is blocking on I/O or a timeout or something.
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_time()
.enable_io()
.build()?;

// Add the module's functions to the linker.
match &mut linker {
#[cfg(feature = "cranelift")]
CliLinker::Core(linker) => {
linker.module(&mut store, name, &module).context(format!(
"failed to process preload `{}` at `{}`",
name,
path.display()
))?;
}
#[cfg(not(feature = "cranelift"))]
CliLinker::Core(_) => {
bail!("support for --preload disabled at compile time");
let dur = self
.run
.common
.wasm
.timeout
.unwrap_or(std::time::Duration::MAX);
let result = runtime.block_on(async {
tokio::time::timeout(dur, async {
// Load the preload wasm modules.
let mut modules = Vec::new();
if let RunTarget::Core(m) = &main {
modules.push((String::new(), m.clone()));
}
#[cfg(feature = "component-model")]
CliLinker::Component(_) => {
bail!("--preload cannot be used with components");
for (name, path) in self.preloads.iter() {
// Read the wasm module binary either as `*.wat` or a raw binary
let module = match self.run.load_module(&engine, path)? {
RunTarget::Core(m) => m,
#[cfg(feature = "component-model")]
RunTarget::Component(_) => {
bail!("components cannot be loaded with `--preload`")
}
};
modules.push((name.clone(), module.clone()));

// Add the module's functions to the linker.
match &mut linker {
#[cfg(feature = "cranelift")]
CliLinker::Core(linker) => {
linker
.module_async(&mut store, name, &module)
.await
.context(format!(
"failed to process preload `{}` at `{}`",
name,
path.display()
))?;
}
#[cfg(not(feature = "cranelift"))]
CliLinker::Core(_) => {
bail!("support for --preload disabled at compile time");
}
#[cfg(feature = "component-model")]
CliLinker::Component(_) => {
bail!("--preload cannot be used with components");
}
}
}
}
}

// Pre-emptively initialize and install a Tokio runtime ambiently in the
// environment when executing the module. Without this whenever a WASI
// call is made that needs to block on a future a Tokio runtime is
// configured and entered, and this appears to be slower than simply
// picking an existing runtime out of the environment and using that.
// The goal of this is to improve the performance of WASI-related
// operations that block in the CLI since the CLI doesn't use async to
// invoke WebAssembly.
let result = wasmtime_wasi::runtime::with_ambient_tokio_runtime(|| {
self.load_main_module(&mut store, &mut linker, &main, modules)
.with_context(|| {
format!(
"failed to run main module `{}`",
self.module_and_args[0].to_string_lossy()
)
})
self.load_main_module(&mut store, &mut linker, &main, modules)
.await
.with_context(|| {
format!(
"failed to run main module `{}`",
self.module_and_args[0].to_string_lossy()
)
})
})
.await
});

// Load the main wasm module.
match result {
match result.unwrap_or_else(|elapsed| {
Err(anyhow::Error::from(wasmtime::Trap::Interrupt))
.with_context(|| format!("timed out after {elapsed}"))
}) {
Ok(()) => (),
Err(e) => {
// Exit the process if Wasmtime understands the error;
Expand Down Expand Up @@ -367,7 +385,7 @@ impl RunCommand {
});
}

fn load_main_module(
async fn load_main_module(
&self,
store: &mut Store<Host>,
linker: &mut CliLinker,
Expand Down Expand Up @@ -403,15 +421,20 @@ impl RunCommand {
let result = match linker {
CliLinker::Core(linker) => {
let module = module.unwrap_core();
let instance = linker.instantiate(&mut *store, &module).context(format!(
"failed to instantiate {:?}",
self.module_and_args[0]
))?;
let instance = linker
.instantiate_async(&mut *store, &module)
.await
.context(format!(
"failed to instantiate {:?}",
self.module_and_args[0]
))?;

// If `_initialize` is present, meaning a reactor, then invoke
// the function.
if let Some(func) = instance.get_func(&mut *store, "_initialize") {
func.typed::<(), ()>(&store)?.call(&mut *store, ())?;
func.typed::<(), ()>(&store)?
.call_async(&mut *store, ())
.await?;
}

// Look for the specific function provided or otherwise look for
Expand All @@ -429,7 +452,7 @@ impl RunCommand {
};

match func {
Some(func) => self.invoke_func(store, func),
Some(func) => self.invoke_func(store, func).await,
None => Ok(()),
}
}
Expand All @@ -441,14 +464,16 @@ impl RunCommand {

let component = module.unwrap_component();

let command = wasmtime_wasi::bindings::sync::Command::instantiate(
let command = wasmtime_wasi::bindings::Command::instantiate_async(
&mut *store,
component,
linker,
)?;
)
.await?;
let result = command
.wasi_cli_run()
.call_run(&mut *store)
.await
.context("failed to invoke `run` function")
.map_err(|e| self.handle_core_dump(&mut *store, e));

Expand All @@ -465,7 +490,7 @@ impl RunCommand {
result
}

fn invoke_func(&self, store: &mut Store<Host>, func: Func) -> Result<()> {
async fn invoke_func(&self, store: &mut Store<Host>, func: Func) -> Result<()> {
let ty = func.ty(&store);
if ty.params().len() > 0 {
eprintln!(
Expand Down Expand Up @@ -505,7 +530,8 @@ impl RunCommand {
// out, if there are any.
let mut results = vec![Val::null_func_ref(); ty.results().len()];
let invoke_res = func
.call(&mut *store, &values, &mut results)
.call_async(&mut *store, &values, &mut results)
.await
.with_context(|| {
if let Some(name) = &self.invoke {
format!("failed to invoke `{name}`")
Expand Down Expand Up @@ -600,7 +626,7 @@ impl RunCommand {
// are enabled, then use the historical preview1
// implementation.
(Some(false), _) | (None, Some(true)) => {
wasi_common::sync::add_to_linker(linker, |host| {
wasi_common::tokio::add_to_linker(linker, |host| {
host.preview1_ctx.as_mut().unwrap()
})?;
self.set_preview1_ctx(store)?;
Expand All @@ -613,11 +639,11 @@ impl RunCommand {
// default-disabled in the future.
(Some(true), _) | (None, Some(false) | None) => {
if self.run.common.wasi.preview0 != Some(false) {
wasmtime_wasi::preview0::add_to_linker_sync(linker, |t| {
wasmtime_wasi::preview0::add_to_linker_async(linker, |t| {
t.preview2_ctx()
})?;
}
wasmtime_wasi::preview1::add_to_linker_sync(linker, |t| {
wasmtime_wasi::preview1::add_to_linker_async(linker, |t| {
t.preview2_ctx()
})?;
self.set_preview2_ctx(store)?;
Expand All @@ -626,7 +652,7 @@ impl RunCommand {
}
#[cfg(feature = "component-model")]
CliLinker::Component(linker) => {
wasmtime_wasi::add_to_linker_sync(linker)?;
wasmtime_wasi::add_to_linker_async(linker)?;
self.set_preview2_ctx(store)?;
}
}
Expand Down
6 changes: 5 additions & 1 deletion src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,11 @@ impl RunCommon {
// the program as the CLI. This helps improve the performance of some
// blocking operations in WASI, for example, by skipping the
// back-and-forth between sync and async.
builder.allow_blocking_current_thread(true);
//
// However, do not set this if a timeout is configured, as that would
// cause the timeout to be ignored if the guest does, for example,
// something like `sleep(FOREVER)`.
builder.allow_blocking_current_thread(self.common.wasm.timeout.is_none());

if self.common.wasi.inherit_env == Some(true) {
for (k, v) in std::env::vars() {
Expand Down
22 changes: 22 additions & 0 deletions tests/all/cli_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,28 @@ mod test_programs {
Ok(())
}

#[test]
fn cli_sleep_forever() -> Result<()> {
for timeout in [
// Tests still pass when we race with going to sleep.
"-Wtimeout=1ns",
// Tests pass when we wait till the Wasm has (likely) gone to sleep.
"-Wtimeout=250ms",
] {
let e = run_wasmtime(&["run", timeout, CLI_SLEEP_FOREVER]).unwrap_err();
let e = e.to_string();
println!("Got error: {e}");
assert!(e.contains("interrupt"));

let e = run_wasmtime(&["run", timeout, CLI_SLEEP_FOREVER_COMPONENT]).unwrap_err();
let e = e.to_string();
println!("Got error: {e}");
assert!(e.contains("interrupt"));
}

Ok(())
}

/// Helper structure to manage an invocation of `wasmtime serve`
struct WasmtimeServe {
child: Option<Child>,
Expand Down