diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 18fa4f0d0..e39759918 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -92,6 +92,7 @@ fn main() -> Result<()> { recycler.clone(), "bench-streamer-test", 1, + true, )); } diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 573e17076..48f44d60e 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -113,6 +113,7 @@ impl FetchStage { recycler.clone(), "fetch_stage", coalesce_ms, + true, ) }); @@ -125,6 +126,7 @@ impl FetchStage { recycler.clone(), "fetch_forward_stage", coalesce_ms, + true, ) }); diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 03e8d3f91..feb5191ba 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -515,7 +515,7 @@ impl ServeRepair { if let Some(packet) = packet { inc_new_counter_debug!("serve_repair-window-request-ledger", 1); - return Some(Packets::new_with_recycler_data( + return Some(Packets::new_unpinned_with_recycler_data( recycler, "run_window_request", vec![packet], @@ -555,7 +555,7 @@ impl ServeRepair { from_addr, nonce, )?; - return Some(Packets::new_with_recycler_data( + return Some(Packets::new_unpinned_with_recycler_data( recycler, "run_highest_window_request", vec![packet], @@ -572,7 +572,7 @@ impl ServeRepair { max_responses: usize, nonce: Nonce, ) -> Option { - let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan"); + let mut res = Packets::new_unpinned_with_recycler(recycler.clone(), 64, "run_orphan"); if let Some(blockstore) = blockstore { // Try to find the next "n" parent slots of the input slot while let Ok(Some(meta)) = blockstore.meta(slot) { diff --git a/core/src/serve_repair_service.rs b/core/src/serve_repair_service.rs index f2ef152e9..dae275a1e 100644 --- a/core/src/serve_repair_service.rs +++ b/core/src/serve_repair_service.rs @@ -33,6 +33,7 @@ impl ServeRepairService { Recycler::default(), "serve_repair_receiver", 1, + false, ); let (response_sender, response_receiver) = channel(); let t_responder = diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index b7946b54d..2c9a9961a 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -150,6 +150,7 @@ impl ShredFetchStage { recycler.clone(), "packet_modifier", 1, + true, ) }) .collect(); diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 3ad0510fb..452c7c099 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2051,7 +2051,8 @@ impl ClusterInfo { .process_pull_requests(callers.cloned(), timestamp()); let output_size_limit = self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE; - let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); + let mut packets = + Packets::new_unpinned_with_recycler(recycler.clone(), 64, "handle_pull_requests"); let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = { let mut rng = rand::thread_rng(); let check_pull_request = @@ -2323,7 +2324,7 @@ impl ClusterInfo { None } else { let packets = - Packets::new_with_recycler_data(recycler, "handle_ping_messages", packets); + Packets::new_unpinned_with_recycler_data(recycler, "handle_ping_messages", packets); Some(packets) } } diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index 075128269..589ee3758 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -54,6 +54,7 @@ impl GossipService { Recycler::default(), "gossip_receiver", 1, + false, ); let (response_sender, response_receiver) = channel(); let (consume_sender, listen_receiver) = channel(); diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs index 3b5b054c1..eb0bf6a79 100644 --- a/perf/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -146,6 +146,10 @@ impl<'a, T: Clone + Send + Sync + Default + Sized> IntoParallelIterator for &'a } impl PinnedVec { + pub fn reserve(&mut self, size: usize) { + self.x.reserve(size); + } + pub fn reserve_and_pin(&mut self, size: usize) { if self.x.capacity() < size { if self.pinned { diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 946124e7f..bdd2052c7 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -28,11 +28,22 @@ impl Packets { Packets { packets } } + pub fn new_unpinned_with_recycler( + recycler: PacketsRecycler, + size: usize, + name: &'static str, + ) -> Self { + let mut packets = recycler.allocate(name); + packets.reserve(size); + Packets { packets } + } + pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self { let mut packets = recycler.allocate(name); packets.reserve_and_pin(size); Packets { packets } } + pub fn new_with_recycler_data( recycler: &PacketsRecycler, name: &'static str, @@ -43,6 +54,16 @@ impl Packets { vec } + pub fn new_unpinned_with_recycler_data( + recycler: &PacketsRecycler, + name: &'static str, + mut packets: Vec, + ) -> Self { + let mut vec = Self::new_unpinned_with_recycler(recycler.clone(), packets.len(), name); + vec.packets.append(&mut packets); + vec + } + pub fn set_addr(&mut self, addr: &SocketAddr) { for m in self.packets.iter_mut() { m.meta.set_addr(&addr); @@ -76,7 +97,7 @@ pub fn to_packets_with_destination( recycler: PacketsRecycler, dests_and_data: &[(SocketAddr, T)], ) -> Packets { - let mut out = Packets::new_with_recycler( + let mut out = Packets::new_unpinned_with_recycler( recycler, dests_and_data.len(), "to_packets_with_destination", diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 349142a56..4cccbdc9e 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -36,13 +36,18 @@ fn recv_loop( recycler: &PacketsRecycler, name: &'static str, coalesce_ms: u64, + use_pinned_memory: bool, ) -> Result<()> { let mut recv_count = 0; let mut call_count = 0; let mut now = Instant::now(); let mut num_max_received = 0; // Number of times maximum packets were received loop { - let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name); + let mut msgs = if use_pinned_memory { + Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name) + } else { + Packets::with_capacity(PACKETS_PER_BATCH) + }; loop { // Check for exit signal, even if socket is busy // (for instance the leader transaction socket) @@ -84,6 +89,7 @@ pub fn receiver( recycler: PacketsRecycler, name: &'static str, coalesce_ms: u64, + use_pinned_memory: bool, ) -> JoinHandle<()> { let res = sock.set_read_timeout(Some(Duration::new(1, 0))); if res.is_err() { @@ -100,6 +106,7 @@ pub fn receiver( &recycler.clone(), name, coalesce_ms, + use_pinned_memory, ); }) .unwrap() @@ -211,6 +218,7 @@ mod test { Recycler::default(), "test", 1, + true, ); let t_responder = { let (s_responder, r_responder) = channel();