-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add logging and request id tracking to wasmtime serve
#7366
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,13 +5,13 @@ use std::{ | |
path::PathBuf, | ||
pin::Pin, | ||
sync::{ | ||
atomic::{AtomicBool, Ordering}, | ||
atomic::{AtomicBool, AtomicU64, Ordering}, | ||
Arc, | ||
}, | ||
}; | ||
use wasmtime::component::{InstancePre, Linker}; | ||
use wasmtime::{Engine, Store, StoreLimits}; | ||
use wasmtime_wasi::preview2::{Table, WasiCtx, WasiCtxBuilder, WasiView}; | ||
use wasmtime_wasi::preview2::{self, StreamResult, Table, WasiCtx, WasiCtxBuilder, WasiView}; | ||
use wasmtime_wasi_http::{body::HyperOutgoingBody, WasiHttpCtx, WasiHttpView}; | ||
|
||
#[cfg(feature = "wasi-nn")] | ||
|
@@ -135,10 +135,20 @@ impl ServeCommand { | |
Ok(()) | ||
} | ||
|
||
fn new_store(&self, engine: &Engine) -> Result<Store<Host>> { | ||
fn new_store(&self, engine: &Engine, req_id: u64, output: OutputMutex) -> Result<Store<Host>> { | ||
let mut builder = WasiCtxBuilder::new(); | ||
|
||
// TODO: connect stdio to logging infrastructure | ||
builder.envs(&[("REQUEST_ID", req_id.to_string())]); | ||
|
||
builder.stdout(LogStream { | ||
prefix: format!("stdout [{req_id}] :: "), | ||
output: output.clone(), | ||
}); | ||
|
||
builder.stderr(LogStream { | ||
prefix: format!("stderr [{req_id}] :: "), | ||
output, | ||
}); | ||
|
||
let mut host = Host { | ||
table: Table::new(), | ||
|
@@ -244,6 +254,8 @@ impl ServeCommand { | |
None | ||
}; | ||
|
||
log::info!("Listening on {}", self.addr); | ||
|
||
let handler = ProxyHandler::new(self, engine, instance); | ||
|
||
loop { | ||
|
@@ -298,18 +310,34 @@ struct ProxyHandlerInner { | |
cmd: ServeCommand, | ||
engine: Engine, | ||
instance_pre: InstancePre<Host>, | ||
next_id: AtomicU64, | ||
} | ||
|
||
impl ProxyHandlerInner { | ||
fn next_req_id(&self) -> u64 { | ||
self.next_id.fetch_add(1, Ordering::Relaxed) | ||
} | ||
} | ||
|
||
type OutputMutex = Arc<tokio::sync::Mutex<std::io::Stdout>>; | ||
|
||
#[derive(Clone)] | ||
struct ProxyHandler(Arc<ProxyHandlerInner>); | ||
struct ProxyHandler { | ||
inner: Arc<ProxyHandlerInner>, | ||
output: OutputMutex, | ||
} | ||
|
||
impl ProxyHandler { | ||
fn new(cmd: ServeCommand, engine: Engine, instance_pre: InstancePre<Host>) -> Self { | ||
Self(Arc::new(ProxyHandlerInner { | ||
cmd, | ||
engine, | ||
instance_pre, | ||
})) | ||
Self { | ||
inner: Arc::new(ProxyHandlerInner { | ||
cmd, | ||
engine, | ||
instance_pre, | ||
next_id: AtomicU64::from(0), | ||
}), | ||
output: Arc::new(tokio::sync::Mutex::new(std::io::stdout())), | ||
} | ||
} | ||
} | ||
|
||
|
@@ -323,25 +351,31 @@ impl hyper::service::Service<Request> for ProxyHandler { | |
fn call(&mut self, req: Request) -> Self::Future { | ||
use http_body_util::BodyExt; | ||
|
||
let handler = self.clone(); | ||
let ProxyHandler { inner, output } = self.clone(); | ||
|
||
let (sender, receiver) = tokio::sync::oneshot::channel(); | ||
|
||
// TODO: need to track the join handle, but don't want to block the response on it | ||
tokio::task::spawn(async move { | ||
let mut store = handler.0.cmd.new_store(&handler.0.engine)?; | ||
let req_id = inner.next_req_id(); | ||
|
||
log::info!( | ||
"Request {req_id} handling {} to {}", | ||
req.method(), | ||
req.uri() | ||
); | ||
|
||
let mut store = inner.cmd.new_store(&inner.engine, req_id, output)?; | ||
|
||
let req = store.data_mut().new_incoming_request( | ||
req.map(|body| body.map_err(|e| anyhow::anyhow!(e)).boxed()), | ||
)?; | ||
|
||
let out = store.data_mut().new_response_outparam(sender)?; | ||
|
||
let (proxy, _inst) = wasmtime_wasi_http::proxy::Proxy::instantiate_pre( | ||
&mut store, | ||
&handler.0.instance_pre, | ||
) | ||
.await?; | ||
let (proxy, _inst) = | ||
wasmtime_wasi_http::proxy::Proxy::instantiate_pre(&mut store, &inner.instance_pre) | ||
.await?; | ||
|
||
proxy | ||
.wasi_http_incoming_handler() | ||
|
@@ -357,3 +391,55 @@ impl hyper::service::Service<Request> for ProxyHandler { | |
}) | ||
} | ||
} | ||
|
||
#[derive(Clone)] | ||
struct LogStream { | ||
prefix: String, | ||
output: OutputMutex, | ||
} | ||
|
||
impl preview2::StdoutStream for LogStream { | ||
fn stream(&self) -> Box<dyn preview2::HostOutputStream> { | ||
Box::new(self.clone()) | ||
} | ||
|
||
fn isatty(&self) -> bool { | ||
false | ||
} | ||
} | ||
|
||
impl preview2::HostOutputStream for LogStream { | ||
fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> { | ||
let mut msg = Vec::new(); | ||
|
||
for line in bytes.split(|c| *c == b'\n') { | ||
if !line.is_empty() { | ||
msg.extend_from_slice(&self.prefix.as_bytes()); | ||
msg.extend_from_slice(line); | ||
msg.push(b'\n'); | ||
} | ||
} | ||
|
||
let output = self.output.clone(); | ||
tokio::task::spawn(async move { | ||
use std::io::Write; | ||
let mut output = output.lock().await; | ||
output.write_all(&msg).expect("writing to stdout"); | ||
}); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What would you think of going ahead and directly doing a blocking write to stdout/stderr here? That'd help applying a bit of backpressure (albeit not in a perfect way) if necessary and additionally would avoid the need to pass around streams much. That'd also perhaps make it a bit easier to write stdout to stdout and stderr to stderr. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great suggestion! I've implemented that, and substantially simplified the |
||
|
||
Ok(()) | ||
} | ||
|
||
fn flush(&mut self) -> StreamResult<()> { | ||
Ok(()) | ||
} | ||
|
||
fn check_write(&mut self) -> StreamResult<usize> { | ||
Ok(1024 * 1024) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is arbitrary. |
||
} | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl preview2::Subscribe for LogStream { | ||
async fn ready(&mut self) {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not clear that there's a reasonable implementation for this here. Perhaps using |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be reasonable to make a separate
Stderr
handle here. My thought was that since the output is already prefixed withstdout
/stderr
, using the same output stream would be okay.