add ancient write_packed_storages (#30260)

This commit is contained in:
Jeff Washington (jwash) 2023-02-13 09:25:47 -06:00 committed by GitHub
parent cb7fed6fae
commit bcd7cf0821
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 352 additions and 250 deletions

View File

@ -291,6 +291,37 @@ impl AccountsDb {
infos
}
/// write packed storages as described in 'accounts_to_combine'
/// and 'packed_contents'
#[allow(dead_code)]
fn write_packed_storages<'a, 'b>(
&'a self,
accounts_to_combine: &'b AccountsToCombine<'b>,
packed_contents: Vec<PackedAncientStorage<'b>>,
) -> WriteAncientAccounts<'a> {
let mut write_ancient_accounts = WriteAncientAccounts::default();
// ok if we have more slots, but NOT ok if we have fewer slots than we have contents
assert!(accounts_to_combine.target_slots_sorted.len() >= packed_contents.len());
// write packed storages containing contents from many original slots
// iterate slots in highest to lowest
accounts_to_combine
.target_slots_sorted
.iter()
.rev()
.zip(packed_contents)
.for_each(|(target_slot, pack)| {
self.write_one_packed_storage(&pack, *target_slot, &mut write_ancient_accounts);
});
// write new storages where contents were unable to move because ref_count > 1
self.write_ancient_accounts_to_same_slot_multiple_refs(
accounts_to_combine.accounts_keep_slots.values(),
&mut write_ancient_accounts,
);
write_ancient_accounts
}
/// for each slot in 'ancient_slots', collect all accounts in that slot
/// return the collection of accounts by slot
#[allow(dead_code)]
@ -683,6 +714,36 @@ pub mod tests {
compare_all_accounts(&unique_to_accounts(one), &unique_to_accounts(two));
}
#[test]
fn test_write_packed_storages_empty() {
let (db, _storages, _slots, _infos) = get_sample_storages(0, None);
let write_ancient_accounts =
db.write_packed_storages(&AccountsToCombine::default(), Vec::default());
assert!(write_ancient_accounts.shrinks_in_progress.is_empty());
}
#[test]
#[should_panic(
expected = "assertion failed: accounts_to_combine.target_slots_sorted.len() >= packed_contents.len()"
)]
fn test_write_packed_storages_too_few_slots() {
let (db, storages, slots, _infos) = get_sample_storages(1, None);
let accounts_to_combine = AccountsToCombine::default();
let accounts = [storages
.first()
.unwrap()
.accounts
.accounts(0)
.pop()
.unwrap()];
let accounts = accounts.iter().collect::<Vec<_>>();
let packed_contents = vec![PackedAncientStorage {
bytes: 0,
accounts: vec![(slots.start, &accounts[..])],
}];
db.write_packed_storages(&accounts_to_combine, packed_contents);
}
#[test]
fn test_write_ancient_accounts_to_same_slot_multiple_refs_empty() {
let (db, _storages, _slots, _infos) = get_sample_storages(0, None);
@ -982,120 +1043,133 @@ pub mod tests {
.collect::<Vec<_>>()
}
#[derive(EnumIter, Debug, PartialEq, Eq)]
enum TestWriteMultipleRefs {
MultipleRefs,
PackedStorages,
}
#[test]
fn test_calc_accounts_to_combine_simple() {
// n storages
// 1 account each
// all accounts have 1 ref or all accounts have 2 refs
for num_slots in 0..3 {
for two_refs in [false, true] {
let (db, storages, slots, infos) = get_sample_storages(num_slots, None);
let original_results = storages
.iter()
.map(|store| db.get_unique_accounts_from_storage(store))
.collect::<Vec<_>>();
if two_refs {
original_results.iter().for_each(|results| {
results.stored_accounts.iter().for_each(|account| {
let entry = db
.accounts_index
.get_account_read_entry(account.pubkey())
.unwrap();
entry.addref();
})
});
}
let accounts_per_storage = infos
.iter()
.zip(original_results.into_iter())
.collect::<Vec<_>>();
let accounts_to_combine = db.calc_accounts_to_combine(&accounts_per_storage);
let slots_vec = slots.collect::<Vec<_>>();
assert_eq!(accounts_to_combine.accounts_to_combine.len(), num_slots);
if two_refs {
// all accounts should be in many_refs
let mut accounts_keep = accounts_to_combine
.accounts_keep_slots
.keys()
.cloned()
for method in TestWriteMultipleRefs::iter() {
for num_slots in 0..3 {
for two_refs in [false, true] {
let (db, storages, slots, infos) = get_sample_storages(num_slots, None);
let original_results = storages
.iter()
.map(|store| db.get_unique_accounts_from_storage(store))
.collect::<Vec<_>>();
accounts_keep.sort_unstable();
assert_eq!(accounts_keep, slots_vec);
assert!(accounts_to_combine.target_slots_sorted.is_empty());
assert_eq!(accounts_to_combine.accounts_keep_slots.len(), num_slots);
assert!(accounts_to_combine
.accounts_to_combine
.iter()
.all(|shrink_collect| shrink_collect
.alive_accounts
.one_ref
.accounts
.is_empty()));
assert!(accounts_to_combine
.accounts_to_combine
.iter()
.all(|shrink_collect| shrink_collect
.alive_accounts
.many_refs
.accounts
.is_empty()));
} else {
// all accounts should be in one_ref and all slots are available as target slots
assert_eq!(accounts_to_combine.target_slots_sorted, slots_vec);
assert!(accounts_to_combine.accounts_keep_slots.is_empty());
assert!(accounts_to_combine
.accounts_to_combine
.iter()
.all(|shrink_collect| !shrink_collect
.alive_accounts
.one_ref
.accounts
.is_empty()));
assert!(accounts_to_combine
.accounts_to_combine
.iter()
.all(|shrink_collect| shrink_collect
.alive_accounts
.many_refs
.accounts
.is_empty()));
}
// test write_ancient_accounts_to_same_slot_multiple_refs since we built interesting 'AccountsToCombine'
let mut write_ancient_accounts = WriteAncientAccounts::default();
db.write_ancient_accounts_to_same_slot_multiple_refs(
accounts_to_combine.accounts_keep_slots.values(),
&mut write_ancient_accounts,
);
if two_refs {
assert_eq!(write_ancient_accounts.shrinks_in_progress.len(), num_slots);
let mut shrinks_in_progress = write_ancient_accounts
.shrinks_in_progress
if two_refs {
original_results.iter().for_each(|results| {
results.stored_accounts.iter().for_each(|account| {
let entry = db
.accounts_index
.get_account_read_entry(account.pubkey())
.unwrap();
entry.addref();
})
});
}
let accounts_per_storage = infos
.iter()
.zip(original_results.into_iter())
.collect::<Vec<_>>();
shrinks_in_progress.sort_unstable_by(|a, b| a.0.cmp(b.0));
assert_eq!(
shrinks_in_progress
let accounts_to_combine = db.calc_accounts_to_combine(&accounts_per_storage);
let slots_vec = slots.collect::<Vec<_>>();
assert_eq!(accounts_to_combine.accounts_to_combine.len(), num_slots);
if two_refs {
// all accounts should be in many_refs
let mut accounts_keep = accounts_to_combine
.accounts_keep_slots
.keys()
.cloned()
.collect::<Vec<_>>();
accounts_keep.sort_unstable();
assert_eq!(accounts_keep, slots_vec);
assert!(accounts_to_combine.target_slots_sorted.is_empty());
assert_eq!(accounts_to_combine.accounts_keep_slots.len(), num_slots);
assert!(accounts_to_combine.accounts_to_combine.iter().all(
|shrink_collect| shrink_collect
.alive_accounts
.one_ref
.accounts
.is_empty()
));
assert!(accounts_to_combine.accounts_to_combine.iter().all(
|shrink_collect| shrink_collect
.alive_accounts
.many_refs
.accounts
.is_empty()
));
} else {
// all accounts should be in one_ref and all slots are available as target slots
assert_eq!(accounts_to_combine.target_slots_sorted, slots_vec);
assert!(accounts_to_combine.accounts_keep_slots.is_empty());
assert!(accounts_to_combine.accounts_to_combine.iter().all(
|shrink_collect| !shrink_collect
.alive_accounts
.one_ref
.accounts
.is_empty()
));
assert!(accounts_to_combine.accounts_to_combine.iter().all(
|shrink_collect| shrink_collect
.alive_accounts
.many_refs
.accounts
.is_empty()
));
}
// test write_ancient_accounts_to_same_slot_multiple_refs since we built interesting 'AccountsToCombine'
let write_ancient_accounts = match method {
TestWriteMultipleRefs::MultipleRefs => {
let mut write_ancient_accounts = WriteAncientAccounts::default();
db.write_ancient_accounts_to_same_slot_multiple_refs(
accounts_to_combine.accounts_keep_slots.values(),
&mut write_ancient_accounts,
);
write_ancient_accounts
}
TestWriteMultipleRefs::PackedStorages => {
let packed_contents = Vec::default();
db.write_packed_storages(&accounts_to_combine, packed_contents)
}
};
if two_refs {
assert_eq!(write_ancient_accounts.shrinks_in_progress.len(), num_slots);
let mut shrinks_in_progress = write_ancient_accounts
.shrinks_in_progress
.iter()
.map(|(slot, _)| **slot)
.collect::<Vec<_>>(),
slots_vec
);
assert_eq!(
shrinks_in_progress
.iter()
.map(|(_, shrink_in_progress)| shrink_in_progress
.old_storage()
.append_vec_id())
.collect::<Vec<_>>(),
storages
.iter()
.map(|storage| storage.append_vec_id())
.collect::<Vec<_>>()
);
} else {
assert!(write_ancient_accounts.shrinks_in_progress.is_empty());
.collect::<Vec<_>>();
shrinks_in_progress.sort_unstable_by(|a, b| a.0.cmp(b.0));
assert_eq!(
shrinks_in_progress
.iter()
.map(|(slot, _)| **slot)
.collect::<Vec<_>>(),
slots_vec
);
assert_eq!(
shrinks_in_progress
.iter()
.map(|(_, shrink_in_progress)| shrink_in_progress
.old_storage()
.append_vec_id())
.collect::<Vec<_>>(),
storages
.iter()
.map(|storage| storage.append_vec_id())
.collect::<Vec<_>>()
);
} else {
assert!(write_ancient_accounts.shrinks_in_progress.is_empty());
}
}
}
}
@ -1107,148 +1181,159 @@ pub mod tests {
// 2 accounts
// 1 with 1 ref
// 1 with 2 refs
let num_slots = 1;
let (db, storages, slots, infos) = get_sample_storages(num_slots, None);
let original_results = storages
.iter()
.map(|store| db.get_unique_accounts_from_storage(store))
.collect::<Vec<_>>();
let storage = storages.first().unwrap().clone();
let pk_with_1_ref = solana_sdk::pubkey::new_rand();
let slot1 = slots.start;
let account_with_2_refs = original_results
.first()
.unwrap()
.stored_accounts
.first()
.unwrap();
let pk_with_2_refs = account_with_2_refs.pubkey();
let mut account_with_1_ref = account_with_2_refs.to_account_shared_data();
_ = account_with_1_ref.checked_add_lamports(1);
append_single_account_with_default_hash(
&storage,
&pk_with_1_ref,
&account_with_1_ref,
0,
true,
Some(&db.accounts_index),
);
original_results.iter().for_each(|results| {
results.stored_accounts.iter().for_each(|account| {
let entry = db
.accounts_index
.get_account_read_entry(account.pubkey())
.unwrap();
entry.addref();
})
});
// update to get both accounts in the storage
let original_results = storages
.iter()
.map(|store| db.get_unique_accounts_from_storage(store))
.collect::<Vec<_>>();
assert_eq!(original_results.first().unwrap().stored_accounts.len(), 2);
let accounts_per_storage = infos
.iter()
.zip(original_results.into_iter())
.collect::<Vec<_>>();
let accounts_to_combine = db.calc_accounts_to_combine(&accounts_per_storage);
let slots_vec = slots.collect::<Vec<_>>();
assert_eq!(accounts_to_combine.accounts_to_combine.len(), num_slots);
// all accounts should be in many_refs
let mut accounts_keep = accounts_to_combine
.accounts_keep_slots
.keys()
.cloned()
.collect::<Vec<_>>();
accounts_keep.sort_unstable();
assert_eq!(accounts_keep, slots_vec);
assert!(accounts_to_combine.target_slots_sorted.is_empty());
assert_eq!(accounts_to_combine.accounts_keep_slots.len(), num_slots);
assert_eq!(
accounts_to_combine
.accounts_keep_slots
.get(&slot1)
.unwrap()
.accounts
for method in TestWriteMultipleRefs::iter() {
let num_slots = 1;
let (db, storages, slots, infos) = get_sample_storages(num_slots, None);
let original_results = storages
.iter()
.map(|meta| meta.pubkey())
.collect::<Vec<_>>(),
vec![pk_with_2_refs]
);
assert_eq!(accounts_to_combine.accounts_to_combine.len(), 1);
let one_ref_accounts = &accounts_to_combine
.accounts_to_combine
.first()
.unwrap()
.alive_accounts
.one_ref
.accounts;
assert_eq!(
one_ref_accounts
.iter()
.map(|meta| meta.pubkey())
.collect::<Vec<_>>(),
vec![&pk_with_1_ref]
);
assert_eq!(
one_ref_accounts
.iter()
.map(|meta| meta.to_account_shared_data())
.collect::<Vec<_>>(),
vec![account_with_1_ref]
);
assert!(accounts_to_combine
.accounts_to_combine
.iter()
.all(|shrink_collect| shrink_collect.alive_accounts.many_refs.accounts.is_empty()));
// test write_ancient_accounts_to_same_slot_multiple_refs since we built interesting 'AccountsToCombine'
let mut write_ancient_accounts = WriteAncientAccounts::default();
db.write_ancient_accounts_to_same_slot_multiple_refs(
accounts_to_combine.accounts_keep_slots.values(),
&mut write_ancient_accounts,
);
assert_eq!(write_ancient_accounts.shrinks_in_progress.len(), num_slots);
let mut shrinks_in_progress = write_ancient_accounts
.shrinks_in_progress
.iter()
.collect::<Vec<_>>();
shrinks_in_progress.sort_unstable_by(|a, b| a.0.cmp(b.0));
assert_eq!(
shrinks_in_progress
.iter()
.map(|(slot, _)| **slot)
.collect::<Vec<_>>(),
slots_vec
);
assert_eq!(
shrinks_in_progress
.iter()
.map(|(_, shrink_in_progress)| shrink_in_progress.old_storage().append_vec_id())
.collect::<Vec<_>>(),
storages
.iter()
.map(|storage| storage.append_vec_id())
.collect::<Vec<_>>()
);
// assert that we wrote the 2_ref account to the newly shrunk append vec
let shrink_in_progress = shrinks_in_progress.first().unwrap().1;
let accounts_shrunk_same_slot = shrink_in_progress.new_storage().accounts.accounts(0);
assert_eq!(accounts_shrunk_same_slot.len(), 1);
assert_eq!(
accounts_shrunk_same_slot.first().unwrap().pubkey(),
pk_with_2_refs
);
assert_eq!(
accounts_shrunk_same_slot
.map(|store| db.get_unique_accounts_from_storage(store))
.collect::<Vec<_>>();
let storage = storages.first().unwrap().clone();
let pk_with_1_ref = solana_sdk::pubkey::new_rand();
let slot1 = slots.start;
let account_with_2_refs = original_results
.first()
.unwrap()
.to_account_shared_data(),
account_with_2_refs.to_account_shared_data()
);
.stored_accounts
.first()
.unwrap();
let pk_with_2_refs = account_with_2_refs.pubkey();
let mut account_with_1_ref = account_with_2_refs.to_account_shared_data();
_ = account_with_1_ref.checked_add_lamports(1);
append_single_account_with_default_hash(
&storage,
&pk_with_1_ref,
&account_with_1_ref,
0,
true,
Some(&db.accounts_index),
);
original_results.iter().for_each(|results| {
results.stored_accounts.iter().for_each(|account| {
let entry = db
.accounts_index
.get_account_read_entry(account.pubkey())
.unwrap();
entry.addref();
})
});
// update to get both accounts in the storage
let original_results = storages
.iter()
.map(|store| db.get_unique_accounts_from_storage(store))
.collect::<Vec<_>>();
assert_eq!(original_results.first().unwrap().stored_accounts.len(), 2);
let accounts_per_storage = infos
.iter()
.zip(original_results.into_iter())
.collect::<Vec<_>>();
let accounts_to_combine = db.calc_accounts_to_combine(&accounts_per_storage);
let slots_vec = slots.collect::<Vec<_>>();
assert_eq!(accounts_to_combine.accounts_to_combine.len(), num_slots);
// all accounts should be in many_refs
let mut accounts_keep = accounts_to_combine
.accounts_keep_slots
.keys()
.cloned()
.collect::<Vec<_>>();
accounts_keep.sort_unstable();
assert_eq!(accounts_keep, slots_vec);
assert!(accounts_to_combine.target_slots_sorted.is_empty());
assert_eq!(accounts_to_combine.accounts_keep_slots.len(), num_slots);
assert_eq!(
accounts_to_combine
.accounts_keep_slots
.get(&slot1)
.unwrap()
.accounts
.iter()
.map(|meta| meta.pubkey())
.collect::<Vec<_>>(),
vec![pk_with_2_refs]
);
assert_eq!(accounts_to_combine.accounts_to_combine.len(), 1);
let one_ref_accounts = &accounts_to_combine
.accounts_to_combine
.first()
.unwrap()
.alive_accounts
.one_ref
.accounts;
assert_eq!(
one_ref_accounts
.iter()
.map(|meta| meta.pubkey())
.collect::<Vec<_>>(),
vec![&pk_with_1_ref]
);
assert_eq!(
one_ref_accounts
.iter()
.map(|meta| meta.to_account_shared_data())
.collect::<Vec<_>>(),
vec![account_with_1_ref]
);
assert!(accounts_to_combine
.accounts_to_combine
.iter()
.all(|shrink_collect| shrink_collect.alive_accounts.many_refs.accounts.is_empty()));
// test write_ancient_accounts_to_same_slot_multiple_refs since we built interesting 'AccountsToCombine'
let write_ancient_accounts = match method {
TestWriteMultipleRefs::MultipleRefs => {
let mut write_ancient_accounts = WriteAncientAccounts::default();
db.write_ancient_accounts_to_same_slot_multiple_refs(
accounts_to_combine.accounts_keep_slots.values(),
&mut write_ancient_accounts,
);
write_ancient_accounts
}
TestWriteMultipleRefs::PackedStorages => {
let packed_contents = Vec::default();
db.write_packed_storages(&accounts_to_combine, packed_contents)
}
};
assert_eq!(write_ancient_accounts.shrinks_in_progress.len(), num_slots);
let mut shrinks_in_progress = write_ancient_accounts
.shrinks_in_progress
.iter()
.collect::<Vec<_>>();
shrinks_in_progress.sort_unstable_by(|a, b| a.0.cmp(b.0));
assert_eq!(
shrinks_in_progress
.iter()
.map(|(slot, _)| **slot)
.collect::<Vec<_>>(),
slots_vec
);
assert_eq!(
shrinks_in_progress
.iter()
.map(|(_, shrink_in_progress)| shrink_in_progress.old_storage().append_vec_id())
.collect::<Vec<_>>(),
storages
.iter()
.map(|storage| storage.append_vec_id())
.collect::<Vec<_>>()
);
// assert that we wrote the 2_ref account to the newly shrunk append vec
let shrink_in_progress = shrinks_in_progress.first().unwrap().1;
let accounts_shrunk_same_slot = shrink_in_progress.new_storage().accounts.accounts(0);
assert_eq!(accounts_shrunk_same_slot.len(), 1);
assert_eq!(
accounts_shrunk_same_slot.first().unwrap().pubkey(),
pk_with_2_refs
);
assert_eq!(
accounts_shrunk_same_slot
.first()
.unwrap()
.to_account_shared_data(),
account_with_2_refs.to_account_shared_data()
);
}
}
#[test]
@ -1941,6 +2026,7 @@ pub mod tests {
enum TestWriteAncient {
OnePackedStorage,
AncientAccounts,
PackedStorages,
}
#[test]
@ -1984,12 +2070,12 @@ pub mod tests {
.map(|storage| storage.written_bytes())
.sum::<u64>();
assert_eq!(
bytes,
initial_accounts
.iter()
.map(|(_, account)| aligned_stored_size(account.data().len()) as u64)
.sum::<u64>()
);
bytes,
initial_accounts
.iter()
.map(|(_, account)| aligned_stored_size(account.data().len()) as u64)
.sum::<u64>()
);
if num_slots > 0 {
let mut write_ancient_accounts = WriteAncientAccounts::default();
@ -2009,6 +2095,22 @@ pub mod tests {
&mut write_ancient_accounts,
);
}
TestWriteAncient::PackedStorages => {
let packed = PackedAncientStorage { accounts, bytes };
let accounts_to_combine = AccountsToCombine {
// target slots are supposed to be read in reverse order, so test that
target_slots_sorted: vec![
Slot::MAX, // this asserts if it gets used
Slot::MAX,
target_slot,
],
..AccountsToCombine::default()
};
write_ancient_accounts = db
.write_packed_storages(&accounts_to_combine, vec![packed]);
}
};
let mut result = write_ancient_accounts.shrinks_in_progress;
let one = result.drain().collect::<Vec<_>>();