Add new replay metrics for replay blockstore_into_bank and complete (#25717)

This commit is contained in:
sakridge 2022-06-03 11:45:27 -06:00 committed by GitHub
parent 88299e72b8
commit 447a3239e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 10 deletions

View File

@ -36,7 +36,13 @@ impl std::ops::DerefMut for ReplaySlotStats {
} }
impl ReplaySlotStats { impl ReplaySlotStats {
pub fn report_stats(&self, slot: Slot, num_entries: usize, num_shreds: u64) { pub fn report_stats(
&self,
slot: Slot,
num_entries: usize,
num_shreds: u64,
bank_complete_time_us: u64,
) {
datapoint_info!( datapoint_info!(
"replay-slot-stats", "replay-slot-stats",
("slot", slot as i64, i64), ("slot", slot as i64, i64),
@ -104,6 +110,7 @@ impl ReplaySlotStats {
.index(ExecuteTimingType::UpdateStakesCacheUs), .index(ExecuteTimingType::UpdateStakesCacheUs),
i64 i64
), ),
("bank_complete_time_us", bank_complete_time_us, i64),
( (
"total_batches_len", "total_batches_len",
*self *self

View File

@ -173,6 +173,7 @@ pub struct ReplayTiming {
generate_new_bank_forks_get_slots_since_us: u64, generate_new_bank_forks_get_slots_since_us: u64,
generate_new_bank_forks_loop_us: u64, generate_new_bank_forks_loop_us: u64,
generate_new_bank_forks_write_lock_us: u64, generate_new_bank_forks_write_lock_us: u64,
replay_blockstore_us: u64,
} }
impl ReplayTiming { impl ReplayTiming {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
@ -332,6 +333,11 @@ impl ReplayTiming {
self.generate_new_bank_forks_write_lock_us as i64, self.generate_new_bank_forks_write_lock_us as i64,
i64 i64
), ),
(
"replay_blockstore_us",
self.replay_blockstore_us as i64,
i64
),
); );
*self = ReplayTiming::default(); *self = ReplayTiming::default();
self.last_print = now; self.last_print = now;
@ -493,6 +499,7 @@ impl ReplayStage {
&ancestor_hashes_replay_update_sender, &ancestor_hashes_replay_update_sender,
block_metadata_notifier.clone(), block_metadata_notifier.clone(),
transaction_cost_metrics_sender.as_ref(), transaction_cost_metrics_sender.as_ref(),
&mut replay_timing,
); );
replay_active_banks_time.stop(); replay_active_banks_time.stop();
@ -2177,6 +2184,7 @@ impl ReplayStage {
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
block_metadata_notifier: Option<BlockMetadataNotifierLock>, block_metadata_notifier: Option<BlockMetadataNotifierLock>,
transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>, transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>,
replay_timing: &mut ReplayTiming,
) -> bool { ) -> bool {
let mut did_complete_bank = false; let mut did_complete_bank = false;
let mut tx_count = 0; let mut tx_count = 0;
@ -2220,6 +2228,7 @@ impl ReplayStage {
}); });
if bank.collector_id() != my_pubkey { if bank.collector_id() != my_pubkey {
let root_slot = bank_forks.read().unwrap().root(); let root_slot = bank_forks.read().unwrap().root();
let mut replay_blockstore_time = Measure::start("replay_blockstore_into_bank");
let replay_result = Self::replay_blockstore_into_bank( let replay_result = Self::replay_blockstore_into_bank(
&bank, &bank,
blockstore, blockstore,
@ -2229,6 +2238,8 @@ impl ReplayStage {
transaction_cost_metrics_sender, transaction_cost_metrics_sender,
verify_recyclers, verify_recyclers,
); );
replay_blockstore_time.stop();
replay_timing.replay_blockstore_us += replay_blockstore_time.as_us();
match replay_result { match replay_result {
Ok(replay_tx_count) => tx_count += replay_tx_count, Ok(replay_tx_count) => tx_count += replay_tx_count,
Err(err) => { Err(err) => {
@ -2255,17 +2266,13 @@ impl ReplayStage {
} }
assert_eq!(*bank_slot, bank.slot()); assert_eq!(*bank_slot, bank.slot());
if bank.is_complete() { if bank.is_complete() {
let mut bank_complete_time = Measure::start("bank_complete_time");
execute_timings.accumulate(&bank_progress.replay_stats.execute_timings); execute_timings.accumulate(&bank_progress.replay_stats.execute_timings);
debug!("bank {} is completed replay from blockstore, contribute to update cost with {:?}", debug!("bank {} is completed replay from blockstore, contribute to update cost with {:?}",
bank.slot(), bank.slot(),
bank_progress.replay_stats.execute_timings bank_progress.replay_stats.execute_timings
); );
bank_progress.replay_stats.report_stats(
bank.slot(),
bank_progress.replay_progress.num_entries,
bank_progress.replay_progress.num_shreds,
);
did_complete_bank = true; did_complete_bank = true;
info!("bank frozen: {}", bank.slot()); info!("bank frozen: {}", bank.slot());
let _ = cluster_slots_update_sender.send(vec![*bank_slot]); let _ = cluster_slots_update_sender.send(vec![*bank_slot]);
@ -2289,10 +2296,7 @@ impl ReplayStage {
(bank.slot(), bank.hash()), (bank.slot(), bank.hash()),
Some((bank.parent_slot(), bank.parent_hash())), Some((bank.parent_slot(), bank.parent_hash())),
); );
progress bank_progress.fork_stats.bank_hash = Some(bank.hash());
.get_fork_stats_mut(bank.slot())
.expect("All frozen banks must exist in the Progress map")
.bank_hash = Some(bank.hash());
let bank_frozen_state = BankFrozenState::new_from_state( let bank_frozen_state = BankFrozenState::new_from_state(
bank.slot(), bank.slot(),
bank.hash(), bank.hash(),
@ -2343,6 +2347,14 @@ impl ReplayStage {
Some(bank.block_height()), Some(bank.block_height()),
) )
} }
bank_complete_time.stop();
bank_progress.replay_stats.report_stats(
bank.slot(),
bank_progress.replay_progress.num_entries,
bank_progress.replay_progress.num_shreds,
bank_complete_time.as_us(),
);
} else { } else {
trace!( trace!(
"bank {} not completed tick_height: {}, max_tick_height: {}", "bank {} not completed tick_height: {}, max_tick_height: {}",