BankingStage Refactor: Move decision making functions to new module (#29788)

Move decision making functions to new module
This commit is contained in:
apfitzge 2023-01-20 10:10:47 -08:00 committed by GitHub
parent 5fc83a3d19
commit 8c793da7d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 203 additions and 176 deletions

View File

@ -3,14 +3,15 @@
//! can do its processing in parallel with signature verification on the GPU.
use {
self::packet_receiver::PacketReceiver,
self::{
decision_maker::{BufferedPacketsDecision, DecisionMaker},
packet_receiver::PacketReceiver,
},
crate::{
forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
immutable_deserialized_packet::ImmutableDeserializedPacket,
latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource},
leader_slot_banking_stage_metrics::{
LeaderSlotMetricsTracker, MetricsTrackerAction, ProcessTransactionsSummary,
},
leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary},
leader_slot_banking_stage_timing_metrics::{
LeaderExecuteAndCommitTimings, RecordTransactionsTimings,
},
@ -58,10 +59,7 @@ use {
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{
clock::{
Slot, DEFAULT_TICKS_PER_SLOT, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET,
HOLD_TRANSACTIONS_SLOT_OFFSET, MAX_PROCESSING_AGE,
},
clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE},
feature_set::allow_votes_to_directly_update_vote_state,
pubkey::Pubkey,
saturating_add_assign,
@ -87,6 +85,7 @@ use {
},
};
mod decision_maker;
mod packet_receiver;
// Fixed thread size seems to be fastest on GCP setup
@ -357,14 +356,6 @@ pub struct BankingStage {
bank_thread_hdls: Vec<JoinHandle<()>>,
}
#[derive(Debug, Clone)]
pub enum BufferedPacketsDecision {
Consume(BankStart),
Forward,
ForwardAndHold,
Hold,
}
#[derive(Debug, Clone)]
pub enum ForwardOption {
NotForward,
@ -756,71 +747,6 @@ impl BankingStage {
.fetch_add(consumed_buffered_packets_count, Ordering::Relaxed);
}
fn consume_or_forward_packets(
my_pubkey: &Pubkey,
leader_pubkey: Option<Pubkey>,
bank_start: Option<BankStart>,
would_be_leader: bool,
would_be_leader_shortly: bool,
) -> BufferedPacketsDecision {
// If has active bank, then immediately process buffered packets
// otherwise, based on leader schedule to either forward or hold packets
if let Some(bank_start) = bank_start {
// If the bank is available, this node is the leader
BufferedPacketsDecision::Consume(bank_start)
} else if would_be_leader_shortly {
// If the node will be the leader soon, hold the packets for now
BufferedPacketsDecision::Hold
} else if would_be_leader {
// Node will be leader within ~20 slots, hold the transactions in
// case it is the only node which produces an accepted slot.
BufferedPacketsDecision::ForwardAndHold
} else if let Some(x) = leader_pubkey {
if x != *my_pubkey {
// If the current node is not the leader, forward the buffered packets
BufferedPacketsDecision::Forward
} else {
// If the current node is the leader, return the buffered packets as is
BufferedPacketsDecision::Hold
}
} else {
// We don't know the leader. Hold the packets for now
BufferedPacketsDecision::Hold
}
}
fn make_consume_or_forward_decision(
my_pubkey: &Pubkey,
poh_recorder: &RwLock<PohRecorder>,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> (MetricsTrackerAction, BufferedPacketsDecision) {
let (leader_at_slot_offset, bank_start, would_be_leader, would_be_leader_shortly) = {
let poh = poh_recorder.read().unwrap();
let bank_start = poh
.bank_start()
.filter(|bank_start| bank_start.should_working_bank_still_be_processing_txs());
(
poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET),
bank_start,
poh.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT),
poh.would_be_leader(
(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT,
),
)
};
(
slot_metrics_tracker.check_leader_slot_boundary(&bank_start),
Self::consume_or_forward_packets(
my_pubkey,
leader_at_slot_offset,
bank_start,
would_be_leader,
would_be_leader_shortly,
),
)
}
#[allow(clippy::too_many_arguments)]
fn process_buffered_packets(
my_pubkey: &Pubkey,
@ -843,9 +769,12 @@ impl BankingStage {
if unprocessed_transaction_storage.should_not_process() {
return;
}
let ((metrics_action, decision), make_decision_time) = measure!(
Self::make_consume_or_forward_decision(my_pubkey, poh_recorder, slot_metrics_tracker)
);
let ((metrics_action, decision), make_decision_time) =
measure!(DecisionMaker::make_consume_or_forward_decision(
my_pubkey,
poh_recorder,
slot_metrics_tracker
));
slot_metrics_tracker.increment_make_decision_us(make_decision_time.as_us());
match decision {
@ -2354,98 +2283,6 @@ mod tests {
);
}
#[test]
fn test_should_process_or_forward_packets() {
let my_pubkey = solana_sdk::pubkey::new_rand();
let my_pubkey1 = solana_sdk::pubkey::new_rand();
let bank = Arc::new(Bank::default_for_tests());
let bank_start = Some(BankStart {
working_bank: bank,
bank_creation_time: Arc::new(Instant::now()),
});
// having active bank allows to consume immediately
assert_matches!(
BankingStage::consume_or_forward_packets(
&my_pubkey,
None,
bank_start.clone(),
false,
false
),
BufferedPacketsDecision::Consume(_)
);
assert_matches!(
BankingStage::consume_or_forward_packets(&my_pubkey, None, None, false, false),
BufferedPacketsDecision::Hold
);
assert_matches!(
BankingStage::consume_or_forward_packets(&my_pubkey1, None, None, false, false),
BufferedPacketsDecision::Hold
);
assert_matches!(
BankingStage::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
None,
false,
false
),
BufferedPacketsDecision::Forward
);
assert_matches!(
BankingStage::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
None,
true,
true
),
BufferedPacketsDecision::Hold
);
assert_matches!(
BankingStage::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
None,
true,
false
),
BufferedPacketsDecision::ForwardAndHold
);
assert_matches!(
BankingStage::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
bank_start.clone(),
false,
false
),
BufferedPacketsDecision::Consume(_)
);
assert_matches!(
BankingStage::consume_or_forward_packets(
&my_pubkey1,
Some(my_pubkey1),
None,
false,
false
),
BufferedPacketsDecision::Hold
);
assert_matches!(
BankingStage::consume_or_forward_packets(
&my_pubkey1,
Some(my_pubkey1),
bank_start,
false,
false
),
BufferedPacketsDecision::Consume(_)
);
}
fn create_slow_genesis_config(lamports: u64) -> GenesisConfigInfo {
let mut config_info = create_genesis_config(lamports);
// For these tests there's only 1 slot, don't want to run out of ticks

View File

@ -0,0 +1,190 @@
use {
crate::leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, MetricsTrackerAction},
solana_poh::poh_recorder::{BankStart, PohRecorder},
solana_sdk::{
clock::{
DEFAULT_TICKS_PER_SLOT, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET,
HOLD_TRANSACTIONS_SLOT_OFFSET,
},
pubkey::Pubkey,
},
std::sync::RwLock,
};
#[derive(Debug, Clone)]
pub enum BufferedPacketsDecision {
Consume(BankStart),
Forward,
ForwardAndHold,
Hold,
}
pub struct DecisionMaker;
impl DecisionMaker {
pub(crate) fn make_consume_or_forward_decision(
my_pubkey: &Pubkey,
poh_recorder: &RwLock<PohRecorder>,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> (MetricsTrackerAction, BufferedPacketsDecision) {
let (leader_at_slot_offset, bank_start, would_be_leader, would_be_leader_shortly) = {
let poh = poh_recorder.read().unwrap();
let bank_start = poh
.bank_start()
.filter(|bank_start| bank_start.should_working_bank_still_be_processing_txs());
(
poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET),
bank_start,
poh.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT),
poh.would_be_leader(
(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT,
),
)
};
(
slot_metrics_tracker.check_leader_slot_boundary(&bank_start),
Self::consume_or_forward_packets(
my_pubkey,
leader_at_slot_offset,
bank_start,
would_be_leader,
would_be_leader_shortly,
),
)
}
fn consume_or_forward_packets(
my_pubkey: &Pubkey,
leader_pubkey: Option<Pubkey>,
bank_start: Option<BankStart>,
would_be_leader: bool,
would_be_leader_shortly: bool,
) -> BufferedPacketsDecision {
// If has active bank, then immediately process buffered packets
// otherwise, based on leader schedule to either forward or hold packets
if let Some(bank_start) = bank_start {
// If the bank is available, this node is the leader
BufferedPacketsDecision::Consume(bank_start)
} else if would_be_leader_shortly {
// If the node will be the leader soon, hold the packets for now
BufferedPacketsDecision::Hold
} else if would_be_leader {
// Node will be leader within ~20 slots, hold the transactions in
// case it is the only node which produces an accepted slot.
BufferedPacketsDecision::ForwardAndHold
} else if let Some(x) = leader_pubkey {
if x != *my_pubkey {
// If the current node is not the leader, forward the buffered packets
BufferedPacketsDecision::Forward
} else {
// If the current node is the leader, return the buffered packets as is
BufferedPacketsDecision::Hold
}
} else {
// We don't know the leader. Hold the packets for now
BufferedPacketsDecision::Hold
}
}
}
#[cfg(test)]
mod tests {
use {
super::*,
solana_runtime::bank::Bank,
std::{sync::Arc, time::Instant},
};
#[test]
fn test_should_process_or_forward_packets() {
let my_pubkey = solana_sdk::pubkey::new_rand();
let my_pubkey1 = solana_sdk::pubkey::new_rand();
let bank = Arc::new(Bank::default_for_tests());
let bank_start = Some(BankStart {
working_bank: bank,
bank_creation_time: Arc::new(Instant::now()),
});
// having active bank allows to consume immediately
assert_matches!(
DecisionMaker::consume_or_forward_packets(
&my_pubkey,
None,
bank_start.clone(),
false,
false
),
BufferedPacketsDecision::Consume(_)
);
assert_matches!(
DecisionMaker::consume_or_forward_packets(&my_pubkey, None, None, false, false),
BufferedPacketsDecision::Hold
);
assert_matches!(
DecisionMaker::consume_or_forward_packets(&my_pubkey1, None, None, false, false),
BufferedPacketsDecision::Hold
);
assert_matches!(
DecisionMaker::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
None,
false,
false
),
BufferedPacketsDecision::Forward
);
assert_matches!(
DecisionMaker::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
None,
true,
true
),
BufferedPacketsDecision::Hold
);
assert_matches!(
DecisionMaker::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
None,
true,
false
),
BufferedPacketsDecision::ForwardAndHold
);
assert_matches!(
DecisionMaker::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
bank_start.clone(),
false,
false
),
BufferedPacketsDecision::Consume(_)
);
assert_matches!(
DecisionMaker::consume_or_forward_packets(
&my_pubkey1,
Some(my_pubkey1),
None,
false,
false
),
BufferedPacketsDecision::Hold
);
assert_matches!(
DecisionMaker::consume_or_forward_packets(
&my_pubkey1,
Some(my_pubkey1),
bank_start,
false,
false
),
BufferedPacketsDecision::Consume(_)
);
}
}