From 7993e5040fa75c384ffed271fd4ade74eac9e345 Mon Sep 17 00:00:00 2001 From: Harsh1s Date: Thu, 7 Mar 2024 04:09:57 +0530 Subject: [PATCH] refactor: split config into multiple modules Signed-off-by: Harsh1s --- crates/benchmark/src/runner.rs | 2 +- crates/curp-test-utils/src/test_cmd.rs | 4 +- crates/curp/src/client/mod.rs | 2 +- crates/curp/src/server/cmd_worker/mod.rs | 2 +- crates/curp/src/server/curp_node.rs | 5 +- crates/curp/src/server/mod.rs | 2 +- crates/curp/src/server/raw_curp/log.rs | 2 +- crates/curp/src/server/raw_curp/mod.rs | 2 +- crates/curp/src/server/raw_curp/tests.rs | 2 +- crates/curp/src/server/storage/db.rs | 2 +- crates/curp/tests/it/common/curp_group.rs | 3 +- crates/curp/tests/it/server.rs | 2 +- crates/utils/src/config/auth_config.rs | 28 ++ crates/utils/src/config/client_config.rs | 157 +++++++ crates/utils/src/config/cluster_config.rs | 139 ++++++ crates/utils/src/config/compact_config.rs | 99 +++++ crates/utils/src/config/curp_config.rs | 223 ++++++++++ crates/utils/src/config/engine_config.rs | 26 ++ crates/utils/src/config/log_config.rs | 150 +++++++ crates/utils/src/config/metrics_config.rs | 151 +++++++ crates/utils/src/config/mod.rs | 445 +++++++++++++++++++ crates/utils/src/config/server_config.rs | 106 +++++ crates/utils/src/config/storage_config.rs | 43 ++ crates/utils/src/config/tls_config.rs | 52 +++ crates/utils/src/config/trace_config.rs | 56 +++ crates/utils/src/parser.rs | 6 +- crates/xline-client/src/lib.rs | 2 +- crates/xline-test-utils/src/lib.rs | 6 +- crates/xline/src/server/maintenance.rs | 2 +- crates/xline/src/server/watch_server.rs | 4 +- crates/xline/src/server/xline_server.rs | 5 +- crates/xline/src/storage/auth_store/store.rs | 2 +- crates/xline/src/storage/compact/mod.rs | 2 +- crates/xline/src/storage/db.rs | 2 +- crates/xline/src/storage/kv_store.rs | 2 +- crates/xline/src/storage/kvwatcher.rs | 2 +- crates/xline/src/storage/lease_store/mod.rs | 2 +- crates/xline/src/utils/args.rs | 36 +- crates/xline/src/utils/metrics.rs | 2 +- crates/xline/src/utils/trace.rs | 2 +- crates/xline/tests/it/auth_test.rs | 5 +- crates/xlinectl/src/main.rs | 2 +- 42 files changed, 1739 insertions(+), 50 deletions(-) create mode 100644 crates/utils/src/config/auth_config.rs create mode 100644 crates/utils/src/config/client_config.rs create mode 100644 crates/utils/src/config/cluster_config.rs create mode 100644 crates/utils/src/config/compact_config.rs create mode 100644 crates/utils/src/config/curp_config.rs create mode 100644 crates/utils/src/config/engine_config.rs create mode 100644 crates/utils/src/config/log_config.rs create mode 100644 crates/utils/src/config/metrics_config.rs create mode 100644 crates/utils/src/config/mod.rs create mode 100644 crates/utils/src/config/server_config.rs create mode 100644 crates/utils/src/config/storage_config.rs create mode 100644 crates/utils/src/config/tls_config.rs create mode 100644 crates/utils/src/config/trace_config.rs diff --git a/crates/benchmark/src/runner.rs b/crates/benchmark/src/runner.rs index 26ae232d3..eb8924008 100644 --- a/crates/benchmark/src/runner.rs +++ b/crates/benchmark/src/runner.rs @@ -19,7 +19,7 @@ use tokio::{ time::{Duration, Instant}, }; use tracing::debug; -use utils::config::ClientConfig; +use utils::config::client_config::ClientConfig; use xline_client::{types::kv::PutRequest, ClientOptions}; use crate::{args::Commands, bench_client::BenchClient, Benchmark}; diff --git a/crates/curp-test-utils/src/test_cmd.rs b/crates/curp-test-utils/src/test_cmd.rs index 1fdde4f1a..0aa8979de 100644 --- a/crates/curp-test-utils/src/test_cmd.rs +++ b/crates/curp-test-utils/src/test_cmd.rs @@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::{sync::mpsc, time::sleep}; use tracing::debug; -use utils::config::EngineConfig; +use utils::config::engine_config::EngineConfig; use crate::{META_TABLE, REVISION_TABLE, TEST_TABLE}; @@ -422,7 +422,7 @@ impl TestCE { after_sync_sender: mpsc::UnboundedSender<(TestCommand, LogIndex)>, engine_cfg: EngineConfig, ) -> Self { - let engine_type = match engine_cfg { + let engine_type: EngineType = match engine_cfg { EngineConfig::Memory => EngineType::Memory, EngineConfig::RocksDB(path) => EngineType::Rocks(path), _ => unreachable!("Not supported storage type"), diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index 17ded1a7d..42a463a9d 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ -30,9 +30,9 @@ use tokio::task::JoinHandle; #[cfg(not(madsim))] use tonic::transport::ClientTlsConfig; use tracing::debug; +use utils::{build_endpoint, config::client_config::ClientConfig}; #[cfg(madsim)] use utils::ClientTlsConfig; -use utils::{build_endpoint, config::ClientConfig}; use self::{ retry::{Retry, RetryConfig}, diff --git a/crates/curp/src/server/cmd_worker/mod.rs b/crates/curp/src/server/cmd_worker/mod.rs index f97be228c..fdcd0b51c 100644 --- a/crates/curp/src/server/cmd_worker/mod.rs +++ b/crates/curp/src/server/cmd_worker/mod.rs @@ -422,7 +422,7 @@ mod tests { use test_macros::abort_on_panic; use tokio::{sync::mpsc, time::Instant}; use tracing_test::traced_test; - use utils::config::EngineConfig; + use utils::config::engine_config::EngineConfig; use super::*; use crate::{log_entry::LogEntry, rpc::ProposeId}; diff --git a/crates/curp/src/server/curp_node.rs b/crates/curp/src/server/curp_node.rs index ea0f0806b..b4efe3e32 100644 --- a/crates/curp/src/server/curp_node.rs +++ b/crates/curp/src/server/curp_node.rs @@ -18,12 +18,9 @@ use tokio::{ #[cfg(not(madsim))] use tonic::transport::ClientTlsConfig; use tracing::{debug, error, info, trace, warn}; +use utils::{config::curp_config::CurpConfig, task_manager::{tasks::TaskName, Listener, State, TaskManager}}; #[cfg(madsim)] use utils::ClientTlsConfig; -use utils::{ - config::CurpConfig, - task_manager::{tasks::TaskName, Listener, State, TaskManager}, -}; use super::{ cmd_board::{CmdBoardRef, CommandBoard}, diff --git a/crates/curp/src/server/mod.rs b/crates/curp/src/server/mod.rs index b6fca3a99..0872dafb9 100644 --- a/crates/curp/src/server/mod.rs +++ b/crates/curp/src/server/mod.rs @@ -7,7 +7,7 @@ use tonic::transport::ClientTlsConfig; use tracing::instrument; #[cfg(madsim)] use utils::ClientTlsConfig; -use utils::{config::CurpConfig, task_manager::TaskManager, tracing::Extract}; +use utils::{config::curp_config::CurpConfig, task_manager::TaskManager, tracing::Extract}; use self::curp_node::CurpNode; pub use self::raw_curp::RawCurp; diff --git a/crates/curp/src/server/raw_curp/log.rs b/crates/curp/src/server/raw_curp/log.rs index 4aee089d3..be1f5ca9c 100644 --- a/crates/curp/src/server/raw_curp/log.rs +++ b/crates/curp/src/server/raw_curp/log.rs @@ -455,7 +455,7 @@ mod tests { use std::{iter::repeat, ops::Index, sync::Arc}; use curp_test_utils::test_cmd::TestCommand; - use utils::config::{default_batch_max_size, default_log_entries_cap}; + use utils::config::curp_config::{default_batch_max_size, default_log_entries_cap}; use super::*; diff --git a/crates/curp/src/server/raw_curp/mod.rs b/crates/curp/src/server/raw_curp/mod.rs index 86c62eace..cffb7c4f2 100644 --- a/crates/curp/src/server/raw_curp/mod.rs +++ b/crates/curp/src/server/raw_curp/mod.rs @@ -37,7 +37,7 @@ use tracing::{ #[cfg(madsim)] use utils::ClientTlsConfig; use utils::{ - config::CurpConfig, + config::curp_config::CurpConfig, parking_lot_lock::{MutexMap, RwLockMap}, task_manager::TaskManager, }; diff --git a/crates/curp/src/server/raw_curp/tests.rs b/crates/curp/src/server/raw_curp/tests.rs index 5e3896c37..8eaeff9d8 100644 --- a/crates/curp/src/server/raw_curp/tests.rs +++ b/crates/curp/src/server/raw_curp/tests.rs @@ -7,7 +7,7 @@ use tokio::{ time::{sleep, Instant}, }; use tracing_test::traced_test; -use utils::config::{ +use utils::config::curp_config::{ default_candidate_timeout_ticks, default_follower_timeout_ticks, default_heartbeat_interval, CurpConfigBuilder, }; diff --git a/crates/curp/src/server/storage/db.rs b/crates/curp/src/server/storage/db.rs index 0c8433080..2319d9580 100644 --- a/crates/curp/src/server/storage/db.rs +++ b/crates/curp/src/server/storage/db.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use async_trait::async_trait; use engine::{Engine, EngineType, StorageEngine, WriteOperation}; use prost::Message; -use utils::config::EngineConfig; +use utils::config::engine_config::EngineConfig; use super::{StorageApi, StorageError}; use crate::{ diff --git a/crates/curp/tests/it/common/curp_group.rs b/crates/curp/tests/it/common/curp_group.rs index 8a4cccfc2..980ac2c82 100644 --- a/crates/curp/tests/it/common/curp_group.rs +++ b/crates/curp/tests/it/common/curp_group.rs @@ -39,7 +39,8 @@ use tracing::debug; use utils::{ build_endpoint, config::{ - default_quota, ClientConfig, CurpConfig, CurpConfigBuilder, EngineConfig, StorageConfig, + client_config::ClientConfig, curp_config::CurpConfig, curp_config::CurpConfigBuilder, + engine_config::EngineConfig, storage_config::default_quota, storage_config::StorageConfig, }, task_manager::{tasks::TaskName, Listener, TaskManager}, }; diff --git a/crates/curp/tests/it/server.rs b/crates/curp/tests/it/server.rs index 5ea89a808..817c6160b 100644 --- a/crates/curp/tests/it/server.rs +++ b/crates/curp/tests/it/server.rs @@ -15,7 +15,7 @@ use curp_test_utils::{ use madsim::rand::{thread_rng, Rng}; use test_macros::abort_on_panic; use tokio::net::TcpListener; -use utils::{config::ClientConfig, timestamp}; +use utils::{config::client_config::ClientConfig, timestamp}; use crate::common::curp_group::{ commandpb::ProposeId, CurpGroup, FetchClusterRequest, ProposeRequest, ProposeResponse, diff --git a/crates/utils/src/config/auth_config.rs b/crates/utils/src/config/auth_config.rs new file mode 100644 index 000000000..2ec2f6f75 --- /dev/null +++ b/crates/utils/src/config/auth_config.rs @@ -0,0 +1,28 @@ +use std::path::PathBuf; + +use getset::Getters; +use serde::Deserialize; + +/// Xline auth configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] +pub struct AuthConfig { + /// The public key file + #[getset(get = "pub")] + pub auth_public_key: Option, + /// The private key file + #[getset(get = "pub")] + pub auth_private_key: Option, +} + +impl AuthConfig { + /// Generate a new `AuthConfig` object + #[must_use] + #[inline] + pub fn new(auth_public_key: Option, auth_private_key: Option) -> Self { + Self { + auth_public_key, + auth_private_key, + } + } +} diff --git a/crates/utils/src/config/client_config.rs b/crates/utils/src/config/client_config.rs new file mode 100644 index 000000000..e850f641f --- /dev/null +++ b/crates/utils/src/config/client_config.rs @@ -0,0 +1,157 @@ +use std::time::Duration; + +use getset::Getters; +use serde::Deserialize; + +/// Curp client settings +#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +#[allow(clippy::module_name_repetitions)] +pub struct ClientConfig { + /// Curp client wait sync timeout + #[getset(get = "pub")] + #[serde( + with = "duration_format", + default = "default_client_wait_synced_timeout" + )] + wait_synced_timeout: Duration, + + /// Curp client propose request timeout + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_propose_timeout")] + propose_timeout: Duration, + + /// Curp client initial retry interval + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_initial_retry_timeout")] + initial_retry_timeout: Duration, + + /// Curp client max retry interval + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_max_retry_timeout")] + max_retry_timeout: Duration, + + /// Curp client retry interval + #[getset(get = "pub")] + #[serde(default = "default_retry_count")] + retry_count: usize, + + /// Whether to use exponential backoff in retries + #[getset(get = "pub")] + #[serde(default = "default_fixed_backoff")] + fixed_backoff: bool, +} + +impl ClientConfig { + /// Create a new client timeout + /// + /// # Panics + /// + /// Panics if `initial_retry_timeout` is larger than `max_retry_timeout` + #[must_use] + #[inline] + pub fn new( + wait_synced_timeout: Duration, + propose_timeout: Duration, + initial_retry_timeout: Duration, + max_retry_timeout: Duration, + retry_count: usize, + fixed_backoff: bool, + ) -> Self { + assert!( + initial_retry_timeout <= max_retry_timeout, + "`initial_retry_timeout` should less or equal to `max_retry_timeout`" + ); + Self { + wait_synced_timeout, + propose_timeout, + initial_retry_timeout, + max_retry_timeout, + retry_count, + fixed_backoff, + } + } +} + +impl Default for ClientConfig { + #[inline] + fn default() -> Self { + Self { + wait_synced_timeout: default_client_wait_synced_timeout(), + propose_timeout: default_propose_timeout(), + initial_retry_timeout: default_initial_retry_timeout(), + max_retry_timeout: default_max_retry_timeout(), + retry_count: default_retry_count(), + fixed_backoff: default_fixed_backoff(), + } + } +} + +/// default client wait synced timeout +#[must_use] +#[inline] +pub const fn default_client_wait_synced_timeout() -> Duration { + Duration::from_secs(2) +} + +/// default client propose timeout +#[must_use] +#[inline] +pub const fn default_propose_timeout() -> Duration { + Duration::from_secs(1) +} + +/// default initial retry timeout +#[must_use] +#[inline] +pub const fn default_initial_retry_timeout() -> Duration { + Duration::from_millis(1500) +} + +/// default max retry timeout +#[must_use] +#[inline] +pub const fn default_max_retry_timeout() -> Duration { + Duration::from_millis(10_000) +} + +/// default retry count +#[cfg(not(madsim))] +#[must_use] +#[inline] +pub const fn default_retry_count() -> usize { + 3 +} + +/// default retry count +#[cfg(madsim)] +#[must_use] +#[inline] +pub const fn default_retry_count() -> usize { + 10 +} + +/// default use backoff +#[must_use] +#[inline] +pub const fn default_fixed_backoff() -> bool { + false +} + +/// `Duration` deserialization formatter +pub mod duration_format { + use std::time::Duration; + + use serde::{self, Deserialize, Deserializer}; + + use crate::parse_duration; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_duration(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/cluster_config.rs b/crates/utils/src/config/cluster_config.rs new file mode 100644 index 000000000..2a56e4de3 --- /dev/null +++ b/crates/utils/src/config/cluster_config.rs @@ -0,0 +1,139 @@ +use std::collections::HashMap; + +use getset::Getters; +use serde::Deserialize; + +use super::client_config::ClientConfig; +use super::curp_config::CurpConfig; +use super::server_config::ServerTimeout; + +/// Cluster configuration object, including cluster relevant configuration fields +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +pub struct ClusterConfig { + /// Get xline server name + #[getset(get = "pub")] + name: String, + /// Xline server peer listen urls + #[getset(get = "pub")] + peer_listen_urls: Vec, + /// Xline server peer advertise urls + #[getset(get = "pub")] + peer_advertise_urls: Vec, + /// Xline server client listen urls + #[getset(get = "pub")] + client_listen_urls: Vec, + /// Xline server client advertise urls + #[getset(get = "pub")] + client_advertise_urls: Vec, + /// All the nodes in the xline cluster + #[getset(get = "pub")] + peers: HashMap>, + /// Leader node. + #[getset(get = "pub")] + is_leader: bool, + /// Curp server timeout settings + #[getset(get = "pub")] + #[serde(default = "CurpConfig::default")] + curp_config: CurpConfig, + /// Curp client config settings + #[getset(get = "pub")] + #[serde(default = "ClientConfig::default")] + client_config: ClientConfig, + /// Xline server timeout settings + #[getset(get = "pub")] + #[serde(default = "ServerTimeout::default")] + server_timeout: ServerTimeout, + /// Xline server initial state + #[getset(get = "pub")] + #[serde(with = "state_format", default = "InitialClusterState::default")] + initial_cluster_state: InitialClusterState, +} + +impl Default for ClusterConfig { + #[inline] + fn default() -> Self { + Self { + name: "default".to_owned(), + peer_listen_urls: vec!["http://127.0.0.1:2380".to_owned()], + peer_advertise_urls: vec!["http://127.0.0.1:2380".to_owned()], + client_listen_urls: vec!["http://127.0.0.1:2379".to_owned()], + client_advertise_urls: vec!["http://127.0.0.1:2379".to_owned()], + peers: HashMap::from([( + "default".to_owned(), + vec!["http://127.0.0.1:2379".to_owned()], + )]), + is_leader: false, + curp_config: CurpConfig::default(), + client_config: ClientConfig::default(), + server_timeout: ServerTimeout::default(), + initial_cluster_state: InitialClusterState::default(), + } + } +} + +impl ClusterConfig { + /// Generate a new `ClusterConfig` object + #[must_use] + #[inline] + #[allow(clippy::too_many_arguments)] + pub fn new( + name: String, + peer_listen_urls: Vec, + peer_advertise_urls: Vec, + client_listen_urls: Vec, + client_advertise_urls: Vec, + members: HashMap>, + is_leader: bool, + curp: CurpConfig, + client_config: ClientConfig, + server_timeout: ServerTimeout, + initial_cluster_state: InitialClusterState, + ) -> Self { + Self { + name, + peer_listen_urls, + peer_advertise_urls, + client_listen_urls, + client_advertise_urls, + peers: members, + is_leader, + curp_config: curp, + client_config, + server_timeout, + initial_cluster_state, + } + } +} + +/// Cluster Range type alias +pub type ClusterRange = std::ops::Range; + +/// Initial cluster state of xline server +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize)] +#[non_exhaustive] +pub enum InitialClusterState { + /// Create a new cluster + #[default] + New, + /// Join an existing cluster + Existing, +} + +/// `InitialClusterState` deserialization formatter +pub mod state_format { + use serde::{self, Deserialize, Deserializer}; + + use super::InitialClusterState; + use crate::parse_state; + + /// deserializes a cluster log rotation strategy + #[allow(single_use_lifetimes)] + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_state(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/compact_config.rs b/crates/utils/src/config/compact_config.rs new file mode 100644 index 000000000..e5eea2aa7 --- /dev/null +++ b/crates/utils/src/config/compact_config.rs @@ -0,0 +1,99 @@ +use std::time::Duration; + +use getset::Getters; +use serde::Deserialize; + +/// Compaction configuration +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Getters)] +#[allow(clippy::module_name_repetitions)] +pub struct CompactConfig { + /// The max number of historical versions processed in a single compact operation + #[getset(get = "pub")] + #[serde(default = "default_compact_batch_size")] + pub compact_batch_size: usize, + /// The interval between two compaction batches + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_compact_sleep_interval")] + pub compact_sleep_interval: Duration, + /// The auto compactor config + #[getset(get = "pub")] + pub auto_compact_config: Option, +} + +impl Default for CompactConfig { + #[inline] + fn default() -> Self { + Self { + compact_batch_size: default_compact_batch_size(), + compact_sleep_interval: default_compact_sleep_interval(), + auto_compact_config: None, + } + } +} + +impl CompactConfig { + /// Create a new compact config + #[must_use] + #[inline] + pub fn new( + compact_batch_size: usize, + compact_sleep_interval: Duration, + auto_compact_config: Option, + ) -> Self { + Self { + compact_batch_size, + compact_sleep_interval, + auto_compact_config, + } + } +} + +/// Auto Compactor Configuration +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] +#[serde( + tag = "mode", + content = "retention", + rename_all(deserialize = "lowercase") +)] +pub enum AutoCompactConfig { + /// auto periodic compactor + #[serde(with = "duration_format")] + Periodic(Duration), + /// auto revision compactor + Revision(i64), +} + +/// default compact batch size +#[must_use] +#[inline] +pub const fn default_compact_batch_size() -> usize { + 1000 +} + +/// default compact interval +#[must_use] +#[inline] +pub const fn default_compact_sleep_interval() -> Duration { + Duration::from_millis(10) +} + +/// `Duration` deserialization formatter +pub mod duration_format { + use std::time::Duration; + + use serde::{self, Deserialize, Deserializer}; + + use crate::parse_duration; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_duration(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/curp_config.rs b/crates/utils/src/config/curp_config.rs new file mode 100644 index 000000000..5492fe67c --- /dev/null +++ b/crates/utils/src/config/curp_config.rs @@ -0,0 +1,223 @@ +use std::time::Duration; + +use derive_builder::Builder; +use getset::Getters; +use serde::Deserialize; + +use super::engine_config::EngineConfig; + +/// Curp server timeout settings +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Builder)] +#[allow(clippy::module_name_repetitions, clippy::exhaustive_structs)] +pub struct CurpConfig { + /// Heartbeat Interval + #[builder(default = "default_heartbeat_interval()")] + #[serde(with = "duration_format", default = "default_heartbeat_interval")] + pub heartbeat_interval: Duration, + + /// Curp wait sync timeout + #[builder(default = "default_server_wait_synced_timeout()")] + #[serde( + with = "duration_format", + default = "default_server_wait_synced_timeout" + )] + pub wait_synced_timeout: Duration, + + /// Curp propose retry count + #[builder(default = "default_retry_count()")] + #[serde(default = "default_retry_count")] + pub retry_count: usize, + + /// Curp rpc timeout + #[builder(default = "default_rpc_timeout()")] + #[serde(with = "duration_format", default = "default_rpc_timeout")] + pub rpc_timeout: Duration, + + /// Curp append entries batch timeout + /// If the `batch_timeout` has expired, then it will be dispatched + /// whether its size reaches the `BATCHING_MSG_MAX_SIZE` or not. + #[builder(default = "default_batch_timeout()")] + #[serde(with = "duration_format", default = "default_batch_timeout")] + pub batch_timeout: Duration, + + /// The maximum number of bytes per batch. + #[builder(default = "default_batch_max_size()")] + #[serde(with = "bytes_format", default = "default_batch_max_size")] + pub batch_max_size: u64, + + /// How many ticks a follower is allowed to miss before it starts a new round of election + /// The actual timeout will be randomized and in between heartbeat_interval * [follower_timeout_ticks, 2 * follower_timeout_ticks) + #[builder(default = "default_follower_timeout_ticks()")] + #[serde(default = "default_follower_timeout_ticks")] + pub follower_timeout_ticks: u8, + + /// How many ticks a candidate needs to wait before it starts a new round of election + /// It should be smaller than `follower_timeout_ticks` + /// The actual timeout will be randomized and in between heartbeat_interval * [candidate_timeout_ticks, 2 * candidate_timeout_ticks) + #[builder(default = "default_candidate_timeout_ticks()")] + #[serde(default = "default_candidate_timeout_ticks")] + pub candidate_timeout_ticks: u8, + + /// Curp storage path + #[builder(default = "EngineConfig::default()")] + #[serde(default = "EngineConfig::default")] + pub engine_cfg: EngineConfig, + + /// Number of command execute workers + #[builder(default = "default_cmd_workers()")] + #[serde(default = "default_cmd_workers")] + pub cmd_workers: u8, + + /// How often should the gc task run + #[builder(default = "default_gc_interval()")] + #[serde(with = "duration_format", default = "default_gc_interval")] + pub gc_interval: Duration, + + /// Number of log entries to keep in memory + #[builder(default = "default_log_entries_cap()")] + #[serde(default = "default_log_entries_cap")] + pub log_entries_cap: usize, +} + +impl Default for CurpConfig { + #[inline] + fn default() -> Self { + Self { + heartbeat_interval: default_heartbeat_interval(), + wait_synced_timeout: default_server_wait_synced_timeout(), + retry_count: default_retry_count(), + rpc_timeout: default_rpc_timeout(), + batch_timeout: default_batch_timeout(), + batch_max_size: default_batch_max_size(), + follower_timeout_ticks: default_follower_timeout_ticks(), + candidate_timeout_ticks: default_candidate_timeout_ticks(), + engine_cfg: EngineConfig::default(), + cmd_workers: default_cmd_workers(), + gc_interval: default_gc_interval(), + log_entries_cap: default_log_entries_cap(), + } + } +} + +/// default heartbeat interval +#[must_use] +#[inline] +pub const fn default_heartbeat_interval() -> Duration { + Duration::from_millis(300) +} + +/// default wait synced timeout +#[must_use] +#[inline] +pub const fn default_server_wait_synced_timeout() -> Duration { + Duration::from_secs(5) +} + +/// default retry count +#[cfg(not(madsim))] +#[must_use] +#[inline] +pub const fn default_retry_count() -> usize { + 3 +} + +/// default retry count +#[cfg(madsim)] +#[must_use] +#[inline] +pub const fn default_retry_count() -> usize { + 10 +} + +/// default rpc timeout +#[must_use] +#[inline] +pub const fn default_rpc_timeout() -> Duration { + Duration::from_millis(50) +} + +/// default batch timeout +#[must_use] +#[inline] +pub const fn default_batch_timeout() -> Duration { + Duration::from_millis(15) +} + +/// default batch timeout +#[must_use] +#[inline] +#[allow(clippy::arithmetic_side_effects)] +pub const fn default_batch_max_size() -> u64 { + 2 * 1024 * 1024 +} + +/// default follower timeout +#[must_use] +#[inline] +pub const fn default_follower_timeout_ticks() -> u8 { + 5 +} + +/// default candidate timeout ticks +#[must_use] +#[inline] +pub const fn default_candidate_timeout_ticks() -> u8 { + 2 +} + +/// default number of execute workers +#[must_use] +#[inline] +pub const fn default_cmd_workers() -> u8 { + 8 +} + +/// default gc interval +#[must_use] +#[inline] +pub const fn default_gc_interval() -> Duration { + Duration::from_secs(20) +} + +/// default number of log entries to keep in memory +#[must_use] +#[inline] +pub const fn default_log_entries_cap() -> usize { + 5000 +} + +/// `Duration` deserialization formatter +pub mod duration_format { + use std::time::Duration; + + use serde::{self, Deserialize, Deserializer}; + + use crate::parse_duration; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_duration(&s).map_err(serde::de::Error::custom) + } +} + +/// batch size deserialization formatter +pub mod bytes_format { + use serde::{self, Deserialize, Deserializer}; + + use crate::parse_batch_bytes; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_batch_bytes(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/engine_config.rs b/crates/utils/src/config/engine_config.rs new file mode 100644 index 000000000..12a8a5057 --- /dev/null +++ b/crates/utils/src/config/engine_config.rs @@ -0,0 +1,26 @@ +use std::path::PathBuf; + +use serde::Deserialize; + +/// Engine Configuration +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +#[serde( + tag = "type", + content = "data_dir", + rename_all(deserialize = "lowercase") +)] +pub enum EngineConfig { + /// Memory Storage Engine + Memory, + /// RocksDB Storage Engine + RocksDB(PathBuf), +} + +impl Default for EngineConfig { + #[inline] + fn default() -> Self { + Self::Memory + } +} diff --git a/crates/utils/src/config/log_config.rs b/crates/utils/src/config/log_config.rs new file mode 100644 index 000000000..d555739ac --- /dev/null +++ b/crates/utils/src/config/log_config.rs @@ -0,0 +1,150 @@ +use std::path::PathBuf; + +use getset::Getters; +use serde::Deserialize; +use tracing_appender::rolling::RollingFileAppender; + +/// Log verbosity level alias +#[allow(clippy::module_name_repetitions)] +pub type LevelConfig = tracing::metadata::LevelFilter; + +/// Log configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +pub struct LogConfig { + /// Log file path + #[getset(get = "pub")] + #[serde(default)] + path: Option, + /// Log rotation strategy + #[getset(get = "pub")] + #[serde(with = "rotation_format", default = "default_rotation")] + rotation: RotationConfig, + /// Log verbosity level + #[getset(get = "pub")] + #[serde(with = "level_format", default = "default_log_level")] + level: LevelConfig, +} + +impl Default for LogConfig { + #[inline] + fn default() -> Self { + Self { + path: None, + rotation: default_rotation(), + level: default_log_level(), + } + } +} + +/// `LevelConfig` deserialization formatter +pub mod level_format { + use serde::{self, Deserialize, Deserializer}; + + use super::LevelConfig; + use crate::parse_log_level; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_log_level(&s).map_err(serde::de::Error::custom) + } +} + +/// default log level +#[must_use] +#[inline] +pub const fn default_log_level() -> LevelConfig { + LevelConfig::INFO +} + +impl LogConfig { + /// Generate a new `LogConfig` object + #[must_use] + #[inline] + pub fn new(path: PathBuf, rotation: RotationConfig, level: LevelConfig) -> Self { + Self { + path: Some(path), + rotation, + level, + } + } +} + +/// Xline log rotation strategy +#[non_exhaustive] +#[allow(clippy::module_name_repetitions)] +#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq)] +#[serde(rename_all(deserialize = "lowercase"))] +pub enum RotationConfig { + /// Rotate log file in every hour + Hourly, + /// Rotate log file every day + Daily, + /// Never rotate log file + Never, +} + +impl std::fmt::Display for RotationConfig { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + RotationConfig::Hourly => write!(f, "hourly"), + RotationConfig::Daily => write!(f, "daily"), + RotationConfig::Never => write!(f, "never"), + } + } +} + +/// `RotationConfig` deserialization formatter +pub mod rotation_format { + use serde::{self, Deserialize, Deserializer}; + + use super::RotationConfig; + use crate::parse_rotation; + + /// deserializes a cluster log rotation strategy + #[allow(single_use_lifetimes)] + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_rotation(&s).map_err(serde::de::Error::custom) + } +} + +/// default log rotation strategy +#[must_use] +#[inline] +pub const fn default_rotation() -> RotationConfig { + RotationConfig::Daily +} + +/// Generates a `RollingFileAppender` from the given `RotationConfig` and `name` +#[must_use] +#[inline] +pub fn file_appender( + rotation: RotationConfig, + file_path: &PathBuf, + name: &str, +) -> RollingFileAppender { + match rotation { + RotationConfig::Hourly => { + tracing_appender::rolling::hourly(file_path, format!("xline_{name}.log")) + } + RotationConfig::Daily => { + tracing_appender::rolling::daily(file_path, format!("xline_{name}.log")) + } + RotationConfig::Never => { + tracing_appender::rolling::never(file_path, format!("xline_{name}.log")) + } + #[allow(unreachable_patterns)] + // It's ok because `parse_rotation` have check the validity before. + _ => unreachable!("should not call file_appender when parse_rotation failed"), + } +} diff --git a/crates/utils/src/config/metrics_config.rs b/crates/utils/src/config/metrics_config.rs new file mode 100644 index 000000000..9da6517c7 --- /dev/null +++ b/crates/utils/src/config/metrics_config.rs @@ -0,0 +1,151 @@ +use getset::Getters; +use serde::Deserialize; + +/// Xline metrics configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +pub struct MetricsConfig { + /// Enable or not + #[getset(get = "pub")] + #[serde(default = "default_metrics_enable")] + pub enable: bool, + /// The http port to expose + #[getset(get = "pub")] + #[serde(default = "default_metrics_port")] + pub port: u16, + /// The http path to expose + #[getset(get = "pub")] + #[serde(default = "default_metrics_path")] + pub path: String, + /// Enable push or not + #[getset(get = "pub")] + #[serde(default = "default_metrics_push")] + pub push: bool, + /// Push endpoint + #[getset(get = "pub")] + #[serde(default = "default_metrics_push_endpoint")] + pub push_endpoint: String, + /// Push protocol + #[getset(get = "pub")] + #[serde(with = "protocol_format", default = "default_metrics_push_protocol")] + pub push_protocol: MetricsPushProtocol, +} + +impl MetricsConfig { + /// Create a new `MetricsConfig` + #[must_use] + #[inline] + pub fn new( + enable: bool, + port: u16, + path: String, + push: bool, + push_endpoint: String, + push_protocol: MetricsPushProtocol, + ) -> Self { + Self { + enable, + port, + path, + push, + push_endpoint, + push_protocol, + } + } +} + +impl Default for MetricsConfig { + #[inline] + fn default() -> Self { + Self { + enable: default_metrics_enable(), + port: default_metrics_port(), + path: default_metrics_path(), + push: default_metrics_push(), + push_endpoint: default_metrics_push_endpoint(), + push_protocol: default_metrics_push_protocol(), + } + } +} + +/// Default metrics enable +#[must_use] +#[inline] +pub const fn default_metrics_enable() -> bool { + true +} + +/// Default metrics port +#[must_use] +#[inline] +pub const fn default_metrics_port() -> u16 { + 9100 +} + +/// Default metrics path +#[must_use] +#[inline] +pub fn default_metrics_path() -> String { + "/metrics".to_owned() +} + +/// Default metrics push option +#[must_use] +#[inline] +pub fn default_metrics_push() -> bool { + false +} + +/// Default metrics push protocol +#[must_use] +#[inline] +pub fn default_metrics_push_protocol() -> MetricsPushProtocol { + MetricsPushProtocol::GRPC +} + +/// Default metrics push endpoint +#[must_use] +#[inline] +pub fn default_metrics_push_endpoint() -> String { + "http://127.0.0.1:4318".to_owned() +} + +/// Xline metrics push protocol +#[non_exhaustive] +#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all(deserialize = "lowercase"))] +pub enum MetricsPushProtocol { + /// HTTP protocol + HTTP, + /// GRPC protocol + #[default] + GRPC, +} + +impl std::fmt::Display for MetricsPushProtocol { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + MetricsPushProtocol::HTTP => write!(f, "http"), + MetricsPushProtocol::GRPC => write!(f, "grpc"), + } + } +} + +/// Metrics push protocol format +pub mod protocol_format { + use serde::{self, Deserialize, Deserializer}; + + use super::MetricsPushProtocol; + use crate::parse_metrics_push_protocol; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_metrics_push_protocol(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/mod.rs b/crates/utils/src/config/mod.rs new file mode 100644 index 000000000..4a1fc8152 --- /dev/null +++ b/crates/utils/src/config/mod.rs @@ -0,0 +1,445 @@ +/// Xline auth configuration module +pub mod auth_config; +/// Curp client module +pub mod client_config; +/// Cluster configuration module +pub mod cluster_config; +/// Compaction configuration module +pub mod compact_config; +/// Curp server module +pub mod curp_config; +/// Engine Configuration module +pub mod engine_config; +/// Log configuration module +pub mod log_config; +/// Xline metrics configuration module +pub mod metrics_config; +/// Xline server module +pub mod server_config; +/// Storage Configuration module +pub mod storage_config; +/// Xline tls configuration module +pub mod tls_config; +/// Xline tracing configuration module +pub mod trace_config; + +use getset::Getters; +use serde::Deserialize; + +use crate::config::cluster_config::ClusterConfig; +use crate::config::compact_config::CompactConfig; +use crate::config::log_config::LogConfig; +use crate::config::metrics_config::MetricsConfig; +use crate::config::storage_config::StorageConfig; +use crate::config::trace_config::TraceConfig; + +use crate::config::{auth_config::AuthConfig, tls_config::TlsConfig}; + +/// Xline server configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] +pub struct XlineServerConfig { + /// cluster configuration object + #[getset(get = "pub")] + cluster: ClusterConfig, + /// xline storage configuration object + #[getset(get = "pub")] + storage: StorageConfig, + /// log configuration object + #[getset(get = "pub")] + log: LogConfig, + /// trace configuration object + #[getset(get = "pub")] + trace: TraceConfig, + /// auth configuration object + #[getset(get = "pub")] + auth: AuthConfig, + /// compactor configuration object + #[getset(get = "pub")] + compact: CompactConfig, + /// tls configuration object + #[getset(get = "pub")] + tls: TlsConfig, + /// Metrics config + #[getset(get = "pub")] + #[serde(default = "MetricsConfig::default")] + metrics: MetricsConfig, +} + +impl XlineServerConfig { + /// Generates a new `XlineServerConfig` object + #[must_use] + #[inline] + #[allow(clippy::too_many_arguments)] + pub fn new( + cluster: ClusterConfig, + storage: StorageConfig, + log: LogConfig, + trace: TraceConfig, + auth: AuthConfig, + compact: CompactConfig, + tls: TlsConfig, + metrics: MetricsConfig, + ) -> Self { + Self { + cluster, + storage, + log, + trace, + auth, + compact, + tls, + metrics, + } + } +} + +#[cfg(test)] +mod tests { + use crate::{ + config::{ + client_config::{ + default_client_wait_synced_timeout, default_fixed_backoff, default_propose_timeout, + default_retry_count, ClientConfig, + }, + compact_config::AutoCompactConfig, + curp_config::CurpConfigBuilder, + engine_config::EngineConfig, + server_config::ServerTimeout, + storage_config::default_quota, + }, + InitialClusterState, LevelConfig, MetricsPushProtocol, RotationConfig, + }; + + use super::*; + use std::{collections::HashMap, path::PathBuf, time::Duration}; + + #[allow(clippy::too_many_lines)] // just a testcase, not too bad + #[test] + fn test_xline_server_config_should_be_loaded() { + let config: XlineServerConfig = toml::from_str( + r#"[cluster] + name = 'node1' + is_leader = true + initial_cluster_state = 'new' + peer_listen_urls = ['127.0.0.1:2380'] + peer_advertise_urls = ['127.0.0.1:2380'] + client_listen_urls = ['127.0.0.1:2379'] + client_advertise_urls = ['127.0.0.1:2379'] + + [cluster.server_timeout] + range_retry_timeout = '3s' + compact_timeout = '5s' + sync_victims_interval = '20ms' + watch_progress_notify_interval = '1s' + + [cluster.peers] + node1 = ['127.0.0.1:2378', '127.0.0.1:2379'] + node2 = ['127.0.0.1:2380'] + node3 = ['127.0.0.1:2381'] + + [cluster.curp_config] + heartbeat_interval = '200ms' + wait_synced_timeout = '100ms' + rpc_timeout = '100ms' + retry_timeout = '100ms' + + [cluster.client_config] + initial_retry_timeout = '5s' + max_retry_timeout = '50s' + + [storage] + engine = { type = 'memory'} + + [compact] + compact_batch_size = 123 + compact_sleep_interval = '5ms' + + [compact.auto_compact_config] + mode = 'periodic' + retention = '10h' + + [log] + path = '/var/log/xline' + rotation = 'daily' + level = 'info' + + [trace] + jaeger_online = false + jaeger_offline = false + jaeger_output_dir = './jaeger_jsons' + jaeger_level = 'info' + + [auth] + auth_public_key = './public_key.pem' + auth_private_key = './private_key.pem' + + [tls] + server_cert_path = './cert.pem' + server_key_path = './key.pem' + client_ca_cert_path = './ca.pem' + + [metrics] + enable = true + port = 9100 + path = "/metrics" + push = true + push_endpoint = 'http://some-endpoint.com:4396' + push_protocol = 'http' + "#, + ) + .unwrap(); + + let curp_config = CurpConfigBuilder::default() + .heartbeat_interval(Duration::from_millis(200)) + .wait_synced_timeout(Duration::from_millis(100)) + .rpc_timeout(Duration::from_millis(100)) + .build() + .unwrap(); + + let client_config = ClientConfig::new( + default_client_wait_synced_timeout(), + default_propose_timeout(), + Duration::from_secs(5), + Duration::from_secs(50), + default_retry_count(), + default_fixed_backoff(), + ); + + let server_timeout = ServerTimeout::new( + Duration::from_secs(3), + Duration::from_secs(5), + Duration::from_millis(20), + Duration::from_secs(1), + ); + + assert_eq!( + config.cluster, + ClusterConfig::new( + "node1".to_owned(), + vec!["127.0.0.1:2380".to_owned()], + vec!["127.0.0.1:2380".to_owned()], + vec!["127.0.0.1:2379".to_owned()], + vec!["127.0.0.1:2379".to_owned()], + HashMap::from_iter([ + ( + "node1".to_owned(), + vec!["127.0.0.1:2378".to_owned(), "127.0.0.1:2379".to_owned()] + ), + ("node2".to_owned(), vec!["127.0.0.1:2380".to_owned()]), + ("node3".to_owned(), vec!["127.0.0.1:2381".to_owned()]), + ]), + true, + curp_config, + client_config, + server_timeout, + InitialClusterState::New + ) + ); + + assert_eq!( + config.storage, + StorageConfig::new(EngineConfig::Memory, default_quota()) + ); + + assert_eq!( + config.log, + LogConfig::new( + PathBuf::from("/var/log/xline"), + RotationConfig::Daily, + LevelConfig::INFO + ) + ); + assert_eq!( + config.trace, + TraceConfig::new( + false, + false, + PathBuf::from("./jaeger_jsons"), + LevelConfig::INFO + ) + ); + + assert_eq!( + config.compact, + CompactConfig::new( + 123, + Duration::from_millis(5), + Some(AutoCompactConfig::Periodic(Duration::from_secs( + 10 * 60 * 60 + ))) + ) + ); + + assert_eq!( + config.auth, + AuthConfig { + auth_private_key: Some(PathBuf::from("./private_key.pem")), + auth_public_key: Some(PathBuf::from("./public_key.pem")), + } + ); + + assert_eq!( + config.tls, + TlsConfig { + server_cert_path: Some(PathBuf::from("./cert.pem")), + server_key_path: Some(PathBuf::from("./key.pem")), + client_ca_cert_path: Some(PathBuf::from("./ca.pem")), + ..Default::default() + } + ); + + assert_eq!( + config.metrics, + MetricsConfig { + enable: true, + port: 9100, + path: "/metrics".to_owned(), + push: true, + push_endpoint: "http://some-endpoint.com:4396".to_owned(), + push_protocol: MetricsPushProtocol::HTTP, + }, + ); + } + + #[test] + fn test_xline_server_default_config_should_be_loaded() { + let config: XlineServerConfig = toml::from_str( + r#"[cluster] + name = 'node1' + is_leader = true + peer_listen_urls = ['127.0.0.1:2380'] + peer_advertise_urls = ['127.0.0.1:2380'] + client_listen_urls = ['127.0.0.1:2379'] + client_advertise_urls = ['127.0.0.1:2379'] + + [cluster.peers] + node1 = ['127.0.0.1:2379'] + node2 = ['127.0.0.1:2380'] + node3 = ['127.0.0.1:2381'] + + [cluster.storage] + + [log] + path = '/var/log/xline' + + [storage] + engine = { type = 'rocksdb', data_dir = '/usr/local/xline/data-dir' } + + [compact] + + [trace] + jaeger_online = false + jaeger_offline = false + jaeger_output_dir = './jaeger_jsons' + jaeger_level = 'info' + + [auth] + + [tls] + "#, + ) + .unwrap(); + + assert_eq!( + config.cluster, + ClusterConfig::new( + "node1".to_owned(), + vec!["127.0.0.1:2380".to_owned()], + vec!["127.0.0.1:2380".to_owned()], + vec!["127.0.0.1:2379".to_owned()], + vec!["127.0.0.1:2379".to_owned()], + HashMap::from([ + ("node1".to_owned(), vec!["127.0.0.1:2379".to_owned()]), + ("node2".to_owned(), vec!["127.0.0.1:2380".to_owned()]), + ("node3".to_owned(), vec!["127.0.0.1:2381".to_owned()]), + ]), + true, + CurpConfigBuilder::default().build().unwrap(), + ClientConfig::default(), + ServerTimeout::default(), + InitialClusterState::default() + ) + ); + + if let EngineConfig::RocksDB(path) = config.storage.engine { + assert_eq!(path, PathBuf::from("/usr/local/xline/data-dir")); + } else { + unreachable!(); + } + + assert_eq!( + config.log, + LogConfig::new( + PathBuf::from("/var/log/xline"), + RotationConfig::Daily, + LevelConfig::INFO + ) + ); + assert_eq!( + config.trace, + TraceConfig::new( + false, + false, + PathBuf::from("./jaeger_jsons"), + LevelConfig::INFO + ) + ); + assert_eq!(config.compact, CompactConfig::default()); + assert_eq!(config.auth, AuthConfig::default()); + assert_eq!(config.tls, TlsConfig::default()); + assert_eq!(config.metrics, MetricsConfig::default()); + } + + #[test] + fn test_auto_revision_compactor_config_should_be_loaded() { + let config: XlineServerConfig = toml::from_str( + r#"[cluster] + name = 'node1' + is_leader = true + peer_listen_urls = ['127.0.0.1:2380'] + peer_advertise_urls = ['127.0.0.1:2380'] + client_listen_urls = ['127.0.0.1:2379'] + client_advertise_urls = ['127.0.0.1:2379'] + + [cluster.peers] + node1 = ['127.0.0.1:2379'] + node2 = ['127.0.0.1:2380'] + node3 = ['127.0.0.1:2381'] + + [cluster.storage] + + [log] + path = '/var/log/xline' + + [storage] + engine = { type = 'memory' } + + [compact] + + [compact.auto_compact_config] + mode = 'revision' + retention = 10000 + + [trace] + jaeger_online = false + jaeger_offline = false + jaeger_output_dir = './jaeger_jsons' + jaeger_level = 'info' + + [auth] + + [tls] + "#, + ) + .unwrap(); + + assert_eq!( + config.compact, + CompactConfig { + auto_compact_config: Some(AutoCompactConfig::Revision(10000)), + ..Default::default() + } + ); + } +} diff --git a/crates/utils/src/config/server_config.rs b/crates/utils/src/config/server_config.rs new file mode 100644 index 000000000..578e8c876 --- /dev/null +++ b/crates/utils/src/config/server_config.rs @@ -0,0 +1,106 @@ +use std::time::Duration; + +use getset::Getters; +use serde::Deserialize; + +/// Xline server settings +#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +pub struct ServerTimeout { + /// Range request retry timeout settings + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_range_retry_timeout")] + range_retry_timeout: Duration, + /// Range request retry timeout settings + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_compact_timeout")] + compact_timeout: Duration, + /// Sync victims interval + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_sync_victims_interval")] + sync_victims_interval: Duration, + /// Watch progress notify interval settings + #[getset(get = "pub")] + #[serde( + with = "duration_format", + default = "default_watch_progress_notify_interval" + )] + watch_progress_notify_interval: Duration, +} + +impl ServerTimeout { + /// Create a new server timeout + #[must_use] + #[inline] + pub fn new( + range_retry_timeout: Duration, + compact_timeout: Duration, + sync_victims_interval: Duration, + watch_progress_notify_interval: Duration, + ) -> Self { + Self { + range_retry_timeout, + compact_timeout, + sync_victims_interval, + watch_progress_notify_interval, + } + } +} + +impl Default for ServerTimeout { + #[inline] + fn default() -> Self { + Self { + range_retry_timeout: default_range_retry_timeout(), + compact_timeout: default_compact_timeout(), + sync_victims_interval: default_sync_victims_interval(), + watch_progress_notify_interval: default_watch_progress_notify_interval(), + } + } +} + +/// default range retry timeout +#[must_use] +#[inline] +pub const fn default_range_retry_timeout() -> Duration { + Duration::from_secs(2) +} + +/// default compact timeout +#[must_use] +#[inline] +pub const fn default_compact_timeout() -> Duration { + Duration::from_secs(5) +} + +/// default sync victims interval +#[must_use] +#[inline] +pub const fn default_sync_victims_interval() -> Duration { + Duration::from_millis(10) +} + +/// default watch progress notify interval +#[must_use] +#[inline] +pub const fn default_watch_progress_notify_interval() -> Duration { + Duration::from_secs(600) +} + +/// `Duration` deserialization formatter +pub mod duration_format { + use std::time::Duration; + + use serde::{self, Deserialize, Deserializer}; + + use crate::parse_duration; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_duration(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/storage_config.rs b/crates/utils/src/config/storage_config.rs new file mode 100644 index 000000000..74442d565 --- /dev/null +++ b/crates/utils/src/config/storage_config.rs @@ -0,0 +1,43 @@ +use serde::Deserialize; + +use crate::config::engine_config::EngineConfig; + +/// Storage Configuration +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +pub struct StorageConfig { + /// Engine Configuration + #[serde(default = "EngineConfig::default")] + pub engine: EngineConfig, + /// Quota + #[serde(default = "default_quota")] + pub quota: u64, +} + +impl StorageConfig { + /// Create a new storage config + #[inline] + #[must_use] + pub fn new(engine: EngineConfig, quota: u64) -> Self { + Self { engine, quota } + } +} + +impl Default for StorageConfig { + #[inline] + fn default() -> Self { + Self { + engine: EngineConfig::default(), + quota: default_quota(), + } + } +} + +/// Default quota: 8GB +#[inline] +#[must_use] +pub fn default_quota() -> u64 { + // 8 * 1024 * 1024 * 1024 + 0x0002_0000_0000 +} diff --git a/crates/utils/src/config/tls_config.rs b/crates/utils/src/config/tls_config.rs new file mode 100644 index 000000000..99981a1cc --- /dev/null +++ b/crates/utils/src/config/tls_config.rs @@ -0,0 +1,52 @@ +use std::path::PathBuf; + +use getset::Getters; +use serde::Deserialize; + +/// Xline tls configuration object +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] +pub struct TlsConfig { + /// The CA certificate file used by server to verify client certificates + #[getset(get = "pub")] + pub server_ca_cert_path: Option, + /// The public key file used by server + #[getset(get = "pub")] + pub server_cert_path: Option, + /// The private key file used by server + #[getset(get = "pub")] + pub server_key_path: Option, + /// The CA certificate file used by client to verify server certificates + #[getset(get = "pub")] + pub client_ca_cert_path: Option, + /// The public key file used by client + #[getset(get = "pub")] + pub client_cert_path: Option, + /// The private key file used by client + #[getset(get = "pub")] + pub client_key_path: Option, +} + +impl TlsConfig { + /// Create a new `TlsConfig` object + #[must_use] + #[inline] + pub fn new( + server_ca_cert_path: Option, + server_cert_path: Option, + server_key_path: Option, + client_ca_cert_path: Option, + client_cert_path: Option, + client_key_path: Option, + ) -> Self { + Self { + server_ca_cert_path, + server_cert_path, + server_key_path, + client_ca_cert_path, + client_cert_path, + client_key_path, + } + } +} diff --git a/crates/utils/src/config/trace_config.rs b/crates/utils/src/config/trace_config.rs new file mode 100644 index 000000000..866941935 --- /dev/null +++ b/crates/utils/src/config/trace_config.rs @@ -0,0 +1,56 @@ +use std::path::PathBuf; + +use getset::Getters; +use serde::Deserialize; + +use super::log_config::{default_log_level, level_format, LevelConfig}; + +/// Xline tracing configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +pub struct TraceConfig { + /// Open jaeger online, sending data to jaeger agent directly + #[getset(get = "pub")] + jaeger_online: bool, + /// Open jaeger offline, saving data to the `jaeger_output_dir` + #[getset(get = "pub")] + jaeger_offline: bool, + /// The dir path to save the data when `jaeger_offline` is on + #[getset(get = "pub")] + jaeger_output_dir: PathBuf, + /// The verbosity level of tracing + #[getset(get = "pub")] + #[serde(with = "level_format", default = "default_log_level")] + jaeger_level: LevelConfig, +} + +impl Default for TraceConfig { + #[inline] + fn default() -> Self { + Self { + jaeger_online: false, + jaeger_offline: false, + jaeger_output_dir: "".into(), + jaeger_level: default_log_level(), + } + } +} + +impl TraceConfig { + /// Generate a new `TraceConfig` object + #[must_use] + #[inline] + pub fn new( + jaeger_online: bool, + jaeger_offline: bool, + jaeger_output_dir: PathBuf, + jaeger_level: LevelConfig, + ) -> Self { + Self { + jaeger_online, + jaeger_offline, + jaeger_output_dir, + jaeger_level, + } + } +} diff --git a/crates/utils/src/parser.rs b/crates/utils/src/parser.rs index ba7186074..5442066a7 100644 --- a/crates/utils/src/parser.rs +++ b/crates/utils/src/parser.rs @@ -3,8 +3,10 @@ use std::{collections::HashMap, time::Duration}; use clippy_utilities::OverflowArithmetic; use thiserror::Error; -use crate::config::{ - ClusterRange, InitialClusterState, LevelConfig, MetricsPushProtocol, RotationConfig, +pub use crate::config::{ + cluster_config::{ClusterRange, InitialClusterState}, + log_config::{LevelConfig, RotationConfig}, + metrics_config::MetricsPushProtocol, }; /// seconds per minute diff --git a/crates/xline-client/src/lib.rs b/crates/xline-client/src/lib.rs index 11f780cdc..a65372da2 100644 --- a/crates/xline-client/src/lib.rs +++ b/crates/xline-client/src/lib.rs @@ -174,7 +174,7 @@ use tonic::transport::ClientTlsConfig; use tower::Service; #[cfg(madsim)] use utils::ClientTlsConfig; -use utils::{build_endpoint, config::ClientConfig}; +use utils::{build_endpoint, config::client_config::ClientConfig}; use xlineapi::command::{Command, CurpClient}; use crate::{ diff --git a/crates/xline-test-utils/src/lib.rs b/crates/xline-test-utils/src/lib.rs index e1bfd24de..98390b74b 100644 --- a/crates/xline-test-utils/src/lib.rs +++ b/crates/xline-test-utils/src/lib.rs @@ -10,8 +10,10 @@ use tokio::{ }; use tonic::transport::ClientTlsConfig; use utils::config::{ - default_quota, AuthConfig, ClusterConfig, CompactConfig, EngineConfig, InitialClusterState, - LogConfig, MetricsConfig, StorageConfig, TlsConfig, TraceConfig, XlineServerConfig, + auth_config::AuthConfig, cluster_config::ClusterConfig, cluster_config::InitialClusterState, + compact_config::CompactConfig, engine_config::EngineConfig, log_config::LogConfig, + metrics_config::MetricsConfig, storage_config::default_quota, storage_config::StorageConfig, + tls_config::TlsConfig, trace_config::TraceConfig, XlineServerConfig, }; use xline::server::XlineServer; use xline_client::types::auth::{ diff --git a/crates/xline/src/server/maintenance.rs b/crates/xline/src/server/maintenance.rs index 8ea586818..cb8a44017 100644 --- a/crates/xline/src/server/maintenance.rs +++ b/crates/xline/src/server/maintenance.rs @@ -288,7 +288,7 @@ mod test { use test_macros::abort_on_panic; use tokio_stream::StreamExt; - use utils::config::EngineConfig; + use utils::config::engine_config::EngineConfig; use super::*; use crate::storage::db::DB; diff --git a/crates/xline/src/server/watch_server.rs b/crates/xline/src/server/watch_server.rs index ecc99987f..7155ec03a 100644 --- a/crates/xline/src/server/watch_server.rs +++ b/crates/xline/src/server/watch_server.rs @@ -435,7 +435,9 @@ mod test { sync::mpsc, time::{sleep, timeout}, }; - use utils::config::{default_watch_progress_notify_interval, EngineConfig}; + use utils::config::{ + engine_config::EngineConfig, server_config::default_watch_progress_notify_interval, + }; use xlineapi::RequestWrapper; use super::*; diff --git a/crates/xline/src/server/xline_server.rs b/crates/xline/src/server/xline_server.rs index 661c4a38d..0da8c6b46 100644 --- a/crates/xline/src/server/xline_server.rs +++ b/crates/xline/src/server/xline_server.rs @@ -24,8 +24,9 @@ use tonic::transport::{server::Router, Server}; use tracing::{info, warn}; use utils::{ config::{ - AuthConfig, ClusterConfig, CompactConfig, EngineConfig, InitialClusterState, StorageConfig, - TlsConfig, + auth_config::AuthConfig, cluster_config::ClusterConfig, + cluster_config::InitialClusterState, compact_config::CompactConfig, + engine_config::EngineConfig, storage_config::StorageConfig, tls_config::TlsConfig, }, task_manager::{tasks::TaskName, TaskManager}, }; diff --git a/crates/xline/src/storage/auth_store/store.rs b/crates/xline/src/storage/auth_store/store.rs index e91c23776..0d15edd9d 100644 --- a/crates/xline/src/storage/auth_store/store.rs +++ b/crates/xline/src/storage/auth_store/store.rs @@ -1173,7 +1173,7 @@ mod test { use std::collections::HashMap; use merged_range::MergedRange; - use utils::config::EngineConfig; + use utils::config::engine_config::EngineConfig; use super::*; use crate::{ diff --git a/crates/xline/src/storage/compact/mod.rs b/crates/xline/src/storage/compact/mod.rs index 36dcb19f5..083e97b7f 100644 --- a/crates/xline/src/storage/compact/mod.rs +++ b/crates/xline/src/storage/compact/mod.rs @@ -7,7 +7,7 @@ use periodic_compactor::PeriodicCompactor; use revision_compactor::RevisionCompactor; use tokio::{sync::mpsc::Receiver, time::sleep}; use utils::{ - config::AutoCompactConfig, + config::compact_config::AutoCompactConfig, task_manager::{tasks::TaskName, Listener, TaskManager}, }; use xlineapi::{command::Command, execute_error::ExecuteError, RequestWrapper}; diff --git a/crates/xline/src/storage/db.rs b/crates/xline/src/storage/db.rs index 9eaf04aae..ceeb9b27d 100644 --- a/crates/xline/src/storage/db.rs +++ b/crates/xline/src/storage/db.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, path::Path, sync::Arc}; use engine::{Engine, EngineType, Snapshot, StorageEngine, WriteOperation}; use prost::Message; use utils::{ - config::EngineConfig, + config::engine_config::EngineConfig, table_names::{ ALARM_TABLE, AUTH_TABLE, KV_TABLE, LEASE_TABLE, META_TABLE, ROLE_TABLE, USER_TABLE, XLINE_TABLES, diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index ebbedf68e..83cf979c3 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -928,7 +928,7 @@ mod test { use test_macros::abort_on_panic; use tokio::{runtime::Handle, task::block_in_place}; use utils::{ - config::EngineConfig, + config::engine_config::EngineConfig, task_manager::{tasks::TaskName, TaskManager}, }; diff --git a/crates/xline/src/storage/kvwatcher.rs b/crates/xline/src/storage/kvwatcher.rs index 70b39fd0d..ec8dfd1c9 100644 --- a/crates/xline/src/storage/kvwatcher.rs +++ b/crates/xline/src/storage/kvwatcher.rs @@ -604,7 +604,7 @@ mod test { use clippy_utilities::{NumericCast, OverflowArithmetic}; use test_macros::abort_on_panic; use tokio::time::{sleep, timeout}; - use utils::config::EngineConfig; + use utils::config::engine_config::EngineConfig; use xlineapi::RequestWrapper; use super::*; diff --git a/crates/xline/src/storage/lease_store/mod.rs b/crates/xline/src/storage/lease_store/mod.rs index 9986ec40e..d1d272a68 100644 --- a/crates/xline/src/storage/lease_store/mod.rs +++ b/crates/xline/src/storage/lease_store/mod.rs @@ -373,7 +373,7 @@ mod test { use std::{error::Error, time::Duration}; use test_macros::abort_on_panic; - use utils::config::EngineConfig; + use utils::config::engine_config::EngineConfig; use super::*; use crate::storage::db::DB; diff --git a/crates/xline/src/utils/args.rs b/crates/xline/src/utils/args.rs index 199d26b7e..b6f92c592 100644 --- a/crates/xline/src/utils/args.rs +++ b/crates/xline/src/utils/args.rs @@ -5,20 +5,28 @@ use clap::Parser; use tokio::fs; use utils::{ config::{ - default_batch_max_size, default_batch_timeout, default_candidate_timeout_ticks, - default_client_id_keep_alive_interval, default_client_wait_synced_timeout, - default_cmd_workers, default_compact_batch_size, default_compact_sleep_interval, - default_compact_timeout, default_follower_timeout_ticks, default_gc_interval, - default_heartbeat_interval, default_initial_retry_timeout, default_log_entries_cap, - default_log_level, default_max_retry_timeout, default_metrics_enable, default_metrics_path, - default_metrics_port, default_metrics_push_endpoint, default_metrics_push_protocol, - default_propose_timeout, default_quota, default_range_retry_timeout, default_retry_count, - default_rotation, default_rpc_timeout, default_server_wait_synced_timeout, - default_sync_victims_interval, default_watch_progress_notify_interval, AuthConfig, - AutoCompactConfig, ClientConfig, ClusterConfig, CompactConfig, CurpConfigBuilder, - EngineConfig, InitialClusterState, LevelConfig, LogConfig, MetricsConfig, - MetricsPushProtocol, RotationConfig, ServerTimeout, StorageConfig, TlsConfig, TraceConfig, - XlineServerConfig, + auth_config::AuthConfig, client_config::default_client_wait_synced_timeout, + client_config::default_initial_retry_timeout, client_config::default_max_retry_timeout, + client_config::default_propose_timeout, client_config::default_retry_count, + client_config::ClientConfig, cluster_config::ClusterConfig, + cluster_config::InitialClusterState, compact_config::default_compact_batch_size, + compact_config::default_compact_sleep_interval, compact_config::AutoCompactConfig, + compact_config::CompactConfig, curp_config::default_batch_max_size, + curp_config::default_batch_timeout, curp_config::default_candidate_timeout_ticks, + curp_config::default_cmd_workers, curp_config::default_follower_timeout_ticks, + curp_config::default_gc_interval, curp_config::default_heartbeat_interval, + curp_config::default_log_entries_cap, curp_config::default_rpc_timeout, + curp_config::default_server_wait_synced_timeout, curp_config::CurpConfigBuilder, + engine_config::EngineConfig, log_config::default_log_level, log_config::default_rotation, + log_config::LevelConfig, log_config::LogConfig, log_config::RotationConfig, + metrics_config::default_metrics_enable, metrics_config::default_metrics_path, + metrics_config::default_metrics_port, metrics_config::default_metrics_push_endpoint, + metrics_config::default_metrics_push_protocol, metrics_config::MetricsConfig, + metrics_config::MetricsPushProtocol, server_config::default_compact_timeout, + server_config::default_range_retry_timeout, server_config::default_sync_victims_interval, + server_config::default_watch_progress_notify_interval, server_config::ServerTimeout, + storage_config::default_quota, storage_config::StorageConfig, tls_config::TlsConfig, + trace_config::TraceConfig, XlineServerConfig, }, parse_batch_bytes, parse_duration, parse_log_level, parse_members, parse_metrics_push_protocol, parse_rotation, parse_state, ConfigFileError, diff --git a/crates/xline/src/utils/metrics.rs b/crates/xline/src/utils/metrics.rs index 97a22896d..37c30cbce 100644 --- a/crates/xline/src/utils/metrics.rs +++ b/crates/xline/src/utils/metrics.rs @@ -2,7 +2,7 @@ use opentelemetry::global; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::{metrics::SdkMeterProvider, runtime::Tokio}; use tracing::info; -use utils::config::{MetricsConfig, MetricsPushProtocol}; +use utils::config::metrics_config::{MetricsConfig, MetricsPushProtocol}; /// Start metrics server /// # Errors diff --git a/crates/xline/src/utils/trace.rs b/crates/xline/src/utils/trace.rs index 3b3fda98c..4f6faabbf 100644 --- a/crates/xline/src/utils/trace.rs +++ b/crates/xline/src/utils/trace.rs @@ -3,7 +3,7 @@ use opentelemetry_contrib::trace::exporter::jaeger_json::JaegerJsonExporter; use opentelemetry_sdk::runtime::Tokio; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{fmt::format, layer::SubscriberExt, util::SubscriberInitExt, Layer}; -use utils::config::{file_appender, LogConfig, TraceConfig}; +use utils::config::{log_config::file_appender, log_config::LogConfig, trace_config::TraceConfig}; /// init tracing subscriber /// # Errors diff --git a/crates/xline/tests/it/auth_test.rs b/crates/xline/tests/it/auth_test.rs index 2692ffaa4..44c678ab0 100644 --- a/crates/xline/tests/it/auth_test.rs +++ b/crates/xline/tests/it/auth_test.rs @@ -2,8 +2,9 @@ use std::{error::Error, iter, path::PathBuf}; use test_macros::abort_on_panic; use utils::config::{ - AuthConfig, ClusterConfig, CompactConfig, LogConfig, MetricsConfig, StorageConfig, TlsConfig, - TraceConfig, XlineServerConfig, + auth_config::AuthConfig, cluster_config::ClusterConfig, compact_config::CompactConfig, + log_config::LogConfig, metrics_config::MetricsConfig, storage_config::StorageConfig, + tls_config::TlsConfig, trace_config::TraceConfig, XlineServerConfig, }; use xline_test_utils::{ enable_auth, set_user, diff --git a/crates/xlinectl/src/main.rs b/crates/xlinectl/src/main.rs index 42fbd82b3..8559bfe87 100644 --- a/crates/xlinectl/src/main.rs +++ b/crates/xlinectl/src/main.rs @@ -162,7 +162,7 @@ use std::{path::PathBuf, time::Duration}; use anyhow::Result; use clap::{arg, value_parser, Command}; use command::compaction; -use ext_utils::config::ClientConfig; +use ext_utils::config::client_config::ClientConfig; use tokio::fs; use tonic::transport::{Certificate, ClientTlsConfig}; use xline_client::{Client, ClientOptions};