diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 089043b787..ad33a30b31 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -40,9 +40,11 @@ use { solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}, solana_program_runtime::timings::ExecuteTimings, solana_runtime::{ + accounts::TransactionLoadResult, bank::{ Bank, CommitTransactionCounts, LoadAndExecuteTransactionsOutput, - TransactionBalancesSet, TransactionCheckResult, + TransactionBalancesSet, TransactionCheckResult, TransactionExecutionResult, + TransactionResults, }, bank_forks::BankForks, bank_utils, @@ -65,7 +67,9 @@ use { }, solana_streamer::sendmmsg::batch_send, solana_tpu_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, - solana_transaction_status::token_balances::TransactionTokenBalancesSet, + solana_transaction_status::{ + token_balances::TransactionTokenBalancesSet, TransactionTokenBalance, + }, std::{ cmp, collections::HashMap, @@ -312,6 +316,13 @@ impl BankingStageStats { } } +#[derive(Default)] +struct PreBalanceInfo { + native: Vec>, + token: Vec>, + mint_decimals: HashMap, +} + #[derive(Debug, Default)] pub struct BatchedTransactionDetails { pub costs: BatchedTransactionCostDetails, @@ -1192,6 +1203,129 @@ impl BankingStage { } } + #[allow(clippy::too_many_arguments)] + fn commit_transactions( + batch: &TransactionBatch, + loaded_transactions: &mut [TransactionLoadResult], + execution_results: Vec, + sanitized_txs: &[SanitizedTransaction], + starting_transaction_index: Option, + bank: &Arc, + pre_balance_info: &mut PreBalanceInfo, + execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings, + transaction_status_sender: &Option, + gossip_vote_sender: &ReplayVoteSender, + signature_count: u64, + executed_transactions_count: usize, + executed_with_successful_result_count: usize, + ) -> (u64, Vec) { + inc_new_counter_info!( + "banking_stage-record_transactions_num_to_commit", + executed_transactions_count + ); + + let (last_blockhash, lamports_per_signature) = + bank.last_blockhash_and_lamports_per_signature(); + + let (tx_results, commit_time) = measure!( + bank.commit_transactions( + sanitized_txs, + loaded_transactions, + execution_results, + last_blockhash, + lamports_per_signature, + CommitTransactionCounts { + committed_transactions_count: executed_transactions_count as u64, + committed_with_failure_result_count: executed_transactions_count + .saturating_sub(executed_with_successful_result_count) + as u64, + signature_count, + }, + &mut execute_and_commit_timings.execute_timings, + ), + "commit", + ); + let commit_time_us = commit_time.as_us(); + execute_and_commit_timings.commit_us = commit_time_us; + + let commit_transaction_statuses = tx_results + .execution_results + .iter() + .map(|execution_result| match execution_result.details() { + Some(details) => CommitTransactionDetails::Committed { + compute_units: details.executed_units, + }, + None => CommitTransactionDetails::NotCommitted, + }) + .collect(); + + let (_, find_and_send_votes_time) = measure!( + { + bank_utils::find_and_send_votes( + sanitized_txs, + &tx_results, + Some(gossip_vote_sender), + ); + Self::collect_balances_and_send_status_batch( + transaction_status_sender, + tx_results, + bank, + batch, + pre_balance_info, + starting_transaction_index, + ); + }, + "find_and_send_votes", + ); + execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_time.as_us(); + (commit_time_us, commit_transaction_statuses) + } + + fn collect_balances_and_send_status_batch( + transaction_status_sender: &Option, + tx_results: TransactionResults, + bank: &Arc, + batch: &TransactionBatch, + pre_balance_info: &mut PreBalanceInfo, + starting_transaction_index: Option, + ) { + if let Some(transaction_status_sender) = transaction_status_sender { + let txs = batch.sanitized_transactions().to_vec(); + let post_balances = bank.collect_balances(batch); + let post_token_balances = + collect_token_balances(bank, batch, &mut pre_balance_info.mint_decimals); + let mut transaction_index = starting_transaction_index.unwrap_or_default(); + let batch_transaction_indexes: Vec<_> = tx_results + .execution_results + .iter() + .map(|result| { + if result.was_executed() { + let this_transaction_index = transaction_index; + saturating_add_assign!(transaction_index, 1); + this_transaction_index + } else { + 0 + } + }) + .collect(); + transaction_status_sender.send_transaction_status_batch( + bank.clone(), + txs, + tx_results.execution_results, + TransactionBalancesSet::new( + std::mem::take(&mut pre_balance_info.native), + post_balances, + ), + TransactionTokenBalancesSet::new( + std::mem::take(&mut pre_balance_info.token), + post_token_balances, + ), + tx_results.rent_debits, + batch_transaction_indexes, + ); + } + } + fn execute_and_commit_transactions_locked( bank: &Arc, poh: &TransactionRecorder, @@ -1201,27 +1335,17 @@ impl BankingStage { log_messages_bytes_limit: Option, ) -> ExecuteAndCommitTransactionsOutput { let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default(); - let mut mint_decimals: HashMap = HashMap::new(); - let ((pre_balances, pre_token_balances), collect_balances_time) = measure!( + let mut pre_balance_info = PreBalanceInfo::default(); + let (_, collect_balances_time) = measure!( { - // Use a shorter maximum age when adding transactions into the pipeline. This will reduce - // the likelihood of any single thread getting starved and processing old ids. - // TODO: Banking stage threads should be prioritized to complete faster then this queue - // expires. - let pre_balances = if transaction_status_sender.is_some() { - bank.collect_balances(batch) - } else { - vec![] - }; - - let pre_token_balances = if transaction_status_sender.is_some() { - collect_token_balances(bank, batch, &mut mint_decimals) - } else { - vec![] - }; - - (pre_balances, pre_token_balances) + // If the extra meta-data services are enabled for RPC, collect the + // pre-balances for native and token programs. + if transaction_status_sender.is_some() { + pre_balance_info.native = bank.collect_balances(batch); + pre_balance_info.token = + collect_token_balances(bank, batch, &mut pre_balance_info.mint_decimals) + } }, "collect_balances", ); @@ -1269,8 +1393,6 @@ impl BankingStage { "execution_results_to_transactions", ); - let (last_blockhash, lamports_per_signature) = - bank.last_blockhash_and_lamports_per_signature(); let (freeze_lock, freeze_lock_time) = measure!(bank.freeze_lock(), "freeze_lock"); execute_and_commit_timings.freeze_lock_us = freeze_lock_time.as_us(); @@ -1313,87 +1435,21 @@ impl BankingStage { let sanitized_txs = batch.sanitized_transactions(); let (commit_time_us, commit_transaction_statuses) = if executed_transactions_count != 0 { - inc_new_counter_info!( - "banking_stage-record_transactions_num_to_commit", - executed_transactions_count - ); - - let (tx_results, commit_time) = measure!( - bank.commit_transactions( - sanitized_txs, - &mut loaded_transactions, - execution_results, - last_blockhash, - lamports_per_signature, - CommitTransactionCounts { - committed_transactions_count: executed_transactions_count as u64, - committed_with_failure_result_count: executed_transactions_count - .saturating_sub(executed_with_successful_result_count) - as u64, - signature_count, - }, - &mut execute_and_commit_timings.execute_timings, - ), - "commit", - ); - let commit_time_us = commit_time.as_us(); - execute_and_commit_timings.commit_us = commit_time_us; - - let commit_transaction_statuses = tx_results - .execution_results - .iter() - .map(|execution_result| match execution_result.details() { - Some(details) => CommitTransactionDetails::Committed { - compute_units: details.executed_units, - }, - None => CommitTransactionDetails::NotCommitted, - }) - .collect(); - - let (_, find_and_send_votes_time) = measure!( - { - bank_utils::find_and_send_votes( - sanitized_txs, - &tx_results, - Some(gossip_vote_sender), - ); - if let Some(transaction_status_sender) = transaction_status_sender { - let txs = batch.sanitized_transactions().to_vec(); - let post_balances = bank.collect_balances(batch); - let post_token_balances = - collect_token_balances(bank, batch, &mut mint_decimals); - let mut transaction_index = starting_transaction_index.unwrap_or_default(); - let batch_transaction_indexes: Vec<_> = tx_results - .execution_results - .iter() - .map(|result| { - if result.was_executed() { - let this_transaction_index = transaction_index; - saturating_add_assign!(transaction_index, 1); - this_transaction_index - } else { - 0 - } - }) - .collect(); - transaction_status_sender.send_transaction_status_batch( - bank.clone(), - txs, - tx_results.execution_results, - TransactionBalancesSet::new(pre_balances, post_balances), - TransactionTokenBalancesSet::new( - pre_token_balances, - post_token_balances, - ), - tx_results.rent_debits, - batch_transaction_indexes, - ); - } - }, - "find_and_send_votes", - ); - execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_time.as_us(); - (commit_time_us, commit_transaction_statuses) + Self::commit_transactions( + batch, + &mut loaded_transactions, + execution_results, + sanitized_txs, + starting_transaction_index, + bank, + &mut pre_balance_info, + &mut execute_and_commit_timings, + transaction_status_sender, + gossip_vote_sender, + signature_count, + executed_transactions_count, + executed_with_successful_result_count, + ) } else { ( 0,