diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 3ce4db6b5..cdef038b6 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -86,9 +86,6 @@ const MIN_THREADS_BANKING: u32 = 1; const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING; const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128; -const MAX_RECEIVE_BATCH_SIZE_PER_ITERATION: usize = 50_000; -const MAX_RECEIVE_TIME_MS_PER_ITERATION: u64 = 50; - pub struct ProcessTransactionBatchOutput { // The number of transactions filtered out by the cost model cost_model_throttled_transactions_count: usize, @@ -1982,18 +1979,27 @@ impl BankingStage { fn receive_until( verified_receiver: &CrossbeamReceiver>, recv_timeout: Duration, - batching_timeout: Duration, - batch_size_upperbound: usize, + packet_count_upperbound: usize, ) -> Result, RecvTimeoutError> { let start = Instant::now(); let mut packet_batches = verified_receiver.recv_timeout(recv_timeout)?; + let mut num_packets_received: usize = + packet_batches.iter().map(|batch| batch.packets.len()).sum(); while let Ok(packet_batch) = verified_receiver.try_recv() { - trace!("got more packets"); + trace!("got more packet batches in banking stage"); + let (packets_received, packet_count_overflowed) = num_packets_received + .overflowing_add(packet_batch.iter().map(|batch| batch.packets.len()).sum()); packet_batches.extend(packet_batch); - if start.elapsed() >= batching_timeout || packet_batches.len() >= batch_size_upperbound + + // Spend any leftover receive time budget to greedily receive more packet batches, + // until the upperbound of the packet count is reached. + if start.elapsed() >= recv_timeout + || packet_count_overflowed + || packets_received >= packet_count_upperbound { break; } + num_packets_received = packets_received; } Ok(packet_batches) } @@ -2013,8 +2019,7 @@ impl BankingStage { let packet_batches = Self::receive_until( verified_receiver, recv_timeout, - Duration::from_millis(MAX_RECEIVE_TIME_MS_PER_ITERATION), - MAX_RECEIVE_BATCH_SIZE_PER_ITERATION, + buffered_packet_batches.capacity() - buffered_packet_batches.len(), )?; recv_time.stop();