diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs
index 0c4cee978..776b6cfdf 100644
--- a/core/benches/banking_stage.rs
+++ b/core/benches/banking_stage.rs
@@ -10,6 +10,7 @@ use {
    rayon::prelude::*,
    solana_core::{
        banking_stage::{BankingStage, BankingStageStats},
+        leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker,
        qos_service::QosService,
    },
    solana_entry::entry::{next_hash, Entry},
@@ -98,6 +99,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
            &BankingStageStats::default(),
            &recorder,
            &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
+            &mut LeaderSlotMetricsTracker::new(0),
        );
    });
diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index 944fd67bd..12d3b2c1d 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -2,7 +2,10 @@
//! to contruct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use {
-    crate::qos_service::QosService,
+    crate::{
+        leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary},
+        qos_service::QosService,
+    },
    crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
    histogram::Histogram,
    itertools::Itertools,
@@ -85,47 +88,6 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
const MIN_THREADS_BANKING: u32 = 1;

-/// A summary of what happened to transactions passed to the execution pipeline.
-/// Transactions can
-/// 1) Did not even make it to execution due to being filtered out by things like AccountInUse
-/// lock conflictss or CostModel compute limits. These types of errors are retryable and
-/// counted in `Self::retryable_transaction_indexes`.
-/// 2) Did not execute due to some fatal error like too old, or duplicate signature. These
-/// will be dropped from the transactions queue and not counted in `Self::retryable_transaction_indexes`
-/// 3) Were executed and committed, captured by `committed_transactions_count` below.
-/// 4) Were executed and failed commit, captured by `failed_commit_count` below.
-struct ProcessTransactionsSummary {
-    // Returns true if we hit the end of the block/max PoH height for the block before
-    // processing all the transactions in the batch.
-    reached_max_poh_height: bool,
-
-    // Total number of transactions that were passed as candidates for execution. See description
-    // of struct above for possible outcomes for these transactions
-    #[allow(dead_code)]
-    transactions_attempted_execution_count: usize,
-
-    // Total number of transactions that made it into the block
-    #[allow(dead_code)]
-    committed_transactions_count: usize,
-
-    // Total number of transactions that made it into the block where the transactions
-    // output from execution was success/no error.
-    #[allow(dead_code)]
-    committed_transactions_with_successful_result_count: usize,
-
-    // All transactions that were executed but then failed record because the
-    // slot ended
-    #[allow(dead_code)]
-    failed_commit_count: usize,
-
-    // Indexes of transactions in the transactions slice that were not committed but are retryable
-    retryable_transaction_indexes: Vec<usize>,
-
-    // The number of transactions filtered out by the cost model
-    #[allow(dead_code)]
-    cost_model_throttled_transactions_count: usize,
-}
-
pub struct ProcessTransactionBatchOutput {
    // The number of transactions filtered out by the cost model
    cost_model_throttled_transactions_count: usize,
@@ -164,6 +126,7 @@ pub struct BankingStageStats {
    current_buffered_packet_batches_count: AtomicUsize,
    rebuffered_packets_count: AtomicUsize,
    consumed_buffered_packets_count: AtomicUsize,
+    end_of_slot_filtered_invalid_count: AtomicUsize,
    batch_packet_indexes_len: Histogram,

    // Timing
@@ -171,7 +134,6 @@ pub struct BankingStageStats {
    receive_and_buffer_packets_elapsed: AtomicU64,
    handle_retryable_packets_elapsed: AtomicU64,
    filter_pending_packets_elapsed: AtomicU64,
-    pub(crate) packet_duplicate_check_elapsed: AtomicU64,
    packet_conversion_elapsed: AtomicU64,
    unprocessed_packet_conversion_elapsed: AtomicU64,
    transaction_processing_elapsed: AtomicU64,
@@ -215,7 +177,6 @@ impl BankingStageStats {
                .handle_retryable_packets_elapsed
                .load(Ordering::Relaxed)
                + self.filter_pending_packets_elapsed.load(Ordering::Relaxed)
-                + self.packet_duplicate_check_elapsed.load(Ordering::Relaxed)
                + self.packet_conversion_elapsed.load(Ordering::Relaxed)
                + self
                    .unprocessed_packet_conversion_elapsed
@@ -283,6 +244,12 @@ impl BankingStageStats {
                        .swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
+                (
+                    "end_of_slot_filtered_invalid_count",
+                    self.end_of_slot_filtered_invalid_count
+                        .swap(0, Ordering::Relaxed) as i64,
+                    i64
+                ),
                (
                    "consume_buffered_packets_elapsed",
                    self.consume_buffered_packets_elapsed
@@ -307,12 +274,6 @@ impl BankingStageStats {
                        .swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
-                (
-                    "packet_duplicate_check_elapsed",
-                    self.packet_duplicate_check_elapsed
-                        .swap(0, Ordering::Relaxed) as i64,
-                    i64
-                ),
                (
                    "packet_conversion_elapsed",
                    self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64,
                    i64
                ),
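The hunk that follows reworks forward_buffered_packets around the same byte-budget rate limit (INTERVAL_MS, MAX_BYTES_PER_SECOND). As a reading aid, here is a minimal, self-contained sketch of that token-bucket idea; ByteBudget and its methods are illustrative stand-ins with assumed semantics, not the real DataBudget API:

    use std::time::{Duration, Instant};

    // Hypothetical stand-in for the data budget used below, assuming
    // token-bucket semantics: refill once per interval, cap the balance,
    // and spend per packet until the budget runs out.
    struct ByteBudget {
        bytes: usize,
        last_refill: Instant,
    }

    impl ByteBudget {
        fn refill(&mut self, interval: Duration, per_interval: usize, cap: usize) {
            if self.last_refill.elapsed() >= interval {
                self.bytes = self.bytes.saturating_add(per_interval).min(cap);
                self.last_refill = Instant::now();
            }
        }

        fn take(&mut self, bytes: usize) -> bool {
            if self.bytes >= bytes {
                self.bytes -= bytes;
                true
            } else {
                false
            }
        }
    }

    fn main() {
        // Same constants as the hunk below: ~12MB/s, refilled every 100ms.
        const INTERVAL_MS: u64 = 100;
        const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
        const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;

        let mut budget = ByteBudget {
            bytes: 0,
            last_refill: Instant::now() - Duration::from_millis(2 * INTERVAL_MS),
        };
        budget.refill(
            Duration::from_millis(INTERVAL_MS),
            MAX_BYTES_PER_INTERVAL,
            MAX_BYTES_PER_INTERVAL,
        );
        assert!(budget.take(1200)); // one max-size packet fits
    }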
@@ -485,13 +446,14 @@ impl BankingStage {
            .collect()
    }

+    /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
+    /// the number of successfully forwarded packets in the second part of the tuple.
    fn forward_buffered_packets(
        socket: &std::net::UdpSocket,
        tpu_forwards: &std::net::SocketAddr,
-        buffered_packet_batches: &UnprocessedPacketBatches,
+        packets: Vec<&Packet>,
        data_budget: &DataBudget,
-    ) -> std::io::Result<()> {
-        let packets = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
+    ) -> (std::io::Result<()>, usize) {
        const INTERVAL_MS: u64 = 100;
        const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
        const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
@@ -518,11 +480,11 @@ impl BankingStage {
            inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec.len());
            if let Err(SendPktsError::IoError(ioerr, _num_failed)) = batch_send(socket, &packet_vec)
            {
-                return Err(ioerr);
+                return (Err(ioerr), 0);
            }
        }

-        Ok(())
+        (Ok(()), packet_vec.len())
    }

    // Returns whether the given `PacketBatch` has any more remaining unprocessed
@@ -551,6 +513,7 @@ impl BankingStage {
        banking_stage_stats: &BankingStageStats,
        recorder: &TransactionRecorder,
        qos_service: &QosService,
+        slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
    ) {
        let mut rebuffered_packet_count = 0;
        let mut consumed_buffered_packets_count = 0;
@@ -566,7 +529,7 @@ impl BankingStage {
                if let Some((next_leader, bank)) = &reached_end_of_slot {
                    // We've hit the end of this slot, no need to perform more processing,
                    // just filter the remaining packets for the invalid (e.g. too old) ones
-                    let new_unprocessed_indexes = Self::filter_unprocessed_packets(
+                    let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot(
                        bank,
                        packet_batch,
                        original_unprocessed_indexes,
@@ -574,6 +537,19 @@ impl BankingStage {
                        *next_leader,
                        banking_stage_stats,
                    );
+
+                    let end_of_slot_filtered_invalid_count = original_unprocessed_indexes
+                        .len()
+                        .saturating_sub(new_unprocessed_indexes.len());
+
+                    slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count(
+                        end_of_slot_filtered_invalid_count as u64,
+                    );
+
+                    banking_stage_stats
+                        .end_of_slot_filtered_invalid_count
+                        .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed);
+
                    Self::update_buffered_packets_with_new_unprocessed(
                        original_unprocessed_indexes,
                        new_unprocessed_indexes,
@@ -595,6 +571,7 @@ impl BankingStage {
                        gossip_vote_sender,
                        banking_stage_stats,
                        qos_service,
+                        slot_metrics_tracker,
                    );
                    let ProcessTransactionsSummary {
                        reached_max_poh_height,
@@ -719,6 +696,7 @@ impl BankingStage {
        recorder: &TransactionRecorder,
        data_budget: &DataBudget,
        qos_service: &QosService,
+        slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
    ) -> BufferedPacketsDecision {
        let bank_start;
        let (
@@ -760,6 +738,7 @@ impl BankingStage {
                    banking_stage_stats,
                    recorder,
                    qos_service,
+                    slot_metrics_tracker,
                );
            }
            BufferedPacketsDecision::Forward => {
@@ -771,6 +750,7 @@ impl BankingStage {
                    socket,
                    false,
                    data_budget,
+                    slot_metrics_tracker,
                );
            }
            BufferedPacketsDecision::ForwardAndHold => {
@@ -782,10 +762,12 @@ impl BankingStage {
                    socket,
                    true,
                    data_budget,
+                    slot_metrics_tracker,
                );
            }
            _ => (),
        }
+
        decision
    }

@@ -797,6 +779,7 @@ impl BankingStage {
        socket: &UdpSocket,
        hold: bool,
        data_budget: &DataBudget,
+        slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
    ) {
        let addr = match forward_option {
            ForwardOption::NotForward => {
@@ -814,13 +797,35 @@ impl BankingStage {
            Some(addr) => addr,
            None => return,
        };
-        let _ = Self::forward_buffered_packets(socket, &addr, buffered_packet_batches, data_budget);
+
+        let forwardable_packets =
+            Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
+        let forwardable_packets_len = forwardable_packets.len();
+        let (_forward_result, successful_forwarded_packets_count) =
+            Self::forward_buffered_packets(socket, &addr, forwardable_packets, data_budget);
+        let failed_forwarded_packets_count =
+            forwardable_packets_len.saturating_sub(successful_forwarded_packets_count);
+
+        if failed_forwarded_packets_count > 0 {
+            slot_metrics_tracker
+                .increment_failed_forwarded_packets_count(failed_forwarded_packets_count as u64);
+            slot_metrics_tracker.increment_packet_batch_forward_failure_count(1);
+        }
+
+        if successful_forwarded_packets_count > 0 {
+            slot_metrics_tracker.increment_successful_forwarded_packets_count(
+                successful_forwarded_packets_count as u64,
+            );
+        }
+
        if hold {
            buffered_packet_batches.retain(|(_, index, _)| !index.is_empty());
            for (_, _, forwarded) in buffered_packet_batches.iter_mut() {
                *forwarded = true;
            }
        } else {
+            slot_metrics_tracker
+                .increment_cleared_from_buffer_after_forward_count(forwardable_packets_len as u64);
            buffered_packet_batches.clear();
        }
    }
@@ -844,6 +849,7 @@ impl BankingStage {
        let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit);
        let mut banking_stage_stats = BankingStageStats::new(id);
        let qos_service = QosService::new(cost_model, id);
+        let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
        loop {
            let my_pubkey = cluster_info.id();
            while !buffered_packet_batches.is_empty() {
@@ -860,6 +866,7 @@ impl BankingStage {
                    &recorder,
                    data_budget,
                    &qos_service,
+                    &mut slot_metrics_tracker,
                );
                if matches!(decision, BufferedPacketsDecision::Hold)
                    || matches!(decision, BufferedPacketsDecision::ForwardAndHold)
                {
                    break;
                }
            }

+            let current_poh_bank = {
+                let poh = poh_recorder.lock().unwrap();
+                poh.bank_start()
+            };
+            slot_metrics_tracker.update_on_leader_slot_boundary(&current_poh_bank);
+
            let recv_timeout = if !buffered_packet_batches.is_empty() {
                // If packets are buffered, let's wait for less time on recv from the channel.
                // This helps detect the next leader faster, and processing the buffered
@@ -888,6 +901,7 @@ impl BankingStage {
                batch_limit,
                &mut buffered_packet_batches,
                &mut banking_stage_stats,
+                &mut slot_metrics_tracker,
            ) {
                Ok(()) | Err(RecvTimeoutError::Timeout) => (),
                Err(RecvTimeoutError::Disconnected) => break,
@@ -1312,6 +1326,7 @@ impl BankingStage {
        // If `bank_creation_time` is None, it's a test so ignore the option so
        // allow processing
+        // TODO: add timing metrics here covering when the bank was added to now
        let should_bank_still_be_processing_txs =
            Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot);
        match (
@@ -1469,6 +1484,7 @@ impl BankingStage {
        gossip_vote_sender: &ReplayVoteSender,
        banking_stage_stats: &BankingStageStats,
        qos_service: &QosService,
+        slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
    ) -> ProcessTransactionsSummary {
        let mut packet_conversion_time = Measure::start("packet_conversion");
        let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
@@ -1499,6 +1515,11 @@ impl BankingStage {
            ..
        } = process_transactions_summary;

+        slot_metrics_tracker.accumulate_process_transactions_summary(&process_transactions_summary);
+
+        let retryable_tx_count = retryable_transaction_indexes.len();
+        inc_new_counter_info!("banking_stage-unprocessed_transactions", retryable_tx_count);
+
        let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time");
        let filtered_retryable_tx_indexes = Self::filter_pending_packets_from_pending_txs(
            bank,
@@ -1508,6 +1529,12 @@ impl BankingStage {
        );
        filter_pending_packets_time.stop();

+        let retryable_packets_filtered_count = retryable_transaction_indexes
+            .len()
+            .saturating_sub(filtered_retryable_tx_indexes.len());
+        slot_metrics_tracker
+            .increment_retryable_packets_filtered_count(retryable_packets_filtered_count as u64);
+
        inc_new_counter_info!(
            "banking_stage-dropped_tx_before_forwarding",
            retryable_transaction_indexes
@@ -1530,7 +1557,7 @@ impl BankingStage {
        process_transactions_summary
    }

-    fn filter_unprocessed_packets(
+    fn filter_unprocessed_packets_at_end_of_slot(
        bank: &Arc<Bank>,
        packet_batch: &PacketBatch,
        transaction_indexes: &[usize],
@@ -1600,6 +1627,7 @@ impl BankingStage {
        batch_limit: usize,
        buffered_packet_batches: &mut UnprocessedPacketBatches,
        banking_stage_stats: &mut BankingStageStats,
+        slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
    ) -> Result<(), RecvTimeoutError> {
        let mut recv_time = Measure::start("receive_and_buffer_packets_recv");
        let packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
@@ -1622,6 +1650,15 @@ impl BankingStage {
        let mut newly_buffered_packets_count = 0;
        for packet_batch in packet_batch_iter {
            let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets);
+            // Track all the packets incoming from sigverify, both valid and invalid
+            slot_metrics_tracker.increment_total_new_valid_packets(packet_indexes.len() as u64);
+            slot_metrics_tracker.increment_newly_failed_sigverify_count(
+                packet_batch
+                    .packets
+                    .len()
+                    .saturating_sub(packet_indexes.len()) as u64,
+            );
+
            Self::push_unprocessed(
                buffered_packet_batches,
                packet_batch,
@@ -1631,6 +1668,7 @@ impl BankingStage {
                &mut newly_buffered_packets_count,
                batch_limit,
                banking_stage_stats,
+                slot_metrics_tracker,
            );
        }
        proc_start.stop();
@@ -1681,12 +1719,16 @@ impl BankingStage {
        newly_buffered_packets_count: &mut usize,
        batch_limit: usize,
        banking_stage_stats: &mut BankingStageStats,
+        slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
    ) {
        if Self::packet_has_more_unprocessed_transactions(&packet_indexes) {
            if unprocessed_packet_batches.len() >= batch_limit {
                *dropped_packet_batches_count += 1;
                if let Some(dropped_batch) = unprocessed_packet_batches.pop_front() {
                    *dropped_packets_count += dropped_batch.1.len();
+                    slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count(
+                        dropped_batch.1.len() as u64,
+                    );
                }
            }
            let _ = banking_stage_stats
                .batch_packet_indexes_len
                .increment(packet_indexes.len() as u64);

            *newly_buffered_packets_count += packet_indexes.len();
+            slot_metrics_tracker
+                .increment_newly_buffered_packets_count(packet_indexes.len() as u64);
            unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false));
        }
    }
@@ -3296,6 +3340,7 @@ mod tests {
            &BankingStageStats::default(),
            &recorder,
            &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
+            &mut LeaderSlotMetricsTracker::new(0),
        );
        assert_eq!(
            buffered_packet_batches[0].1.len(),
@@ -3316,6 +3361,7 @@ mod tests {
                &BankingStageStats::default(),
                &recorder,
                &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
+                &mut LeaderSlotMetricsTracker::new(0),
            );
            if num_expected_unprocessed == 0 {
                assert!(buffered_packet_batches.is_empty())
@@ -3382,6 +3428,7 @@ mod tests {
            &BankingStageStats::default(),
            &recorder,
            &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
+            &mut LeaderSlotMetricsTracker::new(0),
        );

        // Check everything is correct. All indexes after `interrupted_iteration`
@@ -3479,6 +3526,7 @@ mod tests {
            &send_socket,
            true,
            &data_budget,
+            &mut LeaderSlotMetricsTracker::new(0),
        );

        recv_socket
@@ -3578,6 +3626,7 @@ mod tests {
                &send_socket,
                hold,
                &DataBudget::default(),
+                &mut LeaderSlotMetricsTracker::new(0),
            );

            recv_socket
@@ -3639,6 +3688,7 @@ mod tests {
            &mut newly_buffered_packets_count,
            batch_limit,
            &mut banking_stage_stats,
+            &mut LeaderSlotMetricsTracker::new(0),
        );
        assert_eq!(unprocessed_packets.len(), 1);
        assert_eq!(dropped_packet_batches_count, 0);
@@ -3657,6 +3707,7 @@ mod tests {
            &mut newly_buffered_packets_count,
            batch_limit,
            &mut banking_stage_stats,
+            &mut LeaderSlotMetricsTracker::new(0),
        );
        assert_eq!(unprocessed_packets.len(), 2);
        assert_eq!(dropped_packet_batches_count, 0);
@@ -3680,6 +3731,7 @@ mod tests {
            &mut newly_buffered_packets_count,
            batch_limit,
            &mut banking_stage_stats,
+            &mut LeaderSlotMetricsTracker::new(0),
        );
        assert_eq!(unprocessed_packets.len(), 2);
        assert_eq!(
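Taken together, the handle_forwarding changes above follow one pattern: the send path reports how many packets actually went out, and the caller derives the failure count by saturating subtraction. A minimal sketch of that accounting, with hypothetical names and plain byte vectors standing in for Packets:

    use std::io;

    // Hypothetical forwarding routine in the same shape as the patched
    // forward_buffered_packets: the second tuple element reports how many
    // packets were actually sent before the budget ran out.
    fn forward(packets: Vec<Vec<u8>>, budget_bytes: &mut usize) -> (io::Result<()>, usize) {
        let mut sent = 0;
        for packet in packets {
            if packet.len() > *budget_bytes {
                break; // out of data budget; the rest count as failed
            }
            *budget_bytes -= packet.len();
            sent += 1;
        }
        (Ok(()), sent)
    }

    fn main() {
        let packets = vec![vec![0u8; 800], vec![0u8; 800], vec![0u8; 800]];
        let attempted = packets.len();
        let mut budget = 2_000; // bytes available this interval

        let (_result, successful) = forward(packets, &mut budget);
        // The same caller-side accounting handle_forwarding now performs:
        let failed = attempted.saturating_sub(successful);
        assert_eq!((successful, failed), (2, 1));
    }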
diff --git a/core/src/leader_slot_banking_stage_metrics.rs b/core/src/leader_slot_banking_stage_metrics.rs
new file mode 100644
index 000000000..1c4dfa081
--- /dev/null
+++ b/core/src/leader_slot_banking_stage_metrics.rs
@@ -0,0 +1,723 @@
+use {
+    solana_poh::poh_recorder::BankStart,
+    solana_sdk::{clock::Slot, saturating_add_assign},
+    std::time::Instant,
+};
+
+/// A summary of what happened to transactions passed to the execution pipeline.
+/// Transactions can:
+/// 1) Not even make it to execution due to being filtered out by things like AccountInUse
+/// lock conflicts or CostModel compute limits. These types of errors are retryable and
+/// counted in `Self::retryable_transaction_indexes`.
+/// 2) Not execute due to some fatal error like being too old, or a duplicate signature. These
+/// will be dropped from the transactions queue and not counted in `Self::retryable_transaction_indexes`.
+/// 3) Be executed and committed, captured by `committed_transactions_count` below.
+/// 4) Be executed and fail commit, captured by `failed_commit_count` below.
+pub(crate) struct ProcessTransactionsSummary {
+    // True if we hit the end of the block/max PoH height for the block before
+    // processing all the transactions in the batch.
+    pub reached_max_poh_height: bool,
+
+    // Total number of transactions that were passed as candidates for execution. See description
+    // of struct above for possible outcomes for these transactions
+    pub transactions_attempted_execution_count: usize,
+
+    // Total number of transactions that made it into the block
+    pub committed_transactions_count: usize,
+
+    // Total number of transactions that made it into the block where the transactions
+    // output from execution was success/no error.
+    pub committed_transactions_with_successful_result_count: usize,
+
+    // All transactions that were executed but then failed record because the
+    // slot ended
+    pub failed_commit_count: usize,
+
+    // Indexes of transactions in the transactions slice that were not committed but are retryable
+    pub retryable_transaction_indexes: Vec<usize>,
+
+    // The number of transactions filtered out by the cost model
+    pub cost_model_throttled_transactions_count: usize,
+}
+
+// Metrics capturing wallclock time spent in various parts of BankingStage during this
+// validator's leader slot
+#[derive(Debug)]
+struct LeaderSlotTimingMetrics {
+    bank_detected_time: Instant,
+
+    // Delay from when the bank was created to when this thread detected it
+    bank_detected_delay_us: u64,
+}
+
+impl LeaderSlotTimingMetrics {
+    fn new(bank_creation_time: &Instant) -> Self {
+        Self {
+            bank_detected_time: Instant::now(),
+            bank_detected_delay_us: bank_creation_time.elapsed().as_micros() as u64,
+        }
+    }
+
+    fn report(&self, id: u32, slot: Slot) {
+        let bank_detected_to_now = self.bank_detected_time.elapsed().as_micros() as u64;
+        datapoint_info!(
+            "banking_stage-leader_slot_loop_timings",
+            ("id", id as i64, i64),
+            ("slot", slot as i64, i64),
+            ("bank_detected_to_now_us", bank_detected_to_now, i64),
+            (
+                "bank_creation_to_now_us",
+                bank_detected_to_now + self.bank_detected_delay_us,
+                i64
+            ),
+            ("bank_detected_delay_us", self.bank_detected_delay_us, i64),
+        );
+    }
+}
+
+// Metrics describing packets ingested/processed in various parts of BankingStage during this
+// validator's leader slot
+#[derive(Debug, Default)]
+struct LeaderSlotPacketCountMetrics {
+    // total number of live packets TPU received from verified receiver for processing.
+    total_new_valid_packets: u64,
+
+    // total number of packets TPU received from sigverify that failed signature verification.
+    newly_failed_sigverify_count: u64,
+
+    // total number of packets dropped due to the thread's buffered packets capacity being reached.
+    exceeded_buffer_limit_dropped_packets_count: u64,
+
+    // total number of packets that got added to the pending buffer after arriving to BankingStage
+    newly_buffered_packets_count: u64,
+
+    // total number of transactions in the buffer that were filtered out due to things like age and
+    // duplicate signature checks
+    retryable_packets_filtered_count: u64,
+
+    // total number of transactions that attempted execution in this slot. Should equal the sum
+    // of `committed_transactions_count`, `retryable_errored_transaction_count`, and
+    // `nonretryable_errored_transactions_count`.
+    transactions_attempted_execution_count: u64,
+
+    // total number of transactions that were executed and committed into the block
+    // on this thread
+    committed_transactions_count: u64,
+
+    // total number of transactions that were executed, got a successful execution output/no error,
+    // and were then committed into the block
+    committed_transactions_with_successful_result_count: u64,
+
+    // total number of transactions that were not executed or failed commit, BUT were added back to the buffered
+    // queue because they were retryable errors
+    retryable_errored_transaction_count: u64,
+
+    // total number of transactions that attempted execution due to some fatal error (too old, duplicate signature, etc.)
+    // AND were dropped from the buffered queue
+    nonretryable_errored_transactions_count: u64,
+
+    // total number of transactions that were executed, but failed to be committed into the Poh stream because
+    // the block ended. Some of these may be already counted in `nonretryable_errored_transactions_count` if they
+    // then hit the age limit after failing to be committed.
+    executed_transactions_failed_commit_count: u64,
+
+    // total number of transactions that were excluded from the block because they were too expensive
+    // according to the cost model. These transactions are added back to the buffered queue and are
+    // already counted in `self.retryable_errored_transaction_count`.
+    cost_model_throttled_transactions_count: u64,
+
+    // total number of forwardable packets that failed forwarding
+    failed_forwarded_packets_count: u64,
+
+    // total number of forwardable packets that were successfully forwarded
+    successful_forwarded_packets_count: u64,
+
+    // total number of attempted forwards that failed. Note this is not a count of the number of packets
+    // that failed, just the total number of batches of packets that failed forwarding
+    packet_batch_forward_failure_count: u64,
+
+    // total number of valid unprocessed packets in the buffer that were removed after being forwarded
+    cleared_from_buffer_after_forward_count: u64,
+
+    // total number of packets removed at the end of the slot due to being too old, duplicate, etc.
+    end_of_slot_filtered_invalid_count: u64,
+}
+
+impl LeaderSlotPacketCountMetrics {
+    fn new() -> Self {
+        Self { ..Self::default() }
+    }
+
+    fn report(&self, id: u32, slot: Slot) {
+        datapoint_info!(
+            "banking_stage-leader_slot_packet_counts",
+            ("id", id as i64, i64),
+            ("slot", slot as i64, i64),
+            (
+                "total_new_valid_packets",
+                self.total_new_valid_packets as i64,
+                i64
+            ),
+            (
+                "newly_failed_sigverify_count",
+                self.newly_failed_sigverify_count as i64,
+                i64
+            ),
+            (
+                "exceeded_buffer_limit_dropped_packets_count",
+                self.exceeded_buffer_limit_dropped_packets_count as i64,
+                i64
+            ),
+            (
+                "newly_buffered_packets_count",
+                self.newly_buffered_packets_count as i64,
+                i64
+            ),
+            (
+                "retryable_packets_filtered_count",
+                self.retryable_packets_filtered_count as i64,
+                i64
+            ),
+            (
+                "transactions_attempted_execution_count",
+                self.transactions_attempted_execution_count as i64,
+                i64
+            ),
+            (
+                "committed_transactions_count",
+                self.committed_transactions_count as i64,
+                i64
+            ),
+            (
+                "committed_transactions_with_successful_result_count",
+                self.committed_transactions_with_successful_result_count as i64,
+                i64
+            ),
+            (
+                "retryable_errored_transaction_count",
+                self.retryable_errored_transaction_count as i64,
+                i64
+            ),
+            (
+                "nonretryable_errored_transactions_count",
+                self.nonretryable_errored_transactions_count as i64,
+                i64
+            ),
+            (
+                "executed_transactions_failed_commit_count",
+                self.executed_transactions_failed_commit_count as i64,
+                i64
+            ),
+            (
+                "cost_model_throttled_transactions_count",
+                self.cost_model_throttled_transactions_count as i64,
+                i64
+            ),
+            (
+                "failed_forwarded_packets_count",
+                self.failed_forwarded_packets_count as i64,
+                i64
+            ),
+            (
+                "successful_forwarded_packets_count",
+                self.successful_forwarded_packets_count as i64,
+                i64
+            ),
+            (
+                "packet_batch_forward_failure_count",
+                self.packet_batch_forward_failure_count as i64,
+                i64
+            ),
+            (
+                "cleared_from_buffer_after_forward_count",
+                self.cleared_from_buffer_after_forward_count as i64,
+                i64
+            ),
+            (
+                "end_of_slot_filtered_invalid_count",
+                self.end_of_slot_filtered_invalid_count as i64,
+                i64
+            ),
+        );
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct LeaderSlotMetrics {
+    // BankingStage creates one of these metrics structs per working thread, uniquely
+    // identified by `id`.
+    // This field allows metrics for gossip votes, TPU votes, and other
+    // transactions to be categorized per thread.
+    id: u32,
+
+    // aggregate metrics per slot
+    slot: Slot,
+
+    packet_count_metrics: LeaderSlotPacketCountMetrics,
+
+    timing_metrics: LeaderSlotTimingMetrics,
+
+    // Used by tests to check if the `self.report()` method was called
+    is_reported: bool,
+}
+
+impl LeaderSlotMetrics {
+    pub(crate) fn new(id: u32, slot: Slot, bank_creation_time: &Instant) -> Self {
+        Self {
+            id,
+            slot,
+            packet_count_metrics: LeaderSlotPacketCountMetrics::new(),
+            timing_metrics: LeaderSlotTimingMetrics::new(bank_creation_time),
+            is_reported: false,
+        }
+    }
+
+    pub(crate) fn report(&mut self) {
+        self.is_reported = true;
+
+        self.timing_metrics.report(self.id, self.slot);
+        self.packet_count_metrics.report(self.id, self.slot);
+    }
+
+    /// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None
+    fn reported_slot(&self) -> Option<Slot> {
+        if self.is_reported {
+            Some(self.slot)
+        } else {
+            None
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct LeaderSlotMetricsTracker {
+    // Only `Some` if BankingStage detects it's time to construct our leader slot,
+    // otherwise `None`
+    leader_slot_metrics: Option<LeaderSlotMetrics>,
+    id: u32,
+}
+
+impl LeaderSlotMetricsTracker {
+    pub fn new(id: u32) -> Self {
+        Self {
+            leader_slot_metrics: None,
+            id,
+        }
+    }
+
+    // Returns reported slot if metrics were reported
+    pub(crate) fn update_on_leader_slot_boundary(
+        &mut self,
+        bank_start: &Option<BankStart>,
+    ) -> Option<Slot> {
+        match (self.leader_slot_metrics.as_mut(), bank_start) {
+            (None, None) => None,
+
+            (Some(leader_slot_metrics), None) => {
+                // Slot has ended, time to report metrics
+                leader_slot_metrics.report();
+                // Ensure tests catch that `report()` method was called
+                let reported_slot = leader_slot_metrics.reported_slot();
+                self.leader_slot_metrics = None;
+                reported_slot
+            }
+
+            (None, Some(bank_start)) => {
+                // Our leader slot has begun, time to create a new slot tracker
+                self.leader_slot_metrics = Some(LeaderSlotMetrics::new(
+                    self.id,
+                    bank_start.working_bank.slot(),
+                    &bank_start.bank_creation_time,
+                ));
+                self.leader_slot_metrics.as_ref().unwrap().reported_slot()
+            }
+
+            (Some(leader_slot_metrics), Some(bank_start)) => {
+                if leader_slot_metrics.slot != bank_start.working_bank.slot() {
+                    // Last slot has ended, a new slot has begun
+                    leader_slot_metrics.report();
+                    // Ensure tests catch that `report()` method was called
+                    let reported_slot = leader_slot_metrics.reported_slot();
+                    self.leader_slot_metrics = Some(LeaderSlotMetrics::new(
+                        self.id,
+                        bank_start.working_bank.slot(),
+                        &bank_start.bank_creation_time,
+                    ));
+                    reported_slot
+                } else {
+                    leader_slot_metrics.reported_slot()
+                }
+            }
+        }
+    }
+
+    pub(crate) fn accumulate_process_transactions_summary(
+        &mut self,
+        process_transactions_summary: &ProcessTransactionsSummary,
+    ) {
+        if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
+            let ProcessTransactionsSummary {
+                transactions_attempted_execution_count,
+                committed_transactions_count,
+                committed_transactions_with_successful_result_count,
+                failed_commit_count,
+                ref retryable_transaction_indexes,
+                cost_model_throttled_transactions_count,
+                ..
+            } = process_transactions_summary;
+
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .transactions_attempted_execution_count,
+                *transactions_attempted_execution_count as u64
+            );
+
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .committed_transactions_count,
+                *committed_transactions_count as u64
+            );
+
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .committed_transactions_with_successful_result_count,
+                *committed_transactions_with_successful_result_count as u64
+            );
+
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .executed_transactions_failed_commit_count,
+                *failed_commit_count as u64
+            );
+
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .retryable_errored_transaction_count,
+                retryable_transaction_indexes.len() as u64
+            );
+
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .nonretryable_errored_transactions_count,
+                transactions_attempted_execution_count
+                    .saturating_sub(*committed_transactions_count)
+                    .saturating_sub(retryable_transaction_indexes.len()) as u64
+            );
+
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .cost_model_throttled_transactions_count,
+                *cost_model_throttled_transactions_count as u64
+            );
+        }
+    }
+
+    pub(crate) fn increment_total_new_valid_packets(&mut self, count: u64) {
+        if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .total_new_valid_packets,
+                count
+            );
+        }
+    }
+
+    pub(crate) fn increment_newly_failed_sigverify_count(&mut self, count: u64) {
+        if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .newly_failed_sigverify_count,
+                count
+            );
+        }
+    }
+
+    pub(crate) fn increment_exceeded_buffer_limit_dropped_packets_count(&mut self, count: u64) {
+        if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .exceeded_buffer_limit_dropped_packets_count,
+                count
+            );
+        }
+    }
+
+    pub(crate) fn increment_newly_buffered_packets_count(&mut self, count: u64) {
+        if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .newly_buffered_packets_count,
+                count
+            );
+        }
+    }
+
+    pub(crate) fn increment_retryable_packets_filtered_count(&mut self, count: u64) {
+        if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .retryable_packets_filtered_count,
+                count
+            );
+        }
+    }
+
+    pub(crate) fn increment_failed_forwarded_packets_count(&mut self, count: u64) {
+        if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .failed_forwarded_packets_count,
+                count
+            );
+        }
+    }
+
+    pub(crate) fn increment_successful_forwarded_packets_count(&mut self, count: u64) {
+        if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .successful_forwarded_packets_count,
+                count
+            );
+        }
+    }
+
+    pub(crate) fn increment_packet_batch_forward_failure_count(&mut self, count: u64) {
+        if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .packet_batch_forward_failure_count,
+                count
+            );
+        }
+    }
+
+    pub(crate) fn increment_cleared_from_buffer_after_forward_count(&mut self, count: u64) {
+        if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .cleared_from_buffer_after_forward_count,
+                count
+            );
+        }
+    }
+
+    pub(crate) fn increment_end_of_slot_filtered_invalid_count(&mut self, count: u64) {
+        if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
+            saturating_add_assign!(
+                leader_slot_metrics
+                    .packet_count_metrics
+                    .end_of_slot_filtered_invalid_count,
+                count
+            );
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {
+        super::*,
+        solana_runtime::{bank::Bank, genesis_utils::create_genesis_config},
+        solana_sdk::pubkey::Pubkey,
+        std::sync::Arc,
+    };
+
+    struct TestSlotBoundaryComponents {
+        first_bank: Arc<Bank>,
+        first_poh_recorder_bank: BankStart,
+        next_bank: Arc<Bank>,
+        next_poh_recorder_bank: BankStart,
+        leader_slot_metrics_tracker: LeaderSlotMetricsTracker,
+    }
+
+    fn setup_test_slot_boundary_banks() -> TestSlotBoundaryComponents {
+        let genesis = create_genesis_config(10);
+        let first_bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
+        let first_poh_recorder_bank = BankStart {
+            working_bank: first_bank.clone(),
+            bank_creation_time: Arc::new(Instant::now()),
+        };
+
+        // Create a child descended from the first bank
+        let next_bank = Arc::new(Bank::new_from_parent(
+            &first_bank,
+            &Pubkey::new_unique(),
+            first_bank.slot() + 1,
+        ));
+        let next_poh_recorder_bank = BankStart {
+            working_bank: next_bank.clone(),
+            bank_creation_time: Arc::new(Instant::now()),
+        };
+
+        let banking_stage_thread_id = 0;
+        let leader_slot_metrics_tracker = LeaderSlotMetricsTracker::new(banking_stage_thread_id);
+
+        TestSlotBoundaryComponents {
+            first_bank,
+            first_poh_recorder_bank,
+            next_bank,
+            next_poh_recorder_bank,
+            leader_slot_metrics_tracker,
+        }
+    }
+
+    #[test]
+    pub fn test_update_on_leader_slot_boundary_not_leader_to_not_leader() {
+        let TestSlotBoundaryComponents {
+            mut leader_slot_metrics_tracker,
+            ..
+        } = setup_test_slot_boundary_banks();
+        // Test that with no bank being tracked, and no new bank being tracked, nothing is reported
+        assert!(leader_slot_metrics_tracker
+            .update_on_leader_slot_boundary(&None)
+            .is_none());
+        assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
+    }
+
+    #[test]
+    pub fn test_update_on_leader_slot_boundary_not_leader_to_leader() {
+        let TestSlotBoundaryComponents {
+            first_poh_recorder_bank,
+            mut leader_slot_metrics_tracker,
+            ..
+        } = setup_test_slot_boundary_banks();
+
+        // Test case where the thread has not detected a leader bank, and now sees a leader bank.
+        // Metrics should not be reported because the leader slot has not ended
+        assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
+        assert!(leader_slot_metrics_tracker
+            .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
+            .is_none());
+        assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_some());
+    }
+
+    #[test]
+    pub fn test_update_on_leader_slot_boundary_leader_to_not_leader() {
+        let TestSlotBoundaryComponents {
+            first_bank,
+            first_poh_recorder_bank,
+            mut leader_slot_metrics_tracker,
+            ..
+        } = setup_test_slot_boundary_banks();
+
+        // Test case where the thread has a leader bank, and now detects there's no more leader bank,
+        // implying the slot has ended. Metrics should be reported for `first_bank.slot()`,
+        // because that leader slot has just ended.
+        assert!(leader_slot_metrics_tracker
+            .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
+            .is_none());
+        assert_eq!(
+            leader_slot_metrics_tracker
+                .update_on_leader_slot_boundary(&None)
+                .unwrap(),
+            first_bank.slot()
+        );
+        assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
+        assert!(leader_slot_metrics_tracker
+            .update_on_leader_slot_boundary(&None)
+            .is_none());
+    }
+
+    #[test]
+    pub fn test_update_on_leader_slot_boundary_leader_to_leader_same_slot() {
+        let TestSlotBoundaryComponents {
+            first_bank,
+            first_poh_recorder_bank,
+            mut leader_slot_metrics_tracker,
+            ..
+        } = setup_test_slot_boundary_banks();
+
+        // Test case where the thread has a leader bank, and now detects the same leader bank,
+        // implying the slot is still running. Metrics should not be reported
+        assert!(leader_slot_metrics_tracker
+            .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank.clone()))
+            .is_none());
+        assert!(leader_slot_metrics_tracker
+            .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
+            .is_none());
+        assert_eq!(
+            leader_slot_metrics_tracker
+                .update_on_leader_slot_boundary(&None)
+                .unwrap(),
+            first_bank.slot()
+        );
+        assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
+    }
+
+    #[test]
+    pub fn test_update_on_leader_slot_boundary_leader_to_leader_bigger_slot() {
+        let TestSlotBoundaryComponents {
+            first_bank,
+            first_poh_recorder_bank,
+            next_bank,
+            next_poh_recorder_bank,
+            mut leader_slot_metrics_tracker,
+        } = setup_test_slot_boundary_banks();
+
+        // Test case where the thread has a leader bank, and now detects there's a new leader bank
+        // for a bigger slot, implying the slot has ended. Metrics should be reported for the
+        // smaller slot
+        assert!(leader_slot_metrics_tracker
+            .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
+            .is_none());
+        assert_eq!(
+            leader_slot_metrics_tracker
+                .update_on_leader_slot_boundary(&Some(next_poh_recorder_bank))
+                .unwrap(),
+            first_bank.slot()
+        );
+        assert_eq!(
+            leader_slot_metrics_tracker
+                .update_on_leader_slot_boundary(&None)
+                .unwrap(),
+            next_bank.slot()
+        );
+        assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
+    }
+
+    #[test]
+    pub fn test_update_on_leader_slot_boundary_leader_to_leader_smaller_slot() {
+        let TestSlotBoundaryComponents {
+            first_bank,
+            first_poh_recorder_bank,
+            next_bank,
+            next_poh_recorder_bank,
+            mut leader_slot_metrics_tracker,
+        } = setup_test_slot_boundary_banks();
+        // Test case where the thread has a leader bank, and now detects there's a new leader bank
+        // for a smaller slot, implying the slot has ended. Metrics should be reported for the
+        // bigger slot
+        assert!(leader_slot_metrics_tracker
+            .update_on_leader_slot_boundary(&Some(next_poh_recorder_bank))
+            .is_none());
+        assert_eq!(
+            leader_slot_metrics_tracker
+                .update_on_leader_slot_boundary(&Some(first_poh_recorder_bank))
+                .unwrap(),
+            next_bank.slot()
+        );
+        assert_eq!(
+            leader_slot_metrics_tracker
+                .update_on_leader_slot_boundary(&None)
+                .unwrap(),
+            first_bank.slot()
+        );
+        assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
+    }
+}
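The tracker's slot-boundary handling above is a small state machine over (currently tracked slot, currently observed bank). A self-contained model of update_on_leader_slot_boundary's four transitions, with a bare slot number standing in for LeaderSlotMetrics:

    // Self-contained model of the slot-boundary state machine; the return
    // value is the slot whose metrics just got reported, if any.
    fn update(tracked: &mut Option<u64>, current: Option<u64>) -> Option<u64> {
        match (*tracked, current) {
            // Not leader before, not leader now: nothing to do
            (None, None) => None,
            // Leader slot ended: report the finished slot
            (Some(prev), None) => {
                *tracked = None;
                Some(prev)
            }
            // Leader slot began: start tracking, nothing to report yet
            (None, Some(slot)) => {
                *tracked = Some(slot);
                None
            }
            // Rolled straight into a different slot: report the old one
            (Some(prev), Some(slot)) if prev != slot => {
                *tracked = Some(slot);
                Some(prev)
            }
            // Same slot still in progress
            _ => None,
        }
    }

    fn main() {
        let mut tracked = None;
        assert_eq!(update(&mut tracked, Some(9)), None); // became leader
        assert_eq!(update(&mut tracked, Some(9)), None); // same slot continues
        assert_eq!(update(&mut tracked, Some(10)), Some(9)); // new leader slot
        assert_eq!(update(&mut tracked, None), Some(10)); // leadership ended
    }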
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 5a0a59314..641f23969 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -28,6 +28,7 @@ pub mod fork_choice;
pub mod gen_keys;
pub mod heaviest_subtree_fork_choice;
pub mod latest_validator_votes_for_frozen_banks;
+pub mod leader_slot_banking_stage_metrics;
pub mod ledger_cleanup_service;
pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests;
diff --git a/core/src/qos_service.rs b/core/src/qos_service.rs
index 64679b315..7fabcca26 100644
--- a/core/src/qos_service.rs
+++ b/core/src/qos_service.rs
@@ -117,8 +117,9 @@ impl QosService {
        txs_costs
    }

-    // Given a list of transactions and their costs, this function returns a corresponding
-    // list of Results that indicate if a transaction is selected to be included in the current block,
+    /// Given a list of transactions and their costs, this function returns a corresponding
+    /// list of Results that indicate if a transaction is selected to be included in the current block,
+    /// and a count of the number of transactions that would fit in the block
    pub fn select_transactions_per_cost<'a>(
        &self,
        transactions: impl Iterator<Item = &'a SanitizedTransaction>,
@@ -174,37 +175,6 @@ impl QosService {
            .unwrap_or_else(|err| warn!("qos service report metrics failed: {:?}", err));
    }

-    // metrics accumulating apis
-    pub fn accumulate_tpu_ingested_packets_count(&self, count: u64) {
-        self.metrics
-            .tpu_ingested_packets_count
-            .fetch_add(count, Ordering::Relaxed);
-    }
-
-    pub fn accumulate_tpu_buffered_packets_count(&self, count: u64) {
-        self.metrics
-            .tpu_buffered_packets_count
-            .fetch_add(count, Ordering::Relaxed);
-    }
-
-    pub fn accumulated_verified_txs_count(&self, count: u64) {
-        self.metrics
-            .verified_txs_count
-            .fetch_add(count, Ordering::Relaxed);
-    }
-
-    pub fn accumulated_processed_txs_count(&self, count: u64) {
-        self.metrics
-            .processed_txs_count
-            .fetch_add(count, Ordering::Relaxed);
-    }
-
-    pub fn accumulated_retryable_txs_count(&self, count: u64) {
-        self.metrics
-            .retryable_txs_count
-            .fetch_add(count, Ordering::Relaxed);
-    }
-
    pub fn accumulate_estimated_transaction_costs(
        &self,
        cost_details: &BatchedTransactionCostDetails,
@@ -263,24 +233,6 @@ struct QosServiceMetrics {
    // aggregate metrics per slot
    slot: AtomicU64,

-    // accumulated number of live packets TPU received from verified receiver for processing.
-    tpu_ingested_packets_count: AtomicU64,
-
-    // accumulated number of live packets TPU put into buffer due to no active bank.
-    tpu_buffered_packets_count: AtomicU64,
-
-    // accumulated number of verified txs, which excludes unsanitized transactions and
-    // non-vote transactions when in vote-only mode from ingested packets
-    verified_txs_count: AtomicU64,
-
-    // accumulated number of transactions been processed, includes those landed and those to be
-    // returned (due to AccountInUse, and other QoS related reasons)
-    processed_txs_count: AtomicU64,
-
-    // accumulated number of transactions buffered for retry, often due to AccountInUse and QoS
-    // reasons, includes retried_txs_per_block_limit_count and retried_txs_per_account_limit_count
-    retryable_txs_count: AtomicU64,
-
    // accumulated time in micro-sec spent in computing transaction cost. It is the main performance
    // overhead introduced by cost_model
    compute_cost_time: AtomicU64,
@@ -342,31 +294,6 @@ impl QosServiceMetrics {
            "qos-service-stats",
            ("id", self.id as i64, i64),
            ("bank_slot", bank_slot as i64, i64),
-            (
-                "tpu_ingested_packets_count",
-                self.tpu_ingested_packets_count.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "tpu_buffered_packets_count",
-                self.tpu_buffered_packets_count.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "verified_txs_count",
-                self.verified_txs_count.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "processed_txs_count",
-                self.processed_txs_count.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
-            (
-                "retryable_txs_count",
-                self.retryable_txs_count.swap(0, Ordering::Relaxed) as i64,
-                i64
-            ),
            (
                "compute_cost_time",
                self.compute_cost_time.swap(0, Ordering::Relaxed) as i64,
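A toy model of the contract described by the updated select_transactions_per_cost doc comment above: admit each transaction whose cost still fits the remaining block budget, and also return how many were admitted. Names and the error string are illustrative, not the real qos_service types:

    // Hypothetical per-cost selection over a slice of transaction costs.
    fn select_per_cost(
        costs: &[u64],
        mut block_budget: u64,
    ) -> (Vec<Result<(), &'static str>>, usize) {
        let mut num_included = 0;
        let results = costs
            .iter()
            .map(|&cost| {
                if cost <= block_budget {
                    block_budget -= cost;
                    num_included += 1;
                    Ok(())
                } else {
                    Err("WouldExceedMaxBlockCostLimit")
                }
            })
            .collect();
        (results, num_included)
    }

    fn main() {
        let (results, num_included) = select_per_cost(&[400, 500, 300], 800);
        assert_eq!(num_included, 2); // 400 and 300 fit; 500 exceeded the remaining budget
        assert!(results[1].is_err());
    }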
diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs
index 514f75770..480d377d2 100644
--- a/poh/src/poh_recorder.rs
+++ b/poh/src/poh_recorder.rs
@@ -60,6 +60,7 @@ type Result<T> = std::result::Result<T, PohRecorderError>;

pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64));

+#[derive(Clone)]
pub struct BankStart {
    pub working_bank: Arc<Bank>,
    pub bank_creation_time: Arc<Instant>,
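Every counter in the new tracker is accumulated through solana_sdk's saturating_add_assign! macro. A local re-implementation (assumed to expand roughly like the real one) showing why the accumulators clamp instead of overflowing:

    // Local sketch of solana_sdk::saturating_add_assign!: the assignment
    // clamps at the numeric max rather than panicking in debug builds or
    // wrapping in release builds.
    macro_rules! saturating_add_assign {
        ($lhs:expr, $rhs:expr) => {
            $lhs = $lhs.saturating_add($rhs)
        };
    }

    fn main() {
        let mut total_new_valid_packets: u64 = u64::MAX - 1;
        saturating_add_assign!(total_new_valid_packets, 5);
        assert_eq!(total_new_valid_packets, u64::MAX); // clamped, not wrapped
    }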