Skip to content

Commit

Permalink
feat: add conflict check logic for compact
Browse files Browse the repository at this point in the history
Refs: #188
Signed-off-by: Phoeniix Zhao <Phoenix500526@163.com>
  • Loading branch information
Phoenix500526 committed Jul 2, 2023
1 parent 6477d23 commit 49bbbf5
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 5 deletions.
133 changes: 129 additions & 4 deletions xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,36 @@ impl ConflictCheck for Command {
return true;
}

if this_req.is_compaction_request() && other_req.is_compaction_request() {
return true;
}

if (this_req.is_txn_request() && other_req.is_compaction_request())
|| (this_req.is_compaction_request() && other_req.is_txn_request())
{
let conflict = match (this_req.clone(), other_req.clone()) {
(
RequestWrapper::CompactionRequest(ref com_req),
RequestWrapper::TxnRequest(ref txn_req),
)
| (
RequestWrapper::TxnRequest(ref txn_req),
RequestWrapper::CompactionRequest(ref com_req),
) => {
let target_revision = com_req.revision;
txn_req.is_conflict_with_rev(target_revision)
}
_ => false,
};
if conflict {
return true;
}
}

let this_lease_ids = get_lease_ids(this_req);
let other_lease_ids = get_lease_ids(other_req);
let lease_conflict = !this_lease_ids.is_disjoint(&other_lease_ids);
let key_conflict = self
let key_conflict: bool = self
.keys()
.iter()
.cartesian_product(other.keys().iter())
Expand Down Expand Up @@ -579,8 +605,8 @@ impl CurpCommand for Command {
mod test {
use super::*;
use crate::rpc::{
AuthEnableRequest, AuthStatusRequest, LeaseGrantRequest, LeaseLeasesRequest,
LeaseRevokeRequest, PutRequest, RequestOp, TxnRequest,
AuthEnableRequest, AuthStatusRequest, CompactionRequest, LeaseGrantRequest,
LeaseLeasesRequest, LeaseRevokeRequest, PutRequest, RangeRequest, RequestOp, TxnRequest,
};

#[test]
Expand Down Expand Up @@ -673,7 +699,7 @@ mod test {
lease: 123,
..Default::default()
})),
ProposeId::new("id5".to_owned()),
ProposeId::new("id8".to_owned()),
);
let txn_with_lease_id_cmd = Command::new(
vec![KeyRange::new_one_key("key")],
Expand All @@ -697,6 +723,8 @@ mod test {
ProposeId::new("id4".to_owned()),
);



assert!(lease_grant_cmd.is_conflict(&put_with_lease_cmd)); // lease id
assert!(lease_grant_cmd.is_conflict(&txn_with_lease_id_cmd)); // lease id
assert!(put_with_lease_cmd.is_conflict(&txn_with_lease_id_cmd)); // lease id
Expand All @@ -707,5 +735,102 @@ mod test {
assert!(cmd5.is_conflict(&cmd6)); // lease id
assert!(lease_leases_cmd.is_conflict(&cmd5)); // lease read and write
assert!(cmd6.is_conflict(&lease_leases_cmd)); // lease read and write

}

#[test]
fn test_compaction_txn_conflict() {

let compaction_cmd_1 = Command::new(
vec![],
RequestWithToken::new(RequestWrapper::CompactionRequest(CompactionRequest {
revision: 3,
physical: false,
})),
ProposeId::new("id11".to_owned()),
);

let compaction_cmd_2 = Command::new(
vec![],
RequestWithToken::new(RequestWrapper::CompactionRequest(CompactionRequest {
revision: 5,
physical: false,
})),
ProposeId::new("id12".to_owned()),
);

let txn_with_lease_id_cmd = Command::new(
vec![KeyRange::new_one_key("key")],
RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
compare: vec![],
success: vec![RequestOp {
request: Some(Request::RequestPut(PutRequest {
key: b"key".to_vec(),
value: b"value".to_vec(),
lease: 123,
..Default::default()
})),
}],
failure: vec![],
})),
ProposeId::new("id6".to_owned()),
);

let txn_cmd_1 = Command::new(
vec![KeyRange::new_one_key("key")],
RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
compare: vec![],
success: vec![RequestOp {
request: Some(Request::RequestRange(RangeRequest {
key: b"key".to_vec(),
..Default::default()
})),
}],
failure: vec![],
})),
ProposeId::new("id13".to_owned()),
);

let txn_cmd_2 = Command::new(
vec![KeyRange::new_one_key("key")],
RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
compare: vec![],
success: vec![RequestOp {
request: Some(Request::RequestRange(RangeRequest {
key: b"key".to_vec(),
revision: 3,
..Default::default()
})),
}],
failure: vec![],
})),
ProposeId::new("id14".to_owned()),
);

let txn_cmd_3 = Command::new(
vec![KeyRange::new_one_key("key")],
RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
compare: vec![],
success: vec![RequestOp {
request: Some(Request::RequestRange(RangeRequest {
key: b"key".to_vec(),
revision: 7,
..Default::default()
})),
}],
failure: vec![],
})),
ProposeId::new("id15".to_owned()),
);

assert!(compaction_cmd_1.is_conflict(&compaction_cmd_2));
assert!(compaction_cmd_2.is_conflict(&compaction_cmd_1));
assert!(!compaction_cmd_1.is_conflict(&txn_with_lease_id_cmd));
assert!(!compaction_cmd_2.is_conflict(&txn_with_lease_id_cmd));

assert!(!compaction_cmd_2.is_conflict(&txn_cmd_1));
assert!(compaction_cmd_2.is_conflict(&txn_cmd_2));
assert!(!compaction_cmd_2.is_conflict(&txn_cmd_3));

}
}
28 changes: 27 additions & 1 deletion xlineapi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,9 @@ impl RequestWrapper {
/// Check whether the kv request or lease request should skip the revision or not
pub fn skip_general_revision(&self) -> bool {
match self {
RequestWrapper::RangeRequest(_) | RequestWrapper::LeaseGrantRequest(_) => true,
RequestWrapper::RangeRequest(_)
| RequestWrapper::LeaseGrantRequest(_)
| RequestWrapper::CompactionRequest(_) => true,
RequestWrapper::TxnRequest(req) => req.is_read_only(),
_ => false,
}
Expand All @@ -473,6 +475,14 @@ impl RequestWrapper {
self.backend() == RequestBackend::Kv
}

pub fn is_compaction_request(&self) -> bool {
matches!(*self, RequestWrapper::CompactionRequest(_))
}

pub fn is_txn_request(&self) -> bool {
matches!(*self, RequestWrapper::TxnRequest(_))
}

pub fn is_lease_read_request(&self) -> bool {
matches!(*self, RequestWrapper::LeaseLeasesRequest(_))
}
Expand Down Expand Up @@ -659,6 +669,22 @@ impl TxnRequest {
};
self.success.iter().all(read_only_checker) && self.failure.iter().all(read_only_checker)
}

/// Checks whether a `TxnRequest` is conflict with a given revision
pub fn is_conflict_with_rev(&self, revision: i64) -> bool {
let conflict_checker = |req: &RequestOp| {
if let Some(ref request) = req.request {
match request {
Request::RequestRange(req) => req.revision > 0 && req.revision < revision,
Request::RequestDeleteRange(_) | Request::RequestPut(_) => false,
Request::RequestTxn(req) => req.is_conflict_with_rev(revision),
}
} else {
false
}
};
self.success.iter().any(conflict_checker) || self.failure.iter().any(conflict_checker)
}
}

#[cfg(test)]
Expand Down

0 comments on commit 49bbbf5

Please sign in to comment.