From 5d88a9b32b11ee7ca69c83dc63c3da762724d20c Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Tue, 22 Nov 2022 17:36:57 -0600 Subject: [PATCH] improve perf of storing accounts for shrink/ancient (#28880) * improve perf of storing accounts for shrink/ancient * rename * phantom data * update comment * make impl Borrow consistent * remove unused static --- runtime/benches/append_vec.rs | 22 ++- runtime/src/accounts.rs | 2 +- runtime/src/accounts_db.rs | 215 +++++++++++---------- runtime/src/ancient_append_vecs.rs | 24 +-- runtime/src/append_vec.rs | 298 +++++++++++++++++++++++++++-- runtime/src/bank.rs | 2 +- runtime/src/snapshot_minimizer.rs | 2 +- runtime/src/storable_accounts.rs | 36 +++- 8 files changed, 459 insertions(+), 142 deletions(-) diff --git a/runtime/benches/append_vec.rs b/runtime/benches/append_vec.rs index 543a0c60b5..439752c07e 100644 --- a/runtime/benches/append_vec.rs +++ b/runtime/benches/append_vec.rs @@ -3,12 +3,16 @@ extern crate test; use { rand::{thread_rng, Rng}, - solana_runtime::append_vec::{ - test_utils::{create_test_account, get_append_vec_path}, - AppendVec, StoredMeta, + solana_runtime::{ + accounts_db::INCLUDE_SLOT_IN_HASH_TESTS, + append_vec::{ + test_utils::{create_test_account, get_append_vec_path}, + AppendVec, StorableAccountsWithHashesAndWriteVersions, StoredMeta, + }, }, solana_sdk::{ account::{AccountSharedData, ReadableAccount}, + clock::Slot, hash::Hash, }, std::{ @@ -28,7 +32,17 @@ fn append_account( account: &AccountSharedData, hash: Hash, ) -> Option { - let res = vec.append_accounts(&[(storage_meta, Some(account))], &[&hash]); + let slot_ignored = Slot::MAX; + let accounts = [(&storage_meta.pubkey, account)]; + let slice = &accounts[..]; + let accounts = (slot_ignored, slice, INCLUDE_SLOT_IN_HASH_TESTS); + let storable_accounts = + StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions( + &accounts, + vec![&hash], + vec![storage_meta.write_version], + ); + let res = vec.append_accounts(&storable_accounts, 0); res.and_then(|res| res.first().cloned()) } diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index e3b5913663..e4157e80a7 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -1331,7 +1331,7 @@ impl Accounts { ); } - pub fn store_accounts_cached<'a, T: ReadableAccount + Sync + ZeroLamport>( + pub fn store_accounts_cached<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>( &self, accounts: impl StorableAccounts<'a, T>, ) { diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index eb0f0ecd0d..b6a8402288 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -40,7 +40,10 @@ use { ancient_append_vecs::{ get_ancient_append_vec_capacity, is_ancient, AccountsToStore, StorageSelector, }, - append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion}, + append_vec::{ + AppendVec, StorableAccountsWithHashesAndWriteVersions, StoredAccountMeta, StoredMeta, + StoredMetaWriteVersion, + }, bank::Rewrites, cache_hash_data::{CacheHashData, CacheHashDataFile}, contains::Contains, @@ -229,7 +232,7 @@ impl CurrentAncientAppendVec { accounts_to_store: &AccountsToStore, storage_selector: StorageSelector, ) -> StoreAccountsTiming { - let (accounts, hashes) = accounts_to_store.get(storage_selector); + let accounts = accounts_to_store.get(storage_selector); db.store_accounts_frozen( ( @@ -238,7 +241,7 @@ impl CurrentAncientAppendVec { INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION, accounts_to_store.slot, ), - Some(hashes), + None::>, Some(self.append_vec()), None, StoreReclaims::Ignore, @@ -289,7 +292,7 @@ impl AncientSlotPubkeys { // we are taking accounts from 'slot' and putting them into 'current_ancient.slot()' // StorageSelector::Primary here because only the accounts that are moving from 'slot' to 'current_ancient.slot()' // Any overflow accounts will get written into a new append vec AT 'slot', so they don't need to be unrefed - let (accounts, _hashes) = to_store.get(StorageSelector::Primary); + let accounts = to_store.get(StorageSelector::Primary); if Some(current_ancient.slot()) != self.inner.as_ref().map(|ap| ap.slot) { let pubkeys = current_ancient .append_vec() @@ -3985,7 +3988,7 @@ impl AccountsDb { &accounts[..], INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION, ), - Some(&hashes), + Some(hashes), Some(&shrunken_store), Some(Box::new(write_versions.into_iter())), StoreReclaims::Ignore, @@ -6203,31 +6206,35 @@ impl AccountsDb { .fetch_add(count as StoredMetaWriteVersion, Ordering::AcqRel) } - fn write_accounts_to_storage Arc>( + fn write_accounts_to_storage< + 'a, + 'b, + F: FnMut(Slot, usize) -> Arc, + T: ReadableAccount + Sync, + U: StorableAccounts<'a, T>, + V: Borrow, + >( &self, slot: Slot, - hashes: &[impl Borrow], mut storage_finder: F, - accounts_and_meta_to_store: &[(StoredMeta, Option<&impl ReadableAccount>)], + accounts_and_meta_to_store: &StorableAccountsWithHashesAndWriteVersions<'a, 'b, T, U, V>, ) -> Vec { - assert_eq!(hashes.len(), accounts_and_meta_to_store.len()); let mut infos: Vec = Vec::with_capacity(accounts_and_meta_to_store.len()); let mut total_append_accounts_us = 0; let mut total_storage_find_us = 0; while infos.len() < accounts_and_meta_to_store.len() { let mut storage_find = Measure::start("storage_finder"); - let data_len = accounts_and_meta_to_store[infos.len()] - .1 + let account = accounts_and_meta_to_store.account(infos.len()); + let data_len = account .map(|account| account.data().len()) .unwrap_or_default(); let storage = storage_finder(slot, data_len + STORE_META_OVERHEAD); storage_find.stop(); total_storage_find_us += storage_find.as_us(); let mut append_accounts = Measure::start("append_accounts"); - let rvs = storage.accounts.append_accounts( - &accounts_and_meta_to_store[infos.len()..], - &hashes[infos.len()..], - ); + let rvs = storage + .accounts + .append_accounts(accounts_and_meta_to_store, infos.len()); append_accounts.stop(); total_append_accounts_us += append_accounts.as_us(); if rvs.is_none() { @@ -6247,18 +6254,15 @@ impl AccountsDb { continue; } - for (offsets, (_, account)) in rvs - .unwrap() - .windows(2) - .zip(&accounts_and_meta_to_store[infos.len()..]) - { + for (i, offsets) in rvs.unwrap().windows(2).enumerate() { let stored_size = offsets[1] - offsets[0]; storage.add_account(stored_size); infos.push(AccountInfo::new( StorageLocation::AppendVec(storage.append_vec_id(), offsets[0]), stored_size as StoredSize, // stored_size should never exceed StoredSize::MAX because of max data len const - account + accounts_and_meta_to_store + .account(i) .map(|account| account.lamports()) .unwrap_or_default(), )); @@ -6571,7 +6575,7 @@ impl AccountsDb { let include_slot_in_hash = IncludeSlotInHash::IrrelevantAssertOnUse; self.store_accounts_frozen( (slot, &accounts[..], include_slot_in_hash), - Some(&hashes), + Some(hashes), Some(&flushed_store), None, StoreReclaims::Default, @@ -6589,7 +6593,7 @@ impl AccountsDb { }); self.store_accounts_frozen( (slot, &accounts[..], include_slot_in_hash), - Some(&hashes), + Some(hashes), Some(&flushed_store), None, StoreReclaims::Ignore, @@ -6682,28 +6686,18 @@ impl AccountsDb { } } - fn write_accounts_to_cache<'a>( + fn write_accounts_to_cache<'a, 'b, T: ReadableAccount + Sync>( &self, slot: Slot, - hashes: Option<&[impl Borrow]>, - accounts_and_meta_to_store: &[(StoredMeta, Option<&impl ReadableAccount>)], + accounts_and_meta_to_store: &impl StorableAccounts<'b, T>, txn_signatures_iter: Box> + 'a>, include_slot_in_hash: IncludeSlotInHash, ) -> Vec { - let len = accounts_and_meta_to_store.len(); - let hashes = hashes.map(|hashes| { - assert_eq!(hashes.len(), len); - hashes - }); - - accounts_and_meta_to_store - .iter() - .zip(txn_signatures_iter) + txn_signatures_iter .enumerate() - .map(|(i, ((meta, account), signature))| { - let hash = hashes.map(|hashes| hashes[i].borrow()); - - let account = account + .map(|(i, signature)| { + let account = accounts_and_meta_to_store + .account_default_if_zero_lamport(i) .map(|account| account.to_account_shared_data()) .unwrap_or_default(); let account_info = AccountInfo::new( @@ -6712,13 +6706,19 @@ impl AccountsDb { account.lamports(), ); - self.notify_account_at_accounts_update(slot, meta, &account, signature); + let meta = StoredMeta { + pubkey: *accounts_and_meta_to_store.pubkey(i), + data_len: account.data().len() as u64, + write_version: 0, + }; + + self.notify_account_at_accounts_update(slot, &meta, &account, signature); let cached_account = self.accounts_cache.store( slot, - &meta.pubkey, + accounts_and_meta_to_store.pubkey(i), account, - hash, + None::<&Hash>, include_slot_in_hash, ); // hash this account in the bg @@ -6734,42 +6734,27 @@ impl AccountsDb { } fn store_accounts_to< - 'a, + 'a: 'c, + 'b, + 'c, F: FnMut(Slot, usize) -> Arc, P: Iterator, - T: ReadableAccount + Sync + ZeroLamport, + T: ReadableAccount + Sync + ZeroLamport + 'b, >( &self, - accounts: &impl StorableAccounts<'a, T>, - hashes: Option<&[impl Borrow]>, + accounts: &'c impl StorableAccounts<'b, T>, + hashes: Option>>, storage_finder: F, mut write_version_producer: P, is_cached_store: bool, - txn_signatures: Option<&'a [Option<&'a Signature>]>, + txn_signatures: Option<&[Option<&'a Signature>]>, ) -> Vec { let mut calc_stored_meta_time = Measure::start("calc_stored_meta"); let slot = accounts.target_slot(); - let accounts_and_meta_to_store: Vec<_> = (0..accounts.len()) - .into_iter() - .map(|index| { - let (pubkey, account) = (accounts.pubkey(index), accounts.account(index)); - self.read_only_accounts_cache.remove(*pubkey, slot); - // this is the source of Some(Account) or None. - // Some(Account) = store 'Account' - // None = store a default/empty account with 0 lamports - let (account, data_len) = if account.is_zero_lamport() { - (None, 0) - } else { - (Some(account), account.data().len() as u64) - }; - let meta = StoredMeta { - write_version: write_version_producer.next().unwrap(), - pubkey: *pubkey, - data_len, - }; - (meta, account) - }) - .collect(); + (0..accounts.len()).into_iter().for_each(|index| { + let pubkey = accounts.pubkey(index); + self.read_only_accounts_cache.remove(*pubkey, slot); + }); calc_stored_meta_time.stop(); self.stats .calc_stored_meta @@ -6779,33 +6764,42 @@ impl AccountsDb { let signature_iter: Box>> = match txn_signatures { Some(txn_signatures) => { - assert_eq!(txn_signatures.len(), accounts_and_meta_to_store.len()); + assert_eq!(txn_signatures.len(), accounts.len()); Box::new(txn_signatures.iter()) } - None => { - Box::new(std::iter::repeat(&None).take(accounts_and_meta_to_store.len())) - } + None => Box::new(std::iter::repeat(&None).take(accounts.len())), }; self.write_accounts_to_cache( slot, - hashes, - &accounts_and_meta_to_store, + accounts, signature_iter, accounts.include_slot_in_hash(), ) + } else if accounts.has_hash_and_write_version() { + self.write_accounts_to_storage( + slot, + storage_finder, + &StorableAccountsWithHashesAndWriteVersions::<'_, '_, _, _, &Hash>::new(accounts), + ) } else { + let write_versions = (0..accounts.len()) + .map(|_| write_version_producer.next().unwrap()) + .collect::>(); match hashes { Some(hashes) => self.write_accounts_to_storage( slot, - hashes, storage_finder, - &accounts_and_meta_to_store, + &StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions( + accounts, + hashes, + write_versions, + ), ), None => { // hash any accounts where we were lazy in calculating the hash let mut hash_time = Measure::start("hash_accounts"); - let len = accounts_and_meta_to_store.len(); + let len = accounts.len(); let mut hashes = Vec::with_capacity(len); for index in 0..accounts.len() { let (pubkey, account) = (accounts.pubkey(index), accounts.account(index)); @@ -6823,11 +6817,10 @@ impl AccountsDb { .fetch_add(hash_time.as_us(), Ordering::Relaxed); self.write_accounts_to_storage( - slot, - &hashes, - storage_finder, - &accounts_and_meta_to_store, - ) + slot, + storage_finder, + &StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions(accounts, hashes, write_versions), + ) } } } @@ -7939,7 +7932,7 @@ impl AccountsDb { fn update_index<'a, T: ReadableAccount + Sync>( &self, infos: Vec, - accounts: impl StorableAccounts<'a, T>, + accounts: &impl StorableAccounts<'a, T>, reclaim: UpsertReclaim, ) -> SlotList { let target_slot = accounts.target_slot(); @@ -8344,7 +8337,7 @@ impl AccountsDb { .fetch_add(measure.as_us(), Ordering::Relaxed); } - pub fn store_cached<'a, T: ReadableAccount + Sync + ZeroLamport>( + pub fn store_cached<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>( &self, accounts: impl StorableAccounts<'a, T>, txn_signatures: Option<&'a [Option<&'a Signature>]>, @@ -8368,7 +8361,7 @@ impl AccountsDb { ); } - fn store<'a, T: ReadableAccount + Sync + ZeroLamport>( + fn store<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>( &self, accounts: impl StorableAccounts<'a, T>, is_cached_store: bool, @@ -8403,7 +8396,13 @@ impl AccountsDb { } // we use default hashes for now since the same account may be stored to the cache multiple times - self.store_accounts_unfrozen(accounts, None, is_cached_store, txn_signatures, reclaim); + self.store_accounts_unfrozen( + accounts, + None::>, + is_cached_store, + txn_signatures, + reclaim, + ); self.report_store_timings(); } @@ -8530,10 +8529,10 @@ impl AccountsDb { } } - fn store_accounts_unfrozen<'a, T: ReadableAccount + Sync + ZeroLamport>( + fn store_accounts_unfrozen<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>( &self, accounts: impl StorableAccounts<'a, T>, - hashes: Option<&[&Hash]>, + hashes: Option>>, is_cached_store: bool, txn_signatures: Option<&'a [Option<&'a Signature>]>, reclaim: StoreReclaims, @@ -8558,10 +8557,10 @@ impl AccountsDb { ); } - pub(crate) fn store_accounts_frozen<'a, T: ReadableAccount + Sync + ZeroLamport>( + pub(crate) fn store_accounts_frozen<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>( &'a self, accounts: impl StorableAccounts<'a, T>, - hashes: Option<&[impl Borrow]>, + hashes: Option>>, storage: Option<&'a Arc>, write_version_producer: Option>>, reclaim: StoreReclaims, @@ -8583,15 +8582,15 @@ impl AccountsDb { ) } - fn store_accounts_custom<'a, 'b, T: ReadableAccount + Sync + ZeroLamport>( - &'a self, - accounts: impl StorableAccounts<'b, T>, - hashes: Option<&[impl Borrow]>, + fn store_accounts_custom<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>( + &self, + accounts: impl StorableAccounts<'a, T>, + hashes: Option>>, storage: Option<&'a Arc>, write_version_producer: Option>>, is_cached_store: bool, reset_accounts: bool, - txn_signatures: Option<&'b [Option<&'b Signature>]>, + txn_signatures: Option<&[Option<&Signature>]>, reclaim: StoreReclaims, ) -> StoreAccountsTiming { let storage_finder = Box::new(move |slot, size| { @@ -8644,7 +8643,7 @@ impl AccountsDb { // after the account are stored by the above `store_accounts_to` // call and all the accounts are stored, all reads after this point // will know to not check the cache anymore - let mut reclaims = self.update_index(infos, accounts, reclaim); + let mut reclaims = self.update_index(infos, &accounts, reclaim); // For each updated account, `reclaims` should only have at most one // item (if the account was previously updated in this slot). @@ -9102,7 +9101,7 @@ impl AccountsDb { let include_slot_in_hash = INCLUDE_SLOT_IN_HASH_TESTS; self.store_accounts_frozen( (*slot, &add[..], include_slot_in_hash), - Some(&hashes[..]), + Some(hashes), None, None, StoreReclaims::Ignore, @@ -10766,12 +10765,18 @@ pub mod tests { account: &AccountSharedData, write_version: StoredMetaWriteVersion, ) { - let sm = StoredMeta { - data_len: 1, - pubkey: *pubkey, - write_version, - }; - vec.append_accounts(&[(sm, Some(account))], &[&Hash::default()]); + let slot_ignored = Slot::MAX; + let accounts = [(pubkey, account)]; + let slice = &accounts[..]; + let account_data = (slot_ignored, slice); + let hash = Hash::default(); + let storable_accounts = + StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions( + &account_data, + vec![&hash], + vec![write_version], + ); + vec.append_accounts(&storable_accounts, 0); } #[test] @@ -12789,7 +12794,7 @@ pub mod tests { // put wrong hash value in store so we get a mismatch db.store_accounts_unfrozen( (some_slot, &[(&key, &account)][..]), - Some(&[&Hash::default()]), + Some(vec![&Hash::default()]), false, None, StoreReclaims::Default, @@ -13052,7 +13057,7 @@ pub mod tests { let some_hash = Hash::new(&[0xca; HASH_BYTES]); db.store_accounts_unfrozen( (some_slot, accounts), - Some(&[&some_hash]), + Some(vec![&some_hash]), false, None, StoreReclaims::Default, diff --git a/runtime/src/ancient_append_vecs.rs b/runtime/src/ancient_append_vecs.rs index 0f3b0e0cc1..1d948a741d 100644 --- a/runtime/src/ancient_append_vecs.rs +++ b/runtime/src/ancient_append_vecs.rs @@ -8,7 +8,7 @@ use { accounts_db::FoundStoredAccount, append_vec::{AppendVec, StoredAccountMeta}, }, - solana_sdk::{clock::Slot, hash::Hash}, + solana_sdk::clock::Slot, }; /// a set of accounts need to be stored. @@ -25,7 +25,6 @@ pub enum StorageSelector { /// We need 1-2 of these slices constructed based on available bytes and individual account sizes. /// The slice arithmetic accross both hashes and account data gets messy. So, this struct abstracts that. pub struct AccountsToStore<'a> { - hashes: Vec<&'a Hash>, accounts: Vec<&'a StoredAccountMeta<'a>>, /// if 'accounts' contains more items than can be contained in the primary storage, then we have to split these accounts. /// 'index_first_item_overflow' specifies the index of the first item in 'accounts' that will go into the overflow storage @@ -42,7 +41,6 @@ impl<'a> AccountsToStore<'a> { slot: Slot, ) -> Self { let num_accounts = stored_accounts.len(); - let mut hashes = Vec::with_capacity(num_accounts); let mut accounts = Vec::with_capacity(num_accounts); // index of the first account that doesn't fit in the current append vec let mut index_first_item_overflow = num_accounts; // assume all fit @@ -53,15 +51,13 @@ impl<'a> AccountsToStore<'a> { } else if index_first_item_overflow == num_accounts { available_bytes = 0; // the # of accounts we have so far seen is the most that will fit in the current ancient append vec - index_first_item_overflow = hashes.len(); + index_first_item_overflow = accounts.len(); } - hashes.push(account.account.hash); // we have to specify 'slot' here because we are writing to an ancient append vec and squashing slots, // so we need to update the previous accounts index entry for this account from 'slot' to 'ancient_slot' accounts.push(&account.account); }); Self { - hashes, accounts, index_first_item_overflow, slot, @@ -73,13 +69,13 @@ impl<'a> AccountsToStore<'a> { self.index_first_item_overflow < self.accounts.len() } - /// get the accounts and hashes to store in the given 'storage' - pub fn get(&self, storage: StorageSelector) -> (&[&'a StoredAccountMeta<'a>], &[&'a Hash]) { + /// get the accounts to store in the given 'storage' + pub fn get(&self, storage: StorageSelector) -> &[&'a StoredAccountMeta<'a>] { let range = match storage { StorageSelector::Primary => 0..self.index_first_item_overflow, StorageSelector::Overflow => self.index_first_item_overflow..self.accounts.len(), }; - (&self.accounts[range.clone()], &self.hashes[range]) + &self.accounts[range] } } @@ -107,6 +103,7 @@ pub mod tests { }, solana_sdk::{ account::{AccountSharedData, ReadableAccount}, + hash::Hash, pubkey::Pubkey, }, }; @@ -117,9 +114,8 @@ pub mod tests { let slot = 1; let accounts_to_store = AccountsToStore::new(0, &map, slot); for selector in [StorageSelector::Primary, StorageSelector::Overflow] { - let (accounts, hash) = accounts_to_store.get(selector); + let accounts = accounts_to_store.get(selector); assert!(accounts.is_empty()); - assert!(hash.is_empty()); } assert!(!accounts_to_store.has_overflow()); } @@ -164,20 +160,18 @@ pub mod tests { ] { let slot = 1; let accounts_to_store = AccountsToStore::new(available_bytes as u64, &map, slot); - let (accounts, hashes) = accounts_to_store.get(selector); + let accounts = accounts_to_store.get(selector); assert_eq!( accounts, map.iter().map(|b| &b.account).collect::>(), "mismatch" ); - assert_eq!(hashes, vec![&hash]); - let (accounts, hash) = accounts_to_store.get(get_opposite(&selector)); + let accounts = accounts_to_store.get(get_opposite(&selector)); assert_eq!( selector == StorageSelector::Overflow, accounts_to_store.has_overflow() ); assert!(accounts.is_empty()); - assert!(hash.is_empty()); } } fn get_opposite(selector: &StorageSelector) -> StorageSelector { diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index 54722d1864..0b0c42ce4b 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -5,6 +5,7 @@ //! use { + crate::storable_accounts::StorableAccounts, log::*, memmap2::MmapMut, serde::{Deserialize, Serialize}, @@ -19,6 +20,7 @@ use { convert::TryFrom, fs::{remove_file, OpenOptions}, io::{self, Seek, SeekFrom, Write}, + marker::PhantomData, mem, path::{Path, PathBuf}, sync::{ @@ -43,6 +45,94 @@ pub const MAXIMUM_APPEND_VEC_FILE_SIZE: u64 = 16 * 1024 * 1024 * 1024; // 16 GiB pub type StoredMetaWriteVersion = u64; +/// Goal is to eliminate copies and data reshaping given various code paths that store accounts. +/// This struct contains what is needed to store accounts to a storage +/// 1. account & pubkey (StorableAccounts) +/// 2. hash per account (Maybe in StorableAccounts, otherwise has to be passed in separately) +/// 3. write version per account (Maybe in StorableAccounts, otherwise has to be passed in separately) +pub struct StorableAccountsWithHashesAndWriteVersions< + 'a: 'b, + 'b, + T: ReadableAccount + Sync + 'b, + U: StorableAccounts<'a, T>, + V: Borrow, +> { + /// accounts to store + /// always has pubkey and account + /// may also have hash and write_version per account + accounts: &'b U, + /// if accounts does not have hash and write version, this has a hash and write version per account + hashes_and_write_versions: Option<(Vec, Vec)>, + _phantom: PhantomData<&'a T>, +} + +impl<'a: 'b, 'b, T: ReadableAccount + Sync + 'b, U: StorableAccounts<'a, T>, V: Borrow> + StorableAccountsWithHashesAndWriteVersions<'a, 'b, T, U, V> +{ + /// used when accounts contains hash and write version already + pub fn new(accounts: &'b U) -> Self { + assert!(accounts.has_hash_and_write_version()); + Self { + accounts, + hashes_and_write_versions: None, + _phantom: PhantomData, + } + } + /// used when accounts does NOT contains hash or write version + /// In this case, hashes and write_versions have to be passed in separately and zipped together. + pub fn new_with_hashes_and_write_versions( + accounts: &'b U, + hashes: Vec, + write_versions: Vec, + ) -> Self { + assert!(!accounts.has_hash_and_write_version()); + assert_eq!(accounts.len(), hashes.len()); + assert_eq!(write_versions.len(), hashes.len()); + Self { + accounts, + hashes_and_write_versions: Some((hashes, write_versions)), + _phantom: PhantomData, + } + } + + /// hash for the account at 'index' + pub fn hash(&self, index: usize) -> &Hash { + if self.accounts.has_hash_and_write_version() { + self.accounts.hash(index) + } else { + self.hashes_and_write_versions.as_ref().unwrap().0[index].borrow() + } + } + /// write_version for the account at 'index' + pub fn write_version(&self, index: usize) -> u64 { + if self.accounts.has_hash_and_write_version() { + self.accounts.write_version(index) + } else { + self.hashes_and_write_versions.as_ref().unwrap().1[index] + } + } + /// None if account at index has lamports == 0 + /// Otherwise, Some(account) + /// This is the only way to access the account. + pub fn account(&self, index: usize) -> Option<&T> { + self.accounts.account_default_if_zero_lamport(index) + } + + /// pubkey at 'index' + pub fn pubkey(&self, index: usize) -> &Pubkey { + self.accounts.pubkey(index) + } + + /// # accounts to write + pub fn len(&self) -> usize { + self.accounts.len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + /// Meta contains enough context to recover the index from storage itself /// This struct will be backed by mmaped and snapshotted data files. /// So the data layout must be stable and consistent across the entire cluster! @@ -516,24 +606,48 @@ impl AppendVec { /// So, return.len() is 1 + (number of accounts written) /// After each account is appended, the internal `current_len` is updated /// and will be available to other threads. - pub fn append_accounts( + pub fn append_accounts< + 'a, + 'b, + T: ReadableAccount + Sync, + U: StorableAccounts<'a, T>, + V: Borrow, + >( &self, - accounts: &[(StoredMeta, Option<&impl ReadableAccount>)], - hashes: &[impl Borrow], + accounts: &StorableAccountsWithHashesAndWriteVersions<'a, 'b, T, U, V>, + skip: usize, ) -> Option> { let _lock = self.append_lock.lock().unwrap(); let mut offset = self.len(); - let mut rv = Vec::with_capacity(accounts.len()); - for ((stored_meta, account), hash) in accounts.iter().zip(hashes) { - let meta_ptr = stored_meta as *const StoredMeta; - let account_meta = AccountMeta::from(*account); + + let mut rv = Vec::with_capacity(accounts.accounts.len()); + let len = accounts.accounts.len(); + for i in skip..len { + let account = accounts.account(i); + let account_meta = account + .map(|account| AccountMeta { + lamports: account.lamports(), + owner: *account.owner(), + rent_epoch: account.rent_epoch(), + executable: account.executable(), + }) + .unwrap_or_default(); + + let stored_meta = StoredMeta { + pubkey: *accounts.pubkey(i), + data_len: account + .map(|account| account.data().len()) + .unwrap_or_default() as u64, + write_version: accounts.write_version(i), + }; + let meta_ptr = &stored_meta as *const StoredMeta; let account_meta_ptr = &account_meta as *const AccountMeta; let data_len = stored_meta.data_len as usize; let data_ptr = account .map(|account| account.data()) .unwrap_or_default() .as_ptr(); - let hash_ptr = hash.borrow().as_ref().as_ptr(); + let hash_ptr = accounts.hash(i).as_ref().as_ptr(); let ptrs = [ (meta_ptr as *const u8, mem::size_of::()), (account_meta_ptr as *const u8, mem::size_of::()), @@ -563,19 +677,32 @@ impl AppendVec { pub mod tests { use { super::{test_utils::*, *}, + crate::accounts_db::INCLUDE_SLOT_IN_HASH_TESTS, assert_matches::assert_matches, rand::{thread_rng, Rng}, - solana_sdk::{account::WritableAccount, timing::duration_as_ms}, + solana_sdk::{ + account::{accounts_equal, WritableAccount}, + timing::duration_as_ms, + }, std::time::Instant, }; impl AppendVec { fn append_account_test(&self, data: &(StoredMeta, AccountSharedData)) -> Option { - self.append_accounts( - &[(data.0.clone(), Some(&data.1.clone()))], - &[Hash::default()], - ) - .map(|res| res[0]) + let slot_ignored = Slot::MAX; + let accounts = [(&data.0.pubkey, &data.1)]; + let slice = &accounts[..]; + let account_data = (slot_ignored, slice); + let hash = Hash::default(); + let storable_accounts = + StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions( + &account_data, + vec![&hash], + vec![data.0.write_version], + ); + + self.append_accounts(&storable_accounts, 0) + .map(|res| res[0]) } } @@ -604,6 +731,149 @@ pub mod tests { } } + #[test] + #[should_panic(expected = "assertion failed: accounts.has_hash_and_write_version()")] + fn test_storable_accounts_with_hashes_and_write_versions_new() { + let account = AccountSharedData::default(); + // for (Slot, &'a [(&'a Pubkey, &'a T)], IncludeSlotInHash) + let slot = 0 as Slot; + let pubkey = Pubkey::default(); + StorableAccountsWithHashesAndWriteVersions::<'_, '_, _, _, &Hash>::new(&( + slot, + &[(&pubkey, &account)][..], + INCLUDE_SLOT_IN_HASH_TESTS, + )); + } + + fn test_mismatch(correct_hashes: bool, correct_write_versions: bool) { + let account = AccountSharedData::default(); + // for (Slot, &'a [(&'a Pubkey, &'a T)], IncludeSlotInHash) + let slot = 0 as Slot; + let pubkey = Pubkey::default(); + // mismatch between lens of accounts, hashes, write_versions + let mut hashes = Vec::default(); + if correct_hashes { + hashes.push(Hash::default()); + } + let mut write_versions = Vec::default(); + if correct_write_versions { + write_versions.push(0); + } + StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions( + &(slot, &[(&pubkey, &account)][..], INCLUDE_SLOT_IN_HASH_TESTS), + hashes, + write_versions, + ); + } + + #[test] + #[should_panic(expected = "assertion failed:")] + fn test_storable_accounts_with_hashes_and_write_versions_new2() { + test_mismatch(false, false); + } + + #[test] + #[should_panic(expected = "assertion failed:")] + fn test_storable_accounts_with_hashes_and_write_versions_new3() { + test_mismatch(false, true); + } + + #[test] + #[should_panic(expected = "assertion failed:")] + fn test_storable_accounts_with_hashes_and_write_versions_new4() { + test_mismatch(true, false); + } + + #[test] + fn test_storable_accounts_with_hashes_and_write_versions_empty() { + // for (Slot, &'a [(&'a Pubkey, &'a T)], IncludeSlotInHash) + let account = AccountSharedData::default(); + let slot = 0 as Slot; + let pubkeys = vec![Pubkey::default()]; + let hashes = Vec::::default(); + let write_versions = Vec::default(); + let mut accounts = vec![(&pubkeys[0], &account)]; + accounts.clear(); + let accounts2 = (slot, &accounts[..], INCLUDE_SLOT_IN_HASH_TESTS); + let storable = + StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions( + &accounts2, + hashes, + write_versions, + ); + assert_eq!(storable.len(), 0); + assert!(storable.is_empty()); + } + + #[test] + fn test_storable_accounts_with_hashes_and_write_versions_hash_and_write_version() { + // for (Slot, &'a [(&'a Pubkey, &'a T)], IncludeSlotInHash) + let account = AccountSharedData::default(); + let slot = 0 as Slot; + let pubkeys = vec![Pubkey::new(&[5; 32]), Pubkey::new(&[6; 32])]; + let hashes = vec![Hash::new(&[3; 32]), Hash::new(&[4; 32])]; + let write_versions = vec![42, 43]; + let accounts = vec![(&pubkeys[0], &account), (&pubkeys[1], &account)]; + let accounts2 = (slot, &accounts[..], INCLUDE_SLOT_IN_HASH_TESTS); + let storable = + StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions( + &accounts2, + hashes.clone(), + write_versions.clone(), + ); + assert_eq!(storable.len(), pubkeys.len()); + assert!(!storable.is_empty()); + (0..2).for_each(|i| { + assert_eq!(storable.hash(i), &hashes[i]); + assert_eq!(&storable.write_version(i), &write_versions[i]); + assert_eq!(storable.pubkey(i), &pubkeys[i]); + }); + } + + #[test] + fn test_storable_accounts_with_hashes_and_write_versions_default() { + // 0 lamport account, should return default account (or None in this case) + let account = Account { + data: vec![0], + ..Account::default() + } + .to_account_shared_data(); + // for (Slot, &'a [(&'a Pubkey, &'a T)], IncludeSlotInHash) + let slot = 0 as Slot; + let pubkey = Pubkey::default(); + let hashes = vec![Hash::default()]; + let write_versions = vec![0]; + let accounts = vec![(&pubkey, &account)]; + let accounts2 = (slot, &accounts[..], INCLUDE_SLOT_IN_HASH_TESTS); + let storable = + StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions( + &accounts2, + hashes.clone(), + write_versions.clone(), + ); + let get_account = storable.account(0); + assert!(get_account.is_none()); + + // non-zero lamports, data should be correct + let account = Account { + lamports: 1, + data: vec![0], + ..Account::default() + } + .to_account_shared_data(); + // for (Slot, &'a [(&'a Pubkey, &'a T)], IncludeSlotInHash) + let accounts = vec![(&pubkey, &account)]; + let accounts2 = (slot, &accounts[..], INCLUDE_SLOT_IN_HASH_TESTS); + let storable = + StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions( + &accounts2, + hashes, + write_versions, + ); + let get_account = storable.account(0); + assert!(accounts_equal(&account, get_account.unwrap())); + } + #[test] fn test_account_meta_default() { let def1 = AccountMeta::default(); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 4cf59fe5d2..c4eeaacc67 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -6322,7 +6322,7 @@ impl Bank { )) } - pub fn store_accounts<'a, T: ReadableAccount + Sync + ZeroLamport>( + pub fn store_accounts<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>( &self, accounts: impl StorableAccounts<'a, T>, ) { diff --git a/runtime/src/snapshot_minimizer.rs b/runtime/src/snapshot_minimizer.rs index 99d1d5a226..d6ad32c856 100644 --- a/runtime/src/snapshot_minimizer.rs +++ b/runtime/src/snapshot_minimizer.rs @@ -373,7 +373,7 @@ impl<'a> SnapshotMinimizer<'a> { &accounts[..], crate::accounts_db::INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION, ), - Some(&hashes), + Some(hashes), Some(&new_storage), Some(Box::new(write_versions.into_iter())), StoreReclaims::Default, diff --git a/runtime/src/storable_accounts.rs b/runtime/src/storable_accounts.rs index d8d34b1352..02c3029f84 100644 --- a/runtime/src/storable_accounts.rs +++ b/runtime/src/storable_accounts.rs @@ -1,7 +1,7 @@ //! trait for abstracting underlying storage of pubkey and account pairs to be written use { crate::{accounts_db::IncludeSlotInHash, append_vec::StoredAccountMeta}, - solana_sdk::{account::ReadableAccount, clock::Slot, pubkey::Pubkey}, + solana_sdk::{account::ReadableAccount, clock::Slot, hash::Hash, pubkey::Pubkey}, }; /// abstract access to pubkey, account, slot, target_slot of either: @@ -14,6 +14,11 @@ pub trait StorableAccounts<'a, T: ReadableAccount + Sync>: Sync { fn pubkey(&self, index: usize) -> &Pubkey; /// account at 'index' fn account(&self, index: usize) -> &T; + /// None if account is zero lamports + fn account_default_if_zero_lamport(&self, index: usize) -> Option<&T> { + let account = self.account(index); + (account.lamports() != 0).then_some(account) + } // current slot for account at 'index' fn slot(&self, index: usize) -> Slot; /// slot that all accounts are to be written to @@ -29,6 +34,26 @@ pub trait StorableAccounts<'a, T: ReadableAccount + Sync>: Sync { fn contains_multiple_slots(&self) -> bool; /// true iff hashing these accounts should include the slot fn include_slot_in_hash(&self) -> IncludeSlotInHash; + + /// true iff the impl can provide hash and write_version + /// Otherwise, hash and write_version have to be provided separately to store functions. + fn has_hash_and_write_version(&self) -> bool { + false + } + + /// return hash for account at 'index' + /// Should only be called if 'has_hash_and_write_version' = true + fn hash(&self, _index: usize) -> &Hash { + // this should never be called if has_hash_and_write_version returns false + unimplemented!(); + } + + /// return write_version for account at 'index' + /// Should only be called if 'has_hash_and_write_version' = true + fn write_version(&self, _index: usize) -> u64 { + // this should never be called if has_hash_and_write_version returns false + unimplemented!(); + } } /// accounts that are moving from 'old_slot' to 'target_slot' @@ -159,6 +184,15 @@ impl<'a> StorableAccounts<'a, StoredAccountMeta<'a>> fn include_slot_in_hash(&self) -> IncludeSlotInHash { self.2 } + fn has_hash_and_write_version(&self) -> bool { + true + } + fn hash(&self, index: usize) -> &Hash { + self.1[index].hash + } + fn write_version(&self, index: usize) -> u64 { + self.1[index].meta.write_version + } } #[cfg(test)] pub mod tests {