Add execute timings (#23097)

carllin 2022-02-17 01:14:32 -05:00 committed by GitHub
parent fa680a35ea
commit 619335df1a
5 changed files with 1015 additions and 326 deletions
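
Throughout this commit, banking-stage steps are wrapped in `Measure::this(closure, args, label)`, which runs the closure and hands back both its result and the elapsed `Measure`, so the caller can feed `.as_us()` into the new timing metrics. The standalone sketch below illustrates that pattern; `measure_this` here is an illustrative stand-in built on `std::time::Instant`, not the actual `solana_measure` API.

use std::time::Instant;

// Illustrative stand-in for the `Measure::this` pattern used throughout this
// commit: run a closure, return its result together with the elapsed time.
fn measure_this<T, R>(f: impl FnOnce(T) -> R, args: T, _label: &'static str) -> (R, u64) {
    let start = Instant::now();
    let result = f(args);
    let elapsed_us = start.elapsed().as_micros() as u64;
    (result, elapsed_us)
}

fn main() {
    // Time a unit of work and keep its result, mirroring e.g.
    // `let (bank_start, poh_recorder_lock_time) = Measure::this(|_| ..., (), "poh_recorder_lock");`
    let (sum, elapsed_us) = measure_this(|_| (0u64..1_000).sum::<u64>(), (), "sum");
    println!("sum = {}, took {} us", sum, elapsed_us);
}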

View File core/src/banking_stage.rs

@ -4,6 +4,9 @@
use {
crate::{
leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary},
leader_slot_banking_stage_timing_metrics::{
LeaderExecuteAndCommitTimings, RecordTransactionsTimings,
},
qos_service::QosService,
},
crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
@ -91,9 +94,20 @@ const MIN_THREADS_BANKING: u32 = 1;
pub struct ProcessTransactionBatchOutput {
// The number of transactions filtered out by the cost model
cost_model_throttled_transactions_count: usize,
// Amount of time spent running the cost model
cost_model_us: u64,
execute_and_commit_transactions_output: ExecuteAndCommitTransactionsOutput,
}
struct RecordTransactionsSummary {
// Metrics describing how time was spent recording transactions
record_transactions_timings: RecordTransactionsTimings,
// Result of trying to record the transactions into the PoH stream
result: Result<usize, PohRecorderError>,
// Transactions that failed record, and are retryable
retryable_indexes: Vec<usize>,
}
pub struct ExecuteAndCommitTransactionsOutput {
// Total number of transactions that were passed as candidates for execution
transactions_attempted_execution_count: usize,
@ -110,7 +124,7 @@ pub struct ExecuteAndCommitTransactionsOutput {
// committed into the Poh stream. If so, the result tells us
// how many such transactions were committed
commit_transactions_result: Result<(), PohRecorderError>,
execute_timings: ExecuteTimings,
execute_and_commit_timings: LeaderExecuteAndCommitTimings,
}
#[derive(Debug, Default)]
@ -533,6 +547,8 @@ impl BankingStage {
let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) =
buffered_packet_batch_and_offsets;
if let Some(end_of_slot) = &reached_end_of_slot {
let (should_retain, end_of_slot_filtering_time) = Measure::this(
|_| {
// We've hit the end of this slot, no need to perform more processing,
// just filter the remaining packets for the invalid (e.g. too old) ones
// if the working_bank is available
@ -547,7 +563,8 @@ impl BankingStage {
banking_stage_stats,
);
let end_of_slot_filtered_invalid_count = original_unprocessed_indexes
let end_of_slot_filtered_invalid_count =
original_unprocessed_indexes
.len()
.saturating_sub(new_unprocessed_indexes.len());
@ -557,7 +574,10 @@ impl BankingStage {
banking_stage_stats
.end_of_slot_filtered_invalid_count
.fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed);
.fetch_add(
end_of_slot_filtered_invalid_count,
Ordering::Relaxed,
);
Self::update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes,
@ -566,14 +586,32 @@ impl BankingStage {
} else {
true
}
},
(),
"end_of_slot_filtering",
);
slot_metrics_tracker
.increment_end_of_slot_filtering_us(end_of_slot_filtering_time.as_us());
should_retain
} else {
let bank_start = poh_recorder.lock().unwrap().bank_start();
let (bank_start, poh_recorder_lock_time) = Measure::this(
|_| poh_recorder.lock().unwrap().bank_start(),
(),
"poh_recorder_lock",
);
slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us(
poh_recorder_lock_time.as_us(),
);
if let Some(BankStart {
working_bank,
bank_creation_time,
}) = bank_start
{
let process_transactions_summary = Self::process_packets_transactions(
let (process_transactions_summary, process_packets_transactions_time) =
Measure::this(
|_| {
Self::process_packets_transactions(
&working_bank,
&bank_creation_time,
recorder,
@ -584,7 +622,15 @@ impl BankingStage {
banking_stage_stats,
qos_service,
slot_metrics_tracker,
)
},
(),
"process_packets_transactions",
);
slot_metrics_tracker.increment_process_packets_transactions_us(
process_packets_transactions_time.as_us(),
);
let ProcessTransactionsSummary {
reached_max_poh_height,
retryable_transaction_indexes,
@ -592,16 +638,29 @@ impl BankingStage {
} = process_transactions_summary;
if reached_max_poh_height
// TODO adding timing metrics here from when bank was added to now
|| !Bank::should_bank_still_be_processing_txs(
&bank_creation_time,
max_tx_ingestion_ns,
)
{
let poh_recorder_lock_time = {
let (poh_recorder_locked, poh_recorder_lock_time) = Measure::this(
|_| poh_recorder.lock().unwrap(),
(),
"poh_recorder_lock",
);
reached_end_of_slot = Some(EndOfSlot {
next_slot_leader: poh_recorder.lock().unwrap().next_slot_leader(),
next_slot_leader: poh_recorder_locked.next_slot_leader(),
working_bank: Some(working_bank),
});
poh_recorder_lock_time
};
slot_metrics_tracker
.increment_consume_buffered_packets_poh_recorder_lock_us(
poh_recorder_lock_time.as_us(),
);
}
// The difference between all transactions passed to execution and the ones that
@ -630,10 +689,23 @@ impl BankingStage {
} else {
// mark as end-of-slot to avoid aggressively locking poh for the remaining
// packet batches in the buffer
let poh_recorder_lock_time = {
let (poh_recorder_locked, poh_recorder_lock_time) = Measure::this(
|_| poh_recorder.lock().unwrap(),
(),
"poh_recorder_lock",
);
reached_end_of_slot = Some(EndOfSlot {
next_slot_leader: poh_recorder.lock().unwrap().next_slot_leader(),
next_slot_leader: poh_recorder_locked.next_slot_leader(),
working_bank: None,
});
poh_recorder_lock_time
};
slot_metrics_tracker
.increment_consume_buffered_packets_poh_recorder_lock_us(
poh_recorder_lock_time.as_us(),
);
// `original_unprocessed_indexes` must have remaining packets to process
// if not yet processed.
@ -717,6 +789,8 @@ impl BankingStage {
qos_service: &QosService,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> BufferedPacketsDecision {
let (decision, make_decision_time) = Measure::this(
|_| {
let bank_start;
let (
leader_at_slot_offset,
@ -731,21 +805,29 @@ impl BankingStage {
PohRecorder::get_working_bank_if_not_expired(&bank_start.as_ref()),
poh.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT),
poh.would_be_leader(
(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT,
(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1)
* DEFAULT_TICKS_PER_SLOT,
),
)
};
let decision = Self::consume_or_forward_packets(
Self::consume_or_forward_packets(
my_pubkey,
leader_at_slot_offset,
bank_still_processing_txs,
would_be_leader,
would_be_leader_shortly,
)
},
(),
"make_decision",
);
slot_metrics_tracker.increment_make_decision_us(make_decision_time.as_us());
match decision {
BufferedPacketsDecision::Consume(max_tx_ingestion_ns) => {
let (_, consume_buffered_packets_time) = Measure::this(
|_| {
Self::consume_buffered_packets(
my_pubkey,
max_tx_ingestion_ns,
@ -758,9 +840,17 @@ impl BankingStage {
recorder,
qos_service,
slot_metrics_tracker,
)
},
(),
"consume_buffered_packets",
);
slot_metrics_tracker
.increment_consume_buffered_packets_us(consume_buffered_packets_time.as_us());
}
BufferedPacketsDecision::Forward => {
let (_, forward_time) = Measure::this(
|_| {
Self::handle_forwarding(
forward_option,
cluster_info,
@ -770,9 +860,16 @@ impl BankingStage {
false,
data_budget,
slot_metrics_tracker,
)
},
(),
"forward",
);
slot_metrics_tracker.increment_forward_us(forward_time.as_us());
}
BufferedPacketsDecision::ForwardAndHold => {
let (_, forward_and_hold_time) = Measure::this(
|_| {
Self::handle_forwarding(
forward_option,
cluster_info,
@ -782,7 +879,12 @@ impl BankingStage {
true,
data_budget,
slot_metrics_tracker,
)
},
(),
"forward_and_hold",
);
slot_metrics_tracker.increment_forward_and_hold_us(forward_and_hold_time.as_us());
}
_ => (),
}
@ -872,7 +974,9 @@ impl BankingStage {
loop {
let my_pubkey = cluster_info.id();
while !buffered_packet_batches.is_empty() {
let decision = Self::process_buffered_packets(
let (decision, process_buffered_packets_time) = Measure::this(
|_| {
Self::process_buffered_packets(
&my_pubkey,
&socket,
poh_recorder,
@ -886,7 +990,14 @@ impl BankingStage {
data_budget,
&qos_service,
&mut slot_metrics_tracker,
)
},
(),
"process_buffered_packets",
);
slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_time.as_us());
if matches!(decision, BufferedPacketsDecision::Hold)
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
{
@ -896,11 +1007,20 @@ impl BankingStage {
}
}
let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this(
|_| {
let current_poh_bank = {
let poh = poh_recorder.lock().unwrap();
poh.bank_start()
};
slot_metrics_tracker.update_on_leader_slot_boundary(&current_poh_bank);
},
(),
"slot_metrics_checker_check_slot_boundary",
);
slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us(
slot_metrics_checker_check_slot_boundary_time.as_us(),
);
let recv_timeout = if !buffered_packet_batches.is_empty() {
// If packets are buffered, let's wait for less time on recv from the channel.
@ -912,7 +1032,9 @@ impl BankingStage {
Duration::from_millis(100)
};
match Self::receive_and_buffer_packets(
let (res, receive_and_buffer_packets_time) = Measure::this(
|_| {
Self::receive_and_buffer_packets(
verified_receiver,
recv_start,
recv_timeout,
@ -921,11 +1043,18 @@ impl BankingStage {
&mut buffered_packet_batches,
&mut banking_stage_stats,
&mut slot_metrics_tracker,
) {
)
},
(),
"receive_and_buffer_packets",
);
slot_metrics_tracker
.increment_receive_and_buffer_packets_us(receive_and_buffer_packets_time.as_us());
match res {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
}
banking_stage_stats.report(1000);
}
}
@ -945,9 +1074,13 @@ impl BankingStage {
txs: &[SanitizedTransaction],
execution_results: &[TransactionExecutionResult],
recorder: &TransactionRecorder,
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut processed_generation = Measure::start("record::process_generation");
let (processed_transactions, processed_transactions_indexes): (Vec<_>, Vec<_>) =
) -> RecordTransactionsSummary {
let mut record_transactions_timings = RecordTransactionsTimings::default();
let (
(processed_transactions, processed_transactions_indexes),
execution_results_to_transactions_time,
): ((Vec<_>, Vec<_>), Measure) = Measure::this(
|_| {
execution_results
.iter()
.zip(txs)
@ -959,9 +1092,14 @@ impl BankingStage {
None
}
})
.unzip();
.unzip()
},
(),
"execution_results_to_transactions",
);
record_transactions_timings.execution_results_to_transactions_us =
execution_results_to_transactions_time.as_us();
processed_generation.stop();
let num_to_commit = processed_transactions.len();
debug!("num_to_commit: {} ", num_to_commit);
// unlock all the accounts with errors which are filtered by the above `filter_map`
@ -969,13 +1107,21 @@ impl BankingStage {
inc_new_counter_info!("banking_stage-record_count", 1);
inc_new_counter_info!("banking_stage-record_transactions", num_to_commit);
let mut hash_time = Measure::start("record::hash");
let hash = hash_transactions(&processed_transactions[..]);
hash_time.stop();
let (hash, hash_time) = Measure::this(
|_| hash_transactions(&processed_transactions[..]),
(),
"hash",
);
record_transactions_timings.hash_us = hash_time.as_us();
let mut poh_record = Measure::start("record::poh_record");
// record and unlock will unlock all the successful transactions
let res = recorder.record(bank_slot, hash, processed_transactions);
let (res, poh_record_time) = Measure::this(
|_| recorder.record(bank_slot, hash, processed_transactions),
(),
"poh_record",
);
record_transactions_timings.poh_record_us = poh_record_time.as_us();
match res {
Ok(()) => (),
Err(PohRecorderError::MaxHeightReached) => {
@ -986,16 +1132,21 @@ impl BankingStage {
);
// If record errors, add all the committable transactions (the ones
// we just attempted to record) as retryable
return (
Err(PohRecorderError::MaxHeightReached),
processed_transactions_indexes,
);
return RecordTransactionsSummary {
record_transactions_timings,
result: Err(PohRecorderError::MaxHeightReached),
retryable_indexes: processed_transactions_indexes,
};
}
Err(e) => panic!("Poh recorder returned unexpected error: {:?}", e),
}
poh_record.stop();
}
(Ok(num_to_commit), vec![])
RecordTransactionsSummary {
record_transactions_timings,
result: Ok(num_to_commit),
retryable_indexes: vec![],
}
}
fn execute_and_commit_transactions_locked(
@ -1005,7 +1156,11 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
) -> ExecuteAndCommitTransactionsOutput {
let mut load_execute_time = Measure::start("load_execute_time");
let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();
let mut mint_decimals: HashMap<Pubkey, u8> = HashMap::new();
let ((pre_balances, pre_token_balances), collect_balances_time) = Measure::this(
|_| {
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
// the likelihood of any single thread getting starved and processing old ids.
// TODO: Banking stage threads should be prioritized to complete faster than this queue
@ -1016,23 +1171,33 @@ impl BankingStage {
vec![]
};
let mut mint_decimals: HashMap<Pubkey, u8> = HashMap::new();
let pre_token_balances = if transaction_status_sender.is_some() {
collect_token_balances(bank, batch, &mut mint_decimals)
} else {
vec![]
};
let mut execute_timings = ExecuteTimings::default();
let load_and_execute_transactions_output = bank.load_and_execute_transactions(
(pre_balances, pre_token_balances)
},
(),
"collect_balances",
);
execute_and_commit_timings.collect_balances_us = collect_balances_time.as_us();
let (load_and_execute_transactions_output, load_execute_time) = Measure::this(
|_| {
bank.load_and_execute_transactions(
batch,
MAX_PROCESSING_AGE,
transaction_status_sender.is_some(),
transaction_status_sender.is_some(),
&mut execute_timings,
&mut execute_and_commit_timings.execute_timings,
)
},
(),
"load_execute",
);
load_execute_time.stop();
execute_and_commit_timings.load_execute_us = load_execute_time.as_us();
let LoadAndExecuteTransactionsOutput {
mut loaded_transactions,
@ -1044,16 +1209,31 @@ impl BankingStage {
..
} = load_and_execute_transactions_output;
let freeze_lock = bank.freeze_lock();
let (freeze_lock, freeze_lock_time) =
Measure::this(|_| bank.freeze_lock(), (), "freeze_lock");
execute_and_commit_timings.freeze_lock_us = freeze_lock_time.as_us();
let mut record_time = Measure::start("record_time");
let (commit_transactions_result, retryable_record_transaction_indexes) =
let (record_transactions_summary, record_time) = Measure::this(
|_| {
Self::record_transactions(
bank.slot(),
batch.sanitized_transactions(),
&execution_results,
poh,
)
},
(),
"record_transactions",
);
execute_and_commit_timings.record_us = record_time.as_us();
let RecordTransactionsSummary {
result: commit_transactions_result,
retryable_indexes: retryable_record_transaction_indexes,
record_transactions_timings,
} = record_transactions_summary;
execute_and_commit_timings.record_transactions_timings = record_transactions_timings;
inc_new_counter_info!(
"banking_stage-record_transactions_num_to_commit",
*commit_transactions_result.as_ref().unwrap_or(&0)
@ -1071,45 +1251,69 @@ impl BankingStage {
executed_with_successful_result_count,
retryable_transaction_indexes,
commit_transactions_result: Err(e),
execute_timings,
execute_and_commit_timings,
};
}
record_time.stop();
let mut commit_time = Measure::start("commit_time");
let sanitized_txs = batch.sanitized_transactions();
let committed_transaction_count = commit_transactions_result.unwrap();
// Note: `committed_transaction_count` should equal `executed_transactions_count`, since
// every executed transaction should have been recorded into the Poh stream if the record
// was successful (there are no partial records).
if committed_transaction_count != 0 {
let tx_results = bank.commit_transactions(
let commit_time_us = if committed_transaction_count != 0 {
let (tx_results, commit_time) = Measure::this(
|_| {
bank.commit_transactions(
sanitized_txs,
&mut loaded_transactions,
execution_results,
executed_transactions_count as u64,
executed_transactions_count.saturating_sub(executed_with_successful_result_count)
executed_transactions_count
.saturating_sub(executed_with_successful_result_count)
as u64,
signature_count,
&mut execute_timings,
&mut execute_and_commit_timings.execute_timings,
)
},
(),
"commit",
);
let commit_time_us = commit_time.as_us();
execute_and_commit_timings.commit_us = commit_time_us;
bank_utils::find_and_send_votes(sanitized_txs, &tx_results, Some(gossip_vote_sender));
let (_, find_and_send_votes_time) = Measure::this(
|_| {
bank_utils::find_and_send_votes(
sanitized_txs,
&tx_results,
Some(gossip_vote_sender),
);
if let Some(transaction_status_sender) = transaction_status_sender {
let txs = batch.sanitized_transactions().to_vec();
let post_balances = bank.collect_balances(batch);
let post_token_balances = collect_token_balances(bank, batch, &mut mint_decimals);
let post_token_balances =
collect_token_balances(bank, batch, &mut mint_decimals);
transaction_status_sender.send_transaction_status_batch(
bank.clone(),
txs,
tx_results.execution_results,
TransactionBalancesSet::new(pre_balances, post_balances),
TransactionTokenBalancesSet::new(pre_token_balances, post_token_balances),
TransactionTokenBalancesSet::new(
pre_token_balances,
post_token_balances,
),
tx_results.rent_debits,
);
}
}
commit_time.stop();
},
(),
"find_and_send_votes",
);
execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_time.as_us();
commit_time_us
} else {
0
};
drop(freeze_lock);
@ -1118,13 +1322,13 @@ impl BankingStage {
bank.slot(),
load_execute_time.as_us(),
record_time.as_us(),
commit_time.as_us(),
commit_time_us,
sanitized_txs.len(),
);
debug!(
"execute_and_commit_transactions_locked: {:?}",
execute_timings
execute_and_commit_timings.execute_timings,
);
ExecuteAndCommitTransactionsOutput {
@ -1133,7 +1337,7 @@ impl BankingStage {
executed_with_successful_result_count,
retryable_transaction_indexes,
commit_transactions_result: Ok(()),
execute_timings,
execute_and_commit_timings,
}
}
@ -1146,12 +1350,16 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender,
qos_service: &QosService,
) -> ProcessTransactionBatchOutput {
let ((transactions_qos_results, cost_model_throttled_transactions_count), cost_model_time) =
Measure::this(
|_| {
let tx_costs = qos_service.compute_transaction_costs(txs.iter());
let (transactions_qos_results, num_included) =
qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank);
let cost_model_throttled_transactions_count = txs.len().saturating_sub(num_included);
let cost_model_throttled_transactions_count =
txs.len().saturating_sub(num_included);
qos_service.accumulate_estimated_transaction_costs(
&Self::accumulate_batched_transaction_costs(
@ -1159,6 +1367,14 @@ impl BankingStage {
transactions_qos_results.iter(),
),
);
(
transactions_qos_results,
cost_model_throttled_transactions_count,
)
},
(),
"cost_model",
);
// Only lock accounts for those transactions that are selected for the block;
// Once accounts are locked, other threads cannot encode transactions that will modify the
@ -1171,7 +1387,6 @@ impl BankingStage {
// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit
// WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit
// and WouldExceedMaxAccountDataCostLimit
let mut execute_and_commit_transactions_output =
Self::execute_and_commit_transactions_locked(
bank,
@ -1183,7 +1398,7 @@ impl BankingStage {
let ExecuteAndCommitTransactionsOutput {
ref mut retryable_transaction_indexes,
ref execute_timings,
ref execute_and_commit_timings,
..
} = execute_and_commit_transactions_output;
@ -1196,7 +1411,8 @@ impl BankingStage {
drop(batch);
unlock_time.stop();
let (cu, us) = Self::accumulate_execute_units_and_time(execute_timings);
let (cu, us) =
Self::accumulate_execute_units_and_time(&execute_and_commit_timings.execute_timings);
qos_service.accumulate_actual_execute_cu(cu);
qos_service.accumulate_actual_execute_time(us);
@ -1213,6 +1429,7 @@ impl BankingStage {
ProcessTransactionBatchOutput {
cost_model_throttled_transactions_count,
cost_model_us: cost_model_time.as_us(),
execute_and_commit_transactions_output,
}
}
@ -1286,6 +1503,8 @@ impl BankingStage {
// slot ended
let mut total_failed_commit_count: usize = 0;
let mut total_cost_model_throttled_transactions_count: usize = 0;
let mut total_cost_model_us: u64 = 0;
let mut total_execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();
let mut reached_max_poh_height = false;
while chunk_start != transactions.len() {
let chunk_end = std::cmp::min(
@ -1304,11 +1523,13 @@ impl BankingStage {
let ProcessTransactionBatchOutput {
cost_model_throttled_transactions_count: new_cost_model_throttled_transactions_count,
cost_model_us: new_cost_model_us,
execute_and_commit_transactions_output,
} = process_transaction_batch_output;
total_cost_model_throttled_transactions_count =
total_cost_model_throttled_transactions_count
.saturating_add(new_cost_model_throttled_transactions_count);
total_cost_model_us = total_cost_model_us.saturating_add(new_cost_model_us);
let ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count: new_transactions_attempted_execution_count,
@ -1316,9 +1537,11 @@ impl BankingStage {
executed_with_successful_result_count: new_executed_with_successful_result_count,
retryable_transaction_indexes: new_retryable_transaction_indexes,
commit_transactions_result: new_commit_transactions_result,
execute_and_commit_timings: new_execute_and_commit_timings,
..
} = execute_and_commit_transactions_output;
total_execute_and_commit_timings.accumulate(&new_execute_and_commit_timings);
total_transactions_attempted_execution_count =
total_transactions_attempted_execution_count
.saturating_add(new_transactions_attempted_execution_count);
@ -1345,7 +1568,6 @@ impl BankingStage {
// If `bank_creation_time` is None, it's a test, so ignore the option and
// allow processing
// TODO adding timing metrics here from when bank was added to now
let should_bank_still_be_processing_txs =
Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot);
match (
@ -1380,6 +1602,8 @@ impl BankingStage {
failed_commit_count: total_failed_commit_count,
retryable_transaction_indexes: all_retryable_tx_indexes,
cost_model_throttled_transactions_count: total_cost_model_throttled_transactions_count,
cost_model_us: total_cost_model_us,
execute_and_commit_timings: total_execute_and_commit_timings,
}
}
@ -1505,20 +1729,31 @@ impl BankingStage {
qos_service: &QosService,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> ProcessTransactionsSummary {
let mut packet_conversion_time = Measure::start("packet_conversion");
let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(
// Convert packets to transactions
let ((transactions, transaction_to_packet_indexes), packet_conversion_time) = Measure::this(
|_| {
Self::transactions_from_packets(
packet_batch,
&packet_indexes,
&bank.feature_set,
bank.vote_only_bank(),
bank.as_ref(),
)
},
(),
"packet_conversion",
);
packet_conversion_time.stop();
let packet_conversion_us = packet_conversion_time.as_us();
slot_metrics_tracker.increment_transactions_from_packets_us(packet_conversion_us);
banking_stage_stats
.packet_conversion_elapsed
.fetch_add(packet_conversion_us, Ordering::Relaxed);
inc_new_counter_info!("banking_stage-packet_conversion", 1);
let mut process_tx_time = Measure::start("process_tx_time");
let mut process_transactions_summary = Self::process_transactions(
// Process transactions
let (mut process_transactions_summary, process_transactions_time) = Measure::this(
|_| {
Self::process_transactions(
bank,
bank_creation_time,
&transactions,
@ -1526,8 +1761,16 @@ impl BankingStage {
transaction_status_sender,
gossip_vote_sender,
qos_service,
)
},
(),
"process_transaction_time",
);
process_tx_time.stop();
let process_transactions_us = process_transactions_time.as_us();
slot_metrics_tracker.increment_process_transactions_us(process_transactions_us);
banking_stage_stats
.transaction_processing_elapsed
.fetch_add(process_transactions_us, Ordering::Relaxed);
let ProcessTransactionsSummary {
ref retryable_transaction_indexes,
@ -1539,18 +1782,29 @@ impl BankingStage {
let retryable_tx_count = retryable_transaction_indexes.len();
inc_new_counter_info!("banking_stage-unprocessed_transactions", retryable_tx_count);
let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time");
let filtered_retryable_tx_indexes = Self::filter_pending_packets_from_pending_txs(
// Filter out transactions that can't be retried
let (filtered_retryable_transaction_indexes, filter_retryable_packets_time) = Measure::this(
|_| {
Self::filter_pending_packets_from_pending_txs(
bank,
&transactions,
&transaction_to_packet_indexes,
retryable_transaction_indexes,
)
},
(),
"filter_pending_packets_time",
);
filter_pending_packets_time.stop();
let filter_retryable_packets_us = filter_retryable_packets_time.as_us();
slot_metrics_tracker
.increment_filter_retryable_packets_us(filter_retryable_packets_us as u64);
banking_stage_stats
.filter_pending_packets_elapsed
.fetch_add(filter_retryable_packets_us, Ordering::Relaxed);
let retryable_packets_filtered_count = retryable_transaction_indexes
.len()
.saturating_sub(filtered_retryable_tx_indexes.len());
.saturating_sub(filtered_retryable_transaction_indexes.len());
slot_metrics_tracker
.increment_retryable_packets_filtered_count(retryable_packets_filtered_count as u64);
@ -1558,21 +1812,11 @@ impl BankingStage {
"banking_stage-dropped_tx_before_forwarding",
retryable_transaction_indexes
.len()
.saturating_sub(filtered_retryable_tx_indexes.len())
.saturating_sub(filtered_retryable_transaction_indexes.len())
);
// Increment timing-based metrics
banking_stage_stats
.packet_conversion_elapsed
.fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed);
banking_stage_stats
.transaction_processing_elapsed
.fetch_add(process_tx_time.as_us(), Ordering::Relaxed);
banking_stage_stats
.filter_pending_packets_elapsed
.fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed);
process_transactions_summary.retryable_transaction_indexes = filtered_retryable_tx_indexes;
process_transactions_summary.retryable_transaction_indexes =
filtered_retryable_transaction_indexes;
process_transactions_summary
}
@ -2276,19 +2520,25 @@ mod tests {
1,
SystemError::ResultWithNegativeLamports.into(),
)));
let (res, retryable) =
BankingStage::record_transactions(bank.slot(), &txs, &results, &recorder);
res.unwrap();
assert!(retryable.is_empty());
let RecordTransactionsSummary {
result,
retryable_indexes,
..
} = BankingStage::record_transactions(bank.slot(), &txs, &results, &recorder);
result.unwrap();
assert!(retryable_indexes.is_empty());
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
assert_eq!(entry.transactions.len(), txs.len());
// Other TransactionErrors should not be recorded
results[0] = TransactionExecutionResult::NotExecuted(TransactionError::AccountNotFound);
let (res, retryable) =
BankingStage::record_transactions(bank.slot(), &txs, &results, &recorder);
res.unwrap();
assert!(retryable.is_empty());
let RecordTransactionsSummary {
result,
retryable_indexes,
..
} = BankingStage::record_transactions(bank.slot(), &txs, &results, &recorder);
result.unwrap();
assert!(retryable_indexes.is_empty());
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
assert_eq!(entry.transactions.len(), txs.len() - 1);
@ -2296,12 +2546,15 @@ mod tests {
// record_transactions should throw MaxHeightReached and return the set of retryable
// txs
let next_slot = bank.slot() + 1;
let (res, retryable) =
BankingStage::record_transactions(next_slot, &txs, &results, &recorder);
assert_matches!(res, Err(PohRecorderError::MaxHeightReached));
let RecordTransactionsSummary {
result,
retryable_indexes,
..
} = BankingStage::record_transactions(next_slot, &txs, &results, &recorder);
assert_matches!(result, Err(PohRecorderError::MaxHeightReached));
// The first result was an error so it's filtered out. The second result was Ok(),
// so it should be marked as retryable
assert_eq!(retryable, vec![1]);
assert_eq!(retryable_indexes, vec![1]);
// Should receive nothing from PohRecorder b/c record failed
assert!(entry_receiver.try_recv().is_err());

View File core/src/leader_slot_banking_stage_metrics.rs

@ -1,4 +1,5 @@
use {
crate::leader_slot_banking_stage_timing_metrics::*,
solana_poh::poh_recorder::BankStart,
solana_sdk::{clock::Slot, saturating_add_assign},
std::time::Instant,
@ -38,41 +39,12 @@ pub(crate) struct ProcessTransactionsSummary {
// The number of transactions filtered out by the cost model
pub cost_model_throttled_transactions_count: usize,
}
// Metrics capturing wallclock time spent in various parts of BankingStage during this
// validator's leader slot
#[derive(Debug)]
struct LeaderSlotTimingMetrics {
bank_detected_time: Instant,
// Total amount of time spent running the cost model
pub cost_model_us: u64,
// Delay from when the bank was created to when this thread detected it
bank_detected_delay_us: u64,
}
impl LeaderSlotTimingMetrics {
fn new(bank_creation_time: &Instant) -> Self {
Self {
bank_detected_time: Instant::now(),
bank_detected_delay_us: bank_creation_time.elapsed().as_micros() as u64,
}
}
fn report(&self, id: u32, slot: Slot) {
let bank_detected_to_now = self.bank_detected_time.elapsed().as_micros() as u64;
datapoint_info!(
"banking_stage-leader_slot_loop_timings",
("id", id as i64, i64),
("slot", slot as i64, i64),
("bank_detected_to_now_us", bank_detected_to_now, i64),
(
"bank_creation_to_now_us",
bank_detected_to_now + self.bank_detected_delay_us,
i64
),
("bank_detected_delay_us", self.bank_detected_delay_us, i64),
);
}
// Breakdown of time spent executing and committing transactions
pub execute_and_commit_timings: LeaderExecuteAndCommitTimings,
}
// Metrics describing packets ingested/processed in various parts of BankingStage during this
@ -362,6 +334,8 @@ impl LeaderSlotMetricsTracker {
failed_commit_count,
ref retryable_transaction_indexes,
cost_model_throttled_transactions_count,
cost_model_us,
ref execute_and_commit_timings,
..
} = process_transactions_summary;
@ -415,9 +389,23 @@ impl LeaderSlotMetricsTracker {
.cost_model_throttled_transactions_count,
*cost_model_throttled_transactions_count as u64
);
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.process_packets_timings
.cost_model_us,
*cost_model_us as u64
);
leader_slot_metrics
.timing_metrics
.execute_and_commit_timings
.accumulate(execute_and_commit_timings);
}
}
// Packet inflow/outflow/processing metrics
pub(crate) fn increment_total_new_valid_packets(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
@ -527,6 +515,166 @@ impl LeaderSlotMetricsTracker {
);
}
}
// Outermost banking thread's loop timing metrics
pub(crate) fn increment_process_buffered_packets_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.outer_loop_timings
.process_buffered_packets_us,
us
);
}
}
pub(crate) fn increment_slot_metrics_check_slot_boundary_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.outer_loop_timings
.slot_metrics_check_slot_boundary_us,
us
);
}
}
pub(crate) fn increment_receive_and_buffer_packets_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.outer_loop_timings
.receive_and_buffer_packets_us,
us
);
}
}
// Processing buffer timing metrics
pub(crate) fn increment_make_decision_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.process_buffered_packets_timings
.make_decision_us,
us
);
}
}
pub(crate) fn increment_consume_buffered_packets_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.process_buffered_packets_timings
.consume_buffered_packets_us,
us
);
}
}
pub(crate) fn increment_forward_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.process_buffered_packets_timings
.forward_us,
us
);
}
}
pub(crate) fn increment_forward_and_hold_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.process_buffered_packets_timings
.forward_and_hold_us,
us
);
}
}
// Consuming buffered packets timing metrics
pub(crate) fn increment_end_of_slot_filtering_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.consume_buffered_packets_timings
.end_of_slot_filtering_us,
us
);
}
}
pub(crate) fn increment_consume_buffered_packets_poh_recorder_lock_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.consume_buffered_packets_timings
.poh_recorder_lock_us,
us
);
}
}
pub(crate) fn increment_process_packets_transactions_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.consume_buffered_packets_timings
.process_packets_transactions_us,
us
);
}
}
// Processing packets timing metrics
pub(crate) fn increment_transactions_from_packets_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.process_packets_timings
.transactions_from_packets_us,
us
);
}
}
pub(crate) fn increment_process_transactions_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.process_packets_timings
.process_transactions_us,
us
);
}
}
pub(crate) fn increment_filter_retryable_packets_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.process_packets_timings
.filter_retryable_packets_us,
us
);
}
}
}
#[cfg(test)]
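
Each of the `increment_*_us` helpers added above shares the same shape: the timing is only accumulated while the tracker holds an active leader slot (`leader_slot_metrics` is `Some`), and the addition saturates rather than wraps. Below is a standalone sketch of that guard-then-saturating-add pattern; `SlotTimings` and `Tracker` are hypothetical stand-ins, not the real metrics structs.

// Standalone illustration of the guarded, saturating accumulation pattern used
// by the tracker's increment_* methods.
#[derive(Default, Debug)]
struct SlotTimings {
    process_buffered_packets_us: u64,
}

#[derive(Default)]
struct Tracker {
    // Some(..) only while this thread is inside a leader slot.
    leader_slot_metrics: Option<SlotTimings>,
}

impl Tracker {
    fn increment_process_buffered_packets_us(&mut self, us: u64) {
        // Only record timings while a leader slot is active; saturating_add
        // mirrors the saturating_add_assign! macro used in the real code.
        if let Some(metrics) = &mut self.leader_slot_metrics {
            metrics.process_buffered_packets_us =
                metrics.process_buffered_packets_us.saturating_add(us);
        }
    }
}

fn main() {
    let mut tracker = Tracker::default();
    tracker.increment_process_buffered_packets_us(10); // dropped: no active slot
    tracker.leader_slot_metrics = Some(SlotTimings::default());
    tracker.increment_process_buffered_packets_us(25); // recorded
    println!("{:?}", tracker.leader_slot_metrics);
}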

View File core/src/leader_slot_banking_stage_timing_metrics.rs

@ -0,0 +1,286 @@
use {
solana_program_runtime::timings::ExecuteTimings,
solana_sdk::{clock::Slot, saturating_add_assign},
std::time::Instant,
};
#[derive(Default, Debug)]
pub struct LeaderExecuteAndCommitTimings {
pub collect_balances_us: u64,
pub load_execute_us: u64,
pub freeze_lock_us: u64,
pub record_us: u64,
pub commit_us: u64,
pub find_and_send_votes_us: u64,
pub record_transactions_timings: RecordTransactionsTimings,
pub execute_timings: ExecuteTimings,
}
impl LeaderExecuteAndCommitTimings {
pub fn accumulate(&mut self, other: &LeaderExecuteAndCommitTimings) {
saturating_add_assign!(self.collect_balances_us, other.collect_balances_us);
saturating_add_assign!(self.load_execute_us, other.load_execute_us);
saturating_add_assign!(self.freeze_lock_us, other.freeze_lock_us);
saturating_add_assign!(self.record_us, other.record_us);
saturating_add_assign!(self.commit_us, other.commit_us);
saturating_add_assign!(self.find_and_send_votes_us, other.find_and_send_votes_us);
self.record_transactions_timings
.accumulate(&other.record_transactions_timings);
self.execute_timings.accumulate(&other.execute_timings);
}
pub fn report(&self, id: u32, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_execute_and_commit_timings",
("id", id as i64, i64),
("slot", slot as i64, i64),
("collect_balances_us", self.collect_balances_us as i64, i64),
("load_execute_us", self.load_execute_us as i64, i64),
("freeze_lock_us", self.freeze_lock_us as i64, i64),
("record_us", self.record_us as i64, i64),
("commit_us", self.commit_us as i64, i64),
(
"find_and_send_votes_us",
self.find_and_send_votes_us as i64,
i64
),
);
datapoint_info!(
"banking_stage-leader_slot_record_timings",
("id", id as i64, i64),
("slot", slot as i64, i64),
(
"execution_results_to_transactions_us",
self.record_transactions_timings
.execution_results_to_transactions_us as i64,
i64
),
(
"hash_us",
self.record_transactions_timings.hash_us as i64,
i64
),
(
"poh_record_us",
self.record_transactions_timings.poh_record_us as i64,
i64
),
);
}
}
#[derive(Default, Debug)]
pub struct RecordTransactionsTimings {
pub execution_results_to_transactions_us: u64,
pub hash_us: u64,
pub poh_record_us: u64,
}
impl RecordTransactionsTimings {
pub fn accumulate(&mut self, other: &RecordTransactionsTimings) {
saturating_add_assign!(
self.execution_results_to_transactions_us,
other.execution_results_to_transactions_us
);
saturating_add_assign!(self.hash_us, other.hash_us);
saturating_add_assign!(self.poh_record_us, other.poh_record_us);
}
}
// Metrics capturing wallclock time spent in various parts of BankingStage during this
// validator's leader slot
#[derive(Debug)]
pub(crate) struct LeaderSlotTimingMetrics {
pub outer_loop_timings: OuterLoopTimings,
pub process_buffered_packets_timings: ProcessBufferedPacketsTimings,
pub consume_buffered_packets_timings: ConsumeBufferedPacketsTimings,
pub process_packets_timings: ProcessPacketsTimings,
pub execute_and_commit_timings: LeaderExecuteAndCommitTimings,
}
impl LeaderSlotTimingMetrics {
pub(crate) fn new(bank_creation_time: &Instant) -> Self {
Self {
outer_loop_timings: OuterLoopTimings::new(bank_creation_time),
process_buffered_packets_timings: ProcessBufferedPacketsTimings::default(),
consume_buffered_packets_timings: ConsumeBufferedPacketsTimings::default(),
process_packets_timings: ProcessPacketsTimings::default(),
execute_and_commit_timings: LeaderExecuteAndCommitTimings::default(),
}
}
pub(crate) fn report(&self, id: u32, slot: Slot) {
self.outer_loop_timings.report(id, slot);
self.process_buffered_packets_timings.report(id, slot);
self.consume_buffered_packets_timings.report(id, slot);
self.process_packets_timings.report(id, slot);
self.execute_and_commit_timings.report(id, slot);
}
}
#[derive(Debug)]
pub(crate) struct OuterLoopTimings {
pub bank_detected_time: Instant,
// Delay from when the bank was created to when this thread detected it
pub bank_detected_delay_us: u64,
// Time spent processing buffered packets
pub process_buffered_packets_us: u64,
// Time spent checking for slot boundary and reporting leader slot metrics
pub slot_metrics_check_slot_boundary_us: u64,
// Time spent processing new incoming packets to the banking thread
pub receive_and_buffer_packets_us: u64,
}
impl OuterLoopTimings {
fn new(bank_creation_time: &Instant) -> Self {
Self {
bank_detected_time: Instant::now(),
bank_detected_delay_us: bank_creation_time.elapsed().as_micros() as u64,
process_buffered_packets_us: 0,
slot_metrics_check_slot_boundary_us: 0,
receive_and_buffer_packets_us: 0,
}
}
fn report(&self, id: u32, slot: Slot) {
let bank_detected_to_now_us = self.bank_detected_time.elapsed().as_micros() as u64;
datapoint_info!(
"banking_stage-leader_slot_loop_timings",
("id", id as i64, i64),
("slot", slot as i64, i64),
(
"bank_detected_to_slot_end_detected_us",
bank_detected_to_now_us,
i64
),
(
"bank_creation_to_slot_end_detected_us",
bank_detected_to_now_us + self.bank_detected_delay_us,
i64
),
("bank_detected_delay_us", self.bank_detected_delay_us, i64),
(
"process_buffered_packets_us",
self.process_buffered_packets_us,
i64
),
(
"slot_metrics_check_slot_boundary_us",
self.slot_metrics_check_slot_boundary_us,
i64
),
(
"receive_and_buffer_packets_us",
self.receive_and_buffer_packets_us,
i64
),
);
}
}
#[derive(Debug, Default)]
pub(crate) struct ProcessBufferedPacketsTimings {
pub make_decision_us: u64,
pub consume_buffered_packets_us: u64,
pub forward_us: u64,
pub forward_and_hold_us: u64,
}
impl ProcessBufferedPacketsTimings {
fn report(&self, id: u32, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_process_buffered_packets_timings",
("id", id as i64, i64),
("slot", slot as i64, i64),
("make_decision_us", self.make_decision_us as i64, i64),
(
"consume_buffered_packets_us",
self.consume_buffered_packets_us as i64,
i64
),
("forward_us", self.forward_us as i64, i64),
("forward_and_hold_us", self.forward_and_hold_us as i64, i64),
);
}
}
#[derive(Debug, Default)]
pub(crate) struct ConsumeBufferedPacketsTimings {
// Time spent grabbing poh recorder lock
pub poh_recorder_lock_us: u64,
// Time spent filtering invalid packets after leader slot has ended
pub end_of_slot_filtering_us: u64,
// Time spent processing transactions
pub process_packets_transactions_us: u64,
}
impl ConsumeBufferedPacketsTimings {
fn report(&self, id: u32, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_consume_buffered_packets_timings",
("id", id as i64, i64),
("slot", slot as i64, i64),
(
"poh_recorder_lock_us",
self.poh_recorder_lock_us as i64,
i64
),
(
"end_of_slot_filtering_us",
self.end_of_slot_filtering_us as i64,
i64
),
(
"process_packets_transactions_us",
self.process_packets_transactions_us as i64,
i64
),
);
}
}
#[derive(Debug, Default)]
pub(crate) struct ProcessPacketsTimings {
// Time spent converting packets to transactions
pub transactions_from_packets_us: u64,
// Time spent processing transactions
pub process_transactions_us: u64,
// Time spent filtering retryable packets that were returned after transaction
// processing
pub filter_retryable_packets_us: u64,
// Time spent running the cost model in processing transactions before executing
// transactions
pub cost_model_us: u64,
}
impl ProcessPacketsTimings {
fn report(&self, id: u32, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_process_packets_timings",
("id", id as i64, i64),
("slot", slot as i64, i64),
(
"transactions_from_packets_us",
self.transactions_from_packets_us,
i64
),
("process_transactions_us", self.process_transactions_us, i64),
(
"filter_retryable_packets_us",
self.filter_retryable_packets_us,
i64
),
("cost_model_us", self.cost_model_us, i64),
);
}
}
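
The timing structs in this new file are built to be rolled up per transaction batch with `accumulate` and emitted once per slot with `report`. A small self-contained sketch of that flow follows, using a simplified `BatchTimings` type as a stand-in for `LeaderExecuteAndCommitTimings`.

// Simplified stand-in for LeaderExecuteAndCommitTimings illustrating the
// accumulate-per-batch, report-once-per-slot flow added by this commit.
#[derive(Default, Debug)]
struct BatchTimings {
    load_execute_us: u64,
    record_us: u64,
    commit_us: u64,
}

impl BatchTimings {
    fn accumulate(&mut self, other: &BatchTimings) {
        // saturating_add mirrors saturating_add_assign! in the real code.
        self.load_execute_us = self.load_execute_us.saturating_add(other.load_execute_us);
        self.record_us = self.record_us.saturating_add(other.record_us);
        self.commit_us = self.commit_us.saturating_add(other.commit_us);
    }

    fn report(&self, slot: u64) {
        // The real code emits a datapoint_info! metric; print instead.
        println!("slot {}: {:?}", slot, self);
    }
}

fn main() {
    let mut total = BatchTimings::default();
    // Each processed transaction batch contributes its own timings.
    for batch in [
        BatchTimings { load_execute_us: 120, record_us: 30, commit_us: 45 },
        BatchTimings { load_execute_us: 95, record_us: 25, commit_us: 40 },
    ] {
        total.accumulate(&batch);
    }
    total.report(42);
}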

View File core/src/lib.rs

@ -29,6 +29,7 @@ pub mod gen_keys;
pub mod heaviest_subtree_fork_choice;
pub mod latest_validator_votes_for_frozen_banks;
pub mod leader_slot_banking_stage_metrics;
pub mod leader_slot_banking_stage_timing_metrics;
pub mod ledger_cleanup_service;
pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests;

View File program-runtime/src/timings.rs

@ -129,6 +129,7 @@ pub struct ExecuteDetailsTimings {
pub create_executor_jit_compile_us: u64,
pub per_program_timings: HashMap<Pubkey, ProgramTiming>,
}
impl ExecuteDetailsTimings {
pub fn accumulate(&mut self, other: &ExecuteDetailsTimings) {
saturating_add_assign!(self.serialize_us, other.serialize_us);