diff --git a/simulation/src/curp_group.rs b/simulation/src/curp_group.rs
index 58a7d6394..4b9d990f1 100644
--- a/simulation/src/curp_group.rs
+++ b/simulation/src/curp_group.rs
@@ -257,12 +257,13 @@ impl CurpGroup {
     }
 
     pub async fn get_leader(&self) -> (ServerId, u64) {
+        const RETRY_INTERVAL: u64 = 100;
         loop {
             if let Some(leader) = self.try_get_leader().await {
                 return leader;
             }
             debug!("failed to get leader");
-            madsim::time::sleep(Duration::from_millis(100)).await;
+            madsim::time::sleep(Duration::from_millis(RETRY_INTERVAL)).await;
         }
     }
 
diff --git a/utils/src/config.rs b/utils/src/config.rs
index d721d2e43..01bf30809 100644
--- a/utils/src/config.rs
+++ b/utils/src/config.rs
@@ -129,10 +129,10 @@ pub struct CompactConfig {
     #[getset(get = "pub")]
     #[serde(default = "default_compact_batch_size")]
     compact_batch_size: usize,
-    /// The interval between two compact operations
+    /// The interval between two compaction batches
     #[getset(get = "pub")]
-    #[serde(with = "duration_format", default = "default_compact_interval")]
-    compact_interval: Duration,
+    #[serde(with = "duration_format", default = "default_compact_sleep_interval")]
+    compact_sleep_interval: Duration,
 }
 
 impl Default for CompactConfig {
@@ -140,7 +140,7 @@ fn default() -> Self {
         Self {
             compact_batch_size: default_compact_batch_size(),
-            compact_interval: default_compact_interval(),
+            compact_sleep_interval: default_compact_sleep_interval(),
         }
     }
 }
@@ -149,10 +149,10 @@ impl CompactConfig {
     /// Create a new compact config
     #[must_use]
     #[inline]
-    pub fn new(compact_batch_size: usize, compact_interval: Duration) -> Self {
+    pub fn new(compact_batch_size: usize, compact_sleep_interval: Duration) -> Self {
         Self {
             compact_batch_size,
-            compact_interval,
+            compact_sleep_interval,
         }
     }
 }
@@ -167,7 +167,7 @@ pub const fn default_compact_batch_size() -> usize {
 /// default compact interval
 #[must_use]
 #[inline]
-pub const fn default_compact_interval() -> Duration {
+pub const fn default_compact_sleep_interval() -> Duration {
     Duration::from_millis(10)
 }
 
@@ -749,7 +749,7 @@ mod tests {
 
             [compact]
             compact_batch_size = 123
-            compact_interval = '5ms'
+            compact_sleep_interval = '5ms'
 
             [log]
             path = '/var/log/xline'
@@ -825,7 +825,7 @@ mod tests {
             config.compact,
             CompactConfig {
                 compact_batch_size: 123,
-                compact_interval: Duration::from_millis(5)
+                compact_sleep_interval: Duration::from_millis(5)
             }
         );
     }
diff --git a/xline/src/main.rs b/xline/src/main.rs
index 0403f2fc9..c3a259b1d 100644
--- a/xline/src/main.rs
+++ b/xline/src/main.rs
@@ -152,7 +152,7 @@ use utils::{
     config::{
         default_batch_max_size, default_batch_timeout, default_candidate_timeout_ticks,
         default_client_wait_synced_timeout, default_cmd_workers, default_compact_batch_size,
-        default_compact_interval, default_follower_timeout_ticks, default_gc_interval,
+        default_compact_sleep_interval, default_follower_timeout_ticks, default_gc_interval,
         default_heartbeat_interval, default_log_entries_cap, default_log_level,
         default_propose_timeout, default_range_retry_timeout, default_retry_timeout,
         default_rotation, default_rpc_timeout, default_server_wait_synced_timeout,
@@ -268,7 +268,7 @@ struct ServerArgs {
     compact_batch_size: usize,
     /// Interval between two compaction operations [default: 10ms]
    #[clap(long, value_parser = parse_duration)]
-    compact_interval: Option<Duration>,
+    compact_sleep_interval: Option<Duration>,
 }
 
 impl From<ServerArgs> for XlineServerConfig {
@@ -337,8 +337,8 @@ impl From<ServerArgs> for XlineServerConfig {
         let auth = AuthConfig::new(args.auth_public_key, args.auth_private_key);
         let compact = CompactConfig::new(
             args.compact_batch_size,
-            args.compact_interval
-                .unwrap_or_else(default_compact_interval),
+            args.compact_sleep_interval
+                .unwrap_or_else(default_compact_sleep_interval),
         );
         XlineServerConfig::new(cluster, storage, log, trace, auth, compact)
     }
diff --git a/xline/src/server/command.rs b/xline/src/server/command.rs
index 745bf0149..d52adf42c 100644
--- a/xline/src/server/command.rs
+++ b/xline/src/server/command.rs
@@ -488,29 +488,26 @@ impl ConflictCheck for Command {
         if (this_req.is_txn_request() && other_req.is_compaction_request())
             || (this_req.is_compaction_request() && other_req.is_txn_request())
         {
-            let conflict = match (this_req.clone(), other_req.clone()) {
+            match (this_req, other_req) {
                 (
-                    RequestWrapper::CompactionRequest(ref com_req),
-                    RequestWrapper::TxnRequest(ref txn_req),
+                    &RequestWrapper::CompactionRequest(ref com_req),
+                    &RequestWrapper::TxnRequest(ref txn_req),
                 )
                 | (
-                    RequestWrapper::TxnRequest(ref txn_req),
-                    RequestWrapper::CompactionRequest(ref com_req),
+                    &RequestWrapper::TxnRequest(ref txn_req),
+                    &RequestWrapper::CompactionRequest(ref com_req),
                 ) => {
                     let target_revision = com_req.revision;
-                    txn_req.is_conflict_with_rev(target_revision)
+                    return txn_req.is_conflict_with_rev(target_revision)
                 }
-                _ => false,
-            };
-            if conflict {
-                return true;
+                _ => unreachable!("The request must be either a transaction or a compaction request! \nthis_req = {this_req:?} \nother_req = {other_req:?}")
             }
         }
 
         let this_lease_ids = get_lease_ids(this_req);
         let other_lease_ids = get_lease_ids(other_req);
         let lease_conflict = !this_lease_ids.is_disjoint(&other_lease_ids);
-        let key_conflict: bool = self
+        let key_conflict = self
             .keys()
             .iter()
             .cartesian_product(other.keys().iter())
@@ -603,6 +600,8 @@ impl CurpCommand for Command {
 
 #[cfg(test)]
 mod test {
+    use xlineapi::Compare;
+
     use super::*;
     use crate::rpc::{
         AuthEnableRequest, AuthStatusRequest, CompactionRequest, LeaseGrantRequest,
@@ -699,7 +698,7 @@ mod test {
                 lease: 123,
                 ..Default::default()
             })),
-            ProposeId::new("id8".to_owned()),
+            ProposeId::new("id5".to_owned()),
         );
         let txn_with_lease_id_cmd = Command::new(
             vec![KeyRange::new_one_key("key")],
@@ -735,6 +734,24 @@ mod test {
         assert!(cmd6.is_conflict(&lease_leases_cmd)); // lease read and write
     }
 
+    fn generate_txn_command(
+        keys: Vec<KeyRange>,
+        compare: Vec<Compare>,
+        success: Vec<RequestOp>,
+        failure: Vec<RequestOp>,
+        propose_id: &str,
+    ) -> Command {
+        Command::new(
+            keys,
+            RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
+                compare,
+                success,
+                failure,
+            })),
+            ProposeId::new(propose_id.to_owned()),
+        )
+    }
+
     #[test]
     fn test_compaction_txn_conflict() {
         let compaction_cmd_1 = Command::new(
@@ -755,68 +772,60 @@ mod test {
             ProposeId::new("id12".to_owned()),
         );
-        let txn_with_lease_id_cmd = Command::new(
+        let txn_with_lease_id_cmd = generate_txn_command(
             vec![KeyRange::new_one_key("key")],
-            RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
-                compare: vec![],
-                success: vec![RequestOp {
-                    request: Some(Request::RequestPut(PutRequest {
-                        key: b"key".to_vec(),
-                        value: b"value".to_vec(),
-                        lease: 123,
-                        ..Default::default()
-                    })),
-                }],
-                failure: vec![],
-            })),
-            ProposeId::new("id6".to_owned()),
+            vec![],
+            vec![RequestOp {
+                request: Some(Request::RequestPut(PutRequest {
+                    key: b"key".to_vec(),
+                    value: b"value".to_vec(),
+                    lease: 123,
+                    ..Default::default()
+                })),
+            }],
+            vec![],
+            "id6",
         );
 
-        let txn_cmd_1 = Command::new(
+        let txn_cmd_1 = generate_txn_command(
             vec![KeyRange::new_one_key("key")],
-            RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
-                compare: vec![],
-                success: vec![RequestOp {
-                    request: Some(Request::RequestRange(RangeRequest {
-                        key: b"key".to_vec(),
-                        ..Default::default()
-                    })),
-                }],
-                failure: vec![],
-            })),
-            ProposeId::new("id13".to_owned()),
+            vec![],
+            vec![RequestOp {
+                request: Some(Request::RequestRange(RangeRequest {
+                    key: b"key".to_vec(),
+                    ..Default::default()
+                })),
+            }],
+            vec![],
+            "id13",
         );
-        let txn_cmd_2 = Command::new(
+        let txn_cmd_2 = generate_txn_command(
             vec![KeyRange::new_one_key("key")],
-            RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
-                compare: vec![],
-                success: vec![RequestOp {
-                    request: Some(Request::RequestRange(RangeRequest {
-                        key: b"key".to_vec(),
-                        revision: 3,
-                        ..Default::default()
-                    })),
-                }],
-                failure: vec![],
-            })),
-            ProposeId::new("id14".to_owned()),
+            vec![],
+            vec![RequestOp {
+                request: Some(Request::RequestRange(RangeRequest {
+                    key: b"key".to_vec(),
+                    revision: 3,
+                    ..Default::default()
+                })),
+            }],
+            vec![],
+            "id14",
         );
-        let txn_cmd_3 = Command::new(
+        let txn_cmd_3 = generate_txn_command(
             vec![KeyRange::new_one_key("key")],
-            RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
-                compare: vec![],
-                success: vec![RequestOp {
-                    request: Some(Request::RequestRange(RangeRequest {
-                        key: b"key".to_vec(),
-                        revision: 7,
-                        ..Default::default()
-                    })),
-                }],
-                failure: vec![],
-            })),
-            ProposeId::new("id15".to_owned()),
+            vec![],
+            vec![RequestOp {
+                request: Some(Request::RequestRange(RangeRequest {
+                    key: b"key".to_vec(),
+                    revision: 7,
+                    ..Default::default()
+                })),
+            }],
+            vec![],
+            "id15",
         );
 
         assert!(compaction_cmd_1.is_conflict(&compaction_cmd_2));
diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs
index 7864930ba..84dd2c933 100644
--- a/xline/src/server/kv_server.rs
+++ b/xline/src/server/kv_server.rs
@@ -9,6 +9,7 @@
 use futures::future::join_all;
 use tokio::time::timeout;
 use tracing::{debug, instrument};
 use uuid::Uuid;
+use xlineapi::ResponseWrapper;
 
 use super::{
     auth_server::get_token,
@@ -211,16 +212,23 @@ where
                 "required revision {} is higher than current revision {}",
                 req.revision, current_revision
             )))
-        } else if req.revision < compacted_revision {
-            Err(tonic::Status::invalid_argument(format!(
-                "required revision {} has been compacted, compacted revision is {}",
-                req.revision, compacted_revision
-            )))
         } else {
-            Ok(())
+            Self::check_range_compacted(req.revision, compacted_revision)
         }
     }
 
+    /// Check whether the required revision is compacted or not
+    fn check_range_compacted(
+        range_revision: i64,
+        compacted_revision: i64,
+    ) -> Result<(), tonic::Status> {
+        (range_revision >= compacted_revision)
+            .then_some(())
+            .ok_or(tonic::Status::invalid_argument(format!(
+                "required revision {range_revision} has been compacted, compacted revision is {compacted_revision}"
+            )))
+    }
+
     /// Validate compact request before handle
     fn check_compact_request(
         req: &CompactionRequest,
@@ -439,6 +447,7 @@ where
             self.kv_storage.compacted_revision(),
             self.kv_storage.revision(),
         )?;
+        let range_required_revision = range_req.revision;
         let is_serializable = range_req.serializable;
         let token = get_token(request.metadata());
         let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token);
@@ -446,6 +455,13 @@ where
         let cmd = Self::command_from_request_wrapper(propose_id, wrapper);
         if !is_serializable {
             self.wait_read_state(&cmd).await?;
+            // Double-check whether the required revision has been compacted, since a compaction request
+            // may be executed while we are in `wait_read_state`, which would make the result of
+            // the previous `check_range_request` outdated.
+            Self::check_range_compacted(
+                range_required_revision,
+                self.kv_storage.compacted_revision(),
+            )?;
         }
         self.serializable_range(cmd.request())
     }
@@ -547,18 +563,23 @@ where
         debug!("Receive CompactionRequest {:?}", request);
         let compacted_revision = self.kv_storage.compacted_revision();
         let current_revision = self.kv_storage.revision();
-        Self::check_compact_request(request.get_ref(), compacted_revision, current_revision)?;
-        Err(tonic::Status::new(
-            tonic::Code::Unimplemented,
-            "Not Implemented".to_owned(),
-        ))
+        let req = request.get_ref();
+        Self::check_compact_request(req, compacted_revision, current_revision)?;
+
+        let is_fast_path = !req.physical;
+        let (cmd_res, _sync_res) = self.propose(request, is_fast_path).await?;
+        let resp = cmd_res.decode();
+
+        if let ResponseWrapper::CompactionResponse(response) = resp {
+            Ok(tonic::Response::new(response))
+        } else {
+            panic!("Receive wrong response {resp:?} for CompactionRequest");
+        }
     }
 }
 
 #[cfg(test)]
 mod test {
-    use test_macros::abort_on_panic;
-
     use super::*;
     use crate::storage::db::DB;
diff --git a/xline/src/server/watch_server.rs b/xline/src/server/watch_server.rs
index 7731356ea..bc6af9865 100644
--- a/xline/src/server/watch_server.rs
+++ b/xline/src/server/watch_server.rs
@@ -806,10 +806,15 @@ mod test {
         for i in 0..3 {
             let watch_res = res_rx.recv().await.unwrap();
             if i == 0 {
-                assert!(
-                    watch_res.is_err(),
-                    "watch create request with a compacted revision should not be successful"
-                );
+                if let Err(e) = watch_res {
+                    assert_eq!(e.code(), tonic::Code::InvalidArgument,
+                        "watching a compacted revision should return an invalid_argument error, but found {e:?}"
+                    );
+                } else {
+                    unreachable!(
+                        "watch create request with a compacted revision should not be successful"
+                    )
+                }
             } else {
                 assert!(
                     watch_res.is_ok(),
diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs
index a0f40c883..294e37f2e 100644
--- a/xline/src/server/xline_server.rs
+++ b/xline/src/server/xline_server.rs
@@ -140,7 +140,7 @@ impl XlineServer {
             Arc::clone(&kv_storage),
             Arc::clone(&index),
             *self.compact_cfg.compact_batch_size(),
-            *self.compact_cfg.compact_interval(),
+            *self.compact_cfg.compact_sleep_interval(),
             compact_task_rx,
         ));
         // TODO: Boot up the compact policy scheduler
diff --git a/xline/src/storage/compact.rs b/xline/src/storage/compact.rs
index ae14fc263..03ac75200 100644
--- a/xline/src/storage/compact.rs
+++ b/xline/src/storage/compact.rs
@@ -25,6 +25,7 @@ pub(crate) async fn compactor(
             .into_iter()
             .map(|key_rev| key_rev.as_revision().encode_to_vec())
             .collect::<Vec<Vec<u8>>>();
+        // Given that Xline uses an LSM-tree database with low write amplification as its storage backend, is progressive compaction really effective at improving performance?
         for revision_chunk in target_revisions.chunks(batch_limit) {
             if let Err(e) = kv_store.compact(revision_chunk) {
                 panic!("failed to compact revision chunk {revision_chunk:?} due to {e}");
diff --git a/xline/src/storage/kv_store.rs b/xline/src/storage/kv_store.rs
index fd96758be..97cb9b6f7 100644
--- a/xline/src/storage/kv_store.rs
+++ b/xline/src/storage/kv_store.rs
@@ -126,6 +126,10 @@ where
                     "cannot decode compacted revision from META_TABLE: {e:?}"
                 ))
             })?);
+            assert!(
+                compacted_revision >= -1 && compacted_revision <= current_rev,
+                "compacted revision corruption: {compacted_revision} must be in the range [-1, {current_rev}]"
+            );
             self.update_compacted_revision(compacted_revision);
             if let Err(e) = self.compact_task_tx.send((compacted_revision, None)) {
                 panic!("the compactor exited unexpectedly: {e:?}");
@@ -644,6 +648,7 @@ where
     ) -> Result<(Vec<WriteOp>, Vec<Event>), ExecuteError> {
         let revision = req.revision;
         let ops = vec![WriteOp::PutCompactRevision(revision)];
+        // TODO: Remove the physical processing logic here. It's better to move it into the KvServer.
        #[allow(clippy::collapsible_else_if)]
         if req.physical {
             let event = Arc::new(event_listener::Event::new());
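For reviewers who want to try the new compacted-revision guard in isolation, here is a minimal standalone sketch. It is not Xline's actual module: `String` stands in for `tonic::Status` and the `main` assertions are hypothetical, but the guard mirrors the `range_revision >= compacted_revision` rule that `check_range_compacted` applies both before and after `wait_read_state` in the diff above.

```rust
// Standalone sketch of the compacted-revision guard; `String` replaces the
// `tonic::Status::invalid_argument` error used by the real `check_range_compacted`.
fn check_range_compacted(range_revision: i64, compacted_revision: i64) -> Result<(), String> {
    (range_revision >= compacted_revision)
        .then_some(())
        .ok_or(format!(
            "required revision {range_revision} has been compacted, compacted revision is {compacted_revision}"
        ))
}

fn main() {
    // A range at revision 5 against a store compacted to revision 3 is still servable.
    assert!(check_range_compacted(5, 3).is_ok());
    // A range at revision 2 falls below the compacted revision and must be rejected.
    assert!(check_range_compacted(2, 3).is_err());
}
```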