Skip to content

Commit

Permalink
Basic optimizations (#18)
Browse files Browse the repository at this point in the history
* optimize: more

* fix: bump package versions

* fix: apples to apples

* fix: fmt
  • Loading branch information
0xAlcibiades authored Sep 11, 2024
1 parent ea358ba commit 091f0d8
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 4 deletions.
8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,26 @@ hyper-rustls = "0.27.3"
hyper-util = { version = "0.1.8", features = ["server", "tokio", "server-auto", "server-graceful", "service"] }
pin-project = "1.1.5"
rand = "0.9.0-alpha.2"
rustls = "0.23.13"
rustls = { version = "0.23.13", features = ["zlib"] }
rustls-pemfile = "2.1.3"
tokio = { version = "1.40.0", features = ["net", "macros", "rt-multi-thread"] }
tokio-rustls = "0.26.0"
tokio-stream = { version = "0.1.16", features = ["net"] }
tokio-util = "0.7.12"
tower = { version = "0.5.1", features = ["util"] }
tracing = "0.1.40"
jemallocator = { version = "0.5", optional = true }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] }
hyper = { version = "1.4.1", features = ["client"] }
tokio = { version = "1.0", features = ["rt", "net", "test-util"] }
tokio = { version = "1.40", features = ["rt", "net", "test-util"] }
tokio-util = { version = "0.7", features = ["compat"] }
tracing-subscriber = "0.3.18"

[[bench]]
name = "hello_world_tower_hyper_tls_tcp"
harness = false

[features]
jemalloc = ["jemallocator"]
11 changes: 9 additions & 2 deletions benches/hello_world_tower_hyper_tls_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ async fn setup_server() -> Result<
config.max_fragment_size = Some(16384); // Larger fragment size for powerful servers
config.send_half_rtt_data = true; // Enable 0.5-RTT data
config.session_storage = ServerSessionMemoryCache::new(10240); // Larger session cache
config.cert_compression_cache = Arc::new(rustls::compress::CompressionCache::default());
config.max_early_data_size = 16384; // Enable 0-RTT data

let tls_config = Arc::new(config);
Expand Down Expand Up @@ -154,9 +155,15 @@ fn bench_server(c: &mut Criterion) {
let mut root_cert_store = RootCertStore::empty();
root_cert_store.add_parsable_certificates(load_certs("examples/sample.pem").unwrap());

let client_config = ClientConfig::builder()
let mut client_config = ClientConfig::builder()
.with_root_certificates(root_cert_store)
.with_no_client_auth();
// Enable handshake resumption
client_config.resumption = rustls::client::Resumption::in_memory_sessions(10240);
client_config.cert_compression_cache =
Arc::new(rustls::compress::CompressionCache::default());
client_config.max_fragment_size = Some(16384); // Larger fragment size for powerful servers
client_config.enable_early_data = true; // Enable 0-RTT data

let https = HttpsConnectorBuilder::new()
.with_tls_config(client_config)
Expand Down Expand Up @@ -198,7 +205,7 @@ fn bench_server(c: &mut Criterion) {
});

// Concurrency stress test
let concurrent_requests = vec![1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987]; // Fibonacci sequence
let concurrent_requests = vec![1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987]; // log sequence
for &num_requests in &concurrent_requests {
group.throughput(Throughput::Elements(num_requests as u64));
group.bench_with_input(
Expand Down
33 changes: 33 additions & 0 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::fuse::Fuse;
///
/// * `wait_for` - An `Option<Duration>` specifying how long to sleep.
/// If `None`, the function will wait indefinitely.
#[inline]
async fn sleep_or_pending(wait_for: Option<Duration>) {
match wait_for {
Some(wait) => sleep(wait).await,
Expand Down Expand Up @@ -55,6 +56,7 @@ async fn sleep_or_pending(wait_for: Option<Duration>) {
/// * `watcher`: An optional `tokio::sync::watch::Receiver` for graceful shutdown signaling.
/// * `max_connection_age`: An optional `Duration` specifying the maximum age of the connection
/// before initiating a graceful shutdown.
#[inline]
pub async fn serve_http_connection<B, IO, S, E>(
hyper_io: IO,
hyper_service: S,
Expand All @@ -79,6 +81,36 @@ pub async fn serve_http_connection<B, IO, S, E>(
inner: watcher.as_mut().map(|w| w.changed()),
});

let builder = builder.clone();
// TODO(How to accept a preconfigured builder)
// The API here for hyper_util is poor.
// Really what you want to do is configure a builder like this
// and pass it in for use as a builder, however, you cannot
// the simple way may be to require configuration and
// then accept an immutable reference to an http2 connection builder
let mut builder = builder.clone();
builder
// HTTP/1 settings
.http1()
.half_close(true)
.keep_alive(true)
.max_buf_size(64 * 1024)
.pipeline_flush(true)
.preserve_header_case(true)
.title_case_headers(false)
// HTTP/2 settings
.http2()
.initial_stream_window_size(Some(1024 * 1024))
.initial_connection_window_size(Some(2 * 1024 * 1024))
.adaptive_window(true)
.max_frame_size(Some(16 * 1024))
.max_concurrent_streams(Some(1000))
.max_send_buf_size(1024 * 1024)
.enable_connect_protocol()
.max_header_list_size(16 * 1024)
.keep_alive_interval(Some(Duration::from_secs(20)))
.keep_alive_timeout(Duration::from_secs(20));

// Create and pin the HTTP connection
let mut conn = pin!(builder.serve_connection_with_upgrades(hyper_io, hyper_service));

Expand Down Expand Up @@ -332,6 +364,7 @@ pub async fn serve_http_connection<B, IO, S, E>(
/// - The server will continue to accept new connections until the `signal` future resolves.
/// - When using TLS, make sure to provide a properly configured `ServerConfig`.
/// - The function will return when all connections have been closed after the shutdown signal.
#[inline]
pub async fn serve_http_with_shutdown<E, F, I, IO, IE, ResBody, S>(
service: S,
incoming: I,
Expand Down
6 changes: 6 additions & 0 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ impl<IO> AsyncRead for Transport<IO>
where
IO: AsyncRead + AsyncWrite + Unpin,
{
#[inline]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -39,6 +40,7 @@ impl<IO> AsyncWrite for Transport<IO>
where
IO: AsyncRead + AsyncWrite + Unpin,
{
#[inline]
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -50,20 +52,23 @@ where
}
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self.get_mut() {
Transport::Plain(io) => Pin::new(io).poll_flush(cx),
Transport::Tls(io) => Pin::new(io).poll_flush(cx),
}
}

#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self.get_mut() {
Transport::Plain(io) => Pin::new(io).poll_shutdown(cx),
Transport::Tls(io) => Pin::new(io).poll_shutdown(cx),
}
}

#[inline]
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -75,6 +80,7 @@ where
}
}

#[inline]
fn is_write_vectored(&self) -> bool {
match self {
Transport::Plain(io) => io.is_write_vectored(),
Expand Down
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
#[cfg(all(feature = "jemalloc", not(target_env = "msvc")))]
use jemallocator::Jemalloc;

#[cfg(all(feature = "jemalloc", not(target_env = "msvc")))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

pub use error::{Error as TransportError, Kind as TransportErrorKind};
pub use http::serve_http_connection;
pub use http::serve_http_with_shutdown;
Expand Down
1 change: 1 addition & 0 deletions src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ fn handle_accept_error(e: impl Into<Error>) -> ControlFlow<Error> {
/// This function uses `handle_accept_error` to determine whether to continue accepting
/// connections after an error occurs. Non-fatal errors are logged and skipped, while
/// fatal errors cause the stream to yield an error and terminate.
#[inline]
pub fn serve_tcp_incoming<IO, IE>(
incoming: impl Stream<Item = Result<IO, IE>> + Send + 'static,
) -> impl Stream<Item = Result<IO, crate::Error>>
Expand Down
3 changes: 3 additions & 0 deletions src/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use tokio_stream::{Stream, StreamExt};
///
/// - If the input `tcp_stream` yields an error, that error is propagated.
/// - If the TLS handshake fails, the error is wrapped in the crate's `Error` type.
#[inline]
pub fn serve_tls_incoming<IO>(
tcp_stream: impl Stream<Item = Result<IO, Error>>,
tls: TlsAcceptor,
Expand Down Expand Up @@ -71,6 +72,7 @@ where
/// # Returns
///
/// A `Result` containing a vector of `CertificateDer` on success, or an `io::Error` on failure.
#[inline]
pub fn load_certs(filename: &str) -> io::Result<Vec<CertificateDer<'static>>> {
// Open certificate file
let certfile = fs::File::open(filename)?;
Expand All @@ -92,6 +94,7 @@ pub fn load_certs(filename: &str) -> io::Result<Vec<CertificateDer<'static>>> {
/// # Returns
///
/// A `Result` containing a `PrivateKeyDer` on success, or an `io::Error` on failure.
#[inline]
pub fn load_private_key(filename: &str) -> io::Result<PrivateKeyDer<'static>> {
// Open keyfile
let keyfile = fs::File::open(filename)?;
Expand Down

0 comments on commit 091f0d8

Please sign in to comment.