Tune banking_stage receive loop timing (#25172)
This commit is contained in:
parent
e4ac75af90
commit
71dd95e842
|
@ -86,9 +86,6 @@ const MIN_THREADS_BANKING: u32 = 1;
|
||||||
const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING;
|
const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING;
|
||||||
const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128;
|
const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128;
|
||||||
|
|
||||||
const MAX_RECEIVE_BATCH_SIZE_PER_ITERATION: usize = 50_000;
|
|
||||||
const MAX_RECEIVE_TIME_MS_PER_ITERATION: u64 = 50;
|
|
||||||
|
|
||||||
pub struct ProcessTransactionBatchOutput {
|
pub struct ProcessTransactionBatchOutput {
|
||||||
// The number of transactions filtered out by the cost model
|
// The number of transactions filtered out by the cost model
|
||||||
cost_model_throttled_transactions_count: usize,
|
cost_model_throttled_transactions_count: usize,
|
||||||
|
@ -1982,18 +1979,27 @@ impl BankingStage {
|
||||||
fn receive_until(
|
fn receive_until(
|
||||||
verified_receiver: &CrossbeamReceiver<Vec<PacketBatch>>,
|
verified_receiver: &CrossbeamReceiver<Vec<PacketBatch>>,
|
||||||
recv_timeout: Duration,
|
recv_timeout: Duration,
|
||||||
batching_timeout: Duration,
|
packet_count_upperbound: usize,
|
||||||
batch_size_upperbound: usize,
|
|
||||||
) -> Result<Vec<PacketBatch>, RecvTimeoutError> {
|
) -> Result<Vec<PacketBatch>, RecvTimeoutError> {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let mut packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
|
let mut packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
|
||||||
|
let mut num_packets_received: usize =
|
||||||
|
packet_batches.iter().map(|batch| batch.packets.len()).sum();
|
||||||
while let Ok(packet_batch) = verified_receiver.try_recv() {
|
while let Ok(packet_batch) = verified_receiver.try_recv() {
|
||||||
trace!("got more packets");
|
trace!("got more packet batches in banking stage");
|
||||||
|
let (packets_received, packet_count_overflowed) = num_packets_received
|
||||||
|
.overflowing_add(packet_batch.iter().map(|batch| batch.packets.len()).sum());
|
||||||
packet_batches.extend(packet_batch);
|
packet_batches.extend(packet_batch);
|
||||||
if start.elapsed() >= batching_timeout || packet_batches.len() >= batch_size_upperbound
|
|
||||||
|
// Spend any leftover receive time budget to greedily receive more packet batches,
|
||||||
|
// until the upperbound of the packet count is reached.
|
||||||
|
if start.elapsed() >= recv_timeout
|
||||||
|
|| packet_count_overflowed
|
||||||
|
|| packets_received >= packet_count_upperbound
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
num_packets_received = packets_received;
|
||||||
}
|
}
|
||||||
Ok(packet_batches)
|
Ok(packet_batches)
|
||||||
}
|
}
|
||||||
|
@ -2013,8 +2019,7 @@ impl BankingStage {
|
||||||
let packet_batches = Self::receive_until(
|
let packet_batches = Self::receive_until(
|
||||||
verified_receiver,
|
verified_receiver,
|
||||||
recv_timeout,
|
recv_timeout,
|
||||||
Duration::from_millis(MAX_RECEIVE_TIME_MS_PER_ITERATION),
|
buffered_packet_batches.capacity() - buffered_packet_batches.len(),
|
||||||
MAX_RECEIVE_BATCH_SIZE_PER_ITERATION,
|
|
||||||
)?;
|
)?;
|
||||||
recv_time.stop();
|
recv_time.stop();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue