BankingStage Refactor: Add state to DecisionMaker (#29806)
This commit is contained in:
parent
d944c657a2
commit
c06053f505
|
@ -461,11 +461,15 @@ impl BankingStage {
|
||||||
let data_budget = data_budget.clone();
|
let data_budget = data_budget.clone();
|
||||||
let connection_cache = connection_cache.clone();
|
let connection_cache = connection_cache.clone();
|
||||||
let bank_forks = bank_forks.clone();
|
let bank_forks = bank_forks.clone();
|
||||||
|
|
||||||
|
let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
|
||||||
|
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name(format!("solBanknStgTx{i:02}"))
|
.name(format!("solBanknStgTx{i:02}"))
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
Self::process_loop(
|
Self::process_loop(
|
||||||
&mut packet_deserializer,
|
&mut packet_deserializer,
|
||||||
|
&decision_maker,
|
||||||
&poh_recorder,
|
&poh_recorder,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&mut recv_start,
|
&mut recv_start,
|
||||||
|
@ -631,7 +635,7 @@ impl BankingStage {
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
fn process_buffered_packets(
|
fn process_buffered_packets(
|
||||||
my_pubkey: &Pubkey,
|
decision_maker: &DecisionMaker,
|
||||||
socket: &UdpSocket,
|
socket: &UdpSocket,
|
||||||
poh_recorder: &Arc<RwLock<PohRecorder>>,
|
poh_recorder: &Arc<RwLock<PohRecorder>>,
|
||||||
cluster_info: &ClusterInfo,
|
cluster_info: &ClusterInfo,
|
||||||
|
@ -652,11 +656,7 @@ impl BankingStage {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let ((metrics_action, decision), make_decision_time) =
|
let ((metrics_action, decision), make_decision_time) =
|
||||||
measure!(DecisionMaker::make_consume_or_forward_decision(
|
measure!(decision_maker.make_consume_or_forward_decision(slot_metrics_tracker));
|
||||||
my_pubkey,
|
|
||||||
poh_recorder,
|
|
||||||
slot_metrics_tracker
|
|
||||||
));
|
|
||||||
slot_metrics_tracker.increment_make_decision_us(make_decision_time.as_us());
|
slot_metrics_tracker.increment_make_decision_us(make_decision_time.as_us());
|
||||||
|
|
||||||
match decision {
|
match decision {
|
||||||
|
@ -734,6 +734,7 @@ impl BankingStage {
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
fn process_loop(
|
fn process_loop(
|
||||||
packet_deserializer: &mut PacketDeserializer,
|
packet_deserializer: &mut PacketDeserializer,
|
||||||
|
decision_maker: &DecisionMaker,
|
||||||
poh_recorder: &Arc<RwLock<PohRecorder>>,
|
poh_recorder: &Arc<RwLock<PohRecorder>>,
|
||||||
cluster_info: &ClusterInfo,
|
cluster_info: &ClusterInfo,
|
||||||
recv_start: &mut Instant,
|
recv_start: &mut Instant,
|
||||||
|
@ -756,13 +757,12 @@ impl BankingStage {
|
||||||
let mut last_metrics_update = Instant::now();
|
let mut last_metrics_update = Instant::now();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let my_pubkey = cluster_info.id();
|
|
||||||
if !unprocessed_transaction_storage.is_empty()
|
if !unprocessed_transaction_storage.is_empty()
|
||||||
|| last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD
|
|| last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD
|
||||||
{
|
{
|
||||||
let (_, process_buffered_packets_time) = measure!(
|
let (_, process_buffered_packets_time) = measure!(
|
||||||
Self::process_buffered_packets(
|
Self::process_buffered_packets(
|
||||||
&my_pubkey,
|
decision_maker,
|
||||||
&socket,
|
&socket,
|
||||||
poh_recorder,
|
poh_recorder,
|
||||||
cluster_info,
|
cluster_info,
|
||||||
|
|
|
@ -8,7 +8,7 @@ use {
|
||||||
},
|
},
|
||||||
pubkey::Pubkey,
|
pubkey::Pubkey,
|
||||||
},
|
},
|
||||||
std::sync::RwLock,
|
std::sync::{Arc, RwLock},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
@ -19,16 +19,25 @@ pub enum BufferedPacketsDecision {
|
||||||
Hold,
|
Hold,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct DecisionMaker;
|
pub struct DecisionMaker {
|
||||||
|
my_pubkey: Pubkey,
|
||||||
|
poh_recorder: Arc<RwLock<PohRecorder>>,
|
||||||
|
}
|
||||||
|
|
||||||
impl DecisionMaker {
|
impl DecisionMaker {
|
||||||
|
pub fn new(my_pubkey: Pubkey, poh_recorder: Arc<RwLock<PohRecorder>>) -> Self {
|
||||||
|
Self {
|
||||||
|
my_pubkey,
|
||||||
|
poh_recorder,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn make_consume_or_forward_decision(
|
pub(crate) fn make_consume_or_forward_decision(
|
||||||
my_pubkey: &Pubkey,
|
&self,
|
||||||
poh_recorder: &RwLock<PohRecorder>,
|
|
||||||
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||||
) -> (MetricsTrackerAction, BufferedPacketsDecision) {
|
) -> (MetricsTrackerAction, BufferedPacketsDecision) {
|
||||||
let (leader_at_slot_offset, bank_start, would_be_leader, would_be_leader_shortly) = {
|
let (leader_at_slot_offset, bank_start, would_be_leader, would_be_leader_shortly) = {
|
||||||
let poh = poh_recorder.read().unwrap();
|
let poh = self.poh_recorder.read().unwrap();
|
||||||
let bank_start = poh
|
let bank_start = poh
|
||||||
.bank_start()
|
.bank_start()
|
||||||
.filter(|bank_start| bank_start.should_working_bank_still_be_processing_txs());
|
.filter(|bank_start| bank_start.should_working_bank_still_be_processing_txs());
|
||||||
|
@ -45,7 +54,7 @@ impl DecisionMaker {
|
||||||
(
|
(
|
||||||
slot_metrics_tracker.check_leader_slot_boundary(&bank_start),
|
slot_metrics_tracker.check_leader_slot_boundary(&bank_start),
|
||||||
Self::consume_or_forward_packets(
|
Self::consume_or_forward_packets(
|
||||||
my_pubkey,
|
&self.my_pubkey,
|
||||||
leader_at_slot_offset,
|
leader_at_slot_offset,
|
||||||
bank_start,
|
bank_start,
|
||||||
would_be_leader,
|
would_be_leader,
|
||||||
|
|
Loading…
Reference in New Issue