diff --git a/packages/talos_cohort_replicator/src/core.rs b/packages/talos_cohort_replicator/src/core.rs index 7b301394..6ce8b771 100644 --- a/packages/talos_cohort_replicator/src/core.rs +++ b/packages/talos_cohort_replicator/src/core.rs @@ -7,7 +7,7 @@ use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, Chann use super::{ suffix::{ReplicatorSuffixItemTrait, ReplicatorSuffixTrait}, - utils::{get_filtered_batch, get_statemap_from_suffix_items}, + utils::get_statemap_from_suffix_items, }; #[derive(Debug, Clone, PartialEq)] @@ -124,25 +124,18 @@ where } } - pub(crate) fn generate_statemap_batch(&mut self) -> (Vec, Vec<(u64, Vec)>) { + pub(crate) fn generate_statemap_batch(&mut self) -> Vec<(u64, Vec)> { // get batch of items from suffix to install. - let items_option = self.suffix.get_message_batch_from_version(self.last_installing, None); + let items = self.suffix.get_message_batch_from_version(self.last_installing, None); - let mut statemaps_batch = vec![]; - - let mut current_batch_versions: Vec = vec![]; - - if let Some(items) = items_option { - current_batch_versions = items.iter().map(|i| i.item_ver).collect(); - let filtered_message_batch = get_filtered_batch(items.iter().copied()); - // generate the statemap from each item in batch. - statemaps_batch = get_statemap_from_suffix_items(filtered_message_batch); - - if let Some(last_item) = items.last() { - self.last_installing = last_item.item_ver; - } - } - (current_batch_versions, statemaps_batch) + let Some(last_item) = items.last() else { + // We don't have to explicitly check the vec is empty since if items.last() returns `None` + // we implicitly know the vec is empty. + return vec![]; + }; + self.last_installing = last_item.item_ver; + // generate the statemap from each item in batch. + get_statemap_from_suffix_items(items.into_iter()) } pub(crate) async fn commit_till_last_installed(&mut self) { diff --git a/packages/talos_cohort_replicator/src/services/replicator_service.rs b/packages/talos_cohort_replicator/src/services/replicator_service.rs index 91b2fa3a..b7e4d261 100644 --- a/packages/talos_cohort_replicator/src/services/replicator_service.rs +++ b/packages/talos_cohort_replicator/src/services/replicator_service.rs @@ -18,7 +18,7 @@ pub struct ReplicatorServiceConfig { } pub async fn replicator_service( - statemaps_tx: mpsc::Sender>, + statemaps_tx: mpsc::Sender<(u64, Vec)>, mut replicator_rx: mpsc::Receiver, mut replicator: Replicator, config: ReplicatorServiceConfig, @@ -31,7 +31,6 @@ where let mut interval = tokio::time::interval(Duration::from_millis(config.commit_frequency_ms)); let mut total_items_send = 0; - let mut total_items_processed = 0; let mut total_items_installed = 0; let mut time_first_item_created_start_ns: i128 = 0; // let mut time_last_item_send_end_ns: i128 = 0; @@ -54,32 +53,19 @@ where ChannelMessage::Decision(decision_version, decision_message) => { replicator.process_decision_message(decision_version, decision_message).await; - if total_items_processed == 0 { + if total_items_send == 0 { time_first_item_created_start_ns = OffsetDateTime::now_utc().unix_timestamp_nanos(); } // Get a batch of remaining versions with their statemaps to install. - let (all_versions_picked, statemaps_batch) = replicator.generate_statemap_batch(); + let statemaps_batch = replicator.generate_statemap_batch(); total_items_send += statemaps_batch.len(); - total_items_processed += all_versions_picked.len(); - - let statemap_batch_cloned = statemaps_batch.clone(); - let versions_not_sent = all_versions_picked.into_iter().filter(|&v| { - !statemap_batch_cloned.iter().any(|(ver, _)| ver != &v) - }); - // Send statemaps batch to - for (_, statemap_vec) in statemaps_batch { - statemaps_tx.send(statemap_vec).await.unwrap(); + for (ver, statemap_vec) in statemaps_batch { + statemaps_tx.send((ver,statemap_vec)).await.unwrap(); } - // These versions are decided but they are not send to Statemap installer as they are either aborted or don't have statemap - versions_not_sent.for_each(|version| { - replicator.suffix.set_item_installed(version); - - }); - time_last_item_send_end_ns = OffsetDateTime::now_utc().unix_timestamp_nanos(); }, @@ -91,7 +77,6 @@ where if config.enable_stats { let duration_sec = Duration::from_nanos((time_last_item_send_end_ns - time_first_item_created_start_ns) as u64).as_secs_f32(); let tps_send = total_items_send as f32 / duration_sec; - let tps_processed = total_items_processed as f32 / duration_sec; let duration_installed_sec = Duration::from_nanos((time_last_item_installed_ns - time_first_item_created_start_ns) as u64).as_secs_f32(); @@ -100,7 +85,6 @@ where error!(" Replicator Stats: - processed : tps={tps_processed:.3} | count={total_items_processed} send for install : tps={tps_send:.3} | count={total_items_send} installed : tps={tps_install:.3} | count={total_items_installed} \n "); diff --git a/packages/talos_cohort_replicator/src/services/statemap_queue_service.rs b/packages/talos_cohort_replicator/src/services/statemap_queue_service.rs index 751be7b2..a250e9b6 100644 --- a/packages/talos_cohort_replicator/src/services/statemap_queue_service.rs +++ b/packages/talos_cohort_replicator/src/services/statemap_queue_service.rs @@ -28,7 +28,7 @@ impl Default for StatemapQueueServiceConfig { } pub async fn statemap_queue_service( - mut statemaps_rx: mpsc::Receiver>, + mut statemaps_rx: mpsc::Receiver<(u64, Vec)>, mut statemap_installation_rx: mpsc::Receiver, installation_tx: mpsc::Sender<(u64, Vec)>, snapshot_api: S, @@ -56,9 +56,8 @@ where tokio::select! { statemap_batch_option = statemaps_rx.recv() => { - if let Some(statemaps) = statemap_batch_option { + if let Some((ver, statemaps)) = statemap_batch_option { - let ver = statemaps.first().unwrap().version; // Inserts the statemaps to the map let safepoint = if let Some(first_statemap) = statemaps.first() { diff --git a/packages/talos_cohort_replicator/src/suffix.rs b/packages/talos_cohort_replicator/src/suffix.rs index 81523ef1..b17842cc 100644 --- a/packages/talos_cohort_replicator/src/suffix.rs +++ b/packages/talos_cohort_replicator/src/suffix.rs @@ -27,7 +27,7 @@ pub trait ReplicatorSuffixTrait: SuffixTrait { fn update_prune_index(&mut self, version: u64); /// Returns the items from suffix fn get_suffix_meta(&self) -> &SuffixMeta; - fn get_message_batch_from_version(&self, from: u64, count: Option) -> Option>>; + fn get_message_batch_from_version(&self, from: u64, count: Option) -> Vec<&SuffixItem>; fn installed_all_prior_decided_items(&self, version: u64) -> bool; } @@ -86,28 +86,28 @@ where } } - fn get_message_batch_from_version(&self, from: u64, count: Option) -> Option>> { + fn get_message_batch_from_version(&self, from: u64, count: Option) -> Vec<&SuffixItem> { // let mut batch = vec![]; let batch_size = match count { Some(c) => c as usize, None => self.messages.len(), }; - let from_index = if from != 0 { self.index_from_head(from).unwrap() + 1 } else { 0 }; + let from_index = if from > 0 { + if let Some(index) = self.index_from_head(from) { + index + 1 + } else { + 0 + } + } else { + 0 + }; - let items = get_nonempty_suffix_items(self.messages.range(from_index..)) // take only some items in suffix + get_nonempty_suffix_items(self.messages.range(from_index..)) // take only some items in suffix .take_while(|m| m.is_decided) // take items till we find a not decided item. .filter(|m| !m.item.is_installed()) // remove already installed items. .take(batch_size) - .collect::>>(); - // let items_picked_in_suffix_batch = items.iter().map(|&item| item.item_ver).collect::>(); - - // error!("Items picked in this batch from_version={from} as index={from_index} \n versions={items_picked_in_suffix_batch:?}"); - if !items.is_empty() { - Some(items) - } else { - None - } + .collect::>>() } fn update_suffix_item_decision(&mut self, version: u64, decision_ver: u64) -> SuffixResult<()> { diff --git a/packages/talos_cohort_replicator/src/tests/suffix.rs b/packages/talos_cohort_replicator/src/tests/suffix.rs index 79e88c32..dae239e8 100644 --- a/packages/talos_cohort_replicator/src/tests/suffix.rs +++ b/packages/talos_cohort_replicator/src/tests/suffix.rs @@ -81,7 +81,7 @@ fn test_replicator_suffix() { suffix.insert(8, TestReplicatorSuffixItem::default()).unwrap(); // Message batch is empty as the decision is not added. - assert_eq!(suffix.get_message_batch_from_version(5, Some(5)), None); + assert!(suffix.get_message_batch_from_version(5, Some(5)).is_empty()); // Nothing happens for version 50 updates as the item doesn't exist. suffix.set_safepoint(50, Some(2)); @@ -97,32 +97,32 @@ fn test_replicator_suffix() { assert_eq!(item_at_version3.item.decision.unwrap(), CandidateDecisionOutcome::Committed); assert!(!item_at_version3.item.is_installed); // Message batch will be one as only version 3's decision is recorded.. - assert_eq!(suffix.get_message_batch_from_version(0, None).unwrap().len(), 1); + assert_eq!(suffix.get_message_batch_from_version(0, None).len(), 1); suffix.update_decision(4, 12).unwrap(); // Message batch will still be 1 as there was no version 1 inserted. // So the decision will be discarded - assert_eq!(suffix.get_message_batch_from_version(0, Some(4)).unwrap().len(), 1); + assert_eq!(suffix.get_message_batch_from_version(0, Some(4)).len(), 1); suffix.update_decision(5, 19).unwrap(); // Message batch will be 2 as safepoint is not set a decision is made, therefore version 3 and 4 are picked. // version 3 is considered as commited as the safepoint and decision_outcome is set. // version 4 is considered as aborted at this point as safepoint is not set. - assert_eq!(suffix.get_message_batch_from_version(0, Some(10)).unwrap().len(), 2); - assert_eq!(suffix.get_message_batch_from_version(3, Some(10)).unwrap().len(), 1); + assert_eq!(suffix.get_message_batch_from_version(0, Some(10)).len(), 2); + assert_eq!(suffix.get_message_batch_from_version(3, Some(10)).len(), 1); //add safepoint and decision for version 8 suffix.update_decision(8, 19).unwrap(); suffix.set_safepoint(8, Some(2)); suffix.set_decision_outcome(8, Some(CandidateDecisionOutcome::Committed)); // Message batch will be 3, as version 3,5, and 8 are not installed. - assert_eq!(suffix.get_message_batch_from_version(0, Some(10)).unwrap().len(), 3); + assert_eq!(suffix.get_message_batch_from_version(0, Some(10)).len(), 3); //add safepoint and decision for version 5 suffix.set_safepoint(5, Some(2)); suffix.set_decision_outcome(5, Some(CandidateDecisionOutcome::Committed)); // Message batch will be 3 as version 3, 4 and 5 has Some safepoint value - assert_eq!(suffix.get_message_batch_from_version(0, Some(10)).unwrap().len(), 3); + assert_eq!(suffix.get_message_batch_from_version(0, Some(10)).len(), 3); } #[test] @@ -145,10 +145,10 @@ fn test_replicator_suffix_installed() { suffix.set_decision_outcome(3, Some(CandidateDecisionOutcome::Committed)); // Batch returns one item as only version 3 is decided, others haven't got the decisions yet. - assert_eq!(suffix.get_message_batch_from_version(0, Some(1)).unwrap().len(), 1); + assert_eq!(suffix.get_message_batch_from_version(0, Some(1)).len(), 1); suffix.set_item_installed(3); // Batch returns 0 items as version 3 is already installed, others haven't got the decisions yet. - assert!(suffix.get_message_batch_from_version(0, Some(1)).is_none()); + assert!(suffix.get_message_batch_from_version(0, Some(1)).is_empty()); let suffix_item_3 = suffix.get(3).unwrap().unwrap(); // confirm version 3 is marked as installed. @@ -159,14 +159,14 @@ fn test_replicator_suffix_installed() { suffix.set_safepoint(9, Some(2)); suffix.set_decision_outcome(9, Some(CandidateDecisionOutcome::Committed)); // Batch returns 0, because there is a version in between which is not decided. - assert!(suffix.get_message_batch_from_version(0, Some(1)).is_none()); + assert!(suffix.get_message_batch_from_version(0, Some(1)).is_empty()); // update decision for version 6 suffix.update_suffix_item_decision(6, 23).unwrap(); suffix.set_safepoint(6, None); suffix.set_decision_outcome(6, Some(CandidateDecisionOutcome::Aborted)); // Batch returns 2 items (version 6 & 9). - let batch = suffix.get_message_batch_from_version(0, None).unwrap(); + let batch = suffix.get_message_batch_from_version(0, None); assert_eq!(batch.len(), 2); // Confirm the batch returned the correct item. @@ -175,7 +175,7 @@ fn test_replicator_suffix_installed() { // Mark version 9 as installed. suffix.set_item_installed(9); // Although version 9 is installed, version 6 is not, therefore it is picked up here. - assert_eq!(suffix.get_message_batch_from_version(3, Some(1)).unwrap().len(), 1); + assert_eq!(suffix.get_message_batch_from_version(3, Some(1)).len(), 1); assert_eq!(suffix.get_suffix_meta().head, 3); } diff --git a/packages/talos_cohort_replicator/src/tests/utils.rs b/packages/talos_cohort_replicator/src/tests/utils.rs index ff505725..6e6c63ac 100644 --- a/packages/talos_cohort_replicator/src/tests/utils.rs +++ b/packages/talos_cohort_replicator/src/tests/utils.rs @@ -32,37 +32,6 @@ fn test_get_filtered_batch_stop_on_undecided() { assert_eq!(result.count(), 1); } -#[test] -fn test_get_filtered_batch_remove_items_no_safepoint() { - //Test data - let item1 = build_test_suffix_item(10, Some(11), BankStatemapTestCandidate::create_with_statemap(1).set_safepoint(Some(1))); - let item2 = build_test_suffix_item(12, Some(15), BankStatemapTestCandidate::create_with_statemap(1)); // This item should be removed as safepoint is None - let item3 = build_test_suffix_item(13, Some(14), BankStatemapTestCandidate::create_with_statemap(1).set_safepoint(Some(2))); - let suffix_item = vec![&item1, &item2, &item3]; - - let mut result = get_filtered_batch(suffix_item.into_iter()); - - assert_eq!(result.next().unwrap().item_ver, 10); - assert_eq!(result.next().unwrap().item_ver, 13); - assert!(result.next().is_none()); -} - -#[test] -fn test_get_filtered_batch_remove_items_no_statemap() { - //Test data - - // item1 doesn't have statemap, and therefore shouldn't be in the result - let item1 = build_test_suffix_item(10, Some(11), BankStatemapTestCandidate::default().set_safepoint(Some(1))); - let item2 = build_test_suffix_item(12, Some(15), BankStatemapTestCandidate::create_with_statemap(2).set_safepoint(Some(1))); - let item3 = build_test_suffix_item(13, Some(14), BankStatemapTestCandidate::create_with_statemap(3).set_safepoint(Some(2))); - let item4 = build_test_suffix_item(16, Some(18), BankStatemapTestCandidate::create_with_statemap(1).set_safepoint(Some(2))); - let suffix_item = vec![&item1, &item2, &item3, &item4]; - - let mut result = get_filtered_batch(suffix_item.into_iter()); - - assert_eq!(result.next().unwrap().item_ver, 12); - assert_eq!(result.last().unwrap().item_ver, 16); -} #[test] fn test_get_all_statemap_from_suffix_items() { //Test data diff --git a/packages/talos_cohort_replicator/src/utils/replicator_utils.rs b/packages/talos_cohort_replicator/src/utils/replicator_utils.rs index 6dd12faa..c7a15090 100644 --- a/packages/talos_cohort_replicator/src/utils/replicator_utils.rs +++ b/packages/talos_cohort_replicator/src/utils/replicator_utils.rs @@ -2,12 +2,11 @@ use talos_suffix::SuffixItem; use crate::{core::StatemapItem, suffix::ReplicatorSuffixItemTrait}; +/// Get all contiguous items decided irrespective of committed or aborted. pub fn get_filtered_batch<'a, T: ReplicatorSuffixItemTrait + 'a>(messages: impl Iterator>) -> impl Iterator> { - messages - .into_iter() - .take_while(|&m| m.is_decided) - // select only the messages that have safepoint i.e committed messages and select only the messages that have statemap. - .filter(|&m| m.item.get_safepoint().is_some() && m.item.get_statemap().is_some()) + messages.into_iter().take_while(|&m| m.is_decided) + // select only the messages that have safepoint i.e committed messages and select only the messages that have statemap. + // .filter(|&m| m.item.get_safepoint().is_some() && m.item.get_statemap().is_some()) // .filter(|&m| m.item.get_statemap().is_some()) // select only the messages that have statemap. } @@ -15,25 +14,37 @@ pub fn get_filtered_batch<'a, T: ReplicatorSuffixItemTrait + 'a>(messages: impl pub fn get_statemap_from_suffix_items<'a, T: ReplicatorSuffixItemTrait + 'a>( messages: impl Iterator>, ) -> Vec<(u64, Vec)> { - messages.into_iter().fold(vec![], |mut acc, m| match m.item.get_statemap().as_ref() { - Some(sm_items) => { - let state_maps_to_append = sm_items.iter().map(|sm| { - let key = sm.keys().next().unwrap().to_string(); - let payload = sm.get(&key).unwrap().clone(); - - StatemapItem { - action: key, - payload, - version: m.item_ver, - safepoint: *m.item.get_safepoint(), - } - }); - acc.push((m.item_ver, state_maps_to_append.collect::>())); - acc - } - None => { + messages.into_iter().fold(vec![], |mut acc, m| { + // aborts + if m.item.get_safepoint().is_none() { acc.push((m.item_ver, vec![])); - acc + return acc; + } + + // commits + match m.item.get_statemap().as_ref() { + // when there is statemap + Some(sm_items) => { + let state_maps_to_append = sm_items.iter().map(|sm| { + let key = sm.keys().next().unwrap().to_string(); + let payload = sm.get(&key).unwrap().clone(); + + StatemapItem { + action: key, + payload, + version: m.item_ver, + safepoint: *m.item.get_safepoint(), + } + }); + acc.push((m.item_ver, state_maps_to_append.collect::>())); + acc + } + // when there is no statemap + None => { + // Empty statemap items are send for installs anyways to update the snapshot + acc.push((m.item_ver, vec![])); + acc + } } }) }