Skip to content

Commit

Permalink
feat: send abort and commits with empty statemaps for installation (#77)
Browse files Browse the repository at this point in the history
* feat: send aborts and empty statemap candidates to installation

sending aborts and commited messages with empty statemaps is needed to update the snapshot

* fix: final changes to send aborts for installation
  • Loading branch information
gk-kindred authored Sep 13, 2023
1 parent 4ef3f83 commit 29cee52
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 121 deletions.
29 changes: 11 additions & 18 deletions packages/talos_cohort_replicator/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, Chann

use super::{
suffix::{ReplicatorSuffixItemTrait, ReplicatorSuffixTrait},
utils::{get_filtered_batch, get_statemap_from_suffix_items},
utils::get_statemap_from_suffix_items,
};

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -124,25 +124,18 @@ where
}
}

pub(crate) fn generate_statemap_batch(&mut self) -> (Vec<u64>, Vec<(u64, Vec<StatemapItem>)>) {
pub(crate) fn generate_statemap_batch(&mut self) -> Vec<(u64, Vec<StatemapItem>)> {
// get batch of items from suffix to install.
let items_option = self.suffix.get_message_batch_from_version(self.last_installing, None);
let items = self.suffix.get_message_batch_from_version(self.last_installing, None);

let mut statemaps_batch = vec![];

let mut current_batch_versions: Vec<u64> = vec![];

if let Some(items) = items_option {
current_batch_versions = items.iter().map(|i| i.item_ver).collect();
let filtered_message_batch = get_filtered_batch(items.iter().copied());
// generate the statemap from each item in batch.
statemaps_batch = get_statemap_from_suffix_items(filtered_message_batch);

if let Some(last_item) = items.last() {
self.last_installing = last_item.item_ver;
}
}
(current_batch_versions, statemaps_batch)
let Some(last_item) = items.last() else {
// We don't have to explicitly check the vec is empty since if items.last() returns `None`
// we implicitly know the vec is empty.
return vec![];
};
self.last_installing = last_item.item_ver;
// generate the statemap from each item in batch.
get_statemap_from_suffix_items(items.into_iter())
}

pub(crate) async fn commit_till_last_installed(&mut self) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct ReplicatorServiceConfig {
}

pub async fn replicator_service<S, M>(
statemaps_tx: mpsc::Sender<Vec<StatemapItem>>,
statemaps_tx: mpsc::Sender<(u64, Vec<StatemapItem>)>,
mut replicator_rx: mpsc::Receiver<ReplicatorChannel>,
mut replicator: Replicator<ReplicatorCandidate, S, M>,
config: ReplicatorServiceConfig,
Expand All @@ -31,7 +31,6 @@ where
let mut interval = tokio::time::interval(Duration::from_millis(config.commit_frequency_ms));

let mut total_items_send = 0;
let mut total_items_processed = 0;
let mut total_items_installed = 0;
let mut time_first_item_created_start_ns: i128 = 0; //
let mut time_last_item_send_end_ns: i128 = 0;
Expand All @@ -54,32 +53,19 @@ where
ChannelMessage::Decision(decision_version, decision_message) => {
replicator.process_decision_message(decision_version, decision_message).await;

if total_items_processed == 0 {
if total_items_send == 0 {
time_first_item_created_start_ns = OffsetDateTime::now_utc().unix_timestamp_nanos();
}
// Get a batch of remaining versions with their statemaps to install.
let (all_versions_picked, statemaps_batch) = replicator.generate_statemap_batch();
let statemaps_batch = replicator.generate_statemap_batch();

total_items_send += statemaps_batch.len();
total_items_processed += all_versions_picked.len();

let statemap_batch_cloned = statemaps_batch.clone();
let versions_not_sent = all_versions_picked.into_iter().filter(|&v| {
!statemap_batch_cloned.iter().any(|(ver, _)| ver != &v)
});


// Send statemaps batch to
for (_, statemap_vec) in statemaps_batch {
statemaps_tx.send(statemap_vec).await.unwrap();
for (ver, statemap_vec) in statemaps_batch {
statemaps_tx.send((ver,statemap_vec)).await.unwrap();
}

// These versions are decided but they are not send to Statemap installer as they are either aborted or don't have statemap
versions_not_sent.for_each(|version| {
replicator.suffix.set_item_installed(version);

});

time_last_item_send_end_ns = OffsetDateTime::now_utc().unix_timestamp_nanos();

},
Expand All @@ -91,7 +77,6 @@ where
if config.enable_stats {
let duration_sec = Duration::from_nanos((time_last_item_send_end_ns - time_first_item_created_start_ns) as u64).as_secs_f32();
let tps_send = total_items_send as f32 / duration_sec;
let tps_processed = total_items_processed as f32 / duration_sec;


let duration_installed_sec = Duration::from_nanos((time_last_item_installed_ns - time_first_item_created_start_ns) as u64).as_secs_f32();
Expand All @@ -100,7 +85,6 @@ where

error!("
Replicator Stats:
processed : tps={tps_processed:.3} | count={total_items_processed}
send for install : tps={tps_send:.3} | count={total_items_send}
installed : tps={tps_install:.3} | count={total_items_installed}
\n ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Default for StatemapQueueServiceConfig {
}

pub async fn statemap_queue_service<S>(
mut statemaps_rx: mpsc::Receiver<Vec<StatemapItem>>,
mut statemaps_rx: mpsc::Receiver<(u64, Vec<StatemapItem>)>,
mut statemap_installation_rx: mpsc::Receiver<StatemapInstallationStatus>,
installation_tx: mpsc::Sender<(u64, Vec<StatemapItem>)>,
snapshot_api: S,
Expand Down Expand Up @@ -56,9 +56,8 @@ where
tokio::select! {
statemap_batch_option = statemaps_rx.recv() => {

if let Some(statemaps) = statemap_batch_option {
if let Some((ver, statemaps)) = statemap_batch_option {

let ver = statemaps.first().unwrap().version;
// Inserts the statemaps to the map

let safepoint = if let Some(first_statemap) = statemaps.first() {
Expand Down
26 changes: 13 additions & 13 deletions packages/talos_cohort_replicator/src/suffix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub trait ReplicatorSuffixTrait<T: ReplicatorSuffixItemTrait>: SuffixTrait<T> {
fn update_prune_index(&mut self, version: u64);
/// Returns the items from suffix
fn get_suffix_meta(&self) -> &SuffixMeta;
fn get_message_batch_from_version(&self, from: u64, count: Option<u64>) -> Option<Vec<&SuffixItem<T>>>;
fn get_message_batch_from_version(&self, from: u64, count: Option<u64>) -> Vec<&SuffixItem<T>>;
fn installed_all_prior_decided_items(&self, version: u64) -> bool;
}

Expand Down Expand Up @@ -86,28 +86,28 @@ where
}
}

fn get_message_batch_from_version(&self, from: u64, count: Option<u64>) -> Option<Vec<&SuffixItem<T>>> {
fn get_message_batch_from_version(&self, from: u64, count: Option<u64>) -> Vec<&SuffixItem<T>> {
// let mut batch = vec![];
let batch_size = match count {
Some(c) => c as usize,
None => self.messages.len(),
};

let from_index = if from != 0 { self.index_from_head(from).unwrap() + 1 } else { 0 };
let from_index = if from > 0 {
if let Some(index) = self.index_from_head(from) {
index + 1
} else {
0
}
} else {
0
};

let items = get_nonempty_suffix_items(self.messages.range(from_index..)) // take only some items in suffix
get_nonempty_suffix_items(self.messages.range(from_index..)) // take only some items in suffix
.take_while(|m| m.is_decided) // take items till we find a not decided item.
.filter(|m| !m.item.is_installed()) // remove already installed items.
.take(batch_size)
.collect::<Vec<&SuffixItem<T>>>();
// let items_picked_in_suffix_batch = items.iter().map(|&item| item.item_ver).collect::<Vec<u64>>();

// error!("Items picked in this batch from_version={from} as index={from_index} \n versions={items_picked_in_suffix_batch:?}");
if !items.is_empty() {
Some(items)
} else {
None
}
.collect::<Vec<&SuffixItem<T>>>()
}

fn update_suffix_item_decision(&mut self, version: u64, decision_ver: u64) -> SuffixResult<()> {
Expand Down
24 changes: 12 additions & 12 deletions packages/talos_cohort_replicator/src/tests/suffix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn test_replicator_suffix() {
suffix.insert(8, TestReplicatorSuffixItem::default()).unwrap();

// Message batch is empty as the decision is not added.
assert_eq!(suffix.get_message_batch_from_version(5, Some(5)), None);
assert!(suffix.get_message_batch_from_version(5, Some(5)).is_empty());

// Nothing happens for version 50 updates as the item doesn't exist.
suffix.set_safepoint(50, Some(2));
Expand All @@ -97,32 +97,32 @@ fn test_replicator_suffix() {
assert_eq!(item_at_version3.item.decision.unwrap(), CandidateDecisionOutcome::Committed);
assert!(!item_at_version3.item.is_installed);
// Message batch will be one as only version 3's decision is recorded..
assert_eq!(suffix.get_message_batch_from_version(0, None).unwrap().len(), 1);
assert_eq!(suffix.get_message_batch_from_version(0, None).len(), 1);

suffix.update_decision(4, 12).unwrap();
// Message batch will still be 1 as there was no version 1 inserted.
// So the decision will be discarded
assert_eq!(suffix.get_message_batch_from_version(0, Some(4)).unwrap().len(), 1);
assert_eq!(suffix.get_message_batch_from_version(0, Some(4)).len(), 1);

suffix.update_decision(5, 19).unwrap();
// Message batch will be 2 as safepoint is not set a decision is made, therefore version 3 and 4 are picked.
// version 3 is considered as commited as the safepoint and decision_outcome is set.
// version 4 is considered as aborted at this point as safepoint is not set.
assert_eq!(suffix.get_message_batch_from_version(0, Some(10)).unwrap().len(), 2);
assert_eq!(suffix.get_message_batch_from_version(3, Some(10)).unwrap().len(), 1);
assert_eq!(suffix.get_message_batch_from_version(0, Some(10)).len(), 2);
assert_eq!(suffix.get_message_batch_from_version(3, Some(10)).len(), 1);

//add safepoint and decision for version 8
suffix.update_decision(8, 19).unwrap();
suffix.set_safepoint(8, Some(2));
suffix.set_decision_outcome(8, Some(CandidateDecisionOutcome::Committed));
// Message batch will be 3, as version 3,5, and 8 are not installed.
assert_eq!(suffix.get_message_batch_from_version(0, Some(10)).unwrap().len(), 3);
assert_eq!(suffix.get_message_batch_from_version(0, Some(10)).len(), 3);

//add safepoint and decision for version 5
suffix.set_safepoint(5, Some(2));
suffix.set_decision_outcome(5, Some(CandidateDecisionOutcome::Committed));
// Message batch will be 3 as version 3, 4 and 5 has Some safepoint value
assert_eq!(suffix.get_message_batch_from_version(0, Some(10)).unwrap().len(), 3);
assert_eq!(suffix.get_message_batch_from_version(0, Some(10)).len(), 3);
}

#[test]
Expand All @@ -145,10 +145,10 @@ fn test_replicator_suffix_installed() {
suffix.set_decision_outcome(3, Some(CandidateDecisionOutcome::Committed));

// Batch returns one item as only version 3 is decided, others haven't got the decisions yet.
assert_eq!(suffix.get_message_batch_from_version(0, Some(1)).unwrap().len(), 1);
assert_eq!(suffix.get_message_batch_from_version(0, Some(1)).len(), 1);
suffix.set_item_installed(3);
// Batch returns 0 items as version 3 is already installed, others haven't got the decisions yet.
assert!(suffix.get_message_batch_from_version(0, Some(1)).is_none());
assert!(suffix.get_message_batch_from_version(0, Some(1)).is_empty());

let suffix_item_3 = suffix.get(3).unwrap().unwrap();
// confirm version 3 is marked as installed.
Expand All @@ -159,14 +159,14 @@ fn test_replicator_suffix_installed() {
suffix.set_safepoint(9, Some(2));
suffix.set_decision_outcome(9, Some(CandidateDecisionOutcome::Committed));
// Batch returns 0, because there is a version in between which is not decided.
assert!(suffix.get_message_batch_from_version(0, Some(1)).is_none());
assert!(suffix.get_message_batch_from_version(0, Some(1)).is_empty());

// update decision for version 6
suffix.update_suffix_item_decision(6, 23).unwrap();
suffix.set_safepoint(6, None);
suffix.set_decision_outcome(6, Some(CandidateDecisionOutcome::Aborted));
// Batch returns 2 items (version 6 & 9).
let batch = suffix.get_message_batch_from_version(0, None).unwrap();
let batch = suffix.get_message_batch_from_version(0, None);
assert_eq!(batch.len(), 2);

// Confirm the batch returned the correct item.
Expand All @@ -175,7 +175,7 @@ fn test_replicator_suffix_installed() {
// Mark version 9 as installed.
suffix.set_item_installed(9);
// Although version 9 is installed, version 6 is not, therefore it is picked up here.
assert_eq!(suffix.get_message_batch_from_version(3, Some(1)).unwrap().len(), 1);
assert_eq!(suffix.get_message_batch_from_version(3, Some(1)).len(), 1);

assert_eq!(suffix.get_suffix_meta().head, 3);
}
31 changes: 0 additions & 31 deletions packages/talos_cohort_replicator/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,6 @@ fn test_get_filtered_batch_stop_on_undecided() {
assert_eq!(result.count(), 1);
}

#[test]
fn test_get_filtered_batch_remove_items_no_safepoint() {
//Test data
let item1 = build_test_suffix_item(10, Some(11), BankStatemapTestCandidate::create_with_statemap(1).set_safepoint(Some(1)));
let item2 = build_test_suffix_item(12, Some(15), BankStatemapTestCandidate::create_with_statemap(1)); // This item should be removed as safepoint is None
let item3 = build_test_suffix_item(13, Some(14), BankStatemapTestCandidate::create_with_statemap(1).set_safepoint(Some(2)));
let suffix_item = vec![&item1, &item2, &item3];

let mut result = get_filtered_batch(suffix_item.into_iter());

assert_eq!(result.next().unwrap().item_ver, 10);
assert_eq!(result.next().unwrap().item_ver, 13);
assert!(result.next().is_none());
}

#[test]
fn test_get_filtered_batch_remove_items_no_statemap() {
//Test data

// item1 doesn't have statemap, and therefore shouldn't be in the result
let item1 = build_test_suffix_item(10, Some(11), BankStatemapTestCandidate::default().set_safepoint(Some(1)));
let item2 = build_test_suffix_item(12, Some(15), BankStatemapTestCandidate::create_with_statemap(2).set_safepoint(Some(1)));
let item3 = build_test_suffix_item(13, Some(14), BankStatemapTestCandidate::create_with_statemap(3).set_safepoint(Some(2)));
let item4 = build_test_suffix_item(16, Some(18), BankStatemapTestCandidate::create_with_statemap(1).set_safepoint(Some(2)));
let suffix_item = vec![&item1, &item2, &item3, &item4];

let mut result = get_filtered_batch(suffix_item.into_iter());

assert_eq!(result.next().unwrap().item_ver, 12);
assert_eq!(result.last().unwrap().item_ver, 16);
}
#[test]
fn test_get_all_statemap_from_suffix_items() {
//Test data
Expand Down
57 changes: 34 additions & 23 deletions packages/talos_cohort_replicator/src/utils/replicator_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,49 @@ use talos_suffix::SuffixItem;

use crate::{core::StatemapItem, suffix::ReplicatorSuffixItemTrait};

/// Get all contiguous items decided irrespective of committed or aborted.
pub fn get_filtered_batch<'a, T: ReplicatorSuffixItemTrait + 'a>(messages: impl Iterator<Item = &'a SuffixItem<T>>) -> impl Iterator<Item = &'a SuffixItem<T>> {
messages
.into_iter()
.take_while(|&m| m.is_decided)
// select only the messages that have safepoint i.e committed messages and select only the messages that have statemap.
.filter(|&m| m.item.get_safepoint().is_some() && m.item.get_statemap().is_some())
messages.into_iter().take_while(|&m| m.is_decided)
// select only the messages that have safepoint i.e committed messages and select only the messages that have statemap.
// .filter(|&m| m.item.get_safepoint().is_some() && m.item.get_statemap().is_some())
// .filter(|&m| m.item.get_statemap().is_some()) // select only the messages that have statemap.
}

/// Takes an iterator of suffix items and returns a vector of versions and statemaps
pub fn get_statemap_from_suffix_items<'a, T: ReplicatorSuffixItemTrait + 'a>(
messages: impl Iterator<Item = &'a SuffixItem<T>>,
) -> Vec<(u64, Vec<StatemapItem>)> {
messages.into_iter().fold(vec![], |mut acc, m| match m.item.get_statemap().as_ref() {
Some(sm_items) => {
let state_maps_to_append = sm_items.iter().map(|sm| {
let key = sm.keys().next().unwrap().to_string();
let payload = sm.get(&key).unwrap().clone();

StatemapItem {
action: key,
payload,
version: m.item_ver,
safepoint: *m.item.get_safepoint(),
}
});
acc.push((m.item_ver, state_maps_to_append.collect::<Vec<StatemapItem>>()));
acc
}
None => {
messages.into_iter().fold(vec![], |mut acc, m| {
// aborts
if m.item.get_safepoint().is_none() {
acc.push((m.item_ver, vec![]));
acc
return acc;
}

// commits
match m.item.get_statemap().as_ref() {
// when there is statemap
Some(sm_items) => {
let state_maps_to_append = sm_items.iter().map(|sm| {
let key = sm.keys().next().unwrap().to_string();
let payload = sm.get(&key).unwrap().clone();

StatemapItem {
action: key,
payload,
version: m.item_ver,
safepoint: *m.item.get_safepoint(),
}
});
acc.push((m.item_ver, state_maps_to_append.collect::<Vec<StatemapItem>>()));
acc
}
// when there is no statemap
None => {
// Empty statemap items are send for installs anyways to update the snapshot
acc.push((m.item_ver, vec![]));
acc
}
}
})
}

0 comments on commit 29cee52

Please sign in to comment.