Skip to content

Commit

Permalink
feat: Talos messenger base (#83)
Browse files Browse the repository at this point in the history
* temp commit

* feat: initial commit connecting the both ends

* feat: first successful publish

* feat: update the logic to use state transitions

Use states for the overall candidate as well as for each specific action

* feat: split core functionality and kafka action specific logic into separate packages

* chore: refactor inbound service code into smaller fns

* feat: minor refactor and unit tests

* chore: revert the postgres user from admin to postgres

* chore: updates from review comments

* chore: minor review comment fix
  • Loading branch information
gk-kindred authored Sep 26, 2023
1 parent d99010a commit 4c58df0
Show file tree
Hide file tree
Showing 30 changed files with 1,543 additions and 8 deletions.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,6 @@ STATEMAP_INSTALLER_THREAD_POOL=10
BANK_REPLICATOR_KAFKA_GROUP_ID="talos-replicator-dev"
BANK_STATEMAP_INSTALLER_MAX_RETRY=5
BANK_STATEMAP_INSTALL_RETRY_WAIT_MS=2

# Messenger environment variables
TALOS_MESSENGER_KAFKA_GROUP_ID="talos-messenger-dev"
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,6 @@ Temporary Items
tasklist.md

docs/.*.bkp

# Temporary testing bin
packages/talos_messenger_actions/src/bin/test**.rs
74 changes: 74 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ dev.histogram_decision_timeline_from_kafka:
$(call pp,histogram_decision_timeline_from_kafka...)
cargo run --bin histogram_decision_timeline_from_kafka --release -- $(args)

## example.messenger_kafka: 🧪 Runs the example messenger that uses Kafka.
example.messenger_kafka:
$(call pp,run app...)
cargo run -r --example messenger_using_kafka

## lint: 🧹 Checks for lint failures on rust
lint:
$(call pp,lint rust...)
Expand Down
2 changes: 2 additions & 0 deletions examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ async fn main() -> Result<(), impl std::error::Error> {
info!("Talos certifier starting...");

let kafka_config = KafkaConfig::from_env(None);
// kafka_config.extend(None, None);

let pg_config = PgConfig::from_env();
let mock_config = get_mock_config();
let suffix_config = Some(SuffixConfig {
Expand Down
33 changes: 33 additions & 0 deletions examples/messenger_using_kafka/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
# [package]
name = "messenger_using_kafka"
version = "0.0.1"
edition = "2021"
keywords = ["talos"]
description = "Example on consuming `talos_messenger`"

[dependencies]
# Logging
log = { workspace = true }
env_logger = { workspace = true }
# Async
tokio = { workspace = true }
async-trait = { workspace = true }
# Serde
serde = { workspace = true }
serde_json = { workspace = true }

# Kafka
rdkafka = { version = "0.33.2", features = ["sasl"] }

# ahash
ahash = "0.8.3"

# internal crates
talos_certifier = { path = "../../packages/talos_certifier" }
talos_suffix = { path = "../../packages/talos_suffix" }
talos_certifier_adapters = { path = "../../packages/talos_certifier_adapters" }
talos_common_utils = { path = "../../packages/talos_common_utils" }
talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils" }
talos_messenger_core = { path = "../../packages/talos_messenger_core" }
talos_messenger_actions = { path = "../../packages/talos_messenger_actions" }
87 changes: 87 additions & 0 deletions examples/messenger_using_kafka/examples/messenger_using_kafka.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use ahash::{HashMap, HashMapExt};
use talos_certifier::ports::MessageReciever;
use talos_certifier_adapters::KafkaConsumer;
use talos_common_utils::env_var;
use talos_messenger_actions::kafka::{
producer::{KafkaProducer, MessengerKafkaProducerContext},
service::KafkaActionService,
};
use talos_messenger_core::{services::MessengerInboundService, suffix::MessengerCandidate, talos_messenger_service::TalosMessengerService};
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use talos_suffix::{core::SuffixConfig, Suffix};
use tokio::sync::mpsc;

use messenger_using_kafka::kafka_producer::MessengerKafkaPublisher;

#[tokio::main]
async fn main() {
env_logger::builder().format_timestamp_millis().init();

// 0. Create required items.
// a. Create Kafka consumer
let mut kafka_config = KafkaConfig::from_env(None);
kafka_config.group_id = env_var!("TALOS_MESSENGER_KAFKA_GROUP_ID");
kafka_config.extend(
None,
Some(
[
("enable.auto.commit".to_string(), "false".to_string()),
("auto.offset.reset".to_string(), "earliest".to_string()),
// ("fetch.wait.max.ms".to_string(), "600".to_string()),
// ("socket.keepalive.enable".to_string(), "true".to_string()),
// ("acks".to_string(), "0".to_string()),
]
.into(),
),
);
let kafka_consumer = KafkaConsumer::new(&kafka_config);

// b. Subscribe to topic.
kafka_consumer.subscribe().await.unwrap();

let (tx_feedback_channel, rx_feedback_channel) = mpsc::channel(10_000);
let (tx_actions_channel, rx_actions_channel) = mpsc::channel(10_000);

let suffix_config = SuffixConfig {
capacity: 400_000,
prune_start_threshold: Some(2_000),
min_size_after_prune: None,
};
let suffix: Suffix<MessengerCandidate> = Suffix::with_config(suffix_config);

let mut whitelisted_actions = HashMap::<&'static str, Vec<&'static str>>::new();
// TODO: GK - Set through env
whitelisted_actions.insert("publish", vec!["kafka"]);

let inbound_service = MessengerInboundService {
message_receiver: kafka_consumer,
tx_actions_channel,
rx_feedback_channel,
suffix,
allowed_actions: whitelisted_actions,
};

// TODO: GK - create topic should be part of publish.
kafka_config.topic = "test.messenger.topic".to_string();

let tx_feedback_channel_clone = tx_feedback_channel.clone();
let custom_context = MessengerKafkaProducerContext {
tx_feedback_channel: tx_feedback_channel_clone,
};
let kafka_producer = KafkaProducer::with_context(&kafka_config, custom_context);
let messenger_kafka_publisher = MessengerKafkaPublisher { publisher: kafka_producer };

let publish_service = KafkaActionService {
publisher: messenger_kafka_publisher,
rx_actions_channel,
tx_feedback_channel,
};

// inbound_service.run().await.unwrap();

let messenger_service = TalosMessengerService {
services: vec![Box::new(inbound_service), Box::new(publish_service)],
};

messenger_service.run().await.unwrap();
}
55 changes: 55 additions & 0 deletions examples/messenger_using_kafka/src/kafka_producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use async_trait::async_trait;
use log::info;
use rdkafka::producer::ProducerContext;
use talos_messenger_actions::kafka::{
models::KafkaAction,
producer::{KafkaProducer, MessengerProducerDeliveryOpaque},
};
use talos_messenger_core::core::{MessengerPublisher, PublishActionType};
// use talos_messenger::{
// core::{MessengerPublisher, PublishActionType},
// kafka::producer::{KafkaProducer, MessengerProducerDeliveryOpaque},
// models::commit_actions::publish::KafkaAction,
// };

pub struct MessengerKafkaPublisher<C: ProducerContext + 'static> {
pub publisher: KafkaProducer<C>,
}

#[async_trait]
impl<C> MessengerPublisher for MessengerKafkaPublisher<C>
where
C: ProducerContext<DeliveryOpaque = Box<MessengerProducerDeliveryOpaque>> + 'static,
{
type Payload = KafkaAction;
type AdditionalData = u32;
fn get_publish_type(&self) -> PublishActionType {
PublishActionType::Kafka
}

async fn send(&self, version: u64, payload: Self::Payload, additional_data: Self::AdditionalData) -> () {
info!("[MessengerKafkaPublisher] Publishing message with payload=\n{payload:#?}");

let mut bytes: Vec<u8> = Vec::new();
serde_json::to_writer(&mut bytes, &payload.value).unwrap();

let payload_str = std::str::from_utf8(&bytes).unwrap();
info!("[MessengerKafkaPublisher] base_record=\n{payload_str:#?}");

let delivery_opaque = MessengerProducerDeliveryOpaque {
version,
total_publish_count: additional_data,
};

self.publisher
.publish_to_topic(
&payload.topic,
payload.partition,
payload.key.as_deref(),
payload_str,
None,
Box::new(delivery_opaque),
)
.unwrap();
}
}
1 change: 1 addition & 0 deletions examples/messenger_using_kafka/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod kafka_producer;
4 changes: 1 addition & 3 deletions packages/talos_certifier/src/model/candidate_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use std::collections::HashMap;

use crate::certifier::CertifierCandidate;

use super::delivery_order::DeliveryOrder;

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[serde(rename_all = "camelCase", tag = "_typ")]
pub struct CandidateMessage {
Expand All @@ -25,7 +23,7 @@ pub struct CandidateMessage {
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<HashMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub on_commit: Option<HashMap<String, Vec<DeliveryOrder>>>,
pub on_commit: Option<Box<Value>>,

#[serde(skip_serializing_if = "Option::is_none")]
pub statemap: Option<Vec<HashMap<String, Value>>>,
Expand Down
5 changes: 5 additions & 0 deletions packages/talos_certifier/src/model/decision_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub trait DecisionMessageTrait {
fn get_candidate_version(&self) -> u64;
fn get_safepoint(&self) -> Option<u64>;
fn get_decision(&self) -> &Decision;
fn is_abort(&self) -> bool;
fn is_duplicate(&self) -> bool;
fn get_decided_at(&self) -> i128;
}
Expand All @@ -64,6 +65,10 @@ impl DecisionMessageTrait for DecisionMessage {
&self.decision
}

fn is_abort(&self) -> bool {
self.decision == Decision::Aborted
}

fn is_duplicate(&self) -> bool {
self.duplicate_version.is_some()
}
Expand Down
46 changes: 46 additions & 0 deletions packages/talos_messenger_actions/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
[package]
name = "talos_messenger_actions"
version = "0.1.0"
edition = "2021"

[lib]
doctest = false

[dependencies]
# Packages from workspace
async-trait = { workspace = true }
env_logger = { workspace = true }
log = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["full"] }

# Strum
strum = { version = "0.25", features = ["derive"] }

# Futures
futures-util = "0.3.26"
futures-executor = "0.3.28"

# Error
thiserror = { version = "1.0.31" }

# Kafka
rdkafka = { version = "0.33.2", features = ["sasl"] }

# Time
time = { version = "0.3.17" }

# indexmap
indexmap = { version = "2.0.0", features = ["rayon"] }
ahash = "0.8.3"

talos_certifier = { path = "../talos_certifier" }
talos_certifier_adapters = { path = "../../packages/talos_certifier_adapters" }
talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils" }
talos_messenger_core = { path = "../../packages/talos_messenger_core" }

[dev-dependencies]
mockall = { version = "0.11.3" }
tokio-test = { version = "0.4.2" }
rand = { version = "0.8.5" }
3 changes: 3 additions & 0 deletions packages/talos_messenger_actions/src/kafka/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod models;
pub mod producer;
pub mod service;
Loading

0 comments on commit 4c58df0

Please sign in to comment.