Skip to content

Commit

Permalink
feat: implement compaction in xline-client
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
  • Loading branch information
bsbds committed Jul 20, 2023
1 parent 164982f commit 1f19685
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 3 deletions.
35 changes: 33 additions & 2 deletions xline-client/src/clients/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ use std::sync::Arc;
use curp::{client::Client as CurpClient, cmd::ProposeId};
use uuid::Uuid;
use xline::server::{Command, KeyRange};
use xlineapi::{DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken, TxnResponse};
use xlineapi::{
CompactionResponse, DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken,
TxnResponse,
};

use crate::{
error::Result,
types::kv::{DeleteRangeRequest, PutRequest, RangeRequest, Txn},
types::kv::{CompactionRequest, DeleteRangeRequest, PutRequest, RangeRequest, Txn},
};

/// Client for KV operations.
Expand Down Expand Up @@ -111,6 +114,34 @@ impl KvClient {
Ok(res_wrapper.into())
}

/// Send `CompactionRequest` by `CurpClient`
///
/// # Errors
///
/// If `CurpClient` failed to send request
#[inline]
pub async fn compact(&self, request: CompactionRequest) -> Result<CompactionResponse> {
let use_fast_path = request.physical();
let propose_id = self.generate_propose_id();
let request = RequestWithToken::new_with_token(
xlineapi::CompactionRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(vec![], request, propose_id);

let res_wrapper = if use_fast_path {
let cmd_res = self.curp_client.propose(cmd).await?;
cmd_res.decode()
} else {
let (cmd_res, sync_res) = self.curp_client.propose_indexed(cmd).await?;
let mut res_wrapper = cmd_res.decode();
res_wrapper.update_revision(sync_res.revision());
res_wrapper
};

Ok(res_wrapper.into())
}

/// Generate a new `ProposeId`
fn generate_propose_id(&self) -> ProposeId {
ProposeId::new(format!("{}-{}", self.name, Uuid::new_v4()))
Expand Down
48 changes: 48 additions & 0 deletions xline-client/src/types/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,51 @@ impl From<Txn> for xlineapi::TxnRequest {
txn.req
}
}

/// Compaction Request compacts the key-value store up to a given revision.
/// All superseded keys with a revision less than the compaction revision will be removed.
#[derive(Debug)]
pub struct CompactionRequest {
/// The inner request
inner: xlineapi::CompactionRequest,
}

impl CompactionRequest {
/// Creates a new `CompactionRequest`
///
/// `Revision` is the key-value store revision for the compaction operation.
#[inline]
#[must_use]
pub fn new(revision: i64) -> Self {
Self {
inner: xlineapi::CompactionRequest {
revision,
..Default::default()
},
}
}

/// Physical is set so the RPC will wait until the compaction is physically
/// applied to the local database such that compacted entries are totally
/// removed from the backend database.
#[inline]
#[must_use]
pub fn with_physical(mut self) -> Self {
self.inner.physical = true;
self
}

/// Get `physical`
#[inline]
#[must_use]
pub fn physical(&self) -> bool {
self.inner.physical
}
}

impl From<CompactionRequest> for xlineapi::CompactionRequest {
#[inline]
fn from(req: CompactionRequest) -> Self {
req.inner
}
}
41 changes: 40 additions & 1 deletion xline-client/tests/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use common::get_cluster_client;
use test_macros::abort_on_panic;
use xline_client::{
error::Result,
types::kv::{Compare, DeleteRangeRequest, PutRequest, RangeRequest, Txn, TxnOp},
types::kv::{
CompactionRequest, Compare, DeleteRangeRequest, PutRequest, RangeRequest, Txn, TxnOp,
},
};
use xlineapi::CompareResult;

Expand Down Expand Up @@ -232,3 +234,40 @@ async fn txn_should_execute_as_expected() -> Result<()> {

Ok(())
}

#[tokio::test]
#[abort_on_panic]
async fn compact_should_remove_previous_revision() -> Result<()> {
let (_cluster, client) = get_cluster_client().await?;
let mut client = client.kv_client();

client.put(PutRequest::new("compact", "0")).await?;
client.put(PutRequest::new("compact", "1")).await?;

// before compacting
let rev0_resp = client
.range(RangeRequest::new("compact").with_revision(2))
.await?;
assert_eq!(rev0_resp.kvs[0].value, b"0");
let rev1_resp = client
.range(RangeRequest::new("compact").with_revision(3))
.await?;
assert_eq!(rev1_resp.kvs[0].value, b"1");

client.compact(CompactionRequest::new(4)).await?;

// after compacting
let rev0_resp = client
.range(RangeRequest::new("compact").with_revision(2))
.await?;
assert!(
rev0_resp.kvs.is_empty(),
"kvs should be empty after compaction"
);
let rev1_resp = client
.range(RangeRequest::new("compact").with_revision(3))
.await?;
assert_eq!(rev1_resp.kvs[0].value, b"1");

Ok(())
}

0 comments on commit 1f19685

Please sign in to comment.