add index_entries_being_shrunk to ShrinkCollect (#30518)

This commit is contained in:
Jeff Washington (jwash) 2023-03-01 19:09:42 -06:00 committed by GitHub
parent 590ee58667
commit fbcb82dcf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 10 deletions

View File

@ -37,10 +37,11 @@ use {
ZeroLamportAccounts,
},
accounts_index::{
AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexConfig,
AccountsIndexRootsStats, AccountsIndexScanResult, DiskIndexValue, IndexKey, IndexValue,
IsCached, RefCount, ScanConfig, ScanResult, SlotList, UpsertReclaim, ZeroLamport,
ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS, ACCOUNTS_INDEX_CONFIG_FOR_TESTING,
AccountIndexGetResult, AccountMapEntry, AccountSecondaryIndexes, AccountsIndex,
AccountsIndexConfig, AccountsIndexRootsStats, AccountsIndexScanResult, DiskIndexValue,
IndexKey, IndexValue, IsCached, RefCount, ScanConfig, ScanResult, SlotList,
UpsertReclaim, ZeroLamport, ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS,
ACCOUNTS_INDEX_CONFIG_FOR_TESTING,
},
accounts_index_storage::Startup,
accounts_update_notifier_interface::AccountsUpdateNotifier,
@ -465,6 +466,9 @@ pub(crate) struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> {
pub(crate) total_starting_accounts: usize,
/// true if all alive accounts are zero lamports
pub(crate) all_are_zero_lamports: bool,
/// index entries that need to be held in memory while shrink is in progress
/// These aren't read - they are just held so that entries cannot be flushed.
pub(crate) _index_entries_being_shrunk: Vec<AccountMapEntry<AccountInfo>>,
}
pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig {
@ -499,6 +503,8 @@ struct LoadAccountsIndexForShrink<'a, T: ShrinkCollectRefs<'a>> {
unrefed_pubkeys: Vec<&'a Pubkey>,
/// true if all alive accounts are zero lamport accounts
all_are_zero_lamports: bool,
/// index entries we need to hold onto to keep them from getting flushed
index_entries_being_shrunk: Vec<AccountMapEntry<AccountInfo>>,
}
pub struct GetUniqueAccountsResult<'a> {
@ -3189,7 +3195,7 @@ impl AccountsDb {
let mut useful = 0;
self.accounts_index.scan(
pubkeys.iter(),
|pubkey, slots_refs| {
|pubkey, slots_refs, _entry| {
let mut useless = true;
if let Some((slot_list, ref_count)) = slots_refs {
let index_in_slot_list = self.accounts_index.latest_slot(
@ -3255,6 +3261,7 @@ impl AccountsDb {
}
},
None,
false,
);
found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed);
not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed);
@ -3741,9 +3748,10 @@ impl AccountsDb {
let mut dead = 0;
let mut index = 0;
let mut all_are_zero_lamports = true;
let mut index_entries_being_shrunk = Vec::with_capacity(accounts.len());
self.accounts_index.scan(
accounts.iter().map(|account| account.pubkey()),
|pubkey, slots_refs| {
|pubkey, slots_refs, entry| {
let mut result = AccountsIndexScanResult::None;
if let Some((slot_list, ref_count)) = slots_refs {
let stored_account = &accounts[index];
@ -3760,6 +3768,9 @@ impl AccountsDb {
result = AccountsIndexScanResult::Unref;
dead += 1;
} else {
// Hold onto the index entry arc so that it cannot be flushed.
// Since we are shrinking these entries, we need to disambiguate append_vec_ids during this period and those only exist in the in-memory accounts index.
index_entries_being_shrunk.push(Arc::clone(entry.unwrap()));
all_are_zero_lamports &= stored_account.lamports() == 0;
alive_accounts.add(ref_count, stored_account);
alive += 1;
@ -3769,6 +3780,7 @@ impl AccountsDb {
result
},
None,
true,
);
assert_eq!(index, std::cmp::min(accounts.len(), count));
stats.alive_accounts.fetch_add(alive, Ordering::Relaxed);
@ -3778,6 +3790,7 @@ impl AccountsDb {
alive_accounts,
unrefed_pubkeys,
all_are_zero_lamports,
index_entries_being_shrunk,
}
}
@ -3840,6 +3853,7 @@ impl AccountsDb {
.accounts_loaded
.fetch_add(len as u64, Ordering::Relaxed);
let all_are_zero_lamports_collect = Mutex::new(true);
let index_entries_being_shrunk_outer = Mutex::new(Vec::default());
self.thread_pool_clean.install(|| {
stored_accounts
.par_chunks(SHRINK_COLLECT_CHUNK_SIZE)
@ -3848,6 +3862,7 @@ impl AccountsDb {
alive_accounts,
mut unrefed_pubkeys,
all_are_zero_lamports,
mut index_entries_being_shrunk,
} = self.load_accounts_index_for_shrink(stored_accounts, stats, slot);
// collect
@ -3859,6 +3874,10 @@ impl AccountsDb {
.lock()
.unwrap()
.append(&mut unrefed_pubkeys);
index_entries_being_shrunk_outer
.lock()
.unwrap()
.append(&mut index_entries_being_shrunk);
if !all_are_zero_lamports {
*all_are_zero_lamports_collect.lock().unwrap() = false;
}
@ -3897,6 +3916,7 @@ impl AccountsDb {
alive_total_bytes,
total_starting_accounts: len,
all_are_zero_lamports: all_are_zero_lamports_collect.into_inner().unwrap(),
_index_entries_being_shrunk: index_entries_being_shrunk_outer.into_inner().unwrap(),
}
}
@ -8012,11 +8032,12 @@ impl AccountsDb {
pubkeys_removed_from_accounts_index.contains(pubkey);
!already_removed
}),
|_pubkey, _slots_refs| {
|_pubkey, _slots_refs, _entry| {
/* unused */
AccountsIndexScanResult::Unref
},
Some(AccountsIndexScanResult::Unref),
false,
)
});
});

View File

@ -1356,11 +1356,13 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
/// For each pubkey, find the slot list in the accounts index
/// apply 'avoid_callback_result' if specified.
/// otherwise, call `callback`
/// if 'provide_entry_in_callback' is true, populate callback with the Arc of the entry itself.
pub(crate) fn scan<'a, F, I>(
&self,
pubkeys: I,
mut callback: F,
avoid_callback_result: Option<AccountsIndexScanResult>,
provide_entry_in_callback: bool,
) where
// params:
// pubkey looked up
@ -1368,9 +1370,14 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
// None if 'pubkey' is not in accounts index.
// slot_list: comes from accounts index for 'pubkey'
// ref_count: refcount of entry in index
// entry, if 'provide_entry_in_callback' is true
// if 'avoid_callback_result' is Some(_), then callback is NOT called
// and _ is returned as if callback were called.
F: FnMut(&'a Pubkey, Option<(&SlotList<T>, RefCount)>) -> AccountsIndexScanResult,
F: FnMut(
&'a Pubkey,
Option<(&SlotList<T>, RefCount)>,
Option<&AccountMapEntry<T>>,
) -> AccountsIndexScanResult,
I: Iterator<Item = &'a Pubkey>,
{
let mut lock = None;
@ -1390,7 +1397,11 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
*result
} else {
let slot_list = &locked_entry.slot_list.read().unwrap();
callback(pubkey, Some((slot_list, locked_entry.ref_count())))
callback(
pubkey,
Some((slot_list, locked_entry.ref_count())),
provide_entry_in_callback.then_some(locked_entry),
)
};
cache = match result {
AccountsIndexScanResult::Unref => {
@ -1404,7 +1415,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
};
}
None => {
avoid_callback_result.unwrap_or_else(|| callback(pubkey, None));
avoid_callback_result.unwrap_or_else(|| callback(pubkey, None, None));
}
}
(cache, ())