Skip to content

Commit

Permalink
feat: max_inactive_interval option for Websocket server (#1192)
Browse files Browse the repository at this point in the history
adds extra option to Websocket server, which can be used
to configure forceful disconnection clients, which are not
submitting any requests (including pongs).
  • Loading branch information
bmuddha committed Aug 30, 2023
1 parent d4ea346 commit 0518896
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 54 deletions.
2 changes: 1 addition & 1 deletion client/http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ async fn batch_request_with_failed_call_gives_proper_error() {
.unwrap()
.unwrap();
let err: Vec<_> = res.into_ok().unwrap_err().collect();
assert_eq!(err, vec![ErrorObject::from(ErrorCode::MethodNotFound), ErrorObject::borrowed(-32602, &"foo", None)]);
assert_eq!(err, vec![ErrorObject::from(ErrorCode::MethodNotFound), ErrorObject::borrowed(-32602, "foo", None)]);
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ pub use future::ServerHandle;
pub use jsonrpsee_core::server::*;
pub use jsonrpsee_core::{id_providers::*, traits::IdProvider};
pub use jsonrpsee_types as types;
pub use server::{BatchRequestConfig, Builder as ServerBuilder, Server};
pub use server::{BatchRequestConfig, Builder as ServerBuilder, PingConfig, Server};
pub use tracing;
92 changes: 73 additions & 19 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ where
max_subscriptions_per_connection,
batch_requests_config,
id_provider: id_provider.clone(),
ping_interval: self.cfg.ping_interval,
ping_config: self.cfg.ping_config,
stop_handle: stop_handle.clone(),
conn_id: id,
logger: logger.clone(),
Expand Down Expand Up @@ -210,14 +210,14 @@ struct Settings {
batch_requests_config: BatchRequestConfig,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
/// The interval at which `Ping` frames are submitted.
ping_interval: Duration,
/// Enable HTTP.
enable_http: bool,
/// Enable WS.
enable_ws: bool,
/// Number of messages that server is allowed to `buffer` until backpressure kicks in.
message_buffer_capacity: u32,
/// Ping settings.
ping_config: PingConfig,
}

/// Configuration for batch request handling.
Expand All @@ -231,6 +231,54 @@ pub enum BatchRequestConfig {
Unlimited,
}

/// Configuration for WebSocket ping's.
///
/// If the server sends out a ping then remote peer must reply with a corresponding pong message.
///
/// It's possible to just send out pings then don't care about response
/// or terminate the connection if the ping isn't replied to the configured `max_inactivity` limit.
///
/// NOTE: It's possible that a `ping` may be backpressured and if you expect a connection
/// to be reassumed after interruption it's not recommended to enable the activity check.
#[derive(Debug, Copy, Clone)]
pub enum PingConfig {
/// The server pings the connected clients continuously at the configured interval but
/// doesn't disconnect them if no pongs are received from the client.
WithoutInactivityCheck(Duration),
/// The server pings the connected clients continuously at the configured interval
/// and terminates the connection if no websocket messages received from client
/// after the max limit is exceeded.
WithInactivityCheck {
/// Time interval between consequent pings from server
ping_interval: Duration,
/// Max allowed time for connection to stay idle
inactive_limit: Duration,
},
}

impl PingConfig {
pub(crate) fn ping_interval(&self) -> Duration {
match self {
Self::WithoutInactivityCheck(ping_interval) => *ping_interval,
Self::WithInactivityCheck { ping_interval, .. } => *ping_interval,
}
}

pub(crate) fn inactive_limit(&self) -> Option<Duration> {
if let Self::WithInactivityCheck { inactive_limit, .. } = self {
Some(*inactive_limit)
} else {
None
}
}
}

impl Default for PingConfig {
fn default() -> Self {
Self::WithoutInactivityCheck(Duration::from_secs(60))
}
}

impl Default for Settings {
fn default() -> Self {
Self {
Expand All @@ -241,10 +289,10 @@ impl Default for Settings {
max_subscriptions_per_connection: 1024,
batch_requests_config: BatchRequestConfig::Unlimited,
tokio_runtime: None,
ping_interval: Duration::from_secs(60),
enable_http: true,
enable_ws: true,
message_buffer_capacity: 1024,
ping_config: PingConfig::WithoutInactivityCheck(Duration::from_secs(60)),
}
}
}
Expand Down Expand Up @@ -368,25 +416,31 @@ impl<B, L> Builder<B, L> {
self
}

/// Configure the interval at which pings are submitted.
/// Configure the interval at which pings are submitted,
/// and optionally enable connection inactivity check
///
/// This option is used to keep the connection alive, and is just submitting `Ping` frames,
/// without making any assumptions about when a `Pong` frame should be received.
/// This option is used to keep the connection alive, and can be configured to just submit `Ping` frames or with extra parameter, configuring max interval when a `Pong` frame should be received
///
/// Default: 60 seconds.
/// Default: ping interval is set to 60 seconds and the inactivity check is disabled
///
/// # Examples
///
/// ```rust
/// use std::time::Duration;
/// use jsonrpsee_server::ServerBuilder;
/// use jsonrpsee_server::{ServerBuilder, PingConfig};
///
/// // Set the ping interval to 10 seconds.
/// let builder = ServerBuilder::default().ping_interval(Duration::from_secs(10));
/// // Set the ping interval to 10 seconds but terminate the connection if a client is inactive for more than 2 minutes
/// let builder = ServerBuilder::default().ping_interval(PingConfig::WithInactivityCheck { ping_interval: Duration::from_secs(10), inactive_limit: Duration::from_secs(2 * 60) }).unwrap();
/// ```
pub fn ping_interval(mut self, interval: Duration) -> Self {
self.settings.ping_interval = interval;
self
pub fn ping_interval(mut self, config: PingConfig) -> Result<Self, Error> {
if let PingConfig::WithInactivityCheck { ping_interval, inactive_limit } = config {
if ping_interval >= inactive_limit {
return Err(Error::Custom("`inactive_limit` must be bigger than `ping_interval` to work".into()));
}
}

self.settings.ping_config = config;
Ok(self)
}

/// Configure custom `subscription ID` provider for the server to use
Expand Down Expand Up @@ -576,8 +630,8 @@ pub(crate) struct ServiceData<L: Logger> {
pub(crate) batch_requests_config: BatchRequestConfig,
/// Subscription ID provider.
pub(crate) id_provider: Arc<dyn IdProvider>,
/// Ping interval
pub(crate) ping_interval: Duration,
/// Ping configuration.
pub(crate) ping_config: PingConfig,
/// Stop handle.
pub(crate) stop_handle: StopHandle,
/// Connection ID
Expand Down Expand Up @@ -700,8 +754,8 @@ struct ProcessConnection<L> {
batch_requests_config: BatchRequestConfig,
/// Subscription ID provider.
id_provider: Arc<dyn IdProvider>,
/// Ping interval
ping_interval: Duration,
/// Ping config.
ping_config: PingConfig,
/// Stop handle.
stop_handle: StopHandle,
/// Max connections,
Expand Down Expand Up @@ -769,7 +823,7 @@ fn process_connection<'a, L: Logger, B, U>(
max_subscriptions_per_connection: cfg.max_subscriptions_per_connection,
batch_requests_config: cfg.batch_requests_config,
id_provider: cfg.id_provider,
ping_interval: cfg.ping_interval,
ping_config: cfg.ping_config,
stop_handle: cfg.stop_handle.clone(),
conn_id: cfg.conn_id,
logger: cfg.logger,
Expand Down
3 changes: 2 additions & 1 deletion server/src/tests/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,8 @@ async fn server_with_infinite_call(
) -> (crate::ServerHandle, std::net::SocketAddr) {
let server = ServerBuilder::default()
// Make sure that the ping_interval doesn't force the connection to be closed
.ping_interval(timeout)
.ping_interval(crate::server::PingConfig::WithoutInactivityCheck(timeout))
.unwrap()
.build("127.0.0.1:0")
.with_default_timeout()
.await
Expand Down
91 changes: 60 additions & 31 deletions server/src/transport/ws.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use crate::logger::{self, Logger, TransportProtocol};
use crate::server::{BatchRequestConfig, ServiceData};
use crate::PingConfig;

use futures_util::future::{self, Either};
use futures_util::future::{self, Either, Fuse};
use futures_util::io::{BufReader, BufWriter};
use futures_util::stream::{FuturesOrdered, FuturesUnordered};
use futures_util::{Future, StreamExt};
use futures_util::{Future, FutureExt, StreamExt};
use hyper::upgrade::Upgraded;
use jsonrpsee_core::server::helpers::{
batch_response_error, prepare_error, BatchResponseBuilder, MethodResponse, MethodSink,
Expand Down Expand Up @@ -234,7 +235,7 @@ pub(crate) async fn background_task<L: Logger>(sender: Sender, mut receiver: Rec
batch_requests_config,
stop_handle,
id_provider,
ping_interval,
ping_config,
conn_id,
logger,
remote_addr,
Expand All @@ -250,7 +251,7 @@ pub(crate) async fn background_task<L: Logger>(sender: Sender, mut receiver: Rec
let pending_calls = FuturesUnordered::new();

// Spawn another task that sends out the responses on the Websocket.
let send_task_handle = tokio::spawn(send_task(rx, sender, ping_interval, conn_rx));
let send_task_handle = tokio::spawn(send_task(rx, sender, ping_config.ping_interval(), conn_rx));

// Buffer for incoming data.
let mut data = Vec::with_capacity(100);
Expand Down Expand Up @@ -278,14 +279,14 @@ pub(crate) async fn background_task<L: Logger>(sender: Sender, mut receiver: Rec
// Thus, this check enforces that if the client can't keep up with receiving messages,
// then no new messages will be read from them.
//
// TCP retransmission mechanism will take of the rest and adjust the window size accordingly.
// TCP retransmission mechanism will take care of the rest and adjust the window size accordingly.
let Some(stop) = wait_until_connection_buffer_has_capacity(&sink, stopped).await else {
break Ok(Shutdown::ConnectionClosed);
};

stopped = stop;

match try_recv(&mut receiver, &mut data, stopped).await {
match try_recv(&mut receiver, &mut data, stopped, ping_config).await {
Receive::Shutdown => break Ok(Shutdown::Stopped),
Receive::Ok(stop) => {
stopped = stop;
Expand Down Expand Up @@ -379,7 +380,8 @@ async fn send_task(
}

// Handle timer intervals.
Either::Right((Either::Left((Some(_instant), stop)), next_rx)) => {
Either::Right((Either::Left((_instant, _stopped)), next_rx)) => {
stop = _stopped;
if let Err(err) = send_ping(&mut ws_sender).await {
tracing::debug!("WS transport error: send ping failed: {}", err);
break;
Expand All @@ -388,11 +390,8 @@ async fn send_task(
rx_item = next_rx;
futs = future::select(ping_interval.next(), stop);
}

Either::Right((Either::Left((None, _)), _)) => unreachable!("IntervalStream never terminates"),

// Server is stopped.
Either::Right((Either::Right(_), _)) => {
Either::Right((Either::Right((_stopped, _)), _)) => {
// server has stopped
break;
}
}
Expand Down Expand Up @@ -426,31 +425,61 @@ where
}

/// Attempts to read data from WebSocket fails if the server was stopped.
async fn try_recv<S>(receiver: &mut Receiver, data: &mut Vec<u8>, stopped: S) -> Receive<S>
async fn try_recv<S>(receiver: &mut Receiver, data: &mut Vec<u8>, mut stopped: S, ping_config: PingConfig) -> Receive<S>
where
S: Future<Output = ()> + Unpin,
{
let receive = async {
// Identical loop to `soketto::receive_data` with debug logs for `Pong` frames.
loop {
match receiver.receive(data).await? {
soketto::Incoming::Data(d) => break Ok(d),
soketto::Incoming::Pong(_) => tracing::debug!("Received pong"),
soketto::Incoming::Closed(_) => {
// The closing reason is already logged by `soketto` trace log level.
// Return the `Closed` error to avoid logging unnecessary warnings on clean shutdown.
break Err(SokettoError::Closed);
}
}
let mut last_active = Instant::now();

let receive = futures_util::stream::unfold((receiver, data), |(receiver, data)| async {
match receiver.receive(data).await {
Ok(soketto::Incoming::Data(_)) => None,
Ok(soketto::Incoming::Pong(_)) => Some((Ok(()), (receiver, data))),
Ok(soketto::Incoming::Closed(_)) => Some((Err(SokettoError::Closed), (receiver, data))),
// The closing reason is already logged by `soketto` trace log level.
// Return the `Closed` error to avoid logging unnecessary warnings on clean shutdown.
Err(e) => Some((Err(e), (receiver, data))),
}
};
});

tokio::pin!(receive);

match futures_util::future::select(receive, stopped).await {
Either::Left((Ok(_), s)) => Receive::Ok(s),
Either::Left((Err(e), s)) => Receive::Err(e, s),
Either::Right(_) => Receive::Shutdown,
let inactivity_check =
Box::pin(ping_config.inactive_limit().map(|d| tokio::time::sleep(d).fuse()).unwrap_or_else(Fuse::terminated));
let mut futs = futures_util::future::select(receive.next(), inactivity_check);

loop {
match futures_util::future::select(futs, stopped).await {
// The message has been received, we are done
Either::Left((Either::Left((None, _)), s)) => break Receive::Ok(s),
// Got a pong response, update our "last seen" timestamp.
Either::Left((Either::Left((Some(Ok(())), inactive)), s)) => {
last_active = Instant::now();
stopped = s;
futs = futures_util::future::select(receive.next(), inactive);
}
// Received an error, terminate the connection.
Either::Left((Either::Left((Some(Err(e)), _)), s)) => break Receive::Err(e, s),
// Max inactivity timeout fired, check if the connection has been idle too long.
Either::Left((Either::Right((_instant, rcv)), s)) => {
let inactive_limit_exceeded =
ping_config.inactive_limit().map_or(false, |duration| last_active.elapsed() > duration);

if inactive_limit_exceeded {
break Receive::Err(SokettoError::Closed, s);
}

stopped = s;
// use really large duration instead of Duration::MAX to
// solve the panic issue with interval initialization
let inactivity_check = Box::pin(
ping_config.inactive_limit().map(|d| tokio::time::sleep(d).fuse()).unwrap_or_else(Fuse::terminated),
);
futs = futures_util::future::select(rcv, inactivity_check);
}
// Server has been stopped.
Either::Right(_) => break Receive::Shutdown,
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ mod tests {
let data = serde_json::value::to_raw_value(&"\\\"validate_transaction\\\"").unwrap();
let exp = ErrorObject::borrowed(
1002,
&"desc: \"Could not decode `ChargeAssetTxPayment::asset_id`\" } })",
"desc: \"Could not decode `ChargeAssetTxPayment::asset_id`\" } })",
Some(&*data),
);

Expand Down

0 comments on commit 0518896

Please sign in to comment.