improve performance of unrefing ancient pubkeys (#27485)

This commit is contained in:
Jeff Washington (jwash) 2022-09-01 07:34:52 -07:00 committed by GitHub
parent e7689f7961
commit 874085aadc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 38 additions and 17 deletions

View File

@ -3724,23 +3724,30 @@ impl AccountsDb {
})
}
/// 'accounts' are about to be appended to an ancient append vec. That ancient append vec may already have some accounts.
/// unref each account in 'accounts' that already exists in 'ancient_store'
/// as a side effect, on exit, 'existing_ancient_pubkeys' will contain all pubkeys in 'accounts'.
fn unref_accounts_already_in_storage(
&self,
accounts: &[(&Pubkey, &StoredAccountMeta<'_>, u64)],
ancient_store: &Arc<AccountStorageEntry>,
existing_ancient_pubkeys: &mut HashSet<Pubkey>,
) {
// make a hashset of all keys we're about to add to this storage
let mut accounts_to_add = accounts.iter().map(|entry| entry.0).collect::<HashSet<_>>();
let mut unref = HashSet::<&Pubkey>::default();
// for each key that we're about to add that already exists in this storage, we need to unref. The account was in a different storage.
// Now it is being put into an ancient storage, but it is already there, so maintain max of 1 ref per storage in the accounts index.
ancient_store.accounts.account_iter().for_each(|account| {
// remove here is so we don't unref the same key more than once in this loop if it is in the existing storage 2 times already
let key = &account.meta.pubkey;
if accounts_to_add.remove(key) {
self.accounts_index.unref_from_storage(key);
// Now it is being put into an ancient storage again, but it is already there, so maintain max of 1 ref per storage in the accounts index.
// The slot that currently references the account is going away, so unref to maintain # slots that reference the pubkey = refcount.
accounts.iter().for_each(|(key, _, _)| {
if !existing_ancient_pubkeys.insert(**key) {
// this key exists BOTH in 'accounts' and already in the ancient append vec, so we need to unref it
unref.insert(*key);
}
})
});
self.thread_pool_clean.install(|| {
unref.into_par_iter().for_each(|key| {
self.accounts_index.unref_from_storage(key);
});
});
}
/// helper function to cleanup call to 'store_accounts_frozen'
@ -3750,14 +3757,9 @@ impl AccountsDb {
ancient_store: &Arc<AccountStorageEntry>,
accounts: &AccountsToStore,
storage_selector: StorageSelector,
unref_if_already_exists: bool,
) -> StoreAccountsTiming {
let (accounts, hashes) = accounts.get(storage_selector);
if unref_if_already_exists {
self.unref_accounts_already_in_storage(accounts, ancient_store);
}
self.store_accounts_frozen(
(ancient_slot, accounts),
Some(hashes),
@ -3832,6 +3834,10 @@ impl AccountsDb {
let mut current_ancient = None;
let mut dropped_roots = vec![];
// we have to keep track of what pubkeys exist in the current ancient append vec so we can unref correctly
let mut ancient_pubkeys = HashSet::default();
let mut ancient_slot_with_pubkeys = None;
let len = sorted_slots.len();
for slot in sorted_slots {
let old_storages =
@ -3926,6 +3932,23 @@ impl AccountsDb {
// if this slot is not the ancient slot we're writing to, then this root will be dropped
let mut drop_root = slot != ancient_slot;
if slot != ancient_slot {
// we are taking accounts from 'slot' and putting them into 'ancient_slot'
let (accounts, _hashes) = to_store.get(StorageSelector::Primary);
if Some(ancient_slot) != ancient_slot_with_pubkeys {
// 'ancient_slot_with_pubkeys' is a local, re-used only for the set of slots we're iterating right now.
// the first time or when we change to a new ancient append vec, we need to recreate the set of ancient pubkeys here.
ancient_slot_with_pubkeys = Some(ancient_slot);
ancient_pubkeys = ancient_store
.accounts
.account_iter()
.map(|account| account.meta.pubkey)
.collect::<HashSet<_>>();
}
// accounts in 'slot' but ALSO already in the ancient append vec at a different slot need to be unref'd since 'slot' is going away
self.unref_accounts_already_in_storage(accounts, &mut ancient_pubkeys);
}
let mut rewrite_elapsed = Measure::start("rewrite_elapsed");
// write what we can to the current ancient storage
let mut store_accounts_timing = self.store_ancient_accounts(
@ -3934,7 +3957,6 @@ impl AccountsDb {
&to_store,
StorageSelector::Primary,
// we are adding accounts to an existing append vec from a different slot. We need to unref each account that exists already in 'ancient_store'.
slot != ancient_slot,
);
// handle accounts from 'slot' which did not fit into the current ancient append vec
@ -3961,7 +3983,6 @@ impl AccountsDb {
ancient_store,
&to_store,
StorageSelector::Overflow,
false, // we do not want to unref any accounts. these remaining accounts are going into a new append vec, so we need to keep the refs they already have
);
store_accounts_timing.store_accounts_elapsed = timing.store_accounts_elapsed;
store_accounts_timing.update_index_elapsed = timing.update_index_elapsed;