diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 9dd2cb169..5bec058d1 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -184,6 +184,14 @@ impl AccountMapEntryInner { self.meta.dirty.store(value, Ordering::Release) } + /// set dirty to false, return true if was dirty + pub fn clear_dirty(&self) -> bool { + self.meta + .dirty + .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed) + .is_ok() + } + pub fn age(&self) -> Age { self.meta.age.load(Ordering::Relaxed) } diff --git a/runtime/src/bucket_map_holder_stats.rs b/runtime/src/bucket_map_holder_stats.rs index 7d5de67ed..68d971c05 100644 --- a/runtime/src/bucket_map_holder_stats.rs +++ b/runtime/src/bucket_map_holder_stats.rs @@ -36,8 +36,7 @@ pub struct BucketMapHolderStats { pub get_range_us: AtomicU64, last_age: AtomicU8, last_age_time: AtomicU64, - pub flush_scan_us: AtomicU64, - pub flush_update_us: AtomicU64, + pub flush_scan_update_us: AtomicU64, pub flush_remove_us: AtomicU64, pub flush_grow_us: AtomicU64, last_time: AtomicInterval, @@ -251,13 +250,8 @@ impl BucketMapHolderStats { ("keys", self.keys.swap(0, Ordering::Relaxed), i64), ("ms_per_age", ms_per_age, i64), ( - "flush_scan_us", - self.flush_scan_us.swap(0, Ordering::Relaxed), - i64 - ), - ( - "flush_update_us", - self.flush_update_us.swap(0, Ordering::Relaxed), + "flush_scan_update_us", + self.flush_scan_update_us.swap(0, Ordering::Relaxed), i64 ), ( diff --git a/runtime/src/in_mem_accounts_index.rs b/runtime/src/in_mem_accounts_index.rs index 1229be4ee..a5817116d 100644 --- a/runtime/src/in_mem_accounts_index.rs +++ b/runtime/src/in_mem_accounts_index.rs @@ -675,27 +675,36 @@ impl InMemAccountsIndex { // may have to loop if disk has to grow and we have to restart loop { - let mut removes = Vec::default(); + let mut removes; let mut removes_random = Vec::default(); let disk = self.bucket.as_ref().unwrap(); - let mut updates = Vec::default(); - let m = Measure::start("flush_scan"); let mut flush_entries_updated_on_disk = 0; let mut disk_resize = Ok(()); // scan and update loop + // holds read lock { let map = self.map().read().unwrap(); + removes = Vec::with_capacity(map.len()); + let m = Measure::start("flush_scan_and_update"); // we don't care about lock time in this metric - bg threads can wait for (k, v) in map.iter() { - if v.dirty() { + if v.clear_dirty() { // step 1: clear the dirty flag // step 2: perform the update on disk based on the fields in the entry // If a parallel operation dirties the item again - even while this flush is occurring, // the last thing the writer will do, after updating contents, is set_dirty(true) // That prevents dropping an item from cache before disk is updated to latest in mem. - v.set_dirty(false); - - updates.push((*k, Arc::clone(v))); + // happens inside of lock on in-mem cache. This is because of deleting items + // it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok. + disk_resize = + disk.try_write(k, (&v.slot_list.read().unwrap(), v.ref_count())); + if disk_resize.is_ok() { + flush_entries_updated_on_disk += 1; + } else { + // disk needs to resize, so mark all unprocessed items as dirty again so we pick them up after the resize + v.set_dirty(true); + break; + } } if self.should_remove_from_mem(current_age, v, startup) { @@ -704,33 +713,12 @@ impl InMemAccountsIndex { removes_random.push(*k); } } - Self::update_time_stat(&self.stats().flush_scan_us, m); - - // happens inside of lock on in-mem cache. This is because of deleting items - // it is possible that the item in the cache is marked as dirty while these updates are happening. That is ok. - let m = Measure::start("flush_update"); - for (k, v) in updates.into_iter() { - if v.dirty() { - continue; // marked dirty after we grabbed it above, so handle this the next time this bucket is flushed - } - if disk_resize.is_ok() { - disk_resize = - disk.try_write(&k, (&v.slot_list.read().unwrap(), v.ref_count())); - if disk_resize.is_ok() { - flush_entries_updated_on_disk += 1; - } - } - if disk_resize.is_err() { - // disk needs to resize, so mark all unprocessed items as dirty again so we pick them up after the resize - v.set_dirty(true); - } - } - Self::update_time_stat(&self.stats().flush_update_us, m); - Self::update_stat( - &self.stats().flush_entries_updated_on_disk, - flush_entries_updated_on_disk, - ); + Self::update_time_stat(&self.stats().flush_scan_update_us, m); } + Self::update_stat( + &self.stats().flush_entries_updated_on_disk, + flush_entries_updated_on_disk, + ); let m = Measure::start("flush_remove_or_grow"); match disk_resize {