From cfe7a4340b9ebce91c8bd80bf37a6782fe738975 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 7 Apr 2021 11:16:03 -0400 Subject: [PATCH] adds a shrink policy to the recycler without an allocation limit https://github.com/solana-labs/solana/pull/15320 added an allocation limit to the recycler, which has been the source of a number of bugs. For example the code bellow panics by simply cloning packets: const RECYCLER_LIMIT: usize = 8; let recycler = PacketsRecycler::new_with_limit("", RECYCLER_LIMIT as u32); let packets = Packets::new_with_recycler(recycler.clone(), 1).unwrap(); for _ in 0..RECYCLER_LIMIT { let _ = packets.clone(); } Packets::new_with_recycler(recycler.clone(), 1); The implementation also fails to account for instances where objects are consumed. Having the allocation limit in the recycler also seems out of place, as higher level code has better context to impose allocation limits (e.g. by using bounded channels to rate-limit), whereas the recycler would be simpler and more efficient if it just do the recycling. This commit: * Reverts https://github.com/solana-labs/solana/pull/15320 * Adds a shrink policy to the recycler without an allocation limit. --- perf/src/recycler.rs | 95 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 82 insertions(+), 13 deletions(-) diff --git a/perf/src/recycler.rs b/perf/src/recycler.rs index 1d94d8e4e..8885d8c30 100644 --- a/perf/src/recycler.rs +++ b/perf/src/recycler.rs @@ -3,6 +3,18 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; +// A temporary burst in the workload can cause a large number of allocations, +// after which they will be recycled and still reside in memory. If the number +// of recycled objects stays above below limit for long, they will be deemed as +// redundant since they are not getting reused. The recycler will then shrink +// by releasing objects above this threshold. This limit aims to maintain a +// cushion against *normal* variations in the workload while bounding the +// number of redundant garbage collected objects after temporary bursts. +const RECYCLER_SHRINK_SIZE: usize = 1024; +// Lookback window for averaging number of garbage collected objects in terms +// of number of allocations. +const RECYCLER_SHRINK_WINDOW: usize = 16384; + #[derive(Debug, Default)] struct RecyclerStats { total: AtomicUsize, @@ -21,6 +33,8 @@ pub struct RecyclerX { gc: Mutex>, stats: RecyclerStats, id: usize, + // Shrink window times the exponential moving average size of gc.len(). + size_factor: AtomicUsize, } impl Default for RecyclerX { @@ -28,9 +42,10 @@ impl Default for RecyclerX { let id = thread_rng().gen_range(0, 1000); trace!("new recycler..{}", id); RecyclerX { - gc: Mutex::new(vec![]), + gc: Mutex::default(), stats: RecyclerStats::default(), id, + size_factor: AtomicUsize::default(), } } } @@ -74,19 +89,37 @@ impl Recycler { } pub fn allocate(&self, name: &'static str) -> T { - let new = self - .recycler - .gc - .lock() - .expect("recycler lock in pb fn allocate") - .pop(); - - if let Some(mut x) = new { - self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed); - x.reset(); - return x; + { + const RECYCLER_SHRINK_WINDOW_HALF: usize = RECYCLER_SHRINK_WINDOW / 2; + const RECYCLER_SHRINK_WINDOW_SUB_ONE: usize = RECYCLER_SHRINK_WINDOW - 1; + let mut gc = self.recycler.gc.lock().unwrap(); + // Update the exponential moving average of gc.len(). + // The update equation is: + // a <- a * (n - 1) / n + x / n + // To avoid floating point math, define b = n a: + // b <- b * (n - 1) / n + x + // To make the remaining division to round (instead of truncate), + // add n/2 to the numerator. + // Effectively b (size_factor here) is an exponential moving + // estimate of the "sum" of x (gc.len()) over the window as opposed + // to the "average". + self.recycler.size_factor.store( + self.recycler + .size_factor + .load(Ordering::Acquire) + .saturating_mul(RECYCLER_SHRINK_WINDOW_SUB_ONE) + .saturating_add(RECYCLER_SHRINK_WINDOW_HALF) + .checked_div(RECYCLER_SHRINK_WINDOW) + .unwrap() + .saturating_add(gc.len()), + Ordering::Release, + ); + if let Some(mut x) = gc.pop() { + self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed); + x.reset(); + return x; + } } - let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed); trace!( "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}", @@ -108,6 +141,16 @@ impl RecyclerX { let len = { let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle"); gc.push(x); + const SIZE_FACTOR_AFTER_SHRINK: usize = RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW; + if gc.len() > RECYCLER_SHRINK_SIZE + && self.size_factor.load(Ordering::Acquire) >= SIZE_FACTOR_AFTER_SHRINK + { + for mut x in gc.drain(RECYCLER_SHRINK_SIZE..) { + x.set_recycler(Weak::default()); + } + self.size_factor + .store(SIZE_FACTOR_AFTER_SHRINK, Ordering::Release); + } gc.len() }; @@ -137,6 +180,8 @@ impl RecyclerX { #[cfg(test)] mod tests { use super::*; + use crate::packet::PacketsRecycler; + use std::iter::repeat_with; impl Reset for u64 { fn reset(&mut self) { @@ -159,4 +204,28 @@ mod tests { assert_eq!(z, 10); assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0); } + + #[test] + fn test_recycler_shrink() { + let mut rng = rand::thread_rng(); + let recycler = PacketsRecycler::default(); + // Allocate a burst of packets. + const NUM_PACKETS: usize = RECYCLER_SHRINK_SIZE * 2; + { + let _packets: Vec<_> = repeat_with(|| recycler.allocate("")) + .take(NUM_PACKETS) + .collect(); + } + assert_eq!(recycler.recycler.gc.lock().unwrap().len(), NUM_PACKETS); + // Process a normal load of packets for a while. + for _ in 0..RECYCLER_SHRINK_WINDOW / 16 { + let count = rng.gen_range(1, 128); + let _packets: Vec<_> = repeat_with(|| recycler.allocate("")).take(count).collect(); + } + // Assert that the gc size has shrinked. + assert_eq!( + recycler.recycler.gc.lock().unwrap().len(), + RECYCLER_SHRINK_SIZE + ); + } }