packed ancient: parallel write to append vecs (#31144)

This commit is contained in:
Jeff Washington (jwash) 2023-04-14 15:57:46 -05:00 committed by GitHub
parent 8a849718d2
commit c008a557c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 24 additions and 5 deletions

View File

@ -18,12 +18,13 @@ use {
storable_accounts::{StorableAccounts, StorableAccountsBySlot},
},
rand::{thread_rng, Rng},
rayon::prelude::{IntoParallelRefIterator, ParallelIterator},
solana_measure::measure_us,
solana_sdk::{account::ReadableAccount, clock::Slot, hash::Hash, saturating_add_assign},
std::{
collections::HashMap,
num::NonZeroU64,
sync::{atomic::Ordering, Arc},
sync::{atomic::Ordering, Arc, Mutex},
},
};
@ -388,20 +389,38 @@ impl AccountsDb {
accounts_to_combine: &'b AccountsToCombine<'b>,
packed_contents: Vec<PackedAncientStorage<'b>>,
) -> WriteAncientAccounts<'a> {
let mut write_ancient_accounts = WriteAncientAccounts::default();
let write_ancient_accounts = Mutex::new(WriteAncientAccounts::default());
// ok if we have more slots, but NOT ok if we have fewer slots than we have contents
assert!(accounts_to_combine.target_slots_sorted.len() >= packed_contents.len());
// write packed storages containing contents from many original slots
// iterate slots in highest to lowest
accounts_to_combine
let packer = accounts_to_combine
.target_slots_sorted
.iter()
.rev()
.zip(packed_contents)
.for_each(|(target_slot, pack)| {
self.write_one_packed_storage(&pack, *target_slot, &mut write_ancient_accounts);
.collect::<Vec<_>>();
self.thread_pool_clean.install(|| {
packer.par_iter().for_each(|(target_slot, pack)| {
let mut write_ancient_accounts_local = WriteAncientAccounts::default();
self.write_one_packed_storage(
pack,
**target_slot,
&mut write_ancient_accounts_local,
);
let mut write = write_ancient_accounts.lock().unwrap();
write
.shrinks_in_progress
.extend(write_ancient_accounts_local.shrinks_in_progress);
write
.metrics
.accumulate(&write_ancient_accounts_local.metrics);
});
});
let mut write_ancient_accounts = write_ancient_accounts.into_inner().unwrap();
// write new storages where contents were unable to move because ref_count > 1
self.write_ancient_accounts_to_same_slot_multiple_refs(