Skip to content

Commit

Permalink
test: add http3 test server support
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Jun 11, 2024
1 parent e5ce0b5 commit 404df59
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 2 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,10 @@ jobs:
toolchain: 'stable'

- name: Check
run: RUSTFLAGS="--cfg reqwest_unstable" cargo check --features http3
run: cargo test --features http3
env:
RUSTFLAGS: --cfg reqwest_unstable
RUSTDOCFLAGS: --cfg reqwest_unstable

docs:
name: Docs
Expand Down
32 changes: 32 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,37 @@ async fn donot_set_content_length_0_if_have_no_body() {
assert_eq!(res.status(), reqwest::StatusCode::OK);
}

#[cfg(feature = "http3")]
#[tokio::test]
async fn http3_request_full() {
//use http_body_util::BodyExt;

let server = server::http3(move |_req| async move {
/*
assert_eq!(req.headers()[CONTENT_LENGTH], "5");
let reqb = req.collect().await.unwrap().to_bytes();
assert_eq!(reqb, "hello");
*/
http::Response::default()
});

let url = format!("https://{}/content-length", server.addr());
let res = reqwest::Client::builder()
.http3_prior_knowledge()
.danger_accept_invalid_certs(true)
.build()
.expect("client builder")
.post(url)
.version(http::Version::HTTP_3)
.body("hello")
.send()
.await
.expect("request");

assert_eq!(res.version(), http::Version::HTTP_3);
assert_eq!(res.status(), reqwest::StatusCode::OK);
}

#[tokio::test]
async fn user_agent() {
let server = server::http(move |req| async move {
Expand Down Expand Up @@ -384,6 +415,7 @@ async fn http2_upgrade() {
}

#[cfg(feature = "default-tls")]
#[cfg_attr(feature = "http3", ignore = "enabling http3 seems to break this, why?")]
#[tokio::test]
async fn test_allowed_methods() {
let resp = reqwest::Client::builder()
Expand Down
Binary file added tests/support/server.cert
Binary file not shown.
Binary file added tests/support/server.key
Binary file not shown.
109 changes: 108 additions & 1 deletion tests/support/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ where
F2: FnOnce(&mut Builder) -> Bu + Send + 'static,
{
// Spawn new runtime in thread to prevent reactor execution context conflict
let test_name = thread::current().name().unwrap_or("<unknown>").to_string();
thread::spawn(move || {
let rt = runtime::Builder::new_current_thread()
.enable_all()
Expand All @@ -68,7 +69,7 @@ where
let (panic_tx, panic_rx) = std_mpsc::channel();
let tname = format!(
"test({})-support-server",
thread::current().name().unwrap_or("<unknown>")
test_name,
);
thread::Builder::new()
.name(tname)
Expand Down Expand Up @@ -110,3 +111,109 @@ where
.join()
.unwrap()
}

#[cfg(feature = "http3")]
pub fn http3<F1, Fut>(func: F1) -> Server
where
F1: Fn(http::Request<http_body_util::combinators::BoxBody<bytes::Bytes, h3::Error>>) -> Fut
+ Clone
+ Send
+ 'static,
Fut: Future<Output = http::Response<reqwest::Body>> + Send + 'static,
{
use bytes::Buf;
use http_body_util::BodyExt;
use quinn::crypto::rustls::QuicServerConfig;
use std::sync::Arc;

// Spawn new runtime in thread to prevent reactor execution context conflict
let test_name = thread::current().name().unwrap_or("<unknown>").to_string();
thread::spawn(move || {
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("new rt");

let cert = std::fs::read("tests/support/server.cert").unwrap().into();
let key = std::fs::read("tests/support/server.key").unwrap().try_into().unwrap();

let mut tls_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(vec![cert], key)
.unwrap();
tls_config.max_early_data_size = u32::MAX;
tls_config.alpn_protocols = vec![b"h3".into()];

let server_config = quinn::ServerConfig::with_crypto(Arc::new(QuicServerConfig::try_from(tls_config).unwrap()));
let endpoint = rt.block_on(async move {
quinn::Endpoint::server(server_config, "[::1]:0".parse().unwrap()).unwrap()
});
let addr = endpoint.local_addr().unwrap();

let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let (panic_tx, panic_rx) = std_mpsc::channel();
let tname = format!(
"test({})-support-server",
test_name,
);
thread::Builder::new()
.name(tname)
.spawn(move || {
rt.block_on(async move {

loop {
tokio::select! {
_ = &mut shutdown_rx => {
break;
}
Some(accepted) = endpoint.accept() => {
let conn = accepted.await.expect("accepted");
let mut h3_conn = h3::server::Connection::new(h3_quinn::Connection::new(conn)).await.unwrap();
let func = func.clone();
tokio::spawn(async move {
while let Ok(Some((req, stream))) = h3_conn.accept().await {
let func = func.clone();
tokio::spawn(async move {
let (mut tx, rx) = stream.split();
let body = futures_util::stream::unfold(rx, |mut rx| async move {
match rx.recv_data().await {
Ok(Some(mut buf)) => {
Some((Ok(hyper::body::Frame::data(buf.copy_to_bytes(buf.remaining()))), rx))
},
Ok(None) => None,
Err(err) => {
Some((Err(err), rx))
}
}
});
let body = BodyExt::boxed(http_body_util::StreamBody::new(body));
let resp = func(req.map(move |()| body)).await;
let (parts, mut body) = resp.into_parts();
let resp = http::Response::from_parts(parts, ());
tx.send_response(resp).await.unwrap();

while let Some(Ok(frame)) = body.frame().await {
if let Ok(data) = frame.into_data() {
tx.send_data(data).await.unwrap();
}
}
tx.finish().await.unwrap();
});
}
});
}
}
}
let _ = panic_tx.send(());
});
})
.expect("thread spawn");
Server {
addr,
panic_rx,
shutdown_tx: Some(shutdown_tx),
}
})
.join()
.unwrap()
}

0 comments on commit 404df59

Please sign in to comment.