diff --git a/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs b/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs index dff50829..b236740b 100644 --- a/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs +++ b/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs @@ -33,6 +33,7 @@ async fn main() -> Result<(), impl std::error::Error> { pg_config: Some(pg_config), kafka_config, db_mock: mock_config.db_mock, + app_name: None, }; let talos_certifier = certifier_with_kafka_pg(TalosCertifierChannelBuffers::default(), configuration).await?; diff --git a/packages/talos_certifier/src/core.rs b/packages/talos_certifier/src/core.rs index 758a2d25..2c60ac32 100644 --- a/packages/talos_certifier/src/core.rs +++ b/packages/talos_certifier/src/core.rs @@ -54,6 +54,8 @@ pub struct DecisionOutboxChannelMessage { pub struct System { pub system_notifier: broadcast::Sender, pub is_shutdown: bool, + /// Unique identifier of the system - container or pod name/id + pub name: String, } #[async_trait] diff --git a/packages/talos_certifier/src/model/decision_headers.rs b/packages/talos_certifier/src/model/decision_headers.rs new file mode 100644 index 00000000..0b4ecf8d --- /dev/null +++ b/packages/talos_certifier/src/model/decision_headers.rs @@ -0,0 +1,240 @@ +use ahash::AHashMap; + +use crate::core::MessageVariant; + +use super::{DecisionMessage, DecisionMessageTrait}; + +#[derive(Debug, Default, Clone)] +pub struct DecisionMetaHeaders { + message_type: String, + message_encoding: String, + producer: String, + major_version: u64, +} + +impl DecisionMetaHeaders { + pub fn new(major_version: u64, producer: String, message_encoding: Option) -> Self { + Self { + message_type: MessageVariant::Decision.to_string(), + message_encoding: message_encoding.unwrap_or("application/json".to_string()), + major_version, + producer, + } + } +} + +#[derive(Debug, Default, Clone)] +pub struct DecisionCertHeaders { + cert_xid: String, + cert_version: u64, + cert_safepoint: Option, + cert_time: Option, + cert_agent: String, +} + +impl DecisionCertHeaders { + pub fn new(decision_message: &DecisionMessage) -> Self { + Self { + cert_xid: decision_message.xid.to_string(), + cert_agent: decision_message.agent.to_string(), + cert_time: decision_message.time.clone(), + cert_safepoint: decision_message.safepoint, + cert_version: decision_message.get_candidate_version(), + } + } +} + +// region: states +#[derive(Debug, Default, Clone)] +pub struct NoMetaHeaders; + +#[derive(Debug, Default, Clone)] +pub struct MetaHeaders(DecisionMetaHeaders); + +#[derive(Debug, Default, Clone)] +pub struct NoCertHeaders; + +#[derive(Debug, Default, Clone)] +pub struct CertHeaders(DecisionCertHeaders); + +// endregion: states + +#[derive(Debug, Default, Clone)] +pub struct DecisionHeaderBuilder { + pub meta_headers: V, + pub cert_headers: C, + pub additional_headers: Option>, +} + +impl DecisionHeaderBuilder { + pub fn new() -> Self { + Self { + ..DecisionHeaderBuilder::default() + } + } + pub fn with_additional_headers(additional_headers: AHashMap) -> Self { + Self { + additional_headers: Some(additional_headers), + ..DecisionHeaderBuilder::new() + } + } +} + +impl DecisionHeaderBuilder { + pub fn add_meta_headers(self, meta_headers: DecisionMetaHeaders) -> DecisionHeaderBuilder { + DecisionHeaderBuilder { + meta_headers: MetaHeaders(meta_headers), + cert_headers: self.cert_headers, + additional_headers: self.additional_headers, + } + } +} + +impl DecisionHeaderBuilder { + pub fn add_cert_headers(self, cert_headers: DecisionCertHeaders) -> DecisionHeaderBuilder { + DecisionHeaderBuilder { + cert_headers: CertHeaders(cert_headers), + meta_headers: self.meta_headers, + additional_headers: self.additional_headers, + } + } +} + +impl DecisionHeaderBuilder { + pub fn build(self) -> AHashMap { + let cert_headers = self.cert_headers.0; + let meta_headers = self.meta_headers.0; + + let mut headers = AHashMap::new(); + + // candidate headers carried over + if let Some(candidate_headers) = self.additional_headers { + headers.extend(candidate_headers); + } + + // meta headers + headers.insert("majorVersion".to_owned(), meta_headers.major_version.to_string()); + headers.insert("messageType".to_owned(), meta_headers.message_type); + headers.insert("messageEncoding".to_owned(), meta_headers.message_encoding); + headers.insert("producer".to_owned(), meta_headers.producer); + + // certifier specific headers + headers.insert("certVersion".to_owned(), cert_headers.cert_version.to_string()); + headers.insert("certXid".to_owned(), cert_headers.cert_xid); + headers.insert("certAgent".to_owned(), cert_headers.cert_agent); + + if let Some(cert_time) = cert_headers.cert_time { + headers.insert("certTime".to_owned(), cert_time); + } + if let Some(cert_safepoint) = cert_headers.cert_safepoint { + headers.insert("certSafepoint".to_owned(), cert_safepoint.to_string()); + } + + headers + } +} + +#[cfg(test)] +mod tests { + + use ahash::AHashMap; + + use crate::model::decision_headers::{DecisionCertHeaders, DecisionHeaderBuilder, DecisionMetaHeaders}; + + #[test] + fn test_decision_header_with_message_encoding_field_default() { + let decision_meta_headers = DecisionMetaHeaders::new(1_u64, "test_producer".to_string(), None); + let decision_cert_headers = DecisionCertHeaders { + cert_xid: "abcd".to_string(), + cert_version: 100, + cert_safepoint: Some(29), + cert_time: Some("2024-10-20.12:32:31.12323Z".to_owned()), + cert_agent: "some-agent".to_owned(), + }; + + let decision_headers = DecisionHeaderBuilder::new() + .add_meta_headers(decision_meta_headers.clone()) + .add_cert_headers(decision_cert_headers.clone()) + .build(); + + assert_eq!( + decision_headers.get("certXid").unwrap().to_owned(), + decision_cert_headers.cert_xid, + "certXid does not match" + ); + assert_eq!( + decision_headers.get("majorVersion").unwrap().to_owned(), + decision_meta_headers.major_version.to_string(), + "majorVersion doesn't match" + ); + + // test encoding is the default value of "application/json" + assert_eq!( + decision_headers.get("messageEncoding").unwrap().to_owned(), + "application/json".to_string(), + "messageEncoding does not match default application/json" + ); + + assert_eq!( + decision_headers.get("messageEncoding").unwrap().to_owned(), + decision_meta_headers.message_encoding, + "messageEncoding doesn't match" + ); + } + #[test] + fn test_decision_header_with_message_encoding_field_custom() { + let decision_meta_headers = DecisionMetaHeaders::new(1_u64, "test_producer".to_string(), Some("another_encoding".to_owned())); + let decision_cert_headers = DecisionCertHeaders { + cert_xid: "abcd".to_string(), + cert_version: 100, + cert_safepoint: Some(29), + cert_time: Some("2024-10-20.12:32:31.12323Z".to_owned()), + cert_agent: "some-agent".to_owned(), + }; + + let decision_headers = DecisionHeaderBuilder::new() + .add_meta_headers(decision_meta_headers.clone()) + .add_cert_headers(decision_cert_headers.clone()) + .build(); + + // test encoding is not the default value of "application/json" + assert_ne!( + decision_headers.get("messageEncoding").unwrap().to_owned(), + "application/json".to_string(), + "messageEncoding must not match default application/json" + ); + + assert_eq!( + decision_headers.get("messageEncoding").unwrap().to_owned(), + decision_meta_headers.message_encoding, + "messageEncoding doesn't match" + ); + } + + #[test] + fn test_decision_header_with_additional_headers() { + let decision_meta_headers = DecisionMetaHeaders::new(1_u64, "test_producer".to_string(), None); + let decision_cert_headers = DecisionCertHeaders { + cert_xid: "abcd".to_string(), + cert_version: 100, + cert_safepoint: Some(29), + cert_time: Some("2024-10-20.12:32:31.12323Z".to_owned()), + cert_agent: "some-agent".to_owned(), + }; + + let mut additiona_headers = AHashMap::new(); + additiona_headers.insert("test-header-1".to_owned(), "test-header-1-value".to_owned()); + additiona_headers.insert("correlationId".to_owned(), "eb10b6e1-a7cf-4b44-94da-6cd007030d81".to_owned()); + + let decision_headers = DecisionHeaderBuilder::with_additional_headers(additiona_headers.clone()) + .add_meta_headers(decision_meta_headers.clone()) + .add_cert_headers(decision_cert_headers.clone()) + .build(); + + assert_eq!( + decision_headers.get("correlationId").unwrap(), + additiona_headers.get("correlationId").unwrap(), + "correlationId must match" + ); + } +} diff --git a/packages/talos_certifier/src/model/decision_message.rs b/packages/talos_certifier/src/model/decision_message.rs index 30bd5c85..b358f5ce 100644 --- a/packages/talos_certifier/src/model/decision_message.rs +++ b/packages/talos_certifier/src/model/decision_message.rs @@ -1,10 +1,14 @@ // use super::CandidateMessage; use serde::{Deserialize, Serialize}; +use time::format_description::well_known::Rfc3339; +use time::OffsetDateTime; use crate::certifier::Outcome; use super::{candidate_message::CandidateMessage, metrics::TxProcessingTimeline}; +pub const DEFAULT_DECISION_MESSAGE_VERSION: u64 = 1_u64; + #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] pub enum Decision { #[serde(rename = "committed")] @@ -24,6 +28,10 @@ pub struct DecisionMessage { pub decision: Decision, pub suffix_start: u64, + /// timestamp when certification/decision was made. + #[serde(skip_serializing_if = "Option::is_none")] + pub time: Option, + /// the version for which the decision was made. pub version: u64, /// If a duplicate was found on XDB, this field will hold the new duplicate candidate @@ -89,10 +97,13 @@ impl DecisionMessage { Outcome::Aborted { version, discord: _ } => (Decision::Aborted, None, version), }; + let time = OffsetDateTime::now_utc().format(&Rfc3339).ok(); + Self { xid: xid.clone(), agent: agent.clone(), cohort: cohort.clone(), + time, decision, suffix_start, version: *version, diff --git a/packages/talos_certifier/src/model/mod.rs b/packages/talos_certifier/src/model/mod.rs index 5134a53b..ef7c9416 100644 --- a/packages/talos_certifier/src/model/mod.rs +++ b/packages/talos_certifier/src/model/mod.rs @@ -1,7 +1,8 @@ mod candidate_message; +pub mod decision_headers; mod decision_message; pub mod delivery_order; pub mod metrics; pub use candidate_message::{CandidateMessage, CandidateReadWriteSet}; -pub use decision_message::{Decision, DecisionMessage, DecisionMessageTrait}; +pub use decision_message::{Decision, DecisionMessage, DecisionMessageTrait, DEFAULT_DECISION_MESSAGE_VERSION}; diff --git a/packages/talos_certifier/src/services/certifier_service.rs b/packages/talos_certifier/src/services/certifier_service.rs index bc1a041e..dde7df1f 100644 --- a/packages/talos_certifier/src/services/certifier_service.rs +++ b/packages/talos_certifier/src/services/certifier_service.rs @@ -5,7 +5,6 @@ use async_trait::async_trait; use log::{debug, error, warn}; use talos_suffix::core::SuffixConfig; use talos_suffix::{get_nonempty_suffix_items, Suffix, SuffixTrait}; -use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; use tokio::sync::mpsc; @@ -151,14 +150,9 @@ impl CertifierService { Some(ChannelMessage::Candidate(candidate)) => { let decision_message = self.process_candidate(&candidate.message)?; - let mut headers = candidate.headers.clone(); - if let Ok(cert_time) = OffsetDateTime::now_utc().format(&Rfc3339) { - headers.insert("certTime".to_owned(), cert_time); - } - let decision_outbox_channel_message = DecisionOutboxChannelMessage { message: decision_message.clone(), - headers, + headers: candidate.headers.clone(), }; Ok(self diff --git a/packages/talos_certifier/src/services/decision_outbox_service.rs b/packages/talos_certifier/src/services/decision_outbox_service.rs index 68c0e055..a6eac252 100644 --- a/packages/talos_certifier/src/services/decision_outbox_service.rs +++ b/packages/talos_certifier/src/services/decision_outbox_service.rs @@ -8,8 +8,10 @@ use time::OffsetDateTime; use tokio::sync::mpsc; use crate::core::ServiceResult; +use crate::model::decision_headers::{DecisionCertHeaders, DecisionHeaderBuilder, DecisionMetaHeaders}; +use crate::model::DEFAULT_DECISION_MESSAGE_VERSION; use crate::{ - core::{DecisionOutboxChannelMessage, MessageVariant, System, SystemService}, + core::{DecisionOutboxChannelMessage, System, SystemService}, errors::{SystemServiceError, SystemServiceErrorKind}, model::DecisionMessage, ports::{DecisionStore, MessagePublisher}, @@ -87,27 +89,15 @@ impl DecisionOutboxService { }) })?; - let mut decision_publish_header = headers; - decision_publish_header.insert("messageType".to_string(), MessageVariant::Decision.to_string()); - decision_publish_header.insert("certXid".to_string(), decision_message.xid.to_owned()); - - if let Some(safepoint) = decision_message.safepoint { - decision_publish_header.insert("certSafepoint".to_string(), safepoint.to_string()); - } - decision_publish_header.insert("certAgent".to_string(), decision_message.agent.to_owned()); - debug!("Publishing message {}", decision_message.version); - publisher - .publish_message(xid.as_str(), &decision_str, decision_publish_header.clone()) - .await - .map_err(|publish_error| { - Box::new(SystemServiceError { - kind: SystemServiceErrorKind::MessagePublishError, - reason: publish_error.reason, - data: publish_error.data, //Some(format!("{:?}", decision_message)), - service: "Decision Outbox Service".to_string(), - }) + publisher.publish_message(xid.as_str(), &decision_str, headers).await.map_err(|publish_error| { + Box::new(SystemServiceError { + kind: SystemServiceErrorKind::MessagePublishError, + reason: publish_error.reason, + data: publish_error.data, //Some(format!("{:?}", decision_message)), + service: "Decision Outbox Service".to_string(), }) + }) } } @@ -123,21 +113,35 @@ impl SystemService for DecisionOutboxService { headers, message: decision_message, } = decision_channel_message; - tokio::spawn(async move { - match DecisionOutboxService::save_decision_to_xdb(&datastore, &decision_message).await { - Ok(decision) => { - if let Err(publish_error) = DecisionOutboxService::publish_decision(&publisher, &decision, headers).await { - error!( - "Error publishing message for version={} with reason={:?}", - decision.version, - publish_error.to_string() - ); + tokio::spawn({ + let decision_headers = DecisionHeaderBuilder::with_additional_headers(headers.into()).add_meta_headers(DecisionMetaHeaders::new( + DEFAULT_DECISION_MESSAGE_VERSION, // major version of decision message + self.system.name.clone(), + None, + )); + + async move { + match DecisionOutboxService::save_decision_to_xdb(&datastore, &decision_message).await { + Ok(decision) => { + if let Err(publish_error) = DecisionOutboxService::publish_decision( + &publisher, + &decision, + decision_headers.add_cert_headers(DecisionCertHeaders::new(&decision)).build().into(), + ) + .await + { + error!( + "Error publishing message for version={} with reason={:?}", + decision.version, + publish_error.to_string() + ); + } + } + Err(db_error) => { + system.system_notifier.send(SystemMessage::ShutdownWithError(db_error)).unwrap(); } - } - Err(db_error) => { - system.system_notifier.send(SystemMessage::ShutdownWithError(db_error)).unwrap(); - } - }; + }; + } }); }; diff --git a/packages/talos_certifier/src/services/tests/certifier_service.rs b/packages/talos_certifier/src/services/tests/certifier_service.rs index 38974106..70799efe 100644 --- a/packages/talos_certifier/src/services/tests/certifier_service.rs +++ b/packages/talos_certifier/src/services/tests/certifier_service.rs @@ -59,6 +59,7 @@ async fn test_certification_rule_2() { let system = System { system_notifier, is_shutdown: false, + name: "test-system".to_string(), }; let mut certifier_svc = CertifierService::new(message_channel_rx, do_channel_tx, Arc::new(0.into()), system, None); @@ -133,6 +134,7 @@ async fn test_error_in_processing_candidate_message_certifying() { let system = System { system_notifier, is_shutdown: false, + name: "test-system".to_string(), }; let mut certifier_svc = CertifierService::new(message_channel_rx, do_channel_tx, Arc::new(0.into()), system, None); @@ -182,6 +184,7 @@ async fn test_certification_process_decision() { let system = System { system_notifier, is_shutdown: false, + name: "test-system".to_string(), }; let commit_state: Arc = Arc::new(0.into()); @@ -250,6 +253,7 @@ async fn test_certification_process_decision_incorrect_version() { let system = System { system_notifier, is_shutdown: false, + name: "test-system".to_string(), }; let mut certifier_svc = CertifierService::new(message_channel_rx, do_channel_tx, Arc::new(0.into()), system, None); @@ -317,6 +321,7 @@ async fn test_certification_check_suffix_prune_is_ready_threshold_30pc() { let system = System { system_notifier, is_shutdown: false, + name: "test-system".to_string(), }; let mut certifier_svc = CertifierService::new( @@ -550,6 +555,7 @@ async fn test_certification_check_suffix_prune_is_not_at_threshold() { let system = System { system_notifier, is_shutdown: false, + name: "test-system".to_string(), }; let mut certifier_svc = CertifierService::new( diff --git a/packages/talos_certifier/src/services/tests/decision_outbox_service.rs b/packages/talos_certifier/src/services/tests/decision_outbox_service.rs index 11f97068..51351c0c 100644 --- a/packages/talos_certifier/src/services/tests/decision_outbox_service.rs +++ b/packages/talos_certifier/src/services/tests/decision_outbox_service.rs @@ -95,6 +95,7 @@ async fn test_candidate_message_create_decision_message() { let system = System { system_notifier, is_shutdown: false, + name: "test-system".to_string(), }; //clones @@ -116,6 +117,7 @@ async fn test_candidate_message_create_decision_message() { xid: "test-xid-1".to_owned(), agent: "test-agent-1".to_owned(), cohort: "test-cohort-1".to_owned(), + time: Some("2021-08-02T11:21:34.523Z".to_owned()), decision: Decision::Committed, suffix_start: 2, version: 4, @@ -156,6 +158,7 @@ async fn test_save_and_publish_multiple_decisions() { let system = System { system_notifier, is_shutdown: false, + name: "test-system".to_string(), }; //clones @@ -178,6 +181,7 @@ async fn test_save_and_publish_multiple_decisions() { xid: "test-xid-1".to_owned(), agent: "test-agent-1".to_owned(), cohort: "test-cohort-1".to_owned(), + time: Some("2021-08-02T11:21:34.523Z".to_owned()), decision: Decision::Committed, suffix_start: 2, version: 4, @@ -199,6 +203,7 @@ async fn test_save_and_publish_multiple_decisions() { xid: "test-xid-2".to_owned(), agent: "test-agent-1".to_owned(), cohort: "test-cohort-1".to_owned(), + time: Some("2021-08-02T11:21:34.523Z".to_owned()), decision: Decision::Committed, suffix_start: 2, version: 4, @@ -277,6 +282,7 @@ async fn test_capture_child_thread_dberror() { let system = System { system_notifier, is_shutdown: false, + name: "test-system".to_string(), }; //clones @@ -298,6 +304,7 @@ async fn test_capture_child_thread_dberror() { xid: "test-xid-1".to_owned(), agent: "test-agent-1".to_owned(), cohort: "test-cohort-1".to_owned(), + time: Some("2021-08-02T11:21:34.523Z".to_owned()), decision: Decision::Committed, suffix_start: 2, version: 4, @@ -352,6 +359,7 @@ async fn test_capture_publish_error() { xid: "test-xid-1".to_owned(), agent: "test-agent-1".to_owned(), cohort: "test-cohort-1".to_owned(), + time: Some("2021-08-02T11:21:34.523Z".to_owned()), decision: Decision::Committed, suffix_start: 2, version: 4, @@ -412,6 +420,7 @@ async fn test_duplicate_version_found_in_db() { xid: "test-xid-1".to_owned(), agent: "test-agent-1".to_owned(), cohort: "test-cohort-1".to_owned(), + time: Some("2021-08-02T11:21:34.523Z".to_owned()), decision: Decision::Committed, suffix_start: 2, version: 4, @@ -429,6 +438,7 @@ async fn test_duplicate_version_found_in_db() { xid: "test-xid-1".to_owned(), agent: "test-agent-2".to_owned(), cohort: "test-cohort-1".to_owned(), + time: Some("2021-08-02T11:21:34.523Z".to_owned()), decision: Decision::Committed, suffix_start: 5, version: 8, diff --git a/packages/talos_certifier/src/services/tests/message_receiver_service.rs b/packages/talos_certifier/src/services/tests/message_receiver_service.rs index c8d3a278..f4dfc20f 100644 --- a/packages/talos_certifier/src/services/tests/message_receiver_service.rs +++ b/packages/talos_certifier/src/services/tests/message_receiver_service.rs @@ -83,6 +83,7 @@ async fn test_consume_message() { let system = System { system_notifier, is_shutdown: false, + name: "test-system".to_string(), }; let commit_offset: Arc = Arc::new(0.into()); @@ -141,6 +142,7 @@ async fn test_consume_message_error() { let system = System { system_notifier, is_shutdown: false, + name: "test-system".to_string(), }; let commit_offset: Arc = Arc::new(0.into()); diff --git a/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs b/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs index 8394e080..27de958e 100644 --- a/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs +++ b/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs @@ -40,6 +40,9 @@ pub struct Configuration { pub kafka_config: KafkaConfig, pub certifier_mock: bool, pub db_mock: bool, + /// If not set, defaults to `talos_certifier`. + /// Setting this can be useful to distinguish unique deployed version of talos certifier for easier debugging. + pub app_name: Option, } /// Talos certifier instantiated with Kafka as Abcast and Postgres as XDB. @@ -52,6 +55,7 @@ pub async fn certifier_with_kafka_pg( let system = System { system_notifier, is_shutdown: false, + name: configuration.app_name.unwrap_or("talos_certifier".to_string()), }; let (tx, rx) = mpsc::channel(channel_buffer.message_receiver); diff --git a/packages/talos_certifier_adapters/src/mock_certifier_service.rs b/packages/talos_certifier_adapters/src/mock_certifier_service.rs index 1a483109..14a85439 100644 --- a/packages/talos_certifier_adapters/src/mock_certifier_service.rs +++ b/packages/talos_certifier_adapters/src/mock_certifier_service.rs @@ -38,6 +38,7 @@ impl SystemService for MockCertifierService { decision: Decision::Committed, agent: message.agent, cohort: message.cohort, + time: Some("2021-08-02T11:21:34.523Z".to_owned()), xid: message.xid, suffix_start: 0, safepoint: Some(0),