From 7f3d445af534ac39c1c04b722dc3ffe87e2c8894 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Tue, 21 Sep 2021 10:52:39 -0500 Subject: [PATCH] AcctIdx: Orderings and cleanup (#20046) --- runtime/src/bucket_map_holder.rs | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/runtime/src/bucket_map_holder.rs b/runtime/src/bucket_map_holder.rs index 05367511f..1d342cd48 100644 --- a/runtime/src/bucket_map_holder.rs +++ b/runtime/src/bucket_map_holder.rs @@ -12,7 +12,6 @@ pub type Age = u8; const AGE_MS: u64 = SLOT_MS; // match one age per slot time -// will eventually hold the bucket map pub struct BucketMapHolder { pub disk: Option>>, @@ -49,11 +48,12 @@ impl Debug for BucketMapHolder { #[allow(clippy::mutex_atomic)] impl BucketMapHolder { pub fn increment_age(&self) { + // since we are about to change age, there are now 0 buckets that have been flushed at this age + // this should happen before the age.fetch_add + let previous = self.count_ages_flushed.swap(0, Ordering::Acquire); // fetch_add is defined to wrap. // That's what we want. 0..255, then back to 0. - self.age.fetch_add(1, Ordering::Relaxed); - // since we changed age, there are now 0 buckets that have been flushed at this age - let previous = self.count_ages_flushed.swap(0, Ordering::Relaxed); + self.age.fetch_add(1, Ordering::Release); assert!(previous >= self.bins); // we should not have increased age before previous age was fully flushed self.wait_dirty_or_aged.notify_all(); // notify all because we can age scan in parallel } @@ -84,11 +84,12 @@ impl BucketMapHolder { } pub fn current_age(&self) -> Age { - self.age.load(Ordering::Relaxed) + self.age.load(Ordering::Acquire) } pub fn bucket_flushed_at_current_age(&self) { - self.count_ages_flushed.fetch_add(1, Ordering::Acquire); + self.count_ages_flushed.fetch_add(1, Ordering::Release); + self.maybe_advance_age(); } // have all buckets been flushed at the current age? @@ -97,7 +98,7 @@ impl BucketMapHolder { } pub fn count_ages_flushed(&self) -> usize { - self.count_ages_flushed.load(Ordering::Relaxed) + self.count_ages_flushed.load(Ordering::Acquire) } pub fn maybe_advance_age(&self) -> bool { @@ -193,9 +194,11 @@ pub mod tests { // inc all for _ in 0..bins { assert!(!test.all_buckets_flushed_at_current_age()); - test.bucket_flushed_at_current_age(); + // cannot call this because based on timing, it may fire: test.bucket_flushed_at_current_age(); } + // this would normally happen once time went off and all buckets had been flushed at the previous age + test.count_ages_flushed.fetch_add(bins, Ordering::Release); test.increment_age(); } } @@ -226,16 +229,12 @@ pub mod tests { let bins = 4; let test = BucketMapHolder::::new(bins, &Some(AccountsIndexConfig::default())); assert_eq!(test.current_age(), 0); - assert!(!test.all_buckets_flushed_at_current_age()); - // inc all but 1 - for _ in 1..bins { - test.bucket_flushed_at_current_age(); + for _ in 0..bins { assert!(!test.all_buckets_flushed_at_current_age()); + test.bucket_flushed_at_current_age(); } - test.bucket_flushed_at_current_age(); - assert!(test.all_buckets_flushed_at_current_age()); - test.increment_age(); - + std::thread::sleep(std::time::Duration::from_millis(AGE_MS * 2)); + test.maybe_advance_age(); assert_eq!(test.current_age(), 1); assert!(!test.all_buckets_flushed_at_current_age()); }