metrics for generating index time (#17192)

* metrics for generating index time

* update metrics to include scan time
This commit is contained in:
Jeff Washington (jwash) 2021-05-13 14:32:19 -05:00 committed by GitHub
parent 3e0c0abb53
commit 3dbc7744ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 99 additions and 79 deletions

View File

@@ -4962,89 +4962,109 @@ impl AccountsDb {
let total_processed_slots_across_all_threads = AtomicU64::new(0); let total_processed_slots_across_all_threads = AtomicU64::new(0);
let outer_slots_len = slots.len(); let outer_slots_len = slots.len();
let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot
slots.par_chunks(chunk_size).for_each(|slots| { let mut index_time = Measure::start("index");
let mut last_log_update = Instant::now(); let scan_time: u64 = slots
let mut my_last_reported_number_of_processed_slots = 0; .par_chunks(chunk_size)
let mut was_first = false; .map(|slots| {
for (index, slot) in slots.iter().enumerate() { let mut last_log_update = Instant::now();
let now = Instant::now(); let mut my_last_reported_number_of_processed_slots = 0;
if now.duration_since(last_log_update).as_secs() >= 2 { let mut was_first = false;
let my_total_newly_processed_slots_since_last_report = let mut scan_time_sum = 0;
(index as u64) - my_last_reported_number_of_processed_slots; for (index, slot) in slots.iter().enumerate() {
my_last_reported_number_of_processed_slots = index as u64; let mut scan_time = Measure::start("scan");
let previous_total_processed_slots_across_all_threads = let now = Instant::now();
total_processed_slots_across_all_threads.fetch_add( if now.duration_since(last_log_update).as_secs() >= 2 {
my_total_newly_processed_slots_since_last_report, let my_total_newly_processed_slots_since_last_report =
Ordering::Relaxed, (index as u64) - my_last_reported_number_of_processed_slots;
); my_last_reported_number_of_processed_slots = index as u64;
was_first = was_first || 0 == previous_total_processed_slots_across_all_threads; let previous_total_processed_slots_across_all_threads =
if was_first { total_processed_slots_across_all_threads.fetch_add(
info!( my_total_newly_processed_slots_since_last_report,
"generating index: {}/{} slots...", Ordering::Relaxed,
previous_total_processed_slots_across_all_threads
+ my_total_newly_processed_slots_since_last_report,
outer_slots_len
);
}
last_log_update = now;
}
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage
.get_slot_storage_entries(*slot)
.unwrap_or_default();
let num_accounts = storage_maps
.iter()
.map(|storage| storage.approx_stored_count())
.sum();
let mut accounts_map: AccountsMap = AccountsMap::with_capacity(num_accounts);
storage_maps.iter().for_each(|storage| {
let accounts = storage.all_accounts();
accounts.into_iter().for_each(|stored_account| {
let entry = accounts_map
.entry(stored_account.meta.pubkey)
.or_insert_with(BTreeMap::new);
assert!(
// There should only be one update per write version for a specific slot
// and account
entry
.insert(
stored_account.meta.write_version,
(storage.append_vec_id(), stored_account)
)
.is_none()
);
})
});
// Need to restore indexes even with older write versions which may
// be shielding other accounts. When they are then purged, the
// original non-shielded account value will be visible when the account
// is restored from the append-vec
if !accounts_map.is_empty() {
let mut _reclaims: Vec<(u64, AccountInfo)> = vec![];
let dirty_keys = accounts_map.iter().map(|(pubkey, _info)| *pubkey).collect();
self.uncleaned_pubkeys.insert(*slot, dirty_keys);
for (pubkey, account_infos) in accounts_map.into_iter() {
for (_, (store_id, stored_account)) in account_infos.into_iter() {
let account_info = AccountInfo {
store_id,
offset: stored_account.offset,
stored_size: stored_account.stored_size,
lamports: stored_account.account_meta.lamports,
};
self.accounts_index.insert_new_if_missing(
*slot,
&pubkey,
&stored_account.account_meta.owner,
&stored_account.data,
&self.account_indexes,
account_info,
&mut _reclaims,
); );
was_first =
was_first || 0 == previous_total_processed_slots_across_all_threads;
if was_first {
info!(
"generating index: {}/{} slots...",
previous_total_processed_slots_across_all_threads
+ my_total_newly_processed_slots_since_last_report,
outer_slots_len
);
}
last_log_update = now;
}
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
.storage
.get_slot_storage_entries(*slot)
.unwrap_or_default();
let num_accounts = storage_maps
.iter()
.map(|storage| storage.approx_stored_count())
.sum();
let mut accounts_map: AccountsMap = AccountsMap::with_capacity(num_accounts);
storage_maps.iter().for_each(|storage| {
let accounts = storage.all_accounts();
accounts.into_iter().for_each(|stored_account| {
let entry = accounts_map
.entry(stored_account.meta.pubkey)
.or_insert_with(BTreeMap::new);
assert!(
// There should only be one update per write version for a specific slot
// and account
entry
.insert(
stored_account.meta.write_version,
(storage.append_vec_id(), stored_account)
)
.is_none()
);
})
});
scan_time.stop();
scan_time_sum += scan_time.as_us();
// Need to restore indexes even with older write versions which may
// be shielding other accounts. When they are then purged, the
// original non-shielded account value will be visible when the account
// is restored from the append-vec
if !accounts_map.is_empty() {
let mut _reclaims: Vec<(u64, AccountInfo)> = vec![];
let dirty_keys =
accounts_map.iter().map(|(pubkey, _info)| *pubkey).collect();
self.uncleaned_pubkeys.insert(*slot, dirty_keys);
for (pubkey, account_infos) in accounts_map.into_iter() {
for (_, (store_id, stored_account)) in account_infos.into_iter() {
let account_info = AccountInfo {
store_id,
offset: stored_account.offset,
stored_size: stored_account.stored_size,
lamports: stored_account.account_meta.lamports,
};
self.accounts_index.insert_new_if_missing(
*slot,
&pubkey,
&stored_account.account_meta.owner,
&stored_account.data,
&self.account_indexes,
account_info,
&mut _reclaims,
);
}
} }
} }
} }
} scan_time_sum
}); })
.sum();
index_time.stop();
datapoint_info!(
"generate_index",
// we cannot accurately measure index insertion time because of many threads and lock contention
("total_us", index_time.as_us(), i64),
("scan_stores_us", scan_time, i64),
);
// Need to add these last, otherwise older updates will be cleaned // Need to add these last, otherwise older updates will be cleaned
for slot in slots { for slot in slots {