From c06053f5056b9f5a2bf2c6ff15b8bcf64306e7af Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 1 Feb 2023 09:18:40 -0800 Subject: [PATCH] BankingStage Refactor: Add state to DecisionMaker (#29806) --- core/src/banking_stage.rs | 16 ++++++++-------- core/src/banking_stage/decision_maker.rs | 21 +++++++++++++++------ 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 496ac991e2..ae3ae783b3 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -461,11 +461,15 @@ impl BankingStage { let data_budget = data_budget.clone(); let connection_cache = connection_cache.clone(); let bank_forks = bank_forks.clone(); + + let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); + Builder::new() .name(format!("solBanknStgTx{i:02}")) .spawn(move || { Self::process_loop( &mut packet_deserializer, + &decision_maker, &poh_recorder, &cluster_info, &mut recv_start, @@ -631,7 +635,7 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] fn process_buffered_packets( - my_pubkey: &Pubkey, + decision_maker: &DecisionMaker, socket: &UdpSocket, poh_recorder: &Arc>, cluster_info: &ClusterInfo, @@ -652,11 +656,7 @@ impl BankingStage { return; } let ((metrics_action, decision), make_decision_time) = - measure!(DecisionMaker::make_consume_or_forward_decision( - my_pubkey, - poh_recorder, - slot_metrics_tracker - )); + measure!(decision_maker.make_consume_or_forward_decision(slot_metrics_tracker)); slot_metrics_tracker.increment_make_decision_us(make_decision_time.as_us()); match decision { @@ -734,6 +734,7 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] fn process_loop( packet_deserializer: &mut PacketDeserializer, + decision_maker: &DecisionMaker, poh_recorder: &Arc>, cluster_info: &ClusterInfo, recv_start: &mut Instant, @@ -756,13 +757,12 @@ impl BankingStage { let mut last_metrics_update = Instant::now(); loop { - let my_pubkey = cluster_info.id(); if !unprocessed_transaction_storage.is_empty() || last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD { let (_, process_buffered_packets_time) = measure!( Self::process_buffered_packets( - &my_pubkey, + decision_maker, &socket, poh_recorder, cluster_info, diff --git a/core/src/banking_stage/decision_maker.rs b/core/src/banking_stage/decision_maker.rs index 6c7dd67247..819b7618ae 100644 --- a/core/src/banking_stage/decision_maker.rs +++ b/core/src/banking_stage/decision_maker.rs @@ -8,7 +8,7 @@ use { }, pubkey::Pubkey, }, - std::sync::RwLock, + std::sync::{Arc, RwLock}, }; #[derive(Debug, Clone)] @@ -19,16 +19,25 @@ pub enum BufferedPacketsDecision { Hold, } -pub struct DecisionMaker; +pub struct DecisionMaker { + my_pubkey: Pubkey, + poh_recorder: Arc>, +} impl DecisionMaker { + pub fn new(my_pubkey: Pubkey, poh_recorder: Arc>) -> Self { + Self { + my_pubkey, + poh_recorder, + } + } + pub(crate) fn make_consume_or_forward_decision( - my_pubkey: &Pubkey, - poh_recorder: &RwLock, + &self, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> (MetricsTrackerAction, BufferedPacketsDecision) { let (leader_at_slot_offset, bank_start, would_be_leader, would_be_leader_shortly) = { - let poh = poh_recorder.read().unwrap(); + let poh = self.poh_recorder.read().unwrap(); let bank_start = poh .bank_start() .filter(|bank_start| bank_start.should_working_bank_still_be_processing_txs()); @@ -45,7 +54,7 @@ impl DecisionMaker { ( slot_metrics_tracker.check_leader_slot_boundary(&bank_start), Self::consume_or_forward_packets( - my_pubkey, + &self.my_pubkey, leader_at_slot_offset, bank_start, would_be_leader,