diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 7c407571e6..19626d39a4 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -22,7 +22,10 @@ use { solana_program_runtime::timings::ExecuteTimings, solana_runtime::{ accounts_db::ErrorCounters, - bank::{Bank, TransactionBalancesSet, TransactionCheckResult, TransactionExecutionResult}, + bank::{ + Bank, LoadAndExecuteTransactionsOutput, TransactionBalancesSet, TransactionCheckResult, + TransactionExecutionResult, + }, bank_utils, cost_model::{CostModel, TransactionCost}, transaction_batch::TransactionBatch, @@ -83,6 +86,72 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; const NUM_VOTE_PROCESSING_THREADS: u32 = 2; const MIN_THREADS_BANKING: u32 = 1; +/// A summary of what happened to transactions passed to the execution pipeline. +/// Transactions can +/// 1) Did not even make it to execution due to being filtered out by things like AccountInUse +/// lock conflictss or CostModel compute limits. These types of errors are retryable and +/// counted in `Self::retryable_transaction_indexes`. +/// 2) Did not execute due to some fatal error like too old, or duplicate signature. These +/// will be dropped from the transactions queue and not counted in `Self::retryable_transaction_indexes` +/// 3) Were executed and committed, captured by `committed_transactions_count` below. +/// 4) Were executed and failed commit, captured by `failed_commit_count` below. +struct ProcessTransactionsSummary { + // Returns true if we hit the end of the block/max PoH height for the block before + // processing all the transactions in the batch. + reached_max_poh_height: bool, + + // Total number of transactions that were passed as candidates for execution. See description + // of struct above for possible outcomes for these transactions + #[allow(dead_code)] + transactions_attempted_execution_count: usize, + + // Total number of transactions that made it into the block + #[allow(dead_code)] + committed_transactions_count: usize, + + // Total number of transactions that made it into the block where the transactions + // output from execution was success/no error. + #[allow(dead_code)] + committed_transactions_with_successful_result_count: usize, + + // All transactions that were executed but then failed record because the + // slot ended + #[allow(dead_code)] + failed_commit_count: usize, + + // Indexes of transactions in the transactions slice that were not committed but are retryable + retryable_transaction_indexes: Vec, + + // The number of transactions filtered out by the cost model + #[allow(dead_code)] + cost_model_throttled_transactions_count: usize, +} + +pub struct ProcessTransactionBatchOutput { + // The number of transactions filtered out by the cost model + cost_model_throttled_transactions_count: usize, + execute_and_commit_transactions_output: ExecuteAndCommitTransactionsOutput, +} + +pub struct ExecuteAndCommitTransactionsOutput { + // Total number of transactions that were passed as candidates for execution + transactions_attempted_execution_count: usize, + // The number of transactions of that were executed. See description of in `ProcessTransactionsSummary` + // for possible outcomes of execution. + executed_transactions_count: usize, + // Total number of the executed transactions that returned success/not + // an error. + executed_with_successful_result_count: usize, + // Transactions that either were not executed, or were executed and failed to be committed due + // to the block ending. + retryable_transaction_indexes: Vec, + // A result that indicates whether transactions were successfully + // committed into the Poh stream. If so, the result tells us + // how many such transactions were committed + commit_transactions_result: Result<(), PohRecorderError>, + execute_timings: ExecuteTimings, +} + #[derive(Debug, Default)] pub struct BankingStageStats { last_report: AtomicInterval, @@ -485,7 +554,7 @@ impl BankingStage { qos_service: &QosService, ) { let mut rebuffered_packet_count = 0; - let mut new_tx_count = 0; + let mut consumed_buffered_packets_count = 0; let buffered_packet_batches_len = buffered_packet_batches.len(); let mut proc_start = Measure::start("consume_buffered_process"); let mut reached_end_of_slot = None; @@ -517,19 +586,25 @@ impl BankingStage { bank_creation_time, }) = bank_start { - let (processed, verified_txs_len, new_unprocessed_indexes) = - Self::process_packets_transactions( - &working_bank, - &bank_creation_time, - recorder, - packet_batch, - original_unprocessed_indexes.to_owned(), - transaction_status_sender.clone(), - gossip_vote_sender, - banking_stage_stats, - qos_service, - ); - if processed < verified_txs_len + let process_transactions_summary = Self::process_packets_transactions( + &working_bank, + &bank_creation_time, + recorder, + packet_batch, + original_unprocessed_indexes.to_owned(), + transaction_status_sender.clone(), + gossip_vote_sender, + banking_stage_stats, + qos_service, + ); + let ProcessTransactionsSummary { + reached_max_poh_height, + retryable_transaction_indexes, + .. + } = process_transactions_summary; + + if reached_max_poh_height + // TODO adding timing metrics here from when bank was added to now || !Bank::should_bank_still_be_processing_txs( &bank_creation_time, max_tx_ingestion_ns, @@ -540,22 +615,31 @@ impl BankingStage { working_bank, )); } - new_tx_count += processed; + + // The difference between all transactions passed to execution and the ones that + // are retryable were the ones that were either: + // 1) Committed into the block + // 2) Dropped without being committed because they had some fatal error (too old, + // duplicate signature, etc.) + // + // Note: This assumes that every packet deserializes into one transaction! + consumed_buffered_packets_count += original_unprocessed_indexes + .len() + .saturating_sub(retryable_transaction_indexes.len()); // Out of the buffered packets just retried, collect any still unprocessed // transactions in this batch for forwarding - rebuffered_packet_count += new_unprocessed_indexes.len(); + rebuffered_packet_count += retryable_transaction_indexes.len(); let has_more_unprocessed_transactions = Self::update_buffered_packets_with_new_unprocessed( original_unprocessed_indexes, - new_unprocessed_indexes, + retryable_transaction_indexes, ); if let Some(test_fn) = &test_fn { test_fn(); } has_more_unprocessed_transactions } else { - rebuffered_packet_count += original_unprocessed_indexes.len(); // `original_unprocessed_indexes` must have remaining packets to process // if not yet processed. assert!(Self::packet_has_more_unprocessed_transactions( @@ -574,8 +658,8 @@ impl BankingStage { timestamp(), buffered_packet_batches_len, proc_start.as_ms(), - new_tx_count, - (new_tx_count as f32) / (proc_start.as_s()) + consumed_buffered_packets_count, + (consumed_buffered_packets_count as f32) / (proc_start.as_s()) ); banking_stage_stats @@ -586,7 +670,7 @@ impl BankingStage { .fetch_add(rebuffered_packet_count, Ordering::Relaxed); banking_stage_stats .consumed_buffered_packets_count - .fetch_add(new_tx_count, Ordering::Relaxed); + .fetch_add(consumed_buffered_packets_count, Ordering::Relaxed); } fn consume_or_forward_packets( @@ -882,13 +966,13 @@ impl BankingStage { (Ok(num_to_commit), vec![]) } - fn process_and_record_transactions_locked( + fn execute_and_commit_transactions_locked( bank: &Arc, poh: &TransactionRecorder, batch: &TransactionBatch, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, - ) -> (Result, Vec, ExecuteTimings) { + ) -> ExecuteAndCommitTransactionsOutput { let mut load_execute_time = Measure::start("load_execute_time"); // Use a shorter maximum age when adding transactions into the pipeline. This will reduce // the likelihood of any single thread getting starved and processing old ids. @@ -909,48 +993,71 @@ impl BankingStage { }; let mut execute_timings = ExecuteTimings::default(); - let (mut loaded_accounts, execution_results, mut retryable_txs, tx_count, signature_count) = - bank.load_and_execute_transactions( - batch, - MAX_PROCESSING_AGE, - transaction_status_sender.is_some(), - transaction_status_sender.is_some(), - &mut execute_timings, - ); + let load_and_execute_transactions_output = bank.load_and_execute_transactions( + batch, + MAX_PROCESSING_AGE, + transaction_status_sender.is_some(), + transaction_status_sender.is_some(), + &mut execute_timings, + ); load_execute_time.stop(); + let LoadAndExecuteTransactionsOutput { + mut loaded_transactions, + execution_results, + mut retryable_transaction_indexes, + executed_transactions_count, + executed_with_successful_result_count, + signature_count, + .. + } = load_and_execute_transactions_output; + let freeze_lock = bank.freeze_lock(); let mut record_time = Measure::start("record_time"); - let (num_to_commit, retryable_record_txs) = Self::record_transactions( - bank.slot(), - batch.sanitized_transactions(), - &execution_results, - poh, - ); + let (commit_transactions_result, retryable_record_transaction_indexes) = + Self::record_transactions( + bank.slot(), + batch.sanitized_transactions(), + &execution_results, + poh, + ); inc_new_counter_info!( "banking_stage-record_transactions_num_to_commit", - *num_to_commit.as_ref().unwrap_or(&0) + *commit_transactions_result.as_ref().unwrap_or(&0) ); inc_new_counter_info!( "banking_stage-record_transactions_retryable_record_txs", - retryable_record_txs.len() + retryable_record_transaction_indexes.len() ); - retryable_txs.extend(retryable_record_txs); - if num_to_commit.is_err() { - return (num_to_commit, retryable_txs, execute_timings); + retryable_transaction_indexes.extend(retryable_record_transaction_indexes); + let transactions_attempted_execution_count = execution_results.len(); + if let Err(e) = commit_transactions_result { + return ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count, + executed_transactions_count, + executed_with_successful_result_count, + retryable_transaction_indexes, + commit_transactions_result: Err(e), + execute_timings, + }; } record_time.stop(); let mut commit_time = Measure::start("commit_time"); let sanitized_txs = batch.sanitized_transactions(); - let num_to_commit = num_to_commit.unwrap(); - if num_to_commit != 0 { + let committed_transaction_count = commit_transactions_result.unwrap(); + // Note: `committed_transaction_count` should equal `executed_transactions_count`, since + // every executed transaction should have been recorded into the Poh stream if the record + // was successful (there's no partial records). + if committed_transaction_count != 0 { let tx_results = bank.commit_transactions( sanitized_txs, - &mut loaded_accounts, + &mut loaded_transactions, execution_results, - tx_count, + executed_transactions_count as u64, + executed_transactions_count.saturating_sub(executed_with_successful_result_count) + as u64, signature_count, &mut execute_timings, ); @@ -984,11 +1091,18 @@ impl BankingStage { ); debug!( - "process_and_record_transactions_locked: {:?}", + "execute_and_commit_transactions_locked: {:?}", execute_timings ); - (Ok(num_to_commit), retryable_txs, execute_timings) + ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count, + executed_transactions_count, + executed_with_successful_result_count, + retryable_transaction_indexes, + commit_transactions_result: Ok(()), + execute_timings, + } } pub fn process_and_record_transactions( @@ -999,12 +1113,14 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, qos_service: &QosService, - ) -> (Result, Vec) { + ) -> ProcessTransactionBatchOutput { let tx_costs = qos_service.compute_transaction_costs(txs.iter()); - let transactions_qos_results = + let (transactions_qos_results, num_included) = qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank); + let cost_model_throttled_transactions_count = txs.len().saturating_sub(num_included); + qos_service.accumulate_estimated_transaction_costs( &Self::accumulate_batched_transaction_costs( tx_costs.iter(), @@ -1023,22 +1139,32 @@ impl BankingStage { // retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit // WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit // and WouldExceedMaxAccountDataCostLimit - let (result, mut retryable_txs, execute_timings) = - Self::process_and_record_transactions_locked( + + let mut execute_and_commit_transactions_output = + Self::execute_and_commit_transactions_locked( bank, poh, &batch, transaction_status_sender, gossip_vote_sender, ); - retryable_txs.iter_mut().for_each(|x| *x += chunk_offset); + + let ExecuteAndCommitTransactionsOutput { + ref mut retryable_transaction_indexes, + ref execute_timings, + .. + } = execute_and_commit_transactions_output; + + retryable_transaction_indexes + .iter_mut() + .for_each(|x| *x += chunk_offset); let mut unlock_time = Measure::start("unlock_time"); // Once the accounts are new transactions can enter the pipeline to process them drop(batch); unlock_time.stop(); - let (cu, us) = Self::accumulate_execute_units_and_time(&execute_timings); + let (cu, us) = Self::accumulate_execute_units_and_time(execute_timings); qos_service.accumulate_actual_execute_cu(cu); qos_service.accumulate_actual_execute_time(us); @@ -1053,7 +1179,10 @@ impl BankingStage { txs.len(), ); - (result, retryable_txs) + ProcessTransactionBatchOutput { + cost_model_throttled_transactions_count, + execute_and_commit_transactions_output, + } } // rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and @@ -1111,16 +1240,27 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, qos_service: &QosService, - ) -> (usize, Vec) { + ) -> ProcessTransactionsSummary { let mut chunk_start = 0; - let mut unprocessed_txs = vec![]; - + let mut all_retryable_tx_indexes = vec![]; + // All the transactions that attempted execution. See description of + // struct ProcessTransactionsSummary above for possible outcomes. + let mut total_transactions_attempted_execution_count: usize = 0; + // All transactions that were executed and committed + let mut total_committed_transactions_count: usize = 0; + // All transactions that were executed and committed with a successful result + let mut total_committed_transactions_with_successful_result_count: usize = 0; + // All transactions that were executed but then failed record because the + // slot ended + let mut total_failed_commit_count: usize = 0; + let mut total_cost_model_throttled_transactions_count: usize = 0; + let mut reached_max_poh_height = false; while chunk_start != transactions.len() { let chunk_end = std::cmp::min( transactions.len(), chunk_start + MAX_NUM_TRANSACTIONS_PER_BATCH, ); - let (result, retryable_txs_in_chunk) = Self::process_and_record_transactions( + let process_transaction_batch_output = Self::process_and_record_transactions( bank, &transactions[chunk_start..chunk_end], poh, @@ -1129,17 +1269,56 @@ impl BankingStage { gossip_vote_sender, qos_service, ); - trace!("process_transactions result: {:?}", result); + + let ProcessTransactionBatchOutput { + cost_model_throttled_transactions_count: new_cost_model_throttled_transactions_count, + execute_and_commit_transactions_output, + } = process_transaction_batch_output; + total_cost_model_throttled_transactions_count = + total_cost_model_throttled_transactions_count + .saturating_add(new_cost_model_throttled_transactions_count); + + let ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count: new_transactions_attempted_execution_count, + executed_transactions_count: new_executed_transactions_count, + executed_with_successful_result_count: new_executed_with_successful_result_count, + retryable_transaction_indexes: new_retryable_transaction_indexes, + commit_transactions_result: new_commit_transactions_result, + .. + } = execute_and_commit_transactions_output; + + total_transactions_attempted_execution_count = + total_transactions_attempted_execution_count + .saturating_add(new_transactions_attempted_execution_count); + + trace!( + "process_transactions result: {:?}", + new_commit_transactions_result + ); + + if new_commit_transactions_result.is_ok() { + total_committed_transactions_count = total_committed_transactions_count + .saturating_add(new_executed_transactions_count); + total_committed_transactions_with_successful_result_count = + total_committed_transactions_with_successful_result_count + .saturating_add(new_executed_with_successful_result_count); + } else { + total_failed_commit_count = + total_failed_commit_count.saturating_add(new_executed_transactions_count); + } // Add the retryable txs (transactions that errored in a way that warrants a retry) // to the list of unprocessed txs. - unprocessed_txs.extend_from_slice(&retryable_txs_in_chunk); + all_retryable_tx_indexes.extend_from_slice(&new_retryable_transaction_indexes); // If `bank_creation_time` is None, it's a test so ignore the option so // allow processing let should_bank_still_be_processing_txs = Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot); - match (result, should_bank_still_be_processing_txs) { + match ( + new_commit_transactions_result, + should_bank_still_be_processing_txs, + ) { (Err(PohRecorderError::MaxHeightReached), _) | (_, false) => { info!( "process transactions: max height reached slot: {} height: {}", @@ -1149,7 +1328,8 @@ impl BankingStage { // process_and_record_transactions has returned all retryable errors in // transactions[chunk_start..chunk_end], so we just need to push the remaining // transactions into the unprocessed queue. - unprocessed_txs.extend(chunk_end..transactions.len()); + all_retryable_tx_indexes.extend(chunk_end..transactions.len()); + reached_max_poh_height = true; break; } _ => (), @@ -1158,7 +1338,16 @@ impl BankingStage { chunk_start = chunk_end; } - (chunk_start, unprocessed_txs) + ProcessTransactionsSummary { + reached_max_poh_height, + transactions_attempted_execution_count: total_transactions_attempted_execution_count, + committed_transactions_count: total_committed_transactions_count, + committed_transactions_with_successful_result_count: + total_committed_transactions_with_successful_result_count, + failed_commit_count: total_failed_commit_count, + retryable_transaction_indexes: all_retryable_tx_indexes, + cost_model_throttled_transactions_count: total_cost_model_throttled_transactions_count, + } } // This function creates a filter of transaction results with Ok() for every pending @@ -1281,7 +1470,7 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, qos_service: &QosService, - ) -> (usize, usize, Vec) { + ) -> ProcessTransactionsSummary { let mut packet_conversion_time = Measure::start("packet_conversion"); let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets( packet_batch, @@ -1293,10 +1482,9 @@ impl BankingStage { packet_conversion_time.stop(); inc_new_counter_info!("banking_stage-packet_conversion", 1); - let tx_len = transactions.len(); - let mut process_tx_time = Measure::start("process_tx_time"); - let (processed, unprocessed_tx_indexes) = Self::process_transactions( + + let mut process_transactions_summary = Self::process_transactions( bank, bank_creation_time, &transactions, @@ -1306,26 +1494,29 @@ impl BankingStage { qos_service, ); process_tx_time.stop(); - let unprocessed_tx_count = unprocessed_tx_indexes.len(); - inc_new_counter_info!( - "banking_stage-unprocessed_transactions", - unprocessed_tx_count - ); + + let ProcessTransactionsSummary { + ref retryable_transaction_indexes, + .. + } = process_transactions_summary; let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time"); - let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( + let filtered_retryable_tx_indexes = Self::filter_pending_packets_from_pending_txs( bank, &transactions, &transaction_to_packet_indexes, - &unprocessed_tx_indexes, + retryable_transaction_indexes, ); filter_pending_packets_time.stop(); inc_new_counter_info!( "banking_stage-dropped_tx_before_forwarding", - unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) + retryable_transaction_indexes + .len() + .saturating_sub(filtered_retryable_tx_indexes.len()) ); + // Increment timing-based metrics banking_stage_stats .packet_conversion_elapsed .fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed); @@ -1336,7 +1527,8 @@ impl BankingStage { .filter_pending_packets_elapsed .fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed); - (processed, tx_len, filtered_unprocessed_packet_indexes) + process_transactions_summary.retryable_transaction_indexes = filtered_retryable_tx_indexes; + process_transactions_summary } fn filter_unprocessed_packets( @@ -1367,8 +1559,6 @@ impl BankingStage { ); unprocessed_packet_conversion_time.stop(); - let tx_count = transaction_to_packet_indexes.len(); - let unprocessed_tx_indexes = (0..transactions.len()).collect_vec(); let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( bank, @@ -1379,7 +1569,9 @@ impl BankingStage { inc_new_counter_info!( "banking_stage-dropped_tx_before_forwarding", - tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) + unprocessed_tx_indexes + .len() + .saturating_sub(filtered_unprocessed_packet_indexes.len()) ); banking_stage_stats .unprocessed_packet_conversion_elapsed @@ -1571,7 +1763,7 @@ mod tests { solana_ledger::{ blockstore::{entries_to_test_shreds, Blockstore}, genesis_utils::{create_genesis_config, GenesisConfigInfo}, - get_tmp_ledger_path, + get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, }, solana_perf::packet::{to_packet_batches, PacketFlags}, @@ -1629,10 +1821,10 @@ mod tests { let (verified_sender, verified_receiver) = unbounded(); let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( - Blockstore::open(&ledger_path) + Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"), ); let (exit, poh_recorder, poh_service, _entry_receiever) = @@ -1658,7 +1850,7 @@ mod tests { banking_stage.join().unwrap(); poh_service.join().unwrap(); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); } #[test] @@ -1673,10 +1865,10 @@ mod tests { let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( - Blockstore::open(&ledger_path) + Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"), ); let poh_config = PohConfig { @@ -1719,7 +1911,7 @@ mod tests { assert_eq!(entries[entries.len() - 1].hash, bank.last_blockhash()); banking_stage.join().unwrap(); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); } pub fn convert_from_old_verified( @@ -1747,10 +1939,10 @@ mod tests { let (verified_sender, verified_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( - Blockstore::open(&ledger_path) + Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"), ); let poh_config = PohConfig { @@ -1852,7 +2044,7 @@ mod tests { drop(entry_receiver); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); } #[test] @@ -1894,7 +2086,7 @@ mod tests { let (vote_sender, vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); @@ -1902,7 +2094,7 @@ mod tests { // start a banking_stage to eat verified receiver let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)); let blockstore = Arc::new( - Blockstore::open(&ledger_path) + Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"), ); let poh_config = PohConfig { @@ -1958,7 +2150,7 @@ mod tests { // the account balance below zero before the credit is added. assert_eq!(bank.get_balance(&alice.pubkey()), 2); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); } fn sanitize_transactions(txs: Vec) -> Vec { @@ -1977,9 +2169,9 @@ mod tests { .. } = create_genesis_config(10_000); let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)); - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { - let blockstore = Blockstore::open(&ledger_path) + let blockstore = Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"); let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new( // TODO use record_receiver @@ -2055,7 +2247,7 @@ mod tests { .store(true, Ordering::Relaxed); let _ = poh_simulator.join(); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); } #[test] @@ -2224,9 +2416,9 @@ mod tests { genesis_config.hash(), )]); - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { - let blockstore = Blockstore::open(&ledger_path) + let blockstore = Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"); let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new( bank.tick_height(), @@ -2248,7 +2440,7 @@ mod tests { poh_recorder.lock().unwrap().set_bank(&bank); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); - BankingStage::process_and_record_transactions( + let process_transactions_batch_output = BankingStage::process_and_record_transactions( &bank, &transactions, &recorder, @@ -2256,9 +2448,20 @@ mod tests { None, &gossip_vote_sender, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), - ) - .0 - .unwrap(); + ); + + let ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count, + executed_transactions_count, + executed_with_successful_result_count, + commit_transactions_result, + .. + } = process_transactions_batch_output.execute_and_commit_transactions_output; + + assert_eq!(transactions_attempted_execution_count, 1); + assert_eq!(executed_transactions_count, 1); + assert_eq!(executed_with_successful_result_count, 1); + assert!(commit_transactions_result.is_ok()); // Tick up to max tick height while poh_recorder.lock().unwrap().tick_height() != bank.max_tick_height() { @@ -2289,17 +2492,31 @@ mod tests { genesis_config.hash(), )]); + let process_transactions_batch_output = BankingStage::process_and_record_transactions( + &bank, + &transactions, + &recorder, + 0, + None, + &gossip_vote_sender, + &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), + ); + + let ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count, + executed_transactions_count, + executed_with_successful_result_count, + retryable_transaction_indexes, + commit_transactions_result, + .. + } = process_transactions_batch_output.execute_and_commit_transactions_output; + assert_eq!(transactions_attempted_execution_count, 1); + // Transactions was still executed, just wasn't committed, so should be counted here. + assert_eq!(executed_transactions_count, 1); + assert_eq!(executed_with_successful_result_count, 1); + assert_eq!(retryable_transaction_indexes, vec![0]); assert_matches!( - BankingStage::process_and_record_transactions( - &bank, - &transactions, - &recorder, - 0, - None, - &gossip_vote_sender, - &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), - ) - .0, + commit_transactions_result, Err(PohRecorderError::MaxHeightReached) ); @@ -2312,7 +2529,7 @@ mod tests { assert_eq!(bank.get_balance(&pubkey), 1); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); } fn simulate_poh( @@ -2353,9 +2570,9 @@ mod tests { system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()), ]); - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { - let blockstore = Blockstore::open(&ledger_path) + let blockstore = Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"); let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new( bank.tick_height(), @@ -2378,7 +2595,7 @@ mod tests { let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); - let (result, unprocessed) = BankingStage::process_and_record_transactions( + let process_transactions_batch_output = BankingStage::process_and_record_transactions( &bank, &transactions, &recorder, @@ -2395,10 +2612,20 @@ mod tests { .store(true, Ordering::Relaxed); let _ = poh_simulator.join(); - assert!(result.is_ok()); - assert_eq!(unprocessed.len(), 1); + let ExecuteAndCommitTransactionsOutput { + transactions_attempted_execution_count, + executed_transactions_count, + retryable_transaction_indexes, + commit_transactions_result, + .. + } = process_transactions_batch_output.execute_and_commit_transactions_output; + + assert_eq!(transactions_attempted_execution_count, 2); + assert_eq!(executed_transactions_count, 1); + assert_eq!(retryable_transaction_indexes, vec![1],); + assert!(commit_transactions_result.is_ok()); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); } #[test] @@ -2461,9 +2688,9 @@ mod tests { genesis_config.hash(), )]); - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { - let blockstore = Blockstore::open(&ledger_path) + let blockstore = Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"); let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new( bank.tick_height(), @@ -2486,28 +2713,202 @@ mod tests { let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); - let (processed_transactions_count, mut retryable_txs) = - BankingStage::process_transactions( - &bank, - &Instant::now(), - &transactions, - &recorder, - None, - &gossip_vote_sender, - &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), - ); + let process_transactions_summary = BankingStage::process_transactions( + &bank, + &Instant::now(), + &transactions, + &recorder, + None, + &gossip_vote_sender, + &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), + ); - assert_eq!(processed_transactions_count, 0,); + let ProcessTransactionsSummary { + reached_max_poh_height, + transactions_attempted_execution_count, + committed_transactions_count, + committed_transactions_with_successful_result_count, + failed_commit_count, + mut retryable_transaction_indexes, + .. + } = process_transactions_summary; + assert!(reached_max_poh_height); + assert_eq!(transactions_attempted_execution_count, 1); + assert_eq!(failed_commit_count, 1); + // MaxHeightReached error does not commit, should be zero here + assert_eq!(committed_transactions_count, 0); + assert_eq!(committed_transactions_with_successful_result_count, 0); - retryable_txs.sort_unstable(); + retryable_transaction_indexes.sort_unstable(); let expected: Vec = (0..transactions.len()).collect(); - assert_eq!(retryable_txs, expected); + assert_eq!(retryable_transaction_indexes, expected); recorder.is_exited.store(true, Ordering::Relaxed); let _ = poh_simulator.join(); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); + } + + fn execute_transactions_with_dummy_poh_service( + bank: Arc, + transactions: Vec, + ) -> ProcessTransactionsSummary { + let transactions = sanitize_transactions(transactions); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.clone(), + Some((4, 4)), + bank.ticks_per_slot(), + &Pubkey::new_unique(), + &Arc::new(blockstore), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + &Arc::new(PohConfig::default()), + Arc::new(AtomicBool::default()), + ); + let recorder = poh_recorder.recorder(); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + + poh_recorder.lock().unwrap().set_bank(&bank); + + let poh_simulator = simulate_poh(record_receiver, &poh_recorder); + + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + + let process_transactions_summary = BankingStage::process_transactions( + &bank, + &Instant::now(), + &transactions, + &recorder, + None, + &gossip_vote_sender, + &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), + ); + + poh_recorder + .lock() + .unwrap() + .is_exited + .store(true, Ordering::Relaxed); + let _ = poh_simulator.join(); + + process_transactions_summary + } + + #[test] + fn test_process_transactions_instruction_error() { + solana_logger::setup(); + let lamports = 10_000; + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_slow_genesis_config(lamports); + let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)); + + // Transfer more than the balance of the mint keypair, should cause a + // InstructionError::InsufficientFunds that is then committed. Needs to be + // MAX_NUM_TRANSACTIONS_PER_BATCH at least so it doesn't conflict on account locks + // with the below transaction + let mut transactions = vec![ + system_transaction::transfer( + &mint_keypair, + &Pubkey::new_unique(), + lamports + 1, + genesis_config.hash(), + ); + MAX_NUM_TRANSACTIONS_PER_BATCH + ]; + + // Make one transaction that will succeed. + transactions.push(system_transaction::transfer( + &mint_keypair, + &Pubkey::new_unique(), + 1, + genesis_config.hash(), + )); + + let transactions_count = transactions.len(); + let ProcessTransactionsSummary { + reached_max_poh_height, + transactions_attempted_execution_count, + committed_transactions_count, + committed_transactions_with_successful_result_count, + failed_commit_count, + retryable_transaction_indexes, + .. + } = execute_transactions_with_dummy_poh_service(bank, transactions); + + // All the transactions should have been replayed, but only 1 committed + assert!(!reached_max_poh_height); + assert_eq!(transactions_attempted_execution_count, transactions_count); + // Both transactions should have been committed, even though one was an error, + // because InstructionErrors are committed + assert_eq!(committed_transactions_count, 2); + assert_eq!(committed_transactions_with_successful_result_count, 1); + assert_eq!(failed_commit_count, 0); + assert_eq!( + retryable_transaction_indexes, + (1..transactions_count - 1).collect::>() + ); + } + #[test] + fn test_process_transactions_account_in_use() { + solana_logger::setup(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_slow_genesis_config(10_000); + let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)); + + // Make all repetitive transactions that conflict on the `mint_keypair`, so only 1 should be executed + let mut transactions = vec![ + system_transaction::transfer( + &mint_keypair, + &Pubkey::new_unique(), + 1, + genesis_config.hash() + ); + MAX_NUM_TRANSACTIONS_PER_BATCH + ]; + + // Make one more in separate batch that also conflicts, but because it's in a separate batch, it + // should be executed + transactions.push(system_transaction::transfer( + &mint_keypair, + &Pubkey::new_unique(), + 1, + genesis_config.hash(), + )); + + let transactions_count = transactions.len(); + let ProcessTransactionsSummary { + reached_max_poh_height, + transactions_attempted_execution_count, + committed_transactions_count, + committed_transactions_with_successful_result_count, + failed_commit_count, + retryable_transaction_indexes, + .. + } = execute_transactions_with_dummy_poh_service(bank, transactions); + + // All the transactions should have been replayed, but only 2 committed (first and last) + assert!(!reached_max_poh_height); + assert_eq!(transactions_attempted_execution_count, transactions_count); + assert_eq!(committed_transactions_count, 2); + assert_eq!(committed_transactions_with_successful_result_count, 2); + assert_eq!(failed_commit_count, 0,); + + // Everything except first and last index of the transactions failed and are last retryable + assert_eq!( + retryable_transaction_indexes, + (1..transactions_count - 1).collect::>() + ); } #[test] @@ -2556,9 +2957,9 @@ mod tests { bank.transfer(rent_exempt_amount, &mint_keypair, &keypair1.pubkey()) .unwrap(); - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { - let blockstore = Blockstore::open(&ledger_path) + let blockstore = Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"); let blockstore = Arc::new(blockstore); let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new( @@ -2641,7 +3042,7 @@ mod tests { .store(true, Ordering::Relaxed); let _ = poh_simulator.join(); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); } fn generate_new_address_lookup_table( @@ -2721,9 +3122,9 @@ mod tests { // todo: check if sig fees are needed bank.transfer(1, &mint_keypair, &keypair.pubkey()).unwrap(); - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { - let blockstore = Blockstore::open(&ledger_path) + let blockstore = Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"); let blockstore = Arc::new(blockstore); let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new( @@ -2800,7 +3201,7 @@ mod tests { .store(true, Ordering::Relaxed); let _ = poh_simulator.join(); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); } #[allow(clippy::type_complexity)] @@ -2860,10 +3261,10 @@ mod tests { #[test] fn test_consume_buffered_packets() { - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) = - setup_conflicting_transactions(&ledger_path); + setup_conflicting_transactions(ledger_path.path()); let recorder = poh_recorder.lock().unwrap().recorder(); let num_conflicting_transactions = transactions.len(); let mut packet_batches = to_packet_batches(&transactions, num_conflicting_transactions); @@ -2931,15 +3332,15 @@ mod tests { .store(true, Ordering::Relaxed); let _ = poh_simulator.join(); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); } #[test] fn test_consume_buffered_packets_interrupted() { - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) = - setup_conflicting_transactions(&ledger_path); + setup_conflicting_transactions(ledger_path.path()); let num_conflicting_transactions = transactions.len(); let packet_batches = to_packet_batches(&transactions, 1); assert_eq!(packet_batches.len(), num_conflicting_transactions); @@ -3023,7 +3424,7 @@ mod tests { .store(true, Ordering::Relaxed); let _ = poh_simulator.join(); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); } #[test] @@ -3041,10 +3442,10 @@ mod tests { } = &genesis_config_info; let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config)); - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( - Blockstore::open(&ledger_path) + Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"), ); let poh_config = PohConfig { @@ -3094,7 +3495,7 @@ mod tests { exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); } #[test] @@ -3124,10 +3525,10 @@ mod tests { .. } = &genesis_config_info; let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config)); - let ledger_path = get_tmp_ledger_path!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( - Blockstore::open(&ledger_path) + Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"), ); let poh_config = PohConfig { @@ -3207,7 +3608,7 @@ mod tests { exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); } - Blockstore::destroy(&ledger_path).unwrap(); + Blockstore::destroy(ledger_path.path()).unwrap(); } #[test] diff --git a/core/src/qos_service.rs b/core/src/qos_service.rs index e7e002279f..64679b3155 100644 --- a/core/src/qos_service.rs +++ b/core/src/qos_service.rs @@ -124,15 +124,17 @@ impl QosService { transactions: impl Iterator, transactions_costs: impl Iterator, bank: &Arc, - ) -> Vec> { + ) -> (Vec>, usize) { let mut cost_tracking_time = Measure::start("cost_tracking_time"); let mut cost_tracker = bank.write_cost_tracker().unwrap(); + let mut num_included = 0; let select_results = transactions .zip(transactions_costs) .map(|(tx, cost)| match cost_tracker.try_add(tx, cost) { Ok(current_block_cost) => { debug!("slot {:?}, transaction {:?}, cost {:?}, fit into current block, current block cost {}", bank.slot(), tx, cost, current_block_cost); self.metrics.selected_txs_count.fetch_add(1, Ordering::Relaxed); + num_included += 1; Ok(()) }, Err(e) => { @@ -162,7 +164,7 @@ impl QosService { self.metrics .cost_tracking_time .fetch_add(cost_tracking_time.as_us(), Ordering::Relaxed); - select_results + (select_results, num_included) } // metrics are reported by bank slot @@ -542,7 +544,9 @@ mod tests { bank.write_cost_tracker() .unwrap() .set_limits(cost_limit, cost_limit, cost_limit); - let results = qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank); + let (results, num_selected) = + qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank); + assert_eq!(num_selected, 2); // verify that first transfer tx and first vote are allowed assert_eq!(results.len(), txs.len()); diff --git a/program-test/src/lib.rs b/program-test/src/lib.rs index f962ba92d7..ce22b9bf2d 100644 --- a/program-test/src/lib.rs +++ b/program-test/src/lib.rs @@ -421,7 +421,8 @@ fn setup_fees(bank: Bank) -> Bank { &[], // transactions &mut [], // loaded accounts vec![], // transaction execution results - 0, // tx count + 0, // executed tx count + 0, // executed with failure output tx count 1, // signature count &mut ExecuteTimings::default(), ); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 08e71ba2ea..b9f4803eca 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -626,6 +626,20 @@ impl TransactionExecutionResult { } } +pub struct LoadAndExecuteTransactionsOutput { + pub loaded_transactions: Vec, + // Vector of results indicating whether a transaction was executed or could not + // be executed. Note executed transactions can still have failed! + pub execution_results: Vec, + pub retryable_transaction_indexes: Vec, + // Total number of transactions that were executed + pub executed_transactions_count: usize, + // Total number of the executed transactions that returned success/not + // an error. + pub executed_with_successful_result_count: usize, + pub signature_count: u64, +} + #[derive(Debug, Clone)] pub enum DurableNonceFee { Valid(u64), @@ -3455,13 +3469,11 @@ impl Bank { let batch = self.prepare_simulation_batch(transaction); let mut timings = ExecuteTimings::default(); - let ( + let LoadAndExecuteTransactionsOutput { loaded_transactions, mut execution_results, - _retryable_transactions, - _transaction_count, - _signature_count, - ) = self.load_and_execute_transactions( + .. + } = self.load_and_execute_transactions( &batch, // After simulation, transactions will need to be forwarded to the leader // for processing. During forwarding, the transaction could expire if the @@ -3942,19 +3954,13 @@ impl Bank { enable_cpi_recording: bool, enable_log_recording: bool, timings: &mut ExecuteTimings, - ) -> ( - Vec, - Vec, - Vec, - u64, - u64, - ) { + ) -> LoadAndExecuteTransactionsOutput { let sanitized_txs = batch.sanitized_transactions(); debug!("processing transactions: {}", sanitized_txs.len()); inc_new_counter_info!("bank-process_transactions", sanitized_txs.len()); let mut error_counters = ErrorCounters::default(); - let retryable_txs: Vec<_> = batch + let retryable_transaction_indexes: Vec<_> = batch .lock_results() .iter() .enumerate() @@ -3982,7 +3988,7 @@ impl Bank { check_time.stop(); let mut load_time = Measure::start("accounts_load"); - let mut loaded_txs = self.rc.accounts.load_accounts( + let mut loaded_transactions = self.rc.accounts.load_accounts( &self.ancestors, sanitized_txs, check_results, @@ -3996,7 +4002,7 @@ impl Bank { let mut execution_time = Measure::start("execution_time"); let mut signature_count: u64 = 0; - let execution_results: Vec = loaded_txs + let execution_results: Vec = loaded_transactions .iter_mut() .zip(sanitized_txs.iter()) .map(|(accs, tx)| match accs { @@ -4059,7 +4065,8 @@ impl Bank { timings.load_us = timings.load_us.saturating_add(load_time.as_us()); timings.execute_us = timings.execute_us.saturating_add(execution_time.as_us()); - let mut tx_count: u64 = 0; + let mut executed_transactions_count: usize = 0; + let mut executed_with_successful_result_count: usize = 0; let err_count = &mut error_counters.total; let transaction_log_collector_config = self.transaction_log_collector_config.read().unwrap(); @@ -4133,9 +4140,13 @@ impl Bank { } } + if execution_result.was_executed() { + executed_transactions_count += 1; + } + match execution_result.flattened_result() { Ok(()) => { - tx_count += 1; + executed_with_successful_result_count += 1; } Err(err) => { if *err_count == 0 { @@ -4149,17 +4160,18 @@ impl Bank { debug!( "{} errors of {} txs", *err_count, - *err_count as u64 + tx_count + *err_count + executed_with_successful_result_count ); } Self::update_error_counters(&error_counters); - ( - loaded_txs, + LoadAndExecuteTransactionsOutput { + loaded_transactions, execution_results, - retryable_txs, - tx_count, + retryable_transaction_indexes, + executed_transactions_count, + executed_with_successful_result_count, signature_count, - ) + } } /// Load the accounts data len @@ -4260,12 +4272,17 @@ impl Bank { results } + /// `committed_transactions_count` is the number of transactions out of `sanitized_txs` + /// that was executed. Of those, `committed_transactions_count`, + /// `committed_with_failure_result_count` is the number of executed transactions that returned + /// a failure result. pub fn commit_transactions( &self, sanitized_txs: &[SanitizedTransaction], loaded_txs: &mut [TransactionLoadResult], execution_results: Vec, - tx_count: u64, + committed_transactions_count: u64, + committed_with_failure_result_count: u64, signature_count: u64, timings: &mut ExecuteTimings, ) -> TransactionResults { @@ -4274,24 +4291,32 @@ impl Bank { "commit_transactions() working on a bank that is already frozen or is undergoing freezing!" ); + let tx_count = if self.bank_tranaction_count_fix_enabled() { + committed_transactions_count + } else { + committed_transactions_count.saturating_sub(committed_with_failure_result_count) + }; + self.increment_transaction_count(tx_count); self.increment_signature_count(signature_count); - inc_new_counter_info!("bank-process_transactions-txs", tx_count as usize); + inc_new_counter_info!( + "bank-process_transactions-txs", + committed_transactions_count as usize + ); inc_new_counter_info!("bank-process_transactions-sigs", signature_count as usize); - if !sanitized_txs.is_empty() { - let processed_tx_count = sanitized_txs.len() as u64; - let failed_tx_count = processed_tx_count.saturating_sub(tx_count); + if committed_with_failure_result_count > 0 { self.transaction_error_count - .fetch_add(failed_tx_count, Relaxed); - self.transaction_entries_count.fetch_add(1, Relaxed); - self.transactions_per_entry_max - .fetch_max(processed_tx_count, Relaxed); + .fetch_add(committed_with_failure_result_count, Relaxed); } + // Should be equivalent to checking `committed_transactions_count > 0` if execution_results.iter().any(|result| result.was_executed()) { self.is_delta.store(true, Relaxed); + self.transaction_entries_count.fetch_add(1, Relaxed); + self.transactions_per_entry_max + .fetch_max(committed_transactions_count, Relaxed); } let (blockhash, lamports_per_signature) = self.last_blockhash_and_lamports_per_signature(); @@ -5113,20 +5138,28 @@ impl Bank { vec![] }; - let (mut loaded_txs, execution_results, _, tx_count, signature_count) = self - .load_and_execute_transactions( - batch, - max_age, - enable_cpi_recording, - enable_log_recording, - timings, - ); + let LoadAndExecuteTransactionsOutput { + mut loaded_transactions, + execution_results, + executed_transactions_count, + executed_with_successful_result_count, + signature_count, + .. + } = self.load_and_execute_transactions( + batch, + max_age, + enable_cpi_recording, + enable_log_recording, + timings, + ); let results = self.commit_transactions( batch.sanitized_transactions(), - &mut loaded_txs, + &mut loaded_transactions, execution_results, - tx_count, + executed_transactions_count as u64, + executed_transactions_count.saturating_sub(executed_with_successful_result_count) + as u64, signature_count, timings, ); @@ -6201,6 +6234,11 @@ impl Bank { consumed_budget.saturating_sub(budget_recovery_delta) } + pub fn bank_tranaction_count_fix_enabled(&self) -> bool { + self.feature_set + .is_active(&feature_set::bank_tranaction_count_fix::id()) + } + pub fn shrink_candidate_slots(&self) -> usize { self.rc.accounts.accounts_db.shrink_candidate_slots() } diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index 36112e06e0..edd04ce981 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -315,6 +315,10 @@ pub mod add_get_processed_sibling_instruction_syscall { solana_sdk::declare_id!("CFK1hRCNy8JJuAAY8Pb2GjLFNdCThS2qwZNe3izzBMgn"); } +pub mod bank_tranaction_count_fix { + solana_sdk::declare_id!("Vo5siZ442SaZBKPXNocthiXysNviW4UYPwRFggmbgAp"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -388,6 +392,7 @@ lazy_static! { (spl_associated_token_account_v1_0_4::id(), "SPL Associated Token Account Program release version 1.0.4, tied to token 3.3.0 #22648"), (reject_vote_account_close_unless_zero_credit_epoch::id(), "fail vote account withdraw to 0 unless account earned 0 credits in last completed epoch"), (add_get_processed_sibling_instruction_syscall::id(), "add add_get_processed_sibling_instruction_syscall"), + (bank_tranaction_count_fix::id(), "Fixes Bank::transaction_count to include all committed transactions, not just successful ones"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter()