diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 9ccba18900..94bdcb9330 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -267,12 +267,6 @@ pub fn verify_shreds_gpu( sigverify::copy_return_values(&v_sig_lens, &out, &mut rvs); inc_new_counter_debug!("ed25519_shred_verify_gpu", count); - recycler_cache.buffer().recycle(out); - recycler_cache.buffer().recycle(pubkeys); - recycler_cache.offsets().recycle(signature_offsets); - recycler_cache.offsets().recycle(pubkey_offsets); - recycler_cache.offsets().recycle(msg_sizes); - recycler_cache.offsets().recycle(msg_start_offsets); rvs } @@ -467,12 +461,6 @@ pub fn sign_shreds_gpu( }); }); inc_new_counter_debug!("ed25519_shred_sign_gpu", count); - recycler_cache.buffer().recycle(signatures_out); - recycler_cache.buffer().recycle(pubkeys); - recycler_cache.offsets().recycle(signature_offsets); - recycler_cache.offsets().recycle(pubkey_offsets); - recycler_cache.offsets().recycle(msg_sizes); - recycler_cache.offsets().recycle(msg_start_offsets); } #[cfg(test)] diff --git a/perf/src/cuda_runtime.rs b/perf/src/cuda_runtime.rs index 61e919afdc..fda5277e22 100644 --- a/perf/src/cuda_runtime.rs +++ b/perf/src/cuda_runtime.rs @@ -6,12 +6,13 @@ // cannot be paged to disk. The cuda driver provides these interfaces to pin and unpin memory. use crate::perf_libs; -use crate::recycler::Reset; +use crate::recycler::{RecyclerX, Reset}; use rand::seq::SliceRandom; use rand::Rng; use rayon::prelude::*; use std::ops::{Index, IndexMut}; use std::slice::SliceIndex; +use std::sync::{Arc, Weak}; use std::os::raw::c_int; @@ -57,29 +58,33 @@ pub fn unpin(_mem: *mut T) { // page-pinned. Controlled by flags in case user only wants // to pin in certain circumstances. #[derive(Debug)] -pub struct PinnedVec { +pub struct PinnedVec { x: Vec, pinned: bool, pinnable: bool, + recycler: Option>>>, } -impl Reset for PinnedVec { +impl Reset for PinnedVec { fn reset(&mut self) { self.resize(0, T::default()); } - fn warm(&mut self, size_hint: usize) { self.set_pinnable(); self.resize(size_hint, T::default()); } + fn set_recycler(&mut self, recycler: Weak>) { + self.recycler = Some(recycler); + } } -impl Default for PinnedVec { +impl Default for PinnedVec { fn default() -> Self { Self { x: Vec::new(), pinned: false, pinnable: false, + recycler: None, } } } @@ -88,7 +93,7 @@ pub struct PinnedIter<'a, T>(std::slice::Iter<'a, T>); pub struct PinnedIterMut<'a, T>(std::slice::IterMut<'a, T>); -impl<'a, T> Iterator for PinnedIter<'a, T> { +impl<'a, T: Clone + Default + Sized> Iterator for PinnedIter<'a, T> { type Item = &'a T; fn next(&mut self) -> Option { @@ -96,7 +101,7 @@ impl<'a, T> Iterator for PinnedIter<'a, T> { } } -impl<'a, T> Iterator for PinnedIterMut<'a, T> { +impl<'a, T: Clone + Default + Sized> Iterator for PinnedIterMut<'a, T> { type Item = &'a mut T; fn next(&mut self) -> Option { @@ -104,7 +109,7 @@ impl<'a, T> Iterator for PinnedIterMut<'a, T> { } } -impl<'a, T> IntoIterator for &'a mut PinnedVec { +impl<'a, T: Clone + Default + Sized> IntoIterator for &'a mut PinnedVec { type Item = &'a T; type IntoIter = PinnedIter<'a, T>; @@ -113,7 +118,7 @@ impl<'a, T> IntoIterator for &'a mut PinnedVec { } } -impl<'a, T> IntoIterator for &'a PinnedVec { +impl<'a, T: Clone + Default + Sized> IntoIterator for &'a PinnedVec { type Item = &'a T; type IntoIter = PinnedIter<'a, T>; @@ -122,7 +127,7 @@ impl<'a, T> IntoIterator for &'a PinnedVec { } } -impl> Index for PinnedVec { +impl> Index for PinnedVec { type Output = I::Output; #[inline] @@ -131,14 +136,14 @@ impl> Index for PinnedVec { } } -impl> IndexMut for PinnedVec { +impl> IndexMut for PinnedVec { #[inline] fn index_mut(&mut self, index: I) -> &mut Self::Output { &mut self.x[index] } } -impl PinnedVec { +impl PinnedVec { pub fn iter(&self) -> PinnedIter { PinnedIter(self.x.iter()) } @@ -148,7 +153,7 @@ impl PinnedVec { } } -impl<'a, T: Send + Sync> IntoParallelIterator for &'a PinnedVec { +impl<'a, T: Clone + Send + Sync + Default + Sized> IntoParallelIterator for &'a PinnedVec { type Iter = rayon::slice::Iter<'a, T>; type Item = &'a T; fn into_par_iter(self) -> Self::Iter { @@ -156,7 +161,7 @@ impl<'a, T: Send + Sync> IntoParallelIterator for &'a PinnedVec { } } -impl PinnedVec { +impl PinnedVec { pub fn reserve_and_pin(&mut self, size: usize) { if self.x.capacity() < size { if self.pinned { @@ -181,6 +186,7 @@ impl PinnedVec { x: source, pinned: false, pinnable: false, + recycler: None, } } @@ -190,6 +196,7 @@ impl PinnedVec { x, pinned: false, pinnable: false, + recycler: None, } } @@ -272,9 +279,13 @@ impl PinnedVec { self.pinned = true; } } + fn recycler_ref(&self) -> Option>> { + let r = self.recycler.as_ref()?; + r.upgrade() + } } -impl Clone for PinnedVec { +impl Clone for PinnedVec { fn clone(&self) -> Self { let mut x = self.x.clone(); let pinned = if self.pinned { @@ -293,12 +304,18 @@ impl Clone for PinnedVec { x, pinned, pinnable: self.pinnable, + recycler: self.recycler.clone(), } } } -impl Drop for PinnedVec { +impl Drop for PinnedVec { fn drop(&mut self) { + if let Some(strong) = self.recycler_ref() { + let mut vec = PinnedVec::default(); + std::mem::swap(&mut vec, self); + strong.recycle(vec); + } if self.pinned { unpin(self.x.as_mut_ptr()); } diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 1981e452b8..eb4f834775 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -1,11 +1,8 @@ //! The `packet` module defines data structures and methods to pull data from the network. -use crate::{ - cuda_runtime::PinnedVec, - recycler::{Recycler, Reset}, -}; +use crate::{cuda_runtime::PinnedVec, recycler::Recycler}; use serde::Serialize; pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; -use std::{mem, net::SocketAddr}; +use std::net::SocketAddr; pub const NUM_PACKETS: usize = 1024 * 8; @@ -16,38 +13,13 @@ pub const PACKETS_BATCH_SIZE: usize = (PACKETS_PER_BATCH * PACKET_DATA_SIZE); #[derive(Debug, Clone)] pub struct Packets { pub packets: PinnedVec, - - recycler: Option, -} - -impl Drop for Packets { - fn drop(&mut self) { - if let Some(ref recycler) = self.recycler { - let old = mem::replace(&mut self.packets, PinnedVec::default()); - recycler.recycle(old) - } - } -} - -impl Reset for Packets { - fn reset(&mut self) { - self.packets.resize(0, Packet::default()); - } - - fn warm(&mut self, size_hint: usize) { - self.packets.set_pinnable(); - self.packets.resize(size_hint, Packet::default()); - } } //auto derive doesn't support large arrays impl Default for Packets { fn default() -> Packets { let packets = PinnedVec::with_capacity(NUM_RCVMMSGS); - Packets { - packets, - recycler: None, - } + Packets { packets } } } @@ -56,19 +28,13 @@ pub type PacketsRecycler = Recycler>; impl Packets { pub fn new(packets: Vec) -> Self { let packets = PinnedVec::from_vec(packets); - Self { - packets, - recycler: None, - } + Self { packets } } pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self { let mut packets = recycler.allocate(name); packets.reserve_and_pin(size); - Packets { - packets, - recycler: Some(recycler), - } + Packets { packets } } pub fn new_with_recycler_data( recycler: &PacketsRecycler, @@ -142,15 +108,6 @@ mod tests { use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; - #[test] - fn test_packets_reset() { - let mut packets = Packets::default(); - packets.packets.resize(10, Packet::default()); - assert_eq!(packets.packets.len(), 10); - packets.reset(); - assert_eq!(packets.packets.len(), 0); - } - #[test] fn test_to_packets() { let keypair = Keypair::new(); diff --git a/perf/src/recycler.rs b/perf/src/recycler.rs index 40e45a7946..40dc09217e 100644 --- a/perf/src/recycler.rs +++ b/perf/src/recycler.rs @@ -1,7 +1,7 @@ use rand::{thread_rng, Rng}; use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Weak}; #[derive(Debug, Default)] struct RecyclerStats { @@ -11,38 +11,36 @@ struct RecyclerStats { max_gc: AtomicUsize, } -#[derive(Debug)] +#[derive(Clone, Default)] pub struct Recycler { - gc: Arc>>, - stats: Arc, + recycler: Arc>, +} + +#[derive(Debug)] +pub struct RecyclerX { + gc: Mutex>, + stats: RecyclerStats, id: usize, } -impl Default for Recycler { - fn default() -> Recycler { +impl Default for RecyclerX { + fn default() -> RecyclerX { let id = thread_rng().gen_range(0, 1000); trace!("new recycler..{}", id); - Recycler { - gc: Arc::new(Mutex::new(vec![])), - stats: Arc::new(RecyclerStats::default()), + RecyclerX { + gc: Mutex::new(vec![]), + stats: RecyclerStats::default(), id, } } } -impl Clone for Recycler { - fn clone(&self) -> Recycler { - Recycler { - gc: self.gc.clone(), - stats: self.stats.clone(), - id: self.id, - } - } -} - pub trait Reset { fn reset(&mut self); fn warm(&mut self, size_hint: usize); + fn set_recycler(&mut self, recycler: Weak>) + where + Self: std::marker::Sized; } lazy_static! { @@ -57,7 +55,7 @@ fn warm_recyclers() -> bool { WARM_RECYCLERS.load(Ordering::Relaxed) } -impl Recycler { +impl Recycler { pub fn warmed(num: usize, size_hint: usize) -> Self { let new = Self::default(); if warm_recyclers() { @@ -68,37 +66,44 @@ impl Recycler { item }) .collect(); - warmed_items.into_iter().for_each(|i| new.recycle(i)); + warmed_items + .into_iter() + .for_each(|i| new.recycler.recycle(i)); } new } pub fn allocate(&self, name: &'static str) -> T { let new = self + .recycler .gc .lock() .expect("recycler lock in pb fn allocate") .pop(); if let Some(mut x) = new { - self.stats.reuse.fetch_add(1, Ordering::Relaxed); + self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed); x.reset(); return x; } - let total = self.stats.total.fetch_add(1, Ordering::Relaxed); + let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed); trace!( "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}", total, name, - self.id, - self.stats.reuse.load(Ordering::Relaxed), - self.stats.max_gc.load(Ordering::Relaxed), + self.recycler.id, + self.recycler.stats.reuse.load(Ordering::Relaxed), + self.recycler.stats.max_gc.load(Ordering::Relaxed), ); - T::default() + let mut t = T::default(); + t.set_recycler(Arc::downgrade(&self.recycler)); + t } +} +impl RecyclerX { pub fn recycle(&self, x: T) { let len = { let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); @@ -135,6 +140,7 @@ mod tests { *self = 10; } fn warm(&mut self, _size_hint: usize) {} + fn set_recycler(&mut self, _recycler: Weak>) {} } #[test] @@ -144,10 +150,10 @@ mod tests { assert_eq!(y, 0); y = 20; let recycler2 = recycler.clone(); - recycler2.recycle(y); - assert_eq!(recycler.gc.lock().unwrap().len(), 1); + recycler2.recycler.recycle(y); + assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1); let z = recycler.allocate("test_recycler2"); assert_eq!(z, 10); - assert_eq!(recycler.gc.lock().unwrap().len(), 0); + assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0); } } diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index a44c27ce15..d262c7cdfb 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -365,11 +365,6 @@ pub fn ed25519_verify( trace!("done verify"); copy_return_values(&sig_lens, &out, &mut rvs); inc_new_counter_debug!("ed25519_verify_gpu", count); - recycler_out.recycle(out); - recycler.recycle(signature_offsets); - recycler.recycle(pubkey_offsets); - recycler.recycle(msg_sizes); - recycler.recycle(msg_start_offsets); rvs }