Skip to content

Commit

Permalink
feat: add missing decision headers and carry forward headers to on_co…
Browse files Browse the repository at this point in the history
…mmit actions (#107)

* feat: add missing decision headers and carry forward to on_commit actions

* chore: use constant for the default major_version 1 for decision message
  • Loading branch information
gk-kindred authored Sep 9, 2024
1 parent bc281df commit ae02be5
Show file tree
Hide file tree
Showing 12 changed files with 318 additions and 42 deletions.
1 change: 1 addition & 0 deletions examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ async fn main() -> Result<(), impl std::error::Error> {
pg_config: Some(pg_config),
kafka_config,
db_mock: mock_config.db_mock,
app_name: None,
};

let talos_certifier = certifier_with_kafka_pg(TalosCertifierChannelBuffers::default(), configuration).await?;
Expand Down
2 changes: 2 additions & 0 deletions packages/talos_certifier/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct DecisionOutboxChannelMessage {
pub struct System {
pub system_notifier: broadcast::Sender<SystemMessage>,
pub is_shutdown: bool,
/// Unique identifier of the system - container or pod name/id
pub name: String,
}

#[async_trait]
Expand Down
240 changes: 240 additions & 0 deletions packages/talos_certifier/src/model/decision_headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
use ahash::AHashMap;

use crate::core::MessageVariant;

use super::{DecisionMessage, DecisionMessageTrait};

#[derive(Debug, Default, Clone)]
pub struct DecisionMetaHeaders {
message_type: String,
message_encoding: String,
producer: String,
major_version: u64,
}

impl DecisionMetaHeaders {
pub fn new(major_version: u64, producer: String, message_encoding: Option<String>) -> Self {
Self {
message_type: MessageVariant::Decision.to_string(),
message_encoding: message_encoding.unwrap_or("application/json".to_string()),
major_version,
producer,
}
}
}

#[derive(Debug, Default, Clone)]
pub struct DecisionCertHeaders {
cert_xid: String,
cert_version: u64,
cert_safepoint: Option<u64>,
cert_time: Option<String>,
cert_agent: String,
}

impl DecisionCertHeaders {
pub fn new(decision_message: &DecisionMessage) -> Self {
Self {
cert_xid: decision_message.xid.to_string(),
cert_agent: decision_message.agent.to_string(),
cert_time: decision_message.time.clone(),
cert_safepoint: decision_message.safepoint,
cert_version: decision_message.get_candidate_version(),
}
}
}

// region: states
#[derive(Debug, Default, Clone)]
pub struct NoMetaHeaders;

#[derive(Debug, Default, Clone)]
pub struct MetaHeaders(DecisionMetaHeaders);

#[derive(Debug, Default, Clone)]
pub struct NoCertHeaders;

#[derive(Debug, Default, Clone)]
pub struct CertHeaders(DecisionCertHeaders);

// endregion: states

#[derive(Debug, Default, Clone)]
pub struct DecisionHeaderBuilder<V, C> {
pub meta_headers: V,
pub cert_headers: C,
pub additional_headers: Option<AHashMap<String, String>>,
}

impl DecisionHeaderBuilder<NoMetaHeaders, NoCertHeaders> {
pub fn new() -> Self {
Self {
..DecisionHeaderBuilder::default()
}
}
pub fn with_additional_headers(additional_headers: AHashMap<String, String>) -> Self {
Self {
additional_headers: Some(additional_headers),
..DecisionHeaderBuilder::new()
}
}
}

impl<C> DecisionHeaderBuilder<NoMetaHeaders, C> {
pub fn add_meta_headers(self, meta_headers: DecisionMetaHeaders) -> DecisionHeaderBuilder<MetaHeaders, C> {
DecisionHeaderBuilder {
meta_headers: MetaHeaders(meta_headers),
cert_headers: self.cert_headers,
additional_headers: self.additional_headers,
}
}
}

impl<V> DecisionHeaderBuilder<V, NoCertHeaders> {
pub fn add_cert_headers(self, cert_headers: DecisionCertHeaders) -> DecisionHeaderBuilder<V, CertHeaders> {
DecisionHeaderBuilder {
cert_headers: CertHeaders(cert_headers),
meta_headers: self.meta_headers,
additional_headers: self.additional_headers,
}
}
}

impl DecisionHeaderBuilder<MetaHeaders, CertHeaders> {
pub fn build(self) -> AHashMap<String, String> {
let cert_headers = self.cert_headers.0;
let meta_headers = self.meta_headers.0;

let mut headers = AHashMap::new();

// candidate headers carried over
if let Some(candidate_headers) = self.additional_headers {
headers.extend(candidate_headers);
}

// meta headers
headers.insert("majorVersion".to_owned(), meta_headers.major_version.to_string());
headers.insert("messageType".to_owned(), meta_headers.message_type);
headers.insert("messageEncoding".to_owned(), meta_headers.message_encoding);
headers.insert("producer".to_owned(), meta_headers.producer);

// certifier specific headers
headers.insert("certVersion".to_owned(), cert_headers.cert_version.to_string());
headers.insert("certXid".to_owned(), cert_headers.cert_xid);
headers.insert("certAgent".to_owned(), cert_headers.cert_agent);

if let Some(cert_time) = cert_headers.cert_time {
headers.insert("certTime".to_owned(), cert_time);
}
if let Some(cert_safepoint) = cert_headers.cert_safepoint {
headers.insert("certSafepoint".to_owned(), cert_safepoint.to_string());
}

headers
}
}

#[cfg(test)]
mod tests {

use ahash::AHashMap;

use crate::model::decision_headers::{DecisionCertHeaders, DecisionHeaderBuilder, DecisionMetaHeaders};

#[test]
fn test_decision_header_with_message_encoding_field_default() {
let decision_meta_headers = DecisionMetaHeaders::new(1_u64, "test_producer".to_string(), None);
let decision_cert_headers = DecisionCertHeaders {
cert_xid: "abcd".to_string(),
cert_version: 100,
cert_safepoint: Some(29),
cert_time: Some("2024-10-20.12:32:31.12323Z".to_owned()),
cert_agent: "some-agent".to_owned(),
};

let decision_headers = DecisionHeaderBuilder::new()
.add_meta_headers(decision_meta_headers.clone())
.add_cert_headers(decision_cert_headers.clone())
.build();

assert_eq!(
decision_headers.get("certXid").unwrap().to_owned(),
decision_cert_headers.cert_xid,
"certXid does not match"
);
assert_eq!(
decision_headers.get("majorVersion").unwrap().to_owned(),
decision_meta_headers.major_version.to_string(),
"majorVersion doesn't match"
);

// test encoding is the default value of "application/json"
assert_eq!(
decision_headers.get("messageEncoding").unwrap().to_owned(),
"application/json".to_string(),
"messageEncoding does not match default application/json"
);

assert_eq!(
decision_headers.get("messageEncoding").unwrap().to_owned(),
decision_meta_headers.message_encoding,
"messageEncoding doesn't match"
);
}
#[test]
fn test_decision_header_with_message_encoding_field_custom() {
let decision_meta_headers = DecisionMetaHeaders::new(1_u64, "test_producer".to_string(), Some("another_encoding".to_owned()));
let decision_cert_headers = DecisionCertHeaders {
cert_xid: "abcd".to_string(),
cert_version: 100,
cert_safepoint: Some(29),
cert_time: Some("2024-10-20.12:32:31.12323Z".to_owned()),
cert_agent: "some-agent".to_owned(),
};

let decision_headers = DecisionHeaderBuilder::new()
.add_meta_headers(decision_meta_headers.clone())
.add_cert_headers(decision_cert_headers.clone())
.build();

// test encoding is not the default value of "application/json"
assert_ne!(
decision_headers.get("messageEncoding").unwrap().to_owned(),
"application/json".to_string(),
"messageEncoding must not match default application/json"
);

assert_eq!(
decision_headers.get("messageEncoding").unwrap().to_owned(),
decision_meta_headers.message_encoding,
"messageEncoding doesn't match"
);
}

#[test]
fn test_decision_header_with_additional_headers() {
let decision_meta_headers = DecisionMetaHeaders::new(1_u64, "test_producer".to_string(), None);
let decision_cert_headers = DecisionCertHeaders {
cert_xid: "abcd".to_string(),
cert_version: 100,
cert_safepoint: Some(29),
cert_time: Some("2024-10-20.12:32:31.12323Z".to_owned()),
cert_agent: "some-agent".to_owned(),
};

let mut additiona_headers = AHashMap::new();
additiona_headers.insert("test-header-1".to_owned(), "test-header-1-value".to_owned());
additiona_headers.insert("correlationId".to_owned(), "eb10b6e1-a7cf-4b44-94da-6cd007030d81".to_owned());

let decision_headers = DecisionHeaderBuilder::with_additional_headers(additiona_headers.clone())
.add_meta_headers(decision_meta_headers.clone())
.add_cert_headers(decision_cert_headers.clone())
.build();

assert_eq!(
decision_headers.get("correlationId").unwrap(),
additiona_headers.get("correlationId").unwrap(),
"correlationId must match"
);
}
}
11 changes: 11 additions & 0 deletions packages/talos_certifier/src/model/decision_message.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
// use super::CandidateMessage;
use serde::{Deserialize, Serialize};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;

use crate::certifier::Outcome;

use super::{candidate_message::CandidateMessage, metrics::TxProcessingTimeline};

pub const DEFAULT_DECISION_MESSAGE_VERSION: u64 = 1_u64;

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub enum Decision {
#[serde(rename = "committed")]
Expand All @@ -24,6 +28,10 @@ pub struct DecisionMessage {
pub decision: Decision,
pub suffix_start: u64,

/// timestamp when certification/decision was made.
#[serde(skip_serializing_if = "Option::is_none")]
pub time: Option<String>,

/// the version for which the decision was made.
pub version: u64,
/// If a duplicate was found on XDB, this field will hold the new duplicate candidate
Expand Down Expand Up @@ -89,10 +97,13 @@ impl DecisionMessage {
Outcome::Aborted { version, discord: _ } => (Decision::Aborted, None, version),
};

let time = OffsetDateTime::now_utc().format(&Rfc3339).ok();

Self {
xid: xid.clone(),
agent: agent.clone(),
cohort: cohort.clone(),
time,
decision,
suffix_start,
version: *version,
Expand Down
3 changes: 2 additions & 1 deletion packages/talos_certifier/src/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
mod candidate_message;
pub mod decision_headers;
mod decision_message;
pub mod delivery_order;
pub mod metrics;

pub use candidate_message::{CandidateMessage, CandidateReadWriteSet};
pub use decision_message::{Decision, DecisionMessage, DecisionMessageTrait};
pub use decision_message::{Decision, DecisionMessage, DecisionMessageTrait, DEFAULT_DECISION_MESSAGE_VERSION};
8 changes: 1 addition & 7 deletions packages/talos_certifier/src/services/certifier_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use async_trait::async_trait;
use log::{debug, error, warn};
use talos_suffix::core::SuffixConfig;
use talos_suffix::{get_nonempty_suffix_items, Suffix, SuffixTrait};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -151,14 +150,9 @@ impl CertifierService {
Some(ChannelMessage::Candidate(candidate)) => {
let decision_message = self.process_candidate(&candidate.message)?;

let mut headers = candidate.headers.clone();
if let Ok(cert_time) = OffsetDateTime::now_utc().format(&Rfc3339) {
headers.insert("certTime".to_owned(), cert_time);
}

let decision_outbox_channel_message = DecisionOutboxChannelMessage {
message: decision_message.clone(),
headers,
headers: candidate.headers.clone(),
};

Ok(self
Expand Down
Loading

0 comments on commit ae02be5

Please sign in to comment.