diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index ba915bc767..af7b5b93e4 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -757,13 +757,17 @@ mod tests { self, state::{AddressLookupTable, LookupTableMeta}, }, + compute_budget, instruction::InstructionError, - message::{v0, v0::MessageAddressTableLookup, MessageHeader, VersionedMessage}, + message::{ + v0::{self, MessageAddressTableLookup}, + Message, MessageHeader, VersionedMessage, + }, poh_config::PohConfig, pubkey::Pubkey, signature::Keypair, signer::Signer, - system_transaction, + system_instruction, system_transaction, transaction::{MessageHash, Transaction, VersionedTransaction}, }, solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta}, @@ -862,10 +866,11 @@ mod tests { Arc, Arc>, Receiver, + GenesisConfigInfo, JoinHandle<()>, ) { Blockstore::destroy(ledger_path).unwrap(); - let genesis_config_info = create_slow_genesis_config(10_000); + let genesis_config_info = create_slow_genesis_config(100_000_000); let GenesisConfigInfo { genesis_config, mint_keypair, @@ -905,6 +910,7 @@ mod tests { bank, poh_recorder, entry_receiver, + genesis_config_info, poh_simulator, ) } @@ -1830,9 +1836,9 @@ mod tests { fn test_consume_buffered_packets() { let ledger_path = get_tmp_ledger_path_auto_delete!(); { - let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) = + let (transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) = setup_conflicting_transactions(ledger_path.path()); - let recorder = poh_recorder.read().unwrap().new_recorder(); + let recorder: TransactionRecorder = poh_recorder.read().unwrap().new_recorder(); let num_conflicting_transactions = transactions.len(); let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap(); assert_eq!(deserialized_packets.len(), num_conflicting_transactions); @@ -1903,7 +1909,7 @@ mod tests { fn test_consume_buffered_packets_sanitization_error() { let ledger_path = get_tmp_ledger_path_auto_delete!(); { - let (mut transactions, bank, poh_recorder, _entry_receiver, poh_simulator) = + let (mut transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) = setup_conflicting_transactions(ledger_path.path()); let duplicate_account_key = transactions[0].message.account_keys[0]; transactions[0] @@ -1959,7 +1965,7 @@ mod tests { fn test_consume_buffered_packets_retryable() { let ledger_path = get_tmp_ledger_path_auto_delete!(); { - let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) = + let (transactions, bank, poh_recorder, _entry_receiver, _, poh_simulator) = setup_conflicting_transactions(ledger_path.path()); let recorder = poh_recorder.read().unwrap().new_recorder(); let num_conflicting_transactions = transactions.len(); @@ -2048,6 +2054,109 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } + #[test] + fn test_consume_buffered_packets_batch_priority_guard() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + { + let (_, bank, poh_recorder, _entry_receiver, genesis_config_info, poh_simulator) = + setup_conflicting_transactions(ledger_path.path()); + let recorder = poh_recorder.read().unwrap().new_recorder(); + + // Setup transactions: + // [(AB), (BC), (CD)] + // (AB) and (BC) are conflicting, and cannot go into the same batch. + // (AB) and (CD) are not conflict. However, (CD) should not be able to take locks needed by (BC). + let keypair_a = Keypair::new(); + let keypair_b = Keypair::new(); + let keypair_c = Keypair::new(); + let keypair_d = Keypair::new(); + for keypair in &[&keypair_a, &keypair_b, &keypair_c, &keypair_d] { + bank.transfer(5_000, &genesis_config_info.mint_keypair, &keypair.pubkey()) + .unwrap(); + } + + let make_prioritized_transfer = + |from: &Keypair, to, lamports, priority| -> Transaction { + let ixs = vec![ + system_instruction::transfer(&from.pubkey(), to, lamports), + compute_budget::ComputeBudgetInstruction::set_compute_unit_price(priority), + ]; + let message = Message::new(&ixs, Some(&from.pubkey())); + Transaction::new(&[from], message, bank.last_blockhash()) + }; + + let transactions = vec![ + make_prioritized_transfer(&keypair_a, &keypair_b.pubkey(), 1, 3), + make_prioritized_transfer(&keypair_b, &keypair_c.pubkey(), 1, 2), + make_prioritized_transfer(&keypair_c, &keypair_d.pubkey(), 1, 1), + ]; + + let num_conflicting_transactions = transactions.len(); + let deserialized_packets = transactions_to_deserialized_packets(&transactions).unwrap(); + assert_eq!(deserialized_packets.len(), num_conflicting_transactions); + let mut buffered_packet_batches = + UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::from_iter( + deserialized_packets, + num_conflicting_transactions, + ), + ThreadType::Transactions, + ); + + let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), + ); + let consumer = Consumer::new(committer, recorder, QosService::new(1), None); + + // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) + assert!(!poh_recorder.read().unwrap().has_bank()); + assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); + // When the working bank in poh_recorder is Some, all packets should be processed. + // Multi-Iterator will process them 1-by-1 if all txs are conflicting. + poh_recorder.write().unwrap().set_bank(bank, false); + let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); + let banking_stage_stats = BankingStageStats::default(); + consumer.consume_buffered_packets( + &bank_start, + &mut buffered_packet_batches, + &banking_stage_stats, + &mut LeaderSlotMetricsTracker::new(0), + ); + + // Check that all packets were processed without retrying + assert!(buffered_packet_batches.is_empty()); + assert_eq!( + banking_stage_stats + .consumed_buffered_packets_count + .load(Ordering::Relaxed), + num_conflicting_transactions + ); + assert_eq!( + banking_stage_stats + .rebuffered_packets_count + .load(Ordering::Relaxed), + 0 + ); + // Use bank to check the number of entries (batches) + assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1); + assert_eq!( + bank_start.working_bank.transaction_entries_count(), + 4 + num_conflicting_transactions as u64 // 4 for funding transfers + ); + + poh_recorder + .read() + .unwrap() + .is_exited + .store(true, Ordering::Relaxed); + let _ = poh_simulator.join(); + } + Blockstore::destroy(ledger_path.path()).unwrap(); + } + #[test] fn test_accumulate_execute_units_and_time() { let mut execute_timings = ExecuteTimings::default(); diff --git a/core/src/banking_stage/read_write_account_set.rs b/core/src/banking_stage/read_write_account_set.rs index 691f81d0f5..7a2117675b 100644 --- a/core/src/banking_stage/read_write_account_set.rs +++ b/core/src/banking_stage/read_write_account_set.rs @@ -1,12 +1,9 @@ use { - solana_sdk::{ - message::{SanitizedMessage, VersionedMessage}, - pubkey::Pubkey, - }, + solana_sdk::{message::SanitizedMessage, pubkey::Pubkey}, std::collections::HashSet, }; -/// Wrapper struct to check account locks for a batch of transactions. +/// Wrapper struct to accumulate locks for a batch of transactions. #[derive(Debug, Default)] pub struct ReadWriteAccountSet { /// Set of accounts that are locked for read @@ -16,30 +13,36 @@ pub struct ReadWriteAccountSet { } impl ReadWriteAccountSet { - /// Check static account locks for a transaction message. - pub fn check_static_account_locks(&self, message: &VersionedMessage) -> bool { - !message - .static_account_keys() + /// Returns true if all account locks were available and false otherwise. + #[allow(dead_code)] + pub fn check_locks(&self, message: &SanitizedMessage) -> bool { + message + .account_keys() .iter() .enumerate() - .any(|(index, pubkey)| { - if message.is_maybe_writable(index) { - !self.can_write(pubkey) + .all(|(index, pubkey)| { + if message.is_writable(index) { + self.can_write(pubkey) } else { - !self.can_read(pubkey) + self.can_read(pubkey) } }) } - /// Check all account locks and if they are available, lock them. - /// Returns true if all account locks are available and false otherwise. - pub fn try_locking(&mut self, message: &SanitizedMessage) -> bool { - if self.check_sanitized_message_account_locks(message) { - self.add_sanitized_message_account_locks(message); - true - } else { - false - } + /// Add all account locks. + /// Returns true if all account locks were available and false otherwise. + pub fn take_locks(&mut self, message: &SanitizedMessage) -> bool { + message + .account_keys() + .iter() + .enumerate() + .fold(true, |all_available, (index, pubkey)| { + if message.is_writable(index) { + all_available & self.add_write(pubkey) + } else { + all_available & self.add_read(pubkey) + } + }) } /// Clears the read and write sets @@ -48,36 +51,6 @@ impl ReadWriteAccountSet { self.write_set.clear(); } - /// Check if a sanitized message's account locks are available. - fn check_sanitized_message_account_locks(&self, message: &SanitizedMessage) -> bool { - !message - .account_keys() - .iter() - .enumerate() - .any(|(index, pubkey)| { - if message.is_writable(index) { - !self.can_write(pubkey) - } else { - !self.can_read(pubkey) - } - }) - } - - /// Insert the read and write locks for a sanitized message. - fn add_sanitized_message_account_locks(&mut self, message: &SanitizedMessage) { - message - .account_keys() - .iter() - .enumerate() - .for_each(|(index, pubkey)| { - if message.is_writable(index) { - self.add_write(pubkey); - } else { - self.add_read(pubkey); - } - }); - } - /// Check if an account can be read-locked fn can_read(&self, pubkey: &Pubkey) -> bool { !self.write_set.contains(pubkey) @@ -89,15 +62,21 @@ impl ReadWriteAccountSet { } /// Add an account to the read-set. - /// Should only be called after `can_read()` returns true - fn add_read(&mut self, pubkey: &Pubkey) { + /// Returns true if the lock was available. + fn add_read(&mut self, pubkey: &Pubkey) -> bool { + let can_read = self.can_read(pubkey); self.read_set.insert(*pubkey); + + can_read } /// Add an account to the write-set. - /// Should only be called after `can_write()` returns true - fn add_write(&mut self, pubkey: &Pubkey) { + /// Returns true if the lock was available. + fn add_write(&mut self, pubkey: &Pubkey) -> bool { + let can_write = self.can_write(pubkey); self.write_set.insert(*pubkey); + + can_write } } @@ -197,58 +176,12 @@ mod tests { Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)) } - // Helper function (could potentially use test_case in future). - // conflict_index = 0 means write lock conflict - // conflict_index = 1 means read lock conflict - fn test_check_static_account_locks(conflict_index: usize, add_write: bool, expectation: bool) { - let message = - create_test_versioned_message(&[Pubkey::new_unique()], &[Pubkey::new_unique()], vec![]); - - let mut account_locks = ReadWriteAccountSet::default(); - assert!(account_locks.check_static_account_locks(&message)); - - let conflict_key = message.static_account_keys().get(conflict_index).unwrap(); - if add_write { - account_locks.add_write(conflict_key); - } else { - account_locks.add_read(conflict_key); - } - assert_eq!( - expectation, - account_locks.check_static_account_locks(&message) - ); - } - - #[test] - fn test_check_static_account_locks_write_write_conflict() { - test_check_static_account_locks(0, true, false); - } - - #[test] - fn test_check_static_account_locks_read_write_conflict() { - test_check_static_account_locks(0, false, false); - } - - #[test] - fn test_check_static_account_locks_write_read_conflict() { - test_check_static_account_locks(1, true, false); - } - - #[test] - fn test_check_static_account_locks_read_read_non_conflict() { - test_check_static_account_locks(1, false, true); - } - // Helper function (could potentially use test_case in future). // conflict_index = 0 means write lock conflict with static key // conflict_index = 1 means read lock conflict with static key // conflict_index = 2 means write lock conflict with address table key // conflict_index = 3 means read lock conflict with address table key - fn test_check_sanitized_message_account_locks( - conflict_index: usize, - add_write: bool, - expectation: bool, - ) { + fn test_check_and_take_locks(conflict_index: usize, add_write: bool, expectation: bool) { let bank = create_test_bank(); let (bank, table_address) = create_test_address_lookup_table(bank, 2); let tx = create_test_sanitized_transaction( @@ -264,7 +197,6 @@ mod tests { let message = tx.message(); let mut account_locks = ReadWriteAccountSet::default(); - assert!(account_locks.check_sanitized_message_account_locks(message)); let conflict_key = message.account_keys().get(conflict_index).unwrap(); if add_write { @@ -272,34 +204,32 @@ mod tests { } else { account_locks.add_read(conflict_key); } - assert_eq!( - expectation, - account_locks.check_sanitized_message_account_locks(message) - ); + assert_eq!(expectation, account_locks.check_locks(message)); + assert_eq!(expectation, account_locks.take_locks(message)); } #[test] - fn test_check_sanitized_message_account_locks_write_write_conflict() { - test_check_sanitized_message_account_locks(0, true, false); // static key conflict - test_check_sanitized_message_account_locks(2, true, false); // lookup key conflict + fn test_check_and_take_locks_write_write_conflict() { + test_check_and_take_locks(0, true, false); // static key conflict + test_check_and_take_locks(2, true, false); // lookup key conflict } #[test] - fn test_check_sanitized_message_account_locks_read_write_conflict() { - test_check_sanitized_message_account_locks(0, false, false); // static key conflict - test_check_sanitized_message_account_locks(2, false, false); // lookup key conflict + fn test_check_and_take_locks_read_write_conflict() { + test_check_and_take_locks(0, false, false); // static key conflict + test_check_and_take_locks(2, false, false); // lookup key conflict } #[test] - fn test_check_sanitized_message_account_locks_write_read_conflict() { - test_check_sanitized_message_account_locks(1, true, false); // static key conflict - test_check_sanitized_message_account_locks(3, true, false); // lookup key conflict + fn test_check_and_take_locks_write_read_conflict() { + test_check_and_take_locks(1, true, false); // static key conflict + test_check_and_take_locks(3, true, false); // lookup key conflict } #[test] - fn test_check_sanitized_message_account_locks_read_read_non_conflict() { - test_check_sanitized_message_account_locks(1, false, true); // static key conflict - test_check_sanitized_message_account_locks(3, false, true); // lookup key conflict + fn test_check_and_take_locks_read_read_non_conflict() { + test_check_and_take_locks(1, false, true); // static key conflict + test_check_and_take_locks(3, false, true); // lookup key conflict } #[test] diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 80ce087532..03b3e58332 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -16,7 +16,7 @@ use { }, itertools::Itertools, min_max_heap::MinMaxHeap, - solana_measure::measure, + solana_measure::{measure, measure_us}, solana_runtime::bank::Bank, solana_sdk::{ clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, feature_set::FeatureSet, hash::Hash, @@ -149,18 +149,11 @@ fn consume_scan_should_process_packet( return ProcessingDecision::Now; } - // Before sanitization, let's quickly check the static keys (performance optimization) - let message = &packet.transaction().get_message().message; - if !payload.account_locks.check_static_account_locks(message) { - return ProcessingDecision::Later; - } - - // Try to deserialize the packet - let (maybe_sanitized_transaction, sanitization_time) = measure!( + // Try to sanitize the packet + let (maybe_sanitized_transaction, sanitization_time_us) = measure_us!( packet.build_sanitized_transaction(&bank.feature_set, bank.vote_only_bank(), bank) ); - let sanitization_time_us = sanitization_time.as_us(); payload .slot_metrics_tracker .increment_transactions_from_packets_us(sanitization_time_us); @@ -181,13 +174,18 @@ fn consume_scan_should_process_packet( payload .message_hash_to_transaction .remove(packet.message_hash()); - ProcessingDecision::Never - } else if payload.account_locks.try_locking(message) { - payload.sanitized_transactions.push(sanitized_transaction); - ProcessingDecision::Now - } else { - ProcessingDecision::Later + return ProcessingDecision::Never; } + + // Always take locks during batch creation. + // This prevents lower-priority transactions from taking locks + // needed by higher-priority txs that were skipped by this check. + if !payload.account_locks.take_locks(message) { + return ProcessingDecision::Later; + } + + payload.sanitized_transactions.push(sanitized_transaction); + ProcessingDecision::Now } else { payload .message_hash_to_transaction