Parallel insertion of dirty store keys during clean (#27058)

parallelize dirty store key insertion
This commit is contained in:
Brennan Watt 2022-08-20 06:52:32 -07:00 committed by GitHub
parent 55d18e8463
commit 3c786bab65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 29 additions and 5 deletions

View File

@ -2424,6 +2424,7 @@ impl AccountsDb {
fn construct_candidate_clean_keys(
&self,
max_clean_root: Option<Slot>,
is_startup: bool,
last_full_snapshot_slot: Option<Slot>,
timings: &mut CleanKeyTimings,
) -> Vec<Pubkey> {
@ -2441,11 +2442,33 @@ impl AccountsDb {
});
let dirty_stores_len = dirty_stores.len();
let pubkeys = DashSet::new();
timings.oldest_dirty_slot = max_slot_inclusive.saturating_add(1);
for (slot, store) in dirty_stores {
timings.oldest_dirty_slot = std::cmp::min(timings.oldest_dirty_slot, slot);
store.accounts.account_iter().for_each(|account| {
pubkeys.insert(account.meta.pubkey);
let mut dirty_store_routine = || {
let chunk_size = 1.max(dirty_stores_len.saturating_div(rayon::current_num_threads()));
let oldest_dirty_slots: Vec<u64> = dirty_stores
.par_chunks(chunk_size)
.map(|dirty_store_chunk| {
let mut oldest_dirty_slot = max_slot_inclusive.saturating_add(1);
dirty_store_chunk.iter().for_each(|(slot, store)| {
oldest_dirty_slot = oldest_dirty_slot.min(*slot);
store.accounts.account_iter().for_each(|account| {
pubkeys.insert(account.meta.pubkey);
});
});
oldest_dirty_slot
})
.collect();
timings.oldest_dirty_slot = *oldest_dirty_slots
.iter()
.min()
.unwrap_or(&max_slot_inclusive.saturating_add(1));
};
if is_startup {
// Free to consume all the cores during startup
dirty_store_routine();
} else {
self.thread_pool_clean.install(|| {
dirty_store_routine();
});
}
trace!(
@ -2533,6 +2556,7 @@ impl AccountsDb {
let mut key_timings = CleanKeyTimings::default();
let mut pubkeys = self.construct_candidate_clean_keys(
max_clean_root,
is_startup,
last_full_snapshot_slot,
&mut key_timings,
);