add support for culling idle metrics from prometheus output

This commit is contained in:
Toby Lawrence 2020-11-18 18:43:48 -05:00
parent 7ef47304ed
commit 8307366bba
7 changed files with 244 additions and 78 deletions

View File

@ -25,6 +25,7 @@ metrics-util = { version = "0.4.0-alpha.1", path = "../metrics-util" }
hdrhistogram = "7.1"
parking_lot = "0.11"
thiserror = "1.0"
quanta = "0.6"
# Optional
hyper = { version = "0.13", default-features = false, features = ["tcp"], optional = true }

View File

@ -11,6 +11,7 @@ fn main() {
let builder = PrometheusBuilder::new();
builder
.idle_timeout(Some(Duration::from_secs(10)))
.install()
.expect("failed to install Prometheus recorder");
@ -32,6 +33,8 @@ fn main() {
let mut clock = Clock::new();
let mut last = None;
increment!("idle_metric");
// Loop over and over, pretending to do some work.
loop {
increment!("tcp_server_loops", "system" => "foo");

View File

@ -12,13 +12,16 @@ use metrics::{Key, Recorder, SetRecorderError, Unit};
use metrics_util::{
parse_quantiles, CompositeKey, Handle, Histogram, MetricKind, Quantile, Registry,
};
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};
use quanta::{Clock, Instant};
use std::io;
use std::iter::FromIterator;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::ops::DerefMut;
use std::sync::Arc;
#[cfg(feature = "tokio-exporter")]
use std::thread;
use std::time::Duration;
use std::{collections::HashMap, time::SystemTime};
use thiserror::Error as ThisError;
#[cfg(feature = "tokio-exporter")]
@ -66,8 +69,10 @@ struct Snapshot {
pub distributions: HashMap<String, HashMap<Vec<String>, Distribution>>,
}
struct Inner {
pub(crate) struct Inner {
registry: PrometheusRegistry,
recency: Mutex<(Clock, HashMap<CompositeKey, (usize, Instant)>)>,
idle_timeout: Option<Duration>,
distributions: RwLock<HashMap<String, HashMap<Vec<String>, Distribution>>>,
quantiles: Vec<Quantile>,
buckets: Vec<u64>,
@ -80,9 +85,44 @@ impl Inner {
&self.registry
}
/// Determines whether a metric should be kept in the rendered output, culling it if idle.
///
/// Tracks the last observed generation and update time for `key`. When an idle timeout is
/// configured and the metric's generation has not advanced within that window, the metric
/// is deleted from the registry (guarded by the generation so a concurrent update is not
/// lost) and `false` is returned so the metric is omitted. Returns `true` otherwise,
/// including whenever no idle timeout is configured.
fn should_store(
    &self,
    key: &CompositeKey,
    current_gen: usize,
    clock: &mut Clock,
    recency: &mut HashMap<CompositeKey, (usize, Instant)>,
) -> bool {
    if let Some(idle_timeout) = self.idle_timeout {
        let now = clock.now();
        if let Some((last_gen, last_update)) = recency.get_mut(&key) {
            if *last_gen == current_gen {
                // The metric hasn't been updated since we last saw it.  If it's been
                // idle longer than the timeout, try to remove it.
                if (now - *last_update) > idle_timeout {
                    // If the delete returns false, our generation counter is out-of-date:
                    // the metric has been updated since, so we keep it for now.
                    if self.registry.delete(&key, current_gen) {
                        // Drop our recency entry as well, so a later re-registration of
                        // this key with a coincidentally-matching generation isn't
                        // culled immediately based on the stale timestamp.
                        recency.remove(key);
                        return false;
                    }
                }
            } else {
                // Value has changed, so track both the new generation and update time.
                // (Refreshing only the timestamp would leave `last_gen` stale forever,
                // and the idle check above could never fire again for this key.)
                *last_gen = current_gen;
                *last_update = now;
            }
        } else {
            recency.insert(key.clone(), (current_gen, now));
        }
    }
    true
}
fn get_recent_metrics(&self) -> Snapshot {
let metrics = self.registry.get_handles();
let mut rg = self.recency.lock();
let (clock, recency) = rg.deref_mut();
let mut counters = HashMap::new();
let mut gauges = HashMap::new();
@ -93,64 +133,76 @@ impl Inner {
.unwrap_or_else(|| vec![]);
sorted_overrides.sort_by(|(a, _), (b, _)| b.len().cmp(&a.len()));
for (key, handle) in metrics.into_iter() {
let (kind, key) = key.into_parts();
let (name, labels) = key_to_parts(key);
match kind {
for (key, (gen, handle)) in metrics.into_iter() {
match key.kind() {
MetricKind::Counter => {
let entry = counters
.entry(name)
.or_insert_with(|| HashMap::new())
.entry(labels)
.or_insert(0);
*entry = handle.read_counter();
let value = handle.read_counter();
if self.should_store(&key, gen, clock, recency) {
let (_, key) = key.into_parts();
let (name, labels) = key_to_parts(key);
let entry = counters
.entry(name)
.or_insert_with(|| HashMap::new())
.entry(labels)
.or_insert(0);
*entry = value;
}
}
MetricKind::Gauge => {
let entry = gauges
.entry(name)
.or_insert_with(|| HashMap::new())
.entry(labels)
.or_insert(0.0);
*entry = handle.read_gauge();
let value = handle.read_gauge();
if self.should_store(&key, gen, clock, recency) {
let (_, key) = key.into_parts();
let (name, labels) = key_to_parts(key);
let entry = gauges
.entry(name)
.or_insert_with(|| HashMap::new())
.entry(labels)
.or_insert(0.0);
*entry = value;
}
}
MetricKind::Histogram => {
let buckets = sorted_overrides
.iter()
.find(|(k, _)| name.ends_with(*k))
.map(|(_, buckets)| *buckets)
.unwrap_or(&self.buckets);
if self.should_store(&key, gen, clock, recency) {
let (_, key) = key.into_parts();
let (name, labels) = key_to_parts(key);
let mut wg = self.distributions.write();
let entry = wg
.entry(name.clone())
.or_insert_with(|| HashMap::new())
.entry(labels)
.or_insert_with(|| match buckets.is_empty() {
false => {
let histogram = Histogram::new(buckets)
.expect("failed to create histogram with buckets defined");
Distribution::Histogram(histogram)
}
true => {
let summary =
HdrHistogram::new(3).expect("failed to create histogram");
Distribution::Summary(summary, 0)
}
});
let buckets = sorted_overrides
.iter()
.find(|(k, _)| name.ends_with(*k))
.map(|(_, buckets)| *buckets)
.unwrap_or(&self.buckets);
match entry {
Distribution::Histogram(histogram) => handle
.read_histogram_with_clear(|samples| histogram.record_many(samples)),
Distribution::Summary(summary, sum) => {
handle.read_histogram_with_clear(|samples| {
for sample in samples {
let _ = summary.record(*sample);
*sum += *sample;
let mut wg = self.distributions.write();
let entry = wg
.entry(name.clone())
.or_insert_with(|| HashMap::new())
.entry(labels)
.or_insert_with(|| match buckets.is_empty() {
false => {
let histogram = Histogram::new(buckets)
.expect("failed to create histogram with buckets defined");
Distribution::Histogram(histogram)
}
})
true => {
let summary =
HdrHistogram::new(3).expect("failed to create histogram");
Distribution::Summary(summary, 0)
}
});
match entry {
Distribution::Histogram(histogram) => {
handle.read_histogram_with_clear(|samples| {
histogram.record_many(samples)
})
}
Distribution::Summary(summary, sum) => handle
.read_histogram_with_clear(|samples| {
for sample in samples {
let _ = summary.record(*sample);
*sum += *sample;
}
}),
}
}
}
@ -371,6 +423,7 @@ pub struct PrometheusBuilder {
listen_address: SocketAddr,
quantiles: Vec<Quantile>,
buckets: Vec<u64>,
idle_timeout: Option<Duration>,
buckets_by_name: Option<HashMap<String, Vec<u64>>>,
}
@ -383,6 +436,7 @@ impl PrometheusBuilder {
listen_address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9000),
quantiles,
buckets: vec![],
idle_timeout: None,
buckets_by_name: None,
}
}
@ -422,6 +476,17 @@ impl PrometheusBuilder {
self
}
/// Sets the idle timeout for metrics.
///
/// If a metric hasn't been updated within this timeout, it will be removed from the registry
/// and in turn removed from the normal scrape output until the metric is emitted again. This
/// behavior is driven by requests to generate rendered output, and so metrics will not be
/// removed unless a request has been made recently enough to prune the idle metrics.
pub fn idle_timeout(mut self, timeout: Option<Duration>) -> Self {
self.idle_timeout = timeout;
self
}
/// Sets the buckets for a specific metric, overriding the default.
///
/// The match is suffix-based, and the longest match found will be used.
@ -470,8 +535,14 @@ impl PrometheusBuilder {
/// Builds the recorder and returns it.
///
/// Delegates to `build_with_clock` with a freshly-created `Clock` for recency tracking.
/// This function is only enabled when default features are not set.
pub fn build(self) -> Result<PrometheusRecorder, Error> {
    self.build_with_clock(Clock::new())
}
pub(crate) fn build_with_clock(self, clock: Clock) -> Result<PrometheusRecorder, Error> {
let inner = Arc::new(Inner {
registry: Registry::new(),
recency: Mutex::new((clock, HashMap::new())),
idle_timeout: self.idle_timeout,
distributions: RwLock::new(HashMap::new()),
quantiles: self.quantiles.clone(),
buckets: self.buckets.clone(),
@ -496,12 +567,14 @@ impl PrometheusBuilder {
) -> Result<
(
PrometheusRecorder,
impl Future<Output = Result<(), HyperError>> + Send + Sync + 'static,
impl Future<Output = Result<(), HyperError>> + Send + 'static,
),
Error,
> {
let inner = Arc::new(Inner {
registry: Registry::new(),
recency: Mutex::new((Clock::new(), HashMap::new())),
idle_timeout: self.idle_timeout,
distributions: RwLock::new(HashMap::new()),
quantiles: self.quantiles.clone(),
buckets: self.buckets.clone(),
@ -593,15 +666,10 @@ impl Recorder for PrometheusRecorder {
}
fn key_to_parts(key: Key) -> (String, Vec<String>) {
let name = key.name();
let labels = key.labels();
let sanitize = |c| c == '.' || c == '=' || c == '{' || c == '}' || c == '+' || c == '-';
let name = name
.parts()
.map(|s| s.replace(sanitize, "_"))
.collect::<Vec<_>>()
.join("_");
let labels = labels
let name = key.name().to_string().replace(sanitize, "_");
let labels = key
.labels()
.into_iter()
.map(|label| {
let k = label.key();

View File

@ -38,6 +38,17 @@ impl<T> Block<T> {
}
}
/// Gets the length of the previous block, if it exists.
///
/// Returns 0 when this block has no predecessor.
pub(crate) fn prev_len(&self, guard: &Guard) -> usize {
    let prev = self.prev.load(Ordering::Acquire, guard);
    // SAFETY: a non-null pointer loaded under `guard` stays valid for the
    // guard's lifetime; `as_ref` handles the null case by yielding `None`.
    match unsafe { prev.as_ref() } {
        Some(block) => block.len(),
        None => 0,
    }
}
/// Gets the current length of this block.
pub fn len(&self) -> usize {
self.read.load(Ordering::Acquire).trailing_ones() as usize
@ -123,6 +134,18 @@ impl<T> AtomicBucket<T> {
}
}
/// Checks whether or not this bucket is empty.
pub fn is_empty(&self) -> bool {
    let guard = &epoch_pin();
    let tail = self.tail.load(Ordering::Acquire, guard);
    // No tail block at all means nothing has ever been pushed.
    match unsafe { tail.as_ref() } {
        None => true,
        Some(tail_block) => tail_block.len() == 0 && tail_block.prev_len(guard) == 0,
    }
}
/// Pushes an element into the bucket.
pub fn push(&self, value: T) {
let mut original = value;
@ -521,4 +544,23 @@ mod tests {
let snapshot = bucket.data();
assert_eq!(snapshot.len(), 0);
}
#[test]
fn test_bucket_len_and_prev_len() {
    let bucket = AtomicBucket::new();
    assert!(bucket.is_empty());

    let snapshot = bucket.data();
    assert_eq!(snapshot.len(), 0);

    // Push enough values to spill from the first block into a second one,
    // checking that `is_empty` holds the whole way through; this exercises
    // the `Block::prev_len` codepath.
    for value in 0..BLOCK_SIZE * 2 {
        bucket.push(value);
        assert!(!bucket.is_empty());
    }
}
}

View File

@ -116,13 +116,13 @@ impl Snapshotter {
};
for (dk, _) in metrics.into_iter() {
if let Some(h) = handles.get(&dk) {
if let Some((_, h)) = handles.get(&dk) {
collect_metric(dk, h, &self.units, &self.descs, &mut snapshot);
}
}
}
None => {
for (dk, h) in handles.into_iter() {
for (dk, (_, h)) in handles.into_iter() {
collect_metric(dk, &h, &self.units, &self.descs, &mut snapshot);
}
}

View File

@ -107,6 +107,16 @@ impl Handle {
}
}
/// Checks whether this handle, viewed as a histogram, is empty.
///
/// Panics if this handle is not a histogram.
pub fn read_histogram_is_empty(&self) -> bool {
    if let Handle::Histogram(bucket) = self {
        bucket.is_empty()
    } else {
        panic!("tried to read as histogram")
    }
}
/// Reads this handle as a histogram incrementally into a closure, and clears the histogram.
///
/// The closure `f` passed in is invoked multiple times with slices of values present in the

View File

@ -1,7 +1,35 @@
use core::hash::Hash;
use core::{
hash::Hash,
sync::atomic::{AtomicUsize, Ordering},
};
use dashmap::DashMap;
use std::collections::HashMap;
/// A handle paired with an atomic generation counter.
///
/// The generation is bumped every time the handle is operated on, which lets
/// callers detect whether an entry has changed since it was last observed.
#[derive(Debug)]
struct Generational<H>(AtomicUsize, H);

impl<H: Clone> Generational<H> {
    /// Wraps `h` with a generation counter starting at zero.
    pub fn new(h: H) -> Generational<H> {
        Self(AtomicUsize::new(0), h)
    }

    /// Bumps the generation counter by one.
    pub fn increment_generation(&self) {
        self.0.fetch_add(1, Ordering::Release);
    }

    /// Returns the current value of the generation counter.
    pub fn get_generation(&self) -> usize {
        self.0.load(Ordering::Acquire)
    }

    /// Borrows the wrapped handle.
    pub fn get_inner(&self) -> &H {
        &self.1
    }

    /// Returns the current generation together with a clone of the handle.
    pub fn to_owned(&self) -> (usize, H) {
        (self.0.load(Ordering::Acquire), self.1.clone())
    }
}
/// A high-performance metric registry.
///
/// `Registry` provides the ability to maintain a central listing of metrics mapped by a given key.
@ -14,17 +42,20 @@ use std::collections::HashMap;
/// update the actual metric value(s) as needed. `Handle`, from this crate, is a solid default
/// choice.
///
/// `Registry` handles deduplicating metrics, and will return the `Identifier` for an existing
/// metric if a caller attempts to reregister it.
/// As well, handles have an associated generation counter which is incremented any time an entry is
/// operated on. This generation is returned with the handle when querying the registry, and can be
/// used in order to delete a handle from the registry, allowing callers to prune old/stale handles
/// over time.
///
/// `Registry` is optimized for reads.
pub struct Registry<K, H> {
map: DashMap<K, H>,
map: DashMap<K, Generational<H>>,
}
impl<K, H> Registry<K, H>
where
K: Eq + Hash + Clone,
K: Eq + Hash + Clone + 'static,
H: Clone + 'static,
{
/// Creates a new `Registry`.
pub fn new() -> Self {
@ -44,23 +75,34 @@ where
I: FnOnce() -> H,
O: FnOnce(&H) -> V,
{
let valref = self.map.entry(key).or_insert_with(init);
op(valref.value())
let valref = self.map.entry(key).or_insert_with(|| {
let value = init();
Generational::new(value)
});
let value = valref.value();
let result = op(value.get_inner());
value.increment_generation();
result
}
/// Deletes a handle from the registry.
///
/// The generation of a given key is passed along when querying the registry via
/// [`get_handles`]. If the generation given here does not match the entry's current
/// generation, the handle is left in place and `false` is returned.
pub fn delete(&self, key: &K, generation: usize) -> bool {
    let removed = self
        .map
        .remove_if(key, |_, entry| generation == entry.get_generation());
    removed.is_some()
}
}
impl<K, H> Registry<K, H>
where
K: Eq + Hash + Clone + 'static,
H: Clone + 'static,
{
/// Gets a map of all present handles, mapped by key.
///
/// Handles must implement `Clone`. This map is a point-in-time snapshot of the registry.
pub fn get_handles(&self) -> HashMap<K, H> {
pub fn get_handles(&self) -> HashMap<K, (usize, H)> {
self.map
.iter()
.map(|item| (item.key().clone(), item.value().clone()))
.map(|item| (item.key().clone(), item.value().to_owned()))
.collect()
}
}