core: add unit support (#107)

This commit is contained in:
Toby Lawrence 2020-10-24 10:55:12 -04:00 committed by GitHub
commit cf1d93c979
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
104 changed files with 1400 additions and 432 deletions

View File

@ -73,3 +73,30 @@ their own copyright notices and license terms:
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
* metrics-observer reuses code from `std::time::Duration` which carries
the following license:
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -7,5 +7,5 @@ members = [
"metrics-exporter-tcp",
"metrics-exporter-prometheus",
"metrics-tracing-context",
"metrics-observer",
]
exclude = ["metrics-observer"]

View File

@ -6,7 +6,7 @@ use hyper::{
service::{make_service_fn, service_fn},
{Body, Error as HyperError, Response, Server},
};
use metrics::{Key, Recorder, SetRecorderError};
use metrics::{Key, Recorder, SetRecorderError, Unit};
use metrics_util::{
parse_quantiles, CompositeKey, Handle, Histogram, MetricKind, Quantile, Registry,
};
@ -494,7 +494,7 @@ impl PrometheusBuilder {
}
impl Recorder for PrometheusRecorder {
fn register_counter(&self, key: Key, description: Option<&'static str>) {
fn register_counter(&self, key: Key, _unit: Option<Unit>, description: Option<&'static str>) {
self.add_description_if_missing(&key, description);
self.inner.registry().op(
CompositeKey::new(MetricKind::Counter, key),
@ -503,7 +503,7 @@ impl Recorder for PrometheusRecorder {
);
}
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
fn register_gauge(&self, key: Key, _unit: Option<Unit>, description: Option<&'static str>) {
self.add_description_if_missing(&key, description);
self.inner.registry().op(
CompositeKey::new(MetricKind::Gauge, key),
@ -512,7 +512,7 @@ impl Recorder for PrometheusRecorder {
);
}
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
fn register_histogram(&self, key: Key, _unit: Option<Unit>, description: Option<&'static str>) {
self.add_description_if_missing(&key, description);
self.inner.registry().op(
CompositeKey::new(MetricKind::Histogram, key),

View File

@ -0,0 +1,11 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
<!-- next-header -->
## [Unreleased] - ReleaseDate
### Added
- Effective birth of the crate.

View File

@ -1,7 +1,7 @@
use std::thread;
use std::time::Duration;
use metrics::{histogram, increment};
use metrics::{histogram, increment, register_histogram, Unit};
use metrics_exporter_tcp::TcpBuilder;
use quanta::Clock;
@ -15,6 +15,8 @@ fn main() {
let mut clock = Clock::new();
let mut last = None;
register_histogram!("tcp_server_loop_delta_ns", Unit::Nanoseconds);
loop {
increment!("tcp_server_loops", "system" => "foo");

View File

@ -4,6 +4,22 @@ import "google/protobuf/timestamp.proto";
package event.proto;
message Metadata {
string name = 1;
enum MetricType {
COUNTER = 0;
GAUGE = 1;
HISTOGRAM = 2;
}
MetricType metric_type = 2;
oneof unit {
string unit_value = 3;
}
oneof description {
string description_value = 4;
}
}
message Metric {
string name = 1;
google.protobuf.Timestamp timestamp = 2;
@ -26,3 +42,10 @@ message Gauge {
message Histogram {
uint64 value = 1;
}
message Event {
oneof event {
Metadata metadata = 1;
Metric metric = 2;
}
}

View File

@ -53,7 +53,7 @@ use std::time::SystemTime;
use bytes::Bytes;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use metrics::{Key, Recorder, SetRecorderError};
use metrics::{Key, Recorder, SetRecorderError, Unit};
use mio::{
net::{TcpListener, TcpStream},
Events, Interest, Poll, Token, Waker,
@ -70,12 +70,19 @@ mod proto {
include!(concat!(env!("OUT_DIR"), "/event.proto.rs"));
}
use self::proto::metadata::MetricType;
enum MetricValue {
Counter(u64),
Gauge(f64),
Histogram(u64),
}
enum Event {
Metadata(Key, MetricType, Option<Unit>, Option<&'static str>),
Metric(Key, MetricValue),
}
/// Errors that could occur while installing a TCP recorder/exporter.
#[derive(Debug)]
pub enum Error {
@ -100,7 +107,7 @@ impl From<SetRecorderError> for Error {
/// A TCP recorder.
pub struct TcpRecorder {
tx: Sender<(Key, MetricValue)>,
tx: Sender<Event>,
waker: Arc<Waker>,
}
@ -191,18 +198,37 @@ impl TcpBuilder {
}
impl TcpRecorder {
fn register_metric(
&self,
key: Key,
metric_type: MetricType,
unit: Option<Unit>,
description: Option<&'static str>,
) {
let _ = self
.tx
.try_send(Event::Metadata(key, metric_type, unit, description));
let _ = self.waker.wake();
}
fn push_metric(&self, key: Key, value: MetricValue) {
let _ = self.tx.try_send((key, value));
let _ = self.tx.try_send(Event::Metric(key, value));
let _ = self.waker.wake();
}
}
impl Recorder for TcpRecorder {
fn register_counter(&self, _key: Key, _description: Option<&'static str>) {}
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.register_metric(key, MetricType::Counter, unit, description);
}
fn register_gauge(&self, _key: Key, _description: Option<&'static str>) {}
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.register_metric(key, MetricType::Gauge, unit, description);
}
fn register_histogram(&self, _key: Key, _description: Option<&'static str>) {}
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.register_metric(key, MetricType::Histogram, unit, description);
}
fn increment_counter(&self, key: Key, value: u64) {
self.push_metric(key, MetricValue::Counter(value));
@ -221,13 +247,14 @@ fn run_transport(
mut poll: Poll,
waker: Arc<Waker>,
listener: TcpListener,
rx: Receiver<(Key, MetricValue)>,
rx: Receiver<Event>,
buffer_size: Option<usize>,
) {
let buffer_limit = buffer_size.unwrap_or(std::usize::MAX);
let mut events = Events::with_capacity(1024);
let mut clients = HashMap::new();
let mut clients_to_remove = Vec::new();
let mut metadata = HashMap::new();
let mut next_token = START_TOKEN;
let mut buffered_pmsgs = VecDeque::with_capacity(buffer_limit);
@ -270,10 +297,22 @@ fn run_transport(
// If our sender is dead, we can't do anything else, so just return.
Err(_) => return,
};
let (key, value) = msg;
match convert_metric_to_protobuf_encoded(key, value) {
Ok(pmsg) => buffered_pmsgs.push_back(pmsg),
Err(e) => error!(error = ?e, "error encoding metric"),
match msg {
Event::Metadata(key, metric_type, unit, desc) => {
let entry = metadata
.entry(key)
.or_insert_with(|| (metric_type, None, None));
let (_, uentry, dentry) = entry;
*uentry = unit;
*dentry = desc;
}
Event::Metric(key, value) => {
match convert_metric_to_protobuf_encoded(key, value) {
Ok(pmsg) => buffered_pmsgs.push_back(pmsg),
Err(e) => error!(error = ?e, "error encoding metric"),
}
}
}
}
drop(_mrxspan);
@ -340,9 +379,10 @@ fn run_transport(
.register(&mut conn, token, CLIENT_INTEREST)
.expect("failed to register interest for client connection");
// Start tracking them.
// Start tracking them, and enqueue all of the metadata.
let metadata = generate_metadata_messages(&metadata);
clients
.insert(token, (conn, None, VecDeque::new()))
.insert(token, (conn, None, metadata))
.ok_or(())
.expect_err("client mapped to existing token!");
}
@ -370,6 +410,23 @@ fn run_transport(
}
}
fn generate_metadata_messages(
metadata: &HashMap<Key, (MetricType, Option<Unit>, Option<&'static str>)>,
) -> VecDeque<Bytes> {
let mut bufs = VecDeque::new();
for (key, (metric_type, unit, desc)) in metadata.iter() {
let msg = convert_metadata_to_protobuf_encoded(
key,
metric_type.clone(),
unit.clone(),
desc.clone(),
)
.expect("failed to encode metadata buffer");
bufs.push_back(msg);
}
bufs
}
#[tracing::instrument(skip(wbuf, msgs))]
fn drive_connection(
conn: &mut TcpStream,
@ -421,6 +478,28 @@ fn drive_connection(
}
}
fn convert_metadata_to_protobuf_encoded(
key: &Key,
metric_type: MetricType,
unit: Option<Unit>,
desc: Option<&'static str>,
) -> Result<Bytes, EncodeError> {
let name = key.name().to_string();
let metadata = proto::Metadata {
name,
metric_type: metric_type.into(),
unit: unit.map(|u| proto::metadata::Unit::UnitValue(u.as_str().to_owned())),
description: desc.map(|d| proto::metadata::Description::DescriptionValue(d.to_owned())),
};
let event = proto::Event {
event: Some(proto::event::Event::Metadata(metadata)),
};
let mut buf = Vec::new();
event.encode_length_delimited(&mut buf)?;
Ok(Bytes::from(buf))
}
fn convert_metric_to_protobuf_encoded(key: Key, value: MetricValue) -> Result<Bytes, EncodeError> {
let name = key.name().to_string();
let labels = key
@ -442,9 +521,12 @@ fn convert_metric_to_protobuf_encoded(key: Key, value: MetricValue) -> Result<By
timestamp: Some(now),
value: Some(mvalue),
};
let event = proto::Event {
event: Some(proto::event::Event::Metric(metric)),
};
let mut buf = Vec::new();
metric.encode_length_delimited(&mut buf)?;
event.encode_length_delimited(&mut buf)?;
Ok(Bytes::from(buf))
}

View File

@ -0,0 +1,11 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
<!-- next-header -->
## [Unreleased] - ReleaseDate
### Added
- Effective birth of the crate.

View File

@ -7,6 +7,7 @@ use proc_macro2::Span;
use proc_macro_hack::proc_macro_hack;
use quote::{format_ident, quote, ToTokens};
use regex::Regex;
use syn::parse::discouraged::Speculative;
use syn::parse::{Error, Parse, ParseStream, Result};
use syn::{parse_macro_input, Expr, LitStr, Token};
@ -45,6 +46,7 @@ struct WithExpression {
struct Registration {
key: Key,
unit: Option<Expr>,
description: Option<LitStr>,
labels: Option<Labels>,
}
@ -79,30 +81,78 @@ impl Parse for Registration {
fn parse(mut input: ParseStream) -> Result<Self> {
let key = read_key(&mut input)?;
// We accept three possible parameters: unit, description, and labels.
//
// If our first parameter is a literal string, we either have the description and no labels,
// or a description and labels. Peek at the trailing token after the description to see if
// we need to keep parsing.
// This may or may not be the start of labels, if the description has been omitted, so
// we hold on to it until we can make sure nothing else is behind it, or if it's a full
// fledged set of labels.
let (description, labels) = if input.peek(Token![,]) && input.peek3(Token![=>]) {
let (unit, description, labels) = if input.peek(Token![,]) && input.peek3(Token![=>]) {
// We have a ", <something> =>" pattern, which can only be labels, so we have no
// description.
// unit or description.
let labels = parse_labels(&mut input)?;
(None, labels)
(None, None, labels)
} else if input.peek(Token![,]) && input.peek2(LitStr) {
// We already know we're not working with labels only, and if we have ", <literal
// string>" then we have to at least have a description, possibly with labels.
input.parse::<Token![,]>()?;
let description = input.parse::<LitStr>().ok();
let labels = parse_labels(&mut input)?;
(description, labels)
} else {
// We might have labels passed as an expression.
(None, description, labels)
} else if input.peek(Token![,]) {
// We may or may not have anything left to parse here, but it could also be any
// combination of unit + description and/or labels.
//
// We speculatively try and parse an expression from the buffer, and see if we can match
// it to the qualified name of the Unit enum. We run all of the other normal parsing
// after that for description and labels.
let forked = input.fork();
forked.parse::<Token![,]>()?;
let unit = if let Ok(Expr::Path(path)) = forked.parse::<Expr>() {
let qname = path
.path
.segments
.iter()
.map(|x| x.ident.to_string())
.collect::<Vec<_>>()
.join("::");
if qname.starts_with("metrics::Unit") || qname.starts_with("Unit") {
Some(Expr::Path(path))
} else {
None
}
} else {
None
};
// If we succeeded, advance the main parse stream up to where the fork left off.
if unit.is_some() {
input.advance_to(&forked);
}
// We still have to check for a possible description.
let description =
if input.peek(Token![,]) && input.peek2(LitStr) && !input.peek3(Token![=>]) {
input.parse::<Token![,]>()?;
input.parse::<LitStr>().ok()
} else {
None
};
let labels = parse_labels(&mut input)?;
(None, labels)
(unit, description, labels)
} else {
(None, None, None)
};
Ok(Registration {
key,
unit,
description,
labels,
})
@ -113,33 +163,36 @@ impl Parse for Registration {
pub fn register_counter(input: TokenStream) -> TokenStream {
let Registration {
key,
unit,
description,
labels,
} = parse_macro_input!(input as Registration);
get_expanded_registration("counter", key, description, labels).into()
get_expanded_registration("counter", key, unit, description, labels).into()
}
#[proc_macro_hack]
pub fn register_gauge(input: TokenStream) -> TokenStream {
let Registration {
key,
unit,
description,
labels,
} = parse_macro_input!(input as Registration);
get_expanded_registration("gauge", key, description, labels).into()
get_expanded_registration("gauge", key, unit, description, labels).into()
}
#[proc_macro_hack]
pub fn register_histogram(input: TokenStream) -> TokenStream {
let Registration {
key,
unit,
description,
labels,
} = parse_macro_input!(input as Registration);
get_expanded_registration("histogram", key, description, labels).into()
get_expanded_registration("histogram", key, unit, description, labels).into()
}
#[proc_macro_hack]
@ -187,12 +240,18 @@ pub fn histogram(input: TokenStream) -> TokenStream {
fn get_expanded_registration(
metric_type: &str,
key: Key,
unit: Option<Expr>,
description: Option<LitStr>,
labels: Option<Labels>,
) -> proc_macro2::TokenStream {
let register_ident = format_ident!("register_{}", metric_type);
let key = key_to_quoted(key, labels);
let unit = match unit {
Some(e) => quote! { Some(#e) },
None => quote! { None },
};
let description = match description {
Some(s) => quote! { Some(#s) },
None => quote! { None },
@ -204,7 +263,7 @@ fn get_expanded_registration(
if let Some(recorder) = metrics::try_recorder() {
// Registrations are fairly rare, don't attempt to cache here
// and just use an owned ref.
recorder.#register_ident(metrics::Key::Owned(#key), #description);
recorder.#register_ident(metrics::Key::Owned(#key), #unit, #description);
}
}
}

View File

@ -1,10 +1,11 @@
use syn::parse_quote;
use syn::{Expr, ExprPath};
use super::*;
#[test]
fn test_quote_key_name_scoped() {
let stream = quote_key_name(Key::Scoped(parse_quote! {"qwerty"}));
let stream = quote_key_name(Key::Scoped(parse_quote! { "qwerty" }));
let expected =
"format ! (\"{}.{}\" , std :: module_path ! () . replace (\"::\" , \".\") , \"qwerty\")";
assert_eq!(stream.to_string(), expected);
@ -12,16 +13,18 @@ fn test_quote_key_name_scoped() {
#[test]
fn test_quote_key_name_not_scoped() {
let stream = quote_key_name(Key::NotScoped(parse_quote! {"qwerty"}));
let stream = quote_key_name(Key::NotScoped(parse_quote! { "qwerty" }));
let expected = "\"qwerty\"";
assert_eq!(stream.to_string(), expected);
}
#[test]
fn test_get_expanded_registration() {
// Basic registration.
let stream = get_expanded_registration(
"mytype",
Key::NotScoped(parse_quote! {"mykeyname"}),
Key::NotScoped(parse_quote! { "mykeyname" }),
None,
None,
None,
);
@ -30,6 +33,7 @@ fn test_get_expanded_registration() {
"{ if let Some (recorder) = metrics :: try_recorder () { ",
"recorder . register_mytype (",
"metrics :: Key :: Owned (metrics :: KeyData :: from_name (\"mykeyname\")) , ",
"None , ",
"None",
") ; ",
"} }",
@ -38,6 +42,80 @@ fn test_get_expanded_registration() {
assert_eq!(stream.to_string(), expected);
}
#[test]
fn test_get_expanded_registration_with_unit() {
// Now with unit.
let units: ExprPath = parse_quote! { metrics::Unit::Nanoseconds };
let stream = get_expanded_registration(
"mytype",
Key::NotScoped(parse_quote! { "mykeyname" }),
Some(Expr::Path(units)),
None,
None,
);
let expected = concat!(
"{ if let Some (recorder) = metrics :: try_recorder () { ",
"recorder . register_mytype (",
"metrics :: Key :: Owned (metrics :: KeyData :: from_name (\"mykeyname\")) , ",
"Some (metrics :: Unit :: Nanoseconds) , ",
"None",
") ; ",
"} }",
);
assert_eq!(stream.to_string(), expected);
}
#[test]
fn test_get_expanded_registration_with_description() {
// And with description.
let stream = get_expanded_registration(
"mytype",
Key::NotScoped(parse_quote! { "mykeyname" }),
None,
Some(parse_quote! { "flerkin" }),
None,
);
let expected = concat!(
"{ if let Some (recorder) = metrics :: try_recorder () { ",
"recorder . register_mytype (",
"metrics :: Key :: Owned (metrics :: KeyData :: from_name (\"mykeyname\")) , ",
"None , ",
"Some (\"flerkin\")",
") ; ",
"} }",
);
assert_eq!(stream.to_string(), expected);
}
#[test]
fn test_get_expanded_registration_with_unit_and_description() {
// And with unit and description.
let units: ExprPath = parse_quote! { metrics::Unit::Nanoseconds };
let stream = get_expanded_registration(
"mytype",
Key::NotScoped(parse_quote! { "mykeyname" }),
Some(Expr::Path(units)),
Some(parse_quote! { "flerkin" }),
None,
);
let expected = concat!(
"{ if let Some (recorder) = metrics :: try_recorder () { ",
"recorder . register_mytype (",
"metrics :: Key :: Owned (metrics :: KeyData :: from_name (\"mykeyname\")) , ",
"Some (metrics :: Unit :: Nanoseconds) , ",
"Some (\"flerkin\")",
") ; ",
"} }",
);
assert_eq!(stream.to_string(), expected);
}
/// If there are no dynamic labels - generate an invocation with caching.
#[test]
fn test_get_expanded_callsite_fast_path() {

4
metrics-observer/.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
/target
**/*.rs.bk
Cargo.lock
/.vscode

View File

@ -0,0 +1,11 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
<!-- next-header -->
## [Unreleased] - ReleaseDate
### Added
- Effective birth of the crate.

View File

@ -4,6 +4,22 @@ import "google/protobuf/timestamp.proto";
package event.proto;
message Metadata {
string name = 1;
enum MetricType {
COUNTER = 0;
GAUGE = 1;
HISTOGRAM = 2;
}
MetricType metric_type = 2;
oneof unit {
string unit_value = 3;
}
oneof description {
string description_value = 4;
}
}
message Metric {
string name = 1;
google.protobuf.Timestamp timestamp = 2;
@ -26,3 +42,10 @@ message Gauge {
message Histogram {
uint64 value = 1;
}
message Event {
oneof event {
Metadata metadata = 1;
Metric metric = 2;
}
}

View File

@ -2,7 +2,7 @@ use std::io;
use std::thread;
use std::time::Duration;
use crossbeam_channel::{bounded, Receiver, TrySendError, RecvTimeoutError};
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, TrySendError};
use termion::event::Key;
use termion::input::TermRead;
@ -37,4 +37,4 @@ impl InputEvents {
Err(e) => Err(e),
}
}
}
}

View File

@ -1,21 +1,27 @@
use std::fmt;
use std::num::FpCategory;
use std::time::Duration;
use std::{error::Error, io};
use chrono::Local;
use metrics::Unit;
use termion::{event::Key, input::MouseTerminal, raw::IntoRawMode, screen::AlternateScreen};
use tui::{
backend::TermionBackend,
layout::{Constraint, Direction, Layout},
style::{Color, Modifier, Style},
text::{Span, Spans},
widgets::{Block, Borders, Paragraph, Wrap, List, ListItem},
widgets::{Block, Borders, List, ListItem, Paragraph, Wrap},
Terminal,
};
mod input;
use self::input::InputEvents;
mod metrics;
use self::metrics::{ClientState, MetricData};
// Module name/crate name collision that we have to deal with.
#[path = "metrics.rs"]
mod metrics_inner;
use self::metrics_inner::{ClientState, MetricData};
mod selector;
use self::selector::Selector;
@ -28,7 +34,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let mut terminal = Terminal::new(backend)?;
let mut events = InputEvents::new();
let client = metrics::Client::new("127.0.0.1:5000".to_string());
let client = metrics_inner::Client::new("127.0.0.1:5000".to_string());
let mut selector = Selector::new();
loop {
@ -36,12 +42,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let chunks = Layout::default()
.direction(Direction::Vertical)
.margin(1)
.constraints(
[
Constraint::Length(4),
Constraint::Percentage(90)
].as_ref()
)
.constraints([Constraint::Length(4), Constraint::Percentage(90)].as_ref())
.split(f.size());
let current_dt = Local::now().format(" (%Y/%m/%d %I:%M:%S %p)").to_string();
@ -58,7 +59,7 @@ fn main() -> Result<(), Box<dyn Error>> {
}
Spans::from(spans)
},
}
ClientState::Connected => Spans::from(vec![
Span::raw("state: "),
Span::styled("connected", Style::default().fg(Color::Green)),
@ -67,7 +68,10 @@ fn main() -> Result<(), Box<dyn Error>> {
let header_block = Block::default()
.title(vec![
Span::styled("metrics-observer", Style::default().add_modifier(Modifier::BOLD)),
Span::styled(
"metrics-observer",
Style::default().add_modifier(Modifier::BOLD),
),
Span::raw(current_dt),
])
.borders(Borders::ALL);
@ -87,42 +91,55 @@ fn main() -> Result<(), Box<dyn Error>> {
// Knock 5 off the line width to account for 3-character highlight symbol + borders.
let line_width = chunks[1].width.saturating_sub(6) as usize;
let items = client.with_metrics(|metrics| {
let mut items = Vec::new();
for (key, value) in metrics.iter() {
let inner_key = key.key();
let name = inner_key.name();
let labels = inner_key.labels().map(|label| format!("{} = {}", label.key(), label.value())).collect::<Vec<_>>();
let display_name = if labels.is_empty() {
name.to_string()
} else {
format!("{} [{}]", name, labels.join(", "))
};
let mut items = Vec::new();
let metrics = client.get_metrics();
for (key, value, unit, _desc) in metrics {
let inner_key = key.key();
let name = inner_key.name();
let labels = inner_key
.labels()
.map(|label| format!("{} = {}", label.key(), label.value()))
.collect::<Vec<_>>();
let display_name = if labels.is_empty() {
name.to_string()
} else {
format!("{} [{}]", name, labels.join(", "))
};
let display_value = match value {
MetricData::Counter(value) => format!("total: {}", value),
MetricData::Gauge(value) => format!("current: {}", value),
MetricData::Histogram(value) => {
let min = value.min();
let max = value.max();
let p50 = value.value_at_quantile(0.5);
let p99 = value.value_at_quantile(0.99);
let p999 = value.value_at_quantile(0.999);
let display_value = match value {
MetricData::Counter(value) => {
format!("total: {}", u64_to_displayable(value, unit))
}
MetricData::Gauge(value) => {
format!("current: {}", f64_to_displayable(value, unit))
}
MetricData::Histogram(value) => {
let min = value.min();
let max = value.max();
let p50 = value.value_at_quantile(0.5);
let p99 = value.value_at_quantile(0.99);
let p999 = value.value_at_quantile(0.999);
format!("min: {} p50: {} p99: {} p999: {} max: {}",
min, p50, p99, p999, max)
},
};
format!(
"min: {} p50: {} p99: {} p999: {} max: {}",
u64_to_displayable(min, unit.clone()),
u64_to_displayable(p50, unit.clone()),
u64_to_displayable(p99, unit.clone()),
u64_to_displayable(p999, unit.clone()),
u64_to_displayable(max, unit),
)
}
};
let name_length = display_name.chars().count();
let value_length = display_value.chars().count();
let space = line_width.saturating_sub(name_length).saturating_sub(value_length);
let name_length = display_name.chars().count();
let value_length = display_value.chars().count();
let space = line_width
.saturating_sub(name_length)
.saturating_sub(value_length);
let display = format!("{}{}{}", display_name, " ".repeat(space), display_value);
items.push(ListItem::new(display));
}
items
});
let display = format!("{}{}{}", display_name, " ".repeat(space), display_value);
items.push(ListItem::new(display));
}
selector.set_length(items.len());
let metrics_block = Block::default()
@ -132,7 +149,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let metrics = List::new(items)
.block(metrics_block)
.highlight_symbol(">> ");
f.render_stateful_widget(metrics, chunks[1], selector.state());
})?;
@ -145,10 +162,203 @@ fn main() -> Result<(), Box<dyn Error>> {
Key::Down => selector.next(),
Key::PageUp => selector.top(),
Key::PageDown => selector.bottom(),
_ => {},
_ => {}
}
}
}
Ok(())
}
}
fn u64_to_displayable(value: u64, unit: Option<Unit>) -> String {
let unit = match unit {
None => return value.to_string(),
Some(inner) => inner,
};
if unit.is_time_based() {
return u64_time_to_displayable(value, unit);
}
let label = unit.as_canonical_label();
format!("{}{}", value, label)
}
fn f64_to_displayable(value: f64, unit: Option<Unit>) -> String {
let unit = match unit {
None => return value.to_string(),
Some(inner) => inner,
};
if unit.is_time_based() {
return f64_time_to_displayable(value, unit);
}
let label = unit.as_canonical_label();
format!("{}{}", value, label)
}
fn u64_time_to_displayable(value: u64, unit: Unit) -> String {
let dur = match unit {
Unit::Nanoseconds => Duration::from_nanos(value),
Unit::Microseconds => Duration::from_micros(value),
Unit::Milliseconds => Duration::from_millis(value),
Unit::Seconds => Duration::from_secs(value),
// If it's not a time-based unit, then just format the value plainly.
_ => return value.to_string(),
};
format!("{:?}", TruncatedDuration(dur))
}
fn f64_time_to_displayable(value: f64, unit: Unit) -> String {
// Calculate how much we need to scale the value by, since `Duration` only takes f64 values if
// they are at the seconds granularity, although obviously they could contain significant digits
// for subsecond precision.
let scaling_factor = match unit {
Unit::Nanoseconds => Some(1_000_000_000.0),
Unit::Microseconds => Some(1_000_000.0),
Unit::Milliseconds => Some(1_000.0),
Unit::Seconds => None,
// If it's not a time-based unit, then just format the value plainly.
_ => return value.to_string(),
};
let adjusted = match scaling_factor {
Some(factor) => value / factor,
None => value,
};
let sign = if adjusted < 0.0 { "-" } else { "" };
let normalized = adjusted.abs();
if !normalized.is_normal() && normalized.classify() != FpCategory::Zero {
// We need a normalized number, but unlike `is_normal`, `Duration` is fine with a value that
// is at zero, so we just exclude that here.
return value.to_string();
}
let dur = Duration::from_secs_f64(normalized);
format!("{}{:?}", sign, TruncatedDuration(dur))
}
struct TruncatedDuration(Duration);
impl fmt::Debug for TruncatedDuration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
/// Formats a floating point number in decimal notation.
///
/// The number is given as the `integer_part` and a fractional part.
/// The value of the fractional part is `fractional_part / divisor`. So
/// `integer_part` = 3, `fractional_part` = 12 and `divisor` = 100
/// represents the number `3.012`. Trailing zeros are omitted.
///
/// `divisor` must not be above 100_000_000. It also should be a power
/// of 10, everything else doesn't make sense. `fractional_part` has
/// to be less than `10 * divisor`!
fn fmt_decimal(
f: &mut fmt::Formatter<'_>,
mut integer_part: u64,
mut fractional_part: u32,
mut divisor: u32,
precision: usize,
) -> fmt::Result {
// Encode the fractional part into a temporary buffer. The buffer
// only need to hold 9 elements, because `fractional_part` has to
// be smaller than 10^9. The buffer is prefilled with '0' digits
// to simplify the code below.
let mut buf = [b'0'; 9];
let precision = if precision > 9 { 9 } else { precision };
// The next digit is written at this position
let mut pos = 0;
// We keep writing digits into the buffer while there are non-zero
// digits left and we haven't written enough digits yet.
while fractional_part > 0 && pos < precision {
// Write new digit into the buffer
buf[pos] = b'0' + (fractional_part / divisor) as u8;
fractional_part %= divisor;
divisor /= 10;
pos += 1;
}
// If a precision < 9 was specified, there may be some non-zero
// digits left that weren't written into the buffer. In that case we
// need to perform rounding to match the semantics of printing
// normal floating point numbers. However, we only need to do work
// when rounding up. This happens if the first digit of the
// remaining ones is >= 5.
if fractional_part > 0 && fractional_part >= divisor * 5 {
// Round up the number contained in the buffer. We go through
// the buffer backwards and keep track of the carry.
let mut rev_pos = pos;
let mut carry = true;
while carry && rev_pos > 0 {
rev_pos -= 1;
// If the digit in the buffer is not '9', we just need to
// increment it and can stop then (since we don't have a
// carry anymore). Otherwise, we set it to '0' (overflow)
// and continue.
if buf[rev_pos] < b'9' {
buf[rev_pos] += 1;
carry = false;
} else {
buf[rev_pos] = b'0';
}
}
// If we still have the carry bit set, that means that we set
// the whole buffer to '0's and need to increment the integer
// part.
if carry {
integer_part += 1;
}
}
// If we haven't emitted a single fractional digit and the precision
// wasn't set to a non-zero value, we don't print the decimal point.
if pos == 0 {
write!(f, "{}", integer_part)
} else {
// SAFETY: We are only writing ASCII digits into the buffer and it was
// initialized with '0's, so it contains valid UTF8.
let s = unsafe { std::str::from_utf8_unchecked(&buf[..pos]) };
let s = s.trim_end_matches('0');
write!(f, "{}.{}", integer_part, s)
}
}
// Print leading '+' sign if requested
if f.sign_plus() {
write!(f, "+")?;
}
let secs = self.0.as_secs();
let sub_nanos = self.0.subsec_nanos();
let nanos = self.0.as_nanos();
if secs > 0 {
fmt_decimal(f, secs, sub_nanos, 100_000_000, 3)?;
f.write_str("s")
} else if nanos >= 1_000_000 {
fmt_decimal(
f,
nanos as u64 / 1_000_000,
(nanos % 1_000_000) as u32,
100_000,
2,
)?;
f.write_str("ms")
} else if nanos >= 1_000 {
fmt_decimal(f, nanos as u64 / 1_000, (nanos % 1_000) as u32, 100, 1)?;
f.write_str("µs")
} else {
fmt_decimal(f, nanos as u64, 0, 1, 0)?;
f.write_str("ns")
}
}
}

View File

@ -1,28 +1,35 @@
use std::collections::HashMap;
use std::io::Read;
use std::net::TcpStream;
use std::time::Duration;
use std::thread;
use std::net::ToSocketAddrs;
use std::sync::{Arc, Mutex, RwLock};
use std::io::Read;
use std::thread;
use std::time::Duration;
use bytes::{BufMut, BytesMut};
use prost::Message;
use hdrhistogram::Histogram;
use prost::Message;
use metrics::{Label, KeyData};
use metrics::{KeyData, Label, Unit};
use metrics_util::{CompositeKey, MetricKind};
mod proto {
include!(concat!(env!("OUT_DIR"), "/event.proto.rs"));
}
use self::proto::{
event::Event,
metadata::{Description as DescriptionMetadata, MetricType, Unit as UnitMetadata},
Event as EventWrapper,
};
#[derive(Clone)]
pub enum ClientState {
Disconnected(Option<String>),
Connected,
}
#[derive(Clone)]
pub enum MetricData {
Counter(u64),
Gauge(f64),
@ -32,6 +39,7 @@ pub enum MetricData {
pub struct Client {
state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<HashMap<CompositeKey, MetricData>>>,
metadata: Arc<RwLock<HashMap<(MetricKind, String), (Option<Unit>, Option<String>)>>>,
handle: thread::JoinHandle<()>,
}
@ -39,11 +47,13 @@ impl Client {
pub fn new(addr: String) -> Client {
let state = Arc::new(Mutex::new(ClientState::Disconnected(None)));
let metrics = Arc::new(RwLock::new(HashMap::new()));
let metadata = Arc::new(RwLock::new(HashMap::new()));
let handle = {
let state = state.clone();
let metrics = metrics.clone();
let metadata = metadata.clone();
thread::spawn(move || {
let mut runner = Runner::new(addr, state, metrics);
let mut runner = Runner::new(addr, state, metrics, metadata);
runner.run();
})
};
@ -51,6 +61,7 @@ impl Client {
Client {
state,
metrics,
metadata,
handle,
}
}
@ -59,12 +70,22 @@ impl Client {
self.state.lock().unwrap().clone()
}
pub fn with_metrics<F, T>(&self, f: F) -> T
where
F: FnOnce(&HashMap<CompositeKey, MetricData>) -> T,
{
let handle = self.metrics.read().unwrap();
f(&handle)
pub fn get_metrics(&self) -> Vec<(CompositeKey, MetricData, Option<Unit>, Option<String>)> {
let metrics = self.metrics.read().unwrap();
let metadata = self.metadata.read().unwrap();
metrics
.iter()
.map(|(k, v)| {
let metakey = (k.kind(), k.key().name().to_string());
let (unit, desc) = match metadata.get(&metakey) {
Some((unit, desc)) => (unit.clone(), desc.clone()),
None => (None, None),
};
(k.clone(), v.clone(), unit, desc)
})
.collect()
}
}
@ -79,6 +100,7 @@ struct Runner {
addr: String,
client_state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<HashMap<CompositeKey, MetricData>>>,
metadata: Arc<RwLock<HashMap<(MetricKind, String), (Option<Unit>, Option<String>)>>>,
}
impl Runner {
@ -86,12 +108,14 @@ impl Runner {
addr: String,
state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<HashMap<CompositeKey, MetricData>>>,
metadata: Arc<RwLock<HashMap<(MetricKind, String), (Option<Unit>, Option<String>)>>>,
) -> Runner {
Runner {
state: RunnerState::Disconnected,
addr,
client_state: state,
metrics,
metadata,
}
}
@ -104,38 +128,47 @@ impl Runner {
let mut state = self.client_state.lock().unwrap();
*state = ClientState::Disconnected(None);
}
// Try to connect to our target and transition into Connected.
let addr = match self.addr.to_socket_addrs() {
Ok(mut addrs) => match addrs.next() {
Some(addr) => addr,
None => {
let mut state = self.client_state.lock().unwrap();
*state = ClientState::Disconnected(Some("failed to resolve specified host".to_string()));
*state = ClientState::Disconnected(Some(
"failed to resolve specified host".to_string(),
));
break;
}
}
},
Err(_) => {
let mut state = self.client_state.lock().unwrap();
*state = ClientState::Disconnected(Some("failed to resolve specified host".to_string()));
*state = ClientState::Disconnected(Some(
"failed to resolve specified host".to_string(),
));
break;
}
};
match TcpStream::connect_timeout(&addr, Duration::from_secs(3)) {
Ok(stream) => RunnerState::Connected(stream),
Err(_) => {
RunnerState::ErrorBackoff("error while connecting", Duration::from_secs(3))
}
Err(_) => RunnerState::ErrorBackoff(
"error while connecting",
Duration::from_secs(3),
),
}
},
}
RunnerState::ErrorBackoff(msg, dur) => {
{
let mut state = self.client_state.lock().unwrap();
*state = ClientState::Disconnected(Some(format!("{}, retrying in {} seconds...", msg, dur.as_secs())));
*state = ClientState::Disconnected(Some(format!(
"{}, retrying in {} seconds...",
msg,
dur.as_secs()
)));
}
thread::sleep(dur);
RunnerState::Disconnected
},
}
RunnerState::Connected(ref mut stream) => {
{
let mut state = self.client_state.lock().unwrap();
@ -144,53 +177,108 @@ impl Runner {
let mut buf = BytesMut::new();
let mut rbuf = [0u8; 1024];
loop {
match stream.read(&mut rbuf[..]) {
Ok(0) => break,
Ok(n) => buf.put_slice(&rbuf[..n]),
Err(e) => eprintln!("read error: {:?}", e),
};
match proto::Metric::decode_length_delimited(&mut buf) {
Err(e) => eprintln!("decode error: {:?}", e),
Ok(msg) => {
let mut labels_raw = msg.labels.into_iter().collect::<Vec<_>>();
labels_raw.sort_by(|a, b| a.0.cmp(&b.0));
let labels = labels_raw.into_iter().map(|(k, v)| Label::new(k, v)).collect::<Vec<_>>();
let key_data: KeyData = (msg.name, labels).into();
match msg.value.expect("no metric value") {
proto::metric::Value::Counter(value) => {
let key = CompositeKey::new(MetricKind::Counter, key_data.into());
let mut metrics = self.metrics.write().unwrap();
let counter = metrics.entry(key).or_insert_with(|| MetricData::Counter(0));
if let MetricData::Counter(inner) = counter {
*inner += value.value;
}
},
proto::metric::Value::Gauge(value) => {
let key = CompositeKey::new(MetricKind::Gauge, key_data.into());
let mut metrics = self.metrics.write().unwrap();
let gauge = metrics.entry(key).or_insert_with(|| MetricData::Gauge(0.0));
if let MetricData::Gauge(inner) = gauge {
*inner = value.value;
}
},
proto::metric::Value::Histogram(value) => {
let key = CompositeKey::new(MetricKind::Histogram, key_data.into());
let mut metrics = self.metrics.write().unwrap();
let histogram = metrics.entry(key).or_insert_with(|| {
let histogram = Histogram::new(3).expect("failed to create histogram");
MetricData::Histogram(histogram)
});
let event = match EventWrapper::decode_length_delimited(&mut buf) {
Err(e) => {
eprintln!("decode error: {:?}", e);
continue;
}
Ok(event) => event,
};
if let MetricData::Histogram(inner) = histogram {
inner.record(value.value).expect("failed to record value to histogram");
}
},
if let Some(event) = event.event {
match event {
Event::Metadata(metadata) => {
let metric_type = MetricType::from_i32(metadata.metric_type)
.expect("unknown metric type over wire");
let metric_type = match metric_type {
MetricType::Counter => MetricKind::Counter,
MetricType::Gauge => MetricKind::Gauge,
MetricType::Histogram => MetricKind::Histogram,
};
let key = (metric_type, metadata.name);
let mut mmap = self
.metadata
.write()
.expect("failed to get metadata write lock");
let entry = mmap.entry(key).or_insert((None, None));
let (uentry, dentry) = entry;
*uentry = metadata
.unit
.map(|u| match u {
UnitMetadata::UnitValue(us) => us,
})
.and_then(|s| Unit::from_str(s.as_str()));
*dentry = metadata.description.map(|d| match d {
DescriptionMetadata::DescriptionValue(ds) => ds,
});
}
},
Event::Metric(metric) => {
let mut labels_raw =
metric.labels.into_iter().collect::<Vec<_>>();
labels_raw.sort_by(|a, b| a.0.cmp(&b.0));
let labels = labels_raw
.into_iter()
.map(|(k, v)| Label::new(k, v))
.collect::<Vec<_>>();
let key_data: KeyData = (metric.name, labels).into();
match metric.value.expect("no metric value") {
proto::metric::Value::Counter(value) => {
let key = CompositeKey::new(
MetricKind::Counter,
key_data.into(),
);
let mut metrics = self.metrics.write().unwrap();
let counter = metrics
.entry(key)
.or_insert_with(|| MetricData::Counter(0));
if let MetricData::Counter(inner) = counter {
*inner += value.value;
}
}
proto::metric::Value::Gauge(value) => {
let key = CompositeKey::new(
MetricKind::Gauge,
key_data.into(),
);
let mut metrics = self.metrics.write().unwrap();
let gauge = metrics
.entry(key)
.or_insert_with(|| MetricData::Gauge(0.0));
if let MetricData::Gauge(inner) = gauge {
*inner = value.value;
}
}
proto::metric::Value::Histogram(value) => {
let key = CompositeKey::new(
MetricKind::Histogram,
key_data.into(),
);
let mut metrics = self.metrics.write().unwrap();
let histogram =
metrics.entry(key).or_insert_with(|| {
let histogram = Histogram::new(3)
.expect("failed to create histogram");
MetricData::Histogram(histogram)
});
if let MetricData::Histogram(inner) = histogram {
inner
.record(value.value)
.expect("failed to record value to histogram");
}
}
}
}
}
}
}
@ -200,4 +288,4 @@ impl Runner {
self.state = next;
}
}
}
}

View File

@ -55,4 +55,4 @@ impl Selector {
};
self.1.select(Some(i));
}
}
}

View File

@ -55,7 +55,7 @@
#![deny(missing_docs)]
use metrics::{Key, KeyData, Label, Recorder};
use metrics::{Key, KeyData, Label, Recorder, Unit};
use metrics_util::layers::Layer;
use tracing::Span;
@ -136,16 +136,16 @@ where
R: Recorder,
F: LabelFilter,
{
fn register_counter(&self, key: Key, description: Option<&'static str>) {
self.inner.register_counter(key, description)
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.inner.register_counter(key, unit, description)
}
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
self.inner.register_gauge(key, description)
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.inner.register_gauge(key, unit, description)
}
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
self.inner.register_histogram(key, description)
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.inner.register_histogram(key, unit, description)
}
fn increment_counter(&self, key: Key, value: u64) {

View File

@ -1,5 +1,3 @@
use std::collections::HashSet;
use metrics::{counter, KeyData, Label};
use metrics_tracing_context::{LabelFilter, MetricsLayer, TracingContextLayer};
use metrics_util::{layers::Layer, DebugValue, DebuggingRecorder, MetricKind, Snapshotter};
@ -63,6 +61,8 @@ fn test_basic_functionality() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
)]
)
@ -89,7 +89,6 @@ fn test_macro_forms() {
"service" => "login_service", "node_name" => node_name.clone());
let snapshot = snapshotter.snapshot();
let snapshot: HashSet<_> = snapshot.into_iter().collect();
assert_eq!(
snapshot,
@ -104,6 +103,8 @@ fn test_macro_forms() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
),
(
@ -117,6 +118,8 @@ fn test_macro_forms() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
),
(
@ -130,6 +133,8 @@ fn test_macro_forms() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
),
(
@ -144,11 +149,11 @@ fn test_macro_forms() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
),
]
.into_iter()
.collect()
)
}
@ -168,6 +173,8 @@ fn test_no_labels() {
vec![(
MetricKind::Counter,
KeyData::from_name("login_attempts").into(),
None,
None,
DebugValue::Counter(1),
)]
)
@ -211,7 +218,6 @@ fn test_multiple_paths_to_the_same_callsite() {
path2();
let snapshot = snapshotter.snapshot();
let snapshot: HashSet<_> = snapshot.into_iter().collect();
assert_eq!(
snapshot,
@ -227,6 +233,8 @@ fn test_multiple_paths_to_the_same_callsite() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
),
(
@ -240,11 +248,11 @@ fn test_multiple_paths_to_the_same_callsite() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
)
]
.into_iter()
.collect()
)
}
@ -282,7 +290,6 @@ fn test_nested_spans() {
outer();
let snapshot = snapshotter.snapshot();
let snapshot: HashSet<_> = snapshot.into_iter().collect();
assert_eq!(
snapshot,
@ -300,10 +307,10 @@ fn test_nested_spans() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
),]
.into_iter()
.collect()
)]
)
}
@ -341,6 +348,8 @@ fn test_label_filtering() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
)]
)

View File

@ -32,22 +32,27 @@ harness = false
[dependencies]
metrics = { version = "0.13.0-alpha.1", path = "../metrics", features = ["std"] }
crossbeam-epoch = "0.9"
crossbeam-utils = "0.8"
serde = "1.0"
arc-swap = "0.4"
atomic-shim = "0.1"
parking_lot = "0.11"
crossbeam-epoch = { version = "0.9", optional = true }
crossbeam-utils = { version = "0.8", default-features = false }
arc-swap = { version = "0.4", optional = true }
atomic-shim = { version = "0.1", optional = true }
aho-corasick = { version = "0.7", optional = true }
dashmap = "3"
dashmap = { version = "3", optional = true }
indexmap = { version = "1.6", optional = true }
[dev-dependencies]
bolero = "0.5"
criterion = "0.3"
lazy_static = "1.3"
rand = { version = "0.7", features = ["small_rng"] }
rand_distr = "0.3"
[features]
default = ["std", "layer-filter"]
std = []
default = ["std"]
std = ["arc-swap", "atomic-shim", "crossbeam-epoch", "dashmap", "indexmap"]
layer-filter = ["aho-corasick"]
[[test]]
name = "streaming"
path = "tests/streaming/fuzz_target.rs"
harness = false

View File

@ -112,7 +112,7 @@ pub struct AtomicBucket<T> {
impl<T> AtomicBucket<T> {
/// Creates a new, empty bucket.
pub const fn new() -> Self {
pub fn new() -> Self {
AtomicBucket {
tail: Atomic::null(),
}

View File

@ -1,8 +1,11 @@
use std::{hash::Hash, hash::Hasher, sync::Arc};
use core::hash::{Hash, Hasher};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use crate::{handle::Handle, registry::Registry};
use metrics::{Key, Recorder};
use indexmap::IndexMap;
use metrics::{Key, Recorder, Unit};
/// Metric kinds.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy, Ord, PartialOrd)]
@ -60,23 +63,52 @@ impl Hash for DebugValue {
/// Captures point-in-time snapshots of `DebuggingRecorder`.
pub struct Snapshotter {
registry: Arc<Registry<DifferentiatedKey, Handle>>,
metrics: Arc<Mutex<IndexMap<DifferentiatedKey, ()>>>,
units: Arc<Mutex<HashMap<DifferentiatedKey, Unit>>>,
descriptions: Arc<Mutex<HashMap<DifferentiatedKey, &'static str>>>,
}
impl Snapshotter {
/// Takes a snapshot of the recorder.
pub fn snapshot(&self) -> Vec<(MetricKind, Key, DebugValue)> {
let mut metrics = Vec::new();
pub fn snapshot(
&self,
) -> Vec<(
MetricKind,
Key,
Option<Unit>,
Option<&'static str>,
DebugValue,
)> {
let mut snapshot = Vec::new();
let handles = self.registry.get_handles();
for (dkey, handle) in handles {
let (kind, key) = dkey.into_parts();
let value = match kind {
MetricKind::Counter => DebugValue::Counter(handle.read_counter()),
MetricKind::Gauge => DebugValue::Gauge(handle.read_gauge()),
MetricKind::Histogram => DebugValue::Histogram(handle.read_histogram()),
};
metrics.push((kind, key, value));
let metrics = {
let metrics = self.metrics.lock().expect("metrics lock poisoned");
metrics.clone()
};
for (dkey, _) in metrics.into_iter() {
if let Some(handle) = handles.get(&dkey) {
let unit = self
.units
.lock()
.expect("units lock poisoned")
.get(&dkey)
.cloned();
let description = self
.descriptions
.lock()
.expect("descriptions lock poisoned")
.get(&dkey)
.cloned();
let (kind, key) = dkey.into_parts();
let value = match kind {
MetricKind::Counter => DebugValue::Counter(handle.read_counter()),
MetricKind::Gauge => DebugValue::Gauge(handle.read_gauge()),
MetricKind::Histogram => DebugValue::Histogram(handle.read_histogram()),
};
snapshot.push((kind, key, unit, description, value));
}
}
metrics
snapshot
}
}
@ -86,6 +118,9 @@ impl Snapshotter {
/// to the raw values.
pub struct DebuggingRecorder {
registry: Arc<Registry<DifferentiatedKey, Handle>>,
metrics: Arc<Mutex<IndexMap<DifferentiatedKey, ()>>>,
units: Arc<Mutex<HashMap<DifferentiatedKey, Unit>>>,
descriptions: Arc<Mutex<HashMap<DifferentiatedKey, &'static str>>>,
}
impl DebuggingRecorder {
@ -93,6 +128,9 @@ impl DebuggingRecorder {
pub fn new() -> DebuggingRecorder {
DebuggingRecorder {
registry: Arc::new(Registry::new()),
metrics: Arc::new(Mutex::new(IndexMap::new())),
units: Arc::new(Mutex::new(HashMap::new())),
descriptions: Arc::new(Mutex::new(HashMap::new())),
}
}
@ -100,6 +138,32 @@ impl DebuggingRecorder {
pub fn snapshotter(&self) -> Snapshotter {
Snapshotter {
registry: self.registry.clone(),
metrics: self.metrics.clone(),
units: self.units.clone(),
descriptions: self.descriptions.clone(),
}
}
fn register_metric(&self, rkey: DifferentiatedKey) {
let mut metrics = self.metrics.lock().expect("metrics lock poisoned");
let _ = metrics.insert(rkey.clone(), ());
}
fn insert_unit_description(
&self,
rkey: DifferentiatedKey,
unit: Option<Unit>,
description: Option<&'static str>,
) {
if let Some(unit) = unit {
let mut units = self.units.lock().expect("units lock poisoned");
let uentry = units.entry(rkey.clone()).or_insert_with(|| unit.clone());
*uentry = unit;
}
if let Some(description) = description {
let mut descriptions = self.descriptions.lock().expect("description lock poisoned");
let dentry = descriptions.entry(rkey).or_insert_with(|| description);
*dentry = description;
}
}
@ -110,23 +174,30 @@ impl DebuggingRecorder {
}
impl Recorder for DebuggingRecorder {
fn register_counter(&self, key: Key, _description: Option<&'static str>) {
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
let rkey = DifferentiatedKey(MetricKind::Counter, key);
self.register_metric(rkey.clone());
self.insert_unit_description(rkey.clone(), unit, description);
self.registry.op(rkey, |_| {}, || Handle::counter())
}
fn register_gauge(&self, key: Key, _description: Option<&'static str>) {
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
let rkey = DifferentiatedKey(MetricKind::Gauge, key);
self.register_metric(rkey.clone());
self.insert_unit_description(rkey.clone(), unit, description);
self.registry.op(rkey, |_| {}, || Handle::gauge())
}
fn register_histogram(&self, key: Key, _description: Option<&'static str>) {
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
let rkey = DifferentiatedKey(MetricKind::Histogram, key);
self.register_metric(rkey.clone());
self.insert_unit_description(rkey.clone(), unit, description);
self.registry.op(rkey, |_| {}, || Handle::histogram())
}
fn increment_counter(&self, key: Key, value: u64) {
let rkey = DifferentiatedKey(MetricKind::Counter, key);
self.register_metric(rkey.clone());
self.registry.op(
rkey,
|handle| handle.increment_counter(value),
@ -136,6 +207,7 @@ impl Recorder for DebuggingRecorder {
fn update_gauge(&self, key: Key, value: f64) {
let rkey = DifferentiatedKey(MetricKind::Gauge, key);
self.register_metric(rkey.clone());
self.registry.op(
rkey,
|handle| handle.update_gauge(value),
@ -145,6 +217,7 @@ impl Recorder for DebuggingRecorder {
fn record_histogram(&self, key: Key, value: u64) {
let rkey = DifferentiatedKey(MetricKind::Histogram, key);
self.register_metric(rkey.clone());
self.registry.op(
rkey,
|handle| handle.record_histogram(value),

View File

@ -1,4 +1,4 @@
use metrics::{Key, Recorder};
use metrics::{Key, Recorder, Unit};
/// Fans out metrics to multiple recorders.
pub struct Fanout {
@ -6,21 +6,21 @@ pub struct Fanout {
}
impl Recorder for Fanout {
fn register_counter(&self, key: Key, description: Option<&'static str>) {
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
for recorder in &self.recorders {
recorder.register_counter(key.clone(), description);
recorder.register_counter(key.clone(), unit.clone(), description);
}
}
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
for recorder in &self.recorders {
recorder.register_gauge(key.clone(), description);
recorder.register_gauge(key.clone(), unit.clone(), description);
}
}
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
for recorder in &self.recorders {
recorder.register_histogram(key.clone(), description);
recorder.register_histogram(key.clone(), unit.clone(), description);
}
}
@ -73,7 +73,7 @@ impl FanoutBuilder {
mod tests {
use super::FanoutBuilder;
use crate::debugging::DebuggingRecorder;
use metrics::{Key, Recorder};
use metrics::{Key, Recorder, Unit};
#[test]
fn test_basic_functionality() {
@ -91,8 +91,18 @@ mod tests {
assert_eq!(before1.len(), 0);
assert_eq!(before2.len(), 0);
fanout.register_counter(Key::Owned("tokio.loops".into()), None);
fanout.register_gauge(Key::Owned("hyper.sent_bytes".into()), None);
let ud = &[(Unit::Count, "counter desc"), (Unit::Bytes, "gauge desc")];
fanout.register_counter(
Key::Owned("tokio.loops".into()),
Some(ud[0].0.clone()),
Some(ud[0].1),
);
fanout.register_gauge(
Key::Owned("hyper.sent_bytes".into()),
Some(ud[1].0.clone()),
Some(ud[1].1),
);
fanout.increment_counter(Key::Owned("tokio.loops".into()), 47);
fanout.update_gauge(Key::Owned("hyper.sent_bytes".into()), 12.0);
@ -101,11 +111,21 @@ mod tests {
assert_eq!(after1.len(), 2);
assert_eq!(after2.len(), 2);
let after = after1.into_iter().zip(after2).collect::<Vec<_>>();
let after = after1
.into_iter()
.zip(after2)
.enumerate()
.collect::<Vec<_>>();
for ((_, k1, v1), (_, k2, v2)) in after {
for (i, ((_, k1, u1, d1, v1), (_, k2, u2, d2, v2))) in after {
assert_eq!(k1, k2);
assert_eq!(u1, u2);
assert_eq!(d1, d2);
assert_eq!(v1, v2);
assert_eq!(Some(ud[i].0.clone()), u1);
assert_eq!(Some(ud[i].0.clone()), u2);
assert_eq!(Some(ud[i].1), d1);
assert_eq!(Some(ud[i].1), d2);
}
}
}

View File

@ -1,6 +1,6 @@
use crate::layers::Layer;
use aho_corasick::{AhoCorasick, AhoCorasickBuilder};
use metrics::{Key, Recorder};
use metrics::{Key, Recorder, Unit};
/// Filters and discards metrics matching certain name patterns.
///
@ -18,25 +18,25 @@ impl<R> Filter<R> {
}
impl<R: Recorder> Recorder for Filter<R> {
fn register_counter(&self, key: Key, description: Option<&'static str>) {
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
if self.should_filter(&key) {
return;
}
self.inner.register_counter(key, description)
self.inner.register_counter(key, unit, description)
}
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
if self.should_filter(&key) {
return;
}
self.inner.register_gauge(key, description)
self.inner.register_gauge(key, unit, description)
}
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
if self.should_filter(&key) {
return;
}
self.inner.register_histogram(key, description)
self.inner.register_histogram(key, unit, description)
}
fn increment_counter(&self, key: Key, value: u64) {
@ -135,7 +135,7 @@ mod tests {
use super::FilterLayer;
use crate::debugging::DebuggingRecorder;
use crate::layers::Layer;
use metrics::{Key, Recorder};
use metrics::{Key, Recorder, Unit};
#[test]
fn test_basic_functionality() {
@ -148,17 +148,49 @@ mod tests {
let before = snapshotter.snapshot();
assert_eq!(before.len(), 0);
layered.register_counter(Key::Owned("tokio.loops".into()), None);
layered.register_gauge(Key::Owned("hyper.sent_bytes".into()), None);
layered.register_histogram(Key::Owned("hyper.recv_bytes".into()), None);
layered.register_counter(Key::Owned("bb8.conns".into()), None);
layered.register_gauge(Key::Owned("hyper.tokio.sent_bytes".into()), None);
let ud = &[
(Unit::Count, "counter desc"),
(Unit::Bytes, "gauge desc"),
(Unit::Bytes, "histogram desc"),
(Unit::Count, "counter desc"),
(Unit::Bytes, "gauge desc"),
];
layered.register_counter(
Key::Owned("tokio.loops".into()),
Some(ud[0].0.clone()),
Some(ud[0].1),
);
layered.register_gauge(
Key::Owned("hyper.sent_bytes".into()),
Some(ud[1].0.clone()),
Some(ud[1].1),
);
layered.register_histogram(
Key::Owned("hyper.tokio.sent_bytes".into()),
Some(ud[2].0.clone()),
Some(ud[2].1),
);
layered.register_counter(
Key::Owned("bb8.conns".into()),
Some(ud[3].0.clone()),
Some(ud[3].1),
);
layered.register_gauge(
Key::Owned("hyper.recv_bytes".into()),
Some(ud[4].0.clone()),
Some(ud[4].1),
);
let after = snapshotter.snapshot();
assert_eq!(after.len(), 2);
for (_kind, key, _value) in &after {
for (_kind, key, unit, desc, _value) in after {
assert!(!key.name().contains("tokio") && !key.name().contains("bb8"));
// We cheat here since we're not comparing one-to-one with the source data,
// but we know which metrics are going to make it through so we can hard code.
assert_eq!(Some(Unit::Bytes), unit);
assert!(!desc.unwrap().is_empty() && desc.unwrap() == "gauge desc");
}
}
@ -174,16 +206,16 @@ mod tests {
let before = snapshotter.snapshot();
assert_eq!(before.len(), 0);
layered.register_counter(Key::Owned("tokiO.loops".into()), None);
layered.register_gauge(Key::Owned("hyper.sent_bytes".into()), None);
layered.register_histogram(Key::Owned("hyper.recv_bytes".into()), None);
layered.register_counter(Key::Owned("bb8.conns".into()), None);
layered.register_counter(Key::Owned("Bb8.conns_closed".into()), None);
layered.register_counter(Key::Owned("tokiO.loops".into()), None, None);
layered.register_gauge(Key::Owned("hyper.sent_bytes".into()), None, None);
layered.register_histogram(Key::Owned("hyper.recv_bytes".into()), None, None);
layered.register_counter(Key::Owned("bb8.conns".into()), None, None);
layered.register_counter(Key::Owned("Bb8.conns_closed".into()), None, None);
let after = snapshotter.snapshot();
assert_eq!(after.len(), 2);
for (_kind, key, _value) in &after {
for (_kind, key, _unit, _desc, _value) in &after {
assert!(
!key.name().to_lowercase().contains("tokio")
&& !key.name().to_lowercase().contains("bb8")

View File

@ -8,7 +8,7 @@
//! Here's an example of a layer that filters out all metrics that start with a specific string:
//!
//! ```rust
//! # use metrics::{Key, Recorder};
//! # use metrics::{Key, Recorder, Unit};
//! # use metrics_util::DebuggingRecorder;
//! # use metrics_util::layers::{Layer, Stack, PrefixLayer};
//! // A simple layer that denies any metrics that have "stairway" or "heaven" in their name.
@ -22,25 +22,25 @@
//! }
//!
//! impl<R: Recorder> Recorder for StairwayDeny<R> {
//! fn register_counter(&self, key: Key, description: Option<&'static str>) {
//! fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
//! if self.is_invalid_key(&key) {
//! return;
//! }
//! self.0.register_counter(key, description)
//! self.0.register_counter(key, unit, description)
//! }
//!
//! fn register_gauge(&self, key: Key, description: Option<&'static str>) {
//! fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
//! if self.is_invalid_key(&key) {
//! return;
//! }
//! self.0.register_gauge(key, description)
//! self.0.register_gauge(key, unit, description)
//! }
//!
//! fn register_histogram(&self, key: Key, description: Option<&'static str>) {
//! fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
//! if self.is_invalid_key(&key) {
//! return;
//! }
//! self.0.register_histogram(key, description)
//! self.0.register_histogram(key, unit, description)
//! }
//!
//! fn increment_counter(&self, key: Key, value: u64) {
@ -102,7 +102,7 @@
//! .expect("failed to install stack");
//! # }
//! ```
use metrics::{Key, Recorder};
use metrics::{Key, Recorder, Unit};
#[cfg(feature = "std")]
use metrics::SetRecorderError;
@ -155,16 +155,16 @@ impl<R: Recorder + 'static> Stack<R> {
}
impl<R: Recorder> Recorder for Stack<R> {
fn register_counter(&self, key: Key, description: Option<&'static str>) {
self.inner.register_counter(key, description)
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.inner.register_counter(key, unit, description);
}
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
self.inner.register_gauge(key, description)
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.inner.register_gauge(key, unit, description);
}
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
self.inner.register_histogram(key, description)
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.inner.register_histogram(key, unit, description);
}
fn increment_counter(&self, key: Key, value: u64) {

View File

@ -1,5 +1,5 @@
use crate::layers::Layer;
use metrics::{Key, Recorder};
use metrics::{Key, Recorder, Unit};
/// Applies a prefix to every metric key.
///
@ -18,19 +18,19 @@ impl<R> Prefix<R> {
}
impl<R: Recorder> Recorder for Prefix<R> {
fn register_counter(&self, key: Key, description: Option<&'static str>) {
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
let new_key = self.prefix_key(key);
self.inner.register_counter(new_key, description)
self.inner.register_counter(new_key, unit, description)
}
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
let new_key = self.prefix_key(key);
self.inner.register_gauge(new_key, description)
self.inner.register_gauge(new_key, unit, description)
}
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
let new_key = self.prefix_key(key);
self.inner.register_histogram(new_key, description)
self.inner.register_histogram(new_key, unit, description)
}
fn increment_counter(&self, key: Key, value: u64) {
@ -77,7 +77,7 @@ mod tests {
use super::PrefixLayer;
use crate::debugging::DebuggingRecorder;
use crate::layers::Layer;
use metrics::{KeyData, Recorder};
use metrics::{KeyData, Recorder, Unit};
#[test]
fn test_basic_functionality() {
@ -89,15 +89,35 @@ mod tests {
let before = snapshotter.snapshot();
assert_eq!(before.len(), 0);
layered.register_counter(KeyData::from_name("counter_metric").into(), None);
layered.register_gauge(KeyData::from_name("gauge_metric").into(), None);
layered.register_histogram(KeyData::from_name("histogram_metric").into(), None);
let ud = &[
(Unit::Nanoseconds, "counter desc"),
(Unit::Microseconds, "gauge desc"),
(Unit::Milliseconds, "histogram desc"),
];
layered.register_counter(
KeyData::from_name("counter_metric").into(),
Some(ud[0].0.clone()),
Some(ud[0].1),
);
layered.register_gauge(
KeyData::from_name("gauge_metric").into(),
Some(ud[1].0.clone()),
Some(ud[1].1),
);
layered.register_histogram(
KeyData::from_name("histogram_metric").into(),
Some(ud[2].0.clone()),
Some(ud[2].1),
);
let after = snapshotter.snapshot();
assert_eq!(after.len(), 3);
for (_kind, key, _value) in &after {
for (i, (_kind, key, unit, desc, _value)) in after.iter().enumerate() {
assert!(key.name().starts_with("testing"));
assert_eq!(&Some(ud[i].0.clone()), unit);
assert_eq!(&Some(ud[i].1), desc);
}
}
}

View File

@ -1,24 +1,33 @@
//! Helper types and functions used within the metrics ecosystem.
#![deny(missing_docs)]
#![cfg_attr(not(feature = "std"), no_std)]
#[cfg(feature = "std")]
mod bucket;
#[cfg(feature = "std")]
pub use bucket::AtomicBucket;
#[cfg(feature = "std")]
mod debugging;
#[cfg(feature = "std")]
pub use debugging::{DebugValue, DebuggingRecorder, MetricKind, Snapshotter};
#[cfg(feature = "std")]
mod handle;
#[cfg(feature = "std")]
pub use handle::Handle;
#[cfg(feature = "std")]
mod streaming;
#[cfg(feature = "std")]
pub use streaming::StreamingIntegers;
mod quantile;
pub use quantile::{parse_quantiles, Quantile};
mod tree;
pub use tree::{Integer, MetricsTree};
#[cfg(feature = "std")]
mod registry;
#[cfg(feature = "std")]
pub use registry::Registry;
mod key;

View File

@ -1,6 +1,6 @@
use core::hash::Hash;
use dashmap::DashMap;
use std::collections::HashMap;
use std::hash::Hash;
/// A high-performance metric registry.
///

View File

@ -1,4 +1,4 @@
use std::slice;
use core::slice;
/// A compressed set of integers.
///
@ -50,7 +50,7 @@ use std::slice;
pub struct StreamingIntegers {
inner: Vec<u8>,
len: usize,
last: Option<i64>,
last: Option<i128>,
}
impl StreamingIntegers {
@ -99,7 +99,7 @@ impl StreamingIntegers {
// a delta value.
let mut src_idx = 0;
if self.last.is_none() {
let first = src[src_idx] as i64;
let first = src[src_idx] as i128;
self.last = Some(first);
let zigzag = zigzag_encode(first);
@ -112,7 +112,8 @@ impl StreamingIntegers {
let mut last = self.last.unwrap();
while src_idx < src_len {
let value = src[src_idx] as i64;
let value = src[src_idx] as i128;
// attempted to subtract with overflow
let diff = value - last;
let zigzag = zigzag_encode(diff);
buf_idx = vbyte_encode(zigzag, &mut buf, buf_idx);
@ -194,17 +195,17 @@ impl StreamingIntegers {
}
#[inline]
fn zigzag_encode(input: i64) -> u64 {
((input << 1) ^ (input >> 63)) as u64
fn zigzag_encode(input: i128) -> u128 {
((input << 1) ^ (input >> 127)) as u128
}
#[inline]
fn zigzag_decode(input: u64) -> i64 {
((input >> 1) as i64) ^ (-((input & 1) as i64))
fn zigzag_decode(input: u128) -> i128 {
((input >> 1) as i128) ^ (-((input & 1) as i128))
}
#[inline]
fn vbyte_encode(mut input: u64, buf: &mut [u8], mut buf_idx: usize) -> usize {
fn vbyte_encode(mut input: u128, buf: &mut [u8], mut buf_idx: usize) -> usize {
while input >= 128 {
buf[buf_idx] = 0x80 as u8 | (input as u8 & 0x7F);
buf_idx += 1;
@ -215,11 +216,11 @@ fn vbyte_encode(mut input: u64, buf: &mut [u8], mut buf_idx: usize) -> usize {
}
#[inline]
fn vbyte_decode(buf: &[u8], mut buf_idx: usize) -> (u64, usize) {
fn vbyte_decode(buf: &[u8], mut buf_idx: usize) -> (u128, usize) {
let mut tmp = 0;
let mut factor = 0;
loop {
tmp |= u64::from(buf[buf_idx] & 0x7F) << (7 * factor);
tmp |= u128::from(buf[buf_idx] & 0x7F) << (7 * factor);
if buf[buf_idx] & 0x80 != 0x80 {
return (tmp, buf_idx + 1);
}
@ -240,6 +241,19 @@ mod tests {
assert_eq!(decompressed.len(), 0);
}
#[test]
fn test_streaming_integers_edge_cases() {
let mut si = StreamingIntegers::new();
let decompressed = si.decompress();
assert_eq!(decompressed.len(), 0);
let values = vec![140754668284938, 9223372079804448768];
si.compress(&values);
let decompressed = si.decompress();
assert_eq!(decompressed, values);
}
#[test]
fn test_streaming_integers_single_block() {
let mut si = StreamingIntegers::new();

View File

@ -1,122 +0,0 @@
use serde::ser::{Serialize, Serializer};
use std::collections::HashMap;
/// An integer metric value.
pub enum Integer {
/// A signed value.
Signed(i64),
/// An unsigned value.
Unsigned(u64),
}
impl From<i64> for Integer {
fn from(i: i64) -> Integer {
Integer::Signed(i)
}
}
impl From<u64> for Integer {
fn from(i: u64) -> Integer {
Integer::Unsigned(i)
}
}
enum TreeEntry {
Value(Integer),
Nested(MetricsTree),
}
impl Serialize for TreeEntry {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self {
TreeEntry::Value(value) => match value {
Integer::Signed(i) => serializer.serialize_i64(*i),
Integer::Unsigned(i) => serializer.serialize_u64(*i),
},
TreeEntry::Nested(tree) => tree.serialize(serializer),
}
}
}
/// A tree-structured metrics container.
///
/// Used for building a tree structure out of scoped metrics, where each level in the tree
/// represents a nested scope.
#[derive(Default)]
pub struct MetricsTree {
contents: HashMap<String, TreeEntry>,
}
impl MetricsTree {
/// Inserts a single value into the tree.
pub fn insert_value<V: Into<Integer>>(
&mut self,
mut levels: Vec<String>,
key: String,
value: V,
) {
match levels.len() {
0 => {
self.contents.insert(key, TreeEntry::Value(value.into()));
}
_ => {
let name = levels.remove(0);
let inner = self
.contents
.entry(name)
.or_insert_with(|| TreeEntry::Nested(MetricsTree::default()));
if let TreeEntry::Nested(tree) = inner {
tree.insert_value(levels, key, value);
}
}
}
}
/// Inserts multiple values into the tree.
pub fn insert_values<V: Into<Integer>>(
&mut self,
mut levels: Vec<String>,
values: Vec<(String, V)>,
) {
match levels.len() {
0 => {
for v in values.into_iter() {
self.contents.insert(v.0, TreeEntry::Value(v.1.into()));
}
}
_ => {
let name = levels.remove(0);
let inner = self
.contents
.entry(name)
.or_insert_with(|| TreeEntry::Nested(MetricsTree::default()));
if let TreeEntry::Nested(tree) = inner {
tree.insert_values(levels, values);
}
}
}
}
/// Clears all entries in the tree.
pub fn clear(&mut self) {
self.contents.clear();
}
}
impl Serialize for MetricsTree {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut sorted = self.contents.iter().collect::<Vec<_>>();
sorted.sort_by_key(|p| p.0);
serializer.collect_map(sorted)
}
}

View File

@ -0,0 +1 @@
<EFBFBD>箒筱sssェ>

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>s<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ss<EFBFBD>><3E><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>N

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>

View File

@ -0,0 +1,2 @@
íÿÿÿÿÿÿNíYí«ÍíYí«XXX
Á(y

View File

@ -0,0 +1 @@
ォ粐粐粐粐粐

View File

@ -0,0 +1 @@
<EFBFBD>

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD>薶滫

View File

@ -0,0 +1,2 @@
<EFBFBD><EFBFBD>N倢戓ヘXX
チ(y戓(

View File

@ -0,0 +1 @@
Íí(Y((Y(*

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>s<EFBFBD>s<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>

View File

@ -0,0 +1 @@
<EFBFBD><12><><EFBFBD><EFBFBD>N<EFBFBD><4E>

View File

@ -0,0 +1 @@
ííÍNí˙ĺíA

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>K

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><04>

View File

@ -0,0 +1 @@
澵澵戓ォ

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><0E>

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>(<28><>(

View File

@ -0,0 +1,4 @@
<EFBFBD><EFBFBD>Y#<23><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>&XY<58>XY<58>X<EFBFBD><58><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
X<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
X
<EFBFBD><EFBFBD><EFBFBD><EFBFBD>(<28>y

View File

@ -0,0 +1,2 @@
ííYí«ÍíYí«XXX
Á(y

View File

@ -0,0 +1,2 @@
<EFBFBD><EFBFBD>Y<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>&XY<58>X<EFBFBD><58><EFBFBD><EFBFBD><EFBFBD><EFBFBD>X
(<28>y

View File

@ -0,0 +1,2 @@
<EFBFBD>
<EFBFBD><EFBFBD>

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><0E>

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>

View File

@ -0,0 +1 @@
ォ粐z粐粐

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>

View File

@ -0,0 +1 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>

View File

@ -0,0 +1,3 @@
ííXX
ÁXXX
Á(yäää)y

View File

@ -0,0 +1,9 @@
use bolero::fuzz;
use metrics_util::StreamingIntegers;
fn main() {
fuzz!().with_type().for_each(|value: &Vec<u64>| {
let mut si = StreamingIntegers::new();
si.compress(&value);
});
}

View File

@ -7,15 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- next-header -->
## [Unreleased] - ReleaseDate
### Added
- Support for specifying the unit of a measurement during registration. ([#107](https://github.com/metrics-rs/metrics/pull/107))
## [0.12.1] - 2019-11-21
### Changed
- Cost for macros dropped to almost zero when no recorder is installed. (#55)
- Cost for macros dropped to almost zero when no recorder is installed. ([#55](https://github.com/metrics-rs/metrics/pull/55))
## [0.12.0] - 2019-10-18
### Changed
- Improved documentation. (#44, #45, #46)
- Renamed `Recorder::record_counter` to `increment_counter` and `Recorder::record_gauge` to `update_gauge`. (#47)
- Renamed `Recorder::record_counter` to `increment_counter` and `Recorder::record_gauge` to `update_gauge`. ([#47](https://github.com/metrics-rs/metrics/pull/47))
## [0.11.1] - 2019-08-09
### Changed

View File

@ -3,15 +3,22 @@ extern crate criterion;
use criterion::{Benchmark, Criterion};
use metrics::{counter, Key, Recorder};
use metrics::{counter, Key, Recorder, Unit};
use rand::{thread_rng, Rng};
#[derive(Default)]
struct TestRecorder;
impl Recorder for TestRecorder {
fn register_counter(&self, _key: Key, _description: Option<&'static str>) {}
fn register_gauge(&self, _key: Key, _description: Option<&'static str>) {}
fn register_histogram(&self, _key: Key, _description: Option<&'static str>) {}
fn register_counter(&self, _key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {
}
fn register_gauge(&self, _key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {}
fn register_histogram(
&self,
_key: Key,
_unit: Option<Unit>,
_description: Option<&'static str>,
) {
}
fn increment_counter(&self, _key: Key, _value: u64) {}
fn update_gauge(&self, _key: Key, _value: f64) {}
fn record_histogram(&self, _key: Key, _value: u64) {}

Some files were not shown because too many files have changed in this diff Show More