From a245efe83da0f31941f387fd70833688aea0b9b1 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Wed, 23 Feb 2022 15:22:29 -0600 Subject: [PATCH] trait for acct data with slot per item (#23285) --- runtime/src/accounts_db.rs | 104 +++++++++++++++++-------------- runtime/src/lib.rs | 1 + runtime/src/storable_accounts.rs | 38 +++++++++++ 3 files changed, 96 insertions(+), 47 deletions(-) create mode 100644 runtime/src/storable_accounts.rs diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index f361d9a8cc..52c1623a0e 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -42,6 +42,7 @@ use { read_only_accounts_cache::ReadOnlyAccountsCache, rent_collector::RentCollector, sorted_storages::SortedStorages, + storable_accounts::StorableAccounts, }, blake3::traits::digest::Digest, crossbeam_channel::{unbounded, Receiver, Sender}, @@ -4942,33 +4943,36 @@ impl AccountsDb { } fn store_accounts_to< + 'a, F: FnMut(Slot, usize) -> Arc, P: Iterator, + T: ReadableAccount + Sync + ZeroLamport, >( &self, - slot: Slot, - accounts: &[(&Pubkey, &(impl ReadableAccount + ZeroLamport))], + accounts: &impl StorableAccounts<'a, T>, hashes: Option<&[impl Borrow]>, storage_finder: F, mut write_version_producer: P, is_cached_store: bool, ) -> Vec { let mut calc_stored_meta_time = Measure::start("calc_stored_meta"); - let accounts_and_meta_to_store: Vec<_> = accounts - .iter() - .map(|(pubkey, account)| { - self.read_only_accounts_cache.remove(**pubkey, slot); + let slot = accounts.target_slot(); + let accounts_and_meta_to_store: Vec<_> = (0..accounts.len()) + .into_iter() + .map(|index| { + let (pubkey, account) = (accounts.pubkey(index), accounts.account(index)); + self.read_only_accounts_cache.remove(*pubkey, slot); // this is the source of Some(Account) or None. // Some(Account) = store 'Account' // None = store a default/empty account with 0 lamports let (account, data_len) = if account.is_zero_lamport() { (None, 0) } else { - (Some(*account), account.data().len() as u64) + (Some(account), account.data().len() as u64) }; let meta = StoredMeta { write_version: write_version_producer.next().unwrap(), - pubkey: **pubkey, + pubkey: *pubkey, data_len, }; (meta, account) @@ -4995,9 +4999,10 @@ impl AccountsDb { let mut stats = BankHashStats::default(); let len = accounts_and_meta_to_store.len(); let mut hashes = Vec::with_capacity(len); - for account in accounts { - stats.update(account.1); - let hash = Self::hash_account(slot, account.1, account.0); + for index in 0..accounts.len() { + let (pubkey, account) = (accounts.pubkey(index), accounts.account(index)); + stats.update(account); + let hash = Self::hash_account(slot, account, pubkey); hashes.push(hash); } hash_time.stop(); @@ -5942,22 +5947,27 @@ impl AccountsDb { // previous_slot_entry_was_cached = true means we just need to assert that after this update is complete // that there are no items we would have put in reclaims that are not cached - fn update_index( + fn update_index<'a, T: ReadableAccount + Sync>( &self, - slot: Slot, infos: Vec, - accounts: &[(&Pubkey, &T)], + accounts: impl StorableAccounts<'a, T>, previous_slot_entry_was_cached: bool, ) -> SlotList { + let slot = accounts.target_slot(); // using a thread pool here results in deadlock panics from bank_hashes.write() // so, instead we limit how many threads will be created to the same size as the bg thread pool - let chunk_size = std::cmp::max(1, accounts.len() / quarter_thread_count()); // # pubkeys/thread - infos - .par_chunks(chunk_size) - .zip(accounts.par_chunks(chunk_size)) - .map(|(infos_chunk, accounts_chunk)| { - let mut reclaims = Vec::with_capacity(infos_chunk.len() / 2); - for (info, pubkey_account) in infos_chunk.iter().zip(accounts_chunk.iter()) { + let len = std::cmp::min(accounts.len(), infos.len()); + let chunk_size = std::cmp::max(1, len / quarter_thread_count()); // # pubkeys/thread + let batches = 1 + len / chunk_size; + (0..batches) + .into_par_iter() + .map(|batch| { + let start = batch * chunk_size; + let end = std::cmp::min(start + chunk_size, len); + let mut reclaims = Vec::with_capacity((end - start) / 2); + (start..end).into_iter().for_each(|i| { + let info = infos[i]; + let pubkey_account = (accounts.pubkey(i), accounts.account(i)); let pubkey = pubkey_account.0; self.accounts_index.upsert( slot, @@ -5965,11 +5975,11 @@ impl AccountsDb { pubkey_account.1.owner(), pubkey_account.1.data(), &self.account_indexes, - *info, + info, &mut reclaims, previous_slot_entry_was_cached, ); - } + }); reclaims }) .flatten() @@ -6255,16 +6265,20 @@ impl AccountsDb { } pub fn store_cached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) { - self.store(slot, accounts, self.caching_enabled); + self.store((slot, accounts), self.caching_enabled); } /// Store the account update. /// only called by tests pub fn store_uncached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) { - self.store(slot, accounts, false); + self.store((slot, accounts), false) } - fn store(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)], is_cached_store: bool) { + fn store<'a, T: ReadableAccount + Sync + ZeroLamport>( + &self, + accounts: impl StorableAccounts<'a, T>, + is_cached_store: bool, + ) { // If all transactions in a batch are errored, // it's possible to get a store with no accounts. if accounts.is_empty() { @@ -6273,9 +6287,10 @@ impl AccountsDb { let mut stats = BankHashStats::default(); let mut total_data = 0; - accounts.iter().for_each(|(_pubkey, account)| { + (0..accounts.len()).for_each(|index| { + let account = accounts.account(index); total_data += account.data().len(); - stats.update(*account); + stats.update(account); }); self.stats @@ -6286,13 +6301,13 @@ impl AccountsDb { // we need to drop bank_hashes to prevent deadlocks let mut bank_hashes = self.bank_hashes.write().unwrap(); let slot_info = bank_hashes - .entry(slot) + .entry(accounts.target_slot()) .or_insert_with(BankHashInfo::default); slot_info.stats.merge(&stats); } // we use default hashes for now since the same account may be stored to the cache multiple times - self.store_accounts_unfrozen(slot, accounts, None, is_cached_store); + self.store_accounts_unfrozen(accounts, None, is_cached_store); self.report_store_timings(); } @@ -6407,10 +6422,9 @@ impl AccountsDb { } } - fn store_accounts_unfrozen( + fn store_accounts_unfrozen<'a, T: ReadableAccount + Sync + ZeroLamport>( &self, - slot: Slot, - accounts: &[(&Pubkey, &AccountSharedData)], + accounts: impl StorableAccounts<'a, T>, hashes: Option<&[&Hash]>, is_cached_store: bool, ) { @@ -6423,7 +6437,6 @@ impl AccountsDb { let reset_accounts = true; self.store_accounts_custom( - slot, accounts, hashes, None::, @@ -6447,8 +6460,7 @@ impl AccountsDb { let reset_accounts = false; let is_cached_store = false; self.store_accounts_custom( - slot, - accounts, + (slot, accounts), hashes, storage_finder, write_version_producer, @@ -6457,17 +6469,17 @@ impl AccountsDb { ) } - fn store_accounts_custom<'a, T: ReadableAccount + Sync + ZeroLamport>( + fn store_accounts_custom<'a, 'b, T: ReadableAccount + Sync + ZeroLamport>( &'a self, - slot: Slot, - accounts: &[(&Pubkey, &T)], + accounts: impl StorableAccounts<'b, T>, hashes: Option<&[impl Borrow]>, storage_finder: Option>, write_version_producer: Option>>, is_cached_store: bool, reset_accounts: bool, ) -> StoreAccountsTiming { - let storage_finder: StorageFinder<'a> = storage_finder + let slot = accounts.target_slot(); + let storage_finder = storage_finder .unwrap_or_else(|| Box::new(move |slot, size| self.find_storage_candidate(slot, size))); let write_version_producer: Box> = write_version_producer @@ -6485,8 +6497,7 @@ impl AccountsDb { .fetch_add(accounts.len() as u64, Ordering::Relaxed); let mut store_accounts_time = Measure::start("store_accounts"); let infos = self.store_accounts_to( - slot, - accounts, + &accounts, hashes, storage_finder, write_version_producer, @@ -6504,7 +6515,7 @@ impl AccountsDb { // after the account are stored by the above `store_accounts_to` // call and all the accounts are stored, all reads after this point // will know to not check the cache anymore - let mut reclaims = self.update_index(slot, infos, accounts, previous_slot_entry_was_cached); + let mut reclaims = self.update_index(infos, accounts, previous_slot_entry_was_cached); // For each updated account, `reclaims` should only have at most one // item (if the account was previously updated in this slot). @@ -9901,8 +9912,7 @@ pub mod tests { // put wrong hash value in store so we get a mismatch db.store_accounts_unfrozen( - some_slot, - &[(&key, &account)], + (some_slot, &[(&key, &account)][..]), Some(&[&Hash::default()]), false, ); @@ -10058,7 +10068,7 @@ pub mod tests { let account = AccountSharedData::new(1, some_data_len, &key); let ancestors = vec![(some_slot, 0)].into_iter().collect(); - let accounts = &[(&key, &account)]; + let accounts = &[(&key, &account)][..]; // update AccountsDb's bank hash { let mut bank_hashes = db.bank_hashes.write().unwrap(); @@ -10068,7 +10078,7 @@ pub mod tests { } // provide bogus account hashes let some_hash = Hash::new(&[0xca; HASH_BYTES]); - db.store_accounts_unfrozen(some_slot, accounts, Some(&[&some_hash]), false); + db.store_accounts_unfrozen((some_slot, accounts), Some(&[&some_hash]), false); db.add_root(some_slot); assert_matches!( db.verify_bank_hash_and_lamports(some_slot, &ancestors, 1, true), diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 77d7f555f1..5d6eb223e7 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -57,6 +57,7 @@ pub mod stake_history; pub mod stake_weighted_timestamp; pub mod stakes; pub mod status_cache; +mod storable_accounts; mod system_instruction_processor; pub mod transaction_batch; pub mod transaction_cost_metrics_sender; diff --git a/runtime/src/storable_accounts.rs b/runtime/src/storable_accounts.rs new file mode 100644 index 0000000000..ce802a401c --- /dev/null +++ b/runtime/src/storable_accounts.rs @@ -0,0 +1,38 @@ +//! trait for abstracting underlying storage of pubkey and account pairs to be written +use solana_sdk::{account::ReadableAccount, clock::Slot, pubkey::Pubkey}; + +/// abstract access to pubkey, account, slot, target_slot of either: +/// a. (slot, &[&Pubkey, &ReadableAccount]) +/// b. (slot, &[&Pubkey, &ReadableAccount, Slot]) (we will use this later) +/// This trait avoids having to allocate redundant data when there is a duplicated slot parameter. +/// All legacy callers do not have a unique slot per account to store. +pub trait StorableAccounts<'a, T: ReadableAccount + Sync>: Sync { + /// pubkey at 'index' + fn pubkey(&self, index: usize) -> &Pubkey; + /// account at 'index' + fn account(&self, index: usize) -> &T; + /// slot that all accounts are to be written to + fn target_slot(&self) -> Slot; + /// true if no accounts to write + fn is_empty(&self) -> bool; + /// # accounts to write + fn len(&self) -> usize; +} + +impl<'a, T: ReadableAccount + Sync> StorableAccounts<'a, T> for (Slot, &'a [(&'a Pubkey, &'a T)]) { + fn pubkey(&self, index: usize) -> &Pubkey { + self.1[index].0 + } + fn account(&self, index: usize) -> &T { + self.1[index].1 + } + fn target_slot(&self) -> Slot { + self.0 + } + fn is_empty(&self) -> bool { + self.1.is_empty() + } + fn len(&self) -> usize { + self.1.len() + } +}