BankingStage Refactor: Simplify Consumer (#30253)

* measure! to measure_us!

* Consistent naming of transaction_recorder

* Remove outdated comment - Instant cannot be None

* use local

* Remove measure! import
This commit is contained in:
Andrew Fitzgerald 2023-02-15 17:20:55 -08:00 committed by GitHub
parent b9b7178f1f
commit 1cefb90271
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 55 additions and 75 deletions

View File

@ -13,7 +13,7 @@ use {
}, },
itertools::Itertools, itertools::Itertools,
solana_ledger::token_balances::collect_token_balances, solana_ledger::token_balances::collect_token_balances,
solana_measure::{measure, measure::Measure}, solana_measure::{measure::Measure, measure_us},
solana_poh::poh_recorder::{ solana_poh::poh_recorder::{
BankStart, PohRecorderError, RecordTransactionsSummary, RecordTransactionsTimings, BankStart, PohRecorderError, RecordTransactionsSummary, RecordTransactionsTimings,
TransactionRecorder, TransactionRecorder,
@ -73,7 +73,7 @@ impl Consumer {
test_fn: Option<impl Fn()>, test_fn: Option<impl Fn()>,
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
committer: &Committer, committer: &Committer,
recorder: &TransactionRecorder, transaction_recorder: &TransactionRecorder,
qos_service: &QosService, qos_service: &QosService,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker, slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
@ -92,7 +92,7 @@ impl Consumer {
bank_start, bank_start,
payload, payload,
committer, committer,
recorder, transaction_recorder,
banking_stage_stats, banking_stage_stats,
qos_service, qos_service,
log_messages_bytes_limit, log_messages_bytes_limit,
@ -136,7 +136,7 @@ impl Consumer {
bank_start: &BankStart, bank_start: &BankStart,
payload: &mut ConsumeScannerPayload, payload: &mut ConsumeScannerPayload,
committer: &Committer, committer: &Committer,
recorder: &TransactionRecorder, transaction_recorder: &TransactionRecorder,
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
qos_service: &QosService, qos_service: &QosService,
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
@ -150,23 +150,21 @@ impl Consumer {
} }
let packets_to_process_len = packets_to_process.len(); let packets_to_process_len = packets_to_process.len();
let (process_transactions_summary, process_packets_transactions_time) = measure!( let (process_transactions_summary, process_packets_transactions_us) =
Self::process_packets_transactions( measure_us!(Self::process_packets_transactions(
&bank_start.working_bank, &bank_start.working_bank,
&bank_start.bank_creation_time, &bank_start.bank_creation_time,
committer, committer,
recorder, transaction_recorder,
&payload.sanitized_transactions, &payload.sanitized_transactions,
banking_stage_stats, banking_stage_stats,
qos_service, qos_service,
payload.slot_metrics_tracker, payload.slot_metrics_tracker,
log_messages_bytes_limit log_messages_bytes_limit
), ));
"process_packets_transactions",
);
payload payload
.slot_metrics_tracker .slot_metrics_tracker
.increment_process_packets_transactions_us(process_packets_transactions_time.as_us()); .increment_process_packets_transactions_us(process_packets_transactions_us);
// Clear payload for next iteration // Clear payload for next iteration
payload.sanitized_transactions.clear(); payload.sanitized_transactions.clear();
@ -210,7 +208,7 @@ impl Consumer {
bank: &Arc<Bank>, bank: &Arc<Bank>,
bank_creation_time: &Instant, bank_creation_time: &Instant,
committer: &Committer, committer: &Committer,
poh: &TransactionRecorder, transaction_recorder: &TransactionRecorder,
sanitized_transactions: &[SanitizedTransaction], sanitized_transactions: &[SanitizedTransaction],
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
qos_service: &QosService, qos_service: &QosService,
@ -218,19 +216,16 @@ impl Consumer {
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
) -> ProcessTransactionsSummary { ) -> ProcessTransactionsSummary {
// Process transactions // Process transactions
let (mut process_transactions_summary, process_transactions_time) = measure!( let (mut process_transactions_summary, process_transactions_us) =
Self::process_transactions( measure_us!(Self::process_transactions(
bank, bank,
bank_creation_time, bank_creation_time,
sanitized_transactions, sanitized_transactions,
committer, committer,
poh, transaction_recorder,
qos_service, qos_service,
log_messages_bytes_limit, log_messages_bytes_limit,
), ));
"process_transaction_time",
);
let process_transactions_us = process_transactions_time.as_us();
slot_metrics_tracker.increment_process_transactions_us(process_transactions_us); slot_metrics_tracker.increment_process_transactions_us(process_transactions_us);
banking_stage_stats banking_stage_stats
.transaction_processing_elapsed .transaction_processing_elapsed
@ -249,15 +244,12 @@ impl Consumer {
inc_new_counter_info!("banking_stage-unprocessed_transactions", retryable_tx_count); inc_new_counter_info!("banking_stage-unprocessed_transactions", retryable_tx_count);
// Filter out the retryable transactions that are too old // Filter out the retryable transactions that are too old
let (filtered_retryable_transaction_indexes, filter_retryable_packets_time) = measure!( let (filtered_retryable_transaction_indexes, filter_retryable_packets_us) =
Self::filter_pending_packets_from_pending_txs( measure_us!(Self::filter_pending_packets_from_pending_txs(
bank, bank,
sanitized_transactions, sanitized_transactions,
retryable_transaction_indexes, retryable_transaction_indexes,
), ));
"filter_pending_packets_time",
);
let filter_retryable_packets_us = filter_retryable_packets_time.as_us();
slot_metrics_tracker.increment_filter_retryable_packets_us(filter_retryable_packets_us); slot_metrics_tracker.increment_filter_retryable_packets_us(filter_retryable_packets_us);
banking_stage_stats banking_stage_stats
.filter_pending_packets_elapsed .filter_pending_packets_elapsed
@ -271,9 +263,7 @@ impl Consumer {
inc_new_counter_info!( inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding", "banking_stage-dropped_tx_before_forwarding",
retryable_transaction_indexes retryable_packets_filtered_count
.len()
.saturating_sub(filtered_retryable_transaction_indexes.len())
); );
process_transactions_summary.retryable_transaction_indexes = process_transactions_summary.retryable_transaction_indexes =
@ -290,7 +280,7 @@ impl Consumer {
bank_creation_time: &Instant, bank_creation_time: &Instant,
transactions: &[SanitizedTransaction], transactions: &[SanitizedTransaction],
committer: &Committer, committer: &Committer,
poh: &TransactionRecorder, transaction_recorder: &TransactionRecorder,
qos_service: &QosService, qos_service: &QosService,
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
) -> ProcessTransactionsSummary { ) -> ProcessTransactionsSummary {
@ -320,7 +310,7 @@ impl Consumer {
bank, bank,
&transactions[chunk_start..chunk_end], &transactions[chunk_start..chunk_end],
committer, committer,
poh, transaction_recorder,
chunk_start, chunk_start,
qos_service, qos_service,
log_messages_bytes_limit, log_messages_bytes_limit,
@ -373,8 +363,6 @@ impl Consumer {
// to the list of unprocessed txs. // to the list of unprocessed txs.
all_retryable_tx_indexes.extend_from_slice(&new_retryable_transaction_indexes); all_retryable_tx_indexes.extend_from_slice(&new_retryable_transaction_indexes);
// If `bank_creation_time` is None, it's a test so ignore the option so
// allow processing
let should_bank_still_be_processing_txs = let should_bank_still_be_processing_txs =
Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot); Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot);
match ( match (
@ -419,20 +407,20 @@ impl Consumer {
bank: &Arc<Bank>, bank: &Arc<Bank>,
txs: &[SanitizedTransaction], txs: &[SanitizedTransaction],
committer: &Committer, committer: &Committer,
poh: &TransactionRecorder, transaction_recorder: &TransactionRecorder,
chunk_offset: usize, chunk_offset: usize,
qos_service: &QosService, qos_service: &QosService,
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
) -> ProcessTransactionBatchOutput { ) -> ProcessTransactionBatchOutput {
let ( let (
(transaction_costs, transactions_qos_results, cost_model_throttled_transactions_count), (transaction_costs, transactions_qos_results, cost_model_throttled_transactions_count),
cost_model_time, cost_model_us,
) = measure!(qos_service.select_and_accumulate_transaction_costs(bank, txs)); ) = measure_us!(qos_service.select_and_accumulate_transaction_costs(bank, txs));
// Only lock accounts for those transactions are selected for the block; // Only lock accounts for those transactions are selected for the block;
// Once accounts are locked, other threads cannot encode transactions that will modify the // Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state // same account state
let (batch, lock_time) = measure!( let (batch, lock_us) = measure_us!(
bank.prepare_sanitized_batch_with_results(txs, transactions_qos_results.iter()) bank.prepare_sanitized_batch_with_results(txs, transactions_qos_results.iter())
); );
@ -443,13 +431,13 @@ impl Consumer {
Self::execute_and_commit_transactions_locked( Self::execute_and_commit_transactions_locked(
bank, bank,
committer, committer,
poh, transaction_recorder,
&batch, &batch,
log_messages_bytes_limit, log_messages_bytes_limit,
); );
// Once the accounts are new transactions can enter the pipeline to process them // Once the accounts are new transactions can enter the pipeline to process them
let (_, unlock_time) = measure!(drop(batch)); let (_, unlock_us) = measure_us!(drop(batch));
let ExecuteAndCommitTransactionsOutput { let ExecuteAndCommitTransactionsOutput {
ref mut retryable_transaction_indexes, ref mut retryable_transaction_indexes,
@ -480,14 +468,14 @@ impl Consumer {
debug!( debug!(
"bank: {} lock: {}us unlock: {}us txs_len: {}", "bank: {} lock: {}us unlock: {}us txs_len: {}",
bank.slot(), bank.slot(),
lock_time.as_us(), lock_us,
unlock_time.as_us(), unlock_us,
txs.len(), txs.len(),
); );
ProcessTransactionBatchOutput { ProcessTransactionBatchOutput {
cost_model_throttled_transactions_count, cost_model_throttled_transactions_count,
cost_model_us: cost_model_time.as_us(), cost_model_us,
execute_and_commit_transactions_output, execute_and_commit_transactions_output,
} }
} }
@ -495,7 +483,7 @@ impl Consumer {
fn execute_and_commit_transactions_locked( fn execute_and_commit_transactions_locked(
bank: &Arc<Bank>, bank: &Arc<Bank>,
committer: &Committer, committer: &Committer,
poh: &TransactionRecorder, transaction_recorder: &TransactionRecorder,
batch: &TransactionBatch, batch: &TransactionBatch,
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
) -> ExecuteAndCommitTransactionsOutput { ) -> ExecuteAndCommitTransactionsOutput {
@ -503,22 +491,19 @@ impl Consumer {
let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default(); let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();
let mut pre_balance_info = PreBalanceInfo::default(); let mut pre_balance_info = PreBalanceInfo::default();
let (_, collect_balances_time) = measure!( let (_, collect_balances_us) = measure_us!({
{ // If the extra meta-data services are enabled for RPC, collect the
// If the extra meta-data services are enabled for RPC, collect the // pre-balances for native and token programs.
// pre-balances for native and token programs. if transaction_status_sender_enabled {
if transaction_status_sender_enabled { pre_balance_info.native = bank.collect_balances(batch);
pre_balance_info.native = bank.collect_balances(batch); pre_balance_info.token =
pre_balance_info.token = collect_token_balances(bank, batch, &mut pre_balance_info.mint_decimals)
collect_token_balances(bank, batch, &mut pre_balance_info.mint_decimals) }
} });
}, execute_and_commit_timings.collect_balances_us = collect_balances_us;
"collect_balances",
);
execute_and_commit_timings.collect_balances_us = collect_balances_time.as_us();
let (load_and_execute_transactions_output, load_execute_time) = measure!( let (load_and_execute_transactions_output, load_execute_us) = measure_us!(bank
bank.load_and_execute_transactions( .load_and_execute_transactions(
batch, batch,
MAX_PROCESSING_AGE, MAX_PROCESSING_AGE,
transaction_status_sender_enabled, transaction_status_sender_enabled,
@ -527,10 +512,8 @@ impl Consumer {
&mut execute_and_commit_timings.execute_timings, &mut execute_and_commit_timings.execute_timings,
None, // account_overrides None, // account_overrides
log_messages_bytes_limit log_messages_bytes_limit
), ));
"load_execute", execute_and_commit_timings.load_execute_us = load_execute_us;
);
execute_and_commit_timings.load_execute_us = load_execute_time.as_us();
let LoadAndExecuteTransactionsOutput { let LoadAndExecuteTransactionsOutput {
mut loaded_transactions, mut loaded_transactions,
@ -545,8 +528,8 @@ impl Consumer {
} = load_and_execute_transactions_output; } = load_and_execute_transactions_output;
let transactions_attempted_execution_count = execution_results.len(); let transactions_attempted_execution_count = execution_results.len();
let (executed_transactions, execution_results_to_transactions_time): (Vec<_>, Measure) = measure!( let (executed_transactions, execution_results_to_transactions_us) =
execution_results measure_us!(execution_results
.iter() .iter()
.zip(batch.sanitized_transactions()) .zip(batch.sanitized_transactions())
.filter_map(|(execution_result, tx)| { .filter_map(|(execution_result, tx)| {
@ -556,12 +539,10 @@ impl Consumer {
None None
} }
}) })
.collect(), .collect_vec());
"execution_results_to_transactions",
);
let (freeze_lock, freeze_lock_time) = measure!(bank.freeze_lock(), "freeze_lock"); let (freeze_lock, freeze_lock_us) = measure_us!(bank.freeze_lock());
execute_and_commit_timings.freeze_lock_us = freeze_lock_time.as_us(); execute_and_commit_timings.freeze_lock_us = freeze_lock_us;
if !executed_transactions.is_empty() { if !executed_transactions.is_empty() {
inc_new_counter_info!("banking_stage-record_count", 1); inc_new_counter_info!("banking_stage-record_count", 1);
@ -570,11 +551,10 @@ impl Consumer {
executed_transactions_count executed_transactions_count
); );
} }
let (record_transactions_summary, record_time) = measure!( let (record_transactions_summary, record_us) = measure_us!(
poh.record_transactions(bank.slot(), executed_transactions), transaction_recorder.record_transactions(bank.slot(), executed_transactions)
"record_transactions",
); );
execute_and_commit_timings.record_us = record_time.as_us(); execute_and_commit_timings.record_us = record_us;
let RecordTransactionsSummary { let RecordTransactionsSummary {
result: record_transactions_result, result: record_transactions_result,
@ -582,7 +562,7 @@ impl Consumer {
starting_transaction_index, starting_transaction_index,
} = record_transactions_summary; } = record_transactions_summary;
execute_and_commit_timings.record_transactions_timings = RecordTransactionsTimings { execute_and_commit_timings.record_transactions_timings = RecordTransactionsTimings {
execution_results_to_transactions_us: execution_results_to_transactions_time.as_us(), execution_results_to_transactions_us,
..record_transactions_timings ..record_transactions_timings
}; };
@ -634,8 +614,8 @@ impl Consumer {
debug!( debug!(
"bank: {} process_and_record_locked: {}us record: {}us commit: {}us txs_len: {}", "bank: {} process_and_record_locked: {}us record: {}us commit: {}us txs_len: {}",
bank.slot(), bank.slot(),
load_execute_time.as_us(), load_execute_us,
record_time.as_us(), record_us,
commit_time_us, commit_time_us,
batch.sanitized_transactions().len(), batch.sanitized_transactions().len(),
); );