Refactor/banking_stage_make_decision_consume_bank (#28946)

This commit is contained in:
apfitzge 2022-12-02 10:07:01 -06:00 committed by GitHub
parent 63e19c0068
commit fd3b5d08d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 68 additions and 134 deletions

View File

@ -78,6 +78,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
create_test_recorder(&bank, &blockstore, None, None);
let recorder = poh_recorder.read().unwrap().recorder();
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
let tx = test_tx();
let transactions = vec![tx; 4194304];
@ -92,8 +93,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
// If the packet buffers are copied, performance will be poor.
bencher.iter(move || {
BankingStage::consume_buffered_packets(
std::u128::MAX,
&poh_recorder,
&bank_start,
&mut transaction_buffer,
&None,
&s,

View File

@ -355,7 +355,7 @@ pub struct BankingStage {
#[derive(Debug, Clone)]
pub enum BufferedPacketsDecision {
Consume(u128),
Consume(BankStart),
Forward,
ForwardAndHold,
Hold,
@ -610,9 +610,7 @@ impl BankingStage {
#[allow(clippy::too_many_arguments)]
fn do_process_packets(
max_tx_ingestion_ns: u128,
working_bank: &Arc<Bank>,
bank_creation_time: &Arc<Instant>,
bank_start: &BankStart,
payload: &mut ConsumeScannerPayload,
recorder: &TransactionRecorder,
transaction_status_sender: &Option<TransactionStatusSender>,
@ -632,8 +630,8 @@ impl BankingStage {
let packets_to_process_len = packets_to_process.len();
let (process_transactions_summary, process_packets_transactions_time) = measure!(
Self::process_packets_transactions(
working_bank,
bank_creation_time,
&bank_start.working_bank,
&bank_start.bank_creation_time,
recorder,
&payload.sanitized_transactions,
transaction_status_sender,
@ -659,9 +657,7 @@ impl BankingStage {
..
} = process_transactions_summary;
if reached_max_poh_height
|| !Bank::should_bank_still_be_processing_txs(bank_creation_time, max_tx_ingestion_ns)
{
if reached_max_poh_height || !bank_start.should_working_bank_still_be_processing_txs() {
payload.reached_end_of_slot = true;
}
@ -691,8 +687,7 @@ impl BankingStage {
#[allow(clippy::too_many_arguments)]
pub fn consume_buffered_packets(
max_tx_ingestion_ns: u128,
poh_recorder: &Arc<RwLock<PohRecorder>>,
bank_start: &BankStart,
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
transaction_status_sender: &Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
@ -706,56 +701,34 @@ impl BankingStage {
let mut rebuffered_packet_count = 0;
let mut consumed_buffered_packets_count = 0;
let mut proc_start = Measure::start("consume_buffered_process");
let reached_end_of_slot;
let num_packets_to_process = unprocessed_transaction_storage.len();
let (bank_start, poh_recorder_lock_time) = measure!(
poh_recorder.read().unwrap().bank_start(),
"poh_recorder.read",
let reached_end_of_slot = unprocessed_transaction_storage.process_packets(
bank_start.working_bank.clone(),
banking_stage_stats,
slot_metrics_tracker,
|packets_to_process, payload| {
Self::do_process_packets(
bank_start,
payload,
recorder,
transaction_status_sender,
gossip_vote_sender,
banking_stage_stats,
qos_service,
log_messages_bytes_limit,
&mut consumed_buffered_packets_count,
&mut rebuffered_packet_count,
&test_fn,
packets_to_process,
)
},
);
slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us(
poh_recorder_lock_time.as_us(),
);
if let Some(BankStart {
working_bank,
bank_creation_time,
}) = bank_start
{
reached_end_of_slot = unprocessed_transaction_storage.process_packets(
working_bank.clone(),
banking_stage_stats,
slot_metrics_tracker,
|packets_to_process, payload| {
Self::do_process_packets(
max_tx_ingestion_ns,
&working_bank,
&bank_creation_time,
payload,
recorder,
transaction_status_sender,
gossip_vote_sender,
banking_stage_stats,
qos_service,
log_messages_bytes_limit,
&mut consumed_buffered_packets_count,
&mut rebuffered_packet_count,
&test_fn,
packets_to_process,
)
},
);
} else {
reached_end_of_slot = true;
}
if reached_end_of_slot {
slot_metrics_tracker.set_end_of_slot_unprocessed_buffer_len(
unprocessed_transaction_storage.len() as u64,
);
// We've hit the end of this slot; no need to perform more processing.
// Packet filtering will be done before forwarding.
}
proc_start.stop();
@ -782,15 +755,15 @@ impl BankingStage {
fn consume_or_forward_packets(
my_pubkey: &Pubkey,
leader_pubkey: Option<Pubkey>,
bank_still_processing_txs: Option<&Arc<Bank>>,
bank_start: Option<BankStart>,
would_be_leader: bool,
would_be_leader_shortly: bool,
) -> BufferedPacketsDecision {
// If it has an active bank, then immediately process buffered packets;
// otherwise, use the leader schedule to decide whether to forward or hold packets
if let Some(bank) = bank_still_processing_txs {
if let Some(bank_start) = bank_start {
// If the bank is available, this node is the leader
BufferedPacketsDecision::Consume(bank.ns_per_slot)
BufferedPacketsDecision::Consume(bank_start)
} else if would_be_leader_shortly {
// If the node will be the leader soon, hold the packets for now
BufferedPacketsDecision::Hold
@ -817,18 +790,14 @@ impl BankingStage {
poh_recorder: &RwLock<PohRecorder>,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> (MetricsTrackerAction, BufferedPacketsDecision) {
let bank_start;
let (
leader_at_slot_offset,
bank_still_processing_txs,
would_be_leader,
would_be_leader_shortly,
) = {
let (leader_at_slot_offset, bank_start, would_be_leader, would_be_leader_shortly) = {
let poh = poh_recorder.read().unwrap();
bank_start = poh.bank_start();
let bank_start = poh
.bank_start()
.filter(|bank_start| bank_start.should_working_bank_still_be_processing_txs());
(
poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET),
PohRecorder::get_working_bank_if_not_expired(&bank_start.as_ref()),
bank_start,
poh.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT),
poh.would_be_leader(
(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET - 1) * DEFAULT_TICKS_PER_SLOT,
@ -841,7 +810,7 @@ impl BankingStage {
Self::consume_or_forward_packets(
my_pubkey,
leader_at_slot_offset,
bank_still_processing_txs,
bank_start,
would_be_leader,
would_be_leader_shortly,
),
@ -876,7 +845,7 @@ impl BankingStage {
slot_metrics_tracker.increment_make_decision_us(make_decision_time.as_us());
match decision {
BufferedPacketsDecision::Consume(max_tx_ingestion_ns) => {
BufferedPacketsDecision::Consume(bank_start) => {
// Take metrics action before consume packets (potentially resetting the
// slot metrics tracker to the next slot) so that we don't count the
// packet processing metrics from the next slot towards the metrics
@ -884,8 +853,7 @@ impl BankingStage {
slot_metrics_tracker.apply_action(metrics_action);
let (_, consume_buffered_packets_time) = measure!(
Self::consume_buffered_packets(
max_tx_ingestion_ns,
poh_recorder,
&bank_start,
unprocessed_transaction_storage,
transaction_status_sender,
gossip_vote_sender,
@ -2525,9 +2493,19 @@ mod tests {
let my_pubkey = solana_sdk::pubkey::new_rand();
let my_pubkey1 = solana_sdk::pubkey::new_rand();
let bank = Arc::new(Bank::default_for_tests());
let bank_start = Some(BankStart {
working_bank: bank,
bank_creation_time: Arc::new(Instant::now()),
});
// having an active bank allows consuming immediately
assert_matches!(
BankingStage::consume_or_forward_packets(&my_pubkey, None, Some(&bank), false, false),
BankingStage::consume_or_forward_packets(
&my_pubkey,
None,
bank_start.clone(),
false,
false
),
BufferedPacketsDecision::Consume(_)
);
assert_matches!(
@ -2574,7 +2552,7 @@ mod tests {
BankingStage::consume_or_forward_packets(
&my_pubkey,
Some(my_pubkey1),
Some(&bank),
bank_start.clone(),
false,
false
),
@ -2594,7 +2572,7 @@ mod tests {
BankingStage::consume_or_forward_packets(
&my_pubkey1,
Some(my_pubkey1),
Some(&bank),
bank_start,
false,
false
),
@ -3680,29 +3658,15 @@ mod tests {
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
// When the working bank in poh_recorder is None, no packets should be processed
// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
let max_tx_processing_ns = std::u128::MAX;
BankingStage::consume_buffered_packets(
max_tx_processing_ns,
&poh_recorder,
&mut buffered_packet_batches,
&None,
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&QosService::new(1),
&mut LeaderSlotMetricsTracker::new(0),
None,
);
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed.
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
poh_recorder.write().unwrap().set_bank(&bank, false);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
BankingStage::consume_buffered_packets(
max_tx_processing_ns,
&poh_recorder,
&bank_start,
&mut buffered_packet_batches,
&None,
&gossip_vote_sender,
@ -3754,27 +3718,13 @@ mod tests {
// When the working bank in poh_recorder is None, no packets should be processed
assert!(!poh_recorder.read().unwrap().has_bank());
let max_tx_processing_ns = std::u128::MAX;
BankingStage::consume_buffered_packets(
max_tx_processing_ns,
&poh_recorder,
&mut buffered_packet_batches,
&None,
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&QosService::new(1),
&mut LeaderSlotMetricsTracker::new(0),
None,
);
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed.
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
poh_recorder.write().unwrap().set_bank(&bank, false);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
BankingStage::consume_buffered_packets(
max_tx_processing_ns,
&poh_recorder,
&bank_start,
&mut buffered_packet_batches,
&None,
&gossip_vote_sender,
@ -3814,6 +3764,7 @@ mod tests {
poh_recorder.write().unwrap().set_bank(&bank, false);
let poh_recorder_ = poh_recorder.clone();
let recorder = poh_recorder_.read().unwrap().recorder();
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
// Start up thread to process the banks
let t_consume = Builder::new()
@ -3835,8 +3786,7 @@ mod tests {
ThreadType::Transactions,
);
BankingStage::consume_buffered_packets(
std::u128::MAX,
&poh_recorder_,
&bank_start,
&mut buffered_packet_batches,
&None,
&gossip_vote_sender,

View File

@ -769,18 +769,6 @@ impl LeaderSlotMetricsTracker {
}
}
pub(crate) fn increment_consume_buffered_packets_poh_recorder_lock_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.consume_buffered_packets_timings
.poh_recorder_lock_us,
us
);
}
}
pub(crate) fn increment_process_packets_transactions_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(

View File

@ -224,9 +224,6 @@ impl ProcessBufferedPacketsTimings {
#[derive(Debug, Default)]
pub(crate) struct ConsumeBufferedPacketsTimings {
// Time spent grabbing poh recorder lock
pub poh_recorder_lock_us: u64,
// Time spent processing transactions
pub process_packets_transactions_us: u64,
}
@ -237,11 +234,6 @@ impl ConsumeBufferedPacketsTimings {
"banking_stage-leader_slot_consume_buffered_packets_timings",
("id", id as i64, i64),
("slot", slot as i64, i64),
(
"poh_recorder_lock_us",
self.poh_recorder_lock_us as i64,
i64
),
(
"process_packets_transactions_us",
self.process_packets_transactions_us as i64,

View File

@ -61,7 +61,7 @@ type Result<T> = std::result::Result<T, PohRecorderError>;
pub type WorkingBankEntry = (Arc<Bank>, (Entry, u64));
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct BankStart {
pub working_bank: Arc<Bank>,
pub bank_creation_time: Arc<Instant>,
@ -69,15 +69,19 @@ pub struct BankStart {
impl BankStart {
fn get_working_bank_if_not_expired(&self) -> Option<&Arc<Bank>> {
if Bank::should_bank_still_be_processing_txs(
&self.bank_creation_time,
self.working_bank.ns_per_slot,
) {
if self.should_working_bank_still_be_processing_txs() {
Some(&self.working_bank)
} else {
None
}
}
pub fn should_working_bank_still_be_processing_txs(&self) -> bool {
Bank::should_bank_still_be_processing_txs(
&self.bank_creation_time,
self.working_bank.ns_per_slot,
)
}
}
// Sends the Result of the record operation, including the index in the slot of the first