From 2903c2f415f0c668fcc47b3a8e08380d08431709 Mon Sep 17 00:00:00 2001
From: iGxnon
Date: Thu, 21 Dec 2023 13:46:23 +0800
Subject: [PATCH 1/5] feat: implement retry layer

Signed-off-by: iGxnon
---
 curp/src/client_new/mod.rs   |   3 +
 curp/src/client_new/retry.rs | 256 +++++++++++++++++++++++++++++++++++
 2 files changed, 259 insertions(+)
 create mode 100644 curp/src/client_new/retry.rs

diff --git a/curp/src/client_new/mod.rs b/curp/src/client_new/mod.rs
index b398d9e72..d7d65079e 100644
--- a/curp/src/client_new/mod.rs
+++ b/curp/src/client_new/mod.rs
@@ -4,6 +4,9 @@
 /// Unary rpc client
 mod unary;
 
+/// Retry layer
+mod retry;
+
 use std::{collections::HashMap, sync::Arc};
 
 use async_trait::async_trait;
diff --git a/curp/src/client_new/retry.rs b/curp/src/client_new/retry.rs
new file mode 100644
index 000000000..fba6bc661
--- /dev/null
+++ b/curp/src/client_new/retry.rs
@@ -0,0 +1,256 @@
+#![allow(unused)] // TODO: remove
+
+use std::{marker::PhantomData, ops::SubAssign, sync::Arc, time::Duration};
+
+use async_trait::async_trait;
+use curp_external_api::cmd::Command;
+use futures::Future;
+use tracing::warn;
+
+use crate::{
+    members::ServerId,
+    rpc::{
+        connect::ConnectApi, ConfChange, CurpError, FetchClusterResponse, Member, ReadState,
+        Redirect,
+    },
+};
+
+use super::{ClientApi, LeaderStateUpdate, ProposeResponse};
+
+/// Backoff config
+#[derive(Debug, Clone)]
+pub(super) enum BackoffConfig {
+    /// A fixed delay backoff
+    Fixed,
+    /// An exponential delay backoff
+    Exponential {
+        /// Control the max delay of exponential
+        max_delay: Duration,
+    },
+}
+
+/// Retry config to control the retry policy
+#[derive(Debug, Clone)]
+pub(super) struct RetryConfig {
+    /// Backoff config
+    backoff: BackoffConfig,
+    /// Initial delay
+    delay: Duration,
+    /// Retry count
+    count: usize,
+    /// Enable jitter to randomize the delay to avoid thundering herd
+    jitter: bool,
+}
+
+/// Backoff tool
+#[derive(Debug)]
+struct Backoff {
+    /// The retry config
+    config: RetryConfig,
+    /// Current delay
+    cur_delay: Duration,
+}
+
+impl RetryConfig {
+    /// Create a fixed retry config
+    fn new_fixed(delay: Duration, count: usize, jitter: bool) -> Self {
+        assert!(count > 0, "retry count should be larger than 0");
+        Self {
+            backoff: BackoffConfig::Fixed,
+            delay,
+            count,
+            jitter,
+        }
+    }
+
+    /// Create an exponential retry config
+    fn new_exponential(delay: Duration, max_delay: Duration, count: usize, jitter: bool) -> Self {
+        assert!(count > 0, "retry count should be larger than 0");
+        Self {
+            backoff: BackoffConfig::Exponential { max_delay },
+            delay,
+            count,
+            jitter,
+        }
+    }
+
+    /// Create a backoff process
+    fn init_backoff(&self) -> Backoff {
+        Backoff {
+            config: self.clone(),
+            cur_delay: self.delay,
+        }
+    }
+}
+
+impl Backoff {
+    /// Get the next delay duration, None means the end.
+    fn next_delay(&mut self) -> Option<Duration> {
+        if self.config.count == 0 {
+            return None;
+        }
+        self.config.count.sub_assign(1);
+        let mut cur = self.cur_delay;
+        if let BackoffConfig::Exponential { max_delay } = self.config.backoff {
+            self.cur_delay = self
+                .cur_delay
+                .checked_mul(2)
+                .unwrap_or(self.cur_delay)
+                .min(max_delay);
+        }
+        #[allow(clippy::float_arithmetic)] // It is always correct.
+        if self.config.jitter {
+            // jitter algorithm will randomly pick a delay between [0.5 * delay, 1.5 * delay)
+            let per: f32 = rand::random();
+            let cur_sec = cur.as_secs_f32() * (0.5 + per);
+            cur = Duration::from_secs_f32(cur_sec);
+        }
+        Some(cur)
+    }
+}
+
+/// The retry client automatically retries the requests of the inner client api
+/// which raise a [`tonic::Status`] error
+#[derive(Debug)]
+pub(super) struct Retry<Api> {
+    /// Inner client
+    inner: Api,
+    /// Retry config
+    config: RetryConfig,
+}
+
+impl<Api> Retry<Api>
+where
+    Api: ClientApi<Error = CurpError> + LeaderStateUpdate + Send + Sync + 'static,
+{
+    /// Create a retry client
+    fn new(inner: Api, config: RetryConfig) -> Self {
+        Self { inner, config }
+    }
+
+    /// Takes a function `f` and runs it with retries.
+    async fn retry<'a, R, F>(&'a self, f: impl Fn(&'a Api) -> F) -> Result<R, tonic::Status>
+    where
+        F: Future<Output = Result<R, CurpError>>,
+    {
+        let mut backoff = self.config.init_backoff();
+        while let Some(delay) = backoff.next_delay() {
+            let err = match f(&self.inner).await {
+                Ok(res) => return Ok(res),
+                Err(err) => err,
+            };
+
+            match err {
+                // errors that should not be retried
+                CurpError::Duplicated(_)
+                | CurpError::ShuttingDown(_)
+                | CurpError::InvalidConfig(_)
+                | CurpError::NodeNotExists(_)
+                | CurpError::NodeAlreadyExists(_)
+                | CurpError::LearnerNotCatchUp(_) => {
+                    return Err(tonic::Status::from(err));
+                }
+
+                // errors that can be retried
+                CurpError::ExpiredClientId(_)
+                | CurpError::KeyConflict(_)
+                | CurpError::RpcTransport(_)
+                | CurpError::Internal(_) => {}
+
+                // update the cluster state if got WrongClusterVersion
+                CurpError::WrongClusterVersion(_) => {
+                    // the inner client should automatically update cluster state when fetch_cluster
+                    if let Err(e) = self.inner.fetch_cluster(false).await {
+                        warn!("fetch cluster failed, error {e:?}");
+                    }
+                }
+
+                // update the leader state if got Redirect
+                CurpError::Redirect(Redirect { leader_id, term }) => {
+                    self.inner.update_leader(leader_id, term);
+                }
+            }
+
+            warn!("retry after {} seconds", delay.as_secs_f32());
+            tokio::time::sleep(delay).await;
+        }
+
+        Err(tonic::Status::deadline_exceeded("request timeout"))
+    }
+}
+
+#[async_trait]
+impl<Api> ClientApi for Retry<Api>
+where
+    Api: ClientApi<Error = CurpError> + LeaderStateUpdate + Send + Sync + 'static,
+{
+    /// The client error
+    type Error = tonic::Status;
+
+    /// Inherit the command type
+    type Cmd = Api::Cmd;
+
+    /// Get the local connection when the client is on the server node.
+    async fn local_connect(&self) -> Option<Arc<dyn ConnectApi>> {
+        self.inner.local_connect().await
+    }
+
+    /// Send propose to the whole cluster, `use_fast_path` set to `false` to fall back into ordered
+    /// requests (even if the requests are commutative).
+    async fn propose(
+        &self,
+        cmd: &Self::Cmd,
+        use_fast_path: bool,
+    ) -> Result<ProposeResponse<Self::Cmd>, tonic::Status> {
+        self.retry::<_, _>(|client| async move { client.propose(cmd, use_fast_path).await })
+            .await
+    }
+
+    /// Send propose configuration changes to the cluster
+    async fn propose_conf_change(
+        &self,
+        changes: Vec<ConfChange>,
+    ) -> Result<Vec<Member>, tonic::Status> {
+        self.retry::<_, _>(|client| {
+            let changes_c = changes.clone();
+            async move { client.propose_conf_change(changes_c).await }
+        })
+        .await
+    }
+
+    /// Send propose to shutdown cluster
+    async fn propose_shutdown(&self) -> Result<(), tonic::Status> {
+        self.retry::<_, _>(|client| async move { client.propose_shutdown().await })
+            .await
+    }
+
+    /// Send propose to publish a node id and name
+    async fn propose_publish(
+        &self,
+        node_id: ServerId,
+        node_name: String,
+    ) -> Result<(), Self::Error> {
+        self.retry::<_, _>(|client| {
+            let name_c = node_name.clone();
+            async move { client.propose_publish(node_id, name_c).await }
+        })
+        .await
+    }
+
+    /// Send fetch read state from leader
+    async fn fetch_read_state(&self, cmd: &Self::Cmd) -> Result<ReadState, tonic::Status> {
+        self.retry::<_, _>(|client| async move { client.fetch_read_state(cmd).await })
+            .await
+    }
+
+    /// Send fetch cluster requests to all servers (That's because initially, we don't
+    /// know who the leader is.)
+    /// Note: The fetched cluster may still be outdated if `linearizable` is false
+    async fn fetch_cluster(
+        &self,
+        linearizable: bool,
+    ) -> Result<FetchClusterResponse, tonic::Status> {
+        self.retry::<_, _>(|client| async move { client.fetch_cluster(linearizable).await })
+            .await
+    }
+}

From e0ee787ebece0f069d58754bfd29208555ecbaaa Mon Sep 17 00:00:00 2001
From: iGxnon
Date: Mon, 25 Dec 2023 15:06:38 +0800
Subject: [PATCH 2/5] chore: remove redundant async move block

Signed-off-by: iGxnon
---
 curp/src/client_new/retry.rs | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/curp/src/client_new/retry.rs b/curp/src/client_new/retry.rs
index fba6bc661..fc84a4c45 100644
--- a/curp/src/client_new/retry.rs
+++ b/curp/src/client_new/retry.rs
@@ -202,7 +202,7 @@ where
         cmd: &Self::Cmd,
         use_fast_path: bool,
     ) -> Result<ProposeResponse<Self::Cmd>, tonic::Status> {
-        self.retry::<_, _>(|client| async move { client.propose(cmd, use_fast_path).await })
+        self.retry::<_, _>(|client| client.propose(cmd, use_fast_path))
             .await
     }
 
@@ -220,8 +220,7 @@
 
     /// Send propose to shutdown cluster
     async fn propose_shutdown(&self) -> Result<(), tonic::Status> {
-        self.retry::<_, _>(|client| async move { client.propose_shutdown().await })
-            .await
+        self.retry::<_, _>(ClientApi::propose_shutdown).await
     }
 
     /// Send propose to publish a node id and name
@@ -239,7 +238,7 @@ where
 
     /// Send fetch read state from leader
    async fn fetch_read_state(&self, cmd: &Self::Cmd) -> Result<ReadState, tonic::Status> {
-        self.retry::<_, _>(|client| async move { client.fetch_read_state(cmd).await })
+        self.retry::<_, _>(|client| client.fetch_read_state(cmd))
             .await
     }
 
@@ -250,7 +249,7 @@
         &self,
         linearizable: bool,
     ) -> Result<FetchClusterResponse, tonic::Status> {
-        self.retry::<_, _>(|client| async move { client.fetch_cluster(linearizable).await })
+        self.retry::<_, _>(|client| client.fetch_cluster(linearizable))
             .await
     }
 }

From 8875121adb56c13bb95027471509798812c0f94d Mon Sep 17 00:00:00 2001
From: iGxnon
Date: Mon, 25 Dec 2023 19:05:21 +0800
Subject: [PATCH 3/5] chore: update RPC count field

Signed-off-by: iGxnon
---
 curp/src/client_new/retry.rs | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/curp/src/client_new/retry.rs b/curp/src/client_new/retry.rs
index fc84a4c45..989508196 100644
--- a/curp/src/client_new/retry.rs
+++ b/curp/src/client_new/retry.rs
@@ -49,6 +49,8 @@ struct Backoff {
     config: RetryConfig,
     /// Current delay
     cur_delay: Duration,
+    /// Total RPC count
+    count: usize,
 }
 
 impl RetryConfig {
@@ -79,6 +81,7 @@ impl RetryConfig {
         Backoff {
             config: self.clone(),
             cur_delay: self.delay,
+            count: self.count,
         }
     }
 }
@@ -86,10 +89,10 @@ impl RetryConfig {
 impl Backoff {
     /// Get the next delay duration, None means the end.
     fn next_delay(&mut self) -> Option<Duration> {
-        if self.config.count == 0 {
+        if self.count == 0 {
             return None;
         }
-        self.config.count.sub_assign(1);
+        self.count.sub_assign(1);
         let mut cur = self.cur_delay;
         if let BackoffConfig::Exponential { max_delay } = self.config.backoff {
             self.cur_delay = self

From e79270d5ce1b9f624c1ff767ae23d207f5d2f2cc Mon Sep 17 00:00:00 2001
From: iGxnon
Date: Fri, 29 Dec 2023 14:28:53 +0800
Subject: [PATCH 4/5] chore: remove jitter

Signed-off-by: iGxnon
---
 curp/src/client_new/retry.rs | 15 ++-------------
 1 file changed, 2 insertions(+), 13 deletions(-)

diff --git a/curp/src/client_new/retry.rs b/curp/src/client_new/retry.rs
index 989508196..a33ac316b 100644
--- a/curp/src/client_new/retry.rs
+++ b/curp/src/client_new/retry.rs
@@ -38,8 +38,6 @@ pub(super) struct RetryConfig {
     delay: Duration,
     /// Retry count
     count: usize,
-    /// Enable jitter to randomize the delay to avoid thundering herd
-    jitter: bool,
 }
 
 /// Backoff tool
@@ -55,24 +53,22 @@ struct Backoff {
 
 impl RetryConfig {
     /// Create a fixed retry config
-    fn new_fixed(delay: Duration, count: usize, jitter: bool) -> Self {
+    fn new_fixed(delay: Duration, count: usize) -> Self {
         assert!(count > 0, "retry count should be larger than 0");
         Self {
             backoff: BackoffConfig::Fixed,
             delay,
             count,
-            jitter,
         }
     }
 
     /// Create an exponential retry config
-    fn new_exponential(delay: Duration, max_delay: Duration, count: usize, jitter: bool) -> Self {
+    fn new_exponential(delay: Duration, max_delay: Duration, count: usize) -> Self {
         assert!(count > 0, "retry count should be larger than 0");
         Self {
             backoff: BackoffConfig::Exponential { max_delay },
             delay,
             count,
-            jitter,
         }
     }
 
@@ -101,13 +97,6 @@ impl Backoff {
                 .unwrap_or(self.cur_delay)
                 .min(max_delay);
         }
-        #[allow(clippy::float_arithmetic)] // It is always correct.
-        if self.config.jitter {
-            // jitter algorithm will randomly pick a delay between [0.5 * delay, 1.5 * delay)
-            let per: f32 = rand::random();
-            let cur_sec = cur.as_secs_f32() * (0.5 + per);
-            cur = Duration::from_secs_f32(cur_sec);
-        }
         Some(cur)
     }
 }

From f62a6d39152b4e06a79a2883dad16e6497ee8706 Mon Sep 17 00:00:00 2001
From: iGxnon
Date: Tue, 2 Jan 2024 11:41:15 +0800
Subject: [PATCH 5/5] chore: add comments

Signed-off-by: iGxnon
---
 curp/src/rpc/mod.rs | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/curp/src/rpc/mod.rs b/curp/src/rpc/mod.rs
index 5fa7f2e59..c1d4edc25 100644
--- a/curp/src/rpc/mod.rs
+++ b/curp/src/rpc/mod.rs
@@ -657,11 +657,18 @@ impl CurpError {
 /// The priority of curp error, indicate which error should be handled in retry layer
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
 pub(crate) enum CurpErrorPriority {
-    /// Low priority
+    /// Low priority: in multiple sequenced RPCs, if a preceding RPC returns
+    /// a low-priority error, the client will not exit prematurely. In concurrent
+    /// RPCs, a low-priority error may be overridden by a higher-priority error.
     Low = 1,
-    /// High priority
+    /// High priority: in multiple sequenced RPCs, if a preceding RPC returns
+    /// a high-priority error, it will exit early, preventing the following RPCs
+    /// from proceeding. In concurrent RPCs, high-priority errors override
+    /// low-priority errors.
     High = 2,
-    /// Should be returned immediately if any server response it
+    /// Should be returned immediately if any server responds with it. If requests
+    /// are sent to multiple servers, the requests that have not yet received a
+    /// response will be terminated immediately.
     ReturnImmediately = 3,
 }
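
A minimal, self-contained sketch of the backoff policy the series converges on
(patch 1 introduces the fixed/exponential backoff, patch 3 moves the remaining
RPC count onto `Backoff`, patch 4 drops jitter). The `ExpBackoff` type below is
a stand-in that mirrors the final `next_delay` logic, not code from the patches:

use std::time::Duration;

// Mirrors the final exponential policy: hand out the current delay, then
// double it (saturating on overflow) and cap it at `max_delay`; stop once
// `count` attempts are used up.
struct ExpBackoff {
    cur_delay: Duration,
    max_delay: Duration,
    count: usize,
}

impl ExpBackoff {
    fn next_delay(&mut self) -> Option<Duration> {
        if self.count == 0 {
            return None;
        }
        self.count -= 1;
        let cur = self.cur_delay;
        self.cur_delay = self
            .cur_delay
            .checked_mul(2)
            .unwrap_or(self.cur_delay)
            .min(self.max_delay);
        Some(cur)
    }
}

fn main() {
    let mut backoff = ExpBackoff {
        cur_delay: Duration::from_millis(100),
        max_delay: Duration::from_secs(1),
        count: 6,
    };
    // Prints 100ms, 200ms, 400ms, 800ms, 1s, 1s: the delay doubles until it
    // hits the cap, and the iteration ends after `count` attempts.
    while let Some(delay) = backoff.next_delay() {
        println!("{delay:?}");
    }
}

With the post-series constructors (no jitter flag), wiring the layer over the
inner client would look like `Retry::new(unary, RetryConfig::new_exponential(
Duration::from_millis(100), Duration::from_secs(1), 3))`, where `unary` and
the concrete delays are placeholders.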
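
The control flow of `Retry::retry` — try the call, return unretryable errors
at once, refresh cluster or leader state on `WrongClusterVersion`/`Redirect`,
otherwise sleep out the backoff delay and try again — reduces to the sketch
below. The `Error` variants are hypothetical stand-ins for a subset of
`CurpError`, and the sketch is synchronous where the real code is async:

use std::time::Duration;

/// Stand-in error classification; the real code matches on CurpError variants.
#[derive(Debug)]
enum Error {
    Unretryable(&'static str), // e.g. InvalidConfig, ShuttingDown
    Retryable(&'static str),   // e.g. KeyConflict, RpcTransport
}

/// Simulated RPC: fails twice with a retryable error, then succeeds.
fn call(attempt: usize) -> Result<&'static str, Error> {
    if attempt < 2 {
        Err(Error::Retryable("rpc transport"))
    } else {
        Ok("proposed")
    }
}

fn main() {
    // A fixed three-entry schedule standing in for Backoff::next_delay.
    let delays = [
        Duration::from_millis(100),
        Duration::from_millis(200),
        Duration::from_millis(400),
    ];
    for (attempt, delay) in delays.iter().enumerate() {
        match call(attempt) {
            Ok(res) => {
                println!("success: {res}");
                return;
            }
            Err(Error::Unretryable(reason)) => {
                println!("give up: {reason}");
                return;
            }
            Err(Error::Retryable(reason)) => {
                println!("retry after {delay:?} ({reason})");
                std::thread::sleep(*delay);
            }
        }
    }
    // All retries exhausted; patch 1 maps this case to
    // tonic::Status::deadline_exceeded("request timeout").
    println!("request timeout");
}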
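
Patch 5 documents the priority rules the retry layer relies on. Because
`CurpErrorPriority` derives `Ord`, picking the winning error among concurrent
RPC failures is a plain maximum; a sketch with a hypothetical stand-in enum:

/// Stand-in for CurpErrorPriority (the real enum lives in curp/src/rpc/mod.rs).
/// Deriving Ord lets max_by_key pick the error that wins when concurrent RPCs
/// fail differently.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Priority {
    Low = 1,
    High = 2,
    ReturnImmediately = 3,
}

/// A failed response from one server, with the priority of its error.
#[derive(Debug)]
struct RpcError {
    server: &'static str,
    priority: Priority,
}

fn main() {
    // Three concurrent RPCs fail; the high-priority error overrides the
    // low-priority ones, matching the comments added in patch 5.
    let errors = vec![
        RpcError { server: "s1", priority: Priority::Low },
        RpcError { server: "s2", priority: Priority::High },
        RpcError { server: "s3", priority: Priority::Low },
    ];
    let winner = errors.into_iter().max_by_key(|e| e.priority).unwrap();
    println!("propagate error from {}: {:?}", winner.server, winner.priority);
}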