in mem acct idx scan multiple ages simultaneously (#26853)

This commit is contained in:
Jeff Washington (jwash) 2022-08-03 13:44:01 -05:00 committed by GitHub
parent 526001557d
commit 4b8f881af3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 103 additions and 17 deletions

View File

@ -30,6 +30,61 @@ type CacheRangesHeld = RwLock<Vec<RangeInclusive<Pubkey>>>;
type InMemMap<T> = HashMap<Pubkey, AccountMapEntry<T>>;
#[derive(Debug)]
pub struct PossibleEvictions<T: IndexValue> {
/// vec per age in the future, up to size 'ages_to_stay_in_cache'
possible_evictions: Vec<FlushScanResult<T>>,
/// next index to use into 'possible_evictions'
/// if 'index' >= 'possible_evictions.len()', then there are no available entries
index: usize,
}
impl<T: IndexValue> PossibleEvictions<T> {
fn new(max_ages: Age) -> Self {
Self {
possible_evictions: (0..max_ages).map(|_| FlushScanResult::default()).collect(),
index: max_ages as usize, // initially no data
}
}
/// remove the possible evictions. This is required because we need ownership of the Arc strong counts to transfer to caller so entries can be removed from the accounts index
fn get_possible_evictions(&mut self) -> Option<FlushScanResult<T>> {
self.possible_evictions.get_mut(self.index).map(|result| {
self.index += 1;
// remove the list from 'possible_evictions'
std::mem::take(result)
})
}
/// clear existing data and prepare to add 'entries' more ages of data
fn reset(&mut self, entries: Age) {
self.possible_evictions.iter_mut().for_each(|entry| {
entry.evictions_random.clear();
entry.evictions_age_possible.clear();
});
let entries = entries as usize;
assert!(
entries <= self.possible_evictions.len(),
"entries: {}, len: {}",
entries,
self.possible_evictions.len()
);
self.index = self.possible_evictions.len() - entries;
}
/// insert 'entry' at 'relative_age' in the future into 'possible_evictions'
fn insert(&mut self, relative_age: Age, key: Pubkey, entry: AccountMapEntry<T>, random: bool) {
let index = self.index + (relative_age as usize);
let list = &mut self.possible_evictions[index];
if random {
&mut list.evictions_random
} else {
&mut list.evictions_age_possible
}
.push((key, entry));
}
}
// one instance of this represents one bin of the accounts index.
pub struct InMemAccountsIndex<T: IndexValue> {
last_age_flushed: AtomicU8,
@ -52,6 +107,12 @@ pub struct InMemAccountsIndex<T: IndexValue> {
/// info to streamline initial index generation
startup_info: Mutex<StartupInfo<T>>,
/// possible evictions for next few slots coming up
possible_evictions: RwLock<PossibleEvictions<T>>,
/// when age % ages_to_stay_in_cache == 'age_to_flush_bin_offset', then calculate the next 'ages_to_stay_in_cache' 'possible_evictions'
/// this causes us to scan the entire in-mem hash map every 1/'ages_to_stay_in_cache' instead of each age
age_to_flush_bin_mod: Age,
}
impl<T: IndexValue> Debug for InMemAccountsIndex<T> {
@ -74,6 +135,7 @@ struct StartupInfo<T: IndexValue> {
duplicates: Vec<(Slot, Pubkey)>,
}
#[derive(Default, Debug)]
/// result from scanning in-mem index during flush
struct FlushScanResult<T> {
/// pubkeys whose age indicates they may be evicted now, pending further checks.
@ -84,6 +146,7 @@ struct FlushScanResult<T> {
impl<T: IndexValue> InMemAccountsIndex<T> {
pub fn new(storage: &Arc<BucketMapHolder<T>>, bin: usize) -> Self {
let ages_to_stay_in_cache = storage.ages_to_stay_in_cache;
Self {
map_internal: RwLock::default(),
storage: Arc::clone(storage),
@ -100,6 +163,23 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
// initialize this to max, to make it clear we have not flushed at age 0, the starting age
last_age_flushed: AtomicU8::new(Age::MAX),
startup_info: Mutex::default(),
possible_evictions: RwLock::new(PossibleEvictions::new(ages_to_stay_in_cache)),
// Spread out the scanning across all ages within the window.
// This causes us to scan 1/N of the bins each 'Age'
age_to_flush_bin_mod: thread_rng().gen_range(0, ages_to_stay_in_cache),
}
}
/// # ages to scan ahead
fn ages_to_scan_ahead(&self, current_age: Age) -> Age {
let ages_to_stay_in_cache = self.storage.ages_to_stay_in_cache;
if (self.age_to_flush_bin_mod == current_age % ages_to_stay_in_cache)
&& !self.storage.get_startup()
{
// scan ahead multiple ages
ages_to_stay_in_cache
} else {
1 // just current age
}
}
@ -973,34 +1053,40 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
startup: bool,
_flush_guard: &FlushGuard,
) -> FlushScanResult<T> {
let mut possible_evictions = self.possible_evictions.write().unwrap();
if let Some(result) = possible_evictions.get_possible_evictions() {
// we have previously calculated the possible evictions for this age
return result;
}
// otherwise, we need to scan some number of ages into the future now
let ages_to_scan = self.ages_to_scan_ahead(current_age);
possible_evictions.reset(ages_to_scan);
let m;
let mut evictions_random = Vec::default();
let mut evictions_age_possible;
{
let map = self.map_internal.read().unwrap();
evictions_age_possible = Vec::with_capacity(map.len());
m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
for (k, v) in map.iter() {
let random = Self::random_chance_of_eviction();
if !random && !Self::should_evict_based_on_age(current_age, v, startup) {
// not planning to evict this item from memory now, so don't write it to disk yet
continue;
}
if random {
&mut evictions_random
let age_offset = if random {
thread_rng().gen_range(0, ages_to_scan)
} else if startup {
0
} else {
&mut evictions_age_possible
}
.push((*k, Arc::clone(v)));
let ages_in_future = v.age().wrapping_sub(current_age);
if ages_in_future >= ages_to_scan {
// not planning to evict this item from memory within the next few ages
continue;
}
ages_in_future
};
possible_evictions.insert(age_offset, *k, Arc::clone(v), random);
}
}
Self::update_time_stat(&self.stats().flush_scan_us, m);
FlushScanResult {
evictions_age_possible,
evictions_random,
}
possible_evictions.get_possible_evictions().unwrap()
}
fn write_startup_info_to_disk(&self) {