diff --git a/azure-pipelines.yml b/azure-pipelines.yml
new file mode 100644
index 0000000..40bd9cd
--- /dev/null
+++ b/azure-pipelines.yml
@@ -0,0 +1,15 @@
+trigger: ["master"]
+pr: ["master"]
+
+jobs:
+# Check the crate formatting.
+- template: ci/azure-rustfmt.yml
+
+# Actually test the crate.
+- template: ci/azure-test-stable.yml
+
+# Test it to make sure it still works on our minimum version.
+- template: ci/azure-test-minimum.yaml
+
+# Now test it against nightly w/ ASM support.
+- template: ci/azure-test-nightly.yml
diff --git a/ci/azure-install-rust.yml b/ci/azure-install-rust.yml
new file mode 100644
index 0000000..654db47
--- /dev/null
+++ b/ci/azure-install-rust.yml
@@ -0,0 +1,27 @@
+steps:
+  # Linux and macOS.
+  - script: |
+      set -e
+      curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain $RUSTUP_TOOLCHAIN
+      echo "##vso[task.setvariable variable=PATH;]$PATH:$HOME/.cargo/bin"
+    env:
+      RUSTUP_TOOLCHAIN: ${{parameters.rust_version}}
+    displayName: "Install rust (*nix)"
+    condition: not(eq(variables['Agent.OS'], 'Windows_NT'))
+
+  # Windows.
+  - script: |
+      curl -sSf -o rustup-init.exe https://win.rustup.rs
+      rustup-init.exe -y --default-toolchain %RUSTUP_TOOLCHAIN%
+      set PATH=%PATH%;%USERPROFILE%\.cargo\bin
+      echo "##vso[task.setvariable variable=PATH;]%PATH%;%USERPROFILE%\.cargo\bin"
+    env:
+      RUSTUP_TOOLCHAIN: ${{parameters.rust_version}}
+    displayName: "Install rust (windows)"
+    condition: eq(variables['Agent.OS'], 'Windows_NT')
+
+  # All platforms.
+  - script: |
+      rustc -Vv
+      cargo -V
+    displayName: Query rust and cargo versions
diff --git a/ci/azure-rustfmt.yml b/ci/azure-rustfmt.yml
new file mode 100644
index 0000000..cb52dbb
--- /dev/null
+++ b/ci/azure-rustfmt.yml
@@ -0,0 +1,16 @@
+jobs:
+# Check formatting
+- job: rustfmt
+  displayName: Check rustfmt
+  pool:
+    vmImage: ubuntu-16.04
+  steps:
+  - template: azure-install-rust.yml
+    parameters:
+      rust_version: nightly
+  - script: |
+      rustup component add rustfmt
+    displayName: Install rustfmt
+  - script: |
+      cargo fmt --all -- --check
+    displayName: Check formatting
diff --git a/ci/azure-test-minimum.yaml b/ci/azure-test-minimum.yaml
new file mode 100644
index 0000000..7a78772
--- /dev/null
+++ b/ci/azure-test-minimum.yaml
@@ -0,0 +1,20 @@
+jobs:
+- job: test_metrics_minimum
+  displayName: Test Metrics Minimum
+  strategy:
+    matrix:
+      Linux:
+        vmImage: ubuntu-16.04
+      MacOS:
+        vmImage: macOS-10.13
+      Windows:
+        vmImage: vs2017-win2016
+  pool:
+    vmImage: $(vmImage)
+
+  steps:
+  - template: azure-install-rust.yml
+    parameters:
+      rust_version: 1.34.0
+  - script: cargo test
+    displayName: cargo test
diff --git a/ci/azure-test-nightly.yml b/ci/azure-test-nightly.yml
new file mode 100644
index 0000000..ed16f62
--- /dev/null
+++ b/ci/azure-test-nightly.yml
@@ -0,0 +1,22 @@
+jobs:
+- job: test_metrics_nightly
+  displayName: Test Metrics Nightly
+  strategy:
+    matrix:
+      Linux:
+        vmImage: ubuntu-16.04
+      MacOS:
+        vmImage: macOS-10.13
+      Windows:
+        vmImage: vs2017-win2016
+  pool:
+    vmImage: $(vmImage)
+
+  steps:
+  - template: azure-install-rust.yml
+    parameters:
+      rust_version: nightly
+  - script: cargo test
+    displayName: cargo test
+  - script: cargo bench
+    displayName: cargo bench
diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml
new file mode 100644
index 0000000..b6ade9b
--- /dev/null
+++ b/ci/azure-test-stable.yml
@@ -0,0 +1,20 @@
+jobs:
+- job: test_metrics_stable
+  displayName: Test Metrics Stable
+  strategy:
+    matrix:
+      Linux:
+        vmImage: ubuntu-16.04
+      MacOS:
+        vmImage: macOS-10.13
+      Windows:
+        vmImage: vs2017-win2016
+  pool:
+    vmImage: $(vmImage)
+
+  steps:
+  - template: azure-install-rust.yml
+    parameters:
+      rust_version: stable
+  - script: cargo test
+    displayName: cargo test
diff --git a/metrics-exporter-http/Cargo.toml b/metrics-exporter-http/Cargo.toml
index 1b1a56c..09c97b7 100644
--- a/metrics-exporter-http/Cargo.toml
+++ b/metrics-exporter-http/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "metrics-exporter-http"
-version = "0.1.0"
+version = "0.1.1"
 authors = ["Toby Lawrence "]
 edition = "2018"
diff --git a/metrics-exporter-http/src/lib.rs b/metrics-exporter-http/src/lib.rs
index 2f3b3fe..40f3f0a 100644
--- a/metrics-exporter-http/src/lib.rs
+++ b/metrics-exporter-http/src/lib.rs
@@ -11,13 +11,13 @@
 #[macro_use]
 extern crate log;
 
+use hyper::rt::run as hyper_run;
+use hyper::rt::Future;
+use hyper::service::service_fn;
+use hyper::{Body, Response, Server};
 use metrics_core::{AsyncSnapshotProvider, Recorder, Snapshot};
 use std::error::Error;
 use std::net::SocketAddr;
-use hyper::{Body, Response, Server};
-use hyper::service::service_fn;
-use hyper::rt::run as hyper_run;
-use hyper::rt::Future;
 
 /// Exports metrics over HTTP.
 pub struct HttpExporter<C, R> {
@@ -54,7 +54,6 @@ where
     }
 
     /// Converts this exporter into a future which can be driven externally.
-    /// logs output on the given interval.
     ///
     /// This starts an HTTP server on the `address` the exporter was originally configured with,
     /// responding to any request with the output of the configured recorder.
@@ -67,7 +66,11 @@ where
     }
 }
 
-fn build_hyper_server<C, R>(controller: C, recorder: R, address: SocketAddr) -> impl Future<Item = (), Error = ()>
+fn build_hyper_server<C, R>(
+    controller: C,
+    recorder: R,
+    address: SocketAddr,
+) -> impl Future<Item = (), Error = ()>
 where
     C: AsyncSnapshotProvider + Clone + Send + 'static,
     C::SnapshotFuture: Send + 'static,
@@ -81,14 +84,15 @@ where
         service_fn(move |_| {
             let recorder3 = recorder2.clone();
 
-            controller2.get_snapshot_async()
+            controller2
+                .get_snapshot_async()
                 .then(move |result| match result {
                     Ok(snapshot) => {
                         let mut r = recorder3.clone();
                         snapshot.record(&mut r);
                         let output = r.into();
                         Ok(Response::new(Body::from(output)))
-                    },
+                    }
                     Err(e) => Err(e),
                 })
         })
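For orientation, a rough usage sketch of the exporter touched up above. This is not part of the change itself: `HttpExporter::new`, `TextRecorder`, and `into_future` are assumed names that do not appear in the hunks, so treat every identifier below as a guess rather than confirmed API.

use metrics::Receiver;
use metrics_exporter_http::HttpExporter;
use metrics_recorder_text::TextRecorder;

fn main() {
    // Assumed constructor: the hunk only shows that the exporter owns a
    // controller, a recorder, and the address it was configured with.
    let receiver = Receiver::builder().build().expect("failed to build receiver");
    let exporter = HttpExporter::new(
        receiver.get_controller(),
        TextRecorder::new(),
        "0.0.0.0:9000".parse().expect("valid socket address"),
    );

    // `into_future` (assumed name) yields a future that runs the hyper
    // server, answering every request with the recorder's rendered snapshot.
    hyper::rt::run(exporter.into_future());
}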
diff --git a/metrics-recorder-prometheus/Cargo.toml b/metrics-recorder-prometheus/Cargo.toml
index 21fd5e5..5881991 100644
--- a/metrics-recorder-prometheus/Cargo.toml
+++ b/metrics-recorder-prometheus/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "metrics-recorder-prometheus"
-version = "0.2.0"
+version = "0.2.1"
 authors = ["Toby Lawrence "]
 edition = "2018"
@@ -14,5 +14,5 @@ documentation = "https://docs.rs/metrics-recorder-prometheus"
 
 [dependencies]
 metrics-core = { path = "../metrics-core", version = "^0.3" }
-metrics-util = { path = "../metrics-util", version = "^0.1" }
+metrics-util = { path = "../metrics-util", version = "^0.2" }
 hdrhistogram = "^6.1"
diff --git a/metrics-recorder-text/Cargo.toml b/metrics-recorder-text/Cargo.toml
index 9e26d30..7aa60df 100644
--- a/metrics-recorder-text/Cargo.toml
+++ b/metrics-recorder-text/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "metrics-recorder-text"
-version = "0.2.0"
+version = "0.2.1"
 authors = ["Toby Lawrence "]
 edition = "2018"
@@ -14,5 +14,5 @@ documentation = "https://docs.rs/metrics-recorder-text"
 
 [dependencies]
 metrics-core = { path = "../metrics-core", version = "^0.3" }
-metrics-util = { path = "../metrics-util", version = "^0.1" }
+metrics-util = { path = "../metrics-util", version = "^0.2" }
 hdrhistogram = "^6.1"
diff --git a/metrics-util/Cargo.toml b/metrics-util/Cargo.toml
index 11e1549..4819fad 100644
--- a/metrics-util/Cargo.toml
+++ b/metrics-util/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "metrics-util"
-version = "0.1.0"
+version = "0.2.0"
 authors = ["Toby Lawrence "]
 edition = "2018"
@@ -16,4 +16,19 @@ readme = "README.md"
 
 keywords = ["metrics", "quantile", "percentile"]
 
+[[bench]]
+name = "bucket"
+harness = false
+
+[[bench]]
+name = "streaming_integers"
+harness = false
+
 [dependencies]
+crossbeam-epoch = "^0.7"
+
+[dev-dependencies]
+crossbeam = "^0.7"
+criterion = "^0.2.9"
+lazy_static = "^1.3"
+rand = "^0.6"
diff --git a/metrics-util/benches/bucket.rs b/metrics-util/benches/bucket.rs
new file mode 100644
index 0000000..3da95ae
--- /dev/null
+++ b/metrics-util/benches/bucket.rs
@@ -0,0 +1,60 @@
+#[macro_use]
+extern crate criterion;
+
+#[macro_use]
+extern crate lazy_static;
+
+use criterion::{Benchmark, Criterion, Throughput};
+use metrics_util::AtomicBucket;
+
+lazy_static! {
+    static ref RANDOM_INTS: Vec<u64> = vec![
+        21061184, 21301862, 21331592, 21457012, 21500016, 21537837, 21581557, 21620030, 21664102,
+        21678463, 21708437, 21751808, 21845243, 21850265, 21938879, 21971014, 22005842, 22034601,
+        22085552, 22101746, 22115429, 22139883, 22260209, 22270768, 22298080, 22299780, 22307659,
+        22354697, 22355668, 22359397, 22463872, 22496590, 22590978, 22603740, 22706352, 22820895,
+        22849491, 22891538, 22912955, 22919915, 22928920, 22968656, 22985992, 23033739, 23061395,
+        23077554, 23138588, 23185172, 23282479, 23290830, 23316844, 23386911, 23641319, 23677058,
+        23742930, 25350389, 25399746, 25404925, 25464391, 25478415, 25480015, 25632783, 25639769,
+        25645612, 25688228, 25724427, 25862192, 25954476, 25994479, 26008752, 26036460, 26038202,
+        26078874, 26118327, 26132679, 26207601, 26262418, 26270737, 26274860, 26431248, 26434268,
+        26562736, 26580134, 26593740, 26618561, 26844181, 26866971, 26907883, 27005270, 27023584,
+        27024044, 27057184, 23061395, 23077554, 23138588, 23185172, 23282479, 23290830, 23316844,
+        23386911, 23641319, 23677058, 23742930, 25350389, 25399746, 25404925, 25464391, 25478415,
+        25480015, 25632783, 25639769, 25645612, 25688228, 25724427, 25862192, 25954476, 25994479,
+        26008752, 26036460, 26038202, 26078874, 26118327, 26132679, 26207601, 26262418, 26270737,
+        26274860, 26431248, 26434268, 26562736, 26580134, 26593740, 26618561, 26844181, 26866971,
+        26907883, 27005270, 27023584, 27024044, 27057184, 23061395, 23077554, 23138588, 23185172,
+        23282479, 23290830, 23316844, 23386911, 23641319, 23677058, 23742930, 25350389, 25399746,
+        25404925, 25464391, 25478415, 25480015, 25632783, 25639769, 25645612, 25688228, 25724427,
+        25862192, 25954476, 25994479, 26008752, 26036460, 26038202, 26078874, 26118327, 26132679,
+        26207601, 26262418, 26270737, 26274860, 26431248, 26434268, 26562736, 26580134, 26593740,
+        26618561, 26844181, 26866971, 26907883, 27005270, 27023584, 27024044, 27057184, 23061395,
+        23077554, 23138588, 23185172, 23282479, 23290830, 23316844, 23386911, 23641319, 23677058,
+        23742930, 25350389, 25399746, 25404925, 25464391, 25478415, 25480015, 25632783, 25639769,
+        25645612, 25688228, 25724427, 25862192, 25954476, 25994479, 26008752, 26036460, 26038202,
+        26078874, 26118327, 26132679, 26207601, 26262418, 26270737, 26274860, 26431248, 26434268,
+        26562736, 26580134, 26593740, 26618561, 26844181, 26866971, 26907883, 27005270, 27023584,
+        27024044, 27057184, 27088034, 27088550, 27302898, 27353925, 27412984, 27488633, 27514155,
+        27558052, 27601937, 27606339, 27624514, 27680396, 27684064, 27963602, 27414982, 28450673
+    ];
+}
+
+fn bucket_benchmark(c: &mut Criterion) {
+    c.bench(
+        "bucket",
+        Benchmark::new("write", |b| {
+            let bucket = AtomicBucket::new();
+
+            b.iter(|| {
+                for value in RANDOM_INTS.iter() {
+                    bucket.push(value);
+                }
+            })
+        })
+        .throughput(Throughput::Elements(RANDOM_INTS.len() as u32)),
+    );
+}
+
+criterion_group!(benches, bucket_benchmark);
+criterion_main!(benches);
diff --git a/metrics-util/benches/streaming_integers.rs b/metrics-util/benches/streaming_integers.rs
new file mode 100644
index 0000000..9748420
--- /dev/null
+++ b/metrics-util/benches/streaming_integers.rs
@@ -0,0 +1,104 @@
+#[macro_use]
+extern crate criterion;
+
+#[macro_use]
+extern crate lazy_static;
+
+use criterion::{Benchmark, Criterion, Throughput};
+use metrics_util::StreamingIntegers;
+use rand::{
+    distributions::{Distribution, Gamma},
+    rngs::SmallRng,
+    Rng, SeedableRng,
+};
+use std::time::Duration;
+
+lazy_static! {
+    static ref NORMAL_SMALL: Vec<u64> = get_gamma_distribution(100, Duration::from_millis(200));
+    static ref NORMAL_MEDIUM: Vec<u64> = get_gamma_distribution(10000, Duration::from_millis(200));
+    static ref NORMAL_LARGE: Vec<u64> = get_gamma_distribution(1000000, Duration::from_millis(200));
+    static ref LINEAR_SMALL: Vec<u64> = get_linear_distribution(100);
+    static ref LINEAR_MEDIUM: Vec<u64> = get_linear_distribution(10000);
+    static ref LINEAR_LARGE: Vec<u64> = get_linear_distribution(1000000);
+}
+
+fn get_gamma_distribution(len: usize, upper_bound: Duration) -> Vec<u64> {
+    // Start with a seeded RNG so that we predictably regenerate our data.
+    let mut rng = SmallRng::seed_from_u64(len as u64);
+
+    // This Gamma distribution gets us pretty close to a typical web server response time
+    // distribution where there's a big peak down low, and a long tail that drops off sharply.
+    let gamma = Gamma::new(1.75, 1.0);
+
+    // Scale all the values by the upper bound (in nanoseconds) to simulate the
+    // nanosecond-based timing measurements we'd see from a real web service.
+    gamma
+        .sample_iter(&mut rng)
+        .map(|n| n * upper_bound.as_nanos() as f64)
+        .map(|n| n as u64)
+        .take(len)
+        .collect::<Vec<u64>>()
+}
+
+fn get_linear_distribution(len: usize) -> Vec<u64> {
+    let mut values = Vec::new();
+    for i in 0..len as u64 {
+        values.push(i);
+    }
+    values
+}
+
+macro_rules! define_basic_benches {
+    ($c:ident, $name:expr, $input:ident) => {
+        $c.bench(
+            $name,
+            Benchmark::new("compress", |b| {
+                b.iter_with_large_drop(|| {
+                    let mut si = StreamingIntegers::new();
+                    si.compress(&$input);
+                    si
+                })
+            })
+            .with_function("decompress", |b| {
+                let mut si = StreamingIntegers::new();
+                si.compress(&$input);
+
+                b.iter_with_large_drop(move || si.decompress())
+            })
+            .with_function("decompress + sum", |b| {
+                let mut si = StreamingIntegers::new();
+                si.compress(&$input);
+
+                b.iter_with_large_drop(move || {
+                    let total: u64 = si.decompress().iter().sum();
+                    total
+                })
+            })
+            .with_function("decompress_with + sum", |b| {
+                let mut si = StreamingIntegers::new();
+                si.compress(&$input);
+
+                b.iter(move || {
+                    let mut total = 0;
+                    si.decompress_with(|batch| {
+                        let batch_total: u64 = batch.iter().sum();
+                        total += batch_total;
+                    });
+                });
+            })
+            .throughput(Throughput::Elements($input.len() as u32)),
+        )
+    };
+}
+
+fn streaming_integer_benchmark(c: &mut Criterion) {
+    define_basic_benches!(c, "normal small", NORMAL_SMALL);
+    define_basic_benches!(c, "normal medium", NORMAL_MEDIUM);
+    define_basic_benches!(c, "normal large", NORMAL_LARGE);
+    define_basic_benches!(c, "linear small", LINEAR_SMALL);
+    define_basic_benches!(c, "linear medium", LINEAR_MEDIUM);
+    define_basic_benches!(c, "linear large", LINEAR_LARGE);
+}
+
+criterion_group!(benches, streaming_integer_benchmark);
+criterion_main!(benches);
diff --git a/metrics-util/src/bucket.rs b/metrics-util/src/bucket.rs
new file mode 100644
index 0000000..8b644b7
--- /dev/null
+++ b/metrics-util/src/bucket.rs
@@ -0,0 +1,444 @@
+use crossbeam_epoch::{pin as epoch_pin, Atomic, Guard, Owned, Shared};
+use std::cell::UnsafeCell;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::{mem, slice};
+
+const BLOCK_SIZE: usize = 128;
+
+/// Discrete chunk of values with atomic read/write access.
+struct Block<T> {
+    // Write index.
+    write: AtomicUsize,
+
+    // Read index.
+    read: AtomicUsize,
+
+    // The individual slots.
+    slots: [UnsafeCell<T>; BLOCK_SIZE],
+
+    // The previous block in the list, if any.
+    prev: Atomic<Block<T>>,
+}
+
+impl<T> Block<T> {
+    /// Creates a new [`Block`].
+    pub fn new() -> Self {
+        Block {
+            write: AtomicUsize::new(0),
+            read: AtomicUsize::new(0),
+            slots: unsafe { mem::zeroed() },
+            prev: Atomic::null(),
+        }
+    }
+
+    /// Gets the current length of this block.
+    pub fn len(&self) -> usize {
+        self.read.load(Ordering::Acquire)
+    }
+
+    /// Gets a slice of the data written to this block.
+    pub fn data(&self) -> &[T] {
+        let len = self.len();
+        let head = self.slots[0].get();
+        unsafe { slice::from_raw_parts(head as *const T, len) }
+    }
+
+    /// Links this block to the previous block in the bucket.
+    pub fn set_prev(&self, prev: Shared<Block<T>>, guard: &Guard) {
+        match self
+            .prev
+            .compare_and_set(Shared::null(), prev, Ordering::AcqRel, guard)
+        {
+            Ok(_) => {}
+            Err(_) => unreachable!(),
+        }
+    }
+
+    /// Pushes a value into this block.
+    pub fn push(&self, value: T) -> Result<(), T> {
+        // Try to increment the write index. If we've reached the end of the block, let the
+        // bucket know so it can attach another block.
+        let index = self.write.fetch_add(1, Ordering::AcqRel);
+        if index >= BLOCK_SIZE {
+            return Err(value);
+        }
+
+        // Update the slot.
+        unsafe {
+            self.slots[index].get().write(value);
+        }
+
+        // Scoot our read index forward.
+        self.read.fetch_add(1, Ordering::AcqRel);
+
+        Ok(())
+    }
+}
+
+unsafe impl<T> Send for Block<T> {}
+unsafe impl<T> Sync for Block<T> {}
+
+impl<T> Drop for Block<T> {
+    fn drop(&mut self) {
+        let guard = &epoch_pin();
+        let prev = self.prev.swap(Shared::null(), Ordering::AcqRel, guard);
+        if !prev.is_null() {
+            unsafe {
+                guard.defer_destroy(prev);
+            }
+            guard.flush();
+        }
+    }
+}
+
+/// An atomic bucket with snapshot capabilities.
+///
+/// This bucket is implemented as a singly-linked list of blocks, where each block is a small
+/// buffer that can hold a handful of elements. There is no limit to how many elements can be in
+/// the bucket at a time. Blocks are dynamically allocated as elements are pushed into the bucket.
+///
+/// Unlike a queue, buckets cannot be drained element by element: callers must iterate the whole
+/// structure. Reading the bucket happens in reverse, to allow writers to make forward progress
+/// without affecting the iteration of the previously-written values.
+///
+/// The bucket can be cleared while a concurrent snapshot is taking place, and this will not
+/// affect the reader.
+#[derive(Debug)]
+pub struct AtomicBucket<T> {
+    tail: Atomic<Block<T>>,
+}
+
+impl<T> AtomicBucket<T> {
+    /// Creates a new, empty bucket.
+    pub fn new() -> Self {
+        AtomicBucket {
+            tail: Atomic::null(),
+        }
+    }
+
+    /// Pushes an element into the bucket.
+    pub fn push(&self, value: T) {
+        let mut original = value;
+        loop {
+            // Load the tail block, or install a new one.
+            let guard = &epoch_pin();
+            let mut tail = self.tail.load(Ordering::Acquire, guard);
+            if tail.is_null() {
+                // No blocks at all yet. We need to create one.
+                match self.tail.compare_and_set(
+                    Shared::null(),
+                    Owned::new(Block::new()),
+                    Ordering::AcqRel,
+                    guard,
+                ) {
+                    // We won the race to install the new block.
+                    Ok(ptr) => tail = ptr,
+                    // Somebody else beat us, so just update our pointer.
+                    Err(e) => tail = e.current,
+                }
+            }
+
+            // We have a block now, so we need to try writing to it.
+            let tail_block = unsafe { tail.deref() };
+            match tail_block.push(original) {
+                // If the push was OK, then the block wasn't full. It might _now_ be full, but we'll
+                // let future callers deal with installing a new block if necessary.
+                Ok(_) => return,
+                // The block was full, so we've been given the value back and we need to install a new block.
+                Err(value) => {
+                    match self.tail.compare_and_set(
+                        tail,
+                        Owned::new(Block::new()),
+                        Ordering::AcqRel,
+                        guard,
+                    ) {
+                        // We managed to install the block, so we need to link this new block to
+                        // the previous block.
+                        Ok(ptr) => {
+                            let new_tail = unsafe { ptr.deref() };
+                            new_tail.set_prev(tail, guard);
+
+                            // Now push into our new block.
+                            match new_tail.push(value) {
+                                // We wrote the value successfully, so we're good here!
+                                Ok(_) => return,
+                                // The block was full, so just loop and start over.
+                                Err(value) => {
+                                    original = value;
+                                    continue;
+                                }
+                            }
+                        }
+                        // Somebody else installed the block before us, so let's just start over.
+                        Err(_) => {
+                            original = value;
+                            continue;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /// Collects all of the elements written to the bucket.
+    ///
+    /// This operation can be slow as it involves allocating enough space to hold all of the
+    /// elements within the bucket. Consider [`data_with`](AtomicBucket::data_with) to incrementally
+    /// iterate the internal blocks within the bucket.
+    ///
+    /// Elements are in partial reverse order: blocks are iterated in reverse order, but the
+    /// elements within them will appear in their original order.
+    pub fn data(&self) -> Vec<T>
+    where
+        T: Clone,
+    {
+        let mut values = Vec::new();
+        self.data_with(|block| values.extend_from_slice(block));
+        values
+    }
+
+    /// Iterates all of the elements written to the bucket, invoking `f` for each block.
+    ///
+    /// Elements are in partial reverse order: blocks are iterated in reverse order, but the
+    /// elements within them will appear in their original order.
+    pub fn data_with<F>(&self, mut f: F)
+    where
+        F: FnMut(&[T]),
+    {
+        let guard = &epoch_pin();
+
+        // While we have a valid block -- either `tail` or the next block as we keep reading -- we
+        // load the data from each block and process it by calling `f`.
+        let mut block_ptr = self.tail.load(Ordering::Acquire, guard);
+        while !block_ptr.is_null() {
+            let block = unsafe { block_ptr.deref() };
+
+            // Read the data out of the block.
+            let data = block.data();
+            f(data);
+
+            // Load the next block.
+            block_ptr = block.prev.load(Ordering::Acquire, guard);
+        }
+    }
+
+    /// Clears the bucket.
+    ///
+    /// Deallocation of the internal blocks happens only when all readers have finished, and so
+    /// will not necessarily occur during or immediately after a call to this method.
+    ///
+    /// # Note
+    /// This method will not affect reads that are already in progress.
+    pub fn clear(&self) {
+        // We simply swap the tail pointer, which effectively clears the bucket. Callers might
+        // still be in the process of writing to the tail node, or reading the data, but new
+        // callers will see it as empty until another write proceeds.
+        let guard = &epoch_pin();
+        let tail = self.tail.load(Ordering::Acquire, guard);
+        if !tail.is_null() {
+            if self
+                .tail
+                .compare_and_set(tail, Shared::null(), Ordering::SeqCst, guard)
+                .is_ok()
+            {
+                // We won the swap to delete the tail node. Now configure a deferred drop to clean
+                // things up once nobody else is using it.
+                unsafe {
+                    // Drop the block, which will cause a cascading drop on the next block, and
+                    // so on and so forth, until all blocks linked to this one are dropped.
+                    guard.defer_destroy(tail);
+                }
+                guard.flush();
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{AtomicBucket, Block, BLOCK_SIZE};
+
+    #[test]
+    fn test_create_new_block() {
+        let block: Block<u64> = Block::new();
+        assert_eq!(block.len(), 0);
+
+        let data = block.data();
+        assert_eq!(data.len(), 0);
+    }
+
+    #[test]
+    fn test_block_write_then_read() {
+        let block = Block::new();
+        assert_eq!(block.len(), 0);
+
+        let data = block.data();
+        assert_eq!(data.len(), 0);
+
+        let result = block.push(42);
+        assert!(result.is_ok());
+
+        let data = block.data();
+        assert_eq!(data.len(), 1);
+        assert_eq!(data[0], 42);
+    }
+
+    #[test]
+    fn test_block_write_until_full_then_read() {
+        let block = Block::new();
+        assert_eq!(block.len(), 0);
+
+        let data = block.data();
+        assert_eq!(data.len(), 0);
+
+        let mut i = 0;
+        let mut total = 0;
+        while i < BLOCK_SIZE as u64 {
+            assert!(block.push(i).is_ok());
+
+            total += i;
+            i += 1;
+        }
+
+        let data = block.data();
+        assert_eq!(data.len(), BLOCK_SIZE);
+
+        let sum: u64 = data.iter().sum();
+        assert_eq!(sum, total);
+
+        let result = block.push(42);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_block_write_until_full_then_read_mt() {
+        let block = Block::new();
+        assert_eq!(block.len(), 0);
+
+        let data = block.data();
+        assert_eq!(data.len(), 0);
+
+        let res = crossbeam::scope(|s| {
+            let t1 = s.spawn(|_| {
+                let mut i = 0;
+                let mut total = 0;
+                while i < BLOCK_SIZE as u64 / 2 {
+                    assert!(block.push(i).is_ok());
+
+                    total += i;
+                    i += 1;
+                }
+                total
+            });
+
+            let t2 = s.spawn(|_| {
+                let mut i = 0;
+                let mut total = 0;
+                while i < BLOCK_SIZE as u64 / 2 {
+                    assert!(block.push(i).is_ok());
+
+                    total += i;
+                    i += 1;
+                }
+                total
+            });
+
+            let t1_total = t1.join().unwrap();
+            let t2_total = t2.join().unwrap();
+
+            t1_total + t2_total
+        });
+
+        let total = res.unwrap();
+
+        let data = block.data();
+        assert_eq!(data.len(), BLOCK_SIZE);
+
+        let sum: u64 = data.iter().sum();
+        assert_eq!(sum, total);
+
+        let result = block.push(42);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_bucket_write_then_read() {
+        let bucket = AtomicBucket::new();
+        bucket.push(42);
+
+        let snapshot = bucket.data();
+        assert_eq!(snapshot.len(), 1);
+        assert_eq!(snapshot[0], 42);
+    }
+
+    #[test]
+    fn test_bucket_multiple_blocks_write_then_read() {
+        let bucket = AtomicBucket::new();
+
+        let snapshot = bucket.data();
+        assert_eq!(snapshot.len(), 0);
+
+        let target = (BLOCK_SIZE * 3 + BLOCK_SIZE / 2) as u64;
+        let mut i = 0;
+        let mut total = 0;
+        while i < target {
+            bucket.push(i);
+
+            total += i;
+            i += 1;
+        }
+
+        let snapshot = bucket.data();
+        assert_eq!(snapshot.len(), target as usize);
+
+        let sum: u64 = snapshot.iter().sum();
+        assert_eq!(sum, total);
+    }
+
+    #[test]
+    fn test_bucket_write_then_read_mt() {
+        let bucket = AtomicBucket::new();
+
+        let snapshot = bucket.data();
+        assert_eq!(snapshot.len(), 0);
+
+        let res = crossbeam::scope(|s| {
+            let t1 = s.spawn(|_| {
+                let mut i = 0;
+                let mut total = 0;
+                while i < BLOCK_SIZE as u64 * 10_000 {
+                    bucket.push(i);
+
+                    total += i;
+                    i += 1;
+                }
+                total
+            });
+
+            let t2 = s.spawn(|_| {
+                let mut i = 0;
+                let mut total = 0;
+                while i < BLOCK_SIZE as u64 * 10_000 {
+                    bucket.push(i);
+
+                    total += i;
+                    i += 1;
+                }
+                total
+            });
+
+            let t1_total = t1.join().unwrap();
+            let t2_total = t2.join().unwrap();
+
+            t1_total + t2_total
+        });
+
+        let total = res.unwrap();
+
+        let snapshot = bucket.data();
+        assert_eq!(snapshot.len(), BLOCK_SIZE * 20_000);
+
+        let sum: u64 = snapshot.iter().sum();
+        assert_eq!(sum, total);
+    }
+}
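A quick sketch of the bucket's lifecycle using only the API introduced in this file — push, snapshot via `data`/`data_with`, and `clear` — to make the doc comments above concrete (illustrative, not part of the change):

use metrics_util::AtomicBucket;

fn main() {
    let bucket = AtomicBucket::new();

    // Writers only ever push; new blocks are installed on demand as each
    // 128-slot block fills up.
    for i in 0..1000u64 {
        bucket.push(i);
    }

    // Collect a snapshot. Blocks are walked newest-first, so the values come
    // back in partial reverse order, but nothing is lost.
    let values = bucket.data();
    assert_eq!(values.len(), 1000);

    // Or visit the blocks incrementally without one big allocation.
    let mut total = 0u64;
    bucket.data_with(|block| total += block.iter().sum::<u64>());
    assert_eq!(total, (0..1000u64).sum());

    // Clearing swaps the tail pointer; readers already iterating keep their view.
    bucket.clear();
    assert!(bucket.data().is_empty());
}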
diff --git a/metrics-util/src/lib.rs b/metrics-util/src/lib.rs
index cab2453..a9010a1 100644
--- a/metrics-util/src/lib.rs
+++ b/metrics-util/src/lib.rs
@@ -1,45 +1,9 @@
 //! Helper types and functions used within the metrics ecosystem.
+mod bucket;
+pub use bucket::AtomicBucket;
 
-/// A quantile that has both the raw value and a human-friendly display label.
-#[derive(Clone)]
-pub struct Quantile(f64, String);
+mod streaming;
+pub use streaming::StreamingIntegers;
 
-impl Quantile {
-    /// Creates a new `Quantile` from a floating-point value.
-    ///
-    /// All values clamped between 0.0 and 1.0.
-    pub fn new(quantile: f64) -> Quantile {
-        let clamped = quantile.max(0.0);
-        let clamped = clamped.min(1.0);
-        let display = clamped * 100.0;
-
-        let raw_label = format!("{}", clamped);
-        let label = match raw_label.as_str() {
-            "0" => "min".to_string(),
-            "1" => "max".to_string(),
-            _ => {
-                let raw = format!("p{}", display);
-                raw.replace(".", "")
-            },
-        };
-
-        Quantile(clamped, label)
-    }
-
-    /// Gets the human-friendly display label for this quantile.
-    pub fn label(&self) -> &str {
-        self.1.as_str()
-    }
-
-    /// Gets the raw value for this quantile.
-    pub fn value(&self) -> f64 {
-        self.0
-    }
-}
-
-/// Parses a list of floating-point values into a list of `Quantile`s.
-pub fn parse_quantiles(quantiles: &[f64]) -> Vec<Quantile> {
-    quantiles.iter()
-        .map(|f| Quantile::new(*f))
-        .collect()
-}
+mod quantile;
+pub use quantile::{parse_quantiles, Quantile};
diff --git a/metrics-util/src/quantile.rs b/metrics-util/src/quantile.rs
new file mode 100644
index 0000000..2f6c6db
--- /dev/null
+++ b/metrics-util/src/quantile.rs
@@ -0,0 +1,102 @@
+/// A quantile that has both the raw value and a human-friendly display label.
+///
+/// We work with quantiles rather than percentiles for optimal floating-point precision, but most
+/// of the time, monitoring systems show us percentiles, and usually in an abbreviated form: `p99`.
+///
+/// On top of holding the quantile value, we calculate the familiar "p99" style of label, doing the
+/// appropriate percentile conversion. Thus, if you have a quantile of `0.99`, the resulting label
+/// is `p99`, and if you have a quantile of `0.999`, the resulting label is `p999`.
+///
+/// There are two special cases, where we label `0.0` and `1.0` as `min` and `max`, respectively.
+#[derive(Debug, Clone, PartialEq)]
+pub struct Quantile(f64, String);
+
+impl Quantile {
+    /// Creates a new [`Quantile`] from a floating-point value.
+    ///
+    /// All values are clamped between 0.0 and 1.0.
+    pub fn new(quantile: f64) -> Quantile {
+        let clamped = quantile.max(0.0);
+        let clamped = clamped.min(1.0);
+        let display = clamped * 100.0;
+
+        let raw_label = format!("{}", clamped);
+        let label = match raw_label.as_str() {
+            "0" => "min".to_string(),
+            "1" => "max".to_string(),
+            _ => {
+                let raw = format!("p{}", display);
+                raw.replace(".", "")
+            }
+        };
+
+        Quantile(clamped, label)
+    }
+
+    /// Gets the human-friendly display label.
+    pub fn label(&self) -> &str {
+        self.1.as_str()
+    }
+
+    /// Gets the raw quantile value.
+    pub fn value(&self) -> f64 {
+        self.0
+    }
+}
+
+/// Parses a slice of floating-point values into a vector of [`Quantile`]s.
+pub fn parse_quantiles(quantiles: &[f64]) -> Vec<Quantile> {
+    quantiles.iter().map(|f| Quantile::new(*f)).collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{parse_quantiles, Quantile};
+
+    #[test]
+    fn test_quantiles() {
+        let min = Quantile::new(0.0);
+        assert_eq!(min.value(), 0.0);
+        assert_eq!(min.label(), "min");
+
+        let max = Quantile::new(1.0);
+        assert_eq!(max.value(), 1.0);
+        assert_eq!(max.label(), "max");
+
+        let p99 = Quantile::new(0.99);
+        assert_eq!(p99.value(), 0.99);
+        assert_eq!(p99.label(), "p99");
+
+        let p999 = Quantile::new(0.999);
+        assert_eq!(p999.value(), 0.999);
+        assert_eq!(p999.label(), "p999");
+
+        let p9999 = Quantile::new(0.9999);
+        assert_eq!(p9999.value(), 0.9999);
+        assert_eq!(p9999.label(), "p9999");
+
+        let under = Quantile::new(-1.0);
+        assert_eq!(under.value(), 0.0);
+        assert_eq!(under.label(), "min");
+
+        let over = Quantile::new(1.2);
+        assert_eq!(over.value(), 1.0);
+        assert_eq!(over.label(), "max");
+    }
+
+    #[test]
+    fn test_parse_quantiles() {
+        let empty = vec![];
+        let result = parse_quantiles(&empty);
+        assert_eq!(result.len(), 0);
+
+        let normal = vec![0.0, 0.5, 0.99, 0.999, 1.0];
+        let result = parse_quantiles(&normal);
+        assert_eq!(result.len(), 5);
+        assert_eq!(result[0], Quantile::new(0.0));
+        assert_eq!(result[1], Quantile::new(0.5));
+        assert_eq!(result[2], Quantile::new(0.99));
+        assert_eq!(result[3], Quantile::new(0.999));
+        assert_eq!(result[4], Quantile::new(1.0));
+    }
+}
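A tiny sketch of the labeling rules documented above, using only the API from this file (illustrative, not part of the change):

use metrics_util::parse_quantiles;

fn main() {
    let quantiles = parse_quantiles(&[0.0, 0.5, 0.999, 1.2]);

    assert_eq!(quantiles[0].label(), "min");  // 0.0 is special-cased
    assert_eq!(quantiles[1].label(), "p50");  // 0.5 -> 50th percentile
    assert_eq!(quantiles[2].label(), "p999"); // 0.999 -> "p99.9", dot stripped
    assert_eq!(quantiles[3].label(), "max");  // 1.2 is clamped to 1.0
    assert_eq!(quantiles[3].value(), 1.0);
}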
diff --git a/metrics-util/src/streaming.rs b/metrics-util/src/streaming.rs
new file mode 100644
index 0000000..ef1ed5b
--- /dev/null
+++ b/metrics-util/src/streaming.rs
@@ -0,0 +1,295 @@
+use std::slice;
+
+/// A compressed set of integers.
+///
+/// For some workloads, working with a large set of integers can require an outsized amount of
+/// memory for numbers that are very similar. This data structure takes chunks of integers and
+/// compresses them by using delta encoding and variable-byte encoding.
+///
+/// Delta encoding tracks the difference between successive integers: if you have 1000000 and
+/// 1000001, the difference between the two is only 1. Coupled with variable-byte encoding, we can
+/// compress those two numbers within 4 bytes, where normally they would require a minimum of 8
+/// bytes if they were 32-bit integers, or 16 bytes if they were 64-bit integers. Over large runs
+/// of integers where the delta is relatively small compared to the original value, the compression
+/// savings add up quickly.
+///
+/// The original integers can be decompressed and collected, or can be decompressed on-the-fly
+/// while passing them to a given function, allowing callers to observe the integers without
+/// allocating the entire size of the decompressed set.
+///
+/// # Performance
+/// As this is a scalar implementation, performance depends heavily on not only the input size, but
+/// also the delta between values, as well as whether or not the decompressed values are being
+/// collected or used on-the-fly.
+///
+/// Bigger deltas between values mean longer variable-byte sizes, which are hard for the CPU to
+/// predict. As the linear benchmarks show, things are much faster when the delta between values
+/// is minimal.
+///
+/// These figures were generated on a 2015 MacBook Pro (Core i7, 2.2GHz base/3.7GHz turbo).
+///
+/// |                        | compress (1) | decompress (2) | decompress/sum (3) | decompress_with/sum (4) |
+/// |------------------------|--------------|----------------|--------------------|-------------------------|
+/// | normal, 100 values     | 94 Melem/s   | 76 Melem/s     | 71 Melem/s         | 126 Melem/s             |
+/// | normal, 10000 values   | 92 Melem/s   | 85 Melem/s     | 109 Melem/s        | 109 Melem/s             |
+/// | normal, 1000000 values | 86 Melem/s   | 79 Melem/s     | 68 Melem/s         | 110 Melem/s             |
+/// | linear, 100 values     | 334 Melem/s  | 109 Melem/s    | 110 Melem/s        | 297 Melem/s             |
+/// | linear, 10000 values   | 654 Melem/s  | 174 Melem/s    | 374 Melem/s        | 390 Melem/s             |
+/// | linear, 1000000 values | 703 Melem/s  | 180 Melem/s    | 132 Melem/s        | 392 Melem/s             |
+///
+/// The normal values consist of an approximation of real nanosecond-based timing measurements
+/// of a web service. The linear values are simply sequential integers ranging from 0 to the
+/// configured size of the test run.
+///
+/// Operations:
+/// 1. simply compress the input set, no decompression
+/// 2. decompress the entire compressed set into a single vector
+/// 3. same as #2 but sum all of the original values at the end
+/// 4. use `decompress_with` to sum the numbers incrementally
+#[derive(Debug, Default, Clone)]
+pub struct StreamingIntegers {
+    inner: Vec<u8>,
+    len: usize,
+    last: Option<i64>,
+}
+
+impl StreamingIntegers {
+    /// Creates a new, empty streaming set.
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Returns the number of elements in the set, also referred to as its 'length'.
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    /// Returns `true` if the set contains no elements.
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    /// Compresses a slice of integers, and adds them to the set.
+    pub fn compress(&mut self, src: &[u64]) {
+        let src_len = src.len();
+        if src_len == 0 {
+            return;
+        }
+
+        self.len += src_len;
+
+        // Technically, 64-bit integers can take up to 10 bytes when encoded as variable integers
+        // if they're at the maximum size, so we need to properly allocate here. As we directly
+        // operate on a mutable slice of the inner buffer below, we _can't_ afford to lazily
+        // allocate or guess at the resulting compression, otherwise we'll get a panic at runtime
+        // for bounds checks.
+        //
+        // TODO: we should try and add some heuristic here, because we're potentially
+        // overallocating by a lot when we plan for the worst case scenario
+        self.inner.reserve(src_len * 10);
+
+        let mut buf_idx = self.inner.len();
+        let buf_cap = self.inner.capacity();
+        let mut buf = unsafe {
+            let buf_ptr = self.inner.as_mut_ptr();
+            slice::from_raw_parts_mut(buf_ptr, buf_cap)
+        };
+
+        // If we have no last value, then the very first integer we write is the full value and not
+        // a delta value.
+        let mut src_idx = 0;
+        if self.last.is_none() {
+            let first = src[src_idx] as i64;
+            self.last = Some(first);
+
+            let zigzag = zigzag_encode(first);
+            buf_idx = vbyte_encode(zigzag, &mut buf, buf_idx);
+
+            src_idx += 1;
+        }
+
+        // Set up for our actual compression run.
+        let mut last = self.last.unwrap();
+
+        while src_idx < src_len {
+            let value = src[src_idx] as i64;
+            let diff = value - last;
+            let zigzag = zigzag_encode(diff);
+            buf_idx = vbyte_encode(zigzag, &mut buf, buf_idx);
+            last = value;
+            src_idx += 1;
+        }
+
+        unsafe {
+            self.inner.set_len(buf_idx);
+        }
+
+        self.last = Some(last);
+    }
+
+    /// Decompresses all of the integers written to the set.
+    ///
+    /// Returns a vector with all of the original values. For larger sets of integers, this can be
+    /// slow due to the allocation required. Consider [decompress_with] to incrementally iterate
+    /// the decompressed set in smaller chunks.
+    ///
+    /// [decompress_with]: StreamingIntegers::decompress_with
+    pub fn decompress(&self) -> Vec<u64> {
+        let mut values = Vec::new();
+
+        let mut buf_idx = 0;
+        let buf_len = self.inner.len();
+        let buf = self.inner.as_slice();
+
+        let mut last = 0;
+        while buf_idx < buf_len {
+            let (value, new_idx) = vbyte_decode(&buf, buf_idx);
+            buf_idx = new_idx;
+
+            let delta = zigzag_decode(value);
+            let original = last + delta;
+            last = original;
+
+            values.push(original as u64);
+        }
+
+        values
+    }
+
+    /// Decompresses all of the integers written to the set, invoking `f` for each batch.
+    ///
+    /// During decompression, values are batched internally until a limit is reached, and then `f`
+    /// is called with a reference to the batch. This leads to minimal allocation to decompress
+    /// the entire set, for use cases where the values can be observed incrementally without issue.
+    pub fn decompress_with<F>(&self, mut f: F)
+    where
+        F: FnMut(&[u64]),
+    {
+        let mut values = Vec::with_capacity(1024);
+
+        let mut buf_idx = 0;
+        let buf_len = self.inner.len();
+        let buf = self.inner.as_slice();
+
+        let mut last = 0;
+        while buf_idx < buf_len {
+            let (value, new_idx) = vbyte_decode(&buf, buf_idx);
+            buf_idx = new_idx;
+
+            let delta = zigzag_decode(value);
+            let original = last + delta;
+            last = original;
+
+            values.push(original as u64);
+            if values.len() == values.capacity() {
+                f(&values);
+                values.clear();
+            }
+        }
+
+        if !values.is_empty() {
+            f(&values);
+        }
+    }
+}
+
+#[inline]
+fn zigzag_encode(input: i64) -> u64 {
+    ((input << 1) ^ (input >> 63)) as u64
+}
+
+#[inline]
+fn zigzag_decode(input: u64) -> i64 {
+    ((input >> 1) as i64) ^ (-((input & 1) as i64))
+}
+
+#[inline]
+fn vbyte_encode(mut input: u64, buf: &mut [u8], mut buf_idx: usize) -> usize {
+    while input >= 128 {
+        buf[buf_idx] = 0x80 as u8 | (input as u8 & 0x7F);
+        buf_idx += 1;
+        input >>= 7;
+    }
+    buf[buf_idx] = input as u8;
+    buf_idx + 1
+}
+
+#[inline]
+fn vbyte_decode(buf: &[u8], mut buf_idx: usize) -> (u64, usize) {
+    let mut tmp = 0;
+    let mut factor = 0;
+    loop {
+        tmp |= u64::from(buf[buf_idx] & 0x7F) << (7 * factor);
+        if buf[buf_idx] & 0x80 != 0x80 {
+            return (tmp, buf_idx + 1);
+        }
+
+        buf_idx += 1;
+        factor += 1;
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::StreamingIntegers;
+
+    #[test]
+    fn test_streaming_integers_new() {
+        let si = StreamingIntegers::new();
+        let decompressed = si.decompress();
+        assert_eq!(decompressed.len(), 0);
+    }
+
+    #[test]
+    fn test_streaming_integers_single_block() {
+        let mut si = StreamingIntegers::new();
+        let decompressed = si.decompress();
+        assert_eq!(decompressed.len(), 0);
+
+        let values = vec![8, 6, 7, 5, 3, 0, 9];
+        si.compress(&values);
+
+        let decompressed = si.decompress();
+        assert_eq!(decompressed, values);
+    }
+
+    #[test]
+    fn test_streaming_integers_multiple_blocks() {
+        let mut si = StreamingIntegers::new();
+        let decompressed = si.decompress();
+        assert_eq!(decompressed.len(), 0);
+
+        let values = vec![8, 6, 7, 5, 3, 0, 9];
+        si.compress(&values);
+
+        let values2 = vec![6, 6, 6];
+        si.compress(&values2);
+
+        let values3 = vec![];
+        si.compress(&values3);
+
+        let values4 = vec![6, 6, 6, 7, 7, 7, 8, 8, 8];
+        si.compress(&values4);
+
+        let total = vec![values, values2, values3, values4]
+            .into_iter()
+            .flatten()
+            .collect::<Vec<u64>>();
+
+        let decompressed = si.decompress();
+        assert_eq!(decompressed, total);
+    }
+
+    #[test]
+    fn test_streaming_integers_empty_block() {
+        let mut si = StreamingIntegers::new();
+        let decompressed = si.decompress();
+        assert_eq!(decompressed.len(), 0);
+
+        let values = vec![];
+        si.compress(&values);
+
+        let decompressed = si.decompress();
+        assert_eq!(decompressed.len(), 0);
+    }
+}
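To make the encoding scheme concrete, a short round-trip sketch (illustrative, not part of the change). The values climb in small steps, which is the best case described in the doc comment above: each delta zigzag-encodes to a tiny integer that variable-byte encoding stores in a single byte.

use metrics_util::StreamingIntegers;

fn main() {
    // Monotonic values with small deltas, e.g. nanosecond timestamps.
    let values: Vec<u64> = (0..10_000u64).map(|i| 1_000_000_000 + i * 3).collect();

    let mut si = StreamingIntegers::new();
    si.compress(&values);
    assert_eq!(si.len(), values.len());

    // Decompress everything into one vector...
    assert_eq!(si.decompress(), values);

    // ...or observe the values in bounded batches without a full allocation.
    let mut sum = 0u64;
    si.decompress_with(|batch| sum += batch.iter().sum::<u64>());
    assert_eq!(sum, values.iter().sum::<u64>());
}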
diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml
index 2e29a41..6ae14bf 100644
--- a/metrics/Cargo.toml
+++ b/metrics/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "metrics"
-version = "0.9.1"
+version = "0.10.0"
 authors = ["Toby Lawrence "]
 edition = "2018"
@@ -26,15 +26,21 @@ default = ["exporters", "recorders"]
 exporters = ["metrics-exporter-log", "metrics-exporter-http"]
 recorders = ["metrics-recorder-text", "metrics-recorder-prometheus"]
 
+[[bench]]
+name = "histogram"
+harness = false
+
 [dependencies]
 metrics-core = { path = "../metrics-core", version = "^0.3" }
-crossbeam-channel = "^0.3"
+metrics-util = { path = "../metrics-util", version = "^0.2" }
+im = "^12"
+fxhash = "^0.2"
+arc-swap = "^0.3"
 parking_lot = "^0.7"
-fnv = "^1.0"
-hashbrown = "^0.1"
-quanta = "^0.2"
+hashbrown = "^0.3"
+quanta = "^0.3"
 futures = "^0.1"
-tokio-sync = "^0.1"
+crossbeam-utils = "^0.6"
 metrics-exporter-log = { path = "../metrics-exporter-log", version = "^0.2", optional = true }
 metrics-exporter-http = { path = "../metrics-exporter-http", version = "^0.1", optional = true }
 metrics-recorder-text = { path = "../metrics-recorder-text", version = "^0.2", optional = true }
@@ -45,3 +51,5 @@ log = "^0.4"
 env_logger = "^0.6"
 getopts = "^0.2"
 hdrhistogram = "^6.1"
+criterion = "^0.2.9"
+lazy_static = "^1.3"
diff --git a/metrics/benches/histogram.rs b/metrics/benches/histogram.rs
new file mode 100644
index 0000000..eac1d6c
--- /dev/null
+++ b/metrics/benches/histogram.rs
@@ -0,0 +1,77 @@
+#[macro_use]
+extern crate criterion;
+
+#[macro_use]
+extern crate lazy_static;
+
+use criterion::{Benchmark, Criterion, Throughput};
+use metrics::data::AtomicWindowedHistogram;
+use quanta::{Builder as UpkeepBuilder, Clock, Handle as UpkeepHandle};
+use std::time::Duration;
+
+lazy_static! {
+    static ref QUANTA_UPKEEP: UpkeepHandle = {
+        let builder = UpkeepBuilder::new(Duration::from_millis(10));
+        let handle = builder
+            .start()
+            .expect("failed to start quanta upkeep thread");
+        handle
+    };
+    static ref RANDOM_INTS: Vec<u64> = vec![
+        21061184, 21301862, 21331592, 21457012, 21500016, 21537837, 21581557, 21620030, 21664102,
+        21678463, 21708437, 21751808, 21845243, 21850265, 21938879, 21971014, 22005842, 22034601,
+        22085552, 22101746, 22115429, 22139883, 22260209, 22270768, 22298080, 22299780, 22307659,
+        22354697, 22355668, 22359397, 22463872, 22496590, 22590978, 22603740, 22706352, 22820895,
+        22849491, 22891538, 22912955, 22919915, 22928920, 22968656, 22985992, 23033739, 23061395,
+        23077554, 23138588, 23185172, 23282479, 23290830, 23316844, 23386911, 23641319, 23677058,
+        23742930, 25350389, 25399746, 25404925, 25464391, 25478415, 25480015, 25632783, 25639769,
+        25645612, 25688228, 25724427, 25862192, 25954476, 25994479, 26008752, 26036460, 26038202,
+        26078874, 26118327, 26132679, 26207601, 26262418, 26270737, 26274860, 26431248, 26434268,
+        26562736, 26580134, 26593740, 26618561, 26844181, 26866971, 26907883, 27005270, 27023584,
+        27024044, 27057184, 23061395, 23077554, 23138588, 23185172, 23282479, 23290830, 23316844,
+        23386911, 23641319, 23677058, 23742930, 25350389, 25399746, 25404925, 25464391, 25478415,
+        25480015, 25632783, 25639769, 25645612, 25688228, 25724427, 25862192, 25954476, 25994479,
+        26008752, 26036460, 26038202, 26078874, 26118327, 26132679, 26207601, 26262418, 26270737,
+        26274860, 26431248, 26434268, 26562736, 26580134, 26593740, 26618561, 26844181, 26866971,
+        26907883, 27005270, 27023584, 27024044, 27057184, 23061395, 23077554, 23138588, 23185172,
+        23282479, 23290830, 23316844, 23386911, 23641319, 23677058, 23742930, 25350389, 25399746,
+        25404925, 25464391, 25478415, 25480015, 25632783, 25639769, 25645612, 25688228, 25724427,
+        25862192, 25954476, 25994479, 26008752, 26036460, 26038202, 26078874, 26118327, 26132679,
+        26207601, 26262418, 26270737, 26274860, 26431248, 26434268, 26562736, 26580134, 26593740,
+        26618561, 26844181, 26866971, 26907883, 27005270, 27023584, 27024044, 27057184, 23061395,
+        23077554, 23138588, 23185172, 23282479, 23290830, 23316844, 23386911, 23641319, 23677058,
+        23742930, 25350389, 25399746, 25404925, 25464391, 25478415, 25480015, 25632783, 25639769,
+        25645612, 25688228, 25724427, 25862192, 25954476, 25994479, 26008752, 26036460, 26038202,
+        26078874, 26118327, 26132679, 26207601, 26262418, 26270737, 26274860, 26431248, 26434268,
+        26562736, 26580134, 26593740, 26618561, 26844181, 26866971, 26907883, 27005270, 27023584,
+        27024044, 27057184, 27088034, 27088550, 27302898, 27353925, 27412984, 27488633, 27514155,
+        27558052, 27601937, 27606339, 27624514, 27680396, 27684064, 27963602, 27414982, 28450673
+    ];
+}
+
+fn bucket_benchmark(c: &mut Criterion) {
+    // Trigger the quanta upkeep thread to spawn and start updating the time.
+    let _handle = &QUANTA_UPKEEP;
+
+    c.bench(
+        "histogram",
+        Benchmark::new("record", |b| {
+            let clock = Clock::new();
+            let bucket = AtomicWindowedHistogram::new(
+                Duration::from_secs(1),
+                Duration::from_millis(100),
+                clock,
+            );
+
+            b.iter(|| {
+                for value in RANDOM_INTS.iter() {
+                    bucket.record(*value);
+                }
+            })
+        })
+        .throughput(Throughput::Elements(RANDOM_INTS.len() as u32)),
+    );
+}
+
+criterion_group!(benches, bucket_benchmark);
+criterion_main!(benches);
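For reference, a minimal sketch of driving `AtomicWindowedHistogram` directly, using only the calls that appear in this benchmark and in `common.rs` later in the diff (`new`, `record`, and `snapshot`); treat it as illustrative rather than a documented API:

use metrics::data::AtomicWindowedHistogram;
use quanta::Clock;
use std::time::Duration;

fn main() {
    // A one-second window rolled over in 100ms slices, as in the bench above.
    let clock = Clock::new();
    let histogram = AtomicWindowedHistogram::new(
        Duration::from_secs(1),
        Duration::from_millis(100),
        clock,
    );

    for value in &[150u64, 220, 310] {
        histogram.record(*value);
    }

    // Snapshots come back as a compressed StreamingIntegers set.
    let snapshot = histogram.snapshot();
    assert_eq!(snapshot.len(), 3);
}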
diff --git a/metrics/examples/benchmark.rs b/metrics/examples/benchmark.rs
index 23412ca..252ac1c 100644
--- a/metrics/examples/benchmark.rs
+++ b/metrics/examples/benchmark.rs
@@ -8,53 +8,126 @@ extern crate metrics_core;
 
 use getopts::Options;
 use hdrhistogram::Histogram;
-use metrics::{snapshot::TypedMeasurement, Receiver, Sink};
-use metrics_core::SnapshotProvider;
+use metrics::{Receiver, Sink};
+use metrics_core::{Recorder, Snapshot, SnapshotProvider};
+use quanta::Clock;
 use std::{
     env,
     sync::{
-        atomic::{AtomicBool, Ordering},
+        atomic::{AtomicBool, AtomicU64, Ordering},
         Arc,
     },
     thread,
     time::{Duration, Instant},
 };
 
+const LOOP_SAMPLE: u64 = 1000;
+
 struct Generator {
     stats: Sink,
     t0: Option<u64>,
     gauge: i64,
     hist: Histogram<u64>,
     done: Arc<AtomicBool>,
+    rate_counter: Arc<AtomicU64>,
+    clock: Clock,
 }
 
 impl Generator {
-    fn new(stats: Sink, done: Arc<AtomicBool>) -> Generator {
+    fn new(
+        stats: Sink,
+        done: Arc<AtomicBool>,
+        rate_counter: Arc<AtomicU64>,
+        clock: Clock,
+    ) -> Generator {
         Generator {
             stats,
             t0: None,
             gauge: 0,
             hist: Histogram::<u64>::new_with_bounds(1, u64::max_value(), 3).unwrap(),
             done,
+            rate_counter,
+            clock,
         }
     }
 
     fn run(&mut self) {
+        let mut counter = 0;
         loop {
+            counter += 1;
+
             if self.done.load(Ordering::Relaxed) {
                 break;
            }
 
             self.gauge += 1;
+
+            let t1 = self.stats.now();
+
             if let Some(t0) = self.t0 {
-                let start = self.stats.now();
+                let start = if counter % 33 == 0 {
+                    self.stats.now()
+                } else {
+                    0
+                };
+
+                let _ = self.stats.record_count("ok", 1);
                 let _ = self.stats.record_timing("ok", t0, t1);
                 let _ = self.stats.record_gauge("total", self.gauge);
-                let delta = self.stats.now() - start;
-                self.hist.saturating_record(delta);
+
+                if start != 0 {
+                    let delta = self.stats.now() - start;
+                    self.hist.saturating_record(delta);
+
+                    // We also increment our global counter for the sample rate here.
+                    self.rate_counter
+                        .fetch_add(LOOP_SAMPLE * 3, Ordering::AcqRel);
+                }
             }
+
+            self.t0 = Some(t1);
+        }
+    }
+
+    fn run_cached(&mut self) {
+        let mut counter = 0;
+
+        let counter_handle = self.stats.counter("ok");
+        let timing_handle = self.stats.histogram("ok");
+        let gauge_handle = self.stats.gauge("total");
+
+        loop {
+            counter += 1;
+
+            if self.done.load(Ordering::Relaxed) {
+                break;
+            }
+
+            self.gauge += 1;
+
+            let t1 = self.clock.recent();
+
+            if let Some(t0) = self.t0 {
+                let start = if counter % LOOP_SAMPLE == 0 {
+                    self.stats.now()
+                } else {
+                    0
+                };
+
+                let _ = counter_handle.record(1);
+                let _ = timing_handle.record_timing(t0, t1);
+                let _ = gauge_handle.record(self.gauge);
+
+                if start != 0 {
+                    let delta = self.stats.now() - start;
+                    self.hist.saturating_record(delta);
+
+                    // We also increment our global counter for the sample rate here.
+                    self.rate_counter
+                        .fetch_add(LOOP_SAMPLE * 3, Ordering::AcqRel);
+                }
+            }
+
+            self.t0 = Some(t1);
        }
    }
 }
@@ -89,18 +162,6 @@ pub fn opts() -> Options {
         "INTEGER",
     );
     opts.optopt("p", "producers", "number of producers", "INTEGER");
-    opts.optopt(
-        "c",
-        "capacity",
-        "maximum number of unprocessed items",
-        "INTEGER",
-    );
-    opts.optopt(
-        "b",
-        "batch-size",
-        "maximum number of items in a batch",
-        "INTEGER",
-    );
     opts.optflag("h", "help", "print this help menu");
 
     opts
@@ -134,31 +195,19 @@ fn main() {
         .unwrap_or_else(|| "60".to_owned())
         .parse()
         .unwrap();
-    let capacity = matches
-        .opt_str("capacity")
-        .unwrap_or_else(|| "1024".to_owned())
-        .parse()
-        .unwrap();
-    let batch_size = matches
-        .opt_str("batch-size")
-        .unwrap_or_else(|| "256".to_owned())
-        .parse()
-        .unwrap();
     let producers = matches
         .opt_str("producers")
        .unwrap_or_else(|| "1".to_owned())
        .parse()
        .unwrap();
 
+    info!("duration: {}s", seconds);
     info!("producers: {}", producers);
-    info!("capacity: {}", capacity);
-    info!("batch size: {}", batch_size);
 
-    let mut receiver = Receiver::builder()
-        .capacity(capacity)
-        .batch_size(batch_size)
-        .histogram(Duration::from_secs(5), Duration::from_secs(1))
-        .build();
+    let receiver = Receiver::builder()
+        .histogram(Duration::from_secs(5), Duration::from_millis(100))
+        .build()
+        .expect("failed to build receiver");
 
     let sink = receiver.get_sink();
     let sink = sink.scoped(&["alpha", "pools", "primary"]);
@@ -167,13 +216,17 @@ fn main() {
 
     // Spin up our sample producers.
     let done = Arc::new(AtomicBool::new(false));
+    let rate_counter = Arc::new(AtomicU64::new(0));
     let mut handles = Vec::new();
+    let clock = Clock::new();
 
     for _ in 0..producers {
         let s = sink.clone();
         let d = done.clone();
+        let r = rate_counter.clone();
+        let c = clock.clone();
         let handle = thread::spawn(move || {
-            Generator::new(s, d).run();
+            Generator::new(s, d, r, c).run_cached();
         });
 
         handles.push(handle);
@@ -182,10 +235,6 @@ fn main() {
     // Spin up the sink and let 'er rip.
     let controller = receiver.get_controller();
 
-    thread::spawn(move || {
-        receiver.run();
-    });
-
     // Poll the controller to figure out the sample rate.
     let mut total = 0;
     let mut t0 = Instant::now();
 
@@ -195,22 +244,11 @@ fn main() {
         let t1 = Instant::now();
 
         let start = Instant::now();
-        let snapshot = controller.get_snapshot();
+        let snapshot = controller.get_snapshot().unwrap();
         let end = Instant::now();
         snapshot_hist.saturating_record(duration_as_nanos(end - start) as u64);
 
-        let turn_total = snapshot
-            .unwrap()
-            .into_measurements()
-            .iter()
-            .fold(0, |acc, m| {
-                acc + match m {
-                    TypedMeasurement::Counter(_key, value) => *value,
-                    TypedMeasurement::Gauge(_key, value) => *value as u64,
-                    _ => 0,
-                }
-            });
-
+        let turn_total = rate_counter.load(Ordering::Acquire);
         let turn_delta = turn_total - total;
         total = turn_total;
         let rate = turn_delta as f64 / (duration_as_nanos(t1 - t0) / 1_000_000_000.0);
@@ -239,6 +277,34 @@ fn main() {
     }
 }
 
+struct TotalRecorder {
+    total: u64,
+}
+
+impl TotalRecorder {
+    pub fn new() -> Self {
+        Self { total: 0 }
+    }
+
+    pub fn total(&self) -> u64 {
+        self.total
+    }
+}
+
+impl Recorder for TotalRecorder {
+    fn record_counter<K: Into<String>>(&mut self, _key: K, value: u64) {
+        self.total += value;
+    }
+
+    fn record_gauge<K: Into<String>>(&mut self, _key: K, value: i64) {
+        self.total += value as u64;
+    }
+
+    fn record_histogram<K: Into<String>>(&mut self, _key: K, values: &[u64]) {
+        self.total += values.len() as u64;
+    }
+}
+
 fn duration_as_nanos(d: Duration) -> f64 {
     (d.as_secs() as f64 * 1e9) + d.subsec_nanos() as f64
 }
diff --git a/metrics/examples/stdbenchmark.rs b/metrics/examples/stdbenchmark.rs
new file mode 100644
index 0000000..76f5e09
--- /dev/null
+++ b/metrics/examples/stdbenchmark.rs
@@ -0,0 +1,202 @@
+#[macro_use]
+extern crate log;
+extern crate env_logger;
+extern crate getopts;
+extern crate hdrhistogram;
+
+use getopts::Options;
+use hdrhistogram::Histogram;
+use quanta::Clock;
+use std::{
+    env,
+    sync::{
+        atomic::{AtomicBool, AtomicU64, Ordering},
+        Arc,
+    },
+    thread,
+    time::{Duration, Instant},
+};
+
+struct Generator {
+    counter: Arc<AtomicU64>,
+    clock: Clock,
+    hist: Histogram<u64>,
+    done: Arc<AtomicBool>,
+}
+
+impl Generator {
+    fn new(counter: Arc<AtomicU64>, done: Arc<AtomicBool>) -> Generator {
+        Generator {
+            counter,
+            clock: Clock::new(),
+            hist: Histogram::<u64>::new_with_bounds(1, u64::max_value(), 3).unwrap(),
+            done,
+        }
+    }
+
+    fn run(&mut self) {
+        let mut counter = 0;
+        loop {
+            if self.done.load(Ordering::Relaxed) {
+                break;
+            }
+
+            let start = if counter % 100 == 0 {
+                self.clock.now()
+            } else {
+                0
+            };
+
+            counter = self.counter.fetch_add(1, Ordering::AcqRel);
+
+            if start != 0 {
+                let delta = self.clock.now() - start;
+                self.hist.saturating_record(delta);
+            }
+        }
+    }
+}
+
+impl Drop for Generator {
+    fn drop(&mut self) {
+        info!(
+            "  sender latency: min: {:9} p50: {:9} p95: {:9} p99: {:9} p999: {:9} max: {:9}",
+            nanos_to_readable(self.hist.min()),
+            nanos_to_readable(self.hist.value_at_percentile(50.0)),
+            nanos_to_readable(self.hist.value_at_percentile(95.0)),
+            nanos_to_readable(self.hist.value_at_percentile(99.0)),
+            nanos_to_readable(self.hist.value_at_percentile(99.9)),
+            nanos_to_readable(self.hist.max())
+        );
+    }
+}
+
+fn print_usage(program: &str, opts: &Options) {
+    let brief = format!("Usage: {} [options]", program);
+    print!("{}", opts.usage(&brief));
+}
+
+pub fn opts() -> Options {
+    let mut opts = Options::new();
+
+    opts.optopt(
+        "d",
+        "duration",
+        "number of seconds to run the benchmark",
+        "INTEGER",
+    );
+    opts.optopt("p", "producers", "number of producers", "INTEGER");
+    opts.optflag("h", "help", "print this help menu");
+
+    opts
+}
+
+fn main() {
+    env_logger::init();
+
+    let args: Vec<String> = env::args().collect();
+    let program = &args[0];
+    let opts = opts();
+
+    let matches = match opts.parse(&args[1..]) {
+        Ok(m) => m,
+        Err(f) => {
+            error!("Failed to parse command line args: {}", f);
+            return;
+        }
+    };
+
+    if matches.opt_present("help") {
+        print_usage(program, &opts);
+        return;
+    }
+
+    info!("metrics benchmark");
+
+    // Build our sink and configure the facets.
+    let seconds = matches
+        .opt_str("duration")
+        .unwrap_or_else(|| "60".to_owned())
+        .parse()
+        .unwrap();
+    let producers = matches
+        .opt_str("producers")
+        .unwrap_or_else(|| "1".to_owned())
+        .parse()
+        .unwrap();
+
+    info!("duration: {}s", seconds);
+    info!("producers: {}", producers);
+
+    // Spin up our sample producers.
+    let counter = Arc::new(AtomicU64::new(0));
+    let done = Arc::new(AtomicBool::new(false));
+    let mut handles = Vec::new();
+
+    for _ in 0..producers {
+        let c = counter.clone();
+        let d = done.clone();
+        let handle = thread::spawn(move || {
+            Generator::new(c, d).run();
+        });
+
+        handles.push(handle);
+    }
+
+    // Poll the counter to figure out the sample rate.
+    let mut total = 0;
+    let mut t0 = Instant::now();
+
+    let mut snapshot_hist = Histogram::<u64>::new_with_bounds(1, u64::max_value(), 3).unwrap();
+    for _ in 0..seconds {
+        let t1 = Instant::now();
+
+        let start = Instant::now();
+        let turn_total = counter.load(Ordering::Acquire);
+        let end = Instant::now();
+        snapshot_hist.saturating_record(duration_as_nanos(end - start) as u64);
+
+        let turn_delta = turn_total - total;
+        total = turn_total;
+        let rate = turn_delta as f64 / (duration_as_nanos(t1 - t0) / 1_000_000_000.0);
+
+        info!("sample ingest rate: {:.0} samples/sec", rate);
+        t0 = t1;
+        thread::sleep(Duration::new(1, 0));
+    }
+
+    info!("--------------------------------------------------------------------------------");
+    info!(" ingested samples total: {}", total);
+    info!(
+        "snapshot retrieval: min: {:9} p50: {:9} p95: {:9} p99: {:9} p999: {:9} max: {:9}",
+        nanos_to_readable(snapshot_hist.min()),
+        nanos_to_readable(snapshot_hist.value_at_percentile(50.0)),
+        nanos_to_readable(snapshot_hist.value_at_percentile(95.0)),
+        nanos_to_readable(snapshot_hist.value_at_percentile(99.0)),
+        nanos_to_readable(snapshot_hist.value_at_percentile(99.9)),
+        nanos_to_readable(snapshot_hist.max())
+    );
+
+    // Wait for the producers to finish so we can get their stats too.
+    done.store(true, Ordering::SeqCst);
+    for handle in handles {
+        let _ = handle.join();
+    }
+}
+
+fn duration_as_nanos(d: Duration) -> f64 {
+    (d.as_secs() as f64 * 1e9) + d.subsec_nanos() as f64
+}
+
+fn nanos_to_readable(t: u64) -> String {
+    let f = t as f64;
+    if f < 1_000.0 {
+        format!("{}ns", f)
+    } else if f < 1_000_000.0 {
+        format!("{:.0}μs", f / 1_000.0)
+    } else if f < 2_000_000_000.0 {
+        format!("{:.2}ms", f / 1_000_000.0)
+    } else {
+        format!("{:.3}s", f / 1_000_000_000.0)
+    }
+}
diff --git a/metrics/src/builder.rs b/metrics/src/builder.rs
new file mode 100644
index 0000000..af68ba5
--- /dev/null
+++ b/metrics/src/builder.rs
@@ -0,0 +1,98 @@
+use crate::{config::Configuration, Receiver};
+use std::error::Error;
+use std::fmt;
+use std::time::Duration;
+
+/// Errors during receiver creation.
+#[derive(Debug, Clone)]
+pub enum BuilderError {
+    /// Failed to spawn the upkeep thread.
+    ///
+    /// As histograms are windowed, reads and writes require getting the current time so they can
+    /// perform the required maintenance, or upkeep, on the internal structures to roll over old
+    /// buckets, etc.
+    ///
+    /// Acquiring the current time is fast compared to most operations, but is a significant
+    /// portion of the overall time it takes to write to a histogram, which limits throughput
+    /// under high load.
+    ///
+    /// We spin up a background thread, or the "upkeep thread", which updates a global time source
+    /// that the read and write operations exclusively rely on. While this source is not as
+    /// up-to-date as the real clock, it is much faster to access.
+    UpkeepFailure,
+
+    #[doc(hidden)]
+    _NonExhaustive,
+}
+
+impl Error for BuilderError {}
+
+impl fmt::Display for BuilderError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            BuilderError::UpkeepFailure => write!(f, "failed to spawn quanta upkeep thread"),
+            BuilderError::_NonExhaustive => write!(f, "non-exhaustive matching"),
+        }
+    }
+}
+
+/// Builder for [`Receiver`].
+#[derive(Clone)]
+pub struct Builder {
+    pub(crate) histogram_window: Duration,
+    pub(crate) histogram_granularity: Duration,
+    pub(crate) upkeep_interval: Duration,
+}
+
+impl Default for Builder {
+    fn default() -> Self {
+        Self {
+            histogram_window: Duration::from_secs(10),
+            histogram_granularity: Duration::from_secs(1),
+            upkeep_interval: Duration::from_millis(50),
+        }
+    }
+}
+
+impl Builder {
+    /// Creates a new [`Builder`] with default values.
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Sets the histogram configuration.
+    ///
+    /// Defaults to a 10 second window with 1 second granularity.
+    ///
+    /// This controls both how long of a time window we track histogram data for, and the
+    /// granularity with which we roll off old data.
+    ///
+    /// As an example, with the default values, we would keep the last 10 seconds' worth of
+    /// histogram data, and would remove 1 second's worth of data at a time as the window rolled
+    /// forward.
+    pub fn histogram(mut self, window: Duration, granularity: Duration) -> Self {
+        self.histogram_window = window;
+        self.histogram_granularity = granularity;
+        self
+    }
+
+    /// Sets the upkeep interval.
+    ///
+    /// Defaults to 50 milliseconds.
+    ///
+    /// This controls how often the time source, used internally by histograms, is updated with the
+    /// real time. For performance reasons, histograms use a sampled time source when they perform
+    /// checks to see if internal maintenance needs to occur. If the histogram granularity is set
+    /// very low, then this interval might need to be similarly reduced to make sure we're able to
+    /// update the time more often than histograms need to perform upkeep.
+    pub fn upkeep_interval(mut self, interval: Duration) -> Self {
+        self.upkeep_interval = interval;
+        self
+    }
+
+    /// Creates a [`Receiver`] based on this configuration.
+    pub fn build(self) -> Result<Receiver, BuilderError> {
+        let config = Configuration::from_builder(&self);
+        Receiver::from_config(config)
+    }
+}
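Tying the builder options together, a usage sketch mirroring the call in `examples/benchmark.rs` earlier in the diff (illustrative, not part of the change):

use metrics::Receiver;
use std::time::Duration;

fn main() {
    // Keep ten seconds of histogram data, rolled off one second at a time,
    // and refresh the sampled time source every 10ms.
    let receiver = Receiver::builder()
        .histogram(Duration::from_secs(10), Duration::from_secs(1))
        .upkeep_interval(Duration::from_millis(10))
        .build()
        .expect("failed to build receiver");

    let mut sink = receiver.get_sink();
    let _ = sink.record_count("requests", 1);
}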
+///
+/// Not interacted with directly by end users, and only exposed due to a lack of trait method
+/// visibility controls.
+///
+/// See also: [Sink::scoped](crate::Sink::scoped).
+#[derive(PartialEq, Eq, Hash, Clone)]
+pub enum MetricScope {
+    /// Root scope.
+    Root,
+
+    /// A nested scope, with arbitrarily deep nesting.
+    Nested(Vec<String>),
+}
+
+impl MetricScope {
+    pub(crate) fn into_scoped(self, name: MetricName) -> String {
+        match self {
+            MetricScope::Root => name.to_string(),
+            MetricScope::Nested(mut parts) => {
+                if !name.is_empty() {
+                    parts.push(name.to_string());
+                }
+                parts.join(".")
+            }
+        }
+    }
+}
+
+pub(crate) type MetricScopeHandle = u64;
+
+#[derive(PartialEq, Eq, Hash, Clone, Debug)]
+pub(crate) enum MetricKind {
+    Counter,
+    Gauge,
+    Histogram,
+}
+
+#[derive(PartialEq, Eq, Hash, Clone, Debug)]
+pub(crate) enum MetricIdentifier {
+    Unlabeled(MetricName, MetricScopeHandle, MetricKind),
+}
+
+#[derive(Debug)]
+enum ValueState {
+    Counter(AtomicU64),
+    Gauge(AtomicI64),
+    Histogram(AtomicWindowedHistogram),
+}
+
+#[derive(Debug)]
+pub(crate) enum ValueSnapshot {
+    Counter(u64),
+    Gauge(i64),
+    Histogram(StreamingIntegers),
+}
+
+/// Handle to the underlying measurement for a metric.
+#[derive(Clone, Debug)]
+pub(crate) struct MetricValue {
+    state: Arc<ValueState>,
+}
+
+impl MetricValue {
+    fn new(state: ValueState) -> Self {
+        MetricValue {
+            state: Arc::new(state),
+        }
+    }
+
+    pub fn counter() -> Self {
+        Self::new(ValueState::Counter(AtomicU64::new(0)))
+    }
+
+    pub fn gauge() -> Self {
+        Self::new(ValueState::Gauge(AtomicI64::new(0)))
+    }
+
+    pub fn histogram(window: Duration, granularity: Duration, clock: Clock) -> Self {
+        Self::new(ValueState::Histogram(AtomicWindowedHistogram::new(
+            window,
+            granularity,
+            clock,
+        )))
+    }
+
+    pub fn update_counter(&self, value: u64) {
+        match self.state.deref() {
+            ValueState::Counter(inner) => {
+                inner.fetch_add(value, Ordering::Release);
+            }
+            _ => unreachable!("tried to access as counter, not a counter"),
+        }
+    }
+
+    pub fn update_gauge(&self, value: i64) {
+        match self.state.deref() {
+            ValueState::Gauge(inner) => inner.store(value, Ordering::Release),
+            _ => unreachable!("tried to access as gauge, not a gauge"),
+        }
+    }
+
+    pub fn update_histogram(&self, value: u64) {
+        match self.state.deref() {
+            ValueState::Histogram(inner) => inner.record(value),
+            _ => unreachable!("tried to access as histogram, not a histogram"),
+        }
+    }
+
+    pub fn snapshot(&self) -> ValueSnapshot {
+        match self.state.deref() {
+            ValueState::Counter(inner) => {
+                let value = inner.load(Ordering::Acquire);
+                ValueSnapshot::Counter(value)
+            }
+            ValueState::Gauge(inner) => {
+                let value = inner.load(Ordering::Acquire);
+                ValueSnapshot::Gauge(value)
+            }
+            ValueState::Histogram(inner) => {
+                let stream = inner.snapshot();
+                ValueSnapshot::Histogram(stream)
+            }
+        }
+    }
+}
+
+/// Trait for types that represent time and can be subtracted from each other to generate a delta.
+pub trait Delta {
+    /// Gets the delta between this value and another value.
+    ///
+    /// For `Instant`, we explicitly return the nanosecond difference. For `u64`, we return the
+    /// integer difference, but the timescale itself can be whatever the user desires.
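+    ///
+    /// A sketch of the `Instant` behavior (illustrative):
+    ///
+    /// ```ignore
+    /// use std::time::{Duration, Instant};
+    /// let start = Instant::now();
+    /// let end = start + Duration::from_micros(5);
+    /// assert_eq!(end.delta(start), 5_000);
+    /// ```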
+ fn delta(&self, other: Self) -> u64; +} + +impl Delta for u64 { + fn delta(&self, other: u64) -> u64 { + self.wrapping_sub(other) + } +} + +impl Delta for Instant { + fn delta(&self, other: Instant) -> u64 { + let dur = *self - other; + dur.as_nanos() as u64 + } +} + +#[cfg(test)] +mod tests { + use super::{MetricScope, MetricValue, ValueSnapshot}; + use quanta::Clock; + use std::time::Duration; + + #[test] + fn test_metric_scope() { + let root_scope = MetricScope::Root; + assert_eq!(root_scope.into_scoped("".into()), "".to_string()); + + let root_scope = MetricScope::Root; + assert_eq!( + root_scope.into_scoped("jambalaya".into()), + "jambalaya".to_string() + ); + + let nested_scope = MetricScope::Nested(vec![]); + assert_eq!(nested_scope.into_scoped("".into()), "".to_string()); + + let nested_scope = MetricScope::Nested(vec![]); + assert_eq!( + nested_scope.into_scoped("toilet".into()), + "toilet".to_string() + ); + + let nested_scope = MetricScope::Nested(vec![ + "chamber".to_string(), + "of".to_string(), + "secrets".to_string(), + ]); + assert_eq!( + nested_scope.into_scoped("".into()), + "chamber.of.secrets".to_string() + ); + + let nested_scope = MetricScope::Nested(vec![ + "chamber".to_string(), + "of".to_string(), + "secrets".to_string(), + ]); + assert_eq!( + nested_scope.into_scoped("toilet".into()), + "chamber.of.secrets.toilet".to_string() + ); + } + + #[test] + fn test_metric_values() { + let counter = MetricValue::counter(); + counter.update_counter(42); + match counter.snapshot() { + ValueSnapshot::Counter(value) => assert_eq!(value, 42), + _ => panic!("incorrect value snapshot type for counter"), + } + + let gauge = MetricValue::gauge(); + gauge.update_gauge(23); + match gauge.snapshot() { + ValueSnapshot::Gauge(value) => assert_eq!(value, 23), + _ => panic!("incorrect value snapshot type for gauge"), + } + + let (mock, _) = Clock::mock(); + let histogram = + MetricValue::histogram(Duration::from_secs(10), Duration::from_secs(1), mock); + histogram.update_histogram(8675309); + histogram.update_histogram(5551212); + match histogram.snapshot() { + ValueSnapshot::Histogram(stream) => { + assert_eq!(stream.len(), 2); + + let values = stream.decompress(); + assert_eq!(&values[..], [8675309, 5551212]); + } + _ => panic!("incorrect value snapshot type for histogram"), + } + } +} diff --git a/metrics/src/config.rs b/metrics/src/config.rs new file mode 100644 index 0000000..f73b573 --- /dev/null +++ b/metrics/src/config.rs @@ -0,0 +1,20 @@ +use crate::Builder; +use std::time::Duration; + +/// Holds the configuration for complex metric types. +#[derive(Clone, Debug)] +pub(crate) struct Configuration { + pub histogram_window: Duration, + pub histogram_granularity: Duration, + pub upkeep_interval: Duration, +} + +impl Configuration { + pub fn from_builder(builder: &Builder) -> Self { + Self { + histogram_window: builder.histogram_window, + histogram_granularity: builder.histogram_granularity, + upkeep_interval: builder.upkeep_interval, + } + } +} diff --git a/metrics/src/configuration.rs b/metrics/src/configuration.rs deleted file mode 100644 index 463b52d..0000000 --- a/metrics/src/configuration.rs +++ /dev/null @@ -1,84 +0,0 @@ -use crate::receiver::Receiver; -use std::time::Duration; - -/// A configuration builder for [`Receiver`]. 
-#[derive(Clone)] -pub struct Configuration { - pub(crate) capacity: usize, - pub(crate) batch_size: usize, - pub(crate) histogram_window: Duration, - pub(crate) histogram_granularity: Duration, -} - -impl Default for Configuration { - fn default() -> Configuration { - Configuration { - capacity: 512, - batch_size: 64, - histogram_window: Duration::from_secs(10), - histogram_granularity: Duration::from_secs(1), - } - } -} - -impl Configuration { - /// Creates a new [`Configuration`] with default values. - pub fn new() -> Configuration { - Default::default() - } - - /// Sets the buffer capacity. - /// - /// Defaults to 512. - /// - /// This controls the size of the channel used to send metrics. This channel is shared amongst - /// all active sinks. If this channel is full when sending a metric, that send will be blocked - /// until the channel has free space. - /// - /// Tweaking this value allows for a trade-off between low memory consumption and throughput - /// burst capabilities. By default, we expect samples to occupy approximately 64 bytes. Thus, - /// at our default value, we preallocate roughly ~32KB. - /// - /// Generally speaking, sending and processing metrics is fast enough that the default value of - /// 512 supports millions of samples per second. - pub fn capacity(mut self, capacity: usize) -> Self { - self.capacity = capacity; - self - } - - /// Sets the batch size. - /// - /// Defaults to 64. - /// - /// This controls the size of message batches that we collect for processing. The only real - /// reason to tweak this is to control the latency from the sender side. Larger batches lower - /// the ingest latency in the face of high metric ingest pressure at the cost of higher ingest - /// tail latencies. - /// - /// Long story short, you shouldn't need to change this, but it's here if you really do. - pub fn batch_size(mut self, batch_size: usize) -> Self { - self.batch_size = batch_size; - self - } - - /// Sets the histogram configuration. - /// - /// Defaults to a 10 second window with 1 second granularity. - /// - /// This controls both how long of a time window we track histogram data for, and the - /// granularity in which we roll off old data. - /// - /// As an example, with the default values, we would keep the last 10 seconds worth of - /// histogram data, and would remove 1 seconds worth of data at a time as the window rolled - /// forward. - pub fn histogram(mut self, window: Duration, granularity: Duration) -> Self { - self.histogram_window = window; - self.histogram_granularity = granularity; - self - } - - /// Create a [`Receiver`] based on this configuration. - pub fn build(self) -> Receiver { - Receiver::from_config(self) - } -} diff --git a/metrics/src/control.rs b/metrics/src/control.rs index 6cc114d..e8c0a5f 100644 --- a/metrics/src/control.rs +++ b/metrics/src/control.rs @@ -1,19 +1,19 @@ -use super::data::snapshot::Snapshot; -use crossbeam_channel::{bounded, Sender}; +use crate::data::Snapshot; +use crate::registry::{MetricRegistry, ScopeRegistry}; use futures::prelude::*; use metrics_core::{AsyncSnapshotProvider, SnapshotProvider}; use std::error::Error; use std::fmt; -use tokio_sync::oneshot; +use std::sync::Arc; -/// Error conditions when retrieving a snapshot. +/// Error during snapshot retrieval. #[derive(Debug, Clone)] pub enum SnapshotError { - /// There was an internal error when trying to collect a snapshot. - InternalError, + /// The future was polled again after returning the snapshot. 
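+    ///
+    /// The snapshot is computed eagerly and handed back on the first poll, so any subsequent
+    /// poll of the same future yields this error.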
+    AlreadyUsed,
 
-    /// A snapshot was requested but the receiver is shutdown.
-    ReceiverShutdown,
+    #[doc(hidden)]
+    _NonExhaustive,
 }
 
 impl Error for SnapshotError {}
@@ -21,33 +21,33 @@ impl Error for SnapshotError {}
 impl fmt::Display for SnapshotError {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match self {
-            SnapshotError::InternalError => write!(f, "internal error while collecting snapshot"),
-            SnapshotError::ReceiverShutdown => write!(f, "receiver is shutdown"),
+            SnapshotError::AlreadyUsed => write!(f, "snapshot already returned from future"),
+            SnapshotError::_NonExhaustive => write!(f, "non-exhaustive matching"),
         }
     }
 }
 
-/// Various control actions performed by a controller.
-pub(crate) enum ControlFrame {
-    /// Takes a snapshot of the current metric state.
-    Snapshot(Sender<Snapshot>),
-
-    /// Takes a snapshot of the current metric state, but uses an asynchronous channel.
-    SnapshotAsync(oneshot::Sender<Snapshot>),
-}
-
-/// Dedicated handle for performing operations on a running [`Receiver`](crate::receiver::Receiver).
+/// Handle for acquiring snapshots.
 ///
-/// The caller is able to request metric snapshots at any time without requiring mutable access to
-/// the sink. This all flows through the existing control mechanism, and so is very fast.
+/// `Controller` is [`metrics-core`]-compatible as a snapshot provider, both for synchronous and
+/// asynchronous snapshotting.
+///
+/// [`metrics-core`]: https://docs.rs/metrics-core
 #[derive(Clone)]
 pub struct Controller {
-    control_tx: Sender<ControlFrame>,
+    metric_registry: Arc<MetricRegistry>,
+    scope_registry: Arc<ScopeRegistry>,
 }
 
 impl Controller {
-    pub(crate) fn new(control_tx: Sender<ControlFrame>) -> Controller {
-        Controller { control_tx }
+    pub(crate) fn new(
+        metric_registry: Arc<MetricRegistry>,
+        scope_registry: Arc<ScopeRegistry>,
+    ) -> Controller {
+        Controller {
+            metric_registry,
+            scope_registry,
+        }
     }
 }
 
@@ -57,13 +57,8 @@ impl SnapshotProvider for Controller {
     /// Gets a snapshot.
     fn get_snapshot(&self) -> Result<Self::Snapshot, Self::SnapshotError> {
-        let (tx, rx) = bounded(0);
-        let msg = ControlFrame::Snapshot(tx);
-
-        self.control_tx
-            .send(msg)
-            .map_err(|_| SnapshotError::ReceiverShutdown)
-            .and_then(move |_| rx.recv().map_err(|_| SnapshotError::InternalError))
+        let snapshot = self.metric_registry.get_snapshot();
+        Ok(snapshot)
     }
 }
 
@@ -74,20 +69,22 @@ impl AsyncSnapshotProvider for Controller {
     /// Gets a snapshot asynchronously.
     fn get_snapshot_async(&self) -> Self::SnapshotFuture {
-        let (tx, rx) = oneshot::channel();
-        let msg = ControlFrame::SnapshotAsync(tx);
-
-        self.control_tx
-            .send(msg)
-            .map(move |_| SnapshotFuture::Waiting(rx))
-            .unwrap_or(SnapshotFuture::Errored(SnapshotError::ReceiverShutdown))
+        let snapshot = self.metric_registry.get_snapshot();
+        SnapshotFuture::new(snapshot)
     }
 }
 
 /// A future representing collecting a snapshot.
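+///
+/// The future resolves immediately. A sketch of driving it (illustrative, assuming a
+/// futures 0.1 executor such as tokio 0.1):
+///
+/// ```ignore
+/// let fut = controller
+///     .get_snapshot_async()
+///     .map(|snapshot| println!("{:?}", snapshot))
+///     .map_err(|e| eprintln!("snapshot error: {}", e));
+/// tokio::run(fut);
+/// ```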
-pub enum SnapshotFuture {
-    Waiting(oneshot::Receiver<Snapshot>),
-    Errored(SnapshotError),
+pub struct SnapshotFuture {
+    snapshot: Option<Snapshot>,
+}
+
+impl SnapshotFuture {
+    pub fn new(snapshot: Snapshot) -> Self {
+        SnapshotFuture {
+            snapshot: Some(snapshot),
+        }
+    }
 }
 
 impl Future for SnapshotFuture {
@@ -95,9 +92,9 @@ impl Future for SnapshotFuture {
     type Error = SnapshotError;
 
     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
-        match self {
-            SnapshotFuture::Waiting(rx) => rx.poll().map_err(|_| SnapshotError::InternalError),
-            SnapshotFuture::Errored(err) => Err(err.clone()),
-        }
+        self.snapshot
+            .take()
+            .ok_or(SnapshotError::AlreadyUsed)
+            .map(Async::Ready)
     }
 }
diff --git a/metrics/src/data/counter.rs b/metrics/src/data/counter.rs
index f68d2e3..a4d226c 100644
--- a/metrics/src/data/counter.rs
+++ b/metrics/src/data/counter.rs
@@ -1,44 +1,19 @@
-use crate::data::ScopedKey;
-use fnv::FnvBuildHasher;
-use hashbrown::HashMap;
+use crate::common::MetricValue;
 
-pub(crate) struct Counter {
-    data: HashMap<ScopedKey, u64, FnvBuildHasher>,
+/// Proxy object to update a counter.
+pub struct Counter {
+    handle: MetricValue,
 }
 
 impl Counter {
-    pub fn new() -> Counter {
-        Counter {
-            data: HashMap::default(),
-        }
-    }
-
-    pub fn update(&mut self, key: ScopedKey, delta: u64) {
-        let value = self.data.entry(key).or_insert(0);
-        *value = value.wrapping_add(delta);
-    }
-
-    pub fn values(&self) -> Vec<(ScopedKey, u64)> {
-        self.data.iter().map(|(k, v)| (k.clone(), *v)).collect()
+    /// Records a value for the counter.
+    pub fn record(&self, value: u64) {
+        self.handle.update_counter(value);
     }
 }
 
-#[cfg(test)]
-mod tests {
-    use super::{Counter, ScopedKey};
-
-    #[test]
-    fn test_counter_simple_update() {
-        let mut counter = Counter::new();
-
-        let key = ScopedKey(0, "foo".into());
-        counter.update(key, 42);
-
-        let key2 = ScopedKey(0, "foo".to_owned().into());
-        counter.update(key2, 31);
-
-        let values = counter.values();
-        assert_eq!(values.len(), 1);
-        assert_eq!(values[0].1, 73);
+impl From<MetricValue> for Counter {
+    fn from(handle: MetricValue) -> Self {
+        Self { handle }
     }
 }
diff --git a/metrics/src/data/gauge.rs b/metrics/src/data/gauge.rs
index 9b8c866..f9a24c2 100644
--- a/metrics/src/data/gauge.rs
+++ b/metrics/src/data/gauge.rs
@@ -1,48 +1,19 @@
-use crate::data::ScopedKey;
-use fnv::FnvBuildHasher;
-use hashbrown::HashMap;
+use crate::common::MetricValue;
 
-pub(crate) struct Gauge {
-    data: HashMap<ScopedKey, i64, FnvBuildHasher>,
+/// Proxy object to update a gauge.
+pub struct Gauge {
+    handle: MetricValue,
 }
 
 impl Gauge {
-    pub fn new() -> Gauge {
-        Gauge {
-            data: HashMap::default(),
-        }
-    }
-
-    pub fn update(&mut self, key: ScopedKey, value: i64) {
-        let ivalue = self.data.entry(key).or_insert(0);
-        *ivalue = value;
-    }
-
-    pub fn values(&self) -> Vec<(ScopedKey, i64)> {
-        self.data.iter().map(|(k, v)| (k.clone(), *v)).collect()
+    /// Records a value for the gauge.
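+    ///
+    /// Gauges are last-write-wins; a sketch (illustrative, assuming a `gauge` handle obtained
+    /// from a sink):
+    ///
+    /// ```ignore
+    /// gauge.record(42);
+    /// gauge.record(23); // a subsequent snapshot reads 23
+    /// ```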
+    pub fn record(&self, value: i64) {
+        self.handle.update_gauge(value);
     }
 }
 
-#[cfg(test)]
-mod tests {
-    use super::{Gauge, ScopedKey};
-
-    #[test]
-    fn test_gauge_simple_update() {
-        let mut gauge = Gauge::new();
-
-        let key = ScopedKey(0, "foo".into());
-        gauge.update(key, 42);
-
-        let values = gauge.values();
-        assert_eq!(values.len(), 1);
-        assert_eq!(values[0].1, 42);
-
-        let key2 = ScopedKey(0, "foo".to_owned().into());
-        gauge.update(key2, 43);
-
-        let values = gauge.values();
-        assert_eq!(values.len(), 1);
-        assert_eq!(values[0].1, 43);
+impl From<MetricValue> for Gauge {
+    fn from(handle: MetricValue) -> Self {
+        Self { handle }
     }
 }
diff --git a/metrics/src/data/histogram.rs b/metrics/src/data/histogram.rs
index b3b745a..a1b1187 100644
--- a/metrics/src/data/histogram.rs
+++ b/metrics/src/data/histogram.rs
@@ -1,212 +1,371 @@
-use crate::{data::ScopedKey, helper::duration_as_nanos};
-use fnv::FnvBuildHasher;
-use hashbrown::HashMap;
-use std::time::{Duration, Instant};
+use crate::common::{Delta, MetricValue};
+use crate::helper::duration_as_nanos;
+use crossbeam_utils::Backoff;
+use metrics_util::{AtomicBucket, StreamingIntegers};
+use quanta::Clock;
+use std::cmp;
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+use std::time::Duration;
 
-pub(crate) struct Histogram {
-    window: Duration,
-    granularity: Duration,
-    data: HashMap<ScopedKey, WindowedRawHistogram, FnvBuildHasher>,
+/// Proxy object to update a histogram.
+pub struct Histogram {
+    handle: MetricValue,
 }
 
 impl Histogram {
-    pub fn new(window: Duration, granularity: Duration) -> Histogram {
-        Histogram {
-            window,
-            granularity,
-            data: HashMap::default(),
-        }
+    /// Records a timing for the histogram.
+    pub fn record_timing<D: Delta>(&self, start: D, end: D) {
+        let value = end.delta(start);
+        self.handle.update_histogram(value);
     }
 
-    pub fn update(&mut self, key: ScopedKey, value: u64) {
-        if let Some(wh) = self.data.get_mut(&key) {
-            wh.update(value);
-        } else {
-            let mut wh = WindowedRawHistogram::new(self.window, self.granularity);
-            wh.update(value);
-            let _ = self.data.insert(key, wh);
-        }
-    }
-
-    pub fn upkeep(&mut self, at: Instant) {
-        for (_, histogram) in self.data.iter_mut() {
-            histogram.upkeep(at);
-        }
-    }
-
-    pub fn values(&self) -> Vec<(ScopedKey, HistogramSnapshot)> {
-        self.data
-            .iter()
-            .map(|(k, v)| (k.clone(), v.snapshot()))
-            .collect()
+    /// Records a value for the histogram.
+    pub fn record_value(&self, value: u64) {
+        self.handle.update_histogram(value);
     }
 }
 
-pub(crate) struct WindowedRawHistogram {
-    buckets: Vec<Vec<u64>>,
-    num_buckets: usize,
-    bucket_index: usize,
-    last_upkeep: Instant,
-    granularity: Duration,
+impl From<MetricValue> for Histogram {
+    fn from(handle: MetricValue) -> Self {
+        Self { handle }
+    }
 }
 
+/// An atomic windowed histogram.
+///
+/// This histogram provides a windowed view of values that rolls forward over time, dropping old
+/// values as they exceed the window of the histogram. Both writes into the histogram and
+/// snapshots of it are lock-free.
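+///
+/// A sketch of standalone usage (illustrative; mirrors the tests below, using a mocked
+/// `quanta` clock):
+///
+/// ```ignore
+/// let (clock, _ctl) = Clock::mock();
+/// let h = AtomicWindowedHistogram::new(Duration::from_secs(10), Duration::from_secs(1), clock);
+/// h.record(42);
+/// let values = h.snapshot().decompress();
+/// assert_eq!(&values[..], [42]);
+/// ```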
+#[derive(Debug)]
+pub struct AtomicWindowedHistogram {
+    buckets: Vec<AtomicBucket<u64>>,
+    bucket_count: usize,
+    granularity: u64,
+    upkeep_index: AtomicUsize,
+    index: AtomicUsize,
+    next_upkeep: AtomicU64,
+    clock: Clock,
+}
 
-impl WindowedRawHistogram {
-    pub fn new(window: Duration, granularity: Duration) -> WindowedRawHistogram {
-        let num_buckets =
-            ((duration_as_nanos(window) / duration_as_nanos(granularity)) as usize) + 1;
-        let mut buckets = Vec::with_capacity(num_buckets);
+impl AtomicWindowedHistogram {
+    /// Creates a new [`AtomicWindowedHistogram`].
+    ///
+    /// Internally, a number of buckets will be created, based on how many times `granularity` goes
+    /// into `window`. As time passes, buckets will be cleared to avoid values older than the
+    /// `window` duration.
+    ///
+    /// As buckets will hold values representing a period of time up to `granularity`, the
+    /// granularity can be lowered or raised to roll values off more precisely, or less precisely,
+    /// against the provided clock.
+    ///
+    /// # Panics
+    /// Panics if `granularity` is larger than `window`.
+    pub fn new(window: Duration, granularity: Duration, clock: Clock) -> Self {
+        let window_ns = duration_as_nanos(window);
+        let granularity_ns = duration_as_nanos(granularity);
+        assert!(window_ns > granularity_ns);
+        let now = clock.recent();
+
+        let bucket_count = ((window_ns / granularity_ns) as usize) + 1;
+        let mut buckets = Vec::new();
+        for _ in 0..bucket_count {
+            buckets.push(AtomicBucket::new());
         }
 
-        for _ in 0..num_buckets {
-            let histogram = Vec::new();
-            buckets.push(histogram);
-        }
+        let next_upkeep = now + granularity_ns;
 
-        WindowedRawHistogram {
+        AtomicWindowedHistogram {
             buckets,
-            num_buckets,
-            bucket_index: 0,
-            last_upkeep: Instant::now(),
-            granularity,
+            bucket_count,
+            granularity: granularity_ns,
+            upkeep_index: AtomicUsize::new(0),
+            index: AtomicUsize::new(0),
+            next_upkeep: AtomicU64::new(next_upkeep),
+            clock,
         }
     }
 
-    pub fn upkeep(&mut self, at: Instant) {
-        if at >= self.last_upkeep + self.granularity {
-            self.bucket_index += 1;
-            self.bucket_index %= self.num_buckets;
-            self.buckets[self.bucket_index].clear();
-            self.last_upkeep = at;
+    /// Takes a snapshot of the current histogram.
+    ///
+    /// Returns a [`StreamingIntegers`] value, representing all observed values in the
+    /// histogram.  As writes happen concurrently, along with buckets being cleared, a snapshot is
+    /// not guaranteed to have all values present at the time the method was called.
+    pub fn snapshot(&self) -> StreamingIntegers {
+        // Run upkeep to make sure our window reflects any time passage since the last write.
+        let index = self.upkeep();
+
+        let mut streaming = StreamingIntegers::new();
+
+        // Start from the bucket ahead of the currently-being-written-to-bucket so that we outrace
+        // any upkeep and get access to more of the data.
+        for i in 0..self.bucket_count {
+            let bucket_index = (index + i + 1) % self.bucket_count;
+            let bucket = &self.buckets[bucket_index];
+            bucket.data_with(|block| streaming.compress(block));
         }
+        streaming
     }
 
-    pub fn update(&mut self, value: u64) {
-        self.buckets[self.bucket_index].push(value);
+    /// Records a value to the histogram.
+    pub fn record(&self, value: u64) {
+        let index = self.upkeep();
+        self.buckets[index].push(value);
     }
 
-    pub fn snapshot(&self) -> HistogramSnapshot {
-        let mut aggregate = Vec::new();
-        for bucket in &self.buckets {
-            aggregate.extend_from_slice(&bucket);
+    fn upkeep(&self) -> usize {
+        let backoff = Backoff::new();
+
+        loop {
+            // Start by figuring out if the histogram needs to perform upkeep.
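+            // The common case: if "now" hasn't reached the next upkeep deadline yet, the
+            // current write index is still valid, so we translate it to a bucket index and
+            // return it directly.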
+            let now = self.clock.recent();
+            let next_upkeep = self.next_upkeep.load(Ordering::Acquire);
+            if now <= next_upkeep {
+                let index = self.index.load(Ordering::Acquire);
+                let actual_index = index % self.bucket_count;
+
+                return actual_index;
+            }
+
+            // We do need to perform upkeep, but someone *else* might actually be doing it already,
+            // so go ahead and wait until the index is caught up with the upkeep index: the upkeep
+            // index will be ahead of index until upkeep is complete.
+            let mut upkeep_in_progress = false;
+            let mut index = 0;
+            loop {
+                index = self.index.load(Ordering::Acquire);
+                let upkeep_index = self.upkeep_index.load(Ordering::Acquire);
+                if index == upkeep_index {
+                    break;
+                }
+
+                upkeep_in_progress = true;
+                backoff.snooze();
+            }
+
+            // If we waited for another upkeep operation to complete, then there's the chance that
+            // enough time has passed that we're due for upkeep again, so restart our loop.
+            if upkeep_in_progress {
+                continue;
+            }
+
+            // Figure out how many buckets, up to the maximum, need to be cleared based on the
+            // delta between the target upkeep time and the actual time.  We always clear at least
+            // one bucket, but may need to clear them all.
+            let delta = now - next_upkeep;
+            let bucket_depth = cmp::min((delta / self.granularity) as usize, self.bucket_count) + 1;
+
+            // Now that we know how many buckets we need to clear, update the index to point
+            // writers at the next bucket past the last one that we will be clearing.
+            let new_index = index + bucket_depth;
+            let prev_index = self
+                .index
+                .compare_and_swap(index, new_index, Ordering::SeqCst);
+            if prev_index == index {
+                // Clear the target bucket first, and then update the upkeep target time so new
+                // writers can proceed.  We may still have other buckets to clean up if we had
+                // multiple rounds worth of upkeep to do, but this will let new writes proceed as
+                // soon as possible.
+                let clear_index = new_index % self.bucket_count;
+                self.buckets[clear_index].clear();
+
+                let now = self.clock.now();
+                let next_upkeep = now + self.granularity;
+                self.next_upkeep.store(next_upkeep, Ordering::Release);
+
+                // Now that we've cleared the actual bucket that writers will use going forward, we
+                // have to clear any older buckets that we skipped over.  If our granularity was 1
+                // second, and we skipped over 4 seconds worth of buckets, we would still have
+                // 3 buckets to clear, etc.
+                let last_index = new_index - 1;
+                while index < last_index {
+                    index += 1;
+                    let clear_index = index % self.bucket_count;
+                    self.buckets[clear_index].clear();
+                }
+
+                // We've cleared the old buckets, so upkeep is done.  Push our upkeep index forward
+                // so that writers who were blocked waiting for upkeep to conclude can restart.
+                self.upkeep_index.store(new_index, Ordering::Release);
+            }
         }
-
-        HistogramSnapshot::new(aggregate)
-    }
-}
-
-/// A point-in-time snapshot of a single histogram.
-#[derive(Debug, PartialEq, Eq)]
-pub struct HistogramSnapshot {
-    values: Vec<u64>,
-}
-
-impl HistogramSnapshot {
-    pub(crate) fn new(values: Vec<u64>) -> Self {
-        HistogramSnapshot { values }
-    }
-
-    /// Gets the raw values that compromise the entire histogram.
-    pub fn values(&self) -> &Vec<u64> {
-        &self.values
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use super::{Histogram, ScopedKey, WindowedRawHistogram};
-    use std::time::{Duration, Instant};
+    use super::{AtomicWindowedHistogram, Clock};
+    use crossbeam_utils::thread;
+    use std::time::Duration;
 
     #[test]
     fn test_histogram_simple_update() {
-        let mut histogram = Histogram::new(Duration::new(5, 0), Duration::new(1, 0));
+        let (clock, _ctl) = Clock::mock();
+        let h = AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_secs(1), clock);
 
-        let key = ScopedKey(0, "foo".into());
-        histogram.update(key, 1245);
+        h.record(1245);
 
-        let values = histogram.values();
+        let snapshot = h.snapshot();
+        assert_eq!(snapshot.len(), 1);
+
+        let values = snapshot.decompress();
         assert_eq!(values.len(), 1);
-
-        let hdr = &values[0].1;
-        assert_eq!(hdr.values().len(), 1);
-        assert_eq!(hdr.values().get(0).unwrap(), &1245);
+        assert_eq!(values.get(0).unwrap(), &1245);
     }
 
     #[test]
     fn test_histogram_complex_update() {
-        let mut histogram = Histogram::new(Duration::new(5, 0), Duration::new(1, 0));
+        let (clock, _ctl) = Clock::mock();
+        let h = AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_secs(1), clock);
 
-        let key = ScopedKey(0, "foo".into());
-        histogram.update(key.clone(), 1245);
-        histogram.update(key.clone(), 213);
-        histogram.update(key.clone(), 1022);
-        histogram.update(key, 1248);
+        h.record(1245);
+        h.record(213);
+        h.record(1022);
+        h.record(1248);
 
-        let values = histogram.values();
-        assert_eq!(values.len(), 1);
+        let snapshot = h.snapshot();
+        assert_eq!(snapshot.len(), 4);
 
-        let hdr = &values[0].1;
-        assert_eq!(hdr.values().len(), 4);
-        assert_eq!(hdr.values().get(0).unwrap(), &1245);
-        assert_eq!(hdr.values().get(1).unwrap(), &213);
-        assert_eq!(hdr.values().get(2).unwrap(), &1022);
-        assert_eq!(hdr.values().get(3).unwrap(), &1248);
+        let values = snapshot.decompress();
+        assert_eq!(values.len(), 4);
+        assert_eq!(values.get(0).unwrap(), &1245);
+        assert_eq!(values.get(1).unwrap(), &213);
+        assert_eq!(values.get(2).unwrap(), &1022);
+        assert_eq!(values.get(3).unwrap(), &1248);
     }
 
     #[test]
     fn test_windowed_histogram_rollover() {
-        let mut wh = WindowedRawHistogram::new(Duration::new(5, 0), Duration::new(1, 0));
-        let now = Instant::now();
+        let (clock, ctl) = Clock::mock();
 
-        let snapshot = wh.snapshot();
-        assert_eq!(snapshot.values().len(), 0);
+        // Set our granularity to just below a second, so that when we add a second, we don't
+        // land on the same exact value, and our "now" time should always be ahead of the upkeep
+        // time when we expect it to be.
+        let h =
+            AtomicWindowedHistogram::new(Duration::from_secs(5), Duration::from_millis(999), clock);
 
-        wh.update(1);
-        wh.update(2);
-        let snapshot = wh.snapshot();
-        assert_eq!(snapshot.values().len(), 2);
+        // Histogram is empty, snapshot is empty.
+        let snapshot = h.snapshot();
+        assert_eq!(snapshot.len(), 0);
+
+        // Immediately add two values, and observe the histogram and snapshot having two values.
+        h.record(1);
+        h.record(2);
+        let snapshot = h.snapshot();
+        assert_eq!(snapshot.len(), 2);
+        let total: u64 = snapshot.decompress().iter().sum();
+        assert_eq!(total, 3);
 
+        // Roll forward 3 seconds, should still have everything.
- let now = now + Duration::new(1, 0); - wh.upkeep(now); - let snapshot = wh.snapshot(); - assert_eq!(snapshot.values().len(), 2); + ctl.increment(Duration::from_secs(3)); + let snapshot = h.snapshot(); + assert_eq!(snapshot.len(), 2); + let total: u64 = snapshot.decompress().iter().sum(); + assert_eq!(total, 3); - let now = now + Duration::new(1, 0); - wh.upkeep(now); - let snapshot = wh.snapshot(); - assert_eq!(snapshot.values().len(), 2); + // Roll forward 1 second, should still have everything. + ctl.increment(Duration::from_secs(1)); + let snapshot = h.snapshot(); + assert_eq!(snapshot.len(), 2); + let total: u64 = snapshot.decompress().iter().sum(); + assert_eq!(total, 3); - let now = now + Duration::new(1, 0); - wh.upkeep(now); - let snapshot = wh.snapshot(); - assert_eq!(snapshot.values().len(), 2); + // Roll forward 1 second, should still have everything. + ctl.increment(Duration::from_secs(1)); + let snapshot = h.snapshot(); + assert_eq!(snapshot.len(), 2); + let total: u64 = snapshot.decompress().iter().sum(); + assert_eq!(total, 3); - // Pump in some new values. - wh.update(3); - wh.update(4); - wh.update(5); + // Pump in some new values. We should have a total of 5 values now. + h.record(3); + h.record(4); + h.record(5); - let snapshot = wh.snapshot(); - assert_eq!(snapshot.values().len(), 5); + let snapshot = h.snapshot(); + assert_eq!(snapshot.len(), 5); + let total: u64 = snapshot.decompress().iter().sum(); + assert_eq!(total, 15); - // Roll forward 3 seconds, and make sure the first two values are gone. - // You might think this should be 2 seconds, but we have one extra bucket - // allocated so that there's always a clear bucket that we can write into. - // This means we have more than our total window, but only having the exact - // number of buckets would mean we were constantly missing a bucket's worth - // of granularity. - let now = now + Duration::new(1, 0); - wh.upkeep(now); - let snapshot = wh.snapshot(); - assert_eq!(snapshot.values().len(), 5); + // Roll forward 6 seconds, in increments. The first one rolls over a single bucket, and + // cleans bucket #0, the first one we wrote to. The second and third ones get us right up + // to the last three values, and then clear them out. + ctl.increment(Duration::from_secs(1)); + let snapshot = h.snapshot(); + assert_eq!(snapshot.len(), 3); + let total: u64 = snapshot.decompress().iter().sum(); + assert_eq!(total, 12); - let now = now + Duration::new(1, 0); - wh.upkeep(now); - let snapshot = wh.snapshot(); - assert_eq!(snapshot.values().len(), 5); + ctl.increment(Duration::from_secs(4)); + let snapshot = h.snapshot(); + assert_eq!(snapshot.len(), 3); + let total: u64 = snapshot.decompress().iter().sum(); + assert_eq!(total, 12); - let now = now + Duration::new(1, 0); - wh.upkeep(now); - let snapshot = wh.snapshot(); - assert_eq!(snapshot.values().len(), 3); + ctl.increment(Duration::from_secs(1)); + let snapshot = h.snapshot(); + assert_eq!(snapshot.len(), 0); + + // We should also be able to advance by vast periods of time and observe not only old + // values going away but no weird overflow issues or index or anything. This ensures that + // our upkeep code functions not just for under-load single bucket rollovers but also "been + // idle for a while and just got a write" scenarios. 
+ h.record(42); + + let snapshot = h.snapshot(); + assert_eq!(snapshot.len(), 1); + let total: u64 = snapshot.decompress().iter().sum(); + assert_eq!(total, 42); + + ctl.increment(Duration::from_secs(1000)); + let snapshot = h.snapshot(); + assert_eq!(snapshot.len(), 0); + } + + #[test] + fn test_histogram_write_gauntlet_mt() { + let clock = Clock::new(); + let clock2 = clock.clone(); + let target = clock.now() + Duration::from_secs(5).as_nanos() as u64; + let h = AtomicWindowedHistogram::new( + Duration::from_secs(20), + Duration::from_millis(500), + clock, + ); + + thread::scope(|s| { + let t1 = s.spawn(|_| { + let mut total = 0; + while clock2.now() < target { + h.record(42); + total += 1; + } + total + }); + let t2 = s.spawn(|_| { + let mut total = 0; + while clock2.now() < target { + h.record(42); + total += 1; + } + total + }); + let t3 = s.spawn(|_| { + let mut total = 0; + while clock2.now() < target { + h.record(42); + total += 1; + } + total + }); + + let t1_total = t1.join().expect("thread 1 panicked during test"); + let t2_total = t2.join().expect("thread 2 panicked during test"); + let t3_total = t3.join().expect("thread 3 panicked during test"); + + let total = t1_total + t2_total + t3_total; + let snap = h.snapshot(); + assert_eq!(total, snap.len()); + }) + .unwrap(); } } diff --git a/metrics/src/data/mod.rs b/metrics/src/data/mod.rs index d52f243..4e83f16 100644 --- a/metrics/src/data/mod.rs +++ b/metrics/src/data/mod.rs @@ -1,73 +1,12 @@ -use std::{ - borrow::Cow, - fmt::{self, Display}, -}; +//! Core data types for metrics. +mod counter; +pub use counter::Counter; -pub mod counter; -pub mod gauge; -pub mod histogram; -pub mod snapshot; +mod gauge; +pub use gauge::Gauge; -pub(crate) use self::{counter::Counter, gauge::Gauge, histogram::Histogram, snapshot::Snapshot}; +mod histogram; +pub use histogram::{AtomicWindowedHistogram, Histogram}; -pub type MetricKey = Cow<'static, str>; - -/// A measurement. -/// -/// Samples are the decoupled way of submitting data into the sink. -#[derive(Debug)] -pub(crate) enum Sample { - /// A counter delta. - /// - /// The value is added directly to the existing counter, and so negative deltas will decrease - /// the counter, and positive deltas will increase the counter. - Count(ScopedKey, u64), - - /// A single value, also known as a gauge. - /// - /// Values operate in last-write-wins mode. - /// - /// Values themselves cannot be incremented or decremented, so you must hold them externally - /// before sending them. - Gauge(ScopedKey, i64), - - /// A timed sample. - /// - /// Includes the start and end times. - TimingHistogram(ScopedKey, u64, u64), - - /// A single value measured over time. - /// - /// Unlike a gauge, where the value is only ever measured at a point in time, value histogram - /// measure values over time, and their distribution. This is nearly identical to timing - /// histograms, since the end result is just a single number, but we don't spice it up with - /// special unit labels or anything. - ValueHistogram(ScopedKey, u64), -} - -/// An integer scoped metric key. -#[derive(Clone, Hash, PartialEq, Eq, Debug)] -pub(crate) struct ScopedKey(pub u64, pub MetricKey); - -impl ScopedKey { - pub(crate) fn id(&self) -> u64 { - self.0 - } - pub(crate) fn into_string_scoped(self, scope: String) -> StringScopedKey { - StringScopedKey(scope, self.1) - } -} - -/// A string scoped metric key. 
-#[derive(Clone, Hash, PartialEq, Eq, Debug)] -pub(crate) struct StringScopedKey(String, MetricKey); - -impl Display for StringScopedKey { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - if self.0.is_empty() { - write!(f, "{}", self.1) - } else { - write!(f, "{}.{}", self.0, self.1.as_ref()) - } - } -} +mod snapshot; +pub use snapshot::Snapshot; diff --git a/metrics/src/data/snapshot.rs b/metrics/src/data/snapshot.rs index e211c07..fa589fd 100644 --- a/metrics/src/data/snapshot.rs +++ b/metrics/src/data/snapshot.rs @@ -1,86 +1,28 @@ -use super::histogram::HistogramSnapshot; +use crate::common::ValueSnapshot; use metrics_core::{Recorder, Snapshot as MetricsSnapshot}; -use std::fmt::Display; - -/// A typed metric measurement, used in snapshots. -/// -/// This type provides a way to wrap the value of a metric, for use in a snapshot, while also -/// providing the overall type of the metric, so that downstream consumers who how to properly -/// format the data. -#[derive(Debug, PartialEq, Eq)] -pub enum TypedMeasurement { - Counter(String, u64), - Gauge(String, i64), - TimingHistogram(String, HistogramSnapshot), - ValueHistogram(String, HistogramSnapshot), -} /// A point-in-time view of metric data. #[derive(Default, Debug)] pub struct Snapshot { - measurements: Vec, + measurements: Vec<(String, ValueSnapshot)>, } impl Snapshot { - /// Stores a counter value for the given metric key. - pub(crate) fn set_count(&mut self, key: T, value: u64) - where - T: Display, - { - self.measurements - .push(TypedMeasurement::Counter(key.to_string(), value)); - } - - /// Stores a gauge value for the given metric key. - pub(crate) fn set_gauge(&mut self, key: T, value: i64) - where - T: Display, - { - self.measurements - .push(TypedMeasurement::Gauge(key.to_string(), value)); - } - - /// Sets timing percentiles for the given metric key. - /// - /// From the given `HdrHistogram`, all the specific `percentiles` will be extracted and stored. - pub(crate) fn set_timing_histogram(&mut self, key: T, h: HistogramSnapshot) - where - T: Display, - { - self.measurements - .push(TypedMeasurement::TimingHistogram(key.to_string(), h)); - } - - /// Sets value percentiles for the given metric key. - /// - /// From the given `HdrHistogram`, all the specific `percentiles` will be extracted and stored. - pub(crate) fn set_value_histogram(&mut self, key: T, h: HistogramSnapshot) - where - T: Display, - { - self.measurements - .push(TypedMeasurement::ValueHistogram(key.to_string(), h)); - } - - /// Converts this [`Snapshot`] to the underlying vector of measurements. - pub fn into_measurements(self) -> Vec { - self.measurements + pub(crate) fn from(from: Vec<(String, ValueSnapshot)>) -> Self { + Snapshot { measurements: from } } } impl MetricsSnapshot for Snapshot { /// Records the snapshot to the given recorder. 
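    ///
    /// A sketch of exporting through a recorder (illustrative; `TextRecorder` comes from the
    /// metrics-recorder-text crate in this workspace, and the `Into<String>` conversion mirrors
    /// how the HTTP exporter renders output):
    ///
    /// ```ignore
    /// let mut recorder = TextRecorder::new();
    /// snapshot.record(&mut recorder);
    /// let rendered: String = recorder.into();
    /// ```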
     fn record<R: Recorder>(&self, recorder: &mut R) {
-        for measurement in &self.measurements {
-            match measurement {
-                TypedMeasurement::Counter(key, value) => recorder.record_counter(key, *value),
-                TypedMeasurement::Gauge(key, value) => recorder.record_gauge(key, *value),
-                TypedMeasurement::TimingHistogram(key, hs) => {
-                    recorder.record_histogram(key, hs.values().as_slice());
-                }
-                TypedMeasurement::ValueHistogram(key, hs) => {
-                    recorder.record_histogram(key, hs.values().as_slice());
-                }
+        for (key, snapshot) in &self.measurements {
+            match snapshot {
+                ValueSnapshot::Counter(value) => recorder.record_counter(key, *value),
+                ValueSnapshot::Gauge(value) => recorder.record_gauge(key, *value),
+                ValueSnapshot::Histogram(stream) => stream.decompress_with(|values| {
+                    recorder.record_histogram(key, values);
+                }),
             }
         }
     }
@@ -88,7 +30,8 @@ impl MetricsSnapshot for Snapshot {
 
 #[cfg(test)]
 mod tests {
-    use super::{HistogramSnapshot, MetricsSnapshot, Recorder, Snapshot, TypedMeasurement};
+    use super::{MetricsSnapshot, Recorder, Snapshot, ValueSnapshot};
+    use metrics_util::StreamingIntegers;
     use std::collections::HashMap;
 
     #[derive(Default)]
@@ -128,29 +71,19 @@
         }
     }
 
-    #[test]
-    fn test_snapshot_simple_set_and_get() {
-        let key = "ok".to_owned();
-        let mut snapshot = Snapshot::default();
-        snapshot.set_count(key.clone(), 1);
-        snapshot.set_gauge(key.clone(), 42);
-
-        let values = snapshot.into_measurements();
-
-        assert_eq!(values[0], TypedMeasurement::Counter(key.clone(), 1));
-        assert_eq!(values[1], TypedMeasurement::Gauge(key.clone(), 42));
-    }
-
     #[test]
     fn test_snapshot_recorder() {
         let key = "ok".to_owned();
-        let mut snapshot = Snapshot::default();
-        snapshot.set_count(key.clone(), 7);
-        snapshot.set_gauge(key.clone(), 42);
+        let mut measurements = Vec::new();
+        measurements.push((key.clone(), ValueSnapshot::Counter(7)));
+        measurements.push((key.clone(), ValueSnapshot::Gauge(42)));
 
         let hvalues = vec![10, 25, 42, 97];
-        let histogram = HistogramSnapshot::new(hvalues);
-        snapshot.set_timing_histogram(key.clone(), histogram);
+        let mut stream = StreamingIntegers::new();
+        stream.compress(&hvalues);
+        measurements.push((key.clone(), ValueSnapshot::Histogram(stream)));
+
+        let snapshot: Snapshot = Snapshot::from(measurements);
 
         let mut recorder = MockRecorder::default();
         snapshot.record(&mut recorder);
diff --git a/metrics/src/helper.rs b/metrics/src/helper.rs
index 0212980..049f601 100644
--- a/metrics/src/helper.rs
+++ b/metrics/src/helper.rs
@@ -1,12 +1,4 @@
-use std::{
-    io::{Error, ErrorKind},
-    time::Duration,
-};
-
-/// Helpers to create an I/O error from a string.
-pub fn io_error(reason: &str) -> Error {
-    Error::new(ErrorKind::Other, reason)
-}
+use std::time::Duration;
 
 /// Converts a duration to nanoseconds.
 pub fn duration_as_nanos(d: Duration) -> u64 {
diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs
index 3a3da59..7a3946d 100644
--- a/metrics/src/lib.rs
+++ b/metrics/src/lib.rs
@@ -7,16 +7,18 @@
 //!
 //! The library follows a pattern of "senders" and a "receiver."
 //!
-//! Callers create a [`Receiver`], which acts as a contained unit: metric registration,
-//! aggregation, and summarization. The [`Receiver`] is intended to be spawned onto a dedicated
-//! background thread.
+//! Callers create a [`Receiver`], which acts as a registry for all metrics that flow through it.
+//! It allows creating new sinks as well as controllers, which are necessary for pushing metrics
+//! into and pulling them out of the system.  It also manages background resources necessary for
+//! the registry to operate.
 //!
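//! Construction is fallible and goes through a [`Builder`]; a sketch (the window and
//! granularity values here are arbitrary):
//!
//! ```
//! # extern crate metrics;
//! use metrics::Receiver;
//! use std::time::Duration;
//! let receiver = Receiver::builder()
//!     .histogram(Duration::from_secs(30), Duration::from_secs(1))
//!     .build()
//!     .expect("failed to create receiver");
//! ```
//!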
//! Once a [`Receiver`] is created, callers can either create a [`Sink`] for sending metrics, or a //! [`Controller`] for getting metrics out. //! -//! A [`Sink`] can be cheaply cloned and does not require a mutable reference to send metrics, so -//! callers have increased flexibility in usage and control over whether or not to clone sinks, -//! share references, etc. +//! A [`Sink`] can be cheaply cloned, and offers convenience methods for getting the current time +//! as well as getting direct handles to a given metric. This allows users to either work with the +//! fuller API exposed by [`Sink`] or to take a compositional approach and embed fields that +//! represent each particular metric to be sent. //! //! A [`Controller`] provides both a synchronous and asynchronous snapshotting interface, which is //! [`metrics-core`][metrics_core] compatible for exporting. This allows flexibility in @@ -25,8 +27,11 @@ //! //! # Performance //! -//! Being based on [`crossbeam-channel`][crossbeam_channel] allows us to process close to ten -//! million metrics per second using a single core, with average ingest latencies of around 100ns. +//! Users can expect to be able to send tens of millions of samples per second, with ingest +//! latencies at roughly 65-70ns at p50, and 250ns at p99. Depending on the workload -- counters +//! vs histograms -- latencies may be even lower, as counters and gauges are markedly faster to +//! update than histograms. Concurrent updates of the same metric will also cause natural +//! contention and lower the throughput/increase the latency of ingestion. //! //! # Metrics //! @@ -39,8 +44,8 @@ //! # extern crate metrics; //! use metrics::Receiver; //! use std::{thread, time::Duration}; -//! let receiver = Receiver::builder().build(); -//! let sink = receiver.get_sink(); +//! let receiver = Receiver::builder().build().expect("failed to create receiver"); +//! let mut sink = receiver.get_sink(); //! //! // We can update a counter. Counters are monotonic, unsigned integers that start at 0 and //! // increase over time. @@ -54,18 +59,18 @@ //! // which utilizes a high-speed internal clock. This method returns the time in nanoseconds, so //! // we get great resolution, but giving the time in nanoseconds isn't required! If you want to //! // send it in another unit, that's fine, but just pay attention to that fact when viewing and -//! // using those metrics once exported. +//! // using those metrics once exported. We also support passing `Instant` values -- both `start` +//! // and `end` need to be the same type, though! -- and we'll take the nanosecond output of that. //! let start = sink.now(); //! thread::sleep(Duration::from_millis(10)); //! let end = sink.now(); -//! sink.record_timing("db.gizmo_query", start, end); +//! sink.record_timing("db.queries.select_products_ns", start, end); //! //! // Finally, we can update a value histogram. Technically speaking, value histograms aren't //! // fundamentally different from timing histograms. If you use a timing histogram, we do the -//! // math for you of getting the time difference, and we make sure the metric name has the right -//! // unit suffix so you can tell it's measuring time, but other than that, nearly identical! -//! let buf_size = 4096; -//! sink.record_value("buf_size", buf_size); +//! // math for you of getting the time difference, but other than that, identical under the hood. +//! let row_count = 46; +//! sink.record_value("db.queries.select_products_num_rows", row_count); //! ``` //! //! 
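//! Timings can also be taken with `std::time::Instant` values instead of `Sink::now`; both ends
//! must be the same type.  A sketch:
//!
//! ```
//! # extern crate metrics;
//! # use metrics::Receiver;
//! use std::time::Instant;
//! # let receiver = Receiver::builder().build().expect("failed to create receiver");
//! # let mut sink = receiver.get_sink();
//! let start = Instant::now();
//! let end = Instant::now();
//! sink.record_timing("db.queries.select_products_ns", start, end);
//! ```
//!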
# Scopes
@@ -82,24 +87,24 @@
 //! ```
 //! # extern crate metrics;
 //! use metrics::Receiver;
-//! let receiver = Receiver::builder().build();
+//! let receiver = Receiver::builder().build().expect("failed to create receiver");
 //!
 //! // This sink has no scope aka the root scope.  The metric will just end up as "widgets".
-//! let root_sink = receiver.get_sink();
+//! let mut root_sink = receiver.get_sink();
 //! root_sink.record_count("widgets", 42);
 //!
 //! // This sink is under the "secret" scope.  Since we derived ourselves from the root scope,
 //! // we're not nested under anything, but our metric name will end up being "secret.widgets".
-//! let scoped_sink = root_sink.scoped("secret");
+//! let mut scoped_sink = root_sink.scoped("secret");
 //! scoped_sink.record_count("widgets", 42);
 //!
 //! // This sink is under the "supersecret" scope, but we're also nested!  The metric name for this
 //! // sample will end up being "secret.supersecret.widgets".
-//! let scoped_sink_two = scoped_sink.scoped("supersecret");
+//! let mut scoped_sink_two = scoped_sink.scoped("supersecret");
 //! scoped_sink_two.record_count("widgets", 42);
 //!
 //! // Sinks retain their scope even when cloned, so the metric name will be the same as above.
-//! let cloned_sink = scoped_sink_two.clone();
+//! let mut cloned_sink = scoped_sink_two.clone();
 //! cloned_sink.record_count("widgets", 42);
 //!
 //! // This sink will be nested four levels deeper than its parent by using a slightly different
@@ -107,24 +112,72 @@
 //! // nesting N levels deep.
 //! //
 //! // This metric name will end up being "secret.super.secret.ultra.special.widgets".
-//! let scoped_sink_three = scoped_sink.scoped(&["super", "secret", "ultra", "special"]);
+//! let mut scoped_sink_three = scoped_sink.scoped(&["super", "secret", "ultra", "special"]);
 //! scoped_sink_three.record_count("widgets", 42);
 //! ```
 //!
-//! [crossbeam_channel]: https://docs.rs/crossbeam-channel
+//! # Snapshots
+//!
+//! Naturally, we need a way to get the metrics out of the system, which is where snapshots come
+//! into play.  By utilizing a [`Controller`], we can take a snapshot of the current metrics in the
+//! registry, and then output them to any desired system/interface by utilizing
+//! [`Recorder`](metrics_core::Recorder).  A number of pre-baked recorders (which only concern
+//! themselves with formatting the data) and exporters (which take the formatted data and either
+//! serve it up, such as exposing an HTTP endpoint, or write it somewhere, like stdout) are
+//! available, some of which are exposed by this crate.
+//!
+//! Let's take an example of writing out our metrics in a yaml-like format, writing them via
+//! `log!`:
+//! ```
+//! # extern crate metrics;
+//! use metrics::{Receiver, recorders::TextRecorder, exporters::LogExporter};
+//! use log::Level;
+//! use std::{thread, time::Duration};
+//! let receiver = Receiver::builder().build().expect("failed to create receiver");
+//! let mut sink = receiver.get_sink();
+//!
+//! // Take some measurements, similar to what we had in other examples:
+//! sink.record_count("widgets", 5);
+//! sink.record_gauge("red_balloons", 99);
+//!
+//! let start = sink.now();
+//! thread::sleep(Duration::from_millis(10));
+//! let end = sink.now();
+//! sink.record_timing("db.queries.select_products_ns", start, end);
+//! sink.record_timing("db.gizmo_query", start, end);
+//!
+//! let num_rows = 46;
+//! sink.record_value("db.queries.select_products_num_rows", num_rows);
+//!
+//! // Now create our exporter/recorder configuration, and wire it up.
+//! let exporter = LogExporter::new(receiver.get_controller(), TextRecorder::new(), Level::Info);
+//!
+//! // This exporter will now run every 5 seconds, taking a snapshot, rendering it, and writing it
+//! // via `log!` at the informational level.  This particular exporter is running directly on the
+//! // current thread, and not on a background thread.
+//! //
+//! // exporter.run(Duration::from_secs(5));
+//! ```
+//! Most exporters have the ability to run on the current thread or to be converted into a future
+//! which can be spawned on any Tokio-compatible runtime.
+//!
 //! [metrics_core]: https://docs.rs/metrics-core
+//! [`Recorder`]: https://docs.rs/metrics-core/0.3.1/metrics_core/trait.Recorder.html
+#![deny(missing_docs)]
+#![warn(unused_extern_crates)]
+mod builder;
+mod common;
+mod config;
-mod configuration;
 mod control;
-mod data;
+pub mod data;
 mod helper;
 mod receiver;
-mod scopes;
+mod registry;
 mod sink;
 
-#[cfg(any(
-    feature = "metrics-exporter-log",
-    feature = "metrics-exporter-http"
-))]
+#[cfg(any(feature = "metrics-exporter-log", feature = "metrics-exporter-http"))]
 pub mod exporters;
 
 #[cfg(any(
@@ -134,13 +187,9 @@ pub mod exporters;
 pub mod recorders;
 
 pub use self::{
-    configuration::Configuration,
+    builder::{Builder, BuilderError},
+    common::{Delta, MetricName, MetricScope},
     control::{Controller, SnapshotError},
-    data::histogram::HistogramSnapshot,
     receiver::Receiver,
     sink::{AsScoped, Sink, SinkError},
 };
-
-pub mod snapshot {
-    pub use super::data::snapshot::{Snapshot, TypedMeasurement};
-}
diff --git a/metrics/src/receiver.rs b/metrics/src/receiver.rs
index 003c92a..c06a510 100644
--- a/metrics/src/receiver.rs
+++ b/metrics/src/receiver.rs
@@ -1,220 +1,69 @@
 use crate::{
-    configuration::Configuration,
-    control::{ControlFrame, Controller},
-    data::{Counter, Gauge, Histogram, Sample, ScopedKey, Snapshot, StringScopedKey},
-    scopes::Scopes,
+    builder::{Builder, BuilderError},
+    common::MetricScope,
+    config::Configuration,
+    control::Controller,
+    registry::{MetricRegistry, ScopeRegistry},
     sink::Sink,
 };
-use crossbeam_channel::{self, bounded, tick, Select, TryRecvError};
-use quanta::Clock;
-use std::{
-    sync::Arc,
-    time::{Duration, Instant},
-};
+use quanta::{Builder as UpkeepBuilder, Clock, Handle as UpkeepHandle};
+use std::sync::Arc;
 
-/// Wrapper for all messages that flow over the data channel between sink/receiver.
-pub(crate) enum MessageFrame {
-    /// A normal data message holding a metric sample.
-    Data(Sample),
-}
-
-/// Metrics receiver which aggregates and processes samples.
+/// Central store for metrics.
+///
+/// `Receiver` is the nucleus for all metrics operations.  While no operations are performed by it
+/// directly, it holds the registries and references to resources and so it must live as long as
+/// any [`Sink`] or [`Controller`] does.
 pub struct Receiver {
-    config: Configuration,
-
-    // Sample aggregation machinery.
-    msg_tx: crossbeam_channel::Sender<MessageFrame>,
-    msg_rx: Option<crossbeam_channel::Receiver<MessageFrame>>,
-    control_tx: crossbeam_channel::Sender<ControlFrame>,
-    control_rx: Option<crossbeam_channel::Receiver<ControlFrame>>,
-
-    // Metric machinery.
-    counter: Counter,
-    gauge: Gauge,
-    thistogram: Histogram,
-    vhistogram: Histogram,
-
+    metric_registry: Arc<MetricRegistry>,
+    scope_registry: Arc<ScopeRegistry>,
     clock: Clock,
-    scopes: Arc<Scopes>,
+    _upkeep_handle: UpkeepHandle,
 }
 
 impl Receiver {
-    pub(crate) fn from_config(config: Configuration) -> Receiver {
-        // Create our data, control, and buffer channels.
-        let (msg_tx, msg_rx) = bounded(config.capacity);
-        let (control_tx, control_rx) = bounded(16);
+    pub(crate) fn from_config(config: Configuration) -> Result<Receiver, BuilderError> {
+        // Configure our clock and the quanta upkeep thread.  The upkeep thread maintains quanta's
+        // cached "recent" time for us, keeping it within `upkeep_interval` of the true time.
+        // Reads of this cached time are faster than calling the underlying time source directly,
+        // and for histogram windowing, we can tolerate a much coarser value than the raw
+        // nanosecond precision provided by quanta by default.
+        let clock = Clock::new();
+        let upkeep = UpkeepBuilder::new_with_clock(config.upkeep_interval, clock.clone());
+        let _upkeep_handle = upkeep.start().map_err(|_| BuilderError::UpkeepFailure)?;
 
-        let histogram_window = config.histogram_window;
-        let histogram_granularity = config.histogram_granularity;
-
-        Receiver {
+        let scope_registry = Arc::new(ScopeRegistry::new());
+        let metric_registry = Arc::new(MetricRegistry::new(
+            scope_registry.clone(),
             config,
-            msg_tx,
-            msg_rx: Some(msg_rx),
-            control_tx,
-            control_rx: Some(control_rx),
-            counter: Counter::new(),
-            gauge: Gauge::new(),
-            thistogram: Histogram::new(histogram_window, histogram_granularity),
-            vhistogram: Histogram::new(histogram_window, histogram_granularity),
-            clock: Clock::new(),
-            scopes: Arc::new(Scopes::new()),
-        }
+            clock.clone(),
+        ));
+
+        Ok(Receiver {
+            metric_registry,
+            scope_registry,
+            clock,
+            _upkeep_handle,
+        })
     }
 
-    /// Gets a builder to configure a [`Receiver`] instance with.
-    pub fn builder() -> Configuration {
-        Configuration::default()
+    /// Creates a new [`Builder`] for building a [`Receiver`].
+    pub fn builder() -> Builder {
+        Builder::default()
     }
 
     /// Creates a [`Sink`] bound to this receiver.
     pub fn get_sink(&self) -> Sink {
-        Sink::new_with_scope_id(
-            self.msg_tx.clone(),
+        Sink::new(
+            self.metric_registry.clone(),
+            self.scope_registry.clone(),
+            MetricScope::Root,
             self.clock.clone(),
-            self.scopes.clone(),
-            "".to_owned(),
-            0,
         )
     }
 
     /// Creates a [`Controller`] bound to this receiver.
     pub fn get_controller(&self) -> Controller {
-        Controller::new(self.control_tx.clone())
-    }
-
-    /// Run the receiver.
-    ///
-    /// This is blocking, and should be run in a dedicated background thread.
-    pub fn run(&mut self) {
-        let batch_size = self.config.batch_size;
-        let mut batch = Vec::with_capacity(batch_size);
-        let upkeep_rx = tick(Duration::from_millis(100));
-        let control_rx = self.control_rx.take().expect("failed to take control rx");
-        let msg_rx = self.msg_rx.take().expect("failed to take msg rx");
-
-        let mut selector = Select::new();
-        let _ = selector.recv(&upkeep_rx);
-        let _ = selector.recv(&control_rx);
-        let _ = selector.recv(&msg_rx);
-
-        loop {
-            // Block on having something to do.
-            let _ = selector.ready();
-
-            if upkeep_rx.try_recv().is_ok() {
-                let now = Instant::now();
-                self.thistogram.upkeep(now);
-                self.vhistogram.upkeep(now);
-            }
-
-            while let Ok(cframe) = control_rx.try_recv() {
-                self.process_control_frame(cframe);
-            }
-
-            loop {
-                match msg_rx.try_recv() {
-                    Ok(mframe) => batch.push(mframe),
-                    Err(TryRecvError::Empty) => break,
-                    Err(e) => eprintln!("error receiving message frame: {}", e),
-                }
-
-                if batch.len() == batch_size {
-                    break;
-                }
-            }
-
-            if !batch.is_empty() {
-                for mframe in batch.drain(0..) {
-                    self.process_msg_frame(mframe);
-                }
-            }
-        }
-    }
-
-    /// Gets the string representation of an integer scope.
-    ///
-    /// Returns `Some(scope)` if found, `None` otherwise.  Scope ID `0` is reserved for the root
-    /// scope.
- fn get_string_scope(&self, key: ScopedKey) -> Option { - let scope_id = key.id(); - if scope_id == 0 { - return Some(key.into_string_scoped("".to_owned())); - } - - self.scopes - .get(scope_id) - .map(|scope| key.into_string_scoped(scope)) - } - - /// Gets a snapshot of the current metrics/facets. - fn get_snapshot(&self) -> Snapshot { - let mut snapshot = Snapshot::default(); - let cvalues = self.counter.values(); - let gvalues = self.gauge.values(); - let tvalues = self.thistogram.values(); - let vvalues = self.vhistogram.values(); - - for (key, value) in cvalues { - if let Some(actual_key) = self.get_string_scope(key) { - snapshot.set_count(actual_key, value); - } - } - - for (key, value) in gvalues { - if let Some(actual_key) = self.get_string_scope(key) { - snapshot.set_gauge(actual_key, value); - } - } - - for (key, value) in tvalues { - if let Some(actual_key) = self.get_string_scope(key) { - snapshot.set_timing_histogram(actual_key, value); - } - } - - for (key, value) in vvalues { - if let Some(actual_key) = self.get_string_scope(key) { - snapshot.set_value_histogram(actual_key, value); - } - } - - snapshot - } - - /// Processes a control frame. - fn process_control_frame(&self, msg: ControlFrame) { - match msg { - ControlFrame::Snapshot(tx) => { - let snapshot = self.get_snapshot(); - let _ = tx.send(snapshot); - } - ControlFrame::SnapshotAsync(tx) => { - let snapshot = self.get_snapshot(); - let _ = tx.send(snapshot); - } - } - } - - /// Processes a message frame. - fn process_msg_frame(&mut self, msg: MessageFrame) { - match msg { - MessageFrame::Data(sample) => match sample { - Sample::Count(key, count) => { - self.counter.update(key, count); - } - Sample::Gauge(key, value) => { - self.gauge.update(key, value); - } - Sample::TimingHistogram(key, start, end) => { - let delta = end - start; - self.counter.update(key.clone(), 1); - self.thistogram.update(key, delta); - } - Sample::ValueHistogram(key, value) => { - self.vhistogram.update(key, value); - } - }, - } + Controller::new(self.metric_registry.clone(), self.scope_registry.clone()) } } diff --git a/metrics/src/registry/metric.rs b/metrics/src/registry/metric.rs new file mode 100644 index 0000000..4256b76 --- /dev/null +++ b/metrics/src/registry/metric.rs @@ -0,0 +1,86 @@ +use crate::common::{MetricIdentifier, MetricKind, MetricValue}; +use crate::config::Configuration; +use crate::data::Snapshot; +use crate::registry::ScopeRegistry; +use arc_swap::{ptr_eq, ArcSwap}; +use im::hashmap::HashMap; +use quanta::Clock; +use std::ops::Deref; +use std::sync::Arc; + +pub(crate) struct MetricRegistry { + scope_registry: Arc, + metrics: ArcSwap>, + config: Configuration, + clock: Clock, +} + +impl MetricRegistry { + pub fn new(scope_registry: Arc, config: Configuration, clock: Clock) -> Self { + MetricRegistry { + scope_registry, + metrics: ArcSwap::new(Arc::new(HashMap::new())), + config, + clock, + } + } + + pub fn get_value_handle(&self, identifier: MetricIdentifier) -> MetricValue { + loop { + match self.metrics.lease().deref().get(&identifier) { + Some(handle) => return handle.clone(), + None => { + let kind = match &identifier { + MetricIdentifier::Unlabeled(_, _, kind) => kind, + }; + + let value_handle = match kind { + MetricKind::Counter => MetricValue::counter(), + MetricKind::Gauge => MetricValue::gauge(), + MetricKind::Histogram => MetricValue::histogram( + self.config.histogram_window, + self.config.histogram_granularity, + self.clock.clone(), + ), + }; + + let metrics_ptr = self.metrics.lease(); + let mut metrics = 
diff --git a/metrics/src/registry/mod.rs b/metrics/src/registry/mod.rs
new file mode 100644
index 0000000..a047fd1
--- /dev/null
+++ b/metrics/src/registry/mod.rs
@@ -0,0 +1,5 @@
+mod scope;
+pub(crate) use self::scope::ScopeRegistry;
+
+mod metric;
+pub(crate) use self::metric::MetricRegistry;
diff --git a/metrics/src/registry/scope.rs b/metrics/src/registry/scope.rs
new file mode 100644
index 0000000..fe8edff
--- /dev/null
+++ b/metrics/src/registry/scope.rs
@@ -0,0 +1,57 @@
+use crate::common::{MetricScope, MetricScopeHandle};
+use parking_lot::RwLock;
+use std::collections::HashMap;
+
+struct Inner {
+    id: u64,
+    forward: HashMap<MetricScope, u64>,
+    backward: HashMap<u64, MetricScope>,
+}
+
+impl Inner {
+    pub fn new() -> Self {
+        Inner {
+            id: 1,
+            forward: HashMap::new(),
+            backward: HashMap::new(),
+        }
+    }
+}
+
+pub(crate) struct ScopeRegistry {
+    inner: RwLock<Inner>,
+}
+
+impl ScopeRegistry {
+    pub fn new() -> Self {
+        Self {
+            inner: RwLock::new(Inner::new()),
+        }
+    }
+
+    pub fn register(&self, scope: MetricScope) -> u64 {
+        let mut wg = self.inner.write();
+
+        // If the key is already registered, send back the existing scope ID.
+        if wg.forward.contains_key(&scope) {
+            return wg.forward.get(&scope).cloned().unwrap();
+        }
+
+        // Otherwise, take the current scope ID for this registration, store it, and increment
+        // the scope ID counter for the next registration.
+        let scope_id = wg.id;
+        let _ = wg.forward.insert(scope.clone(), scope_id);
+        let _ = wg.backward.insert(scope_id, scope);
+        wg.id += 1;
+        scope_id
+    }
+
+    pub fn get(&self, scope_id: MetricScopeHandle) -> MetricScope {
+        // See if we have an entry for the scope ID, and clone the scope if so.
+        let rg = self.inner.read();
+        rg.backward
+            .get(&scope_id)
+            .cloned()
+            .unwrap_or(MetricScope::Root)
+    }
+}
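`ScopeRegistry` is a simple bidirectional map under a `parking_lot::RwLock`: `register` deduplicates scopes into stable `u64` handles (starting at 1), and `get` resolves unknown handles to `MetricScope::Root` rather than panicking. A hypothetical crate-internal usage sketch (the registry is `pub(crate)`, so this would live inside the crate):

```rust
let registry = ScopeRegistry::new();

// Registering the same scope twice yields the same handle.
let id1 = registry.register(MetricScope::Nested(vec!["db".to_owned()]));
let id2 = registry.register(MetricScope::Nested(vec!["db".to_owned()]));
assert_eq!(id1, id2);

// Unknown handles fall back to the root scope instead of panicking.
match registry.get(u64::max_value()) {
    MetricScope::Root => {}
    _ => unreachable!("unknown handles resolve to the root scope"),
}
```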
diff --git a/metrics/src/sink.rs b/metrics/src/sink.rs
index 224eff7..03f935f 100644
--- a/metrics/src/sink.rs
+++ b/metrics/src/sink.rs
@@ -1,68 +1,72 @@
 use crate::{
-    data::{MetricKey, Sample, ScopedKey},
-    helper::io_error,
-    receiver::MessageFrame,
-    scopes::Scopes,
+    common::{
+        Delta, MetricIdentifier, MetricKind, MetricName, MetricScope, MetricScopeHandle,
+        MetricValue,
+    },
+    data::{Counter, Gauge, Histogram},
+    registry::{MetricRegistry, ScopeRegistry},
 };
-use crossbeam_channel::Sender;
+use fxhash::FxBuildHasher;
 use quanta::Clock;
+use std::error::Error;
+use std::fmt;
 use std::sync::Arc;

-/// Erorrs during sink creation.
-#[derive(Debug)]
+type FastHashMap<K, V> = hashbrown::HashMap<K, V, FxBuildHasher>;
+
+/// Errors during sink creation.
+#[derive(Debug, Clone)]
 pub enum SinkError {
     /// The scope value given was invalid i.e. empty or illegal characters.
     InvalidScope,
 }

-/// A value that can be used as a metric scope.
-pub trait AsScoped<'a> {
-    fn as_scoped(&'a self, base: String) -> String;
+impl Error for SinkError {}
+
+impl fmt::Display for SinkError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            SinkError::InvalidScope => write!(f, "given scope is invalid"),
+        }
+    }
 }

-/// Handle for sending metric samples into the receiver.
+/// A value that can be used as a metric scope.
 ///
-/// [`Sink`] is cloneable, and can not only send metric samples but can register and deregister
-/// metric facets at any time.
+/// This helper trait allows us to accept either a single string or a slice of strings to use as a
+/// scope, to avoid needing to allocate in the case where we want to be able to specify multiple
+/// scope levels in a single go.
+pub trait AsScoped<'a> {
+    /// Creates a new [`MetricScope`] by adding `self` to the `base` scope.
+    fn as_scoped(&'a self, base: MetricScope) -> MetricScope;
+}
+
+/// Handle for sending metric samples.
 pub struct Sink {
-    msg_tx: Sender<MessageFrame>,
+    metric_registry: Arc<MetricRegistry>,
+    metric_cache: FastHashMap<MetricIdentifier, MetricValue>,
+    scope_registry: Arc<ScopeRegistry>,
+    scope: MetricScope,
+    scope_handle: MetricScopeHandle,
     clock: Clock,
-    scopes: Arc<Scopes>,
-    scope: String,
-    scope_id: u64,
 }

 impl Sink {
     pub(crate) fn new(
-        msg_tx: Sender<MessageFrame>,
+        metric_registry: Arc<MetricRegistry>,
+        scope_registry: Arc<ScopeRegistry>,
+        scope: MetricScope,
         clock: Clock,
-        scopes: Arc<Scopes>,
-        scope: String,
     ) -> Sink {
-        let scope_id = scopes.register(scope.clone());
+        let scope_handle = scope_registry.register(scope.clone());

         Sink {
-            msg_tx,
-            clock,
-            scopes,
+            metric_registry,
+            metric_cache: FastHashMap::default(),
+            scope_registry,
             scope,
-            scope_id,
-        }
-    }
-
-    pub(crate) fn new_with_scope_id(
-        msg_tx: Sender<MessageFrame>,
-        clock: Clock,
-        scopes: Arc<Scopes>,
-        scope: String,
-        scope_id: u64,
-    ) -> Sink {
-        Sink {
-            msg_tx,
+            scope_handle,
             clock,
-            scopes,
-            scope,
-            scope_id,
         }
     }

@@ -90,10 +94,10 @@ impl Sink {
         let new_scope = scope.as_scoped(self.scope.clone());

         Sink::new(
-            self.msg_tx.clone(),
-            self.clock.clone(),
-            self.scopes.clone(),
+            self.metric_registry.clone(),
+            self.scope_registry.clone(),
             new_scope,
+            self.clock.clone(),
         )
     }

@@ -102,58 +106,113 @@ impl Sink {
         self.clock.now()
     }

-    /// Records the count for a given metric.
-    pub fn record_count<K: Into<MetricKey>>(&self, key: K, delta: u64) {
-        let scoped_key = ScopedKey(self.scope_id, key.into());
-        self.send(Sample::Count(scoped_key, delta))
+    /// Records a value for a counter identified by the given name.
+    pub fn record_count<N: Into<MetricName>>(&mut self, name: N, value: u64) {
+        let identifier =
+            MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Counter);
+        let value_handle = self.get_cached_value_handle(identifier);
+        value_handle.update_counter(value);
     }

-    /// Records the gauge for a given metric.
-    pub fn record_gauge<K: Into<MetricKey>>(&self, key: K, value: i64) {
-        let scoped_key = ScopedKey(self.scope_id, key.into());
-        self.send(Sample::Gauge(scoped_key, value))
+    /// Records the value for a gauge identified by the given name.
+    pub fn record_gauge<N: Into<MetricName>>(&mut self, name: N, value: i64) {
+        let identifier =
+            MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Gauge);
+        let value_handle = self.get_cached_value_handle(identifier);
+        value_handle.update_gauge(value);
     }

-    /// Records the timing histogram for a given metric.
-    pub fn record_timing<K: Into<MetricKey>>(&self, key: K, start: u64, end: u64) {
-        let scoped_key = ScopedKey(self.scope_id, key.into());
-        self.send(Sample::TimingHistogram(scoped_key, start, end))
+    /// Records the value for a timing histogram identified by the given name.
+    ///
+    /// Both the start and end times must be supplied, but any values that implement [`Delta`]
+    /// can be used, which allows raw values from [`quanta::Clock`] as well as measurements from
+    /// [`Instant::now`] to be used.
+    pub fn record_timing<N: Into<MetricName>, V: Delta>(&mut self, name: N, start: V, end: V) {
+        let value = end.delta(start);
+        self.record_value(name, value);
     }

-    /// Records the value histogram for a given metric.
-    pub fn record_value<K: Into<MetricKey>>(&self, key: K, value: u64) {
-        let scoped_key = ScopedKey(self.scope_id, key.into());
-        self.send(Sample::ValueHistogram(scoped_key, value))
+    /// Records the value for a value histogram identified by the given name.
+    pub fn record_value<N: Into<MetricName>>(&mut self, name: N, value: u64) {
+        let identifier =
+            MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Histogram);
+        let value_handle = self.get_cached_value_handle(identifier);
+        value_handle.update_histogram(value);
     }

-    /// Sends a raw metric sample to the receiver.
-    fn send(&self, sample: Sample) {
-        let _ = self
-            .msg_tx
-            .send(MessageFrame::Data(sample))
-            .map_err(|_| io_error("failed to send sample"));
+    /// Creates a handle to the given counter.
+    ///
+    /// This handle can be embedded into an existing type and used to directly update the
+    /// underlying counter.  It is merely a proxy, so multiple handles to the same counter can be
+    /// held and used.
+    pub fn counter<N: Into<MetricName>>(&mut self, name: N) -> Counter {
+        let identifier =
+            MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Counter);
+        self.get_cached_value_handle(identifier).clone().into()
+    }
+
+    /// Creates a handle to the given gauge.
+    ///
+    /// This handle can be embedded into an existing type and used to directly update the
+    /// underlying gauge.  It is merely a proxy, so multiple handles to the same gauge can be
+    /// held and used.
+    pub fn gauge<N: Into<MetricName>>(&mut self, name: N) -> Gauge {
+        let identifier =
+            MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Gauge);
+        self.get_cached_value_handle(identifier).clone().into()
+    }
+
+    /// Creates a handle to the given histogram.
+    ///
+    /// This handle can be embedded into an existing type and used to directly update the
+    /// underlying histogram.  It is merely a proxy, so multiple handles to the same histogram
+    /// can be held and used.
+    pub fn histogram<N: Into<MetricName>>(&mut self, name: N) -> Histogram {
+        let identifier =
+            MetricIdentifier::Unlabeled(name.into(), self.scope_handle, MetricKind::Histogram);
+        self.get_cached_value_handle(identifier).clone().into()
+    }
+
+    fn get_cached_value_handle(&mut self, identifier: MetricIdentifier) -> &MetricValue {
+        // This gross hack gets around lifetime rules until full NLL is stable.  Without it, the
+        // borrow checker doesn't understand the flow control and thinks the reference lives all
+        // the way until the end of the function, which breaks when we try to take a mutable
+        // reference for inserting into the handle cache.
+ if let Some(handle) = self.metric_cache.get(&identifier) { + return unsafe { &*(handle as *const MetricValue) }; + } + + let handle = self.metric_registry.get_value_handle(identifier.clone()); + self.metric_cache.insert(identifier.clone(), handle); + self.metric_cache.get(&identifier).unwrap() } } impl Clone for Sink { fn clone(&self) -> Sink { Sink { - msg_tx: self.msg_tx.clone(), - clock: self.clock.clone(), - scopes: self.scopes.clone(), + metric_registry: self.metric_registry.clone(), + metric_cache: self.metric_cache.clone(), + scope_registry: self.scope_registry.clone(), scope: self.scope.clone(), - scope_id: self.scope_id, + scope_handle: self.scope_handle, + clock: self.clock.clone(), } } } impl<'a> AsScoped<'a> for str { - fn as_scoped(&'a self, mut base: String) -> String { - if !base.is_empty() { - base.push_str("."); + fn as_scoped(&'a self, base: MetricScope) -> MetricScope { + match base { + MetricScope::Root => { + let parts = vec![self.to_owned()]; + MetricScope::Nested(parts) + } + MetricScope::Nested(mut parts) => { + parts.push(self.to_owned()); + MetricScope::Nested(parts) + } } - base.push_str(self); - base } } @@ -162,13 +221,17 @@ where &'a T: AsRef<[&'b str]>, T: 'a, { - fn as_scoped(&'a self, mut base: String) -> String { - for item in self.as_ref() { - if !base.is_empty() { - base.push('.'); + fn as_scoped(&'a self, base: MetricScope) -> MetricScope { + match base { + MetricScope::Root => { + let parts = self.as_ref().iter().map(|s| s.to_string()).collect(); + MetricScope::Nested(parts) + } + MetricScope::Nested(mut parts) => { + let mut new_parts = self.as_ref().iter().map(|s| s.to_string()).collect(); + parts.append(&mut new_parts); + MetricScope::Nested(parts) } - base.push_str(item); } - base } }
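The raw-pointer cast in `get_cached_value_handle` exists only to appease the pre-NLL borrow checker on the cache-hit path. For comparison, here is a safe sketch of the same method using the hash map's entry API; note that it unconditionally clones the identifier, which is the very cost the `unsafe` version avoids on hits:

```rust
fn get_cached_value_handle(&mut self, identifier: MetricIdentifier) -> &MetricValue {
    // Borrow the registry up front so the closure below doesn't capture `self`.
    let registry = &self.metric_registry;
    self.metric_cache
        .entry(identifier.clone())
        .or_insert_with(|| registry.get_value_handle(identifier))
}
```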