Add timing measurement for gossip vote txn processing (#26163)

* add timing for gossip vote txn processing

* fix build

* fix too many arg error in clippy

* atomic interval
This commit is contained in:
HaoranYi 2022-06-27 08:53:34 -05:00 committed by GitHub
parent 090e11210a
commit d5efbdb19b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 71 additions and 0 deletions

View File

@ -40,6 +40,7 @@ use {
pubkey::Pubkey,
signature::Signature,
slot_hashes,
timing::AtomicInterval,
transaction::Transaction,
},
std::{
@ -182,6 +183,47 @@ impl BankSendVotesStats {
}
}
#[derive(Default)]
struct VoteProcessingTiming {
gossip_txn_processing_time_us: u64,
gossip_slot_confirming_time_us: u64,
last_report: AtomicInterval,
}
const VOTE_PROCESSING_REPORT_INTERVAL_MS: u64 = 1_000;
impl VoteProcessingTiming {
fn reset(&mut self) {
self.gossip_slot_confirming_time_us = 0;
self.gossip_slot_confirming_time_us = 0;
}
fn update(&mut self, vote_txn_processing_time_us: u64, vote_slot_confirming_time_us: u64) {
self.gossip_txn_processing_time_us += vote_txn_processing_time_us;
self.gossip_slot_confirming_time_us += vote_slot_confirming_time_us;
if self
.last_report
.should_update(VOTE_PROCESSING_REPORT_INTERVAL_MS)
{
datapoint_info!(
"vote-processing-timing",
(
"vote_txn_processing_us",
self.gossip_txn_processing_time_us as i64,
i64
),
(
"slot_confirming_time_us",
self.gossip_slot_confirming_time_us as i64,
i64
),
);
self.reset();
}
}
}
pub struct ClusterInfoVoteListener {
thread_hdls: Vec<JoinHandle<()>>,
}
@ -450,6 +492,7 @@ impl ClusterInfoVoteListener {
OptimisticConfirmationVerifier::new(bank_forks.read().unwrap().root());
let mut last_process_root = Instant::now();
let cluster_confirmed_slot_sender = Some(cluster_confirmed_slot_sender);
let mut vote_processing_time = Some(VoteProcessingTiming::default());
loop {
if exit.load(Ordering::Relaxed) {
return Ok(());
@ -480,6 +523,7 @@ impl ClusterInfoVoteListener {
&replay_votes_receiver,
&bank_notification_sender,
&cluster_confirmed_slot_sender,
&mut vote_processing_time,
);
match confirmed_slots {
Ok(confirmed_slots) => {
@ -519,9 +563,11 @@ impl ClusterInfoVoteListener {
replay_votes_receiver,
&None,
&None,
&mut None,
)
}
#[allow(clippy::too_many_arguments)]
fn listen_and_confirm_votes(
gossip_vote_txs_receiver: &VerifiedVoteTransactionsReceiver,
vote_tracker: &VoteTracker,
@ -532,6 +578,7 @@ impl ClusterInfoVoteListener {
replay_votes_receiver: &ReplayVoteReceiver,
bank_notification_sender: &Option<BankNotificationSender>,
cluster_confirmed_slot_sender: &Option<GossipDuplicateConfirmedSlotsSender>,
vote_processing_time: &mut Option<VoteProcessingTiming>,
) -> Result<ThresholdConfirmedSlots> {
let mut sel = Select::new();
sel.recv(gossip_vote_txs_receiver);
@ -560,6 +607,7 @@ impl ClusterInfoVoteListener {
verified_vote_sender,
bank_notification_sender,
cluster_confirmed_slot_sender,
vote_processing_time,
));
}
remaining_wait_time = remaining_wait_time.saturating_sub(start.elapsed());
@ -682,6 +730,7 @@ impl ClusterInfoVoteListener {
}
}
#[allow(clippy::too_many_arguments)]
fn filter_and_confirm_with_new_votes(
vote_tracker: &VoteTracker,
gossip_vote_txs: Vec<Transaction>,
@ -692,11 +741,13 @@ impl ClusterInfoVoteListener {
verified_vote_sender: &VerifiedVoteSender,
bank_notification_sender: &Option<BankNotificationSender>,
cluster_confirmed_slot_sender: &Option<GossipDuplicateConfirmedSlotsSender>,
vote_processing_time: &mut Option<VoteProcessingTiming>,
) -> ThresholdConfirmedSlots {
let mut diff: HashMap<Slot, HashMap<Pubkey, bool>> = HashMap::new();
let mut new_optimistic_confirmed_slots = vec![];
// Process votes from gossip and ReplayStage
let mut gossip_vote_txn_processing_time = Measure::start("gossip_vote_processing_time");
let votes = gossip_vote_txs
.iter()
.filter_map(vote_parser::parse_vote_transaction)
@ -719,8 +770,11 @@ impl ClusterInfoVoteListener {
cluster_confirmed_slot_sender,
);
}
gossip_vote_txn_processing_time.stop();
let gossip_vote_txn_processing_time_us = gossip_vote_txn_processing_time.as_us();
// Process all the slots accumulated from replay and gossip.
let mut gossip_vote_slot_confirming_time = Measure::start("gossip_vote_slot_confirm_time");
for (slot, mut slot_diff) in diff {
let slot_tracker = vote_tracker.get_or_insert_slot_tracker(slot);
{
@ -768,6 +822,16 @@ impl ClusterInfoVoteListener {
w_slot_tracker.gossip_only_stake += gossip_only_stake
}
gossip_vote_slot_confirming_time.stop();
let gossip_vote_slot_confirming_time_us = gossip_vote_slot_confirming_time.as_us();
match vote_processing_time {
Some(ref mut vote_processing_time) => vote_processing_time.update(
gossip_vote_txn_processing_time_us,
gossip_vote_slot_confirming_time_us,
),
None => {}
}
new_optimistic_confirmed_slots
}
@ -949,6 +1013,7 @@ mod tests {
&replay_votes_receiver,
&None,
&None,
&mut None,
)
.unwrap();
@ -980,6 +1045,7 @@ mod tests {
&replay_votes_receiver,
&None,
&None,
&mut None,
)
.unwrap();
@ -1062,6 +1128,7 @@ mod tests {
&replay_votes_receiver,
&None,
&None,
&mut None,
)
.unwrap();
@ -1219,6 +1286,7 @@ mod tests {
&replay_votes_receiver,
&None,
&None,
&mut None,
)
.unwrap();
@ -1319,6 +1387,7 @@ mod tests {
&replay_votes_receiver,
&None,
&None,
&mut None,
);
}
let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap();
@ -1409,6 +1478,7 @@ mod tests {
&verified_vote_sender,
&None,
&None,
&mut None,
);
// Setup next epoch
@ -1455,6 +1525,7 @@ mod tests {
&verified_vote_sender,
&None,
&None,
&mut None,
);
}