add ancient pack_ancient_storages (#30235)

* add ancient pack_ancient_storages

* cleanup
Jeff Washington (jwash) 2023-02-10 14:06:36 -06:00 committed by GitHub
parent ceb225f36e
commit 157a9d725f
2 changed files with 378 additions and 2 deletions


@@ -181,7 +181,6 @@ impl<'a> StoreTo<'a> {
/// alive means in the accounts index
pub(crate) struct AliveAccounts<'a> {
/// slot the accounts are currently stored in
-#[allow(dead_code)]
pub(crate) slot: Slot,
pub(crate) accounts: Vec<&'a StoredAccountMeta<'a>>,
pub(crate) bytes: usize,


@@ -12,7 +12,7 @@ use {
INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION,
},
accounts_index::ZeroLamport,
-append_vec::{AppendVec, StoredAccountMeta},
+append_vec::{aligned_stored_size, AppendVec, StoredAccountMeta},
storable_accounts::{StorableAccounts, StorableAccountsBySlot},
},
rand::{thread_rng, Rng},
@@ -439,6 +439,95 @@ struct PackedAncientStorage<'a> {
bytes: u64,
}
impl<'a> PackedAncientStorage<'a> {
#[allow(dead_code)]
/// return a minimal set of 'PackedAncientStorage's to contain all 'accounts_to_combine' with
/// the new storages having a size guided by 'ideal_size'
fn pack(
mut accounts_to_combine: impl Iterator<Item = &'a AliveAccounts<'a>>,
ideal_size: NonZeroU64,
) -> Vec<PackedAncientStorage<'a>> {
let mut result = Vec::default();
let ideal_size: u64 = ideal_size.into();
let ideal_size = ideal_size as usize;
let mut current_alive_accounts = accounts_to_combine.next();
// starting at first entry in current_alive_accounts
let mut partial_inner_index = 0;
// 0 bytes written so far from the current set of accounts
let mut partial_bytes_written = 0;
// pack a new storage each iteration of this outer loop
loop {
let mut bytes_total = 0usize;
let mut accounts_to_write = Vec::default();
// walk through each set of alive accounts to pack the current new storage up to ideal_size
let mut full = false;
while !full && current_alive_accounts.is_some() {
let alive_accounts = current_alive_accounts.unwrap();
if partial_inner_index >= alive_accounts.accounts.len() {
// current_alive_accounts have all been written, so advance to next set from accounts_to_combine
current_alive_accounts = accounts_to_combine.next();
// reset partial progress since we're starting over with a new set of alive accounts
partial_inner_index = 0;
partial_bytes_written = 0;
continue;
}
let bytes_remaining_this_slot =
alive_accounts.bytes.saturating_sub(partial_bytes_written);
let bytes_total_with_this_slot =
bytes_total.saturating_add(bytes_remaining_this_slot);
let mut partial_inner_index_max_exclusive;
if bytes_total_with_this_slot <= ideal_size {
partial_inner_index_max_exclusive = alive_accounts.accounts.len();
bytes_total = bytes_total_with_this_slot;
} else {
partial_inner_index_max_exclusive = partial_inner_index;
// adding all the alive accounts in this storage would exceed the ideal size, so we have to break these accounts up
// look at each account and stop when we exceed the ideal size
while partial_inner_index_max_exclusive < alive_accounts.accounts.len() {
let account = alive_accounts.accounts[partial_inner_index_max_exclusive];
let account_size = aligned_stored_size(account.data().len());
let new_size = bytes_total.saturating_add(account_size);
if new_size > ideal_size && bytes_total > 0 {
full = true;
// partial_inner_index_max_exclusive is the index of the first account that puts us over the ideal size
// so, save it for next time
break;
}
// this account fits
saturating_add_assign!(partial_bytes_written, account_size);
bytes_total = new_size;
partial_inner_index_max_exclusive += 1;
}
}
if partial_inner_index < partial_inner_index_max_exclusive {
// these accounts belong in the current packed storage we're working on
accounts_to_write.push((
alive_accounts.slot,
// could be all of the alive accounts from the current slot, or only a partial range of them
&alive_accounts.accounts
[partial_inner_index..partial_inner_index_max_exclusive],
));
}
// the next storage starts at the first account we did not write
// this could be one past the end of the current alive accounts or anywhere within that vec
partial_inner_index = partial_inner_index_max_exclusive;
}
if accounts_to_write.is_empty() {
// if this iteration collected no accounts to write, then we have exhausted the source data and have packed all the storages we need
break;
}
// we know the full contents of this packed storage now
result.push(PackedAncientStorage {
bytes: bytes_total as u64,
accounts: accounts_to_write,
});
}
result
}
}
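// For intuition, here is a minimal, self-contained sketch of the greedy
// strategy `pack` implements, operating on plain byte counts instead of
// `StoredAccountMeta`. `pack_sizes` is a hypothetical name for illustration
// only, not part of this commit: source slots are drained in order, a new
// storage is cut whenever the next account would exceed `ideal_size`, and an
// account larger than `ideal_size` still gets a storage of its own (the
// `bytes_total > 0` guard above).
fn pack_sizes(per_slot_sizes: &[Vec<usize>], ideal_size: usize) -> Vec<Vec<usize>> {
    let mut result = Vec::new();
    let mut current = Vec::new();
    let mut current_bytes = 0usize;
    for sizes in per_slot_sizes {
        for &size in sizes {
            if current_bytes.saturating_add(size) > ideal_size && current_bytes > 0 {
                // same cut point as `full = true` above: the overflowing
                // account becomes the first account of the next storage
                result.push(std::mem::take(&mut current));
                current_bytes = 0;
            }
            current.push(size);
            current_bytes += size;
        }
    }
    if !current.is_empty() {
        result.push(current);
    }
    result
}
// e.g. pack_sizes(&[vec![4, 4], vec![4, 4]], 10) == [[4, 4], [4, 4]]:
// the third account would bring the first storage to 12 bytes, so it opens a new one.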
/// a set of accounts that need to be stored.
/// If there are too many to fit in 'Primary', the rest are put in 'Overflow'
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -614,6 +703,294 @@ pub mod tests {
assert!(write_ancient_accounts.shrinks_in_progress.is_empty());
}
#[test]
fn test_pack_ancient_storages_one_account_per_storage() {
for num_slots in 0..4 {
for (ideal_size, expected_storages) in [
(1, num_slots),
(get_ancient_append_vec_capacity(), 1.min(num_slots)),
] {
let (db, storages, slots, _infos) = get_sample_storages(num_slots, None);
let original_results = storages
.iter()
.map(|store| db.get_unique_accounts_from_storage(store))
.collect::<Vec<_>>();
let slots_vec = slots.collect::<Vec<_>>();
let accounts_to_combine = original_results
.iter()
.zip(slots_vec.iter().cloned())
.map(|(accounts, slot)| AliveAccounts {
accounts: accounts.stored_accounts.iter().collect::<Vec<_>>(),
bytes: accounts
.stored_accounts
.iter()
.map(|account| aligned_stored_size(account.data().len()))
.sum(),
slot,
})
.collect::<Vec<_>>();
let result = PackedAncientStorage::pack(
accounts_to_combine.iter(),
NonZeroU64::new(ideal_size).unwrap(),
);
let storages_needed = result.len();
assert_eq!(storages_needed, expected_storages);
}
}
}
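// Worked check of the expectations above: with exactly one account per slot,
// an ideal_size of 1 byte is exceeded by the very first account, so every
// account still lands in its own packed storage (num_slots storages). With an
// ideal size of get_ancient_append_vec_capacity() everything fits together,
// so at most one storage is produced (zero when there are no slots, hence
// `1.min(num_slots)`).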
#[test]
fn test_pack_ancient_storages_one_partial() {
// n slots
// m accounts per slot
// divide into different ideal sizes so that we combine multiple slots sometimes and combine partial slots
solana_logger::setup();
let total_accounts_per_storage = 10;
let account_size = 184;
for num_slots in 0..4 {
for (ideal_size, expected_storages) in [
(1, num_slots * total_accounts_per_storage),
(account_size - 1, num_slots * total_accounts_per_storage),
(account_size, num_slots * total_accounts_per_storage),
(account_size + 1, num_slots * total_accounts_per_storage),
(account_size * 2 - 1, num_slots * total_accounts_per_storage),
(account_size * 2, num_slots * total_accounts_per_storage / 2),
(get_ancient_append_vec_capacity(), 1.min(num_slots)),
] {
let (db, storages, slots, _infos) = get_sample_storages(num_slots, None);
let account_template = storages
.first()
.map(|storage| {
storage
.accounts
.account_iter()
.next()
.unwrap()
.to_account_shared_data()
})
.unwrap_or_default();
// add some accounts to each storage so we can make partial progress
let mut lamports = 1000;
let _pubkeys_and_accounts = storages
.iter()
.map(|storage| {
(0..(total_accounts_per_storage - 1))
.map(|_| {
let pk = solana_sdk::pubkey::new_rand();
let mut account = account_template.clone();
account.set_lamports(lamports);
lamports += 1;
append_single_account_with_default_hash(
storage,
&pk,
&account,
0,
true,
Some(&db.accounts_index),
);
(pk, account)
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
let original_results = storages
.iter()
.map(|store| db.get_unique_accounts_from_storage(store))
.collect::<Vec<_>>();
let slots_vec = slots.collect::<Vec<_>>();
let accounts_to_combine = original_results
.iter()
.zip(slots_vec.iter().cloned())
.map(|(accounts, slot)| AliveAccounts {
accounts: accounts.stored_accounts.iter().collect::<Vec<_>>(),
bytes: accounts
.stored_accounts
.iter()
.map(|account| aligned_stored_size(account.data().len()))
.sum(),
slot,
})
.collect::<Vec<_>>();
let result = PackedAncientStorage::pack(
accounts_to_combine.iter(),
NonZeroU64::new(ideal_size).unwrap(),
);
let storages_needed = result.len();
assert_eq!(storages_needed, expected_storages, "num_slots: {num_slots}, expected_storages: {expected_storages}, storages_needed: {storages_needed}, ideal_size: {ideal_size}");
compare_all_accounts(
&packed_to_compare(&result)[..],
&unique_to_compare(&original_results)[..],
);
}
}
}
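// Where the 184 above comes from (assuming the append vec's per-account
// overhead, metadata plus hash, is 136 bytes here): aligned_stored_size rounds
// 136 + data_len up to an 8-byte boundary, so the sample accounts' small data
// lands on 184. The expected_storages table then follows from equal-size
// packing: any ideal_size below 2 * account_size fits only one account per
// storage, exactly 2 * account_size fits two (halving the count), and the full
// ancient append vec capacity holds everything (1.min(num_slots) storages).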
fn packed_to_compare(packed: &[PackedAncientStorage]) -> Vec<(Pubkey, AccountSharedData)> {
packed
.iter()
.flat_map(|packed| {
packed.accounts.iter().flat_map(|(_slot, stored_metas)| {
stored_metas.iter().map(|stored_meta| {
(*stored_meta.pubkey(), stored_meta.to_account_shared_data())
})
})
})
.collect::<Vec<_>>()
}
#[test]
fn test_pack_ancient_storages_varying() {
// n slots
// different number of accounts in each slot
// each account has different size
// divide into different ideal sizes so that we combine multiple slots sometimes and combine partial slots
// compare at end that all accounts are in result exactly once
solana_logger::setup();
let total_accounts_per_storage = 10;
let account_size = 184;
for num_slots in 0..4 {
for ideal_size in [
1,
account_size - 1,
account_size,
account_size + 1,
account_size * 2 - 1,
account_size * 2,
get_ancient_append_vec_capacity(),
] {
let (db, storages, slots, _infos) = get_sample_storages(num_slots, None);
let account_template = storages
.first()
.map(|storage| {
storage
.accounts
.account_iter()
.next()
.unwrap()
.to_account_shared_data()
})
.unwrap_or_default();
// add some accounts to each storage so we can make partial progress
let mut data_size = 450;
// random # of extra accounts here (at least 1, so the `- 1` below cannot underflow)
let total_accounts_per_storage =
thread_rng().gen_range(1, total_accounts_per_storage);
let _pubkeys_and_accounts = storages
.iter()
.map(|storage| {
(0..(total_accounts_per_storage - 1))
.map(|_| {
let pk = solana_sdk::pubkey::new_rand();
let mut account = account_template.clone();
account.set_data((0..data_size).map(|x| (x % 256) as u8).collect());
data_size += 1;
append_single_account_with_default_hash(
storage,
&pk,
&account,
0,
true,
Some(&db.accounts_index),
);
(pk, account)
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
let original_results = storages
.iter()
.map(|store| db.get_unique_accounts_from_storage(store))
.collect::<Vec<_>>();
let slots_vec = slots.collect::<Vec<_>>();
let accounts_to_combine = original_results
.iter()
.zip(slots_vec.iter().cloned())
.map(|(accounts, slot)| AliveAccounts {
accounts: accounts.stored_accounts.iter().collect::<Vec<_>>(),
bytes: accounts
.stored_accounts
.iter()
.map(|account| aligned_stored_size(account.data().len()))
.sum(),
slot,
})
.collect::<Vec<_>>();
let result = PackedAncientStorage::pack(
accounts_to_combine.iter(),
NonZeroU64::new(ideal_size).unwrap(),
);
let largest_account_size = aligned_stored_size(data_size) as u64;
// all packed storages should be close to ideal size
result.iter().enumerate().for_each(|(i, packed)| {
if i + 1 < result.len() && ideal_size > largest_account_size {
// cannot assert this on the last packed storage - it may be small
// cannot assert this when the ideal size is too small to hold the largest account size
assert!(
packed.bytes >= ideal_size - largest_account_size,
"packed size too small: bytes: {}, ideal: {}, largest: {}",
packed.bytes,
ideal_size,
largest_account_size
);
}
assert!(
packed.bytes > 0,
"packed size of zero"
);
assert!(
packed.bytes <= ideal_size || packed.accounts.iter().map(|(_slot, accounts)| accounts.len()).sum::<usize>() == 1,
"packed size too large: bytes: {}, ideal_size: {}, data_size: {}, num_slots: {}, # accounts: {}",
packed.bytes,
ideal_size,
data_size,
num_slots,
packed.accounts.len()
);
});
result.iter().for_each(|packed| {
assert_eq!(
packed.bytes,
packed
.accounts
.iter()
.map(|(_slot, accounts)| accounts
.iter()
.map(|account| aligned_stored_size(account.data().len()) as u64)
.sum::<u64>())
.sum::<u64>()
);
});
compare_all_accounts(
&packed_to_compare(&result)[..],
&unique_to_compare(&original_results)[..],
);
}
}
}
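// Numeric instance of the two bounds asserted above, with made-up numbers:
// say ideal_size = 1000 and largest_account_size = 300. Packing only cuts a
// storage when the next account would push it past 1000, so every non-final
// storage already holds more than 1000 - 300 = 700 bytes; and no storage
// exceeds 1000 bytes unless it holds a single account that is itself larger
// than the ideal size.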
fn unique_to_compare(unique: &[GetUniqueAccountsResult]) -> Vec<(Pubkey, AccountSharedData)> {
unique
.iter()
.flat_map(|unique| {
unique.stored_accounts.iter().map(|stored_meta| {
(*stored_meta.pubkey(), stored_meta.to_account_shared_data())
})
})
.collect::<Vec<_>>()
}
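// packed_to_compare and unique_to_compare flatten both sides to
// (pubkey, account) pairs so compare_all_accounts (an existing test helper)
// can check that packing moved every account exactly once. A hypothetical
// minimal version of that order-independent check, for illustration only,
// reusing the Pubkey/AccountSharedData types already imported in this module:
fn same_account_multiset(
    mut a: Vec<(Pubkey, AccountSharedData)>,
    mut b: Vec<(Pubkey, AccountSharedData)>,
) -> bool {
    // pubkeys are unique per test account, so sorting by key normalizes order
    a.sort_unstable_by_key(|(pubkey, _)| *pubkey);
    b.sort_unstable_by_key(|(pubkey, _)| *pubkey);
    a == b
}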
#[test]
fn test_calc_accounts_to_combine_simple() {
// n storages