diff --git a/runtime/src/bucket_map_holder_stats.rs b/runtime/src/bucket_map_holder_stats.rs
index 69cbd1460e..663180124d 100644
--- a/runtime/src/bucket_map_holder_stats.rs
+++ b/runtime/src/bucket_map_holder_stats.rs
@@ -43,7 +43,8 @@ pub struct BucketMapHolderStats {
     pub get_range_us: AtomicU64,
     last_age: AtomicU8,
     last_ages_flushed: AtomicU64,
-    pub flush_scan_update_us: AtomicU64,
+    pub flush_scan_us: AtomicU64,
+    pub flush_update_us: AtomicU64,
     pub flush_remove_us: AtomicU64,
     pub flush_grow_us: AtomicU64,
     last_was_startup: AtomicBool,
@@ -329,8 +330,13 @@ impl BucketMapHolderStats {
             ("keys", self.keys.swap(0, Ordering::Relaxed), i64),
             ("ms_per_age", ms_per_age, i64),
             (
-                "flush_scan_update_us",
-                self.flush_scan_update_us.swap(0, Ordering::Relaxed),
+                "flush_scan_us",
+                self.flush_scan_us.swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "flush_update_us",
+                self.flush_update_us.swap(0, Ordering::Relaxed),
                 i64
             ),
             (
diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs
index efd6b4cf5f..3dfdd585cc 100644
--- a/runtime/src/in_mem_accounts_index.rs
+++ b/runtime/src/in_mem_accounts_index.rs
@@ -948,20 +948,21 @@ impl InMemAccountsIndex {
             .store(estimate_mem as u64, Ordering::Relaxed);

         // may have to loop if disk has to grow and we have to restart
-        loop {
+        {
+            let mut dirty_items;
             let mut evictions;
             let mut evictions_random = Vec::default();
             let disk = self.bucket.as_ref().unwrap();
             let mut flush_entries_updated_on_disk = 0;
-            let mut disk_resize = Ok(());
             let mut flush_should_evict_us = 0;

-            // scan and update loop
+            // scan loop
             // holds read lock
             {
                 let map = self.map().read().unwrap();
                 evictions = Vec::with_capacity(map.len());
-                let m = Measure::start("flush_scan_and_update"); // we don't care about lock time in this metric - bg threads can wait
+                dirty_items = Vec::with_capacity(map.len());
+                let m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
                 for (k, v) in map.iter() {
                     let mut mse = Measure::start("flush_should_evict");
                     let (evict_for_age, slot_list) =
@@ -982,18 +983,7 @@
                         // That prevents dropping an item from cache before disk is updated to latest in mem.
                         // happens inside of lock on in-mem cache. This is because of deleting items
                         // it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok.
-                        {
-                            let slot_list =
-                                slot_list.unwrap_or_else(|| v.slot_list.read().unwrap());
-                            disk_resize = disk.try_write(k, (&slot_list, v.ref_count()));
-                        }
-                        if disk_resize.is_ok() {
-                            flush_entries_updated_on_disk += 1;
-                        } else {
-                            // disk needs to resize, so mark all unprocessed items as dirty again so we pick them up after the resize
-                            v.set_dirty(true);
-                            break;
-                        }
+                        dirty_items.push((*k, Arc::clone(v)));
                     } else {
                         drop(slot_list);
                     }
@@ -1003,7 +993,37 @@
                         evictions_random.push(*k);
                     }
                 }
-                Self::update_time_stat(&self.stats().flush_scan_update_us, m);
+                Self::update_time_stat(&self.stats().flush_scan_us, m);
+            }
+            {
+                // write to disk outside giant read lock
+                let m = Measure::start("flush_update"); // we don't care about lock time in this metric - bg threads can wait
+                for (k, v) in dirty_items {
+                    if v.dirty() {
+                        // already marked dirty again, skip it; a later flush will pick it up
+                        continue;
+                    }
+                    loop {
+                        let disk_resize = {
+                            let slot_list = v.slot_list.read().unwrap();
+                            disk.try_write(&k, (&slot_list, v.ref_count()))
+                        };
+                        match disk_resize {
+                            Ok(_) => {
+                                // successfully written to disk
+                                flush_entries_updated_on_disk += 1;
+                                break;
+                            }
+                            Err(err) => {
+                                // disk needs to resize. This item did not get written. Resize and try again.
+                                let m = Measure::start("flush_grow");
+                                disk.grow(err);
+                                Self::update_time_stat(&self.stats().flush_grow_us, m);
+                            }
+                        }
+                    }
+                }
+                Self::update_time_stat(&self.stats().flush_update_us, m);
             }

             Self::update_stat(&self.stats().flush_should_evict_us, flush_should_evict_us);
@@ -1012,29 +1032,18 @@
                 flush_entries_updated_on_disk,
             );

-            let m = Measure::start("flush_evict_or_grow");
-            match disk_resize {
-                Ok(_) => {
-                    if !self.evict_from_cache(evictions, current_age, startup, false)
-                        || !self.evict_from_cache(evictions_random, current_age, startup, true)
-                    {
-                        iterate_for_age = false; // did not make it all the way through this bucket, so didn't handle age completely
-                    }
-                    Self::update_time_stat(&self.stats().flush_remove_us, m);
+            let m = Measure::start("flush_evict");
+            if !self.evict_from_cache(evictions, current_age, startup, false)
+                || !self.evict_from_cache(evictions_random, current_age, startup, true)
+            {
+                iterate_for_age = false; // did not make it all the way through this bucket, so didn't handle age completely
+            }
+            Self::update_time_stat(&self.stats().flush_remove_us, m);

-                    if iterate_for_age {
-                        // completed iteration of the buckets at the current age
-                        assert_eq!(current_age, self.storage.current_age());
-                        self.set_has_aged(current_age);
-                    }
-                    return;
-                }
-                Err(err) => {
-                    // grow the bucket, outside of all in-mem locks.
-                    // then, loop to try again
-                    disk.grow(err);
-                    Self::update_time_stat(&self.stats().flush_grow_us, m);
-                }
+            if iterate_for_age {
+                // completed iteration of the buckets at the current age
+                assert_eq!(current_age, self.storage.current_age());
+                self.set_has_aged(current_age);
             }
         }
     }
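
For context, the flush now runs in two passes: a scan pass that only collects Arc clones of dirty entries while the in-mem map's read lock is held, and an update pass that writes them to disk after the lock is released, growing the bucket and retrying whenever try_write reports it is too small. The standalone sketch below illustrates that pattern with simplified, hypothetical types (Entry, DiskBucket, flush); it is not the Solana code above, just a minimal model of the same two-pass, grow-and-retry flow.

use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};

// Hypothetical in-memory entry: a value plus a dirty flag, mirroring the
// dirty()/set_dirty() calls in the diff above.
struct Entry {
    value: RwLock<u64>,
    dirty: AtomicBool,
}

// Hypothetical disk bucket with a fixed capacity: try_write fails when full,
// and grow() doubles the capacity, standing in for the bucket map's resize.
struct DiskBucket {
    data: RwLock<HashMap<u32, u64>>,
    capacity: RwLock<usize>,
}

impl DiskBucket {
    fn try_write(&self, key: u32, value: u64) -> Result<(), ()> {
        let mut data = self.data.write().unwrap();
        if data.len() >= *self.capacity.read().unwrap() && !data.contains_key(&key) {
            return Err(()); // full: caller must grow and retry
        }
        data.insert(key, value);
        Ok(())
    }

    fn grow(&self) {
        let mut capacity = self.capacity.write().unwrap();
        *capacity = (*capacity).max(1) * 2;
    }
}

fn flush(map: &RwLock<HashMap<u32, Arc<Entry>>>, disk: &DiskBucket) {
    // Pass 1 (scan): hold the read lock only long enough to clear the dirty
    // flags and collect Arc clones of the entries that need writing.
    let dirty_items: Vec<(u32, Arc<Entry>)> = {
        let map = map.read().unwrap();
        map.iter()
            .filter(|(_, v)| v.dirty.swap(false, Ordering::AcqRel))
            .map(|(k, v)| (*k, Arc::clone(v)))
            .collect()
    };

    // Pass 2 (update): write to disk with the read lock released; grow the
    // bucket and retry the same item whenever the write reports "too small".
    for (key, entry) in dirty_items {
        if entry.dirty.load(Ordering::Acquire) {
            continue; // re-dirtied since the scan; a later flush will handle it
        }
        loop {
            let result = {
                let value = entry.value.read().unwrap();
                disk.try_write(key, *value)
            };
            match result {
                Ok(()) => break,        // written successfully, move on
                Err(()) => disk.grow(), // resize needed, retry this item
            }
        }
    }
}

fn main() {
    let disk = DiskBucket {
        data: RwLock::new(HashMap::new()),
        capacity: RwLock::new(1),
    };
    let entry = |n| Arc::new(Entry { value: RwLock::new(n), dirty: AtomicBool::new(true) });
    let map = RwLock::new(HashMap::from([(1u32, entry(10)), (2u32, entry(20))]));
    flush(&map, &disk);
    assert_eq!(disk.data.read().unwrap().len(), 2); // second write triggered a grow() before succeeding
}

Keeping only Arc clones in dirty_items means the read lock is held just long enough to snapshot which entries need writing, so threads that need the in-mem map are not blocked behind disk I/O or bucket growth.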