diff --git a/packages/talos_cohort_replicator/src/models/statemap_installer_queue.rs b/packages/talos_cohort_replicator/src/models/statemap_installer_queue.rs index b2b87f77..0277c8fa 100644 --- a/packages/talos_cohort_replicator/src/models/statemap_installer_queue.rs +++ b/packages/talos_cohort_replicator/src/models/statemap_installer_queue.rs @@ -1,7 +1,24 @@ +use std::ops::ControlFlow; + use ahash::RandomState; use indexmap::IndexMap; -use crate::core::{StatemapInstallState, StatemapInstallerHashmap}; +use crate::{ + core::{StatemapInstallState, StatemapInstallerHashmap}, + utils::installer_utils::{is_queue_item_above_version, is_queue_item_serializable, is_queue_item_state_match}, +}; + +#[derive(Debug, Default)] +pub struct DbgQueueFilterSummary { + pub filter_enter_count: usize, + pub filter_exit_count: usize, + pub filter_reject_items: Vec, +} +#[derive(Debug, Default)] +pub struct DbgQueueInstallItemsSummary { + pub installable_items: Vec, + pub filter_steps_insights: Vec>, +} #[derive(Debug, Default)] pub struct StatemapInstallerQueue { @@ -36,41 +53,77 @@ impl StatemapInstallerQueue { /// Filter items in queue based on the `StatemapInstallState` pub fn filter_items_by_state(&self, state: StatemapInstallState) -> impl Iterator { - self.queue.values().filter(move |&x| x.state == state) + self.queue.values().filter(is_queue_item_state_match(state)) + } + + pub(crate) fn dbg_get_versions_to_install(&self) -> DbgQueueInstallItemsSummary<&StatemapInstallerHashmap> { + let mut intermediate_steps = vec![]; + + let items_awaiting: Vec<&StatemapInstallerHashmap> = self.queue.values().filter(is_queue_item_state_match(StatemapInstallState::Awaiting)).collect(); + + // Capture for debug the items entering and exiting + let filter_on_awaiting_criteria = DbgQueueFilterSummary::<&StatemapInstallerHashmap> { + filter_enter_count: self.queue.len(), + filter_exit_count: items_awaiting.len(), + filter_reject_items: vec![], + }; + + let vec1 = vec![]; + let vec2 = vec![]; + + let mut closure_above_version = is_queue_item_above_version(&self.snapshot_version); + let x: ControlFlow<_, _> = items_awaiting.iter().try_fold((vec1, vec2), |mut acc, x| { + if closure_above_version(x) { + acc.0.push(*x); + ControlFlow::Continue(acc) + } else { + acc.1.push(*x); + ControlFlow::Break(acc) + } + }); + + let (items_safepoint_match, items_safepoint_fail) = match x { + ControlFlow::Continue(v) => v, + ControlFlow::Break(v) => v, + }; + + let filter_on_snapshot_criteria = DbgQueueFilterSummary::<&StatemapInstallerHashmap> { + filter_enter_count: items_awaiting.len(), + filter_exit_count: items_safepoint_match.len(), + filter_reject_items: items_safepoint_fail, + }; + + let (final_items, items_non_serializable): (Vec<&StatemapInstallerHashmap>, Vec<&StatemapInstallerHashmap>) = + items_safepoint_match.into_iter().partition(is_queue_item_serializable(&self.queue)); + + let filter_on_serialization_criteria = DbgQueueFilterSummary::<&StatemapInstallerHashmap> { + filter_enter_count: filter_on_snapshot_criteria.filter_exit_count, + filter_exit_count: final_items.len(), + filter_reject_items: items_non_serializable, + }; + + intermediate_steps.push(filter_on_awaiting_criteria); + intermediate_steps.push(filter_on_snapshot_criteria); + intermediate_steps.push(filter_on_serialization_criteria); + + DbgQueueInstallItemsSummary { + installable_items: final_items, + filter_steps_insights: intermediate_steps, + } } pub fn get_versions_to_install(&self) -> Vec { - self - // Get items in awaiting - .filter_items_by_state(StatemapInstallState::Awaiting) + self.queue + .values() + // filter items in `Awaiting` state + .filter(is_queue_item_state_match(StatemapInstallState::Awaiting)) // Get items whose safepoint is below the snapshot. - .take_while(|v| { - // If no safepoint, this could be a abort item and is safe to install as statemap will be empty. - let Some(safepoint) = v.safepoint else { - return true; - }; - - self.snapshot_version >= safepoint - }) - // filter out the ones that can't be serialized - .filter_map(|v| { - // If no safepoint, this could be a abort item and is safe to install as statemap will be empty. - let Some(safepoint) = v.safepoint else { - return Some(v.version); - }; - - // If there is no version matching the safepoint, then it is safe to install - let Some(safepoint_pointing_item) = self.queue.get(&safepoint) else { - return Some(v.version); - }; - if safepoint_pointing_item.state == StatemapInstallState::Installed { - return Some(v.version); - }; - // error!("[items_to_install] Not picking {} as safepoint={safepoint} criteria failed against={:?}", v.version, statemap_queue.get(&safepoint)); - - None - }) - // take the remaining we can install + .take_while(is_queue_item_above_version(&self.snapshot_version)) + // filter items safe to be serialized + .filter(is_queue_item_serializable(&self.queue)) + // map the version + .map(|x| x.version) + // collect the iterator of versions into a vec .collect::>() } } diff --git a/packages/talos_cohort_replicator/src/services/statemap_queue_service.rs b/packages/talos_cohort_replicator/src/services/statemap_queue_service.rs index b40036f8..751be7b2 100644 --- a/packages/talos_cohort_replicator/src/services/statemap_queue_service.rs +++ b/packages/talos_cohort_replicator/src/services/statemap_queue_service.rs @@ -110,6 +110,7 @@ where } let items_to_install: Vec = statemap_installer_queue.get_versions_to_install(); + // Sends for installation. for key in items_to_install { // Send for installation @@ -140,6 +141,45 @@ where current snapshot: {} \n ", statemap_installer_queue.snapshot_version); // last vers send to install : {last_item_send_for_install} + + if config.enable_stats && awaiting_count > 0 && inflight_count == 0 + { + let result = statemap_installer_queue.dbg_get_versions_to_install(); + let final_items = result.installable_items; + let criteria_awaiting_state = &result.filter_steps_insights[0]; + let criteria_snapshot_check = &result.filter_steps_insights[1]; + let criteria_seriazable_check = &result.filter_steps_insights[2]; + + if let Some(first_item_to_fail_safepoint_check) = criteria_snapshot_check.filter_reject_items.first() { + error!("\n\n + +----------+-----------------------------------+----------------------------+ + | Total | Items in queue | {} + +----------+-----------------------------------+----------------------------+ + | Filter 1 | Items in AWAITING state | {} + | Filter 2 | Items whose safepoint <= snapshot | {} + | | | Current snapshot={} + | |-----------------------------------+----------------------------+ + | | > First item to fail this check | Version={} + | | | Safepoint={:?} + | |-----------------------------------+----------------------------+ + | Filter 3 | Items serializable in the batch | {} + | | > Non serializable items count | {} + +----------+-----------------------------------+------+---------------------+ + | Total | Items ready to install | {} + +----------+-----------------------------------+----------------------------+ + \n\n", + criteria_awaiting_state.filter_enter_count, + criteria_awaiting_state.filter_exit_count, + criteria_snapshot_check.filter_exit_count, + statemap_installer_queue.snapshot_version, + first_item_to_fail_safepoint_check.version, + first_item_to_fail_safepoint_check.safepoint, + criteria_seriazable_check.filter_exit_count, + criteria_seriazable_check.filter_reject_items.len(), + final_items.len(), + ); + } + } } statemap_installer_queue.remove_installed(); diff --git a/packages/talos_cohort_replicator/src/utils/installer_utils.rs b/packages/talos_cohort_replicator/src/utils/installer_utils.rs new file mode 100644 index 00000000..ac84661c --- /dev/null +++ b/packages/talos_cohort_replicator/src/utils/installer_utils.rs @@ -0,0 +1,32 @@ +use ahash::RandomState; +use indexmap::IndexMap; + +use crate::core::{StatemapInstallState, StatemapInstallerHashmap}; + +pub fn is_queue_item_state_match(state: StatemapInstallState) -> impl FnMut(&&StatemapInstallerHashmap) -> bool { + move |x| x.state == state +} +pub fn is_queue_item_above_version<'a>(version: &'a u64) -> impl FnMut(&&'a StatemapInstallerHashmap) -> bool { + move |x| { + // If no safepoint, this could be a abort item and is safe to install as statemap will be empty. + let Some(safepoint) = x.safepoint else { + return true; + }; + + version >= &safepoint + } +} +pub fn is_queue_item_serializable<'a>(queue: &'a IndexMap) -> impl FnMut(&&'a StatemapInstallerHashmap) -> bool { + move |x| { + // If no safepoint, this could be a abort item and is safe to install as statemap will be empty. + let Some(safepoint) = x.safepoint else { + return true; + }; + + // If there is no version matching the safepoint, then it is safe to install + let Some(safepoint_pointing_item) = queue.get(&safepoint) else { + return true; + }; + safepoint_pointing_item.state == StatemapInstallState::Installed + } +} diff --git a/packages/talos_cohort_replicator/src/utils/mod.rs b/packages/talos_cohort_replicator/src/utils/mod.rs index 95f68998..2767544a 100644 --- a/packages/talos_cohort_replicator/src/utils/mod.rs +++ b/packages/talos_cohort_replicator/src/utils/mod.rs @@ -1,3 +1,3 @@ +pub(crate) mod installer_utils; mod replicator_utils; - pub use replicator_utils::*;