lots of things: adding basic recorder impl for benchmarking, fmting, reducing/updated deps, etc

This commit is contained in:
Toby Lawrence 2020-04-12 14:52:03 -04:00
parent 313706a8f7
commit 9ee7e10f0a
16 changed files with 204 additions and 95 deletions

View File

@ -3,4 +3,5 @@ members = [
"metrics",
"metrics-macros",
"metrics-util",
"metrics-benchmark",
]

View File

@ -234,7 +234,7 @@ where
let mlabels = vec![#(#insertable_labels),*];
let id = recorder.#register_ident((#key, mlabels).into(), None);
recorder.#op_ident(&id, #op_values);
recorder.#op_ident(id, #op_values);
}
}
}
@ -257,7 +257,7 @@ fn make_key_safe(key: &LitStr) -> String {
String::from_iter(safe_chars)
}
fn can_use_fast_path(labels: &Vec<(LitStr, Expr)>) -> bool {
fn can_use_fast_path(labels: &[(LitStr, Expr)]) -> bool {
let mut use_fast_path = true;
for (_, lvalue) in labels {
match lvalue {

View File

@ -28,16 +28,16 @@ name = "streaming_integers"
harness = false
[dependencies]
metrics = { path = "../metrics", version = "^0.12" }
metrics = { path = "../metrics", version = "^0.12", features = ["std"] }
crossbeam-epoch = "^0.8"
crossbeam-utils = "^0.7"
serde = "^1.0"
arc-swap = "^0.4"
im = "^14"
sharded-slab = "0.0.9"
parking_lot = "^0.10"
[dev-dependencies]
crossbeam-utils = "^0.7"
criterion = "^0.2.9"
criterion = "^0.3"
lazy_static = "^1.3"
rand = "^0.6"
rand = { version = "^0.7", features = ["small_rng"] }
rand_distr = "^0.2"

View File

@ -52,7 +52,7 @@ fn bucket_benchmark(c: &mut Criterion) {
}
})
})
.throughput(Throughput::Elements(RANDOM_INTS.len() as u32)),
.throughput(Throughput::Elements(RANDOM_INTS.len() as u64)),
);
}

View File

@ -9,7 +9,7 @@ fn registry_benchmark(c: &mut Criterion) {
c.bench(
"registry",
Benchmark::new("cached get/create (basic)", |b| {
let registry = Registry::new();
let registry: Registry<Key, ()> = Registry::new();
b.iter(|| {
let key = "simple_key".into();
@ -17,7 +17,7 @@ fn registry_benchmark(c: &mut Criterion) {
})
})
.with_function("cached get/create (labels)", |b| {
let registry = Registry::new();
let registry: Registry<Key, ()> = Registry::new();
b.iter(|| {
let labels = vec![Label::new("type", "http")];
@ -27,7 +27,7 @@ fn registry_benchmark(c: &mut Criterion) {
})
.with_function("uncached get/create (basic)", |b| {
b.iter_batched_ref(
|| Registry::new(),
|| Registry::<Key, ()>::new(),
|registry| {
let key = "simple_key".into();
let _ = registry.get_or_create_identifier(key, ());
@ -37,7 +37,7 @@ fn registry_benchmark(c: &mut Criterion) {
})
.with_function("uncached get/create (labels)", |b| {
b.iter_batched_ref(
|| Registry::new(),
|| Registry::<Key, ()>::new(),
|registry| {
let labels = vec![Label::new("type", "http")];
let key = ("simple_key", labels).into();
@ -46,18 +46,16 @@ fn registry_benchmark(c: &mut Criterion) {
BatchSize::SmallInput,
)
})
.with_function("get handle", |b| {
let registry = Registry::new();
.with_function("with handle", |b| {
let registry = Registry::<Key, ()>::new();
let id = registry.get_or_create_identifier("foo".into(), ());
b.iter(|| {
let _handle = registry.get_handle(&id);
})
b.iter(|| registry.with_handle(id, |_| {}))
})
.with_function("registry overhead", |b| {
b.iter_batched(
|| (),
|_| Registry::<()>::new(),
|_| Registry::<(), ()>::new(),
BatchSize::NumIterations(1),
)
})

View File

@ -6,11 +6,8 @@ extern crate lazy_static;
use criterion::{Benchmark, Criterion, Throughput};
use metrics_util::StreamingIntegers;
use rand::{
distributions::{Distribution, Gamma},
rngs::SmallRng,
Rng, SeedableRng,
};
use rand::{distributions::Distribution, rngs::SmallRng, SeedableRng};
use rand_distr::Gamma;
use std::time::Duration;
lazy_static! {
@ -28,7 +25,7 @@ fn get_gamma_distribution(len: usize, upper_bound: Duration) -> Vec<u64> {
// This Gamma distribution gets us pretty close to a typical web server response time
// distribution where there's a big peak down low, and a long tail that drops off sharply.
let gamma = Gamma::new(1.75, 1.0);
let gamma = Gamma::new(1.75, 1.0).expect("failed to create gamma distribution");
// Scale all the values by 22 million to simulate a lower bound of 22ms (but in
// nanoseconds) for all generated values.
@ -86,7 +83,7 @@ macro_rules! define_basic_benches {
});
});
})
.throughput(Throughput::Elements($input.len() as u32)),
.throughput(Throughput::Elements($input.len() as u64)),
)
};
}

View File

@ -5,7 +5,7 @@ use std::{
sync::atomic::{AtomicUsize, Ordering},
};
const BLOCK_SIZE: usize = 128;
const BLOCK_SIZE: usize = 512;
/// Discrete chunk of values with atomic read/write access.
struct Block<T> {

View File

@ -1,40 +1,110 @@
use std::sync::atomic::{AtomicU64, AtomicI64};
use crate::AtomicBucket;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
/// Basic metric handle.
///
/// Provides fast, thread-safe access and storage for the three supported metric types: counters,
/// gauges, and histograms.
#[derive(Clone)]
pub enum Handle {
/// A counter.
Counter(AtomicU64),
Counter(Arc<AtomicU64>),
/// A gauge.
Gauge(AtomicI64),
Gauge(Arc<AtomicU64>),
/// A histogram.
Histogram(AtomicBucket<f64>),
Histogram(Arc<AtomicBucket<f64>>),
}
impl Handle {
/// Creates a counter handle.
///
/// The counter is initialized to 0.
pub const fn counter() -> Handle {
Handle::Counter(AtomicU64::new(0))
pub fn counter() -> Handle {
Handle::Counter(Arc::new(AtomicU64::new(0)))
}
/// Creates a gauge handle.
///
/// The gauge is initialized to 0.
pub const fn gauge() -> Handle {
Handle::Gauge(AtomicI64::new(0))
pub fn gauge() -> Handle {
Handle::Gauge(Arc::new(AtomicU64::new(0)))
}
/// Creates a histogram handle.
///
/// The histogram handle is initialized to empty.
pub const fn histogram() -> Handle {
Handle::Histogram(AtomicBucket::new())
pub fn histogram() -> Handle {
Handle::Histogram(Arc::new(AtomicBucket::new()))
}
/// Increments this handle as a counter.
///
/// Panics if this handle is not a counter.
pub fn increment_counter(&self, value: u64) {
match self {
Handle::Counter(counter) => {
counter.fetch_add(value, Ordering::SeqCst);
}
_ => panic!("tried to increment as counter"),
}
}
/// Updates this handle as a gauge.
///
/// Panics if this handle is not a gauge.
pub fn update_gauge(&self, value: f64) {
let unsigned = unsafe { std::mem::transmute(value) };
match self {
Handle::Gauge(gauge) => gauge.store(unsigned, Ordering::SeqCst),
_ => panic!("tried to update as gauge"),
}
}
/// Records to this handle as a histogram.
///
/// Panics if this handle is not a histogram.
pub fn record_histogram(&self, value: f64) {
match self {
Handle::Histogram(bucket) => bucket.push(value),
_ => panic!("tried to record as histogram"),
}
}
/// Reads this handle as a counter.
///
/// Panics if this handle is not a counter.
pub fn read_counter(&self) -> u64 {
match self {
Handle::Counter(counter) => counter.load(Ordering::Relaxed),
_ => panic!("tried to read as counter"),
}
}
/// Reads this handle as a gauge.
///
/// Panics if this handle is not a gauge.
pub fn read_gauge(&self) -> f64 {
match self {
Handle::Gauge(gauge) => {
let unsigned = gauge.load(Ordering::Relaxed);
unsafe { std::mem::transmute(unsigned) }
}
_ => panic!("tried to read as gauge"),
}
}
/// Reads this handle as a histogram.
///
/// Panics if this handle is not a histogram.
pub fn read_histogram(&self) -> Vec<f64> {
match self {
Handle::Histogram(bucket) => bucket.data(),
_ => panic!("tried to read as histogram"),
}
}
}

View File

@ -3,6 +3,9 @@
mod bucket;
pub use bucket::AtomicBucket;
mod debugging;
pub use debugging::{DebugValue, DebuggingRecorder, MetricKind, Snapshotter};
mod handle;
pub use handle::Handle;

View File

@ -1,36 +1,44 @@
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
use arc_swap::ArcSwap;
use im::HashMap;
use metrics::{Identifier, Key};
use crossbeam_utils::sync::ShardedLock;
use im::HashMap as ImmutableHashMap;
use metrics::Identifier;
use parking_lot::Mutex;
use sharded_slab::Slab;
pub use sharded_slab::Guard;
/// A high-performance metric registry.
///
/// All metrics are defined by a `Key`, which represents the name of a metric, along with potential
/// labels. Registering a new metric, in turn, provides the caller with an opaque `Identifier`
/// that can be used to look up the associated handle with a metric.
/// `Registry` provides the ability to maintain a central listing of metrics mapped by a given key.
///
/// Handles would usually be a thread-safe type that can be used to manipulate a metric -- i.e.
/// increment a counter or add a value to a histogram -- but `Registry` does not care what data is
/// stored, it focuses purely on providing fast insertion and lookup.
/// In many cases, `K` will be a composite key, where the fundamental `Key` type from `metrics` is
/// present, and differentiation is provided by storing the metric type alongside.
///
/// Metrics themselves are represented opaquely behind `H`. In most cases, this would be a
/// thread-safe handle to the underlying metrics storage that the owner of the registry can use to
/// update the actual metric value(s) as needed. `Handle`, from this crate, is a solid default
/// choice.
///
/// `Registry` handles deduplicating metrics, and will return the `Identifier` for an existing
/// metric if a caller attempts to reregister it.
///
/// `Registry` is optimized for reads.
pub struct Registry<H> {
mappings: ArcSwap<HashMap<Key, Identifier>>,
handles: Slab<H>,
pub struct Registry<K, H> {
mappings: ArcSwap<ImmutableHashMap<K, Identifier>>,
handles: ShardedLock<Vec<H>>,
lock: Mutex<()>,
}
impl<H> Registry<H> {
impl<K, H> Registry<K, H>
where
K: Eq + Hash + Clone,
{
/// Creates a new `Registry`.
pub fn new() -> Self {
Registry {
mappings: ArcSwap::from(Arc::new(HashMap::new())),
handles: Slab::new(),
mappings: ArcSwap::from(Arc::new(ImmutableHashMap::new())),
handles: ShardedLock::new(Vec::new()),
lock: Mutex::new(()),
}
}
@ -39,7 +47,7 @@ impl<H> Registry<H> {
///
/// If the key is not already mapped, a new identifier will be generated, and the given handle
/// stored along side of it. If the key is already mapped, its identifier will be returned.
pub fn get_or_create_identifier(&self, key: Key, handle: H) -> Identifier {
pub fn get_or_create_identifier(&self, key: K, handle: H) -> Identifier {
// Check our mapping table first.
if let Some(id) = self.mappings.load().get(&key) {
return id.clone();
@ -55,11 +63,13 @@ impl<H> Registry<H> {
}
// Our identifier will be the index we insert the handle into.
let id = self
let mut wg = self
.handles
.insert(handle)
.expect("current thread ran out of slots to register new metrics!")
.into();
.write()
.expect("handles write lock was poisoned!");
let id = wg.len().into();
wg.push(handle);
drop(wg);
// Update our mapping table and drop the lock.
let new_mappings = mappings.update(key, id);
@ -71,7 +81,43 @@ impl<H> Registry<H> {
}
/// Gets the handle for a given identifier.
pub fn get_handle(&self, identifier: &Identifier) -> Option<Guard<'_, H>> {
self.handles.get(identifier.into())
pub fn with_handle<F>(&self, identifier: Identifier, mut f: F)
where
F: FnMut(&H),
{
let id: usize = identifier.into();
let rg = self
.handles
.read()
.expect("handles read lock was poisoned!");
if let Some(h) = rg.get(id) {
f(h);
}
}
}
impl<K, H> Registry<K, H>
where
K: Eq + Hash + Clone,
H: Clone,
{
/// Gets a map of all present handles, mapped by key.
///
/// Handles must implement `Clone`. This map is a point-in-time snapshot of the registry.
pub fn get_handles(&self) -> HashMap<K, H> {
let guard = self.mappings.load();
let mappings = ImmutableHashMap::clone(&guard);
let rg = self
.handles
.read()
.expect("handles read lock was poisoned!");
mappings
.into_iter()
.map(|(key, id)| {
let id: usize = id.into();
let handle = rg.get(id).expect("handle not present!").clone();
(key, handle)
})
.collect::<HashMap<_, _>>()
}
}

View File

@ -30,7 +30,7 @@ proc-macro-hack = "^0.5"
[dev-dependencies]
log = "^0.4"
criterion = "^0.2"
criterion = "^0.3"
rand = "^0.7"
[features]

View File

@ -9,18 +9,18 @@ use rand::{thread_rng, Rng};
#[derive(Default)]
struct TestRecorder;
impl Recorder for TestRecorder {
fn register_counter(&self, _key: Key) -> Identifier {
fn register_counter(&self, _key: Key, _description: Option<&'static str>) -> Identifier {
thread_rng().gen::<usize>().into()
}
fn register_gauge(&self, _key: Key) -> Identifier {
fn register_gauge(&self, _key: Key, _description: Option<&'static str>) -> Identifier {
thread_rng().gen::<usize>().into()
}
fn register_histogram(&self, _key: Key) -> Identifier {
fn register_histogram(&self, _key: Key, _description: Option<&'static str>) -> Identifier {
thread_rng().gen::<usize>().into()
}
fn increment_counter(&self, _id: &Identifier, _value: u64) {}
fn update_gauge(&self, _id: &Identifier, _value: f64) {}
fn record_histogram(&self, _id: &Identifier, _value: f64) {}
fn increment_counter(&self, _id: Identifier, _value: u64) {}
fn update_gauge(&self, _id: Identifier, _value: f64) {}
fn record_histogram(&self, _id: Identifier, _value: f64) {}
}
fn reset_recorder() {

View File

@ -33,17 +33,17 @@ impl Recorder for PrintRecorder {
id.into()
}
fn increment_counter(&self, id: &Identifier, value: u64) {
fn increment_counter(&self, id: Identifier, value: u64) {
let uid: usize = id.into();
println!("(counter) got value {} for id {}", value, uid);
}
fn update_gauge(&self, id: &Identifier, value: f64) {
fn update_gauge(&self, id: Identifier, value: f64) {
let uid: usize = id.into();
println!("(gauge) got value {} for id {}", value, uid);
}
fn record_histogram(&self, id: &Identifier, value: f64) {
fn record_histogram(&self, id: Identifier, value: f64) {
let uid: usize = id.into();
println!("(histogram) got value {} for id {}", value, uid);
}

View File

@ -32,12 +32,6 @@ impl Into<usize> for Identifier {
}
}
impl Into<usize> for &Identifier {
fn into(self) -> usize {
self.0
}
}
/// Atomically-guarded identifier initialization.
///
/// Stores an identifier in an atomically-backed fashion, allowing for multiple callers to
@ -64,7 +58,7 @@ impl OnceIdentifier {
///
/// All callers rondezvous on an internal atomic guard, so it impossible to see
/// invalid state.
pub fn get_or_init<F>(&self, f: F) -> &Identifier
pub fn get_or_init<F>(&self, f: F) -> Identifier
where
F: Fn() -> Identifier,
{
@ -75,7 +69,7 @@ impl OnceIdentifier {
}
});
unsafe { &*self.inner.get() }
unsafe { *self.inner.get() }
}
}

View File

@ -85,9 +85,9 @@
//! id
//! }
//!
//! fn get_key(&self, id: &Identifier) -> Key {
//! fn get_key(&self, id: Identifier) -> Key {
//! let keys = self.keys.lock().expect("failed to lock keys");
//! keys.get(id).expect("invalid identifier").clone()
//! keys.get(&id).expect("invalid identifier").clone()
//! }
//! }
//!
@ -104,17 +104,17 @@
//! self.register(key)
//! }
//!
//! fn increment_counter(&self, id: &Identifier, value: u64) {
//! fn increment_counter(&self, id: Identifier, value: u64) {
//! let key = self.get_key(id);
//! info!("counter '{}' -> {}", key, value);
//! }
//!
//! fn update_gauge(&self, id: &Identifier, value: f64) {
//! fn update_gauge(&self, id: Identifier, value: f64) {
//! let key = self.get_key(id);
//! info!("gauge '{}' -> {}", key, value);
//! }
//!
//! fn record_histogram(&self, id: &Identifier, value: f64) {
//! fn record_histogram(&self, id: Identifier, value: f64) {
//! let key = self.get_key(id);
//! info!("histogram '{}' -> {}", key, value);
//! }
@ -132,9 +132,9 @@
//! # fn register_counter(&self, _key: Key, _description: Option<&'static str>) -> Identifier { Identifier::default() }
//! # fn register_gauge(&self, _key: Key, _description: Option<&'static str>) -> Identifier { Identifier::default() }
//! # fn register_histogram(&self, _key: Key, _description: Option<&'static str>) -> Identifier { Identifier::default() }
//! # fn increment_counter(&self, _id: &Identifier, _value: u64) {}
//! # fn update_gauge(&self, _id: &Identifier, _value: f64) {}
//! # fn record_histogram(&self, _id: &Identifier, _value: f64) {}
//! # fn increment_counter(&self, _id: Identifier, _value: u64) {}
//! # fn update_gauge(&self, _id: Identifier, _value: f64) {}
//! # fn record_histogram(&self, _id: Identifier, _value: f64) {}
//! # }
//! use metrics::SetRecorderError;
//!
@ -160,9 +160,9 @@
//! # fn register_counter(&self, _key: Key, _description: Option<&'static str>) -> Identifier { Identifier::default() }
//! # fn register_gauge(&self, _key: Key, _description: Option<&'static str>) -> Identifier { Identifier::default() }
//! # fn register_histogram(&self, _key: Key, _description: Option<&'static str>) -> Identifier { Identifier::default() }
//! # fn increment_counter(&self, _id: &Identifier, _value: u64) {}
//! # fn update_gauge(&self, _id: &Identifier, _value: f64) {}
//! # fn record_histogram(&self, _id: &Identifier, _value: f64) {}
//! # fn increment_counter(&self, _id: Identifier, _value: u64) {}
//! # fn update_gauge(&self, _id: Identifier, _value: f64) {}
//! # fn record_histogram(&self, _id: Identifier, _value: f64) {}
//! # }
//! use metrics::SetRecorderError;
//!

View File

@ -36,13 +36,13 @@ pub trait Recorder {
fn register_histogram(&self, key: Key, description: Option<&'static str>) -> Identifier;
/// Increments a counter.
fn increment_counter(&self, id: &Identifier, value: u64);
fn increment_counter(&self, id: Identifier, value: u64);
/// Updates a gauge.
fn update_gauge(&self, id: &Identifier, value: f64);
fn update_gauge(&self, id: Identifier, value: f64);
/// Records a histogram.
fn record_histogram(&self, id: &Identifier, value: f64);
fn record_histogram(&self, id: Identifier, value: f64);
}
struct NoopRecorder;
@ -57,9 +57,9 @@ impl Recorder for NoopRecorder {
fn register_histogram(&self, _key: Key, _description: Option<&'static str>) -> Identifier {
Identifier::default()
}
fn increment_counter(&self, _id: &Identifier, _value: u64) {}
fn update_gauge(&self, _id: &Identifier, _value: f64) {}
fn record_histogram(&self, _id: &Identifier, _value: f64) {}
fn increment_counter(&self, _id: Identifier, _value: u64) {}
fn update_gauge(&self, _id: Identifier, _value: f64) {}
fn record_histogram(&self, _id: Identifier, _value: f64) {}
}
/// Sets the global recorder to a `&'static Recorder`.