From 8924829e7b19918ff6671812ddf9fdafdb3d967c Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Fri, 28 Oct 2022 10:12:29 -0700 Subject: [PATCH] extract 'unref_pubkeys' for shared code (#28653) --- runtime/src/accounts_db.rs | 70 +++++++++++++++++----------- runtime/src/accounts_index.rs | 7 +-- runtime/src/in_mem_accounts_index.rs | 12 ----- 3 files changed, 43 insertions(+), 46 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 3b2f442234..15faf8d4e4 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -3622,8 +3622,8 @@ impl AccountsDb { // Must be kept private!, does sensitive cleanup that should only be called from // supported pipelines in AccountsDb - // pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index - // and should not be unref'd. If they exist in the accounts index, they are NEW. + /// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index + /// and should not be unref'd. If they exist in the accounts index, they are NEW. fn process_dead_slots( &self, dead_slots: &HashSet, @@ -4334,11 +4334,11 @@ impl AccountsDb { ) { let unref = Self::get_keys_to_unref_ancient(accounts, existing_ancient_pubkeys); - self.thread_pool_clean.install(|| { - unref.into_par_iter().for_each(|key| { - self.accounts_index.unref_from_storage(key); - }); - }); + self.unref_pubkeys( + unref.iter().cloned(), + unref.len(), + &PubkeysRemovedFromAccountsIndex::default(), + ); } /// get the storages from 'slot' to squash @@ -8027,8 +8027,8 @@ impl AccountsDb { dead_slots } - // pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index - // and should not be unref'd. If they exist in the accounts index, they are NEW. + /// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index + /// and should not be unref'd. If they exist in the accounts index, they are NEW. fn remove_dead_slots_metadata<'a>( &'a self, dead_slots_iter: impl Iterator + Clone, @@ -8054,31 +8054,28 @@ impl AccountsDb { inc_new_counter_info!("remove_dead_slots_metadata-ms", measure.as_ms() as usize); } - /// lookup each pubkey in 'purged_slot_pubkeys' and unref it in the accounts index - /// populate 'purged_stored_account_slots' by grouping 'purged_slot_pubkeys' by pubkey - // pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index - // and should not be unref'd. If they exist in the accounts index, they are NEW. - fn unref_accounts( - &self, - purged_slot_pubkeys: HashSet<(Slot, Pubkey)>, - purged_stored_account_slots: &mut AccountSlots, - pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex, + /// lookup each pubkey in 'pubkeys' and unref it in the accounts index + /// skip pubkeys that are in 'pubkeys_removed_from_accounts_index' + fn unref_pubkeys<'a>( + &'a self, + pubkeys: impl Iterator + Clone + Send + Sync, + num_pubkeys: usize, + pubkeys_removed_from_accounts_index: &'a PubkeysRemovedFromAccountsIndex, ) { - let len = purged_slot_pubkeys.len(); - let batches = 1 + (len / UNREF_ACCOUNTS_BATCH_SIZE); + let batches = 1 + (num_pubkeys / UNREF_ACCOUNTS_BATCH_SIZE); self.thread_pool_clean.install(|| { (0..batches).into_par_iter().for_each(|batch| { let skip = batch * UNREF_ACCOUNTS_BATCH_SIZE; self.accounts_index.scan( - purged_slot_pubkeys - .iter() + pubkeys + .clone() .skip(skip) .take(UNREF_ACCOUNTS_BATCH_SIZE) - .filter_map(|(_slot, pubkey)| { + .filter(|pubkey| { // filter out pubkeys that have already been removed from the accounts index in a previous step let already_removed = pubkeys_removed_from_accounts_index.contains(pubkey); - (!already_removed).then_some(pubkey) + !already_removed }), |_pubkey, _slots_refs| { /* unused */ @@ -8088,6 +8085,23 @@ impl AccountsDb { ) }); }); + } + + /// lookup each pubkey in 'purged_slot_pubkeys' and unref it in the accounts index + /// populate 'purged_stored_account_slots' by grouping 'purged_slot_pubkeys' by pubkey + /// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index + /// and should not be unref'd. If they exist in the accounts index, they are NEW. + fn unref_accounts( + &self, + purged_slot_pubkeys: HashSet<(Slot, Pubkey)>, + purged_stored_account_slots: &mut AccountSlots, + pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex, + ) { + self.unref_pubkeys( + purged_slot_pubkeys.iter().map(|(_slot, pubkey)| pubkey), + purged_slot_pubkeys.len(), + pubkeys_removed_from_accounts_index, + ); for (slot, pubkey) in purged_slot_pubkeys { purged_stored_account_slots .entry(pubkey) @@ -8096,8 +8110,8 @@ impl AccountsDb { } } - // pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index - // and should not be unref'd. If they exist in the accounts index, they are NEW. + /// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index + /// and should not be unref'd. If they exist in the accounts index, they are NEW. fn clean_dead_slots_from_accounts_index<'a>( &'a self, dead_slots_iter: impl Iterator + Clone, @@ -8152,8 +8166,8 @@ impl AccountsDb { .update(&accounts_index_root_stats); } - // pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index - // and should not be unref'd. If they exist in the accounts index, they are NEW. + /// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index + /// and should not be unref'd. If they exist in the accounts index, they are NEW. fn clean_stored_dead_slots( &self, dead_slots: &HashSet, diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 287e707d23..d13cc072a7 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1358,7 +1358,7 @@ impl AccountsIndex { // if 'avoid_callback_result' is Some(_), then callback is NOT called // and _ is returned as if callback were called. F: FnMut(&'a Pubkey, Option<(&SlotList, RefCount)>) -> AccountsIndexScanResult, - I: IntoIterator, + I: Iterator, { let mut lock = None; let mut last_bin = self.bins(); // too big, won't match @@ -1668,11 +1668,6 @@ impl AccountsIndex { self.update_secondary_indexes(pubkey, account, account_indexes); } - pub fn unref_from_storage(&self, pubkey: &Pubkey) { - let map = self.get_bin(pubkey); - map.unref(pubkey) - } - pub fn ref_count_from_storage(&self, pubkey: &Pubkey) -> RefCount { let map = self.get_bin(pubkey); map.get_internal(pubkey, |entry| { diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index eb7136ffef..006f60fe79 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -8,7 +8,6 @@ use { bucket_map_holder_stats::BucketMapHolderStats, waitable_condvar::WaitableCondvar, }, - log::*, rand::{thread_rng, Rng}, solana_bucket_map::bucket_api::BucketApi, solana_measure::measure::Measure, @@ -437,17 +436,6 @@ impl InMemAccountsIndex { }) } - pub fn unref(&self, pubkey: &Pubkey) { - self.get_internal(pubkey, |entry| { - if let Some(entry) = entry { - if entry.unref() { - info!("refcount of item already at 0: {pubkey}"); - } - } - (true, ()) - }) - } - /// update 'entry' with 'new_value' fn update_slot_list_entry( &self,