From 2d7ce2a6c21d3a017b540975b30f3ddcecccfbc7 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Fri, 6 May 2022 15:20:50 -0500
Subject: [PATCH] Revert "uses designated thread-pools for account db parallel ops (#24954)" (#25053)

This reverts commit e8bdc27080d803a5de4e9870fa48285036366bf7.
---
 runtime/src/accounts_db.rs | 100 +++++++++++++++----------------------
 1 file changed, 40 insertions(+), 60 deletions(-)

diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 7633f62791..5db3a6c3ff 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -1805,23 +1805,17 @@ impl AccountsDb {
         const INDEX_CLEAN_BULK_COUNT: usize = 4096;
 
         let mut clean_rooted = Measure::start("clean_old_root-ms");
-        let reclaims: Vec<_> = self.thread_pool_clean.install(|| {
-            purges
-                .par_chunks(INDEX_CLEAN_BULK_COUNT)
-                .map(|pubkeys: &[Pubkey]| {
-                    let mut reclaims = Vec::new();
-                    for pubkey in pubkeys {
-                        self.accounts_index.clean_rooted_entries(
-                            pubkey,
-                            &mut reclaims,
-                            max_clean_root,
-                        );
-                    }
-                    reclaims
-                })
-                .flatten()
-                .collect()
-        });
+        let reclaim_vecs = purges
+            .par_chunks(INDEX_CLEAN_BULK_COUNT)
+            .map(|pubkeys: &[Pubkey]| {
+                let mut reclaims = Vec::new();
+                for pubkey in pubkeys {
+                    self.accounts_index
+                        .clean_rooted_entries(pubkey, &mut reclaims, max_clean_root);
+                }
+                reclaims
+            });
+        let reclaims: Vec<_> = reclaim_vecs.flatten().collect();
         clean_rooted.stop();
         inc_new_counter_info!("clean-old-root-par-clean-ms", clean_rooted.as_ms() as usize);
         self.clean_accounts_stats
@@ -6173,50 +6167,44 @@ impl AccountsDb {
 
     // previous_slot_entry_was_cached = true means we just need to assert that after this update is complete
     // that there are no items we would have put in reclaims that are not cached
-    // This function may be invoked from both foreground and background
-    // processes. As such it takes an explicit thread-pool argument which is
-    // set to either self.thread_pool or self.thread_pool_clean accordingly by
-    // the call-stack. Specifying wrong thread-pool here may cause deadlock
-    // panics in bank_hashes.write().
     fn update_index<'a, T: ReadableAccount + Sync>(
         &self,
-        thread_pool: &ThreadPool,
         infos: Vec<AccountInfo>,
         accounts: impl StorableAccounts<'a, T>,
         previous_slot_entry_was_cached: bool,
     ) -> SlotList<AccountInfo> {
         let target_slot = accounts.target_slot();
+        // using a thread pool here results in deadlock panics from bank_hashes.write()
+        // so, instead we limit how many threads will be created to the same size as the bg thread pool
         let len = std::cmp::min(accounts.len(), infos.len());
         let chunk_size = std::cmp::max(1, len / quarter_thread_count()); // # pubkeys/thread
         let batches = 1 + len / chunk_size;
-        thread_pool.install(|| {
-            (0..batches)
-                .into_par_iter()
-                .map(|batch| {
-                    let start = batch * chunk_size;
-                    let end = std::cmp::min(start + chunk_size, len);
-                    let mut reclaims = Vec::with_capacity((end - start) / 2);
-                    (start..end).into_iter().for_each(|i| {
-                        let info = infos[i];
-                        let pubkey_account = (accounts.pubkey(i), accounts.account(i));
-                        let pubkey = pubkey_account.0;
-                        let old_slot = accounts.slot(i);
-                        self.accounts_index.upsert(
-                            target_slot,
-                            old_slot,
-                            pubkey,
-                            pubkey_account.1,
-                            &self.account_indexes,
-                            info,
-                            &mut reclaims,
-                            previous_slot_entry_was_cached,
-                        );
-                    });
-                    reclaims
-                })
-                .flatten()
-                .collect::<Vec<_>>()
-        })
+        (0..batches)
+            .into_par_iter()
+            .map(|batch| {
+                let start = batch * chunk_size;
+                let end = std::cmp::min(start + chunk_size, len);
+                let mut reclaims = Vec::with_capacity((end - start) / 2);
+                (start..end).into_iter().for_each(|i| {
+                    let info = infos[i];
+                    let pubkey_account = (accounts.pubkey(i), accounts.account(i));
+                    let pubkey = pubkey_account.0;
+                    let old_slot = accounts.slot(i);
+                    self.accounts_index.upsert(
+                        target_slot,
+                        old_slot,
+                        pubkey,
+                        pubkey_account.1,
+                        &self.account_indexes,
+                        info,
+                        &mut reclaims,
+                        previous_slot_entry_was_cached,
+                    );
+                });
+                reclaims
+            })
+            .flatten()
+            .collect::<Vec<_>>()
     }
 
     fn should_not_shrink(aligned_bytes: u64, total_bytes: u64, num_stores: usize) -> bool {
@@ -6676,10 +6664,7 @@ impl AccountsDb {
         // hold just 1 ref from this slot.
         let reset_accounts = true;
 
-        // self.thread_pool (and not self.thread_pool_clean) here because this
-        // function is only invoked from replay and fg processes.
         self.store_accounts_custom(
-            &self.thread_pool,
             accounts,
             hashes,
             None::<StorageFinder>,
@@ -6701,10 +6686,7 @@ impl AccountsDb {
         // and accounts in the append_vec can be unrefed correctly
         let reset_accounts = false;
         let is_cached_store = false;
-        // self.thread_pool_clean (and not self.thread_pool) here because this
-        // function is only invoked from cleanup and bg processes.
         self.store_accounts_custom(
-            &self.thread_pool_clean,
            accounts,
            hashes,
            storage_finder,
@@ -6716,7 +6698,6 @@ impl AccountsDb {
 
     fn store_accounts_custom<'a, 'b, T: ReadableAccount + Sync + ZeroLamport>(
         &'a self,
-        thread_pool: &ThreadPool,
         accounts: impl StorableAccounts<'b, T>,
         hashes: Option<&[impl Borrow<Hash>]>,
         storage_finder: Option<StorageFinder<'a>>,
@@ -6764,8 +6745,7 @@ impl AccountsDb {
         // after the account are stored by the above `store_accounts_to`
         // call and all the accounts are stored, all reads after this point
         // will know to not check the cache anymore
-        let mut reclaims =
-            self.update_index(thread_pool, infos, accounts, previous_slot_entry_was_cached);
+        let mut reclaims = self.update_index(infos, accounts, previous_slot_entry_was_cached);
 
         // For each updated account, `reclaims` should only have at most one
         // item (if the account was previously updated in this slot).
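
For readers comparing the two shapes of code this revert toggles between, here is a
minimal, self-contained sketch (not part of the patch). It assumes only the rayon
crate; the names `work`, `example-pool`, and the worker count of 4 are made up for
illustration and stand in for the real per-pubkey work and for the dedicated
accounts-db thread pools.

    use rayon::prelude::*;
    use rayon::ThreadPoolBuilder;

    fn main() {
        let items: Vec<u64> = (0..10_000u64).collect();
        // Stand-in for the per-pubkey work done inside the real closures.
        let work = |x: &u64| x.wrapping_mul(31) ^ 0x5DEE_CE66D;

        // Pattern removed by this revert: a designated pool owns the parallel op;
        // everything spawned inside `install` runs on that pool's threads.
        let pool = ThreadPoolBuilder::new()
            .num_threads(4)
            .thread_name(|i| format!("example-pool-{i}"))
            .build()
            .unwrap();
        let a: Vec<u64> = pool.install(|| items.par_iter().map(work).collect());

        // Pattern restored by this revert: run on rayon's global pool, but bound the
        // fan-out by chunking, mirroring `par_chunks(INDEX_CLEAN_BULK_COUNT)` and
        // `chunk_size = max(1, len / quarter_thread_count())` in the patch.
        let workers = 4usize; // stand-in for quarter_thread_count()
        let chunk_size = std::cmp::max(1, items.len() / workers);
        let b: Vec<u64> = items
            .par_chunks(chunk_size)
            .map(|chunk| chunk.iter().map(work).collect::<Vec<_>>())
            .flatten()
            .collect();

        assert_eq!(a, b);
    }

The restored in-code comment attributes the removed `install` pattern to deadlock
panics from `bank_hashes.write()`; the sketch above only contrasts where the work
runs, it does not reproduce that failure mode.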
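
The restored comment in `update_index` says thread creation is limited "to the same
size as the bg thread pool". Below is a small sketch of that batch-sizing arithmetic,
with `quarter_thread_count` replaced by a made-up constant of 8 rather than the
crate's real helper, and the `len` values chosen arbitrarily:

    // Shows that `batches` stays bounded (on the order of the thread-count helper)
    // regardless of how many accounts are being indexed, so the work fans out to a
    // limited number of rayon tasks even on the global pool.
    fn quarter_thread_count() -> usize {
        8 // made-up stand-in for the real helper
    }

    fn main() {
        for len in [0usize, 1, 7, 1_000, 100_000] {
            let chunk_size = std::cmp::max(1, len / quarter_thread_count()); // # pubkeys/thread
            let batches = 1 + len / chunk_size;
            println!("len={len:>6} chunk_size={chunk_size:>6} batches={batches}");
        }
    }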