Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow headers pass through #89

Merged
merged 4 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
[workspace]
resolver = "2"

members = [
"packages/*",
# Example crates
Expand Down
10 changes: 3 additions & 7 deletions examples/messenger_using_kafka/src/kafka_producer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
use ahash::HashMap;
use async_trait::async_trait;
use log::info;
use rdkafka::producer::ProducerContext;
use talos_messenger_actions::kafka::{context::MessengerProducerDeliveryOpaque, models::KafkaAction, producer::KafkaProducer};
use talos_messenger_core::core::{MessengerPublisher, PublishActionType};
// use talos_messenger::{
// core::{MessengerPublisher, PublishActionType},
// kafka::producer::{KafkaProducer, MessengerProducerDeliveryOpaque},
// models::commit_actions::publish::KafkaAction,
// };

pub struct MessengerKafkaPublisher<C: ProducerContext + 'static> {
pub publisher: KafkaProducer<C>,
Expand All @@ -24,7 +20,7 @@ where
PublishActionType::Kafka
}

async fn send(&self, version: u64, payload: Self::Payload, additional_data: Self::AdditionalData) -> () {
async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap<String, String>, additional_data: Self::AdditionalData) -> () {
info!("[MessengerKafkaPublisher] Publishing message with payload=\n{payload:#?}");

let mut bytes: Vec<u8> = Vec::new();
Expand All @@ -44,7 +40,7 @@ where
payload.partition,
payload.key.as_deref(),
payload_str,
None,
headers,
Box::new(delivery_opaque),
)
.unwrap();
Expand Down
26 changes: 19 additions & 7 deletions packages/talos_certifier/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ahash::HashMap;
use async_trait::async_trait;
use strum::{Display, EnumString};
use tokio::sync::broadcast;
Expand All @@ -7,13 +8,23 @@ use crate::{
model::{CandidateMessage, DecisionMessage},
};

type Version = u64;
#[derive(Debug, Clone)]
// TODO: double check this setting
#[allow(clippy::large_enum_variant)]
pub struct CandidateChannelMessage {
pub message: CandidateMessage,
pub headers: HashMap<String, String>,
}

#[derive(Debug, Clone)]
pub struct DecisionChannelMessage {
pub decision_version: u64,
pub message: DecisionMessage,
pub headers: HashMap<String, String>,
}

#[derive(Debug, Clone)]
pub enum ChannelMessage {
Candidate(CandidateMessage),
Decision(Version, DecisionMessage),
Candidate(Box<CandidateChannelMessage>),
Decision(Box<DecisionChannelMessage>),
}

#[derive(Debug, Display, Eq, PartialEq, EnumString)]
Expand All @@ -34,8 +45,9 @@ pub enum SystemMessage {
pub type ServiceResult<T = ()> = Result<T, Box<SystemServiceError>>;

#[derive(Debug)]
pub enum DecisionOutboxChannelMessage {
Decision(DecisionMessage),
pub struct DecisionOutboxChannelMessage {
pub message: DecisionMessage,
pub headers: HashMap<String, String>,
}

#[derive(Debug, Clone)]
Expand Down
4 changes: 2 additions & 2 deletions packages/talos_certifier/src/ports/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use ahash::HashMap;
use async_trait::async_trait;
use std::collections::HashMap;
use tokio::task::JoinHandle;

use crate::errors::SystemServiceError;
Expand All @@ -24,5 +24,5 @@ pub trait MessageReciever: SharedPortTraits {
// The trait that should be implemented by any adapter that will publish the Decision message from Certifier Domain.
#[async_trait]
pub trait MessagePublisher: SharedPortTraits {
async fn publish_message(&self, key: &str, value: &str, headers: Option<HashMap<String, String>>) -> Result<(), SystemServiceError>;
async fn publish_message(&self, key: &str, value: &str, headers: HashMap<String, String>) -> Result<(), SystemServiceError>;
}
23 changes: 17 additions & 6 deletions packages/talos_certifier/src/services/certifier_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use async_trait::async_trait;
use log::{debug, error, warn};
use talos_suffix::core::SuffixConfig;
use talos_suffix::{get_nonempty_suffix_items, Suffix, SuffixTrait};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -89,7 +90,7 @@ impl CertifierService {
Ok(dm)
}

pub(crate) fn process_decision(&mut self, decision_version: &u64, decision_message: &DecisionMessage) -> Result<(), CertificationError> {
pub(crate) fn process_decision(&mut self, decision_version: u64, decision_message: &DecisionMessage) -> Result<(), CertificationError> {
// update the decision in suffix
debug!(
"[Process Decision message] Version {} and Decision Message {:?} ",
Expand All @@ -111,7 +112,7 @@ impl CertifierService {

if candidate_version_index.is_some() && candidate_version_index.unwrap().le(&self.suffix.messages.len()) {
self.suffix
.update_decision_suffix_item(candidate_version, *decision_version)
.update_decision_suffix_item(candidate_version, decision_version)
.map_err(CertificationError::SuffixError)?;

// check if all prioir items are decided.
Expand Down Expand Up @@ -147,12 +148,22 @@ impl CertifierService {

pub async fn process_message(&mut self, channel_message: &Option<ChannelMessage>) -> ServiceResult {
if let Err(certification_error) = match channel_message {
Some(ChannelMessage::Candidate(message)) => {
let decision_message = self.process_candidate(message)?;
Some(ChannelMessage::Candidate(candidate)) => {
let decision_message = self.process_candidate(&candidate.message)?;

let mut headers = candidate.headers.clone();
if let Ok(cert_time) = OffsetDateTime::now_utc().format(&Rfc3339) {
headers.insert("certTime".to_owned(), cert_time);
}

let decision_outbox_channel_message = DecisionOutboxChannelMessage {
message: decision_message.clone(),
headers,
};

Ok(self
.decision_outbox_tx
.send(DecisionOutboxChannelMessage::Decision(decision_message.clone()))
.send(decision_outbox_channel_message)
.await
.map_err(|e| SystemServiceError {
kind: SystemServiceErrorKind::SystemError(SystemErrorType::Channel),
Expand All @@ -162,7 +173,7 @@ impl CertifierService {
})?)
}

Some(ChannelMessage::Decision(version, decision_message)) => self.process_decision(version, decision_message),
Some(ChannelMessage::Decision(decision)) => self.process_decision(decision.decision_version, &decision.message),

None => Ok(()),
// _ => (),
Expand Down
28 changes: 21 additions & 7 deletions packages/talos_certifier/src/services/decision_outbox_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use ahash::HashMap;
use async_trait::async_trait;
use log::{debug, error};

Expand Down Expand Up @@ -71,7 +72,11 @@ impl DecisionOutboxService {
Ok(decision)
}

pub async fn publish_decision(publisher: &Arc<Box<dyn MessagePublisher + Send + Sync>>, decision_message: &DecisionMessage) -> ServiceResult {
pub async fn publish_decision(
publisher: &Arc<Box<dyn MessagePublisher + Send + Sync>>,
decision_message: &DecisionMessage,
headers: HashMap<String, String>,
) -> ServiceResult {
let xid = decision_message.xid.clone();
let decision_str = serde_json::to_string(&decision_message).map_err(|e| {
Box::new(SystemServiceError {
Expand All @@ -82,13 +87,18 @@ impl DecisionOutboxService {
})
})?;

let mut decision_publish_header = HashMap::new();
let mut decision_publish_header = headers;
decision_publish_header.insert("messageType".to_string(), MessageVariant::Decision.to_string());
decision_publish_header.insert("certAgent".to_string(), decision_message.agent.clone());
decision_publish_header.insert("certXid".to_string(), decision_message.xid.to_owned());

if let Some(safepoint) = decision_message.safepoint {
decision_publish_header.insert("certSafepoint".to_string(), safepoint.to_string());
}
decision_publish_header.insert("certAgent".to_string(), decision_message.agent.to_owned());

debug!("Publishing message {}", decision_message.version);
publisher
.publish_message(xid.as_str(), &decision_str, Some(decision_publish_header.clone()))
.publish_message(xid.as_str(), &decision_str, decision_publish_header.clone())
.await
.map_err(|publish_error| {
Box::new(SystemServiceError {
Expand All @@ -108,11 +118,15 @@ impl SystemService for DecisionOutboxService {
let publisher = Arc::clone(&self.decision_publisher);
let system = self.system.clone();

if let Some(DecisionOutboxChannelMessage::Decision(decision_message)) = self.decision_outbox_channel_rx.recv().await {
if let Some(decision_channel_message) = self.decision_outbox_channel_rx.recv().await {
let DecisionOutboxChannelMessage {
headers,
message: decision_message,
} = decision_channel_message;
tokio::spawn(async move {
match DecisionOutboxService::save_decision_to_xdb(&datastore, &decision_message).await {
Ok(decision) => {
if let Err(publish_error) = DecisionOutboxService::publish_decision(&publisher, &decision).await {
if let Err(publish_error) = DecisionOutboxService::publish_decision(&publisher, &decision, headers).await {
error!(
"Error publishing message for version={} with reason={:?}",
decision.version,
Expand Down
Loading