Skip to content

Commit

Permalink
Dispatch directly to stdout/stderr in LogStream::write
Browse files Browse the repository at this point in the history
  • Loading branch information
elliottt committed Oct 25, 2023
1 parent 0dd32e1 commit 4a1b335
Showing 1 changed file with 42 additions and 32 deletions.
74 changes: 42 additions & 32 deletions src/commands/serve.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::common::{Profile, RunCommon, RunTarget};
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use clap::Parser;
use std::{
path::PathBuf,
Expand All @@ -11,7 +11,9 @@ use std::{
};
use wasmtime::component::{InstancePre, Linker};
use wasmtime::{Engine, Store, StoreLimits};
use wasmtime_wasi::preview2::{self, StreamResult, Table, WasiCtx, WasiCtxBuilder, WasiView};
use wasmtime_wasi::preview2::{
self, StreamError, StreamResult, Table, WasiCtx, WasiCtxBuilder, WasiView,
};
use wasmtime_wasi_http::{body::HyperOutgoingBody, WasiHttpCtx, WasiHttpView};

#[cfg(feature = "wasi-nn")]
Expand Down Expand Up @@ -135,19 +137,19 @@ impl ServeCommand {
Ok(())
}

fn new_store(&self, engine: &Engine, req_id: u64, output: OutputMutex) -> Result<Store<Host>> {
fn new_store(&self, engine: &Engine, req_id: u64) -> Result<Store<Host>> {
let mut builder = WasiCtxBuilder::new();

builder.envs(&[("REQUEST_ID", req_id.to_string())]);

builder.stdout(LogStream {
prefix: format!("stdout [{req_id}] :: "),
output: output.clone(),
output: Output::Stdout,
});

builder.stderr(LogStream {
prefix: format!("stderr [{req_id}] :: "),
output,
output: Output::Stderr,
});

let mut host = Host {
Expand Down Expand Up @@ -319,25 +321,17 @@ impl ProxyHandlerInner {
}
}

type OutputMutex = Arc<tokio::sync::Mutex<std::io::Stdout>>;

#[derive(Clone)]
struct ProxyHandler {
inner: Arc<ProxyHandlerInner>,
output: OutputMutex,
}
struct ProxyHandler(Arc<ProxyHandlerInner>);

impl ProxyHandler {
fn new(cmd: ServeCommand, engine: Engine, instance_pre: InstancePre<Host>) -> Self {
Self {
inner: Arc::new(ProxyHandlerInner {
cmd,
engine,
instance_pre,
next_id: AtomicU64::from(0),
}),
output: Arc::new(tokio::sync::Mutex::new(std::io::stdout())),
}
Self(Arc::new(ProxyHandlerInner {
cmd,
engine,
instance_pre,
next_id: AtomicU64::from(0),
}))
}
}

Expand All @@ -351,7 +345,7 @@ impl hyper::service::Service<Request> for ProxyHandler {
fn call(&mut self, req: Request) -> Self::Future {
use http_body_util::BodyExt;

let ProxyHandler { inner, output } = self.clone();
let ProxyHandler(inner) = self.clone();

let (sender, receiver) = tokio::sync::oneshot::channel();

Expand All @@ -365,7 +359,7 @@ impl hyper::service::Service<Request> for ProxyHandler {
req.uri()
);

let mut store = inner.cmd.new_store(&inner.engine, req_id, output)?;
let mut store = inner.cmd.new_store(&inner.engine, req_id)?;

let req = store.data_mut().new_incoming_request(
req.map(|body| body.map_err(|e| anyhow::anyhow!(e)).boxed()),
Expand All @@ -392,10 +386,28 @@ impl hyper::service::Service<Request> for ProxyHandler {
}
}

#[derive(Clone)]
enum Output {
Stdout,
Stderr,
}

impl Output {
fn write_all(&self, buf: &[u8]) -> anyhow::Result<()> {
use std::io::Write;

match self {
Output::Stdout => std::io::stdout().write_all(buf),
Output::Stderr => std::io::stderr().write_all(buf),
}
.map_err(|e| anyhow!(e))
}
}

#[derive(Clone)]
struct LogStream {
prefix: String,
output: OutputMutex,
output: Output,
}

impl preview2::StdoutStream for LogStream {
Expand All @@ -404,7 +416,12 @@ impl preview2::StdoutStream for LogStream {
}

fn isatty(&self) -> bool {
false
use std::io::IsTerminal;

match &self.output {
Output::Stdout => std::io::stdout().is_terminal(),
Output::Stderr => std::io::stderr().is_terminal(),
}
}
}

Expand All @@ -420,14 +437,7 @@ impl preview2::HostOutputStream for LogStream {
}
}

let output = self.output.clone();
tokio::task::spawn(async move {
use std::io::Write;
let mut output = output.lock().await;
output.write_all(&msg).expect("writing to stdout");
});

Ok(())
self.output.write_all(&msg).map_err(StreamError::LastOperationFailed)
}

fn flush(&mut self) -> StreamResult<()> {
Expand Down

0 comments on commit 4a1b335

Please sign in to comment.