From bba0ed702f917e350fd29ffa865f8236767e7857 Mon Sep 17 00:00:00 2001
From: Andrew Fitzgerald
Date: Mon, 6 Mar 2023 09:13:28 -0800
Subject: [PATCH] BankingStage Refactor: Consumer State (#30288)

* BankingStage Refactor: Consumer add state

* remove trailing comma
---
 core/benches/banking_stage.rs      |   8 +-
 core/src/banking_stage.rs          |  36 ++--
 core/src/banking_stage/consumer.rs | 274 +++++++++--------------------
 3 files changed, 102 insertions(+), 216 deletions(-)

diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs
index e211679e45..668110d783 100644
--- a/core/benches/banking_stage.rs
+++ b/core/benches/banking_stage.rs
@@ -94,19 +94,15 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
     );
     let (s, _r) = unbounded();
     let committer = Committer::new(None, s);
+    let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
     // This tests the performance of buffering packets.
     // If the packet buffers are copied, performance will be poor.
     bencher.iter(move || {
-        Consumer::consume_buffered_packets(
+        consumer.consume_buffered_packets(
             &bank_start,
             &mut transaction_buffer,
-            None::<Box<dyn Fn()>>,
             &BankingStageStats::default(),
-            &committer,
-            &recorder,
-            &QosService::new(1),
             &mut LeaderSlotMetricsTracker::new(0),
-            None,
         );
     });

diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index 8cacd2c8af..1aee3cf9e3 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -26,7 +26,7 @@ use {
     solana_ledger::blockstore_processor::TransactionStatusSender,
     solana_measure::{measure, measure_us},
     solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
-    solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
+    solana_poh::poh_recorder::PohRecorder,
     solana_runtime::{bank_forks::BankForks, vote_sender_types::ReplayVoteSender},
     solana_sdk::{feature_set::allow_votes_to_directly_update_vote_state, timing::AtomicInterval},
     std::{
@@ -394,6 +394,13 @@ impl BankingStage {
                 connection_cache.clone(),
                 data_budget.clone(),
             );
+            let consumer = Consumer::new(
+                committer,
+                poh_recorder.read().unwrap().recorder(),
+                QosService::new(id),
+                log_messages_bytes_limit,
+                None,
+            );

             Builder::new()
                 .name(format!("solBanknStgTx{id:02}"))
                 .spawn(move || {
                     Self::process_loop(
                         &mut packet_receiver,
                         &decision_maker,
                         &forwarder,
-                        &committer,
-                        &poh_recorder,
+                        &consumer,
                         id,
-                        log_messages_bytes_limit,
                         unprocessed_transaction_storage,
                     );
                 })
@@ -419,13 +424,10 @@
     fn process_buffered_packets(
         decision_maker: &DecisionMaker,
         forwarder: &Forwarder,
-        committer: &Committer,
+        consumer: &Consumer,
         unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
         banking_stage_stats: &BankingStageStats,
-        recorder: &TransactionRecorder,
-        qos_service: &QosService,
         slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
-        log_messages_bytes_limit: Option<usize>,
         tracer_packet_stats: &mut TracerPacketStats,
     ) {
         if unprocessed_transaction_storage.should_not_process() {
@@ -443,16 +445,11 @@
         // of the previous slot
         slot_metrics_tracker.apply_action(metrics_action);
         let (_, consume_buffered_packets_time) = measure!(
-            consumer::consume_buffered_packets(
+            consumer.consume_buffered_packets(
                 &bank_start,
                 unprocessed_transaction_storage,
-                None::<Box<dyn Fn()>>,
                 banking_stage_stats,
-                committer,
-                recorder,
-                qos_service,
                 slot_metrics_tracker,
-                log_messages_bytes_limit
             ),
            "consume_buffered_packets",
        );
@@ -492,16 +489,12 @@ fn process_loop(
         packet_receiver: &mut PacketReceiver,
         decision_maker: &DecisionMaker,
         forwarder: &Forwarder,
-        committer: &Committer,
-        poh_recorder: &Arc<RwLock<PohRecorder>>,
+        consumer: &Consumer,
         id: u32,
-        log_messages_bytes_limit: Option<usize>,
         mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
     ) {
-        let recorder = poh_recorder.read().unwrap().recorder();
         let mut banking_stage_stats = BankingStageStats::new(id);
         let mut tracer_packet_stats = TracerPacketStats::new(id);
-        let qos_service = QosService::new(id);
         let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
         let mut last_metrics_update = Instant::now();
@@ -514,13 +507,10 @@
                     Self::process_buffered_packets(
                         decision_maker,
                         forwarder,
-                        committer,
+                        consumer,
                         &mut unprocessed_transaction_storage,
                         &banking_stage_stats,
-                        &recorder,
-                        &qos_service,
                         &mut slot_metrics_tracker,
-                        log_messages_bytes_limit,
                         &mut tracer_packet_stats,
                     ),
                     "process_buffered_packets",
diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs
index 5fc084800e..8dee3c0658 100644
--- a/core/src/banking_stage/consumer.rs
+++ b/core/src/banking_stage/consumer.rs
@@ -65,19 +65,37 @@ pub struct ExecuteAndCommitTransactionsOutput {
     error_counters: TransactionErrorMetrics,
 }

-pub struct Consumer;
+pub struct Consumer {
+    committer: Committer,
+    transaction_recorder: TransactionRecorder,
+    qos_service: QosService,
+    log_messages_bytes_limit: Option<usize>,
+    test_fn: Option<Box<dyn Fn() + Send>>,
+}

 impl Consumer {
+    pub fn new(
+        committer: Committer,
+        transaction_recorder: TransactionRecorder,
+        qos_service: QosService,
+        log_messages_bytes_limit: Option<usize>,
+        test_fn: Option<Box<dyn Fn() + Send>>,
+    ) -> Self {
+        Self {
+            committer,
+            transaction_recorder,
+            qos_service,
+            log_messages_bytes_limit,
+            test_fn,
+        }
+    }
+
     pub fn consume_buffered_packets(
+        &self,
         bank_start: &BankStart,
         unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
-        test_fn: Option<impl Fn()>,
         banking_stage_stats: &BankingStageStats,
-        committer: &Committer,
-        transaction_recorder: &TransactionRecorder,
-        qos_service: &QosService,
         slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
-        log_messages_bytes_limit: Option<usize>,
     ) {
         let mut rebuffered_packet_count = 0;
         let mut consumed_buffered_packets_count = 0;
@@ -89,17 +107,12 @@
             banking_stage_stats,
             slot_metrics_tracker,
             |packets_to_process, payload| {
-                Self::do_process_packets(
+                self.do_process_packets(
                     bank_start,
                     payload,
-                    committer,
-                    transaction_recorder,
                     banking_stage_stats,
-                    qos_service,
-                    log_messages_bytes_limit,
                     &mut consumed_buffered_packets_count,
                     &mut rebuffered_packet_count,
-                    &test_fn,
                     packets_to_process,
                 )
             },
@@ -132,18 +145,13 @@
             .fetch_add(consumed_buffered_packets_count, Ordering::Relaxed);
     }

-    #[allow(clippy::too_many_arguments)]
     fn do_process_packets(
+        &self,
         bank_start: &BankStart,
         payload: &mut ConsumeScannerPayload,
-        committer: &Committer,
-        transaction_recorder: &TransactionRecorder,
         banking_stage_stats: &BankingStageStats,
-        qos_service: &QosService,
-        log_messages_bytes_limit: Option<usize>,
         consumed_buffered_packets_count: &mut usize,
         rebuffered_packet_count: &mut usize,
-        test_fn: &Option<impl Fn()>,
         packets_to_process: &Vec<Arc<ImmutableDeserializedPacket>>,
     ) -> Option<Vec<usize>> {
         if payload.reached_end_of_slot {
@@ -151,17 +159,13 @@
         }

         let packets_to_process_len = packets_to_process.len();
-        let (process_transactions_summary, process_packets_transactions_us) =
-            measure_us!(Self::process_packets_transactions(
+        let (process_transactions_summary, process_packets_transactions_us) = measure_us!(self
+            .process_packets_transactions(
                 &bank_start.working_bank,
                 &bank_start.bank_creation_time,
-                committer,
-                transaction_recorder,
                 &payload.sanitized_transactions,
                 banking_stage_stats,
-                qos_service,
                 payload.slot_metrics_tracker,
-                log_messages_bytes_limit
             ));
         payload
             .slot_metrics_tracker
@@ -194,7 +198,7 @@
         // Out of the buffered packets just retried, collect any still unprocessed
         // transactions in this batch for forwarding
         *rebuffered_packet_count += retryable_transaction_indexes.len();
-        if let Some(test_fn) = test_fn {
+        if let Some(test_fn) = &self.test_fn {
             test_fn();
         }
@@ -206,27 +210,16 @@
     }

     fn process_packets_transactions(
+        &self,
         bank: &Arc<Bank>,
         bank_creation_time: &Instant,
-        committer: &Committer,
-        transaction_recorder: &TransactionRecorder,
         sanitized_transactions: &[SanitizedTransaction],
         banking_stage_stats: &BankingStageStats,
-        qos_service: &QosService,
         slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
-        log_messages_bytes_limit: Option<usize>,
     ) -> ProcessTransactionsSummary {
-        // Process transactions
-        let (mut process_transactions_summary, process_transactions_us) =
-            measure_us!(Self::process_transactions(
-                bank,
-                bank_creation_time,
-                sanitized_transactions,
-                committer,
-                transaction_recorder,
-                qos_service,
-                log_messages_bytes_limit,
-            ));
+        let (mut process_transactions_summary, process_transactions_us) = measure_us!(
+            self.process_transactions(bank, bank_creation_time, sanitized_transactions)
+        );
         slot_metrics_tracker.increment_process_transactions_us(process_transactions_us);
         banking_stage_stats
             .transaction_processing_elapsed
@@ -277,13 +270,10 @@
     /// Returns the number of transactions successfully processed by the bank, which may be less
     /// than the total number if max PoH height was reached and the bank halted
     fn process_transactions(
+        &self,
         bank: &Arc<Bank>,
         bank_creation_time: &Instant,
         transactions: &[SanitizedTransaction],
-        committer: &Committer,
-        transaction_recorder: &TransactionRecorder,
-        qos_service: &QosService,
-        log_messages_bytes_limit: Option<usize>,
     ) -> ProcessTransactionsSummary {
         let mut chunk_start = 0;
         let mut all_retryable_tx_indexes = vec![];
@@ -307,14 +297,10 @@
                 transactions.len(),
                 chunk_start + MAX_NUM_TRANSACTIONS_PER_BATCH,
             );
-            let process_transaction_batch_output = Self::process_and_record_transactions(
+            let process_transaction_batch_output = self.process_and_record_transactions(
                 bank,
                 &transactions[chunk_start..chunk_end],
-                committer,
-                transaction_recorder,
                 chunk_start,
-                qos_service,
-                log_messages_bytes_limit,
             );

             let ProcessTransactionBatchOutput {
@@ -409,18 +395,17 @@
     }

     pub fn process_and_record_transactions(
+        &self,
         bank: &Arc<Bank>,
         txs: &[SanitizedTransaction],
-        committer: &Committer,
-        transaction_recorder: &TransactionRecorder,
         chunk_offset: usize,
-        qos_service: &QosService,
-        log_messages_bytes_limit: Option<usize>,
     ) -> ProcessTransactionBatchOutput {
         let (
             (transaction_costs, transactions_qos_results, cost_model_throttled_transactions_count),
             cost_model_us,
-        ) = measure_us!(qos_service.select_and_accumulate_transaction_costs(bank, txs));
+        ) = measure_us!(self
+            .qos_service
+            .select_and_accumulate_transaction_costs(bank, txs));

         // Only lock accounts for those transactions are selected for the block;
         // Once accounts are locked, other threads cannot encode transactions that will modify the
@@ -433,13 +418,7 @@
         // WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit
         // and WouldExceedMaxAccountDataCostLimit
         let mut execute_and_commit_transactions_output =
-            Self::execute_and_commit_transactions_locked(
-                bank,
-                committer,
-                transaction_recorder,
-                &batch,
-                log_messages_bytes_limit,
-            );
+            self.execute_and_commit_transactions_locked(bank, &batch);

         // Once the accounts are new transactions can enter the pipeline to process them
         let (_, unlock_us) = measure_us!(drop(batch));
@@ -464,11 +443,11 @@
         let (cu, us) =
             Self::accumulate_execute_units_and_time(&execute_and_commit_timings.execute_timings);
-        qos_service.accumulate_actual_execute_cu(cu);
-        qos_service.accumulate_actual_execute_time(us);
+        self.qos_service.accumulate_actual_execute_cu(cu);
+        self.qos_service.accumulate_actual_execute_time(us);

         // reports qos service stats for this batch
-        qos_service.report_metrics(bank.clone());
+        self.qos_service.report_metrics(bank.clone());

         debug!(
             "bank: {} lock: {}us unlock: {}us txs_len: {}",
@@ -486,13 +465,11 @@
     }

     fn execute_and_commit_transactions_locked(
+        &self,
         bank: &Arc<Bank>,
-        committer: &Committer,
-        transaction_recorder: &TransactionRecorder,
         batch: &TransactionBatch,
-        log_messages_bytes_limit: Option<usize>,
     ) -> ExecuteAndCommitTransactionsOutput {
-        let transaction_status_sender_enabled = committer.transaction_status_sender_enabled();
+        let transaction_status_sender_enabled = self.committer.transaction_status_sender_enabled();
         let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();

         let mut pre_balance_info = PreBalanceInfo::default();
@@ -516,7 +493,7 @@
                 transaction_status_sender_enabled,
                 &mut execute_and_commit_timings.execute_timings,
                 None, // account_overrides
-                log_messages_bytes_limit
+                self.log_messages_bytes_limit
             ));
         execute_and_commit_timings.load_execute_us = load_execute_us;
@@ -556,9 +533,9 @@
                 executed_transactions_count
             );
         }
-        let (record_transactions_summary, record_us) = measure_us!(
-            transaction_recorder.record_transactions(bank.slot(), executed_transactions)
-        );
+        let (record_transactions_summary, record_us) = measure_us!(self
+            .transaction_recorder
+            .record_transactions(bank.slot(), executed_transactions));
         execute_and_commit_timings.record_us = record_us;

         let RecordTransactionsSummary {
@@ -594,7 +571,7 @@
         }

         let (commit_time_us, commit_transaction_statuses) = if executed_transactions_count != 0 {
-            committer.commit_transactions(
+            self.committer.commit_transactions(
                 batch,
                 &mut loaded_transactions,
                 execution_results,
@@ -781,16 +758,9 @@ mod tests {
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(None, replay_vote_sender);
-
-        let process_transactions_summary = Consumer::process_transactions(
-            &bank,
-            &Instant::now(),
-            &transactions,
-            &committer,
-            &recorder,
-            &QosService::new(1),
-            None,
-        );
+        let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
+        let process_transactions_summary =
+            consumer.process_transactions(&bank, &Instant::now(), &transactions);

         poh_recorder
             .read()
@@ -928,16 +898,10 @@ mod tests {
         poh_recorder.write().unwrap().set_bank(&bank, false);
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(None, replay_vote_sender);
+        let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);

-        let process_transactions_batch_output = Consumer::process_and_record_transactions(
-            &bank,
-            &transactions,
-            &committer,
-            &recorder,
-            0,
-            &QosService::new(1),
-            None,
-        );
+        let process_transactions_batch_output =
+            consumer.process_and_record_transactions(&bank, &transactions, 0);

         let ExecuteAndCommitTransactionsOutput {
             transactions_attempted_execution_count,
@@ -981,15 +945,8 @@ mod tests {
             genesis_config.hash(),
         )]);

-        let process_transactions_batch_output = Consumer::process_and_record_transactions(
-            &bank,
-            &transactions,
-            &committer,
-            &recorder,
-            0,
-            &QosService::new(1),
-            None,
-        );
+        let process_transactions_batch_output =
+            consumer.process_and_record_transactions(&bank, &transactions, 0);

         let ExecuteAndCommitTransactionsOutput {
             transactions_attempted_execution_count,
@@ -1064,16 +1021,10 @@ mod tests {
         poh_recorder.write().unwrap().set_bank(&bank, false);
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(None, replay_vote_sender);
+        let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);

-        let process_transactions_batch_output = Consumer::process_and_record_transactions(
-            &bank,
-            &transactions,
-            &committer,
-            &recorder,
-            0,
-            &QosService::new(1),
-            None,
-        );
+        let process_transactions_batch_output =
+            consumer.process_and_record_transactions(&bank, &transactions, 0);

         let ExecuteAndCommitTransactionsOutput {
             transactions_attempted_execution_count,
@@ -1138,7 +1089,7 @@ mod tests {
         poh_recorder.write().unwrap().set_bank(&bank, false);
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(None, replay_vote_sender);
-        let qos_service = QosService::new(1);
+        let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);

         let get_block_cost = || bank.read_cost_tracker().unwrap().block_cost();
         let get_tx_count = || bank.read_cost_tracker().unwrap().transaction_count();
@@ -1156,15 +1107,8 @@ mod tests {
             genesis_config.hash(),
         )]);

-        let process_transactions_batch_output = Consumer::process_and_record_transactions(
-            &bank,
-            &transactions,
-            &committer,
-            &recorder,
-            0,
-            &qos_service,
-            None,
-        );
+        let process_transactions_batch_output =
+            consumer.process_and_record_transactions(&bank, &transactions, 0);

         let ExecuteAndCommitTransactionsOutput {
             executed_with_successful_result_count,
@@ -1195,15 +1139,8 @@ mod tests {
             ),
         ]);

-        let process_transactions_batch_output = Consumer::process_and_record_transactions(
-            &bank,
-            &transactions,
-            &committer,
-            &recorder,
-            0,
-            &qos_service,
-            None,
-        );
+        let process_transactions_batch_output =
            consumer.process_and_record_transactions(&bank, &transactions, 0);

         let ExecuteAndCommitTransactionsOutput {
             executed_with_successful_result_count,
@@ -1270,16 +1207,10 @@ mod tests {
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(None, replay_vote_sender);
+        let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);

-        let process_transactions_batch_output = Consumer::process_and_record_transactions(
-            &bank,
-            &transactions,
-            &committer,
-            &recorder,
-            0,
-            &QosService::new(1),
-            None,
-        );
+        let process_transactions_batch_output =
+            consumer.process_and_record_transactions(&bank, &transactions, 0);

         poh_recorder
             .read()
@@ -1469,16 +1400,11 @@ mod tests {
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(None, replay_vote_sender);
+        let consumer =
+            Consumer::new(committer, recorder.clone(), QosService::new(1), None, None);

-        let process_transactions_summary = Consumer::process_transactions(
-            &bank,
-            &Instant::now(),
-            &transactions,
-            &committer,
-            &recorder,
-            &QosService::new(1),
-            None,
-        );
+        let process_transactions_summary =
+            consumer.process_transactions(&bank, &Instant::now(), &transactions);

         let ProcessTransactionsSummary {
             reached_max_poh_height,
@@ -1599,18 +1525,11 @@ mod tests {
             }),
             replay_vote_sender,
         );
+        let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);

-        let _ = Consumer::process_and_record_transactions(
-            &bank,
-            &transactions,
-            &committer,
-            &recorder,
-            0,
-            &QosService::new(1),
-            None,
-        );
+        let _ = consumer.process_and_record_transactions(&bank, &transactions, 0);

-        drop(committer); // drop/disconnect transaction_status_sender
+        drop(consumer); // drop/disconnect transaction_status_sender

         transaction_status_service.join().unwrap();

         let confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap();
@@ -1743,18 +1662,11 @@ mod tests {
             }),
             replay_vote_sender,
         );
+        let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);

-        let _ = Consumer::process_and_record_transactions(
-            &bank,
-            &[sanitized_tx.clone()],
-            &committer,
-            &recorder,
-            0,
-            &QosService::new(1),
-            None,
-        );
+        let _ = consumer.process_and_record_transactions(&bank, &[sanitized_tx.clone()], 0);

-        drop(committer); // drop/disconnect transaction_status_sender
+        drop(consumer); // drop/disconnect transaction_status_sender

         transaction_status_service.join().unwrap();

         let mut confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap();
@@ -1808,6 +1720,7 @@ mod tests {
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(None, replay_vote_sender);
+        let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);

         // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
         assert!(!poh_recorder.read().unwrap().has_bank());
@@ -1816,16 +1729,11 @@ mod tests {
         // Multi-Iterator will process them 1-by-1 if all txs are conflicting.
         poh_recorder.write().unwrap().set_bank(&bank, false);
         let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
-        Consumer::consume_buffered_packets(
+        consumer.consume_buffered_packets(
             &bank_start,
             &mut buffered_packet_batches,
-            None::<Box<dyn Fn()>>,
             &BankingStageStats::default(),
-            &committer,
-            &recorder,
-            &QosService::new(1),
             &mut LeaderSlotMetricsTracker::new(0),
-            None,
         );
         assert!(buffered_packet_batches.is_empty());
         poh_recorder
@@ -1866,6 +1774,7 @@ mod tests {
         let (replay_vote_sender, _replay_vote_receiver) = unbounded();
         let committer = Committer::new(None, replay_vote_sender);
+        let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);

         // When the working bank in poh_recorder is None, no packets should be processed
         assert!(!poh_recorder.read().unwrap().has_bank());
@@ -1874,16 +1783,11 @@ mod tests {
         // Multi-Iterator will process them 1-by-1 if all txs are conflicting.
         poh_recorder.write().unwrap().set_bank(&bank, false);
         let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
-        Consumer::consume_buffered_packets(
+        consumer.consume_buffered_packets(
             &bank_start,
             &mut buffered_packet_batches,
-            None::<Box<dyn Fn()>>,
             &BankingStageStats::default(),
-            &committer,
-            &recorder,
-            &QosService::new(1),
             &mut LeaderSlotMetricsTracker::new(0),
-            None,
         );
         assert!(buffered_packet_batches.is_empty());
         poh_recorder
@@ -1905,10 +1809,10 @@ mod tests {
         let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
             setup_conflicting_transactions(ledger_path.path());
-        let test_fn = Some(move || {
+        let test_fn: Option<Box<dyn Fn() + Send>> = Some(Box::new(move || {
             finished_packet_sender.send(()).unwrap();
             continue_receiver.recv().unwrap();
-        });
+        }));

         // When the poh recorder has a bank, it should process all buffered packets.
let num_conflicting_transactions = transactions.len(); poh_recorder.write().unwrap().set_bank(&bank, false); @@ -1917,6 +1821,7 @@ mod tests { let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let committer = Committer::new(None, replay_vote_sender); + let consumer = Consumer::new(committer, recorder, QosService::new(1), None, test_fn); // Start up thread to process the banks let t_consume = Builder::new() @@ -1937,16 +1842,11 @@ mod tests { ), ThreadType::Transactions, ); - Consumer::consume_buffered_packets( + consumer.consume_buffered_packets( &bank_start, &mut buffered_packet_batches, - test_fn, &BankingStageStats::default(), - &committer, - &recorder, - &QosService::new(1), &mut LeaderSlotMetricsTracker::new(0), - None, ); // Check everything is correct. All valid packets should be processed.