Include forwarding counters in leader slot metrics (#25874)

* To include forwarding counters in leader slot metrics

* Capture slot_end_detected time when checking leader slots, to be used in reporting later

* Simplify banking stage loop to report leader slot metrics

Co-authored-by: carllin <carl@solana.com>
This commit is contained in:
Tao Zhu 2022-06-13 17:03:34 -05:00 committed by GitHub
parent c419845cfe
commit c96d9d127a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 230 additions and 137 deletions

View File

@ -917,7 +917,7 @@ impl BankingStage {
connection_cache: &ConnectionCache,
tracer_packet_stats: &mut TracerPacketStats,
) {
let (decision, make_decision_time) = measure!(
let ((metrics_action, decision), make_decision_time) = measure!(
{
let bank_start;
let (
@ -938,13 +938,16 @@ impl BankingStage {
),
)
};
slot_metrics_tracker.update_on_leader_slot_boundary(&bank_start);
Self::consume_or_forward_packets(
my_pubkey,
leader_at_slot_offset,
bank_still_processing_txs,
would_be_leader,
would_be_leader_shortly,
(
slot_metrics_tracker.check_leader_slot_boundary(&bank_start),
Self::consume_or_forward_packets(
my_pubkey,
leader_at_slot_offset,
bank_still_processing_txs,
would_be_leader,
would_be_leader_shortly,
),
)
},
"make_decision",
@ -953,6 +956,11 @@ impl BankingStage {
match decision {
BufferedPacketsDecision::Consume(max_tx_ingestion_ns) => {
// Take metrics action before consume packets (potentially resetting the
// slot metrics tracker to the next slot) so that we don't count the
// packet processing metrics from the next slot towards the metrics
// of the previous slot
slot_metrics_tracker.apply_action(metrics_action);
let (_, consume_buffered_packets_time) = measure!(
Self::consume_buffered_packets(
my_pubkey,
@ -990,6 +998,9 @@ impl BankingStage {
"forward",
);
slot_metrics_tracker.increment_forward_us(forward_time.as_us());
// Take metrics action after forwarding packets to include forwarded
// metrics into current slot
slot_metrics_tracker.apply_action(metrics_action);
}
BufferedPacketsDecision::ForwardAndHold => {
let (_, forward_and_hold_time) = measure!(
@ -1008,6 +1019,8 @@ impl BankingStage {
"forward_and_hold",
);
slot_metrics_tracker.increment_forward_and_hold_us(forward_and_hold_time.as_us());
// Take metrics action after forwarding packets
slot_metrics_tracker.apply_action(metrics_action);
}
_ => (),
}
@ -1102,7 +1115,9 @@ impl BankingStage {
loop {
let my_pubkey = cluster_info.id();
if !buffered_packet_batches.is_empty() {
if !buffered_packet_batches.is_empty()
|| last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD
{
let (_, process_buffered_packets_time) = measure!(
Self::process_buffered_packets(
&my_pubkey,
@ -1124,28 +1139,11 @@ impl BankingStage {
);
slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_time.as_us());
last_metrics_update = Instant::now();
}
tracer_packet_stats.report(1000);
if last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD {
let (_, slot_metrics_checker_check_slot_boundary_time) = measure!(
{
let current_poh_bank = {
let poh = poh_recorder.lock().unwrap();
poh.bank_start()
};
slot_metrics_tracker.update_on_leader_slot_boundary(&current_poh_bank);
},
"slot_metrics_checker_check_slot_boundary",
);
slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us(
slot_metrics_checker_check_slot_boundary_time.as_us(),
);
last_metrics_update = Instant::now();
}
let recv_timeout = if !buffered_packet_batches.is_empty() {
// If there are buffered packets, run the equivalent of try_recv to try reading more
// packets. This prevents starving BankingStage::consume_buffered_packets due to

View File

@ -283,6 +283,18 @@ impl LeaderSlotMetrics {
None
}
}
/// Records that the end of this leader slot has been detected by forwarding
/// the notification to the slot's timing metrics.
fn mark_slot_end_detected(&mut self) {
    let timing_metrics = &mut self.timing_metrics;
    timing_metrics.mark_slot_end_detected();
}
}
/// Deferred update to a `LeaderSlotMetricsTracker`.
///
/// Returned by `check_leader_slot_boundary()` and later handed to
/// `apply_action()`, so the caller decides when the tracker is actually
/// reported and/or replaced.
#[derive(Debug)]
pub(crate) enum MetricsTrackerAction {
// Leave the tracker untouched; `apply_action()` returns `None`.
Noop,
// Report the currently tracked slot metrics (if any), then clear the tracker.
ReportAndResetTracker,
// Install the carried metrics as the tracker without reporting anything first.
NewTracker(Option<LeaderSlotMetrics>),
// Report the currently tracked slot metrics (if any), then install the
// carried metrics as the tracker.
ReportAndNewTracker(Option<LeaderSlotMetrics>),
}
#[derive(Debug)]
@ -301,52 +313,72 @@ impl LeaderSlotMetricsTracker {
}
}
// Returns reported slot if metrics were reported
pub(crate) fn update_on_leader_slot_boundary(
// Check leader slot, return MetricsTrackerAction to be applied by apply_action()
pub(crate) fn check_leader_slot_boundary(
&mut self,
bank_start: &Option<BankStart>,
) -> Option<Slot> {
) -> MetricsTrackerAction {
match (self.leader_slot_metrics.as_mut(), bank_start) {
(None, None) => None,
(None, None) => MetricsTrackerAction::Noop,
(Some(leader_slot_metrics), None) => {
leader_slot_metrics.report();
// Ensure tests catch that `report()` method was called
let reported_slot = leader_slot_metrics.reported_slot();
// Slot has ended, time to report metrics
self.leader_slot_metrics = None;
reported_slot
leader_slot_metrics.mark_slot_end_detected();
MetricsTrackerAction::ReportAndResetTracker
}
// Our leader slot has begun, time to create a new slot tracker
(None, Some(bank_start)) => {
// Our leader slot has begun, time to create a new slot tracker
self.leader_slot_metrics = Some(LeaderSlotMetrics::new(
MetricsTrackerAction::NewTracker(Some(LeaderSlotMetrics::new(
self.id,
bank_start.working_bank.slot(),
&bank_start.bank_creation_time,
));
self.leader_slot_metrics.as_ref().unwrap().reported_slot()
)))
}
(Some(leader_slot_metrics), Some(bank_start)) => {
if leader_slot_metrics.slot != bank_start.working_bank.slot() {
// Last slot has ended, new slot has begun
leader_slot_metrics.report();
// Ensure tests catch that `report()` method was called
let reported_slot = leader_slot_metrics.reported_slot();
self.leader_slot_metrics = Some(LeaderSlotMetrics::new(
leader_slot_metrics.mark_slot_end_detected();
MetricsTrackerAction::ReportAndNewTracker(Some(LeaderSlotMetrics::new(
self.id,
bank_start.working_bank.slot(),
&bank_start.bank_creation_time,
));
reported_slot
)))
} else {
leader_slot_metrics.reported_slot()
MetricsTrackerAction::Noop
}
}
}
}
/// Applies a `MetricsTrackerAction` (as produced by
/// `check_leader_slot_boundary()`) to this tracker.
///
/// * `Noop` - nothing changes.
/// * `ReportAndResetTracker` - reports the tracked slot metrics, if any, and
///   clears the tracker.
/// * `NewTracker` - installs the carried metrics as the tracker.
/// * `ReportAndNewTracker` - reports the tracked slot metrics, if any, then
///   installs the carried metrics as the tracker.
///
/// Returns whatever `reported_slot()` yields for the metrics that were
/// reported (or, for `NewTracker`, for the freshly installed metrics), and
/// `None` when there were no metrics to query.
pub(crate) fn apply_action(&mut self, action: MetricsTrackerAction) -> Option<Slot> {
    match action {
        MetricsTrackerAction::Noop => None,
        MetricsTrackerAction::ReportAndResetTracker => {
            // Report before dropping the tracker so the ended slot's metrics are emitted.
            let reported_slot = self.leader_slot_metrics.as_mut().and_then(|metrics| {
                metrics.report();
                metrics.reported_slot()
            });
            self.leader_slot_metrics = None;
            reported_slot
        }
        MetricsTrackerAction::NewTracker(new_slot_metrics) => {
            self.leader_slot_metrics = new_slot_metrics;
            // The variant's payload is an `Option`, so tolerate `None` instead of
            // panicking on `unwrap()`.
            self.leader_slot_metrics
                .as_ref()
                .and_then(|metrics| metrics.reported_slot())
        }
        MetricsTrackerAction::ReportAndNewTracker(new_slot_metrics) => {
            // Report the outgoing slot first; only then swap in the new tracker so
            // the new slot starts from a clean state.
            let reported_slot = self.leader_slot_metrics.as_mut().and_then(|metrics| {
                metrics.report();
                metrics.reported_slot()
            });
            self.leader_slot_metrics = new_slot_metrics;
            reported_slot
        }
    }
}
pub(crate) fn accumulate_process_transactions_summary(
&mut self,
process_transactions_summary: &ProcessTransactionsSummary,
@ -584,18 +616,6 @@ impl LeaderSlotMetricsTracker {
}
}
pub(crate) fn increment_slot_metrics_check_slot_boundary_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.outer_loop_timings
.slot_metrics_check_slot_boundary_us,
us
);
}
}
pub(crate) fn increment_receive_and_buffer_packets_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
@ -745,7 +765,7 @@ mod tests {
super::*,
solana_runtime::{bank::Bank, genesis_utils::create_genesis_config},
solana_sdk::pubkey::Pubkey,
std::sync::Arc,
std::{mem, sync::Arc},
};
struct TestSlotBoundaryComponents {
@ -794,9 +814,12 @@ mod tests {
..
} = setup_test_slot_boundary_banks();
// Test that with no bank being tracked, and no new bank being tracked, nothing is reported
assert!(leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&None)
.is_none());
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(&None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::Noop),
mem::discriminant(&action)
);
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
}
@ -811,9 +834,13 @@ mod tests {
// Test case where the thread has not detected a leader bank, and now sees a leader bank.
// Metrics should not be reported because leader slot has not ended
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
assert!(leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
.is_none());
let action =
leader_slot_metrics_tracker.check_leader_slot_boundary(&Some(first_poh_recorder_bank));
assert_eq!(
mem::discriminant(&MetricsTrackerAction::NewTracker(None)),
mem::discriminant(&action)
);
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_some());
}
@ -829,19 +856,33 @@ mod tests {
// Test case where the thread has a leader bank, and now detects there's no more leader bank,
// implying the slot has ended. Metrics should be reported for `first_bank.slot()`,
// because that leader slot has just ended.
assert!(leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
.is_none());
assert_eq!(
leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&None)
.unwrap(),
first_bank.slot()
);
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
assert!(leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&None)
.is_none());
{
// Setup first_bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(&Some(first_poh_recorder_bank));
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
}
{
// Assert reporting if slot has ended
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(&None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
mem::discriminant(&action)
);
assert_eq!(
leader_slot_metrics_tracker.apply_action(action).unwrap(),
first_bank.slot()
);
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
}
{
// Assert no-op if still no new bank
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(&None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::Noop),
mem::discriminant(&action)
);
}
}
#[test]
@ -855,19 +896,35 @@ mod tests {
// Test case where the thread has a leader bank, and now detects the same leader bank,
// implying the slot is still running. Metrics should not be reported
assert!(leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&Some(first_poh_recorder_bank.clone()))
.is_none());
assert!(leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
.is_none());
assert_eq!(
leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&None)
.unwrap(),
first_bank.slot()
);
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
{
// Setup with first_bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(&Some(first_poh_recorder_bank.clone()));
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
}
{
// Assert no-op if same bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(&Some(first_poh_recorder_bank));
assert_eq!(
mem::discriminant(&MetricsTrackerAction::Noop),
mem::discriminant(&action)
);
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
}
{
// Assert reporting if slot has ended
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(&None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
mem::discriminant(&action)
);
assert_eq!(
leader_slot_metrics_tracker.apply_action(action).unwrap(),
first_bank.slot()
);
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
}
}
#[test]
@ -883,22 +940,39 @@ mod tests {
// Test case where the thread has a leader bank, and now detects there's a new leader bank
// for a bigger slot, implying the slot has ended. Metrics should be reported for the
// smaller slot
assert!(leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
.is_none());
assert_eq!(
leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&Some(next_poh_recorder_bank))
.unwrap(),
first_bank.slot()
);
assert_eq!(
leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&None)
.unwrap(),
next_bank.slot()
);
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
{
// Setup with first_bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(&Some(first_poh_recorder_bank));
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
}
{
// Assert reporting if new bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(&Some(next_poh_recorder_bank));
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)),
mem::discriminant(&action)
);
assert_eq!(
leader_slot_metrics_tracker.apply_action(action).unwrap(),
first_bank.slot()
);
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_some());
}
{
// Assert reporting if slot has ended
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(&None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
mem::discriminant(&action)
);
assert_eq!(
leader_slot_metrics_tracker.apply_action(action).unwrap(),
next_bank.slot()
);
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
}
}
#[test]
@ -913,21 +987,38 @@ mod tests {
// Test case where the thread has a leader bank, and now detects there's a new leader bank
// for a smaller slot, implying the slot has ended. Metrics should be reported for the
// bigger slot
assert!(leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&Some(next_poh_recorder_bank))
.is_none());
assert_eq!(
leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
.unwrap(),
next_bank.slot()
);
assert_eq!(
leader_slot_metrics_tracker
.update_on_leader_slot_boundary(&None)
.unwrap(),
first_bank.slot()
);
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
{
// Setup with next_bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(&Some(next_poh_recorder_bank));
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
}
{
// Assert reporting if new bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(&Some(first_poh_recorder_bank));
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)),
mem::discriminant(&action)
);
assert_eq!(
leader_slot_metrics_tracker.apply_action(action).unwrap(),
next_bank.slot()
);
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_some());
}
{
// Assert reporting if slot has ended
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(&None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
mem::discriminant(&action)
);
assert_eq!(
leader_slot_metrics_tracker.apply_action(action).unwrap(),
first_bank.slot()
);
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
}
}
}

View File

@ -117,6 +117,10 @@ impl LeaderSlotTimingMetrics {
self.process_packets_timings.report(id, slot);
self.execute_and_commit_timings.report(id, slot);
}
pub(crate) fn mark_slot_end_detected(&mut self) {
self.outer_loop_timings.mark_slot_end_detected();
}
}
#[derive(Debug)]
@ -129,15 +133,15 @@ pub(crate) struct OuterLoopTimings {
// Time spent processing buffered packets
pub process_buffered_packets_us: u64,
// Time spent checking for slot boundary and reporting leader slot metrics
pub slot_metrics_check_slot_boundary_us: u64,
// Time spent processing new incoming packets to the banking thread
pub receive_and_buffer_packets_us: u64,
// The number of times the function to receive and buffer new packets
// was called
pub receive_and_buffer_packets_invoked_count: u64,
// Elapsed time between bank was detected and slot end was detected
pub bank_detected_to_slot_end_detected_us: u64,
}
impl OuterLoopTimings {
@ -146,26 +150,31 @@ impl OuterLoopTimings {
bank_detected_time: Instant::now(),
bank_detected_delay_us: bank_creation_time.elapsed().as_micros() as u64,
process_buffered_packets_us: 0,
slot_metrics_check_slot_boundary_us: 0,
receive_and_buffer_packets_us: 0,
receive_and_buffer_packets_invoked_count: 0,
bank_detected_to_slot_end_detected_us: 0,
}
}
/// Snapshots the time elapsed since the bank was detected, in microseconds.
///
/// Intended to be called at the moment the end of the slot is detected; the
/// stored value is read later when the timings are reported.
fn mark_slot_end_detected(&mut self) {
    let since_bank_detected = self.bank_detected_time.elapsed();
    self.bank_detected_to_slot_end_detected_us = since_bank_detected.as_micros() as u64;
}
fn report(&self, id: u32, slot: Slot) {
let bank_detected_to_now_us = self.bank_detected_time.elapsed().as_micros() as u64;
datapoint_info!(
"banking_stage-leader_slot_loop_timings",
("id", id as i64, i64),
("slot", slot as i64, i64),
(
"bank_detected_to_slot_end_detected_us",
bank_detected_to_now_us,
self.bank_detected_to_slot_end_detected_us,
i64
),
(
"bank_creation_to_slot_end_detected_us",
bank_detected_to_now_us + self.bank_detected_delay_us,
self.bank_detected_to_slot_end_detected_us + self.bank_detected_delay_us,
i64
),
("bank_detected_delay_us", self.bank_detected_delay_us, i64),
@ -174,11 +183,6 @@ impl OuterLoopTimings {
self.process_buffered_packets_us,
i64
),
(
"slot_metrics_check_slot_boundary_us",
self.slot_metrics_check_slot_boundary_us,
i64
),
(
"receive_and_buffer_packets_us",
self.receive_and_buffer_packets_us,