From 4b8f881af3e14596c8c82ac06d2c5576b6e2a3ab Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Wed, 3 Aug 2022 13:44:01 -0500 Subject: [PATCH] in mem acct idx scan multiple ages simultaneously (#26853) --- runtime/src/in_mem_accounts_index.rs | 120 +++++++++++++++++++++++---- 1 file changed, 103 insertions(+), 17 deletions(-) diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index 89babf1dad..b252499267 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -30,6 +30,61 @@ type CacheRangesHeld = RwLock>>; type InMemMap = HashMap>; +#[derive(Debug)] +pub struct PossibleEvictions { + /// vec per age in the future, up to size 'ages_to_stay_in_cache' + possible_evictions: Vec>, + /// next index to use into 'possible_evictions' + /// if 'index' >= 'possible_evictions.len()', then there are no available entries + index: usize, +} + +impl PossibleEvictions { + fn new(max_ages: Age) -> Self { + Self { + possible_evictions: (0..max_ages).map(|_| FlushScanResult::default()).collect(), + index: max_ages as usize, // initially no data + } + } + + /// remove the possible evictions. This is required because we need ownership of the Arc strong counts to transfer to caller so entries can be removed from the accounts index + fn get_possible_evictions(&mut self) -> Option> { + self.possible_evictions.get_mut(self.index).map(|result| { + self.index += 1; + // remove the list from 'possible_evictions' + std::mem::take(result) + }) + } + + /// clear existing data and prepare to add 'entries' more ages of data + fn reset(&mut self, entries: Age) { + self.possible_evictions.iter_mut().for_each(|entry| { + entry.evictions_random.clear(); + entry.evictions_age_possible.clear(); + }); + let entries = entries as usize; + assert!( + entries <= self.possible_evictions.len(), + "entries: {}, len: {}", + entries, + self.possible_evictions.len() + ); + self.index = self.possible_evictions.len() - entries; + } + + /// insert 'entry' at 'relative_age' in the future into 'possible_evictions' + fn insert(&mut self, relative_age: Age, key: Pubkey, entry: AccountMapEntry, random: bool) { + let index = self.index + (relative_age as usize); + let list = &mut self.possible_evictions[index]; + if random { + &mut list.evictions_random + } else { + &mut list.evictions_age_possible + } + .push((key, entry)); + } +} + // one instance of this represents one bin of the accounts index. pub struct InMemAccountsIndex { last_age_flushed: AtomicU8, @@ -52,6 +107,12 @@ pub struct InMemAccountsIndex { /// info to streamline initial index generation startup_info: Mutex>, + + /// possible evictions for next few slots coming up + possible_evictions: RwLock>, + /// when age % ages_to_stay_in_cache == 'age_to_flush_bin_offset', then calculate the next 'ages_to_stay_in_cache' 'possible_evictions' + /// this causes us to scan the entire in-mem hash map every 1/'ages_to_stay_in_cache' instead of each age + age_to_flush_bin_mod: Age, } impl Debug for InMemAccountsIndex { @@ -74,6 +135,7 @@ struct StartupInfo { duplicates: Vec<(Slot, Pubkey)>, } +#[derive(Default, Debug)] /// result from scanning in-mem index during flush struct FlushScanResult { /// pubkeys whose age indicates they may be evicted now, pending further checks. @@ -84,6 +146,7 @@ struct FlushScanResult { impl InMemAccountsIndex { pub fn new(storage: &Arc>, bin: usize) -> Self { + let ages_to_stay_in_cache = storage.ages_to_stay_in_cache; Self { map_internal: RwLock::default(), storage: Arc::clone(storage), @@ -100,6 +163,23 @@ impl InMemAccountsIndex { // initialize this to max, to make it clear we have not flushed at age 0, the starting age last_age_flushed: AtomicU8::new(Age::MAX), startup_info: Mutex::default(), + possible_evictions: RwLock::new(PossibleEvictions::new(ages_to_stay_in_cache)), + // Spread out the scanning across all ages within the window. + // This causes us to scan 1/N of the bins each 'Age' + age_to_flush_bin_mod: thread_rng().gen_range(0, ages_to_stay_in_cache), + } + } + + /// # ages to scan ahead + fn ages_to_scan_ahead(&self, current_age: Age) -> Age { + let ages_to_stay_in_cache = self.storage.ages_to_stay_in_cache; + if (self.age_to_flush_bin_mod == current_age % ages_to_stay_in_cache) + && !self.storage.get_startup() + { + // scan ahead multiple ages + ages_to_stay_in_cache + } else { + 1 // just current age } } @@ -973,34 +1053,40 @@ impl InMemAccountsIndex { startup: bool, _flush_guard: &FlushGuard, ) -> FlushScanResult { + let mut possible_evictions = self.possible_evictions.write().unwrap(); + if let Some(result) = possible_evictions.get_possible_evictions() { + // we have previously calculated the possible evictions for this age + return result; + } + // otherwise, we need to scan some number of ages into the future now + let ages_to_scan = self.ages_to_scan_ahead(current_age); + possible_evictions.reset(ages_to_scan); + let m; - let mut evictions_random = Vec::default(); - let mut evictions_age_possible; { let map = self.map_internal.read().unwrap(); - evictions_age_possible = Vec::with_capacity(map.len()); m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait for (k, v) in map.iter() { let random = Self::random_chance_of_eviction(); - if !random && !Self::should_evict_based_on_age(current_age, v, startup) { - // not planning to evict this item from memory now, so don't write it to disk yet - continue; - } - - if random { - &mut evictions_random + let age_offset = if random { + thread_rng().gen_range(0, ages_to_scan) + } else if startup { + 0 } else { - &mut evictions_age_possible - } - .push((*k, Arc::clone(v))); + let ages_in_future = v.age().wrapping_sub(current_age); + if ages_in_future >= ages_to_scan { + // not planning to evict this item from memory within the next few ages + continue; + } + ages_in_future + }; + + possible_evictions.insert(age_offset, *k, Arc::clone(v), random); } } Self::update_time_stat(&self.stats().flush_scan_us, m); - FlushScanResult { - evictions_age_possible, - evictions_random, - } + possible_evictions.get_possible_evictions().unwrap() } fn write_startup_info_to_disk(&self) {