Add support for updating a monitoring table

That way connector interruptions can be seen directly from the target
database.
Christian Kamm 2022-01-06 12:54:35 +01:00
parent b1f18c0b13
commit 7d8fd45de4
9 changed files with 138 additions and 19 deletions
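With the monitoring row in place, an interruption check can run entirely against the target database, without access to the connector host. A minimal sketch of such a check, assuming the tokio_postgres crate; the connection string, the "example" name, and the 90-second threshold (three of the example config's 30-second update intervals) are illustrative:

    use tokio_postgres::NoTls;

    #[tokio::main]
    async fn main() -> Result<(), tokio_postgres::Error> {
        // Hypothetical connection string; point this at the target database.
        let (client, connection) =
            tokio_postgres::connect("host=localhost user=postgres dbname=solana", NoTls).await?;
        // tokio_postgres requires the connection object to be polled separately.
        tokio::spawn(async move {
            if let Err(err) = connection.await {
                eprintln!("connection error: {}", err);
            }
        });

        // A NULL or old last_update both count as unhealthy.
        let row = client
            .query_one(
                "SELECT coalesce(now() - last_update < interval '90 seconds', false) AS healthy
                 FROM monitoring WHERE name = $1",
                &[&"example"],
            )
            .await?;
        let healthy: bool = row.get("healthy");
        println!("connector healthy: {}", healthy);
        Ok(())
    }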

View File

@@ -26,3 +26,5 @@ retry_connection_sleep_secs = 30
 fatal_connection_timeout_secs = 600
 allow_invalid_certs = false
 delete_old_data = true
+monitoring_name = "example"
+monitoring_update_frequency_secs = 30

View File

@@ -8,6 +8,15 @@ CREATE TYPE "SlotStatus" AS ENUM (
     'Processed'
 );
 
+CREATE TABLE monitoring (
+    name TEXT PRIMARY KEY,
+    last_update TIMESTAMP WITH TIME ZONE,
+    last_slot_write TIMESTAMP WITH TIME ZONE,
+    last_account_write_write TIMESTAMP WITH TIME ZONE,
+    slot_queue BIGINT,
+    account_write_queue BIGINT
+);
+
 CREATE TABLE pubkey (
     pubkey_id BIGSERIAL PRIMARY KEY,
     pubkey VARCHAR(44) NOT NULL UNIQUE

View File

@@ -2,6 +2,7 @@
  * Script for cleaning up the schema for PostgreSQL used for the AccountsDb plugin.
  */
 
+DROP TABLE monitoring CASCADE;
 DROP TABLE slot CASCADE;
 DROP TABLE account_write CASCADE;
 DROP TABLE pubkey CASCADE;

View File

@@ -26,3 +26,5 @@ retry_connection_sleep_secs = 30
 fatal_connection_timeout_secs = 600
 allow_invalid_certs = false
 delete_old_data = true
+monitoring_name = "example"
+monitoring_update_frequency_secs = 30

View File

@@ -8,6 +8,15 @@ CREATE TYPE "SlotStatus" AS ENUM (
     'Processed'
 );
 
+CREATE TABLE monitoring (
+    name TEXT PRIMARY KEY,
+    last_update TIMESTAMP WITH TIME ZONE,
+    last_slot_write TIMESTAMP WITH TIME ZONE,
+    last_account_write_write TIMESTAMP WITH TIME ZONE,
+    slot_queue BIGINT,
+    account_write_queue BIGINT
+);
+
 CREATE TABLE pubkey (
     pubkey_id BIGSERIAL PRIMARY KEY,
     pubkey VARCHAR(44) NOT NULL UNIQUE

View File

@@ -2,6 +2,7 @@
  * Script for cleaning up the schema for PostgreSQL used for the AccountsDb plugin.
  */
 
+DROP TABLE monitoring CASCADE;
 DROP TABLE slot CASCADE;
 DROP TABLE account_write CASCADE;
 DROP TABLE pubkey CASCADE;

View File

@@ -86,6 +86,10 @@ pub struct PostgresConfig {
     pub allow_invalid_certs: bool,
     /// Delete old data automatically, keeping only the current data snapshot
     pub delete_old_data: bool,
+    /// Name key to use in the monitoring table
+    pub monitoring_name: String,
+    /// Frequency with which to update the monitoring table
+    pub monitoring_update_frequency_secs: u64,
 }
 
 #[derive(Clone, Debug, Deserialize)]
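On the Rust side the new keys land in PostgresConfig through serde. A minimal sketch of the mapping for just the two new fields, assuming the serde and toml crates; MonitoringConfig is a hypothetical stand-in for the full struct:

    use serde::Deserialize;

    // Hypothetical stand-in for the relevant slice of PostgresConfig.
    #[derive(Debug, Deserialize)]
    struct MonitoringConfig {
        monitoring_name: String,
        monitoring_update_frequency_secs: u64,
    }

    fn main() {
        let cfg: MonitoringConfig = toml::from_str(
            r#"
            monitoring_name = "example"
            monitoring_update_frequency_secs = 30
            "#,
        )
        .unwrap();
        assert_eq!(cfg.monitoring_name, "example");
        assert_eq!(cfg.monitoring_update_frequency_secs, 30);
    }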

View File

@@ -24,10 +24,18 @@ pub struct MetricU64 {
     value: Arc<atomic::AtomicU64>,
 }
 
 impl MetricU64 {
+    pub fn value(&self) -> u64 {
+        self.value.load(atomic::Ordering::Acquire)
+    }
+
     pub fn set(&mut self, value: u64) {
         self.value.store(value, atomic::Ordering::Release);
     }
+
+    pub fn set_max(&mut self, value: u64) {
+        self.value.fetch_max(value, atomic::Ordering::AcqRel);
+    }
     pub fn add(&mut self, value: u64) {
         self.value.fetch_add(value, atomic::Ordering::AcqRel);
     }
@@ -77,30 +85,42 @@ pub struct Metrics {
 impl Metrics {
     pub fn register_u64(&self, name: String) -> MetricU64 {
-        let value = Arc::new(atomic::AtomicU64::new(0));
-        self.registry
-            .write()
-            .unwrap()
-            .insert(name, Value::U64(value.clone()));
-        MetricU64 { value }
+        let mut registry = self.registry.write().unwrap();
+        let value = registry
+            .entry(name)
+            .or_insert(Value::U64(Arc::new(atomic::AtomicU64::new(0))));
+        MetricU64 {
+            value: match value {
+                Value::U64(v) => v.clone(),
+                _ => panic!("bad metric type"),
+            },
+        }
     }
 
     pub fn register_i64(&self, name: String) -> MetricI64 {
-        let value = Arc::new(atomic::AtomicI64::new(0));
-        self.registry
-            .write()
-            .unwrap()
-            .insert(name, Value::I64(value.clone()));
-        MetricI64 { value }
+        let mut registry = self.registry.write().unwrap();
+        let value = registry
+            .entry(name)
+            .or_insert(Value::I64(Arc::new(atomic::AtomicI64::new(0))));
+        MetricI64 {
+            value: match value {
+                Value::I64(v) => v.clone(),
+                _ => panic!("bad metric type"),
+            },
+        }
     }
 
     pub fn register_string(&self, name: String) -> MetricString {
-        let value = Arc::new(Mutex::new(String::new()));
-        self.registry
-            .write()
-            .unwrap()
-            .insert(name, Value::String(value.clone()));
-        MetricString { value }
+        let mut registry = self.registry.write().unwrap();
+        let value = registry
+            .entry(name)
+            .or_insert(Value::String(Arc::new(Mutex::new(String::new()))));
+        MetricString {
+            value: match value {
+                Value::String(v) => v.clone(),
+                _ => panic!("bad metric type"),
+            },
+        }
     }
 }
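Two details of these metrics changes matter for the monitoring thread added below: registration now goes through Entry::or_insert, so registering an existing name returns a handle to the same underlying atomic (the write threads and the monitoring thread share the last-write timestamp metrics this way), and set_max is built on fetch_max, so a timestamp can only move forward even with concurrent writers. A self-contained sketch of the fetch_max property (the values are illustrative epoch seconds):

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    fn main() {
        // Two register_u64 calls with the same name now share one atomic,
        // modeled here by cloning the Arc directly.
        let shared = Arc::new(AtomicU64::new(0));
        let writer = shared.clone();

        writer.fetch_max(1_641_470_000, Ordering::AcqRel); // later timestamp wins
        writer.fetch_max(1_641_460_000, Ordering::AcqRel); // earlier timestamp is ignored
        assert_eq!(shared.load(Ordering::Acquire), 1_641_470_000);
    }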

View File

@@ -3,7 +3,7 @@ use log::*;
 use native_tls::TlsConnector;
 use postgres_native_tls::MakeTlsConnector;
 use postgres_query::{query, query_dyn};
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, convert::TryFrom, time::Duration};
 
 use crate::{metrics, AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate};
@@ -310,6 +310,16 @@ impl SlotsProcessing {
     }
 }
 
+fn secs_since_epoch() -> u64 {
+    std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .expect("Time went backwards")
+        .as_secs()
+}
+
+fn epoch_secs_to_time(secs: u64) -> std::time::SystemTime {
+    std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs)
+}
+
 pub async fn init(
     config: &PostgresConfig,
     account_tables: AccountTables,
@@ -341,6 +351,8 @@ pub async fn init(
         let config = config.clone();
         let mut metric_retries =
             metrics_sender.register_u64("postgres_account_write_retries".into());
+        let mut metric_last_write =
+            metrics_sender.register_u64("postgres_account_write_last_write_timestamp".into());
         tokio::spawn(async move {
             let mut client_opt = None;
             loop {
@@ -397,6 +409,7 @@ pub async fn init(
                     };
                     break;
                 }
+                metric_last_write.set_max(secs_since_epoch());
             }
         });
     }
@@ -444,6 +457,8 @@ pub async fn init(
         let receiver_c = slot_inserter_receiver.clone();
         let config = config.clone();
         let mut metric_retries = metrics_sender.register_u64("postgres_slot_update_retries".into());
+        let mut metric_last_write =
+            metrics_sender.register_u64("postgres_slot_last_write_timestamp".into());
         let slots_processing = slots_processing.clone();
         tokio::spawn(async move {
             let mut client_opt = None;
@@ -474,6 +489,62 @@ pub async fn init(
                     };
                     break;
                 }
+                metric_last_write.set_max(secs_since_epoch());
             }
         });
     }
+
+    // postgres metrics/monitoring thread
+    {
+        let postgres_con =
+            postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
+                .await?;
+        let metric_slot_last_write =
+            metrics_sender.register_u64("postgres_slot_last_write_timestamp".into());
+        let metric_account_write_last_write =
+            metrics_sender.register_u64("postgres_account_write_last_write_timestamp".into());
+        let metric_account_queue = metrics_sender.register_u64("account_write_queue".into());
+        let metric_slot_queue = metrics_sender.register_u64("slot_insert_queue".into());
+        let config = config.clone();
+        tokio::spawn(async move {
+            let mut client_opt = None;
+            loop {
+                tokio::time::sleep(Duration::from_secs(config.monitoring_update_frequency_secs))
+                    .await;
+                let client = update_postgres_client(&mut client_opt, &postgres_con, &config).await;
+
+                let last_update = std::time::SystemTime::now();
+                let last_slot_write = epoch_secs_to_time(metric_slot_last_write.value());
+                let last_account_write_write =
+                    epoch_secs_to_time(metric_account_write_last_write.value());
+                let slot_queue = i64::try_from(metric_slot_queue.value()).unwrap();
+                let account_write_queue = i64::try_from(metric_account_queue.value()).unwrap();
+
+                let query = query!(
+                    "INSERT INTO monitoring
+                        (name, last_update, last_slot_write, last_account_write_write, slot_queue, account_write_queue)
+                    VALUES
+                        ($name, $last_update, $last_slot_write, $last_account_write_write, $slot_queue, $account_write_queue)
+                    ON CONFLICT (name) DO UPDATE SET
+                        last_update=$last_update,
+                        last_slot_write=$last_slot_write,
+                        last_account_write_write=$last_account_write_write,
+                        slot_queue=$slot_queue,
+                        account_write_queue=$account_write_queue
+                    ",
+                    name = config.monitoring_name,
+                    last_update,
+                    last_slot_write,
+                    last_account_write_write,
+                    slot_queue,
+                    account_write_queue,
+                );
+                if let Err(err) = query
+                    .execute(client)
+                    .await
+                    .context("updating monitoring table")
+                {
+                    warn!("failed to process monitoring update: {:?}", err);
+                };
+            }
+        });
+    }
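The secs_since_epoch/epoch_secs_to_time helpers shuttle timestamps through the u64 metrics: the write threads publish whole seconds via set_max, and the monitoring thread converts them back into a std::time::SystemTime, which the postgres driver maps to TIMESTAMP WITH TIME ZONE. A small sketch of that round trip (the sub-second loss is inherent to storing seconds):

    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    fn main() {
        let now = SystemTime::now();
        // Truncate to whole seconds, as secs_since_epoch() does.
        let secs = now
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            .as_secs();
        // Restore a SystemTime, as epoch_secs_to_time() does.
        let restored = UNIX_EPOCH + Duration::from_secs(secs);
        // The round trip is exact up to the truncated sub-second part.
        assert!(now.duration_since(restored).unwrap() < Duration::from_secs(1));
    }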