From 49bbbf55b12fc3944a6be88ca7c1b7c430e986a1 Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Sun, 2 Jul 2023 17:28:07 +0800 Subject: [PATCH] feat: add conflict check logic for compact Refs: #188 Signed-off-by: Phoeniix Zhao --- xline/src/server/command.rs | 133 ++++++++++++++++++++++++++++++++++-- xlineapi/src/lib.rs | 28 +++++++- 2 files changed, 156 insertions(+), 5 deletions(-) diff --git a/xline/src/server/command.rs b/xline/src/server/command.rs index 59cf406ebf..bed95acd5b 100644 --- a/xline/src/server/command.rs +++ b/xline/src/server/command.rs @@ -481,10 +481,36 @@ impl ConflictCheck for Command { return true; } + if this_req.is_compaction_request() && other_req.is_compaction_request() { + return true; + } + + if (this_req.is_txn_request() && other_req.is_compaction_request()) + || (this_req.is_compaction_request() && other_req.is_txn_request()) + { + let conflict = match (this_req.clone(), other_req.clone()) { + ( + RequestWrapper::CompactionRequest(ref com_req), + RequestWrapper::TxnRequest(ref txn_req), + ) + | ( + RequestWrapper::TxnRequest(ref txn_req), + RequestWrapper::CompactionRequest(ref com_req), + ) => { + let target_revision = com_req.revision; + txn_req.is_conflict_with_rev(target_revision) + } + _ => false, + }; + if conflict { + return true; + } + } + let this_lease_ids = get_lease_ids(this_req); let other_lease_ids = get_lease_ids(other_req); let lease_conflict = !this_lease_ids.is_disjoint(&other_lease_ids); - let key_conflict = self + let key_conflict: bool = self .keys() .iter() .cartesian_product(other.keys().iter()) @@ -579,8 +605,8 @@ impl CurpCommand for Command { mod test { use super::*; use crate::rpc::{ - AuthEnableRequest, AuthStatusRequest, LeaseGrantRequest, LeaseLeasesRequest, - LeaseRevokeRequest, PutRequest, RequestOp, TxnRequest, + AuthEnableRequest, AuthStatusRequest, CompactionRequest, LeaseGrantRequest, + LeaseLeasesRequest, LeaseRevokeRequest, PutRequest, RangeRequest, RequestOp, TxnRequest, }; #[test] @@ -673,7 +699,7 @@ mod test { lease: 123, ..Default::default() })), - ProposeId::new("id5".to_owned()), + ProposeId::new("id8".to_owned()), ); let txn_with_lease_id_cmd = Command::new( vec![KeyRange::new_one_key("key")], @@ -697,6 +723,8 @@ mod test { ProposeId::new("id4".to_owned()), ); + + assert!(lease_grant_cmd.is_conflict(&put_with_lease_cmd)); // lease id assert!(lease_grant_cmd.is_conflict(&txn_with_lease_id_cmd)); // lease id assert!(put_with_lease_cmd.is_conflict(&txn_with_lease_id_cmd)); // lease id @@ -707,5 +735,102 @@ mod test { assert!(cmd5.is_conflict(&cmd6)); // lease id assert!(lease_leases_cmd.is_conflict(&cmd5)); // lease read and write assert!(cmd6.is_conflict(&lease_leases_cmd)); // lease read and write + + } + + #[test] + fn test_compaction_txn_conflict() { + + let compaction_cmd_1 = Command::new( + vec![], + RequestWithToken::new(RequestWrapper::CompactionRequest(CompactionRequest { + revision: 3, + physical: false, + })), + ProposeId::new("id11".to_owned()), + ); + + let compaction_cmd_2 = Command::new( + vec![], + RequestWithToken::new(RequestWrapper::CompactionRequest(CompactionRequest { + revision: 5, + physical: false, + })), + ProposeId::new("id12".to_owned()), + ); + + let txn_with_lease_id_cmd = Command::new( + vec![KeyRange::new_one_key("key")], + RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest { + compare: vec![], + success: vec![RequestOp { + request: Some(Request::RequestPut(PutRequest { + key: b"key".to_vec(), + value: b"value".to_vec(), + lease: 123, + ..Default::default() + })), + }], + failure: vec![], + })), + ProposeId::new("id6".to_owned()), + ); + + let txn_cmd_1 = Command::new( + vec![KeyRange::new_one_key("key")], + RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest { + compare: vec![], + success: vec![RequestOp { + request: Some(Request::RequestRange(RangeRequest { + key: b"key".to_vec(), + ..Default::default() + })), + }], + failure: vec![], + })), + ProposeId::new("id13".to_owned()), + ); + + let txn_cmd_2 = Command::new( + vec![KeyRange::new_one_key("key")], + RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest { + compare: vec![], + success: vec![RequestOp { + request: Some(Request::RequestRange(RangeRequest { + key: b"key".to_vec(), + revision: 3, + ..Default::default() + })), + }], + failure: vec![], + })), + ProposeId::new("id14".to_owned()), + ); + + let txn_cmd_3 = Command::new( + vec![KeyRange::new_one_key("key")], + RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest { + compare: vec![], + success: vec![RequestOp { + request: Some(Request::RequestRange(RangeRequest { + key: b"key".to_vec(), + revision: 7, + ..Default::default() + })), + }], + failure: vec![], + })), + ProposeId::new("id15".to_owned()), + ); + + assert!(compaction_cmd_1.is_conflict(&compaction_cmd_2)); + assert!(compaction_cmd_2.is_conflict(&compaction_cmd_1)); + assert!(!compaction_cmd_1.is_conflict(&txn_with_lease_id_cmd)); + assert!(!compaction_cmd_2.is_conflict(&txn_with_lease_id_cmd)); + + assert!(!compaction_cmd_2.is_conflict(&txn_cmd_1)); + assert!(compaction_cmd_2.is_conflict(&txn_cmd_2)); + assert!(!compaction_cmd_2.is_conflict(&txn_cmd_3)); + } } diff --git a/xlineapi/src/lib.rs b/xlineapi/src/lib.rs index 51ebe8f2dc..933715ecd3 100644 --- a/xlineapi/src/lib.rs +++ b/xlineapi/src/lib.rs @@ -457,7 +457,9 @@ impl RequestWrapper { /// Check whether the kv request or lease request should skip the revision or not pub fn skip_general_revision(&self) -> bool { match self { - RequestWrapper::RangeRequest(_) | RequestWrapper::LeaseGrantRequest(_) => true, + RequestWrapper::RangeRequest(_) + | RequestWrapper::LeaseGrantRequest(_) + | RequestWrapper::CompactionRequest(_) => true, RequestWrapper::TxnRequest(req) => req.is_read_only(), _ => false, } @@ -473,6 +475,14 @@ impl RequestWrapper { self.backend() == RequestBackend::Kv } + pub fn is_compaction_request(&self) -> bool { + matches!(*self, RequestWrapper::CompactionRequest(_)) + } + + pub fn is_txn_request(&self) -> bool { + matches!(*self, RequestWrapper::TxnRequest(_)) + } + pub fn is_lease_read_request(&self) -> bool { matches!(*self, RequestWrapper::LeaseLeasesRequest(_)) } @@ -659,6 +669,22 @@ impl TxnRequest { }; self.success.iter().all(read_only_checker) && self.failure.iter().all(read_only_checker) } + + /// Checks whether a `TxnRequest` is conflict with a given revision + pub fn is_conflict_with_rev(&self, revision: i64) -> bool { + let conflict_checker = |req: &RequestOp| { + if let Some(ref request) = req.request { + match request { + Request::RequestRange(req) => req.revision > 0 && req.revision < revision, + Request::RequestDeleteRange(_) | Request::RequestPut(_) => false, + Request::RequestTxn(req) => req.is_conflict_with_rev(revision), + } + } else { + false + } + }; + self.success.iter().any(conflict_checker) || self.failure.iter().any(conflict_checker) + } } #[cfg(test)]