Construct PacketBatches from PongMessages directly (#24708)

Serialize pongs directly into PacketBatch to save copying the data from
intermediate packets into PacketBatch.
This commit is contained in:
steviez 2022-04-26 21:30:00 -05:00 committed by GitHub
parent 081c844d6e
commit b48fd4eec2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 46 deletions

View File

@@ -53,10 +53,7 @@ use {
},
solana_perf::{
data_budget::DataBudget,
packet::{
limited_deserialize, to_packet_batch_with_destination, Packet, PacketBatch,
PacketBatchRecycler, PACKET_DATA_SIZE,
},
packet::{limited_deserialize, Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank_forks::BankForks, vote_parser},
@@ -1567,7 +1564,11 @@ impl ClusterInfo {
generate_pull_requests,
);
if !reqs.is_empty() {
let packet_batch = to_packet_batch_with_destination(recycler.clone(), &reqs);
let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
recycler.clone(),
"run_gossip",
&reqs,
);
self.stats
.packets_sent_gossip_requests_count
.add_relaxed(packet_batch.packets.len() as u64);
@@ -2160,27 +2161,21 @@ impl ClusterInfo {
I: IntoIterator<Item = (SocketAddr, Ping)>,
{
let keypair = self.keypair();
let packets: Vec<_> = pings
let pongs_and_dests: Vec<_> = pings
.into_iter()
.filter_map(|(addr, ping)| {
let pong = Pong::new(&ping, &keypair).ok()?;
let pong = Protocol::PongMessage(pong);
match Packet::from_data(Some(&addr), pong) {
Ok(packet) => Some(packet),
Err(err) => {
error!("failed to write pong packet: {:?}", err);
None
}
}
Some((addr, pong))
})
.collect();
if packets.is_empty() {
if pongs_and_dests.is_empty() {
None
} else {
let packet_batch = PacketBatch::new_unpinned_with_recycler_data(
recycler,
let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
recycler.clone(),
"handle_ping_messages",
packets,
&pongs_and_dests,
);
Some(packet_batch)
}
@@ -2284,7 +2279,11 @@ impl ClusterInfo {
if prune_messages.is_empty() {
return;
}
let mut packet_batch = to_packet_batch_with_destination(recycler.clone(), &prune_messages);
let mut packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
recycler.clone(),
"handle_batch_push_messages",
&prune_messages,
);
let num_prune_packets = packet_batch.packets.len();
self.stats
.push_response_count
@@ -2961,7 +2960,11 @@ pub fn push_messages_to_peer(
let reqs: Vec<_> = ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, messages)
.map(move |payload| (peer_gossip, Protocol::PushMessage(self_id, payload)))
.collect();
let packet_batch = to_packet_batch_with_destination(PacketBatchRecycler::default(), &reqs);
let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
PacketBatchRecycler::default(),
"push_messages_to_peer",
&reqs,
);
let sock = UdpSocket::bind("0.0.0.0:0").unwrap();
packet::send_to(&packet_batch, &sock, socket_addr_space)?;
Ok(())

View File

@@ -60,6 +60,32 @@ impl PacketBatch {
batch
}
/// Builds a `PacketBatch` with one packet per `(destination, payload)` pair,
/// serializing each payload directly into its packet and stamping the
/// destination address on it.
///
/// Pairs with an unusable destination (unspecified IP or port 0) are skipped;
/// their slot stays a default `Packet` with nothing written. Serialization
/// failures are logged and likewise leave the slot as a default `Packet`.
pub fn new_unpinned_with_recycler_data_and_dests<T: Serialize>(
    recycler: PacketBatchRecycler,
    name: &'static str,
    dests_and_data: &[(SocketAddr, T)],
) -> Self {
    let count = dests_and_data.len();
    let mut batch = PacketBatch::new_unpinned_with_recycler(recycler, count, name);
    // Pre-size the buffer so every pair has a slot, written or not.
    batch.packets.resize(count, Packet::default());
    for (packet, (addr, data)) in batch.packets.iter_mut().zip(dests_and_data.iter()) {
        if addr.ip().is_unspecified() || addr.port() == 0 {
            trace!("Dropping packet, as destination is unknown");
        } else if let Err(e) = Packet::populate_packet(packet, Some(addr), &data) {
            // TODO: This should never happen. Instead the caller should
            // break the payload into smaller messages, and here any errors
            // should be propagated.
            error!("Couldn't write to packet {:?}. Data skipped.", e);
        }
    }
    batch
}
pub fn new_unpinned_with_recycler_data(
recycler: &PacketBatchRecycler,
name: &'static str,
@@ -100,33 +126,6 @@ pub fn to_packet_batches_for_tests<T: Serialize>(items: &[T]) -> Vec<PacketBatch
to_packet_batches(items, NUM_PACKETS)
}
/// Builds a `PacketBatch` with one packet per `(destination, payload)` pair.
///
/// Thin wrapper kept for existing callers; delegates to
/// `PacketBatch::new_unpinned_with_recycler_data_and_dests`, which holds the
/// single copy of the serialize-into-packet loop (same skip-on-bad-destination
/// and log-on-serialization-error behavior).
pub fn to_packet_batch_with_destination<T: Serialize>(
    recycler: PacketBatchRecycler,
    dests_and_data: &[(SocketAddr, T)],
) -> PacketBatch {
    PacketBatch::new_unpinned_with_recycler_data_and_dests(
        recycler,
        "to_packet_batch_with_destination",
        dests_and_data,
    )
}
pub fn limited_deserialize<T>(data: &[u8]) -> bincode::Result<T>
where
T: serde::de::DeserializeOwned,