diff --git a/.env.example b/.env.example index d1e7df8a..0c175510 100644 --- a/.env.example +++ b/.env.example @@ -70,3 +70,6 @@ STATEMAP_INSTALLER_THREAD_POOL=10 BANK_REPLICATOR_KAFKA_GROUP_ID="talos-replicator-dev" BANK_STATEMAP_INSTALLER_MAX_RETRY=5 BANK_STATEMAP_INSTALL_RETRY_WAIT_MS=2 + +# Messenger environment variables +TALOS_MESSENGER_KAFKA_GROUP_ID="talos-messenger-dev" \ No newline at end of file diff --git a/.gitignore b/.gitignore index 884f1207..02b43fd4 100644 --- a/.gitignore +++ b/.gitignore @@ -101,3 +101,6 @@ Temporary Items tasklist.md docs/.*.bkp + +# Temporary testing bin +packages/talos_messenger_actions/src/bin/test**.rs diff --git a/Cargo.lock b/Cargo.lock index 64a4918b..88c3f3b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1354,6 +1354,27 @@ dependencies = [ "autocfg", ] +[[package]] +name = "messenger_using_kafka" +version = "0.0.1" +dependencies = [ + "ahash 0.8.3", + "async-trait", + "env_logger", + "log", + "rdkafka 0.33.2", + "serde", + "serde_json", + "talos_certifier", + "talos_certifier_adapters", + "talos_common_utils", + "talos_messenger_actions", + "talos_messenger_core", + "talos_rdkafka_utils", + "talos_suffix", + "tokio", +] + [[package]] name = "metrics" version = "0.0.1" @@ -2620,6 +2641,59 @@ dependencies = [ "serial_test", ] +[[package]] +name = "talos_messenger_actions" +version = "0.1.0" +dependencies = [ + "ahash 0.8.3", + "async-trait", + "env_logger", + "futures-executor", + "futures-util", + "indexmap 2.0.0", + "log", + "mockall", + "rand", + "rdkafka 0.33.2", + "serde", + "serde_json", + "strum 0.25.0", + "talos_certifier", + "talos_certifier_adapters", + "talos_messenger_core", + "talos_rdkafka_utils", + "thiserror", + "time 0.3.26", + "tokio", + "tokio-test", +] + +[[package]] +name = "talos_messenger_core" +version = "0.1.0" +dependencies = [ + "ahash 0.8.3", + "async-trait", + "env_logger", + "futures-executor", + "futures-util", + "indexmap 2.0.0", + "log", + "mockall", + "rand", + "rdkafka 0.33.2", + "serde", + "serde_json", + "strum 0.25.0", + "talos_certifier", + "talos_common_utils", + "talos_suffix", + "thiserror", + "time 0.3.26", + "tokio", + "tokio-test", +] + [[package]] name = "talos_rdkafka_utils" version = "0.1.0" diff --git a/Makefile b/Makefile index 4ef71fd3..b9c106e5 100644 --- a/Makefile +++ b/Makefile @@ -179,6 +179,11 @@ dev.histogram_decision_timeline_from_kafka: $(call pp,histogram_decision_timeline_from_kafka...) cargo run --bin histogram_decision_timeline_from_kafka --release -- $(args) +## example.messenger_kafka: 🧪 Runs the example messenger that uses Kafka. +example.messenger_kafka: + $(call pp,run app...) + cargo run -r --example messenger_using_kafka + ## lint: 🧹 Checks for lint failures on rust lint: $(call pp,lint rust...) diff --git a/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs b/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs index 9c4aa103..5c79ff27 100644 --- a/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs +++ b/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs @@ -19,6 +19,8 @@ async fn main() -> Result<(), impl std::error::Error> { info!("Talos certifier starting..."); let kafka_config = KafkaConfig::from_env(None); + // kafka_config.extend(None, None); + let pg_config = PgConfig::from_env(); let mock_config = get_mock_config(); let suffix_config = Some(SuffixConfig { diff --git a/examples/messenger_using_kafka/Cargo.toml b/examples/messenger_using_kafka/Cargo.toml new file mode 100644 index 00000000..f7e4e080 --- /dev/null +++ b/examples/messenger_using_kafka/Cargo.toml @@ -0,0 +1,33 @@ +[package] +# [package] +name = "messenger_using_kafka" +version = "0.0.1" +edition = "2021" +keywords = ["talos"] +description = "Example on consuming `talos_messenger`" + +[dependencies] +# Logging +log = { workspace = true } +env_logger = { workspace = true } +# Async +tokio = { workspace = true } +async-trait = { workspace = true } +# Serde +serde = { workspace = true } +serde_json = { workspace = true } + +# Kafka +rdkafka = { version = "0.33.2", features = ["sasl"] } + +# ahash +ahash = "0.8.3" + +# internal crates +talos_certifier = { path = "../../packages/talos_certifier" } +talos_suffix = { path = "../../packages/talos_suffix" } +talos_certifier_adapters = { path = "../../packages/talos_certifier_adapters" } +talos_common_utils = { path = "../../packages/talos_common_utils" } +talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils" } +talos_messenger_core = { path = "../../packages/talos_messenger_core" } +talos_messenger_actions = { path = "../../packages/talos_messenger_actions" } diff --git a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs new file mode 100644 index 00000000..d6e1ccff --- /dev/null +++ b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs @@ -0,0 +1,87 @@ +use ahash::{HashMap, HashMapExt}; +use talos_certifier::ports::MessageReciever; +use talos_certifier_adapters::KafkaConsumer; +use talos_common_utils::env_var; +use talos_messenger_actions::kafka::{ + producer::{KafkaProducer, MessengerKafkaProducerContext}, + service::KafkaActionService, +}; +use talos_messenger_core::{services::MessengerInboundService, suffix::MessengerCandidate, talos_messenger_service::TalosMessengerService}; +use talos_rdkafka_utils::kafka_config::KafkaConfig; +use talos_suffix::{core::SuffixConfig, Suffix}; +use tokio::sync::mpsc; + +use messenger_using_kafka::kafka_producer::MessengerKafkaPublisher; + +#[tokio::main] +async fn main() { + env_logger::builder().format_timestamp_millis().init(); + + // 0. Create required items. + // a. Create Kafka consumer + let mut kafka_config = KafkaConfig::from_env(None); + kafka_config.group_id = env_var!("TALOS_MESSENGER_KAFKA_GROUP_ID"); + kafka_config.extend( + None, + Some( + [ + ("enable.auto.commit".to_string(), "false".to_string()), + ("auto.offset.reset".to_string(), "earliest".to_string()), + // ("fetch.wait.max.ms".to_string(), "600".to_string()), + // ("socket.keepalive.enable".to_string(), "true".to_string()), + // ("acks".to_string(), "0".to_string()), + ] + .into(), + ), + ); + let kafka_consumer = KafkaConsumer::new(&kafka_config); + + // b. Subscribe to topic. + kafka_consumer.subscribe().await.unwrap(); + + let (tx_feedback_channel, rx_feedback_channel) = mpsc::channel(10_000); + let (tx_actions_channel, rx_actions_channel) = mpsc::channel(10_000); + + let suffix_config = SuffixConfig { + capacity: 400_000, + prune_start_threshold: Some(2_000), + min_size_after_prune: None, + }; + let suffix: Suffix = Suffix::with_config(suffix_config); + + let mut whitelisted_actions = HashMap::<&'static str, Vec<&'static str>>::new(); + // TODO: GK - Set through env + whitelisted_actions.insert("publish", vec!["kafka"]); + + let inbound_service = MessengerInboundService { + message_receiver: kafka_consumer, + tx_actions_channel, + rx_feedback_channel, + suffix, + allowed_actions: whitelisted_actions, + }; + + // TODO: GK - create topic should be part of publish. + kafka_config.topic = "test.messenger.topic".to_string(); + + let tx_feedback_channel_clone = tx_feedback_channel.clone(); + let custom_context = MessengerKafkaProducerContext { + tx_feedback_channel: tx_feedback_channel_clone, + }; + let kafka_producer = KafkaProducer::with_context(&kafka_config, custom_context); + let messenger_kafka_publisher = MessengerKafkaPublisher { publisher: kafka_producer }; + + let publish_service = KafkaActionService { + publisher: messenger_kafka_publisher, + rx_actions_channel, + tx_feedback_channel, + }; + + // inbound_service.run().await.unwrap(); + + let messenger_service = TalosMessengerService { + services: vec![Box::new(inbound_service), Box::new(publish_service)], + }; + + messenger_service.run().await.unwrap(); +} diff --git a/examples/messenger_using_kafka/src/kafka_producer.rs b/examples/messenger_using_kafka/src/kafka_producer.rs new file mode 100644 index 00000000..75f53899 --- /dev/null +++ b/examples/messenger_using_kafka/src/kafka_producer.rs @@ -0,0 +1,55 @@ +use async_trait::async_trait; +use log::info; +use rdkafka::producer::ProducerContext; +use talos_messenger_actions::kafka::{ + models::KafkaAction, + producer::{KafkaProducer, MessengerProducerDeliveryOpaque}, +}; +use talos_messenger_core::core::{MessengerPublisher, PublishActionType}; +// use talos_messenger::{ +// core::{MessengerPublisher, PublishActionType}, +// kafka::producer::{KafkaProducer, MessengerProducerDeliveryOpaque}, +// models::commit_actions::publish::KafkaAction, +// }; + +pub struct MessengerKafkaPublisher { + pub publisher: KafkaProducer, +} + +#[async_trait] +impl MessengerPublisher for MessengerKafkaPublisher +where + C: ProducerContext> + 'static, +{ + type Payload = KafkaAction; + type AdditionalData = u32; + fn get_publish_type(&self) -> PublishActionType { + PublishActionType::Kafka + } + + async fn send(&self, version: u64, payload: Self::Payload, additional_data: Self::AdditionalData) -> () { + info!("[MessengerKafkaPublisher] Publishing message with payload=\n{payload:#?}"); + + let mut bytes: Vec = Vec::new(); + serde_json::to_writer(&mut bytes, &payload.value).unwrap(); + + let payload_str = std::str::from_utf8(&bytes).unwrap(); + info!("[MessengerKafkaPublisher] base_record=\n{payload_str:#?}"); + + let delivery_opaque = MessengerProducerDeliveryOpaque { + version, + total_publish_count: additional_data, + }; + + self.publisher + .publish_to_topic( + &payload.topic, + payload.partition, + payload.key.as_deref(), + payload_str, + None, + Box::new(delivery_opaque), + ) + .unwrap(); + } +} diff --git a/examples/messenger_using_kafka/src/lib.rs b/examples/messenger_using_kafka/src/lib.rs new file mode 100644 index 00000000..54b94503 --- /dev/null +++ b/examples/messenger_using_kafka/src/lib.rs @@ -0,0 +1 @@ +pub mod kafka_producer; diff --git a/packages/talos_certifier/src/model/candidate_message.rs b/packages/talos_certifier/src/model/candidate_message.rs index d6fd0385..48f8ca30 100644 --- a/packages/talos_certifier/src/model/candidate_message.rs +++ b/packages/talos_certifier/src/model/candidate_message.rs @@ -4,8 +4,6 @@ use std::collections::HashMap; use crate::certifier::CertifierCandidate; -use super::delivery_order::DeliveryOrder; - #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] #[serde(rename_all = "camelCase", tag = "_typ")] pub struct CandidateMessage { @@ -25,7 +23,7 @@ pub struct CandidateMessage { #[serde(skip_serializing_if = "Option::is_none")] pub metadata: Option>, #[serde(skip_serializing_if = "Option::is_none")] - pub on_commit: Option>>, + pub on_commit: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub statemap: Option>>, diff --git a/packages/talos_certifier/src/model/decision_message.rs b/packages/talos_certifier/src/model/decision_message.rs index e86957d3..30bd5c85 100644 --- a/packages/talos_certifier/src/model/decision_message.rs +++ b/packages/talos_certifier/src/model/decision_message.rs @@ -46,6 +46,7 @@ pub trait DecisionMessageTrait { fn get_candidate_version(&self) -> u64; fn get_safepoint(&self) -> Option; fn get_decision(&self) -> &Decision; + fn is_abort(&self) -> bool; fn is_duplicate(&self) -> bool; fn get_decided_at(&self) -> i128; } @@ -64,6 +65,10 @@ impl DecisionMessageTrait for DecisionMessage { &self.decision } + fn is_abort(&self) -> bool { + self.decision == Decision::Aborted + } + fn is_duplicate(&self) -> bool { self.duplicate_version.is_some() } diff --git a/packages/talos_messenger_actions/Cargo.toml b/packages/talos_messenger_actions/Cargo.toml new file mode 100644 index 00000000..e10dddd8 --- /dev/null +++ b/packages/talos_messenger_actions/Cargo.toml @@ -0,0 +1,46 @@ +[package] +name = "talos_messenger_actions" +version = "0.1.0" +edition = "2021" + +[lib] +doctest = false + +[dependencies] +# Packages from workspace +async-trait = { workspace = true } +env_logger = { workspace = true } +log = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true, features = ["full"] } + +# Strum +strum = { version = "0.25", features = ["derive"] } + +# Futures +futures-util = "0.3.26" +futures-executor = "0.3.28" + +# Error +thiserror = { version = "1.0.31" } + +# Kafka +rdkafka = { version = "0.33.2", features = ["sasl"] } + +# Time +time = { version = "0.3.17" } + +# indexmap +indexmap = { version = "2.0.0", features = ["rayon"] } +ahash = "0.8.3" + +talos_certifier = { path = "../talos_certifier" } +talos_certifier_adapters = { path = "../../packages/talos_certifier_adapters" } +talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils" } +talos_messenger_core = { path = "../../packages/talos_messenger_core" } + +[dev-dependencies] +mockall = { version = "0.11.3" } +tokio-test = { version = "0.4.2" } +rand = { version = "0.8.5" } diff --git a/packages/talos_messenger_actions/src/kafka/mod.rs b/packages/talos_messenger_actions/src/kafka/mod.rs new file mode 100644 index 00000000..3d5b6a49 --- /dev/null +++ b/packages/talos_messenger_actions/src/kafka/mod.rs @@ -0,0 +1,3 @@ +pub mod models; +pub mod producer; +pub mod service; diff --git a/packages/talos_messenger_actions/src/kafka/models.rs b/packages/talos_messenger_actions/src/kafka/models.rs new file mode 100644 index 00000000..4e617bb6 --- /dev/null +++ b/packages/talos_messenger_actions/src/kafka/models.rs @@ -0,0 +1,13 @@ +use ahash::HashMap; +use serde::{Deserialize, Serialize}; // 1.0.130 +use serde_json::{self}; + +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +pub struct KafkaAction { + // TODO: GK - Add additional Kafka producer related props here. + pub topic: String, + pub key: Option, + pub partition: Option, + pub headers: Option>, + pub value: serde_json::Value, +} diff --git a/packages/talos_messenger_actions/src/kafka/producer.rs b/packages/talos_messenger_actions/src/kafka/producer.rs new file mode 100644 index 00000000..7e1b8e83 --- /dev/null +++ b/packages/talos_messenger_actions/src/kafka/producer.rs @@ -0,0 +1,151 @@ +use std::collections::HashMap; + +use async_trait::async_trait; +use log::{debug, info}; +use rdkafka::{ + config::{FromClientConfig, FromClientConfigAndContext}, + producer::{BaseRecord, DefaultProducerContext, ProducerContext, ThreadedProducer}, + ClientContext, Message, +}; +use talos_certifier::{ + errors::SystemServiceError, + ports::{common::SharedPortTraits, errors::MessagePublishError, MessagePublisher}, +}; +use talos_certifier_adapters::kafka::utils::build_kafka_headers; +use talos_rdkafka_utils::kafka_config::KafkaConfig; +use tokio::sync::mpsc; + +use futures_executor::block_on; +use talos_messenger_core::core::MessengerChannelFeedback; + +#[derive(Debug)] +pub struct MessengerProducerDeliveryOpaque { + pub version: u64, + pub total_publish_count: u32, +} + +pub struct MessengerKafkaProducerContext { + pub tx_feedback_channel: mpsc::Sender, +} + +impl ClientContext for MessengerKafkaProducerContext {} +impl ProducerContext for MessengerKafkaProducerContext { + type DeliveryOpaque = Box; + + fn delivery(&self, delivery_result: &rdkafka::producer::DeliveryResult<'_>, delivery_opaque: Self::DeliveryOpaque) { + let result = delivery_result.as_ref(); + + let version = delivery_opaque.version; + + match result { + Ok(msg) => { + info!("Message {:?} {:?}", msg.key(), msg.offset()); + // TODO: GK - what to do on error? Panic? + let _ = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Success( + version, + "kafka".to_string(), + delivery_opaque.total_publish_count, + ))); + } + Err(err) => { + // TODO: GK - what to do on error? Panic? + let _ = block_on(self.tx_feedback_channel.send(MessengerChannelFeedback::Error(version, err.0.to_string()))); + } + } + } +} + +// Kafka Producer +// #[derive(Clone)] +pub struct KafkaProducer { + producer: ThreadedProducer, + topic: String, +} + +impl KafkaProducer { + pub fn new(config: &KafkaConfig) -> Self { + let client_config = config.build_producer_config(); + let producer = ThreadedProducer::from_config(&client_config).expect("Failed to create producer"); + let topic = config.topic.to_owned(); + + Self { producer, topic } + } +} +impl KafkaProducer { + pub fn with_context(config: &KafkaConfig, context: C) -> Self { + let client_config = config.build_producer_config(); + let producer = ThreadedProducer::from_config_and_context(&client_config, context).expect("Failed to create producer"); + let topic = config.topic.to_owned(); + + Self { producer, topic } + } + + pub fn publish_to_topic( + &self, + topic: &str, + partition: Option, + key: Option<&str>, + value: &str, + headers: Option>, + delivery_opaque: C::DeliveryOpaque, + ) -> Result<(), MessagePublishError> { + let record = BaseRecord::with_opaque_to(topic, delivery_opaque).payload(value); + + // Add partition if applicable + let record = if let Some(part) = partition { record.partition(part) } else { record }; + + // Add key if applicable + let record = if let Some(key_str) = key { record.key(key_str) } else { record }; + + // Add headers if applicable + let record = match headers { + Some(x) => record.headers(build_kafka_headers(x)), + None => record, + }; + + self.producer.send(record).map_err(|(kafka_error, record)| MessagePublishError { + reason: kafka_error.to_string(), + data: Some(format!( + "Topic={:?} partition={:?} key={:?} headers={:?} payload={:?}", + self.topic, record.partition, record.key, record.headers, record.payload + )), + }) + } +} + +// Message publisher traits +#[async_trait] +impl MessagePublisher for KafkaProducer { + async fn publish_message(&self, key: &str, value: &str, headers: Option>) -> Result<(), SystemServiceError> { + let record = BaseRecord::to(&self.topic).payload(value).key(key); + + let record = match headers { + Some(x) => record.headers(build_kafka_headers(x)), + None => record, + }; + + debug!("Preparing to publish the message. "); + let delivery_result = self + .producer + .send(record) + // .send(record, Timeout::After(Duration::from_secs(1))) + // .await + .map_err(|(kafka_error, record)| MessagePublishError { + reason: kafka_error.to_string(), + data: Some(format!("{:?}", record)), + })?; + + debug!("Published the message successfully {:?} ", delivery_result.to_owned()); + Ok(()) + } +} + +#[async_trait] +impl SharedPortTraits for KafkaProducer { + async fn is_healthy(&self) -> bool { + true + } + async fn shutdown(&self) -> bool { + true + } +} diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs new file mode 100644 index 00000000..c6362d9e --- /dev/null +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -0,0 +1,64 @@ +use async_trait::async_trait; +use log::{error, info}; +use tokio::sync::mpsc; + +use talos_messenger_core::{ + core::{MessengerChannelFeedback, MessengerCommitActions, MessengerPublisher, MessengerSystemService}, + errors::MessengerServiceResult, + utlis::get_actions_deserialised, +}; + +use super::models::KafkaAction; + +pub struct KafkaActionService + Send + Sync> { + pub publisher: M, + pub rx_actions_channel: mpsc::Receiver, + pub tx_feedback_channel: mpsc::Sender, +} + +#[async_trait] +impl MessengerSystemService for KafkaActionService +where + M: MessengerPublisher + Send + Sync, +{ + async fn start(&self) -> MessengerServiceResult { + todo!() + } + async fn run(&mut self) -> MessengerServiceResult { + info!("Running Kafka Publisher service!!"); + loop { + tokio::select! { + Some(actions) = self.rx_actions_channel.recv() => { + let MessengerCommitActions {version, commit_actions } = actions; + + let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()) else { + // If publish is not present, continue the loop. + continue; + }; + + // TODO: GK - Make this block generic in next ticket to iterator in loop by PublishActionType + { + match get_actions_deserialised::>(publish_actions_for_type) { + Ok(actions) => { + + let total_len = actions.len() as u32; + for action in actions { + // Send to Kafka + self.publisher.send(version, action, total_len ).await; + + } + }, + Err(err) => { + error!("Failed to deserialise for version={version} key={} for data={:?} with error={:?}",&self.publisher.get_publish_type(), err.data, err.reason ) + }, + } + } + } + } + } + } + + async fn stop(&self) -> MessengerServiceResult { + todo!() + } +} diff --git a/packages/talos_messenger_actions/src/lib.rs b/packages/talos_messenger_actions/src/lib.rs new file mode 100644 index 00000000..b17877c5 --- /dev/null +++ b/packages/talos_messenger_actions/src/lib.rs @@ -0,0 +1 @@ +pub mod kafka; diff --git a/packages/talos_messenger_core/Cargo.toml b/packages/talos_messenger_core/Cargo.toml new file mode 100644 index 00000000..7313387e --- /dev/null +++ b/packages/talos_messenger_core/Cargo.toml @@ -0,0 +1,45 @@ +[package] +name = "talos_messenger_core" +version = "0.1.0" +edition = "2021" + +[lib] +doctest = false + +[dependencies] +# Packages from workspace +async-trait = { workspace = true } +env_logger = { workspace = true } +log = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true, features = ["full"] } + +# Strum +strum = { version = "0.25", features = ["derive"] } + +# Futures +futures-util = "0.3.26" +futures-executor = "0.3.28" + +# Error +thiserror = { version = "1.0.31" } + +# Kafka +rdkafka = { version = "0.33.2", features = ["sasl"] } + +# Time +time = { version = "0.3.17" } + +# indexmap +indexmap = { version = "2.0.0", features = ["rayon"] } +ahash = "0.8.3" + +talos_certifier = { path = "../talos_certifier" } +talos_suffix = { path = "../talos_suffix" } +talos_common_utils = { path = "../../packages/talos_common_utils" } + +[dev-dependencies] +mockall = { version = "0.11.3" } +tokio-test = { version = "0.4.2" } +rand = { version = "0.8.5" } diff --git a/packages/talos_messenger_core/src/core.rs b/packages/talos_messenger_core/src/core.rs new file mode 100644 index 00000000..aa65d063 --- /dev/null +++ b/packages/talos_messenger_core/src/core.rs @@ -0,0 +1,46 @@ +use ahash::HashMap; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use strum::{Display, EnumIter, EnumString}; + +use crate::errors::MessengerServiceResult; + +#[derive(Debug, Display, Serialize, Deserialize, EnumString, EnumIter, Clone, Eq, PartialEq)] +pub enum CommitActionType { + #[strum(serialize = "publish")] + Publish, +} + +#[derive(Debug, Display, Serialize, Deserialize, EnumString, Clone, Eq, PartialEq)] +pub enum PublishActionType { + #[strum(serialize = "kafka")] + Kafka, +} + +#[async_trait] +pub trait MessengerPublisher { + type Payload; + type AdditionalData; + fn get_publish_type(&self) -> PublishActionType; + async fn send(&self, version: u64, payload: Self::Payload, additional_data: Self::AdditionalData) -> (); +} + +/// Trait to be implemented by all services. +#[async_trait] +pub trait MessengerSystemService { + async fn start(&self) -> MessengerServiceResult; + async fn run(&mut self) -> MessengerServiceResult; + async fn stop(&self) -> MessengerServiceResult; +} + +#[derive(Debug)] +pub struct MessengerCommitActions { + pub version: u64, + pub commit_actions: HashMap, +} + +pub enum MessengerChannelFeedback { + Error(u64, String), + Success(u64, String, u32), +} diff --git a/packages/talos_messenger_core/src/errors.rs b/packages/talos_messenger_core/src/errors.rs new file mode 100644 index 00000000..b44ae7ce --- /dev/null +++ b/packages/talos_messenger_core/src/errors.rs @@ -0,0 +1,31 @@ +use thiserror::Error as ThisError; + +pub type MessengerServiceResult = Result<(), MessengerServiceError>; + +#[derive(Debug, PartialEq, Clone)] +pub enum ActionErrorKind { + Deserialisation, +} +#[derive(Debug, ThisError, PartialEq, Clone)] +#[error("Action Error {kind:?} with reason={reason} for data={data:?}")] +pub struct ActionError { + pub kind: ActionErrorKind, + pub reason: String, + pub data: String, +} + +#[derive(Debug, PartialEq, Clone)] +pub enum MessengerServiceErrorKind { + System, + Channel, + Messaging, +} + +#[derive(Debug, ThisError, PartialEq, Clone)] +#[error("error in service={service} kind={kind:?} \n reason={reason} \n data={data:?}")] +pub struct MessengerServiceError { + pub kind: MessengerServiceErrorKind, + pub reason: String, + pub data: Option, + pub service: String, +} diff --git a/packages/talos_messenger_core/src/lib.rs b/packages/talos_messenger_core/src/lib.rs new file mode 100644 index 00000000..fd75be0d --- /dev/null +++ b/packages/talos_messenger_core/src/lib.rs @@ -0,0 +1,9 @@ +pub mod core; +pub mod errors; +pub mod services; +pub mod suffix; +pub mod talos_messenger_service; +pub mod utlis; + +#[cfg(test)] +mod tests; diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs new file mode 100644 index 00000000..49e8e8ed --- /dev/null +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -0,0 +1,207 @@ +// 1. Kafka - Get candidate message +// a. Store inmemory. +// 2. Kafka - Get decision message. +// a. Update the store. +// 3. Handle `On Commit` part of the message +// a. Can there be anything other than publishing to kafka? +// b. what if the topic doesnt exist? +// c. Any validation required on what is being published? +// d. Publish T(k) only if all prioir items are published or if safepoint of T(k) is published? +// e. If there are multiple messages to be published, should they be done serially?:- +// i. If to the same topic +// ii. If to another topic +// 4. After a message was published:- +// a. Mark that item as processed. +// b. Prune the store if contiguous items are processed. + +use ahash::{HashMap, HashMapExt}; +use async_trait::async_trait; +use log::{error, info, warn}; + +use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage}; +use talos_suffix::{Suffix, SuffixTrait}; +use tokio::sync::mpsc; + +use crate::{ + core::{MessengerChannelFeedback, MessengerCommitActions, MessengerSystemService}, + errors::{MessengerServiceError, MessengerServiceResult}, + suffix::{MessengerCandidate, MessengerSuffixItemTrait, MessengerSuffixTrait, SuffixItemCompleteStateReason, SuffixItemState}, + utlis::get_allowed_commit_actions, +}; + +pub struct MessengerInboundService +where + M: MessageReciever + Send + Sync + 'static, +{ + pub message_receiver: M, + pub tx_actions_channel: mpsc::Sender, + pub rx_feedback_channel: mpsc::Receiver, + pub suffix: Suffix, + pub allowed_actions: HashMap<&'static str, Vec<&'static str>>, +} + +impl MessengerInboundService +where + M: MessageReciever + Send + Sync + 'static, +{ + /// Get next versions with their commit actions to process. + /// + async fn process_next_actions(&mut self) -> MessengerServiceResult { + let items_to_process = self.suffix.get_suffix_items_to_process(); + + error!( + "Items to process count... {:#?}", + items_to_process.iter().map(|x| x.version).collect::>() + ); + + for item in items_to_process { + let ver = item.version; + + let payload_to_send = MessengerCommitActions { + version: ver, + commit_actions: item.actions.iter().fold(HashMap::new(), |mut acc, (key, value)| { + acc.insert(key.to_string(), value.get_payload().clone()); + acc + }), + }; + // send for publishing + self.tx_actions_channel.send(payload_to_send).await.map_err(|e| MessengerServiceError { + kind: crate::errors::MessengerServiceErrorKind::Channel, + reason: e.to_string(), + data: Some(ver.to_string()), + service: "Inbound Service".to_string(), + })?; + + // Mark item as in process + self.suffix.set_item_state(ver, SuffixItemState::Processing); + } + + Ok(()) + } + + /// + /// Handles the feedback received from other services when they have successfully processed the action. + /// Will update the individual action for the count and completed flag and also update state of the suffix item. + /// + pub(crate) fn handle_item_actioned_success(&mut self, version: u64, action_key: &str, total_count: u32) { + let item_state = self.suffix.get_item_state(version); + match item_state { + Some(SuffixItemState::Processing) | Some(SuffixItemState::PartiallyComplete) => { + self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete); + + self.suffix.update_action(version, action_key, total_count); + if self.suffix.all_actions_completed(version) { + self.suffix + .set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::Processed)); + } + } + _ => (), + }; + error!( + "State change for version={version} from {item_state:?} => {:?}", + self.suffix.get_item_state(version) + ); + } +} + +#[async_trait] +impl MessengerSystemService for MessengerInboundService +where + M: MessageReciever + Send + Sync + 'static, +{ + async fn start(&self) -> MessengerServiceResult { + info!("Start Messenger service"); + Ok(()) + } + + async fn stop(&self) -> MessengerServiceResult { + todo!() + } + + async fn run(&mut self) -> MessengerServiceResult { + info!("Running Messenger service"); + loop { + tokio::select! { + // 1. Consume message. + Ok(Some(msg)) = self.message_receiver.consume_message() => { + + // 2. Add/update to suffix. + match msg { + // 2.1 For CM - Install messages on the version + ChannelMessage::Candidate(message) => { + + let version = message.version; + if message.version > 0 { + // insert item to suffix + let _ = self.suffix.insert(version, message.into()); + + if let Some(item_to_update) = self.suffix.get_mut(version){ + if let Some(commit_actions) = &item_to_update.item.candidate.on_commit { + let filter_actions = get_allowed_commit_actions(commit_actions, &self.allowed_actions); + if filter_actions.is_empty() { + // There are on_commit actions, but not the ones required by messenger + item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoRelavantCommitActions)); + } else { + item_to_update.item.set_commit_action(filter_actions); + } + } else { + // No on_commit actions + item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoCommitActions)); + + } + error!("[FILTERED ACTIONS] version={} state={:?} actions={:#?}", version, item_to_update.item.get_state(), item_to_update.item.get_commit_actions()); + }; + + } else { + warn!("Version 0 will not be inserted into suffix.") + } + + }, + // 2.2 For DM - Update the decision with outcome + safepoint. + ChannelMessage::Decision(decision_version, decision_message) => { + let version = decision_message.get_candidate_version(); + info!("[Decision Message] Version received = {} and {}", decision_version, version); + + self.suffix.update_item_decision(version, decision_version, &decision_message); + + self.process_next_actions().await? + + + // TODO: GK - Calculate the safe offset to commit. + + // TODO: GK - Prune suffix. + + }, + } + + } + // Next condition - Commit, get processed/published info. + + + + // Receive feedback from publisher. + Some(feedback) = self.rx_feedback_channel.recv() => { + match feedback { + // TODO: GK - What to do when we have error on publishing? Retry?? + MessengerChannelFeedback::Error(_, _) => panic!("Implement the error feedback"), + MessengerChannelFeedback::Success(version, key, total_count) => { + info!("Successfully received version={version} count={total_count}"); + + self.handle_item_actioned_success(version, &key, total_count); + + + // self.suffix.messages.iter().flatten().for_each(|item| + // error!("version={} decision={:?} state={:?} action_state={:#?}", item.item_ver, item.item.decision, item.item.get_state(), item.item.commit_actions.iter().map(|x| (x.1.count, x.1.is_completed)).collect::>()) + // ); + // info!("Suffix dump ={:?}") + // info!("State on completion ={:?}", item_state); + }, + } + // Process the next items with commit actions + self.process_next_actions().await? + + } + } + } + } +} diff --git a/packages/talos_messenger_core/src/services/mod.rs b/packages/talos_messenger_core/src/services/mod.rs new file mode 100644 index 00000000..75e294e4 --- /dev/null +++ b/packages/talos_messenger_core/src/services/mod.rs @@ -0,0 +1,3 @@ +mod inbound_service; + +pub use inbound_service::MessengerInboundService; diff --git a/packages/talos_messenger_core/src/suffix.rs b/packages/talos_messenger_core/src/suffix.rs new file mode 100644 index 00000000..615872b5 --- /dev/null +++ b/packages/talos_messenger_core/src/suffix.rs @@ -0,0 +1,297 @@ +use ahash::{HashMap, HashMapExt}; +use log::{error, warn}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::fmt::Debug; +use talos_certifier::model::{CandidateMessage, Decision, DecisionMessageTrait}; +use talos_suffix::{core::SuffixMeta, Suffix, SuffixItem, SuffixTrait}; + +pub trait MessengerSuffixItemTrait { + fn set_state(&mut self, state: SuffixItemState); + fn set_safepoint(&mut self, safepoint: Option); + fn set_commit_action(&mut self, commit_actions: HashMap); + fn set_decision(&mut self, decision: Decision); + + fn get_state(&self) -> &SuffixItemState; + fn get_commit_actions(&self) -> &HashMap; + fn get_action_by_key_mut(&mut self, action_key: &str) -> Option<&mut AllowedActionsMapItem>; + fn get_safepoint(&self) -> &Option; + + fn is_abort(&self) -> Option; +} + +pub trait MessengerSuffixTrait: SuffixTrait { + // Setters + fn set_item_state(&mut self, version: u64, process_state: SuffixItemState); + + // Getters + fn get_mut(&mut self, version: u64) -> Option<&mut SuffixItem>; + fn get_item_state(&self, version: u64) -> Option; + fn get_last_installed(&self, to_version: Option) -> Option<&SuffixItem>; + // fn update_suffix_item_decision(&mut self, version: u64, decision_ver: u64) -> SuffixResult<()>; + fn get_suffix_meta(&self) -> &SuffixMeta; + fn installed_all_prior_decided_items(&self, version: u64) -> bool; + + fn get_suffix_items_to_process(&self) -> Vec; + // updates + fn update_prune_index(&mut self, version: u64); + fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D); + fn update_action(&mut self, version: u64, action_key: &str, total_count: u32); + + fn all_actions_completed(&self, version: u64) -> bool; +} + +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +pub enum SuffixItemState { + AwaitingDecision, + ReadyToProcess, + Processing, + PartiallyComplete, + Complete(SuffixItemCompleteStateReason), +} + +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +pub enum SuffixItemCompleteStateReason { + /// When the decision is an abort + Aborted, + /// When there are no commit actions. + NoCommitActions, + /// When there are commit actions, but they are not required to be handled in messenger + NoRelavantCommitActions, + //TODO: GK - Mark as error? + /// When there is an error? + // Error(String), + /// When all commit action has are completed. + Processed, +} + +// #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +// pub struct AllowedActionsMapValueMeta { +// pub total_count: u32, +// pub completed_count: u32, +// } +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +pub struct AllowedActionsMapItem { + payload: Value, + count: u32, + is_completed: bool, +} + +impl AllowedActionsMapItem { + pub fn new(payload: Value) -> Self { + AllowedActionsMapItem { + payload, + count: 0, + is_completed: false, + } + } + pub fn update_count(&mut self) { + self.count += 1; + } + + pub fn mark_completed(&mut self) { + self.is_completed = true; + } + + pub fn get_payload(&self) -> &Value { + &self.payload + } + + pub fn get_count(&self) -> u32 { + self.count + } + + pub fn is_completed(&self) -> bool { + self.is_completed + } +} + +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +pub struct ActionsMapWithVersion { + pub actions: HashMap, + pub version: u64, +} + +#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] +pub struct MessengerCandidate { + pub candidate: CandidateMessage, + /// Safepoint received for committed outcomes from certifier. + safepoint: Option, + /// Decision received from certifier. + decision: Option, + /// Suffix item state. + state: SuffixItemState, + /// Filtered actions that need to be processed by the messenger + allowed_actions_map: HashMap, +} + +impl From for MessengerCandidate { + fn from(candidate: CandidateMessage) -> Self { + MessengerCandidate { + candidate, + safepoint: None, + decision: None, + + state: SuffixItemState::AwaitingDecision, + allowed_actions_map: HashMap::new(), + } + } +} + +impl MessengerSuffixItemTrait for MessengerCandidate { + fn set_safepoint(&mut self, safepoint: Option) { + self.safepoint = safepoint; + } + + fn set_decision(&mut self, decision: Decision) { + self.decision = Some(decision); + } + + fn set_state(&mut self, state: SuffixItemState) { + self.state = state; + } + + fn set_commit_action(&mut self, commit_actions: HashMap) { + self.allowed_actions_map = commit_actions + } + + fn get_state(&self) -> &SuffixItemState { + &self.state + } + + fn get_safepoint(&self) -> &Option { + &self.safepoint + } + fn get_commit_actions(&self) -> &HashMap { + &self.allowed_actions_map + // &None + } + + fn is_abort(&self) -> Option { + Some(self.decision.clone()?.eq(&Decision::Aborted)) + } + + fn get_action_by_key_mut(&mut self, action_key: &str) -> Option<&mut AllowedActionsMapItem> { + self.allowed_actions_map.get_mut(action_key) + } +} + +impl MessengerSuffixTrait for Suffix +where + T: MessengerSuffixItemTrait + Debug + Clone, +{ + // TODO: GK - Elevate this to core suffix + fn get_mut(&mut self, version: u64) -> Option<&mut SuffixItem> { + let index = self.index_from_head(version)?; + self.messages.get_mut(index)?.as_mut() + } + + fn set_item_state(&mut self, version: u64, process_state: SuffixItemState) { + if let Some(item_to_update) = self.get_mut(version) { + item_to_update.item.set_state(process_state) + } + } + + fn get_item_state(&self, version: u64) -> Option { + if let Ok(Some(suffix_item)) = self.get(version) { + Some(suffix_item.item.get_state().clone()) + } else { + None + } + } + + fn get_last_installed(&self, _to_version: Option) -> Option<&SuffixItem> { + todo!() + } + + // TODO: GK - Elevate this to core suffix + fn get_suffix_meta(&self) -> &SuffixMeta { + &self.meta + } + + fn installed_all_prior_decided_items(&self, _version: u64) -> bool { + todo!() + } + + fn get_suffix_items_to_process(&self) -> Vec { + let items = self + .messages + .iter() + // Remove `None` items + .flatten() + // Filter only the items awaiting to be processed. + .filter(|&x| x.item.get_state().eq(&SuffixItemState::ReadyToProcess)) + // Take while contiguous ones, whose safepoint is already processed. + .take_while(|&x| { + let Some(safepoint) = x.item.get_safepoint() else { + error!("take while early exit for version {:?}", x.item_ver); + return false; + }; + + match self.get(*safepoint) { + // If we find the suffix item from the safepoint, we need to ensure that it already in `Complete` state + Ok(Some(safepoint_item)) => { + error!("State of safepoint items is {:?}", safepoint_item.item.get_state()); + matches!(safepoint_item.item.get_state(), SuffixItemState::Complete(..)) + } + // If we couldn't find the item in suffix, it could be because it was pruned and it is safe to assume that we can consider it. + _ => true, + } + }) + .map(|x| ActionsMapWithVersion { + version: x.item_ver, + actions: x.item.get_commit_actions().clone(), + }) + .collect(); + + items + } + + fn update_prune_index(&mut self, _version: u64) { + todo!() + } + + fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D) { + let _ = self.update_decision_suffix_item(version, decision_version); + + if let Some(item_to_update) = self.get_mut(version) { + // If abort, mark the item as processed. + if decision_message.is_abort() { + item_to_update + .item + .set_state(SuffixItemState::Complete(crate::suffix::SuffixItemCompleteStateReason::Aborted)); + } else if item_to_update.item.get_state().eq(&SuffixItemState::AwaitingDecision) { + item_to_update.item.set_state(SuffixItemState::ReadyToProcess); + } + + item_to_update.item.set_decision(decision_message.get_decision().clone()); + item_to_update.item.set_safepoint(decision_message.get_safepoint()); + } + } + + fn update_action(&mut self, version: u64, action_key: &str, total_count: u32) { + if let Some(item_to_update) = self.get_mut(version) { + if let Some(action) = item_to_update.item.get_action_by_key_mut(action_key) { + action.update_count(); + + if action.get_count() == total_count { + action.mark_completed(); + } + } else { + warn!("Could not update the action as item with version={version} does not have action_key={action_key}! "); + } + } else { + warn!("Could not update the action as item with version={version} was not found! "); + } + } + + fn all_actions_completed(&self, version: u64) -> bool { + if let Ok(Some(item)) = self.get(version) { + item.item.get_commit_actions().iter().all(|(_, x)| x.is_completed()) + } else { + warn!("could not find item for version={version}"); + // TODO: GK - handle this in another way for future? + true + } + } +} diff --git a/packages/talos_messenger_core/src/talos_messenger_service.rs b/packages/talos_messenger_core/src/talos_messenger_service.rs new file mode 100644 index 00000000..5a61a2b4 --- /dev/null +++ b/packages/talos_messenger_core/src/talos_messenger_service.rs @@ -0,0 +1,29 @@ +use futures_util::future::try_join_all; + +use crate::{ + core::MessengerSystemService, + errors::{MessengerServiceError, MessengerServiceErrorKind, MessengerServiceResult}, +}; + +pub struct TalosMessengerService { + pub services: Vec>, +} + +impl TalosMessengerService { + pub async fn run(self) -> MessengerServiceResult { + let service_handles = self.services.into_iter().map(|mut service| tokio::spawn(async move { service.run().await })); + + let k = try_join_all(service_handles).await.map_err(|e| MessengerServiceError { + kind: MessengerServiceErrorKind::System, + reason: e.to_string(), + data: None, + service: "Main thread".to_string(), + })?; + + for res in k { + res? + } + + Ok(()) + } +} diff --git a/packages/talos_messenger_core/src/tests/mod.rs b/packages/talos_messenger_core/src/tests/mod.rs new file mode 100644 index 00000000..80ce8455 --- /dev/null +++ b/packages/talos_messenger_core/src/tests/mod.rs @@ -0,0 +1 @@ +pub(crate) mod utils; diff --git a/packages/talos_messenger_core/src/tests/utils.rs b/packages/talos_messenger_core/src/tests/utils.rs new file mode 100644 index 00000000..f0ca8427 --- /dev/null +++ b/packages/talos_messenger_core/src/tests/utils.rs @@ -0,0 +1,223 @@ +use ahash::{HashMap, HashMapExt}; +use serde_json::{json, Value}; + +use crate::utlis::{get_actions_deserialised, get_allowed_commit_actions}; + +// Start - testing get_allowed_commit_actions function +#[test] +fn test_fn_get_allowed_commit_actions_allowed_actions_negative_scenarios() { + let mut allowed_actions = HashMap::new(); + + let on_commit_actions = serde_json::json!({ + "publish": { + "kafka": [ + { + "_typ": "KafkaMessage", + "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider + "value": "test" + }, + { + "_typ": "KafkaMessage", + "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider + "value": "test" + } + ], + "mqtt": [ + { + "_typ": "Mqtt", + "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider + "value": "test" + } + ] + } + }); + + // When allowed action map is empty. + let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + assert!(result.is_empty()); + + // When allowed action is supported type by the messenger, but the sub actions are not provided + allowed_actions.clear(); + allowed_actions.insert("publish", vec![]); + let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + assert!(result.is_empty()); + + // When allowed action is supported type by the messenger, but the sub actions are not supported + allowed_actions.clear(); + allowed_actions.insert("publish", vec!["sqs", "sns"]); + let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + assert!(result.is_empty()); + + // When allowed action is non supported type by the messenger, with empty sub type + allowed_actions.clear(); + allowed_actions.insert("random", vec![]); + let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + assert!(result.is_empty()); + + // When allowed action is non supported type by the messenger, but has valid sub actions + allowed_actions.clear(); + allowed_actions.insert("random", vec!["sqs", "sns", "kafka", "mqtt"]); + let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + assert!(result.is_empty()); +} + +#[test] +fn test_fn_get_allowed_commit_actions_on_commit_action_negative_scenarios() { + let mut allowed_actions = HashMap::new(); + allowed_actions.insert("publish", vec!["sqs", "sns", "kafka", "mqtt"]); + + // When on_commit_actions are not present + let on_commit_actions = serde_json::json!({}); + let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + assert!(result.is_empty()); + + // When on_commit_actions is of type not supported by messenger + // 1. When actions is an array + let on_commit_actions = serde_json::json!([1, 2, 3, 4]); + let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + assert!(result.is_empty()); + + // 2. When actions is some other object type + let on_commit_actions = serde_json::json!({ + "test": { + "a": "foo", + "kafka": "bar" + } + }); + let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + assert!(result.is_empty()); + + // When on_commit_actions is valid json supported by messenger, but not the action required by messenger + let on_commit_actions = serde_json::json!({ + "random": { + "kafka": [ + { + "_typ": "KafkaMessage", + "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider + "value": "test" + }, + { + "_typ": "KafkaMessage", + "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider + "value": "test" + } + ], + "mqtt": [ + { + "_typ": "Mqtt", + "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider + "value": "test" + } + ] + } + }); + let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + assert!(result.is_empty()); + + // When on_commit_actions is valid json supported by messenger, with valid action, but the sub-actions are not supported by messenger + let on_commit_actions = serde_json::json!({ + "publish": { + "foo": "Lorem", + "bar": "Ipsum" + } + }); + let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + assert!(result.is_empty()); +} + +#[test] +fn test_fn_get_allowed_commit_actions_positive_scenario() { + let mut allowed_actions = HashMap::new(); + + let on_commit_actions = serde_json::json!({ + "publish": { + "kafka": [ + { + "_typ": "KafkaMessage", + "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider + "value": "test" + }, + { + "_typ": "KafkaMessage", + "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider + "value": "test" + } + ], + "mqtt": [ + { + "_typ": "Mqtt", + "topic": "${env}.chimera.coupons", // ${env} is substituted by the Kafka provider + "value": "test" + } + ] + } + }); + + allowed_actions.insert("publish", vec!["sqs", "sns", "kafka", "mqtt"]); + let result = get_allowed_commit_actions(&on_commit_actions, &allowed_actions); + assert_eq!(result.len(), 2); +} + +// End - testing get_allowed_commit_actions function + +// Start - testing get_actions_deserialised function + +// Negative scenarios:- +// 1. Value +// . Empty or null Value +// . Array +// . String +// +// 2. Key +#[test] +fn test_fn_get_actions_deserialised_actions_incorrect_arguments() { + let mut actions_map: HashMap = HashMap::new(); + + // When value is empty string. + actions_map.insert("kafka".to_string(), "".into()); + let result = get_actions_deserialised::>(actions_map.get("kafka").unwrap()); + assert!(result.is_err()); + + // When value is Array of string, but we want to parse it to array of u32. + actions_map.insert("kafka".to_string(), vec!["foo", "bar"].into()); + let result = get_actions_deserialised::>(actions_map.get("kafka").unwrap()); + assert!(result.is_err()); +} +#[test] +fn test_fn_get_actions_deserialised_actions_correct_arguments_passed() { + let mut actions_map: HashMap = HashMap::new(); + + // When value is empty string. + actions_map.insert("kafka".to_string(), "".into()); + let result = get_actions_deserialised::(actions_map.get("kafka").unwrap()); + assert!(result.is_ok()); + + // When value is Array of string. + actions_map.insert("kafka".to_string(), vec!["foo", "bar"].into()); + let result = get_actions_deserialised::>(actions_map.get("kafka").unwrap()); + assert!(result.is_ok()); + + // More complex type + #[derive(Debug, serde::Serialize, serde::Deserialize)] + struct Address { + street_number: u32, + street: String, + city: String, + state: String, + } + + actions_map.insert( + "address".to_string(), + json!( + + { + "street_number": 47, + "street": "Wallaby Way".to_string(), + "city": "Sydney".to_string(), + "state": "New South Wales".to_string(), + } + ), + ); + let result = get_actions_deserialised::
(actions_map.get("address").unwrap()); + assert!(result.is_ok()); +} diff --git a/packages/talos_messenger_core/src/utlis.rs b/packages/talos_messenger_core/src/utlis.rs new file mode 100644 index 00000000..f65cfb67 --- /dev/null +++ b/packages/talos_messenger_core/src/utlis.rs @@ -0,0 +1,100 @@ +use std::any::type_name; + +use ahash::{HashMap, HashMapExt}; +use serde::de::DeserializeOwned; +use serde_json::Value; + +use crate::{errors::ActionError, suffix::AllowedActionsMapItem}; + +/// Retrieves the serde_json::Value for a given key +pub fn get_value_by_key<'a>(value: &'a Value, key: &str) -> Option<&'a Value> { + value.get(key) +} + +/// Create a Hashmap of all the actions that require to be actioned by the messenger. +/// Key for the map is the Action type. eg: "kafka", "mqtt" ..etc +/// Value for the map contains the payload and some meta information like items actioned, and is completed flag +pub fn get_allowed_commit_actions( + on_commit_actions: &Value, + allowed_actions: &HashMap<&'static str, Vec<&'static str>>, +) -> HashMap { + let mut filtered_actions = HashMap::new(); + + allowed_actions.iter().for_each(|(action_key, sub_action_keys)| { + if let Some(action) = get_value_by_key(on_commit_actions, action_key) { + for sub_action_key in sub_action_keys { + if let Some(sub_action) = get_value_by_key(action, sub_action_key) { + filtered_actions.insert(sub_action_key.to_string(), AllowedActionsMapItem::new(sub_action.clone())); + } + } + } + }); + + filtered_actions +} + +/// Retrieves sub actions under publish by using a look key. +pub fn get_actions_deserialised(actions: &Value) -> Result { + match serde_json::from_value(actions.clone()) { + Ok(res) => Ok(res), + Err(err) => Err(ActionError { + kind: crate::errors::ActionErrorKind::Deserialisation, + reason: format!("Deserialisation to type={} failed, with error={:?}", type_name::(), err), + data: actions.to_string(), + }), + } +} + +///// Retrieves the oncommit actions that are supported by the system. +// fn get_allowed_commit_actions(version: &u64, on_commit_actions: &Value) -> Option { +// let Some(publish_actions) = on_commit_actions.get("publish") else { +// warn!("No publish actions found for version={version} in {on_commit_actions}"); +// return None; +// }; + +// // TODO: GK - In future we will need to check if there are other type that we are interested in, and not just Kafka +// match get_sub_actions::>(version, publish_actions, "kafka") { +// Some(kafka_actions) if !kafka_actions.is_empty() => Some(OnCommitActions::Publish(Some(PublishActions::Kafka(kafka_actions)))), +// _ => None, +// } +// } + +///// Retrieves sub actions under publish by using a look key. +// fn deserialize_commit_actions(version: &u64, actions: &Value) -> Option { +// match serde_json::from_value(actions.clone()) { +// Ok(res) => Some(res), +// Err(err) => { +// warn!("Failed to parse on commit actions for version={version} with error={:?} for {actions}", err); +// None +// } +// } +// } + +///// Checks if the relevant oncommit actions are available. +// fn has_supported_commit_actions(version: &u64, on_commit_actions: &Value) -> bool { +// let Some(publish_actions) = get_value_by_key(on_commit_actions, "publish") else { +// warn!("No publish actions found for version={version} in {on_commit_actions}"); +// return false; +// }; + +// get_value_by_key(publish_actions, "kafka").is_some() +// } + +///// Retrieves sub actions under publish by using a look key. +// fn get_sub_actions(version: &u64, actions: &Value, key: &str) -> Option { +// let Some(sub_action_value) = actions.get(key) else { +// warn!("No {key} publish actions found for version={version} in {actions}"); +// return None; +// }; + +// match serde_json::from_value(sub_action_value.clone()) { +// Ok(res) => Some(res), +// Err(err) => { +// warn!( +// "Failed to parse {key} on commit actions for version={version} with error={:?} for {actions}", +// err +// ); +// None +// } +// } +// } diff --git a/packages/talos_suffix/src/core.rs b/packages/talos_suffix/src/core.rs index 5257b32a..9ed1830c 100644 --- a/packages/talos_suffix/src/core.rs +++ b/packages/talos_suffix/src/core.rs @@ -55,7 +55,7 @@ pub struct SuffixMeta { pub type SuffixItemType = T; pub trait SuffixTrait { - fn get(&mut self, version: u64) -> SuffixResult>>; + fn get(&self, version: u64) -> SuffixResult>>; fn insert(&mut self, version: u64, message: SuffixItemType) -> SuffixResult<()>; fn update_decision(&mut self, version: u64, decision_ver: u64) -> SuffixResult<()>; fn prune_till_index(&mut self, index: usize) -> SuffixResult>>>; diff --git a/packages/talos_suffix/src/suffix.rs b/packages/talos_suffix/src/suffix.rs index e0ae433a..8ccfe331 100644 --- a/packages/talos_suffix/src/suffix.rs +++ b/packages/talos_suffix/src/suffix.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; -use log::{debug, info, warn}; +use log::{debug, error, info, warn}; use crate::{ core::{SuffixConfig, SuffixMeta, SuffixResult, SuffixTrait}, @@ -188,12 +188,12 @@ where pub fn update_decision_suffix_item(&mut self, version: u64, decision_ver: u64) -> SuffixResult<()> { // When Certifier is catching up with messages ignore the messages which are prior to the head if version < self.meta.head { - info!("Returned due to version < self.meta.head for version={version} and decision version={decision_ver}"); + error!("Returned due to version < self.meta.head for version={version} and decision version={decision_ver}"); return Ok(()); } let Some(sfx_item) = self.get(version)? else { - info!( + error!( "Returned due item not found in suffix for version={version} with index={:?} and decision version={decision_ver}", self.index_from_head(version) ); @@ -221,7 +221,7 @@ impl SuffixTrait for Suffix where T: Sized + Clone + std::fmt::Debug, { - fn get(&mut self, version: u64) -> SuffixResult>> { + fn get(&self, version: u64) -> SuffixResult>> { let index = self.index_from_head(version).ok_or(SuffixError::VersionToIndexConversionError(version))?; let suffix_item = self.messages.get(index).and_then(|x| x.as_ref()).cloned();