From b1340d77a2166448b18b7c6042261fac07599f8b Mon Sep 17 00:00:00 2001 From: Jon Cinque Date: Tue, 6 Dec 2022 12:54:49 +0100 Subject: [PATCH] sdk: Make Packet::meta private, use accessor functions (#29092) sdk: Make packet meta private --- bench-streamer/src/main.rs | 8 +- core/benches/banking_stage.rs | 2 +- core/benches/sigverify_stage.rs | 14 +-- core/benches/unprocessed_packet_batches.rs | 8 +- core/src/ancestor_hashes_service.rs | 12 +- core/src/banking_stage.rs | 8 +- core/src/cluster_info_vote_listener.rs | 2 +- core/src/fetch_stage.rs | 2 +- core/src/find_packet_sender_stake_stage.rs | 4 +- core/src/immutable_deserialized_packet.rs | 2 +- core/src/latest_unprocessed_votes.rs | 15 ++- core/src/packet_deserializer.rs | 4 +- core/src/repair_response.rs | 6 +- core/src/serve_repair.rs | 16 +-- core/src/shred_fetch_stage.rs | 4 +- core/src/sigverify.rs | 6 +- core/src/sigverify_shreds.rs | 18 +-- core/src/sigverify_stage.rs | 30 ++--- core/src/unprocessed_packet_batches.rs | 4 +- core/src/unprocessed_transaction_storage.rs | 14 +-- core/src/window_service.rs | 8 +- entry/src/entry.rs | 4 +- gossip/src/cluster_info.rs | 10 +- gossip/tests/gossip.rs | 2 +- ledger/src/shred.rs | 20 ++-- ledger/src/sigverify_shreds.rs | 18 +-- perf/benches/dedup.rs | 2 +- perf/benches/shrink.rs | 4 +- perf/benches/sigverify.rs | 2 +- perf/src/packet.rs | 2 +- perf/src/sigverify.rs | 116 ++++++++++---------- quic-client/tests/quic_client.rs | 2 +- sdk/src/packet.rs | 14 ++- streamer/src/nonblocking/quic.rs | 12 +- streamer/src/nonblocking/recvmmsg.rs | 42 +++---- streamer/src/packet.rs | 24 ++-- streamer/src/recvmmsg.rs | 48 ++++---- streamer/src/streamer.rs | 8 +- streamer/tests/recvmmsg.rs | 2 +- 39 files changed, 270 insertions(+), 249 deletions(-) diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index c4951574f1..32be598c84 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -25,8 +25,8 @@ fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { let mut packet_batch = PacketBatch::with_capacity(batch_size); packet_batch.resize(batch_size, Packet::default()); for w in packet_batch.iter_mut() { - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_socket_addr(addr); + w.meta_mut().size = PACKET_DATA_SIZE; + w.meta_mut().set_socket_addr(addr); } let packet_batch = Arc::new(packet_batch); spawn(move || loop { @@ -35,8 +35,8 @@ fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { } let mut num = 0; for p in packet_batch.iter() { - let a = p.meta.socket_addr(); - assert!(p.meta.size <= PACKET_DATA_SIZE); + let a = p.meta().socket_addr(); + assert!(p.meta().size <= PACKET_DATA_SIZE); let data = p.data(..).unwrap_or_default(); send.send_to(data, a).unwrap(); num += 1; diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 7c7251f214..e537b67848 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -255,7 +255,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { let mut packet_batches = to_packet_batches(&vote_txs, PACKETS_PER_BATCH); for batch in packet_batches.iter_mut() { for packet in batch.iter_mut() { - packet.meta.set_simple_vote(true); + packet.meta_mut().set_simple_vote(true); } } packet_batches diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 299575f3d1..689bf1d011 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -52,7 +52,7 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) { total += batch.len(); for p in batch.iter_mut() { let ip_index = thread_rng().gen_range(0, ips.len()); - p.meta.addr = ips[ip_index]; + p.meta_mut().addr = ips[ip_index]; } } info!("total packets: {}", total); @@ -62,10 +62,10 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) { let mut num_packets = 0; for batch in batches.iter_mut() { for p in batch.iter_mut() { - if !p.meta.discard() { + if !p.meta().discard() { num_packets += 1; } - p.meta.set_discard(false); + p.meta_mut().set_discard(false); } } assert_eq!(num_packets, 10_000); @@ -97,7 +97,7 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) { for batch in batches.iter_mut() { for packet in batch.iter_mut() { // One spam address, ~1000 unique addresses. - packet.meta.addr = if rng.gen_ratio(1, 30) { + packet.meta_mut().addr = if rng.gen_ratio(1, 30) { new_rand_addr(&mut rng) } else { spam_addr @@ -109,10 +109,10 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) { let mut num_packets = 0; for batch in batches.iter_mut() { for packet in batch.iter_mut() { - if !packet.meta.discard() { + if !packet.meta().discard() { num_packets += 1; } - packet.meta.set_discard(false); + packet.meta_mut().set_discard(false); } } assert_eq!(num_packets, 10_000); @@ -215,7 +215,7 @@ fn prepare_batches(discard_factor: i32) -> (Vec, usize) { batch.iter_mut().for_each(|p| { let throw = die.sample(&mut rng); if throw < discard_factor { - p.meta.set_discard(true); + p.meta_mut().set_discard(true); c += 1; } }) diff --git a/core/benches/unprocessed_packet_batches.rs b/core/benches/unprocessed_packet_batches.rs index f8c03961db..4732b5cbfc 100644 --- a/core/benches/unprocessed_packet_batches.rs +++ b/core/benches/unprocessed_packet_batches.rs @@ -38,7 +38,7 @@ fn build_packet_batch( recent_blockhash.unwrap_or_else(Hash::new_unique), ); let mut packet = Packet::from_data(None, tx).unwrap(); - packet.meta.sender_stake = sender_stake as u64; + packet.meta_mut().sender_stake = sender_stake as u64; packet }) .collect(), @@ -66,7 +66,7 @@ fn build_randomized_packet_batch( ); let mut packet = Packet::from_data(None, tx).unwrap(); let sender_stake = distribution.sample(&mut rng); - packet.meta.sender_stake = sender_stake as u64; + packet.meta_mut().sender_stake = sender_stake as u64; packet }) .collect(), @@ -120,8 +120,8 @@ fn bench_packet_clone(bencher: &mut Bencher) { let mut timer = Measure::start("insert_batch"); packet_batch.iter().for_each(|packet| { let mut packet = packet.clone(); - packet.meta.sender_stake *= 2; - if packet.meta.sender_stake > 2 { + packet.meta_mut().sender_stake *= 2; + if packet.meta().sender_stake > 2 { outer_packet = packet; } }); diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index faa341d15c..6707171ec7 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -343,7 +343,7 @@ impl AncestorHashesService { keypair: &Keypair, ancestor_socket: &UdpSocket, ) -> Option<(Slot, DuplicateAncestorDecision)> { - let from_addr = packet.meta.socket_addr(); + let from_addr = packet.meta().socket_addr(); let packet_data = match packet.data(..) { Some(data) => data, None => { @@ -1205,7 +1205,9 @@ mod test { .recv_timeout(Duration::from_millis(10_000)) .unwrap(); let packet = &mut response_packet[0]; - packet.meta.set_socket_addr(&responder_info.serve_repair); + packet + .meta_mut() + .set_socket_addr(&responder_info.serve_repair); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, &ancestor_hashes_request_statuses, @@ -1465,7 +1467,7 @@ mod test { // Create invalid packet with fewer bytes than the size of the nonce let mut packet = Packet::default(); - packet.meta.size = 0; + packet.meta_mut().size = 0; assert!(AncestorHashesService::verify_and_process_ancestor_response( &packet, @@ -1573,7 +1575,9 @@ mod test { .recv_timeout(Duration::from_millis(10_000)) .unwrap(); let packet = &mut response_packet[0]; - packet.meta.set_socket_addr(&responder_info.serve_repair); + packet + .meta_mut() + .set_socket_addr(&responder_info.serve_repair); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, &ancestor_hashes_request_statuses, diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index e3ad152fec..2e1831a34a 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -556,7 +556,7 @@ impl BankingStage { let packet_vec: Vec<_> = forwardable_packets .filter_map(|p| { - if !p.meta.forwarded() && data_budget.take(p.meta.size) { + if !p.meta().forwarded() && data_budget.take(p.meta().size) { Some(p.data(..)?.to_vec()) } else { None @@ -2122,7 +2122,7 @@ mod tests { with_vers.iter_mut().for_each(|(b, v)| { b.iter_mut() .zip(v) - .for_each(|(p, f)| p.meta.set_discard(*f == 0)) + .for_each(|(p, f)| p.meta_mut().set_discard(*f == 0)) }); with_vers.into_iter().map(|(b, _)| b).collect() } @@ -3925,7 +3925,7 @@ mod tests { let forwarded_packet = { let transaction = system_transaction::transfer(&keypair, &pubkey, 1, fwd_block_hash); let mut packet = Packet::from_data(None, transaction).unwrap(); - packet.meta.flags |= PacketFlags::FORWARDED; + packet.meta_mut().flags |= PacketFlags::FORWARDED; DeserializedPacket::new(packet).unwrap() }; @@ -4005,7 +4005,7 @@ mod tests { let num_received = recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); assert_eq!(num_received, expected_ids.len(), "{}", name); for (i, expected_id) in expected_ids.iter().enumerate() { - assert_eq!(packets[i].meta.size, 215); + assert_eq!(packets[i].meta().size, 215); let recv_transaction: VersionedTransaction = packets[i].deserialize_slice(..).unwrap(); assert_eq!( diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 511d5e74aa..8e56a210ed 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -347,7 +347,7 @@ impl ClusterInfoVoteListener { .filter(|(_, packet_batch)| { // to_packet_batches() above splits into 1 packet long batches assert_eq!(packet_batch.len(), 1); - !packet_batch[0].meta.discard() + !packet_batch[0].meta().discard() }) .filter_map(|(tx, packet_batch)| { let (vote_account_key, vote, ..) = vote_parser::parse_vote_transaction(&tx)?; diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 9d0b404fc2..6ffeeaaacc 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -102,7 +102,7 @@ impl FetchStage { poh_recorder: &Arc>, ) -> Result<()> { let mark_forwarded = |packet: &mut Packet| { - packet.meta.flags |= PacketFlags::FORWARDED; + packet.meta_mut().flags |= PacketFlags::FORWARDED; }; let mut packet_batch = recvr.recv()?; diff --git a/core/src/find_packet_sender_stake_stage.rs b/core/src/find_packet_sender_stake_stage.rs index 53f1d03366..bc23652b5e 100644 --- a/core/src/find_packet_sender_stake_stage.rs +++ b/core/src/find_packet_sender_stake_stage.rs @@ -150,8 +150,8 @@ impl FindPacketSenderStakeStage { .iter_mut() .flat_map(|batch| batch.iter_mut()) .for_each(|packet| { - packet.meta.sender_stake = ip_to_stake - .get(&packet.meta.addr) + packet.meta_mut().sender_stake = ip_to_stake + .get(&packet.meta().addr) .copied() .unwrap_or_default(); }); diff --git a/core/src/immutable_deserialized_packet.rs b/core/src/immutable_deserialized_packet.rs index 7a53dcf87b..996d3d588f 100644 --- a/core/src/immutable_deserialized_packet.rs +++ b/core/src/immutable_deserialized_packet.rs @@ -54,7 +54,7 @@ impl ImmutableDeserializedPacket { let sanitized_transaction = SanitizedVersionedTransaction::try_from(versioned_transaction)?; let message_bytes = packet_message(&packet)?; let message_hash = Message::hash_raw_message(message_bytes); - let is_simple_vote = packet.meta.is_simple_vote_tx(); + let is_simple_vote = packet.meta().is_simple_vote_tx(); // drop transaction if prioritization fails. let mut priority_details = priority_details diff --git a/core/src/latest_unprocessed_votes.rs b/core/src/latest_unprocessed_votes.rs index cd0c3522b8..af57444619 100644 --- a/core/src/latest_unprocessed_votes.rs +++ b/core/src/latest_unprocessed_votes.rs @@ -34,7 +34,7 @@ pub struct LatestValidatorVotePacket { impl LatestValidatorVotePacket { pub fn new(packet: Packet, vote_source: VoteSource) -> Result { - if !packet.meta.is_simple_vote_tx() { + if !packet.meta().is_simple_vote_tx() { return Err(DeserializedPacketError::VoteTransactionError); } @@ -347,7 +347,10 @@ mod tests { None, ); let mut packet = Packet::from_data(None, vote_tx).unwrap(); - packet.meta.flags.set(PacketFlags::SIMPLE_VOTE_TX, true); + packet + .meta_mut() + .flags + .set(PacketFlags::SIMPLE_VOTE_TX, true); LatestValidatorVotePacket::new(packet, vote_source).unwrap() } @@ -380,7 +383,7 @@ mod tests { ), ) .unwrap(); - vote.meta.flags.set(PacketFlags::SIMPLE_VOTE_TX, true); + vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true); let mut vote_switch = Packet::from_data( None, new_vote_transaction( @@ -395,7 +398,7 @@ mod tests { ) .unwrap(); vote_switch - .meta + .meta_mut() .flags .set(PacketFlags::SIMPLE_VOTE_TX, true); let mut vote_state_update = Packet::from_data( @@ -411,7 +414,7 @@ mod tests { ) .unwrap(); vote_state_update - .meta + .meta_mut() .flags .set(PacketFlags::SIMPLE_VOTE_TX, true); let mut vote_state_update_switch = Packet::from_data( @@ -427,7 +430,7 @@ mod tests { ) .unwrap(); vote_state_update_switch - .meta + .meta_mut() .flags .set(PacketFlags::SIMPLE_VOTE_TX, true); let random_transaction = Packet::from_data( diff --git a/core/src/packet_deserializer.rs b/core/src/packet_deserializer.rs index 3511d2540c..a27718381d 100644 --- a/core/src/packet_deserializer.rs +++ b/core/src/packet_deserializer.rs @@ -122,7 +122,7 @@ impl PacketDeserializer { packet_batch .iter() .enumerate() - .filter(|(_, pkt)| !pkt.meta.discard()) + .filter(|(_, pkt)| !pkt.meta().discard()) .map(|(index, _)| index) .collect() } @@ -179,7 +179,7 @@ mod tests { let transactions = vec![random_transfer(), random_transfer()]; let mut packet_batches = to_packet_batches(&transactions, 1); assert_eq!(packet_batches.len(), 2); - packet_batches[0][0].meta.set_discard(true); + packet_batches[0][0].meta_mut().set_discard(true); let results = PacketDeserializer::deserialize_and_collect_packets(&packet_batches, None); assert_eq!(results.deserialized_packets.len(), 1); diff --git a/core/src/repair_response.rs b/core/src/repair_response.rs index 52623677aa..01b876af5f 100644 --- a/core/src/repair_response.rs +++ b/core/src/repair_response.rs @@ -32,8 +32,8 @@ pub fn repair_response_packet_from_bytes( if size > packet.buffer_mut().len() { return None; } - packet.meta.size = size; - packet.meta.set_socket_addr(dest); + packet.meta_mut().size = size; + packet.meta_mut().set_socket_addr(dest); packet.buffer_mut()[..bytes.len()].copy_from_slice(&bytes); let mut wr = io::Cursor::new(&mut packet.buffer_mut()[bytes.len()..]); bincode::serialize_into(&mut wr, &nonce).expect("Buffer not large enough to fit nonce"); @@ -90,7 +90,7 @@ mod test { nonce, ) .unwrap(); - packet.meta.flags |= PacketFlags::REPAIR; + packet.meta_mut().flags |= PacketFlags::REPAIR; let leader_slots = [(slot, keypair.pubkey().to_bytes())] .iter() diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 361ac8177e..e58be14bcc 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -485,7 +485,7 @@ impl ServeRepair { } }; - let from_addr = packet.meta.socket_addr(); + let from_addr = packet.meta().socket_addr(); if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) { stats.err_malformed += 1; continue; @@ -807,7 +807,7 @@ impl ServeRepair { Some(rsp) => rsp, }; let num_response_packets = rsp.len(); - let num_response_bytes = rsp.iter().map(|p| p.meta.size).sum(); + let num_response_bytes = rsp.iter().map(|p| p.meta().size).sum(); if data_budget.take(num_response_bytes) && response_sender.send(rsp).is_ok() { stats.total_response_packets += num_response_packets; match stake > 0 { @@ -984,7 +984,7 @@ impl ServeRepair { ) { let mut pending_pongs = Vec::default(); for packet in packet_batch.iter_mut() { - if packet.meta.size != REPAIR_RESPONSE_SERIALIZED_PING_BYTES { + if packet.meta().size != REPAIR_RESPONSE_SERIALIZED_PING_BYTES { continue; } if let Ok(RepairResponse::Ping(ping)) = packet.deserialize_slice(..) { @@ -998,12 +998,12 @@ impl ServeRepair { stats.ping_err_verify_count += 1; continue; } - packet.meta.set_discard(true); + packet.meta_mut().set_discard(true); stats.ping_count += 1; if let Ok(pong) = Pong::new(&ping, keypair) { let pong = RepairProtocol::Pong(pong); if let Ok(pong_bytes) = serialize(&pong) { - let from_addr = packet.meta.socket_addr(); + let from_addr = packet.meta().socket_addr(); pending_pongs.push((pong_bytes, from_addr)); } } @@ -1210,7 +1210,7 @@ mod tests { let ping = Ping::new_rand(&mut rng, &keypair).unwrap(); let ping = RepairResponse::Ping(ping); let pkt = Packet::from_data(None, ping).unwrap(); - assert_eq!(pkt.meta.size, REPAIR_RESPONSE_SERIALIZED_PING_BYTES); + assert_eq!(pkt.meta().size, REPAIR_RESPONSE_SERIALIZED_PING_BYTES); } #[test] @@ -1230,7 +1230,7 @@ mod tests { shred.sign(&keypair); let mut pkt = Packet::default(); shred.copy_to_packet(&mut pkt); - pkt.meta.size = REPAIR_RESPONSE_SERIALIZED_PING_BYTES; + pkt.meta_mut().size = REPAIR_RESPONSE_SERIALIZED_PING_BYTES; let res = pkt.deserialize_slice::(..); if let Ok(RepairResponse::Ping(ping)) = res { assert!(!ping.verify()); @@ -1870,7 +1870,7 @@ mod tests { fn test_run_ancestor_hashes() { fn deserialize_ancestor_hashes_response(packet: &Packet) -> AncestorHashesResponse { packet - .deserialize_slice(..packet.meta.size - SIZE_OF_NONCE) + .deserialize_slice(..packet.meta().size - SIZE_OF_NONCE) .unwrap() } diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index cde36c8389..03ab5ee114 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -95,9 +95,9 @@ impl ShredFetchStage { &mut shreds_received, &mut stats, ) { - packet.meta.set_discard(true); + packet.meta_mut().set_discard(true); } else { - packet.meta.flags.insert(flags); + packet.meta_mut().flags.insert(flags); } } stats.maybe_submit(name, STATS_SUBMIT_CADENCE); diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index b9aba19204..573da5eef1 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -94,7 +94,7 @@ impl SigVerifier for TransactionSigVerifier { is_dup: bool, ) { sigverify::check_for_tracer_packet(packet); - if packet.meta.is_tracer_packet() { + if packet.meta().is_tracer_packet() { if removed_before_sigverify_stage { self.tracer_packet_stats .total_removed_before_sigverify_stage += 1; @@ -110,14 +110,14 @@ impl SigVerifier for TransactionSigVerifier { #[inline(always)] fn process_excess_packet(&mut self, packet: &Packet) { - if packet.meta.is_tracer_packet() { + if packet.meta().is_tracer_packet() { self.tracer_packet_stats.total_excess_tracer_packets += 1; } } #[inline(always)] fn process_passed_sigverify_packet(&mut self, packet: &Packet) { - if packet.meta.is_tracer_packet() { + if packet.meta().is_tracer_packet() { self.tracer_packet_stats .total_tracker_packets_passed_sigverify += 1; } diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 0e5fa278cd..8a4c335752 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -92,7 +92,7 @@ fn run_shred_sigverify( let shreds: Vec<_> = packets .iter() .flat_map(PacketBatch::iter) - .filter(|packet| !packet.meta.discard() && !packet.meta.repair()) + .filter(|packet| !packet.meta().discard() && !packet.meta().repair()) .filter_map(shred::layout::get_shred) .map(<[u8]>::to_vec) .collect(); @@ -137,13 +137,13 @@ fn get_slot_leaders( let mut leaders = HashMap::>::new(); for batch in batches { for packet in batch.iter_mut() { - if packet.meta.discard() { + if packet.meta().discard() { continue; } let shred = shred::layout::get_shred(packet); let slot = match shred.and_then(shred::layout::get_slot) { None => { - packet.meta.set_discard(true); + packet.meta_mut().set_discard(true); continue; } Some(slot) => slot, @@ -154,7 +154,7 @@ fn get_slot_leaders( (&leader != self_pubkey).then_some(leader) }); if leader.is_none() { - packet.meta.set_discard(true); + packet.meta_mut().set_discard(true); } } } @@ -165,7 +165,7 @@ fn count_discards(packets: &[PacketBatch]) -> usize { packets .iter() .flat_map(PacketBatch::iter) - .filter(|packet| packet.meta.discard()) + .filter(|packet| packet.meta().discard()) .count() } @@ -265,7 +265,7 @@ mod tests { ); shred.sign(&leader_keypair); batches[0][0].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0][0].meta.size = shred.payload().len(); + batches[0][0].meta_mut().size = shred.payload().len(); let mut shred = Shred::new_from_data( 0, @@ -280,7 +280,7 @@ mod tests { let wrong_keypair = Keypair::new(); shred.sign(&wrong_keypair); batches[0][1].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0][1].meta.size = shred.payload().len(); + batches[0][1].meta_mut().size = shred.payload().len(); verify_packets( &Pubkey::new_unique(), // self_pubkey @@ -289,7 +289,7 @@ mod tests { &RecyclerCache::warmed(), &mut batches, ); - assert!(!batches[0][0].meta.discard()); - assert!(batches[0][1].meta.discard()); + assert!(!batches[0][0].meta().discard()); + assert!(batches[0][1].meta().discard()); } } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 15fa4fc7cd..9438327d86 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -253,8 +253,8 @@ impl SigVerifyStage { .iter_mut() .rev() .flat_map(|batch| batch.iter_mut().rev()) - .filter(|packet| !packet.meta.discard()) - .map(|packet| (packet.meta.addr, packet)) + .filter(|packet| !packet.meta().discard()) + .map(|packet| (packet.meta().addr, packet)) .into_group_map(); // Allocate max_packets evenly across addresses. while max_packets > 0 && !addrs.is_empty() { @@ -269,7 +269,7 @@ impl SigVerifyStage { // Discard excess packets from each address. for packet in addrs.into_values().flatten() { process_excess_packet(packet); - packet.meta.set_discard(true); + packet.meta_mut().set_discard(true); } } @@ -473,7 +473,7 @@ mod tests { packet_batches .iter() .flatten() - .filter(|p| !p.meta.discard()) + .filter(|p| !p.meta().discard()) .count() } @@ -483,18 +483,18 @@ mod tests { let batch_size = 10; let mut batch = PacketBatch::with_capacity(batch_size); let mut tracer_packet = Packet::default(); - tracer_packet.meta.flags |= PacketFlags::TRACER_PACKET; + tracer_packet.meta_mut().flags |= PacketFlags::TRACER_PACKET; batch.resize(batch_size, tracer_packet); - batch[3].meta.addr = std::net::IpAddr::from([1u16; 8]); - batch[3].meta.set_discard(true); + batch[3].meta_mut().addr = std::net::IpAddr::from([1u16; 8]); + batch[3].meta_mut().set_discard(true); let num_discarded_before_filter = 1; - batch[4].meta.addr = std::net::IpAddr::from([2u16; 8]); + batch[4].meta_mut().addr = std::net::IpAddr::from([2u16; 8]); let total_num_packets = batch.len(); let mut batches = vec![batch]; let max = 3; let mut total_tracer_packets_discarded = 0; SigVerifyStage::discard_excess_packets(&mut batches, max, |packet| { - if packet.meta.is_tracer_packet() { + if packet.meta().is_tracer_packet() { total_tracer_packets_discarded += 1; } }); @@ -508,9 +508,9 @@ mod tests { total_discarded - num_discarded_before_filter ); assert_eq!(total_non_discard, max); - assert!(!batches[0][0].meta.discard()); - assert!(batches[0][3].meta.discard()); - assert!(!batches[0][4].meta.discard()); + assert!(!batches[0][0].meta().discard()); + assert!(batches[0][3].meta().discard()); + assert!(!batches[0][4].meta().discard()); } fn gen_batches( @@ -556,7 +556,7 @@ mod tests { sent_len += batch.len(); batch .iter_mut() - .for_each(|packet| packet.meta.flags |= PacketFlags::TRACER_PACKET); + .for_each(|packet| packet.meta_mut().flags |= PacketFlags::TRACER_PACKET); assert_eq!(batch.len(), packets_per_batch); packet_s.send(vec![batch]).unwrap(); } @@ -637,7 +637,7 @@ mod tests { batches.iter_mut().for_each(|batch| { batch.iter_mut().for_each(|p| { if ((index + 1) as f64 / num_packets as f64) < MAX_DISCARDED_PACKET_RATE { - p.meta.set_discard(true); + p.meta_mut().set_discard(true); } index += 1; }) @@ -647,7 +647,7 @@ mod tests { assert_eq!(SigVerifyStage::maybe_shrink_batches(&mut batches).1, 0); // discard one more to exceed shrink threshold - batches.last_mut().unwrap()[0].meta.set_discard(true); + batches.last_mut().unwrap()[0].meta_mut().set_discard(true); let expected_num_shrunk_batches = 1.max((num_generated_batches as f64 * MAX_DISCARDED_PACKET_RATE) as usize); diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index f186bfbee2..17cf859724 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -126,7 +126,7 @@ impl UnprocessedPacketBatches { if dropped_packet .immutable_section() .original_packet() - .meta + .meta() .is_tracer_packet() { num_dropped_tracer_packets += 1; @@ -478,7 +478,7 @@ mod tests { packet_vector.push(Packet::from_data(None, tx).unwrap()); } for index in vote_indexes.iter() { - packet_vector[*index].meta.flags |= PacketFlags::SIMPLE_VOTE_TX; + packet_vector[*index].meta_mut().flags |= PacketFlags::SIMPLE_VOTE_TX; } packet_vector diff --git a/core/src/unprocessed_transaction_storage.rs b/core/src/unprocessed_transaction_storage.rs index 5d2e0fe5ee..7a4d522619 100644 --- a/core/src/unprocessed_transaction_storage.rs +++ b/core/src/unprocessed_transaction_storage.rs @@ -941,7 +941,7 @@ impl ThreadLocalUnprocessedPackets { .filter_map(|immutable_deserialized_packet| { let is_tracer_packet = immutable_deserialized_packet .original_packet() - .meta + .meta() .is_tracer_packet(); if is_tracer_packet { saturating_add_assign!(*total_tracer_packets_in_buffer, 1); @@ -1060,8 +1060,8 @@ mod tests { .enumerate() .map(|(packets_id, transaction)| { let mut p = Packet::from_data(None, transaction).unwrap(); - p.meta.port = packets_id as u16; - p.meta.set_tracer(true); + p.meta_mut().port = packets_id as u16; + p.meta_mut().set_tracer(true); DeserializedPacket::new(p).unwrap() }) .collect_vec(); @@ -1099,7 +1099,7 @@ mod tests { batch .get_forwardable_packets() .into_iter() - .map(|p| p.meta.port) + .map(|p| p.meta().port) }) .collect(); forwarded_ports.sort_unstable(); @@ -1196,7 +1196,7 @@ mod tests { None, ), )?; - vote.meta.flags.set(PacketFlags::SIMPLE_VOTE_TX, true); + vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true); let big_transfer = Packet::from_data( None, system_transaction::transfer(&keypair, &pubkey, 1000000, Hash::new_unique()), @@ -1269,8 +1269,8 @@ mod tests { .enumerate() .map(|(packets_id, transaction)| { let mut p = Packet::from_data(None, transaction).unwrap(); - p.meta.port = packets_id as u16; - p.meta.set_tracer(true); + p.meta_mut().port = packets_id as u16; + p.meta_mut().set_tracer(true); DeserializedPacket::new(p).unwrap() }) .collect_vec(); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index a3eafc890b..8ca1861cb9 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -233,14 +233,14 @@ where ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us(); ws_metrics.run_insert_count += 1; let handle_packet = |packet: &Packet| { - if packet.meta.discard() { + if packet.meta().discard() { return None; } let shred = shred::layout::get_shred(packet)?; let shred = Shred::new_from_serialized_shred(shred.to_vec()).ok()?; - if packet.meta.repair() { + if packet.meta().repair() { let repair_info = RepairMeta { - _from_addr: packet.meta.socket_addr(), + _from_addr: packet.meta().socket_addr(), // If can't parse the nonce, dump the packet. nonce: repair_response::nonce(packet)?, }; @@ -261,7 +261,7 @@ where ws_metrics.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count(); ws_metrics.num_shreds_received += shreds.len(); for packet in packets.iter().flat_map(PacketBatch::iter) { - let addr = packet.meta.socket_addr(); + let addr = packet.meta().socket_addr(); *ws_metrics.addrs.entry(addr).or_default() += 1; } diff --git a/entry/src/entry.rs b/entry/src/entry.rs index c9ed6d754b..9f8bb2e816 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -511,7 +511,7 @@ pub fn start_verify_transactions( .map(|tx| tx.to_versioned_transaction()); let res = packet_batch.par_iter_mut().zip(entry_tx_iter).all(|pair| { - pair.0.meta = Meta::default(); + *pair.0.meta_mut() = Meta::default(); Packet::populate_packet(pair.0, None, &pair.1).is_ok() }); if res { @@ -538,7 +538,7 @@ pub fn start_verify_transactions( ); let verified = packet_batches .iter() - .all(|batch| batch.iter().all(|p| !p.meta.discard())); + .all(|batch| batch.iter().all(|p| !p.meta().discard())); verify_time.stop(); (verified, verify_time.as_us()) }) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index c4610b7d95..d0aaf2e80e 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1947,7 +1947,7 @@ impl ClusterInfo { } check }; - // Because pull-responses are sent back to packet.meta.socket_addr() of + // Because pull-responses are sent back to packet.meta().socket_addr() of // incoming pull-requests, pings are also sent to request.from_addr (as // opposed to caller.gossip address). move |request| { @@ -2041,8 +2041,8 @@ impl ClusterInfo { match Packet::from_data(Some(addr), response) { Err(err) => error!("failed to write pull-response packet: {:?}", err), Ok(packet) => { - if self.outbound_budget.take(packet.meta.size) { - total_bytes += packet.meta.size; + if self.outbound_budget.take(packet.meta().size) { + total_bytes += packet.meta().size; packet_batch.push(packet); sent += 1; } else { @@ -2520,7 +2520,7 @@ impl ClusterInfo { let protocol: Protocol = packet.deserialize_slice(..).ok()?; protocol.sanitize().ok()?; let protocol = protocol.par_verify(&self.stats)?; - Some((packet.meta.socket_addr(), protocol)) + Some((packet.meta().socket_addr(), protocol)) }; let packets: Vec<_> = { let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time); @@ -3412,7 +3412,7 @@ RPC Enabled Nodes: 1"#; remote_nodes.into_iter(), pongs.into_iter() ) { - assert_eq!(packet.meta.socket_addr(), socket); + assert_eq!(packet.meta().socket_addr(), socket); let bytes = serialize(&pong).unwrap(); match packet.deserialize_slice(..).unwrap() { Protocol::PongMessage(pong) => assert_eq!(serialize(&pong).unwrap(), bytes), diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index 066bdb24f7..2a8f12c58a 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -255,7 +255,7 @@ pub fn cluster_info_retransmit() { } assert!(done); let mut p = Packet::default(); - p.meta.size = 10; + p.meta_mut().size = 10; let peers = c1.tvu_peers(); let retransmit_peers: Vec<_> = peers.iter().collect(); retransmit_to( diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 0695236214..f36ac9b64e 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -330,7 +330,7 @@ impl Shred { let payload = self.payload(); let size = payload.len(); packet.buffer_mut()[..size].copy_from_slice(&payload[..]); - packet.meta.size = size; + packet.meta_mut().size = size; } // TODO: Should this sanitize output? @@ -542,7 +542,7 @@ pub mod layout { fn get_shred_size(packet: &Packet) -> Option { let size = packet.data(..)?.len(); - if packet.meta.repair() { + if packet.meta().repair() { size.checked_sub(SIZE_OF_NONCE) } else { Some(size) @@ -1066,7 +1066,7 @@ mod tests { )); assert_eq!(stats, ShredFetchStats::default()); - packet.meta.size = OFFSET_OF_SHRED_VARIANT; + packet.meta_mut().size = OFFSET_OF_SHRED_VARIANT; assert!(should_discard_shred( &packet, root, @@ -1076,7 +1076,7 @@ mod tests { )); assert_eq!(stats.index_overrun, 1); - packet.meta.size = OFFSET_OF_SHRED_INDEX; + packet.meta_mut().size = OFFSET_OF_SHRED_INDEX; assert!(should_discard_shred( &packet, root, @@ -1086,7 +1086,7 @@ mod tests { )); assert_eq!(stats.index_overrun, 2); - packet.meta.size = OFFSET_OF_SHRED_INDEX + 1; + packet.meta_mut().size = OFFSET_OF_SHRED_INDEX + 1; assert!(should_discard_shred( &packet, root, @@ -1096,7 +1096,7 @@ mod tests { )); assert_eq!(stats.index_overrun, 3); - packet.meta.size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX - 1; + packet.meta_mut().size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX - 1; assert!(should_discard_shred( &packet, root, @@ -1106,7 +1106,7 @@ mod tests { )); assert_eq!(stats.index_overrun, 4); - packet.meta.size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX + 2; + packet.meta_mut().size = OFFSET_OF_SHRED_INDEX + SIZE_OF_SHRED_INDEX + 2; assert!(should_discard_shred( &packet, root, @@ -1419,7 +1419,7 @@ mod tests { }); let mut packet = Packet::default(); packet.buffer_mut()[..payload.len()].copy_from_slice(&payload); - packet.meta.size = payload.len(); + packet.meta_mut().size = payload.len(); assert_eq!(shred.bytes_to_store(), payload); assert_eq!(shred, Shred::new_from_serialized_shred(payload).unwrap()); verify_shred_layout(&shred, &packet); @@ -1452,7 +1452,7 @@ mod tests { let payload = bs58_decode(PAYLOAD); let mut packet = Packet::default(); packet.buffer_mut()[..payload.len()].copy_from_slice(&payload); - packet.meta.size = payload.len(); + packet.meta_mut().size = payload.len(); assert_eq!(shred.bytes_to_store(), payload); assert_eq!(shred, Shred::new_from_serialized_shred(payload).unwrap()); verify_shred_layout(&shred, &packet); @@ -1492,7 +1492,7 @@ mod tests { }); let mut packet = Packet::default(); packet.buffer_mut()[..payload.len()].copy_from_slice(&payload); - packet.meta.size = payload.len(); + packet.meta_mut().size = payload.len(); assert_eq!(shred.bytes_to_store(), payload); assert_eq!(shred, Shred::new_from_serialized_shred(payload).unwrap()); verify_shred_layout(&shred, &packet); diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 653d331a76..39185a5ff9 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -35,7 +35,7 @@ pub fn verify_shred_cpu( packet: &Packet, slot_leaders: &HashMap, ) -> bool { - if packet.meta.discard() { + if packet.meta().discard() { return false; } let shred = match shred::layout::get_shred(packet) { @@ -101,7 +101,7 @@ where .into_par_iter() .flat_map_iter(|batch| { batch.iter().map(|packet| { - if packet.meta.discard() { + if packet.meta().discard() { return Slot::MAX; } let shred = shred::layout::get_shred(packet); @@ -278,7 +278,7 @@ fn sign_shred_cpu(keypair: &Keypair, packet: &mut Packet) { .and_then(shred::layout::get_signed_message_range) .unwrap(); assert!( - packet.meta.size >= sig.end, + packet.meta().size >= sig.end, "packet is not large enough for a signature" ); let signature = keypair.sign_message(packet.data(msg).unwrap()); @@ -445,7 +445,7 @@ mod tests { shred.sign(&keypair); trace!("signature {}", shred.signature()); packet.buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload()); - packet.meta.size = shred.payload().len(); + packet.meta_mut().size = shred.payload().len(); let leader_slots = [(slot, keypair.pubkey().to_bytes())] .iter() @@ -486,7 +486,7 @@ mod tests { shred.sign(&keypair); batches[0].resize(1, Packet::default()); batches[0][0].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0][0].meta.size = shred.payload().len(); + batches[0][0].meta_mut().size = shred.payload().len(); let leader_slots = [(slot, keypair.pubkey().to_bytes())] .iter() @@ -511,7 +511,7 @@ mod tests { .iter() .cloned() .collect(); - batches[0][0].meta.size = 0; + batches[0][0].meta_mut().size = 0; let rv = verify_shreds_cpu(&batches, &leader_slots); assert_eq!(rv, vec![vec![0]]); } @@ -540,7 +540,7 @@ mod tests { shred.sign(&keypair); batches[0].resize(1, Packet::default()); batches[0][0].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0][0].meta.size = shred.payload().len(); + batches[0][0].meta_mut().size = shred.payload().len(); let leader_slots = [ (std::u64::MAX, Pubkey::default().to_bytes()), @@ -567,7 +567,7 @@ mod tests { let rv = verify_shreds_gpu(&batches, &leader_slots, &recycler_cache); assert_eq!(rv, vec![vec![0]]); - batches[0][0].meta.size = 0; + batches[0][0].meta_mut().size = 0; let leader_slots = [ (std::u64::MAX, Pubkey::default().to_bytes()), (slot, keypair.pubkey().to_bytes()), @@ -651,7 +651,7 @@ mod tests { ); batches[0].resize(1, Packet::default()); batches[0][0].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload()); - batches[0][0].meta.size = shred.payload().len(); + batches[0][0].meta_mut().size = shred.payload().len(); let pubkeys = [ (slot, keypair.pubkey().to_bytes()), diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs index c4f3169979..6e00d7da3a 100644 --- a/perf/benches/dedup.rs +++ b/perf/benches/dedup.rs @@ -30,7 +30,7 @@ fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec) deduper.reset(); batches .iter_mut() - .for_each(|b| b.iter_mut().for_each(|p| p.meta.set_discard(false))); + .for_each(|b| b.iter_mut().for_each(|p| p.meta_mut().set_discard(false))); }); } diff --git a/perf/benches/shrink.rs b/perf/benches/shrink.rs index e813a53035..08f1915789 100644 --- a/perf/benches/shrink.rs +++ b/perf/benches/shrink.rs @@ -27,7 +27,7 @@ fn do_bench_shrink_packets(bencher: &mut Bencher, mut batches: Vec) sigverify::shrink_batches(&mut batches); batches.iter_mut().for_each(|b| { b.iter_mut() - .for_each(|p| p.meta.set_discard(thread_rng().gen())) + .for_each(|p| p.meta_mut().set_discard(thread_rng().gen())) }); }); } @@ -75,7 +75,7 @@ fn bench_shrink_count_packets(bencher: &mut Bencher) { ); batches.iter_mut().for_each(|b| { b.iter_mut() - .for_each(|p| p.meta.set_discard(thread_rng().gen())) + .for_each(|p| p.meta_mut().set_discard(thread_rng().gen())) }); bencher.iter(|| { diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 65991a950b..e2cfddb159 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -155,7 +155,7 @@ fn bench_sigverify_uneven(bencher: &mut Bencher) { }; Packet::populate_packet(packet, None, &tx).expect("serialize request"); if thread_rng().gen_ratio((num_packets - NUM) as u32, num_packets as u32) { - packet.meta.set_discard(true); + packet.meta_mut().set_discard(true); } else { num_valid += 1; } diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 37f8f1b2be..4e6b64fe15 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -122,7 +122,7 @@ impl PacketBatch { pub fn set_addr(&mut self, addr: &SocketAddr) { for p in self.iter_mut() { - p.meta.set_socket_addr(addr); + p.meta_mut().set_socket_addr(addr); } } diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 844c5f4d3d..cc9d9a0f0b 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -121,7 +121,7 @@ pub fn init() { #[must_use] fn verify_packet(packet: &mut Packet, reject_non_vote: bool) -> bool { // If this packet was already marked as discard, drop it - if packet.meta.discard() { + if packet.meta().discard() { return false; } @@ -134,7 +134,7 @@ fn verify_packet(packet: &mut Packet, reject_non_vote: bool) -> bool { return false; } - if packet.meta.size <= msg_start { + if packet.meta().size <= msg_start { return false; } @@ -179,7 +179,7 @@ pub fn count_valid_packets( batch .iter() .filter(|p| { - let should_keep = !p.meta.discard(); + let should_keep = !p.meta().discard(); if should_keep { process_valid_packet(p); } @@ -193,7 +193,7 @@ pub fn count_valid_packets( pub fn count_discarded_packets(batches: &[PacketBatch]) -> usize { batches .iter() - .map(|batch| batch.iter().filter(|p| p.meta.discard()).count()) + .map(|batch| batch.iter().filter(|p| p.meta().discard()).count()) .sum() } @@ -205,7 +205,7 @@ fn do_get_packet_offsets( // should have at least 1 signature and sig lengths let _ = 1usize .checked_add(size_of::()) - .filter(|v| *v <= packet.meta.size) + .filter(|v| *v <= packet.meta().size) .ok_or(PacketError::InvalidLen)?; // read the length of Transaction.signatures (serialized with short_vec) @@ -223,7 +223,7 @@ fn do_get_packet_offsets( // Determine the start of the message header by checking the message prefix bit. let msg_header_offset = { // Packet should have data for prefix bit - if msg_start_offset >= packet.meta.size { + if msg_start_offset >= packet.meta().size { return Err(PacketError::InvalidSignatureLen); } @@ -258,7 +258,7 @@ fn do_get_packet_offsets( // Packet should have data at least for MessageHeader and 1 byte for Message.account_keys.len let _ = msg_header_offset_plus_one .checked_add(MESSAGE_HEADER_LENGTH) - .filter(|v| *v <= packet.meta.size) + .filter(|v| *v <= packet.meta().size) .ok_or(PacketError::InvalidSignatureLen)?; // read MessageHeader.num_required_signatures (serialized with u8) @@ -298,7 +298,7 @@ fn do_get_packet_offsets( let _ = pubkey_len .checked_mul(size_of::()) .and_then(|v| v.checked_add(pubkey_start)) - .filter(|v| *v <= packet.meta.size) + .filter(|v| *v <= packet.meta().size) .ok_or(PacketError::InvalidPubkeyLen)?; if pubkey_len < sig_len_untrusted { @@ -333,7 +333,7 @@ pub fn check_for_tracer_packet(packet: &mut Packet) -> bool { // Check for tracer pubkey match packet.data(first_pubkey_start..first_pubkey_end) { Some(pubkey) if pubkey == TRACER_KEY.as_ref() => { - packet.meta.set_tracer(true); + packet.meta_mut().set_tracer(true); true } _ => false, @@ -348,7 +348,7 @@ fn get_packet_offsets( let unsanitized_packet_offsets = do_get_packet_offsets(packet, current_offset); if let Ok(offsets) = unsanitized_packet_offsets { check_for_simple_vote_transaction(packet, &offsets, current_offset).ok(); - if !reject_non_vote || packet.meta.is_simple_vote_tx() { + if !reject_non_vote || packet.meta().is_simple_vote_tx() { return offsets; } } @@ -380,7 +380,7 @@ fn check_for_simple_vote_transaction( // Packet should have at least 1 more byte for instructions.len let _ = instructions_len_offset .checked_add(1usize) - .filter(|v| *v <= packet.meta.size) + .filter(|v| *v <= packet.meta().size) .ok_or(PacketError::InvalidLen)?; let (instruction_len, instruction_len_size) = packet @@ -399,7 +399,7 @@ fn check_for_simple_vote_transaction( // Packet should have at least 1 more byte for one instructions_program_id let _ = instruction_start .checked_add(1usize) - .filter(|v| *v <= packet.meta.size) + .filter(|v| *v <= packet.meta().size) .ok_or(PacketError::InvalidLen)?; let instruction_program_id_index: usize = usize::from( @@ -425,7 +425,7 @@ fn check_for_simple_vote_transaction( .ok_or(PacketError::InvalidLen)? == solana_sdk::vote::program::id().as_ref() { - packet.meta.flags |= PacketFlags::SIMPLE_VOTE_TX; + packet.meta_mut().flags |= PacketFlags::SIMPLE_VOTE_TX; } Ok(()) } @@ -458,7 +458,7 @@ pub fn generate_offsets( let mut pubkey_offset = packet_offsets.pubkey_start; let mut sig_offset = packet_offsets.sig_start; - let msg_size = current_offset.saturating_add(packet.meta.size) as u32; + let msg_size = current_offset.saturating_add(packet.meta().size) as u32; for _ in 0..packet_offsets.sig_len { signature_offsets.push(sig_offset); sig_offset = sig_offset.saturating_add(size_of::() as u32); @@ -536,7 +536,7 @@ impl Deduper { // Deduplicates packets and returns 1 if packet is to be discarded. Else, 0. fn dedup_packet(&self, packet: &mut Packet) -> u64 { // If this packet was already marked as discard, drop it - if packet.meta.discard() { + if packet.meta().discard() { return 1; } let (hash, pos) = self.compute_hash(packet); @@ -548,7 +548,7 @@ impl Deduper { self.filter[pos].store(hash, Ordering::Relaxed); } if hash == prev & hash { - packet.meta.set_discard(true); + packet.meta_mut().set_discard(true); return 1; } 0 @@ -562,7 +562,7 @@ impl Deduper { let mut num_removed: u64 = 0; batches.iter_mut().for_each(|batch| { batch.iter_mut().for_each(|p| { - let removed_before_sigverify = p.meta.discard(); + let removed_before_sigverify = p.meta().discard(); let is_duplicate = self.dedup_packet(p); if is_duplicate == 1 { saturating_add_assign!(num_removed, 1); @@ -581,17 +581,17 @@ pub fn shrink_batches(batches: &mut Vec) { let mut last_valid_batch = 0; for batch_ix in 0..batches.len() { for packet_ix in 0..batches[batch_ix].len() { - if batches[batch_ix][packet_ix].meta.discard() { + if batches[batch_ix][packet_ix].meta().discard() { continue; } last_valid_batch = batch_ix.saturating_add(1); let mut found_spot = false; while valid_batch_ix < batch_ix && !found_spot { while valid_packet_ix < batches[valid_batch_ix].len() { - if batches[valid_batch_ix][valid_packet_ix].meta.discard() { + if batches[valid_batch_ix][valid_packet_ix].meta().discard() { batches[valid_batch_ix][valid_packet_ix] = batches[batch_ix][packet_ix].clone(); - batches[batch_ix][packet_ix].meta.set_discard(true); + batches[batch_ix][packet_ix].meta_mut().set_discard(true); last_valid_batch = valid_batch_ix.saturating_add(1); found_spot = true; break; @@ -617,8 +617,8 @@ pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, pa // When using single thread, skip rayon overhead. batches.iter_mut().for_each(|batch| { batch.iter_mut().for_each(|packet| { - if !packet.meta.discard() && !verify_packet(packet, reject_non_vote) { - packet.meta.set_discard(true); + if !packet.meta().discard() && !verify_packet(packet, reject_non_vote) { + packet.meta_mut().set_discard(true); } }) }); @@ -633,8 +633,8 @@ pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, pa .into_par_iter() .with_min_len(packets_per_thread) .for_each(|packet: &mut Packet| { - if !packet.meta.discard() && !verify_packet(packet, reject_non_vote) { - packet.meta.set_discard(true); + if !packet.meta().discard() && !verify_packet(packet, reject_non_vote) { + packet.meta_mut().set_discard(true); } }) }); @@ -643,8 +643,8 @@ pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, pa PAR_THREAD_POOL.install(|| { batches.into_par_iter().for_each(|batch: &mut PacketBatch| { batch.par_iter_mut().for_each(|packet: &mut Packet| { - if !packet.meta.discard() && !verify_packet(packet, reject_non_vote) { - packet.meta.set_discard(true); + if !packet.meta().discard() && !verify_packet(packet, reject_non_vote) { + packet.meta_mut().set_discard(true); } }) }); @@ -656,9 +656,11 @@ pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, pa pub fn ed25519_verify_disabled(batches: &mut [PacketBatch]) { let packet_count = count_packets_in_batches(batches); debug!("disabled ECDSA for {}", packet_count); - batches - .into_par_iter() - .for_each(|batch| batch.par_iter_mut().for_each(|p| p.meta.set_discard(false))); + batches.into_par_iter().for_each(|batch| { + batch + .par_iter_mut() + .for_each(|p| p.meta_mut().set_discard(false)) + }); inc_new_counter_debug!("ed25519_verify_disabled", packet_count); } @@ -716,8 +718,8 @@ pub fn get_checked_scalar(scalar: &[u8; 32]) -> Result<[u8; 32], PacketError> { pub fn mark_disabled(batches: &mut [PacketBatch], r: &[Vec]) { for (batch, v) in batches.iter_mut().zip(r) { for (pkt, f) in batch.iter_mut().zip(v) { - if !pkt.meta.discard() { - pkt.meta.set_discard(*f == 0); + if !pkt.meta().discard() { + pkt.meta_mut().set_discard(*f == 0); } } } @@ -840,10 +842,10 @@ mod tests { batch.resize(batch_size, Packet::default()); let mut batches: Vec = vec![batch]; mark_disabled(&mut batches, &[vec![0]]); - assert!(batches[0][0].meta.discard()); - batches[0][0].meta.set_discard(false); + assert!(batches[0][0].meta().discard()); + batches[0][0].meta_mut().set_discard(false); mark_disabled(&mut batches, &[vec![1]]); - assert!(!batches[0][0].meta.discard()); + assert!(!batches[0][0].meta().discard()); } #[test] @@ -921,7 +923,7 @@ mod tests { packet.buffer_mut()[0] = 0xff; packet.buffer_mut()[1] = 0xff; - packet.meta.size = 2; + packet.meta_mut().size = 2; let res = sigverify::do_get_packet_offsets(&packet, 0); assert_eq!(res, Err(PacketError::InvalidLen)); @@ -943,10 +945,10 @@ mod tests { assert!(!verify_packet(&mut packet, false)); - packet.meta.set_discard(false); + packet.meta_mut().set_discard(false); let mut batches = generate_packet_batches(&packet, 1, 1); ed25519_verify(&mut batches); - assert!(batches[0][0].meta.discard()); + assert!(batches[0][0].meta().discard()); } #[test] @@ -977,10 +979,10 @@ mod tests { assert!(!verify_packet(&mut packet, false)); - packet.meta.set_discard(false); + packet.meta_mut().set_discard(false); let mut batches = generate_packet_batches(&packet, 1, 1); ed25519_verify(&mut batches); - assert!(batches[0][0].meta.discard()); + assert!(batches[0][0].meta().discard()); } #[test] @@ -1069,8 +1071,8 @@ mod tests { let msg_start = legacy_offsets.msg_start as usize; let msg_bytes = packet.data(msg_start..).unwrap().to_vec(); packet.buffer_mut()[msg_start] = MESSAGE_VERSION_PREFIX; - packet.meta.size += 1; - let msg_end = packet.meta.size; + packet.meta_mut().size += 1; + let msg_end = packet.meta().size; packet.buffer_mut()[msg_start + 1..msg_end].copy_from_slice(&msg_bytes); let offsets = sigverify::do_get_packet_offsets(&packet, 0).unwrap(); @@ -1202,7 +1204,7 @@ mod tests { assert!(batches .iter() .flat_map(|batch| batch.iter()) - .all(|p| p.meta.discard() == should_discard)); + .all(|p| p.meta().discard() == should_discard)); } fn ed25519_verify(batches: &mut [PacketBatch]) { @@ -1226,7 +1228,7 @@ mod tests { assert!(batches .iter() .flat_map(|batch| batch.iter()) - .all(|p| p.meta.discard())); + .all(|p| p.meta().discard())); } #[test] @@ -1292,9 +1294,9 @@ mod tests { .zip(ref_vec.into_iter().flatten()) .all(|(p, discard)| { if discard == 0 { - p.meta.discard() + p.meta().discard() } else { - !p.meta.discard() + !p.meta().discard() } })); } @@ -1316,7 +1318,7 @@ mod tests { for _ in 0..num_modifications { let batch = thread_rng().gen_range(0, batches.len()); let packet = thread_rng().gen_range(0, batches[batch].len()); - let offset = thread_rng().gen_range(0, batches[batch][packet].meta.size); + let offset = thread_rng().gen_range(0, batches[batch][packet].meta().size); let add = thread_rng().gen_range(0, 255); batches[batch][packet].buffer_mut()[offset] = batches[batch][packet] .data(offset) @@ -1326,7 +1328,7 @@ mod tests { let batch_to_disable = thread_rng().gen_range(0, batches.len()); for p in batches[batch_to_disable].iter_mut() { - p.meta.set_discard(true); + p.meta_mut().set_discard(true); } // verify from GPU verification pipeline (when GPU verification is enabled) are @@ -1439,7 +1441,7 @@ mod tests { let mut packet = Packet::from_data(None, tx).unwrap(); let packet_offsets = do_get_packet_offsets(&packet, 0).unwrap(); check_for_simple_vote_transaction(&mut packet, &packet_offsets, 0).ok(); - assert!(!packet.meta.is_simple_vote_tx()); + assert!(!packet.meta().is_simple_vote_tx()); } // single vote tx is @@ -1449,7 +1451,7 @@ mod tests { let mut packet = Packet::from_data(None, tx).unwrap(); let packet_offsets = do_get_packet_offsets(&packet, 0).unwrap(); check_for_simple_vote_transaction(&mut packet, &packet_offsets, 0).ok(); - assert!(packet.meta.is_simple_vote_tx()); + assert!(packet.meta().is_simple_vote_tx()); } // multiple mixed tx is not @@ -1470,7 +1472,7 @@ mod tests { let mut packet = Packet::from_data(None, tx).unwrap(); let packet_offsets = do_get_packet_offsets(&packet, 0).unwrap(); check_for_simple_vote_transaction(&mut packet, &packet_offsets, 0).ok(); - assert!(!packet.meta.is_simple_vote_tx()); + assert!(!packet.meta().is_simple_vote_tx()); } } @@ -1488,9 +1490,9 @@ mod tests { let packet_offsets = do_get_packet_offsets(packet, current_offset).unwrap(); check_for_simple_vote_transaction(packet, &packet_offsets, current_offset).ok(); if index == 1 { - assert!(packet.meta.is_simple_vote_tx()); + assert!(packet.meta().is_simple_vote_tx()); } else { - assert!(!packet.meta.is_simple_vote_tx()); + assert!(!packet.meta().is_simple_vote_tx()); } current_offset = current_offset.saturating_add(size_of::()); @@ -1572,13 +1574,13 @@ mod tests { ); batches.iter_mut().for_each(|b| { b.iter_mut() - .for_each(|p| p.meta.set_discard(thread_rng().gen())) + .for_each(|p| p.meta_mut().set_discard(thread_rng().gen())) }); //find all the non discarded packets let mut start = vec![]; batches.iter_mut().for_each(|b| { b.iter_mut() - .filter(|p| !p.meta.discard()) + .filter(|p| !p.meta().discard()) .for_each(|p| start.push(p.clone())) }); start.sort_by(|a, b| a.data(..).cmp(&b.data(..))); @@ -1590,7 +1592,7 @@ mod tests { let mut end = vec![]; batches.iter_mut().for_each(|b| { b.iter_mut() - .filter(|p| !p.meta.discard()) + .filter(|p| !p.meta().discard()) .for_each(|p| end.push(p.clone())) }); end.sort_by(|a, b| a.data(..).cmp(&b.data(..))); @@ -1762,13 +1764,13 @@ mod tests { batches.iter_mut().enumerate().for_each(|(i, b)| { b.iter_mut() .enumerate() - .for_each(|(j, p)| p.meta.set_discard(set_discard(i, j))) + .for_each(|(j, p)| p.meta_mut().set_discard(set_discard(i, j))) }); assert_eq!(count_valid_packets(&batches, |_| ()), *expect_valid_packets); debug!("show valid packets for case {}", i); batches.iter_mut().enumerate().for_each(|(i, b)| { b.iter_mut().enumerate().for_each(|(j, p)| { - if !p.meta.discard() { + if !p.meta().discard() { trace!("{} {}", i, j) } }) diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index cb210a88a6..c4dd0d7a42 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -36,7 +36,7 @@ mod tests { } for batch in all_packets { for p in &batch { - assert_eq!(p.meta.size, num_bytes); + assert_eq!(p.meta().size, num_bytes); } } assert_eq!(total_packets, num_expected_packets); diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index 08389860f9..7fa9d1b3a1 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -44,7 +44,7 @@ pub struct Packet { // Bytes past Packet.meta.size are not valid to read from. // Use Packet.data(index) to read from the buffer. buffer: [u8; PACKET_DATA_SIZE], - pub meta: Meta, + meta: Meta, } impl Packet { @@ -81,6 +81,16 @@ impl Packet { &mut self.buffer[..] } + #[inline] + pub fn meta(&self) -> &Meta { + &self.meta + } + + #[inline] + pub fn meta_mut(&mut self) -> &mut Meta { + &mut self.meta + } + pub fn from_data(dest: Option<&SocketAddr>, data: T) -> Result { let mut packet = Packet::default(); Self::populate_packet(&mut packet, dest, &data)?; @@ -140,7 +150,7 @@ impl Default for Packet { impl PartialEq for Packet { fn eq(&self, other: &Packet) -> bool { - self.meta == other.meta && self.data(..) == other.data(..) + self.meta() == other.meta() && self.data(..) == other.data(..) } } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index a4cf15f59f..7923f1bba0 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -653,8 +653,8 @@ fn handle_chunk( if maybe_batch.is_none() { let mut batch = PacketBatch::with_capacity(1); let mut packet = Packet::default(); - packet.meta.set_socket_addr(remote_addr); - packet.meta.sender_stake = stake; + packet.meta_mut().set_socket_addr(remote_addr); + packet.meta_mut().sender_stake = stake; batch.push(packet); *maybe_batch = Some(batch); stats @@ -670,7 +670,7 @@ fn handle_chunk( }; batch[0].buffer_mut()[chunk.offset as usize..end_of_chunk] .copy_from_slice(&chunk.bytes); - batch[0].meta.size = std::cmp::max(batch[0].meta.size, end_of_chunk); + batch[0].meta_mut().size = std::cmp::max(batch[0].meta().size, end_of_chunk); stats.total_chunks_received.fetch_add(1, Ordering::Relaxed); match peer_type { ConnectionPeerType::Staked => { @@ -689,7 +689,7 @@ fn handle_chunk( trace!("chunk is none"); // done receiving chunks if let Some(batch) = maybe_batch.take() { - let len = batch[0].meta.size; + let len = batch[0].meta().size; if let Err(e) = packet_sender.send(batch) { stats .total_packet_batch_send_err @@ -1116,7 +1116,7 @@ pub mod test { } for batch in all_packets { for p in batch.iter() { - assert_eq!(p.meta.size, 1); + assert_eq!(p.meta().size, 1); } } assert_eq!(total_packets, num_expected_packets); @@ -1152,7 +1152,7 @@ pub mod test { } for batch in all_packets { for p in batch.iter() { - assert_eq!(p.meta.size, num_bytes); + assert_eq!(p.meta().size, num_bytes); } } assert_eq!(total_packets, num_expected_packets); diff --git a/streamer/src/nonblocking/recvmmsg.rs b/streamer/src/nonblocking/recvmmsg.rs index 794b9d72e0..90eeca0ab4 100644 --- a/streamer/src/nonblocking/recvmmsg.rs +++ b/streamer/src/nonblocking/recvmmsg.rs @@ -15,12 +15,12 @@ pub async fn recv_mmsg( socket: &UdpSocket, packets: &mut [Packet], ) -> io::Result { - debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default())); + debug_assert!(packets.iter().all(|pkt| pkt.meta() == &Meta::default())); let count = cmp::min(NUM_RCVMMSGS, packets.len()); socket.readable().await?; let mut i = 0; for p in packets.iter_mut().take(count) { - p.meta.size = 0; + p.meta_mut().size = 0; match socket.try_recv_from(p.buffer_mut()) { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { break; @@ -29,8 +29,8 @@ pub async fn recv_mmsg( return Err(e); } Ok((nrecv, from)) => { - p.meta.size = nrecv; - p.meta.set_socket_addr(&from); + p.meta_mut().size = nrecv; + p.meta_mut().set_socket_addr(&from); } } i += 1; @@ -84,8 +84,8 @@ mod tests { let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap(); assert_eq!(sent, recv); for packet in packets.iter().take(recv) { - assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.socket_addr(), saddr); + assert_eq!(packet.meta().size, PACKET_DATA_SIZE); + assert_eq!(packet.meta().socket_addr(), saddr); } } @@ -110,19 +110,19 @@ mod tests { let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap(); assert_eq!(TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { - assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.socket_addr(), saddr); + assert_eq!(packet.meta().size, PACKET_DATA_SIZE); + assert_eq!(packet.meta().socket_addr(), saddr); } let mut packets = vec![Packet::default(); sent - TEST_NUM_MSGS]; packets .iter_mut() - .for_each(|pkt| pkt.meta = Meta::default()); + .for_each(|pkt| *pkt.meta_mut() = Meta::default()); let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap(); assert_eq!(sent - TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { - assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.socket_addr(), saddr); + assert_eq!(packet.meta().size, PACKET_DATA_SIZE); + assert_eq!(packet.meta().socket_addr(), saddr); } } @@ -153,13 +153,13 @@ mod tests { let recv = recv_mmsg_exact(&reader, &mut packets[..]).await.unwrap(); assert_eq!(TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { - assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.socket_addr(), saddr); + assert_eq!(packet.meta().size, PACKET_DATA_SIZE); + assert_eq!(packet.meta().socket_addr(), saddr); } packets .iter_mut() - .for_each(|pkt| pkt.meta = Meta::default()); + .for_each(|pkt| *pkt.meta_mut() = Meta::default()); let _recv = recv_mmsg(&reader, &mut packets[..]).await; assert!(start.elapsed().as_secs() < 5); } @@ -192,22 +192,22 @@ mod tests { let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap(); assert_eq!(TEST_NUM_MSGS, recv); for packet in packets.iter().take(sent1) { - assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.socket_addr(), saddr1); + assert_eq!(packet.meta().size, PACKET_DATA_SIZE); + assert_eq!(packet.meta().socket_addr(), saddr1); } for packet in packets.iter().skip(sent1).take(recv - sent1) { - assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.socket_addr(), saddr2); + assert_eq!(packet.meta().size, PACKET_DATA_SIZE); + assert_eq!(packet.meta().socket_addr(), saddr2); } packets .iter_mut() - .for_each(|pkt| pkt.meta = Meta::default()); + .for_each(|pkt| *pkt.meta_mut() = Meta::default()); let recv = recv_mmsg(&reader, &mut packets[..]).await.unwrap(); assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { - assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.socket_addr(), saddr2); + assert_eq!(packet.meta().size, PACKET_DATA_SIZE); + assert_eq!(packet.meta().socket_addr(), saddr2); } } } diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index 9824b052ce..f6ce390af0 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -65,7 +65,7 @@ pub fn send_to( socket_addr_space: &SocketAddrSpace, ) -> Result<()> { for p in batch.iter() { - let addr = p.meta.socket_addr(); + let addr = p.meta().socket_addr(); if socket_addr_space.check(&addr) { if let Some(data) = p.data(..) { socket.send_to(data, addr)?; @@ -93,7 +93,7 @@ mod tests { let packets = vec![Packet::default()]; let mut packet_batch = PacketBatch::new(packets); packet_batch.set_addr(&send_addr); - assert_eq!(packet_batch[0].meta.socket_addr(), send_addr); + assert_eq!(packet_batch[0].meta().socket_addr(), send_addr); } #[test] @@ -109,19 +109,21 @@ mod tests { batch.resize(packet_batch_size, Packet::default()); for m in batch.iter_mut() { - m.meta.set_socket_addr(&addr); - m.meta.size = PACKET_DATA_SIZE; + m.meta_mut().set_socket_addr(&addr); + m.meta_mut().size = PACKET_DATA_SIZE; } send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); - batch.iter_mut().for_each(|pkt| pkt.meta = Meta::default()); + batch + .iter_mut() + .for_each(|pkt| *pkt.meta_mut() = Meta::default()); let recvd = recv_from(&mut batch, &recv_socket, 1).unwrap(); assert_eq!(recvd, batch.len()); for m in batch.iter() { - assert_eq!(m.meta.size, PACKET_DATA_SIZE); - assert_eq!(m.meta.socket_addr(), saddr); + assert_eq!(m.meta().size, PACKET_DATA_SIZE); + assert_eq!(m.meta().socket_addr(), saddr); } } @@ -136,10 +138,10 @@ mod tests { let mut p1 = Packet::default(); let mut p2 = Packet::default(); - p1.meta.size = 1; + p1.meta_mut().size = 1; p1.buffer_mut()[0] = 0; - p2.meta.size = 1; + p2.meta_mut().size = 1; p2.buffer_mut()[0] = 0; assert!(p1 == p2); @@ -164,8 +166,8 @@ mod tests { let mut batch = PacketBatch::with_capacity(batch_size); batch.resize(batch_size, Packet::default()); for p in batch.iter_mut() { - p.meta.set_socket_addr(&addr); - p.meta.size = 1; + p.meta_mut().set_socket_addr(&addr); + p.meta_mut().size = 1; } send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); } diff --git a/streamer/src/recvmmsg.rs b/streamer/src/recvmmsg.rs index 722b4a22cd..507c07651a 100644 --- a/streamer/src/recvmmsg.rs +++ b/streamer/src/recvmmsg.rs @@ -17,11 +17,11 @@ use { #[cfg(not(target_os = "linux"))] pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result { - debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default())); + debug_assert!(packets.iter().all(|pkt| pkt.meta() == &Meta::default())); let mut i = 0; let count = cmp::min(NUM_RCVMMSGS, packets.len()); for p in packets.iter_mut().take(count) { - p.meta.size = 0; + p.meta_mut().size = 0; match socket.recv_from(p.buffer_mut()) { Err(_) if i > 0 => { break; @@ -30,8 +30,8 @@ pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result { - p.meta.size = nrecv; - p.meta.set_socket_addr(&from); + p.meta_mut().size = nrecv; + p.meta_mut().set_socket_addr(&from); if i == 0 { socket.set_nonblocking(true)?; } @@ -71,7 +71,7 @@ fn cast_socket_addr(addr: &sockaddr_storage, hdr: &mmsghdr) -> Option #[allow(clippy::uninit_assumed_init)] pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result { // Assert that there are no leftovers in packets. - debug_assert!(packets.iter().all(|pkt| pkt.meta == Meta::default())); + debug_assert!(packets.iter().all(|pkt| pkt.meta() == &Meta::default())); const SOCKADDR_STORAGE_SIZE: usize = mem::size_of::(); let mut hdrs: [mmsghdr; NUM_RCVMMSGS] = unsafe { mem::zeroed() }; @@ -107,9 +107,9 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result::len).unwrap_or_default() as u64; } @@ -305,7 +305,7 @@ fn recv_send( packet_batch.iter().for_each(|p| stats.record(p)); } let packets = packet_batch.iter().filter_map(|pkt| { - let addr = pkt.meta.socket_addr(); + let addr = pkt.meta().socket_addr(); let data = pkt.data(..)?; socket_addr_space.check(&addr).then_some((data, addr)) }); @@ -488,8 +488,8 @@ mod test { let mut p = Packet::default(); { p.buffer_mut()[0] = i as u8; - p.meta.size = PACKET_DATA_SIZE; - p.meta.set_socket_addr(&addr); + p.meta_mut().size = PACKET_DATA_SIZE; + p.meta_mut().set_socket_addr(&addr); } packet_batch.push(p); } diff --git a/streamer/tests/recvmmsg.rs b/streamer/tests/recvmmsg.rs index 7c8f602ad2..fb3df661d5 100644 --- a/streamer/tests/recvmmsg.rs +++ b/streamer/tests/recvmmsg.rs @@ -50,7 +50,7 @@ pub fn test_recv_mmsg_batch_size() { } packets .iter_mut() - .for_each(|pkt| pkt.meta = Meta::default()); + .for_each(|pkt| *pkt.meta_mut() = Meta::default()); } elapsed_in_small_batch += now.elapsed().as_nanos(); assert_eq!(TEST_BATCH_SIZE, recv);