Return actual committed transactions from process_transactions() (#22802)

This commit is contained in:
carllin 2022-02-03 03:56:36 -05:00 committed by GitHub
parent c62f9839a2
commit bd1850df25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 649 additions and 200 deletions

File diff suppressed because it is too large Load Diff

View File

@ -124,15 +124,17 @@ impl QosService {
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
transactions_costs: impl Iterator<Item = &'a TransactionCost>,
bank: &Arc<Bank>,
) -> Vec<transaction::Result<()>> {
) -> (Vec<transaction::Result<()>>, usize) {
let mut cost_tracking_time = Measure::start("cost_tracking_time");
let mut cost_tracker = bank.write_cost_tracker().unwrap();
let mut num_included = 0;
let select_results = transactions
.zip(transactions_costs)
.map(|(tx, cost)| match cost_tracker.try_add(tx, cost) {
Ok(current_block_cost) => {
debug!("slot {:?}, transaction {:?}, cost {:?}, fit into current block, current block cost {}", bank.slot(), tx, cost, current_block_cost);
self.metrics.selected_txs_count.fetch_add(1, Ordering::Relaxed);
num_included += 1;
Ok(())
},
Err(e) => {
@ -162,7 +164,7 @@ impl QosService {
self.metrics
.cost_tracking_time
.fetch_add(cost_tracking_time.as_us(), Ordering::Relaxed);
select_results
(select_results, num_included)
}
// metrics are reported by bank slot
@ -542,7 +544,9 @@ mod tests {
bank.write_cost_tracker()
.unwrap()
.set_limits(cost_limit, cost_limit, cost_limit);
let results = qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank);
let (results, num_selected) =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank);
assert_eq!(num_selected, 2);
// verify that first transfer tx and first vote are allowed
assert_eq!(results.len(), txs.len());

View File

@ -421,7 +421,8 @@ fn setup_fees(bank: Bank) -> Bank {
&[], // transactions
&mut [], // loaded accounts
vec![], // transaction execution results
0, // tx count
0, // executed tx count
0, // executed with failure output tx count
1, // signature count
&mut ExecuteTimings::default(),
);

View File

@ -626,6 +626,20 @@ impl TransactionExecutionResult {
}
}
pub struct LoadAndExecuteTransactionsOutput {
pub loaded_transactions: Vec<TransactionLoadResult>,
// Vector of results indicating whether a transaction was executed or could not
// be executed. Note executed transactions can still have failed!
pub execution_results: Vec<TransactionExecutionResult>,
pub retryable_transaction_indexes: Vec<usize>,
// Total number of transactions that were executed
pub executed_transactions_count: usize,
// Total number of the executed transactions that returned success/not
// an error.
pub executed_with_successful_result_count: usize,
pub signature_count: u64,
}
#[derive(Debug, Clone)]
pub enum DurableNonceFee {
Valid(u64),
@ -3455,13 +3469,11 @@ impl Bank {
let batch = self.prepare_simulation_batch(transaction);
let mut timings = ExecuteTimings::default();
let (
let LoadAndExecuteTransactionsOutput {
loaded_transactions,
mut execution_results,
_retryable_transactions,
_transaction_count,
_signature_count,
) = self.load_and_execute_transactions(
..
} = self.load_and_execute_transactions(
&batch,
// After simulation, transactions will need to be forwarded to the leader
// for processing. During forwarding, the transaction could expire if the
@ -3942,19 +3954,13 @@ impl Bank {
enable_cpi_recording: bool,
enable_log_recording: bool,
timings: &mut ExecuteTimings,
) -> (
Vec<TransactionLoadResult>,
Vec<TransactionExecutionResult>,
Vec<usize>,
u64,
u64,
) {
) -> LoadAndExecuteTransactionsOutput {
let sanitized_txs = batch.sanitized_transactions();
debug!("processing transactions: {}", sanitized_txs.len());
inc_new_counter_info!("bank-process_transactions", sanitized_txs.len());
let mut error_counters = ErrorCounters::default();
let retryable_txs: Vec<_> = batch
let retryable_transaction_indexes: Vec<_> = batch
.lock_results()
.iter()
.enumerate()
@ -3982,7 +3988,7 @@ impl Bank {
check_time.stop();
let mut load_time = Measure::start("accounts_load");
let mut loaded_txs = self.rc.accounts.load_accounts(
let mut loaded_transactions = self.rc.accounts.load_accounts(
&self.ancestors,
sanitized_txs,
check_results,
@ -3996,7 +4002,7 @@ impl Bank {
let mut execution_time = Measure::start("execution_time");
let mut signature_count: u64 = 0;
let execution_results: Vec<TransactionExecutionResult> = loaded_txs
let execution_results: Vec<TransactionExecutionResult> = loaded_transactions
.iter_mut()
.zip(sanitized_txs.iter())
.map(|(accs, tx)| match accs {
@ -4059,7 +4065,8 @@ impl Bank {
timings.load_us = timings.load_us.saturating_add(load_time.as_us());
timings.execute_us = timings.execute_us.saturating_add(execution_time.as_us());
let mut tx_count: u64 = 0;
let mut executed_transactions_count: usize = 0;
let mut executed_with_successful_result_count: usize = 0;
let err_count = &mut error_counters.total;
let transaction_log_collector_config =
self.transaction_log_collector_config.read().unwrap();
@ -4133,9 +4140,13 @@ impl Bank {
}
}
if execution_result.was_executed() {
executed_transactions_count += 1;
}
match execution_result.flattened_result() {
Ok(()) => {
tx_count += 1;
executed_with_successful_result_count += 1;
}
Err(err) => {
if *err_count == 0 {
@ -4149,17 +4160,18 @@ impl Bank {
debug!(
"{} errors of {} txs",
*err_count,
*err_count as u64 + tx_count
*err_count + executed_with_successful_result_count
);
}
Self::update_error_counters(&error_counters);
(
loaded_txs,
LoadAndExecuteTransactionsOutput {
loaded_transactions,
execution_results,
retryable_txs,
tx_count,
retryable_transaction_indexes,
executed_transactions_count,
executed_with_successful_result_count,
signature_count,
)
}
}
/// Load the accounts data len
@ -4260,12 +4272,17 @@ impl Bank {
results
}
/// `committed_transactions_count` is the number of transactions out of `sanitized_txs`
/// that was executed. Of those, `committed_transactions_count`,
/// `committed_with_failure_result_count` is the number of executed transactions that returned
/// a failure result.
pub fn commit_transactions(
&self,
sanitized_txs: &[SanitizedTransaction],
loaded_txs: &mut [TransactionLoadResult],
execution_results: Vec<TransactionExecutionResult>,
tx_count: u64,
committed_transactions_count: u64,
committed_with_failure_result_count: u64,
signature_count: u64,
timings: &mut ExecuteTimings,
) -> TransactionResults {
@ -4274,24 +4291,32 @@ impl Bank {
"commit_transactions() working on a bank that is already frozen or is undergoing freezing!"
);
let tx_count = if self.bank_tranaction_count_fix_enabled() {
committed_transactions_count
} else {
committed_transactions_count.saturating_sub(committed_with_failure_result_count)
};
self.increment_transaction_count(tx_count);
self.increment_signature_count(signature_count);
inc_new_counter_info!("bank-process_transactions-txs", tx_count as usize);
inc_new_counter_info!(
"bank-process_transactions-txs",
committed_transactions_count as usize
);
inc_new_counter_info!("bank-process_transactions-sigs", signature_count as usize);
if !sanitized_txs.is_empty() {
let processed_tx_count = sanitized_txs.len() as u64;
let failed_tx_count = processed_tx_count.saturating_sub(tx_count);
if committed_with_failure_result_count > 0 {
self.transaction_error_count
.fetch_add(failed_tx_count, Relaxed);
self.transaction_entries_count.fetch_add(1, Relaxed);
self.transactions_per_entry_max
.fetch_max(processed_tx_count, Relaxed);
.fetch_add(committed_with_failure_result_count, Relaxed);
}
// Should be equivalent to checking `committed_transactions_count > 0`
if execution_results.iter().any(|result| result.was_executed()) {
self.is_delta.store(true, Relaxed);
self.transaction_entries_count.fetch_add(1, Relaxed);
self.transactions_per_entry_max
.fetch_max(committed_transactions_count, Relaxed);
}
let (blockhash, lamports_per_signature) = self.last_blockhash_and_lamports_per_signature();
@ -5113,20 +5138,28 @@ impl Bank {
vec![]
};
let (mut loaded_txs, execution_results, _, tx_count, signature_count) = self
.load_and_execute_transactions(
batch,
max_age,
enable_cpi_recording,
enable_log_recording,
timings,
);
let LoadAndExecuteTransactionsOutput {
mut loaded_transactions,
execution_results,
executed_transactions_count,
executed_with_successful_result_count,
signature_count,
..
} = self.load_and_execute_transactions(
batch,
max_age,
enable_cpi_recording,
enable_log_recording,
timings,
);
let results = self.commit_transactions(
batch.sanitized_transactions(),
&mut loaded_txs,
&mut loaded_transactions,
execution_results,
tx_count,
executed_transactions_count as u64,
executed_transactions_count.saturating_sub(executed_with_successful_result_count)
as u64,
signature_count,
timings,
);
@ -6201,6 +6234,11 @@ impl Bank {
consumed_budget.saturating_sub(budget_recovery_delta)
}
pub fn bank_tranaction_count_fix_enabled(&self) -> bool {
self.feature_set
.is_active(&feature_set::bank_tranaction_count_fix::id())
}
pub fn shrink_candidate_slots(&self) -> usize {
self.rc.accounts.accounts_db.shrink_candidate_slots()
}

View File

@ -315,6 +315,10 @@ pub mod add_get_processed_sibling_instruction_syscall {
solana_sdk::declare_id!("CFK1hRCNy8JJuAAY8Pb2GjLFNdCThS2qwZNe3izzBMgn");
}
pub mod bank_tranaction_count_fix {
solana_sdk::declare_id!("Vo5siZ442SaZBKPXNocthiXysNviW4UYPwRFggmbgAp");
}
lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@ -388,6 +392,7 @@ lazy_static! {
(spl_associated_token_account_v1_0_4::id(), "SPL Associated Token Account Program release version 1.0.4, tied to token 3.3.0 #22648"),
(reject_vote_account_close_unless_zero_credit_epoch::id(), "fail vote account withdraw to 0 unless account earned 0 credits in last completed epoch"),
(add_get_processed_sibling_instruction_syscall::id(), "add add_get_processed_sibling_instruction_syscall"),
(bank_tranaction_count_fix::id(), "Fixes Bank::transaction_count to include all committed transactions, not just successful ones"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()