From 81a007b3c86b924f170139fbd62693b65f0b16b9 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 13 Nov 2023 22:18:54 +0800 Subject: [PATCH] TransactionScheduler: CLI and hookup for central-scheduler (#33890) --- core/src/banking_stage.rs | 245 +++++++++++++++--- core/src/banking_stage/committer.rs | 1 + core/src/banking_stage/decision_maker.rs | 1 + .../transaction_scheduler/mod.rs | 18 +- core/src/validator.rs | 1 + 5 files changed, 219 insertions(+), 47 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 19c3eb55eb..e4e5f3125e 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -16,17 +16,26 @@ use { unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage}, }, crate::{ - banking_trace::BankingPacketReceiver, tracer_packet_stats::TracerPacketStats, + banking_stage::{ + consume_worker::ConsumeWorker, + packet_deserializer::PacketDeserializer, + transaction_scheduler::{ + prio_graph_scheduler::PrioGraphScheduler, + scheduler_controller::SchedulerController, scheduler_error::SchedulerError, + }, + }, + banking_trace::BankingPacketReceiver, + tracer_packet_stats::TracerPacketStats, validator::BlockProductionMethod, }, - crossbeam_channel::RecvTimeoutError, + crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, histogram::Histogram, solana_client::connection_cache::ConnectionCache, solana_gossip::cluster_info::ClusterInfo, solana_ledger::blockstore_processor::TransactionStatusSender, solana_measure::{measure, measure_us}, solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH}, - solana_poh::poh_recorder::PohRecorder, + solana_poh::poh_recorder::{PohRecorder, TransactionRecorder}, solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache}, solana_sdk::timing::AtomicInterval, solana_vote::vote_sender_types::ReplayVoteSender, @@ -378,6 +387,20 @@ impl BankingStage { prioritization_fee_cache, ) } + BlockProductionMethod::CentralScheduler => Self::new_central_scheduler( + cluster_info, + poh_recorder, + non_vote_receiver, + tpu_vote_receiver, + gossip_vote_receiver, + num_threads, + transaction_status_sender, + replay_vote_sender, + log_messages_bytes_limit, + connection_cache, + bank_forks, + prioritization_fee_cache, + ), } } @@ -405,6 +428,15 @@ impl BankingStage { TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize); // Keeps track of extraneous vote transactions for the vote threads let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + + let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); + let committer = Committer::new( + transaction_status_sender.clone(), + replay_vote_sender.clone(), + prioritization_fee_cache.clone(), + ); + let transaction_recorder = poh_recorder.read().unwrap().new_recorder(); + // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..num_threads) .map(|id| { @@ -432,16 +464,6 @@ impl BankingStage { ), }; - let mut packet_receiver = - PacketReceiver::new(id, packet_receiver, bank_forks.clone()); - let poh_recorder = poh_recorder.clone(); - - let committer = Committer::new( - transaction_status_sender.clone(), - replay_vote_sender.clone(), - prioritization_fee_cache.clone(), - ); - let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); let forwarder = Forwarder::new( poh_recorder.clone(), bank_forks.clone(), @@ -449,31 +471,175 @@ impl BankingStage { connection_cache.clone(), data_budget.clone(), ); - let consumer = Consumer::new( - committer, - poh_recorder.read().unwrap().new_recorder(), - QosService::new(id), - log_messages_bytes_limit, - ); - Builder::new() - .name(format!("solBanknStgTx{id:02}")) - .spawn(move || { - Self::process_loop( - &mut packet_receiver, - &decision_maker, - &forwarder, - &consumer, - id, - unprocessed_transaction_storage, - ); - }) - .unwrap() + Self::spawn_thread_local_multi_iterator_thread( + id, + packet_receiver, + bank_forks.clone(), + decision_maker.clone(), + committer.clone(), + transaction_recorder.clone(), + log_messages_bytes_limit, + forwarder, + unprocessed_transaction_storage, + ) }) .collect(); Self { bank_thread_hdls } } + #[allow(clippy::too_many_arguments)] + pub fn new_central_scheduler( + cluster_info: &Arc, + poh_recorder: &Arc>, + non_vote_receiver: BankingPacketReceiver, + tpu_vote_receiver: BankingPacketReceiver, + gossip_vote_receiver: BankingPacketReceiver, + num_threads: u32, + transaction_status_sender: Option, + replay_vote_sender: ReplayVoteSender, + log_messages_bytes_limit: Option, + connection_cache: Arc, + bank_forks: Arc>, + prioritization_fee_cache: &Arc, + ) -> Self { + assert!(num_threads >= MIN_TOTAL_THREADS); + // Single thread to generate entries from many banks. + // This thread talks to poh_service and broadcasts the entries once they have been recorded. + // Once an entry has been recorded, its blockhash is registered with the bank. + let data_budget = Arc::new(DataBudget::default()); + // Keeps track of extraneous vote transactions for the vote threads + let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + + let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); + let committer = Committer::new( + transaction_status_sender.clone(), + replay_vote_sender.clone(), + prioritization_fee_cache.clone(), + ); + let transaction_recorder = poh_recorder.read().unwrap().new_recorder(); + + // + 1 for the central scheduler thread + let mut bank_thread_hdls = Vec::with_capacity(num_threads as usize + 1); + + // Spawn legacy voting threads first: 1 gossip, 1 tpu + for (id, packet_receiver, vote_source) in [ + (0, gossip_vote_receiver, VoteSource::Gossip), + (1, tpu_vote_receiver, VoteSource::Tpu), + ] { + bank_thread_hdls.push(Self::spawn_thread_local_multi_iterator_thread( + id, + packet_receiver, + bank_forks.clone(), + decision_maker.clone(), + committer.clone(), + transaction_recorder.clone(), + log_messages_bytes_limit, + Forwarder::new( + poh_recorder.clone(), + bank_forks.clone(), + cluster_info.clone(), + connection_cache.clone(), + data_budget.clone(), + ), + UnprocessedTransactionStorage::new_vote_storage( + latest_unprocessed_votes.clone(), + vote_source, + ), + )); + } + + // Create channels for communication between scheduler and workers + let num_workers = (num_threads).saturating_sub(NUM_VOTE_PROCESSING_THREADS); + let (work_senders, work_receivers): (Vec>, Vec>) = + (0..num_workers).map(|_| unbounded()).unzip(); + let (finished_work_sender, finished_work_receiver) = unbounded(); + + // Spawn the worker threads + for (index, work_receiver) in work_receivers.into_iter().enumerate() { + let id = (index as u32).saturating_add(NUM_VOTE_PROCESSING_THREADS); + let consume_worker = ConsumeWorker::new( + work_receiver, + Consumer::new( + committer.clone(), + poh_recorder.read().unwrap().new_recorder(), + QosService::new(id), + log_messages_bytes_limit, + ), + finished_work_sender.clone(), + poh_recorder.read().unwrap().new_leader_bank_notifier(), + ); + + bank_thread_hdls.push( + Builder::new() + .name(format!("solCoWorker{id:02}")) + .spawn(move || { + let _ = consume_worker.run(); + }) + .unwrap(), + ) + } + + // Spawn the central scheduler thread + bank_thread_hdls.push({ + let packet_deserializer = + PacketDeserializer::new(non_vote_receiver, bank_forks.clone()); + let scheduler = PrioGraphScheduler::new(work_senders, finished_work_receiver); + let scheduler_controller = SchedulerController::new( + decision_maker.clone(), + packet_deserializer, + bank_forks, + scheduler, + ); + Builder::new() + .name("solBnkTxSched".to_string()) + .spawn(move || match scheduler_controller.run() { + Ok(_) => {} + Err(SchedulerError::DisconnectedRecvChannel(_)) => {} + Err(SchedulerError::DisconnectedSendChannel(_)) => { + warn!("Unexpected worker disconnect from scheduler") + } + }) + .unwrap() + }); + + Self { bank_thread_hdls } + } + + fn spawn_thread_local_multi_iterator_thread( + id: u32, + packet_receiver: BankingPacketReceiver, + bank_forks: Arc>, + decision_maker: DecisionMaker, + committer: Committer, + transaction_recorder: TransactionRecorder, + log_messages_bytes_limit: Option, + forwarder: Forwarder, + unprocessed_transaction_storage: UnprocessedTransactionStorage, + ) -> JoinHandle<()> { + let mut packet_receiver = PacketReceiver::new(id, packet_receiver, bank_forks); + let consumer = Consumer::new( + committer, + transaction_recorder, + QosService::new(id), + log_messages_bytes_limit, + ); + + Builder::new() + .name(format!("solBanknStgTx{id:02}")) + .spawn(move || { + Self::process_loop( + &mut packet_receiver, + &decision_maker, + &forwarder, + &consumer, + id, + unprocessed_transaction_storage, + ) + }) + .unwrap() + } + #[allow(clippy::too_many_arguments)] fn process_buffered_packets( decision_maker: &DecisionMaker, @@ -793,8 +959,7 @@ mod tests { with_vers.into_iter().map(|(b, _)| b).collect() } - #[test] - fn test_banking_stage_entries_only() { + fn test_banking_stage_entries_only(block_production_method: BlockProductionMethod) { solana_logger::setup(); let GenesisConfigInfo { genesis_config, @@ -829,7 +994,7 @@ mod tests { let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let banking_stage = BankingStage::new( - BlockProductionMethod::ThreadLocalMultiIterator, + block_production_method, &cluster_info, &poh_recorder, non_vote_receiver, @@ -922,6 +1087,16 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } + #[test] + fn test_banking_stage_entries_only_thread_local_multi_iterator() { + test_banking_stage_entries_only(BlockProductionMethod::ThreadLocalMultiIterator); + } + + #[test] + fn test_banking_stage_entries_only_central_scheduler() { + test_banking_stage_entries_only(BlockProductionMethod::CentralScheduler); + } + #[test] fn test_banking_stage_entryfication() { solana_logger::setup(); diff --git a/core/src/banking_stage/committer.rs b/core/src/banking_stage/committer.rs index a5e42cbc75..78bdc88cb1 100644 --- a/core/src/banking_stage/committer.rs +++ b/core/src/banking_stage/committer.rs @@ -36,6 +36,7 @@ pub(super) struct PreBalanceInfo { pub mint_decimals: HashMap, } +#[derive(Clone)] pub struct Committer { transaction_status_sender: Option, replay_vote_sender: ReplayVoteSender, diff --git a/core/src/banking_stage/decision_maker.rs b/core/src/banking_stage/decision_maker.rs index a2d19937ad..9b49054183 100644 --- a/core/src/banking_stage/decision_maker.rs +++ b/core/src/banking_stage/decision_maker.rs @@ -28,6 +28,7 @@ impl BufferedPacketsDecision { } } +#[derive(Clone)] pub struct DecisionMaker { my_pubkey: Pubkey, poh_recorder: Arc>, diff --git a/core/src/banking_stage/transaction_scheduler/mod.rs b/core/src/banking_stage/transaction_scheduler/mod.rs index 0b65dce06a..65ece5fee6 100644 --- a/core/src/banking_stage/transaction_scheduler/mod.rs +++ b/core/src/banking_stage/transaction_scheduler/mod.rs @@ -1,19 +1,13 @@ +mod batch_id_generator; #[allow(dead_code)] +mod in_flight_tracker; +pub(crate) mod prio_graph_scheduler; +pub(crate) mod scheduler_controller; +pub(crate) mod scheduler_error; mod thread_aware_account_locks; - +mod transaction_id_generator; mod transaction_priority_id; #[allow(dead_code)] mod transaction_state; #[allow(dead_code)] mod transaction_state_container; - -mod batch_id_generator; -#[allow(dead_code)] -mod in_flight_tracker; -#[allow(dead_code)] -mod prio_graph_scheduler; -#[allow(dead_code)] -mod scheduler_controller; -mod scheduler_error; -#[allow(dead_code)] -mod transaction_id_generator; diff --git a/core/src/validator.rs b/core/src/validator.rs index d73bf58e86..700315f4a6 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -168,6 +168,7 @@ impl BlockVerificationMethod { pub enum BlockProductionMethod { #[default] ThreadLocalMultiIterator, + CentralScheduler, } impl BlockProductionMethod {