From 4dc30ea1047ec35255cb39eec891856c5a4eaba1 Mon Sep 17 00:00:00 2001 From: sakridge Date: Thu, 13 Sep 2018 14:49:48 -0700 Subject: [PATCH] Add recycler stats (#1187) --- src/fullnode.rs | 3 ++- src/packet.rs | 53 +++++++++++++++++++++++++++++++++++++++++++++++-- src/rpu.rs | 3 ++- src/tpu.rs | 3 ++- 4 files changed, 57 insertions(+), 5 deletions(-) diff --git a/src/fullnode.rs b/src/fullnode.rs index 5c94c6d73a..19759ab676 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -220,7 +220,8 @@ impl Fullnode { } let exit = Arc::new(AtomicBool::new(false)); let bank = Arc::new(bank); - let blob_recycler = BlobRecycler::default(); + let mut blob_recycler = BlobRecycler::default(); + blob_recycler.set_name("fullnode::Blob"); let rpu = Rpu::new( &bank, diff --git a/src/packet.rs b/src/packet.rs index 626ba6a9fb..f438dbfa9c 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -11,7 +11,7 @@ use std::fmt; use std::io; use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; -use std::sync::atomic::AtomicUsize; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock}; pub type SharedPackets = Arc>; @@ -181,12 +181,22 @@ pub enum BlobError { pub struct Recycler { #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] gc: Arc>, &'static str)>>>, + allocated_count: Arc, + recycled_count: Arc, + reuse_count: Arc, + skipped_count: Arc, + name: String, } impl Default for Recycler { fn default() -> Recycler { Recycler { gc: Arc::new(Mutex::new(vec![])), + allocated_count: Arc::new(AtomicUsize::new(0)), + recycled_count: Arc::new(AtomicUsize::new(0)), + reuse_count: Arc::new(AtomicUsize::new(0)), + skipped_count: Arc::new(AtomicUsize::new(0)), + name: format!("? sz: {}", size_of::()).to_string(), } } } @@ -195,13 +205,27 @@ impl Clone for Recycler { fn clone(&self) -> Recycler { Recycler { gc: self.gc.clone(), + allocated_count: self.allocated_count.clone(), + recycled_count: self.recycled_count.clone(), + reuse_count: self.reuse_count.clone(), + skipped_count: self.skipped_count.clone(), + name: self.name.clone(), } } } +fn inc_counter(x: &AtomicUsize) { + x.fetch_add(1, Ordering::Relaxed); +} + impl Recycler { + pub fn set_name(&mut self, name: &'static str) { + self.name = name.to_string(); + } + pub fn allocate(&self) -> Arc> { let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate"); + let gc_count = gc.len(); loop { if let Some((x, who)) = gc.pop() { @@ -218,10 +242,12 @@ impl Recycler { // // warn!("Recycled item still in use. Booting it."); trace!( - "Recycled item from \"{}\" still in use. {} Booting it.", + "{} Recycled item from \"{}\" still in use. {} Booting it.", + self.name, who, Arc::strong_count(&x) ); + inc_counter(&self.skipped_count); continue; } @@ -229,14 +255,37 @@ impl Recycler { let mut w = x.write().unwrap(); w.reset(); } + inc_counter(&self.reuse_count); return x; } else { + inc_counter(&self.allocated_count); + if self.allocated_count.load(Ordering::Relaxed) % 2048 == 0 { + self.print_stats(gc_count); + } return Arc::new(RwLock::new(Default::default())); } } } + + fn print_stats(&self, gc_count: usize) { + info!( + "{} recycler stats: allocated: {} reused: {} skipped: {} recycled: {} gc_count: {}", + self.name, + self.allocated_count.load(Ordering::Relaxed), + self.reuse_count.load(Ordering::Relaxed), + self.skipped_count.load(Ordering::Relaxed), + self.recycled_count.load(Ordering::Relaxed), + gc_count + ); + } + pub fn recycle(&self, x: Arc>, who: &'static str) { let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); + inc_counter(&self.recycled_count); + if self.recycled_count.load(Ordering::Relaxed) % 2048 == 0 { + self.print_stats(0); + } + gc.push((x, who)); } } diff --git a/src/rpu.rs b/src/rpu.rs index d3cc33621c..7a634fb619 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -48,7 +48,8 @@ impl Rpu { blob_recycler: &BlobRecycler, exit: Arc, ) -> Self { - let packet_recycler = PacketRecycler::default(); + let mut packet_recycler = PacketRecycler::default(); + packet_recycler.set_name("rpu::Packet"); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( Arc::new(requests_socket), diff --git a/src/tpu.rs b/src/tpu.rs index 7b0c98866a..7ae5221806 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -62,7 +62,8 @@ impl Tpu { ledger_path: &str, sigverify_disabled: bool, ) -> (Self, BlobReceiver) { - let packet_recycler = PacketRecycler::default(); + let mut packet_recycler = PacketRecycler::default(); + packet_recycler.set_name("tpu::Packet"); let (fetch_stage, packet_receiver) = FetchStage::new(transactions_sockets, exit, &packet_recycler);