Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/curp client retry #584

Merged
merged 5 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions curp/src/client_new/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
/// Unary rpc client
mod unary;

/// Retry layer
mod retry;

use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
Expand Down Expand Up @@ -71,20 +74,20 @@

/// Fetch leader id
///
/// With `linearizable` unset, a non-linearizable cluster fetch is tried first and its
/// leader id is used when present. Otherwise (or when `linearizable` is set) a
/// linearizable cluster fetch is issued, which is expected to always carry a leader id.
#[inline]
async fn fetch_leader_id(&self, linearizable: bool) -> Result<ServerId, Self::Error> {
    if !linearizable {
        // cheap path: trust the leader reported by a possibly outdated view
        let relaxed = self.fetch_cluster(false).await?;
        if let Some(id) = relaxed.leader_id {
            return Ok(id);
        }
    }
    // linearizable path, which also serves as the fallback of the cheap path
    let strict = self.fetch_cluster(true).await?;
    match strict.leader_id {
        Some(id) => Ok(id),
        None => unreachable!("linearizable fetch cluster should return a leader id"),
    }
}

Check warning on line 90 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L77-L90

Added lines #L77 - L90 were not covered by tests
}

/// Update leader state
Expand All @@ -95,7 +98,7 @@
}

/// Client builder to build a client
#[derive(Debug, Clone)]

Check warning on line 101 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L101

Added line #L101 was not covered by tests
pub struct ClientBuilder {
/// local server id
local_server_id: Option<ServerId>,
Expand All @@ -111,30 +114,30 @@
/// Create a client builder from `config`
///
/// The local server id, the member list and the cluster version all start out unset.
#[inline]
#[must_use]
pub fn new(config: ClientConfig) -> Self {
    Self {
        config,
        // nothing optional is known yet
        local_server_id: None,
        cluster_version: None,
        all_members: None,
    }
}

Check warning on line 124 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L117-L124

Added lines #L117 - L124 were not covered by tests

/// Set the local server id, overwriting any id set before
#[inline]
#[must_use]
pub fn local_server_id(&mut self, id: ServerId) -> &mut Self {
    let _previous = self.local_server_id.replace(id);
    self
}

Check warning on line 132 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L129-L132

Added lines #L129 - L132 were not covered by tests

/// Set the initial cluster version, overwriting any version set before
#[inline]
#[must_use]
pub fn cluster_version(&mut self, cluster_version: u64) -> &mut Self {
    let _previous = self.cluster_version.replace(cluster_version);
    self
}

Check warning on line 140 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L137-L140

Added lines #L137 - L140 were not covered by tests

/// Fetch initial all members from some endpoints if you do not know the whole members
///
Expand All @@ -142,47 +145,47 @@
///
/// Return `tonic::Status` for connection failure or some server errors.
#[inline]
pub async fn fetch_all_members(
&mut self,
addrs: Vec<String>,
) -> Result<&mut Self, tonic::Status> {
let propose_timeout = *self.config.propose_timeout();
let mut futs: FuturesUnordered<_> = addrs
.into_iter()
.map(|mut addr| {
if !addr.starts_with("http://") {
addr.insert_str(0, "http://");
}
async move {
let mut protocol_client = ProtocolClient::connect(addr).await.map_err(|e| {
tonic::Status::cancelled(format!("cannot connect to addr, error: {e}"))
})?;
let mut req = tonic::Request::new(FetchClusterRequest::default());
req.set_timeout(propose_timeout);
let fetch_cluster_res = protocol_client.fetch_cluster(req).await?.into_inner();
Ok::<FetchClusterResponse, tonic::Status>(fetch_cluster_res)
}
})
.collect();
let mut err = tonic::Status::invalid_argument("addrs is empty");
while let Some(r) = futs.next().await {
match r {
Ok(r) => {
self.cluster_version = Some(r.cluster_version);
self.all_members = Some(r.into_members_addrs());
return Ok(self);

Check warning on line 176 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L148-L176

Added lines #L148 - L176 were not covered by tests
}
Err(e) => err = e,

Check warning on line 178 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L178

Added line #L178 was not covered by tests
}
}
Err(err)
}

Check warning on line 182 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L181-L182

Added lines #L181 - L182 were not covered by tests

/// Set the initial all members, overwriting any member list set before
#[inline]
#[must_use]
pub fn set_all_members(&mut self, all_members: HashMap<ServerId, Vec<String>>) -> &mut Self {
    let _previous = self.all_members.replace(all_members);
    self
}

Check warning on line 190 in curp/src/client_new/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/mod.rs#L187-L190

Added lines #L187 - L190 were not covered by tests
}
247 changes: 247 additions & 0 deletions curp/src/client_new/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
#![allow(unused)] // TODO: remove

use std::{marker::PhantomData, ops::SubAssign, sync::Arc, time::Duration};

use async_trait::async_trait;
use curp_external_api::cmd::Command;
use futures::Future;
use tracing::warn;

use crate::{
members::ServerId,
rpc::{
connect::ConnectApi, ConfChange, CurpError, FetchClusterResponse, Member, ReadState,
Redirect,
},
};

use super::{ClientApi, LeaderStateUpdate, ProposeResponse};

/// Backoff config
#[derive(Debug, Clone)]

Check warning on line 21 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L21

Added line #L21 was not covered by tests
pub(super) enum BackoffConfig {
/// A fixed delay backoff: the delay stays the same for every retry
Fixed,
/// An exponential delay backoff: the delay is doubled after every retry
Exponential {
/// Upper bound that the doubled delay is clamped to
max_delay: Duration,
},
}

/// Retry config to control the retry policy
#[derive(Debug, Clone)]

Check warning on line 33 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L33

Added line #L33 was not covered by tests
pub(super) struct RetryConfig {
/// Backoff config
backoff: BackoffConfig,
/// Initial delay, used as the first delay of a backoff process
delay: Duration,
/// Retry count, the number of delays a backoff process hands out;
/// the constructors require it to be larger than 0
count: usize,
}

/// Backoff tool
#[derive(Debug)]

Check warning on line 44 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L44

Added line #L44 was not covered by tests
struct Backoff {
/// The retry config
config: RetryConfig,
/// Current delay, i.e. the value the next call of `next_delay` hands out
cur_delay: Duration,
/// Remaining RPC count, decremented by every successful call of `next_delay`
count: usize,
}

impl RetryConfig {
/// Create a fixed retry config
fn new_fixed(delay: Duration, count: usize) -> Self {
assert!(count > 0, "retry count should be larger than 0");
iGxnon marked this conversation as resolved.
Show resolved Hide resolved
Self {
backoff: BackoffConfig::Fixed,
delay,
count,
}
}

Check warning on line 63 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L56-L63

Added lines #L56 - L63 were not covered by tests

/// Create an exponential retry config
///
/// # Panics
///
/// Panics if `count` is 0.
fn new_exponential(delay: Duration, max_delay: Duration, count: usize) -> Self {
    assert!(count > 0, "retry count should be larger than 0");
    let backoff = BackoffConfig::Exponential { max_delay };
    Self {
        backoff,
        delay,
        count,
    }
}

Check warning on line 73 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L66-L73

Added lines #L66 - L73 were not covered by tests

/// Create a backoff process
///
/// The process starts at the configured initial delay with the full retry count.
fn init_backoff(&self) -> Backoff {
    let config = self.clone();
    let (cur_delay, count) = (config.delay, config.count);
    Backoff {
        config,
        cur_delay,
        count,
    }
}

Check warning on line 82 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L76-L82

Added lines #L76 - L82 were not covered by tests
}

impl Backoff {
/// Get the next delay duration, None means the end.
///
/// Each successful call uses up one unit of the remaining count. With an
/// exponential backoff the stored delay is doubled (clamped to `max_delay`) for the
/// following call.
fn next_delay(&mut self) -> Option<Duration> {
    // `checked_sub` yields `None` exactly when no count is left, ending the process
    self.count = self.count.checked_sub(1)?;
    let delay = self.cur_delay;
    match self.config.backoff {
        BackoffConfig::Fixed => {}
        BackoffConfig::Exponential { max_delay } => {
            // keep the current delay if doubling would overflow
            let doubled = delay.checked_mul(2).unwrap_or(delay);
            self.cur_delay = doubled.min(max_delay);
        }
    }
    Some(delay)
}

Check warning on line 101 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L87-L101

Added lines #L87 - L101 were not covered by tests
}

/// The retry client automatically retry the requests of the inner client api
/// which raises the [`tonic::Status`] error
#[derive(Debug)]

Check warning on line 106 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L106

Added line #L106 was not covered by tests
pub(super) struct Retry<Api> {
/// Inner client whose requests are retried
inner: Api,
/// Retry config, used to start a fresh backoff process for every request
config: RetryConfig,
}

impl<Api> Retry<Api>
where
Api: ClientApi<Error = CurpError> + LeaderStateUpdate + Send + Sync + 'static,
{
/// Create a retry client that wraps `inner` and retries according to `config`
fn new(inner: Api, config: RetryConfig) -> Self {
    Self { config, inner }
}

Check warning on line 121 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L119-L121

Added lines #L119 - L121 were not covered by tests

/// Takes a function f and run retry.
async fn retry<'a, R, F>(&'a self, f: impl Fn(&'a Api) -> F) -> Result<R, tonic::Status>
where
F: Future<Output = Result<R, CurpError>>,
{
let mut backoff = self.config.init_backoff();
while let Some(delay) = backoff.next_delay() {
let err = match f(&self.inner).await {
Ok(res) => return Ok(res),
Err(err) => err,
};

match err {

Check warning on line 135 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L124-L135

Added lines #L124 - L135 were not covered by tests
// some errors that should not retry
CurpError::Duplicated(_)
| CurpError::ShuttingDown(_)
| CurpError::InvalidConfig(_)
| CurpError::NodeNotExists(_)
| CurpError::NodeAlreadyExists(_)
| CurpError::LearnerNotCatchUp(_) => {
return Err(tonic::Status::from(err));

Check warning on line 143 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L143

Added line #L143 was not covered by tests
}

// some errors that could have a retry
CurpError::ExpiredClientId(_)
| CurpError::KeyConflict(_)
| CurpError::RpcTransport(_)
| CurpError::Internal(_) => {}

Check warning on line 150 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L150

Added line #L150 was not covered by tests

// update the cluster state if got WrongClusterVersion
CurpError::WrongClusterVersion(_) => {
// the inner client should automatically update cluster state when fetch_cluster
if let Err(e) = self.inner.fetch_cluster(false).await {
warn!("fetch cluster failed, error {e:?}");
}

Check warning on line 157 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L155-L157

Added lines #L155 - L157 were not covered by tests
}

// update the leader state if got Redirect
CurpError::Redirect(Redirect { leader_id, term }) => {
self.inner.update_leader(leader_id, term);
}

Check warning on line 163 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L161-L163

Added lines #L161 - L163 were not covered by tests
}

warn!("retry on {} seconds later", delay.as_secs_f32());
tokio::time::sleep(delay).await;

Check warning on line 167 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L166-L167

Added lines #L166 - L167 were not covered by tests
}

Err(tonic::Status::deadline_exceeded("request timeout"))
}

Check warning on line 171 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L170-L171

Added lines #L170 - L171 were not covered by tests
}

#[async_trait]
impl<Api> ClientApi for Retry<Api>
where
Api: ClientApi<Error = CurpError> + LeaderStateUpdate + Send + Sync + 'static,
{
/// The client error
type Error = tonic::Status;

/// Inherit the command type
type Cmd = Api::Cmd;

/// Get the local connection when the client is on the server node.
///
/// Forwarded to the inner client as is, no retry is involved.
async fn local_connect(&self) -> Option<Arc<dyn ConnectApi>> {
self.inner.local_connect().await
}

Check warning on line 188 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L186-L188

Added lines #L186 - L188 were not covered by tests

/// Send propose to the whole cluster, `use_fast_path` set to `false` to fallback into ordered
/// requests (even if the requests are commutative).
///
/// The inner client's `propose` is retried according to the retry config.
async fn propose(
    &self,
    cmd: &Self::Cmd,
    use_fast_path: bool,
) -> Result<ProposeResponse<Self::Cmd>, tonic::Status> {
    self.retry(|client| client.propose(cmd, use_fast_path)).await
}

Check warning on line 199 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L192-L199

Added lines #L192 - L199 were not covered by tests

/// Send propose configuration changes to the cluster
///
/// Every attempt receives its own copy of `changes`.
async fn propose_conf_change(
    &self,
    changes: Vec<ConfChange>,
) -> Result<Vec<Member>, tonic::Status> {
    self.retry(|client| client.propose_conf_change(changes.clone()))
        .await
}

Check warning on line 211 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L202-L211

Added lines #L202 - L211 were not covered by tests

/// Send propose to shutdown cluster
///
/// The inner client's `propose_shutdown` is retried according to the retry config.
async fn propose_shutdown(&self) -> Result<(), tonic::Status> {
    self.retry(|client| client.propose_shutdown()).await
}

Check warning on line 216 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L214-L216

Added lines #L214 - L216 were not covered by tests

/// Send propose to publish a node id and name
///
/// Every attempt receives its own copy of `node_name`.
async fn propose_publish(
    &self,
    node_id: ServerId,
    node_name: String,
) -> Result<(), Self::Error> {
    self.retry(|client| client.propose_publish(node_id, node_name.clone()))
        .await
}

Check warning on line 229 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L219-L229

Added lines #L219 - L229 were not covered by tests

/// Send fetch read state from leader
///
/// The inner client's `fetch_read_state` is retried according to the retry config.
async fn fetch_read_state(&self, cmd: &Self::Cmd) -> Result<ReadState, tonic::Status> {
    self.retry(|client| client.fetch_read_state(cmd)).await
}

Check warning on line 235 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L232-L235

Added lines #L232 - L235 were not covered by tests

/// Send fetch cluster requests to all servers (That's because initially, we didn't
/// know who the leader is.)
/// Note: The fetched cluster may still be outdated if `linearizable` is false
///
/// The inner client's `fetch_cluster` is retried according to the retry config.
async fn fetch_cluster(
    &self,
    linearizable: bool,
) -> Result<FetchClusterResponse, tonic::Status> {
    self.retry(|client| client.fetch_cluster(linearizable)).await
}

Check warning on line 246 in curp/src/client_new/retry.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/retry.rs#L240-L246

Added lines #L240 - L246 were not covered by tests
}
13 changes: 10 additions & 3 deletions curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,8 @@
}

/// Get the priority of the error
pub(crate) fn priority(&self) -> CurpErrorPriority {
match *self {

Check warning on line 640 in curp/src/rpc/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/rpc/mod.rs#L639-L640

Added lines #L639 - L640 were not covered by tests
CurpError::Duplicated(_)
| CurpError::ShuttingDown(_)
| CurpError::InvalidConfig(_)
Expand All @@ -645,23 +645,30 @@
| CurpError::NodeNotExists(_)
| CurpError::LearnerNotCatchUp(_)
| CurpError::ExpiredClientId(_)
| CurpError::Redirect(_) => CurpErrorPriority::ReturnImmediately,
CurpError::WrongClusterVersion(_) => CurpErrorPriority::High,

Check warning on line 649 in curp/src/rpc/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/rpc/mod.rs#L648-L649

Added lines #L648 - L649 were not covered by tests
CurpError::RpcTransport(_) | CurpError::Internal(_) | CurpError::KeyConflict(_) => {
CurpErrorPriority::Low

Check warning on line 651 in curp/src/rpc/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/rpc/mod.rs#L651

Added line #L651 was not covered by tests
}
}
}

Check warning on line 654 in curp/src/rpc/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/rpc/mod.rs#L654

Added line #L654 was not covered by tests
}

/// The priority of a curp error, indicating how the error should be handled in the retry layer
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum CurpErrorPriority {
    /// Low priority. In multiple sequenced RPCs, a low-priority error returned by a
    /// preceding RPC does not cause an early exit. In concurrent RPCs, a low-priority
    /// error may be overridden by a higher-priority error.
    Low = 1,
    /// High priority. In multiple sequenced RPCs, a high-priority error returned by a
    /// preceding RPC causes an early exit, preventing the next RPCs from proceeding.
    /// In concurrent RPCs, high-priority errors override low-priority errors.
    High = 2,
    /// Should be returned immediately if any server responds with it. If requests are
    /// sent to multiple servers, the requests that have not received a response yet
    /// are terminated immediately.
    ReturnImmediately = 3,
}

Expand Down Expand Up @@ -783,7 +790,7 @@
/// Check whether this pool entry conflicts with the command `c`
///
/// A command entry defers to the command's own `is_conflict`; every other entry
/// conflicts with any command.
pub(crate) fn is_conflict_with_cmd(&self, c: &C) -> bool {
    let PoolEntryInner::Command(ref cmd) = self.inner else {
        // conf changes conflict with everything
        return true;
    };
    cmd.is_conflict(c)
}
}
Expand All @@ -794,7 +801,7 @@
{
fn is_conflict(&self, other: &Self) -> bool {
let PoolEntryInner::Command(ref cmd1) = self.inner else {
return true;

Check warning on line 804 in curp/src/rpc/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/rpc/mod.rs#L804

Added line #L804 was not covered by tests
};
let PoolEntryInner::Command(ref cmd2) = other.inner else {
return true;
Expand Down
Loading