From ce3bef9c84c78528659dcfe7628ee5c63aaa39cd Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Mon, 15 Jun 2020 22:21:33 -0400 Subject: [PATCH] simplify labels + show registration example + reduce allocs --- .../examples/prometheus_server.rs | 5 +- metrics-exporter-prometheus/src/lib.rs | 90 +++++++++++--- metrics-exporter-tcp/src/lib.rs | 77 ++++++++++-- metrics-macros/src/lib.rs | 41 +++---- metrics/src/key.rs | 114 +++++++++--------- metrics/src/label.rs | 44 +------ metrics/src/lib.rs | 3 - metrics/src/macros.rs | 34 ------ 8 files changed, 224 insertions(+), 184 deletions(-) delete mode 100644 metrics/src/macros.rs diff --git a/metrics-exporter-prometheus/examples/prometheus_server.rs b/metrics-exporter-prometheus/examples/prometheus_server.rs index 13412a5..00d0d09 100644 --- a/metrics-exporter-prometheus/examples/prometheus_server.rs +++ b/metrics-exporter-prometheus/examples/prometheus_server.rs @@ -1,7 +1,7 @@ use std::thread; use std::time::Duration; -use metrics::{histogram, increment}; +use metrics::{register_counter, register_histogram, histogram, increment}; use metrics_exporter_prometheus::PrometheusBuilder; use quanta::Clock; @@ -14,6 +14,9 @@ fn main() { .install() .expect("failed to install Prometheus recorder"); + register_counter!("tcp_server_loops", "The iterations of the TCP server event loop so far."); + register_histogram!("tcp_server_loop_delta_ns", "The time taken for iterations of the TCP server event loop."); + let clock = Clock::new(); let mut last = None; diff --git a/metrics-exporter-prometheus/src/lib.rs b/metrics-exporter-prometheus/src/lib.rs index 076f654..b543741 100644 --- a/metrics-exporter-prometheus/src/lib.rs +++ b/metrics-exporter-prometheus/src/lib.rs @@ -72,6 +72,7 @@ struct Inner { quantiles: Vec, buckets: Vec, buckets_by_name: Option>>, + descriptions: RwLock>, } impl Inner { @@ -185,12 +186,22 @@ impl Inner { .unwrap_or(0); let mut output = format!( - "# metrics snapshot (ts={}) (prometheus exposition format)", + "# metrics snapshot (ts={}) (prometheus exposition format)\n", ts ); + let descriptions = self.descriptions.read(); + for (name, mut by_labels) in counters.drain() { - output.push_str("\n# TYPE "); + if let Some(desc) = descriptions.get(name.as_str()) { + output.push_str("# HELP "); + output.push_str(name.as_str()); + output.push_str(" "); + output.push_str(desc); + output.push_str("\n"); + } + + output.push_str("# TYPE "); output.push_str(name.as_str()); output.push_str(" counter\n"); for (labels, value) in by_labels.drain() { @@ -200,10 +211,19 @@ impl Inner { output.push_str(value.to_string().as_str()); output.push_str("\n"); } + output.push_str("\n"); } for (name, mut by_labels) in gauges.drain() { - output.push_str("\n# TYPE "); + if let Some(desc) = descriptions.get(name.as_str()) { + output.push_str("# HELP "); + output.push_str(name.as_str()); + output.push_str(" "); + output.push_str(desc); + output.push_str("\n"); + } + + output.push_str("# TYPE "); output.push_str(name.as_str()); output.push_str(" gauge\n"); for (labels, value) in by_labels.drain() { @@ -213,6 +233,7 @@ impl Inner { output.push_str(value.to_string().as_str()); output.push_str("\n"); } + output.push_str("\n"); } let mut sorted_overrides = self @@ -223,11 +244,19 @@ impl Inner { sorted_overrides.sort_by(|(a, _), (b, _)| b.len().cmp(&a.len())); for (name, mut by_labels) in distributions.drain() { + if let Some(desc) = descriptions.get(name.as_str()) { + output.push_str("# HELP "); + output.push_str(name.as_str()); + output.push_str(" "); + output.push_str(desc); + output.push_str("\n"); + } + let has_buckets = sorted_overrides .iter() .any(|(k, _)| !self.buckets.is_empty() || name.ends_with(*k)); - output.push_str("\n# TYPE "); + output.push_str("# TYPE "); output.push_str(name.as_str()); output.push_str(" "); output.push_str(if has_buckets { "histogram" } else { "summary" }); @@ -287,6 +316,8 @@ impl Inner { output.push_str(count.to_string().as_str()); output.push_str("\n"); } + + output.push_str("\n"); } output @@ -294,10 +325,26 @@ impl Inner { } /// A Prometheus recorder. +/// +/// This recorder should be composed with other recorders or installed globally via +/// [`metrics::set_boxed_recorder`][set_boxed_recorder]. +/// +/// pub struct PrometheusRecorder { inner: Arc, } +impl PrometheusRecorder { + fn add_description_if_missing(&self, key: &Key, description: Option<&'static str>) { + if let Some(description) = description { + let mut descriptions = self.inner.descriptions.write(); + if !descriptions.contains_key(key.name().as_ref()) { + descriptions.insert(key.name().to_string(), description); + } + } + } +} + /// Builder for creating and installing a Prometheus recorder/exporter. pub struct PrometheusBuilder { listen_address: SocketAddr, @@ -415,6 +462,7 @@ impl PrometheusBuilder { quantiles: self.quantiles.clone(), buckets: self.buckets.clone(), buckets_by_name: self.buckets_by_name.clone(), + descriptions: RwLock::new(HashMap::new()), }); let recorder = PrometheusRecorder { @@ -446,7 +494,8 @@ impl PrometheusBuilder { } impl Recorder for PrometheusRecorder { - fn register_counter(&self, key: Key, _description: Option<&'static str>) -> Identifier { + fn register_counter(&self, key: Key, description: Option<&'static str>) -> Identifier { + self.add_description_if_missing(&key, description); self.inner .registry() .get_or_create_identifier(CompositeKey::new(MetricKind::Counter, key), |_| { @@ -454,7 +503,8 @@ impl Recorder for PrometheusRecorder { }) } - fn register_gauge(&self, key: Key, _description: Option<&'static str>) -> Identifier { + fn register_gauge(&self, key: Key, description: Option<&'static str>) -> Identifier { + self.add_description_if_missing(&key, description); self.inner .registry() .get_or_create_identifier(CompositeKey::new(MetricKind::Gauge, key), |_| { @@ -462,7 +512,8 @@ impl Recorder for PrometheusRecorder { }) } - fn register_histogram(&self, key: Key, _description: Option<&'static str>) -> Identifier { + fn register_histogram(&self, key: Key, description: Option<&'static str>) -> Identifier { + self.add_description_if_missing(&key, description); self.inner .registry() .get_or_create_identifier(CompositeKey::new(MetricKind::Histogram, key), |_| { @@ -494,18 +545,21 @@ fn key_to_parts(key: Key) -> (String, Vec) { let sanitize = |c| c == '.' || c == '=' || c == '{' || c == '}' || c == '+' || c == '-'; let name = name.replace(sanitize, "_"); let labels = labels - .into_iter() - .map(Label::into_parts) - .map(|(k, v)| { - format!( - "{}=\"{}\"", - k, - v.replace("\\", "\\\\") - .replace("\"", "\\\"") - .replace("\n", "\\n") - ) + .map(|labels| { + labels.into_iter() + .map(Label::into_parts) + .map(|(k, v)| { + format!( + "{}=\"{}\"", + k, + v.replace("\\", "\\\\") + .replace("\"", "\\\"") + .replace("\n", "\\n") + ) + }) + .collect() }) - .collect(); + .unwrap_or_else(|| Vec::new()); (name, labels) } diff --git a/metrics-exporter-tcp/src/lib.rs b/metrics-exporter-tcp/src/lib.rs index f5ec447..346a272 100644 --- a/metrics-exporter-tcp/src/lib.rs +++ b/metrics-exporter-tcp/src/lib.rs @@ -1,3 +1,49 @@ +//! A [`metrics`][metrics]-compatible exporter that outputs metrics to clients over TCP. +//! +//! This exporter creates a TCP server, that when connected to, will stream individual metrics to +//! the client using a Protocol Buffers encoding. +//! +//! # Backpressure +//! The exporter has configurable buffering, which allows users to trade off how many metrics they +//! want to be queued up at any given time. This buffer limit applies both to incoming metrics, as +//! well as the individual buffers for each connected client. +//! +//! By default, the buffer limit is set at 1024 metrics. When the incoming buffer -- metrics being +//! fed to the exported -- is full, metrics will be dropped. If a client's buffer is full, +//! potentially due to slow network conditions or slow processing, then messages in the client's +//! buffer will be dropped in FIFO order in order to allow the exporter to continue fanning out +//! metrics to clients. +//! +//! If no buffer limit is set, then te exporter will ingest and enqueue as many metrics as possible, +//! potentially up until the point of memory exhaustion. A buffer limit is advised for this reason, +//! even if it is many multiples of the default. +//! +//! # Encoding +//! Metrics are encoded using Protocol Buffers. The protocol file can be found in the repository at +//! `proto/event.proto`. +//! +//! # Usage +//! The TCP exporter can be constructed by creating a [`TcpBuilder], configuring it as needed, and +//! calling [`TcpBuilder::install`] to both spawn the TCP server as well as install the exporter +//! globally. +//! +//! If necessary, the recorder itself can be returned so that it can be composed separately, while +//! still installing the TCP server itself, by calling [`TcpBuilder::build`]. +//! +//! ``` +//! # use metrics_exporter_tcp::TcpBuilder; +//! # fn direct() { +//! // Install the exporter directly: +//! let builder = TcpBuilder::new(); +//! builder.install().expect("failed to install TCP exporter"); +//! +//! // Or install the TCP server and get the recorder: +//! let builder = TcpBuilder::new(); +//! let recorder = builder.build().expect("failed to install TCP exporter"); +//! # } +//! ``` +//! +//! [metrics]: https://docs.rs/metrics use std::collections::{BTreeMap, HashMap, VecDeque}; use std::io::{self, Write}; use std::net::SocketAddr; @@ -49,13 +95,13 @@ impl CompositeKey { } } -// Errors that could occur while installing a TCP recorder/exporter. +/// Errors that could occur while installing a TCP recorder/exporter. #[derive(Debug)] pub enum Error { - // Creating the networking event loop did not succeed. + /// Creating the networking event loop did not succeed. Io(io::Error), - // Installing the recorder did not succeed. + /// Installing the recorder did not succeed. Recorder(SetRecorderError), } @@ -71,7 +117,8 @@ impl From for Error { } } -struct TcpRecorder { +/// A TCP recorder. +pub struct TcpRecorder { registry: Arc, tx: Sender<(Identifier, MetricValue)>, waker: Arc, @@ -129,6 +176,17 @@ impl TcpBuilder { /// An error will be returned if there's an issue with creating the TCP server or with /// installing the recorder as the global recorder. pub fn install(self) -> Result<(), Error> { + let recorder = self.build()?; + metrics::set_boxed_recorder(Box::new(recorder))?; + Ok(()) + } + + /// Builds and installs the exporter, but returns the recorder. + /// + /// In most cases, users should prefer to use [`TcpBuilder::install`] to create and install + /// the recorder and exporter automatically for them. If a caller is combining recorders, + /// however, then this method allows the caller the flexibility to do so. + pub fn build(self) -> Result { let buffer_size = self.buffer_size; let (tx, rx) = match buffer_size { None => unbounded(), @@ -149,10 +207,9 @@ impl TcpBuilder { tx, waker: Arc::clone(&waker), }; - metrics::set_boxed_recorder(Box::new(recorder))?; thread::spawn(move || run_transport(registry, poll, waker, listener, rx, buffer_size)); - Ok(()) + Ok(recorder) } } @@ -409,8 +466,12 @@ fn convert_metric_to_protobuf_encoded( let labels = ckey .key() .labels() - .map(|label| (label.key().to_string(), label.value().to_string())) - .collect::>(); + .map(|labels| { + labels + .map(|label| (label.key().to_string(), label.value().to_string())) + .collect::>() + }) + .unwrap_or_else(|| BTreeMap::new()); let mvalue = match value { MetricValue::Counter(cv) => proto::metric::Value::Counter(proto::Counter { value: cv }), MetricValue::Gauge(gv) => proto::metric::Value::Gauge(proto::Gauge { value: gv }), diff --git a/metrics-macros/src/lib.rs b/metrics-macros/src/lib.rs index 649926f..ce04547 100644 --- a/metrics-macros/src/lib.rs +++ b/metrics-macros/src/lib.rs @@ -185,10 +185,8 @@ fn get_expanded_registration( labels: Vec<(LitStr, Expr)>, ) -> TokenStream { let register_ident = format_ident!("register_{}", metric_type); - let key = key_to_quoted(key); - let insertable_labels = labels - .into_iter() - .map(|(k, v)| quote! { metrics::Label::new(#k, #v) }); + let key = key_to_quoted(key, labels); + let desc = match desc { Some(desc) => quote! { Some(#desc) }, None => quote! { None }, @@ -198,8 +196,7 @@ fn get_expanded_registration( { // Only do this work if there's a recorder installed. if let Some(recorder) = metrics::try_recorder() { - let mlabels = vec![#(#insertable_labels),*]; - recorder.#register_ident((#key, mlabels).into(), #desc); + recorder.#register_ident(#key, #desc); } } }; @@ -219,17 +216,8 @@ where { let register_ident = format_ident!("register_{}", metric_type); let op_ident = format_ident!("{}_{}", op_type, metric_type); - let key = key_to_quoted(key); - let use_fast_path = can_use_fast_path(&labels); - let composite_key = if labels.is_empty() { - quote! { #key.into() } - } else { - let insertable_labels = labels - .into_iter() - .map(|(k, v)| quote! { metrics::Label::new(#k, #v) }); - quote! { (#key, vec![#(#insertable_labels),*]).into() } - }; + let key = key_to_quoted(key, labels); let op_values = if metric_type == "histogram" { quote! { @@ -251,9 +239,8 @@ where if let Some(recorder) = metrics::try_recorder() { // Initialize our fast path cached identifier. let id = METRICS_INIT.get_or_init(|| { - recorder.#register_ident(#composite_key, None) + recorder.#register_ident(#key, None) }); - recorder.#op_ident(id, #op_values); } } @@ -266,8 +253,7 @@ where { // Only do this work if there's a recorder installed. if let Some(recorder) = metrics::try_recorder() { - let id = recorder.#register_ident(#composite_key, None); - + let id = recorder.#register_ident(#key, None); recorder.#op_ident(id, #op_values); } } @@ -288,8 +274,8 @@ fn read_key(input: &mut ParseStream) -> Result { } } -fn key_to_quoted(key: Key) -> proc_macro2::TokenStream { - match key { +fn key_to_quoted(key: Key, labels: Vec<(LitStr, Expr)>) -> proc_macro2::TokenStream { + let name = match key { Key::NotScoped(s) => { quote! { #s } }, @@ -298,6 +284,17 @@ fn key_to_quoted(key: Key) -> proc_macro2::TokenStream { format!("{}.{}", std::module_path!().replace("::", "."), #s) } }, + }; + + if labels.is_empty() { + quote! { metrics::Key::from_name(#name) } + } else { + let insertable_labels = labels + .into_iter() + .map(|(k, v)| quote! { metrics::Label::new(#k, #v) }); + quote! { + metrics::Key::from_name_and_labels(#name, vec![#(#insertable_labels),*]) + } } } diff --git a/metrics/src/key.rs b/metrics/src/key.rs index 0d4b89f..1c4e1a5 100644 --- a/metrics/src/key.rs +++ b/metrics/src/key.rs @@ -1,4 +1,4 @@ -use crate::{IntoLabels, Label, ScopedString}; +use crate::{Label, ScopedString}; use std::{fmt, slice::Iter}; /// A metric key. @@ -8,7 +8,7 @@ use std::{fmt, slice::Iter}; #[derive(PartialEq, Eq, Hash, Clone, Debug)] pub struct Key { name: ScopedString, - labels: Vec