improve perf of storing accounts for shrink/ancient (#28880)

* improve perf of storing accounts for shrink/ancient

* rename

* phantom data

* update comment

* make impl Borrow<Hash> consistent

* remove unused static
This commit is contained in:
Jeff Washington (jwash) 2022-11-22 17:36:57 -06:00 committed by GitHub
parent ed2c59d0e4
commit 5d88a9b32b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 459 additions and 142 deletions

View File

@ -3,12 +3,16 @@ extern crate test;
use {
rand::{thread_rng, Rng},
solana_runtime::append_vec::{
test_utils::{create_test_account, get_append_vec_path},
AppendVec, StoredMeta,
solana_runtime::{
accounts_db::INCLUDE_SLOT_IN_HASH_TESTS,
append_vec::{
test_utils::{create_test_account, get_append_vec_path},
AppendVec, StorableAccountsWithHashesAndWriteVersions, StoredMeta,
},
},
solana_sdk::{
account::{AccountSharedData, ReadableAccount},
clock::Slot,
hash::Hash,
},
std::{
@ -28,7 +32,17 @@ fn append_account(
account: &AccountSharedData,
hash: Hash,
) -> Option<usize> {
let res = vec.append_accounts(&[(storage_meta, Some(account))], &[&hash]);
let slot_ignored = Slot::MAX;
let accounts = [(&storage_meta.pubkey, account)];
let slice = &accounts[..];
let accounts = (slot_ignored, slice, INCLUDE_SLOT_IN_HASH_TESTS);
let storable_accounts =
StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions(
&accounts,
vec![&hash],
vec![storage_meta.write_version],
);
let res = vec.append_accounts(&storable_accounts, 0);
res.and_then(|res| res.first().cloned())
}

View File

@ -1331,7 +1331,7 @@ impl Accounts {
);
}
pub fn store_accounts_cached<'a, T: ReadableAccount + Sync + ZeroLamport>(
pub fn store_accounts_cached<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>(
&self,
accounts: impl StorableAccounts<'a, T>,
) {

View File

@ -40,7 +40,10 @@ use {
ancient_append_vecs::{
get_ancient_append_vec_capacity, is_ancient, AccountsToStore, StorageSelector,
},
append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion},
append_vec::{
AppendVec, StorableAccountsWithHashesAndWriteVersions, StoredAccountMeta, StoredMeta,
StoredMetaWriteVersion,
},
bank::Rewrites,
cache_hash_data::{CacheHashData, CacheHashDataFile},
contains::Contains,
@ -229,7 +232,7 @@ impl CurrentAncientAppendVec {
accounts_to_store: &AccountsToStore,
storage_selector: StorageSelector,
) -> StoreAccountsTiming {
let (accounts, hashes) = accounts_to_store.get(storage_selector);
let accounts = accounts_to_store.get(storage_selector);
db.store_accounts_frozen(
(
@ -238,7 +241,7 @@ impl CurrentAncientAppendVec {
INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION,
accounts_to_store.slot,
),
Some(hashes),
None::<Vec<Hash>>,
Some(self.append_vec()),
None,
StoreReclaims::Ignore,
@ -289,7 +292,7 @@ impl AncientSlotPubkeys {
// we are taking accounts from 'slot' and putting them into 'current_ancient.slot()'
// StorageSelector::Primary here because only the accounts that are moving from 'slot' to 'current_ancient.slot()'
// Any overflow accounts will get written into a new append vec AT 'slot', so they don't need to be unrefed
let (accounts, _hashes) = to_store.get(StorageSelector::Primary);
let accounts = to_store.get(StorageSelector::Primary);
if Some(current_ancient.slot()) != self.inner.as_ref().map(|ap| ap.slot) {
let pubkeys = current_ancient
.append_vec()
@ -3985,7 +3988,7 @@ impl AccountsDb {
&accounts[..],
INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION,
),
Some(&hashes),
Some(hashes),
Some(&shrunken_store),
Some(Box::new(write_versions.into_iter())),
StoreReclaims::Ignore,
@ -6203,31 +6206,35 @@ impl AccountsDb {
.fetch_add(count as StoredMetaWriteVersion, Ordering::AcqRel)
}
fn write_accounts_to_storage<F: FnMut(Slot, usize) -> Arc<AccountStorageEntry>>(
fn write_accounts_to_storage<
'a,
'b,
F: FnMut(Slot, usize) -> Arc<AccountStorageEntry>,
T: ReadableAccount + Sync,
U: StorableAccounts<'a, T>,
V: Borrow<Hash>,
>(
&self,
slot: Slot,
hashes: &[impl Borrow<Hash>],
mut storage_finder: F,
accounts_and_meta_to_store: &[(StoredMeta, Option<&impl ReadableAccount>)],
accounts_and_meta_to_store: &StorableAccountsWithHashesAndWriteVersions<'a, 'b, T, U, V>,
) -> Vec<AccountInfo> {
assert_eq!(hashes.len(), accounts_and_meta_to_store.len());
let mut infos: Vec<AccountInfo> = Vec::with_capacity(accounts_and_meta_to_store.len());
let mut total_append_accounts_us = 0;
let mut total_storage_find_us = 0;
while infos.len() < accounts_and_meta_to_store.len() {
let mut storage_find = Measure::start("storage_finder");
let data_len = accounts_and_meta_to_store[infos.len()]
.1
let account = accounts_and_meta_to_store.account(infos.len());
let data_len = account
.map(|account| account.data().len())
.unwrap_or_default();
let storage = storage_finder(slot, data_len + STORE_META_OVERHEAD);
storage_find.stop();
total_storage_find_us += storage_find.as_us();
let mut append_accounts = Measure::start("append_accounts");
let rvs = storage.accounts.append_accounts(
&accounts_and_meta_to_store[infos.len()..],
&hashes[infos.len()..],
);
let rvs = storage
.accounts
.append_accounts(accounts_and_meta_to_store, infos.len());
append_accounts.stop();
total_append_accounts_us += append_accounts.as_us();
if rvs.is_none() {
@ -6247,18 +6254,15 @@ impl AccountsDb {
continue;
}
for (offsets, (_, account)) in rvs
.unwrap()
.windows(2)
.zip(&accounts_and_meta_to_store[infos.len()..])
{
for (i, offsets) in rvs.unwrap().windows(2).enumerate() {
let stored_size = offsets[1] - offsets[0];
storage.add_account(stored_size);
infos.push(AccountInfo::new(
StorageLocation::AppendVec(storage.append_vec_id(), offsets[0]),
stored_size as StoredSize, // stored_size should never exceed StoredSize::MAX because of max data len const
account
accounts_and_meta_to_store
.account(i)
.map(|account| account.lamports())
.unwrap_or_default(),
));
@ -6571,7 +6575,7 @@ impl AccountsDb {
let include_slot_in_hash = IncludeSlotInHash::IrrelevantAssertOnUse;
self.store_accounts_frozen(
(slot, &accounts[..], include_slot_in_hash),
Some(&hashes),
Some(hashes),
Some(&flushed_store),
None,
StoreReclaims::Default,
@ -6589,7 +6593,7 @@ impl AccountsDb {
});
self.store_accounts_frozen(
(slot, &accounts[..], include_slot_in_hash),
Some(&hashes),
Some(hashes),
Some(&flushed_store),
None,
StoreReclaims::Ignore,
@ -6682,28 +6686,18 @@ impl AccountsDb {
}
}
fn write_accounts_to_cache<'a>(
fn write_accounts_to_cache<'a, 'b, T: ReadableAccount + Sync>(
&self,
slot: Slot,
hashes: Option<&[impl Borrow<Hash>]>,
accounts_and_meta_to_store: &[(StoredMeta, Option<&impl ReadableAccount>)],
accounts_and_meta_to_store: &impl StorableAccounts<'b, T>,
txn_signatures_iter: Box<dyn std::iter::Iterator<Item = &Option<&Signature>> + 'a>,
include_slot_in_hash: IncludeSlotInHash,
) -> Vec<AccountInfo> {
let len = accounts_and_meta_to_store.len();
let hashes = hashes.map(|hashes| {
assert_eq!(hashes.len(), len);
hashes
});
accounts_and_meta_to_store
.iter()
.zip(txn_signatures_iter)
txn_signatures_iter
.enumerate()
.map(|(i, ((meta, account), signature))| {
let hash = hashes.map(|hashes| hashes[i].borrow());
let account = account
.map(|(i, signature)| {
let account = accounts_and_meta_to_store
.account_default_if_zero_lamport(i)
.map(|account| account.to_account_shared_data())
.unwrap_or_default();
let account_info = AccountInfo::new(
@ -6712,13 +6706,19 @@ impl AccountsDb {
account.lamports(),
);
self.notify_account_at_accounts_update(slot, meta, &account, signature);
let meta = StoredMeta {
pubkey: *accounts_and_meta_to_store.pubkey(i),
data_len: account.data().len() as u64,
write_version: 0,
};
self.notify_account_at_accounts_update(slot, &meta, &account, signature);
let cached_account = self.accounts_cache.store(
slot,
&meta.pubkey,
accounts_and_meta_to_store.pubkey(i),
account,
hash,
None::<&Hash>,
include_slot_in_hash,
);
// hash this account in the bg
@ -6734,42 +6734,27 @@ impl AccountsDb {
}
fn store_accounts_to<
'a,
'a: 'c,
'b,
'c,
F: FnMut(Slot, usize) -> Arc<AccountStorageEntry>,
P: Iterator<Item = u64>,
T: ReadableAccount + Sync + ZeroLamport,
T: ReadableAccount + Sync + ZeroLamport + 'b,
>(
&self,
accounts: &impl StorableAccounts<'a, T>,
hashes: Option<&[impl Borrow<Hash>]>,
accounts: &'c impl StorableAccounts<'b, T>,
hashes: Option<Vec<impl Borrow<Hash>>>,
storage_finder: F,
mut write_version_producer: P,
is_cached_store: bool,
txn_signatures: Option<&'a [Option<&'a Signature>]>,
txn_signatures: Option<&[Option<&'a Signature>]>,
) -> Vec<AccountInfo> {
let mut calc_stored_meta_time = Measure::start("calc_stored_meta");
let slot = accounts.target_slot();
let accounts_and_meta_to_store: Vec<_> = (0..accounts.len())
.into_iter()
.map(|index| {
let (pubkey, account) = (accounts.pubkey(index), accounts.account(index));
self.read_only_accounts_cache.remove(*pubkey, slot);
// this is the source of Some(Account) or None.
// Some(Account) = store 'Account'
// None = store a default/empty account with 0 lamports
let (account, data_len) = if account.is_zero_lamport() {
(None, 0)
} else {
(Some(account), account.data().len() as u64)
};
let meta = StoredMeta {
write_version: write_version_producer.next().unwrap(),
pubkey: *pubkey,
data_len,
};
(meta, account)
})
.collect();
(0..accounts.len()).into_iter().for_each(|index| {
let pubkey = accounts.pubkey(index);
self.read_only_accounts_cache.remove(*pubkey, slot);
});
calc_stored_meta_time.stop();
self.stats
.calc_stored_meta
@ -6779,33 +6764,42 @@ impl AccountsDb {
let signature_iter: Box<dyn std::iter::Iterator<Item = &Option<&Signature>>> =
match txn_signatures {
Some(txn_signatures) => {
assert_eq!(txn_signatures.len(), accounts_and_meta_to_store.len());
assert_eq!(txn_signatures.len(), accounts.len());
Box::new(txn_signatures.iter())
}
None => {
Box::new(std::iter::repeat(&None).take(accounts_and_meta_to_store.len()))
}
None => Box::new(std::iter::repeat(&None).take(accounts.len())),
};
self.write_accounts_to_cache(
slot,
hashes,
&accounts_and_meta_to_store,
accounts,
signature_iter,
accounts.include_slot_in_hash(),
)
} else if accounts.has_hash_and_write_version() {
self.write_accounts_to_storage(
slot,
storage_finder,
&StorableAccountsWithHashesAndWriteVersions::<'_, '_, _, _, &Hash>::new(accounts),
)
} else {
let write_versions = (0..accounts.len())
.map(|_| write_version_producer.next().unwrap())
.collect::<Vec<_>>();
match hashes {
Some(hashes) => self.write_accounts_to_storage(
slot,
hashes,
storage_finder,
&accounts_and_meta_to_store,
&StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions(
accounts,
hashes,
write_versions,
),
),
None => {
// hash any accounts where we were lazy in calculating the hash
let mut hash_time = Measure::start("hash_accounts");
let len = accounts_and_meta_to_store.len();
let len = accounts.len();
let mut hashes = Vec::with_capacity(len);
for index in 0..accounts.len() {
let (pubkey, account) = (accounts.pubkey(index), accounts.account(index));
@ -6823,11 +6817,10 @@ impl AccountsDb {
.fetch_add(hash_time.as_us(), Ordering::Relaxed);
self.write_accounts_to_storage(
slot,
&hashes,
storage_finder,
&accounts_and_meta_to_store,
)
slot,
storage_finder,
&StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions(accounts, hashes, write_versions),
)
}
}
}
@ -7939,7 +7932,7 @@ impl AccountsDb {
fn update_index<'a, T: ReadableAccount + Sync>(
&self,
infos: Vec<AccountInfo>,
accounts: impl StorableAccounts<'a, T>,
accounts: &impl StorableAccounts<'a, T>,
reclaim: UpsertReclaim,
) -> SlotList<AccountInfo> {
let target_slot = accounts.target_slot();
@ -8344,7 +8337,7 @@ impl AccountsDb {
.fetch_add(measure.as_us(), Ordering::Relaxed);
}
pub fn store_cached<'a, T: ReadableAccount + Sync + ZeroLamport>(
pub fn store_cached<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>(
&self,
accounts: impl StorableAccounts<'a, T>,
txn_signatures: Option<&'a [Option<&'a Signature>]>,
@ -8368,7 +8361,7 @@ impl AccountsDb {
);
}
fn store<'a, T: ReadableAccount + Sync + ZeroLamport>(
fn store<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>(
&self,
accounts: impl StorableAccounts<'a, T>,
is_cached_store: bool,
@ -8403,7 +8396,13 @@ impl AccountsDb {
}
// we use default hashes for now since the same account may be stored to the cache multiple times
self.store_accounts_unfrozen(accounts, None, is_cached_store, txn_signatures, reclaim);
self.store_accounts_unfrozen(
accounts,
None::<Vec<Hash>>,
is_cached_store,
txn_signatures,
reclaim,
);
self.report_store_timings();
}
@ -8530,10 +8529,10 @@ impl AccountsDb {
}
}
fn store_accounts_unfrozen<'a, T: ReadableAccount + Sync + ZeroLamport>(
fn store_accounts_unfrozen<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>(
&self,
accounts: impl StorableAccounts<'a, T>,
hashes: Option<&[&Hash]>,
hashes: Option<Vec<impl Borrow<Hash>>>,
is_cached_store: bool,
txn_signatures: Option<&'a [Option<&'a Signature>]>,
reclaim: StoreReclaims,
@ -8558,10 +8557,10 @@ impl AccountsDb {
);
}
pub(crate) fn store_accounts_frozen<'a, T: ReadableAccount + Sync + ZeroLamport>(
pub(crate) fn store_accounts_frozen<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>(
&'a self,
accounts: impl StorableAccounts<'a, T>,
hashes: Option<&[impl Borrow<Hash>]>,
hashes: Option<Vec<impl Borrow<Hash>>>,
storage: Option<&'a Arc<AccountStorageEntry>>,
write_version_producer: Option<Box<dyn Iterator<Item = StoredMetaWriteVersion>>>,
reclaim: StoreReclaims,
@ -8583,15 +8582,15 @@ impl AccountsDb {
)
}
fn store_accounts_custom<'a, 'b, T: ReadableAccount + Sync + ZeroLamport>(
&'a self,
accounts: impl StorableAccounts<'b, T>,
hashes: Option<&[impl Borrow<Hash>]>,
fn store_accounts_custom<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>(
&self,
accounts: impl StorableAccounts<'a, T>,
hashes: Option<Vec<impl Borrow<Hash>>>,
storage: Option<&'a Arc<AccountStorageEntry>>,
write_version_producer: Option<Box<dyn Iterator<Item = u64>>>,
is_cached_store: bool,
reset_accounts: bool,
txn_signatures: Option<&'b [Option<&'b Signature>]>,
txn_signatures: Option<&[Option<&Signature>]>,
reclaim: StoreReclaims,
) -> StoreAccountsTiming {
let storage_finder = Box::new(move |slot, size| {
@ -8644,7 +8643,7 @@ impl AccountsDb {
// after the account are stored by the above `store_accounts_to`
// call and all the accounts are stored, all reads after this point
// will know to not check the cache anymore
let mut reclaims = self.update_index(infos, accounts, reclaim);
let mut reclaims = self.update_index(infos, &accounts, reclaim);
// For each updated account, `reclaims` should only have at most one
// item (if the account was previously updated in this slot).
@ -9102,7 +9101,7 @@ impl AccountsDb {
let include_slot_in_hash = INCLUDE_SLOT_IN_HASH_TESTS;
self.store_accounts_frozen(
(*slot, &add[..], include_slot_in_hash),
Some(&hashes[..]),
Some(hashes),
None,
None,
StoreReclaims::Ignore,
@ -10766,12 +10765,18 @@ pub mod tests {
account: &AccountSharedData,
write_version: StoredMetaWriteVersion,
) {
let sm = StoredMeta {
data_len: 1,
pubkey: *pubkey,
write_version,
};
vec.append_accounts(&[(sm, Some(account))], &[&Hash::default()]);
let slot_ignored = Slot::MAX;
let accounts = [(pubkey, account)];
let slice = &accounts[..];
let account_data = (slot_ignored, slice);
let hash = Hash::default();
let storable_accounts =
StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions(
&account_data,
vec![&hash],
vec![write_version],
);
vec.append_accounts(&storable_accounts, 0);
}
#[test]
@ -12789,7 +12794,7 @@ pub mod tests {
// put wrong hash value in store so we get a mismatch
db.store_accounts_unfrozen(
(some_slot, &[(&key, &account)][..]),
Some(&[&Hash::default()]),
Some(vec![&Hash::default()]),
false,
None,
StoreReclaims::Default,
@ -13052,7 +13057,7 @@ pub mod tests {
let some_hash = Hash::new(&[0xca; HASH_BYTES]);
db.store_accounts_unfrozen(
(some_slot, accounts),
Some(&[&some_hash]),
Some(vec![&some_hash]),
false,
None,
StoreReclaims::Default,

View File

@ -8,7 +8,7 @@ use {
accounts_db::FoundStoredAccount,
append_vec::{AppendVec, StoredAccountMeta},
},
solana_sdk::{clock::Slot, hash::Hash},
solana_sdk::clock::Slot,
};
/// a set of accounts need to be stored.
@ -25,7 +25,6 @@ pub enum StorageSelector {
/// We need 1-2 of these slices constructed based on available bytes and individual account sizes.
/// The slice arithmetic accross both hashes and account data gets messy. So, this struct abstracts that.
pub struct AccountsToStore<'a> {
hashes: Vec<&'a Hash>,
accounts: Vec<&'a StoredAccountMeta<'a>>,
/// if 'accounts' contains more items than can be contained in the primary storage, then we have to split these accounts.
/// 'index_first_item_overflow' specifies the index of the first item in 'accounts' that will go into the overflow storage
@ -42,7 +41,6 @@ impl<'a> AccountsToStore<'a> {
slot: Slot,
) -> Self {
let num_accounts = stored_accounts.len();
let mut hashes = Vec::with_capacity(num_accounts);
let mut accounts = Vec::with_capacity(num_accounts);
// index of the first account that doesn't fit in the current append vec
let mut index_first_item_overflow = num_accounts; // assume all fit
@ -53,15 +51,13 @@ impl<'a> AccountsToStore<'a> {
} else if index_first_item_overflow == num_accounts {
available_bytes = 0;
// the # of accounts we have so far seen is the most that will fit in the current ancient append vec
index_first_item_overflow = hashes.len();
index_first_item_overflow = accounts.len();
}
hashes.push(account.account.hash);
// we have to specify 'slot' here because we are writing to an ancient append vec and squashing slots,
// so we need to update the previous accounts index entry for this account from 'slot' to 'ancient_slot'
accounts.push(&account.account);
});
Self {
hashes,
accounts,
index_first_item_overflow,
slot,
@ -73,13 +69,13 @@ impl<'a> AccountsToStore<'a> {
self.index_first_item_overflow < self.accounts.len()
}
/// get the accounts and hashes to store in the given 'storage'
pub fn get(&self, storage: StorageSelector) -> (&[&'a StoredAccountMeta<'a>], &[&'a Hash]) {
/// get the accounts to store in the given 'storage'
pub fn get(&self, storage: StorageSelector) -> &[&'a StoredAccountMeta<'a>] {
let range = match storage {
StorageSelector::Primary => 0..self.index_first_item_overflow,
StorageSelector::Overflow => self.index_first_item_overflow..self.accounts.len(),
};
(&self.accounts[range.clone()], &self.hashes[range])
&self.accounts[range]
}
}
@ -107,6 +103,7 @@ pub mod tests {
},
solana_sdk::{
account::{AccountSharedData, ReadableAccount},
hash::Hash,
pubkey::Pubkey,
},
};
@ -117,9 +114,8 @@ pub mod tests {
let slot = 1;
let accounts_to_store = AccountsToStore::new(0, &map, slot);
for selector in [StorageSelector::Primary, StorageSelector::Overflow] {
let (accounts, hash) = accounts_to_store.get(selector);
let accounts = accounts_to_store.get(selector);
assert!(accounts.is_empty());
assert!(hash.is_empty());
}
assert!(!accounts_to_store.has_overflow());
}
@ -164,20 +160,18 @@ pub mod tests {
] {
let slot = 1;
let accounts_to_store = AccountsToStore::new(available_bytes as u64, &map, slot);
let (accounts, hashes) = accounts_to_store.get(selector);
let accounts = accounts_to_store.get(selector);
assert_eq!(
accounts,
map.iter().map(|b| &b.account).collect::<Vec<_>>(),
"mismatch"
);
assert_eq!(hashes, vec![&hash]);
let (accounts, hash) = accounts_to_store.get(get_opposite(&selector));
let accounts = accounts_to_store.get(get_opposite(&selector));
assert_eq!(
selector == StorageSelector::Overflow,
accounts_to_store.has_overflow()
);
assert!(accounts.is_empty());
assert!(hash.is_empty());
}
}
fn get_opposite(selector: &StorageSelector) -> StorageSelector {

View File

@ -5,6 +5,7 @@
//! <https://docs.solana.com/implemented-proposals/persistent-account-storage>
use {
crate::storable_accounts::StorableAccounts,
log::*,
memmap2::MmapMut,
serde::{Deserialize, Serialize},
@ -19,6 +20,7 @@ use {
convert::TryFrom,
fs::{remove_file, OpenOptions},
io::{self, Seek, SeekFrom, Write},
marker::PhantomData,
mem,
path::{Path, PathBuf},
sync::{
@ -43,6 +45,94 @@ pub const MAXIMUM_APPEND_VEC_FILE_SIZE: u64 = 16 * 1024 * 1024 * 1024; // 16 GiB
pub type StoredMetaWriteVersion = u64;
/// Goal is to eliminate copies and data reshaping given various code paths that store accounts.
/// This struct contains what is needed to store accounts to a storage
/// 1. account & pubkey (StorableAccounts)
/// 2. hash per account (Maybe in StorableAccounts, otherwise has to be passed in separately)
/// 3. write version per account (Maybe in StorableAccounts, otherwise has to be passed in separately)
pub struct StorableAccountsWithHashesAndWriteVersions<
'a: 'b,
'b,
T: ReadableAccount + Sync + 'b,
U: StorableAccounts<'a, T>,
V: Borrow<Hash>,
> {
/// accounts to store
/// always has pubkey and account
/// may also have hash and write_version per account
accounts: &'b U,
/// if accounts does not have hash and write version, this has a hash and write version per account
hashes_and_write_versions: Option<(Vec<V>, Vec<StoredMetaWriteVersion>)>,
_phantom: PhantomData<&'a T>,
}
impl<'a: 'b, 'b, T: ReadableAccount + Sync + 'b, U: StorableAccounts<'a, T>, V: Borrow<Hash>>
StorableAccountsWithHashesAndWriteVersions<'a, 'b, T, U, V>
{
/// used when accounts contains hash and write version already
pub fn new(accounts: &'b U) -> Self {
assert!(accounts.has_hash_and_write_version());
Self {
accounts,
hashes_and_write_versions: None,
_phantom: PhantomData,
}
}
/// used when accounts does NOT contains hash or write version
/// In this case, hashes and write_versions have to be passed in separately and zipped together.
pub fn new_with_hashes_and_write_versions(
accounts: &'b U,
hashes: Vec<V>,
write_versions: Vec<StoredMetaWriteVersion>,
) -> Self {
assert!(!accounts.has_hash_and_write_version());
assert_eq!(accounts.len(), hashes.len());
assert_eq!(write_versions.len(), hashes.len());
Self {
accounts,
hashes_and_write_versions: Some((hashes, write_versions)),
_phantom: PhantomData,
}
}
/// hash for the account at 'index'
pub fn hash(&self, index: usize) -> &Hash {
if self.accounts.has_hash_and_write_version() {
self.accounts.hash(index)
} else {
self.hashes_and_write_versions.as_ref().unwrap().0[index].borrow()
}
}
/// write_version for the account at 'index'
pub fn write_version(&self, index: usize) -> u64 {
if self.accounts.has_hash_and_write_version() {
self.accounts.write_version(index)
} else {
self.hashes_and_write_versions.as_ref().unwrap().1[index]
}
}
/// None if account at index has lamports == 0
/// Otherwise, Some(account)
/// This is the only way to access the account.
pub fn account(&self, index: usize) -> Option<&T> {
self.accounts.account_default_if_zero_lamport(index)
}
/// pubkey at 'index'
pub fn pubkey(&self, index: usize) -> &Pubkey {
self.accounts.pubkey(index)
}
/// # accounts to write
pub fn len(&self) -> usize {
self.accounts.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
/// Meta contains enough context to recover the index from storage itself
/// This struct will be backed by mmaped and snapshotted data files.
/// So the data layout must be stable and consistent across the entire cluster!
@ -516,24 +606,48 @@ impl AppendVec {
/// So, return.len() is 1 + (number of accounts written)
/// After each account is appended, the internal `current_len` is updated
/// and will be available to other threads.
pub fn append_accounts(
pub fn append_accounts<
'a,
'b,
T: ReadableAccount + Sync,
U: StorableAccounts<'a, T>,
V: Borrow<Hash>,
>(
&self,
accounts: &[(StoredMeta, Option<&impl ReadableAccount>)],
hashes: &[impl Borrow<Hash>],
accounts: &StorableAccountsWithHashesAndWriteVersions<'a, 'b, T, U, V>,
skip: usize,
) -> Option<Vec<usize>> {
let _lock = self.append_lock.lock().unwrap();
let mut offset = self.len();
let mut rv = Vec::with_capacity(accounts.len());
for ((stored_meta, account), hash) in accounts.iter().zip(hashes) {
let meta_ptr = stored_meta as *const StoredMeta;
let account_meta = AccountMeta::from(*account);
let mut rv = Vec::with_capacity(accounts.accounts.len());
let len = accounts.accounts.len();
for i in skip..len {
let account = accounts.account(i);
let account_meta = account
.map(|account| AccountMeta {
lamports: account.lamports(),
owner: *account.owner(),
rent_epoch: account.rent_epoch(),
executable: account.executable(),
})
.unwrap_or_default();
let stored_meta = StoredMeta {
pubkey: *accounts.pubkey(i),
data_len: account
.map(|account| account.data().len())
.unwrap_or_default() as u64,
write_version: accounts.write_version(i),
};
let meta_ptr = &stored_meta as *const StoredMeta;
let account_meta_ptr = &account_meta as *const AccountMeta;
let data_len = stored_meta.data_len as usize;
let data_ptr = account
.map(|account| account.data())
.unwrap_or_default()
.as_ptr();
let hash_ptr = hash.borrow().as_ref().as_ptr();
let hash_ptr = accounts.hash(i).as_ref().as_ptr();
let ptrs = [
(meta_ptr as *const u8, mem::size_of::<StoredMeta>()),
(account_meta_ptr as *const u8, mem::size_of::<AccountMeta>()),
@ -563,19 +677,32 @@ impl AppendVec {
pub mod tests {
use {
super::{test_utils::*, *},
crate::accounts_db::INCLUDE_SLOT_IN_HASH_TESTS,
assert_matches::assert_matches,
rand::{thread_rng, Rng},
solana_sdk::{account::WritableAccount, timing::duration_as_ms},
solana_sdk::{
account::{accounts_equal, WritableAccount},
timing::duration_as_ms,
},
std::time::Instant,
};
impl AppendVec {
fn append_account_test(&self, data: &(StoredMeta, AccountSharedData)) -> Option<usize> {
self.append_accounts(
&[(data.0.clone(), Some(&data.1.clone()))],
&[Hash::default()],
)
.map(|res| res[0])
let slot_ignored = Slot::MAX;
let accounts = [(&data.0.pubkey, &data.1)];
let slice = &accounts[..];
let account_data = (slot_ignored, slice);
let hash = Hash::default();
let storable_accounts =
StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions(
&account_data,
vec![&hash],
vec![data.0.write_version],
);
self.append_accounts(&storable_accounts, 0)
.map(|res| res[0])
}
}
@ -604,6 +731,149 @@ pub mod tests {
}
}
#[test]
#[should_panic(expected = "assertion failed: accounts.has_hash_and_write_version()")]
fn test_storable_accounts_with_hashes_and_write_versions_new() {
let account = AccountSharedData::default();
// for (Slot, &'a [(&'a Pubkey, &'a T)], IncludeSlotInHash)
let slot = 0 as Slot;
let pubkey = Pubkey::default();
StorableAccountsWithHashesAndWriteVersions::<'_, '_, _, _, &Hash>::new(&(
slot,
&[(&pubkey, &account)][..],
INCLUDE_SLOT_IN_HASH_TESTS,
));
}
fn test_mismatch(correct_hashes: bool, correct_write_versions: bool) {
let account = AccountSharedData::default();
// for (Slot, &'a [(&'a Pubkey, &'a T)], IncludeSlotInHash)
let slot = 0 as Slot;
let pubkey = Pubkey::default();
// mismatch between lens of accounts, hashes, write_versions
let mut hashes = Vec::default();
if correct_hashes {
hashes.push(Hash::default());
}
let mut write_versions = Vec::default();
if correct_write_versions {
write_versions.push(0);
}
StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions(
&(slot, &[(&pubkey, &account)][..], INCLUDE_SLOT_IN_HASH_TESTS),
hashes,
write_versions,
);
}
#[test]
#[should_panic(expected = "assertion failed:")]
fn test_storable_accounts_with_hashes_and_write_versions_new2() {
test_mismatch(false, false);
}
#[test]
#[should_panic(expected = "assertion failed:")]
fn test_storable_accounts_with_hashes_and_write_versions_new3() {
test_mismatch(false, true);
}
#[test]
#[should_panic(expected = "assertion failed:")]
fn test_storable_accounts_with_hashes_and_write_versions_new4() {
test_mismatch(true, false);
}
#[test]
fn test_storable_accounts_with_hashes_and_write_versions_empty() {
// for (Slot, &'a [(&'a Pubkey, &'a T)], IncludeSlotInHash)
let account = AccountSharedData::default();
let slot = 0 as Slot;
let pubkeys = vec![Pubkey::default()];
let hashes = Vec::<Hash>::default();
let write_versions = Vec::default();
let mut accounts = vec![(&pubkeys[0], &account)];
accounts.clear();
let accounts2 = (slot, &accounts[..], INCLUDE_SLOT_IN_HASH_TESTS);
let storable =
StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions(
&accounts2,
hashes,
write_versions,
);
assert_eq!(storable.len(), 0);
assert!(storable.is_empty());
}
#[test]
fn test_storable_accounts_with_hashes_and_write_versions_hash_and_write_version() {
// for (Slot, &'a [(&'a Pubkey, &'a T)], IncludeSlotInHash)
let account = AccountSharedData::default();
let slot = 0 as Slot;
let pubkeys = vec![Pubkey::new(&[5; 32]), Pubkey::new(&[6; 32])];
let hashes = vec![Hash::new(&[3; 32]), Hash::new(&[4; 32])];
let write_versions = vec![42, 43];
let accounts = vec![(&pubkeys[0], &account), (&pubkeys[1], &account)];
let accounts2 = (slot, &accounts[..], INCLUDE_SLOT_IN_HASH_TESTS);
let storable =
StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions(
&accounts2,
hashes.clone(),
write_versions.clone(),
);
assert_eq!(storable.len(), pubkeys.len());
assert!(!storable.is_empty());
(0..2).for_each(|i| {
assert_eq!(storable.hash(i), &hashes[i]);
assert_eq!(&storable.write_version(i), &write_versions[i]);
assert_eq!(storable.pubkey(i), &pubkeys[i]);
});
}
#[test]
fn test_storable_accounts_with_hashes_and_write_versions_default() {
// 0 lamport account, should return default account (or None in this case)
let account = Account {
data: vec![0],
..Account::default()
}
.to_account_shared_data();
// for (Slot, &'a [(&'a Pubkey, &'a T)], IncludeSlotInHash)
let slot = 0 as Slot;
let pubkey = Pubkey::default();
let hashes = vec![Hash::default()];
let write_versions = vec![0];
let accounts = vec![(&pubkey, &account)];
let accounts2 = (slot, &accounts[..], INCLUDE_SLOT_IN_HASH_TESTS);
let storable =
StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions(
&accounts2,
hashes.clone(),
write_versions.clone(),
);
let get_account = storable.account(0);
assert!(get_account.is_none());
// non-zero lamports, data should be correct
let account = Account {
lamports: 1,
data: vec![0],
..Account::default()
}
.to_account_shared_data();
// for (Slot, &'a [(&'a Pubkey, &'a T)], IncludeSlotInHash)
let accounts = vec![(&pubkey, &account)];
let accounts2 = (slot, &accounts[..], INCLUDE_SLOT_IN_HASH_TESTS);
let storable =
StorableAccountsWithHashesAndWriteVersions::new_with_hashes_and_write_versions(
&accounts2,
hashes,
write_versions,
);
let get_account = storable.account(0);
assert!(accounts_equal(&account, get_account.unwrap()));
}
#[test]
fn test_account_meta_default() {
let def1 = AccountMeta::default();

View File

@ -6322,7 +6322,7 @@ impl Bank {
))
}
pub fn store_accounts<'a, T: ReadableAccount + Sync + ZeroLamport>(
pub fn store_accounts<'a, T: ReadableAccount + Sync + ZeroLamport + 'a>(
&self,
accounts: impl StorableAccounts<'a, T>,
) {

View File

@ -373,7 +373,7 @@ impl<'a> SnapshotMinimizer<'a> {
&accounts[..],
crate::accounts_db::INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION,
),
Some(&hashes),
Some(hashes),
Some(&new_storage),
Some(Box::new(write_versions.into_iter())),
StoreReclaims::Default,

View File

@ -1,7 +1,7 @@
//! trait for abstracting underlying storage of pubkey and account pairs to be written
use {
crate::{accounts_db::IncludeSlotInHash, append_vec::StoredAccountMeta},
solana_sdk::{account::ReadableAccount, clock::Slot, pubkey::Pubkey},
solana_sdk::{account::ReadableAccount, clock::Slot, hash::Hash, pubkey::Pubkey},
};
/// abstract access to pubkey, account, slot, target_slot of either:
@ -14,6 +14,11 @@ pub trait StorableAccounts<'a, T: ReadableAccount + Sync>: Sync {
fn pubkey(&self, index: usize) -> &Pubkey;
/// account at 'index'
fn account(&self, index: usize) -> &T;
/// None if account is zero lamports
fn account_default_if_zero_lamport(&self, index: usize) -> Option<&T> {
let account = self.account(index);
(account.lamports() != 0).then_some(account)
}
// current slot for account at 'index'
fn slot(&self, index: usize) -> Slot;
/// slot that all accounts are to be written to
@ -29,6 +34,26 @@ pub trait StorableAccounts<'a, T: ReadableAccount + Sync>: Sync {
fn contains_multiple_slots(&self) -> bool;
/// true iff hashing these accounts should include the slot
fn include_slot_in_hash(&self) -> IncludeSlotInHash;
/// true iff the impl can provide hash and write_version
/// Otherwise, hash and write_version have to be provided separately to store functions.
fn has_hash_and_write_version(&self) -> bool {
false
}
/// return hash for account at 'index'
/// Should only be called if 'has_hash_and_write_version' = true
fn hash(&self, _index: usize) -> &Hash {
// this should never be called if has_hash_and_write_version returns false
unimplemented!();
}
/// return write_version for account at 'index'
/// Should only be called if 'has_hash_and_write_version' = true
fn write_version(&self, _index: usize) -> u64 {
// this should never be called if has_hash_and_write_version returns false
unimplemented!();
}
}
/// accounts that are moving from 'old_slot' to 'target_slot'
@ -159,6 +184,15 @@ impl<'a> StorableAccounts<'a, StoredAccountMeta<'a>>
fn include_slot_in_hash(&self) -> IncludeSlotInHash {
self.2
}
fn has_hash_and_write_version(&self) -> bool {
true
}
fn hash(&self, index: usize) -> &Hash {
self.1[index].hash
}
fn write_version(&self, index: usize) -> u64 {
self.1[index].meta.write_version
}
}
#[cfg(test)]
pub mod tests {