diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 367500156d2b18..758a3dfb143b08 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -3069,32 +3069,24 @@ impl AccountsDb { .collect() } - /// Remove `slots` from `uncleaned_pubkeys` and collect all pubkeys - /// - /// For each slot in the list of uncleaned slots, remove it from the `uncleaned_pubkeys` Map - /// and collect all the pubkeys to return. - fn remove_uncleaned_slots_and_collect_pubkeys( - &self, - uncleaned_slots: Vec, - ) -> Vec> { - uncleaned_slots - .into_iter() - .filter_map(|uncleaned_slot| { - self.uncleaned_pubkeys - .remove(&uncleaned_slot) - .map(|(_removed_slot, removed_pubkeys)| removed_pubkeys) - }) - .collect() - } - - /// Remove uncleaned slots, up to a maximum slot, and return the collected pubkeys - /// - fn remove_uncleaned_slots_and_collect_pubkeys_up_to_slot( + /// For each slot in the list of uncleaned slots, up to a maximum + /// slot, remove it from the `uncleaned_pubkeys` and move all the + /// pubkeys to `candidates` for cleaning. + fn remove_uncleaned_slots_up_to_slot_and_move_pubkeys( &self, max_slot_inclusive: Slot, - ) -> Vec> { + candidates: &[RwLock>], + ) { let uncleaned_slots = self.collect_uncleaned_slots_up_to_slot(max_slot_inclusive); - self.remove_uncleaned_slots_and_collect_pubkeys(uncleaned_slots) + for uncleaned_slot in uncleaned_slots.into_iter() { + if let Some((_removed_slot, removed_pubkeys)) = + self.uncleaned_pubkeys.remove(&uncleaned_slot) + { + removed_pubkeys + .iter() + .for_each(|pubkey| self.insert_pubkey(candidates, pubkey)) + } + } } fn count_pubkeys(candidates: &[RwLock>]) -> u64 { @@ -3104,10 +3096,19 @@ impl AccountsDb { .sum::() as u64 } - /// Construct a vec of pubkeys for cleaning from: - /// uncleaned_pubkeys - the delta set of updated pubkeys in rooted slots from the last clean - /// dirty_stores - set of stores which had accounts removed or recently rooted - /// returns the minimum slot we encountered + fn insert_pubkey(&self, candidates: &[RwLock>], pubkey: &Pubkey) { + let index = self.accounts_index.bin_calculator.bin_from_pubkey(pubkey); + let mut candidates_bin = candidates[index].write().unwrap(); + candidates_bin.insert(*pubkey, CleaningInfo::default()); + } + + /// Construct a list of candidates for cleaning from: + /// - dirty_stores -- set of stores which had accounts + /// removed or recently rooted; + /// - uncleaned_pubkeys -- the delta set of updated pubkeys in + /// rooted slots from the last clean. + /// + /// The function also returns the minimum slot we encountered. fn construct_candidate_clean_keys( &self, max_clean_root_inclusive: Option, @@ -3138,12 +3139,6 @@ impl AccountsDb { std::iter::repeat_with(|| RwLock::new(HashMap::::new())) .take(num_bins) .collect(); - - let insert_pubkey = |pubkey: &Pubkey| { - let index = self.accounts_index.bin_calculator.bin_from_pubkey(pubkey); - let mut candidates_bin = candidates[index].write().unwrap(); - candidates_bin.insert(*pubkey, CleaningInfo::default()); - }; let dirty_ancient_stores = AtomicUsize::default(); let mut dirty_store_routine = || { let chunk_size = 1.max(dirty_stores_len.saturating_div(rayon::current_num_threads())); @@ -3156,7 +3151,9 @@ impl AccountsDb { dirty_ancient_stores.fetch_add(1, Ordering::Relaxed); } oldest_dirty_slot = oldest_dirty_slot.min(*slot); - store.accounts.scan_pubkeys(insert_pubkey); + store + .accounts + .scan_pubkeys(|pubkey| self.insert_pubkey(&candidates, pubkey)); }); oldest_dirty_slot }) @@ -3186,25 +3183,14 @@ impl AccountsDb { timings.dirty_ancient_stores = dirty_ancient_stores.load(Ordering::Relaxed); let mut collect_delta_keys = Measure::start("key_create"); - let delta_keys = - self.remove_uncleaned_slots_and_collect_pubkeys_up_to_slot(max_slot_inclusive); + self.remove_uncleaned_slots_up_to_slot_and_move_pubkeys(max_slot_inclusive, &candidates); collect_delta_keys.stop(); timings.collect_delta_keys_us += collect_delta_keys.as_us(); - let mut delta_insert = Measure::start("delta_insert"); - self.thread_pool_clean.install(|| { - delta_keys.par_iter().for_each(|keys| { - for key in keys { - insert_pubkey(key); - } - }); - }); - delta_insert.stop(); - timings.delta_insert_us += delta_insert.as_us(); - timings.delta_key_count = Self::count_pubkeys(&candidates); - // Check if we should purge any of the zero_lamport_accounts_to_purge_later, based on the + // Check if we should purge any of the + // zero_lamport_accounts_to_purge_later, based on the // latest_full_snapshot_slot. let latest_full_snapshot_slot = self.latest_full_snapshot_slot(); assert!( @@ -3217,7 +3203,7 @@ impl AccountsDb { let is_candidate_for_clean = max_slot_inclusive >= *slot && latest_full_snapshot_slot >= *slot; if is_candidate_for_clean { - insert_pubkey(pubkey); + self.insert_pubkey(&candidates, pubkey); } !is_candidate_for_clean }); @@ -14810,64 +14796,6 @@ pub mod tests { assert_eq!(uncleaned_slots3, [slot1, slot2, slot3]); } - #[test] - fn test_remove_uncleaned_slots_and_collect_pubkeys() { - solana_logger::setup(); - let db = AccountsDb::new_single_for_tests(); - - let slot1 = 11; - let slot2 = 222; - let slot3 = 3333; - - let pubkey1 = Pubkey::new_unique(); - let pubkey2 = Pubkey::new_unique(); - let pubkey3 = Pubkey::new_unique(); - - let account1 = AccountSharedData::new(0, 0, &pubkey1); - let account2 = AccountSharedData::new(0, 0, &pubkey2); - let account3 = AccountSharedData::new(0, 0, &pubkey3); - - db.store_for_tests(slot1, &[(&pubkey1, &account1)]); - db.store_for_tests(slot2, &[(&pubkey2, &account2)]); - db.store_for_tests(slot3, &[(&pubkey3, &account3)]); - - db.add_root(slot1); - // slot 2 is _not_ a root on purpose - db.add_root(slot3); - - db.uncleaned_pubkeys.insert(slot1, vec![pubkey1]); - db.uncleaned_pubkeys.insert(slot2, vec![pubkey2]); - db.uncleaned_pubkeys.insert(slot3, vec![pubkey3]); - - let uncleaned_pubkeys1 = db - .remove_uncleaned_slots_and_collect_pubkeys(vec![slot1]) - .into_iter() - .flatten() - .collect::>(); - let uncleaned_pubkeys2 = db - .remove_uncleaned_slots_and_collect_pubkeys(vec![slot2]) - .into_iter() - .flatten() - .collect::>(); - let uncleaned_pubkeys3 = db - .remove_uncleaned_slots_and_collect_pubkeys(vec![slot3]) - .into_iter() - .flatten() - .collect::>(); - - assert!(uncleaned_pubkeys1.contains(&pubkey1)); - assert!(!uncleaned_pubkeys1.contains(&pubkey2)); - assert!(!uncleaned_pubkeys1.contains(&pubkey3)); - - assert!(!uncleaned_pubkeys2.contains(&pubkey1)); - assert!(uncleaned_pubkeys2.contains(&pubkey2)); - assert!(!uncleaned_pubkeys2.contains(&pubkey3)); - - assert!(!uncleaned_pubkeys3.contains(&pubkey1)); - assert!(!uncleaned_pubkeys3.contains(&pubkey2)); - assert!(uncleaned_pubkeys3.contains(&pubkey3)); - } - #[test] fn test_remove_uncleaned_slots_and_collect_pubkeys_up_to_slot() { solana_logger::setup(); @@ -14897,15 +14825,21 @@ pub mod tests { db.uncleaned_pubkeys.insert(slot2, vec![pubkey2]); db.uncleaned_pubkeys.insert(slot3, vec![pubkey3]); - let uncleaned_pubkeys = db - .remove_uncleaned_slots_and_collect_pubkeys_up_to_slot(slot3) - .into_iter() - .flatten() - .collect::>(); + let num_bins = db.accounts_index.bins(); + let candidates: Box<_> = + std::iter::repeat_with(|| RwLock::new(HashMap::::new())) + .take(num_bins) + .collect(); + db.remove_uncleaned_slots_up_to_slot_and_move_pubkeys(slot3, &candidates); - assert!(uncleaned_pubkeys.contains(&pubkey1)); - assert!(uncleaned_pubkeys.contains(&pubkey2)); - assert!(uncleaned_pubkeys.contains(&pubkey3)); + let candidates_contain = |pubkey: &Pubkey| { + candidates + .iter() + .any(|bin| bin.read().unwrap().contains(pubkey)) + }; + assert!(candidates_contain(&pubkey1)); + assert!(candidates_contain(&pubkey2)); + assert!(candidates_contain(&pubkey3)); } #[test]