Condense banking_stage counters into existing datapoint (#31564)

Counters incur additional overhead in sending points to the MetricsAgent
over a crossbeam channel. Additionally, some of these counters would be
submitted by non-voting nodes which is just extra overhead and noise.

This change condenses several updates of a counter into a field of the
existing BankingStageStats metrics struct.
This commit is contained in:
steviez 2023-05-10 15:44:42 -05:00 committed by GitHub
parent 39701fa560
commit 18a118b438
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 42 additions and 36 deletions

View File

@ -72,6 +72,7 @@ pub struct BankingStageStats {
receive_and_buffer_packets_count: AtomicUsize, receive_and_buffer_packets_count: AtomicUsize,
dropped_packets_count: AtomicUsize, dropped_packets_count: AtomicUsize,
pub(crate) dropped_duplicated_packets_count: AtomicUsize, pub(crate) dropped_duplicated_packets_count: AtomicUsize,
dropped_forward_packets_count: AtomicUsize,
newly_buffered_packets_count: AtomicUsize, newly_buffered_packets_count: AtomicUsize,
current_buffered_packets_count: AtomicUsize, current_buffered_packets_count: AtomicUsize,
rebuffered_packets_count: AtomicUsize, rebuffered_packets_count: AtomicUsize,
@ -108,6 +109,7 @@ impl BankingStageStats {
+ self + self
.dropped_duplicated_packets_count .dropped_duplicated_packets_count
.load(Ordering::Relaxed) as u64 .load(Ordering::Relaxed) as u64
+ self.dropped_forward_packets_count.load(Ordering::Relaxed) as u64
+ self.newly_buffered_packets_count.load(Ordering::Relaxed) as u64 + self.newly_buffered_packets_count.load(Ordering::Relaxed) as u64
+ self.current_buffered_packets_count.load(Ordering::Relaxed) as u64 + self.current_buffered_packets_count.load(Ordering::Relaxed) as u64
+ self.rebuffered_packets_count.load(Ordering::Relaxed) as u64 + self.rebuffered_packets_count.load(Ordering::Relaxed) as u64
@ -152,6 +154,12 @@ impl BankingStageStats {
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed) as i64,
i64 i64
), ),
(
"dropped_forward_packets_count",
self.dropped_forward_packets_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
( (
"newly_buffered_packets_count", "newly_buffered_packets_count",
self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64, self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
@ -280,6 +288,7 @@ pub struct FilterForwardingResults {
pub(crate) total_forwardable_packets: usize, pub(crate) total_forwardable_packets: usize,
pub(crate) total_tracer_packets_in_buffer: usize, pub(crate) total_tracer_packets_in_buffer: usize,
pub(crate) total_forwardable_tracer_packets: usize, pub(crate) total_forwardable_tracer_packets: usize,
pub(crate) total_dropped_packets: usize,
pub(crate) total_packet_conversion_us: u64, pub(crate) total_packet_conversion_us: u64,
pub(crate) total_filter_packets_us: u64, pub(crate) total_filter_packets_us: u64,
} }

View File

@ -249,10 +249,9 @@ impl Consumer {
slot_metrics_tracker slot_metrics_tracker
.increment_retryable_packets_filtered_count(retryable_packets_filtered_count as u64); .increment_retryable_packets_filtered_count(retryable_packets_filtered_count as u64);
inc_new_counter_info!( banking_stage_stats
"banking_stage-dropped_tx_before_forwarding", .dropped_forward_packets_count
retryable_packets_filtered_count .fetch_add(retryable_packets_filtered_count, Ordering::Relaxed);
);
process_transactions_summary.retryable_transaction_indexes = process_transactions_summary.retryable_transaction_indexes =
filtered_retryable_transaction_indexes; filtered_retryable_transaction_indexes;

View File

@ -86,6 +86,10 @@ impl Forwarder {
filter_forwarding_result.total_filter_packets_us, filter_forwarding_result.total_filter_packets_us,
Ordering::Relaxed, Ordering::Relaxed,
); );
banking_stage_stats.dropped_forward_packets_count.fetch_add(
filter_forwarding_result.total_dropped_packets,
Ordering::Relaxed,
);
forward_packet_batches_by_accounts forward_packet_batches_by_accounts
.iter_batches() .iter_batches()

View File

@ -556,7 +556,7 @@ impl ThreadLocalUnprocessedPackets {
let mut total_forwardable_packets: usize = 0; let mut total_forwardable_packets: usize = 0;
let mut total_packet_conversion_us: u64 = 0; let mut total_packet_conversion_us: u64 = 0;
let mut total_filter_packets_us: u64 = 0; let mut total_filter_packets_us: u64 = 0;
let mut dropped_tx_before_forwarding_count: usize = 0; let mut total_dropped_packets: usize = 0;
let mut original_priority_queue = self.take_priority_queue(); let mut original_priority_queue = self.take_priority_queue();
let original_capacity = original_priority_queue.capacity(); let original_capacity = original_priority_queue.capacity();
@ -589,7 +589,11 @@ impl ThreadLocalUnprocessedPackets {
(Vec<SanitizedTransaction>, Vec<usize>), (Vec<SanitizedTransaction>, Vec<usize>),
_, _,
) = measure!( ) = measure!(
self.sanitize_unforwarded_packets(&packets_to_forward, &bank), self.sanitize_unforwarded_packets(
&packets_to_forward,
&bank,
&mut total_dropped_packets
),
"sanitize_packet", "sanitize_packet",
); );
saturating_add_assign!( saturating_add_assign!(
@ -598,7 +602,11 @@ impl ThreadLocalUnprocessedPackets {
); );
let (forwardable_transaction_indexes, filter_packets_time) = measure!( let (forwardable_transaction_indexes, filter_packets_time) = measure!(
Self::filter_invalid_transactions(&sanitized_transactions, &bank), Self::filter_invalid_transactions(
&sanitized_transactions,
&bank,
&mut total_dropped_packets
),
"filter_packets", "filter_packets",
); );
saturating_add_assign!( saturating_add_assign!(
@ -622,7 +630,7 @@ impl ThreadLocalUnprocessedPackets {
&sanitized_transactions, &sanitized_transactions,
&transaction_to_packet_indexes, &transaction_to_packet_indexes,
&forwardable_transaction_indexes, &forwardable_transaction_indexes,
&mut dropped_tx_before_forwarding_count, &mut total_dropped_packets,
&bank.feature_set, &bank.feature_set,
); );
accepting_packets = accepted_packet_indexes.len() accepting_packets = accepted_packet_indexes.len()
@ -644,10 +652,7 @@ impl ThreadLocalUnprocessedPackets {
) )
} else { } else {
// skip sanitizing and filtering if not longer able to add more packets for forwarding // skip sanitizing and filtering if not longer able to add more packets for forwarding
saturating_add_assign!( saturating_add_assign!(total_dropped_packets, packets_to_forward.len());
dropped_tx_before_forwarding_count,
packets_to_forward.len()
);
packets_to_forward packets_to_forward
}, },
] ]
@ -667,15 +672,11 @@ impl ThreadLocalUnprocessedPackets {
.len() .len()
); );
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
dropped_tx_before_forwarding_count
);
FilterForwardingResults { FilterForwardingResults {
total_forwardable_packets, total_forwardable_packets,
total_tracer_packets_in_buffer, total_tracer_packets_in_buffer,
total_forwardable_tracer_packets, total_forwardable_tracer_packets,
total_dropped_packets,
total_packet_conversion_us, total_packet_conversion_us,
total_filter_packets_us, total_filter_packets_us,
} }
@ -711,6 +712,7 @@ impl ThreadLocalUnprocessedPackets {
&mut self, &mut self,
packets_to_process: &[Arc<ImmutableDeserializedPacket>], packets_to_process: &[Arc<ImmutableDeserializedPacket>],
bank: &Arc<Bank>, bank: &Arc<Bank>,
total_dropped_packets: &mut usize,
) -> (Vec<SanitizedTransaction>, Vec<usize>) { ) -> (Vec<SanitizedTransaction>, Vec<usize>) {
// Get ref of ImmutableDeserializedPacket // Get ref of ImmutableDeserializedPacket
let deserialized_packets = packets_to_process.iter().map(|p| &**p); let deserialized_packets = packets_to_process.iter().map(|p| &**p);
@ -728,14 +730,9 @@ impl ThreadLocalUnprocessedPackets {
}) })
.unzip(); .unzip();
// report metrics
inc_new_counter_info!("banking_stage-packet_conversion", 1); inc_new_counter_info!("banking_stage-packet_conversion", 1);
let unsanitized_packets_filtered_count = let filtered_count = packets_to_process.len().saturating_sub(transactions.len());
packets_to_process.len().saturating_sub(transactions.len()); saturating_add_assign!(*total_dropped_packets, filtered_count);
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
unsanitized_packets_filtered_count
);
(transactions, transaction_to_packet_indexes) (transactions, transaction_to_packet_indexes)
} }
@ -744,6 +741,7 @@ impl ThreadLocalUnprocessedPackets {
fn filter_invalid_transactions( fn filter_invalid_transactions(
transactions: &[SanitizedTransaction], transactions: &[SanitizedTransaction],
bank: &Arc<Bank>, bank: &Arc<Bank>,
total_dropped_packets: &mut usize,
) -> Vec<usize> { ) -> Vec<usize> {
let filter = vec![Ok(()); transactions.len()]; let filter = vec![Ok(()); transactions.len()];
let results = bank.check_transactions_with_forwarding_delay( let results = bank.check_transactions_with_forwarding_delay(
@ -751,12 +749,9 @@ impl ThreadLocalUnprocessedPackets {
&filter, &filter,
FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET,
); );
// report metrics
let filtered_out_transactions_count = transactions.len().saturating_sub(results.len()); let filtered_count = transactions.len().saturating_sub(results.len());
inc_new_counter_info!( saturating_add_assign!(*total_dropped_packets, filtered_count);
"banking_stage-dropped_tx_before_forwarding",
filtered_out_transactions_count
);
results results
.iter() .iter()
@ -785,7 +780,7 @@ impl ThreadLocalUnprocessedPackets {
transactions: &[SanitizedTransaction], transactions: &[SanitizedTransaction],
transaction_to_packet_indexes: &[usize], transaction_to_packet_indexes: &[usize],
forwardable_transaction_indexes: &[usize], forwardable_transaction_indexes: &[usize],
dropped_tx_before_forwarding_count: &mut usize, total_dropped_packets: &mut usize,
feature_set: &FeatureSet, feature_set: &FeatureSet,
) -> Vec<usize> { ) -> Vec<usize> {
let mut added_packets_count: usize = 0; let mut added_packets_count: usize = 0;
@ -807,11 +802,10 @@ impl ThreadLocalUnprocessedPackets {
saturating_add_assign!(added_packets_count, 1); saturating_add_assign!(added_packets_count, 1);
} }
// count the packets not being forwarded in this batch let filtered_count = forwardable_transaction_indexes
saturating_add_assign!( .len()
*dropped_tx_before_forwarding_count, .saturating_sub(added_packets_count);
forwardable_transaction_indexes.len() - added_packets_count saturating_add_assign!(*total_dropped_packets, filtered_count);
);
accepted_packet_indexes accepted_packet_indexes
} }