diff --git a/crates/test-programs/wasi-http-tests/src/lib.rs b/crates/test-programs/wasi-http-tests/src/lib.rs index f432230a229d..90ad31b79daa 100644 --- a/crates/test-programs/wasi-http-tests/src/lib.rs +++ b/crates/test-programs/wasi-http-tests/src/lib.rs @@ -150,15 +150,14 @@ pub async fn request( let input_stream_pollable = input_stream.subscribe(); let mut body = Vec::new(); - let mut eof = streams::StreamStatus::Open; - while eof != streams::StreamStatus::Ended { + loop { poll::poll_list(&[&input_stream_pollable]); - let (mut body_chunk, stream_status) = input_stream - .read(1024 * 1024) - .map_err(|_| anyhow!("input_stream read failed"))?; - - eof = stream_status; + let mut body_chunk = match input_stream.read(1024 * 1024) { + Ok(c) => c, + Err(streams::StreamError::Closed) => break, + Err(e) => Err(anyhow!("input_stream read failed: {e:?}"))?, + }; if !body_chunk.is_empty() { body.append(&mut body_chunk); diff --git a/crates/test-programs/wasi-sockets-tests/src/lib.rs b/crates/test-programs/wasi-sockets-tests/src/lib.rs index c7a49203bb7b..a46cff830c0f 100644 --- a/crates/test-programs/wasi-sockets-tests/src/lib.rs +++ b/crates/test-programs/wasi-sockets-tests/src/lib.rs @@ -4,38 +4,24 @@ use wasi::io::poll; use wasi::io::streams; use wasi::sockets::{network, tcp, tcp_create_socket}; -pub fn write(output: &streams::OutputStream, mut bytes: &[u8]) -> (usize, streams::StreamStatus) { - let total = bytes.len(); - let mut written = 0; - +pub fn write(output: &streams::OutputStream, mut bytes: &[u8]) -> Result<(), streams::StreamError> { let pollable = output.subscribe(); while !bytes.is_empty() { poll::poll_list(&[&pollable]); - let permit = match output.check_write() { - Ok(n) => n, - Err(_) => return (written, streams::StreamStatus::Ended), - }; + let permit = output.check_write()?; let len = bytes.len().min(permit as usize); let (chunk, rest) = bytes.split_at(len); - match output.write(chunk) { - Ok(()) => {} - Err(_) => return (written, streams::StreamStatus::Ended), - } + output.write(chunk)?; - match output.blocking_flush() { - Ok(()) => {} - Err(_) => return (written, streams::StreamStatus::Ended), - } + output.blocking_flush()?; bytes = rest; - written += len; } - - (total, streams::StreamStatus::Open) + Ok(()) } pub fn example_body(net: tcp::Network, sock: tcp::TcpSocket, family: network::IpAddressFamily) { @@ -59,13 +45,9 @@ pub fn example_body(net: tcp::Network, sock: tcp::TcpSocket, family: network::Ip poll::poll_one(&client_sub); let (client_input, client_output) = client.finish_connect().unwrap(); - let (n, status) = write(&client_output, &[]); - assert_eq!(n, 0); - assert_eq!(status, streams::StreamStatus::Open); + write(&client_output, &[]).unwrap(); - let (n, status) = write(&client_output, first_message); - assert_eq!(n, first_message.len()); - assert_eq!(status, streams::StreamStatus::Open); + write(&client_output, first_message).unwrap(); drop(client_input); drop(client_output); @@ -75,12 +57,10 @@ pub fn example_body(net: tcp::Network, sock: tcp::TcpSocket, family: network::Ip poll::poll_one(&sub); let (accepted, input, output) = sock.accept().unwrap(); - let (empty_data, status) = input.read(0).unwrap(); + let empty_data = input.read(0).unwrap(); assert!(empty_data.is_empty()); - assert_eq!(status, streams::StreamStatus::Open); - let (data, status) = input.blocking_read(first_message.len() as u64).unwrap(); - assert_eq!(status, streams::StreamStatus::Open); + let data = input.blocking_read(first_message.len() as u64).unwrap(); drop(input); drop(output); @@ -97,9 +77,7 @@ pub fn example_body(net: tcp::Network, sock: tcp::TcpSocket, family: network::Ip poll::poll_one(&client_sub); let (client_input, client_output) = client.finish_connect().unwrap(); - let (n, status) = write(&client_output, second_message); - assert_eq!(n, second_message.len()); - assert_eq!(status, streams::StreamStatus::Open); + write(&client_output, second_message).unwrap(); drop(client_input); drop(client_output); @@ -108,8 +86,7 @@ pub fn example_body(net: tcp::Network, sock: tcp::TcpSocket, family: network::Ip poll::poll_one(&sub); let (accepted, input, output) = sock.accept().unwrap(); - let (data, status) = input.blocking_read(second_message.len() as u64).unwrap(); - assert_eq!(status, streams::StreamStatus::Open); + let data = input.blocking_read(second_message.len() as u64).unwrap(); drop(input); drop(output); diff --git a/crates/wasi-http/src/body.rs b/crates/wasi-http/src/body.rs index d7e3b07daced..39bc30de05bb 100644 --- a/crates/wasi-http/src/body.rs +++ b/crates/wasi-http/src/body.rs @@ -10,8 +10,7 @@ use std::{ }; use tokio::sync::{mpsc, oneshot}; use wasmtime_wasi::preview2::{ - self, AbortOnDropJoinHandle, HostInputStream, HostOutputStream, OutputStreamError, - StreamRuntimeError, StreamState, Subscribe, + self, AbortOnDropJoinHandle, HostInputStream, HostOutputStream, StreamError, Subscribe, }; pub type HyperIncomingBody = BoxBody; @@ -146,21 +145,21 @@ impl HostIncomingBodyStream { #[async_trait::async_trait] impl HostInputStream for HostIncomingBodyStream { - fn read(&mut self, size: usize) -> anyhow::Result<(Bytes, StreamState)> { + fn read(&mut self, size: usize) -> Result { use mpsc::error::TryRecvError; if !self.buffer.is_empty() { let len = size.min(self.buffer.len()); let chunk = self.buffer.split_to(len); - return Ok((chunk, StreamState::Open)); + return Ok(chunk); } if let Some(e) = self.error.take() { - return Err(StreamRuntimeError::from(e).into()); + return Err(StreamError::LastOperationFailed(e)); } if !self.open { - return Ok((Bytes::new(), StreamState::Closed)); + return Err(StreamError::Closed); } match self.receiver.try_recv() { @@ -171,21 +170,21 @@ impl HostInputStream for HostIncomingBodyStream { self.buffer = bytes; } - return Ok((chunk, StreamState::Open)); + return Ok(chunk); } Ok(Err(e)) => { self.open = false; - return Err(StreamRuntimeError::from(e).into()); + return Err(StreamError::LastOperationFailed(e)); } Err(TryRecvError::Empty) => { - return Ok((Bytes::new(), StreamState::Open)); + return Ok(Bytes::new()); } Err(TryRecvError::Disconnected) => { self.open = false; - return Ok((Bytes::new(), StreamState::Closed)); + return Err(StreamError::Closed); } } } @@ -332,12 +331,12 @@ struct WorkerState { } impl WorkerState { - fn check_error(&mut self) -> Result<(), OutputStreamError> { + fn check_error(&mut self) -> Result<(), StreamError> { if let Some(e) = self.error.take() { - return Err(OutputStreamError::LastOperationFailed(e)); + return Err(StreamError::LastOperationFailed(e)); } if !self.alive { - return Err(OutputStreamError::Closed); + return Err(StreamError::Closed); } Ok(()) } @@ -382,7 +381,7 @@ impl Worker { self.write_ready_changed.notified().await; } } - fn check_write(&self) -> Result { + fn check_write(&self) -> Result { let mut state = self.state(); if let Err(e) = state.check_error() { return Err(e); @@ -476,11 +475,11 @@ impl BodyWriteStream { #[async_trait::async_trait] impl HostOutputStream for BodyWriteStream { - fn write(&mut self, bytes: Bytes) -> Result<(), OutputStreamError> { + fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> { let mut state = self.worker.state(); state.check_error()?; if state.flush_pending { - return Err(OutputStreamError::Trap(anyhow!( + return Err(StreamError::Trap(anyhow!( "write not permitted while flush pending" ))); } @@ -489,13 +488,13 @@ impl HostOutputStream for BodyWriteStream { state.write_budget = remaining_budget; state.items.push_back(bytes); } - None => return Err(OutputStreamError::Trap(anyhow!("write exceeded budget"))), + None => return Err(StreamError::Trap(anyhow!("write exceeded budget"))), } drop(state); self.worker.new_work.notify_one(); Ok(()) } - fn flush(&mut self) -> Result<(), OutputStreamError> { + fn flush(&mut self) -> Result<(), StreamError> { let mut state = self.worker.state(); state.check_error()?; @@ -505,7 +504,7 @@ impl HostOutputStream for BodyWriteStream { Ok(()) } - fn check_write(&mut self) -> Result { + fn check_write(&mut self) -> Result { self.worker.check_write() } } diff --git a/crates/wasi-http/wit/deps/io/streams.wit b/crates/wasi-http/wit/deps/io/streams.wit index eeeff505890a..8240507976f7 100644 --- a/crates/wasi-http/wit/deps/io/streams.wit +++ b/crates/wasi-http/wit/deps/io/streams.wit @@ -8,20 +8,14 @@ package wasi:io interface streams { use poll.{pollable} - /// Streams provide a sequence of data and then end; once they end, they - /// no longer provide any further data. - /// - /// For example, a stream reading from a file ends when the stream reaches - /// the end of the file. For another example, a stream reading from a - /// socket ends when the socket is closed. - enum stream-status { - /// The stream is open and may produce further data. - open, - /// When reading, this indicates that the stream will not produce - /// further data. - /// When writing, this indicates that the stream will no longer be read. - /// Further writes are still permitted. - ended, + /// An error for input-stream and output-stream operations. + enum stream-error { + /// The last operation (a write or flush) failed before completion. + last-operation-failed, + /// The stream is closed: no more input will be accepted by the + /// stream. A closed output-stream will return this error on all + /// future operations. + closed } /// An input bytestream. @@ -58,14 +52,14 @@ interface streams { read: func( /// The maximum number of bytes to read len: u64 - ) -> result, stream-status>> + ) -> result, stream-error> /// Read bytes from a stream, after blocking until at least one byte can /// be read. Except for blocking, identical to `read`. blocking-read: func( /// The maximum number of bytes to read len: u64 - ) -> result, stream-status>> + ) -> result, stream-error> /// Skip bytes from a stream. /// @@ -82,14 +76,14 @@ interface streams { skip: func( /// The maximum number of bytes to skip. len: u64, - ) -> result> + ) -> result /// Skip bytes from a stream, after blocking until at least one byte /// can be skipped. Except for blocking behavior, identical to `skip`. blocking-skip: func( /// The maximum number of bytes to skip. len: u64, - ) -> result> + ) -> result /// Create a `pollable` which will resolve once either the specified stream /// has bytes available to read or the other end of the stream has been @@ -100,18 +94,6 @@ interface streams { subscribe: func() -> pollable } - /// An error for output-stream operations. - /// - /// Contrary to input-streams, a closed output-stream is reported using - /// an error. - enum write-error { - /// The last operation (a write or flush) failed before completion. - last-operation-failed, - /// The stream is closed: no more input will be accepted by the - /// stream. A closed output-stream will return this error on all - /// future operations. - closed - } /// An output bytestream. /// @@ -131,7 +113,7 @@ interface streams { /// When this function returns 0 bytes, the `subscribe` pollable will /// become ready when this function will report at least 1 byte, or an /// error. - check-write: func() -> result + check-write: func() -> result /// Perform a write. This function never blocks. /// @@ -142,7 +124,7 @@ interface streams { /// the last call to check-write provided a permit. write: func( contents: list - ) -> result<_, write-error> + ) -> result<_, stream-error> /// Perform a write of up to 4096 bytes, and then flush the stream. Block /// until all of these operations are complete, or an error occurs. @@ -170,7 +152,7 @@ interface streams { /// ``` blocking-write-and-flush: func( contents: list - ) -> result<_, write-error> + ) -> result<_, stream-error> /// Request to flush buffered output. This function never blocks. /// @@ -182,11 +164,11 @@ interface streams { /// writes (`check-write` will return `ok(0)`) until the flush has /// completed. The `subscribe` pollable will become ready when the /// flush has completed and the stream can accept more writes. - flush: func() -> result<_, write-error> + flush: func() -> result<_, stream-error> /// Request to flush buffered output, and block until flush completes /// and stream is ready for writing again. - blocking-flush: func() -> result<_, write-error> + blocking-flush: func() -> result<_, stream-error> /// Create a `pollable` which will resolve once the output-stream /// is ready for more writing, or an error has occured. When this @@ -209,7 +191,7 @@ interface streams { write-zeroes: func( /// The number of zero-bytes to write len: u64 - ) -> result<_, write-error> + ) -> result<_, stream-error> /// Perform a write of up to 4096 zeroes, and then flush the stream. /// Block until all of these operations are complete, or an error @@ -238,7 +220,7 @@ interface streams { blocking-write-zeroes-and-flush: func( /// The number of zero-bytes to write len: u64 - ) -> result<_, write-error> + ) -> result<_, stream-error> /// Read from one stream and write to another. /// @@ -252,7 +234,7 @@ interface streams { src: input-stream, /// The number of bytes to splice len: u64, - ) -> result> + ) -> result /// Read from one stream and write to another, with blocking. /// @@ -263,7 +245,7 @@ interface streams { src: input-stream, /// The number of bytes to splice len: u64, - ) -> result> + ) -> result /// Forward the entire contents of an input stream to an output stream. /// @@ -280,6 +262,6 @@ interface streams { forward: func( /// The stream to read from src: input-stream - ) -> result> + ) -> result } } diff --git a/crates/wasi-preview1-component-adapter/src/lib.rs b/crates/wasi-preview1-component-adapter/src/lib.rs index 46f9c887772b..acbf82eea54d 100644 --- a/crates/wasi-preview1-component-adapter/src/lib.rs +++ b/crates/wasi-preview1-component-adapter/src/lib.rs @@ -891,10 +891,17 @@ pub unsafe extern "C" fn fd_read( let read_len = u64::try_from(len).trapping_unwrap(); let wasi_stream = streams.get_read_stream()?; - let (data, stream_stat) = state + let data = match state .import_alloc .with_buffer(ptr, len, || blocking_mode.read(wasi_stream, read_len)) - .map_err(|_| ERRNO_IO)?; + { + Ok(data) => data, + Err(streams::StreamError::Closed) => { + *nread = 0; + return Ok(()); + } + Err(_) => Err(ERRNO_IO)?, + }; assert_eq!(data.as_ptr(), ptr); assert!(data.len() <= len); @@ -903,16 +910,15 @@ pub unsafe extern "C" fn fd_read( if let StreamType::File(file) = &streams.type_ { file.position .set(file.position.get() + data.len() as filesystem::Filesize); + if len == 0 { + return Err(ERRNO_INTR); + } } let len = data.len(); + *nread = len; forget(data); - if stream_stat == crate::streams::StreamStatus::Open && len == 0 { - Err(ERRNO_INTR) - } else { - *nread = len; - Ok(()) - } + Ok(()) } Descriptor::Closed(_) => Err(ERRNO_BADF), } @@ -2134,7 +2140,7 @@ impl BlockingMode { self, input_stream: &streams::InputStream, read_len: u64, - ) -> Result<(Vec, streams::StreamStatus), ()> { + ) -> Result, streams::StreamError> { match self { BlockingMode::NonBlocking => input_stream.read(read_len), BlockingMode::Blocking => input_stream.blocking_read(read_len), @@ -2163,8 +2169,8 @@ impl BlockingMode { BlockingMode::NonBlocking => { let permit = match output_stream.check_write() { Ok(n) => n, - Err(streams::WriteError::Closed) => 0, - Err(streams::WriteError::LastOperationFailed) => return Err(ERRNO_IO), + Err(streams::StreamError::Closed) => 0, + Err(streams::StreamError::LastOperationFailed) => return Err(ERRNO_IO), }; let len = bytes.len().min(permit as usize); @@ -2174,14 +2180,14 @@ impl BlockingMode { match output_stream.write(&bytes[..len]) { Ok(_) => {} - Err(streams::WriteError::Closed) => return Ok(0), - Err(streams::WriteError::LastOperationFailed) => return Err(ERRNO_IO), + Err(streams::StreamError::Closed) => return Ok(0), + Err(streams::StreamError::LastOperationFailed) => return Err(ERRNO_IO), } match output_stream.blocking_flush() { Ok(_) => {} - Err(streams::WriteError::Closed) => return Ok(0), - Err(streams::WriteError::LastOperationFailed) => return Err(ERRNO_IO), + Err(streams::StreamError::Closed) => return Ok(0), + Err(streams::StreamError::LastOperationFailed) => return Err(ERRNO_IO), } Ok(len) diff --git a/crates/wasi/src/preview2/filesystem.rs b/crates/wasi/src/preview2/filesystem.rs index e09d32df1d9c..488ec93b1f89 100644 --- a/crates/wasi/src/preview2/filesystem.rs +++ b/crates/wasi/src/preview2/filesystem.rs @@ -1,8 +1,5 @@ use crate::preview2::bindings::filesystem::types; -use crate::preview2::{ - AbortOnDropJoinHandle, HostOutputStream, OutputStreamError, StreamRuntimeError, StreamState, - Subscribe, -}; +use crate::preview2::{AbortOnDropJoinHandle, HostOutputStream, StreamError, Subscribe}; use anyhow::anyhow; use bytes::{Bytes, BytesMut}; use std::io; @@ -126,7 +123,7 @@ impl FileInputStream { Self { file, position } } - pub async fn read(&mut self, size: usize) -> anyhow::Result<(Bytes, StreamState)> { + pub async fn read(&mut self, size: usize) -> Result { use system_interface::fs::FileIoExt; let f = Arc::clone(&self.file); let p = self.position; @@ -137,33 +134,24 @@ impl FileInputStream { }) .await .unwrap(); - let (n, state) = read_result(r)?; + let n = read_result(r)?; buf.truncate(n); self.position += n as u64; - Ok((buf.freeze(), state)) + Ok(buf.freeze()) } - pub async fn skip(&mut self, nelem: usize) -> anyhow::Result<(usize, StreamState)> { - let mut nread = 0; - let mut state = StreamState::Open; - - let (bs, read_state) = self.read(nelem).await?; - // TODO: handle the case where `bs.len()` is less than `nelem` - nread += bs.len(); - if read_state.is_closed() { - state = read_state; - } - - Ok((nread, state)) + pub async fn skip(&mut self, nelem: usize) -> Result { + let bs = self.read(nelem).await?; + Ok(bs.len()) } } -fn read_result(r: io::Result) -> Result<(usize, StreamState), anyhow::Error> { +fn read_result(r: io::Result) -> Result { match r { - Ok(0) => Ok((0, StreamState::Closed)), - Ok(n) => Ok((n, StreamState::Open)), - Err(e) if e.kind() == io::ErrorKind::Interrupted => Ok((0, StreamState::Open)), - Err(e) => Err(StreamRuntimeError::from(anyhow!(e)).into()), + Ok(0) => Err(StreamError::Closed), + Ok(n) => Ok(n), + Err(e) if e.kind() == std::io::ErrorKind::Interrupted => Ok(0), + Err(e) => Err(StreamError::LastOperationFailed(e.into())), } } @@ -210,14 +198,14 @@ impl FileOutputStream { const FILE_WRITE_CAPACITY: usize = 1024 * 1024; impl HostOutputStream for FileOutputStream { - fn write(&mut self, buf: Bytes) -> Result<(), OutputStreamError> { + fn write(&mut self, buf: Bytes) -> Result<(), StreamError> { use system_interface::fs::FileIoExt; match self.state { OutputState::Ready => {} - OutputState::Closed => return Err(OutputStreamError::Closed), + OutputState::Closed => return Err(StreamError::Closed), OutputState::Waiting(_) | OutputState::Error(_) => { // a write is pending - this call was not permitted - return Err(OutputStreamError::Trap(anyhow!( + return Err(StreamError::Trap(anyhow!( "write not permitted: check_write not called first" ))); } @@ -248,25 +236,25 @@ impl HostOutputStream for FileOutputStream { self.state = OutputState::Waiting(task); Ok(()) } - fn flush(&mut self) -> Result<(), OutputStreamError> { + fn flush(&mut self) -> Result<(), StreamError> { match self.state { // Only userland buffering of file writes is in the blocking task, // so there's nothing extra that needs to be done to request a // flush. OutputState::Ready | OutputState::Waiting(_) => Ok(()), - OutputState::Closed => Err(OutputStreamError::Closed), + OutputState::Closed => Err(StreamError::Closed), OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) { - OutputState::Error(e) => Err(OutputStreamError::LastOperationFailed(e.into())), + OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())), _ => unreachable!(), }, } } - fn check_write(&mut self) -> Result { + fn check_write(&mut self) -> Result { match self.state { OutputState::Ready => Ok(FILE_WRITE_CAPACITY), - OutputState::Closed => Err(OutputStreamError::Closed), + OutputState::Closed => Err(StreamError::Closed), OutputState::Error(_) => match mem::replace(&mut self.state, OutputState::Closed) { - OutputState::Error(e) => Err(OutputStreamError::LastOperationFailed(e.into())), + OutputState::Error(e) => Err(StreamError::LastOperationFailed(e.into())), _ => unreachable!(), }, OutputState::Waiting(_) => Ok(0), diff --git a/crates/wasi/src/preview2/host/io.rs b/crates/wasi/src/preview2/host/io.rs index a9e18de4caf3..92cfa0cdb0de 100644 --- a/crates/wasi/src/preview2/host/io.rs +++ b/crates/wasi/src/preview2/host/io.rs @@ -1,34 +1,25 @@ use crate::preview2::{ bindings::io::streams::{self, InputStream, OutputStream}, poll::subscribe, - stream::{OutputStreamError, StreamRuntimeError, StreamState}, + stream::StreamError, Pollable, TableError, WasiView, }; use wasmtime::component::Resource; -impl From for streams::StreamStatus { - fn from(state: StreamState) -> Self { - match state { - StreamState::Open => Self::Open, - StreamState::Closed => Self::Ended, - } - } -} - impl From for streams::Error { fn from(e: TableError) -> streams::Error { streams::Error::trap(e.into()) } } -impl From for streams::Error { - fn from(e: OutputStreamError) -> streams::Error { +impl From for streams::Error { + fn from(e: StreamError) -> streams::Error { match e { - OutputStreamError::Closed => streams::WriteError::Closed.into(), - OutputStreamError::LastOperationFailed(e) => { - tracing::debug!("streams::WriteError::LastOperationFailed: {e:?}"); - streams::WriteError::LastOperationFailed.into() + StreamError::Closed => streams::StreamError::Closed.into(), + StreamError::LastOperationFailed(e) => { + tracing::debug!("streams::StreamError::LastOperationFailed: {e:?}"); + streams::StreamError::LastOperationFailed.into() } - OutputStreamError::Trap(e) => streams::Error::trap(e), + StreamError::Trap(e) => streams::Error::trap(e), } } } @@ -148,7 +139,7 @@ impl streams::HostOutputStream for T { _dst: Resource, _src: Resource, _len: u64, - ) -> anyhow::Result> { + ) -> Result { // TODO: We can't get two streams at the same time because they both // carry the exclusive lifetime of `ctx`. When [`get_many_mut`] is // stabilized, that could allow us to add a `get_many_stream_mut` or @@ -177,7 +168,7 @@ impl streams::HostOutputStream for T { _dst: Resource, _src: Resource, _len: u64, - ) -> anyhow::Result> { + ) -> Result { // TODO: once splice is implemented, figure out what the blocking semantics are for waiting // on src and dest here. todo!("stream splice is not implemented") @@ -187,7 +178,7 @@ impl streams::HostOutputStream for T { &mut self, _dst: Resource, _src: Resource, - ) -> anyhow::Result> { + ) -> Result { // TODO: We can't get two streams at the same time because they both // carry the exclusive lifetime of `ctx`. When [`get_many_mut`] is // stabilized, that could allow us to add a `get_many_stream_mut` or @@ -213,6 +204,11 @@ impl streams::HostOutputStream for T { } } +impl From for streams::Error { + fn from(e: std::num::TryFromIntError) -> Self { + streams::Error::trap(anyhow::anyhow!("length overflow: {e:?}")) + } +} #[async_trait::async_trait] impl streams::HostInputStream for T { fn drop(&mut self, stream: Resource) -> anyhow::Result<()> { @@ -224,46 +220,21 @@ impl streams::HostInputStream for T { &mut self, stream: Resource, len: u64, - ) -> anyhow::Result, streams::StreamStatus), ()>> { - match self.table_mut().get_resource_mut(&stream)? { - InputStream::Host(s) => { - let (bytes, state) = match s.read(len as usize) { - Ok(a) => a, - Err(e) => { - if let Some(e) = e.downcast_ref::() { - tracing::debug!("stream runtime error: {e:?}"); - return Ok(Err(())); - } else { - return Err(e); - } - } - }; - debug_assert!(bytes.len() <= len as usize); - - Ok(Ok((bytes.into(), state.into()))) - } - InputStream::File(s) => { - let (bytes, state) = match s.read(len as usize).await { - Ok(a) => a, - Err(e) => { - if let Some(e) = e.downcast_ref::() { - tracing::debug!("stream runtime error: {e:?}"); - return Ok(Err(())); - } else { - return Err(e); - } - } - }; - Ok(Ok((bytes.into(), state.into()))) - } - } + ) -> Result, streams::Error> { + let len = len.try_into()?; + let bytes = match self.table_mut().get_resource_mut(&stream)? { + InputStream::Host(s) => s.read(len)?, + InputStream::File(s) => s.read(len).await?, + }; + debug_assert!(bytes.len() <= len as usize); + Ok(bytes.into()) } async fn blocking_read( &mut self, stream: Resource, len: u64, - ) -> anyhow::Result, streams::StreamStatus), ()>> { + ) -> Result, streams::Error> { if let InputStream::Host(s) = self.table_mut().get_resource_mut(&stream)? { s.ready().await; } @@ -274,46 +245,20 @@ impl streams::HostInputStream for T { &mut self, stream: Resource, len: u64, - ) -> anyhow::Result> { - match self.table_mut().get_resource_mut(&stream)? { - InputStream::Host(s) => { - // TODO: the cast to usize should be fallible, use `.try_into()?` - let (bytes_skipped, state) = match s.skip(len as usize) { - Ok(a) => a, - Err(e) => { - if let Some(e) = e.downcast_ref::() { - tracing::debug!("stream runtime error: {e:?}"); - return Ok(Err(())); - } else { - return Err(e); - } - } - }; - - Ok(Ok((bytes_skipped as u64, state.into()))) - } - InputStream::File(s) => { - let (bytes_skipped, state) = match s.skip(len as usize).await { - Ok(a) => a, - Err(e) => { - if let Some(e) = e.downcast_ref::() { - tracing::debug!("stream runtime error: {e:?}"); - return Ok(Err(())); - } else { - return Err(e); - } - } - }; - Ok(Ok((bytes_skipped as u64, state.into()))) - } - } + ) -> Result { + let len = len.try_into()?; + let written = match self.table_mut().get_resource_mut(&stream)? { + InputStream::Host(s) => s.skip(len)?, + InputStream::File(s) => s.skip(len).await?, + }; + Ok(written.try_into().expect("usize always fits in u64")) } async fn blocking_skip( &mut self, stream: Resource, len: u64, - ) -> anyhow::Result> { + ) -> Result { if let InputStream::Host(s) = self.table_mut().get_resource_mut(&stream)? { s.ready().await; } @@ -337,35 +282,18 @@ pub mod sync { }; use wasmtime::component::Resource; - // same boilerplate everywhere, converting between two identical types with different - // definition sites. one day wasmtime-wit-bindgen will make all this unnecessary - fn xform( - r: Result<(A, async_streams::StreamStatus), ()>, - ) -> Result<(A, streams::StreamStatus), ()> { - r.map(|(a, b)| (a, b.into())) - } - - impl From for streams::StreamStatus { - fn from(other: async_streams::StreamStatus) -> Self { - match other { - async_streams::StreamStatus::Open => Self::Open, - async_streams::StreamStatus::Ended => Self::Ended, - } - } - } - - impl From for streams::WriteError { - fn from(other: async_streams::WriteError) -> Self { + impl From for streams::StreamError { + fn from(other: async_streams::StreamError) -> Self { match other { - async_streams::WriteError::LastOperationFailed => Self::LastOperationFailed, - async_streams::WriteError::Closed => Self::Closed, + async_streams::StreamError::LastOperationFailed => Self::LastOperationFailed, + async_streams::StreamError::Closed => Self::Closed, } } } impl From for streams::Error { fn from(other: async_streams::Error) -> Self { match other.downcast() { - Ok(write_error) => streams::Error::from(streams::WriteError::from(write_error)), + Ok(write_error) => streams::Error::from(streams::StreamError::from(write_error)), Err(e) => streams::Error::trap(e), } } @@ -444,8 +372,10 @@ pub mod sync { dst: Resource, src: Resource, len: u64, - ) -> anyhow::Result> { - in_tokio(async { AsyncHostOutputStream::splice(self, dst, src, len).await }).map(xform) + ) -> Result { + Ok(in_tokio(async { + AsyncHostOutputStream::splice(self, dst, src, len).await + })?) } fn blocking_splice( @@ -453,17 +383,20 @@ pub mod sync { dst: Resource, src: Resource, len: u64, - ) -> anyhow::Result> { - in_tokio(async { AsyncHostOutputStream::blocking_splice(self, dst, src, len).await }) - .map(xform) + ) -> Result { + Ok(in_tokio(async { + AsyncHostOutputStream::blocking_splice(self, dst, src, len).await + })?) } fn forward( &mut self, dst: Resource, src: Resource, - ) -> anyhow::Result> { - in_tokio(async { AsyncHostOutputStream::forward(self, dst, src).await }).map(xform) + ) -> Result { + Ok(in_tokio(async { + AsyncHostOutputStream::forward(self, dst, src).await + })?) } } @@ -476,34 +409,36 @@ pub mod sync { &mut self, stream: Resource, len: u64, - ) -> anyhow::Result, streams::StreamStatus), ()>> { - in_tokio(async { AsyncHostInputStream::read(self, stream, len).await }).map(xform) + ) -> Result, streams::Error> { + Ok(in_tokio(async { + AsyncHostInputStream::read(self, stream, len).await + })?) } fn blocking_read( &mut self, stream: Resource, len: u64, - ) -> anyhow::Result, streams::StreamStatus), ()>> { - in_tokio(async { AsyncHostInputStream::blocking_read(self, stream, len).await }) - .map(xform) + ) -> Result, streams::Error> { + Ok(in_tokio(async { + AsyncHostInputStream::blocking_read(self, stream, len).await + })?) } - fn skip( - &mut self, - stream: Resource, - len: u64, - ) -> anyhow::Result> { - in_tokio(async { AsyncHostInputStream::skip(self, stream, len).await }).map(xform) + fn skip(&mut self, stream: Resource, len: u64) -> Result { + Ok(in_tokio(async { + AsyncHostInputStream::skip(self, stream, len).await + })?) } fn blocking_skip( &mut self, stream: Resource, len: u64, - ) -> anyhow::Result> { - in_tokio(async { AsyncHostInputStream::blocking_skip(self, stream, len).await }) - .map(xform) + ) -> Result { + Ok(in_tokio(async { + AsyncHostInputStream::blocking_skip(self, stream, len).await + })?) } fn subscribe( diff --git a/crates/wasi/src/preview2/mod.rs b/crates/wasi/src/preview2/mod.rs index de8dd801dcd2..78b436afe38a 100644 --- a/crates/wasi/src/preview2/mod.rs +++ b/crates/wasi/src/preview2/mod.rs @@ -44,10 +44,7 @@ pub use self::filesystem::{DirPerms, FilePerms}; pub use self::poll::{subscribe, ClosureFuture, MakeFuture, Pollable, PollableFuture, Subscribe}; pub use self::random::{thread_rng, Deterministic}; pub use self::stdio::{stderr, stdin, stdout, IsATTY, Stderr, Stdin, Stdout}; -pub use self::stream::{ - HostInputStream, HostOutputStream, InputStream, OutputStream, OutputStreamError, - StreamRuntimeError, StreamState, -}; +pub use self::stream::{HostInputStream, HostOutputStream, InputStream, OutputStream, StreamError}; pub use self::table::{Table, TableError}; pub use cap_fs_ext::SystemTimeSpec; pub use cap_rand::RngCore; @@ -68,7 +65,7 @@ pub mod bindings { ", tracing: true, trappable_error_type: { - "wasi:io/streams"::"write-error": Error, + "wasi:io/streams"::"stream-error": Error, "wasi:filesystem/types"::"error-code": Error, }, with: { @@ -146,7 +143,7 @@ pub mod bindings { ], }, trappable_error_type: { - "wasi:io/streams"::"write-error": Error, + "wasi:io/streams"::"stream-error": Error, "wasi:filesystem/types"::"error-code": Error, "wasi:sockets/network"::"error-code": Error, }, diff --git a/crates/wasi/src/preview2/pipe.rs b/crates/wasi/src/preview2/pipe.rs index dae3e7569a99..bc8d20b1de56 100644 --- a/crates/wasi/src/preview2/pipe.rs +++ b/crates/wasi/src/preview2/pipe.rs @@ -8,8 +8,8 @@ //! but the virtual pipes can be instantiated with any `Read` or `Write` type. //! use crate::preview2::poll::Subscribe; -use crate::preview2::{HostInputStream, HostOutputStream, OutputStreamError, StreamState}; -use anyhow::{anyhow, Error}; +use crate::preview2::{HostInputStream, HostOutputStream, StreamError}; +use anyhow::anyhow; use bytes::Bytes; use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; @@ -35,20 +35,15 @@ impl MemoryInputPipe { #[async_trait::async_trait] impl HostInputStream for MemoryInputPipe { - fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> { + fn read(&mut self, size: usize) -> Result { let mut buffer = self.buffer.lock().unwrap(); if buffer.is_empty() { - return Ok((Bytes::new(), StreamState::Closed)); + return Err(StreamError::Closed); } let size = size.min(buffer.len()); let read = buffer.split_to(size); - let state = if buffer.is_empty() { - StreamState::Closed - } else { - StreamState::Open - }; - Ok((read, state)) + Ok(read) } } @@ -81,10 +76,10 @@ impl MemoryOutputPipe { } impl HostOutputStream for MemoryOutputPipe { - fn write(&mut self, bytes: Bytes) -> Result<(), OutputStreamError> { + fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> { let mut buf = self.buffer.lock().unwrap(); if bytes.len() > self.capacity - buf.len() { - return Err(OutputStreamError::Trap(anyhow!( + return Err(StreamError::Trap(anyhow!( "write beyond capacity of MemoryOutputPipe" ))); } @@ -92,17 +87,17 @@ impl HostOutputStream for MemoryOutputPipe { // Always ready for writing Ok(()) } - fn flush(&mut self) -> Result<(), OutputStreamError> { + fn flush(&mut self) -> Result<(), StreamError> { // This stream is always flushed Ok(()) } - fn check_write(&mut self) -> Result { + fn check_write(&mut self) -> Result { let consumed = self.buffer.lock().unwrap().len(); if consumed < self.capacity { Ok(self.capacity - consumed) } else { // Since the buffer is full, no more bytes will ever be written - Err(OutputStreamError::Closed) + Err(StreamError::Closed) } } } @@ -114,9 +109,9 @@ impl Subscribe for MemoryOutputPipe { /// Provides a [`HostInputStream`] impl from a [`tokio::io::AsyncRead`] impl pub struct AsyncReadStream { - state: StreamState, - buffer: Option>, - receiver: mpsc::Receiver>, + closed: bool, + buffer: Option>, + receiver: mpsc::Receiver>, _join_handle: crate::preview2::AbortOnDropJoinHandle<()>, } @@ -130,11 +125,13 @@ impl AsyncReadStream { use tokio::io::AsyncReadExt; let mut buf = bytes::BytesMut::with_capacity(4096); let sent = match reader.read_buf(&mut buf).await { - Ok(nbytes) if nbytes == 0 => { - sender.send(Ok((Bytes::new(), StreamState::Closed))).await + Ok(nbytes) if nbytes == 0 => sender.send(Err(StreamError::Closed)).await, + Ok(_) => sender.send(Ok(buf.freeze())).await, + Err(e) => { + sender + .send(Err(StreamError::LastOperationFailed(e.into()))) + .await } - Ok(_) => sender.send(Ok((buf.freeze(), StreamState::Open))).await, - Err(e) => sender.send(Err(e)).await, }; if sent.is_err() { // no more receiver - stop trying to read @@ -143,7 +140,7 @@ impl AsyncReadStream { } }); AsyncReadStream { - state: StreamState::Open, + closed: false, buffer: None, receiver, _join_handle: join_handle, @@ -153,7 +150,7 @@ impl AsyncReadStream { #[async_trait::async_trait] impl HostInputStream for AsyncReadStream { - fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> { + fn read(&mut self, size: usize) -> Result { use mpsc::error::TryRecvError; match self.buffer.take() { @@ -161,55 +158,47 @@ impl HostInputStream for AsyncReadStream { // TODO: de-duplicate the buffer management with the case below let len = bytes.len().min(size); let rest = bytes.split_off(len); - let return_state = if !rest.is_empty() { + if !rest.is_empty() { self.buffer = Some(Ok(rest)); - StreamState::Open - } else { - self.state - }; - return Ok((bytes, return_state)); + } + return Ok(bytes); + } + Some(Err(e)) => { + self.closed = true; + return Err(e); } - Some(Err(e)) => return Err(e.into()), None => {} } match self.receiver.try_recv() { - Ok(Ok((mut bytes, state))) => { - self.state = state; - + Ok(Ok(mut bytes)) => { let len = bytes.len().min(size); let rest = bytes.split_off(len); - let return_state = if !rest.is_empty() { + if !rest.is_empty() { self.buffer = Some(Ok(rest)); - StreamState::Open - } else { - self.state - }; + } - Ok((bytes, return_state)) + Ok(bytes) } - Ok(Err(e)) => Err(e.into()), - Err(TryRecvError::Empty) => Ok((Bytes::new(), self.state)), - Err(TryRecvError::Disconnected) => Err(anyhow!( + Ok(Err(e)) => { + self.closed = true; + Err(e) + } + Err(TryRecvError::Empty) => Ok(Bytes::new()), + Err(TryRecvError::Disconnected) => Err(StreamError::Trap(anyhow!( "AsyncReadStream sender died - should be impossible" - )), + ))), } } } #[async_trait::async_trait] impl Subscribe for AsyncReadStream { async fn ready(&mut self) { - if self.buffer.is_some() || self.state == StreamState::Closed { + if self.buffer.is_some() || self.closed { return; } match self.receiver.recv().await { - Some(Ok((bytes, state))) => { - if state == StreamState::Closed { - self.state = state; - } - self.buffer = Some(Ok(bytes)); - } - Some(Err(e)) => self.buffer = Some(Err(e)), + Some(res) => self.buffer = Some(res), None => { panic!("no more sender for an open AsyncReadStream - should be impossible") } @@ -222,15 +211,15 @@ impl Subscribe for AsyncReadStream { pub struct SinkOutputStream; impl HostOutputStream for SinkOutputStream { - fn write(&mut self, _buf: Bytes) -> Result<(), OutputStreamError> { + fn write(&mut self, _buf: Bytes) -> Result<(), StreamError> { Ok(()) } - fn flush(&mut self) -> Result<(), OutputStreamError> { + fn flush(&mut self) -> Result<(), StreamError> { // This stream is always flushed Ok(()) } - fn check_write(&mut self) -> Result { + fn check_write(&mut self) -> Result { // This stream is always ready for writing. Ok(usize::MAX) } @@ -247,8 +236,8 @@ pub struct ClosedInputStream; #[async_trait::async_trait] impl HostInputStream for ClosedInputStream { - fn read(&mut self, _size: usize) -> Result<(Bytes, StreamState), Error> { - Ok((Bytes::new(), StreamState::Closed)) + fn read(&mut self, _size: usize) -> Result { + Err(StreamError::Closed) } } @@ -262,15 +251,15 @@ impl Subscribe for ClosedInputStream { pub struct ClosedOutputStream; impl HostOutputStream for ClosedOutputStream { - fn write(&mut self, _: Bytes) -> Result<(), OutputStreamError> { - Err(OutputStreamError::Closed) + fn write(&mut self, _: Bytes) -> Result<(), StreamError> { + Err(StreamError::Closed) } - fn flush(&mut self) -> Result<(), OutputStreamError> { - Err(OutputStreamError::Closed) + fn flush(&mut self) -> Result<(), StreamError> { + Err(StreamError::Closed) } - fn check_write(&mut self) -> Result { - Err(OutputStreamError::Closed) + fn check_write(&mut self) -> Result { + Err(StreamError::Closed) } } @@ -324,22 +313,20 @@ mod test { #[test_log::test(tokio::test(flavor = "multi_thread"))] async fn empty_read_stream() { let mut reader = AsyncReadStream::new(tokio::io::empty()); - let (bs, state) = reader.read(10).unwrap(); - assert!(bs.is_empty()); // In a multi-threaded context, the value of state is not deterministic -- the spawned // reader task may run on a different thread. - match state { + match reader.read(10) { // The reader task ran before we tried to read, and noticed that the input was empty. - StreamState::Closed => {} + Err(StreamError::Closed) => {} // The reader task hasn't run yet. Call `ready` to await and fill the buffer. - StreamState::Open => { - resolves_immediately(reader.ready()).await; - let (bs, state) = reader.read(0).unwrap(); + Ok(bs) => { assert!(bs.is_empty()); - assert_eq!(state, StreamState::Closed); + resolves_immediately(reader.ready()).await; + assert!(matches!(reader.read(0), Err(StreamError::Closed))); } + res => panic!("unexpected: {res:?}"), } } @@ -347,27 +334,23 @@ mod test { async fn infinite_read_stream() { let mut reader = AsyncReadStream::new(tokio::io::repeat(0)); - let (bs, state) = reader.read(10).unwrap(); - assert_eq!(state, StreamState::Open); + let bs = reader.read(10).unwrap(); if bs.is_empty() { // Reader task hasn't run yet. Call `ready` to await and fill the buffer. resolves_immediately(reader.ready()).await; // Now a read should succeed - let (bs, state) = reader.read(10).unwrap(); + let bs = reader.read(10).unwrap(); assert_eq!(bs.len(), 10); - assert_eq!(state, StreamState::Open); } else { assert_eq!(bs.len(), 10); } // Subsequent reads should succeed - let (bs, state) = reader.read(10).unwrap(); - assert_eq!(state, StreamState::Open); + let bs = reader.read(10).unwrap(); assert_eq!(bs.len(), 10); // Even 0-length reads should succeed and show its open - let (bs, state) = reader.read(0).unwrap(); - assert_eq!(state, StreamState::Open); + let bs = reader.read(0).unwrap(); assert_eq!(bs.len(), 0); } @@ -381,33 +364,29 @@ mod test { async fn finite_read_stream() { let mut reader = AsyncReadStream::new(finite_async_reader(&[1; 123]).await); - let (bs, state) = reader.read(123).unwrap(); - assert_eq!(state, StreamState::Open); + let bs = reader.read(123).unwrap(); if bs.is_empty() { // Reader task hasn't run yet. Call `ready` to await and fill the buffer. resolves_immediately(reader.ready()).await; // Now a read should succeed - let (bs, state) = reader.read(123).unwrap(); + let bs = reader.read(123).unwrap(); assert_eq!(bs.len(), 123); - assert_eq!(state, StreamState::Open); } else { assert_eq!(bs.len(), 123); } // The AsyncRead's should be empty now, but we have a race where the reader task hasn't // yet send that to the AsyncReadStream. - let (bs, state) = reader.read(0).unwrap(); - assert!(bs.is_empty()); - match state { - StreamState::Closed => {} // Correct! - StreamState::Open => { + match reader.read(0) { + Err(StreamError::Closed) => {} // Correct! + Ok(bs) => { + assert!(bs.is_empty()); // Need to await to give this side time to catch up resolves_immediately(reader.ready()).await; // Now a read should show closed - let (bs, state) = reader.read(0).unwrap(); - assert_eq!(bs.len(), 0); - assert_eq!(state, StreamState::Closed); + assert!(matches!(reader.read(0), Err(StreamError::Closed))); } + res => panic!("unexpected: {res:?}"), } } @@ -420,31 +399,27 @@ mod test { w.write_all(&[123]).await.unwrap(); - let (bs, state) = reader.read(1).unwrap(); - assert_eq!(state, StreamState::Open); + let bs = reader.read(1).unwrap(); if bs.is_empty() { // Reader task hasn't run yet. Call `ready` to await and fill the buffer. resolves_immediately(reader.ready()).await; // Now a read should succeed - let (bs, state) = reader.read(1).unwrap(); + let bs = reader.read(1).unwrap(); assert_eq!(*bs, [123u8]); - assert_eq!(state, StreamState::Open); } else { assert_eq!(*bs, [123u8]); } // The stream should be empty and open now: - let (bs, state) = reader.read(1).unwrap(); + let bs = reader.read(1).unwrap(); assert!(bs.is_empty()); - assert_eq!(state, StreamState::Open); // We can wait on readiness and it will time out: never_resolves(reader.ready()).await; // Still open and empty: - let (bs, state) = reader.read(1).unwrap(); + let bs = reader.read(1).unwrap(); assert!(bs.is_empty()); - assert_eq!(state, StreamState::Open); // Put something else in the stream: w.write_all(&[45]).await.unwrap(); @@ -454,22 +429,19 @@ mod test { resolves_immediately(reader.ready()).await; // read the something else back out: - let (bs, state) = reader.read(1).unwrap(); + let bs = reader.read(1).unwrap(); assert_eq!(*bs, [45u8]); - assert_eq!(state, StreamState::Open); // nothing else in there: - let (bs, state) = reader.read(1).unwrap(); + let bs = reader.read(1).unwrap(); assert!(bs.is_empty()); - assert_eq!(state, StreamState::Open); // We can wait on readiness and it will time out: never_resolves(reader.ready()).await; // nothing else in there: - let (bs, state) = reader.read(1).unwrap(); + let bs = reader.read(1).unwrap(); assert!(bs.is_empty()); - assert_eq!(state, StreamState::Open); // Now close the pipe: drop(w); @@ -479,9 +451,7 @@ mod test { resolves_immediately(reader.ready()).await; // empty and now closed: - let (bs, state) = reader.read(1).unwrap(); - assert!(bs.is_empty()); - assert_eq!(state, StreamState::Closed); + assert!(matches!(reader.read(1), Err(StreamError::Closed))); } #[test_log::test(tokio::test(flavor = "multi_thread"))] @@ -502,18 +472,16 @@ mod test { // Now we expect the reader task has sent 4k from the stream to the reader. // Try to read out one bigger than the buffer available: - let (bs, state) = reader.read(4097).unwrap(); + let bs = reader.read(4097).unwrap(); assert_eq!(bs.len(), 4096); - assert_eq!(state, StreamState::Open); // Allow the crank to turn more: resolves_immediately(reader.ready()).await; // Again we expect the reader task has sent 4k from the stream to the reader. // Try to read out one bigger than the buffer available: - let (bs, state) = reader.read(4097).unwrap(); + let bs = reader.read(4097).unwrap(); assert_eq!(bs.len(), 4096); - assert_eq!(state, StreamState::Open); // The writer task is now finished - join with it: let w = resolves_immediately(writer_task).await; @@ -525,9 +493,7 @@ mod test { resolves_immediately(reader.ready()).await; // Now we expect the reader to be empty, and the stream closed: - let (bs, state) = reader.read(4097).unwrap(); - assert_eq!(bs.len(), 0); - assert_eq!(state, StreamState::Closed); + assert!(matches!(reader.read(4097), Err(StreamError::Closed))); } #[test_log::test(test_log::test(tokio::test(flavor = "multi_thread")))] @@ -598,7 +564,7 @@ mod test { // worker hasn't processed write yet: Ok(1023) => {} // worker reports failure: - Err(OutputStreamError::LastOperationFailed(_)) => { + Err(StreamError::LastOperationFailed(_)) => { tracing::debug!("discovered stream failure in first write_ready"); should_be_closed = true; } @@ -612,13 +578,13 @@ mod test { let flush_res = writer.flush(); match flush_res { // worker reports failure: - Err(OutputStreamError::LastOperationFailed(_)) => { + Err(StreamError::LastOperationFailed(_)) => { tracing::debug!("discovered stream failure trying to flush"); assert!(!should_be_closed); should_be_closed = true; } // Already reported failure, now closed - Err(OutputStreamError::Closed) => { + Err(StreamError::Closed) => { assert!( should_be_closed, "expected a LastOperationFailed before we see Closed. {write_ready_res:?}" @@ -633,12 +599,12 @@ mod test { // closed. match resolves_immediately(writer.write_ready()).await { // worker reports failure: - Err(OutputStreamError::LastOperationFailed(_)) => { + Err(StreamError::LastOperationFailed(_)) => { tracing::debug!("discovered stream failure trying to flush"); assert!(!should_be_closed); } // Already reported failure, now closed - Err(OutputStreamError::Closed) => { + Err(StreamError::Closed) => { assert!(should_be_closed); } r => { diff --git a/crates/wasi/src/preview2/preview1.rs b/crates/wasi/src/preview2/preview1.rs index 4a51b98d0362..a4bdada9b7f6 100644 --- a/crates/wasi/src/preview2/preview1.rs +++ b/crates/wasi/src/preview2/preview1.rs @@ -1,12 +1,14 @@ -use crate::preview2::bindings::cli::{ - stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr, terminal_stdin, - terminal_stdout, +use crate::preview2::bindings::{ + self, + cli::{ + stderr, stdin, stdout, terminal_input, terminal_output, terminal_stderr, terminal_stdin, + terminal_stdout, + }, + clocks::{monotonic_clock, wall_clock}, + filesystem::{preopens, types as filesystem}, + io::{poll, streams}, }; -use crate::preview2::bindings::clocks::{monotonic_clock, wall_clock}; -use crate::preview2::bindings::filesystem::{preopens, types as filesystem}; -use crate::preview2::bindings::io::poll; -use crate::preview2::bindings::io::streams; -use crate::preview2::{bindings, IsATTY, TableError, WasiView}; +use crate::preview2::{IsATTY, StreamError, TableError, WasiView}; use anyhow::{anyhow, bail, Context}; use std::borrow::Borrow; use std::collections::{BTreeMap, HashSet}; @@ -54,14 +56,34 @@ impl BlockingMode { host: &mut impl streams::Host, input_stream: Resource, max_size: usize, - ) -> Result<(Vec, streams::StreamStatus), types::Error> { + ) -> Result, types::Error> { let max_size = max_size.try_into().unwrap_or(u64::MAX); match self { - BlockingMode::Blocking => stream_res( - streams::HostInputStream::blocking_read(host, input_stream, max_size).await, - ), + BlockingMode::Blocking => { + match streams::HostInputStream::blocking_read(host, input_stream, max_size).await { + Ok(r) if r.is_empty() => Err(types::Errno::Intr.into()), + Ok(r) => Ok(r), + Err(e) if matches!(e.downcast_ref(), Some(streams::StreamError::Closed)) => { + Ok(Vec::new()) + } + Err(e) => { + tracing::trace!("throwing away read error to report as Errno::Io: {e:?}"); + Err(types::Errno::Io.into()) + } + } + } + BlockingMode::NonBlocking => { - stream_res(streams::HostInputStream::read(host, input_stream, max_size).await) + match streams::HostInputStream::read(host, input_stream, max_size).await { + Ok(r) => Ok(r), + Err(e) if matches!(e.downcast_ref(), Some(streams::StreamError::Closed)) => { + Ok(Vec::new()) + } + Err(e) => { + tracing::trace!("throwing away read error to report as Errno::Io: {e:?}"); + Err(types::Errno::Io.into()) + } + } } } } @@ -70,7 +92,7 @@ impl BlockingMode { host: &mut (impl streams::Host + poll::Host), output_stream: Resource, mut bytes: &[u8], - ) -> Result { + ) -> Result { use streams::HostOutputStream as Streams; match self { @@ -95,7 +117,7 @@ impl BlockingMode { BlockingMode::NonBlocking => { let n = match Streams::check_write(host, output_stream.borrowed()) { Ok(n) => n, - Err(e) if matches!(e.downcast_ref(), Some(streams::WriteError::Closed)) => 0, + Err(e) if matches!(e.downcast_ref(), Some(streams::StreamError::Closed)) => 0, Err(e) => Err(e)?, }; @@ -106,7 +128,7 @@ impl BlockingMode { match Streams::write(host, output_stream.borrowed(), bytes[..len].to_vec()) { Ok(()) => {} - Err(e) if matches!(e.downcast_ref(), Some(streams::WriteError::Closed)) => { + Err(e) if matches!(e.downcast_ref(), Some(streams::StreamError::Closed)) => { return Ok(0) } Err(e) => Err(e)?, @@ -114,7 +136,7 @@ impl BlockingMode { match Streams::blocking_flush(host, output_stream.borrowed()).await { Ok(()) => {} - Err(e) if matches!(e.downcast_ref(), Some(streams::WriteError::Closed)) => { + Err(e) if matches!(e.downcast_ref(), Some(streams::StreamError::Closed)) => { return Ok(0) } Err(e) => Err(e)?, @@ -552,23 +574,28 @@ impl wiggle::GuestErrorType for types::Errno { } } +impl From for types::Error { + fn from(err: StreamError) -> Self { + types::Error::from(streams::Error::from(err)) + } +} + impl From for types::Error { fn from(err: streams::Error) -> Self { match err.downcast() { - Ok(streams::WriteError::Closed | streams::WriteError::LastOperationFailed) => { - types::Errno::Io.into() - } - + Ok(se) => se.into(), Err(t) => types::Error::trap(t), } } } -fn stream_res(r: anyhow::Result>) -> Result { - match r { - Ok(Ok(a)) => Ok(a), - Ok(Err(_)) => Err(types::Errno::Io.into()), - Err(trap) => Err(types::Error::trap(trap)), +impl From for types::Error { + fn from(err: streams::StreamError) -> Self { + match err { + streams::StreamError::Closed | streams::StreamError::LastOperationFailed => { + types::Errno::Io.into() + } + } } } @@ -1325,7 +1352,7 @@ impl< ) -> Result { let t = self.transact()?; let desc = t.get_descriptor(fd)?; - let (mut buf, read, state) = match desc { + let (mut buf, read) = match desc { Descriptor::File(File { fd, blocking_mode, @@ -1346,12 +1373,12 @@ impl< .context("failed to call `read-via-stream`") .unwrap_or_else(types::Error::trap) })?; - let (read, state) = blocking_mode.read(self, stream, buf.len()).await?; + let read = blocking_mode.read(self, stream, buf.len()).await?; let n = read.len().try_into()?; let pos = pos.checked_add(n).ok_or(types::Errno::Overflow)?; position.store(pos, Ordering::Relaxed); - (buf, read, state) + (buf, read) } Descriptor::Stdin { stream, .. } => { let stream = stream.borrowed(); @@ -1359,24 +1386,14 @@ impl< let Some(buf) = first_non_empty_iovec(iovs)? else { return Ok(0); }; - let (read, state) = stream_res( - streams::HostInputStream::blocking_read( - self, - stream, - buf.len().try_into().unwrap_or(u64::MAX), - ) - .await, - )?; - (buf, read, state) + let read = BlockingMode::Blocking.read(self, stream, buf.len()).await?; + (buf, read) } _ => return Err(types::Errno::Badf.into()), }; if read.len() > buf.len() { return Err(types::Errno::Range.into()); } - if state == streams::StreamStatus::Open && read.len() == 0 { - return Err(types::Errno::Intr.into()); - } let (buf, _) = buf.split_at_mut(read.len()); buf.copy_from_slice(&read); let n = read.len().try_into()?; @@ -1394,7 +1411,7 @@ impl< ) -> Result { let t = self.transact()?; let desc = t.get_descriptor(fd)?; - let (mut buf, read, state) = match desc { + let (mut buf, read) = match desc { Descriptor::File(File { fd, blocking_mode, .. }) if t.view.table().get_resource(fd)?.is_file() => { @@ -1410,8 +1427,8 @@ impl< .context("failed to call `read-via-stream`") .unwrap_or_else(types::Error::trap) })?; - let (read, state) = blocking_mode.read(self, stream, buf.len()).await?; - (buf, read, state) + let read = blocking_mode.read(self, stream, buf.len()).await?; + (buf, read) } Descriptor::Stdin { .. } => { // NOTE: legacy implementation returns SPIPE here @@ -1422,9 +1439,6 @@ impl< if read.len() > buf.len() { return Err(types::Errno::Range.into()); } - if state == streams::StreamStatus::Open && read.len() == 0 { - return Err(types::Errno::Intr.into()); - } let (buf, _) = buf.split_at_mut(read.len()); buf.copy_from_slice(&read); let n = read.len().try_into()?; diff --git a/crates/wasi/src/preview2/stdio.rs b/crates/wasi/src/preview2/stdio.rs index 0d61adc2438a..9a4e93b86f5f 100644 --- a/crates/wasi/src/preview2/stdio.rs +++ b/crates/wasi/src/preview2/stdio.rs @@ -229,7 +229,7 @@ impl terminal_stderr::Host for T { #[cfg(all(unix, test))] mod test { - use crate::preview2::{HostInputStream, StreamState}; + use crate::preview2::HostInputStream; use libc; use std::fs::File; use std::io::{BufRead, BufReader, Write}; @@ -321,12 +321,10 @@ mod test { stdin.ready().await; println!("child: reading input"); - let (bytes, status) = stdin.read(1024).unwrap(); + // We can't effectively test for the case where stdin was closed, so panic if it is... + let bytes = stdin.read(1024).unwrap(); - println!("child: {:?}, {:?}", bytes, status); - - // We can't effectively test for the case where stdin was closed. - assert_eq!(status, StreamState::Open); + println!("child got: {:?}", bytes); buffer.push_str(std::str::from_utf8(bytes.as_ref()).unwrap()); if let Some((line, rest)) = buffer.split_once('\n') { diff --git a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs index 1d1c80aabace..cb2f50f2c61b 100644 --- a/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs +++ b/crates/wasi/src/preview2/stdio/worker_thread_stdin.rs @@ -25,8 +25,7 @@ use crate::preview2::poll::Subscribe; use crate::preview2::stdio::StdinStream; -use crate::preview2::{HostInputStream, StreamState}; -use anyhow::Error; +use crate::preview2::{HostInputStream, StreamError}; use bytes::{Bytes, BytesMut}; use std::io::{IsTerminal, Read}; use std::mem; @@ -117,15 +116,15 @@ impl StdinStream for Stdin { #[async_trait::async_trait] impl HostInputStream for Stdin { - fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error> { + fn read(&mut self, size: usize) -> Result { let g = GlobalStdin::get(); let mut locked = g.state.lock().unwrap(); match mem::replace(&mut *locked, StdinState::ReadRequested) { StdinState::ReadNotRequested => { g.read_requested.notify_one(); - Ok((Bytes::new(), StreamState::Open)) + Ok(Bytes::new()) } - StdinState::ReadRequested => Ok((Bytes::new(), StreamState::Open)), + StdinState::ReadRequested => Ok(Bytes::new()), StdinState::Data(mut data) => { let size = data.len().min(size); let bytes = data.split_to(size); @@ -134,15 +133,15 @@ impl HostInputStream for Stdin { } else { StdinState::Data(data) }; - Ok((bytes.freeze(), StreamState::Open)) + Ok(bytes.freeze()) } StdinState::Error(e) => { *locked = StdinState::Closed; - return Err(e.into()); + Err(StreamError::LastOperationFailed(e.into())) } StdinState::Closed => { *locked = StdinState::Closed; - Ok((Bytes::new(), StreamState::Closed)) + Err(StreamError::Closed) } } } diff --git a/crates/wasi/src/preview2/stream.rs b/crates/wasi/src/preview2/stream.rs index 7300b64073a4..4143b9ad86c2 100644 --- a/crates/wasi/src/preview2/stream.rs +++ b/crates/wasi/src/preview2/stream.rs @@ -1,98 +1,56 @@ use crate::preview2::filesystem::FileInputStream; use crate::preview2::poll::Subscribe; -use anyhow::Error; use anyhow::Result; use bytes::Bytes; -use std::fmt; - -/// An error which should be reported to Wasm as a runtime error, rather than -/// an error which should trap Wasm execution. The definition for runtime -/// stream errors is the empty type, so the contents of this error will only -/// be available via a `tracing`::event` at `Level::DEBUG`. -pub struct StreamRuntimeError(anyhow::Error); -impl From for StreamRuntimeError { - fn from(e: anyhow::Error) -> Self { - StreamRuntimeError(e) - } -} -impl fmt::Debug for StreamRuntimeError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "Stream runtime error: {:?}", self.0) - } -} -impl fmt::Display for StreamRuntimeError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "Stream runtime error") - } -} -impl std::error::Error for StreamRuntimeError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - self.0.source() - } -} - -#[derive(Clone, Copy, Debug, PartialEq)] -pub enum StreamState { - Open, - Closed, -} - -impl StreamState { - pub fn is_closed(&self) -> bool { - *self == Self::Closed - } -} /// Host trait for implementing the `wasi:io/streams.input-stream` resource: A /// bytestream which can be read from. #[async_trait::async_trait] pub trait HostInputStream: Subscribe { - /// Read bytes. On success, returns a pair holding the number of bytes - /// read and a flag indicating whether the end of the stream was reached. - /// Important: this read must be non-blocking! - /// Returning an Err which downcasts to a [`StreamRuntimeError`] will be - /// reported to Wasm as the empty error result. Otherwise, errors will trap. - fn read(&mut self, size: usize) -> Result<(Bytes, StreamState), Error>; - - /// Read bytes from a stream and discard them. Important: this method must - /// be non-blocking! - /// Returning an Error which downcasts to a StreamRuntimeError will be - /// reported to Wasm as the empty error result. Otherwise, errors will trap. - fn skip(&mut self, nelem: usize) -> Result<(usize, StreamState), Error> { - let mut nread = 0; - let mut state = StreamState::Open; - - let (bs, read_state) = self.read(nelem)?; - // TODO: handle the case where `bs.len()` is less than `nelem` - nread += bs.len(); - if read_state.is_closed() { - state = read_state; - } + /// Reads up to `size` bytes, returning a buffer holding these bytes on + /// success. + /// + /// This function does not block the current thread and is the equivalent of + /// a non-blocking read. On success all bytes read are returned through + /// `Bytes`, which is no larger than the `size` provided. If the returned + /// list of `Bytes` is empty then no data is ready to be read at this time. + /// + /// # Errors + /// + /// The [`StreamError`] return value communicates when this stream is + /// closed, when a read fails, or when a trap should be generated. + fn read(&mut self, size: usize) -> Result; - Ok((nread, state)) + /// Same as the `read` method except that bytes are skipped. + /// + /// Note that this method is non-blocking like `read` and returns the same + /// errors. + fn skip(&mut self, nelem: usize) -> Result { + let bs = self.read(nelem)?; + Ok(bs.len()) } } #[derive(Debug)] -pub enum OutputStreamError { +pub enum StreamError { Closed, LastOperationFailed(anyhow::Error), Trap(anyhow::Error), } -impl std::fmt::Display for OutputStreamError { +impl std::fmt::Display for StreamError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - OutputStreamError::Closed => write!(f, "closed"), - OutputStreamError::LastOperationFailed(e) => write!(f, "last operation failed: {e}"), - OutputStreamError::Trap(e) => write!(f, "trap: {e}"), + StreamError::Closed => write!(f, "closed"), + StreamError::LastOperationFailed(e) => write!(f, "last operation failed: {e}"), + StreamError::Trap(e) => write!(f, "trap: {e}"), } } } -impl std::error::Error for OutputStreamError { +impl std::error::Error for StreamError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { - OutputStreamError::Closed => None, - OutputStreamError::LastOperationFailed(e) | OutputStreamError::Trap(e) => e.source(), + StreamError::Closed => None, + StreamError::LastOperationFailed(e) | StreamError::Trap(e) => e.source(), } } } @@ -102,59 +60,63 @@ impl std::error::Error for OutputStreamError { #[async_trait::async_trait] pub trait HostOutputStream: Subscribe { /// Write bytes after obtaining a permit to write those bytes - /// Prior to calling [`write`](Self::write) - /// the caller must call [`write_ready`](Self::write_ready), - /// which resolves to a non-zero permit /// - /// This method must never block. - /// [`write_ready`](Self::write_ready) permit indicates the maximum amount of bytes that are - /// permitted to be written in a single [`write`](Self::write) following the - /// [`write_ready`](Self::write_ready) resolution + /// Prior to calling [`write`](Self::write) the caller must call + /// [`check_write`](Self::check_write), which resolves to a non-zero permit + /// + /// This method must never block. The [`check_write`](Self::check_write) + /// permit indicates the maximum amount of bytes that are permitted to be + /// written in a single [`write`](Self::write) following the + /// [`check_write`](Self::check_write) resolution. /// /// # Errors /// - /// Returns an [OutputStreamError] if: + /// Returns a [`StreamError`] if: /// - stream is closed /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed /// - caller performed an illegal operation (e.g. wrote more bytes than were permitted) - fn write(&mut self, bytes: Bytes) -> Result<(), OutputStreamError>; + fn write(&mut self, bytes: Bytes) -> Result<(), StreamError>; /// Trigger a flush of any bytes buffered in this stream implementation. /// /// This method may be called at any time and must never block. /// - /// After this method is called, [`write_ready`](Self::write_ready) must pend until flush is - /// complete. - /// When [`write_ready`](Self::write_ready) becomes ready after a flush, that guarantees that - /// all prior writes have been flushed from the implementation successfully, or that any error - /// associated with those writes is reported in the return value of [`flush`](Self::flush) or - /// [`write_ready`](Self::write_ready) + /// After this method is called, [`check_write`](Self::check_write) must + /// pend until flush is complete. + /// + /// When [`check_write`](Self::check_write) becomes ready after a flush, + /// that guarantees that all prior writes have been flushed from the + /// implementation successfully, or that any error associated with those + /// writes is reported in the return value of [`flush`](Self::flush) or + /// [`check_write`](Self::check_write) /// /// # Errors /// - /// Returns an [OutputStreamError] if: + /// Returns a [`StreamError`] if: /// - stream is closed /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed /// - caller performed an illegal operation (e.g. wrote more bytes than were permitted) - fn flush(&mut self) -> Result<(), OutputStreamError>; + fn flush(&mut self) -> Result<(), StreamError>; /// Returns the number of bytes that are ready to be written to this stream. /// /// Zero bytes indicates that this stream is not currently ready for writing /// and `ready()` must be awaited first. /// + /// Note that this method does not block. + /// /// # Errors /// - /// Returns an [OutputStreamError] if: + /// Returns an [`StreamError`] if: /// - stream is closed /// - prior operation ([`write`](Self::write) or [`flush`](Self::flush)) failed - fn check_write(&mut self) -> Result; + fn check_write(&mut self) -> Result; /// Repeatedly write a byte to a stream. /// Important: this write must be non-blocking! - /// Returning an Err which downcasts to a [`StreamRuntimeError`] will be + /// Returning an Err which downcasts to a [`StreamError`] will be /// reported to Wasm as the empty error result. Otherwise, errors will trap. - fn write_zeroes(&mut self, nelem: usize) -> Result<(), OutputStreamError> { + fn write_zeroes(&mut self, nelem: usize) -> Result<(), StreamError> { // TODO: We could optimize this to not allocate one big zeroed buffer, and instead write // repeatedly from a 'static buffer of zeros. let bs = Bytes::from_iter(core::iter::repeat(0 as u8).take(nelem)); @@ -164,7 +126,7 @@ pub trait HostOutputStream: Subscribe { /// Simultaneously waits for this stream to be writable and then returns how /// much may be written or the last error that happened. - async fn write_ready(&mut self) -> Result { + async fn write_ready(&mut self) -> Result { self.ready().await; self.check_write() } diff --git a/crates/wasi/src/preview2/tcp.rs b/crates/wasi/src/preview2/tcp.rs index 3d5925a686b8..434eeb3aeead 100644 --- a/crates/wasi/src/preview2/tcp.rs +++ b/crates/wasi/src/preview2/tcp.rs @@ -1,7 +1,7 @@ -use super::{HostInputStream, HostOutputStream, OutputStreamError}; -use crate::preview2::poll::Subscribe; -use crate::preview2::stream::{InputStream, OutputStream}; -use crate::preview2::{with_ambient_tokio_runtime, AbortOnDropJoinHandle, StreamState}; +use super::{HostInputStream, HostOutputStream, StreamError}; +use crate::preview2::{ + with_ambient_tokio_runtime, AbortOnDropJoinHandle, InputStream, OutputStream, Subscribe, +}; use anyhow::{Error, Result}; use cap_net_ext::{AddressFamily, Blocking, TcpListenerExt}; use cap_std::net::TcpListener; @@ -70,20 +70,16 @@ impl TcpReadStream { closed: false, } } - fn stream_state(&self) -> StreamState { - if self.closed { - StreamState::Closed - } else { - StreamState::Open - } - } } #[async_trait::async_trait] impl HostInputStream for TcpReadStream { - fn read(&mut self, size: usize) -> Result<(bytes::Bytes, StreamState), anyhow::Error> { - if size == 0 || self.closed { - return Ok((bytes::Bytes::new(), self.stream_state())); + fn read(&mut self, size: usize) -> Result { + if self.closed { + return Err(StreamError::Closed); + } + if size == 0 { + return Ok(bytes::Bytes::new()); } let mut buf = bytes::BytesMut::with_capacity(size); @@ -100,14 +96,13 @@ impl HostInputStream for TcpReadStream { Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => 0, Err(e) => { - tracing::debug!("unexpected error on TcpReadStream read: {e:?}"); self.closed = true; - 0 + return Err(StreamError::LastOperationFailed(e.into())); } }; buf.truncate(n); - Ok((buf.freeze(), self.stream_state())) + Ok(buf.freeze()) } } @@ -171,11 +166,11 @@ impl TcpWriteStream { } impl HostOutputStream for TcpWriteStream { - fn write(&mut self, mut bytes: bytes::Bytes) -> Result<(), OutputStreamError> { + fn write(&mut self, mut bytes: bytes::Bytes) -> Result<(), StreamError> { match self.last_write { LastWrite::Done => {} LastWrite::Waiting(_) | LastWrite::Error(_) => { - return Err(OutputStreamError::Trap(anyhow::anyhow!( + return Err(StreamError::Trap(anyhow::anyhow!( "unpermitted: must call check_write first" ))); } @@ -194,28 +189,28 @@ impl HostOutputStream for TcpWriteStream { return Ok(()); } - Err(e) => return Err(OutputStreamError::LastOperationFailed(e.into())), + Err(e) => return Err(StreamError::LastOperationFailed(e.into())), } } Ok(()) } - fn flush(&mut self) -> Result<(), OutputStreamError> { + fn flush(&mut self) -> Result<(), StreamError> { // `flush` is a no-op here, as we're not managing any internal buffer. Additionally, // `write_ready` will join the background write task if it's active, so following `flush` // with `write_ready` will have the desired effect. Ok(()) } - fn check_write(&mut self) -> Result { + fn check_write(&mut self) -> Result { match mem::replace(&mut self.last_write, LastWrite::Done) { LastWrite::Waiting(task) => { self.last_write = LastWrite::Waiting(task); return Ok(0); } LastWrite::Done => {} - LastWrite::Error(e) => return Err(OutputStreamError::LastOperationFailed(e.into())), + LastWrite::Error(e) => return Err(StreamError::LastOperationFailed(e.into())), } let writable = self.stream.writable(); diff --git a/crates/wasi/src/preview2/write_stream.rs b/crates/wasi/src/preview2/write_stream.rs index 166192aa0506..afc2305f007a 100644 --- a/crates/wasi/src/preview2/write_stream.rs +++ b/crates/wasi/src/preview2/write_stream.rs @@ -1,4 +1,4 @@ -use crate::preview2::{HostOutputStream, OutputStreamError, Subscribe}; +use crate::preview2::{HostOutputStream, StreamError, Subscribe}; use anyhow::anyhow; use bytes::Bytes; use std::sync::{Arc, Mutex}; @@ -13,12 +13,12 @@ struct WorkerState { } impl WorkerState { - fn check_error(&mut self) -> Result<(), OutputStreamError> { + fn check_error(&mut self) -> Result<(), StreamError> { if let Some(e) = self.error.take() { - return Err(OutputStreamError::LastOperationFailed(e)); + return Err(StreamError::LastOperationFailed(e)); } if !self.alive { - return Err(OutputStreamError::Closed); + return Err(StreamError::Closed); } Ok(()) } @@ -63,7 +63,7 @@ impl Worker { self.write_ready_changed.notified().await; } } - fn check_write(&self) -> Result { + fn check_write(&self) -> Result { let mut state = self.state(); if let Err(e) = state.check_error() { return Err(e); @@ -162,11 +162,11 @@ impl AsyncWriteStream { } impl HostOutputStream for AsyncWriteStream { - fn write(&mut self, bytes: Bytes) -> Result<(), OutputStreamError> { + fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> { let mut state = self.worker.state(); state.check_error()?; if state.flush_pending { - return Err(OutputStreamError::Trap(anyhow!( + return Err(StreamError::Trap(anyhow!( "write not permitted while flush pending" ))); } @@ -175,13 +175,13 @@ impl HostOutputStream for AsyncWriteStream { state.write_budget = remaining_budget; state.items.push_back(bytes); } - None => return Err(OutputStreamError::Trap(anyhow!("write exceeded budget"))), + None => return Err(StreamError::Trap(anyhow!("write exceeded budget"))), } drop(state); self.worker.new_work.notify_one(); Ok(()) } - fn flush(&mut self) -> Result<(), OutputStreamError> { + fn flush(&mut self) -> Result<(), StreamError> { let mut state = self.worker.state(); state.check_error()?; @@ -191,7 +191,7 @@ impl HostOutputStream for AsyncWriteStream { Ok(()) } - fn check_write(&mut self) -> Result { + fn check_write(&mut self) -> Result { self.worker.check_write() } } diff --git a/crates/wasi/wit/deps/io/streams.wit b/crates/wasi/wit/deps/io/streams.wit index eeeff505890a..8240507976f7 100644 --- a/crates/wasi/wit/deps/io/streams.wit +++ b/crates/wasi/wit/deps/io/streams.wit @@ -8,20 +8,14 @@ package wasi:io interface streams { use poll.{pollable} - /// Streams provide a sequence of data and then end; once they end, they - /// no longer provide any further data. - /// - /// For example, a stream reading from a file ends when the stream reaches - /// the end of the file. For another example, a stream reading from a - /// socket ends when the socket is closed. - enum stream-status { - /// The stream is open and may produce further data. - open, - /// When reading, this indicates that the stream will not produce - /// further data. - /// When writing, this indicates that the stream will no longer be read. - /// Further writes are still permitted. - ended, + /// An error for input-stream and output-stream operations. + enum stream-error { + /// The last operation (a write or flush) failed before completion. + last-operation-failed, + /// The stream is closed: no more input will be accepted by the + /// stream. A closed output-stream will return this error on all + /// future operations. + closed } /// An input bytestream. @@ -58,14 +52,14 @@ interface streams { read: func( /// The maximum number of bytes to read len: u64 - ) -> result, stream-status>> + ) -> result, stream-error> /// Read bytes from a stream, after blocking until at least one byte can /// be read. Except for blocking, identical to `read`. blocking-read: func( /// The maximum number of bytes to read len: u64 - ) -> result, stream-status>> + ) -> result, stream-error> /// Skip bytes from a stream. /// @@ -82,14 +76,14 @@ interface streams { skip: func( /// The maximum number of bytes to skip. len: u64, - ) -> result> + ) -> result /// Skip bytes from a stream, after blocking until at least one byte /// can be skipped. Except for blocking behavior, identical to `skip`. blocking-skip: func( /// The maximum number of bytes to skip. len: u64, - ) -> result> + ) -> result /// Create a `pollable` which will resolve once either the specified stream /// has bytes available to read or the other end of the stream has been @@ -100,18 +94,6 @@ interface streams { subscribe: func() -> pollable } - /// An error for output-stream operations. - /// - /// Contrary to input-streams, a closed output-stream is reported using - /// an error. - enum write-error { - /// The last operation (a write or flush) failed before completion. - last-operation-failed, - /// The stream is closed: no more input will be accepted by the - /// stream. A closed output-stream will return this error on all - /// future operations. - closed - } /// An output bytestream. /// @@ -131,7 +113,7 @@ interface streams { /// When this function returns 0 bytes, the `subscribe` pollable will /// become ready when this function will report at least 1 byte, or an /// error. - check-write: func() -> result + check-write: func() -> result /// Perform a write. This function never blocks. /// @@ -142,7 +124,7 @@ interface streams { /// the last call to check-write provided a permit. write: func( contents: list - ) -> result<_, write-error> + ) -> result<_, stream-error> /// Perform a write of up to 4096 bytes, and then flush the stream. Block /// until all of these operations are complete, or an error occurs. @@ -170,7 +152,7 @@ interface streams { /// ``` blocking-write-and-flush: func( contents: list - ) -> result<_, write-error> + ) -> result<_, stream-error> /// Request to flush buffered output. This function never blocks. /// @@ -182,11 +164,11 @@ interface streams { /// writes (`check-write` will return `ok(0)`) until the flush has /// completed. The `subscribe` pollable will become ready when the /// flush has completed and the stream can accept more writes. - flush: func() -> result<_, write-error> + flush: func() -> result<_, stream-error> /// Request to flush buffered output, and block until flush completes /// and stream is ready for writing again. - blocking-flush: func() -> result<_, write-error> + blocking-flush: func() -> result<_, stream-error> /// Create a `pollable` which will resolve once the output-stream /// is ready for more writing, or an error has occured. When this @@ -209,7 +191,7 @@ interface streams { write-zeroes: func( /// The number of zero-bytes to write len: u64 - ) -> result<_, write-error> + ) -> result<_, stream-error> /// Perform a write of up to 4096 zeroes, and then flush the stream. /// Block until all of these operations are complete, or an error @@ -238,7 +220,7 @@ interface streams { blocking-write-zeroes-and-flush: func( /// The number of zero-bytes to write len: u64 - ) -> result<_, write-error> + ) -> result<_, stream-error> /// Read from one stream and write to another. /// @@ -252,7 +234,7 @@ interface streams { src: input-stream, /// The number of bytes to splice len: u64, - ) -> result> + ) -> result /// Read from one stream and write to another, with blocking. /// @@ -263,7 +245,7 @@ interface streams { src: input-stream, /// The number of bytes to splice len: u64, - ) -> result> + ) -> result /// Forward the entire contents of an input stream to an output stream. /// @@ -280,6 +262,6 @@ interface streams { forward: func( /// The stream to read from src: input-stream - ) -> result> + ) -> result } }