share shrink_collect code (#28454)

* share shrink_collect code

* use correct stats
This commit is contained in:
Jeff Washington (jwash) 2022-10-19 08:07:05 -07:00 committed by GitHub
parent 1cc9cf927c
commit 040035063f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 64 additions and 51 deletions

View File

@ -193,6 +193,17 @@ enum LoadZeroLamports {
SomeWithZeroLamportAccountForTests,
}
struct ShrinkCollect<'a> {
original_bytes: u64,
store_ids: Vec<AppendVecId>,
aligned_total: u64,
unrefed_pubkeys: Vec<&'a Pubkey>,
alive_accounts: Vec<&'a (Pubkey, FoundStoredAccount<'a>)>,
alive_total: usize,
index_read_elapsed: Measure,
total_starting_accounts: usize,
}
// the current best way to add filler accounts is gradually.
// In other scenarios, such as monitoring catchup with large # of accounts, it may be useful to be able to
// add filler accounts at the beginning, so that code path remains but won't execute at the moment.
@ -3626,16 +3637,23 @@ impl AccountsDb {
}
}
fn do_shrink_slot_stores<'a, I>(&'a self, slot: Slot, stores: I) -> usize
/// shared code for shrinking normal slots and combining into ancient append vecs
/// note 'stored_accounts' is passed by ref so we can return references to data within it, avoiding self-references
fn shrink_collect<'a: 'b, 'b, I>(
&'a self,
stores: I,
stored_accounts: &'b mut Vec<(Pubkey, FoundStoredAccount<'b>)>,
stats: &ShrinkStats,
) -> ShrinkCollect<'b>
where
I: Iterator<Item = &'a Arc<AccountStorageEntry>>,
{
debug!("do_shrink_slot_stores: slot: {}", slot);
let GetUniqueAccountsResult {
stored_accounts,
stored_accounts: stored_accounts_temp,
original_bytes,
store_ids,
} = self.get_unique_accounts_from_storages(stores);
*stored_accounts = stored_accounts_temp;
let mut index_read_elapsed = Measure::start("index_read_elapsed");
let alive_total_collect = AtomicUsize::new(0);
@ -3643,7 +3661,7 @@ impl AccountsDb {
let len = stored_accounts.len();
let alive_accounts_collect = Mutex::new(Vec::with_capacity(len));
let unrefed_pubkeys_collect = Mutex::new(Vec::with_capacity(len));
self.shrink_stats
stats
.accounts_loaded
.fetch_add(len as u64, Ordering::Relaxed);
@ -3678,6 +3696,35 @@ impl AccountsDb {
index_read_elapsed.stop();
let aligned_total: u64 = Self::page_align(alive_total as u64);
ShrinkCollect {
store_ids,
original_bytes,
aligned_total,
unrefed_pubkeys,
alive_accounts,
alive_total,
index_read_elapsed,
total_starting_accounts: len,
}
}
fn do_shrink_slot_stores<'a, I>(&'a self, slot: Slot, stores: I) -> usize
where
I: Iterator<Item = &'a Arc<AccountStorageEntry>>,
{
let mut stored_accounts = Vec::default();
debug!("do_shrink_slot_stores: slot: {}", slot);
let ShrinkCollect {
store_ids,
original_bytes,
alive_total,
index_read_elapsed,
aligned_total,
unrefed_pubkeys,
alive_accounts,
total_starting_accounts,
} = self.shrink_collect(stores, &mut stored_accounts, &self.shrink_stats);
// This shouldn't happen if alive_bytes/approx_stored_count are accurate
if Self::should_not_shrink(aligned_total, original_bytes, store_ids.len()) {
self.shrink_stats
@ -3691,7 +3738,6 @@ impl AccountsDb {
return 0;
}
let total_starting_accounts = stored_accounts.len();
let total_accounts_after_shrink = alive_accounts.len();
debug!(
"shrinking: slot: {}, accounts: ({} => {}) bytes: ({} ; aligned to: {}) original: {}",
@ -4272,54 +4318,21 @@ impl AccountsDb {
);
}
// this code is copied from shrink. I would like to combine it into a helper function, but the borrow checker has defeated my efforts so far.
let GetUniqueAccountsResult {
stored_accounts,
let mut stored_accounts = Vec::default();
let ShrinkCollect {
original_bytes,
store_ids: _,
} = self.get_unique_accounts_from_storages(old_storages.iter());
index_read_elapsed,
aligned_total,
unrefed_pubkeys,
alive_accounts,
total_starting_accounts,
..
} = self.shrink_collect(
old_storages.iter(),
&mut stored_accounts,
&self.shrink_ancient_stats.shrink_stats,
);
let mut index_read_elapsed = Measure::start("index_read_elapsed");
let alive_total_collect = AtomicUsize::new(0);
let len = stored_accounts.len();
let alive_accounts_collect = Mutex::new(Vec::with_capacity(len));
let unrefed_pubkeys_collect = Mutex::new(Vec::with_capacity(len));
self.shrink_ancient_stats
.shrink_stats
.accounts_loaded
.fetch_add(len as u64, Ordering::Relaxed);
self.thread_pool_clean.install(|| {
let chunk_size = 50; // # accounts/thread
stored_accounts
.par_chunks(chunk_size)
.for_each(|stored_accounts| {
let LoadAccountsIndexForShrink {
alive_total,
mut alive_accounts,
mut unrefed_pubkeys,
} = self.load_accounts_index_for_shrink(stored_accounts);
// collect
alive_accounts_collect
.lock()
.unwrap()
.append(&mut alive_accounts);
unrefed_pubkeys_collect
.lock()
.unwrap()
.append(&mut unrefed_pubkeys);
alive_total_collect.fetch_add(alive_total, Ordering::Relaxed);
});
});
let alive_accounts = alive_accounts_collect.into_inner().unwrap();
let unrefed_pubkeys = unrefed_pubkeys_collect.into_inner().unwrap();
let alive_total = alive_total_collect.load(Ordering::Relaxed);
index_read_elapsed.stop();
let aligned_total: u64 = Self::page_align(alive_total as u64);
let total_starting_accounts = len;
// could follow what shrink does more closely
if total_starting_accounts == 0 {
continue; // skipping slot with no useful accounts to write