Performance: Allowed forced inline update_index (no thread pool) (#31455)

This commit is contained in:
Andrew Fitzgerald 2023-05-18 13:32:41 -07:00 committed by GitHub
parent 520c647918
commit d391e75a60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 4 deletions

View File

@ -1325,7 +1325,7 @@ impl Accounts {
durable_nonce,
lamports_per_signature,
);
self.accounts_db.store_cached(
self.accounts_db.store_cached_inline_update_index(
(slot, &accounts_to_store[..], include_slot_in_hash),
Some(&transactions),
);

View File

@ -7857,6 +7857,7 @@ impl AccountsDb {
infos: Vec<AccountInfo>,
accounts: &impl StorableAccounts<'a, T>,
reclaim: UpsertReclaim,
update_index_thread_selection: UpdateIndexThreadSelection,
) -> SlotList<AccountInfo> {
let target_slot = accounts.target_slot();
// using a thread pool here results in deadlock panics from bank_hashes.write()
@ -7884,7 +7885,11 @@ impl AccountsDb {
});
reclaims
};
if len > threshold {
if matches!(
update_index_thread_selection,
UpdateIndexThreadSelection::PoolWithThreshold,
) && len > threshold
{
let chunk_size = std::cmp::max(1, len / quarter_thread_count()); // # pubkeys/thread
let batches = 1 + len / chunk_size;
(0..batches)
@ -8250,6 +8255,24 @@ impl AccountsDb {
&StoreTo::Cache,
transactions,
StoreReclaims::Default,
UpdateIndexThreadSelection::PoolWithThreshold,
);
}
pub(crate) fn store_cached_inline_update_index<
'a,
T: ReadableAccount + Sync + ZeroLamport + 'a,
>(
&self,
accounts: impl StorableAccounts<'a, T>,
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
) {
self.store(
accounts,
&StoreTo::Cache,
transactions,
StoreReclaims::Default,
UpdateIndexThreadSelection::Inline,
);
}
@ -8262,6 +8285,7 @@ impl AccountsDb {
&StoreTo::Storage(&storage),
None,
StoreReclaims::Default,
UpdateIndexThreadSelection::PoolWithThreshold,
);
}
@ -8271,6 +8295,7 @@ impl AccountsDb {
store_to: &StoreTo,
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
) {
// If all transactions in a batch are errored,
// it's possible to get a store with no accounts.
@ -8301,7 +8326,14 @@ impl AccountsDb {
}
// we use default hashes for now since the same account may be stored to the cache multiple times
self.store_accounts_unfrozen(accounts, None::<Vec<Hash>>, store_to, transactions, reclaim);
self.store_accounts_unfrozen(
accounts,
None::<Vec<Hash>>,
store_to,
transactions,
reclaim,
update_index_thread_selection,
);
self.report_store_timings();
}
@ -8430,6 +8462,7 @@ impl AccountsDb {
store_to: &StoreTo,
transactions: Option<&'a [Option<&'a SanitizedTransaction>]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
) {
// This path comes from a store to a non-frozen slot.
// If a store is dead here, then a newer update for
@ -8447,6 +8480,7 @@ impl AccountsDb {
reset_accounts,
transactions,
reclaim,
update_index_thread_selection,
);
}
@ -8470,6 +8504,7 @@ impl AccountsDb {
reset_accounts,
None,
reclaim,
UpdateIndexThreadSelection::PoolWithThreshold,
)
}
@ -8482,6 +8517,7 @@ impl AccountsDb {
reset_accounts: bool,
transactions: Option<&[Option<&SanitizedTransaction>]>,
reclaim: StoreReclaims,
update_index_thread_selection: UpdateIndexThreadSelection,
) -> StoreAccountsTiming {
let write_version_producer: Box<dyn Iterator<Item = u64>> = write_version_producer
.unwrap_or_else(|| {
@ -8526,7 +8562,8 @@ impl AccountsDb {
// after the account are stored by the above `store_accounts_to`
// call and all the accounts are stored, all reads after this point
// will know to not check the cache anymore
let mut reclaims = self.update_index(infos, &accounts, reclaim);
let mut reclaims =
self.update_index(infos, &accounts, reclaim, update_index_thread_selection);
// For each updated account, `reclaims` should only have at most one
// item (if the account was previously updated in this slot).
@ -9345,6 +9382,13 @@ impl CalcAccountsHashFlavor {
}
}
pub(crate) enum UpdateIndexThreadSelection {
/// Use current thread only
Inline,
/// Use a thread-pool if the number of updates exceeds a threshold
PoolWithThreshold,
}
#[cfg(test)]
impl AccountsDb {
pub fn new(paths: Vec<PathBuf>, cluster_type: &ClusterType) -> Self {
@ -12699,6 +12743,7 @@ pub mod tests {
&StoreTo::Storage(&db.find_storage_candidate(some_slot, 1)),
None,
StoreReclaims::Default,
UpdateIndexThreadSelection::PoolWithThreshold,
);
db.add_root(some_slot);
let check_hash = true;
@ -12935,6 +12980,7 @@ pub mod tests {
&StoreTo::Storage(&db.find_storage_candidate(some_slot, 1)),
None,
StoreReclaims::Default,
UpdateIndexThreadSelection::PoolWithThreshold,
);
db.add_root(some_slot);
@ -16445,6 +16491,7 @@ pub mod tests {
&StoreTo::Cache,
None,
StoreReclaims::Default,
UpdateIndexThreadSelection::PoolWithThreshold,
);
}