From fbb90603a917ff0dcdaf77e1b16693fa2243080e Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 25 Jan 2023 19:02:21 -0800 Subject: [PATCH] BankingStage Refactor: Separate transaction commiting module (#29808) Separate transaction commiting module --- core/src/banking_stage.rs | 150 ++------------------------ core/src/banking_stage/committer.rs | 157 ++++++++++++++++++++++++++++ core/src/qos_service.rs | 2 +- 3 files changed, 164 insertions(+), 145 deletions(-) create mode 100644 core/src/banking_stage/committer.rs diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 956ebc8c94..496ac991e2 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -4,11 +4,13 @@ use { self::{ + committer::CommitTransactionDetails, decision_maker::{BufferedPacketsDecision, DecisionMaker}, forwarder::Forwarder, packet_receiver::PacketReceiver, }, crate::{ + banking_stage::committer::Committer, banking_trace::BankingPacketReceiver, immutable_deserialized_packet::ImmutableDeserializedPacket, latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource}, @@ -39,14 +41,8 @@ use { solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}, solana_program_runtime::timings::ExecuteTimings, solana_runtime::{ - accounts::TransactionLoadResult, - bank::{ - Bank, CommitTransactionCounts, LoadAndExecuteTransactionsOutput, - TransactionBalancesSet, TransactionCheckResult, TransactionExecutionResult, - TransactionResults, - }, + bank::{Bank, LoadAndExecuteTransactionsOutput, TransactionCheckResult}, bank_forks::BankForks, - bank_utils, transaction_batch::TransactionBatch, transaction_error_metrics::TransactionErrorMetrics, vote_sender_types::ReplayVoteSender, @@ -55,13 +51,10 @@ use { clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, feature_set::allow_votes_to_directly_update_vote_state, pubkey::Pubkey, - saturating_add_assign, timing::{timestamp, AtomicInterval}, transaction::{self, SanitizedTransaction, TransactionError, VersionedTransaction}, }, - solana_transaction_status::{ - token_balances::TransactionTokenBalancesSet, TransactionTokenBalance, - }, + solana_transaction_status::TransactionTokenBalance, std::{ cmp, collections::HashMap, @@ -76,6 +69,7 @@ use { }, }; +pub mod committer; mod decision_maker; mod forwarder; mod packet_receiver; @@ -110,12 +104,6 @@ struct RecordTransactionsSummary { starting_transaction_index: Option, } -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum CommitTransactionDetails { - Committed { compute_units: u64 }, - NotCommitted, -} - pub struct ExecuteAndCommitTransactionsOutput { // Total number of transactions that were passed as candidates for execution transactions_attempted_execution_count: usize, @@ -892,132 +880,6 @@ impl BankingStage { } } - #[allow(clippy::too_many_arguments)] - fn commit_transactions( - batch: &TransactionBatch, - loaded_transactions: &mut [TransactionLoadResult], - execution_results: Vec, - sanitized_txs: &[SanitizedTransaction], - starting_transaction_index: Option, - bank: &Arc, - pre_balance_info: &mut PreBalanceInfo, - execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings, - transaction_status_sender: &Option, - replay_vote_sender: &ReplayVoteSender, - signature_count: u64, - executed_transactions_count: usize, - executed_non_vote_transactions_count: usize, - executed_with_successful_result_count: usize, - ) -> (u64, Vec) { - inc_new_counter_info!( - "banking_stage-record_transactions_num_to_commit", - executed_transactions_count - ); - - let (last_blockhash, lamports_per_signature) = - bank.last_blockhash_and_lamports_per_signature(); - - let (tx_results, commit_time) = measure!( - bank.commit_transactions( - sanitized_txs, - loaded_transactions, - execution_results, - last_blockhash, - lamports_per_signature, - CommitTransactionCounts { - committed_transactions_count: executed_transactions_count as u64, - committed_non_vote_transactions_count: executed_non_vote_transactions_count - as u64, - committed_with_failure_result_count: executed_transactions_count - .saturating_sub(executed_with_successful_result_count) - as u64, - signature_count, - }, - &mut execute_and_commit_timings.execute_timings, - ), - "commit", - ); - let commit_time_us = commit_time.as_us(); - execute_and_commit_timings.commit_us = commit_time_us; - - let commit_transaction_statuses = tx_results - .execution_results - .iter() - .map(|execution_result| match execution_result.details() { - Some(details) => CommitTransactionDetails::Committed { - compute_units: details.executed_units, - }, - None => CommitTransactionDetails::NotCommitted, - }) - .collect(); - - let (_, find_and_send_votes_time) = measure!( - { - bank_utils::find_and_send_votes( - sanitized_txs, - &tx_results, - Some(replay_vote_sender), - ); - Self::collect_balances_and_send_status_batch( - transaction_status_sender, - tx_results, - bank, - batch, - pre_balance_info, - starting_transaction_index, - ); - }, - "find_and_send_votes", - ); - execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_time.as_us(); - (commit_time_us, commit_transaction_statuses) - } - - fn collect_balances_and_send_status_batch( - transaction_status_sender: &Option, - tx_results: TransactionResults, - bank: &Arc, - batch: &TransactionBatch, - pre_balance_info: &mut PreBalanceInfo, - starting_transaction_index: Option, - ) { - if let Some(transaction_status_sender) = transaction_status_sender { - let txs = batch.sanitized_transactions().to_vec(); - let post_balances = bank.collect_balances(batch); - let post_token_balances = - collect_token_balances(bank, batch, &mut pre_balance_info.mint_decimals); - let mut transaction_index = starting_transaction_index.unwrap_or_default(); - let batch_transaction_indexes: Vec<_> = tx_results - .execution_results - .iter() - .map(|result| { - if result.was_executed() { - let this_transaction_index = transaction_index; - saturating_add_assign!(transaction_index, 1); - this_transaction_index - } else { - 0 - } - }) - .collect(); - transaction_status_sender.send_transaction_status_batch( - bank.clone(), - txs, - tx_results.execution_results, - TransactionBalancesSet::new( - std::mem::take(&mut pre_balance_info.native), - post_balances, - ), - TransactionTokenBalancesSet::new( - std::mem::take(&mut pre_balance_info.token), - post_token_balances, - ), - tx_results.rent_debits, - batch_transaction_indexes, - ); - } - } - fn execute_and_commit_transactions_locked( bank: &Arc, poh: &TransactionRecorder, @@ -1128,7 +990,7 @@ impl BankingStage { let sanitized_txs = batch.sanitized_transactions(); let (commit_time_us, commit_transaction_statuses) = if executed_transactions_count != 0 { - Self::commit_transactions( + Committer::commit_transactions( batch, &mut loaded_transactions, execution_results, diff --git a/core/src/banking_stage/committer.rs b/core/src/banking_stage/committer.rs new file mode 100644 index 0000000000..c0fa6ff3ae --- /dev/null +++ b/core/src/banking_stage/committer.rs @@ -0,0 +1,157 @@ +use { + super::PreBalanceInfo, + crate::leader_slot_banking_stage_timing_metrics::LeaderExecuteAndCommitTimings, + solana_ledger::{ + blockstore_processor::TransactionStatusSender, token_balances::collect_token_balances, + }, + solana_measure::measure, + solana_runtime::{ + accounts::TransactionLoadResult, + bank::{ + Bank, CommitTransactionCounts, TransactionBalancesSet, TransactionExecutionResult, + TransactionResults, + }, + bank_utils, + transaction_batch::TransactionBatch, + vote_sender_types::ReplayVoteSender, + }, + solana_sdk::{saturating_add_assign, transaction::SanitizedTransaction}, + solana_transaction_status::token_balances::TransactionTokenBalancesSet, + std::sync::Arc, +}; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum CommitTransactionDetails { + Committed { compute_units: u64 }, + NotCommitted, +} + +pub struct Committer; + +impl Committer { + #[allow(clippy::too_many_arguments)] + pub(super) fn commit_transactions( + batch: &TransactionBatch, + loaded_transactions: &mut [TransactionLoadResult], + execution_results: Vec, + sanitized_txs: &[SanitizedTransaction], + starting_transaction_index: Option, + bank: &Arc, + pre_balance_info: &mut PreBalanceInfo, + execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings, + transaction_status_sender: &Option, + replay_vote_sender: &ReplayVoteSender, + signature_count: u64, + executed_transactions_count: usize, + executed_non_vote_transactions_count: usize, + executed_with_successful_result_count: usize, + ) -> (u64, Vec) { + inc_new_counter_info!( + "banking_stage-record_transactions_num_to_commit", + executed_transactions_count + ); + + let (last_blockhash, lamports_per_signature) = + bank.last_blockhash_and_lamports_per_signature(); + + let (tx_results, commit_time) = measure!( + bank.commit_transactions( + sanitized_txs, + loaded_transactions, + execution_results, + last_blockhash, + lamports_per_signature, + CommitTransactionCounts { + committed_transactions_count: executed_transactions_count as u64, + committed_non_vote_transactions_count: executed_non_vote_transactions_count + as u64, + committed_with_failure_result_count: executed_transactions_count + .saturating_sub(executed_with_successful_result_count) + as u64, + signature_count, + }, + &mut execute_and_commit_timings.execute_timings, + ), + "commit", + ); + let commit_time_us = commit_time.as_us(); + execute_and_commit_timings.commit_us = commit_time_us; + + let commit_transaction_statuses = tx_results + .execution_results + .iter() + .map(|execution_result| match execution_result.details() { + Some(details) => CommitTransactionDetails::Committed { + compute_units: details.executed_units, + }, + None => CommitTransactionDetails::NotCommitted, + }) + .collect(); + + let (_, find_and_send_votes_time) = measure!( + { + bank_utils::find_and_send_votes( + sanitized_txs, + &tx_results, + Some(replay_vote_sender), + ); + Self::collect_balances_and_send_status_batch( + transaction_status_sender, + tx_results, + bank, + batch, + pre_balance_info, + starting_transaction_index, + ); + }, + "find_and_send_votes", + ); + execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_time.as_us(); + (commit_time_us, commit_transaction_statuses) + } + + fn collect_balances_and_send_status_batch( + transaction_status_sender: &Option, + tx_results: TransactionResults, + bank: &Arc, + batch: &TransactionBatch, + pre_balance_info: &mut PreBalanceInfo, + starting_transaction_index: Option, + ) { + if let Some(transaction_status_sender) = transaction_status_sender { + let txs = batch.sanitized_transactions().to_vec(); + let post_balances = bank.collect_balances(batch); + let post_token_balances = + collect_token_balances(bank, batch, &mut pre_balance_info.mint_decimals); + let mut transaction_index = starting_transaction_index.unwrap_or_default(); + let batch_transaction_indexes: Vec<_> = tx_results + .execution_results + .iter() + .map(|result| { + if result.was_executed() { + let this_transaction_index = transaction_index; + saturating_add_assign!(transaction_index, 1); + this_transaction_index + } else { + 0 + } + }) + .collect(); + transaction_status_sender.send_transaction_status_batch( + bank.clone(), + txs, + tx_results.execution_results, + TransactionBalancesSet::new( + std::mem::take(&mut pre_balance_info.native), + post_balances, + ), + TransactionTokenBalancesSet::new( + std::mem::take(&mut pre_balance_info.token), + post_token_balances, + ), + tx_results.rent_debits, + batch_transaction_indexes, + ); + } + } +} diff --git a/core/src/qos_service.rs b/core/src/qos_service.rs index 05f68f83dd..6383f6eda2 100644 --- a/core/src/qos_service.rs +++ b/core/src/qos_service.rs @@ -4,7 +4,7 @@ //! use { - crate::banking_stage::{BatchedTransactionDetails, CommitTransactionDetails}, + crate::banking_stage::{committer::CommitTransactionDetails, BatchedTransactionDetails}, crossbeam_channel::{unbounded, Receiver, Sender}, solana_measure::measure::Measure, solana_runtime::{