Skip to content

Commit

Permalink
feature: Change u64 to u32 where u64 is not required.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmarek-kindred authored Aug 16, 2023
1 parent e279978 commit ea7c973
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 32 deletions.
6 changes: 3 additions & 3 deletions packages/cohort_banking/src/callbacks/oo_installer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl OutOfOrderInstallerImpl {
}
}

async fn install_using_polling(&self, _xid: String, safepoint: u64, new_version: u64, _attempt_nr: u64) -> Result<OutOfOrderInstallOutcome, String> {
async fn install_using_polling(&self, _xid: String, safepoint: u64, new_version: u64, _attempt_nr: u32) -> Result<OutOfOrderInstallOutcome, String> {
let db = Arc::clone(&self.database);
let wait_handle: JoinHandle<Result<bool, String>> = tokio::spawn(async move {
let mut safe_now = Self::is_safe_to_proceed(Arc::clone(&db), safepoint).await?;
Expand Down Expand Up @@ -91,7 +91,7 @@ impl OutOfOrderInstallerImpl {
}
}

async fn install_using_single_query(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u64) -> Result<OutOfOrderInstallOutcome, String> {
async fn install_using_single_query(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u32) -> Result<OutOfOrderInstallOutcome, String> {
// Params order:
// 1 - from, 2 - to, 3 - amount
// 4 - new_ver, 5 - safepoint
Expand Down Expand Up @@ -251,7 +251,7 @@ impl OutOfOrderInstallerImpl {

#[async_trait]
impl OutOfOrderInstaller for OutOfOrderInstallerImpl {
async fn install(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u64) -> Result<OutOfOrderInstallOutcome, String> {
async fn install(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u32) -> Result<OutOfOrderInstallOutcome, String> {
if self.single_query_strategy {
self.install_using_single_query(xid, safepoint, new_version, attempt_nr).await
} else {
Expand Down
8 changes: 4 additions & 4 deletions packages/cohort_sdk/src/cohort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,10 @@ impl Cohort {
c_giveups.add(giveups, &[]);
}
if attempt > 1 {
c_retry.add(attempt - 1, &[]);
c_retry.add(attempt as u64 - 1, &[]);
}

h_attempts.record(attempt, &[]);
h_attempts.record(attempt as u64, &[]);
h_span_2.record(span_2_val * 100.0, &[]);
});
result
Expand Down Expand Up @@ -347,7 +347,7 @@ impl Cohort {
c_talos_aborts.add(talos_aborts, &[]);
c_agent_errors.add(agent_errors, &[]);
c_db_errors.add(db_errors, &[]);
h_agent_retries.record(attempts, &[]);
h_agent_retries.record(attempts as u64, &[]);
});
}

Expand All @@ -366,7 +366,7 @@ impl Cohort {
let timeout = if request.timeout_ms > 0 {
Duration::from_millis(request.timeout_ms)
} else {
Duration::from_millis(self.config.snapshot_wait_timeout_ms)
Duration::from_millis(self.config.snapshot_wait_timeout_ms as u64)
};

let local_state: CapturedState = match self.await_for_snapshot(state_provider, previous_conflict, timeout).await {
Expand Down
10 changes: 5 additions & 5 deletions packages/cohort_sdk/src/delay_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use std::time::Duration;
#[derive(Clone)]
pub struct DelayController {
pub total_sleep_time: u128,
multiplier: u64,
min_sleep_ms: u64,
max_sleep_ms: u64,
multiplier: u32,
min_sleep_ms: u32,
max_sleep_ms: u32,
}

// TODO: move me into cohort_sdk package
impl DelayController {
pub fn new(min_sleep_ms: u64, max_sleep_ms: u64) -> Self {
pub fn new(min_sleep_ms: u32, max_sleep_ms: u32) -> Self {
Self {
multiplier: 1,
min_sleep_ms,
Expand Down Expand Up @@ -40,7 +40,7 @@ impl DelayController {
};

let delay_ms = std::cmp::min(self.max_sleep_ms, m + add);
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
tokio::time::sleep(Duration::from_millis(delay_ms as u64)).await;
self.total_sleep_time += delay_ms as u128;
}
}
2 changes: 1 addition & 1 deletion packages/cohort_sdk/src/model/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub trait ItemStateProvider {

#[async_trait]
pub trait OutOfOrderInstaller {
async fn install(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u64) -> Result<OutOfOrderInstallOutcome, String>;
async fn install(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u32) -> Result<OutOfOrderInstallOutcome, String>;
}

pub enum OutOfOrderInstallOutcome {
Expand Down
23 changes: 12 additions & 11 deletions packages/cohort_sdk/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct CertificationResponse {

#[derive(Clone)]
pub struct ResponseMetadata {
pub attempts: u64,
pub attempts: u32,
pub duration_ms: u64,
}

Expand All @@ -61,12 +61,12 @@ pub struct ClientError {

#[derive(Clone)]
pub struct BackoffConfig {
pub min_ms: u64,
pub max_ms: u64,
pub min_ms: u32,
pub max_ms: u32,
}

impl BackoffConfig {
pub fn new(min_ms: u64, max_ms: u64) -> Self {
pub fn new(min_ms: u32, max_ms: u32) -> Self {
Self { min_ms, max_ms }
}
}
Expand All @@ -79,11 +79,11 @@ pub struct Config {
pub backoff_on_conflict: BackoffConfig,
pub retry_backoff: BackoffConfig,

pub retry_attempts_max: u64,
pub retry_attempts_max: u32,
pub retry_oo_backoff: BackoffConfig,
pub retry_oo_attempts_max: u64,
pub retry_oo_attempts_max: u32,

pub snapshot_wait_timeout_ms: u64,
pub snapshot_wait_timeout_ms: u32,

//
// agent config values
Expand All @@ -108,13 +108,13 @@ pub struct Config {
//
// Must be unique for each agent instance. Can be the same as AgentConfig.agent_id
pub agent_group_id: String,
pub agent_fetch_wait_max_ms: u64,
pub agent_fetch_wait_max_ms: u32,
// The maximum time librdkafka may use to deliver a message (including retries)
pub agent_message_timeout_ms: u64,
pub agent_message_timeout_ms: u32,
// Controls how long to wait until message is successfully placed on the librdkafka producer queue (including retries).
pub agent_enqueue_timeout_ms: u64,
pub agent_enqueue_timeout_ms: u32,
// should be mapped to rdkafka::config::RDKafkaLogLevel
pub agent_log_level: u64,
pub agent_log_level: u32,

//
// Database config
Expand All @@ -126,6 +126,7 @@ pub struct Config {
pub db_port: String,
pub db_database: String,
}

pub struct ReplicatorServices {
pub replicator_handle: JoinHandle<Result<(), String>>,
pub installer_handle: JoinHandle<Result<(), String>>,
Expand Down
8 changes: 4 additions & 4 deletions packages/talos_agent/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ pub struct KafkaConfig {
// Must be unique for each agent instance. Can be the same as AgentConfig.agent_id
pub group_id: String,
pub certification_topic: String,
pub fetch_wait_max_ms: u64,
pub fetch_wait_max_ms: u32,
// The maximum time librdkafka may use to deliver a message (including retries)
pub message_timeout_ms: u64,
pub message_timeout_ms: u32,
// Controls how long to wait until message is successfully placed on the librdkafka producer queue (including retries).
pub enqueue_timeout_ms: u64,
pub enqueue_timeout_ms: u32,
pub log_level: RDKafkaLogLevel,
pub talos_type: TalosType,
// defaults to SCRAM-SHA-512
Expand All @@ -79,7 +79,7 @@ pub struct KafkaConfig {
}

impl KafkaConfig {
pub fn map_log_level(level: u64) -> RDKafkaLogLevel {
pub fn map_log_level(level: u32) -> RDKafkaLogLevel {
match level {
0 => RDKafkaLogLevel::Emerg,
1 => RDKafkaLogLevel::Alert,
Expand Down
8 changes: 4 additions & 4 deletions packages/talos_agent/src/messaging/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl Publisher for KafkaPublisher {

let data = KafkaPublisher::make_record(self.agent.clone(), &self.config.certification_topic, key.as_str(), payload.as_str());

let timeout = Timeout::After(Duration::from_millis(self.config.enqueue_timeout_ms));
let timeout = Timeout::After(Duration::from_millis(self.config.enqueue_timeout_ms as u64));
return match self.producer.send(data, timeout).await {
Ok((partition, offset)) => {
debug!("KafkaPublisher.send_message(): Published into partition {}, offset: {}", partition, offset);
Expand Down Expand Up @@ -380,9 +380,9 @@ mod tests {
brokers: "brokers".to_string(),
group_id: "group_id".to_string(),
certification_topic: "certification_topic".to_string(),
fetch_wait_max_ms: 1_u64,
message_timeout_ms: 1_u64,
enqueue_timeout_ms: 1_u64,
fetch_wait_max_ms: 1_u32,
message_timeout_ms: 1_u32,
enqueue_timeout_ms: 1_u32,
log_level: RDKafkaLogLevel::Debug,
talos_type: TalosType::InProcessMock,
sasl_mechanisms: None,
Expand Down

0 comments on commit ea7c973

Please sign in to comment.