diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs
index 28e51d3a0f..9526f6d2a6 100644
--- a/core/tests/snapshots.rs
+++ b/core/tests/snapshots.rs
@@ -171,6 +171,7 @@ mod tests {
             None,
             accounts_db::AccountShrinkThreshold::default(),
             check_hash_calculation,
+            false,
         )
         .unwrap();
diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs
index 267f497cdd..7297567238 100644
--- a/ledger-tool/src/main.rs
+++ b/ledger-tool/src/main.rs
@@ -845,6 +845,10 @@ fn main() {
         .validator(is_slot)
         .takes_value(true)
         .help("Halt processing at the given slot");
+    let verify_index_arg = Arg::with_name("verify_accounts_index")
+        .long("verify-accounts-index")
+        .takes_value(false)
+        .help("For debugging and tests on accounts index.");
     let limit_load_slot_count_from_snapshot_arg =
         Arg::with_name("limit_load_slot_count_from_snapshot")
             .long("limit-load-slot-count-from-snapshot")
             .value_name("SLOT")
@@ -1121,6 +1125,7 @@ fn main() {
             .arg(&account_paths_arg)
             .arg(&halt_at_slot_arg)
             .arg(&limit_load_slot_count_from_snapshot_arg)
+            .arg(&verify_index_arg)
             .arg(&hard_forks_arg)
             .arg(&no_accounts_db_caching_arg)
             .arg(&accounts_db_test_hash_calculation_arg)
@@ -1845,6 +1850,7 @@ fn main() {
                    usize
                )
                .ok(),
+                verify_index: arg_matches.is_present("verify_accounts_index"),
                allow_dead_slots: arg_matches.is_present("allow_dead_slots"),
                accounts_db_test_hash_calculation: arg_matches
                    .is_present("accounts_db_test_hash_calculation"),
diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs
index dfa85c3900..0761705716 100644
--- a/ledger/src/bank_forks_utils.rs
+++ b/ledger/src/bank_forks_utils.rs
@@ -140,6 +140,7 @@ fn load_from_snapshot(
         process_options.limit_load_slot_count_from_snapshot,
         process_options.shrink_ratio,
         process_options.accounts_db_test_hash_calculation,
+        process_options.verify_index,
     )
     .expect("Load from snapshot failed");
     if let Some(shrink_paths) = shrink_paths {
diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index 43f94b31d4..ba29f5b72a 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -377,6 +377,7 @@ pub struct ProcessOptions {
     pub limit_load_slot_count_from_snapshot: Option<usize>,
     pub allow_dead_slots: bool,
     pub accounts_db_test_hash_calculation: bool,
+    pub verify_index: bool,
     pub shrink_ratio: AccountShrinkThreshold,
 }
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 841bf0a612..c13eff7b32 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -5912,69 +5912,104 @@ impl AccountsDb {
     }

     #[allow(clippy::needless_collect)]
-    pub fn generate_index(&self, limit_load_slot_count_from_snapshot: Option<usize>) {
+    pub fn generate_index(&self, limit_load_slot_count_from_snapshot: Option<usize>, verify: bool) {
         let mut slots = self.storage.all_slots();
         #[allow(clippy::stable_sort_primitive)]
         slots.sort();
         if let Some(limit) = limit_load_slot_count_from_snapshot {
             slots.truncate(limit); // get rid of the newer slots and keep just the older
         }
-        let total_processed_slots_across_all_threads = AtomicU64::new(0);
-        let outer_slots_len = slots.len();
-        let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot
-        let mut index_time = Measure::start("index");
-        let insertion_time_us = AtomicU64::new(0);
-        let scan_time: u64 = slots
-            .par_chunks(chunk_size)
-            .map(|slots| {
-                let mut log_status = MultiThreadProgress::new(
-                    &total_processed_slots_across_all_threads,
-                    2,
-                    outer_slots_len as u64,
-                );
-                let mut scan_time_sum = 0;
-                for (index, slot) in slots.iter().enumerate() {
-                    let mut scan_time = Measure::start("scan");
-                    log_status.report(index as u64);
-                    let storage_maps: Vec<Arc<AccountStorageEntry>> = self
-                        .storage
-                        .get_slot_storage_entries(*slot)
-                        .unwrap_or_default();
-                    let accounts_map = Self::process_storage_slot(&storage_maps);
-                    scan_time.stop();
-                    scan_time_sum += scan_time.as_us();
+        // pass == 0 always runs and generates the index
+        // pass == 1 only runs if verify == true.
+        // verify checks that all the expected items are in the accounts index and measures how long it takes to look them all up
+        let passes = if verify { 2 } else { 1 };
+        for pass in 0..passes {
+            let total_processed_slots_across_all_threads = AtomicU64::new(0);
+            let outer_slots_len = slots.len();
+            let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot
+            let mut index_time = Measure::start("index");
+            let insertion_time_us = AtomicU64::new(0);
+            let scan_time: u64 = slots
+                .par_chunks(chunk_size)
+                .map(|slots| {
+                    let mut log_status = MultiThreadProgress::new(
+                        &total_processed_slots_across_all_threads,
+                        2,
+                        outer_slots_len as u64,
+                    );
+                    let mut scan_time_sum = 0;
+                    for (index, slot) in slots.iter().enumerate() {
+                        let mut scan_time = Measure::start("scan");
+                        log_status.report(index as u64);
+                        let storage_maps: Vec<Arc<AccountStorageEntry>> = self
+                            .storage
+                            .get_slot_storage_entries(*slot)
+                            .unwrap_or_default();
+                        let accounts_map = Self::process_storage_slot(&storage_maps);
+                        scan_time.stop();
+                        scan_time_sum += scan_time.as_us();

-                    let insert_us = self.generate_index_for_slot(accounts_map, slot);
-                    insertion_time_us.fetch_add(insert_us, Ordering::Relaxed);
-                }
-                scan_time_sum
-            })
-            .sum();
-        index_time.stop();
+                        let insert_us = if pass == 0 {
+                            // generate index
+                            self.generate_index_for_slot(accounts_map, slot)
+                        } else {
+                            // verify index matches expected and measure the time to get all items
+                            assert!(verify);
+                            let mut lookup_time = Measure::start("lookup_time");
+                            for account in accounts_map.into_iter() {
+                                let (key, account_info) = account;
+                                let lock = self.accounts_index.get_account_maps_read_lock(&key);
+                                let x = lock.get(&key).unwrap();
+                                let sl = x.slot_list.read().unwrap();
+                                let mut count = 0;
+                                for (slot2, account_info2) in sl.iter() {
+                                    if slot2 == slot {
+                                        count += 1;
+                                        let ai = AccountInfo {
+                                            store_id: account_info.1,
+                                            offset: account_info.2.offset,
+                                            stored_size: account_info.2.stored_size,
+                                            lamports: account_info.2.account_meta.lamports,
+                                        };
+                                        assert_eq!(&ai, account_info2);
+                                    }
+                                }
+                                assert_eq!(1, count);
+                            }
+                            lookup_time.stop();
+                            lookup_time.as_us()
+                        };
+                        insertion_time_us.fetch_add(insert_us, Ordering::Relaxed);
+                    }
+                    scan_time_sum
+                })
+                .sum();
+            index_time.stop();

-        let mut min_bin_size = usize::MAX;
-        let mut max_bin_size = usize::MIN;
-        let total_items = self
-            .accounts_index
-            .account_maps
-            .iter()
-            .map(|i| {
-                let len = i.read().unwrap().len();
-                min_bin_size = std::cmp::min(min_bin_size, len);
-                max_bin_size = std::cmp::max(max_bin_size, len);
-                len
-            })
-            .sum();
+            let mut min_bin_size = usize::MAX;
+            let mut max_bin_size = usize::MIN;
+            let total_items = self
+                .accounts_index
+                .account_maps
+                .iter()
+                .map(|i| {
+                    let len = i.read().unwrap().len();
+                    min_bin_size = std::cmp::min(min_bin_size, len);
+                    max_bin_size = std::cmp::max(max_bin_size, len);
+                    len
+                })
+                .sum();

-        let timings = GenerateIndexTimings {
-            scan_time,
-            index_time: index_time.as_us(),
-            insertion_time_us: insertion_time_us.load(Ordering::Relaxed),
-            min_bin_size,
-            max_bin_size,
-            total_items,
-        };
-        timings.report();
+            let timings = GenerateIndexTimings {
+                scan_time,
+                index_time: index_time.as_us(),
+                insertion_time_us: insertion_time_us.load(Ordering::Relaxed),
+                min_bin_size,
+                max_bin_size,
+                total_items,
+            };
+            timings.report();
+        }

         // Need to add these last, otherwise older updates will be cleaned
         for slot in slots {
diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs
index cb8b4e8eda..9f1e65a809 100644
--- a/runtime/src/serde_snapshot.rs
+++ b/runtime/src/serde_snapshot.rs
@@ -140,6 +140,7 @@ pub(crate) fn bank_from_stream(
     caching_enabled: bool,
     limit_load_slot_count_from_snapshot: Option<usize>,
     shrink_ratio: AccountShrinkThreshold,
+    verify_index: bool,
 ) -> std::result::Result<Bank, Error>
 where
     R: Read,
@@ -161,6 +162,7 @@ where
             caching_enabled,
             limit_load_slot_count_from_snapshot,
             shrink_ratio,
+            verify_index,
         )?;
         Ok(bank)
     }};
@@ -252,6 +254,7 @@ fn reconstruct_bank_from_fields(
     caching_enabled: bool,
     limit_load_slot_count_from_snapshot: Option<usize>,
     shrink_ratio: AccountShrinkThreshold,
+    verify_index: bool,
 ) -> Result<Bank, Error>
 where
     E: SerializableStorage + std::marker::Sync,
@@ -265,6 +268,7 @@ where
         caching_enabled,
         limit_load_slot_count_from_snapshot,
         shrink_ratio,
+        verify_index,
     )?;
     accounts_db.freeze_accounts(
         &Ancestors::from(&bank_fields.ancestors),
@@ -314,6 +318,7 @@ fn reconstruct_accountsdb_from_fields(
     caching_enabled: bool,
     limit_load_slot_count_from_snapshot: Option<usize>,
     shrink_ratio: AccountShrinkThreshold,
+    verify_index: bool,
 ) -> Result<AccountsDb, Error>
 where
     E: SerializableStorage + std::marker::Sync,
@@ -396,6 +401,6 @@ where
     accounts_db
         .write_version
         .fetch_add(version, Ordering::Relaxed);
-    accounts_db.generate_index(limit_load_slot_count_from_snapshot);
+    accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index);
     Ok(accounts_db)
 }
diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs
index c7bf8a9bda..ee1b203afa 100644
--- a/runtime/src/serde_snapshot/tests.rs
+++ b/runtime/src/serde_snapshot/tests.rs
@@ -75,6 +75,7 @@ where
         false,
         None,
         AccountShrinkThreshold::default(),
+        false,
     )
 }
@@ -231,6 +232,7 @@ fn test_bank_serialize_style(serde_style: SerdeStyle) {
         false,
         None,
         AccountShrinkThreshold::default(),
+        false,
     )
     .unwrap();
     dbank.src = ref_sc;
diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs
index 15ee53a7a6..99db8683e3 100644
--- a/runtime/src/snapshot_utils.rs
+++ b/runtime/src/snapshot_utils.rs
@@ -645,6 +645,7 @@ pub fn bank_from_snapshot_archive<P>(
     limit_load_slot_count_from_snapshot: Option<usize>,
     shrink_ratio: AccountShrinkThreshold,
     test_hash_calculation: bool,
+    verify_index: bool,
 ) -> Result<(Bank, BankFromArchiveTimings)>
 where
     P: AsRef<Path> + std::marker::Sync,
@@ -688,6 +689,7 @@ where
         accounts_db_caching_enabled,
         limit_load_slot_count_from_snapshot,
         shrink_ratio,
+        verify_index,
     )?;
     measure.stop();
@@ -945,6 +947,7 @@ fn rebuild_bank_from_snapshots(
     accounts_db_caching_enabled: bool,
     limit_load_slot_count_from_snapshot: Option<usize>,
     shrink_ratio: AccountShrinkThreshold,
+    verify_index: bool,
 ) -> Result<Bank> {
     let (snapshot_version_enum, root_paths) =
         verify_snapshot_version_and_folder(snapshot_version, unpacked_snapshots_dir)?;
@@ -967,6 +970,7 @@ fn rebuild_bank_from_snapshots(
                accounts_db_caching_enabled,
                limit_load_slot_count_from_snapshot,
                shrink_ratio,
+                verify_index,
            ),
        }?)
    })?;
@@ -1500,6 +1504,7 @@ mod tests {
             None,
             AccountShrinkThreshold::default(),
             false,
+            false,
         )
         .unwrap();
@@ -1588,6 +1593,7 @@ mod tests {
             None,
             AccountShrinkThreshold::default(),
             false,
+            false,
         )
         .unwrap();
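Review note: the heart of this change is the optional second pass in AccountsDb::generate_index. Pass 0 always runs and builds the index; pass 1 runs only when verify is set, re-scans the same storages, looks every account back up in the freshly built index, and asserts that exactly one slot-list entry matches while timing the lookups. The sketch below models that two-pass flow in isolation; it is a simplified stand-in, not the AccountsDb code itself — ToyIndex, Key, Slot, and the flat (slot, key, offset) input are invented for illustration.

use std::collections::HashMap;

// Hypothetical stand-ins for the real account-index types.
type Key = u64;
type Slot = u64;

#[derive(Default)]
struct ToyIndex {
    // key -> list of (slot, stored offset)
    map: HashMap<Key, Vec<(Slot, usize)>>,
}

impl ToyIndex {
    fn insert(&mut self, key: Key, slot: Slot, offset: usize) {
        self.map.entry(key).or_default().push((slot, offset));
    }
}

// Two-pass flow modeled on generate_index(): pass 0 builds the index,
// pass 1 (only when `verify` is set) re-reads the same input and checks
// that each (key, slot) pair is present exactly once with the same payload.
fn generate_index(accounts: &[(Slot, Key, usize)], verify: bool) -> ToyIndex {
    let mut index = ToyIndex::default();
    let passes = if verify { 2 } else { 1 };
    for pass in 0..passes {
        for &(slot, key, offset) in accounts {
            if pass == 0 {
                index.insert(key, slot, offset);
            } else {
                // Verification pass: look the item up and compare contents.
                let slot_list = index.map.get(&key).unwrap();
                let matches = slot_list
                    .iter()
                    .filter(|(s, o)| *s == slot && *o == offset)
                    .count();
                assert_eq!(1, matches, "index entry missing or duplicated");
            }
        }
    }
    index
}

fn main() {
    let accounts: &[(Slot, Key, usize)] = &[(1, 10, 0), (1, 11, 64), (2, 10, 0)];
    let index = generate_index(accounts, true); // verify == true runs both passes
    assert_eq!(index.map.len(), 2);
    println!("index verified for {} keys", index.map.len());
}

With the rest of the patch applied, the real check is reachable from the CLI: the new --verify-accounts-index flag sets ProcessOptions::verify_index, which is threaded through bank_forks_utils, snapshot_utils, and serde_snapshot down to generate_index(limit_load_slot_count_from_snapshot, verify_index).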