From f97e6f5f6dddeabefb1274af7a50b8df4970f8a0 Mon Sep 17 00:00:00 2001 From: gk-kindred <118979108+gk-kindred@users.noreply.github.com> Date: Tue, 3 Oct 2023 08:47:45 +1100 Subject: [PATCH] feat: talos messenger suffix prune and message receiver offset commit (#84) * feat: add messenger suffix pruning * feat: add suffix prune and commit receiver --- packages/talos_certifier/src/errors.rs | 18 ++++ packages/talos_certifier/src/ports/message.rs | 3 +- .../src/services/message_receiver_service.rs | 2 +- .../tests/message_receiver_service.rs | 5 +- .../src/kafka/consumer.rs | 7 +- packages/talos_cohort_replicator/src/core.rs | 2 +- .../src/services/inbound_service.rs | 59 ++++++------- packages/talos_messenger_core/src/suffix.rs | 86 ++++++++++++------- packages/talos_suffix/src/suffix.rs | 4 +- 9 files changed, 118 insertions(+), 68 deletions(-) diff --git a/packages/talos_certifier/src/errors.rs b/packages/talos_certifier/src/errors.rs index 84f25132..d54c4d6c 100644 --- a/packages/talos_certifier/src/errors.rs +++ b/packages/talos_certifier/src/errors.rs @@ -135,6 +135,24 @@ impl From for SystemServiceError { } } } +impl From for Box { + fn from(msg_rx_error: MessageReceiverError) -> Self { + match msg_rx_error.kind { + MessageReceiverErrorKind::ParseError => Box::new(SystemServiceError { + kind: SystemServiceErrorKind::ParseError, + reason: msg_rx_error.reason, + data: msg_rx_error.data, + service: "Message Receiver Servicer".to_string(), + }), + _ => Box::new(SystemServiceError { + kind: SystemServiceErrorKind::MessageReceiverError(msg_rx_error.kind), + reason: msg_rx_error.reason, + data: msg_rx_error.data, + service: "Message Receiver Servicer".to_string(), + }), + } + } +} impl From for SystemServiceError { fn from(msg_rx_error: MessagePublishError) -> Self { SystemServiceError { diff --git a/packages/talos_certifier/src/ports/message.rs b/packages/talos_certifier/src/ports/message.rs index bb274fa2..508ab001 100644 --- a/packages/talos_certifier/src/ports/message.rs +++ b/packages/talos_certifier/src/ports/message.rs @@ -16,7 +16,8 @@ pub trait MessageReciever: SharedPortTraits { async fn subscribe(&self) -> Result<(), SystemServiceError>; async fn commit(&self) -> Result<(), SystemServiceError>; fn commit_async(&self) -> Option>>; - async fn update_savepoint(&mut self, offset: i64) -> Result<(), SystemServiceError>; + fn update_savepoint(&mut self, offset: i64) -> Result<(), Box>; + async fn update_savepoint_async(&mut self, offset: i64) -> Result<(), SystemServiceError>; async fn unsubscribe(&self); } diff --git a/packages/talos_certifier/src/services/message_receiver_service.rs b/packages/talos_certifier/src/services/message_receiver_service.rs index 74e6e9fc..4b9cd345 100644 --- a/packages/talos_certifier/src/services/message_receiver_service.rs +++ b/packages/talos_certifier/src/services/message_receiver_service.rs @@ -84,7 +84,7 @@ impl SystemService for MessageReceiverService { //** commit message _ = self.commit_interval.tick() => { let offset = self.commit_offset.load(std::sync::atomic::Ordering::Relaxed); - self.receiver.update_savepoint(offset).await?; + self.receiver.update_savepoint(offset)?; self.receiver.commit_async(); } } diff --git a/packages/talos_certifier/src/services/tests/message_receiver_service.rs b/packages/talos_certifier/src/services/tests/message_receiver_service.rs index 660e1d0a..81a920eb 100644 --- a/packages/talos_certifier/src/services/tests/message_receiver_service.rs +++ b/packages/talos_certifier/src/services/tests/message_receiver_service.rs @@ -47,7 +47,10 @@ impl MessageReciever for MockReciever { fn commit_async(&self) -> Option>> { None } - async fn update_savepoint(&mut self, _version: i64) -> Result<(), SystemServiceError> { + fn update_savepoint(&mut self, _version: i64) -> Result<(), Box> { + Ok(()) + } + async fn update_savepoint_async(&mut self, _version: i64) -> Result<(), SystemServiceError> { Ok(()) } diff --git a/packages/talos_certifier_adapters/src/kafka/consumer.rs b/packages/talos_certifier_adapters/src/kafka/consumer.rs index ca7bc614..7f149333 100644 --- a/packages/talos_certifier_adapters/src/kafka/consumer.rs +++ b/packages/talos_certifier_adapters/src/kafka/consumer.rs @@ -206,7 +206,7 @@ impl MessageReciever for KafkaConsumer { } } - async fn update_savepoint(&mut self, offset: i64) -> Result<(), SystemServiceError> { + fn update_savepoint(&mut self, offset: i64) -> Result<(), Box> { // let partition = self.tpl.; let tpl = self.tpl.elements_for_topic(&self.topic); if !tpl.is_empty() { @@ -222,6 +222,11 @@ impl MessageReciever for KafkaConsumer { } Ok(()) } + + async fn update_savepoint_async(&mut self, _offset: i64) -> Result<(), SystemServiceError> { + // For future, maybe for another abcast. Not needed in here. + unimplemented!() + } } #[async_trait] diff --git a/packages/talos_cohort_replicator/src/core.rs b/packages/talos_cohort_replicator/src/core.rs index cab4bae4..e349c2ef 100644 --- a/packages/talos_cohort_replicator/src/core.rs +++ b/packages/talos_cohort_replicator/src/core.rs @@ -141,7 +141,7 @@ where pub(crate) async fn commit(&mut self) { if let Some(version) = self.next_commit_offset { - self.receiver.update_savepoint(version as i64).await.unwrap(); + self.receiver.update_savepoint(version as i64).unwrap(); self.receiver.commit_async(); self.next_commit_offset = None; } diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 49e8e8ed..1fb04381 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -16,7 +16,7 @@ use ahash::{HashMap, HashMapExt}; use async_trait::async_trait; -use log::{error, info, warn}; +use log::{debug, info, warn}; use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage}; use talos_suffix::{Suffix, SuffixTrait}; @@ -49,11 +49,6 @@ where async fn process_next_actions(&mut self) -> MessengerServiceResult { let items_to_process = self.suffix.get_suffix_items_to_process(); - error!( - "Items to process count... {:#?}", - items_to_process.iter().map(|x| x.version).collect::>() - ); - for item in items_to_process { let ver = item.version; @@ -83,24 +78,41 @@ where /// Handles the feedback received from other services when they have successfully processed the action. /// Will update the individual action for the count and completed flag and also update state of the suffix item. /// - pub(crate) fn handle_item_actioned_success(&mut self, version: u64, action_key: &str, total_count: u32) { + pub(crate) async fn handle_item_actioned_success(&mut self, version: u64, action_key: &str, total_count: u32) { let item_state = self.suffix.get_item_state(version); match item_state { Some(SuffixItemState::Processing) | Some(SuffixItemState::PartiallyComplete) => { self.suffix.set_item_state(version, SuffixItemState::PartiallyComplete); - self.suffix.update_action(version, action_key, total_count); - if self.suffix.all_actions_completed(version) { + self.suffix.update_item_action(version, action_key, total_count); + if self.suffix.are_all_item_actions_completed(version) { self.suffix .set_item_state(version, SuffixItemState::Complete(SuffixItemCompleteStateReason::Processed)); + + // Pruning of suffix. + self.suffix.update_prune_index_from_version(version); + + debug!("[Actions] All actions in Version {version} completed!"); + // Check prune eligibility by looking at the prune meta info. + if let Some(index_to_prune) = self.suffix.get_safe_prune_index() { + // Call prune method on suffix. + let _ = self.suffix.prune_till_index(index_to_prune); + + // TODO: GK - Calculate the safe offset to commit. + let commit_offset = version + 1; + debug!("[Commit] Updating tpl to version .. {commit_offset}"); + let _ = self.message_receiver.update_savepoint(commit_offset as i64); + + self.message_receiver.commit_async(); + } } + debug!( + "[Action] State version={version} changed from {item_state:?} => {:?}", + self.suffix.get_item_state(version) + ); } _ => (), }; - error!( - "State change for version={version} from {item_state:?} => {:?}", - self.suffix.get_item_state(version) - ); } } @@ -149,7 +161,6 @@ where item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::NoCommitActions)); } - error!("[FILTERED ACTIONS] version={} state={:?} actions={:#?}", version, item_to_update.item.get_state(), item_to_update.item.get_commit_actions()); }; } else { @@ -164,21 +175,12 @@ where self.suffix.update_item_decision(version, decision_version, &decision_message); - self.process_next_actions().await? - - - // TODO: GK - Calculate the safe offset to commit. - - // TODO: GK - Prune suffix. + self.process_next_actions().await?; }, } } - // Next condition - Commit, get processed/published info. - - - // Receive feedback from publisher. Some(feedback) = self.rx_feedback_channel.recv() => { match feedback { @@ -187,14 +189,7 @@ where MessengerChannelFeedback::Success(version, key, total_count) => { info!("Successfully received version={version} count={total_count}"); - self.handle_item_actioned_success(version, &key, total_count); - - - // self.suffix.messages.iter().flatten().for_each(|item| - // error!("version={} decision={:?} state={:?} action_state={:#?}", item.item_ver, item.item.decision, item.item.get_state(), item.item.commit_actions.iter().map(|x| (x.1.count, x.1.is_completed)).collect::>()) - // ); - // info!("Suffix dump ={:?}") - // info!("State on completion ={:?}", item_state); + self.handle_item_actioned_success(version, &key, total_count).await; }, } // Process the next items with commit actions diff --git a/packages/talos_messenger_core/src/suffix.rs b/packages/talos_messenger_core/src/suffix.rs index 615872b5..bc18633d 100644 --- a/packages/talos_messenger_core/src/suffix.rs +++ b/packages/talos_messenger_core/src/suffix.rs @@ -1,12 +1,12 @@ use ahash::{HashMap, HashMapExt}; -use log::{error, warn}; +use log::{debug, warn}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::fmt::Debug; use talos_certifier::model::{CandidateMessage, Decision, DecisionMessageTrait}; use talos_suffix::{core::SuffixMeta, Suffix, SuffixItem, SuffixTrait}; -pub trait MessengerSuffixItemTrait { +pub trait MessengerSuffixItemTrait: Debug + Clone { fn set_state(&mut self, state: SuffixItemState); fn set_safepoint(&mut self, safepoint: Option); fn set_commit_action(&mut self, commit_actions: HashMap); @@ -22,23 +22,33 @@ pub trait MessengerSuffixItemTrait { pub trait MessengerSuffixTrait: SuffixTrait { // Setters + /// Sets the state of an item by version. fn set_item_state(&mut self, version: u64, process_state: SuffixItemState); // Getters + /// Get suffix meta + fn get_meta(&self) -> &SuffixMeta; + /// Get suffix item as mutable reference. fn get_mut(&mut self, version: u64) -> Option<&mut SuffixItem>; - fn get_item_state(&self, version: u64) -> Option; - fn get_last_installed(&self, to_version: Option) -> Option<&SuffixItem>; - // fn update_suffix_item_decision(&mut self, version: u64, decision_ver: u64) -> SuffixResult<()>; - fn get_suffix_meta(&self) -> &SuffixMeta; - fn installed_all_prior_decided_items(&self, version: u64) -> bool; + /// Checks if suffix ready to prune + /// + // fn is_safe_prune() -> bool; + /// Get the state of an item by version. + fn get_item_state(&self, version: u64) -> Option; + /// Gets the suffix items eligible to process. fn get_suffix_items_to_process(&self) -> Vec; - // updates - fn update_prune_index(&mut self, version: u64); + /// Updates the decision for a version. fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D); - fn update_action(&mut self, version: u64, action_key: &str, total_count: u32); + /// Updates the action for a version using the action_key for lookup. + fn update_item_action(&mut self, version: u64, action_key: &str, total_count: u32); + + /// Checks if all versions prioir to this version are already completed, and updates the prune index. + /// If the prune index was updated, returns the new prune_index, else returns None. + fn update_prune_index_from_version(&mut self, version: u64) -> Option; - fn all_actions_completed(&self, version: u64) -> bool; + /// Checks if all commit actions are completed for the version + fn are_all_item_actions_completed(&self, version: u64) -> bool; } #[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)] @@ -178,7 +188,7 @@ impl MessengerSuffixItemTrait for MessengerCandidate { impl MessengerSuffixTrait for Suffix where - T: MessengerSuffixItemTrait + Debug + Clone, + T: MessengerSuffixItemTrait, { // TODO: GK - Elevate this to core suffix fn get_mut(&mut self, version: u64) -> Option<&mut SuffixItem> { @@ -200,19 +210,11 @@ where } } - fn get_last_installed(&self, _to_version: Option) -> Option<&SuffixItem> { - todo!() - } - // TODO: GK - Elevate this to core suffix - fn get_suffix_meta(&self) -> &SuffixMeta { + fn get_meta(&self) -> &SuffixMeta { &self.meta } - fn installed_all_prior_decided_items(&self, _version: u64) -> bool { - todo!() - } - fn get_suffix_items_to_process(&self) -> Vec { let items = self .messages @@ -224,14 +226,12 @@ where // Take while contiguous ones, whose safepoint is already processed. .take_while(|&x| { let Some(safepoint) = x.item.get_safepoint() else { - error!("take while early exit for version {:?}", x.item_ver); return false; }; match self.get(*safepoint) { // If we find the suffix item from the safepoint, we need to ensure that it already in `Complete` state Ok(Some(safepoint_item)) => { - error!("State of safepoint items is {:?}", safepoint_item.item.get_state()); matches!(safepoint_item.item.get_state(), SuffixItemState::Complete(..)) } // If we couldn't find the item in suffix, it could be because it was pruned and it is safe to assume that we can consider it. @@ -247,10 +247,6 @@ where items } - fn update_prune_index(&mut self, _version: u64) { - todo!() - } - fn update_item_decision(&mut self, version: u64, decision_version: u64, decision_message: &D) { let _ = self.update_decision_suffix_item(version, decision_version); @@ -269,7 +265,7 @@ where } } - fn update_action(&mut self, version: u64, action_key: &str, total_count: u32) { + fn update_item_action(&mut self, version: u64, action_key: &str, total_count: u32) { if let Some(item_to_update) = self.get_mut(version) { if let Some(action) = item_to_update.item.get_action_by_key_mut(action_key) { action.update_count(); @@ -285,7 +281,39 @@ where } } - fn all_actions_completed(&self, version: u64) -> bool { + fn update_prune_index_from_version(&mut self, version: u64) -> Option { + let current_prune_index = self.get_meta().prune_index; + + let start_index = current_prune_index.unwrap_or(0); + + let end_index = match self.index_from_head(version) { + Some(index) if index > start_index => index, + _ => self.suffix_length() - 1, + }; + + debug!( + "[Update prune index] Calculating prune index in suffix slice between index {start_index} <-> {end_index}. Current prune index version {current_prune_index:?}.", + ); + + // 1. Get the last contiguous item that is completed. + let safe_prune_version = self + .messages + .range(start_index..=end_index) + .flatten() + .take_while(|item| matches!(item.item.get_state(), SuffixItemState::Complete(..))) + .last()? + .item_ver; + + // 2. Update the prune index. + let index = self.index_from_head(safe_prune_version)?; + + self.update_prune_index(index.into()); + debug!("[Update prune index] Prune version updated to {index} (version={safe_prune_version}"); + + Some(index) + } + + fn are_all_item_actions_completed(&self, version: u64) -> bool { if let Ok(Some(item)) = self.get(version) { item.item.get_commit_actions().iter().all(|(_, x)| x.is_completed()) } else { diff --git a/packages/talos_suffix/src/suffix.rs b/packages/talos_suffix/src/suffix.rs index 8ccfe331..38e62887 100644 --- a/packages/talos_suffix/src/suffix.rs +++ b/packages/talos_suffix/src/suffix.rs @@ -129,7 +129,7 @@ where .collect() } - pub fn find_prune_till_index(&mut self, prune_till_index: usize) -> usize { + pub fn find_prune_till_index(&self, prune_till_index: usize) -> usize { let prune_till_index = self .messages .range(..prune_till_index + 1) @@ -141,7 +141,7 @@ where prune_till_index } - pub fn get_safe_prune_index(&mut self) -> Option { + pub fn get_safe_prune_index(&self) -> Option { // If `prune_start_threshold=None` don't prune. let Some(prune_threshold) = self.meta.prune_start_threshold else { debug!("[SUFFIX PRUNE CHECK] As suffix.meta.prune_start_threshold is None, pruning is disabled.");