diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 2dfb1e32b..e8b61de94 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -85,6 +85,7 @@ pub struct BankingStageStats { pub(crate) dropped_duplicated_packets_count: AtomicUsize, dropped_forward_packets_count: AtomicUsize, newly_buffered_packets_count: AtomicUsize, + newly_buffered_forwarded_packets_count: AtomicUsize, current_buffered_packets_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, @@ -147,109 +148,115 @@ impl BankingStageStats { if self.last_report.should_update(report_interval_ms) { datapoint_info!( "banking_stage-loop-stats", - ("id", self.id as i64, i64), + ("id", self.id, i64), ( "receive_and_buffer_packets_count", self.receive_and_buffer_packets_count - .swap(0, Ordering::Relaxed) as i64, + .swap(0, Ordering::Relaxed), i64 ), ( "dropped_packets_count", - self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64, + self.dropped_packets_count.swap(0, Ordering::Relaxed), i64 ), ( "dropped_duplicated_packets_count", self.dropped_duplicated_packets_count - .swap(0, Ordering::Relaxed) as i64, + .swap(0, Ordering::Relaxed), i64 ), ( "dropped_forward_packets_count", self.dropped_forward_packets_count - .swap(0, Ordering::Relaxed) as i64, + .swap(0, Ordering::Relaxed), i64 ), ( "newly_buffered_packets_count", - self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64, + self.newly_buffered_packets_count.swap(0, Ordering::Relaxed), + i64 + ), + ( + "newly_buffered_forwarded_packets_count", + self.newly_buffered_forwarded_packets_count + .swap(0, Ordering::Relaxed), i64 ), ( "current_buffered_packets_count", self.current_buffered_packets_count - .swap(0, Ordering::Relaxed) as i64, + .swap(0, Ordering::Relaxed), i64 ), ( "rebuffered_packets_count", - self.rebuffered_packets_count.swap(0, Ordering::Relaxed) as i64, + self.rebuffered_packets_count.swap(0, Ordering::Relaxed), i64 ), ( "consumed_buffered_packets_count", self.consumed_buffered_packets_count - .swap(0, Ordering::Relaxed) as i64, + .swap(0, Ordering::Relaxed), i64 ), ( "forwarded_transaction_count", - self.forwarded_transaction_count.swap(0, Ordering::Relaxed) as i64, + self.forwarded_transaction_count.swap(0, Ordering::Relaxed), i64 ), ( "forwarded_vote_count", - self.forwarded_vote_count.swap(0, Ordering::Relaxed) as i64, + self.forwarded_vote_count.swap(0, Ordering::Relaxed), i64 ), ( "consume_buffered_packets_elapsed", self.consume_buffered_packets_elapsed - .swap(0, Ordering::Relaxed) as i64, + .swap(0, Ordering::Relaxed), i64 ), ( "receive_and_buffer_packets_elapsed", self.receive_and_buffer_packets_elapsed - .swap(0, Ordering::Relaxed) as i64, + .swap(0, Ordering::Relaxed), i64 ), ( "filter_pending_packets_elapsed", self.filter_pending_packets_elapsed - .swap(0, Ordering::Relaxed) as i64, + .swap(0, Ordering::Relaxed), i64 ), ( "packet_conversion_elapsed", - self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64, + self.packet_conversion_elapsed.swap(0, Ordering::Relaxed), i64 ), ( "transaction_processing_elapsed", self.transaction_processing_elapsed - .swap(0, Ordering::Relaxed) as i64, + .swap(0, Ordering::Relaxed), i64 ), ( "packet_batch_indices_len_min", - self.batch_packet_indexes_len.minimum().unwrap_or(0) as i64, + self.batch_packet_indexes_len.minimum().unwrap_or(0), i64 ), ( "packet_batch_indices_len_max", - self.batch_packet_indexes_len.maximum().unwrap_or(0) as i64, + self.batch_packet_indexes_len.maximum().unwrap_or(0), i64 ), ( "packet_batch_indices_len_mean", - self.batch_packet_indexes_len.mean().unwrap_or(0) as i64, + self.batch_packet_indexes_len.mean().unwrap_or(0), i64 ), ( "packet_batch_indices_len_90pct", - self.batch_packet_indexes_len.percentile(90.0).unwrap_or(0) as i64, + self.batch_packet_indexes_len.percentile(90.0).unwrap_or(0), i64 ) ); diff --git a/core/src/banking_stage/packet_receiver.rs b/core/src/banking_stage/packet_receiver.rs index 00d3f1549..a566ef7cf 100644 --- a/core/src/banking_stage/packet_receiver.rs +++ b/core/src/banking_stage/packet_receiver.rs @@ -115,11 +115,13 @@ impl PacketReceiver { let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; + let mut newly_buffered_forwarded_packets_count = 0; Self::push_unprocessed( unprocessed_transaction_storage, deserialized_packets, &mut dropped_packets_count, &mut newly_buffered_packets_count, + &mut newly_buffered_forwarded_packets_count, banking_stage_stats, slot_metrics_tracker, tracer_packet_stats, @@ -144,6 +146,7 @@ impl PacketReceiver { deserialized_packets: Vec, dropped_packets_count: &mut usize, newly_buffered_packets_count: &mut usize, + newly_buffered_forwarded_packets_count: &mut usize, banking_stage_stats: &mut BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, tracer_packet_stats: &mut TracerPacketStats, @@ -154,6 +157,10 @@ impl PacketReceiver { .increment(deserialized_packets.len() as u64); *newly_buffered_packets_count += deserialized_packets.len(); + *newly_buffered_forwarded_packets_count += deserialized_packets + .iter() + .filter(|p| p.original_packet().meta().forwarded()) + .count(); slot_metrics_tracker .increment_newly_buffered_packets_count(deserialized_packets.len() as u64);