diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 0ef27747a..5b051469e 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -69,7 +69,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let mut packets = vec![]; for batch in batches { let batch_len = batch.packets.len(); - packets.push((Rc::new(batch), vec![0usize; batch_len])); + packets.push((batch, vec![0usize; batch_len])); } // This tests the performance of buffering packets. // If the packet buffers are copied, performance will be poor. diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 86565cd7d..881317db3 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -27,7 +27,6 @@ use solana_sdk::timing::{ }; use solana_sdk::transaction::{self, Transaction, TransactionError}; use std::net::UdpSocket; -use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::{Arc, Mutex, RwLock}; @@ -36,8 +35,7 @@ use std::time::Duration; use std::time::Instant; use sys_info; -// Rc prevents clone/copy of Packets vector buffer -type PacketsAndOffsets = (Rc, Vec); +type PacketsAndOffsets = (Packets, Vec); pub type UnprocessedPackets = Vec; // number of threads is 1 until mt bank is ready @@ -711,11 +709,6 @@ impl BankingStage { let proc_start = Instant::now(); let mut new_tx_count = 0; - let mms: Vec<_> = mms - .into_iter() - .map(|(packets, vers)| (Rc::new(packets), vers)) - .collect(); - let mut mms_iter = mms.into_iter(); let mut unprocessed_packets = vec![]; while let Some((msgs, vers)) = mms_iter.next() { @@ -772,7 +765,7 @@ impl BankingStage { fn push_unprocessed( unprocessed_packets: &mut UnprocessedPackets, - packets: Rc, + packets: Packets, packet_indexes: Vec, ) { if !packet_indexes.is_empty() { @@ -1537,7 +1530,7 @@ mod tests { let valid_indexes = (0..32) .filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None }) .collect_vec(); - (Rc::new(packets), valid_indexes) + (packets, valid_indexes) }) .collect_vec(); diff --git a/core/src/packet.rs b/core/src/packet.rs index 4b3a6c8ad..e0003e8f3 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -18,7 +18,6 @@ use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, RwLock}; -use std::time::Instant; pub type SharedBlob = Arc>; pub type SharedBlobs = Vec; @@ -214,9 +213,6 @@ pub enum BlobError { impl Packets { pub fn recv_from(&mut self, socket: &UdpSocket) -> Result { - const MAX_PACKETS_PER_VEC: usize = 1024; - const MAX_MILLIS_PER_BATCH: u128 = 2; - let mut i = 0; //DOCUMENTED SIDE-EFFECT //Performance out of the IO without poll @@ -226,16 +222,11 @@ impl Packets { // * set it back to blocking before returning socket.set_nonblocking(false)?; trace!("receiving on {}", socket.local_addr().unwrap()); - let start = Instant::now(); loop { self.packets.resize(i + NUM_RCVMMSGS, Packet::default()); match recv_mmsg(socket, &mut self.packets[i..]) { Err(_) if i > 0 => { - if i >= MAX_PACKETS_PER_VEC - || start.elapsed().as_millis() > MAX_MILLIS_PER_BATCH - { - break; - } + break; } Err(e) => { trace!("recv_from err {:?}", e); @@ -247,9 +238,7 @@ impl Packets { } trace!("got {} packets", npkts); i += npkts; - if i >= MAX_PACKETS_PER_VEC - || start.elapsed().as_millis() > MAX_MILLIS_PER_BATCH - { + if npkts != NUM_RCVMMSGS || i >= 1024 { break; } } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 4d5515b98..c1d6fd058 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -75,12 +75,8 @@ impl SigVerifyStage { let verified_batch = Self::verify_batch(batch, sigverify_disabled); inc_new_counter_info!("sigverify_stage-verified_packets_send", len); - // Batch may be very large. Break it up so banking_stage can have maximum - // parallelism. - for item in verified_batch { - if sendr.send(vec![item]).is_err() { - return Err(Error::SendError); - } + if sendr.send(verified_batch).is_err() { + return Err(Error::SendError); } let total_time_ms = timing::duration_as_ms(&now.elapsed());