diff --git a/xline-client/src/clients/kv.rs b/xline-client/src/clients/kv.rs index 20a6a3d2f8..d0d831fce9 100644 --- a/xline-client/src/clients/kv.rs +++ b/xline-client/src/clients/kv.rs @@ -3,11 +3,14 @@ use std::sync::Arc; use curp::{client::Client as CurpClient, cmd::ProposeId}; use uuid::Uuid; use xline::server::{Command, KeyRange}; -use xlineapi::{DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken, TxnResponse}; +use xlineapi::{ + CompactionResponse, DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken, + TxnResponse, +}; use crate::{ error::Result, - types::kv::{DeleteRangeRequest, PutRequest, RangeRequest, Txn}, + types::kv::{CompactionRequest, DeleteRangeRequest, PutRequest, RangeRequest, Txn}, }; /// Client for KV operations. @@ -111,6 +114,34 @@ impl KvClient { Ok(res_wrapper.into()) } + /// Send `CompactionRequest` by `CurpClient` + /// + /// # Errors + /// + /// If `CurpClient` failed to send request + #[inline] + pub async fn compact(&self, request: CompactionRequest) -> Result { + let use_fast_path = request.physical(); + let propose_id = self.generate_propose_id(); + let request = RequestWithToken::new_with_token( + xlineapi::CompactionRequest::from(request).into(), + self.token.clone(), + ); + let cmd = Command::new(vec![], request, propose_id); + + let res_wrapper = if use_fast_path { + let cmd_res = self.curp_client.propose(cmd).await?; + cmd_res.decode() + } else { + let (cmd_res, sync_res) = self.curp_client.propose_indexed(cmd).await?; + let mut res_wrapper = cmd_res.decode(); + res_wrapper.update_revision(sync_res.revision()); + res_wrapper + }; + + Ok(res_wrapper.into()) + } + /// Generate a new `ProposeId` fn generate_propose_id(&self) -> ProposeId { ProposeId::new(format!("{}-{}", self.name, Uuid::new_v4())) diff --git a/xline-client/src/types/kv.rs b/xline-client/src/types/kv.rs index bd10b30b1c..ddd5e0e249 100644 --- a/xline-client/src/types/kv.rs +++ b/xline-client/src/types/kv.rs @@ -674,3 +674,51 @@ impl From for xlineapi::TxnRequest { txn.req } } + +/// Compaction Request compacts the key-value store up to a given revision. +/// All superseded keys with a revision less than the compaction revision will be removed. +#[derive(Debug)] +pub struct CompactionRequest { + /// The inner request + inner: xlineapi::CompactionRequest, +} + +impl CompactionRequest { + /// Creates a new `CompactionRequest` + /// + /// `Revision` is the key-value store revision for the compaction operation. + #[inline] + #[must_use] + pub fn new(revision: i64) -> Self { + Self { + inner: xlineapi::CompactionRequest { + revision, + ..Default::default() + }, + } + } + + /// Physical is set so the RPC will wait until the compaction is physically + /// applied to the local database such that compacted entries are totally + /// removed from the backend database. + #[inline] + #[must_use] + pub fn with_physical(mut self) -> Self { + self.inner.physical = true; + self + } + + /// Get `physical` + #[inline] + #[must_use] + pub fn physical(&self) -> bool { + self.inner.physical + } +} + +impl From for xlineapi::CompactionRequest { + #[inline] + fn from(req: CompactionRequest) -> Self { + req.inner + } +} diff --git a/xline-client/tests/kv.rs b/xline-client/tests/kv.rs index 995548608e..de39bb8b10 100644 --- a/xline-client/tests/kv.rs +++ b/xline-client/tests/kv.rs @@ -3,7 +3,9 @@ use common::get_cluster_client; use test_macros::abort_on_panic; use xline_client::{ error::Result, - types::kv::{Compare, DeleteRangeRequest, PutRequest, RangeRequest, Txn, TxnOp}, + types::kv::{ + CompactionRequest, Compare, DeleteRangeRequest, PutRequest, RangeRequest, Txn, TxnOp, + }, }; use xlineapi::CompareResult; @@ -232,3 +234,40 @@ async fn txn_should_execute_as_expected() -> Result<()> { Ok(()) } + +#[tokio::test] +#[abort_on_panic] +async fn compact_should_remove_previous_revision() -> Result<()> { + let (_cluster, client) = get_cluster_client().await?; + let mut client = client.kv_client(); + + client.put(PutRequest::new("compact", "0")).await?; + client.put(PutRequest::new("compact", "1")).await?; + + // before compacting + let rev0_resp = client + .range(RangeRequest::new("compact").with_revision(2)) + .await?; + assert_eq!(rev0_resp.kvs[0].value, b"0"); + let rev1_resp = client + .range(RangeRequest::new("compact").with_revision(3)) + .await?; + assert_eq!(rev1_resp.kvs[0].value, b"1"); + + client.compact(CompactionRequest::new(4)).await?; + + // after compacting + let rev0_resp = client + .range(RangeRequest::new("compact").with_revision(2)) + .await?; + assert!( + rev0_resp.kvs.is_empty(), + "kvs should be empty after compaction" + ); + let rev1_resp = client + .range(RangeRequest::new("compact").with_revision(3)) + .await?; + assert_eq!(rev1_resp.kvs[0].value, b"1"); + + Ok(()) +}