From c96d9d127aa0bfd22f69e265eba856fa7a21174a Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Mon, 13 Jun 2022 17:03:34 -0500 Subject: [PATCH] Include forwarding counters in leader slot metrics (#25874) * To include forwarding counters in leader slot metrics * Capture slot_end_detected time when checking leader slots, to be used in reporting later * Simplify banking stage loop to report leader slot metrics Co-authored-by: carllin --- core/src/banking_stage.rs | 52 ++-- core/src/leader_slot_banking_stage_metrics.rs | 287 ++++++++++++------ ...eader_slot_banking_stage_timing_metrics.rs | 28 +- 3 files changed, 230 insertions(+), 137 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index d66fd5eab..24f6ba7a7 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -917,7 +917,7 @@ impl BankingStage { connection_cache: &ConnectionCache, tracer_packet_stats: &mut TracerPacketStats, ) { - let (decision, make_decision_time) = measure!( + let ((metrics_action, decision), make_decision_time) = measure!( { let bank_start; let ( @@ -938,13 +938,16 @@ impl BankingStage { ), ) }; - slot_metrics_tracker.update_on_leader_slot_boundary(&bank_start); - Self::consume_or_forward_packets( - my_pubkey, - leader_at_slot_offset, - bank_still_processing_txs, - would_be_leader, - would_be_leader_shortly, + + ( + slot_metrics_tracker.check_leader_slot_boundary(&bank_start), + Self::consume_or_forward_packets( + my_pubkey, + leader_at_slot_offset, + bank_still_processing_txs, + would_be_leader, + would_be_leader_shortly, + ), ) }, "make_decision", @@ -953,6 +956,11 @@ impl BankingStage { match decision { BufferedPacketsDecision::Consume(max_tx_ingestion_ns) => { + // Take metrics action before consume packets (potentially resetting the + // slot metrics tracker to the next slot) so that we don't count the + // packet processing metrics from the next slot towards the metrics + // of the previous 
slot + slot_metrics_tracker.apply_action(metrics_action); let (_, consume_buffered_packets_time) = measure!( Self::consume_buffered_packets( my_pubkey, @@ -990,6 +998,9 @@ impl BankingStage { "forward", ); slot_metrics_tracker.increment_forward_us(forward_time.as_us()); + // Take metrics action after forwarding packets to include forwarded + // metrics into current slot + slot_metrics_tracker.apply_action(metrics_action); } BufferedPacketsDecision::ForwardAndHold => { let (_, forward_and_hold_time) = measure!( @@ -1008,6 +1019,8 @@ impl BankingStage { "forward_and_hold", ); slot_metrics_tracker.increment_forward_and_hold_us(forward_and_hold_time.as_us()); + // Take metrics action after forwarding packets + slot_metrics_tracker.apply_action(metrics_action); } _ => (), } @@ -1102,7 +1115,9 @@ impl BankingStage { loop { let my_pubkey = cluster_info.id(); - if !buffered_packet_batches.is_empty() { + if !buffered_packet_batches.is_empty() + || last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD + { let (_, process_buffered_packets_time) = measure!( Self::process_buffered_packets( &my_pubkey, @@ -1124,28 +1139,11 @@ impl BankingStage { ); slot_metrics_tracker .increment_process_buffered_packets_us(process_buffered_packets_time.as_us()); + last_metrics_update = Instant::now(); } tracer_packet_stats.report(1000); - if last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD { - let (_, slot_metrics_checker_check_slot_boundary_time) = measure!( - { - let current_poh_bank = { - let poh = poh_recorder.lock().unwrap(); - poh.bank_start() - }; - slot_metrics_tracker.update_on_leader_slot_boundary(&current_poh_bank); - }, - "slot_metrics_checker_check_slot_boundary", - ); - slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us( - slot_metrics_checker_check_slot_boundary_time.as_us(), - ); - - last_metrics_update = Instant::now(); - } - let recv_timeout = if !buffered_packet_batches.is_empty() { // If there are buffered packets, run the equivalent of try_recv to 
try reading more // packets. This prevents starving BankingStage::consume_buffered_packets due to diff --git a/core/src/leader_slot_banking_stage_metrics.rs b/core/src/leader_slot_banking_stage_metrics.rs index ed11cfb11..2307b2f0e 100644 --- a/core/src/leader_slot_banking_stage_metrics.rs +++ b/core/src/leader_slot_banking_stage_metrics.rs @@ -283,6 +283,18 @@ impl LeaderSlotMetrics { None } } + + fn mark_slot_end_detected(&mut self) { + self.timing_metrics.mark_slot_end_detected(); + } +} + +#[derive(Debug)] +pub(crate) enum MetricsTrackerAction { + Noop, + ReportAndResetTracker, + NewTracker(Option<LeaderSlotMetrics>), + ReportAndNewTracker(Option<LeaderSlotMetrics>), } #[derive(Debug)] @@ -301,52 +313,72 @@ impl LeaderSlotMetricsTracker { } } - // Returns reported slot if metrics were reported - pub(crate) fn update_on_leader_slot_boundary( + // Check leader slot, return MetricsTrackerAction to be applied by apply_action() + pub(crate) fn check_leader_slot_boundary( &mut self, bank_start: &Option<BankStart>, - ) -> Option<Slot> { + ) -> MetricsTrackerAction { match (self.leader_slot_metrics.as_mut(), bank_start) { - (None, None) => None, + (None, None) => MetricsTrackerAction::Noop, (Some(leader_slot_metrics), None) => { - leader_slot_metrics.report(); - // Ensure tests catch that `report()` method was called - let reported_slot = leader_slot_metrics.reported_slot(); - // Slot has ended, time to report metrics - self.leader_slot_metrics = None; - reported_slot + leader_slot_metrics.mark_slot_end_detected(); + MetricsTrackerAction::ReportAndResetTracker } + // Our leader slot has begun, time to create a new slot tracker (None, Some(bank_start)) => { - // Our leader slot has begain, time to create a new slot tracker - self.leader_slot_metrics = Some(LeaderSlotMetrics::new( + MetricsTrackerAction::NewTracker(Some(LeaderSlotMetrics::new( self.id, bank_start.working_bank.slot(), &bank_start.bank_creation_time, - )); - self.leader_slot_metrics.as_ref().unwrap().reported_slot() + ))) } (Some(leader_slot_metrics), 
Some(bank_start)) => { if leader_slot_metrics.slot != bank_start.working_bank.slot() { // Last slot has ended, new slot has began - leader_slot_metrics.report(); - // Ensure tests catch that `report()` method was called - let reported_slot = leader_slot_metrics.reported_slot(); - self.leader_slot_metrics = Some(LeaderSlotMetrics::new( + leader_slot_metrics.mark_slot_end_detected(); + MetricsTrackerAction::ReportAndNewTracker(Some(LeaderSlotMetrics::new( self.id, bank_start.working_bank.slot(), &bank_start.bank_creation_time, - )); - reported_slot + ))) } else { - leader_slot_metrics.reported_slot() + MetricsTrackerAction::Noop } } } } + pub(crate) fn apply_action(&mut self, action: MetricsTrackerAction) -> Option<Slot> { + match action { + MetricsTrackerAction::Noop => None, + MetricsTrackerAction::ReportAndResetTracker => { + let mut reported_slot = None; + if let Some(leader_slot_metrics) = self.leader_slot_metrics.as_mut() { + leader_slot_metrics.report(); + reported_slot = leader_slot_metrics.reported_slot(); + } + self.leader_slot_metrics = None; + reported_slot + } + MetricsTrackerAction::NewTracker(new_slot_metrics) => { + self.leader_slot_metrics = new_slot_metrics; + self.leader_slot_metrics.as_ref().unwrap().reported_slot() + } + MetricsTrackerAction::ReportAndNewTracker(new_slot_metrics) => { + let mut reported_slot = None; + if let Some(leader_slot_metrics) = self.leader_slot_metrics.as_mut() { + leader_slot_metrics.report(); + reported_slot = leader_slot_metrics.reported_slot(); + } + self.leader_slot_metrics = new_slot_metrics; + reported_slot + } + } + } + pub(crate) fn accumulate_process_transactions_summary( &mut self, process_transactions_summary: &ProcessTransactionsSummary, @@ -584,18 +616,6 @@ impl LeaderSlotMetricsTracker { } } - pub(crate) fn increment_slot_metrics_check_slot_boundary_us(&mut self, us: u64) { - if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { - saturating_add_assign!( - leader_slot_metrics - .timing_metrics - 
.outer_loop_timings - .slot_metrics_check_slot_boundary_us, - us - ); - } - } - pub(crate) fn increment_receive_and_buffer_packets_us(&mut self, us: u64) { if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { saturating_add_assign!( @@ -745,7 +765,7 @@ mod tests { super::*, solana_runtime::{bank::Bank, genesis_utils::create_genesis_config}, solana_sdk::pubkey::Pubkey, - std::sync::Arc, + std::{mem, sync::Arc}, }; struct TestSlotBoundaryComponents { @@ -794,9 +814,12 @@ mod tests { .. } = setup_test_slot_boundary_banks(); // Test that with no bank being tracked, and no new bank being tracked, nothing is reported - assert!(leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&None) - .is_none()); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(&None); + assert_eq!( + mem::discriminant(&MetricsTrackerAction::Noop), + mem::discriminant(&action) + ); + assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); } @@ -811,9 +834,13 @@ mod tests { // Test case where the thread has not detected a leader bank, and now sees a leader bank. // Metrics should not be reported because leader slot has not ended assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); - assert!(leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank)) - .is_none()); + let action = + leader_slot_metrics_tracker.check_leader_slot_boundary(&Some(first_poh_recorder_bank)); + assert_eq!( + mem::discriminant(&MetricsTrackerAction::NewTracker(None)), + mem::discriminant(&action) + ); + assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_some()); } @@ -829,19 +856,33 @@ mod tests { // Test case where the thread has a leader bank, and now detects there's no more leader bank, // implying the slot has ended. 
Metrics should be reported for `first_bank.slot()`, // because that leader slot has just ended. - assert!(leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank)) - .is_none()); - assert_eq!( - leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&None) - .unwrap(), - first_bank.slot() - ); - assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); - assert!(leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&None) - .is_none()); + { + // Setup first_bank + let action = leader_slot_metrics_tracker + .check_leader_slot_boundary(&Some(first_poh_recorder_bank)); + assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); + } + { + // Assert reporting if slot has ended + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(&None); + assert_eq!( + mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker), + mem::discriminant(&action) + ); + assert_eq!( + leader_slot_metrics_tracker.apply_action(action).unwrap(), + first_bank.slot() + ); + assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); + } + { + // Assert no-op if still no new bank + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(&None); + assert_eq!( + mem::discriminant(&MetricsTrackerAction::Noop), + mem::discriminant(&action) + ); + } } #[test] @@ -855,19 +896,35 @@ mod tests { // Test case where the thread has a leader bank, and now detects the same leader bank, // implying the slot is still running. 
Metrics should not be reported - assert!(leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank.clone())) - .is_none()); - assert!(leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank)) - .is_none()); - assert_eq!( - leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&None) - .unwrap(), - first_bank.slot() - ); - assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); + { + // Setup with first_bank + let action = leader_slot_metrics_tracker + .check_leader_slot_boundary(&Some(first_poh_recorder_bank.clone())); + assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); + } + { + // Assert no-op if same bank + let action = leader_slot_metrics_tracker + .check_leader_slot_boundary(&Some(first_poh_recorder_bank)); + assert_eq!( + mem::discriminant(&MetricsTrackerAction::Noop), + mem::discriminant(&action) + ); + assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); + } + { + // Assert reporting if slot has ended + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(&None); + assert_eq!( + mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker), + mem::discriminant(&action) + ); + assert_eq!( + leader_slot_metrics_tracker.apply_action(action).unwrap(), + first_bank.slot() + ); + assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); + } } #[test] @@ -883,22 +940,39 @@ mod tests { // Test case where the thread has a leader bank, and now detects there's a new leader bank // for a bigger slot, implying the slot has ended. 
Metrics should be reported for the // smaller slot - assert!(leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank)) - .is_none()); - assert_eq!( - leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&Some(next_poh_recorder_bank)) - .unwrap(), - first_bank.slot() - ); - assert_eq!( - leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&None) - .unwrap(), - next_bank.slot() - ); - assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); + { + // Setup with first_bank + let action = leader_slot_metrics_tracker + .check_leader_slot_boundary(&Some(first_poh_recorder_bank)); + assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); + } + { + // Assert reporting if new bank + let action = leader_slot_metrics_tracker + .check_leader_slot_boundary(&Some(next_poh_recorder_bank)); + assert_eq!( + mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)), + mem::discriminant(&action) + ); + assert_eq!( + leader_slot_metrics_tracker.apply_action(action).unwrap(), + first_bank.slot() + ); + assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_some()); + } + { + // Assert reporting if slot has ended + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(&None); + assert_eq!( + mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker), + mem::discriminant(&action) + ); + assert_eq!( + leader_slot_metrics_tracker.apply_action(action).unwrap(), + next_bank.slot() + ); + assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); + } } #[test] @@ -913,21 +987,38 @@ mod tests { // Test case where the thread has a leader bank, and now detects there's a new leader bank // for a samller slot, implying the slot has ended. 
Metrics should be reported for the // bigger slot - assert!(leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&Some(next_poh_recorder_bank)) - .is_none()); - assert_eq!( - leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank)) - .unwrap(), - next_bank.slot() - ); - assert_eq!( - leader_slot_metrics_tracker - .update_on_leader_slot_boundary(&None) - .unwrap(), - first_bank.slot() - ); - assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); + { + // Setup with next_bank + let action = leader_slot_metrics_tracker + .check_leader_slot_boundary(&Some(next_poh_recorder_bank)); + assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); + } + { + // Assert reporting if new bank + let action = leader_slot_metrics_tracker + .check_leader_slot_boundary(&Some(first_poh_recorder_bank)); + assert_eq!( + mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)), + mem::discriminant(&action) + ); + assert_eq!( + leader_slot_metrics_tracker.apply_action(action).unwrap(), + next_bank.slot() + ); + assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_some()); + } + { + // Assert reporting if slot has ended + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(&None); + assert_eq!( + mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker), + mem::discriminant(&action) + ); + assert_eq!( + leader_slot_metrics_tracker.apply_action(action).unwrap(), + first_bank.slot() + ); + assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); + } } } diff --git a/core/src/leader_slot_banking_stage_timing_metrics.rs b/core/src/leader_slot_banking_stage_timing_metrics.rs index 5c337e1f4..a56f8a754 100644 --- a/core/src/leader_slot_banking_stage_timing_metrics.rs +++ b/core/src/leader_slot_banking_stage_timing_metrics.rs @@ -117,6 +117,10 @@ impl LeaderSlotTimingMetrics { self.process_packets_timings.report(id, slot); self.execute_and_commit_timings.report(id, slot); } + + 
pub(crate) fn mark_slot_end_detected(&mut self) { + self.outer_loop_timings.mark_slot_end_detected(); + } } #[derive(Debug)] @@ -129,15 +133,15 @@ pub(crate) struct OuterLoopTimings { // Time spent processing buffered packets pub process_buffered_packets_us: u64, - // Time spent checking for slot boundary and reporting leader slot metrics - pub slot_metrics_check_slot_boundary_us: u64, - // Time spent processing new incoming packets to the banking thread pub receive_and_buffer_packets_us: u64, // The number of times the function to receive and buffer new packets // was called pub receive_and_buffer_packets_invoked_count: u64, + + // Elapsed time between when the bank was detected and when the slot end was detected + pub bank_detected_to_slot_end_detected_us: u64, } impl OuterLoopTimings { @@ -146,26 +150,31 @@ impl OuterLoopTimings { bank_detected_time: Instant::now(), bank_detected_delay_us: bank_creation_time.elapsed().as_micros() as u64, process_buffered_packets_us: 0, - slot_metrics_check_slot_boundary_us: 0, receive_and_buffer_packets_us: 0, receive_and_buffer_packets_invoked_count: 0, + bank_detected_to_slot_end_detected_us: 0, } } + /// Call when the slot end is detected to capture the elapsed time, which may be reported later + fn mark_slot_end_detected(&mut self) { + self.bank_detected_to_slot_end_detected_us = + self.bank_detected_time.elapsed().as_micros() as u64; + } + fn report(&self, id: u32, slot: Slot) { - let bank_detected_to_now_us = self.bank_detected_time.elapsed().as_micros() as u64; datapoint_info!( "banking_stage-leader_slot_loop_timings", ("id", id as i64, i64), ("slot", slot as i64, i64), ( "bank_detected_to_slot_end_detected_us", - bank_detected_to_now_us, + self.bank_detected_to_slot_end_detected_us, i64 ), ( "bank_creation_to_slot_end_detected_us", - bank_detected_to_now_us + self.bank_detected_delay_us, + self.bank_detected_to_slot_end_detected_us + self.bank_detected_delay_us, i64 ), ("bank_detected_delay_us", self.bank_detected_delay_us, i64), @@ -174,11 +183,6 
@@ impl OuterLoopTimings { self.process_buffered_packets_us, i64 ), - ( - "slot_metrics_check_slot_boundary_us", - self.slot_metrics_check_slot_boundary_us, - i64 - ), ( "receive_and_buffer_packets_us", self.receive_and_buffer_packets_us,