fix: fix cluster version update #590

Merged · 4 commits · Jan 2, 2024

Changes from all commits

36 changes: 24 additions & 12 deletions curp/src/client_new/unary.rs
@@ -23,9 +23,9 @@
rpc::{
self,
connect::{BypassedConnect, ConnectApi},
ConfChange, CurpError, FetchClusterRequest, FetchClusterResponse, FetchReadStateRequest,
Member, ProposeConfChangeRequest, ProposeId, ProposeRequest, Protocol, PublishRequest,
ReadState, ShutdownRequest, WaitSyncedRequest,
ConfChange, CurpError, CurpErrorPriority, FetchClusterRequest, FetchClusterResponse,
FetchReadStateRequest, Member, ProposeConfChangeRequest, ProposeId, ProposeRequest,
Protocol, PublishRequest, ReadState, ShutdownRequest, WaitSyncedRequest,
},
};

@@ -44,18 +44,18 @@
}

impl std::fmt::Debug for State {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StateInner")
.field("leader", &self.leader)
.field("term", &self.term)
.field("cluster_version", &self.cluster_version)
.field("connects", &self.connects.keys())
.finish()
}

}

/// The unary client config
#[derive(Debug)]

pub(super) struct UnaryConfig {
/// The rpc timeout of a propose request
propose_timeout: Duration,
Expand All @@ -67,24 +67,24 @@

impl UnaryConfig {
/// Create a unary config
fn new(propose_timeout: Duration) -> Self {
Self {
propose_timeout,
wait_synced_timeout: propose_timeout * 2,
}
}


/// Create a unary config
fn new_full(propose_timeout: Duration, wait_synced_timeout: Duration) -> Self {
Self {
propose_timeout,
wait_synced_timeout,
}
}

}

/// The unary client
#[derive(Debug)]

pub(super) struct Unary<C: Command> {
/// Client state
state: RwLock<State>,
Expand All @@ -98,117 +98,117 @@

impl<C: Command> Unary<C> {
/// Update leader
fn check_and_update_leader(state: &mut State, leader_id: Option<ServerId>, term: u64) -> bool {
match state.term.cmp(&term) {

Ordering::Less => {
// Reset the term only when the response carries a leader id, to prevent the following:
// if a server loses contact with its leader, it bumps its term to start an election. Since the other servers are fine, that election will not succeed.
// But if the client adopted the higher term anyway, it would never accept the true leader, whose term is lower.
if let Some(new_leader_id) = leader_id {
debug!("client term updates to {}", term);
debug!("client leader id updates to {new_leader_id}");
state.term = term;
state.leader = Some(new_leader_id);
}

}
Ordering::Equal => {
if let Some(new_leader_id) = leader_id {
if state.leader.is_none() {
debug!("client leader id updates to {new_leader_id}");
state.leader = Some(new_leader_id);
}
assert_eq!(
state.leader,
Some(new_leader_id),
"there should never be two leader in one term"

);
}

}
Ordering::Greater => {
debug!("ignore old term({}) from server", term);
return false;

}
}
true
}

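As a reviewer-style aside: the term rule above condenses into a small, self-contained sketch. The `ClientView` struct and `apply` helper are hypothetical, written only to illustrate the three orderings; the real logic is `check_and_update_leader`.

use std::cmp::Ordering;

/// Hypothetical, trimmed-down view of the client's leader state.
struct ClientView {
    term: u64,
    leader: Option<u64>,
}

/// Mirrors the rule above: adopt a higher term only together with a leader id,
/// fill in a missing leader for the current term, and reject stale terms.
fn apply(view: &mut ClientView, leader_id: Option<u64>, term: u64) -> bool {
    match view.term.cmp(&term) {
        Ordering::Less => {
            if let Some(id) = leader_id {
                view.term = term;
                view.leader = Some(id);
            }
        }
        Ordering::Equal => {
            if view.leader.is_none() {
                view.leader = leader_id;
            }
        }
        Ordering::Greater => return false, // stale information from the server
    }
    true
}

fn main() {
    let mut v = ClientView { term: 3, leader: Some(1) };
    // A higher term without a leader id is not adopted (mid-election noise).
    assert!(apply(&mut v, None, 4));
    assert_eq!((v.term, v.leader), (3, Some(1)));
    // A higher term with a leader id is adopted.
    assert!(apply(&mut v, Some(2), 4));
    assert_eq!((v.term, v.leader), (4, Some(2)));
    // A stale term is rejected.
    assert!(!apply(&mut v, Some(1), 2));
}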

/// Update client state based on [`FetchClusterResponse`]
async fn check_and_update(
&self,
res: &FetchClusterResponse,
) -> Result<(), tonic::transport::Error> {
let mut state = self.state.write().await;
if !Self::check_and_update_leader(&mut state, res.leader_id, res.term) {
return Ok(());
}
if state.cluster_version >= res.cluster_version {
debug!(
"ignore old cluster version({}) from server",
res.cluster_version
);
return Ok(());
}


debug!("client cluster version updated to {}", res.cluster_version);
state.cluster_version = res.cluster_version;

let mut new_members = res.clone().into_members_addrs();

let old_ids = state.connects.keys().copied().collect::<HashSet<_>>();
let new_ids = new_members.keys().copied().collect::<HashSet<_>>();

let diffs = &old_ids ^ &new_ids;
let sames = &old_ids & &new_ids;


for diff in diffs {
if let Entry::Vacant(e) = state.connects.entry(diff) {
let addrs = new_members
.remove(&diff)
.unwrap_or_else(|| unreachable!("{diff} must in new member addrs"));
debug!("client connects to a new server({diff}), address({addrs:?})");
let new_conn = rpc::connect(diff, addrs).await?;
let _ig = e.insert(new_conn);

} else {
debug!("client removes old server({diff})");
let _ig = state.connects.remove(&diff);

}
}
for same in sames {
let conn = state
.connects
.get(&same)
.unwrap_or_else(|| unreachable!("{same} must in old connects"));
let addrs = new_members
.remove(&same)
.unwrap_or_else(|| unreachable!("{same} must in new member addrs"));
conn.update_addrs(addrs).await?;

}

Ok(())
}

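The reconciliation above hinges on two `HashSet` operators: the symmetric difference (`^`) yields the ids to connect or drop, and the intersection (`&`) yields the ids whose addresses only need refreshing. A minimal standalone sketch (the ids are made up):

use std::collections::HashSet;

fn main() {
    let old_ids: HashSet<u64> = [1, 2, 3].into_iter().collect();
    let new_ids: HashSet<u64> = [2, 3, 4].into_iter().collect();

    // In exactly one of the two sets: server 1 is dropped, server 4 is dialed.
    let diffs = &old_ids ^ &new_ids;
    // In both sets: the connection is kept, but its address list is refreshed.
    let sames = &old_ids & &new_ids;

    assert_eq!(diffs, [1, 4].into_iter().collect::<HashSet<_>>());
    assert_eq!(sames, [2, 3].into_iter().collect::<HashSet<_>>());
}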

/// Get cluster version.
async fn cluster_version(&self) -> u64 {
self.state.read().await.cluster_version
}


/// Give a handle `f` to apply to all servers *concurrently* and return a stream to poll result one by one.
async fn for_each_server<R, F: Future<Output = R>>(
&self,
mut f: impl FnMut(Arc<dyn ConnectApi>) -> F,
) -> (usize, impl Stream<Item = R>) {
let connects = self
.state
.read()
.await

.connects
.values()
.map(|connect| f(Arc::clone(connect)))
.collect::<FuturesUnordered<F>>();
// The size is captured here to keep size == stream.len(); a separate, non-atomic read of `connects` could otherwise be inconsistent.
let size = connects.len();
(size, connects)
}

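A sketch of the fan-out pattern used by `for_each_server`: a `FuturesUnordered` polls every request concurrently and yields results in completion order, and the size is taken from the same collection the stream was built from, mirroring the `size == stream.len()` comment above. This assumes the `futures` and `tokio` crates; `fake_rpc` is a stand-in for the real connection call.

use futures::{stream::FuturesUnordered, StreamExt};

async fn fake_rpc(id: u64) -> (u64, &'static str) {
    (id, "ok") // stands in for e.g. conn.propose(...)
}

#[tokio::main]
async fn main() {
    let ids = [1u64, 2, 3];
    let mut responses: FuturesUnordered<_> =
        ids.iter().map(|&id| fake_rpc(id)).collect();
    // Captured from the stream itself, so it cannot drift from stream.len().
    let size = responses.len();
    let mut seen = 0;
    while let Some((id, body)) = responses.next().await {
        println!("server {id} replied {body}");
        seen += 1;
    }
    assert_eq!(seen, size);
}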

/// Get a handle `f` and return the future to apply `f` on the leader.
/// NOTICE:
@@ -216,143 +216,149 @@
/// `map_leader` should never be invoked in [`ClientApi::fetch_cluster`]
/// `map_leader` might call `fetch_leader_id`, `fetch_cluster`, finally
/// result in stack overflow.
async fn map_leader<R, F: Future<Output = R>>(
&self,
f: impl FnOnce(Arc<dyn ConnectApi>) -> F,
) -> Result<R, CurpError> {
let state_r = self.state.read().await;
let cached_leader = state_r.leader;
let leader_id = match cached_leader {
Some(id) => id,
None => <Unary<C> as ClientApi>::fetch_leader_id(self, false).await?,

};
// If the leader id cannot be found in connects, it indicates that there is
// an inconsistency between the client's local leader state and the cluster
// state; in that case, mock a `WrongClusterVersion` return to the outside.
let connect = state_r
.connects
.get(&leader_id)
.ok_or_else(CurpError::wrong_cluster_version)?;
let res = f(Arc::clone(connect)).await;
Ok(res)
}


/// Send proposal to all servers
async fn fast_round(
&self,
propose_id: ProposeId,
cmd: &C,
) -> Result<Result<C::ER, C::Error>, CurpError> {
let req = ProposeRequest::new(propose_id, cmd, self.cluster_version().await);
let timeout = self.config.propose_timeout;


let (size, mut responses) = self
.for_each_server(|conn| {
let req_c = req.clone();
async move { (conn.id(), conn.propose(req_c, timeout).await) }
})
.await;
let super_quorum = super_quorum(size);

let mut err = None;
let mut err: Option<CurpError> = None;
let mut execute_result: Option<C::ER> = None;
let mut ok_cnt = 0;


while let Some((id, resp)) = responses.next().await {
let resp = match resp {
Ok(resp) => resp.into_inner(),
Err(e) => {
warn!("propose cmd({propose_id}) to server({id}) error: {e:?}");
if e.return_early() {
if e.priority() == CurpErrorPriority::ReturnImmediately {
return Err(e);
}
err = Some(e);
if let Some(old_err) = err.as_ref() {
if old_err.priority() <= e.priority() {
err = Some(e);
}
} else {
err = Some(e);
}
continue;

}
};
let deserialize_res = resp.map_result::<C, _, Result<(), C::Error>>(|res| {
let er = match res {
Ok(er) => er,
Err(cmd_err) => return Err(cmd_err),

};
if let Some(er) = er {
assert!(execute_result.is_none(), "should not set exe result twice");
execute_result = Some(er);
}
ok_cnt.add_assign(1);
Ok(())
});
let dr = match deserialize_res {
Ok(dr) => dr,
Err(ser_err) => {
warn!("serialize error: {ser_err}");

// We blame the server for this error, although it may be a local one.
// It needs to be retried in the same way as a server error.
err = Some(CurpError::from(ser_err));
continue;

}
};
if let Err(cmd_err) = dr {

// got a command execution error early, abort the next requests and return the cmd error
return Ok(Err(cmd_err));
}
// if the propose meets the super quorum and we got the execute result,
// that means we can safely abort the next requests
if ok_cnt >= super_quorum {
if let Some(er) = execute_result {
debug!("fast round for cmd({}) succeed", propose_id);
return Ok(Ok(er));
}
}

}

if let Some(err) = err {
return Err(err);
}

// We will at least send the request to the leader if no `WrongClusterVersion` returned.
// If no errors occur, the leader should return the ER
// If it is because the super quorum has not been reached, an error will definitely occur.
unreachable!("leader should return ER if no error happens");
}

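The fast round's bookkeeping reduces to three rules: bail out immediately on a `ReturnImmediately`-priority error, otherwise remember only the highest-priority error seen so far, and succeed once the ok count reaches the super quorum and the leader's execution result has arrived. A hypothetical synchronous sketch of that tally (`Priority` and `Outcome` are simplified stand-ins for `CurpErrorPriority` and the RPC result):

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Priority { Low = 1, High = 2, ReturnImmediately = 3 }

enum Outcome { Ok { exec_result: Option<&'static str> }, Err(Priority) }

fn tally(responses: Vec<Outcome>, super_quorum: usize) -> Result<&'static str, Priority> {
    let mut best_err: Option<Priority> = None;
    let mut exec_result = None;
    let mut ok_cnt = 0;
    for resp in responses {
        match resp {
            Outcome::Err(p) if p == Priority::ReturnImmediately => return Err(p),
            Outcome::Err(p) => {
                // Keep only the highest-priority error seen so far.
                if best_err.map_or(true, |old| old <= p) {
                    best_err = Some(p);
                }
            }
            Outcome::Ok { exec_result: er } => {
                if er.is_some() {
                    exec_result = er; // only the leader returns an ER
                }
                ok_cnt += 1;
                if ok_cnt >= super_quorum && exec_result.is_some() {
                    return Ok(exec_result.unwrap());
                }
            }
        }
    }
    Err(best_err.unwrap_or(Priority::Low))
}

fn main() {
    let responses = vec![
        Outcome::Ok { exec_result: None },
        Outcome::Err(Priority::Low),
        Outcome::Ok { exec_result: Some("er") }, // the leader's reply
        Outcome::Ok { exec_result: None },
    ];
    assert_eq!(tally(responses, 3), Ok("er"));
}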

/// Wait synced result from server
async fn slow_round(
&self,
propose_id: ProposeId,
) -> Result<Result<(C::ASR, C::ER), C::Error>, CurpError> {
let timeout = self.config.wait_synced_timeout;
let req = WaitSyncedRequest::new(propose_id, self.cluster_version().await);
let resp = self
.map_leader(|conn| async move { conn.wait_synced(req, timeout).await })
.await??
.into_inner();
let synced_res = resp.map_result::<C, _, _>(|res| res).map_err(|ser_err| {
warn!("serialize error: {ser_err}");

// Same as fast round, we blame the server for the serializing error.
CurpError::from(ser_err)
})?;
Ok(synced_res)
}


/// Get the client id
#[allow(clippy::unused_async)] // TODO: grant a client id from server
async fn get_client_id(&self) -> Result<u64, CurpError> {
Ok(rand::random())
}


/// Generate a new seq num and record it
#[allow(clippy::unused_self)] // TODO: implement request tracker
fn new_seq_num(&self) -> u64 {
rand::random()
}


/// Generate a new propose id
async fn gen_propose_id(&self) -> Result<ProposeId, CurpError> {
let client_id = self.get_client_id().await?;
let seq_num = self.new_seq_num();
Ok(ProposeId(client_id, seq_num))
}

}

#[async_trait]
@@ -364,227 +370,233 @@
type Cmd = C;

/// Get the local connection when the client is on the server node.
async fn local_connect(&self) -> Option<Arc<dyn ConnectApi>> {
let id = self.local_server_id?;
self.state.read().await.connects.get(&id).map(Arc::clone)
}


/// Send propose to the whole cluster, with `use_fast_path` set to `false` to fall back to ordered
/// requests (even if the requests are commutative).
async fn propose(&self, cmd: &C, use_fast_path: bool) -> Result<ProposeResponse<C>, CurpError> {
let propose_id = self.gen_propose_id().await?;


tokio::pin! {
let fast_round = self.fast_round(propose_id, cmd);
let slow_round = self.slow_round(propose_id);
}


let res: ProposeResponse<C> = if use_fast_path {
match futures::future::select(fast_round, slow_round).await {
futures::future::Either::Left((fast_result, slow_round)) => match fast_result {
Ok(er) => er.map(|e| (e, None)),
Err(err) => {
if err.return_early() {
if err.priority() > CurpErrorPriority::Low {
return Err(err);
}

// fallback to slow round if fast round failed
let sr = slow_round.await?;
sr.map(|(asr, er)| (er, Some(asr)))

}
},
futures::future::Either::Right((slow_result, fast_round)) => match slow_result {
Ok(er) => er.map(|(asr, e)| (e, Some(asr))),
Err(err) => {
if err.return_early() {
if err.priority() > CurpErrorPriority::Low {
return Err(err);
}
let fr = fast_round.await?;
fr.map(|er| (er, None))

}
},
}
} else {
let (fr, sr) = futures::future::join(fast_round, slow_round).await;
if let Err(err) = fr {
if err.return_early() {
if err.priority() > CurpErrorPriority::Low {
return Err(err);
}
}
sr?.map(|(asr, er)| (er, Some(asr)))

};

Ok(res)
}

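The fast-path branch above races the two rounds with `futures::future::select` and, on a recoverable (low-priority) error, falls back to whichever round is still pending. A toy sketch of the same select-then-fallback shape (the sleeps stand in for the real rounds; assumes `tokio` and `futures`):

use std::time::Duration;

use futures::future::{select, Either};

async fn fast() -> Result<&'static str, ()> {
    tokio::time::sleep(Duration::from_millis(10)).await;
    Err(()) // pretend the fast round failed recoverably
}

async fn slow() -> Result<&'static str, ()> {
    tokio::time::sleep(Duration::from_millis(50)).await;
    Ok("synced")
}

#[tokio::main]
async fn main() {
    tokio::pin! {
        let fast_round = fast();
        let slow_round = slow();
    }
    let res = match select(fast_round, slow_round).await {
        Either::Left((fast_res, slow_round)) => match fast_res {
            Ok(v) => Ok(v),
            // Recoverable error: fall back to the still-pending slow round.
            Err(()) => slow_round.await,
        },
        Either::Right((slow_res, _fast_round)) => slow_res,
    };
    assert_eq!(res, Ok("synced"));
}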

/// Send propose configuration changes to the cluster
async fn propose_conf_change(
&self,
changes: Vec<ConfChange>,
) -> Result<Vec<Member>, CurpError> {
let propose_id = self.gen_propose_id().await?;
let req = ProposeConfChangeRequest::new(propose_id, changes, self.cluster_version().await);
let timeout = self.config.wait_synced_timeout;
let members = self
.map_leader(|conn| async move { conn.propose_conf_change(req, timeout).await })
.await??
.into_inner()
.members;
Ok(members)
}


/// Send propose to shutdown cluster
async fn propose_shutdown(&self) -> Result<(), CurpError> {
let propose_id = self.gen_propose_id().await?;
let req = ShutdownRequest::new(propose_id, self.cluster_version().await);
let timeout = self.config.wait_synced_timeout;
let _ig = self
.map_leader(|conn| async move { conn.shutdown(req, timeout).await })
.await??;
Ok(())
}


/// Send propose to publish a node id and name
async fn propose_publish(
&self,
node_id: ServerId,
node_name: String,
) -> Result<(), Self::Error> {
let propose_id = self.gen_propose_id().await?;
let req = PublishRequest::new(propose_id, node_id, node_name);
let timeout = self.config.wait_synced_timeout;
let _ig = self
.map_leader(|conn| async move { conn.publish(req, timeout).await })
.await??;
Ok(())
}


/// Send fetch read state from leader
async fn fetch_read_state(&self, cmd: &C) -> Result<ReadState, CurpError> {

// As in fast_round, we blame the server for the serializing error even
// though it is a local error
let req =
FetchReadStateRequest::new(cmd, self.cluster_version().await).map_err(|ser_err| {
warn!("serializing error: {ser_err}");
CurpError::from(ser_err)
})?;
let timeout = self.config.wait_synced_timeout;
let state = self
.map_leader(|conn| async move { conn.fetch_read_state(req, timeout).await })
.await??
.into_inner()
.read_state
.unwrap_or_else(|| unreachable!("read_state must be set in fetch read state response"));
Ok(state)
}


/// Send fetch cluster requests to all servers
/// Note: The fetched cluster may still be outdated if `linearizable` is false
async fn fetch_cluster(&self, linearizable: bool) -> Result<FetchClusterResponse, CurpError> {
let timeout = self.config.wait_synced_timeout;
if !linearizable {

// firstly, try to fetch the local server
if let Some(connect) = <Unary<C> as ClientApi>::local_connect(self).await {

/// Local timeout; in practice the local connect should be a bypassed (in-process) connection, so the timeout may be unused.
const FETCH_LOCAL_TIMEOUT: Duration = Duration::from_secs(1);

let resp = connect
.fetch_cluster(FetchClusterRequest::default(), FETCH_LOCAL_TIMEOUT)
.await
.unwrap_or_else(|e| {
unreachable!(
"fetch cluster from local connect should never failed, err {e:?}"
)
})
.into_inner();
return Ok(resp);
}
}

// then fetch the whole cluster
let (size, mut responses) = self
.for_each_server(|conn| async move {
(
conn.id(),
conn.fetch_cluster(FetchClusterRequest { linearizable }, timeout)
.await
.map(Response::into_inner),
)
})
.await;
let quorum = quorum(size);

let mut max_term = 0;
let mut res = None;
let mut ok_cnt = 0;
let mut err = None;
let mut err: Option<CurpError> = None;


while let Some((id, resp)) = responses.next().await {
let inner = match resp {
Ok(r) => r,
Err(e) => {
warn!("fetch cluster from {} failed, {:?}", id, e);
if e.return_early() {
if e.priority() == CurpErrorPriority::ReturnImmediately {
return Err(e);
}
err = Some(e);
if let Some(old_err) = err.as_ref() {
if old_err.priority() <= e.priority() {
err = Some(e);
}
} else {
err = Some(e);
}
continue;

}
};

#[allow(clippy::integer_arithmetic)]
match max_term.cmp(&inner.term) {

Ordering::Less => {
if !inner.members.is_empty() {
max_term = inner.term;
res = Some(inner);
}
ok_cnt = 1;

}
Ordering::Equal => {
if !inner.members.is_empty() {
res = Some(inner);
}
ok_cnt += 1;

}
Ordering::Greater => {}

}

if ok_cnt >= quorum {
break;
}

}
if let Some(res) = res {
debug!("fetch cluster succeeded, result: {res:?}");
if let Err(e) = self.check_and_update(&res).await {
warn!("update to a new cluster state failed, error {e}");
}
return Ok(res);
}
if let Some(err) = err {
return Err(err);
}

unreachable!("At least one server will return `members` or a connection error has occurred. Leaders should not return empty members.")
}

}
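
The selection loop in `fetch_cluster` keeps the membership reported at the highest term seen so far and counts how many servers agree on that term, resetting the count whenever a higher term shows up. A simplified sketch over toy `(term, members)` pairs:

use std::cmp::Ordering;

fn pick(responses: &[(u64, Vec<u64>)], quorum: usize) -> Option<&Vec<u64>> {
    let mut max_term = 0;
    let mut res = None;
    let mut ok_cnt = 0;
    for (term, members) in responses {
        match max_term.cmp(term) {
            Ordering::Less => {
                // A higher term wins; restart the agreement count.
                if !members.is_empty() {
                    max_term = *term;
                    res = Some(members);
                }
                ok_cnt = 1;
            }
            Ordering::Equal => {
                if !members.is_empty() {
                    res = Some(members);
                }
                ok_cnt += 1;
            }
            Ordering::Greater => {} // stale response, ignore
        }
        if ok_cnt >= quorum {
            break;
        }
    }
    res
}

fn main() {
    // One stale empty reply, then two servers agreeing at term 3.
    let responses = vec![(2, vec![]), (3, vec![1, 2, 3]), (3, vec![1, 2, 3])];
    assert_eq!(pick(&responses, 2), Some(&vec![1, 2, 3]));
}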

#[async_trait]
impl<C: Command> LeaderStateUpdate for Unary<C> {
/// Update leader
async fn update_leader(&self, leader_id: Option<ServerId>, term: u64) -> bool {
let mut state_w = self.state.write().await;
Self::check_and_update_leader(&mut state_w, leader_id, term)
}

}

/// Calculate the super quorum
fn super_quorum(size: usize) -> usize {
let fault_tolerance = size.wrapping_div(2);
fault_tolerance
.wrapping_add(fault_tolerance.wrapping_add(1).wrapping_div(2))
.wrapping_add(1)
}


/// Calculate the quorum
fn quorum(size: usize) -> usize {
size.wrapping_div(2).wrapping_add(1)
}

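As a quick sanity check on the two formulas above (a sketch with the wrapping arithmetic dropped for brevity): with f = n/2 rounded down, the quorum is n/2 + 1 and the super quorum is f + (f + 1)/2 + 1.

fn super_quorum(size: usize) -> usize {
    let fault_tolerance = size / 2;
    fault_tolerance + (fault_tolerance + 1) / 2 + 1
}

fn quorum(size: usize) -> usize {
    size / 2 + 1
}

fn main() {
    // (cluster size, quorum, super quorum)
    for (n, q, sq) in [(3, 2, 3), (5, 3, 4), (7, 4, 6)] {
        assert_eq!(quorum(n), q);
        assert_eq!(super_quorum(n), sq);
    }
}
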
24 changes: 15 additions & 9 deletions curp/src/members.rs
@@ -1,6 +1,6 @@
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::Hasher,
hash::{Hash, Hasher},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
@@ -11,7 +11,7 @@ use std::{
use dashmap::{mapref::one::Ref, DashMap};
use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use tracing::debug;
use tracing::{debug, info};

use crate::rpc::{self, FetchClusterRequest, FetchClusterResponse, Member};

@@ -285,14 +285,20 @@ impl ClusterInfo {
self.cluster_version.load(Ordering::Relaxed)
}

/// cluster version increase
pub(crate) fn cluster_version_inc(&self) -> u64 {
self.cluster_version.fetch_add(1, Ordering::Relaxed)
}

/// cluster version decrease
pub(crate) fn cluster_version_dec(&self) -> u64 {
self.cluster_version.fetch_sub(1, Ordering::Relaxed)
pub(crate) fn cluster_version_update(&self) {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
self.all_members_addrs()
.into_iter()
.sorted()
.for_each(|(id, mut addrs)| {
id.hash(&mut hasher);
addrs.sort();
addrs.hash(&mut hasher);
});
let ver = hasher.finish();
info!("cluster version updates to {ver}");
self.cluster_version.store(ver, Ordering::Relaxed);
}
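
This hash is the heart of the PR: instead of incrementing and decrementing a counter, which can drift when a config change is applied and later rolled back, the cluster version becomes a deterministic digest of the sorted membership, so every node holding the same member list computes the same version. A standalone sketch of the computation (member ids and addresses are made up; note `DefaultHasher` output is only guaranteed stable for processes built with the same Rust release):

use std::collections::{hash_map::DefaultHasher, HashMap};
use std::hash::{Hash, Hasher};

fn cluster_version(members: &HashMap<u64, Vec<String>>) -> u64 {
    let mut hasher = DefaultHasher::new();
    let mut entries: Vec<_> = members.iter().collect();
    entries.sort(); // sort by id so every node hashes in the same order
    for (id, addrs) in entries {
        let mut addrs = addrs.clone();
        addrs.sort(); // address order must not affect the version either
        id.hash(&mut hasher);
        addrs.hash(&mut hasher);
    }
    hasher.finish()
}

fn main() {
    let a: HashMap<u64, Vec<String>> =
        [(1, vec!["n1:2379".to_owned()]), (2, vec!["n2:2379".to_owned()])]
            .into_iter()
            .collect();
    // The same membership in a different insertion order yields the same version.
    let b: HashMap<u64, Vec<String>> =
        [(2, vec!["n2:2379".to_owned()]), (1, vec!["n1:2379".to_owned()])]
            .into_iter()
            .collect();
    assert_eq!(cluster_version(&a), cluster_version(&b));
}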

/// Get peers
40 changes: 26 additions & 14 deletions curp/src/rpc/mod.rs
@@ -635,24 +635,36 @@
Self::Internal(reason.into())
}

/// Errors that should return early to the retry layer when we
/// got the error at propose stage
pub(crate) fn return_early(&self) -> bool {
matches!(
*self,
/// Get the priority of the error
pub(crate) fn priority(&self) -> CurpErrorPriority {
match *self {

CurpError::Duplicated(_)
| CurpError::ShuttingDown(_)
| CurpError::InvalidConfig(_)
| CurpError::NodeAlreadyExists(_)
| CurpError::NodeNotExists(_)
| CurpError::LearnerNotCatchUp(_)
| CurpError::ExpiredClientId(_)
| CurpError::WrongClusterVersion(_)
| CurpError::Redirect(_)
)
| CurpError::ShuttingDown(_)
| CurpError::InvalidConfig(_)
| CurpError::NodeAlreadyExists(_)
| CurpError::NodeNotExists(_)
| CurpError::LearnerNotCatchUp(_)
| CurpError::ExpiredClientId(_)
| CurpError::Redirect(_) => CurpErrorPriority::ReturnImmediately,
CurpError::WrongClusterVersion(_) => CurpErrorPriority::High,

CurpError::RpcTransport(_) | CurpError::Internal(_) | CurpError::KeyConflict(_) => {
CurpErrorPriority::Low

}
}
}

}

/// The priority of a curp error, indicating which errors should be handled in the retry layer
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]

pub(crate) enum CurpErrorPriority {
/// Low priority
Low = 1,
/// High priority
High = 2,
/// Should be returned immediately if any server responds with it
ReturnImmediately = 3,
}
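
Because `CurpErrorPriority` is a C-like enum deriving `PartialOrd`/`Ord`, the discriminants (1, 2, 3) order the variants directly, which is exactly what the client relies on when keeping only the highest-priority error. A quick illustration (the enum is re-declared locally so the snippet stands alone):

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum CurpErrorPriority {
    Low = 1,
    High = 2,
    ReturnImmediately = 3,
}

fn main() {
    use CurpErrorPriority::*;
    assert!(Low < High && High < ReturnImmediately);
    // The "keep the higher-priority error" rule from the client code:
    let mut kept = Low;
    for incoming in [High, Low] {
        if kept <= incoming {
            kept = incoming;
        }
    }
    assert_eq!(kept, High);
}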

impl<E: std::error::Error + 'static> From<E> for CurpError {
#[inline]
fn from(value: E) -> Self {
@@ -771,7 +783,7 @@
pub(crate) fn is_conflict_with_cmd(&self, c: &C) -> bool {
match self.inner {
PoolEntryInner::Command(ref cmd) => cmd.is_conflict(c),
PoolEntryInner::ConfChange(ref _conf_change) => true,

}
}
}
@@ -782,7 +794,7 @@
{
fn is_conflict(&self, other: &Self) -> bool {
let PoolEntryInner::Command(ref cmd1) = self.inner else {
return true;

};
let PoolEntryInner::Command(ref cmd2) = other.inner else {
return true;
4 changes: 2 additions & 2 deletions curp/src/server/raw_curp/mod.rs
@@ -398,7 +398,7 @@
let EntryData::ConfChange(ref conf_change) = info.origin_entry.entry_data else {
unreachable!("the entry in the fallback_info should be conf change entry");
};
let changes = conf_change.clone();

self.fallback_conf_change(changes, info.addrs, info.name, info.is_learner);
}
// apply conf change entries
@@ -567,7 +567,7 @@
EntryData::Empty
| EntryData::Command(_)
| EntryData::Shutdown
| EntryData::SetName(_, _) => false,

});
// extra check to shutdown removed node
if !contains_candidate && !remove_candidate_is_not_committed {
@@ -1154,7 +1154,7 @@
None
}
};
let _ig = self.ctx.cluster_info.cluster_version_dec();
self.ctx.cluster_info.cluster_version_update();
if let Some(c) = fallback_change {
self.ctx
.change_tx
@@ -1546,7 +1546,7 @@
}
};
if modified {
let _ig = self.ctx.cluster_info.cluster_version_inc();
self.ctx.cluster_info.cluster_version_update();
}
if self.is_leader() {
self.ctx
5 changes: 4 additions & 1 deletion curp/src/server/raw_curp/tests.rs
@@ -777,7 +777,10 @@ fn add_node_should_add_new_node_to_curp() {
old_cluster.all_members(),
cluster_after_fallback.all_members()
);
assert_eq!(cluster_after_fallback.cluster_version(), 0);
assert_eq!(
cluster_after_fallback.cluster_version(),
old_cluster.cluster_version()
);
}

#[traced_test]
12 changes: 6 additions & 6 deletions curp/tests/it/server.rs
@@ -299,7 +299,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() {
assert!(matches!(res, Err(ClientError::ShuttingDown)));

let collection = collection_task.await.unwrap();
sleep_secs(3).await; // wait for the cluster to shutdown
sleep_secs(7).await; // wait for the cluster to shutdown
assert!(group.is_finished());

let group = CurpGroup::new_rocks(3, tmp_path).await;
Expand Down Expand Up @@ -341,7 +341,7 @@ async fn propose_remove_follower_should_success() {
let members = client.propose_conf_change(changes).await.unwrap().unwrap();
assert_eq!(members.len(), 4);
assert!(members.iter().all(|m| m.id != follower_id));
sleep_secs(3).await; // wait for the removed node to start an election and detect that it is removed
sleep_secs(7).await; // wait for the removed node to start an election and detect that it is removed
assert!(group.nodes.get(&follower_id).unwrap().handle.is_finished());
// check if the old client can propose to the new cluster
let res = client.propose(TestCommand::new_get(vec![1]), true).await;
Expand All @@ -360,7 +360,7 @@ async fn propose_remove_leader_should_success() {
let members = client.propose_conf_change(changes).await.unwrap().unwrap();
assert_eq!(members.len(), 4);
assert!(members.iter().all(|m| m.id != leader_id));
sleep_secs(3).await; // wait for the new leader to be elected
sleep_secs(7).await; // wait for the new leader to be elected
assert!(group.nodes.get(&leader_id).unwrap().handle.is_finished());
let new_leader_id = group.get_leader().await.0;
assert_ne!(new_leader_id, leader_id);
Expand Down Expand Up @@ -415,7 +415,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster_when_client_has_wrong_leader()
.unwrap();
client.shutdown().await.unwrap();

sleep_secs(3).await; // wait for the cluster to shutdown
sleep_secs(7).await; // wait for the cluster to shutdown
assert!(group.is_finished());
}

@@ -542,7 +542,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster_when_client_has_wrong_cluster(
.await;
client.shutdown().await.unwrap();

sleep_secs(5).await; // wait for the cluster to shutdown
sleep_secs(7).await; // wait for the cluster to shutdown
assert!(group.is_finished());
}

@@ -569,7 +569,7 @@ async fn propose_conf_change_rpc_should_work_when_client_has_wrong_cluster() {
let members = client.propose_conf_change(changes).await.unwrap().unwrap();
assert_eq!(members.len(), 3);
assert!(members.iter().all(|m| m.id != node_id));
sleep_secs(5).await;
sleep_secs(7).await;
assert!(group.nodes.get(&node_id).unwrap().handle.is_finished());
}
