Skip to content

Commit

Permalink
Add logging and request id tracking to wasmtime serve (#7366)
Browse files Browse the repository at this point in the history
* Add logging and request id tracking to `wasmtime serve`

* Dispatch directly to stdout/stderr in LogStream::write
  • Loading branch information
elliottt authored Oct 25, 2023
1 parent 952c6a5 commit 05a6b3b
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ tracing = { workspace = true }
log = { workspace = true }

async-trait = { workspace = true }
bytes = { workspace = true }
tokio = { workspace = true, optional = true, features = [ "signal", "macros" ] }
hyper = { workspace = true, optional = true }
http-body-util = { workspace = true, optional = true }
Expand Down
122 changes: 110 additions & 12 deletions src/commands/serve.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use crate::common::{Profile, RunCommon, RunTarget};
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use clap::Parser;
use std::{
path::PathBuf,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};
use wasmtime::component::{InstancePre, Linker};
use wasmtime::{Engine, Store, StoreLimits};
use wasmtime_wasi::preview2::{Table, WasiCtx, WasiCtxBuilder, WasiView};
use wasmtime_wasi::preview2::{
self, StreamError, StreamResult, Table, WasiCtx, WasiCtxBuilder, WasiView,
};
use wasmtime_wasi_http::{body::HyperOutgoingBody, WasiHttpCtx, WasiHttpView};

#[cfg(feature = "wasi-nn")]
Expand Down Expand Up @@ -135,10 +137,20 @@ impl ServeCommand {
Ok(())
}

fn new_store(&self, engine: &Engine) -> Result<Store<Host>> {
fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
let mut builder = WasiCtxBuilder::new();

// TODO: connect stdio to logging infrastructure
builder.envs(&[("REQUEST_ID", req_id.to_string())]);

builder.stdout(LogStream {
prefix: format!("stdout [{req_id}] :: "),
output: Output::Stdout,
});

builder.stderr(LogStream {
prefix: format!("stderr [{req_id}] :: "),
output: Output::Stderr,
});

let mut host = Host {
table: Table::new(),
Expand Down Expand Up @@ -244,6 +256,8 @@ impl ServeCommand {
None
};

log::info!("Listening on {}", self.addr);

let handler = ProxyHandler::new(self, engine, instance);

loop {
Expand Down Expand Up @@ -298,6 +312,13 @@ struct ProxyHandlerInner {
cmd: ServeCommand,
engine: Engine,
instance_pre: InstancePre<Host>,
next_id: AtomicU64,
}

impl ProxyHandlerInner {
fn next_req_id(&self) -> u64 {
self.next_id.fetch_add(1, Ordering::Relaxed)
}
}

#[derive(Clone)]
Expand All @@ -309,6 +330,7 @@ impl ProxyHandler {
cmd,
engine,
instance_pre,
next_id: AtomicU64::from(0),
}))
}
}
Expand All @@ -323,25 +345,31 @@ impl hyper::service::Service<Request> for ProxyHandler {
fn call(&mut self, req: Request) -> Self::Future {
use http_body_util::BodyExt;

let handler = self.clone();
let ProxyHandler(inner) = self.clone();

let (sender, receiver) = tokio::sync::oneshot::channel();

// TODO: need to track the join handle, but don't want to block the response on it
tokio::task::spawn(async move {
let mut store = handler.0.cmd.new_store(&handler.0.engine)?;
let req_id = inner.next_req_id();

log::info!(
"Request {req_id} handling {} to {}",
req.method(),
req.uri()
);

let mut store = inner.cmd.new_store(&inner.engine, req_id)?;

let req = store.data_mut().new_incoming_request(
req.map(|body| body.map_err(|e| anyhow::anyhow!(e)).boxed()),
)?;

let out = store.data_mut().new_response_outparam(sender)?;

let (proxy, _inst) = wasmtime_wasi_http::proxy::Proxy::instantiate_pre(
&mut store,
&handler.0.instance_pre,
)
.await?;
let (proxy, _inst) =
wasmtime_wasi_http::proxy::Proxy::instantiate_pre(&mut store, &inner.instance_pre)
.await?;

proxy
.wasi_http_incoming_handler()
Expand All @@ -357,3 +385,73 @@ impl hyper::service::Service<Request> for ProxyHandler {
})
}
}

#[derive(Clone)]
enum Output {
Stdout,
Stderr,
}

impl Output {
fn write_all(&self, buf: &[u8]) -> anyhow::Result<()> {
use std::io::Write;

match self {
Output::Stdout => std::io::stdout().write_all(buf),
Output::Stderr => std::io::stderr().write_all(buf),
}
.map_err(|e| anyhow!(e))
}
}

#[derive(Clone)]
struct LogStream {
prefix: String,
output: Output,
}

impl preview2::StdoutStream for LogStream {
fn stream(&self) -> Box<dyn preview2::HostOutputStream> {
Box::new(self.clone())
}

fn isatty(&self) -> bool {
use std::io::IsTerminal;

match &self.output {
Output::Stdout => std::io::stdout().is_terminal(),
Output::Stderr => std::io::stderr().is_terminal(),
}
}
}

impl preview2::HostOutputStream for LogStream {
fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
let mut msg = Vec::new();

for line in bytes.split(|c| *c == b'\n') {
if !line.is_empty() {
msg.extend_from_slice(&self.prefix.as_bytes());
msg.extend_from_slice(line);
msg.push(b'\n');
}
}

self.output
.write_all(&msg)
.map_err(StreamError::LastOperationFailed)
}

fn flush(&mut self) -> StreamResult<()> {
Ok(())
}

fn check_write(&mut self) -> StreamResult<usize> {
Ok(1024 * 1024)
}
}

#[async_trait::async_trait]
impl preview2::Subscribe for LogStream {
async fn ready(&mut self) {}
}

0 comments on commit 05a6b3b

Please sign in to comment.