diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 5a8b67b83c..bbccac5dc5 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -74,11 +74,8 @@ fn bench_consume_buffered(bencher: &mut Bencher) { // This tests the performance of buffering packets. // If the packet buffers are copied, performance will be poor. bencher.iter(move || { - let _ignored = BankingStage::consume_buffered_packets( - &my_pubkey, - &poh_recorder, - packets.as_slice(), - ); + let _ignored = + BankingStage::consume_buffered_packets(&my_pubkey, &poh_recorder, &mut packets); }); exit.store(true, Ordering::Relaxed); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index e412882db0..e17be807e6 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -146,24 +146,20 @@ impl BankingStage { pub fn consume_buffered_packets( my_pubkey: &Pubkey, poh_recorder: &Arc>, - buffered_packets: &[PacketsAndOffsets], + buffered_packets: &mut Vec, ) -> Result { let mut unprocessed_packets = vec![]; let mut rebuffered_packets = 0; let mut new_tx_count = 0; let buffered_len = buffered_packets.len(); - let mut buffered_packets_iter = buffered_packets.iter(); + let mut buffered_packets_iter = buffered_packets.drain(..); let proc_start = Instant::now(); while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() { let bank = poh_recorder.lock().unwrap().bank(); if bank.is_none() { rebuffered_packets += unprocessed_indexes.len(); - Self::push_unprocessed( - &mut unprocessed_packets, - msgs.to_owned(), - unprocessed_indexes.to_owned(), - ); + Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes); continue; } let bank = bank.unwrap(); @@ -180,11 +176,7 @@ impl BankingStage { // Collect any unprocessed transactions in this batch for forwarding rebuffered_packets += new_unprocessed_indexes.len(); - Self::push_unprocessed( - &mut unprocessed_packets, - msgs.to_owned(), - new_unprocessed_indexes, - ); + Self::push_unprocessed(&mut unprocessed_packets, msgs, new_unprocessed_indexes); if processed < verified_txs_len { let next_leader = poh_recorder.lock().unwrap().next_slot_leader(); @@ -197,11 +189,7 @@ impl BankingStage { my_pubkey, next_leader, ); - Self::push_unprocessed( - &mut unprocessed_packets, - msgs.to_owned(), - unprocessed_indexes, - ); + Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes); } } } @@ -257,9 +245,9 @@ impl BankingStage { socket: &std::net::UdpSocket, poh_recorder: &Arc>, cluster_info: &Arc>, - buffered_packets: &[PacketsAndOffsets], + buffered_packets: &mut Vec, enable_forwarding: bool, - ) -> Result { + ) -> Result<()> { let rcluster_info = cluster_info.read().unwrap(); let (decision, next_leader) = { @@ -278,28 +266,35 @@ impl BankingStage { match decision { BufferedPacketsDecision::Consume => { - Self::consume_buffered_packets(&rcluster_info.id(), poh_recorder, buffered_packets) + let mut unprocessed = Self::consume_buffered_packets( + &rcluster_info.id(), + poh_recorder, + buffered_packets, + )?; + buffered_packets.append(&mut unprocessed); + Ok(()) } BufferedPacketsDecision::Forward => { if enable_forwarding { - next_leader.map_or(Ok(buffered_packets.to_vec()), |leader_pubkey| { - rcluster_info.lookup(&leader_pubkey).map_or( - Ok(buffered_packets.to_vec()), - |leader| { + next_leader.map_or(Ok(()), |leader_pubkey| { + rcluster_info + .lookup(&leader_pubkey) + .map_or(Ok(()), |leader| { let _ = Self::forward_buffered_packets( &socket, &leader.tpu_via_blobs, &buffered_packets, ); - Ok(vec![]) - }, - ) + buffered_packets.clear(); + Ok(()) + }) }) } else { - Ok(vec![]) + buffered_packets.clear(); + Ok(()) } } - _ => Ok(buffered_packets.to_vec()), + _ => Ok(()), } } @@ -319,10 +314,9 @@ impl BankingStage { &socket, poh_recorder, cluster_info, - &buffered_packets, + &mut buffered_packets, enable_forwarding, ) - .map(|packets| buffered_packets = packets) .unwrap_or_else(|_| buffered_packets.clear()); } @@ -345,7 +339,7 @@ impl BankingStage { id, ) { Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), - Ok(unprocessed_packets) => { + Ok(mut unprocessed_packets) => { if unprocessed_packets.is_empty() { continue; } @@ -354,7 +348,7 @@ impl BankingStage { .map(|(_, unprocessed)| unprocessed.len()) .sum(); inc_new_counter_info!("banking_stage-buffered_packets", num); - buffered_packets.extend_from_slice(&unprocessed_packets); + buffered_packets.append(&mut unprocessed_packets); } Err(err) => { debug!("solana-banking-stage-tx: exit due to {:?}", err);