Skip to content

Commit

Permalink
feat: enable deeper insights while filtering items to install (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred authored Aug 21, 2023
1 parent ea7c973 commit a447b57
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
use std::ops::ControlFlow;

use ahash::RandomState;
use indexmap::IndexMap;

use crate::core::{StatemapInstallState, StatemapInstallerHashmap};
use crate::{
core::{StatemapInstallState, StatemapInstallerHashmap},
utils::installer_utils::{is_queue_item_above_version, is_queue_item_serializable, is_queue_item_state_match},
};

#[derive(Debug, Default)]
pub struct DbgQueueFilterSummary<T> {
pub filter_enter_count: usize,
pub filter_exit_count: usize,
pub filter_reject_items: Vec<T>,
}
#[derive(Debug, Default)]
pub struct DbgQueueInstallItemsSummary<T> {
pub installable_items: Vec<T>,
pub filter_steps_insights: Vec<DbgQueueFilterSummary<T>>,
}

#[derive(Debug, Default)]
pub struct StatemapInstallerQueue {
Expand Down Expand Up @@ -36,41 +53,77 @@ impl StatemapInstallerQueue {

/// Filter items in queue based on the `StatemapInstallState`
pub fn filter_items_by_state(&self, state: StatemapInstallState) -> impl Iterator<Item = &StatemapInstallerHashmap> {
self.queue.values().filter(move |&x| x.state == state)
self.queue.values().filter(is_queue_item_state_match(state))
}

pub(crate) fn dbg_get_versions_to_install(&self) -> DbgQueueInstallItemsSummary<&StatemapInstallerHashmap> {
let mut intermediate_steps = vec![];

let items_awaiting: Vec<&StatemapInstallerHashmap> = self.queue.values().filter(is_queue_item_state_match(StatemapInstallState::Awaiting)).collect();

// Capture for debug the items entering and exiting
let filter_on_awaiting_criteria = DbgQueueFilterSummary::<&StatemapInstallerHashmap> {
filter_enter_count: self.queue.len(),
filter_exit_count: items_awaiting.len(),
filter_reject_items: vec![],
};

let vec1 = vec![];
let vec2 = vec![];

let mut closure_above_version = is_queue_item_above_version(&self.snapshot_version);
let x: ControlFlow<_, _> = items_awaiting.iter().try_fold((vec1, vec2), |mut acc, x| {
if closure_above_version(x) {
acc.0.push(*x);
ControlFlow::Continue(acc)
} else {
acc.1.push(*x);
ControlFlow::Break(acc)
}
});

let (items_safepoint_match, items_safepoint_fail) = match x {
ControlFlow::Continue(v) => v,
ControlFlow::Break(v) => v,
};

let filter_on_snapshot_criteria = DbgQueueFilterSummary::<&StatemapInstallerHashmap> {
filter_enter_count: items_awaiting.len(),
filter_exit_count: items_safepoint_match.len(),
filter_reject_items: items_safepoint_fail,
};

let (final_items, items_non_serializable): (Vec<&StatemapInstallerHashmap>, Vec<&StatemapInstallerHashmap>) =
items_safepoint_match.into_iter().partition(is_queue_item_serializable(&self.queue));

let filter_on_serialization_criteria = DbgQueueFilterSummary::<&StatemapInstallerHashmap> {
filter_enter_count: filter_on_snapshot_criteria.filter_exit_count,
filter_exit_count: final_items.len(),
filter_reject_items: items_non_serializable,
};

intermediate_steps.push(filter_on_awaiting_criteria);
intermediate_steps.push(filter_on_snapshot_criteria);
intermediate_steps.push(filter_on_serialization_criteria);

DbgQueueInstallItemsSummary {
installable_items: final_items,
filter_steps_insights: intermediate_steps,
}
}

pub fn get_versions_to_install(&self) -> Vec<u64> {
self
// Get items in awaiting
.filter_items_by_state(StatemapInstallState::Awaiting)
self.queue
.values()
// filter items in `Awaiting` state
.filter(is_queue_item_state_match(StatemapInstallState::Awaiting))
// Get items whose safepoint is below the snapshot.
.take_while(|v| {
// If no safepoint, this could be a abort item and is safe to install as statemap will be empty.
let Some(safepoint) = v.safepoint else {
return true;
};

self.snapshot_version >= safepoint
})
// filter out the ones that can't be serialized
.filter_map(|v| {
// If no safepoint, this could be a abort item and is safe to install as statemap will be empty.
let Some(safepoint) = v.safepoint else {
return Some(v.version);
};

// If there is no version matching the safepoint, then it is safe to install
let Some(safepoint_pointing_item) = self.queue.get(&safepoint) else {
return Some(v.version);
};
if safepoint_pointing_item.state == StatemapInstallState::Installed {
return Some(v.version);
};
// error!("[items_to_install] Not picking {} as safepoint={safepoint} criteria failed against={:?}", v.version, statemap_queue.get(&safepoint));

None
})
// take the remaining we can install
.take_while(is_queue_item_above_version(&self.snapshot_version))
// filter items safe to be serialized
.filter(is_queue_item_serializable(&self.queue))
// map the version
.map(|x| x.version)
// collect the iterator of versions into a vec
.collect::<Vec<u64>>()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ where
}

let items_to_install: Vec<u64> = statemap_installer_queue.get_versions_to_install();

// Sends for installation.
for key in items_to_install {
// Send for installation
Expand Down Expand Up @@ -140,6 +141,45 @@ where
current snapshot: {}
\n ", statemap_installer_queue.snapshot_version);
// last vers send to install : {last_item_send_for_install}

if config.enable_stats && awaiting_count > 0 && inflight_count == 0
{
let result = statemap_installer_queue.dbg_get_versions_to_install();
let final_items = result.installable_items;
let criteria_awaiting_state = &result.filter_steps_insights[0];
let criteria_snapshot_check = &result.filter_steps_insights[1];
let criteria_seriazable_check = &result.filter_steps_insights[2];

if let Some(first_item_to_fail_safepoint_check) = criteria_snapshot_check.filter_reject_items.first() {
error!("\n\n
+----------+-----------------------------------+----------------------------+
| Total | Items in queue | {}
+----------+-----------------------------------+----------------------------+
| Filter 1 | Items in AWAITING state | {}
| Filter 2 | Items whose safepoint <= snapshot | {}
| | | Current snapshot={}
| |-----------------------------------+----------------------------+
| | > First item to fail this check | Version={}
| | | Safepoint={:?}
| |-----------------------------------+----------------------------+
| Filter 3 | Items serializable in the batch | {}
| | > Non serializable items count | {}
+----------+-----------------------------------+------+---------------------+
| Total | Items ready to install | {}
+----------+-----------------------------------+----------------------------+
\n\n",
criteria_awaiting_state.filter_enter_count,
criteria_awaiting_state.filter_exit_count,
criteria_snapshot_check.filter_exit_count,
statemap_installer_queue.snapshot_version,
first_item_to_fail_safepoint_check.version,
first_item_to_fail_safepoint_check.safepoint,
criteria_seriazable_check.filter_exit_count,
criteria_seriazable_check.filter_reject_items.len(),
final_items.len(),
);
}
}
}

statemap_installer_queue.remove_installed();
Expand Down
32 changes: 32 additions & 0 deletions packages/talos_cohort_replicator/src/utils/installer_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use ahash::RandomState;
use indexmap::IndexMap;

use crate::core::{StatemapInstallState, StatemapInstallerHashmap};

pub fn is_queue_item_state_match(state: StatemapInstallState) -> impl FnMut(&&StatemapInstallerHashmap) -> bool {
move |x| x.state == state
}
pub fn is_queue_item_above_version<'a>(version: &'a u64) -> impl FnMut(&&'a StatemapInstallerHashmap) -> bool {
move |x| {
// If no safepoint, this could be a abort item and is safe to install as statemap will be empty.
let Some(safepoint) = x.safepoint else {
return true;
};

version >= &safepoint
}
}
pub fn is_queue_item_serializable<'a>(queue: &'a IndexMap<u64, StatemapInstallerHashmap, RandomState>) -> impl FnMut(&&'a StatemapInstallerHashmap) -> bool {
move |x| {
// If no safepoint, this could be a abort item and is safe to install as statemap will be empty.
let Some(safepoint) = x.safepoint else {
return true;
};

// If there is no version matching the safepoint, then it is safe to install
let Some(safepoint_pointing_item) = queue.get(&safepoint) else {
return true;
};
safepoint_pointing_item.state == StatemapInstallState::Installed
}
}
2 changes: 1 addition & 1 deletion packages/talos_cohort_replicator/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pub(crate) mod installer_utils;
mod replicator_utils;

pub use replicator_utils::*;

0 comments on commit a447b57

Please sign in to comment.