Skip to content

Commit

Permalink
wip: hyper v1 upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Dec 7, 2023
1 parent 4926d76 commit cd96691
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 182 deletions.
12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ __internal_proxy_sys_no_cache = []

[dependencies]
base64 = "0.21"
http = "0.2"
http = "1"
url = "2.2"
bytes = "1.0"
serde = "1.0"
Expand All @@ -99,9 +99,11 @@ mime_guess = { version = "2.0", default-features = false, optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
encoding_rs = "0.8"
http-body = "0.4.0"
hyper = { version = "0.14.21", default-features = false, features = ["tcp", "http1", "http2", "client", "runtime"] }
h2 = "0.3.10"
http-body = "1"
http-body-util = "0.1"
hyper = { version = "1", features = ["http1", "http2", "client"] }
hyper-util = { version = "0.1", features = ["http1", "http2", "client", "client-legacy", "tokio"] }
h2 = "0.4"
once_cell = "1"
log = "0.4"
mime = "0.3.16"
Expand All @@ -113,7 +115,7 @@ ipnet = "2.3"
# Optional deps...

## default-tls
hyper-tls = { version = "0.5", optional = true }
hyper-tls = { version = "0.6", optional = true }
native-tls-crate = { version = "0.2.10", optional = true, package = "native-tls" }
tokio-native-tls = { version = "0.3.0", optional = true }

Expand Down
159 changes: 40 additions & 119 deletions src/async_impl/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures_core::Stream;
use http_body::Body as HttpBody;
use pin_project_lite::pin_project;
#[cfg(feature = "stream")]
Expand All @@ -18,9 +17,6 @@ pub struct Body {
inner: Inner,
}

// The `Stream` trait isn't stable, so the impl isn't public.
pub(crate) struct ImplStream(Body);

enum Inner {
Reusable(Bytes),
Streaming {
Expand All @@ -35,15 +31,22 @@ enum Inner {
},
}

/// A body with a total timeout.
///
/// The timeout does not reset upon each chunk, but rather requires the whole
/// body be streamed before the deadline is reached.
pub(crate) struct TotalTimeoutBody<B> {
inner: B,
timeout: Pin<Box<Sleep>>,
}

pin_project! {
struct WrapStream<S> {
#[pin]
inner: S,
}
}

struct WrapHyper(hyper::Body);

impl Body {
/// Returns a reference to the internal data of the `Body`.
///
Expand Down Expand Up @@ -108,6 +111,7 @@ impl Body {
}
}

/*
pub(crate) fn response(body: hyper::Body, timeout: Option<Pin<Box<Sleep>>>) -> Body {
Body {
inner: Inner::Streaming {
Expand All @@ -126,6 +130,7 @@ impl Body {
},
}
}
*/

pub(crate) fn empty() -> Body {
Body::reusable(Bytes::new())
Expand Down Expand Up @@ -153,9 +158,11 @@ impl Body {
}
}

/*
pub(crate) fn into_stream(self) -> ImplStream {
ImplStream(self)
}
*/

#[cfg(feature = "multipart")]
pub(crate) fn content_length(&self) -> Option<u64> {
Expand All @@ -166,6 +173,7 @@ impl Body {
}
}

/*
impl From<hyper::Body> for Body {
#[inline]
fn from(body: hyper::Body) -> Body {
Expand All @@ -177,6 +185,7 @@ impl From<hyper::Body> for Body {
}
}
}
*/

impl From<Bytes> for Body {
#[inline]
Expand Down Expand Up @@ -228,132 +237,44 @@ impl fmt::Debug for Body {
}
}

// ===== impl ImplStream =====

impl HttpBody for ImplStream {
type Data = Bytes;
type Error = crate::Error;

fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let opt_try_chunk = match self.0.inner {
Inner::Streaming {
ref mut body,
ref mut timeout,
} => {
if let Some(ref mut timeout) = timeout {
if let Poll::Ready(()) = timeout.as_mut().poll(cx) {
return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
}
}
futures_core::ready!(Pin::new(body).poll_data(cx))
.map(|opt_chunk| opt_chunk.map(Into::into).map_err(crate::error::body))
}
Inner::Reusable(ref mut bytes) => {
if bytes.is_empty() {
None
} else {
Some(Ok(std::mem::replace(bytes, Bytes::new())))
}
}
};
// ===== impl TotalTimeoutBody =====

Poll::Ready(opt_try_chunk)
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}

fn is_end_stream(&self) -> bool {
match self.0.inner {
Inner::Streaming { ref body, .. } => body.is_end_stream(),
Inner::Reusable(ref bytes) => bytes.is_empty(),
}
}

fn size_hint(&self) -> http_body::SizeHint {
match self.0.inner {
Inner::Streaming { ref body, .. } => body.size_hint(),
Inner::Reusable(ref bytes) => {
let mut hint = http_body::SizeHint::default();
hint.set_exact(bytes.len() as u64);
hint
}
}
pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> {
TotalTimeoutBody {
inner: body,
timeout,
}
}

impl Stream for ImplStream {
type Item = Result<Bytes, crate::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.poll_data(cx)
}
}

// ===== impl WrapStream =====

impl<S, D, E> HttpBody for WrapStream<S>
impl<B> hyper::body::Body for TotalTimeoutBody<B>
where
S: Stream<Item = Result<D, E>>,
D: Into<Bytes>,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
B: hyper::body::Body + Unpin,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Data = Bytes;
type Error = E;

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let item = futures_core::ready!(self.project().inner.poll_next(cx)?);

Poll::Ready(item.map(|val| Ok(val.into())))
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}

// ===== impl WrapHyper =====

impl HttpBody for WrapHyper {
type Data = Bytes;
type Error = Box<dyn std::error::Error + Send + Sync>;
type Data = B::Data;
type Error = crate::Error;

fn poll_data(
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
// safe pin projection
Pin::new(&mut self.0)
.poll_data(cx)
.map(|opt| opt.map(|res| res.map_err(Into::into)))
) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
if let Poll::Ready(()) = self.timeout.as_mut().poll(cx) {
return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
}
Poll::Ready(futures_core::ready!(Pin::new(&mut self.inner).poll_frame(cx))
.map(|opt_chunk| opt_chunk.map_err(crate::error::body)))
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
pub(crate) type ResponseBody = http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;

fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}
pub(crate) fn response(body: hyper::body::Incoming, timeout: Option<Pin<Box<Sleep>>>) -> ResponseBody {
use http_body_util::BodyExt;

fn size_hint(&self) -> http_body::SizeHint {
HttpBody::size_hint(&self.0)
if let Some(timeout) = timeout {
total_timeout(body, timeout).map_err(Into::into).boxed()
} else {
body.map_err(Into::into).boxed()
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use http::header::{
};
use http::uri::Scheme;
use http::Uri;
use hyper::client::{HttpConnector, ResponseFuture as HyperResponseFuture};
use hyper_util::client::legacy::{connect::HttpConnector/*, ResponseFuture as HyperResponseFuture*/};
#[cfg(feature = "native-tls-crate")]
use native_tls_crate::TlsConnector;
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -612,7 +612,7 @@ impl ClientBuilder {
connector.set_timeout(config.connect_timeout);
connector.set_verbose(config.connection_verbose);

let mut builder = hyper::Client::builder();
let mut builder = HyperClient::builder();
if matches!(config.http_version_pref, HttpVersionPref::Http2) {
builder.http2_only(true);
}
Expand Down Expand Up @@ -1645,7 +1645,7 @@ impl ClientBuilder {
}
}

type HyperClient = hyper::Client<Connector, super::body::ImplStream>;
type HyperClient = hyper_util::client::legacy::Client<Connector, super::body::ImplStream>;

impl Default for Client {
fn default() -> Self {
Expand Down
16 changes: 8 additions & 8 deletions src/async_impl/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use bytes::Bytes;
use futures_core::Stream;
use futures_util::stream::Peekable;
use http::HeaderMap;
use hyper::body::HttpBody;
use hyper::body::Body as HttpBody;

#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
use tokio_util::codec::{BytesCodec, FramedRead};
#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
use tokio_util::io::StreamReader;

use super::super::Body;
use super::body::ResponseBody;
use crate::error;

#[derive(Clone, Copy, Debug)]
Expand All @@ -50,7 +50,7 @@ type PeekableIoStreamReader = StreamReader<PeekableIoStream, Bytes>;

enum Inner {
/// A `PlainText` decoder just returns the response content as is.
PlainText(super::body::ImplStream),
PlainText(ResponseBody),

/// A `Gzip` decoder will uncompress the gzipped response content before returning it.
#[cfg(feature = "gzip")]
Expand All @@ -72,7 +72,7 @@ enum Inner {
/// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
struct Pending(PeekableIoStream, DecoderType);

struct IoStream(super::body::ImplStream);
struct IoStream(ResponseBody);

enum DecoderType {
#[cfg(feature = "gzip")]
Expand Down Expand Up @@ -100,17 +100,17 @@ impl Decoder {
/// A plain text decoder.
///
/// This decoder will emit the underlying chunks as-is.
fn plain_text(body: Body) -> Decoder {
fn plain_text(body: ResponseBody) -> Decoder {
Decoder {
inner: Inner::PlainText(body.into_stream()),
inner: Inner::PlainText(body),
}
}

/// A gzip decoder.
///
/// This decoder will buffer and decompress chunks that are gzipped.
#[cfg(feature = "gzip")]
fn gzip(body: Body) -> Decoder {
fn gzip(body: ResponseBody) -> Decoder {
use futures_util::StreamExt;

Decoder {
Expand Down Expand Up @@ -187,7 +187,7 @@ impl Decoder {
/// how to decode the content body of the request.
///
/// Uses the correct variant by inspecting the Content-Encoding header.
pub(super) fn detect(_headers: &mut HeaderMap, body: Body, _accepts: Accepts) -> Decoder {
pub(super) fn detect(_headers: &mut HeaderMap, body: ResponseBody, _accepts: Accepts) -> Decoder {
#[cfg(feature = "gzip")]
{
if _accepts.gzip && Decoder::detect_encoding(_headers, "gzip") {
Expand Down
Loading

0 comments on commit cd96691

Please sign in to comment.