Skip to content

Commit

Permalink
fix: use headers directly without metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred committed Oct 16, 2023
1 parent adfe5ba commit 66a8097
Show file tree
Hide file tree
Showing 15 changed files with 61 additions and 91 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 4 additions & 16 deletions packages/talos_certifier/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::HashMap;

use ahash::HashMap;
use async_trait::async_trait;
use strum::{Display, EnumString};
use tokio::sync::broadcast;
Expand All @@ -9,27 +8,20 @@ use crate::{
model::{CandidateMessage, DecisionMessage},
};

#[derive(Debug, Clone)]
pub struct ChannelMeta {
pub headers: HashMap<String, String>,
}

#[derive(Debug, Clone)]
pub struct CandidateChannelMessage {
pub message: CandidateMessage,
pub meta: ChannelMeta,
pub headers: HashMap<String, String>,
}

#[derive(Debug, Clone)]
pub struct DecisionChannelMessage {
pub decision_version: u64,
pub message: DecisionMessage,
pub meta: ChannelMeta,
pub headers: HashMap<String, String>,
}

#[derive(Debug, Clone)]
// TODO: double check this setting
#[allow(clippy::large_enum_variant)]
pub enum ChannelMessage {
Candidate(Box<CandidateChannelMessage>),
Decision(Box<DecisionChannelMessage>),
Expand All @@ -52,14 +44,10 @@ pub enum SystemMessage {

pub type ServiceResult<T = ()> = Result<T, Box<SystemServiceError>>;

#[derive(Debug)]
pub struct DecisionOutboxChannelMessageMeta {
pub headers: HashMap<String, String>,
}
#[derive(Debug)]
pub struct DecisionOutboxChannelMessage {
pub message: DecisionMessage,
pub meta: DecisionOutboxChannelMessageMeta,
pub headers: HashMap<String, String>,
}

#[derive(Debug, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion packages/talos_certifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ pub mod ports;
pub mod services;
pub mod talos_certifier_service;

pub use crate::core::{ChannelMessage, ChannelMeta, SystemMessage};
pub use crate::core::{ChannelMessage, SystemMessage};
pub use certifier::Certifier;
pub use certifier::CertifierCandidate;
4 changes: 2 additions & 2 deletions packages/talos_certifier/src/ports/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use ahash::HashMap;
use async_trait::async_trait;
use std::collections::HashMap;
use tokio::task::JoinHandle;

use crate::errors::SystemServiceError;
Expand All @@ -24,5 +24,5 @@ pub trait MessageReciever: SharedPortTraits {
// The trait that should be implemented by any adapter that will publish the Decision message from Certifier Domain.
#[async_trait]
pub trait MessagePublisher: SharedPortTraits {
async fn publish_message(&self, key: &str, value: &str, headers: Option<HashMap<String, String>>) -> Result<(), SystemServiceError>;
async fn publish_message(&self, key: &str, value: &str, headers: HashMap<String, String>) -> Result<(), SystemServiceError>;
}
5 changes: 1 addition & 4 deletions packages/talos_certifier/src/services/certifier_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use time::OffsetDateTime;
use tokio::sync::mpsc;

use crate::certifier::utils::generate_certifier_sets_from_suffix;
use crate::core::DecisionOutboxChannelMessageMeta;
use crate::{
core::{DecisionOutboxChannelMessage, ServiceResult, System, SystemService},
errors::{CertificationError, SystemErrorType, SystemServiceError, SystemServiceErrorKind},
Expand Down Expand Up @@ -153,9 +152,7 @@ impl CertifierService {

let decision_outbox_channel_message = DecisionOutboxChannelMessage {
message: decision_message.clone(),
meta: DecisionOutboxChannelMessageMeta {
headers: candidate.meta.headers.clone(),
},
headers: candidate.headers.clone(),
};

Ok(self
Expand Down
13 changes: 7 additions & 6 deletions packages/talos_certifier/src/services/decision_outbox_service.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::sync::Arc;

use ahash::HashMap;
use async_trait::async_trait;
use log::{debug, error};

use time::OffsetDateTime;
use tokio::sync::mpsc;

use crate::core::{DecisionOutboxChannelMessageMeta, ServiceResult};
use crate::core::ServiceResult;
use crate::{
core::{DecisionOutboxChannelMessage, MessageVariant, System, SystemService},
errors::{SystemServiceError, SystemServiceErrorKind},
Expand Down Expand Up @@ -74,7 +75,7 @@ impl DecisionOutboxService {
pub async fn publish_decision(
publisher: &Arc<Box<dyn MessagePublisher + Send + Sync>>,
decision_message: &DecisionMessage,
meta: DecisionOutboxChannelMessageMeta,
headers: HashMap<String, String>,
) -> ServiceResult {
let xid = decision_message.xid.clone();
let decision_str = serde_json::to_string(&decision_message).map_err(|e| {
Expand All @@ -86,7 +87,7 @@ impl DecisionOutboxService {
})
})?;

let mut decision_publish_header = meta.headers;
let mut decision_publish_header = headers;
decision_publish_header.insert("messageType".to_string(), MessageVariant::Decision.to_string());
decision_publish_header.insert("certXid".to_string(), decision_message.xid.to_owned());

Expand All @@ -98,7 +99,7 @@ impl DecisionOutboxService {

debug!("Publishing message {}", decision_message.version);
publisher
.publish_message(xid.as_str(), &decision_str, Some(decision_publish_header.clone()))
.publish_message(xid.as_str(), &decision_str, decision_publish_header.clone())
.await
.map_err(|publish_error| {
Box::new(SystemServiceError {
Expand All @@ -120,13 +121,13 @@ impl SystemService for DecisionOutboxService {

if let Some(decision_channel_message) = self.decision_outbox_channel_rx.recv().await {
let DecisionOutboxChannelMessage {
meta,
headers,
message: decision_message,
} = decision_channel_message;
tokio::spawn(async move {
match DecisionOutboxService::save_decision_to_xdb(&datastore, &decision_message).await {
Ok(decision) => {
if let Err(publish_error) = DecisionOutboxService::publish_decision(&publisher, &decision, meta).await {
if let Err(publish_error) = DecisionOutboxService::publish_decision(&publisher, &decision, headers).await {
error!(
"Error publishing message for version={} with reason={:?}",
decision.version,
Expand Down
24 changes: 11 additions & 13 deletions packages/talos_certifier/src/services/tests/certifier_service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::{
collections::HashMap,
sync::{atomic::AtomicI64, Arc},
};
use std::sync::{atomic::AtomicI64, Arc};

use crate::{
core::{CandidateChannelMessage, DecisionChannelMessage},
Expand All @@ -12,6 +9,7 @@ use crate::{
services::CertifierServiceConfig,
ChannelMessage, SystemMessage,
};
use ahash::{HashMap, HashMapExt};
use talos_suffix::core::SuffixConfig;
use tokio::sync::{broadcast, mpsc};

Expand All @@ -26,7 +24,7 @@ async fn send_candidate_message(message_channel_tx: mpsc::Sender<ChannelMessage>
.send(ChannelMessage::Candidate(
CandidateChannelMessage {
message: candidate_message,
meta: crate::ChannelMeta { headers: HashMap::new() },
headers: HashMap::new(),
}
.into(),
))
Expand Down Expand Up @@ -224,7 +222,7 @@ async fn test_certification_process_decision() {
DecisionChannelMessage {
decision_version: decision.message.version,
message: decision.message,
meta: crate::ChannelMeta { headers: HashMap::new() },
headers: HashMap::new(),
}
.into(),
))
Expand Down Expand Up @@ -295,7 +293,7 @@ async fn test_certification_process_decision_incorrect_version() {
version: 10,
..decision.message
},
meta: crate::ChannelMeta { headers: HashMap::new() },
headers: HashMap::new(),
}
.into(),
))
Expand Down Expand Up @@ -465,7 +463,7 @@ async fn test_certification_check_suffix_prune_is_ready_threshold_30pc() {
DecisionChannelMessage {
decision_version: 6,
message: decision.message,
meta: crate::ChannelMeta { headers: HashMap::new() },
headers: HashMap::new(),
}
.into(),
))
Expand All @@ -482,7 +480,7 @@ async fn test_certification_check_suffix_prune_is_ready_threshold_30pc() {
DecisionChannelMessage {
decision_version: 7,
message: decision.message,
meta: crate::ChannelMeta { headers: HashMap::new() },
headers: HashMap::new(),
}
.into(),
))
Expand All @@ -499,7 +497,7 @@ async fn test_certification_check_suffix_prune_is_ready_threshold_30pc() {
DecisionChannelMessage {
decision_version: 8,
message: decision.message,
meta: crate::ChannelMeta { headers: HashMap::new() },
headers: HashMap::new(),
}
.into(),
))
Expand All @@ -516,7 +514,7 @@ async fn test_certification_check_suffix_prune_is_ready_threshold_30pc() {
DecisionChannelMessage {
decision_version: 10,
message: decision.message,
meta: crate::ChannelMeta { headers: HashMap::new() },
headers: HashMap::new(),
}
.into(),
))
Expand Down Expand Up @@ -626,7 +624,7 @@ async fn test_certification_check_suffix_prune_is_not_at_threshold() {
DecisionChannelMessage {
decision_version: 6,
message: decision.message,
meta: crate::ChannelMeta { headers: HashMap::new() },
headers: HashMap::new(),
}
.into(),
))
Expand All @@ -643,7 +641,7 @@ async fn test_certification_check_suffix_prune_is_not_at_threshold() {
DecisionChannelMessage {
decision_version: 7,
message: decision.message,
meta: crate::ChannelMeta { headers: HashMap::new() },
headers: HashMap::new(),
}
.into(),
))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::Duration,
};
Expand All @@ -14,6 +13,7 @@ use crate::{
},
SystemMessage,
};
use ahash::{HashMap, HashMapExt};
use async_trait::async_trait;
use tokio::{
sync::{broadcast, mpsc},
Expand Down Expand Up @@ -62,7 +62,7 @@ struct MockDecisionPublisher;

#[async_trait]
impl MessagePublisher for MockDecisionPublisher {
async fn publish_message(&self, _key: &str, _value: &str, _headers: Option<HashMap<String, String>>) -> Result<(), SystemServiceError> {
async fn publish_message(&self, _key: &str, _value: &str, _headers: HashMap<String, String>) -> Result<(), SystemServiceError> {
Ok(())
}
}
Expand Down Expand Up @@ -124,7 +124,7 @@ async fn test_candidate_message_create_decision_message() {
conflict_version: None,
metrics: TxProcessingTimeline::default(),
},
meta: crate::core::DecisionOutboxChannelMessageMeta { headers: HashMap::new() },
headers: HashMap::new(),
})
.await
.unwrap();
Expand Down Expand Up @@ -186,7 +186,7 @@ async fn test_save_and_publish_multiple_decisions() {
conflict_version: None,
metrics: TxProcessingTimeline::default(),
},
meta: crate::core::DecisionOutboxChannelMessageMeta { headers: HashMap::new() },
headers: HashMap::new(),
})
.await
.unwrap();
Expand All @@ -207,7 +207,7 @@ async fn test_save_and_publish_multiple_decisions() {
conflict_version: None,
metrics: TxProcessingTimeline::default(),
},
meta: crate::core::DecisionOutboxChannelMessageMeta { headers: HashMap::new() },
headers: HashMap::new(),
})
.await
.unwrap();
Expand Down Expand Up @@ -306,7 +306,7 @@ async fn test_capture_child_thread_dberror() {
conflict_version: None,
metrics: TxProcessingTimeline::default(),
},
meta: crate::core::DecisionOutboxChannelMessageMeta { headers: HashMap::new() },
headers: HashMap::new(),
})
.await
.unwrap();
Expand All @@ -325,7 +325,7 @@ struct MockDecisionPublisherWithError;

#[async_trait]
impl MessagePublisher for MockDecisionPublisherWithError {
async fn publish_message(&self, _key: &str, _value: &str, _headers: Option<HashMap<String, String>>) -> Result<(), SystemServiceError> {
async fn publish_message(&self, _key: &str, _value: &str, _headers: HashMap<String, String>) -> Result<(), SystemServiceError> {
Err(SystemServiceError {
kind: SystemServiceErrorKind::MessagePublishError,
reason: "Failed to Publish".to_string(),
Expand Down Expand Up @@ -361,13 +361,7 @@ async fn test_capture_publish_error() {
metrics: TxProcessingTimeline::default(),
};

if let Err(publish_error) = DecisionOutboxService::publish_decision(
&Arc::new(Box::new(mock_decision_publisher)),
&decision_message,
crate::core::DecisionOutboxChannelMessageMeta { headers: HashMap::new() },
)
.await
{
if let Err(publish_error) = DecisionOutboxService::publish_decision(&Arc::new(Box::new(mock_decision_publisher)), &decision_message, HashMap::new()).await {
assert!(publish_error.kind == SystemServiceErrorKind::MessagePublishError);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::{
collections::HashMap,
sync::{atomic::AtomicI64, Arc},
};
use std::sync::{atomic::AtomicI64, Arc};

use ahash::{HashMap, HashMapExt};
use async_trait::async_trait;
use tokio::{
sync::{broadcast, mpsc},
Expand Down Expand Up @@ -111,7 +109,7 @@ async fn test_consume_message() {
.send(ChannelMessage::Candidate(
CandidateChannelMessage {
message: candidate_message,
meta: crate::ChannelMeta { headers: HashMap::new() },
headers: HashMap::new(),
}
.into(),
))
Expand Down Expand Up @@ -166,7 +164,7 @@ async fn test_consume_message_error() {
.send(ChannelMessage::Candidate(
CandidateChannelMessage {
message: candidate_message,
meta: crate::ChannelMeta { headers: HashMap::new() },
headers: HashMap::new(),
}
.into(),
))
Expand Down
13 changes: 8 additions & 5 deletions packages/talos_certifier_adapters/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ futures-util = "0.3.21"
# Kafka
rdkafka = { version = "0.33.2", features = ["sasl"] }

# Ahash hashmap
ahash = "0.8.3"

# uuid
uuid = { version = "1.2.2", features = [] }
# postgres
Expand All @@ -42,11 +45,11 @@ thiserror = "1.0.31"
mockall = "0.11.0"

# internal crates
logger = { path = "../logger" }
metrics = { path = "../metrics" }
talos_certifier = { path = "../talos_certifier" }
talos_suffix = { path = "../talos_suffix" }
talos_common_utils = { path = "../talos_common_utils" }
logger = { path = "../logger" }
metrics = { path = "../metrics" }
talos_certifier = { path = "../talos_certifier" }
talos_suffix = { path = "../talos_suffix" }
talos_common_utils = { path = "../talos_common_utils" }
talos_rdkafka_utils = { path = "../talos_rdkafka_utils" }


Expand Down
Loading

0 comments on commit 66a8097

Please sign in to comment.