Add `--verify-accounts-index` option to `ledger-tool verify` (#18375)

* Add `--verify-accounts-index` option to `ledger-tool verify`

* Address review feedback: add comments, merge in upstream changes, and clean up
Authored by Jeff Washington (jwash) on 2021-07-13 11:06:18 -05:00; committed via GitHub.
parent 3e11468a04
commit d092fa1f03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 112 additions and 55 deletions

View File

@ -171,6 +171,7 @@ mod tests {
None,
accounts_db::AccountShrinkThreshold::default(),
check_hash_calculation,
false,
)
.unwrap();

View File

@ -845,6 +845,10 @@ fn main() {
.validator(is_slot)
.takes_value(true)
.help("Halt processing at the given slot");
let verify_index_arg = Arg::with_name("verify_accounts_index")
.long("verify-accounts-index")
.takes_value(false)
.help("For debugging and tests on accounts index.");
let limit_load_slot_count_from_snapshot_arg = Arg::with_name("limit_load_slot_count_from_snapshot")
.long("limit-load-slot-count-from-snapshot")
.value_name("SLOT")
@ -1121,6 +1125,7 @@ fn main() {
.arg(&account_paths_arg)
.arg(&halt_at_slot_arg)
.arg(&limit_load_slot_count_from_snapshot_arg)
.arg(&verify_index_arg)
.arg(&hard_forks_arg)
.arg(&no_accounts_db_caching_arg)
.arg(&accounts_db_test_hash_calculation_arg)
@ -1845,6 +1850,7 @@ fn main() {
usize
)
.ok(),
verify_index: arg_matches.is_present("verify_accounts_index"),
allow_dead_slots: arg_matches.is_present("allow_dead_slots"),
accounts_db_test_hash_calculation: arg_matches
.is_present("accounts_db_test_hash_calculation"),

View File

@ -140,6 +140,7 @@ fn load_from_snapshot(
process_options.limit_load_slot_count_from_snapshot,
process_options.shrink_ratio,
process_options.accounts_db_test_hash_calculation,
process_options.verify_index,
)
.expect("Load from snapshot failed");
if let Some(shrink_paths) = shrink_paths {

View File

@ -377,6 +377,7 @@ pub struct ProcessOptions {
pub limit_load_slot_count_from_snapshot: Option<usize>,
pub allow_dead_slots: bool,
pub accounts_db_test_hash_calculation: bool,
pub verify_index: bool,
pub shrink_ratio: AccountShrinkThreshold,
}

View File

@ -5912,69 +5912,104 @@ impl AccountsDb {
}
#[allow(clippy::needless_collect)]
pub fn generate_index(&self, limit_load_slot_count_from_snapshot: Option<usize>) {
pub fn generate_index(&self, limit_load_slot_count_from_snapshot: Option<usize>, verify: bool) {
let mut slots = self.storage.all_slots();
#[allow(clippy::stable_sort_primitive)]
slots.sort();
if let Some(limit) = limit_load_slot_count_from_snapshot {
slots.truncate(limit); // get rid of the newer slots and keep just the older
}
let total_processed_slots_across_all_threads = AtomicU64::new(0);
let outer_slots_len = slots.len();
let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot
let mut index_time = Measure::start("index");
let insertion_time_us = AtomicU64::new(0);
let scan_time: u64 = slots
.par_chunks(chunk_size)
.map(|slots| {
let mut log_status = MultiThreadProgress::new(
&total_processed_slots_across_all_threads,
2,
outer_slots_len as u64,
);
let mut scan_time_sum = 0;
for (index, slot) in slots.iter().enumerate() {
let mut scan_time = Measure::start("scan");
log_status.report(index as u64);
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage
.get_slot_storage_entries(*slot)
.unwrap_or_default();
let accounts_map = Self::process_storage_slot(&storage_maps);
scan_time.stop();
scan_time_sum += scan_time.as_us();
// pass == 0 always runs and generates the index
// pass == 1 only runs if verify == true.
// verify checks that all the expected items are in the accounts index and measures how long it takes to look them all up
let passes = if verify { 2 } else { 1 };
for pass in 0..passes {
let total_processed_slots_across_all_threads = AtomicU64::new(0);
let outer_slots_len = slots.len();
let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot
let mut index_time = Measure::start("index");
let insertion_time_us = AtomicU64::new(0);
let scan_time: u64 = slots
.par_chunks(chunk_size)
.map(|slots| {
let mut log_status = MultiThreadProgress::new(
&total_processed_slots_across_all_threads,
2,
outer_slots_len as u64,
);
let mut scan_time_sum = 0;
for (index, slot) in slots.iter().enumerate() {
let mut scan_time = Measure::start("scan");
log_status.report(index as u64);
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage
.get_slot_storage_entries(*slot)
.unwrap_or_default();
let accounts_map = Self::process_storage_slot(&storage_maps);
scan_time.stop();
scan_time_sum += scan_time.as_us();
let insert_us = self.generate_index_for_slot(accounts_map, slot);
insertion_time_us.fetch_add(insert_us, Ordering::Relaxed);
}
scan_time_sum
})
.sum();
index_time.stop();
let insert_us = if pass == 0 {
// generate index
self.generate_index_for_slot(accounts_map, slot)
} else {
// verify index matches expected and measure the time to get all items
assert!(verify);
let mut lookup_time = Measure::start("lookup_time");
for account in accounts_map.into_iter() {
let (key, account_info) = account;
let lock = self.accounts_index.get_account_maps_read_lock(&key);
let x = lock.get(&key).unwrap();
let sl = x.slot_list.read().unwrap();
let mut count = 0;
for (slot2, account_info2) in sl.iter() {
if slot2 == slot {
count += 1;
let ai = AccountInfo {
store_id: account_info.1,
offset: account_info.2.offset,
stored_size: account_info.2.stored_size,
lamports: account_info.2.account_meta.lamports,
};
assert_eq!(&ai, account_info2);
}
}
assert_eq!(1, count);
}
lookup_time.stop();
lookup_time.as_us()
};
insertion_time_us.fetch_add(insert_us, Ordering::Relaxed);
}
scan_time_sum
})
.sum();
index_time.stop();
let mut min_bin_size = usize::MAX;
let mut max_bin_size = usize::MIN;
let total_items = self
.accounts_index
.account_maps
.iter()
.map(|i| {
let len = i.read().unwrap().len();
min_bin_size = std::cmp::min(min_bin_size, len);
max_bin_size = std::cmp::max(max_bin_size, len);
len
})
.sum();
let mut min_bin_size = usize::MAX;
let mut max_bin_size = usize::MIN;
let total_items = self
.accounts_index
.account_maps
.iter()
.map(|i| {
let len = i.read().unwrap().len();
min_bin_size = std::cmp::min(min_bin_size, len);
max_bin_size = std::cmp::max(max_bin_size, len);
len
})
.sum();
let timings = GenerateIndexTimings {
scan_time,
index_time: index_time.as_us(),
insertion_time_us: insertion_time_us.load(Ordering::Relaxed),
min_bin_size,
max_bin_size,
total_items,
};
timings.report();
let timings = GenerateIndexTimings {
scan_time,
index_time: index_time.as_us(),
insertion_time_us: insertion_time_us.load(Ordering::Relaxed),
min_bin_size,
max_bin_size,
total_items,
};
timings.report();
}
// Need to add these last, otherwise older updates will be cleaned
for slot in slots {

View File

@ -140,6 +140,7 @@ pub(crate) fn bank_from_stream<R>(
caching_enabled: bool,
limit_load_slot_count_from_snapshot: Option<usize>,
shrink_ratio: AccountShrinkThreshold,
verify_index: bool,
) -> std::result::Result<Bank, Error>
where
R: Read,
@ -161,6 +162,7 @@ where
caching_enabled,
limit_load_slot_count_from_snapshot,
shrink_ratio,
verify_index,
)?;
Ok(bank)
}};
@ -252,6 +254,7 @@ fn reconstruct_bank_from_fields<E>(
caching_enabled: bool,
limit_load_slot_count_from_snapshot: Option<usize>,
shrink_ratio: AccountShrinkThreshold,
verify_index: bool,
) -> Result<Bank, Error>
where
E: SerializableStorage + std::marker::Sync,
@ -265,6 +268,7 @@ where
caching_enabled,
limit_load_slot_count_from_snapshot,
shrink_ratio,
verify_index,
)?;
accounts_db.freeze_accounts(
&Ancestors::from(&bank_fields.ancestors),
@ -314,6 +318,7 @@ fn reconstruct_accountsdb_from_fields<E>(
caching_enabled: bool,
limit_load_slot_count_from_snapshot: Option<usize>,
shrink_ratio: AccountShrinkThreshold,
verify_index: bool,
) -> Result<AccountsDb, Error>
where
E: SerializableStorage + std::marker::Sync,
@ -396,6 +401,6 @@ where
accounts_db
.write_version
.fetch_add(version, Ordering::Relaxed);
accounts_db.generate_index(limit_load_slot_count_from_snapshot);
accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index);
Ok(accounts_db)
}

View File

@ -75,6 +75,7 @@ where
false,
None,
AccountShrinkThreshold::default(),
false,
)
}
@ -231,6 +232,7 @@ fn test_bank_serialize_style(serde_style: SerdeStyle) {
false,
None,
AccountShrinkThreshold::default(),
false,
)
.unwrap();
dbank.src = ref_sc;

View File

@ -645,6 +645,7 @@ pub fn bank_from_snapshot_archive<P>(
limit_load_slot_count_from_snapshot: Option<usize>,
shrink_ratio: AccountShrinkThreshold,
test_hash_calculation: bool,
verify_index: bool,
) -> Result<(Bank, BankFromArchiveTimings)>
where
P: AsRef<Path> + std::marker::Sync,
@ -688,6 +689,7 @@ where
accounts_db_caching_enabled,
limit_load_slot_count_from_snapshot,
shrink_ratio,
verify_index,
)?;
measure.stop();
@ -945,6 +947,7 @@ fn rebuild_bank_from_snapshots(
accounts_db_caching_enabled: bool,
limit_load_slot_count_from_snapshot: Option<usize>,
shrink_ratio: AccountShrinkThreshold,
verify_index: bool,
) -> Result<Bank> {
let (snapshot_version_enum, root_paths) =
verify_snapshot_version_and_folder(snapshot_version, unpacked_snapshots_dir)?;
@ -967,6 +970,7 @@ fn rebuild_bank_from_snapshots(
accounts_db_caching_enabled,
limit_load_slot_count_from_snapshot,
shrink_ratio,
verify_index,
),
}?)
})?;
@ -1500,6 +1504,7 @@ mod tests {
None,
AccountShrinkThreshold::default(),
false,
false,
)
.unwrap();
@ -1588,6 +1593,7 @@ mod tests {
None,
AccountShrinkThreshold::default(),
false,
false,
)
.unwrap();