Update tests and make sure we don't double clear the new bucket.
This commit is contained in:
parent
275be9ddf0
commit
21f2b937c3
|
@ -123,8 +123,6 @@ impl AtomicWindowedHistogram {
|
|||
let index = self.index.load(Ordering::Acquire);
|
||||
let actual_index = index % self.bucket_count;
|
||||
|
||||
//println!("upkeep -> not yet time, index={} bucket={}", index, actual_index);
|
||||
|
||||
return actual_index;
|
||||
}
|
||||
|
||||
|
@ -133,12 +131,10 @@ impl AtomicWindowedHistogram {
|
|||
// index will be ahead of index until upkeep is complete.
|
||||
let mut upkeep_in_progress = false;
|
||||
let mut index = 0;
|
||||
//println!("upkeep -> checking to make sure upkeep not running");
|
||||
loop {
|
||||
index = self.index.load(Ordering::Acquire);
|
||||
let upkeep_index = self.upkeep_index.load(Ordering::Acquire);
|
||||
if index == upkeep_index {
|
||||
//println!("upkeep -> operation concluded/no operation running");
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -149,7 +145,6 @@ impl AtomicWindowedHistogram {
|
|||
// If we waited for another upkeep operation to complete, then there's the chance that
|
||||
// enough time has passed that we're due for upkeep again, so restart our loop.
|
||||
if upkeep_in_progress {
|
||||
//println!("upkeep -> restarting loop as upkeep was running");
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -158,7 +153,6 @@ impl AtomicWindowedHistogram {
|
|||
// one bucket, but may need to clear them all.
|
||||
let delta = now - next_upkeep;
|
||||
let bucket_depth = cmp::min((delta / self.granularity) as usize, self.bucket_count) + 1;
|
||||
//println!("upkeep -> clearing {} buckets", bucket_depth);
|
||||
|
||||
// Now that we we know how many buckets we need to clear, update the index to pointer
|
||||
// writers at the next bucket past the last one that we will be clearing.
|
||||
|
@ -167,33 +161,31 @@ impl AtomicWindowedHistogram {
|
|||
.index
|
||||
.compare_and_swap(index, new_index, Ordering::SeqCst);
|
||||
if prev_index == index {
|
||||
//println!("upkeep -> updated index from {} to {} (bucket #{} -> #{})", prev_index, new_index, prev_index % self.bucket_count, new_index % self.bucket_count);
|
||||
// We won the race to update the index, so we're going to do two things:
|
||||
// - update the next upkeep time value
|
||||
// - actually do upkeep and clear the buckets
|
||||
// Clear the target bucket first, and then update the upkeep target time so new
|
||||
// writers can proceed. We may still have other buckets to clean up if we had
|
||||
// multiple rounds worth of upkeep to do, but this will let new writes proceed as
|
||||
// soon as possible.
|
||||
let clear_index = new_index % self.bucket_count;
|
||||
self.buckets[clear_index].clear();
|
||||
|
||||
// We have to update the upkeep target so that writers can proceed through the "is
|
||||
// it time for upkeep?" check. We do this before the actual upkeep so writers can
|
||||
// proceed as soon as possible after the index update.
|
||||
let now = self.clock.now();
|
||||
let next_upkeep = now + self.granularity;
|
||||
self.next_upkeep.store(next_upkeep, Ordering::Release);
|
||||
//println!("upkeep -> next upkeep set as {} (now={})", next_upkeep, now);
|
||||
|
||||
// Now we clear the buckets. We want to clear everything from the starting value
|
||||
// of index up to the new index we've set. We don't want to clear the now-current
|
||||
// bucket because we'd be clearing very recent values.
|
||||
while index < new_index {
|
||||
// Now that we've cleared the actual bucket that writers will use going forward, we
|
||||
// have to clear any older buckets that we skipped over. If our granularity was 1
|
||||
// second, and we skipped over 4 seconds worth of buckets, we would still have
|
||||
// 3 buckets to clear, etc.
|
||||
let last_index = new_index - 1;
|
||||
while index < last_index {
|
||||
index += 1;
|
||||
let clear_index = index % self.bucket_count;
|
||||
self.buckets[clear_index].clear();
|
||||
//println!("upkeep -> cleared bucket #{}", clear_index);
|
||||
}
|
||||
|
||||
// We've cleared the old buckets, so upkeep is done. Push our upkeep index forward
|
||||
// so that writers who were blocked waiting for upkeep to conclude can restart.
|
||||
self.upkeep_index.store(new_index, Ordering::Release);
|
||||
//println!("upkeep -> concluded, upkeep index is now {}", new_index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -312,6 +304,21 @@ mod tests {
|
|||
ctl.increment(Duration::from_secs(1));
|
||||
let snapshot = h.snapshot();
|
||||
assert_eq!(snapshot.len(), 0);
|
||||
|
||||
// We should also be able to advance by vast periods of time and observe not only old
|
||||
// values going away but no weird overflow issues or index or anything. This ensures that
|
||||
// our upkeep code functions not just for under-load single bucket rollovers but also "been
|
||||
// idle for a while and just got a write" scenarios.
|
||||
h.record(42);
|
||||
|
||||
let snapshot = h.snapshot();
|
||||
assert_eq!(snapshot.len(), 1);
|
||||
let total: u64 = snapshot.decompress().iter().sum();
|
||||
assert_eq!(total, 42);
|
||||
|
||||
ctl.increment(Duration::from_secs(1000));
|
||||
let snapshot = h.snapshot();
|
||||
assert_eq!(snapshot.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
Loading…
Reference in New Issue