adds a shrink policy to the recycler without an allocation limit

https://github.com/solana-labs/solana/pull/15320
added an allocation limit to the recycler, which has been the source of a
number of bugs. For example the code bellow panics by simply cloning packets:

    const RECYCLER_LIMIT: usize = 8;
    let recycler = PacketsRecycler::new_with_limit("", RECYCLER_LIMIT as u32);
    let packets = Packets::new_with_recycler(recycler.clone(), 1).unwrap();
    for _ in 0..RECYCLER_LIMIT {
        let _ = packets.clone();
    }
    Packets::new_with_recycler(recycler.clone(), 1);

The implementation also fails to account for instances where objects are
consumed. Having the allocation limit in the recycler also seems out of place,
as higher level code has better context to impose allocation limits (e.g. by
using bounded channels to rate-limit), whereas the recycler would be simpler
and more efficient if it just do the recycling.

This commit:
* Reverts https://github.com/solana-labs/solana/pull/15320
* Adds a shrink policy to the recycler without an allocation limit.
This commit is contained in:
behzad nouri 2021-04-07 11:16:03 -04:00
parent e405747409
commit cfe7a4340b
1 changed files with 82 additions and 13 deletions

View File

@ -3,6 +3,18 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
// A temporary burst in the workload can cause a large number of allocations,
// after which they will be recycled and still reside in memory. If the number
// of recycled objects stays above below limit for long, they will be deemed as
// redundant since they are not getting reused. The recycler will then shrink
// by releasing objects above this threshold. This limit aims to maintain a
// cushion against *normal* variations in the workload while bounding the
// number of redundant garbage collected objects after temporary bursts.
const RECYCLER_SHRINK_SIZE: usize = 1024;
// Lookback window for averaging number of garbage collected objects in terms
// of number of allocations.
const RECYCLER_SHRINK_WINDOW: usize = 16384;
#[derive(Debug, Default)]
struct RecyclerStats {
total: AtomicUsize,
@ -21,6 +33,8 @@ pub struct RecyclerX<T> {
gc: Mutex<Vec<T>>,
stats: RecyclerStats,
id: usize,
// Shrink window times the exponential moving average size of gc.len().
size_factor: AtomicUsize,
}
impl<T: Default> Default for RecyclerX<T> {
@ -28,9 +42,10 @@ impl<T: Default> Default for RecyclerX<T> {
let id = thread_rng().gen_range(0, 1000);
trace!("new recycler..{}", id);
RecyclerX {
gc: Mutex::new(vec![]),
gc: Mutex::default(),
stats: RecyclerStats::default(),
id,
size_factor: AtomicUsize::default(),
}
}
}
@ -74,19 +89,37 @@ impl<T: Default + Reset + Sized> Recycler<T> {
}
pub fn allocate(&self, name: &'static str) -> T {
let new = self
.recycler
.gc
.lock()
.expect("recycler lock in pb fn allocate")
.pop();
if let Some(mut x) = new {
self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
x.reset();
return x;
{
const RECYCLER_SHRINK_WINDOW_HALF: usize = RECYCLER_SHRINK_WINDOW / 2;
const RECYCLER_SHRINK_WINDOW_SUB_ONE: usize = RECYCLER_SHRINK_WINDOW - 1;
let mut gc = self.recycler.gc.lock().unwrap();
// Update the exponential moving average of gc.len().
// The update equation is:
// a <- a * (n - 1) / n + x / n
// To avoid floating point math, define b = n a:
// b <- b * (n - 1) / n + x
// To make the remaining division to round (instead of truncate),
// add n/2 to the numerator.
// Effectively b (size_factor here) is an exponential moving
// estimate of the "sum" of x (gc.len()) over the window as opposed
// to the "average".
self.recycler.size_factor.store(
self.recycler
.size_factor
.load(Ordering::Acquire)
.saturating_mul(RECYCLER_SHRINK_WINDOW_SUB_ONE)
.saturating_add(RECYCLER_SHRINK_WINDOW_HALF)
.checked_div(RECYCLER_SHRINK_WINDOW)
.unwrap()
.saturating_add(gc.len()),
Ordering::Release,
);
if let Some(mut x) = gc.pop() {
self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
x.reset();
return x;
}
}
let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed);
trace!(
"allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
@ -108,6 +141,16 @@ impl<T: Default + Reset> RecyclerX<T> {
let len = {
let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
gc.push(x);
const SIZE_FACTOR_AFTER_SHRINK: usize = RECYCLER_SHRINK_SIZE * RECYCLER_SHRINK_WINDOW;
if gc.len() > RECYCLER_SHRINK_SIZE
&& self.size_factor.load(Ordering::Acquire) >= SIZE_FACTOR_AFTER_SHRINK
{
for mut x in gc.drain(RECYCLER_SHRINK_SIZE..) {
x.set_recycler(Weak::default());
}
self.size_factor
.store(SIZE_FACTOR_AFTER_SHRINK, Ordering::Release);
}
gc.len()
};
@ -137,6 +180,8 @@ impl<T: Default + Reset> RecyclerX<T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::packet::PacketsRecycler;
use std::iter::repeat_with;
impl Reset for u64 {
fn reset(&mut self) {
@ -159,4 +204,28 @@ mod tests {
assert_eq!(z, 10);
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
}
#[test]
fn test_recycler_shrink() {
let mut rng = rand::thread_rng();
let recycler = PacketsRecycler::default();
// Allocate a burst of packets.
const NUM_PACKETS: usize = RECYCLER_SHRINK_SIZE * 2;
{
let _packets: Vec<_> = repeat_with(|| recycler.allocate(""))
.take(NUM_PACKETS)
.collect();
}
assert_eq!(recycler.recycler.gc.lock().unwrap().len(), NUM_PACKETS);
// Process a normal load of packets for a while.
for _ in 0..RECYCLER_SHRINK_WINDOW / 16 {
let count = rng.gen_range(1, 128);
let _packets: Vec<_> = repeat_with(|| recycler.allocate("")).take(count).collect();
}
// Assert that the gc size has shrinked.
assert_eq!(
recycler.recycler.gc.lock().unwrap().len(),
RECYCLER_SHRINK_SIZE
);
}
}