Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add txn retry logic #776

Merged
merged 2 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions crates/curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ impl CommandExecutor<TestCommand> for TestCE {
let Some(index) = self
.store
.get(META_TABLE, APPLIED_INDEX_KEY)
.map_err(|e| ExecuteError(e.to_string()))? else {
.map_err(|e| ExecuteError(e.to_string()))?
else {
return Ok(0);
};
let index = LogIndex::from_le_bytes(index.as_slice().try_into().unwrap());
Expand All @@ -379,7 +380,10 @@ impl CommandExecutor<TestCommand> for TestCE {
snapshot: Option<(Snapshot, LogIndex)>,
) -> Result<(), <TestCommand as Command>::Error> {
let Some((mut snapshot, index)) = snapshot else {
let ops = vec![WriteOperation::new_delete_range(TEST_TABLE, &[], &[0xff]),WriteOperation::new_delete(META_TABLE, APPLIED_INDEX_KEY.as_ref())];
let ops = vec![
WriteOperation::new_delete_range(TEST_TABLE, &[], &[0xff]),
WriteOperation::new_delete(META_TABLE, APPLIED_INDEX_KEY.as_ref()),
];
self.store
.write_batch(ops, true)
.map_err(|e| ExecuteError(e.to_string()))?;
Expand Down
4 changes: 2 additions & 2 deletions crates/curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ async fn worker_as<C: Command, CE: CommandExecutor<C>, RC: RoleChange>(
let success = match entry.entry_data {
EntryData::Command(ref cmd) => {
let Some(prepare) = prepare else {
unreachable!("prepare should always be Some(_) when entry is a command");
};
unreachable!("prepare should always be Some(_) when entry is a command");
};
let asr = ce.after_sync(cmd.as_ref(), entry.index, prepare).await;
let asr_ok = asr.is_ok();
cb.write().insert_asr(entry.propose_id, asr);
Expand Down
3 changes: 1 addition & 2 deletions crates/curp/src/server/conflict/spec_pool_new.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use curp_external_api::conflict::{ConflictPoolOp, SpeculativePoolOp};

use crate::rpc::PoolEntry;

use super::{CommandEntry, ConfChangeEntry, ConflictPoolEntry};
use crate::rpc::PoolEntry;

/// A speculative pool object
pub type SpObject<C> = Box<dyn SpeculativePoolOp<Entry = CommandEntry<C>> + Send + 'static>;
Expand Down
3 changes: 1 addition & 2 deletions crates/curp/src/server/conflict/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ use std::{cmp::Ordering, sync::Arc};

use curp_external_api::conflict::{ConflictPoolOp, SpeculativePoolOp, UncommittedPoolOp};

use super::{spec_pool_new::SpeculativePool, CommandEntry};
use crate::{
rpc::{ConfChange, PoolEntry, PoolEntryInner, ProposeId},
server::conflict::uncommitted_pool::UncommittedPool,
};

use super::{spec_pool_new::SpeculativePool, CommandEntry};

#[derive(Debug, Default)]
struct TestSp {
entries: Vec<CommandEntry<i32>>,
Expand Down
3 changes: 1 addition & 2 deletions crates/curp/src/server/conflict/uncommitted_pool.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use curp_external_api::conflict::{ConflictPoolOp, UncommittedPoolOp};

use crate::rpc::PoolEntry;

use super::{CommandEntry, ConfChangeEntry, ConflictPoolEntry};
use crate::rpc::PoolEntry;

/// An uncommitted pool object
pub type UcpObject<C> = Box<dyn UncommittedPoolOp<Entry = CommandEntry<C>> + Send + 'static>;
Expand Down
6 changes: 5 additions & 1 deletion crates/curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,11 @@ impl<C: Command, RC: RoleChange> CurpNode<C, RC> {
break;
}
let Some(event) = remove_events.remove(&change.node_id) else {
unreachable!("({:?}) shutdown_event of removed follower ({:x}) should exist", curp.id(), change.node_id);
unreachable!(
"({:?}) shutdown_event of removed follower ({:x}) should exist",
curp.id(),
change.node_id
);
};
event.notify(1);
}
Expand Down
5 changes: 4 additions & 1 deletion crates/curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,10 @@ impl<C: Command, RC: RoleChange> RawCurp<C, RC> {
};

let Some(next_index) = self.lst.get_next_index(follower_id) else {
warn!("follower {} is not found, it maybe has been removed", follower_id);
warn!(
"follower {} is not found, it maybe has been removed",
follower_id
);
return None;
};
let log_r = self.log.read();
Expand Down
2 changes: 1 addition & 1 deletion crates/curp/tests/it/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() {
));

let collection = collection_task.await.unwrap();
sleep_secs(7).await; // wait for the cluster to shutdown
sleep_secs(1).await; // wait for the cluster to shutdown
bsbds marked this conversation as resolved.
Show resolved Hide resolved
assert!(group.is_finished());

let group = CurpGroup::new_rocks(3, tmp_path).await;
Expand Down
1 change: 1 addition & 0 deletions crates/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tokio = { version = "0.2.23", package = "madsim-tokio", features = [
"io-util",
] }
tokio-util = { version = "0.7.8", features = ["io"] }
tracing = "0.1.40"
utils = { path = "../utils" }
uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../../workspace-hack" }
Expand Down
3 changes: 1 addition & 2 deletions crates/engine/src/memory_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ use parking_lot::RwLock;
use tokio::io::AsyncWriteExt;
use tokio_util::io::read_buf;

pub(super) use self::transaction::MemoryTransaction;
use crate::{
api::{engine_api::StorageEngine, snapshot_api::SnapshotApi},
error::EngineError,
WriteOperation,
};

pub(super) use self::transaction::MemoryTransaction;

/// A helper type to store the key-value pairs for the `MemoryEngine`
type MemoryTable = HashMap<Vec<u8>, Vec<u8>>;

Expand Down
113 changes: 70 additions & 43 deletions crates/engine/src/rocksdb_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@ use std::{
use bytes::{Buf, Bytes, BytesMut};
use clippy_utilities::{NumericCast, OverflowArithmetic};
use rocksdb::{
Direction, Error as RocksError, IteratorMode, OptimisticTransactionDB, Options, SstFileWriter,
Direction, Error as RocksError, ErrorKind as RocksErrorKind, IteratorMode,
OptimisticTransactionDB, Options, SstFileWriter,
};
use serde::{Deserialize, Serialize};
use tokio::{fs::File, io::AsyncWriteExt};
use tokio_util::io::read_buf;
use tracing::warn;

pub(super) use self::transaction::RocksTransaction;
use crate::{
api::{engine_api::StorageEngine, snapshot_api::SnapshotApi},
error::EngineError,
WriteOperation,
};

pub(super) use self::transaction::RocksTransaction;

/// Install snapshot chunk size: 64KB
const SNAPSHOT_CHUNK_SIZE: usize = 64 * 1024;

Expand Down Expand Up @@ -200,51 +201,73 @@ impl StorageEngine for RocksEngine {

#[inline]
fn write_batch(&self, wr_ops: Vec<WriteOperation<'_>>, _sync: bool) -> Result<(), EngineError> {
let transaction = self.inner.transaction();
let mut size = 0;
for op in wr_ops {
match op {
WriteOperation::Put { table, key, value } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound(table.to_owned()))?;
size = size.overflow_add(Self::max_write_size(
table.len(),
key.len(),
value.len(),
));
transaction.put_cf(&cf, key, value)?;
let mut retry_interval = 10;
let max_retry_count = 5;
let mut retry_count = 0;
loop {
let transaction = self.inner.transaction();
let mut size = 0;
#[allow(clippy::pattern_type_mismatch)] // can't be fixed
for op in &wr_ops {
match op {
WriteOperation::Put { table, key, value } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound((*table).to_owned()))?;
size = size.overflow_add(Self::max_write_size(
table.len(),
key.len(),
value.len(),
));
transaction.put_cf(&cf, key, value)?;
}
WriteOperation::Delete { table, key } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound((*table).to_owned()))?;
transaction.delete_cf(&cf, key)?;
}
WriteOperation::DeleteRange { table, from, to } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound((*table).to_owned()))?;
let mode = IteratorMode::From(from, Direction::Forward);
let kvs: Vec<_> = transaction
.iterator_cf(&cf, mode)
.take_while(|res| res.as_ref().is_ok_and(|(key, _)| key.as_ref() < *to))
.collect::<Result<Vec<_>, _>>()?;
for (key, _) in kvs {
transaction.delete_cf(&cf, key)?;
}
}
}
WriteOperation::Delete { table, key } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound(table.to_owned()))?;
transaction.delete_cf(&cf, key)?;
}
match transaction.commit() {
Ok(()) => {
_ = self
.size
.fetch_add(size.numeric_cast(), std::sync::atomic::Ordering::Relaxed);
return Ok(());
}
WriteOperation::DeleteRange { table, from, to } => {
let cf = self
.inner
.cf_handle(table)
.ok_or(EngineError::TableNotFound(table.to_owned()))?;
let mode = IteratorMode::From(from, Direction::Forward);
let kvs: Vec<_> = transaction
.iterator_cf(&cf, mode)
.take_while(|res| res.as_ref().is_ok_and(|kv| kv.0.as_ref() < to))
.collect::<Result<Vec<_>, _>>()?;
for (key, _) in kvs {
transaction.delete_cf(&cf, key)?;
Err(err)
if matches!(err.kind(), RocksErrorKind::Busy | RocksErrorKind::TryAgain) =>
{
if retry_count > max_retry_count {
warn!("Oops, txn commit retry count reach the max_retry_count: {max_retry_count}");
return Err(EngineError::UnderlyingError(err.to_string()));
}
warn!("Rocksdb txn commit failed, retrying after {retry_interval}ms");
std::thread::sleep(std::time::Duration::from_millis(retry_interval));
bsbds marked this conversation as resolved.
Show resolved Hide resolved
retry_interval = retry_interval.overflow_mul(2);
retry_count = retry_count.overflow_add(1);
continue;
}
Err(err) => return Err(EngineError::UnderlyingError(err.to_string())),
}
}
transaction.commit()?;
_ = self
.size
.fetch_add(size.numeric_cast(), std::sync::atomic::Ordering::Relaxed);

Ok(())
}

#[inline]
Expand Down Expand Up @@ -480,7 +503,11 @@ impl RocksSnapshot {

/// path of current file
fn current_file_path(&self, tmp: bool) -> PathBuf {
let Some(current_filename) = self.snap_files.get(self.snap_file_idx).map(|sf| &sf.filename) else {
let Some(current_filename) = self
.snap_files
.get(self.snap_file_idx)
.map(|sf| &sf.filename)
else {
unreachable!("this method must be called when self.file_index < self.snap_files.len()")
};
let filename = if tmp {
Expand Down
2 changes: 1 addition & 1 deletion crates/utils/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl TaskManager {
let mut queue = Self::root_tasks_queue(&tasks);
state.store(1, Ordering::Release);
while let Some(v) = queue.pop_front() {
let Some((_name,mut task)) = tasks.remove(&v) else {
let Some((_name, mut task)) = tasks.remove(&v) else {
continue;
};
task.notifier.notify_waiters();
Expand Down
6 changes: 5 additions & 1 deletion crates/xline-client/src/clients/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,11 @@ impl AuthClient {
.await??;
cmd_res.into_inner()
} else {
let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd,self.token.as_ref(),false).await?? else {
let (cmd_res, Some(sync_res)) = self
.curp_client
.propose(&cmd, self.token.as_ref(), false)
.await??
else {
unreachable!("sync_res is always Some when use_fast_path is false");
};
let mut res_wrapper = cmd_res.into_inner();
Expand Down
6 changes: 5 additions & 1 deletion crates/xline-client/src/clients/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,11 @@ impl KvClient {
pub async fn txn(&self, request: TxnRequest) -> Result<TxnResponse> {
let request = RequestWrapper::from(xlineapi::TxnRequest::from(request));
let cmd = Command::new(request.keys(), request);
let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd,self.token.as_ref(), false).await?? else {
let (cmd_res, Some(sync_res)) = self
.curp_client
.propose(&cmd, self.token.as_ref(), false)
.await??
else {
unreachable!("sync_res is always Some when use_fast_path is false");
};
let mut res_wrapper = cmd_res.into_inner();
Expand Down
2 changes: 1 addition & 1 deletion crates/xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ where
let cmd_size = size_estimate::cmd_size(cmd.request());
if self.persistent.estimated_file_size().overflow_add(cmd_size) > self.quota {
let Ok(file_size) = self.persistent.file_size() else {
return false
return false;
};
if file_size.overflow_add(cmd_size) > self.quota {
warn!(
Expand Down
7 changes: 5 additions & 2 deletions crates/xline/src/storage/auth_store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,11 @@ where
if (req.role != ROOT_ROLE) && role.is_err() {
return Err(ExecuteError::RoleNotFound(req.role.clone()));
}
let Err(idx) = user.roles.binary_search(&req.role) else {
return Err(ExecuteError::UserAlreadyHasRole(req.user.clone(), req.role.clone()));
let Err(idx) = user.roles.binary_search(&req.role) else {
return Err(ExecuteError::UserAlreadyHasRole(
req.user.clone(),
req.role.clone(),
));
};
user.roles.insert(idx, req.role.clone());
if let Ok(role) = role {
Expand Down
7 changes: 2 additions & 5 deletions crates/xline/src/storage/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,8 @@ where

/// Get compact revision from db
fn get_compact_revision(&self, revision_key: &str) -> Result<Option<i64>, ExecuteError> {
let Some(revision_bytes)= self.inner
.db
.get_value(META_TABLE, revision_key)?
else {
return Ok(None);
let Some(revision_bytes) = self.inner.db.get_value(META_TABLE, revision_key)? else {
return Ok(None);
};
let bytes = revision_bytes.try_into().map_err(|e| {
ExecuteError::DbError(format!(
Expand Down
4 changes: 2 additions & 2 deletions crates/xline/src/storage/lease_store/lease_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl LeaseCollection {
pub(crate) fn attach(&self, lease_id: i64, key: Vec<u8>) -> Result<(), ExecuteError> {
let mut inner = self.inner.write();
let Some(lease) = inner.lease_map.get_mut(&lease_id) else {
return Err(ExecuteError::LeaseNotFound(lease_id));
return Err(ExecuteError::LeaseNotFound(lease_id));
};
lease.insert_key(key.clone());
let _ignore = inner.item_map.insert(key, lease_id);
Expand All @@ -96,7 +96,7 @@ impl LeaseCollection {
pub(crate) fn detach(&self, lease_id: i64, key: &[u8]) -> Result<(), ExecuteError> {
let mut inner = self.inner.write();
let Some(lease) = inner.lease_map.get_mut(&lease_id) else {
return Err(ExecuteError::LeaseNotFound(lease_id));
return Err(ExecuteError::LeaseNotFound(lease_id));
};
lease.remove_key(key);
let _ignore = inner.item_map.remove(key);
Expand Down
8 changes: 6 additions & 2 deletions crates/xline/src/storage/lease_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,17 @@ mod test {
let _ignore4 = exe_and_sync_req(&lease_store, &req4, -1).await?;
let resp_1 = exe_and_sync_req(&lease_store, &req6, -1).await?;

let ResponseWrapper::LeaseLeasesResponse(leases_1) = resp_1 else { panic!("wrong response type: {resp_1:?}"); };
let ResponseWrapper::LeaseLeasesResponse(leases_1) = resp_1 else {
panic!("wrong response type: {resp_1:?}");
};
assert_eq!(leases_1.leases[0].id, 3);
assert_eq!(leases_1.leases[1].id, 4);

let _ignore5 = exe_and_sync_req(&lease_store, &req5, -1).await?;
let resp_2 = exe_and_sync_req(&lease_store, &req6, -1).await?;
let ResponseWrapper::LeaseLeasesResponse(leases_2) = resp_2 else { panic!("wrong response type: {resp_2:?}"); };
let ResponseWrapper::LeaseLeasesResponse(leases_2) = resp_2 else {
panic!("wrong response type: {resp_2:?}");
};
assert_eq!(leases_2.leases[0].id, 4);

Ok(())
Expand Down
Loading
Loading