Add PacketBatch packet_indexes stat (#22564)

* collect stats on packet batch indicies

* cleanup

* cleanup

* cleanup

* change name
This commit is contained in:
buffalu 2022-01-19 00:13:07 -08:00 committed by GitHub
parent e616a7ebfc
commit 650882217c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 42 additions and 10 deletions

View File

@ -4,6 +4,7 @@
use { use {
crate::{packet_deduper::PacketDeduper, qos_service::QosService}, crate::{packet_deduper::PacketDeduper, qos_service::QosService},
crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}, crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
histogram::Histogram,
itertools::Itertools, itertools::Itertools,
retain_mut::RetainMut, retain_mut::RetainMut,
solana_entry::entry::hash_transactions, solana_entry::entry::hash_transactions,
@ -95,6 +96,7 @@ pub struct BankingStageStats {
current_buffered_packet_batches_count: AtomicUsize, current_buffered_packet_batches_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize, rebuffered_packets_count: AtomicUsize,
consumed_buffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize,
batch_packet_indexes_len: Histogram,
// Timing // Timing
consume_buffered_packets_elapsed: AtomicU64, consume_buffered_packets_elapsed: AtomicU64,
@ -111,6 +113,10 @@ impl BankingStageStats {
pub fn new(id: u32) -> Self { pub fn new(id: u32) -> Self {
BankingStageStats { BankingStageStats {
id, id,
batch_packet_indexes_len: Histogram::configure()
.max_value(PACKETS_PER_BATCH as u64)
.build()
.unwrap(),
..BankingStageStats::default() ..BankingStageStats::default()
} }
} }
@ -147,9 +153,10 @@ impl BankingStageStats {
.unprocessed_packet_conversion_elapsed .unprocessed_packet_conversion_elapsed
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
+ self.transaction_processing_elapsed.load(Ordering::Relaxed) + self.transaction_processing_elapsed.load(Ordering::Relaxed)
+ self.batch_packet_indexes_len.entries()
} }
fn report(&self, report_interval_ms: u64) { fn report(&mut self, report_interval_ms: u64) {
// skip repoting metrics if stats is empty // skip repoting metrics if stats is empty
if self.is_empty() { if self.is_empty() {
return; return;
@ -255,7 +262,28 @@ impl BankingStageStats {
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed) as i64,
i64 i64
), ),
(
"packet_batch_indices_len_min",
self.batch_packet_indexes_len.minimum().unwrap_or(0) as i64,
i64
),
(
"packet_batch_indices_len_max",
self.batch_packet_indexes_len.maximum().unwrap_or(0) as i64,
i64
),
(
"packet_batch_indices_len_mean",
self.batch_packet_indexes_len.mean().unwrap_or(0) as i64,
i64
),
(
"packet_batch_indices_len_90pct",
self.batch_packet_indexes_len.percentile(90.0).unwrap_or(0) as i64,
i64
)
); );
self.batch_packet_indexes_len.clear();
} }
} }
} }
@ -734,7 +762,7 @@ impl BankingStage {
let recorder = poh_recorder.lock().unwrap().recorder(); let recorder = poh_recorder.lock().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit); let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit);
let banking_stage_stats = BankingStageStats::new(id); let mut banking_stage_stats = BankingStageStats::new(id);
let qos_service = QosService::new(cost_model, id); let qos_service = QosService::new(cost_model, id);
loop { loop {
let my_pubkey = cluster_info.id(); let my_pubkey = cluster_info.id();
@ -779,7 +807,7 @@ impl BankingStage {
id, id,
batch_limit, batch_limit,
&mut buffered_packet_batches, &mut buffered_packet_batches,
&banking_stage_stats, &mut banking_stage_stats,
packet_deduper, packet_deduper,
) { ) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (), Ok(()) | Err(RecvTimeoutError::Timeout) => (),
@ -1384,7 +1412,7 @@ impl BankingStage {
id: u32, id: u32,
batch_limit: usize, batch_limit: usize,
buffered_packet_batches: &mut UnprocessedPacketBatches, buffered_packet_batches: &mut UnprocessedPacketBatches,
banking_stage_stats: &BankingStageStats, banking_stage_stats: &mut BankingStageStats,
packet_deduper: &PacketDeduper, packet_deduper: &PacketDeduper,
) -> Result<(), RecvTimeoutError> { ) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("receive_and_buffer_packets_recv"); let mut recv_time = Measure::start("receive_and_buffer_packets_recv");
@ -1468,7 +1496,7 @@ impl BankingStage {
newly_buffered_packets_count: &mut usize, newly_buffered_packets_count: &mut usize,
batch_limit: usize, batch_limit: usize,
packet_deduper: &PacketDeduper, packet_deduper: &PacketDeduper,
banking_stage_stats: &BankingStageStats, banking_stage_stats: &mut BankingStageStats,
) { ) {
packet_deduper.dedupe_packets(&packet_batch, &mut packet_indexes, banking_stage_stats); packet_deduper.dedupe_packets(&packet_batch, &mut packet_indexes, banking_stage_stats);
if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
@ -1478,6 +1506,10 @@ impl BankingStage {
*dropped_packets_count += dropped_batch.1.len(); *dropped_packets_count += dropped_batch.1.len();
} }
} }
let _ = banking_stage_stats
.batch_packet_indexes_len
.increment(packet_indexes.len() as u64);
*newly_buffered_packets_count += packet_indexes.len(); *newly_buffered_packets_count += packet_indexes.len();
unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false)); unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false));
} }
@ -3209,7 +3241,7 @@ mod tests {
let mut dropped_packet_batches_count = 0; let mut dropped_packet_batches_count = 0;
let mut dropped_packets_count = 0; let mut dropped_packets_count = 0;
let mut newly_buffered_packets_count = 0; let mut newly_buffered_packets_count = 0;
let banking_stage_stats = BankingStageStats::default(); let mut banking_stage_stats = BankingStageStats::default();
// Because the set of unprocessed `packet_indexes` is empty, the // Because the set of unprocessed `packet_indexes` is empty, the
// packets are not added to the unprocessed queue // packets are not added to the unprocessed queue
BankingStage::push_unprocessed( BankingStage::push_unprocessed(
@ -3221,7 +3253,7 @@ mod tests {
&mut newly_buffered_packets_count, &mut newly_buffered_packets_count,
batch_limit, batch_limit,
&packet_deduper, &packet_deduper,
&banking_stage_stats, &mut banking_stage_stats,
); );
assert_eq!(unprocessed_packets.len(), 1); assert_eq!(unprocessed_packets.len(), 1);
assert_eq!(dropped_packet_batches_count, 0); assert_eq!(dropped_packet_batches_count, 0);
@ -3240,7 +3272,7 @@ mod tests {
&mut newly_buffered_packets_count, &mut newly_buffered_packets_count,
batch_limit, batch_limit,
&packet_deduper, &packet_deduper,
&banking_stage_stats, &mut banking_stage_stats,
); );
assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets.len(), 2);
assert_eq!(dropped_packet_batches_count, 0); assert_eq!(dropped_packet_batches_count, 0);
@ -3264,7 +3296,7 @@ mod tests {
&mut newly_buffered_packets_count, &mut newly_buffered_packets_count,
batch_limit, batch_limit,
&packet_deduper, &packet_deduper,
&banking_stage_stats, &mut banking_stage_stats,
); );
assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets.len(), 2);
assert_eq!( assert_eq!(
@ -3285,7 +3317,7 @@ mod tests {
&mut newly_buffered_packets_count, &mut newly_buffered_packets_count,
3, 3,
&packet_deduper, &packet_deduper,
&banking_stage_stats, &mut banking_stage_stats,
); );
assert_eq!(unprocessed_packets.len(), 2); assert_eq!(unprocessed_packets.len(), 2);
assert_eq!( assert_eq!(