A lot of documentation and syntax cleanups.

Toby Lawrence 2019-04-03 00:30:24 -04:00
parent c58d1fe8c0
commit 6b9818b005
No known key found for this signature in database
GPG Key ID: 3BB201B0EEE9212E
15 changed files with 189 additions and 182 deletions

View File

@ -4,7 +4,9 @@ This document is based on the [Rust Code of Conduct](https://www.rust-lang.org/c
## Conduct
**Contact**: [toby@nuclearfurnace.com](mailto:toby@nuclearfurnace.com)
**Contact**:
[toby@nuclearfurnace.com](mailto:toby@nuclearfurnace.com)
[luciofranco14@gmail.com](mailto:luciofranco14@gmail.com)
* We are committed to providing a friendly, safe and welcoming environment for all, regardless of level of experience, gender identity and expression, sexual orientation, disability, personal appearance, body size, race, ethnicity, age, religion, nationality, or other similar characteristic.
* Avoid using overtly sexual nicknames or other nicknames that might detract from a friendly, safe and welcoming environment for all.
@ -17,7 +19,7 @@ This document is based on the [Rust Code of Conduct](https://www.rust-lang.org/c
## Moderation
These are the policies for upholding our community's standards of conduct. If you feel that a thread needs moderation, please use the contact information above, or mention @tobz in the thread.
These are the policies for upholding our community's standards of conduct. If you feel that a thread needs moderation, please use the contact information above, or mention @tobz or @LucioFranco in the thread.
1. Remarks that violate this Code of Conduct, including hateful, hurtful, oppressive, or exclusionary remarks, are not allowed. (Cursing is allowed, but never targeting another user, and never in a hateful manner.)
2. Remarks that moderators find inappropriate, whether listed in the code of conduct or not, are also not allowed.

View File

@ -1,7 +1,7 @@
[package]
name = "metrics"
version = "0.9.0"
authors = ["Toby Lawrence <toby@metrics-rs.com>"]
authors = ["Toby Lawrence <toby@nuclearfurnace.com>"]
edition = "2018"
license = "MIT"
@ -27,8 +27,7 @@ crossbeam-channel = "^0.3"
parking_lot = "^0.7"
fnv = "^1.0"
hashbrown = "^0.1"
quanta = { version = "^0.2", features = ["asm"] }
derivative = "^1.0"
quanta = "^0.2"
tokio-sync = "^0.1"
[dev-dependencies]

View File

@ -1,33 +0,0 @@
[env]
RUST_TEST_THREADS = "1"
CARGO_MAKE_RUN_CODECOV = "true"
RUSTFLAGS = "-D warnings"
CARGO_FEATURES = "--no-default-features"
[tasks.format-stable]
command = "cargo"
args = ["fmt"]
[tasks.pre-verify-project]
dependencies = ["check-format", "clippy", "coverage-flow"]
[tasks.pre-coverage]
dependencies = ["test"]
[tasks.build-verbose]
description = "Runs the rust compiler with verbose output, with interpolated features."
category = "Build"
command = "cargo"
args = ["build", "--verbose", "${CARGO_FEATURES}"]
[tasks.test-verbose]
description = "Runs all available tests with verbose output, with interpolated features."
category = "Test"
command = "cargo"
args = ["test", "--verbose", "${CARGO_FEATURES}"]
[tasks.test]
description = "Runs all available tests, with interpolated features."
category = "Test"
command = "cargo"
args = ["test", "${CARGO_FEATURES}"]

View File

@ -8,7 +8,7 @@ extern crate metrics_core;
use getopts::Options;
use hdrhistogram::Histogram;
use metrics::{Receiver, Sink, snapshot::TypedMeasurement};
use metrics::{snapshot::TypedMeasurement, Receiver, Sink};
use std::{
env,
sync::{
@ -81,10 +81,25 @@ fn print_usage(program: &str, opts: &Options) {
pub fn opts() -> Options {
let mut opts = Options::new();
opts.optopt("d", "duration", "number of seconds to run the benchmark", "INTEGER");
opts.optopt(
"d",
"duration",
"number of seconds to run the benchmark",
"INTEGER",
);
opts.optopt("p", "producers", "number of producers", "INTEGER");
opts.optopt("c", "capacity", "maximum number of unprocessed items", "INTEGER");
opts.optopt("b", "batch-size", "maximum number of items in a batch", "INTEGER");
opts.optopt(
"c",
"capacity",
"maximum number of unprocessed items",
"INTEGER",
);
opts.optopt(
"b",
"batch-size",
"maximum number of items in a batch",
"INTEGER",
);
opts.optflag("h", "help", "print this help menu");
opts
@ -102,7 +117,7 @@ fn main() {
Err(f) => {
error!("Failed to parse command line args: {}", f);
return;
},
}
};
if matches.opt_present("help") {
@ -183,19 +198,21 @@ fn main() {
let end = Instant::now();
snapshot_hist.saturating_record(duration_as_nanos(end - start) as u64);
let turn_total = snapshot.unwrap().into_measurements()
let turn_total = snapshot
.unwrap()
.into_measurements()
.iter()
.fold(0, |mut acc, m| {
match m {
TypedMeasurement::Counter(key, value) => {
println!("got counter {} -> {}", key, value);
acc += *value;
},
}
TypedMeasurement::Gauge(key, value) => {
println!("got counter {} -> {}", key, value);
acc += *value as u64;
},
_ => {},
}
_ => {}
}
acc
@ -229,7 +246,9 @@ fn main() {
}
}
fn duration_as_nanos(d: Duration) -> f64 { (d.as_secs() as f64 * 1e9) + d.subsec_nanos() as f64 }
fn duration_as_nanos(d: Duration) -> f64 {
(d.as_secs() as f64 * 1e9) + d.subsec_nanos() as f64
}
fn nanos_to_readable(t: u64) -> String {
let f = t as f64;

View File

@ -23,7 +23,9 @@ impl Default for Configuration {
impl Configuration {
/// Creates a new [`Configuration`] with default values.
pub fn new() -> Configuration { Default::default() }
pub fn new() -> Configuration {
Default::default()
}
/// Sets the buffer capacity.
///
@ -38,7 +40,7 @@ impl Configuration {
/// at our default value, we preallocate roughly ~32KB.
///
/// Generally speaking, sending and processing metrics is fast enough that the default value of
/// 4096 supports millions of samples per second.
/// 512 supports millions of samples per second.
pub fn capacity(mut self, capacity: usize) -> Self {
self.capacity = capacity;
self
@ -50,8 +52,8 @@ impl Configuration {
///
/// This controls the size of message batches that we collect for processing. The only real
/// reason to tweak this is to control the latency from the sender side. Larger batches lower
/// the ingest latency in the face of high metric ingest pressure at the cost of higher tail
/// latencies.
/// the ingest latency in the face of high metric ingest pressure at the cost of higher ingest
/// tail latencies.
///
/// Long story short, you shouldn't need to change this, but it's here if you really do.
pub fn batch_size(mut self, batch_size: usize) -> Self {
@ -63,17 +65,12 @@ impl Configuration {
///
/// Defaults to a 10 second window with 1 second granularity.
///
/// This controls how long of a time frame the histogram will track, on a rolling window.
/// We'll create enough underlying histogram buckets so that we have (window / granularity)
/// buckets, and every interval that passes (granularity), we'll add a new bucket and drop the
/// oldest one, thereby providing a rolling window.
/// This controls both how long of a time window we track histogram data for, and the
/// granularity in which we roll off old data.
///
/// Histograms, under the hood, are hard-coded to track three significant digits, and will take
/// a theoretical maximum of around 60KB per bucket, so a single histogram metric with the
/// default window/granularity will take a maximum of around 600KB.
///
/// In practice, this should be much smaller based on the maximum values pushed into the
/// histogram, as the underlying histogram storage is automatically resized on the fly.
/// As an example, with the default values, we would keep the last 10 seconds' worth of
/// histogram data, and would remove 1 second's worth of data at a time as the window rolled
/// forward.
pub fn histogram(mut self, window: Duration, granularity: Duration) -> Self {
self.histogram_window = window;
self.histogram_granularity = granularity;
@ -81,5 +78,7 @@ impl Configuration {
}
/// Create a [`Receiver`] based on this configuration.
pub fn build(self) -> Receiver { Receiver::from_config(self) }
pub fn build(self) -> Receiver {
Receiver::from_config(self)
}
}
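For context on the builder methods documented above, here is a minimal usage sketch; the capacity, batch size, and window values are illustrative only, and only methods visible in this diff are assumed:

```rust
use metrics::Receiver;
use std::time::Duration;

fn main() {
    // Build a Receiver with explicit settings instead of the defaults.
    let receiver = Receiver::builder()
        .capacity(1024)  // buffer capacity (default is 512 per the docs above)
        .batch_size(128) // messages collected per processing batch
        .histogram(Duration::from_secs(30), Duration::from_secs(3)) // 30 s window, 3 s granularity
        .build();
    let _sink = receiver.get_sink();
}
```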

View File

@ -32,7 +32,9 @@ pub struct Controller {
}
impl Controller {
pub(crate) fn new(control_tx: Sender<ControlFrame>) -> Controller { Controller { control_tx } }
pub(crate) fn new(control_tx: Sender<ControlFrame>) -> Controller {
Controller { control_tx }
}
/// Retrieves a snapshot of the current metric state.
pub fn get_snapshot(&self) -> Result<Snapshot, SnapshotError> {

View File

@ -1,6 +1,6 @@
use crate::data::ScopedKey;
use fnv::FnvBuildHasher;
use hashbrown::HashMap;
use crate::data::ScopedKey;
pub(crate) struct Counter {
data: HashMap<ScopedKey, u64, FnvBuildHasher>,
@ -9,7 +9,7 @@ pub(crate) struct Counter {
impl Counter {
pub fn new() -> Counter {
Counter {
data: HashMap::<ScopedKey, u64, FnvBuildHasher>::default(),
data: HashMap::default(),
}
}
@ -18,7 +18,9 @@ impl Counter {
*value = value.wrapping_add(delta);
}
pub fn values(&self) -> Vec<(ScopedKey, u64)> { self.data.iter().map(|(k, v)| (k.clone(), *v)).collect() }
pub fn values(&self) -> Vec<(ScopedKey, u64)> {
self.data.iter().map(|(k, v)| (k.clone(), *v)).collect()
}
}
#[cfg(test)]

View File

@ -1,6 +1,6 @@
use crate::data::ScopedKey;
use fnv::FnvBuildHasher;
use hashbrown::HashMap;
use crate::data::ScopedKey;
pub(crate) struct Gauge {
data: HashMap<ScopedKey, i64, FnvBuildHasher>,
@ -9,7 +9,7 @@ pub(crate) struct Gauge {
impl Gauge {
pub fn new() -> Gauge {
Gauge {
data: HashMap::<ScopedKey, i64, FnvBuildHasher>::default(),
data: HashMap::default(),
}
}
@ -18,7 +18,9 @@ impl Gauge {
*ivalue = value;
}
pub fn values(&self) -> Vec<(ScopedKey, i64)> { self.data.iter().map(|(k, v)| (k.clone(), *v)).collect() }
pub fn values(&self) -> Vec<(ScopedKey, i64)> {
self.data.iter().map(|(k, v)| (k.clone(), *v)).collect()
}
}
#[cfg(test)]

View File

@ -1,7 +1,4 @@
use crate::{
data::ScopedKey,
helper::duration_as_nanos,
};
use crate::{data::ScopedKey, helper::duration_as_nanos};
use fnv::FnvBuildHasher;
use hashbrown::HashMap;
use std::time::{Duration, Instant};
@ -17,7 +14,7 @@ impl Histogram {
Histogram {
window,
granularity,
data: HashMap::<ScopedKey, WindowedRawHistogram, FnvBuildHasher>::default(),
data: HashMap::default(),
}
}
@ -38,7 +35,10 @@ impl Histogram {
}
pub fn values(&self) -> Vec<(ScopedKey, HistogramSnapshot)> {
self.data.iter().map(|(k, v)| (k.clone(), v.snapshot())).collect()
self.data
.iter()
.map(|(k, v)| (k.clone(), v.snapshot()))
.collect()
}
}
@ -52,7 +52,8 @@ pub(crate) struct WindowedRawHistogram {
impl WindowedRawHistogram {
pub fn new(window: Duration, granularity: Duration) -> WindowedRawHistogram {
let num_buckets = ((duration_as_nanos(window) / duration_as_nanos(granularity)) as usize) + 1;
let num_buckets =
((duration_as_nanos(window) / duration_as_nanos(granularity)) as usize) + 1;
let mut buckets = Vec::with_capacity(num_buckets);
for _ in 0..num_buckets {
@ -92,22 +93,26 @@ impl WindowedRawHistogram {
}
}
/// A point-in-time snapshot of a single histogram.
#[derive(Debug, PartialEq, Eq)]
pub struct HistogramSnapshot {
values: Vec<u64>
values: Vec<u64>,
}
impl HistogramSnapshot {
pub fn new(values: Vec<u64>) -> Self {
pub(crate) fn new(values: Vec<u64>) -> Self {
HistogramSnapshot { values }
}
pub fn values(&self) -> &Vec<u64> { &self.values }
/// Gets the raw values that comprise the entire histogram.
pub fn values(&self) -> &Vec<u64> {
&self.values
}
}
#[cfg(test)]
mod tests {
use super::{Histogram, WindowedRawHistogram, ScopedKey};
use super::{Histogram, ScopedKey, WindowedRawHistogram};
use std::time::{Duration, Instant};
#[test]
@ -139,7 +144,7 @@ mod tests {
assert_eq!(values.len(), 1);
let hdr = &values[0].1;
assert_eq!(hdr.values().len(), 1);
assert_eq!(hdr.values().len(), 4);
assert_eq!(hdr.values().get(0).unwrap(), &1245);
assert_eq!(hdr.values().get(1).unwrap(), &213);
assert_eq!(hdr.values().get(2).unwrap(), &1022);
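A quick sketch of the bucket math behind `WindowedRawHistogram::new` above, using the default 10 second window and 1 second granularity from the configuration docs (standalone helper, assumptions noted in comments):

```rust
use std::time::Duration;

// Mirrors the internal calculation above: one bucket per granularity step
// across the window, plus one extra for the interval currently in progress.
fn num_buckets(window: Duration, granularity: Duration) -> usize {
    (window.as_nanos() / granularity.as_nanos()) as usize + 1
}

fn main() {
    // Defaults: 10 s window, 1 s granularity -> 11 buckets. Each time the
    // granularity elapses, the oldest bucket is dropped and a fresh one added.
    assert_eq!(num_buckets(Duration::from_secs(10), Duration::from_secs(1)), 11);
}
```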

View File

@ -50,8 +50,12 @@ pub(crate) enum Sample {
pub(crate) struct ScopedKey(pub u64, pub MetricKey);
impl ScopedKey {
pub(crate) fn id(&self) -> u64 { self.0 }
pub(crate) fn into_string_scoped(self, scope: String) -> StringScopedKey { StringScopedKey(scope, self.1) }
pub(crate) fn id(&self) -> u64 {
self.0
}
pub(crate) fn into_string_scoped(self, scope: String) -> StringScopedKey {
StringScopedKey(scope, self.1)
}
}
/// A string scoped metric key.

View File

@ -1,6 +1,6 @@
use super::histogram::HistogramSnapshot;
use std::fmt::Display;
use metrics_core::MetricsExporter;
use std::fmt::Display;
/// A typed metric measurement, used in snapshots.
///
@ -36,7 +36,8 @@ impl Snapshot {
where
T: Display,
{
self.measurements.push(TypedMeasurement::Gauge(key.to_string(), value));
self.measurements
.push(TypedMeasurement::Gauge(key.to_string(), value));
}
/// Sets timing percentiles for the given metric key.
@ -71,24 +72,26 @@ impl Snapshot {
for value in hs.values() {
exporter.export_histogram(key, *value);
}
},
}
TypedMeasurement::ValueHistogram(key, hs) => {
for value in hs.values() {
exporter.export_histogram(key, *value);
}
},
}
}
}
}
/// Converts this [`Snapshot`] to the underlying vector of measurements.
pub fn into_measurements(self) -> Vec<TypedMeasurement> { self.measurements }
pub fn into_measurements(self) -> Vec<TypedMeasurement> {
self.measurements
}
}
#[cfg(test)]
mod tests {
use super::{HistogramSnapshot, MetricsExporter, Snapshot, TypedMeasurement};
use std::collections::HashMap;
use super::{Snapshot, HistogramSnapshot, TypedMeasurement, MetricsExporter};
#[derive(Default)]
struct MockExporter {
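To illustrate the consumer side of `into_measurements()` above, a sketch along these lines should work; the exact re-export path for `Snapshot` is an assumption, and the histogram variants are skipped rather than guessed at:

```rust
use metrics::snapshot::{Snapshot, TypedMeasurement};

// `snapshot` would typically come from `Controller::get_snapshot()`.
fn print_measurements(snapshot: Snapshot) {
    for measurement in snapshot.into_measurements() {
        match measurement {
            TypedMeasurement::Counter(key, value) => println!("counter {} = {}", key, value),
            TypedMeasurement::Gauge(key, value) => println!("gauge {} = {}", key, value),
            // Timing/value histogram variants carry a HistogramSnapshot; skipped here.
            _ => {}
        }
    }
}
```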

View File

@ -4,10 +4,14 @@ use std::{
};
/// Helpers to create an I/O error from a string.
pub fn io_error(reason: &str) -> Error { Error::new(ErrorKind::Other, reason) }
pub fn io_error(reason: &str) -> Error {
Error::new(ErrorKind::Other, reason)
}
/// Converts a duration to nanoseconds.
pub fn duration_as_nanos(d: Duration) -> u64 { (d.as_secs() * 1_000_000_000) + u64::from(d.subsec_nanos()) }
pub fn duration_as_nanos(d: Duration) -> u64 {
(d.as_secs() * 1_000_000_000) + u64::from(d.subsec_nanos())
}
#[cfg(test)]
mod tests {
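As a quick check of the conversion above, a test in the existing `tests` module along these lines (hypothetical, not part of this diff) would pass:

```rust
use std::time::Duration;

#[test]
fn duration_as_nanos_combines_secs_and_subsec_nanos() {
    // 2 s and 500 ns -> 2 * 1_000_000_000 + 500 = 2_000_000_500 ns.
    assert_eq!(super::duration_as_nanos(Duration::new(2, 500)), 2_000_000_500);
}
```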

View File

@ -1,6 +1,6 @@
//! High-speed metrics collection library.
//!
//! hotmic provides a generalized metrics collection library targeted at users who want to log
//! `metrics` provides a generalized metrics collection library targeted at users who want to log
//! metrics at high volume and high speed.
//!
//! # Design
@ -11,55 +11,44 @@
//! aggregation, and summarization. The [`Receiver`] is intended to be spawned onto a dedicated
//! background thread.
//!
//! From a [`Receiver`], callers can create a [`Sink`], which allows registering facets -- or
//! interests -- in a given metric, along with sending the metrics themselves. All metrics need to
//! be pre-registered, in essence, with the receiver, which allows us to know which aspects of a
//! metric to track: count, value, or percentile.
//! Once a [`Receiver`] is created, callers can either create a [`Sink`] for sending metrics, or a
//! [`Controller`] for getting metrics out.
//!
//! A [`Sink`] can be cheaply cloned and does not require a mutable reference to send metrics, and
//! so callers have great flexibility in being able to control their resource consumption when it
//! comes to sinks. [`Receiver`] also allows configuring the capacity of the underlying channels to
//! finely tune resource consumption.
//! A [`Sink`] can be cheaply cloned and does not require a mutable reference to send metrics, so
//! callers have increased flexibility in usage and control over whether or not to clone sinks,
//! share references, etc.
//!
//! Being based on [`crossbeam-channel`] allows us to process close to fifteen million metrics per
//! second on a single core, with very low ingest latencies: 100ns on average at full throughput.
//! A [`Controller`] provides both a synchronous and asynchronous snapshotting interface, which is
//! [`metrics-core`][metrics_core] compatible for exporting. This allows flexibility in
//! integration amongst traditional single-threaded or hand-rolled multi-threaded applications and
//! the emerging asynchronous Rust ecosystem.
//!
//! # Performance
//!
//! Being based on [`crossbeam-channel`][crossbeam_channel] allows us to process close to ten
//! million metrics per second using a single core, with average ingest latencies of around 100ns.
//!
//! # Metrics
//!
//! hotmic supports counters, gauges, and histograms.
//! Counters, gauges, and histograms are supported, and follow the definitions outlined in
//! [`metrics-core`][metrics_core].
//!
//! A counter is a single value that can be updated with deltas to increase or decrease the value.
//! This would be your typical "messages sent" or "database queries executed" style of metric,
//! where the value changes over time.
//!
//! A gauge is also a single value but does not support delta updates. When a gauge is set, the
//! value sent becomes _the_ value of the gauge. Gauges can be useful for metrics that measure a
//! point-in-time value, such as "connected clients" or "running queries". While those metrics
//! could also be represented by a count, gauges can be simpler in cases where you're already
//! computing and storing the value, and simply want to expose it in your metrics.
//!
//! A histogram tracks the distribution of values: how many values were between 0-5, between 6-10,
//! etc. This is the canonical way to measure latency: the time spent running a piece of code or
//! servicing an operation. By keeping track of the individual measurements, we can better see how
//! many are slow, fast, average, and in what proportions.
//! Here's a simple example of creating a receiver and working with a sink:
//!
//! ```
//! # extern crate hotmic;
//! use hotmic::Receiver;
//! # extern crate metrics;
//! use metrics::Receiver;
//! use std::{thread, time::Duration};
//! let receiver = Receiver::builder().build();
//! let sink = receiver.get_sink();
//!
//! // We can update a counter. Counters are signed, and can be updated either with a delta, or
//! // can be incremented and decremented with the [`Sink::increment`] and [`Sink::decrement`].
//! sink.update_count("widgets", 5);
//! sink.update_count("widgets", -3);
//! sink.increment("widgets");
//! sink.decrement("widgets");
//! // We can update a counter. Counters are monotonic, unsigned integers that start at 0 and
//! // increase over time.
//! sink.record_count("widgets", 5);
//!
//! // We can update a gauge. Gauges are unsigned, and hold on to the last value they were updated
//! // We can update a gauge. Gauges are signed, and hold on to the last value they were updated
//! // to, so you need to track the overall value on your own.
//! sink.update_gauge("red_balloons", 99);
//! sink.record_gauge("red_balloons", 99);
//!
//! // We can update a timing histogram. For timing, you also must measure the start and end
//! // time using the built-in `Clock` exposed by the sink. The receiver internally converts the
@ -68,22 +57,14 @@
//! let start = sink.clock().start();
//! thread::sleep(Duration::from_millis(10));
//! let end = sink.clock().end();
//! let rows = 42;
//!
//! // This would just set the timing:
//! sink.update_timing("db.gizmo_query", start, end);
//!
//! // This would set the timing and also let you provide a customized count value. Being able to
//! // specify a count is handy when tracking things like the time it took to execute a database
//! // query, along with how many rows that query returned:
//! sink.update_timing_with_count("db.gizmo_query", start, end, rows);
//! sink.record_timing("db.gizmo_query", start, end);
//!
//! // Finally, we can update a value histogram. Technically speaking, value histograms aren't
//! // fundamentally different from timing histograms. If you use a timing histogram, we do the
//! // math for you of getting the time difference, and we make sure the metric name has the right
//! // unit suffix so you can tell it's measuring time, but other than that, nearly identical!
//! let buf_size = 4096;
//! sink.update_value("buf_size", buf_size);
//! sink.record_value("buf_size", buf_size);
//! ```
//!
//! # Scopes
@ -94,35 +75,31 @@
//! This feature is a simpler approach to tagging: while not as semantically rich, it provides the
//! level of detail necessary to distinguish a single metric between multiple callsites.
//!
//! An important thing to note is: registered metrics are only good for the scope they were
//! registered at. If you create a scoped [`Sink`], you must register, or reregister, the metrics
//! you will be sending to it.
//!
//! For example, after getting a [`Sink`] from the [`Receiver`], we can easily nest ourselves under
//! the root scope and then send some metrics:
//!
//! ```
//! # extern crate hotmic;
//! use hotmic::Receiver;
//! # extern crate metrics;
//! use metrics::Receiver;
//! let receiver = Receiver::builder().build();
//!
//! // This sink has no scope aka the root scope. The metric will just end up as "widgets".
//! let root_sink = receiver.get_sink();
//! root_sink.update_count("widgets", 42);
//! root_sink.record_count("widgets", 42);
//!
//! // This sink is under the "secret" scope. Since we derived ourselves from the root scope,
//! // we're not nested under anything, but our metric name will end up being "secret.widgets".
//! let scoped_sink = root_sink.scoped("secret");
//! scoped_sink.update_count("widgets", 42);
//! scoped_sink.record_count("widgets", 42);
//!
//! // This sink is under the "supersecret" scope, but we're also nested! The metric name for this
//! // sample will end up being "secret.supersecret.widgets".
//! let scoped_sink_two = scoped_sink.scoped("supersecret");
//! scoped_sink_two.update_count("widgets", 42);
//! scoped_sink_two.record_count("widgets", 42);
//!
//! // Sinks retain their scope even when cloned, so the metric name will be the same as above.
//! let cloned_sink = scoped_sink_two.clone();
//! cloned_sink.update_count("widgets", 42);
//! cloned_sink.record_count("widgets", 42);
//!
//! // This sink will be nested two levels deeper than its parent by using a slightly different
//! // input scope: scope can be a single string, or multiple strings, which is interpreted as
@ -130,8 +107,11 @@
//! //
//! // This metric name will end up being "super.secret.ultra.special.widgets".
//! let scoped_sink_three = scoped_sink.scoped(&["super", "secret", "ultra", "special"]);
//! scoped_sink_two.update_count("widgets", 42);
//! scoped_sink_three.record_count("widgets", 42);
//! ```
//!
//! [crossbeam_channel]: https://docs.rs/crossbeam-channel
//! [metrics_core]: https://docs.rs/metrics-core
mod configuration;
mod control;
mod data;
@ -143,8 +123,9 @@ mod sink;
pub use self::{
configuration::Configuration,
control::{Controller, SnapshotError},
data::histogram::HistogramSnapshot,
receiver::Receiver,
sink::{Sink, SinkError},
sink::{AsScoped, Sink, SinkError},
};
pub mod snapshot {
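Complementing the sink-focused examples in the module docs above, here is a rough end-to-end sketch that also exercises the [`Controller`]; it assumes only the APIs visible in this commit (builder, `get_sink`, `get_controller`, the blocking `run`, `get_snapshot`, and `into_measurements`):

```rust
use metrics::Receiver;
use std::thread;

fn main() {
    let mut receiver = Receiver::builder().build();
    let sink = receiver.get_sink();
    let controller = receiver.get_controller();

    // `run()` blocks, so the receiver lives on its own thread.
    thread::spawn(move || receiver.run());

    sink.record_count("widgets", 5);
    sink.record_gauge("red_balloons", 99);

    // Synchronously snapshot the current metric state via the controller.
    match controller.get_snapshot() {
        Ok(snapshot) => println!("captured {} measurements", snapshot.into_measurements().len()),
        Err(_) => eprintln!("snapshot failed"),
    }
}
```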

View File

@ -62,10 +62,12 @@ impl Receiver {
}
}
/// Gets a builder to configure a `Receiver` instance with.
pub fn builder() -> Configuration { Configuration::default() }
/// Gets a builder to configure a [`Receiver`] instance with.
pub fn builder() -> Configuration {
Configuration::default()
}
/// Creates a `Sink` bound to this receiver.
/// Creates a [`Sink`] bound to this receiver.
pub fn get_sink(&self) -> Sink {
Sink::new_with_scope_id(
self.msg_tx.clone(),
@ -76,10 +78,14 @@ impl Receiver {
)
}
/// Creates a `Controller` bound to this receiver.
pub fn get_controller(&self) -> Controller { Controller::new(self.control_tx.clone()) }
/// Creates a [`Controller`] bound to this receiver.
pub fn get_controller(&self) -> Controller {
Controller::new(self.control_tx.clone())
}
/// Run the receiver.
///
/// This is blocking, and should be run in a dedicated background thread.
pub fn run(&mut self) {
let batch_size = self.config.batch_size;
let mut batch = Vec::with_capacity(batch_size);
@ -136,7 +142,9 @@ impl Receiver {
return Some(key.into_string_scoped("".to_owned()));
}
self.scopes.get(scope_id).map(|scope| key.into_string_scoped(scope))
self.scopes
.get(scope_id)
.map(|scope| key.into_string_scoped(scope))
}
/// Gets a snapshot of the current metrics/facets.
@ -180,33 +188,31 @@ impl Receiver {
ControlFrame::Snapshot(tx) => {
let snapshot = self.get_snapshot();
let _ = tx.send(snapshot);
},
}
ControlFrame::SnapshotAsync(tx) => {
let snapshot = self.get_snapshot();
let _ = tx.send(snapshot);
},
}
}
}
/// Processes a message frame.
fn process_msg_frame(&mut self, msg: MessageFrame) {
match msg {
MessageFrame::Data(sample) => {
match sample {
Sample::Count(key, count) => {
self.counter.update(key, count);
},
Sample::Gauge(key, value) => {
self.gauge.update(key, value);
},
Sample::TimingHistogram(key, start, end) => {
let delta = end - start;
self.counter.update(key.clone(), 1);
self.thistogram.update(key, delta);
},
Sample::ValueHistogram(key, value) => {
self.vhistogram.update(key, value);
},
MessageFrame::Data(sample) => match sample {
Sample::Count(key, count) => {
self.counter.update(key, count);
}
Sample::Gauge(key, value) => {
self.gauge.update(key, value);
}
Sample::TimingHistogram(key, start, end) => {
let delta = end - start;
self.counter.update(key.clone(), 1);
self.thistogram.update(key, delta);
}
Sample::ValueHistogram(key, value) => {
self.vhistogram.update(key, value);
}
},
}

View File

@ -1,5 +1,5 @@
use crate::{
data::{Sample, MetricKey, ScopedKey},
data::{MetricKey, Sample, ScopedKey},
helper::io_error,
receiver::MessageFrame,
scopes::Scopes,
@ -34,7 +34,10 @@ pub struct Sink {
impl Sink {
pub(crate) fn new(
msg_tx: Sender<MessageFrame>, clock: Clock, scopes: Arc<Scopes>, scope: String,
msg_tx: Sender<MessageFrame>,
clock: Clock,
scopes: Arc<Scopes>,
scope: String,
) -> Sink {
let scope_id = scopes.register(scope.clone());
@ -48,7 +51,11 @@ impl Sink {
}
pub(crate) fn new_with_scope_id(
msg_tx: Sender<MessageFrame>, clock: Clock, scopes: Arc<Scopes>, scope: String, scope_id: u64,
msg_tx: Sender<MessageFrame>,
clock: Clock,
scopes: Arc<Scopes>,
scope: String,
scope_id: u64,
) -> Sink {
Sink {
msg_tx,
@ -82,11 +89,18 @@ impl Sink {
pub fn scoped<'a, S: AsScoped<'a> + ?Sized>(&self, scope: &'a S) -> Sink {
let new_scope = scope.as_scoped(self.scope.clone());
Sink::new(self.msg_tx.clone(), self.clock.clone(), self.scopes.clone(), new_scope)
Sink::new(
self.msg_tx.clone(),
self.clock.clone(),
self.scopes.clone(),
new_scope,
)
}
/// Reference to the internal high-speed clock interface.
pub fn clock(&self) -> &Clock { &self.clock }
pub fn clock(&self) -> &Clock {
&self.clock
}
/// Records the count for a given metric.
pub fn record_count<K: Into<MetricKey>>(&self, key: K, delta: u64) {
@ -94,9 +108,7 @@ impl Sink {
self.send(Sample::Count(scoped_key, delta))
}
/// Records the value for a given metric.
///
/// This can be used either for setting a gauge or updating a value histogram.
/// Records the gauge for a given metric.
pub fn record_gauge<K: Into<MetricKey>>(&self, key: K, value: i64) {
let scoped_key = ScopedKey(self.scope_id, key.into());
self.send(Sample::Gauge(scoped_key, value))