Skip to content

Commit

Permalink
Revert "feat: max_inactive_interval option for Websocket server (#1192)"
Browse files Browse the repository at this point in the history
This reverts commit 0518896.
  • Loading branch information
niklasad1 committed Sep 15, 2023
1 parent 30c0fbb commit 146dfd0
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 137 deletions.
2 changes: 1 addition & 1 deletion client/http-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ async fn batch_request_with_failed_call_gives_proper_error() {
.unwrap()
.unwrap();
let err: Vec<_> = res.into_ok().unwrap_err().collect();
assert_eq!(err, vec![ErrorObject::from(ErrorCode::MethodNotFound), ErrorObject::borrowed(-32602, "foo", None)]);
assert_eq!(err, vec![ErrorObject::from(ErrorCode::MethodNotFound), ErrorObject::borrowed(-32602, &"foo", None)]);
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ pub use future::ServerHandle;
pub use jsonrpsee_core::server::*;
pub use jsonrpsee_core::{id_providers::*, traits::IdProvider};
pub use jsonrpsee_types as types;
pub use server::{BatchRequestConfig, Builder as ServerBuilder, PingConfig, Server};
pub use server::{BatchRequestConfig, Builder as ServerBuilder, Server};
pub use tracing;
92 changes: 19 additions & 73 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ where
max_subscriptions_per_connection,
batch_requests_config,
id_provider: id_provider.clone(),
ping_config: self.cfg.ping_config,
ping_interval: self.cfg.ping_interval,
stop_handle: stop_handle.clone(),
conn_id: id,
logger: logger.clone(),
Expand Down Expand Up @@ -210,14 +210,14 @@ struct Settings {
batch_requests_config: BatchRequestConfig,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
/// The interval at which `Ping` frames are submitted.
ping_interval: Duration,
/// Enable HTTP.
enable_http: bool,
/// Enable WS.
enable_ws: bool,
/// Number of messages that server is allowed to `buffer` until backpressure kicks in.
message_buffer_capacity: u32,
/// Ping settings.
ping_config: PingConfig,
}

/// Configuration for batch request handling.
Expand All @@ -231,54 +231,6 @@ pub enum BatchRequestConfig {
Unlimited,
}

/// Configuration for WebSocket ping's.
///
/// If the server sends out a ping then remote peer must reply with a corresponding pong message.
///
/// It's possible to just send out pings then don't care about response
/// or terminate the connection if the ping isn't replied to the configured `max_inactivity` limit.
///
/// NOTE: It's possible that a `ping` may be backpressured and if you expect a connection
/// to be reassumed after interruption it's not recommended to enable the activity check.
#[derive(Debug, Copy, Clone)]
pub enum PingConfig {
/// The server pings the connected clients continuously at the configured interval but
/// doesn't disconnect them if no pongs are received from the client.
WithoutInactivityCheck(Duration),
/// The server pings the connected clients continuously at the configured interval
/// and terminates the connection if no websocket messages received from client
/// after the max limit is exceeded.
WithInactivityCheck {
/// Time interval between consequent pings from server
ping_interval: Duration,
/// Max allowed time for connection to stay idle
inactive_limit: Duration,
},
}

impl PingConfig {
pub(crate) fn ping_interval(&self) -> Duration {
match self {
Self::WithoutInactivityCheck(ping_interval) => *ping_interval,
Self::WithInactivityCheck { ping_interval, .. } => *ping_interval,
}
}

pub(crate) fn inactive_limit(&self) -> Option<Duration> {
if let Self::WithInactivityCheck { inactive_limit, .. } = self {
Some(*inactive_limit)
} else {
None
}
}
}

impl Default for PingConfig {
fn default() -> Self {
Self::WithoutInactivityCheck(Duration::from_secs(60))
}
}

impl Default for Settings {
fn default() -> Self {
Self {
Expand All @@ -289,10 +241,10 @@ impl Default for Settings {
max_subscriptions_per_connection: 1024,
batch_requests_config: BatchRequestConfig::Unlimited,
tokio_runtime: None,
ping_interval: Duration::from_secs(60),
enable_http: true,
enable_ws: true,
message_buffer_capacity: 1024,
ping_config: PingConfig::WithoutInactivityCheck(Duration::from_secs(60)),
}
}
}
Expand Down Expand Up @@ -416,31 +368,25 @@ impl<B, L> Builder<B, L> {
self
}

/// Configure the interval at which pings are submitted,
/// and optionally enable connection inactivity check
/// Configure the interval at which pings are submitted.
///
/// This option is used to keep the connection alive, and can be configured to just submit `Ping` frames or with extra parameter, configuring max interval when a `Pong` frame should be received
/// This option is used to keep the connection alive, and is just submitting `Ping` frames,
/// without making any assumptions about when a `Pong` frame should be received.
///
/// Default: ping interval is set to 60 seconds and the inactivity check is disabled
/// Default: 60 seconds.
///
/// # Examples
///
/// ```rust
/// use std::time::Duration;
/// use jsonrpsee_server::{ServerBuilder, PingConfig};
/// use jsonrpsee_server::ServerBuilder;
///
/// // Set the ping interval to 10 seconds but terminate the connection if a client is inactive for more than 2 minutes
/// let builder = ServerBuilder::default().ping_interval(PingConfig::WithInactivityCheck { ping_interval: Duration::from_secs(10), inactive_limit: Duration::from_secs(2 * 60) }).unwrap();
/// // Set the ping interval to 10 seconds.
/// let builder = ServerBuilder::default().ping_interval(Duration::from_secs(10));
/// ```
pub fn ping_interval(mut self, config: PingConfig) -> Result<Self, Error> {
if let PingConfig::WithInactivityCheck { ping_interval, inactive_limit } = config {
if ping_interval >= inactive_limit {
return Err(Error::Custom("`inactive_limit` must be bigger than `ping_interval` to work".into()));
}
}

self.settings.ping_config = config;
Ok(self)
pub fn ping_interval(mut self, interval: Duration) -> Self {
self.settings.ping_interval = interval;
self
}

/// Configure custom `subscription ID` provider for the server to use
Expand Down Expand Up @@ -630,8 +576,8 @@ pub(crate) struct ServiceData<L: Logger> {
pub(crate) batch_requests_config: BatchRequestConfig,
/// Subscription ID provider.
pub(crate) id_provider: Arc<dyn IdProvider>,
/// Ping configuration.
pub(crate) ping_config: PingConfig,
/// Ping interval
pub(crate) ping_interval: Duration,
/// Stop handle.
pub(crate) stop_handle: StopHandle,
/// Connection ID
Expand Down Expand Up @@ -754,8 +700,8 @@ struct ProcessConnection<L> {
batch_requests_config: BatchRequestConfig,
/// Subscription ID provider.
id_provider: Arc<dyn IdProvider>,
/// Ping config.
ping_config: PingConfig,
/// Ping interval
ping_interval: Duration,
/// Stop handle.
stop_handle: StopHandle,
/// Max connections,
Expand Down Expand Up @@ -823,7 +769,7 @@ fn process_connection<'a, L: Logger, B, U>(
max_subscriptions_per_connection: cfg.max_subscriptions_per_connection,
batch_requests_config: cfg.batch_requests_config,
id_provider: cfg.id_provider,
ping_config: cfg.ping_config,
ping_interval: cfg.ping_interval,
stop_handle: cfg.stop_handle.clone(),
conn_id: cfg.conn_id,
logger: cfg.logger,
Expand Down
3 changes: 1 addition & 2 deletions server/src/tests/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,8 +880,7 @@ async fn server_with_infinite_call(
) -> (crate::ServerHandle, std::net::SocketAddr) {
let server = ServerBuilder::default()
// Make sure that the ping_interval doesn't force the connection to be closed
.ping_interval(crate::server::PingConfig::WithoutInactivityCheck(timeout))
.unwrap()
.ping_interval(timeout)
.build("127.0.0.1:0")
.with_default_timeout()
.await
Expand Down
89 changes: 30 additions & 59 deletions server/src/transport/ws.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;

use crate::logger::{self, Logger, TransportProtocol};
use crate::server::{BatchRequestConfig, ServiceData};
use crate::PingConfig;

use futures_util::future::{self, Either, Fuse};
use futures_util::future::{self, Either};
use futures_util::io::{BufReader, BufWriter};
use futures_util::stream::FuturesOrdered;
use futures_util::{Future, FutureExt, StreamExt};
Expand Down Expand Up @@ -235,7 +234,7 @@ pub(crate) async fn background_task<L: Logger>(sender: Sender, mut receiver: Rec
batch_requests_config,
stop_handle,
id_provider,
ping_config,
ping_interval,
conn_id,
logger,
remote_addr,
Expand All @@ -255,7 +254,7 @@ pub(crate) async fn background_task<L: Logger>(sender: Sender, mut receiver: Rec
let (pending_calls, pending_calls_completed) = mpsc::channel::<()>(1);

// Spawn another task that sends out the responses on the Websocket.
let send_task_handle = tokio::spawn(send_task(rx, sender, ping_config.ping_interval(), conn_rx));
let send_task_handle = tokio::spawn(send_task(rx, sender, ping_interval, conn_rx));

// Buffer for incoming data.
let mut data = Vec::with_capacity(100);
Expand Down Expand Up @@ -284,14 +283,14 @@ pub(crate) async fn background_task<L: Logger>(sender: Sender, mut receiver: Rec
// Thus, this check enforces that if the client can't keep up with receiving messages,
// then no new messages will be read from them.
//
// TCP retransmission mechanism will take care of the rest and adjust the window size accordingly.
// TCP retransmission mechanism will take of the rest and adjust the window size accordingly.
let Some(stop) = wait_until_connection_buffer_has_capacity(&sink, stopped).await else {
break Ok(Shutdown::ConnectionClosed);
};

stopped = stop;

match try_recv(&mut receiver, &mut data, stopped, ping_config).await {
match try_recv(&mut receiver, &mut data, stopped).await {
Receive::Shutdown => break Ok(Shutdown::Stopped),
Receive::Ok(stop) => {
stopped = stop;
Expand Down Expand Up @@ -382,8 +381,7 @@ async fn send_task(
}

// Handle timer intervals.
Either::Right((Either::Left((_instant, _stopped)), next_rx)) => {
stop = _stopped;
Either::Right((Either::Left((Some(_instant), stop)), next_rx)) => {
if let Err(err) = send_ping(&mut ws_sender).await {
tracing::debug!("WS transport error: send ping failed: {}", err);
break;
Expand All @@ -392,8 +390,11 @@ async fn send_task(
rx_item = next_rx;
futs = future::select(ping_interval.next(), stop);
}
Either::Right((Either::Right((_stopped, _)), _)) => {
// server has stopped

Either::Right((Either::Left((None, _)), _)) => unreachable!("IntervalStream never terminates"),

// Server is stopped.
Either::Right((Either::Right(_), _)) => {
break;
}
}
Expand Down Expand Up @@ -427,61 +428,31 @@ where
}

/// Attempts to read data from WebSocket fails if the server was stopped.
async fn try_recv<S>(receiver: &mut Receiver, data: &mut Vec<u8>, mut stopped: S, ping_config: PingConfig) -> Receive<S>
async fn try_recv<S>(receiver: &mut Receiver, data: &mut Vec<u8>, stopped: S) -> Receive<S>
where
S: Future<Output = ()> + Unpin,
{
let mut last_active = Instant::now();

let receive = futures_util::stream::unfold((receiver, data), |(receiver, data)| async {
match receiver.receive(data).await {
Ok(soketto::Incoming::Data(_)) => None,
Ok(soketto::Incoming::Pong(_)) => Some((Ok(()), (receiver, data))),
Ok(soketto::Incoming::Closed(_)) => Some((Err(SokettoError::Closed), (receiver, data))),
// The closing reason is already logged by `soketto` trace log level.
// Return the `Closed` error to avoid logging unnecessary warnings on clean shutdown.
Err(e) => Some((Err(e), (receiver, data))),
let receive = async {
// Identical loop to `soketto::receive_data` with debug logs for `Pong` frames.
loop {
match receiver.receive(data).await? {
soketto::Incoming::Data(d) => break Ok(d),
soketto::Incoming::Pong(_) => tracing::debug!("Received pong"),
soketto::Incoming::Closed(_) => {
// The closing reason is already logged by `soketto` trace log level.
// Return the `Closed` error to avoid logging unnecessary warnings on clean shutdown.
break Err(SokettoError::Closed);
}
}
}
});
};

tokio::pin!(receive);

let inactivity_check =
Box::pin(ping_config.inactive_limit().map(|d| tokio::time::sleep(d).fuse()).unwrap_or_else(Fuse::terminated));
let mut futs = futures_util::future::select(receive.next(), inactivity_check);

loop {
match futures_util::future::select(futs, stopped).await {
// The message has been received, we are done
Either::Left((Either::Left((None, _)), s)) => break Receive::Ok(s),
// Got a pong response, update our "last seen" timestamp.
Either::Left((Either::Left((Some(Ok(())), inactive)), s)) => {
last_active = Instant::now();
stopped = s;
futs = futures_util::future::select(receive.next(), inactive);
}
// Received an error, terminate the connection.
Either::Left((Either::Left((Some(Err(e)), _)), s)) => break Receive::Err(e, s),
// Max inactivity timeout fired, check if the connection has been idle too long.
Either::Left((Either::Right((_instant, rcv)), s)) => {
let inactive_limit_exceeded =
ping_config.inactive_limit().map_or(false, |duration| last_active.elapsed() > duration);

if inactive_limit_exceeded {
break Receive::Err(SokettoError::Closed, s);
}

stopped = s;
// use really large duration instead of Duration::MAX to
// solve the panic issue with interval initialization
let inactivity_check = Box::pin(
ping_config.inactive_limit().map(|d| tokio::time::sleep(d).fuse()).unwrap_or_else(Fuse::terminated),
);
futs = futures_util::future::select(rcv, inactivity_check);
}
// Server has been stopped.
Either::Right(_) => break Receive::Shutdown,
}
match futures_util::future::select(receive, stopped).await {
Either::Left((Ok(_), s)) => Receive::Ok(s),
Either::Left((Err(e), s)) => Receive::Err(e, s),
Either::Right(_) => Receive::Shutdown,
}
}

Expand Down
2 changes: 1 addition & 1 deletion types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ mod tests {
let data = serde_json::value::to_raw_value(&"\\\"validate_transaction\\\"").unwrap();
let exp = ErrorObject::borrowed(
1002,
"desc: \"Could not decode `ChargeAssetTxPayment::asset_id`\" } })",
&"desc: \"Could not decode `ChargeAssetTxPayment::asset_id`\" } })",
Some(&*data),
);

Expand Down

0 comments on commit 146dfd0

Please sign in to comment.