Skip to content

Commit

Permalink
refactor: refactor auto compactor implementation
Browse files Browse the repository at this point in the history
* using static generic instead of dynamic trait object
* improve revision window's implementation

Signed-off-by: Phoeniix Zhao <Phoenix500526@163.com>
  • Loading branch information
Phoenix500526 committed Aug 3, 2023
1 parent acaa487 commit 7f1f239
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 72 deletions.
1 change: 0 additions & 1 deletion xline-client/src/types/auth.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use xline::server::KeyRange;

pub use xlineapi::{
AuthDisableResponse, AuthEnableResponse, AuthRoleAddResponse, AuthRoleDeleteResponse,
AuthRoleGetResponse, AuthRoleGrantPermissionResponse, AuthRoleListResponse,
Expand Down
3 changes: 1 addition & 2 deletions xline-client/src/types/kv.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use xline::server::KeyRange;

pub use xlineapi::{
CompareResult, CompareTarget, DeleteRangeResponse, PutResponse, RangeResponse, Response,
ResponseOp, SortOrder, SortTarget, TargetUnion, TxnResponse,
Expand Down Expand Up @@ -714,7 +713,7 @@ impl From<TxnRequest> for xlineapi::TxnRequest {

/// Compaction Request compacts the key-value store up to a given revision.
/// All keys with revisions less than the given revision will be compacted.
/// The compaction process will remove all historical versions of these keys, except for the most recent one.
/// The compaction process will remove all historical versions of these keys, except for the most recent one.
/// For example, here is a revision list: [(A, 1), (A, 2), (A, 3), (A, 4), (A, 5)].
/// We compact at revision 3. After the compaction, the revision list will become [(A, 3), (A, 4), (A, 5)].
/// All revisions less than 3 are deleted. The latest revision, 3, will be kept.
Expand Down
2 changes: 1 addition & 1 deletion xline-client/src/types/lease.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use futures::channel::mpsc::Sender;

pub use xlineapi::{
LeaseGrantResponse, LeaseKeepAliveResponse, LeaseLeasesResponse, LeaseRevokeResponse,
LeaseStatus, LeaseTimeToLiveResponse,
};

use crate::error::{ClientError, Result};

/// The lease keep alive handle.
Expand Down
3 changes: 1 addition & 2 deletions xline-client/src/types/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use std::fmt::Debug;

use futures::channel::mpsc::Sender;
use xline::server::KeyRange;
pub use xlineapi::{Event, EventType, KeyValue, WatchResponse};
use xlineapi::{RequestUnion, WatchCancelRequest, WatchProgressRequest};

use crate::error::{ClientError, Result};

pub use xlineapi::{Event, EventType, KeyValue, WatchResponse};

/// The watching handle.
#[derive(Debug)]
pub struct Watcher {
Expand Down
7 changes: 4 additions & 3 deletions xline-client/tests/watch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! The following tests are originally from `etcd-client`
use crate::common::get_cluster_client;
use xline_client::{
error::Result,
types::kv::PutRequest,
types::watch::{EventType, WatchRequest},
types::{
kv::PutRequest,
watch::{EventType, WatchRequest},
},
};

use crate::common::get_cluster_client;
Expand Down
16 changes: 9 additions & 7 deletions xline/src/server/xline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,16 @@ impl XlineServer {

let auto_compactor = if let Some(auto_config_cfg) = *self.compact_cfg.auto_compact_config()
{
auto_compactor(
self.is_leader,
Arc::clone(&client),
header_gen.revision_arc(),
Arc::clone(&self.shutdown_trigger),
auto_config_cfg,
Some(
auto_compactor(
self.is_leader,
Arc::clone(&client),
header_gen.general_revision_arc(),
Arc::clone(&self.shutdown_trigger),
auto_config_cfg,
)
.await,
)
.await
} else {
None
};
Expand Down
4 changes: 2 additions & 2 deletions xline/src/storage/compact/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub(crate) async fn auto_compactor(
revision_getter: Arc<RevisionNumberGenerator>,
shutdown_trigger: Arc<Event>,
auto_compact_cfg: AutoCompactConfig,
) -> Option<Arc<dyn Compactor>> {
) -> Arc<dyn Compactor> {
let auto_compactor: Arc<dyn Compactor> = match auto_compact_cfg {
AutoCompactConfig::Periodic(period) => {
PeriodicCompactor::new_arc(is_leader, client, revision_getter, shutdown_trigger, period)
Expand All @@ -90,7 +90,7 @@ pub(crate) async fn auto_compactor(
let _hd = tokio::spawn(async move {
auto_compactor.run().await;
});
Some(compactor_handle)
compactor_handle
}

/// background compact executor
Expand Down
62 changes: 20 additions & 42 deletions xline/src/storage/compact/periodic_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,61 +28,38 @@ impl RevisionWindow {
/// Create a new `RevisionWindow`
fn new(retention: usize) -> Self {
Self {
ring_buf: Vec::with_capacity(retention),
ring_buf: vec![0; retention],
cursor: retention.overflow_sub(1),
retention,
}
}

/// Store the revision into the inner ring buffer
#[allow(clippy::integer_arithmetic)]
#[allow(clippy::integer_arithmetic, clippy::indexing_slicing)]
fn sample(&mut self, revision: i64) {
self.cursor = (self.cursor + 1) % self.retention; // it's ok to do so since cursor will never overflow
match self.ring_buf.len().cmp(&self.retention) {
Ordering::Less => self.ring_buf.push(revision),
Ordering::Equal => {
if let Some(element) = self.ring_buf.get_mut(self.cursor) {
*element = revision;
} else {
unreachable!(
"ring_buf ({:?}) at {} should not be None",
self.ring_buf, self.cursor
);
}
}
Ordering::Greater => {
unreachable!(
"the length of RevisionWindow should be less than {}",
self.retention
)
}
}
self.ring_buf[self.cursor] = revision;
}

/// Retrieve the expired revision that is sampled period ago
#[allow(clippy::indexing_slicing, clippy::integer_arithmetic)]
fn expired_revision(&self) -> Option<i64> {
debug_assert!(
self.ring_buf.len() <= self.retention,
"the length of RevisionWindow should be less than {}",
self.retention
);
if self.ring_buf.len() < self.retention {
let target = self.ring_buf[(self.cursor + 1) % self.retention];
if target == 0 {
None
} else {
let target = (self.cursor + 1) % self.retention;
Some(self.ring_buf[target]) // it's ok to do so since ring_buf[target] should not be None.
Some(target)
}
}
}

/// Revision auto compactor
#[derive(Debug)]
pub(crate) struct PeriodicCompactor {
pub(crate) struct PeriodicCompactor<C: Compactable> {
/// `is_leader` indicates whether the current node is a leader or not.
is_leader: AtomicBool,
/// curp client
client: Arc<dyn Compactable>,
client: Arc<C>,
/// revision getter
revision_getter: Arc<RevisionNumberGenerator>,
/// shutdown trigger
Expand All @@ -91,11 +68,11 @@ pub(crate) struct PeriodicCompactor {
period: Duration,
}

impl PeriodicCompactor {
impl<C: Compactable> PeriodicCompactor<C> {
/// Creates a new revision compactor
pub(super) fn new_arc(
is_leader: bool,
client: Arc<dyn Compactable>,
client: Arc<C>,
revision_getter: Arc<RevisionNumberGenerator>,
shutdown_trigger: Arc<Event>,
period: Duration,
Expand Down Expand Up @@ -150,10 +127,11 @@ impl PeriodicCompactor {

/// Calculate the sample frequency and the total amount of samples.
fn sample_config(period: Duration) -> (Duration, usize) {
let one_hour = Duration::from_secs(60.overflow_mul(60));
let base_interval = match period.cmp(&one_hour) {
/// one hour duration
const ONEHOUR: Duration = Duration::from_secs(3600);
let base_interval = match period.cmp(&ONEHOUR) {
Ordering::Less => period,
Ordering::Equal | Ordering::Greater => one_hour,
Ordering::Equal | Ordering::Greater => ONEHOUR,
};
let divisor = 10;
let check_interval = base_interval
Expand All @@ -170,7 +148,7 @@ fn sample_config(period: Duration) -> (Duration, usize) {
}

#[async_trait::async_trait]
impl Compactor for PeriodicCompactor {
impl<C: Compactable> Compactor for PeriodicCompactor<C> {
fn pause(&self) {
self.is_leader.store(false, Relaxed);
}
Expand Down Expand Up @@ -214,18 +192,18 @@ mod test {
fn revision_window_should_work() {
let mut rw = RevisionWindow::new(3);
assert!(rw.expired_revision().is_none());
rw.sample(0);
assert!(rw.expired_revision().is_none());
rw.sample(1);
assert!(rw.expired_revision().is_none());
rw.sample(2);
assert_eq!(rw.expired_revision(), Some(0));
assert!(rw.expired_revision().is_none());
rw.sample(3);
assert_eq!(rw.expired_revision(), Some(1));
// retention is 2
// The first 3 minutes: 0,1,2
// The first 3 minutes: 1,2,3
// The second 2 minutes: 3,4
rw.sample(3);
rw.sample(4);
assert_eq!(rw.expired_revision(), Some(2));
assert_eq!(rw.expired_revision(), Some(3));
}

#[test]
Expand Down
10 changes: 5 additions & 5 deletions xline/src/storage/compact/revision_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ const CHECK_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Revision auto compactor
#[derive(Debug)]
pub(crate) struct RevisionCompactor {
pub(crate) struct RevisionCompactor<C: Compactable> {
/// `is_leader` indicates whether the current node is a leader or not.
is_leader: AtomicBool,
/// curp client
client: Arc<dyn Compactable>,
client: Arc<C>,
/// revision getter
revision_getter: Arc<RevisionNumberGenerator>,
/// shutdown trigger
Expand All @@ -31,11 +31,11 @@ pub(crate) struct RevisionCompactor {
retention: i64,
}

impl RevisionCompactor {
impl<C: Compactable> RevisionCompactor<C> {
/// Creates a new revision compactor
pub(super) fn new_arc(
is_leader: bool,
client: Arc<dyn Compactable + Send + Sync>,
client: Arc<C>,
revision_getter: Arc<RevisionNumberGenerator>,
shutdown_trigger: Arc<Event>,
retention: i64,
Expand Down Expand Up @@ -85,7 +85,7 @@ impl RevisionCompactor {
}

#[async_trait::async_trait]
impl Compactor for RevisionCompactor {
impl<C: Compactable> Compactor for RevisionCompactor<C> {
fn pause(&self) {
self.is_leader.store(false, Relaxed);
}
Expand Down
3 changes: 1 addition & 2 deletions xlinectl/src/command/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,8 @@ pub(crate) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result

#[cfg(test)]
mod tests {
use crate::testcase_struct;

use super::*;
use crate::testcase_struct;

testcase_struct!(RangeRequest);

Expand Down
3 changes: 1 addition & 2 deletions xlinectl/src/command/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ pub(crate) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result

#[cfg(test)]
mod tests {
use crate::testcase_struct;

use super::*;
use crate::testcase_struct;

testcase_struct!(PutRequest);

Expand Down
10 changes: 7 additions & 3 deletions xlinectl/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,13 @@ use clap::{arg, value_parser, Command};
use ext_utils::config::ClientTimeout;
use xline_client::{Client, ClientOptions};

use crate::command::{get, put};
use crate::utils::parser::{parse_endpoints, parse_user};
use crate::utils::printer::{set_printer_type, PrinterType};
use crate::{
command::{get, put},
utils::{
parser::{parse_endpoints, parse_user},
printer::{set_printer_type, PrinterType},
},
};

/// Command definitions and parsers
mod command;
Expand Down

0 comments on commit 7f1f239

Please sign in to comment.