Skip to content

Commit

Permalink
feat: pass headers from decision message to the messenger publish act…
Browse files Browse the repository at this point in the history
…ions
  • Loading branch information
gk-kindred committed Oct 16, 2023
1 parent 66a8097 commit 453eb2b
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 18 deletions.
10 changes: 3 additions & 7 deletions examples/messenger_using_kafka/src/kafka_producer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
use ahash::HashMap;
use async_trait::async_trait;
use log::info;
use rdkafka::producer::ProducerContext;
use talos_messenger_actions::kafka::{context::MessengerProducerDeliveryOpaque, models::KafkaAction, producer::KafkaProducer};
use talos_messenger_core::core::{MessengerPublisher, PublishActionType};
// use talos_messenger::{
// core::{MessengerPublisher, PublishActionType},
// kafka::producer::{KafkaProducer, MessengerProducerDeliveryOpaque},
// models::commit_actions::publish::KafkaAction,
// };

pub struct MessengerKafkaPublisher<C: ProducerContext + 'static> {
pub publisher: KafkaProducer<C>,
Expand All @@ -24,7 +20,7 @@ where
PublishActionType::Kafka
}

async fn send(&self, version: u64, payload: Self::Payload, additional_data: Self::AdditionalData) -> () {
async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap<String, String>, additional_data: Self::AdditionalData) -> () {
info!("[MessengerKafkaPublisher] Publishing message with payload=\n{payload:#?}");

let mut bytes: Vec<u8> = Vec::new();
Expand All @@ -44,7 +40,7 @@ where
payload.partition,
payload.key.as_deref(),
payload_str,
None,
headers,
Box::new(delivery_opaque),
)
.unwrap();
Expand Down
7 changes: 2 additions & 5 deletions packages/talos_messenger_actions/src/kafka/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<C: ProducerContext + 'static> KafkaProducer<C> {
partition: Option<i32>,
key: Option<&str>,
value: &str,
headers: Option<HashMap<String, String>>,
headers: HashMap<String, String>,
delivery_opaque: C::DeliveryOpaque,
) -> Result<(), MessagePublishError> {
let record = BaseRecord::with_opaque_to(topic, delivery_opaque).payload(value);
Expand All @@ -54,10 +54,7 @@ impl<C: ProducerContext + 'static> KafkaProducer<C> {
let record = if let Some(key_str) = key { record.key(key_str) } else { record };

// Add headers if applicable
let record = match headers {
Some(x) => record.headers(build_kafka_headers(x)),
None => record,
};
let record = record.headers(build_kafka_headers(headers));

self.producer.send(record).map_err(|(kafka_error, record)| MessagePublishError {
reason: kafka_error.to_string(),
Expand Down
6 changes: 4 additions & 2 deletions packages/talos_messenger_actions/src/kafka/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,21 @@ where
loop {
tokio::select! {
Some(actions) = self.rx_actions_channel.recv() => {
let MessengerCommitActions {version, commit_actions } = actions;
let MessengerCommitActions {version, commit_actions, headers } = actions;

if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()){
match get_actions_deserialised::<Vec<KafkaAction>>(publish_actions_for_type) {
Ok(actions) => {

let total_len = actions.len() as u32;

let headers_cloned = headers.clone();
for action in actions {
let publisher = self.publisher.clone();
let headers = headers_cloned.clone();
// Publish the message
tokio::spawn(async move {
publisher.send(version, action, total_len ).await;
publisher.send(version, action, headers, total_len ).await;
});

}
Expand Down
3 changes: 2 additions & 1 deletion packages/talos_messenger_core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait MessengerPublisher {
type Payload;
type AdditionalData;
fn get_publish_type(&self) -> PublishActionType;
async fn send(&self, version: u64, payload: Self::Payload, additional_data: Self::AdditionalData) -> ();
async fn send(&self, version: u64, payload: Self::Payload, headers: HashMap<String, String>, additional_data: Self::AdditionalData) -> ();
}

/// Trait to be implemented by all services.
Expand All @@ -38,6 +38,7 @@ pub trait MessengerSystemService {
pub struct MessengerCommitActions {
pub version: u64,
pub commit_actions: HashMap<String, Value>,
pub headers: HashMap<String, String>,
}

pub enum MessengerChannelFeedback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ where
acc.insert(key.to_string(), value.get_payload().clone());
acc
}),
headers: item.headers,
};
// send for publishing
self.tx_actions_channel.send(payload_to_send).await.map_err(|e| MessengerServiceError {
Expand Down Expand Up @@ -182,7 +183,9 @@ where
let version = decision.message.get_candidate_version();
info!("[Decision Message] Version received = {} and {}", decision.decision_version, version);

self.suffix.update_item_decision(version, decision.decision_version, &decision.message);
// TODO: GK - no hardcoded filters on headers
let headers: HashMap<String, String> = decision.headers.into_iter().filter(|(key, _)| key.as_str() != "messageType").collect();
self.suffix.update_item_decision(version, decision.decision_version, &decision.message, headers);

self.process_next_actions().await?;

Expand Down
25 changes: 23 additions & 2 deletions packages/talos_messenger_core/src/suffix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ pub trait MessengerSuffixItemTrait: Debug + Clone {
fn set_safepoint(&mut self, safepoint: Option<u64>);
fn set_commit_action(&mut self, commit_actions: HashMap<String, AllowedActionsMapItem>);
fn set_decision(&mut self, decision: Decision);
fn set_headers(&mut self, headers: HashMap<String, String>);

fn get_state(&self) -> &SuffixItemState;
fn get_commit_actions(&self) -> &HashMap<String, AllowedActionsMapItem>;
fn get_action_by_key_mut(&mut self, action_key: &str) -> Option<&mut AllowedActionsMapItem>;
fn get_safepoint(&self) -> &Option<u64>;
fn get_headers(&self) -> &HashMap<String, String>;
fn get_headers_mut(&mut self) -> &mut HashMap<String, String>;

fn is_abort(&self) -> Option<bool>;
}
Expand All @@ -35,7 +38,7 @@ pub trait MessengerSuffixTrait<T: MessengerSuffixItemTrait>: SuffixTrait<T> {
/// Gets the suffix items eligible to process.
fn get_suffix_items_to_process(&self) -> Vec<ActionsMapWithVersion>;
/// Updates the decision for a version.
fn update_item_decision<D: DecisionMessageTrait>(&mut self, version: u64, decision_version: u64, decision_message: &D);
fn update_item_decision<D: DecisionMessageTrait>(&mut self, version: u64, decision_version: u64, decision_message: &D, headers: HashMap<String, String>);
/// Updates the action for a version using the action_key for lookup.
fn increment_item_action_count(&mut self, version: u64, action_key: &str);

Expand Down Expand Up @@ -107,6 +110,7 @@ impl AllowedActionsMapItem {
pub struct ActionsMapWithVersion {
pub actions: HashMap<String, AllowedActionsMapItem>,
pub version: u64,
pub headers: HashMap<String, String>,
}

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
Expand All @@ -120,6 +124,8 @@ pub struct MessengerCandidate {
state: SuffixItemState,
/// Filtered actions that need to be processed by the messenger
allowed_actions_map: HashMap<String, AllowedActionsMapItem>,
/// Any headers from decision to be used in on-commit actions
headers: HashMap<String, String>,
}

impl From<CandidateMessage> for MessengerCandidate {
Expand All @@ -131,6 +137,7 @@ impl From<CandidateMessage> for MessengerCandidate {

state: SuffixItemState::AwaitingDecision,
allowed_actions_map: HashMap::new(),
headers: HashMap::new(),
}
}
}
Expand Down Expand Up @@ -171,6 +178,18 @@ impl MessengerSuffixItemTrait for MessengerCandidate {
fn get_action_by_key_mut(&mut self, action_key: &str) -> Option<&mut AllowedActionsMapItem> {
self.allowed_actions_map.get_mut(action_key)
}

fn set_headers(&mut self, headers: HashMap<String, String>) {
self.headers.extend(headers);
}

fn get_headers(&self) -> &HashMap<String, String> {
&self.headers
}

fn get_headers_mut(&mut self) -> &mut HashMap<String, String> {
&mut self.headers
}
}

impl<T> MessengerSuffixTrait<T> for Suffix<T>
Expand Down Expand Up @@ -217,13 +236,14 @@ where
.map(|x| ActionsMapWithVersion {
version: x.item_ver,
actions: x.item.get_commit_actions().clone(),
headers: x.item.get_headers().clone(),
})
.collect();

items
}

fn update_item_decision<D: DecisionMessageTrait>(&mut self, version: u64, decision_version: u64, decision_message: &D) {
fn update_item_decision<D: DecisionMessageTrait>(&mut self, version: u64, decision_version: u64, decision_message: &D, headers: HashMap<String, String>) {
let _ = self.update_decision_suffix_item(version, decision_version);

if let Some(item_to_update) = self.get_mut(version) {
Expand All @@ -238,6 +258,7 @@ where

item_to_update.item.set_decision(decision_message.get_decision().clone());
item_to_update.item.set_safepoint(decision_message.get_safepoint());
item_to_update.item.set_headers(headers);
}
}

Expand Down

0 comments on commit 453eb2b

Please sign in to comment.