diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 0373079f1b..1f7e69e309 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -86,6 +86,9 @@ const MIN_THREADS_BANKING: u32 = 1; const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING; const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128; +const MAX_RECEIVE_BATCH_SIZE_PER_ITERATION: usize = 50_000; +const MAX_RECEIVE_TIME_MS_PER_ITERATION: u64 = 50; + pub struct ProcessTransactionBatchOutput { // The number of transactions filtered out by the cost model cost_model_throttled_transactions_count: usize, @@ -1977,6 +1980,25 @@ impl BankingStage { .collect() } + fn receive_until( + verified_receiver: &CrossbeamReceiver>, + recv_timeout: Duration, + batching_timeout: Duration, + batch_size_upperbound: usize, + ) -> Result, RecvTimeoutError> { + let start = Instant::now(); + let mut packet_batches = verified_receiver.recv_timeout(recv_timeout)?; + while let Ok(packet_batch) = verified_receiver.try_recv() { + trace!("got more packets"); + packet_batches.extend(packet_batch); + if start.elapsed() >= batching_timeout || packet_batches.len() >= batch_size_upperbound + { + break; + } + } + Ok(packet_batches) + } + #[allow(clippy::too_many_arguments)] /// Receive incoming packets, push into unprocessed buffer with packet indexes fn receive_and_buffer_packets( @@ -1989,7 +2011,12 @@ impl BankingStage { slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("receive_and_buffer_packets_recv"); - let packet_batches = verified_receiver.recv_timeout(recv_timeout)?; + let packet_batches = Self::receive_until( + verified_receiver, + recv_timeout, + Duration::from_millis(MAX_RECEIVE_TIME_MS_PER_ITERATION), + MAX_RECEIVE_BATCH_SIZE_PER_ITERATION, + )?; recv_time.stop(); let packet_batches_len = packet_batches.len(); @@ -2442,29 +2469,7 @@ mod tests { } = create_slow_genesis_config(2); let (verified_sender, verified_receiver) = unbounded(); - // Process a batch that includes a transaction that receives two lamports. let alice = Keypair::new(); - let tx = - system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, genesis_config.hash()); - - let packet_batches = to_packet_batches(&[tx], 1); - let packet_batches = packet_batches - .into_iter() - .map(|batch| (batch, vec![1u8])) - .collect(); - let packet_batches = convert_from_old_verified(packet_batches); - verified_sender.send(packet_batches).unwrap(); - - // Process a second batch that uses the same from account, so conflicts with above TX - let tx = - system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, genesis_config.hash()); - let packet_batches = to_packet_batches(&[tx], 1); - let packet_batches = packet_batches - .into_iter() - .map(|batch| (batch, vec![1u8])) - .collect(); - let packet_batches = convert_from_old_verified(packet_batches); - verified_sender.send(packet_batches).unwrap(); let (vote_sender, vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); @@ -2501,6 +2506,38 @@ mod tests { Arc::new(RwLock::new(CostModel::default())), ); + // Process a batch that includes a transaction that receives two lamports. + let tx = system_transaction::transfer( + &mint_keypair, + &alice.pubkey(), + 2, + genesis_config.hash(), + ); + + let packet_batches = to_packet_batches(&[tx], 1); + let packet_batches = packet_batches + .into_iter() + .map(|batch| (batch, vec![1u8])) + .collect(); + let packet_batches = convert_from_old_verified(packet_batches); + verified_sender.send(packet_batches).unwrap(); + + sleep(Duration::from_millis(200)); + // Process a second batch that uses the same from account, so conflicts with above TX + let tx = system_transaction::transfer( + &mint_keypair, + &alice.pubkey(), + 1, + genesis_config.hash(), + ); + let packet_batches = to_packet_batches(&[tx], 1); + let packet_batches = packet_batches + .into_iter() + .map(|batch| (batch, vec![1u8])) + .collect(); + let packet_batches = convert_from_old_verified(packet_batches); + verified_sender.send(packet_batches).unwrap(); + // wait for banking_stage to eat the packets while bank.get_balance(&alice.pubkey()) < 2 { sleep(Duration::from_millis(100));