Skip to content

Commit

Permalink
Downgrade hyper
Browse files Browse the repository at this point in the history
  • Loading branch information
elliottt committed Sep 20, 2023
1 parent 5d8754a commit fa0750e
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 115 deletions.
53 changes: 47 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/wasi-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true, default-features = false }
hyper = { version = "=1.0.0-rc.3", features = ["full"] }
hyper = { version = "0.14.27", features = ["full"] }
tokio = { version = "1", default-features = false, features = [
"net",
"rt-multi-thread",
"time",
] }
http = { version = "0.2.9" }
http-body = "1.0.0-rc.2"
http-body = "0.4.5"
http-body-util = "0.1.0-rc.2"
thiserror = { workspace = true }
tracing = { workspace = true }
Expand Down
126 changes: 33 additions & 93 deletions crates/wasi-http/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::{bindings::http::types, types::FieldMap};
use anyhow::anyhow;
use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use std::{
convert::Infallible,
pin::Pin,
sync::{Arc, Mutex},
time::Duration,
Expand Down Expand Up @@ -122,21 +120,18 @@ impl HostIncomingBody {
/// streaming body to completion. Data segments will be communicated out over the
/// [`DataReceiver`] channel, and a [`HostFutureTrailers`] gives a way to block on/retrieve the
/// trailers.
pub fn new(mut body: hyper::body::Incoming, between_bytes_timeout: Duration) -> Self {
pub fn new(mut body: hyper::body::Body, between_bytes_timeout: Duration) -> Self {
use hyper::body::HttpBody;

let (body_writer, body_receiver) = mpsc::channel(1);
let (trailer_writer, trailers) = oneshot::channel();

let worker = preview2::spawn(async move {
loop {
let frame = match tokio::time::timeout(
between_bytes_timeout,
http_body_util::BodyExt::frame(&mut body),
)
.await
{
let data = match tokio::time::timeout(between_bytes_timeout, body.data()).await {
Ok(None) => break,

Ok(Some(Ok(frame))) => frame,
Ok(Some(Ok(data))) => data,

Ok(Some(Err(e))) => {
match body_writer.send(Err(anyhow::anyhow!(e))).await {
Expand Down Expand Up @@ -168,31 +163,35 @@ impl HostIncomingBody {
}
};

if frame.is_trailers() {
// We know we're not going to write any more data frames at this point, so we
// explicitly drop the body_writer so that anything waiting on the read end returns
// immediately.
drop(body_writer);

let trailers = frame.into_trailers().unwrap();

// TODO: this will fail in two cases:
// 1. we've already used the channel once, which should be imposible,
// 2. the read end is closed.
// I'm not sure how to differentiate between these two cases, or really
// if we need to do anything to handle either.
let _ = trailer_writer.send(Ok(trailers));
// If the receiver no longer exists, thats ok - in that case we want to keep the
// loop running to relieve backpressure, so we get to the trailers.
let _ = body_writer.send(Ok(data)).await;
}

break;
match tokio::time::timeout(between_bytes_timeout, body.trailers()).await {
Err(_) => {
let _ = trailer_writer.send(Err(types::Error::TimeoutError(
"data frame timed out".to_string(),
)
.into()));
}

assert!(frame.is_data(), "frame wasn't data");
Ok(res) => {
// We know we're not going to write any more data frames at this point, so we
// explicitly drop the body_writer so that anything waiting on the read end
// returns immediately.
drop(body_writer);

let data = frame.into_data().unwrap();
// We always send a trailers map, treating no trailers from hyper as sending
// the empty trailers. It might be good to revisit this, passing an optional
// trailers out isntead to communicate when none were present in the response.
let res = res.map(|e| e.unwrap_or_default()).map_err(|e| anyhow!(e));

// If the receiver no longer exists, thats ok - in that case we want to keep the
// loop running to relieve backpressure, so we get to the trailers.
let _ = body_writer.send(Ok(data)).await;
// This could fail in two ways: 1) we've already used the channel, or 2) the
// read end is closed. We can't take any action based on either outcome, so we
// ignore the error here.
let _ = trailer_writer.send(res);
}
}
});

Expand Down Expand Up @@ -253,75 +252,16 @@ impl HostFutureTrailers {
}
}

pub type HyperBody = BoxBody<Bytes, Infallible>;
pub type HyperBody = hyper::Body;

pub struct HostOutgoingBody {
pub body_output_stream: Option<Box<dyn HostOutputStream>>,
pub trailers_sender: Option<tokio::sync::oneshot::Sender<hyper::HeaderMap>>,
pub sender: hyper::body::Sender,
}

impl HostOutgoingBody {
pub fn new() -> (Self, HyperBody) {
use http_body_util::BodyExt;
use hyper::{
body::{Body, Frame},
HeaderMap,
};
use std::future::Future;
use std::task::{Context, Poll};
use tokio::sync::oneshot::error::RecvError;
struct BodyImpl {
body_receiver: mpsc::Receiver<Bytes>,
trailers_receiver: Option<oneshot::Receiver<HeaderMap>>,
}
impl Body for BodyImpl {
type Data = Bytes;
type Error = Infallible;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.as_mut().body_receiver.poll_recv(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(frame)) => Poll::Ready(Some(Ok(Frame::data(frame)))),

// This means that the `body_sender` end of the channel has been dropped.
Poll::Ready(None) => {
if let Some(mut trailers_receiver) = self.as_mut().trailers_receiver.take()
{
match Pin::new(&mut trailers_receiver).poll(cx) {
Poll::Pending => {
self.as_mut().trailers_receiver = Some(trailers_receiver);
Poll::Pending
}
Poll::Ready(Ok(trailers)) => {
Poll::Ready(Some(Ok(Frame::trailers(trailers))))
}
Poll::Ready(Err(RecvError { .. })) => Poll::Ready(None),
}
} else {
Poll::Ready(None)
}
}
}
}
}

let (body_sender, body_receiver) = mpsc::channel(1);
let (trailers_sender, trailers_receiver) = oneshot::channel();
let body_impl = BodyImpl {
body_receiver,
trailers_receiver: Some(trailers_receiver),
}
.boxed();
(
Self {
// TODO: this capacity constant is arbitrary, and should be configurable
body_output_stream: Some(Box::new(BodyWriteStream::new(1024 * 1024, body_sender))),
trailers_sender: Some(trailers_sender),
},
body_impl,
)
let (sender, body) = hyper::Body::channel();
(Self { sender }, body)
}
}

Expand Down
6 changes: 2 additions & 4 deletions crates/wasi-http/src/http_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use crate::bindings::http::{
use crate::types::{HostFutureIncomingResponse, IncomingResponseInternal, TableHttpExt};
use crate::WasiHttpView;
use anyhow::Context;
use bytes::Bytes;
use http_body_util::{BodyExt, Empty};
use hyper::Method;
use std::time::Duration;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -84,7 +82,7 @@ impl<T: WasiHttpView> outgoing_handler::Host for T {
}
}

let body = req.body.unwrap_or_else(|| Empty::<Bytes>::new().boxed());
let body = req.body.unwrap_or_default();

let request = builder.body(body).map_err(http_protocol_error)?;

Expand All @@ -105,7 +103,7 @@ impl<T: WasiHttpView> outgoing_handler::Host for T {
timeout(
connect_timeout,
// TODO: we should plumb the builder through the http context, and use it here
hyper::client::conn::http1::handshake(tcp_stream),
hyper::client::conn::handshake(tcp_stream),
)
.await
.map_err(|_| timeout_error("connection"))??
Expand Down
4 changes: 2 additions & 2 deletions crates/wasi-http/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct HostOutgoingRequest {
pub struct HostIncomingResponse {
pub status: u16,
pub headers: FieldMap,
pub body: Option<(hyper::body::Incoming, std::time::Duration)>,
pub body: Option<(hyper::body::Body, std::time::Duration)>,
pub worker: AbortOnDropJoinHandle<anyhow::Result<()>>,
}

Expand Down Expand Up @@ -90,7 +90,7 @@ pub enum HostFields {
}

pub struct IncomingResponseInternal {
pub resp: hyper::Response<hyper::body::Incoming>,
pub resp: hyper::Response<hyper::body::Body>,
pub worker: AbortOnDropJoinHandle<anyhow::Result<()>>,
pub between_bytes_timeout: std::time::Duration,
}
Expand Down
13 changes: 5 additions & 8 deletions crates/wasi-http/src/types_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,10 @@ impl<T: WasiHttpView> crate::bindings::http::types::Host for T {
id: OutgoingBody,
) -> wasmtime::Result<Result<OutputStream, ()>> {
let body = self.table().get_outgoing_body(id)?;
// This is the point where the pre-1.0.0 api is hard to fit onto wasi-http: we can't
// duplicate teh sender, but we need to keep it around to be able to send trailers. Perhaps
// we could manage this with an `Arc<Mutex>`, but the send method is a future, so that
// would likely force some other issues as well.
if let Some(stream) = body.body_output_stream.take() {
let id = self.table().push_output_stream_child(stream, id)?;
Ok(Ok(id))
Expand All @@ -460,14 +464,7 @@ impl<T: WasiHttpView> crate::bindings::http::types::Host for T {
let mut body = self.table().delete_outgoing_body(id)?;
let trailers = self.table().get_fields(ts)?.clone();

match body
.trailers_sender
.take()
// Should be unreachable - this is the only place we take the trailers sender,
// at the end of the HostOutgoingBody's lifetime
.ok_or_else(|| anyhow!("trailers_sender missing"))?
.send(trailers.into())
{
match body.sender.send_trailers(trailers.into()).await {
Ok(()) => {}
Err(_) => {} // Ignoring failure: receiver died sending body, but we can't report that
// here.
Expand Down

0 comments on commit fa0750e

Please sign in to comment.