From 4b17acf64ed423ea89792f3e46e46a29e734cdff Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 9 Feb 2023 13:22:42 -0800 Subject: [PATCH] BankingStage Refactor: Add state to Committer (#30107) --- core/benches/banking_stage.rs | 6 +- core/src/banking_stage.rs | 137 +++++++++++++--------------- core/src/banking_stage/committer.rs | 31 +++++-- 3 files changed, 89 insertions(+), 85 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 8f8b2d0a25..c918cae950 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -10,7 +10,7 @@ use { rayon::prelude::*, solana_client::connection_cache::ConnectionCache, solana_core::{ - banking_stage::{BankingStage, BankingStageStats}, + banking_stage::{committer::Committer, BankingStage, BankingStageStats}, banking_trace::{BankingPacketBatch, BankingTracer}, leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker, qos_service::QosService, @@ -91,16 +91,16 @@ fn bench_consume_buffered(bencher: &mut Bencher) { ThreadType::Transactions, ); let (s, _r) = unbounded(); + let committer = Committer::new(None, s); // This tests the performance of buffering packets. // If the packet buffers are copied, performance will be poor. bencher.iter(move || { BankingStage::consume_buffered_packets( &bank_start, &mut transaction_buffer, - &None, - &s, None::>, &BankingStageStats::default(), + &committer, &recorder, &QosService::new(1), &mut LeaderSlotMetricsTracker::new(0), diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 0c3ababc7e..4619b15d4a 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -443,9 +443,11 @@ impl BankingStage { let mut packet_receiver = PacketReceiver::new(id, packet_receiver); let poh_recorder = poh_recorder.clone(); - let transaction_status_sender = transaction_status_sender.clone(); - let replay_vote_sender = replay_vote_sender.clone(); + let committer = Committer::new( + transaction_status_sender.clone(), + replay_vote_sender.clone(), + ); let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); let forwarder = Forwarder::new( poh_recorder.clone(), @@ -462,10 +464,9 @@ impl BankingStage { &mut packet_receiver, &decision_maker, &forwarder, + &committer, &poh_recorder, id, - transaction_status_sender, - replay_vote_sender, log_messages_bytes_limit, unprocessed_transaction_storage, ); @@ -480,9 +481,8 @@ impl BankingStage { fn do_process_packets( bank_start: &BankStart, payload: &mut ConsumeScannerPayload, + committer: &Committer, recorder: &TransactionRecorder, - transaction_status_sender: &Option, - replay_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, qos_service: &QosService, log_messages_bytes_limit: Option, @@ -500,10 +500,9 @@ impl BankingStage { Self::process_packets_transactions( &bank_start.working_bank, &bank_start.bank_creation_time, + committer, recorder, &payload.sanitized_transactions, - transaction_status_sender, - replay_vote_sender, banking_stage_stats, qos_service, payload.slot_metrics_tracker, @@ -553,14 +552,12 @@ impl BankingStage { Some(retryable_transaction_indexes) } - #[allow(clippy::too_many_arguments)] pub fn consume_buffered_packets( bank_start: &BankStart, unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, - transaction_status_sender: &Option, - replay_vote_sender: &ReplayVoteSender, test_fn: Option, banking_stage_stats: &BankingStageStats, + committer: &Committer, recorder: &TransactionRecorder, qos_service: &QosService, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, @@ -579,9 +576,8 @@ impl BankingStage { Self::do_process_packets( bank_start, payload, + committer, recorder, - transaction_status_sender, - replay_vote_sender, banking_stage_stats, qos_service, log_messages_bytes_limit, @@ -624,9 +620,8 @@ impl BankingStage { fn process_buffered_packets( decision_maker: &DecisionMaker, forwarder: &Forwarder, + committer: &Committer, unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, - transaction_status_sender: &Option, - replay_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, qos_service: &QosService, @@ -652,10 +647,9 @@ impl BankingStage { Self::consume_buffered_packets( &bank_start, unprocessed_transaction_storage, - transaction_status_sender, - replay_vote_sender, None::>, banking_stage_stats, + committer, recorder, qos_service, slot_metrics_tracker, @@ -699,10 +693,9 @@ impl BankingStage { packet_receiver: &mut PacketReceiver, decision_maker: &DecisionMaker, forwarder: &Forwarder, + committer: &Committer, poh_recorder: &Arc>, id: u32, - transaction_status_sender: Option, - replay_vote_sender: ReplayVoteSender, log_messages_bytes_limit: Option, mut unprocessed_transaction_storage: UnprocessedTransactionStorage, ) { @@ -722,9 +715,8 @@ impl BankingStage { Self::process_buffered_packets( decision_maker, forwarder, + committer, &mut unprocessed_transaction_storage, - &transaction_status_sender, - &replay_vote_sender, &banking_stage_stats, &recorder, &qos_service, @@ -765,12 +757,12 @@ impl BankingStage { fn execute_and_commit_transactions_locked( bank: &Arc, + committer: &Committer, poh: &TransactionRecorder, batch: &TransactionBatch, - transaction_status_sender: &Option, - replay_vote_sender: &ReplayVoteSender, log_messages_bytes_limit: Option, ) -> ExecuteAndCommitTransactionsOutput { + let transaction_status_sender_enabled = committer.transaction_status_sender_enabled(); let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default(); let mut pre_balance_info = PreBalanceInfo::default(); @@ -778,7 +770,7 @@ impl BankingStage { { // If the extra meta-data services are enabled for RPC, collect the // pre-balances for native and token programs. - if transaction_status_sender.is_some() { + if transaction_status_sender_enabled { pre_balance_info.native = bank.collect_balances(batch); pre_balance_info.token = collect_token_balances(bank, batch, &mut pre_balance_info.mint_decimals) @@ -792,9 +784,9 @@ impl BankingStage { bank.load_and_execute_transactions( batch, MAX_PROCESSING_AGE, - transaction_status_sender.is_some(), - transaction_status_sender.is_some(), - transaction_status_sender.is_some(), + transaction_status_sender_enabled, + transaction_status_sender_enabled, + transaction_status_sender_enabled, &mut execute_and_commit_timings.execute_timings, None, // account_overrides log_messages_bytes_limit @@ -880,7 +872,7 @@ impl BankingStage { } let (commit_time_us, commit_transaction_statuses) = if executed_transactions_count != 0 { - Committer::commit_transactions( + committer.commit_transactions( batch, &mut loaded_transactions, execution_results, @@ -888,8 +880,6 @@ impl BankingStage { bank, &mut pre_balance_info, &mut execute_and_commit_timings, - transaction_status_sender, - replay_vote_sender, signature_count, executed_transactions_count, executed_non_vote_transactions_count, @@ -937,10 +927,9 @@ impl BankingStage { pub fn process_and_record_transactions( bank: &Arc, txs: &[SanitizedTransaction], + committer: &Committer, poh: &TransactionRecorder, chunk_offset: usize, - transaction_status_sender: &Option, - replay_vote_sender: &ReplayVoteSender, qos_service: &QosService, log_messages_bytes_limit: Option, ) -> ProcessTransactionBatchOutput { @@ -962,10 +951,9 @@ impl BankingStage { let mut execute_and_commit_transactions_output = Self::execute_and_commit_transactions_locked( bank, + committer, poh, &batch, - transaction_status_sender, - replay_vote_sender, log_messages_bytes_limit, ); @@ -1036,9 +1024,8 @@ impl BankingStage { bank: &Arc, bank_creation_time: &Instant, transactions: &[SanitizedTransaction], + committer: &Committer, poh: &TransactionRecorder, - transaction_status_sender: &Option, - replay_vote_sender: &ReplayVoteSender, qos_service: &QosService, log_messages_bytes_limit: Option, ) -> ProcessTransactionsSummary { @@ -1067,10 +1054,9 @@ impl BankingStage { let process_transaction_batch_output = Self::process_and_record_transactions( bank, &transactions[chunk_start..chunk_end], + committer, poh, chunk_start, - transaction_status_sender, - replay_vote_sender, qos_service, log_messages_bytes_limit, ); @@ -1206,14 +1192,12 @@ impl BankingStage { Self::filter_valid_transaction_indexes(&results) } - #[allow(clippy::too_many_arguments)] fn process_packets_transactions<'a>( bank: &'a Arc, bank_creation_time: &Instant, + committer: &'a Committer, poh: &'a TransactionRecorder, sanitized_transactions: &[SanitizedTransaction], - transaction_status_sender: &Option, - replay_vote_sender: &'a ReplayVoteSender, banking_stage_stats: &'a BankingStageStats, qos_service: &'a QosService, slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker, @@ -1225,9 +1209,8 @@ impl BankingStage { bank, bank_creation_time, sanitized_transactions, + committer, poh, - transaction_status_sender, - replay_vote_sender, qos_service, log_messages_bytes_limit, ), @@ -1910,14 +1893,14 @@ mod tests { poh_recorder.write().unwrap().set_bank(&bank, false); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let committer = Committer::new(None, replay_vote_sender); let process_transactions_batch_output = BankingStage::process_and_record_transactions( &bank, &transactions, + &committer, &recorder, 0, - &None, - &replay_vote_sender, &QosService::new(1), None, ); @@ -1967,10 +1950,9 @@ mod tests { let process_transactions_batch_output = BankingStage::process_and_record_transactions( &bank, &transactions, + &committer, &recorder, 0, - &None, - &replay_vote_sender, &QosService::new(1), None, ); @@ -2047,14 +2029,14 @@ mod tests { poh_recorder.write().unwrap().set_bank(&bank, false); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let committer = Committer::new(None, replay_vote_sender); let process_transactions_batch_output = BankingStage::process_and_record_transactions( &bank, &transactions, + &committer, &recorder, 0, - &None, - &replay_vote_sender, &QosService::new(1), None, ); @@ -2121,7 +2103,7 @@ mod tests { poh_recorder.write().unwrap().set_bank(&bank, false); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - + let committer = Committer::new(None, replay_vote_sender); let qos_service = QosService::new(1); let get_block_cost = || bank.read_cost_tracker().unwrap().block_cost(); @@ -2143,10 +2125,9 @@ mod tests { let process_transactions_batch_output = BankingStage::process_and_record_transactions( &bank, &transactions, + &committer, &recorder, 0, - &None, - &replay_vote_sender, &qos_service, None, ); @@ -2183,10 +2164,9 @@ mod tests { let process_transactions_batch_output = BankingStage::process_and_record_transactions( &bank, &transactions, + &committer, &recorder, 0, - &None, - &replay_vote_sender, &qos_service, None, ); @@ -2276,14 +2256,14 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let committer = Committer::new(None, replay_vote_sender); let process_transactions_batch_output = BankingStage::process_and_record_transactions( &bank, &transactions, + &committer, &recorder, 0, - &None, - &replay_vote_sender, &QosService::new(1), None, ); @@ -2354,14 +2334,14 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &Arc::new(RwLock::new(poh_recorder))); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let committer = Committer::new(None, replay_vote_sender); let process_transactions_summary = BankingStage::process_transactions( &bank, &Instant::now(), &transactions, + &committer, &recorder, - &None, - &replay_vote_sender, &QosService::new(1), None, ); @@ -2421,14 +2401,14 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let committer = Committer::new(None, replay_vote_sender); let process_transactions_summary = BankingStage::process_transactions( &bank, &Instant::now(), &transactions, + &committer, &recorder, - &None, - &replay_vote_sender, &QosService::new(1), None, ); @@ -2649,20 +2629,24 @@ mod tests { ); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let committer = Committer::new( + Some(TransactionStatusSender { + sender: transaction_status_sender, + }), + replay_vote_sender, + ); let _ = BankingStage::process_and_record_transactions( &bank, &transactions, + &committer, &recorder, 0, - &Some(TransactionStatusSender { - sender: transaction_status_sender, - }), - &replay_vote_sender, &QosService::new(1), None, ); + drop(committer); // drop/disconnect transaction_status_sender transaction_status_service.join().unwrap(); let confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap(); @@ -2818,20 +2802,24 @@ mod tests { ); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let committer = Committer::new( + Some(TransactionStatusSender { + sender: transaction_status_sender, + }), + replay_vote_sender, + ); let _ = BankingStage::process_and_record_transactions( &bank, &[sanitized_tx.clone()], + &committer, &recorder, 0, - &Some(TransactionStatusSender { - sender: transaction_status_sender, - }), - &replay_vote_sender, &QosService::new(1), None, ); + drop(committer); // drop/disconnect transaction_status_sender transaction_status_service.join().unwrap(); let mut confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap(); @@ -2939,6 +2927,7 @@ mod tests { ); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let committer = Committer::new(None, replay_vote_sender); // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) assert!(!poh_recorder.read().unwrap().has_bank()); @@ -2950,10 +2939,9 @@ mod tests { BankingStage::consume_buffered_packets( &bank_start, &mut buffered_packet_batches, - &None, - &replay_vote_sender, None::>, &BankingStageStats::default(), + &committer, &recorder, &QosService::new(1), &mut LeaderSlotMetricsTracker::new(0), @@ -2997,6 +2985,7 @@ mod tests { ); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let committer = Committer::new(None, replay_vote_sender); // When the working bank in poh_recorder is None, no packets should be processed assert!(!poh_recorder.read().unwrap().has_bank()); @@ -3008,10 +2997,9 @@ mod tests { BankingStage::consume_buffered_packets( &bank_start, &mut buffered_packet_batches, - &None, - &replay_vote_sender, None::>, &BankingStageStats::default(), + &committer, &recorder, &QosService::new(1), &mut LeaderSlotMetricsTracker::new(0), @@ -3048,6 +3036,8 @@ mod tests { let recorder = poh_recorder_.read().unwrap().recorder(); let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let committer = Committer::new(None, replay_vote_sender); + // Start up thread to process the banks let t_consume = Builder::new() .name("consume-buffered-packets".to_string()) @@ -3070,10 +3060,9 @@ mod tests { BankingStage::consume_buffered_packets( &bank_start, &mut buffered_packet_batches, - &None, - &replay_vote_sender, test_fn, &BankingStageStats::default(), + &committer, &recorder, &QosService::new(1), &mut LeaderSlotMetricsTracker::new(0), diff --git a/core/src/banking_stage/committer.rs b/core/src/banking_stage/committer.rs index 7cbb702d23..4a71ffe509 100644 --- a/core/src/banking_stage/committer.rs +++ b/core/src/banking_stage/committer.rs @@ -26,11 +26,29 @@ pub enum CommitTransactionDetails { NotCommitted, } -pub struct Committer; +pub struct Committer { + transaction_status_sender: Option, + replay_vote_sender: ReplayVoteSender, +} impl Committer { + pub fn new( + transaction_status_sender: Option, + replay_vote_sender: ReplayVoteSender, + ) -> Self { + Self { + transaction_status_sender, + replay_vote_sender, + } + } + + pub(super) fn transaction_status_sender_enabled(&self) -> bool { + self.transaction_status_sender.is_some() + } + #[allow(clippy::too_many_arguments)] pub(super) fn commit_transactions( + &self, batch: &TransactionBatch, loaded_transactions: &mut [TransactionLoadResult], execution_results: Vec, @@ -38,8 +56,6 @@ impl Committer { bank: &Arc, pre_balance_info: &mut PreBalanceInfo, execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings, - transaction_status_sender: &Option, - replay_vote_sender: &ReplayVoteSender, signature_count: u64, executed_transactions_count: usize, executed_non_vote_transactions_count: usize, @@ -86,10 +102,9 @@ impl Committer { bank_utils::find_and_send_votes( batch.sanitized_transactions(), &tx_results, - Some(replay_vote_sender), + Some(&self.replay_vote_sender), ); - Self::collect_balances_and_send_status_batch( - transaction_status_sender, + self.collect_balances_and_send_status_batch( tx_results, bank, batch, @@ -102,14 +117,14 @@ impl Committer { } fn collect_balances_and_send_status_batch( - transaction_status_sender: &Option, + &self, tx_results: TransactionResults, bank: &Arc, batch: &TransactionBatch, pre_balance_info: &mut PreBalanceInfo, starting_transaction_index: Option, ) { - if let Some(transaction_status_sender) = transaction_status_sender { + if let Some(transaction_status_sender) = &self.transaction_status_sender { let txs = batch.sanitized_transactions().to_vec(); let post_balances = bank.collect_balances(batch); let post_token_balances =