Automatically call `.recycle` in Drop (#7429)

automerge
This commit is contained in:
anatoly yakovenko 2019-12-11 11:58:40 -08:00 committed by Grimes
parent 0aa4dc904e
commit bec5835289
5 changed files with 74 additions and 111 deletions

View File

@ -267,12 +267,6 @@ pub fn verify_shreds_gpu(
sigverify::copy_return_values(&v_sig_lens, &out, &mut rvs);
inc_new_counter_debug!("ed25519_shred_verify_gpu", count);
recycler_cache.buffer().recycle(out);
recycler_cache.buffer().recycle(pubkeys);
recycler_cache.offsets().recycle(signature_offsets);
recycler_cache.offsets().recycle(pubkey_offsets);
recycler_cache.offsets().recycle(msg_sizes);
recycler_cache.offsets().recycle(msg_start_offsets);
rvs
}
@ -467,12 +461,6 @@ pub fn sign_shreds_gpu(
});
});
inc_new_counter_debug!("ed25519_shred_sign_gpu", count);
recycler_cache.buffer().recycle(signatures_out);
recycler_cache.buffer().recycle(pubkeys);
recycler_cache.offsets().recycle(signature_offsets);
recycler_cache.offsets().recycle(pubkey_offsets);
recycler_cache.offsets().recycle(msg_sizes);
recycler_cache.offsets().recycle(msg_start_offsets);
}
#[cfg(test)]

View File

@ -6,12 +6,13 @@
// cannot be paged to disk. The cuda driver provides these interfaces to pin and unpin memory.
use crate::perf_libs;
use crate::recycler::Reset;
use crate::recycler::{RecyclerX, Reset};
use rand::seq::SliceRandom;
use rand::Rng;
use rayon::prelude::*;
use std::ops::{Index, IndexMut};
use std::slice::SliceIndex;
use std::sync::{Arc, Weak};
use std::os::raw::c_int;
@ -57,29 +58,33 @@ pub fn unpin<T>(_mem: *mut T) {
// page-pinned. Controlled by flags in case user only wants
// to pin in certain circumstances.
#[derive(Debug)]
pub struct PinnedVec<T> {
pub struct PinnedVec<T: Default + Clone + Sized> {
x: Vec<T>,
pinned: bool,
pinnable: bool,
recycler: Option<Weak<RecyclerX<PinnedVec<T>>>>,
}
impl<T: Default + Clone> Reset for PinnedVec<T> {
impl<T: Default + Clone + Sized> Reset for PinnedVec<T> {
fn reset(&mut self) {
self.resize(0, T::default());
}
fn warm(&mut self, size_hint: usize) {
self.set_pinnable();
self.resize(size_hint, T::default());
}
fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>) {
self.recycler = Some(recycler);
}
}
impl<T: Clone> Default for PinnedVec<T> {
impl<T: Clone + Default + Sized> Default for PinnedVec<T> {
fn default() -> Self {
Self {
x: Vec::new(),
pinned: false,
pinnable: false,
recycler: None,
}
}
}
@ -88,7 +93,7 @@ pub struct PinnedIter<'a, T>(std::slice::Iter<'a, T>);
pub struct PinnedIterMut<'a, T>(std::slice::IterMut<'a, T>);
impl<'a, T> Iterator for PinnedIter<'a, T> {
impl<'a, T: Clone + Default + Sized> Iterator for PinnedIter<'a, T> {
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
@ -96,7 +101,7 @@ impl<'a, T> Iterator for PinnedIter<'a, T> {
}
}
impl<'a, T> Iterator for PinnedIterMut<'a, T> {
impl<'a, T: Clone + Default + Sized> Iterator for PinnedIterMut<'a, T> {
type Item = &'a mut T;
fn next(&mut self) -> Option<Self::Item> {
@ -104,7 +109,7 @@ impl<'a, T> Iterator for PinnedIterMut<'a, T> {
}
}
impl<'a, T> IntoIterator for &'a mut PinnedVec<T> {
impl<'a, T: Clone + Default + Sized> IntoIterator for &'a mut PinnedVec<T> {
type Item = &'a T;
type IntoIter = PinnedIter<'a, T>;
@ -113,7 +118,7 @@ impl<'a, T> IntoIterator for &'a mut PinnedVec<T> {
}
}
impl<'a, T> IntoIterator for &'a PinnedVec<T> {
impl<'a, T: Clone + Default + Sized> IntoIterator for &'a PinnedVec<T> {
type Item = &'a T;
type IntoIter = PinnedIter<'a, T>;
@ -122,7 +127,7 @@ impl<'a, T> IntoIterator for &'a PinnedVec<T> {
}
}
impl<T, I: SliceIndex<[T]>> Index<I> for PinnedVec<T> {
impl<T: Clone + Default + Sized, I: SliceIndex<[T]>> Index<I> for PinnedVec<T> {
type Output = I::Output;
#[inline]
@ -131,14 +136,14 @@ impl<T, I: SliceIndex<[T]>> Index<I> for PinnedVec<T> {
}
}
impl<T, I: SliceIndex<[T]>> IndexMut<I> for PinnedVec<T> {
impl<T: Clone + Default + Sized, I: SliceIndex<[T]>> IndexMut<I> for PinnedVec<T> {
#[inline]
fn index_mut(&mut self, index: I) -> &mut Self::Output {
&mut self.x[index]
}
}
impl<T> PinnedVec<T> {
impl<T: Clone + Default + Sized> PinnedVec<T> {
pub fn iter(&self) -> PinnedIter<T> {
PinnedIter(self.x.iter())
}
@ -148,7 +153,7 @@ impl<T> PinnedVec<T> {
}
}
impl<'a, T: Send + Sync> IntoParallelIterator for &'a PinnedVec<T> {
impl<'a, T: Clone + Send + Sync + Default + Sized> IntoParallelIterator for &'a PinnedVec<T> {
type Iter = rayon::slice::Iter<'a, T>;
type Item = &'a T;
fn into_par_iter(self) -> Self::Iter {
@ -156,7 +161,7 @@ impl<'a, T: Send + Sync> IntoParallelIterator for &'a PinnedVec<T> {
}
}
impl<T: Clone> PinnedVec<T> {
impl<T: Clone + Default + Sized> PinnedVec<T> {
pub fn reserve_and_pin(&mut self, size: usize) {
if self.x.capacity() < size {
if self.pinned {
@ -181,6 +186,7 @@ impl<T: Clone> PinnedVec<T> {
x: source,
pinned: false,
pinnable: false,
recycler: None,
}
}
@ -190,6 +196,7 @@ impl<T: Clone> PinnedVec<T> {
x,
pinned: false,
pinnable: false,
recycler: None,
}
}
@ -272,9 +279,13 @@ impl<T: Clone> PinnedVec<T> {
self.pinned = true;
}
}
fn recycler_ref(&self) -> Option<Arc<RecyclerX<Self>>> {
let r = self.recycler.as_ref()?;
r.upgrade()
}
}
impl<T: Clone> Clone for PinnedVec<T> {
impl<T: Clone + Default + Sized> Clone for PinnedVec<T> {
fn clone(&self) -> Self {
let mut x = self.x.clone();
let pinned = if self.pinned {
@ -293,12 +304,18 @@ impl<T: Clone> Clone for PinnedVec<T> {
x,
pinned,
pinnable: self.pinnable,
recycler: self.recycler.clone(),
}
}
}
impl<T> Drop for PinnedVec<T> {
impl<T: Sized + Default + Clone> Drop for PinnedVec<T> {
fn drop(&mut self) {
if let Some(strong) = self.recycler_ref() {
let mut vec = PinnedVec::default();
std::mem::swap(&mut vec, self);
strong.recycle(vec);
}
if self.pinned {
unpin(self.x.as_mut_ptr());
}

View File

@ -1,11 +1,8 @@
//! The `packet` module defines data structures and methods to pull data from the network.
use crate::{
cuda_runtime::PinnedVec,
recycler::{Recycler, Reset},
};
use crate::{cuda_runtime::PinnedVec, recycler::Recycler};
use serde::Serialize;
pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE};
use std::{mem, net::SocketAddr};
use std::net::SocketAddr;
pub const NUM_PACKETS: usize = 1024 * 8;
@ -16,38 +13,13 @@ pub const PACKETS_BATCH_SIZE: usize = (PACKETS_PER_BATCH * PACKET_DATA_SIZE);
#[derive(Debug, Clone)]
pub struct Packets {
pub packets: PinnedVec<Packet>,
recycler: Option<PacketsRecycler>,
}
impl Drop for Packets {
fn drop(&mut self) {
if let Some(ref recycler) = self.recycler {
let old = mem::replace(&mut self.packets, PinnedVec::default());
recycler.recycle(old)
}
}
}
impl Reset for Packets {
fn reset(&mut self) {
self.packets.resize(0, Packet::default());
}
fn warm(&mut self, size_hint: usize) {
self.packets.set_pinnable();
self.packets.resize(size_hint, Packet::default());
}
}
//auto derive doesn't support large arrays
impl Default for Packets {
fn default() -> Packets {
let packets = PinnedVec::with_capacity(NUM_RCVMMSGS);
Packets {
packets,
recycler: None,
}
Packets { packets }
}
}
@ -56,19 +28,13 @@ pub type PacketsRecycler = Recycler<PinnedVec<Packet>>;
impl Packets {
pub fn new(packets: Vec<Packet>) -> Self {
let packets = PinnedVec::from_vec(packets);
Self {
packets,
recycler: None,
}
Self { packets }
}
pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self {
let mut packets = recycler.allocate(name);
packets.reserve_and_pin(size);
Packets {
packets,
recycler: Some(recycler),
}
Packets { packets }
}
pub fn new_with_recycler_data(
recycler: &PacketsRecycler,
@ -142,15 +108,6 @@ mod tests {
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction;
#[test]
fn test_packets_reset() {
let mut packets = Packets::default();
packets.packets.resize(10, Packet::default());
assert_eq!(packets.packets.len(), 10);
packets.reset();
assert_eq!(packets.packets.len(), 0);
}
#[test]
fn test_to_packets() {
let keypair = Keypair::new();

View File

@ -1,7 +1,7 @@
use rand::{thread_rng, Rng};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, Weak};
#[derive(Debug, Default)]
struct RecyclerStats {
@ -11,38 +11,36 @@ struct RecyclerStats {
max_gc: AtomicUsize,
}
#[derive(Debug)]
#[derive(Clone, Default)]
pub struct Recycler<T> {
gc: Arc<Mutex<Vec<T>>>,
stats: Arc<RecyclerStats>,
recycler: Arc<RecyclerX<T>>,
}
#[derive(Debug)]
pub struct RecyclerX<T> {
gc: Mutex<Vec<T>>,
stats: RecyclerStats,
id: usize,
}
impl<T: Default> Default for Recycler<T> {
fn default() -> Recycler<T> {
impl<T: Default> Default for RecyclerX<T> {
fn default() -> RecyclerX<T> {
let id = thread_rng().gen_range(0, 1000);
trace!("new recycler..{}", id);
Recycler {
gc: Arc::new(Mutex::new(vec![])),
stats: Arc::new(RecyclerStats::default()),
RecyclerX {
gc: Mutex::new(vec![]),
stats: RecyclerStats::default(),
id,
}
}
}
impl<T: Default> Clone for Recycler<T> {
fn clone(&self) -> Recycler<T> {
Recycler {
gc: self.gc.clone(),
stats: self.stats.clone(),
id: self.id,
}
}
}
pub trait Reset {
fn reset(&mut self);
fn warm(&mut self, size_hint: usize);
fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>)
where
Self: std::marker::Sized;
}
lazy_static! {
@ -57,7 +55,7 @@ fn warm_recyclers() -> bool {
WARM_RECYCLERS.load(Ordering::Relaxed)
}
impl<T: Default + Reset> Recycler<T> {
impl<T: Default + Reset + Sized> Recycler<T> {
pub fn warmed(num: usize, size_hint: usize) -> Self {
let new = Self::default();
if warm_recyclers() {
@ -68,37 +66,44 @@ impl<T: Default + Reset> Recycler<T> {
item
})
.collect();
warmed_items.into_iter().for_each(|i| new.recycle(i));
warmed_items
.into_iter()
.for_each(|i| new.recycler.recycle(i));
}
new
}
pub fn allocate(&self, name: &'static str) -> T {
let new = self
.recycler
.gc
.lock()
.expect("recycler lock in pb fn allocate")
.pop();
if let Some(mut x) = new {
self.stats.reuse.fetch_add(1, Ordering::Relaxed);
self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
x.reset();
return x;
}
let total = self.stats.total.fetch_add(1, Ordering::Relaxed);
let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed);
trace!(
"allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
total,
name,
self.id,
self.stats.reuse.load(Ordering::Relaxed),
self.stats.max_gc.load(Ordering::Relaxed),
self.recycler.id,
self.recycler.stats.reuse.load(Ordering::Relaxed),
self.recycler.stats.max_gc.load(Ordering::Relaxed),
);
T::default()
let mut t = T::default();
t.set_recycler(Arc::downgrade(&self.recycler));
t
}
}
impl<T: Default + Reset> RecyclerX<T> {
pub fn recycle(&self, x: T) {
let len = {
let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
@ -135,6 +140,7 @@ mod tests {
*self = 10;
}
fn warm(&mut self, _size_hint: usize) {}
fn set_recycler(&mut self, _recycler: Weak<RecyclerX<Self>>) {}
}
#[test]
@ -144,10 +150,10 @@ mod tests {
assert_eq!(y, 0);
y = 20;
let recycler2 = recycler.clone();
recycler2.recycle(y);
assert_eq!(recycler.gc.lock().unwrap().len(), 1);
recycler2.recycler.recycle(y);
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1);
let z = recycler.allocate("test_recycler2");
assert_eq!(z, 10);
assert_eq!(recycler.gc.lock().unwrap().len(), 0);
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
}
}

View File

@ -365,11 +365,6 @@ pub fn ed25519_verify(
trace!("done verify");
copy_return_values(&sig_lens, &out, &mut rvs);
inc_new_counter_debug!("ed25519_verify_gpu", count);
recycler_out.recycle(out);
recycler.recycle(signature_offsets);
recycler.recycle(pubkey_offsets);
recycler.recycle(msg_sizes);
recycler.recycle(msg_start_offsets);
rvs
}