From 21f2b937c3a365b68aff138c879e9bb772c3bbab Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Tue, 4 Jun 2019 16:29:36 -0400 Subject: [PATCH] Update tests and make sure we don't double clear the new bucket. --- metrics/src/data/histogram.rs | 47 ++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/metrics/src/data/histogram.rs b/metrics/src/data/histogram.rs index 10633fd..a1b1187 100644 --- a/metrics/src/data/histogram.rs +++ b/metrics/src/data/histogram.rs @@ -123,8 +123,6 @@ impl AtomicWindowedHistogram { let index = self.index.load(Ordering::Acquire); let actual_index = index % self.bucket_count; - //println!("upkeep -> not yet time, index={} bucket={}", index, actual_index); - return actual_index; } @@ -133,12 +131,10 @@ impl AtomicWindowedHistogram { // index will be ahead of index until upkeep is complete. let mut upkeep_in_progress = false; let mut index = 0; - //println!("upkeep -> checking to make sure upkeep not running"); loop { index = self.index.load(Ordering::Acquire); let upkeep_index = self.upkeep_index.load(Ordering::Acquire); if index == upkeep_index { - //println!("upkeep -> operation concluded/no operation running"); break; } @@ -149,7 +145,6 @@ impl AtomicWindowedHistogram { // If we waited for another upkeep operation to complete, then there's the chance that // enough time has passed that we're due for upkeep again, so restart our loop. if upkeep_in_progress { - //println!("upkeep -> restarting loop as upkeep was running"); continue; } @@ -158,7 +153,6 @@ impl AtomicWindowedHistogram { // one bucket, but may need to clear them all. let delta = now - next_upkeep; let bucket_depth = cmp::min((delta / self.granularity) as usize, self.bucket_count) + 1; - //println!("upkeep -> clearing {} buckets", bucket_depth); // Now that we we know how many buckets we need to clear, update the index to pointer // writers at the next bucket past the last one that we will be clearing. @@ -167,33 +161,31 @@ impl AtomicWindowedHistogram { .index .compare_and_swap(index, new_index, Ordering::SeqCst); if prev_index == index { - //println!("upkeep -> updated index from {} to {} (bucket #{} -> #{})", prev_index, new_index, prev_index % self.bucket_count, new_index % self.bucket_count); - // We won the race to update the index, so we're going to do two things: - // - update the next upkeep time value - // - actually do upkeep and clear the buckets + // Clear the target bucket first, and then update the upkeep target time so new + // writers can proceed. We may still have other buckets to clean up if we had + // multiple rounds worth of upkeep to do, but this will let new writes proceed as + // soon as possible. + let clear_index = new_index % self.bucket_count; + self.buckets[clear_index].clear(); - // We have to update the upkeep target so that writers can proceed through the "is - // it time for upkeep?" check. We do this before the actual upkeep so writers can - // proceed as soon as possible after the index update. let now = self.clock.now(); let next_upkeep = now + self.granularity; self.next_upkeep.store(next_upkeep, Ordering::Release); - //println!("upkeep -> next upkeep set as {} (now={})", next_upkeep, now); - // Now we clear the buckets. We want to clear everything from the starting value - // of index up to the new index we've set. We don't want to clear the now-current - // bucket because we'd be clearing very recent values. - while index < new_index { + // Now that we've cleared the actual bucket that writers will use going forward, we + // have to clear any older buckets that we skipped over. If our granularity was 1 + // second, and we skipped over 4 seconds worth of buckets, we would still have + // 3 buckets to clear, etc. + let last_index = new_index - 1; + while index < last_index { index += 1; let clear_index = index % self.bucket_count; self.buckets[clear_index].clear(); - //println!("upkeep -> cleared bucket #{}", clear_index); } // We've cleared the old buckets, so upkeep is done. Push our upkeep index forward // so that writers who were blocked waiting for upkeep to conclude can restart. self.upkeep_index.store(new_index, Ordering::Release); - //println!("upkeep -> concluded, upkeep index is now {}", new_index); } } } @@ -312,6 +304,21 @@ mod tests { ctl.increment(Duration::from_secs(1)); let snapshot = h.snapshot(); assert_eq!(snapshot.len(), 0); + + // We should also be able to advance by vast periods of time and observe not only old + // values going away but no weird overflow issues or index or anything. This ensures that + // our upkeep code functions not just for under-load single bucket rollovers but also "been + // idle for a while and just got a write" scenarios. + h.record(42); + + let snapshot = h.snapshot(); + assert_eq!(snapshot.len(), 1); + let total: u64 = snapshot.decompress().iter().sum(); + assert_eq!(total, 42); + + ctl.increment(Duration::from_secs(1000)); + let snapshot = h.snapshot(); + assert_eq!(snapshot.len(), 0); } #[test]