Metrics cleanup

This commit is contained in:
Christian Kamm 2021-11-09 14:23:42 +01:00
parent 4d5f0c5d10
commit 0cdee40851
3 changed files with 110 additions and 63 deletions

View File

@ -154,12 +154,12 @@ pub async fn process_events(
let snapshot_source = config.snapshot_source.clone();
let metrics_sender = metrics_sender.clone();
tokio::spawn(async move {
let mut metric_retries = metrics_sender.register_counter(format!(
let mut metric_retries = metrics_sender.register_u64(format!(
"grpc_source_{}_connection_retries",
grpc_source.name
));
let metric_status =
metrics_sender.register_tag(format!("grpc_source_{}_status", grpc_source.name));
metrics_sender.register_string(format!("grpc_source_{}_status", grpc_source.name));
// Continuously reconnect on failure
loop {
@ -186,15 +186,13 @@ pub async fn process_events(
}
let mut latest_write = HashMap::<Vec<u8>, (u64, u64)>::new();
let mut metric_account_writes =
metrics_sender.register_rate_counter("grpc_account_writes".into());
let mut metric_account_queue =
metrics_sender.register_rate_counter("account_write_queue".into());
let mut metric_slot_queue = metrics_sender.register_rate_counter("slot_update_queue".into());
let mut metric_slot_updates = metrics_sender.register_rate_counter("grpc_slot_updates".into());
let mut metric_snapshots = metrics_sender.register_rate_counter("grpc_snapshots".into());
let mut metric_account_writes = metrics_sender.register_u64("grpc_account_writes".into());
let mut metric_account_queue = metrics_sender.register_u64("account_write_queue".into());
let mut metric_slot_queue = metrics_sender.register_u64("slot_update_queue".into());
let mut metric_slot_updates = metrics_sender.register_u64("grpc_slot_updates".into());
let mut metric_snapshots = metrics_sender.register_u64("grpc_snapshots".into());
let mut metric_snapshot_account_writes =
metrics_sender.register_rate_counter("grpc_snapshot_account_writes".into());
metrics_sender.register_u64("grpc_snapshot_account_writes".into());
loop {
let msg = msg_receiver.recv().await.expect("sender must not close");

View File

@ -7,16 +7,41 @@ use {
#[derive(Debug)]
enum Value {
Counter(Arc<atomic::AtomicI64>),
RateCounter(Arc<atomic::AtomicU64>),
Tag(Arc<Mutex<String>>),
U64(Arc<atomic::AtomicU64>),
I64(Arc<atomic::AtomicI64>),
String(Arc<Mutex<String>>),
}
/// Value of a metric as it was recorded during the previous reporting pass.
///
/// The reporting task keeps one of these per metric name so that the next
/// pass can log how the metric changed. There is one variant for each kind
/// of `Value`.
#[derive(Debug)]
enum PrevValue {
/// Previous reading of a `Value::U64` metric.
U64(u64),
/// Previous reading of a `Value::I64` metric.
I64(i64),
/// Previous contents of a `Value::String` metric.
String(String),
}
#[derive(Clone)]
pub struct MetricCounter {
/// Handle for updating an unsigned 64-bit metric.
///
/// The value lives in a reference-counted `AtomicU64`, so every holder of the
/// same `Arc` observes the updates made through this handle.
pub struct MetricU64 {
    value: Arc<atomic::AtomicU64>,
}

impl MetricU64 {
    /// Overwrites the metric with `value` using a `Release` store.
    pub fn set(&mut self, value: u64) {
        let cell: &atomic::AtomicU64 = &self.value;
        cell.store(value, atomic::Ordering::Release);
    }

    /// Adds one to the metric.
    ///
    /// The underlying atomic add wraps around on overflow.
    pub fn increment(&mut self) {
        let cell: &atomic::AtomicU64 = &self.value;
        let _before = cell.fetch_add(1, atomic::Ordering::AcqRel);
    }

    /// Subtracts one from the metric.
    ///
    /// The underlying atomic subtract wraps around, so decrementing a metric
    /// that is currently `0` leaves it at `u64::MAX`.
    pub fn decrement(&mut self) {
        let cell: &atomic::AtomicU64 = &self.value;
        let _before = cell.fetch_sub(1, atomic::Ordering::AcqRel);
    }
}
/// Handle for updating a signed 64-bit metric.
///
/// Wraps a reference-counted `AtomicI64`; cloning the handle (via the derived
/// `Clone`) produces another handle to the same underlying atomic.
#[derive(Clone)]
pub struct MetricI64 {
// Shared atomic that holds the metric's current value.
value: Arc<atomic::AtomicI64>,
}
impl MetricCounter {
impl MetricI64 {
pub fn set(&mut self, value: i64) {
self.value.store(value, atomic::Ordering::Release);
}
@ -31,25 +56,11 @@ impl MetricCounter {
}
#[derive(Clone)]
pub struct MetricRateCounter {
value: Arc<atomic::AtomicU64>,
}
impl MetricRateCounter {
pub fn set(&mut self, value: u64) {
self.value.store(value, atomic::Ordering::Release);
}
pub fn increment(&mut self) {
self.value.fetch_add(1, atomic::Ordering::AcqRel);
}
}
#[derive(Clone)]
pub struct MetricTag {
pub struct MetricString {
value: Arc<Mutex<String>>,
}
impl MetricTag {
impl MetricString {
pub fn set(&self, value: String) {
*self.value.lock().unwrap() = value;
}
@ -61,31 +72,31 @@ pub struct Metrics {
}
impl Metrics {
pub fn register_counter(&self, name: String) -> MetricCounter {
let value = Arc::new(atomic::AtomicI64::new(0));
self.registry
.write()
.unwrap()
.insert(name, Value::Counter(value.clone()));
MetricCounter { value }
}
pub fn register_rate_counter(&self, name: String) -> MetricRateCounter {
pub fn register_u64(&self, name: String) -> MetricU64 {
let value = Arc::new(atomic::AtomicU64::new(0));
self.registry
.write()
.unwrap()
.insert(name, Value::RateCounter(value.clone()));
MetricRateCounter { value }
.insert(name, Value::U64(value.clone()));
MetricU64 { value }
}
pub fn register_tag(&self, name: String) -> MetricTag {
/// Registers a signed 64-bit metric under `name` and returns a handle to it.
///
/// The metric starts at `0`. The registry and the returned handle share the
/// same atomic, so updates made through the handle are visible to whoever
/// reads the registry.
///
/// # Panics
///
/// Panics if taking the registry's write lock fails (the result is
/// `unwrap`ped).
pub fn register_i64(&self, name: String) -> MetricI64 {
    let shared = Arc::new(atomic::AtomicI64::new(0));
    let entry = Value::I64(Arc::clone(&shared));
    {
        // Hold the write lock only for the insertion itself.
        // NOTE(review): presumably `insert` replaces an existing entry with
        // the same name, as a standard map would — confirm against the
        // registry's type.
        let mut registry = self.registry.write().unwrap();
        registry.insert(name, entry);
    }
    MetricI64 { value: shared }
}
pub fn register_string(&self, name: String) -> MetricString {
let value = Arc::new(Mutex::new(String::new()));
self.registry
.write()
.unwrap()
.insert(name, Value::Tag(value.clone()));
MetricTag { value }
.insert(name, Value::String(value.clone()));
MetricString { value }
}
}
@ -96,6 +107,7 @@ pub fn start() -> Metrics {
let registry_c = Arc::clone(&registry);
tokio::spawn(async move {
let mut previous_values = HashMap::<String, PrevValue>::new();
loop {
write_interval.tick().await;
@ -103,14 +115,54 @@ pub fn start() -> Metrics {
// acquire any interior locks.
let metrics = registry_c.read().unwrap();
for (name, value) in metrics.iter() {
let previous_value = previous_values.get_mut(name);
match value {
Value::Counter(v) => {
info!("metric: {}: {}", name, v.load(atomic::Ordering::Acquire))
Value::U64(v) => {
let new_value = v.load(atomic::Ordering::Acquire);
let previous_value = if let Some(PrevValue::U64(v)) = previous_value {
let prev = *v;
*v = new_value;
prev
} else {
previous_values.insert(name.clone(), PrevValue::U64(new_value));
0
};
let diff = new_value.wrapping_sub(previous_value) as i64;
info!("metric: {}: {} ({:+})", name, new_value, diff);
}
Value::RateCounter(v) => {
info!("metric: {}: {}", name, v.load(atomic::Ordering::Acquire))
Value::I64(v) => {
let new_value = v.load(atomic::Ordering::Acquire);
let previous_value = if let Some(PrevValue::I64(v)) = previous_value {
let prev = *v;
*v = new_value;
prev
} else {
previous_values.insert(name.clone(), PrevValue::I64(new_value));
0
};
let diff = new_value - previous_value;
info!("metric: {}: {} ({:+})", name, new_value, diff);
}
Value::String(v) => {
let new_value = v.lock().unwrap();
let previous_value = if let Some(PrevValue::String(v)) = previous_value {
let mut prev = new_value.clone();
std::mem::swap(&mut prev, v);
prev
} else {
previous_values
.insert(name.clone(), PrevValue::String(new_value.clone()));
"".into()
};
if *new_value == previous_value {
info!("metric: {}: {} (unchanged)", name, &*new_value);
} else {
info!(
"metric: {}: {} (before: {})",
name, &*new_value, previous_value
);
}
}
Value::Tag(v) => info!("metric: {}: {}", name, &*v.lock().unwrap()),
}
}
}

View File

@ -7,8 +7,8 @@ use crate::{metrics, AccountTables, AccountWrite, PostgresConfig, SlotStatus, Sl
async fn postgres_connection(
config: &PostgresConfig,
metric_retries: metrics::MetricCounter,
metric_live: metrics::MetricCounter,
metric_retries: metrics::MetricU64,
metric_live: metrics::MetricU64,
) -> Result<async_channel::Receiver<Option<tokio_postgres::Client>>, anyhow::Error> {
let (tx, rx) = async_channel::unbounded();
@ -94,8 +94,8 @@ struct SlotsProcessing {
newest_nonfinal_slot: Option<i64>,
newest_final_slot: Option<i64>,
cleanup_table_sql: Vec<String>,
metric_update_rooted: metrics::MetricRateCounter,
metric_update_uncles: metrics::MetricRateCounter,
metric_update_rooted: metrics::MetricU64,
metric_update_uncles: metrics::MetricU64,
}
impl SlotsProcessing {
@ -251,8 +251,8 @@ pub async fn init(
// slot updates are not parallel because their order matters
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
let metric_con_retries = metrics_sender.register_counter("postgres_connection_retries".into());
let metric_con_live = metrics_sender.register_counter("postgres_connections_alive".into());
let metric_con_retries = metrics_sender.register_u64("postgres_connection_retries".into());
let metric_con_live = metrics_sender.register_u64("postgres_connections_alive".into());
let postgres_slots =
postgres_connection(&config, metric_con_retries.clone(), metric_con_live.clone()).await?;
@ -314,14 +314,11 @@ pub async fn init(
newest_nonfinal_slot: None,
newest_final_slot: None,
cleanup_table_sql: Vec::<String>::new(),
metric_update_rooted: metrics_sender
.register_rate_counter("postgres_slot_update_rooted".into()),
metric_update_uncles: metrics_sender
.register_rate_counter("postgres_slot_update_uncles".into()),
metric_update_rooted: metrics_sender.register_u64("postgres_slot_update_rooted".into()),
metric_update_uncles: metrics_sender.register_u64("postgres_slot_update_uncles".into()),
};
let mut client_opt = None;
let mut metric_retries =
metrics_sender.register_rate_counter("postgres_slot_update_retries".into());
let mut metric_retries = metrics_sender.register_u64("postgres_slot_update_retries".into());
slots_processing.set_cleanup_tables(&table_names);