diff --git a/Cargo.lock b/Cargo.lock index 88c3f3b1..18a419ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2586,6 +2586,7 @@ dependencies = [ name = "talos_certifier_adapters" version = "0.0.1" dependencies = [ + "ahash 0.8.3", "async-trait", "deadpool-postgres", "env_logger", diff --git a/packages/talos_certifier/src/core.rs b/packages/talos_certifier/src/core.rs index 5dee94c8..758a2d25 100644 --- a/packages/talos_certifier/src/core.rs +++ b/packages/talos_certifier/src/core.rs @@ -1,5 +1,4 @@ -use std::collections::HashMap; - +use ahash::HashMap; use async_trait::async_trait; use strum::{Display, EnumString}; use tokio::sync::broadcast; @@ -9,27 +8,20 @@ use crate::{ model::{CandidateMessage, DecisionMessage}, }; -#[derive(Debug, Clone)] -pub struct ChannelMeta { - pub headers: HashMap, -} - #[derive(Debug, Clone)] pub struct CandidateChannelMessage { pub message: CandidateMessage, - pub meta: ChannelMeta, + pub headers: HashMap, } #[derive(Debug, Clone)] pub struct DecisionChannelMessage { pub decision_version: u64, pub message: DecisionMessage, - pub meta: ChannelMeta, + pub headers: HashMap, } #[derive(Debug, Clone)] -// TODO: double check this setting -#[allow(clippy::large_enum_variant)] pub enum ChannelMessage { Candidate(Box), Decision(Box), @@ -52,14 +44,10 @@ pub enum SystemMessage { pub type ServiceResult = Result>; -#[derive(Debug)] -pub struct DecisionOutboxChannelMessageMeta { - pub headers: HashMap, -} #[derive(Debug)] pub struct DecisionOutboxChannelMessage { pub message: DecisionMessage, - pub meta: DecisionOutboxChannelMessageMeta, + pub headers: HashMap, } #[derive(Debug, Clone)] diff --git a/packages/talos_certifier/src/lib.rs b/packages/talos_certifier/src/lib.rs index c481abc1..7cf7e2cb 100644 --- a/packages/talos_certifier/src/lib.rs +++ b/packages/talos_certifier/src/lib.rs @@ -8,6 +8,6 @@ pub mod ports; pub mod services; pub mod talos_certifier_service; -pub use crate::core::{ChannelMessage, ChannelMeta, SystemMessage}; +pub use crate::core::{ChannelMessage, SystemMessage}; pub use certifier::Certifier; pub use certifier::CertifierCandidate; diff --git a/packages/talos_certifier/src/ports/message.rs b/packages/talos_certifier/src/ports/message.rs index 9264d0a4..b5ed5145 100644 --- a/packages/talos_certifier/src/ports/message.rs +++ b/packages/talos_certifier/src/ports/message.rs @@ -1,5 +1,5 @@ +use ahash::HashMap; use async_trait::async_trait; -use std::collections::HashMap; use tokio::task::JoinHandle; use crate::errors::SystemServiceError; @@ -24,5 +24,5 @@ pub trait MessageReciever: SharedPortTraits { // The trait that should be implemented by any adapter that will publish the Decision message from Certifier Domain. #[async_trait] pub trait MessagePublisher: SharedPortTraits { - async fn publish_message(&self, key: &str, value: &str, headers: Option>) -> Result<(), SystemServiceError>; + async fn publish_message(&self, key: &str, value: &str, headers: HashMap) -> Result<(), SystemServiceError>; } diff --git a/packages/talos_certifier/src/services/certifier_service.rs b/packages/talos_certifier/src/services/certifier_service.rs index c1ca6a9d..dde7df1f 100644 --- a/packages/talos_certifier/src/services/certifier_service.rs +++ b/packages/talos_certifier/src/services/certifier_service.rs @@ -9,7 +9,6 @@ use time::OffsetDateTime; use tokio::sync::mpsc; use crate::certifier::utils::generate_certifier_sets_from_suffix; -use crate::core::DecisionOutboxChannelMessageMeta; use crate::{ core::{DecisionOutboxChannelMessage, ServiceResult, System, SystemService}, errors::{CertificationError, SystemErrorType, SystemServiceError, SystemServiceErrorKind}, @@ -153,9 +152,7 @@ impl CertifierService { let decision_outbox_channel_message = DecisionOutboxChannelMessage { message: decision_message.clone(), - meta: DecisionOutboxChannelMessageMeta { - headers: candidate.meta.headers.clone(), - }, + headers: candidate.headers.clone(), }; Ok(self diff --git a/packages/talos_certifier/src/services/decision_outbox_service.rs b/packages/talos_certifier/src/services/decision_outbox_service.rs index d890d23d..573bbd63 100644 --- a/packages/talos_certifier/src/services/decision_outbox_service.rs +++ b/packages/talos_certifier/src/services/decision_outbox_service.rs @@ -1,12 +1,13 @@ use std::sync::Arc; +use ahash::HashMap; use async_trait::async_trait; use log::{debug, error}; use time::OffsetDateTime; use tokio::sync::mpsc; -use crate::core::{DecisionOutboxChannelMessageMeta, ServiceResult}; +use crate::core::ServiceResult; use crate::{ core::{DecisionOutboxChannelMessage, MessageVariant, System, SystemService}, errors::{SystemServiceError, SystemServiceErrorKind}, @@ -74,7 +75,7 @@ impl DecisionOutboxService { pub async fn publish_decision( publisher: &Arc>, decision_message: &DecisionMessage, - meta: DecisionOutboxChannelMessageMeta, + headers: HashMap, ) -> ServiceResult { let xid = decision_message.xid.clone(); let decision_str = serde_json::to_string(&decision_message).map_err(|e| { @@ -86,7 +87,7 @@ impl DecisionOutboxService { }) })?; - let mut decision_publish_header = meta.headers; + let mut decision_publish_header = headers; decision_publish_header.insert("messageType".to_string(), MessageVariant::Decision.to_string()); decision_publish_header.insert("certXid".to_string(), decision_message.xid.to_owned()); @@ -98,7 +99,7 @@ impl DecisionOutboxService { debug!("Publishing message {}", decision_message.version); publisher - .publish_message(xid.as_str(), &decision_str, Some(decision_publish_header.clone())) + .publish_message(xid.as_str(), &decision_str, decision_publish_header.clone()) .await .map_err(|publish_error| { Box::new(SystemServiceError { @@ -120,13 +121,13 @@ impl SystemService for DecisionOutboxService { if let Some(decision_channel_message) = self.decision_outbox_channel_rx.recv().await { let DecisionOutboxChannelMessage { - meta, + headers, message: decision_message, } = decision_channel_message; tokio::spawn(async move { match DecisionOutboxService::save_decision_to_xdb(&datastore, &decision_message).await { Ok(decision) => { - if let Err(publish_error) = DecisionOutboxService::publish_decision(&publisher, &decision, meta).await { + if let Err(publish_error) = DecisionOutboxService::publish_decision(&publisher, &decision, headers).await { error!( "Error publishing message for version={} with reason={:?}", decision.version, diff --git a/packages/talos_certifier/src/services/tests/certifier_service.rs b/packages/talos_certifier/src/services/tests/certifier_service.rs index ad8bbabc..38974106 100644 --- a/packages/talos_certifier/src/services/tests/certifier_service.rs +++ b/packages/talos_certifier/src/services/tests/certifier_service.rs @@ -1,7 +1,4 @@ -use std::{ - collections::HashMap, - sync::{atomic::AtomicI64, Arc}, -}; +use std::sync::{atomic::AtomicI64, Arc}; use crate::{ core::{CandidateChannelMessage, DecisionChannelMessage}, @@ -12,6 +9,7 @@ use crate::{ services::CertifierServiceConfig, ChannelMessage, SystemMessage, }; +use ahash::{HashMap, HashMapExt}; use talos_suffix::core::SuffixConfig; use tokio::sync::{broadcast, mpsc}; @@ -26,7 +24,7 @@ async fn send_candidate_message(message_channel_tx: mpsc::Sender .send(ChannelMessage::Candidate( CandidateChannelMessage { message: candidate_message, - meta: crate::ChannelMeta { headers: HashMap::new() }, + headers: HashMap::new(), } .into(), )) @@ -224,7 +222,7 @@ async fn test_certification_process_decision() { DecisionChannelMessage { decision_version: decision.message.version, message: decision.message, - meta: crate::ChannelMeta { headers: HashMap::new() }, + headers: HashMap::new(), } .into(), )) @@ -295,7 +293,7 @@ async fn test_certification_process_decision_incorrect_version() { version: 10, ..decision.message }, - meta: crate::ChannelMeta { headers: HashMap::new() }, + headers: HashMap::new(), } .into(), )) @@ -465,7 +463,7 @@ async fn test_certification_check_suffix_prune_is_ready_threshold_30pc() { DecisionChannelMessage { decision_version: 6, message: decision.message, - meta: crate::ChannelMeta { headers: HashMap::new() }, + headers: HashMap::new(), } .into(), )) @@ -482,7 +480,7 @@ async fn test_certification_check_suffix_prune_is_ready_threshold_30pc() { DecisionChannelMessage { decision_version: 7, message: decision.message, - meta: crate::ChannelMeta { headers: HashMap::new() }, + headers: HashMap::new(), } .into(), )) @@ -499,7 +497,7 @@ async fn test_certification_check_suffix_prune_is_ready_threshold_30pc() { DecisionChannelMessage { decision_version: 8, message: decision.message, - meta: crate::ChannelMeta { headers: HashMap::new() }, + headers: HashMap::new(), } .into(), )) @@ -516,7 +514,7 @@ async fn test_certification_check_suffix_prune_is_ready_threshold_30pc() { DecisionChannelMessage { decision_version: 10, message: decision.message, - meta: crate::ChannelMeta { headers: HashMap::new() }, + headers: HashMap::new(), } .into(), )) @@ -626,7 +624,7 @@ async fn test_certification_check_suffix_prune_is_not_at_threshold() { DecisionChannelMessage { decision_version: 6, message: decision.message, - meta: crate::ChannelMeta { headers: HashMap::new() }, + headers: HashMap::new(), } .into(), )) @@ -643,7 +641,7 @@ async fn test_certification_check_suffix_prune_is_not_at_threshold() { DecisionChannelMessage { decision_version: 7, message: decision.message, - meta: crate::ChannelMeta { headers: HashMap::new() }, + headers: HashMap::new(), } .into(), )) diff --git a/packages/talos_certifier/src/services/tests/decision_outbox_service.rs b/packages/talos_certifier/src/services/tests/decision_outbox_service.rs index 1563d727..11f97068 100644 --- a/packages/talos_certifier/src/services/tests/decision_outbox_service.rs +++ b/packages/talos_certifier/src/services/tests/decision_outbox_service.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, sync::{Arc, Mutex}, time::Duration, }; @@ -14,6 +13,7 @@ use crate::{ }, SystemMessage, }; +use ahash::{HashMap, HashMapExt}; use async_trait::async_trait; use tokio::{ sync::{broadcast, mpsc}, @@ -62,7 +62,7 @@ struct MockDecisionPublisher; #[async_trait] impl MessagePublisher for MockDecisionPublisher { - async fn publish_message(&self, _key: &str, _value: &str, _headers: Option>) -> Result<(), SystemServiceError> { + async fn publish_message(&self, _key: &str, _value: &str, _headers: HashMap) -> Result<(), SystemServiceError> { Ok(()) } } @@ -124,7 +124,7 @@ async fn test_candidate_message_create_decision_message() { conflict_version: None, metrics: TxProcessingTimeline::default(), }, - meta: crate::core::DecisionOutboxChannelMessageMeta { headers: HashMap::new() }, + headers: HashMap::new(), }) .await .unwrap(); @@ -186,7 +186,7 @@ async fn test_save_and_publish_multiple_decisions() { conflict_version: None, metrics: TxProcessingTimeline::default(), }, - meta: crate::core::DecisionOutboxChannelMessageMeta { headers: HashMap::new() }, + headers: HashMap::new(), }) .await .unwrap(); @@ -207,7 +207,7 @@ async fn test_save_and_publish_multiple_decisions() { conflict_version: None, metrics: TxProcessingTimeline::default(), }, - meta: crate::core::DecisionOutboxChannelMessageMeta { headers: HashMap::new() }, + headers: HashMap::new(), }) .await .unwrap(); @@ -306,7 +306,7 @@ async fn test_capture_child_thread_dberror() { conflict_version: None, metrics: TxProcessingTimeline::default(), }, - meta: crate::core::DecisionOutboxChannelMessageMeta { headers: HashMap::new() }, + headers: HashMap::new(), }) .await .unwrap(); @@ -325,7 +325,7 @@ struct MockDecisionPublisherWithError; #[async_trait] impl MessagePublisher for MockDecisionPublisherWithError { - async fn publish_message(&self, _key: &str, _value: &str, _headers: Option>) -> Result<(), SystemServiceError> { + async fn publish_message(&self, _key: &str, _value: &str, _headers: HashMap) -> Result<(), SystemServiceError> { Err(SystemServiceError { kind: SystemServiceErrorKind::MessagePublishError, reason: "Failed to Publish".to_string(), @@ -361,13 +361,7 @@ async fn test_capture_publish_error() { metrics: TxProcessingTimeline::default(), }; - if let Err(publish_error) = DecisionOutboxService::publish_decision( - &Arc::new(Box::new(mock_decision_publisher)), - &decision_message, - crate::core::DecisionOutboxChannelMessageMeta { headers: HashMap::new() }, - ) - .await - { + if let Err(publish_error) = DecisionOutboxService::publish_decision(&Arc::new(Box::new(mock_decision_publisher)), &decision_message, HashMap::new()).await { assert!(publish_error.kind == SystemServiceErrorKind::MessagePublishError); } } diff --git a/packages/talos_certifier/src/services/tests/message_receiver_service.rs b/packages/talos_certifier/src/services/tests/message_receiver_service.rs index 76134faf..c8d3a278 100644 --- a/packages/talos_certifier/src/services/tests/message_receiver_service.rs +++ b/packages/talos_certifier/src/services/tests/message_receiver_service.rs @@ -1,8 +1,6 @@ -use std::{ - collections::HashMap, - sync::{atomic::AtomicI64, Arc}, -}; +use std::sync::{atomic::AtomicI64, Arc}; +use ahash::{HashMap, HashMapExt}; use async_trait::async_trait; use tokio::{ sync::{broadcast, mpsc}, @@ -111,7 +109,7 @@ async fn test_consume_message() { .send(ChannelMessage::Candidate( CandidateChannelMessage { message: candidate_message, - meta: crate::ChannelMeta { headers: HashMap::new() }, + headers: HashMap::new(), } .into(), )) @@ -166,7 +164,7 @@ async fn test_consume_message_error() { .send(ChannelMessage::Candidate( CandidateChannelMessage { message: candidate_message, - meta: crate::ChannelMeta { headers: HashMap::new() }, + headers: HashMap::new(), } .into(), )) diff --git a/packages/talos_certifier_adapters/Cargo.toml b/packages/talos_certifier_adapters/Cargo.toml index 2a69f054..a578cec7 100644 --- a/packages/talos_certifier_adapters/Cargo.toml +++ b/packages/talos_certifier_adapters/Cargo.toml @@ -23,6 +23,9 @@ futures-util = "0.3.21" # Kafka rdkafka = { version = "0.33.2", features = ["sasl"] } +# Ahash hashmap +ahash = "0.8.3" + # uuid uuid = { version = "1.2.2", features = [] } # postgres @@ -42,11 +45,11 @@ thiserror = "1.0.31" mockall = "0.11.0" # internal crates -logger = { path = "../logger" } -metrics = { path = "../metrics" } -talos_certifier = { path = "../talos_certifier" } -talos_suffix = { path = "../talos_suffix" } -talos_common_utils = { path = "../talos_common_utils" } +logger = { path = "../logger" } +metrics = { path = "../metrics" } +talos_certifier = { path = "../talos_certifier" } +talos_suffix = { path = "../talos_suffix" } +talos_common_utils = { path = "../talos_common_utils" } talos_rdkafka_utils = { path = "../talos_rdkafka_utils" } diff --git a/packages/talos_certifier_adapters/src/kafka/consumer.rs b/packages/talos_certifier_adapters/src/kafka/consumer.rs index 74be23b0..a51aa1c1 100644 --- a/packages/talos_certifier_adapters/src/kafka/consumer.rs +++ b/packages/talos_certifier_adapters/src/kafka/consumer.rs @@ -111,7 +111,6 @@ impl MessageReciever for KafkaConsumer { data: None, })?; - let channel_meta = talos_certifier::ChannelMeta { headers: headers.clone() }; let channel_msg = match utils::parse_message_variant(message_type).map_err(|e| MessageReceiverError { kind: MessageReceiverErrorKind::ParseError, version: Some(offset), @@ -130,7 +129,7 @@ impl MessageReciever for KafkaConsumer { ChannelMessage::Candidate( CandidateChannelMessage { message: msg, - meta: channel_meta, + headers: headers.clone(), } .into(), ) @@ -160,7 +159,7 @@ impl MessageReciever for KafkaConsumer { DecisionChannelMessage { decision_version: offset, message: msg, - meta: channel_meta, + headers: headers.clone(), } .into(), ) diff --git a/packages/talos_certifier_adapters/src/kafka/producer.rs b/packages/talos_certifier_adapters/src/kafka/producer.rs index 7037105b..dc86d049 100644 --- a/packages/talos_certifier_adapters/src/kafka/producer.rs +++ b/packages/talos_certifier_adapters/src/kafka/producer.rs @@ -1,5 +1,4 @@ -use std::collections::HashMap; - +use ahash::HashMap; use async_trait::async_trait; use log::debug; use rdkafka::producer::{BaseRecord, DefaultProducerContext, ThreadedProducer}; @@ -30,13 +29,10 @@ impl KafkaProducer { // Message publisher traits #[async_trait] impl MessagePublisher for KafkaProducer { - async fn publish_message(&self, key: &str, value: &str, headers: Option>) -> Result<(), SystemServiceError> { + async fn publish_message(&self, key: &str, value: &str, headers: HashMap) -> Result<(), SystemServiceError> { let record = BaseRecord::to(&self.topic).payload(value).key(key); - let record = match headers { - Some(x) => record.headers(build_kafka_headers(x)), - None => record, - }; + let record = record.headers(build_kafka_headers(headers)); debug!("Preparing to send the Decision Message. "); let delivery_result = self diff --git a/packages/talos_certifier_adapters/src/kafka/utils.rs b/packages/talos_certifier_adapters/src/kafka/utils.rs index 9b911b63..849f80ca 100644 --- a/packages/talos_certifier_adapters/src/kafka/utils.rs +++ b/packages/talos_certifier_adapters/src/kafka/utils.rs @@ -1,5 +1,6 @@ -use std::{collections::HashMap, str::FromStr}; +use std::str::FromStr; +use ahash::{HashMap, HashMapExt}; use rdkafka::{ message::{BorrowedMessage, Header, Headers, OwnedHeaders}, Message, @@ -62,8 +63,7 @@ pub fn parse_message_variant(message_type: &String) -> Result KafkaProducer { // Message publisher traits #[async_trait] impl MessagePublisher for KafkaProducer { - async fn publish_message(&self, key: &str, value: &str, headers: Option>) -> Result<(), SystemServiceError> { + async fn publish_message(&self, key: &str, value: &str, headers: HashMap) -> Result<(), SystemServiceError> { let record = BaseRecord::to(&self.topic).payload(value).key(key); - let record = match headers { - Some(x) => record.headers(build_kafka_headers(x)), - None => record, - }; + let record = record.headers(build_kafka_headers(headers)); debug!("Preparing to publish the message. "); let delivery_result = self