Add snapshot traits and expose exporters/recorders from metrics.
We now expose all exporters and recorders via facade modules in the metrics crate, called metrics::exporters and metrics::recorders, respectively. This means that the metrics crate itself has these are optional dependencies, which are included by the default set of features, and so can be turned off by consumers. To curtail the issue of cyclical dependencies, we've also introduced three new traits: MetricsSnapshot, SnapshotProvider, and AsyncSnapshotProvider. These traits let us represent metrics::Controller and metrics::data::snapshot::Snapshot in the exporter, allowing us to get around the cyclical dependency but also expose more flexibility and modularity.
This commit is contained in:
parent
6c9ae9c85b
commit
55d1708e10
|
@ -15,3 +15,6 @@ documentation = "https://docs.rs/metrics-core"
|
|||
readme = "README.md"
|
||||
|
||||
keywords = ["metrics", "interface", "common"]
|
||||
|
||||
[dependencies]
|
||||
futures = "^0.1"
|
||||
|
|
|
@ -32,6 +32,7 @@
|
|||
//!
|
||||
//! Histograms are a convenient way to measure behavior not only at the median, but at the edges of
|
||||
//! normal operating behavior.
|
||||
use futures::future::Future;
|
||||
|
||||
/// A value that records metrics.
|
||||
pub trait MetricsRecorder {
|
||||
|
@ -41,7 +42,7 @@ pub trait MetricsRecorder {
|
|||
/// as they are both a single value tied to a key. From the perspective of a collector,
|
||||
/// counters and gauges usually have slightly different modes of operation.
|
||||
///
|
||||
/// For the sake of flexibility on the exportr side, both are provided.
|
||||
/// For the sake of flexibility on the exporter side, both are provided.
|
||||
fn record_counter<K: AsRef<str>>(&mut self, key: K, value: u64);
|
||||
|
||||
/// Records a gauge.
|
||||
|
@ -50,7 +51,7 @@ pub trait MetricsRecorder {
|
|||
/// as they are both a single value tied to a key. From the perspective of a collector,
|
||||
/// counters and gauges usually have slightly different modes of operation.
|
||||
///
|
||||
/// For the sake of flexibility on the exportr side, both are provided.
|
||||
/// For the sake of flexibility on the exporter side, both are provided.
|
||||
fn record_gauge<K: AsRef<str>>(&mut self, key: K, value: i64);
|
||||
|
||||
/// Records a histogram.
|
||||
|
@ -59,3 +60,28 @@ pub trait MetricsRecorder {
|
|||
/// of the underlying observed values, and callers will need to process them accordingly.
|
||||
fn record_histogram<K: AsRef<str>>(&mut self, key: K, values: &[u64]);
|
||||
}
|
||||
|
||||
/// A value that holds a point-in-time view of collected metrics.
|
||||
pub trait MetricsSnapshot {
|
||||
/// Records the snapshot to the given recorder.
|
||||
fn record<R: MetricsRecorder>(&self, recorder: &mut R);
|
||||
}
|
||||
|
||||
/// A value that can provide on-demand snapshots.
|
||||
pub trait SnapshotProvider {
|
||||
type Snapshot;
|
||||
type SnapshotError;
|
||||
|
||||
/// Gets a snapshot.
|
||||
fn get_snapshot(&self) -> Result<Self::Snapshot, Self::SnapshotError>;
|
||||
}
|
||||
|
||||
/// A value that can provide on-demand snapshots asynchronously.
|
||||
pub trait AsyncSnapshotProvider {
|
||||
type Snapshot;
|
||||
type SnapshotError;
|
||||
type SnapshotFuture: Future<Item = Self::Snapshot, Error = Self::SnapshotError>;
|
||||
|
||||
/// Gets a snapshot asynchronously.
|
||||
fn get_snapshot_async(&self) -> Self::SnapshotFuture;
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ documentation = "https://docs.rs/metrics-exporter-log"
|
|||
|
||||
[dependencies]
|
||||
metrics-core = { path = "../metrics-core", version = "^0.2" }
|
||||
metrics = { path = "../metrics", version = "^0.9" }
|
||||
log = "^0.4"
|
||||
futures = "^0.1"
|
||||
tokio-timer = "^0.2"
|
||||
|
|
|
@ -7,35 +7,39 @@
|
|||
//! # Run Modes
|
||||
//! - `run` can be used to block the current thread, taking snapshots and exporting them on an
|
||||
//! interval
|
||||
//! - `turn` can be used to take a single snapshot and log it
|
||||
//! - `into_future` will return a [`Future`] that when driven will take a snapshot on the
|
||||
//! configured interval and log it
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
use std::fmt::Display;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use metrics::Controller;
|
||||
use metrics_core::MetricsRecorder;
|
||||
use metrics_core::{MetricsRecorder, MetricsSnapshot, SnapshotProvider, AsyncSnapshotProvider};
|
||||
use log::Level;
|
||||
use futures::prelude::*;
|
||||
use tokio_timer::Interval;
|
||||
|
||||
/// Exports metrics by converting them to a textual representation and logging them.
|
||||
pub struct LogExporter<R> {
|
||||
controller: Controller,
|
||||
pub struct LogExporter<C, R> {
|
||||
controller: C,
|
||||
recorder: R,
|
||||
level: Level,
|
||||
}
|
||||
|
||||
impl<R> LogExporter<R>
|
||||
impl<C, R> LogExporter<C, R>
|
||||
where
|
||||
R: MetricsRecorder + Clone + Into<String>
|
||||
C: SnapshotProvider + AsyncSnapshotProvider,
|
||||
<C as SnapshotProvider>::Snapshot: MetricsSnapshot,
|
||||
<C as SnapshotProvider>::SnapshotError: Display,
|
||||
<C as AsyncSnapshotProvider>::Snapshot: MetricsSnapshot,
|
||||
<C as AsyncSnapshotProvider>::SnapshotError: Display,
|
||||
R: MetricsRecorder + Clone + Into<String>,
|
||||
{
|
||||
/// Creates a new [`LogExporter`] that logs at the configurable level.
|
||||
///
|
||||
/// Recorders expose their output by being converted into strings.
|
||||
pub fn new(controller: Controller, recorder: R, level: Level) -> Self {
|
||||
pub fn new(controller: C, recorder: R, level: Level) -> Self {
|
||||
LogExporter {
|
||||
controller,
|
||||
recorder,
|
||||
|
@ -54,7 +58,15 @@ where
|
|||
|
||||
/// Run this exporter, logging output only once.
|
||||
pub fn turn(&self) {
|
||||
run_once(&self.controller, self.recorder.clone(), self.level);
|
||||
match self.controller.get_snapshot() {
|
||||
Ok(snapshot) => {
|
||||
let mut recorder = self.recorder.clone();
|
||||
snapshot.record(&mut recorder);
|
||||
let output = recorder.into();
|
||||
log!(self.level, "{}", output);
|
||||
},
|
||||
Err(e) => log!(Level::Error, "failed to get snapshot: {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts this exporter into a future which logs output on the given interval.
|
||||
|
@ -66,23 +78,16 @@ where
|
|||
Interval::new_interval(interval)
|
||||
.map_err(|_| ())
|
||||
.for_each(move |_| {
|
||||
let recorder = recorder.clone();
|
||||
run_once(&controller, recorder, level);
|
||||
Ok(())
|
||||
let mut recorder = recorder.clone();
|
||||
|
||||
controller.get_snapshot_async()
|
||||
.and_then(move |snapshot| {
|
||||
snapshot.record(&mut recorder);
|
||||
let output = recorder.into();
|
||||
log!(level, "{}", output);
|
||||
Ok(())
|
||||
})
|
||||
.map_err(|e| log!(Level::Error, "failed to get snapshot: {}", e))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn run_once<R>(controller: &Controller, mut recorder: R, level: Level)
|
||||
where
|
||||
R: MetricsRecorder + Into<String>
|
||||
{
|
||||
match controller.get_snapshot() {
|
||||
Ok(snapshot) => {
|
||||
snapshot.record(&mut recorder);
|
||||
let output = recorder.into();
|
||||
log!(level, "{}", output);
|
||||
},
|
||||
Err(e) => log!(Level::Error, "failed to capture snapshot: {}", e),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,11 @@ debug = true
|
|||
opt-level = 3
|
||||
lto = true
|
||||
|
||||
[features]
|
||||
default = ["exporters", "recorders"]
|
||||
exporters = ["metrics-exporter-log"]
|
||||
recorders = ["metrics-recorder-text", "metrics-recorder-prometheus"]
|
||||
|
||||
[dependencies]
|
||||
metrics-core = { path = "../metrics-core", version = "^0.2" }
|
||||
crossbeam-channel = "^0.3"
|
||||
|
@ -28,7 +33,11 @@ parking_lot = "^0.7"
|
|||
fnv = "^1.0"
|
||||
hashbrown = "^0.1"
|
||||
quanta = "^0.2"
|
||||
futures = "^0.1"
|
||||
tokio-sync = "^0.1"
|
||||
metrics-exporter-log = { path = "../metrics-exporter-log", version = "^0.1", optional = true }
|
||||
metrics-recorder-text = { path = "../metrics-recorder-text", version = "^0.1", optional = true }
|
||||
metrics-recorder-prometheus = { path = "../metrics-recorder-prometheus", version = "^0.1", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
log = "^0.4"
|
||||
|
|
|
@ -9,6 +9,7 @@ extern crate metrics_core;
|
|||
use getopts::Options;
|
||||
use hdrhistogram::Histogram;
|
||||
use metrics::{snapshot::TypedMeasurement, Receiver, Sink};
|
||||
use metrics_core::SnapshotProvider;
|
||||
use std::{
|
||||
env,
|
||||
sync::{
|
||||
|
@ -45,13 +46,13 @@ impl Generator {
|
|||
}
|
||||
|
||||
self.gauge += 1;
|
||||
let t1 = self.stats.clock().now();
|
||||
let t1 = self.stats.now();
|
||||
|
||||
if let Some(t0) = self.t0 {
|
||||
let start = self.stats.clock().now();
|
||||
let start = self.stats.now();
|
||||
let _ = self.stats.record_timing("ok", t0, t1);
|
||||
let _ = self.stats.record_gauge("total", self.gauge);
|
||||
let delta = self.stats.clock().now() - start;
|
||||
let delta = self.stats.now() - start;
|
||||
self.hist.saturating_record(delta);
|
||||
}
|
||||
self.t0 = Some(t1);
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
use super::data::snapshot::Snapshot;
|
||||
use crossbeam_channel::{bounded, Sender};
|
||||
use std::fmt;
|
||||
use metrics_core::{SnapshotProvider, AsyncSnapshotProvider};
|
||||
use futures::prelude::*;
|
||||
use tokio_sync::oneshot;
|
||||
|
||||
/// Error conditions when retrieving a snapshot.
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum SnapshotError {
|
||||
/// There was an internal error when trying to collect a snapshot.
|
||||
InternalError,
|
||||
|
@ -35,9 +37,14 @@ impl Controller {
|
|||
pub(crate) fn new(control_tx: Sender<ControlFrame>) -> Controller {
|
||||
Controller { control_tx }
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieves a snapshot of the current metric state.
|
||||
pub fn get_snapshot(&self) -> Result<Snapshot, SnapshotError> {
|
||||
impl SnapshotProvider for Controller {
|
||||
type Snapshot = Snapshot;
|
||||
type SnapshotError = SnapshotError;
|
||||
|
||||
/// Gets a snapshot.
|
||||
fn get_snapshot(&self) -> Result<Snapshot, SnapshotError> {
|
||||
let (tx, rx) = bounded(0);
|
||||
let msg = ControlFrame::Snapshot(tx);
|
||||
|
||||
|
@ -46,16 +53,39 @@ impl Controller {
|
|||
.map_err(|_| SnapshotError::ReceiverShutdown)
|
||||
.and_then(move |_| rx.recv().map_err(|_| SnapshotError::InternalError))
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieves a snapshot of the current metric state asynchronously.
|
||||
pub fn get_snapshot_async(&self) -> Result<oneshot::Receiver<Snapshot>, SnapshotError> {
|
||||
impl AsyncSnapshotProvider for Controller {
|
||||
type Snapshot = Snapshot;
|
||||
type SnapshotError = SnapshotError;
|
||||
type SnapshotFuture = SnapshotFuture;
|
||||
|
||||
/// Gets a snapshot asynchronously.
|
||||
fn get_snapshot_async(&self) -> Self::SnapshotFuture {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let msg = ControlFrame::SnapshotAsync(tx);
|
||||
|
||||
self.control_tx
|
||||
.send(msg)
|
||||
.map_err(|_| SnapshotError::ReceiverShutdown)
|
||||
.map(move |_| rx)
|
||||
.map(move |_| SnapshotFuture::Waiting(rx))
|
||||
.unwrap_or(SnapshotFuture::Errored(SnapshotError::ReceiverShutdown))
|
||||
}
|
||||
}
|
||||
|
||||
pub enum SnapshotFuture {
|
||||
Waiting(oneshot::Receiver<Snapshot>),
|
||||
Errored(SnapshotError),
|
||||
}
|
||||
|
||||
impl Future for SnapshotFuture {
|
||||
type Item = Snapshot;
|
||||
type Error = SnapshotError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self {
|
||||
SnapshotFuture::Waiting(rx) => rx.poll().map_err(|_| SnapshotError::InternalError),
|
||||
SnapshotFuture::Errored(err) => Err(err.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use super::histogram::HistogramSnapshot;
|
||||
use metrics_core::MetricsRecorder;
|
||||
use metrics_core::{MetricsSnapshot, MetricsRecorder};
|
||||
use std::fmt::Display;
|
||||
|
||||
/// A typed metric measurement, used in snapshots.
|
||||
|
@ -62,8 +62,15 @@ impl Snapshot {
|
|||
.push(TypedMeasurement::ValueHistogram(key.to_string(), h));
|
||||
}
|
||||
|
||||
/// Records this [`Snapshot`] to the provided [`MetricsRecorder`].
|
||||
pub fn record<R: MetricsRecorder>(&self, recorder: &mut R) {
|
||||
/// Converts this [`Snapshot`] to the underlying vector of measurements.
|
||||
pub fn into_measurements(self) -> Vec<TypedMeasurement> {
|
||||
self.measurements
|
||||
}
|
||||
}
|
||||
|
||||
impl MetricsSnapshot for Snapshot {
|
||||
/// Records the snapshot to the given recorder.
|
||||
fn record<R: MetricsRecorder>(&self, recorder: &mut R) {
|
||||
for measurement in &self.measurements {
|
||||
match measurement {
|
||||
TypedMeasurement::Counter(key, value) => recorder.record_counter(key, *value),
|
||||
|
@ -77,23 +84,19 @@ impl Snapshot {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts this [`Snapshot`] to the underlying vector of measurements.
|
||||
pub fn into_measurements(self) -> Vec<TypedMeasurement> {
|
||||
self.measurements
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{HistogramSnapshot, MetricsRecorder, Snapshot, TypedMeasurement};
|
||||
use std::collections::HashMap;
|
||||
use metrics_core::MetricsSnapshot;
|
||||
|
||||
#[derive(Default)]
|
||||
struct MockRecorder {
|
||||
counter: HashMap<String, u64>,
|
||||
gauge: HashMap<String, i64>,
|
||||
histogram: HashMap<String, u64>,
|
||||
histogram: HashMap<String, Vec<u64>>,
|
||||
}
|
||||
|
||||
impl MockRecorder {
|
||||
|
@ -105,25 +108,22 @@ mod tests {
|
|||
self.gauge.get(key)
|
||||
}
|
||||
|
||||
pub fn get_histogram_value(&self, key: &String) -> Option<&u64> {
|
||||
pub fn get_histogram_values(&self, key: &String) -> Option<&Vec<u64>> {
|
||||
self.histogram.get(key)
|
||||
}
|
||||
}
|
||||
|
||||
impl MetricsRecorder for MockRecorder {
|
||||
fn record_counter<K: AsRef<str>>(&mut self, key: K, value: u64) {
|
||||
let entry = self.counter.entry(key.as_ref().to_owned()).or_insert(0);
|
||||
*entry += value;
|
||||
let _ = self.counter.insert(key.as_ref().to_owned(), value);
|
||||
}
|
||||
|
||||
fn record_gauge<K: AsRef<str>>(&mut self, key: K, value: i64) {
|
||||
let entry = self.gauge.entry(key.as_ref().to_owned()).or_insert(0);
|
||||
*entry += value;
|
||||
let _ = self.gauge.insert(key.as_ref().to_owned(), value);
|
||||
}
|
||||
|
||||
fn record_histogram<K: AsRef<str>>(&mut self, key: K, value: u64) {
|
||||
let entry = self.histogram.entry(key.as_ref().to_owned()).or_insert(0);
|
||||
*entry += value;
|
||||
fn record_histogram<K: AsRef<str>>(&mut self, key: K, values: &[u64]) {
|
||||
let _ = self.histogram.insert(key.as_ref().to_owned(), values.to_vec());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -152,10 +152,12 @@ mod tests {
|
|||
snapshot.set_timing_histogram(key.clone(), histogram);
|
||||
|
||||
let mut recorder = MockRecorder::default();
|
||||
snapshot.export(&mut recorder);
|
||||
snapshot.record(&mut recorder);
|
||||
|
||||
assert_eq!(recorder.get_counter_value(&key), Some(&7));
|
||||
assert_eq!(recorder.get_gauge_value(&key), Some(&42));
|
||||
assert_eq!(recorder.get_histogram_value(&key), Some(&174));
|
||||
|
||||
let hsum = recorder.get_histogram_values(&key).map(|x| x.iter().sum());
|
||||
assert_eq!(hsum, Some(174));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
#[cfg(feature = "metrics-exporter-log")]
|
||||
pub use metrics_exporter_log::LogExporter;
|
|
@ -50,13 +50,14 @@
|
|||
//! // to, so you need to track the overall value on your own.
|
||||
//! sink.record_gauge("red_balloons", 99);
|
||||
//!
|
||||
//! // We can update a timing histogram. For timing, you also must measure the start and end
|
||||
//! // time using the built-in `Clock` exposed by the sink. The receiver internally converts the
|
||||
//! // raw values to calculate the actual wall clock time (in nanoseconds) on your behalf, so you
|
||||
//! // can't just pass in any old number.. otherwise you'll get erroneous measurements!
|
||||
//! let start = sink.clock().start();
|
||||
//! // We can update a timing histogram. For timing, we're using the built-in `Sink::now` method
|
||||
//! // which utilizes a high-speed internal clock. This method returns the time in nanoseconds, so
|
||||
//! // we get great resolution, but giving the time in nanoseconds isn't required! If you want to
|
||||
//! // send it in another unit, that's fine, but just pay attention to that fact when viewing and
|
||||
//! // using those metrics once exported.
|
||||
//! let start = sink.now();
|
||||
//! thread::sleep(Duration::from_millis(10));
|
||||
//! let end = sink.clock().end();
|
||||
//! let end = sink.now();
|
||||
//! sink.record_timing("db.gizmo_query", start, end);
|
||||
//!
|
||||
//! // Finally, we can update a value histogram. Technically speaking, value histograms aren't
|
||||
|
@ -120,6 +121,12 @@ mod receiver;
|
|||
mod scopes;
|
||||
mod sink;
|
||||
|
||||
#[cfg(any(feature = "metrics-exporter-log"))]
|
||||
pub mod exporters;
|
||||
|
||||
#[cfg(any(feature = "metrics-recorder-text", feature = "metrics-recorder-prometheus"))]
|
||||
pub mod recorders;
|
||||
|
||||
pub use self::{
|
||||
configuration::Configuration,
|
||||
control::{Controller, SnapshotError},
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
#[cfg(feature = "metrics-recorder-text")]
|
||||
pub use metrics_recorder_text::TextRecorder;
|
||||
|
||||
#[cfg(feature = "metrics-recorder-prometheus")]
|
||||
pub use metrics_recorder_prometheus::PrometheusRecorder;
|
Loading…
Reference in New Issue