diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index 72fe753c70..88119b3a92 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -64,10 +64,12 @@ pub enum InsertNewEntryResults { ExistedNewEntryNonZeroLamports, } +/// result from scanning in-mem index during flush struct FlushScanResult { - evictions: Vec, - evictions_random: Vec, - dirty_items: Vec<(Pubkey, AccountMapEntry)>, + /// pubkeys whose age indicates they may be evicted now, pending further checks. + evictions_age_possible: Vec<(Pubkey, Option>)>, + /// pubkeys chosen to evict based on random eviction + evictions_random: Vec<(Pubkey, Option>)>, } #[allow(dead_code)] // temporary during staging @@ -942,50 +944,33 @@ impl InMemAccountsIndex { startup: bool, _flush_guard: &FlushGuard, ) -> FlushScanResult { - let exceeds_budget = self.get_exceeds_budget(); - let map = self.map().read().unwrap(); - let mut evictions = Vec::with_capacity(map.len()); + let m; let mut evictions_random = Vec::default(); - let mut dirty_items = Vec::with_capacity(map.len()); - let mut flush_should_evict_us = 0; - let m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait - for (k, v) in map.iter() { - let mut mse = Measure::start("flush_should_evict"); - let (evict_for_age, slot_list) = - self.should_evict_from_mem(current_age, v, startup, true, exceeds_budget); - mse.stop(); - flush_should_evict_us += mse.as_us(); - if !evict_for_age && !Self::random_chance_of_eviction() { - // not planning to evict this item from memory now, so don't write it to disk yet - continue; - } + let mut evictions_age_possible; + { + let map = self.map().read().unwrap(); + evictions_age_possible = Vec::with_capacity(map.len()); + m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait + for (k, v) in map.iter() { + let random = Self::random_chance_of_eviction(); + if !random && !Self::should_evict_based_on_age(current_age, v, startup) { + // not planning to evict this item from memory now, so don't write it to disk yet + continue; + } - // if we are removing it, then we need to update disk if we're dirty - if v.clear_dirty() { - // step 1: clear the dirty flag - // step 2: perform the update on disk based on the fields in the entry - // If a parallel operation dirties the item again - even while this flush is occurring, - // the last thing the writer will do, after updating contents, is set_dirty(true) - // That prevents dropping an item from cache before disk is updated to latest in mem. - // happens inside of lock on in-mem cache. This is because of deleting items - // it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok. - dirty_items.push((*k, Arc::clone(v))); - } else { - drop(slot_list); - } - if evict_for_age { - evictions.push(*k); - } else { - evictions_random.push(*k); + if random { + &mut evictions_random + } else { + &mut evictions_age_possible + } + .push((*k, Some(Arc::clone(v)))); } } Self::update_time_stat(&self.stats().flush_scan_us, m); - Self::update_stat(&self.stats().flush_should_evict_us, flush_should_evict_us); FlushScanResult { - evictions, + evictions_age_possible, evictions_random, - dirty_items, } } @@ -1000,57 +985,103 @@ impl InMemAccountsIndex { return; } - // may have to loop if disk has to grow and we have to restart - { - let disk = self.bucket.as_ref().unwrap(); + // scan in-mem map for items that we may evict + let FlushScanResult { + mut evictions_age_possible, + mut evictions_random, + } = self.flush_scan(current_age, startup, flush_guard); - let mut flush_entries_updated_on_disk = 0; - let FlushScanResult { - evictions, - evictions_random, - dirty_items, - } = self.flush_scan(current_age, startup, flush_guard); - { - // write to disk outside giant read lock - let m = Measure::start("flush_update"); // we don't care about lock time in this metric - bg threads can wait - for (k, v) in dirty_items { - if v.dirty() { - // already marked dirty again, skip it - continue; - } - loop { - let disk_resize = { - let slot_list = v.slot_list.read().unwrap(); - disk.try_write(&k, (&slot_list, v.ref_count())) - }; - match disk_resize { - Ok(_) => { - // successfully written to disk - flush_entries_updated_on_disk += 1; - break; + // write to disk outside in-mem map read lock + { + let mut evictions_age = Vec::with_capacity(evictions_age_possible.len()); + if !evictions_age_possible.is_empty() || !evictions_random.is_empty() { + let disk = self.bucket.as_ref().unwrap(); + let mut flush_entries_updated_on_disk = 0; + let exceeds_budget = self.get_exceeds_budget(); + let mut flush_should_evict_us = 0; + // we don't care about lock time in this metric - bg threads can wait + let m = Measure::start("flush_update"); + + // consider whether to write to disk for all the items we may evict, whether evicting due to age or random + for (is_random, check_for_eviction_and_dirty) in [ + (false, &mut evictions_age_possible), + (true, &mut evictions_random), + ] { + for (k, v) in check_for_eviction_and_dirty { + let v = v.take().unwrap(); + let mut slot_list = None; + if !is_random { + let mut mse = Measure::start("flush_should_evict"); + let (evict_for_age, slot_list_temp) = self.should_evict_from_mem( + current_age, + &v, + startup, + true, + exceeds_budget, + ); + slot_list = slot_list_temp; + mse.stop(); + flush_should_evict_us += mse.as_us(); + if evict_for_age { + evictions_age.push(*k); + } else { + // not evicting, so don't write, even if dirty + continue; } - Err(err) => { - // disk needs to resize. This item did not get resized. Resize and try again. - let m = Measure::start("flush_grow"); - disk.grow(err); - Self::update_time_stat(&self.stats().flush_grow_us, m); + } + // if we are evicting it, then we need to update disk if we're dirty + if v.clear_dirty() { + // step 1: clear the dirty flag + // step 2: perform the update on disk based on the fields in the entry + // If a parallel operation dirties the item again - even while this flush is occurring, + // the last thing the writer will do, after updating contents, is set_dirty(true) + // That prevents dropping an item from cache before disk is updated to latest in mem. + // It is possible that the item in the cache is marked as dirty while these updates are happening. That is ok. + // The dirty will be picked up and the item will be prevented from being evicted. + + // may have to loop if disk has to grow and we have to retry the write + loop { + let disk_resize = { + let slot_list = slot_list + .take() + .unwrap_or_else(|| v.slot_list.read().unwrap()); + disk.try_write(k, (&slot_list, v.ref_count())) + }; + match disk_resize { + Ok(_) => { + // successfully written to disk + flush_entries_updated_on_disk += 1; + break; + } + Err(err) => { + // disk needs to resize. This item did not get written. Resize and try again. + let m = Measure::start("flush_grow"); + disk.grow(err); + Self::update_time_stat(&self.stats().flush_grow_us, m); + } + } } } } } Self::update_time_stat(&self.stats().flush_update_us, m); + Self::update_stat(&self.stats().flush_should_evict_us, flush_should_evict_us); + Self::update_stat( + &self.stats().flush_entries_updated_on_disk, + flush_entries_updated_on_disk, + ); + // remove the 'v' + let evictions_random = evictions_random + .into_iter() + .map(|(k, _v)| k) + .collect::>(); + + let m = Measure::start("flush_evict"); + self.evict_from_cache(evictions_age, current_age, startup, false); + self.evict_from_cache(evictions_random, current_age, startup, true); + Self::update_time_stat(&self.stats().flush_evict_us, m); } - Self::update_stat( - &self.stats().flush_entries_updated_on_disk, - flush_entries_updated_on_disk, - ); - - let m = Measure::start("flush_evict"); - self.evict_from_cache(evictions, current_age, startup, false); - self.evict_from_cache(evictions_random, current_age, startup, true); - Self::update_time_stat(&self.stats().flush_evict_us, m); - if iterate_for_age { // completed iteration of the buckets at the current age assert_eq!(current_age, self.storage.current_age());