Add new received forwarded packets metric to banking stage (#33414)

This commit is contained in:
Andrew Fitzgerald 2023-09-28 09:25:10 -07:00 committed by GitHub
parent cc4e9283db
commit e3cd13e49d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 20 deletions

View File

@ -85,6 +85,7 @@ pub struct BankingStageStats {
pub(crate) dropped_duplicated_packets_count: AtomicUsize, pub(crate) dropped_duplicated_packets_count: AtomicUsize,
dropped_forward_packets_count: AtomicUsize, dropped_forward_packets_count: AtomicUsize,
newly_buffered_packets_count: AtomicUsize, newly_buffered_packets_count: AtomicUsize,
newly_buffered_forwarded_packets_count: AtomicUsize,
current_buffered_packets_count: AtomicUsize, current_buffered_packets_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize, rebuffered_packets_count: AtomicUsize,
consumed_buffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize,
@ -147,109 +148,115 @@ impl BankingStageStats {
if self.last_report.should_update(report_interval_ms) { if self.last_report.should_update(report_interval_ms) {
datapoint_info!( datapoint_info!(
"banking_stage-loop-stats", "banking_stage-loop-stats",
("id", self.id as i64, i64), ("id", self.id, i64),
( (
"receive_and_buffer_packets_count", "receive_and_buffer_packets_count",
self.receive_and_buffer_packets_count self.receive_and_buffer_packets_count
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"dropped_packets_count", "dropped_packets_count",
self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64, self.dropped_packets_count.swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"dropped_duplicated_packets_count", "dropped_duplicated_packets_count",
self.dropped_duplicated_packets_count self.dropped_duplicated_packets_count
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"dropped_forward_packets_count", "dropped_forward_packets_count",
self.dropped_forward_packets_count self.dropped_forward_packets_count
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"newly_buffered_packets_count", "newly_buffered_packets_count",
self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64, self.newly_buffered_packets_count.swap(0, Ordering::Relaxed),
i64
),
(
"newly_buffered_forwarded_packets_count",
self.newly_buffered_forwarded_packets_count
.swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"current_buffered_packets_count", "current_buffered_packets_count",
self.current_buffered_packets_count self.current_buffered_packets_count
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"rebuffered_packets_count", "rebuffered_packets_count",
self.rebuffered_packets_count.swap(0, Ordering::Relaxed) as i64, self.rebuffered_packets_count.swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"consumed_buffered_packets_count", "consumed_buffered_packets_count",
self.consumed_buffered_packets_count self.consumed_buffered_packets_count
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"forwarded_transaction_count", "forwarded_transaction_count",
self.forwarded_transaction_count.swap(0, Ordering::Relaxed) as i64, self.forwarded_transaction_count.swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"forwarded_vote_count", "forwarded_vote_count",
self.forwarded_vote_count.swap(0, Ordering::Relaxed) as i64, self.forwarded_vote_count.swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"consume_buffered_packets_elapsed", "consume_buffered_packets_elapsed",
self.consume_buffered_packets_elapsed self.consume_buffered_packets_elapsed
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"receive_and_buffer_packets_elapsed", "receive_and_buffer_packets_elapsed",
self.receive_and_buffer_packets_elapsed self.receive_and_buffer_packets_elapsed
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"filter_pending_packets_elapsed", "filter_pending_packets_elapsed",
self.filter_pending_packets_elapsed self.filter_pending_packets_elapsed
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"packet_conversion_elapsed", "packet_conversion_elapsed",
self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64, self.packet_conversion_elapsed.swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"transaction_processing_elapsed", "transaction_processing_elapsed",
self.transaction_processing_elapsed self.transaction_processing_elapsed
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed),
i64 i64
), ),
( (
"packet_batch_indices_len_min", "packet_batch_indices_len_min",
self.batch_packet_indexes_len.minimum().unwrap_or(0) as i64, self.batch_packet_indexes_len.minimum().unwrap_or(0),
i64 i64
), ),
( (
"packet_batch_indices_len_max", "packet_batch_indices_len_max",
self.batch_packet_indexes_len.maximum().unwrap_or(0) as i64, self.batch_packet_indexes_len.maximum().unwrap_or(0),
i64 i64
), ),
( (
"packet_batch_indices_len_mean", "packet_batch_indices_len_mean",
self.batch_packet_indexes_len.mean().unwrap_or(0) as i64, self.batch_packet_indexes_len.mean().unwrap_or(0),
i64 i64
), ),
( (
"packet_batch_indices_len_90pct", "packet_batch_indices_len_90pct",
self.batch_packet_indexes_len.percentile(90.0).unwrap_or(0) as i64, self.batch_packet_indexes_len.percentile(90.0).unwrap_or(0),
i64 i64
) )
); );

View File

@ -115,11 +115,13 @@ impl PacketReceiver {
let mut dropped_packets_count = 0; let mut dropped_packets_count = 0;
let mut newly_buffered_packets_count = 0; let mut newly_buffered_packets_count = 0;
let mut newly_buffered_forwarded_packets_count = 0;
Self::push_unprocessed( Self::push_unprocessed(
unprocessed_transaction_storage, unprocessed_transaction_storage,
deserialized_packets, deserialized_packets,
&mut dropped_packets_count, &mut dropped_packets_count,
&mut newly_buffered_packets_count, &mut newly_buffered_packets_count,
&mut newly_buffered_forwarded_packets_count,
banking_stage_stats, banking_stage_stats,
slot_metrics_tracker, slot_metrics_tracker,
tracer_packet_stats, tracer_packet_stats,
@ -144,6 +146,7 @@ impl PacketReceiver {
deserialized_packets: Vec<ImmutableDeserializedPacket>, deserialized_packets: Vec<ImmutableDeserializedPacket>,
dropped_packets_count: &mut usize, dropped_packets_count: &mut usize,
newly_buffered_packets_count: &mut usize, newly_buffered_packets_count: &mut usize,
newly_buffered_forwarded_packets_count: &mut usize,
banking_stage_stats: &mut BankingStageStats, banking_stage_stats: &mut BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker, slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
tracer_packet_stats: &mut TracerPacketStats, tracer_packet_stats: &mut TracerPacketStats,
@ -154,6 +157,10 @@ impl PacketReceiver {
.increment(deserialized_packets.len() as u64); .increment(deserialized_packets.len() as u64);
*newly_buffered_packets_count += deserialized_packets.len(); *newly_buffered_packets_count += deserialized_packets.len();
*newly_buffered_forwarded_packets_count += deserialized_packets
.iter()
.filter(|p| p.original_packet().meta().forwarded())
.count();
slot_metrics_tracker slot_metrics_tracker
.increment_newly_buffered_packets_count(deserialized_packets.len() as u64); .increment_newly_buffered_packets_count(deserialized_packets.len() as u64);