Skip to content

Commit

Permalink
fix: optimize server for generality
Browse files Browse the repository at this point in the history
  • Loading branch information
0xAlcibiades committed Sep 12, 2024
1 parent e3b3425 commit ff7b439
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 114 deletions.
4 changes: 2 additions & 2 deletions benches/hello_world_tower_hyper_tls_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ async fn start_server(
shutdown_rx.await.ok();
}),
)
.await
.unwrap();
.await
.unwrap();
});
Ok((server_addr, shutdown_tx))
}
Expand Down
284 changes: 172 additions & 112 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ async fn sleep_or_pending(wait_for: Option<Duration>) {
};
}

/// Serves a single HTTP connection from a hyper service backend.
/// Serves HTTP an HTTP connection on the transport from a hyper service backend.
///
/// This method handles an individual HTTP connection, processing requests through
/// This method handles an HTTP connection on a given transport `IO`, processing requests through
/// the provided service and managing the connection lifecycle.
///
/// # Type Parameters
Expand All @@ -61,93 +61,88 @@ pub async fn serve_http_connection<B, IO, S, E>(
hyper_io: IO,
hyper_service: S,
builder: HttpConnectionBuilder<E>,
mut watcher: Option<tokio::sync::watch::Receiver<()>>,
watcher: Option<tokio::sync::watch::Receiver<()>>,
max_connection_age: Option<Duration>,
) where
B: Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
IO: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
S: Service<Request<Incoming>, Response = Response<B>> + Clone + Send + 'static,
S: Service<Request<Incoming>, Response=Response<B>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
E: HttpServerConnExec<S::Future, B> + Send + Sync + 'static,
{
// Spawn a new asynchronous task to handle the incoming hyper IO stream
tokio::spawn(async move {
{
// Set up a fused future for the watcher
let mut sig = pin!(crate::fuse::Fuse {
inner: watcher.as_mut().map(|w| w.changed()),
});
// Set up a fused future for the watcher
let mut watcher = watcher.clone();
let mut sig = pin!(Fuse {
inner: watcher.as_mut().map(|w| w.changed()),
});

let builder = builder.clone();
// TODO(How to accept a preconfigured builder)
// The API here for hyper_util is poor.
// Really what you want to do is configure a builder like this
// and pass it in for use as a builder, however, you cannot
// the simple way may be to require configuration and
// then accept an immutable reference to an http2 connection builder
let mut builder = builder.clone();
builder
// HTTP/1 settings
.http1()
.half_close(true)
.keep_alive(true)
.max_buf_size(64 * 1024)
.pipeline_flush(true)
.preserve_header_case(true)
.title_case_headers(false)
// HTTP/2 settings
.http2()
.initial_stream_window_size(Some(1024 * 1024))
.initial_connection_window_size(Some(2 * 1024 * 1024))
.adaptive_window(true)
.max_frame_size(Some(16 * 1024))
.max_concurrent_streams(Some(1000))
.max_send_buf_size(1024 * 1024)
.enable_connect_protocol()
.max_header_list_size(16 * 1024)
.keep_alive_interval(Some(Duration::from_secs(20)))
.keep_alive_timeout(Duration::from_secs(20));

// Create and pin the HTTP connection
let mut conn = pin!(builder.serve_connection_with_upgrades(hyper_io, hyper_service));

// Set up the sleep future for max connection age
let sleep = sleep_or_pending(max_connection_age);
tokio::pin!(sleep);

// Main loop for serving the HTTP connection
loop {
tokio::select! {
// Handle the connection result
rv = &mut conn => {
if let Err(err) = rv {
// Log any errors that occur while serving the HTTP connection
debug!("failed serving HTTP connection: {:#}", err);
}
break;
},
// Handle max connection age timeout
_ = &mut sleep => {
// Initiate a graceful shutdown when max connection age is reached
conn.as_mut().graceful_shutdown();
sleep.set(sleep_or_pending(None));
},
// Handle graceful shutdown signal
_ = &mut sig => {
// Initiate a graceful shutdown when signal is received
conn.as_mut().graceful_shutdown();
}
// Set up the sleep future for max connection age
let sleep = sleep_or_pending(max_connection_age);
tokio::pin!(sleep);

// TODO(This builder should be pre-configured outside of the server)
// unfortunately this object is very poorly designed and there is
// no way exposed to pre-configure it.
//
// There must be some way to approach here.
let builder = builder.clone();
// Configure the builder
let mut builder = builder.clone();
builder
// HTTP/1 settings
.http1()
.half_close(true)
.keep_alive(true)
.max_buf_size(64 * 1024)
.pipeline_flush(true)
.preserve_header_case(true)
.title_case_headers(false)
// HTTP/2 settings
.http2()
.initial_stream_window_size(Some(1024 * 1024))
.initial_connection_window_size(Some(2 * 1024 * 1024))
.adaptive_window(true)
.max_frame_size(Some(16 * 1024))
.max_concurrent_streams(Some(1000))
.max_send_buf_size(1024 * 1024)
.enable_connect_protocol()
.max_header_list_size(16 * 1024)
.keep_alive_interval(Some(Duration::from_secs(20)))
.keep_alive_timeout(Duration::from_secs(20));

// Create and pin the HTTP connection
// This handles all the HTTP connection logic via hyper
let mut conn = pin!(builder.serve_connection_with_upgrades(hyper_io, hyper_service));

// Here we wait for the http connection to terminate
loop {
tokio::select! {
// Handle the connection result
rv = &mut conn => {
if let Err(err) = rv {
// Log any errors that occur while serving the HTTP connection
debug!("failed serving HTTP connection: {:#}", err);
}
break;
},
// Handle max connection age timeout
_ = &mut sleep => {
// Initiate a graceful shutdown when max connection age is reached
conn.as_mut().graceful_shutdown();
sleep.set(sleep_or_pending(None));
},
// Handle graceful shutdown signal
_ = &mut sig => {
// Initiate a graceful shutdown when signal is received
conn.as_mut().graceful_shutdown();
}
}
}

// Clean up and log connection closure
drop(watcher);
trace!("HTTP connection closed");
});
trace!("HTTP connection closed");
}

/// Serves HTTP/HTTPS requests with graceful shutdown capability.
Expand Down Expand Up @@ -373,92 +368,157 @@ pub async fn serve_http_with_shutdown<E, F, I, IO, IE, ResBody, S>(
signal: Option<F>,
) -> Result<(), super::Error>
where
F: Future<Output = ()> + Send + 'static,
I: Stream<Item = Result<IO, IE>> + Send + 'static,
F: Future<Output=()> + Send + 'static,
I: Stream<Item=Result<IO, IE>> + Send + 'static,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
IE: Into<crate::Error> + Send + 'static,
S: Service<Request<Incoming>, Response = Response<ResBody>> + Clone + Send + 'static,
S: Service<Request<Incoming>, Response=Response<ResBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
ResBody: Body<Data = Bytes> + Send + Sync + 'static,
ResBody: Body<Data=Bytes> + Send + Sync + 'static,
ResBody::Error: Into<crate::Error> + Send + Sync,
E: HttpServerConnExec<S::Future, ResBody> + Send + Sync + 'static,
{
// Prepare the incoming stream of TCP connections
let incoming = crate::tcp::serve_tcp_incoming(incoming);

// Create a channel for signaling graceful shutdown
// Create a channel for signaling graceful shutdown to listening connections
let (signal_tx, signal_rx) = tokio::sync::watch::channel(());
let signal_tx = Arc::new(signal_tx);

// We say that graceful shutdown is enabled if a signal is provided
let graceful = signal.is_some();

// The signal future that will resolve when the server should shut down
let mut sig = pin!(Fuse { inner: signal });

// Prepare the incoming stream of TCP connections
// from the provided stream of IO objects, which is coming
// most likely from a TCP stream.
let incoming = crate::tcp::serve_tcp_incoming(incoming);

// Pin the incoming stream to the stack
let mut incoming = pin!(incoming);

// Create TLS acceptor if TLS config is provided
let tls_acceptor = tls_config.map(TlsAcceptor::from);

// Main server loop
// Enter the main server loop
loop {
// Select between the future which returns first,
// A shutdown signal or an incoming IO result.
tokio::select! {
// Handle shutdown signal
// Check if we received a graceful shutdown signal for the server
_ = &mut sig => {
// Exit the loop if we did, and shut down the server
trace!("signal received, shutting down");
break;
},
// Handle incoming connections
// Wait for the next IO result from the incoming stream
io = incoming.next() => {
// If we got an IO result from the incoming stream
// This effectively demultiplexes the incoming stream of IO objects,
// which each represent a connection which may then be individually
// streamed/handled.
//
// So this is effectively a demultiplexer for the incoming stream of IO objects.
//
// Because of the way the stream handling is implemented,
// the responses are multiplexed back over the same stream to the client.
// However, that would not be intuitive just from looking it this code
// because the reverse multiplexing is "invisible" to the reader.
let io = match io {
// We check if it's a valid stream
Some(Ok(io)) => io,
// or if it's a non-fatal error
Some(Err(e)) => {
trace!("error accepting connection: {:#}", e);
// if it's a non-fatal error, we continue processing IO objects
continue;
},
None => {
// If we got a fatal error, meaning we lost connection or something else
// we break out of the loop
break
},
};

trace!("connection accepted");

// Prepare the connection for hyper
let transport = if let Some(tls_acceptor) = &tls_acceptor {
match tls_acceptor.accept(io).await {
Ok(tls_stream) => Transport::new_tls(tls_stream),
Err(e) => {
debug!("TLS handshake failed: {:#}", e);
continue;
}
}
} else {
Transport::new_plain(io)
};

let hyper_io = TokioIo::new(transport);
let hyper_svc = service.clone();

// Serve the HTTP connection
serve_http_connection(
hyper_io,
hyper_svc,
builder.clone(),
graceful.then(|| signal_rx.clone()),
None
).await;
trace!("TCP streaming connection accepted");

// For each of these TCP streams, we are going to want to
// spawn a new task to handle the connection.

// Clone necessary values for the spawned task
let service = service.clone();
let builder = builder.clone();
let tls_acceptor = tls_acceptor.clone();
let signal_rx = signal_rx.clone();

// Spawn a new task to handle this connection
tokio::spawn(async move {
// Abstract the transport layer for hyper

let transport = if let Some(tls_acceptor) = &tls_acceptor {
// If TLS is enabled, then we perform a TLS handshake
// Clone the TLS acceptor and IO for use in the blocking task
let tls_acceptor = tls_acceptor.clone();
let io = io;

match tokio::task::spawn_blocking(move || {
// Perform the TLS handshake in a blocking task
// Because this is one of the most computationally heavy things the sever does.
// In the case of ECDSA and very fast handshakes, this has more downside
// than upside, but in the case of RSA and slow handshakes, this is a good idea.
// It amortizes out to about 2 µs of overhead per connection.
// and moves this computationally heavy task off the main thread pool.
tokio::runtime::Handle::current().block_on(tls_acceptor.accept(io))
}).await {
// Handle the result of the TLS handshake
Ok(Ok(tls_stream)) => Transport::new_tls(tls_stream),
Ok(Err(e)) => {
// This connection failed to handshake
debug!("TLS handshake failed: {:#}", e);
return;
},
Err(e) => {
// This connection was malformed and the server was unable to handle it
debug!("TLS handshake task panicked: {:#}", e);
return;
}

}
}
else {
// If TLS is not enabled, then we use a plain transport
Transport::new_plain(io)
};

// Convert our abstracted tokio transport into a hyper transport
let hyper_io = TokioIo::new(transport);

// Serve the HTTP connections on this transport
serve_http_connection(
hyper_io,
service,
builder,
graceful.then(|| signal_rx),
None
).await;
});
}
}
}

// Handle graceful shutdown
if graceful {
// Broadcast the shutdown signal to all connections
let _ = signal_tx.send(());
// Drop the sender to signal that no more connections will be accepted
drop(signal_rx);
trace!(
"waiting for {} connections to close",
signal_tx.receiver_count()
);

// Wait for all connections to close
// TODO(Add a timeout here, optionally)
signal_tx.closed().await;
}

Expand Down Expand Up @@ -688,7 +748,7 @@ mod tests {
}
}
})
.await;
.await;

match shutdown_result {
Ok(Ok(())) => println!("Server shut down successfully"),
Expand Down

0 comments on commit ff7b439

Please sign in to comment.