Skip to content

Commit

Permalink
try two
Browse files Browse the repository at this point in the history
  • Loading branch information
tr8dr committed Sep 9, 2024
1 parent 95f6194 commit ad5c61b
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 187 deletions.
3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,9 @@ nix = { version = "0.29.0", features = ["signal"] }

[[example]]
name = "tower-server1"
path = "examples/tower-server1.rs"

[[example]]
name = "tower-server2"
path = "examples/tower-server2.rs"

[[example]]
name = "tower-client"
path = "examples/tower-client.rs"
4 changes: 2 additions & 2 deletions examples/tower-server1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ impl Server {
let io = TokioIo::new(stream);
// Create a tower service
let svc = tower::service_fn(hello);
let svc = ServiceBuilder::new().service(svc);
let remapped = ServiceBuilder::new().service(svc.clone());
// Convert to a hyper service
let svc = TowerToHyperService::new(svc);
let svc = TowerToHyperService::new(remapped);
// Create a new service for each connection
let conn = http.serve_connection(io, svc);
// Watch the connection for graceful shutdown
Expand Down
146 changes: 59 additions & 87 deletions examples/tower-server2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,137 +7,114 @@ use hyper::{
server::conn::http1,
Request, Response,
};
use tower::layer::util::Identity;
use hyper_util::rt::TokioIo;
use hyper_util::service::TowerToHyperService;
use http_body_util::Full;
use std::{convert::Infallible, net::SocketAddr};
use std::future::Future;
use std::pin::Pin;
use std::{convert::Infallible, future, net::SocketAddr};
use std::sync::Arc;
use http_body::Body;
use hyper::service::Service;
use tokio::net::TcpListener;
use tower_service::Service;
use hyper_server::CompositeService;
use tower::layer::util::{Identity, Stack};
use tower::{Layer, ServiceBuilder};
use tower::util::BoxService;


// Type alias for the complex error type
type BoxError = Box<dyn std::error::Error + Send + Sync>;

// Type alias for the future returned by the service
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

// Type alias for the specific service signature
type SpecificService = dyn Service<
Request<Incoming>,
Response = Response<Full<Bytes>>,
Error = BoxError,
Future = BoxFuture<Result<Response<Full<Bytes>>, BoxError>>> + Send + Sync;
type BoxBody = http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>;


pub struct Server<L = tower::layer::util::Identity> {
/// The layer stack that will be applied to each service
builder: Arc<L>,
}

/// Represents our HTTP server.
pub struct Server {
/// The socket address on which the server will listen.
socket_addr: SocketAddr,
services: CompositeService<Box<SpecificService>, Request<Incoming>>,
impl Default for Server<tower::layer::util::Identity> {
fn default() -> Self {
Self {
builder: Arc::new(tower::layer::util::Identity::new()),
}
}
}

impl Server {
/// Creates a new Server instance.
///
/// # Arguments
///
/// * `addr` - The socket address on which the server will listen.
pub async fn new(addr: SocketAddr) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
Ok(Server {
socket_addr: addr,
services: CompositeService::new(),
})
fn new() -> Self {
Server::default()
}
}

/// Add a tower service to the server
pub fn add_service<S>(&mut self, service: S)
impl<L> Server<L>
where
L: Layer<BoxService<hyper::Request<hyper::body::Incoming>, hyper::Response<BoxBody>, hyper::Error>> + Send + Sync + 'static,
L::Service: Send + 'static,
{
/// Add layer
pub fn layer<NewLayer>(self, layer: NewLayer) -> Server<Stack<NewLayer, L>>
where
S: Service<
Request<Incoming>,
Response = Response<Full<Bytes>>,
Error = BoxError,
Future = BoxFuture<Result<Response<Full<Bytes>>, BoxError>>
> + Send + Sync + 'static
NewLayer: Layer<BoxService<hyper::Request<hyper::body::Incoming>, hyper::Response<BoxBody>, hyper::Error>> + Send + Sync + 'static,
{
self.services.push(Box::new(service) as Box<SpecificService>);
Server {
builder: Arc::new(ServiceBuilder::new().layer(layer).chain(Arc::try_unwrap(self.builder).unwrap_or_else(|arc| (*arc).clone()))),
}
}

/// Add service
pub fn service<S>(self, svc: S) -> Self
where
S: Service<hyper::Request<hyper::body::Incoming>, Response = hyper::Response<BoxBody>, Error = hyper::Error> + Clone + Send + 'static,
S::Future: Send + 'static,
{
let builder = self.builder.clone();
Server {
builder: Arc::new(ServiceBuilder::new().service_fn(move |req| {
let svc = builder.service(svc.clone());
svc.call(req)
})),
}
}


/// Starts the server and handles incoming connections.
///
/// This method sets up the TCP listener, initializes the HTTP server,
/// and enters the main service loop. It handles incoming connections
/// and manages the graceful shutdown process.
pub async fn serve(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Bind to the specified address
let listener = TcpListener::bind(self.socket_addr).await?;

// Specify our HTTP settings (http1, http2, auto all work)
/// Starts the server and handles incoming connections.
pub async fn serve(self, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let listener = TcpListener::bind(addr).await?;
let http = http1::Builder::new();

// Initialize the graceful shutdown mechanism
let graceful = hyper_util::server::graceful::GracefulShutdown::new();
let mut signal = std::pin::pin!(Self::shutdown_signal());

// Prepare the shutdown signal future
let mut signal = std::pin::pin!(Server::shutdown_signal());
println!("Server listening on {}", addr);

println!("Server listening on {}", self.socket_addr);

// Main server loop
loop {
tokio::select! {
// Handle incoming connections
Ok((stream, _addr)) = listener.accept() => {
Ok((stream, _)) = listener.accept() => {
let io = TokioIo::new(stream);
let builder = self.builder.clone();

// make copy of services
let svc = self.services.clone();

// Create a new service for each connection
let svc = TowerToHyperService::new(self.builder.clone());
let conn = http.serve_connection(io, svc);
// Watch the connection for graceful shutdown
let fut = graceful.watch(conn);

// Spawn a new task for each connection, watching for closure
tokio::spawn(async move {
if let Err(e) = fut.await {
eprintln!("Error serving connection: {:?}", e);
}
});
},
// Handle shutdown signal
_ = &mut signal => {
eprintln!("Graceful shutdown signal received");
// Stop the accept loop
break;
}
}
}

// Graceful shutdown process
tokio::select! {
_ = graceful.shutdown() => {
eprintln!("All connections gracefully closed");
Ok(())
},
_ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
eprintln!("Timed out waiting for all connections to close");
Err(Box::from("Timed out waiting for connections to close"))
}
}
}

/// Waits for a CTRL+C signal to initiate the shutdown process.
///
/// This function uses tokio's signal handling to wait for a CTRL+C signal,
/// which will trigger the graceful shutdown of our server.
async fn shutdown_signal() {
tokio::signal::ctrl_c()
.await
Expand All @@ -158,17 +135,12 @@ pub async fn hello(_: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infall
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let addr = SocketAddr::from(([127, 0, 0, 1], 54321));
let mut server = Server::new(addr).await?;

// Create a tower service
let svc = tower::service_fn(hello);
server.add_service(Arc::new(svc));

// Start the server in a separate task
let server_task = tokio::spawn(async move {
server.serve().await
});
Server::new()
.service(svc)
.serve(addr)
.await?;

// Wait for the server to shut down
server_task.await?
}
Ok(())
}
92 changes: 0 additions & 92 deletions src/composite_service.rs

This file was deleted.

3 changes: 0 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +0,0 @@
mod composite_service;

pub use composite_service::CompositeService;

0 comments on commit ad5c61b

Please sign in to comment.