extract 'unref_pubkeys' for shared code (#28653)
This commit is contained in:
parent
0232944c95
commit
8924829e7b
|
@ -3622,8 +3622,8 @@ impl AccountsDb {
|
||||||
|
|
||||||
// Must be kept private!, does sensitive cleanup that should only be called from
|
// Must be kept private!, does sensitive cleanup that should only be called from
|
||||||
// supported pipelines in AccountsDb
|
// supported pipelines in AccountsDb
|
||||||
// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
/// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
||||||
// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
/// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
||||||
fn process_dead_slots(
|
fn process_dead_slots(
|
||||||
&self,
|
&self,
|
||||||
dead_slots: &HashSet<Slot>,
|
dead_slots: &HashSet<Slot>,
|
||||||
|
@ -4334,11 +4334,11 @@ impl AccountsDb {
|
||||||
) {
|
) {
|
||||||
let unref = Self::get_keys_to_unref_ancient(accounts, existing_ancient_pubkeys);
|
let unref = Self::get_keys_to_unref_ancient(accounts, existing_ancient_pubkeys);
|
||||||
|
|
||||||
self.thread_pool_clean.install(|| {
|
self.unref_pubkeys(
|
||||||
unref.into_par_iter().for_each(|key| {
|
unref.iter().cloned(),
|
||||||
self.accounts_index.unref_from_storage(key);
|
unref.len(),
|
||||||
});
|
&PubkeysRemovedFromAccountsIndex::default(),
|
||||||
});
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// get the storages from 'slot' to squash
|
/// get the storages from 'slot' to squash
|
||||||
|
@ -8027,8 +8027,8 @@ impl AccountsDb {
|
||||||
dead_slots
|
dead_slots
|
||||||
}
|
}
|
||||||
|
|
||||||
// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
/// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
||||||
// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
/// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
||||||
fn remove_dead_slots_metadata<'a>(
|
fn remove_dead_slots_metadata<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
dead_slots_iter: impl Iterator<Item = &'a Slot> + Clone,
|
dead_slots_iter: impl Iterator<Item = &'a Slot> + Clone,
|
||||||
|
@ -8054,31 +8054,28 @@ impl AccountsDb {
|
||||||
inc_new_counter_info!("remove_dead_slots_metadata-ms", measure.as_ms() as usize);
|
inc_new_counter_info!("remove_dead_slots_metadata-ms", measure.as_ms() as usize);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// lookup each pubkey in 'purged_slot_pubkeys' and unref it in the accounts index
|
/// lookup each pubkey in 'pubkeys' and unref it in the accounts index
|
||||||
/// populate 'purged_stored_account_slots' by grouping 'purged_slot_pubkeys' by pubkey
|
/// skip pubkeys that are in 'pubkeys_removed_from_accounts_index'
|
||||||
// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
fn unref_pubkeys<'a>(
|
||||||
// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
&'a self,
|
||||||
fn unref_accounts(
|
pubkeys: impl Iterator<Item = &'a Pubkey> + Clone + Send + Sync,
|
||||||
&self,
|
num_pubkeys: usize,
|
||||||
purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
|
pubkeys_removed_from_accounts_index: &'a PubkeysRemovedFromAccountsIndex,
|
||||||
purged_stored_account_slots: &mut AccountSlots,
|
|
||||||
pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
|
|
||||||
) {
|
) {
|
||||||
let len = purged_slot_pubkeys.len();
|
let batches = 1 + (num_pubkeys / UNREF_ACCOUNTS_BATCH_SIZE);
|
||||||
let batches = 1 + (len / UNREF_ACCOUNTS_BATCH_SIZE);
|
|
||||||
self.thread_pool_clean.install(|| {
|
self.thread_pool_clean.install(|| {
|
||||||
(0..batches).into_par_iter().for_each(|batch| {
|
(0..batches).into_par_iter().for_each(|batch| {
|
||||||
let skip = batch * UNREF_ACCOUNTS_BATCH_SIZE;
|
let skip = batch * UNREF_ACCOUNTS_BATCH_SIZE;
|
||||||
self.accounts_index.scan(
|
self.accounts_index.scan(
|
||||||
purged_slot_pubkeys
|
pubkeys
|
||||||
.iter()
|
.clone()
|
||||||
.skip(skip)
|
.skip(skip)
|
||||||
.take(UNREF_ACCOUNTS_BATCH_SIZE)
|
.take(UNREF_ACCOUNTS_BATCH_SIZE)
|
||||||
.filter_map(|(_slot, pubkey)| {
|
.filter(|pubkey| {
|
||||||
// filter out pubkeys that have already been removed from the accounts index in a previous step
|
// filter out pubkeys that have already been removed from the accounts index in a previous step
|
||||||
let already_removed =
|
let already_removed =
|
||||||
pubkeys_removed_from_accounts_index.contains(pubkey);
|
pubkeys_removed_from_accounts_index.contains(pubkey);
|
||||||
(!already_removed).then_some(pubkey)
|
!already_removed
|
||||||
}),
|
}),
|
||||||
|_pubkey, _slots_refs| {
|
|_pubkey, _slots_refs| {
|
||||||
/* unused */
|
/* unused */
|
||||||
|
@ -8088,6 +8085,23 @@ impl AccountsDb {
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// lookup each pubkey in 'purged_slot_pubkeys' and unref it in the accounts index
|
||||||
|
/// populate 'purged_stored_account_slots' by grouping 'purged_slot_pubkeys' by pubkey
|
||||||
|
/// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
||||||
|
/// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
||||||
|
fn unref_accounts(
|
||||||
|
&self,
|
||||||
|
purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
|
||||||
|
purged_stored_account_slots: &mut AccountSlots,
|
||||||
|
pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
|
||||||
|
) {
|
||||||
|
self.unref_pubkeys(
|
||||||
|
purged_slot_pubkeys.iter().map(|(_slot, pubkey)| pubkey),
|
||||||
|
purged_slot_pubkeys.len(),
|
||||||
|
pubkeys_removed_from_accounts_index,
|
||||||
|
);
|
||||||
for (slot, pubkey) in purged_slot_pubkeys {
|
for (slot, pubkey) in purged_slot_pubkeys {
|
||||||
purged_stored_account_slots
|
purged_stored_account_slots
|
||||||
.entry(pubkey)
|
.entry(pubkey)
|
||||||
|
@ -8096,8 +8110,8 @@ impl AccountsDb {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
/// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
||||||
// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
/// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
||||||
fn clean_dead_slots_from_accounts_index<'a>(
|
fn clean_dead_slots_from_accounts_index<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
dead_slots_iter: impl Iterator<Item = &'a Slot> + Clone,
|
dead_slots_iter: impl Iterator<Item = &'a Slot> + Clone,
|
||||||
|
@ -8152,8 +8166,8 @@ impl AccountsDb {
|
||||||
.update(&accounts_index_root_stats);
|
.update(&accounts_index_root_stats);
|
||||||
}
|
}
|
||||||
|
|
||||||
// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
/// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
||||||
// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
/// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
||||||
fn clean_stored_dead_slots(
|
fn clean_stored_dead_slots(
|
||||||
&self,
|
&self,
|
||||||
dead_slots: &HashSet<Slot>,
|
dead_slots: &HashSet<Slot>,
|
||||||
|
|
|
@ -1358,7 +1358,7 @@ impl<T: IndexValue> AccountsIndex<T> {
|
||||||
// if 'avoid_callback_result' is Some(_), then callback is NOT called
|
// if 'avoid_callback_result' is Some(_), then callback is NOT called
|
||||||
// and _ is returned as if callback were called.
|
// and _ is returned as if callback were called.
|
||||||
F: FnMut(&'a Pubkey, Option<(&SlotList<T>, RefCount)>) -> AccountsIndexScanResult,
|
F: FnMut(&'a Pubkey, Option<(&SlotList<T>, RefCount)>) -> AccountsIndexScanResult,
|
||||||
I: IntoIterator<Item = &'a Pubkey>,
|
I: Iterator<Item = &'a Pubkey>,
|
||||||
{
|
{
|
||||||
let mut lock = None;
|
let mut lock = None;
|
||||||
let mut last_bin = self.bins(); // too big, won't match
|
let mut last_bin = self.bins(); // too big, won't match
|
||||||
|
@ -1668,11 +1668,6 @@ impl<T: IndexValue> AccountsIndex<T> {
|
||||||
self.update_secondary_indexes(pubkey, account, account_indexes);
|
self.update_secondary_indexes(pubkey, account, account_indexes);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn unref_from_storage(&self, pubkey: &Pubkey) {
|
|
||||||
let map = self.get_bin(pubkey);
|
|
||||||
map.unref(pubkey)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn ref_count_from_storage(&self, pubkey: &Pubkey) -> RefCount {
|
pub fn ref_count_from_storage(&self, pubkey: &Pubkey) -> RefCount {
|
||||||
let map = self.get_bin(pubkey);
|
let map = self.get_bin(pubkey);
|
||||||
map.get_internal(pubkey, |entry| {
|
map.get_internal(pubkey, |entry| {
|
||||||
|
|
|
@ -8,7 +8,6 @@ use {
|
||||||
bucket_map_holder_stats::BucketMapHolderStats,
|
bucket_map_holder_stats::BucketMapHolderStats,
|
||||||
waitable_condvar::WaitableCondvar,
|
waitable_condvar::WaitableCondvar,
|
||||||
},
|
},
|
||||||
log::*,
|
|
||||||
rand::{thread_rng, Rng},
|
rand::{thread_rng, Rng},
|
||||||
solana_bucket_map::bucket_api::BucketApi,
|
solana_bucket_map::bucket_api::BucketApi,
|
||||||
solana_measure::measure::Measure,
|
solana_measure::measure::Measure,
|
||||||
|
@ -437,17 +436,6 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn unref(&self, pubkey: &Pubkey) {
|
|
||||||
self.get_internal(pubkey, |entry| {
|
|
||||||
if let Some(entry) = entry {
|
|
||||||
if entry.unref() {
|
|
||||||
info!("refcount of item already at 0: {pubkey}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
(true, ())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// update 'entry' with 'new_value'
|
/// update 'entry' with 'new_value'
|
||||||
fn update_slot_list_entry(
|
fn update_slot_list_entry(
|
||||||
&self,
|
&self,
|
||||||
|
|
Loading…
Reference in New Issue