More replay stage timing metrics (#10828)

This commit is contained in:
sakridge 2020-06-28 10:04:15 -07:00 committed by GitHub
parent ea30c157e0
commit 17a2128a8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 111 additions and 26 deletions

View File

@ -25,7 +25,7 @@ use solana_ledger::{
entry::VerifyRecyclers,
leader_schedule_cache::LeaderScheduleCache,
};
use solana_measure::thread_mem_usage;
use solana_measure::{measure::Measure, thread_mem_usage};
use solana_metrics::inc_new_counter_info;
use solana_runtime::{
bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache,
@ -37,7 +37,7 @@ use solana_sdk::{
hash::Hash,
pubkey::Pubkey,
signature::{Keypair, Signer},
timing::duration_as_ms,
timing::timestamp,
transaction::Transaction,
};
use solana_vote_program::{
@ -54,7 +54,7 @@ use std::{
Arc, Mutex, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
time::Duration,
};
pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
@ -110,36 +110,95 @@ pub struct ReplayStageConfig {
#[derive(Default)]
pub struct ReplayTiming {
num_iterations: u64,
last_print: u64,
compute_bank_stats_elapsed: u64,
select_vote_and_reset_forks_elapsed: u64,
start_leader_elapsed: u64,
reset_bank_elapsed: u64,
voting_elapsed: u64,
select_forks_elapsed: u64,
compute_slot_stats_elapsed: u64,
generate_new_bank_forks_elapsed: u64,
replay_active_banks_elapsed: u64,
reset_duplicate_slots_elapsed: u64,
}
impl ReplayTiming {
#[allow(clippy::too_many_arguments)]
fn update(
&mut self,
compute_bank_stats_elapsed: u64,
select_vote_and_reset_forks_elapsed: u64,
start_leader_elapsed: u64,
reset_bank_elapsed: u64,
voting_elapsed: u64,
select_forks_elapsed: u64,
compute_slot_stats_elapsed: u64,
generate_new_bank_forks_elapsed: u64,
replay_active_banks_elapsed: u64,
reset_duplicate_slots_elapsed: u64,
) {
self.num_iterations += 1;
self.compute_bank_stats_elapsed += compute_bank_stats_elapsed;
self.select_vote_and_reset_forks_elapsed += select_vote_and_reset_forks_elapsed;
if self.num_iterations == 100 {
self.start_leader_elapsed += start_leader_elapsed;
self.reset_bank_elapsed += reset_bank_elapsed;
self.voting_elapsed += voting_elapsed;
self.select_forks_elapsed += select_forks_elapsed;
self.compute_slot_stats_elapsed += compute_slot_stats_elapsed;
self.generate_new_bank_forks_elapsed += generate_new_bank_forks_elapsed;
self.replay_active_banks_elapsed += replay_active_banks_elapsed;
self.reset_duplicate_slots_elapsed += reset_duplicate_slots_elapsed;
let now = timestamp();
let elapsed_ms = now - self.last_print;
if elapsed_ms > 1000 {
datapoint_info!(
"replay-loop-timing-stats",
("total_elapsed_us", elapsed_ms * 1000, i64),
(
"compute_bank_stats_elapsed",
self.compute_bank_stats_elapsed as i64 / 100,
self.compute_bank_stats_elapsed as i64,
i64
),
(
"select_vote_and_reset_forks_elapsed",
self.select_vote_and_reset_forks_elapsed as i64 / 100,
self.select_vote_and_reset_forks_elapsed as i64,
i64
),
(
"start_leader_elapsed",
self.start_leader_elapsed as i64,
i64
),
("reset_bank_elapsed", self.reset_bank_elapsed as i64, i64),
("voting_elapsed", self.voting_elapsed as i64, i64),
(
"select_forks_elapsed",
self.select_forks_elapsed as i64,
i64
),
(
"compute_slot_stats_elapsed",
self.compute_slot_stats_elapsed as i64,
i64
),
(
"generate_new_bank_forks_elapsed",
self.generate_new_bank_forks_elapsed as i64,
i64
),
(
"replay_active_banks_elapsed",
self.replay_active_banks_elapsed as i64,
i64
),
(
"reset_duplicate_slots_elapsed",
self.reset_duplicate_slots_elapsed as i64,
i64
),
);
self.num_iterations = 0;
self.compute_bank_stats_elapsed = 0;
self.select_vote_and_reset_forks_elapsed = 0;
*self = ReplayTiming::default();
self.last_print = now;
}
}
}
@ -254,6 +313,8 @@ impl ReplayStage {
}
let start = allocated.get();
let mut generate_new_bank_forks_time =
Measure::start("generate_new_bank_forks_time");
Self::generate_new_bank_forks(
&blockstore,
&bank_forks,
@ -263,11 +324,13 @@ impl ReplayStage {
&mut progress,
&mut all_pubkeys,
);
generate_new_bank_forks_time.stop();
Self::report_memory(&allocated, "generate_new_bank_forks", start);
let mut tpu_has_bank = poh_recorder.lock().unwrap().has_bank();
let start = allocated.get();
let mut replay_active_banks_time = Measure::start("replay_active_banks_time");
let did_complete_bank = Self::replay_active_banks(
&blockstore,
&bank_forks,
@ -279,8 +342,10 @@ impl ReplayStage {
&mut heaviest_subtree_fork_choice,
&subscriptions,
);
replay_active_banks_time.stop();
Self::report_memory(&allocated, "replay_active_banks", start);
let mut reset_duplicate_slots_time = Measure::start("reset_duplicate_slots");
let mut ancestors = bank_forks.read().unwrap().ancestors();
let mut descendants = bank_forks.read().unwrap().descendants();
let forks_root = bank_forks.read().unwrap().root();
@ -296,6 +361,9 @@ impl ReplayStage {
&mut progress,
&bank_forks,
);
reset_duplicate_slots_time.stop();
let mut collect_frozen_banks_time = Measure::start("frozen_banks");
let mut frozen_banks: Vec<_> = bank_forks
.read()
.unwrap()
@ -304,7 +372,9 @@ impl ReplayStage {
.filter(|(slot, _)| *slot >= forks_root)
.map(|(_, bank)| bank)
.collect();
let now = Instant::now();
collect_frozen_banks_time.stop();
let mut compute_bank_stats_time = Measure::start("compute_bank_stats");
let newly_computed_slot_stats = Self::compute_bank_stats(
&my_pubkey,
&ancestors,
@ -318,7 +388,9 @@ impl ReplayStage {
&mut heaviest_subtree_fork_choice,
&mut bank_weight_fork_choice,
);
let compute_bank_stats_elapsed = now.elapsed().as_micros();
compute_bank_stats_time.stop();
let mut compute_slot_stats_time = Measure::start("compute_slot_stats_time");
for slot in newly_computed_slot_stats {
let fork_stats = progress.get_fork_stats(slot).unwrap();
let confirmed_forks = Self::confirm_forks(
@ -337,7 +409,9 @@ impl ReplayStage {
.confirmation_reported = true;
}
}
compute_slot_stats_time.stop();
let mut select_forks_time = Measure::start("select_forks_time");
let fork_choice: &mut dyn ForkChoice =
if forks_root > unlock_heaviest_subtree_fork_choice_slot {
&mut heaviest_subtree_fork_choice
@ -346,10 +420,12 @@ impl ReplayStage {
};
let (heaviest_bank, heaviest_bank_on_same_voted_fork) = fork_choice
.select_forks(&frozen_banks, &tower, &progress, &ancestors, &bank_forks);
select_forks_time.stop();
Self::report_memory(&allocated, "select_fork", start);
let now = Instant::now();
let mut select_vote_and_reset_forks_time =
Measure::start("select_vote_and_reset_forks");
let SelectVoteAndResetForkResult {
vote_bank,
reset_bank,
@ -362,11 +438,7 @@ impl ReplayStage {
&progress,
&tower,
);
let select_vote_and_reset_forks_elapsed = now.elapsed().as_micros();
replay_timing.update(
compute_bank_stats_elapsed as u64,
select_vote_and_reset_forks_elapsed as u64,
);
select_vote_and_reset_forks_time.stop();
if tower.is_recent(heaviest_bank.slot()) && !heaviest_fork_failures.is_empty() {
info!(
@ -388,6 +460,7 @@ impl ReplayStage {
let start = allocated.get();
let mut voting_time = Measure::start("voting_time");
// Vote on a fork
if let Some((ref vote_bank, ref switch_fork_decision)) = vote_bank {
if let Some(votable_leader) =
@ -421,10 +494,12 @@ impl ReplayStage {
&mut heaviest_subtree_fork_choice,
)?;
};
voting_time.stop();
Self::report_memory(&allocated, "votable_bank", start);
let start = allocated.get();
let mut reset_bank_time = Measure::start("reset_bank");
// Reset onto a fork
if let Some(reset_bank) = reset_bank {
if last_reset != reset_bank.last_blockhash() {
@ -486,14 +561,13 @@ impl ReplayStage {
inc_new_counter_info!("replay_stage-partition_resolved", 1);
}
}
datapoint_debug!(
"replay_stage-memory",
("reset_bank", (allocated.get() - start) as i64, i64),
);
Self::report_memory(&allocated, "reset_bank", start);
}
reset_bank_time.stop();
Self::report_memory(&allocated, "reset_bank", start);
let start = allocated.get();
let mut start_leader_time = Measure::start("start_leader_time");
if !tpu_has_bank {
Self::maybe_start_leader(
&my_pubkey,
@ -517,11 +591,22 @@ impl ReplayStage {
);
}
}
start_leader_time.stop();
Self::report_memory(&allocated, "start_leader", start);
datapoint_debug!(
"replay_stage",
("duration", duration_as_ms(&now.elapsed()) as i64, i64)
replay_timing.update(
compute_bank_stats_time.as_us(),
select_vote_and_reset_forks_time.as_us(),
start_leader_time.as_us(),
reset_bank_time.as_us(),
voting_time.as_us(),
select_forks_time.as_us(),
compute_slot_stats_time.as_us(),
generate_new_bank_forks_time.as_us(),
replay_active_banks_time.as_us(),
reset_duplicate_slots_time.as_us(),
);
if did_complete_bank {
//just processed a bank, skip the signal; maybe there's more slots available
continue;