Entirely remove the event loop and switch to pure atomics.

Originally, metrics (and `hotmic` before it was converted) was based on
an event loop that centered around `mio`'s `Poll` interface with a
custom channel to read and write metrics to.  That model required a
dedicated thread to poll for writes and ingest them, managing the
internal data structures in turn.

Eventually, I rewrote that portion to be based on `crossbeam-channel`
but we still depended on a background thread to pop samples off the
channel and process them.

Instead, we've rewritten the core of metrics to be based purely on
atomics, with the caveat that we still do have a background thread.

Instead of a single channel that metrics are funneled into, each
underlying metric becomes a single-track codepath: each metric is backed
by an atomic structure which means we can pass handles to that storage
as far as the callers themselves, eliminating the need to funnel metrics
into the "core" where they all contend for processing.

Counters and gauges are now, effectively, wrapped atomic integers, which
means we can process over 100 million counter/gauge updates per core.
Histograms are based on a brand-new atomic "bucket" that allows for
fast, unbounded writes and the ability to snapshot at any time.
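
As a rough illustration of the "wrapped atomic integer" idea, here is a
minimal sketch using only the standard library.  `CounterHandle`,
`GaugeHandle`, and `run_writers` are hypothetical names made up for this
example; they are not the types this commit adds, and the memory orderings
are simply one reasonable choice.

```rust
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical counter handle: a cheaply clonable pointer to one atomic.
#[derive(Clone)]
struct CounterHandle(Arc<AtomicU64>);

impl CounterHandle {
    fn new() -> Self {
        CounterHandle(Arc::new(AtomicU64::new(0)))
    }

    /// Adds `delta` to the counter without taking a lock.
    fn record(&self, delta: u64) {
        self.0.fetch_add(delta, Ordering::Release);
    }

    fn value(&self) -> u64 {
        self.0.load(Ordering::Acquire)
    }
}

/// Hypothetical gauge handle: the last stored value wins.
#[derive(Clone)]
struct GaugeHandle(Arc<AtomicI64>);

impl GaugeHandle {
    fn new() -> Self {
        GaugeHandle(Arc::new(AtomicI64::new(0)))
    }

    fn record(&self, value: i64) {
        self.0.store(value, Ordering::Release);
    }

    fn value(&self) -> i64 {
        self.0.load(Ordering::Acquire)
    }
}

/// Spawns `threads` writers that each bump a shared counter
/// `updates_per_thread` times, then returns the final value.
fn run_writers(threads: u64, updates_per_thread: u64) -> u64 {
    let counter = CounterHandle::new();
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            // Each writer gets its own handle to the same storage.
            let c = counter.clone();
            thread::spawn(move || {
                for _ in 0..updates_per_thread {
                    c.record(1);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    counter.value()
}
```

Each writer touches only the atomic behind its own handle, so there is no
shared queue or central thread for updates to contend on.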

The end result is that we can process a mixed workload (counter, gauge,
and histogram) at sample rates of up to 30 million samples per second
per core, with p999 ingest latencies in the low hundreds of
nanoseconds.  Taking snapshots no longer stalls an event loop or drives
up tail latencies for ingest, and writers can proceed with no
interruption.

There is still a background thread that is part of quanta's new "recent"
time support, which allows a background thread to incrementally update a
shared global time, which can be accessed more quickly than grabbing the
time directly.  For our purposes, only histograms need the time to
perform the window upkeep inherent to the sliding windows we use, and
the time they need can be far less precise than what quanta is capable
of.  This background thread is spawned automatically when creating a
receiver, and drops when the receiver goes away.  By default, it updates
20 times a second performing an operation which itself takes less than
100ns, so all in all, this background thread should be imperceptible,
performance-wise, on all systems*.
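
To make the "recent" time idea concrete, here is a small sketch of the
general technique using only the standard library.  It is not quanta's
implementation: `RecentClock`, `start`, and `recent` below are hypothetical
names for this example, and the update interval is whatever the caller
passes in.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a "recent" clock: a background thread
/// periodically publishes the elapsed time into a shared atomic, and
/// readers load that atomic instead of querying the real clock.
struct RecentClock {
    recent_ns: Arc<AtomicU64>,
    done: Arc<AtomicBool>,
    upkeep: Option<JoinHandle<()>>,
}

impl RecentClock {
    /// Spawns the upkeep thread, which refreshes the time every `interval`.
    fn start(interval: Duration) -> Self {
        let recent_ns = Arc::new(AtomicU64::new(0));
        let done = Arc::new(AtomicBool::new(false));
        let (r, d) = (recent_ns.clone(), done.clone());
        let origin = Instant::now();
        let upkeep = thread::spawn(move || {
            while !d.load(Ordering::Relaxed) {
                r.store(origin.elapsed().as_nanos() as u64, Ordering::Release);
                thread::sleep(interval);
            }
        });
        RecentClock {
            recent_ns,
            done,
            upkeep: Some(upkeep),
        }
    }

    /// Returns the most recently published time in nanoseconds since start.
    /// The value can be stale by up to roughly one interval.
    fn recent(&self) -> u64 {
        self.recent_ns.load(Ordering::Acquire)
    }
}

impl Drop for RecentClock {
    /// Stops and joins the upkeep thread when the clock goes away.
    fn drop(&mut self) {
        self.done.store(true, Ordering::Relaxed);
        if let Some(handle) = self.upkeep.take() {
            let _ = handle.join();
        }
    }
}
```

Readers pay for a single atomic load, at the cost of a timestamp that may
lag by up to one update interval, which is acceptable for window upkeep.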

On top of all of this, we've embraced the natural pattern of defining
metrics individually at the variable/field level, and added support
for proxy types, which can be acquired from a sink and embedded as
fields within your existing types, which lets you directly update
metrics with the ease of accessing a field in an object.  Sinks still
have the ability to have metrics pushed directly into them, but this
just opens up more possibilities.
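
The proxy pattern described above might look roughly like the sketch
below.  `ToySink`, its `read` method, and `ConnectionPool` are hypothetical
and exist only for this example; the `Counter` here is a simplified
stand-in that mirrors the shape of the proxy type in this commit, not the
real one.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

/// Simplified stand-in for a counter proxy: a handle to shared storage.
#[derive(Clone)]
struct Counter {
    handle: Arc<AtomicU64>,
}

impl Counter {
    /// Records a value for the counter; no lock is taken on this path.
    fn record(&self, value: u64) {
        self.handle.fetch_add(value, Ordering::Release);
    }
}

/// Toy sink (hypothetical): registering a metric takes a lock once,
/// but later updates go straight to the atomic behind the proxy.
#[derive(Default)]
struct ToySink {
    counters: Mutex<HashMap<String, Arc<AtomicU64>>>,
}

impl ToySink {
    /// Returns a proxy for the named counter, creating it if needed.
    fn counter(&self, name: &str) -> Counter {
        let mut counters = self.counters.lock().unwrap();
        let handle = counters
            .entry(name.to_string())
            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
            .clone();
        Counter { handle }
    }

    /// Reads the current value of a counter, if it has been registered.
    fn read(&self, name: &str) -> Option<u64> {
        let counters = self.counters.lock().unwrap();
        counters.get(name).map(|c| c.load(Ordering::Acquire))
    }
}

/// An application type that embeds the proxy as an ordinary field.
struct ConnectionPool {
    checkouts: Counter,
}

impl ConnectionPool {
    fn new(sink: &ToySink) -> Self {
        ConnectionPool {
            checkouts: sink.counter("pool.checkouts"),
        }
    }

    fn checkout(&self) {
        // Updating the metric is just a method call on a field.
        self.checkouts.record(1);
    }
}
```

The name lookup happens once, when the proxy is acquired; after that the
hot path never goes back through the sink.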

* - famous last words
Toby Lawrence 2019-05-27 20:41:42 -04:00
parent ebe5d9266b
commit 9f1bcb5226
20 changed files with 1346 additions and 897 deletions


@ -28,13 +28,14 @@ recorders = ["metrics-recorder-text", "metrics-recorder-prometheus"]
[dependencies]
metrics-core = { path = "../metrics-core", version = "^0.3" }
crossbeam-channel = "^0.3"
metrics-util = { path = "../metrics-util", version = "^0.2" }
im = "^12"
fxhash = "^0.2"
arc-swap = "^0.3"
parking_lot = "^0.7"
fnv = "^1.0"
hashbrown = "^0.1"
quanta = "^0.2"
hashbrown = "^0.3"
quanta = "^0.3"
futures = "^0.1"
tokio-sync = "^0.1"
metrics-exporter-log = { path = "../metrics-exporter-log", version = "^0.2", optional = true }
metrics-exporter-http = { path = "../metrics-exporter-http", version = "^0.1", optional = true }
metrics-recorder-text = { path = "../metrics-recorder-text", version = "^0.2", optional = true }


@ -8,53 +8,126 @@ extern crate metrics_core;
use getopts::Options;
use hdrhistogram::Histogram;
use metrics::{snapshot::TypedMeasurement, Receiver, Sink};
use metrics_core::SnapshotProvider;
use metrics::{Receiver, Sink};
use metrics_core::{Recorder, Snapshot, SnapshotProvider};
use quanta::Clock;
use std::{
env,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
thread,
time::{Duration, Instant},
};
const LOOP_SAMPLE: u64 = 1000;
struct Generator {
stats: Sink,
t0: Option<u64>,
gauge: i64,
hist: Histogram<u64>,
done: Arc<AtomicBool>,
rate_counter: Arc<AtomicU64>,
clock: Clock,
}
impl Generator {
fn new(stats: Sink, done: Arc<AtomicBool>) -> Generator {
fn new(
stats: Sink,
done: Arc<AtomicBool>,
rate_counter: Arc<AtomicU64>,
clock: Clock,
) -> Generator {
Generator {
stats,
t0: None,
gauge: 0,
hist: Histogram::<u64>::new_with_bounds(1, u64::max_value(), 3).unwrap(),
done,
rate_counter,
clock,
}
}
fn run(&mut self) {
let mut counter = 0;
loop {
counter += 1;
if self.done.load(Ordering::Relaxed) {
break;
}
self.gauge += 1;
let t1 = self.stats.now();
if let Some(t0) = self.t0 {
let start = self.stats.now();
let start = if counter % 33 == 0 {
self.stats.now()
} else {
0
};
//let _ = self.stats.record_count("ok", 1);
let _ = self.stats.record_timing("ok", t0, t1);
let _ = self.stats.record_gauge("total", self.gauge);
let delta = self.stats.now() - start;
self.hist.saturating_record(delta);
//let _ = self.stats.record_gauge("total", self.gauge);
if start != 0 {
let delta = self.stats.now() - start;
self.hist.saturating_record(delta);
// We also increment our global counter for the sample rate here.
self.rate_counter
.fetch_add(LOOP_SAMPLE * 1, Ordering::AcqRel);
}
}
self.t0 = Some(t1);
}
}
fn run_cached(&mut self) {
let mut counter = 0;
let counter_handle = self.stats.counter("ok");
let timing_handle = self.stats.histogram("ok");
let gauge_handle = self.stats.gauge("total");
loop {
counter += 1;
if self.done.load(Ordering::Relaxed) {
break;
}
self.gauge += 1;
let t1 = self.clock.recent();
if let Some(t0) = self.t0 {
let start = if counter % LOOP_SAMPLE == 0 {
self.stats.now()
} else {
0
};
//let _ = counter_handle.record(1);
let _ = timing_handle.record_timing(t0, t1);
//let _ = gauge_handle.record(self.gauge);
if start != 0 {
let delta = self.stats.now() - start;
self.hist.saturating_record(delta);
// We also increment our global counter for the sample rate here.
self.rate_counter
.fetch_add(LOOP_SAMPLE * 1, Ordering::AcqRel);
}
}
self.t0 = Some(t1);
}
}
@ -89,18 +162,6 @@ pub fn opts() -> Options {
"INTEGER",
);
opts.optopt("p", "producers", "number of producers", "INTEGER");
opts.optopt(
"c",
"capacity",
"maximum number of unprocessed items",
"INTEGER",
);
opts.optopt(
"b",
"batch-size",
"maximum number of items in a batch",
"INTEGER",
);
opts.optflag("h", "help", "print this help menu");
opts
@ -134,31 +195,19 @@ fn main() {
.unwrap_or_else(|| "60".to_owned())
.parse()
.unwrap();
let capacity = matches
.opt_str("capacity")
.unwrap_or_else(|| "1024".to_owned())
.parse()
.unwrap();
let batch_size = matches
.opt_str("batch-size")
.unwrap_or_else(|| "256".to_owned())
.parse()
.unwrap();
let producers = matches
.opt_str("producers")
.unwrap_or_else(|| "1".to_owned())
.parse()
.unwrap();
info!("duration: {}s", seconds);
info!("producers: {}", producers);
info!("capacity: {}", capacity);
info!("batch size: {}", batch_size);
let mut receiver = Receiver::builder()
.capacity(capacity)
.batch_size(batch_size)
.histogram(Duration::from_secs(5), Duration::from_secs(1))
.build();
let receiver = Receiver::builder()
.histogram(Duration::from_secs(5), Duration::from_millis(100))
.build()
.expect("failed to build receiver");
let sink = receiver.get_sink();
let sink = sink.scoped(&["alpha", "pools", "primary"]);
@ -167,13 +216,17 @@ fn main() {
// Spin up our sample producers.
let done = Arc::new(AtomicBool::new(false));
let rate_counter = Arc::new(AtomicU64::new(0));
let mut handles = Vec::new();
let clock = Clock::new();
for _ in 0..producers {
let s = sink.clone();
let d = done.clone();
let r = rate_counter.clone();
let c = clock.clone();
let handle = thread::spawn(move || {
Generator::new(s, d).run();
Generator::new(s, d, r, c).run_cached();
});
handles.push(handle);
@ -182,10 +235,6 @@ fn main() {
// Spin up the sink and let 'er rip.
let controller = receiver.get_controller();
thread::spawn(move || {
receiver.run();
});
// Poll the controller to figure out the sample rate.
let mut total = 0;
let mut t0 = Instant::now();
@ -195,22 +244,11 @@ fn main() {
let t1 = Instant::now();
let start = Instant::now();
let snapshot = controller.get_snapshot();
let snapshot = controller.get_snapshot().unwrap();
let end = Instant::now();
snapshot_hist.saturating_record(duration_as_nanos(end - start) as u64);
let turn_total = snapshot
.unwrap()
.into_measurements()
.iter()
.fold(0, |acc, m| {
acc + match m {
TypedMeasurement::Counter(_key, value) => *value,
TypedMeasurement::Gauge(_key, value) => *value as u64,
_ => 0,
}
});
let turn_total = rate_counter.load(Ordering::Acquire);
let turn_delta = turn_total - total;
total = turn_total;
let rate = turn_delta as f64 / (duration_as_nanos(t1 - t0) / 1_000_000_000.0);
@ -239,6 +277,34 @@ fn main() {
}
}
struct TotalRecorder {
total: u64,
}
impl TotalRecorder {
pub fn new() -> Self {
Self { total: 0 }
}
pub fn total(&self) -> u64 {
self.total
}
}
impl Recorder for TotalRecorder {
fn record_counter<K: AsRef<str>>(&mut self, _key: K, value: u64) {
self.total += value;
}
fn record_gauge<K: AsRef<str>>(&mut self, _key: K, value: i64) {
self.total += value as u64;
}
fn record_histogram<K: AsRef<str>>(&mut self, _key: K, values: &[u64]) {
self.total += values.len() as u64;
}
}
fn duration_as_nanos(d: Duration) -> f64 {
(d.as_secs() as f64 * 1e9) + d.subsec_nanos() as f64
}


@ -0,0 +1,202 @@
#[macro_use]
extern crate log;
extern crate env_logger;
extern crate getopts;
extern crate hdrhistogram;
use getopts::Options;
use hdrhistogram::Histogram;
use quanta::Clock;
use std::{
env,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
thread,
time::{Duration, Instant},
};
struct Generator {
counter: Arc<AtomicU64>,
clock: Clock,
hist: Histogram<u64>,
done: Arc<AtomicBool>,
}
impl Generator {
fn new(counter: Arc<AtomicU64>, done: Arc<AtomicBool>) -> Generator {
Generator {
counter,
clock: Clock::new(),
hist: Histogram::<u64>::new_with_bounds(1, u64::max_value(), 3).unwrap(),
done,
}
}
fn run(&mut self) {
let mut counter = 0;
loop {
if self.done.load(Ordering::Relaxed) {
break;
}
let start = if counter % 100 == 0 {
self.clock.now()
} else {
0
};
counter = self.counter.fetch_add(1, Ordering::AcqRel);
if start != 0 {
let delta = self.clock.now() - start;
self.hist.saturating_record(delta);
}
}
}
}
impl Drop for Generator {
fn drop(&mut self) {
info!(
" sender latency: min: {:9} p50: {:9} p95: {:9} p99: {:9} p999: {:9} max: {:9}",
nanos_to_readable(self.hist.min()),
nanos_to_readable(self.hist.value_at_percentile(50.0)),
nanos_to_readable(self.hist.value_at_percentile(95.0)),
nanos_to_readable(self.hist.value_at_percentile(99.0)),
nanos_to_readable(self.hist.value_at_percentile(99.9)),
nanos_to_readable(self.hist.max())
);
}
}
fn print_usage(program: &str, opts: &Options) {
let brief = format!("Usage: {} [options]", program);
print!("{}", opts.usage(&brief));
}
pub fn opts() -> Options {
let mut opts = Options::new();
opts.optopt(
"d",
"duration",
"number of seconds to run the benchmark",
"INTEGER",
);
opts.optopt("p", "producers", "number of producers", "INTEGER");
opts.optflag("h", "help", "print this help menu");
opts
}
fn main() {
env_logger::init();
let args: Vec<String> = env::args().collect();
let program = &args[0];
let opts = opts();
let matches = match opts.parse(&args[1..]) {
Ok(m) => m,
Err(f) => {
error!("Failed to parse command line args: {}", f);
return;
}
};
if matches.opt_present("help") {
print_usage(program, &opts);
return;
}
info!("metrics benchmark");
// Build our sink and configure the facets.
let seconds = matches
.opt_str("duration")
.unwrap_or_else(|| "60".to_owned())
.parse()
.unwrap();
let producers = matches
.opt_str("producers")
.unwrap_or_else(|| "1".to_owned())
.parse()
.unwrap();
info!("duration: {}s", seconds);
info!("producers: {}", producers);
// Spin up our sample producers.
let counter = Arc::new(AtomicU64::new(0));
let done = Arc::new(AtomicBool::new(false));
let mut handles = Vec::new();
for _ in 0..producers {
let c = counter.clone();
let d = done.clone();
let handle = thread::spawn(move || {
Generator::new(c, d).run();
});
handles.push(handle);
}
// Poll the controller to figure out the sample rate.
let mut total = 0;
let mut t0 = Instant::now();
let mut snapshot_hist = Histogram::<u64>::new_with_bounds(1, u64::max_value(), 3).unwrap();
for _ in 0..seconds {
let t1 = Instant::now();
let start = Instant::now();
let turn_total = counter.load(Ordering::Acquire);
let end = Instant::now();
snapshot_hist.saturating_record(duration_as_nanos(end - start) as u64);
let turn_delta = turn_total - total;
total = turn_total;
let rate = turn_delta as f64 / (duration_as_nanos(t1 - t0) / 1_000_000_000.0);
info!("sample ingest rate: {:.0} samples/sec", rate);
t0 = t1;
thread::sleep(Duration::new(1, 0));
}
info!("--------------------------------------------------------------------------------");
info!(" ingested samples total: {}", total);
info!(
"snapshot retrieval: min: {:9} p50: {:9} p95: {:9} p99: {:9} p999: {:9} max: {:9}",
nanos_to_readable(snapshot_hist.min()),
nanos_to_readable(snapshot_hist.value_at_percentile(50.0)),
nanos_to_readable(snapshot_hist.value_at_percentile(95.0)),
nanos_to_readable(snapshot_hist.value_at_percentile(99.0)),
nanos_to_readable(snapshot_hist.value_at_percentile(99.9)),
nanos_to_readable(snapshot_hist.max())
);
// Wait for the producers to finish so we can get their stats too.
done.store(true, Ordering::SeqCst);
for handle in handles {
let _ = handle.join();
}
}
fn duration_as_nanos(d: Duration) -> f64 {
(d.as_secs() as f64 * 1e9) + d.subsec_nanos() as f64
}
fn nanos_to_readable(t: u64) -> String {
let f = t as f64;
if f < 1_000.0 {
format!("{}ns", f)
} else if f < 1_000_000.0 {
format!("{:.0}μs", f / 1_000.0)
} else if f < 2_000_000_000.0 {
format!("{:.2}ms", f / 1_000_000.0)
} else {
format!("{:.3}s", f / 1_000_000_000.0)
}
}

metrics/src/builder.rs Normal file

@ -0,0 +1,77 @@
use crate::Receiver;
use std::error::Error;
use std::fmt;
use std::time::Duration;
/// Errors during receiver creation.
#[derive(Debug, Clone)]
pub enum BuilderError {
/// Failed to spawn the upkeep thread.
///
/// As histograms are windowed, reads and writes require getting the current time so they can
/// perform the required maintenance, or upkeep, on the internal structures to roll over old
/// buckets, etc.
///
/// Acquiring the current time is fast compared to most operations, but it is a significant
/// portion of the overall time it takes to write to a histogram, which limits overall
/// throughput under high load.
///
/// We spin up a background thread, or the "upkeep thread", which updates a global time source
/// that the read and write operations exclusively rely on. While this source is not as
/// up-to-date as the real clock, it is much faster to access.
UpkeepFailure,
}
impl Error for BuilderError {}
impl fmt::Display for BuilderError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
BuilderError::UpkeepFailure => write!(f, "failed to spawn quanta upkeep thread"),
}
}
}
/// Builder for [`Receiver`].
#[derive(Clone)]
pub struct Builder {
pub(crate) histogram_window: Duration,
pub(crate) histogram_granularity: Duration,
}
impl Default for Builder {
fn default() -> Self {
Self {
histogram_window: Duration::from_secs(10),
histogram_granularity: Duration::from_secs(1),
}
}
}
impl Builder {
/// Creates a new [`Builder`] with default values.
pub fn new() -> Self {
Default::default()
}
/// Sets the histogram configuration.
///
/// Defaults to a 10 second window with 1 second granularity.
///
/// This controls both how long of a time window we track histogram data for, and the
/// granularity in which we roll off old data.
///
/// As an example, with the default values, we would keep the last 10 seconds' worth of
/// histogram data, and would remove 1 second's worth of data at a time as the window rolled
/// forward.
pub fn histogram(mut self, window: Duration, granularity: Duration) -> Self {
self.histogram_window = window;
self.histogram_granularity = granularity;
self
}
/// Create a [`Receiver`] based on this configuration.
pub fn build(self) -> Result<Receiver, BuilderError> {
Receiver::from_builder(self)
}
}

metrics/src/common.rs Normal file

@ -0,0 +1,235 @@
use crate::data::AtomicWindowedHistogram;
use metrics_util::StreamingIntegers;
use quanta::Clock;
use std::borrow::Cow;
use std::ops::Deref;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Optimized metric name.
///
/// This can either be a [`&'static str`](str) or [`String`].
pub type MetricName = Cow<'static, str>;
/// A scope, or context, for a metric.
///
/// Not interacted with directly by end users, and only exposed due to a lack of trait method
/// visibility controls.
///
/// See also: [Sink::scoped](crate::Sink::scoped).
#[derive(PartialEq, Eq, Hash, Clone)]
pub enum MetricScope {
Root,
Nested(Vec<String>),
}
impl MetricScope {
pub(crate) fn into_scoped(self, name: MetricName) -> String {
match self {
MetricScope::Root => name.to_string(),
MetricScope::Nested(mut parts) => {
if !name.is_empty() {
parts.push(name.to_string());
}
parts.join(".")
}
}
}
}
pub(crate) type MetricScopeHandle = u64;
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub(crate) enum MetricKind {
Counter,
Gauge,
Histogram,
}
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub(crate) enum MetricIdentifier {
Unlabeled(MetricName, MetricScopeHandle, MetricKind),
}
#[derive(Debug)]
enum ValueState {
Counter(AtomicU64),
Gauge(AtomicI64),
Histogram(AtomicWindowedHistogram),
}
#[derive(Debug)]
pub(crate) enum ValueSnapshot {
Counter(u64),
Gauge(i64),
Histogram(StreamingIntegers),
}
#[derive(Clone, Debug)]
/// Handle to the underlying measurement for a metric.
pub(crate) struct MetricValue {
state: Arc<ValueState>,
}
impl MetricValue {
fn new(state: ValueState) -> Self {
MetricValue {
state: Arc::new(state),
}
}
pub(crate) fn new_counter() -> Self {
Self::new(ValueState::Counter(AtomicU64::new(0)))
}
pub(crate) fn new_gauge() -> Self {
Self::new(ValueState::Gauge(AtomicI64::new(0)))
}
pub(crate) fn new_histogram(window: Duration, granularity: Duration, clock: Clock) -> Self {
Self::new(ValueState::Histogram(AtomicWindowedHistogram::new(
window,
granularity,
clock,
)))
}
pub(crate) fn update_counter(&self, value: u64) {
match self.state.deref() {
ValueState::Counter(inner) => {
inner.fetch_add(value, Ordering::Release);
}
_ => unreachable!("tried to access as counter, not a counter"),
}
}
pub(crate) fn update_gauge(&self, value: i64) {
match self.state.deref() {
ValueState::Gauge(inner) => inner.store(value, Ordering::Release),
_ => unreachable!("tried to access as gauge, not a gauge"),
}
}
pub(crate) fn update_histogram(&self, value: u64) {
match self.state.deref() {
ValueState::Histogram(inner) => inner.record(value),
_ => unreachable!("tried to access as histogram, not a histogram"),
}
}
pub(crate) fn snapshot(&self) -> ValueSnapshot {
match self.state.deref() {
ValueState::Counter(inner) => {
let value = inner.load(Ordering::Acquire);
ValueSnapshot::Counter(value)
}
ValueState::Gauge(inner) => {
let value = inner.load(Ordering::Acquire);
ValueSnapshot::Gauge(value)
}
ValueState::Histogram(inner) => {
let stream = inner.snapshot();
ValueSnapshot::Histogram(stream)
}
}
}
}
/// Trait for types that represent time and can be subtracted from each other to generate a delta.
pub trait Delta {
fn delta(&self, other: Self) -> u64;
}
impl Delta for u64 {
fn delta(&self, other: u64) -> u64 {
self.wrapping_sub(other)
}
}
impl Delta for Instant {
fn delta(&self, other: Instant) -> u64 {
let dur = *self - other;
dur.as_nanos() as u64
}
}
#[cfg(test)]
mod tests {
use super::{MetricScope, MetricValue, ValueSnapshot};
use quanta::Clock;
use std::time::Duration;
#[test]
fn test_metric_scope() {
let root_scope = MetricScope::Root;
assert_eq!(root_scope.into_scoped("".into()), "".to_string());
let root_scope = MetricScope::Root;
assert_eq!(
root_scope.into_scoped("jambalaya".into()),
"jambalaya".to_string()
);
let nested_scope = MetricScope::Nested(vec![]);
assert_eq!(nested_scope.into_scoped("".into()), "".to_string());
let nested_scope = MetricScope::Nested(vec![]);
assert_eq!(
nested_scope.into_scoped("toilet".into()),
"toilet".to_string()
);
let nested_scope = MetricScope::Nested(vec![
"chamber".to_string(),
"of".to_string(),
"secrets".to_string(),
]);
assert_eq!(
nested_scope.into_scoped("".into()),
"chamber.of.secrets".to_string()
);
let nested_scope = MetricScope::Nested(vec![
"chamber".to_string(),
"of".to_string(),
"secrets".to_string(),
]);
assert_eq!(
nested_scope.into_scoped("toilet".into()),
"chamber.of.secrets.toilet".to_string()
);
}
#[test]
fn test_metric_values() {
let counter = MetricValue::new_counter();
counter.update_counter(42);
match counter.snapshot() {
ValueSnapshot::Counter(value) => assert_eq!(value, 42),
_ => panic!("incorrect value snapshot type for counter"),
}
let gauge = MetricValue::new_gauge();
gauge.update_gauge(23);
match gauge.snapshot() {
ValueSnapshot::Gauge(value) => assert_eq!(value, 23),
_ => panic!("incorrect value snapshot type for gauge"),
}
let (mock, _) = Clock::mock();
let histogram =
MetricValue::new_histogram(Duration::from_secs(10), Duration::from_secs(1), mock);
histogram.update_histogram(8675309);
histogram.update_histogram(5551212);
match histogram.snapshot() {
ValueSnapshot::Histogram(stream) => {
assert_eq!(stream.len(), 2);
let values = stream.decompress();
assert_eq!(&values[..], [8675309, 5551212]);
}
_ => panic!("incorrect value snapshot type for histogram"),
}
}
}

metrics/src/config.rs Normal file

@ -0,0 +1,17 @@
use crate::Builder;
use std::time::Duration;
/// Holds the configuration for complex metric types.
pub(crate) struct MetricConfiguration {
pub histogram_window: Duration,
pub histogram_granularity: Duration,
}
impl MetricConfiguration {
pub fn from_builder(builder: &Builder) -> Self {
Self {
histogram_window: builder.histogram_window,
histogram_granularity: builder.histogram_granularity,
}
}
}


@ -1,84 +0,0 @@
use crate::receiver::Receiver;
use std::time::Duration;
/// A configuration builder for [`Receiver`].
#[derive(Clone)]
pub struct Configuration {
pub(crate) capacity: usize,
pub(crate) batch_size: usize,
pub(crate) histogram_window: Duration,
pub(crate) histogram_granularity: Duration,
}
impl Default for Configuration {
fn default() -> Configuration {
Configuration {
capacity: 512,
batch_size: 64,
histogram_window: Duration::from_secs(10),
histogram_granularity: Duration::from_secs(1),
}
}
}
impl Configuration {
/// Creates a new [`Configuration`] with default values.
pub fn new() -> Configuration {
Default::default()
}
/// Sets the buffer capacity.
///
/// Defaults to 512.
///
/// This controls the size of the channel used to send metrics. This channel is shared amongst
/// all active sinks. If this channel is full when sending a metric, that send will be blocked
/// until the channel has free space.
///
/// Tweaking this value allows for a trade-off between low memory consumption and throughput
/// burst capabilities. By default, we expect samples to occupy approximately 64 bytes. Thus,
/// at our default value, we preallocate roughly ~32KB.
///
/// Generally speaking, sending and processing metrics is fast enough that the default value of
/// 512 supports millions of samples per second.
pub fn capacity(mut self, capacity: usize) -> Self {
self.capacity = capacity;
self
}
/// Sets the batch size.
///
/// Defaults to 64.
///
/// This controls the size of message batches that we collect for processing. The only real
/// reason to tweak this is to control the latency from the sender side. Larger batches lower
/// the ingest latency in the face of high metric ingest pressure at the cost of higher ingest
/// tail latencies.
///
/// Long story short, you shouldn't need to change this, but it's here if you really do.
pub fn batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}
/// Sets the histogram configuration.
///
/// Defaults to a 10 second window with 1 second granularity.
///
/// This controls both how long of a time window we track histogram data for, and the
/// granularity in which we roll off old data.
///
/// As an example, with the default values, we would keep the last 10 seconds worth of
/// histogram data, and would remove 1 seconds worth of data at a time as the window rolled
/// forward.
pub fn histogram(mut self, window: Duration, granularity: Duration) -> Self {
self.histogram_window = window;
self.histogram_granularity = granularity;
self
}
/// Create a [`Receiver`] based on this configuration.
pub fn build(self) -> Receiver {
Receiver::from_config(self)
}
}


@ -1,19 +1,16 @@
use super::data::snapshot::Snapshot;
use crossbeam_channel::{bounded, Sender};
use crate::data::Snapshot;
use crate::registry::{MetricRegistry, ScopeRegistry};
use futures::prelude::*;
use metrics_core::{AsyncSnapshotProvider, SnapshotProvider};
use std::error::Error;
use std::fmt;
use tokio_sync::oneshot;
use std::sync::Arc;
/// Error conditions when retrieving a snapshot.
/// Error during snapshot retrieval.
#[derive(Debug, Clone)]
pub enum SnapshotError {
/// There was an internal error when trying to collect a snapshot.
InternalError,
/// A snapshot was requested but the receiver is shutdown.
ReceiverShutdown,
/// The future was polled again after returning the snapshot.
AlreadyUsed,
}
impl Error for SnapshotError {}
@ -21,33 +18,32 @@ impl Error for SnapshotError {}
impl fmt::Display for SnapshotError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SnapshotError::InternalError => write!(f, "internal error while collecting snapshot"),
SnapshotError::ReceiverShutdown => write!(f, "receiver is shutdown"),
SnapshotError::AlreadyUsed => write!(f, "snapshot already returned from future"),
}
}
}
/// Various control actions performed by a controller.
pub(crate) enum ControlFrame {
/// Takes a snapshot of the current metric state.
Snapshot(Sender<Snapshot>),
/// Takes a snapshot of the current metric state, but uses an asynchronous channel.
SnapshotAsync(oneshot::Sender<Snapshot>),
}
/// Dedicated handle for performing operations on a running [`Receiver`](crate::receiver::Receiver).
/// Handle for acquiring snapshots.
///
/// The caller is able to request metric snapshots at any time without requiring mutable access to
/// the sink. This all flows through the existing control mechanism, and so is very fast.
/// `Controller` is [`metrics-core`]-compatible as a snapshot provider, both for synchronous and
/// asynchronous snapshotting.
///
/// [`metrics-core`]: https://docs.rs/metrics-core
#[derive(Clone)]
pub struct Controller {
control_tx: Sender<ControlFrame>,
metric_registry: Arc<MetricRegistry>,
scope_registry: Arc<ScopeRegistry>,
}
impl Controller {
pub(crate) fn new(control_tx: Sender<ControlFrame>) -> Controller {
Controller { control_tx }
pub(crate) fn new(
metric_registry: Arc<MetricRegistry>,
scope_registry: Arc<ScopeRegistry>,
) -> Controller {
Controller {
metric_registry,
scope_registry,
}
}
}
@ -57,13 +53,8 @@ impl SnapshotProvider for Controller {
/// Gets a snapshot.
fn get_snapshot(&self) -> Result<Snapshot, SnapshotError> {
let (tx, rx) = bounded(0);
let msg = ControlFrame::Snapshot(tx);
self.control_tx
.send(msg)
.map_err(|_| SnapshotError::ReceiverShutdown)
.and_then(move |_| rx.recv().map_err(|_| SnapshotError::InternalError))
let snapshot = self.metric_registry.get_snapshot();
Ok(snapshot)
}
}
@ -74,20 +65,22 @@ impl AsyncSnapshotProvider for Controller {
/// Gets a snapshot asynchronously.
fn get_snapshot_async(&self) -> Self::SnapshotFuture {
let (tx, rx) = oneshot::channel();
let msg = ControlFrame::SnapshotAsync(tx);
self.control_tx
.send(msg)
.map(move |_| SnapshotFuture::Waiting(rx))
.unwrap_or(SnapshotFuture::Errored(SnapshotError::ReceiverShutdown))
let snapshot = self.metric_registry.get_snapshot();
SnapshotFuture::new(snapshot)
}
}
/// A future representing collecting a snapshot.
pub enum SnapshotFuture {
Waiting(oneshot::Receiver<Snapshot>),
Errored(SnapshotError),
pub struct SnapshotFuture {
snapshot: Option<Snapshot>,
}
impl SnapshotFuture {
pub fn new(snapshot: Snapshot) -> Self {
SnapshotFuture {
snapshot: Some(snapshot),
}
}
}
impl Future for SnapshotFuture {
@ -95,9 +88,9 @@ impl Future for SnapshotFuture {
type Error = SnapshotError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
SnapshotFuture::Waiting(rx) => rx.poll().map_err(|_| SnapshotError::InternalError),
SnapshotFuture::Errored(err) => Err(err.clone()),
}
self.snapshot
.take()
.ok_or(SnapshotError::AlreadyUsed)
.map(Async::Ready)
}
}


@ -1,44 +1,19 @@
use crate::data::ScopedKey;
use fnv::FnvBuildHasher;
use hashbrown::HashMap;
use crate::common::MetricValue;
pub(crate) struct Counter {
data: HashMap<ScopedKey, u64, FnvBuildHasher>,
/// Proxy object to update a counter.
pub struct Counter {
handle: MetricValue,
}
impl Counter {
pub fn new() -> Counter {
Counter {
data: HashMap::default(),
}
}
pub fn update(&mut self, key: ScopedKey, delta: u64) {
let value = self.data.entry(key).or_insert(0);
*value = value.wrapping_add(delta);
}
pub fn values(&self) -> Vec<(ScopedKey, u64)> {
self.data.iter().map(|(k, v)| (k.clone(), *v)).collect()
/// Records a value for the counter.
pub fn record(&self, value: u64) {
self.handle.update_counter(value);
}
}
#[cfg(test)]
mod tests {
use super::{Counter, ScopedKey};
#[test]
fn test_counter_simple_update() {
let mut counter = Counter::new();
let key = ScopedKey(0, "foo".into());
counter.update(key, 42);
let key2 = ScopedKey(0, "foo".to_owned().into());
counter.update(key2, 31);
let values = counter.values();
assert_eq!(values.len(), 1);
assert_eq!(values[0].1, 73);
impl From<MetricValue> for Counter {
fn from(handle: MetricValue) -> Self {
Self { handle }
}
}


@ -1,48 +1,19 @@
use crate::data::ScopedKey;
use fnv::FnvBuildHasher;
use hashbrown::HashMap;
use crate::common::MetricValue;
pub(crate) struct Gauge {
data: HashMap<ScopedKey, i64, FnvBuildHasher>,
/// Proxy object to update a gauge.
pub struct Gauge {
handle: MetricValue,
}
impl Gauge {
pub fn new() -> Gauge {
Gauge {
data: HashMap::default(),
}
}
pub fn update(&mut self, key: ScopedKey, value: i64) {
let ivalue = self.data.entry(key).or_insert(0);
*ivalue = value;
}
pub fn values(&self) -> Vec<(ScopedKey, i64)> {
self.data.iter().map(|(k, v)| (k.clone(), *v)).collect()
/// Records a value for the gauge.
pub fn record(&self, value: i64) {
self.handle.update_gauge(value);
}
}
#[cfg(test)]
mod tests {
use super::{Gauge, ScopedKey};
#[test]
fn test_gauge_simple_update() {
let mut gauge = Gauge::new();
let key = ScopedKey(0, "foo".into());
gauge.update(key, 42);
let values = gauge.values();
assert_eq!(values.len(), 1);
assert_eq!(values[0].1, 42);
let key2 = ScopedKey(0, "foo".to_owned().into());
gauge.update(key2, 43);
let values = gauge.values();
assert_eq!(values.len(), 1);
assert_eq!(values[0].1, 43);
impl From<MetricValue> for Gauge {
fn from(handle: MetricValue) -> Self {
Self { handle }
}
}


@ -1,212 +1,229 @@
use crate::{data::ScopedKey, helper::duration_as_nanos};
use fnv::FnvBuildHasher;
use hashbrown::HashMap;
use std::time::{Duration, Instant};
use crate::common::{Delta, MetricValue};
use crate::helper::duration_as_nanos;
use metrics_util::{AtomicBucket, StreamingIntegers};
use quanta::Clock;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Duration;
pub(crate) struct Histogram {
window: Duration,
granularity: Duration,
data: HashMap<ScopedKey, WindowedRawHistogram, FnvBuildHasher>,
/// Proxy object to update a histogram.
pub struct Histogram {
handle: MetricValue,
}
impl Histogram {
pub fn new(window: Duration, granularity: Duration) -> Histogram {
Histogram {
window,
granularity,
data: HashMap::default(),
}
/// Records a timing for the histogram.
pub fn record_timing<D: Delta>(&self, start: D, end: D) {
let value = end.delta(start);
self.handle.update_histogram(value);
}
pub fn update(&mut self, key: ScopedKey, value: u64) {
if let Some(wh) = self.data.get_mut(&key) {
wh.update(value);
} else {
let mut wh = WindowedRawHistogram::new(self.window, self.granularity);
wh.update(value);
let _ = self.data.insert(key, wh);
}
}
pub fn upkeep(&mut self, at: Instant) {
for (_, histogram) in self.data.iter_mut() {
histogram.upkeep(at);
}
}
pub fn values(&self) -> Vec<(ScopedKey, HistogramSnapshot)> {
self.data
.iter()
.map(|(k, v)| (k.clone(), v.snapshot()))
.collect()
/// Records a value for the histogram.
pub fn record_value(&self, value: u64) {
self.handle.update_histogram(value);
}
}
pub(crate) struct WindowedRawHistogram {
buckets: Vec<Vec<u64>>,
num_buckets: usize,
bucket_index: usize,
last_upkeep: Instant,
granularity: Duration,
impl From<MetricValue> for Histogram {
fn from(handle: MetricValue) -> Self {
Self { handle }
}
}
impl WindowedRawHistogram {
pub fn new(window: Duration, granularity: Duration) -> WindowedRawHistogram {
let num_buckets =
((duration_as_nanos(window) / duration_as_nanos(granularity)) as usize) + 1;
let mut buckets = Vec::with_capacity(num_buckets);
#[derive(Debug)]
pub struct AtomicWindowedHistogram {
buckets: Vec<AtomicBucket<u64>>,
bucket_count: usize,
granularity: u64,
index: AtomicUsize,
next_upkeep: AtomicU64,
clock: Clock,
}
for _ in 0..num_buckets {
let histogram = Vec::new();
buckets.push(histogram);
impl AtomicWindowedHistogram {
pub fn new(window: Duration, granularity: Duration, clock: Clock) -> Self {
let window_ns = duration_as_nanos(window);
let granularity_ns = duration_as_nanos(granularity);
let now = clock.recent();
let bucket_count = ((window_ns / granularity_ns) as usize) + 1;
let mut buckets = Vec::new();
for _ in 0..bucket_count {
buckets.push(AtomicBucket::new());
}
WindowedRawHistogram {
let next_upkeep = now + granularity_ns;
AtomicWindowedHistogram {
buckets,
num_buckets,
bucket_index: 0,
last_upkeep: Instant::now(),
granularity,
bucket_count,
granularity: granularity_ns,
index: AtomicUsize::new(0),
next_upkeep: AtomicU64::new(next_upkeep),
clock,
}
}
pub fn upkeep(&mut self, at: Instant) {
if at >= self.last_upkeep + self.granularity {
self.bucket_index += 1;
self.bucket_index %= self.num_buckets;
self.buckets[self.bucket_index].clear();
self.last_upkeep = at;
}
}
pub fn snapshot(&self) -> StreamingIntegers {
// Run upkeep to make sure our window reflects any time passage since the last write.
let _ = self.upkeep();
pub fn update(&mut self, value: u64) {
self.buckets[self.bucket_index].push(value);
}
pub fn snapshot(&self) -> HistogramSnapshot {
let mut aggregate = Vec::new();
let mut streaming = StreamingIntegers::new();
for bucket in &self.buckets {
aggregate.extend_from_slice(&bucket);
bucket.data_with(|block| streaming.compress(block));
}
HistogramSnapshot::new(aggregate)
}
}
/// A point-in-time snapshot of a single histogram.
#[derive(Debug, PartialEq, Eq)]
pub struct HistogramSnapshot {
values: Vec<u64>,
}
impl HistogramSnapshot {
pub(crate) fn new(values: Vec<u64>) -> Self {
HistogramSnapshot { values }
streaming
}
/// Gets the raw values that comprise the entire histogram.
pub fn values(&self) -> &Vec<u64> {
&self.values
pub fn record(&self, value: u64) {
let index = self.upkeep();
self.buckets[index].push(value);
}
fn upkeep(&self) -> usize {
loop {
let now = self.clock.recent();
let index = self.index.load(Ordering::Acquire);
// See if we need to update the index because we're past our upkeep target.
let next_upkeep = self.next_upkeep.load(Ordering::Acquire);
if now < next_upkeep {
return index;
}
let new_index = (index + 1) % self.bucket_count;
if self
.index
.compare_and_swap(index, new_index, Ordering::AcqRel)
== index
{
// If we've had to update the index, go ahead and clear the bucket in front of our
// new bucket. Since we write low to high/left to right, the "oldest" bucket,
// which is the one that should be dropped, is the one we just updated our index
// to, but we always add an extra bucket on top of what we need so that we can
// clear that one, instead of clearing the one we're going to be writing to next so
// that we don't clear the values of writers who start writing to the new bucket
// while we're doing the clear.
self.buckets[new_index].clear();
// Since another write could outrun us, just do a single CAS. 99.99999999% of the
// time, the CAS will go through, because it takes nanoseconds and our granularity
// will be in the hundreds of milliseconds, if not seconds.
self.next_upkeep.compare_and_swap(
next_upkeep,
next_upkeep + self.granularity,
Ordering::AcqRel,
);
}
}
}
}
#[cfg(test)]
mod tests {
use super::{Histogram, ScopedKey, WindowedRawHistogram};
use std::time::{Duration, Instant};
use super::{AtomicWindowedHistogram, Clock};
use std::time::Duration;
#[test]
fn test_histogram_simple_update() {
let mut histogram = Histogram::new(Duration::new(5, 0), Duration::new(1, 0));
let (clock, _ctl) = Clock::mock();
let h = AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_secs(1), clock);
let key = ScopedKey(0, "foo".into());
histogram.update(key, 1245);
h.record(1245);
let values = histogram.values();
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 1);
let values = snapshot.decompress();
assert_eq!(values.len(), 1);
let hdr = &values[0].1;
assert_eq!(hdr.values().len(), 1);
assert_eq!(hdr.values().get(0).unwrap(), &1245);
assert_eq!(values.get(0).unwrap(), &1245);
}
#[test]
fn test_histogram_complex_update() {
let mut histogram = Histogram::new(Duration::new(5, 0), Duration::new(1, 0));
let (clock, _ctl) = Clock::mock();
let h = AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_secs(1), clock);
let key = ScopedKey(0, "foo".into());
histogram.update(key.clone(), 1245);
histogram.update(key.clone(), 213);
histogram.update(key.clone(), 1022);
histogram.update(key, 1248);
h.record(1245);
h.record(213);
h.record(1022);
h.record(1248);
let values = histogram.values();
assert_eq!(values.len(), 1);
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 4);
let hdr = &values[0].1;
assert_eq!(hdr.values().len(), 4);
assert_eq!(hdr.values().get(0).unwrap(), &1245);
assert_eq!(hdr.values().get(1).unwrap(), &213);
assert_eq!(hdr.values().get(2).unwrap(), &1022);
assert_eq!(hdr.values().get(3).unwrap(), &1248);
let values = snapshot.decompress();
assert_eq!(values.len(), 4);
assert_eq!(values.get(0).unwrap(), &1245);
assert_eq!(values.get(1).unwrap(), &213);
assert_eq!(values.get(2).unwrap(), &1022);
assert_eq!(values.get(3).unwrap(), &1248);
}
#[test]
fn test_windowed_histogram_rollover() {
let mut wh = WindowedRawHistogram::new(Duration::new(5, 0), Duration::new(1, 0));
let now = Instant::now();
let (clock, ctl) = Clock::mock();
let h = AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_secs(1), clock);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 0);
// Histogram is empty, snapshot is empty.
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 0);
wh.update(1);
wh.update(2);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 2);
// Immediately add two values, and observe the histogram and snapshot having two values.
h.record(1);
h.record(2);
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 2);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 3);
// Roll forward 3 seconds, should still have everything.
let now = now + Duration::new(1, 0);
wh.upkeep(now);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 2);
ctl.increment(Duration::from_secs(3));
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 2);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 3);
let now = now + Duration::new(1, 0);
wh.upkeep(now);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 2);
// Roll forward 1 second, should still have everything.
ctl.increment(Duration::from_secs(1));
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 2);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 3);
let now = now + Duration::new(1, 0);
wh.upkeep(now);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 2);
// Roll forward 1 second, should still have everything.
ctl.increment(Duration::from_secs(1));
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 2);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 3);
// Pump in some new values.
wh.update(3);
wh.update(4);
wh.update(5);
// Pump in some new values. We should have a total of 5 values now.
h.record(3);
h.record(4);
h.record(5);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 5);
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 5);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 15);
// Roll forward 3 seconds, and make sure the first two values are gone.
// You might think this should be 2 seconds, but we have one extra bucket
// allocated so that there's always a clear bucket that we can write into.
// This means we have more than our total window, but only having the exact
// number of buckets would mean we were constantly missing a bucket's worth
// of granularity.
let now = now + Duration::new(1, 0);
wh.upkeep(now);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 5);
// Roll forward 6 seconds, in increments. The first one rolls over a single bucket, and
// cleans bucket #0, the first one we wrote to. The second and third ones get us right up
// to the last three values, and then clear them out.
ctl.increment(Duration::from_secs(1));
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 3);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 12);
let now = now + Duration::new(1, 0);
wh.upkeep(now);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 5);
ctl.increment(Duration::from_secs(4));
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 3);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 12);
let now = now + Duration::new(1, 0);
wh.upkeep(now);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 3);
ctl.increment(Duration::from_secs(1));
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 0);
}
}
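
The rollover behavior exercised by the test above can be reproduced with a much smaller model. The following is a sketch only: `WindowedSketch` is a hypothetical type, `Mutex<Vec<u64>>` stands in for the lock-free `AtomicBucket`, time is passed in explicitly instead of being read from a `quanta::Clock`, and `compare_exchange` is used where the diff calls `compare_and_swap`. It mirrors the structure of `upkeep` in the diff rather than improving on it.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Mutex;

/// Simplified windowed histogram. Time values are plain integers supplied by
/// the caller, in whatever unit `granularity` uses.
pub struct WindowedSketch {
    buckets: Vec<Mutex<Vec<u64>>>,
    granularity: u64,
    index: AtomicUsize,
    next_upkeep: AtomicU64,
}

impl WindowedSketch {
    pub fn new(window: u64, granularity: u64, now: u64) -> Self {
        // One extra bucket beyond window / granularity, as in the diff.
        let bucket_count = (window / granularity) as usize + 1;
        WindowedSketch {
            buckets: (0..bucket_count).map(|_| Mutex::new(Vec::new())).collect(),
            granularity,
            index: AtomicUsize::new(0),
            next_upkeep: AtomicU64::new(now + granularity),
        }
    }

    /// Runs upkeep, then pushes the value into the current bucket.
    pub fn record(&self, now: u64, value: u64) {
        let index = self.upkeep(now);
        self.buckets[index].lock().unwrap().push(value);
    }

    /// Runs upkeep, then copies out every value still inside the window.
    pub fn snapshot(&self, now: u64) -> Vec<u64> {
        self.upkeep(now);
        self.buckets
            .iter()
            .flat_map(|b| b.lock().unwrap().clone())
            .collect()
    }

    /// Advances the bucket index one step per elapsed granularity interval,
    /// clearing each bucket it moves into, and returns the current index.
    fn upkeep(&self, now: u64) -> usize {
        loop {
            let index = self.index.load(Ordering::Acquire);
            let next_upkeep = self.next_upkeep.load(Ordering::Acquire);
            if now < next_upkeep {
                return index;
            }
            let new_index = (index + 1) % self.buckets.len();
            if self
                .index
                .compare_exchange(index, new_index, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                self.buckets[new_index].lock().unwrap().clear();
                // Single attempt, as in the diff; a failure is ignored.
                let _ = self.next_upkeep.compare_exchange(
                    next_upkeep,
                    next_upkeep + self.granularity,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                );
            }
        }
    }
}
```

With a window of 5 and a granularity of 1, values recorded at time 0 survive until the index wraps back around to bucket 0 at time 6, which is the same timeline the rollover test walks through with the mocked clock.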


@ -1,73 +1,13 @@
use std::{
borrow::Cow,
fmt::{self, Display},
};
//! Core data types for metrics.
mod counter;
pub use counter::Counter;
pub mod counter;
pub mod gauge;
pub mod histogram;
pub mod snapshot;
mod gauge;
pub use gauge::Gauge;
pub(crate) use self::{counter::Counter, gauge::Gauge, histogram::Histogram, snapshot::Snapshot};
mod histogram;
pub(crate) use histogram::AtomicWindowedHistogram;
pub use histogram::Histogram;
pub type MetricKey = Cow<'static, str>;
/// A measurement.
///
/// Samples are the decoupled way of submitting data into the sink.
#[derive(Debug)]
pub(crate) enum Sample {
/// A counter delta.
///
/// The value is added directly to the existing counter, and so negative deltas will decrease
/// the counter, and positive deltas will increase the counter.
Count(ScopedKey, u64),
/// A single value, also known as a gauge.
///
/// Values operate in last-write-wins mode.
///
/// Values themselves cannot be incremented or decremented, so you must hold them externally
/// before sending them.
Gauge(ScopedKey, i64),
/// A timed sample.
///
/// Includes the start and end times.
TimingHistogram(ScopedKey, u64, u64),
/// A single value measured over time.
///
/// Unlike a gauge, where the value is only ever measured at a point in time, value histogram
/// measure values over time, and their distribution. This is nearly identical to timing
/// histograms, since the end result is just a single number, but we don't spice it up with
/// special unit labels or anything.
ValueHistogram(ScopedKey, u64),
}
/// An integer scoped metric key.
#[derive(Clone, Hash, PartialEq, Eq, Debug)]
pub(crate) struct ScopedKey(pub u64, pub MetricKey);
impl ScopedKey {
pub(crate) fn id(&self) -> u64 {
self.0
}
pub(crate) fn into_string_scoped(self, scope: String) -> StringScopedKey {
StringScopedKey(scope, self.1)
}
}
/// A string scoped metric key.
#[derive(Clone, Hash, PartialEq, Eq, Debug)]
pub(crate) struct StringScopedKey(String, MetricKey);
impl Display for StringScopedKey {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.0.is_empty() {
write!(f, "{}", self.1)
} else {
write!(f, "{}.{}", self.0, self.1.as_ref())
}
}
}
mod snapshot;
pub use snapshot::Snapshot;


@ -1,86 +1,28 @@
use super::histogram::HistogramSnapshot;
use crate::common::ValueSnapshot;
use metrics_core::{Recorder, Snapshot as MetricsSnapshot};
use std::fmt::Display;
/// A typed metric measurement, used in snapshots.
///
/// This type provides a way to wrap the value of a metric, for use in a snapshot, while also
/// providing the overall type of the metric, so that downstream consumers know how to properly
/// format the data.
#[derive(Debug, PartialEq, Eq)]
pub enum TypedMeasurement {
Counter(String, u64),
Gauge(String, i64),
TimingHistogram(String, HistogramSnapshot),
ValueHistogram(String, HistogramSnapshot),
}
/// A point-in-time view of metric data.
#[derive(Default, Debug)]
pub struct Snapshot {
measurements: Vec<TypedMeasurement>,
measurements: Vec<(String, ValueSnapshot)>,
}
impl Snapshot {
/// Stores a counter value for the given metric key.
pub(crate) fn set_count<T>(&mut self, key: T, value: u64)
where
T: Display,
{
self.measurements
.push(TypedMeasurement::Counter(key.to_string(), value));
}
/// Stores a gauge value for the given metric key.
pub(crate) fn set_gauge<T>(&mut self, key: T, value: i64)
where
T: Display,
{
self.measurements
.push(TypedMeasurement::Gauge(key.to_string(), value));
}
/// Sets timing percentiles for the given metric key.
///
/// From the given `HdrHistogram`, all the specific `percentiles` will be extracted and stored.
pub(crate) fn set_timing_histogram<T>(&mut self, key: T, h: HistogramSnapshot)
where
T: Display,
{
self.measurements
.push(TypedMeasurement::TimingHistogram(key.to_string(), h));
}
/// Sets value percentiles for the given metric key.
///
/// From the given `HdrHistogram`, all the specific `percentiles` will be extracted and stored.
pub(crate) fn set_value_histogram<T>(&mut self, key: T, h: HistogramSnapshot)
where
T: Display,
{
self.measurements
.push(TypedMeasurement::ValueHistogram(key.to_string(), h));
}
/// Converts this [`Snapshot`] to the underlying vector of measurements.
pub fn into_measurements(self) -> Vec<TypedMeasurement> {
self.measurements
pub(crate) fn from(from: Vec<(String, ValueSnapshot)>) -> Self {
Snapshot { measurements: from }
}
}
impl MetricsSnapshot for Snapshot {
/// Records the snapshot to the given recorder.
fn record<R: Recorder>(&self, recorder: &mut R) {
for measurement in &self.measurements {
match measurement {
TypedMeasurement::Counter(key, value) => recorder.record_counter(key, *value),
TypedMeasurement::Gauge(key, value) => recorder.record_gauge(key, *value),
TypedMeasurement::TimingHistogram(key, hs) => {
recorder.record_histogram(key, hs.values().as_slice());
}
TypedMeasurement::ValueHistogram(key, hs) => {
recorder.record_histogram(key, hs.values().as_slice());
}
for (key, snapshot) in &self.measurements {
match snapshot {
ValueSnapshot::Counter(value) => recorder.record_counter(key, *value),
ValueSnapshot::Gauge(value) => recorder.record_gauge(key, *value),
ValueSnapshot::Histogram(stream) => stream.decompress_with(|values| {
recorder.record_histogram(key, values);
}),
}
}
}
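
The reworked `record` boils down to matching on one enum per measurement and forwarding to the recorder. As a self-contained sketch of that dispatch, under simplifying assumptions: the `ValueSnapshot`, `Recorder`, `LineRecorder`, and `record_all` definitions below are hypothetical stand-ins, and the histogram variant holds plain values rather than a compressed `StreamingIntegers` stream.

```rust
/// Hypothetical mirror of the per-metric snapshot enum used in the diff.
pub enum ValueSnapshot {
    Counter(u64),
    Gauge(i64),
    Histogram(Vec<u64>),
}

/// Hypothetical, simplified recorder interface.
pub trait Recorder {
    fn record_counter(&mut self, key: &str, value: u64);
    fn record_gauge(&mut self, key: &str, value: i64);
    fn record_histogram(&mut self, key: &str, values: &[u64]);
}

/// Collects one text line per measurement.
#[derive(Default)]
pub struct LineRecorder {
    pub lines: Vec<String>,
}

impl Recorder for LineRecorder {
    fn record_counter(&mut self, key: &str, value: u64) {
        self.lines.push(format!("{} counter {}", key, value));
    }

    fn record_gauge(&mut self, key: &str, value: i64) {
        self.lines.push(format!("{} gauge {}", key, value));
    }

    fn record_histogram(&mut self, key: &str, values: &[u64]) {
        let sum: u64 = values.iter().sum();
        self.lines
            .push(format!("{} histogram count={} sum={}", key, values.len(), sum));
    }
}

/// Forwards every measurement to the recorder method matching its kind.
pub fn record_all<R: Recorder>(measurements: &[(String, ValueSnapshot)], recorder: &mut R) {
    for (key, snapshot) in measurements {
        match snapshot {
            ValueSnapshot::Counter(value) => recorder.record_counter(key, *value),
            ValueSnapshot::Gauge(value) => recorder.record_gauge(key, *value),
            ValueSnapshot::Histogram(values) => recorder.record_histogram(key, values),
        }
    }
}
```

Keeping the key next to a single typed value means the snapshot needs no per-kind setter methods; building one is just pushing `(key, value)` pairs into a vector.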
@ -88,7 +30,8 @@ impl MetricsSnapshot for Snapshot {
#[cfg(test)]
mod tests {
use super::{HistogramSnapshot, MetricsSnapshot, Recorder, Snapshot, TypedMeasurement};
use super::{MetricsSnapshot, Recorder, Snapshot, ValueSnapshot};
use metrics_util::StreamingIntegers;
use std::collections::HashMap;
#[derive(Default)]
@ -128,29 +71,19 @@ mod tests {
}
}
#[test]
fn test_snapshot_simple_set_and_get() {
let key = "ok".to_owned();
let mut snapshot = Snapshot::default();
snapshot.set_count(key.clone(), 1);
snapshot.set_gauge(key.clone(), 42);
let values = snapshot.into_measurements();
assert_eq!(values[0], TypedMeasurement::Counter(key.clone(), 1));
assert_eq!(values[1], TypedMeasurement::Gauge(key.clone(), 42));
}
#[test]
fn test_snapshot_recorder() {
let key = "ok".to_owned();
let mut snapshot = Snapshot::default();
snapshot.set_count(key.clone(), 7);
snapshot.set_gauge(key.clone(), 42);
let mut measurements = Vec::new();
measurements.push((key.clone(), ValueSnapshot::Counter(7)));
measurements.push((key.clone(), ValueSnapshot::Gauge(42)));
let hvalues = vec![10, 25, 42, 97];
let histogram = HistogramSnapshot::new(hvalues);
snapshot.set_timing_histogram(key.clone(), histogram);
let mut stream = StreamingIntegers::new();
stream.compress(&hvalues);
measurements.push((key.clone(), ValueSnapshot::Histogram(stream)));
let snapshot: Snapshot = Snapshot::from(measurements);
let mut recorder = MockRecorder::default();
snapshot.record(&mut recorder);


@ -1,12 +1,4 @@
use std::{
io::{Error, ErrorKind},
time::Duration,
};
/// Helpers to create an I/O error from a string.
pub fn io_error(reason: &str) -> Error {
Error::new(ErrorKind::Other, reason)
}
use std::time::Duration;
/// Converts a duration to nanoseconds.
pub fn duration_as_nanos(d: Duration) -> u64 {


@ -7,16 +7,18 @@
//!
//! The library follows a pattern of "senders" and a "receiver."
//!
//! Callers create a [`Receiver`], which acts as a contained unit: metric registration,
//! aggregation, and summarization. The [`Receiver`] is intended to be spawned onto a dedicated
//! background thread.
//! Callers create a [`Receiver`], which acts as a registry for all metrics that flow through it.
//! It allows creating new sinks as well as controllers, both necessary to push in and pull out
//! metrics from the system. It also manages background resources necessary for the registry to
//! operate.
//!
//! Once a [`Receiver`] is created, callers can either create a [`Sink`] for sending metrics, or a
//! [`Controller`] for getting metrics out.
//!
//! A [`Sink`] can be cheaply cloned and does not require a mutable reference to send metrics, so
//! callers have increased flexibility in usage and control over whether or not to clone sinks,
//! share references, etc.
//! A [`Sink`] can be cheaply cloned, and offers convenience methods for getting the current time
//! as well as getting direct handles to a given metric. This allows users to either work with the
//! fuller API exposed by [`Sink`] or to take a compositional approach and embed fields that
//! represent each particular metric to be sent.
//!
//! A [`Controller`] provides both a synchronous and asynchronous snapshotting interface, which is
//! [`metrics-core`][metrics_core] compatible for exporting. This allows flexibility in
@ -25,8 +27,11 @@
//!
//! # Performance
//!
//! Being based on [`crossbeam-channel`][crossbeam_channel] allows us to process close to ten
//! million metrics per second using a single core, with average ingest latencies of around 100ns.
//! Users can expect to be able to send tens of millions of samples per second, with ingest
//! latencies of roughly 65-70ns at p50, and 250ns at p99. Depending on the workload -- counters
//! vs histograms -- latencies may be even lower, as counters and gauges are markedly faster to
//! update than histograms. Concurrent updates of the same metric will also cause natural
//! contention and lower the throughput/increase the latency of ingestion.
//!
//! # Metrics
//!
@ -39,8 +44,8 @@
//! # extern crate metrics;
//! use metrics::Receiver;
//! use std::{thread, time::Duration};
//! let receiver = Receiver::builder().build();
//! let sink = receiver.get_sink();
//! let receiver = Receiver::builder().build().expect("failed to create receiver");
//! let mut sink = receiver.get_sink();
//!
//! // We can update a counter. Counters are monotonic, unsigned integers that start at 0 and
//! // increase over time.
@ -54,18 +59,18 @@
//! // which utilizes a high-speed internal clock. This method returns the time in nanoseconds, so
//! // we get great resolution, but giving the time in nanoseconds isn't required! If you want to
//! // send it in another unit, that's fine, but just pay attention to that fact when viewing and
//! // using those metrics once exported.
//! // using those metrics once exported. We also support passing `Instant` values -- both `start`
//! // and `end` need to be the same type, though! -- and we'll take the nanosecond output of that.
//! let start = sink.now();
//! thread::sleep(Duration::from_millis(10));
//! let end = sink.now();
//! sink.record_timing("db.gizmo_query", start, end);
//! sink.record_timing("db.queries.select_products_ns", start, end);
//!
//! // Finally, we can update a value histogram. Technically speaking, value histograms aren't
//! // fundamentally different from timing histograms. If you use a timing histogram, we do the
//! // math for you of getting the time difference, and we make sure the metric name has the right
//! // unit suffix so you can tell it's measuring time, but other than that, nearly identical!
//! let buf_size = 4096;
//! sink.record_value("buf_size", buf_size);
//! // math for you of getting the time difference, but other than that, identical under the hood.
//! let row_count = 46;
//! sink.record_value("db.queries.select_products_num_rows", row_count);
//! ```
//!
//! # Scopes
@ -82,24 +87,24 @@
//! ```
//! # extern crate metrics;
//! use metrics::Receiver;
//! let receiver = Receiver::builder().build();
//! let receiver = Receiver::builder().build().expect("failed to create receiver");
//!
//! // This sink has no scope aka the root scope. The metric will just end up as "widgets".
//! let root_sink = receiver.get_sink();
//! let mut root_sink = receiver.get_sink();
//! root_sink.record_count("widgets", 42);
//!
//! // This sink is under the "secret" scope. Since we derived ourselves from the root scope,
//! // we're not nested under anything, but our metric name will end up being "secret.widgets".
//! let scoped_sink = root_sink.scoped("secret");
//! let mut scoped_sink = root_sink.scoped("secret");
//! scoped_sink.record_count("widgets", 42);
//!
//! // This sink is under the "supersecret" scope, but we're also nested! The metric name for this
//! // sample will end up being "secret.supersecret.widgets".
//! let scoped_sink_two = scoped_sink.scoped("supersecret");
//! let mut scoped_sink_two = scoped_sink.scoped("supersecret");
//! scoped_sink_two.record_count("widgets", 42);
//!
//! // Sinks retain their scope even when cloned, so the metric name will be the same as above.
//! let cloned_sink = scoped_sink_two.clone();
//! let mut cloned_sink = scoped_sink_two.clone();
//! cloned_sink.record_count("widgets", 42);
//!
//! // This sink will be nested two levels deeper than its parent by using a slightly different
@ -107,24 +112,70 @@
//! // nesting N levels deep.
//! //
//! // This metric name will end up being "super.secret.ultra.special.widgets".
//! let scoped_sink_three = scoped_sink.scoped(&["super", "secret", "ultra", "special"]);
//! let mut scoped_sink_three = scoped_sink.scoped(&["super", "secret", "ultra", "special"]);
//! scoped_sink_three.record_count("widgets", 42);
//! ```
//!
//! [crossbeam_channel]: https://docs.rs/crossbeam-channel
//! # Snapshots
//!
//! Naturally, we need a way to get the metrics out of the system, which is where snapshots come
//! into play. By utilizing a [`Controller`], we can take a snapshot of the current metrics in the
//! registry, and then output them to any desired system/interface by utilizing
//! [`Recorder`](metrics_core::Recorder). A number of pre-baked recorders (which only concern
//! themselves with formatting the data) and exporters (which take the formatted data and either
//! serve it up, such as exposing an HTTP endpoint, or write it somewhere, like stdout) are
//! available, some of which are exposed by this crate.
//!
//! Let's take an example of writing out our metrics in a yaml-like format, writing them via
//! `log!`:
//! ```
//! # extern crate metrics;
//! use metrics::{Receiver, recorders::TextRecorder, exporters::LogExporter};
//! use log::Level;
//! use std::{thread, time::Duration};
//! let receiver = Receiver::builder().build().expect("failed to create receiver");
//! let mut sink = receiver.get_sink();
//!
//! // Take some measurements, similar to what we had in other examples:
//! sink.record_count("widgets", 5);
//! sink.record_gauge("red_balloons", 99);
//!
//! let start = sink.now();
//! thread::sleep(Duration::from_millis(10));
//! let end = sink.now();
//! sink.record_timing("db.queries.select_products_ns", start, end);
//! sink.record_timing("db.gizmo_query", start, end);
//!
//! let num_rows = 46;
//! sink.record_value("db.queries.select_products_num_rows", num_rows);
//!
//! // Now create our exporter/recorder configuration, and wire it up.
//! let exporter = LogExporter::new(receiver.get_controller(), TextRecorder::new(), Level::Info);
//!
//! // This exporter will now run every 5 seconds, taking a snapshot, rendering it, and writing it
//! // via `log!` at the informational level. This particular exporter is running directly on the
//! // current thread, and not on a background thread.
//! //
//! // exporter.run(Duration::from_secs(5));
//! ```
//! Most exporters have the ability to run on the current thread or to be converted into a future
//! which can be spawned on any Tokio-compatible runtime.
//!
//! [metrics_core]: https://docs.rs/metrics-core
mod configuration;
//! [`Recorder`]: https://docs.rs/metrics-core/0.3.1/metrics_core/trait.Recorder.html
mod builder;
mod common;
mod config;
mod control;
mod data;
pub mod data;
mod helper;
mod receiver;
mod scopes;
mod registry;
mod sink;
#[cfg(any(
feature = "metrics-exporter-log",
feature = "metrics-exporter-http"
))]
#[cfg(any(feature = "metrics-exporter-log", feature = "metrics-exporter-http"))]
pub mod exporters;
#[cfg(any(
@ -134,13 +185,9 @@ pub mod exporters;
pub mod recorders;
pub use self::{
configuration::Configuration,
builder::{Builder, BuilderError},
common::{Delta, MetricName, MetricScope},
control::{Controller, SnapshotError},
data::histogram::HistogramSnapshot,
receiver::Receiver,
sink::{AsScoped, Sink, SinkError},
};
pub mod snapshot {
pub use super::data::snapshot::{Snapshot, TypedMeasurement};
}


@ -1,220 +1,73 @@
use crate::{
configuration::Configuration,
control::{ControlFrame, Controller},
data::{Counter, Gauge, Histogram, Sample, ScopedKey, Snapshot, StringScopedKey},
scopes::Scopes,
builder::{Builder, BuilderError},
common::MetricScope,
config::MetricConfiguration,
control::Controller,
registry::{MetricRegistry, ScopeRegistry},
sink::Sink,
};
use crossbeam_channel::{self, bounded, tick, Select, TryRecvError};
use quanta::Clock;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use quanta::{Builder as UpkeepBuilder, Clock, Handle as UpkeepHandle};
use std::sync::Arc;
use std::time::Duration;
/// Wrapper for all messages that flow over the data channel between sink/receiver.
pub(crate) enum MessageFrame {
/// A normal data message holding a metric sample.
Data(Sample),
}
/// Metrics receiver which aggregates and processes samples.
/// Central store for metrics.
///
/// `Receiver` is the nucleus for all metrics operations. While no operations are performed by it
/// directly, it holds the registries and references to resources and so it must live as long as
/// any [`Sink`] or [`Controller`] does.
pub struct Receiver {
config: Configuration,
// Sample aggregation machinery.
msg_tx: crossbeam_channel::Sender<MessageFrame>,
msg_rx: Option<crossbeam_channel::Receiver<MessageFrame>>,
control_tx: crossbeam_channel::Sender<ControlFrame>,
control_rx: Option<crossbeam_channel::Receiver<ControlFrame>>,
// Metric machinery.
counter: Counter,
gauge: Gauge,
thistogram: Histogram,
vhistogram: Histogram,
metric_registry: Arc<MetricRegistry>,
scope_registry: Arc<ScopeRegistry>,
clock: Clock,
scopes: Arc<Scopes>,
_upkeep_handle: UpkeepHandle,
}
impl Receiver {
pub(crate) fn from_config(config: Configuration) -> Receiver {
// Create our data, control, and buffer channels.
let (msg_tx, msg_rx) = bounded(config.capacity);
let (control_tx, control_rx) = bounded(16);
pub(crate) fn from_builder(builder: Builder) -> Result<Receiver, BuilderError> {
// Configure our clock and the quanta upkeep thread. The upkeep thread periodically updates a
// cached "recent" time for us, keeping it within `upkeep_interval` of the true time. Reads of
// this cached time are faster than calling the underlying time source directly, and for
// histogram windowing, we can afford a much coarser value than the raw nanosecond precision
// provided by quanta by default.
let clock = Clock::new();
let upkeep_interval = Duration::from_millis(50);
let upkeep = UpkeepBuilder::new_with_clock(upkeep_interval, clock.clone());
let _upkeep_handle = upkeep.start().map_err(|_| BuilderError::UpkeepFailure)?;
let histogram_window = config.histogram_window;
let histogram_granularity = config.histogram_granularity;
let metric_config = MetricConfiguration::from_builder(&builder);
Receiver {
config,
msg_tx,
msg_rx: Some(msg_rx),
control_tx,
control_rx: Some(control_rx),
counter: Counter::new(),
gauge: Gauge::new(),
thistogram: Histogram::new(histogram_window, histogram_granularity),
vhistogram: Histogram::new(histogram_window, histogram_granularity),
clock: Clock::new(),
scopes: Arc::new(Scopes::new()),
}
let scope_registry = Arc::new(ScopeRegistry::new());
let metric_registry = Arc::new(MetricRegistry::new(
scope_registry.clone(),
metric_config,
clock.clone(),
));
Ok(Receiver {
metric_registry,
scope_registry,
clock,
_upkeep_handle,
})
}
/// Gets a builder to configure a [`Receiver`] instance with.
pub fn builder() -> Configuration {
Configuration::default()
/// Creates a new [`Builder`] for building a [`Receiver`].
pub fn builder() -> Builder {
Builder::default()
}
/// Creates a [`Sink`] bound to this receiver.
pub fn get_sink(&self) -> Sink {
Sink::new_with_scope_id(
self.msg_tx.clone(),
Sink::new(
self.metric_registry.clone(),
self.scope_registry.clone(),
MetricScope::Root,
self.clock.clone(),
self.scopes.clone(),
"".to_owned(),
0,
)
}
/// Creates a [`Controller`] bound to this receiver.
pub fn get_controller(&self) -> Controller {
Controller::new(self.control_tx.clone())
}
/// Run the receiver.
///
/// This is blocking, and should be run in a dedicated background thread.
pub fn run(&mut self) {
let batch_size = self.config.batch_size;
let mut batch = Vec::with_capacity(batch_size);
let upkeep_rx = tick(Duration::from_millis(100));
let control_rx = self.control_rx.take().expect("failed to take control rx");
let msg_rx = self.msg_rx.take().expect("failed to take msg rx");
let mut selector = Select::new();
let _ = selector.recv(&upkeep_rx);
let _ = selector.recv(&control_rx);
let _ = selector.recv(&msg_rx);
loop {
// Block on having something to do.
let _ = selector.ready();
if upkeep_rx.try_recv().is_ok() {
let now = Instant::now();
self.thistogram.upkeep(now);
self.vhistogram.upkeep(now);
}
while let Ok(cframe) = control_rx.try_recv() {
self.process_control_frame(cframe);
}
loop {
match msg_rx.try_recv() {
Ok(mframe) => batch.push(mframe),
Err(TryRecvError::Empty) => break,
Err(e) => eprintln!("error receiving message frame: {}", e),
}
if batch.len() == batch_size {
break;
}
}
if !batch.is_empty() {
for mframe in batch.drain(0..) {
self.process_msg_frame(mframe);
}
}
}
}
/// Gets the string representation of an integer scope.
///
/// Returns `Some(scope)` if found, `None` otherwise. Scope ID `0` is reserved for the root
/// scope.
fn get_string_scope(&self, key: ScopedKey) -> Option<StringScopedKey> {
let scope_id = key.id();
if scope_id == 0 {
return Some(key.into_string_scoped("".to_owned()));
}
self.scopes
.get(scope_id)
.map(|scope| key.into_string_scoped(scope))
}
/// Gets a snapshot of the current metrics/facets.
fn get_snapshot(&self) -> Snapshot {
let mut snapshot = Snapshot::default();
let cvalues = self.counter.values();
let gvalues = self.gauge.values();
let tvalues = self.thistogram.values();
let vvalues = self.vhistogram.values();
for (key, value) in cvalues {
if let Some(actual_key) = self.get_string_scope(key) {
snapshot.set_count(actual_key, value);
}
}
for (key, value) in gvalues {
if let Some(actual_key) = self.get_string_scope(key) {
snapshot.set_gauge(actual_key, value);
}
}
for (key, value) in tvalues {
if let Some(actual_key) = self.get_string_scope(key) {
snapshot.set_timing_histogram(actual_key, value);
}
}
for (key, value) in vvalues {
if let Some(actual_key) = self.get_string_scope(key) {
snapshot.set_value_histogram(actual_key, value);
}
}
snapshot
}
/// Processes a control frame.
fn process_control_frame(&self, msg: ControlFrame) {
match msg {
ControlFrame::Snapshot(tx) => {
let snapshot = self.get_snapshot();
let _ = tx.send(snapshot);
}
ControlFrame::SnapshotAsync(tx) => {
let snapshot = self.get_snapshot();
let _ = tx.send(snapshot);
}
}
}
/// Processes a message frame.
fn process_msg_frame(&mut self, msg: MessageFrame) {
match msg {
MessageFrame::Data(sample) => match sample {
Sample::Count(key, count) => {
self.counter.update(key, count);
}
Sample::Gauge(key, value) => {
self.gauge.update(key, value);
}
Sample::TimingHistogram(key, start, end) => {
let delta = end - start;
self.counter.update(key.clone(), 1);
self.thistogram.update(key, delta);
}
Sample::ValueHistogram(key, value) => {
self.vhistogram.update(key, value);
}
},
}
Controller::new(self.metric_registry.clone(), self.scope_registry.clone())
}
}

@ -0,0 +1,90 @@
use crate::common::{MetricIdentifier, MetricKind, MetricValue};
use crate::config::MetricConfiguration;
use crate::data::Snapshot;
use crate::registry::ScopeRegistry;
use arc_swap::{ptr_eq, ArcSwap};
use im::hashmap::HashMap;
use quanta::Clock;
use std::ops::Deref;
use std::sync::Arc;
pub(crate) struct MetricRegistry {
scope_registry: Arc<ScopeRegistry>,
metrics: ArcSwap<HashMap<MetricIdentifier, MetricValue>>,
config: MetricConfiguration,
clock: Clock,
}
impl MetricRegistry {
pub fn new(
scope_registry: Arc<ScopeRegistry>,
config: MetricConfiguration,
clock: Clock,
) -> Self {
MetricRegistry {
scope_registry,
metrics: ArcSwap::new(Arc::new(HashMap::new())),
config,
clock,
}
}
pub fn get_value_handle(&self, identifier: MetricIdentifier) -> MetricValue {
loop {
match self.metrics.lease().deref().get(&identifier) {
Some(handle) => return handle.clone(),
None => {
let kind = match &identifier {
MetricIdentifier::Unlabeled(_, _, kind) => kind,
};
let value_handle = match kind {
MetricKind::Counter => MetricValue::new_counter(),
MetricKind::Gauge => MetricValue::new_gauge(),
MetricKind::Histogram => MetricValue::new_histogram(
self.config.histogram_window,
self.config.histogram_granularity,
self.clock.clone(),
),
};
let metrics_ptr = self.metrics.lease();
let mut metrics = metrics_ptr.deref().clone();
match metrics.insert(identifier.clone(), value_handle.clone()) {
// Somebody else beat us to it, loop.
Some(_) => continue,
None => {
// If we weren't able to cleanly update the map, then try again.
let old = self
.metrics
.compare_and_swap(&metrics_ptr, Arc::new(metrics));
if !ptr_eq(old, metrics_ptr) {
continue;
}
}
}
return value_handle;
}
}
}
}
pub fn get_snapshot(&self) -> Snapshot {
let mut named_values = Vec::new();
let metrics = self.metrics.load().deref().clone();
for (identifier, value) in metrics.into_iter() {
let (name, scope_handle) = match identifier {
MetricIdentifier::Unlabeled(name, scope, _) => (name, scope),
};
let scope = self.scope_registry.get(scope_handle);
let scoped_name = scope.into_scoped(name);
let snapshot = value.snapshot();
named_values.push((scoped_name, snapshot));
}
Snapshot::from(named_values)
}
}
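
The retry loop in `get_value_handle` above is an optimistic copy-on-write update: read the current map, clone it, insert into the clone, and publish the clone only if nobody else published first. Below is a minimal std-only sketch of that pattern. It is not the code from this commit: `CowRegistry`, `Handle`, and the `Mutex`-guarded pointer are hypothetical stand-ins for `MetricRegistry`, `MetricValue`, and `ArcSwap`, chosen so the example compiles without external crates.

```rust
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `MetricValue`: a shared atomic integer.
pub type Handle = Arc<AtomicU64>;
type Map = HashMap<String, Handle>;

/// Copy-on-write registry. The `Mutex` only guards the pointer swap; it is
/// a std-only substitute for `ArcSwap`, not what the commit itself uses.
pub struct CowRegistry {
    current: Mutex<Arc<Map>>,
}

impl CowRegistry {
    pub fn new() -> Self {
        CowRegistry {
            current: Mutex::new(Arc::new(Map::new())),
        }
    }

    /// Grabs a reference-counted pointer to the current immutable map.
    fn load(&self) -> Arc<Map> {
        let guard = self.current.lock().unwrap();
        Arc::clone(&*guard)
    }

    /// Publishes `new` only if the published map is still `expected`.
    /// Returns `true` when the swap happened.
    fn compare_and_swap(&self, expected: &Arc<Map>, new: Map) -> bool {
        let mut guard = self.current.lock().unwrap();
        if Arc::ptr_eq(&*guard, expected) {
            *guard = Arc::new(new);
            true
        } else {
            false
        }
    }

    /// Returns the handle for `name`, creating and publishing it if needed.
    pub fn get_or_insert(&self, name: &str) -> Handle {
        loop {
            let snapshot = self.load();
            if let Some(handle) = snapshot.get(name) {
                return Arc::clone(handle);
            }
            // Build the updated map off to the side; readers are unaffected.
            let handle: Handle = Arc::new(AtomicU64::new(0));
            let mut updated = (*snapshot).clone();
            updated.insert(name.to_owned(), Arc::clone(&handle));
            if self.compare_and_swap(&snapshot, updated) {
                return handle;
            }
            // Another writer published first: discard our copy and retry.
        }
    }

    /// Number of registered handles in the currently published map.
    pub fn len(&self) -> usize {
        self.load().len()
    }
}
```

Two calls to `get_or_insert` with the same name return handles to the same atomic, so updates made through either are visible through both. The sketch clones a plain `HashMap` on every insert, whereas the commit uses `im::hashmap::HashMap`, a persistent map designed to make such clones cheap.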

@ -0,0 +1,5 @@
mod scope;
pub(crate) use self::scope::ScopeRegistry;
mod metric;
pub(crate) use self::metric::MetricRegistry;

@ -0,0 +1,57 @@
use crate::common::{MetricScope, MetricScopeHandle};
use parking_lot::RwLock;
use std::collections::HashMap;
struct Inner {
id: u64,
forward: HashMap<MetricScope, MetricScopeHandle>,
backward: HashMap<MetricScopeHandle, MetricScope>,
}
impl Inner {
pub fn new() -> Self {
Inner {
id: 1,
forward: HashMap::new(),
backward: HashMap::new(),
}
}
}
pub(crate) struct ScopeRegistry {
inner: RwLock<Inner>,
}
impl ScopeRegistry {
pub fn new() -> Self {
Self {
inner: RwLock::new(Inner::new()),
}
}
pub fn register(&self, scope: MetricScope) -> u64 {
let mut wg = self.inner.write();
// If the key is already registered, send back the existing scope ID.
if wg.forward.contains_key(&scope) {
return wg.forward.get(&scope).cloned().unwrap();
}
// Otherwise, take the current scope ID for this registration, store it, and increment
// the scope ID counter for the next registration.
let scope_id = wg.id;
let _ = wg.forward.insert(scope.clone(), scope_id);
let _ = wg.backward.insert(scope_id, scope);
wg.id += 1;
scope_id
}
pub fn get(&self, scope_id: MetricScopeHandle) -> MetricScope {
// See if we have an entry for the scope ID, and clone the scope if so.
let rg = self.inner.read();
rg.backward
.get(&scope_id)
.cloned()
.unwrap_or(MetricScope::Root)
}
}
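
The scope registry above interns scopes in both directions: registering the same scope twice yields the same ID, and looking up an unknown ID falls back to the root scope. Here is a minimal sketch of that behavior using only the standard library. `Scope` and `Interner` are hypothetical stand-ins for `MetricScope` and `ScopeRegistry`, and `std::sync::RwLock` replaces `parking_lot::RwLock` so the example needs no external crates.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical stand-in for `MetricScope`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Scope {
    Root,
    Nested(Vec<String>),
}

struct Inner {
    next_id: u64,
    forward: HashMap<Scope, u64>,
    backward: HashMap<u64, Scope>,
}

/// Bidirectional scope interner (assumed shape, mirroring the diff above).
pub struct Interner {
    inner: RwLock<Inner>,
}

impl Interner {
    pub fn new() -> Self {
        Interner {
            inner: RwLock::new(Inner {
                // IDs start at 1, as in the `Inner::new` shown above.
                next_id: 1,
                forward: HashMap::new(),
                backward: HashMap::new(),
            }),
        }
    }

    /// Returns the existing ID for `scope`, or assigns the next free one.
    pub fn register(&self, scope: Scope) -> u64 {
        let mut guard = self.inner.write().unwrap();
        if let Some(id) = guard.forward.get(&scope) {
            return *id;
        }
        let id = guard.next_id;
        guard.next_id += 1;
        guard.forward.insert(scope.clone(), id);
        guard.backward.insert(id, scope);
        id
    }

    /// Resolves an ID back to its scope; unknown IDs map to the root scope.
    pub fn get(&self, id: u64) -> Scope {
        let guard = self.inner.read().unwrap();
        guard.backward.get(&id).cloned().unwrap_or(Scope::Root)
    }
}
```

Like the original, `register` always takes the write lock. That is acceptable when registration only happens as sinks are created, rather than on the per-sample hot path.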

@ -1,68 +1,71 @@
use crate::{
data::{MetricKey, Sample, ScopedKey},
helper::io_error,
receiver::MessageFrame,
scopes::Scopes,
common::{
Delta, MetricIdentifier, MetricKind, MetricName, MetricScope, MetricScopeHandle,
MetricValue,
},
data::{Counter, Gauge, Histogram},
registry::{MetricRegistry, ScopeRegistry},
};
use crossbeam_channel::Sender;
use fxhash::FxBuildHasher;
use quanta::Clock;
use std::error::Error;
use std::fmt;
use std::sync::Arc;
/// Erorrs during sink creation.
#[derive(Debug)]
type FastHashMap<K, V> = hashbrown::HashMap<K, V, FxBuildHasher>;
/// Errors during sink creation.
#[derive(Debug, Clone)]
pub enum SinkError {
/// The scope value given was invalid i.e. empty or illegal characters.
InvalidScope,
}
/// A value that can be used as a metric scope.
pub trait AsScoped<'a> {
fn as_scoped(&'a self, base: String) -> String;
impl Error for SinkError {}
impl fmt::Display for SinkError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SinkError::InvalidScope => write!(f, "given scope is invalid"),
}
}
}
/// Handle for sending metric samples into the receiver.
/// A value that can be used as a metric scope.
///
/// [`Sink`] is cloneable, and can not only send metric samples but can register and deregister
/// metric facets at any time.
/// This helper trait allows us to accept either a single string or a slice of strings to use as a
/// scope, to avoid needing to allocate in the case where we want to be able to specify multiple
/// scope levels in a single go.
pub trait AsScoped<'a> {
fn as_scoped(&'a self, base: MetricScope) -> MetricScope;
}
/// Handle for sending metric samples.
pub struct Sink {
msg_tx: Sender<MessageFrame>,
metric_registry: Arc<MetricRegistry>,
metric_cache: FastHashMap<MetricIdentifier, MetricValue>,
scope_registry: Arc<ScopeRegistry>,
scope: MetricScope,
scope_handle: MetricScopeHandle,
clock: Clock,
scopes: Arc<Scopes>,
scope: String,
scope_id: u64,
}
impl Sink {
pub(crate) fn new(
msg_tx: Sender<MessageFrame>,
metric_registry: Arc<MetricRegistry>,
scope_registry: Arc<ScopeRegistry>,
scope: MetricScope,
clock: Clock,
scopes: Arc<Scopes>,
scope: String,
) -> Sink {
let scope_id = scopes.register(scope.clone());
let scope_handle = scope_registry.register(scope.clone());
Sink {
msg_tx,
clock,
scopes,
metric_registry,
metric_cache: FastHashMap::default(),
scope_registry,
scope,
scope_id,
}
}
pub(crate) fn new_with_scope_id(
msg_tx: Sender<MessageFrame>,
clock: Clock,
scopes: Arc<Scopes>,
scope: String,
scope_id: u64,
) -> Sink {
Sink {
msg_tx,
scope_handle,
clock,
scopes,
scope,
scope_id,
}
}
@ -90,10 +93,10 @@ impl Sink {
let new_scope = scope.as_scoped(self.scope.clone());
Sink::new(
self.msg_tx.clone(),
self.clock.clone(),
self.scopes.clone(),
self.metric_registry.clone(),
self.scope_registry.clone(),
new_scope,
self.clock.clone(),
)
}
@ -102,58 +105,113 @@ impl Sink {
self.clock.now()
}
/// Records the count for a given metric.
pub fn record_count<K: Into<MetricKey>>(&self, key: K, delta: u64) {
let scoped_key = ScopedKey(self.scope_id, key.into());
self.send(Sample::Count(scoped_key, delta))
/// Records a value for a counter identified by the given name.
pub fn record_count<N: Into<MetricName>>(&mut self, name: N, value: u64) {
let identifier =
MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Counter);
let value_handle = self.get_cached_value_handle(identifier);
value_handle.update_counter(value);
}
/// Records the gauge for a given metric.
pub fn record_gauge<K: Into<MetricKey>>(&self, key: K, value: i64) {
let scoped_key = ScopedKey(self.scope_id, key.into());
self.send(Sample::Gauge(scoped_key, value))
/// Records the value for a gauge identified by the given name.
pub fn record_gauge<N: Into<MetricName>>(&mut self, name: N, value: i64) {
let identifier =
MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Gauge);
let value_handle = self.get_cached_value_handle(identifier);
value_handle.update_gauge(value);
}
/// Records the timing histogram for a given metric.
pub fn record_timing<K: Into<MetricKey>>(&self, key: K, start: u64, end: u64) {
let scoped_key = ScopedKey(self.scope_id, key.into());
self.send(Sample::TimingHistogram(scoped_key, start, end))
/// Records the value for a timing histogram identified by the given name.
///
/// Both the start and end times must be supplied, but any values that implement [`Delta`] can
/// be used which allows for raw values from [`quanta::Clock`] to be used, or measurements from
/// [`Instant::now`].
pub fn record_timing<N: Into<MetricName>, V: Delta>(&mut self, name: N, start: V, end: V) {
let value = end.delta(start);
self.record_value(name, value);
}
/// Records the value histogram for a given metric.
pub fn record_value<K: Into<MetricKey>>(&self, key: K, value: u64) {
let scoped_key = ScopedKey(self.scope_id, key.into());
self.send(Sample::ValueHistogram(scoped_key, value))
/// Records the value for a value histogram identified by the given name.
pub fn record_value<N: Into<MetricName>>(&mut self, name: N, value: u64) {
let identifier =
MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Histogram);
let value_handle = self.get_cached_value_handle(identifier);
value_handle.update_histogram(value);
}
/// Sends a raw metric sample to the receiver.
fn send(&self, sample: Sample) {
let _ = self
.msg_tx
.send(MessageFrame::Data(sample))
.map_err(|_| io_error("failed to send sample"));
/// Creates a handle to the given counter.
///
/// This handle can be embedded into an existing type and used to directly update the
/// underlying counter. It is merely a proxy, so multiple handles to the same counter can be
/// held and used.
pub fn counter<N: Into<MetricName>>(&mut self, name: N) -> Counter {
let identifier =
MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Counter);
self.get_cached_value_handle(identifier).clone().into()
}
/// Creates a handle to the given gauge.
///
/// This handle can be embedded into an existing type and used to directly update the
/// underlying gauge. It is merely a proxy, so multiple handles to the same gauge can be
/// held and used.
pub fn gauge<N: Into<MetricName>>(&mut self, name: N) -> Gauge {
let identifier =
MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Gauge);
self.get_cached_value_handle(identifier).clone().into()
}
/// Creates a handle to the given histogram.
///
/// This handle can be embedded into an existing type and used to directly update the
/// underlying histogram. It is merely a proxy, so multiple handles to the same histogram
/// can be held and used.
pub fn histogram<N: Into<MetricName>>(&mut self, name: N) -> Histogram {
let identifier =
MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Histogram);
self.get_cached_value_handle(identifier).clone().into()
}
fn get_cached_value_handle(&mut self, identifier: MetricIdentifier) -> &MetricValue {
// This gross hack gets around lifetime rules until full NLL is stable. Without it, the
// borrow checker doesn't understand the flow control and thinks the reference lives all
// the way until the end of the function, which breaks when we try to take a mutable
// reference for inserting into the handle cache.
if let Some(handle) = self.metric_cache.get(&identifier) {
return unsafe { &*(handle as *const MetricValue) };
}
let handle = self.metric_registry.get_value_handle(identifier.clone());
self.metric_cache.insert(identifier.clone(), handle);
self.metric_cache.get(&identifier).unwrap()
}
}
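
The per-sink handle cache in `get_cached_value_handle` uses an `unsafe` pointer cast to satisfy the borrow checker. One possible alternative, sketched below under stated assumptions, is the map's entry API, which expresses "get or insert, then return a reference" in a single borrow. `CounterHandle` and `CachingSink` are hypothetical names, the registry lookup is simulated by creating a fresh atomic, and `std::collections::HashMap` stands in for the `hashbrown` map used in the diff.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical counter handle: a cloneable proxy around a shared atomic.
#[derive(Clone)]
pub struct CounterHandle(Arc<AtomicU64>);

impl CounterHandle {
    /// Adds `value` to the counter.
    pub fn record(&self, value: u64) {
        self.0.fetch_add(value, Ordering::Relaxed);
    }

    /// Reads the current total.
    pub fn value(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Hypothetical sink that caches handles locally so repeated updates to
/// the same metric skip the shared registry.
pub struct CachingSink {
    cache: HashMap<String, CounterHandle>,
    registry_lookups: usize,
}

impl CachingSink {
    pub fn new() -> Self {
        CachingSink {
            cache: HashMap::new(),
            registry_lookups: 0,
        }
    }

    /// Returns the cached handle, consulting the (simulated) registry only
    /// on the first request for a given name.
    pub fn counter(&mut self, name: &str) -> &CounterHandle {
        // Borrow the two fields separately so the closure can bump the
        // lookup count while the entry borrows the cache.
        let lookups = &mut self.registry_lookups;
        self.cache.entry(name.to_owned()).or_insert_with(|| {
            *lookups += 1;
            CounterHandle(Arc::new(AtomicU64::new(0)))
        })
    }

    /// How many times the simulated registry was consulted.
    pub fn registry_lookups(&self) -> usize {
        self.registry_lookups
    }
}
```

The trade-off is that `entry` takes an owned key, so this sketch allocates a `String` on every call, including cache hits. That cost may be why a hot path would prefer a plain `get` first.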
impl Clone for Sink {
fn clone(&self) -> Sink {
Sink {
msg_tx: self.msg_tx.clone(),
clock: self.clock.clone(),
scopes: self.scopes.clone(),
metric_registry: self.metric_registry.clone(),
metric_cache: self.metric_cache.clone(),
scope_registry: self.scope_registry.clone(),
scope: self.scope.clone(),
scope_id: self.scope_id,
scope_handle: self.scope_handle,
clock: self.clock.clone(),
}
}
}
impl<'a> AsScoped<'a> for str {
fn as_scoped(&'a self, mut base: String) -> String {
if !base.is_empty() {
base.push_str(".");
fn as_scoped(&'a self, base: MetricScope) -> MetricScope {
match base {
MetricScope::Root => {
let parts = vec![self.to_owned()];
MetricScope::Nested(parts)
}
MetricScope::Nested(mut parts) => {
parts.push(self.to_owned());
MetricScope::Nested(parts)
}
}
base.push_str(self);
base
}
}
@ -162,13 +220,17 @@ where
&'a T: AsRef<[&'b str]>,
T: 'a,
{
fn as_scoped(&'a self, mut base: String) -> String {
for item in self.as_ref() {
if !base.is_empty() {
base.push('.');
fn as_scoped(&'a self, base: MetricScope) -> MetricScope {
match base {
MetricScope::Root => {
let parts = self.as_ref().iter().map(|s| s.to_string()).collect();
MetricScope::Nested(parts)
}
MetricScope::Nested(mut parts) => {
let mut new_parts = self.as_ref().iter().map(|s| s.to_string()).collect();
parts.append(&mut new_parts);
MetricScope::Nested(parts)
}
base.push_str(item);
}
base
}
}
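
The two `as_scoped` implementations above replace string concatenation with a structured scope: the root scope becomes a nested scope when levels are added, and an already nested scope gets the new levels appended. The sketch below shows that nesting behavior in isolation. `Scope`, `extend`, and `scoped_name` are hypothetical names, and joining levels with `.` is an assumption carried over from the old string-based code, since the new `into_scoped` implementation is not shown in this section.

```rust
/// Hypothetical stand-in for `MetricScope`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Scope {
    Root,
    Nested(Vec<String>),
}

impl Scope {
    /// Appends one or more levels, turning `Root` into `Nested` as needed.
    pub fn extend(self, levels: &[&str]) -> Scope {
        let mut parts = match self {
            Scope::Root => Vec::new(),
            Scope::Nested(parts) => parts,
        };
        parts.extend(levels.iter().map(|s| s.to_string()));
        Scope::Nested(parts)
    }

    /// Renders a metric name under this scope, joining levels with `.`
    /// (an assumed separator, matching the old string-based scopes).
    pub fn scoped_name(&self, name: &str) -> String {
        match self {
            Scope::Root => name.to_owned(),
            Scope::Nested(parts) => {
                let mut out = parts.join(".");
                if !out.is_empty() {
                    out.push('.');
                }
                out.push_str(name);
                out
            }
        }
    }
}
```

Keeping the levels as a `Vec<String>` defers the join until a snapshot is rendered, so creating a scoped sink does not need to build a concatenated string.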