diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 3923228123..2839989c39 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -367,7 +367,7 @@ fn main() { for (packet_batch_index, packet_batch) in packets_for_this_iteration.packet_batches.iter().enumerate() { - sent += packet_batch.packets.len(); + sent += packet_batch.len(); trace!( "Sending PacketBatch index {}, {}", packet_batch_index, diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 63930debe9..81d866360d 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -21,9 +21,10 @@ use { fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut packet_batch = PacketBatch::default(); - packet_batch.packets.resize(10, Packet::default()); - for w in packet_batch.packets.iter_mut() { + let batch_size = 10; + let mut packet_batch = PacketBatch::with_capacity(batch_size); + packet_batch.resize(batch_size, Packet::default()); + for w in packet_batch.iter_mut() { w.meta.size = PACKET_DATA_SIZE; w.meta.set_socket_addr(addr); } @@ -33,7 +34,7 @@ fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { return; } let mut num = 0; - for p in &packet_batch.packets { + for p in packet_batch.iter() { let a = p.meta.socket_addr(); assert!(p.meta.size <= PACKET_DATA_SIZE); send.send_to(&p.data[..p.meta.size], &a).unwrap(); @@ -50,7 +51,7 @@ fn sink(exit: Arc, rvs: Arc, r: PacketBatchReceiver) -> } let timer = Duration::new(1, 0); if let Ok(packet_batch) = r.recv_timeout(timer) { - rvs.fetch_add(packet_batch.packets.len(), Ordering::Relaxed); + rvs.fetch_add(packet_batch.len(), Ordering::Relaxed); } }) } diff --git a/client/tests/quic_client.rs b/client/tests/quic_client.rs index 8d3681e2b4..9c661e17bd 100644 --- a/client/tests/quic_client.rs +++ b/client/tests/quic_client.rs @@ -64,7 +64,7 @@ mod tests { let mut total_packets = 0; while now.elapsed().as_secs() < 5 { if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) { - total_packets += packets.packets.len(); + total_packets += packets.len(); all_packets.push(packets) } if total_packets >= num_expected_packets { @@ -72,7 +72,7 @@ mod tests { } } for batch in all_packets { - for p in &batch.packets { + for p in &batch { assert_eq!(p.meta.size, num_bytes); } } diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 90026cbc66..d92ef025d9 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -254,7 +254,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { v.len(), ); for xv in v { - sent += xv.packets.len(); + sent += xv.len(); } verified_sender.send(v.to_vec()).unwrap(); } diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 64df3b1418..cdbc480552 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -42,8 +42,8 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) { .collect(); for batch in batches.iter_mut() { - total += batch.packets.len(); - for p in batch.packets.iter_mut() { + total += batch.len(); + for p in batch.iter_mut() { let ip_index = thread_rng().gen_range(0, ips.len()); p.meta.addr = ips[ip_index]; } @@ -54,7 +54,7 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) { SigVerifyStage::discard_excess_packets(&mut batches, 10_000); let mut num_packets = 0; for batch in batches.iter_mut() { - for p in batch.packets.iter_mut() { + for p in batch.iter_mut() { if !p.meta.discard() { num_packets += 1; } @@ -88,7 +88,7 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) { let mut batches = to_packet_batches(&vec![test_tx(); SIZE], CHUNK_SIZE); let spam_addr = new_rand_addr(&mut rng); for batch in batches.iter_mut() { - for packet in batch.packets.iter_mut() { + for packet in batch.iter_mut() { // One spam address, ~1000 unique addresses. packet.meta.addr = if rng.gen_ratio(1, 30) { new_rand_addr(&mut rng) @@ -101,7 +101,7 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) { SigVerifyStage::discard_excess_packets(&mut batches, 10_000); let mut num_packets = 0; for batch in batches.iter_mut() { - for packet in batch.packets.iter_mut() { + for packet in batch.iter_mut() { if !packet.meta.discard() { num_packets += 1; } @@ -158,7 +158,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { let mut sent_len = 0; for _ in 0..batches.len() { if let Some(batch) = batches.pop() { - sent_len += batch.packets.len(); + sent_len += batch.len(); packet_s.send(vec![batch]).unwrap(); } } @@ -167,7 +167,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { loop { if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) { while let Some(v) = verifieds.pop() { - received += v.packets.len(); + received += v.len(); batches.push(v); } if use_same_tx || received >= sent_len { diff --git a/core/benches/unprocessed_packet_batches.rs b/core/benches/unprocessed_packet_batches.rs index 8cbac27b59..1e5caf0130 100644 --- a/core/benches/unprocessed_packet_batches.rs +++ b/core/benches/unprocessed_packet_batches.rs @@ -100,7 +100,7 @@ fn bench_packet_clone(bencher: &mut Bencher) { let mut outer_packet = Packet::default(); let mut timer = Measure::start("insert_batch"); - packet_batch.packets.iter().for_each(|packet| { + packet_batch.iter().for_each(|packet| { let mut packet = packet.clone(); packet.meta.sender_stake *= 2; if packet.meta.sender_stake > 2 { diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index 85df9a8b95..dd3e3eb23a 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -252,13 +252,13 @@ impl AncestorHashesService { ) -> Result<()> { let timeout = Duration::new(1, 0); let mut packet_batches = vec![response_receiver.recv_timeout(timeout)?]; - let mut total_packets = packet_batches[0].packets.len(); + let mut total_packets = packet_batches[0].len(); let mut dropped_packets = 0; while let Ok(batch) = response_receiver.try_recv() { - total_packets += batch.packets.len(); + total_packets += batch.len(); if packet_threshold.should_drop(total_packets) { - dropped_packets += batch.packets.len(); + dropped_packets += batch.len(); } else { packet_batches.push(batch); } @@ -292,7 +292,7 @@ impl AncestorHashesService { duplicate_slots_reset_sender: &DuplicateSlotsResetSender, retryable_slots_sender: &RetryableSlotsSender, ) { - packet_batch.packets.iter().for_each(|packet| { + packet_batch.iter().for_each(|packet| { let decision = Self::verify_and_process_ancestor_response( packet, ancestor_hashes_request_statuses, @@ -1116,7 +1116,7 @@ mod test { let mut response_packet = response_receiver .recv_timeout(Duration::from_millis(10_000)) .unwrap(); - let packet = &mut response_packet.packets[0]; + let packet = &mut response_packet[0]; packet.meta.set_socket_addr(&responder_info.serve_repair); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, @@ -1477,7 +1477,7 @@ mod test { let mut response_packet = response_receiver .recv_timeout(Duration::from_millis(10_000)) .unwrap(); - let packet = &mut response_packet.packets[0]; + let packet = &mut response_packet[0]; packet.meta.set_socket_addr(&responder_info.serve_repair); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 48b5a439a8..b57e05d624 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -21,7 +21,6 @@ use { solana_measure::measure::Measure, solana_metrics::inc_new_counter_info, solana_perf::{ - cuda_runtime::PinnedVec, data_budget::DataBudget, packet::{Packet, PacketBatch, PACKETS_PER_BATCH}, perf_libs, @@ -1969,7 +1968,7 @@ impl BankingStage { original_unprocessed_packets_len.saturating_sub(unprocessed_packets.len()) } - fn generate_packet_indexes(vers: &PinnedVec) -> Vec { + fn generate_packet_indexes(vers: &PacketBatch) -> Vec { vers.iter() .enumerate() .filter(|(_, pkt)| !pkt.meta.discard()) @@ -1984,12 +1983,11 @@ impl BankingStage { ) -> Result, RecvTimeoutError> { let start = Instant::now(); let mut packet_batches = verified_receiver.recv_timeout(recv_timeout)?; - let mut num_packets_received: usize = - packet_batches.iter().map(|batch| batch.packets.len()).sum(); + let mut num_packets_received: usize = packet_batches.iter().map(|batch| batch.len()).sum(); while let Ok(packet_batch) = verified_receiver.try_recv() { trace!("got more packet batches in banking stage"); let (packets_received, packet_count_overflowed) = num_packets_received - .overflowing_add(packet_batch.iter().map(|batch| batch.packets.len()).sum()); + .overflowing_add(packet_batch.iter().map(|batch| batch.len()).sum()); packet_batches.extend(packet_batch); // Spend any leftover receive time budget to greedily receive more packet batches, @@ -2025,7 +2023,7 @@ impl BankingStage { recv_time.stop(); let packet_batches_len = packet_batches.len(); - let packet_count: usize = packet_batches.iter().map(|x| x.packets.len()).sum(); + let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum(); debug!( "@{:?} process start stalled for: {:?}ms txs: {} id: {}", timestamp(), @@ -2039,14 +2037,11 @@ impl BankingStage { let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; for packet_batch in packet_batch_iter { - let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets); + let packet_indexes = Self::generate_packet_indexes(&packet_batch); // Track all the packets incoming from sigverify, both valid and invalid slot_metrics_tracker.increment_total_new_valid_packets(packet_indexes.len() as u64); slot_metrics_tracker.increment_newly_failed_sigverify_count( - packet_batch - .packets - .len() - .saturating_sub(packet_indexes.len()) as u64, + packet_batch.len().saturating_sub(packet_indexes.len()) as u64, ); Self::push_unprocessed( @@ -2332,8 +2327,7 @@ mod tests { mut with_vers: Vec<(PacketBatch, Vec)>, ) -> Vec { with_vers.iter_mut().for_each(|(b, v)| { - b.packets - .iter_mut() + b.iter_mut() .zip(v) .for_each(|(p, f)| p.meta.set_discard(*f == 0)) }); diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index a52efadcab..3148d4649c 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -303,8 +303,8 @@ impl ClusterInfoVoteListener { .zip(packet_batches) .filter(|(_, packet_batch)| { // to_packet_batches() above splits into 1 packet long batches - assert_eq!(packet_batch.packets.len(), 1); - !packet_batch.packets[0].meta.discard() + assert_eq!(packet_batch.len(), 1); + !packet_batch[0].meta.discard() }) .filter_map(|(tx, packet_batch)| { let (vote_account_key, vote, ..) = vote_parser::parse_vote_transaction(&tx)?; @@ -1515,7 +1515,7 @@ mod tests { fn verify_packets_len(packets: &[VerifiedVoteMetadata], ref_value: usize) { let num_packets: usize = packets .iter() - .map(|vote_metadata| vote_metadata.packet_batch.packets.len()) + .map(|vote_metadata| vote_metadata.packet_batch.len()) .sum(); assert_eq!(num_packets, ref_value); } diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 5a702f0312..d78ae84a1a 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -97,12 +97,12 @@ impl FetchStage { }; let mut packet_batch = recvr.recv()?; - let mut num_packets = packet_batch.packets.len(); - packet_batch.packets.iter_mut().for_each(mark_forwarded); + let mut num_packets = packet_batch.len(); + packet_batch.iter_mut().for_each(mark_forwarded); let mut packet_batches = vec![packet_batch]; while let Ok(mut packet_batch) = recvr.try_recv() { - packet_batch.packets.iter_mut().for_each(mark_forwarded); - num_packets += packet_batch.packets.len(); + packet_batch.iter_mut().for_each(mark_forwarded); + num_packets += packet_batch.len(); packet_batches.push(packet_batch); // Read at most 1K transactions in a loop if num_packets > 1024 { diff --git a/core/src/find_packet_sender_stake_stage.rs b/core/src/find_packet_sender_stake_stage.rs index 399a190e62..e068402773 100644 --- a/core/src/find_packet_sender_stake_stage.rs +++ b/core/src/find_packet_sender_stake_stage.rs @@ -160,7 +160,7 @@ impl FindPacketSenderStakeStage { PAR_THREAD_POOL.install(|| { batches .into_par_iter() - .flat_map(|batch| batch.packets.par_iter_mut()) + .flat_map(|batch| batch.par_iter_mut()) .for_each(|packet| { packet.meta.sender_stake = ip_to_stake .get(&packet.meta.addr) diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 76c887adc7..6f110c9487 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -327,13 +327,13 @@ impl ServeRepair { //TODO cache connections let timeout = Duration::new(1, 0); let mut reqs_v = vec![requests_receiver.recv_timeout(timeout)?]; - let mut total_packets = reqs_v[0].packets.len(); + let mut total_packets = reqs_v[0].len(); let mut dropped_packets = 0; while let Ok(more) = requests_receiver.try_recv() { - total_packets += more.packets.len(); + total_packets += more.len(); if packet_threshold.should_drop(total_packets) { - dropped_packets += more.packets.len(); + dropped_packets += more.len(); } else { reqs_v.push(more); } @@ -431,7 +431,7 @@ impl ServeRepair { stats: &mut ServeRepairStats, ) { // iter over the packets - packet_batch.packets.iter().for_each(|packet| { + packet_batch.iter().for_each(|packet| { let from_addr = packet.meta.socket_addr(); limited_deserialize(&packet.data[..packet.meta.size]) .into_iter() @@ -700,11 +700,12 @@ impl ServeRepair { nonce, ); if let Some(packet) = packet { - res.packets.push(packet); + res.push(packet); } else { break; } - if meta.parent_slot.is_some() && res.packets.len() < max_responses { + + if meta.parent_slot.is_some() && res.len() < max_responses { slot = meta.parent_slot.unwrap(); } else { break; @@ -810,10 +811,9 @@ mod tests { ) .expect("packets"); let request = ShredRepairType::HighestShred(slot, index); - verify_responses(&request, rv.packets.iter()); + verify_responses(&request, rv.iter()); let rv: Vec = rv - .packets .into_iter() .filter_map(|b| { assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce); @@ -896,9 +896,8 @@ mod tests { ) .expect("packets"); let request = ShredRepairType::Shred(slot, index); - verify_responses(&request, rv.packets.iter()); + verify_responses(&request, rv.iter()); let rv: Vec = rv - .packets .into_iter() .filter_map(|b| { assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce); @@ -1058,7 +1057,6 @@ mod tests { nonce, ) .expect("run_orphan packets") - .packets .iter() .cloned() .collect(); @@ -1128,7 +1126,6 @@ mod tests { nonce, ) .expect("run_orphan packets") - .packets .iter() .cloned() .collect(); @@ -1179,8 +1176,7 @@ mod tests { slot + num_slots, nonce, ) - .expect("run_ancestor_hashes packets") - .packets; + .expect("run_ancestor_hashes packets"); assert_eq!(rv.len(), 1); let packet = &rv[0]; let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet); @@ -1195,8 +1191,7 @@ mod tests { slot + num_slots - 1, nonce, ) - .expect("run_ancestor_hashes packets") - .packets; + .expect("run_ancestor_hashes packets"); assert_eq!(rv.len(), 1); let packet = &rv[0]; let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet); @@ -1218,8 +1213,7 @@ mod tests { slot + num_slots - 1, nonce, ) - .expect("run_ancestor_hashes packets") - .packets; + .expect("run_ancestor_hashes packets"); assert_eq!(rv.len(), 1); let packet = &rv[0]; let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet); diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 642ad2e5ed..f098fa553b 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -5,11 +5,7 @@ use { crossbeam_channel::{unbounded, Sender}, lru::LruCache, solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats}, - solana_perf::{ - cuda_runtime::PinnedVec, - packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags}, - recycler::Recycler, - }, + solana_perf::packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags}, solana_runtime::bank_forks::BankForks, solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT}, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats}, @@ -98,8 +94,8 @@ impl ShredFetchStage { slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch()); } } - stats.shred_count += packet_batch.packets.len(); - packet_batch.packets.iter_mut().for_each(|packet| { + stats.shred_count += packet_batch.len(); + packet_batch.iter_mut().for_each(|packet| { Self::process_packet( packet, &mut shreds_received, @@ -122,7 +118,7 @@ impl ShredFetchStage { sockets: Vec>, exit: &Arc, sender: Sender>, - recycler: Recycler>, + recycler: PacketBatchRecycler, bank_forks: Option>>, name: &'static str, modify: F, @@ -162,7 +158,7 @@ impl ShredFetchStage { bank_forks: Option>>, exit: &Arc, ) -> Self { - let recycler: PacketBatchRecycler = Recycler::warmed(100, 1024); + let recycler = PacketBatchRecycler::warmed(100, 1024); let (mut tvu_threads, tvu_filter) = Self::packet_modifier( sockets, diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index e5f6567c41..97dba300a0 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -35,7 +35,7 @@ impl ShredSigVerifier { fn read_slots(batches: &[PacketBatch]) -> HashSet { batches .iter() - .flat_map(|batch| batch.packets.iter().filter_map(Shred::get_slot_from_packet)) + .flat_map(|batch| batch.iter().filter_map(Shred::get_slot_from_packet)) .collect() } } @@ -91,13 +91,18 @@ pub mod tests { 0, 0xc0de, ); - let mut batches = [PacketBatch::default(), PacketBatch::default()]; + let mut batches: Vec<_> = (0..2) + .map(|_| { + let mut batch = PacketBatch::with_capacity(1); + batch.resize(1, Packet::default()); + batch + }) + .collect(); let keypair = Keypair::new(); shred.sign(&keypair); - batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0].packets[0].meta.size = shred.payload().len(); + batches[0][0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0][0].meta.size = shred.payload().len(); let mut shred = Shred::new_from_data( 0xc0de_dead, @@ -110,9 +115,8 @@ pub mod tests { 0xc0de, ); shred.sign(&keypair); - batches[1].packets.resize(1, Packet::default()); - batches[1].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[1].packets[0].meta.size = shred.payload().len(); + batches[1][0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[1][0].meta.size = shred.payload().len(); let expected: HashSet = [0xc0de_dead, 0xdead_c0de].iter().cloned().collect(); assert_eq!(ShredSigVerifier::read_slots(&batches), expected); @@ -129,8 +133,10 @@ pub mod tests { let bf = Arc::new(RwLock::new(BankForks::new(bank))); let verifier = ShredSigVerifier::new(bf, cache); - let mut batches = vec![PacketBatch::default()]; - batches[0].packets.resize(2, Packet::default()); + let batch_size = 2; + let mut batch = PacketBatch::with_capacity(batch_size); + batch.resize(batch_size, Packet::default()); + let mut batches = vec![batch]; let mut shred = Shred::new_from_data( 0, @@ -143,8 +149,8 @@ pub mod tests { 0xc0de, ); shred.sign(&leader_keypair); - batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0].packets[0].meta.size = shred.payload().len(); + batches[0][0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0][0].meta.size = shred.payload().len(); let mut shred = Shred::new_from_data( 0, @@ -158,12 +164,12 @@ pub mod tests { ); let wrong_keypair = Keypair::new(); shred.sign(&wrong_keypair); - batches[0].packets[1].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0].packets[1].meta.size = shred.payload().len(); + batches[0][1].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0][1].meta.size = shred.payload().len(); let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches); let rv = verifier.verify_batches(batches, num_packets); - assert!(!rv[0].packets[0].meta.discard()); - assert!(rv[0].packets[1].meta.discard()); + assert!(!rv[0][0].meta.discard()); + assert!(rv[0][1].meta.discard()); } } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 49c57dc90d..c1b1a8b276 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -226,7 +226,7 @@ impl SigVerifyStage { let mut addrs = batches .iter_mut() .rev() - .flat_map(|batch| batch.packets.iter_mut().rev()) + .flat_map(|batch| batch.iter_mut().rev()) .filter(|packet| !packet.meta.discard()) .map(|packet| (packet.meta.addr, packet)) .into_group_map(); @@ -423,7 +423,6 @@ mod tests { .iter() .map(|batch| { batch - .packets .iter() .map(|p| if p.meta.discard() { 0 } else { 1 }) .sum::() @@ -434,18 +433,19 @@ mod tests { #[test] fn test_packet_discard() { solana_logger::setup(); - let mut batch = PacketBatch::default(); - batch.packets.resize(10, Packet::default()); - batch.packets[3].meta.addr = std::net::IpAddr::from([1u16; 8]); - batch.packets[3].meta.set_discard(true); - batch.packets[4].meta.addr = std::net::IpAddr::from([2u16; 8]); + let batch_size = 10; + let mut batch = PacketBatch::with_capacity(batch_size); + batch.resize(batch_size, Packet::default()); + batch[3].meta.addr = std::net::IpAddr::from([1u16; 8]); + batch[3].meta.set_discard(true); + batch[4].meta.addr = std::net::IpAddr::from([2u16; 8]); let mut batches = vec![batch]; let max = 3; SigVerifyStage::discard_excess_packets(&mut batches, max); assert_eq!(count_non_discard(&batches), max); - assert!(!batches[0].packets[0].meta.discard()); - assert!(batches[0].packets[3].meta.discard()); - assert!(!batches[0].packets[4].meta.discard()); + assert!(!batches[0][0].meta.discard()); + assert!(batches[0][3].meta.discard()); + assert!(!batches[0][4].meta.discard()); } fn gen_batches(use_same_tx: bool) -> Vec { let len = 4096; @@ -480,7 +480,7 @@ mod tests { let mut sent_len = 0; for _ in 0..batches.len() { if let Some(batch) = batches.pop() { - sent_len += batch.packets.len(); + sent_len += batch.len(); packet_s.send(vec![batch]).unwrap(); } } @@ -489,7 +489,7 @@ mod tests { loop { if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) { while let Some(v) = verifieds.pop() { - received += v.packets.len(); + received += v.len(); batches.push(v); } if use_same_tx || received >= sent_len { diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index 4c1842c35f..edf62358cf 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -350,7 +350,7 @@ pub fn deserialize_packets<'a>( packet_indexes: &'a [usize], ) -> impl Iterator + 'a { packet_indexes.iter().filter_map(move |packet_index| { - DeserializedPacket::new(packet_batch.packets[*packet_index].clone()).ok() + DeserializedPacket::new(packet_batch[*packet_index].clone()).ok() }) } diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs index 75e0a0525c..4bac5d9ade 100644 --- a/core/src/verified_vote_packets.rs +++ b/core/src/verified_vote_packets.rs @@ -428,10 +428,10 @@ mod tests { for _ in 0..num_expected_batches { let validator_batch: Vec = gossip_votes_iterator.next().unwrap(); assert_eq!(validator_batch.len(), slot_hashes.slot_hashes().len()); - let expected_len = validator_batch[0].packets.len(); + let expected_len = validator_batch[0].len(); assert!(validator_batch .iter() - .all(|batch| batch.packets.len() == expected_len)); + .all(|batch| batch.len() == expected_len)); } // Should be empty now diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 49b39e5be7..004e3c1cb3 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -353,8 +353,8 @@ where F: Fn(&Shred, Arc, /*last root:*/ Slot) -> bool + Sync, { let timer = Duration::from_millis(200); - let mut packets = verified_receiver.recv_timeout(timer)?; - packets.extend(verified_receiver.try_iter().flatten()); + let mut packet_batches = verified_receiver.recv_timeout(timer)?; + packet_batches.extend(verified_receiver.try_iter().flatten()); let now = Instant::now(); let last_root = blockstore.last_root(); let working_bank = bank_forks.read().unwrap().working_bank(); @@ -384,9 +384,9 @@ where } }; let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| { - packets + packet_batches .par_iter() - .flat_map_iter(|pkt| pkt.packets.iter().filter_map(handle_packet)) + .flat_map_iter(|packet_batch| packet_batch.iter().filter_map(handle_packet)) .unzip() }); // Exclude repair packets from retransmit. @@ -406,8 +406,14 @@ where } insert_shred_sender.send((shreds, repair_infos))?; - stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::(); - for packet in packets.iter().flat_map(|pkt| pkt.packets.iter()) { + stats.num_packets += packet_batches + .iter() + .map(|packet_batch| packet_batch.len()) + .sum::(); + for packet in packet_batches + .iter() + .flat_map(|packet_batch| packet_batch.iter()) + { let addr = packet.meta.socket_addr(); *stats.addrs.entry(addr).or_default() += 1; } diff --git a/entry/src/entry.rs b/entry/src/entry.rs index f1f7f728ac..addc04127a 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -517,20 +517,16 @@ pub fn start_verify_transactions( // uninitialized anyway, so the initilization would simply write junk into // the vector anyway. unsafe { - packet_batch.packets.set_len(vec_size); + packet_batch.set_len(vec_size); } let entry_tx_iter = slice .into_par_iter() .map(|tx| tx.to_versioned_transaction()); - let res = packet_batch - .packets - .par_iter_mut() - .zip(entry_tx_iter) - .all(|pair| { - pair.0.meta = Meta::default(); - Packet::populate_packet(pair.0, None, &pair.1).is_ok() - }); + let res = packet_batch.par_iter_mut().zip(entry_tx_iter).all(|pair| { + pair.0.meta = Meta::default(); + Packet::populate_packet(pair.0, None, &pair.1).is_ok() + }); if res { Ok(packet_batch) } else { @@ -552,7 +548,7 @@ pub fn start_verify_transactions( ); let verified = packet_batches .iter() - .all(|batch| batch.packets.iter().all(|p| !p.meta.discard())); + .all(|batch| batch.iter().all(|p| !p.meta.discard())); verify_time.stop(); (verified, verify_time.as_us()) }); diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 84158035f4..3aec6dd833 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1572,7 +1572,7 @@ impl ClusterInfo { ); self.stats .packets_sent_gossip_requests_count - .add_relaxed(packet_batch.packets.len() as u64); + .add_relaxed(packet_batch.len() as u64); sender.send(packet_batch)?; } Ok(()) @@ -1850,7 +1850,7 @@ impl ClusterInfo { if !response.is_empty() { self.stats .packets_sent_pull_responses_count - .add_relaxed(response.packets.len() as u64); + .add_relaxed(response.len() as u64); let _ = response_sender.send(response); } } @@ -1890,7 +1890,7 @@ impl ClusterInfo { if let Some(ping) = ping { let ping = Protocol::PingMessage(ping); match Packet::from_data(Some(&node.1), ping) { - Ok(packet) => packet_batch.packets.push(packet), + Ok(packet) => packet_batch.push(packet), Err(err) => error!("failed to write ping packet: {:?}", err), }; } @@ -1997,7 +1997,7 @@ impl ClusterInfo { Ok(packet) => { if self.outbound_budget.take(packet.meta.size) { total_bytes += packet.meta.size; - packet_batch.packets.push(packet); + packet_batch.push(packet); sent += 1; } else { self.stats.gossip_pull_request_no_budget.add_relaxed(1); @@ -2285,10 +2285,10 @@ impl ClusterInfo { "handle_batch_push_messages", &prune_messages, ); - let num_prune_packets = packet_batch.packets.len(); + let num_prune_packets = packet_batch.len(); self.stats .push_response_count - .add_relaxed(packet_batch.packets.len() as u64); + .add_relaxed(packet_batch.len() as u64); let new_push_requests = self.new_push_requests(stakes); self.stats .push_message_pushes @@ -2296,7 +2296,7 @@ impl ClusterInfo { for (address, request) in new_push_requests { if ContactInfo::is_valid_address(&address, &self.socket_addr_space) { match Packet::from_data(Some(&address), &request) { - Ok(packet) => packet_batch.packets.push(packet), + Ok(packet) => packet_batch.push(packet), Err(err) => error!("failed to write push-request packet: {:?}", err), } } else { @@ -2308,7 +2308,7 @@ impl ClusterInfo { .add_relaxed(num_prune_packets as u64); self.stats .packets_sent_push_messages_count - .add_relaxed((packet_batch.packets.len() - num_prune_packets) as u64); + .add_relaxed((packet_batch.len() - num_prune_packets) as u64); let _ = response_sender.send(packet_batch); } @@ -2450,10 +2450,10 @@ impl ClusterInfo { thread_pool: &ThreadPool, ) -> Result<(), GossipError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); - let packets: Vec<_> = receiver.recv_timeout(RECV_TIMEOUT)?.packets.into(); + let packets: Vec<_> = receiver.recv_timeout(RECV_TIMEOUT)?.into(); let mut packets = VecDeque::from(packets); for packet_batch in receiver.try_iter() { - packets.extend(packet_batch.packets.iter().cloned()); + packets.extend(packet_batch.iter().cloned()); let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC); if excess_count > 0 { packets.drain(0..excess_count); @@ -3226,8 +3226,7 @@ mod tests { .zip(pings.into_iter()), &recycler, ) - .unwrap() - .packets; + .unwrap(); assert_eq!(remote_nodes.len(), packets.len()); for (packet, (_, socket), pong) in izip!( packets.into_iter(), diff --git a/ledger/benches/sigverify_shreds.rs b/ledger/benches/sigverify_shreds.rs index 4b27d6ad36..324cac542a 100644 --- a/ledger/benches/sigverify_shreds.rs +++ b/ledger/benches/sigverify_shreds.rs @@ -21,13 +21,10 @@ const NUM_BATCHES: usize = 1; fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) { let recycler_cache = RecyclerCache::default(); - let mut packet_batch = PacketBatch::default(); - packet_batch.packets.set_pinnable(); + let mut packet_batch = PacketBatch::new_pinned_with_capacity(NUM_PACKETS); + packet_batch.resize(NUM_PACKETS, Packet::default()); let slot = 0xdead_c0de; - // need to pin explicitly since the resize will not cause re-allocation - packet_batch.packets.reserve_and_pin(NUM_PACKETS); - packet_batch.packets.resize(NUM_PACKETS, Packet::default()); - for p in packet_batch.packets.iter_mut() { + for p in packet_batch.iter_mut() { let shred = Shred::new_from_data( slot, 0xc0de, @@ -57,8 +54,8 @@ fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) { fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) { let mut packet_batch = PacketBatch::default(); let slot = 0xdead_c0de; - packet_batch.packets.resize(NUM_PACKETS, Packet::default()); - for p in packet_batch.packets.iter_mut() { + packet_batch.resize(NUM_PACKETS, Packet::default()); + for p in packet_batch.iter_mut() { let shred = Shred::new_from_data( slot, 0xc0de, diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 719fa210e7..e83c1de1b3 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -88,7 +88,6 @@ fn verify_shreds_cpu( .into_par_iter() .map(|batch| { batch - .packets .par_iter() .map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0)) .collect() @@ -114,7 +113,6 @@ fn slot_key_data_for_gpu< .into_par_iter() .map(|batch| { batch - .packets .iter() .map(|packet| { let slot_start = size_of::() + size_of::(); @@ -200,7 +198,7 @@ fn shred_gpu_offsets( let mut v_sig_lens = vec![]; for batch in batches.iter() { let mut sig_lens = Vec::new(); - for packet in batch.packets.iter() { + for packet in batch.iter() { let sig_start = pubkeys_end; let sig_end = sig_start + size_of::(); let msg_start = sig_end; @@ -258,13 +256,13 @@ pub fn verify_shreds_gpu( for batch in batches { elems.push(perf_libs::Elems { - elems: batch.packets.as_ptr(), - num: batch.packets.len() as u32, + elems: batch.as_ptr(), + num: batch.len() as u32, }); let mut v = Vec::new(); - v.resize(batch.packets.len(), 0); + v.resize(batch.len(), 0); rvs.push(v); - num_packets += batch.packets.len(); + num_packets += batch.len(); } out.resize(signature_offsets.len(), 0); @@ -327,7 +325,7 @@ pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [PacketBatch]) { debug!("CPU SHRED ECDSA for {}", packet_count); SIGVERIFY_THREAD_POOL.install(|| { batches.par_iter_mut().for_each(|batch| { - batch.packets[..] + batch[..] .par_iter_mut() .for_each(|p| sign_shred_cpu(keypair, p)); }); @@ -395,12 +393,12 @@ pub fn sign_shreds_gpu( for batch in batches.iter() { elems.push(perf_libs::Elems { - elems: batch.packets.as_ptr(), - num: batch.packets.len() as u32, + elems: batch.as_ptr(), + num: batch.len() as u32, }); let mut v = Vec::new(); - v.resize(batch.packets.len(), 0); - num_packets += batch.packets.len(); + v.resize(batch.len(), 0); + num_packets += batch.len(); } trace!("Starting verify num packets: {}", num_packets); @@ -427,7 +425,7 @@ pub fn sign_shreds_gpu( } trace!("done sign"); let mut sizes: Vec = vec![0]; - sizes.extend(batches.iter().map(|b| b.packets.len())); + sizes.extend(batches.iter().map(|b| b.len())); for i in 0..sizes.len() { if i == 0 { continue; @@ -440,7 +438,7 @@ pub fn sign_shreds_gpu( .enumerate() .for_each(|(batch_ix, batch)| { let num_packets = sizes[batch_ix]; - batch.packets[..] + batch[..] .par_iter_mut() .enumerate() .for_each(|(packet_ix, packet)| { @@ -523,9 +521,9 @@ pub mod tests { ); let keypair = Keypair::new(); shred.sign(&keypair); - batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0].packets[0].meta.size = shred.payload().len(); + batches[0].resize(1, Packet::default()); + batches[0][0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0][0].meta.size = shred.payload().len(); let leader_slots = [(slot, keypair.pubkey().to_bytes())] .iter() @@ -550,7 +548,7 @@ pub mod tests { .iter() .cloned() .collect(); - batches[0].packets[0].meta.size = 0; + batches[0][0].meta.size = 0; let rv = verify_shreds_cpu(&batches, &leader_slots); assert_eq!(rv, vec![vec![0]]); } @@ -577,9 +575,9 @@ pub mod tests { ); let keypair = Keypair::new(); shred.sign(&keypair); - batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0].packets[0].meta.size = shred.payload().len(); + batches[0].resize(1, Packet::default()); + batches[0][0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0][0].meta.size = shred.payload().len(); let leader_slots = [ (std::u64::MAX, Pubkey::default().to_bytes()), @@ -606,7 +604,7 @@ pub mod tests { let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![0]]); - batches[0].packets[0].meta.size = 0; + batches[0][0].meta.size = 0; let leader_slots = [ (std::u64::MAX, Pubkey::default().to_bytes()), (slot, keypair.pubkey().to_bytes()), @@ -627,11 +625,12 @@ pub mod tests { solana_logger::setup(); let recycler_cache = RecyclerCache::default(); - let mut packet_batch = PacketBatch::default(); let num_packets = 32; let num_batches = 100; - packet_batch.packets.resize(num_packets, Packet::default()); - for (i, p) in packet_batch.packets.iter_mut().enumerate() { + let mut packet_batch = PacketBatch::with_capacity(num_packets); + packet_batch.resize(num_packets, Packet::default()); + + for (i, p) in packet_batch.iter_mut().enumerate() { let shred = Shred::new_from_data( slot, 0xc0de, @@ -687,9 +686,10 @@ pub mod tests { 0, 0xc0de, ); - batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0].packets[0].meta.size = shred.payload().len(); + batches[0].resize(1, Packet::default()); + batches[0][0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0][0].meta.size = shred.payload().len(); + let pubkeys = [ (slot, keypair.pubkey().to_bytes()), (std::u64::MAX, Pubkey::default().to_bytes()), diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs index adc8a408c3..a6911b8f58 100644 --- a/perf/benches/dedup.rs +++ b/perf/benches/dedup.rs @@ -30,7 +30,7 @@ fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec) deduper.reset(); batches .iter_mut() - .for_each(|b| b.packets.iter_mut().for_each(|p| p.meta.set_discard(false))); + .for_each(|b| b.iter_mut().for_each(|p| p.meta.set_discard(false))); }); } diff --git a/perf/benches/shrink.rs b/perf/benches/shrink.rs index cf60fd90e5..878b84e181 100644 --- a/perf/benches/shrink.rs +++ b/perf/benches/shrink.rs @@ -26,8 +26,7 @@ fn do_bench_shrink_packets(bencher: &mut Bencher, mut batches: Vec) bencher.iter(|| { let _ans = sigverify::shrink_batches(&mut batches); batches.iter_mut().for_each(|b| { - b.packets - .iter_mut() + b.iter_mut() .for_each(|p| p.meta.set_discard(thread_rng().gen())) }); }); @@ -75,8 +74,7 @@ fn bench_shrink_count_packets(bencher: &mut Bencher) { PACKETS_PER_BATCH, ); batches.iter_mut().for_each(|b| { - b.packets - .iter_mut() + b.iter_mut() .for_each(|p| p.meta.set_discard(thread_rng().gen())) }); diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 1d52638696..748a295cee 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -56,8 +56,8 @@ fn bench_sigverify_uneven(bencher: &mut Bencher) { current_packets = num_packets; } let mut batch = PacketBatch::with_capacity(len); - batch.packets.resize(len, Packet::default()); - for packet in batch.packets.iter_mut() { + batch.resize(len, Packet::default()); + for packet in batch.iter_mut() { if thread_rng().gen_ratio(1, 2) { tx = simple_tx.clone(); } else { diff --git a/perf/src/discard.rs b/perf/src/discard.rs index 56f5636980..792de59109 100644 --- a/perf/src/discard.rs +++ b/perf/src/discard.rs @@ -11,7 +11,7 @@ pub fn discard_batches_randomly( while total_packets > max_packets { let index = thread_rng().gen_range(0, batches.len()); let removed = batches.swap_remove(index); - total_packets = total_packets.saturating_sub(removed.packets.len()); + total_packets = total_packets.saturating_sub(removed.len()); } total_packets } @@ -24,7 +24,7 @@ mod tests { fn test_batch_discard_random() { solana_logger::setup(); let mut batch = PacketBatch::default(); - batch.packets.resize(1, Packet::default()); + batch.resize(1, Packet::default()); let num_batches = 100; let mut batches = vec![batch; num_batches]; let max = 5; diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 0cd584f516..f8bbefe26a 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -3,8 +3,14 @@ pub use solana_sdk::packet::{Meta, Packet, PacketFlags, PACKET_DATA_SIZE}; use { crate::{cuda_runtime::PinnedVec, recycler::Recycler}, bincode::config::Options, + rayon::prelude::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator}, serde::{de::DeserializeOwned, Serialize}, - std::{io::Read, net::SocketAddr}, + std::{ + io::Read, + net::SocketAddr, + ops::{Index, IndexMut}, + slice::{Iter, IterMut, SliceIndex}, + }, }; pub const NUM_PACKETS: usize = 1024 * 8; @@ -14,7 +20,7 @@ pub const NUM_RCVMMSGS: usize = 64; #[derive(Debug, Default, Clone)] pub struct PacketBatch { - pub packets: PinnedVec, + packets: PinnedVec, } pub type PacketBatchRecycler = Recycler>; @@ -30,23 +36,29 @@ impl PacketBatch { PacketBatch { packets } } + pub fn new_pinned_with_capacity(capacity: usize) -> Self { + let mut batch = Self::with_capacity(capacity); + batch.packets.reserve_and_pin(capacity); + batch + } + pub fn new_unpinned_with_recycler( recycler: PacketBatchRecycler, - size: usize, + capacity: usize, name: &'static str, ) -> Self { let mut packets = recycler.allocate(name); - packets.reserve(size); + packets.reserve(capacity); PacketBatch { packets } } pub fn new_with_recycler( recycler: PacketBatchRecycler, - size: usize, + capacity: usize, name: &'static str, ) -> Self { let mut packets = recycler.allocate(name); - packets.reserve_and_pin(size); + packets.reserve_and_pin(capacity); PacketBatch { packets } } @@ -96,15 +108,106 @@ impl PacketBatch { batch } + pub fn resize(&mut self, new_len: usize, value: Packet) { + self.packets.resize(new_len, value) + } + + pub fn truncate(&mut self, len: usize) { + self.packets.truncate(len); + } + + pub fn push(&mut self, packet: Packet) { + self.packets.push(packet); + } + pub fn set_addr(&mut self, addr: &SocketAddr) { - for p in self.packets.iter_mut() { + for p in self.iter_mut() { p.meta.set_socket_addr(addr); } } + pub fn len(&self) -> usize { + self.packets.len() + } + + pub fn capacity(&self) -> usize { + self.packets.capacity() + } + pub fn is_empty(&self) -> bool { self.packets.is_empty() } + + pub fn as_ptr(&self) -> *const Packet { + self.packets.as_ptr() + } + + pub fn iter(&self) -> Iter<'_, Packet> { + self.packets.iter() + } + + pub fn iter_mut(&mut self) -> IterMut<'_, Packet> { + self.packets.iter_mut() + } + + /// See Vector::set_len() for more details + /// + /// # Safety + /// + /// - `new_len` must be less than or equal to [`capacity()`]. + /// - The elements at `old_len..new_len` must be initialized. Packet data + /// will likely be overwritten when populating the packet, but the meta + /// should specifically be initialized to known values. + pub unsafe fn set_len(&mut self, new_len: usize) { + self.packets.set_len(new_len); + } +} + +impl> Index for PacketBatch { + type Output = I::Output; + + #[inline] + fn index(&self, index: I) -> &Self::Output { + &self.packets[index] + } +} + +impl> IndexMut for PacketBatch { + #[inline] + fn index_mut(&mut self, index: I) -> &mut Self::Output { + &mut self.packets[index] + } +} + +impl<'a> IntoIterator for &'a PacketBatch { + type Item = &'a Packet; + type IntoIter = Iter<'a, Packet>; + + fn into_iter(self) -> Self::IntoIter { + self.packets.iter() + } +} + +impl<'a> IntoParallelIterator for &'a PacketBatch { + type Iter = rayon::slice::Iter<'a, Packet>; + type Item = &'a Packet; + fn into_par_iter(self) -> Self::Iter { + self.packets.par_iter() + } +} + +impl<'a> IntoParallelIterator for &'a mut PacketBatch { + type Iter = rayon::slice::IterMut<'a, Packet>; + type Item = &'a mut Packet; + fn into_par_iter(self) -> Self::Iter { + self.packets.par_iter_mut() + } +} + +impl From for Vec { + fn from(batch: PacketBatch) -> Self { + batch.packets.into() + } } pub fn to_packet_batches(items: &[T], chunk_size: usize) -> Vec { @@ -112,7 +215,7 @@ pub fn to_packet_batches(items: &[T], chunk_size: usize) -> Vec usize { - batches.iter().map(|batch| batch.packets.len()).sum() + batches.iter().map(|batch| batch.len()).sum() } pub fn count_valid_packets(batches: &[PacketBatch]) -> usize { batches .iter() - .map(|batch| batch.packets.iter().filter(|p| !p.meta.discard()).count()) + .map(|batch| batch.iter().filter(|p| !p.meta.discard()).count()) .sum() } @@ -396,7 +396,6 @@ pub fn generate_offsets( .iter_mut() .map(|batch| { batch - .packets .iter_mut() .map(|packet| { let packet_offsets = @@ -499,7 +498,7 @@ impl Deduper { pub fn dedup_packets_and_count_discards(&self, batches: &mut [PacketBatch]) -> u64 { batches .iter_mut() - .flat_map(|batch| batch.packets.iter_mut().map(|p| self.dedup_packet(p))) + .flat_map(|batch| batch.iter_mut().map(|p| self.dedup_packet(p))) .sum() } } @@ -510,28 +509,25 @@ pub fn shrink_batches(batches: &mut [PacketBatch]) -> usize { let mut valid_packet_ix = 0; let mut last_valid_batch = 0; for batch_ix in 0..batches.len() { - for packet_ix in 0..batches[batch_ix].packets.len() { - if batches[batch_ix].packets[packet_ix].meta.discard() { + for packet_ix in 0..batches[batch_ix].len() { + if batches[batch_ix][packet_ix].meta.discard() { continue; } last_valid_batch = batch_ix.saturating_add(1); let mut found_spot = false; while valid_batch_ix < batch_ix && !found_spot { - while valid_packet_ix < batches[valid_batch_ix].packets.len() { - if batches[valid_batch_ix].packets[valid_packet_ix] - .meta - .discard() - { - batches[valid_batch_ix].packets[valid_packet_ix] = - batches[batch_ix].packets[packet_ix].clone(); - batches[batch_ix].packets[packet_ix].meta.set_discard(true); + while valid_packet_ix < batches[valid_batch_ix].len() { + if batches[valid_batch_ix][valid_packet_ix].meta.discard() { + batches[valid_batch_ix][valid_packet_ix] = + batches[batch_ix][packet_ix].clone(); + batches[batch_ix][packet_ix].meta.set_discard(true); last_valid_batch = valid_batch_ix.saturating_add(1); found_spot = true; break; } valid_packet_ix = valid_packet_ix.saturating_add(1); } - if valid_packet_ix >= batches[valid_batch_ix].packets.len() { + if valid_packet_ix >= batches[valid_batch_ix].len() { valid_packet_ix = 0; valid_batch_ix = valid_batch_ix.saturating_add(1); } @@ -547,7 +543,6 @@ pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, pa PAR_THREAD_POOL.install(|| { batches.into_par_iter().for_each(|batch| { batch - .packets .par_iter_mut() .for_each(|p| verify_packet(p, reject_non_vote)) }); @@ -559,12 +554,9 @@ pub fn ed25519_verify_disabled(batches: &mut [PacketBatch]) { use rayon::prelude::*; let packet_count = count_packets_in_batches(batches); debug!("disabled ECDSA for {}", packet_count); - batches.into_par_iter().for_each(|batch| { - batch - .packets - .par_iter_mut() - .for_each(|p| p.meta.set_discard(false)) - }); + batches + .into_par_iter() + .for_each(|batch| batch.par_iter_mut().for_each(|p| p.meta.set_discard(false))); inc_new_counter_debug!("ed25519_verify_disabled", packet_count); } @@ -621,7 +613,7 @@ pub fn get_checked_scalar(scalar: &[u8; 32]) -> Result<[u8; 32], PacketError> { pub fn mark_disabled(batches: &mut [PacketBatch], r: &[Vec]) { for (batch, v) in batches.iter_mut().zip(r) { - for (pkt, f) in batch.packets.iter_mut().zip(v) { + for (pkt, f) in batch.iter_mut().zip(v) { if !pkt.meta.discard() { pkt.meta.set_discard(*f == 0); } @@ -672,12 +664,12 @@ pub fn ed25519_verify( let mut num_packets: usize = 0; for batch in batches.iter() { elems.push(perf_libs::Elems { - elems: batch.packets.as_ptr(), - num: batch.packets.len() as u32, + elems: batch.as_ptr(), + num: batch.len() as u32, }); - let v = vec![0u8; batch.packets.len()]; + let v = vec![0u8; batch.len()]; rvs.push(v); - num_packets = num_packets.saturating_add(batch.packets.len()); + num_packets = num_packets.saturating_add(batch.len()); } out.resize(signature_offsets.len(), 0); trace!("Starting verify num packets: {}", num_packets); @@ -743,14 +735,15 @@ mod tests { #[test] fn test_mark_disabled() { - let mut batch = PacketBatch::default(); - batch.packets.push(Packet::default()); + let batch_size = 1; + let mut batch = PacketBatch::with_capacity(batch_size); + batch.resize(batch_size, Packet::default()); let mut batches: Vec = vec![batch]; mark_disabled(&mut batches, &[vec![0]]); - assert!(batches[0].packets[0].meta.discard()); - batches[0].packets[0].meta.set_discard(false); + assert!(batches[0][0].meta.discard()); + batches[0][0].meta.set_discard(false); mark_disabled(&mut batches, &[vec![1]]); - assert!(!batches[0].packets[0].meta.discard()); + assert!(!batches[0][0].meta.discard()); } #[test] @@ -854,7 +847,7 @@ mod tests { packet.meta.set_discard(false); let mut batches = generate_packet_batches(&packet, 1, 1); ed25519_verify(&mut batches); - assert!(batches[0].packets[0].meta.discard()); + assert!(batches[0][0].meta.discard()); } #[test] @@ -890,7 +883,7 @@ mod tests { packet.meta.set_discard(false); let mut batches = generate_packet_batches(&packet, 1, 1); ed25519_verify(&mut batches); - assert!(batches[0].packets[0].meta.discard()); + assert!(batches[0][0].meta.discard()); } #[test] @@ -1058,13 +1051,12 @@ mod tests { // generate packet vector let batches: Vec<_> = (0..num_batches) .map(|_| { - let mut packet_batch = PacketBatch::default(); - packet_batch.packets.resize(0, Packet::default()); let num_packets_per_batch = thread_rng().gen_range(1, max_packets_per_batch); + let mut packet_batch = PacketBatch::with_capacity(num_packets_per_batch); for _ in 0..num_packets_per_batch { - packet_batch.packets.push(packet.clone()); + packet_batch.push(packet.clone()); } - assert_eq!(packet_batch.packets.len(), num_packets_per_batch); + assert_eq!(packet_batch.len(), num_packets_per_batch); packet_batch }) .collect(); @@ -1081,12 +1073,11 @@ mod tests { // generate packet vector let batches: Vec<_> = (0..num_batches) .map(|_| { - let mut packet_batch = PacketBatch::default(); - packet_batch.packets.resize(0, Packet::default()); + let mut packet_batch = PacketBatch::with_capacity(num_packets_per_batch); for _ in 0..num_packets_per_batch { - packet_batch.packets.push(packet.clone()); + packet_batch.push(packet.clone()); } - assert_eq!(packet_batch.packets.len(), num_packets_per_batch); + assert_eq!(packet_batch.len(), num_packets_per_batch); packet_batch }) .collect(); @@ -1113,7 +1104,7 @@ mod tests { let should_discard = modify_data; assert!(batches .iter() - .flat_map(|batch| &batch.packets) + .flat_map(|batch| batch.iter()) .all(|p| p.meta.discard() == should_discard)); } @@ -1137,7 +1128,7 @@ mod tests { ed25519_verify(&mut batches); assert!(batches .iter() - .flat_map(|batch| &batch.packets) + .flat_map(|batch| batch.iter()) .all(|p| p.meta.discard())); } @@ -1169,7 +1160,7 @@ mod tests { packet.data[40] = packet.data[40].wrapping_add(8); - batches[0].packets.push(packet); + batches[0].push(packet); // verify packets ed25519_verify(&mut batches); @@ -1180,7 +1171,7 @@ mod tests { ref_vec[0].push(0u8); assert!(batches .iter() - .flat_map(|batch| &batch.packets) + .flat_map(|batch| batch.iter()) .zip(ref_vec.into_iter().flatten()) .all(|(p, discard)| { if discard == 0 { @@ -1208,15 +1199,15 @@ mod tests { let num_modifications = thread_rng().gen_range(0, 5); for _ in 0..num_modifications { let batch = thread_rng().gen_range(0, batches.len()); - let packet = thread_rng().gen_range(0, batches[batch].packets.len()); - let offset = thread_rng().gen_range(0, batches[batch].packets[packet].meta.size); + let packet = thread_rng().gen_range(0, batches[batch].len()); + let offset = thread_rng().gen_range(0, batches[batch][packet].meta.size); let add = thread_rng().gen_range(0, 255); - batches[batch].packets[packet].data[offset] = - batches[batch].packets[packet].data[offset].wrapping_add(add); + batches[batch][packet].data[offset] = + batches[batch][packet].data[offset].wrapping_add(add); } let batch_to_disable = thread_rng().gen_range(0, batches.len()); - for p in batches[batch_to_disable].packets.iter_mut() { + for p in batches[batch_to_disable].iter_mut() { p.meta.set_discard(true); } @@ -1230,8 +1221,8 @@ mod tests { // check result batches .iter() - .flat_map(|batch| &batch.packets) - .zip(batches_cpu.iter().flat_map(|batch| &batch.packets)) + .flat_map(|batch| batch.iter()) + .zip(batches_cpu.iter().flat_map(|batch| batch.iter())) .for_each(|(p1, p2)| assert_eq!(p1, p2)); } } @@ -1386,26 +1377,20 @@ mod tests { let mut current_offset = 0usize; let mut batch = PacketBatch::default(); - batch - .packets - .push(Packet::from_data(None, test_tx()).unwrap()); + batch.push(Packet::from_data(None, test_tx()).unwrap()); let tx = new_test_vote_tx(&mut rng); - batch.packets.push(Packet::from_data(None, tx).unwrap()); - batch - .packets - .iter_mut() - .enumerate() - .for_each(|(index, packet)| { - let packet_offsets = do_get_packet_offsets(packet, current_offset).unwrap(); - check_for_simple_vote_transaction(packet, &packet_offsets, current_offset).ok(); - if index == 1 { - assert!(packet.meta.is_simple_vote_tx()); - } else { - assert!(!packet.meta.is_simple_vote_tx()); - } + batch.push(Packet::from_data(None, tx).unwrap()); + batch.iter_mut().enumerate().for_each(|(index, packet)| { + let packet_offsets = do_get_packet_offsets(packet, current_offset).unwrap(); + check_for_simple_vote_transaction(packet, &packet_offsets, current_offset).ok(); + if index == 1 { + assert!(packet.meta.is_simple_vote_tx()); + } else { + assert!(!packet.meta.is_simple_vote_tx()); + } - current_offset = current_offset.saturating_add(size_of::()); - }); + current_offset = current_offset.saturating_add(size_of::()); + }); } #[test] @@ -1476,15 +1461,13 @@ mod tests { PACKETS_PER_BATCH, ); batches.iter_mut().for_each(|b| { - b.packets - .iter_mut() + b.iter_mut() .for_each(|p| p.meta.set_discard(thread_rng().gen())) }); //find all the non discarded packets let mut start = vec![]; batches.iter_mut().for_each(|b| { - b.packets - .iter_mut() + b.iter_mut() .filter(|p| !p.meta.discard()) .for_each(|p| start.push(p.clone())) }); @@ -1497,8 +1480,7 @@ mod tests { //make sure all the non discarded packets are the same let mut end = vec![]; batches.iter_mut().for_each(|b| { - b.packets - .iter_mut() + b.iter_mut() .filter(|p| !p.meta.discard()) .for_each(|p| end.push(p.clone())) }); @@ -1662,15 +1644,14 @@ mod tests { assert_eq!(batches.len(), BATCH_COUNT); assert_eq!(count_valid_packets(&batches), PACKET_COUNT); batches.iter_mut().enumerate().for_each(|(i, b)| { - b.packets - .iter_mut() + b.iter_mut() .enumerate() .for_each(|(j, p)| p.meta.set_discard(set_discard(i, j))) }); assert_eq!(count_valid_packets(&batches), *expect_valid_packets); debug!("show valid packets for case {}", i); batches.iter_mut().enumerate().for_each(|(i, b)| { - b.packets.iter_mut().enumerate().for_each(|(j, p)| { + b.iter_mut().enumerate().for_each(|(j, p)| { if !p.meta.discard() { debug!("{} {}", i, j) } diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index 1e2f949432..fd809f987f 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -27,11 +27,11 @@ pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) trace!("receiving on {}", socket.local_addr().unwrap()); let start = Instant::now(); loop { - batch.packets.resize( + batch.resize( std::cmp::min(i + NUM_RCVMMSGS, PACKETS_PER_BATCH), Packet::default(), ); - match recv_mmsg(socket, &mut batch.packets[i..]) { + match recv_mmsg(socket, &mut batch[i..]) { Err(_) if i > 0 => { if start.elapsed().as_millis() as u64 > max_wait_ms { break; @@ -55,7 +55,7 @@ pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) } } } - batch.packets.truncate(i); + batch.truncate(i); inc_new_counter_debug!("packets-recv_count", i); Ok(i) } @@ -65,7 +65,7 @@ pub fn send_to( socket: &UdpSocket, socket_addr_space: &SocketAddrSpace, ) -> Result<()> { - for p in &batch.packets { + for p in batch.iter() { let addr = p.meta.socket_addr(); if socket_addr_space.check(&addr) { socket.send_to(&p.data[..p.meta.size], &addr)?; @@ -92,7 +92,7 @@ mod tests { let packets = vec![Packet::default()]; let mut packet_batch = PacketBatch::new(packets); packet_batch.set_addr(&send_addr); - assert_eq!(packet_batch.packets[0].meta.socket_addr(), send_addr); + assert_eq!(packet_batch[0].meta.socket_addr(), send_addr); } #[test] @@ -102,25 +102,23 @@ mod tests { let addr = recv_socket.local_addr().unwrap(); let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); let saddr = send_socket.local_addr().unwrap(); - let mut batch = PacketBatch::default(); - batch.packets.resize(10, Packet::default()); + let packet_batch_size = 10; + let mut batch = PacketBatch::with_capacity(packet_batch_size); + batch.resize(packet_batch_size, Packet::default()); - for m in batch.packets.iter_mut() { + for m in batch.iter_mut() { m.meta.set_socket_addr(&addr); m.meta.size = PACKET_DATA_SIZE; } send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); - batch - .packets - .iter_mut() - .for_each(|pkt| pkt.meta = Meta::default()); + batch.iter_mut().for_each(|pkt| pkt.meta = Meta::default()); let recvd = recv_from(&mut batch, &recv_socket, 1).unwrap(); - assert_eq!(recvd, batch.packets.len()); + assert_eq!(recvd, batch.len()); - for m in &batch.packets { + for m in batch.iter() { assert_eq!(m.meta.size, PACKET_DATA_SIZE); assert_eq!(m.meta.socket_addr(), saddr); } @@ -155,17 +153,18 @@ mod tests { let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); let addr = recv_socket.local_addr().unwrap(); let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let mut batch = PacketBatch::default(); - batch.packets.resize(PACKETS_PER_BATCH, Packet::default()); + let mut batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); + batch.resize(PACKETS_PER_BATCH, Packet::default()); // Should only get PACKETS_PER_BATCH packets per iteration even // if a lot more were sent, and regardless of packet size for _ in 0..2 * PACKETS_PER_BATCH { - let mut batch = PacketBatch::default(); - batch.packets.resize(1, Packet::default()); - for m in batch.packets.iter_mut() { - m.meta.set_socket_addr(&addr); - m.meta.size = 1; + let batch_size = 1; + let mut batch = PacketBatch::with_capacity(batch_size); + batch.resize(batch_size, Packet::default()); + for p in batch.iter_mut() { + p.meta.set_socket_addr(&addr); + p.meta.size = 1; } send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); } @@ -174,6 +173,6 @@ mod tests { // Check we only got PACKETS_PER_BATCH packets assert_eq!(recvd, PACKETS_PER_BATCH); - assert_eq!(batch.packets.capacity(), PACKETS_PER_BATCH); + assert_eq!(batch.capacity(), PACKETS_PER_BATCH); } } diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 152aaef3b1..6429d8d1b2 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -180,7 +180,7 @@ fn handle_chunk( let mut packet = Packet::default(); packet.meta.set_socket_addr(remote_addr); packet.meta.sender_stake = stake; - batch.packets.push(packet); + batch.push(packet); *maybe_batch = Some(batch); stats .total_packets_allocated @@ -189,15 +189,15 @@ fn handle_chunk( if let Some(batch) = maybe_batch.as_mut() { let end = chunk.offset as usize + chunk.bytes.len(); - batch.packets[0].data[chunk.offset as usize..end].copy_from_slice(&chunk.bytes); - batch.packets[0].meta.size = std::cmp::max(batch.packets[0].meta.size, end); + batch[0].data[chunk.offset as usize..end].copy_from_slice(&chunk.bytes); + batch[0].meta.size = std::cmp::max(batch[0].meta.size, end); stats.total_chunks_received.fetch_add(1, Ordering::Relaxed); } } else { trace!("chunk is none"); // done receiving chunks if let Some(batch) = maybe_batch.take() { - let len = batch.packets[0].meta.size; + let len = batch[0].meta.size; if let Err(e) = packet_sender.send(batch) { stats .total_packet_batch_send_err @@ -778,7 +778,7 @@ mod test { let mut total_packets = 0; while now.elapsed().as_secs() < 10 { if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) { - total_packets += packets.packets.len(); + total_packets += packets.len(); all_packets.push(packets) } if total_packets == num_expected_packets { @@ -786,7 +786,7 @@ mod test { } } for batch in all_packets { - for p in &batch.packets { + for p in batch.iter() { assert_eq!(p.meta.size, 1); } } @@ -850,7 +850,7 @@ mod test { let mut total_packets = 0; while now.elapsed().as_secs() < 5 { if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) { - total_packets += packets.packets.len(); + total_packets += packets.len(); all_packets.push(packets) } if total_packets > num_expected_packets { @@ -858,7 +858,7 @@ mod test { } } for batch in all_packets { - for p in &batch.packets { + for p in batch.iter() { assert_eq!(p.meta.size, num_bytes); } } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 4bcd020653..ea634d0282 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -292,9 +292,9 @@ fn recv_send( let timer = Duration::new(1, 0); let packet_batch = r.recv_timeout(timer)?; if let Some(stats) = stats { - packet_batch.packets.iter().for_each(|p| stats.record(p)); + packet_batch.iter().for_each(|p| stats.record(p)); } - let packets = packet_batch.packets.iter().filter_map(|pkt| { + let packets = packet_batch.iter().filter_map(|pkt| { let addr = pkt.meta.socket_addr(); socket_addr_space .check(&addr) @@ -313,13 +313,13 @@ pub fn recv_vec_packet_batches( trace!("got packets"); let mut num_packets = packet_batches .iter() - .map(|packets| packets.packets.len()) + .map(|packets| packets.len()) .sum::(); while let Ok(packet_batch) = recvr.try_recv() { trace!("got more packets"); num_packets += packet_batch .iter() - .map(|packets| packets.packets.len()) + .map(|packets| packets.len()) .sum::(); packet_batches.extend(packet_batch); } @@ -339,11 +339,11 @@ pub fn recv_packet_batches( let packet_batch = recvr.recv_timeout(timer)?; let recv_start = Instant::now(); trace!("got packets"); - let mut num_packets = packet_batch.packets.len(); + let mut num_packets = packet_batch.len(); let mut packet_batches = vec![packet_batch]; while let Ok(packet_batch) = recvr.try_recv() { trace!("got more packets"); - num_packets += packet_batch.packets.len(); + num_packets += packet_batch.len(); packet_batches.push(packet_batch); } let recv_duration = recv_start.elapsed(); @@ -431,7 +431,7 @@ mod test { continue; } - *num_packets -= packet_batch_res.unwrap().packets.len(); + *num_packets -= packet_batch_res.unwrap().len(); if *num_packets == 0 { break; @@ -482,7 +482,7 @@ mod test { p.meta.size = PACKET_DATA_SIZE; p.meta.set_socket_addr(&addr); } - packet_batch.packets.push(p); + packet_batch.push(p); } s_responder.send(packet_batch).expect("send"); t_responder