From ec7ca411dd46a2d9c471b051a7700dc51dded074 Mon Sep 17 00:00:00 2001 From: steviez Date: Mon, 23 May 2022 15:30:15 -0500 Subject: [PATCH] Make PacketBatch packets vector non-public (#25413) Upcoming changes to PacketBatch to support variable sized packets will modify the internals of PacketBatch. So, this change removes direct usage of the internal packets vector and instead uses accessors (which are currently just thin wrappers around the inner Vec's functions but will change down the road). Two short usage sketches of the new accessor API follow the diff; they are illustrative only and not part of the patch. --- banking-bench/src/main.rs | 2 +- bench-streamer/src/main.rs | 11 +- client/tests/quic_client.rs | 4 +- core/benches/banking_stage.rs | 2 +- core/benches/sigverify_stage.rs | 14 +- core/benches/unprocessed_packet_batches.rs | 2 +- core/src/ancestor_hashes_service.rs | 12 +- core/src/banking_stage.rs | 20 +-- core/src/cluster_info_vote_listener.rs | 6 +- core/src/fetch_stage.rs | 8 +- core/src/find_packet_sender_stake_stage.rs | 2 +- core/src/serve_repair.rs | 30 ++--- core/src/shred_fetch_stage.rs | 14 +- core/src/sigverify_shreds.rs | 38 +++--- core/src/sigverify_stage.rs | 24 ++-- core/src/unprocessed_packet_batches.rs | 2 +- core/src/verified_vote_packets.rs | 4 +- core/src/window_service.rs | 18 ++- entry/src/entry.rs | 16 +-- gossip/src/cluster_info.rs | 23 ++-- ledger/benches/sigverify_shreds.rs | 13 +- ledger/src/sigverify_shreds.rs | 56 ++++---- perf/benches/dedup.rs | 2 +- perf/benches/shrink.rs | 6 +- perf/benches/sigverify.rs | 4 +- perf/src/discard.rs | 4 +- perf/src/packet.rs | 127 +++++++++++++++++-- perf/src/sigverify.rs | 141 +++++++++------------ streamer/src/packet.rs | 43 +++---- streamer/src/quic.rs | 16 +-- streamer/src/streamer.rs | 16 +-- 31 files changed, 375 insertions(+), 305 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 3923228123..2839989c39 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -367,7 +367,7 @@ fn main() { for (packet_batch_index, packet_batch) in packets_for_this_iteration.packet_batches.iter().enumerate() { - sent += packet_batch.packets.len(); + sent += packet_batch.len(); trace!( "Sending PacketBatch index {}, {}", packet_batch_index, diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 63930debe9..81d866360d 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -21,9 +21,10 @@ use { fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut packet_batch = PacketBatch::default(); - packet_batch.packets.resize(10, Packet::default()); - for w in packet_batch.packets.iter_mut() { + let batch_size = 10; + let mut packet_batch = PacketBatch::with_capacity(batch_size); + packet_batch.resize(batch_size, Packet::default()); + for w in packet_batch.iter_mut() { w.meta.size = PACKET_DATA_SIZE; w.meta.set_socket_addr(addr); } @@ -33,7 +34,7 @@ fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> { return; } let mut num = 0; - for p in &packet_batch.packets { + for p in packet_batch.iter() { let a = p.meta.socket_addr(); assert!(p.meta.size <= PACKET_DATA_SIZE); send.send_to(&p.data[..p.meta.size], &a).unwrap(); @@ -50,7 +51,7 @@ fn sink(exit: Arc<AtomicBool>, rvs: Arc<AtomicUsize>, r: PacketBatchReceiver) -> } let timer = Duration::new(1, 0); if let Ok(packet_batch) = r.recv_timeout(timer) { - rvs.fetch_add(packet_batch.packets.len(), Ordering::Relaxed); + rvs.fetch_add(packet_batch.len(), Ordering::Relaxed); } }) } diff --git a/client/tests/quic_client.rs index 8d3681e2b4..9c661e17bd 100644 ---
a/client/tests/quic_client.rs +++ b/client/tests/quic_client.rs @@ -64,7 +64,7 @@ mod tests { let mut total_packets = 0; while now.elapsed().as_secs() < 5 { if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) { - total_packets += packets.packets.len(); + total_packets += packets.len(); all_packets.push(packets) } if total_packets >= num_expected_packets { @@ -72,7 +72,7 @@ mod tests { } } for batch in all_packets { - for p in &batch.packets { + for p in &batch { assert_eq!(p.meta.size, num_bytes); } } diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 90026cbc66..d92ef025d9 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -254,7 +254,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { v.len(), ); for xv in v { - sent += xv.packets.len(); + sent += xv.len(); } verified_sender.send(v.to_vec()).unwrap(); } diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 64df3b1418..cdbc480552 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -42,8 +42,8 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) { .collect(); for batch in batches.iter_mut() { - total += batch.packets.len(); - for p in batch.packets.iter_mut() { + total += batch.len(); + for p in batch.iter_mut() { let ip_index = thread_rng().gen_range(0, ips.len()); p.meta.addr = ips[ip_index]; } @@ -54,7 +54,7 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) { SigVerifyStage::discard_excess_packets(&mut batches, 10_000); let mut num_packets = 0; for batch in batches.iter_mut() { - for p in batch.packets.iter_mut() { + for p in batch.iter_mut() { if !p.meta.discard() { num_packets += 1; } @@ -88,7 +88,7 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) { let mut batches = to_packet_batches(&vec![test_tx(); SIZE], CHUNK_SIZE); let spam_addr = new_rand_addr(&mut rng); for batch in batches.iter_mut() { - for packet in batch.packets.iter_mut() { + for packet in batch.iter_mut() { // One spam address, ~1000 unique addresses. 
packet.meta.addr = if rng.gen_ratio(1, 30) { new_rand_addr(&mut rng) @@ -101,7 +101,7 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) { SigVerifyStage::discard_excess_packets(&mut batches, 10_000); let mut num_packets = 0; for batch in batches.iter_mut() { - for packet in batch.packets.iter_mut() { + for packet in batch.iter_mut() { if !packet.meta.discard() { num_packets += 1; } @@ -158,7 +158,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { let mut sent_len = 0; for _ in 0..batches.len() { if let Some(batch) = batches.pop() { - sent_len += batch.packets.len(); + sent_len += batch.len(); packet_s.send(vec![batch]).unwrap(); } } @@ -167,7 +167,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) { loop { if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) { while let Some(v) = verifieds.pop() { - received += v.packets.len(); + received += v.len(); batches.push(v); } if use_same_tx || received >= sent_len { diff --git a/core/benches/unprocessed_packet_batches.rs b/core/benches/unprocessed_packet_batches.rs index 8cbac27b59..1e5caf0130 100644 --- a/core/benches/unprocessed_packet_batches.rs +++ b/core/benches/unprocessed_packet_batches.rs @@ -100,7 +100,7 @@ fn bench_packet_clone(bencher: &mut Bencher) { let mut outer_packet = Packet::default(); let mut timer = Measure::start("insert_batch"); - packet_batch.packets.iter().for_each(|packet| { + packet_batch.iter().for_each(|packet| { let mut packet = packet.clone(); packet.meta.sender_stake *= 2; if packet.meta.sender_stake > 2 { diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index 85df9a8b95..dd3e3eb23a 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -252,13 +252,13 @@ impl AncestorHashesService { ) -> Result<()> { let timeout = Duration::new(1, 0); let mut packet_batches = vec![response_receiver.recv_timeout(timeout)?]; - let mut total_packets = packet_batches[0].packets.len(); + let mut total_packets = packet_batches[0].len(); let mut dropped_packets = 0; while let Ok(batch) = response_receiver.try_recv() { - total_packets += batch.packets.len(); + total_packets += batch.len(); if packet_threshold.should_drop(total_packets) { - dropped_packets += batch.packets.len(); + dropped_packets += batch.len(); } else { packet_batches.push(batch); } @@ -292,7 +292,7 @@ impl AncestorHashesService { duplicate_slots_reset_sender: &DuplicateSlotsResetSender, retryable_slots_sender: &RetryableSlotsSender, ) { - packet_batch.packets.iter().for_each(|packet| { + packet_batch.iter().for_each(|packet| { let decision = Self::verify_and_process_ancestor_response( packet, ancestor_hashes_request_statuses, @@ -1116,7 +1116,7 @@ mod test { let mut response_packet = response_receiver .recv_timeout(Duration::from_millis(10_000)) .unwrap(); - let packet = &mut response_packet.packets[0]; + let packet = &mut response_packet[0]; packet.meta.set_socket_addr(&responder_info.serve_repair); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, @@ -1477,7 +1477,7 @@ mod test { let mut response_packet = response_receiver .recv_timeout(Duration::from_millis(10_000)) .unwrap(); - let packet = &mut response_packet.packets[0]; + let packet = &mut response_packet[0]; packet.meta.set_socket_addr(&responder_info.serve_repair); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 48b5a439a8..b57e05d624 100644 --- 
a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -21,7 +21,6 @@ use { solana_measure::measure::Measure, solana_metrics::inc_new_counter_info, solana_perf::{ - cuda_runtime::PinnedVec, data_budget::DataBudget, packet::{Packet, PacketBatch, PACKETS_PER_BATCH}, perf_libs, @@ -1969,7 +1968,7 @@ impl BankingStage { original_unprocessed_packets_len.saturating_sub(unprocessed_packets.len()) } - fn generate_packet_indexes(vers: &PinnedVec<Packet>) -> Vec<usize> { + fn generate_packet_indexes(vers: &PacketBatch) -> Vec<usize> { vers.iter() .enumerate() .filter(|(_, pkt)| !pkt.meta.discard()) @@ -1984,12 +1983,11 @@ ) -> Result<Vec<PacketBatch>, RecvTimeoutError> { let start = Instant::now(); let mut packet_batches = verified_receiver.recv_timeout(recv_timeout)?; - let mut num_packets_received: usize = - packet_batches.iter().map(|batch| batch.packets.len()).sum(); + let mut num_packets_received: usize = packet_batches.iter().map(|batch| batch.len()).sum(); while let Ok(packet_batch) = verified_receiver.try_recv() { trace!("got more packet batches in banking stage"); let (packets_received, packet_count_overflowed) = num_packets_received - .overflowing_add(packet_batch.iter().map(|batch| batch.packets.len()).sum()); + .overflowing_add(packet_batch.iter().map(|batch| batch.len()).sum()); packet_batches.extend(packet_batch); // Spend any leftover receive time budget to greedily receive more packet batches, @@ -2025,7 +2023,7 @@ recv_time.stop(); let packet_batches_len = packet_batches.len(); - let packet_count: usize = packet_batches.iter().map(|x| x.packets.len()).sum(); + let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum(); debug!( "@{:?} process start stalled for: {:?}ms txs: {} id: {}", timestamp(), @@ -2039,14 +2037,11 @@ let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; for packet_batch in packet_batch_iter { - let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets); + let packet_indexes = Self::generate_packet_indexes(&packet_batch); // Track all the packets incoming from sigverify, both valid and invalid slot_metrics_tracker.increment_total_new_valid_packets(packet_indexes.len() as u64); slot_metrics_tracker.increment_newly_failed_sigverify_count( - packet_batch - .packets - .len() - .saturating_sub(packet_indexes.len()) as u64, + packet_batch.len().saturating_sub(packet_indexes.len()) as u64, ); Self::push_unprocessed( @@ -2332,8 +2327,7 @@ mod tests { mut with_vers: Vec<(PacketBatch, Vec<u8>)>, ) -> Vec<PacketBatch> { with_vers.iter_mut().for_each(|(b, v)| { - b.packets - .iter_mut() + b.iter_mut() .zip(v) .for_each(|(p, f)| p.meta.set_discard(*f == 0)) }); diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index a52efadcab..3148d4649c 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -303,8 +303,8 @@ impl ClusterInfoVoteListener { .zip(packet_batches) .filter(|(_, packet_batch)| { // to_packet_batches() above splits into 1 packet long batches - assert_eq!(packet_batch.packets.len(), 1); - !packet_batch.packets[0].meta.discard() + assert_eq!(packet_batch.len(), 1); + !packet_batch[0].meta.discard() }) .filter_map(|(tx, packet_batch)| { let (vote_account_key, vote, ..)
= vote_parser::parse_vote_transaction(&tx)?; @@ -1515,7 +1515,7 @@ mod tests { fn verify_packets_len(packets: &[VerifiedVoteMetadata], ref_value: usize) { let num_packets: usize = packets .iter() - .map(|vote_metadata| vote_metadata.packet_batch.packets.len()) + .map(|vote_metadata| vote_metadata.packet_batch.len()) .sum(); assert_eq!(num_packets, ref_value); } diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 5a702f0312..d78ae84a1a 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -97,12 +97,12 @@ impl FetchStage { }; let mut packet_batch = recvr.recv()?; - let mut num_packets = packet_batch.packets.len(); - packet_batch.packets.iter_mut().for_each(mark_forwarded); + let mut num_packets = packet_batch.len(); + packet_batch.iter_mut().for_each(mark_forwarded); let mut packet_batches = vec![packet_batch]; while let Ok(mut packet_batch) = recvr.try_recv() { - packet_batch.packets.iter_mut().for_each(mark_forwarded); - num_packets += packet_batch.packets.len(); + packet_batch.iter_mut().for_each(mark_forwarded); + num_packets += packet_batch.len(); packet_batches.push(packet_batch); // Read at most 1K transactions in a loop if num_packets > 1024 { diff --git a/core/src/find_packet_sender_stake_stage.rs b/core/src/find_packet_sender_stake_stage.rs index 399a190e62..e068402773 100644 --- a/core/src/find_packet_sender_stake_stage.rs +++ b/core/src/find_packet_sender_stake_stage.rs @@ -160,7 +160,7 @@ impl FindPacketSenderStakeStage { PAR_THREAD_POOL.install(|| { batches .into_par_iter() - .flat_map(|batch| batch.packets.par_iter_mut()) + .flat_map(|batch| batch.par_iter_mut()) .for_each(|packet| { packet.meta.sender_stake = ip_to_stake .get(&packet.meta.addr) diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 76c887adc7..6f110c9487 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -327,13 +327,13 @@ impl ServeRepair { //TODO cache connections let timeout = Duration::new(1, 0); let mut reqs_v = vec![requests_receiver.recv_timeout(timeout)?]; - let mut total_packets = reqs_v[0].packets.len(); + let mut total_packets = reqs_v[0].len(); let mut dropped_packets = 0; while let Ok(more) = requests_receiver.try_recv() { - total_packets += more.packets.len(); + total_packets += more.len(); if packet_threshold.should_drop(total_packets) { - dropped_packets += more.packets.len(); + dropped_packets += more.len(); } else { reqs_v.push(more); } @@ -431,7 +431,7 @@ impl ServeRepair { stats: &mut ServeRepairStats, ) { // iter over the packets packet_batch.packets.iter().for_each(|packet| { + packet_batch.iter().for_each(|packet| { let from_addr = packet.meta.socket_addr(); limited_deserialize(&packet.data[..packet.meta.size]) .into_iter() @@ -700,11 +700,12 @@ impl ServeRepair { nonce, ); if let Some(packet) = packet { - res.packets.push(packet); + res.push(packet); } else { break; } - if meta.parent_slot.is_some() && res.packets.len() < max_responses { + + if meta.parent_slot.is_some() && res.len() < max_responses { slot = meta.parent_slot.unwrap(); } else { break; } @@ -810,10 +811,9 @@ mod tests { ) .expect("packets"); let request = ShredRepairType::HighestShred(slot, index); - verify_responses(&request, rv.packets.iter()); + verify_responses(&request, rv.iter()); let rv: Vec<Shred> = rv - .packets .into_iter() .filter_map(|b| { assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce); @@ -896,9 +896,8 @@ mod tests { ) .expect("packets"); let request = ShredRepairType::Shred(slot, index); - verify_responses(&request,
rv.packets.iter()); + verify_responses(&request, rv.iter()); let rv: Vec<Shred> = rv - .packets .into_iter() .filter_map(|b| { assert_eq!(repair_response::nonce(&b.data[..]).unwrap(), nonce); @@ -1058,7 +1057,6 @@ mod tests { nonce, ) .expect("run_orphan packets") - .packets .iter() .cloned() .collect(); @@ -1128,7 +1126,6 @@ mod tests { nonce, ) .expect("run_orphan packets") - .packets .iter() .cloned() .collect(); @@ -1179,8 +1176,7 @@ mod tests { slot + num_slots, nonce, ) - .expect("run_ancestor_hashes packets") - .packets; + .expect("run_ancestor_hashes packets"); assert_eq!(rv.len(), 1); let packet = &rv[0]; let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet); @@ -1195,8 +1191,7 @@ mod tests { slot + num_slots - 1, nonce, ) - .expect("run_ancestor_hashes packets") - .packets; + .expect("run_ancestor_hashes packets"); assert_eq!(rv.len(), 1); let packet = &rv[0]; let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet); @@ -1218,8 +1213,7 @@ mod tests { slot + num_slots - 1, nonce, ) - .expect("run_ancestor_hashes packets") - .packets; + .expect("run_ancestor_hashes packets"); assert_eq!(rv.len(), 1); let packet = &rv[0]; let ancestor_hashes_response = deserialize_ancestor_hashes_response(packet); diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 642ad2e5ed..f098fa553b 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -5,11 +5,7 @@ use { crossbeam_channel::{unbounded, Sender}, lru::LruCache, solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats}, - solana_perf::{ - cuda_runtime::PinnedVec, - packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags}, - recycler::Recycler, - }, + solana_perf::packet::{Packet, PacketBatch, PacketBatchRecycler, PacketFlags}, solana_runtime::bank_forks::BankForks, solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT}, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats}, @@ -98,8 +94,8 @@ impl ShredFetchStage { slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch()); } } - stats.shred_count += packet_batch.packets.len(); - packet_batch.packets.iter_mut().for_each(|packet| { + stats.shred_count += packet_batch.len(); + packet_batch.iter_mut().for_each(|packet| { Self::process_packet( packet, &mut shreds_received, @@ -122,7 +118,7 @@ impl ShredFetchStage { sockets: Vec<Arc<UdpSocket>>, exit: &Arc<AtomicBool>, sender: Sender<Vec<PacketBatch>>, - recycler: Recycler<PinnedVec<Packet>>, + recycler: PacketBatchRecycler, bank_forks: Option<Arc<RwLock<BankForks>>>, name: &'static str, modify: F, @@ -162,7 +158,7 @@ impl ShredFetchStage { bank_forks: Option<Arc<RwLock<BankForks>>>, exit: &Arc<AtomicBool>, ) -> Self { - let recycler: PacketBatchRecycler = Recycler::warmed(100, 1024); + let recycler = PacketBatchRecycler::warmed(100, 1024); let (mut tvu_threads, tvu_filter) = Self::packet_modifier( sockets, diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index e5f6567c41..97dba300a0 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -35,7 +35,7 @@ impl ShredSigVerifier { fn read_slots(batches: &[PacketBatch]) -> HashSet<Slot> { batches .iter() - .flat_map(|batch| batch.packets.iter().filter_map(Shred::get_slot_from_packet)) + .flat_map(|batch| batch.iter().filter_map(Shred::get_slot_from_packet)) .collect() } } @@ -91,13 +91,18 @@ pub mod tests { 0, 0xc0de, ); - let mut batches = [PacketBatch::default(), PacketBatch::default()]; + let mut batches: Vec<_> = (0..2) .map(|_| { let mut batch = PacketBatch::with_capacity(1); batch.resize(1, Packet::default()); batch }) .collect(); let
keypair = Keypair::new(); shred.sign(&keypair); - batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0].packets[0].meta.size = shred.payload().len(); + batches[0][0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0][0].meta.size = shred.payload().len(); let mut shred = Shred::new_from_data( 0xc0de_dead, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0xc0de, ); shred.sign(&keypair); - batches[1].packets.resize(1, Packet::default()); - batches[1].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[1].packets[0].meta.size = shred.payload().len(); + batches[1][0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[1][0].meta.size = shred.payload().len(); let expected: HashSet<Slot> = [0xc0de_dead, 0xdead_c0de].iter().cloned().collect(); assert_eq!(ShredSigVerifier::read_slots(&batches), expected); } @@ -129,8 +133,10 @@ pub mod tests { let bf = Arc::new(RwLock::new(BankForks::new(bank))); let verifier = ShredSigVerifier::new(bf, cache); - let mut batches = vec![PacketBatch::default()]; - batches[0].packets.resize(2, Packet::default()); + let batch_size = 2; + let mut batch = PacketBatch::with_capacity(batch_size); + batch.resize(batch_size, Packet::default()); + let mut batches = vec![batch]; let mut shred = Shred::new_from_data( 0, 0xc0de, 0xdead, Some(&[1, 2, 3, 4]), ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0xc0de, ); shred.sign(&leader_keypair); - batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0].packets[0].meta.size = shred.payload().len(); + batches[0][0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0][0].meta.size = shred.payload().len(); let mut shred = Shred::new_from_data( 0, 0xbeef, 0xc0de, Some(&[1, 2, 3, 4]), ShredFlags::LAST_SHRED_IN_SLOT, 0, 0, 0xc0de, ); let wrong_keypair = Keypair::new(); shred.sign(&wrong_keypair); - batches[0].packets[1].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0].packets[1].meta.size = shred.payload().len(); + batches[0][1].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0][1].meta.size = shred.payload().len(); let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches); let rv = verifier.verify_batches(batches, num_packets); - assert!(!rv[0].packets[0].meta.discard()); - assert!(rv[0].packets[1].meta.discard()); + assert!(!rv[0][0].meta.discard()); + assert!(rv[0][1].meta.discard()); } } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 49c57dc90d..c1b1a8b276 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -226,7 +226,7 @@ impl SigVerifyStage { let mut addrs = batches .iter_mut() .rev() - .flat_map(|batch| batch.packets.iter_mut().rev()) + .flat_map(|batch| batch.iter_mut().rev()) .filter(|packet| !packet.meta.discard()) .map(|packet| (packet.meta.addr, packet)) .into_group_map(); @@ -423,7 +423,6 @@ mod tests { .iter() .map(|batch| { batch - .packets .iter() .map(|p| if p.meta.discard() { 0 } else { 1 }) .sum::<usize>() }) .sum(); } #[test] fn test_packet_discard() { solana_logger::setup(); - let mut batch = PacketBatch::default(); - batch.packets.resize(10, Packet::default()); - batch.packets[3].meta.addr = std::net::IpAddr::from([1u16; 8]); - batch.packets[3].meta.set_discard(true); - batch.packets[4].meta.addr = std::net::IpAddr::from([2u16; 8]); + let batch_size = 10; + let mut batch = PacketBatch::with_capacity(batch_size); + batch.resize(batch_size,
Packet::default()); + batch[3].meta.addr = std::net::IpAddr::from([1u16; 8]); + batch[3].meta.set_discard(true); + batch[4].meta.addr = std::net::IpAddr::from([2u16; 8]); let mut batches = vec![batch]; let max = 3; SigVerifyStage::discard_excess_packets(&mut batches, max); assert_eq!(count_non_discard(&batches), max); - assert!(!batches[0].packets[0].meta.discard()); - assert!(batches[0].packets[3].meta.discard()); - assert!(!batches[0].packets[4].meta.discard()); + assert!(!batches[0][0].meta.discard()); + assert!(batches[0][3].meta.discard()); + assert!(!batches[0][4].meta.discard()); } fn gen_batches(use_same_tx: bool) -> Vec<PacketBatch> { let len = 4096; @@ -480,7 +480,7 @@ mod tests { let mut sent_len = 0; for _ in 0..batches.len() { if let Some(batch) = batches.pop() { - sent_len += batch.packets.len(); + sent_len += batch.len(); packet_s.send(vec![batch]).unwrap(); } } let mut received = 0; loop { if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) { while let Some(v) = verifieds.pop() { - received += v.packets.len(); + received += v.len(); batches.push(v); } if use_same_tx || received >= sent_len { diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index 4c1842c35f..edf62358cf 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -350,7 +350,7 @@ pub fn deserialize_packets<'a>( packet_indexes: &'a [usize], ) -> impl Iterator<Item = DeserializedPacket> + 'a { packet_indexes.iter().filter_map(move |packet_index| { - DeserializedPacket::new(packet_batch.packets[*packet_index].clone()).ok() + DeserializedPacket::new(packet_batch[*packet_index].clone()).ok() }) } diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs index 75e0a0525c..4bac5d9ade 100644 --- a/core/src/verified_vote_packets.rs +++ b/core/src/verified_vote_packets.rs @@ -428,10 +428,10 @@ mod tests { for _ in 0..num_expected_batches { let validator_batch: Vec<PacketBatch> = gossip_votes_iterator.next().unwrap(); assert_eq!(validator_batch.len(), slot_hashes.slot_hashes().len()); - let expected_len = validator_batch[0].packets.len(); + let expected_len = validator_batch[0].len(); assert!(validator_batch .iter() - .all(|batch| batch.packets.len() == expected_len)); + .all(|batch| batch.len() == expected_len)); } // Should be empty now diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 49b39e5be7..004e3c1cb3 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -353,8 +353,8 @@ where F: Fn(&Shred, Arc<Bank>, /*last root:*/ Slot) -> bool + Sync, { let timer = Duration::from_millis(200); - let mut packets = verified_receiver.recv_timeout(timer)?; - packets.extend(verified_receiver.try_iter().flatten()); + let mut packet_batches = verified_receiver.recv_timeout(timer)?; + packet_batches.extend(verified_receiver.try_iter().flatten()); let now = Instant::now(); let last_root = blockstore.last_root(); let working_bank = bank_forks.read().unwrap().working_bank(); @@ -384,9 +384,9 @@ where } }; let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| { - packets + packet_batches .par_iter() - .flat_map_iter(|pkt| pkt.packets.iter().filter_map(handle_packet)) + .flat_map_iter(|packet_batch| packet_batch.iter().filter_map(handle_packet)) .unzip() }); // Exclude repair packets from retransmit.
@@ -406,8 +406,14 @@ where } insert_shred_sender.send((shreds, repair_infos))?; - stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::<usize>(); - for packet in packets.iter().flat_map(|pkt| pkt.packets.iter()) { + stats.num_packets += packet_batches .iter() .map(|packet_batch| packet_batch.len()) .sum::<usize>(); + for packet in packet_batches .iter() .flat_map(|packet_batch| packet_batch.iter()) { let addr = packet.meta.socket_addr(); *stats.addrs.entry(addr).or_default() += 1; } diff --git a/entry/src/entry.rs b/entry/src/entry.rs index f1f7f728ac..addc04127a 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -517,20 +517,16 @@ pub fn start_verify_transactions( // uninitialized anyway, so the initilization would simply write junk into // the vector anyway. unsafe { - packet_batch.packets.set_len(vec_size); + packet_batch.set_len(vec_size); } let entry_tx_iter = slice .into_par_iter() .map(|tx| tx.to_versioned_transaction()); - let res = packet_batch - .packets - .par_iter_mut() - .zip(entry_tx_iter) - .all(|pair| { - pair.0.meta = Meta::default(); - Packet::populate_packet(pair.0, None, &pair.1).is_ok() - }); + let res = packet_batch.par_iter_mut().zip(entry_tx_iter).all(|pair| { + pair.0.meta = Meta::default(); + Packet::populate_packet(pair.0, None, &pair.1).is_ok() + }); if res { Ok(packet_batch) } else { @@ -552,7 +548,7 @@ pub fn start_verify_transactions( ); let verified = packet_batches .iter() - .all(|batch| batch.packets.iter().all(|p| !p.meta.discard())); + .all(|batch| batch.iter().all(|p| !p.meta.discard())); verify_time.stop(); (verified, verify_time.as_us()) }); diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 84158035f4..3aec6dd833 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1572,7 +1572,7 @@ impl ClusterInfo { ); self.stats .packets_sent_gossip_requests_count - .add_relaxed(packet_batch.packets.len() as u64); + .add_relaxed(packet_batch.len() as u64); sender.send(packet_batch)?; } Ok(()) @@ -1850,7 +1850,7 @@ impl ClusterInfo { if !response.is_empty() { self.stats .packets_sent_pull_responses_count - .add_relaxed(response.packets.len() as u64); + .add_relaxed(response.len() as u64); let _ = response_sender.send(response); } } @@ -1890,7 +1890,7 @@ impl ClusterInfo { if let Some(ping) = ping { let ping = Protocol::PingMessage(ping); match Packet::from_data(Some(&node.1), ping) { - Ok(packet) => packet_batch.packets.push(packet), + Ok(packet) => packet_batch.push(packet), Err(err) => error!("failed to write ping packet: {:?}", err), }; } @@ -1997,7 +1997,7 @@ impl ClusterInfo { Ok(packet) => { if self.outbound_budget.take(packet.meta.size) { total_bytes += packet.meta.size; - packet_batch.packets.push(packet); + packet_batch.push(packet); sent += 1; } else { self.stats.gossip_pull_request_no_budget.add_relaxed(1); @@ -2285,10 +2285,10 @@ impl ClusterInfo { "handle_batch_push_messages", &prune_messages, ); - let num_prune_packets = packet_batch.packets.len(); + let num_prune_packets = packet_batch.len(); self.stats .push_response_count - .add_relaxed(packet_batch.packets.len() as u64); + .add_relaxed(packet_batch.len() as u64); let new_push_requests = self.new_push_requests(stakes); self.stats .push_message_pushes .add_relaxed(new_push_requests.len() as u64); for (address, request) in new_push_requests { if ContactInfo::is_valid_address(&address, &self.socket_addr_space) { match Packet::from_data(Some(&address), &request) { - Ok(packet) => packet_batch.packets.push(packet), + Ok(packet) =>
packet_batch.push(packet), Err(err) => error!("failed to write push-request packet: {:?}", err), } } else { @@ -2308,7 +2308,7 @@ impl ClusterInfo { .add_relaxed(num_prune_packets as u64); self.stats .packets_sent_push_messages_count - .add_relaxed((packet_batch.packets.len() - num_prune_packets) as u64); + .add_relaxed((packet_batch.len() - num_prune_packets) as u64); let _ = response_sender.send(packet_batch); } @@ -2450,10 +2450,10 @@ impl ClusterInfo { thread_pool: &ThreadPool, ) -> Result<(), GossipError> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); - let packets: Vec<_> = receiver.recv_timeout(RECV_TIMEOUT)?.packets.into(); + let packets: Vec<_> = receiver.recv_timeout(RECV_TIMEOUT)?.into(); let mut packets = VecDeque::from(packets); for packet_batch in receiver.try_iter() { - packets.extend(packet_batch.packets.iter().cloned()); + packets.extend(packet_batch.iter().cloned()); let excess_count = packets.len().saturating_sub(MAX_GOSSIP_TRAFFIC); if excess_count > 0 { packets.drain(0..excess_count); @@ -3226,8 +3226,7 @@ mod tests { .zip(pings.into_iter()), &recycler, ) - .unwrap() - .packets; + .unwrap(); assert_eq!(remote_nodes.len(), packets.len()); for (packet, (_, socket), pong) in izip!( packets.into_iter(), diff --git a/ledger/benches/sigverify_shreds.rs b/ledger/benches/sigverify_shreds.rs index 4b27d6ad36..324cac542a 100644 --- a/ledger/benches/sigverify_shreds.rs +++ b/ledger/benches/sigverify_shreds.rs @@ -21,13 +21,10 @@ const NUM_BATCHES: usize = 1; fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) { let recycler_cache = RecyclerCache::default(); - let mut packet_batch = PacketBatch::default(); - packet_batch.packets.set_pinnable(); + let mut packet_batch = PacketBatch::new_pinned_with_capacity(NUM_PACKETS); + packet_batch.resize(NUM_PACKETS, Packet::default()); let slot = 0xdead_c0de; - // need to pin explicitly since the resize will not cause re-allocation - packet_batch.packets.reserve_and_pin(NUM_PACKETS); - packet_batch.packets.resize(NUM_PACKETS, Packet::default()); - for p in packet_batch.packets.iter_mut() { + for p in packet_batch.iter_mut() { let shred = Shred::new_from_data( slot, 0xc0de, 0xdead, Some(&[5; LEGACY_SHRED_DATA_CAPACITY]), ShredFlags::LAST_SHRED_IN_SLOT, 1, 2, 0xc0de, ); @@ -57,8 +54,8 @@ fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) { let mut packet_batch = PacketBatch::default(); let slot = 0xdead_c0de; - packet_batch.packets.resize(NUM_PACKETS, Packet::default()); - for p in packet_batch.packets.iter_mut() { + packet_batch.resize(NUM_PACKETS, Packet::default()); + for p in packet_batch.iter_mut() { let shred = Shred::new_from_data( slot, 0xc0de, diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 719fa210e7..e83c1de1b3 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -88,7 +88,6 @@ fn verify_shreds_cpu( .into_par_iter() .map(|batch| { batch - .packets .par_iter() .map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0)) .collect() }) @@ -114,7 +113,6 @@ fn slot_key_data_for_gpu< .into_par_iter() .map(|batch| { batch - .packets .iter() .map(|packet| { let slot_start = size_of::<Signature>() + size_of::<ShredType>(); @@ -200,7 +198,7 @@ fn shred_gpu_offsets( let mut v_sig_lens = vec![]; for batch in batches.iter() { let mut sig_lens = Vec::new(); - for packet in batch.packets.iter() { + for packet in batch.iter() { let sig_start = pubkeys_end; let sig_end = sig_start + size_of::<Signature>(); let msg_start = sig_end; @@ -258,13 +256,13 @@ pub fn verify_shreds_gpu( for batch in batches { elems.push(perf_libs::Elems { - elems:
batch.packets.as_ptr(), - num: batch.packets.len() as u32, + elems: batch.as_ptr(), + num: batch.len() as u32, }); let mut v = Vec::new(); - v.resize(batch.packets.len(), 0); + v.resize(batch.len(), 0); rvs.push(v); - num_packets += batch.packets.len(); + num_packets += batch.len(); } out.resize(signature_offsets.len(), 0); @@ -327,7 +325,7 @@ pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [PacketBatch]) { debug!("CPU SHRED ECDSA for {}", packet_count); SIGVERIFY_THREAD_POOL.install(|| { batches.par_iter_mut().for_each(|batch| { - batch.packets[..] + batch[..] .par_iter_mut() .for_each(|p| sign_shred_cpu(keypair, p)); }); }); @@ -395,12 +393,12 @@ pub fn sign_shreds_gpu( for batch in batches.iter() { elems.push(perf_libs::Elems { - elems: batch.packets.as_ptr(), - num: batch.packets.len() as u32, + elems: batch.as_ptr(), + num: batch.len() as u32, }); let mut v = Vec::new(); - v.resize(batch.packets.len(), 0); - num_packets += batch.packets.len(); + v.resize(batch.len(), 0); + num_packets += batch.len(); } trace!("Starting verify num packets: {}", num_packets); @@ -427,7 +425,7 @@ pub fn sign_shreds_gpu( } trace!("done sign"); let mut sizes: Vec<usize> = vec![0]; - sizes.extend(batches.iter().map(|b| b.packets.len())); + sizes.extend(batches.iter().map(|b| b.len())); for i in 0..sizes.len() { if i == 0 { continue; } @@ -440,7 +438,7 @@ pub fn sign_shreds_gpu( .enumerate() .for_each(|(batch_ix, batch)| { let num_packets = sizes[batch_ix]; - batch.packets[..] + batch[..] .par_iter_mut() .enumerate() .for_each(|(packet_ix, packet)| { @@ -523,9 +521,9 @@ pub mod tests { ); let keypair = Keypair::new(); shred.sign(&keypair); - batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0].packets[0].meta.size = shred.payload().len(); + batches[0].resize(1, Packet::default()); + batches[0][0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0][0].meta.size = shred.payload().len(); let leader_slots = [(slot, keypair.pubkey().to_bytes())] .iter() .cloned() .collect(); let rv = verify_shreds_cpu(&batches, &leader_slots); assert_eq!(rv, vec![vec![1]]); let wrong_keypair = Keypair::new(); let leader_slots = [(slot, wrong_keypair.pubkey().to_bytes())] .iter() .cloned() .collect(); - batches[0].packets[0].meta.size = 0; + batches[0][0].meta.size = 0; let rv = verify_shreds_cpu(&batches, &leader_slots); assert_eq!(rv, vec![vec![0]]); } @@ -577,9 +575,9 @@ pub mod tests { ); let keypair = Keypair::new(); shred.sign(&keypair); - batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0].packets[0].meta.size = shred.payload().len(); + batches[0].resize(1, Packet::default()); + batches[0][0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0][0].meta.size = shred.payload().len(); let leader_slots = [ (std::u64::MAX, Pubkey::default().to_bytes()), (slot, keypair.pubkey().to_bytes()), ] .iter() .cloned() .collect(); let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![1]]); let wrong_keypair = Keypair::new(); let leader_slots = [ (std::u64::MAX, Pubkey::default().to_bytes()), (slot, wrong_keypair.pubkey().to_bytes()), ] .iter() .cloned() .collect(); let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![0]]); - batches[0].packets[0].meta.size = 0; + batches[0][0].meta.size = 0; let leader_slots = [ (std::u64::MAX, Pubkey::default().to_bytes()), (slot, keypair.pubkey().to_bytes()), ] .iter() .cloned() .collect(); let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![0]]); } @@ -627,11 +625,12 @@ pub mod tests { solana_logger::setup(); let recycler_cache = RecyclerCache::default(); - let mut packet_batch = PacketBatch::default(); let num_packets = 32; let num_batches = 100; - packet_batch.packets.resize(num_packets, Packet::default()); - for (i, p) in packet_batch.packets.iter_mut().enumerate() { + let mut packet_batch = PacketBatch::with_capacity(num_packets);
packet_batch.resize(num_packets, Packet::default()); + + for (i, p) in packet_batch.iter_mut().enumerate() { let shred = Shred::new_from_data( slot, 0xc0de, i as u16, Some(&[5; LEGACY_SHRED_DATA_CAPACITY]), ShredFlags::LAST_SHRED_IN_SLOT, 1, 2, 0xc0de, ); @@ -687,9 +686,10 @@ pub mod tests { 0, 0xc0de, ); - batches[0].packets.resize(1, Packet::default()); - batches[0].packets[0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0].packets[0].meta.size = shred.payload().len(); + batches[0].resize(1, Packet::default()); + batches[0][0].data[0..shred.payload().len()].copy_from_slice(shred.payload()); + batches[0][0].meta.size = shred.payload().len(); + let pubkeys = [ (slot, keypair.pubkey().to_bytes()), (std::u64::MAX, Pubkey::default().to_bytes()), diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs index adc8a408c3..a6911b8f58 100644 --- a/perf/benches/dedup.rs +++ b/perf/benches/dedup.rs @@ -30,7 +30,7 @@ fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) deduper.reset(); batches .iter_mut() - .for_each(|b| b.packets.iter_mut().for_each(|p| p.meta.set_discard(false))); + .for_each(|b| b.iter_mut().for_each(|p| p.meta.set_discard(false))); }); } diff --git a/perf/benches/shrink.rs b/perf/benches/shrink.rs index cf60fd90e5..878b84e181 100644 --- a/perf/benches/shrink.rs +++ b/perf/benches/shrink.rs @@ -26,8 +26,7 @@ fn do_bench_shrink_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) bencher.iter(|| { let _ans = sigverify::shrink_batches(&mut batches); batches.iter_mut().for_each(|b| { - b.packets - .iter_mut() + b.iter_mut() .for_each(|p| p.meta.set_discard(thread_rng().gen())) }); }); @@ -75,8 +74,7 @@ fn bench_shrink_count_packets(bencher: &mut Bencher) { PACKETS_PER_BATCH, ); batches.iter_mut().for_each(|b| { - b.packets - .iter_mut() + b.iter_mut() .for_each(|p| p.meta.set_discard(thread_rng().gen())) }); diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 1d52638696..748a295cee 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -56,8 +56,8 @@ fn bench_sigverify_uneven(bencher: &mut Bencher) { current_packets = num_packets; } let mut batch = PacketBatch::with_capacity(len); - batch.packets.resize(len, Packet::default()); - for packet in batch.packets.iter_mut() { + batch.resize(len, Packet::default()); + for packet in batch.iter_mut() { if thread_rng().gen_ratio(1, 2) { tx = simple_tx.clone(); } else { diff --git a/perf/src/discard.rs b/perf/src/discard.rs index 56f5636980..792de59109 100644 --- a/perf/src/discard.rs +++ b/perf/src/discard.rs @@ -11,7 +11,7 @@ pub fn discard_batches_randomly( while total_packets > max_packets { let index = thread_rng().gen_range(0, batches.len()); let removed = batches.swap_remove(index); - total_packets = total_packets.saturating_sub(removed.packets.len()); + total_packets = total_packets.saturating_sub(removed.len()); } total_packets } @@ -24,7 +24,7 @@ mod tests { fn test_batch_discard_random() { solana_logger::setup(); let mut batch = PacketBatch::default(); - batch.packets.resize(1, Packet::default()); + batch.resize(1, Packet::default()); let num_batches = 100; let mut batches = vec![batch; num_batches]; let max = 5; diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 0cd584f516..f8bbefe26a 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -3,8 +3,14 @@ pub use solana_sdk::packet::{Meta, Packet, PacketFlags, PACKET_DATA_SIZE}; use { crate::{cuda_runtime::PinnedVec, recycler::Recycler}, bincode::config::Options, + rayon::prelude::{IntoParallelIterator, IntoParallelRefIterator, IntoParallelRefMutIterator}, serde::{de::DeserializeOwned,
Serialize}, - std::{io::Read, net::SocketAddr}, + std::{ io::Read, net::SocketAddr, ops::{Index, IndexMut}, slice::{Iter, IterMut, SliceIndex}, }, }; pub const NUM_PACKETS: usize = 1024 * 8; @@ -14,7 +20,7 @@ pub const NUM_RCVMMSGS: usize = 64; #[derive(Debug, Default, Clone)] pub struct PacketBatch { - pub packets: PinnedVec<Packet>, + packets: PinnedVec<Packet>, } pub type PacketBatchRecycler = Recycler<PinnedVec<Packet>>; @@ -30,23 +36,29 @@ impl PacketBatch { PacketBatch { packets } } + pub fn new_pinned_with_capacity(capacity: usize) -> Self { + let mut batch = Self::with_capacity(capacity); + batch.packets.reserve_and_pin(capacity); + batch + } + pub fn new_unpinned_with_recycler( recycler: PacketBatchRecycler, - size: usize, + capacity: usize, name: &'static str, ) -> Self { let mut packets = recycler.allocate(name); - packets.reserve(size); + packets.reserve(capacity); PacketBatch { packets } } pub fn new_with_recycler( recycler: PacketBatchRecycler, - size: usize, + capacity: usize, name: &'static str, ) -> Self { let mut packets = recycler.allocate(name); - packets.reserve_and_pin(size); + packets.reserve_and_pin(capacity); PacketBatch { packets } } @@ -96,15 +108,106 @@ impl PacketBatch { batch } + pub fn resize(&mut self, new_len: usize, value: Packet) { + self.packets.resize(new_len, value) + } + + pub fn truncate(&mut self, len: usize) { + self.packets.truncate(len); + } + + pub fn push(&mut self, packet: Packet) { + self.packets.push(packet); + } + pub fn set_addr(&mut self, addr: &SocketAddr) { - for p in self.packets.iter_mut() { + for p in self.iter_mut() { p.meta.set_socket_addr(addr); } } + pub fn len(&self) -> usize { + self.packets.len() + } + + pub fn capacity(&self) -> usize { + self.packets.capacity() + } + pub fn is_empty(&self) -> bool { self.packets.is_empty() } + + pub fn as_ptr(&self) -> *const Packet { + self.packets.as_ptr() + } + + pub fn iter(&self) -> Iter<'_, Packet> { + self.packets.iter() + } + + pub fn iter_mut(&mut self) -> IterMut<'_, Packet> { + self.packets.iter_mut() + } + + /// See Vector::set_len() for more details + /// + /// # Safety + /// + /// - `new_len` must be less than or equal to [`capacity()`]. + /// - The elements at `old_len..new_len` must be initialized. Packet data + /// will likely be overwritten when populating the packet, but the meta + /// should specifically be initialized to known values.
+ pub unsafe fn set_len(&mut self, new_len: usize) { + self.packets.set_len(new_len); + } +} + +impl<I: SliceIndex<[Packet]>> Index<I> for PacketBatch { + type Output = I::Output; + + #[inline] + fn index(&self, index: I) -> &Self::Output { + &self.packets[index] + } +} + +impl<I: SliceIndex<[Packet]>> IndexMut<I> for PacketBatch { + #[inline] + fn index_mut(&mut self, index: I) -> &mut Self::Output { + &mut self.packets[index] + } +} + +impl<'a> IntoIterator for &'a PacketBatch { + type Item = &'a Packet; + type IntoIter = Iter<'a, Packet>; + + fn into_iter(self) -> Self::IntoIter { + self.packets.iter() + } +} + +impl<'a> IntoParallelIterator for &'a PacketBatch { + type Iter = rayon::slice::Iter<'a, Packet>; + type Item = &'a Packet; + fn into_par_iter(self) -> Self::Iter { + self.packets.par_iter() + } +} + +impl<'a> IntoParallelIterator for &'a mut PacketBatch { + type Iter = rayon::slice::IterMut<'a, Packet>; + type Item = &'a mut Packet; + fn into_par_iter(self) -> Self::Iter { + self.packets.par_iter_mut() + } +} + +impl From<PacketBatch> for Vec<Packet> { + fn from(batch: PacketBatch) -> Self { + batch.packets.into() + } } pub fn to_packet_batches<T: Serialize>(items: &[T], chunk_size: usize) -> Vec<PacketBatch> { @@ -112,7 +215,7 @@ pub fn to_packet_batches<T: Serialize>(items: &[T], chunk_size: usize) -> Vec<PacketBatch> usize { - batches.iter().map(|batch| batch.packets.len()).sum() + batches.iter().map(|batch| batch.len()).sum() } pub fn count_valid_packets(batches: &[PacketBatch]) -> usize { batches .iter() - .map(|batch| batch.packets.iter().filter(|p| !p.meta.discard()).count()) + .map(|batch| batch.iter().filter(|p| !p.meta.discard()).count()) .sum() } @@ -396,7 +396,6 @@ pub fn generate_offsets( .iter_mut() .map(|batch| { batch - .packets .iter_mut() .map(|packet| { let packet_offsets = @@ -499,7 +498,7 @@ impl Deduper { pub fn dedup_packets_and_count_discards(&self, batches: &mut [PacketBatch]) -> u64 { batches .iter_mut() - .flat_map(|batch| batch.packets.iter_mut().map(|p| self.dedup_packet(p))) + .flat_map(|batch| batch.iter_mut().map(|p| self.dedup_packet(p))) .sum() } } @@ -510,28 +509,25 @@ pub fn shrink_batches(batches: &mut [PacketBatch]) -> usize { let mut valid_packet_ix = 0; let mut last_valid_batch = 0; for batch_ix in 0..batches.len() { - for packet_ix in 0..batches[batch_ix].packets.len() { - if batches[batch_ix].packets[packet_ix].meta.discard() { + for packet_ix in 0..batches[batch_ix].len() { + if batches[batch_ix][packet_ix].meta.discard() { continue; } last_valid_batch = batch_ix.saturating_add(1); let mut found_spot = false; while valid_batch_ix < batch_ix && !found_spot { - while valid_packet_ix < batches[valid_batch_ix].packets.len() { - if batches[valid_batch_ix].packets[valid_packet_ix] .meta .discard() { - batches[valid_batch_ix].packets[valid_packet_ix] = batches[batch_ix].packets[packet_ix].clone(); - batches[batch_ix].packets[packet_ix].meta.set_discard(true); + while valid_packet_ix < batches[valid_batch_ix].len() { + if batches[valid_batch_ix][valid_packet_ix].meta.discard() { + batches[valid_batch_ix][valid_packet_ix] = batches[batch_ix][packet_ix].clone(); + batches[batch_ix][packet_ix].meta.set_discard(true); last_valid_batch = valid_batch_ix.saturating_add(1); found_spot = true; break; } valid_packet_ix = valid_packet_ix.saturating_add(1); } - if valid_packet_ix >= batches[valid_batch_ix].packets.len() { + if valid_packet_ix >= batches[valid_batch_ix].len() { valid_packet_ix = 0; valid_batch_ix = valid_batch_ix.saturating_add(1); } @@ -547,7 +543,6 @@ pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, pa
PAR_THREAD_POOL.install(|| { batches.into_par_iter().for_each(|batch| { batch - .packets .par_iter_mut() .for_each(|p| verify_packet(p, reject_non_vote)) }); }); @@ -559,12 +554,9 @@ pub fn ed25519_verify_disabled(batches: &mut [PacketBatch]) { use rayon::prelude::*; let packet_count = count_packets_in_batches(batches); debug!("disabled ECDSA for {}", packet_count); - batches.into_par_iter().for_each(|batch| { - batch - .packets - .par_iter_mut() - .for_each(|p| p.meta.set_discard(false)) - }); + batches + .into_par_iter() + .for_each(|batch| batch.par_iter_mut().for_each(|p| p.meta.set_discard(false))); inc_new_counter_debug!("ed25519_verify_disabled", packet_count); } @@ -621,7 +613,7 @@ pub fn get_checked_scalar(scalar: &[u8; 32]) -> Result<[u8; 32], PacketError> { pub fn mark_disabled(batches: &mut [PacketBatch], r: &[Vec<u8>]) { for (batch, v) in batches.iter_mut().zip(r) { - for (pkt, f) in batch.packets.iter_mut().zip(v) { + for (pkt, f) in batch.iter_mut().zip(v) { if !pkt.meta.discard() { pkt.meta.set_discard(*f == 0); } } } } @@ -672,12 +664,12 @@ pub fn ed25519_verify( let mut num_packets: usize = 0; for batch in batches.iter() { elems.push(perf_libs::Elems { - elems: batch.packets.as_ptr(), - num: batch.packets.len() as u32, + elems: batch.as_ptr(), + num: batch.len() as u32, }); - let v = vec![0u8; batch.packets.len()]; + let v = vec![0u8; batch.len()]; rvs.push(v); - num_packets = num_packets.saturating_add(batch.packets.len()); + num_packets = num_packets.saturating_add(batch.len()); } out.resize(signature_offsets.len(), 0); trace!("Starting verify num packets: {}", num_packets); @@ -743,14 +735,15 @@ mod tests { #[test] fn test_mark_disabled() { - let mut batch = PacketBatch::default(); - batch.packets.push(Packet::default()); + let batch_size = 1; + let mut batch = PacketBatch::with_capacity(batch_size); + batch.resize(batch_size, Packet::default()); let mut batches: Vec<PacketBatch> = vec![batch]; mark_disabled(&mut batches, &[vec![0]]); - assert!(batches[0].packets[0].meta.discard()); - batches[0].packets[0].meta.set_discard(false); + assert!(batches[0][0].meta.discard()); + batches[0][0].meta.set_discard(false); mark_disabled(&mut batches, &[vec![1]]); - assert!(!batches[0].packets[0].meta.discard()); + assert!(!batches[0][0].meta.discard()); } #[test] @@ -854,7 +847,7 @@ mod tests { packet.meta.set_discard(false); let mut batches = generate_packet_batches(&packet, 1, 1); ed25519_verify(&mut batches); - assert!(batches[0].packets[0].meta.discard()); + assert!(batches[0][0].meta.discard()); } #[test] @@ -890,7 +883,7 @@ mod tests { packet.meta.set_discard(false); let mut batches = generate_packet_batches(&packet, 1, 1); ed25519_verify(&mut batches); - assert!(batches[0].packets[0].meta.discard()); + assert!(batches[0][0].meta.discard()); } #[test] @@ -1058,13 +1051,12 @@ mod tests { // generate packet vector let batches: Vec<_> = (0..num_batches) .map(|_| { - let mut packet_batch = PacketBatch::default(); - packet_batch.packets.resize(0, Packet::default()); let num_packets_per_batch = thread_rng().gen_range(1, max_packets_per_batch); + let mut packet_batch = PacketBatch::with_capacity(num_packets_per_batch); for _ in 0..num_packets_per_batch { - packet_batch.packets.push(packet.clone()); + packet_batch.push(packet.clone()); } - assert_eq!(packet_batch.packets.len(), num_packets_per_batch); + assert_eq!(packet_batch.len(), num_packets_per_batch); packet_batch }) .collect(); @@ -1081,12 +1073,11 @@ mod tests { // generate packet vector let batches: Vec<_> = (0..num_batches) .map(|_| { - let mut
packet_batch = PacketBatch::default(); - packet_batch.packets.resize(0, Packet::default()); + let mut packet_batch = PacketBatch::with_capacity(num_packets_per_batch); for _ in 0..num_packets_per_batch { - packet_batch.packets.push(packet.clone()); + packet_batch.push(packet.clone()); } - assert_eq!(packet_batch.packets.len(), num_packets_per_batch); + assert_eq!(packet_batch.len(), num_packets_per_batch); packet_batch }) .collect(); @@ -1113,7 +1104,7 @@ mod tests { let should_discard = modify_data; assert!(batches .iter() - .flat_map(|batch| &batch.packets) + .flat_map(|batch| batch.iter()) .all(|p| p.meta.discard() == should_discard)); } @@ -1137,7 +1128,7 @@ mod tests { ed25519_verify(&mut batches); assert!(batches .iter() - .flat_map(|batch| &batch.packets) + .flat_map(|batch| batch.iter()) .all(|p| p.meta.discard())); } @@ -1169,7 +1160,7 @@ mod tests { packet.data[40] = packet.data[40].wrapping_add(8); - batches[0].packets.push(packet); + batches[0].push(packet); // verify packets ed25519_verify(&mut batches); @@ -1180,7 +1171,7 @@ mod tests { ref_vec[0].push(0u8); assert!(batches .iter() - .flat_map(|batch| &batch.packets) + .flat_map(|batch| batch.iter()) .zip(ref_vec.into_iter().flatten()) .all(|(p, discard)| { if discard == 0 { @@ -1208,15 +1199,15 @@ mod tests { let num_modifications = thread_rng().gen_range(0, 5); for _ in 0..num_modifications { let batch = thread_rng().gen_range(0, batches.len()); - let packet = thread_rng().gen_range(0, batches[batch].packets.len()); - let offset = thread_rng().gen_range(0, batches[batch].packets[packet].meta.size); + let packet = thread_rng().gen_range(0, batches[batch].len()); + let offset = thread_rng().gen_range(0, batches[batch][packet].meta.size); let add = thread_rng().gen_range(0, 255); - batches[batch].packets[packet].data[offset] = - batches[batch].packets[packet].data[offset].wrapping_add(add); + batches[batch][packet].data[offset] = + batches[batch][packet].data[offset].wrapping_add(add); } let batch_to_disable = thread_rng().gen_range(0, batches.len()); - for p in batches[batch_to_disable].packets.iter_mut() { + for p in batches[batch_to_disable].iter_mut() { p.meta.set_discard(true); } @@ -1230,8 +1221,8 @@ mod tests { // check result batches .iter() - .flat_map(|batch| &batch.packets) - .zip(batches_cpu.iter().flat_map(|batch| &batch.packets)) + .flat_map(|batch| batch.iter()) + .zip(batches_cpu.iter().flat_map(|batch| batch.iter())) .for_each(|(p1, p2)| assert_eq!(p1, p2)); } } @@ -1386,26 +1377,20 @@ mod tests { let mut current_offset = 0usize; let mut batch = PacketBatch::default(); - batch - .packets - .push(Packet::from_data(None, test_tx()).unwrap()); + batch.push(Packet::from_data(None, test_tx()).unwrap()); let tx = new_test_vote_tx(&mut rng); - batch.packets.push(Packet::from_data(None, tx).unwrap()); - batch - .packets - .iter_mut() - .enumerate() - .for_each(|(index, packet)| { - let packet_offsets = do_get_packet_offsets(packet, current_offset).unwrap(); - check_for_simple_vote_transaction(packet, &packet_offsets, current_offset).ok(); - if index == 1 { - assert!(packet.meta.is_simple_vote_tx()); - } else { - assert!(!packet.meta.is_simple_vote_tx()); - } + batch.push(Packet::from_data(None, tx).unwrap()); + batch.iter_mut().enumerate().for_each(|(index, packet)| { + let packet_offsets = do_get_packet_offsets(packet, current_offset).unwrap(); + check_for_simple_vote_transaction(packet, &packet_offsets, current_offset).ok(); + if index == 1 { + assert!(packet.meta.is_simple_vote_tx()); + } else { + 
assert!(!packet.meta.is_simple_vote_tx()); } - current_offset = current_offset.saturating_add(size_of::<Packet>()); - }); + current_offset = current_offset.saturating_add(size_of::<Packet>()); + }); } #[test] @@ -1476,15 +1461,13 @@ mod tests { PACKETS_PER_BATCH, ); batches.iter_mut().for_each(|b| { - b.packets - .iter_mut() + b.iter_mut() .for_each(|p| p.meta.set_discard(thread_rng().gen())) }); //find all the non discarded packets let mut start = vec![]; batches.iter_mut().for_each(|b| { - b.packets - .iter_mut() + b.iter_mut() .filter(|p| !p.meta.discard()) .for_each(|p| start.push(p.clone())) }); @@ -1497,8 +1480,7 @@ mod tests { //make sure all the non discarded packets are the same let mut end = vec![]; batches.iter_mut().for_each(|b| { - b.packets - .iter_mut() + b.iter_mut() .filter(|p| !p.meta.discard()) .for_each(|p| end.push(p.clone())) }); @@ -1662,15 +1644,14 @@ mod tests { assert_eq!(batches.len(), BATCH_COUNT); assert_eq!(count_valid_packets(&batches), PACKET_COUNT); batches.iter_mut().enumerate().for_each(|(i, b)| { - b.packets - .iter_mut() + b.iter_mut() .enumerate() .for_each(|(j, p)| p.meta.set_discard(set_discard(i, j))) }); assert_eq!(count_valid_packets(&batches), *expect_valid_packets); debug!("show valid packets for case {}", i); batches.iter_mut().enumerate().for_each(|(i, b)| { - b.packets.iter_mut().enumerate().for_each(|(j, p)| { + b.iter_mut().enumerate().for_each(|(j, p)| { if !p.meta.discard() { debug!("{} {}", i, j) } diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index 1e2f949432..fd809f987f 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -27,11 +27,11 @@ pub fn recv_from(batch: &mut PacketBatch, socket: &UdpSocket, max_wait_ms: u64) trace!("receiving on {}", socket.local_addr().unwrap()); let start = Instant::now(); loop { - batch.packets.resize( + batch.resize( std::cmp::min(i + NUM_RCVMMSGS, PACKETS_PER_BATCH), Packet::default(), ); - match recv_mmsg(socket, &mut batch.packets[i..]) { + match recv_mmsg(socket, &mut batch[i..]) { Err(_) if i > 0 => { if start.elapsed().as_millis() as u64 > max_wait_ms { break; } } } } - batch.packets.truncate(i); + batch.truncate(i); inc_new_counter_debug!("packets-recv_count", i); Ok(i) } @@ -65,7 +65,7 @@ pub fn send_to( socket: &UdpSocket, socket_addr_space: &SocketAddrSpace, ) -> Result<()> { - for p in &batch.packets { + for p in batch.iter() { let addr = p.meta.socket_addr(); if socket_addr_space.check(&addr) { socket.send_to(&p.data[..p.meta.size], &addr)?; } } @@ -92,7 +92,7 @@ mod tests { let packets = vec![Packet::default()]; let mut packet_batch = PacketBatch::new(packets); packet_batch.set_addr(&send_addr); - assert_eq!(packet_batch.packets[0].meta.socket_addr(), send_addr); + assert_eq!(packet_batch[0].meta.socket_addr(), send_addr); } #[test] @@ -102,25 +102,23 @@ mod tests { let addr = recv_socket.local_addr().unwrap(); let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); let saddr = send_socket.local_addr().unwrap(); - let mut batch = PacketBatch::default(); - batch.packets.resize(10, Packet::default()); + let packet_batch_size = 10; + let mut batch = PacketBatch::with_capacity(packet_batch_size); + batch.resize(packet_batch_size, Packet::default()); for m in batch.iter_mut() { m.meta.set_socket_addr(&addr); m.meta.size = PACKET_DATA_SIZE; } send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); batch - .packets - .iter_mut()
- .for_each(|pkt| pkt.meta = Meta::default()); + batch.iter_mut().for_each(|pkt| pkt.meta = Meta::default()); let recvd = recv_from(&mut batch, &recv_socket, 1).unwrap(); - assert_eq!(recvd, batch.packets.len()); + assert_eq!(recvd, batch.len()); - for m in &batch.packets { + for m in batch.iter() { assert_eq!(m.meta.size, PACKET_DATA_SIZE); assert_eq!(m.meta.socket_addr(), saddr); } @@ -155,17 +153,18 @@ mod tests { let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); let addr = recv_socket.local_addr().unwrap(); let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let mut batch = PacketBatch::default(); - batch.packets.resize(PACKETS_PER_BATCH, Packet::default()); + let mut batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); + batch.resize(PACKETS_PER_BATCH, Packet::default()); // Should only get PACKETS_PER_BATCH packets per iteration even // if a lot more were sent, and regardless of packet size for _ in 0..2 * PACKETS_PER_BATCH { - let mut batch = PacketBatch::default(); - batch.packets.resize(1, Packet::default()); - for m in batch.packets.iter_mut() { - m.meta.set_socket_addr(&addr); - m.meta.size = 1; + let batch_size = 1; + let mut batch = PacketBatch::with_capacity(batch_size); + batch.resize(batch_size, Packet::default()); + for p in batch.iter_mut() { + p.meta.set_socket_addr(&addr); + p.meta.size = 1; } send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); } @@ -174,6 +173,6 @@ mod tests { // Check we only got PACKETS_PER_BATCH packets assert_eq!(recvd, PACKETS_PER_BATCH); - assert_eq!(batch.packets.capacity(), PACKETS_PER_BATCH); + assert_eq!(batch.capacity(), PACKETS_PER_BATCH); } } diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 152aaef3b1..6429d8d1b2 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -180,7 +180,7 @@ fn handle_chunk( let mut packet = Packet::default(); packet.meta.set_socket_addr(remote_addr); packet.meta.sender_stake = stake; - batch.packets.push(packet); + batch.push(packet); *maybe_batch = Some(batch); stats .total_packets_allocated @@ -189,15 +189,15 @@ fn handle_chunk( if let Some(batch) = maybe_batch.as_mut() { let end = chunk.offset as usize + chunk.bytes.len(); - batch.packets[0].data[chunk.offset as usize..end].copy_from_slice(&chunk.bytes); - batch.packets[0].meta.size = std::cmp::max(batch.packets[0].meta.size, end); + batch[0].data[chunk.offset as usize..end].copy_from_slice(&chunk.bytes); + batch[0].meta.size = std::cmp::max(batch[0].meta.size, end); stats.total_chunks_received.fetch_add(1, Ordering::Relaxed); } } else { trace!("chunk is none"); // done receiving chunks if let Some(batch) = maybe_batch.take() { - let len = batch.packets[0].meta.size; + let len = batch[0].meta.size; if let Err(e) = packet_sender.send(batch) { stats .total_packet_batch_send_err @@ -778,7 +778,7 @@ mod test { let mut total_packets = 0; while now.elapsed().as_secs() < 10 { if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) { - total_packets += packets.packets.len(); + total_packets += packets.len(); all_packets.push(packets) } if total_packets == num_expected_packets { @@ -786,7 +786,7 @@ mod test { } } for batch in all_packets { - for p in &batch.packets { + for p in batch.iter() { assert_eq!(p.meta.size, 1); } } @@ -850,7 +850,7 @@ mod test { let mut total_packets = 0; while now.elapsed().as_secs() < 5 { if let Ok(packets) = receiver.recv_timeout(Duration::from_secs(1)) { - total_packets += packets.packets.len(); + total_packets += packets.len(); all_packets.push(packets) } 
if total_packets > num_expected_packets { break; } } for batch in all_packets { - for p in &batch.packets { + for p in batch.iter() { assert_eq!(p.meta.size, num_bytes); } } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 4bcd020653..ea634d0282 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -292,9 +292,9 @@ fn recv_send( let timer = Duration::new(1, 0); let packet_batch = r.recv_timeout(timer)?; if let Some(stats) = stats { - packet_batch.packets.iter().for_each(|p| stats.record(p)); + packet_batch.iter().for_each(|p| stats.record(p)); } - let packets = packet_batch.packets.iter().filter_map(|pkt| { + let packets = packet_batch.iter().filter_map(|pkt| { let addr = pkt.meta.socket_addr(); socket_addr_space .check(&addr) @@ -313,13 +313,13 @@ pub fn recv_vec_packet_batches( trace!("got packets"); let mut num_packets = packet_batches .iter() - .map(|packets| packets.packets.len()) + .map(|packets| packets.len()) .sum::<usize>(); while let Ok(packet_batch) = recvr.try_recv() { trace!("got more packets"); num_packets += packet_batch .iter() - .map(|packets| packets.packets.len()) + .map(|packets| packets.len()) .sum::<usize>(); packet_batches.extend(packet_batch); } @@ -339,11 +339,11 @@ pub fn recv_packet_batches( let packet_batch = recvr.recv_timeout(timer)?; let recv_start = Instant::now(); trace!("got packets"); - let mut num_packets = packet_batch.packets.len(); + let mut num_packets = packet_batch.len(); let mut packet_batches = vec![packet_batch]; while let Ok(packet_batch) = recvr.try_recv() { trace!("got more packets"); - num_packets += packet_batch.packets.len(); + num_packets += packet_batch.len(); packet_batches.push(packet_batch); } let recv_duration = recv_start.elapsed(); @@ -431,7 +431,7 @@ mod test { continue; } - *num_packets -= packet_batch_res.unwrap().packets.len(); + *num_packets -= packet_batch_res.unwrap().len(); if *num_packets == 0 { break; @@ -482,7 +482,7 @@ mod test { p.meta.size = PACKET_DATA_SIZE; p.meta.set_socket_addr(&addr); } - packet_batch.packets.push(p); + packet_batch.push(p); } s_responder.send(packet_batch).expect("send"); t_responder
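
Sketch 1 (illustrative only, not part of the patch): how a call site migrates from the public `packets` field to the accessor API introduced in perf/src/packet.rs above. The helper names `count_valid` and `build_batch` are invented for this example; only methods the patch defines or uses (`with_capacity`, `resize`, `len`, `iter`, `iter_mut`) appear.

    use solana_perf::packet::{Packet, PacketBatch};

    // Formerly: batch.packets.len(), batch.packets.iter(), ...
    fn count_valid(batch: &PacketBatch) -> usize {
        // iter() (and IntoIterator for &PacketBatch) replaces reaching into
        // the now-private vector.
        batch.iter().filter(|p| !p.meta.discard()).count()
    }

    fn build_batch(n: usize) -> PacketBatch {
        // with_capacity() + resize() replaces resizing batch.packets directly.
        let mut batch = PacketBatch::with_capacity(n);
        batch.resize(n, Packet::default());
        for p in batch.iter_mut() {
            p.meta.size = 0; // meta stays reachable through iter_mut()
        }
        batch
    }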
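Sketch 2 (same caveats): the new trait impls keep indexing, conversion, and parallel iteration working as they did on the raw vector, so most call sites in this diff only drop the `.packets` segment.

    use rayon::prelude::*;
    use solana_perf::packet::{Packet, PacketBatch};

    fn first_len(batch: &PacketBatch) -> usize {
        // Index<I: SliceIndex<[Packet]>> forwards to the inner vector, so both
        // batch[0] and range forms like &batch[1..] keep working.
        batch[0].meta.size
    }

    fn clear_discards(batch: &mut PacketBatch) {
        // IntoParallelIterator for &mut PacketBatch mirrors par_iter_mut().
        batch.into_par_iter().for_each(|p| p.meta.set_discard(false));
    }

    fn into_packets(batch: PacketBatch) -> Vec<Packet> {
        // From<PacketBatch> for Vec<Packet> consumes the batch.
        batch.into()
    }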