diff --git a/Cargo.toml b/Cargo.toml index 238761f..ee37a8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,12 +40,9 @@ nix = { version = "0.29.0", features = ["signal"] } [[example]] name = "tower-server1" -path = "examples/tower-server1.rs" [[example]] name = "tower-server2" -path = "examples/tower-server2.rs" [[example]] name = "tower-client" -path = "examples/tower-client.rs" diff --git a/examples/tower-server1.rs b/examples/tower-server1.rs index b587284..84a81c6 100644 --- a/examples/tower-server1.rs +++ b/examples/tower-server1.rs @@ -59,9 +59,9 @@ impl Server { let io = TokioIo::new(stream); // Create a tower service let svc = tower::service_fn(hello); - let svc = ServiceBuilder::new().service(svc); + let remapped = ServiceBuilder::new().service(svc.clone()); // Convert to a hyper service - let svc = TowerToHyperService::new(svc); + let svc = TowerToHyperService::new(remapped); // Create a new service for each connection let conn = http.serve_connection(io, svc); // Watch the connection for graceful shutdown diff --git a/examples/tower-server2.rs b/examples/tower-server2.rs index ea5b8e3..c59a6b3 100644 --- a/examples/tower-server2.rs +++ b/examples/tower-server2.rs @@ -7,137 +7,114 @@ use hyper::{ server::conn::http1, Request, Response, }; -use tower::layer::util::Identity; use hyper_util::rt::TokioIo; use hyper_util::service::TowerToHyperService; use http_body_util::Full; -use std::{convert::Infallible, net::SocketAddr}; -use std::future::Future; -use std::pin::Pin; +use std::{convert::Infallible, future, net::SocketAddr}; use std::sync::Arc; -use http_body::Body; +use hyper::service::Service; use tokio::net::TcpListener; -use tower_service::Service; -use hyper_server::CompositeService; +use tower::layer::util::{Identity, Stack}; +use tower::{Layer, ServiceBuilder}; +use tower::util::BoxService; - -// Type alias for the complex error type -type BoxError = Box; - -// Type alias for the future returned by the service -type BoxFuture = Pin + Send>>; - -// Type alias for the specific service signature -type SpecificService = dyn Service< - Request, - Response = Response>, - Error = BoxError, - Future = BoxFuture>, BoxError>>> + Send + Sync; +type BoxBody = http_body_util::combinators::BoxBody; +pub struct Server { + /// The layer stack that will be applied to each service + builder: Arc, +} -/// Represents our HTTP server. -pub struct Server { - /// The socket address on which the server will listen. - socket_addr: SocketAddr, - services: CompositeService, Request>, +impl Default for Server { + fn default() -> Self { + Self { + builder: Arc::new(tower::layer::util::Identity::new()), + } + } } impl Server { - /// Creates a new Server instance. - /// - /// # Arguments - /// - /// * `addr` - The socket address on which the server will listen. - pub async fn new(addr: SocketAddr) -> Result> { - Ok(Server { - socket_addr: addr, - services: CompositeService::new(), - }) + fn new() -> Self { + Server::default() } +} - /// Add a tower service to the server - pub fn add_service(&mut self, service: S) +impl Server +where + L: Layer, hyper::Response, hyper::Error>> + Send + Sync + 'static, + L::Service: Send + 'static, +{ + /// Add layer + pub fn layer(self, layer: NewLayer) -> Server> where - S: Service< - Request, - Response = Response>, - Error = BoxError, - Future = BoxFuture>, BoxError>> - > + Send + Sync + 'static + NewLayer: Layer, hyper::Response, hyper::Error>> + Send + Sync + 'static, { - self.services.push(Box::new(service) as Box); + Server { + builder: Arc::new(ServiceBuilder::new().layer(layer).chain(Arc::try_unwrap(self.builder).unwrap_or_else(|arc| (*arc).clone()))), + } + } + + /// Add service + pub fn service(self, svc: S) -> Self + where + S: Service, Response = hyper::Response, Error = hyper::Error> + Clone + Send + 'static, + S::Future: Send + 'static, + { + let builder = self.builder.clone(); + Server { + builder: Arc::new(ServiceBuilder::new().service_fn(move |req| { + let svc = builder.service(svc.clone()); + svc.call(req) + })), + } } /// Starts the server and handles incoming connections. - /// - /// This method sets up the TCP listener, initializes the HTTP server, - /// and enters the main service loop. It handles incoming connections - /// and manages the graceful shutdown process. - pub async fn serve(&mut self) -> Result<(), Box> { - // Bind to the specified address - let listener = TcpListener::bind(self.socket_addr).await?; - - // Specify our HTTP settings (http1, http2, auto all work) + /// Starts the server and handles incoming connections. + pub async fn serve(self, addr: SocketAddr) -> Result<(), Box> { + let listener = TcpListener::bind(addr).await?; let http = http1::Builder::new(); - - // Initialize the graceful shutdown mechanism let graceful = hyper_util::server::graceful::GracefulShutdown::new(); + let mut signal = std::pin::pin!(Self::shutdown_signal()); - // Prepare the shutdown signal future - let mut signal = std::pin::pin!(Server::shutdown_signal()); + println!("Server listening on {}", addr); - println!("Server listening on {}", self.socket_addr); - - // Main server loop loop { tokio::select! { - // Handle incoming connections - Ok((stream, _addr)) = listener.accept() => { + Ok((stream, _)) = listener.accept() => { let io = TokioIo::new(stream); + let builder = self.builder.clone(); - // make copy of services - let svc = self.services.clone(); - - // Create a new service for each connection + let svc = TowerToHyperService::new(self.builder.clone()); let conn = http.serve_connection(io, svc); - // Watch the connection for graceful shutdown let fut = graceful.watch(conn); - // Spawn a new task for each connection, watching for closure tokio::spawn(async move { if let Err(e) = fut.await { eprintln!("Error serving connection: {:?}", e); } }); }, - // Handle shutdown signal _ = &mut signal => { eprintln!("Graceful shutdown signal received"); - // Stop the accept loop break; } } } - // Graceful shutdown process tokio::select! { _ = graceful.shutdown() => { eprintln!("All connections gracefully closed"); Ok(()) }, _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => { - eprintln!("Timed out waiting for all connections to close"); Err(Box::from("Timed out waiting for connections to close")) } } } - /// Waits for a CTRL+C signal to initiate the shutdown process. - /// - /// This function uses tokio's signal handling to wait for a CTRL+C signal, - /// which will trigger the graceful shutdown of our server. async fn shutdown_signal() { tokio::signal::ctrl_c() .await @@ -158,17 +135,12 @@ pub async fn hello(_: Request) -> Result>, Infall #[tokio::main] async fn main() -> Result<(), Box> { let addr = SocketAddr::from(([127, 0, 0, 1], 54321)); - let mut server = Server::new(addr).await?; - - // Create a tower service let svc = tower::service_fn(hello); - server.add_service(Arc::new(svc)); - // Start the server in a separate task - let server_task = tokio::spawn(async move { - server.serve().await - }); + Server::new() + .service(svc) + .serve(addr) + .await?; - // Wait for the server to shut down - server_task.await? -} \ No newline at end of file + Ok(()) +} diff --git a/src/composite_service.rs b/src/composite_service.rs deleted file mode 100644 index 6b95d32..0000000 --- a/src/composite_service.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::{future}; -use std::sync::{Arc}; -use std::task::{Context, Poll}; -use futures_util::future::BoxFuture; -use http_body::Body; -use tokio::sync::Mutex; -use tower_service::Service; - -/// Custom error type for CompositeService -pub enum CompositeError { - NoServices, - AllFailed(E), -} - -/// A composite tower service -#[derive(Clone)] -pub struct CompositeService { - services: Arc>>, - _phantom: std::marker::PhantomData, -} - -impl CompositeService -where - S: Service, -{ - pub fn new() -> Self { - CompositeService { - services: Arc::new(Mutex::new(Vec::new())), - _phantom: std::marker::PhantomData, - } - } - - /// Add service to stack - pub async fn push (&mut self, service: S) { - self.services.lock().await.push(service); - } -} - -impl Service for CompositeService -where - S: Service + Clone + Send + 'static, - S::Future: Send + 'static, - S::Response: Send + 'static, - S::Error: Send + 'static, - Request: Send + 'static, -{ - type Response = S::Response; - type Error = CompositeError; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - /* - let mut services = self.services.lock().unwrap(); - if services.is_empty() { - return Poll::Ready(Err(CompositeError::NoServices)); - } - for mut service in services.iter_mut() { - if service.poll_ready(cx).is_ready() { - return Poll::Ready(Ok(())); - } - } - Poll::Pending - */ - - // We can't use .await here, so we'll need to handle this differently - // This is a simplification and might not be ideal for all use cases - Poll::Ready(Ok(())) - } - - fn call(&mut self, request: Request) -> Self::Future { - let services = self.services.clone(); - Box::pin(async move { - let mut services = services.lock().await; - if services.is_empty() { - return Err(CompositeError::NoServices); - } - - let mut last_error = None; - for mut service in services.iter_mut() { - match service.call(request).await { - Ok(response) => return Ok(response), - Err(e) => { - last_error = Some(e); - break; - } - } - } - - Err(CompositeError::AllFailed(last_error.expect("Should have at least one error"))) - }) - } -} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 2b6597f..e69de29 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +0,0 @@ -mod composite_service; - -pub use composite_service::CompositeService; \ No newline at end of file