From c248fb3f51f0431801eb1cba8a8a1c11cd4bada7 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 23 May 2022 15:48:59 +0000 Subject: [PATCH] renames Packet Meta::{,set_}addr methods to {,set_}socket_addr (#25478) In order to distinguish between Meta.addr field which is an IpAddr and the methods which refer to a SocketAddr. --- bench-streamer/src/main.rs | 4 ++-- core/src/ancestor_hashes_service.rs | 6 +++--- core/src/find_packet_sender_stake_stage.rs | 6 ++++-- core/src/repair_response.rs | 2 +- core/src/serve_repair.rs | 2 +- core/src/window_service.rs | 5 +++-- gossip/src/cluster_info.rs | 6 +++--- perf/src/packet.rs | 2 +- sdk/src/packet.rs | 18 +++++++++--------- streamer/src/nonblocking/recvmmsg.rs | 16 ++++++++-------- streamer/src/packet.rs | 10 +++++----- streamer/src/quic.rs | 2 +- streamer/src/recvmmsg.rs | 18 +++++++++--------- streamer/src/streamer.rs | 4 ++-- 14 files changed, 52 insertions(+), 49 deletions(-) diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index a4541a88ce..63930debe9 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -25,7 +25,7 @@ fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { packet_batch.packets.resize(10, Packet::default()); for w in packet_batch.packets.iter_mut() { w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(addr); + w.meta.set_socket_addr(addr); } let packet_batch = Arc::new(packet_batch); spawn(move || loop { @@ -34,7 +34,7 @@ fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { } let mut num = 0; for p in &packet_batch.packets { - let a = p.meta.addr(); + let a = p.meta.socket_addr(); assert!(p.meta.size <= PACKET_DATA_SIZE); send.send_to(&p.data[..p.meta.size], &a).unwrap(); num += 1; diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index 24870aa385..85df9a8b95 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -321,7 +321,7 @@ impl AncestorHashesService { outstanding_requests: &RwLock, blockstore: &Blockstore, ) -> Option<(Slot, DuplicateAncestorDecision)> { - let from_addr = packet.meta.addr(); + let from_addr = packet.meta.socket_addr(); limited_deserialize(&packet.data[..packet.meta.size.saturating_sub(SIZE_OF_NONCE)]) .ok() .and_then(|ancestor_hashes_response| { @@ -1117,7 +1117,7 @@ mod test { .recv_timeout(Duration::from_millis(10_000)) .unwrap(); let packet = &mut response_packet.packets[0]; - packet.meta.set_addr(&responder_info.serve_repair); + packet.meta.set_socket_addr(&responder_info.serve_repair); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, &ancestor_hashes_request_statuses, @@ -1478,7 +1478,7 @@ mod test { .recv_timeout(Duration::from_millis(10_000)) .unwrap(); let packet = &mut response_packet.packets[0]; - packet.meta.set_addr(&responder_info.serve_repair); + packet.meta.set_socket_addr(&responder_info.serve_repair); let decision = AncestorHashesService::verify_and_process_ancestor_response( packet, &ancestor_hashes_request_statuses, diff --git a/core/src/find_packet_sender_stake_stage.rs b/core/src/find_packet_sender_stake_stage.rs index baae04334f..108f560a90 100644 --- a/core/src/find_packet_sender_stake_stage.rs +++ b/core/src/find_packet_sender_stake_stage.rs @@ -135,8 +135,10 @@ impl FindPacketSenderStakeStage { .into_par_iter() .flat_map(|batch| batch.packets.par_iter_mut()) .for_each(|packet| { - packet.meta.sender_stake = - *ip_to_stake.get(&packet.meta.addr().ip()).unwrap_or(&0); + packet.meta.sender_stake = ip_to_stake + .get(&packet.meta.addr) + .copied() + .unwrap_or_default(); }); }); } diff --git a/core/src/repair_response.rs b/core/src/repair_response.rs index 71d0be27b7..ddc5973ade 100644 --- a/core/src/repair_response.rs +++ b/core/src/repair_response.rs @@ -33,7 +33,7 @@ pub fn repair_response_packet_from_bytes( if packet.meta.size > packet.data.len() { return None; } - packet.meta.set_addr(dest); + packet.meta.set_socket_addr(dest); packet.data[..bytes.len()].copy_from_slice(&bytes); let mut wr = io::Cursor::new(&mut packet.data[bytes.len()..]); bincode::serialize_into(&mut wr, &nonce).expect("Buffer not large enough to fit nonce"); diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index c3e9e0c88b..76c887adc7 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -432,7 +432,7 @@ impl ServeRepair { ) { // iter over the packets packet_batch.packets.iter().for_each(|packet| { - let from_addr = packet.meta.addr(); + let from_addr = packet.meta.socket_addr(); limited_deserialize(&packet.data[..packet.meta.size]) .into_iter() .for_each(|request| { diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 74f9ae44ae..49b39e5be7 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -374,7 +374,7 @@ where } if packet.meta.repair() { let repair_info = RepairMeta { - _from_addr: packet.meta.addr(), + _from_addr: packet.meta.socket_addr(), // If can't parse the nonce, dump the packet. nonce: repair_response::nonce(&packet.data)?, }; @@ -408,7 +408,8 @@ where stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::(); for packet in packets.iter().flat_map(|pkt| pkt.packets.iter()) { - *stats.addrs.entry(packet.meta.addr()).or_default() += 1; + let addr = packet.meta.socket_addr(); + *stats.addrs.entry(addr).or_default() += 1; } stats.elapsed += now.elapsed(); Ok(()) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index de0cd4d33f..84158035f4 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1901,7 +1901,7 @@ impl ClusterInfo { } check }; - // Because pull-responses are sent back to packet.meta.addr() of + // Because pull-responses are sent back to packet.meta.socket_addr() of // incoming pull-requests, pings are also sent to request.from_addr (as // opposed to caller.gossip address). move |request| { @@ -2470,7 +2470,7 @@ impl ClusterInfo { let protocol: Protocol = limited_deserialize(data).ok()?; protocol.sanitize().ok()?; let protocol = protocol.par_verify(&self.stats)?; - Some((packet.meta.addr(), protocol)) + Some((packet.meta.socket_addr(), protocol)) }; let packets: Vec<_> = { let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time); @@ -3234,7 +3234,7 @@ mod tests { remote_nodes.into_iter(), pongs.into_iter() ) { - assert_eq!(packet.meta.addr(), socket); + assert_eq!(packet.meta.socket_addr(), socket); let bytes = serialize(&pong).unwrap(); match limited_deserialize(&packet.data[..packet.meta.size]).unwrap() { Protocol::PongMessage(pong) => assert_eq!(serialize(&pong).unwrap(), bytes), diff --git a/perf/src/packet.rs b/perf/src/packet.rs index cc4a869dc1..0cd584f516 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -98,7 +98,7 @@ impl PacketBatch { pub fn set_addr(&mut self, addr: &SocketAddr) { for p in self.packets.iter_mut() { - p.meta.set_addr(addr); + p.meta.set_socket_addr(addr); } } diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index 760ab0e1cd..671d810b01 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -17,11 +17,11 @@ pub const PACKET_DATA_SIZE: usize = 1280 - 40 - 8; bitflags! { #[repr(C)] pub struct PacketFlags: u8 { - const DISCARD = 0b00000001; - const FORWARDED = 0b00000010; - const REPAIR = 0b00000100; - const SIMPLE_VOTE_TX = 0b00001000; - const TRACER_TX = 0b00010000; + const DISCARD = 0b0000_0001; + const FORWARDED = 0b0000_0010; + const REPAIR = 0b0000_0100; + const SIMPLE_VOTE_TX = 0b0000_1000; + const TRACER_TX = 0b0001_0000; } } @@ -63,7 +63,7 @@ impl Packet { let len = wr.position() as usize; packet.meta.size = len; if let Some(dest) = dest { - packet.meta.set_addr(dest); + packet.meta.set_socket_addr(dest); } Ok(()) } @@ -75,7 +75,7 @@ impl fmt::Debug for Packet { f, "Packet {{ size: {:?}, addr: {:?} }}", self.meta.size, - self.meta.addr() + self.meta.socket_addr() ) } } @@ -99,11 +99,11 @@ impl PartialEq for Packet { } impl Meta { - pub fn addr(&self) -> SocketAddr { + pub fn socket_addr(&self) -> SocketAddr { SocketAddr::new(self.addr, self.port) } - pub fn set_addr(&mut self, socket_addr: &SocketAddr) { + pub fn set_socket_addr(&mut self, socket_addr: &SocketAddr) { self.addr = socket_addr.ip(); self.port = socket_addr.port(); } diff --git a/streamer/src/nonblocking/recvmmsg.rs b/streamer/src/nonblocking/recvmmsg.rs index 0313630a3f..b34e74ce68 100644 --- a/streamer/src/nonblocking/recvmmsg.rs +++ b/streamer/src/nonblocking/recvmmsg.rs @@ -28,7 +28,7 @@ pub async fn recv_mmsg( } Ok((nrecv, from)) => { p.meta.size = nrecv; - p.meta.set_addr(&from); + p.meta.set_socket_addr(&from); } } i += 1; @@ -68,7 +68,7 @@ mod tests { assert_eq!(sent, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.addr(), saddr); + assert_eq!(packet.meta.socket_addr(), saddr); } } @@ -94,7 +94,7 @@ mod tests { assert_eq!(TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.addr(), saddr); + assert_eq!(packet.meta.socket_addr(), saddr); } packets @@ -104,7 +104,7 @@ mod tests { assert_eq!(sent - TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.addr(), saddr); + assert_eq!(packet.meta.socket_addr(), saddr); } } @@ -136,7 +136,7 @@ mod tests { assert_eq!(TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.addr(), saddr); + assert_eq!(packet.meta.socket_addr(), saddr); } packets @@ -175,11 +175,11 @@ mod tests { assert_eq!(TEST_NUM_MSGS, recv); for packet in packets.iter().take(sent1) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.addr(), saddr1); + assert_eq!(packet.meta.socket_addr(), saddr1); } for packet in packets.iter().skip(sent1).take(recv - sent1) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.addr(), saddr2); + assert_eq!(packet.meta.socket_addr(), saddr2); } packets @@ -189,7 +189,7 @@ mod tests { assert_eq!(sent1 + sent2 - TEST_NUM_MSGS, recv); for packet in packets.iter().take(recv) { assert_eq!(packet.meta.size, PACKET_DATA_SIZE); - assert_eq!(packet.meta.addr(), saddr2); + assert_eq!(packet.meta.socket_addr(), saddr2); } } } diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index 34404143f1..1e2f949432 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -66,7 +66,7 @@ pub fn send_to( socket_addr_space: &SocketAddrSpace, ) -> Result<()> { for p in &batch.packets { - let addr = p.meta.addr(); + let addr = p.meta.socket_addr(); if socket_addr_space.check(&addr) { socket.send_to(&p.data[..p.meta.size], &addr)?; } @@ -92,7 +92,7 @@ mod tests { let packets = vec![Packet::default()]; let mut packet_batch = PacketBatch::new(packets); packet_batch.set_addr(&send_addr); - assert_eq!(packet_batch.packets[0].meta.addr(), send_addr); + assert_eq!(packet_batch.packets[0].meta.socket_addr(), send_addr); } #[test] @@ -107,7 +107,7 @@ mod tests { batch.packets.resize(10, Packet::default()); for m in batch.packets.iter_mut() { - m.meta.set_addr(&addr); + m.meta.set_socket_addr(&addr); m.meta.size = PACKET_DATA_SIZE; } send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); @@ -122,7 +122,7 @@ mod tests { for m in &batch.packets { assert_eq!(m.meta.size, PACKET_DATA_SIZE); - assert_eq!(m.meta.addr(), saddr); + assert_eq!(m.meta.socket_addr(), saddr); } } @@ -164,7 +164,7 @@ mod tests { let mut batch = PacketBatch::default(); batch.packets.resize(1, Packet::default()); for m in batch.packets.iter_mut() { - m.meta.set_addr(&addr); + m.meta.set_socket_addr(&addr); m.meta.size = 1; } send_to(&batch, &send_socket, &SocketAddrSpace::Unspecified).unwrap(); diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 6b9e1a5d64..152aaef3b1 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -178,7 +178,7 @@ fn handle_chunk( if maybe_batch.is_none() { let mut batch = PacketBatch::with_capacity(1); let mut packet = Packet::default(); - packet.meta.set_addr(remote_addr); + packet.meta.set_socket_addr(remote_addr); packet.meta.sender_stake = stake; batch.packets.push(packet); *maybe_batch = Some(batch); diff --git a/streamer/src/recvmmsg.rs b/streamer/src/recvmmsg.rs index cf89bb1ecb..ea6d6bf6ae 100644 --- a/streamer/src/recvmmsg.rs +++ b/streamer/src/recvmmsg.rs @@ -31,7 +31,7 @@ pub fn recv_mmsg(socket: &UdpSocket, packets: &mut [Packet]) -> io::Result { p.meta.size = nrecv; - p.meta.set_addr(&from); + p.meta.set_socket_addr(&from); if i == 0 { socket.set_nonblocking(true)?; } @@ -107,7 +107,7 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result