From 6b9818b0054ef8a115f433951632d152a1b4179d Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Wed, 3 Apr 2019 00:30:24 -0400 Subject: [PATCH] A lot of documentation and syntax cleanups. --- metrics/CODE_OF_CONDUCT.md | 6 ++- metrics/Cargo.toml | 5 +- metrics/Makefile.toml | 33 ------------ metrics/examples/benchmark.rs | 39 ++++++++++---- metrics/src/configuration.rs | 29 +++++------ metrics/src/control.rs | 4 +- metrics/src/data/counter.rs | 8 +-- metrics/src/data/gauge.rs | 8 +-- metrics/src/data/histogram.rs | 29 ++++++----- metrics/src/data/mod.rs | 8 ++- metrics/src/data/snapshot.rs | 15 +++--- metrics/src/helper.rs | 8 ++- metrics/src/lib.rs | 97 ++++++++++++++--------------------- metrics/src/receiver.rs | 54 ++++++++++--------- metrics/src/sink.rs | 28 +++++++--- 15 files changed, 189 insertions(+), 182 deletions(-) delete mode 100644 metrics/Makefile.toml diff --git a/metrics/CODE_OF_CONDUCT.md b/metrics/CODE_OF_CONDUCT.md index 3cee6e1..697add6 100644 --- a/metrics/CODE_OF_CONDUCT.md +++ b/metrics/CODE_OF_CONDUCT.md @@ -4,7 +4,9 @@ This document is based on the [Rust Code of Conduct](https://www.rust-lang.org/c ## Conduct -**Contact**: [toby@nuclearfurnace.com](mailto:toby@nuclearfurnace.com) +**Contact**: +[toby@nuclearfurnace.com](mailto:toby@nuclearfurnace.com) +[luciofranco14@gmail.com](mailto:luciofranco14@gmail.com) * We are committed to providing a friendly, safe and welcoming environment for all, regardless of level of experience, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, nationality, or other similar characteristic. * Avoid using overtly sexual nicknames or other nicknames that might detract from a friendly, safe and welcoming environment for all. @@ -17,7 +19,7 @@ This document is based on the [Rust Code of Conduct](https://www.rust-lang.org/c ## Moderation -These are the policies for upholding our community's standards of conduct. If you feel that a thread needs moderation, please use the contact information above, or mention @tobz in the thread. +These are the policies for upholding our community's standards of conduct. If you feel that a thread needs moderation, please use the contact information above, or mention @tobz or @LucioFranco in the thread. 1. Remarks that violate this Code of Conduct, including hateful, hurtful, oppressive, or exclusionary remarks, are not allowed. (Cursing is allowed, but never targeting another user, and never in a hateful manner.) 2. Remarks that moderators find inappropriate, whether listed in the code of conduct or not, are also not allowed. diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index 1c0ec3f..62e0b54 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "metrics" version = "0.9.0" -authors = ["Toby Lawrence "] +authors = ["Toby Lawrence "] edition = "2018" license = "MIT" @@ -27,8 +27,7 @@ crossbeam-channel = "^0.3" parking_lot = "^0.7" fnv = "^1.0" hashbrown = "^0.1" -quanta = { version = "^0.2", features = ["asm"] } -derivative = "^1.0" +quanta = "^0.2" tokio-sync = "^0.1" [dev-dependencies] diff --git a/metrics/Makefile.toml b/metrics/Makefile.toml deleted file mode 100644 index 83fd0ad..0000000 --- a/metrics/Makefile.toml +++ /dev/null @@ -1,33 +0,0 @@ -[env] -RUST_TEST_THREADS = "1" -CARGO_MAKE_RUN_CODECOV = "true" -RUSTFLAGS = "-D warnings" -CARGO_FEATURES = "--no-default-features" - -[tasks.format-stable] -command = "cargo" -args = ["fmt"] - -[tasks.pre-verify-project] -dependencies = ["check-format", "clippy", "coverage-flow"] - -[tasks.pre-coverage] -dependencies = ["test"] - -[tasks.build-verbose] -description = "Runs the rust compiler with verbose output, with interpolated features." -category = "Build" -command = "cargo" -args = ["build", "--verbose", "${CARGO_FEATURES}"] - -[tasks.test-verbose] -description = "Runs all available tests with verbose output, with interpolated features." -category = "Test" -command = "cargo" -args = ["test", "--verbose", "${CARGO_FEATURES}"] - -[tasks.test] -description = "Runs all available tests, with interpolated features." -category = "Test" -command = "cargo" -args = ["test", "${CARGO_FEATURES}"] diff --git a/metrics/examples/benchmark.rs b/metrics/examples/benchmark.rs index f759a12..0ac4528 100644 --- a/metrics/examples/benchmark.rs +++ b/metrics/examples/benchmark.rs @@ -8,7 +8,7 @@ extern crate metrics_core; use getopts::Options; use hdrhistogram::Histogram; -use metrics::{Receiver, Sink, snapshot::TypedMeasurement}; +use metrics::{snapshot::TypedMeasurement, Receiver, Sink}; use std::{ env, sync::{ @@ -81,10 +81,25 @@ fn print_usage(program: &str, opts: &Options) { pub fn opts() -> Options { let mut opts = Options::new(); - opts.optopt("d", "duration", "number of seconds to run the benchmark", "INTEGER"); + opts.optopt( + "d", + "duration", + "number of seconds to run the benchmark", + "INTEGER", + ); opts.optopt("p", "producers", "number of producers", "INTEGER"); - opts.optopt("c", "capacity", "maximum number of unprocessed items", "INTEGER"); - opts.optopt("b", "batch-size", "maximum number of items in a batch", "INTEGER"); + opts.optopt( + "c", + "capacity", + "maximum number of unprocessed items", + "INTEGER", + ); + opts.optopt( + "b", + "batch-size", + "maximum number of items in a batch", + "INTEGER", + ); opts.optflag("h", "help", "print this help menu"); opts @@ -102,7 +117,7 @@ fn main() { Err(f) => { error!("Failed to parse command line args: {}", f); return; - }, + } }; if matches.opt_present("help") { @@ -183,19 +198,21 @@ fn main() { let end = Instant::now(); snapshot_hist.saturating_record(duration_as_nanos(end - start) as u64); - let turn_total = snapshot.unwrap().into_measurements() + let turn_total = snapshot + .unwrap() + .into_measurements() .iter() .fold(0, |mut acc, m| { match m { TypedMeasurement::Counter(key, value) => { println!("got counter {} -> {}", key, value); acc += *value; - }, + } TypedMeasurement::Gauge(key, value) => { println!("got counter {} -> {}", key, value); acc += *value as u64; - }, - _ => {}, + } + _ => {} } acc @@ -229,7 +246,9 @@ fn main() { } } -fn duration_as_nanos(d: Duration) -> f64 { (d.as_secs() as f64 * 1e9) + d.subsec_nanos() as f64 } +fn duration_as_nanos(d: Duration) -> f64 { + (d.as_secs() as f64 * 1e9) + d.subsec_nanos() as f64 +} fn nanos_to_readable(t: u64) -> String { let f = t as f64; diff --git a/metrics/src/configuration.rs b/metrics/src/configuration.rs index beb3dd2..463b52d 100644 --- a/metrics/src/configuration.rs +++ b/metrics/src/configuration.rs @@ -23,7 +23,9 @@ impl Default for Configuration { impl Configuration { /// Creates a new [`Configuration`] with default values. - pub fn new() -> Configuration { Default::default() } + pub fn new() -> Configuration { + Default::default() + } /// Sets the buffer capacity. /// @@ -38,7 +40,7 @@ impl Configuration { /// at our default value, we preallocate roughly ~32KB. /// /// Generally speaking, sending and processing metrics is fast enough that the default value of - /// 4096 supports millions of samples per second. + /// 512 supports millions of samples per second. pub fn capacity(mut self, capacity: usize) -> Self { self.capacity = capacity; self @@ -50,8 +52,8 @@ impl Configuration { /// /// This controls the size of message batches that we collect for processing. The only real /// reason to tweak this is to control the latency from the sender side. Larger batches lower - /// the ingest latency in the face of high metric ingest pressure at the cost of higher tail - /// latencies. + /// the ingest latency in the face of high metric ingest pressure at the cost of higher ingest + /// tail latencies. /// /// Long story short, you shouldn't need to change this, but it's here if you really do. pub fn batch_size(mut self, batch_size: usize) -> Self { @@ -63,17 +65,12 @@ impl Configuration { /// /// Defaults to a 10 second window with 1 second granularity. /// - /// This controls how long of a time frame the histogram will track, on a rolling window. - /// We'll create enough underlying histogram buckets so that we have (window / granularity) - /// buckets, and every interval that passes (granularity), we'll add a new bucket and drop the - /// oldest one, thereby providing a rolling window. + /// This controls both how long of a time window we track histogram data for, and the + /// granularity in which we roll off old data. /// - /// Histograms, under the hood, are hard-coded to track three significant digits, and will take - /// a theoretical maximum of around 60KB per bucket, so a single histogram metric with the - /// default window/granularity will take a maximum of around 600KB. - /// - /// In practice, this should be much smaller based on the maximum values pushed into the - /// histogram, as the underlying histogram storage is automatically resized on the fly. + /// As an example, with the default values, we would keep the last 10 seconds worth of + /// histogram data, and would remove 1 seconds worth of data at a time as the window rolled + /// forward. pub fn histogram(mut self, window: Duration, granularity: Duration) -> Self { self.histogram_window = window; self.histogram_granularity = granularity; @@ -81,5 +78,7 @@ impl Configuration { } /// Create a [`Receiver`] based on this configuration. - pub fn build(self) -> Receiver { Receiver::from_config(self) } + pub fn build(self) -> Receiver { + Receiver::from_config(self) + } } diff --git a/metrics/src/control.rs b/metrics/src/control.rs index 5432fa2..5cf1986 100644 --- a/metrics/src/control.rs +++ b/metrics/src/control.rs @@ -32,7 +32,9 @@ pub struct Controller { } impl Controller { - pub(crate) fn new(control_tx: Sender) -> Controller { Controller { control_tx } } + pub(crate) fn new(control_tx: Sender) -> Controller { + Controller { control_tx } + } /// Retrieves a snapshot of the current metric state. pub fn get_snapshot(&self) -> Result { diff --git a/metrics/src/data/counter.rs b/metrics/src/data/counter.rs index a4bc827..f68d2e3 100644 --- a/metrics/src/data/counter.rs +++ b/metrics/src/data/counter.rs @@ -1,6 +1,6 @@ +use crate::data::ScopedKey; use fnv::FnvBuildHasher; use hashbrown::HashMap; -use crate::data::ScopedKey; pub(crate) struct Counter { data: HashMap, @@ -9,7 +9,7 @@ pub(crate) struct Counter { impl Counter { pub fn new() -> Counter { Counter { - data: HashMap::::default(), + data: HashMap::default(), } } @@ -18,7 +18,9 @@ impl Counter { *value = value.wrapping_add(delta); } - pub fn values(&self) -> Vec<(ScopedKey, u64)> { self.data.iter().map(|(k, v)| (k.clone(), *v)).collect() } + pub fn values(&self) -> Vec<(ScopedKey, u64)> { + self.data.iter().map(|(k, v)| (k.clone(), *v)).collect() + } } #[cfg(test)] diff --git a/metrics/src/data/gauge.rs b/metrics/src/data/gauge.rs index 29a92ae..9b8c866 100644 --- a/metrics/src/data/gauge.rs +++ b/metrics/src/data/gauge.rs @@ -1,6 +1,6 @@ +use crate::data::ScopedKey; use fnv::FnvBuildHasher; use hashbrown::HashMap; -use crate::data::ScopedKey; pub(crate) struct Gauge { data: HashMap, @@ -9,7 +9,7 @@ pub(crate) struct Gauge { impl Gauge { pub fn new() -> Gauge { Gauge { - data: HashMap::::default(), + data: HashMap::default(), } } @@ -18,7 +18,9 @@ impl Gauge { *ivalue = value; } - pub fn values(&self) -> Vec<(ScopedKey, i64)> { self.data.iter().map(|(k, v)| (k.clone(), *v)).collect() } + pub fn values(&self) -> Vec<(ScopedKey, i64)> { + self.data.iter().map(|(k, v)| (k.clone(), *v)).collect() + } } #[cfg(test)] diff --git a/metrics/src/data/histogram.rs b/metrics/src/data/histogram.rs index 6d70ec1..b3b745a 100644 --- a/metrics/src/data/histogram.rs +++ b/metrics/src/data/histogram.rs @@ -1,7 +1,4 @@ -use crate::{ - data::ScopedKey, - helper::duration_as_nanos, -}; +use crate::{data::ScopedKey, helper::duration_as_nanos}; use fnv::FnvBuildHasher; use hashbrown::HashMap; use std::time::{Duration, Instant}; @@ -17,7 +14,7 @@ impl Histogram { Histogram { window, granularity, - data: HashMap::::default(), + data: HashMap::default(), } } @@ -38,7 +35,10 @@ impl Histogram { } pub fn values(&self) -> Vec<(ScopedKey, HistogramSnapshot)> { - self.data.iter().map(|(k, v)| (k.clone(), v.snapshot())).collect() + self.data + .iter() + .map(|(k, v)| (k.clone(), v.snapshot())) + .collect() } } @@ -52,7 +52,8 @@ pub(crate) struct WindowedRawHistogram { impl WindowedRawHistogram { pub fn new(window: Duration, granularity: Duration) -> WindowedRawHistogram { - let num_buckets = ((duration_as_nanos(window) / duration_as_nanos(granularity)) as usize) + 1; + let num_buckets = + ((duration_as_nanos(window) / duration_as_nanos(granularity)) as usize) + 1; let mut buckets = Vec::with_capacity(num_buckets); for _ in 0..num_buckets { @@ -92,22 +93,26 @@ impl WindowedRawHistogram { } } +/// A point-in-time snapshot of a single histogram. #[derive(Debug, PartialEq, Eq)] pub struct HistogramSnapshot { - values: Vec + values: Vec, } impl HistogramSnapshot { - pub fn new(values: Vec) -> Self { + pub(crate) fn new(values: Vec) -> Self { HistogramSnapshot { values } } - pub fn values(&self) -> &Vec { &self.values } + /// Gets the raw values that compromise the entire histogram. + pub fn values(&self) -> &Vec { + &self.values + } } #[cfg(test)] mod tests { - use super::{Histogram, WindowedRawHistogram, ScopedKey}; + use super::{Histogram, ScopedKey, WindowedRawHistogram}; use std::time::{Duration, Instant}; #[test] @@ -139,7 +144,7 @@ mod tests { assert_eq!(values.len(), 1); let hdr = &values[0].1; - assert_eq!(hdr.values().len(), 1); + assert_eq!(hdr.values().len(), 4); assert_eq!(hdr.values().get(0).unwrap(), &1245); assert_eq!(hdr.values().get(1).unwrap(), &213); assert_eq!(hdr.values().get(2).unwrap(), &1022); diff --git a/metrics/src/data/mod.rs b/metrics/src/data/mod.rs index 3ae61b7..d52f243 100644 --- a/metrics/src/data/mod.rs +++ b/metrics/src/data/mod.rs @@ -50,8 +50,12 @@ pub(crate) enum Sample { pub(crate) struct ScopedKey(pub u64, pub MetricKey); impl ScopedKey { - pub(crate) fn id(&self) -> u64 { self.0 } - pub(crate) fn into_string_scoped(self, scope: String) -> StringScopedKey { StringScopedKey(scope, self.1) } + pub(crate) fn id(&self) -> u64 { + self.0 + } + pub(crate) fn into_string_scoped(self, scope: String) -> StringScopedKey { + StringScopedKey(scope, self.1) + } } /// A string scoped metric key. diff --git a/metrics/src/data/snapshot.rs b/metrics/src/data/snapshot.rs index 834fa97..ec206a8 100644 --- a/metrics/src/data/snapshot.rs +++ b/metrics/src/data/snapshot.rs @@ -1,6 +1,6 @@ use super::histogram::HistogramSnapshot; -use std::fmt::Display; use metrics_core::MetricsExporter; +use std::fmt::Display; /// A typed metric measurement, used in snapshots. /// @@ -36,7 +36,8 @@ impl Snapshot { where T: Display, { - self.measurements.push(TypedMeasurement::Gauge(key.to_string(), value)); + self.measurements + .push(TypedMeasurement::Gauge(key.to_string(), value)); } /// Sets timing percentiles for the given metric key. @@ -71,24 +72,26 @@ impl Snapshot { for value in hs.values() { exporter.export_histogram(key, *value); } - }, + } TypedMeasurement::ValueHistogram(key, hs) => { for value in hs.values() { exporter.export_histogram(key, *value); } - }, + } } } } /// Converts this [`Snapshot`] to the underlying vector of measurements. - pub fn into_measurements(self) -> Vec { self.measurements } + pub fn into_measurements(self) -> Vec { + self.measurements + } } #[cfg(test)] mod tests { + use super::{HistogramSnapshot, MetricsExporter, Snapshot, TypedMeasurement}; use std::collections::HashMap; - use super::{Snapshot, HistogramSnapshot, TypedMeasurement, MetricsExporter}; #[derive(Default)] struct MockExporter { diff --git a/metrics/src/helper.rs b/metrics/src/helper.rs index dc321e3..0212980 100644 --- a/metrics/src/helper.rs +++ b/metrics/src/helper.rs @@ -4,10 +4,14 @@ use std::{ }; /// Helpers to create an I/O error from a string. -pub fn io_error(reason: &str) -> Error { Error::new(ErrorKind::Other, reason) } +pub fn io_error(reason: &str) -> Error { + Error::new(ErrorKind::Other, reason) +} /// Converts a duration to nanoseconds. -pub fn duration_as_nanos(d: Duration) -> u64 { (d.as_secs() * 1_000_000_000) + u64::from(d.subsec_nanos()) } +pub fn duration_as_nanos(d: Duration) -> u64 { + (d.as_secs() * 1_000_000_000) + u64::from(d.subsec_nanos()) +} #[cfg(test)] mod tests { diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs index 7302f59..855c16f 100644 --- a/metrics/src/lib.rs +++ b/metrics/src/lib.rs @@ -1,6 +1,6 @@ //! High-speed metrics collection library. //! -//! hotmic provides a generalized metrics collection library targeted at users who want to log +//! `metrics` provides a generalized metrics collection library targeted at users who want to log //! metrics at high volume and high speed. //! //! # Design @@ -11,55 +11,44 @@ //! aggregation, and summarization. The [`Receiver`] is intended to be spawned onto a dedicated //! background thread. //! -//! From a [`Receiver`], callers can create a [`Sink`], which allows registering facets -- or -//! interests -- in a given metric, along with sending the metrics themselves. All metrics need to -//! be pre-registered, in essence, with the receiver, which allows us to know which aspects of a -//! metric to track: count, value, or percentile. +//! Once a [`Receiver`] is created, callers can either create a [`Sink`] for sending metrics, or a +//! [`Controller`] for getting metrics out. //! -//! A [`Sink`] can be cheaply cloned and does not require a mutable reference to send metrics, and -//! so callers have great flexibility in being able to control their resource consumption when it -//! comes to sinks. [`Receiver`] also allows configuring the capacity of the underlying channels to -//! finely tune resource consumption. +//! A [`Sink`] can be cheaply cloned and does not require a mutable reference to send metrics, so +//! callers have increased flexibility in usage and control over whether or not to clone sinks, +//! share references, etc. //! -//! Being based on [`crossbeam-channel`] allows us to process close to fifteen million metrics per -//! second on a single core, with very low ingest latencies: 100ns on average at full throughput. +//! A [`Controller`] provides both a synchronous and asynchronous snapshotting interface, which is +//! [`metrics-core`][metrics_core] compatible for exporting. This allows flexibility in +//! integration amongst traditional single-threaded or hand-rolled multi-threaded applications and +//! the emerging asynchronous Rust ecosystem. +//! +//! # Performance +//! +//! Being based on [`crossbeam-channel`][crossbeam_channel] allows us to process close to ten +//! million metrics per second using a single core, with average ingest latencies of around 100ns. //! //! # Metrics //! -//! hotmic supports counters, gauges, and histograms. +//! Counters, gauges, and histograms are supported, and follow the definitions outlined in +//! [`metrics-core`][metrics_core]. //! -//! A counter is a single value that can be updated with deltas to increase or decrease the value. -//! This would be your typical "messages sent" or "database queries executed" style of metric, -//! where the value changes over time. -//! -//! A gauge is also a single value but does not support delta updates. When a gauge is set, the -//! value sent becomes _the_ value of the gauge. Gauges can be useful for metrics that measure a -//! point-in-time value, such as "connected clients" or "running queries". While those metrics -//! could also be represented by a count, gauges can be simpler in cases where you're already -//! computing and storing the value, and simply want to expose it in your metrics. -//! -//! A histogram tracks the distribution of values: how many values were between 0-5, between 6-10, -//! etc. This is the canonical way to measure latency: the time spent running a piece of code or -//! servicing an operation. By keeping track of the individual measurements, we can better see how -//! many are slow, fast, average, and in what proportions. +//! Here's a simple example of creating a receiver and working with a sink: //! //! ``` -//! # extern crate hotmic; -//! use hotmic::Receiver; +//! # extern crate metrics; +//! use metrics::Receiver; //! use std::{thread, time::Duration}; //! let receiver = Receiver::builder().build(); //! let sink = receiver.get_sink(); //! -//! // We can update a counter. Counters are signed, and can be updated either with a delta, or -//! // can be incremented and decremented with the [`Sink::increment`] and [`Sink::decrement`]. -//! sink.update_count("widgets", 5); -//! sink.update_count("widgets", -3); -//! sink.increment("widgets"); -//! sink.decrement("widgets"); +//! // We can update a counter. Counters are monotonic, unsigned integers that start at 0 and +//! // increase over time. +//! sink.record_count("widgets", 5); //! -//! // We can update a gauge. Gauges are unsigned, and hold on to the last value they were updated +//! // We can update a gauge. Gauges are signed, and hold on to the last value they were updated //! // to, so you need to track the overall value on your own. -//! sink.update_gauge("red_balloons", 99); +//! sink.record_gauge("red_balloons", 99); //! //! // We can update a timing histogram. For timing, you also must measure the start and end //! // time using the built-in `Clock` exposed by the sink. The receiver internally converts the @@ -68,22 +57,14 @@ //! let start = sink.clock().start(); //! thread::sleep(Duration::from_millis(10)); //! let end = sink.clock().end(); -//! let rows = 42; -//! -//! // This would just set the timing: -//! sink.update_timing("db.gizmo_query", start, end); -//! -//! // This would set the timing and also let you provide a customized count value. Being able to -//! // specify a count is handy when tracking things like the time it took to execute a database -//! // query, along with how many rows that query returned: -//! sink.update_timing_with_count("db.gizmo_query", start, end, rows); +//! sink.record_timing("db.gizmo_query", start, end); //! //! // Finally, we can update a value histogram. Technically speaking, value histograms aren't //! // fundamentally different from timing histograms. If you use a timing histogram, we do the //! // math for you of getting the time difference, and we make sure the metric name has the right //! // unit suffix so you can tell it's measuring time, but other than that, nearly identical! //! let buf_size = 4096; -//! sink.update_value("buf_size", buf_size); +//! sink.record_value("buf_size", buf_size); //! ``` //! //! # Scopes @@ -94,35 +75,31 @@ //! This feature is a simpler approach to tagging: while not as semantically rich, it provides the //! level of detail necessary to distinguish a single metric between multiple callsites. //! -//! An important thing to note is: registered metrics are only good for the scope they were -//! registered at. If you create a scoped [`Sink`], you must register, or reregister, the metrics -//! you will be sending to it. -//! //! For example, after getting a [`Sink`] from the [`Receiver`], we can easily nest ourselves under //! the root scope and then send some metrics: //! //! ``` -//! # extern crate hotmic; -//! use hotmic::Receiver; +//! # extern crate metrics; +//! use metrics::Receiver; //! let receiver = Receiver::builder().build(); //! //! // This sink has no scope aka the root scope. The metric will just end up as "widgets". //! let root_sink = receiver.get_sink(); -//! root_sink.update_count("widgets", 42); +//! root_sink.record_count("widgets", 42); //! //! // This sink is under the "secret" scope. Since we derived ourselves from the root scope, //! // we're not nested under anything, but our metric name will end up being "secret.widgets". //! let scoped_sink = root_sink.scoped("secret"); -//! scoped_sink.update_count("widgets", 42); +//! scoped_sink.record_count("widgets", 42); //! //! // This sink is under the "supersecret" scope, but we're also nested! The metric name for this //! // sample will end up being "secret.supersecret.widget". //! let scoped_sink_two = scoped_sink.scoped("supersecret"); -//! scoped_sink_two.update_count("widgets", 42); +//! scoped_sink_two.record_count("widgets", 42); //! //! // Sinks retain their scope even when cloned, so the metric name will be the same as above. //! let cloned_sink = scoped_sink_two.clone(); -//! cloned_sink.update_count("widgets", 42); +//! cloned_sink.record_count("widgets", 42); //! //! // This sink will be nested two levels deeper than its parent by using a slightly different //! // input scope: scope can be a single string, or multiple strings, which is interpreted as @@ -130,8 +107,11 @@ //! // //! // This metric name will end up being "super.secret.ultra.special.widgets". //! let scoped_sink_three = scoped_sink.scoped(&["super", "secret", "ultra", "special"]); -//! scoped_sink_two.update_count("widgets", 42); +//! scoped_sink_two.record_count("widgets", 42); //! ``` +//! +//! [crossbeam_channel]: https://docs.rs/crossbeam-channel +//! [metrics_core]: https://docs.rs/metrics-core mod configuration; mod control; mod data; @@ -143,8 +123,9 @@ mod sink; pub use self::{ configuration::Configuration, control::{Controller, SnapshotError}, + data::histogram::HistogramSnapshot, receiver::Receiver, - sink::{Sink, SinkError}, + sink::{AsScoped, Sink, SinkError}, }; pub mod snapshot { diff --git a/metrics/src/receiver.rs b/metrics/src/receiver.rs index b9cd520..003c92a 100644 --- a/metrics/src/receiver.rs +++ b/metrics/src/receiver.rs @@ -62,10 +62,12 @@ impl Receiver { } } - /// Gets a builder to configure a `Receiver` instance with. - pub fn builder() -> Configuration { Configuration::default() } + /// Gets a builder to configure a [`Receiver`] instance with. + pub fn builder() -> Configuration { + Configuration::default() + } - /// Creates a `Sink` bound to this receiver. + /// Creates a [`Sink`] bound to this receiver. pub fn get_sink(&self) -> Sink { Sink::new_with_scope_id( self.msg_tx.clone(), @@ -76,10 +78,14 @@ impl Receiver { ) } - /// Creates a `Controller` bound to this receiver. - pub fn get_controller(&self) -> Controller { Controller::new(self.control_tx.clone()) } + /// Creates a [`Controller`] bound to this receiver. + pub fn get_controller(&self) -> Controller { + Controller::new(self.control_tx.clone()) + } /// Run the receiver. + /// + /// This is blocking, and should be run in a dedicated background thread. pub fn run(&mut self) { let batch_size = self.config.batch_size; let mut batch = Vec::with_capacity(batch_size); @@ -136,7 +142,9 @@ impl Receiver { return Some(key.into_string_scoped("".to_owned())); } - self.scopes.get(scope_id).map(|scope| key.into_string_scoped(scope)) + self.scopes + .get(scope_id) + .map(|scope| key.into_string_scoped(scope)) } /// Gets a snapshot of the current metrics/facets. @@ -180,33 +188,31 @@ impl Receiver { ControlFrame::Snapshot(tx) => { let snapshot = self.get_snapshot(); let _ = tx.send(snapshot); - }, + } ControlFrame::SnapshotAsync(tx) => { let snapshot = self.get_snapshot(); let _ = tx.send(snapshot); - }, + } } } /// Processes a message frame. fn process_msg_frame(&mut self, msg: MessageFrame) { match msg { - MessageFrame::Data(sample) => { - match sample { - Sample::Count(key, count) => { - self.counter.update(key, count); - }, - Sample::Gauge(key, value) => { - self.gauge.update(key, value); - }, - Sample::TimingHistogram(key, start, end) => { - let delta = end - start; - self.counter.update(key.clone(), 1); - self.thistogram.update(key, delta); - }, - Sample::ValueHistogram(key, value) => { - self.vhistogram.update(key, value); - }, + MessageFrame::Data(sample) => match sample { + Sample::Count(key, count) => { + self.counter.update(key, count); + } + Sample::Gauge(key, value) => { + self.gauge.update(key, value); + } + Sample::TimingHistogram(key, start, end) => { + let delta = end - start; + self.counter.update(key.clone(), 1); + self.thistogram.update(key, delta); + } + Sample::ValueHistogram(key, value) => { + self.vhistogram.update(key, value); } }, } diff --git a/metrics/src/sink.rs b/metrics/src/sink.rs index 6f49641..ae82c2e 100644 --- a/metrics/src/sink.rs +++ b/metrics/src/sink.rs @@ -1,5 +1,5 @@ use crate::{ - data::{Sample, MetricKey, ScopedKey}, + data::{MetricKey, Sample, ScopedKey}, helper::io_error, receiver::MessageFrame, scopes::Scopes, @@ -34,7 +34,10 @@ pub struct Sink { impl Sink { pub(crate) fn new( - msg_tx: Sender, clock: Clock, scopes: Arc, scope: String, + msg_tx: Sender, + clock: Clock, + scopes: Arc, + scope: String, ) -> Sink { let scope_id = scopes.register(scope.clone()); @@ -48,7 +51,11 @@ impl Sink { } pub(crate) fn new_with_scope_id( - msg_tx: Sender, clock: Clock, scopes: Arc, scope: String, scope_id: u64, + msg_tx: Sender, + clock: Clock, + scopes: Arc, + scope: String, + scope_id: u64, ) -> Sink { Sink { msg_tx, @@ -82,11 +89,18 @@ impl Sink { pub fn scoped<'a, S: AsScoped<'a> + ?Sized>(&self, scope: &'a S) -> Sink { let new_scope = scope.as_scoped(self.scope.clone()); - Sink::new(self.msg_tx.clone(), self.clock.clone(), self.scopes.clone(), new_scope) + Sink::new( + self.msg_tx.clone(), + self.clock.clone(), + self.scopes.clone(), + new_scope, + ) } /// Reference to the internal high-speed clock interface. - pub fn clock(&self) -> &Clock { &self.clock } + pub fn clock(&self) -> &Clock { + &self.clock + } /// Records the count for a given metric. pub fn record_count>(&self, key: K, delta: u64) { @@ -94,9 +108,7 @@ impl Sink { self.send(Sample::Count(scoped_key, delta)) } - /// Records the value for a given metric. - /// - /// This can be used either for setting a gauge or updating a value histogram. + /// Records the gauge for a given metric. pub fn record_gauge>(&self, key: K, value: i64) { let scoped_key = ScopedKey(self.scope_id, key.into()); self.send(Sample::Gauge(scoped_key, value))