Skip to content

Commit

Permalink
feat: talos messenger suffix prune and message receiver offset commit (
Browse files Browse the repository at this point in the history
…#84)

* feat: add messenger suffix pruning

* feat: add suffix prune and commit receiver
  • Loading branch information
gk-kindred authored Oct 2, 2023
1 parent 4c58df0 commit f97e6f5
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 68 deletions.
18 changes: 18 additions & 0 deletions packages/talos_certifier/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,24 @@ impl From<MessageReceiverError> for SystemServiceError {
}
}
}
// Converts a `MessageReceiverError` into a boxed `SystemServiceError`.
//
// A `ParseError` kind maps onto `SystemServiceErrorKind::ParseError`; every other kind is
// wrapped in `SystemServiceErrorKind::MessageReceiverError`. The `reason` and `data` fields
// are carried over unchanged and the service name is always "Message Receiver Servicer".
impl From<MessageReceiverError> for Box<SystemServiceError> {
    fn from(msg_rx_error: MessageReceiverError) -> Self {
        // Only the error kind differs between the two cases, so resolve it up front.
        let kind = match msg_rx_error.kind {
            MessageReceiverErrorKind::ParseError => SystemServiceErrorKind::ParseError,
            other_kind => SystemServiceErrorKind::MessageReceiverError(other_kind),
        };

        Box::new(SystemServiceError {
            kind,
            reason: msg_rx_error.reason,
            data: msg_rx_error.data,
            service: "Message Receiver Servicer".to_string(),
        })
    }
}
impl From<MessagePublishError> for SystemServiceError {
fn from(msg_rx_error: MessagePublishError) -> Self {
SystemServiceError {
Expand Down
3 changes: 2 additions & 1 deletion packages/talos_certifier/src/ports/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ pub trait MessageReciever: SharedPortTraits {
async fn subscribe(&self) -> Result<(), SystemServiceError>;
async fn commit(&self) -> Result<(), SystemServiceError>;
fn commit_async(&self) -> Option<JoinHandle<Result<(), SystemServiceError>>>;
async fn update_savepoint(&mut self, offset: i64) -> Result<(), SystemServiceError>;
fn update_savepoint(&mut self, offset: i64) -> Result<(), Box<SystemServiceError>>;
async fn update_savepoint_async(&mut self, offset: i64) -> Result<(), SystemServiceError>;
async fn unsubscribe(&self);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl SystemService for MessageReceiverService {
//** commit message
_ = self.commit_interval.tick() => {
let offset = self.commit_offset.load(std::sync::atomic::Ordering::Relaxed);
self.receiver.update_savepoint(offset).await?;
self.receiver.update_savepoint(offset)?;
self.receiver.commit_async();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ impl MessageReciever for MockReciever {
/// Mock implementation: performs no commit and always returns `None` (no task handle).
fn commit_async(&self) -> Option<JoinHandle<Result<(), SystemServiceError>>> {
None
}
async fn update_savepoint(&mut self, _version: i64) -> Result<(), SystemServiceError> {
/// Mock implementation: ignores `_version` and always returns `Ok(())`.
fn update_savepoint(&mut self, _version: i64) -> Result<(), Box<SystemServiceError>> {
Ok(())
}
/// Mock implementation: ignores `_version` and always resolves to `Ok(())`.
async fn update_savepoint_async(&mut self, _version: i64) -> Result<(), SystemServiceError> {
Ok(())
}

Expand Down
7 changes: 6 additions & 1 deletion packages/talos_certifier_adapters/src/kafka/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl MessageReciever for KafkaConsumer {
}
}

async fn update_savepoint(&mut self, offset: i64) -> Result<(), SystemServiceError> {
fn update_savepoint(&mut self, offset: i64) -> Result<(), Box<SystemServiceError>> {
// let partition = self.tpl.;
let tpl = self.tpl.elements_for_topic(&self.topic);
if !tpl.is_empty() {
Expand All @@ -222,6 +222,11 @@ impl MessageReciever for KafkaConsumer {
}
Ok(())
}

/// Not implemented for this receiver; `_offset` is unused.
///
/// # Panics
/// Always panics via `unimplemented!()` when called.
async fn update_savepoint_async(&mut self, _offset: i64) -> Result<(), SystemServiceError> {
// Reserved for the future, maybe for another abcast. Not needed here.
unimplemented!()
}
}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion packages/talos_cohort_replicator/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ where

pub(crate) async fn commit(&mut self) {
if let Some(version) = self.next_commit_offset {
self.receiver.update_savepoint(version as i64).await.unwrap();
self.receiver.update_savepoint(version as i64).unwrap();
self.receiver.commit_async();
self.next_commit_offset = None;
}
Expand Down
59 changes: 27 additions & 32 deletions packages/talos_messenger_core/src/services/inbound_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use ahash::{HashMap, HashMapExt};
use async_trait::async_trait;
use log::{error, info, warn};
use log::{debug, info, warn};

use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage};
use talos_suffix::{Suffix, SuffixTrait};
Expand Down Expand Up @@ -49,11 +49,6 @@ where
async fn process_next_actions(&mut self) -> MessengerServiceResult {
let items_to_process = self.suffix.get_suffix_items_to_process();

error!(
"Items to process count... {:#?}",
items_to_process.iter().map(|x| x.version).collect::<Vec<u64>>()
);

for item in items_to_process {
let ver = item.version;

Expand Down Expand Up @@ -83,24 +78,41 @@ where
/// Handles the feedback received from other services when they have successfully processed the action.
/// Will update the individual action for the count and completed flag and also update state of the suffix item.
///
pub(crate) fn handle_item_actioned_success(&mut self, version: u64, action_key: &str, total_count: u32) {
pub(crate) async fn handle_item_actioned_success(&mut self, version: u64, action_key: &str, total_count: u32) {
let item_state = self.suffix.get_item_state(version);
match item_state {
Some(SuffixItemState::Processing) | Some(SuffixItemState::PartiallyComplete) => {
self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete);

self.suffix.update_action(version, action_key, total_count);
if self.suffix.all_actions_completed(version) {
self.suffix.update_item_action(version, action_key, total_count);
if self.suffix.are_all_item_actions_completed(version) {
self.suffix
.set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::Processed));

// Pruning of suffix.
self.suffix.update_prune_index_from_version(version);

debug!("[Actions] All actions in Version {version} completed!");
// Check prune eligibility by looking at the prune meta info.
if let Some(index_to_prune) = self.suffix.get_safe_prune_index() {
// Call prune method on suffix.
let _ = self.suffix.prune_till_index(index_to_prune);

// TODO: GK - Calculate the safe offset to commit.
let commit_offset = version + 1;
debug!("[Commit] Updating tpl to version .. {commit_offset}");
let _ = self.message_receiver.update_savepoint(commit_offset as i64);

self.message_receiver.commit_async();
}
}
debug!(
"[Action] State version={version} changed from {item_state:?} => {:?}",
self.suffix.get_item_state(version)
);
}
_ => (),
};
error!(
"State change for version={version} from {item_state:?} => {:?}",
self.suffix.get_item_state(version)
);
}
}

Expand Down Expand Up @@ -149,7 +161,6 @@ where
item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoCommitActions));

}
error!("[FILTERED ACTIONS] version={} state={:?} actions={:#?}", version, item_to_update.item.get_state(), item_to_update.item.get_commit_actions());
};

} else {
Expand All @@ -164,21 +175,12 @@ where

self.suffix.update_item_decision(version, decision_version, &decision_message);

self.process_next_actions().await?


// TODO: GK - Calculate the safe offset to commit.

// TODO: GK - Prune suffix.
self.process_next_actions().await?;

},
}

}
// Next condition - Commit, get processed/published info.



// Receive feedback from publisher.
Some(feedback) = self.rx_feedback_channel.recv() => {
match feedback {
Expand All @@ -187,14 +189,7 @@ where
MessengerChannelFeedback::Success(version, key, total_count) => {
info!("Successfully received version={version} count={total_count}");

self.handle_item_actioned_success(version, &key, total_count);


// self.suffix.messages.iter().flatten().for_each(|item|
// error!("version={} decision={:?} state={:?} action_state={:#?}", item.item_ver, item.item.decision, item.item.get_state(), item.item.commit_actions.iter().map(|x| (x.1.count, x.1.is_completed)).collect::<Vec<(u32, bool)>>())
// );
// info!("Suffix dump ={:?}")
// info!("State on completion ={:?}", item_state);
self.handle_item_actioned_success(version, &key, total_count).await;
},
}
// Process the next items with commit actions
Expand Down
86 changes: 57 additions & 29 deletions packages/talos_messenger_core/src/suffix.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use ahash::{HashMap, HashMapExt};
use log::{error, warn};
use log::{debug, warn};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt::Debug;
use talos_certifier::model::{CandidateMessage, Decision, DecisionMessageTrait};
use talos_suffix::{core::SuffixMeta, Suffix, SuffixItem, SuffixTrait};

pub trait MessengerSuffixItemTrait {
pub trait MessengerSuffixItemTrait: Debug + Clone {
fn set_state(&mut self, state: SuffixItemState);
fn set_safepoint(&mut self, safepoint: Option<u64>);
fn set_commit_action(&mut self, commit_actions: HashMap<String, AllowedActionsMapItem>);
Expand All @@ -22,23 +22,33 @@ pub trait MessengerSuffixItemTrait {

pub trait MessengerSuffixTrait<T: MessengerSuffixItemTrait>: SuffixTrait<T> {
// Setters
/// Sets the state of an item by version.
fn set_item_state(&mut self, version: u64, process_state: SuffixItemState);

// Getters
/// Get suffix meta
fn get_meta(&self) -> &SuffixMeta;
/// Get suffix item as mutable reference.
fn get_mut(&mut self, version: u64) -> Option<&mut SuffixItem<T>>;
fn get_item_state(&self, version: u64) -> Option<SuffixItemState>;
fn get_last_installed(&self, to_version: Option<u64>) -> Option<&SuffixItem<T>>;
// fn update_suffix_item_decision(&mut self, version: u64, decision_ver: u64) -> SuffixResult<()>;
fn get_suffix_meta(&self) -> &SuffixMeta;
fn installed_all_prior_decided_items(&self, version: u64) -> bool;

/// Checks if suffix ready to prune
///
// fn is_safe_prune() -> bool;
/// Get the state of an item by version.
fn get_item_state(&self, version: u64) -> Option<SuffixItemState>;
/// Gets the suffix items eligible to process.
fn get_suffix_items_to_process(&self) -> Vec<ActionsMapWithVersion>;
// updates
fn update_prune_index(&mut self, version: u64);
/// Updates the decision for a version.
fn update_item_decision<D: DecisionMessageTrait>(&mut self, version: u64, decision_version: u64, decision_message: &D);
fn update_action(&mut self, version: u64, action_key: &str, total_count: u32);
/// Updates the action for a version using the action_key for lookup.
fn update_item_action(&mut self, version: u64, action_key: &str, total_count: u32);

/// Checks if all versions prior to this version are already completed, and updates the prune index.
/// If the prune index was updated, returns the new prune_index, else returns None.
fn update_prune_index_from_version(&mut self, version: u64) -> Option<usize>;

fn all_actions_completed(&self, version: u64) -> bool;
/// Checks if all commit actions are completed for the version
fn are_all_item_actions_completed(&self, version: u64) -> bool;
}

#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -178,7 +188,7 @@ impl MessengerSuffixItemTrait for MessengerCandidate {

impl<T> MessengerSuffixTrait<T> for Suffix<T>
where
T: MessengerSuffixItemTrait + Debug + Clone,
T: MessengerSuffixItemTrait,
{
// TODO: GK - Elevate this to core suffix
fn get_mut(&mut self, version: u64) -> Option<&mut SuffixItem<T>> {
Expand All @@ -200,19 +210,11 @@ where
}
}

fn get_last_installed(&self, _to_version: Option<u64>) -> Option<&SuffixItem<T>> {
todo!()
}

// TODO: GK - Elevate this to core suffix
fn get_suffix_meta(&self) -> &SuffixMeta {
/// Returns a shared reference to the suffix meta stored in `self.meta`.
fn get_meta(&self) -> &SuffixMeta {
&self.meta
}

fn installed_all_prior_decided_items(&self, _version: u64) -> bool {
todo!()
}

fn get_suffix_items_to_process(&self) -> Vec<ActionsMapWithVersion> {
let items = self
.messages
Expand All @@ -224,14 +226,12 @@ where
// Take while contiguous ones, whose safepoint is already processed.
.take_while(|&x| {
let Some(safepoint) = x.item.get_safepoint() else {
error!("take while early exit for version {:?}", x.item_ver);
return false;
};

match self.get(*safepoint) {
// If we find the suffix item from the safepoint, we need to ensure that it is already in `Complete` state
Ok(Some(safepoint_item)) => {
error!("State of safepoint items is {:?}", safepoint_item.item.get_state());
matches!(safepoint_item.item.get_state(), SuffixItemState::Complete(..))
}
// If we couldn't find the item in suffix, it could be because it was pruned and it is safe to assume that we can consider it.
Expand All @@ -247,10 +247,6 @@ where
items
}

fn update_prune_index(&mut self, _version: u64) {
todo!()
}

fn update_item_decision<D: DecisionMessageTrait>(&mut self, version: u64, decision_version: u64, decision_message: &D) {
let _ = self.update_decision_suffix_item(version, decision_version);

Expand All @@ -269,7 +265,7 @@ where
}
}

fn update_action(&mut self, version: u64, action_key: &str, total_count: u32) {
fn update_item_action(&mut self, version: u64, action_key: &str, total_count: u32) {
if let Some(item_to_update) = self.get_mut(version) {
if let Some(action) = item_to_update.item.get_action_by_key_mut(action_key) {
action.update_count();
Expand All @@ -285,7 +281,39 @@ where
}
}

fn all_actions_completed(&self, version: u64) -> bool {
/// Recomputes the prune index from the run of completed items ending at (or before) `version`.
///
/// The scan starts at the current prune index (or `0` when none is set) and ends at the
/// index of `version` when that index lies beyond the start; otherwise it ends at the last
/// slot of the suffix. Within that slice, empty slots are skipped and the leading run of
/// items in a `Complete` state is taken; the last item of that run becomes the new prune
/// index.
///
/// Returns the new prune index, or `None` when nothing was updated: the suffix is empty,
/// the scan bounds are invalid, the first populated slot is not complete, or the chosen
/// version cannot be mapped back to an index.
fn update_prune_index_from_version(&mut self, version: u64) -> Option<usize> {
    let current_prune_index = self.get_meta().prune_index;

    let start_index = current_prune_index.unwrap_or(0);

    let end_index = match self.index_from_head(version) {
        Some(index) if index > start_index => index,
        // Fall back to the last slot. `checked_sub` returns `None` for an empty suffix
        // instead of underflowing on `len - 1`.
        _ => self.suffix_length().checked_sub(1)?,
    };

    // `range` panics when start > end or when the end lies outside the deque, so bail out
    // early rather than crash the caller on inconsistent bounds.
    if start_index > end_index || end_index >= self.messages.len() {
        return None;
    }

    debug!(
        "[Update prune index] Calculating prune index in suffix slice between index {start_index} <-> {end_index}. Current prune index version {current_prune_index:?}.",
    );

    // 1. Get the last contiguous item that is completed.
    let safe_prune_version = self
        .messages
        .range(start_index..=end_index)
        .flatten()
        .take_while(|item| matches!(item.item.get_state(), SuffixItemState::Complete(..)))
        .last()?
        .item_ver;

    // 2. Update the prune index.
    let index = self.index_from_head(safe_prune_version)?;

    self.update_prune_index(index.into());
    debug!("[Update prune index] Prune version updated to {index} (version={safe_prune_version})");

    Some(index)
}

fn are_all_item_actions_completed(&self, version: u64) -> bool {
if let Ok(Some(item)) = self.get(version) {
item.item.get_commit_actions().iter().all(|(_, x)| x.is_completed())
} else {
Expand Down
4 changes: 2 additions & 2 deletions packages/talos_suffix/src/suffix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ where
.collect()
}

pub fn find_prune_till_index(&mut self, prune_till_index: usize) -> usize {
pub fn find_prune_till_index(&self, prune_till_index: usize) -> usize {
let prune_till_index = self
.messages
.range(..prune_till_index + 1)
Expand All @@ -141,7 +141,7 @@ where
prune_till_index
}

pub fn get_safe_prune_index(&mut self) -> Option<usize> {
pub fn get_safe_prune_index(&self) -> Option<usize> {
// If `prune_start_threshold=None` don't prune.
let Some(prune_threshold) = self.meta.prune_start_threshold else {
debug!("[SUFFIX PRUNE CHECK] As suffix.meta.prune_start_threshold is None, pruning is disabled.");
Expand Down

0 comments on commit f97e6f5

Please sign in to comment.