diff --git a/metrics-exporter-prometheus/Cargo.toml b/metrics-exporter-prometheus/Cargo.toml index 0466d8f..773aa82 100644 --- a/metrics-exporter-prometheus/Cargo.toml +++ b/metrics-exporter-prometheus/Cargo.toml @@ -25,6 +25,7 @@ metrics-util = { version = "0.4.0-alpha.1", path = "../metrics-util" } hdrhistogram = "7.1" parking_lot = "0.11" thiserror = "1.0" +quanta = "0.6" # Optional hyper = { version = "0.13", default-features = false, features = ["tcp"], optional = true } diff --git a/metrics-exporter-prometheus/examples/prometheus_server.rs b/metrics-exporter-prometheus/examples/prometheus_server.rs index 3791d27..27f2841 100644 --- a/metrics-exporter-prometheus/examples/prometheus_server.rs +++ b/metrics-exporter-prometheus/examples/prometheus_server.rs @@ -11,6 +11,7 @@ fn main() { let builder = PrometheusBuilder::new(); builder + .idle_timeout(Some(Duration::from_secs(10))) .install() .expect("failed to install Prometheus recorder"); @@ -32,6 +33,8 @@ fn main() { let mut clock = Clock::new(); let mut last = None; + increment!("idle_metric"); + // Loop over and over, pretending to do some work. loop { increment!("tcp_server_loops", "system" => "foo"); diff --git a/metrics-exporter-prometheus/src/lib.rs b/metrics-exporter-prometheus/src/lib.rs index bc0c8b0..3c4bb5d 100644 --- a/metrics-exporter-prometheus/src/lib.rs +++ b/metrics-exporter-prometheus/src/lib.rs @@ -12,13 +12,16 @@ use metrics::{Key, Recorder, SetRecorderError, Unit}; use metrics_util::{ parse_quantiles, CompositeKey, Handle, Histogram, MetricKind, Quantile, Registry, }; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; +use quanta::{Clock, Instant}; use std::io; use std::iter::FromIterator; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::ops::DerefMut; use std::sync::Arc; #[cfg(feature = "tokio-exporter")] use std::thread; +use std::time::Duration; use std::{collections::HashMap, time::SystemTime}; use thiserror::Error as ThisError; #[cfg(feature = "tokio-exporter")] @@ -66,8 +69,10 @@ struct Snapshot { pub distributions: HashMap, Distribution>>, } -struct Inner { +pub(crate) struct Inner { registry: PrometheusRegistry, + recency: Mutex<(Clock, HashMap)>, + idle_timeout: Option, distributions: RwLock, Distribution>>>, quantiles: Vec, buckets: Vec, @@ -80,9 +85,44 @@ impl Inner { &self.registry } + fn should_store( + &self, + key: &CompositeKey, + current_gen: usize, + clock: &mut Clock, + recency: &mut HashMap, + ) -> bool { + if let Some(idle_timeout) = self.idle_timeout { + let now = clock.now(); + if let Some((last_gen, last_update)) = recency.get_mut(&key) { + // If the value is the same as the latest value we have internally, and + // we're over the idle timeout period, then remove it and continue. + if *last_gen == current_gen { + if (now - *last_update) > idle_timeout { + // If the delete returns false, that means that our generation counter is + // out-of-date, and that the metric has been updated since, so we don't + // actually want to delete it yet. + if self.registry.delete(&key, current_gen) { + return false; + } + } + } else { + // Value has changed, so mark it such. + *last_update = now; + } + } else { + recency.insert(key.clone(), (current_gen, now)); + } + } + + true + } + fn get_recent_metrics(&self) -> Snapshot { let metrics = self.registry.get_handles(); + let mut rg = self.recency.lock(); + let (clock, recency) = rg.deref_mut(); let mut counters = HashMap::new(); let mut gauges = HashMap::new(); @@ -93,64 +133,76 @@ impl Inner { .unwrap_or_else(|| vec![]); sorted_overrides.sort_by(|(a, _), (b, _)| b.len().cmp(&a.len())); - for (key, handle) in metrics.into_iter() { - let (kind, key) = key.into_parts(); - let (name, labels) = key_to_parts(key); - - match kind { + for (key, (gen, handle)) in metrics.into_iter() { + match key.kind() { MetricKind::Counter => { - let entry = counters - .entry(name) - .or_insert_with(|| HashMap::new()) - .entry(labels) - .or_insert(0); - - *entry = handle.read_counter(); + let value = handle.read_counter(); + if self.should_store(&key, gen, clock, recency) { + let (_, key) = key.into_parts(); + let (name, labels) = key_to_parts(key); + let entry = counters + .entry(name) + .or_insert_with(|| HashMap::new()) + .entry(labels) + .or_insert(0); + *entry = value; + } } MetricKind::Gauge => { - let entry = gauges - .entry(name) - .or_insert_with(|| HashMap::new()) - .entry(labels) - .or_insert(0.0); - - *entry = handle.read_gauge(); + let value = handle.read_gauge(); + if self.should_store(&key, gen, clock, recency) { + let (_, key) = key.into_parts(); + let (name, labels) = key_to_parts(key); + let entry = gauges + .entry(name) + .or_insert_with(|| HashMap::new()) + .entry(labels) + .or_insert(0.0); + *entry = value; + } } MetricKind::Histogram => { - let buckets = sorted_overrides - .iter() - .find(|(k, _)| name.ends_with(*k)) - .map(|(_, buckets)| *buckets) - .unwrap_or(&self.buckets); + if self.should_store(&key, gen, clock, recency) { + let (_, key) = key.into_parts(); + let (name, labels) = key_to_parts(key); - let mut wg = self.distributions.write(); - let entry = wg - .entry(name.clone()) - .or_insert_with(|| HashMap::new()) - .entry(labels) - .or_insert_with(|| match buckets.is_empty() { - false => { - let histogram = Histogram::new(buckets) - .expect("failed to create histogram with buckets defined"); - Distribution::Histogram(histogram) - } - true => { - let summary = - HdrHistogram::new(3).expect("failed to create histogram"); - Distribution::Summary(summary, 0) - } - }); + let buckets = sorted_overrides + .iter() + .find(|(k, _)| name.ends_with(*k)) + .map(|(_, buckets)| *buckets) + .unwrap_or(&self.buckets); - match entry { - Distribution::Histogram(histogram) => handle - .read_histogram_with_clear(|samples| histogram.record_many(samples)), - Distribution::Summary(summary, sum) => { - handle.read_histogram_with_clear(|samples| { - for sample in samples { - let _ = summary.record(*sample); - *sum += *sample; + let mut wg = self.distributions.write(); + let entry = wg + .entry(name.clone()) + .or_insert_with(|| HashMap::new()) + .entry(labels) + .or_insert_with(|| match buckets.is_empty() { + false => { + let histogram = Histogram::new(buckets) + .expect("failed to create histogram with buckets defined"); + Distribution::Histogram(histogram) } - }) + true => { + let summary = + HdrHistogram::new(3).expect("failed to create histogram"); + Distribution::Summary(summary, 0) + } + }); + + match entry { + Distribution::Histogram(histogram) => { + handle.read_histogram_with_clear(|samples| { + histogram.record_many(samples) + }) + } + Distribution::Summary(summary, sum) => handle + .read_histogram_with_clear(|samples| { + for sample in samples { + let _ = summary.record(*sample); + *sum += *sample; + } + }), } } } @@ -371,6 +423,7 @@ pub struct PrometheusBuilder { listen_address: SocketAddr, quantiles: Vec, buckets: Vec, + idle_timeout: Option, buckets_by_name: Option>>, } @@ -383,6 +436,7 @@ impl PrometheusBuilder { listen_address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 9000), quantiles, buckets: vec![], + idle_timeout: None, buckets_by_name: None, } } @@ -422,6 +476,17 @@ impl PrometheusBuilder { self } + /// Sets the idle timeout for metrics. + /// + /// If a metric hasn't been updated within this timeout, it will be removed from the registry + /// and in turn removed from the normal scrape output until the metric is emitted again. This + /// behavior is driven by requests to generate rendered output, and so metrics will not be + /// removed unless a request has been made recently enough to prune the idle metrics. + pub fn idle_timeout(mut self, timeout: Option) -> Self { + self.idle_timeout = timeout; + self + } + /// Sets the buckets for a specific metric, overidding the default. /// /// The match is suffix-based, and the longest match found will be used. @@ -470,8 +535,14 @@ impl PrometheusBuilder { /// Builds the recorder and returns it. /// This function is only enabled when default features are not set. pub fn build(self) -> Result { + self.build_with_clock(Clock::new()) + } + + pub(crate) fn build_with_clock(self, clock: Clock) -> Result { let inner = Arc::new(Inner { registry: Registry::new(), + recency: Mutex::new((clock, HashMap::new())), + idle_timeout: self.idle_timeout, distributions: RwLock::new(HashMap::new()), quantiles: self.quantiles.clone(), buckets: self.buckets.clone(), @@ -496,12 +567,14 @@ impl PrometheusBuilder { ) -> Result< ( PrometheusRecorder, - impl Future> + Send + Sync + 'static, + impl Future> + Send + 'static, ), Error, > { let inner = Arc::new(Inner { registry: Registry::new(), + recency: Mutex::new((Clock::new(), HashMap::new())), + idle_timeout: self.idle_timeout, distributions: RwLock::new(HashMap::new()), quantiles: self.quantiles.clone(), buckets: self.buckets.clone(), @@ -593,15 +666,10 @@ impl Recorder for PrometheusRecorder { } fn key_to_parts(key: Key) -> (String, Vec) { - let name = key.name(); - let labels = key.labels(); let sanitize = |c| c == '.' || c == '=' || c == '{' || c == '}' || c == '+' || c == '-'; - let name = name - .parts() - .map(|s| s.replace(sanitize, "_")) - .collect::>() - .join("_"); - let labels = labels + let name = key.name().to_string().replace(sanitize, "_"); + let labels = key + .labels() .into_iter() .map(|label| { let k = label.key(); diff --git a/metrics-util/src/bucket.rs b/metrics-util/src/bucket.rs index e3fc6ae..372a80c 100644 --- a/metrics-util/src/bucket.rs +++ b/metrics-util/src/bucket.rs @@ -38,6 +38,17 @@ impl Block { } } + // Gets the length of the previous block, if it exists. + pub(crate) fn prev_len(&self, guard: &Guard) -> usize { + let tail = self.prev.load(Ordering::Acquire, guard); + if tail.is_null() { + return 0; + } + + let tail_block = unsafe { tail.deref() }; + tail_block.len() + } + /// Gets the current length of this block. pub fn len(&self) -> usize { self.read.load(Ordering::Acquire).trailing_ones() as usize @@ -123,6 +134,18 @@ impl AtomicBucket { } } + /// Checks whether or not this bucket is empty. + pub fn is_empty(&self) -> bool { + let guard = &epoch_pin(); + let tail = self.tail.load(Ordering::Acquire, guard); + if tail.is_null() { + return true; + } + + let tail_block = unsafe { tail.deref() }; + tail_block.len() == 0 && tail_block.prev_len(&guard) == 0 + } + /// Pushes an element into the bucket. pub fn push(&self, value: T) { let mut original = value; @@ -521,4 +544,23 @@ mod tests { let snapshot = bucket.data(); assert_eq!(snapshot.len(), 0); } + + #[test] + fn test_bucket_len_and_prev_len() { + let bucket = AtomicBucket::new(); + assert!(bucket.is_empty()); + + let snapshot = bucket.data(); + assert_eq!(snapshot.len(), 0); + + // Just making sure that `is_empty` holds as we go from + // the first block, to the second block, to exercise the + // `Block::prev_len` codepath. + let mut i = 0; + while i < BLOCK_SIZE * 2 { + bucket.push(i); + assert!(!bucket.is_empty()); + i += 1; + } + } } diff --git a/metrics-util/src/debugging.rs b/metrics-util/src/debugging.rs index 4c6ab66..2f88fa2 100644 --- a/metrics-util/src/debugging.rs +++ b/metrics-util/src/debugging.rs @@ -116,13 +116,13 @@ impl Snapshotter { }; for (dk, _) in metrics.into_iter() { - if let Some(h) = handles.get(&dk) { + if let Some((_, h)) = handles.get(&dk) { collect_metric(dk, h, &self.units, &self.descs, &mut snapshot); } } } None => { - for (dk, h) in handles.into_iter() { + for (dk, (_, h)) in handles.into_iter() { collect_metric(dk, &h, &self.units, &self.descs, &mut snapshot); } } diff --git a/metrics-util/src/handle.rs b/metrics-util/src/handle.rs index 2aad114..2c9e8b4 100644 --- a/metrics-util/src/handle.rs +++ b/metrics-util/src/handle.rs @@ -107,6 +107,16 @@ impl Handle { } } + /// Reads this handle as a histogram, and whether or not it's empty. + /// + /// Panics if this handle is not a histogram. + pub fn read_histogram_is_empty(&self) -> bool { + match self { + Handle::Histogram(bucket) => bucket.is_empty(), + _ => panic!("tried to read as histogram"), + } + } + /// Reads this handle as a histogram incrementally into a closure, and clears the histogram. /// /// The closure `f` passed in is invoked multiple times with slices of values present in the diff --git a/metrics-util/src/registry.rs b/metrics-util/src/registry.rs index ebbee01..ec81341 100644 --- a/metrics-util/src/registry.rs +++ b/metrics-util/src/registry.rs @@ -1,7 +1,35 @@ -use core::hash::Hash; +use core::{ + hash::Hash, + sync::atomic::{AtomicUsize, Ordering}, +}; use dashmap::DashMap; use std::collections::HashMap; +#[derive(Debug)] +struct Generational(AtomicUsize, H); + +impl Generational { + pub fn new(h: H) -> Generational { + Generational(AtomicUsize::new(0), h) + } + + pub fn increment_generation(&self) { + self.0.fetch_add(1, Ordering::Release); + } + + pub fn get_generation(&self) -> usize { + self.0.load(Ordering::Acquire) + } + + pub fn get_inner(&self) -> &H { + &self.1 + } + + pub fn to_owned(&self) -> (usize, H) { + (self.get_generation(), self.get_inner().clone()) + } +} + /// A high-performance metric registry. /// /// `Registry` provides the ability to maintain a central listing of metrics mapped by a given key. @@ -14,17 +42,20 @@ use std::collections::HashMap; /// update the actual metric value(s) as needed. `Handle`, from this crate, is a solid default /// choice. /// -/// `Registry` handles deduplicating metrics, and will return the `Identifier` for an existing -/// metric if a caller attempts to reregister it. +/// As well, handles have an associated generation counter which is incremented any time an entry is +/// operated on. This generation is returned with the handle when querying the registry, and can be +/// used in order to delete a handle from the registry, allowing callers to prune old/stale handles +/// over time. /// /// `Registry` is optimized for reads. pub struct Registry { - map: DashMap, + map: DashMap>, } impl Registry where - K: Eq + Hash + Clone, + K: Eq + Hash + Clone + 'static, + H: Clone + 'static, { /// Creates a new `Registry`. pub fn new() -> Self { @@ -44,23 +75,34 @@ where I: FnOnce() -> H, O: FnOnce(&H) -> V, { - let valref = self.map.entry(key).or_insert_with(init); - op(valref.value()) + let valref = self.map.entry(key).or_insert_with(|| { + let value = init(); + Generational::new(value) + }); + let value = valref.value(); + let result = op(value.get_inner()); + value.increment_generation(); + result + } + + /// Deletes a handle from the registry. + /// + /// The generation of a given key is passed along when querying the registry via + /// [`get_handles`]. If the generation given here does not match the current generation, then + /// the handle will not be removed. + pub fn delete(&self, key: &K, generation: usize) -> bool { + self.map + .remove_if(key, |_, g| g.get_generation() == generation) + .is_some() } -} -impl Registry -where - K: Eq + Hash + Clone + 'static, - H: Clone + 'static, -{ /// Gets a map of all present handles, mapped by key. /// /// Handles must implement `Clone`. This map is a point-in-time snapshot of the registry. - pub fn get_handles(&self) -> HashMap { + pub fn get_handles(&self) -> HashMap { self.map .iter() - .map(|item| (item.key().clone(), item.value().clone())) + .map(|item| (item.key().clone(), item.value().to_owned())) .collect() } }