diff --git a/benches/hello_world_tower_hyper_tls_tcp.rs b/benches/hello_world_tower_hyper_tls_tcp.rs
index dfcdc9a..4312419 100644
--- a/benches/hello_world_tower_hyper_tls_tcp.rs
+++ b/benches/hello_world_tower_hyper_tls_tcp.rs
@@ -268,8 +268,8 @@ async fn start_server(
             shutdown_rx.await.ok();
         }),
     )
-    .await
-    .unwrap();
+        .await
+        .unwrap();
     });
     Ok((server_addr, shutdown_tx))
 }
diff --git a/src/http.rs b/src/http.rs
index 6957369..d4d66ac 100644
--- a/src/http.rs
+++ b/src/http.rs
@@ -36,9 +36,9 @@ async fn sleep_or_pending(wait_for: Option<Duration>) {
     };
 }
 
-/// Serves a single HTTP connection from a hyper service backend.
+/// Serves an HTTP connection on the transport from a hyper service backend.
 ///
-/// This method handles an individual HTTP connection, processing requests through
+/// This method handles an HTTP connection on a given transport `IO`, processing requests through
 /// the provided service and managing the connection lifecycle.
 ///
 /// # Type Parameters
@@ -61,93 +61,88 @@ pub async fn serve_http_connection<B, IO, S, E>(
     hyper_io: IO,
     hyper_service: S,
     builder: HttpConnectionBuilder<E>,
-    mut watcher: Option<tokio::sync::watch::Receiver<()>>,
+    watcher: Option<tokio::sync::watch::Receiver<()>>,
     max_connection_age: Option<Duration>,
 ) where
     B: Body + Send + 'static,
     B::Data: Send,
     B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
     IO: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
-    S: Service<Request<Incoming>, Response = Response<B>> + Clone + Send + 'static,
+    S: Service<Request<Incoming>, Response=Response<B>> + Clone + Send + 'static,
     S::Future: Send + 'static,
     S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
     E: HttpServerConnExec<S::Future, B> + Send + Sync + 'static,
 {
-    // Spawn a new asynchronous task to handle the incoming hyper IO stream
-    tokio::spawn(async move {
-        {
-            // Set up a fused future for the watcher
-            let mut sig = pin!(crate::fuse::Fuse {
-                inner: watcher.as_mut().map(|w| w.changed()),
-            });
+    // Set up a fused future for the watcher
+    let mut watcher = watcher.clone();
+    let mut sig = pin!(Fuse {
+        inner: watcher.as_mut().map(|w| w.changed()),
+    });
 
-            let builder = builder.clone();
-            // TODO(How to accept a preconfigured builder)
-            // The API here for hyper_util is poor.
-            // Really what you want to do is configure a builder like this
-            // and pass it in for use as a builder, however, you cannot
-            // the simple way may be to require configuration and
-            // then accept an immutable reference to an http2 connection builder
-            let mut builder = builder.clone();
-            builder
-                // HTTP/1 settings
-                .http1()
-                .half_close(true)
-                .keep_alive(true)
-                .max_buf_size(64 * 1024)
-                .pipeline_flush(true)
-                .preserve_header_case(true)
-                .title_case_headers(false)
-                // HTTP/2 settings
-                .http2()
-                .initial_stream_window_size(Some(1024 * 1024))
-                .initial_connection_window_size(Some(2 * 1024 * 1024))
-                .adaptive_window(true)
-                .max_frame_size(Some(16 * 1024))
-                .max_concurrent_streams(Some(1000))
-                .max_send_buf_size(1024 * 1024)
-                .enable_connect_protocol()
-                .max_header_list_size(16 * 1024)
-                .keep_alive_interval(Some(Duration::from_secs(20)))
-                .keep_alive_timeout(Duration::from_secs(20));
-
-            // Create and pin the HTTP connection
-            let mut conn = pin!(builder.serve_connection_with_upgrades(hyper_io, hyper_service));
-
-            // Set up the sleep future for max connection age
-            let sleep = sleep_or_pending(max_connection_age);
-            tokio::pin!(sleep);
-
-            // Main loop for serving the HTTP connection
-            loop {
-                tokio::select! {
-                    // Handle the connection result
-                    rv = &mut conn => {
-                        if let Err(err) = rv {
-                            // Log any errors that occur while serving the HTTP connection
-                            debug!("failed serving HTTP connection: {:#}", err);
-                        }
-                        break;
-                    },
-                    // Handle max connection age timeout
-                    _ = &mut sleep => {
-                        // Initiate a graceful shutdown when max connection age is reached
-                        conn.as_mut().graceful_shutdown();
-                        sleep.set(sleep_or_pending(None));
-                    },
-                    // Handle graceful shutdown signal
-                    _ = &mut sig => {
-                        // Initiate a graceful shutdown when signal is received
-                        conn.as_mut().graceful_shutdown();
-                    }
-                }
-            }
-        }
-
-        // Clean up and log connection closure
-        drop(watcher);
-        trace!("HTTP connection closed");
-    });
+    // Set up the sleep future for max connection age
+    let sleep = sleep_or_pending(max_connection_age);
+    tokio::pin!(sleep);
+
+    // TODO(This builder should be pre-configured outside of the server)
+    // Unfortunately this object is very poorly designed and there is
+    // no way exposed to pre-configure it.
+    //
+    // There must be a better way to approach this.
+    let builder = builder.clone();
+    // Configure the builder
+    let mut builder = builder.clone();
+    builder
+        // HTTP/1 settings
+        .http1()
+        .half_close(true)
+        .keep_alive(true)
+        .max_buf_size(64 * 1024)
+        .pipeline_flush(true)
+        .preserve_header_case(true)
+        .title_case_headers(false)
+        // HTTP/2 settings
+        .http2()
+        .initial_stream_window_size(Some(1024 * 1024))
+        .initial_connection_window_size(Some(2 * 1024 * 1024))
+        .adaptive_window(true)
+        .max_frame_size(Some(16 * 1024))
+        .max_concurrent_streams(Some(1000))
+        .max_send_buf_size(1024 * 1024)
+        .enable_connect_protocol()
+        .max_header_list_size(16 * 1024)
+        .keep_alive_interval(Some(Duration::from_secs(20)))
+        .keep_alive_timeout(Duration::from_secs(20));
+
+    // Create and pin the HTTP connection
+    // This handles all the HTTP connection logic via hyper
+    let mut conn = pin!(builder.serve_connection_with_upgrades(hyper_io, hyper_service));
+
+    // Here we wait for the HTTP connection to terminate
+    loop {
+        tokio::select! {
+            // Handle the connection result
+            rv = &mut conn => {
+                if let Err(err) = rv {
+                    // Log any errors that occur while serving the HTTP connection
+                    debug!("failed serving HTTP connection: {:#}", err);
+                }
+                break;
+            },
+            // Handle max connection age timeout
+            _ = &mut sleep => {
+                // Initiate a graceful shutdown when max connection age is reached
+                conn.as_mut().graceful_shutdown();
+                sleep.set(sleep_or_pending(None));
+            },
+            // Handle graceful shutdown signal
+            _ = &mut sig => {
+                // Initiate a graceful shutdown when signal is received
+                conn.as_mut().graceful_shutdown();
+            }
+        }
+    }
+
+    trace!("HTTP connection closed");
 }
 
 /// Serves HTTP/HTTPS requests with graceful shutdown capability.
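The TODO above asks for the builder to be pre-configured outside the server rather than cloned and reconfigured per connection. Below is a minimal sketch of one way to do that, assuming hyper_util's `server::conn::auto::Builder` with a `TokioExecutor`; the helper name `configure_http_builder` is hypothetical and not part of this patch. It applies the same HTTP/1 and HTTP/2 settings as the hunk above, once, at startup.

use std::time::Duration;
use hyper_util::rt::TokioExecutor;
use hyper_util::server::conn::auto::Builder;

// Hypothetical helper: apply the HTTP/1 and HTTP/2 settings used in the
// patch once, before the builder is handed to the serving loop.
fn configure_http_builder(builder: &mut Builder<TokioExecutor>) {
    builder
        // HTTP/1 settings
        .http1()
        .half_close(true)
        .keep_alive(true)
        .max_buf_size(64 * 1024)
        .pipeline_flush(true)
        .preserve_header_case(true)
        .title_case_headers(false)
        // HTTP/2 settings
        .http2()
        .initial_stream_window_size(Some(1024 * 1024))
        .initial_connection_window_size(Some(2 * 1024 * 1024))
        .adaptive_window(true)
        .max_frame_size(Some(16 * 1024))
        .max_concurrent_streams(Some(1000))
        .max_send_buf_size(1024 * 1024)
        .enable_connect_protocol()
        .max_header_list_size(16 * 1024)
        .keep_alive_interval(Some(Duration::from_secs(20)))
        .keep_alive_timeout(Duration::from_secs(20));
}

Since `serve_connection_with_upgrades` appears to take the builder by shared reference, a single instance configured this way could be reused across connections, which would let the per-connection `builder.clone()` calls above be dropped.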
@@ -373,85 +368,149 @@ pub async fn serve_http_with_shutdown<F, I, IO, IE, S, ResBody, E>(
     signal: Option<F>,
 ) -> Result<(), super::Error>
 where
-    F: Future<Output = ()> + Send + 'static,
-    I: Stream<Item = Result<IO, IE>> + Send + 'static,
+    F: Future<Output=()> + Send + 'static,
+    I: Stream<Item=Result<IO, IE>> + Send + 'static,
     IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
     IE: Into<super::Error> + Send + 'static,
-    S: Service<Request<Incoming>, Response = Response<ResBody>> + Clone + Send + 'static,
+    S: Service<Request<Incoming>, Response=Response<ResBody>> + Clone + Send + 'static,
     S::Future: Send + 'static,
     S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
-    ResBody: Body<Data = Bytes> + Send + Sync + 'static,
+    ResBody: Body<Data=Bytes> + Send + Sync + 'static,
     ResBody::Error: Into<super::Error> + Send + Sync,
     E: HttpServerConnExec<S::Future, ResBody> + Send + Sync + 'static,
 {
-    // Prepare the incoming stream of TCP connections
-    let incoming = crate::tcp::serve_tcp_incoming(incoming);
-
-    // Create a channel for signaling graceful shutdown
+    // Create a channel for signaling graceful shutdown to listening connections
     let (signal_tx, signal_rx) = tokio::sync::watch::channel(());
     let signal_tx = Arc::new(signal_tx);
 
+    // We say that graceful shutdown is enabled if a signal is provided
     let graceful = signal.is_some();
+
+    // The signal future that will resolve when the server should shut down
     let mut sig = pin!(Fuse { inner: signal });
+
+    // Prepare the incoming stream of TCP connections
+    // from the provided stream of IO objects, which is coming
+    // most likely from a TCP stream.
+    let incoming = crate::tcp::serve_tcp_incoming(incoming);
+
+    // Pin the incoming stream to the stack
     let mut incoming = pin!(incoming);
 
     // Create TLS acceptor if TLS config is provided
     let tls_acceptor = tls_config.map(TlsAcceptor::from);
 
-    // Main server loop
+    // Enter the main server loop
     loop {
+        // Select on whichever future returns first:
+        // a shutdown signal or an incoming IO result.
         tokio::select! {
-            // Handle shutdown signal
+            // Check if we received a graceful shutdown signal for the server
             _ = &mut sig => {
+                // Exit the loop if we did, and shut down the server
                 trace!("signal received, shutting down");
                 break;
             },
-            // Handle incoming connections
+            // Wait for the next IO result from the incoming stream
             io = incoming.next() => {
+                // This effectively demultiplexes the incoming stream of IO objects,
+                // each of which represents a connection that may then be individually
+                // streamed/handled.
+                //
+                // Because of the way the stream handling is implemented,
+                // the responses are multiplexed back over the same stream to the client.
+                // However, that would not be intuitive just from looking at this code,
+                // because the reverse multiplexing is "invisible" to the reader.
                 let io = match io {
+                    // We check if it's a valid stream
                     Some(Ok(io)) => io,
+                    // or if it's a non-fatal error
                     Some(Err(e)) => {
                         trace!("error accepting connection: {:#}", e);
+                        // If it's a non-fatal error, we continue processing IO objects
                         continue;
                     },
                     None => {
+                        // The incoming stream has ended, e.g. we lost the listener,
+                        // so we break out of the loop
                         break
                     },
                 };
 
-                trace!("connection accepted");
-
-                // Prepare the connection for hyper
-                let transport = if let Some(tls_acceptor) = &tls_acceptor {
-                    match tls_acceptor.accept(io).await {
-                        Ok(tls_stream) => Transport::new_tls(tls_stream),
-                        Err(e) => {
-                            debug!("TLS handshake failed: {:#}", e);
-                            continue;
-                        }
-                    }
-                } else {
-                    Transport::new_plain(io)
-                };
-
-                let hyper_io = TokioIo::new(transport);
-                let hyper_svc = service.clone();
-
-                // Serve the HTTP connection
-                serve_http_connection(
-                    hyper_io,
-                    hyper_svc,
-                    builder.clone(),
-                    graceful.then(|| signal_rx.clone()),
-                    None
-                ).await;
+                trace!("TCP streaming connection accepted");
+
+                // For each of these TCP streams, we are going to want to
+                // spawn a new task to handle the connection.
+
+                // Clone necessary values for the spawned task
+                let service = service.clone();
+                let builder = builder.clone();
+                let tls_acceptor = tls_acceptor.clone();
+                let signal_rx = signal_rx.clone();
+
+                // Spawn a new task to handle this connection
+                tokio::spawn(async move {
+                    // Abstract the transport layer for hyper
+                    let transport = if let Some(tls_acceptor) = &tls_acceptor {
+                        // If TLS is enabled, then we perform a TLS handshake
+                        // Clone the TLS acceptor and move the IO into the blocking task
+                        let tls_acceptor = tls_acceptor.clone();
+                        let io = io;
+
+                        match tokio::task::spawn_blocking(move || {
+                            // Perform the TLS handshake in a blocking task,
+                            // because this is one of the most computationally heavy
+                            // things the server does.
+                            // In the case of ECDSA and very fast handshakes, this has
+                            // more downside than upside, but in the case of RSA and
+                            // slow handshakes, this is a good idea. It amortizes out
+                            // to about 2 µs of overhead per connection, and moves this
+                            // computationally heavy task off the main thread pool.
+                            tokio::runtime::Handle::current().block_on(tls_acceptor.accept(io))
+                        }).await {
+                            // Handle the result of the TLS handshake
+                            Ok(Ok(tls_stream)) => Transport::new_tls(tls_stream),
+                            Ok(Err(e)) => {
+                                // This connection failed to handshake
+                                debug!("TLS handshake failed: {:#}", e);
+                                return;
+                            },
+                            Err(e) => {
+                                // The handshake task itself failed, e.g. it panicked
+                                debug!("TLS handshake task panicked: {:#}", e);
+                                return;
+                            }
+                        }
+                    } else {
+                        // If TLS is not enabled, then we use a plain transport
+                        Transport::new_plain(io)
+                    };
+
+                    // Convert our abstracted tokio transport into a hyper transport
+                    let hyper_io = TokioIo::new(transport);
+
+                    // Serve the HTTP connections on this transport
+                    serve_http_connection(
+                        hyper_io,
+                        service,
+                        builder,
+                        graceful.then(|| signal_rx),
+                        None
+                    ).await;
+                });
             }
         }
     }
 
     // Handle graceful shutdown
     if graceful {
+        // Broadcast the shutdown signal to all connections
         let _ = signal_tx.send(());
+        // Drop our receiver so that `signal_tx.closed()` can resolve
+        // once every connection's receiver is gone
         drop(signal_rx);
         trace!(
             "waiting for {} connections to close",
@@ -459,6 +518,7 @@ where
     );
 
     // Wait for all connections to close
+    // TODO(Add a timeout here, optionally)
     signal_tx.closed().await;
 }
@@ -688,7 +748,7 @@ mod tests {
                 }
             }
         })
-        .await;
+            .await;
 
         match shutdown_result {
             Ok(Ok(())) => println!("Server shut down successfully"),
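For reference, a minimal end-to-end usage sketch of `serve_http_with_shutdown` as changed by this patch. It assumes the parameter order (service, incoming, builder, tls_config, signal) implied by the bounds above; the crate path `your_crate` and the trivial service are hypothetical stand-ins.

use std::convert::Infallible;
use std::time::Duration;

use http::{Request, Response};
use http_body_util::Full;
use hyper::body::{Bytes, Incoming};
use hyper_util::rt::TokioExecutor;
use hyper_util::server::conn::auto::Builder;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tower::service_fn;

use your_crate::http::serve_http_with_shutdown; // hypothetical crate path

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Accept plain TCP connections; each is handled in its own spawned task.
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    let incoming = TcpListenerStream::new(listener);

    // A trivial tower service that returns a static body.
    let svc = service_fn(|_req: Request<Incoming>| async {
        Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("hello"))))
    });

    let builder = Builder::new(TokioExecutor::new());

    // Stand-in shutdown signal; a real server would wait on e.g. ctrl_c.
    let signal = async { tokio::time::sleep(Duration::from_secs(30)).await };

    // No TLS config here, so connections are served as plain HTTP.
    serve_http_with_shutdown(svc, incoming, builder, None, Some(signal)).await?;
    Ok(())
}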