From e614847de65865db72c7a174c2f179cbe936364a Mon Sep 17 00:00:00 2001 From: jean-airoldie <25088801+jean-airoldie@users.noreply.github.com> Date: Wed, 17 Jul 2019 09:06:45 -0400 Subject: [PATCH] core: Recorder -> Observer (#35) --- Cargo.toml | 4 +- metrics-core/src/lib.rs | 101 +++++++----------- metrics-exporter-http/src/lib.rs | 56 ++++------ metrics-exporter-log/src/lib.rs | 85 ++++++--------- .../.gitignore | 0 .../CODE_OF_CONDUCT.md | 0 .../Cargo.toml | 2 +- .../LICENSE | 0 .../README.md | 0 .../src/lib.rs | 39 ++++--- .../.gitignore | 0 .../CODE_OF_CONDUCT.md | 0 .../Cargo.toml | 2 +- .../LICENSE | 0 .../README.md | 0 .../src/lib.rs | 66 ++++++------ metrics-runtime/Cargo.toml | 8 +- metrics-runtime/examples/benchmark.rs | 3 +- metrics-runtime/examples/facade.rs | 3 +- metrics-runtime/src/builder.rs | 4 +- metrics-runtime/src/common.rs | 12 ++- metrics-runtime/src/control.rs | 83 +++----------- metrics-runtime/src/data/snapshot.rs | 85 +-------------- metrics-runtime/src/lib.rs | 27 +++-- metrics-runtime/src/observers.rs | 8 ++ metrics-runtime/src/receiver.rs | 3 +- metrics-runtime/src/recorders.rs | 8 -- metrics-runtime/src/registry/metric.rs | 22 +++- metrics-runtime/src/sink.rs | 4 +- metrics-util/src/bucket.rs | 41 ++++--- metrics/src/lib.rs | 6 +- 31 files changed, 258 insertions(+), 414 deletions(-) rename {metrics-recorder-prometheus => metrics-observer-prometheus}/.gitignore (100%) rename {metrics-recorder-prometheus => metrics-observer-prometheus}/CODE_OF_CONDUCT.md (100%) rename {metrics-recorder-prometheus => metrics-observer-prometheus}/Cargo.toml (93%) rename {metrics-recorder-prometheus => metrics-observer-prometheus}/LICENSE (100%) rename {metrics-recorder-prometheus => metrics-observer-prometheus}/README.md (100%) rename {metrics-recorder-prometheus => metrics-observer-prometheus}/src/lib.rs (85%) rename {metrics-recorder-text => metrics-observer-text}/.gitignore (100%) rename {metrics-recorder-text => metrics-observer-text}/CODE_OF_CONDUCT.md (100%) rename {metrics-recorder-text => metrics-observer-text}/Cargo.toml (94%) rename {metrics-recorder-text => metrics-observer-text}/LICENSE (100%) rename {metrics-recorder-text => metrics-observer-text}/README.md (100%) rename {metrics-recorder-text => metrics-observer-text}/src/lib.rs (83%) create mode 100644 metrics-runtime/src/observers.rs delete mode 100644 metrics-runtime/src/recorders.rs diff --git a/Cargo.toml b/Cargo.toml index ade6a34..ee6f001 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,6 @@ members = [ "metrics-util", "metrics-exporter-log", "metrics-exporter-http", - "metrics-recorder-text", - "metrics-recorder-prometheus", + "metrics-observer-text", + "metrics-observer-prometheus", ] diff --git a/metrics-core/src/lib.rs b/metrics-core/src/lib.rs index 4b5928f..1118d16 100644 --- a/metrics-core/src/lib.rs +++ b/metrics-core/src/lib.rs @@ -33,11 +33,7 @@ //! Histograms are a convenient way to measure behavior not only at the median, but at the edges of //! normal operating behavior. #![deny(missing_docs)] -use futures::future::Future; -use std::borrow::Cow; -use std::fmt; -use std::slice::Iter; -use std::time::Duration; +use std::{borrow::Cow, fmt, slice::Iter, time::Duration}; /// An allocation-optimized string. /// @@ -149,16 +145,15 @@ impl Key { impl fmt::Display for Key { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self.labels.is_empty() { - true => write!(f, "Key({}", self.name), - false => { - let kv_pairs = self - .labels - .iter() - .map(|label| format!("{} = {}", label.0, label.1)) - .collect::>(); - write!(f, "Key({}, [{}])", self.name, kv_pairs.join(", ")) - } + if self.labels.is_empty() { + write!(f, "Key({}", self.name) + } else { + let kv_pairs = self + .labels + .iter() + .map(|label| format!("{} = {}", label.0, label.1)) + .collect::>(); + write!(f, "Key({}, [{}])", self.name, kv_pairs.join(", ")) } } } @@ -255,80 +250,64 @@ impl AsNanoseconds for Duration { } } -/// A value that records metrics. -pub trait Recorder { - /// Records a counter. +/// A value that observes metrics. +pub trait Observer { + /// The method called when a counter is observed. /// - /// From the perspective of an recorder, a counter and gauge are essentially identical, insofar + /// From the perspective of an observer, a counter and gauge are essentially identical, insofar /// as they are both a single value tied to a key. From the perspective of a collector, /// counters and gauges usually have slightly different modes of operation. /// /// For the sake of flexibility on the exporter side, both are provided. - fn record_counter(&mut self, key: Key, value: u64); + fn observe_counter(&mut self, key: Key, value: u64); - /// Records a gauge. + /// The method called when a gauge is observed. /// - /// From the perspective of a recorder, a counter and gauge are essentially identical, insofar + /// From the perspective of a observer, a counter and gauge are essentially identical, insofar /// as they are both a single value tied to a key. From the perspective of a collector, /// counters and gauges usually have slightly different modes of operation. /// /// For the sake of flexibility on the exporter side, both are provided. - fn record_gauge(&mut self, key: Key, value: i64); + fn observe_gauge(&mut self, key: Key, value: i64); - /// Records a histogram. + /// The method called when an histogram is observed. /// - /// Recorders are expected to tally their own histogram views, so this will be called with all + /// Observers are expected to tally their own histogram views, so this will be called with all /// of the underlying observed values, and callers will need to process them accordingly. /// /// There is no guarantee that this method will not be called multiple times for the same key. - fn record_histogram(&mut self, key: Key, values: &[u64]); + fn observe_histogram(&mut self, key: Key, values: &[u64]); } -/// A value that can build a recorder. +/// A value that can build an observer. /// -/// Recorders are intended to be single-use containers for rendering a snapshot in a particular -/// format. As many systems are multi-threaded, we can't easily share a single recorder amongst -/// multiple threads, and so we create a recorder per snapshot, tying them together. +/// Observers are containers used for rendering a snapshot in a particular format. +/// As many systems are multi-threaded, we can't easily share a single recorder amongst +/// multiple threads, and so we create a recorder per observation, tying them together. /// -/// A builder allows us to generate a recorder on demand, giving each specific recorder an -/// interface by which they can do any necessary configuration, initialization, etc of the recorder -/// before handing it over to the exporter. +/// A builder allows us to generate an observer on demand, giving each specific recorder an +/// interface by which they can do any necessary configuration, initialization, etc of the +/// observer before handing it over to the exporter. pub trait Builder { - /// The recorder created by this builder. - type Output: Recorder; + /// The observer created by this builder. + type Output: Observer; /// Creates a new recorder. fn build(&self) -> Self::Output; } -/// A value that holds a point-in-time view of collected metrics. -pub trait Snapshot { - /// Records the snapshot to the given recorder. - fn record(&self, recorder: &mut R); +/// A value that can produce a `T` by draining its content. +/// +/// After being drained, the value should be ready to be reused. +pub trait Drain { + /// Drain the `Observer`, producing a `T`. + fn drain(&mut self) -> T; } -/// A value that can provide on-demand snapshots. -pub trait SnapshotProvider { - /// Snapshot given by the provider. - type Snapshot: Snapshot; - /// Errors produced during generation. - type SnapshotError; - - /// Gets a snapshot. - fn get_snapshot(&self) -> Result; -} - -/// A value that can provide on-demand snapshots asynchronously. -pub trait AsyncSnapshotProvider { - /// Snapshot given by the provider. - type Snapshot: Snapshot; - /// Errors produced during generation. - type SnapshotError; - /// The future response value. - type SnapshotFuture: Future; - - /// Gets a snapshot asynchronously. - fn get_snapshot_async(&self) -> Self::SnapshotFuture; +/// A value whose metrics can be observed by an `Observer`. +pub trait Observe { + /// Observe point-in-time view of the collected metrics. + fn observe(&self, observer: &mut O); } /// Helper macro for generating a set of labels. diff --git a/metrics-exporter-http/src/lib.rs b/metrics-exporter-http/src/lib.rs index 31e3022..85bf357 100644 --- a/metrics-exporter-http/src/lib.rs +++ b/metrics-exporter-http/src/lib.rs @@ -1,7 +1,7 @@ //! Exports metrics over HTTP. //! -//! This exporter can utilize recorders that are able to be converted to a textual representation -//! via [`Into`]. It will respond to any requests, regardless of the method or path. +//! This exporter can utilize observers that are able to be converted to a textual representation +//! via [`Drain`]. It will respond to any requests, regardless of the method or path. //! //! # Run Modes //! - `run` can be used to block the current thread, running the HTTP server on the configured @@ -13,13 +13,13 @@ extern crate log; use hyper::rt::run as hyper_run; -use hyper::rt::Future; -use hyper::service::service_fn; -use hyper::{Body, Response, Server}; -use metrics_core::{AsyncSnapshotProvider, Builder, Snapshot}; -use std::error::Error; -use std::net::SocketAddr; -use std::sync::Arc; +use hyper::{ + rt::Future, + service::service_fn_ok, + {Body, Response, Server}, +}; +use metrics_core::{Builder, Drain, Observe, Observer}; +use std::{net::SocketAddr, sync::Arc}; /// Exports metrics over HTTP. pub struct HttpExporter { @@ -30,15 +30,13 @@ pub struct HttpExporter { impl HttpExporter where - C: AsyncSnapshotProvider + Send + Sync + 'static, - C::SnapshotFuture: Send + Sync + 'static, - C::SnapshotError: Error + Send + Sync + 'static, + C: Observe + Send + Sync + 'static, B: Builder + Send + Sync + 'static, - B::Output: Into, + B::Output: Drain + Observer, { /// Creates a new [`HttpExporter`] that listens on the given `address`. /// - /// Recorders expose their output by being converted into strings. + /// Observers expose their output by being converted into strings. pub fn new(controller: C, builder: B, address: SocketAddr) -> Self { HttpExporter { controller, @@ -50,7 +48,7 @@ where /// Run the exporter on the current thread. /// /// This starts an HTTP server on the `address` the exporter was originally configured with, - /// responding to any request with the output of the configured recorder. + /// responding to any request with the output of the configured observer. pub fn run(self) { let server = self.into_future(); hyper_run(server); @@ -59,7 +57,7 @@ where /// Converts this exporter into a future which can be driven externally. /// /// This starts an HTTP server on the `address` the exporter was originally configured with, - /// responding to any request with the output of the configured recorder. + /// responding to any request with the output of the configured observer. pub fn into_future(self) -> impl Future { let controller = self.controller; let builder = self.builder; @@ -75,33 +73,23 @@ fn build_hyper_server( address: SocketAddr, ) -> impl Future where - C: AsyncSnapshotProvider + Send + Sync + 'static, - C::SnapshotFuture: Send + Sync + 'static, - C::SnapshotError: Error + Send + Sync + 'static, + C: Observe + Send + Sync + 'static, B: Builder + Send + Sync + 'static, - B::Output: Into, + B::Output: Drain + Observer, { let builder = Arc::new(builder); let controller = Arc::new(controller); let service = move || { - let controller = controller.clone(); + let controller2 = controller.clone(); let builder = builder.clone(); - service_fn(move |_| { - let builder = builder.clone(); + service_fn_ok(move |_| { + let mut observer = builder.build(); - controller - .get_snapshot_async() - .then(move |result| match result { - Ok(snapshot) => { - let mut recorder = builder.build(); - snapshot.record(&mut recorder); - let output = recorder.into(); - Ok(Response::new(Body::from(output))) - } - Err(e) => Err(e), - }) + controller2.observe(&mut observer); + let output = observer.drain(); + Response::new(Body::from(output)) }) }; diff --git a/metrics-exporter-log/src/lib.rs b/metrics-exporter-log/src/lib.rs index b5ca4af..c591e11 100644 --- a/metrics-exporter-log/src/lib.rs +++ b/metrics-exporter-log/src/lib.rs @@ -1,7 +1,7 @@ //! Exports metrics via the `log` crate. //! -//! This exporter can utilize recorders that are able to be converted to a textual representation -//! via [`Into`]. It will emit that output by logging via the `log` crate at the specified +//! This exporter can utilize observers that are able to be converted to a textual representation +//! via [`Drain`]. It will emit that output by logging via the `log` crate at the specified //! level. //! //! # Run Modes @@ -15,89 +15,64 @@ extern crate log; use futures::prelude::*; use log::Level; -use metrics_core::{AsyncSnapshotProvider, Builder, Snapshot, SnapshotProvider}; -use std::error::Error; -use std::thread; -use std::time::Duration; +use metrics_core::{Builder, Drain, Observe, Observer}; +use std::{thread, time::Duration}; use tokio_timer::Interval; /// Exports metrics by converting them to a textual representation and logging them. -pub struct LogExporter { +pub struct LogExporter +where + B: Builder, +{ controller: C, - builder: B, + observer: B::Output, level: Level, + interval: Duration, } impl LogExporter where B: Builder, - B::Output: Into, + B::Output: Drain + Observer, + C: Observe, { /// Creates a new [`LogExporter`] that logs at the configurable level. /// - /// Recorders expose their output by being converted into strings. - pub fn new(controller: C, builder: B, level: Level) -> Self { + /// Observers expose their output by being converted into strings. + pub fn new(controller: C, builder: B, level: Level, interval: Duration) -> Self { LogExporter { controller, - builder, + observer: builder.build(), level, + interval, } } - /// Runs this exporter on the current thread, logging output on the given interval. - pub fn run(&mut self, interval: Duration) - where - C: SnapshotProvider, - C::SnapshotError: Error, - { + /// Runs this exporter on the current thread, logging output at the interval + /// given on construction. + pub fn run(&mut self) { loop { - thread::sleep(interval); + thread::sleep(self.interval); self.turn(); } } /// Run this exporter, logging output only once. - pub fn turn(&self) - where - C: SnapshotProvider, - C::SnapshotError: Error, - { - match self.controller.get_snapshot() { - Ok(snapshot) => { - let mut recorder = self.builder.build(); - snapshot.record(&mut recorder); - let output = recorder.into(); - log!(self.level, "{}", output); - } - Err(e) => log!(Level::Error, "failed to get snapshot: {}", e), - } + pub fn turn(&mut self) { + self.controller.observe(&mut self.observer); + let output = self.observer.drain(); + log!(self.level, "{}", output); } - /// Converts this exporter into a future which logs output on the given interval. - pub fn into_future(self, interval: Duration) -> impl Future - where - C: AsyncSnapshotProvider, - C::SnapshotError: Error, - { - let controller = self.controller; - let builder = self.builder; - let level = self.level; - - Interval::new_interval(interval) + /// Converts this exporter into a future which logs output at the intervel + /// given on construction. + pub fn into_future(mut self) -> impl Future { + Interval::new_interval(self.interval) .map_err(|_| ()) .for_each(move |_| { - let mut recorder = builder.build(); - - controller - .get_snapshot_async() - .and_then(move |snapshot| { - snapshot.record(&mut recorder); - let output = recorder.into(); - log!(level, "{}", output); - Ok(()) - }) - .map_err(|e| error!("failed to get snapshot: {}", e)) + self.turn(); + Ok(()) }) } } diff --git a/metrics-recorder-prometheus/.gitignore b/metrics-observer-prometheus/.gitignore similarity index 100% rename from metrics-recorder-prometheus/.gitignore rename to metrics-observer-prometheus/.gitignore diff --git a/metrics-recorder-prometheus/CODE_OF_CONDUCT.md b/metrics-observer-prometheus/CODE_OF_CONDUCT.md similarity index 100% rename from metrics-recorder-prometheus/CODE_OF_CONDUCT.md rename to metrics-observer-prometheus/CODE_OF_CONDUCT.md diff --git a/metrics-recorder-prometheus/Cargo.toml b/metrics-observer-prometheus/Cargo.toml similarity index 93% rename from metrics-recorder-prometheus/Cargo.toml rename to metrics-observer-prometheus/Cargo.toml index 69e7ca7..5e3c21f 100644 --- a/metrics-recorder-prometheus/Cargo.toml +++ b/metrics-observer-prometheus/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "metrics-recorder-prometheus" +name = "metrics-observer-prometheus" version = "0.2.2" authors = ["Toby Lawrence "] edition = "2018" diff --git a/metrics-recorder-prometheus/LICENSE b/metrics-observer-prometheus/LICENSE similarity index 100% rename from metrics-recorder-prometheus/LICENSE rename to metrics-observer-prometheus/LICENSE diff --git a/metrics-recorder-prometheus/README.md b/metrics-observer-prometheus/README.md similarity index 100% rename from metrics-recorder-prometheus/README.md rename to metrics-observer-prometheus/README.md diff --git a/metrics-recorder-prometheus/src/lib.rs b/metrics-observer-prometheus/src/lib.rs similarity index 85% rename from metrics-recorder-prometheus/src/lib.rs rename to metrics-observer-prometheus/src/lib.rs index 3f2428a..584582e 100644 --- a/metrics-recorder-prometheus/src/lib.rs +++ b/metrics-observer-prometheus/src/lib.rs @@ -1,10 +1,9 @@ //! Records metrics in the Prometheus exposition format. #![deny(missing_docs)] use hdrhistogram::Histogram; -use metrics_core::{Builder, Key, Label, Recorder}; +use metrics_core::{Builder, Drain, Key, Label, Observer}; use metrics_util::{parse_quantiles, Quantile}; -use std::collections::HashMap; -use std::time::SystemTime; +use std::{collections::HashMap, time::SystemTime}; /// Builder for [`PrometheusRecorder`]. pub struct PrometheusBuilder { @@ -20,7 +19,7 @@ impl PrometheusBuilder { /// /// The configured quantiles are used when rendering any histograms. pub fn new() -> Self { - Self::with_quantiles(&[0.0, 0.5, 0.9, 0.95, 0.99, 0.999, 1.0]) + Self::default() } /// Creates a new [`PrometheusBuilder`] with the given set of quantiles. @@ -36,10 +35,10 @@ impl PrometheusBuilder { } impl Builder for PrometheusBuilder { - type Output = PrometheusRecorder; + type Output = PrometheusObserver; fn build(&self) -> Self::Output { - PrometheusRecorder { + PrometheusObserver { quantiles: self.quantiles.clone(), histos: HashMap::new(), output: get_prom_expo_header(), @@ -47,15 +46,21 @@ impl Builder for PrometheusBuilder { } } +impl Default for PrometheusBuilder { + fn default() -> Self { + Self::with_quantiles(&[0.0, 0.5, 0.9, 0.95, 0.99, 0.999, 1.0]) + } +} + /// Records metrics in the Prometheus exposition format. -pub struct PrometheusRecorder { +pub struct PrometheusObserver { pub(crate) quantiles: Vec, pub(crate) histos: HashMap)>, pub(crate) output: String, } -impl Recorder for PrometheusRecorder { - fn record_counter(&mut self, key: Key, value: u64) { +impl Observer for PrometheusObserver { + fn observe_counter(&mut self, key: Key, value: u64) { let (name, labels) = key_to_parts(key); let full_name = render_labeled_name(&name, &labels); self.output.push_str("\n# TYPE "); @@ -67,7 +72,7 @@ impl Recorder for PrometheusRecorder { self.output.push_str("\n"); } - fn record_gauge(&mut self, key: Key, value: i64) { + fn observe_gauge(&mut self, key: Key, value: i64) { let (name, labels) = key_to_parts(key); let full_name = render_labeled_name(&name, &labels); self.output.push_str("\n# TYPE "); @@ -79,7 +84,7 @@ impl Recorder for PrometheusRecorder { self.output.push_str("\n"); } - fn record_histogram(&mut self, key: Key, values: &[u64]) { + fn observe_histogram(&mut self, key: Key, values: &[u64]) { let entry = self.histos.entry(key).or_insert_with(|| { let h = Histogram::::new(3).expect("failed to create histogram"); (0, h) @@ -87,24 +92,24 @@ impl Recorder for PrometheusRecorder { let (sum, h) = entry; for value in values { - h.record(*value).expect("failed to record histogram value"); + h.record(*value).expect("failed to observe histogram value"); *sum += *value; } } } -impl From for String { - fn from(r: PrometheusRecorder) -> String { - let mut output = r.output; +impl Drain for PrometheusObserver { + fn drain(&mut self) -> String { + let mut output: String = self.output.drain(..).collect(); - for (key, sh) in r.histos { + for (key, sh) in self.histos.drain() { let (sum, hist) = sh; let (name, labels) = key_to_parts(key); output.push_str("\n# TYPE "); output.push_str(name.as_str()); output.push_str(" summary\n"); - for quantile in &r.quantiles { + for quantile in &self.quantiles { let value = hist.value_at_quantile(quantile.value()); let mut labels = labels.clone(); labels.push(format!("quantile=\"{}\"", quantile.value())); diff --git a/metrics-recorder-text/.gitignore b/metrics-observer-text/.gitignore similarity index 100% rename from metrics-recorder-text/.gitignore rename to metrics-observer-text/.gitignore diff --git a/metrics-recorder-text/CODE_OF_CONDUCT.md b/metrics-observer-text/CODE_OF_CONDUCT.md similarity index 100% rename from metrics-recorder-text/CODE_OF_CONDUCT.md rename to metrics-observer-text/CODE_OF_CONDUCT.md diff --git a/metrics-recorder-text/Cargo.toml b/metrics-observer-text/Cargo.toml similarity index 94% rename from metrics-recorder-text/Cargo.toml rename to metrics-observer-text/Cargo.toml index 700c401..1a55363 100644 --- a/metrics-recorder-text/Cargo.toml +++ b/metrics-observer-text/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "metrics-recorder-text" +name = "metrics-observer-text" version = "0.2.2" authors = ["Toby Lawrence "] edition = "2018" diff --git a/metrics-recorder-text/LICENSE b/metrics-observer-text/LICENSE similarity index 100% rename from metrics-recorder-text/LICENSE rename to metrics-observer-text/LICENSE diff --git a/metrics-recorder-text/README.md b/metrics-observer-text/README.md similarity index 100% rename from metrics-recorder-text/README.md rename to metrics-observer-text/README.md diff --git a/metrics-recorder-text/src/lib.rs b/metrics-observer-text/src/lib.rs similarity index 83% rename from metrics-recorder-text/src/lib.rs rename to metrics-observer-text/src/lib.rs index 22b8922..ca21a37 100644 --- a/metrics-recorder-text/src/lib.rs +++ b/metrics-observer-text/src/lib.rs @@ -1,4 +1,4 @@ -//! Records metrics in a hierarchical, text-based format. +//! Observes metrics in a hierarchical, text-based format. //! //! Metric scopes are used to provide the hierarchy and indentation of metrics. As an example, for //! a snapshot with two metrics — `server.msgs_received` and `server.msgs_sent` — we would @@ -26,7 +26,7 @@ //! ## Histograms //! //! Histograms are rendered with a configurable set of quantiles that are provided when creating an -//! instance of `TextRecorder`. They are formatted using human-readable labels when displayed to +//! instance of `TextObserver`. They are formatted using human-readable labels when displayed to //! the user. For example, 0.0 is rendered as "min", 1.0 as "max", and anything in between using //! the common "pXXX" format i.e. a quantile of 0.5 or percentile of 50 would be p50, a quantile of //! 0.999 or percentile of 99.9 would be p999, and so on. @@ -44,10 +44,12 @@ //! #![deny(missing_docs)] use hdrhistogram::Histogram; -use metrics_core::{Builder, Key, Label, Recorder}; +use metrics_core::{Builder, Drain, Key, Label, Observer}; use metrics_util::{parse_quantiles, Quantile}; -use std::collections::{HashMap, VecDeque}; -use std::fmt::Display; +use std::{ + collections::{HashMap, VecDeque}, + fmt::Display, +}; /// Builder for [`TextRecorder`]. pub struct TextBuilder { @@ -57,13 +59,13 @@ pub struct TextBuilder { impl TextBuilder { /// Creates a new [`TextBuilder`] with a default set of quantiles. /// - /// Configures the recorder with these default quantiles: 0.0, 0.5, 0.9, 0.95, 0.99, 0.999, and + /// Configures the observer with these default quantiles: 0.0, 0.5, 0.9, 0.95, 0.99, 0.999, and /// 1.0. If you want to customize the quantiles used, you can call - /// [`TextBuilder::with_quantiles`]. + /// [`TextBuilder::with_quantiles`]. /// /// The configured quantiles are used when rendering any histograms. pub fn new() -> Self { - Self::with_quantiles(&[0.0, 0.5, 0.9, 0.95, 0.99, 0.999, 1.0]) + Self::default() } /// Creates a new [`TextBuilder`] with the given set of quantiles. @@ -79,10 +81,10 @@ impl TextBuilder { } impl Builder for TextBuilder { - type Output = TextRecorder; + type Output = TextObserver; fn build(&self) -> Self::Output { - TextRecorder { + TextObserver { quantiles: self.quantiles.clone(), structure: MetricsTree::with_level(0), histos: HashMap::new(), @@ -90,27 +92,33 @@ impl Builder for TextBuilder { } } +impl Default for TextBuilder { + fn default() -> Self { + Self::with_quantiles(&[0.0, 0.5, 0.9, 0.95, 0.99, 0.999, 1.0]) + } +} + /// Records metrics in a hierarchical, text-based format. -pub struct TextRecorder { +pub struct TextObserver { pub(crate) quantiles: Vec, pub(crate) structure: MetricsTree, pub(crate) histos: HashMap>, } -impl Recorder for TextRecorder { - fn record_counter(&mut self, key: Key, value: u64) { +impl Observer for TextObserver { + fn observe_counter(&mut self, key: Key, value: u64) { let (name_parts, name) = key_to_parts(key); let mut values = single_value_to_values(name, value); self.structure.insert(name_parts, &mut values); } - fn record_gauge(&mut self, key: Key, value: i64) { + fn observe_gauge(&mut self, key: Key, value: i64) { let (name_parts, name) = key_to_parts(key); let mut values = single_value_to_values(name, value); self.structure.insert(name_parts, &mut values); } - fn record_histogram(&mut self, key: Key, values: &[u64]) { + fn observe_histogram(&mut self, key: Key, values: &[u64]) { let entry = self .histos .entry(key) @@ -119,12 +127,11 @@ impl Recorder for TextRecorder { for value in values { entry .record(*value) - .expect("failed to record histogram value"); + .expect("failed to observe histogram value"); } } } -#[derive(Default)] struct MetricsTree { level: usize, current: Vec, @@ -164,15 +171,15 @@ impl MetricsTree { } } - pub fn into_output(self) -> String { + pub fn render(&mut self) -> String { let indent = " ".repeat(self.level); let mut output = String::new(); let mut sorted = self .current - .into_iter() + .drain(..) .map(SortEntry::Inline) - .chain(self.next.into_iter().map(|(k, v)| SortEntry::Nested(k, v))) + .chain(self.next.drain().map(|(k, v)| SortEntry::Nested(k, v))) .collect::>(); sorted.sort(); @@ -182,12 +189,12 @@ impl MetricsTree { output.push_str(s.as_str()); output.push_str("\n"); } - SortEntry::Nested(s, inner) => { + SortEntry::Nested(s, mut inner) => { output.push_str(indent.as_str()); output.push_str(s.as_str()); output.push_str(":\n"); - let layer_output = inner.into_output(); + let layer_output = inner.render(); output.push_str(layer_output.as_str()); } } @@ -197,15 +204,14 @@ impl MetricsTree { } } -impl From for String { - fn from(r: TextRecorder) -> String { - let mut structure = r.structure; - for (key, h) in r.histos { +impl Drain for TextObserver { + fn drain(&mut self) -> String { + for (key, h) in self.histos.drain() { let (name_parts, name) = key_to_parts(key); - let mut values = hist_to_values(name, h, &r.quantiles); - structure.insert(name_parts, &mut values); + let mut values = hist_to_values(name, h.clone(), &self.quantiles); + self.structure.insert(name_parts, &mut values); } - structure.into_output() + self.structure.render() } } @@ -215,7 +221,7 @@ enum SortEntry { } impl SortEntry { - fn name(&self) -> &String { + fn name(&self) -> &str { match self { SortEntry::Inline(s) => s, SortEntry::Nested(s, _) => s, diff --git a/metrics-runtime/Cargo.toml b/metrics-runtime/Cargo.toml index f4d0222..e4a099f 100644 --- a/metrics-runtime/Cargo.toml +++ b/metrics-runtime/Cargo.toml @@ -17,9 +17,9 @@ readme = "README.md" keywords = ["metrics", "telemetry", "histogram", "counter", "gauge"] [features] -default = ["exporters", "recorders"] +default = ["exporters", "observers"] exporters = ["metrics-exporter-log", "metrics-exporter-http"] -recorders = ["metrics-recorder-text", "metrics-recorder-prometheus"] +observers = ["metrics-observer-text", "metrics-observer-prometheus"] [[bench]] name = "histogram" @@ -38,8 +38,8 @@ futures = "^0.1" crossbeam-utils = "^0.6" metrics-exporter-log = { path = "../metrics-exporter-log", version = "^0.2", optional = true } metrics-exporter-http = { path = "../metrics-exporter-http", version = "^0.1", optional = true } -metrics-recorder-text = { path = "../metrics-recorder-text", version = "^0.2", optional = true } -metrics-recorder-prometheus = { path = "../metrics-recorder-prometheus", version = "^0.2", optional = true } +metrics-observer-text = { path = "../metrics-observer-text", version = "^0.2", optional = true } +metrics-observer-prometheus = { path = "../metrics-observer-prometheus", version = "^0.2", optional = true } [dev-dependencies] log = "^0.4" diff --git a/metrics-runtime/examples/benchmark.rs b/metrics-runtime/examples/benchmark.rs index 5c3721f..37ca752 100644 --- a/metrics-runtime/examples/benchmark.rs +++ b/metrics-runtime/examples/benchmark.rs @@ -8,7 +8,6 @@ extern crate metrics_runtime; use getopts::Options; use hdrhistogram::Histogram; -use metrics_core::SnapshotProvider; use metrics_runtime::{Receiver, Sink}; use quanta::Clock; use std::{ @@ -255,7 +254,7 @@ fn main() { let t1 = Instant::now(); let start = Instant::now(); - let _snapshot = controller.get_snapshot().unwrap(); + let _snapshot = controller.snapshot(); let end = Instant::now(); snapshot_hist.saturating_record(duration_as_nanos(end - start) as u64); diff --git a/metrics-runtime/examples/facade.rs b/metrics-runtime/examples/facade.rs index 640429e..cedc920 100644 --- a/metrics-runtime/examples/facade.rs +++ b/metrics-runtime/examples/facade.rs @@ -11,7 +11,6 @@ extern crate metrics; use getopts::Options; use hdrhistogram::Histogram; -use metrics_core::SnapshotProvider; use metrics_runtime::Receiver; use quanta::Clock; use std::{ @@ -192,7 +191,7 @@ fn main() { let t1 = Instant::now(); let start = Instant::now(); - let _snapshot = controller.get_snapshot().unwrap(); + let _snapshot = controller.snapshot(); let end = Instant::now(); snapshot_hist.saturating_record(duration_as_nanos(end - start) as u64); diff --git a/metrics-runtime/src/builder.rs b/metrics-runtime/src/builder.rs index af68ba5..0a65773 100644 --- a/metrics-runtime/src/builder.rs +++ b/metrics-runtime/src/builder.rs @@ -1,7 +1,5 @@ use crate::{config::Configuration, Receiver}; -use std::error::Error; -use std::fmt; -use std::time::Duration; +use std::{error::Error, fmt, time::Duration}; /// Errors during receiver creation. #[derive(Debug, Clone)] diff --git a/metrics-runtime/src/common.rs b/metrics-runtime/src/common.rs index 74595c0..ca5be34 100644 --- a/metrics-runtime/src/common.rs +++ b/metrics-runtime/src/common.rs @@ -2,10 +2,14 @@ use crate::data::AtomicWindowedHistogram; use metrics_core::{Key, ScopedString}; use metrics_util::StreamingIntegers; use quanta::Clock; -use std::ops::Deref; -use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; -use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::{ + ops::Deref, + sync::{ + atomic::{AtomicI64, AtomicU64, Ordering}, + Arc, + }, + time::{Duration, Instant}, +}; /// A scope, or context, for a metric. #[doc(hidden)] diff --git a/metrics-runtime/src/control.rs b/metrics-runtime/src/control.rs index e8c0a5f..91b898b 100644 --- a/metrics-runtime/src/control.rs +++ b/metrics-runtime/src/control.rs @@ -1,32 +1,12 @@ -use crate::data::Snapshot; -use crate::registry::{MetricRegistry, ScopeRegistry}; -use futures::prelude::*; -use metrics_core::{AsyncSnapshotProvider, SnapshotProvider}; -use std::error::Error; -use std::fmt; +use crate::{ + data::Snapshot, + registry::{MetricRegistry, ScopeRegistry}, +}; + +use metrics_core::{Observe, Observer}; + use std::sync::Arc; -/// Error during snapshot retrieval. -#[derive(Debug, Clone)] -pub enum SnapshotError { - /// The future was polled again after returning the snapshot. - AlreadyUsed, - - #[doc(hidden)] - _NonExhaustive, -} - -impl Error for SnapshotError {} - -impl fmt::Display for SnapshotError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - SnapshotError::AlreadyUsed => write!(f, "snapshot already returned from future"), - SnapshotError::_NonExhaustive => write!(f, "non-exhaustive matching"), - } - } -} - /// Handle for acquiring snapshots. /// /// `Controller` is [`metrics-core`]-compatible as a snapshot provider, both for synchronous and @@ -49,52 +29,15 @@ impl Controller { scope_registry, } } -} -impl SnapshotProvider for Controller { - type Snapshot = Snapshot; - type SnapshotError = SnapshotError; - - /// Gets a snapshot. - fn get_snapshot(&self) -> Result { - let snapshot = self.metric_registry.get_snapshot(); - Ok(snapshot) + /// Provide a snapshot of its collected metrics. + pub fn snapshot(&self) -> Snapshot { + self.metric_registry.snapshot() } } -impl AsyncSnapshotProvider for Controller { - type Snapshot = Snapshot; - type SnapshotError = SnapshotError; - type SnapshotFuture = SnapshotFuture; - - /// Gets a snapshot asynchronously. - fn get_snapshot_async(&self) -> Self::SnapshotFuture { - let snapshot = self.metric_registry.get_snapshot(); - SnapshotFuture::new(snapshot) - } -} - -/// A future representing collecting a snapshot. -pub struct SnapshotFuture { - snapshot: Option, -} - -impl SnapshotFuture { - pub fn new(snapshot: Snapshot) -> Self { - SnapshotFuture { - snapshot: Some(snapshot), - } - } -} - -impl Future for SnapshotFuture { - type Item = Snapshot; - type Error = SnapshotError; - - fn poll(&mut self) -> Poll { - self.snapshot - .take() - .ok_or(SnapshotError::AlreadyUsed) - .map(Async::Ready) +impl Observe for Controller { + fn observe(&self, observer: &mut O) { + self.metric_registry.observe(observer) } } diff --git a/metrics-runtime/src/data/snapshot.rs b/metrics-runtime/src/data/snapshot.rs index 9192295..5a5ac03 100644 --- a/metrics-runtime/src/data/snapshot.rs +++ b/metrics-runtime/src/data/snapshot.rs @@ -1,5 +1,5 @@ use crate::common::ValueSnapshot; -use metrics_core::{Key, Recorder, Snapshot as MetricsSnapshot}; +use metrics_core::Key; /// A point-in-time view of metric data. #[derive(Default, Debug)] @@ -22,86 +22,3 @@ impl Snapshot { self.measurements.len() != 0 } } - -impl MetricsSnapshot for Snapshot { - /// Records the snapshot to the given recorder. - fn record(&self, recorder: &mut R) { - for (key, snapshot) in &self.measurements { - let key = key.clone(); - match snapshot { - ValueSnapshot::Counter(value) => recorder.record_counter(key, *value), - ValueSnapshot::Gauge(value) => recorder.record_gauge(key, *value), - ValueSnapshot::Histogram(stream) => stream.decompress_with(|values| { - recorder.record_histogram(key.clone(), values); - }), - } - } - } -} - -#[cfg(test)] -mod tests { - use super::{MetricsSnapshot, Recorder, Snapshot, ValueSnapshot}; - use metrics_core::Key; - use metrics_util::StreamingIntegers; - use std::collections::HashMap; - - #[derive(Default)] - struct MockRecorder { - counter: HashMap, - gauge: HashMap, - histogram: HashMap>, - } - - impl MockRecorder { - pub fn get_counter_value(&self, key: &Key) -> Option<&u64> { - self.counter.get(key) - } - - pub fn get_gauge_value(&self, key: &Key) -> Option<&i64> { - self.gauge.get(key) - } - - pub fn get_histogram_values(&self, key: &Key) -> Option<&Vec> { - self.histogram.get(key) - } - } - - impl Recorder for MockRecorder { - fn record_counter(&mut self, key: Key, value: u64) { - let _ = self.counter.insert(key, value); - } - - fn record_gauge(&mut self, key: Key, value: i64) { - let _ = self.gauge.insert(key, value); - } - - fn record_histogram(&mut self, key: Key, values: &[u64]) { - let _ = self.histogram.insert(key, values.to_vec()); - } - } - - #[test] - fn test_snapshot_recorder() { - let key = Key::from_name("ok"); - let mut measurements = Vec::new(); - measurements.push((key.clone(), ValueSnapshot::Counter(7))); - measurements.push((key.clone(), ValueSnapshot::Gauge(42))); - - let hvalues = vec![10, 25, 42, 97]; - let mut stream = StreamingIntegers::new(); - stream.compress(&hvalues); - measurements.push((key.clone(), ValueSnapshot::Histogram(stream))); - - let snapshot = Snapshot::new(measurements); - - let mut recorder = MockRecorder::default(); - snapshot.record(&mut recorder); - - assert_eq!(recorder.get_counter_value(&key), Some(&7)); - assert_eq!(recorder.get_gauge_value(&key), Some(&42)); - - let hsum = recorder.get_histogram_values(&key).map(|x| x.iter().sum()); - assert_eq!(hsum, Some(174)); - } -} diff --git a/metrics-runtime/src/lib.rs b/metrics-runtime/src/lib.rs index b5410f7..d1d3076 100644 --- a/metrics-runtime/src/lib.rs +++ b/metrics-runtime/src/lib.rs @@ -172,7 +172,7 @@ //! Naturally, we need a way to get the metrics out of the system, which is where snapshots come //! into play. By utilizing a [`Controller`], we can take a snapshot of the current metrics in the //! registry, and then output them to any desired system/interface by utilizing -//! [`Recorder`](metrics_core::Recorder). A number of pre-baked recorders (which only concern +//! [`Observer`](metrics_core::Observer). A number of pre-baked observers (which only concern //! themselves with formatting the data) and exporters (which take the formatted data and either //! serve it up, such as exposing an HTTP endpoint, or write it somewhere, like stdout) are //! available, some of which are exposed by this crate. @@ -181,7 +181,9 @@ //! `log!`: //! ```rust //! # extern crate metrics_runtime; -//! use metrics_runtime::{Receiver, recorders::TextBuilder, exporters::LogExporter}; +//! use metrics_runtime::{ +//! Receiver, observers::TextBuilder, exporters::LogExporter, +//! }; //! use log::Level; //! use std::{thread, time::Duration}; //! let receiver = Receiver::builder().build().expect("failed to create receiver"); @@ -202,14 +204,19 @@ //! let num_rows = 46; //! sink.record_value("db.queries.select_products_num_rows", num_rows); //! -//! // Now create our exporter/recorder configuration, and wire it up. -//! let exporter = LogExporter::new(receiver.get_controller(), TextBuilder::new(), Level::Info); +//! // Now create our exporter/observer configuration, and wire it up. +//! let exporter = LogExporter::new( +//! receiver.get_controller(), +//! TextBuilder::new(), +//! Level::Info, +//! Duration::from_secs(5), +//! ); //! //! // This exporter will now run every 5 seconds, taking a snapshot, rendering it, and writing it //! // via `log!` at the informational level. This particular exporter is running directly on the //! // current thread, and not on a background thread. //! // -//! // exporter.run(Duration::from_secs(5)); +//! // exporter.run(); //! ``` //! Most exporters have the ability to run on the current thread or to be converted into a future //! which can be spawned on any Tokio-compatible runtime. @@ -231,7 +238,7 @@ //! ``` //! //! [metrics_core]: https://docs.rs/metrics-core -//! [`Recorder`]: https://docs.rs/metrics-core/0.3.1/metrics_core/trait.Recorder.html +//! [`Observer`]: https://docs.rs/metrics-core/0.3.1/metrics_core/trait.Observer.html #![deny(missing_docs)] #![warn(unused_extern_crates)] mod builder; @@ -248,15 +255,15 @@ mod sink; pub mod exporters; #[cfg(any( - feature = "metrics-recorder-text", - feature = "metrics-recorder-prometheus" + feature = "metrics-observer-text", + feature = "metrics-observer-prometheus" ))] -pub mod recorders; +pub mod observers; pub use self::{ builder::{Builder, BuilderError}, common::{Delta, Scope}, - control::{Controller, SnapshotError}, + control::Controller, receiver::Receiver, sink::{AsScoped, Sink, SinkError}, }; diff --git a/metrics-runtime/src/observers.rs b/metrics-runtime/src/observers.rs new file mode 100644 index 0000000..20c1e9c --- /dev/null +++ b/metrics-runtime/src/observers.rs @@ -0,0 +1,8 @@ +//! Commonly used observers. +//! +//! Observers define the format of the metric output: text, JSON, etc. +#[cfg(feature = "metrics-observer-text")] +pub use metrics_observer_text::TextBuilder; + +#[cfg(feature = "metrics-observer-prometheus")] +pub use metrics_observer_prometheus::PrometheusBuilder; diff --git a/metrics-runtime/src/receiver.rs b/metrics-runtime/src/receiver.rs index 4f284bf..54dfe54 100644 --- a/metrics-runtime/src/receiver.rs +++ b/metrics-runtime/src/receiver.rs @@ -9,8 +9,7 @@ use crate::{ use metrics::Recorder; use metrics_core::Key; use quanta::{Builder as UpkeepBuilder, Clock, Handle as UpkeepHandle}; -use std::cell::RefCell; -use std::sync::Arc; +use std::{cell::RefCell, sync::Arc}; thread_local! { static SINK: RefCell> = RefCell::new(None); diff --git a/metrics-runtime/src/recorders.rs b/metrics-runtime/src/recorders.rs deleted file mode 100644 index 4d1eb01..0000000 --- a/metrics-runtime/src/recorders.rs +++ /dev/null @@ -1,8 +0,0 @@ -//! Commonly used recorders. -//! -//! Recorders define the format of the metric output: text, JSON, etc. -#[cfg(feature = "metrics-recorder-text")] -pub use metrics_recorder_text::TextBuilder; - -#[cfg(feature = "metrics-recorder-prometheus")] -pub use metrics_recorder_prometheus::PrometheusBuilder; diff --git a/metrics-runtime/src/registry/metric.rs b/metrics-runtime/src/registry/metric.rs index 9786e59..b13dea3 100644 --- a/metrics-runtime/src/registry/metric.rs +++ b/metrics-runtime/src/registry/metric.rs @@ -1,9 +1,10 @@ -use crate::common::{Identifier, Kind, ValueHandle}; +use crate::common::{Identifier, Kind, ValueHandle, ValueSnapshot}; use crate::config::Configuration; use crate::data::Snapshot; use crate::registry::ScopeRegistry; use arc_swap::{ptr_eq, ArcSwap}; use im::hashmap::HashMap; +use metrics_core::Observer; use quanta::Clock; use std::ops::Deref; use std::sync::Arc; @@ -63,7 +64,7 @@ impl MetricRegistry { } } - pub fn get_snapshot(&self) -> Snapshot { + pub fn snapshot(&self) -> Snapshot { let mut named_values = Vec::new(); let metrics = self.metrics.load().deref().clone(); @@ -78,4 +79,21 @@ impl MetricRegistry { Snapshot::new(named_values) } + + pub fn observe(&self, observer: &mut O) { + let metrics = self.metrics.load().deref().clone(); + for (id, value) in metrics.into_iter() { + let (key, scope_handle, _) = id.into_parts(); + let scope = self.scope_registry.get(scope_handle); + let key = key.map_name(|name| scope.into_scoped(name)); + + match value.snapshot() { + ValueSnapshot::Counter(value) => observer.observe_counter(key, value), + ValueSnapshot::Gauge(value) => observer.observe_gauge(key, value), + ValueSnapshot::Histogram(stream) => stream.decompress_with(|values| { + observer.observe_histogram(key.clone(), values); + }), + } + } + } } diff --git a/metrics-runtime/src/sink.rs b/metrics-runtime/src/sink.rs index 95e7ee6..1ab43d3 100644 --- a/metrics-runtime/src/sink.rs +++ b/metrics-runtime/src/sink.rs @@ -6,9 +6,7 @@ use crate::{ use hashbrown::HashMap; use metrics_core::{IntoLabels, Key, Label, ScopedString}; use quanta::Clock; -use std::error::Error; -use std::fmt; -use std::sync::Arc; +use std::{error::Error, fmt, sync::Arc}; /// Errors during sink creation. #[derive(Debug, Clone)] diff --git a/metrics-util/src/bucket.rs b/metrics-util/src/bucket.rs index 8b644b7..1bc86b0 100644 --- a/metrics-util/src/bucket.rs +++ b/metrics-util/src/bucket.rs @@ -1,7 +1,9 @@ use crossbeam_epoch::{pin as epoch_pin, Atomic, Guard, Owned, Shared}; -use std::cell::UnsafeCell; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::{mem, slice}; +use std::{ + cell::UnsafeCell, + mem, slice, + sync::atomic::{AtomicUsize, Ordering}, +}; const BLOCK_SIZE: usize = 128; @@ -111,9 +113,7 @@ pub struct AtomicBucket { impl AtomicBucket { /// Creates a new, empty bucket. pub fn new() -> Self { - AtomicBucket { - tail: Atomic::null(), - } + Self::default() } /// Pushes an element into the bucket. @@ -235,21 +235,28 @@ impl AtomicBucket { // will see it as empty until another write proceeds. let guard = &epoch_pin(); let tail = self.tail.load(Ordering::Acquire, guard); - if !tail.is_null() { - if self + if !tail.is_null() + && self .tail .compare_and_set(tail, Shared::null(), Ordering::SeqCst, guard) .is_ok() - { - // We won the swap to delete the tail node. Now configure a deferred drop to clean - // things up once nobody else is using it. - unsafe { - // Drop the block, which will cause a cascading drop on the next block, and - // so on and so forth, until all blocks linked to this one are dropped. - guard.defer_destroy(tail); - } - guard.flush(); + { + // We won the swap to delete the tail node. Now configure a deferred drop to clean + // things up once nobody else is using it. + unsafe { + // Drop the block, which will cause a cascading drop on the next block, and + // so on and so forth, until all blocks linked to this one are dropped. + guard.defer_destroy(tail); } + guard.flush(); + } + } +} + +impl Default for AtomicBucket { + fn default() -> Self { + Self { + tail: Atomic::null(), } } } diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs index 5fba05e..10ed8eb 100644 --- a/metrics/src/lib.rs +++ b/metrics/src/lib.rs @@ -149,8 +149,10 @@ use metrics_core::AsNanoseconds; pub use metrics_core::{labels, Key, Label}; #[cfg(feature = "std")] use std::error; -use std::fmt; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::{ + fmt, + sync::atomic::{AtomicUsize, Ordering}, +}; #[macro_use] mod macros;