core: Recorder -> Observer (#35)

This commit is contained in:
jean-airoldie 2019-07-17 09:06:45 -04:00 committed by Toby Lawrence
parent ad72bc16c1
commit e614847de6
31 changed files with 258 additions and 414 deletions

View File

@ -6,6 +6,6 @@ members = [
"metrics-util",
"metrics-exporter-log",
"metrics-exporter-http",
"metrics-recorder-text",
"metrics-recorder-prometheus",
"metrics-observer-text",
"metrics-observer-prometheus",
]

View File

@ -33,11 +33,7 @@
//! Histograms are a convenient way to measure behavior not only at the median, but at the edges of
//! normal operating behavior.
#![deny(missing_docs)]
use futures::future::Future;
use std::borrow::Cow;
use std::fmt;
use std::slice::Iter;
use std::time::Duration;
use std::{borrow::Cow, fmt, slice::Iter, time::Duration};
/// An allocation-optimized string.
///
@ -149,16 +145,15 @@ impl Key {
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.labels.is_empty() {
true => write!(f, "Key({}", self.name),
false => {
let kv_pairs = self
.labels
.iter()
.map(|label| format!("{} = {}", label.0, label.1))
.collect::<Vec<_>>();
write!(f, "Key({}, [{}])", self.name, kv_pairs.join(", "))
}
if self.labels.is_empty() {
write!(f, "Key({}", self.name)
} else {
let kv_pairs = self
.labels
.iter()
.map(|label| format!("{} = {}", label.0, label.1))
.collect::<Vec<_>>();
write!(f, "Key({}, [{}])", self.name, kv_pairs.join(", "))
}
}
}
@ -255,80 +250,64 @@ impl AsNanoseconds for Duration {
}
}
/// A value that records metrics.
pub trait Recorder {
/// Records a counter.
/// A value that observes metrics.
pub trait Observer {
/// The method called when a counter is observed.
///
/// From the perspective of an recorder, a counter and gauge are essentially identical, insofar
/// From the perspective of an observer, a counter and gauge are essentially identical, insofar
/// as they are both a single value tied to a key. From the perspective of a collector,
/// counters and gauges usually have slightly different modes of operation.
///
/// For the sake of flexibility on the exporter side, both are provided.
fn record_counter(&mut self, key: Key, value: u64);
fn observe_counter(&mut self, key: Key, value: u64);
/// Records a gauge.
/// The method called when a gauge is observed.
///
/// From the perspective of a recorder, a counter and gauge are essentially identical, insofar
/// From the perspective of an observer, a counter and gauge are essentially identical, insofar
/// as they are both a single value tied to a key. From the perspective of a collector,
/// counters and gauges usually have slightly different modes of operation.
///
/// For the sake of flexibility on the exporter side, both are provided.
fn record_gauge(&mut self, key: Key, value: i64);
fn observe_gauge(&mut self, key: Key, value: i64);
/// Records a histogram.
/// The method called when a histogram is observed.
///
/// Recorders are expected to tally their own histogram views, so this will be called with all
/// Observers are expected to tally their own histogram views, so this will be called with all
/// of the underlying observed values, and callers will need to process them accordingly.
///
/// There is no guarantee that this method will not be called multiple times for the same key.
fn record_histogram(&mut self, key: Key, values: &[u64]);
fn observe_histogram(&mut self, key: Key, values: &[u64]);
}
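
To make the renamed trait concrete, here is a minimal sketch of an implementor; `PrintObserver` is hypothetical and not part of this commit, it only mirrors the three `observe_*` methods defined above:

```rust
use metrics_core::{Key, Observer};

/// Hypothetical observer that renders every metric as a line of text.
struct PrintObserver {
    output: String,
}

impl Observer for PrintObserver {
    fn observe_counter(&mut self, key: Key, value: u64) {
        self.output.push_str(&format!("counter {}: {}\n", key, value));
    }

    fn observe_gauge(&mut self, key: Key, value: i64) {
        self.output.push_str(&format!("gauge {}: {}\n", key, value));
    }

    fn observe_histogram(&mut self, key: Key, values: &[u64]) {
        // The observer is handed the raw values and tallies its own view.
        let sum: u64 = values.iter().sum();
        self.output.push_str(&format!(
            "histogram {}: {} samples, sum {}\n",
            key,
            values.len(),
            sum
        ));
    }
}
```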
/// A value that can build a recorder.
/// A value that can build an observer.
///
/// Recorders are intended to be single-use containers for rendering a snapshot in a particular
/// format. As many systems are multi-threaded, we can't easily share a single recorder amongst
/// multiple threads, and so we create a recorder per snapshot, tying them together.
/// Observers are containers used for rendering a snapshot in a particular format.
/// As many systems are multi-threaded, we can't easily share a single observer amongst
/// multiple threads, and so we create an observer per observation, tying them together.
///
/// A builder allows us to generate a recorder on demand, giving each specific recorder an
/// interface by which they can do any necessary configuration, initialization, etc of the recorder
/// before handing it over to the exporter.
/// A builder allows us to generate an observer on demand, giving each specific observer an
/// interface by which it can do any necessary configuration, initialization, etc. of the
/// observer before handing it over to the exporter.
pub trait Builder {
/// The recorder created by this builder.
type Output: Recorder;
/// The observer created by this builder.
type Output: Observer;
/// Creates a new observer.
fn build(&self) -> Self::Output;
}
/// A value that holds a point-in-time view of collected metrics.
pub trait Snapshot {
/// Records the snapshot to the given recorder.
fn record<R: Recorder>(&self, recorder: &mut R);
/// A value that can produce a `T` by draining its content.
///
/// After being drained, the value should be ready to be reused.
pub trait Drain<T> {
/// Drain the `Observer`, producing a `T`.
fn drain(&mut self) -> T;
}
/// A value that can provide on-demand snapshots.
pub trait SnapshotProvider {
/// Snapshot given by the provider.
type Snapshot: Snapshot;
/// Errors produced during generation.
type SnapshotError;
/// Gets a snapshot.
fn get_snapshot(&self) -> Result<Self::Snapshot, Self::SnapshotError>;
}
/// A value that can provide on-demand snapshots asynchronously.
pub trait AsyncSnapshotProvider {
/// Snapshot given by the provider.
type Snapshot: Snapshot;
/// Errors produced during generation.
type SnapshotError;
/// The future response value.
type SnapshotFuture: Future<Item = Self::Snapshot, Error = Self::SnapshotError>;
/// Gets a snapshot asynchronously.
fn get_snapshot_async(&self) -> Self::SnapshotFuture;
/// A value whose metrics can be observed by an `Observer`.
pub trait Observe {
/// Observes a point-in-time view of the collected metrics.
fn observe<O: Observer>(&self, observer: &mut O);
}
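
Taken together, `Builder`, `Observe`, and `Drain` replace the old snapshot plumbing: a builder produces an observer, the observed value pushes its metrics into it, and draining pulls the rendered output back out while leaving the observer ready for the next pass. A hedged sketch of that loop, with the concrete controller and observer types left abstract:

```rust
use metrics_core::{Drain, Observe, Observer};

// Sketch only: `controller` and `observer` stand in for a real runtime
// controller and a text/Prometheus observer from the sibling crates.
fn render_once<C, O>(controller: &C, observer: &mut O) -> String
where
    C: Observe,
    O: Observer + Drain<String>,
{
    // Push the current metrics into the observer...
    controller.observe(observer);
    // ...then drain the rendered output; the observer can be reused afterwards.
    observer.drain()
}
```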
/// Helper macro for generating a set of labels.

View File

@ -1,7 +1,7 @@
//! Exports metrics over HTTP.
//!
//! This exporter can utilize recorders that are able to be converted to a textual representation
//! via [`Into`]. It will respond to any requests, regardless of the method or path.
//! This exporter can utilize observers that are able to be converted to a textual representation
//! via [`Drain<String>`]. It will respond to any requests, regardless of the method or path.
//!
//! # Run Modes
//! - `run` can be used to block the current thread, running the HTTP server on the configured
@ -13,13 +13,13 @@
extern crate log;
use hyper::rt::run as hyper_run;
use hyper::rt::Future;
use hyper::service::service_fn;
use hyper::{Body, Response, Server};
use metrics_core::{AsyncSnapshotProvider, Builder, Snapshot};
use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;
use hyper::{
rt::run as hyper_run,
rt::Future,
service::service_fn_ok,
{Body, Response, Server},
};
use metrics_core::{Builder, Drain, Observe, Observer};
use std::{net::SocketAddr, sync::Arc};
/// Exports metrics over HTTP.
pub struct HttpExporter<C, B> {
@ -30,15 +30,13 @@ pub struct HttpExporter<C, B> {
impl<C, B> HttpExporter<C, B>
where
C: AsyncSnapshotProvider + Send + Sync + 'static,
C::SnapshotFuture: Send + Sync + 'static,
C::SnapshotError: Error + Send + Sync + 'static,
C: Observe + Send + Sync + 'static,
B: Builder + Send + Sync + 'static,
B::Output: Into<String>,
B::Output: Drain<String> + Observer,
{
/// Creates a new [`HttpExporter`] that listens on the given `address`.
///
/// Recorders expose their output by being converted into strings.
/// Observers expose their output by being converted into strings.
pub fn new(controller: C, builder: B, address: SocketAddr) -> Self {
HttpExporter {
controller,
@ -50,7 +48,7 @@ where
/// Run the exporter on the current thread.
///
/// This starts an HTTP server on the `address` the exporter was originally configured with,
/// responding to any request with the output of the configured recorder.
/// responding to any request with the output of the configured observer.
pub fn run(self) {
let server = self.into_future();
hyper_run(server);
@ -59,7 +57,7 @@ where
/// Converts this exporter into a future which can be driven externally.
///
/// This starts an HTTP server on the `address` the exporter was originally configured with,
/// responding to any request with the output of the configured recorder.
/// responding to any request with the output of the configured observer.
pub fn into_future(self) -> impl Future<Item = (), Error = ()> {
let controller = self.controller;
let builder = self.builder;
@ -75,33 +73,23 @@ fn build_hyper_server<C, B>(
address: SocketAddr,
) -> impl Future<Item = (), Error = ()>
where
C: AsyncSnapshotProvider + Send + Sync + 'static,
C::SnapshotFuture: Send + Sync + 'static,
C::SnapshotError: Error + Send + Sync + 'static,
C: Observe + Send + Sync + 'static,
B: Builder + Send + Sync + 'static,
B::Output: Into<String>,
B::Output: Drain<String> + Observer,
{
let builder = Arc::new(builder);
let controller = Arc::new(controller);
let service = move || {
let controller = controller.clone();
let controller2 = controller.clone();
let builder = builder.clone();
service_fn(move |_| {
let builder = builder.clone();
service_fn_ok(move |_| {
let mut observer = builder.build();
controller
.get_snapshot_async()
.then(move |result| match result {
Ok(snapshot) => {
let mut recorder = builder.build();
snapshot.record(&mut recorder);
let output = recorder.into();
Ok(Response::new(Body::from(output)))
}
Err(e) => Err(e),
})
controller2.observe(&mut observer);
let output = observer.drain();
Response::new(Body::from(output))
})
};
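
End to end, the observer-based exporter is wired up roughly as follows; the receiver setup and `PrometheusBuilder` come from the sibling crates and are shown here only as an assumed example, not as part of this diff:

```rust
use metrics_exporter_http::HttpExporter;
use metrics_observer_prometheus::PrometheusBuilder;
use metrics_runtime::Receiver;

fn main() {
    let receiver = Receiver::builder().build().expect("failed to create receiver");

    // Every request gets a freshly built observer, which the controller fills
    // and the exporter drains into the response body.
    let exporter = HttpExporter::new(
        receiver.get_controller(),
        PrometheusBuilder::new(),
        "0.0.0.0:9090".parse().expect("valid socket address"),
    );
    exporter.run();
}
```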

View File

@ -1,7 +1,7 @@
//! Exports metrics via the `log` crate.
//!
//! This exporter can utilize recorders that are able to be converted to a textual representation
//! via [`Into`]. It will emit that output by logging via the `log` crate at the specified
//! This exporter can utilize observers that are able to be converted to a textual representation
//! via [`Drain<String>`]. It will emit that output by logging via the `log` crate at the specified
//! level.
//!
//! # Run Modes
@ -15,89 +15,64 @@ extern crate log;
use futures::prelude::*;
use log::Level;
use metrics_core::{AsyncSnapshotProvider, Builder, Snapshot, SnapshotProvider};
use std::error::Error;
use std::thread;
use std::time::Duration;
use metrics_core::{Builder, Drain, Observe, Observer};
use std::{thread, time::Duration};
use tokio_timer::Interval;
/// Exports metrics by converting them to a textual representation and logging them.
pub struct LogExporter<C, B> {
pub struct LogExporter<C, B>
where
B: Builder,
{
controller: C,
builder: B,
observer: B::Output,
level: Level,
interval: Duration,
}
impl<C, B> LogExporter<C, B>
where
B: Builder,
B::Output: Into<String>,
B::Output: Drain<String> + Observer,
C: Observe,
{
/// Creates a new [`LogExporter`] that logs at the configurable level.
///
/// Recorders expose their output by being converted into strings.
pub fn new(controller: C, builder: B, level: Level) -> Self {
/// Observers expose their output by being converted into strings.
pub fn new(controller: C, builder: B, level: Level, interval: Duration) -> Self {
LogExporter {
controller,
builder,
observer: builder.build(),
level,
interval,
}
}
/// Runs this exporter on the current thread, logging output on the given interval.
pub fn run(&mut self, interval: Duration)
where
C: SnapshotProvider,
C::SnapshotError: Error,
{
/// Runs this exporter on the current thread, logging output at the interval
/// given on construction.
pub fn run(&mut self) {
loop {
thread::sleep(interval);
thread::sleep(self.interval);
self.turn();
}
}
/// Run this exporter, logging output only once.
pub fn turn(&self)
where
C: SnapshotProvider,
C::SnapshotError: Error,
{
match self.controller.get_snapshot() {
Ok(snapshot) => {
let mut recorder = self.builder.build();
snapshot.record(&mut recorder);
let output = recorder.into();
log!(self.level, "{}", output);
}
Err(e) => log!(Level::Error, "failed to get snapshot: {}", e),
}
pub fn turn(&mut self) {
self.controller.observe(&mut self.observer);
let output = self.observer.drain();
log!(self.level, "{}", output);
}
/// Converts this exporter into a future which logs output on the given interval.
pub fn into_future(self, interval: Duration) -> impl Future<Item = (), Error = ()>
where
C: AsyncSnapshotProvider,
C::SnapshotError: Error,
{
let controller = self.controller;
let builder = self.builder;
let level = self.level;
Interval::new_interval(interval)
/// Converts this exporter into a future which logs output at the interval
/// given on construction.
pub fn into_future(mut self) -> impl Future<Item = (), Error = ()> {
Interval::new_interval(self.interval)
.map_err(|_| ())
.for_each(move |_| {
let mut recorder = builder.build();
controller
.get_snapshot_async()
.and_then(move |snapshot| {
snapshot.record(&mut recorder);
let output = recorder.into();
log!(level, "{}", output);
Ok(())
})
.map_err(|e| error!("failed to get snapshot: {}", e))
self.turn();
Ok(())
})
}
}
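
Since the interval now lives on the exporter itself, constructing and running it looks roughly like this (the receiver setup is borrowed from the runtime crate's docs and is an assumption here):

```rust
use log::Level;
use metrics_exporter_log::LogExporter;
use metrics_observer_text::TextBuilder;
use metrics_runtime::Receiver;
use std::time::Duration;

fn main() {
    let receiver = Receiver::builder().build().expect("failed to create receiver");

    // The logging interval is now a constructor argument rather than a
    // parameter of `run()`.
    let mut exporter = LogExporter::new(
        receiver.get_controller(),
        TextBuilder::new(),
        Level::Info,
        Duration::from_secs(5),
    );
    exporter.run();
}
```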

View File

@ -1,5 +1,5 @@
[package]
name = "metrics-recorder-prometheus"
name = "metrics-observer-prometheus"
version = "0.2.2"
authors = ["Toby Lawrence <toby@nuclearfurnace.com>"]
edition = "2018"

View File

@ -1,10 +1,9 @@
//! Records metrics in the Prometheus exposition format.
#![deny(missing_docs)]
use hdrhistogram::Histogram;
use metrics_core::{Builder, Key, Label, Recorder};
use metrics_core::{Builder, Drain, Key, Label, Observer};
use metrics_util::{parse_quantiles, Quantile};
use std::collections::HashMap;
use std::time::SystemTime;
use std::{collections::HashMap, time::SystemTime};
/// Builder for [`PrometheusRecorder`].
pub struct PrometheusBuilder {
@ -20,7 +19,7 @@ impl PrometheusBuilder {
///
/// The configured quantiles are used when rendering any histograms.
pub fn new() -> Self {
Self::with_quantiles(&[0.0, 0.5, 0.9, 0.95, 0.99, 0.999, 1.0])
Self::default()
}
/// Creates a new [`PrometheusBuilder`] with the given set of quantiles.
@ -36,10 +35,10 @@ impl PrometheusBuilder {
}
impl Builder for PrometheusBuilder {
type Output = PrometheusRecorder;
type Output = PrometheusObserver;
fn build(&self) -> Self::Output {
PrometheusRecorder {
PrometheusObserver {
quantiles: self.quantiles.clone(),
histos: HashMap::new(),
output: get_prom_expo_header(),
@ -47,15 +46,21 @@ impl Builder for PrometheusBuilder {
}
}
impl Default for PrometheusBuilder {
fn default() -> Self {
Self::with_quantiles(&[0.0, 0.5, 0.9, 0.95, 0.99, 0.999, 1.0])
}
}
/// Records metrics in the Prometheus exposition format.
pub struct PrometheusRecorder {
pub struct PrometheusObserver {
pub(crate) quantiles: Vec<Quantile>,
pub(crate) histos: HashMap<Key, (u64, Histogram<u64>)>,
pub(crate) output: String,
}
impl Recorder for PrometheusRecorder {
fn record_counter(&mut self, key: Key, value: u64) {
impl Observer for PrometheusObserver {
fn observe_counter(&mut self, key: Key, value: u64) {
let (name, labels) = key_to_parts(key);
let full_name = render_labeled_name(&name, &labels);
self.output.push_str("\n# TYPE ");
@ -67,7 +72,7 @@ impl Recorder for PrometheusRecorder {
self.output.push_str("\n");
}
fn record_gauge(&mut self, key: Key, value: i64) {
fn observe_gauge(&mut self, key: Key, value: i64) {
let (name, labels) = key_to_parts(key);
let full_name = render_labeled_name(&name, &labels);
self.output.push_str("\n# TYPE ");
@ -79,7 +84,7 @@ impl Recorder for PrometheusRecorder {
self.output.push_str("\n");
}
fn record_histogram(&mut self, key: Key, values: &[u64]) {
fn observe_histogram(&mut self, key: Key, values: &[u64]) {
let entry = self.histos.entry(key).or_insert_with(|| {
let h = Histogram::<u64>::new(3).expect("failed to create histogram");
(0, h)
@ -87,24 +92,24 @@ impl Recorder for PrometheusRecorder {
let (sum, h) = entry;
for value in values {
h.record(*value).expect("failed to record histogram value");
h.record(*value).expect("failed to observe histogram value");
*sum += *value;
}
}
}
impl From<PrometheusRecorder> for String {
fn from(r: PrometheusRecorder) -> String {
let mut output = r.output;
impl Drain<String> for PrometheusObserver {
fn drain(&mut self) -> String {
let mut output: String = self.output.drain(..).collect();
for (key, sh) in r.histos {
for (key, sh) in self.histos.drain() {
let (sum, hist) = sh;
let (name, labels) = key_to_parts(key);
output.push_str("\n# TYPE ");
output.push_str(name.as_str());
output.push_str(" summary\n");
for quantile in &r.quantiles {
for quantile in &self.quantiles {
let value = hist.value_at_quantile(quantile.value());
let mut labels = labels.clone();
labels.push(format!("quantile=\"{}\"", quantile.value()));

View File

@ -1,5 +1,5 @@
[package]
name = "metrics-recorder-text"
name = "metrics-observer-text"
version = "0.2.2"
authors = ["Toby Lawrence <toby@nuclearfurnace.com>"]
edition = "2018"

View File

@ -1,4 +1,4 @@
//! Records metrics in a hierarchical, text-based format.
//! Observes metrics in a hierarchical, text-based format.
//!
//! Metric scopes are used to provide the hierarchy and indentation of metrics. As an example, for
//! a snapshot with two metrics — `server.msgs_received` and `server.msgs_sent` — we would
@ -26,7 +26,7 @@
//! ## Histograms
//!
//! Histograms are rendered with a configurable set of quantiles that are provided when creating an
//! instance of `TextRecorder`. They are formatted using human-readable labels when displayed to
//! instance of `TextObserver`. They are formatted using human-readable labels when displayed to
//! the user. For example, 0.0 is rendered as "min", 1.0 as "max", and anything in between using
//! the common "pXXX" format i.e. a quantile of 0.5 or percentile of 50 would be p50, a quantile of
//! 0.999 or percentile of 99.9 would be p999, and so on.
@ -44,10 +44,12 @@
//!
#![deny(missing_docs)]
use hdrhistogram::Histogram;
use metrics_core::{Builder, Key, Label, Recorder};
use metrics_core::{Builder, Drain, Key, Label, Observer};
use metrics_util::{parse_quantiles, Quantile};
use std::collections::{HashMap, VecDeque};
use std::fmt::Display;
use std::{
collections::{HashMap, VecDeque},
fmt::Display,
};
/// Builder for [`TextRecorder`].
pub struct TextBuilder {
@ -57,13 +59,13 @@ pub struct TextBuilder {
impl TextBuilder {
/// Creates a new [`TextBuilder`] with a default set of quantiles.
///
/// Configures the recorder with these default quantiles: 0.0, 0.5, 0.9, 0.95, 0.99, 0.999, and
/// Configures the observer with these default quantiles: 0.0, 0.5, 0.9, 0.95, 0.99, 0.999, and
/// 1.0. If you want to customize the quantiles used, you can call
/// [`TextBuilder::with_quantiles`].
/// [`TextBuilder::with_quantiles`].
///
/// The configured quantiles are used when rendering any histograms.
pub fn new() -> Self {
Self::with_quantiles(&[0.0, 0.5, 0.9, 0.95, 0.99, 0.999, 1.0])
Self::default()
}
/// Creates a new [`TextBuilder`] with the given set of quantiles.
@ -79,10 +81,10 @@ impl TextBuilder {
}
impl Builder for TextBuilder {
type Output = TextRecorder;
type Output = TextObserver;
fn build(&self) -> Self::Output {
TextRecorder {
TextObserver {
quantiles: self.quantiles.clone(),
structure: MetricsTree::with_level(0),
histos: HashMap::new(),
@ -90,27 +92,33 @@ impl Builder for TextBuilder {
}
}
impl Default for TextBuilder {
fn default() -> Self {
Self::with_quantiles(&[0.0, 0.5, 0.9, 0.95, 0.99, 0.999, 1.0])
}
}
/// Records metrics in a hierarchical, text-based format.
pub struct TextRecorder {
pub struct TextObserver {
pub(crate) quantiles: Vec<Quantile>,
pub(crate) structure: MetricsTree,
pub(crate) histos: HashMap<Key, Histogram<u64>>,
}
impl Recorder for TextRecorder {
fn record_counter(&mut self, key: Key, value: u64) {
impl Observer for TextObserver {
fn observe_counter(&mut self, key: Key, value: u64) {
let (name_parts, name) = key_to_parts(key);
let mut values = single_value_to_values(name, value);
self.structure.insert(name_parts, &mut values);
}
fn record_gauge(&mut self, key: Key, value: i64) {
fn observe_gauge(&mut self, key: Key, value: i64) {
let (name_parts, name) = key_to_parts(key);
let mut values = single_value_to_values(name, value);
self.structure.insert(name_parts, &mut values);
}
fn record_histogram(&mut self, key: Key, values: &[u64]) {
fn observe_histogram(&mut self, key: Key, values: &[u64]) {
let entry = self
.histos
.entry(key)
@ -119,12 +127,11 @@ impl Recorder for TextRecorder {
for value in values {
entry
.record(*value)
.expect("failed to record histogram value");
.expect("failed to observe histogram value");
}
}
}
#[derive(Default)]
struct MetricsTree {
level: usize,
current: Vec<String>,
@ -164,15 +171,15 @@ impl MetricsTree {
}
}
pub fn into_output(self) -> String {
pub fn render(&mut self) -> String {
let indent = " ".repeat(self.level);
let mut output = String::new();
let mut sorted = self
.current
.into_iter()
.drain(..)
.map(SortEntry::Inline)
.chain(self.next.into_iter().map(|(k, v)| SortEntry::Nested(k, v)))
.chain(self.next.drain().map(|(k, v)| SortEntry::Nested(k, v)))
.collect::<Vec<_>>();
sorted.sort();
@ -182,12 +189,12 @@ impl MetricsTree {
output.push_str(s.as_str());
output.push_str("\n");
}
SortEntry::Nested(s, inner) => {
SortEntry::Nested(s, mut inner) => {
output.push_str(indent.as_str());
output.push_str(s.as_str());
output.push_str(":\n");
let layer_output = inner.into_output();
let layer_output = inner.render();
output.push_str(layer_output.as_str());
}
}
@ -197,15 +204,14 @@ impl MetricsTree {
}
}
impl From<TextRecorder> for String {
fn from(r: TextRecorder) -> String {
let mut structure = r.structure;
for (key, h) in r.histos {
impl Drain<String> for TextObserver {
fn drain(&mut self) -> String {
for (key, h) in self.histos.drain() {
let (name_parts, name) = key_to_parts(key);
let mut values = hist_to_values(name, h, &r.quantiles);
structure.insert(name_parts, &mut values);
let mut values = hist_to_values(name, h.clone(), &self.quantiles);
self.structure.insert(name_parts, &mut values);
}
structure.into_output()
self.structure.render()
}
}
@ -215,7 +221,7 @@ enum SortEntry {
}
impl SortEntry {
fn name(&self) -> &String {
fn name(&self) -> &str {
match self {
SortEntry::Inline(s) => s,
SortEntry::Nested(s, _) => s,

View File

@ -17,9 +17,9 @@ readme = "README.md"
keywords = ["metrics", "telemetry", "histogram", "counter", "gauge"]
[features]
default = ["exporters", "recorders"]
default = ["exporters", "observers"]
exporters = ["metrics-exporter-log", "metrics-exporter-http"]
recorders = ["metrics-recorder-text", "metrics-recorder-prometheus"]
observers = ["metrics-observer-text", "metrics-observer-prometheus"]
[[bench]]
name = "histogram"
@ -38,8 +38,8 @@ futures = "^0.1"
crossbeam-utils = "^0.6"
metrics-exporter-log = { path = "../metrics-exporter-log", version = "^0.2", optional = true }
metrics-exporter-http = { path = "../metrics-exporter-http", version = "^0.1", optional = true }
metrics-recorder-text = { path = "../metrics-recorder-text", version = "^0.2", optional = true }
metrics-recorder-prometheus = { path = "../metrics-recorder-prometheus", version = "^0.2", optional = true }
metrics-observer-text = { path = "../metrics-observer-text", version = "^0.2", optional = true }
metrics-observer-prometheus = { path = "../metrics-observer-prometheus", version = "^0.2", optional = true }
[dev-dependencies]
log = "^0.4"

View File

@ -8,7 +8,6 @@ extern crate metrics_runtime;
use getopts::Options;
use hdrhistogram::Histogram;
use metrics_core::SnapshotProvider;
use metrics_runtime::{Receiver, Sink};
use quanta::Clock;
use std::{
@ -255,7 +254,7 @@ fn main() {
let t1 = Instant::now();
let start = Instant::now();
let _snapshot = controller.get_snapshot().unwrap();
let _snapshot = controller.snapshot();
let end = Instant::now();
snapshot_hist.saturating_record(duration_as_nanos(end - start) as u64);

View File

@ -11,7 +11,6 @@ extern crate metrics;
use getopts::Options;
use hdrhistogram::Histogram;
use metrics_core::SnapshotProvider;
use metrics_runtime::Receiver;
use quanta::Clock;
use std::{
@ -192,7 +191,7 @@ fn main() {
let t1 = Instant::now();
let start = Instant::now();
let _snapshot = controller.get_snapshot().unwrap();
let _snapshot = controller.snapshot();
let end = Instant::now();
snapshot_hist.saturating_record(duration_as_nanos(end - start) as u64);

View File

@ -1,7 +1,5 @@
use crate::{config::Configuration, Receiver};
use std::error::Error;
use std::fmt;
use std::time::Duration;
use std::{error::Error, fmt, time::Duration};
/// Errors during receiver creation.
#[derive(Debug, Clone)]

View File

@ -2,10 +2,14 @@ use crate::data::AtomicWindowedHistogram;
use metrics_core::{Key, ScopedString};
use metrics_util::StreamingIntegers;
use quanta::Clock;
use std::ops::Deref;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{
ops::Deref,
sync::{
atomic::{AtomicI64, AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
};
/// A scope, or context, for a metric.
#[doc(hidden)]

View File

@ -1,32 +1,12 @@
use crate::data::Snapshot;
use crate::registry::{MetricRegistry, ScopeRegistry};
use futures::prelude::*;
use metrics_core::{AsyncSnapshotProvider, SnapshotProvider};
use std::error::Error;
use std::fmt;
use crate::{
data::Snapshot,
registry::{MetricRegistry, ScopeRegistry},
};
use metrics_core::{Observe, Observer};
use std::sync::Arc;
/// Error during snapshot retrieval.
#[derive(Debug, Clone)]
pub enum SnapshotError {
/// The future was polled again after returning the snapshot.
AlreadyUsed,
#[doc(hidden)]
_NonExhaustive,
}
impl Error for SnapshotError {}
impl fmt::Display for SnapshotError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SnapshotError::AlreadyUsed => write!(f, "snapshot already returned from future"),
SnapshotError::_NonExhaustive => write!(f, "non-exhaustive matching"),
}
}
}
/// Handle for acquiring snapshots.
///
/// `Controller` is [`metrics-core`]-compatible as a snapshot provider, both for synchronous and
@ -49,52 +29,15 @@ impl Controller {
scope_registry,
}
}
}
impl SnapshotProvider for Controller {
type Snapshot = Snapshot;
type SnapshotError = SnapshotError;
/// Gets a snapshot.
fn get_snapshot(&self) -> Result<Snapshot, SnapshotError> {
let snapshot = self.metric_registry.get_snapshot();
Ok(snapshot)
/// Provide a snapshot of its collected metrics.
pub fn snapshot(&self) -> Snapshot {
self.metric_registry.snapshot()
}
}
impl AsyncSnapshotProvider for Controller {
type Snapshot = Snapshot;
type SnapshotError = SnapshotError;
type SnapshotFuture = SnapshotFuture;
/// Gets a snapshot asynchronously.
fn get_snapshot_async(&self) -> Self::SnapshotFuture {
let snapshot = self.metric_registry.get_snapshot();
SnapshotFuture::new(snapshot)
}
}
/// A future representing collecting a snapshot.
pub struct SnapshotFuture {
snapshot: Option<Snapshot>,
}
impl SnapshotFuture {
pub fn new(snapshot: Snapshot) -> Self {
SnapshotFuture {
snapshot: Some(snapshot),
}
}
}
impl Future for SnapshotFuture {
type Item = Snapshot;
type Error = SnapshotError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.snapshot
.take()
.ok_or(SnapshotError::AlreadyUsed)
.map(Async::Ready)
impl Observe for Controller {
fn observe<O: Observer>(&self, observer: &mut O) {
self.metric_registry.observe(observer)
}
}
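
The controller therefore exposes two paths: pull a `Snapshot` value directly via `snapshot()`, or push metrics into any `Observer` via `Observe`. A short hedged sketch (the receiver setup and the text observer are assumptions from the sibling crates):

```rust
use metrics_core::{Builder, Drain, Observe};
use metrics_observer_text::TextBuilder;
use metrics_runtime::Receiver;

fn main() {
    let receiver = Receiver::builder().build().expect("failed to create receiver");
    let controller = receiver.get_controller();

    // Pull path: grab a point-in-time snapshot value.
    let _snapshot = controller.snapshot();

    // Push path: hand an observer to the controller and drain the rendering.
    let mut observer = TextBuilder::new().build();
    controller.observe(&mut observer);
    let rendered: String = observer.drain();
    println!("{}", rendered);
}
```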

View File

@ -1,5 +1,5 @@
use crate::common::ValueSnapshot;
use metrics_core::{Key, Recorder, Snapshot as MetricsSnapshot};
use metrics_core::Key;
/// A point-in-time view of metric data.
#[derive(Default, Debug)]
@ -22,86 +22,3 @@ impl Snapshot {
self.measurements.len() != 0
}
}
impl MetricsSnapshot for Snapshot {
/// Records the snapshot to the given recorder.
fn record<R: Recorder>(&self, recorder: &mut R) {
for (key, snapshot) in &self.measurements {
let key = key.clone();
match snapshot {
ValueSnapshot::Counter(value) => recorder.record_counter(key, *value),
ValueSnapshot::Gauge(value) => recorder.record_gauge(key, *value),
ValueSnapshot::Histogram(stream) => stream.decompress_with(|values| {
recorder.record_histogram(key.clone(), values);
}),
}
}
}
}
#[cfg(test)]
mod tests {
use super::{MetricsSnapshot, Recorder, Snapshot, ValueSnapshot};
use metrics_core::Key;
use metrics_util::StreamingIntegers;
use std::collections::HashMap;
#[derive(Default)]
struct MockRecorder {
counter: HashMap<Key, u64>,
gauge: HashMap<Key, i64>,
histogram: HashMap<Key, Vec<u64>>,
}
impl MockRecorder {
pub fn get_counter_value(&self, key: &Key) -> Option<&u64> {
self.counter.get(key)
}
pub fn get_gauge_value(&self, key: &Key) -> Option<&i64> {
self.gauge.get(key)
}
pub fn get_histogram_values(&self, key: &Key) -> Option<&Vec<u64>> {
self.histogram.get(key)
}
}
impl Recorder for MockRecorder {
fn record_counter(&mut self, key: Key, value: u64) {
let _ = self.counter.insert(key, value);
}
fn record_gauge(&mut self, key: Key, value: i64) {
let _ = self.gauge.insert(key, value);
}
fn record_histogram(&mut self, key: Key, values: &[u64]) {
let _ = self.histogram.insert(key, values.to_vec());
}
}
#[test]
fn test_snapshot_recorder() {
let key = Key::from_name("ok");
let mut measurements = Vec::new();
measurements.push((key.clone(), ValueSnapshot::Counter(7)));
measurements.push((key.clone(), ValueSnapshot::Gauge(42)));
let hvalues = vec![10, 25, 42, 97];
let mut stream = StreamingIntegers::new();
stream.compress(&hvalues);
measurements.push((key.clone(), ValueSnapshot::Histogram(stream)));
let snapshot = Snapshot::new(measurements);
let mut recorder = MockRecorder::default();
snapshot.record(&mut recorder);
assert_eq!(recorder.get_counter_value(&key), Some(&7));
assert_eq!(recorder.get_gauge_value(&key), Some(&42));
let hsum = recorder.get_histogram_values(&key).map(|x| x.iter().sum());
assert_eq!(hsum, Some(174));
}
}

View File

@ -172,7 +172,7 @@
//! Naturally, we need a way to get the metrics out of the system, which is where snapshots come
//! into play. By utilizing a [`Controller`], we can take a snapshot of the current metrics in the
//! registry, and then output them to any desired system/interface by utilizing
//! [`Recorder`](metrics_core::Recorder). A number of pre-baked recorders (which only concern
//! [`Observer`](metrics_core::Observer). A number of pre-baked observers (which only concern
//! themselves with formatting the data) and exporters (which take the formatted data and either
//! serve it up, such as exposing an HTTP endpoint, or write it somewhere, like stdout) are
//! available, some of which are exposed by this crate.
@ -181,7 +181,9 @@
//! `log!`:
//! ```rust
//! # extern crate metrics_runtime;
//! use metrics_runtime::{Receiver, recorders::TextBuilder, exporters::LogExporter};
//! use metrics_runtime::{
//! Receiver, observers::TextBuilder, exporters::LogExporter,
//! };
//! use log::Level;
//! use std::{thread, time::Duration};
//! let receiver = Receiver::builder().build().expect("failed to create receiver");
@ -202,14 +204,19 @@
//! let num_rows = 46;
//! sink.record_value("db.queries.select_products_num_rows", num_rows);
//!
//! // Now create our exporter/recorder configuration, and wire it up.
//! let exporter = LogExporter::new(receiver.get_controller(), TextBuilder::new(), Level::Info);
//! // Now create our exporter/observer configuration, and wire it up.
//! let exporter = LogExporter::new(
//! receiver.get_controller(),
//! TextBuilder::new(),
//! Level::Info,
//! Duration::from_secs(5),
//! );
//!
//! // This exporter will now run every 5 seconds, taking a snapshot, rendering it, and writing it
//! // via `log!` at the informational level. This particular exporter is running directly on the
//! // current thread, and not on a background thread.
//! //
//! // exporter.run(Duration::from_secs(5));
//! // exporter.run();
//! ```
//! Most exporters have the ability to run on the current thread or to be converted into a future
//! which can be spawned on any Tokio-compatible runtime.
@ -231,7 +238,7 @@
//! ```
//!
//! [metrics_core]: https://docs.rs/metrics-core
//! [`Recorder`]: https://docs.rs/metrics-core/0.3.1/metrics_core/trait.Recorder.html
//! [`Observer`]: https://docs.rs/metrics-core/0.3.1/metrics_core/trait.Observer.html
#![deny(missing_docs)]
#![warn(unused_extern_crates)]
mod builder;
@ -248,15 +255,15 @@ mod sink;
pub mod exporters;
#[cfg(any(
feature = "metrics-recorder-text",
feature = "metrics-recorder-prometheus"
feature = "metrics-observer-text",
feature = "metrics-observer-prometheus"
))]
pub mod recorders;
pub mod observers;
pub use self::{
builder::{Builder, BuilderError},
common::{Delta, Scope},
control::{Controller, SnapshotError},
control::Controller,
receiver::Receiver,
sink::{AsScoped, Sink, SinkError},
};

View File

@ -0,0 +1,8 @@
//! Commonly used observers.
//!
//! Observers define the format of the metric output: text, JSON, etc.
#[cfg(feature = "metrics-observer-text")]
pub use metrics_observer_text::TextBuilder;
#[cfg(feature = "metrics-observer-prometheus")]
pub use metrics_observer_prometheus::PrometheusBuilder;

View File

@ -9,8 +9,7 @@ use crate::{
use metrics::Recorder;
use metrics_core::Key;
use quanta::{Builder as UpkeepBuilder, Clock, Handle as UpkeepHandle};
use std::cell::RefCell;
use std::sync::Arc;
use std::{cell::RefCell, sync::Arc};
thread_local! {
static SINK: RefCell<Option<Sink>> = RefCell::new(None);

View File

@ -1,8 +0,0 @@
//! Commonly used recorders.
//!
//! Recorders define the format of the metric output: text, JSON, etc.
#[cfg(feature = "metrics-recorder-text")]
pub use metrics_recorder_text::TextBuilder;
#[cfg(feature = "metrics-recorder-prometheus")]
pub use metrics_recorder_prometheus::PrometheusBuilder;

View File

@ -1,9 +1,10 @@
use crate::common::{Identifier, Kind, ValueHandle};
use crate::common::{Identifier, Kind, ValueHandle, ValueSnapshot};
use crate::config::Configuration;
use crate::data::Snapshot;
use crate::registry::ScopeRegistry;
use arc_swap::{ptr_eq, ArcSwap};
use im::hashmap::HashMap;
use metrics_core::Observer;
use quanta::Clock;
use std::ops::Deref;
use std::sync::Arc;
@ -63,7 +64,7 @@ impl MetricRegistry {
}
}
pub fn get_snapshot(&self) -> Snapshot {
pub fn snapshot(&self) -> Snapshot {
let mut named_values = Vec::new();
let metrics = self.metrics.load().deref().clone();
@ -78,4 +79,21 @@ impl MetricRegistry {
Snapshot::new(named_values)
}
pub fn observe<O: Observer>(&self, observer: &mut O) {
let metrics = self.metrics.load().deref().clone();
for (id, value) in metrics.into_iter() {
let (key, scope_handle, _) = id.into_parts();
let scope = self.scope_registry.get(scope_handle);
let key = key.map_name(|name| scope.into_scoped(name));
match value.snapshot() {
ValueSnapshot::Counter(value) => observer.observe_counter(key, value),
ValueSnapshot::Gauge(value) => observer.observe_gauge(key, value),
ValueSnapshot::Histogram(stream) => stream.decompress_with(|values| {
observer.observe_histogram(key.clone(), values);
}),
}
}
}
}

View File

@ -6,9 +6,7 @@ use crate::{
use hashbrown::HashMap;
use metrics_core::{IntoLabels, Key, Label, ScopedString};
use quanta::Clock;
use std::error::Error;
use std::fmt;
use std::sync::Arc;
use std::{error::Error, fmt, sync::Arc};
/// Errors during sink creation.
#[derive(Debug, Clone)]

View File

@ -1,7 +1,9 @@
use crossbeam_epoch::{pin as epoch_pin, Atomic, Guard, Owned, Shared};
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{mem, slice};
use std::{
cell::UnsafeCell,
mem, slice,
sync::atomic::{AtomicUsize, Ordering},
};
const BLOCK_SIZE: usize = 128;
@ -111,9 +113,7 @@ pub struct AtomicBucket<T> {
impl<T> AtomicBucket<T> {
/// Creates a new, empty bucket.
pub fn new() -> Self {
AtomicBucket {
tail: Atomic::null(),
}
Self::default()
}
/// Pushes an element into the bucket.
@ -235,21 +235,28 @@ impl<T> AtomicBucket<T> {
// will see it as empty until another write proceeds.
let guard = &epoch_pin();
let tail = self.tail.load(Ordering::Acquire, guard);
if !tail.is_null() {
if self
if !tail.is_null()
&& self
.tail
.compare_and_set(tail, Shared::null(), Ordering::SeqCst, guard)
.is_ok()
{
// We won the swap to delete the tail node. Now configure a deferred drop to clean
// things up once nobody else is using it.
unsafe {
// Drop the block, which will cause a cascading drop on the next block, and
// so on and so forth, until all blocks linked to this one are dropped.
guard.defer_destroy(tail);
}
guard.flush();
{
// We won the swap to delete the tail node. Now configure a deferred drop to clean
// things up once nobody else is using it.
unsafe {
// Drop the block, which will cause a cascading drop on the next block, and
// so on and so forth, until all blocks linked to this one are dropped.
guard.defer_destroy(tail);
}
guard.flush();
}
}
}
impl<T> Default for AtomicBucket<T> {
fn default() -> Self {
Self {
tail: Atomic::null(),
}
}
}

View File

@ -149,8 +149,10 @@ use metrics_core::AsNanoseconds;
pub use metrics_core::{labels, Key, Label};
#[cfg(feature = "std")]
use std::error;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{
fmt,
sync::atomic::{AtomicUsize, Ordering},
};
#[macro_use]
mod macros;