diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 65714d35d..cb1c683ee 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -414,7 +414,7 @@ pub fn broadcast_shreds( let seed = shred.seed(Some(self_pubkey), &root_bank); let node = cluster_nodes.get_broadcast_peer(seed)?; if socket_addr_space.check(&node.tvu) { - Some((&shred.payload[..], &node.tvu)) + Some((&shred.payload, node.tvu)) } else { None } diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 06024ed3b..d5a0303fd 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -288,7 +288,6 @@ impl RepairService { }) .collect() }; - let batch: Vec<(&[u8], &SocketAddr)> = batch.iter().map(|(v, s)| (&v[..], s)).collect(); build_repairs_batch_elapsed.stop(); let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed"); diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 607d73ffc..e4725d485 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1250,13 +1250,13 @@ impl ClusterInfo { let dests: Vec<_> = if forwarded { peers .iter() - .map(|peer| &peer.tvu_forwards) + .map(|peer| peer.tvu_forwards) .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) .collect() } else { peers .iter() - .map(|peer| &peer.tvu) + .map(|peer| peer.tvu) .filter(|addr| socket_addr_space.check(addr)) .collect() }; diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index d1e4a5875..7f2526b5b 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -1,8 +1,14 @@ //! The `sendmmsg` module provides sendmmsg() API implementation -use std::io; -use std::net::{SocketAddr, UdpSocket}; -use thiserror::Error; +use { + std::{ + borrow::Borrow, + io, + iter::repeat, + net::{SocketAddr, UdpSocket}, + }, + thiserror::Error, +}; #[derive(Debug, Error)] pub enum SendPktsError { @@ -12,11 +18,15 @@ pub enum SendPktsError { } #[cfg(not(target_os = "linux"))] -pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result<(), SendPktsError> { +pub fn batch_send(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError> +where + S: Borrow, + T: AsRef<[u8]>, +{ let mut num_failed = 0; let mut erropt = None; for (p, a) in packets { - if let Err(e) = sock.send_to(p, *a) { + if let Err(e) = sock.send_to(p.as_ref(), a.borrow()) { num_failed += 1; if erropt.is_none() { erropt = Some(e); @@ -128,7 +138,11 @@ fn sendmmsg_retry(sock: &UdpSocket, hdrs: &mut Vec) -> Result<(), SendP } #[cfg(target_os = "linux")] -pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result<(), SendPktsError> { +pub fn batch_send(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError> +where + S: Borrow, + T: AsRef<[u8]>, +{ // The vectors are allocated with capacity, as later code inserts elements // at specific indices, and uses the address of the vector index in hdrs let mut iovs: Vec = Vec::with_capacity(packets.len()); @@ -136,19 +150,30 @@ pub fn batch_send(sock: &UdpSocket, packets: &[(&[u8], &SocketAddr)]) -> Result< let mut hdrs: Vec = Vec::with_capacity(packets.len()); for (pkt, dest) in packets.iter() { - mmsghdr_for_packet(pkt, dest, &mut iovs, &mut addrs, &mut hdrs); + mmsghdr_for_packet( + pkt.as_ref(), + dest.borrow(), + &mut iovs, + &mut addrs, + &mut hdrs, + ); } sendmmsg_retry(sock, &mut hdrs) } -pub fn multi_target_send( +pub fn multi_target_send( sock: &UdpSocket, - packet: &[u8], - dests: &[&SocketAddr], -) -> Result<(), SendPktsError> { - let pkts: Vec<_> = dests.iter().map(|addr| (packet, *addr)).collect(); - batch_send(sock, &pkts[..]) + packet: T, + dests: &[S], +) -> Result<(), SendPktsError> +where + S: Borrow, + T: AsRef<[u8]>, +{ + let dests = dests.iter().map(Borrow::borrow); + let pkts: Vec<_> = repeat(&packet).zip(dests).collect(); + batch_send(sock, &pkts) } #[cfg(test)]