add combine_ancient_slots_packed (#30276)

* add combine_ancient_slots_new

* pr feedback

* implement ==
Jeff Washington (jwash) authored 2023-02-14 08:53:31 -06:00, committed by GitHub
parent 6fb4716e48
commit 253517cba3
2 changed files with 262 additions and 12 deletions

runtime/src/accounts_db.rs

@@ -153,7 +153,7 @@ pub enum IncludeSlotInHash {
IrrelevantAssertOnUse,
}
-#[derive(Debug, Default, Clone, Copy)]
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CreateAncientStorage {
/// ancient storages are created by appending
#[default]
@@ -4024,7 +4024,6 @@ impl AccountsDb {
self.shrink_stats.report();
}
-#[allow(clippy::too_many_arguments)]
pub(crate) fn update_shrink_stats(shrink_stats: &ShrinkStats, stats_sub: ShrinkStatsSub) {
shrink_stats
.num_slots_shrunk
@@ -4288,10 +4287,17 @@ impl AccountsDb {
}
let can_randomly_shrink = true;
-self.combine_ancient_slots(
-self.get_sorted_potential_ancient_slots(),
-can_randomly_shrink,
-);
+if self.create_ancient_storage == CreateAncientStorage::Append {
+self.combine_ancient_slots(
+self.get_sorted_potential_ancient_slots(),
+can_randomly_shrink,
+);
+} else {
+self.combine_ancient_slots_packed(
+self.get_sorted_potential_ancient_slots(),
+can_randomly_shrink,
+);
+}
}
#[cfg(test)]
@@ -17253,7 +17259,7 @@ pub mod tests {
}
}
-const CAN_RANDOMLY_SHRINK_FALSE: bool = false;
+pub(crate) const CAN_RANDOMLY_SHRINK_FALSE: bool = false;
#[test]
fn test_combine_ancient_slots_empty() {

runtime/src/ancient_append_vecs.rs

@@ -12,13 +12,18 @@ use {
INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION,
},
accounts_index::ZeroLamport,
+active_stats::ActiveStatItem,
append_vec::{aligned_stored_size, AppendVec, StoredAccountMeta},
storable_accounts::{StorableAccounts, StorableAccountsBySlot},
},
rand::{thread_rng, Rng},
solana_measure::{measure, measure_us},
solana_sdk::{account::ReadableAccount, clock::Slot, hash::Hash, saturating_add_assign},
-std::{collections::HashMap, num::NonZeroU64, sync::Arc},
+std::{
+collections::HashMap,
+num::NonZeroU64,
+sync::{atomic::Ordering, Arc},
+},
};
/// ancient packing algorithm tuning per pass
@@ -224,6 +229,89 @@ struct WriteAncientAccounts<'a> {
}
impl AccountsDb {
#[allow(dead_code)]
/// Combine account data from storages in 'sorted_slots' into packed storages.
/// This keeps us from accumulating storages for each slot older than an epoch.
/// After this function, the number of alive roots is <= the # of alive roots when it was called.
/// In practice, the # of alive roots afterwards will be significantly less than the # of alive roots when called.
/// This tries to reduce the # of roots (and storages, one per root) required to store all the data in ancient slots.
pub(crate) fn combine_ancient_slots_packed(
&self,
sorted_slots: Vec<Slot>,
can_randomly_shrink: bool,
) {
let tuning = PackedAncientStorageTuning {
// only allow 10k slots old enough to be ancient
max_ancient_slots: 10_000,
// re-combine/shrink 55% of the data savings this pass
percent_of_alive_shrunk_data: 55,
ideal_storage_size: NonZeroU64::new(get_ancient_append_vec_capacity()).unwrap(),
can_randomly_shrink,
};
let _guard = self.active_stats.activate(ActiveStatItem::SquashAncient);
let mut stats_sub = ShrinkStatsSub::default();
let (_, total_us) = measure_us!(self.combine_ancient_slots_packed_internal(
sorted_slots,
tuning,
&mut stats_sub
));
Self::update_shrink_stats(&self.shrink_ancient_stats.shrink_stats, stats_sub);
self.shrink_ancient_stats
.total_us
.fetch_add(total_us, Ordering::Relaxed);
// only log when we've spent 1s total
// results will continue to accumulate otherwise
if self.shrink_ancient_stats.total_us.load(Ordering::Relaxed) > 1_000_000 {
self.shrink_ancient_stats.report();
}
}
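After the measured call, combine_ancient_slots_packed folds the elapsed microseconds into an atomic running total and only calls report() once more than 1_000_000us (1 second) has accumulated, so frequent cheap passes keep accumulating into the same counters instead of emitting a datapoint each time. A self-contained sketch of that throttling pattern follows; Stats and record() are illustrative stand-ins rather than the real ShrinkAncientStats API, and resetting the counter inside report() is an assumption of this sketch.

use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative stand-in for the stats struct used above.
struct Stats {
    total_us: AtomicU64,
}

impl Stats {
    // Fold one pass's elapsed time into the running total; only report
    // once more than 1 second has been spent overall.
    fn record(&self, elapsed_us: u64) {
        self.total_us.fetch_add(elapsed_us, Ordering::Relaxed);
        if self.total_us.load(Ordering::Relaxed) > 1_000_000 {
            self.report();
        }
    }

    fn report(&self) {
        // Assumption in this sketch: reporting resets the running total.
        let spent = self.total_us.swap(0, Ordering::Relaxed);
        println!("ancient packing: {spent}us since last report");
    }
}

fn main() {
    let stats = Stats { total_us: AtomicU64::new(0) };
    // The third call pushes the total past 1_000_000us and triggers a report.
    for _ in 0..3 {
        stats.record(400_000);
    }
}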
#[allow(dead_code)]
fn combine_ancient_slots_packed_internal(
&self,
sorted_slots: Vec<Slot>,
tuning: PackedAncientStorageTuning,
metrics: &mut ShrinkStatsSub,
) {
let ancient_slot_infos = self.collect_sort_filter_ancient_slots(sorted_slots, &tuning);
if ancient_slot_infos.all_infos.is_empty() {
return; // nothing to do
}
let accounts_per_storage = self
.get_unique_accounts_from_storage_for_combining_ancient_slots(
&ancient_slot_infos.all_infos[..],
);
let accounts_to_combine = self.calc_accounts_to_combine(&accounts_per_storage);
// pack the accounts with 1 ref
let pack = PackedAncientStorage::pack(
accounts_to_combine
.accounts_to_combine
.iter()
.map(|shrink_collect| &shrink_collect.alive_accounts.one_ref),
tuning.ideal_storage_size,
);
if pack.len() > accounts_to_combine.target_slots_sorted.len() {
return; // not enough slots to contain the storages we are trying to pack
}
let write_ancient_accounts = self.write_packed_storages(&accounts_to_combine, pack);
self.finish_combine_ancient_slots_packed_internal(
accounts_to_combine,
write_ancient_accounts,
metrics,
);
}
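Note the early return above when pack.len() exceeds target_slots_sorted.len(): PackedAncientStorage::pack groups one-ref accounts into storages of roughly ideal_storage_size bytes, and each resulting packed storage must be written to its own target slot. A simplified, hypothetical sketch of such size-based grouping follows, just to show where the count of packed storages comes from; pack_sizes is illustrative and not the real pack implementation.

// Hypothetical grouping by byte size: a group fills until adding the next
// account would exceed the ideal storage size, then a new group starts.
fn pack_sizes(account_sizes: &[u64], ideal_storage_size: u64) -> Vec<Vec<u64>> {
    let mut groups: Vec<Vec<u64>> = Vec::new();
    let mut current: Vec<u64> = Vec::new();
    let mut current_total = 0u64;
    for &size in account_sizes {
        if !current.is_empty() && current_total + size > ideal_storage_size {
            groups.push(std::mem::take(&mut current));
            current_total = 0;
        }
        current.push(size);
        current_total += size;
    }
    if !current.is_empty() {
        groups.push(current);
    }
    groups
}

fn main() {
    // 5 accounts grouped into storages with an ideal size of 100 bytes
    let groups = pack_sizes(&[60, 50, 40, 30, 20], 100);
    assert_eq!(groups.len(), 3); // [60], [50, 40], [30, 20]
    // With only 2 target slots available, the caller above would return
    // early rather than write 3 packed storages.
}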
/// calculate all storage info for the storages in slots
/// Then, apply 'tuning' to filter out slots we do NOT want to combine.
#[allow(dead_code)]
@@ -345,7 +433,7 @@ impl AccountsDb {
/// finish shrink operation on slots where a new storage was created
/// drop root and storage for all original slots whose contents were combined into other storages
#[allow(dead_code)]
-fn finish_combine_ancient_slots_packed<'a>(
+fn finish_combine_ancient_slots_packed_internal<'a>(
&self,
accounts_to_combine: AccountsToCombine<'a>,
mut write_ancient_accounts: WriteAncientAccounts,
@@ -674,7 +762,7 @@ pub mod tests {
tests::{
append_single_account_with_default_hash, compare_all_accounts,
create_db_with_storages_and_index, create_storages_and_update_index,
-get_all_accounts, remove_account_for_tests,
+get_all_accounts, remove_account_for_tests, CAN_RANDOMLY_SHRINK_FALSE,
},
INCLUDE_SLOT_IN_HASH_TESTS,
},
@@ -1078,7 +1166,7 @@ pub mod tests {
}
#[test]
-fn test_finish_combine_ancient_slots_packed() {
+fn test_finish_combine_ancient_slots_packed_internal() {
// n storages
// 1 account each
// all accounts have 1 ref
@@ -1123,7 +1211,7 @@ pub mod tests {
});
}
-db.finish_combine_ancient_slots_packed(
+db.finish_combine_ancient_slots_packed_internal(
accounts_to_combine,
write_ancient_accounts,
&mut stats,
@@ -1588,6 +1676,29 @@ pub mod tests {
}
}
fn get_one_packed_ancient_append_vec_and_others(
alive: bool,
num_normal_slots: usize,
) -> (AccountsDb, Slot) {
let (db, slot1) = create_db_with_storages_and_index(alive, num_normal_slots + 1, None);
let storage = db.storage.get_slot_storage_entry(slot1).unwrap();
let created_accounts = db.get_unique_accounts_from_storage(&storage);
db.combine_ancient_slots_packed(vec![slot1], CAN_RANDOMLY_SHRINK_FALSE);
assert!(db.storage.get_slot_storage_entry(slot1).is_some());
let after_store = db.storage.get_slot_storage_entry(slot1).unwrap();
let GetUniqueAccountsResult {
stored_accounts: after_stored_accounts,
capacity: after_capacity,
} = db.get_unique_accounts_from_storage(&after_store);
assert_eq!(created_accounts.capacity, after_capacity);
assert_eq!(created_accounts.stored_accounts.len(), 1);
// always 1 account: either we leave the append vec alone if it is all dead
// or we create a new one and copy into it if account is alive
assert_eq!(after_stored_accounts.len(), 1);
(db, slot1)
}
fn assert_storage_info(info: &SlotInfo, storage: &AccountStorageEntry, should_shrink: bool) {
assert_eq!(storage.append_vec_id(), info.storage.append_vec_id());
assert_eq!(storage.slot(), info.slot);
@@ -2364,4 +2475,137 @@ pub mod tests {
assert!(first_capacity >= second_capacity);
}
}
#[test]
fn test_combine_ancient_slots_packed_internal() {
let can_randomly_shrink = false;
let alive = true;
for num_slots in 0..4 {
for max_ancient_slots in 0..4 {
let (db, slot1) = create_db_with_storages_and_index(alive, num_slots, None);
let original_stores = (0..num_slots)
.filter_map(|slot| db.storage.get_slot_storage_entry((slot as Slot) + slot1))
.collect::<Vec<_>>();
let original_results = original_stores
.iter()
.map(|store| db.get_unique_accounts_from_storage(store))
.collect::<Vec<_>>();
let tuning = PackedAncientStorageTuning {
percent_of_alive_shrunk_data: 0,
max_ancient_slots,
can_randomly_shrink,
ideal_storage_size: NonZeroU64::new(get_ancient_append_vec_capacity()).unwrap(),
};
db.combine_ancient_slots_packed_internal(
(0..num_slots).map(|slot| (slot as Slot) + slot1).collect(),
tuning,
&mut ShrinkStatsSub::default(),
);
let storage = db.storage.get_slot_storage_entry(slot1);
if num_slots == 0 {
assert!(storage.is_none());
continue;
}
// any of the several slots could have been chosen to be re-used
let active_slots = (0..num_slots)
.filter_map(|slot| db.storage.get_slot_storage_entry((slot as Slot) + slot1))
.count();
let mut expected_slots = max_ancient_slots.min(num_slots);
if max_ancient_slots == 0 {
expected_slots = 1;
}
assert_eq!(
active_slots, expected_slots,
"slots: {num_slots}, max_ancient_slots: {max_ancient_slots}, alive: {alive}"
);
assert_eq!(
expected_slots,
db.storage.all_slots().len(),
"slots: {num_slots}, max_ancient_slots: {max_ancient_slots}"
);
let stores = (0..num_slots)
.filter_map(|slot| db.storage.get_slot_storage_entry((slot as Slot) + slot1))
.collect::<Vec<_>>();
let results = stores
.iter()
.map(|store| db.get_unique_accounts_from_storage(store))
.collect::<Vec<_>>();
let all_accounts = get_all_accounts(&db, slot1..(slot1 + num_slots as Slot));
compare_all_accounts(&vec_unique_to_accounts(&original_results), &all_accounts);
compare_all_accounts(
&vec_unique_to_accounts(&results),
&get_all_accounts(&db, slot1..(slot1 + num_slots as Slot)),
);
}
}
}
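The assertions in this test encode the packing invariant: the number of surviving storages is capped by max_ancient_slots, but while any slots hold alive data at least one storage must remain. Restated as a tiny helper for clarity; expected_slots is illustrative, not code from the commit.

// Illustrative restatement of the test's expected_slots computation.
fn expected_slots(num_slots: usize, max_ancient_slots: usize) -> usize {
    if num_slots == 0 {
        0 // nothing was stored, so nothing remains
    } else {
        // capped by the tuning limit, but never packed away entirely
        max_ancient_slots.min(num_slots).max(1)
    }
}

fn main() {
    assert_eq!(expected_slots(3, 0), 1); // everything packs into one storage
    assert_eq!(expected_slots(3, 2), 2); // limit binds
    assert_eq!(expected_slots(1, 4), 1); // can't exceed what exists
}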
fn vec_unique_to_accounts(one: &[GetUniqueAccountsResult]) -> Vec<(Pubkey, AccountSharedData)> {
one.iter()
.flat_map(|result| {
result
.stored_accounts
.iter()
.map(|result| (*result.pubkey(), result.to_account_shared_data()))
})
.collect()
}
#[test]
fn test_combine_packed_ancient_slots_simple() {
for alive in [false, true] {
_ = get_one_packed_ancient_append_vec_and_others(alive, 0);
}
}
#[test]
fn test_shrink_packed_ancient() {
solana_logger::setup();
let num_normal_slots = 1;
// build an ancient append vec at slot 'ancient_slot'
let (db, ancient_slot) =
get_one_packed_ancient_append_vec_and_others(true, num_normal_slots);
let max_slot_inclusive = ancient_slot + (num_normal_slots as Slot);
let initial_accounts = get_all_accounts(&db, ancient_slot..(max_slot_inclusive + 1));
compare_all_accounts(
&initial_accounts,
&get_all_accounts(&db, ancient_slot..(max_slot_inclusive + 1)),
);
// combine normal append vec(s) into existing ancient append vec
db.combine_ancient_slots_packed(
(ancient_slot..=max_slot_inclusive).collect(),
CAN_RANDOMLY_SHRINK_FALSE,
);
compare_all_accounts(
&initial_accounts,
&get_all_accounts(&db, ancient_slot..(max_slot_inclusive + 1)),
);
// create a 2nd ancient append vec at 'next_slot'
let next_slot = max_slot_inclusive + 1;
create_storages_and_update_index(&db, None, next_slot, num_normal_slots, true, None);
let max_slot_inclusive = next_slot + (num_normal_slots as Slot);
let initial_accounts = get_all_accounts(&db, ancient_slot..(max_slot_inclusive + 1));
compare_all_accounts(
&initial_accounts,
&get_all_accounts(&db, ancient_slot..(max_slot_inclusive + 1)),
);
db.combine_ancient_slots_packed(
(next_slot..=max_slot_inclusive).collect(),
CAN_RANDOMLY_SHRINK_FALSE,
);
compare_all_accounts(
&initial_accounts,
&get_all_accounts(&db, ancient_slot..(max_slot_inclusive + 1)),
);
}
}