Lower priority for hashing threads. (#14043)

This commit is contained in:
sakridge 2020-12-10 12:26:47 -08:00 committed by GitHub
parent 409fe3bca1
commit 68109a46e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 85 additions and 77 deletions

View File

@ -766,49 +766,53 @@ impl AccountsDB {
.cloned() .cloned()
.collect(); .collect();
// parallel scan the index. // parallel scan the index.
let (mut purges, purges_in_root) = pubkeys let (mut purges, purges_in_root) = {
.par_chunks(4096) self.thread_pool_clean.install(|| {
.map(|pubkeys: &[Pubkey]| { pubkeys
let mut purges_in_root = Vec::new(); .par_chunks(4096)
let mut purges = HashMap::new(); .map(|pubkeys: &[Pubkey]| {
for pubkey in pubkeys { let mut purges_in_root = Vec::new();
if let Some((locked_entry, index)) = let mut purges = HashMap::new();
self.accounts_index.get(pubkey, None, max_clean_root) for pubkey in pubkeys {
{ if let Some((locked_entry, index)) =
let (slot, account_info) = &locked_entry.slot_list()[index]; self.accounts_index.get(pubkey, None, max_clean_root)
if account_info.lamports == 0 { {
purges.insert( let (slot, account_info) = &locked_entry.slot_list()[index];
*pubkey, if account_info.lamports == 0 {
self.accounts_index purges.insert(
.roots_and_ref_count(&locked_entry, max_clean_root), *pubkey,
); self.accounts_index
} .roots_and_ref_count(&locked_entry, max_clean_root),
);
}
// Release the lock // Release the lock
let slot = *slot; let slot = *slot;
drop(locked_entry); drop(locked_entry);
if self.accounts_index.is_uncleaned_root(slot) { if self.accounts_index.is_uncleaned_root(slot) {
// Assertion enforced by `accounts_index.get()`, the latest slot // Assertion enforced by `accounts_index.get()`, the latest slot
// will not be greater than the given `max_clean_root` // will not be greater than the given `max_clean_root`
if let Some(max_clean_root) = max_clean_root { if let Some(max_clean_root) = max_clean_root {
assert!(slot <= max_clean_root); assert!(slot <= max_clean_root);
}
purges_in_root.push(*pubkey);
}
} }
purges_in_root.push(*pubkey);
} }
} (purges, purges_in_root)
} })
(purges, purges_in_root) .reduce(
|| (HashMap::new(), Vec::new()),
|mut m1, m2| {
// Collapse down the hashmaps/vecs into one.
m1.0.extend(m2.0);
m1.1.extend(m2.1);
m1
},
)
}) })
.reduce( };
|| (HashMap::new(), Vec::new()),
|mut m1, m2| {
// Collapse down the hashmaps/vecs into one.
m1.0.extend(m2.0);
m1.1.extend(m2.1);
m1
},
);
accounts_scan.stop(); accounts_scan.stop();
let mut clean_old_rooted = Measure::start("clean_old_roots"); let mut clean_old_rooted = Measure::start("clean_old_roots");
@ -2258,48 +2262,52 @@ impl AccountsDB {
.cloned() .cloned()
.collect(); .collect();
let mismatch_found = AtomicU64::new(0); let mismatch_found = AtomicU64::new(0);
let hashes: Vec<(Pubkey, Hash, u64)> = keys let hashes: Vec<(Pubkey, Hash, u64)> = {
.par_iter() self.thread_pool_clean.install(|| {
.filter_map(|pubkey| { keys.par_iter()
if let Some((lock, index)) = .filter_map(|pubkey| {
self.accounts_index.get(pubkey, Some(ancestors), Some(slot)) if let Some((lock, index)) =
{ self.accounts_index.get(pubkey, Some(ancestors), Some(slot))
let (slot, account_info) = &lock.slot_list()[index]; {
if account_info.lamports != 0 { let (slot, account_info) = &lock.slot_list()[index];
self.storage if account_info.lamports != 0 {
.get_account_storage_entry(*slot, account_info.store_id) self.storage
.and_then(|store| { .get_account_storage_entry(*slot, account_info.store_id)
let account = store.accounts.get_account(account_info.offset)?.0; .and_then(|store| {
let balance = Self::account_balance_for_capitalization( let account =
account_info.lamports, store.accounts.get_account(account_info.offset)?.0;
&account.account_meta.owner, let balance = Self::account_balance_for_capitalization(
account.account_meta.executable, account_info.lamports,
); &account.account_meta.owner,
account.account_meta.executable,
);
if check_hash { if check_hash {
let hash = Self::hash_stored_account( let hash = Self::hash_stored_account(
*slot, *slot,
&account, &account,
&self &self.cluster_type.expect(
.cluster_type "Cluster type must be set at initialization",
.expect("Cluster type must be set at initialization"), ),
); );
if hash != *account.hash { if hash != *account.hash {
mismatch_found.fetch_add(1, Ordering::Relaxed); mismatch_found.fetch_add(1, Ordering::Relaxed);
return None; return None;
} }
} }
Some((*pubkey, *account.hash, balance)) Some((*pubkey, *account.hash, balance))
}) })
} else { } else {
None None
} }
} else { } else {
None None
} }
})
.collect()
}) })
.collect(); };
if mismatch_found.load(Ordering::Relaxed) > 0 { if mismatch_found.load(Ordering::Relaxed) > 0 {
warn!( warn!(
"{} mismatched account hash(es) found", "{} mismatched account hash(es) found",