From 619335df1a87b58b7c7d07de20bcb3f56b58b925 Mon Sep 17 00:00:00 2001 From: carllin Date: Thu, 17 Feb 2022 01:14:32 -0500 Subject: [PATCH] Add execute timings (#23097) --- core/src/banking_stage.rs | 839 ++++++++++++------ core/src/leader_slot_banking_stage_metrics.rs | 214 ++++- ...eader_slot_banking_stage_timing_metrics.rs | 286 ++++++ core/src/lib.rs | 1 + program-runtime/src/timings.rs | 1 + 5 files changed, 1015 insertions(+), 326 deletions(-) create mode 100644 core/src/leader_slot_banking_stage_timing_metrics.rs diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 4930af4d40..4f294865a8 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -4,6 +4,9 @@ use { crate::{ leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary}, + leader_slot_banking_stage_timing_metrics::{ + LeaderExecuteAndCommitTimings, RecordTransactionsTimings, + }, qos_service::QosService, }, crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}, @@ -91,9 +94,20 @@ const MIN_THREADS_BANKING: u32 = 1; pub struct ProcessTransactionBatchOutput { // The number of transactions filtered out by the cost model cost_model_throttled_transactions_count: usize, + // Amount of time spent running the cost model + cost_model_us: u64, execute_and_commit_transactions_output: ExecuteAndCommitTransactionsOutput, } +struct RecordTransactionsSummary { + // Metrics describing how time was spent recording transactions + record_transactions_timings: RecordTransactionsTimings, + // Result of trying to record the transactions into the PoH stream + result: Result, + // Transactions that failed record, and are retryable + retryable_indexes: Vec, +} + pub struct ExecuteAndCommitTransactionsOutput { // Total number of transactions that were passed as candidates for execution transactions_attempted_execution_count: usize, @@ -110,7 +124,7 @@ pub struct ExecuteAndCommitTransactionsOutput { // committed into the Poh stream. If so, the result tells us // how many such transactions were committed commit_transactions_result: Result<(), PohRecorderError>, - execute_timings: ExecuteTimings, + execute_and_commit_timings: LeaderExecuteAndCommitTimings, } #[derive(Debug, Default)] @@ -533,58 +547,90 @@ impl BankingStage { let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) = buffered_packet_batch_and_offsets; if let Some(end_of_slot) = &reached_end_of_slot { - // We've hit the end of this slot, no need to perform more processing, - // just filter the remaining packets for the invalid (e.g. too old) ones - // if the working_bank is available - if let Some(bank) = &end_of_slot.working_bank { - let new_unprocessed_indexes = - Self::filter_unprocessed_packets_at_end_of_slot( - bank, - packet_batch, - original_unprocessed_indexes, - my_pubkey, - end_of_slot.next_slot_leader, - banking_stage_stats, - ); + let (should_retain, end_of_slot_filtering_time) = Measure::this( + |_| { + // We've hit the end of this slot, no need to perform more processing, + // just filter the remaining packets for the invalid (e.g. 
too old) ones + // if the working_bank is available + if let Some(bank) = &end_of_slot.working_bank { + let new_unprocessed_indexes = + Self::filter_unprocessed_packets_at_end_of_slot( + bank, + packet_batch, + original_unprocessed_indexes, + my_pubkey, + end_of_slot.next_slot_leader, + banking_stage_stats, + ); - let end_of_slot_filtered_invalid_count = original_unprocessed_indexes - .len() - .saturating_sub(new_unprocessed_indexes.len()); + let end_of_slot_filtered_invalid_count = + original_unprocessed_indexes + .len() + .saturating_sub(new_unprocessed_indexes.len()); - slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count( - end_of_slot_filtered_invalid_count as u64, - ); + slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count( + end_of_slot_filtered_invalid_count as u64, + ); - banking_stage_stats - .end_of_slot_filtered_invalid_count - .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed); + banking_stage_stats + .end_of_slot_filtered_invalid_count + .fetch_add( + end_of_slot_filtered_invalid_count, + Ordering::Relaxed, + ); - Self::update_buffered_packets_with_new_unprocessed( - original_unprocessed_indexes, - new_unprocessed_indexes, - ) - } else { - true - } + Self::update_buffered_packets_with_new_unprocessed( + original_unprocessed_indexes, + new_unprocessed_indexes, + ) + } else { + true + } + }, + (), + "end_of_slot_filtering", + ); + slot_metrics_tracker + .increment_end_of_slot_filtering_us(end_of_slot_filtering_time.as_us()); + should_retain } else { - let bank_start = poh_recorder.lock().unwrap().bank_start(); + let (bank_start, poh_recorder_lock_time) = Measure::this( + |_| poh_recorder.lock().unwrap().bank_start(), + (), + "poh_recorder_lock", + ); + slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us( + poh_recorder_lock_time.as_us(), + ); + if let Some(BankStart { working_bank, bank_creation_time, }) = bank_start { - let process_transactions_summary = Self::process_packets_transactions( - &working_bank, - &bank_creation_time, - recorder, - packet_batch, - original_unprocessed_indexes.to_owned(), - transaction_status_sender.clone(), - gossip_vote_sender, - banking_stage_stats, - qos_service, - slot_metrics_tracker, + let (process_transactions_summary, process_packets_transactions_time) = + Measure::this( + |_| { + Self::process_packets_transactions( + &working_bank, + &bank_creation_time, + recorder, + packet_batch, + original_unprocessed_indexes.to_owned(), + transaction_status_sender.clone(), + gossip_vote_sender, + banking_stage_stats, + qos_service, + slot_metrics_tracker, + ) + }, + (), + "process_packets_transactions", + ); + slot_metrics_tracker.increment_process_packets_transactions_us( + process_packets_transactions_time.as_us(), ); + let ProcessTransactionsSummary { reached_max_poh_height, retryable_transaction_indexes, @@ -592,16 +638,29 @@ impl BankingStage { } = process_transactions_summary; if reached_max_poh_height - // TODO adding timing metrics here from when bank was added to now || !Bank::should_bank_still_be_processing_txs( &bank_creation_time, max_tx_ingestion_ns, ) { - reached_end_of_slot = Some(EndOfSlot { - next_slot_leader: poh_recorder.lock().unwrap().next_slot_leader(), - working_bank: Some(working_bank), - }); + let poh_recorder_lock_time = { + let (poh_recorder_locked, poh_recorder_lock_time) = Measure::this( + |_| poh_recorder.lock().unwrap(), + (), + "poh_recorder_lock", + ); + + reached_end_of_slot = Some(EndOfSlot { + next_slot_leader: poh_recorder_locked.next_slot_leader(), + 
working_bank: Some(working_bank), + }); + poh_recorder_lock_time + }; + + slot_metrics_tracker + .increment_consume_buffered_packets_poh_recorder_lock_us( + poh_recorder_lock_time.as_us(), + ); } // The difference between all transactions passed to execution and the ones that @@ -630,10 +689,23 @@ impl BankingStage { } else { // mark as end-of-slot to avoid aggressively lock poh for the remaining for // packet batches in buffer - reached_end_of_slot = Some(EndOfSlot { - next_slot_leader: poh_recorder.lock().unwrap().next_slot_leader(), - working_bank: None, - }); + let poh_recorder_lock_time = { + let (poh_recorder_locked, poh_recorder_lock_time) = Measure::this( + |_| poh_recorder.lock().unwrap(), + (), + "poh_recorder_lock", + ); + + reached_end_of_slot = Some(EndOfSlot { + next_slot_leader: poh_recorder_locked.next_slot_leader(), + working_bank: None, + }); + poh_recorder_lock_time + }; + slot_metrics_tracker + .increment_consume_buffered_packets_poh_recorder_lock_us( + poh_recorder_lock_time.as_us(), + ); // `original_unprocessed_indexes` must have remaining packets to process // if not yet processed. @@ -717,72 +789,102 @@ impl BankingStage { qos_service: &QosService, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> BufferedPacketsDecision { - let bank_start; - let ( - leader_at_slot_offset, - bank_still_processing_txs, - would_be_leader, - would_be_leader_shortly, - ) = { - let poh = poh_recorder.lock().unwrap(); - bank_start = poh.bank_start(); - ( - poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET), - PohRecorder::get_working_bank_if_not_expired(&bank_start.as_ref()), - poh.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT), - poh.would_be_leader( - (FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT, - ), - ) - }; + let (decision, make_decision_time) = Measure::this( + |_| { + let bank_start; + let ( + leader_at_slot_offset, + bank_still_processing_txs, + would_be_leader, + would_be_leader_shortly, + ) = { + let poh = poh_recorder.lock().unwrap(); + bank_start = poh.bank_start(); + ( + poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET), + PohRecorder::get_working_bank_if_not_expired(&bank_start.as_ref()), + poh.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT), + poh.would_be_leader( + (FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) + * DEFAULT_TICKS_PER_SLOT, + ), + ) + }; - let decision = Self::consume_or_forward_packets( - my_pubkey, - leader_at_slot_offset, - bank_still_processing_txs, - would_be_leader, - would_be_leader_shortly, + Self::consume_or_forward_packets( + my_pubkey, + leader_at_slot_offset, + bank_still_processing_txs, + would_be_leader, + would_be_leader_shortly, + ) + }, + (), + "make_decision", ); + slot_metrics_tracker.increment_make_decision_us(make_decision_time.as_us()); match decision { BufferedPacketsDecision::Consume(max_tx_ingestion_ns) => { - Self::consume_buffered_packets( - my_pubkey, - max_tx_ingestion_ns, - poh_recorder, - buffered_packet_batches, - transaction_status_sender, - gossip_vote_sender, - None::>, - banking_stage_stats, - recorder, - qos_service, - slot_metrics_tracker, + let (_, consume_buffered_packets_time) = Measure::this( + |_| { + Self::consume_buffered_packets( + my_pubkey, + max_tx_ingestion_ns, + poh_recorder, + buffered_packet_batches, + transaction_status_sender, + gossip_vote_sender, + None::>, + banking_stage_stats, + recorder, + qos_service, + slot_metrics_tracker, + ) + }, + (), + 
"consume_buffered_packets", ); + slot_metrics_tracker + .increment_consume_buffered_packets_us(consume_buffered_packets_time.as_us()); } BufferedPacketsDecision::Forward => { - Self::handle_forwarding( - forward_option, - cluster_info, - buffered_packet_batches, - poh_recorder, - socket, - false, - data_budget, - slot_metrics_tracker, + let (_, forward_time) = Measure::this( + |_| { + Self::handle_forwarding( + forward_option, + cluster_info, + buffered_packet_batches, + poh_recorder, + socket, + false, + data_budget, + slot_metrics_tracker, + ) + }, + (), + "forward", ); + slot_metrics_tracker.increment_forward_us(forward_time.as_us()); } BufferedPacketsDecision::ForwardAndHold => { - Self::handle_forwarding( - forward_option, - cluster_info, - buffered_packet_batches, - poh_recorder, - socket, - true, - data_budget, - slot_metrics_tracker, + let (_, forward_and_hold_time) = Measure::this( + |_| { + Self::handle_forwarding( + forward_option, + cluster_info, + buffered_packet_batches, + poh_recorder, + socket, + true, + data_budget, + slot_metrics_tracker, + ) + }, + (), + "forward_and_hold", ); + slot_metrics_tracker.increment_forward_and_hold_us(forward_and_hold_time.as_us()); } _ => (), } @@ -872,21 +974,30 @@ impl BankingStage { loop { let my_pubkey = cluster_info.id(); while !buffered_packet_batches.is_empty() { - let decision = Self::process_buffered_packets( - &my_pubkey, - &socket, - poh_recorder, - cluster_info, - &mut buffered_packet_batches, - &forward_option, - transaction_status_sender.clone(), - &gossip_vote_sender, - &banking_stage_stats, - &recorder, - data_budget, - &qos_service, - &mut slot_metrics_tracker, + let (decision, process_buffered_packets_time) = Measure::this( + |_| { + Self::process_buffered_packets( + &my_pubkey, + &socket, + poh_recorder, + cluster_info, + &mut buffered_packet_batches, + &forward_option, + transaction_status_sender.clone(), + &gossip_vote_sender, + &banking_stage_stats, + &recorder, + data_budget, + &qos_service, + &mut slot_metrics_tracker, + ) + }, + (), + "process_buffered_packets", ); + slot_metrics_tracker + .increment_process_buffered_packets_us(process_buffered_packets_time.as_us()); + if matches!(decision, BufferedPacketsDecision::Hold) || matches!(decision, BufferedPacketsDecision::ForwardAndHold) { @@ -896,11 +1007,20 @@ impl BankingStage { } } - let current_poh_bank = { - let poh = poh_recorder.lock().unwrap(); - poh.bank_start() - }; - slot_metrics_tracker.update_on_leader_slot_boundary(¤t_poh_bank); + let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this( + |_| { + let current_poh_bank = { + let poh = poh_recorder.lock().unwrap(); + poh.bank_start() + }; + slot_metrics_tracker.update_on_leader_slot_boundary(¤t_poh_bank); + }, + (), + "slot_metrics_checker_check_slot_boundary", + ); + slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us( + slot_metrics_checker_check_slot_boundary_time.as_us(), + ); let recv_timeout = if !buffered_packet_batches.is_empty() { // If packets are buffered, let's wait for less time on recv from the channel. 
@@ -912,20 +1032,29 @@ impl BankingStage { Duration::from_millis(100) }; - match Self::receive_and_buffer_packets( - verified_receiver, - recv_start, - recv_timeout, - id, - batch_limit, - &mut buffered_packet_batches, - &mut banking_stage_stats, - &mut slot_metrics_tracker, - ) { + let (res, receive_and_buffer_packets_time) = Measure::this( + |_| { + Self::receive_and_buffer_packets( + verified_receiver, + recv_start, + recv_timeout, + id, + batch_limit, + &mut buffered_packet_batches, + &mut banking_stage_stats, + &mut slot_metrics_tracker, + ) + }, + (), + "receive_and_buffer_packets", + ); + slot_metrics_tracker + .increment_receive_and_buffer_packets_us(receive_and_buffer_packets_time.as_us()); + + match res { Ok(()) | Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => break, } - banking_stage_stats.report(1000); } } @@ -945,23 +1074,32 @@ impl BankingStage { txs: &[SanitizedTransaction], execution_results: &[TransactionExecutionResult], recorder: &TransactionRecorder, - ) -> (Result, Vec) { - let mut processed_generation = Measure::start("record::process_generation"); - let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) = - execution_results - .iter() - .zip(txs) - .enumerate() - .filter_map(|(i, (execution_result, tx))| { - if execution_result.was_executed() { - Some((tx.to_versioned_transaction(), i)) - } else { - None - } - }) - .unzip(); + ) -> RecordTransactionsSummary { + let mut record_transactions_timings = RecordTransactionsTimings::default(); + let ( + (processed_transactions, processed_transactions_indexes), + execution_results_to_transactions_time, + ): ((Vec<_>, Vec<_>), Measure) = Measure::this( + |_| { + execution_results + .iter() + .zip(txs) + .enumerate() + .filter_map(|(i, (execution_result, tx))| { + if execution_result.was_executed() { + Some((tx.to_versioned_transaction(), i)) + } else { + None + } + }) + .unzip() + }, + (), + " execution_results_to_transactions", + ); + record_transactions_timings.execution_results_to_transactions_us = + execution_results_to_transactions_time.as_us(); - processed_generation.stop(); let num_to_commit = processed_transactions.len(); debug!("num_to_commit: {} ", num_to_commit); // unlock all the accounts with errors which are filtered by the above `filter_map` @@ -969,13 +1107,21 @@ impl BankingStage { inc_new_counter_info!("banking_stage-record_count", 1); inc_new_counter_info!("banking_stage-record_transactions", num_to_commit); - let mut hash_time = Measure::start("record::hash"); - let hash = hash_transactions(&processed_transactions[..]); - hash_time.stop(); + let (hash, hash_time) = Measure::this( + |_| hash_transactions(&processed_transactions[..]), + (), + "hash", + ); + record_transactions_timings.hash_us = hash_time.as_us(); - let mut poh_record = Measure::start("record::poh_record"); // record and unlock will unlock all the successful transactions - let res = recorder.record(bank_slot, hash, processed_transactions); + let (res, poh_record_time) = Measure::this( + |_| recorder.record(bank_slot, hash, processed_transactions), + (), + "hash", + ); + record_transactions_timings.poh_record_us = poh_record_time.as_us(); + match res { Ok(()) => (), Err(PohRecorderError::MaxHeightReached) => { @@ -986,16 +1132,21 @@ impl BankingStage { ); // If record errors, add all the committable transactions (the ones // we just attempted to record) as retryable - return ( - Err(PohRecorderError::MaxHeightReached), - processed_transactions_indexes, - ); + return RecordTransactionsSummary 
{ + record_transactions_timings, + result: Err(PohRecorderError::MaxHeightReached), + retryable_indexes: processed_transactions_indexes, + }; } Err(e) => panic!("Poh recorder returned unexpected error: {:?}", e), } - poh_record.stop(); } - (Ok(num_to_commit), vec![]) + + RecordTransactionsSummary { + record_transactions_timings, + result: (Ok(num_to_commit)), + retryable_indexes: vec![], + } } fn execute_and_commit_transactions_locked( @@ -1005,34 +1156,48 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, ) -> ExecuteAndCommitTransactionsOutput { - let mut load_execute_time = Measure::start("load_execute_time"); - // Use a shorter maximum age when adding transactions into the pipeline. This will reduce - // the likelihood of any single thread getting starved and processing old ids. - // TODO: Banking stage threads should be prioritized to complete faster then this queue - // expires. - let pre_balances = if transaction_status_sender.is_some() { - bank.collect_balances(batch) - } else { - vec![] - }; - + let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default(); let mut mint_decimals: HashMap = HashMap::new(); - let pre_token_balances = if transaction_status_sender.is_some() { - collect_token_balances(bank, batch, &mut mint_decimals) - } else { - vec![] - }; + let ((pre_balances, pre_token_balances), collect_balances_time) = Measure::this( + |_| { + // Use a shorter maximum age when adding transactions into the pipeline. This will reduce + // the likelihood of any single thread getting starved and processing old ids. + // TODO: Banking stage threads should be prioritized to complete faster then this queue + // expires. + let pre_balances = if transaction_status_sender.is_some() { + bank.collect_balances(batch) + } else { + vec![] + }; - let mut execute_timings = ExecuteTimings::default(); - let load_and_execute_transactions_output = bank.load_and_execute_transactions( - batch, - MAX_PROCESSING_AGE, - transaction_status_sender.is_some(), - transaction_status_sender.is_some(), - &mut execute_timings, + let pre_token_balances = if transaction_status_sender.is_some() { + collect_token_balances(bank, batch, &mut mint_decimals) + } else { + vec![] + }; + + (pre_balances, pre_token_balances) + }, + (), + "collect_balances", ); - load_execute_time.stop(); + execute_and_commit_timings.collect_balances_us = collect_balances_time.as_us(); + + let (load_and_execute_transactions_output, load_execute_time) = Measure::this( + |_| { + bank.load_and_execute_transactions( + batch, + MAX_PROCESSING_AGE, + transaction_status_sender.is_some(), + transaction_status_sender.is_some(), + &mut execute_and_commit_timings.execute_timings, + ) + }, + (), + "load_execute", + ); + execute_and_commit_timings.load_execute_us = load_execute_time.as_us(); let LoadAndExecuteTransactionsOutput { mut loaded_transactions, @@ -1044,16 +1209,31 @@ impl BankingStage { .. 
} = load_and_execute_transactions_output; - let freeze_lock = bank.freeze_lock(); + let (freeze_lock, freeze_lock_time) = + Measure::this(|_| bank.freeze_lock(), (), "freeze_lock"); + execute_and_commit_timings.freeze_lock_us = freeze_lock_time.as_us(); + + let (record_transactions_summary, record_time) = Measure::this( + |_| { + Self::record_transactions( + bank.slot(), + batch.sanitized_transactions(), + &execution_results, + poh, + ) + }, + (), + "record_transactions", + ); + execute_and_commit_timings.record_us = record_time.as_us(); + + let RecordTransactionsSummary { + result: commit_transactions_result, + retryable_indexes: retryable_record_transaction_indexes, + record_transactions_timings, + } = record_transactions_summary; + execute_and_commit_timings.record_transactions_timings = record_transactions_timings; - let mut record_time = Measure::start("record_time"); - let (commit_transactions_result, retryable_record_transaction_indexes) = - Self::record_transactions( - bank.slot(), - batch.sanitized_transactions(), - &execution_results, - poh, - ); inc_new_counter_info!( "banking_stage-record_transactions_num_to_commit", *commit_transactions_result.as_ref().unwrap_or(&0) @@ -1071,45 +1251,69 @@ impl BankingStage { executed_with_successful_result_count, retryable_transaction_indexes, commit_transactions_result: Err(e), - execute_timings, + execute_and_commit_timings, }; } - record_time.stop(); - let mut commit_time = Measure::start("commit_time"); let sanitized_txs = batch.sanitized_transactions(); let committed_transaction_count = commit_transactions_result.unwrap(); // Note: `committed_transaction_count` should equal `executed_transactions_count`, since // every executed transaction should have been recorded into the Poh stream if the record // was successful (there's no partial records). 
- if committed_transaction_count != 0 { - let tx_results = bank.commit_transactions( - sanitized_txs, - &mut loaded_transactions, - execution_results, - executed_transactions_count as u64, - executed_transactions_count.saturating_sub(executed_with_successful_result_count) - as u64, - signature_count, - &mut execute_timings, + let commit_time_us = if committed_transaction_count != 0 { + let (tx_results, commit_time) = Measure::this( + |_| { + bank.commit_transactions( + sanitized_txs, + &mut loaded_transactions, + execution_results, + executed_transactions_count as u64, + executed_transactions_count + .saturating_sub(executed_with_successful_result_count) + as u64, + signature_count, + &mut execute_and_commit_timings.execute_timings, + ) + }, + (), + "commit", ); + let commit_time_us = commit_time.as_us(); + execute_and_commit_timings.commit_us = commit_time_us; - bank_utils::find_and_send_votes(sanitized_txs, &tx_results, Some(gossip_vote_sender)); - if let Some(transaction_status_sender) = transaction_status_sender { - let txs = batch.sanitized_transactions().to_vec(); - let post_balances = bank.collect_balances(batch); - let post_token_balances = collect_token_balances(bank, batch, &mut mint_decimals); - transaction_status_sender.send_transaction_status_batch( - bank.clone(), - txs, - tx_results.execution_results, - TransactionBalancesSet::new(pre_balances, post_balances), - TransactionTokenBalancesSet::new(pre_token_balances, post_token_balances), - tx_results.rent_debits, - ); - } - } - commit_time.stop(); + let (_, find_and_send_votes_time) = Measure::this( + |_| { + bank_utils::find_and_send_votes( + sanitized_txs, + &tx_results, + Some(gossip_vote_sender), + ); + if let Some(transaction_status_sender) = transaction_status_sender { + let txs = batch.sanitized_transactions().to_vec(); + let post_balances = bank.collect_balances(batch); + let post_token_balances = + collect_token_balances(bank, batch, &mut mint_decimals); + transaction_status_sender.send_transaction_status_batch( + bank.clone(), + txs, + tx_results.execution_results, + TransactionBalancesSet::new(pre_balances, post_balances), + TransactionTokenBalancesSet::new( + pre_token_balances, + post_token_balances, + ), + tx_results.rent_debits, + ); + } + }, + (), + "find_and_send_votes", + ); + execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_time.as_us(); + commit_time_us + } else { + 0 + }; drop(freeze_lock); @@ -1118,13 +1322,13 @@ impl BankingStage { bank.slot(), load_execute_time.as_us(), record_time.as_us(), - commit_time.as_us(), + commit_time_us, sanitized_txs.len(), ); debug!( "execute_and_commit_transactions_locked: {:?}", - execute_timings + execute_and_commit_timings.execute_timings, ); ExecuteAndCommitTransactionsOutput { @@ -1133,7 +1337,7 @@ impl BankingStage { executed_with_successful_result_count, retryable_transaction_indexes, commit_transactions_result: Ok(()), - execute_timings, + execute_and_commit_timings, } } @@ -1146,19 +1350,31 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, qos_service: &QosService, ) -> ProcessTransactionBatchOutput { - let tx_costs = qos_service.compute_transaction_costs(txs.iter()); + let ((transactions_qos_results, cost_model_throttled_transactions_count), cost_model_time) = + Measure::this( + |_| { + let tx_costs = qos_service.compute_transaction_costs(txs.iter()); - let (transactions_qos_results, num_included) = - qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank); + let (transactions_qos_results, num_included) = + 
qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank); - let cost_model_throttled_transactions_count = txs.len().saturating_sub(num_included); + let cost_model_throttled_transactions_count = + txs.len().saturating_sub(num_included); - qos_service.accumulate_estimated_transaction_costs( - &Self::accumulate_batched_transaction_costs( - tx_costs.iter(), - transactions_qos_results.iter(), - ), - ); + qos_service.accumulate_estimated_transaction_costs( + &Self::accumulate_batched_transaction_costs( + tx_costs.iter(), + transactions_qos_results.iter(), + ), + ); + ( + transactions_qos_results, + cost_model_throttled_transactions_count, + ) + }, + (), + "cost_model", + ); // Only lock accounts for those transactions are selected for the block; // Once accounts are locked, other threads cannot encode transactions that will modify the @@ -1171,7 +1387,6 @@ impl BankingStage { // retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit // WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit // and WouldExceedMaxAccountDataCostLimit - let mut execute_and_commit_transactions_output = Self::execute_and_commit_transactions_locked( bank, @@ -1183,7 +1398,7 @@ impl BankingStage { let ExecuteAndCommitTransactionsOutput { ref mut retryable_transaction_indexes, - ref execute_timings, + ref execute_and_commit_timings, .. } = execute_and_commit_transactions_output; @@ -1196,7 +1411,8 @@ impl BankingStage { drop(batch); unlock_time.stop(); - let (cu, us) = Self::accumulate_execute_units_and_time(execute_timings); + let (cu, us) = + Self::accumulate_execute_units_and_time(&execute_and_commit_timings.execute_timings); qos_service.accumulate_actual_execute_cu(cu); qos_service.accumulate_actual_execute_time(us); @@ -1213,6 +1429,7 @@ impl BankingStage { ProcessTransactionBatchOutput { cost_model_throttled_transactions_count, + cost_model_us: cost_model_time.as_us(), execute_and_commit_transactions_output, } } @@ -1286,6 +1503,8 @@ impl BankingStage { // slot ended let mut total_failed_commit_count: usize = 0; let mut total_cost_model_throttled_transactions_count: usize = 0; + let mut total_cost_model_us: u64 = 0; + let mut total_execute_and_commit_timings = LeaderExecuteAndCommitTimings::default(); let mut reached_max_poh_height = false; while chunk_start != transactions.len() { let chunk_end = std::cmp::min( @@ -1304,11 +1523,13 @@ impl BankingStage { let ProcessTransactionBatchOutput { cost_model_throttled_transactions_count: new_cost_model_throttled_transactions_count, + cost_model_us: new_cost_model_us, execute_and_commit_transactions_output, } = process_transaction_batch_output; total_cost_model_throttled_transactions_count = total_cost_model_throttled_transactions_count .saturating_add(new_cost_model_throttled_transactions_count); + total_cost_model_us = total_cost_model_us.saturating_add(new_cost_model_us); let ExecuteAndCommitTransactionsOutput { transactions_attempted_execution_count: new_transactions_attempted_execution_count, @@ -1316,9 +1537,11 @@ impl BankingStage { executed_with_successful_result_count: new_executed_with_successful_result_count, retryable_transaction_indexes: new_retryable_transaction_indexes, commit_transactions_result: new_commit_transactions_result, + execute_and_commit_timings: new_execute_and_commit_timings, .. 
} = execute_and_commit_transactions_output; + total_execute_and_commit_timings.accumulate(&new_execute_and_commit_timings); total_transactions_attempted_execution_count = total_transactions_attempted_execution_count .saturating_add(new_transactions_attempted_execution_count); @@ -1345,7 +1568,6 @@ impl BankingStage { // If `bank_creation_time` is None, it's a test so ignore the option so // allow processing - // TODO adding timing metrics here from when bank was added to now let should_bank_still_be_processing_txs = Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot); match ( @@ -1380,6 +1602,8 @@ impl BankingStage { failed_commit_count: total_failed_commit_count, retryable_transaction_indexes: all_retryable_tx_indexes, cost_model_throttled_transactions_count: total_cost_model_throttled_transactions_count, + cost_model_us: total_cost_model_us, + execute_and_commit_timings: total_execute_and_commit_timings, } } @@ -1505,29 +1729,48 @@ impl BankingStage { qos_service: &QosService, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> ProcessTransactionsSummary { - let mut packet_conversion_time = Measure::start("packet_conversion"); - let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets( - packet_batch, - &packet_indexes, - &bank.feature_set, - bank.vote_only_bank(), - bank.as_ref(), + // Convert packets to transactions + let ((transactions, transaction_to_packet_indexes), packet_conversion_time) = Measure::this( + |_| { + Self::transactions_from_packets( + packet_batch, + &packet_indexes, + &bank.feature_set, + bank.vote_only_bank(), + bank.as_ref(), + ) + }, + (), + "packet_conversion", ); - packet_conversion_time.stop(); + let packet_conversion_us = packet_conversion_time.as_us(); + slot_metrics_tracker.increment_transactions_from_packets_us(packet_conversion_us); + banking_stage_stats + .packet_conversion_elapsed + .fetch_add(packet_conversion_us, Ordering::Relaxed); inc_new_counter_info!("banking_stage-packet_conversion", 1); - let mut process_tx_time = Measure::start("process_tx_time"); - - let mut process_transactions_summary = Self::process_transactions( - bank, - bank_creation_time, - &transactions, - poh, - transaction_status_sender, - gossip_vote_sender, - qos_service, + // Process transactions + let (mut process_transactions_summary, process_transactions_time) = Measure::this( + |_| { + Self::process_transactions( + bank, + bank_creation_time, + &transactions, + poh, + transaction_status_sender, + gossip_vote_sender, + qos_service, + ) + }, + (), + "process_transaction_time", ); - process_tx_time.stop(); + let process_transactions_us = process_transactions_time.as_us(); + slot_metrics_tracker.increment_process_transactions_us(process_transactions_us); + banking_stage_stats + .transaction_processing_elapsed + .fetch_add(process_transactions_us, Ordering::Relaxed); let ProcessTransactionsSummary { ref retryable_transaction_indexes, @@ -1539,18 +1782,29 @@ impl BankingStage { let retryable_tx_count = retryable_transaction_indexes.len(); inc_new_counter_info!("banking_stage-unprocessed_transactions", retryable_tx_count); - let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time"); - let filtered_retryable_tx_indexes = Self::filter_pending_packets_from_pending_txs( - bank, - &transactions, - &transaction_to_packet_indexes, - retryable_transaction_indexes, + // Filter out transactions that can't be retried + let (filtered_retryable_transaction_indexes, filter_retryable_packets_time) = 
Measure::this( + |_| { + Self::filter_pending_packets_from_pending_txs( + bank, + &transactions, + &transaction_to_packet_indexes, + retryable_transaction_indexes, + ) + }, + (), + "filter_pending_packets_time", ); - filter_pending_packets_time.stop(); + let filter_retryable_packets_us = filter_retryable_packets_time.as_us(); + slot_metrics_tracker + .increment_filter_retryable_packets_us(filter_retryable_packets_us as u64); + banking_stage_stats + .filter_pending_packets_elapsed + .fetch_add(filter_retryable_packets_us, Ordering::Relaxed); let retryable_packets_filtered_count = retryable_transaction_indexes .len() - .saturating_sub(filtered_retryable_tx_indexes.len()); + .saturating_sub(filtered_retryable_transaction_indexes.len()); slot_metrics_tracker .increment_retryable_packets_filtered_count(retryable_packets_filtered_count as u64); @@ -1558,21 +1812,11 @@ impl BankingStage { "banking_stage-dropped_tx_before_forwarding", retryable_transaction_indexes .len() - .saturating_sub(filtered_retryable_tx_indexes.len()) + .saturating_sub(filtered_retryable_transaction_indexes.len()) ); - // Increment timing-based metrics - banking_stage_stats - .packet_conversion_elapsed - .fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed); - banking_stage_stats - .transaction_processing_elapsed - .fetch_add(process_tx_time.as_us(), Ordering::Relaxed); - banking_stage_stats - .filter_pending_packets_elapsed - .fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed); - - process_transactions_summary.retryable_transaction_indexes = filtered_retryable_tx_indexes; + process_transactions_summary.retryable_transaction_indexes = + filtered_retryable_transaction_indexes; process_transactions_summary } @@ -2276,19 +2520,25 @@ mod tests { 1, SystemError::ResultWithNegativeLamports.into(), ))); - let (res, retryable) = - BankingStage::record_transactions(bank.slot(), &txs, &results, &recorder); - res.unwrap(); - assert!(retryable.is_empty()); + let RecordTransactionsSummary { + result, + retryable_indexes, + .. + } = BankingStage::record_transactions(bank.slot(), &txs, &results, &recorder); + result.unwrap(); + assert!(retryable_indexes.is_empty()); let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); assert_eq!(entry.transactions.len(), txs.len()); // Other TransactionErrors should not be recorded results[0] = TransactionExecutionResult::NotExecuted(TransactionError::AccountNotFound); - let (res, retryable) = - BankingStage::record_transactions(bank.slot(), &txs, &results, &recorder); - res.unwrap(); - assert!(retryable.is_empty()); + let RecordTransactionsSummary { + result, + retryable_indexes, + .. + } = BankingStage::record_transactions(bank.slot(), &txs, &results, &recorder); + result.unwrap(); + assert!(retryable_indexes.is_empty()); let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap(); assert_eq!(entry.transactions.len(), txs.len() - 1); @@ -2296,12 +2546,15 @@ mod tests { // record_transactions should throw MaxHeightReached and return the set of retryable // txs let next_slot = bank.slot() + 1; - let (res, retryable) = - BankingStage::record_transactions(next_slot, &txs, &results, &recorder); - assert_matches!(res, Err(PohRecorderError::MaxHeightReached)); + let RecordTransactionsSummary { + result, + retryable_indexes, + .. + } = BankingStage::record_transactions(next_slot, &txs, &results, &recorder); + assert_matches!(result, Err(PohRecorderError::MaxHeightReached)); // The first result was an error so it's filtered out. 
The second result was Ok(), // so it should be marked as retryable - assert_eq!(retryable, vec![1]); + assert_eq!(retryable_indexes, vec![1]); // Should receive nothing from PohRecorder b/c record failed assert!(entry_receiver.try_recv().is_err()); diff --git a/core/src/leader_slot_banking_stage_metrics.rs b/core/src/leader_slot_banking_stage_metrics.rs index 1c4dfa0814..b54d79c61f 100644 --- a/core/src/leader_slot_banking_stage_metrics.rs +++ b/core/src/leader_slot_banking_stage_metrics.rs @@ -1,4 +1,5 @@ use { + crate::leader_slot_banking_stage_timing_metrics::*, solana_poh::poh_recorder::BankStart, solana_sdk::{clock::Slot, saturating_add_assign}, std::time::Instant, @@ -38,41 +39,12 @@ pub(crate) struct ProcessTransactionsSummary { // The number of transactions filtered out by the cost model pub cost_model_throttled_transactions_count: usize, -} -// Metrics capturing wallclock time spent in various parts of BankingStage during this -// validator's leader slot -#[derive(Debug)] -struct LeaderSlotTimingMetrics { - bank_detected_time: Instant, + // Total amount of time spent running the cost model + pub cost_model_us: u64, - // Delay from when the bank was created to when this thread detected it - bank_detected_delay_us: u64, -} - -impl LeaderSlotTimingMetrics { - fn new(bank_creation_time: &Instant) -> Self { - Self { - bank_detected_time: Instant::now(), - bank_detected_delay_us: bank_creation_time.elapsed().as_micros() as u64, - } - } - - fn report(&self, id: u32, slot: Slot) { - let bank_detected_to_now = self.bank_detected_time.elapsed().as_micros() as u64; - datapoint_info!( - "banking_stage-leader_slot_loop_timings", - ("id", id as i64, i64), - ("slot", slot as i64, i64), - ("bank_detected_to_now_us", bank_detected_to_now, i64), - ( - "bank_creation_to_now_us", - bank_detected_to_now + self.bank_detected_delay_us, - i64 - ), - ("bank_detected_delay_us", self.bank_detected_delay_us, i64), - ); - } + // Breakdown of time spent executing and comitting transactions + pub execute_and_commit_timings: LeaderExecuteAndCommitTimings, } // Metrics describing packets ingested/processed in various parts of BankingStage during this @@ -362,6 +334,8 @@ impl LeaderSlotMetricsTracker { failed_commit_count, ref retryable_transaction_indexes, cost_model_throttled_transactions_count, + cost_model_us, + ref execute_and_commit_timings, .. 
} = process_transactions_summary; @@ -415,9 +389,23 @@ impl LeaderSlotMetricsTracker { .cost_model_throttled_transactions_count, *cost_model_throttled_transactions_count as u64 ); + + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .process_packets_timings + .cost_model_us, + *cost_model_us as u64 + ); + + leader_slot_metrics + .timing_metrics + .execute_and_commit_timings + .accumulate(execute_and_commit_timings); } } + // Packet inflow/outflow/processing metrics pub(crate) fn increment_total_new_valid_packets(&mut self, count: u64) { if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { saturating_add_assign!( @@ -527,6 +515,166 @@ impl LeaderSlotMetricsTracker { ); } } + + // Outermost banking thread's loop timing metrics + pub(crate) fn increment_process_buffered_packets_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .outer_loop_timings + .process_buffered_packets_us, + us + ); + } + } + + pub(crate) fn increment_slot_metrics_check_slot_boundary_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .outer_loop_timings + .slot_metrics_check_slot_boundary_us, + us + ); + } + } + + pub(crate) fn increment_receive_and_buffer_packets_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .outer_loop_timings + .receive_and_buffer_packets_us, + us + ); + } + } + + // Processing buffer timing metrics + pub(crate) fn increment_make_decision_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .process_buffered_packets_timings + .make_decision_us, + us + ); + } + } + + pub(crate) fn increment_consume_buffered_packets_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .process_buffered_packets_timings + .consume_buffered_packets_us, + us + ); + } + } + + pub(crate) fn increment_forward_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .process_buffered_packets_timings + .forward_us, + us + ); + } + } + + pub(crate) fn increment_forward_and_hold_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .process_buffered_packets_timings + .forward_and_hold_us, + us + ); + } + } + + // Consuming buffered packets timing metrics + pub(crate) fn increment_end_of_slot_filtering_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .consume_buffered_packets_timings + .end_of_slot_filtering_us, + us + ); + } + } + + pub(crate) fn increment_consume_buffered_packets_poh_recorder_lock_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .consume_buffered_packets_timings + .poh_recorder_lock_us, + us + ); + } + } + + pub(crate) fn increment_process_packets_transactions_us(&mut self, us: u64) { + if let 
Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .consume_buffered_packets_timings + .process_packets_transactions_us, + us + ); + } + } + + // Processing packets timing metrics + pub(crate) fn increment_transactions_from_packets_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .process_packets_timings + .transactions_from_packets_us, + us + ); + } + } + + pub(crate) fn increment_process_transactions_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .process_packets_timings + .process_transactions_us, + us + ); + } + } + + pub(crate) fn increment_filter_retryable_packets_us(&mut self, us: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .process_packets_timings + .filter_retryable_packets_us, + us + ); + } + } } #[cfg(test)] diff --git a/core/src/leader_slot_banking_stage_timing_metrics.rs b/core/src/leader_slot_banking_stage_timing_metrics.rs new file mode 100644 index 0000000000..b2da74977c --- /dev/null +++ b/core/src/leader_slot_banking_stage_timing_metrics.rs @@ -0,0 +1,286 @@ +use { + solana_program_runtime::timings::ExecuteTimings, + solana_sdk::{clock::Slot, saturating_add_assign}, + std::time::Instant, +}; + +#[derive(Default, Debug)] +pub struct LeaderExecuteAndCommitTimings { + pub collect_balances_us: u64, + pub load_execute_us: u64, + pub freeze_lock_us: u64, + pub record_us: u64, + pub commit_us: u64, + pub find_and_send_votes_us: u64, + pub record_transactions_timings: RecordTransactionsTimings, + pub execute_timings: ExecuteTimings, +} + +impl LeaderExecuteAndCommitTimings { + pub fn accumulate(&mut self, other: &LeaderExecuteAndCommitTimings) { + saturating_add_assign!(self.collect_balances_us, other.collect_balances_us); + saturating_add_assign!(self.load_execute_us, other.load_execute_us); + saturating_add_assign!(self.freeze_lock_us, other.freeze_lock_us); + saturating_add_assign!(self.record_us, other.record_us); + saturating_add_assign!(self.commit_us, other.commit_us); + saturating_add_assign!(self.find_and_send_votes_us, other.find_and_send_votes_us); + saturating_add_assign!(self.commit_us, other.commit_us); + self.record_transactions_timings + .accumulate(&other.record_transactions_timings); + self.execute_timings.accumulate(&other.execute_timings); + } + + pub fn report(&self, id: u32, slot: Slot) { + datapoint_info!( + "banking_stage-leader_slot_execute_and_commit_timings", + ("id", id as i64, i64), + ("slot", slot as i64, i64), + ("collect_balances_us", self.collect_balances_us as i64, i64), + ("load_execute_us", self.load_execute_us as i64, i64), + ("freeze_lock_us", self.freeze_lock_us as i64, i64), + ("record_us", self.record_us as i64, i64), + ("commit_us", self.commit_us as i64, i64), + ( + "find_and_send_votes_us", + self.find_and_send_votes_us as i64, + i64 + ), + ); + + datapoint_info!( + "banking_stage-leader_slot_record_timings", + ("id", id as i64, i64), + ("slot", slot as i64, i64), + ( + "execution_results_to_transactions_us", + self.record_transactions_timings + .execution_results_to_transactions_us as i64, + i64 + ), + ( + "hash_us", + self.record_transactions_timings.hash_us as i64, + i64 + ), + ( + "poh_record_us", + 
self.record_transactions_timings.poh_record_us as i64, + i64 + ), + ); + } +} + +#[derive(Default, Debug)] +pub struct RecordTransactionsTimings { + pub execution_results_to_transactions_us: u64, + pub hash_us: u64, + pub poh_record_us: u64, +} + +impl RecordTransactionsTimings { + pub fn accumulate(&mut self, other: &RecordTransactionsTimings) { + saturating_add_assign!( + self.execution_results_to_transactions_us, + other.execution_results_to_transactions_us + ); + saturating_add_assign!(self.hash_us, other.hash_us); + saturating_add_assign!(self.poh_record_us, other.poh_record_us); + } +} + +// Metrics capturing wallclock time spent in various parts of BankingStage during this +// validator's leader slot +#[derive(Debug)] +pub(crate) struct LeaderSlotTimingMetrics { + pub outer_loop_timings: OuterLoopTimings, + pub process_buffered_packets_timings: ProcessBufferedPacketsTimings, + pub consume_buffered_packets_timings: ConsumeBufferedPacketsTimings, + pub process_packets_timings: ProcessPacketsTimings, + pub execute_and_commit_timings: LeaderExecuteAndCommitTimings, +} + +impl LeaderSlotTimingMetrics { + pub(crate) fn new(bank_creation_time: &Instant) -> Self { + Self { + outer_loop_timings: OuterLoopTimings::new(bank_creation_time), + process_buffered_packets_timings: ProcessBufferedPacketsTimings::default(), + consume_buffered_packets_timings: ConsumeBufferedPacketsTimings::default(), + process_packets_timings: ProcessPacketsTimings::default(), + execute_and_commit_timings: LeaderExecuteAndCommitTimings::default(), + } + } + + pub(crate) fn report(&self, id: u32, slot: Slot) { + self.outer_loop_timings.report(id, slot); + self.process_buffered_packets_timings.report(id, slot); + self.consume_buffered_packets_timings.report(id, slot); + self.process_packets_timings.report(id, slot); + self.execute_and_commit_timings.report(id, slot); + } +} + +#[derive(Debug)] +pub(crate) struct OuterLoopTimings { + pub bank_detected_time: Instant, + + // Delay from when the bank was created to when this thread detected it + pub bank_detected_delay_us: u64, + + // Time spent processing buffered packets + pub process_buffered_packets_us: u64, + + // Time spent checking for slot boundary and reporting leader slot metrics + pub slot_metrics_check_slot_boundary_us: u64, + + // Time spent processing new incoming packets to the banking thread + pub receive_and_buffer_packets_us: u64, +} + +impl OuterLoopTimings { + fn new(bank_creation_time: &Instant) -> Self { + Self { + bank_detected_time: Instant::now(), + bank_detected_delay_us: bank_creation_time.elapsed().as_micros() as u64, + process_buffered_packets_us: 0, + slot_metrics_check_slot_boundary_us: 0, + receive_and_buffer_packets_us: 0, + } + } + + fn report(&self, id: u32, slot: Slot) { + let bank_detected_to_now_us = self.bank_detected_time.elapsed().as_micros() as u64; + datapoint_info!( + "banking_stage-leader_slot_loop_timings", + ("id", id as i64, i64), + ("slot", slot as i64, i64), + ( + "bank_detected_to_slot_end_detected_us", + bank_detected_to_now_us, + i64 + ), + ( + "bank_creation_to_slot_end_detected_us", + bank_detected_to_now_us + self.bank_detected_delay_us, + i64 + ), + ("bank_detected_delay_us", self.bank_detected_delay_us, i64), + ( + "process_buffered_packets_us", + self.process_buffered_packets_us, + i64 + ), + ( + "slot_metrics_check_slot_boundary_us", + self.slot_metrics_check_slot_boundary_us, + i64 + ), + ( + "receive_and_buffer_packets_us", + self.receive_and_buffer_packets_us, + i64 + ), + ); + } +} + +#[derive(Debug, Default)] 
+pub(crate) struct ProcessBufferedPacketsTimings { + pub make_decision_us: u64, + pub consume_buffered_packets_us: u64, + pub forward_us: u64, + pub forward_and_hold_us: u64, +} +impl ProcessBufferedPacketsTimings { + fn report(&self, id: u32, slot: Slot) { + datapoint_info!( + "banking_stage-leader_slot_process_buffered_packets_timings", + ("id", id as i64, i64), + ("slot", slot as i64, i64), + ("make_decision_us", self.make_decision_us as i64, i64), + ( + "consume_buffered_packets_us", + self.consume_buffered_packets_us as i64, + i64 + ), + ("forward_us", self.forward_us as i64, i64), + ("forward_and_hold_us", self.forward_and_hold_us as i64, i64), + ); + } +} + +#[derive(Debug, Default)] +pub(crate) struct ConsumeBufferedPacketsTimings { + // Time spent grabbing poh recorder lock + pub poh_recorder_lock_us: u64, + + // Time spent filtering invalid packets after leader slot has ended + pub end_of_slot_filtering_us: u64, + + // Time spent processing transactions + pub process_packets_transactions_us: u64, +} + +impl ConsumeBufferedPacketsTimings { + fn report(&self, id: u32, slot: Slot) { + datapoint_info!( + "banking_stage-leader_slot_consume_buffered_packets_timings", + ("id", id as i64, i64), + ("slot", slot as i64, i64), + ( + "poh_recorder_lock_us", + self.poh_recorder_lock_us as i64, + i64 + ), + ( + "end_of_slot_filtering_us", + self.end_of_slot_filtering_us as i64, + i64 + ), + ( + "process_packets_transactions_us", + self.process_packets_transactions_us as i64, + i64 + ), + ); + } +} + +#[derive(Debug, Default)] +pub(crate) struct ProcessPacketsTimings { + // Time spent converting packets to transactions + pub transactions_from_packets_us: u64, + + // Time spent processing transactions + pub process_transactions_us: u64, + + // Time spent filtering retryable packets that were returned after transaction + // processing + pub filter_retryable_packets_us: u64, + + // Time spent running the cost model in processing transactions before executing + // transactions + pub cost_model_us: u64, +} + +impl ProcessPacketsTimings { + fn report(&self, id: u32, slot: Slot) { + datapoint_info!( + "banking_stage-leader_slot_process_packets_timings", + ("id", id as i64, i64), + ("slot", slot as i64, i64), + ( + "transactions_from_packets_us", + self.transactions_from_packets_us, + i64 + ), + ("process_transactions_us", self.process_transactions_us, i64), + ( + "filter_retryable_packets_us", + self.filter_retryable_packets_us, + i64 + ), + ("cost_model_us", self.cost_model_us, i64), + ); + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 641f239694..24383ccd95 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -29,6 +29,7 @@ pub mod gen_keys; pub mod heaviest_subtree_fork_choice; pub mod latest_validator_votes_for_frozen_banks; pub mod leader_slot_banking_stage_metrics; +pub mod leader_slot_banking_stage_timing_metrics; pub mod ledger_cleanup_service; pub mod optimistic_confirmation_verifier; pub mod outstanding_requests; diff --git a/program-runtime/src/timings.rs b/program-runtime/src/timings.rs index 40e6dc9753..1ca577f287 100644 --- a/program-runtime/src/timings.rs +++ b/program-runtime/src/timings.rs @@ -129,6 +129,7 @@ pub struct ExecuteDetailsTimings { pub create_executor_jit_compile_us: u64, pub per_program_timings: HashMap, } + impl ExecuteDetailsTimings { pub fn accumulate(&mut self, other: &ExecuteDetailsTimings) { saturating_add_assign!(self.serialize_us, other.serialize_us);
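
The change above applies one pattern throughout banking_stage.rs: wrap a unit of work in Measure::this to get back both its result and the elapsed wall-clock time, then fold the elapsed microseconds into a per-slot timing struct whose accumulate method uses saturating adds. The following is a minimal, self-contained sketch of that pattern, assuming nothing beyond the standard library; the time_this helper and ProcessPacketsTimingsSketch struct are illustrative stand-ins, not the solana_measure API or the banking-stage types added by this patch.

use std::time::Instant;

/// Run `work`, returning its result together with the elapsed wall-clock microseconds.
/// (Simplified stand-in for the `Measure::this(|_| ..., (), "label")` calls in the patch.)
fn time_this<R>(work: impl FnOnce() -> R) -> (R, u64) {
    let start = Instant::now();
    let result = work();
    (result, start.elapsed().as_micros() as u64)
}

/// Stand-in for a per-slot timing struct such as `ProcessPacketsTimings`.
#[derive(Default, Debug)]
struct ProcessPacketsTimingsSketch {
    transactions_from_packets_us: u64,
    process_transactions_us: u64,
}

impl ProcessPacketsTimingsSketch {
    /// Fold another set of timings into this one, saturating instead of wrapping,
    /// mirroring the `saturating_add_assign!`-based `accumulate` methods in the patch.
    fn accumulate(&mut self, other: &Self) {
        self.transactions_from_packets_us = self
            .transactions_from_packets_us
            .saturating_add(other.transactions_from_packets_us);
        self.process_transactions_us = self
            .process_transactions_us
            .saturating_add(other.process_transactions_us);
    }
}

fn main() {
    let mut slot_timings = ProcessPacketsTimingsSketch::default();

    // Time one batch worth of (stubbed) work, as the patch does around
    // transactions_from_packets and process_transactions.
    let (_converted, conversion_us) = time_this(|| {
        // ... convert packets to transactions ...
    });
    let mut batch_timings = ProcessPacketsTimingsSketch {
        transactions_from_packets_us: conversion_us,
        ..Default::default()
    };

    let (_summary, process_us) = time_this(|| {
        // ... execute and commit the transactions ...
    });
    batch_timings.process_transactions_us = process_us;

    // Roll the per-batch timings into the per-slot totals, which are later
    // reported once per leader slot (datapoint_info! in the real code).
    slot_timings.accumulate(&batch_timings);
    println!("{:?}", slot_timings);
}

Saturating addition is used here, as in the patch's accumulate implementations, so that an unexpectedly large elapsed value cannot wrap the accumulated per-slot totals before they are reported.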