Skip to content

Commit

Permalink
serve: Fix logging prints to stdout/stderr (#8877)
Browse files Browse the repository at this point in the history
This commit fixes writes to stdout/stderr which don't end in a newline
to not get split across lines with a prefix on each line. Instead
internally a flag is used to track whether a prefix is required at the
beginning of each chunk.
  • Loading branch information
alexcrichton authored Jun 27, 2024
1 parent 0f4ae88 commit 9dff778
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 21 deletions.
30 changes: 30 additions & 0 deletions crates/test-programs/src/bin/cli_serve_with_print.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use std::io::Write;
use test_programs::proxy;
use test_programs::wasi::http::types::{
Fields, IncomingRequest, OutgoingResponse, ResponseOutparam,
};

struct T;

proxy::export!(T);

impl proxy::exports::wasi::http::incoming_handler::Guest for T {
fn handle(_request: IncomingRequest, outparam: ResponseOutparam) {
print!("this is half a print ");
std::io::stdout().flush().unwrap();
println!("to stdout");
println!(); // empty line
println!("after empty");

eprint!("this is half a print ");
std::io::stderr().flush().unwrap();
eprintln!("to stderr");
eprintln!(); // empty line
eprintln!("after empty");

let resp = OutgoingResponse::new(Fields::new());
ResponseOutparam::set(outparam, Ok(resp));
}
}

fn main() {}
62 changes: 44 additions & 18 deletions src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,15 @@ impl ServeCommand {

builder.env("REQUEST_ID", req_id.to_string());

builder.stdout(LogStream {
prefix: format!("stdout [{req_id}] :: "),
output: Output::Stdout,
});
builder.stdout(LogStream::new(
format!("stdout [{req_id}] :: "),
Output::Stdout,
));

builder.stderr(LogStream {
prefix: format!("stderr [{req_id}] :: "),
output: Output::Stderr,
});
builder.stderr(LogStream::new(
format!("stderr [{req_id}] :: "),
Output::Stderr,
));

let mut host = Host {
table: wasmtime::component::ResourceTable::new(),
Expand Down Expand Up @@ -470,6 +470,17 @@ impl Output {
struct LogStream {
prefix: String,
output: Output,
needs_prefix_on_next_write: bool,
}

impl LogStream {
fn new(prefix: String, output: Output) -> LogStream {
LogStream {
prefix,
output,
needs_prefix_on_next_write: true,
}
}
}

impl wasmtime_wasi::StdoutStream for LogStream {
Expand All @@ -489,19 +500,34 @@ impl wasmtime_wasi::StdoutStream for LogStream {

impl wasmtime_wasi::HostOutputStream for LogStream {
fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
let mut msg = Vec::new();

for line in bytes.split(|c| *c == b'\n') {
if !line.is_empty() {
msg.extend_from_slice(&self.prefix.as_bytes());
msg.extend_from_slice(line);
msg.push(b'\n');
let mut bytes = &bytes[..];

while !bytes.is_empty() {
if self.needs_prefix_on_next_write {
self.output
.write_all(self.prefix.as_bytes())
.map_err(StreamError::LastOperationFailed)?;
self.needs_prefix_on_next_write = false;
}
match bytes.iter().position(|b| *b == b'\n') {
Some(i) => {
let (a, b) = bytes.split_at(i + 1);
bytes = b;
self.output
.write_all(a)
.map_err(StreamError::LastOperationFailed)?;
self.needs_prefix_on_next_write = true;
}
None => {
self.output
.write_all(bytes)
.map_err(StreamError::LastOperationFailed)?;
break;
}
}
}

self.output
.write_all(&msg)
.map_err(StreamError::LastOperationFailed)
Ok(())
}

fn flush(&mut self) -> StreamResult<()> {
Expand Down
54 changes: 51 additions & 3 deletions tests/all/cli_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1507,7 +1507,7 @@ mod test_programs {
}

/// Completes this server gracefully by printing the output on failure.
fn finish(mut self) -> Result<String> {
fn finish(mut self) -> Result<(String, String)> {
let mut child = self.child.take().unwrap();

// If the child process has already exited then collect the output
Expand All @@ -1525,7 +1525,10 @@ mod test_programs {
bail!("child failed {output:?}");
}

Ok(String::from_utf8_lossy(&output.stderr).into_owned())
Ok((
String::from_utf8_lossy(&output.stdout).into_owned(),
String::from_utf8_lossy(&output.stderr).into_owned(),
))
}

/// Send a request to this server and wait for the response.
Expand Down Expand Up @@ -1660,7 +1663,7 @@ mod test_programs {
)
.await;
assert!(result.is_err());
let stderr = server.finish()?;
let (_, stderr) = server.finish()?;
assert!(
stderr.contains("maximum concurrent memory limit of 0 reached"),
"bad stderr: {stderr}",
Expand Down Expand Up @@ -1766,6 +1769,51 @@ mod test_programs {

Ok(())
}

#[tokio::test]
async fn cli_serve_with_print() -> Result<()> {
let server = WasmtimeServe::new(CLI_SERVE_WITH_PRINT_COMPONENT, |cmd| {
cmd.arg("-Scli");
})?;

for _ in 0..2 {
let resp = server
.send_request(
hyper::Request::builder()
.uri("http://localhost/")
.body(String::new())
.context("failed to make request")?,
)
.await?;
assert!(resp.status().is_success());
}

let (out, err) = server.finish()?;
assert_eq!(
out,
"\
stdout [0] :: this is half a print to stdout
stdout [0] :: \n\
stdout [0] :: after empty
stdout [1] :: this is half a print to stdout
stdout [1] :: \n\
stdout [1] :: after empty
"
);
assert_eq!(
err,
"\
stderr [0] :: this is half a print to stderr
stderr [0] :: \n\
stderr [0] :: after empty
stderr [1] :: this is half a print to stderr
stderr [1] :: \n\
stderr [1] :: after empty
"
);

Ok(())
}
}

#[test]
Expand Down

0 comments on commit 9dff778

Please sign in to comment.