From 0cdee408514e2a717e306a8536d20cbcb5902326 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Tue, 9 Nov 2021 14:23:42 +0100 Subject: [PATCH] Metrics cleanup --- lib/src/grpc_plugin_source.rs | 18 ++--- lib/src/metrics.rs | 134 +++++++++++++++++++++++----------- lib/src/postgres_target.rs | 21 +++--- 3 files changed, 110 insertions(+), 63 deletions(-) diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs index 6fafb77..f9fdfa1 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/lib/src/grpc_plugin_source.rs @@ -154,12 +154,12 @@ pub async fn process_events( let snapshot_source = config.snapshot_source.clone(); let metrics_sender = metrics_sender.clone(); tokio::spawn(async move { - let mut metric_retries = metrics_sender.register_counter(format!( + let mut metric_retries = metrics_sender.register_u64(format!( "grpc_source_{}_connection_retries", grpc_source.name )); let metric_status = - metrics_sender.register_tag(format!("grpc_source_{}_status", grpc_source.name)); + metrics_sender.register_string(format!("grpc_source_{}_status", grpc_source.name)); // Continuously reconnect on failure loop { @@ -186,15 +186,13 @@ pub async fn process_events( } let mut latest_write = HashMap::, (u64, u64)>::new(); - let mut metric_account_writes = - metrics_sender.register_rate_counter("grpc_account_writes".into()); - let mut metric_account_queue = - metrics_sender.register_rate_counter("account_write_queue".into()); - let mut metric_slot_queue = metrics_sender.register_rate_counter("slot_update_queue".into()); - let mut metric_slot_updates = metrics_sender.register_rate_counter("grpc_slot_updates".into()); - let mut metric_snapshots = metrics_sender.register_rate_counter("grpc_snapshots".into()); + let mut metric_account_writes = metrics_sender.register_u64("grpc_account_writes".into()); + let mut metric_account_queue = metrics_sender.register_u64("account_write_queue".into()); + let mut metric_slot_queue = 
metrics_sender.register_u64("slot_update_queue".into()); + let mut metric_slot_updates = metrics_sender.register_u64("grpc_slot_updates".into()); + let mut metric_snapshots = metrics_sender.register_u64("grpc_snapshots".into()); let mut metric_snapshot_account_writes = - metrics_sender.register_rate_counter("grpc_snapshot_account_writes".into()); + metrics_sender.register_u64("grpc_snapshot_account_writes".into()); loop { let msg = msg_receiver.recv().await.expect("sender must not close"); diff --git a/lib/src/metrics.rs b/lib/src/metrics.rs index 0829aef..59c269c 100644 --- a/lib/src/metrics.rs +++ b/lib/src/metrics.rs @@ -7,16 +7,41 @@ use { #[derive(Debug)] enum Value { - Counter(Arc<atomic::AtomicI64>), - RateCounter(Arc<atomic::AtomicU64>), - Tag(Arc<Mutex<String>>), + U64(Arc<atomic::AtomicU64>), + I64(Arc<atomic::AtomicI64>), + String(Arc<Mutex<String>>), +} + +#[derive(Debug)] +enum PrevValue { + U64(u64), + I64(i64), + String(String), } #[derive(Clone)] -pub struct MetricCounter { +pub struct MetricU64 { + value: Arc<atomic::AtomicU64>, +} +impl MetricU64 { + pub fn set(&mut self, value: u64) { + self.value.store(value, atomic::Ordering::Release); + } + + pub fn increment(&mut self) { + self.value.fetch_add(1, atomic::Ordering::AcqRel); + } + + pub fn decrement(&mut self) { + self.value.fetch_sub(1, atomic::Ordering::AcqRel); + } +} + +#[derive(Clone)] +pub struct MetricI64 { value: Arc<atomic::AtomicI64>, } -impl MetricCounter { +impl MetricI64 { pub fn set(&mut self, value: i64) { self.value.store(value, atomic::Ordering::Release); } @@ -31,25 +56,11 @@ impl MetricCounter { } #[derive(Clone)] -pub struct MetricRateCounter { - value: Arc<atomic::AtomicU64>, -} -impl MetricRateCounter { - pub fn set(&mut self, value: u64) { - self.value.store(value, atomic::Ordering::Release); - } - - pub fn increment(&mut self) { - self.value.fetch_add(1, atomic::Ordering::AcqRel); - } -} - -#[derive(Clone)] -pub struct MetricTag { +pub struct MetricString { value: Arc<Mutex<String>>, } -impl MetricTag { +impl MetricString { pub fn set(&self, value: String) { *self.value.lock().unwrap() = value; } @@ -61,31 +72,31 @@ pub struct Metrics { } 
impl Metrics { - pub fn register_counter(&self, name: String) -> MetricCounter { - let value = Arc::new(atomic::AtomicI64::new(0)); - self.registry - .write() - .unwrap() - .insert(name, Value::Counter(value.clone())); - MetricCounter { value } - } - - pub fn register_rate_counter(&self, name: String) -> MetricRateCounter { + pub fn register_u64(&self, name: String) -> MetricU64 { let value = Arc::new(atomic::AtomicU64::new(0)); self.registry .write() .unwrap() - .insert(name, Value::RateCounter(value.clone())); - MetricRateCounter { value } + .insert(name, Value::U64(value.clone())); + MetricU64 { value } } - pub fn register_tag(&self, name: String) -> MetricTag { + pub fn register_i64(&self, name: String) -> MetricI64 { + let value = Arc::new(atomic::AtomicI64::new(0)); + self.registry + .write() + .unwrap() + .insert(name, Value::I64(value.clone())); + MetricI64 { value } + } + + pub fn register_string(&self, name: String) -> MetricString { let value = Arc::new(Mutex::new(String::new())); self.registry .write() .unwrap() - .insert(name, Value::Tag(value.clone())); - MetricTag { value } + .insert(name, Value::String(value.clone())); + MetricString { value } } } @@ -96,6 +107,7 @@ pub fn start() -> Metrics { let registry_c = Arc::clone(&registry); tokio::spawn(async move { + let mut previous_values = HashMap::<String, PrevValue>::new(); loop { write_interval.tick().await; @@ -103,14 +115,54 @@ pub fn start() -> Metrics { // acquire any interior locks. 
let metrics = registry_c.read().unwrap(); for (name, value) in metrics.iter() { + let previous_value = previous_values.get_mut(name); match value { - Value::Counter(v) => { - info!("metric: {}: {}", name, v.load(atomic::Ordering::Acquire)) + Value::U64(v) => { + let new_value = v.load(atomic::Ordering::Acquire); + let previous_value = if let Some(PrevValue::U64(v)) = previous_value { + let prev = *v; + *v = new_value; + prev + } else { + previous_values.insert(name.clone(), PrevValue::U64(new_value)); + 0 + }; + let diff = new_value.wrapping_sub(previous_value) as i64; + info!("metric: {}: {} ({:+})", name, new_value, diff); } - Value::RateCounter(v) => { - info!("metric: {}: {}", name, v.load(atomic::Ordering::Acquire)) + Value::I64(v) => { + let new_value = v.load(atomic::Ordering::Acquire); + let previous_value = if let Some(PrevValue::I64(v)) = previous_value { + let prev = *v; + *v = new_value; + prev + } else { + previous_values.insert(name.clone(), PrevValue::I64(new_value)); + 0 + }; + let diff = new_value - previous_value; + info!("metric: {}: {} ({:+})", name, new_value, diff); + } + Value::String(v) => { + let new_value = v.lock().unwrap(); + let previous_value = if let Some(PrevValue::String(v)) = previous_value { + let mut prev = new_value.clone(); + std::mem::swap(&mut prev, v); + prev + } else { + previous_values + .insert(name.clone(), PrevValue::String(new_value.clone())); + "".into() + }; + if *new_value == previous_value { + info!("metric: {}: {} (unchanged)", name, &*new_value); + } else { + info!( + "metric: {}: {} (before: {})", + name, &*new_value, previous_value + ); + } } - Value::Tag(v) => info!("metric: {}: {}", name, &*v.lock().unwrap()), } } } diff --git a/lib/src/postgres_target.rs b/lib/src/postgres_target.rs index 7d15662..17bfa1d 100644 --- a/lib/src/postgres_target.rs +++ b/lib/src/postgres_target.rs @@ -7,8 +7,8 @@ use crate::{metrics, AccountTables, AccountWrite, PostgresConfig, SlotStatus, Sl async fn postgres_connection( 
config: &PostgresConfig, - metric_retries: metrics::MetricCounter, - metric_live: metrics::MetricCounter, + metric_retries: metrics::MetricU64, + metric_live: metrics::MetricU64, ) -> Result>, anyhow::Error> { let (tx, rx) = async_channel::unbounded(); @@ -94,8 +94,8 @@ struct SlotsProcessing { newest_nonfinal_slot: Option, newest_final_slot: Option, cleanup_table_sql: Vec, - metric_update_rooted: metrics::MetricRateCounter, - metric_update_uncles: metrics::MetricRateCounter, + metric_update_rooted: metrics::MetricU64, + metric_update_uncles: metrics::MetricU64, } impl SlotsProcessing { @@ -251,8 +251,8 @@ pub async fn init( // slot updates are not parallel because their order matters let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::(); - let metric_con_retries = metrics_sender.register_counter("postgres_connection_retries".into()); - let metric_con_live = metrics_sender.register_counter("postgres_connections_alive".into()); + let metric_con_retries = metrics_sender.register_u64("postgres_connection_retries".into()); + let metric_con_live = metrics_sender.register_u64("postgres_connections_alive".into()); let postgres_slots = postgres_connection(&config, metric_con_retries.clone(), metric_con_live.clone()).await?; @@ -314,14 +314,11 @@ pub async fn init( newest_nonfinal_slot: None, newest_final_slot: None, cleanup_table_sql: Vec::::new(), - metric_update_rooted: metrics_sender - .register_rate_counter("postgres_slot_update_rooted".into()), - metric_update_uncles: metrics_sender - .register_rate_counter("postgres_slot_update_uncles".into()), + metric_update_rooted: metrics_sender.register_u64("postgres_slot_update_rooted".into()), + metric_update_uncles: metrics_sender.register_u64("postgres_slot_update_uncles".into()), }; let mut client_opt = None; - let mut metric_retries = - metrics_sender.register_rate_counter("postgres_slot_update_retries".into()); + let mut metric_retries = 
metrics_sender.register_u64("postgres_slot_update_retries".into()); slots_processing.set_cleanup_tables(&table_names);