Skip to content

Commit

Permalink
chore: updates from review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred committed Sep 26, 2023
1 parent dc6d84e commit 283936a
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn main() {
let suffix: Suffix<MessengerCandidate> = Suffix::with_config(suffix_config);

let mut whitelisted_actions = HashMap::<&'static str, Vec<&'static str>>::new();

// TODO: GK - Set through env
whitelisted_actions.insert("publish", vec!["kafka"]);

let inbound_service = MessengerInboundService {
Expand Down
13 changes: 10 additions & 3 deletions examples/messenger_using_kafka/src/kafka_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,23 @@ where
let mut bytes: Vec<u8> = Vec::new();
serde_json::to_writer(&mut bytes, &payload.value).unwrap();

let payld = std::str::from_utf8(&bytes).unwrap();
info!("[MessengerKafkaPublisher] base_record=\n{payld:#?}");
let payload_str = std::str::from_utf8(&bytes).unwrap();
info!("[MessengerKafkaPublisher] base_record=\n{payload_str:#?}");

let delivery_opaque = MessengerProducerDeliveryOpaque {
version,
total_publish_count: additional_data,
};

self.publisher
.publish_to_topic("test.messenger.topic", "test", payld, None, Box::new(delivery_opaque))
.publish_to_topic(
&payload.topic,
payload.partition,
payload.key.as_deref(),
payload_str,
None,
Box::new(delivery_opaque),
)
.unwrap();
}
}
16 changes: 12 additions & 4 deletions packages/talos_messenger_actions/src/kafka/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,21 @@ impl<C: ProducerContext + 'static> KafkaProducer<C> {
pub fn publish_to_topic(
&self,
topic: &str,
key: &str,
partition: Option<i32>,
key: Option<&str>,
value: &str,
headers: Option<HashMap<String, String>>,
delivery_opaque: C::DeliveryOpaque,
) -> Result<(), MessagePublishError> {
let record = BaseRecord::with_opaque_to(topic, delivery_opaque).payload(value).key(key);
let record = BaseRecord::with_opaque_to(topic, delivery_opaque).payload(value);

// Add partition if applicable
let record = if let Some(part) = partition { record.partition(part) } else { record };

// Add key if applicable
let record = if let Some(key_str) = key { record.key(key_str) } else { record };

// Add headers if applicable
let record = match headers {
Some(x) => record.headers(build_kafka_headers(x)),
None => record,
Expand All @@ -116,7 +124,7 @@ impl MessagePublisher for KafkaProducer {
None => record,
};

debug!("Preparing to send the Decision Message. ");
debug!("Preparing to publish the message. ");
let delivery_result = self
.producer
.send(record)
Expand All @@ -127,7 +135,7 @@ impl MessagePublisher for KafkaProducer {
data: Some(format!("{:?}", record)),
})?;

debug!("Sent the Decision Message successfully {:?} ", delivery_result.to_owned());
debug!("Published the message successfully {:?} ", delivery_result.to_owned());
Ok(())
}
}
Expand Down
21 changes: 2 additions & 19 deletions packages/talos_messenger_core/src/talos_messenger_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ pub struct TalosMessengerService {

impl TalosMessengerService {
pub async fn run(self) -> MessengerServiceResult {
let service_handle = self.services.into_iter().map(|mut service| tokio::spawn(async move { service.run().await }));
let service_handles = self.services.into_iter().map(|mut service| tokio::spawn(async move { service.run().await }));

let k = try_join_all(service_handle).await.map_err(|e| MessengerServiceError {
let k = try_join_all(service_handles).await.map_err(|e| MessengerServiceError {
kind: MessengerServiceErrorKind::System,
reason: e.to_string(),
data: None,
Expand All @@ -26,21 +26,4 @@ impl TalosMessengerService {

Ok(())
}

// pub async fn shutdown(self) -> MessengerServiceResult {
// let service_handle = self.services.into_iter().map(|service| tokio::spawn(async move { service.stop().await }));

// let k = try_join_all(service_handle).await.map_err(|e| MessengerServiceError {
// kind: MessengerServiceErrorKind::System,
// reason: e.to_string(),
// data: None,
// service: "Main thread".to_string(),
// })?;

// for res in k {
// res?
// }

// Ok(())
// }
}

0 comments on commit 283936a

Please sign in to comment.