Skip to content

Commit

Permalink
dragons
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Dec 25, 2023
1 parent 0dca943 commit cd5d754
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 60 deletions.
47 changes: 9 additions & 38 deletions src/async_impl/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::task::{Context, Poll};

use bytes::Bytes;
use http_body::Body as HttpBody;
use pin_project_lite::pin_project;
use http_body_util::combinators::BoxBody;
#[cfg(feature = "stream")]
use tokio::fs::File;
use tokio::time::Sleep;
Expand All @@ -20,14 +20,7 @@ pub struct Body {
enum Inner {
Reusable(Bytes),
Streaming {
body: Pin<
Box<
dyn HttpBody<Data = Bytes, Error = Box<dyn std::error::Error + Send + Sync>>
+ Send
+ Sync,
>,
>,
timeout: Option<Pin<Box<Sleep>>>,
body: BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>,
},
}

Expand All @@ -40,13 +33,6 @@ pub(crate) struct TotalTimeoutBody<B> {
timeout: Pin<Box<Sleep>>,
}

pin_project! {
struct WrapStream<S> {
#[pin]
inner: S,
}
}

impl Body {
/// Returns a reference to the internal data of the `Body`.
///
Expand Down Expand Up @@ -99,34 +85,25 @@ impl Body {
Bytes: From<S::Ok>,
{
use futures_util::TryStreamExt;
use http_body::Frame;
use http_body_util::StreamBody;

let body = Box::pin(WrapStream {
inner: stream.map_ok(Bytes::from).map_err(Into::into),
});
let body = http_body_util::BodyExt::boxed(StreamBody::new(
stream.map_ok(|d| Frame::data(Bytes::from(d))).map_err(Into::into),
));
Body {
inner: Inner::Streaming {
body,
timeout: None,
},
}
}

/*
pub(crate) fn response(body: hyper::Body, timeout: Option<Pin<Box<Sleep>>>) -> Body {
Body {
inner: Inner::Streaming {
body: Box::pin(WrapHyper(body)),
timeout,
},
}
}
#[cfg(feature = "blocking")]
pub(crate) fn wrap(body: hyper::Body) -> Body {
Body {
inner: Inner::Streaming {
body: Box::pin(WrapHyper(body)),
timeout: None,
},
}
}
Expand Down Expand Up @@ -180,7 +157,6 @@ impl From<hyper::Body> for Body {
Self {
inner: Inner::Streaming {
body: Box::pin(WrapHyper(body)),
timeout: None,
},
}
}
Expand Down Expand Up @@ -237,7 +213,7 @@ impl fmt::Debug for Body {
}
}

impl hyper::body::Body for Body {
impl HttpBody for Body {
type Data = Bytes;
type Error = crate::Error;

Expand All @@ -254,12 +230,7 @@ impl hyper::body::Body for Body {
Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
}
},
Inner::Streaming { ref mut body, ref mut timeout } => {
if let Some(timeout) = timeout {
if let Poll::Ready(()) = timeout.as_mut().poll(cx) {
return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
}
}
Inner::Streaming { ref mut body } => {
Poll::Ready(futures_core::ready!(Pin::new(body).poll_frame(cx))
.map(|opt_chunk| opt_chunk.map_err(crate::error::body)))
}
Expand Down
2 changes: 1 addition & 1 deletion src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ impl ClientBuilder {
connector.set_timeout(config.connect_timeout);
connector.set_verbose(config.connection_verbose);

let mut builder = HyperClient::builder(hyper_util::rt::TokioExecutor::new());
let mut builder = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new());
if matches!(config.http_version_pref, HttpVersionPref::Http2) {
builder.http2_only(true);
}
Expand Down
17 changes: 13 additions & 4 deletions src/async_impl/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,19 @@ impl Stream for IoStream {
type Item = Result<Bytes, std::io::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) {
Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))),
Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))),
None => Poll::Ready(None),
loop {
return match futures_core::ready!(Pin::new(&mut self.0).poll_frame(cx)) {
Some(Ok(frame)) => {
// skip non-data frames
if let Ok(buf) = frame.into_data() {
Poll::Ready(Some(Ok(buf)))
} else {
continue;
}
},
Some(Err(err)) => Poll::Ready(Some(Err(error::into_io(err)))),
None => Poll::Ready(None),
};
}
}
}
Expand Down
18 changes: 13 additions & 5 deletions src/async_impl/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::pin::Pin;

use bytes::Bytes;
use encoding_rs::{Encoding, UTF_8};
use futures_util::stream::StreamExt;
use hyper_util::client::legacy::connect::HttpInfo;
use hyper::{HeaderMap, StatusCode, Version};
use mime::Mime;
Expand Down Expand Up @@ -278,10 +277,19 @@ impl Response {
/// # }
/// ```
pub async fn chunk(&mut self) -> crate::Result<Option<Bytes>> {
if let Some(item) = self.res.body_mut().next().await {
Ok(Some(item?))
} else {
Ok(None)
use http_body_util::BodyExt;

// loop to ignore unrecognized frames
loop {
if let Some(res) = self.res.body_mut().frame().await {
let frame = res?;
if let Ok(buf) = frame.into_data() {
return Ok(Some(buf));
}
// else continue
} else {
return Ok(None);
}
}
}

Expand Down
19 changes: 10 additions & 9 deletions src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

//#[cfg(feature = "default-tls")]
//use self::native_tls_conn::NativeTlsConn;
#[cfg(feature = "default-tls")]
use self::native_tls_conn::NativeTlsConn;
#[cfg(feature = "__rustls")]
use self::rustls_tls_conn::RustlsTlsConn;
use crate::dns::DynResolver;
Expand Down Expand Up @@ -266,7 +266,7 @@ impl Connector {
stream.inner().get_ref().get_ref().get_ref().inner().inner().set_nodelay(false)?;
}
Ok(Conn {
inner: self.verbose.wrap(stream/*NativeTlsConn { inner: stream }*/),
inner: self.verbose.wrap(NativeTlsConn { inner: stream }),
is_proxy,
tls_info: self.tls_info,
})
Expand Down Expand Up @@ -354,7 +354,7 @@ impl Connector {
.connect(host.ok_or("no host in url")?, TokioIo::new(tunneled))
.await?;
return Ok(Conn {
inner: self.verbose.wrap(io/*NativeTlsConn { inner: io }*/),
inner: self.verbose.wrap(NativeTlsConn { inner: io }),
is_proxy: false,
tls_info: false,
});
Expand Down Expand Up @@ -477,6 +477,7 @@ impl TlsInfoFactory for tokio::net::TcpStream {
}
}

/*
#[cfg(feature = "__tls")]
impl<T: TlsInfoFactory> TlsInfoFactory for TokioIo<T> {
fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
Expand All @@ -497,7 +498,7 @@ impl<T: TlsInfoFactory> TlsInfoFactory for hyper_tls::MaybeHttpsStream<T> {
#[cfg(feature = "default-tls")]
impl<T> TlsInfoFactory for hyper_tls::TlsStream<T>
where
native_tls_crate::TlsStream<tokio_native_tls::AllowStd<T>>: std::io::Read + std::io::Write,
T: tokio::io::AsyncRead + tokio::io::AsyncWrite,
{
fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
let peer_certificate = self
Expand All @@ -523,6 +524,7 @@ impl<T> TlsInfoFactory for tokio_rustls::TlsStream<T> {
Some(crate::tls::TlsInfo { peer_certificate })
}
}
*/

pub(crate) trait AsyncConn:
Read + Write + Connection + Send + Sync + Unpin + 'static
Expand Down Expand Up @@ -700,7 +702,6 @@ fn tunnel_eof() -> BoxError {
"unexpected eof while tunneling".into()
}

/*
#[cfg(feature = "default-tls")]
mod native_tls_conn {
use super::TlsInfoFactory;
Expand All @@ -713,6 +714,7 @@ mod native_tls_conn {
task::{Context, Poll},
};
use hyper::rt::{Read, ReadBufCursor, Write};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_native_tls::TlsStream;

pin_project! {
Expand Down Expand Up @@ -742,7 +744,7 @@ mod native_tls_conn {
}
}

impl<T: Read + Write + Unpin> Read for NativeTlsConn<T> {
impl<T: AsyncRead + AsyncWrite + Unpin> Read for NativeTlsConn<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
Expand All @@ -753,7 +755,7 @@ mod native_tls_conn {
}
}

impl<T: Read + Write + Unpin> Write for NativeTlsConn<T> {
impl<T: AsyncRead + AsyncWrite + Unpin> Write for NativeTlsConn<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context,
Expand Down Expand Up @@ -799,7 +801,6 @@ mod native_tls_conn {
}
}
}
*/

#[cfg(feature = "__rustls")]
mod rustls_tls_conn {
Expand Down
5 changes: 2 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,8 @@ pub(crate) fn upgrade<E: Into<BoxError>>(e: E) -> Error {

// io::Error helpers

#[allow(unused)]
pub(crate) fn into_io(e: Error) -> io::Error {
e.into_io()
pub(crate) fn into_io(e: BoxError) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}

#[allow(unused)]
Expand Down

0 comments on commit cd5d754

Please sign in to comment.