Skip to content

Commit

Permalink
feat: final tweaks to messenger (#86)
Browse files Browse the repository at this point in the history
* feat: rename update_savepoint fn in suffix to update_offset_to_commit

* feat: move MessengerProducerContext to its own file

* feat: built whitelist actions using env variables

* feat: move get_mut and get_meta function in messenger suffix to core suffix package

* feat: Handle processing action error and marking items complete

* feat: publish message in separate thread

* chore: lint fix due to rust version update to 1.73
  • Loading branch information
gk-kindred authored Oct 9, 2023
1 parent f97e6f5 commit 86e1454
Show file tree
Hide file tree
Showing 24 changed files with 350 additions and 233 deletions.
5 changes: 4 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,8 @@ BANK_REPLICATOR_KAFKA_GROUP_ID="talos-replicator-dev"
BANK_STATEMAP_INSTALLER_MAX_RETRY=5
BANK_STATEMAP_INSTALL_RETRY_WAIT_MS=2

# ### Talos Messenger Env variables (start) #############################
# Messenger environment variables
TALOS_MESSENGER_KAFKA_GROUP_ID="talos-messenger-dev"
TALOS_MESSENGER_KAFKA_GROUP_ID="talos-messenger-dev"
TALOS_MESSENGER_ACTIONS_WHITELIST="publish:kafka"
# ### Talos Messenger Env variables (end) #############################
21 changes: 9 additions & 12 deletions examples/agent_client/examples/agent_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,18 +275,15 @@ async fn get_params() -> Result<LaunchParams, String> {
}
}

if stop_type.is_none() {
Err("Parameter --volume is required".into())
} else if target_rate.is_none() {
Err("Parameter --rate is required".into())
} else {
Ok(LaunchParams {
target_rate: target_rate.unwrap(),
stop_type: stop_type.unwrap(),
threads: threads.unwrap(),
collect_metrics: collect_metrics.unwrap(),
})
}
let stop_type = stop_type.ok_or("Parameter --volume is required")?;
let target_rate = target_rate.ok_or("Parameter --rate is required")?;

Ok(LaunchParams {
target_rate,
stop_type,
threads: threads.unwrap(),
collect_metrics: collect_metrics.unwrap(),
})
}

struct RequestGenerator {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,22 +263,18 @@ async fn get_params() -> Result<LaunchParams, String> {
}
}

if stop_type.is_none() {
Err("Parameter --volume is required".into())
} else if accounts.is_none() {
Err("Parameter --accounts is required".into())
} else if target_rate.is_none() {
Err("Parameter --rate is required".into())
} else {
Ok(LaunchParams {
target_rate: target_rate.unwrap(),
stop_type: stop_type.unwrap(),
threads: threads.unwrap(),
accounts: accounts.unwrap(),
scaling_config: scaling_config.unwrap_or(HashMap::new()),
metric_print_raw: metric_print_raw.is_some(),
})
}
let stop_type = stop_type.ok_or("Parameter --volume is required")?;
let target_rate = target_rate.ok_or("Parameter --rate is required")?;
let accounts = accounts.ok_or("Parameter --accounts is required")?;

Ok(LaunchParams {
target_rate,
stop_type,
threads: threads.unwrap(),
accounts,
scaling_config: scaling_config.unwrap_or_default(),
metric_print_raw: metric_print_raw.is_some(),
})
}

struct TransferRequestGenerator {
Expand Down
25 changes: 11 additions & 14 deletions examples/messenger_using_kafka/examples/messenger_using_kafka.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use ahash::{HashMap, HashMapExt};
use talos_certifier::ports::MessageReciever;
use talos_certifier_adapters::KafkaConsumer;
use talos_common_utils::env_var;
use talos_messenger_actions::kafka::{
producer::{KafkaProducer, MessengerKafkaProducerContext},
service::KafkaActionService,
use talos_messenger_actions::kafka::{context::MessengerProducerContext, producer::KafkaProducer, service::KafkaActionService};
use talos_messenger_core::{
services::MessengerInboundService,
suffix::MessengerCandidate,
talos_messenger_service::TalosMessengerService,
utlis::{create_whitelist_actions_from_str, ActionsParserConfig},
};
use talos_messenger_core::{services::MessengerInboundService, suffix::MessengerCandidate, talos_messenger_service::TalosMessengerService};
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use talos_suffix::{core::SuffixConfig, Suffix};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -49,30 +50,26 @@ async fn main() {
};
let suffix: Suffix<MessengerCandidate> = Suffix::with_config(suffix_config);

let mut whitelisted_actions = HashMap::<&'static str, Vec<&'static str>>::new();
// TODO: GK - Set through env
whitelisted_actions.insert("publish", vec!["kafka"]);
let actions_from_env = env_var!("TALOS_MESSENGER_ACTIONS_WHITELIST");
let allowed_actions = create_whitelist_actions_from_str(&actions_from_env, &ActionsParserConfig::default());

let inbound_service = MessengerInboundService {
message_receiver: kafka_consumer,
tx_actions_channel,
rx_feedback_channel,
suffix,
allowed_actions: whitelisted_actions,
allowed_actions,
};

// TODO: GK - create topic should be part of publish.
kafka_config.topic = "test.messenger.topic".to_string();

let tx_feedback_channel_clone = tx_feedback_channel.clone();
let custom_context = MessengerKafkaProducerContext {
let custom_context = MessengerProducerContext {
tx_feedback_channel: tx_feedback_channel_clone,
};
let kafka_producer = KafkaProducer::with_context(&kafka_config, custom_context);
let messenger_kafka_publisher = MessengerKafkaPublisher { publisher: kafka_producer };

let publish_service = KafkaActionService {
publisher: messenger_kafka_publisher,
publisher: messenger_kafka_publisher.into(),
rx_actions_channel,
tx_feedback_channel,
};
Expand Down
5 changes: 1 addition & 4 deletions examples/messenger_using_kafka/src/kafka_producer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use async_trait::async_trait;
use log::info;
use rdkafka::producer::ProducerContext;
use talos_messenger_actions::kafka::{
models::KafkaAction,
producer::{KafkaProducer, MessengerProducerDeliveryOpaque},
};
use talos_messenger_actions::kafka::{context::MessengerProducerDeliveryOpaque, models::KafkaAction, producer::KafkaProducer};
use talos_messenger_core::core::{MessengerPublisher, PublishActionType};
// use talos_messenger::{
// core::{MessengerPublisher, PublishActionType},
Expand Down
2 changes: 1 addition & 1 deletion packages/talos_certifier/src/ports/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub trait MessageReciever: SharedPortTraits {
async fn subscribe(&self) -> Result<(), SystemServiceError>;
async fn commit(&self) -> Result<(), SystemServiceError>;
fn commit_async(&self) -> Option<JoinHandle<Result<(), SystemServiceError>>>;
fn update_savepoint(&mut self, offset: i64) -> Result<(), Box<SystemServiceError>>;
fn update_offset_to_commit(&mut self, offset: i64) -> Result<(), Box<SystemServiceError>>;
async fn update_savepoint_async(&mut self, offset: i64) -> Result<(), SystemServiceError>;
async fn unsubscribe(&self);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl SystemService for MessageReceiverService {
//** commit message
_ = self.commit_interval.tick() => {
let offset = self.commit_offset.load(std::sync::atomic::Ordering::Relaxed);
self.receiver.update_savepoint(offset)?;
self.receiver.update_offset_to_commit(offset)?;
self.receiver.commit_async();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl MessageReciever for MockReciever {
fn commit_async(&self) -> Option<JoinHandle<Result<(), SystemServiceError>>> {
None
}
fn update_savepoint(&mut self, _version: i64) -> Result<(), Box<SystemServiceError>> {
fn update_offset_to_commit(&mut self, _version: i64) -> Result<(), Box<SystemServiceError>> {
Ok(())
}
async fn update_savepoint_async(&mut self, _version: i64) -> Result<(), SystemServiceError> {
Expand Down
2 changes: 1 addition & 1 deletion packages/talos_certifier_adapters/src/kafka/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl MessageReciever for KafkaConsumer {
}
}

fn update_savepoint(&mut self, offset: i64) -> Result<(), Box<SystemServiceError>> {
fn update_offset_to_commit(&mut self, offset: i64) -> Result<(), Box<SystemServiceError>> {
// let partition = self.tpl.;
let tpl = self.tpl.elements_for_topic(&self.topic);
if !tpl.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion packages/talos_cohort_replicator/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ where

pub(crate) async fn commit(&mut self) {
if let Some(version) = self.next_commit_offset {
self.receiver.update_savepoint(version as i64).unwrap();
self.receiver.update_offset_to_commit(version as i64).unwrap();
self.receiver.commit_async();
self.next_commit_offset = None;
}
Expand Down
57 changes: 57 additions & 0 deletions packages/talos_messenger_actions/src/kafka/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use futures_executor::block_on;
use log::{error, info};
use rdkafka::{producer::ProducerContext, ClientContext, Message};
use talos_messenger_core::{core::MessengerChannelFeedback, errors::MessengerActionError};
use tokio::sync::mpsc;

#[derive(Debug)]
/// Per-message opaque payload attached to each Kafka produce call.
///
/// rdkafka hands this back untouched in `ProducerContext::delivery`, letting the
/// delivery callback correlate the broker acknowledgement with the suffix item
/// it belongs to.
pub struct MessengerProducerDeliveryOpaque {
    /// Suffix version (candidate message version) this publish belongs to.
    pub version: u64,
    /// Total number of publish actions expected for this version, so the
    /// feedback consumer can tell when all publishes for the version are done.
    pub total_publish_count: u32,
}

#[derive(Debug, Clone)]
/// Custom rdkafka producer context for the messenger.
///
/// Holds the feedback channel over which delivery results (success/error) are
/// reported back to the messenger's inbound service.
pub struct MessengerProducerContext {
    /// Channel used to send `MessengerChannelFeedback` for each delivery result.
    pub tx_feedback_channel: mpsc::Sender<MessengerChannelFeedback>,
}

impl ClientContext for MessengerProducerContext {}
impl ProducerContext for MessengerProducerContext {
type DeliveryOpaque = Box<MessengerProducerDeliveryOpaque>;

fn delivery(&self, delivery_result: &rdkafka::producer::DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque) {
let result = delivery_result.as_ref();

let version = delivery_opaque.version;

match result {
Ok(msg) => {
info!("Message {:?} {:?}", msg.key(), msg.offset());
// Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receving has errored. In such a scenario, the publisher thread would also shutdown.
if let Err(error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(version, "kafka".to_string()))) {
error!("[Messenger Producer Context] Error sending feedback for version={version} with error={error:?}");
};
}
Err((publish_error, borrowed_message)) => {
error!(
"[Messenger Producer Context] Error for version={:?} \nerror={:?}",
delivery_opaque.version,
publish_error.to_string()
);
let messenger_error = MessengerActionError {
kind: talos_messenger_core::errors::MessengerActionErrorKind::Publishing,
reason: publish_error.to_string(),
data: format!("version={version} message={:#?}", borrowed_message.detach()),
};
// Safe to ignore error check, as error occurs only if receiver is closed or dropped, which would happen if the thread receving has errored. In such a scenario, the publisher thread would also shutdown.
if let Err(send_error) = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Error(
version,
"kafka".to_string(),
Box::new(messenger_error),
))) {
error!("[Messenger Producer Context] Error sending error feedback for version={version} with \npublish_error={publish_error:?} \nchannel send_error={send_error:?}");
};
}
}
}
}
1 change: 1 addition & 0 deletions packages/talos_messenger_actions/src/kafka/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod context;
pub mod models;
pub mod producer;
pub mod service;
31 changes: 30 additions & 1 deletion packages/talos_messenger_actions/src/kafka/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,41 @@ use ahash::HashMap;
use serde::{Deserialize, Serialize}; // 1.0.130
use serde_json::{self};

/// Serde default for key encoding: `text/plain`.
fn default_text_plain_encoding() -> String {
    String::from("text/plain")
}

/// Serde default for value encoding: `application/json`.
fn default_application_json_encoding() -> String {
    String::from("application/json")
}

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
/// A single Kafka message header with explicit encodings for its key and value.
pub struct MessengerKafkaActionHeader {
    /// Encoding of the header key (e.g. `text/plain`).
    pub key_encoding: String,
    /// Header key.
    pub key: String,
    /// Encoding of the header value.
    pub value_encoding: String,
    /// Header value.
    pub value: String,
}

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
/// Deserialized `publish:kafka` action payload describing one message to publish.
pub struct KafkaAction {
    // TODO: GK - Add additional Kafka producer related props here.
    #[serde(default)]
    pub cluster: String,
    /// Topic to publish the payload
    pub topic: String,
    /// Key encoding to be used. Defaults to `text/plain`.
    #[serde(default = "default_text_plain_encoding")]
    pub key_encoding: String,
    /// Key for the message to publish.
    pub key: Option<String>,
    /// Optional if the message should be published to a specific partition.
    pub partition: Option<i32>,
    /// Optional headers while publishing.
    pub headers: Option<HashMap<String, String>>,
    /// Value encoding to be used. Defaults to `application/json`.
    #[serde(default = "default_application_json_encoding")]
    pub value_encoding: String,
    /// Payload to publish.
    pub value: serde_json::Value,
}
45 changes: 1 addition & 44 deletions packages/talos_messenger_actions/src/kafka/producer.rs
Original file line number Diff line number Diff line change
@@ -1,62 +1,19 @@
use std::collections::HashMap;

use async_trait::async_trait;
use log::{debug, info};
use log::debug;
use rdkafka::{
config::{FromClientConfig, FromClientConfigAndContext},
producer::{BaseRecord, DefaultProducerContext, ProducerContext, ThreadedProducer},
ClientContext, Message,
};
use talos_certifier::{
errors::SystemServiceError,
ports::{common::SharedPortTraits, errors::MessagePublishError, MessagePublisher},
};
use talos_certifier_adapters::kafka::utils::build_kafka_headers;
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use tokio::sync::mpsc;

use futures_executor::block_on;
use talos_messenger_core::core::MessengerChannelFeedback;

#[derive(Debug)]
pub struct MessengerProducerDeliveryOpaque {
pub version: u64,
pub total_publish_count: u32,
}

pub struct MessengerKafkaProducerContext {
pub tx_feedback_channel: mpsc::Sender<MessengerChannelFeedback>,
}

impl ClientContext for MessengerKafkaProducerContext {}
impl ProducerContext for MessengerKafkaProducerContext {
type DeliveryOpaque = Box<MessengerProducerDeliveryOpaque>;

fn delivery(&self, delivery_result: &rdkafka::producer::DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque) {
let result = delivery_result.as_ref();

let version = delivery_opaque.version;

match result {
Ok(msg) => {
info!("Message {:?} {:?}", msg.key(), msg.offset());
// TODO: GK - what to do on error? Panic?
let _ = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success(
version,
"kafka".to_string(),
delivery_opaque.total_publish_count,
)));
}
Err(err) => {
// TODO: GK - what to do on error? Panic?
let _ = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Error(version, err.0.to_string())));
}
}
}
}

// Kafka Producer
// #[derive(Clone)]
pub struct KafkaProducer<C: ProducerContext + 'static = DefaultProducerContext> {
producer: ThreadedProducer<C>,
topic: String,
Expand Down
Loading

0 comments on commit 86e1454

Please sign in to comment.