diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs
index e70d1b6d34..d4c44582d6 100644
--- a/gossip/src/cluster_info.rs
+++ b/gossip/src/cluster_info.rs
@@ -53,10 +53,7 @@ use {
     },
     solana_perf::{
         data_budget::DataBudget,
-        packet::{
-            limited_deserialize, to_packet_batch_with_destination, Packet, PacketBatch,
-            PacketBatchRecycler, PACKET_DATA_SIZE,
-        },
+        packet::{limited_deserialize, Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
     },
     solana_rayon_threadlimit::get_thread_count,
     solana_runtime::{bank_forks::BankForks, vote_parser},
@@ -1567,7 +1564,11 @@ impl ClusterInfo {
             generate_pull_requests,
         );
         if !reqs.is_empty() {
-            let packet_batch = to_packet_batch_with_destination(recycler.clone(), &reqs);
+            let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
+                recycler.clone(),
+                "run_gossip",
+                &reqs,
+            );
             self.stats
                 .packets_sent_gossip_requests_count
                 .add_relaxed(packet_batch.packets.len() as u64);
@@ -2160,27 +2161,21 @@ impl ClusterInfo {
         I: IntoIterator<Item = (SocketAddr, Ping)>,
     {
         let keypair = self.keypair();
-        let packets: Vec<_> = pings
+        let pongs_and_dests: Vec<_> = pings
             .into_iter()
             .filter_map(|(addr, ping)| {
                 let pong = Pong::new(&ping, &keypair).ok()?;
                 let pong = Protocol::PongMessage(pong);
-                match Packet::from_data(Some(&addr), pong) {
-                    Ok(packet) => Some(packet),
-                    Err(err) => {
-                        error!("failed to write pong packet: {:?}", err);
-                        None
-                    }
-                }
+                Some((addr, pong))
             })
             .collect();
-        if packets.is_empty() {
+        if pongs_and_dests.is_empty() {
             None
         } else {
-            let packet_batch = PacketBatch::new_unpinned_with_recycler_data(
-                recycler,
+            let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
+                recycler.clone(),
                 "handle_ping_messages",
-                packets,
+                &pongs_and_dests,
             );
             Some(packet_batch)
         }
@@ -2284,7 +2279,11 @@ impl ClusterInfo {
         if prune_messages.is_empty() {
             return;
         }
-        let mut packet_batch = to_packet_batch_with_destination(recycler.clone(), &prune_messages);
+        let mut packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
+            recycler.clone(),
+            "handle_batch_push_messages",
+            &prune_messages,
+        );
         let num_prune_packets = packet_batch.packets.len();
         self.stats
             .push_response_count
@@ -2961,7 +2960,11 @@ pub fn push_messages_to_peer(
     let reqs: Vec<_> = ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, messages)
         .map(move |payload| (peer_gossip, Protocol::PushMessage(self_id, payload)))
         .collect();
-    let packet_batch = to_packet_batch_with_destination(PacketBatchRecycler::default(), &reqs);
+    let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
+        PacketBatchRecycler::default(),
+        "push_messages_to_peer",
+        &reqs,
+    );
     let sock = UdpSocket::bind("0.0.0.0:0").unwrap();
     packet::send_to(&packet_batch, &sock, socket_addr_space)?;
     Ok(())
diff --git a/perf/src/packet.rs b/perf/src/packet.rs
index ed66870c74..b4cea63ee0 100644
--- a/perf/src/packet.rs
+++ b/perf/src/packet.rs
@@ -60,6 +60,32 @@ impl PacketBatch {
         batch
     }
 
+    pub fn new_unpinned_with_recycler_data_and_dests<T: Serialize>(
+        recycler: PacketBatchRecycler,
+        name: &'static str,
+        dests_and_data: &[(SocketAddr, T)],
+    ) -> Self {
+        let mut batch =
+            PacketBatch::new_unpinned_with_recycler(recycler, dests_and_data.len(), name);
+        batch
+            .packets
+            .resize(dests_and_data.len(), Packet::default());
+
+        for ((addr, data), packet) in dests_and_data.iter().zip(batch.packets.iter_mut()) {
+            if !addr.ip().is_unspecified() && addr.port() != 0 {
+                if let Err(e) = Packet::populate_packet(packet, Some(addr), &data) {
+                    // TODO: This should never happen. Instead the caller should
+                    // break the payload into smaller messages, and here any errors
+                    // should be propagated.
+                    error!("Couldn't write to packet {:?}. Data skipped.", e);
+                }
+            } else {
+                trace!("Dropping packet, as destination is unknown");
+            }
+        }
+        batch
+    }
+
     pub fn new_unpinned_with_recycler_data(
         recycler: &PacketBatchRecycler,
         name: &'static str,
@@ -100,33 +126,6 @@ pub fn to_packet_batches_for_tests<T: Serialize>(items: &[T]) -> Vec<PacketBatch> {
     to_packet_batches(items, NUM_PACKETS)
 }
 
-pub fn to_packet_batch_with_destination<T: Serialize>(
-    recycler: PacketBatchRecycler,
-    dests_and_data: &[(SocketAddr, T)],
-) -> PacketBatch {
-    let mut batch = PacketBatch::new_unpinned_with_recycler(
-        recycler,
-        dests_and_data.len(),
-        "to_packet_batch_with_destination",
-    );
-    batch
-        .packets
-        .resize(dests_and_data.len(), Packet::default());
-    for ((addr, data), packet) in dests_and_data.iter().zip(batch.packets.iter_mut()) {
-        if !addr.ip().is_unspecified() && addr.port() != 0 {
-            if let Err(e) = Packet::populate_packet(packet, Some(addr), &data) {
-                // TODO: This should never happen. Instead the caller should
-                // break the payload into smaller messages, and here any errors
-                // should be propagated.
-                error!("Couldn't write to packet {:?}. Data skipped.", e);
-            }
-        } else {
-            trace!("Dropping packet, as destination is unknown");
-        }
-    }
-    batch
-}
-
 pub fn limited_deserialize<T>(data: &[u8]) -> bincode::Result<T>
 where
     T: serde::de::DeserializeOwned,