From 544b9745c2c8bbbea14ceeadb84a7df4d0396cd0 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Wed, 11 Jan 2023 14:05:15 -0600 Subject: [PATCH] snapshot storage path uses 1 append vec per slot (#29627) --- core/src/snapshot_packager_service.rs | 2 +- core/tests/snapshots.rs | 1 - runtime/src/accounts_db.rs | 233 +++++++++----------------- runtime/src/accounts_hash.rs | 18 +- runtime/src/bank.rs | 4 +- runtime/src/serde_snapshot/tests.rs | 18 +- runtime/src/snapshot_minimizer.rs | 23 ++- runtime/src/snapshot_package.rs | 25 +-- runtime/src/snapshot_utils.rs | 28 +++- runtime/src/sorted_storages.rs | 102 ++++++----- 10 files changed, 199 insertions(+), 255 deletions(-) diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index 050b9d73e3..2c86efa35a 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -328,7 +328,7 @@ mod tests { block_height: slot, slot_deltas: vec![], snapshot_links: link_snapshots_dir, - snapshot_storages: vec![storage_entries], + snapshot_storages: storage_entries, snapshot_version: SnapshotVersion::default(), snapshot_type: SnapshotType::FullSnapshot, }; diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 017184a4e3..1e65308d66 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -442,7 +442,6 @@ fn test_concurrent_snapshot_packaging( let snapshot_storage_files: HashSet<_> = bank_forks[slot] .get_snapshot_storages(None) .into_iter() - .flatten() .map(|s| s.get_path()) .collect(); diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 1d41271a69..aa363a29b9 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -610,7 +610,10 @@ impl<'a> MultiThreadProgress<'a> { pub type AtomicAppendVecId = AtomicU32; pub type AppendVecId = u32; pub type SnapshotStorage = Vec>; +pub type SnapshotStorageOne = Arc; pub type SnapshotStorages = Vec; +/// exactly 1 append vec per slot +pub type SnapshotStoragesOne = SnapshotStorage; // Each slot has a set of storage entries. pub(crate) type SlotStores = Arc>>>; @@ -1515,9 +1518,9 @@ impl SplitAncientStorages { ) -> Vec { let range = snapshot_storages.range(); let mut ancient_slots = Vec::default(); - for (slot, storages) in snapshot_storages.iter_range(&(range.start..one_epoch_old_slot)) { - if let Some(storages) = storages { - if storages.len() == 1 && is_ancient(&storages.first().unwrap().accounts) { + for (slot, storage) in snapshot_storages.iter_range(&(range.start..one_epoch_old_slot)) { + if let Some(storage) = storage { + if is_ancient(&storage.accounts) { ancient_slots.push(slot); continue; // was ancient, keep looking } @@ -6819,7 +6822,7 @@ impl AccountsDb { } /// iterate over a single storage, calling scanner on each item - fn scan_single_account_storage(storage: &Arc, scanner: &mut S) + fn scan_single_account_storage(storage: &SnapshotStorageOne, scanner: &mut S) where S: AppendVecScan, { @@ -6830,10 +6833,8 @@ impl AccountsDb { }); } - fn scan_multiple_account_storages_one_slot( - storages: &[Arc], - scanner: &mut S, - ) where + fn scan_multiple_account_storages_one_slot(storages: &[SnapshotStorageOne], scanner: &mut S) + where S: AppendVecScan, { let mut len = storages.len(); @@ -6897,23 +6898,15 @@ impl AccountsDb { } } - fn update_old_slot_stats(&self, stats: &HashStats, sub_storages: Option<&SnapshotStorage>) { - if let Some(sub_storages) = sub_storages { + fn update_old_slot_stats(&self, stats: &HashStats, storage: Option<&Arc>) { + if let Some(storage) = storage { stats.roots_older_than_epoch.fetch_add(1, Ordering::Relaxed); let mut ancients = 0; - let num_accounts = sub_storages - .iter() - .map(|storage| { - if is_ancient(&storage.accounts) { - ancients += 1; - } - storage.count() - }) - .sum(); - let sizes = sub_storages - .iter() - .map(|storage| storage.total_bytes()) - .sum::(); + let num_accounts = storage.count(); + if is_ancient(&storage.accounts) { + ancients += 1; + } + let sizes = storage.total_bytes(); stats .append_vec_sizes_older_than_epoch .fetch_add(sizes as usize, Ordering::Relaxed); @@ -6961,16 +6954,12 @@ impl AccountsDb { /// return true iff storages are valid for loading from cache fn hash_storage_info( hasher: &mut impl StdHasher, - storages: Option<&SnapshotStorage>, + storage: Option<&SnapshotStorageOne>, slot: Slot, ) -> bool { - if let Some(sub_storages) = storages { - if sub_storages.len() > 1 { - // Having > 1 appendvecs per slot is not expected. If we have that, we just fail to load from the cache for this slot. - return false; - } + if let Some(sub_storage) = storage { // hash info about this storage - let append_vec = sub_storages.first().unwrap(); + let append_vec = sub_storage; append_vec.written_bytes().hash(hasher); let storage_file = append_vec.accounts.get_path(); slot.hash(hasher); @@ -7074,12 +7063,10 @@ impl AccountsDb { let mut init_accum = true; // load from cache failed, so create the cache file for this chunk - for (slot, sub_storages) in snapshot_storages.iter_range(&range_this_chunk) { + for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) { let mut ancient = false; - let (_, scan) = measure!(if let Some(sub_storages) = sub_storages { - if let Some(storage) = sub_storages.first() { - ancient = is_ancient(&storage.accounts); - } + let (_, scan) = measure!(if let Some(storage) = storage { + ancient = is_ancient(&storage.accounts); if init_accum { let range = bin_range.end - bin_range.start; scanner.init_accum(range); @@ -7087,7 +7074,10 @@ impl AccountsDb { } scanner.set_slot(slot); - Self::scan_multiple_account_storages_one_slot(sub_storages, &mut scanner); + Self::scan_multiple_account_storages_one_slot( + &[Arc::clone(storage)], + &mut scanner, + ); }); if ancient { stats @@ -7129,18 +7119,16 @@ impl AccountsDb { let acceptable_straggler_slot_count = 100; // do nothing special for these old stores which will likely get cleaned up shortly let sub = slots_per_epoch + acceptable_straggler_slot_count; let in_epoch_range_start = max.saturating_sub(sub); - for (slot, storages) in storages.iter_range(&(..in_epoch_range_start)) { - if let Some(storages) = storages { - storages.iter().for_each(|store| { - if !is_ancient(&store.accounts) { - // ancient stores are managed separately - we expect them to be old and keeping accounts - // We can expect the normal processes will keep them cleaned. - // If we included them here then ALL accounts in ALL ancient append vecs will be visited by clean each time. - self.dirty_stores - .insert((slot, store.append_vec_id()), store.clone()); - num_dirty_slots += 1; - } - }); + for (slot, store) in storages.iter_range(&(..in_epoch_range_start)) { + if let Some(store) = store { + if !is_ancient(&store.accounts) { + // ancient stores are managed separately - we expect them to be old and keeping accounts + // We can expect the normal processes will keep them cleaned. + // If we included them here then ALL accounts in ALL ancient append vecs will be visited by clean each time. + self.dirty_stores + .insert((slot, store.append_vec_id()), store.clone()); + num_dirty_slots += 1; + } } } mark_time.stop(); @@ -8398,7 +8386,7 @@ impl AccountsDb { &self, requested_slots: impl RangeBounds + Sync, ancestors: Option<&Ancestors>, - ) -> (SnapshotStorages, Vec) { + ) -> (SnapshotStoragesOne, Vec) { let mut m = Measure::start("get slots"); let slots_and_storages = self .storage @@ -8427,16 +8415,15 @@ impl AccountsDb { .unwrap_or_default() }) .filter_map(|(slot, storages)| { - let storages = storages + storages .read() .unwrap() .values() + .next() .filter(|x| x.has_accounts()) - .cloned() - .collect::>(); - (!storages.is_empty()).then_some((storages, *slot)) + .map(|storage| (Arc::clone(storage), *slot)) }) - .collect::>() + .collect::>() }) .collect::>() }); @@ -9700,7 +9687,7 @@ pub mod tests { fn sample_storages_and_account_in_slot( slot: Slot, accounts: &AccountsDb, - ) -> (SnapshotStorages, Vec) { + ) -> (SnapshotStoragesOne, Vec) { let pubkey0 = Pubkey::new(&[0u8; 32]); let pubkey127 = Pubkey::new(&[0x7fu8; 32]); let pubkey128 = Pubkey::new(&[0x80u8; 32]); @@ -9754,21 +9741,19 @@ pub mod tests { storages .iter() .zip(slots.iter()) - .for_each(|(storages, slot)| { - for storage in storages { - assert_eq!(&storage.slot(), slot); - } + .for_each(|(storage, slot)| { + assert_eq!(&storage.slot(), slot); }); (storages, raw_expected) } fn sample_storages_and_accounts( accounts: &AccountsDb, - ) -> (SnapshotStorages, Vec) { + ) -> (SnapshotStoragesOne, Vec) { sample_storages_and_account_in_slot(1, accounts) } - fn get_storage_refs(input: &[SnapshotStorage]) -> SortedStorages { + fn get_storage_refs(input: &[SnapshotStorageOne]) -> SortedStorages { SortedStorages::new(input) } @@ -10109,14 +10094,14 @@ pub mod tests { assert_eq!(result, (expected_accounts_hash, sum)); } - fn sample_storage() -> (SnapshotStorages, usize, Slot) { + fn sample_storage() -> (SnapshotStoragesOne, usize, Slot) { let (_temp_dirs, paths) = get_temp_accounts_paths(1).unwrap(); let slot_expected: Slot = 0; let size: usize = 123; let data = AccountStorageEntry::new(&paths[0], slot_expected, 0, size as u64); let arc = Arc::new(data); - let storages = vec![vec![arc]]; + let storages = vec![arc]; (storages, size, slot_expected) } @@ -10175,10 +10160,10 @@ pub mod tests { data.accounts = av; let arc = Arc::new(data); - let storages = vec![vec![arc]]; + let storages = vec![arc]; let pubkey = solana_sdk::pubkey::new_rand(); let acc = AccountSharedData::new(1, 48, AccountSharedData::default().owner()); - append_single_account_with_default_hash(&storages[0][0].accounts, &pubkey, &acc, 1); + append_single_account_with_default_hash(&storages[0].accounts, &pubkey, &acc, 1); let calls = Arc::new(AtomicU64::new(0)); let temp_dir = TempDir::new().unwrap(); @@ -10261,10 +10246,10 @@ pub mod tests { data.accounts = av; let arc = Arc::new(data); - let storages = vec![vec![arc]]; + let storages = vec![arc]; let pubkey = solana_sdk::pubkey::new_rand(); let acc = AccountSharedData::new(1, 48, AccountSharedData::default().owner()); - append_single_account_with_default_hash(&storages[0][0].accounts, &pubkey, &acc, 1); + append_single_account_with_default_hash(&storages[0].accounts, &pubkey, &acc, 1); let calls = Arc::new(AtomicU64::new(0)); @@ -10277,29 +10262,7 @@ pub mod tests { value_to_use_for_lamports: expected, }; - AccountsDb::scan_multiple_account_storages_one_slot(&storages[0], &mut test_scan); - let accum = test_scan.scanning_complete(); - assert_eq!(calls.load(Ordering::Relaxed), 1); - assert_eq!( - accum - .iter() - .flatten() - .map(|a| a.lamports) - .collect::>(), - vec![expected] - ); - - let calls = Arc::new(AtomicU64::new(0)); - let mut test_scan = TestScan { - calls: calls.clone(), - pubkey, - slot_expected, - accum: Vec::default(), - current_slot: 0, - value_to_use_for_lamports: expected, - }; - - AccountsDb::scan_single_account_storage(&storages[0][0], &mut test_scan); + AccountsDb::scan_multiple_account_storages_one_slot(&storages, &mut test_scan); let accum = test_scan.scanning_complete(); assert_eq!(calls.load(Ordering::Relaxed), 1); assert_eq!( @@ -10313,17 +10276,12 @@ pub mod tests { } fn append_sample_data_to_storage( - storages: &SnapshotStorages, + storages: &SnapshotStoragesOne, pubkey: &Pubkey, write_version: StoredMetaWriteVersion, ) { let acc = AccountSharedData::new(1, 48, AccountSharedData::default().owner()); - append_single_account_with_default_hash( - &storages[0][0].accounts, - pubkey, - &acc, - write_version, - ); + append_single_account_with_default_hash(&storages[0].accounts, pubkey, &acc, write_version); } fn sample_storage_with_entries( @@ -10331,7 +10289,7 @@ pub mod tests { write_version: StoredMetaWriteVersion, slot: Slot, pubkey: &Pubkey, - ) -> SnapshotStorages { + ) -> SnapshotStoragesOne { sample_storage_with_entries_id(tf, write_version, slot, pubkey, 0) } @@ -10341,7 +10299,7 @@ pub mod tests { slot: Slot, pubkey: &Pubkey, id: AppendVecId, - ) -> SnapshotStorages { + ) -> SnapshotStoragesOne { let (_temp_dirs, paths) = get_temp_accounts_paths(1).unwrap(); let size: usize = 123; let mut data = AccountStorageEntry::new(&paths[0], slot, id, size as u64); @@ -10349,7 +10307,7 @@ pub mod tests { data.accounts = av; let arc = Arc::new(data); - let storages = vec![vec![arc]]; + let storages = vec![arc]; append_sample_data_to_storage(&storages, pubkey, write_version); storages } @@ -10368,12 +10326,8 @@ pub mod tests { let pubkey2 = solana_sdk::pubkey::new_rand(); for swap in [false, true].iter() { let mut storages = [ - sample_storage_with_entries(&tf, write_version1, slot_expected, &pubkey1) - .remove(0) - .remove(0), - sample_storage_with_entries(&tf, write_version2, slot_expected, &pubkey2) - .remove(0) - .remove(0), + sample_storage_with_entries(&tf, write_version1, slot_expected, &pubkey1).remove(0), + sample_storage_with_entries(&tf, write_version2, slot_expected, &pubkey2).remove(0), ]; if *swap { storages[..].swap(0, 1); @@ -12907,11 +12861,7 @@ pub mod tests { accounts.store_for_tests(current_slot, &[(&pubkey3, &zero_lamport_account)]); let snapshot_stores = accounts.get_snapshot_storages(..=current_slot, None).0; - let total_accounts: usize = snapshot_stores - .iter() - .flatten() - .map(|s| s.all_accounts().len()) - .sum(); + let total_accounts: usize = snapshot_stores.iter().map(|s| s.all_accounts().len()).sum(); assert!(!snapshot_stores.is_empty()); assert!(total_accounts > 0); @@ -12924,11 +12874,8 @@ pub mod tests { accounts.print_accounts_stats("Post-D clean"); - let total_accounts_post_clean: usize = snapshot_stores - .iter() - .flatten() - .map(|s| s.all_accounts().len()) - .sum(); + let total_accounts_post_clean: usize = + snapshot_stores.iter().map(|s| s.all_accounts().len()).sum(); assert_eq!(total_accounts, total_accounts_post_clean); // should clean all 3 pubkeys @@ -16645,7 +16592,7 @@ pub mod tests { let id = 1; let size = 1; let non_ancient_storage = Arc::new(AccountStorageEntry::new(path, slot2, id, size)); - let raw_storages = vec![vec![non_ancient_storage.clone()]]; + let raw_storages = vec![non_ancient_storage.clone()]; let snapshot_storages = SortedStorages::new(&raw_storages); // test without an ancient append vec let one_epoch_old_slot = 0; @@ -16658,7 +16605,7 @@ pub mod tests { assert_eq!(Vec::::default(), ancient_slots); // now test with an ancient append vec - let raw_storages = vec![vec![ancient.clone()]]; + let raw_storages = vec![ancient.clone()]; let snapshot_storages = SortedStorages::new(&raw_storages); let one_epoch_old_slot = 0; let ancient_slots = @@ -16670,7 +16617,7 @@ pub mod tests { assert_eq!(vec![slot1_ancient], ancient_slots); // now test with an ancient append vec and then a non-ancient append vec - let raw_storages = vec![vec![ancient.clone()], vec![non_ancient_storage.clone()]]; + let raw_storages = vec![ancient.clone(), non_ancient_storage.clone()]; let snapshot_storages = SortedStorages::new(&raw_storages); let one_epoch_old_slot = 0; let ancient_slots = @@ -16683,9 +16630,9 @@ pub mod tests { // ancient, non-ancient, ancient let raw_storages = vec![ - vec![ancient.clone()], - vec![non_ancient_storage.clone()], - vec![ancient3.new_storage().clone()], + ancient.clone(), + non_ancient_storage.clone(), + ancient3.new_storage().clone(), ]; let snapshot_storages = SortedStorages::new(&raw_storages); let one_epoch_old_slot = 0; @@ -16700,10 +16647,10 @@ pub mod tests { if sparse { // ancient, ancient, non-ancient, ancient let raw_storages = vec![ - vec![Arc::clone(&ancient)], - vec![Arc::clone(&ancient_1_plus)], - vec![non_ancient_storage], - vec![Arc::clone(ancient3.new_storage())], + Arc::clone(&ancient), + Arc::clone(&ancient_1_plus), + non_ancient_storage, + Arc::clone(ancient3.new_storage()), ]; let snapshot_storages = SortedStorages::new(&raw_storages[..]); let one_epoch_old_slot = 0; @@ -16768,24 +16715,6 @@ pub mod tests { // can't assert hash here - it is a function of mod date assert!(load); } - { - let mut hasher = std::collections::hash_map::DefaultHasher::new(); - let slot: Slot = 0; - let slot1 = 1; - let tf = crate::append_vec::test_utils::get_append_vec_path( - "test_accountsdb_scan_account_storage_no_bank", - ); - let write_version1 = 0; - let pubkey1 = solana_sdk::pubkey::new_rand(); - let mut storages = sample_storage_with_entries(&tf, write_version1, slot, &pubkey1); - let mut storages2 = sample_storage_with_entries(&tf, write_version1, slot1, &pubkey1); - storages[0].push(storages2[0].remove(0)); - - let load = AccountsDb::hash_storage_info(&mut hasher, Some(&storages[0]), slot); - let _ = hasher.finish(); - // cannot load because we have 2 storages - assert!(!load); - } } #[test] @@ -17460,7 +17389,7 @@ pub mod tests { for i in 0..num_slots { let id = starting_id + (i as AppendVecId); let pubkey1 = solana_sdk::pubkey::new_rand(); - let storages = sample_storage_with_entries_id( + let storage = sample_storage_with_entries_id( tf, write_version1, starting_slot + (i as Slot), @@ -17469,7 +17398,7 @@ pub mod tests { ) .pop() .unwrap(); - insert_store(db, Arc::clone(&storages[0])); + insert_store(db, Arc::clone(&storage)); } let storage = db.get_storage_for_slot(starting_slot).unwrap(); @@ -17586,13 +17515,13 @@ pub mod tests { ); let write_version1 = 0; let pubkey1 = solana_sdk::pubkey::new_rand(); - let storages = sample_storage_with_entries(&tf, write_version1, slot5, &pubkey1) + let storage = sample_storage_with_entries(&tf, write_version1, slot5, &pubkey1) .pop() .unwrap(); let mut current_ancient = CurrentAncientAppendVec::default(); let should_move = db.should_move_to_ancient_append_vec( - &storages[0], + &storage, &mut current_ancient, slot5, CAN_RANDOMLY_SHRINK_FALSE, @@ -17601,9 +17530,9 @@ pub mod tests { // slot is not ancient, so it is good to move assert!(should_move); - current_ancient = CurrentAncientAppendVec::new(slot5, Arc::clone(&storages[0])); // just 'some', contents don't matter + current_ancient = CurrentAncientAppendVec::new(slot5, Arc::clone(&storage)); // just 'some', contents don't matter let should_move = db.should_move_to_ancient_append_vec( - &storages[0], + &storage, &mut current_ancient, slot5, CAN_RANDOMLY_SHRINK_FALSE, @@ -17611,7 +17540,7 @@ pub mod tests { // should have kept the same 'current_ancient' assert_eq!(current_ancient.slot(), slot5); assert_eq!(current_ancient.append_vec().slot(), slot5); - assert_eq!(current_ancient.append_vec_id(), storages[0].append_vec_id()); + assert_eq!(current_ancient.append_vec_id(), storage.append_vec_id()); // slot is not ancient, so it is good to move assert!(should_move); @@ -17731,7 +17660,7 @@ pub mod tests { } fn make_ancient_append_vec_full(ancient: &Arc) { - let vecs = vec![vec![ancient.clone()]]; + let vecs = vec![ancient.clone()]; for _ in 0..100 { append_sample_data_to_storage(&vecs, &Pubkey::default(), 0); } diff --git a/runtime/src/accounts_hash.rs b/runtime/src/accounts_hash.rs index c56fda81f8..e28f8ffa0b 100644 --- a/runtime/src/accounts_hash.rs +++ b/runtime/src/accounts_hash.rs @@ -1,6 +1,6 @@ use { crate::{ - accounts_db::{SnapshotStorages, PUBKEY_BINS_FOR_CALCULATING_HASHES}, + accounts_db::{SnapshotStoragesOne, PUBKEY_BINS_FOR_CALCULATING_HASHES}, ancestors::Ancestors, rent_collector::RentCollector, }, @@ -173,20 +173,14 @@ pub struct HashStats { pub count_ancient_scans: AtomicU64, } impl HashStats { - pub fn calc_storage_size_quartiles(&mut self, storages: &SnapshotStorages) { + pub fn calc_storage_size_quartiles(&mut self, storages: &SnapshotStoragesOne) { let mut sum = 0; let mut sizes = storages .iter() - .flat_map(|storages| { - let result = storages - .iter() - .map(|storage| { - let cap = storage.accounts.capacity() as usize; - sum += cap; - cap - }) - .collect::>(); - result + .map(|storage| { + let cap = storage.accounts.capacity() as usize; + sum += cap; + cap }) .collect::>(); sizes.sort_unstable(); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 68beefec70..a557c39a5a 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -45,7 +45,7 @@ use { }, accounts_db::{ AccountShrinkThreshold, AccountsDbConfig, CalcAccountsHashDataSource, - IncludeSlotInHash, SnapshotStorages, ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS, + IncludeSlotInHash, SnapshotStoragesOne, ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS, ACCOUNTS_DB_CONFIG_FOR_TESTING, }, accounts_hash::AccountsHash, @@ -6848,7 +6848,7 @@ impl Bank { /// Get this bank's storages to use for snapshots. /// /// If a base slot is provided, return only the storages that are *higher* than this slot. - pub fn get_snapshot_storages(&self, base_slot: Option) -> SnapshotStorages { + pub fn get_snapshot_storages(&self, base_slot: Option) -> SnapshotStoragesOne { // if a base slot is provided, request storages starting at the slot *after* let start_slot = base_slot.map_or(0, |slot| slot.saturating_add(1)); // we want to *include* the storage at our slot diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs index 5b549fa714..14f07313bc 100644 --- a/runtime/src/serde_snapshot/tests.rs +++ b/runtime/src/serde_snapshot/tests.rs @@ -13,7 +13,7 @@ use { bank::{Bank, BankTestConfig}, epoch_accounts_hash, genesis_utils::{self, activate_all_features, activate_feature}, - snapshot_utils::ArchiveFormat, + snapshot_utils::{get_storages_to_serialize, ArchiveFormat}, status_cache::StatusCache, }, bincode::serialize_into, @@ -43,7 +43,7 @@ fn copy_append_vecs>( let storage_entries = accounts_db.get_snapshot_storages(RangeFull, None).0; let storage: AccountStorageMap = AccountStorageMap::with_capacity(storage_entries.len()); let mut next_append_vec_id = 0; - for storage_entry in storage_entries.into_iter().flatten() { + for storage_entry in storage_entries.into_iter() { // Copy file to new directory let storage_path = storage_entry.get_path(); let file_name = AppendVec::file_name(storage_entry.slot(), storage_entry.append_vec_id()); @@ -187,7 +187,7 @@ fn test_accounts_serialize_style(serde_style: SerdeStyle) { &mut writer, &accounts.accounts_db, 0, - &accounts.accounts_db.get_snapshot_storages(..=0, None).0, + &get_storages_to_serialize(&accounts.accounts_db.get_snapshot_storages(..=0, None).0), ) .unwrap(); @@ -281,7 +281,7 @@ fn test_bank_serialize_style( serde_style, &mut std::io::BufWriter::new(&mut writer), &bank2, - &snapshot_storages, + &get_storages_to_serialize(&snapshot_storages), ) .unwrap(); @@ -431,7 +431,7 @@ pub(crate) fn reconstruct_accounts_db_via_serialization( &mut writer, accounts, slot, - &snapshot_storages, + &get_storages_to_serialize(&snapshot_storages), ) .unwrap(); @@ -516,11 +516,12 @@ fn test_extra_fields_eof() { let snapshot_storages = bank.get_snapshot_storages(None); let mut buf = vec![]; let mut writer = Cursor::new(&mut buf); + crate::serde_snapshot::bank_to_stream( SerdeStyle::Newer, &mut std::io::BufWriter::new(&mut writer), &bank, - &snapshot_storages, + &get_storages_to_serialize(&snapshot_storages), ) .unwrap(); @@ -643,11 +644,12 @@ fn test_blank_extra_fields() { let snapshot_storages = bank.get_snapshot_storages(None); let mut buf = vec![]; let mut writer = Cursor::new(&mut buf); + crate::serde_snapshot::bank_to_stream_no_extra_fields( SerdeStyle::Newer, &mut std::io::BufWriter::new(&mut writer), &bank, - &snapshot_storages, + &get_storages_to_serialize(&snapshot_storages), ) .unwrap(); @@ -713,7 +715,7 @@ mod test_bank_serialize { (SerializableBankAndStorage:: { bank, - snapshot_storages: &snapshot_storages, + snapshot_storages: &get_storages_to_serialize(&snapshot_storages), phantom: std::marker::PhantomData::default(), }) .serialize(s) diff --git a/runtime/src/snapshot_minimizer.rs b/runtime/src/snapshot_minimizer.rs index c3cfda2ea4..de27747561 100644 --- a/runtime/src/snapshot_minimizer.rs +++ b/runtime/src/snapshot_minimizer.rs @@ -3,7 +3,8 @@ use { crate::{ accounts_db::{ - AccountsDb, GetUniqueAccountsResult, PurgeStats, SnapshotStorage, StoreReclaims, + AccountsDb, GetUniqueAccountsResult, PurgeStats, SnapshotStorage, SnapshotStorageOne, + StoreReclaims, }, bank::Bank, builtins, static_ids, @@ -282,11 +283,11 @@ impl<'a> SnapshotMinimizer<'a> { let dead_slots = Mutex::new(Vec::new()); let dead_storages = Mutex::new(Vec::new()); - snapshot_storages.into_par_iter().for_each(|storages| { - let slot = storages.first().unwrap().slot(); + snapshot_storages.into_par_iter().for_each(|storage| { + let slot = storage.slot(); if slot != self.starting_slot { if minimized_slot_set.contains(&slot) { - self.filter_storages(storages, &dead_storages); + self.filter_storage(&storage, &dead_storages); } else { dead_slots.lock().unwrap().push(slot); } @@ -299,13 +300,11 @@ impl<'a> SnapshotMinimizer<'a> { } /// Creates new storage replacing `storages` that contains only accounts in `minimized_account_set`. - fn filter_storages(&self, storages: SnapshotStorage, dead_storages: &Mutex) { - let slot = storages.first().unwrap().slot(); + fn filter_storage(&self, storage: &SnapshotStorageOne, dead_storages: &Mutex) { + let slot = storage.slot(); let GetUniqueAccountsResult { stored_accounts, .. - } = self - .accounts_db() - .get_unique_accounts_from_storage(storages.first().unwrap()); + } = self.accounts_db().get_unique_accounts_from_storage(storage); let keep_accounts_collect = Mutex::new(Vec::with_capacity(stored_accounts.len())); let purge_pubkeys_collect = Mutex::new(Vec::with_capacity(stored_accounts.len())); @@ -678,10 +677,8 @@ mod tests { assert_eq!(snapshot_storages.len(), 3); let mut account_count = 0; - snapshot_storages.into_iter().for_each(|storages| { - storages.into_iter().for_each(|storage| { - account_count += storage.accounts.account_iter().count(); - }); + snapshot_storages.into_iter().for_each(|storage| { + account_count += storage.accounts.account_iter().count(); }); assert_eq!( diff --git a/runtime/src/snapshot_package.rs b/runtime/src/snapshot_package.rs index c556dad4f6..54fdbb46e9 100644 --- a/runtime/src/snapshot_package.rs +++ b/runtime/src/snapshot_package.rs @@ -1,7 +1,7 @@ use { crate::{ accounts::Accounts, - accounts_db::SnapshotStorages, + accounts_db::SnapshotStoragesOne, accounts_hash::AccountsHash, bank::{Bank, BankSlotDelta}, epoch_accounts_hash::EpochAccountsHash, @@ -36,7 +36,7 @@ pub struct AccountsPackage { pub package_type: AccountsPackageType, pub slot: Slot, pub block_height: Slot, - pub snapshot_storages: SnapshotStorages, + pub snapshot_storages: SnapshotStoragesOne, pub expected_capitalization: u64, pub accounts_hash_for_testing: Option, pub accounts: Arc, @@ -62,7 +62,7 @@ impl AccountsPackage { slot_deltas: Vec, full_snapshot_archives_dir: impl AsRef, incremental_snapshot_archives_dir: impl AsRef, - snapshot_storages: SnapshotStorages, + snapshot_storages: SnapshotStoragesOne, archive_format: ArchiveFormat, snapshot_version: SnapshotVersion, accounts_hash_for_testing: Option, @@ -125,7 +125,7 @@ impl AccountsPackage { pub fn new_for_epoch_accounts_hash( package_type: AccountsPackageType, bank: &Bank, - snapshot_storages: SnapshotStorages, + snapshot_storages: SnapshotStoragesOne, accounts_hash_for_testing: Option, ) -> Self { assert_eq!(package_type, AccountsPackageType::EpochAccountsHash); @@ -141,7 +141,7 @@ impl AccountsPackage { fn _new( package_type: AccountsPackageType, bank: &Bank, - snapshot_storages: SnapshotStorages, + snapshot_storages: SnapshotStoragesOne, accounts_hash_for_testing: Option, snapshot_info: Option, ) -> Self { @@ -167,7 +167,7 @@ impl AccountsPackage { package_type: AccountsPackageType::AccountsHashVerifier, slot: Slot::default(), block_height: Slot::default(), - snapshot_storages: SnapshotStorages::default(), + snapshot_storages: SnapshotStoragesOne::default(), expected_capitalization: u64::default(), accounts_hash_for_testing: Option::default(), accounts: Arc::new(Accounts::default_for_tests()), @@ -239,7 +239,7 @@ pub struct SnapshotPackage { pub block_height: Slot, pub slot_deltas: Vec, pub snapshot_links: TempDir, - pub snapshot_storages: SnapshotStorages, + pub snapshot_storages: SnapshotStoragesOne, pub snapshot_version: SnapshotVersion, pub snapshot_type: SnapshotType, } @@ -263,16 +263,9 @@ impl SnapshotPackage { snapshot_info.archive_format, ), SnapshotType::IncrementalSnapshot(incremental_snapshot_base_slot) => { - snapshot_storages.retain(|storages| { - storages - .first() // storages are grouped by slot in the outer Vec, so all storages will have the same slot as the first - .map(|storage| storage.slot() > incremental_snapshot_base_slot) - .unwrap_or_default() - }); + snapshot_storages.retain(|storage| storage.slot() > incremental_snapshot_base_slot); assert!( - snapshot_storages.iter().all(|storage| storage - .iter() - .all(|entry| entry.slot() > incremental_snapshot_base_slot)), + snapshot_storages.iter().all(|storage| storage.slot() > incremental_snapshot_base_slot), "Incremental snapshot package must only contain storage entries where slot > incremental snapshot base slot (i.e. full snapshot slot)!" ); snapshot_utils::build_incremental_snapshot_archive_path( diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 9c6d2117ee..fec0bab4bd 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -3,7 +3,7 @@ use { account_storage::AccountStorageMap, accounts_db::{ AccountShrinkThreshold, AccountsDbConfig, AtomicAppendVecId, - CalcAccountsHashDataSource, SnapshotStorage, SnapshotStorages, + CalcAccountsHashDataSource, SnapshotStorageOne, SnapshotStorages, SnapshotStoragesOne, }, accounts_index::AccountSecondaryIndexes, accounts_update_notifier_interface::AccountsUpdateNotifier, @@ -446,7 +446,7 @@ pub fn archive_snapshot_package( .map_err(|e| SnapshotError::IoWithSource(e, "create staging symlinks"))?; // Add the AppendVecs into the compressible list - for storage in snapshot_package.snapshot_storages.iter().flatten() { + for storage in snapshot_package.snapshot_storages.iter() { storage.flush()?; let storage_path = storage.get_path(); let output_path = staging_accounts_dir.join(crate::append_vec::AppendVec::file_name( @@ -847,7 +847,7 @@ where pub fn add_bank_snapshot( bank_snapshots_dir: impl AsRef, bank: &Bank, - snapshot_storages: &[SnapshotStorage], + snapshot_storages: &[SnapshotStorageOne], snapshot_version: SnapshotVersion, ) -> Result { let mut add_snapshot_time = Measure::start("add-snapshot-ms"); @@ -871,7 +871,8 @@ pub fn add_bank_snapshot( let serde_style = match snapshot_version { SnapshotVersion::V1_2_0 => SerdeStyle::Newer, }; - bank_to_stream(serde_style, stream.by_ref(), bank, snapshot_storages)?; + let serialize = get_storages_to_serialize(snapshot_storages); + bank_to_stream(serde_style, stream.by_ref(), bank, &serialize)?; Ok(()) }; let consumed_size = @@ -903,6 +904,17 @@ pub fn add_bank_snapshot( }) } +/// serializing needs Vec>, but data structure at runtime is Vec<...> +/// translates to what we need +pub(crate) fn get_storages_to_serialize( + snapshot_storages: &[SnapshotStorageOne], +) -> SnapshotStorages { + snapshot_storages + .iter() + .map(|storage| vec![Arc::clone(storage)]) + .collect::>() +} + fn serialize_status_cache( slot: Slot, slot_deltas: &[BankSlotDelta], @@ -2165,14 +2177,12 @@ pub fn purge_old_bank_snapshots(bank_snapshots_dir: impl AsRef) { } /// Get the snapshot storages for this bank -pub fn get_snapshot_storages(bank: &Bank) -> SnapshotStorages { +pub fn get_snapshot_storages(bank: &Bank) -> SnapshotStoragesOne { let mut measure_snapshot_storages = Measure::start("snapshot-storages"); let snapshot_storages = bank.get_snapshot_storages(None); measure_snapshot_storages.stop(); - let snapshot_storages_count = snapshot_storages.iter().map(Vec::len).sum::(); datapoint_info!( "get_snapshot_storages", - ("snapshot-storages-count", snapshot_storages_count, i64), ( "snapshot-storages-time-ms", measure_snapshot_storages.as_ms(), @@ -2283,7 +2293,7 @@ pub fn package_and_archive_full_snapshot( bank_snapshots_dir: impl AsRef, full_snapshot_archives_dir: impl AsRef, incremental_snapshot_archives_dir: impl AsRef, - snapshot_storages: SnapshotStorages, + snapshot_storages: SnapshotStoragesOne, archive_format: ArchiveFormat, snapshot_version: SnapshotVersion, maximum_full_snapshot_archives_to_retain: usize, @@ -2335,7 +2345,7 @@ pub fn package_and_archive_incremental_snapshot( bank_snapshots_dir: impl AsRef, full_snapshot_archives_dir: impl AsRef, incremental_snapshot_archives_dir: impl AsRef, - snapshot_storages: SnapshotStorages, + snapshot_storages: SnapshotStoragesOne, archive_format: ArchiveFormat, snapshot_version: SnapshotVersion, maximum_full_snapshot_archives_to_retain: usize, diff --git a/runtime/src/sorted_storages.rs b/runtime/src/sorted_storages.rs index a328a3ba16..fe595ea6c0 100644 --- a/runtime/src/sorted_storages.rs +++ b/runtime/src/sorted_storages.rs @@ -1,5 +1,5 @@ use { - crate::accounts_db::SnapshotStorage, + crate::accounts_db::SnapshotStorageOne, log::*, solana_measure::measure::Measure, solana_sdk::clock::Slot, @@ -11,7 +11,7 @@ pub struct SortedStorages<'a> { /// range of slots where storages exist (likely sparse) range: Range, /// the actual storages. index is (slot - range.start) - storages: Vec>, + storages: Vec>, slot_count: usize, storage_count: usize, } @@ -35,7 +35,7 @@ impl<'a> SortedStorages<'a> { SortedStoragesIter::new(self, range) } - fn get(&self, slot: Slot) -> Option<&SnapshotStorage> { + fn get(&self, slot: Slot) -> Option<&SnapshotStorageOne> { if !self.range.contains(&slot) { None } else { @@ -67,11 +67,8 @@ impl<'a> SortedStorages<'a> { // assumptions: // 1. each SnapshotStorage.!is_empty() // 2. SnapshotStorage.first().unwrap().get_slot() is unique from all other SnapshotStorage items. - pub fn new(source: &'a [SnapshotStorage]) -> Self { - let slots = source.iter().map(|storages| { - let first = storages.first(); - assert!(first.is_some(), "SnapshotStorage.is_empty()"); - let storage = first.unwrap(); + pub fn new(source: &'a [SnapshotStorageOne]) -> Self { + let slots = source.iter().map(|storage| { storage.slot() // this must be unique. Will be enforced in new_with_slots }); Self::new_with_slots(source.iter().zip(slots.into_iter()), None, None) @@ -81,7 +78,7 @@ impl<'a> SortedStorages<'a> { /// 'source' contains a SnapshotStorage and its associated slot /// 'source' does not have to be sorted in any way, but is assumed to not have duplicate slot #s pub fn new_with_slots( - source: impl Iterator + Clone, + source: impl Iterator + Clone, // A slot used as a lower bound, but potentially smaller than the smallest slot in the given 'source' iterator min_slot: Option, // highest valid slot. Only matters if source array does not contain a slot >= max_slot_inclusive. @@ -108,8 +105,8 @@ impl<'a> SortedStorages<'a> { let mut time = Measure::start("get slot"); let source_ = source.clone(); let mut storage_count = 0; - source_.for_each(|(storages, slot)| { - storage_count += storages.len(); + source_.for_each(|(_, slot)| { + storage_count += 1; slot_count += 1; adjust_min_max(slot); }); @@ -157,7 +154,7 @@ pub struct SortedStoragesIter<'a> { } impl<'a> Iterator for SortedStoragesIter<'a> { - type Item = (Slot, Option<&'a SnapshotStorage>); + type Item = (Slot, Option<&'a SnapshotStorageOne>); fn next(&mut self) -> Option { let slot = self.next_slot; @@ -206,9 +203,16 @@ impl<'a> SortedStoragesIter<'a> { #[cfg(test)] pub mod tests { - use super::*; + use { + super::*, + crate::{ + accounts_db::{AccountStorageEntry, AppendVecId, SnapshotStorageOne}, + append_vec::AppendVec, + }, + std::sync::Arc, + }; impl<'a> SortedStorages<'a> { - pub fn new_debug(source: &[(&'a SnapshotStorage, Slot)], min: Slot, len: usize) -> Self { + pub fn new_debug(source: &[(&'a SnapshotStorageOne, Slot)], min: Slot, len: usize) -> Self { let mut storages = vec![None; len]; let range = Range { start: min, @@ -227,7 +231,7 @@ pub mod tests { } } - pub fn new_for_tests(storages: &[&'a SnapshotStorage], slots: &[Slot]) -> Self { + pub fn new_for_tests(storages: &[&'a SnapshotStorageOne], slots: &[Slot]) -> Self { assert_eq!(storages.len(), slots.len()); SortedStorages::new_with_slots( storages.iter().cloned().zip(slots.iter().cloned()), @@ -240,7 +244,7 @@ pub mod tests { #[test] fn test_sorted_storages_range_iter() { let storages = SortedStorages::empty(); - let check = |(slot, storages): (Slot, Option<&SnapshotStorage>)| { + let check = |(slot, storages): (Slot, Option<&SnapshotStorageOne>)| { assert!(storages.is_none()); slot }; @@ -262,9 +266,9 @@ pub mod tests { ); // only item is slot 3 - let s1 = Vec::new(); + let s1 = create_sample_store(1); let storages = SortedStorages::new_for_tests(&[&s1], &[3]); - let check = |(slot, storages): (Slot, Option<&SnapshotStorage>)| { + let check = |(slot, storages): (Slot, Option<&SnapshotStorageOne>)| { assert!( (slot != 3) ^ storages.is_some(), "slot: {slot}, storages: {storages:?}" @@ -296,16 +300,17 @@ pub mod tests { ); // items in slots 2 and 4 - let s2 = Vec::with_capacity(2); - let s4 = Vec::with_capacity(4); - let storages = SortedStorages::new_for_tests(&[&s2, &s4], &[2, 4]); - let check = |(slot, storages): (Slot, Option<&SnapshotStorage>)| { + let store2 = create_sample_store(2); + let store4 = create_sample_store(4); + + let storages = SortedStorages::new_for_tests(&[&store2, &store4], &[2, 4]); + let check = |(slot, storage): (Slot, Option<&SnapshotStorageOne>)| { assert!( (slot != 2 && slot != 4) - ^ storages - .map(|storages| storages.capacity() == (slot as usize)) + ^ storage + .map(|storage| storage.append_vec_id() == (slot as AppendVecId)) .unwrap_or(false), - "slot: {slot}, storages: {storages:?}" + "slot: {slot}, storage: {storage:?}" ); slot }; @@ -334,16 +339,11 @@ pub mod tests { ); } - #[test] - #[should_panic(expected = "SnapshotStorage.is_empty()")] - fn test_sorted_storages_empty() { - SortedStorages::new(&[Vec::new()]); - } - #[test] #[should_panic(expected = "slots are not unique")] fn test_sorted_storages_duplicate_slots() { - SortedStorages::new_for_tests(&[&Vec::new(), &Vec::new()], &[0, 0]); + let store = create_sample_store(1); + SortedStorages::new_for_tests(&[&store, &store], &[0, 0]); } #[test] @@ -357,10 +357,9 @@ pub mod tests { #[test] fn test_sorted_storages_1() { - let vec = vec![]; - let vec_check = vec.clone(); + let store = create_sample_store(1); let slot = 4; - let vecs = [&vec]; + let vecs = [&store]; let result = SortedStorages::new_for_tests(&vecs, &[slot]); assert_eq!( result.range, @@ -371,15 +370,30 @@ pub mod tests { ); assert_eq!(result.slot_count, 1); assert_eq!(result.storages.len(), 1); - assert_eq!(result.get(slot).unwrap().len(), vec_check.len()); + assert_eq!( + result.get(slot).unwrap().append_vec_id(), + store.append_vec_id() + ); + } + + fn create_sample_store(id: AppendVecId) -> SnapshotStorageOne { + let tf = crate::append_vec::test_utils::get_append_vec_path("create_sample_store"); + let (_temp_dirs, paths) = crate::accounts_db::get_temp_accounts_paths(1).unwrap(); + let size: usize = 123; + let slot = 0; + let mut data = AccountStorageEntry::new(&paths[0], slot, id, size as u64); + let av = AppendVec::new(&tf.path, true, 1024 * 1024); + data.accounts = av; + + Arc::new(data) } #[test] fn test_sorted_storages_2() { - let vec = vec![]; - let vec_check = vec.clone(); + let store = create_sample_store(1); + let store2 = create_sample_store(2); let slots = [4, 7]; - let vecs = [&vec, &vec]; + let vecs = [&store, &store2]; let result = SortedStorages::new_for_tests(&vecs, &slots); assert_eq!( result.range, @@ -395,7 +409,13 @@ pub mod tests { assert!(result.get(5).is_none()); assert!(result.get(6).is_none()); assert!(result.get(8).is_none()); - assert_eq!(result.get(slots[0]).unwrap().len(), vec_check.len()); - assert_eq!(result.get(slots[1]).unwrap().len(), vec_check.len()); + assert_eq!( + result.get(slots[0]).unwrap().append_vec_id(), + store.append_vec_id() + ); + assert_eq!( + result.get(slots[1]).unwrap().append_vec_id(), + store2.append_vec_id() + ); } }