Add recycler stats (#1187)

This commit is contained in:
sakridge 2018-09-13 14:49:48 -07:00 committed by GitHub
parent 90df6237c6
commit 4dc30ea104
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 57 additions and 5 deletions

View File

@ -220,7 +220,8 @@ impl Fullnode {
}
let exit = Arc::new(AtomicBool::new(false));
let bank = Arc::new(bank);
let blob_recycler = BlobRecycler::default();
let mut blob_recycler = BlobRecycler::default();
blob_recycler.set_name("fullnode::Blob");
let rpu = Rpu::new(
&bank,

View File

@ -11,7 +11,7 @@ use std::fmt;
use std::io;
use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
pub type SharedPackets = Arc<RwLock<Packets>>;
@ -181,12 +181,22 @@ pub enum BlobError {
pub struct Recycler<T> {
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
gc: Arc<Mutex<Vec<(Arc<RwLock<T>>, &'static str)>>>,
allocated_count: Arc<AtomicUsize>,
recycled_count: Arc<AtomicUsize>,
reuse_count: Arc<AtomicUsize>,
skipped_count: Arc<AtomicUsize>,
name: String,
}
impl<T: Default> Default for Recycler<T> {
fn default() -> Recycler<T> {
Recycler {
gc: Arc::new(Mutex::new(vec![])),
allocated_count: Arc::new(AtomicUsize::new(0)),
recycled_count: Arc::new(AtomicUsize::new(0)),
reuse_count: Arc::new(AtomicUsize::new(0)),
skipped_count: Arc::new(AtomicUsize::new(0)),
name: format!("? sz: {}", size_of::<T>()).to_string(),
}
}
}
@ -195,13 +205,27 @@ impl<T: Default> Clone for Recycler<T> {
fn clone(&self) -> Recycler<T> {
Recycler {
gc: self.gc.clone(),
allocated_count: self.allocated_count.clone(),
recycled_count: self.recycled_count.clone(),
reuse_count: self.reuse_count.clone(),
skipped_count: self.skipped_count.clone(),
name: self.name.clone(),
}
}
}
fn inc_counter(x: &AtomicUsize) {
x.fetch_add(1, Ordering::Relaxed);
}
impl<T: Default + Reset> Recycler<T> {
pub fn set_name(&mut self, name: &'static str) {
self.name = name.to_string();
}
pub fn allocate(&self) -> Arc<RwLock<T>> {
let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate");
let gc_count = gc.len();
loop {
if let Some((x, who)) = gc.pop() {
@ -218,10 +242,12 @@ impl<T: Default + Reset> Recycler<T> {
//
// warn!("Recycled item still in use. Booting it.");
trace!(
"Recycled item from \"{}\" still in use. {} Booting it.",
"{} Recycled item from \"{}\" still in use. {} Booting it.",
self.name,
who,
Arc::strong_count(&x)
);
inc_counter(&self.skipped_count);
continue;
}
@ -229,14 +255,37 @@ impl<T: Default + Reset> Recycler<T> {
let mut w = x.write().unwrap();
w.reset();
}
inc_counter(&self.reuse_count);
return x;
} else {
inc_counter(&self.allocated_count);
if self.allocated_count.load(Ordering::Relaxed) % 2048 == 0 {
self.print_stats(gc_count);
}
return Arc::new(RwLock::new(Default::default()));
}
}
}
fn print_stats(&self, gc_count: usize) {
info!(
"{} recycler stats: allocated: {} reused: {} skipped: {} recycled: {} gc_count: {}",
self.name,
self.allocated_count.load(Ordering::Relaxed),
self.reuse_count.load(Ordering::Relaxed),
self.skipped_count.load(Ordering::Relaxed),
self.recycled_count.load(Ordering::Relaxed),
gc_count
);
}
pub fn recycle(&self, x: Arc<RwLock<T>>, who: &'static str) {
let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
inc_counter(&self.recycled_count);
if self.recycled_count.load(Ordering::Relaxed) % 2048 == 0 {
self.print_stats(0);
}
gc.push((x, who));
}
}

View File

@ -48,7 +48,8 @@ impl Rpu {
blob_recycler: &BlobRecycler,
exit: Arc<AtomicBool>,
) -> Self {
let packet_recycler = PacketRecycler::default();
let mut packet_recycler = PacketRecycler::default();
packet_recycler.set_name("rpu::Packet");
let (packet_sender, packet_receiver) = channel();
let t_receiver = streamer::receiver(
Arc::new(requests_socket),

View File

@ -62,7 +62,8 @@ impl Tpu {
ledger_path: &str,
sigverify_disabled: bool,
) -> (Self, BlobReceiver) {
let packet_recycler = PacketRecycler::default();
let mut packet_recycler = PacketRecycler::default();
packet_recycler.set_name("tpu::Packet");
let (fetch_stage, packet_receiver) =
FetchStage::new(transactions_sockets, exit, &packet_recycler);