Merge pull request #13 from metrics-rs/refactor/event-loopless

Entirely remove the event loop and switch to pure atomics.
Toby Lawrence 2019-06-04 18:01:16 -04:00 committed by GitHub
commit 2046be737d
38 changed files with 2763 additions and 951 deletions

15
azure-pipelines.yml Normal file

@ -0,0 +1,15 @@
trigger: ["master"]
pr: ["master"]
jobs:
# Check the crate formatting.
- template: ci/azure-rustfmt.yml
# Actually test the crate.
- template: ci/azure-test-stable.yml
# Test it to make sure it still works on our minimum version.
- template: ci/azure-test-minimum.yaml
# Now test it against nightly w/ ASM support.
- template: ci/azure-test-nightly.yml

27
ci/azure-install-rust.yml Normal file

@ -0,0 +1,27 @@
steps:
# Linux and macOS.
- script: |
set -e
curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain $RUSTUP_TOOLCHAIN
echo "##vso[task.setvariable variable=PATH;]$PATH:$HOME/.cargo/bin"
env:
RUSTUP_TOOLCHAIN: ${{parameters.rust_version}}
displayName: "Install rust (*nix)"
condition: not(eq(variables['Agent.OS'], 'Windows_NT'))
# Windows.
- script: |
curl -sSf -o rustup-init.exe https://win.rustup.rs
rustup-init.exe -y --default-toolchain %RUSTUP_TOOLCHAIN%
set PATH=%PATH%;%USERPROFILE%\.cargo\bin
echo "##vso[task.setvariable variable=PATH;]%PATH%;%USERPROFILE%\.cargo\bin"
env:
RUSTUP_TOOLCHAIN: ${{parameters.rust_version}}
displayName: "Install rust (windows)"
condition: eq(variables['Agent.OS'], 'Windows_NT')
# All platforms.
- script: |
rustc -Vv
cargo -V
displayName: Query rust and cargo versions

16
ci/azure-rustfmt.yml Normal file

@ -0,0 +1,16 @@
jobs:
# Check formatting
- job: rustfmt
displayName: Check rustfmt
pool:
vmImage: ubuntu-16.04
steps:
- template: azure-install-rust.yml
parameters:
rust_version: nightly
- script: |
rustup component add rustfmt
displayName: Install rustfmt
- script: |
cargo fmt --all -- --check
displayName: Check formatting

View File

@ -0,0 +1,20 @@
jobs:
- job: test_metrics_minimum
displayName: Test Metrics Minimum
strategy:
matrix:
Linux:
vmImage: ubuntu-16.04
MacOS:
vmImage: macOS-10.13
Windows:
vmImage: vs2017-win2016
pool:
vmImage: $(vmImage)
steps:
- template: azure-install-rust.yml
parameters:
rust_version: 1.34.0
- script: cargo test
displayName: cargo test

22
ci/azure-test-nightly.yml Normal file

@ -0,0 +1,22 @@
jobs:
- job: test_metrics_nightly
displayName: Test Metrics Nightly
strategy:
matrix:
Linux:
vmImage: ubuntu-16.04
MacOS:
vmImage: macOS-10.13
Windows:
vmImage: vs2017-win2016
pool:
vmImage: $(vmImage)
steps:
- template: azure-install-rust.yml
parameters:
rust_version: nightly
- script: cargo test
displayName: cargo test
- script: cargo bench
displayName: cargo bench

20
ci/azure-test-stable.yml Normal file

@ -0,0 +1,20 @@
jobs:
- job: test_metrics_stable
displayName: Test Metrics Stable
strategy:
matrix:
Linux:
vmImage: ubuntu-16.04
MacOS:
vmImage: macOS-10.13
Windows:
vmImage: vs2017-win2016
pool:
vmImage: $(vmImage)
steps:
- template: azure-install-rust.yml
parameters:
rust_version: stable
- script: cargo test
displayName: cargo test

View File

@ -1,6 +1,6 @@
[package]
name = "metrics-exporter-http"
version = "0.1.0"
version = "0.1.1"
authors = ["Toby Lawrence <toby@nuclearfurnace.com>"]
edition = "2018"

View File

@ -11,13 +11,13 @@
#[macro_use]
extern crate log;
use hyper::rt::run as hyper_run;
use hyper::rt::Future;
use hyper::service::service_fn;
use hyper::{Body, Response, Server};
use metrics_core::{AsyncSnapshotProvider, Recorder, Snapshot};
use std::error::Error;
use std::net::SocketAddr;
use hyper::{Body, Response, Server};
use hyper::service::service_fn;
use hyper::rt::run as hyper_run;
use hyper::rt::Future;
/// Exports metrics over HTTP.
pub struct HttpExporter<C, R> {
@ -54,7 +54,6 @@ where
}
/// Converts this exporter into a future which can be driven externally.
/// logs output on the given interval.
///
/// This starts an HTTP server on the `address` the exporter was originally configured with,
/// responding to any request with the output of the configured recorder.
@ -67,7 +66,11 @@ where
}
}
fn build_hyper_server<C, R>(controller: C, recorder: R, address: SocketAddr) -> impl Future<Item = (), Error = ()>
fn build_hyper_server<C, R>(
controller: C,
recorder: R,
address: SocketAddr,
) -> impl Future<Item = (), Error = ()>
where
C: AsyncSnapshotProvider + Clone + Send + 'static,
C::SnapshotFuture: Send + 'static,
@ -81,14 +84,15 @@ where
service_fn(move |_| {
let recorder3 = recorder2.clone();
controller2.get_snapshot_async()
controller2
.get_snapshot_async()
.then(move |result| match result {
Ok(snapshot) => {
let mut r = recorder3.clone();
snapshot.record(&mut r);
let output = r.into();
Ok(Response::new(Body::from(output)))
},
}
Err(e) => Err(e),
})
})

View File

@ -1,6 +1,6 @@
[package]
name = "metrics-recorder-prometheus"
version = "0.2.0"
version = "0.2.1"
authors = ["Toby Lawrence <toby@nuclearfurnace.com>"]
edition = "2018"
@ -14,5 +14,5 @@ documentation = "https://docs.rs/metrics-recorder-prometheus"
[dependencies]
metrics-core = { path = "../metrics-core", version = "^0.3" }
metrics-util = { path = "../metrics-util", version = "^0.1" }
metrics-util = { path = "../metrics-util", version = "^0.2" }
hdrhistogram = "^6.1"

View File

@ -1,6 +1,6 @@
[package]
name = "metrics-recorder-text"
version = "0.2.0"
version = "0.2.1"
authors = ["Toby Lawrence <toby@nuclearfurnace.com>"]
edition = "2018"
@ -14,5 +14,5 @@ documentation = "https://docs.rs/metrics-recorder-text"
[dependencies]
metrics-core = { path = "../metrics-core", version = "^0.3" }
metrics-util = { path = "../metrics-util", version = "^0.1" }
metrics-util = { path = "../metrics-util", version = "^0.2" }
hdrhistogram = "^6.1"

View File

@ -1,6 +1,6 @@
[package]
name = "metrics-util"
version = "0.1.0"
version = "0.2.0"
authors = ["Toby Lawrence <toby@nuclearfurnace.com>"]
edition = "2018"
@ -16,4 +16,19 @@ readme = "README.md"
keywords = ["metrics", "quantile", "percentile"]
[[bench]]
name = "bucket"
harness = false
[[bench]]
name = "streaming_integers"
harness = false
[dependencies]
crossbeam-epoch = "^0.7"
[dev-dependencies]
crossbeam = "^0.7"
criterion = "^0.2.9"
lazy_static = "^1.3"
rand = "^0.6"

View File

@ -0,0 +1,60 @@
#[macro_use]
extern crate criterion;
#[macro_use]
extern crate lazy_static;
use criterion::{Benchmark, Criterion, Throughput};
use metrics_util::AtomicBucket;
lazy_static! {
static ref RANDOM_INTS: Vec<u64> = vec![
21061184, 21301862, 21331592, 21457012, 21500016, 21537837, 21581557, 21620030, 21664102,
21678463, 21708437, 21751808, 21845243, 21850265, 21938879, 21971014, 22005842, 22034601,
22085552, 22101746, 22115429, 22139883, 22260209, 22270768, 22298080, 22299780, 22307659,
22354697, 22355668, 22359397, 22463872, 22496590, 22590978, 22603740, 22706352, 22820895,
22849491, 22891538, 22912955, 22919915, 22928920, 22968656, 22985992, 23033739, 23061395,
23077554, 23138588, 23185172, 23282479, 23290830, 23316844, 23386911, 23641319, 23677058,
23742930, 25350389, 25399746, 25404925, 25464391, 25478415, 25480015, 25632783, 25639769,
25645612, 25688228, 25724427, 25862192, 25954476, 25994479, 26008752, 26036460, 26038202,
26078874, 26118327, 26132679, 26207601, 26262418, 26270737, 26274860, 26431248, 26434268,
26562736, 26580134, 26593740, 26618561, 26844181, 26866971, 26907883, 27005270, 27023584,
27024044, 27057184, 23061395, 23077554, 23138588, 23185172, 23282479, 23290830, 23316844,
23386911, 23641319, 23677058, 23742930, 25350389, 25399746, 25404925, 25464391, 25478415,
25480015, 25632783, 25639769, 25645612, 25688228, 25724427, 25862192, 25954476, 25994479,
26008752, 26036460, 26038202, 26078874, 26118327, 26132679, 26207601, 26262418, 26270737,
26274860, 26431248, 26434268, 26562736, 26580134, 26593740, 26618561, 26844181, 26866971,
26907883, 27005270, 27023584, 27024044, 27057184, 23061395, 23077554, 23138588, 23185172,
23282479, 23290830, 23316844, 23386911, 23641319, 23677058, 23742930, 25350389, 25399746,
25404925, 25464391, 25478415, 25480015, 25632783, 25639769, 25645612, 25688228, 25724427,
25862192, 25954476, 25994479, 26008752, 26036460, 26038202, 26078874, 26118327, 26132679,
26207601, 26262418, 26270737, 26274860, 26431248, 26434268, 26562736, 26580134, 26593740,
26618561, 26844181, 26866971, 26907883, 27005270, 27023584, 27024044, 27057184, 23061395,
23077554, 23138588, 23185172, 23282479, 23290830, 23316844, 23386911, 23641319, 23677058,
23742930, 25350389, 25399746, 25404925, 25464391, 25478415, 25480015, 25632783, 25639769,
25645612, 25688228, 25724427, 25862192, 25954476, 25994479, 26008752, 26036460, 26038202,
26078874, 26118327, 26132679, 26207601, 26262418, 26270737, 26274860, 26431248, 26434268,
26562736, 26580134, 26593740, 26618561, 26844181, 26866971, 26907883, 27005270, 27023584,
27024044, 27057184, 27088034, 27088550, 27302898, 27353925, 27412984, 27488633, 27514155,
27558052, 27601937, 27606339, 27624514, 27680396, 27684064, 27963602, 27414982, 28450673
];
}
fn bucket_benchmark(c: &mut Criterion) {
c.bench(
"bucket",
Benchmark::new("write", |b| {
let bucket = AtomicBucket::new();
b.iter(|| {
for value in RANDOM_INTS.iter() {
bucket.push(value);
}
})
})
.throughput(Throughput::Elements(RANDOM_INTS.len() as u32)),
);
}
criterion_group!(benches, bucket_benchmark);
criterion_main!(benches);

View File

@ -0,0 +1,104 @@
#[macro_use]
extern crate criterion;
#[macro_use]
extern crate lazy_static;
use criterion::{Benchmark, Criterion, Throughput};
use metrics_util::StreamingIntegers;
use rand::{
distributions::{Distribution, Gamma},
rngs::SmallRng,
Rng, SeedableRng,
};
use std::time::Duration;
lazy_static! {
static ref NORMAL_SMALL: Vec<u64> = get_gamma_distribution(100, Duration::from_millis(200));
static ref NORMAL_MEDIUM: Vec<u64> = get_gamma_distribution(10000, Duration::from_millis(200));
static ref NORMAL_LARGE: Vec<u64> = get_gamma_distribution(1000000, Duration::from_millis(200));
static ref LINEAR_SMALL: Vec<u64> = get_linear_distribution(100);
static ref LINEAR_MEDIUM: Vec<u64> = get_linear_distribution(10000);
static ref LINEAR_LARGE: Vec<u64> = get_linear_distribution(1000000);
}
fn get_gamma_distribution(len: usize, upper_bound: Duration) -> Vec<u64> {
// Start with a seeded RNG so that we predictably regenerate our data.
let mut rng = SmallRng::seed_from_u64(len as u64);
// This Gamma distribution gets us pretty close to a typical web server response time
// distribution where there's a big peak down low, and a long tail that drops off sharply.
let gamma = Gamma::new(1.75, 1.0);
// Scale all of the values by the `upper_bound` duration (in nanoseconds) so the raw gamma
// samples become a realistic nanosecond-based latency distribution.
gamma
.sample_iter(&mut rng)
.map(|n| n * upper_bound.as_nanos() as f64)
.map(|n| n as u64)
.take(len)
.collect::<Vec<u64>>()
}
fn get_linear_distribution(len: usize) -> Vec<u64> {
let mut values = Vec::new();
for i in 0..len as u64 {
values.push(i);
}
values
}
macro_rules! define_basic_benches {
($c:ident, $name:expr, $input:ident) => {
$c.bench(
$name,
Benchmark::new("compress", |b| {
b.iter_with_large_drop(|| {
let mut si = StreamingIntegers::new();
si.compress(&$input);
si
})
})
.with_function("decompress", |b| {
let mut si = StreamingIntegers::new();
si.compress(&$input);
b.iter_with_large_drop(move || si.decompress())
})
.with_function("decompress + sum", |b| {
let mut si = StreamingIntegers::new();
si.compress(&$input);
b.iter_with_large_drop(move || {
let total: u64 = si.decompress().iter().sum();
total
})
})
.with_function("decompress_with + sum", |b| {
let mut si = StreamingIntegers::new();
si.compress(&$input);
b.iter(move || {
let mut total = 0;
si.decompress_with(|batch| {
let batch_total: u64 = batch.iter().sum();
total += batch_total;
});
});
})
.throughput(Throughput::Elements($input.len() as u32)),
)
};
}
fn streaming_integer_benchmark(c: &mut Criterion) {
define_basic_benches!(c, "normal small", NORMAL_SMALL);
define_basic_benches!(c, "normal medium", NORMAL_MEDIUM);
define_basic_benches!(c, "normal large", NORMAL_LARGE);
define_basic_benches!(c, "linear small", LINEAR_SMALL);
define_basic_benches!(c, "linear medium", LINEAR_MEDIUM);
define_basic_benches!(c, "linear large", LINEAR_LARGE);
}
criterion_group!(benches, streaming_integer_benchmark);
criterion_main!(benches);

444
metrics-util/src/bucket.rs Normal file

@ -0,0 +1,444 @@
use crossbeam_epoch::{pin as epoch_pin, Atomic, Guard, Owned, Shared};
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{mem, slice};
const BLOCK_SIZE: usize = 128;
/// Discrete chunk of values with atomic read/write access.
struct Block<T> {
// Write index.
write: AtomicUsize,
// Read index.
read: AtomicUsize,
// The individual slots.
slots: [UnsafeCell<T>; BLOCK_SIZE],
// The previous block in the chain, i.e. the block filled before this one.
prev: Atomic<Block<T>>,
}
impl<T> Block<T> {
/// Creates a new [`Block`].
pub fn new() -> Self {
Block {
write: AtomicUsize::new(0),
read: AtomicUsize::new(0),
slots: unsafe { mem::zeroed() },
prev: Atomic::null(),
}
}
/// Gets the current length of this block.
pub fn len(&self) -> usize {
self.read.load(Ordering::Acquire)
}
/// Gets a slice of the data written to this block.
pub fn data(&self) -> &[T] {
let len = self.len();
let head = self.slots[0].get();
unsafe { slice::from_raw_parts(head as *const T, len) }
}
/// Links this block to the previous block in the bucket.
pub fn set_prev(&self, prev: Shared<Block<T>>, guard: &Guard) {
match self
.prev
.compare_and_set(Shared::null(), prev, Ordering::AcqRel, guard)
{
Ok(_) => {}
Err(_) => unreachable!(),
}
}
/// Pushes a value into this block.
pub fn push(&self, value: T) -> Result<(), T> {
// Try to increment the index. If we've reached the end of the block, let the bucket know
// so it can attach another block.
let index = self.write.fetch_add(1, Ordering::AcqRel);
if index >= BLOCK_SIZE {
return Err(value);
}
// Update the slot.
unsafe {
self.slots[index].get().write(value);
}
// Scoot our read index forward.
self.read.fetch_add(1, Ordering::AcqRel);
Ok(())
}
}
unsafe impl<T> Send for Block<T> {}
unsafe impl<T> Sync for Block<T> {}
impl<T> Drop for Block<T> {
fn drop(&mut self) {
let guard = &epoch_pin();
let prev = self.prev.swap(Shared::null(), Ordering::AcqRel, guard);
if !prev.is_null() {
unsafe {
guard.defer_destroy(prev);
}
guard.flush();
}
}
}
/// An atomic bucket with snapshot capabilities.
///
/// This bucket is implemented as a singly-linked list of blocks, where each block is a small
/// buffer that can hold a handful of elements. There is no limit to how many elements can be in
/// the bucket at a time. Blocks are dynamically allocated as elements are pushed into the bucket.
///
/// Unlike a queue, buckets cannot be drained element by element: callers must iterate the whole
/// structure. Reading the bucket happens in reverse, to allow writers to make forward progress
/// without affecting the iteration of the previously-written values.
///
/// The bucket can be cleared while a concurrent snapshot is taking place, and will not affect the
/// reader.
#[derive(Debug)]
pub struct AtomicBucket<T> {
tail: Atomic<Block<T>>,
}
impl<T> AtomicBucket<T> {
/// Creates a new, empty bucket.
pub fn new() -> Self {
AtomicBucket {
tail: Atomic::null(),
}
}
/// Pushes an element into the bucket.
pub fn push(&self, value: T) {
let mut original = value;
loop {
// Load the tail block, or install a new one.
let guard = &epoch_pin();
let mut tail = self.tail.load(Ordering::Acquire, guard);
if tail.is_null() {
// No blocks at all yet. We need to create one.
match self.tail.compare_and_set(
Shared::null(),
Owned::new(Block::new()),
Ordering::AcqRel,
guard,
) {
// We won the race to install the new block.
Ok(ptr) => tail = ptr,
// Somebody else beat us, so just update our pointer.
Err(e) => tail = e.current,
}
}
// We have a block now, so we need to try writing to it.
let tail_block = unsafe { tail.deref() };
match tail_block.push(original) {
// If the push was OK, then the block wasn't full. It might _now_ be full, but we'll
// let future callers deal with installing a new block if necessary.
Ok(_) => return,
// The block was full, so we've been given the value back and we need to install a new block.
Err(value) => {
match self.tail.compare_and_set(
tail,
Owned::new(Block::new()),
Ordering::AcqRel,
guard,
) {
// We managed to install the block, so we need to link this new block to
// the previous block.
Ok(ptr) => {
let new_tail = unsafe { ptr.deref() };
new_tail.set_prev(tail, guard);
// Now push into our new block.
match new_tail.push(value) {
// We wrote the value successfully, so we're good here!
Ok(_) => return,
// The block was full, so just loop and start over.
Err(value) => {
original = value;
continue;
}
}
}
// Somebody else installed the block before us, so let's just start over.
Err(_) => {
original = value;
continue;
}
}
}
}
}
}
/// Collects all of the elements written to the bucket.
///
/// This operation can be slow as it involves allocating enough space to hold all of the
/// elements within the bucket. Consider [`data_with`](AtomicBucket::data_with) to incrementally iterate
/// the internal blocks within the bucket.
///
/// Elements are in partial reverse order: blocks are iterated in reverse order, but the
/// elements within them will appear in their original order.
pub fn data(&self) -> Vec<T>
where
T: Clone,
{
let mut values = Vec::new();
self.data_with(|block| values.extend_from_slice(block));
values
}
/// Iterates all of the elements written to the bucket, invoking `f` for each block.
///
/// Elements are in partial reverse order: blocks are iterated in reverse order, but the
/// elements within them will appear in their original order.
pub fn data_with<F>(&self, mut f: F)
where
F: FnMut(&[T]),
{
let guard = &epoch_pin();
// While we have a valid block -- either `tail` or the next block as we keep reading -- we
// load the data from each block and process it by calling `f`.
let mut block_ptr = self.tail.load(Ordering::Acquire, guard);
while !block_ptr.is_null() {
let block = unsafe { block_ptr.deref() };
// Read the data out of the block.
let data = block.data();
f(data);
// Load the next block.
block_ptr = block.prev.load(Ordering::Acquire, guard);
}
}
/// Clears the bucket.
///
/// Deallocation of the internal blocks happens only when all readers have finished, and so
/// will not necessarily occur during or immediately after this method returns.
///
/// # Note
/// This method will not affect reads that are already in progress.
pub fn clear(&self) {
// We simply swap the tail pointer which effectively clears the bucket. Callers might
// still be in the process of writing to the tail node, or reading the data, but new callers
// will see it as empty until another write proceeds.
let guard = &epoch_pin();
let tail = self.tail.load(Ordering::Acquire, guard);
if !tail.is_null() {
if self
.tail
.compare_and_set(tail, Shared::null(), Ordering::SeqCst, guard)
.is_ok()
{
// We won the swap to delete the tail node. Now configure a deferred drop to clean
// things up once nobody else is using it.
unsafe {
// Drop the block, which will cause a cascading drop on the next block, and
// so on and so forth, until all blocks linked to this one are dropped.
guard.defer_destroy(tail);
}
guard.flush();
}
}
}
}
#[cfg(test)]
mod tests {
use super::{AtomicBucket, Block, BLOCK_SIZE};
#[test]
fn test_create_new_block() {
let block: Block<u64> = Block::new();
assert_eq!(block.len(), 0);
let data = block.data();
assert_eq!(data.len(), 0);
}
#[test]
fn test_block_write_then_read() {
let block = Block::new();
assert_eq!(block.len(), 0);
let data = block.data();
assert_eq!(data.len(), 0);
let result = block.push(42);
assert!(result.is_ok());
let data = block.data();
assert_eq!(data.len(), 1);
assert_eq!(data[0], 42);
}
#[test]
fn test_block_write_until_full_then_read() {
let block = Block::new();
assert_eq!(block.len(), 0);
let data = block.data();
assert_eq!(data.len(), 0);
let mut i = 0;
let mut total = 0;
while i < BLOCK_SIZE as u64 {
assert!(block.push(i).is_ok());
total += i;
i += 1;
}
let data = block.data();
assert_eq!(data.len(), BLOCK_SIZE);
let sum: u64 = data.iter().sum();
assert_eq!(sum, total);
let result = block.push(42);
assert!(result.is_err());
}
#[test]
fn test_block_write_until_full_then_read_mt() {
let block = Block::new();
assert_eq!(block.len(), 0);
let data = block.data();
assert_eq!(data.len(), 0);
let res = crossbeam::scope(|s| {
let t1 = s.spawn(|_| {
let mut i = 0;
let mut total = 0;
while i < BLOCK_SIZE as u64 / 2 {
assert!(block.push(i).is_ok());
total += i;
i += 1;
}
total
});
let t2 = s.spawn(|_| {
let mut i = 0;
let mut total = 0;
while i < BLOCK_SIZE as u64 / 2 {
assert!(block.push(i).is_ok());
total += i;
i += 1;
}
total
});
let t1_total = t1.join().unwrap();
let t2_total = t2.join().unwrap();
t1_total + t2_total
});
let total = res.unwrap();
let data = block.data();
assert_eq!(data.len(), BLOCK_SIZE);
let sum: u64 = data.iter().sum();
assert_eq!(sum, total);
let result = block.push(42);
assert!(result.is_err());
}
#[test]
fn test_bucket_write_then_read() {
let bucket = AtomicBucket::new();
bucket.push(42);
let snapshot = bucket.data();
assert_eq!(snapshot.len(), 1);
assert_eq!(snapshot[0], 42);
}
#[test]
fn test_bucket_multiple_blocks_write_then_read() {
let bucket = AtomicBucket::new();
let snapshot = bucket.data();
assert_eq!(snapshot.len(), 0);
let target = (BLOCK_SIZE * 3 + BLOCK_SIZE / 2) as u64;
let mut i = 0;
let mut total = 0;
while i < target {
bucket.push(i);
total += i;
i += 1;
}
let snapshot = bucket.data();
assert_eq!(snapshot.len(), target as usize);
let sum: u64 = snapshot.iter().sum();
assert_eq!(sum, total);
}
#[test]
fn test_bucket_write_then_read_mt() {
let bucket = AtomicBucket::new();
let snapshot = bucket.data();
assert_eq!(snapshot.len(), 0);
let res = crossbeam::scope(|s| {
let t1 = s.spawn(|_| {
let mut i = 0;
let mut total = 0;
while i < BLOCK_SIZE as u64 * 10_000 {
bucket.push(i);
total += i;
i += 1;
}
total
});
let t2 = s.spawn(|_| {
let mut i = 0;
let mut total = 0;
while i < BLOCK_SIZE as u64 * 10_000 {
bucket.push(i);
total += i;
i += 1;
}
total
});
let t1_total = t1.join().unwrap();
let t2_total = t2.join().unwrap();
t1_total + t2_total
});
let total = res.unwrap();
let snapshot = bucket.data();
assert_eq!(snapshot.len(), BLOCK_SIZE * 20_000);
let sum: u64 = snapshot.iter().sum();
assert_eq!(sum, total);
}
}
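As a quick illustration of the `AtomicBucket` API introduced above, here is a minimal usage sketch relying only on the public methods shown in this file (`new`, `push`, `data`, `data_with`, `clear`); the function name and values are illustrative, not part of the diff.

use metrics_util::AtomicBucket;

fn bucket_usage_sketch() {
    let bucket = AtomicBucket::new();

    // Writers push concurrently; new blocks are linked in as old ones fill up.
    for value in 0..1000u64 {
        bucket.push(value);
    }

    // Read incrementally, block by block, without allocating the whole set...
    let mut total = 0u64;
    bucket.data_with(|block| total += block.iter().sum::<u64>());

    // ...or collect everything at once (blocks are visited newest-first).
    let values = bucket.data();
    assert_eq!(values.len(), 1000);
    assert_eq!(values.iter().sum::<u64>(), total);

    // Clearing swaps out the tail pointer; readers already iterating are unaffected.
    bucket.clear();
    assert!(bucket.data().is_empty());
}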

View File

@ -1,45 +1,9 @@
//! Helper types and functions used within the metrics ecosystem.
mod bucket;
pub use bucket::AtomicBucket;
/// A quantile that has both the raw value and a human-friendly display label.
#[derive(Clone)]
pub struct Quantile(f64, String);
mod streaming;
pub use streaming::StreamingIntegers;
impl Quantile {
/// Creates a new `Quantile` from a floating-point value.
///
/// All values clamped between 0.0 and 1.0.
pub fn new(quantile: f64) -> Quantile {
let clamped = quantile.max(0.0);
let clamped = clamped.min(1.0);
let display = clamped * 100.0;
let raw_label = format!("{}", clamped);
let label = match raw_label.as_str() {
"0" => "min".to_string(),
"1" => "max".to_string(),
_ => {
let raw = format!("p{}", display);
raw.replace(".", "")
},
};
Quantile(clamped, label)
}
/// Gets the human-friendly display label for this quantile.
pub fn label(&self) -> &str {
self.1.as_str()
}
/// Gets the raw value for this quantile.
pub fn value(&self) -> f64 {
self.0
}
}
/// Parses a list of floating-point values into a list of `Quantile`s.
pub fn parse_quantiles(quantiles: &[f64]) -> Vec<Quantile> {
quantiles.iter()
.map(|f| Quantile::new(*f))
.collect()
}
mod quantile;
pub use quantile::{parse_quantiles, Quantile};

View File

@ -0,0 +1,102 @@
/// A quantile that has both the raw value and a human-friendly display label.
///
/// We work with quantiles for optimal floating-point precision over percentiles, but most of the
/// time, monitoring systems show us percentiles, and usually in an abbreviated form: `p99`.
///
/// On top of holding the quantile value, we calculate the familiar "p99" style of label, doing the
/// appropriate percentile conversion. Thus, if you have a quantile of `0.99`, the resulting label
/// is `p99`, and if you have a quantile of `0.999`, the resulting label is `p999`.
///
/// There are two special cases, where we label `0.0` and `1.0` as `min` and `max`, respectively.
#[derive(Debug, Clone, PartialEq)]
pub struct Quantile(f64, String);
impl Quantile {
/// Creates a new [`Quantile`] from a floating-point value.
///
/// All values are clamped between 0.0 and 1.0.
pub fn new(quantile: f64) -> Quantile {
let clamped = quantile.max(0.0);
let clamped = clamped.min(1.0);
let display = clamped * 100.0;
let raw_label = format!("{}", clamped);
let label = match raw_label.as_str() {
"0" => "min".to_string(),
"1" => "max".to_string(),
_ => {
let raw = format!("p{}", display);
raw.replace(".", "")
}
};
Quantile(clamped, label)
}
/// Gets the human-friendly display label.
pub fn label(&self) -> &str {
self.1.as_str()
}
/// Gets the raw quantile value.
pub fn value(&self) -> f64 {
self.0
}
}
/// Parses a slice of floating-point values into a vector of [`Quantile`]s.
pub fn parse_quantiles(quantiles: &[f64]) -> Vec<Quantile> {
quantiles.iter().map(|f| Quantile::new(*f)).collect()
}
#[cfg(test)]
mod tests {
use super::{parse_quantiles, Quantile};
#[test]
fn test_quantiles() {
let min = Quantile::new(0.0);
assert_eq!(min.value(), 0.0);
assert_eq!(min.label(), "min");
let max = Quantile::new(1.0);
assert_eq!(max.value(), 1.0);
assert_eq!(max.label(), "max");
let p99 = Quantile::new(0.99);
assert_eq!(p99.value(), 0.99);
assert_eq!(p99.label(), "p99");
let p999 = Quantile::new(0.999);
assert_eq!(p999.value(), 0.999);
assert_eq!(p999.label(), "p999");
let p9999 = Quantile::new(0.9999);
assert_eq!(p9999.value(), 0.9999);
assert_eq!(p9999.label(), "p9999");
let under = Quantile::new(-1.0);
assert_eq!(under.value(), 0.0);
assert_eq!(under.label(), "min");
let over = Quantile::new(1.2);
assert_eq!(over.value(), 1.0);
assert_eq!(over.label(), "max");
}
#[test]
fn test_parse_quantiles() {
let empty = vec![];
let result = parse_quantiles(&empty);
assert_eq!(result.len(), 0);
let normal = vec![0.0, 0.5, 0.99, 0.999, 1.0];
let result = parse_quantiles(&normal);
assert_eq!(result.len(), 5);
assert_eq!(result[0], Quantile::new(0.0));
assert_eq!(result[1], Quantile::new(0.5));
assert_eq!(result[2], Quantile::new(0.99));
assert_eq!(result[3], Quantile::new(0.999));
assert_eq!(result[4], Quantile::new(1.0));
}
}
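To make the labeling behavior concrete, a small sketch of how `parse_quantiles` might be used when configuring a recorder; the chosen quantiles here are just an example.

use metrics_util::parse_quantiles;

fn quantile_labels_sketch() {
    let quantiles = parse_quantiles(&[0.0, 0.5, 0.99, 0.999, 1.0]);

    // Yields the labels "min", "p50", "p99", "p999", and "max", paired with the raw values.
    for quantile in &quantiles {
        println!("{} => {}", quantile.label(), quantile.value());
    }
}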

View File

@ -0,0 +1,295 @@
use std::slice;
/// A compressed set of integers.
///
/// For some workloads, working with a large set of integers can require an outsized amount of
/// memory for numbers that are very similar. This data structure takes chunks of integers and
/// compresses them by using delta encoding and variable-byte encoding.
///
/// Delta encoding tracks the difference between successive integers: if you have 1000000 and
/// 1000001, the difference between the two is only 1. Coupled with variable-byte encoding, we can
/// compress those two numbers within 4 bytes, where normally they would require a minimum of 8
/// bytes if they were 32-bit integers, or 16 bytes if they were 64-bit integers. Over large runs
/// of integers where the delta is relatively small compared to the original value, the compression
/// savings add up quickly.
///
/// The original integers can be decompressed and collected, or can be decompressed on-the-fly
/// while passing them to a given function, allowing callers to observe the integers without
/// allocating the entire size of the decompressed set.
///
/// # Performance
/// As this is a scalar implementation, performance depends heavily on not only the input size, but
/// also the delta between values, as well as whether or not the decompressed values are being
/// collected or used on-the-fly.
///
/// Bigger deltas between values mean longer variable-byte encodings, which are harder for the CPU to
/// predict. As the linear benchmarks show, things are much faster when the delta between values
/// is minimal.
///
/// These figures were generated on a 2015 Macbook Pro (Core i7, 2.2GHz base/3.7GHz turbo).
///
/// | | compress (1) | decompress (2) | decompress/sum (3) | decompress_with/sum (4) |
/// |------------------------|--------------|----------------|--------------------|-------------------------|
/// | normal, 100 values | 94 Melem/s | 76 Melem/s | 71 Melem/s | 126 Melem/s |
/// | normal, 10000 values | 92 Melem/s | 85 Melem/s | 109 Melem/s | 109 Melem/s |
/// | normal, 1000000 values | 86 Melem/s | 79 Melem/s | 68 Melem/s | 110 Melem/s |
/// | linear, 100 values | 334 Melem/s | 109 Melem/s | 110 Melem/s | 297 Melem/s |
/// | linear, 10000 values | 654 Melem/s | 174 Melem/s | 374 Melem/s | 390 Melem/s |
/// | linear, 1000000 values | 703 Melem/s | 180 Melem/s | 132 Melem/s | 392 Melem/s |
///
/// The normal values consist of an approximation of real nanosecond-based timing measurements
/// of a web service. The linear values are simply sequential integers ranging from 0 to the
/// configured size of the test run.
///
/// Operations:
/// 1. simply compress the input set, no decompression
/// 2. decompress the entire compressed set into a single vector
/// 3. same as #2 but sum all of the original values at the end
/// 4. use `decompress_with` to sum the numbers incrementally
#[derive(Debug, Default, Clone)]
pub struct StreamingIntegers {
inner: Vec<u8>,
len: usize,
last: Option<i64>,
}
impl StreamingIntegers {
/// Creates a new, empty streaming set.
pub fn new() -> Self {
Default::default()
}
/// Returns the number of elements in the set, also referred to as its 'length'.
pub fn len(&self) -> usize {
self.len
}
/// Returns `true` if the set contains no elements.
pub fn is_empty(&self) -> bool {
self.len == 0
}
/// Compresses a slice of integers, and adds them to the set.
pub fn compress(&mut self, src: &[u64]) {
let src_len = src.len();
if src_len == 0 {
return;
}
self.len += src_len;
// Technically, 64-bit integers can take up to 10 bytes when encoded as variable integers
// if they're at the maximum size, so we need to properly allocate here. As we directly
// operate on a mutable slice of the inner buffer below, we _can't_ afford to lazily
// allocate or guess at the resulting compression, otherwise we'll get a panic at runtime
// for bounds checks.
//
// TODO: we should try and add some heuristic here, because we're potentially
// overallocating by a lot when we plan for the worst case scenario
self.inner.reserve(src_len * 10);
let mut buf_idx = self.inner.len();
let buf_cap = self.inner.capacity();
let mut buf = unsafe {
let buf_ptr = self.inner.as_mut_ptr();
slice::from_raw_parts_mut(buf_ptr, buf_cap)
};
// If we have no last value, then the very first integer we write is the full value and not
// a delta value.
let mut src_idx = 0;
if self.last.is_none() {
let first = src[src_idx] as i64;
self.last = Some(first);
let zigzag = zigzag_encode(first);
buf_idx = vbyte_encode(zigzag, &mut buf, buf_idx);
src_idx += 1;
}
// Set up for our actual compression run.
let mut last = self.last.unwrap();
while src_idx < src_len {
let value = src[src_idx] as i64;
let diff = value - last;
let zigzag = zigzag_encode(diff);
buf_idx = vbyte_encode(zigzag, &mut buf, buf_idx);
last = value;
src_idx += 1;
}
unsafe {
self.inner.set_len(buf_idx);
}
self.last = Some(last);
}
/// Decompresses all of the integers written to the set.
///
/// Returns a vector with all of the original values. For larger sets of integers, this can be
/// slow due to the allocation required. Consider [decompress_with] to incrementally iterate
/// the decompressed set in smaller chunks.
///
/// [decompress_with]: StreamingIntegers::decompress_with
pub fn decompress(&self) -> Vec<u64> {
let mut values = Vec::new();
let mut buf_idx = 0;
let buf_len = self.inner.len();
let buf = self.inner.as_slice();
let mut last = 0;
while buf_idx < buf_len {
let (value, new_idx) = vbyte_decode(&buf, buf_idx);
buf_idx = new_idx;
let delta = zigzag_decode(value);
let original = last + delta;
last = original;
values.push(original as u64);
}
values
}
/// Decompresses all of the integers written to the set, invoking `f` for each batch.
///
/// During decompression, values are batched internally until a limit is reached, and then `f`
/// is called with a reference to the batch. This leads to minimal allocation to decompress
/// the entire set, for use cases where the values can be observed incrementally without issue.
pub fn decompress_with<F>(&self, mut f: F)
where
F: FnMut(&[u64]),
{
let mut values = Vec::with_capacity(1024);
let mut buf_idx = 0;
let buf_len = self.inner.len();
let buf = self.inner.as_slice();
let mut last = 0;
while buf_idx < buf_len {
let (value, new_idx) = vbyte_decode(&buf, buf_idx);
buf_idx = new_idx;
let delta = zigzag_decode(value);
let original = last + delta;
last = original;
values.push(original as u64);
if values.len() == values.capacity() {
f(&values);
values.clear();
}
}
if !values.is_empty() {
f(&values);
}
}
}
#[inline]
fn zigzag_encode(input: i64) -> u64 {
((input << 1) ^ (input >> 63)) as u64
}
#[inline]
fn zigzag_decode(input: u64) -> i64 {
((input >> 1) as i64) ^ (-((input & 1) as i64))
}
#[inline]
fn vbyte_encode(mut input: u64, buf: &mut [u8], mut buf_idx: usize) -> usize {
while input >= 128 {
buf[buf_idx] = 0x80 as u8 | (input as u8 & 0x7F);
buf_idx += 1;
input >>= 7;
}
buf[buf_idx] = input as u8;
buf_idx + 1
}
#[inline]
fn vbyte_decode(buf: &[u8], mut buf_idx: usize) -> (u64, usize) {
let mut tmp = 0;
let mut factor = 0;
loop {
tmp |= u64::from(buf[buf_idx] & 0x7F) << (7 * factor);
if buf[buf_idx] & 0x80 != 0x80 {
return (tmp, buf_idx + 1);
}
buf_idx += 1;
factor += 1;
}
}
#[cfg(test)]
mod tests {
use super::StreamingIntegers;
#[test]
fn test_streaming_integers_new() {
let si = StreamingIntegers::new();
let decompressed = si.decompress();
assert_eq!(decompressed.len(), 0);
}
#[test]
fn test_streaming_integers_single_block() {
let mut si = StreamingIntegers::new();
let decompressed = si.decompress();
assert_eq!(decompressed.len(), 0);
let values = vec![8, 6, 7, 5, 3, 0, 9];
si.compress(&values);
let decompressed = si.decompress();
assert_eq!(decompressed, values);
}
#[test]
fn test_streaming_integers_multiple_blocks() {
let mut si = StreamingIntegers::new();
let decompressed = si.decompress();
assert_eq!(decompressed.len(), 0);
let values = vec![8, 6, 7, 5, 3, 0, 9];
si.compress(&values);
let values2 = vec![6, 6, 6];
si.compress(&values2);
let values3 = vec![];
si.compress(&values3);
let values4 = vec![6, 6, 6, 7, 7, 7, 8, 8, 8];
si.compress(&values4);
let total = vec![values, values2, values3, values4]
.into_iter()
.flatten()
.collect::<Vec<_>>();
let decompressed = si.decompress();
assert_eq!(decompressed, total);
}
#[test]
fn test_streaming_integers_empty_block() {
let mut si = StreamingIntegers::new();
let decompressed = si.decompress();
assert_eq!(decompressed.len(), 0);
let values = vec![];
si.compress(&values);
let decompressed = si.decompress();
assert_eq!(decompressed.len(), 0);
}
}
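As a sanity check on the sizes quoted in the doc comment above (two neighboring values like 1000000 and 1000001 fitting within four bytes), here is a small worked sketch; the zigzag and variable-byte helpers are private to this module, so they are restated locally for illustration only.

fn zigzag_encode(input: i64) -> u64 {
    ((input << 1) ^ (input >> 63)) as u64
}

fn vbyte_len(mut input: u64) -> usize {
    // Each encoded byte carries 7 bits of payload plus a continuation bit.
    let mut len = 1;
    while input >= 128 {
        input >>= 7;
        len += 1;
    }
    len
}

fn main() {
    // The first value is stored in full: 1_000_000 zigzags to 2_000_000, which needs 3 bytes.
    let first = vbyte_len(zigzag_encode(1_000_000));
    // The second value is stored as a delta: 1 zigzags to 2, which fits in a single byte.
    let second = vbyte_len(zigzag_encode(1_000_001 - 1_000_000));
    assert_eq!((first, second), (3, 1)); // 4 bytes total, as described above
}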

View File

@ -1,6 +1,6 @@
[package]
name = "metrics"
version = "0.9.1"
version = "0.10.0"
authors = ["Toby Lawrence <toby@nuclearfurnace.com>"]
edition = "2018"
@ -26,15 +26,21 @@ default = ["exporters", "recorders"]
exporters = ["metrics-exporter-log", "metrics-exporter-http"]
recorders = ["metrics-recorder-text", "metrics-recorder-prometheus"]
[[bench]]
name = "histogram"
harness = false
[dependencies]
metrics-core = { path = "../metrics-core", version = "^0.3" }
crossbeam-channel = "^0.3"
metrics-util = { path = "../metrics-util", version = "^0.2" }
im = "^12"
fxhash = "^0.2"
arc-swap = "^0.3"
parking_lot = "^0.7"
fnv = "^1.0"
hashbrown = "^0.1"
quanta = "^0.2"
hashbrown = "^0.3"
quanta = "^0.3"
futures = "^0.1"
tokio-sync = "^0.1"
crossbeam-utils = "^0.6"
metrics-exporter-log = { path = "../metrics-exporter-log", version = "^0.2", optional = true }
metrics-exporter-http = { path = "../metrics-exporter-http", version = "^0.1", optional = true }
metrics-recorder-text = { path = "../metrics-recorder-text", version = "^0.2", optional = true }
@ -45,3 +51,5 @@ log = "^0.4"
env_logger = "^0.6"
getopts = "^0.2"
hdrhistogram = "^6.1"
criterion = "^0.2.9"
lazy_static = "^1.3"

View File

@ -0,0 +1,77 @@
#[macro_use]
extern crate criterion;
#[macro_use]
extern crate lazy_static;
use criterion::{Benchmark, Criterion, Throughput};
use metrics::data::AtomicWindowedHistogram;
use quanta::{Builder as UpkeepBuilder, Clock, Handle as UpkeepHandle};
use std::time::Duration;
lazy_static! {
static ref QUANTA_UPKEEP: UpkeepHandle = {
let builder = UpkeepBuilder::new(Duration::from_millis(10));
let handle = builder
.start()
.expect("failed to start quanta upkeep thread");
handle
};
static ref RANDOM_INTS: Vec<u64> = vec![
21061184, 21301862, 21331592, 21457012, 21500016, 21537837, 21581557, 21620030, 21664102,
21678463, 21708437, 21751808, 21845243, 21850265, 21938879, 21971014, 22005842, 22034601,
22085552, 22101746, 22115429, 22139883, 22260209, 22270768, 22298080, 22299780, 22307659,
22354697, 22355668, 22359397, 22463872, 22496590, 22590978, 22603740, 22706352, 22820895,
22849491, 22891538, 22912955, 22919915, 22928920, 22968656, 22985992, 23033739, 23061395,
23077554, 23138588, 23185172, 23282479, 23290830, 23316844, 23386911, 23641319, 23677058,
23742930, 25350389, 25399746, 25404925, 25464391, 25478415, 25480015, 25632783, 25639769,
25645612, 25688228, 25724427, 25862192, 25954476, 25994479, 26008752, 26036460, 26038202,
26078874, 26118327, 26132679, 26207601, 26262418, 26270737, 26274860, 26431248, 26434268,
26562736, 26580134, 26593740, 26618561, 26844181, 26866971, 26907883, 27005270, 27023584,
27024044, 27057184, 23061395, 23077554, 23138588, 23185172, 23282479, 23290830, 23316844,
23386911, 23641319, 23677058, 23742930, 25350389, 25399746, 25404925, 25464391, 25478415,
25480015, 25632783, 25639769, 25645612, 25688228, 25724427, 25862192, 25954476, 25994479,
26008752, 26036460, 26038202, 26078874, 26118327, 26132679, 26207601, 26262418, 26270737,
26274860, 26431248, 26434268, 26562736, 26580134, 26593740, 26618561, 26844181, 26866971,
26907883, 27005270, 27023584, 27024044, 27057184, 23061395, 23077554, 23138588, 23185172,
23282479, 23290830, 23316844, 23386911, 23641319, 23677058, 23742930, 25350389, 25399746,
25404925, 25464391, 25478415, 25480015, 25632783, 25639769, 25645612, 25688228, 25724427,
25862192, 25954476, 25994479, 26008752, 26036460, 26038202, 26078874, 26118327, 26132679,
26207601, 26262418, 26270737, 26274860, 26431248, 26434268, 26562736, 26580134, 26593740,
26618561, 26844181, 26866971, 26907883, 27005270, 27023584, 27024044, 27057184, 23061395,
23077554, 23138588, 23185172, 23282479, 23290830, 23316844, 23386911, 23641319, 23677058,
23742930, 25350389, 25399746, 25404925, 25464391, 25478415, 25480015, 25632783, 25639769,
25645612, 25688228, 25724427, 25862192, 25954476, 25994479, 26008752, 26036460, 26038202,
26078874, 26118327, 26132679, 26207601, 26262418, 26270737, 26274860, 26431248, 26434268,
26562736, 26580134, 26593740, 26618561, 26844181, 26866971, 26907883, 27005270, 27023584,
27024044, 27057184, 27088034, 27088550, 27302898, 27353925, 27412984, 27488633, 27514155,
27558052, 27601937, 27606339, 27624514, 27680396, 27684064, 27963602, 27414982, 28450673
];
}
fn bucket_benchmark(c: &mut Criterion) {
// Trigger the quanta upkeep thread to spawn and start updating the time.
let _handle = &QUANTA_UPKEEP;
c.bench(
"histogram",
Benchmark::new("record", |b| {
let clock = Clock::new();
let bucket = AtomicWindowedHistogram::new(
Duration::from_secs(1),
Duration::from_millis(100),
clock,
);
b.iter(|| {
for value in RANDOM_INTS.iter() {
bucket.record(*value);
}
})
})
.throughput(Throughput::Elements(RANDOM_INTS.len() as u32)),
);
}
criterion_group!(benches, bucket_benchmark);
criterion_main!(benches);

View File

@ -8,53 +8,126 @@ extern crate metrics_core;
use getopts::Options;
use hdrhistogram::Histogram;
use metrics::{snapshot::TypedMeasurement, Receiver, Sink};
use metrics_core::SnapshotProvider;
use metrics::{Receiver, Sink};
use metrics_core::{Recorder, Snapshot, SnapshotProvider};
use quanta::Clock;
use std::{
env,
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
thread,
time::{Duration, Instant},
};
const LOOP_SAMPLE: u64 = 1000;
struct Generator {
stats: Sink,
t0: Option<u64>,
gauge: i64,
hist: Histogram<u64>,
done: Arc<AtomicBool>,
rate_counter: Arc<AtomicU64>,
clock: Clock,
}
impl Generator {
fn new(stats: Sink, done: Arc<AtomicBool>) -> Generator {
fn new(
stats: Sink,
done: Arc<AtomicBool>,
rate_counter: Arc<AtomicU64>,
clock: Clock,
) -> Generator {
Generator {
stats,
t0: None,
gauge: 0,
hist: Histogram::<u64>::new_with_bounds(1, u64::max_value(), 3).unwrap(),
done,
rate_counter,
clock,
}
}
fn run(&mut self) {
let mut counter = 0;
loop {
counter += 1;
if self.done.load(Ordering::Relaxed) {
break;
}
self.gauge += 1;
let t1 = self.stats.now();
if let Some(t0) = self.t0 {
let start = self.stats.now();
let start = if counter % 33 == 0 {
self.stats.now()
} else {
0
};
let _ = self.stats.record_count("ok", 1);
let _ = self.stats.record_timing("ok", t0, t1);
let _ = self.stats.record_gauge("total", self.gauge);
let delta = self.stats.now() - start;
self.hist.saturating_record(delta);
if start != 0 {
let delta = self.stats.now() - start;
self.hist.saturating_record(delta);
// We also increment our global counter for the sample rate here.
self.rate_counter
.fetch_add(LOOP_SAMPLE * 3, Ordering::AcqRel);
}
}
self.t0 = Some(t1);
}
}
fn run_cached(&mut self) {
let mut counter = 0;
let counter_handle = self.stats.counter("ok");
let timing_handle = self.stats.histogram("ok");
let gauge_handle = self.stats.gauge("total");
loop {
counter += 1;
if self.done.load(Ordering::Relaxed) {
break;
}
self.gauge += 1;
let t1 = self.clock.recent();
if let Some(t0) = self.t0 {
let start = if counter % LOOP_SAMPLE == 0 {
self.stats.now()
} else {
0
};
let _ = counter_handle.record(1);
let _ = timing_handle.record_timing(t0, t1);
let _ = gauge_handle.record(self.gauge);
if start != 0 {
let delta = self.stats.now() - start;
self.hist.saturating_record(delta);
// We also increment our global counter for the sample rate here.
self.rate_counter
.fetch_add(LOOP_SAMPLE * 3, Ordering::AcqRel);
}
}
self.t0 = Some(t1);
}
}
@ -89,18 +162,6 @@ pub fn opts() -> Options {
"INTEGER",
);
opts.optopt("p", "producers", "number of producers", "INTEGER");
opts.optopt(
"c",
"capacity",
"maximum number of unprocessed items",
"INTEGER",
);
opts.optopt(
"b",
"batch-size",
"maximum number of items in a batch",
"INTEGER",
);
opts.optflag("h", "help", "print this help menu");
opts
@ -134,31 +195,19 @@ fn main() {
.unwrap_or_else(|| "60".to_owned())
.parse()
.unwrap();
let capacity = matches
.opt_str("capacity")
.unwrap_or_else(|| "1024".to_owned())
.parse()
.unwrap();
let batch_size = matches
.opt_str("batch-size")
.unwrap_or_else(|| "256".to_owned())
.parse()
.unwrap();
let producers = matches
.opt_str("producers")
.unwrap_or_else(|| "1".to_owned())
.parse()
.unwrap();
info!("duration: {}s", seconds);
info!("producers: {}", producers);
info!("capacity: {}", capacity);
info!("batch size: {}", batch_size);
let mut receiver = Receiver::builder()
.capacity(capacity)
.batch_size(batch_size)
.histogram(Duration::from_secs(5), Duration::from_secs(1))
.build();
let receiver = Receiver::builder()
.histogram(Duration::from_secs(5), Duration::from_millis(100))
.build()
.expect("failed to build receiver");
let sink = receiver.get_sink();
let sink = sink.scoped(&["alpha", "pools", "primary"]);
@ -167,13 +216,17 @@ fn main() {
// Spin up our sample producers.
let done = Arc::new(AtomicBool::new(false));
let rate_counter = Arc::new(AtomicU64::new(0));
let mut handles = Vec::new();
let clock = Clock::new();
for _ in 0..producers {
let s = sink.clone();
let d = done.clone();
let r = rate_counter.clone();
let c = clock.clone();
let handle = thread::spawn(move || {
Generator::new(s, d).run();
Generator::new(s, d, r, c).run_cached();
});
handles.push(handle);
@ -182,10 +235,6 @@ fn main() {
// Spin up the sink and let 'er rip.
let controller = receiver.get_controller();
thread::spawn(move || {
receiver.run();
});
// Poll the controller to figure out the sample rate.
let mut total = 0;
let mut t0 = Instant::now();
@ -195,22 +244,11 @@ fn main() {
let t1 = Instant::now();
let start = Instant::now();
let snapshot = controller.get_snapshot();
let snapshot = controller.get_snapshot().unwrap();
let end = Instant::now();
snapshot_hist.saturating_record(duration_as_nanos(end - start) as u64);
let turn_total = snapshot
.unwrap()
.into_measurements()
.iter()
.fold(0, |acc, m| {
acc + match m {
TypedMeasurement::Counter(_key, value) => *value,
TypedMeasurement::Gauge(_key, value) => *value as u64,
_ => 0,
}
});
let turn_total = rate_counter.load(Ordering::Acquire);
let turn_delta = turn_total - total;
total = turn_total;
let rate = turn_delta as f64 / (duration_as_nanos(t1 - t0) / 1_000_000_000.0);
@ -239,6 +277,34 @@ fn main() {
}
}
struct TotalRecorder {
total: u64,
}
impl TotalRecorder {
pub fn new() -> Self {
Self { total: 0 }
}
pub fn total(&self) -> u64 {
self.total
}
}
impl Recorder for TotalRecorder {
fn record_counter<K: AsRef<str>>(&mut self, _key: K, value: u64) {
self.total += value;
}
fn record_gauge<K: AsRef<str>>(&mut self, _key: K, value: i64) {
self.total += value as u64;
}
fn record_histogram<K: AsRef<str>>(&mut self, _key: K, values: &[u64]) {
self.total += values.len() as u64;
}
}
fn duration_as_nanos(d: Duration) -> f64 {
(d.as_secs() as f64 * 1e9) + d.subsec_nanos() as f64
}

View File

@ -0,0 +1,202 @@
#[macro_use]
extern crate log;
extern crate env_logger;
extern crate getopts;
extern crate hdrhistogram;
use getopts::Options;
use hdrhistogram::Histogram;
use quanta::Clock;
use std::{
env,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
thread,
time::{Duration, Instant},
};
struct Generator {
counter: Arc<AtomicU64>,
clock: Clock,
hist: Histogram<u64>,
done: Arc<AtomicBool>,
}
impl Generator {
fn new(counter: Arc<AtomicU64>, done: Arc<AtomicBool>) -> Generator {
Generator {
counter,
clock: Clock::new(),
hist: Histogram::<u64>::new_with_bounds(1, u64::max_value(), 3).unwrap(),
done,
}
}
fn run(&mut self) {
let mut counter = 0;
loop {
if self.done.load(Ordering::Relaxed) {
break;
}
let start = if counter % 100 == 0 {
self.clock.now()
} else {
0
};
counter = self.counter.fetch_add(1, Ordering::AcqRel);
if start != 0 {
let delta = self.clock.now() - start;
self.hist.saturating_record(delta);
}
}
}
}
impl Drop for Generator {
fn drop(&mut self) {
info!(
" sender latency: min: {:9} p50: {:9} p95: {:9} p99: {:9} p999: {:9} max: {:9}",
nanos_to_readable(self.hist.min()),
nanos_to_readable(self.hist.value_at_percentile(50.0)),
nanos_to_readable(self.hist.value_at_percentile(95.0)),
nanos_to_readable(self.hist.value_at_percentile(99.0)),
nanos_to_readable(self.hist.value_at_percentile(99.9)),
nanos_to_readable(self.hist.max())
);
}
}
fn print_usage(program: &str, opts: &Options) {
let brief = format!("Usage: {} [options]", program);
print!("{}", opts.usage(&brief));
}
pub fn opts() -> Options {
let mut opts = Options::new();
opts.optopt(
"d",
"duration",
"number of seconds to run the benchmark",
"INTEGER",
);
opts.optopt("p", "producers", "number of producers", "INTEGER");
opts.optflag("h", "help", "print this help menu");
opts
}
fn main() {
env_logger::init();
let args: Vec<String> = env::args().collect();
let program = &args[0];
let opts = opts();
let matches = match opts.parse(&args[1..]) {
Ok(m) => m,
Err(f) => {
error!("Failed to parse command line args: {}", f);
return;
}
};
if matches.opt_present("help") {
print_usage(program, &opts);
return;
}
info!("metrics benchmark");
// Build our sink and configure the facets.
let seconds = matches
.opt_str("duration")
.unwrap_or_else(|| "60".to_owned())
.parse()
.unwrap();
let producers = matches
.opt_str("producers")
.unwrap_or_else(|| "1".to_owned())
.parse()
.unwrap();
info!("duration: {}s", seconds);
info!("producers: {}", producers);
// Spin up our sample producers.
let counter = Arc::new(AtomicU64::new(0));
let done = Arc::new(AtomicBool::new(false));
let mut handles = Vec::new();
for _ in 0..producers {
let c = counter.clone();
let d = done.clone();
let handle = thread::spawn(move || {
Generator::new(c, d).run();
});
handles.push(handle);
}
// Poll the controller to figure out the sample rate.
let mut total = 0;
let mut t0 = Instant::now();
let mut snapshot_hist = Histogram::<u64>::new_with_bounds(1, u64::max_value(), 3).unwrap();
for _ in 0..seconds {
let t1 = Instant::now();
let start = Instant::now();
let turn_total = counter.load(Ordering::Acquire);
let end = Instant::now();
snapshot_hist.saturating_record(duration_as_nanos(end - start) as u64);
let turn_delta = turn_total - total;
total = turn_total;
let rate = turn_delta as f64 / (duration_as_nanos(t1 - t0) / 1_000_000_000.0);
info!("sample ingest rate: {:.0} samples/sec", rate);
t0 = t1;
thread::sleep(Duration::new(1, 0));
}
info!("--------------------------------------------------------------------------------");
info!(" ingested samples total: {}", total);
info!(
"snapshot retrieval: min: {:9} p50: {:9} p95: {:9} p99: {:9} p999: {:9} max: {:9}",
nanos_to_readable(snapshot_hist.min()),
nanos_to_readable(snapshot_hist.value_at_percentile(50.0)),
nanos_to_readable(snapshot_hist.value_at_percentile(95.0)),
nanos_to_readable(snapshot_hist.value_at_percentile(99.0)),
nanos_to_readable(snapshot_hist.value_at_percentile(99.9)),
nanos_to_readable(snapshot_hist.max())
);
// Wait for the producers to finish so we can get their stats too.
done.store(true, Ordering::SeqCst);
for handle in handles {
let _ = handle.join();
}
}
fn duration_as_nanos(d: Duration) -> f64 {
(d.as_secs() as f64 * 1e9) + d.subsec_nanos() as f64
}
fn nanos_to_readable(t: u64) -> String {
let f = t as f64;
if f < 1_000.0 {
format!("{}ns", f)
} else if f < 1_000_000.0 {
format!("{:.0}μs", f / 1_000.0)
} else if f < 2_000_000_000.0 {
format!("{:.2}ms", f / 1_000_000.0)
} else {
format!("{:.3}s", f / 1_000_000_000.0)
}
}

98
metrics/src/builder.rs Normal file

@ -0,0 +1,98 @@
use crate::{config::Configuration, Receiver};
use std::error::Error;
use std::fmt;
use std::time::Duration;
/// Errors during receiver creation.
#[derive(Debug, Clone)]
pub enum BuilderError {
/// Failed to spawn the upkeep thread.
///
/// As histograms are windowed, reads and writes require getting the current time so they can
/// perform the required maintenance, or upkeep, on the internal structures to roll over old
/// buckets, etc.
///
/// Acquiring the current time is fast compared to most operations, but is a significant
/// portion of the overall time it takes to write to a histogram, which limits overall throughput
/// under high load.
///
/// We spin up a background thread, or the "upkeep thread", which updates a global time source
/// that the read and write operations exclusively rely on. While this source is not as
/// up-to-date as the real clock, it is much faster to access.
UpkeepFailure,
#[doc(hidden)]
_NonExhaustive,
}
impl Error for BuilderError {}
impl fmt::Display for BuilderError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
BuilderError::UpkeepFailure => write!(f, "failed to spawn quanta upkeep thread"),
BuilderError::_NonExhaustive => write!(f, "non-exhaustive matching"),
}
}
}
/// Builder for [`Receiver`].
#[derive(Clone)]
pub struct Builder {
pub(crate) histogram_window: Duration,
pub(crate) histogram_granularity: Duration,
pub(crate) upkeep_interval: Duration,
}
impl Default for Builder {
fn default() -> Self {
Self {
histogram_window: Duration::from_secs(10),
histogram_granularity: Duration::from_secs(1),
upkeep_interval: Duration::from_millis(50),
}
}
}
impl Builder {
/// Creates a new [`Builder`] with default values.
pub fn new() -> Self {
Default::default()
}
/// Sets the histogram configuration.
///
/// Defaults to a 10 second window with 1 second granularity.
///
/// This controls both how long of a time window we track histogram data for, and the
/// granularity in which we roll off old data.
///
/// As an example, with the default values, we would keep the last 10 seconds' worth of
/// histogram data, and would remove 1 second's worth of data at a time as the window rolled
/// forward.
pub fn histogram(mut self, window: Duration, granularity: Duration) -> Self {
self.histogram_window = window;
self.histogram_granularity = granularity;
self
}
/// Sets the upkeep interval.
///
/// Defaults to 50 milliseconds.
///
/// This controls how often the time source, used internally by histograms, is updated with the
/// real time. For performance reasons, histograms use a sampled time source when they perform
/// checks to see if internal maintenance needs to occur. If the histogram granularity is set
/// very low, then this interval might need to be similarly reduced to make sure we're able to
/// update the time more often than histograms need to perform upkeep.
pub fn upkeep_interval(mut self, interval: Duration) -> Self {
self.upkeep_interval = interval;
self
}
/// Create a [`Receiver`] based on this configuration.
pub fn build(self) -> Result<Receiver, BuilderError> {
let config = Configuration::from_builder(&self);
Receiver::from_config(config)
}
}
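Putting the builder together with the receiver, a minimal configuration sketch; the window, granularity, upkeep interval, scope names, and metric key below are arbitrary examples rather than recommended values.

use metrics::Receiver;
use std::time::Duration;

fn receiver_setup_sketch() {
    // Wider histogram window with a faster sampled-time update; `build` returns a
    // BuilderError if the quanta upkeep thread cannot be spawned.
    let receiver = Receiver::builder()
        .histogram(Duration::from_secs(30), Duration::from_secs(1))
        .upkeep_interval(Duration::from_millis(10))
        .build()
        .expect("failed to build receiver");

    // Grab a sink, scope it, and record a sample metric.
    let sink = receiver.get_sink();
    let mut sink = sink.scoped(&["web", "frontend"]);
    let _ = sink.record_count("requests", 1);
}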

242
metrics/src/common.rs Normal file

@ -0,0 +1,242 @@
use crate::data::AtomicWindowedHistogram;
use metrics_util::StreamingIntegers;
use quanta::Clock;
use std::borrow::Cow;
use std::ops::Deref;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Optimized metric name.
///
/// This can either be a [`&'static str`](str) or [`String`].
pub type MetricName = Cow<'static, str>;
/// A scope, or context, for a metric.
///
/// Not interacted with directly by end users, and only exposed due to a lack of trait method
/// visibility controls.
///
/// See also: [Sink::scoped](crate::Sink::scoped).
#[derive(PartialEq, Eq, Hash, Clone)]
pub enum MetricScope {
/// Root scope.
Root,
/// A nested scope, with arbitrarily deep nesting.
Nested(Vec<String>),
}
impl MetricScope {
pub(crate) fn into_scoped(self, name: MetricName) -> String {
match self {
MetricScope::Root => name.to_string(),
MetricScope::Nested(mut parts) => {
if !name.is_empty() {
parts.push(name.to_string());
}
parts.join(".")
}
}
}
}
pub(crate) type MetricScopeHandle = u64;
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub(crate) enum MetricKind {
Counter,
Gauge,
Histogram,
}
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub(crate) enum MetricIdentifier {
Unlabeled(MetricName, MetricScopeHandle, MetricKind),
}
#[derive(Debug)]
enum ValueState {
Counter(AtomicU64),
Gauge(AtomicI64),
Histogram(AtomicWindowedHistogram),
}
#[derive(Debug)]
pub(crate) enum ValueSnapshot {
Counter(u64),
Gauge(i64),
Histogram(StreamingIntegers),
}
#[derive(Clone, Debug)]
/// Handle to the underlying measurement for a metric.
pub(crate) struct MetricValue {
state: Arc<ValueState>,
}
impl MetricValue {
fn new(state: ValueState) -> Self {
MetricValue {
state: Arc::new(state),
}
}
pub fn counter() -> Self {
Self::new(ValueState::Counter(AtomicU64::new(0)))
}
pub fn gauge() -> Self {
Self::new(ValueState::Gauge(AtomicI64::new(0)))
}
pub fn histogram(window: Duration, granularity: Duration, clock: Clock) -> Self {
Self::new(ValueState::Histogram(AtomicWindowedHistogram::new(
window,
granularity,
clock,
)))
}
pub fn update_counter(&self, value: u64) {
match self.state.deref() {
ValueState::Counter(inner) => {
inner.fetch_add(value, Ordering::Release);
}
_ => unreachable!("tried to access as counter, not a counter"),
}
}
pub fn update_gauge(&self, value: i64) {
match self.state.deref() {
ValueState::Gauge(inner) => inner.store(value, Ordering::Release),
_ => unreachable!("tried to access as gauge, not a gauge"),
}
}
pub fn update_histogram(&self, value: u64) {
match self.state.deref() {
ValueState::Histogram(inner) => inner.record(value),
_ => unreachable!("tried to access as histogram, not a histogram"),
}
}
pub fn snapshot(&self) -> ValueSnapshot {
match self.state.deref() {
ValueState::Counter(inner) => {
let value = inner.load(Ordering::Acquire);
ValueSnapshot::Counter(value)
}
ValueState::Gauge(inner) => {
let value = inner.load(Ordering::Acquire);
ValueSnapshot::Gauge(value)
}
ValueState::Histogram(inner) => {
let stream = inner.snapshot();
ValueSnapshot::Histogram(stream)
}
}
}
}
/// Trait for types that represent time and can be subtracted from each other to generate a delta.
pub trait Delta {
/// Get the delta between this value and another value.
///
/// For `Instant`, we explicitly return the nanosecond difference. For `u64`, we return the
/// integer difference, but the timescale itself can be whatever the user desires.
fn delta(&self, other: Self) -> u64;
}
impl Delta for u64 {
fn delta(&self, other: u64) -> u64 {
self.wrapping_sub(other)
}
}
impl Delta for Instant {
fn delta(&self, other: Instant) -> u64 {
let dur = *self - other;
dur.as_nanos() as u64
}
}
#[cfg(test)]
mod tests {
use super::{MetricScope, MetricValue, ValueSnapshot};
use quanta::Clock;
use std::time::Duration;
#[test]
fn test_metric_scope() {
let root_scope = MetricScope::Root;
assert_eq!(root_scope.into_scoped("".into()), "".to_string());
let root_scope = MetricScope::Root;
assert_eq!(
root_scope.into_scoped("jambalaya".into()),
"jambalaya".to_string()
);
let nested_scope = MetricScope::Nested(vec![]);
assert_eq!(nested_scope.into_scoped("".into()), "".to_string());
let nested_scope = MetricScope::Nested(vec![]);
assert_eq!(
nested_scope.into_scoped("toilet".into()),
"toilet".to_string()
);
let nested_scope = MetricScope::Nested(vec![
"chamber".to_string(),
"of".to_string(),
"secrets".to_string(),
]);
assert_eq!(
nested_scope.into_scoped("".into()),
"chamber.of.secrets".to_string()
);
let nested_scope = MetricScope::Nested(vec![
"chamber".to_string(),
"of".to_string(),
"secrets".to_string(),
]);
assert_eq!(
nested_scope.into_scoped("toilet".into()),
"chamber.of.secrets.toilet".to_string()
);
}
#[test]
fn test_metric_values() {
let counter = MetricValue::counter();
counter.update_counter(42);
match counter.snapshot() {
ValueSnapshot::Counter(value) => assert_eq!(value, 42),
_ => panic!("incorrect value snapshot type for counter"),
}
let gauge = MetricValue::gauge();
gauge.update_gauge(23);
match gauge.snapshot() {
ValueSnapshot::Gauge(value) => assert_eq!(value, 23),
_ => panic!("incorrect value snapshot type for gauge"),
}
let (mock, _) = Clock::mock();
let histogram =
MetricValue::histogram(Duration::from_secs(10), Duration::from_secs(1), mock);
histogram.update_histogram(8675309);
histogram.update_histogram(5551212);
match histogram.snapshot() {
ValueSnapshot::Histogram(stream) => {
assert_eq!(stream.len(), 2);
let values = stream.decompress();
assert_eq!(&values[..], [8675309, 5551212]);
}
_ => panic!("incorrect value snapshot type for histogram"),
}
}
}
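Since `Delta` is exported from the crate root, callers can pass either raw `u64` timestamps or `Instant` values into the timing APIs. Below is a minimal sketch of the two impls above in use; the `elapsed_ns` helper is illustrative only and not part of this commit.

use metrics::Delta;
use std::thread::sleep;
use std::time::{Duration, Instant};

// Hypothetical helper, just to show the trait bound in action.
fn elapsed_ns<D: Delta>(start: D, end: D) -> u64 {
    // `Instant` yields the nanosecond difference; `u64` uses a wrapping subtraction.
    end.delta(start)
}

fn main() {
    let start = Instant::now();
    sleep(Duration::from_millis(5));
    let end = Instant::now();
    assert!(elapsed_ns(start, end) >= 5_000_000);

    // Raw timestamps, such as the values returned by `Sink::now()`, work the same way.
    assert_eq!(elapsed_ns(10_u64, 25_u64), 15);
}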

20
metrics/src/config.rs Normal file
View File

@ -0,0 +1,20 @@
use crate::Builder;
use std::time::Duration;
/// Holds the configuration for complex metric types.
#[derive(Clone, Debug)]
pub(crate) struct Configuration {
pub histogram_window: Duration,
pub histogram_granularity: Duration,
pub upkeep_interval: Duration,
}
impl Configuration {
pub fn from_builder(builder: &Builder) -> Self {
Self {
histogram_window: builder.histogram_window,
histogram_granularity: builder.histogram_granularity,
upkeep_interval: builder.upkeep_interval,
}
}
}

View File

@ -1,84 +0,0 @@
use crate::receiver::Receiver;
use std::time::Duration;
/// A configuration builder for [`Receiver`].
#[derive(Clone)]
pub struct Configuration {
pub(crate) capacity: usize,
pub(crate) batch_size: usize,
pub(crate) histogram_window: Duration,
pub(crate) histogram_granularity: Duration,
}
impl Default for Configuration {
fn default() -> Configuration {
Configuration {
capacity: 512,
batch_size: 64,
histogram_window: Duration::from_secs(10),
histogram_granularity: Duration::from_secs(1),
}
}
}
impl Configuration {
/// Creates a new [`Configuration`] with default values.
pub fn new() -> Configuration {
Default::default()
}
/// Sets the buffer capacity.
///
/// Defaults to 512.
///
/// This controls the size of the channel used to send metrics. This channel is shared amongst
/// all active sinks. If this channel is full when sending a metric, that send will be blocked
/// until the channel has free space.
///
/// Tweaking this value allows for a trade-off between low memory consumption and throughput
/// burst capabilities. By default, we expect samples to occupy approximately 64 bytes. Thus,
/// at our default value, we preallocate roughly ~32KB.
///
/// Generally speaking, sending and processing metrics is fast enough that the default value of
/// 512 supports millions of samples per second.
pub fn capacity(mut self, capacity: usize) -> Self {
self.capacity = capacity;
self
}
/// Sets the batch size.
///
/// Defaults to 64.
///
/// This controls the size of message batches that we collect for processing. The only real
/// reason to tweak this is to control the latency from the sender side. Larger batches lower
/// the ingest latency in the face of high metric ingest pressure at the cost of higher ingest
/// tail latencies.
///
/// Long story short, you shouldn't need to change this, but it's here if you really do.
pub fn batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}
/// Sets the histogram configuration.
///
/// Defaults to a 10 second window with 1 second granularity.
///
/// This controls both how long of a time window we track histogram data for, and the
/// granularity in which we roll off old data.
///
/// As an example, with the default values, we would keep the last 10 seconds worth of
/// histogram data, and would remove 1 seconds worth of data at a time as the window rolled
/// forward.
pub fn histogram(mut self, window: Duration, granularity: Duration) -> Self {
self.histogram_window = window;
self.histogram_granularity = granularity;
self
}
/// Create a [`Receiver`] based on this configuration.
pub fn build(self) -> Receiver {
Receiver::from_config(self)
}
}

View File

@ -1,19 +1,19 @@
use super::data::snapshot::Snapshot;
use crossbeam_channel::{bounded, Sender};
use crate::data::Snapshot;
use crate::registry::{MetricRegistry, ScopeRegistry};
use futures::prelude::*;
use metrics_core::{AsyncSnapshotProvider, SnapshotProvider};
use std::error::Error;
use std::fmt;
use tokio_sync::oneshot;
use std::sync::Arc;
/// Error conditions when retrieving a snapshot.
/// Error during snapshot retrieval.
#[derive(Debug, Clone)]
pub enum SnapshotError {
/// There was an internal error when trying to collect a snapshot.
InternalError,
/// The future was polled again after returning the snapshot.
AlreadyUsed,
/// A snapshot was requested but the receiver is shutdown.
ReceiverShutdown,
#[doc(hidden)]
_NonExhaustive,
}
impl Error for SnapshotError {}
@ -21,33 +21,33 @@ impl Error for SnapshotError {}
impl fmt::Display for SnapshotError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SnapshotError::InternalError => write!(f, "internal error while collecting snapshot"),
SnapshotError::ReceiverShutdown => write!(f, "receiver is shutdown"),
SnapshotError::AlreadyUsed => write!(f, "snapshot already returned from future"),
SnapshotError::_NonExhaustive => write!(f, "non-exhaustive matching"),
}
}
}
/// Various control actions performed by a controller.
pub(crate) enum ControlFrame {
/// Takes a snapshot of the current metric state.
Snapshot(Sender<Snapshot>),
/// Takes a snapshot of the current metric state, but uses an asynchronous channel.
SnapshotAsync(oneshot::Sender<Snapshot>),
}
/// Dedicated handle for performing operations on a running [`Receiver`](crate::receiver::Receiver).
/// Handle for acquiring snapshots.
///
/// The caller is able to request metric snapshots at any time without requiring mutable access to
/// the sink. This all flows through the existing control mechanism, and so is very fast.
/// `Controller` is [`metrics-core`]-compatible as a snapshot provider, both for synchronous and
/// asynchronous snapshotting.
///
/// [`metrics-core`]: https://docs.rs/metrics-core
#[derive(Clone)]
pub struct Controller {
control_tx: Sender<ControlFrame>,
metric_registry: Arc<MetricRegistry>,
scope_registry: Arc<ScopeRegistry>,
}
impl Controller {
pub(crate) fn new(control_tx: Sender<ControlFrame>) -> Controller {
Controller { control_tx }
pub(crate) fn new(
metric_registry: Arc<MetricRegistry>,
scope_registry: Arc<ScopeRegistry>,
) -> Controller {
Controller {
metric_registry,
scope_registry,
}
}
}
@ -57,13 +57,8 @@ impl SnapshotProvider for Controller {
/// Gets a snapshot.
fn get_snapshot(&self) -> Result<Snapshot, SnapshotError> {
let (tx, rx) = bounded(0);
let msg = ControlFrame::Snapshot(tx);
self.control_tx
.send(msg)
.map_err(|_| SnapshotError::ReceiverShutdown)
.and_then(move |_| rx.recv().map_err(|_| SnapshotError::InternalError))
let snapshot = self.metric_registry.get_snapshot();
Ok(snapshot)
}
}
@ -74,20 +69,22 @@ impl AsyncSnapshotProvider for Controller {
/// Gets a snapshot asynchronously.
fn get_snapshot_async(&self) -> Self::SnapshotFuture {
let (tx, rx) = oneshot::channel();
let msg = ControlFrame::SnapshotAsync(tx);
self.control_tx
.send(msg)
.map(move |_| SnapshotFuture::Waiting(rx))
.unwrap_or(SnapshotFuture::Errored(SnapshotError::ReceiverShutdown))
let snapshot = self.metric_registry.get_snapshot();
SnapshotFuture::new(snapshot)
}
}
/// A future representing collecting a snapshot.
pub enum SnapshotFuture {
Waiting(oneshot::Receiver<Snapshot>),
Errored(SnapshotError),
pub struct SnapshotFuture {
snapshot: Option<Snapshot>,
}
impl SnapshotFuture {
pub fn new(snapshot: Snapshot) -> Self {
SnapshotFuture {
snapshot: Some(snapshot),
}
}
}
impl Future for SnapshotFuture {
@ -95,9 +92,9 @@ impl Future for SnapshotFuture {
type Error = SnapshotError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
SnapshotFuture::Waiting(rx) => rx.poll().map_err(|_| SnapshotError::InternalError),
SnapshotFuture::Errored(err) => Err(err.clone()),
}
self.snapshot
.take()
.ok_or(SnapshotError::AlreadyUsed)
.map(Async::Ready)
}
}
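With the channel-based control frames gone, taking a snapshot is just a read of the shared registries. A rough usage sketch follows, assuming the `metrics-core` traits are in scope; the wiring is illustrative, not prescriptive.

use metrics::Receiver;
use metrics_core::SnapshotProvider;

fn main() {
    let receiver = Receiver::builder().build().expect("failed to create receiver");
    let mut sink = receiver.get_sink();
    sink.record_count("requests", 1);

    // Synchronous path: no channel round-trip and no running event loop required.
    let controller = receiver.get_controller();
    let snapshot = controller.get_snapshot().expect("snapshot failed");

    // `snapshot` can now be handed to any metrics-core `Recorder` for formatting/export.
    drop(snapshot);
}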

View File

@ -1,44 +1,19 @@
use crate::data::ScopedKey;
use fnv::FnvBuildHasher;
use hashbrown::HashMap;
use crate::common::MetricValue;
pub(crate) struct Counter {
data: HashMap<ScopedKey, u64, FnvBuildHasher>,
/// Proxy object to update a counter.
pub struct Counter {
handle: MetricValue,
}
impl Counter {
pub fn new() -> Counter {
Counter {
data: HashMap::default(),
}
}
pub fn update(&mut self, key: ScopedKey, delta: u64) {
let value = self.data.entry(key).or_insert(0);
*value = value.wrapping_add(delta);
}
pub fn values(&self) -> Vec<(ScopedKey, u64)> {
self.data.iter().map(|(k, v)| (k.clone(), *v)).collect()
/// Records a value for the counter.
pub fn record(&self, value: u64) {
self.handle.update_counter(value);
}
}
#[cfg(test)]
mod tests {
use super::{Counter, ScopedKey};
#[test]
fn test_counter_simple_update() {
let mut counter = Counter::new();
let key = ScopedKey(0, "foo".into());
counter.update(key, 42);
let key2 = ScopedKey(0, "foo".to_owned().into());
counter.update(key2, 31);
let values = counter.values();
assert_eq!(values.len(), 1);
assert_eq!(values[0].1, 73);
impl From<MetricValue> for Counter {
fn from(handle: MetricValue) -> Self {
Self { handle }
}
}

View File

@ -1,48 +1,19 @@
use crate::data::ScopedKey;
use fnv::FnvBuildHasher;
use hashbrown::HashMap;
use crate::common::MetricValue;
pub(crate) struct Gauge {
data: HashMap<ScopedKey, i64, FnvBuildHasher>,
/// Proxy object to update a gauge.
pub struct Gauge {
handle: MetricValue,
}
impl Gauge {
pub fn new() -> Gauge {
Gauge {
data: HashMap::default(),
}
}
pub fn update(&mut self, key: ScopedKey, value: i64) {
let ivalue = self.data.entry(key).or_insert(0);
*ivalue = value;
}
pub fn values(&self) -> Vec<(ScopedKey, i64)> {
self.data.iter().map(|(k, v)| (k.clone(), *v)).collect()
/// Records a value for the gauge.
pub fn record(&self, value: i64) {
self.handle.update_gauge(value);
}
}
#[cfg(test)]
mod tests {
use super::{Gauge, ScopedKey};
#[test]
fn test_gauge_simple_update() {
let mut gauge = Gauge::new();
let key = ScopedKey(0, "foo".into());
gauge.update(key, 42);
let values = gauge.values();
assert_eq!(values.len(), 1);
assert_eq!(values[0].1, 42);
let key2 = ScopedKey(0, "foo".to_owned().into());
gauge.update(key2, 43);
let values = gauge.values();
assert_eq!(values.len(), 1);
assert_eq!(values[0].1, 43);
impl From<MetricValue> for Gauge {
fn from(handle: MetricValue) -> Self {
Self { handle }
}
}

View File

@ -1,212 +1,371 @@
use crate::{data::ScopedKey, helper::duration_as_nanos};
use fnv::FnvBuildHasher;
use hashbrown::HashMap;
use std::time::{Duration, Instant};
use crate::common::{Delta, MetricValue};
use crate::helper::duration_as_nanos;
use crossbeam_utils::Backoff;
use metrics_util::{AtomicBucket, StreamingIntegers};
use quanta::Clock;
use std::cmp;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Duration;
pub(crate) struct Histogram {
window: Duration,
granularity: Duration,
data: HashMap<ScopedKey, WindowedRawHistogram, FnvBuildHasher>,
/// Proxy object to update a histogram.
pub struct Histogram {
handle: MetricValue,
}
impl Histogram {
pub fn new(window: Duration, granularity: Duration) -> Histogram {
Histogram {
window,
granularity,
data: HashMap::default(),
}
/// Records a timing for the histogram.
pub fn record_timing<D: Delta>(&self, start: D, end: D) {
let value = end.delta(start);
self.handle.update_histogram(value);
}
pub fn update(&mut self, key: ScopedKey, value: u64) {
if let Some(wh) = self.data.get_mut(&key) {
wh.update(value);
} else {
let mut wh = WindowedRawHistogram::new(self.window, self.granularity);
wh.update(value);
let _ = self.data.insert(key, wh);
}
}
pub fn upkeep(&mut self, at: Instant) {
for (_, histogram) in self.data.iter_mut() {
histogram.upkeep(at);
}
}
pub fn values(&self) -> Vec<(ScopedKey, HistogramSnapshot)> {
self.data
.iter()
.map(|(k, v)| (k.clone(), v.snapshot()))
.collect()
/// Records a value for the histogram.
pub fn record_value(&self, value: u64) {
self.handle.update_histogram(value);
}
}
pub(crate) struct WindowedRawHistogram {
buckets: Vec<Vec<u64>>,
num_buckets: usize,
bucket_index: usize,
last_upkeep: Instant,
granularity: Duration,
impl From<MetricValue> for Histogram {
fn from(handle: MetricValue) -> Self {
Self { handle }
}
}
impl WindowedRawHistogram {
pub fn new(window: Duration, granularity: Duration) -> WindowedRawHistogram {
let num_buckets =
((duration_as_nanos(window) / duration_as_nanos(granularity)) as usize) + 1;
let mut buckets = Vec::with_capacity(num_buckets);
/// An atomic windowed histogram.
///
/// This histogram provides a windowed view of values that rolls forward over time, dropping old
/// values as they exceed the window of the histogram. Both writes into the histogram and
/// snapshots of it are lock-free.
#[derive(Debug)]
pub struct AtomicWindowedHistogram {
buckets: Vec<AtomicBucket<u64>>,
bucket_count: usize,
granularity: u64,
upkeep_index: AtomicUsize,
index: AtomicUsize,
next_upkeep: AtomicU64,
clock: Clock,
}
for _ in 0..num_buckets {
let histogram = Vec::new();
buckets.push(histogram);
impl AtomicWindowedHistogram {
/// Creates a new [`AtomicWindowedHistogram`].
///
/// Internally, a number of buckets will be created, based on how many times `granularity` goes
/// into `window`. As time passes, buckets will be cleared to avoid values older than the
/// `window` duration.
///
/// As buckets will hold values representing a period of time up to `granularity`, the
/// granularity can be lowered or raised to roll values off more precisely, or less precisely,
/// against the provided clock.
///
/// # Panics
/// Panics if `granularity` is larger than `window`.
pub fn new(window: Duration, granularity: Duration, clock: Clock) -> Self {
let window_ns = duration_as_nanos(window);
let granularity_ns = duration_as_nanos(granularity);
assert!(window_ns > granularity_ns);
let now = clock.recent();
let bucket_count = ((window_ns / granularity_ns) as usize) + 1;
let mut buckets = Vec::new();
for _ in 0..bucket_count {
buckets.push(AtomicBucket::new());
}
WindowedRawHistogram {
let next_upkeep = now + granularity_ns;
AtomicWindowedHistogram {
buckets,
num_buckets,
bucket_index: 0,
last_upkeep: Instant::now(),
granularity,
bucket_count,
granularity: granularity_ns,
upkeep_index: AtomicUsize::new(0),
index: AtomicUsize::new(0),
next_upkeep: AtomicU64::new(next_upkeep),
clock,
}
}
pub fn upkeep(&mut self, at: Instant) {
if at >= self.last_upkeep + self.granularity {
self.bucket_index += 1;
self.bucket_index %= self.num_buckets;
self.buckets[self.bucket_index].clear();
self.last_upkeep = at;
/// Takes a snapshot of the current histogram.
///
/// Returns a [`StreamingIntegers`] value, representing all observed values in the
/// histogram. Because writes happen concurrently with buckets being cleared, a snapshot is
/// not guaranteed to contain every value present at the time the method was called.
pub fn snapshot(&self) -> StreamingIntegers {
// Run upkeep to make sure our window reflects any time passage since the last write.
let index = self.upkeep();
let mut streaming = StreamingIntegers::new();
// Start from the bucket ahead of the currently-being-written-to-bucket so that we outrace
// any upkeep and get access to more of the data.
for i in 0..self.bucket_count {
let bucket_index = (index + i + 1) % self.bucket_count;
let bucket = &self.buckets[bucket_index];
bucket.data_with(|block| streaming.compress(block));
}
streaming
}
pub fn update(&mut self, value: u64) {
self.buckets[self.bucket_index].push(value);
/// Records a value to the histogram.
pub fn record(&self, value: u64) {
let index = self.upkeep();
self.buckets[index].push(value);
}
pub fn snapshot(&self) -> HistogramSnapshot {
let mut aggregate = Vec::new();
for bucket in &self.buckets {
aggregate.extend_from_slice(&bucket);
fn upkeep(&self) -> usize {
let backoff = Backoff::new();
loop {
// Start by figuring out if the histogram needs to perform upkeep.
let now = self.clock.recent();
let next_upkeep = self.next_upkeep.load(Ordering::Acquire);
if now <= next_upkeep {
let index = self.index.load(Ordering::Acquire);
let actual_index = index % self.bucket_count;
return actual_index;
}
// We do need to perform upkeep, but someone *else* might actually be doing it already,
// so go ahead and wait until the index is caught up with the upkeep index: the upkeep
// index will be ahead of index until upkeep is complete.
let mut upkeep_in_progress = false;
let mut index = 0;
loop {
index = self.index.load(Ordering::Acquire);
let upkeep_index = self.upkeep_index.load(Ordering::Acquire);
if index == upkeep_index {
break;
}
upkeep_in_progress = true;
backoff.snooze();
}
// If we waited for another upkeep operation to complete, then there's the chance that
// enough time has passed that we're due for upkeep again, so restart our loop.
if upkeep_in_progress {
continue;
}
// Figure out how many buckets, up to the maximum, need to be cleared based on the
// delta between the target upkeep time and the actual time. We always clear at least
// one bucket, but may need to clear them all.
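// For example, with 1s granularity and 4.2s elapsed past `next_upkeep`, delta / granularity
// is 4 and bucket_depth is min(4, bucket_count) + 1; the min() caps the work when the
// histogram has sat idle for longer than the entire window.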
let delta = now - next_upkeep;
let bucket_depth = cmp::min((delta / self.granularity) as usize, self.bucket_count) + 1;
// Now that we know how many buckets we need to clear, update the index to point
// writers at the next bucket past the last one that we will be clearing.
let new_index = index + bucket_depth;
let prev_index = self
.index
.compare_and_swap(index, new_index, Ordering::SeqCst);
if prev_index == index {
// Clear the target bucket first, and then update the upkeep target time so new
// writers can proceed. We may still have other buckets to clean up if we had
// multiple rounds worth of upkeep to do, but this will let new writes proceed as
// soon as possible.
let clear_index = new_index % self.bucket_count;
self.buckets[clear_index].clear();
let now = self.clock.now();
let next_upkeep = now + self.granularity;
self.next_upkeep.store(next_upkeep, Ordering::Release);
// Now that we've cleared the actual bucket that writers will use going forward, we
// have to clear any older buckets that we skipped over. If our granularity was 1
// second, and we skipped over 4 seconds worth of buckets, we would still have
// 3 buckets to clear, etc.
let last_index = new_index - 1;
while index < last_index {
index += 1;
let clear_index = index % self.bucket_count;
self.buckets[clear_index].clear();
}
// We've cleared the old buckets, so upkeep is done. Push our upkeep index forward
// so that writers who were blocked waiting for upkeep to conclude can restart.
self.upkeep_index.store(new_index, Ordering::Release);
}
}
HistogramSnapshot::new(aggregate)
}
}
/// A point-in-time snapshot of a single histogram.
#[derive(Debug, PartialEq, Eq)]
pub struct HistogramSnapshot {
values: Vec<u64>,
}
impl HistogramSnapshot {
pub(crate) fn new(values: Vec<u64>) -> Self {
HistogramSnapshot { values }
}
/// Gets the raw values that comprise the entire histogram.
pub fn values(&self) -> &Vec<u64> {
&self.values
}
}
#[cfg(test)]
mod tests {
use super::{Histogram, ScopedKey, WindowedRawHistogram};
use std::time::{Duration, Instant};
use super::{AtomicWindowedHistogram, Clock};
use crossbeam_utils::thread;
use std::time::Duration;
#[test]
fn test_histogram_simple_update() {
let mut histogram = Histogram::new(Duration::new(5, 0), Duration::new(1, 0));
let (clock, _ctl) = Clock::mock();
let h = AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_secs(1), clock);
let key = ScopedKey(0, "foo".into());
histogram.update(key, 1245);
h.record(1245);
let values = histogram.values();
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 1);
let values = snapshot.decompress();
assert_eq!(values.len(), 1);
let hdr = &values[0].1;
assert_eq!(hdr.values().len(), 1);
assert_eq!(hdr.values().get(0).unwrap(), &1245);
assert_eq!(values.get(0).unwrap(), &1245);
}
#[test]
fn test_histogram_complex_update() {
let mut histogram = Histogram::new(Duration::new(5, 0), Duration::new(1, 0));
let (clock, _ctl) = Clock::mock();
let h = AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_secs(1), clock);
let key = ScopedKey(0, "foo".into());
histogram.update(key.clone(), 1245);
histogram.update(key.clone(), 213);
histogram.update(key.clone(), 1022);
histogram.update(key, 1248);
h.record(1245);
h.record(213);
h.record(1022);
h.record(1248);
let values = histogram.values();
assert_eq!(values.len(), 1);
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 4);
let hdr = &values[0].1;
assert_eq!(hdr.values().len(), 4);
assert_eq!(hdr.values().get(0).unwrap(), &1245);
assert_eq!(hdr.values().get(1).unwrap(), &213);
assert_eq!(hdr.values().get(2).unwrap(), &1022);
assert_eq!(hdr.values().get(3).unwrap(), &1248);
let values = snapshot.decompress();
assert_eq!(values.len(), 4);
assert_eq!(values.get(0).unwrap(), &1245);
assert_eq!(values.get(1).unwrap(), &213);
assert_eq!(values.get(2).unwrap(), &1022);
assert_eq!(values.get(3).unwrap(), &1248);
}
#[test]
fn test_windowed_histogram_rollover() {
let mut wh = WindowedRawHistogram::new(Duration::new(5, 0), Duration::new(1, 0));
let now = Instant::now();
let (clock, ctl) = Clock::mock();
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 0);
// Set our granularity to right below a second, so that when we add a second, we don't
// land on the same exact value, and our "now" time should always be ahead of the upkeep
// time when we expect it to be.
let h =
AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_millis(999), clock);
wh.update(1);
wh.update(2);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 2);
// Histogram is empty, snapshot is empty.
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 0);
// Immediately add two values, and observe the histogram and snapshot having two values.
h.record(1);
h.record(2);
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 2);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 3);
// Roll forward 3 seconds, should still have everything.
let now = now + Duration::new(1, 0);
wh.upkeep(now);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 2);
ctl.increment(Duration::from_secs(3));
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 2);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 3);
let now = now + Duration::new(1, 0);
wh.upkeep(now);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 2);
// Roll forward 1 second, should still have everything.
ctl.increment(Duration::from_secs(1));
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 2);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 3);
let now = now + Duration::new(1, 0);
wh.upkeep(now);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 2);
// Roll forward 1 second, should still have everything.
ctl.increment(Duration::from_secs(1));
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 2);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 3);
// Pump in some new values.
wh.update(3);
wh.update(4);
wh.update(5);
// Pump in some new values. We should have a total of 5 values now.
h.record(3);
h.record(4);
h.record(5);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 5);
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 5);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 15);
// Roll forward 3 seconds, and make sure the first two values are gone.
// You might think this should be 2 seconds, but we have one extra bucket
// allocated so that there's always a clear bucket that we can write into.
// This means we have more than our total window, but only having the exact
// number of buckets would mean we were constantly missing a bucket's worth
// of granularity.
let now = now + Duration::new(1, 0);
wh.upkeep(now);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 5);
// Roll forward 6 seconds, in increments. The first one rolls over a single bucket, and
// cleans bucket #0, the first one we wrote to. The second and third ones get us right up
// to the last three values, and then clear them out.
ctl.increment(Duration::from_secs(1));
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 3);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 12);
let now = now + Duration::new(1, 0);
wh.upkeep(now);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 5);
ctl.increment(Duration::from_secs(4));
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 3);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 12);
let now = now + Duration::new(1, 0);
wh.upkeep(now);
let snapshot = wh.snapshot();
assert_eq!(snapshot.values().len(), 3);
ctl.increment(Duration::from_secs(1));
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 0);
// We should also be able to advance by vast periods of time and observe not only old
// values going away but also no weird overflow or indexing issues. This ensures that
// our upkeep code handles not just under-load single-bucket rollovers but also "been
// idle for a while and just got a write" scenarios.
h.record(42);
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 1);
let total: u64 = snapshot.decompress().iter().sum();
assert_eq!(total, 42);
ctl.increment(Duration::from_secs(1000));
let snapshot = h.snapshot();
assert_eq!(snapshot.len(), 0);
}
#[test]
fn test_histogram_write_gauntlet_mt() {
let clock = Clock::new();
let clock2 = clock.clone();
let target = clock.now() + Duration::from_secs(5).as_nanos() as u64;
let h = AtomicWindowedHistogram::new(
Duration::from_secs(20),
Duration::from_millis(500),
clock,
);
thread::scope(|s| {
let t1 = s.spawn(|_| {
let mut total = 0;
while clock2.now() < target {
h.record(42);
total += 1;
}
total
});
let t2 = s.spawn(|_| {
let mut total = 0;
while clock2.now() < target {
h.record(42);
total += 1;
}
total
});
let t3 = s.spawn(|_| {
let mut total = 0;
while clock2.now() < target {
h.record(42);
total += 1;
}
total
});
let t1_total = t1.join().expect("thread 1 panicked during test");
let t2_total = t2.join().expect("thread 2 panicked during test");
let t3_total = t3.join().expect("thread 3 panicked during test");
let total = t1_total + t2_total + t3_total;
let snap = h.snapshot();
assert_eq!(total, snap.len());
})
.unwrap();
}
}
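To make the windowing behavior concrete, here is a small sketch using the same mock-clock pattern as the tests above; the durations and values are illustrative only.

use metrics::data::AtomicWindowedHistogram;
use quanta::Clock;
use std::time::Duration;

fn main() {
    // A 5s window with 1s granularity allocates (5 / 1) + 1 = 6 buckets; the extra bucket
    // guarantees there is always a freshly cleared bucket ready to be written into.
    let (clock, ctl) = Clock::mock();
    let h = AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_secs(1), clock);

    h.record(100);
    assert_eq!(h.snapshot().len(), 1);

    // Advance well past the window; the next access runs upkeep and clears every bucket.
    ctl.increment(Duration::from_secs(6));
    assert_eq!(h.snapshot().len(), 0);
}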

View File

@ -1,73 +1,12 @@
use std::{
borrow::Cow,
fmt::{self, Display},
};
//! Core data types for metrics.
mod counter;
pub use counter::Counter;
pub mod counter;
pub mod gauge;
pub mod histogram;
pub mod snapshot;
mod gauge;
pub use gauge::Gauge;
pub(crate) use self::{counter::Counter, gauge::Gauge, histogram::Histogram, snapshot::Snapshot};
mod histogram;
pub use histogram::{AtomicWindowedHistogram, Histogram};
pub type MetricKey = Cow<'static, str>;
/// A measurement.
///
/// Samples are the decoupled way of submitting data into the sink.
#[derive(Debug)]
pub(crate) enum Sample {
/// A counter delta.
///
/// The value is added directly to the existing counter, and so negative deltas will decrease
/// the counter, and positive deltas will increase the counter.
Count(ScopedKey, u64),
/// A single value, also known as a gauge.
///
/// Values operate in last-write-wins mode.
///
/// Values themselves cannot be incremented or decremented, so you must hold them externally
/// before sending them.
Gauge(ScopedKey, i64),
/// A timed sample.
///
/// Includes the start and end times.
TimingHistogram(ScopedKey, u64, u64),
/// A single value measured over time.
///
/// Unlike a gauge, where the value is only ever measured at a point in time, value histogram
/// measure values over time, and their distribution. This is nearly identical to timing
/// histograms, since the end result is just a single number, but we don't spice it up with
/// special unit labels or anything.
ValueHistogram(ScopedKey, u64),
}
/// An integer scoped metric key.
#[derive(Clone, Hash, PartialEq, Eq, Debug)]
pub(crate) struct ScopedKey(pub u64, pub MetricKey);
impl ScopedKey {
pub(crate) fn id(&self) -> u64 {
self.0
}
pub(crate) fn into_string_scoped(self, scope: String) -> StringScopedKey {
StringScopedKey(scope, self.1)
}
}
/// A string scoped metric key.
#[derive(Clone, Hash, PartialEq, Eq, Debug)]
pub(crate) struct StringScopedKey(String, MetricKey);
impl Display for StringScopedKey {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.0.is_empty() {
write!(f, "{}", self.1)
} else {
write!(f, "{}.{}", self.0, self.1.as_ref())
}
}
}
mod snapshot;
pub use snapshot::Snapshot;

View File

@ -1,86 +1,28 @@
use super::histogram::HistogramSnapshot;
use crate::common::ValueSnapshot;
use metrics_core::{Recorder, Snapshot as MetricsSnapshot};
use std::fmt::Display;
/// A typed metric measurement, used in snapshots.
///
/// This type provides a way to wrap the value of a metric, for use in a snapshot, while also
/// providing the overall type of the metric, so that downstream consumers know how to properly
/// format the data.
#[derive(Debug, PartialEq, Eq)]
pub enum TypedMeasurement {
Counter(String, u64),
Gauge(String, i64),
TimingHistogram(String, HistogramSnapshot),
ValueHistogram(String, HistogramSnapshot),
}
/// A point-in-time view of metric data.
#[derive(Default, Debug)]
pub struct Snapshot {
measurements: Vec<TypedMeasurement>,
measurements: Vec<(String, ValueSnapshot)>,
}
impl Snapshot {
/// Stores a counter value for the given metric key.
pub(crate) fn set_count<T>(&mut self, key: T, value: u64)
where
T: Display,
{
self.measurements
.push(TypedMeasurement::Counter(key.to_string(), value));
}
/// Stores a gauge value for the given metric key.
pub(crate) fn set_gauge<T>(&mut self, key: T, value: i64)
where
T: Display,
{
self.measurements
.push(TypedMeasurement::Gauge(key.to_string(), value));
}
/// Sets timing percentiles for the given metric key.
///
/// From the given `HdrHistogram`, all the specific `percentiles` will be extracted and stored.
pub(crate) fn set_timing_histogram<T>(&mut self, key: T, h: HistogramSnapshot)
where
T: Display,
{
self.measurements
.push(TypedMeasurement::TimingHistogram(key.to_string(), h));
}
/// Sets value percentiles for the given metric key.
///
/// From the given `HdrHistogram`, all the specific `percentiles` will be extracted and stored.
pub(crate) fn set_value_histogram<T>(&mut self, key: T, h: HistogramSnapshot)
where
T: Display,
{
self.measurements
.push(TypedMeasurement::ValueHistogram(key.to_string(), h));
}
/// Converts this [`Snapshot`] to the underlying vector of measurements.
pub fn into_measurements(self) -> Vec<TypedMeasurement> {
self.measurements
pub(crate) fn from(from: Vec<(String, ValueSnapshot)>) -> Self {
Snapshot { measurements: from }
}
}
impl MetricsSnapshot for Snapshot {
/// Records the snapshot to the given recorder.
fn record<R: Recorder>(&self, recorder: &mut R) {
for measurement in &self.measurements {
match measurement {
TypedMeasurement::Counter(key, value) => recorder.record_counter(key, *value),
TypedMeasurement::Gauge(key, value) => recorder.record_gauge(key, *value),
TypedMeasurement::TimingHistogram(key, hs) => {
recorder.record_histogram(key, hs.values().as_slice());
}
TypedMeasurement::ValueHistogram(key, hs) => {
recorder.record_histogram(key, hs.values().as_slice());
}
for (key, snapshot) in &self.measurements {
match snapshot {
ValueSnapshot::Counter(value) => recorder.record_counter(key, *value),
ValueSnapshot::Gauge(value) => recorder.record_gauge(key, *value),
ValueSnapshot::Histogram(stream) => stream.decompress_with(|values| {
recorder.record_histogram(key, values);
}),
}
}
}
@ -88,7 +30,8 @@ impl MetricsSnapshot for Snapshot {
#[cfg(test)]
mod tests {
use super::{HistogramSnapshot, MetricsSnapshot, Recorder, Snapshot, TypedMeasurement};
use super::{MetricsSnapshot, Recorder, Snapshot, ValueSnapshot};
use metrics_util::StreamingIntegers;
use std::collections::HashMap;
#[derive(Default)]
@ -128,29 +71,19 @@ mod tests {
}
}
#[test]
fn test_snapshot_simple_set_and_get() {
let key = "ok".to_owned();
let mut snapshot = Snapshot::default();
snapshot.set_count(key.clone(), 1);
snapshot.set_gauge(key.clone(), 42);
let values = snapshot.into_measurements();
assert_eq!(values[0], TypedMeasurement::Counter(key.clone(), 1));
assert_eq!(values[1], TypedMeasurement::Gauge(key.clone(), 42));
}
#[test]
fn test_snapshot_recorder() {
let key = "ok".to_owned();
let mut snapshot = Snapshot::default();
snapshot.set_count(key.clone(), 7);
snapshot.set_gauge(key.clone(), 42);
let mut measurements = Vec::new();
measurements.push((key.clone(), ValueSnapshot::Counter(7)));
measurements.push((key.clone(), ValueSnapshot::Gauge(42)));
let hvalues = vec![10, 25, 42, 97];
let histogram = HistogramSnapshot::new(hvalues);
snapshot.set_timing_histogram(key.clone(), histogram);
let mut stream = StreamingIntegers::new();
stream.compress(&hvalues);
measurements.push((key.clone(), ValueSnapshot::Histogram(stream)));
let snapshot: Snapshot = Snapshot::from(measurements);
let mut recorder = MockRecorder::default();
snapshot.record(&mut recorder);

View File

@ -1,12 +1,4 @@
use std::{
io::{Error, ErrorKind},
time::Duration,
};
/// Helpers to create an I/O error from a string.
pub fn io_error(reason: &str) -> Error {
Error::new(ErrorKind::Other, reason)
}
use std::time::Duration;
/// Converts a duration to nanoseconds.
pub fn duration_as_nanos(d: Duration) -> u64 {

View File

@ -7,16 +7,18 @@
//!
//! The library follows a pattern of "senders" and a "receiver."
//!
//! Callers create a [`Receiver`], which acts as a contained unit: metric registration,
//! aggregation, and summarization. The [`Receiver`] is intended to be spawned onto a dedicated
//! background thread.
//! Callers create a [`Receiver`], which acts as a registry for all metrics that flow through it.
//! It allows creating new sinks as well as controllers, both necessary to push in and pull out
//! metrics from the system. It also manages background resources necessary for the registry to
//! operate.
//!
//! Once a [`Receiver`] is created, callers can either create a [`Sink`] for sending metrics, or a
//! [`Controller`] for getting metrics out.
//!
//! A [`Sink`] can be cheaply cloned and does not require a mutable reference to send metrics, so
//! callers have increased flexibility in usage and control over whether or not to clone sinks,
//! share references, etc.
//! A [`Sink`] can be cheaply cloned, and offers convenience methods for getting the current time
//! as well as getting direct handles to a given metric. This allows users to either work with the
//! fuller API exposed by [`Sink`] or to take a compositional approach and embed fields that
//! represent each particular metric to be sent.
//!
//! A [`Controller`] provides both a synchronous and asynchronous snapshotting interface, which is
//! [`metrics-core`][metrics_core] compatible for exporting. This allows flexibility in
@ -25,8 +27,11 @@
//!
//! # Performance
//!
//! Being based on [`crossbeam-channel`][crossbeam_channel] allows us to process close to ten
//! million metrics per second using a single core, with average ingest latencies of around 100ns.
//! Users can expect to be able to send tens of millions of samples per second, with ingest
//! latencies at roughly 65-70ns at p50, and 250ns at p99. Depending on the workload -- counters
//! vs histograms -- latencies may be even lower, as counters and gauges are markedly faster to
//! update than histograms. Concurrent updates of the same metric will also cause natural
//! contention, lowering throughput and increasing ingestion latency.
//!
//! # Metrics
//!
@ -39,8 +44,8 @@
//! # extern crate metrics;
//! use metrics::Receiver;
//! use std::{thread, time::Duration};
//! let receiver = Receiver::builder().build();
//! let sink = receiver.get_sink();
//! let receiver = Receiver::builder().build().expect("failed to create receiver");
//! let mut sink = receiver.get_sink();
//!
//! // We can update a counter. Counters are monotonic, unsigned integers that start at 0 and
//! // increase over time.
@ -54,18 +59,18 @@
//! // which utilizes a high-speed internal clock. This method returns the time in nanoseconds, so
//! // we get great resolution, but giving the time in nanoseconds isn't required! If you want to
//! // send it in another unit, that's fine, but just pay attention to that fact when viewing and
//! // using those metrics once exported.
//! // using those metrics once exported. We also support passing `Instant` values -- both `start`
//! // and `end` need to be the same type, though! -- and we'll take the nanosecond output of that.
//! let start = sink.now();
//! thread::sleep(Duration::from_millis(10));
//! let end = sink.now();
//! sink.record_timing("db.gizmo_query", start, end);
//! sink.record_timing("db.queries.select_products_ns", start, end);
//!
//! // Finally, we can update a value histogram. Technically speaking, value histograms aren't
//! // fundamentally different from timing histograms. If you use a timing histogram, we do the
//! // math for you of getting the time difference, and we make sure the metric name has the right
//! // unit suffix so you can tell it's measuring time, but other than that, nearly identical!
//! let buf_size = 4096;
//! sink.record_value("buf_size", buf_size);
//! // math for you of getting the time difference, but other than that, identical under the hood.
//! let row_count = 46;
//! sink.record_value("db.queries.select_products_num_rows", row_count);
//! ```
//!
//! # Scopes
@ -82,24 +87,24 @@
//! ```
//! # extern crate metrics;
//! use metrics::Receiver;
//! let receiver = Receiver::builder().build();
//! let receiver = Receiver::builder().build().expect("failed to create receiver");
//!
//! // This sink has no scope aka the root scope. The metric will just end up as "widgets".
//! let root_sink = receiver.get_sink();
//! let mut root_sink = receiver.get_sink();
//! root_sink.record_count("widgets", 42);
//!
//! // This sink is under the "secret" scope. Since we derived ourselves from the root scope,
//! // we're not nested under anything, but our metric name will end up being "secret.widgets".
//! let scoped_sink = root_sink.scoped("secret");
//! let mut scoped_sink = root_sink.scoped("secret");
//! scoped_sink.record_count("widgets", 42);
//!
//! // This sink is under the "supersecret" scope, but we're also nested! The metric name for this
//! // sample will end up being "secret.supersecret.widget".
//! let scoped_sink_two = scoped_sink.scoped("supersecret");
//! let mut scoped_sink_two = scoped_sink.scoped("supersecret");
//! scoped_sink_two.record_count("widgets", 42);
//!
//! // Sinks retain their scope even when cloned, so the metric name will be the same as above.
//! let cloned_sink = scoped_sink_two.clone();
//! let mut cloned_sink = scoped_sink_two.clone();
//! cloned_sink.record_count("widgets", 42);
//!
//! // This sink will be nested two levels deeper than its parent by using a slightly different
@ -107,24 +112,72 @@
//! // nesting N levels deep.
//! //
//! // This metric name will end up being "super.secret.ultra.special.widgets".
//! let scoped_sink_three = scoped_sink.scoped(&["super", "secret", "ultra", "special"]);
//! let mut scoped_sink_three = scoped_sink.scoped(&["super", "secret", "ultra", "special"]);
//! scoped_sink_two.record_count("widgets", 42);
//! ```
//!
//! [crossbeam_channel]: https://docs.rs/crossbeam-channel
//! # Snapshots
//!
//! Naturally, we need a way to get the metrics out of the system, which is where snapshots come
//! into play. By utilizing a [`Controller`], we can take a snapshot of the current metrics in the
//! registry, and then output them to any desired system/interface by utilizing
//! [`Recorder`](metrics_core::Recorder). A number of pre-baked recorders (which only concern
//! themselves with formatting the data) and exporters (which take the formatted data and either
//! serve it up, such as exposing an HTTP endpoint, or write it somewhere, like stdout) are
//! available, some of which are exposed by this crate.
//!
//! Let's take an example of writing out our metrics in a yaml-like format, writing them via
//! `log!`:
//! ```
//! # extern crate metrics;
//! use metrics::{Receiver, recorders::TextRecorder, exporters::LogExporter};
//! use log::Level;
//! use std::{thread, time::Duration};
//! let receiver = Receiver::builder().build().expect("failed to create receiver");
//! let mut sink = receiver.get_sink();
//!
//! // We can update a counter. Counters are monotonic, unsigned integers that start at 0 and
//! // increase over time.
//! // Take some measurements, similar to what we had in other examples:
//! sink.record_count("widgets", 5);
//! sink.record_gauge("red_balloons", 99);
//!
//! let start = sink.now();
//! thread::sleep(Duration::from_millis(10));
//! let end = sink.now();
//! sink.record_timing("db.queries.select_products_ns", start, end);
//! sink.record_timing("db.gizmo_query", start, end);
//!
//! let num_rows = 46;
//! sink.record_value("db.queries.select_products_num_rows", num_rows);
//!
//! // Now create our exporter/recorder configuration, and wire it up.
//! let exporter = LogExporter::new(receiver.get_controller(), TextRecorder::new(), Level::Info);
//!
//! // This exporter will now run every 5 seconds, taking a snapshot, rendering it, and writing it
//! // via `log!` at the informational level. This particular exporter is running directly on the
//! // current thread, and not on a background thread.
//! //
//! // exporter.run(Duration::from_secs(5));
//! ```
//! Most exporters have the ability to run on the current thread or to be converted into a future
//! which can be spawned on any Tokio-compatible runtime.
//!
//! [metrics_core]: https://docs.rs/metrics-core
mod configuration;
//! [`Recorder`]: https://docs.rs/metrics-core/0.3.1/metrics_core/trait.Recorder.html
#![deny(missing_docs)]
#![warn(unused_extern_crates)]
mod builder;
mod common;
mod config;
mod control;
mod data;
pub mod data;
mod helper;
mod receiver;
mod scopes;
mod registry;
mod sink;
#[cfg(any(
feature = "metrics-exporter-log",
feature = "metrics-exporter-http"
))]
#[cfg(any(feature = "metrics-exporter-log", feature = "metrics-exporter-http"))]
pub mod exporters;
#[cfg(any(
@ -134,13 +187,9 @@ pub mod exporters;
pub mod recorders;
pub use self::{
configuration::Configuration,
builder::{Builder, BuilderError},
common::{Delta, MetricName, MetricScope},
control::{Controller, SnapshotError},
data::histogram::HistogramSnapshot,
receiver::Receiver,
sink::{AsScoped, Sink, SinkError},
};
pub mod snapshot {
pub use super::data::snapshot::{Snapshot, TypedMeasurement};
}

View File

@ -1,220 +1,69 @@
use crate::{
configuration::Configuration,
control::{ControlFrame, Controller},
data::{Counter, Gauge, Histogram, Sample, ScopedKey, Snapshot, StringScopedKey},
scopes::Scopes,
builder::{Builder, BuilderError},
common::MetricScope,
config::Configuration,
control::Controller,
registry::{MetricRegistry, ScopeRegistry},
sink::Sink,
};
use crossbeam_channel::{self, bounded, tick, Select, TryRecvError};
use quanta::Clock;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use quanta::{Builder as UpkeepBuilder, Clock, Handle as UpkeepHandle};
use std::sync::Arc;
/// Wrapper for all messages that flow over the data channel between sink/receiver.
pub(crate) enum MessageFrame {
/// A normal data message holding a metric sample.
Data(Sample),
}
/// Metrics receiver which aggregates and processes samples.
/// Central store for metrics.
///
/// `Receiver` is the nucleus for all metrics operations. While no operations are performed by it
/// directly, it holds the registries and references to resources and so it must live as long as
/// any [`Sink`] or [`Controller`] does.
pub struct Receiver {
config: Configuration,
// Sample aggregation machinery.
msg_tx: crossbeam_channel::Sender<MessageFrame>,
msg_rx: Option<crossbeam_channel::Receiver<MessageFrame>>,
control_tx: crossbeam_channel::Sender<ControlFrame>,
control_rx: Option<crossbeam_channel::Receiver<ControlFrame>>,
// Metric machinery.
counter: Counter,
gauge: Gauge,
thistogram: Histogram,
vhistogram: Histogram,
metric_registry: Arc<MetricRegistry>,
scope_registry: Arc<ScopeRegistry>,
clock: Clock,
scopes: Arc<Scopes>,
_upkeep_handle: UpkeepHandle,
}
impl Receiver {
pub(crate) fn from_config(config: Configuration) -> Receiver {
// Create our data, control, and buffer channels.
let (msg_tx, msg_rx) = bounded(config.capacity);
let (control_tx, control_rx) = bounded(16);
pub(crate) fn from_config(config: Configuration) -> Result<Receiver, BuilderError> {
// Configure our clock and the quanta upkeep thread. The upkeep thread keeps a cached copy of
// the current time that stays within `upkeep_interval` of the true time. Reads of this cached
// time are faster than calling the underlying time source directly, and for histogram
// windowing we can afford a much coarser value than the raw nanosecond precision quanta
// provides by default.
let clock = Clock::new();
let upkeep = UpkeepBuilder::new_with_clock(config.upkeep_interval, clock.clone());
let _upkeep_handle = upkeep.start().map_err(|_| BuilderError::UpkeepFailure)?;
let histogram_window = config.histogram_window;
let histogram_granularity = config.histogram_granularity;
Receiver {
let scope_registry = Arc::new(ScopeRegistry::new());
let metric_registry = Arc::new(MetricRegistry::new(
scope_registry.clone(),
config,
msg_tx,
msg_rx: Some(msg_rx),
control_tx,
control_rx: Some(control_rx),
counter: Counter::new(),
gauge: Gauge::new(),
thistogram: Histogram::new(histogram_window, histogram_granularity),
vhistogram: Histogram::new(histogram_window, histogram_granularity),
clock: Clock::new(),
scopes: Arc::new(Scopes::new()),
}
clock.clone(),
));
Ok(Receiver {
metric_registry,
scope_registry,
clock,
_upkeep_handle,
})
}
/// Gets a builder to configure a [`Receiver`] instance with.
pub fn builder() -> Configuration {
Configuration::default()
/// Creates a new [`Builder`] for building a [`Receiver`].
pub fn builder() -> Builder {
Builder::default()
}
/// Creates a [`Sink`] bound to this receiver.
pub fn get_sink(&self) -> Sink {
Sink::new_with_scope_id(
self.msg_tx.clone(),
Sink::new(
self.metric_registry.clone(),
self.scope_registry.clone(),
MetricScope::Root,
self.clock.clone(),
self.scopes.clone(),
"".to_owned(),
0,
)
}
/// Creates a [`Controller`] bound to this receiver.
pub fn get_controller(&self) -> Controller {
Controller::new(self.control_tx.clone())
}
/// Run the receiver.
///
/// This is blocking, and should be run in a dedicated background thread.
pub fn run(&mut self) {
let batch_size = self.config.batch_size;
let mut batch = Vec::with_capacity(batch_size);
let upkeep_rx = tick(Duration::from_millis(100));
let control_rx = self.control_rx.take().expect("failed to take control rx");
let msg_rx = self.msg_rx.take().expect("failed to take msg rx");
let mut selector = Select::new();
let _ = selector.recv(&upkeep_rx);
let _ = selector.recv(&control_rx);
let _ = selector.recv(&msg_rx);
loop {
// Block on having something to do.
let _ = selector.ready();
if upkeep_rx.try_recv().is_ok() {
let now = Instant::now();
self.thistogram.upkeep(now);
self.vhistogram.upkeep(now);
}
while let Ok(cframe) = control_rx.try_recv() {
self.process_control_frame(cframe);
}
loop {
match msg_rx.try_recv() {
Ok(mframe) => batch.push(mframe),
Err(TryRecvError::Empty) => break,
Err(e) => eprintln!("error receiving message frame: {}", e),
}
if batch.len() == batch_size {
break;
}
}
if !batch.is_empty() {
for mframe in batch.drain(0..) {
self.process_msg_frame(mframe);
}
}
}
}
/// Gets the string representation of an integer scope.
///
/// Returns `Some(scope)` if found, `None` otherwise. Scope ID `0` is reserved for the root
/// scope.
fn get_string_scope(&self, key: ScopedKey) -> Option<StringScopedKey> {
let scope_id = key.id();
if scope_id == 0 {
return Some(key.into_string_scoped("".to_owned()));
}
self.scopes
.get(scope_id)
.map(|scope| key.into_string_scoped(scope))
}
/// Gets a snapshot of the current metrics/facets.
fn get_snapshot(&self) -> Snapshot {
let mut snapshot = Snapshot::default();
let cvalues = self.counter.values();
let gvalues = self.gauge.values();
let tvalues = self.thistogram.values();
let vvalues = self.vhistogram.values();
for (key, value) in cvalues {
if let Some(actual_key) = self.get_string_scope(key) {
snapshot.set_count(actual_key, value);
}
}
for (key, value) in gvalues {
if let Some(actual_key) = self.get_string_scope(key) {
snapshot.set_gauge(actual_key, value);
}
}
for (key, value) in tvalues {
if let Some(actual_key) = self.get_string_scope(key) {
snapshot.set_timing_histogram(actual_key, value);
}
}
for (key, value) in vvalues {
if let Some(actual_key) = self.get_string_scope(key) {
snapshot.set_value_histogram(actual_key, value);
}
}
snapshot
}
/// Processes a control frame.
fn process_control_frame(&self, msg: ControlFrame) {
match msg {
ControlFrame::Snapshot(tx) => {
let snapshot = self.get_snapshot();
let _ = tx.send(snapshot);
}
ControlFrame::SnapshotAsync(tx) => {
let snapshot = self.get_snapshot();
let _ = tx.send(snapshot);
}
}
}
/// Processes a message frame.
fn process_msg_frame(&mut self, msg: MessageFrame) {
match msg {
MessageFrame::Data(sample) => match sample {
Sample::Count(key, count) => {
self.counter.update(key, count);
}
Sample::Gauge(key, value) => {
self.gauge.update(key, value);
}
Sample::TimingHistogram(key, start, end) => {
let delta = end - start;
self.counter.update(key.clone(), 1);
self.thistogram.update(key, delta);
}
Sample::ValueHistogram(key, value) => {
self.vhistogram.update(key, value);
}
},
}
Controller::new(self.metric_registry.clone(), self.scope_registry.clone())
}
}
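Since there is no longer a run() loop to spawn, wiring the receiver up is just construction; the only background resource is the quanta upkeep thread started in from_config. A minimal sketch, with the metric names purely illustrative:

use metrics::Receiver;

fn main() {
    // `build()` is now fallible: it errors if the quanta upkeep thread cannot be started.
    let receiver = Receiver::builder().build().expect("failed to create receiver");

    // Sinks and controllers share the registries owned by the receiver; there is no
    // dedicated thread to spawn and no channel to drain.
    let mut sink = receiver.get_sink();
    let _controller = receiver.get_controller();

    sink.record_count("widgets", 1);
    sink.record_gauge("red_balloons", 99);
}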

View File

@ -0,0 +1,86 @@
use crate::common::{MetricIdentifier, MetricKind, MetricValue};
use crate::config::Configuration;
use crate::data::Snapshot;
use crate::registry::ScopeRegistry;
use arc_swap::{ptr_eq, ArcSwap};
use im::hashmap::HashMap;
use quanta::Clock;
use std::ops::Deref;
use std::sync::Arc;
pub(crate) struct MetricRegistry {
scope_registry: Arc<ScopeRegistry>,
metrics: ArcSwap<HashMap<MetricIdentifier, MetricValue>>,
config: Configuration,
clock: Clock,
}
impl MetricRegistry {
pub fn new(scope_registry: Arc<ScopeRegistry>, config: Configuration, clock: Clock) -> Self {
MetricRegistry {
scope_registry,
metrics: ArcSwap::new(Arc::new(HashMap::new())),
config,
clock,
}
}
pub fn get_value_handle(&self, identifier: MetricIdentifier) -> MetricValue {
loop {
match self.metrics.lease().deref().get(&identifier) {
Some(handle) => return handle.clone(),
None => {
let kind = match &identifier {
MetricIdentifier::Unlabeled(_, _, kind) => kind,
};
let value_handle = match kind {
MetricKind::Counter => MetricValue::counter(),
MetricKind::Gauge => MetricValue::gauge(),
MetricKind::Histogram => MetricValue::histogram(
self.config.histogram_window,
self.config.histogram_granularity,
self.clock.clone(),
),
};
let metrics_ptr = self.metrics.lease();
let mut metrics = metrics_ptr.deref().clone();
match metrics.insert(identifier.clone(), value_handle.clone()) {
// Somebody else beat us to it, loop.
Some(_) => continue,
None => {
// If we weren't able to cleanly update the map, then try again.
let old = self
.metrics
.compare_and_swap(&metrics_ptr, Arc::new(metrics));
if !ptr_eq(old, metrics_ptr) {
continue;
}
}
}
return value_handle;
}
}
}
}
pub fn get_snapshot(&self) -> Snapshot {
let mut named_values = Vec::new();
let metrics = self.metrics.load().deref().clone();
for (identifier, value) in metrics.into_iter() {
let (name, scope_handle) = match identifier {
MetricIdentifier::Unlabeled(name, scope, _) => (name, scope),
};
let scope = self.scope_registry.get(scope_handle);
let scoped_name = scope.into_scoped(name);
let snapshot = value.snapshot();
named_values.push((scoped_name, snapshot));
}
Snapshot::from(named_values)
}
}
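The registry relies on a copy-on-write map: readers lease the current Arc'd map, while writers clone it, insert, and publish the copy only if nothing changed in the meantime. A stripped-down sketch of that pattern outside the registry, mirroring the arc-swap and im calls used above (illustrative only):

use arc_swap::{ptr_eq, ArcSwap};
use im::hashmap::HashMap;
use std::ops::Deref;
use std::sync::Arc;

fn main() {
    let map: ArcSwap<HashMap<&'static str, u64>> = ArcSwap::new(Arc::new(HashMap::new()));

    loop {
        // Take a cheap lease on the current map and build an updated copy.
        let current = map.lease();
        let mut updated = current.deref().clone();
        updated.insert("requests", 1);

        // Publish the copy only if nobody swapped the map out from under us.
        let previous = map.compare_and_swap(&current, Arc::new(updated));
        if ptr_eq(previous, current) {
            break;
        }
    }

    assert_eq!(map.load().deref().get("requests"), Some(&1));
}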

View File

@ -0,0 +1,5 @@
mod scope;
pub(crate) use self::scope::ScopeRegistry;
mod metric;
pub(crate) use self::metric::MetricRegistry;

View File

@ -0,0 +1,57 @@
use crate::common::{MetricScope, MetricScopeHandle};
use parking_lot::RwLock;
use std::collections::HashMap;
struct Inner {
id: u64,
forward: HashMap<MetricScope, MetricScopeHandle>,
backward: HashMap<MetricScopeHandle, MetricScope>,
}
impl Inner {
pub fn new() -> Self {
Inner {
id: 1,
forward: HashMap::new(),
backward: HashMap::new(),
}
}
}
pub(crate) struct ScopeRegistry {
inner: RwLock<Inner>,
}
impl ScopeRegistry {
pub fn new() -> Self {
Self {
inner: RwLock::new(Inner::new()),
}
}
pub fn register(&self, scope: MetricScope) -> u64 {
let mut wg = self.inner.write();
// If the key is already registered, send back the existing scope ID.
if wg.forward.contains_key(&scope) {
return wg.forward.get(&scope).cloned().unwrap();
}
// Otherwise, take the current scope ID for this registration, store it, and increment
// the scope ID counter for the next registration.
let scope_id = wg.id;
let _ = wg.forward.insert(scope.clone(), scope_id);
let _ = wg.backward.insert(scope_id, scope);
wg.id += 1;
scope_id
}
pub fn get(&self, scope_id: MetricScopeHandle) -> MetricScope {
// See if we have an entry for the scope ID, and clone the scope if so.
let rg = self.inner.read();
rg.backward
.get(&scope_id)
.cloned()
.unwrap_or(MetricScope::Root)
}
}

View File

@ -1,68 +1,72 @@
use crate::{
data::{MetricKey, Sample, ScopedKey},
helper::io_error,
receiver::MessageFrame,
scopes::Scopes,
common::{
Delta, MetricIdentifier, MetricKind, MetricName, MetricScope, MetricScopeHandle,
MetricValue,
},
data::{Counter, Gauge, Histogram},
registry::{MetricRegistry, ScopeRegistry},
};
use crossbeam_channel::Sender;
use fxhash::FxBuildHasher;
use quanta::Clock;
use std::error::Error;
use std::fmt;
use std::sync::Arc;
/// Erorrs during sink creation.
#[derive(Debug)]
type FastHashMap<K, V> = hashbrown::HashMap<K, V, FxBuildHasher>;
/// Errors during sink creation.
#[derive(Debug, Clone)]
pub enum SinkError {
/// The scope value given was invalid i.e. empty or illegal characters.
InvalidScope,
}
/// A value that can be used as a metric scope.
pub trait AsScoped<'a> {
fn as_scoped(&'a self, base: String) -> String;
impl Error for SinkError {}
impl fmt::Display for SinkError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SinkError::InvalidScope => write!(f, "given scope is invalid"),
}
}
}
/// Handle for sending metric samples into the receiver.
/// A value that can be used as a metric scope.
///
/// [`Sink`] is cloneable, and can not only send metric samples but can register and deregister
/// metric facets at any time.
/// This helper trait allows us to accept either a single string or a slice of strings to use as a
/// scope, to avoid needing to allocate in the case where we want to be able to specify multiple
/// scope levels in a single go.
pub trait AsScoped<'a> {
/// Creates a new [`MetricScope`] by adding `self` to the `base` scope.
fn as_scoped(&'a self, base: MetricScope) -> MetricScope;
}
/// Handle for sending metric samples.
pub struct Sink {
msg_tx: Sender<MessageFrame>,
metric_registry: Arc<MetricRegistry>,
metric_cache: FastHashMap<MetricIdentifier, MetricValue>,
scope_registry: Arc<ScopeRegistry>,
scope: MetricScope,
scope_handle: MetricScopeHandle,
clock: Clock,
scopes: Arc<Scopes>,
scope: String,
scope_id: u64,
}
impl Sink {
pub(crate) fn new(
msg_tx: Sender<MessageFrame>,
metric_registry: Arc<MetricRegistry>,
scope_registry: Arc<ScopeRegistry>,
scope: MetricScope,
clock: Clock,
scopes: Arc<Scopes>,
scope: String,
) -> Sink {
let scope_id = scopes.register(scope.clone());
let scope_handle = scope_registry.register(scope.clone());
Sink {
msg_tx,
clock,
scopes,
metric_registry,
metric_cache: FastHashMap::default(),
scope_registry,
scope,
scope_id,
}
}
pub(crate) fn new_with_scope_id(
msg_tx: Sender<MessageFrame>,
clock: Clock,
scopes: Arc<Scopes>,
scope: String,
scope_id: u64,
) -> Sink {
Sink {
msg_tx,
scope_handle,
clock,
scopes,
scope,
scope_id,
}
}
@ -90,10 +94,10 @@ impl Sink {
let new_scope = scope.as_scoped(self.scope.clone());
Sink::new(
self.msg_tx.clone(),
self.clock.clone(),
self.scopes.clone(),
self.metric_registry.clone(),
self.scope_registry.clone(),
new_scope,
self.clock.clone(),
)
}
@ -102,58 +106,113 @@ impl Sink {
self.clock.now()
}
/// Records the count for a given metric.
pub fn record_count<K: Into<MetricKey>>(&self, key: K, delta: u64) {
let scoped_key = ScopedKey(self.scope_id, key.into());
self.send(Sample::Count(scoped_key, delta))
/// Records a value for a counter identified by the given name.
pub fn record_count<N: Into<MetricName>>(&mut self, name: N, value: u64) {
let identifier =
MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Counter);
let value_handle = self.get_cached_value_handle(identifier);
value_handle.update_counter(value);
}
/// Records the gauge for a given metric.
pub fn record_gauge<K: Into<MetricKey>>(&self, key: K, value: i64) {
let scoped_key = ScopedKey(self.scope_id, key.into());
self.send(Sample::Gauge(scoped_key, value))
/// Records the value for a gauge identified by the given name.
pub fn record_gauge<N: Into<MetricName>>(&mut self, name: N, value: i64) {
let identifier =
MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Gauge);
let value_handle = self.get_cached_value_handle(identifier);
value_handle.update_gauge(value);
}
/// Records the timing histogram for a given metric.
pub fn record_timing<K: Into<MetricKey>>(&self, key: K, start: u64, end: u64) {
let scoped_key = ScopedKey(self.scope_id, key.into());
self.send(Sample::TimingHistogram(scoped_key, start, end))
/// Records the value for a timing histogram identified by the given name.
///
/// Both the start and end times must be supplied, but any value that implements [`Delta`] can
/// be used, which allows both raw values from [`quanta::Clock`] and measurements from
/// [`Instant::now`] to be passed.
pub fn record_timing<N: Into<MetricName>, V: Delta>(&mut self, name: N, start: V, end: V) {
let value = end.delta(start);
self.record_value(name, value);
}
/// Records the value histogram for a given metric.
pub fn record_value<K: Into<MetricKey>>(&self, key: K, value: u64) {
let scoped_key = ScopedKey(self.scope_id, key.into());
self.send(Sample::ValueHistogram(scoped_key, value))
/// Records the value for a value histogram identified by the given name.
pub fn record_value<N: Into<MetricName>>(&mut self, name: N, value: u64) {
let identifier =
MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Histogram);
let value_handle = self.get_cached_value_handle(identifier);
value_handle.update_histogram(value);
}
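A minimal crate-internal usage sketch of the recording methods, assuming a mutable `Sink` already obtained from the receiver and that string literals convert into `MetricName` via the usual `Into` impl:

use std::time::Instant;

fn handle_request(sink: &mut Sink, payload: &[u8]) {
    sink.record_count("requests_total", 1);
    sink.record_gauge("connections_open", 42);

    // Timings take any pair of values implementing `Delta`, so raw timestamps from
    // `quanta::Clock` or plain `Instant`s both work.
    let start = Instant::now();
    let _checksum: u64 = payload.iter().map(|b| u64::from(*b)).sum(); // stand-in for real work
    sink.record_timing("request_duration", start, Instant::now());

    // Pre-computed values can be recorded directly into a value histogram.
    sink.record_value("payload_size_bytes", payload.len() as u64);
}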
/// Sends a raw metric sample to the receiver.
fn send(&self, sample: Sample) {
let _ = self
.msg_tx
.send(MessageFrame::Data(sample))
.map_err(|_| io_error("failed to send sample"));
/// Creates a handle to the given counter.
///
/// This handle can be embedded into an existing type and used to directly update the
/// underlying counter. It is merely a proxy, so multiple handles to the same counter can be
/// held and used.
pub fn counter<N: Into<MetricName>>(&mut self, name: N) -> Counter {
let identifier =
MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Counter);
self.get_cached_value_handle(identifier).clone().into()
}
/// Creates a handle to the given gauge.
///
/// This handle can be embedded into an existing type and used to directly update the
/// underlying gauge. It is merely a proxy, so multiple handles to the same gauge can be
/// held and used.
pub fn gauge<N: Into<MetricName>>(&mut self, name: N) -> Gauge {
let identifier =
MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Gauge);
self.get_cached_value_handle(identifier).clone().into()
}
/// Creates a handle to the given histogram.
///
/// This handle can be embedded into an existing type and used to directly update the
/// underlying histogram. It is merely a proxy, so multiple handles to the same histogram
/// can be held and used.
pub fn histogram<N: Into<MetricName>>(&mut self, name: N) -> Histogram {
let identifier =
MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Histogram);
self.get_cached_value_handle(identifier).clone().into()
}
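A sketch of the proxy-handle pattern described above. The update calls below are assumptions: the concrete method names on `Counter` and `Gauge` live in this commit's `src/data` files, which are not part of this hunk:

struct Worker {
    processed: Counter,
    queue_depth: Gauge,
}

impl Worker {
    fn new(sink: &mut Sink) -> Worker {
        Worker {
            processed: sink.counter("worker_processed"),
            queue_depth: sink.gauge("worker_queue_depth"),
        }
    }

    fn on_item(&self) {
        // The handles proxy to the shared values, so no Sink access is needed here,
        // and multiple clones of the same handle may be held concurrently.
        self.processed.record(1);   // assumed method name
        self.queue_depth.record(3); // assumed method name
    }
}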
fn get_cached_value_handle(&mut self, identifier: MetricIdentifier) -> &MetricValue {
// This gross hack gets around lifetime rules until full NLL is stable. Without it, the
// borrow checker doesn't understand the control flow and thinks the reference lives all
// the way until the end of the function, which breaks when we try to take a mutable reference
// for inserting into the handle cache.
if let Some(handle) = self.metric_cache.get(&identifier) {
return unsafe { &*(handle as *const MetricValue) };
}
let handle = self.metric_registry.get_value_handle(identifier.clone());
self.metric_cache.insert(identifier.clone(), handle);
self.metric_cache.get(&identifier).unwrap()
}
}
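If the raw-pointer cast ever becomes undesirable, one possible alternative is an entry-based sketch like the following; it assumes hashbrown's standard `entry` API and trades the cast for an extra clone of the identifier on the hit path:

fn get_cached_value_handle(&mut self, identifier: MetricIdentifier) -> &MetricValue {
    // Borrow the registry separately so the closure does not capture `self`
    // while `metric_cache` is mutably borrowed.
    let registry = &self.metric_registry;
    self.metric_cache
        .entry(identifier.clone())
        .or_insert_with(|| registry.get_value_handle(identifier))
}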
impl Clone for Sink {
fn clone(&self) -> Sink {
Sink {
msg_tx: self.msg_tx.clone(),
clock: self.clock.clone(),
scopes: self.scopes.clone(),
metric_registry: self.metric_registry.clone(),
metric_cache: self.metric_cache.clone(),
scope_registry: self.scope_registry.clone(),
scope: self.scope.clone(),
scope_id: self.scope_id,
scope_handle: self.scope_handle,
clock: self.clock.clone(),
}
}
}
impl<'a> AsScoped<'a> for str {
fn as_scoped(&'a self, mut base: String) -> String {
if !base.is_empty() {
base.push_str(".");
fn as_scoped(&'a self, base: MetricScope) -> MetricScope {
match base {
MetricScope::Root => {
let parts = vec![self.to_owned()];
MetricScope::Nested(parts)
}
MetricScope::Nested(mut parts) => {
parts.push(self.to_owned());
MetricScope::Nested(parts)
}
}
base.push_str(self);
base
}
}
@ -162,13 +221,17 @@ where
&'a T: AsRef<[&'b str]>,
T: 'a,
{
fn as_scoped(&'a self, mut base: String) -> String {
for item in self.as_ref() {
if !base.is_empty() {
base.push('.');
fn as_scoped(&'a self, base: MetricScope) -> MetricScope {
match base {
MetricScope::Root => {
let parts = self.as_ref().iter().map(|s| s.to_string()).collect();
MetricScope::Nested(parts)
}
MetricScope::Nested(mut parts) => {
let mut new_parts = self.as_ref().iter().map(|s| s.to_string()).collect();
parts.append(&mut new_parts);
MetricScope::Nested(parts)
}
base.push_str(item);
}
base
}
}