diff --git a/core/src/banking_stage/immutable_deserialized_packet.rs b/core/src/banking_stage/immutable_deserialized_packet.rs
index 461770205..8a9d82e32 100644
--- a/core/src/banking_stage/immutable_deserialized_packet.rs
+++ b/core/src/banking_stage/immutable_deserialized_packet.rs
@@ -96,6 +96,10 @@ impl ImmutableDeserializedPacket {
         self.priority_details.compute_unit_limit
     }
 
+    pub fn priority_details(&self) -> TransactionPriorityDetails {
+        self.priority_details.clone()
+    }
+
     // This function deserializes packets into transactions, computes the blake3 hash of transaction
     // messages, and verifies secp256k1 instructions.
     pub fn build_sanitized_transaction(
diff --git a/core/src/banking_stage/transaction_scheduler/mod.rs b/core/src/banking_stage/transaction_scheduler/mod.rs
index bf6f761ba..0b65dce06 100644
--- a/core/src/banking_stage/transaction_scheduler/mod.rs
+++ b/core/src/banking_stage/transaction_scheduler/mod.rs
@@ -12,6 +12,8 @@ mod batch_id_generator;
 mod in_flight_tracker;
 #[allow(dead_code)]
 mod prio_graph_scheduler;
+#[allow(dead_code)]
+mod scheduler_controller;
 mod scheduler_error;
 #[allow(dead_code)]
 mod transaction_id_generator;
diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
new file mode 100644
index 000000000..8c1dc4f91
--- /dev/null
+++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs
@@ -0,0 +1,630 @@
+//! Control flow for BankingStage's transaction scheduler.
+//!
+
+use {
+    super::{
+        prio_graph_scheduler::PrioGraphScheduler, scheduler_error::SchedulerError,
+        transaction_id_generator::TransactionIdGenerator,
+        transaction_state::SanitizedTransactionTTL,
+        transaction_state_container::TransactionStateContainer,
+    },
+    crate::banking_stage::{
+        decision_maker::{BufferedPacketsDecision, DecisionMaker},
+        immutable_deserialized_packet::ImmutableDeserializedPacket,
+        packet_deserializer::PacketDeserializer,
+        TOTAL_BUFFERED_PACKETS,
+    },
+    crossbeam_channel::RecvTimeoutError,
+    solana_runtime::bank_forks::BankForks,
+    std::{
+        sync::{Arc, RwLock},
+        time::Duration,
+    },
+};
+
+/// Controls packet and transaction flow into scheduler, and scheduling execution.
+pub(crate) struct SchedulerController {
+    /// Decision maker for determining what should be done with transactions.
+    decision_maker: DecisionMaker,
+    /// Packet/Transaction ingress.
+    packet_receiver: PacketDeserializer,
+    bank_forks: Arc<RwLock<BankForks>>,
+    /// Generates unique IDs for incoming transactions.
+    transaction_id_generator: TransactionIdGenerator,
+    /// Container for transaction state.
+    /// Shared resource between `packet_receiver` and `scheduler`.
+    container: TransactionStateContainer,
+    /// State for scheduling and communicating with worker threads.
+    scheduler: PrioGraphScheduler,
+}
+
+impl SchedulerController {
+    pub fn new(
+        decision_maker: DecisionMaker,
+        packet_deserializer: PacketDeserializer,
+        bank_forks: Arc<RwLock<BankForks>>,
+        scheduler: PrioGraphScheduler,
+    ) -> Self {
+        Self {
+            decision_maker,
+            packet_receiver: packet_deserializer,
+            bank_forks,
+            transaction_id_generator: TransactionIdGenerator::default(),
+            container: TransactionStateContainer::with_capacity(TOTAL_BUFFERED_PACKETS),
+            scheduler,
+        }
+    }
+
+    pub fn run(mut self) -> Result<(), SchedulerError> {
+        loop {
+            // BufferedPacketsDecision is shared with legacy BankingStage, which will forward
+            // packets. Initially, not renaming these decision variants but the actions taken
+            // are different, since new BankingStage will not forward packets.
+            // For `Forward` and `ForwardAndHold`, we want to receive packets but will not
+            // forward them to the next leader. In this case, `ForwardAndHold` is
+            // indistinguishable from `Hold`.
+            //
+            // `Forward` will drop packets from the buffer instead of forwarding.
+            // During receiving, since packets would be dropped from buffer anyway, we can
+            // bypass sanitization and buffering and immediately drop the packets.
+            let decision = self.decision_maker.make_consume_or_forward_decision();
+
+            self.process_transactions(&decision)?;
+            self.scheduler.receive_completed(&mut self.container)?;
+            if !self.receive_packets(&decision) {
+                break;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Process packets based on decision.
+    fn process_transactions(
+        &mut self,
+        decision: &BufferedPacketsDecision,
+    ) -> Result<(), SchedulerError> {
+        match decision {
+            BufferedPacketsDecision::Consume(_bank_start) => {
+                let _num_scheduled = self.scheduler.schedule(&mut self.container)?;
+            }
+            BufferedPacketsDecision::Forward => {
+                self.clear_container();
+            }
+            BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => {}
+        }
+
+        Ok(())
+    }
+
+    /// Clears the transaction state container.
+    /// This only clears pending transactions, and does **not** clear in-flight transactions.
+    fn clear_container(&mut self) {
+        while let Some(id) = self.container.pop() {
+            self.container.remove_by_id(&id.id);
+        }
+    }
+
+    /// Returns whether the packet receiver is still connected.
+    fn receive_packets(&mut self, decision: &BufferedPacketsDecision) -> bool {
+        let remaining_queue_capacity = self.container.remaining_queue_capacity();
+
+        const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(100);
+        let (recv_timeout, should_buffer) = match decision {
+            BufferedPacketsDecision::Consume(_) => (
+                if self.container.is_empty() {
+                    MAX_PACKET_RECEIVE_TIME
+                } else {
+                    Duration::ZERO
+                },
+                true,
+            ),
+            BufferedPacketsDecision::Forward => (MAX_PACKET_RECEIVE_TIME, false),
+            BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => {
+                (MAX_PACKET_RECEIVE_TIME, true)
+            }
+        };
+
+        let received_packet_results = self
+            .packet_receiver
+            .receive_packets(recv_timeout, remaining_queue_capacity);
+
+        match (received_packet_results, should_buffer) {
+            (Ok(receive_packet_results), true) => {
+                self.buffer_packets(receive_packet_results.deserialized_packets)
+            }
+            (Ok(receive_packet_results), false) => drop(receive_packet_results),
+            (Err(RecvTimeoutError::Timeout), _) => {}
+            (Err(RecvTimeoutError::Disconnected), _) => return false,
+        }
+
+        true
+    }
+
+    fn buffer_packets(&mut self, packets: Vec<ImmutableDeserializedPacket>) {
+        // Sanitize packets, generate IDs, and insert into the container.
+        let bank = self.bank_forks.read().unwrap().working_bank();
+        let last_slot_in_epoch = bank.epoch_schedule().get_last_slot_in_epoch(bank.epoch());
+        let feature_set = &bank.feature_set;
+        let vote_only = bank.vote_only_bank();
+        for packet in packets {
+            let Some(transaction) =
+                packet.build_sanitized_transaction(feature_set, vote_only, bank.as_ref())
+            else {
+                continue;
+            };
+
+            let transaction_id = self.transaction_id_generator.next();
+            let transaction_ttl = SanitizedTransactionTTL {
+                transaction,
+                max_age_slot: last_slot_in_epoch,
+            };
+            let transaction_priority_details = packet.priority_details();
+            self.container.insert_new_transaction(
+                transaction_id,
+                transaction_ttl,
+                transaction_priority_details,
+            );
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {
+        super::*,
+        crate::{
+            banking_stage::{
+                consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
+                scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
+                tests::create_slow_genesis_config,
+            },
+            banking_trace::BankingPacketBatch,
+            sigverify::SigverifyTracerPacketStats,
+        },
+        crossbeam_channel::{unbounded, Receiver, Sender},
+        itertools::Itertools,
+        solana_ledger::{
+            blockstore::Blockstore, genesis_utils::GenesisConfigInfo,
+            get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache,
+        },
+        solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS},
+        solana_poh::poh_recorder::{PohRecorder, Record, WorkingBankEntry},
+        solana_runtime::{bank::Bank, bank_forks::BankForks},
+        solana_sdk::{
+            compute_budget::ComputeBudgetInstruction, hash::Hash, message::Message,
+            poh_config::PohConfig, pubkey::Pubkey, signature::Keypair, signer::Signer,
+            system_instruction, transaction::Transaction,
+        },
+        std::sync::{atomic::AtomicBool, Arc, RwLock},
+        tempfile::TempDir,
+    };
+
+    const TEST_TIMEOUT: Duration = Duration::from_millis(1000);
+
+    fn create_channels<T>(num: usize) -> (Vec<Sender<T>>, Vec<Receiver<T>>) {
+        (0..num).map(|_| unbounded()).unzip()
+    }
+
+    // Helper struct to create tests that hold channels, files, etc.
+    // such that our tests can be more easily set up and run.
+    struct TestFrame {
+        bank: Arc<Bank>,
+        _ledger_path: TempDir,
+        _entry_receiver: Receiver<WorkingBankEntry>,
+        _record_receiver: Receiver<Record>,
+        poh_recorder: Arc<RwLock<PohRecorder>>,
+        banking_packet_sender: Sender<Arc<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>)>>,
+
+        consume_work_receivers: Vec<Receiver<ConsumeWork>>,
+        finished_consume_work_sender: Sender<FinishedConsumeWork>,
+    }
+
+    fn create_test_frame(num_threads: usize) -> (TestFrame, SchedulerController) {
+        let GenesisConfigInfo { genesis_config, .. } = create_slow_genesis_config(10_000);
+        let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
+        let bank_forks = BankForks::new_rw_arc(bank);
+        let bank = bank_forks.read().unwrap().working_bank();
+
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path())
+            .expect("Expected to be able to open database ledger");
+        let (poh_recorder, entry_receiver, record_receiver) = PohRecorder::new(
+            bank.tick_height(),
+            bank.last_blockhash(),
+            bank.clone(),
+            Some((4, 4)),
+            bank.ticks_per_slot(),
+            &Pubkey::new_unique(),
+            Arc::new(blockstore),
+            &Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
+            &PohConfig::default(),
+            Arc::new(AtomicBool::default()),
+        );
+        let poh_recorder = Arc::new(RwLock::new(poh_recorder));
+        let decision_maker = DecisionMaker::new(Pubkey::new_unique(), poh_recorder.clone());
+
+        let (banking_packet_sender, banking_packet_receiver) = unbounded();
+        let packet_deserializer =
+            PacketDeserializer::new(banking_packet_receiver, bank_forks.clone());
+
+        let (consume_work_senders, consume_work_receivers) = create_channels(num_threads);
+        let (finished_consume_work_sender, finished_consume_work_receiver) = unbounded();
+
+        let test_frame = TestFrame {
+            bank,
+            _ledger_path: ledger_path,
+            _entry_receiver: entry_receiver,
+            _record_receiver: record_receiver,
+            poh_recorder,
+            banking_packet_sender,
+            consume_work_receivers,
+            finished_consume_work_sender,
+        };
+        let scheduler_controller = SchedulerController::new(
+            decision_maker,
+            packet_deserializer,
+            bank_forks,
+            PrioGraphScheduler::new(consume_work_senders, finished_consume_work_receiver),
+        );
+
+        (test_frame, scheduler_controller)
+    }
+
+    fn prioritized_transfer(
+        from_keypair: &Keypair,
+        to_pubkey: &Pubkey,
+        lamports: u64,
+        priority: u64,
+        recent_blockhash: Hash,
+    ) -> Transaction {
+        let transfer = system_instruction::transfer(&from_keypair.pubkey(), to_pubkey, lamports);
+        let prioritization = ComputeBudgetInstruction::set_compute_unit_price(priority);
+        let message = Message::new(&[transfer, prioritization], Some(&from_keypair.pubkey()));
+        Transaction::new(&vec![from_keypair], message, recent_blockhash)
+    }
+
+    fn to_banking_packet_batch(txs: &[Transaction]) -> BankingPacketBatch {
+        let packet_batch = to_packet_batches(txs, NUM_PACKETS);
+        Arc::new((packet_batch, None))
+    }
+
+    #[test]
+    #[should_panic(expected = "batch id 0 is not being tracked")]
+    fn test_unexpected_batch_id() {
+        let (test_frame, central_scheduler_banking_stage) = create_test_frame(1);
+        let TestFrame {
+            finished_consume_work_sender,
+            ..
+        } = &test_frame;
+
+        finished_consume_work_sender
+            .send(FinishedConsumeWork {
+                work: ConsumeWork {
+                    batch_id: TransactionBatchId::new(0),
+                    ids: vec![],
+                    transactions: vec![],
+                    max_age_slots: vec![],
+                },
+                retryable_indexes: vec![],
+            })
+            .unwrap();
+
+        central_scheduler_banking_stage.run().unwrap();
+    }
+
+    #[test]
+    fn test_schedule_consume_single_threaded_no_conflicts() {
+        let (test_frame, central_scheduler_banking_stage) = create_test_frame(1);
+        let TestFrame {
+            bank,
+            poh_recorder,
+            banking_packet_sender,
+            consume_work_receivers,
+            ..
+        } = &test_frame;
+
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
+        let scheduler_thread = std::thread::spawn(move || central_scheduler_banking_stage.run());
+
+        // Send packet batch to the scheduler - should do nothing until we become the leader.
+        let tx1 = prioritized_transfer(
+            &Keypair::new(),
+            &Pubkey::new_unique(),
+            1,
+            1,
+            bank.last_blockhash(),
+        );
+        let tx2 = prioritized_transfer(
+            &Keypair::new(),
+            &Pubkey::new_unique(),
+            1,
+            2,
+            bank.last_blockhash(),
+        );
+        let tx1_hash = tx1.message().hash();
+        let tx2_hash = tx2.message().hash();
+
+        let txs = vec![tx1, tx2];
+        banking_packet_sender
+            .send(to_banking_packet_batch(&txs))
+            .unwrap();
+
+        let consume_work = consume_work_receivers[0]
+            .recv_timeout(TEST_TIMEOUT)
+            .unwrap();
+        assert_eq!(consume_work.ids.len(), 2);
+        assert_eq!(consume_work.transactions.len(), 2);
+        let message_hashes = consume_work
+            .transactions
+            .iter()
+            .map(|tx| tx.message_hash())
+            .collect_vec();
+        assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]);
+
+        drop(test_frame);
+        let _ = scheduler_thread.join();
+    }
+
+    #[test]
+    fn test_schedule_consume_single_threaded_conflict() {
+        let (test_frame, central_scheduler_banking_stage) = create_test_frame(1);
+        let TestFrame {
+            bank,
+            poh_recorder,
+            banking_packet_sender,
+            consume_work_receivers,
+            ..
+        } = &test_frame;
+
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
+        let scheduler_thread = std::thread::spawn(move || central_scheduler_banking_stage.run());
+
+        let pk = Pubkey::new_unique();
+        let tx1 = prioritized_transfer(&Keypair::new(), &pk, 1, 1, bank.last_blockhash());
+        let tx2 = prioritized_transfer(&Keypair::new(), &pk, 1, 2, bank.last_blockhash());
+        let tx1_hash = tx1.message().hash();
+        let tx2_hash = tx2.message().hash();
+
+        let txs = vec![tx1, tx2];
+        banking_packet_sender
+            .send(to_banking_packet_batch(&txs))
+            .unwrap();
+
+        // We expect 2 batches to be scheduled
+        let consume_works = (0..2)
+            .map(|_| {
+                consume_work_receivers[0]
+                    .recv_timeout(TEST_TIMEOUT)
+                    .unwrap()
+            })
+            .collect_vec();
+
+        let num_txs_per_batch = consume_works.iter().map(|cw| cw.ids.len()).collect_vec();
+        let message_hashes = consume_works
+            .iter()
+            .flat_map(|cw| cw.transactions.iter().map(|tx| tx.message_hash()))
+            .collect_vec();
+        assert_eq!(num_txs_per_batch, vec![1; 2]);
+        assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]);
+
+        drop(test_frame);
+        let _ = scheduler_thread.join();
+    }
+
+    #[test]
+    fn test_schedule_consume_single_threaded_multi_batch() {
+        let (test_frame, central_scheduler_banking_stage) = create_test_frame(1);
+        let TestFrame {
+            bank,
+            poh_recorder,
+            banking_packet_sender,
+            consume_work_receivers,
+            ..
+        } = &test_frame;
+
+        let scheduler_thread = std::thread::spawn(move || central_scheduler_banking_stage.run());
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
+
+        // Send multiple batches - all get scheduled
+        let txs1 = (0..2 * TARGET_NUM_TRANSACTIONS_PER_BATCH)
+            .map(|i| {
+                prioritized_transfer(
+                    &Keypair::new(),
+                    &Pubkey::new_unique(),
+                    i as u64,
+                    1,
+                    bank.last_blockhash(),
+                )
+            })
+            .collect_vec();
+        let txs2 = (0..2 * TARGET_NUM_TRANSACTIONS_PER_BATCH)
+            .map(|i| {
+                prioritized_transfer(
+                    &Keypair::new(),
+                    &Pubkey::new_unique(),
+                    i as u64,
+                    2,
+                    bank.last_blockhash(),
+                )
+            })
+            .collect_vec();
+
+        banking_packet_sender
+            .send(to_banking_packet_batch(&txs1))
+            .unwrap();
+        banking_packet_sender
+            .send(to_banking_packet_batch(&txs2))
+            .unwrap();
+
+        // We expect 4 batches to be scheduled
+        let consume_works = (0..4)
+            .map(|_| {
+                consume_work_receivers[0]
+                    .recv_timeout(TEST_TIMEOUT)
+                    .unwrap()
+            })
+            .collect_vec();
+
+        assert_eq!(
+            consume_works.iter().map(|cw| cw.ids.len()).collect_vec(),
+            vec![TARGET_NUM_TRANSACTIONS_PER_BATCH; 4]
+        );
+
+        drop(test_frame);
+        let _ = scheduler_thread.join();
+    }
+
+    #[test]
+    fn test_schedule_consume_simple_thread_selection() {
+        let (test_frame, central_scheduler_banking_stage) = create_test_frame(2);
+        let TestFrame {
+            bank,
+            poh_recorder,
+            banking_packet_sender,
+            consume_work_receivers,
+            ..
+        } = &test_frame;
+
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
+        let scheduler_thread = std::thread::spawn(move || central_scheduler_banking_stage.run());
+
+        // Send 4 transactions w/o conflicts. 2 should be scheduled on each thread
+        let txs = (0..4)
+            .map(|i| {
+                prioritized_transfer(
+                    &Keypair::new(),
+                    &Pubkey::new_unique(),
+                    1,
+                    i,
+                    bank.last_blockhash(),
+                )
+            })
+            .collect_vec();
+        banking_packet_sender
+            .send(to_banking_packet_batch(&txs))
+            .unwrap();
+
+        // Priority Expectation:
+        // Thread 0: [3, 1]
+        // Thread 1: [2, 0]
+        let t0_expected = [3, 1]
+            .into_iter()
+            .map(|i| txs[i].message().hash())
+            .collect_vec();
+        let t1_expected = [2, 0]
+            .into_iter()
+            .map(|i| txs[i].message().hash())
+            .collect_vec();
+        let t0_actual = consume_work_receivers[0]
+            .recv_timeout(TEST_TIMEOUT)
+            .unwrap()
+            .transactions
+            .iter()
+            .map(|tx| *tx.message_hash())
+            .collect_vec();
+        let t1_actual = consume_work_receivers[1]
+            .recv_timeout(TEST_TIMEOUT)
+            .unwrap()
+            .transactions
+            .iter()
+            .map(|tx| *tx.message_hash())
+            .collect_vec();
+
+        assert_eq!(t0_actual, t0_expected);
+        assert_eq!(t1_actual, t1_expected);
+
+        drop(test_frame);
+        let _ = scheduler_thread.join();
+    }
+
+    #[test]
+    fn test_schedule_consume_retryable() {
+        let (test_frame, central_scheduler_banking_stage) = create_test_frame(1);
+        let TestFrame {
+            bank,
+            poh_recorder,
+            banking_packet_sender,
+            consume_work_receivers,
+            finished_consume_work_sender,
+            ..
+        } = &test_frame;
+
+        poh_recorder
+            .write()
+            .unwrap()
+            .set_bank_for_test(bank.clone());
+        let scheduler_thread = std::thread::spawn(move || central_scheduler_banking_stage.run());
+
+        // Send packet batch to the scheduler - should do nothing until we become the leader.
+        let tx1 = prioritized_transfer(
+            &Keypair::new(),
+            &Pubkey::new_unique(),
+            1,
+            1,
+            bank.last_blockhash(),
+        );
+        let tx2 = prioritized_transfer(
+            &Keypair::new(),
+            &Pubkey::new_unique(),
+            1,
+            2,
+            bank.last_blockhash(),
+        );
+        let tx1_hash = tx1.message().hash();
+        let tx2_hash = tx2.message().hash();
+
+        let txs = vec![tx1, tx2];
+        banking_packet_sender
+            .send(to_banking_packet_batch(&txs))
+            .unwrap();
+
+        let consume_work = consume_work_receivers[0]
+            .recv_timeout(TEST_TIMEOUT)
+            .unwrap();
+        assert_eq!(consume_work.ids.len(), 2);
+        assert_eq!(consume_work.transactions.len(), 2);
+        let message_hashes = consume_work
+            .transactions
+            .iter()
+            .map(|tx| tx.message_hash())
+            .collect_vec();
+        assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]);
+
+        // Complete the batch - marking the second transaction as retryable
+        finished_consume_work_sender
+            .send(FinishedConsumeWork {
+                work: consume_work,
+                retryable_indexes: vec![1],
+            })
+            .unwrap();
+
+        // Transaction should be rescheduled
+        let consume_work = consume_work_receivers[0]
+            .recv_timeout(TEST_TIMEOUT)
+            .unwrap();
+        assert_eq!(consume_work.ids.len(), 1);
+        assert_eq!(consume_work.transactions.len(), 1);
+        let message_hashes = consume_work
+            .transactions
+            .iter()
+            .map(|tx| tx.message_hash())
+            .collect_vec();
+        assert_eq!(message_hashes, vec![&tx1_hash]);
+
+        drop(test_frame);
+        let _ = scheduler_thread.join();
+    }
+}
diff --git a/runtime/src/transaction_priority_details.rs b/runtime/src/transaction_priority_details.rs
index 0d0a94df4..1e4ddc532 100644
--- a/runtime/src/transaction_priority_details.rs
+++ b/runtime/src/transaction_priority_details.rs
@@ -7,7 +7,7 @@ use {
     },
 };
 
-#[derive(Debug, PartialEq, Eq)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub struct TransactionPriorityDetails {
     pub priority: u64,
     pub compute_unit_limit: u64,