diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 5bc4ef3e00..9c70a0500a 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -25,7 +25,7 @@ use solana_ledger::{ entry::VerifyRecyclers, leader_schedule_cache::LeaderScheduleCache, }; -use solana_measure::thread_mem_usage; +use solana_measure::{measure::Measure, thread_mem_usage}; use solana_metrics::inc_new_counter_info; use solana_runtime::{ bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache, @@ -37,7 +37,7 @@ use solana_sdk::{ hash::Hash, pubkey::Pubkey, signature::{Keypair, Signer}, - timing::duration_as_ms, + timing::timestamp, transaction::Transaction, }; use solana_vote_program::{ @@ -54,7 +54,7 @@ use std::{ Arc, Mutex, RwLock, }, thread::{self, Builder, JoinHandle}, - time::{Duration, Instant}, + time::Duration, }; pub const MAX_ENTRY_RECV_PER_ITER: usize = 512; @@ -110,36 +110,95 @@ pub struct ReplayStageConfig { #[derive(Default)] pub struct ReplayTiming { - num_iterations: u64, + last_print: u64, compute_bank_stats_elapsed: u64, select_vote_and_reset_forks_elapsed: u64, + start_leader_elapsed: u64, + reset_bank_elapsed: u64, + voting_elapsed: u64, + select_forks_elapsed: u64, + compute_slot_stats_elapsed: u64, + generate_new_bank_forks_elapsed: u64, + replay_active_banks_elapsed: u64, + reset_duplicate_slots_elapsed: u64, } impl ReplayTiming { + #[allow(clippy::too_many_arguments)] fn update( &mut self, compute_bank_stats_elapsed: u64, select_vote_and_reset_forks_elapsed: u64, + start_leader_elapsed: u64, + reset_bank_elapsed: u64, + voting_elapsed: u64, + select_forks_elapsed: u64, + compute_slot_stats_elapsed: u64, + generate_new_bank_forks_elapsed: u64, + replay_active_banks_elapsed: u64, + reset_duplicate_slots_elapsed: u64, ) { - self.num_iterations += 1; self.compute_bank_stats_elapsed += compute_bank_stats_elapsed; self.select_vote_and_reset_forks_elapsed += select_vote_and_reset_forks_elapsed; - if self.num_iterations == 100 { + self.start_leader_elapsed += start_leader_elapsed; + self.reset_bank_elapsed += reset_bank_elapsed; + self.voting_elapsed += voting_elapsed; + self.select_forks_elapsed += select_forks_elapsed; + self.compute_slot_stats_elapsed += compute_slot_stats_elapsed; + self.generate_new_bank_forks_elapsed += generate_new_bank_forks_elapsed; + self.replay_active_banks_elapsed += replay_active_banks_elapsed; + self.reset_duplicate_slots_elapsed += reset_duplicate_slots_elapsed; + let now = timestamp(); + let elapsed_ms = now - self.last_print; + if elapsed_ms > 1000 { datapoint_info!( "replay-loop-timing-stats", + ("total_elapsed_us", elapsed_ms * 1000, i64), ( "compute_bank_stats_elapsed", - self.compute_bank_stats_elapsed as i64 / 100, + self.compute_bank_stats_elapsed as i64, i64 ), ( "select_vote_and_reset_forks_elapsed", - self.select_vote_and_reset_forks_elapsed as i64 / 100, + self.select_vote_and_reset_forks_elapsed as i64, + i64 + ), + ( + "start_leader_elapsed", + self.start_leader_elapsed as i64, + i64 + ), + ("reset_bank_elapsed", self.reset_bank_elapsed as i64, i64), + ("voting_elapsed", self.voting_elapsed as i64, i64), + ( + "select_forks_elapsed", + self.select_forks_elapsed as i64, + i64 + ), + ( + "compute_slot_stats_elapsed", + self.compute_slot_stats_elapsed as i64, + i64 + ), + ( + "generate_new_bank_forks_elapsed", + self.generate_new_bank_forks_elapsed as i64, + i64 + ), + ( + "replay_active_banks_elapsed", + self.replay_active_banks_elapsed as i64, + i64 + ), + ( + "reset_duplicate_slots_elapsed", + self.reset_duplicate_slots_elapsed as i64, i64 ), ); - self.num_iterations = 0; - self.compute_bank_stats_elapsed = 0; - self.select_vote_and_reset_forks_elapsed = 0; + + *self = ReplayTiming::default(); + self.last_print = now; } } } @@ -254,6 +313,8 @@ impl ReplayStage { } let start = allocated.get(); + let mut generate_new_bank_forks_time = + Measure::start("generate_new_bank_forks_time"); Self::generate_new_bank_forks( &blockstore, &bank_forks, @@ -263,11 +324,13 @@ impl ReplayStage { &mut progress, &mut all_pubkeys, ); + generate_new_bank_forks_time.stop(); Self::report_memory(&allocated, "generate_new_bank_forks", start); let mut tpu_has_bank = poh_recorder.lock().unwrap().has_bank(); let start = allocated.get(); + let mut replay_active_banks_time = Measure::start("replay_active_banks_time"); let did_complete_bank = Self::replay_active_banks( &blockstore, &bank_forks, @@ -279,8 +342,10 @@ impl ReplayStage { &mut heaviest_subtree_fork_choice, &subscriptions, ); + replay_active_banks_time.stop(); Self::report_memory(&allocated, "replay_active_banks", start); + let mut reset_duplicate_slots_time = Measure::start("reset_duplicate_slots"); let mut ancestors = bank_forks.read().unwrap().ancestors(); let mut descendants = bank_forks.read().unwrap().descendants(); let forks_root = bank_forks.read().unwrap().root(); @@ -296,6 +361,9 @@ impl ReplayStage { &mut progress, &bank_forks, ); + reset_duplicate_slots_time.stop(); + + let mut collect_frozen_banks_time = Measure::start("frozen_banks"); let mut frozen_banks: Vec<_> = bank_forks .read() .unwrap() @@ -304,7 +372,9 @@ impl ReplayStage { .filter(|(slot, _)| *slot >= forks_root) .map(|(_, bank)| bank) .collect(); - let now = Instant::now(); + collect_frozen_banks_time.stop(); + + let mut compute_bank_stats_time = Measure::start("compute_bank_stats"); let newly_computed_slot_stats = Self::compute_bank_stats( &my_pubkey, &ancestors, @@ -318,7 +388,9 @@ impl ReplayStage { &mut heaviest_subtree_fork_choice, &mut bank_weight_fork_choice, ); - let compute_bank_stats_elapsed = now.elapsed().as_micros(); + compute_bank_stats_time.stop(); + + let mut compute_slot_stats_time = Measure::start("compute_slot_stats_time"); for slot in newly_computed_slot_stats { let fork_stats = progress.get_fork_stats(slot).unwrap(); let confirmed_forks = Self::confirm_forks( @@ -337,7 +409,9 @@ impl ReplayStage { .confirmation_reported = true; } } + compute_slot_stats_time.stop(); + let mut select_forks_time = Measure::start("select_forks_time"); let fork_choice: &mut dyn ForkChoice = if forks_root > unlock_heaviest_subtree_fork_choice_slot { &mut heaviest_subtree_fork_choice @@ -346,10 +420,12 @@ impl ReplayStage { }; let (heaviest_bank, heaviest_bank_on_same_voted_fork) = fork_choice .select_forks(&frozen_banks, &tower, &progress, &ancestors, &bank_forks); + select_forks_time.stop(); Self::report_memory(&allocated, "select_fork", start); - let now = Instant::now(); + let mut select_vote_and_reset_forks_time = + Measure::start("select_vote_and_reset_forks"); let SelectVoteAndResetForkResult { vote_bank, reset_bank, @@ -362,11 +438,7 @@ impl ReplayStage { &progress, &tower, ); - let select_vote_and_reset_forks_elapsed = now.elapsed().as_micros(); - replay_timing.update( - compute_bank_stats_elapsed as u64, - select_vote_and_reset_forks_elapsed as u64, - ); + select_vote_and_reset_forks_time.stop(); if tower.is_recent(heaviest_bank.slot()) && !heaviest_fork_failures.is_empty() { info!( @@ -388,6 +460,7 @@ impl ReplayStage { let start = allocated.get(); + let mut voting_time = Measure::start("voting_time"); // Vote on a fork if let Some((ref vote_bank, ref switch_fork_decision)) = vote_bank { if let Some(votable_leader) = @@ -421,10 +494,12 @@ impl ReplayStage { &mut heaviest_subtree_fork_choice, )?; }; + voting_time.stop(); Self::report_memory(&allocated, "votable_bank", start); let start = allocated.get(); + let mut reset_bank_time = Measure::start("reset_bank"); // Reset onto a fork if let Some(reset_bank) = reset_bank { if last_reset != reset_bank.last_blockhash() { @@ -486,14 +561,13 @@ impl ReplayStage { inc_new_counter_info!("replay_stage-partition_resolved", 1); } } - datapoint_debug!( - "replay_stage-memory", - ("reset_bank", (allocated.get() - start) as i64, i64), - ); + Self::report_memory(&allocated, "reset_bank", start); } + reset_bank_time.stop(); Self::report_memory(&allocated, "reset_bank", start); let start = allocated.get(); + let mut start_leader_time = Measure::start("start_leader_time"); if !tpu_has_bank { Self::maybe_start_leader( &my_pubkey, @@ -517,11 +591,22 @@ impl ReplayStage { ); } } + start_leader_time.stop(); Self::report_memory(&allocated, "start_leader", start); - datapoint_debug!( - "replay_stage", - ("duration", duration_as_ms(&now.elapsed()) as i64, i64) + + replay_timing.update( + compute_bank_stats_time.as_us(), + select_vote_and_reset_forks_time.as_us(), + start_leader_time.as_us(), + reset_bank_time.as_us(), + voting_time.as_us(), + select_forks_time.as_us(), + compute_slot_stats_time.as_us(), + generate_new_bank_forks_time.as_us(), + replay_active_banks_time.as_us(), + reset_duplicate_slots_time.as_us(), ); + if did_complete_bank { //just processed a bank, skip the signal; maybe there's more slots available continue;