Skip to content

Commit

Permalink
chore: attempt optimal settings
Browse files Browse the repository at this point in the history
  • Loading branch information
0xAlcibiades committed Sep 11, 2024
1 parent 04d88eb commit a3357d2
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 105 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ http = "1.1.0"
http-body = "1.0.1"
http-body-util = "0.1.2"
hyper = "1.4.1"
hyper-rustls = "0.27.3"
hyper-util = { version = "0.1.8", features = ["server", "tokio", "server-auto", "server-graceful", "service"] }
pin-project = "1.1.5"
rand = "0.9.0-alpha.2"
rustls = "0.23.13"
rustls-pemfile = "2.1.3"
tokio = { version = "1.40.0", features = ["net", "macros", "rt-multi-thread"] }
tokio-rustls = "0.26.0"
tokio-stream = { version = "0.1.16", features = ["net"] }
tokio-util = "0.7.12"
tower = { version = "0.5.1", features = ["util"] }
tracing = "0.1.40"
tokio-util = "0.7.12"
hyper-rustls = "0.27.3"
rand = "0.9.0-alpha.2"

[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] }
Expand Down
201 changes: 101 additions & 100 deletions benches/hello_world_tower_hyper_tls_tcp.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
use rustls::ClientConfig;
use rustls::RootCertStore;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion, BenchmarkId};
use futures::future::join_all;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use http::{Request, Response, StatusCode, Uri};
use http_body_util::{Empty, Full, BodyExt};
use http_body_util::{BodyExt, Empty, Full};
use hyper::body::Incoming;
use hyper_rustls::HttpsConnectorBuilder;
use hyper_server::{load_certs, load_private_key, serve_http_with_shutdown};
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use hyper_util::server::conn::auto::Builder as HttpConnectionBuilder;
use rustls::ServerConfig;
use tokio::net::TcpListener;
use hyper_util::service::TowerToHyperService;
use rustls::{ClientConfig, RootCertStore, ServerConfig};
use std::net::SocketAddr;
use std::sync::Arc;
use rustls::server::ServerSessionMemoryCache;
use tokio::net::{TcpSocket};
use tokio::runtime::Runtime;
use tokio::sync::{oneshot, Semaphore};
use tokio::sync::oneshot;
use tokio::time::{Duration, Instant};
use tokio_stream::wrappers::TcpListenerStream;
use hyper_util::service::TowerToHyperService;
use tracing::info;
use hyper_server::{load_certs, load_private_key, serve_http_with_shutdown};
use hyper_rustls::HttpsConnectorBuilder;
use hyper_util::client::legacy::Client;

async fn echo(req: Request<Incoming>) -> Result<Response<Full<Bytes>>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&hyper::Method::GET, "/") => Ok(Response::new(Full::new(Bytes::from("Hello, World!")))),
(&hyper::Method::POST, "/echo") => {
let body = req.collect().await?.to_bytes();
Ok(Response::new(Full::new(body)))
},
}
_ => {
let mut res = Response::new(Full::new(Bytes::from("Not Found")));
*res.status_mut() = StatusCode::NOT_FOUND;
Expand All @@ -37,30 +37,51 @@ async fn echo(req: Request<Incoming>) -> Result<Response<Full<Bytes>>, hyper::Er
}
}

async fn setup_server() -> Result<(TcpListenerStream, SocketAddr, Arc<ServerConfig>), Box<dyn std::error::Error + Send + Sync>> {
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
let listener = TcpListener::bind(addr).await?;
async fn setup_server() -> Result<
(TcpListenerStream, SocketAddr, Arc<ServerConfig>),
Box<dyn std::error::Error + Send + Sync>,
> {
// Socket configuration
let addr = SocketAddr::from(([127, 0, 0, 1], 0)); // Listen on all interfaces
let socket = TcpSocket::new_v4()?;
socket.set_send_buffer_size(262_144)?; // 256 KB
socket.set_recv_buffer_size(262_144)?; // 256 KB
socket.set_nodelay(true)?; // Disable Nagle's algorithm
socket.bind(addr)?;
let listener = socket.listen(8192)?; // Increase backlog for high-traffic scenarios
let server_addr = listener.local_addr()?;
let incoming = TcpListenerStream::new(listener);

// Load certificates and private key
let certs = load_certs("examples/sample.pem")?;
let key = load_private_key("examples/sample.rsa")?;

// TLS configuration
let mut config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(certs, key)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];

// ALPN configuration
config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];

// Performance optimizations
config.max_fragment_size = Some(16384); // Larger fragment size for powerful servers
config.send_half_rtt_data = true; // Enable 0.5-RTT data
config.session_storage = ServerSessionMemoryCache::new(10240); // Larger session cache
config.max_early_data_size = 16384; // Enable 0-RTT data

let tls_config = Arc::new(config);

Ok((incoming, server_addr, tls_config))
}


async fn start_server() -> Result<(SocketAddr, oneshot::Sender<()>), Box<dyn std::error::Error + Send + Sync>> {
async fn start_server(
) -> Result<(SocketAddr, oneshot::Sender<()>), Box<dyn std::error::Error + Send + Sync>> {
let (incoming, server_addr, tls_config) = setup_server().await?;
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let http_server_builder = HttpConnectionBuilder::new(TokioExecutor::new());

let tower_service_fn = tower::service_fn(echo);
let hyper_service = TowerToHyperService::new(tower_service_fn);
tokio::spawn(async move {
Expand All @@ -69,20 +90,59 @@ async fn start_server() -> Result<(SocketAddr, oneshot::Sender<()>), Box<dyn std
incoming,
http_server_builder,
Some(tls_config),
Some(async { shutdown_rx.await.ok(); }),
Some(async {
shutdown_rx.await.ok();
}),
)
.await
.unwrap();
.await
.unwrap();
});
Ok((server_addr, shutdown_tx))
}

async fn send_request(client: &Client<hyper_rustls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>, Empty<Bytes>>, url: Uri) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
async fn send_request(
client: &Client<
hyper_rustls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,
Empty<Bytes>,
>,
url: Uri,
) -> Result<(Duration, usize), Box<dyn std::error::Error + Send + Sync>> {
let start = Instant::now();
let res = client.get(url).await?;
assert_eq!(res.status(), StatusCode::OK);
let body = res.into_body().collect().await?.to_bytes();
assert_eq!(&body[..], b"Hello, World!");
Ok(())
Ok((start.elapsed(), body.len()))
}

async fn concurrent_benchmark(
client: &Client<
hyper_rustls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,
Empty<Bytes>,
>,
url: Uri,
num_requests: usize,
) -> (Duration, Vec<Duration>, usize) {
let start = Instant::now();
let mut futures = FuturesUnordered::new();

for _ in 0..num_requests {
let client = client.clone();
let url = url.clone();
futures.push(async move { send_request(&client, url).await });
}

let mut request_times = Vec::with_capacity(num_requests);
let mut total_bytes = 0;
while let Some(result) = futures.next().await {
if let Ok((duration, bytes)) = result {
request_times.push(duration);
total_bytes += bytes;
}
}

let total_time = start.elapsed();
(total_time, request_times, total_bytes)
}

fn bench_server(c: &mut Criterion) {
Expand Down Expand Up @@ -117,102 +177,43 @@ fn bench_server(c: &mut Criterion) {
.expect("Failed to build URI");

let mut group = c.benchmark_group("hyper_server");
group.sample_size(10);
group.measurement_time(Duration::from_secs(20));
group.sample_size(20);
group.measurement_time(Duration::from_secs(30));

// Single request latency
group.bench_function("single_request_latency", |b| {
// Latency test
group.bench_function("latency", |b| {
let client = client.clone();
let url = url.clone();
b.to_async(&rt).iter(|| async {
send_request(&client, url.clone()).await.unwrap()
});
b.to_async(&rt)
.iter(|| async { send_request(&client, url.clone()).await.unwrap().0 });
});

// Throughput test
group.throughput(Throughput::Elements(1));
group.bench_function("throughput", |b| {
let client = client.clone();
let url = url.clone();
b.to_async(&rt).iter_custom(|iters| {
let client = client.clone();
let url = url.clone();
async move {
let start = std::time::Instant::now();
for _ in 0..iters {
send_request(&client, url.clone()).await.unwrap();
}
start.elapsed()
}
});
b.to_async(&rt)
.iter(|| async { send_request(&client, url.clone()).await.unwrap() });
});

// Concurrent connections test
let concurrent_requests = vec![10, 50, 100, 200];
// Concurrency stress test
let concurrent_requests = vec![1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987]; // Fibonacci sequence
for &num_requests in &concurrent_requests {
group.throughput(Throughput::Elements(num_requests as u64));
group.bench_with_input(
BenchmarkId::new("concurrent_requests", num_requests),
&num_requests,
|b, &num_requests| {
let client = client.clone();
let url = url.clone();
let semaphore = Arc::new(Semaphore::new(num_requests));
b.to_async(&rt).iter(|| async {
let requests = (0..num_requests).map(|_| {
let client = client.clone();
let url = url.clone();
let semaphore = semaphore.clone();
async move {
let _permit = semaphore.acquire().await.unwrap();
send_request(&client, url).await
}
});
join_all(requests).await.into_iter().collect::<Result<Vec<_>, _>>().unwrap()
concurrent_benchmark(&client, url.clone(), num_requests).await
});
},
);
}

let post_url = Uri::builder()
.scheme("https")
.authority(format!("localhost:{}", server_addr.port()))
.path_and_query("/echo")
.build()
.expect("Failed to build POST URI");

group.bench_function("post_request_with_payload", |b| {
let client = client.clone();
let post_url = post_url.clone();
b.to_async(&rt).iter(|| async {
let req = Request::builder()
.method("POST")
.uri(post_url.clone())
.body(Empty::<Bytes>::new())
.unwrap();
let res = client.request(req).await.unwrap();
assert_eq!(res.status(), StatusCode::OK);
let body = res.into_body().collect().await.unwrap().to_bytes();
assert_eq!(&body[..], b""); // The echo endpoint will return an empty body for an empty request
});
});

// Long-running connection test
group.bench_function("long_running_connection", |b| {
let client = client.clone();
let url = url.clone();
b.to_async(&rt).iter_custom(|iters| {
let client = client.clone();
let url = url.clone();
async move {
let start = std::time::Instant::now();
for _ in 0..iters {
send_request(&client, url.clone()).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
}
start.elapsed()
}
});
});

group.finish();

rt.block_on(async {
Expand All @@ -230,4 +231,4 @@ criterion_group! {
targets = bench_server
}

criterion_main!(benches);
criterion_main!(benches);
36 changes: 34 additions & 2 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use hyper_util::{
rt::TokioIo,
server::conn::auto::{Builder as HttpConnectionBuilder, HttpServerConnExec},
};
use hyper_util::server::conn::auto::Http2Builder;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::sleep;
use tokio_stream::Stream;
Expand Down Expand Up @@ -58,7 +59,7 @@ async fn sleep_or_pending(wait_for: Option<Duration>) {
pub async fn serve_http_connection<B, IO, S, E>(
hyper_io: IO,
hyper_service: S,
builder: HttpConnectionBuilder<E>,
builder: Http2Builder<E>,
mut watcher: Option<tokio::sync::watch::Receiver<()>>,
max_connection_age: Option<Duration>,
) where
Expand All @@ -79,8 +80,39 @@ pub async fn serve_http_connection<B, IO, S, E>(
inner: watcher.as_mut().map(|w| w.changed()),
});

// TODO(How to accept a preconfigured builder)
// The API here for hyper_util is poor.
// Really what you want to do is configure a builder like this
// and pass it in for use as a builder, however, you cannot
// the simple way may be to require configuration and
// then accept an immutable reference to an http2 connection builder
let mut builder = builder.clone();
builder
// HTTP/1 settings
.http1()
.half_close(true)
.keep_alive(true)
.max_buf_size(64 * 1024)
.pipeline_flush(true)
.preserve_header_case(true)
.title_case_headers(false)

// HTTP/2 settings
.http2()
.initial_stream_window_size(Some(1024 * 1024))
.initial_connection_window_size(Some(2 * 1024 * 1024))
.adaptive_window(true)
.max_frame_size(Some(16 * 1024))
.max_concurrent_streams(Some(1000))
.max_send_buf_size(1024 * 1024)
.enable_connect_protocol()
.max_header_list_size(16 * 1024)
.keep_alive_interval(Some(Duration::from_secs(20)))
.keep_alive_timeout(Duration::from_secs(20));

// Create and pin the HTTP connection
let mut conn = pin!(builder.serve_connection(hyper_io, hyper_service));
let mut conn = pin!(builder.serve_connection_with_upgrades(hyper_io, hyper_service)
);

// Set up the sleep future for max connection age
let sleep = sleep_or_pending(max_connection_age);
Expand Down

0 comments on commit a3357d2

Please sign in to comment.