diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index e484d3e4a..96273bac4 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -20,13 +20,13 @@ fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { w.meta.size = PACKET_DATA_SIZE; w.meta.set_addr(&addr); } - let msgs_ = msgs.clone(); + let msgs = Arc::new(msgs); spawn(move || loop { if exit.load(Ordering::Relaxed) { return; } let mut num = 0; - for p in &msgs_.packets { + for p in &msgs.packets { let a = p.meta.addr(); assert!(p.meta.size < BLOB_SIZE); send.send_to(&p.data[..p.meta.size], &a).unwrap(); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index f08cafd71..a1ddc092b 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -15,6 +15,7 @@ use solana::genesis_utils::create_genesis_block; use solana::packet::to_packets_chunked; use solana::poh_recorder::WorkingBankEntries; use solana::service::Service; +use solana::test_tx::test_tx; use solana_runtime::bank::Bank; use solana_sdk::hash::hash; use solana_sdk::pubkey::Pubkey; @@ -24,6 +25,7 @@ use solana_sdk::timing::{ duration_as_ms, timestamp, DEFAULT_TICKS_PER_SLOT, MAX_RECENT_BLOCKHASHES, }; use std::iter; +use std::rc::Rc; use std::sync::atomic::Ordering; use std::sync::mpsc::{channel, Receiver}; use std::sync::{Arc, RwLock}; @@ -48,6 +50,40 @@ fn check_txs(receiver: &Arc>, ref_tx_count: usize) assert_eq!(total, ref_tx_count); } +#[bench] +fn bench_consume_buffered(bencher: &mut Bencher) { + let (genesis_block, _mint_keypair) = create_genesis_block(100_000); + let bank = Arc::new(Bank::new(&genesis_block)); + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = Arc::new( + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"), + ); + let (exit, poh_recorder, poh_service, _signal_receiver) = + create_test_recorder(&bank, &blocktree); + + let tx = test_tx(); + let len = 4096; + let chunk_size = 1024; + let batches = to_packets_chunked(&vec![tx; len], chunk_size); + let mut packets = vec![]; + for batch in batches { + let batch_len = batch.packets.len(); + packets.push((Rc::new(batch), vec![0usize; batch_len])); + } + // This tests the performance of buffering packets. + // If the packet buffers are copied, performance will be poor. + bencher.iter(move || { + let packets_len = packets.len(); + let res = BankingStage::consume_buffered_packets(&poh_recorder, packets.as_slice()); + }); + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + let _unused = Blocktree::destroy(&ledger_path); +} + #[bench] fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { solana_logger::setup(); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index acc0df5c6..7461fff3e 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -28,6 +28,7 @@ use solana_sdk::timing::{ use solana_sdk::transaction::{self, Transaction, TransactionError}; use std::cmp; use std::net::UdpSocket; +use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::{Arc, Mutex, RwLock}; @@ -36,7 +37,9 @@ use std::time::Duration; use std::time::Instant; use sys_info; -pub type UnprocessedPackets = Vec<(Packets, Vec)>; +// Rc prevents clone/copy of Packets vector buffer +type PacketsAndOffsets = (Rc, Vec); +pub type UnprocessedPackets = Vec; // number of threads is 1 until mt bank is ready pub const NUM_THREADS: u32 = 10; @@ -67,7 +70,7 @@ impl BankingStage { poh_recorder, verified_receiver, verified_vote_receiver, - cmp::min(2, Self::num_threads()), + cmp::max(2, Self::num_threads()), ) } @@ -109,6 +112,7 @@ impl BankingStage { &cluster_info, &mut recv_start, enable_forwarding, + i, ); exit.store(true, Ordering::Relaxed); }) @@ -118,7 +122,7 @@ impl BankingStage { Self { bank_thread_hdls } } - fn filter_valid_packets_for_forwarding(all_packets: &[(Packets, Vec)]) -> Vec<&Packet> { + fn filter_valid_packets_for_forwarding(all_packets: &[PacketsAndOffsets]) -> Vec<&Packet> { all_packets .iter() .flat_map(|(p, valid_indexes)| valid_indexes.iter().map(move |x| &p.packets[*x])) @@ -128,7 +132,7 @@ impl BankingStage { fn forward_buffered_packets( socket: &std::net::UdpSocket, tpu_via_blobs: &std::net::SocketAddr, - unprocessed_packets: &[(Packets, Vec)], + unprocessed_packets: &[PacketsAndOffsets], ) -> std::io::Result<()> { let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets); inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); @@ -141,19 +145,26 @@ impl BankingStage { Ok(()) } - fn consume_buffered_packets( + pub fn consume_buffered_packets( poh_recorder: &Arc>, - buffered_packets: &[(Packets, Vec)], + buffered_packets: &[PacketsAndOffsets], ) -> Result { let mut unprocessed_packets = vec![]; let mut rebuffered_packets = 0; let mut new_tx_count = 0; + let buffered_len = buffered_packets.len(); let mut buffered_packets_iter = buffered_packets.iter(); + + let proc_start = Instant::now(); while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() { let bank = poh_recorder.lock().unwrap().bank(); if bank.is_none() { rebuffered_packets += unprocessed_indexes.len(); - unprocessed_packets.push((msgs.to_owned(), unprocessed_indexes.to_owned())); + Self::push_unprocessed( + &mut unprocessed_packets, + msgs.to_owned(), + unprocessed_indexes.to_owned(), + ); continue; } let bank = bank.unwrap(); @@ -169,23 +180,39 @@ impl BankingStage { new_tx_count += processed; // Collect any unprocessed transactions in this batch for forwarding - if !new_unprocessed_indexes.is_empty() { - rebuffered_packets += new_unprocessed_indexes.len(); - unprocessed_packets.push((msgs.to_owned(), new_unprocessed_indexes)); - } + rebuffered_packets += new_unprocessed_indexes.len(); + Self::push_unprocessed( + &mut unprocessed_packets, + msgs.to_owned(), + new_unprocessed_indexes, + ); if processed < verified_txs_len { // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones while let Some((msgs, unprocessed_indexes)) = buffered_packets_iter.next() { let unprocessed_indexes = Self::filter_unprocessed_packets(&bank, &msgs, &unprocessed_indexes); - if !unprocessed_indexes.is_empty() { - unprocessed_packets.push((msgs.to_owned(), unprocessed_indexes)); - } + Self::push_unprocessed( + &mut unprocessed_packets, + msgs.to_owned(), + unprocessed_indexes, + ); } } } + let total_time_s = timing::duration_as_s(&proc_start.elapsed()); + let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); + + debug!( + "@{:?} done processing buffered batches: {} time: {:?}ms tx count: {} tx/s: {}", + timing::timestamp(), + buffered_len, + total_time_ms, + new_tx_count, + (new_tx_count as f32) / (total_time_s) + ); + inc_new_counter_info!("banking_stage-rebuffered_packets", rebuffered_packets); inc_new_counter_info!("banking_stage-consumed_buffered_packets", new_tx_count); inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count); @@ -225,7 +252,7 @@ impl BankingStage { socket: &std::net::UdpSocket, poh_recorder: &Arc>, cluster_info: &Arc>, - buffered_packets: &[(Packets, Vec)], + buffered_packets: &[PacketsAndOffsets], enable_forwarding: bool, ) -> Result { let rcluster_info = cluster_info.read().unwrap(); @@ -277,6 +304,7 @@ impl BankingStage { cluster_info: &Arc>, recv_start: &mut Instant, enable_forwarding: bool, + id: u32, ) { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packets = vec![]; @@ -303,8 +331,13 @@ impl BankingStage { Duration::from_millis(100) }; - match Self::process_packets(&verified_receiver, &poh_recorder, recv_start, recv_timeout) - { + match Self::process_packets( + &verified_receiver, + &poh_recorder, + recv_start, + recv_timeout, + id, + ) { Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), Ok(unprocessed_packets) => { if unprocessed_packets.is_empty() { @@ -645,12 +678,20 @@ impl BankingStage { filtered_unprocessed_tx_indexes } + fn generate_packet_indexes(vers: Vec) -> Vec { + vers.iter() + .enumerate() + .filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None }) + .collect() + } + /// Process the incoming packets pub fn process_packets( verified_receiver: &Arc>>, poh: &Arc>, recv_start: &mut Instant, recv_timeout: Duration, + id: u32, ) -> Result { let mms = verified_receiver .lock() @@ -658,29 +699,30 @@ impl BankingStage { .recv_timeout(recv_timeout)?; let mms_len = mms.len(); - let count = mms.iter().map(|x| x.1.len()).sum(); + let count: usize = mms.iter().map(|x| x.1.len()).sum(); debug!( - "@{:?} process start stalled for: {:?}ms txs: {}", + "@{:?} process start stalled for: {:?}ms txs: {} id: {}", timing::timestamp(), timing::duration_as_ms(&recv_start.elapsed()), count, + id, ); inc_new_counter_debug!("banking_stage-transactions_received", count); let proc_start = Instant::now(); let mut new_tx_count = 0; + + let mms: Vec<_> = mms + .into_iter() + .map(|(packets, vers)| (Rc::new(packets), vers)) + .collect(); + let mut mms_iter = mms.into_iter(); let mut unprocessed_packets = vec![]; while let Some((msgs, vers)) = mms_iter.next() { - let packet_indexes: Vec = vers - .iter() - .enumerate() - .filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None }) - .collect(); + let packet_indexes = Self::generate_packet_indexes(vers); let bank = poh.lock().unwrap().bank(); if bank.is_none() { - if !packet_indexes.is_empty() { - unprocessed_packets.push((msgs, packet_indexes)); - } + Self::push_unprocessed(&mut unprocessed_packets, msgs, packet_indexes); continue; } let bank = bank.unwrap(); @@ -691,23 +733,15 @@ impl BankingStage { new_tx_count += processed; // Collect any unprocessed transactions in this batch for forwarding - if !unprocessed_indexes.is_empty() { - unprocessed_packets.push((msgs, unprocessed_indexes)); - } + Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes); if processed < verified_txs_len { // Walk thru rest of the transactions and filter out the invalid (e.g. too old) ones while let Some((msgs, vers)) = mms_iter.next() { - let packet_indexes: Vec = vers - .iter() - .enumerate() - .filter_map(|(index, ver)| if *ver != 0 { Some(index) } else { None }) - .collect(); + let packet_indexes = Self::generate_packet_indexes(vers); let unprocessed_indexes = Self::filter_unprocessed_packets(&bank, &msgs, &packet_indexes); - if !unprocessed_indexes.is_empty() { - unprocessed_packets.push((msgs, unprocessed_indexes)); - } + Self::push_unprocessed(&mut unprocessed_packets, msgs, unprocessed_indexes); } } } @@ -719,12 +753,14 @@ impl BankingStage { let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); debug!( - "@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {}", + "@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {} total count: {} id: {}", timing::timestamp(), mms_len, total_time_ms, new_tx_count, - (new_tx_count as f32) / (total_time_s) + (new_tx_count as f32) / (total_time_s), + count, + id, ); inc_new_counter_debug!("banking_stage-process_packets", count); inc_new_counter_debug!("banking_stage-process_transactions", new_tx_count); @@ -733,6 +769,16 @@ impl BankingStage { Ok(unprocessed_packets) } + + fn push_unprocessed( + unprocessed_packets: &mut UnprocessedPackets, + packets: Rc, + packet_indexes: Vec, + ) { + if !packet_indexes.is_empty() { + unprocessed_packets.push((packets, packet_indexes)); + } + } } impl Service for BankingStage { @@ -924,8 +970,14 @@ mod tests { // glad they all fit assert_eq!(packets.len(), 1); + + let packets = packets + .into_iter() + .map(|packets| (packets, vec![0u8, 1u8, 1u8])) + .collect(); + verified_sender // no_ver, anf, tx - .send(vec![(packets[0].clone(), vec![0u8, 1u8, 1u8])]) + .send(packets) .unwrap(); drop(verified_sender); @@ -990,9 +1042,11 @@ mod tests { ); let packets = to_packets(&[tx]); - verified_sender - .send(vec![(packets[0].clone(), vec![1u8])]) - .unwrap(); + let packets = packets + .into_iter() + .map(|packets| (packets, vec![1u8])) + .collect(); + verified_sender.send(packets).unwrap(); // Process a second batch that spends one of those lamports. let tx = system_transaction::create_user_account( @@ -1003,9 +1057,11 @@ mod tests { 0, ); let packets = to_packets(&[tx]); - verified_sender - .send(vec![(packets[0].clone(), vec![1u8])]) - .unwrap(); + let packets = packets + .into_iter() + .map(|packets| (packets, vec![1u8])) + .collect(); + verified_sender.send(packets).unwrap(); let (vote_sender, vote_receiver) = channel(); let ledger_path = get_tmp_ledger_path!(); @@ -1493,7 +1549,7 @@ mod tests { let valid_indexes = (0..32) .filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None }) .collect_vec(); - (packets, valid_indexes) + (Rc::new(packets), valid_indexes) }) .collect_vec(); diff --git a/core/src/packet.rs b/core/src/packet.rs index cc97da77e..664f441b8 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -18,6 +18,7 @@ use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, RwLock}; +use std::time::Instant; pub type SharedBlob = Arc>; pub type SharedBlobs = Vec; @@ -213,6 +214,9 @@ pub enum BlobError { impl Packets { pub fn recv_from(&mut self, socket: &UdpSocket) -> Result { + const MAX_PACKETS_PER_VEC: usize = 1024; + const MAX_MILLIS_PER_BATCH: u128 = 2; + let mut i = 0; //DOCUMENTED SIDE-EFFECT //Performance out of the IO without poll @@ -222,11 +226,16 @@ impl Packets { // * set it back to blocking before returning socket.set_nonblocking(false)?; trace!("receiving on {}", socket.local_addr().unwrap()); + let start = Instant::now(); loop { self.packets.resize(i + NUM_RCVMMSGS, Packet::default()); match recv_mmsg(socket, &mut self.packets[i..]) { Err(_) if i > 0 => { - break; + if i >= MAX_PACKETS_PER_VEC + || start.elapsed().as_millis() > MAX_MILLIS_PER_BATCH + { + break; + } } Err(e) => { trace!("recv_from err {:?}", e); @@ -238,7 +247,9 @@ impl Packets { } trace!("got {} packets", npkts); i += npkts; - if npkts != NUM_RCVMMSGS || i >= 1024 { + if i >= MAX_PACKETS_PER_VEC + || start.elapsed().as_millis() > MAX_MILLIS_PER_BATCH + { break; } } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index c1d6fd058..4d5515b98 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -75,8 +75,12 @@ impl SigVerifyStage { let verified_batch = Self::verify_batch(batch, sigverify_disabled); inc_new_counter_info!("sigverify_stage-verified_packets_send", len); - if sendr.send(verified_batch).is_err() { - return Err(Error::SendError); + // Batch may be very large. Break it up so banking_stage can have maximum + // parallelism. + for item in verified_batch { + if sendr.send(vec![item]).is_err() { + return Err(Error::SendError); + } } let total_time_ms = timing::duration_as_ms(&now.elapsed());