diff --git a/perf/src/recycler.rs b/perf/src/recycler.rs
index 1d94d8e4e..8885d8c30 100644
--- a/perf/src/recycler.rs
+++ b/perf/src/recycler.rs
@@ -3,6 +3,18 @@ use std::sync::atomic::AtomicBool;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex, Weak};
 
+// A temporary burst in the workload can cause a large number of allocations,
+// after which they will be recycled and still reside in memory. If the number
+// of recycled objects stays above the limit below for long, they are deemed
+// redundant since they are not getting reused. The recycler will then shrink
+// by releasing objects above this threshold. This limit aims to maintain a
+// cushion against *normal* variations in the workload while bounding the
+// number of redundant garbage-collected objects after temporary bursts.
+const RECYCLER_SHRINK_SIZE: usize = 1024;
+// Lookback window, in number of allocations, over which the number of
+// garbage-collected objects is averaged.
+const RECYCLER_SHRINK_WINDOW: usize = 16384;
+
 #[derive(Debug, Default)]
 struct RecyclerStats {
     total: AtomicUsize,
@@ -21,6 +33,8 @@ pub struct RecyclerX<T> {
     gc: Mutex<Vec<T>>,
     stats: RecyclerStats,
     id: usize,
+    // Shrink window times the exponential moving average size of gc.len().
+    size_factor: AtomicUsize,
 }
 
 impl<T: Default> Default for RecyclerX<T> {
@@ -28,9 +42,10 @@ impl<T: Default> Default for RecyclerX<T> {
         let id = thread_rng().gen_range(0, 1000);
         trace!("new recycler..{}", id);
         RecyclerX {
-            gc: Mutex::new(vec![]),
+            gc: Mutex::default(),
             stats: RecyclerStats::default(),
             id,
+            size_factor: AtomicUsize::default(),
         }
     }
 }
@@ -74,19 +89,37 @@ impl<T: Default + Reset + Sized> Recycler<T> {
     }
 
     pub fn allocate(&self, name: &'static str) -> T {
-        let new = self
-            .recycler
-            .gc
-            .lock()
-            .expect("recycler lock in pb fn allocate")
-            .pop();
-
-        if let Some(mut x) = new {
-            self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
-            x.reset();
-            return x;
+        {
+            const RECYCLER_SHRINK_WINDOW_HALF: usize = RECYCLER_SHRINK_WINDOW / 2;
+            const RECYCLER_SHRINK_WINDOW_SUB_ONE: usize = RECYCLER_SHRINK_WINDOW - 1;
+            let mut gc = self.recycler.gc.lock().unwrap();
+            // Update the exponential moving average of gc.len().
+            // The update equation is:
+            //     a <- a * (n - 1) / n + x / n
+            // To avoid floating point math, define b = n * a:
+            //     b <- b * (n - 1) / n + x
+            // To make the remaining division round (instead of truncate),
+            // add n/2 to the numerator.
+            // Effectively b (size_factor here) is an exponential moving
+            // estimate of the "sum" of x (gc.len()) over the window as opposed
+            // to the "average".
+            self.recycler.size_factor.store(
+                self.recycler
+                    .size_factor
+                    .load(Ordering::Acquire)
+                    .saturating_mul(RECYCLER_SHRINK_WINDOW_SUB_ONE)
+                    .saturating_add(RECYCLER_SHRINK_WINDOW_HALF)
+                    .checked_div(RECYCLER_SHRINK_WINDOW)
+                    .unwrap()
+                    .saturating_add(gc.len()),
+                Ordering::Release,
+            );
+            if let Some(mut x) = gc.pop() {
+                self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
+                x.reset();
+                return x;
+            }
         }
-
         let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed);
         trace!(
             "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
@@ -108,6 +141,16 @@ impl<T: Default + Reset> RecyclerX<T> {
         let len = {
             let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
             gc.push(x);
+            const SIZE_FACTOR_AFTER_SHRINK: usize = RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW;
+            if gc.len() > RECYCLER_SHRINK_SIZE
+                && self.size_factor.load(Ordering::Acquire) >= SIZE_FACTOR_AFTER_SHRINK
+            {
+                for mut x in gc.drain(RECYCLER_SHRINK_SIZE..) {
+                    x.set_recycler(Weak::default());
+                }
+                self.size_factor
+                    .store(SIZE_FACTOR_AFTER_SHRINK, Ordering::Release);
+            }
             gc.len()
         };
 
@@ -137,6 +180,8 @@ impl<T: Default + Reset> RecyclerX<T> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::packet::PacketsRecycler;
+    use std::iter::repeat_with;
 
     impl Reset for u64 {
         fn reset(&mut self) {
@@ -159,4 +204,28 @@ mod tests {
         assert_eq!(z, 10);
         assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
     }
+
+    #[test]
+    fn test_recycler_shrink() {
+        let mut rng = rand::thread_rng();
+        let recycler = PacketsRecycler::default();
+        // Allocate a burst of packets.
+        const NUM_PACKETS: usize = RECYCLER_SHRINK_SIZE * 2;
+        {
+            let _packets: Vec<_> = repeat_with(|| recycler.allocate(""))
+                .take(NUM_PACKETS)
+                .collect();
+        }
+        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), NUM_PACKETS);
+        // Process a normal load of packets for a while.
+        for _ in 0..RECYCLER_SHRINK_WINDOW / 16 {
+            let count = rng.gen_range(1, 128);
+            let _packets: Vec<_> = repeat_with(|| recycler.allocate("")).take(count).collect();
+        }
+        // Assert that the gc size has shrunk.
+        assert_eq!(
+            recycler.recycler.gc.lock().unwrap().len(),
+            RECYCLER_SHRINK_SIZE
+        );
+    }
 }
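Note (not part of the patch): the sketch below is a minimal, standalone model of the integer exponential-moving-average bookkeeping that allocate() performs on size_factor. It assumes the same constants as the patch, a 64-bit usize, and an illustrative helper (update_size_factor) and gc size (2000) that are not part of the crate. The point is only that size_factor tracks RECYCLER_SHRINK_WINDOW times the smoothed gc.len(), which is why recycle() can compare it against RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW using integer math only.

const RECYCLER_SHRINK_SIZE: usize = 1024;
const RECYCLER_SHRINK_WINDOW: usize = 16384;

// Same update as in allocate(): b <- b * (n - 1) / n + x, with n/2 added to
// the numerator so the division rounds instead of truncating.
fn update_size_factor(size_factor: usize, gc_len: usize) -> usize {
    size_factor
        .saturating_mul(RECYCLER_SHRINK_WINDOW - 1)
        .saturating_add(RECYCLER_SHRINK_WINDOW / 2)
        .checked_div(RECYCLER_SHRINK_WINDOW)
        .unwrap()
        .saturating_add(gc_len)
}

fn main() {
    // Pretend a burst left ~2000 recycled objects sitting in gc on every
    // subsequent allocation (i.e. they are not being reused).
    let mut size_factor = 0usize;
    for _ in 0..4 * RECYCLER_SHRINK_WINDOW {
        size_factor = update_size_factor(size_factor, 2000);
    }
    // size_factor approaches 2000 * RECYCLER_SHRINK_WINDOW, so the smoothed
    // gc size sits above RECYCLER_SHRINK_SIZE and the shrink condition in
    // recycle() would fire.
    assert!(size_factor >= RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW);
    println!("smoothed gc len ~ {}", size_factor / RECYCLER_SHRINK_WINDOW);
}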