metrics for generate new bank forks (#22492)

* metrics for generate new bank forks

* fixed

* Apply suggestions from code review

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* --fixup

* fixup!

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>
This commit is contained in:
anatoly yakovenko 2022-01-17 08:59:47 -08:00 committed by GitHub
parent cc76a73c49
commit 2d94e6e5d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 59 additions and 1 deletion

View File

@@ -59,6 +59,7 @@ use {
genesis_config::ClusterType, genesis_config::ClusterType,
hash::Hash, hash::Hash,
pubkey::Pubkey, pubkey::Pubkey,
saturating_add_assign,
signature::{Keypair, Signature, Signer}, signature::{Keypair, Signature, Signer},
timing::timestamp, timing::timestamp,
transaction::Transaction, transaction::Transaction,
@@ -165,6 +166,10 @@ pub struct ReplayTiming {
process_unfrozen_gossip_verified_vote_hashes_elapsed: u64, process_unfrozen_gossip_verified_vote_hashes_elapsed: u64,
repair_correct_slots_elapsed: u64, repair_correct_slots_elapsed: u64,
retransmit_not_propagated_elapsed: u64, retransmit_not_propagated_elapsed: u64,
generate_new_bank_forks_read_lock_us: u64,
generate_new_bank_forks_get_slots_since_us: u64,
generate_new_bank_forks_loop_us: u64,
generate_new_bank_forks_write_lock_us: u64,
} }
impl ReplayTiming { impl ReplayTiming {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
@@ -304,8 +309,27 @@ impl ReplayTiming {
self.retransmit_not_propagated_elapsed as i64, self.retransmit_not_propagated_elapsed as i64,
i64 i64
), ),
(
"generate_new_bank_forks_read_lock_us",
self.generate_new_bank_forks_read_lock_us as i64,
i64
),
(
"generate_new_bank_forks_get_slots_since_us",
self.generate_new_bank_forks_get_slots_since_us as i64,
i64
),
(
"generate_new_bank_forks_loop_us",
self.generate_new_bank_forks_loop_us as i64,
i64
),
(
"generate_new_bank_forks_write_lock_us",
self.generate_new_bank_forks_write_lock_us as i64,
i64
),
); );
*self = ReplayTiming::default(); *self = ReplayTiming::default();
self.last_print = now; self.last_print = now;
} }
@@ -415,6 +439,7 @@ impl ReplayStage {
&leader_schedule_cache, &leader_schedule_cache,
&rpc_subscriptions, &rpc_subscriptions,
&mut progress, &mut progress,
&mut replay_timing,
); );
generate_new_bank_forks_time.stop(); generate_new_bank_forks_time.stop();
@@ -2859,24 +2884,34 @@ impl ReplayStage {
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
rpc_subscriptions: &Arc<RpcSubscriptions>, rpc_subscriptions: &Arc<RpcSubscriptions>,
progress: &mut ProgressMap, progress: &mut ProgressMap,
replay_timing: &mut ReplayTiming,
) { ) {
// Find the next slot that chains to the old slot // Find the next slot that chains to the old slot
let mut generate_new_bank_forks_read_lock =
Measure::start("generate_new_bank_forks_read_lock");
let forks = bank_forks.read().unwrap(); let forks = bank_forks.read().unwrap();
generate_new_bank_forks_read_lock.stop();
let frozen_banks = forks.frozen_banks(); let frozen_banks = forks.frozen_banks();
let frozen_bank_slots: Vec<u64> = frozen_banks let frozen_bank_slots: Vec<u64> = frozen_banks
.keys() .keys()
.cloned() .cloned()
.filter(|s| *s >= forks.root()) .filter(|s| *s >= forks.root())
.collect(); .collect();
let mut generate_new_bank_forks_get_slots_since =
Measure::start("generate_new_bank_forks_get_slots_since");
let next_slots = blockstore let next_slots = blockstore
.get_slots_since(&frozen_bank_slots) .get_slots_since(&frozen_bank_slots)
.expect("Db error"); .expect("Db error");
generate_new_bank_forks_get_slots_since.stop();
// Filter out what we've already seen // Filter out what we've already seen
trace!("generate new forks {:?}", { trace!("generate new forks {:?}", {
let mut next_slots = next_slots.iter().collect::<Vec<_>>(); let mut next_slots = next_slots.iter().collect::<Vec<_>>();
next_slots.sort(); next_slots.sort();
next_slots next_slots
}); });
let mut generate_new_bank_forks_loop = Measure::start("generate_new_bank_forks_loop");
let mut new_banks = HashMap::new(); let mut new_banks = HashMap::new();
for (parent_slot, children) in next_slots { for (parent_slot, children) in next_slots {
let parent_bank = frozen_banks let parent_bank = frozen_banks
@@ -2917,11 +2952,31 @@ impl ReplayStage {
} }
} }
drop(forks); drop(forks);
generate_new_bank_forks_loop.stop();
let mut generate_new_bank_forks_write_lock =
Measure::start("generate_new_bank_forks_write_lock");
let mut forks = bank_forks.write().unwrap(); let mut forks = bank_forks.write().unwrap();
for (_, bank) in new_banks { for (_, bank) in new_banks {
forks.insert(bank); forks.insert(bank);
} }
generate_new_bank_forks_write_lock.stop();
saturating_add_assign!(
replay_timing.generate_new_bank_forks_read_lock_us,
generate_new_bank_forks_read_lock.as_us()
);
saturating_add_assign!(
replay_timing.generate_new_bank_forks_get_slots_since_us,
generate_new_bank_forks_get_slots_since.as_us()
);
saturating_add_assign!(
replay_timing.generate_new_bank_forks_loop_us,
generate_new_bank_forks_loop.as_us()
);
saturating_add_assign!(
replay_timing.generate_new_bank_forks_write_lock_us,
generate_new_bank_forks_write_lock.as_us()
);
} }
fn new_bank_from_parent_with_notify( fn new_bank_from_parent_with_notify(
@@ -3196,12 +3251,14 @@ pub mod tests {
.unwrap() .unwrap()
.get(NUM_CONSECUTIVE_LEADER_SLOTS) .get(NUM_CONSECUTIVE_LEADER_SLOTS)
.is_none()); .is_none());
let mut replay_timing = ReplayTiming::default();
ReplayStage::generate_new_bank_forks( ReplayStage::generate_new_bank_forks(
&blockstore, &blockstore,
&bank_forks, &bank_forks,
&leader_schedule_cache, &leader_schedule_cache,
&rpc_subscriptions, &rpc_subscriptions,
&mut progress, &mut progress,
&mut replay_timing,
); );
assert!(bank_forks assert!(bank_forks
.read() .read()
@@ -3224,6 +3281,7 @@ pub mod tests {
&leader_schedule_cache, &leader_schedule_cache,
&rpc_subscriptions, &rpc_subscriptions,
&mut progress, &mut progress,
&mut replay_timing,
); );
assert!(bank_forks assert!(bank_forks
.read() .read()