Skip to content

Commit

Permalink
docs: document io
Browse files Browse the repository at this point in the history
  • Loading branch information
0xAlcibiades committed Sep 13, 2024
1 parent 59ef595 commit 8e2a506
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 40 deletions.
39 changes: 26 additions & 13 deletions benches/hello_world_tower_hyper_tls_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,24 +340,37 @@ fn bench_server(c: &mut Criterion) {

let https = HttpsConnectorBuilder::new()
.with_tls_config(tls_config.client_config)
.https_or_http()
.enable_all_versions()
.https_only()
.enable_http2()
.build();

let client = Client::builder(TokioExecutor::new())
// HTTP/2 settings
.http2_only(true) // Force HTTP/2 for consistent benchmarking and to match server config
.http2_initial_stream_window_size(2 * 1024 * 1024) // 2MB, matches server setting for better flow control
.http2_initial_connection_window_size(4 * 1024 * 1024) // 4MB, matches server setting for improved throughput
.http2_adaptive_window(true) // Enable dynamic flow control to optimize performance under varying conditions
.http2_max_frame_size(32 * 1024) // 32KB, matches server setting for larger data chunks
.http2_keep_alive_interval(Duration::from_secs(10)) // Maintain connection health, matching server's 10-second interval
.http2_keep_alive_timeout(Duration::from_secs(30)) // Allow time for keep-alive response, matching server's 30-second timeout
.http2_max_concurrent_reset_streams(2000) // Match server's max concurrent streams for better parallelism
.http2_max_send_buf_size(2 * 1024 * 1024) // 2MB, matches server setting for improved write performance
.http2_only(true)
// Ensures all connections use HTTP/2 protocol
.http2_initial_stream_window_size(4 * 1024 * 1024)
// Sets initial HTTP/2 stream flow control window to 4MB
.http2_initial_connection_window_size(8 * 1024 * 1024)
// Sets initial HTTP/2 connection flow control window to 8MB
.http2_adaptive_window(true)
// Enables dynamic adjustment of flow control window based on network conditions
.http2_max_frame_size(1024 * 1024)
// Sets maximum HTTP/2 frame size to 1MB
.http2_keep_alive_interval(Duration::from_secs(30))
// Sends keep-alive pings every 30 seconds
.http2_keep_alive_timeout(Duration::from_secs(60))
// Allows 60 seconds for keep-alive responses before timing out
.http2_max_concurrent_reset_streams(250)
// Limits the number of concurrent streams per connection to 250
.http2_max_send_buf_size(4 * 1024 * 1024)
// Sets maximum send buffer size to 4MB
// Connection pooling settings
.pool_idle_timeout(Duration::from_secs(90)) // Keep connections alive longer for reuse in benchmarks
.pool_max_idle_per_host(2000) // Match max concurrent streams to fully utilize HTTP/2 multiplexing
.pool_idle_timeout(Duration::from_secs(60))
// Keeps idle connections alive for 60 seconds
.pool_max_idle_per_host(32)
// Sets maximum number of idle connections per host to 32
// This is key, you have a lot of pain in store at runtime if you
// don't set these.
.timer(TokioTimer::new())
.pool_timer(TokioTimer::new())
.build(https);
Expand Down
44 changes: 20 additions & 24 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ pub async fn serve_http_connection<B, IO, S, E>(
// this builder doesn't have a way to convert back to a builder
// once you start building.

// Configure the builder
let mut builder = builder.clone();
builder
// HTTP/1 settings
Expand All @@ -100,41 +99,38 @@ pub async fn serve_http_connection<B, IO, S, E>(
.half_close(true)
// Enable keep-alive to reduce overhead for multiple requests
.keep_alive(true)
// Increase max buffer size to 256KB for better performance with larger payloads
.max_buf_size(256 * 1024)
// Enable immediate flushing of pipelined responses
// Increase max buffer size to 1MB for better performance with larger payloads
.max_buf_size(1024 * 1024)
// Enable immediate flushing of pipelined responses for lower latency
.pipeline_flush(true)
// Preserve original header case for compatibility
.preserve_header_case(true)
// Disable automatic title casing of headers to reduce processing overhead
.title_case_headers(false)
// HTTP/2 settings
.http2()
// Add the timer to the builder
// This will cause you all sorts of pain otherwise
// https://github.com/seanmonstar/reqwest/issues/2421
// https://github.com/rustls/hyper-rustls/issues/287
// Add the timer to the builder to avoid potential issues
.timer(TokioTimer::new())
// Increase initial stream window size to 2MB for better throughput
.initial_stream_window_size(Some(2 * 1024 * 1024))
// Increase initial connection window size to 4MB for improved performance
.initial_connection_window_size(Some(4 * 1024 * 1024))
// Increase initial stream window size to 4MB for better throughput
.initial_stream_window_size(Some(4 * 1024 * 1024))
// Increase initial connection window size to 8MB for improved performance
.initial_connection_window_size(Some(8 * 1024 * 1024))
// Enable adaptive window for dynamic flow control
.adaptive_window(true)
// Increase max frame size to 32KB for larger data chunks
.max_frame_size(Some(32 * 1024))
// Allow up to 2000 concurrent streams for better parallelism
.max_concurrent_streams(Some(2000))
// Increase max send buffer size to 2MB for improved write performance
.max_send_buf_size(2 * 1024 * 1024)
// Increase max frame size to 1MB for larger data chunks
.max_frame_size(Some(1024 * 1024))
// Allow up to 250 concurrent streams for better parallelism without overwhelming the connection
.max_concurrent_streams(Some(250))
// Increase max send buffer size to 4MB for improved write performance
.max_send_buf_size(4 * 1024 * 1024)
// Enable CONNECT protocol support for proxying and tunneling
.enable_connect_protocol()
// Increase max header list size to 32KB to handle larger headers
.max_header_list_size(32 * 1024)
// Set keep-alive interval to 10 seconds for more responsive connection management
.keep_alive_interval(Some(Duration::from_secs(10)))
// Set keep-alive timeout to 30 seconds to balance connection reuse and resource conservation
.keep_alive_timeout(Duration::from_secs(30));
// Increase max header list size to 64KB to handle larger headers
.max_header_list_size(64 * 1024)
// Set keep-alive interval to 30 seconds for more responsive connection management
.keep_alive_interval(Some(Duration::from_secs(30)))
// Set keep-alive timeout to 60 seconds to balance connection reuse and resource conservation
.keep_alive_timeout(Duration::from_secs(60));

// Create and pin the HTTP connection
//
Expand Down
99 changes: 98 additions & 1 deletion src/io.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,74 @@
//! This module provides an abstraction layer for handling both plain TCP and TLS connections.
//! It allows higher-level code to work with a unified `Transport` type, regardless of whether
//! the underlying connection is encrypted or not.

use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::server::TlsStream;

/// Represents either a plain TCP connection or a TLS-encrypted connection.
///
/// This enum allows the rest of the application to work with connections
/// without needing to know whether they're encrypted or not.
pub(crate) enum Transport<IO> {
/// Represents a plain, unencrypted connection.
Plain(IO),
/// Represents a TLS-encrypted connection.
/// We use `Box` here to keep the enum size constant, as `TlsStream` might be larger than `IO`.
Tls(Box<TlsStream<IO>>),
}

impl<IO> Transport<IO> {
/// Creates a new `Transport` instance for a plain, unencrypted connection.
///
/// # Arguments
///
/// * `io` - The I/O object representing the connection.
///
/// # Returns
///
/// Returns a `Transport::Plain` variant containing the provided I/O object.
#[inline]
pub(crate) fn new_plain(io: IO) -> Self {
Self::Plain(io)
}

/// Creates a new `Transport` instance for a TLS-encrypted connection.
///
/// # Arguments
///
/// * `io` - The `TlsStream` object representing the encrypted connection.
///
/// # Returns
///
/// Returns a `Transport::Tls` variant containing the provided TLS stream.
#[inline]
pub(crate) fn new_tls(io: TlsStream<IO>) -> Self {
Self::Tls(Box::from(io))
Self::Tls(Box::new(io))
}
}

/// Implementation of `AsyncRead` for `Transport`.
///
/// This allows `Transport` to be used in asynchronous read operations,
/// regardless of whether it's a plain or TLS connection.
impl<IO> AsyncRead for Transport<IO>
where
IO: AsyncRead + AsyncWrite + Unpin,
{
/// Attempts to read from the transport into the provided buffer.
///
/// # Arguments
///
/// * `self` - Pinned mutable reference to the transport.
/// * `cx` - The context for the current task.
/// * `buf` - The buffer to read into.
///
/// # Returns
///
/// Returns a `Poll` indicating whether the operation is complete or pending.
#[inline]
fn poll_read(
self: Pin<&mut Self>,
Expand All @@ -36,10 +82,25 @@ where
}
}

/// Implementation of `AsyncWrite` for `Transport`.
///
/// This allows `Transport` to be used in asynchronous write operations,
/// regardless of whether it's a plain or TLS connection.
impl<IO> AsyncWrite for Transport<IO>
where
IO: AsyncRead + AsyncWrite + Unpin,
{
/// Attempts to write data from the provided buffer into the transport.
///
/// # Arguments
///
/// * `self` - Pinned mutable reference to the transport.
/// * `cx` - The context for the current task.
/// * `buf` - The buffer containing data to write.
///
/// # Returns
///
/// Returns a `Poll` containing the number of bytes written if successful.
#[inline]
fn poll_write(
self: Pin<&mut Self>,
Expand All @@ -52,6 +113,16 @@ where
}
}

/// Attempts to flush the transport, ensuring all intermediately buffered contents reach their destination.
///
/// # Arguments
///
/// * `self` - Pinned mutable reference to the transport.
/// * `cx` - The context for the current task.
///
/// # Returns
///
/// Returns a `Poll` indicating whether the flush operation is complete or pending.
#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self.get_mut() {
Expand All @@ -60,6 +131,16 @@ where
}
}

/// Attempts to shut down the transport.
///
/// # Arguments
///
/// * `self` - Pinned mutable reference to the transport.
/// * `cx` - The context for the current task.
///
/// # Returns
///
/// Returns a `Poll` indicating whether the shutdown operation is complete or pending.
#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self.get_mut() {
Expand All @@ -68,6 +149,17 @@ where
}
}

/// Attempts to write a sequence of buffers to the transport.
///
/// # Arguments
///
/// * `self` - Pinned mutable reference to the transport.
/// * `cx` - The context for the current task.
/// * `bufs` - A slice of `IoSlice`s to write.
///
/// # Returns
///
/// Returns a `Poll` containing the number of bytes written if successful.
#[inline]
fn poll_write_vectored(
self: Pin<&mut Self>,
Expand All @@ -80,6 +172,11 @@ where
}
}

/// Determines whether this transport can write vectored data efficiently.
///
/// # Returns
///
/// Returns `true` if the transport supports vectored writing, `false` otherwise.
#[inline]
fn is_write_vectored(&self) -> bool {
match self {
Expand Down
4 changes: 2 additions & 2 deletions src/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ mod tests {
let server_task = tokio::spawn(async move {
debug!("Server task started");
let mut tls_stream = Box::pin(serve_tls_incoming(tcp_incoming, tls_acceptor));
tokio::time::timeout(std::time::Duration::from_millis(10), tls_stream.next()).await
tokio::time::timeout(std::time::Duration::from_millis(1), tls_stream.next()).await
});

let untrusted_client_config = ClientConfig::builder()
Expand Down Expand Up @@ -392,7 +392,7 @@ mod tests {
debug!("Server task started");
let mut tls_stream = Box::pin(serve_tls_incoming(tcp_incoming, tls_acceptor));
let result =
tokio::time::timeout(std::time::Duration::from_millis(10), tls_stream.next()).await;
tokio::time::timeout(std::time::Duration::from_millis(1), tls_stream.next()).await;
debug!("Server task completed with result: {:?}", result.is_err());
result
});
Expand Down

0 comments on commit 8e2a506

Please sign in to comment.