snapshot storage path uses 1 append vec per slot (#29627)

This commit is contained in:
Jeff Washington (jwash) 2023-01-11 14:05:15 -06:00 committed by GitHub
parent d89cf0d28b
commit 544b9745c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 199 additions and 255 deletions

View File

@ -328,7 +328,7 @@ mod tests {
block_height: slot,
slot_deltas: vec![],
snapshot_links: link_snapshots_dir,
snapshot_storages: vec![storage_entries],
snapshot_storages: storage_entries,
snapshot_version: SnapshotVersion::default(),
snapshot_type: SnapshotType::FullSnapshot,
};

View File

@ -442,7 +442,6 @@ fn test_concurrent_snapshot_packaging(
let snapshot_storage_files: HashSet<_> = bank_forks[slot]
.get_snapshot_storages(None)
.into_iter()
.flatten()
.map(|s| s.get_path())
.collect();

View File

@ -610,7 +610,10 @@ impl<'a> MultiThreadProgress<'a> {
pub type AtomicAppendVecId = AtomicU32;
pub type AppendVecId = u32;
pub type SnapshotStorage = Vec<Arc<AccountStorageEntry>>;
pub type SnapshotStorageOne = Arc<AccountStorageEntry>;
pub type SnapshotStorages = Vec<SnapshotStorage>;
/// exactly 1 append vec per slot
pub type SnapshotStoragesOne = SnapshotStorage;
// Each slot has a set of storage entries.
pub(crate) type SlotStores = Arc<RwLock<HashMap<AppendVecId, Arc<AccountStorageEntry>>>>;
@ -1515,9 +1518,9 @@ impl SplitAncientStorages {
) -> Vec<Slot> {
let range = snapshot_storages.range();
let mut ancient_slots = Vec::default();
for (slot, storages) in snapshot_storages.iter_range(&(range.start..one_epoch_old_slot)) {
if let Some(storages) = storages {
if storages.len() == 1 && is_ancient(&storages.first().unwrap().accounts) {
for (slot, storage) in snapshot_storages.iter_range(&(range.start..one_epoch_old_slot)) {
if let Some(storage) = storage {
if is_ancient(&storage.accounts) {
ancient_slots.push(slot);
continue; // was ancient, keep looking
}
@ -6819,7 +6822,7 @@ impl AccountsDb {
}
/// iterate over a single storage, calling scanner on each item
fn scan_single_account_storage<S>(storage: &Arc<AccountStorageEntry>, scanner: &mut S)
fn scan_single_account_storage<S>(storage: &SnapshotStorageOne, scanner: &mut S)
where
S: AppendVecScan,
{
@ -6830,10 +6833,8 @@ impl AccountsDb {
});
}
fn scan_multiple_account_storages_one_slot<S>(
storages: &[Arc<AccountStorageEntry>],
scanner: &mut S,
) where
fn scan_multiple_account_storages_one_slot<S>(storages: &[SnapshotStorageOne], scanner: &mut S)
where
S: AppendVecScan,
{
let mut len = storages.len();
@ -6897,23 +6898,15 @@ impl AccountsDb {
}
}
fn update_old_slot_stats(&self, stats: &HashStats, sub_storages: Option<&SnapshotStorage>) {
if let Some(sub_storages) = sub_storages {
fn update_old_slot_stats(&self, stats: &HashStats, storage: Option<&Arc<AccountStorageEntry>>) {
if let Some(storage) = storage {
stats.roots_older_than_epoch.fetch_add(1, Ordering::Relaxed);
let mut ancients = 0;
let num_accounts = sub_storages
.iter()
.map(|storage| {
if is_ancient(&storage.accounts) {
ancients += 1;
}
storage.count()
})
.sum();
let sizes = sub_storages
.iter()
.map(|storage| storage.total_bytes())
.sum::<u64>();
let num_accounts = storage.count();
if is_ancient(&storage.accounts) {
ancients += 1;
}
let sizes = storage.total_bytes();
stats
.append_vec_sizes_older_than_epoch
.fetch_add(sizes as usize, Ordering::Relaxed);
@ -6961,16 +6954,12 @@ impl AccountsDb {
/// return true iff storages are valid for loading from cache
fn hash_storage_info(
hasher: &mut impl StdHasher,
storages: Option<&SnapshotStorage>,
storage: Option<&SnapshotStorageOne>,
slot: Slot,
) -> bool {
if let Some(sub_storages) = storages {
if sub_storages.len() > 1 {
// Having > 1 appendvecs per slot is not expected. If we have that, we just fail to load from the cache for this slot.
return false;
}
if let Some(sub_storage) = storage {
// hash info about this storage
let append_vec = sub_storages.first().unwrap();
let append_vec = sub_storage;
append_vec.written_bytes().hash(hasher);
let storage_file = append_vec.accounts.get_path();
slot.hash(hasher);
@ -7074,12 +7063,10 @@ impl AccountsDb {
let mut init_accum = true;
// load from cache failed, so create the cache file for this chunk
for (slot, sub_storages) in snapshot_storages.iter_range(&range_this_chunk) {
for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
let mut ancient = false;
let (_, scan) = measure!(if let Some(sub_storages) = sub_storages {
if let Some(storage) = sub_storages.first() {
ancient = is_ancient(&storage.accounts);
}
let (_, scan) = measure!(if let Some(storage) = storage {
ancient = is_ancient(&storage.accounts);
if init_accum {
let range = bin_range.end - bin_range.start;
scanner.init_accum(range);
@ -7087,7 +7074,10 @@ impl AccountsDb {
}
scanner.set_slot(slot);
Self::scan_multiple_account_storages_one_slot(sub_storages, &mut scanner);
Self::scan_multiple_account_storages_one_slot(
&[Arc::clone(storage)],
&mut scanner,
);
});
if ancient {
stats
@ -7129,18 +7119,16 @@ impl AccountsDb {
let acceptable_straggler_slot_count = 100; // do nothing special for these old stores which will likely get cleaned up shortly
let sub = slots_per_epoch + acceptable_straggler_slot_count;
let in_epoch_range_start = max.saturating_sub(sub);
for (slot, storages) in storages.iter_range(&(..in_epoch_range_start)) {
if let Some(storages) = storages {
storages.iter().for_each(|store| {
if !is_ancient(&store.accounts) {
// ancient stores are managed separately - we expect them to be old and keeping accounts
// We can expect the normal processes will keep them cleaned.
// If we included them here then ALL accounts in ALL ancient append vecs will be visited by clean each time.
self.dirty_stores
.insert((slot, store.append_vec_id()), store.clone());
num_dirty_slots += 1;
}
});
for (slot, store) in storages.iter_range(&(..in_epoch_range_start)) {
if let Some(store) = store {
if !is_ancient(&store.accounts) {
// ancient stores are managed separately - we expect them to be old and keeping accounts
// We can expect the normal processes will keep them cleaned.
// If we included them here then ALL accounts in ALL ancient append vecs will be visited by clean each time.
self.dirty_stores
.insert((slot, store.append_vec_id()), store.clone());
num_dirty_slots += 1;
}
}
}
mark_time.stop();
@ -8398,7 +8386,7 @@ impl AccountsDb {
&self,
requested_slots: impl RangeBounds<Slot> + Sync,
ancestors: Option<&Ancestors>,
) -> (SnapshotStorages, Vec<Slot>) {
) -> (SnapshotStoragesOne, Vec<Slot>) {
let mut m = Measure::start("get slots");
let slots_and_storages = self
.storage
@ -8427,16 +8415,15 @@ impl AccountsDb {
.unwrap_or_default()
})
.filter_map(|(slot, storages)| {
let storages = storages
storages
.read()
.unwrap()
.values()
.next()
.filter(|x| x.has_accounts())
.cloned()
.collect::<Vec<_>>();
(!storages.is_empty()).then_some((storages, *slot))
.map(|storage| (Arc::clone(storage), *slot))
})
.collect::<Vec<(SnapshotStorage, Slot)>>()
.collect::<Vec<(SnapshotStorageOne, Slot)>>()
})
.collect::<Vec<_>>()
});
@ -9700,7 +9687,7 @@ pub mod tests {
fn sample_storages_and_account_in_slot(
slot: Slot,
accounts: &AccountsDb,
) -> (SnapshotStorages, Vec<CalculateHashIntermediate>) {
) -> (SnapshotStoragesOne, Vec<CalculateHashIntermediate>) {
let pubkey0 = Pubkey::new(&[0u8; 32]);
let pubkey127 = Pubkey::new(&[0x7fu8; 32]);
let pubkey128 = Pubkey::new(&[0x80u8; 32]);
@ -9754,21 +9741,19 @@ pub mod tests {
storages
.iter()
.zip(slots.iter())
.for_each(|(storages, slot)| {
for storage in storages {
assert_eq!(&storage.slot(), slot);
}
.for_each(|(storage, slot)| {
assert_eq!(&storage.slot(), slot);
});
(storages, raw_expected)
}
fn sample_storages_and_accounts(
accounts: &AccountsDb,
) -> (SnapshotStorages, Vec<CalculateHashIntermediate>) {
) -> (SnapshotStoragesOne, Vec<CalculateHashIntermediate>) {
sample_storages_and_account_in_slot(1, accounts)
}
fn get_storage_refs(input: &[SnapshotStorage]) -> SortedStorages {
fn get_storage_refs(input: &[SnapshotStorageOne]) -> SortedStorages {
SortedStorages::new(input)
}
@ -10109,14 +10094,14 @@ pub mod tests {
assert_eq!(result, (expected_accounts_hash, sum));
}
fn sample_storage() -> (SnapshotStorages, usize, Slot) {
fn sample_storage() -> (SnapshotStoragesOne, usize, Slot) {
let (_temp_dirs, paths) = get_temp_accounts_paths(1).unwrap();
let slot_expected: Slot = 0;
let size: usize = 123;
let data = AccountStorageEntry::new(&paths[0], slot_expected, 0, size as u64);
let arc = Arc::new(data);
let storages = vec![vec![arc]];
let storages = vec![arc];
(storages, size, slot_expected)
}
@ -10175,10 +10160,10 @@ pub mod tests {
data.accounts = av;
let arc = Arc::new(data);
let storages = vec![vec![arc]];
let storages = vec![arc];
let pubkey = solana_sdk::pubkey::new_rand();
let acc = AccountSharedData::new(1, 48, AccountSharedData::default().owner());
append_single_account_with_default_hash(&storages[0][0].accounts, &pubkey, &acc, 1);
append_single_account_with_default_hash(&storages[0].accounts, &pubkey, &acc, 1);
let calls = Arc::new(AtomicU64::new(0));
let temp_dir = TempDir::new().unwrap();
@ -10261,10 +10246,10 @@ pub mod tests {
data.accounts = av;
let arc = Arc::new(data);
let storages = vec![vec![arc]];
let storages = vec![arc];
let pubkey = solana_sdk::pubkey::new_rand();
let acc = AccountSharedData::new(1, 48, AccountSharedData::default().owner());
append_single_account_with_default_hash(&storages[0][0].accounts, &pubkey, &acc, 1);
append_single_account_with_default_hash(&storages[0].accounts, &pubkey, &acc, 1);
let calls = Arc::new(AtomicU64::new(0));
@ -10277,29 +10262,7 @@ pub mod tests {
value_to_use_for_lamports: expected,
};
AccountsDb::scan_multiple_account_storages_one_slot(&storages[0], &mut test_scan);
let accum = test_scan.scanning_complete();
assert_eq!(calls.load(Ordering::Relaxed), 1);
assert_eq!(
accum
.iter()
.flatten()
.map(|a| a.lamports)
.collect::<Vec<_>>(),
vec![expected]
);
let calls = Arc::new(AtomicU64::new(0));
let mut test_scan = TestScan {
calls: calls.clone(),
pubkey,
slot_expected,
accum: Vec::default(),
current_slot: 0,
value_to_use_for_lamports: expected,
};
AccountsDb::scan_single_account_storage(&storages[0][0], &mut test_scan);
AccountsDb::scan_multiple_account_storages_one_slot(&storages, &mut test_scan);
let accum = test_scan.scanning_complete();
assert_eq!(calls.load(Ordering::Relaxed), 1);
assert_eq!(
@ -10313,17 +10276,12 @@ pub mod tests {
}
fn append_sample_data_to_storage(
storages: &SnapshotStorages,
storages: &SnapshotStoragesOne,
pubkey: &Pubkey,
write_version: StoredMetaWriteVersion,
) {
let acc = AccountSharedData::new(1, 48, AccountSharedData::default().owner());
append_single_account_with_default_hash(
&storages[0][0].accounts,
pubkey,
&acc,
write_version,
);
append_single_account_with_default_hash(&storages[0].accounts, pubkey, &acc, write_version);
}
fn sample_storage_with_entries(
@ -10331,7 +10289,7 @@ pub mod tests {
write_version: StoredMetaWriteVersion,
slot: Slot,
pubkey: &Pubkey,
) -> SnapshotStorages {
) -> SnapshotStoragesOne {
sample_storage_with_entries_id(tf, write_version, slot, pubkey, 0)
}
@ -10341,7 +10299,7 @@ pub mod tests {
slot: Slot,
pubkey: &Pubkey,
id: AppendVecId,
) -> SnapshotStorages {
) -> SnapshotStoragesOne {
let (_temp_dirs, paths) = get_temp_accounts_paths(1).unwrap();
let size: usize = 123;
let mut data = AccountStorageEntry::new(&paths[0], slot, id, size as u64);
@ -10349,7 +10307,7 @@ pub mod tests {
data.accounts = av;
let arc = Arc::new(data);
let storages = vec![vec![arc]];
let storages = vec![arc];
append_sample_data_to_storage(&storages, pubkey, write_version);
storages
}
@ -10368,12 +10326,8 @@ pub mod tests {
let pubkey2 = solana_sdk::pubkey::new_rand();
for swap in [false, true].iter() {
let mut storages = [
sample_storage_with_entries(&tf, write_version1, slot_expected, &pubkey1)
.remove(0)
.remove(0),
sample_storage_with_entries(&tf, write_version2, slot_expected, &pubkey2)
.remove(0)
.remove(0),
sample_storage_with_entries(&tf, write_version1, slot_expected, &pubkey1).remove(0),
sample_storage_with_entries(&tf, write_version2, slot_expected, &pubkey2).remove(0),
];
if *swap {
storages[..].swap(0, 1);
@ -12907,11 +12861,7 @@ pub mod tests {
accounts.store_for_tests(current_slot, &[(&pubkey3, &zero_lamport_account)]);
let snapshot_stores = accounts.get_snapshot_storages(..=current_slot, None).0;
let total_accounts: usize = snapshot_stores
.iter()
.flatten()
.map(|s| s.all_accounts().len())
.sum();
let total_accounts: usize = snapshot_stores.iter().map(|s| s.all_accounts().len()).sum();
assert!(!snapshot_stores.is_empty());
assert!(total_accounts > 0);
@ -12924,11 +12874,8 @@ pub mod tests {
accounts.print_accounts_stats("Post-D clean");
let total_accounts_post_clean: usize = snapshot_stores
.iter()
.flatten()
.map(|s| s.all_accounts().len())
.sum();
let total_accounts_post_clean: usize =
snapshot_stores.iter().map(|s| s.all_accounts().len()).sum();
assert_eq!(total_accounts, total_accounts_post_clean);
// should clean all 3 pubkeys
@ -16645,7 +16592,7 @@ pub mod tests {
let id = 1;
let size = 1;
let non_ancient_storage = Arc::new(AccountStorageEntry::new(path, slot2, id, size));
let raw_storages = vec![vec![non_ancient_storage.clone()]];
let raw_storages = vec![non_ancient_storage.clone()];
let snapshot_storages = SortedStorages::new(&raw_storages);
// test without an ancient append vec
let one_epoch_old_slot = 0;
@ -16658,7 +16605,7 @@ pub mod tests {
assert_eq!(Vec::<Slot>::default(), ancient_slots);
// now test with an ancient append vec
let raw_storages = vec![vec![ancient.clone()]];
let raw_storages = vec![ancient.clone()];
let snapshot_storages = SortedStorages::new(&raw_storages);
let one_epoch_old_slot = 0;
let ancient_slots =
@ -16670,7 +16617,7 @@ pub mod tests {
assert_eq!(vec![slot1_ancient], ancient_slots);
// now test with an ancient append vec and then a non-ancient append vec
let raw_storages = vec![vec![ancient.clone()], vec![non_ancient_storage.clone()]];
let raw_storages = vec![ancient.clone(), non_ancient_storage.clone()];
let snapshot_storages = SortedStorages::new(&raw_storages);
let one_epoch_old_slot = 0;
let ancient_slots =
@ -16683,9 +16630,9 @@ pub mod tests {
// ancient, non-ancient, ancient
let raw_storages = vec![
vec![ancient.clone()],
vec![non_ancient_storage.clone()],
vec![ancient3.new_storage().clone()],
ancient.clone(),
non_ancient_storage.clone(),
ancient3.new_storage().clone(),
];
let snapshot_storages = SortedStorages::new(&raw_storages);
let one_epoch_old_slot = 0;
@ -16700,10 +16647,10 @@ pub mod tests {
if sparse {
// ancient, ancient, non-ancient, ancient
let raw_storages = vec![
vec![Arc::clone(&ancient)],
vec![Arc::clone(&ancient_1_plus)],
vec![non_ancient_storage],
vec![Arc::clone(ancient3.new_storage())],
Arc::clone(&ancient),
Arc::clone(&ancient_1_plus),
non_ancient_storage,
Arc::clone(ancient3.new_storage()),
];
let snapshot_storages = SortedStorages::new(&raw_storages[..]);
let one_epoch_old_slot = 0;
@ -16768,24 +16715,6 @@ pub mod tests {
// can't assert hash here - it is a function of mod date
assert!(load);
}
{
let mut hasher = std::collections::hash_map::DefaultHasher::new();
let slot: Slot = 0;
let slot1 = 1;
let tf = crate::append_vec::test_utils::get_append_vec_path(
"test_accountsdb_scan_account_storage_no_bank",
);
let write_version1 = 0;
let pubkey1 = solana_sdk::pubkey::new_rand();
let mut storages = sample_storage_with_entries(&tf, write_version1, slot, &pubkey1);
let mut storages2 = sample_storage_with_entries(&tf, write_version1, slot1, &pubkey1);
storages[0].push(storages2[0].remove(0));
let load = AccountsDb::hash_storage_info(&mut hasher, Some(&storages[0]), slot);
let _ = hasher.finish();
// cannot load because we have 2 storages
assert!(!load);
}
}
#[test]
@ -17460,7 +17389,7 @@ pub mod tests {
for i in 0..num_slots {
let id = starting_id + (i as AppendVecId);
let pubkey1 = solana_sdk::pubkey::new_rand();
let storages = sample_storage_with_entries_id(
let storage = sample_storage_with_entries_id(
tf,
write_version1,
starting_slot + (i as Slot),
@ -17469,7 +17398,7 @@ pub mod tests {
)
.pop()
.unwrap();
insert_store(db, Arc::clone(&storages[0]));
insert_store(db, Arc::clone(&storage));
}
let storage = db.get_storage_for_slot(starting_slot).unwrap();
@ -17586,13 +17515,13 @@ pub mod tests {
);
let write_version1 = 0;
let pubkey1 = solana_sdk::pubkey::new_rand();
let storages = sample_storage_with_entries(&tf, write_version1, slot5, &pubkey1)
let storage = sample_storage_with_entries(&tf, write_version1, slot5, &pubkey1)
.pop()
.unwrap();
let mut current_ancient = CurrentAncientAppendVec::default();
let should_move = db.should_move_to_ancient_append_vec(
&storages[0],
&storage,
&mut current_ancient,
slot5,
CAN_RANDOMLY_SHRINK_FALSE,
@ -17601,9 +17530,9 @@ pub mod tests {
// slot is not ancient, so it is good to move
assert!(should_move);
current_ancient = CurrentAncientAppendVec::new(slot5, Arc::clone(&storages[0])); // just 'some', contents don't matter
current_ancient = CurrentAncientAppendVec::new(slot5, Arc::clone(&storage)); // just 'some', contents don't matter
let should_move = db.should_move_to_ancient_append_vec(
&storages[0],
&storage,
&mut current_ancient,
slot5,
CAN_RANDOMLY_SHRINK_FALSE,
@ -17611,7 +17540,7 @@ pub mod tests {
// should have kept the same 'current_ancient'
assert_eq!(current_ancient.slot(), slot5);
assert_eq!(current_ancient.append_vec().slot(), slot5);
assert_eq!(current_ancient.append_vec_id(), storages[0].append_vec_id());
assert_eq!(current_ancient.append_vec_id(), storage.append_vec_id());
// slot is not ancient, so it is good to move
assert!(should_move);
@ -17731,7 +17660,7 @@ pub mod tests {
}
fn make_ancient_append_vec_full(ancient: &Arc<AccountStorageEntry>) {
let vecs = vec![vec![ancient.clone()]];
let vecs = vec![ancient.clone()];
for _ in 0..100 {
append_sample_data_to_storage(&vecs, &Pubkey::default(), 0);
}

View File

@ -1,6 +1,6 @@
use {
crate::{
accounts_db::{SnapshotStorages, PUBKEY_BINS_FOR_CALCULATING_HASHES},
accounts_db::{SnapshotStoragesOne, PUBKEY_BINS_FOR_CALCULATING_HASHES},
ancestors::Ancestors,
rent_collector::RentCollector,
},
@ -173,20 +173,14 @@ pub struct HashStats {
pub count_ancient_scans: AtomicU64,
}
impl HashStats {
pub fn calc_storage_size_quartiles(&mut self, storages: &SnapshotStorages) {
pub fn calc_storage_size_quartiles(&mut self, storages: &SnapshotStoragesOne) {
let mut sum = 0;
let mut sizes = storages
.iter()
.flat_map(|storages| {
let result = storages
.iter()
.map(|storage| {
let cap = storage.accounts.capacity() as usize;
sum += cap;
cap
})
.collect::<Vec<_>>();
result
.map(|storage| {
let cap = storage.accounts.capacity() as usize;
sum += cap;
cap
})
.collect::<Vec<_>>();
sizes.sort_unstable();

View File

@ -45,7 +45,7 @@ use {
},
accounts_db::{
AccountShrinkThreshold, AccountsDbConfig, CalcAccountsHashDataSource,
IncludeSlotInHash, SnapshotStorages, ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS,
IncludeSlotInHash, SnapshotStoragesOne, ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS,
ACCOUNTS_DB_CONFIG_FOR_TESTING,
},
accounts_hash::AccountsHash,
@ -6848,7 +6848,7 @@ impl Bank {
/// Get this bank's storages to use for snapshots.
///
/// If a base slot is provided, return only the storages that are *higher* than this slot.
pub fn get_snapshot_storages(&self, base_slot: Option<Slot>) -> SnapshotStorages {
pub fn get_snapshot_storages(&self, base_slot: Option<Slot>) -> SnapshotStoragesOne {
// if a base slot is provided, request storages starting at the slot *after*
let start_slot = base_slot.map_or(0, |slot| slot.saturating_add(1));
// we want to *include* the storage at our slot

View File

@ -13,7 +13,7 @@ use {
bank::{Bank, BankTestConfig},
epoch_accounts_hash,
genesis_utils::{self, activate_all_features, activate_feature},
snapshot_utils::ArchiveFormat,
snapshot_utils::{get_storages_to_serialize, ArchiveFormat},
status_cache::StatusCache,
},
bincode::serialize_into,
@ -43,7 +43,7 @@ fn copy_append_vecs<P: AsRef<Path>>(
let storage_entries = accounts_db.get_snapshot_storages(RangeFull, None).0;
let storage: AccountStorageMap = AccountStorageMap::with_capacity(storage_entries.len());
let mut next_append_vec_id = 0;
for storage_entry in storage_entries.into_iter().flatten() {
for storage_entry in storage_entries.into_iter() {
// Copy file to new directory
let storage_path = storage_entry.get_path();
let file_name = AppendVec::file_name(storage_entry.slot(), storage_entry.append_vec_id());
@ -187,7 +187,7 @@ fn test_accounts_serialize_style(serde_style: SerdeStyle) {
&mut writer,
&accounts.accounts_db,
0,
&accounts.accounts_db.get_snapshot_storages(..=0, None).0,
&get_storages_to_serialize(&accounts.accounts_db.get_snapshot_storages(..=0, None).0),
)
.unwrap();
@ -281,7 +281,7 @@ fn test_bank_serialize_style(
serde_style,
&mut std::io::BufWriter::new(&mut writer),
&bank2,
&snapshot_storages,
&get_storages_to_serialize(&snapshot_storages),
)
.unwrap();
@ -431,7 +431,7 @@ pub(crate) fn reconstruct_accounts_db_via_serialization(
&mut writer,
accounts,
slot,
&snapshot_storages,
&get_storages_to_serialize(&snapshot_storages),
)
.unwrap();
@ -516,11 +516,12 @@ fn test_extra_fields_eof() {
let snapshot_storages = bank.get_snapshot_storages(None);
let mut buf = vec![];
let mut writer = Cursor::new(&mut buf);
crate::serde_snapshot::bank_to_stream(
SerdeStyle::Newer,
&mut std::io::BufWriter::new(&mut writer),
&bank,
&snapshot_storages,
&get_storages_to_serialize(&snapshot_storages),
)
.unwrap();
@ -643,11 +644,12 @@ fn test_blank_extra_fields() {
let snapshot_storages = bank.get_snapshot_storages(None);
let mut buf = vec![];
let mut writer = Cursor::new(&mut buf);
crate::serde_snapshot::bank_to_stream_no_extra_fields(
SerdeStyle::Newer,
&mut std::io::BufWriter::new(&mut writer),
&bank,
&snapshot_storages,
&get_storages_to_serialize(&snapshot_storages),
)
.unwrap();
@ -713,7 +715,7 @@ mod test_bank_serialize {
(SerializableBankAndStorage::<newer::Context> {
bank,
snapshot_storages: &snapshot_storages,
snapshot_storages: &get_storages_to_serialize(&snapshot_storages),
phantom: std::marker::PhantomData::default(),
})
.serialize(s)

View File

@ -3,7 +3,8 @@
use {
crate::{
accounts_db::{
AccountsDb, GetUniqueAccountsResult, PurgeStats, SnapshotStorage, StoreReclaims,
AccountsDb, GetUniqueAccountsResult, PurgeStats, SnapshotStorage, SnapshotStorageOne,
StoreReclaims,
},
bank::Bank,
builtins, static_ids,
@ -282,11 +283,11 @@ impl<'a> SnapshotMinimizer<'a> {
let dead_slots = Mutex::new(Vec::new());
let dead_storages = Mutex::new(Vec::new());
snapshot_storages.into_par_iter().for_each(|storages| {
let slot = storages.first().unwrap().slot();
snapshot_storages.into_par_iter().for_each(|storage| {
let slot = storage.slot();
if slot != self.starting_slot {
if minimized_slot_set.contains(&slot) {
self.filter_storages(storages, &dead_storages);
self.filter_storage(&storage, &dead_storages);
} else {
dead_slots.lock().unwrap().push(slot);
}
@ -299,13 +300,11 @@ impl<'a> SnapshotMinimizer<'a> {
}
/// Creates new storage replacing `storages` that contains only accounts in `minimized_account_set`.
fn filter_storages(&self, storages: SnapshotStorage, dead_storages: &Mutex<SnapshotStorage>) {
let slot = storages.first().unwrap().slot();
fn filter_storage(&self, storage: &SnapshotStorageOne, dead_storages: &Mutex<SnapshotStorage>) {
let slot = storage.slot();
let GetUniqueAccountsResult {
stored_accounts, ..
} = self
.accounts_db()
.get_unique_accounts_from_storage(storages.first().unwrap());
} = self.accounts_db().get_unique_accounts_from_storage(storage);
let keep_accounts_collect = Mutex::new(Vec::with_capacity(stored_accounts.len()));
let purge_pubkeys_collect = Mutex::new(Vec::with_capacity(stored_accounts.len()));
@ -678,10 +677,8 @@ mod tests {
assert_eq!(snapshot_storages.len(), 3);
let mut account_count = 0;
snapshot_storages.into_iter().for_each(|storages| {
storages.into_iter().for_each(|storage| {
account_count += storage.accounts.account_iter().count();
});
snapshot_storages.into_iter().for_each(|storage| {
account_count += storage.accounts.account_iter().count();
});
assert_eq!(

View File

@ -1,7 +1,7 @@
use {
crate::{
accounts::Accounts,
accounts_db::SnapshotStorages,
accounts_db::SnapshotStoragesOne,
accounts_hash::AccountsHash,
bank::{Bank, BankSlotDelta},
epoch_accounts_hash::EpochAccountsHash,
@ -36,7 +36,7 @@ pub struct AccountsPackage {
pub package_type: AccountsPackageType,
pub slot: Slot,
pub block_height: Slot,
pub snapshot_storages: SnapshotStorages,
pub snapshot_storages: SnapshotStoragesOne,
pub expected_capitalization: u64,
pub accounts_hash_for_testing: Option<AccountsHash>,
pub accounts: Arc<Accounts>,
@ -62,7 +62,7 @@ impl AccountsPackage {
slot_deltas: Vec<BankSlotDelta>,
full_snapshot_archives_dir: impl AsRef<Path>,
incremental_snapshot_archives_dir: impl AsRef<Path>,
snapshot_storages: SnapshotStorages,
snapshot_storages: SnapshotStoragesOne,
archive_format: ArchiveFormat,
snapshot_version: SnapshotVersion,
accounts_hash_for_testing: Option<AccountsHash>,
@ -125,7 +125,7 @@ impl AccountsPackage {
pub fn new_for_epoch_accounts_hash(
package_type: AccountsPackageType,
bank: &Bank,
snapshot_storages: SnapshotStorages,
snapshot_storages: SnapshotStoragesOne,
accounts_hash_for_testing: Option<AccountsHash>,
) -> Self {
assert_eq!(package_type, AccountsPackageType::EpochAccountsHash);
@ -141,7 +141,7 @@ impl AccountsPackage {
fn _new(
package_type: AccountsPackageType,
bank: &Bank,
snapshot_storages: SnapshotStorages,
snapshot_storages: SnapshotStoragesOne,
accounts_hash_for_testing: Option<AccountsHash>,
snapshot_info: Option<SupplementalSnapshotInfo>,
) -> Self {
@ -167,7 +167,7 @@ impl AccountsPackage {
package_type: AccountsPackageType::AccountsHashVerifier,
slot: Slot::default(),
block_height: Slot::default(),
snapshot_storages: SnapshotStorages::default(),
snapshot_storages: SnapshotStoragesOne::default(),
expected_capitalization: u64::default(),
accounts_hash_for_testing: Option::default(),
accounts: Arc::new(Accounts::default_for_tests()),
@ -239,7 +239,7 @@ pub struct SnapshotPackage {
pub block_height: Slot,
pub slot_deltas: Vec<BankSlotDelta>,
pub snapshot_links: TempDir,
pub snapshot_storages: SnapshotStorages,
pub snapshot_storages: SnapshotStoragesOne,
pub snapshot_version: SnapshotVersion,
pub snapshot_type: SnapshotType,
}
@ -263,16 +263,9 @@ impl SnapshotPackage {
snapshot_info.archive_format,
),
SnapshotType::IncrementalSnapshot(incremental_snapshot_base_slot) => {
snapshot_storages.retain(|storages| {
storages
.first() // storages are grouped by slot in the outer Vec, so all storages will have the same slot as the first
.map(|storage| storage.slot() > incremental_snapshot_base_slot)
.unwrap_or_default()
});
snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot);
assert!(
snapshot_storages.iter().all(|storage| storage
.iter()
.all(|entry| entry.slot() > incremental_snapshot_base_slot)),
snapshot_storages.iter().all(|storage| storage.slot() > incremental_snapshot_base_slot),
"Incremental snapshot package must only contain storage entries where slot > incremental snapshot base slot (i.e. full snapshot slot)!"
);
snapshot_utils::build_incremental_snapshot_archive_path(

View File

@ -3,7 +3,7 @@ use {
account_storage::AccountStorageMap,
accounts_db::{
AccountShrinkThreshold, AccountsDbConfig, AtomicAppendVecId,
CalcAccountsHashDataSource, SnapshotStorage, SnapshotStorages,
CalcAccountsHashDataSource, SnapshotStorageOne, SnapshotStorages, SnapshotStoragesOne,
},
accounts_index::AccountSecondaryIndexes,
accounts_update_notifier_interface::AccountsUpdateNotifier,
@ -446,7 +446,7 @@ pub fn archive_snapshot_package(
.map_err(|e| SnapshotError::IoWithSource(e, "create staging symlinks"))?;
// Add the AppendVecs into the compressible list
for storage in snapshot_package.snapshot_storages.iter().flatten() {
for storage in snapshot_package.snapshot_storages.iter() {
storage.flush()?;
let storage_path = storage.get_path();
let output_path = staging_accounts_dir.join(crate::append_vec::AppendVec::file_name(
@ -847,7 +847,7 @@ where
pub fn add_bank_snapshot(
bank_snapshots_dir: impl AsRef<Path>,
bank: &Bank,
snapshot_storages: &[SnapshotStorage],
snapshot_storages: &[SnapshotStorageOne],
snapshot_version: SnapshotVersion,
) -> Result<BankSnapshotInfo> {
let mut add_snapshot_time = Measure::start("add-snapshot-ms");
@ -871,7 +871,8 @@ pub fn add_bank_snapshot(
let serde_style = match snapshot_version {
SnapshotVersion::V1_2_0 => SerdeStyle::Newer,
};
bank_to_stream(serde_style, stream.by_ref(), bank, snapshot_storages)?;
let serialize = get_storages_to_serialize(snapshot_storages);
bank_to_stream(serde_style, stream.by_ref(), bank, &serialize)?;
Ok(())
};
let consumed_size =
@ -903,6 +904,17 @@ pub fn add_bank_snapshot(
})
}
/// Adapter between the runtime's storage layout and the serialized snapshot layout.
/// serializing needs Vec<Vec<...>>, but data structure at runtime is Vec<...>
/// translates to what we need
/// Each runtime storage (one append vec per slot) is wrapped in its own
/// single-element Vec; `Arc::clone` only bumps the refcount, so no storage
/// data is copied.
pub(crate) fn get_storages_to_serialize(
snapshot_storages: &[SnapshotStorageOne],
) -> SnapshotStorages {
snapshot_storages
.iter()
.map(|storage| vec![Arc::clone(storage)])
.collect::<Vec<_>>()
}
fn serialize_status_cache(
slot: Slot,
slot_deltas: &[BankSlotDelta],
@ -2165,14 +2177,12 @@ pub fn purge_old_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
}
/// Get the snapshot storages for this bank
pub fn get_snapshot_storages(bank: &Bank) -> SnapshotStorages {
pub fn get_snapshot_storages(bank: &Bank) -> SnapshotStoragesOne {
let mut measure_snapshot_storages = Measure::start("snapshot-storages");
let snapshot_storages = bank.get_snapshot_storages(None);
measure_snapshot_storages.stop();
let snapshot_storages_count = snapshot_storages.iter().map(Vec::len).sum::<usize>();
datapoint_info!(
"get_snapshot_storages",
("snapshot-storages-count", snapshot_storages_count, i64),
(
"snapshot-storages-time-ms",
measure_snapshot_storages.as_ms(),
@ -2283,7 +2293,7 @@ pub fn package_and_archive_full_snapshot(
bank_snapshots_dir: impl AsRef<Path>,
full_snapshot_archives_dir: impl AsRef<Path>,
incremental_snapshot_archives_dir: impl AsRef<Path>,
snapshot_storages: SnapshotStorages,
snapshot_storages: SnapshotStoragesOne,
archive_format: ArchiveFormat,
snapshot_version: SnapshotVersion,
maximum_full_snapshot_archives_to_retain: usize,
@ -2335,7 +2345,7 @@ pub fn package_and_archive_incremental_snapshot(
bank_snapshots_dir: impl AsRef<Path>,
full_snapshot_archives_dir: impl AsRef<Path>,
incremental_snapshot_archives_dir: impl AsRef<Path>,
snapshot_storages: SnapshotStorages,
snapshot_storages: SnapshotStoragesOne,
archive_format: ArchiveFormat,
snapshot_version: SnapshotVersion,
maximum_full_snapshot_archives_to_retain: usize,

View File

@ -1,5 +1,5 @@
use {
crate::accounts_db::SnapshotStorage,
crate::accounts_db::SnapshotStorageOne,
log::*,
solana_measure::measure::Measure,
solana_sdk::clock::Slot,
@ -11,7 +11,7 @@ pub struct SortedStorages<'a> {
/// range of slots where storages exist (likely sparse)
range: Range<Slot>,
/// the actual storages. index is (slot - range.start)
storages: Vec<Option<&'a SnapshotStorage>>,
storages: Vec<Option<&'a SnapshotStorageOne>>,
slot_count: usize,
storage_count: usize,
}
@ -35,7 +35,7 @@ impl<'a> SortedStorages<'a> {
SortedStoragesIter::new(self, range)
}
fn get(&self, slot: Slot) -> Option<&SnapshotStorage> {
fn get(&self, slot: Slot) -> Option<&SnapshotStorageOne> {
if !self.range.contains(&slot) {
None
} else {
@ -67,11 +67,8 @@ impl<'a> SortedStorages<'a> {
// assumptions:
// 1. each SnapshotStorage.!is_empty()
// 2. SnapshotStorage.first().unwrap().get_slot() is unique from all other SnapshotStorage items.
pub fn new(source: &'a [SnapshotStorage]) -> Self {
let slots = source.iter().map(|storages| {
let first = storages.first();
assert!(first.is_some(), "SnapshotStorage.is_empty()");
let storage = first.unwrap();
pub fn new(source: &'a [SnapshotStorageOne]) -> Self {
let slots = source.iter().map(|storage| {
storage.slot() // this must be unique. Will be enforced in new_with_slots
});
Self::new_with_slots(source.iter().zip(slots.into_iter()), None, None)
@@ -81,7 +78,7 @@ impl<'a> SortedStorages<'a> {
/// 'source' contains a SnapshotStorage and its associated slot
/// 'source' does not have to be sorted in any way, but is assumed to not have duplicate slot #s
pub fn new_with_slots(
source: impl Iterator<Item = (&'a SnapshotStorage, Slot)> + Clone,
source: impl Iterator<Item = (&'a SnapshotStorageOne, Slot)> + Clone,
// A slot used as a lower bound, but potentially smaller than the smallest slot in the given 'source' iterator
min_slot: Option<Slot>,
// highest valid slot. Only matters if source array does not contain a slot >= max_slot_inclusive.
@@ -108,8 +105,8 @@ impl<'a> SortedStorages<'a> {
let mut time = Measure::start("get slot");
let source_ = source.clone();
let mut storage_count = 0;
source_.for_each(|(storages, slot)| {
storage_count += storages.len();
source_.for_each(|(_, slot)| {
storage_count += 1;
slot_count += 1;
adjust_min_max(slot);
});
@@ -157,7 +154,7 @@ pub struct SortedStoragesIter<'a> {
}
impl<'a> Iterator for SortedStoragesIter<'a> {
type Item = (Slot, Option<&'a SnapshotStorage>);
type Item = (Slot, Option<&'a SnapshotStorageOne>);
fn next(&mut self) -> Option<Self::Item> {
let slot = self.next_slot;
@@ -206,9 +203,16 @@ impl<'a> SortedStoragesIter<'a> {
#[cfg(test)]
pub mod tests {
use super::*;
use {
super::*,
crate::{
accounts_db::{AccountStorageEntry, AppendVecId, SnapshotStorageOne},
append_vec::AppendVec,
},
std::sync::Arc,
};
impl<'a> SortedStorages<'a> {
pub fn new_debug(source: &[(&'a SnapshotStorage, Slot)], min: Slot, len: usize) -> Self {
pub fn new_debug(source: &[(&'a SnapshotStorageOne, Slot)], min: Slot, len: usize) -> Self {
let mut storages = vec![None; len];
let range = Range {
start: min,
@@ -227,7 +231,7 @@ pub mod tests {
}
}
pub fn new_for_tests(storages: &[&'a SnapshotStorage], slots: &[Slot]) -> Self {
pub fn new_for_tests(storages: &[&'a SnapshotStorageOne], slots: &[Slot]) -> Self {
assert_eq!(storages.len(), slots.len());
SortedStorages::new_with_slots(
storages.iter().cloned().zip(slots.iter().cloned()),
@@ -240,7 +244,7 @@ pub mod tests {
#[test]
fn test_sorted_storages_range_iter() {
let storages = SortedStorages::empty();
let check = |(slot, storages): (Slot, Option<&SnapshotStorage>)| {
let check = |(slot, storages): (Slot, Option<&SnapshotStorageOne>)| {
assert!(storages.is_none());
slot
};
@@ -262,9 +266,9 @@
);
// only item is slot 3
let s1 = Vec::new();
let s1 = create_sample_store(1);
let storages = SortedStorages::new_for_tests(&[&s1], &[3]);
let check = |(slot, storages): (Slot, Option<&SnapshotStorage>)| {
let check = |(slot, storages): (Slot, Option<&SnapshotStorageOne>)| {
assert!(
(slot != 3) ^ storages.is_some(),
"slot: {slot}, storages: {storages:?}"
@@ -296,16 +300,17 @@
);
// items in slots 2 and 4
let s2 = Vec::with_capacity(2);
let s4 = Vec::with_capacity(4);
let storages = SortedStorages::new_for_tests(&[&s2, &s4], &[2, 4]);
let check = |(slot, storages): (Slot, Option<&SnapshotStorage>)| {
let store2 = create_sample_store(2);
let store4 = create_sample_store(4);
let storages = SortedStorages::new_for_tests(&[&store2, &store4], &[2, 4]);
let check = |(slot, storage): (Slot, Option<&SnapshotStorageOne>)| {
assert!(
(slot != 2 && slot != 4)
^ storages
.map(|storages| storages.capacity() == (slot as usize))
^ storage
.map(|storage| storage.append_vec_id() == (slot as AppendVecId))
.unwrap_or(false),
"slot: {slot}, storages: {storages:?}"
"slot: {slot}, storage: {storage:?}"
);
slot
};
@@ -334,16 +339,11 @@
);
}
#[test]
#[should_panic(expected = "SnapshotStorage.is_empty()")]
fn test_sorted_storages_empty() {
SortedStorages::new(&[Vec::new()]);
}
#[test]
#[should_panic(expected = "slots are not unique")]
fn test_sorted_storages_duplicate_slots() {
SortedStorages::new_for_tests(&[&Vec::new(), &Vec::new()], &[0, 0]);
let store = create_sample_store(1);
SortedStorages::new_for_tests(&[&store, &store], &[0, 0]);
}
#[test]
@@ -357,10 +357,9 @@
#[test]
fn test_sorted_storages_1() {
let vec = vec![];
let vec_check = vec.clone();
let store = create_sample_store(1);
let slot = 4;
let vecs = [&vec];
let vecs = [&store];
let result = SortedStorages::new_for_tests(&vecs, &[slot]);
assert_eq!(
result.range,
@@ -371,15 +370,30 @@
);
assert_eq!(result.slot_count, 1);
assert_eq!(result.storages.len(), 1);
assert_eq!(result.get(slot).unwrap().len(), vec_check.len());
assert_eq!(
result.get(slot).unwrap().append_vec_id(),
store.append_vec_id()
);
}
fn create_sample_store(id: AppendVecId) -> SnapshotStorageOne {
let tf = crate::append_vec::test_utils::get_append_vec_path("create_sample_store");
let (_temp_dirs, paths) = crate::accounts_db::get_temp_accounts_paths(1).unwrap();
let size: usize = 123;
let slot = 0;
let mut data = AccountStorageEntry::new(&paths[0], slot, id, size as u64);
let av = AppendVec::new(&tf.path, true, 1024 * 1024);
data.accounts = av;
Arc::new(data)
}
#[test]
fn test_sorted_storages_2() {
let vec = vec![];
let vec_check = vec.clone();
let store = create_sample_store(1);
let store2 = create_sample_store(2);
let slots = [4, 7];
let vecs = [&vec, &vec];
let vecs = [&store, &store2];
let result = SortedStorages::new_for_tests(&vecs, &slots);
assert_eq!(
result.range,
@@ -395,7 +409,13 @@
assert!(result.get(5).is_none());
assert!(result.get(6).is_none());
assert!(result.get(8).is_none());
assert_eq!(result.get(slots[0]).unwrap().len(), vec_check.len());
assert_eq!(result.get(slots[1]).unwrap().len(), vec_check.len());
assert_eq!(
result.get(slots[0]).unwrap().append_vec_id(),
store.append_vec_id()
);
assert_eq!(
result.get(slots[1]).unwrap().append_vec_id(),
store2.append_vec_id()
);
}
}