From 1353cf42e0e6b979cd24fefd515384fd3cad078e Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Thu, 20 Jul 2023 17:35:00 +0800 Subject: [PATCH 1/3] feat: implement compaction in `xline-client` Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- xline-client/src/clients/kv.rs | 35 +++++++++++++++++++++++-- xline-client/src/types/kv.rs | 48 ++++++++++++++++++++++++++++++++++ xline-client/tests/kv.rs | 41 ++++++++++++++++++++++++++++- 3 files changed, 121 insertions(+), 3 deletions(-) diff --git a/xline-client/src/clients/kv.rs b/xline-client/src/clients/kv.rs index fc1af4e81..7305934a5 100644 --- a/xline-client/src/clients/kv.rs +++ b/xline-client/src/clients/kv.rs @@ -3,11 +3,14 @@ use std::sync::Arc; use curp::{client::Client as CurpClient, cmd::ProposeId}; use uuid::Uuid; use xline::server::{Command, KeyRange}; -use xlineapi::{DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken, TxnResponse}; +use xlineapi::{ + CompactionResponse, DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken, + TxnResponse, +}; use crate::{ error::Result, - types::kv::{DeleteRangeRequest, PutRequest, RangeRequest, Txn}, + types::kv::{CompactionRequest, DeleteRangeRequest, PutRequest, RangeRequest, Txn}, }; /// Client for KV operations. @@ -111,6 +114,34 @@ impl KvClient { Ok(res_wrapper.into()) } + /// Send `CompactionRequest` by `CurpClient` + /// + /// # Errors + /// + /// If `CurpClient` failed to send request + #[inline] + pub async fn compact(&self, request: CompactionRequest) -> Result { + let use_fast_path = request.physical(); + let propose_id = self.generate_propose_id(); + let request = RequestWithToken::new_with_token( + xlineapi::CompactionRequest::from(request).into(), + self.token.clone(), + ); + let cmd = Command::new(vec![], request, propose_id); + + let res_wrapper = if use_fast_path { + let cmd_res = self.curp_client.propose(cmd).await?; + cmd_res.decode() + } else { + let (cmd_res, sync_res) = self.curp_client.propose_indexed(cmd).await?; + let mut res_wrapper = cmd_res.decode(); + res_wrapper.update_revision(sync_res.revision()); + res_wrapper + }; + + Ok(res_wrapper.into()) + } + /// Generate a new `ProposeId` fn generate_propose_id(&self) -> ProposeId { ProposeId::new(format!("{}-{}", self.name, Uuid::new_v4())) diff --git a/xline-client/src/types/kv.rs b/xline-client/src/types/kv.rs index 3694a8fb7..95ba95e21 100644 --- a/xline-client/src/types/kv.rs +++ b/xline-client/src/types/kv.rs @@ -674,3 +674,51 @@ impl From for xlineapi::TxnRequest { txn.req } } + +/// Compaction Request compacts the key-value store up to a given revision. +/// All superseded keys with a revision less than the compaction revision will be removed. +#[derive(Debug)] +pub struct CompactionRequest { + /// The inner request + inner: xlineapi::CompactionRequest, +} + +impl CompactionRequest { + /// Creates a new `CompactionRequest` + /// + /// `Revision` is the key-value store revision for the compaction operation. + #[inline] + #[must_use] + pub fn new(revision: i64) -> Self { + Self { + inner: xlineapi::CompactionRequest { + revision, + ..Default::default() + }, + } + } + + /// Physical is set so the RPC will wait until the compaction is physically + /// applied to the local database such that compacted entries are totally + /// removed from the backend database. + #[inline] + #[must_use] + pub fn with_physical(mut self) -> Self { + self.inner.physical = true; + self + } + + /// Get `physical` + #[inline] + #[must_use] + pub fn physical(&self) -> bool { + self.inner.physical + } +} + +impl From for xlineapi::CompactionRequest { + #[inline] + fn from(req: CompactionRequest) -> Self { + req.inner + } +} diff --git a/xline-client/tests/kv.rs b/xline-client/tests/kv.rs index efabf66d1..0b3c1d174 100644 --- a/xline-client/tests/kv.rs +++ b/xline-client/tests/kv.rs @@ -3,7 +3,9 @@ use common::get_cluster_client; use test_macros::abort_on_panic; use xline_client::{ error::Result, - types::kv::{Compare, DeleteRangeRequest, PutRequest, RangeRequest, Txn, TxnOp}, + types::kv::{ + CompactionRequest, Compare, DeleteRangeRequest, PutRequest, RangeRequest, Txn, TxnOp, + }, }; use xlineapi::CompareResult; @@ -232,3 +234,40 @@ async fn txn_should_execute_as_expected() -> Result<()> { Ok(()) } + +#[tokio::test] +#[abort_on_panic] +async fn compact_should_remove_previous_revision() -> Result<()> { + let (_cluster, client) = get_cluster_client().await?; + let client = client.kv_client(); + + client.put(PutRequest::new("compact", "0")).await?; + client.put(PutRequest::new("compact", "1")).await?; + + // before compacting + let rev0_resp = client + .range(RangeRequest::new("compact").with_revision(2)) + .await?; + assert_eq!(rev0_resp.kvs[0].value, b"0"); + let rev1_resp = client + .range(RangeRequest::new("compact").with_revision(3)) + .await?; + assert_eq!(rev1_resp.kvs[0].value, b"1"); + + client.compact(CompactionRequest::new(4)).await?; + + // after compacting + let rev0_resp = client + .range(RangeRequest::new("compact").with_revision(2)) + .await?; + assert!( + rev0_resp.kvs.is_empty(), + "kvs should be empty after compaction" + ); + let rev1_resp = client + .range(RangeRequest::new("compact").with_revision(3)) + .await?; + assert_eq!(rev1_resp.kvs[0].value, b"1"); + + Ok(()) +} From 572c47023c41ba3a81b2718f482359a3dac36737 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 24 Jul 2023 15:06:22 +0800 Subject: [PATCH 2/3] docs: add examples to compact in kv client Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- xline-client/examples/kv.rs | 76 ++++++++++++++++++++++++++++++++++ xline-client/src/clients/kv.rs | 35 +++++++++++++++- 2 files changed, 109 insertions(+), 2 deletions(-) create mode 100644 xline-client/examples/kv.rs diff --git a/xline-client/examples/kv.rs b/xline-client/examples/kv.rs new file mode 100644 index 000000000..19aac70e5 --- /dev/null +++ b/xline-client/examples/kv.rs @@ -0,0 +1,76 @@ +use xline_client::{ + error::ClientError as Error, + types::kv::{ + CompactionRequest, Compare, DeleteRangeRequest, PutRequest, RangeRequest, Txn, TxnOp, + }, + Client, ClientOptions, +}; +use xlineapi::CompareResult; + +#[tokio::main] +async fn main() -> Result<(), Error> { + // the name and address of all curp members + let curp_members = [ + ("server0", "10.0.0.1:2379"), + ("server1", "10.0.0.2:2379"), + ("server2", "10.0.0.3:2379"), + ]; + + let client = Client::connect(curp_members, ClientOptions::default()) + .await? + .kv_client(); + + // put + client.put(PutRequest::new("key1", "value1")).await?; + client.put(PutRequest::new("key2", "value2")).await?; + + // range + let resp = client.range(RangeRequest::new("key1")).await?; + + if let Some(kv) = resp.kvs.first() { + println!( + "got key: {}, value: {}", + String::from_utf8_lossy(&kv.key), + String::from_utf8_lossy(&kv.value) + ); + } + + // delete + let resp = client + .delete(DeleteRangeRequest::new("key1").with_prev_kv(true)) + .await?; + + for kv in resp.prev_kvs { + println!( + "deleted key: {}, value: {}", + String::from_utf8_lossy(&kv.key), + String::from_utf8_lossy(&kv.value) + ); + } + + // txn + let txn_req = Txn::new() + .when(&[Compare::value("key2", CompareResult::Equal, "value2")][..]) + .and_then( + &[TxnOp::put( + PutRequest::new("key2", "value3").with_prev_kv(true), + )][..], + ) + .or_else(&[TxnOp::range(RangeRequest::new("key2"))][..]); + + let _resp = client.txn(txn_req).await?; + let resp = client.range(RangeRequest::new("key2")).await?; + // should print "value3" + if let Some(kv) = resp.kvs.first() { + println!( + "got key: {}, value: {}", + String::from_utf8_lossy(&kv.key), + String::from_utf8_lossy(&kv.value) + ); + } + + // compact + let rev = resp.header.unwrap().revision; + let _resp = client.compact(CompactionRequest::new(rev)).await?; + Ok(()) +} diff --git a/xline-client/src/clients/kv.rs b/xline-client/src/clients/kv.rs index 7305934a5..65fc5deaa 100644 --- a/xline-client/src/clients/kv.rs +++ b/xline-client/src/clients/kv.rs @@ -114,11 +114,42 @@ impl KvClient { Ok(res_wrapper.into()) } - /// Send `CompactionRequest` by `CurpClient` + /// Compacts the key-value store up to a given revision. All superseded keys + /// with a revision less than the compaction revision will be removed. /// /// # Errors /// - /// If `CurpClient` failed to send request + /// This function will return an error if the inner CURP client encountered a propose failure + /// + /// # Examples + /// + ///```no_run + /// use xline_client::{ + /// error::Result, + /// types::kv::{CompactionRequest, PutRequest}, + /// Client, ClientOptions, + /// }; + /// + /// #[tokio::main] + /// async fn main() -> Result<()> { + /// let curp_members = [ + /// ("server0", "10.0.0.1:2379"), + /// ("server1", "10.0.0.2:2379"), + /// ("server2", "10.0.0.3:2379"), + /// ]; + /// + /// let client = Client::connect(curp_members, ClientOptions::default()) + /// .await? + /// .kv_client(); + /// + /// let resp_put = client.put(PutRequest::new("key", "val")).await?; + /// let rev = resp_put.header.unwrap().revision; + /// + /// let _resp = client.compact(CompactionRequest::new(rev)).await?; + /// + /// Ok(()) + /// } + /// ``` #[inline] pub async fn compact(&self, request: CompactionRequest) -> Result { let use_fast_path = request.physical(); From 989b4910bc46452ee4c6f51058fb8aca82a41cb2 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 1 Aug 2023 19:33:33 +0800 Subject: [PATCH 3/3] docs: add a note to compaction request description Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- xline-client/src/clients/kv.rs | 8 ++++++-- xline-client/src/types/kv.rs | 6 +++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/xline-client/src/clients/kv.rs b/xline-client/src/clients/kv.rs index 65fc5deaa..18fcd9862 100644 --- a/xline-client/src/clients/kv.rs +++ b/xline-client/src/clients/kv.rs @@ -114,8 +114,12 @@ impl KvClient { Ok(res_wrapper.into()) } - /// Compacts the key-value store up to a given revision. All superseded keys - /// with a revision less than the compaction revision will be removed. + /// Compacts the key-value store up to a given revision. + /// All keys with revisions less than the given revision will be compacted. + /// The compaction process will remove all historical versions of these keys, except for the most recent one. + /// For example, here is a revision list: [(A, 1), (A, 2), (A, 3), (A, 4), (A, 5)]. + /// We compact at revision 3. After the compaction, the revision list will become [(A, 3), (A, 4), (A, 5)]. + /// All revisions less than 3 are deleted. The latest revision, 3, will be kept. /// /// # Errors /// diff --git a/xline-client/src/types/kv.rs b/xline-client/src/types/kv.rs index 95ba95e21..6e1c09893 100644 --- a/xline-client/src/types/kv.rs +++ b/xline-client/src/types/kv.rs @@ -676,7 +676,11 @@ impl From for xlineapi::TxnRequest { } /// Compaction Request compacts the key-value store up to a given revision. -/// All superseded keys with a revision less than the compaction revision will be removed. +/// All keys with revisions less than the given revision will be compacted. +/// The compaction process will remove all historical versions of these keys, except for the most recent one. +/// For example, here is a revision list: [(A, 1), (A, 2), (A, 3), (A, 4), (A, 5)]. +/// We compact at revision 3. After the compaction, the revision list will become [(A, 3), (A, 4), (A, 5)]. +/// All revisions less than 3 are deleted. The latest revision, 3, will be kept. #[derive(Debug)] pub struct CompactionRequest { /// The inner request