From 7d8fd45de4e479fa852e9c3f916a4c5e23b5551c Mon Sep 17 00:00:00 2001
From: Christian Kamm
Date: Thu, 6 Jan 2022 12:54:35 +0100
Subject: [PATCH] Add support for updating a monitoring table

That way connector interruptions can be seen directly from the target
database.
---
 connector-mango/example-config.toml       |  2 +
 connector-mango/scripts/create_schema.sql |  9 +++
 connector-mango/scripts/drop_schema.sql   |  1 +
 connector-raw/example-config.toml         |  2 +
 connector-raw/scripts/create_schema.sql   |  9 +++
 connector-raw/scripts/drop_schema.sql     |  1 +
 lib/src/lib.rs                            |  4 ++
 lib/src/metrics.rs                        | 56 +++++++++++------
 lib/src/postgres_target.rs                | 73 ++++++++++++++++++++++-
 9 files changed, 138 insertions(+), 19 deletions(-)

diff --git a/connector-mango/example-config.toml b/connector-mango/example-config.toml
index 39c015d..bd87885 100644
--- a/connector-mango/example-config.toml
+++ b/connector-mango/example-config.toml
@@ -26,3 +26,5 @@ retry_connection_sleep_secs = 30
 fatal_connection_timeout_secs = 600
 allow_invalid_certs = false
 delete_old_data = true
+monitoring_name = "example"
+monitoring_update_frequency_secs = 30
diff --git a/connector-mango/scripts/create_schema.sql b/connector-mango/scripts/create_schema.sql
index d568b9d..6a3b429 100644
--- a/connector-mango/scripts/create_schema.sql
+++ b/connector-mango/scripts/create_schema.sql
@@ -8,6 +8,15 @@ CREATE TYPE "SlotStatus" AS ENUM (
     'Processed'
 );
 
+CREATE TABLE monitoring (
+    name TEXT PRIMARY KEY,
+    last_update TIMESTAMP WITH TIME ZONE,
+    last_slot_write TIMESTAMP WITH TIME ZONE,
+    last_account_write_write TIMESTAMP WITH TIME ZONE,
+    slot_queue BIGINT,
+    account_write_queue BIGINT
+);
+
 CREATE TABLE pubkey (
     pubkey_id BIGSERIAL PRIMARY KEY,
     pubkey VARCHAR(44) NOT NULL UNIQUE
diff --git a/connector-mango/scripts/drop_schema.sql b/connector-mango/scripts/drop_schema.sql
index 121ef6a..6af61a9 100644
--- a/connector-mango/scripts/drop_schema.sql
+++ b/connector-mango/scripts/drop_schema.sql
@@ -2,6 +2,7 @@
  * Script for cleaning up the schema for PostgreSQL used for the AccountsDb plugin.
  */
 
+DROP TABLE monitoring CASCADE;
 DROP TABLE slot CASCADE;
 DROP TABLE account_write CASCADE;
 DROP TABLE pubkey CASCADE;
diff --git a/connector-raw/example-config.toml b/connector-raw/example-config.toml
index 73b9592..3d4e8f8 100644
--- a/connector-raw/example-config.toml
+++ b/connector-raw/example-config.toml
@@ -26,3 +26,5 @@ retry_connection_sleep_secs = 30
 fatal_connection_timeout_secs = 600
 allow_invalid_certs = false
 delete_old_data = true
+monitoring_name = "example"
+monitoring_update_frequency_secs = 30
diff --git a/connector-raw/scripts/create_schema.sql b/connector-raw/scripts/create_schema.sql
index 5900bb4..21d7d8b 100644
--- a/connector-raw/scripts/create_schema.sql
+++ b/connector-raw/scripts/create_schema.sql
@@ -8,6 +8,15 @@ CREATE TYPE "SlotStatus" AS ENUM (
     'Processed'
 );
 
+CREATE TABLE monitoring (
+    name TEXT PRIMARY KEY,
+    last_update TIMESTAMP WITH TIME ZONE,
+    last_slot_write TIMESTAMP WITH TIME ZONE,
+    last_account_write_write TIMESTAMP WITH TIME ZONE,
+    slot_queue BIGINT,
+    account_write_queue BIGINT
+);
+
 CREATE TABLE pubkey (
     pubkey_id BIGSERIAL PRIMARY KEY,
     pubkey VARCHAR(44) NOT NULL UNIQUE
diff --git a/connector-raw/scripts/drop_schema.sql b/connector-raw/scripts/drop_schema.sql
index 79062f5..0a7b320 100644
--- a/connector-raw/scripts/drop_schema.sql
+++ b/connector-raw/scripts/drop_schema.sql
@@ -2,6 +2,7 @@
 /*
  * Script for cleaning up the schema for PostgreSQL used for the AccountsDb plugin.
  */
+DROP TABLE monitoring CASCADE;
 DROP TABLE slot CASCADE;
 DROP TABLE account_write CASCADE;
 DROP TABLE pubkey CASCADE;
diff --git a/lib/src/lib.rs b/lib/src/lib.rs
index 5f39861..6447373 100644
--- a/lib/src/lib.rs
+++ b/lib/src/lib.rs
@@ -86,6 +86,10 @@ pub struct PostgresConfig {
     pub allow_invalid_certs: bool,
     /// Delete old data automatically, keeping only the current data snapshot
     pub delete_old_data: bool,
+    /// Name key to use in the monitoring table
+    pub monitoring_name: String,
+    /// Frequency with which to update the monitoring table
+    pub monitoring_update_frequency_secs: u64,
 }
 
 #[derive(Clone, Debug, Deserialize)]
diff --git a/lib/src/metrics.rs b/lib/src/metrics.rs
index 50cdae2..c296747 100644
--- a/lib/src/metrics.rs
+++ b/lib/src/metrics.rs
@@ -24,10 +24,18 @@ pub struct MetricU64 {
     value: Arc<atomic::AtomicU64>,
 }
 impl MetricU64 {
+    pub fn value(&self) -> u64 {
+        self.value.load(atomic::Ordering::Acquire)
+    }
+
     pub fn set(&mut self, value: u64) {
         self.value.store(value, atomic::Ordering::Release);
     }
 
+    pub fn set_max(&mut self, value: u64) {
+        self.value.fetch_max(value, atomic::Ordering::AcqRel);
+    }
+
     pub fn add(&mut self, value: u64) {
         self.value.fetch_add(value, atomic::Ordering::AcqRel);
     }
@@ -77,30 +85,42 @@ pub struct Metrics {
 
 impl Metrics {
     pub fn register_u64(&self, name: String) -> MetricU64 {
-        let value = Arc::new(atomic::AtomicU64::new(0));
-        self.registry
-            .write()
-            .unwrap()
-            .insert(name, Value::U64(value.clone()));
-        MetricU64 { value }
+        let mut registry = self.registry.write().unwrap();
+        let value = registry
+            .entry(name)
+            .or_insert(Value::U64(Arc::new(atomic::AtomicU64::new(0))));
+        MetricU64 {
+            value: match value {
+                Value::U64(v) => v.clone(),
+                _ => panic!("bad metric type"),
+            },
+        }
     }
 
     pub fn register_i64(&self, name: String) -> MetricI64 {
-        let value = Arc::new(atomic::AtomicI64::new(0));
-        self.registry
-            .write()
-            .unwrap()
-            .insert(name, Value::I64(value.clone()));
-        MetricI64 { value }
+        let mut registry = self.registry.write().unwrap();
+        let value = registry
+            .entry(name)
+            .or_insert(Value::I64(Arc::new(atomic::AtomicI64::new(0))));
+        MetricI64 {
+            value: match value {
+                Value::I64(v) => v.clone(),
+                _ => panic!("bad metric type"),
+            },
+        }
     }
 
     pub fn register_string(&self, name: String) -> MetricString {
-        let value = Arc::new(Mutex::new(String::new()));
-        self.registry
-            .write()
-            .unwrap()
-            .insert(name, Value::String(value.clone()));
-        MetricString { value }
+        let mut registry = self.registry.write().unwrap();
+        let value = registry
+            .entry(name)
+            .or_insert(Value::String(Arc::new(Mutex::new(String::new()))));
+        MetricString {
+            value: match value {
+                Value::String(v) => v.clone(),
+                _ => panic!("bad metric type"),
+            },
+        }
     }
 }
 
diff --git a/lib/src/postgres_target.rs b/lib/src/postgres_target.rs
index 2995328..a8de496 100644
--- a/lib/src/postgres_target.rs
+++ b/lib/src/postgres_target.rs
@@ -3,7 +3,7 @@ use log::*;
 use native_tls::TlsConnector;
 use postgres_native_tls::MakeTlsConnector;
 use postgres_query::{query, query_dyn};
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, convert::TryFrom, time::Duration};
 
 use crate::{metrics, AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate};
 
@@ -310,6 +310,16 @@ impl SlotsProcessing {
     }
 }
 
+fn secs_since_epoch() -> u64 {
+    std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .expect("Time went backwards")
+        .as_secs()
+}
+fn epoch_secs_to_time(secs: u64) -> std::time::SystemTime {
+    std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs)
+}
+
 pub async fn init(
     config: &PostgresConfig,
     account_tables: AccountTables,
@@ -341,6 +351,8 @@ pub async fn init(
         let config = config.clone();
         let mut metric_retries =
             metrics_sender.register_u64("postgres_account_write_retries".into());
+        let mut metric_last_write =
+            metrics_sender.register_u64("postgres_account_write_last_write_timestamp".into());
         tokio::spawn(async move {
             let mut client_opt = None;
             loop {
@@ -397,6 +409,7 @@ pub async fn init(
                     };
                     break;
                 }
+                metric_last_write.set_max(secs_since_epoch());
             }
         });
     }
@@ -444,6 +457,8 @@ pub async fn init(
         let receiver_c = slot_inserter_receiver.clone();
         let config = config.clone();
         let mut metric_retries = metrics_sender.register_u64("postgres_slot_update_retries".into());
+        let mut metric_last_write =
+            metrics_sender.register_u64("postgres_slot_last_write_timestamp".into());
         let slots_processing = slots_processing.clone();
         tokio::spawn(async move {
             let mut client_opt = None;
@@ -474,6 +489,62 @@ pub async fn init(
                     };
                     break;
                 }
+                metric_last_write.set_max(secs_since_epoch());
+            }
+        });
+    }
+
+    // postgres metrics/monitoring thread
+    {
+        let postgres_con =
+            postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
+                .await?;
+        let metric_slot_last_write =
+            metrics_sender.register_u64("postgres_slot_last_write_timestamp".into());
+        let metric_account_write_last_write =
+            metrics_sender.register_u64("postgres_account_write_last_write_timestamp".into());
+        let metric_account_queue = metrics_sender.register_u64("account_write_queue".into());
+        let metric_slot_queue = metrics_sender.register_u64("slot_insert_queue".into());
+        let config = config.clone();
+        tokio::spawn(async move {
+            let mut client_opt = None;
+            loop {
+                tokio::time::sleep(Duration::from_secs(config.monitoring_update_frequency_secs))
+                    .await;
+
+                let client = update_postgres_client(&mut client_opt, &postgres_con, &config).await;
+                let last_update = std::time::SystemTime::now();
+                let last_slot_write = epoch_secs_to_time(metric_slot_last_write.value());
+                let last_account_write_write =
+                    epoch_secs_to_time(metric_account_write_last_write.value());
+                let slot_queue = i64::try_from(metric_slot_queue.value()).unwrap();
+                let account_write_queue = i64::try_from(metric_account_queue.value()).unwrap();
+                let query = query!(
+                    "INSERT INTO monitoring
+                    (name, last_update, last_slot_write, last_account_write_write, slot_queue, account_write_queue)
+                    VALUES
+                    ($name, $last_update, $last_slot_write, $last_account_write_write, $slot_queue, $account_write_queue)
+                    ON CONFLICT (name) DO UPDATE SET
+                    last_update=$last_update,
+                    last_slot_write=$last_slot_write,
+                    last_account_write_write=$last_account_write_write,
+                    slot_queue=$slot_queue,
+                    account_write_queue=$account_write_queue
+                    ",
+                    name = config.monitoring_name,
+                    last_update,
+                    last_slot_write,
+                    last_account_write_write,
+                    slot_queue,
+                    account_write_queue,
+                );
+                if let Err(err) = query
+                    .execute(client)
+                    .await
+                    .context("updating monitoring table")
+                {
+                    warn!("failed to process monitoring update: {:?}", err);
+                };
             }
         });
     }