ancient add write_one_packed_storage (#30220)

This commit is contained in:
Jeff Washington (jwash) 2023-02-09 16:47:22 -06:00 committed by GitHub
parent 67f644473b
commit db25ccba52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 124 additions and 57 deletions

View File

@ -9,10 +9,11 @@ use {
accounts_db::{ accounts_db::{
AccountStorageEntry, AccountsDb, AliveAccounts, GetUniqueAccountsResult, ShrinkCollect, AccountStorageEntry, AccountsDb, AliveAccounts, GetUniqueAccountsResult, ShrinkCollect,
ShrinkCollectAliveSeparatedByRefs, ShrinkStatsSub, StoreReclaims, ShrinkCollectAliveSeparatedByRefs, ShrinkStatsSub, StoreReclaims,
INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION,
}, },
accounts_index::ZeroLamport, accounts_index::ZeroLamport,
append_vec::{AppendVec, StoredAccountMeta}, append_vec::{AppendVec, StoredAccountMeta},
storable_accounts::StorableAccounts, storable_accounts::{StorableAccounts, StorableAccountsBySlot},
}, },
rand::{thread_rng, Rng}, rand::{thread_rng, Rng},
solana_measure::{measure, measure_us}, solana_measure::{measure, measure_us},
@ -222,6 +223,15 @@ struct WriteAncientAccounts<'a> {
metrics: ShrinkStatsSub, metrics: ShrinkStatsSub,
} }
impl<'a> WriteAncientAccounts<'a> {
    /// Fold `other` into `self`: metrics are summed together and every
    /// in-progress shrink tracked by `other` is moved into `self`.
    pub(crate) fn accumulate(&mut self, mut other: Self) {
        self.metrics.accumulate(&other.metrics);
        // take ownership of each (slot, shrink) pair, leaving `other` empty
        for (slot, shrink_in_progress) in other.shrinks_in_progress.drain() {
            self.shrinks_in_progress.insert(slot, shrink_in_progress);
        }
    }
}
impl AccountsDb { impl AccountsDb {
/// calculate all storage info for the storages in slots /// calculate all storage info for the storages in slots
/// Then, apply 'tuning' to filter out slots we do NOT want to combine. /// Then, apply 'tuning' to filter out slots we do NOT want to combine.
@ -348,6 +358,29 @@ impl AccountsDb {
target_slots, target_slots,
} }
} }
/// create packed storage and write contents of 'packed' to it.
/// accumulate results in 'write_ancient_accounts'
#[allow(dead_code)]
fn write_one_packed_storage<'a, 'b: 'a>(
    &'b self,
    packed: &'a PackedAncientStorage<'a>,
    target_slot: Slot,
    write_ancient_accounts: &mut WriteAncientAccounts<'b>,
) {
    // wrap the per-slot account refs so they can all be stored at 'target_slot'
    let storable = StorableAccountsBySlot::new(
        target_slot,
        &packed.accounts[..],
        INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION,
    );
    let result = self.write_ancient_accounts(packed.bytes, storable);
    write_ancient_accounts.accumulate(result)
}
} }
/// hold all alive accounts to be shrunk and/or combined /// hold all alive accounts to be shrunk and/or combined
@ -372,6 +405,16 @@ struct AccountsToCombine<'a> {
target_slots: Vec<Slot>, target_slots: Vec<Slot>,
} }
#[allow(dead_code)]
#[derive(Default)]
/// intended contents of a packed ancient storage:
/// the accounts to move in (grouped by their current slot) and the byte size required
struct PackedAncientStorage<'a> {
    /// accounts to move into this storage, along with the slot the accounts are currently stored in
    accounts: Vec<(Slot, &'a [&'a StoredAccountMeta<'a>])>,
    /// total bytes required to hold 'accounts'
    bytes: u64,
}
/// a set of accounts need to be stored. /// a set of accounts need to be stored.
/// If there are too many to fit in 'Primary', the rest are put in 'Overflow' /// If there are too many to fit in 'Primary', the rest are put in 'Overflow'
#[derive(Copy, Clone, Debug, PartialEq, Eq)] #[derive(Copy, Clone, Debug, PartialEq, Eq)]
@ -1399,45 +1442,53 @@ pub mod tests {
); );
} }
/// which code path the write-ancient test should exercise
#[derive(EnumIter, Debug, PartialEq, Eq)]
enum TestWriteAncient {
    /// drive the write through `write_one_packed_storage`
    OnePackedStorage,
    /// call `write_ancient_accounts` directly
    AncientAccounts,
}
#[test] #[test]
fn test_write_ancient_accounts() { fn test_write_ancient_accounts() {
for data_size in [None, Some(10_000_000)] { for data_size in [None, Some(10_000_000)] {
for num_slots in 0..4 { for method in TestWriteAncient::iter() {
for combine_into in 0..=num_slots { for num_slots in 0..4 {
if combine_into == num_slots && num_slots > 0 { for combine_into in 0..=num_slots {
// invalid combination when num_slots > 0, but required to hit num_slots=0, combine_into=0 if combine_into == num_slots && num_slots > 0 {
continue; // invalid combination when num_slots > 0, but required to hit num_slots=0, combine_into=0
} continue;
let (db, storages, slots, _infos) = get_sample_storages(num_slots, data_size); }
let (db, storages, slots, _infos) =
get_sample_storages(num_slots, data_size);
let initial_accounts = get_all_accounts(&db, slots.clone()); let initial_accounts = get_all_accounts(&db, slots.clone());
let accounts_vecs = storages let accounts_vecs = storages
.iter() .iter()
.map(|storage| (storage.slot(), storage.accounts.accounts(0))) .map(|storage| (storage.slot(), storage.accounts.accounts(0)))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// reshape the data // reshape the data
let accounts_vecs2 = accounts_vecs let accounts_vecs2 = accounts_vecs
.iter() .iter()
.map(|(slot, accounts)| (*slot, accounts.iter().collect::<Vec<_>>())) .map(|(slot, accounts)| (*slot, accounts.iter().collect::<Vec<_>>()))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let accounts = accounts_vecs2 let accounts = accounts_vecs2
.iter() .iter()
.map(|(slot, accounts)| (*slot, &accounts[..])) .map(|(slot, accounts)| (*slot, &accounts[..]))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let target_slot = slots.clone().nth(combine_into).unwrap_or(slots.start); let target_slot = slots.clone().nth(combine_into).unwrap_or(slots.start);
let accounts_to_write = StorableAccountsBySlot::new( let accounts_to_write = StorableAccountsBySlot::new(
target_slot, target_slot,
&accounts[..], &accounts[..],
INCLUDE_SLOT_IN_HASH_TESTS, INCLUDE_SLOT_IN_HASH_TESTS,
); );
let bytes = storages let bytes = storages
.iter() .iter()
.map(|storage| storage.written_bytes()) .map(|storage| storage.written_bytes())
.sum::<u64>(); .sum::<u64>();
assert_eq!( assert_eq!(
bytes, bytes,
initial_accounts initial_accounts
.iter() .iter()
@ -1445,31 +1496,47 @@ pub mod tests {
.sum::<u64>() .sum::<u64>()
); );
if num_slots > 0 { if num_slots > 0 {
let mut result = db let mut result = match method {
.write_ancient_accounts(bytes, accounts_to_write) TestWriteAncient::AncientAccounts => {
.shrinks_in_progress; db.write_ancient_accounts(bytes, accounts_to_write)
let one = result.drain().collect::<Vec<_>>(); .shrinks_in_progress
assert_eq!(1, one.len()); }
assert_eq!(target_slot, one.first().unwrap().0); TestWriteAncient::OnePackedStorage => {
assert_eq!( let mut write_ancient_accounts =
one.first().unwrap().1.old_storage().append_vec_id(), WriteAncientAccounts::default();
storages[combine_into].append_vec_id()
);
// make sure the single new append vec contains all the same accounts
let accounts_in_new_storage =
one.first().unwrap().1.new_storage().accounts.accounts(0);
compare_all_accounts(
&initial_accounts,
&accounts_in_new_storage
.into_iter()
.map(|meta| (*meta.pubkey(), meta.to_account_shared_data()))
.collect::<Vec<_>>()[..],
);
}
let all_accounts = get_all_accounts(&db, target_slot..(target_slot + 1));
compare_all_accounts(&initial_accounts, &all_accounts); let packed = PackedAncientStorage { accounts, bytes };
db.write_one_packed_storage(
&packed,
target_slot,
&mut write_ancient_accounts,
);
write_ancient_accounts.shrinks_in_progress
}
};
let one = result.drain().collect::<Vec<_>>();
assert_eq!(1, one.len());
assert_eq!(target_slot, one.first().unwrap().0);
assert_eq!(
one.first().unwrap().1.old_storage().append_vec_id(),
storages[combine_into].append_vec_id()
);
// make sure the single new append vec contains all the same accounts
let accounts_in_new_storage =
one.first().unwrap().1.new_storage().accounts.accounts(0);
compare_all_accounts(
&initial_accounts,
&accounts_in_new_storage
.into_iter()
.map(|meta| (*meta.pubkey(), meta.to_account_shared_data()))
.collect::<Vec<_>>()[..],
);
}
let all_accounts = get_all_accounts(&db, target_slot..(target_slot + 1));
compare_all_accounts(&initial_accounts, &all_accounts);
}
} }
} }
} }