Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Dec 29, 2023
1 parent ea96619 commit 1569dd5
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 40 deletions.
31 changes: 21 additions & 10 deletions src/async_impl/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ impl Body {
use http_body_util::StreamBody;

let body = http_body_util::BodyExt::boxed(StreamBody::new(
stream.map_ok(|d| Frame::data(Bytes::from(d))).map_err(Into::into),
stream
.map_ok(|d| Frame::data(Bytes::from(d)))
.map_err(Into::into),
));
Body {
inner: Inner::Streaming(body),
Expand Down Expand Up @@ -115,7 +117,10 @@ impl Body {
{
use http_body_util::BodyExt;

let boxed = inner.map_frame(|f| f.map_data(Into::into)).map_err(Into::into).boxed();
let boxed = inner
.map_frame(|f| f.map_data(Into::into))
.map_err(Into::into)
.boxed();

Body {
inner: Inner::Streaming(boxed),
Expand Down Expand Up @@ -232,11 +237,11 @@ impl HttpBody for Body {
} else {
Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
}
},
Inner::Streaming(ref mut body) => {
Poll::Ready(futures_core::ready!(Pin::new(body).poll_frame(cx))
.map(|opt_chunk| opt_chunk.map_err(crate::error::body)))
}
Inner::Streaming(ref mut body) => Poll::Ready(
futures_core::ready!(Pin::new(body).poll_frame(cx))
.map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
),
}
}
}
Expand Down Expand Up @@ -265,14 +270,20 @@ where
if let Poll::Ready(()) = self.timeout.as_mut().poll(cx) {
return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
}
Poll::Ready(futures_core::ready!(Pin::new(&mut self.inner).poll_frame(cx))
.map(|opt_chunk| opt_chunk.map_err(crate::error::body)))
Poll::Ready(
futures_core::ready!(Pin::new(&mut self.inner).poll_frame(cx))
.map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
)
}
}

pub(crate) type ResponseBody = http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
pub(crate) type ResponseBody =
http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;

pub(crate) fn response(body: hyper::body::Incoming, timeout: Option<Pin<Box<Sleep>>>) -> ResponseBody {
pub(crate) fn response(
body: hyper::body::Incoming,
timeout: Option<Pin<Box<Sleep>>>,
) -> ResponseBody {
use http_body_util::BodyExt;

if let Some(timeout) = timeout {
Expand Down
11 changes: 6 additions & 5 deletions src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use http::header::{
};
use http::uri::Scheme;
use http::Uri;
use hyper_util::client::legacy::{connect::HttpConnector/*, ResponseFuture as HyperResponseFuture*/};
use hyper_util::client::legacy::{
connect::HttpConnector, /*, ResponseFuture as HyperResponseFuture*/
};
#[cfg(feature = "native-tls-crate")]
use native_tls_crate::TlsConnector;
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -614,7 +616,8 @@ impl ClientBuilder {
connector.set_timeout(config.connect_timeout);
connector.set_verbose(config.connection_verbose);

let mut builder = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new());
let mut builder =
hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new());
if matches!(config.http_version_pref, HttpVersionPref::Http2) {
builder.http2_only(true);
}
Expand Down Expand Up @@ -1830,9 +1833,7 @@ impl Client {
ResponseFuture::H3(self.inner.h3_client.as_ref().unwrap().request(req))
}
_ => {
let mut req = builder
.body(body)
.expect("valid request parts");
let mut req = builder.body(body).expect("valid request parts");
*req.headers_mut() = headers.clone();
ResponseFuture::Default(self.inner.hyper.request(req))
}
Expand Down
15 changes: 8 additions & 7 deletions src/async_impl/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ impl Decoder {
/// how to decode the content body of the request.
///
/// Uses the correct variant by inspecting the Content-Encoding header.
pub(super) fn detect(_headers: &mut HeaderMap, body: ResponseBody, _accepts: Accepts) -> Decoder {
pub(super) fn detect(
_headers: &mut HeaderMap,
body: ResponseBody,
_accepts: Accepts,
) -> Decoder {
#[cfg(feature = "gzip")]
{
if _accepts.gzip && Decoder::detect_encoding(_headers, "gzip") {
Expand Down Expand Up @@ -238,7 +242,7 @@ impl HttpBody for Decoder {
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode(err)))),
None => Poll::Ready(None),
}
},
}
#[cfg(feature = "gzip")]
Inner::Gzip(ref mut decoder) => {
match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
Expand Down Expand Up @@ -302,10 +306,7 @@ impl Future for Pending {
None => return Poll::Ready(Ok(Inner::PlainText(empty()))),
};

let _body = std::mem::replace(
&mut self.0,
IoStream(empty()).peekable(),
);
let _body = std::mem::replace(&mut self.0, IoStream(empty()).peekable());

match self.1 {
#[cfg(feature = "brotli")]
Expand Down Expand Up @@ -340,7 +341,7 @@ impl Stream for IoStream {
} else {
continue;
}
},
}
Some(Err(err)) => Poll::Ready(Some(Err(error::into_io(err)))),
None => Poll::Ready(None),
};
Expand Down
14 changes: 10 additions & 4 deletions src/async_impl/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::pin::Pin;

use bytes::Bytes;
use encoding_rs::{Encoding, UTF_8};
use hyper_util::client::legacy::connect::HttpInfo;
use hyper::{HeaderMap, StatusCode, Version};
use hyper_util::client::legacy::connect::HttpInfo;
use mime::Mime;
#[cfg(feature = "json")]
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -35,7 +35,11 @@ impl Response {
timeout: Option<Pin<Box<Sleep>>>,
) -> Response {
let (mut parts, body) = res.into_parts();
let decoder = Decoder::detect(&mut parts.headers, super::body::response(body, timeout), accepts);
let decoder = Decoder::detect(
&mut parts.headers,
super::body::response(body, timeout),
accepts,
);
let res = hyper::Response::from_parts(parts, decoder);

Response {
Expand Down Expand Up @@ -256,7 +260,9 @@ impl Response {
pub async fn bytes(self) -> crate::Result<Bytes> {
use http_body_util::BodyExt;

BodyExt::collect(self.res.into_body()).await.map(|buf| buf.to_bytes())
BodyExt::collect(self.res.into_body())
.await
.map(|buf| buf.to_bytes())
}

/// Stream a chunk of the response body.
Expand Down Expand Up @@ -335,7 +341,7 @@ impl Response {
} else {
continue;
}
},
}
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
};
Expand Down
4 changes: 3 additions & 1 deletion src/async_impl/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ impl fmt::Debug for Upgraded {

impl From<hyper::upgrade::Upgraded> for Upgraded {
fn from(inner: hyper::upgrade::Upgraded) -> Self {
Upgraded { inner: TokioIo::new(inner) }
Upgraded {
inner: TokioIo::new(inner),
}
}
}

Expand Down
51 changes: 38 additions & 13 deletions src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
use http::header::HeaderValue;
use http::uri::{Authority, Scheme};
use http::Uri;
use hyper::rt::{Read, Write, ReadBufCursor};
use hyper_util::rt::TokioIo;
use hyper::rt::{Read, ReadBufCursor, Write};
use hyper_util::client::legacy::connect::{Connected, Connection};
use tower_service::Service;
use hyper_util::rt::TokioIo;
#[cfg(feature = "native-tls-crate")]
use native_tls_crate::{TlsConnector, TlsConnectorBuilder};
use tower_service::Service;

use pin_project_lite::pin_project;
use std::future::Future;
Expand Down Expand Up @@ -197,7 +197,7 @@ impl Connector {
let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
let io = tls_connector.connect(&host, conn).await?;
return Ok(Conn {
inner: self.verbose.wrap(io/*NativeTlsConn { inner: io }*/),
inner: self.verbose.wrap(io /*NativeTlsConn { inner: io }*/),
is_proxy: false,
tls_info: self.tls_info,
});
Expand Down Expand Up @@ -263,7 +263,14 @@ impl Connector {

if let hyper_tls::MaybeHttpsStream::Https(stream) = io {
if !self.nodelay {
stream.inner().get_ref().get_ref().get_ref().inner().inner().set_nodelay(false)?;
stream
.inner()
.get_ref()
.get_ref()
.get_ref()
.inner()
.inner()
.set_nodelay(false)?;
}
Ok(Conn {
inner: self.verbose.wrap(NativeTlsConn { inner: stream }),
Expand Down Expand Up @@ -354,7 +361,9 @@ impl Connector {
.connect(host.ok_or("no host in url")?, TokioIo::new(tunneled))
.await?;
return Ok(Conn {
inner: self.verbose.wrap(NativeTlsConn { inner: TokioIo::new(io) }),
inner: self.verbose.wrap(NativeTlsConn {
inner: TokioIo::new(io),
}),
is_proxy: false,
tls_info: false,
});
Expand Down Expand Up @@ -498,7 +507,11 @@ impl TlsInfoFactory for tokio_native_tls::TlsStream<TokioIo<TokioIo<tokio::net::
}

#[cfg(feature = "default-tls")]
impl TlsInfoFactory for tokio_native_tls::TlsStream<TokioIo<hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>> {
impl TlsInfoFactory
for tokio_native_tls::TlsStream<
TokioIo<hyper_tls::MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>,
>
{
fn tls_info(&self) -> Option<crate::tls::TlsInfo> {
let peer_certificate = self
.get_ref()
Expand Down Expand Up @@ -748,6 +761,8 @@ fn tunnel_eof() -> BoxError {
#[cfg(feature = "default-tls")]
mod native_tls_conn {
use super::TlsInfoFactory;
use hyper::rt::{Read, ReadBufCursor, Write};
use hyper_tls::MaybeHttpsStream;
use hyper_util::client::legacy::connect::{Connected, Connection};
use hyper_util::rt::TokioIo;
use pin_project_lite::pin_project;
Expand All @@ -756,8 +771,6 @@ mod native_tls_conn {
pin::Pin,
task::{Context, Poll},
};
use hyper::rt::{Read, ReadBufCursor, Write};
use hyper_tls::MaybeHttpsStream;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_native_tls::TlsStream;
Expand All @@ -770,13 +783,25 @@ mod native_tls_conn {

impl Connection for NativeTlsConn<TokioIo<TokioIo<TcpStream>>> {
fn connected(&self) -> Connected {
self.inner.inner().get_ref().get_ref().get_ref().inner().connected()
self.inner
.inner()
.get_ref()
.get_ref()
.get_ref()
.inner()
.connected()
}
}

impl Connection for NativeTlsConn<TokioIo<MaybeHttpsStream<TokioIo<tokio::net::TcpStream>>>> {
fn connected(&self) -> Connected {
self.inner.inner().get_ref().get_ref().get_ref().inner().connected()
self.inner
.inner()
.get_ref()
.get_ref()
.get_ref()
.inner()
.connected()
}
}

Expand Down Expand Up @@ -1000,13 +1025,13 @@ mod socks {
}

mod verbose {
use hyper::rt::{Read, ReadBufCursor, Write};
use hyper_util::client::legacy::connect::{Connected, Connection};
use std::cmp::min;
use std::fmt;
use std::io::{self, IoSlice};
use std::pin::Pin;
use std::task::{Context, Poll};
use hyper::rt::{Read, Write, ReadBufCursor};
use hyper_util::client::legacy::connect::{Connected, Connection};

pub(super) const OFF: Wrapper = Wrapper(false);

Expand Down

0 comments on commit 1569dd5

Please sign in to comment.