flag end-of-slot when poh bank is gone

This commit is contained in:
Tao Zhu 2022-02-10 20:00:36 -06:00 committed by Tao Zhu
parent ab92578b02
commit 03bf66a51b
1 changed files with 46 additions and 27 deletions

View File

@ -325,6 +325,12 @@ pub struct BatchedTransactionCostDetails {
pub batched_execute_cost: u64, pub batched_execute_cost: u64,
} }
#[derive(Debug, Default)]
struct EndOfSlot {
next_slot_leader: Option<Pubkey>,
working_bank: Option<Arc<Bank>>,
}
/// Stores the stage's thread handle and output receiver. /// Stores the stage's thread handle and output receiver.
pub struct BankingStage { pub struct BankingStage {
bank_thread_hdls: Vec<JoinHandle<()>>, bank_thread_hdls: Vec<JoinHandle<()>>,
@ -519,41 +525,47 @@ impl BankingStage {
let mut consumed_buffered_packets_count = 0; let mut consumed_buffered_packets_count = 0;
let buffered_packet_batches_len = buffered_packet_batches.len(); let buffered_packet_batches_len = buffered_packet_batches.len();
let mut proc_start = Measure::start("consume_buffered_process"); let mut proc_start = Measure::start("consume_buffered_process");
let mut reached_end_of_slot = None; let mut reached_end_of_slot: Option<EndOfSlot> = None;
RetainMut::retain_mut( RetainMut::retain_mut(
buffered_packet_batches, buffered_packet_batches,
|buffered_packet_batch_and_offsets| { |buffered_packet_batch_and_offsets| {
let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) = let (packet_batch, ref mut original_unprocessed_indexes, _forwarded) =
buffered_packet_batch_and_offsets; buffered_packet_batch_and_offsets;
if let Some((next_leader, bank)) = &reached_end_of_slot { if let Some(end_of_slot) = &reached_end_of_slot {
// We've hit the end of this slot, no need to perform more processing, // We've hit the end of this slot, no need to perform more processing,
// just filter the remaining packets for the invalid (e.g. too old) ones // just filter the remaining packets for the invalid (e.g. too old) ones
let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot( // if the working_bank is available
bank, if let Some(bank) = &end_of_slot.working_bank {
packet_batch, let new_unprocessed_indexes =
original_unprocessed_indexes, Self::filter_unprocessed_packets_at_end_of_slot(
my_pubkey, bank,
*next_leader, packet_batch,
banking_stage_stats, original_unprocessed_indexes,
); my_pubkey,
end_of_slot.next_slot_leader,
banking_stage_stats,
);
let end_of_slot_filtered_invalid_count = original_unprocessed_indexes let end_of_slot_filtered_invalid_count = original_unprocessed_indexes
.len() .len()
.saturating_sub(new_unprocessed_indexes.len()); .saturating_sub(new_unprocessed_indexes.len());
slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count( slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count(
end_of_slot_filtered_invalid_count as u64, end_of_slot_filtered_invalid_count as u64,
); );
banking_stage_stats banking_stage_stats
.end_of_slot_filtered_invalid_count .end_of_slot_filtered_invalid_count
.fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed); .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed);
Self::update_buffered_packets_with_new_unprocessed( Self::update_buffered_packets_with_new_unprocessed(
original_unprocessed_indexes, original_unprocessed_indexes,
new_unprocessed_indexes, new_unprocessed_indexes,
) )
} else {
true
}
} else { } else {
let bank_start = poh_recorder.lock().unwrap().bank_start(); let bank_start = poh_recorder.lock().unwrap().bank_start();
if let Some(BankStart { if let Some(BankStart {
@ -586,10 +598,10 @@ impl BankingStage {
max_tx_ingestion_ns, max_tx_ingestion_ns,
) )
{ {
reached_end_of_slot = Some(( reached_end_of_slot = Some(EndOfSlot {
poh_recorder.lock().unwrap().next_slot_leader(), next_slot_leader: poh_recorder.lock().unwrap().next_slot_leader(),
working_bank, working_bank: Some(working_bank),
)); });
} }
// The difference between all transactions passed to execution and the ones that // The difference between all transactions passed to execution and the ones that
@ -616,6 +628,13 @@ impl BankingStage {
} }
has_more_unprocessed_transactions has_more_unprocessed_transactions
} else { } else {
// mark as end-of-slot to avoid aggressively lock poh for the remaining for
// packet batches in buffer
reached_end_of_slot = Some(EndOfSlot {
next_slot_leader: poh_recorder.lock().unwrap().next_slot_leader(),
working_bank: None,
});
// `original_unprocessed_indexes` must have remaining packets to process // `original_unprocessed_indexes` must have remaining packets to process
// if not yet processed. // if not yet processed.
assert!(Self::packet_has_more_unprocessed_transactions( assert!(Self::packet_has_more_unprocessed_transactions(