simplify labels + show registration example + reduce allocs

This commit is contained in:
Toby Lawrence 2020-06-15 22:21:33 -04:00
parent 35316e5df1
commit ce3bef9c84
8 changed files with 224 additions and 184 deletions

View File

@ -1,7 +1,7 @@
use std::thread;
use std::time::Duration;
use metrics::{histogram, increment};
use metrics::{register_counter, register_histogram, histogram, increment};
use metrics_exporter_prometheus::PrometheusBuilder;
use quanta::Clock;
@ -14,6 +14,9 @@ fn main() {
.install()
.expect("failed to install Prometheus recorder");
register_counter!("tcp_server_loops", "The iterations of the TCP server event loop so far.");
register_histogram!("tcp_server_loop_delta_ns", "The time taken for iterations of the TCP server event loop.");
let clock = Clock::new();
let mut last = None;

View File

@ -72,6 +72,7 @@ struct Inner {
quantiles: Vec<Quantile>,
buckets: Vec<u64>,
buckets_by_name: Option<HashMap<String, Vec<u64>>>,
descriptions: RwLock<HashMap<String, &'static str>>,
}
impl Inner {
@ -185,12 +186,22 @@ impl Inner {
.unwrap_or(0);
let mut output = format!(
"# metrics snapshot (ts={}) (prometheus exposition format)",
"# metrics snapshot (ts={}) (prometheus exposition format)\n",
ts
);
let descriptions = self.descriptions.read();
for (name, mut by_labels) in counters.drain() {
output.push_str("\n# TYPE ");
if let Some(desc) = descriptions.get(name.as_str()) {
output.push_str("# HELP ");
output.push_str(name.as_str());
output.push_str(" ");
output.push_str(desc);
output.push_str("\n");
}
output.push_str("# TYPE ");
output.push_str(name.as_str());
output.push_str(" counter\n");
for (labels, value) in by_labels.drain() {
@ -200,10 +211,19 @@ impl Inner {
output.push_str(value.to_string().as_str());
output.push_str("\n");
}
output.push_str("\n");
}
for (name, mut by_labels) in gauges.drain() {
output.push_str("\n# TYPE ");
if let Some(desc) = descriptions.get(name.as_str()) {
output.push_str("# HELP ");
output.push_str(name.as_str());
output.push_str(" ");
output.push_str(desc);
output.push_str("\n");
}
output.push_str("# TYPE ");
output.push_str(name.as_str());
output.push_str(" gauge\n");
for (labels, value) in by_labels.drain() {
@ -213,6 +233,7 @@ impl Inner {
output.push_str(value.to_string().as_str());
output.push_str("\n");
}
output.push_str("\n");
}
let mut sorted_overrides = self
@ -223,11 +244,19 @@ impl Inner {
sorted_overrides.sort_by(|(a, _), (b, _)| b.len().cmp(&a.len()));
for (name, mut by_labels) in distributions.drain() {
if let Some(desc) = descriptions.get(name.as_str()) {
output.push_str("# HELP ");
output.push_str(name.as_str());
output.push_str(" ");
output.push_str(desc);
output.push_str("\n");
}
let has_buckets = sorted_overrides
.iter()
.any(|(k, _)| !self.buckets.is_empty() || name.ends_with(*k));
output.push_str("\n# TYPE ");
output.push_str("# TYPE ");
output.push_str(name.as_str());
output.push_str(" ");
output.push_str(if has_buckets { "histogram" } else { "summary" });
@ -287,6 +316,8 @@ impl Inner {
output.push_str(count.to_string().as_str());
output.push_str("\n");
}
output.push_str("\n");
}
output
@ -294,10 +325,26 @@ impl Inner {
}
/// A Prometheus recorder.
///
/// This recorder should be composed with other recorders or installed globally via
/// [`metrics::set_boxed_recorder`][set_boxed_recorder].
///
///
pub struct PrometheusRecorder {
inner: Arc<Inner>,
}
impl PrometheusRecorder {
fn add_description_if_missing(&self, key: &Key, description: Option<&'static str>) {
if let Some(description) = description {
let mut descriptions = self.inner.descriptions.write();
if !descriptions.contains_key(key.name().as_ref()) {
descriptions.insert(key.name().to_string(), description);
}
}
}
}
/// Builder for creating and installing a Prometheus recorder/exporter.
pub struct PrometheusBuilder {
listen_address: SocketAddr,
@ -415,6 +462,7 @@ impl PrometheusBuilder {
quantiles: self.quantiles.clone(),
buckets: self.buckets.clone(),
buckets_by_name: self.buckets_by_name.clone(),
descriptions: RwLock::new(HashMap::new()),
});
let recorder = PrometheusRecorder {
@ -446,7 +494,8 @@ impl PrometheusBuilder {
}
impl Recorder for PrometheusRecorder {
fn register_counter(&self, key: Key, _description: Option<&'static str>) -> Identifier {
fn register_counter(&self, key: Key, description: Option<&'static str>) -> Identifier {
self.add_description_if_missing(&key, description);
self.inner
.registry()
.get_or_create_identifier(CompositeKey::new(MetricKind::Counter, key), |_| {
@ -454,7 +503,8 @@ impl Recorder for PrometheusRecorder {
})
}
fn register_gauge(&self, key: Key, _description: Option<&'static str>) -> Identifier {
fn register_gauge(&self, key: Key, description: Option<&'static str>) -> Identifier {
self.add_description_if_missing(&key, description);
self.inner
.registry()
.get_or_create_identifier(CompositeKey::new(MetricKind::Gauge, key), |_| {
@ -462,7 +512,8 @@ impl Recorder for PrometheusRecorder {
})
}
fn register_histogram(&self, key: Key, _description: Option<&'static str>) -> Identifier {
fn register_histogram(&self, key: Key, description: Option<&'static str>) -> Identifier {
self.add_description_if_missing(&key, description);
self.inner
.registry()
.get_or_create_identifier(CompositeKey::new(MetricKind::Histogram, key), |_| {
@ -494,18 +545,21 @@ fn key_to_parts(key: Key) -> (String, Vec<String>) {
let sanitize = |c| c == '.' || c == '=' || c == '{' || c == '}' || c == '+' || c == '-';
let name = name.replace(sanitize, "_");
let labels = labels
.into_iter()
.map(Label::into_parts)
.map(|(k, v)| {
format!(
"{}=\"{}\"",
k,
v.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n")
)
.map(|labels| {
labels.into_iter()
.map(Label::into_parts)
.map(|(k, v)| {
format!(
"{}=\"{}\"",
k,
v.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n")
)
})
.collect()
})
.collect();
.unwrap_or_else(|| Vec::new());
(name, labels)
}

View File

@ -1,3 +1,49 @@
//! A [`metrics`][metrics]-compatible exporter that outputs metrics to clients over TCP.
//!
//! This exporter creates a TCP server, that when connected to, will stream individual metrics to
//! the client using a Protocol Buffers encoding.
//!
//! # Backpressure
//! The exporter has configurable buffering, which allows users to trade off how many metrics they
//! want to be queued up at any given time. This buffer limit applies both to incoming metrics, as
//! well as the individual buffers for each connected client.
//!
//! By default, the buffer limit is set at 1024 metrics. When the incoming buffer -- metrics being
//! fed to the exported -- is full, metrics will be dropped. If a client's buffer is full,
//! potentially due to slow network conditions or slow processing, then messages in the client's
//! buffer will be dropped in FIFO order in order to allow the exporter to continue fanning out
//! metrics to clients.
//!
//! If no buffer limit is set, then te exporter will ingest and enqueue as many metrics as possible,
//! potentially up until the point of memory exhaustion. A buffer limit is advised for this reason,
//! even if it is many multiples of the default.
//!
//! # Encoding
//! Metrics are encoded using Protocol Buffers. The protocol file can be found in the repository at
//! `proto/event.proto`.
//!
//! # Usage
//! The TCP exporter can be constructed by creating a [`TcpBuilder], configuring it as needed, and
//! calling [`TcpBuilder::install`] to both spawn the TCP server as well as install the exporter
//! globally.
//!
//! If necessary, the recorder itself can be returned so that it can be composed separately, while
//! still installing the TCP server itself, by calling [`TcpBuilder::build`].
//!
//! ```
//! # use metrics_exporter_tcp::TcpBuilder;
//! # fn direct() {
//! // Install the exporter directly:
//! let builder = TcpBuilder::new();
//! builder.install().expect("failed to install TCP exporter");
//!
//! // Or install the TCP server and get the recorder:
//! let builder = TcpBuilder::new();
//! let recorder = builder.build().expect("failed to install TCP exporter");
//! # }
//! ```
//!
//! [metrics]: https://docs.rs/metrics
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::io::{self, Write};
use std::net::SocketAddr;
@ -49,13 +95,13 @@ impl CompositeKey {
}
}
// Errors that could occur while installing a TCP recorder/exporter.
/// Errors that could occur while installing a TCP recorder/exporter.
#[derive(Debug)]
pub enum Error {
// Creating the networking event loop did not succeed.
/// Creating the networking event loop did not succeed.
Io(io::Error),
// Installing the recorder did not succeed.
/// Installing the recorder did not succeed.
Recorder(SetRecorderError),
}
@ -71,7 +117,8 @@ impl From<SetRecorderError> for Error {
}
}
struct TcpRecorder {
/// A TCP recorder.
pub struct TcpRecorder {
registry: Arc<TcpRegistry>,
tx: Sender<(Identifier, MetricValue)>,
waker: Arc<Waker>,
@ -129,6 +176,17 @@ impl TcpBuilder {
/// An error will be returned if there's an issue with creating the TCP server or with
/// installing the recorder as the global recorder.
pub fn install(self) -> Result<(), Error> {
let recorder = self.build()?;
metrics::set_boxed_recorder(Box::new(recorder))?;
Ok(())
}
/// Builds and installs the exporter, but returns the recorder.
///
/// In most cases, users should prefer to use [`TcpBuilder::install`] to create and install
/// the recorder and exporter automatically for them. If a caller is combining recorders,
/// however, then this method allows the caller the flexibility to do so.
pub fn build(self) -> Result<TcpRecorder, Error> {
let buffer_size = self.buffer_size;
let (tx, rx) = match buffer_size {
None => unbounded(),
@ -149,10 +207,9 @@ impl TcpBuilder {
tx,
waker: Arc::clone(&waker),
};
metrics::set_boxed_recorder(Box::new(recorder))?;
thread::spawn(move || run_transport(registry, poll, waker, listener, rx, buffer_size));
Ok(())
Ok(recorder)
}
}
@ -409,8 +466,12 @@ fn convert_metric_to_protobuf_encoded(
let labels = ckey
.key()
.labels()
.map(|label| (label.key().to_string(), label.value().to_string()))
.collect::<BTreeMap<_, _>>();
.map(|labels| {
labels
.map(|label| (label.key().to_string(), label.value().to_string()))
.collect::<BTreeMap<_, _>>()
})
.unwrap_or_else(|| BTreeMap::new());
let mvalue = match value {
MetricValue::Counter(cv) => proto::metric::Value::Counter(proto::Counter { value: cv }),
MetricValue::Gauge(gv) => proto::metric::Value::Gauge(proto::Gauge { value: gv }),

View File

@ -185,10 +185,8 @@ fn get_expanded_registration(
labels: Vec<(LitStr, Expr)>,
) -> TokenStream {
let register_ident = format_ident!("register_{}", metric_type);
let key = key_to_quoted(key);
let insertable_labels = labels
.into_iter()
.map(|(k, v)| quote! { metrics::Label::new(#k, #v) });
let key = key_to_quoted(key, labels);
let desc = match desc {
Some(desc) => quote! { Some(#desc) },
None => quote! { None },
@ -198,8 +196,7 @@ fn get_expanded_registration(
{
// Only do this work if there's a recorder installed.
if let Some(recorder) = metrics::try_recorder() {
let mlabels = vec![#(#insertable_labels),*];
recorder.#register_ident((#key, mlabels).into(), #desc);
recorder.#register_ident(#key, #desc);
}
}
};
@ -219,17 +216,8 @@ where
{
let register_ident = format_ident!("register_{}", metric_type);
let op_ident = format_ident!("{}_{}", op_type, metric_type);
let key = key_to_quoted(key);
let use_fast_path = can_use_fast_path(&labels);
let composite_key = if labels.is_empty() {
quote! { #key.into() }
} else {
let insertable_labels = labels
.into_iter()
.map(|(k, v)| quote! { metrics::Label::new(#k, #v) });
quote! { (#key, vec![#(#insertable_labels),*]).into() }
};
let key = key_to_quoted(key, labels);
let op_values = if metric_type == "histogram" {
quote! {
@ -251,9 +239,8 @@ where
if let Some(recorder) = metrics::try_recorder() {
// Initialize our fast path cached identifier.
let id = METRICS_INIT.get_or_init(|| {
recorder.#register_ident(#composite_key, None)
recorder.#register_ident(#key, None)
});
recorder.#op_ident(id, #op_values);
}
}
@ -266,8 +253,7 @@ where
{
// Only do this work if there's a recorder installed.
if let Some(recorder) = metrics::try_recorder() {
let id = recorder.#register_ident(#composite_key, None);
let id = recorder.#register_ident(#key, None);
recorder.#op_ident(id, #op_values);
}
}
@ -288,8 +274,8 @@ fn read_key(input: &mut ParseStream) -> Result<Key> {
}
}
fn key_to_quoted(key: Key) -> proc_macro2::TokenStream {
match key {
fn key_to_quoted(key: Key, labels: Vec<(LitStr, Expr)>) -> proc_macro2::TokenStream {
let name = match key {
Key::NotScoped(s) => {
quote! { #s }
},
@ -298,6 +284,17 @@ fn key_to_quoted(key: Key) -> proc_macro2::TokenStream {
format!("{}.{}", std::module_path!().replace("::", "."), #s)
}
},
};
if labels.is_empty() {
quote! { metrics::Key::from_name(#name) }
} else {
let insertable_labels = labels
.into_iter()
.map(|(k, v)| quote! { metrics::Label::new(#k, #v) });
quote! {
metrics::Key::from_name_and_labels(#name, vec![#(#insertable_labels),*])
}
}
}

View File

@ -1,4 +1,4 @@
use crate::{IntoLabels, Label, ScopedString};
use crate::{Label, ScopedString};
use std::{fmt, slice::Iter};
/// A metric key.
@ -8,7 +8,7 @@ use std::{fmt, slice::Iter};
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct Key {
name: ScopedString,
labels: Vec<Label>,
labels: Option<Vec<Label>>,
}
impl Key {
@ -19,71 +19,59 @@ impl Key {
{
Key {
name: name.into(),
labels: Vec::new(),
labels: None,
}
}
/// Creates a `Key` from a name and vector of `Label`s.
pub fn from_name_and_labels<N, L>(name: N, labels: L) -> Self
pub fn from_name_and_labels<N>(name: N, labels: Vec<Label>) -> Self
where
N: Into<ScopedString>,
L: IntoLabels,
{
Key {
name: name.into(),
labels: labels.into_labels(),
labels: Some(labels),
}
}
/// Adds a new set of labels to this key.
///
/// New labels will be appended to any existing labels.
pub fn add_labels<L>(&mut self, new_labels: L)
where
L: IntoLabels,
{
self.labels.extend(new_labels.into_labels());
}
/// Name of this key.
pub fn name(&self) -> ScopedString {
self.name.clone()
pub fn name(&self) -> &ScopedString {
&self.name
}
/// Labels of this key, if they exist.
pub fn labels(&self) -> Iter<Label> {
self.labels.iter()
}
/// Maps the name of this `Key` to a new name.
pub fn map_name<F, S>(self, f: F) -> Self
where
F: FnOnce(ScopedString) -> S,
S: Into<ScopedString>,
{
Key {
name: f(self.name).into(),
labels: self.labels,
}
pub fn labels(&self) -> Option<Iter<Label>> {
self.labels.as_ref().map(|xs| xs.iter())
}
/// Consumes this `Key`, returning the name and any labels.
pub fn into_parts(self) -> (ScopedString, Vec<Label>) {
pub fn into_parts(self) -> (ScopedString, Option<Vec<Label>>) {
(self.name, self.labels)
}
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.labels.is_empty() {
write!(f, "Key({})", self.name)
} else {
let kv_pairs = self
.labels
.iter()
.map(|label| format!("{} = {}", label.0, label.1))
.collect::<Vec<_>>();
write!(f, "Key({}, [{}])", self.name, kv_pairs.join(", "))
match &self.labels {
None => write!(f, "Key({})", self.name),
Some(labels) => {
write!(f, "Key({}", self.name)?;
if !labels.is_empty() {
let mut first = true;
write!(f, ", [")?;
for label in labels {
if first {
write!(f, "{} = {}", label.0, label.1)?;
first = false;
} else {
write!(f, ", {} = {}", label.0, label.1)?;
}
}
write!(f, "]")?;
}
write!(f, ")")
},
}
}
}
@ -100,18 +88,34 @@ impl From<&'static str> for Key {
}
}
impl From<ScopedString> for Key {
fn from(name: ScopedString) -> Key {
Key::from_name(name)
}
}
#[cfg(test)]
mod tests {
use super::Key;
use crate::Label;
impl<K, L> From<(K, L)> for Key
where
K: Into<ScopedString>,
L: IntoLabels,
{
fn from(parts: (K, L)) -> Key {
Key::from_name_and_labels(parts.0, parts.1)
#[test]
fn test_key_proper_display() {
let key1 = Key::from_name("foobar");
let result1 = key1.to_string();
assert_eq!(result1, "Key(foobar)");
let key2 = Key::from_name_and_labels("foobar", vec![Label::new("system", "http")]);
let result2 = key2.to_string();
assert_eq!(result2, "Key(foobar, [system = http])");
let key3 = Key::from_name_and_labels("foobar", vec![
Label::new("system", "http"),
Label::new("user", "joe"),
]);
let result3 = key3.to_string();
assert_eq!(result3, "Key(foobar, [system = http, user = joe])");
let key4 = Key::from_name_and_labels("foobar", vec![
Label::new("black", "black"),
Label::new("lives", "lives"),
Label::new("matter", "matter"),
]);
let result4 = key4.to_string();
assert_eq!(result4, "Key(foobar, [black = black, lives = lives, matter = matter])");
}
}
}

View File

@ -28,46 +28,4 @@ impl Label {
pub fn into_parts(self) -> (ScopedString, ScopedString) {
(self.0, self.1)
}
}
impl<K, V> From<(K, V)> for Label
where
K: Into<ScopedString>,
V: Into<ScopedString>,
{
fn from(pair: (K, V)) -> Label {
Label::new(pair.0, pair.1)
}
}
impl<K, V> From<&(K, V)> for Label
where
K: Into<ScopedString> + Clone,
V: Into<ScopedString> + Clone,
{
fn from(pair: &(K, V)) -> Label {
Label::new(pair.0.clone(), pair.1.clone())
}
}
/// A value that can be converted to `Label`s.
pub trait IntoLabels {
/// Consumes this value, turning it into a vector of `Label`s.
fn into_labels(self) -> Vec<Label>;
}
impl IntoLabels for Vec<Label> {
fn into_labels(self) -> Vec<Label> {
self
}
}
impl<T, L> IntoLabels for &T
where
Self: IntoIterator<Item = L>,
L: Into<Label>,
{
fn into_labels(self) -> Vec<Label> {
self.into_iter().map(|l| l.into()).collect()
}
}
}

View File

@ -184,9 +184,6 @@ pub use self::label::*;
mod recorder;
pub use self::recorder::*;
mod macros;
pub use self::macros::*;
/// Registers a counter.
///
/// Counters represent a single value that can only be incremented over time, or reset to zero.

View File

@ -1,34 +0,0 @@
/// Helper macro for generating a set of labels.
///
/// While a `Label` can be generated manually, most users will tend towards the key => value format
/// commonly used for defining hashes/maps in many programming languages. This macro allows users
/// to do the exact same thing in calls that depend on [`metrics::IntoLabels`].
///
/// # Examples
/// ```rust
/// # #[macro_use] extern crate metrics;
/// # use metrics::IntoLabels;
/// fn takes_labels<L: IntoLabels>(name: &str, labels: L) {
/// println!("name: {} labels: {:?}", name, labels.into_labels());
/// }
///
/// takes_labels("requests_processed", labels!("request_type" => "admin"));
/// ```
#[macro_export]
macro_rules! labels {
(@ { $($out:expr),* $(,)* } $(,)*) => {
std::vec![ $($out),* ]
};
(@ { } $k:expr => $v:expr, $($rest:tt)*) => {
$crate::labels!(@ { $crate::Label::new($k, $v) } $($rest)*)
};
(@ { $($out:expr),+ } $k:expr => $v:expr, $($rest:tt)*) => {
$crate::labels!(@ { $($out),+, $crate::Label::new($k, $v) } $($rest)*)
};
($($args:tt)*) => {
$crate::labels!(@ { } $($args)*, )
};
}