From 3dbc7744ab0c3fd89a42414224ab314f59f70d8f Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Thu, 13 May 2021 14:32:19 -0500 Subject: [PATCH] metrics for generating index time (#17192) * metrics for generating index time * update metrics to include scan time --- runtime/src/accounts_db.rs | 178 +++++++++++++++++++++---------------- 1 file changed, 99 insertions(+), 79 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index b07b40e4f4..8d764ec250 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -4962,89 +4962,109 @@ impl AccountsDb { let total_processed_slots_across_all_threads = AtomicU64::new(0); let outer_slots_len = slots.len(); let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot - slots.par_chunks(chunk_size).for_each(|slots| { - let mut last_log_update = Instant::now(); - let mut my_last_reported_number_of_processed_slots = 0; - let mut was_first = false; - for (index, slot) in slots.iter().enumerate() { - let now = Instant::now(); - if now.duration_since(last_log_update).as_secs() >= 2 { - let my_total_newly_processed_slots_since_last_report = - (index as u64) - my_last_reported_number_of_processed_slots; - my_last_reported_number_of_processed_slots = index as u64; - let previous_total_processed_slots_across_all_threads = - total_processed_slots_across_all_threads.fetch_add( - my_total_newly_processed_slots_since_last_report, - Ordering::Relaxed, - ); - was_first = was_first || 0 == previous_total_processed_slots_across_all_threads; - if was_first { - info!( - "generating index: {}/{} slots...", - previous_total_processed_slots_across_all_threads - + my_total_newly_processed_slots_since_last_report, - outer_slots_len - ); - } - last_log_update = now; - } - let storage_maps: Vec> = self - .storage - .get_slot_storage_entries(*slot) - .unwrap_or_default(); - let num_accounts = storage_maps - .iter() - .map(|storage| storage.approx_stored_count()) - .sum(); - let mut accounts_map: AccountsMap = AccountsMap::with_capacity(num_accounts); - storage_maps.iter().for_each(|storage| { - let accounts = storage.all_accounts(); - accounts.into_iter().for_each(|stored_account| { - let entry = accounts_map - .entry(stored_account.meta.pubkey) - .or_insert_with(BTreeMap::new); - assert!( - // There should only be one update per write version for a specific slot - // and account - entry - .insert( - stored_account.meta.write_version, - (storage.append_vec_id(), stored_account) - ) - .is_none() - ); - }) - }); - // Need to restore indexes even with older write versions which may - // be shielding other accounts. When they are then purged, the - // original non-shielded account value will be visible when the account - // is restored from the append-vec - if !accounts_map.is_empty() { - let mut _reclaims: Vec<(u64, AccountInfo)> = vec![]; - let dirty_keys = accounts_map.iter().map(|(pubkey, _info)| *pubkey).collect(); - self.uncleaned_pubkeys.insert(*slot, dirty_keys); - for (pubkey, account_infos) in accounts_map.into_iter() { - for (_, (store_id, stored_account)) in account_infos.into_iter() { - let account_info = AccountInfo { - store_id, - offset: stored_account.offset, - stored_size: stored_account.stored_size, - lamports: stored_account.account_meta.lamports, - }; - self.accounts_index.insert_new_if_missing( - *slot, - &pubkey, - &stored_account.account_meta.owner, - &stored_account.data, - &self.account_indexes, - account_info, - &mut _reclaims, + let mut index_time = Measure::start("index"); + let scan_time: u64 = slots + .par_chunks(chunk_size) + .map(|slots| { + let mut last_log_update = Instant::now(); + let mut my_last_reported_number_of_processed_slots = 0; + let mut was_first = false; + let mut scan_time_sum = 0; + for (index, slot) in slots.iter().enumerate() { + let mut scan_time = Measure::start("scan"); + let now = Instant::now(); + if now.duration_since(last_log_update).as_secs() >= 2 { + let my_total_newly_processed_slots_since_last_report = + (index as u64) - my_last_reported_number_of_processed_slots; + my_last_reported_number_of_processed_slots = index as u64; + let previous_total_processed_slots_across_all_threads = + total_processed_slots_across_all_threads.fetch_add( + my_total_newly_processed_slots_since_last_report, + Ordering::Relaxed, ); + was_first = + was_first || 0 == previous_total_processed_slots_across_all_threads; + if was_first { + info!( + "generating index: {}/{} slots...", + previous_total_processed_slots_across_all_threads + + my_total_newly_processed_slots_since_last_report, + outer_slots_len + ); + } + last_log_update = now; + } + let storage_maps: Vec> = self + .storage + .get_slot_storage_entries(*slot) + .unwrap_or_default(); + let num_accounts = storage_maps + .iter() + .map(|storage| storage.approx_stored_count()) + .sum(); + let mut accounts_map: AccountsMap = AccountsMap::with_capacity(num_accounts); + storage_maps.iter().for_each(|storage| { + let accounts = storage.all_accounts(); + accounts.into_iter().for_each(|stored_account| { + let entry = accounts_map + .entry(stored_account.meta.pubkey) + .or_insert_with(BTreeMap::new); + assert!( + // There should only be one update per write version for a specific slot + // and account + entry + .insert( + stored_account.meta.write_version, + (storage.append_vec_id(), stored_account) + ) + .is_none() + ); + }) + }); + scan_time.stop(); + scan_time_sum += scan_time.as_us(); + + // Need to restore indexes even with older write versions which may + // be shielding other accounts. When they are then purged, the + // original non-shielded account value will be visible when the account + // is restored from the append-vec + if !accounts_map.is_empty() { + let mut _reclaims: Vec<(u64, AccountInfo)> = vec![]; + let dirty_keys = + accounts_map.iter().map(|(pubkey, _info)| *pubkey).collect(); + self.uncleaned_pubkeys.insert(*slot, dirty_keys); + for (pubkey, account_infos) in accounts_map.into_iter() { + for (_, (store_id, stored_account)) in account_infos.into_iter() { + let account_info = AccountInfo { + store_id, + offset: stored_account.offset, + stored_size: stored_account.stored_size, + lamports: stored_account.account_meta.lamports, + }; + self.accounts_index.insert_new_if_missing( + *slot, + &pubkey, + &stored_account.account_meta.owner, + &stored_account.data, + &self.account_indexes, + account_info, + &mut _reclaims, + ); + } } } } - } - }); + scan_time_sum + }) + .sum(); + index_time.stop(); + + datapoint_info!( + "generate_index", + // we cannot accurately measure index insertion time because of many threads and lock contention + ("total_us", index_time.as_us(), i64), + ("scan_stores_us", scan_time, i64), + ); // Need to add these last, otherwise older updates will be cleaned for slot in slots {