Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement client compact #389

Merged
merged 3 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions xline-client/examples/kv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use xline_client::{
error::ClientError as Error,
types::kv::{
CompactionRequest, Compare, DeleteRangeRequest, PutRequest, RangeRequest, Txn, TxnOp,
},
Client, ClientOptions,
};
use xlineapi::CompareResult;

#[tokio::main]
async fn main() -> Result<(), Error> {
// the name and address of all curp members
let curp_members = [
("server0", "10.0.0.1:2379"),
("server1", "10.0.0.2:2379"),
("server2", "10.0.0.3:2379"),
];

let client = Client::connect(curp_members, ClientOptions::default())
.await?
.kv_client();

// put
client.put(PutRequest::new("key1", "value1")).await?;
client.put(PutRequest::new("key2", "value2")).await?;

// range
let resp = client.range(RangeRequest::new("key1")).await?;

if let Some(kv) = resp.kvs.first() {
println!(
"got key: {}, value: {}",
String::from_utf8_lossy(&kv.key),
String::from_utf8_lossy(&kv.value)
);
}

// delete
let resp = client
.delete(DeleteRangeRequest::new("key1").with_prev_kv(true))
.await?;

for kv in resp.prev_kvs {
println!(
"deleted key: {}, value: {}",
String::from_utf8_lossy(&kv.key),
String::from_utf8_lossy(&kv.value)
);
}

// txn
let txn_req = Txn::new()
.when(&[Compare::value("key2", CompareResult::Equal, "value2")][..])
.and_then(
&[TxnOp::put(
PutRequest::new("key2", "value3").with_prev_kv(true),
)][..],
)
.or_else(&[TxnOp::range(RangeRequest::new("key2"))][..]);

let _resp = client.txn(txn_req).await?;
let resp = client.range(RangeRequest::new("key2")).await?;
// should print "value3"
if let Some(kv) = resp.kvs.first() {
println!(
"got key: {}, value: {}",
String::from_utf8_lossy(&kv.key),
String::from_utf8_lossy(&kv.value)
);
}

// compact
let rev = resp.header.unwrap().revision;
let _resp = client.compact(CompactionRequest::new(rev)).await?;
Ok(())
}
70 changes: 68 additions & 2 deletions xline-client/src/clients/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ use std::sync::Arc;
use curp::{client::Client as CurpClient, cmd::ProposeId};
use uuid::Uuid;
use xline::server::{Command, KeyRange};
use xlineapi::{DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken, TxnResponse};
use xlineapi::{
CompactionResponse, DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken,
TxnResponse,
};

use crate::{
error::Result,
types::kv::{DeleteRangeRequest, PutRequest, RangeRequest, Txn},
types::kv::{CompactionRequest, DeleteRangeRequest, PutRequest, RangeRequest, Txn},
};

/// Client for KV operations.
Expand Down Expand Up @@ -111,6 +114,69 @@ impl KvClient {
Ok(res_wrapper.into())
}

/// Compacts the key-value store up to a given revision.
/// All keys with revisions less than the given revision will be compacted.
/// The compaction process will remove all historical versions of these keys, except for the most recent one.
/// For example, here is a revision list: [(A, 1), (A, 2), (A, 3), (A, 4), (A, 5)].
/// We compact at revision 3. After the compaction, the revision list will become [(A, 3), (A, 4), (A, 5)].
/// All revisions less than 3 are deleted. The latest revision, 3, will be kept.
///
/// # Errors
///
/// This function will return an error if the inner CURP client encountered a propose failure
///
/// # Examples
///
///```no_run
/// use xline_client::{
/// error::Result,
/// types::kv::{CompactionRequest, PutRequest},
/// Client, ClientOptions,
/// };
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
/// let curp_members = [
/// ("server0", "10.0.0.1:2379"),
/// ("server1", "10.0.0.2:2379"),
/// ("server2", "10.0.0.3:2379"),
/// ];
///
/// let client = Client::connect(curp_members, ClientOptions::default())
/// .await?
/// .kv_client();
///
/// let resp_put = client.put(PutRequest::new("key", "val")).await?;
/// let rev = resp_put.header.unwrap().revision;
///
/// let _resp = client.compact(CompactionRequest::new(rev)).await?;
///
/// Ok(())
/// }
/// ```
#[inline]
pub async fn compact(&self, request: CompactionRequest) -> Result<CompactionResponse> {
let use_fast_path = request.physical();
let propose_id = self.generate_propose_id();
let request = RequestWithToken::new_with_token(
xlineapi::CompactionRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(vec![], request, propose_id);

let res_wrapper = if use_fast_path {
let cmd_res = self.curp_client.propose(cmd).await?;
cmd_res.decode()
} else {
let (cmd_res, sync_res) = self.curp_client.propose_indexed(cmd).await?;
let mut res_wrapper = cmd_res.decode();
res_wrapper.update_revision(sync_res.revision());
res_wrapper
};

Ok(res_wrapper.into())
}

/// Generate a new `ProposeId`
fn generate_propose_id(&self) -> ProposeId {
ProposeId::new(format!("{}-{}", self.name, Uuid::new_v4()))
Expand Down
52 changes: 52 additions & 0 deletions xline-client/src/types/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,55 @@ impl From<Txn> for xlineapi::TxnRequest {
txn.req
}
}

/// Compaction Request compacts the key-value store up to a given revision.
/// All keys with revisions less than the given revision will be compacted.
/// The compaction process will remove all historical versions of these keys, except for the most recent one.
/// For example, here is a revision list: [(A, 1), (A, 2), (A, 3), (A, 4), (A, 5)].
/// We compact at revision 3. After the compaction, the revision list will become [(A, 3), (A, 4), (A, 5)].
/// All revisions less than 3 are deleted. The latest revision, 3, will be kept.
#[derive(Debug)]
pub struct CompactionRequest {
/// The inner request
inner: xlineapi::CompactionRequest,
}

impl CompactionRequest {
/// Creates a new `CompactionRequest`
///
/// `Revision` is the key-value store revision for the compaction operation.
#[inline]
#[must_use]
pub fn new(revision: i64) -> Self {
Self {
inner: xlineapi::CompactionRequest {
revision,
..Default::default()
},
}
}

/// Physical is set so the RPC will wait until the compaction is physically
/// applied to the local database such that compacted entries are totally
/// removed from the backend database.
#[inline]
#[must_use]
pub fn with_physical(mut self) -> Self {
self.inner.physical = true;
self
}

/// Get `physical`
#[inline]
#[must_use]
pub fn physical(&self) -> bool {
self.inner.physical
}
}

impl From<CompactionRequest> for xlineapi::CompactionRequest {
#[inline]
fn from(req: CompactionRequest) -> Self {
req.inner
}
}
41 changes: 40 additions & 1 deletion xline-client/tests/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use common::get_cluster_client;
use test_macros::abort_on_panic;
use xline_client::{
error::Result,
types::kv::{Compare, DeleteRangeRequest, PutRequest, RangeRequest, Txn, TxnOp},
types::kv::{
CompactionRequest, Compare, DeleteRangeRequest, PutRequest, RangeRequest, Txn, TxnOp,
},
};
use xlineapi::CompareResult;

Expand Down Expand Up @@ -232,3 +234,40 @@ async fn txn_should_execute_as_expected() -> Result<()> {

Ok(())
}

#[tokio::test]
#[abort_on_panic]
async fn compact_should_remove_previous_revision() -> Result<()> {
let (_cluster, client) = get_cluster_client().await?;
let client = client.kv_client();

client.put(PutRequest::new("compact", "0")).await?;
client.put(PutRequest::new("compact", "1")).await?;

// before compacting
let rev0_resp = client
.range(RangeRequest::new("compact").with_revision(2))
.await?;
assert_eq!(rev0_resp.kvs[0].value, b"0");
let rev1_resp = client
.range(RangeRequest::new("compact").with_revision(3))
.await?;
assert_eq!(rev1_resp.kvs[0].value, b"1");

client.compact(CompactionRequest::new(4)).await?;

// after compacting
let rev0_resp = client
.range(RangeRequest::new("compact").with_revision(2))
.await?;
assert!(
rev0_resp.kvs.is_empty(),
"kvs should be empty after compaction"
);
let rev1_resp = client
.range(RangeRequest::new("compact").with_revision(3))
.await?;
assert_eq!(rev1_resp.kvs[0].value, b"1");

Ok(())
}
Loading