Banking stage refactor commit transactions (#28660)
* Refactor commit transactions step * Cleanup token pre-balances * Collect prebalances together * Collect pre/post balances in separate function * Fix clippy
This commit is contained in:
parent
50d811653a
commit
340ad68223
|
@ -40,9 +40,11 @@ use {
|
|||
solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder},
|
||||
solana_program_runtime::timings::ExecuteTimings,
|
||||
solana_runtime::{
|
||||
accounts::TransactionLoadResult,
|
||||
bank::{
|
||||
Bank, CommitTransactionCounts, LoadAndExecuteTransactionsOutput,
|
||||
TransactionBalancesSet, TransactionCheckResult,
|
||||
TransactionBalancesSet, TransactionCheckResult, TransactionExecutionResult,
|
||||
TransactionResults,
|
||||
},
|
||||
bank_forks::BankForks,
|
||||
bank_utils,
|
||||
|
@ -65,7 +67,9 @@ use {
|
|||
},
|
||||
solana_streamer::sendmmsg::batch_send,
|
||||
solana_tpu_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
|
||||
solana_transaction_status::token_balances::TransactionTokenBalancesSet,
|
||||
solana_transaction_status::{
|
||||
token_balances::TransactionTokenBalancesSet, TransactionTokenBalance,
|
||||
},
|
||||
std::{
|
||||
cmp,
|
||||
collections::HashMap,
|
||||
|
@ -312,6 +316,13 @@ impl BankingStageStats {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct PreBalanceInfo {
|
||||
native: Vec<Vec<u64>>,
|
||||
token: Vec<Vec<TransactionTokenBalance>>,
|
||||
mint_decimals: HashMap<Pubkey, u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct BatchedTransactionDetails {
|
||||
pub costs: BatchedTransactionCostDetails,
|
||||
|
@ -1192,6 +1203,129 @@ impl BankingStage {
|
|||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn commit_transactions(
|
||||
batch: &TransactionBatch,
|
||||
loaded_transactions: &mut [TransactionLoadResult],
|
||||
execution_results: Vec<TransactionExecutionResult>,
|
||||
sanitized_txs: &[SanitizedTransaction],
|
||||
starting_transaction_index: Option<usize>,
|
||||
bank: &Arc<Bank>,
|
||||
pre_balance_info: &mut PreBalanceInfo,
|
||||
execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings,
|
||||
transaction_status_sender: &Option<TransactionStatusSender>,
|
||||
gossip_vote_sender: &ReplayVoteSender,
|
||||
signature_count: u64,
|
||||
executed_transactions_count: usize,
|
||||
executed_with_successful_result_count: usize,
|
||||
) -> (u64, Vec<CommitTransactionDetails>) {
|
||||
inc_new_counter_info!(
|
||||
"banking_stage-record_transactions_num_to_commit",
|
||||
executed_transactions_count
|
||||
);
|
||||
|
||||
let (last_blockhash, lamports_per_signature) =
|
||||
bank.last_blockhash_and_lamports_per_signature();
|
||||
|
||||
let (tx_results, commit_time) = measure!(
|
||||
bank.commit_transactions(
|
||||
sanitized_txs,
|
||||
loaded_transactions,
|
||||
execution_results,
|
||||
last_blockhash,
|
||||
lamports_per_signature,
|
||||
CommitTransactionCounts {
|
||||
committed_transactions_count: executed_transactions_count as u64,
|
||||
committed_with_failure_result_count: executed_transactions_count
|
||||
.saturating_sub(executed_with_successful_result_count)
|
||||
as u64,
|
||||
signature_count,
|
||||
},
|
||||
&mut execute_and_commit_timings.execute_timings,
|
||||
),
|
||||
"commit",
|
||||
);
|
||||
let commit_time_us = commit_time.as_us();
|
||||
execute_and_commit_timings.commit_us = commit_time_us;
|
||||
|
||||
let commit_transaction_statuses = tx_results
|
||||
.execution_results
|
||||
.iter()
|
||||
.map(|execution_result| match execution_result.details() {
|
||||
Some(details) => CommitTransactionDetails::Committed {
|
||||
compute_units: details.executed_units,
|
||||
},
|
||||
None => CommitTransactionDetails::NotCommitted,
|
||||
})
|
||||
.collect();
|
||||
|
||||
let (_, find_and_send_votes_time) = measure!(
|
||||
{
|
||||
bank_utils::find_and_send_votes(
|
||||
sanitized_txs,
|
||||
&tx_results,
|
||||
Some(gossip_vote_sender),
|
||||
);
|
||||
Self::collect_balances_and_send_status_batch(
|
||||
transaction_status_sender,
|
||||
tx_results,
|
||||
bank,
|
||||
batch,
|
||||
pre_balance_info,
|
||||
starting_transaction_index,
|
||||
);
|
||||
},
|
||||
"find_and_send_votes",
|
||||
);
|
||||
execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_time.as_us();
|
||||
(commit_time_us, commit_transaction_statuses)
|
||||
}
|
||||
|
||||
fn collect_balances_and_send_status_batch(
|
||||
transaction_status_sender: &Option<TransactionStatusSender>,
|
||||
tx_results: TransactionResults,
|
||||
bank: &Arc<Bank>,
|
||||
batch: &TransactionBatch,
|
||||
pre_balance_info: &mut PreBalanceInfo,
|
||||
starting_transaction_index: Option<usize>,
|
||||
) {
|
||||
if let Some(transaction_status_sender) = transaction_status_sender {
|
||||
let txs = batch.sanitized_transactions().to_vec();
|
||||
let post_balances = bank.collect_balances(batch);
|
||||
let post_token_balances =
|
||||
collect_token_balances(bank, batch, &mut pre_balance_info.mint_decimals);
|
||||
let mut transaction_index = starting_transaction_index.unwrap_or_default();
|
||||
let batch_transaction_indexes: Vec<_> = tx_results
|
||||
.execution_results
|
||||
.iter()
|
||||
.map(|result| {
|
||||
if result.was_executed() {
|
||||
let this_transaction_index = transaction_index;
|
||||
saturating_add_assign!(transaction_index, 1);
|
||||
this_transaction_index
|
||||
} else {
|
||||
0
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
transaction_status_sender.send_transaction_status_batch(
|
||||
bank.clone(),
|
||||
txs,
|
||||
tx_results.execution_results,
|
||||
TransactionBalancesSet::new(
|
||||
std::mem::take(&mut pre_balance_info.native),
|
||||
post_balances,
|
||||
),
|
||||
TransactionTokenBalancesSet::new(
|
||||
std::mem::take(&mut pre_balance_info.token),
|
||||
post_token_balances,
|
||||
),
|
||||
tx_results.rent_debits,
|
||||
batch_transaction_indexes,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_and_commit_transactions_locked(
|
||||
bank: &Arc<Bank>,
|
||||
poh: &TransactionRecorder,
|
||||
|
@ -1201,27 +1335,17 @@ impl BankingStage {
|
|||
log_messages_bytes_limit: Option<usize>,
|
||||
) -> ExecuteAndCommitTransactionsOutput {
|
||||
let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();
|
||||
let mut mint_decimals: HashMap<Pubkey, u8> = HashMap::new();
|
||||
|
||||
let ((pre_balances, pre_token_balances), collect_balances_time) = measure!(
|
||||
let mut pre_balance_info = PreBalanceInfo::default();
|
||||
let (_, collect_balances_time) = measure!(
|
||||
{
|
||||
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
|
||||
// the likelihood of any single thread getting starved and processing old ids.
|
||||
// TODO: Banking stage threads should be prioritized to complete faster then this queue
|
||||
// expires.
|
||||
let pre_balances = if transaction_status_sender.is_some() {
|
||||
bank.collect_balances(batch)
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
|
||||
let pre_token_balances = if transaction_status_sender.is_some() {
|
||||
collect_token_balances(bank, batch, &mut mint_decimals)
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
|
||||
(pre_balances, pre_token_balances)
|
||||
// If the extra meta-data services are enabled for RPC, collect the
|
||||
// pre-balances for native and token programs.
|
||||
if transaction_status_sender.is_some() {
|
||||
pre_balance_info.native = bank.collect_balances(batch);
|
||||
pre_balance_info.token =
|
||||
collect_token_balances(bank, batch, &mut pre_balance_info.mint_decimals)
|
||||
}
|
||||
},
|
||||
"collect_balances",
|
||||
);
|
||||
|
@ -1269,8 +1393,6 @@ impl BankingStage {
|
|||
"execution_results_to_transactions",
|
||||
);
|
||||
|
||||
let (last_blockhash, lamports_per_signature) =
|
||||
bank.last_blockhash_and_lamports_per_signature();
|
||||
let (freeze_lock, freeze_lock_time) = measure!(bank.freeze_lock(), "freeze_lock");
|
||||
execute_and_commit_timings.freeze_lock_us = freeze_lock_time.as_us();
|
||||
|
||||
|
@ -1313,87 +1435,21 @@ impl BankingStage {
|
|||
|
||||
let sanitized_txs = batch.sanitized_transactions();
|
||||
let (commit_time_us, commit_transaction_statuses) = if executed_transactions_count != 0 {
|
||||
inc_new_counter_info!(
|
||||
"banking_stage-record_transactions_num_to_commit",
|
||||
executed_transactions_count
|
||||
);
|
||||
|
||||
let (tx_results, commit_time) = measure!(
|
||||
bank.commit_transactions(
|
||||
sanitized_txs,
|
||||
&mut loaded_transactions,
|
||||
execution_results,
|
||||
last_blockhash,
|
||||
lamports_per_signature,
|
||||
CommitTransactionCounts {
|
||||
committed_transactions_count: executed_transactions_count as u64,
|
||||
committed_with_failure_result_count: executed_transactions_count
|
||||
.saturating_sub(executed_with_successful_result_count)
|
||||
as u64,
|
||||
signature_count,
|
||||
},
|
||||
&mut execute_and_commit_timings.execute_timings,
|
||||
),
|
||||
"commit",
|
||||
);
|
||||
let commit_time_us = commit_time.as_us();
|
||||
execute_and_commit_timings.commit_us = commit_time_us;
|
||||
|
||||
let commit_transaction_statuses = tx_results
|
||||
.execution_results
|
||||
.iter()
|
||||
.map(|execution_result| match execution_result.details() {
|
||||
Some(details) => CommitTransactionDetails::Committed {
|
||||
compute_units: details.executed_units,
|
||||
},
|
||||
None => CommitTransactionDetails::NotCommitted,
|
||||
})
|
||||
.collect();
|
||||
|
||||
let (_, find_and_send_votes_time) = measure!(
|
||||
{
|
||||
bank_utils::find_and_send_votes(
|
||||
sanitized_txs,
|
||||
&tx_results,
|
||||
Some(gossip_vote_sender),
|
||||
);
|
||||
if let Some(transaction_status_sender) = transaction_status_sender {
|
||||
let txs = batch.sanitized_transactions().to_vec();
|
||||
let post_balances = bank.collect_balances(batch);
|
||||
let post_token_balances =
|
||||
collect_token_balances(bank, batch, &mut mint_decimals);
|
||||
let mut transaction_index = starting_transaction_index.unwrap_or_default();
|
||||
let batch_transaction_indexes: Vec<_> = tx_results
|
||||
.execution_results
|
||||
.iter()
|
||||
.map(|result| {
|
||||
if result.was_executed() {
|
||||
let this_transaction_index = transaction_index;
|
||||
saturating_add_assign!(transaction_index, 1);
|
||||
this_transaction_index
|
||||
} else {
|
||||
0
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
transaction_status_sender.send_transaction_status_batch(
|
||||
bank.clone(),
|
||||
txs,
|
||||
tx_results.execution_results,
|
||||
TransactionBalancesSet::new(pre_balances, post_balances),
|
||||
TransactionTokenBalancesSet::new(
|
||||
pre_token_balances,
|
||||
post_token_balances,
|
||||
),
|
||||
tx_results.rent_debits,
|
||||
batch_transaction_indexes,
|
||||
);
|
||||
}
|
||||
},
|
||||
"find_and_send_votes",
|
||||
);
|
||||
execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_time.as_us();
|
||||
(commit_time_us, commit_transaction_statuses)
|
||||
Self::commit_transactions(
|
||||
batch,
|
||||
&mut loaded_transactions,
|
||||
execution_results,
|
||||
sanitized_txs,
|
||||
starting_transaction_index,
|
||||
bank,
|
||||
&mut pre_balance_info,
|
||||
&mut execute_and_commit_timings,
|
||||
transaction_status_sender,
|
||||
gossip_vote_sender,
|
||||
signature_count,
|
||||
executed_transactions_count,
|
||||
executed_with_successful_result_count,
|
||||
)
|
||||
} else {
|
||||
(
|
||||
0,
|
||||
|
|
Loading…
Reference in New Issue