AcctIdx: rework scan and write to disk (#23794)

This commit is contained in:
Jeff Washington (jwash) 2022-03-22 11:54:12 -05:00 committed by GitHub
parent 89ba3ff139
commit 1089a38aaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 112 additions and 81 deletions

View File

@ -64,10 +64,12 @@ pub enum InsertNewEntryResults {
ExistedNewEntryNonZeroLamports, ExistedNewEntryNonZeroLamports,
} }
/// result from scanning in-mem index during flush
struct FlushScanResult<T> { struct FlushScanResult<T> {
evictions: Vec<Pubkey>, /// pubkeys whose age indicates they may be evicted now, pending further checks.
evictions_random: Vec<Pubkey>, evictions_age_possible: Vec<(Pubkey, Option<AccountMapEntry<T>>)>,
dirty_items: Vec<(Pubkey, AccountMapEntry<T>)>, /// pubkeys chosen to evict based on random eviction
evictions_random: Vec<(Pubkey, Option<AccountMapEntry<T>>)>,
} }
#[allow(dead_code)] // temporary during staging #[allow(dead_code)] // temporary during staging
@ -942,50 +944,33 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
startup: bool, startup: bool,
_flush_guard: &FlushGuard, _flush_guard: &FlushGuard,
) -> FlushScanResult<T> { ) -> FlushScanResult<T> {
let exceeds_budget = self.get_exceeds_budget(); let m;
let map = self.map().read().unwrap();
let mut evictions = Vec::with_capacity(map.len());
let mut evictions_random = Vec::default(); let mut evictions_random = Vec::default();
let mut dirty_items = Vec::with_capacity(map.len()); let mut evictions_age_possible;
let mut flush_should_evict_us = 0; {
let m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait let map = self.map().read().unwrap();
for (k, v) in map.iter() { evictions_age_possible = Vec::with_capacity(map.len());
let mut mse = Measure::start("flush_should_evict"); m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
let (evict_for_age, slot_list) = for (k, v) in map.iter() {
self.should_evict_from_mem(current_age, v, startup, true, exceeds_budget); let random = Self::random_chance_of_eviction();
mse.stop(); if !random && !Self::should_evict_based_on_age(current_age, v, startup) {
flush_should_evict_us += mse.as_us(); // not planning to evict this item from memory now, so don't write it to disk yet
if !evict_for_age && !Self::random_chance_of_eviction() { continue;
// not planning to evict this item from memory now, so don't write it to disk yet }
continue;
}
// if we are removing it, then we need to update disk if we're dirty if random {
if v.clear_dirty() { &mut evictions_random
// step 1: clear the dirty flag } else {
// step 2: perform the update on disk based on the fields in the entry &mut evictions_age_possible
// If a parallel operation dirties the item again - even while this flush is occurring, }
// the last thing the writer will do, after updating contents, is set_dirty(true) .push((*k, Some(Arc::clone(v))));
// That prevents dropping an item from cache before disk is updated to latest in mem.
// happens inside of lock on in-mem cache. This is because of deleting items
// it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok.
dirty_items.push((*k, Arc::clone(v)));
} else {
drop(slot_list);
}
if evict_for_age {
evictions.push(*k);
} else {
evictions_random.push(*k);
} }
} }
Self::update_time_stat(&self.stats().flush_scan_us, m); Self::update_time_stat(&self.stats().flush_scan_us, m);
Self::update_stat(&self.stats().flush_should_evict_us, flush_should_evict_us);
FlushScanResult { FlushScanResult {
evictions, evictions_age_possible,
evictions_random, evictions_random,
dirty_items,
} }
} }
@ -1000,57 +985,103 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
return; return;
} }
// may have to loop if disk has to grow and we have to restart // scan in-mem map for items that we may evict
{ let FlushScanResult {
let disk = self.bucket.as_ref().unwrap(); mut evictions_age_possible,
mut evictions_random,
} = self.flush_scan(current_age, startup, flush_guard);
let mut flush_entries_updated_on_disk = 0; // write to disk outside in-mem map read lock
let FlushScanResult { {
evictions, let mut evictions_age = Vec::with_capacity(evictions_age_possible.len());
evictions_random, if !evictions_age_possible.is_empty() || !evictions_random.is_empty() {
dirty_items, let disk = self.bucket.as_ref().unwrap();
} = self.flush_scan(current_age, startup, flush_guard); let mut flush_entries_updated_on_disk = 0;
{ let exceeds_budget = self.get_exceeds_budget();
// write to disk outside giant read lock let mut flush_should_evict_us = 0;
let m = Measure::start("flush_update"); // we don't care about lock time in this metric - bg threads can wait // we don't care about lock time in this metric - bg threads can wait
for (k, v) in dirty_items { let m = Measure::start("flush_update");
if v.dirty() {
// already marked dirty again, skip it // consider whether to write to disk for all the items we may evict, whether evicting due to age or random
continue; for (is_random, check_for_eviction_and_dirty) in [
} (false, &mut evictions_age_possible),
loop { (true, &mut evictions_random),
let disk_resize = { ] {
let slot_list = v.slot_list.read().unwrap(); for (k, v) in check_for_eviction_and_dirty {
disk.try_write(&k, (&slot_list, v.ref_count())) let v = v.take().unwrap();
}; let mut slot_list = None;
match disk_resize { if !is_random {
Ok(_) => { let mut mse = Measure::start("flush_should_evict");
// successfully written to disk let (evict_for_age, slot_list_temp) = self.should_evict_from_mem(
flush_entries_updated_on_disk += 1; current_age,
break; &v,
startup,
true,
exceeds_budget,
);
slot_list = slot_list_temp;
mse.stop();
flush_should_evict_us += mse.as_us();
if evict_for_age {
evictions_age.push(*k);
} else {
// not evicting, so don't write, even if dirty
continue;
} }
Err(err) => { }
// disk needs to resize. This item did not get resized. Resize and try again. // if we are evicting it, then we need to update disk if we're dirty
let m = Measure::start("flush_grow"); if v.clear_dirty() {
disk.grow(err); // step 1: clear the dirty flag
Self::update_time_stat(&self.stats().flush_grow_us, m); // step 2: perform the update on disk based on the fields in the entry
// If a parallel operation dirties the item again - even while this flush is occurring,
// the last thing the writer will do, after updating contents, is set_dirty(true)
// That prevents dropping an item from cache before disk is updated to latest in mem.
// It is possible that the item in the cache is marked as dirty while these updates are happening. That is ok.
// The dirty will be picked up and the item will be prevented from being evicted.
// may have to loop if disk has to grow and we have to retry the write
loop {
let disk_resize = {
let slot_list = slot_list
.take()
.unwrap_or_else(|| v.slot_list.read().unwrap());
disk.try_write(k, (&slot_list, v.ref_count()))
};
match disk_resize {
Ok(_) => {
// successfully written to disk
flush_entries_updated_on_disk += 1;
break;
}
Err(err) => {
// disk needs to resize. This item did not get written. Resize and try again.
let m = Measure::start("flush_grow");
disk.grow(err);
Self::update_time_stat(&self.stats().flush_grow_us, m);
}
}
} }
} }
} }
} }
Self::update_time_stat(&self.stats().flush_update_us, m); Self::update_time_stat(&self.stats().flush_update_us, m);
Self::update_stat(&self.stats().flush_should_evict_us, flush_should_evict_us);
Self::update_stat(
&self.stats().flush_entries_updated_on_disk,
flush_entries_updated_on_disk,
);
// remove the 'v'
let evictions_random = evictions_random
.into_iter()
.map(|(k, _v)| k)
.collect::<Vec<_>>();
let m = Measure::start("flush_evict");
self.evict_from_cache(evictions_age, current_age, startup, false);
self.evict_from_cache(evictions_random, current_age, startup, true);
Self::update_time_stat(&self.stats().flush_evict_us, m);
} }
Self::update_stat(
&self.stats().flush_entries_updated_on_disk,
flush_entries_updated_on_disk,
);
let m = Measure::start("flush_evict");
self.evict_from_cache(evictions, current_age, startup, false);
self.evict_from_cache(evictions_random, current_age, startup, true);
Self::update_time_stat(&self.stats().flush_evict_us, m);
if iterate_for_age { if iterate_for_age {
// completed iteration of the buckets at the current age // completed iteration of the buckets at the current age
assert_eq!(current_age, self.storage.current_age()); assert_eq!(current_age, self.storage.current_age());