Skip to content

Commit

Permalink
Merge branch 'master' into proxy-protocol-header
Browse files Browse the repository at this point in the history
  • Loading branch information
0xAlcibiades authored Oct 13, 2023
2 parents d76e313 + 13ae39f commit 3fa2c1c
Show file tree
Hide file tree
Showing 14 changed files with 478 additions and 195 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "MIT"
name = "hyper-server"
readme = "README.md"
repository = "https://github.com/valorem-labs-inc/hyper-server"
version = "0.5.2"
version = "0.5.3"

[features]
default = []
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ You can find more examples [here](/examples).

## Minimum Supported Rust Version

axum-server's MSRV is `1.65`.
hyper-server's MSRV is `1.65`.

## Safety

Expand Down
36 changes: 28 additions & 8 deletions src/accept.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,51 @@
//! [`Accept`] trait and utilities.
//! Module `accept` provides utilities for asynchronously processing and modifying IO streams and services.
//!
//! The primary trait exposed by this module is [`Accept`], which allows for asynchronous transformations
//! of input streams and services. The module also provides a default implementation, [`DefaultAcceptor`],
//! that performs no modifications and directly passes through the input stream and service.

use std::{
future::{Future, Ready},
io,
};

/// An asynchronous function to modify io stream and service.
/// An asynchronous trait for processing and modifying IO streams and services.
///
/// Implementations of this trait can be used to modify or transform the input stream and service before
/// further processing. For instance, this trait could be used to perform initial authentication, logging,
/// or other setup operations on new connections.
pub trait Accept<I, S> {
/// IO stream produced by accept.
/// The modified or transformed IO stream produced by `accept`.
type Stream;

/// Service produced by accept.
/// The modified or transformed service produced by `accept`.
type Service;

/// Future return value.
/// The Future type that is returned by `accept`.
type Future: Future<Output = io::Result<(Self::Stream, Self::Service)>>;

/// Process io stream and service asynchronously.
/// Asynchronously process and possibly modify the given IO stream and service.
///
/// # Parameters:
/// * `stream`: The incoming IO stream, typically a connection.
/// * `service`: The associated service with the stream.
///
/// # Returns:
/// A future resolving to the modified stream and service, or an error.
fn accept(&self, stream: I, service: S) -> Self::Future;
}

/// A no-op acceptor.
/// A default implementation of the [`Accept`] trait that performs no modifications.
///
/// This is a no-op acceptor that simply passes the provided stream and service through without any transformations.
#[derive(Clone, Copy, Debug, Default)]
pub struct DefaultAcceptor;

impl DefaultAcceptor {
/// Create a new default acceptor.
/// Create a new default acceptor instance.
///
/// # Returns:
/// An instance of [`DefaultAcceptor`].
pub fn new() -> Self {
Self
}
Expand Down
92 changes: 78 additions & 14 deletions src/addr_incoming_config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use std::time::Duration;

/// A configuration for [`AddrIncoming`](hyper::server::conn::AddrIncoming).
/// Configuration settings for the `AddrIncoming`.
///
/// This configuration structure is designed to be used in conjunction with the
/// [`AddrIncoming`](hyper::server::conn::AddrIncoming) type from the Hyper crate.
/// It provides a mechanism to customize server settings like TCP keepalive probes,
/// error handling, and other TCP socket-level configurations.
#[derive(Debug, Clone)]
pub struct AddrIncomingConfig {
pub(crate) tcp_sleep_on_accept_errors: bool,
Expand All @@ -17,7 +22,18 @@ impl Default for AddrIncomingConfig {
}

impl AddrIncomingConfig {
/// Creates a default [`AddrIncoming`](hyper::server::conn::AddrIncoming) config.
/// Creates a new `AddrIncomingConfig` with default settings.
///
/// # Default Settings
/// - Sleep on accept errors: `true`
/// - TCP keepalive probes: Disabled (`None`)
/// - Duration between keepalive retransmissions: None
/// - Number of keepalive retransmissions: None
/// - `TCP_NODELAY` option: `false`
///
/// # Returns
///
/// A new `AddrIncomingConfig` instance with default settings.
pub fn new() -> AddrIncomingConfig {
Self {
tcp_sleep_on_accept_errors: true,
Expand All @@ -28,47 +44,95 @@ impl AddrIncomingConfig {
}
}

/// Builds the config, creating an owned version of it.
/// Creates a cloned copy of the current configuration.
///
/// This method can be useful when you want to preserve the original settings and
/// create a modified configuration based on the current one.
///
/// # Returns
///
/// A cloned `AddrIncomingConfig`.
pub fn build(&mut self) -> Self {
self.clone()
}

/// Set whether to sleep on accept errors, to avoid exhausting file descriptor limits.
/// Specifies whether to pause (sleep) when an error occurs while accepting a connection.
///
/// This can be useful to prevent rapidly exhausting file descriptors in scenarios
/// where errors might be transient or frequent.
///
/// Default is `true`.
/// # Parameters
///
/// - `val`: Whether to sleep on accept errors. Default is `true`.
///
/// # Returns
///
/// A mutable reference to the current `AddrIncomingConfig`.
pub fn tcp_sleep_on_accept_errors(&mut self, val: bool) -> &mut Self {
self.tcp_sleep_on_accept_errors = val;
self
}

/// Set how often to send TCP keepalive probes.
/// Configures the frequency of TCP keepalive probes.
///
/// TCP keepalive probes are used to detect whether a peer is still connected.
///
/// # Parameters
///
/// - `val`: Duration between keepalive probes. Setting to `None` disables keepalive probes. Default is `None`.
///
/// By default TCP keepalive probes is disabled.
/// # Returns
///
/// A mutable reference to the current `AddrIncomingConfig`.
pub fn tcp_keepalive(&mut self, val: Option<Duration>) -> &mut Self {
self.tcp_keepalive = val;
self
}

/// Set the duration between two successive TCP keepalive retransmissions,
/// if acknowledgement to the previous keepalive transmission is not received.
/// Configures the duration between two successive TCP keepalive retransmissions.
///
/// If an acknowledgment to a previous keepalive probe isn't received within this duration,
/// a new probe will be sent.
///
/// # Parameters
///
/// - `val`: Duration between keepalive retransmissions. Default is no interval (`None`).
///
/// Default is no interval.
/// # Returns
///
/// A mutable reference to the current `AddrIncomingConfig`.
pub fn tcp_keepalive_interval(&mut self, val: Option<Duration>) -> &mut Self {
self.tcp_keepalive_interval = val;
self
}

/// Set the number of retransmissions to be carried out before declaring that remote end is not available.
/// Configures the number of times to retransmit a TCP keepalive probe if no acknowledgment is received.
///
/// After the specified number of retransmissions, the remote end is considered unavailable.
///
/// # Parameters
///
/// Default is no retry.
/// - `val`: Number of retransmissions before considering the remote end unavailable. Default is no retry (`None`).
///
/// # Returns
///
/// A mutable reference to the current `AddrIncomingConfig`.
pub fn tcp_keepalive_retries(&mut self, val: Option<u32>) -> &mut Self {
self.tcp_keepalive_retries = val;
self
}

/// Set the value of `TCP_NODELAY` option for accepted connections.
/// Configures the `TCP_NODELAY` option for accepted connections.
///
/// When enabled, this option disables Nagle's algorithm, which can reduce latencies for small packets.
///
/// # Parameters
///
/// - `val`: Whether to enable `TCP_NODELAY`. Default is `false`.
///
/// # Returns
///
/// Default is `false`.
/// A mutable reference to the current `AddrIncomingConfig`.
pub fn tcp_nodelay(&mut self, val: bool) -> &mut Self {
self.tcp_nodelay = val;
self
Expand Down
56 changes: 45 additions & 11 deletions src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use std::{
};
use tokio::{sync::Notify, time::sleep};

/// A handle for [`Server`](crate::server::Server).
/// A handle to manage and interact with the server.
///
/// `Handle` provides methods to access server information, such as the number of active connections,
/// and to perform actions like initiating a shutdown.
#[derive(Clone, Debug, Default)]
pub struct Handle {
inner: Arc<HandleInner>,
Expand All @@ -27,33 +30,51 @@ struct HandleInner {
}

impl Handle {
/// Create a new handle.
/// Create a new handle for the server.
///
/// # Returns
///
/// A new `Handle` instance.
pub fn new() -> Self {
Self::default()
}

/// Get the number of connections.
/// Get the number of active connections to the server.
///
/// # Returns
///
/// The number of active connections.
pub fn connection_count(&self) -> usize {
self.inner.conn_count.load(Ordering::SeqCst)
}

/// Shutdown the server.
/// Initiate an immediate shutdown of the server.
///
/// This method will terminate the server without waiting for active connections to close.
pub fn shutdown(&self) {
self.inner.shutdown.notify_waiters();
}

/// Gracefully shutdown the server.
/// Initiate a graceful shutdown of the server.
///
/// The server will wait for active connections to close before shutting down. If a duration
/// is provided, the server will wait up to that duration for active connections to close
/// before forcing a shutdown.
///
/// `None` means indefinite grace period.
/// # Parameters
///
/// - `duration`: Maximum time to wait for active connections to close. `None` means the server
/// will wait indefinitely.
pub fn graceful_shutdown(&self, duration: Option<Duration>) {
*self.inner.graceful_dur.lock().unwrap() = duration;

self.inner.graceful.notify_waiters();
}

/// Returns local address and port when server starts listening.
/// Wait until the server starts listening and then returns its local address and port.
///
/// # Returns
///
/// Returns `None` if server fails to bind.
/// The local `SocketAddr` if the server successfully binds, otherwise `None`.
pub async fn listening(&self) -> Option<SocketAddr> {
let notified = self.inner.addr_notify.notified();

Expand All @@ -66,24 +87,28 @@ impl Handle {
*self.inner.addr.lock().unwrap()
}

/// Internal method to notify the handle when the server starts listening on a particular address.
pub(crate) fn notify_listening(&self, addr: Option<SocketAddr>) {
*self.inner.addr.lock().unwrap() = addr;

self.inner.addr_notify.notify_waiters();
}

/// Creates a watcher that monitors server status and connection activity.
pub(crate) fn watcher(&self) -> Watcher {
Watcher::new(self.clone())
}

/// Internal method to wait until the server is shut down.
pub(crate) async fn wait_shutdown(&self) {
self.inner.shutdown.notified().await;
}

/// Internal method to wait until the server is gracefully shut down.
pub(crate) async fn wait_graceful_shutdown(&self) {
self.inner.graceful.notified().await;
}

/// Internal method to wait until all connections have ended, or the optional graceful duration has expired.
pub(crate) async fn wait_connections_end(&self) {
if self.inner.conn_count.load(Ordering::SeqCst) == 0 {
return;
Expand All @@ -102,27 +127,36 @@ impl Handle {
}
}

/// A watcher that monitors server status and connection activity.
///
/// The watcher keeps track of active connections and listens for shutdown or graceful shutdown signals.
pub(crate) struct Watcher {
handle: Handle,
}

impl Watcher {
/// Creates a new watcher linked to the given server handle.
fn new(handle: Handle) -> Self {
handle.inner.conn_count.fetch_add(1, Ordering::SeqCst);

Self { handle }
}

/// Internal method to wait until the server is gracefully shut down.
pub(crate) async fn wait_graceful_shutdown(&self) {
self.handle.wait_graceful_shutdown().await
}

/// Internal method to wait until the server is shut down.
pub(crate) async fn wait_shutdown(&self) {
self.handle.wait_shutdown().await
}
}

impl Drop for Watcher {
/// Reduces the active connection count when a watcher is dropped.
///
/// If the connection count reaches zero and a graceful shutdown has been initiated, the server is notified that
/// all connections have ended.
fn drop(&mut self) {
let count = self.handle.inner.conn_count.fetch_sub(1, Ordering::SeqCst) - 1;

Expand Down
Loading

0 comments on commit 3fa2c1c

Please sign in to comment.