Revert "uses designated thread-pools for account db parallel ops (#24954)" (#25053)

This reverts commit e8bdc27080.
This commit is contained in:
Jeff Washington (jwash) 2022-05-06 15:20:50 -05:00 committed by GitHub
parent c920d411f7
commit 2d7ce2a6c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 40 additions and 60 deletions

View File

@ -1805,23 +1805,17 @@ impl AccountsDb {
const INDEX_CLEAN_BULK_COUNT: usize = 4096; const INDEX_CLEAN_BULK_COUNT: usize = 4096;
let mut clean_rooted = Measure::start("clean_old_root-ms"); let mut clean_rooted = Measure::start("clean_old_root-ms");
let reclaims: Vec<_> = self.thread_pool_clean.install(|| { let reclaim_vecs = purges
purges .par_chunks(INDEX_CLEAN_BULK_COUNT)
.par_chunks(INDEX_CLEAN_BULK_COUNT) .map(|pubkeys: &[Pubkey]| {
.map(|pubkeys: &[Pubkey]| { let mut reclaims = Vec::new();
let mut reclaims = Vec::new(); for pubkey in pubkeys {
for pubkey in pubkeys { self.accounts_index
self.accounts_index.clean_rooted_entries( .clean_rooted_entries(pubkey, &mut reclaims, max_clean_root);
pubkey, }
&mut reclaims, reclaims
max_clean_root, });
); let reclaims: Vec<_> = reclaim_vecs.flatten().collect();
}
reclaims
})
.flatten()
.collect()
});
clean_rooted.stop(); clean_rooted.stop();
inc_new_counter_info!("clean-old-root-par-clean-ms", clean_rooted.as_ms() as usize); inc_new_counter_info!("clean-old-root-par-clean-ms", clean_rooted.as_ms() as usize);
self.clean_accounts_stats self.clean_accounts_stats
@ -6173,50 +6167,44 @@ impl AccountsDb {
// previous_slot_entry_was_cached = true means we just need to assert that after this update is complete // previous_slot_entry_was_cached = true means we just need to assert that after this update is complete
// that there are no items we would have put in reclaims that are not cached // that there are no items we would have put in reclaims that are not cached
// This function may be invoked from both foreground and background
// processes. As such it takes an explicit thread-pool argument which is
// set to either self.thread_pool or self.thread_pool_clean accordingly by
// the call-stack. Specifying wrong thread-pool here may cause deadlock
// panics in bank_hashes.write().
fn update_index<'a, T: ReadableAccount + Sync>( fn update_index<'a, T: ReadableAccount + Sync>(
&self, &self,
thread_pool: &ThreadPool,
infos: Vec<AccountInfo>, infos: Vec<AccountInfo>,
accounts: impl StorableAccounts<'a, T>, accounts: impl StorableAccounts<'a, T>,
previous_slot_entry_was_cached: bool, previous_slot_entry_was_cached: bool,
) -> SlotList<AccountInfo> { ) -> SlotList<AccountInfo> {
let target_slot = accounts.target_slot(); let target_slot = accounts.target_slot();
// using a thread pool here results in deadlock panics from bank_hashes.write()
// so, instead we limit how many threads will be created to the same size as the bg thread pool
let len = std::cmp::min(accounts.len(), infos.len()); let len = std::cmp::min(accounts.len(), infos.len());
let chunk_size = std::cmp::max(1, len / quarter_thread_count()); // # pubkeys/thread let chunk_size = std::cmp::max(1, len / quarter_thread_count()); // # pubkeys/thread
let batches = 1 + len / chunk_size; let batches = 1 + len / chunk_size;
thread_pool.install(|| { (0..batches)
(0..batches) .into_par_iter()
.into_par_iter() .map(|batch| {
.map(|batch| { let start = batch * chunk_size;
let start = batch * chunk_size; let end = std::cmp::min(start + chunk_size, len);
let end = std::cmp::min(start + chunk_size, len); let mut reclaims = Vec::with_capacity((end - start) / 2);
let mut reclaims = Vec::with_capacity((end - start) / 2); (start..end).into_iter().for_each(|i| {
(start..end).into_iter().for_each(|i| { let info = infos[i];
let info = infos[i]; let pubkey_account = (accounts.pubkey(i), accounts.account(i));
let pubkey_account = (accounts.pubkey(i), accounts.account(i)); let pubkey = pubkey_account.0;
let pubkey = pubkey_account.0; let old_slot = accounts.slot(i);
let old_slot = accounts.slot(i); self.accounts_index.upsert(
self.accounts_index.upsert( target_slot,
target_slot, old_slot,
old_slot, pubkey,
pubkey, pubkey_account.1,
pubkey_account.1, &self.account_indexes,
&self.account_indexes, info,
info, &mut reclaims,
&mut reclaims, previous_slot_entry_was_cached,
previous_slot_entry_was_cached, );
); });
}); reclaims
reclaims })
}) .flatten()
.flatten() .collect::<Vec<_>>()
.collect::<Vec<_>>()
})
} }
fn should_not_shrink(aligned_bytes: u64, total_bytes: u64, num_stores: usize) -> bool { fn should_not_shrink(aligned_bytes: u64, total_bytes: u64, num_stores: usize) -> bool {
@ -6676,10 +6664,7 @@ impl AccountsDb {
// hold just 1 ref from this slot. // hold just 1 ref from this slot.
let reset_accounts = true; let reset_accounts = true;
// self.thread_pool (and not self.thread_pool_clean) here because this
// function is only invoked from replay and fg processes.
self.store_accounts_custom( self.store_accounts_custom(
&self.thread_pool,
accounts, accounts,
hashes, hashes,
None::<StorageFinder>, None::<StorageFinder>,
@ -6701,10 +6686,7 @@ impl AccountsDb {
// and accounts in the append_vec can be unrefed correctly // and accounts in the append_vec can be unrefed correctly
let reset_accounts = false; let reset_accounts = false;
let is_cached_store = false; let is_cached_store = false;
// self.thread_pool_clean (and not self.thread_pool) here because this
// function is only invoked from cleanup and bg processes.
self.store_accounts_custom( self.store_accounts_custom(
&self.thread_pool_clean,
accounts, accounts,
hashes, hashes,
storage_finder, storage_finder,
@ -6716,7 +6698,6 @@ impl AccountsDb {
fn store_accounts_custom<'a, 'b, T: ReadableAccount + Sync + ZeroLamport>( fn store_accounts_custom<'a, 'b, T: ReadableAccount + Sync + ZeroLamport>(
&'a self, &'a self,
thread_pool: &ThreadPool,
accounts: impl StorableAccounts<'b, T>, accounts: impl StorableAccounts<'b, T>,
hashes: Option<&[impl Borrow<Hash>]>, hashes: Option<&[impl Borrow<Hash>]>,
storage_finder: Option<StorageFinder<'a>>, storage_finder: Option<StorageFinder<'a>>,
@ -6764,8 +6745,7 @@ impl AccountsDb {
// after the account are stored by the above `store_accounts_to` // after the account are stored by the above `store_accounts_to`
// call and all the accounts are stored, all reads after this point // call and all the accounts are stored, all reads after this point
// will know to not check the cache anymore // will know to not check the cache anymore
let mut reclaims = let mut reclaims = self.update_index(infos, accounts, previous_slot_entry_was_cached);
self.update_index(thread_pool, infos, accounts, previous_slot_entry_was_cached);
// For each updated account, `reclaims` should only have at most one // For each updated account, `reclaims` should only have at most one
// item (if the account was previously updated in this slot). // item (if the account was previously updated in this slot).