core: add Builder trait for recorders (#30)
This change adds a new trait -- `Builder` -- which defines a value that can create new recorder instances. As we have a need to generate owned recorders, in particular for futures-based code, the `Builder` trait provides a way to do so without any jankiness, such as the prior Clone-based approach. As such, all exporters now expect a builder to be passed in, rather than the recorder itself.
This commit is contained in:
parent
8c4e130170
commit
ad72bc16c1
|
@ -284,6 +284,23 @@ pub trait Recorder {
|
|||
fn record_histogram(&mut self, key: Key, values: &[u64]);
|
||||
}
|
||||
|
||||
/// A value that can build a recorder.
|
||||
///
|
||||
/// Recorders are intended to be single-use containers for rendering a snapshot in a particular
|
||||
/// format. As many systems are multi-threaded, we can't easily share a single recorder amongst
|
||||
/// multiple threads, and so we create a recorder per snapshot, tying them together.
|
||||
///
|
||||
/// A builder allows us to generate a recorder on demand, giving each specific recorder an
|
||||
/// interface by which they can do any necessary configuration, initialization, etc of the recorder
|
||||
/// before handing it over to the exporter.
|
||||
pub trait Builder {
|
||||
/// The recorder created by this builder.
|
||||
type Output: Recorder;
|
||||
|
||||
/// Creates a new recorder.
|
||||
fn build(&self) -> Self::Output;
|
||||
}
|
||||
|
||||
/// A value that holds a point-in-time view of collected metrics.
|
||||
pub trait Snapshot {
|
||||
/// Records the snapshot to the given recorder.
|
||||
|
|
|
@ -16,31 +16,33 @@ use hyper::rt::run as hyper_run;
|
|||
use hyper::rt::Future;
|
||||
use hyper::service::service_fn;
|
||||
use hyper::{Body, Response, Server};
|
||||
use metrics_core::{AsyncSnapshotProvider, Recorder, Snapshot};
|
||||
use metrics_core::{AsyncSnapshotProvider, Builder, Snapshot};
|
||||
use std::error::Error;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Exports metrics over HTTP.
|
||||
pub struct HttpExporter<C, R> {
|
||||
pub struct HttpExporter<C, B> {
|
||||
controller: C,
|
||||
recorder: R,
|
||||
builder: B,
|
||||
address: SocketAddr,
|
||||
}
|
||||
|
||||
impl<C, R> HttpExporter<C, R>
|
||||
impl<C, B> HttpExporter<C, B>
|
||||
where
|
||||
C: AsyncSnapshotProvider + Clone + Send + 'static,
|
||||
C::SnapshotFuture: Send + 'static,
|
||||
C: AsyncSnapshotProvider + Send + Sync + 'static,
|
||||
C::SnapshotFuture: Send + Sync + 'static,
|
||||
C::SnapshotError: Error + Send + Sync + 'static,
|
||||
R: Recorder + Clone + Into<String> + Send + 'static,
|
||||
B: Builder + Send + Sync + 'static,
|
||||
B::Output: Into<String>,
|
||||
{
|
||||
/// Creates a new [`HttpExporter`] that listens on the given `address`.
|
||||
///
|
||||
/// Recorders expose their output by being converted into strings.
|
||||
pub fn new(controller: C, recorder: R, address: SocketAddr) -> Self {
|
||||
pub fn new(controller: C, builder: B, address: SocketAddr) -> Self {
|
||||
HttpExporter {
|
||||
controller,
|
||||
recorder,
|
||||
builder,
|
||||
address,
|
||||
}
|
||||
}
|
||||
|
@ -60,38 +62,42 @@ where
|
|||
/// responding to any request with the output of the configured recorder.
|
||||
pub fn into_future(self) -> impl Future<Item = (), Error = ()> {
|
||||
let controller = self.controller;
|
||||
let recorder = self.recorder;
|
||||
let builder = self.builder;
|
||||
let address = self.address;
|
||||
|
||||
build_hyper_server(controller, recorder, address)
|
||||
build_hyper_server(controller, builder, address)
|
||||
}
|
||||
}
|
||||
|
||||
fn build_hyper_server<C, R>(
|
||||
fn build_hyper_server<C, B>(
|
||||
controller: C,
|
||||
recorder: R,
|
||||
builder: B,
|
||||
address: SocketAddr,
|
||||
) -> impl Future<Item = (), Error = ()>
|
||||
where
|
||||
C: AsyncSnapshotProvider + Clone + Send + 'static,
|
||||
C::SnapshotFuture: Send + 'static,
|
||||
C: AsyncSnapshotProvider + Send + Sync + 'static,
|
||||
C::SnapshotFuture: Send + Sync + 'static,
|
||||
C::SnapshotError: Error + Send + Sync + 'static,
|
||||
R: Recorder + Clone + Into<String> + Send + 'static,
|
||||
B: Builder + Send + Sync + 'static,
|
||||
B::Output: Into<String>,
|
||||
{
|
||||
let builder = Arc::new(builder);
|
||||
let controller = Arc::new(controller);
|
||||
|
||||
let service = move || {
|
||||
let controller2 = controller.clone();
|
||||
let recorder2 = recorder.clone();
|
||||
let controller = controller.clone();
|
||||
let builder = builder.clone();
|
||||
|
||||
service_fn(move |_| {
|
||||
let recorder3 = recorder2.clone();
|
||||
let builder = builder.clone();
|
||||
|
||||
controller2
|
||||
controller
|
||||
.get_snapshot_async()
|
||||
.then(move |result| match result {
|
||||
Ok(snapshot) => {
|
||||
let mut r = recorder3.clone();
|
||||
snapshot.record(&mut r);
|
||||
let output = r.into();
|
||||
let mut recorder = builder.build();
|
||||
snapshot.record(&mut recorder);
|
||||
let output = recorder.into();
|
||||
Ok(Response::new(Body::from(output)))
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
|
|
|
@ -15,30 +15,31 @@ extern crate log;
|
|||
|
||||
use futures::prelude::*;
|
||||
use log::Level;
|
||||
use metrics_core::{AsyncSnapshotProvider, Recorder, Snapshot, SnapshotProvider};
|
||||
use metrics_core::{AsyncSnapshotProvider, Builder, Snapshot, SnapshotProvider};
|
||||
use std::error::Error;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use tokio_timer::Interval;
|
||||
|
||||
/// Exports metrics by converting them to a textual representation and logging them.
|
||||
pub struct LogExporter<C, R> {
|
||||
pub struct LogExporter<C, B> {
|
||||
controller: C,
|
||||
recorder: R,
|
||||
builder: B,
|
||||
level: Level,
|
||||
}
|
||||
|
||||
impl<C, R> LogExporter<C, R>
|
||||
impl<C, B> LogExporter<C, B>
|
||||
where
|
||||
R: Recorder + Clone + Into<String>,
|
||||
B: Builder,
|
||||
B::Output: Into<String>,
|
||||
{
|
||||
/// Creates a new [`LogExporter`] that logs at the configurable level.
|
||||
///
|
||||
/// Recorders expose their output by being converted into strings.
|
||||
pub fn new(controller: C, recorder: R, level: Level) -> Self {
|
||||
pub fn new(controller: C, builder: B, level: Level) -> Self {
|
||||
LogExporter {
|
||||
controller,
|
||||
recorder,
|
||||
builder,
|
||||
level,
|
||||
}
|
||||
}
|
||||
|
@ -64,7 +65,7 @@ where
|
|||
{
|
||||
match self.controller.get_snapshot() {
|
||||
Ok(snapshot) => {
|
||||
let mut recorder = self.recorder.clone();
|
||||
let mut recorder = self.builder.build();
|
||||
snapshot.record(&mut recorder);
|
||||
let output = recorder.into();
|
||||
log!(self.level, "{}", output);
|
||||
|
@ -80,13 +81,13 @@ where
|
|||
C::SnapshotError: Error,
|
||||
{
|
||||
let controller = self.controller;
|
||||
let recorder = self.recorder;
|
||||
let builder = self.builder;
|
||||
let level = self.level;
|
||||
|
||||
Interval::new_interval(interval)
|
||||
.map_err(|_| ())
|
||||
.for_each(move |_| {
|
||||
let mut recorder = recorder.clone();
|
||||
let mut recorder = builder.build();
|
||||
|
||||
controller
|
||||
.get_snapshot_async()
|
||||
|
|
|
@ -1,43 +1,59 @@
|
|||
//! Records metrics in the Prometheus exposition format.
|
||||
#![deny(missing_docs)]
|
||||
use hdrhistogram::Histogram;
|
||||
use metrics_core::{Key, Recorder};
|
||||
use metrics_core::{Builder, Key, Label, Recorder};
|
||||
use metrics_util::{parse_quantiles, Quantile};
|
||||
use std::collections::HashMap;
|
||||
use std::time::SystemTime;
|
||||
|
||||
/// Records metrics in the Prometheus exposition format.
|
||||
pub struct PrometheusRecorder {
|
||||
/// Builder for [`PrometheusRecorder`].
|
||||
pub struct PrometheusBuilder {
|
||||
quantiles: Vec<Quantile>,
|
||||
histos: HashMap<Key, (u64, Histogram<u64>)>,
|
||||
output: String,
|
||||
}
|
||||
|
||||
impl PrometheusRecorder {
|
||||
/// Creates a new [`PrometheusRecorder`] with a default set of quantiles.
|
||||
impl PrometheusBuilder {
|
||||
/// Creates a new [`PrometheusBuilder`] with a default set of quantiles.
|
||||
///
|
||||
/// Configures the recorder with these default quantiles: 0.0, 0.5, 0.9, 0.95, 0.99, 0.999, and
|
||||
/// 1.0. If you want to customize the quantiles used, you can call
|
||||
/// [`PrometheusRecorder::with_quantiles`].
|
||||
/// [`PrometheusBuilder::with_quantiles`].
|
||||
///
|
||||
/// The configured quantiles are used when rendering any histograms.
|
||||
pub fn new() -> Self {
|
||||
Self::with_quantiles(&[0.0, 0.5, 0.9, 0.95, 0.99, 0.999, 1.0])
|
||||
}
|
||||
|
||||
/// Creates a new [`PrometheusRecorder`] with the given set of quantiles.
|
||||
/// Creates a new [`PrometheusBuilder`] with the given set of quantiles.
|
||||
///
|
||||
/// The configured quantiles are used when rendering any histograms.
|
||||
pub fn with_quantiles(quantiles: &[f64]) -> Self {
|
||||
let actual_quantiles = parse_quantiles(quantiles);
|
||||
|
||||
Self {
|
||||
quantiles: actual_quantiles,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Builder for PrometheusBuilder {
|
||||
type Output = PrometheusRecorder;
|
||||
|
||||
fn build(&self) -> Self::Output {
|
||||
PrometheusRecorder {
|
||||
quantiles: self.quantiles.clone(),
|
||||
histos: HashMap::new(),
|
||||
output: get_prom_expo_header(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Records metrics in the Prometheus exposition format.
|
||||
pub struct PrometheusRecorder {
|
||||
pub(crate) quantiles: Vec<Quantile>,
|
||||
pub(crate) histos: HashMap<Key, (u64, Histogram<u64>)>,
|
||||
pub(crate) output: String,
|
||||
}
|
||||
|
||||
impl Recorder for PrometheusRecorder {
|
||||
fn record_counter(&mut self, key: Key, value: u64) {
|
||||
let (name, labels) = key_to_parts(key);
|
||||
|
@ -77,16 +93,6 @@ impl Recorder for PrometheusRecorder {
|
|||
}
|
||||
}
|
||||
|
||||
impl Clone for PrometheusRecorder {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
output: get_prom_expo_header(),
|
||||
histos: HashMap::new(),
|
||||
quantiles: self.quantiles.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PrometheusRecorder> for String {
|
||||
fn from(r: PrometheusRecorder) -> String {
|
||||
let mut output = r.output;
|
||||
|
@ -131,7 +137,7 @@ fn key_to_parts(key: Key) -> (String, Vec<String>) {
|
|||
let name = name.replace('.', "_");
|
||||
let labels = labels
|
||||
.into_iter()
|
||||
.map(|label| label.into_parts())
|
||||
.map(Label::into_parts)
|
||||
.map(|(k, v)| format!("{}=\"{}\"", k, v))
|
||||
.collect();
|
||||
|
||||
|
|
|
@ -44,44 +44,59 @@
|
|||
//!
|
||||
#![deny(missing_docs)]
|
||||
use hdrhistogram::Histogram;
|
||||
use metrics_core::{Key, Recorder};
|
||||
use metrics_core::{Builder, Key, Label, Recorder};
|
||||
use metrics_util::{parse_quantiles, Quantile};
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fmt::Display;
|
||||
|
||||
/// Records metrics in a hierarchical, text-based format.
|
||||
pub struct TextRecorder {
|
||||
structure: MetricsTree,
|
||||
histos: HashMap<Key, Histogram<u64>>,
|
||||
/// Builder for [`TextRecorder`].
|
||||
pub struct TextBuilder {
|
||||
quantiles: Vec<Quantile>,
|
||||
}
|
||||
|
||||
impl TextRecorder {
|
||||
/// Creates a new [`TextRecorder`] with a default set of quantiles.
|
||||
impl TextBuilder {
|
||||
/// Creates a new [`TextBuilder`] with a default set of quantiles.
|
||||
///
|
||||
/// Configures the recorder with these default quantiles: 0.0, 0.5, 0.9, 0.95, 0.99, 0.999, and
|
||||
/// 1.0. If you want to customize the quantiles used, you can call
|
||||
/// [`TextRecorder::with_quantiles`].
|
||||
/// [`TextBuilder::with_quantiles`].
|
||||
///
|
||||
/// The configured quantiles are used when rendering any histograms.
|
||||
pub fn new() -> Self {
|
||||
Self::with_quantiles(&[0.0, 0.5, 0.9, 0.95, 0.99, 0.999, 1.0])
|
||||
}
|
||||
|
||||
/// Creates a new [`TextRecorder`] with the given set of quantiles.
|
||||
/// Creates a new [`TextBuilder`] with the given set of quantiles.
|
||||
///
|
||||
/// The configured quantiles are used when rendering any histograms.
|
||||
pub fn with_quantiles(quantiles: &[f64]) -> Self {
|
||||
let actual_quantiles = parse_quantiles(quantiles);
|
||||
|
||||
Self {
|
||||
structure: MetricsTree::with_level(0),
|
||||
histos: HashMap::new(),
|
||||
quantiles: actual_quantiles,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Builder for TextBuilder {
|
||||
type Output = TextRecorder;
|
||||
|
||||
fn build(&self) -> Self::Output {
|
||||
TextRecorder {
|
||||
quantiles: self.quantiles.clone(),
|
||||
structure: MetricsTree::with_level(0),
|
||||
histos: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Records metrics in a hierarchical, text-based format.
|
||||
pub struct TextRecorder {
|
||||
pub(crate) quantiles: Vec<Quantile>,
|
||||
pub(crate) structure: MetricsTree,
|
||||
pub(crate) histos: HashMap<Key, Histogram<u64>>,
|
||||
}
|
||||
|
||||
impl Recorder for TextRecorder {
|
||||
fn record_counter(&mut self, key: Key, value: u64) {
|
||||
let (name_parts, name) = key_to_parts(key);
|
||||
|
@ -109,16 +124,6 @@ impl Recorder for TextRecorder {
|
|||
}
|
||||
}
|
||||
|
||||
impl Clone for TextRecorder {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
structure: MetricsTree::with_level(0),
|
||||
histos: HashMap::new(),
|
||||
quantiles: self.quantiles.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct MetricsTree {
|
||||
level: usize,
|
||||
|
@ -248,7 +253,7 @@ fn key_to_parts(key: Key) -> (VecDeque<String>, String) {
|
|||
|
||||
let labels = labels
|
||||
.into_iter()
|
||||
.map(|label| label.into_parts())
|
||||
.map(Label::into_parts)
|
||||
.map(|(k, v)| format!("{}=\"{}\"", k, v))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
|
|
|
@ -130,7 +130,7 @@ impl AtomicWindowedHistogram {
|
|||
// so go ahead and wait until the index is caught up with the upkeep index: the upkeep
|
||||
// index will be ahead of index until upkeep is complete.
|
||||
let mut upkeep_in_progress = false;
|
||||
let mut index = 0;
|
||||
let mut index;
|
||||
loop {
|
||||
index = self.index.load(Ordering::Acquire);
|
||||
let upkeep_index = self.upkeep_index.load(Ordering::Acquire);
|
||||
|
|
|
@ -181,7 +181,7 @@
|
|||
//! `log!`:
|
||||
//! ```rust
|
||||
//! # extern crate metrics_runtime;
|
||||
//! use metrics_runtime::{Receiver, recorders::TextRecorder, exporters::LogExporter};
|
||||
//! use metrics_runtime::{Receiver, recorders::TextBuilder, exporters::LogExporter};
|
||||
//! use log::Level;
|
||||
//! use std::{thread, time::Duration};
|
||||
//! let receiver = Receiver::builder().build().expect("failed to create receiver");
|
||||
|
@ -203,7 +203,7 @@
|
|||
//! sink.record_value("db.queries.select_products_num_rows", num_rows);
|
||||
//!
|
||||
//! // Now create our exporter/recorder configuration, and wire it up.
|
||||
//! let exporter = LogExporter::new(receiver.get_controller(), TextRecorder::new(), Level::Info);
|
||||
//! let exporter = LogExporter::new(receiver.get_controller(), TextBuilder::new(), Level::Info);
|
||||
//!
|
||||
//! // This exporter will now run every 5 seconds, taking a snapshot, rendering it, and writing it
|
||||
//! // via `log!` at the informational level. This particular exporter is running directly on the
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
//!
|
||||
//! Recorders define the format of the metric output: text, JSON, etc.
|
||||
#[cfg(feature = "metrics-recorder-text")]
|
||||
pub use metrics_recorder_text::TextRecorder;
|
||||
pub use metrics_recorder_text::TextBuilder;
|
||||
|
||||
#[cfg(feature = "metrics-recorder-prometheus")]
|
||||
pub use metrics_recorder_prometheus::PrometheusRecorder;
|
||||
pub use metrics_recorder_prometheus::PrometheusBuilder;
|
||||
|
|
Loading…
Reference in New Issue