From ea7c9733e78a959b461decc079dccc8f2d4d56f2 Mon Sep 17 00:00:00 2001 From: fmarek-kindred <123923685+fmarek-kindred@users.noreply.github.com> Date: Wed, 16 Aug 2023 13:55:59 +1000 Subject: [PATCH] feature: Change u64 to u32 where u64 is not required. --- .../src/callbacks/oo_installer.rs | 6 ++--- packages/cohort_sdk/src/cohort.rs | 8 +++---- packages/cohort_sdk/src/delay_controller.rs | 10 ++++---- packages/cohort_sdk/src/model/callbacks.rs | 2 +- packages/cohort_sdk/src/model/mod.rs | 23 ++++++++++--------- packages/talos_agent/src/api.rs | 8 +++---- packages/talos_agent/src/messaging/kafka.rs | 8 +++---- 7 files changed, 33 insertions(+), 32 deletions(-) diff --git a/packages/cohort_banking/src/callbacks/oo_installer.rs b/packages/cohort_banking/src/callbacks/oo_installer.rs index 3730e74a..6278eb09 100644 --- a/packages/cohort_banking/src/callbacks/oo_installer.rs +++ b/packages/cohort_banking/src/callbacks/oo_installer.rs @@ -63,7 +63,7 @@ impl OutOfOrderInstallerImpl { } } - async fn install_using_polling(&self, _xid: String, safepoint: u64, new_version: u64, _attempt_nr: u64) -> Result { + async fn install_using_polling(&self, _xid: String, safepoint: u64, new_version: u64, _attempt_nr: u32) -> Result { let db = Arc::clone(&self.database); let wait_handle: JoinHandle> = tokio::spawn(async move { let mut safe_now = Self::is_safe_to_proceed(Arc::clone(&db), safepoint).await?; @@ -91,7 +91,7 @@ impl OutOfOrderInstallerImpl { } } - async fn install_using_single_query(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u64) -> Result { + async fn install_using_single_query(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u32) -> Result { // Params order: // 1 - from, 2 - to, 3 - amount // 4 - new_ver, 5 - safepoint @@ -251,7 +251,7 @@ impl OutOfOrderInstallerImpl { #[async_trait] impl OutOfOrderInstaller for OutOfOrderInstallerImpl { - async fn install(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u64) -> Result { + async fn install(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u32) -> Result { if self.single_query_strategy { self.install_using_single_query(xid, safepoint, new_version, attempt_nr).await } else { diff --git a/packages/cohort_sdk/src/cohort.rs b/packages/cohort_sdk/src/cohort.rs index 63ef98f8..31c94bee 100644 --- a/packages/cohort_sdk/src/cohort.rs +++ b/packages/cohort_sdk/src/cohort.rs @@ -243,10 +243,10 @@ impl Cohort { c_giveups.add(giveups, &[]); } if attempt > 1 { - c_retry.add(attempt - 1, &[]); + c_retry.add(attempt as u64 - 1, &[]); } - h_attempts.record(attempt, &[]); + h_attempts.record(attempt as u64, &[]); h_span_2.record(span_2_val * 100.0, &[]); }); result @@ -347,7 +347,7 @@ impl Cohort { c_talos_aborts.add(talos_aborts, &[]); c_agent_errors.add(agent_errors, &[]); c_db_errors.add(db_errors, &[]); - h_agent_retries.record(attempts, &[]); + h_agent_retries.record(attempts as u64, &[]); }); } @@ -366,7 +366,7 @@ impl Cohort { let timeout = if request.timeout_ms > 0 { Duration::from_millis(request.timeout_ms) } else { - Duration::from_millis(self.config.snapshot_wait_timeout_ms) + Duration::from_millis(self.config.snapshot_wait_timeout_ms as u64) }; let local_state: CapturedState = match self.await_for_snapshot(state_provider, previous_conflict, timeout).await { diff --git a/packages/cohort_sdk/src/delay_controller.rs b/packages/cohort_sdk/src/delay_controller.rs index 4749c756..7f3f3daf 100644 --- a/packages/cohort_sdk/src/delay_controller.rs +++ b/packages/cohort_sdk/src/delay_controller.rs @@ -4,14 +4,14 @@ use std::time::Duration; #[derive(Clone)] pub struct DelayController { pub total_sleep_time: u128, - multiplier: u64, - min_sleep_ms: u64, - max_sleep_ms: u64, + multiplier: u32, + min_sleep_ms: u32, + max_sleep_ms: u32, } // TODO: move me into cohort_sdk package impl DelayController { - pub fn new(min_sleep_ms: u64, max_sleep_ms: u64) -> Self { + pub fn new(min_sleep_ms: u32, max_sleep_ms: u32) -> Self { Self { multiplier: 1, min_sleep_ms, @@ -40,7 +40,7 @@ impl DelayController { }; let delay_ms = std::cmp::min(self.max_sleep_ms, m + add); - tokio::time::sleep(Duration::from_millis(delay_ms)).await; + tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await; self.total_sleep_time += delay_ms as u128; } } diff --git a/packages/cohort_sdk/src/model/callbacks.rs b/packages/cohort_sdk/src/model/callbacks.rs index dadd32a4..0c6bb390 100644 --- a/packages/cohort_sdk/src/model/callbacks.rs +++ b/packages/cohort_sdk/src/model/callbacks.rs @@ -18,7 +18,7 @@ pub trait ItemStateProvider { #[async_trait] pub trait OutOfOrderInstaller { - async fn install(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u64) -> Result; + async fn install(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u32) -> Result; } pub enum OutOfOrderInstallOutcome { diff --git a/packages/cohort_sdk/src/model/mod.rs b/packages/cohort_sdk/src/model/mod.rs index 90872f46..06910693 100644 --- a/packages/cohort_sdk/src/model/mod.rs +++ b/packages/cohort_sdk/src/model/mod.rs @@ -37,7 +37,7 @@ pub struct CertificationResponse { #[derive(Clone)] pub struct ResponseMetadata { - pub attempts: u64, + pub attempts: u32, pub duration_ms: u64, } @@ -61,12 +61,12 @@ pub struct ClientError { #[derive(Clone)] pub struct BackoffConfig { - pub min_ms: u64, - pub max_ms: u64, + pub min_ms: u32, + pub max_ms: u32, } impl BackoffConfig { - pub fn new(min_ms: u64, max_ms: u64) -> Self { + pub fn new(min_ms: u32, max_ms: u32) -> Self { Self { min_ms, max_ms } } } @@ -79,11 +79,11 @@ pub struct Config { pub backoff_on_conflict: BackoffConfig, pub retry_backoff: BackoffConfig, - pub retry_attempts_max: u64, + pub retry_attempts_max: u32, pub retry_oo_backoff: BackoffConfig, - pub retry_oo_attempts_max: u64, + pub retry_oo_attempts_max: u32, - pub snapshot_wait_timeout_ms: u64, + pub snapshot_wait_timeout_ms: u32, // // agent config values @@ -108,13 +108,13 @@ pub struct Config { // // Must be unique for each agent instance. Can be the same as AgentConfig.agent_id pub agent_group_id: String, - pub agent_fetch_wait_max_ms: u64, + pub agent_fetch_wait_max_ms: u32, // The maximum time librdkafka may use to deliver a message (including retries) - pub agent_message_timeout_ms: u64, + pub agent_message_timeout_ms: u32, // Controls how long to wait until message is successfully placed on the librdkafka producer queue (including retries). - pub agent_enqueue_timeout_ms: u64, + pub agent_enqueue_timeout_ms: u32, // should be mapped to rdkafka::config::RDKafkaLogLevel - pub agent_log_level: u64, + pub agent_log_level: u32, // // Database config @@ -126,6 +126,7 @@ pub struct Config { pub db_port: String, pub db_database: String, } + pub struct ReplicatorServices { pub replicator_handle: JoinHandle>, pub installer_handle: JoinHandle>, diff --git a/packages/talos_agent/src/api.rs b/packages/talos_agent/src/api.rs index f72a9e71..aca85637 100644 --- a/packages/talos_agent/src/api.rs +++ b/packages/talos_agent/src/api.rs @@ -65,11 +65,11 @@ pub struct KafkaConfig { // Must be unique for each agent instance. Can be the same as AgentConfig.agent_id pub group_id: String, pub certification_topic: String, - pub fetch_wait_max_ms: u64, + pub fetch_wait_max_ms: u32, // The maximum time librdkafka may use to deliver a message (including retries) - pub message_timeout_ms: u64, + pub message_timeout_ms: u32, // Controls how long to wait until message is successfully placed on the librdkafka producer queue (including retries). - pub enqueue_timeout_ms: u64, + pub enqueue_timeout_ms: u32, pub log_level: RDKafkaLogLevel, pub talos_type: TalosType, // defaults to SCRAM-SHA-512 @@ -79,7 +79,7 @@ pub struct KafkaConfig { } impl KafkaConfig { - pub fn map_log_level(level: u64) -> RDKafkaLogLevel { + pub fn map_log_level(level: u32) -> RDKafkaLogLevel { match level { 0 => RDKafkaLogLevel::Emerg, 1 => RDKafkaLogLevel::Alert, diff --git a/packages/talos_agent/src/messaging/kafka.rs b/packages/talos_agent/src/messaging/kafka.rs index dda3d119..7ce012b5 100644 --- a/packages/talos_agent/src/messaging/kafka.rs +++ b/packages/talos_agent/src/messaging/kafka.rs @@ -94,7 +94,7 @@ impl Publisher for KafkaPublisher { let data = KafkaPublisher::make_record(self.agent.clone(), &self.config.certification_topic, key.as_str(), payload.as_str()); - let timeout = Timeout::After(Duration::from_millis(self.config.enqueue_timeout_ms)); + let timeout = Timeout::After(Duration::from_millis(self.config.enqueue_timeout_ms as u64)); return match self.producer.send(data, timeout).await { Ok((partition, offset)) => { debug!("KafkaPublisher.send_message(): Published into partition {}, offset: {}", partition, offset); @@ -380,9 +380,9 @@ mod tests { brokers: "brokers".to_string(), group_id: "group_id".to_string(), certification_topic: "certification_topic".to_string(), - fetch_wait_max_ms: 1_u64, - message_timeout_ms: 1_u64, - enqueue_timeout_ms: 1_u64, + fetch_wait_max_ms: 1_u32, + message_timeout_ms: 1_u32, + enqueue_timeout_ms: 1_u32, log_level: RDKafkaLogLevel::Debug, talos_type: TalosType::InProcessMock, sasl_mechanisms: None,