diff --git a/Cargo.toml b/Cargo.toml index 6c6a2e9..ae7ee46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,4 +3,5 @@ members = [ "metrics", "metrics-macros", "metrics-util", + "metrics-benchmark", ] diff --git a/metrics-macros/src/lib.rs b/metrics-macros/src/lib.rs index 69784cb..c6c4760 100644 --- a/metrics-macros/src/lib.rs +++ b/metrics-macros/src/lib.rs @@ -234,7 +234,7 @@ where let mlabels = vec![#(#insertable_labels),*]; let id = recorder.#register_ident((#key, mlabels).into(), None); - recorder.#op_ident(&id, #op_values); + recorder.#op_ident(id, #op_values); } } } @@ -257,7 +257,7 @@ fn make_key_safe(key: &LitStr) -> String { String::from_iter(safe_chars) } -fn can_use_fast_path(labels: &Vec<(LitStr, Expr)>) -> bool { +fn can_use_fast_path(labels: &[(LitStr, Expr)]) -> bool { let mut use_fast_path = true; for (_, lvalue) in labels { match lvalue { diff --git a/metrics-util/Cargo.toml b/metrics-util/Cargo.toml index 42b7aa5..6f485c9 100644 --- a/metrics-util/Cargo.toml +++ b/metrics-util/Cargo.toml @@ -28,16 +28,16 @@ name = "streaming_integers" harness = false [dependencies] -metrics = { path = "../metrics", version = "^0.12" } +metrics = { path = "../metrics", version = "^0.12", features = ["std"] } crossbeam-epoch = "^0.8" +crossbeam-utils = "^0.7" serde = "^1.0" arc-swap = "^0.4" im = "^14" -sharded-slab = "0.0.9" parking_lot = "^0.10" [dev-dependencies] -crossbeam-utils = "^0.7" -criterion = "^0.2.9" +criterion = "^0.3" lazy_static = "^1.3" -rand = "^0.6" +rand = { version = "^0.7", features = ["small_rng"] } +rand_distr = "^0.2" diff --git a/metrics-util/benches/bucket.rs b/metrics-util/benches/bucket.rs index 3da95ae..b6525dc 100644 --- a/metrics-util/benches/bucket.rs +++ b/metrics-util/benches/bucket.rs @@ -52,7 +52,7 @@ fn bucket_benchmark(c: &mut Criterion) { } }) }) - .throughput(Throughput::Elements(RANDOM_INTS.len() as u32)), + .throughput(Throughput::Elements(RANDOM_INTS.len() as u64)), ); } diff --git a/metrics-util/benches/registry.rs b/metrics-util/benches/registry.rs index 2748b9c..2fd812d 100644 --- a/metrics-util/benches/registry.rs +++ b/metrics-util/benches/registry.rs @@ -9,7 +9,7 @@ fn registry_benchmark(c: &mut Criterion) { c.bench( "registry", Benchmark::new("cached get/create (basic)", |b| { - let registry = Registry::new(); + let registry: Registry = Registry::new(); b.iter(|| { let key = "simple_key".into(); @@ -17,7 +17,7 @@ fn registry_benchmark(c: &mut Criterion) { }) }) .with_function("cached get/create (labels)", |b| { - let registry = Registry::new(); + let registry: Registry = Registry::new(); b.iter(|| { let labels = vec![Label::new("type", "http")]; @@ -27,7 +27,7 @@ fn registry_benchmark(c: &mut Criterion) { }) .with_function("uncached get/create (basic)", |b| { b.iter_batched_ref( - || Registry::new(), + || Registry::::new(), |registry| { let key = "simple_key".into(); let _ = registry.get_or_create_identifier(key, ()); @@ -37,7 +37,7 @@ fn registry_benchmark(c: &mut Criterion) { }) .with_function("uncached get/create (labels)", |b| { b.iter_batched_ref( - || Registry::new(), + || Registry::::new(), |registry| { let labels = vec![Label::new("type", "http")]; let key = ("simple_key", labels).into(); @@ -46,18 +46,16 @@ fn registry_benchmark(c: &mut Criterion) { BatchSize::SmallInput, ) }) - .with_function("get handle", |b| { - let registry = Registry::new(); + .with_function("with handle", |b| { + let registry = Registry::::new(); let id = registry.get_or_create_identifier("foo".into(), ()); - b.iter(|| { - let _handle = registry.get_handle(&id); - }) + b.iter(|| registry.with_handle(id, |_| {})) }) .with_function("registry overhead", |b| { b.iter_batched( || (), - |_| Registry::<()>::new(), + |_| Registry::<(), ()>::new(), BatchSize::NumIterations(1), ) }) diff --git a/metrics-util/benches/streaming_integers.rs b/metrics-util/benches/streaming_integers.rs index 9748420..4e9fc3b 100644 --- a/metrics-util/benches/streaming_integers.rs +++ b/metrics-util/benches/streaming_integers.rs @@ -6,11 +6,8 @@ extern crate lazy_static; use criterion::{Benchmark, Criterion, Throughput}; use metrics_util::StreamingIntegers; -use rand::{ - distributions::{Distribution, Gamma}, - rngs::SmallRng, - Rng, SeedableRng, -}; +use rand::{distributions::Distribution, rngs::SmallRng, SeedableRng}; +use rand_distr::Gamma; use std::time::Duration; lazy_static! { @@ -28,7 +25,7 @@ fn get_gamma_distribution(len: usize, upper_bound: Duration) -> Vec { // This Gamma distribution gets us pretty close to a typical web server response time // distribution where there's a big peak down low, and a long tail that drops off sharply. - let gamma = Gamma::new(1.75, 1.0); + let gamma = Gamma::new(1.75, 1.0).expect("failed to create gamma distribution"); // Scale all the values by 22 million to simulate a lower bound of 22ms (but in // nanoseconds) for all generated values. @@ -86,7 +83,7 @@ macro_rules! define_basic_benches { }); }); }) - .throughput(Throughput::Elements($input.len() as u32)), + .throughput(Throughput::Elements($input.len() as u64)), ) }; } diff --git a/metrics-util/src/bucket.rs b/metrics-util/src/bucket.rs index e6bcda3..2da19fa 100644 --- a/metrics-util/src/bucket.rs +++ b/metrics-util/src/bucket.rs @@ -5,7 +5,7 @@ use std::{ sync::atomic::{AtomicUsize, Ordering}, }; -const BLOCK_SIZE: usize = 128; +const BLOCK_SIZE: usize = 512; /// Discrete chunk of values with atomic read/write access. struct Block { diff --git a/metrics-util/src/handle.rs b/metrics-util/src/handle.rs index 1a5de8f..691164f 100644 --- a/metrics-util/src/handle.rs +++ b/metrics-util/src/handle.rs @@ -1,40 +1,110 @@ -use std::sync::atomic::{AtomicU64, AtomicI64}; use crate::AtomicBucket; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; /// Basic metric handle. /// /// Provides fast, thread-safe access and storage for the three supported metric types: counters, /// gauges, and histograms. +#[derive(Clone)] pub enum Handle { /// A counter. - Counter(AtomicU64), + Counter(Arc), /// A gauge. - Gauge(AtomicI64), + Gauge(Arc), /// A histogram. - Histogram(AtomicBucket), + Histogram(Arc>), } impl Handle { /// Creates a counter handle. /// /// The counter is initialized to 0. - pub const fn counter() -> Handle { - Handle::Counter(AtomicU64::new(0)) + pub fn counter() -> Handle { + Handle::Counter(Arc::new(AtomicU64::new(0))) } /// Creates a gauge handle. /// /// The gauge is initialized to 0. - pub const fn gauge() -> Handle { - Handle::Gauge(AtomicI64::new(0)) + pub fn gauge() -> Handle { + Handle::Gauge(Arc::new(AtomicU64::new(0))) } /// Creates a histogram handle. /// /// The histogram handle is initialized to empty. - pub const fn histogram() -> Handle { - Handle::Histogram(AtomicBucket::new()) + pub fn histogram() -> Handle { + Handle::Histogram(Arc::new(AtomicBucket::new())) + } + + /// Increments this handle as a counter. + /// + /// Panics if this handle is not a counter. + pub fn increment_counter(&self, value: u64) { + match self { + Handle::Counter(counter) => { + counter.fetch_add(value, Ordering::SeqCst); + } + _ => panic!("tried to increment as counter"), + } + } + + /// Updates this handle as a gauge. + /// + /// Panics if this handle is not a gauge. + pub fn update_gauge(&self, value: f64) { + let unsigned = unsafe { std::mem::transmute(value) }; + match self { + Handle::Gauge(gauge) => gauge.store(unsigned, Ordering::SeqCst), + _ => panic!("tried to update as gauge"), + } + } + + /// Records to this handle as a histogram. + /// + /// Panics if this handle is not a histogram. + pub fn record_histogram(&self, value: f64) { + match self { + Handle::Histogram(bucket) => bucket.push(value), + _ => panic!("tried to record as histogram"), + } + } + + /// Reads this handle as a counter. + /// + /// Panics if this handle is not a counter. + pub fn read_counter(&self) -> u64 { + match self { + Handle::Counter(counter) => counter.load(Ordering::Relaxed), + _ => panic!("tried to read as counter"), + } + } + + /// Reads this handle as a gauge. + /// + /// Panics if this handle is not a gauge. + pub fn read_gauge(&self) -> f64 { + match self { + Handle::Gauge(gauge) => { + let unsigned = gauge.load(Ordering::Relaxed); + unsafe { std::mem::transmute(unsigned) } + } + _ => panic!("tried to read as gauge"), + } + } + + /// Reads this handle as a histogram. + /// + /// Panics if this handle is not a histogram. + pub fn read_histogram(&self) -> Vec { + match self { + Handle::Histogram(bucket) => bucket.data(), + _ => panic!("tried to read as histogram"), + } } } diff --git a/metrics-util/src/lib.rs b/metrics-util/src/lib.rs index ac27e55..f935800 100644 --- a/metrics-util/src/lib.rs +++ b/metrics-util/src/lib.rs @@ -3,6 +3,9 @@ mod bucket; pub use bucket::AtomicBucket; +mod debugging; +pub use debugging::{DebugValue, DebuggingRecorder, MetricKind, Snapshotter}; + mod handle; pub use handle::Handle; diff --git a/metrics-util/src/registry.rs b/metrics-util/src/registry.rs index 7870f61..0cbca4e 100644 --- a/metrics-util/src/registry.rs +++ b/metrics-util/src/registry.rs @@ -1,36 +1,44 @@ +use std::collections::HashMap; +use std::hash::Hash; use std::sync::Arc; use arc_swap::ArcSwap; -use im::HashMap; -use metrics::{Identifier, Key}; +use crossbeam_utils::sync::ShardedLock; +use im::HashMap as ImmutableHashMap; +use metrics::Identifier; use parking_lot::Mutex; -use sharded_slab::Slab; - -pub use sharded_slab::Guard; /// A high-performance metric registry. /// -/// All metrics are defined by a `Key`, which represents the name of a metric, along with potential -/// labels. Registering a new metric, in turn, provides the caller with an opaque `Identifier` -/// that can be used to look up the associated handle with a metric. +/// `Registry` provides the ability to maintain a central listing of metrics mapped by a given key. /// -/// Handles would usually be a thread-safe type that can be used to manipulate a metric -- i.e. -/// increment a counter or add a value to a histogram -- but `Registry` does not care what data is -/// stored, it focuses purely on providing fast insertion and lookup. +/// In many cases, `K` will be a composite key, where the fundamental `Key` type from `metrics` is +/// present, and differentiation is provided by storing the metric type alongside. +/// +/// Metrics themselves are represented opaquely behind `H`. In most cases, this would be a +/// thread-safe handle to the underlying metrics storage that the owner of the registry can use to +/// update the actual metric value(s) as needed. `Handle`, from this crate, is a solid default +/// choice. +/// +/// `Registry` handles deduplicating metrics, and will return the `Identifier` for an existing +/// metric if a caller attempts to reregister it. /// /// `Registry` is optimized for reads. -pub struct Registry { - mappings: ArcSwap>, - handles: Slab, +pub struct Registry { + mappings: ArcSwap>, + handles: ShardedLock>, lock: Mutex<()>, } -impl Registry { +impl Registry +where + K: Eq + Hash + Clone, +{ /// Creates a new `Registry`. pub fn new() -> Self { Registry { - mappings: ArcSwap::from(Arc::new(HashMap::new())), - handles: Slab::new(), + mappings: ArcSwap::from(Arc::new(ImmutableHashMap::new())), + handles: ShardedLock::new(Vec::new()), lock: Mutex::new(()), } } @@ -39,7 +47,7 @@ impl Registry { /// /// If the key is not already mapped, a new identifier will be generated, and the given handle /// stored along side of it. If the key is already mapped, its identifier will be returned. - pub fn get_or_create_identifier(&self, key: Key, handle: H) -> Identifier { + pub fn get_or_create_identifier(&self, key: K, handle: H) -> Identifier { // Check our mapping table first. if let Some(id) = self.mappings.load().get(&key) { return id.clone(); @@ -55,11 +63,13 @@ impl Registry { } // Our identifier will be the index we insert the handle into. - let id = self + let mut wg = self .handles - .insert(handle) - .expect("current thread ran out of slots to register new metrics!") - .into(); + .write() + .expect("handles write lock was poisoned!"); + let id = wg.len().into(); + wg.push(handle); + drop(wg); // Update our mapping table and drop the lock. let new_mappings = mappings.update(key, id); @@ -71,7 +81,43 @@ impl Registry { } /// Gets the handle for a given identifier. - pub fn get_handle(&self, identifier: &Identifier) -> Option> { - self.handles.get(identifier.into()) + pub fn with_handle(&self, identifier: Identifier, mut f: F) + where + F: FnMut(&H), + { + let id: usize = identifier.into(); + let rg = self + .handles + .read() + .expect("handles read lock was poisoned!"); + if let Some(h) = rg.get(id) { + f(h); + } + } +} + +impl Registry +where + K: Eq + Hash + Clone, + H: Clone, +{ + /// Gets a map of all present handles, mapped by key. + /// + /// Handles must implement `Clone`. This map is a point-in-time snapshot of the registry. + pub fn get_handles(&self) -> HashMap { + let guard = self.mappings.load(); + let mappings = ImmutableHashMap::clone(&guard); + let rg = self + .handles + .read() + .expect("handles read lock was poisoned!"); + mappings + .into_iter() + .map(|(key, id)| { + let id: usize = id.into(); + let handle = rg.get(id).expect("handle not present!").clone(); + (key, handle) + }) + .collect::>() } } diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index 676f885..2293bb7 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -30,7 +30,7 @@ proc-macro-hack = "^0.5" [dev-dependencies] log = "^0.4" -criterion = "^0.2" +criterion = "^0.3" rand = "^0.7" [features] diff --git a/metrics/benches/macros.rs b/metrics/benches/macros.rs index 44dec2d..063fead 100644 --- a/metrics/benches/macros.rs +++ b/metrics/benches/macros.rs @@ -9,18 +9,18 @@ use rand::{thread_rng, Rng}; #[derive(Default)] struct TestRecorder; impl Recorder for TestRecorder { - fn register_counter(&self, _key: Key) -> Identifier { + fn register_counter(&self, _key: Key, _description: Option<&'static str>) -> Identifier { thread_rng().gen::().into() } - fn register_gauge(&self, _key: Key) -> Identifier { + fn register_gauge(&self, _key: Key, _description: Option<&'static str>) -> Identifier { thread_rng().gen::().into() } - fn register_histogram(&self, _key: Key) -> Identifier { + fn register_histogram(&self, _key: Key, _description: Option<&'static str>) -> Identifier { thread_rng().gen::().into() } - fn increment_counter(&self, _id: &Identifier, _value: u64) {} - fn update_gauge(&self, _id: &Identifier, _value: f64) {} - fn record_histogram(&self, _id: &Identifier, _value: f64) {} + fn increment_counter(&self, _id: Identifier, _value: u64) {} + fn update_gauge(&self, _id: Identifier, _value: f64) {} + fn record_histogram(&self, _id: Identifier, _value: f64) {} } fn reset_recorder() { diff --git a/metrics/examples/basic.rs b/metrics/examples/basic.rs index 5c65c12..1b5aef9 100644 --- a/metrics/examples/basic.rs +++ b/metrics/examples/basic.rs @@ -33,17 +33,17 @@ impl Recorder for PrintRecorder { id.into() } - fn increment_counter(&self, id: &Identifier, value: u64) { + fn increment_counter(&self, id: Identifier, value: u64) { let uid: usize = id.into(); println!("(counter) got value {} for id {}", value, uid); } - fn update_gauge(&self, id: &Identifier, value: f64) { + fn update_gauge(&self, id: Identifier, value: f64) { let uid: usize = id.into(); println!("(gauge) got value {} for id {}", value, uid); } - fn record_histogram(&self, id: &Identifier, value: f64) { + fn record_histogram(&self, id: Identifier, value: f64) { let uid: usize = id.into(); println!("(histogram) got value {} for id {}", value, uid); } diff --git a/metrics/src/common.rs b/metrics/src/common.rs index fe57f4f..ba15cea 100644 --- a/metrics/src/common.rs +++ b/metrics/src/common.rs @@ -32,12 +32,6 @@ impl Into for Identifier { } } -impl Into for &Identifier { - fn into(self) -> usize { - self.0 - } -} - /// Atomically-guarded identifier initialization. /// /// Stores an identifier in an atomically-backed fashion, allowing for multiple callers to @@ -64,7 +58,7 @@ impl OnceIdentifier { /// /// All callers rondezvous on an internal atomic guard, so it impossible to see /// invalid state. - pub fn get_or_init(&self, f: F) -> &Identifier + pub fn get_or_init(&self, f: F) -> Identifier where F: Fn() -> Identifier, { @@ -75,7 +69,7 @@ impl OnceIdentifier { } }); - unsafe { &*self.inner.get() } + unsafe { *self.inner.get() } } } diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs index 2eb9c73..d5e7232 100644 --- a/metrics/src/lib.rs +++ b/metrics/src/lib.rs @@ -85,9 +85,9 @@ //! id //! } //! -//! fn get_key(&self, id: &Identifier) -> Key { +//! fn get_key(&self, id: Identifier) -> Key { //! let keys = self.keys.lock().expect("failed to lock keys"); -//! keys.get(id).expect("invalid identifier").clone() +//! keys.get(&id).expect("invalid identifier").clone() //! } //! } //! @@ -104,17 +104,17 @@ //! self.register(key) //! } //! -//! fn increment_counter(&self, id: &Identifier, value: u64) { +//! fn increment_counter(&self, id: Identifier, value: u64) { //! let key = self.get_key(id); //! info!("counter '{}' -> {}", key, value); //! } //! -//! fn update_gauge(&self, id: &Identifier, value: f64) { +//! fn update_gauge(&self, id: Identifier, value: f64) { //! let key = self.get_key(id); //! info!("gauge '{}' -> {}", key, value); //! } //! -//! fn record_histogram(&self, id: &Identifier, value: f64) { +//! fn record_histogram(&self, id: Identifier, value: f64) { //! let key = self.get_key(id); //! info!("histogram '{}' -> {}", key, value); //! } @@ -132,9 +132,9 @@ //! # fn register_counter(&self, _key: Key, _description: Option<&'static str>) -> Identifier { Identifier::default() } //! # fn register_gauge(&self, _key: Key, _description: Option<&'static str>) -> Identifier { Identifier::default() } //! # fn register_histogram(&self, _key: Key, _description: Option<&'static str>) -> Identifier { Identifier::default() } -//! # fn increment_counter(&self, _id: &Identifier, _value: u64) {} -//! # fn update_gauge(&self, _id: &Identifier, _value: f64) {} -//! # fn record_histogram(&self, _id: &Identifier, _value: f64) {} +//! # fn increment_counter(&self, _id: Identifier, _value: u64) {} +//! # fn update_gauge(&self, _id: Identifier, _value: f64) {} +//! # fn record_histogram(&self, _id: Identifier, _value: f64) {} //! # } //! use metrics::SetRecorderError; //! @@ -160,9 +160,9 @@ //! # fn register_counter(&self, _key: Key, _description: Option<&'static str>) -> Identifier { Identifier::default() } //! # fn register_gauge(&self, _key: Key, _description: Option<&'static str>) -> Identifier { Identifier::default() } //! # fn register_histogram(&self, _key: Key, _description: Option<&'static str>) -> Identifier { Identifier::default() } -//! # fn increment_counter(&self, _id: &Identifier, _value: u64) {} -//! # fn update_gauge(&self, _id: &Identifier, _value: f64) {} -//! # fn record_histogram(&self, _id: &Identifier, _value: f64) {} +//! # fn increment_counter(&self, _id: Identifier, _value: u64) {} +//! # fn update_gauge(&self, _id: Identifier, _value: f64) {} +//! # fn record_histogram(&self, _id: Identifier, _value: f64) {} //! # } //! use metrics::SetRecorderError; //! diff --git a/metrics/src/recorder.rs b/metrics/src/recorder.rs index 9b9c98f..8dd02ea 100644 --- a/metrics/src/recorder.rs +++ b/metrics/src/recorder.rs @@ -36,13 +36,13 @@ pub trait Recorder { fn register_histogram(&self, key: Key, description: Option<&'static str>) -> Identifier; /// Increments a counter. - fn increment_counter(&self, id: &Identifier, value: u64); + fn increment_counter(&self, id: Identifier, value: u64); /// Updates a gauge. - fn update_gauge(&self, id: &Identifier, value: f64); + fn update_gauge(&self, id: Identifier, value: f64); /// Records a histogram. - fn record_histogram(&self, id: &Identifier, value: f64); + fn record_histogram(&self, id: Identifier, value: f64); } struct NoopRecorder; @@ -57,9 +57,9 @@ impl Recorder for NoopRecorder { fn register_histogram(&self, _key: Key, _description: Option<&'static str>) -> Identifier { Identifier::default() } - fn increment_counter(&self, _id: &Identifier, _value: u64) {} - fn update_gauge(&self, _id: &Identifier, _value: f64) {} - fn record_histogram(&self, _id: &Identifier, _value: f64) {} + fn increment_counter(&self, _id: Identifier, _value: u64) {} + fn update_gauge(&self, _id: Identifier, _value: f64) {} + fn record_histogram(&self, _id: Identifier, _value: f64) {} } /// Sets the global recorder to a `&'static Recorder`.