first cut at units

This commit is contained in:
Toby Lawrence 2020-10-17 13:10:28 -04:00
parent 5f26d6567f
commit 9f8f9d360c
24 changed files with 943 additions and 277 deletions

View File

@ -6,7 +6,7 @@ use hyper::{
service::{make_service_fn, service_fn},
{Body, Error as HyperError, Response, Server},
};
use metrics::{Key, Recorder, SetRecorderError};
use metrics::{Key, Recorder, SetRecorderError, Unit};
use metrics_util::{
parse_quantiles, CompositeKey, Handle, Histogram, MetricKind, Quantile, Registry,
};
@ -494,7 +494,7 @@ impl PrometheusBuilder {
}
impl Recorder for PrometheusRecorder {
fn register_counter(&self, key: Key, description: Option<&'static str>) {
fn register_counter(&self, key: Key, _unit: Option<Unit>, description: Option<&'static str>) {
self.add_description_if_missing(&key, description);
self.inner.registry().op(
CompositeKey::new(MetricKind::Counter, key),
@ -503,7 +503,7 @@ impl Recorder for PrometheusRecorder {
);
}
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
fn register_gauge(&self, key: Key, _unit: Option<Unit>, description: Option<&'static str>) {
self.add_description_if_missing(&key, description);
self.inner.registry().op(
CompositeKey::new(MetricKind::Gauge, key),
@ -512,7 +512,7 @@ impl Recorder for PrometheusRecorder {
);
}
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
fn register_histogram(&self, key: Key, _unit: Option<Unit>, description: Option<&'static str>) {
self.add_description_if_missing(&key, description);
self.inner.registry().op(
CompositeKey::new(MetricKind::Histogram, key),

View File

@ -1,7 +1,7 @@
use std::thread;
use std::time::Duration;
use metrics::{histogram, increment};
use metrics::{histogram, increment, register_histogram, Unit};
use metrics_exporter_tcp::TcpBuilder;
use quanta::Clock;
@ -15,6 +15,8 @@ fn main() {
let mut clock = Clock::new();
let mut last = None;
register_histogram!("tcp_server_loop_delta_ns", Unit::Nanoseconds);
loop {
increment!("tcp_server_loops", "system" => "foo");

View File

@ -4,6 +4,22 @@ import "google/protobuf/timestamp.proto";
package event.proto;
message Metadata {
string name = 1;
enum MetricType {
COUNTER = 0;
GAUGE = 1;
HISTOGRAM = 2;
}
MetricType metric_type = 2;
oneof unit {
string unit_value = 3;
}
oneof description {
string description_value = 4;
}
}
message Metric {
string name = 1;
google.protobuf.Timestamp timestamp = 2;
@ -26,3 +42,10 @@ message Gauge {
message Histogram {
uint64 value = 1;
}
message Event {
oneof event {
Metadata metadata = 1;
Metric metric = 2;
}
}

View File

@ -53,7 +53,7 @@ use std::time::SystemTime;
use bytes::Bytes;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use metrics::{Key, Recorder, SetRecorderError};
use metrics::{Key, Recorder, SetRecorderError, Unit};
use mio::{
net::{TcpListener, TcpStream},
Events, Interest, Poll, Token, Waker,
@ -70,12 +70,19 @@ mod proto {
include!(concat!(env!("OUT_DIR"), "/event.proto.rs"));
}
use self::proto::metadata::MetricType;
enum MetricValue {
Counter(u64),
Gauge(f64),
Histogram(u64),
}
enum Event {
Metadata(Key, MetricType, Option<Unit>, Option<&'static str>),
Metric(Key, MetricValue),
}
/// Errors that could occur while installing a TCP recorder/exporter.
#[derive(Debug)]
pub enum Error {
@ -100,7 +107,7 @@ impl From<SetRecorderError> for Error {
/// A TCP recorder.
pub struct TcpRecorder {
tx: Sender<(Key, MetricValue)>,
tx: Sender<Event>,
waker: Arc<Waker>,
}
@ -191,18 +198,37 @@ impl TcpBuilder {
}
impl TcpRecorder {
fn register_metric(
&self,
key: Key,
metric_type: MetricType,
unit: Option<Unit>,
description: Option<&'static str>,
) {
let _ = self
.tx
.try_send(Event::Metadata(key, metric_type, unit, description));
let _ = self.waker.wake();
}
fn push_metric(&self, key: Key, value: MetricValue) {
let _ = self.tx.try_send((key, value));
let _ = self.tx.try_send(Event::Metric(key, value));
let _ = self.waker.wake();
}
}
impl Recorder for TcpRecorder {
fn register_counter(&self, _key: Key, _description: Option<&'static str>) {}
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.register_metric(key, MetricType::Counter, unit, description);
}
fn register_gauge(&self, _key: Key, _description: Option<&'static str>) {}
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.register_metric(key, MetricType::Gauge, unit, description);
}
fn register_histogram(&self, _key: Key, _description: Option<&'static str>) {}
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.register_metric(key, MetricType::Histogram, unit, description);
}
fn increment_counter(&self, key: Key, value: u64) {
self.push_metric(key, MetricValue::Counter(value));
@ -221,13 +247,14 @@ fn run_transport(
mut poll: Poll,
waker: Arc<Waker>,
listener: TcpListener,
rx: Receiver<(Key, MetricValue)>,
rx: Receiver<Event>,
buffer_size: Option<usize>,
) {
let buffer_limit = buffer_size.unwrap_or(std::usize::MAX);
let mut events = Events::with_capacity(1024);
let mut clients = HashMap::new();
let mut clients_to_remove = Vec::new();
let mut metadata = HashMap::new();
let mut next_token = START_TOKEN;
let mut buffered_pmsgs = VecDeque::with_capacity(buffer_limit);
@ -270,10 +297,22 @@ fn run_transport(
// If our sender is dead, we can't do anything else, so just return.
Err(_) => return,
};
let (key, value) = msg;
match convert_metric_to_protobuf_encoded(key, value) {
Ok(pmsg) => buffered_pmsgs.push_back(pmsg),
Err(e) => error!(error = ?e, "error encoding metric"),
match msg {
Event::Metadata(key, metric_type, unit, desc) => {
let entry = metadata
.entry(key)
.or_insert_with(|| (metric_type, None, None));
let (_, uentry, dentry) = entry;
*uentry = unit;
*dentry = desc;
}
Event::Metric(key, value) => {
match convert_metric_to_protobuf_encoded(key, value) {
Ok(pmsg) => buffered_pmsgs.push_back(pmsg),
Err(e) => error!(error = ?e, "error encoding metric"),
}
}
}
}
drop(_mrxspan);
@ -340,9 +379,10 @@ fn run_transport(
.register(&mut conn, token, CLIENT_INTEREST)
.expect("failed to register interest for client connection");
// Start tracking them.
// Start tracking them, and enqueue all of the metadata.
let metadata = generate_metadata_messages(&metadata);
clients
.insert(token, (conn, None, VecDeque::new()))
.insert(token, (conn, None, metadata))
.ok_or(())
.expect_err("client mapped to existing token!");
}
@ -370,6 +410,23 @@ fn run_transport(
}
}
fn generate_metadata_messages(
metadata: &HashMap<Key, (MetricType, Option<Unit>, Option<&'static str>)>,
) -> VecDeque<Bytes> {
let mut bufs = VecDeque::new();
for (key, (metric_type, unit, desc)) in metadata.iter() {
let msg = convert_metadata_to_protobuf_encoded(
key,
metric_type.clone(),
unit.clone(),
desc.clone(),
)
.expect("failed to encode metadata buffer");
bufs.push_back(msg);
}
bufs
}
#[tracing::instrument(skip(wbuf, msgs))]
fn drive_connection(
conn: &mut TcpStream,
@ -421,6 +478,28 @@ fn drive_connection(
}
}
fn convert_metadata_to_protobuf_encoded(
key: &Key,
metric_type: MetricType,
unit: Option<Unit>,
desc: Option<&'static str>,
) -> Result<Bytes, EncodeError> {
let name = key.name().to_string();
let metadata = proto::Metadata {
name,
metric_type: metric_type.into(),
unit: unit.map(|u| proto::metadata::Unit::UnitValue(u.as_str().to_owned())),
description: desc.map(|d| proto::metadata::Description::DescriptionValue(d.to_owned())),
};
let event = proto::Event {
event: Some(proto::event::Event::Metadata(metadata)),
};
let mut buf = Vec::new();
event.encode_length_delimited(&mut buf)?;
Ok(Bytes::from(buf))
}
fn convert_metric_to_protobuf_encoded(key: Key, value: MetricValue) -> Result<Bytes, EncodeError> {
let name = key.name().to_string();
let labels = key
@ -442,9 +521,12 @@ fn convert_metric_to_protobuf_encoded(key: Key, value: MetricValue) -> Result<By
timestamp: Some(now),
value: Some(mvalue),
};
let event = proto::Event {
event: Some(proto::event::Event::Metric(metric)),
};
let mut buf = Vec::new();
metric.encode_length_delimited(&mut buf)?;
event.encode_length_delimited(&mut buf)?;
Ok(Bytes::from(buf))
}

View File

@ -7,6 +7,7 @@ use proc_macro2::Span;
use proc_macro_hack::proc_macro_hack;
use quote::{format_ident, quote, ToTokens};
use regex::Regex;
use syn::parse::discouraged::Speculative;
use syn::parse::{Error, Parse, ParseStream, Result};
use syn::{parse_macro_input, Expr, LitStr, Token};
@ -45,6 +46,7 @@ struct WithExpression {
struct Registration {
key: Key,
unit: Option<Expr>,
description: Option<LitStr>,
labels: Option<Labels>,
}
@ -79,30 +81,78 @@ impl Parse for Registration {
fn parse(mut input: ParseStream) -> Result<Self> {
let key = read_key(&mut input)?;
// We accept three possible parameters: unit, description, and labels.
//
// If our first parameter is a literal string, we either have the description and no labels,
// or a description and labels. Peek at the trailing token after the description to see if
// we need to keep parsing.
// This may or may not be the start of labels, if the description has been omitted, so
// we hold on to it until we can make sure nothing else is behind it, or if it's a full
// fledged set of labels.
let (description, labels) = if input.peek(Token![,]) && input.peek3(Token![=>]) {
let (unit, description, labels) = if input.peek(Token![,]) && input.peek3(Token![=>]) {
// We have a ", <something> =>" pattern, which can only be labels, so we have no
// description.
// unit or description.
let labels = parse_labels(&mut input)?;
(None, labels)
(None, None, labels)
} else if input.peek(Token![,]) && input.peek2(LitStr) {
// We already know we're not working with labels only, and if we have ", <literal
// string>" then we have to at least have a description, possibly with labels.
input.parse::<Token![,]>()?;
let description = input.parse::<LitStr>().ok();
let labels = parse_labels(&mut input)?;
(description, labels)
} else {
// We might have labels passed as an expression.
(None, description, labels)
} else if input.peek(Token![,]) {
// We may or may not have anything left to parse here, but it could also be any
// combination of unit + description and/or labels.
//
// We speculatively try and parse an expression from the buffer, and see if we can match
// it to the qualified name of the Unit enum. We run all of the other normal parsing
// after that for description and labels.
let forked = input.fork();
forked.parse::<Token![,]>()?;
let unit = if let Ok(Expr::Path(path)) = forked.parse::<Expr>() {
let qname = path
.path
.segments
.iter()
.map(|x| x.ident.to_string())
.collect::<Vec<_>>()
.join("::");
if qname.starts_with("metrics::Unit") || qname.starts_with("Unit") {
Some(Expr::Path(path))
} else {
None
}
} else {
None
};
// If we succeeded, advance the main parse stream up to where the fork left off.
if unit.is_some() {
input.advance_to(&forked);
}
// We still have to check for a possible description.
let description =
if input.peek(Token![,]) && input.peek2(LitStr) && !input.peek3(Token![=>]) {
input.parse::<Token![,]>()?;
input.parse::<LitStr>().ok()
} else {
None
};
let labels = parse_labels(&mut input)?;
(None, labels)
(unit, description, labels)
} else {
(None, None, None)
};
Ok(Registration {
key,
unit,
description,
labels,
})
@ -113,33 +163,36 @@ impl Parse for Registration {
pub fn register_counter(input: TokenStream) -> TokenStream {
let Registration {
key,
unit,
description,
labels,
} = parse_macro_input!(input as Registration);
get_expanded_registration("counter", key, description, labels).into()
get_expanded_registration("counter", key, unit, description, labels).into()
}
#[proc_macro_hack]
pub fn register_gauge(input: TokenStream) -> TokenStream {
let Registration {
key,
unit,
description,
labels,
} = parse_macro_input!(input as Registration);
get_expanded_registration("gauge", key, description, labels).into()
get_expanded_registration("gauge", key, unit, description, labels).into()
}
#[proc_macro_hack]
pub fn register_histogram(input: TokenStream) -> TokenStream {
let Registration {
key,
unit,
description,
labels,
} = parse_macro_input!(input as Registration);
get_expanded_registration("histogram", key, description, labels).into()
get_expanded_registration("histogram", key, unit, description, labels).into()
}
#[proc_macro_hack]
@ -187,12 +240,18 @@ pub fn histogram(input: TokenStream) -> TokenStream {
fn get_expanded_registration(
metric_type: &str,
key: Key,
unit: Option<Expr>,
description: Option<LitStr>,
labels: Option<Labels>,
) -> proc_macro2::TokenStream {
let register_ident = format_ident!("register_{}", metric_type);
let key = key_to_quoted(key, labels);
let unit = match unit {
Some(e) => quote! { Some(#e) },
None => quote! { None },
};
let description = match description {
Some(s) => quote! { Some(#s) },
None => quote! { None },
@ -204,7 +263,7 @@ fn get_expanded_registration(
if let Some(recorder) = metrics::try_recorder() {
// Registrations are fairly rare, don't attempt to cache here
// and just use an owned ref.
recorder.#register_ident(metrics::Key::Owned(#key), #description);
recorder.#register_ident(metrics::Key::Owned(#key), #unit, #description);
}
}
}

View File

@ -1,10 +1,11 @@
use syn::parse_quote;
use syn::{Expr, ExprPath};
use super::*;
#[test]
fn test_quote_key_name_scoped() {
let stream = quote_key_name(Key::Scoped(parse_quote! {"qwerty"}));
let stream = quote_key_name(Key::Scoped(parse_quote! { "qwerty" }));
let expected =
"format ! (\"{}.{}\" , std :: module_path ! () . replace (\"::\" , \".\") , \"qwerty\")";
assert_eq!(stream.to_string(), expected);
@ -12,16 +13,18 @@ fn test_quote_key_name_scoped() {
#[test]
fn test_quote_key_name_not_scoped() {
let stream = quote_key_name(Key::NotScoped(parse_quote! {"qwerty"}));
let stream = quote_key_name(Key::NotScoped(parse_quote! { "qwerty" }));
let expected = "\"qwerty\"";
assert_eq!(stream.to_string(), expected);
}
#[test]
fn test_get_expanded_registration() {
// Basic registration.
let stream = get_expanded_registration(
"mytype",
Key::NotScoped(parse_quote! {"mykeyname"}),
Key::NotScoped(parse_quote! { "mykeyname" }),
None,
None,
None,
);
@ -30,6 +33,7 @@ fn test_get_expanded_registration() {
"{ if let Some (recorder) = metrics :: try_recorder () { ",
"recorder . register_mytype (",
"metrics :: Key :: Owned (metrics :: KeyData :: from_name (\"mykeyname\")) , ",
"None , ",
"None",
") ; ",
"} }",
@ -38,6 +42,80 @@ fn test_get_expanded_registration() {
assert_eq!(stream.to_string(), expected);
}
#[test]
fn test_get_expanded_registration_with_unit() {
// Now with unit.
let units: ExprPath = parse_quote! { metrics::Unit::Nanoseconds };
let stream = get_expanded_registration(
"mytype",
Key::NotScoped(parse_quote! { "mykeyname" }),
Some(Expr::Path(units)),
None,
None,
);
let expected = concat!(
"{ if let Some (recorder) = metrics :: try_recorder () { ",
"recorder . register_mytype (",
"metrics :: Key :: Owned (metrics :: KeyData :: from_name (\"mykeyname\")) , ",
"Some (metrics :: Unit :: Nanoseconds) , ",
"None",
") ; ",
"} }",
);
assert_eq!(stream.to_string(), expected);
}
#[test]
fn test_get_expanded_registration_with_description() {
// And with description.
let stream = get_expanded_registration(
"mytype",
Key::NotScoped(parse_quote! { "mykeyname" }),
None,
Some(parse_quote! { "flerkin" }),
None,
);
let expected = concat!(
"{ if let Some (recorder) = metrics :: try_recorder () { ",
"recorder . register_mytype (",
"metrics :: Key :: Owned (metrics :: KeyData :: from_name (\"mykeyname\")) , ",
"None , ",
"Some (\"flerkin\")",
") ; ",
"} }",
);
assert_eq!(stream.to_string(), expected);
}
#[test]
fn test_get_expanded_registration_with_unit_and_description() {
// And with unit and description.
let units: ExprPath = parse_quote! { metrics::Unit::Nanoseconds };
let stream = get_expanded_registration(
"mytype",
Key::NotScoped(parse_quote! { "mykeyname" }),
Some(Expr::Path(units)),
Some(parse_quote! { "flerkin" }),
None,
);
let expected = concat!(
"{ if let Some (recorder) = metrics :: try_recorder () { ",
"recorder . register_mytype (",
"metrics :: Key :: Owned (metrics :: KeyData :: from_name (\"mykeyname\")) , ",
"Some (metrics :: Unit :: Nanoseconds) , ",
"Some (\"flerkin\")",
") ; ",
"} }",
);
assert_eq!(stream.to_string(), expected);
}
/// If there are no dynamic labels - generate an invocation with caching.
#[test]
fn test_get_expanded_callsite_fast_path() {

View File

@ -4,6 +4,22 @@ import "google/protobuf/timestamp.proto";
package event.proto;
message Metadata {
string name = 1;
enum MetricType {
COUNTER = 0;
GAUGE = 1;
HISTOGRAM = 2;
}
MetricType metric_type = 2;
oneof unit {
string unit_value = 3;
}
oneof description {
string description_value = 4;
}
}
message Metric {
string name = 1;
google.protobuf.Timestamp timestamp = 2;
@ -26,3 +42,10 @@ message Gauge {
message Histogram {
uint64 value = 1;
}
message Event {
oneof event {
Metadata metadata = 1;
Metric metric = 2;
}
}

View File

@ -2,7 +2,7 @@ use std::io;
use std::thread;
use std::time::Duration;
use crossbeam_channel::{bounded, Receiver, TrySendError, RecvTimeoutError};
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, TrySendError};
use termion::event::Key;
use termion::input::TermRead;
@ -37,4 +37,4 @@ impl InputEvents {
Err(e) => Err(e),
}
}
}
}

View File

@ -7,7 +7,7 @@ use tui::{
layout::{Constraint, Direction, Layout},
style::{Color, Modifier, Style},
text::{Span, Spans},
widgets::{Block, Borders, Paragraph, Wrap, List, ListItem},
widgets::{Block, Borders, List, ListItem, Paragraph, Wrap},
Terminal,
};
@ -36,12 +36,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let chunks = Layout::default()
.direction(Direction::Vertical)
.margin(1)
.constraints(
[
Constraint::Length(4),
Constraint::Percentage(90)
].as_ref()
)
.constraints([Constraint::Length(4), Constraint::Percentage(90)].as_ref())
.split(f.size());
let current_dt = Local::now().format(" (%Y/%m/%d %I:%M:%S %p)").to_string();
@ -58,7 +53,7 @@ fn main() -> Result<(), Box<dyn Error>> {
}
Spans::from(spans)
},
}
ClientState::Connected => Spans::from(vec![
Span::raw("state: "),
Span::styled("connected", Style::default().fg(Color::Green)),
@ -67,7 +62,10 @@ fn main() -> Result<(), Box<dyn Error>> {
let header_block = Block::default()
.title(vec![
Span::styled("metrics-observer", Style::default().add_modifier(Modifier::BOLD)),
Span::styled(
"metrics-observer",
Style::default().add_modifier(Modifier::BOLD),
),
Span::raw(current_dt),
])
.borders(Borders::ALL);
@ -87,42 +85,48 @@ fn main() -> Result<(), Box<dyn Error>> {
// Knock 5 off the line width to account for 3-character highlight symbol + borders.
let line_width = chunks[1].width.saturating_sub(6) as usize;
let items = client.with_metrics(|metrics| {
let mut items = Vec::new();
for (key, value) in metrics.iter() {
let inner_key = key.key();
let name = inner_key.name();
let labels = inner_key.labels().map(|label| format!("{} = {}", label.key(), label.value())).collect::<Vec<_>>();
let display_name = if labels.is_empty() {
name.to_string()
} else {
format!("{} [{}]", name, labels.join(", "))
};
let mut items = Vec::new();
let metrics = client.get_metrics();
for (key, value, unit, _desc) in metrics {
let inner_key = key.key();
let name = inner_key.name();
let labels = inner_key
.labels()
.map(|label| format!("{} = {}", label.key(), label.value()))
.collect::<Vec<_>>();
let display_name = if labels.is_empty() {
name.to_string()
} else {
format!("{} [{}]", name, labels.join(", "))
};
let unit = unit_to_short_unit(unit);
let display_value = match value {
MetricData::Counter(value) => format!("total: {}", value),
MetricData::Gauge(value) => format!("current: {}", value),
MetricData::Histogram(value) => {
let min = value.min();
let max = value.max();
let p50 = value.value_at_quantile(0.5);
let p99 = value.value_at_quantile(0.99);
let p999 = value.value_at_quantile(0.999);
let display_value = match value {
MetricData::Counter(value) => format!("total: {}{}", value, unit),
MetricData::Gauge(value) => format!("current: {}{}", value, unit),
MetricData::Histogram(value) => {
let min = value.min();
let max = value.max();
let p50 = value.value_at_quantile(0.5);
let p99 = value.value_at_quantile(0.99);
let p999 = value.value_at_quantile(0.999);
format!("min: {} p50: {} p99: {} p999: {} max: {}",
min, p50, p99, p999, max)
},
};
format!(
"min: {}{} p50: {}{} p99: {}{} p999: {}{} max: {}{}",
min, unit, p50, unit, p99, unit, p999, unit, max, unit
)
}
};
let name_length = display_name.chars().count();
let value_length = display_value.chars().count();
let space = line_width.saturating_sub(name_length).saturating_sub(value_length);
let name_length = display_name.chars().count();
let value_length = display_value.chars().count();
let space = line_width
.saturating_sub(name_length)
.saturating_sub(value_length);
let display = format!("{}{}{}", display_name, " ".repeat(space), display_value);
items.push(ListItem::new(display));
}
items
});
let display = format!("{}{}{}", display_name, " ".repeat(space), display_value);
items.push(ListItem::new(display));
}
selector.set_length(items.len());
let metrics_block = Block::default()
@ -132,7 +136,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let metrics = List::new(items)
.block(metrics_block)
.highlight_symbol(">> ");
f.render_stateful_widget(metrics, chunks[1], selector.state());
})?;
@ -145,10 +149,46 @@ fn main() -> Result<(), Box<dyn Error>> {
Key::Down => selector.next(),
Key::PageUp => selector.top(),
Key::PageDown => selector.bottom(),
_ => {},
_ => {}
}
}
}
Ok(())
}
}
fn unit_to_short_unit(unit: Option<String>) -> &'static str {
match unit {
Some(s) => match s.as_str() {
"count" => "",
"percent" => "%",
"seconds" => "s",
"milliseconds" => "ms",
"microseconds" => "μs",
"nanoseconds" => "ns",
"terabytes" => " TB",
"gigabytes" => " GB",
"megabytes" => " MB",
"kilobytes" => " KB",
"bytes" => " bytes",
"terabits" => " Tb",
"gigabits" => " Gb",
"megabits" => " Mb",
"kilobits" => " Kb",
"bits" => " bits",
"terabytes_per_second" => " TBps",
"gigabytes_per_second" => " GBps",
"megabytes_per_second" => " MBps",
"kilobytes_per_second" => " KBps",
"bytes_per_second" => " Bps",
"terabits_per_second" => " Tbps",
"gigabits_per_second" => " Gbps",
"megabits_per_second" => " Mbps",
"kilobits_per_second" => " Kbps",
"bits_per_second" => " bps",
"count_per_second" => "/s",
_ => "",
},
None => "",
}
}

View File

@ -1,28 +1,35 @@
use std::collections::HashMap;
use std::io::Read;
use std::net::TcpStream;
use std::time::Duration;
use std::thread;
use std::net::ToSocketAddrs;
use std::sync::{Arc, Mutex, RwLock};
use std::io::Read;
use std::thread;
use std::time::Duration;
use bytes::{BufMut, BytesMut};
use prost::Message;
use hdrhistogram::Histogram;
use prost::Message;
use metrics::{Label, KeyData};
use metrics::{KeyData, Label};
use metrics_util::{CompositeKey, MetricKind};
mod proto {
include!(concat!(env!("OUT_DIR"), "/event.proto.rs"));
}
use self::proto::{
event::Event,
metadata::{Description, MetricType, Unit},
Event as EventWrapper,
};
#[derive(Clone)]
pub enum ClientState {
Disconnected(Option<String>),
Connected,
}
#[derive(Clone)]
pub enum MetricData {
Counter(u64),
Gauge(f64),
@ -32,6 +39,7 @@ pub enum MetricData {
pub struct Client {
state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<HashMap<CompositeKey, MetricData>>>,
metadata: Arc<RwLock<HashMap<(MetricKind, String), (Option<String>, Option<String>)>>>,
handle: thread::JoinHandle<()>,
}
@ -39,11 +47,13 @@ impl Client {
pub fn new(addr: String) -> Client {
let state = Arc::new(Mutex::new(ClientState::Disconnected(None)));
let metrics = Arc::new(RwLock::new(HashMap::new()));
let metadata = Arc::new(RwLock::new(HashMap::new()));
let handle = {
let state = state.clone();
let metrics = metrics.clone();
let metadata = metadata.clone();
thread::spawn(move || {
let mut runner = Runner::new(addr, state, metrics);
let mut runner = Runner::new(addr, state, metrics, metadata);
runner.run();
})
};
@ -51,6 +61,7 @@ impl Client {
Client {
state,
metrics,
metadata,
handle,
}
}
@ -59,12 +70,22 @@ impl Client {
self.state.lock().unwrap().clone()
}
pub fn with_metrics<F, T>(&self, f: F) -> T
where
F: FnOnce(&HashMap<CompositeKey, MetricData>) -> T,
{
let handle = self.metrics.read().unwrap();
f(&handle)
pub fn get_metrics(&self) -> Vec<(CompositeKey, MetricData, Option<String>, Option<String>)> {
let metrics = self.metrics.read().unwrap();
let metadata = self.metadata.read().unwrap();
metrics
.iter()
.map(|(k, v)| {
let metakey = (k.kind(), k.key().name().to_string());
let (unit, desc) = match metadata.get(&metakey) {
Some((unit, desc)) => (unit.clone(), desc.clone()),
None => (None, None),
};
(k.clone(), v.clone(), unit, desc)
})
.collect()
}
}
@ -79,6 +100,7 @@ struct Runner {
addr: String,
client_state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<HashMap<CompositeKey, MetricData>>>,
metadata: Arc<RwLock<HashMap<(MetricKind, String), (Option<String>, Option<String>)>>>,
}
impl Runner {
@ -86,12 +108,14 @@ impl Runner {
addr: String,
state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<HashMap<CompositeKey, MetricData>>>,
metadata: Arc<RwLock<HashMap<(MetricKind, String), (Option<String>, Option<String>)>>>,
) -> Runner {
Runner {
state: RunnerState::Disconnected,
addr,
client_state: state,
metrics,
metadata,
}
}
@ -104,38 +128,47 @@ impl Runner {
let mut state = self.client_state.lock().unwrap();
*state = ClientState::Disconnected(None);
}
// Try to connect to our target and transition into Connected.
let addr = match self.addr.to_socket_addrs() {
Ok(mut addrs) => match addrs.next() {
Some(addr) => addr,
None => {
let mut state = self.client_state.lock().unwrap();
*state = ClientState::Disconnected(Some("failed to resolve specified host".to_string()));
*state = ClientState::Disconnected(Some(
"failed to resolve specified host".to_string(),
));
break;
}
}
},
Err(_) => {
let mut state = self.client_state.lock().unwrap();
*state = ClientState::Disconnected(Some("failed to resolve specified host".to_string()));
*state = ClientState::Disconnected(Some(
"failed to resolve specified host".to_string(),
));
break;
}
};
match TcpStream::connect_timeout(&addr, Duration::from_secs(3)) {
Ok(stream) => RunnerState::Connected(stream),
Err(_) => {
RunnerState::ErrorBackoff("error while connecting", Duration::from_secs(3))
}
Err(_) => RunnerState::ErrorBackoff(
"error while connecting",
Duration::from_secs(3),
),
}
},
}
RunnerState::ErrorBackoff(msg, dur) => {
{
let mut state = self.client_state.lock().unwrap();
*state = ClientState::Disconnected(Some(format!("{}, retrying in {} seconds...", msg, dur.as_secs())));
*state = ClientState::Disconnected(Some(format!(
"{}, retrying in {} seconds...",
msg,
dur.as_secs()
)));
}
thread::sleep(dur);
RunnerState::Disconnected
},
}
RunnerState::Connected(ref mut stream) => {
{
let mut state = self.client_state.lock().unwrap();
@ -144,53 +177,105 @@ impl Runner {
let mut buf = BytesMut::new();
let mut rbuf = [0u8; 1024];
loop {
match stream.read(&mut rbuf[..]) {
Ok(0) => break,
Ok(n) => buf.put_slice(&rbuf[..n]),
Err(e) => eprintln!("read error: {:?}", e),
};
match proto::Metric::decode_length_delimited(&mut buf) {
Err(e) => eprintln!("decode error: {:?}", e),
Ok(msg) => {
let mut labels_raw = msg.labels.into_iter().collect::<Vec<_>>();
labels_raw.sort_by(|a, b| a.0.cmp(&b.0));
let labels = labels_raw.into_iter().map(|(k, v)| Label::new(k, v)).collect::<Vec<_>>();
let key_data: KeyData = (msg.name, labels).into();
match msg.value.expect("no metric value") {
proto::metric::Value::Counter(value) => {
let key = CompositeKey::new(MetricKind::Counter, key_data.into());
let mut metrics = self.metrics.write().unwrap();
let counter = metrics.entry(key).or_insert_with(|| MetricData::Counter(0));
if let MetricData::Counter(inner) = counter {
*inner += value.value;
}
},
proto::metric::Value::Gauge(value) => {
let key = CompositeKey::new(MetricKind::Gauge, key_data.into());
let mut metrics = self.metrics.write().unwrap();
let gauge = metrics.entry(key).or_insert_with(|| MetricData::Gauge(0.0));
if let MetricData::Gauge(inner) = gauge {
*inner = value.value;
}
},
proto::metric::Value::Histogram(value) => {
let key = CompositeKey::new(MetricKind::Histogram, key_data.into());
let mut metrics = self.metrics.write().unwrap();
let histogram = metrics.entry(key).or_insert_with(|| {
let histogram = Histogram::new(3).expect("failed to create histogram");
MetricData::Histogram(histogram)
});
let event = match EventWrapper::decode_length_delimited(&mut buf) {
Err(e) => {
eprintln!("decode error: {:?}", e);
continue;
}
Ok(event) => event,
};
if let MetricData::Histogram(inner) = histogram {
inner.record(value.value).expect("failed to record value to histogram");
}
},
if let Some(event) = event.event {
match event {
Event::Metadata(metadata) => {
let metric_type = MetricType::from_i32(metadata.metric_type)
.expect("unknown metric type over wire");
let metric_type = match metric_type {
MetricType::Counter => MetricKind::Counter,
MetricType::Gauge => MetricKind::Gauge,
MetricType::Histogram => MetricKind::Histogram,
};
let key = (metric_type, metadata.name);
let mut mmap = self
.metadata
.write()
.expect("failed to get metadata write lock");
let entry = mmap.entry(key).or_insert((None, None));
let (uentry, dentry) = entry;
*uentry = metadata.unit.map(|u| match u {
Unit::UnitValue(us) => us,
});
*dentry = metadata.description.map(|d| match d {
Description::DescriptionValue(ds) => ds,
});
}
},
Event::Metric(metric) => {
let mut labels_raw =
metric.labels.into_iter().collect::<Vec<_>>();
labels_raw.sort_by(|a, b| a.0.cmp(&b.0));
let labels = labels_raw
.into_iter()
.map(|(k, v)| Label::new(k, v))
.collect::<Vec<_>>();
let key_data: KeyData = (metric.name, labels).into();
match metric.value.expect("no metric value") {
proto::metric::Value::Counter(value) => {
let key = CompositeKey::new(
MetricKind::Counter,
key_data.into(),
);
let mut metrics = self.metrics.write().unwrap();
let counter = metrics
.entry(key)
.or_insert_with(|| MetricData::Counter(0));
if let MetricData::Counter(inner) = counter {
*inner += value.value;
}
}
proto::metric::Value::Gauge(value) => {
let key = CompositeKey::new(
MetricKind::Gauge,
key_data.into(),
);
let mut metrics = self.metrics.write().unwrap();
let gauge = metrics
.entry(key)
.or_insert_with(|| MetricData::Gauge(0.0));
if let MetricData::Gauge(inner) = gauge {
*inner = value.value;
}
}
proto::metric::Value::Histogram(value) => {
let key = CompositeKey::new(
MetricKind::Histogram,
key_data.into(),
);
let mut metrics = self.metrics.write().unwrap();
let histogram =
metrics.entry(key).or_insert_with(|| {
let histogram = Histogram::new(3)
.expect("failed to create histogram");
MetricData::Histogram(histogram)
});
if let MetricData::Histogram(inner) = histogram {
inner
.record(value.value)
.expect("failed to record value to histogram");
}
}
}
}
}
}
}
@ -200,4 +285,4 @@ impl Runner {
self.state = next;
}
}
}
}

View File

@ -55,4 +55,4 @@ impl Selector {
};
self.1.select(Some(i));
}
}
}

View File

@ -55,7 +55,7 @@
#![deny(missing_docs)]
use metrics::{Key, KeyData, Label, Recorder};
use metrics::{Key, KeyData, Label, Recorder, Unit};
use metrics_util::layers::Layer;
use tracing::Span;
@ -136,16 +136,16 @@ where
R: Recorder,
F: LabelFilter,
{
fn register_counter(&self, key: Key, description: Option<&'static str>) {
self.inner.register_counter(key, description)
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.inner.register_counter(key, unit, description)
}
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
self.inner.register_gauge(key, description)
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.inner.register_gauge(key, unit, description)
}
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
self.inner.register_histogram(key, description)
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.inner.register_histogram(key, unit, description)
}
fn increment_counter(&self, key: Key, value: u64) {

View File

@ -63,6 +63,8 @@ fn test_basic_functionality() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
)]
)
@ -89,7 +91,6 @@ fn test_macro_forms() {
"service" => "login_service", "node_name" => node_name.clone());
let snapshot = snapshotter.snapshot();
let snapshot: HashSet<_> = snapshot.into_iter().collect();
assert_eq!(
snapshot,
@ -104,6 +105,8 @@ fn test_macro_forms() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
),
(
@ -117,6 +120,8 @@ fn test_macro_forms() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
),
(
@ -130,6 +135,8 @@ fn test_macro_forms() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
),
(
@ -144,11 +151,11 @@ fn test_macro_forms() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
),
]
.into_iter()
.collect()
)
}
@ -168,6 +175,8 @@ fn test_no_labels() {
vec![(
MetricKind::Counter,
KeyData::from_name("login_attempts").into(),
None,
None,
DebugValue::Counter(1),
)]
)
@ -211,7 +220,6 @@ fn test_multiple_paths_to_the_same_callsite() {
path2();
let snapshot = snapshotter.snapshot();
let snapshot: HashSet<_> = snapshot.into_iter().collect();
assert_eq!(
snapshot,
@ -227,6 +235,8 @@ fn test_multiple_paths_to_the_same_callsite() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
),
(
@ -240,11 +250,11 @@ fn test_multiple_paths_to_the_same_callsite() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
)
]
.into_iter()
.collect()
)
}
@ -282,7 +292,6 @@ fn test_nested_spans() {
outer();
let snapshot = snapshotter.snapshot();
let snapshot: HashSet<_> = snapshot.into_iter().collect();
assert_eq!(
snapshot,
@ -300,10 +309,10 @@ fn test_nested_spans() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
),]
.into_iter()
.collect()
)]
)
}
@ -341,6 +350,8 @@ fn test_label_filtering() {
],
)
.into(),
None,
None,
DebugValue::Counter(1),
)]
)

View File

@ -40,6 +40,7 @@ atomic-shim = "0.1"
parking_lot = "0.11"
aho-corasick = { version = "0.7", optional = true }
dashmap = "3"
indexmap = "1.6"
[dev-dependencies]
criterion = "0.3"

View File

@ -1,8 +1,11 @@
use std::{hash::Hash, hash::Hasher, sync::Arc};
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};
use crate::{handle::Handle, registry::Registry};
use metrics::{Key, Recorder};
use indexmap::IndexMap;
use metrics::{Key, Recorder, Unit};
/// Metric kinds.
#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy, Ord, PartialOrd)]
@ -60,23 +63,52 @@ impl Hash for DebugValue {
/// Captures point-in-time snapshots of `DebuggingRecorder`.
pub struct Snapshotter {
registry: Arc<Registry<DifferentiatedKey, Handle>>,
metrics: Arc<Mutex<IndexMap<DifferentiatedKey, ()>>>,
units: Arc<Mutex<HashMap<DifferentiatedKey, Unit>>>,
descriptions: Arc<Mutex<HashMap<DifferentiatedKey, &'static str>>>,
}
impl Snapshotter {
/// Takes a snapshot of the recorder.
pub fn snapshot(&self) -> Vec<(MetricKind, Key, DebugValue)> {
let mut metrics = Vec::new();
pub fn snapshot(
&self,
) -> Vec<(
MetricKind,
Key,
Option<Unit>,
Option<&'static str>,
DebugValue,
)> {
let mut snapshot = Vec::new();
let handles = self.registry.get_handles();
for (dkey, handle) in handles {
let (kind, key) = dkey.into_parts();
let value = match kind {
MetricKind::Counter => DebugValue::Counter(handle.read_counter()),
MetricKind::Gauge => DebugValue::Gauge(handle.read_gauge()),
MetricKind::Histogram => DebugValue::Histogram(handle.read_histogram()),
};
metrics.push((kind, key, value));
let metrics = {
let metrics = self.metrics.lock().expect("metrics lock poisoned");
metrics.clone()
};
for (dkey, _) in metrics.into_iter() {
if let Some(handle) = handles.get(&dkey) {
let unit = self
.units
.lock()
.expect("units lock poisoned")
.get(&dkey)
.cloned();
let description = self
.descriptions
.lock()
.expect("descriptions lock poisoned")
.get(&dkey)
.cloned();
let (kind, key) = dkey.into_parts();
let value = match kind {
MetricKind::Counter => DebugValue::Counter(handle.read_counter()),
MetricKind::Gauge => DebugValue::Gauge(handle.read_gauge()),
MetricKind::Histogram => DebugValue::Histogram(handle.read_histogram()),
};
snapshot.push((kind, key, unit, description, value));
}
}
metrics
snapshot
}
}
@ -86,6 +118,9 @@ impl Snapshotter {
/// to the raw values.
pub struct DebuggingRecorder {
registry: Arc<Registry<DifferentiatedKey, Handle>>,
metrics: Arc<Mutex<IndexMap<DifferentiatedKey, ()>>>,
units: Arc<Mutex<HashMap<DifferentiatedKey, Unit>>>,
descriptions: Arc<Mutex<HashMap<DifferentiatedKey, &'static str>>>,
}
impl DebuggingRecorder {
@ -93,6 +128,9 @@ impl DebuggingRecorder {
pub fn new() -> DebuggingRecorder {
DebuggingRecorder {
registry: Arc::new(Registry::new()),
metrics: Arc::new(Mutex::new(IndexMap::new())),
units: Arc::new(Mutex::new(HashMap::new())),
descriptions: Arc::new(Mutex::new(HashMap::new())),
}
}
@ -100,6 +138,32 @@ impl DebuggingRecorder {
pub fn snapshotter(&self) -> Snapshotter {
Snapshotter {
registry: self.registry.clone(),
metrics: self.metrics.clone(),
units: self.units.clone(),
descriptions: self.descriptions.clone(),
}
}
fn register_metric(&self, rkey: DifferentiatedKey) {
let mut metrics = self.metrics.lock().expect("metrics lock poisoned");
let _ = metrics.insert(rkey.clone(), ());
}
fn insert_unit_description(
&self,
rkey: DifferentiatedKey,
unit: Option<Unit>,
description: Option<&'static str>,
) {
if let Some(unit) = unit {
let mut units = self.units.lock().expect("units lock poisoned");
let uentry = units.entry(rkey.clone()).or_insert_with(|| unit.clone());
*uentry = unit;
}
if let Some(description) = description {
let mut descriptions = self.descriptions.lock().expect("description lock poisoned");
let dentry = descriptions.entry(rkey).or_insert_with(|| description);
*dentry = description;
}
}
@ -110,23 +174,30 @@ impl DebuggingRecorder {
}
impl Recorder for DebuggingRecorder {
fn register_counter(&self, key: Key, _description: Option<&'static str>) {
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
let rkey = DifferentiatedKey(MetricKind::Counter, key);
self.register_metric(rkey.clone());
self.insert_unit_description(rkey.clone(), unit, description);
self.registry.op(rkey, |_| {}, || Handle::counter())
}
fn register_gauge(&self, key: Key, _description: Option<&'static str>) {
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
let rkey = DifferentiatedKey(MetricKind::Gauge, key);
self.register_metric(rkey.clone());
self.insert_unit_description(rkey.clone(), unit, description);
self.registry.op(rkey, |_| {}, || Handle::gauge())
}
fn register_histogram(&self, key: Key, _description: Option<&'static str>) {
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
let rkey = DifferentiatedKey(MetricKind::Histogram, key);
self.register_metric(rkey.clone());
self.insert_unit_description(rkey.clone(), unit, description);
self.registry.op(rkey, |_| {}, || Handle::histogram())
}
fn increment_counter(&self, key: Key, value: u64) {
let rkey = DifferentiatedKey(MetricKind::Counter, key);
self.register_metric(rkey.clone());
self.registry.op(
rkey,
|handle| handle.increment_counter(value),
@ -136,6 +207,7 @@ impl Recorder for DebuggingRecorder {
fn update_gauge(&self, key: Key, value: f64) {
let rkey = DifferentiatedKey(MetricKind::Gauge, key);
self.register_metric(rkey.clone());
self.registry.op(
rkey,
|handle| handle.update_gauge(value),
@ -145,6 +217,7 @@ impl Recorder for DebuggingRecorder {
fn record_histogram(&self, key: Key, value: u64) {
let rkey = DifferentiatedKey(MetricKind::Histogram, key);
self.register_metric(rkey.clone());
self.registry.op(
rkey,
|handle| handle.record_histogram(value),

View File

@ -1,4 +1,4 @@
use metrics::{Key, Recorder};
use metrics::{Key, Recorder, Unit};
/// Fans out metrics to multiple recorders.
pub struct Fanout {
@ -6,21 +6,21 @@ pub struct Fanout {
}
impl Recorder for Fanout {
fn register_counter(&self, key: Key, description: Option<&'static str>) {
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
for recorder in &self.recorders {
recorder.register_counter(key.clone(), description);
recorder.register_counter(key.clone(), unit.clone(), description);
}
}
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
for recorder in &self.recorders {
recorder.register_gauge(key.clone(), description);
recorder.register_gauge(key.clone(), unit.clone(), description);
}
}
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
for recorder in &self.recorders {
recorder.register_histogram(key.clone(), description);
recorder.register_histogram(key.clone(), unit.clone(), description);
}
}
@ -73,7 +73,7 @@ impl FanoutBuilder {
mod tests {
use super::FanoutBuilder;
use crate::debugging::DebuggingRecorder;
use metrics::{Key, Recorder};
use metrics::{Key, Recorder, Unit};
#[test]
fn test_basic_functionality() {
@ -91,8 +91,18 @@ mod tests {
assert_eq!(before1.len(), 0);
assert_eq!(before2.len(), 0);
fanout.register_counter(Key::Owned("tokio.loops".into()), None);
fanout.register_gauge(Key::Owned("hyper.sent_bytes".into()), None);
let ud = &[(Unit::Count, "counter desc"), (Unit::Bytes, "gauge desc")];
fanout.register_counter(
Key::Owned("tokio.loops".into()),
Some(ud[0].0.clone()),
Some(ud[0].1),
);
fanout.register_gauge(
Key::Owned("hyper.sent_bytes".into()),
Some(ud[1].0.clone()),
Some(ud[1].1),
);
fanout.increment_counter(Key::Owned("tokio.loops".into()), 47);
fanout.update_gauge(Key::Owned("hyper.sent_bytes".into()), 12.0);
@ -101,11 +111,21 @@ mod tests {
assert_eq!(after1.len(), 2);
assert_eq!(after2.len(), 2);
let after = after1.into_iter().zip(after2).collect::<Vec<_>>();
let after = after1
.into_iter()
.zip(after2)
.enumerate()
.collect::<Vec<_>>();
for ((_, k1, v1), (_, k2, v2)) in after {
for (i, ((_, k1, u1, d1, v1), (_, k2, u2, d2, v2))) in after {
assert_eq!(k1, k2);
assert_eq!(u1, u2);
assert_eq!(d1, d2);
assert_eq!(v1, v2);
assert_eq!(Some(ud[i].0.clone()), u1);
assert_eq!(Some(ud[i].0.clone()), u2);
assert_eq!(Some(ud[i].1), d1);
assert_eq!(Some(ud[i].1), d2);
}
}
}

View File

@ -1,6 +1,6 @@
use crate::layers::Layer;
use aho_corasick::{AhoCorasick, AhoCorasickBuilder};
use metrics::{Key, Recorder};
use metrics::{Key, Recorder, Unit};
/// Filters and discards metrics matching certain name patterns.
///
@ -18,25 +18,25 @@ impl<R> Filter<R> {
}
impl<R: Recorder> Recorder for Filter<R> {
fn register_counter(&self, key: Key, description: Option<&'static str>) {
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
if self.should_filter(&key) {
return;
}
self.inner.register_counter(key, description)
self.inner.register_counter(key, unit, description)
}
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
if self.should_filter(&key) {
return;
}
self.inner.register_gauge(key, description)
self.inner.register_gauge(key, unit, description)
}
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
if self.should_filter(&key) {
return;
}
self.inner.register_histogram(key, description)
self.inner.register_histogram(key, unit, description)
}
fn increment_counter(&self, key: Key, value: u64) {
@ -135,7 +135,7 @@ mod tests {
use super::FilterLayer;
use crate::debugging::DebuggingRecorder;
use crate::layers::Layer;
use metrics::{Key, Recorder};
use metrics::{Key, Recorder, Unit};
#[test]
fn test_basic_functionality() {
@ -148,17 +148,49 @@ mod tests {
let before = snapshotter.snapshot();
assert_eq!(before.len(), 0);
layered.register_counter(Key::Owned("tokio.loops".into()), None);
layered.register_gauge(Key::Owned("hyper.sent_bytes".into()), None);
layered.register_histogram(Key::Owned("hyper.recv_bytes".into()), None);
layered.register_counter(Key::Owned("bb8.conns".into()), None);
layered.register_gauge(Key::Owned("hyper.tokio.sent_bytes".into()), None);
let ud = &[
(Unit::Count, "counter desc"),
(Unit::Bytes, "gauge desc"),
(Unit::Bytes, "histogram desc"),
(Unit::Count, "counter desc"),
(Unit::Bytes, "gauge desc"),
];
layered.register_counter(
Key::Owned("tokio.loops".into()),
Some(ud[0].0.clone()),
Some(ud[0].1),
);
layered.register_gauge(
Key::Owned("hyper.sent_bytes".into()),
Some(ud[1].0.clone()),
Some(ud[1].1),
);
layered.register_histogram(
Key::Owned("hyper.tokio.sent_bytes".into()),
Some(ud[2].0.clone()),
Some(ud[2].1),
);
layered.register_counter(
Key::Owned("bb8.conns".into()),
Some(ud[3].0.clone()),
Some(ud[3].1),
);
layered.register_gauge(
Key::Owned("hyper.recv_bytes".into()),
Some(ud[4].0.clone()),
Some(ud[4].1),
);
let after = snapshotter.snapshot();
assert_eq!(after.len(), 2);
for (_kind, key, _value) in &after {
for (_kind, key, unit, desc, _value) in after {
assert!(!key.name().contains("tokio") && !key.name().contains("bb8"));
// We cheat here since we're not comparing one-to-one with the source data,
// but we know which metrics are going to make it through so we can hard code.
assert_eq!(Some(Unit::Bytes), unit);
assert!(!desc.unwrap().is_empty() && desc.unwrap() == "gauge desc");
}
}
@ -174,16 +206,16 @@ mod tests {
let before = snapshotter.snapshot();
assert_eq!(before.len(), 0);
layered.register_counter(Key::Owned("tokiO.loops".into()), None);
layered.register_gauge(Key::Owned("hyper.sent_bytes".into()), None);
layered.register_histogram(Key::Owned("hyper.recv_bytes".into()), None);
layered.register_counter(Key::Owned("bb8.conns".into()), None);
layered.register_counter(Key::Owned("Bb8.conns_closed".into()), None);
layered.register_counter(Key::Owned("tokiO.loops".into()), None, None);
layered.register_gauge(Key::Owned("hyper.sent_bytes".into()), None, None);
layered.register_histogram(Key::Owned("hyper.recv_bytes".into()), None, None);
layered.register_counter(Key::Owned("bb8.conns".into()), None, None);
layered.register_counter(Key::Owned("Bb8.conns_closed".into()), None, None);
let after = snapshotter.snapshot();
assert_eq!(after.len(), 2);
for (_kind, key, _value) in &after {
for (_kind, key, _unit, _desc, _value) in &after {
assert!(
!key.name().to_lowercase().contains("tokio")
&& !key.name().to_lowercase().contains("bb8")

View File

@ -8,7 +8,7 @@
//! Here's an example of a layer that filters out all metrics that start with a specific string:
//!
//! ```rust
//! # use metrics::{Key, Recorder};
//! # use metrics::{Key, Recorder, Unit};
//! # use metrics_util::DebuggingRecorder;
//! # use metrics_util::layers::{Layer, Stack, PrefixLayer};
//! // A simple layer that denies any metrics that have "stairway" or "heaven" in their name.
@ -22,25 +22,25 @@
//! }
//!
//! impl<R: Recorder> Recorder for StairwayDeny<R> {
//! fn register_counter(&self, key: Key, description: Option<&'static str>) {
//! fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
//! if self.is_invalid_key(&key) {
//! return;
//! }
//! self.0.register_counter(key, description)
//! self.0.register_counter(key, unit, description)
//! }
//!
//! fn register_gauge(&self, key: Key, description: Option<&'static str>) {
//! fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
//! if self.is_invalid_key(&key) {
//! return;
//! }
//! self.0.register_gauge(key, description)
//! self.0.register_gauge(key, unit, description)
//! }
//!
//! fn register_histogram(&self, key: Key, description: Option<&'static str>) {
//! fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
//! if self.is_invalid_key(&key) {
//! return;
//! }
//! self.0.register_histogram(key, description)
//! self.0.register_histogram(key, unit, description)
//! }
//!
//! fn increment_counter(&self, key: Key, value: u64) {
@ -102,7 +102,7 @@
//! .expect("failed to install stack");
//! # }
//! ```
use metrics::{Key, Recorder};
use metrics::{Key, Recorder, Unit};
#[cfg(feature = "std")]
use metrics::SetRecorderError;
@ -155,16 +155,16 @@ impl<R: Recorder + 'static> Stack<R> {
}
impl<R: Recorder> Recorder for Stack<R> {
fn register_counter(&self, key: Key, description: Option<&'static str>) {
self.inner.register_counter(key, description)
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.inner.register_counter(key, unit, description);
}
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
self.inner.register_gauge(key, description)
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.inner.register_gauge(key, unit, description);
}
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
self.inner.register_histogram(key, description)
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
self.inner.register_histogram(key, unit, description);
}
fn increment_counter(&self, key: Key, value: u64) {

View File

@ -1,5 +1,5 @@
use crate::layers::Layer;
use metrics::{Key, Recorder};
use metrics::{Key, Recorder, Unit};
/// Applies a prefix to every metric key.
///
@ -18,19 +18,19 @@ impl<R> Prefix<R> {
}
impl<R: Recorder> Recorder for Prefix<R> {
fn register_counter(&self, key: Key, description: Option<&'static str>) {
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
let new_key = self.prefix_key(key);
self.inner.register_counter(new_key, description)
self.inner.register_counter(new_key, unit, description)
}
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
let new_key = self.prefix_key(key);
self.inner.register_gauge(new_key, description)
self.inner.register_gauge(new_key, unit, description)
}
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
let new_key = self.prefix_key(key);
self.inner.register_histogram(new_key, description)
self.inner.register_histogram(new_key, unit, description)
}
fn increment_counter(&self, key: Key, value: u64) {
@ -77,7 +77,7 @@ mod tests {
use super::PrefixLayer;
use crate::debugging::DebuggingRecorder;
use crate::layers::Layer;
use metrics::{KeyData, Recorder};
use metrics::{KeyData, Recorder, Unit};
#[test]
fn test_basic_functionality() {
@ -89,15 +89,35 @@ mod tests {
let before = snapshotter.snapshot();
assert_eq!(before.len(), 0);
layered.register_counter(KeyData::from_name("counter_metric").into(), None);
layered.register_gauge(KeyData::from_name("gauge_metric").into(), None);
layered.register_histogram(KeyData::from_name("histogram_metric").into(), None);
let ud = &[
(Unit::Nanoseconds, "counter desc"),
(Unit::Microseconds, "gauge desc"),
(Unit::Milliseconds, "histogram desc"),
];
layered.register_counter(
KeyData::from_name("counter_metric").into(),
Some(ud[0].0.clone()),
Some(ud[0].1),
);
layered.register_gauge(
KeyData::from_name("gauge_metric").into(),
Some(ud[1].0.clone()),
Some(ud[1].1),
);
layered.register_histogram(
KeyData::from_name("histogram_metric").into(),
Some(ud[2].0.clone()),
Some(ud[2].1),
);
let after = snapshotter.snapshot();
assert_eq!(after.len(), 3);
for (_kind, key, _value) in &after {
for (i, (_kind, key, unit, desc, _value)) in after.iter().enumerate() {
assert!(key.name().starts_with("testing"));
assert_eq!(&Some(ud[i].0.clone()), unit);
assert_eq!(&Some(ud[i].1), desc);
}
}
}

View File

@ -3,15 +3,22 @@ extern crate criterion;
use criterion::{Benchmark, Criterion};
use metrics::{counter, Key, Recorder};
use metrics::{counter, Key, Recorder, Unit};
use rand::{thread_rng, Rng};
#[derive(Default)]
struct TestRecorder;
impl Recorder for TestRecorder {
fn register_counter(&self, _key: Key, _description: Option<&'static str>) {}
fn register_gauge(&self, _key: Key, _description: Option<&'static str>) {}
fn register_histogram(&self, _key: Key, _description: Option<&'static str>) {}
fn register_counter(&self, _key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {
}
fn register_gauge(&self, _key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {}
fn register_histogram(
&self,
_key: Key,
_unit: Option<Unit>,
_description: Option<&'static str>,
) {
}
fn increment_counter(&self, _key: Key, _value: u64) {}
fn update_gauge(&self, _key: Key, _value: f64) {}
fn record_histogram(&self, _key: Key, _value: u64) {}

View File

@ -1,4 +1,4 @@
use metrics::{counter, gauge, histogram, increment, Key, Recorder};
use metrics::{counter, gauge, histogram, increment, Key, Recorder, Unit};
#[allow(dead_code)]
static RECORDER: PrintRecorder = PrintRecorder;
@ -7,24 +7,24 @@ static RECORDER: PrintRecorder = PrintRecorder;
struct PrintRecorder;
impl Recorder for PrintRecorder {
fn register_counter(&self, key: Key, description: Option<&'static str>) {
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
println!(
"(counter) registered key {} with description {:?}",
key, description
"(counter) registered key {} with unit {:?} and description {:?}",
key, unit, description
);
}
fn register_gauge(&self, key: Key, description: Option<&'static str>) {
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
println!(
"(gauge) registered key {} with description {:?}",
key, description
"(gauge) registered key {} with unit {:?} and description {:?}",
key, unit, description
);
}
fn register_histogram(&self, key: Key, description: Option<&'static str>) {
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
println!(
"(histogram) registered key {} with description {:?}",
key, description
"(histogram) registered key {} with unit {:?} and description {:?}",
key, unit, description
);
}

View File

@ -7,6 +7,109 @@ use std::borrow::Cow;
/// take ownership of owned strings and borrows of completely static strings.
pub type ScopedString = Cow<'static, str>;
/// Units for a given metric.
///
/// While metrics do not necessarily need to be tied to a particular unit to be recorded, some
/// downstream systems natively support defining units and so they can be specified during registration.
#[derive(Clone, Debug, PartialEq)]
pub enum Unit {
// Dimensionless measurements.
/// Count.
Count,
/// Percentage.
Percent,
// Time measurements.
/// Seconds.
Seconds,
/// Milliseconds.
Milliseconds,
/// Microseconds.
Microseconds,
/// Nanoseconds.
Nanoseconds,
// Data measurement.
/// Terabytes.
Terabytes,
/// Gigabytes.
Gigabytes,
/// Megabytes.
Megabytes,
/// Kilobytes.
Kilobytes,
/// Bytes.
Bytes,
/// Terabits.
Terabits,
/// Gigabits.
Gigabits,
/// Megabits.
Megabits,
/// Kilobits.
Kilobits,
/// Bits.
Bits,
// Rate measurements.
/// Terabytes per second.
TerabytesPerSecond,
/// Gigabytes per second.
GigabytesPerSecond,
/// Megabytes per second.
MegabytesPerSecond,
/// Kilobytes per second.
KilobytesPerSecond,
/// Bytes per second.
BytesPerSecond,
/// Terabits per second.
TerabitsPerSecond,
/// Gigabits per second.
GigabitsPerSecond,
/// Megabits per second.
MegabitsPerSecond,
/// Kilobits per second.
KilobitsPerSecond,
/// Bits per second.
BitsPerSecond,
/// Count per second.
CountPerSecond,
}
impl Unit {
/// Gets the string form of this `Unit`.
pub fn as_str(&self) -> &str {
match self {
Unit::Count => "count",
Unit::Percent => "percent",
Unit::Seconds => "seconds",
Unit::Milliseconds => "milliseconds",
Unit::Microseconds => "microseconds",
Unit::Nanoseconds => "nanoseconds",
Unit::Terabytes => "terabytes",
Unit::Gigabytes => "gigabytes",
Unit::Megabytes => "megabytes",
Unit::Kilobytes => "kilobytes",
Unit::Bytes => "bytes",
Unit::Terabits => "terabits",
Unit::Gigabits => "gigabits",
Unit::Megabits => "megabits",
Unit::Kilobits => "kilobits",
Unit::Bits => "bits",
Unit::TerabytesPerSecond => "terabytes_per_second",
Unit::GigabytesPerSecond => "gigabytes_per_second",
Unit::MegabytesPerSecond => "megabytes_per_second",
Unit::KilobytesPerSecond => "kilobytes_per_second",
Unit::BytesPerSecond => "bytes_per_second",
Unit::TerabitsPerSecond => "terabits_per_second",
Unit::GigabitsPerSecond => "gigabits_per_second",
Unit::MegabitsPerSecond => "megabits_per_second",
Unit::KilobitsPerSecond => "kilobits_per_second",
Unit::BitsPerSecond => "bits_per_second",
Unit::CountPerSecond => "count_per_second",
}
}
}
/// An object which can be converted into a `u64` representation.
///
/// This trait provides a mechanism for existing types, which have a natural representation

View File

@ -61,16 +61,16 @@
//!
//! ```rust
//! use log::info;
//! use metrics::{Key, Recorder};
//! use metrics::{Key, Recorder, Unit};
//!
//! struct LogRecorder;
//!
//! impl Recorder for LogRecorder {
//! fn register_counter(&self, key: Key, _description: Option<&'static str>) {}
//! fn register_counter(&self, key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {}
//!
//! fn register_gauge(&self, key: Key, _description: Option<&'static str>) {}
//! fn register_gauge(&self, key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {}
//!
//! fn register_histogram(&self, key: Key, _description: Option<&'static str>) {}
//! fn register_histogram(&self, key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {}
//!
//! fn increment_counter(&self, key: Key, value: u64) {
//! info!("counter '{}' -> {}", key, value);
@ -91,12 +91,12 @@
//! function that wraps the creation and installation of the recorder:
//!
//! ```rust
//! # use metrics::{Recorder, Key};
//! # use metrics::{Key, Recorder, Unit};
//! # struct LogRecorder;
//! # impl Recorder for LogRecorder {
//! # fn register_counter(&self, _key: Key, _description: Option<&'static str>) {}
//! # fn register_gauge(&self, _key: Key, _description: Option<&'static str>) {}
//! # fn register_histogram(&self, _key: Key, _description: Option<&'static str>) {}
//! # fn register_counter(&self, _key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {}
//! # fn register_gauge(&self, _key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {}
//! # fn register_histogram(&self, _key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {}
//! # fn increment_counter(&self, _key: Key, _value: u64) {}
//! # fn update_gauge(&self, _key: Key, _value: f64) {}
//! # fn record_histogram(&self, _key: Key, _value: u64) {}
@ -119,12 +119,12 @@
//! that it takes a `Box<Recorder>` rather than a `&'static Recorder`:
//!
//! ```rust
//! # use metrics::{Recorder, Key};
//! # use metrics::{Key, Recorder, Unit};
//! # struct LogRecorder;
//! # impl Recorder for LogRecorder {
//! # fn register_counter(&self, _key: Key, _description: Option<&'static str>) {}
//! # fn register_gauge(&self, _key: Key, _description: Option<&'static str>) {}
//! # fn register_histogram(&self, _key: Key, _description: Option<&'static str>) {}
//! # fn register_counter(&self, _key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {}
//! # fn register_gauge(&self, _key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {}
//! # fn register_histogram(&self, _key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {}
//! # fn increment_counter(&self, _key: Key, _value: u64) {}
//! # fn update_gauge(&self, _key: Key, _value: f64) {}
//! # fn record_histogram(&self, _key: Key, _value: u64) {}

View File

@ -1,4 +1,4 @@
use crate::Key;
use crate::{Key, Unit};
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
@ -16,24 +16,27 @@ static SET_RECORDER_ERROR: &str =
pub trait Recorder {
/// Registers a counter.
///
/// Callers may provide a description of the counter being registered. Whether or not a metric
/// can be reregistered to provide a description, if one was already passed or not, as well as
/// how descriptions are used by the underlying recorder, is an implementation detail.
fn register_counter(&self, key: Key, description: Option<&'static str>);
/// Callers may provide the unit or a description of the counter being registered. Whether or
/// not a metric can be reregistered to provide a unit/description, if one was already passed
/// or not, as well as how descriptions are used by the underlying recorder, is an
/// implementation detail.
fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>);
/// Registers a gauge.
///
/// Callers may provide a description of the counter being registered. Whether or not a metric
/// can be reregistered to provide a description, if one was already passed or not, as well as
/// how descriptions are used by the underlying recorder, is an implementation detail.
fn register_gauge(&self, key: Key, description: Option<&'static str>);
/// Callers may provide the unit or a description of the gauge being registered. Whether or
/// not a metric can be reregistered to provide a unit/description, if one was already passed
/// or not, as well as how descriptions are used by the underlying recorder, is an
/// implementation detail.
fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>);
/// Registers a histogram.
///
/// Callers may provide a description of the counter being registered. Whether or not a metric
/// can be reregistered to provide a description, if one was already passed or not, as well as
/// how descriptions are used by the underlying recorder, is an implementation detail.
fn register_histogram(&self, key: Key, description: Option<&'static str>);
/// Callers may provide the unit or a description of the histogram being registered. Whether or
/// not a metric can be reregistered to provide a unit/description, if one was already passed
/// or not, as well as how descriptions are used by the underlying recorder, is an
/// implementation detail.
fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>);
/// Increments a counter.
fn increment_counter(&self, key: Key, value: u64);
@ -42,18 +45,22 @@ pub trait Recorder {
fn update_gauge(&self, key: Key, value: f64);
/// Records a histogram.
///
/// The value can be value that implements [`IntoU64`]. By default, `metrics` provides an
/// implementation for both `u64` itself as well as [`Duration`](std::time::Duration).
fn record_histogram(&self, key: Key, value: u64);
}
struct NoopRecorder;
impl Recorder for NoopRecorder {
fn register_counter(&self, _key: Key, _description: Option<&'static str>) {}
fn register_gauge(&self, _key: Key, _description: Option<&'static str>) {}
fn register_histogram(&self, _key: Key, _description: Option<&'static str>) {}
fn register_counter(&self, _key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {
}
fn register_gauge(&self, _key: Key, _unit: Option<Unit>, _description: Option<&'static str>) {}
fn register_histogram(
&self,
_key: Key,
_unit: Option<Unit>,
_description: Option<&'static str>,
) {
}
fn increment_counter(&self, _key: Key, _value: u64) {}
fn update_gauge(&self, _key: Key, _value: f64) {}
fn record_histogram(&self, _key: Key, _value: u64) {}