From e860019687734ce1e01702514217f5839c73a31d Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Mon, 18 Sep 2023 10:05:27 -0700 Subject: [PATCH] TransactionScheduler: Pipe BlockProductionMethod (#33217) --- banking-bench/src/main.rs | 2 +- core/benches/banking_stage.rs | 3 +++ core/src/banking_stage.rs | 49 +++++++++++++++++++++++++++++++++-- core/src/tpu.rs | 4 ++- core/src/validator.rs | 1 + 5 files changed, 55 insertions(+), 4 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 5d402592ad..bb5149f47c 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -450,7 +450,7 @@ fn main() { DEFAULT_TPU_CONNECTION_POOL_SIZE, ), }; - let banking_stage = BankingStage::new_num_threads( + let banking_stage = BankingStage::new_thread_local_multi_iterator( &cluster_info, &poh_recorder, non_vote_receiver, diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 0b0e6876c3..6219f4abb9 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -1,6 +1,8 @@ #![allow(clippy::arithmetic_side_effects)] #![feature(test)] +use solana_core::validator::BlockProductionMethod; + extern crate test; use { @@ -291,6 +293,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { let cluster_info = Arc::new(cluster_info); let (s, _r) = unbounded(); let _banking_stage = BankingStage::new( + BlockProductionMethod::ThreadLocalMultiIterator, &cluster_info, &poh_recorder, non_vote_receiver, diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 398dad86d0..7e9138048c 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -15,7 +15,10 @@ use { unprocessed_packet_batches::*, unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage}, }, - crate::{banking_trace::BankingPacketReceiver, tracer_packet_stats::TracerPacketStats}, + crate::{ + banking_trace::BankingPacketReceiver, tracer_packet_stats::TracerPacketStats, + validator::BlockProductionMethod, + }, crossbeam_channel::RecvTimeoutError, histogram::Histogram, solana_client::connection_cache::ConnectionCache, @@ -307,6 +310,7 @@ impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. #[allow(clippy::too_many_arguments)] pub fn new( + block_production_method: BlockProductionMethod, cluster_info: &Arc, poh_recorder: &Arc>, non_vote_receiver: BankingPacketReceiver, @@ -320,6 +324,7 @@ impl BankingStage { prioritization_fee_cache: &Arc, ) -> Self { Self::new_num_threads( + block_production_method, cluster_info, poh_recorder, non_vote_receiver, @@ -337,6 +342,42 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] pub fn new_num_threads( + block_production_method: BlockProductionMethod, + cluster_info: &Arc, + poh_recorder: &Arc>, + non_vote_receiver: BankingPacketReceiver, + tpu_vote_receiver: BankingPacketReceiver, + gossip_vote_receiver: BankingPacketReceiver, + num_threads: u32, + transaction_status_sender: Option, + replay_vote_sender: ReplayVoteSender, + log_messages_bytes_limit: Option, + connection_cache: Arc, + bank_forks: Arc>, + prioritization_fee_cache: &Arc, + ) -> Self { + match block_production_method { + BlockProductionMethod::ThreadLocalMultiIterator => { + Self::new_thread_local_multi_iterator( + cluster_info, + poh_recorder, + non_vote_receiver, + tpu_vote_receiver, + gossip_vote_receiver, + num_threads, + transaction_status_sender, + replay_vote_sender, + log_messages_bytes_limit, + connection_cache, + bank_forks, + prioritization_fee_cache, + ) + } + } + } + + #[allow(clippy::too_many_arguments)] + pub fn new_thread_local_multi_iterator( cluster_info: &Arc, poh_recorder: &Arc>, non_vote_receiver: BankingPacketReceiver, @@ -644,6 +685,7 @@ mod tests { let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let banking_stage = BankingStage::new( + BlockProductionMethod::ThreadLocalMultiIterator, &cluster_info, &poh_recorder, non_vote_receiver, @@ -700,6 +742,7 @@ mod tests { let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let banking_stage = BankingStage::new( + BlockProductionMethod::ThreadLocalMultiIterator, &cluster_info, &poh_recorder, non_vote_receiver, @@ -781,6 +824,7 @@ mod tests { let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let banking_stage = BankingStage::new( + BlockProductionMethod::ThreadLocalMultiIterator, &cluster_info, &poh_recorder, non_vote_receiver, @@ -941,7 +985,7 @@ mod tests { create_test_recorder(bank.clone(), blockstore, Some(poh_config), None); let (_, cluster_info) = new_test_cluster_info(/*keypair:*/ None); let cluster_info = Arc::new(cluster_info); - let _banking_stage = BankingStage::new_num_threads( + let _banking_stage = BankingStage::new_thread_local_multi_iterator( &cluster_info, &poh_recorder, non_vote_receiver, @@ -1133,6 +1177,7 @@ mod tests { let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let banking_stage = BankingStage::new( + BlockProductionMethod::ThreadLocalMultiIterator, &cluster_info, &poh_recorder, non_vote_receiver, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 5b6b939f87..884153d3d6 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -15,7 +15,7 @@ use { sigverify_stage::SigVerifyStage, staked_nodes_updater_service::StakedNodesUpdaterService, tpu_entry_notifier::TpuEntryNotifier, - validator::GeneratorConfig, + validator::{BlockProductionMethod, GeneratorConfig}, }, bytes::Bytes, crossbeam_channel::{unbounded, Receiver}, @@ -112,6 +112,7 @@ impl Tpu { tracer_thread_hdl: TracerThread, tpu_enable_udp: bool, prioritization_fee_cache: &Arc, + block_production_method: BlockProductionMethod, _generator_config: Option, /* vestigial code for replay invalidator */ ) -> Self { let TpuSockets { @@ -221,6 +222,7 @@ impl Tpu { ); let banking_stage = BankingStage::new( + block_production_method, cluster_info, poh_recorder, non_vote_receiver, diff --git a/core/src/validator.rs b/core/src/validator.rs index cb40bd0ff9..a0c39da764 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1296,6 +1296,7 @@ impl Validator { tracer_thread, tpu_enable_udp, &prioritization_fee_cache, + config.block_production_method.clone(), config.generator_config.clone(), );