From a3357d2a575d8bf1f8dc147dc0c681cc9955deef Mon Sep 17 00:00:00 2001 From: Alcibiades Athens Date: Wed, 11 Sep 2024 05:35:08 -0400 Subject: [PATCH] chore: attempt optimal settings --- Cargo.toml | 6 +- benches/hello_world_tower_hyper_tls_tcp.rs | 201 +++++++++++---------- src/http.rs | 36 +++- 3 files changed, 138 insertions(+), 105 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 51fbe33..e2c7147 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,18 +19,18 @@ http = "1.1.0" http-body = "1.0.1" http-body-util = "0.1.2" hyper = "1.4.1" +hyper-rustls = "0.27.3" hyper-util = { version = "0.1.8", features = ["server", "tokio", "server-auto", "server-graceful", "service"] } pin-project = "1.1.5" +rand = "0.9.0-alpha.2" rustls = "0.23.13" rustls-pemfile = "2.1.3" tokio = { version = "1.40.0", features = ["net", "macros", "rt-multi-thread"] } tokio-rustls = "0.26.0" tokio-stream = { version = "0.1.16", features = ["net"] } +tokio-util = "0.7.12" tower = { version = "0.5.1", features = ["util"] } tracing = "0.1.40" -tokio-util = "0.7.12" -hyper-rustls = "0.27.3" -rand = "0.9.0-alpha.2" [dev-dependencies] criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] } diff --git a/benches/hello_world_tower_hyper_tls_tcp.rs b/benches/hello_world_tower_hyper_tls_tcp.rs index 015da77..129dc3d 100644 --- a/benches/hello_world_tower_hyper_tls_tcp.rs +++ b/benches/hello_world_tower_hyper_tls_tcp.rs @@ -1,26 +1,26 @@ -use rustls::ClientConfig; -use rustls::RootCertStore; -use std::net::SocketAddr; -use std::sync::Arc; -use std::time::Duration; use bytes::Bytes; -use criterion::{criterion_group, criterion_main, Criterion, BenchmarkId}; -use futures::future::join_all; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use http::{Request, Response, StatusCode, Uri}; -use http_body_util::{Empty, Full, BodyExt}; +use http_body_util::{BodyExt, Empty, Full}; use 
hyper::body::Incoming; +use hyper_rustls::HttpsConnectorBuilder; +use hyper_server::{load_certs, load_private_key, serve_http_with_shutdown}; +use hyper_util::client::legacy::Client; use hyper_util::rt::TokioExecutor; use hyper_util::server::conn::auto::Builder as HttpConnectionBuilder; -use rustls::ServerConfig; -use tokio::net::TcpListener; +use hyper_util::service::TowerToHyperService; +use rustls::{ClientConfig, RootCertStore, ServerConfig}; +use std::net::SocketAddr; +use std::sync::Arc; +use rustls::server::ServerSessionMemoryCache; +use tokio::net::{TcpSocket}; use tokio::runtime::Runtime; -use tokio::sync::{oneshot, Semaphore}; +use tokio::sync::oneshot; +use tokio::time::{Duration, Instant}; use tokio_stream::wrappers::TcpListenerStream; -use hyper_util::service::TowerToHyperService; use tracing::info; -use hyper_server::{load_certs, load_private_key, serve_http_with_shutdown}; -use hyper_rustls::HttpsConnectorBuilder; -use hyper_util::client::legacy::Client; async fn echo(req: Request) -> Result>, hyper::Error> { match (req.method(), req.uri().path()) { @@ -28,7 +28,7 @@ async fn echo(req: Request) -> Result>, hyper::Er (&hyper::Method::POST, "/echo") => { let body = req.collect().await?.to_bytes(); Ok(Response::new(Full::new(body))) - }, + } _ => { let mut res = Response::new(Full::new(Bytes::from("Not Found"))); *res.status_mut() = StatusCode::NOT_FOUND; @@ -37,30 +37,51 @@ } } -async fn setup_server() -> Result<(TcpListenerStream, SocketAddr, Arc), Box> { - let addr = SocketAddr::from(([127, 0, 0, 1], 0)); - let listener = TcpListener::bind(addr).await?; +async fn setup_server() -> Result< + (TcpListenerStream, SocketAddr, Arc), + Box, +> { + // Socket configuration + let addr = SocketAddr::from(([127, 0, 0, 1], 0)); // Bind to loopback; port 0 lets the OS assign a free port + let socket = TcpSocket::new_v4()?; + socket.set_send_buffer_size(262_144)?; // 256 KB + socket.set_recv_buffer_size(262_144)?; // 256 KB + 
socket.set_nodelay(true)?; // Disable Nagle's algorithm + socket.bind(addr)?; + let listener = socket.listen(8192)?; // Increase backlog for high-traffic scenarios let server_addr = listener.local_addr()?; let incoming = TcpListenerStream::new(listener); + // Load certificates and private key let certs = load_certs("examples/sample.pem")?; let key = load_private_key("examples/sample.rsa")?; + // TLS configuration let mut config = ServerConfig::builder() .with_no_client_auth() .with_single_cert(certs, key) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()]; + + // ALPN configuration + config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + + // Performance optimizations + config.max_fragment_size = Some(16384); // Larger fragment size for powerful servers + config.send_half_rtt_data = true; // Enable 0.5-RTT data + config.session_storage = ServerSessionMemoryCache::new(10240); // Larger session cache + config.max_early_data_size = 16384; // Enable 0-RTT data + let tls_config = Arc::new(config); Ok((incoming, server_addr, tls_config)) } - -async fn start_server() -> Result<(SocketAddr, oneshot::Sender<()>), Box> { +async fn start_server( +) -> Result<(SocketAddr, oneshot::Sender<()>), Box> { let (incoming, server_addr, tls_config) = setup_server().await?; let (shutdown_tx, shutdown_rx) = oneshot::channel(); let http_server_builder = HttpConnectionBuilder::new(TokioExecutor::new()); + let tower_service_fn = tower::service_fn(echo); let hyper_service = TowerToHyperService::new(tower_service_fn); tokio::spawn(async move { @@ -69,20 +90,59 @@ async fn start_server() -> Result<(SocketAddr, oneshot::Sender<()>), Box, Empty>, url: Uri) -> Result<(), Box> { +async fn send_request( + client: &Client< + hyper_rustls::HttpsConnector, + Empty, + >, + url: Uri, +) -> Result<(Duration, usize), Box> { + let start = Instant::now(); let res = client.get(url).await?; 
assert_eq!(res.status(), StatusCode::OK); let body = res.into_body().collect().await?.to_bytes(); assert_eq!(&body[..], b"Hello, World!"); - Ok(()) + Ok((start.elapsed(), body.len())) +} + +async fn concurrent_benchmark( + client: &Client< + hyper_rustls::HttpsConnector, + Empty, + >, + url: Uri, + num_requests: usize, +) -> (Duration, Vec, usize) { + let start = Instant::now(); + let mut futures = FuturesUnordered::new(); + + for _ in 0..num_requests { + let client = client.clone(); + let url = url.clone(); + futures.push(async move { send_request(&client, url).await }); + } + + let mut request_times = Vec::with_capacity(num_requests); + let mut total_bytes = 0; + while let Some(result) = futures.next().await { + if let Ok((duration, bytes)) = result { + request_times.push(duration); + total_bytes += bytes; + } + } + + let total_time = start.elapsed(); + (total_time, request_times, total_bytes) } fn bench_server(c: &mut Criterion) { @@ -117,102 +177,43 @@ fn bench_server(c: &mut Criterion) { .expect("Failed to build URI"); let mut group = c.benchmark_group("hyper_server"); - group.sample_size(10); - group.measurement_time(Duration::from_secs(20)); + group.sample_size(20); + group.measurement_time(Duration::from_secs(30)); - // Single request latency - group.bench_function("single_request_latency", |b| { + // Latency test + group.bench_function("latency", |b| { let client = client.clone(); let url = url.clone(); - b.to_async(&rt).iter(|| async { - send_request(&client, url.clone()).await.unwrap() - }); + b.to_async(&rt) + .iter(|| async { send_request(&client, url.clone()).await.unwrap().0 }); }); // Throughput test + group.throughput(Throughput::Elements(1)); group.bench_function("throughput", |b| { let client = client.clone(); let url = url.clone(); - b.to_async(&rt).iter_custom(|iters| { - let client = client.clone(); - let url = url.clone(); - async move { - let start = std::time::Instant::now(); - for _ in 0..iters { - send_request(&client, 
url.clone()).await.unwrap(); - } - start.elapsed() - } - }); + b.to_async(&rt) + .iter(|| async { send_request(&client, url.clone()).await.unwrap() }); }); - // Concurrent connections test - let concurrent_requests = vec![10, 50, 100, 200]; + // Concurrency stress test + let concurrent_requests = vec![1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987]; // Fibonacci sequence for &num_requests in &concurrent_requests { + group.throughput(Throughput::Elements(num_requests as u64)); group.bench_with_input( BenchmarkId::new("concurrent_requests", num_requests), &num_requests, |b, &num_requests| { let client = client.clone(); let url = url.clone(); - let semaphore = Arc::new(Semaphore::new(num_requests)); b.to_async(&rt).iter(|| async { - let requests = (0..num_requests).map(|_| { - let client = client.clone(); - let url = url.clone(); - let semaphore = semaphore.clone(); - async move { - let _permit = semaphore.acquire().await.unwrap(); - send_request(&client, url).await - } - }); - join_all(requests).await.into_iter().collect::, _>>().unwrap() + concurrent_benchmark(&client, url.clone(), num_requests).await }); }, ); } - let post_url = Uri::builder() - .scheme("https") - .authority(format!("localhost:{}", server_addr.port())) - .path_and_query("/echo") - .build() - .expect("Failed to build POST URI"); - - group.bench_function("post_request_with_payload", |b| { - let client = client.clone(); - let post_url = post_url.clone(); - b.to_async(&rt).iter(|| async { - let req = Request::builder() - .method("POST") - .uri(post_url.clone()) - .body(Empty::::new()) - .unwrap(); - let res = client.request(req).await.unwrap(); - assert_eq!(res.status(), StatusCode::OK); - let body = res.into_body().collect().await.unwrap().to_bytes(); - assert_eq!(&body[..], b""); // The echo endpoint will return an empty body for an empty request - }); - }); - - // Long-running connection test - group.bench_function("long_running_connection", |b| { - let client = client.clone(); - let url 
= url.clone(); - b.to_async(&rt).iter_custom(|iters| { - let client = client.clone(); - let url = url.clone(); - async move { - let start = std::time::Instant::now(); - for _ in 0..iters { - send_request(&client, url.clone()).await.unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - } - start.elapsed() - } - }); - }); - group.finish(); rt.block_on(async { @@ -230,4 +231,4 @@ criterion_group! { targets = bench_server } -criterion_main!(benches); \ No newline at end of file +criterion_main!(benches); diff --git a/src/http.rs b/src/http.rs index b1c950c..28bb37e 100644 --- a/src/http.rs +++ b/src/http.rs @@ -12,6 +12,7 @@ use hyper_util::{ rt::TokioIo, server::conn::auto::{Builder as HttpConnectionBuilder, HttpServerConnExec}, }; +use hyper_util::server::conn::auto::Http2Builder; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::time::sleep; use tokio_stream::Stream; @@ -58,7 +59,7 @@ async fn sleep_or_pending(wait_for: Option) { pub async fn serve_http_connection( hyper_io: IO, hyper_service: S, - builder: HttpConnectionBuilder, + builder: Http2Builder, mut watcher: Option>, max_connection_age: Option, ) where @@ -79,8 +80,39 @@ pub async fn serve_http_connection( inner: watcher.as_mut().map(|w| w.changed()), }); + // TODO(How to accept a preconfigured builder) + // The API here for hyper_util is poor. 
+ // Really, what you want is to configure a builder like this once + // and pass it in for reuse; however, the current API does not allow that. + // The simpler approach may be to require configuration up front and + // then accept an immutable reference to an HTTP/2 connection builder. + let mut builder = builder.clone(); + builder + // HTTP/1 settings + .http1() + .half_close(true) + .keep_alive(true) + .max_buf_size(64 * 1024) + .pipeline_flush(true) + .preserve_header_case(true) + .title_case_headers(false) + + // HTTP/2 settings + .http2() + .initial_stream_window_size(Some(1024 * 1024)) + .initial_connection_window_size(Some(2 * 1024 * 1024)) + .adaptive_window(true) + .max_frame_size(Some(16 * 1024)) + .max_concurrent_streams(Some(1000)) + .max_send_buf_size(1024 * 1024) + .enable_connect_protocol() + .max_header_list_size(16 * 1024) + .keep_alive_interval(Some(Duration::from_secs(20))) + .keep_alive_timeout(Duration::from_secs(20)); + // Create and pin the HTTP connection - let mut conn = pin!(builder.serve_connection(hyper_io, hyper_service)); + let mut conn = pin!(builder.serve_connection_with_upgrades(hyper_io, hyper_service) + ); // Set up the sleep future for max connection age let sleep = sleep_or_pending(max_connection_age);