From deab944b3000db4c7f33c775d7fbf9cd003079b4 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Wed, 22 Sep 2021 09:55:40 -0500 Subject: [PATCH] AcctIdx: flush inserts to disk (#20074) --- runtime/src/bucket_map_holder_stats.rs | 25 +++++++++++ runtime/src/in_mem_accounts_index.rs | 60 ++++++++++++++++++++------ 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/runtime/src/bucket_map_holder_stats.rs b/runtime/src/bucket_map_holder_stats.rs index c77179573c..6073b03ecc 100644 --- a/runtime/src/bucket_map_holder_stats.rs +++ b/runtime/src/bucket_map_holder_stats.rs @@ -22,10 +22,14 @@ pub struct BucketMapHolderStats { pub inserts: AtomicU64, pub count_in_mem: AtomicU64, pub per_bucket_count: Vec, + pub flush_entries_updated_on_disk: AtomicU64, pub active_threads: AtomicU64, pub get_range_us: AtomicU64, last_age: AtomicU8, last_age_time: AtomicU64, + pub flush_scan_us: AtomicU64, + pub flush_update_us: AtomicU64, + pub flush_remove_us: AtomicU64, last_time: AtomicInterval, } @@ -163,6 +167,27 @@ impl BucketMapHolderStats { ("items", self.items.swap(0, Ordering::Relaxed), i64), ("keys", self.keys.swap(0, Ordering::Relaxed), i64), ("ms_per_age", ms_per_age, i64), + ( + "flush_scan_us", + self.flush_scan_us.swap(0, Ordering::Relaxed), + i64 + ), + ( + "flush_update_us", + self.flush_update_us.swap(0, Ordering::Relaxed), + i64 + ), + ( + "flush_remove_us", + self.flush_remove_us.swap(0, Ordering::Relaxed), + i64 + ), + ( + "flush_entries_updated_on_disk", + self.flush_entries_updated_on_disk + .swap(0, Ordering::Relaxed), + i64 + ), ); } } diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index bf7fb414d0..cf56f2020f 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -515,6 +515,11 @@ impl InMemAccountsIndex { self.storage.wait_dirty_or_aged.notify_one(); } + fn should_remove_from_mem(&self, current_age: Age, entry: &AccountMapEntry) -> bool { + // this could be tunable dynamically based on memory pressure + current_age == entry.age() + } + fn flush_internal(&self) { let was_dirty = self.bin_dirty.swap(false, Ordering::Acquire); let current_age = self.storage.current_age(); @@ -524,21 +529,52 @@ impl InMemAccountsIndex { return; } - let map = self.map().read().unwrap(); - for (_k, _v) in map.iter() { - /* - if v.dirty() { - // step 1: clear the dirty flag - // step 2: perform the update on disk based on the fields in the entry - // If a parallel operation dirties the item again - even while this flush is occurring, - // the last thing the writer will do, after updating contents, is set_dirty(true) - // That prevents dropping an item from cache before disk is updated to latest in mem. - v.set_dirty(false); + let mut removes = Vec::default(); + let disk = self.storage.disk.as_ref().unwrap(); - // soon, this will update disk from the in-mem contents + let mut updates = Vec::default(); + let m = Measure::start("flush_scan"); + // scan and update loop + { + let map = self.map().read().unwrap(); + for (k, v) in map.iter() { + if v.dirty() { + // step 1: clear the dirty flag + // step 2: perform the update on disk based on the fields in the entry + // If a parallel operation dirties the item again - even while this flush is occurring, + // the last thing the writer will do, after updating contents, is set_dirty(true) + // That prevents dropping an item from cache before disk is updated to latest in mem. + v.set_dirty(false); + + updates.push((*k, Arc::clone(v))); + } + + if self.should_remove_from_mem(current_age, v) { + removes.push(*k); + } } - */ } + Self::update_time_stat(&self.stats().flush_scan_us, m); + + let mut flush_entries_updated_on_disk = 0; + // happens outside of lock on in-mem cache + // it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok. + let m = Measure::start("flush_update"); + for (k, v) in updates.into_iter() { + if v.dirty() { + continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed + } + flush_entries_updated_on_disk += 1; + disk.insert(&k, (&v.slot_list.read().unwrap(), v.ref_count())); + } + Self::update_time_stat(&self.stats().flush_update_us, m); + Self::update_stat( + &self.stats().flush_entries_updated_on_disk, + flush_entries_updated_on_disk, + ); + + // loop that processes 'removes' will go here + if iterate_for_age { // completed iteration of the buckets at the current age assert_eq!(current_age, self.storage.current_age());