Rework upkeep in AtomicWindowedHistogram.

We've reworked upkeep a bit to avoid any datapoint loss during upkeep,
and to make it a bit easier to grok and follow along with.  We also
added a better "gauntlet" test to attempt to hit it with multiple
writers and eke out any glaring issues, as well as a bench test for it.
This commit is contained in:
Toby Lawrence 2019-06-03 17:13:42 -04:00
parent 0a3e8bbff6
commit 275be9ddf0
No known key found for this signature in database
GPG Key ID: 3BB201B0EEE9212E
4 changed files with 251 additions and 33 deletions

View File

@ -26,6 +26,10 @@ default = ["exporters", "recorders"]
exporters = ["metrics-exporter-log", "metrics-exporter-http"]
recorders = ["metrics-recorder-text", "metrics-recorder-prometheus"]
[[bench]]
name = "histogram"
harness = false
[dependencies]
metrics-core = { path = "../metrics-core", version = "^0.3" }
metrics-util = { path = "../metrics-util", version = "^0.2" }
@ -36,6 +40,7 @@ parking_lot = "^0.7"
hashbrown = "^0.3"
quanta = "^0.3"
futures = "^0.1"
crossbeam-utils = "^0.6"
metrics-exporter-log = { path = "../metrics-exporter-log", version = "^0.2", optional = true }
metrics-exporter-http = { path = "../metrics-exporter-http", version = "^0.1", optional = true }
metrics-recorder-text = { path = "../metrics-recorder-text", version = "^0.2", optional = true }
@ -46,3 +51,5 @@ log = "^0.4"
env_logger = "^0.6"
getopts = "^0.2"
hdrhistogram = "^6.1"
criterion = "^0.2.9"
lazy_static = "^1.3"

View File

@ -0,0 +1,77 @@
#[macro_use]
extern crate criterion;
#[macro_use]
extern crate lazy_static;
use criterion::{Benchmark, Criterion, Throughput};
use metrics::data::AtomicWindowedHistogram;
use quanta::{Builder as UpkeepBuilder, Clock, Handle as UpkeepHandle};
use std::time::Duration;
lazy_static! {
// Shared handle to the quanta upkeep thread. Holding it for the whole
// benchmark run keeps the background thread alive, so `Clock::recent()`
// inside the histogram stays fresh without per-call syscalls.
static ref QUANTA_UPKEEP: UpkeepHandle = {
// 10ms refresh interval for the cached "recent" time.
let builder = UpkeepBuilder::new(Duration::from_millis(10));
let handle = builder
.start()
.expect("failed to start quanta upkeep thread");
handle
};
// Fixed pool of pseudo-random values recorded on every benchmark
// iteration. Using a static, pre-generated set keeps the measured work
// deterministic and excludes RNG cost from the timing.
static ref RANDOM_INTS: Vec<u64> = vec![
21061184, 21301862, 21331592, 21457012, 21500016, 21537837, 21581557, 21620030, 21664102,
21678463, 21708437, 21751808, 21845243, 21850265, 21938879, 21971014, 22005842, 22034601,
22085552, 22101746, 22115429, 22139883, 22260209, 22270768, 22298080, 22299780, 22307659,
22354697, 22355668, 22359397, 22463872, 22496590, 22590978, 22603740, 22706352, 22820895,
22849491, 22891538, 22912955, 22919915, 22928920, 22968656, 22985992, 23033739, 23061395,
23077554, 23138588, 23185172, 23282479, 23290830, 23316844, 23386911, 23641319, 23677058,
23742930, 25350389, 25399746, 25404925, 25464391, 25478415, 25480015, 25632783, 25639769,
25645612, 25688228, 25724427, 25862192, 25954476, 25994479, 26008752, 26036460, 26038202,
26078874, 26118327, 26132679, 26207601, 26262418, 26270737, 26274860, 26431248, 26434268,
26562736, 26580134, 26593740, 26618561, 26844181, 26866971, 26907883, 27005270, 27023584,
27024044, 27057184, 23061395, 23077554, 23138588, 23185172, 23282479, 23290830, 23316844,
23386911, 23641319, 23677058, 23742930, 25350389, 25399746, 25404925, 25464391, 25478415,
25480015, 25632783, 25639769, 25645612, 25688228, 25724427, 25862192, 25954476, 25994479,
26008752, 26036460, 26038202, 26078874, 26118327, 26132679, 26207601, 26262418, 26270737,
26274860, 26431248, 26434268, 26562736, 26580134, 26593740, 26618561, 26844181, 26866971,
26907883, 27005270, 27023584, 27024044, 27057184, 23061395, 23077554, 23138588, 23185172,
23282479, 23290830, 23316844, 23386911, 23641319, 23677058, 23742930, 25350389, 25399746,
25404925, 25464391, 25478415, 25480015, 25632783, 25639769, 25645612, 25688228, 25724427,
25862192, 25954476, 25994479, 26008752, 26036460, 26038202, 26078874, 26118327, 26132679,
26207601, 26262418, 26270737, 26274860, 26431248, 26434268, 26562736, 26580134, 26593740,
26618561, 26844181, 26866971, 26907883, 27005270, 27023584, 27024044, 27057184, 23061395,
23077554, 23138588, 23185172, 23282479, 23290830, 23316844, 23386911, 23641319, 23677058,
23742930, 25350389, 25399746, 25404925, 25464391, 25478415, 25480015, 25632783, 25639769,
25645612, 25688228, 25724427, 25862192, 25954476, 25994479, 26008752, 26036460, 26038202,
26078874, 26118327, 26132679, 26207601, 26262418, 26270737, 26274860, 26431248, 26434268,
26562736, 26580134, 26593740, 26618561, 26844181, 26866971, 26907883, 27005270, 27023584,
27024044, 27057184, 27088034, 27088550, 27302898, 27353925, 27412984, 27488633, 27514155,
27558052, 27601937, 27606339, 27624514, 27680396, 27684064, 27963602, 27414982, 28450673
];
}
/// Measures raw `record` throughput on an `AtomicWindowedHistogram`,
/// recording the full `RANDOM_INTS` pool per iteration.
fn bucket_benchmark(c: &mut Criterion) {
    // Touch the lazy static so the quanta upkeep thread is running before
    // any timing starts; the handle itself is unused.
    let _handle = &QUANTA_UPKEEP;

    let bench = Benchmark::new("record", |b| {
        // One-second window with 100ms buckets, driven by a real clock.
        let histogram = AtomicWindowedHistogram::new(
            Duration::from_secs(1),
            Duration::from_millis(100),
            Clock::new(),
        );
        b.iter(|| {
            for &value in RANDOM_INTS.iter() {
                histogram.record(value);
            }
        })
    })
    // Report throughput as elements/sec, one element per recorded value.
    .throughput(Throughput::Elements(RANDOM_INTS.len() as u32));

    c.bench("histogram", bench);
}
// Criterion boilerplate: generate the benchmark group and the test-harness
// `main()` (Cargo.toml sets `harness = false` for this bench target).
criterion_group!(benches, bucket_benchmark);
criterion_main!(benches);

View File

@ -1,7 +1,9 @@
use crate::common::{Delta, MetricValue};
use crate::helper::duration_as_nanos;
use crossbeam_utils::Backoff;
use metrics_util::{AtomicBucket, StreamingIntegers};
use quanta::Clock;
use std::cmp;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Duration;
@ -29,20 +31,39 @@ impl From<MetricValue> for Histogram {
}
}
/// An atomic windowed histogram.
///
/// This histogram provides a windowed view of values that rolls forward over time, dropping old
/// values as they exceed the window of the histogram. Writes into the histogram are lock-free, as
/// well as snapshots of the histogram.
#[derive(Debug)]
pub struct AtomicWindowedHistogram {
buckets: Vec<AtomicBucket<u64>>,
bucket_count: usize,
granularity: u64,
upkeep_index: AtomicUsize,
index: AtomicUsize,
next_upkeep: AtomicU64,
clock: Clock,
}
impl AtomicWindowedHistogram {
/// Creates a new [`AtomicWindowedHistogram`].
///
/// Internally, a number of buckets will be created, based on how many times `granularity` goes
/// into `window`. As time passes, buckets will be cleared to avoid values older than the
/// `window` duration.
///
/// As buckets will hold values representing a period of time up to `granularity`, the
/// granularity can be lowered or raised to roll values off more precisely, or less precisely,
/// against the provided clock.
///
/// # Panics
/// Panics if `granularity` is larger than `window`.
pub fn new(window: Duration, granularity: Duration, clock: Clock) -> Self {
let window_ns = duration_as_nanos(window);
let granularity_ns = duration_as_nanos(granularity);
assert!(window_ns > granularity_ns);
let now = clock.recent();
let bucket_count = ((window_ns / granularity_ns) as usize) + 1;
@ -57,62 +78,122 @@ impl AtomicWindowedHistogram {
buckets,
bucket_count,
granularity: granularity_ns,
upkeep_index: AtomicUsize::new(0),
index: AtomicUsize::new(0),
next_upkeep: AtomicU64::new(next_upkeep),
clock,
}
}
/// Takes a snapshot of the current histogram.
///
/// Returns a [`StreamingIntegers`] value, representing all observed values in the
/// histogram. As writes happen concurrently, along with buckets being cleared, a snapshot is
/// not guaranteed to have all values present at the time the method was called.
pub fn snapshot(&self) -> StreamingIntegers {
// Run upkeep to make sure our window reflects any time passage since the last write.
let _ = self.upkeep();
let index = self.upkeep();
let mut streaming = StreamingIntegers::new();
for bucket in &self.buckets {
// Start from the bucket ahead of the currently-being-written-to-bucket so that we outrace
// any upkeep and get access to more of the data.
for i in 0..self.bucket_count {
let bucket_index = (index + i + 1) % self.bucket_count;
let bucket = &self.buckets[bucket_index];
bucket.data_with(|block| streaming.compress(block));
}
streaming
}
/// Records a value to the histogram.
///
/// Runs upkeep first so the window is rolled forward if needed, then pushes
/// the value into the bucket that upkeep reports as currently active.
pub fn record(&self, value: u64) {
// `upkeep` returns the index of the bucket writers should use right now.
let index = self.upkeep();
self.buckets[index].push(value);
}
fn upkeep(&self) -> usize {
loop {
let now = self.clock.recent();
let index = self.index.load(Ordering::Acquire);
let backoff = Backoff::new();
// See if we need to update the index because we're past our upkeep target.
loop {
// Start by figuring out if the histogram needs to perform upkeep.
let now = self.clock.recent();
let next_upkeep = self.next_upkeep.load(Ordering::Acquire);
if now < next_upkeep {
return index;
if now <= next_upkeep {
let index = self.index.load(Ordering::Acquire);
let actual_index = index % self.bucket_count;
//println!("upkeep -> not yet time, index={} bucket={}", index, actual_index);
return actual_index;
}
let new_index = (index + 1) % self.bucket_count;
if self
.index
.compare_and_swap(index, new_index, Ordering::AcqRel)
== index
{
// If we've had to update the index, go ahead and clear the bucket in front of our
// new bucket. Since we write low to high/left to right, the "oldest" bucket,
// which is the one that should be dropped, is the one we just updated our index
// to, but we always add an extra bucket on top of what we need so that we can
// clear that one, instead of clearing the one we're going to be writing to next so
// that we don't clear the values of writers who start writing to the new bucket
// while we're doing the clear.
self.buckets[new_index].clear();
// We do need to perform upkeep, but someone *else* might actually be doing it already,
// so go ahead and wait until the index is caught up with the upkeep index: the upkeep
// index will be ahead of index until upkeep is complete.
let mut upkeep_in_progress = false;
let mut index = 0;
//println!("upkeep -> checking to make sure upkeep not running");
loop {
index = self.index.load(Ordering::Acquire);
let upkeep_index = self.upkeep_index.load(Ordering::Acquire);
if index == upkeep_index {
//println!("upkeep -> operation concluded/no operation running");
break;
}
// Since another write could outrun us, just do a single CAS. 99.99999999% of the
// time, the CAS will go through, because it takes nanoseconds and our granularity
// will be in the hundreds of milliseconds, if not seconds.
self.next_upkeep.compare_and_swap(
next_upkeep,
next_upkeep + self.granularity,
Ordering::AcqRel,
);
upkeep_in_progress = true;
backoff.snooze();
}
// If we waited for another upkeep operation to complete, then there's the chance that
// enough time has passed that we're due for upkeep again, so restart our loop.
if upkeep_in_progress {
//println!("upkeep -> restarting loop as upkeep was running");
continue;
}
// Figure out how many buckets, up to the maximum, need to be cleared based on the
// delta between the target upkeep time and the actual time. We always clear at least
// one bucket, but may need to clear them all.
let delta = now - next_upkeep;
let bucket_depth = cmp::min((delta / self.granularity) as usize, self.bucket_count) + 1;
//println!("upkeep -> clearing {} buckets", bucket_depth);
// Now that we know how many buckets we need to clear, update the index to point
// writers at the next bucket past the last one that we will be clearing.
let new_index = index + bucket_depth;
let prev_index = self
.index
.compare_and_swap(index, new_index, Ordering::SeqCst);
if prev_index == index {
//println!("upkeep -> updated index from {} to {} (bucket #{} -> #{})", prev_index, new_index, prev_index % self.bucket_count, new_index % self.bucket_count);
// We won the race to update the index, so we're going to do two things:
// - update the next upkeep time value
// - actually do upkeep and clear the buckets
// We have to update the upkeep target so that writers can proceed through the "is
// it time for upkeep?" check. We do this before the actual upkeep so writers can
// proceed as soon as possible after the index update.
let now = self.clock.now();
let next_upkeep = now + self.granularity;
self.next_upkeep.store(next_upkeep, Ordering::Release);
//println!("upkeep -> next upkeep set as {} (now={})", next_upkeep, now);
// Now we clear the buckets. We want to clear everything from the starting value
// of index up to the new index we've set. We don't want to clear the now-current
// bucket because we'd be clearing very recent values.
while index < new_index {
index += 1;
let clear_index = index % self.bucket_count;
self.buckets[clear_index].clear();
//println!("upkeep -> cleared bucket #{}", clear_index);
}
// We've cleared the old buckets, so upkeep is done. Push our upkeep index forward
// so that writers who were blocked waiting for upkeep to conclude can restart.
self.upkeep_index.store(new_index, Ordering::Release);
//println!("upkeep -> concluded, upkeep index is now {}", new_index);
}
}
}
@ -121,6 +202,7 @@ impl AtomicWindowedHistogram {
#[cfg(test)]
mod tests {
use super::{AtomicWindowedHistogram, Clock};
use crossbeam_utils::thread;
use std::time::Duration;
#[test]
@ -162,7 +244,12 @@ mod tests {
#[test]
fn test_windowed_histogram_rollover() {
let (clock, ctl) = Clock::mock();
let h = AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_secs(1), clock);
// Set our granularity at right below a second, so that when we add a second, we don't
// land on the same exact value, and our "now" time should always be ahead of the upkeep
// time when we expect it to be.
let h =
AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_millis(999), clock);
// Histogram is empty, snapshot is empty.
let snapshot = h.snapshot();
@ -226,4 +313,52 @@ mod tests {
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 0);
}
#[test]
fn test_histogram_write_gauntlet_mt() {
    let clock = Clock::new();
    let clock2 = clock.clone();
    // Give the writers a five-second wall-clock window to hammer the histogram.
    let target = clock.now() + Duration::from_secs(5).as_nanos() as u64;
    let h = AtomicWindowedHistogram::new(
        Duration::from_secs(20),
        Duration::from_millis(500),
        clock,
    );
    thread::scope(|s| {
        // Every writer runs the same loop: record until the deadline passes,
        // counting how many values it pushed.
        let spawn_writer = || {
            s.spawn(|_| {
                let mut total = 0;
                while clock2.now() < target {
                    h.record(42);
                    total += 1;
                }
                total
            })
        };
        let t1 = spawn_writer();
        let t2 = spawn_writer();
        let t3 = spawn_writer();
        let t1_total = t1.join().expect("thread 1 panicked during test");
        let t2_total = t2.join().expect("thread 2 panicked during test");
        let t3_total = t3.join().expect("thread 3 panicked during test");
        // No datapoint loss allowed: the snapshot must contain exactly as
        // many values as the three writers recorded combined.
        let total = t1_total + t2_total + t3_total;
        let snap = h.snapshot();
        assert_eq!(total, snap.len());
    })
    .unwrap();
}
}

View File

@ -6,8 +6,7 @@ mod gauge;
pub use gauge::Gauge;
mod histogram;
pub(crate) use histogram::AtomicWindowedHistogram;
pub use histogram::Histogram;
pub use histogram::{AtomicWindowedHistogram, Histogram};
mod snapshot;
pub use snapshot::Snapshot;