AcctIdx: move write to disk outside in mem write lock (#23731)

This commit is contained in:
Jeff Washington (jwash) 2022-03-17 23:09:41 -05:00 committed by GitHub
parent 7ff8c80e25
commit 857576d76f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 42 deletions

View File

@ -43,7 +43,8 @@ pub struct BucketMapHolderStats {
pub get_range_us: AtomicU64,
last_age: AtomicU8,
last_ages_flushed: AtomicU64,
pub flush_scan_update_us: AtomicU64,
pub flush_scan_us: AtomicU64,
pub flush_update_us: AtomicU64,
pub flush_remove_us: AtomicU64,
pub flush_grow_us: AtomicU64,
last_was_startup: AtomicBool,
@ -329,8 +330,13 @@ impl BucketMapHolderStats {
("keys", self.keys.swap(0, Ordering::Relaxed), i64),
("ms_per_age", ms_per_age, i64),
(
"flush_scan_update_us",
self.flush_scan_update_us.swap(0, Ordering::Relaxed),
"flush_scan_us",
self.flush_scan_us.swap(0, Ordering::Relaxed),
i64
),
(
"flush_update_us",
self.flush_update_us.swap(0, Ordering::Relaxed),
i64
),
(

View File

@ -948,20 +948,21 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
.store(estimate_mem as u64, Ordering::Relaxed);
// may have to loop if disk has to grow and we have to restart
loop {
{
let mut dirty_items;
let mut evictions;
let mut evictions_random = Vec::default();
let disk = self.bucket.as_ref().unwrap();
let mut flush_entries_updated_on_disk = 0;
let mut disk_resize = Ok(());
let mut flush_should_evict_us = 0;
// scan and update loop
// scan loop
// holds read lock
{
let map = self.map().read().unwrap();
evictions = Vec::with_capacity(map.len());
let m = Measure::start("flush_scan_and_update"); // we don't care about lock time in this metric - bg threads can wait
dirty_items = Vec::with_capacity(map.len());
let m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
for (k, v) in map.iter() {
let mut mse = Measure::start("flush_should_evict");
let (evict_for_age, slot_list) =
@ -982,18 +983,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
// That prevents dropping an item from cache before disk is updated to latest in mem.
// happens inside of lock on in-mem cache. This is because of deleting items
// it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok.
{
let slot_list =
slot_list.unwrap_or_else(|| v.slot_list.read().unwrap());
disk_resize = disk.try_write(k, (&slot_list, v.ref_count()));
}
if disk_resize.is_ok() {
flush_entries_updated_on_disk += 1;
} else {
// disk needs to resize, so mark all unprocessed items as dirty again so we pick them up after the resize
v.set_dirty(true);
break;
}
dirty_items.push((*k, Arc::clone(v)));
} else {
drop(slot_list);
}
@ -1003,7 +993,36 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
evictions_random.push(*k);
}
}
Self::update_time_stat(&self.stats().flush_scan_update_us, m);
Self::update_time_stat(&self.stats().flush_scan_us, m);
}
{
// write to disk outside giant read lock
let m = Measure::start("flush_update"); // we don't care about lock time in this metric - bg threads can wait
for (k, v) in dirty_items {
if v.dirty() {
// already marked dirty again, skip it
continue;
}
loop {
let disk_resize = {
let slot_list = v.slot_list.read().unwrap();
disk.try_write(&k, (&slot_list, v.ref_count()))
};
match disk_resize {
Ok(_) => {
// successfully written to disk
flush_entries_updated_on_disk += 1;
}
Err(err) => {
// disk needs to resize. This item did not get written. Resize the disk and try again.
let m = Measure::start("flush_grow");
disk.grow(err);
Self::update_time_stat(&self.stats().flush_grow_us, m);
}
}
}
}
Self::update_time_stat(&self.stats().flush_update_us, m);
}
Self::update_stat(&self.stats().flush_should_evict_us, flush_should_evict_us);
@ -1012,29 +1031,18 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
flush_entries_updated_on_disk,
);
let m = Measure::start("flush_evict_or_grow");
match disk_resize {
Ok(_) => {
if !self.evict_from_cache(evictions, current_age, startup, false)
|| !self.evict_from_cache(evictions_random, current_age, startup, true)
{
iterate_for_age = false; // did not make it all the way through this bucket, so didn't handle age completely
}
Self::update_time_stat(&self.stats().flush_remove_us, m);
let m = Measure::start("flush_evict");
if !self.evict_from_cache(evictions, current_age, startup, false)
|| !self.evict_from_cache(evictions_random, current_age, startup, true)
{
iterate_for_age = false; // did not make it all the way through this bucket, so didn't handle age completely
}
Self::update_time_stat(&self.stats().flush_remove_us, m);
if iterate_for_age {
// completed iteration of the buckets at the current age
assert_eq!(current_age, self.storage.current_age());
self.set_has_aged(current_age);
}
return;
}
Err(err) => {
// grow the bucket, outside of all in-mem locks.
// then, loop to try again
disk.grow(err);
Self::update_time_stat(&self.stats().flush_grow_us, m);
}
if iterate_for_age {
// completed iteration of the buckets at the current age
assert_eq!(current_age, self.storage.current_age());
self.set_has_aged(current_age);
}
}
}