runtime: add proxy metric support (#39)

This commit is contained in:
Toby Lawrence 2019-07-28 20:10:56 -04:00 committed by GitHub
parent 6af1c97186
commit 6ca2bcf097
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 594 additions and 111 deletions

View File

@ -127,12 +127,13 @@ impl Key {
}
/// Maps the name of this `Key` to a new name.
pub fn map_name<F>(self, f: F) -> Self
pub fn map_name<F, S>(self, f: F) -> Self
where
F: FnOnce(ScopedString) -> ScopedString,
F: FnOnce(ScopedString) -> S,
S: Into<ScopedString>,
{
Key {
name: f(self.name),
name: f(self.name).into(),
labels: self.labels,
}
}

View File

@ -1,8 +1,10 @@
use crate::data::AtomicWindowedHistogram;
use metrics_core::{Key, ScopedString};
use arc_swap::ArcSwapOption;
use metrics_core::Key;
use metrics_util::StreamingIntegers;
use quanta::Clock;
use std::{
fmt,
ops::Deref,
sync::{
atomic::{AtomicI64, AtomicU64, Ordering},
@ -23,14 +25,29 @@ pub enum Scope {
}
impl Scope {
pub(crate) fn into_scoped(self, name: ScopedString) -> ScopedString {
/// Adds a new part to this scope.
pub fn add_part<S>(self, part: S) -> Self
where
S: Into<String>,
{
match self {
Scope::Root => name,
Scope::Root => Scope::Nested(vec![part.into()]),
Scope::Nested(mut parts) => {
if !name.is_empty() {
parts.push(name.to_string());
}
parts.join(".").into()
parts.push(part.into());
Scope::Nested(parts)
}
}
}
pub(crate) fn into_string<S>(self, name: S) -> String
where
S: Into<String>,
{
match self {
Scope::Root => name.into(),
Scope::Nested(mut parts) => {
parts.push(name.into());
parts.join(".")
}
}
}
@ -43,6 +60,7 @@ pub(crate) enum Kind {
Counter,
Gauge,
Histogram,
Proxy,
}
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
@ -70,12 +88,30 @@ enum ValueState {
Counter(AtomicU64),
Gauge(AtomicI64),
Histogram(AtomicWindowedHistogram),
Proxy(ArcSwapOption<Box<ProxyFn>>),
}
#[derive(Debug)]
pub(crate) enum ValueSnapshot {
Single(Measurement),
Multiple(Vec<(Key, Measurement)>),
}
/// A point-in-time metric measurement.
#[derive(Debug)]
pub enum Measurement {
/// Counters represent a single value that can only ever be incremented over time, or reset to
/// zero.
Counter(u64),
/// Gauges represent a single value that can go up _or_ down over time.
Gauge(i64),
/// Histograms measure the distribution of values for a given set of measurements.
///
/// Histograms are slightly special in our case because we want to maintain full fidelity of
/// the underlying dataset. We do this by storing all of the individual data points, but we
/// use [`StreamingIntegers`] to store them in a compressed in-memory form. This allows
/// callers to pass around the compressed dataset and decompress/access the actual integers on
/// demand.
Histogram(StreamingIntegers),
}
@ -108,6 +144,10 @@ impl ValueHandle {
)))
}
pub fn proxy() -> Self {
Self::new(ValueState::Proxy(ArcSwapOption::new(None)))
}
pub fn update_counter(&self, value: u64) {
match self.state.deref() {
ValueState::Counter(inner) => {
@ -131,19 +171,39 @@ impl ValueHandle {
}
}
pub fn update_proxy<F>(&self, value: F)
where
F: Fn() -> Vec<(Key, Measurement)> + Send + Sync + 'static,
{
match self.state.deref() {
ValueState::Proxy(inner) => {
inner.store(Some(Arc::new(Box::new(value))));
}
_ => unreachable!("tried to access as proxy, not a proxy"),
}
}
pub fn snapshot(&self) -> ValueSnapshot {
match self.state.deref() {
ValueState::Counter(inner) => {
let value = inner.load(Ordering::Acquire);
ValueSnapshot::Counter(value)
ValueSnapshot::Single(Measurement::Counter(value))
}
ValueState::Gauge(inner) => {
let value = inner.load(Ordering::Acquire);
ValueSnapshot::Gauge(value)
ValueSnapshot::Single(Measurement::Gauge(value))
}
ValueState::Histogram(inner) => {
let stream = inner.snapshot();
ValueSnapshot::Histogram(stream)
ValueSnapshot::Single(Measurement::Histogram(stream))
}
ValueState::Proxy(maybe) => {
let measurements = match maybe.load() {
None => Vec::new(),
Some(f) => f(),
};
ValueSnapshot::Multiple(measurements)
}
}
}
@ -171,39 +231,42 @@ impl Delta for Instant {
}
}
pub trait ProxyFnInner: Fn() -> Vec<(Key, Measurement)> {}
impl<F> ProxyFnInner for F where F: Fn() -> Vec<(Key, Measurement)> {}
pub type ProxyFn = dyn ProxyFnInner<Output = Vec<(Key, Measurement)>> + Send + Sync + 'static;
impl fmt::Debug for ProxyFn {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ProxyFn")
}
}
#[cfg(test)]
mod tests {
use super::{Scope, ValueHandle, ValueSnapshot};
use super::{Measurement, Scope, ValueHandle, ValueSnapshot};
use metrics_core::Key;
use quanta::Clock;
use std::borrow::Cow;
use std::time::Duration;
#[test]
fn test_metric_scope() {
let root_scope = Scope::Root;
assert_eq!(root_scope.into_scoped("".into()), "".to_string());
assert_eq!(root_scope.into_string(""), "".to_string());
let root_scope = Scope::Root;
assert_eq!(
root_scope.into_scoped("jambalaya".into()),
"jambalaya".to_string()
);
assert_eq!(root_scope.into_string("jambalaya"), "jambalaya".to_string());
let nested_scope = Scope::Nested(vec![]);
assert_eq!(nested_scope.into_scoped("".into()), "".to_string());
assert_eq!(nested_scope.into_string(""), "".to_string());
let nested_scope = Scope::Nested(vec![]);
assert_eq!(
nested_scope.into_scoped("toilet".into()),
"toilet".to_string()
);
assert_eq!(nested_scope.into_string("toilet"), "toilet".to_string());
let nested_scope = Scope::Nested(vec![
"chamber".to_string(),
"of".to_string(),
"secrets".to_string(),
]);
let nested_scope = Scope::Nested(vec!["chamber".to_string(), "of".to_string()]);
assert_eq!(
nested_scope.into_scoped("".into()),
nested_scope.into_string("secrets"),
"chamber.of.secrets".to_string()
);
@ -213,9 +276,30 @@ mod tests {
"secrets".to_string(),
]);
assert_eq!(
nested_scope.into_scoped("toilet".into()),
nested_scope.into_string("toilet"),
"chamber.of.secrets.toilet".to_string()
);
let mut nested_scope = Scope::Root;
nested_scope = nested_scope
.add_part("chamber")
.add_part("of".to_string())
.add_part(Cow::Borrowed("secrets"));
assert_eq!(
nested_scope.into_string(""),
"chamber.of.secrets.".to_string()
);
let mut nested_scope = Scope::Nested(vec![
"chamber".to_string(),
"of".to_string(),
"secrets".to_string(),
]);
nested_scope = nested_scope.add_part("part");
assert_eq!(
nested_scope.into_string("two"),
"chamber.of.secrets.part.two".to_string()
);
}
#[test]
@ -223,14 +307,14 @@ mod tests {
let counter = ValueHandle::counter();
counter.update_counter(42);
match counter.snapshot() {
ValueSnapshot::Counter(value) => assert_eq!(value, 42),
ValueSnapshot::Single(Measurement::Counter(value)) => assert_eq!(value, 42),
_ => panic!("incorrect value snapshot type for counter"),
}
let gauge = ValueHandle::gauge();
gauge.update_gauge(23);
match gauge.snapshot() {
ValueSnapshot::Gauge(value) => assert_eq!(value, 23),
ValueSnapshot::Single(Measurement::Gauge(value)) => assert_eq!(value, 23),
_ => panic!("incorrect value snapshot type for gauge"),
}
@ -240,7 +324,7 @@ mod tests {
histogram.update_histogram(8675309);
histogram.update_histogram(5551212);
match histogram.snapshot() {
ValueSnapshot::Histogram(stream) => {
ValueSnapshot::Single(Measurement::Histogram(stream)) => {
assert_eq!(stream.len(), 2);
let values = stream.decompress();
@ -248,5 +332,37 @@ mod tests {
}
_ => panic!("incorrect value snapshot type for histogram"),
}
let proxy = ValueHandle::proxy();
proxy.update_proxy(|| vec![(Key::from_name("foo"), Measurement::Counter(23))]);
match proxy.snapshot() {
ValueSnapshot::Multiple(mut measurements) => {
assert_eq!(measurements.len(), 1);
let measurement = measurements.pop().expect("should have measurement");
assert_eq!(measurement.0.name().as_ref(), "foo");
match measurement.1 {
Measurement::Counter(i) => assert_eq!(i, 23),
_ => panic!("wrong measurement type"),
}
}
_ => panic!("incorrect value snapshot type for proxy"),
}
// This second one just makes sure that replacing the proxy function functions as intended.
proxy.update_proxy(|| vec![(Key::from_name("bar"), Measurement::Counter(24))]);
match proxy.snapshot() {
ValueSnapshot::Multiple(mut measurements) => {
assert_eq!(measurements.len(), 1);
let measurement = measurements.pop().expect("should have measurement");
assert_eq!(measurement.0.name().as_ref(), "bar");
match measurement.1 {
Measurement::Counter(i) => assert_eq!(i, 24),
_ => panic!("wrong measurement type"),
}
}
_ => panic!("incorrect value snapshot type for proxy"),
}
}
}

View File

@ -1,6 +1,9 @@
use crate::common::ValueHandle;
/// Proxy object to update a counter.
/// A reference to a [`Counter`].
///
/// A [`Counter`] is used for directly updating a counter, without any lookup overhead.
#[derive(Clone)]
pub struct Counter {
handle: ValueHandle,
}

View File

@ -1,6 +1,9 @@
use crate::common::ValueHandle;
/// Proxy object to update a gauge.
/// A reference to a [`Gauge`].
///
/// A [`Gauge`] is used for directly updating a gauge, without any lookup overhead.
#[derive(Clone)]
pub struct Gauge {
handle: ValueHandle,
}

View File

@ -7,7 +7,10 @@ use std::cmp;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Duration;
/// Proxy object to update a histogram.
/// A reference to a [`Histogram`].
///
/// A [`Histogram`] is used for directly updating a gauge, without any lookup overhead.
#[derive(Clone)]
pub struct Histogram {
handle: ValueHandle,
}

View File

@ -1,15 +1,15 @@
use crate::common::ValueSnapshot;
use crate::common::Measurement;
use metrics_core::Key;
/// A point-in-time view of metric data.
/// A collection of point-in-time metric measurements.
#[derive(Default, Debug)]
pub struct Snapshot {
measurements: Vec<(Key, ValueSnapshot)>,
measurements: Vec<(Key, Measurement)>,
}
impl Snapshot {
pub(crate) fn new(measurements: Vec<(Key, ValueSnapshot)>) -> Self {
Snapshot { measurements }
pub(crate) fn new(measurements: Vec<(Key, Measurement)>) -> Self {
Self { measurements }
}
/// Number of measurements in this snapshot.
@ -21,4 +21,9 @@ impl Snapshot {
pub fn is_empty(&self) -> bool {
self.measurements.len() != 0
}
/// Converts a [`Snapshot`] into the internal measurements.
pub fn into_measurements(self) -> Vec<(Key, Measurement)> {
self.measurements
}
}

View File

@ -86,9 +86,8 @@
//!
//! ```rust
//! # extern crate metrics_runtime;
//! use metrics_runtime::Receiver;
//! let receiver = Receiver::builder().build().expect("failed to create receiver");
//!
//! # use metrics_runtime::Receiver;
//! # let receiver = Receiver::builder().build().expect("failed to create receiver");
//! // This sink has no scope aka the root scope. The metric will just end up as "widgets".
//! let mut root_sink = receiver.get_sink();
//! root_sink.record_counter("widgets", 42);
@ -127,11 +126,9 @@
//! ```rust
//! # extern crate metrics_runtime;
//! # fn run_query(_: &str) -> u64 { 42 }
//! use metrics_runtime::Receiver;
//! let receiver = Receiver::builder().build().expect("failed to create receiver");
//!
//! let mut sink = receiver.get_sink();
//!
//! # use metrics_runtime::Receiver;
//! # let receiver = Receiver::builder().build().expect("failed to create receiver");
//! # let mut sink = receiver.get_sink();
//! // We might have a function that interacts with a database and returns the number of rows it
//! // touched in doing so.
//! fn process_query(query: &str) -> u64 {
@ -162,10 +159,85 @@
//! to utilize.
//!
//! Naturally, these methods can be slightly cumbersome and visually detracting, in which case
//! you can utilize the proxy metric types -- [`Counter`], [`Gauge`], and [`Histogram`] -- and
//! create them with labels ahead of time. These types, by their nature, are bound to a specific
//! metric, which encompasses name, scope, and labels, and so extra labels cannot be passed when
//! actually updating them.
//! you can utilize the metric handles -- [`Counter`](crate::data::Counter),
//! [`Gauge`](crate::data::Gauge), and [`Histogram`](crate::data::Histogram) -- and create them
//! with labels ahead of time.
//!
//! These handles are bound to the given metric type, as well as the name, labels, and scope of the
//! sink. Thus, there is no overhead of looking up the metric as with the `record_*` methods, and
//! the values can be updated directly, and with less overhead, resulting in faster method calls.
//!
//! ```rust
//! # extern crate metrics_runtime;
//! # use metrics_runtime::Receiver;
//! # use std::time::Instant;
//! # let receiver = Receiver::builder().build().expect("failed to create receiver");
//! # let mut sink = receiver.get_sink();
//! // Let's create a counter.
//! let egg_count = sink.counter("eggs");
//!
//! // I want a baker's dozen of eggs!
//! egg_count.increment();
//! egg_count.record(12);
//!
//! // This updates the same metric as above! We have so many eggs now!
//! sink.record_counter("eggs", 12);
//!
//! // Gauges and histograms don't have any extra helper methods, just `record`:
//! let gauge = sink.gauge("population");
//! gauge.record(8_000_000_000);
//!
//! let histogram = sink.histogram("distribution");
//!
//! // You can record a histogram value directly:
//! histogram.record_value(42);
//!
//! // Or handily pass it two [`Delta`]-compatible values, and have it calculate the delta for you:
//! let start = Instant::now();
//! let end = Instant::now();
//! histogram.record_timing(start, end);
//!
//! // Each of these methods also has a labels-aware companion:
//! let labeled_counter = sink.counter_with_labels("egg_count", &[("type", "large_brown")]);
//! let labeled_gauge = sink.gauge_with_labels("population", &[("country", "austria")]);
//! let labeled_histogram = sink.histogram_with_labels("distribution", &[("type", "performance")]);
//! # fn main() {}
//! ```
//!
//! # Proxies
//!
//! Sometimes, you may have a need to pull in "external" metrics: values related to your
//! application that your application itself doesn't generate, such as system-level metrics.
//!
//! [`Sink`] allows you to register a "proxy metric", which gives the ability to return metrics
//! on-demand when a snapshot is being taken. Users provide a closure that is run every time a
//! snapshot is being taken, which can return multiple metrics, which are then added to overall
//! list of metrics being held by `metrics-runtime` itself.
//!
//! If metrics are relatively expensive to calculate -- say, accessing the /proc filesytem on Linux
//! -- then this can be a great alternative to polling them yourself and having to update them
//! normally on some sort of schedule.
//!
//! ```rust
//! # extern crate metrics_runtime;
//! # extern crate metrics_core;
//! # use metrics_core::Key;
//! # use metrics_runtime::{Receiver, Measurement};
//! # use std::time::Instant;
//! # let receiver = Receiver::builder().build().expect("failed to create receiver");
//! # let mut sink = receiver.get_sink();
//! // A proxy is now registered under the name "load_stats", which is prepended to all the metrics
//! // generated by the closure i.e. "load_stats.avg_1min". These metrics are also still scoped
//! // normally based on the [`Sink`].
//! sink.proxy("load_stat", || {
//! let mut values = Vec::new();
//! values.push((Key::from_name("avg_1min"), Measurement::Gauge(19)));
//! values.push((Key::from_name("avg_5min"), Measurement::Gauge(12)));
//! values.push((Key::from_name("avg_10min"), Measurement::Gauge(10)));
//! values
//! });
//! # fn main() { }
//! ```
//!
//! # Snapshots
//!
@ -262,7 +334,7 @@ pub mod observers;
pub use self::{
builder::{Builder, BuilderError},
common::{Delta, Scope},
common::{Delta, Measurement, Scope},
control::Controller,
receiver::Receiver,
sink::{AsScoped, Sink, SinkError},

View File

@ -1,4 +1,4 @@
use crate::common::{Identifier, Kind, ValueHandle, ValueSnapshot};
use crate::common::{Identifier, Kind, Measurement, ValueHandle, ValueSnapshot};
use crate::config::Configuration;
use crate::data::Snapshot;
use crate::registry::ScopeRegistry;
@ -40,6 +40,7 @@ impl MetricRegistry {
self.config.histogram_granularity,
self.clock.clone(),
),
Kind::Proxy => ValueHandle::proxy(),
};
let metrics_ptr = self.metrics.lease();
@ -65,19 +66,35 @@ impl MetricRegistry {
}
pub fn snapshot(&self) -> Snapshot {
let mut named_values = Vec::new();
let mut values = Vec::new();
let metrics = self.metrics.load().deref().clone();
for (id, value) in metrics.into_iter() {
let (key, scope_handle, _) = id.into_parts();
let scope = self.scope_registry.get(scope_handle);
let key = key.map_name(|name| scope.into_scoped(name));
let snapshot = value.snapshot();
named_values.push((key, snapshot));
match value.snapshot() {
ValueSnapshot::Single(measurement) => {
let key = key.map_name(|name| scope.into_string(name));
values.push((key, measurement));
}
ValueSnapshot::Multiple(mut measurements) => {
// Tack on the key name that this proxy was registered with to the scope so
// that we can clone _that_, and then scope our individual measurements.
let (base_key, labels) = key.into_parts();
let scope = scope.clone().add_part(base_key);
for (subkey, measurement) in measurements.drain(..) {
let scope = scope.clone();
let mut subkey = subkey.map_name(|name| scope.into_string(name));
subkey.add_labels(labels.clone());
values.push((subkey, measurement));
}
}
}
}
Snapshot::new(named_values)
Snapshot::new(values)
}
pub fn observe<O: Observer>(&self, observer: &mut O) {
@ -85,15 +102,151 @@ impl MetricRegistry {
for (id, value) in metrics.into_iter() {
let (key, scope_handle, _) = id.into_parts();
let scope = self.scope_registry.get(scope_handle);
let key = key.map_name(|name| scope.into_scoped(name));
match value.snapshot() {
ValueSnapshot::Counter(value) => observer.observe_counter(key, value),
ValueSnapshot::Gauge(value) => observer.observe_gauge(key, value),
ValueSnapshot::Histogram(stream) => stream.decompress_with(|values| {
let observe = |observer: &mut O, key, measurement| match measurement {
Measurement::Counter(value) => observer.observe_counter(key, value),
Measurement::Gauge(value) => observer.observe_gauge(key, value),
Measurement::Histogram(stream) => stream.decompress_with(|values| {
observer.observe_histogram(key.clone(), values);
}),
};
match value.snapshot() {
ValueSnapshot::Single(measurement) => {
let key = key.map_name(|name| scope.into_string(name));
observe(observer, key, measurement);
}
ValueSnapshot::Multiple(mut measurements) => {
// Tack on the key name that this proxy was registered with to the scope so
// that we can clone _that_, and then scope our individual measurements.
let (base_key, labels) = key.into_parts();
let scope = scope.clone().add_part(base_key);
for (subkey, measurement) in measurements.drain(..) {
let scope = scope.clone();
let mut subkey = subkey.map_name(|name| scope.into_string(name));
subkey.add_labels(labels.clone());
observe(observer, subkey, measurement);
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::{
Clock, Configuration, Identifier, Kind, Measurement, MetricRegistry, ScopeRegistry,
};
use crate::data::{Counter, Gauge, Histogram};
use metrics_core::{Key, Label};
use metrics_util::StreamingIntegers;
use std::mem;
use std::sync::Arc;
#[test]
fn test_snapshot() {
// Get our registry.
let sr = Arc::new(ScopeRegistry::new());
let config = Configuration::mock();
let (clock, _) = Clock::mock();
let mr = Arc::new(MetricRegistry::new(sr, config, clock));
// Set some metrics.
let cid = Identifier::new("counter", 0, Kind::Counter);
let counter: Counter = mr.get_or_register(cid).into();
counter.record(15);
let gid = Identifier::new("gauge", 0, Kind::Gauge);
let gauge: Gauge = mr.get_or_register(gid).into();
gauge.record(89);
let hid = Identifier::new("histogram", 0, Kind::Histogram);
let histogram: Histogram = mr.get_or_register(hid).into();
histogram.record_value(89);
let pid = Identifier::new("proxy", 0, Kind::Proxy);
let proxy = mr.get_or_register(pid);
proxy.update_proxy(|| vec![(Key::from_name("counter"), Measurement::Counter(13))]);
let mut snapshot = mr.snapshot().into_measurements();
snapshot.sort_by_key(|(k, _)| k.name());
let mut expected = vec![
(Key::from_name("counter"), Measurement::Counter(15)),
(Key::from_name("gauge"), Measurement::Gauge(89)),
(
Key::from_name("histogram"),
Measurement::Histogram(StreamingIntegers::new()),
),
(Key::from_name("proxy.counter"), Measurement::Counter(13)),
];
expected.sort_by_key(|(k, _)| k.name());
assert_eq!(snapshot.len(), expected.len());
for rhs in expected {
let lhs = snapshot.remove(0);
assert_eq!(lhs.0, rhs.0);
assert_eq!(mem::discriminant(&lhs.1), mem::discriminant(&rhs.1));
}
}
#[test]
fn test_snapshot_with_labels() {
// Get our registry.
let sr = Arc::new(ScopeRegistry::new());
let config = Configuration::mock();
let (clock, _) = Clock::mock();
let mr = Arc::new(MetricRegistry::new(sr, config, clock));
let labels = vec![Label::new("type", "test")];
// Set some metrics.
let cid = Identifier::new(("counter", labels.clone()), 0, Kind::Counter);
let counter: Counter = mr.get_or_register(cid).into();
counter.record(15);
let gid = Identifier::new(("gauge", labels.clone()), 0, Kind::Gauge);
let gauge: Gauge = mr.get_or_register(gid).into();
gauge.record(89);
let hid = Identifier::new(("histogram", labels.clone()), 0, Kind::Histogram);
let histogram: Histogram = mr.get_or_register(hid).into();
histogram.record_value(89);
let pid = Identifier::new(("proxy", labels.clone()), 0, Kind::Proxy);
let proxy = mr.get_or_register(pid);
proxy.update_proxy(|| vec![(Key::from_name("counter"), Measurement::Counter(13))]);
let mut snapshot = mr.snapshot().into_measurements();
snapshot.sort_by_key(|(k, _)| k.name());
let mut expected = vec![
(
Key::from_name_and_labels("counter", labels.clone()),
Measurement::Counter(15),
),
(
Key::from_name_and_labels("gauge", labels.clone()),
Measurement::Gauge(89),
),
(
Key::from_name_and_labels("histogram", labels.clone()),
Measurement::Histogram(StreamingIntegers::new()),
),
(
Key::from_name_and_labels("proxy.counter", labels.clone()),
Measurement::Counter(13),
),
];
expected.sort_by_key(|(k, _)| k.name());
assert_eq!(snapshot.len(), expected.len());
for rhs in expected {
let lhs = snapshot.remove(0);
assert_eq!(lhs.0, rhs.0);
assert_eq!(mem::discriminant(&lhs.1), mem::discriminant(&rhs.1));
}
}
}

View File

@ -54,3 +54,36 @@ impl ScopeRegistry {
rg.backward.get(&scope_id).cloned().unwrap_or(Scope::Root)
}
}
#[cfg(test)]
mod tests {
use super::{Scope, ScopeRegistry};
#[test]
fn test_simple_write_then_read() {
let nested1 = Scope::Root.add_part("nested1");
let nested2 = nested1.clone().add_part("nested2");
let sr = ScopeRegistry::new();
let doesnt_exist0 = sr.get(0);
let doesnt_exist1 = sr.get(1);
let doesnt_exist2 = sr.get(2);
assert_eq!(doesnt_exist0, Scope::Root);
assert_eq!(doesnt_exist1, Scope::Root);
assert_eq!(doesnt_exist2, Scope::Root);
let nested1_original = nested1.clone();
let nested1_id = sr.register(nested1);
let nested2_original = nested2.clone();
let nested2_id = sr.register(nested2);
let exists1 = sr.get(nested1_id);
let exists2 = sr.get(nested2_id);
assert_eq!(exists1, nested1_original);
assert_eq!(exists2, nested2_original);
}
}

View File

@ -1,5 +1,5 @@
use crate::{
common::{Delta, Identifier, Kind, Scope, ScopeHandle, ValueHandle},
common::{Delta, Identifier, Kind, Measurement, Scope, ScopeHandle, ValueHandle},
data::{Counter, Gauge, Histogram},
registry::{MetricRegistry, ScopeRegistry},
};
@ -220,7 +220,7 @@ impl Sink {
///
/// Both the start and end times must be supplied, but any values that implement [`Delta`] can
/// be used which allows for raw values from [`quanta::Clock`] to be used, or measurements from
/// [`Instant::now`].
/// [`Instant::now`](std::time::Instant::now).
///
/// # Examples
///
@ -251,7 +251,7 @@ impl Sink {
///
/// Both the start and end times must be supplied, but any values that implement [`Delta`] can
/// be used which allows for raw values from [`quanta::Clock`] to be used, or measurements from
/// [`Instant::now`].
/// [`Instant::now`](std::time::Instant::now).
///
/// # Examples
///
@ -333,8 +333,10 @@ impl Sink {
/// Creates a handle to the given counter.
///
/// This handle can be embedded into an existing type and used to directly update the
/// underlying counter. It is merely a proxy, so multiple handles to the same counter can be
/// held and used.
/// underlying counter without requiring a [`Sink`]. This method can be called multiple times
/// with the same `name` and the handle will point to the single underlying instance.
///
/// [`Counter`] is clonable.
///`
/// # Examples
///
@ -362,8 +364,10 @@ impl Sink {
/// Creates a handle to the given counter, with labels attached.
///
/// This handle can be embedded into an existing type and used to directly update the
/// underlying counter. It is merely a proxy, so multiple handles to the same counter can be
/// held and used.
/// underlying counter without requiring a [`Sink`]. This method can be called multiple times
/// with the same `name`/`labels` and the handle will point to the single underlying instance.
///
/// [`Counter`] is clonable.
///
/// # Examples
///
@ -391,8 +395,10 @@ impl Sink {
/// Creates a handle to the given gauge.
///
/// This handle can be embedded into an existing type and used to directly update the
/// underlying gauge. It is merely a proxy, so multiple handles to the same gauge can be
/// held and used.
/// underlying gauge without requiring a [`Sink`]. This method can be called multiple times
/// with the same `name` and the handle will point to the single underlying instance.
///
/// [`Gauge`] is clonable.
///
/// # Examples
///
@ -414,11 +420,13 @@ impl Sink {
self.get_owned_value_handle(key, Kind::Gauge).into()
}
/// Creates a handle to the given gauge.
/// Creates a handle to the given gauge, with labels attached.
///
/// This handle can be embedded into an existing type and used to directly update the
/// underlying gauge. It is merely a proxy, so multiple handles to the same gauge can be
/// held and used.
/// underlying gauge without requiring a [`Sink`]. This method can be called multiple times
/// with the same `name`/`labels` and the handle will point to the single underlying instance.
///
/// [`Gauge`] is clonable.
///
/// # Examples
///
@ -443,8 +451,10 @@ impl Sink {
/// Creates a handle to the given histogram.
///
/// This handle can be embedded into an existing type and used to directly update the
/// underlying histogram. It is merely a proxy, so multiple handles to the same histogram
/// can be held and used.
/// underlying histogram without requiring a [`Sink`]. This method can be called multiple
/// times with the same `name` and the handle will point to the single underlying instance.
///
/// [`Histogram`] is clonable.
///
/// # Examples
///
@ -476,11 +486,13 @@ impl Sink {
self.get_owned_value_handle(key, Kind::Histogram).into()
}
/// Creates a handle to the given histogram.
/// Creates a handle to the given histogram, with labels attached.
///
/// This handle can be embedded into an existing type and used to directly update the
/// underlying histogram. It is merely a proxy, so multiple handles to the same histogram
/// can be held and used.
/// underlying histogram without requiring a [`Sink`]. This method can be called multiple
/// times with the same `name` and the handle will point to the single underlying instance.
///
/// [`Histogram`] is clonable.
///
/// # Examples
///
@ -512,6 +524,105 @@ impl Sink {
self.histogram((name, labels))
}
/// Creates a proxy metric.
///
/// Proxy metrics allow you to register a closure that, when a snapshot of the metric state is
/// requested, will be called and have a chance to return multiple metrics that are added to
/// the overall metric of actual metrics.
///
/// This can be useful for metrics which are expensive to constantly recalculate/poll, allowing
/// you to avoid needing to calculate/push them them yourself, with all the boilerplate that
/// comes with doing so periodically.
///
/// Individual metrics must provide their own key (name), which will be appended to the name
/// given when registering the proxy. A proxy can be reregistered at any time by calling this
/// function again with the same name.
///
/// # Examples
///
/// ```rust
/// # extern crate metrics_runtime;
/// # extern crate metrics_core;
/// # use metrics_runtime::{Receiver, Measurement};
/// # use metrics_core::Key;
/// # use std::thread;
/// # use std::time::Duration;
/// # fn main() {
/// let receiver = Receiver::builder().build().expect("failed to create receiver");
/// let mut sink = receiver.get_sink();
///
/// // A proxy is now registered under the name "load_stats", which is prepended to all the
/// // metrics generated by the closure i.e. "load_stats.avg_1min". These metrics are also
/// // still scoped normally based on the [`Sink`].
/// sink.proxy("load_stats", || {
/// let mut values = Vec::new();
/// values.push((Key::from_name("avg_1min"), Measurement::Gauge(19)));
/// values.push((Key::from_name("avg_5min"), Measurement::Gauge(12)));
/// values.push((Key::from_name("avg_10min"), Measurement::Gauge(10)));
/// values
/// });
/// # }
/// ```
pub fn proxy<N, F>(&mut self, name: N, f: F)
where
N: Into<Key>,
F: Fn() -> Vec<(Key, Measurement)> + Send + Sync + 'static,
{
let id = Identifier::new(name.into(), self.scope_handle, Kind::Proxy);
let handle = self.get_cached_value_handle(id);
handle.update_proxy(f);
}
/// Creates a proxy metric, with labels attached.
///
/// Proxy metrics allow you to register a closure that, when a snapshot of the metric state is
/// requested, will be called and have a chance to return multiple metrics that are added to
/// the overall metric of actual metrics.
///
/// This can be useful for metrics which are expensive to constantly recalculate/poll, allowing
/// you to avoid needing to calculate/push them them yourself, with all the boilerplate that
/// comes with doing so periodically.
///
/// Individual metrics must provide their own key (name), which will be appended to the name
/// given when registering the proxy. A proxy can be reregistered at any time by calling this
/// function again with the same name.
///
/// # Examples
///
/// ```rust
/// # extern crate metrics_runtime;
/// # extern crate metrics_core;
/// # use metrics_runtime::{Receiver, Measurement};
/// # use metrics_core::Key;
/// # use std::thread;
/// # use std::time::Duration;
/// # fn main() {
/// let receiver = Receiver::builder().build().expect("failed to create receiver");
/// let mut sink = receiver.get_sink();
///
/// let system_name = "web03".to_string();
///
/// // A proxy is now registered under the name "load_stats", which is prepended to all the
/// // metrics generated by the closure i.e. "load_stats.avg_1min". These metrics are also
/// // still scoped normally based on the [`Sink`].
/// sink.proxy_with_labels("load_stats", &[("system", system_name)], || {
/// let mut values = Vec::new();
/// values.push((Key::from_name("avg_1min"), Measurement::Gauge(19)));
/// values.push((Key::from_name("avg_5min"), Measurement::Gauge(12)));
/// values.push((Key::from_name("avg_10min"), Measurement::Gauge(10)));
/// values
/// });
/// # }
/// ```
pub fn proxy_with_labels<N, L, F>(&mut self, name: N, labels: L, f: F)
where
N: Into<ScopedString>,
L: IntoLabels,
F: Fn() -> Vec<(Key, Measurement)> + Send + Sync + 'static,
{
self.proxy((name, labels), f)
}
pub(crate) fn construct_key<K>(&self, key: K) -> Key
where
K: Into<Key>,
@ -562,16 +673,7 @@ impl Clone for Sink {
impl<'a> AsScoped<'a> for str {
fn as_scoped(&'a self, base: Scope) -> Scope {
match base {
Scope::Root => {
let parts = vec![self.to_owned()];
Scope::Nested(parts)
}
Scope::Nested(mut parts) => {
parts.push(self.to_owned());
Scope::Nested(parts)
}
}
base.add_part(self.to_string())
}
}
@ -581,17 +683,9 @@ where
T: 'a,
{
fn as_scoped(&'a self, base: Scope) -> Scope {
match base {
Scope::Root => {
let parts = self.as_ref().iter().map(|s| s.to_string()).collect();
Scope::Nested(parts)
}
Scope::Nested(mut parts) => {
let mut new_parts = self.as_ref().iter().map(|s| s.to_string()).collect();
parts.append(&mut new_parts);
Scope::Nested(parts)
}
}
self.as_ref()
.iter()
.fold(base, |s, ss| s.add_part(ss.to_string()))
}
}

View File

@ -157,14 +157,14 @@ use std::{
#[macro_use]
mod macros;
static mut RECORDER: &'static Recorder = &NoopRecorder;
static mut RECORDER: &'static dyn Recorder = &NoopRecorder;
static STATE: AtomicUsize = AtomicUsize::new(0);
const UNINITIALIZED: usize = 0;
const INITIALIZING: usize = 1;
const INITIALIZED: usize = 2;
static SET_RECORDER_ERROR: &'static str =
static SET_RECORDER_ERROR: &str =
"attempted to set a recorder after the metrics system was already initialized";
/// A value that records metrics behind the facade.
@ -216,7 +216,7 @@ impl Recorder for NoopRecorder {
///
/// An error is returned if a recorder has already been set.
#[cfg(atomic_cas)]
pub fn set_recorder(recorder: &'static Recorder) -> Result<(), SetRecorderError> {
pub fn set_recorder(recorder: &'static dyn Recorder) -> Result<(), SetRecorderError> {
set_recorder_inner(|| recorder)
}
@ -232,14 +232,14 @@ pub fn set_recorder(recorder: &'static Recorder) -> Result<(), SetRecorderError>
///
/// An error is returned if a recorder has already been set.
#[cfg(all(feature = "std", atomic_cas))]
pub fn set_boxed_recorder(recorder: Box<Recorder>) -> Result<(), SetRecorderError> {
pub fn set_boxed_recorder(recorder: Box<dyn Recorder>) -> Result<(), SetRecorderError> {
set_recorder_inner(|| unsafe { &*Box::into_raw(recorder) })
}
#[cfg(atomic_cas)]
fn set_recorder_inner<F>(make_recorder: F) -> Result<(), SetRecorderError>
where
F: FnOnce() -> &'static Recorder,
F: FnOnce() -> &'static dyn Recorder,
{
unsafe {
match STATE.compare_and_swap(UNINITIALIZED, INITIALIZING, Ordering::SeqCst) {
@ -274,7 +274,7 @@ where
///
/// It is safe to use other metrics functions while this function runs (including all metrics
/// macros).
pub unsafe fn set_recorder_racy(recorder: &'static Recorder) -> Result<(), SetRecorderError> {
pub unsafe fn set_recorder_racy(recorder: &'static dyn Recorder) -> Result<(), SetRecorderError> {
match STATE.load(Ordering::SeqCst) {
UNINITIALIZED => {
RECORDER = recorder;
@ -310,7 +310,7 @@ impl error::Error for SetRecorderError {
/// Returns a reference to the recorder.
///
/// If a recorder has not been set, a no-op implementation is returned.
pub fn recorder() -> &'static Recorder {
pub fn recorder() -> &'static dyn Recorder {
unsafe {
if STATE.load(Ordering::SeqCst) != INITIALIZED {
static NOOP: NoopRecorder = NoopRecorder;