Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logging and request id tracking to wasmtime serve #7366

Merged
merged 2 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ tracing = { workspace = true }
log = { workspace = true }

async-trait = { workspace = true }
bytes = { workspace = true }
tokio = { workspace = true, optional = true, features = [ "signal", "macros" ] }
hyper = { workspace = true, optional = true }
http-body-util = { workspace = true, optional = true }
Expand Down
120 changes: 103 additions & 17 deletions src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use std::{
path::PathBuf,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
};
use wasmtime::component::{InstancePre, Linker};
use wasmtime::{Engine, Store, StoreLimits};
use wasmtime_wasi::preview2::{Table, WasiCtx, WasiCtxBuilder, WasiView};
use wasmtime_wasi::preview2::{self, StreamResult, Table, WasiCtx, WasiCtxBuilder, WasiView};
use wasmtime_wasi_http::{body::HyperOutgoingBody, WasiHttpCtx, WasiHttpView};

#[cfg(feature = "wasi-nn")]
Expand Down Expand Up @@ -135,10 +135,20 @@ impl ServeCommand {
Ok(())
}

fn new_store(&self, engine: &Engine) -> Result<Store<Host>> {
fn new_store(&self, engine: &Engine, req_id: u64, output: OutputMutex) -> Result<Store<Host>> {
let mut builder = WasiCtxBuilder::new();

// TODO: connect stdio to logging infrastructure
builder.envs(&[("REQUEST_ID", req_id.to_string())]);

builder.stdout(LogStream {
prefix: format!("stdout [{req_id}] :: "),
output: output.clone(),
});

builder.stderr(LogStream {
prefix: format!("stderr [{req_id}] :: "),
output,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be reasonable to make a separate Stderr handle here. My thought was that since the output is already prefixed with stdout/stderr, using the same output stream would be okay.

});

let mut host = Host {
table: Table::new(),
Expand Down Expand Up @@ -244,6 +254,8 @@ impl ServeCommand {
None
};

log::info!("Listening on {}", self.addr);

let handler = ProxyHandler::new(self, engine, instance);

loop {
Expand Down Expand Up @@ -298,18 +310,34 @@ struct ProxyHandlerInner {
cmd: ServeCommand,
engine: Engine,
instance_pre: InstancePre<Host>,
next_id: AtomicU64,
}

impl ProxyHandlerInner {
fn next_req_id(&self) -> u64 {
self.next_id.fetch_add(1, Ordering::Relaxed)
}
}

type OutputMutex = Arc<tokio::sync::Mutex<std::io::Stdout>>;

#[derive(Clone)]
struct ProxyHandler(Arc<ProxyHandlerInner>);
struct ProxyHandler {
inner: Arc<ProxyHandlerInner>,
output: OutputMutex,
}

impl ProxyHandler {
fn new(cmd: ServeCommand, engine: Engine, instance_pre: InstancePre<Host>) -> Self {
Self(Arc::new(ProxyHandlerInner {
cmd,
engine,
instance_pre,
}))
Self {
inner: Arc::new(ProxyHandlerInner {
cmd,
engine,
instance_pre,
next_id: AtomicU64::from(0),
}),
output: Arc::new(tokio::sync::Mutex::new(std::io::stdout())),
}
}
}

Expand All @@ -323,25 +351,31 @@ impl hyper::service::Service<Request> for ProxyHandler {
fn call(&mut self, req: Request) -> Self::Future {
use http_body_util::BodyExt;

let handler = self.clone();
let ProxyHandler { inner, output } = self.clone();

let (sender, receiver) = tokio::sync::oneshot::channel();

// TODO: need to track the join handle, but don't want to block the response on it
tokio::task::spawn(async move {
let mut store = handler.0.cmd.new_store(&handler.0.engine)?;
let req_id = inner.next_req_id();

log::info!(
"Request {req_id} handling {} to {}",
req.method(),
req.uri()
);

let mut store = inner.cmd.new_store(&inner.engine, req_id, output)?;

let req = store.data_mut().new_incoming_request(
req.map(|body| body.map_err(|e| anyhow::anyhow!(e)).boxed()),
)?;

let out = store.data_mut().new_response_outparam(sender)?;

let (proxy, _inst) = wasmtime_wasi_http::proxy::Proxy::instantiate_pre(
&mut store,
&handler.0.instance_pre,
)
.await?;
let (proxy, _inst) =
wasmtime_wasi_http::proxy::Proxy::instantiate_pre(&mut store, &inner.instance_pre)
.await?;

proxy
.wasi_http_incoming_handler()
Expand All @@ -357,3 +391,55 @@ impl hyper::service::Service<Request> for ProxyHandler {
})
}
}

#[derive(Clone)]
struct LogStream {
prefix: String,
output: OutputMutex,
}

impl preview2::StdoutStream for LogStream {
fn stream(&self) -> Box<dyn preview2::HostOutputStream> {
Box::new(self.clone())
}

fn isatty(&self) -> bool {
false
}
}

impl preview2::HostOutputStream for LogStream {
fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
let mut msg = Vec::new();

for line in bytes.split(|c| *c == b'\n') {
if !line.is_empty() {
msg.extend_from_slice(&self.prefix.as_bytes());
msg.extend_from_slice(line);
msg.push(b'\n');
}
}

let output = self.output.clone();
tokio::task::spawn(async move {
use std::io::Write;
let mut output = output.lock().await;
output.write_all(&msg).expect("writing to stdout");
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would you think of going ahead and directly doing a blocking write to stdout/stderr here? That'd help applying a bit of backpressure (albeit not in a perfect way) if necessary and additionally would avoid the need to pass around streams much. That'd also perhaps make it a bit easier to write stdout to stdout and stderr to stderr.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great suggestion! I've implemented that, and substantially simplified the LogStream::write implementation :)


Ok(())
}

fn flush(&mut self) -> StreamResult<()> {
Ok(())
}

fn check_write(&mut self) -> StreamResult<usize> {
Ok(1024 * 1024)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is arbitrary.

}
}

#[async_trait::async_trait]
impl preview2::Subscribe for LogStream {
async fn ready(&mut self) {}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear that there's a reasonable implementation for this here. Perhaps using tokio::io::Stdout instead of a mutex guarded std::io::Stdout would make more sense?

}