From 68109a46e03e2475d3c29beb1ced05c196294384 Mon Sep 17 00:00:00 2001 From: sakridge Date: Thu, 10 Dec 2020 12:26:47 -0800 Subject: [PATCH] Lower priority for hashing threads. (#14043) --- runtime/src/accounts_db.rs | 162 +++++++++++++++++++------------------ 1 file changed, 85 insertions(+), 77 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index d06e098c1b..98b8e1ffea 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -766,49 +766,53 @@ impl AccountsDB { .cloned() .collect(); // parallel scan the index. - let (mut purges, purges_in_root) = pubkeys - .par_chunks(4096) - .map(|pubkeys: &[Pubkey]| { - let mut purges_in_root = Vec::new(); - let mut purges = HashMap::new(); - for pubkey in pubkeys { - if let Some((locked_entry, index)) = - self.accounts_index.get(pubkey, None, max_clean_root) - { - let (slot, account_info) = &locked_entry.slot_list()[index]; - if account_info.lamports == 0 { - purges.insert( - *pubkey, - self.accounts_index - .roots_and_ref_count(&locked_entry, max_clean_root), - ); - } + let (mut purges, purges_in_root) = { + self.thread_pool_clean.install(|| { + pubkeys + .par_chunks(4096) + .map(|pubkeys: &[Pubkey]| { + let mut purges_in_root = Vec::new(); + let mut purges = HashMap::new(); + for pubkey in pubkeys { + if let Some((locked_entry, index)) = + self.accounts_index.get(pubkey, None, max_clean_root) + { + let (slot, account_info) = &locked_entry.slot_list()[index]; + if account_info.lamports == 0 { + purges.insert( + *pubkey, + self.accounts_index + .roots_and_ref_count(&locked_entry, max_clean_root), + ); + } - // Release the lock - let slot = *slot; - drop(locked_entry); + // Release the lock + let slot = *slot; + drop(locked_entry); - if self.accounts_index.is_uncleaned_root(slot) { - // Assertion enforced by `accounts_index.get()`, the latest slot - // will not be greater than the given `max_clean_root` - if let Some(max_clean_root) = max_clean_root { - assert!(slot <= max_clean_root); + if self.accounts_index.is_uncleaned_root(slot) { + // Assertion enforced by `accounts_index.get()`, the latest slot + // will not be greater than the given `max_clean_root` + if let Some(max_clean_root) = max_clean_root { + assert!(slot <= max_clean_root); + } + purges_in_root.push(*pubkey); + } } - purges_in_root.push(*pubkey); } - } - } - (purges, purges_in_root) + (purges, purges_in_root) + }) + .reduce( + || (HashMap::new(), Vec::new()), + |mut m1, m2| { + // Collapse down the hashmaps/vecs into one. + m1.0.extend(m2.0); + m1.1.extend(m2.1); + m1 + }, + ) }) - .reduce( - || (HashMap::new(), Vec::new()), - |mut m1, m2| { - // Collapse down the hashmaps/vecs into one. - m1.0.extend(m2.0); - m1.1.extend(m2.1); - m1 - }, - ); + }; accounts_scan.stop(); let mut clean_old_rooted = Measure::start("clean_old_roots"); @@ -2258,48 +2262,52 @@ impl AccountsDB { .cloned() .collect(); let mismatch_found = AtomicU64::new(0); - let hashes: Vec<(Pubkey, Hash, u64)> = keys - .par_iter() - .filter_map(|pubkey| { - if let Some((lock, index)) = - self.accounts_index.get(pubkey, Some(ancestors), Some(slot)) - { - let (slot, account_info) = &lock.slot_list()[index]; - if account_info.lamports != 0 { - self.storage - .get_account_storage_entry(*slot, account_info.store_id) - .and_then(|store| { - let account = store.accounts.get_account(account_info.offset)?.0; - let balance = Self::account_balance_for_capitalization( - account_info.lamports, - &account.account_meta.owner, - account.account_meta.executable, - ); + let hashes: Vec<(Pubkey, Hash, u64)> = { + self.thread_pool_clean.install(|| { + keys.par_iter() + .filter_map(|pubkey| { + if let Some((lock, index)) = + self.accounts_index.get(pubkey, Some(ancestors), Some(slot)) + { + let (slot, account_info) = &lock.slot_list()[index]; + if account_info.lamports != 0 { + self.storage + .get_account_storage_entry(*slot, account_info.store_id) + .and_then(|store| { + let account = + store.accounts.get_account(account_info.offset)?.0; + let balance = Self::account_balance_for_capitalization( + account_info.lamports, + &account.account_meta.owner, + account.account_meta.executable, + ); - if check_hash { - let hash = Self::hash_stored_account( - *slot, - &account, - &self - .cluster_type - .expect("Cluster type must be set at initialization"), - ); - if hash != *account.hash { - mismatch_found.fetch_add(1, Ordering::Relaxed); - return None; - } - } + if check_hash { + let hash = Self::hash_stored_account( + *slot, + &account, + &self.cluster_type.expect( + "Cluster type must be set at initialization", + ), + ); + if hash != *account.hash { + mismatch_found.fetch_add(1, Ordering::Relaxed); + return None; + } + } - Some((*pubkey, *account.hash, balance)) - }) - } else { - None - } - } else { - None - } + Some((*pubkey, *account.hash, balance)) + }) + } else { + None + } + } else { + None + } + }) + .collect() }) - .collect(); + }; if mismatch_found.load(Ordering::Relaxed) > 0 { warn!( "{} mismatched account hash(es) found",