Merge pull request #8 from metrics-rs/tobz/facade-modules

Add snapshot traits and expose exporters/recorders from metrics.
This commit is contained in:
Toby Lawrence 2019-04-29 22:17:29 -04:00 committed by GitHub
commit f699b5ad04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 183 additions and 78 deletions

View File

@ -15,3 +15,6 @@ documentation = "https://docs.rs/metrics-core"
readme = "README.md"
keywords = ["metrics", "interface", "common"]
[dependencies]
futures = "^0.1"

View File

@ -32,16 +32,18 @@
//!
//! Histograms are a convenient way to measure behavior not only at the median, but at the edges of
//! normal operating behavior.
use std::fmt::Display;
use futures::future::Future;
/// A value that records metrics.
pub trait MetricsRecorder {
pub trait Recorder {
/// Records a counter.
///
/// From the perspective of an recorder, a counter and gauge are essentially identical, insofar
/// as they are both a single value tied to a key. From the perspective of a collector,
/// counters and gauges usually have slightly different modes of operation.
///
/// For the sake of flexibility on the exportr side, both are provided.
/// For the sake of flexibility on the exporter side, both are provided.
fn record_counter<K: AsRef<str>>(&mut self, key: K, value: u64);
/// Records a gauge.
@ -50,7 +52,7 @@ pub trait MetricsRecorder {
/// as they are both a single value tied to a key. From the perspective of a collector,
/// counters and gauges usually have slightly different modes of operation.
///
/// For the sake of flexibility on the exportr side, both are provided.
/// For the sake of flexibility on the exporter side, both are provided.
fn record_gauge<K: AsRef<str>>(&mut self, key: K, value: i64);
/// Records a histogram.
@ -59,3 +61,28 @@ pub trait MetricsRecorder {
/// of the underlying observed values, and callers will need to process them accordingly.
fn record_histogram<K: AsRef<str>>(&mut self, key: K, values: &[u64]);
}
/// A value that holds a point-in-time view of collected metrics.
pub trait Snapshot {
/// Records the snapshot to the given recorder.
fn record<R: Recorder>(&self, recorder: &mut R);
}
/// A value that can provide on-demand snapshots.
pub trait SnapshotProvider {
type Snapshot: Snapshot;
type SnapshotError;
/// Gets a snapshot.
fn get_snapshot(&self) -> Result<Self::Snapshot, Self::SnapshotError>;
}
/// A value that can provide on-demand snapshots asynchronously.
pub trait AsyncSnapshotProvider {
type Snapshot: Snapshot;
type SnapshotError;
type SnapshotFuture: Future<Item = Self::Snapshot, Error = Self::SnapshotError>;
/// Gets a snapshot asynchronously.
fn get_snapshot_async(&self) -> Self::SnapshotFuture;
}

View File

@ -14,7 +14,6 @@ documentation = "https://docs.rs/metrics-exporter-log"
[dependencies]
metrics-core = { path = "../metrics-core", version = "^0.2" }
metrics = { path = "../metrics", version = "^0.9" }
log = "^0.4"
futures = "^0.1"
tokio-timer = "^0.2"

View File

@ -7,35 +7,34 @@
//! # Run Modes
//! - `run` can be used to block the current thread, taking snapshots and exporting them on an
//! interval
//! - `turn` can be used to take a single snapshot and log it
//! - `into_future` will return a [`Future`] that when driven will take a snapshot on the
//! configured interval and log it
#[macro_use]
extern crate log;
use std::error::Error;
use std::thread;
use std::time::Duration;
use metrics::Controller;
use metrics_core::MetricsRecorder;
use metrics_core::{Recorder, Snapshot, SnapshotProvider, AsyncSnapshotProvider};
use log::Level;
use futures::prelude::*;
use tokio_timer::Interval;
/// Exports metrics by converting them to a textual representation and logging them.
pub struct LogExporter<R> {
controller: Controller,
pub struct LogExporter<C, R> {
controller: C,
recorder: R,
level: Level,
}
impl<R> LogExporter<R>
impl<C, R> LogExporter<C, R>
where
R: MetricsRecorder + Clone + Into<String>
R: Recorder + Clone + Into<String>,
{
/// Creates a new [`LogExporter`] that logs at the configurable level.
///
/// Recorders expose their output by being converted into strings.
pub fn new(controller: Controller, recorder: R, level: Level) -> Self {
pub fn new(controller: C, recorder: R, level: Level) -> Self {
LogExporter {
controller,
recorder,
@ -44,7 +43,11 @@ where
}
/// Runs this exporter on the current thread, logging output on the given interval.
pub fn run(&mut self, interval: Duration) {
pub fn run(&mut self, interval: Duration)
where
C: SnapshotProvider,
C::SnapshotError: Error,
{
loop {
thread::sleep(interval);
@ -53,12 +56,28 @@ where
}
/// Run this exporter, logging output only once.
pub fn turn(&self) {
run_once(&self.controller, self.recorder.clone(), self.level);
pub fn turn(&self)
where
C: SnapshotProvider,
C::SnapshotError: Error,
{
match self.controller.get_snapshot() {
Ok(snapshot) => {
let mut recorder = self.recorder.clone();
snapshot.record(&mut recorder);
let output = recorder.into();
log!(self.level, "{}", output);
},
Err(e) => log!(Level::Error, "failed to get snapshot: {}", e),
}
}
/// Converts this exporter into a future which logs output on the given interval.
pub fn into_future(self, interval: Duration) -> impl Future<Item = (), Error = ()> {
pub fn into_future(self, interval: Duration) -> impl Future<Item = (), Error = ()>
where
C: AsyncSnapshotProvider,
C::SnapshotError: Error,
{
let controller = self.controller;
let recorder = self.recorder;
let level = self.level;
@ -66,23 +85,16 @@ where
Interval::new_interval(interval)
.map_err(|_| ())
.for_each(move |_| {
let recorder = recorder.clone();
run_once(&controller, recorder, level);
Ok(())
let mut recorder = recorder.clone();
controller.get_snapshot_async()
.and_then(move |snapshot| {
snapshot.record(&mut recorder);
let output = recorder.into();
log!(level, "{}", output);
Ok(())
})
.map_err(|e| error!("failed to get snapshot: {}", e))
})
}
}
fn run_once<R>(controller: &Controller, mut recorder: R, level: Level)
where
R: MetricsRecorder + Into<String>
{
match controller.get_snapshot() {
Ok(snapshot) => {
snapshot.record(&mut recorder);
let output = recorder.into();
log!(level, "{}", output);
},
Err(e) => log!(Level::Error, "failed to capture snapshot: {}", e),
}
}

View File

@ -1,7 +1,7 @@
//! Records metrics in the Prometheus exposition format.
use std::time::SystemTime;
use hdrhistogram::Histogram;
use metrics_core::MetricsRecorder;
use metrics_core::Recorder;
use metrics_util::{Quantile, parse_quantiles};
/// Records metrics in the Prometheus exposition format.
@ -30,7 +30,7 @@ impl PrometheusRecorder {
}
}
impl MetricsRecorder for PrometheusRecorder {
impl Recorder for PrometheusRecorder {
fn record_counter<K: AsRef<str>>(&mut self, key: K, value: u64) {
let label = key.as_ref().replace('.', "_");
self.output.push_str("\n# TYPE ");

View File

@ -45,7 +45,7 @@
use std::collections::{HashMap, VecDeque};
use std::fmt::Display;
use hdrhistogram::Histogram;
use metrics_core::MetricsRecorder;
use metrics_core::Recorder;
use metrics_util::{Quantile, parse_quantiles};
/// Records metrics in a hierarchical, text-based format.
@ -75,7 +75,7 @@ impl TextRecorder {
}
}
impl MetricsRecorder for TextRecorder {
impl Recorder for TextRecorder {
fn record_counter<K: AsRef<str>>(&mut self, key: K, value: u64) {
let (name_parts, name) = name_to_parts(key.as_ref());
let mut values = single_value_to_values(name, value);

View File

@ -21,6 +21,11 @@ debug = true
opt-level = 3
lto = true
[features]
default = ["exporters", "recorders"]
exporters = ["metrics-exporter-log"]
recorders = ["metrics-recorder-text", "metrics-recorder-prometheus"]
[dependencies]
metrics-core = { path = "../metrics-core", version = "^0.2" }
crossbeam-channel = "^0.3"
@ -28,7 +33,11 @@ parking_lot = "^0.7"
fnv = "^1.0"
hashbrown = "^0.1"
quanta = "^0.2"
futures = "^0.1"
tokio-sync = "^0.1"
metrics-exporter-log = { path = "../metrics-exporter-log", version = "^0.1", optional = true }
metrics-recorder-text = { path = "../metrics-recorder-text", version = "^0.1", optional = true }
metrics-recorder-prometheus = { path = "../metrics-recorder-prometheus", version = "^0.1", optional = true }
[dev-dependencies]
log = "^0.4"

View File

@ -9,6 +9,7 @@ extern crate metrics_core;
use getopts::Options;
use hdrhistogram::Histogram;
use metrics::{snapshot::TypedMeasurement, Receiver, Sink};
use metrics_core::SnapshotProvider;
use std::{
env,
sync::{
@ -45,13 +46,13 @@ impl Generator {
}
self.gauge += 1;
let t1 = self.stats.clock().now();
let t1 = self.stats.now();
if let Some(t0) = self.t0 {
let start = self.stats.clock().now();
let start = self.stats.now();
let _ = self.stats.record_timing("ok", t0, t1);
let _ = self.stats.record_gauge("total", self.gauge);
let delta = self.stats.clock().now() - start;
let delta = self.stats.now() - start;
self.hist.saturating_record(delta);
}
self.t0 = Some(t1);

View File

@ -1,10 +1,13 @@
use super::data::snapshot::Snapshot;
use crossbeam_channel::{bounded, Sender};
use std::error::Error;
use std::fmt;
use metrics_core::{SnapshotProvider, AsyncSnapshotProvider};
use futures::prelude::*;
use tokio_sync::oneshot;
/// Error conditions when retrieving a snapshot.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum SnapshotError {
/// There was an internal error when trying to collect a snapshot.
InternalError,
@ -13,6 +16,22 @@ pub enum SnapshotError {
ReceiverShutdown,
}
impl Error for SnapshotError {
}
impl fmt::Display for SnapshotError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SnapshotError::InternalError => {
write!(f, "internal error while collecting snapshot")
},
SnapshotError::ReceiverShutdown => {
write!(f, "receiver is shutdown")
},
}
}
}
/// Various control actions performed by a controller.
pub(crate) enum ControlFrame {
/// Takes a snapshot of the current metric state.
@ -35,9 +54,14 @@ impl Controller {
pub(crate) fn new(control_tx: Sender<ControlFrame>) -> Controller {
Controller { control_tx }
}
}
/// Retrieves a snapshot of the current metric state.
pub fn get_snapshot(&self) -> Result<Snapshot, SnapshotError> {
impl SnapshotProvider for Controller {
type Snapshot = Snapshot;
type SnapshotError = SnapshotError;
/// Gets a snapshot.
fn get_snapshot(&self) -> Result<Snapshot, SnapshotError> {
let (tx, rx) = bounded(0);
let msg = ControlFrame::Snapshot(tx);
@ -46,24 +70,39 @@ impl Controller {
.map_err(|_| SnapshotError::ReceiverShutdown)
.and_then(move |_| rx.recv().map_err(|_| SnapshotError::InternalError))
}
}
/// Retrieves a snapshot of the current metric state asynchronously.
pub fn get_snapshot_async(&self) -> Result<oneshot::Receiver<Snapshot>, SnapshotError> {
impl AsyncSnapshotProvider for Controller {
type Snapshot = Snapshot;
type SnapshotError = SnapshotError;
type SnapshotFuture = SnapshotFuture;
/// Gets a snapshot asynchronously.
fn get_snapshot_async(&self) -> Self::SnapshotFuture {
let (tx, rx) = oneshot::channel();
let msg = ControlFrame::SnapshotAsync(tx);
self.control_tx
.send(msg)
.map_err(|_| SnapshotError::ReceiverShutdown)
.map(move |_| rx)
.map(move |_| SnapshotFuture::Waiting(rx))
.unwrap_or(SnapshotFuture::Errored(SnapshotError::ReceiverShutdown))
}
}
impl fmt::Display for SnapshotError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
/// A future representing collecting a snapshot.
pub enum SnapshotFuture {
Waiting(oneshot::Receiver<Snapshot>),
Errored(SnapshotError),
}
impl Future for SnapshotFuture {
type Item = Snapshot;
type Error = SnapshotError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
SnapshotError::InternalError => write!(f, "internal error during snapshot generation"),
SnapshotError::ReceiverShutdown => write!(f, "the receiver is not currently running"),
SnapshotFuture::Waiting(rx) => rx.poll().map_err(|_| SnapshotError::InternalError),
SnapshotFuture::Errored(err) => Err(err.clone()),
}
}
}

View File

@ -1,5 +1,5 @@
use super::histogram::HistogramSnapshot;
use metrics_core::MetricsRecorder;
use metrics_core::{Snapshot as MetricsSnapshot, Recorder};
use std::fmt::Display;
/// A typed metric measurement, used in snapshots.
@ -62,8 +62,15 @@ impl Snapshot {
.push(TypedMeasurement::ValueHistogram(key.to_string(), h));
}
/// Records this [`Snapshot`] to the provided [`MetricsRecorder`].
pub fn record<R: MetricsRecorder>(&self, recorder: &mut R) {
/// Converts this [`Snapshot`] to the underlying vector of measurements.
pub fn into_measurements(self) -> Vec<TypedMeasurement> {
self.measurements
}
}
impl MetricsSnapshot for Snapshot {
/// Records the snapshot to the given recorder.
fn record<R: Recorder>(&self, recorder: &mut R) {
for measurement in &self.measurements {
match measurement {
TypedMeasurement::Counter(key, value) => recorder.record_counter(key, *value),
@ -77,23 +84,18 @@ impl Snapshot {
}
}
}
/// Converts this [`Snapshot`] to the underlying vector of measurements.
pub fn into_measurements(self) -> Vec<TypedMeasurement> {
self.measurements
}
}
#[cfg(test)]
mod tests {
use super::{HistogramSnapshot, MetricsRecorder, Snapshot, TypedMeasurement};
use super::{HistogramSnapshot, MetricsSnapshot, Recorder, Snapshot, TypedMeasurement};
use std::collections::HashMap;
#[derive(Default)]
struct MockRecorder {
counter: HashMap<String, u64>,
gauge: HashMap<String, i64>,
histogram: HashMap<String, u64>,
histogram: HashMap<String, Vec<u64>>,
}
impl MockRecorder {
@ -105,25 +107,22 @@ mod tests {
self.gauge.get(key)
}
pub fn get_histogram_value(&self, key: &String) -> Option<&u64> {
pub fn get_histogram_values(&self, key: &String) -> Option<&Vec<u64>> {
self.histogram.get(key)
}
}
impl MetricsRecorder for MockRecorder {
impl Recorder for MockRecorder {
fn record_counter<K: AsRef<str>>(&mut self, key: K, value: u64) {
let entry = self.counter.entry(key.as_ref().to_owned()).or_insert(0);
*entry += value;
let _ = self.counter.insert(key.as_ref().to_owned(), value);
}
fn record_gauge<K: AsRef<str>>(&mut self, key: K, value: i64) {
let entry = self.gauge.entry(key.as_ref().to_owned()).or_insert(0);
*entry += value;
let _ = self.gauge.insert(key.as_ref().to_owned(), value);
}
fn record_histogram<K: AsRef<str>>(&mut self, key: K, value: u64) {
let entry = self.histogram.entry(key.as_ref().to_owned()).or_insert(0);
*entry += value;
fn record_histogram<K: AsRef<str>>(&mut self, key: K, values: &[u64]) {
let _ = self.histogram.insert(key.as_ref().to_owned(), values.to_vec());
}
}
@ -152,10 +151,12 @@ mod tests {
snapshot.set_timing_histogram(key.clone(), histogram);
let mut recorder = MockRecorder::default();
snapshot.export(&mut recorder);
snapshot.record(&mut recorder);
assert_eq!(recorder.get_counter_value(&key), Some(&7));
assert_eq!(recorder.get_gauge_value(&key), Some(&42));
assert_eq!(recorder.get_histogram_value(&key), Some(&174));
let hsum = recorder.get_histogram_values(&key).map(|x| x.iter().sum());
assert_eq!(hsum, Some(174));
}
}

2
metrics/src/exporters.rs Normal file
View File

@ -0,0 +1,2 @@
#[cfg(feature = "metrics-exporter-log")]
pub use metrics_exporter_log::LogExporter;

View File

@ -50,13 +50,14 @@
//! // to, so you need to track the overall value on your own.
//! sink.record_gauge("red_balloons", 99);
//!
//! // We can update a timing histogram. For timing, you also must measure the start and end
//! // time using the built-in `Clock` exposed by the sink. The receiver internally converts the
//! // raw values to calculate the actual wall clock time (in nanoseconds) on your behalf, so you
//! // can't just pass in any old number.. otherwise you'll get erroneous measurements!
//! let start = sink.clock().start();
//! // We can update a timing histogram. For timing, we're using the built-in `Sink::now` method
//! // which utilizes a high-speed internal clock. This method returns the time in nanoseconds, so
//! // we get great resolution, but giving the time in nanoseconds isn't required! If you want to
//! // send it in another unit, that's fine, but just pay attention to that fact when viewing and
//! // using those metrics once exported.
//! let start = sink.now();
//! thread::sleep(Duration::from_millis(10));
//! let end = sink.clock().end();
//! let end = sink.now();
//! sink.record_timing("db.gizmo_query", start, end);
//!
//! // Finally, we can update a value histogram. Technically speaking, value histograms aren't
@ -120,6 +121,12 @@ mod receiver;
mod scopes;
mod sink;
#[cfg(any(feature = "metrics-exporter-log"))]
pub mod exporters;
#[cfg(any(feature = "metrics-recorder-text", feature = "metrics-recorder-prometheus"))]
pub mod recorders;
pub use self::{
configuration::Configuration,
control::{Controller, SnapshotError},

5
metrics/src/recorders.rs Normal file
View File

@ -0,0 +1,5 @@
#[cfg(feature = "metrics-recorder-text")]
pub use metrics_recorder_text::TextRecorder;
#[cfg(feature = "metrics-recorder-prometheus")]
pub use metrics_recorder_prometheus::PrometheusRecorder;