diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 9173395a68..dc75d24a53 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -48,9 +48,12 @@ pub mod consumer; mod decision_maker; mod forwarder; mod packet_receiver; + #[allow(dead_code)] mod scheduler_messages; +mod consume_worker; +mod forward_worker; #[allow(dead_code)] mod thread_aware_account_locks; @@ -609,6 +612,7 @@ mod tests { pubkey::Pubkey, signature::{Keypair, Signer}, system_transaction, + transaction::{SanitizedTransaction, Transaction}, }, solana_streamer::socket::SocketAddrSpace, solana_vote_program::{ @@ -628,6 +632,12 @@ mod tests { (node, cluster_info) } + pub(crate) fn sanitize_transactions(txs: Vec) -> Vec { + txs.into_iter() + .map(SanitizedTransaction::from_transaction_for_tests) + .collect() + } + #[test] fn test_banking_stage_shutdown1() { let genesis_config = create_genesis_config(2).genesis_config; diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs new file mode 100644 index 0000000000..dfc7a533e8 --- /dev/null +++ b/core/src/banking_stage/consume_worker.rs @@ -0,0 +1,431 @@ +use { + super::{ + consumer::{Consumer, ExecuteAndCommitTransactionsOutput, ProcessTransactionBatchOutput}, + scheduler_messages::{ConsumeWork, FinishedConsumeWork}, + }, + crossbeam_channel::{Receiver, RecvError, SendError, Sender}, + solana_poh::leader_bank_notifier::LeaderBankNotifier, + solana_runtime::bank::Bank, + std::{sync::Arc, time::Duration}, + thiserror::Error, +}; + +#[derive(Debug, Error)] +pub enum ConsumeWorkerError { + #[error("Failed to receive work from scheduler: {0}")] + Recv(#[from] RecvError), + #[error("Failed to send finalized consume work to scheduler: {0}")] + Send(#[from] SendError), +} + +pub(crate) struct ConsumeWorker { + consume_receiver: Receiver, + consumer: Consumer, + consumed_sender: Sender, + + leader_bank_notifier: Arc, +} + +#[allow(dead_code)] +impl ConsumeWorker { + pub fn new( + consume_receiver: Receiver, + consumer: Consumer, + consumed_sender: Sender, + leader_bank_notifier: Arc, + ) -> Self { + Self { + consume_receiver, + consumer, + consumed_sender, + leader_bank_notifier, + } + } + + pub fn run(self) -> Result<(), ConsumeWorkerError> { + loop { + let work = self.consume_receiver.recv()?; + self.consume_loop(work)?; + } + } + + fn consume_loop(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> { + let Some(mut bank) = self.get_consume_bank() else { + return self.retry_drain(work); + }; + + for work in try_drain_iter(work, &self.consume_receiver) { + if bank.is_complete() { + if let Some(new_bank) = self.get_consume_bank() { + bank = new_bank; + } else { + return self.retry_drain(work); + } + } + self.consume(&bank, work)?; + } + + Ok(()) + } + + /// Consume a single batch. + fn consume(&self, bank: &Arc, work: ConsumeWork) -> Result<(), ConsumeWorkerError> { + let ProcessTransactionBatchOutput { + execute_and_commit_transactions_output: + ExecuteAndCommitTransactionsOutput { + retryable_transaction_indexes, + .. + }, + .. + } = self.consumer.process_and_record_aged_transactions( + bank, + &work.transactions, + &work.max_age_slots, + ); + + self.consumed_sender.send(FinishedConsumeWork { + work, + retryable_indexes: retryable_transaction_indexes, + })?; + Ok(()) + } + + /// Try to get a bank for consuming. + fn get_consume_bank(&self) -> Option> { + self.leader_bank_notifier + .get_or_wait_for_in_progress(Duration::from_millis(50)) + .upgrade() + } + + /// Retry current batch and all outstanding batches. + fn retry_drain(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> { + for work in try_drain_iter(work, &self.consume_receiver) { + self.retry(work)?; + } + Ok(()) + } + + /// Send transactions back to scheduler as retryable. + fn retry(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> { + let retryable_indexes = (0..work.transactions.len()).collect(); + self.consumed_sender.send(FinishedConsumeWork { + work, + retryable_indexes, + })?; + Ok(()) + } +} + +/// Helper function to create an non-blocking iterator over work in the receiver, +/// starting with the given work item. +fn try_drain_iter(work: T, receiver: &Receiver) -> impl Iterator + '_ { + std::iter::once(work).chain(receiver.try_iter()) +} + +#[cfg(test)] +mod tests { + use { + super::*, + crate::{ + banking_stage::{ + committer::Committer, + scheduler_messages::{TransactionBatchId, TransactionId}, + tests::{create_slow_genesis_config, sanitize_transactions, simulate_poh}, + }, + qos_service::QosService, + }, + crossbeam_channel::unbounded, + solana_ledger::{ + blockstore::Blockstore, genesis_utils::GenesisConfigInfo, + get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, + }, + solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry}, + solana_runtime::{ + bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, + vote_sender_types::ReplayVoteReceiver, + }, + solana_sdk::{ + genesis_config::GenesisConfig, poh_config::PohConfig, pubkey::Pubkey, + signature::Keypair, system_transaction, + }, + std::{ + sync::{atomic::AtomicBool, RwLock}, + thread::JoinHandle, + }, + tempfile::TempDir, + }; + + // Helper struct to create tests that hold channels, files, etc. + // such that our tests can be more easily set up and run. + struct TestFrame { + mint_keypair: Keypair, + genesis_config: GenesisConfig, + bank: Arc, + _ledger_path: TempDir, + _entry_receiver: Receiver, + poh_recorder: Arc>, + _poh_simulator: JoinHandle<()>, + _replay_vote_receiver: ReplayVoteReceiver, + + consume_sender: Sender, + consumed_receiver: Receiver, + } + + fn setup_test_frame() -> (TestFrame, ConsumeWorker) { + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_slow_genesis_config(10_000); + let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = bank_forks.read().unwrap().working_bank(); + + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.clone(), + Some((4, 4)), + bank.ticks_per_slot(), + &Pubkey::new_unique(), + Arc::new(blockstore), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + &PohConfig::default(), + Arc::new(AtomicBool::default()), + ); + let recorder = poh_recorder.new_recorder(); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); + let poh_simulator = simulate_poh(record_receiver, &poh_recorder); + + let (replay_vote_sender, replay_vote_receiver) = unbounded(); + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), + ); + let consumer = Consumer::new(committer, recorder, QosService::new(1), None); + + let (consume_sender, consume_receiver) = unbounded(); + let (consumed_sender, consumed_receiver) = unbounded(); + let worker = ConsumeWorker::new( + consume_receiver, + consumer, + consumed_sender, + poh_recorder.read().unwrap().new_leader_bank_notifier(), + ); + + ( + TestFrame { + mint_keypair, + genesis_config, + bank, + _ledger_path: ledger_path, + _entry_receiver: entry_receiver, + poh_recorder, + _poh_simulator: poh_simulator, + _replay_vote_receiver: replay_vote_receiver, + consume_sender, + consumed_receiver, + }, + worker, + ) + } + + #[test] + fn test_worker_consume_no_bank() { + let (test_frame, worker) = setup_test_frame(); + let TestFrame { + mint_keypair, + genesis_config, + bank, + consume_sender, + consumed_receiver, + .. + } = &test_frame; + let worker_thread = std::thread::spawn(move || worker.run()); + + let pubkey1 = Pubkey::new_unique(); + + let transactions = sanitize_transactions(vec![system_transaction::transfer( + mint_keypair, + &pubkey1, + 1, + genesis_config.hash(), + )]); + let bid = TransactionBatchId::new(0); + let id = TransactionId::new(0); + let work = ConsumeWork { + batch_id: bid, + ids: vec![id], + transactions, + max_age_slots: vec![bank.slot()], + }; + consume_sender.send(work).unwrap(); + let consumed = consumed_receiver.recv().unwrap(); + assert_eq!(consumed.work.batch_id, bid); + assert_eq!(consumed.work.ids, vec![id]); + assert_eq!(consumed.work.max_age_slots, vec![bank.slot()]); + assert_eq!(consumed.retryable_indexes, vec![0]); + + drop(test_frame); + let _ = worker_thread.join().unwrap(); + } + + #[test] + fn test_worker_consume_simple() { + let (test_frame, worker) = setup_test_frame(); + let TestFrame { + mint_keypair, + genesis_config, + bank, + poh_recorder, + consume_sender, + consumed_receiver, + .. + } = &test_frame; + let worker_thread = std::thread::spawn(move || worker.run()); + poh_recorder.write().unwrap().set_bank(bank, false); + + let pubkey1 = Pubkey::new_unique(); + + let transactions = sanitize_transactions(vec![system_transaction::transfer( + mint_keypair, + &pubkey1, + 1, + genesis_config.hash(), + )]); + let bid = TransactionBatchId::new(0); + let id = TransactionId::new(0); + let work = ConsumeWork { + batch_id: bid, + ids: vec![id], + transactions, + max_age_slots: vec![bank.slot()], + }; + consume_sender.send(work).unwrap(); + let consumed = consumed_receiver.recv().unwrap(); + assert_eq!(consumed.work.batch_id, bid); + assert_eq!(consumed.work.ids, vec![id]); + assert_eq!(consumed.work.max_age_slots, vec![bank.slot()]); + assert_eq!(consumed.retryable_indexes, Vec::::new()); + + drop(test_frame); + let _ = worker_thread.join().unwrap(); + } + + #[test] + fn test_worker_consume_self_conflicting() { + let (test_frame, worker) = setup_test_frame(); + let TestFrame { + mint_keypair, + genesis_config, + bank, + poh_recorder, + consume_sender, + consumed_receiver, + .. + } = &test_frame; + let worker_thread = std::thread::spawn(move || worker.run()); + poh_recorder.write().unwrap().set_bank(bank, false); + + let pubkey1 = Pubkey::new_unique(); + let pubkey2 = Pubkey::new_unique(); + + let txs = sanitize_transactions(vec![ + system_transaction::transfer(mint_keypair, &pubkey1, 2, genesis_config.hash()), + system_transaction::transfer(mint_keypair, &pubkey2, 2, genesis_config.hash()), + ]); + + let bid = TransactionBatchId::new(0); + let id1 = TransactionId::new(1); + let id2 = TransactionId::new(0); + consume_sender + .send(ConsumeWork { + batch_id: bid, + ids: vec![id1, id2], + transactions: txs, + max_age_slots: vec![bank.slot(), bank.slot()], + }) + .unwrap(); + + let consumed = consumed_receiver.recv().unwrap(); + assert_eq!(consumed.work.batch_id, bid); + assert_eq!(consumed.work.ids, vec![id1, id2]); + assert_eq!(consumed.work.max_age_slots, vec![bank.slot(), bank.slot()]); + assert_eq!(consumed.retryable_indexes, vec![1]); // id2 is retryable since lock conflict + + drop(test_frame); + let _ = worker_thread.join().unwrap(); + } + + #[test] + fn test_worker_consume_multiple_messages() { + let (test_frame, worker) = setup_test_frame(); + let TestFrame { + mint_keypair, + genesis_config, + bank, + poh_recorder, + consume_sender, + consumed_receiver, + .. + } = &test_frame; + let worker_thread = std::thread::spawn(move || worker.run()); + poh_recorder.write().unwrap().set_bank(bank, false); + + let pubkey1 = Pubkey::new_unique(); + let pubkey2 = Pubkey::new_unique(); + + let txs1 = sanitize_transactions(vec![system_transaction::transfer( + mint_keypair, + &pubkey1, + 2, + genesis_config.hash(), + )]); + let txs2 = sanitize_transactions(vec![system_transaction::transfer( + mint_keypair, + &pubkey2, + 2, + genesis_config.hash(), + )]); + + let bid1 = TransactionBatchId::new(0); + let bid2 = TransactionBatchId::new(1); + let id1 = TransactionId::new(1); + let id2 = TransactionId::new(0); + consume_sender + .send(ConsumeWork { + batch_id: bid1, + ids: vec![id1], + transactions: txs1, + max_age_slots: vec![bank.slot()], + }) + .unwrap(); + + consume_sender + .send(ConsumeWork { + batch_id: bid2, + ids: vec![id2], + transactions: txs2, + max_age_slots: vec![bank.slot()], + }) + .unwrap(); + let consumed = consumed_receiver.recv().unwrap(); + assert_eq!(consumed.work.batch_id, bid1); + assert_eq!(consumed.work.ids, vec![id1]); + assert_eq!(consumed.work.max_age_slots, vec![bank.slot()]); + assert_eq!(consumed.retryable_indexes, Vec::::new()); + + let consumed = consumed_receiver.recv().unwrap(); + assert_eq!(consumed.work.batch_id, bid2); + assert_eq!(consumed.work.ids, vec![id2]); + assert_eq!(consumed.work.max_age_slots, vec![bank.slot()]); + assert_eq!(consumed.retryable_indexes, Vec::::new()); + + drop(test_frame); + let _ = worker_thread.join().unwrap(); + } +} diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 3ab96e79ff..ba58bd48d0 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -25,10 +25,10 @@ use { transaction_error_metrics::TransactionErrorMetrics, }, solana_sdk::{ - clock::{FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, + clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE}, feature_set, saturating_add_assign, timing::timestamp, - transaction::{self, SanitizedTransaction, TransactionError}, + transaction::{self, AddressLoader, SanitizedTransaction, TransactionError}, }, std::{ sync::{atomic::Ordering, Arc}, @@ -57,7 +57,7 @@ pub struct ExecuteAndCommitTransactionsOutput { executed_with_successful_result_count: usize, // Transactions that either were not executed, or were executed and failed to be committed due // to the block ending. - retryable_transaction_indexes: Vec, + pub(crate) retryable_transaction_indexes: Vec, // A result that indicates whether transactions were successfully // committed into the Poh stream. pub commit_transactions_result: Result, PohRecorderError>, @@ -392,6 +392,51 @@ impl Consumer { bank: &Arc, txs: &[SanitizedTransaction], chunk_offset: usize, + ) -> ProcessTransactionBatchOutput { + // No filtering before QoS - transactions should have been sanitized immediately prior to this call + let pre_results = std::iter::repeat(Ok(())); + self.process_and_record_transactions_with_pre_results(bank, txs, chunk_offset, pre_results) + } + + pub fn process_and_record_aged_transactions( + &self, + bank: &Arc, + txs: &[SanitizedTransaction], + max_slot_ages: &[Slot], + ) -> ProcessTransactionBatchOutput { + // Need to filter out transactions since they were sanitized earlier. + // This means that the transaction may cross and epoch boundary (not allowed), + // or account lookup tables may have been closed. + let pre_results = txs.iter().zip(max_slot_ages).map(|(tx, max_slot_age)| { + if *max_slot_age < bank.slot() { + // Attempt re-sanitization after epoch-cross. + // Re-sanitized transaction should be equal to the original transaction, + // but whether it will pass sanitization needs to be checked. + let resanitized_tx = + bank.fully_verify_transaction(tx.to_versioned_transaction())?; + if resanitized_tx != *tx { + // Sanitization before/after epoch give different transaction data - do not execute. + return Err(TransactionError::ResanitizationNeeded); + } + } else { + // Any transaction executed between sanitization time and now may have closed the lookup table(s). + // Above re-sanitization already loads addresses, so don't need to re-check in that case. + let lookup_tables = tx.message().message_address_table_lookups(); + if !lookup_tables.is_empty() { + bank.load_addresses(lookup_tables)?; + } + } + Ok(()) + }); + self.process_and_record_transactions_with_pre_results(bank, txs, 0, pre_results) + } + + fn process_and_record_transactions_with_pre_results( + &self, + bank: &Arc, + txs: &[SanitizedTransaction], + chunk_offset: usize, + pre_results: impl Iterator>, ) -> ProcessTransactionBatchOutput { let ( (transaction_qos_cost_results, cost_model_throttled_transactions_count), @@ -399,7 +444,7 @@ impl Consumer { ) = measure_us!(self.qos_service.select_and_accumulate_transaction_costs( bank, txs, - std::iter::repeat(Ok(())) // no filtering before QoS + pre_results )); // Only lock accounts for those transactions are selected for the block; @@ -676,7 +721,9 @@ mod tests { use { super::*, crate::{ - banking_stage::tests::{create_slow_genesis_config, simulate_poh}, + banking_stage::tests::{ + create_slow_genesis_config, sanitize_transactions, simulate_poh, + }, immutable_deserialized_packet::DeserializedPacketError, unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches}, unprocessed_transaction_storage::ThreadType, @@ -719,12 +766,6 @@ mod tests { }, }; - fn sanitize_transactions(txs: Vec) -> Vec { - txs.into_iter() - .map(SanitizedTransaction::from_transaction_for_tests) - .collect() - } - fn execute_transactions_with_dummy_poh_service( bank: Arc, transactions: Vec, diff --git a/core/src/banking_stage/forward_worker.rs b/core/src/banking_stage/forward_worker.rs new file mode 100644 index 0000000000..e171eb6b45 --- /dev/null +++ b/core/src/banking_stage/forward_worker.rs @@ -0,0 +1,231 @@ +use { + super::{ + forwarder::Forwarder, + scheduler_messages::{FinishedForwardWork, ForwardWork}, + ForwardOption, + }, + crossbeam_channel::{Receiver, RecvError, SendError, Sender}, + thiserror::Error, +}; + +#[derive(Debug, Error)] +pub enum ForwardWorkerError { + #[error("Failed to receive work from scheduler: {0}")] + Recv(#[from] RecvError), + #[error("Failed to send finalized forward work to scheduler: {0}")] + Send(#[from] SendError), +} + +pub(crate) struct ForwardWorker { + forward_receiver: Receiver, + forward_option: ForwardOption, + forwarder: Forwarder, + forwarded_sender: Sender, +} + +#[allow(dead_code)] +impl ForwardWorker { + pub fn new( + forward_receiver: Receiver, + forward_option: ForwardOption, + forwarder: Forwarder, + forwarded_sender: Sender, + ) -> Self { + Self { + forward_receiver, + forward_option, + forwarder, + forwarded_sender, + } + } + + pub fn run(self) -> Result<(), ForwardWorkerError> { + loop { + let work = self.forward_receiver.recv()?; + self.forward_loop(work)?; + } + } + + fn forward_loop(&self, work: ForwardWork) -> Result<(), ForwardWorkerError> { + for work in try_drain_iter(work, &self.forward_receiver) { + let (res, _num_packets, _forward_us, _leader_pubkey) = self.forwarder.forward_packets( + &self.forward_option, + work.packets.iter().map(|p| p.original_packet()), + ); + match res { + Ok(()) => self.forwarded_sender.send(FinishedForwardWork { + work, + successful: true, + })?, + Err(_err) => return self.failed_forward_drain(work), + }; + } + Ok(()) + } + + fn failed_forward_drain(&self, work: ForwardWork) -> Result<(), ForwardWorkerError> { + for work in try_drain_iter(work, &self.forward_receiver) { + self.forwarded_sender.send(FinishedForwardWork { + work, + successful: false, + })?; + } + Ok(()) + } +} + +/// Helper function to create an non-blocking iterator over work in the receiver, +/// starting with the given work item. +fn try_drain_iter(work: T, receiver: &Receiver) -> impl Iterator + '_ { + std::iter::once(work).chain(receiver.try_iter()) +} + +#[cfg(test)] +mod tests { + use { + super::*, + crate::{ + banking_stage::{ + scheduler_messages::TransactionId, + tests::{create_slow_genesis_config, new_test_cluster_info, simulate_poh}, + }, + immutable_deserialized_packet::ImmutableDeserializedPacket, + }, + crossbeam_channel::unbounded, + solana_ledger::{ + blockstore::Blockstore, genesis_utils::GenesisConfigInfo, + get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache, + }, + solana_perf::packet::to_packet_batches, + solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry}, + solana_runtime::{bank::Bank, bank_forks::BankForks}, + solana_sdk::{ + genesis_config::GenesisConfig, poh_config::PohConfig, pubkey::Pubkey, + signature::Keypair, system_transaction, + }, + std::{ + sync::{atomic::AtomicBool, Arc, RwLock}, + thread::JoinHandle, + }, + tempfile::TempDir, + }; + + // Helper struct to create tests that hold channels, files, etc. + // such that our tests can be more easily set up and run. + struct TestFrame { + mint_keypair: Keypair, + genesis_config: GenesisConfig, + _ledger_path: TempDir, + _entry_receiver: Receiver, + _poh_simulator: JoinHandle<()>, + + forward_sender: Sender, + forwarded_receiver: Receiver, + } + + fn setup_test_frame() -> (TestFrame, ForwardWorker) { + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_slow_genesis_config(10_000); + let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = bank_forks.read().unwrap().working_bank(); + + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.clone(), + Some((4, 4)), + bank.ticks_per_slot(), + &Pubkey::new_unique(), + Arc::new(blockstore), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + &PohConfig::default(), + Arc::new(AtomicBool::default()), + ); + let poh_recorder = Arc::new(RwLock::new(poh_recorder)); + let poh_simulator = simulate_poh(record_receiver, &poh_recorder); + + let (_local_node, cluster_info) = new_test_cluster_info(None); + let cluster_info = Arc::new(cluster_info); + let forwarder = Forwarder::new( + poh_recorder, + bank_forks, + cluster_info, + Arc::default(), + Arc::default(), + ); + + let (forward_sender, forward_receiver) = unbounded(); + let (forwarded_sender, forwarded_receiver) = unbounded(); + let worker = ForwardWorker::new( + forward_receiver, + ForwardOption::ForwardTransaction, + forwarder, + forwarded_sender, + ); + + ( + TestFrame { + mint_keypair, + genesis_config, + _ledger_path: ledger_path, + _entry_receiver: entry_receiver, + _poh_simulator: poh_simulator, + forward_sender, + forwarded_receiver, + }, + worker, + ) + } + + #[test] + fn test_worker_forward_simple() { + let (test_frame, worker) = setup_test_frame(); + let TestFrame { + mint_keypair, + genesis_config, + forward_sender, + forwarded_receiver, + .. + } = &test_frame; + let worker_thread = std::thread::spawn(move || worker.run()); + + let pubkey1 = Pubkey::new_unique(); + let pubkey2 = Pubkey::new_unique(); + + let txs = vec![ + system_transaction::transfer(mint_keypair, &pubkey1, 2, genesis_config.hash()), + system_transaction::transfer(mint_keypair, &pubkey2, 2, genesis_config.hash()), + ]; + + let id1 = TransactionId::new(1); + let id2 = TransactionId::new(0); + + let packets = to_packet_batches(&txs, 2); + assert_eq!(packets.len(), 1); + let packets = packets[0] + .into_iter() + .cloned() + .map(|p| ImmutableDeserializedPacket::new(p).unwrap()) + .map(Arc::new) + .collect(); + forward_sender + .send(ForwardWork { + packets, + ids: vec![id1, id2], + }) + .unwrap(); + let forwarded = forwarded_receiver.recv().unwrap(); + assert_eq!(forwarded.work.ids, vec![id1, id2]); + assert!(forwarded.successful); + + drop(test_frame); + let _ = worker_thread.join().unwrap(); + } +} diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index f563d82217..6bb36bbf8d 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -142,7 +142,7 @@ impl Forwarder { /// Forwards all valid, unprocessed packets in the iterator, up to a rate limit. /// Returns whether forwarding succeeded, the number of attempted forwarded packets /// if any, the time spent forwarding in us, and the leader pubkey if any. - fn forward_packets<'a>( + pub(crate) fn forward_packets<'a>( &self, forward_option: &ForwardOption, forwardable_packets: impl Iterator, diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index f1bbab9e65..b4b608258f 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -221,7 +221,7 @@ struct RentMetrics { } pub type BankStatusCache = StatusCache>; -#[frozen_abi(digest = "GBTLfFjModD9ykS9LV4pGi4S8eCrUj2JjWSDQLf8tMwV")] +#[frozen_abi(digest = "4uKZVBUbS5wkMK6vSzUoeQjAKbXd7AGeNakBeaBG9f4i")] pub type BankSlotDelta = SlotDelta>; #[derive(Default, Copy, Clone, Debug, PartialEq, Eq)] diff --git a/sdk/program/src/message/sanitized.rs b/sdk/program/src/message/sanitized.rs index f48fd29969..3ae6575933 100644 --- a/sdk/program/src/message/sanitized.rs +++ b/sdk/program/src/message/sanitized.rs @@ -197,6 +197,14 @@ impl SanitizedMessage { } } + /// Returns the list of account keys used for account lookup tables. + pub fn message_address_table_lookups(&self) -> &[v0::MessageAddressTableLookup] { + match self { + Self::Legacy(_message) => &[], + Self::V0(message) => &message.message.address_table_lookups, + } + } + /// Returns true if the account at the specified index is an input to some /// program instruction in this message. fn is_key_passed_to_program(&self, key_index: usize) -> bool { diff --git a/sdk/src/transaction/error.rs b/sdk/src/transaction/error.rs index 68f3ade8c0..f26e0189c1 100644 --- a/sdk/src/transaction/error.rs +++ b/sdk/src/transaction/error.rs @@ -157,6 +157,10 @@ pub enum TransactionError { /// LoadedAccountsDataSizeLimit set for transaction must be greater than 0. #[error("LoadedAccountsDataSizeLimit set for transaction must be greater than 0.")] InvalidLoadedAccountsDataSizeLimit, + + /// Sanitized transaction differed before/after feature activiation. Needs to be resanitized. + #[error("ResanitizationNeeded")] + ResanitizationNeeded, } impl From for TransactionError { diff --git a/storage-proto/proto/transaction_by_addr.proto b/storage-proto/proto/transaction_by_addr.proto index 375b6d189a..8b4d6d572d 100644 --- a/storage-proto/proto/transaction_by_addr.proto +++ b/storage-proto/proto/transaction_by_addr.proto @@ -59,6 +59,7 @@ enum TransactionErrorType { INSUFFICIENT_FUNDS_FOR_RENT = 31; MAX_LOADED_ACCOUNTS_DATA_SIZE_EXCEEDED = 32; INVALID_LOADED_ACCOUNTS_DATA_SIZE_LIMIT = 33; + RESANITIZATION_NEEDED = 34; } message InstructionError { diff --git a/storage-proto/src/convert.rs b/storage-proto/src/convert.rs index 44c332051f..0817e0553b 100644 --- a/storage-proto/src/convert.rs +++ b/storage-proto/src/convert.rs @@ -804,6 +804,7 @@ impl TryFrom for TransactionError { 29 => TransactionError::WouldExceedAccountDataTotalLimit, 32 => TransactionError::MaxLoadedAccountsDataSizeExceeded, 33 => TransactionError::InvalidLoadedAccountsDataSizeLimit, + 34 => TransactionError::ResanitizationNeeded, _ => return Err("Invalid TransactionError"), }) } @@ -913,6 +914,9 @@ impl From for tx_by_addr::TransactionError { TransactionError::InvalidLoadedAccountsDataSizeLimit => { tx_by_addr::TransactionErrorType::InvalidLoadedAccountsDataSizeLimit } + TransactionError::ResanitizationNeeded => { + tx_by_addr::TransactionErrorType::ResanitizationNeeded + } } as i32, instruction_error: match transaction_error { TransactionError::InstructionError(index, ref instruction_error) => {