diff --git a/Cargo.toml b/Cargo.toml index e2c7147..1690145 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ hyper-rustls = "0.27.3" hyper-util = { version = "0.1.8", features = ["server", "tokio", "server-auto", "server-graceful", "service"] } pin-project = "1.1.5" rand = "0.9.0-alpha.2" -rustls = "0.23.13" +rustls = { version = "0.23.13", features = ["zlib"] } rustls-pemfile = "2.1.3" tokio = { version = "1.40.0", features = ["net", "macros", "rt-multi-thread"] } tokio-rustls = "0.26.0" @@ -31,14 +31,18 @@ tokio-stream = { version = "0.1.16", features = ["net"] } tokio-util = "0.7.12" tower = { version = "0.5.1", features = ["util"] } tracing = "0.1.40" +jemallocator = { version = "0.5", optional = true } [dev-dependencies] criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] } hyper = { version = "1.4.1", features = ["client"] } -tokio = { version = "1.0", features = ["rt", "net", "test-util"] } +tokio = { version = "1.40", features = ["rt", "net", "test-util"] } tokio-util = { version = "0.7", features = ["compat"] } tracing-subscriber = "0.3.18" [[bench]] name = "hello_world_tower_hyper_tls_tcp" harness = false + +[features] +jemalloc = ["jemallocator"] diff --git a/benches/hello_world_tower_hyper_tls_tcp.rs b/benches/hello_world_tower_hyper_tls_tcp.rs index 2acdefa..6293823 100644 --- a/benches/hello_world_tower_hyper_tls_tcp.rs +++ b/benches/hello_world_tower_hyper_tls_tcp.rs @@ -69,6 +69,7 @@ async fn setup_server() -> Result< config.max_fragment_size = Some(16384); // Larger fragment size for powerful servers config.send_half_rtt_data = true; // Enable 0.5-RTT data config.session_storage = ServerSessionMemoryCache::new(10240); // Larger session cache + config.cert_compression_cache = Arc::new(rustls::compress::CompressionCache::default()); config.max_early_data_size = 16384; // Enable 0-RTT data let tls_config = Arc::new(config); @@ -154,9 +155,15 @@ fn bench_server(c: &mut Criterion) { let mut root_cert_store = RootCertStore::empty(); root_cert_store.add_parsable_certificates(load_certs("examples/sample.pem").unwrap()); - let client_config = ClientConfig::builder() + let mut client_config = ClientConfig::builder() .with_root_certificates(root_cert_store) .with_no_client_auth(); + // Enable handshake resumption + client_config.resumption = rustls::client::Resumption::in_memory_sessions(10240); + client_config.cert_compression_cache = + Arc::new(rustls::compress::CompressionCache::default()); + client_config.max_fragment_size = Some(16384); // Larger fragment size for powerful servers + client_config.enable_early_data = true; // Enable 0-RTT data let https = HttpsConnectorBuilder::new() .with_tls_config(client_config) @@ -198,7 +205,7 @@ fn bench_server(c: &mut Criterion) { }); // Concurrency stress test - let concurrent_requests = vec![1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987]; // Fibonacci sequence + let concurrent_requests = vec![1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987]; // log sequence for &num_requests in &concurrent_requests { group.throughput(Throughput::Elements(num_requests as u64)); group.bench_with_input( diff --git a/src/http.rs b/src/http.rs index 56dc96d..6957369 100644 --- a/src/http.rs +++ b/src/http.rs @@ -28,6 +28,7 @@ use crate::fuse::Fuse; /// /// * `wait_for` - An `Option` specifying how long to sleep. /// If `None`, the function will wait indefinitely. +#[inline] async fn sleep_or_pending(wait_for: Option) { match wait_for { Some(wait) => sleep(wait).await, @@ -55,6 +56,7 @@ async fn sleep_or_pending(wait_for: Option) { /// * `watcher`: An optional `tokio::sync::watch::Receiver` for graceful shutdown signaling. /// * `max_connection_age`: An optional `Duration` specifying the maximum age of the connection /// before initiating a graceful shutdown. +#[inline] pub async fn serve_http_connection( hyper_io: IO, hyper_service: S, @@ -79,6 +81,36 @@ pub async fn serve_http_connection( inner: watcher.as_mut().map(|w| w.changed()), }); + let builder = builder.clone(); + // TODO(How to accept a preconfigured builder) + // The API here for hyper_util is poor. + // Really what you want to do is configure a builder like this + // and pass it in for use as a builder, however, you cannot + // the simple way may be to require configuration and + // then accept an immutable reference to an http2 connection builder + let mut builder = builder.clone(); + builder + // HTTP/1 settings + .http1() + .half_close(true) + .keep_alive(true) + .max_buf_size(64 * 1024) + .pipeline_flush(true) + .preserve_header_case(true) + .title_case_headers(false) + // HTTP/2 settings + .http2() + .initial_stream_window_size(Some(1024 * 1024)) + .initial_connection_window_size(Some(2 * 1024 * 1024)) + .adaptive_window(true) + .max_frame_size(Some(16 * 1024)) + .max_concurrent_streams(Some(1000)) + .max_send_buf_size(1024 * 1024) + .enable_connect_protocol() + .max_header_list_size(16 * 1024) + .keep_alive_interval(Some(Duration::from_secs(20))) + .keep_alive_timeout(Duration::from_secs(20)); + // Create and pin the HTTP connection let mut conn = pin!(builder.serve_connection_with_upgrades(hyper_io, hyper_service)); @@ -332,6 +364,7 @@ pub async fn serve_http_connection( /// - The server will continue to accept new connections until the `signal` future resolves. /// - When using TLS, make sure to provide a properly configured `ServerConfig`. /// - The function will return when all connections have been closed after the shutdown signal. +#[inline] pub async fn serve_http_with_shutdown( service: S, incoming: I, diff --git a/src/io.rs b/src/io.rs index 48ece65..feacfa5 100644 --- a/src/io.rs +++ b/src/io.rs @@ -23,6 +23,7 @@ impl AsyncRead for Transport where IO: AsyncRead + AsyncWrite + Unpin, { + #[inline] fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -39,6 +40,7 @@ impl AsyncWrite for Transport where IO: AsyncRead + AsyncWrite + Unpin, { + #[inline] fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -50,6 +52,7 @@ where } } + #[inline] fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.get_mut() { Transport::Plain(io) => Pin::new(io).poll_flush(cx), @@ -57,6 +60,7 @@ where } } + #[inline] fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.get_mut() { Transport::Plain(io) => Pin::new(io).poll_shutdown(cx), @@ -64,6 +68,7 @@ where } } + #[inline] fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -75,6 +80,7 @@ where } } + #[inline] fn is_write_vectored(&self) -> bool { match self { Transport::Plain(io) => io.is_write_vectored(), diff --git a/src/lib.rs b/src/lib.rs index 8fe53d3..5bf2185 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,10 @@ +#[cfg(all(feature = "jemalloc", not(target_env = "msvc")))] +use jemallocator::Jemalloc; + +#[cfg(all(feature = "jemalloc", not(target_env = "msvc")))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + pub use error::{Error as TransportError, Kind as TransportErrorKind}; pub use http::serve_http_connection; pub use http::serve_http_with_shutdown; diff --git a/src/tcp.rs b/src/tcp.rs index 2d5be6d..98347d8 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -69,6 +69,7 @@ fn handle_accept_error(e: impl Into) -> ControlFlow { /// This function uses `handle_accept_error` to determine whether to continue accepting /// connections after an error occurs. Non-fatal errors are logged and skipped, while /// fatal errors cause the stream to yield an error and terminate. +#[inline] pub fn serve_tcp_incoming( incoming: impl Stream> + Send + 'static, ) -> impl Stream> diff --git a/src/tls.rs b/src/tls.rs index 61ae64a..6f55978 100644 --- a/src/tls.rs +++ b/src/tls.rs @@ -31,6 +31,7 @@ use tokio_stream::{Stream, StreamExt}; /// /// - If the input `tcp_stream` yields an error, that error is propagated. /// - If the TLS handshake fails, the error is wrapped in the crate's `Error` type. +#[inline] pub fn serve_tls_incoming( tcp_stream: impl Stream>, tls: TlsAcceptor, @@ -71,6 +72,7 @@ where /// # Returns /// /// A `Result` containing a vector of `CertificateDer` on success, or an `io::Error` on failure. +#[inline] pub fn load_certs(filename: &str) -> io::Result>> { // Open certificate file let certfile = fs::File::open(filename)?; @@ -92,6 +94,7 @@ pub fn load_certs(filename: &str) -> io::Result>> { /// # Returns /// /// A `Result` containing a `PrivateKeyDer` on success, or an `io::Error` on failure. +#[inline] pub fn load_private_key(filename: &str) -> io::Result> { // Open keyfile let keyfile = fs::File::open(filename)?;