fix banking stage starvation (#25245)

This commit is contained in:
buffalu 2022-05-18 15:37:47 -05:00 committed by GitHub
parent 5299038065
commit 971748b335
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 31 additions and 31 deletions

View File

@ -86,6 +86,8 @@ const MIN_THREADS_BANKING: u32 = 1;
const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING; const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING;
const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128; const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128;
const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10);
pub struct ProcessTransactionBatchOutput { pub struct ProcessTransactionBatchOutput {
// The number of transactions filtered out by the cost model // The number of transactions filtered out by the cost model
cost_model_throttled_transactions_count: usize, cost_model_throttled_transactions_count: usize,
@ -826,7 +828,7 @@ impl BankingStage {
data_budget: &DataBudget, data_budget: &DataBudget,
qos_service: &QosService, qos_service: &QosService,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker, slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> BufferedPacketsDecision { ) {
let (decision, make_decision_time) = Measure::this( let (decision, make_decision_time) = Measure::this(
|_| { |_| {
let bank_start; let bank_start;
@ -925,8 +927,6 @@ impl BankingStage {
} }
_ => (), _ => (),
} }
decision
} }
fn handle_forwarding( fn handle_forwarding(
@ -1004,11 +1004,14 @@ impl BankingStage {
let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit); let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit);
let mut banking_stage_stats = BankingStageStats::new(id); let mut banking_stage_stats = BankingStageStats::new(id);
let qos_service = QosService::new(cost_model, id); let qos_service = QosService::new(cost_model, id);
let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id); let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
let mut last_metrics_update = Instant::now();
loop { loop {
let my_pubkey = cluster_info.id(); let my_pubkey = cluster_info.id();
while !buffered_packet_batches.is_empty() { if !buffered_packet_batches.is_empty() {
let (decision, process_buffered_packets_time) = Measure::this( let (_, process_buffered_packets_time) = Measure::this(
|_| { |_| {
Self::process_buffered_packets( Self::process_buffered_packets(
&my_pubkey, &my_pubkey,
@ -1030,36 +1033,33 @@ impl BankingStage {
); );
slot_metrics_tracker slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_time.as_us()); .increment_process_buffered_packets_us(process_buffered_packets_time.as_us());
if matches!(decision, BufferedPacketsDecision::Hold)
|| matches!(decision, BufferedPacketsDecision::ForwardAndHold)
{
// If we are waiting on a new bank,
// check the receiver for more transactions/for exiting
break;
}
} }
let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this( if last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD {
|_| { let (_, slot_metrics_checker_check_slot_boundary_time) = Measure::this(
let current_poh_bank = { |_| {
let poh = poh_recorder.lock().unwrap(); let current_poh_bank = {
poh.bank_start() let poh = poh_recorder.lock().unwrap();
}; poh.bank_start()
slot_metrics_tracker.update_on_leader_slot_boundary(&current_poh_bank); };
}, slot_metrics_tracker.update_on_leader_slot_boundary(&current_poh_bank);
(), },
"slot_metrics_checker_check_slot_boundary", (),
); "slot_metrics_checker_check_slot_boundary",
slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us( );
slot_metrics_checker_check_slot_boundary_time.as_us(), slot_metrics_tracker.increment_slot_metrics_check_slot_boundary_us(
); slot_metrics_checker_check_slot_boundary_time.as_us(),
);
last_metrics_update = Instant::now();
}
let recv_timeout = if !buffered_packet_batches.is_empty() { let recv_timeout = if !buffered_packet_batches.is_empty() {
// If packets are buffered, let's wait for less time on recv from the channel. // If there are buffered packets, run the equivalent of try_recv to try reading more
// This helps detect the next leader faster, and processing the buffered // packets. This prevents starving BankingStage::consume_buffered_packets due to
// packets quickly // buffered_packet_batches containing transactions that exceed the cost model for
Duration::from_millis(10) // the current bank.
Duration::from_millis(0)
} else { } else {
// Default wait time // Default wait time
Duration::from_millis(100) Duration::from_millis(100)