AcctIdx: Orderings and cleanup (#20046)
This commit is contained in:
parent
013e1d9d49
commit
7f3d445af5
|
@ -12,7 +12,6 @@ pub type Age = u8;
|
|||
|
||||
const AGE_MS: u64 = SLOT_MS; // match one age per slot time
|
||||
|
||||
// will eventually hold the bucket map
|
||||
pub struct BucketMapHolder<T: IndexValue> {
|
||||
pub disk: Option<BucketMap<SlotT<T>>>,
|
||||
|
||||
|
@ -49,11 +48,12 @@ impl<T: IndexValue> Debug for BucketMapHolder<T> {
|
|||
#[allow(clippy::mutex_atomic)]
|
||||
impl<T: IndexValue> BucketMapHolder<T> {
|
||||
pub fn increment_age(&self) {
|
||||
// since we are about to change age, there are now 0 buckets that have been flushed at this age
|
||||
// this should happen before the age.fetch_add
|
||||
let previous = self.count_ages_flushed.swap(0, Ordering::Acquire);
|
||||
// fetch_add is defined to wrap.
|
||||
// That's what we want. 0..255, then back to 0.
|
||||
self.age.fetch_add(1, Ordering::Relaxed);
|
||||
// since we changed age, there are now 0 buckets that have been flushed at this age
|
||||
let previous = self.count_ages_flushed.swap(0, Ordering::Relaxed);
|
||||
self.age.fetch_add(1, Ordering::Release);
|
||||
assert!(previous >= self.bins); // we should not have increased age before previous age was fully flushed
|
||||
self.wait_dirty_or_aged.notify_all(); // notify all because we can age scan in parallel
|
||||
}
|
||||
|
@ -84,11 +84,12 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||
}
|
||||
|
||||
pub fn current_age(&self) -> Age {
|
||||
self.age.load(Ordering::Relaxed)
|
||||
self.age.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub fn bucket_flushed_at_current_age(&self) {
|
||||
self.count_ages_flushed.fetch_add(1, Ordering::Acquire);
|
||||
self.count_ages_flushed.fetch_add(1, Ordering::Release);
|
||||
self.maybe_advance_age();
|
||||
}
|
||||
|
||||
// have all buckets been flushed at the current age?
|
||||
|
@ -97,7 +98,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||
}
|
||||
|
||||
pub fn count_ages_flushed(&self) -> usize {
|
||||
self.count_ages_flushed.load(Ordering::Relaxed)
|
||||
self.count_ages_flushed.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub fn maybe_advance_age(&self) -> bool {
|
||||
|
@ -193,9 +194,11 @@ pub mod tests {
|
|||
// inc all
|
||||
for _ in 0..bins {
|
||||
assert!(!test.all_buckets_flushed_at_current_age());
|
||||
test.bucket_flushed_at_current_age();
|
||||
// cannot call this because based on timing, it may fire: test.bucket_flushed_at_current_age();
|
||||
}
|
||||
|
||||
// this would normally happen once time went off and all buckets had been flushed at the previous age
|
||||
test.count_ages_flushed.fetch_add(bins, Ordering::Release);
|
||||
test.increment_age();
|
||||
}
|
||||
}
|
||||
|
@ -226,16 +229,12 @@ pub mod tests {
|
|||
let bins = 4;
|
||||
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()));
|
||||
assert_eq!(test.current_age(), 0);
|
||||
assert!(!test.all_buckets_flushed_at_current_age());
|
||||
// inc all but 1
|
||||
for _ in 1..bins {
|
||||
test.bucket_flushed_at_current_age();
|
||||
for _ in 0..bins {
|
||||
assert!(!test.all_buckets_flushed_at_current_age());
|
||||
test.bucket_flushed_at_current_age();
|
||||
}
|
||||
test.bucket_flushed_at_current_age();
|
||||
assert!(test.all_buckets_flushed_at_current_age());
|
||||
test.increment_age();
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_millis(AGE_MS * 2));
|
||||
test.maybe_advance_age();
|
||||
assert_eq!(test.current_age(), 1);
|
||||
assert!(!test.all_buckets_flushed_at_current_age());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue