BankingStage Refactor: Separate transaction commiting module (#29808)

Separate transaction commiting module
This commit is contained in:
Andrew Fitzgerald 2023-01-25 19:02:21 -08:00 committed by GitHub
parent e326f4f60b
commit fbb90603a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 164 additions and 145 deletions

View File

@ -4,11 +4,13 @@
use {
self::{
committer::CommitTransactionDetails,
decision_maker::{BufferedPacketsDecision, DecisionMaker},
forwarder::Forwarder,
packet_receiver::PacketReceiver,
},
crate::{
banking_stage::committer::Committer,
banking_trace::BankingPacketReceiver,
immutable_deserialized_packet::ImmutableDeserializedPacket,
latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource},
@ -39,14 +41,8 @@ use {
solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder},
solana_program_runtime::timings::ExecuteTimings,
solana_runtime::{
accounts::TransactionLoadResult,
bank::{
Bank, CommitTransactionCounts, LoadAndExecuteTransactionsOutput,
TransactionBalancesSet, TransactionCheckResult, TransactionExecutionResult,
TransactionResults,
},
bank::{Bank, LoadAndExecuteTransactionsOutput, TransactionCheckResult},
bank_forks::BankForks,
bank_utils,
transaction_batch::TransactionBatch,
transaction_error_metrics::TransactionErrorMetrics,
vote_sender_types::ReplayVoteSender,
@ -55,13 +51,10 @@ use {
clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE},
feature_set::allow_votes_to_directly_update_vote_state,
pubkey::Pubkey,
saturating_add_assign,
timing::{timestamp, AtomicInterval},
transaction::{self, SanitizedTransaction, TransactionError, VersionedTransaction},
},
solana_transaction_status::{
token_balances::TransactionTokenBalancesSet, TransactionTokenBalance,
},
solana_transaction_status::TransactionTokenBalance,
std::{
cmp,
collections::HashMap,
@ -76,6 +69,7 @@ use {
},
};
pub mod committer;
mod decision_maker;
mod forwarder;
mod packet_receiver;
@ -110,12 +104,6 @@ struct RecordTransactionsSummary {
starting_transaction_index: Option<usize>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CommitTransactionDetails {
Committed { compute_units: u64 },
NotCommitted,
}
pub struct ExecuteAndCommitTransactionsOutput {
// Total number of transactions that were passed as candidates for execution
transactions_attempted_execution_count: usize,
@ -892,132 +880,6 @@ impl BankingStage {
}
}
#[allow(clippy::too_many_arguments)]
fn commit_transactions(
batch: &TransactionBatch,
loaded_transactions: &mut [TransactionLoadResult],
execution_results: Vec<TransactionExecutionResult>,
sanitized_txs: &[SanitizedTransaction],
starting_transaction_index: Option<usize>,
bank: &Arc<Bank>,
pre_balance_info: &mut PreBalanceInfo,
execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings,
transaction_status_sender: &Option<TransactionStatusSender>,
replay_vote_sender: &ReplayVoteSender,
signature_count: u64,
executed_transactions_count: usize,
executed_non_vote_transactions_count: usize,
executed_with_successful_result_count: usize,
) -> (u64, Vec<CommitTransactionDetails>) {
inc_new_counter_info!(
"banking_stage-record_transactions_num_to_commit",
executed_transactions_count
);
let (last_blockhash, lamports_per_signature) =
bank.last_blockhash_and_lamports_per_signature();
let (tx_results, commit_time) = measure!(
bank.commit_transactions(
sanitized_txs,
loaded_transactions,
execution_results,
last_blockhash,
lamports_per_signature,
CommitTransactionCounts {
committed_transactions_count: executed_transactions_count as u64,
committed_non_vote_transactions_count: executed_non_vote_transactions_count
as u64,
committed_with_failure_result_count: executed_transactions_count
.saturating_sub(executed_with_successful_result_count)
as u64,
signature_count,
},
&mut execute_and_commit_timings.execute_timings,
),
"commit",
);
let commit_time_us = commit_time.as_us();
execute_and_commit_timings.commit_us = commit_time_us;
let commit_transaction_statuses = tx_results
.execution_results
.iter()
.map(|execution_result| match execution_result.details() {
Some(details) => CommitTransactionDetails::Committed {
compute_units: details.executed_units,
},
None => CommitTransactionDetails::NotCommitted,
})
.collect();
let (_, find_and_send_votes_time) = measure!(
{
bank_utils::find_and_send_votes(
sanitized_txs,
&tx_results,
Some(replay_vote_sender),
);
Self::collect_balances_and_send_status_batch(
transaction_status_sender,
tx_results,
bank,
batch,
pre_balance_info,
starting_transaction_index,
);
},
"find_and_send_votes",
);
execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_time.as_us();
(commit_time_us, commit_transaction_statuses)
}
fn collect_balances_and_send_status_batch(
transaction_status_sender: &Option<TransactionStatusSender>,
tx_results: TransactionResults,
bank: &Arc<Bank>,
batch: &TransactionBatch,
pre_balance_info: &mut PreBalanceInfo,
starting_transaction_index: Option<usize>,
) {
if let Some(transaction_status_sender) = transaction_status_sender {
let txs = batch.sanitized_transactions().to_vec();
let post_balances = bank.collect_balances(batch);
let post_token_balances =
collect_token_balances(bank, batch, &mut pre_balance_info.mint_decimals);
let mut transaction_index = starting_transaction_index.unwrap_or_default();
let batch_transaction_indexes: Vec<_> = tx_results
.execution_results
.iter()
.map(|result| {
if result.was_executed() {
let this_transaction_index = transaction_index;
saturating_add_assign!(transaction_index, 1);
this_transaction_index
} else {
0
}
})
.collect();
transaction_status_sender.send_transaction_status_batch(
bank.clone(),
txs,
tx_results.execution_results,
TransactionBalancesSet::new(
std::mem::take(&mut pre_balance_info.native),
post_balances,
),
TransactionTokenBalancesSet::new(
std::mem::take(&mut pre_balance_info.token),
post_token_balances,
),
tx_results.rent_debits,
batch_transaction_indexes,
);
}
}
fn execute_and_commit_transactions_locked(
bank: &Arc<Bank>,
poh: &TransactionRecorder,
@ -1128,7 +990,7 @@ impl BankingStage {
let sanitized_txs = batch.sanitized_transactions();
let (commit_time_us, commit_transaction_statuses) = if executed_transactions_count != 0 {
Self::commit_transactions(
Committer::commit_transactions(
batch,
&mut loaded_transactions,
execution_results,

View File

@ -0,0 +1,157 @@
use {
super::PreBalanceInfo,
crate::leader_slot_banking_stage_timing_metrics::LeaderExecuteAndCommitTimings,
solana_ledger::{
blockstore_processor::TransactionStatusSender, token_balances::collect_token_balances,
},
solana_measure::measure,
solana_runtime::{
accounts::TransactionLoadResult,
bank::{
Bank, CommitTransactionCounts, TransactionBalancesSet, TransactionExecutionResult,
TransactionResults,
},
bank_utils,
transaction_batch::TransactionBatch,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{saturating_add_assign, transaction::SanitizedTransaction},
solana_transaction_status::token_balances::TransactionTokenBalancesSet,
std::sync::Arc,
};
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CommitTransactionDetails {
Committed { compute_units: u64 },
NotCommitted,
}
pub struct Committer;
impl Committer {
#[allow(clippy::too_many_arguments)]
pub(super) fn commit_transactions(
batch: &TransactionBatch,
loaded_transactions: &mut [TransactionLoadResult],
execution_results: Vec<TransactionExecutionResult>,
sanitized_txs: &[SanitizedTransaction],
starting_transaction_index: Option<usize>,
bank: &Arc<Bank>,
pre_balance_info: &mut PreBalanceInfo,
execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings,
transaction_status_sender: &Option<TransactionStatusSender>,
replay_vote_sender: &ReplayVoteSender,
signature_count: u64,
executed_transactions_count: usize,
executed_non_vote_transactions_count: usize,
executed_with_successful_result_count: usize,
) -> (u64, Vec<CommitTransactionDetails>) {
inc_new_counter_info!(
"banking_stage-record_transactions_num_to_commit",
executed_transactions_count
);
let (last_blockhash, lamports_per_signature) =
bank.last_blockhash_and_lamports_per_signature();
let (tx_results, commit_time) = measure!(
bank.commit_transactions(
sanitized_txs,
loaded_transactions,
execution_results,
last_blockhash,
lamports_per_signature,
CommitTransactionCounts {
committed_transactions_count: executed_transactions_count as u64,
committed_non_vote_transactions_count: executed_non_vote_transactions_count
as u64,
committed_with_failure_result_count: executed_transactions_count
.saturating_sub(executed_with_successful_result_count)
as u64,
signature_count,
},
&mut execute_and_commit_timings.execute_timings,
),
"commit",
);
let commit_time_us = commit_time.as_us();
execute_and_commit_timings.commit_us = commit_time_us;
let commit_transaction_statuses = tx_results
.execution_results
.iter()
.map(|execution_result| match execution_result.details() {
Some(details) => CommitTransactionDetails::Committed {
compute_units: details.executed_units,
},
None => CommitTransactionDetails::NotCommitted,
})
.collect();
let (_, find_and_send_votes_time) = measure!(
{
bank_utils::find_and_send_votes(
sanitized_txs,
&tx_results,
Some(replay_vote_sender),
);
Self::collect_balances_and_send_status_batch(
transaction_status_sender,
tx_results,
bank,
batch,
pre_balance_info,
starting_transaction_index,
);
},
"find_and_send_votes",
);
execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_time.as_us();
(commit_time_us, commit_transaction_statuses)
}
fn collect_balances_and_send_status_batch(
transaction_status_sender: &Option<TransactionStatusSender>,
tx_results: TransactionResults,
bank: &Arc<Bank>,
batch: &TransactionBatch,
pre_balance_info: &mut PreBalanceInfo,
starting_transaction_index: Option<usize>,
) {
if let Some(transaction_status_sender) = transaction_status_sender {
let txs = batch.sanitized_transactions().to_vec();
let post_balances = bank.collect_balances(batch);
let post_token_balances =
collect_token_balances(bank, batch, &mut pre_balance_info.mint_decimals);
let mut transaction_index = starting_transaction_index.unwrap_or_default();
let batch_transaction_indexes: Vec<_> = tx_results
.execution_results
.iter()
.map(|result| {
if result.was_executed() {
let this_transaction_index = transaction_index;
saturating_add_assign!(transaction_index, 1);
this_transaction_index
} else {
0
}
})
.collect();
transaction_status_sender.send_transaction_status_batch(
bank.clone(),
txs,
tx_results.execution_results,
TransactionBalancesSet::new(
std::mem::take(&mut pre_balance_info.native),
post_balances,
),
TransactionTokenBalancesSet::new(
std::mem::take(&mut pre_balance_info.token),
post_token_balances,
),
tx_results.rent_debits,
batch_transaction_indexes,
);
}
}
}

View File

@ -4,7 +4,7 @@
//!
use {
crate::banking_stage::{BatchedTransactionDetails, CommitTransactionDetails},
crate::banking_stage::{committer::CommitTransactionDetails, BatchedTransactionDetails},
crossbeam_channel::{unbounded, Receiver, Sender},
solana_measure::measure::Measure,
solana_runtime::{