From d5efbdb19b0fdf26721c458b413ad800f11dcbd0 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Mon, 27 Jun 2022 08:53:34 -0500 Subject: [PATCH] Add timing measurement for gossip vote txn processing (#26163) * add timing for gossip vote txn processing * fix build * fix too many arg error in clippy * atomic interval --- core/src/cluster_info_vote_listener.rs | 71 ++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index f069cda1a4..02d858a1a1 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -40,6 +40,7 @@ use { pubkey::Pubkey, signature::Signature, slot_hashes, + timing::AtomicInterval, transaction::Transaction, }, std::{ @@ -182,6 +183,47 @@ impl BankSendVotesStats { } } +#[derive(Default)] +struct VoteProcessingTiming { + gossip_txn_processing_time_us: u64, + gossip_slot_confirming_time_us: u64, + last_report: AtomicInterval, +} + +const VOTE_PROCESSING_REPORT_INTERVAL_MS: u64 = 1_000; + +impl VoteProcessingTiming { + fn reset(&mut self) { + self.gossip_slot_confirming_time_us = 0; + self.gossip_slot_confirming_time_us = 0; + } + + fn update(&mut self, vote_txn_processing_time_us: u64, vote_slot_confirming_time_us: u64) { + self.gossip_txn_processing_time_us += vote_txn_processing_time_us; + self.gossip_slot_confirming_time_us += vote_slot_confirming_time_us; + + if self + .last_report + .should_update(VOTE_PROCESSING_REPORT_INTERVAL_MS) + { + datapoint_info!( + "vote-processing-timing", + ( + "vote_txn_processing_us", + self.gossip_txn_processing_time_us as i64, + i64 + ), + ( + "slot_confirming_time_us", + self.gossip_slot_confirming_time_us as i64, + i64 + ), + ); + self.reset(); + } + } +} + pub struct ClusterInfoVoteListener { thread_hdls: Vec>, } @@ -450,6 +492,7 @@ impl ClusterInfoVoteListener { OptimisticConfirmationVerifier::new(bank_forks.read().unwrap().root()); let mut last_process_root = Instant::now(); let cluster_confirmed_slot_sender = Some(cluster_confirmed_slot_sender); + let mut vote_processing_time = Some(VoteProcessingTiming::default()); loop { if exit.load(Ordering::Relaxed) { return Ok(()); @@ -480,6 +523,7 @@ impl ClusterInfoVoteListener { &replay_votes_receiver, &bank_notification_sender, &cluster_confirmed_slot_sender, + &mut vote_processing_time, ); match confirmed_slots { Ok(confirmed_slots) => { @@ -519,9 +563,11 @@ impl ClusterInfoVoteListener { replay_votes_receiver, &None, &None, + &mut None, ) } + #[allow(clippy::too_many_arguments)] fn listen_and_confirm_votes( gossip_vote_txs_receiver: &VerifiedVoteTransactionsReceiver, vote_tracker: &VoteTracker, @@ -532,6 +578,7 @@ impl ClusterInfoVoteListener { replay_votes_receiver: &ReplayVoteReceiver, bank_notification_sender: &Option, cluster_confirmed_slot_sender: &Option, + vote_processing_time: &mut Option, ) -> Result { let mut sel = Select::new(); sel.recv(gossip_vote_txs_receiver); @@ -560,6 +607,7 @@ impl ClusterInfoVoteListener { verified_vote_sender, bank_notification_sender, cluster_confirmed_slot_sender, + vote_processing_time, )); } remaining_wait_time = remaining_wait_time.saturating_sub(start.elapsed()); @@ -682,6 +730,7 @@ impl ClusterInfoVoteListener { } } + #[allow(clippy::too_many_arguments)] fn filter_and_confirm_with_new_votes( vote_tracker: &VoteTracker, gossip_vote_txs: Vec, @@ -692,11 +741,13 @@ impl ClusterInfoVoteListener { verified_vote_sender: &VerifiedVoteSender, bank_notification_sender: &Option, cluster_confirmed_slot_sender: &Option, + vote_processing_time: &mut Option, ) -> ThresholdConfirmedSlots { let mut diff: HashMap> = HashMap::new(); let mut new_optimistic_confirmed_slots = vec![]; // Process votes from gossip and ReplayStage + let mut gossip_vote_txn_processing_time = Measure::start("gossip_vote_processing_time"); let votes = gossip_vote_txs .iter() .filter_map(vote_parser::parse_vote_transaction) @@ -719,8 +770,11 @@ impl ClusterInfoVoteListener { cluster_confirmed_slot_sender, ); } + gossip_vote_txn_processing_time.stop(); + let gossip_vote_txn_processing_time_us = gossip_vote_txn_processing_time.as_us(); // Process all the slots accumulated from replay and gossip. + let mut gossip_vote_slot_confirming_time = Measure::start("gossip_vote_slot_confirm_time"); for (slot, mut slot_diff) in diff { let slot_tracker = vote_tracker.get_or_insert_slot_tracker(slot); { @@ -768,6 +822,16 @@ impl ClusterInfoVoteListener { w_slot_tracker.gossip_only_stake += gossip_only_stake } + gossip_vote_slot_confirming_time.stop(); + let gossip_vote_slot_confirming_time_us = gossip_vote_slot_confirming_time.as_us(); + + match vote_processing_time { + Some(ref mut vote_processing_time) => vote_processing_time.update( + gossip_vote_txn_processing_time_us, + gossip_vote_slot_confirming_time_us, + ), + None => {} + } new_optimistic_confirmed_slots } @@ -949,6 +1013,7 @@ mod tests { &replay_votes_receiver, &None, &None, + &mut None, ) .unwrap(); @@ -980,6 +1045,7 @@ mod tests { &replay_votes_receiver, &None, &None, + &mut None, ) .unwrap(); @@ -1062,6 +1128,7 @@ mod tests { &replay_votes_receiver, &None, &None, + &mut None, ) .unwrap(); @@ -1219,6 +1286,7 @@ mod tests { &replay_votes_receiver, &None, &None, + &mut None, ) .unwrap(); @@ -1319,6 +1387,7 @@ mod tests { &replay_votes_receiver, &None, &None, + &mut None, ); } let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap(); @@ -1409,6 +1478,7 @@ mod tests { &verified_vote_sender, &None, &None, + &mut None, ); // Setup next epoch @@ -1455,6 +1525,7 @@ mod tests { &verified_vote_sender, &None, &None, + &mut None, ); }