Skip to content

Commit

Permalink
feat: tcp incoming
Browse files Browse the repository at this point in the history
  • Loading branch information
0xAlcibiades committed Sep 10, 2024
1 parent 4d9bc67 commit 8e66463
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 376 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ tokio-rustls = "0.26.0"
tower = { version = "0.5.1", features = ["util"] }
tracing = "0.1.40"
http-body = "1.0.1"
tokio-stream = "0.1.16"
tokio-stream = { version = "0.1.16", features = ["net"] }
bytes = "1.7.1"
pin-project = "1.1.5"
async-stream = "0.3.5"
futures = "0.3.30"

[dev-dependencies]
tokio = { version = "1.40.0", features = ["macros"] }
tokio = { version = "1.0", features = ["rt", "net", "test-util", "macros"] }
tokio-test = "0.4.4"
70 changes: 70 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::{error::Error as StdError, fmt};

type Source = Box<dyn StdError + Send + Sync + 'static>;

/// Errors that originate from the server;
pub struct Error {
inner: ErrorImpl,
}

struct ErrorImpl {
kind: Kind,
source: Option<Source>,
}

#[derive(Debug)]
pub(crate) enum Kind {
Transport,
}

impl Error {
pub(crate) fn new(kind: Kind) -> Self {
Self {
inner: ErrorImpl { kind, source: None },
}
}

pub(crate) fn with(mut self, source: impl Into<Source>) -> Self {
self.inner.source = Some(source.into());
self
}

pub(crate) fn from_source(source: impl Into<crate::Error>) -> Self {
Error::new(Kind::Transport).with(source)
}

fn description(&self) -> &str {
match &self.inner.kind {
Kind::Transport => "transport error",
}
}
}

impl fmt::Debug for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut f = f.debug_tuple("tonic::transport::Error");

f.field(&self.inner.kind);

if let Some(source) = &self.inner.source {
f.field(source);
}

f.finish()
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.description())
}
}

impl StdError for Error {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
self.inner
.source
.as_ref()
.map(|source| &**source as &(dyn StdError + 'static))
}
}
30 changes: 30 additions & 0 deletions src/fuse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// From `futures-util` crate, borrowed since this is the only dependency hyper-server requires.
// LICENSE: MIT or Apache-2.0
// A future which only yields `Poll::Ready` once, and thereafter yields `Poll::Pending`.
#[pin_project]
pub(crate) struct Fuse<F> {
#[pin]
pub(crate) inner: Option<F>,
}

impl<F> Future for Fuse<F>
where
F: Future,
{
type Output = F::Output;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().project().inner.as_pin_mut() {
Some(fut) => fut.poll(cx).map(|output| {
self.project().inner.set(None);
output
}),
None => Poll::Pending,
}
}
}
100 changes: 100 additions & 0 deletions src/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use http::{Request, Response};
use http_body::Body;
use hyper::body::Incoming;
use hyper::service::Service;
use hyper_util::server::conn::auto::{Builder, HttpServerConnExec};
use std::future::pending;
use std::pin::pin;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{debug, trace};

async fn sleep_or_pending(wait_for: Option<Duration>) {
match wait_for {
Some(wait) => sleep(wait).await,
None => pending().await,
};
}

/// Serves a single HTTP connection from a hyper service backend.
///
/// This method handles an individual HTTP connection, processing requests through
/// the provided service and managing the connection lifecycle.
///
/// # Type Parameters
///
/// * `B`: The body type for the HTTP response.
/// * `IO`: The I/O type for the HTTP connection.
/// * `S`: The service type that processes HTTP requests.
/// * `E`: The executor type for the HTTP server connection.
///
/// # Parameters
///
/// * `hyper_io`: The I/O object representing the inbound hyper IO stream.
/// * `hyper_svc`: The hyper `Service` implementation used to process HTTP requests.
/// * `builder`: An `HttpConnBuilder` used to create and serve the HTTP connection.
/// * `watcher`: An optional `tokio::sync::watch::Receiver` for graceful shutdown signaling.
/// * `max_connection_age`: An optional `Duration` specifying the maximum age of the connection
/// before initiating a graceful shutdown.
async fn serve_http_connection<B, IO, S, E>(
hyper_io: IO,
hyper_service: S,
builder: Builder<E>,
mut watcher: Option<tokio::sync::watch::Receiver<()>>,
max_connection_age: Option<Duration>,
) where
B: Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
IO: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
S: Service<Request<Incoming>, Response = Response<B>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
E: HttpServerConnExec<S::Future, B> + Send + Sync + 'static,
{
// Spawn a new asynchronous task to handle the incoming hyper IO stream
tokio::spawn(async move {
{
// Set up a fused future for the watcher
let mut sig = pin!(crate::fuse::Fuse {
inner: watcher.as_mut().map(|w| w.changed()),
});

// Create and pin the HTTP connection
let mut conn = pin!(builder.serve_connection(hyper_io, hyper_service));

// Set up the sleep future for max connection age
let sleep = sleep_or_pending(max_connection_age);
tokio::pin!(sleep);

// Main loop for serving the HTTP connection
loop {
tokio::select! {
// Handle the connection result
rv = &mut conn => {
if let Err(err) = rv {
// Log any errors that occur while serving the HTTP connection
debug!("failed serving HTTP connection: {:#}", err);
}
break;
},
// Handle max connection age timeout
_ = &mut sleep => {
// Initiate a graceful shutdown when max connection age is reached
conn.as_mut().graceful_shutdown();
sleep.set(sleep_or_pending(None));
},
// Handle graceful shutdown signal
_ = &mut sig => {
// Initiate a graceful shutdown when signal is received
conn.as_mut().graceful_shutdown();
}
}
}
}

// Clean up and log connection closure
drop(watcher);
trace!("HTTP connection closed");
});
}
Loading

0 comments on commit 8e66463

Please sign in to comment.