service-mango-health: monitors/historizes account health (#890)

* service-mango-health: add a new service that computes health for all mango accounts and persists a history in PostgreSQL

(cherry picked from commit 6aa8a883f4)
Serge Farny 2024-02-29 17:05:29 +01:00
parent cacf668999
commit a06801db5c
26 changed files with 1742 additions and 60 deletions


@ -15,6 +15,7 @@ on:
'bin/service-mango-fills/**',
'bin/service-mango-orderbook/**',
'bin/service-mango-pnl/**',
'bin/service-mango-health/**',
]
workflow_dispatch:

Cargo.lock

@ -5982,6 +5982,7 @@ dependencies = [
"serde_derive",
"serde_json",
"serum_dex 0.5.10 (git+https://github.com/openbook-dex/program.git)",
"services-mango-lib",
"solana-client",
"solana-logger",
"solana-sdk",
@ -5993,6 +5994,53 @@ dependencies = [
"ws",
]
[[package]]
name = "service-mango-health"
version = "0.1.0"
dependencies = [
"anchor-client",
"anchor-lang",
"anyhow",
"async-channel",
"async-trait",
"base64 0.21.4",
"bs58 0.3.1",
"bytemuck",
"chrono",
"fixed 1.11.0 (git+https://github.com/blockworks-foundation/fixed.git?branch=v1.11.0-borsh0_10-mango)",
"futures 0.3.28",
"futures-channel",
"futures-core",
"futures-util",
"itertools",
"jemallocator",
"log 0.4.20",
"mango-feeds-connector",
"mango-feeds-lib",
"mango-v4",
"mango-v4-client",
"native-tls",
"postgres-native-tls",
"postgres-types",
"postgres_query",
"rustls 0.20.9",
"serde",
"serde_derive",
"serde_json",
"serum_dex 0.5.10 (git+https://github.com/openbook-dex/program.git)",
"services-mango-lib",
"solana-client",
"solana-logger",
"solana-sdk",
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-tungstenite 0.17.2",
"toml",
"tracing",
"ws",
]
[[package]]
name = "service-mango-orderbook"
version = "0.1.0"
@ -6051,6 +6099,24 @@ dependencies = [
"toml",
]
[[package]]
name = "services-mango-lib"
version = "0.1.0"
dependencies = [
"anyhow",
"base64 0.21.4",
"native-tls",
"postgres-native-tls",
"postgres-types",
"postgres_query",
"rustls 0.20.9",
"serde",
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
"tracing",
]
[[package]]
name = "sgx-quote"
version = "0.1.0"


@ -27,6 +27,7 @@ COPY --from=build /app/target/release/service-mango-* /usr/local/bin/
COPY --from=build /app/bin/service-mango-pnl/conf/template-config.toml ./pnl-config.toml
COPY --from=build /app/bin/service-mango-fills/conf/template-config.toml ./fills-config.toml
COPY --from=build /app/bin/service-mango-orderbook/conf/template-config.toml ./orderbook-config.toml
COPY --from=build /app/bin/service-mango-health/conf/template-config.toml ./health-config.toml
COPY --from=build /app/bin/service-mango-pnl/conf/template-config.toml ./pnl-config.toml
COPY --from=build /app/bin/service-mango-fills/conf//template-config.toml ./fills-config.toml


@ -6,6 +6,7 @@ edition = "2018"
license = "AGPL-3.0-or-later"
[dependencies]
services-mango-lib = { path = "../../lib/services-mango-lib" }
mango-feeds-lib = { path = "../../lib/mango-feeds-lib" }
mango-feeds-connector = { workspace = true }


@ -2,12 +2,10 @@ use crate::postgres_config::PostgresConfig;
use chrono::{TimeZone, Utc};
use log::*;
use mango_feeds_connector::metrics::{MetricType, MetricU64, Metrics};
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use postgres_query::Caching;
use service_mango_fills::*;
use services_mango_lib::postgres_configuration::PostgresConfiguration;
use std::{
env, fs,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
@ -24,56 +22,15 @@ async fn postgres_connection(
) -> anyhow::Result<async_channel::Receiver<Option<tokio_postgres::Client>>> {
let (tx, rx) = async_channel::unbounded();
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
let tls = match &config.tls {
Some(tls) => {
use base64::{engine::general_purpose, Engine as _};
let ca_cert = match &tls.ca_cert_path.chars().next().unwrap() {
'$' => general_purpose::STANDARD
.decode(
env::var(&tls.ca_cert_path[1..])
.expect("reading client cert from env")
.into_bytes(),
)
.expect("decoding client cert"),
_ => fs::read(&tls.ca_cert_path).expect("reading client cert from file"),
};
let client_key = match &tls.client_key_path.chars().next().unwrap() {
'$' => general_purpose::STANDARD
.decode(
env::var(&tls.client_key_path[1..])
.expect("reading client key from env")
.into_bytes(),
)
.expect("decoding client key"),
_ => fs::read(&tls.client_key_path).expect("reading client key from file"),
};
MakeTlsConnector::new(
TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
.identity(Identity::from_pkcs12(&client_key, "pass")?)
.danger_accept_invalid_certs(config.allow_invalid_certs)
.build()?,
)
}
None => MakeTlsConnector::new(
TlsConnector::builder()
.danger_accept_invalid_certs(config.allow_invalid_certs)
.build()?,
),
let config = config.clone();
let lib_config = PostgresConfiguration {
connection_string: config.connection_string.clone(),
allow_invalid_certs: config.allow_invalid_certs,
tls: config.tls.clone(),
};
let config = config.clone();
let connection_string = match &config.connection_string.chars().next().unwrap() {
'$' => {
env::var(&config.connection_string[1..]).expect("reading connection string from env")
}
_ => config.connection_string.clone(),
};
let mut initial = Some(tokio_postgres::connect(&connection_string, tls.clone()).await?);
let mut initial = Some(services_mango_lib::postgres_connection::connect(&lib_config).await?);
let mut metric_retries = metric_retries;
let mut metric_live = metric_live;
tokio::spawn(async move {
@ -86,7 +43,8 @@ async fn postgres_connection(
let (client, connection) = match initial.take() {
Some(v) => v,
None => {
let result = tokio_postgres::connect(&connection_string, tls.clone()).await;
let result =
services_mango_lib::postgres_connection::connect(&lib_config).await;
match result {
Ok(v) => v,
Err(err) => {


@ -1,4 +1,5 @@
use serde_derive::Deserialize;
use services_mango_lib::postgres_configuration::PostgresTlsConfig;
#[derive(Clone, Debug, Deserialize)]
pub struct PostgresConfig {
@ -21,11 +22,3 @@ pub struct PostgresConfig {
pub allow_invalid_certs: bool,
pub tls: Option<PostgresTlsConfig>,
}
#[derive(Clone, Debug, Deserialize)]
pub struct PostgresTlsConfig {
/// CA Cert file or env var
pub ca_cert_path: String,
/// PKCS12 client cert path
pub client_key_path: String,
}


@ -0,0 +1,60 @@
[package]
name = "service-mango-health"
version = "0.1.0"
authors = ["Christian Kamm <mail@ckamm.de>", "Maximilian Schneider <max@mango.markets>", "Serge Farny <serge.farny@gmail.com>"]
edition = "2018"
license = "AGPL-3.0-or-later"
[dependencies]
mango-feeds-lib = { path = "../../lib/mango-feeds-lib" }
mango-feeds-connector = { workspace = true }
services-mango-lib = { path = "../../lib/services-mango-lib" }
solana-client = { workspace = true }
solana-logger = { workspace = true }
solana-sdk = { workspace = true }
anchor-lang = { workspace = true }
anchor-client = { workspace = true }
fixed = { workspace = true, features = ["serde", "borsh"] }
mango-v4 = { path = "../../programs/mango-v4", features = ["client"] }
mango-v4-client = { path = "../../lib/client" }
serum_dex = { workspace = true }
bs58 = "0.3.1"
log = "0.4"
anyhow = "1.0"
toml = "0.5"
serde = "1.0.130"
serde_derive = "1.0.130"
serde_json = "1.0.68"
futures = "0.3.17"
futures-core = "0.3"
futures-channel = "0.3"
futures-util = "0.3"
ws = "^0.9.2"
async-channel = "1.6"
async-trait = "0.1"
bytemuck = "^1.7.2"
itertools = "0.10.3"
jemallocator = "0.3.2"
chrono = "0.4.23"
base64 = "0.21"
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.17"
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
tokio-postgres-rustls = "0.9.0"
postgres-types = { version = "0.2", features = ["array-impls", "derive", "with-chrono-0_4"] }
postgres-native-tls = "0.5"
native-tls = "0.2"
rustls = "0.20.8"
# postgres_query hasn't updated its crate in a while
postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" }
tracing = { version = "0.1", features = ["log"] }


@ -0,0 +1,4 @@
# service-mango-health
This service monitors the health of all mango accounts (it publishes and persists health ratios).
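The binary takes a single argument: the path to a config file. A minimal local invocation, assuming a filled-in copy of the template config, looks like:

    cargo run --bin service-mango-health -- bin/service-mango-health/conf/template-config.toml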


@ -0,0 +1,27 @@
rpc_ws_url = "wss://mango.rpcpool.com/<TOKEN>"
rpc_http_url = "http://mango.rpcpool.com/<TOKEN>"
mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX"
# [postgres]
# connection_string = "$PG_CONNECTION_STRING"
# allow_invalid_certs = true
# max_retry_count = 2
# # [postgres.tls]
# # ca_cert_path = "$PG_CA_CERT"
# # client_key_path = "$PG_CLIENT_KEY"
[computing_configuration]
recompute_interval_ms = 100
[logging_configuration]
log_health_to_stdout = true
#log_health_for_accounts = ["xxx"]
[persistence_configuration]
enabled = true
history_time_to_live_secs = 2678400 # 31 days
persist_max_periodicity_secs = 60
snapshot_queue_length = 30 # 30 * persist_max_periodicity_secs secs of backup in queue
max_failure_duration_secs = 3600
max_retry_count = 3


@ -0,0 +1,26 @@
rpc_http_url = "$RPC_HTTP_URL"
rpc_ws_url = "$RPC_WS_URL"
mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX"
[postgres]
connection_string = "$PG_CONNECTION_STRING"
max_retry_count = 2
allow_invalid_certs = true
[postgres.tls]
ca_cert_path = "$PG_CA_CERT"
client_key_path = "$PG_CLIENT_KEY"
[computing_configuration]
recompute_interval_ms = 100
[logging_configuration]
log_health_to_stdout = false
[persistence_configuration]
enabled = true
history_time_to_live_secs = 2678400 # 31 days
persist_max_periodicity_secs = 60
snapshot_queue_length = 30 # 30 * persist_max_periodicity_secs secs of backup in queue
max_failure_duration_secs = 3600
max_retry_count = 3


@ -0,0 +1,26 @@
CREATE SCHEMA IF NOT EXISTS mango_monitoring AUTHORIZATION CURRENT_ROLE;
CREATE TABLE IF NOT EXISTS mango_monitoring.health_history
(
Pubkey VARCHAR(44) NOT NULL,
Timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
MaintenanceRatio DOUBLE PRECISION,
Maintenance DOUBLE PRECISION,
Initial DOUBLE PRECISION,
LiquidationEnd DOUBLE PRECISION,
IsBeingLiquidated BOOLEAN
);
CREATE MATERIALIZED VIEW mango_monitoring.health_current AS
SELECT DISTINCT ON (pubkey)
*
FROM mango_monitoring.health_history
ORDER BY pubkey, timestamp DESC;
CREATE INDEX health_history_pubkey_index ON mango_monitoring.health_history
(
Pubkey ASC,
Timestamp ASC
);
CREATE INDEX health_history_timestamp_index ON mango_monitoring.health_history
(
Timestamp ASC
);
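For reference, a minimal query sketch against the schema above; the pubkey value is a placeholder:

-- Latest persisted health for a single account, via the materialized view
SELECT Pubkey, Timestamp, MaintenanceRatio, Maintenance, Initial, LiquidationEnd, IsBeingLiquidated
FROM mango_monitoring.health_current
WHERE Pubkey = '<ACCOUNT_PUBKEY>';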


@ -0,0 +1,35 @@
use serde_derive::Deserialize;
use services_mango_lib::postgres_configuration::PostgresConfiguration;
use std::collections::HashSet;
#[derive(Clone, Debug, Deserialize)]
pub struct Configuration {
pub postgres: Option<PostgresConfiguration>,
pub rpc_http_url: String,
pub rpc_ws_url: String,
pub mango_group: String,
pub computing_configuration: ComputingConfiguration,
pub logging_configuration: LoggingConfiguration,
pub persistence_configuration: PersistenceConfiguration,
}
#[derive(Clone, Debug, Deserialize)]
pub struct ComputingConfiguration {
pub recompute_interval_ms: u64,
}
#[derive(Clone, Debug, Deserialize)]
pub struct LoggingConfiguration {
pub log_health_to_stdout: bool,
pub log_health_for_accounts: Option<HashSet<String>>,
}
#[derive(Clone, Debug, Deserialize)]
pub struct PersistenceConfiguration {
pub enabled: bool,
pub history_time_to_live_secs: i64,
pub persist_max_periodicity_secs: i64,
pub max_failure_duration_secs: i64,
pub max_retry_count: u64,
pub snapshot_queue_length: usize,
}


@ -0,0 +1,87 @@
mod configuration;
mod processors;
mod utils;
use futures_util::StreamExt;
// use mango_feeds_connector::metrics;
use mango_v4_client::tracing_subscriber_init;
use std::fs::File;
use std::io::Read;
use std::sync::atomic::Ordering;
use crate::configuration::Configuration;
use crate::processors::data::DataProcessor;
use crate::processors::exit::ExitProcessor;
use crate::processors::health::HealthProcessor;
use crate::processors::logger::LoggerProcessor;
use crate::processors::persister::PersisterProcessor;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
eprintln!("Please enter a config file path argument.");
return Ok(());
}
let configuration: Configuration = {
let mut file = File::open(&args[1])?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
toml::from_str(&contents).unwrap()
};
tracing_subscriber_init();
// TODO FAS Add metrics
// let metrics_tx = metrics::start(configuration.metrics.clone(), "health".into());
let exit_processor = ExitProcessor::init().await?;
let data_processor: DataProcessor =
DataProcessor::init(&configuration, exit_processor.exit.clone()).await?;
let health_processor = HealthProcessor::init(
&data_processor.channel,
data_processor.chain_data.clone(),
&configuration,
exit_processor.exit.clone(),
)
.await?;
let logger = LoggerProcessor::init(
&health_processor.channel,
&configuration,
exit_processor.exit.clone(),
)
.await?;
let persister = PersisterProcessor::init(
&health_processor.channel,
&configuration,
exit_processor.exit.clone(),
)
.await?;
let mut jobs = vec![exit_processor.job, data_processor.job, health_processor.job];
if let Some(logger) = logger {
jobs.push(logger.job)
}
if let Some(persister) = persister {
jobs.push(persister.job)
}
let mut jobs: futures::stream::FuturesUnordered<_> = jobs.into_iter().collect();
while let Some(_) = jobs.next().await {
// if any job exits, stop the other threads & wait
exit_processor.exit.store(true, Ordering::Relaxed);
}
// for now, we force exit here because websocket connection to RPC is not properly closed on exit
tracing::warn!("killing process");
std::process::exit(0x0100);
}


@ -0,0 +1,210 @@
use crate::configuration::Configuration;
use crate::processors::data::DataEvent::{AccountUpdate, Other, Snapshot};
use async_channel::Receiver;
use chrono::Utc;
use itertools::Itertools;
use mango_v4_client::account_update_stream::Message;
use mango_v4_client::snapshot_source::is_mango_account;
use mango_v4_client::{
account_update_stream, chain_data, snapshot_source, websocket_source, MangoGroupContext,
};
use services_mango_lib::fail_or_retry;
use services_mango_lib::retry_counter::RetryCounter;
use solana_client::nonblocking::rpc_client::RpcClient as RpcClientAsync;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::task::JoinHandle;
use tracing::warn;
pub struct DataProcessor {
pub channel: tokio::sync::broadcast::Sender<DataEvent>,
pub job: JoinHandle<()>,
pub chain_data: Arc<RwLock<chain_data::ChainData>>,
}
#[derive(Clone, Debug)]
pub enum DataEvent {
Other,
Snapshot(SnapshotEvent),
AccountUpdate(AccountUpdateEvent),
}
#[derive(Clone, Debug)]
pub struct SnapshotEvent {
pub received_at: chrono::DateTime<Utc>,
pub accounts: Vec<Pubkey>,
}
#[derive(Clone, Debug)]
pub struct AccountUpdateEvent {
pub received_at: chrono::DateTime<Utc>,
pub account: Pubkey,
}
impl DataProcessor {
pub async fn init(
configuration: &Configuration,
exit: Arc<AtomicBool>,
) -> anyhow::Result<DataProcessor> {
let mut retry_counter = RetryCounter::new(2);
let mango_group = Pubkey::from_str(&configuration.mango_group)?;
let mango_stream =
fail_or_retry!(retry_counter, Self::init_mango_source(configuration).await)?;
let (sender, _) = tokio::sync::broadcast::channel(8192);
let sender_clone = sender.clone();
// The representation of current on-chain account data
let chain_data = Arc::new(RwLock::new(chain_data::ChainData::new()));
let chain_data_clone = chain_data.clone();
let job = tokio::spawn(async move {
loop {
if exit.load(Ordering::Relaxed) {
warn!("shutting down data processor...");
break;
}
tokio::select! {
Ok(msg) = mango_stream.recv() => {
let received_at = Utc::now();
msg.update_chain_data(&mut chain_data_clone.write().unwrap());
if sender_clone.receiver_count() == 0 {
continue;
}
let event = Self::parse_message(msg, received_at, mango_group);
if event.is_none() {
continue;
}
let res = sender_clone.send(event.unwrap());
if res.is_err() {
break;
}
},
else => {
warn!("mango update channel err");
break;
}
}
}
});
let result = DataProcessor {
channel: sender,
job,
chain_data,
};
Ok(result)
}
fn new_rpc_async(configuration: &Configuration) -> RpcClientAsync {
let commitment = CommitmentConfig::processed();
RpcClientAsync::new_with_timeout_and_commitment(
configuration.rpc_http_url.clone(),
Duration::from_secs(60),
commitment,
)
}
fn parse_message(
message: Message,
received_at: chrono::DateTime<Utc>,
mango_group: Pubkey,
) -> Option<DataEvent> {
match message {
Message::Account(account_write) => {
if is_mango_account(&account_write.account, &mango_group).is_some() {
return Some(AccountUpdate(AccountUpdateEvent {
account: account_write.pubkey,
received_at,
}));
}
}
Message::Snapshot(snapshot) => {
let mut result = Vec::new();
for update in snapshot.iter() {
if is_mango_account(&update.account, &mango_group).is_some() {
result.push(update.pubkey);
}
}
return Some(Snapshot(SnapshotEvent {
accounts: result,
received_at,
}));
}
_ => {}
};
return Some(Other);
}
async fn init_mango_source(configuration: &Configuration) -> anyhow::Result<Receiver<Message>> {
//
// Client setup
//
let rpc_async = Self::new_rpc_async(configuration);
let mango_group = Pubkey::from_str(&configuration.mango_group)?;
let group_context = MangoGroupContext::new_from_rpc(&rpc_async, mango_group).await?;
let mango_oracles = group_context
.tokens
.values()
.map(|value| value.oracle)
.chain(group_context.perp_markets.values().map(|p| p.oracle))
.unique()
.collect::<Vec<Pubkey>>();
let serum_programs = group_context
.serum3_markets
.values()
.map(|s3| s3.serum_program)
.unique()
.collect_vec();
let (account_update_sender, account_update_receiver) =
async_channel::unbounded::<account_update_stream::Message>();
websocket_source::start(
websocket_source::Config {
rpc_ws_url: configuration.rpc_ws_url.clone(),
serum_programs,
open_orders_authority: mango_group,
},
mango_oracles.clone(),
account_update_sender.clone(),
);
let first_websocket_slot = websocket_source::get_next_create_bank_slot(
account_update_receiver.clone(),
Duration::from_secs(10),
)
.await?;
// Getting solana account snapshots via jsonrpc
// FUTURE: what to fetch a snapshot of should probably be taken as an input
snapshot_source::start(
snapshot_source::Config {
rpc_http_url: configuration.rpc_http_url.clone(),
mango_group,
get_multiple_accounts_count: 100,
parallel_rpc_requests: 10,
snapshot_interval: Duration::from_secs(5 * 60),
min_slot: first_websocket_slot + 10,
},
mango_oracles,
account_update_sender,
);
Ok(account_update_receiver)
}
}


@ -0,0 +1,39 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::task::JoinHandle;
use tracing::{info, warn};
pub struct ExitProcessor {
pub job: JoinHandle<()>,
pub exit: Arc<AtomicBool>,
}
impl ExitProcessor {
pub async fn init() -> anyhow::Result<ExitProcessor> {
let exit: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let job = tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(1000));
loop {
if exit_clone.load(Ordering::Relaxed) {
warn!("shutting down logger processor...");
break;
}
tokio::select! {
_ = interval.tick() => {}
_ = tokio::signal::ctrl_c()=> {
info!("Received SIGINT, shutting down...");
exit_clone.store(true, Ordering::Relaxed);
break;
}
}
}
warn!("shutting down exit processor...");
});
let result = ExitProcessor { job, exit };
Ok(result)
}
}


@ -0,0 +1,176 @@
use crate::configuration::Configuration;
use crate::processors::data::DataEvent;
use chrono::Utc;
use mango_v4::health::HealthType;
use mango_v4_client::chain_data::AccountFetcher;
use mango_v4_client::{chain_data, health_cache, MangoGroupContext};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Instant;
use tokio::task::JoinHandle;
use tracing::warn;
pub struct HealthProcessor {
pub channel: tokio::sync::broadcast::Sender<HealthEvent>,
pub job: JoinHandle<()>,
}
#[derive(Clone, Debug)]
pub struct HealthEvent {
pub computed_at: chrono::DateTime<Utc>,
pub components: Vec<HealthComponent>,
}
#[derive(Clone, Debug)]
pub struct HealthComponent {
pub account: Pubkey,
pub value: Option<HealthComponentValue>,
}
#[derive(Clone, Debug)]
pub struct HealthComponentValue {
pub maintenance_ratio: f64,
pub initial_health: f64,
pub maintenance_health: f64,
pub liquidation_end_health: f64,
pub is_being_liquidated: bool,
}
impl HealthProcessor {
pub async fn init(
data_sender: &tokio::sync::broadcast::Sender<DataEvent>,
chain_data: Arc<RwLock<chain_data::ChainData>>,
configuration: &Configuration,
exit: Arc<AtomicBool>,
) -> anyhow::Result<HealthProcessor> {
let (sender, _) = tokio::sync::broadcast::channel(8192);
let sender_clone = sender.clone();
let mut data = data_sender.subscribe();
let mut accounts = HashSet::<Pubkey>::new();
let mut snapshot_received = false;
let mut last_recompute = Instant::now();
let recompute_interval = std::time::Duration::from_millis(
configuration.computing_configuration.recompute_interval_ms,
);
let account_fetcher = chain_data::AccountFetcher {
chain_data: chain_data.clone(),
rpc: RpcClient::new(configuration.rpc_http_url.clone()),
};
let mango_group_context = MangoGroupContext::new_from_rpc(
&account_fetcher.rpc,
Pubkey::from_str(&configuration.mango_group)?,
)
.await?;
let job = tokio::spawn(async move {
loop {
if exit.load(Ordering::Relaxed) {
warn!("shutting down health processor...");
break;
}
tokio::select! {
Ok(msg) = data.recv() => {
match msg {
DataEvent::AccountUpdate(upd) => {
accounts.insert(upd.account);
},
DataEvent::Snapshot(snap) => {
for account in snap.accounts {
accounts.insert(account);
}
snapshot_received = true;
},
DataEvent::Other => {
}
}
if sender_clone.receiver_count() == 0 {
continue;
}
if snapshot_received && last_recompute.elapsed() >= recompute_interval {
last_recompute = Instant::now();
let health_event = Self::compute_health(&mango_group_context,
&account_fetcher,
&accounts).await;
let res = sender_clone.send(health_event);
if res.is_err() {
break;
}
}
},
else => {
warn!("data update channel err");
break;
}
}
}
});
let result = HealthProcessor {
channel: sender,
job,
};
Ok(result)
}
async fn compute_health(
mango_group_context: &MangoGroupContext,
account_fetcher: &AccountFetcher,
accounts: &HashSet<Pubkey>,
) -> HealthEvent {
let computed_at = Utc::now();
let mut components = Vec::new();
for account in accounts {
let value =
Self::compute_account_health(&mango_group_context, account_fetcher, &account).await;
components.push({
HealthComponent {
account: *account,
value: value.ok(),
}
})
}
HealthEvent {
computed_at,
components,
}
}
async fn compute_account_health(
mango_group_context: &&MangoGroupContext,
account_fetcher: &AccountFetcher,
account: &Pubkey,
) -> anyhow::Result<HealthComponentValue> {
let mango_account = account_fetcher.fetch_mango_account(account)?;
let health_cache = health_cache::new(
&mango_group_context,
&*account_fetcher,
&mango_account,
)
.await?;
let res = HealthComponentValue {
maintenance_ratio: health_cache.health_ratio(HealthType::Maint).to_num(),
initial_health: health_cache.health(HealthType::Init).to_num(),
maintenance_health: health_cache.health(HealthType::Maint).to_num(),
liquidation_end_health: health_cache.health(HealthType::LiquidationEnd).to_num(),
is_being_liquidated: mango_account.fixed.being_liquidated(),
};
Ok(res)
}
}


@ -0,0 +1,81 @@
use crate::configuration::Configuration;
use crate::processors::health::{HealthComponentValue, HealthEvent};
use solana_sdk::pubkey::Pubkey;
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::task::JoinHandle;
use tracing::{info, warn};
pub struct LoggerProcessor {
pub job: JoinHandle<()>,
}
impl LoggerProcessor {
pub async fn init(
data_sender: &tokio::sync::broadcast::Sender<HealthEvent>,
configuration: &Configuration,
exit: Arc<AtomicBool>,
) -> anyhow::Result<Option<LoggerProcessor>> {
let enable_logging = configuration.logging_configuration.log_health_to_stdout;
if !enable_logging {
return Ok(None);
}
let mut data = data_sender.subscribe();
let filter: HashSet<Pubkey> = configuration
.logging_configuration
.log_health_for_accounts
.clone()
.unwrap_or_default()
.iter()
.map(|s| Pubkey::from_str(s).unwrap())
.collect();
let job = tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(1000));
loop {
if exit.load(Ordering::Relaxed) {
warn!("shutting down logger processor...");
break;
}
tokio::select! {
_ = interval.tick() => {
},
Ok(msg) = data.recv() => {
for component in msg.components {
if !filter.is_empty() && !filter.contains(&component.account) {
continue;
}
if component.value.is_some() {
let value: HealthComponentValue = component.value.unwrap();
info!(
computed_at = %msg.computed_at,
account = %component.account,
maintenance_ratio = %value.maintenance_ratio,
initial_health = %value.initial_health,
maintenance_health = %value.maintenance_health,
liquidation_end_health = %value.liquidation_end_health,
is_being_liquidated = %value.is_being_liquidated,
)
} else {
info!(
computed_at = %msg.computed_at,
account = %component.account,
error = "Missing health data"
)
}
}
},
}
}
});
let result = LoggerProcessor { job };
Ok(Some(result))
}
}


@ -0,0 +1,5 @@
pub mod data;
pub mod exit;
pub mod health;
pub mod logger;
pub mod persister;


@ -0,0 +1,712 @@
use crate::configuration::Configuration;
use crate::processors::health::{HealthComponent, HealthEvent};
use anchor_lang::prelude::Pubkey;
use chrono::{Duration, DurationRound, Utc};
use futures_util::pin_mut;
use postgres_types::{ToSql, Type};
use services_mango_lib::fail_or_retry;
use services_mango_lib::postgres_configuration::PostgresConfiguration;
use services_mango_lib::postgres_connection;
use services_mango_lib::retry_counter::RetryCounter;
use std::collections::{HashMap, VecDeque};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio_postgres::binary_copy::BinaryCopyInWriter;
use tokio_postgres::{Client, Transaction};
use tracing::{error, warn};
pub struct PersisterProcessor {
pub job: JoinHandle<()>,
}
impl PersisterProcessor {
pub async fn init(
data_sender: &tokio::sync::broadcast::Sender<HealthEvent>,
configuration: &Configuration,
exit: Arc<AtomicBool>,
) -> anyhow::Result<Option<PersisterProcessor>> {
let postgres_configuration = configuration.postgres.clone().unwrap_or_default();
let persistence_configuration = configuration.persistence_configuration.clone();
let time_to_live = Duration::seconds(persistence_configuration.history_time_to_live_secs);
let periodicity = Duration::seconds(persistence_configuration.persist_max_periodicity_secs);
let max_snapshot_count = persistence_configuration.snapshot_queue_length;
let max_failure_duration =
Duration::seconds(persistence_configuration.max_failure_duration_secs);
if !persistence_configuration.enabled {
return Ok(None);
}
let mut data = data_sender.subscribe();
let mut unpersisted_snapshots = VecDeque::new();
let job = tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(1000));
let mut retry_counter = RetryCounter::new(persistence_configuration.max_retry_count);
let mut last_successful_persistence = chrono::Utc::now();
let mut connection = match fail_or_retry!(
retry_counter,
postgres_connection::connect(&postgres_configuration).await
) {
Err(e) => {
tracing::error!("Failed to connect to postgres sql: {}", e);
return;
}
Ok(cnt) => cnt.0,
};
let mut previous =
match fail_or_retry!(retry_counter, Self::load_previous(&connection).await) {
Ok(prv) => prv,
Err(e) => {
error!("loading of previous state failed: {}", e);
return;
}
};
loop {
if exit.load(Ordering::Relaxed) {
warn!("shutting down persister processor...");
break;
}
tokio::select! {
_ = interval.tick() => {
},
Ok(event) = data.recv() => {
Self::store_snapshot(
&previous,
&mut unpersisted_snapshots,
&event,
periodicity,
max_snapshot_count,
);
if let Err(e) = retry_counter.fail_or_ignore(
Self::persist_all_snapshots_and_update_state(
&mut connection,
&mut previous,
&mut unpersisted_snapshots,
time_to_live,
)
.await,
) {
error!("persistence failed (for {}): {}", chrono::Utc::now() - last_successful_persistence, e);
match Self::try_to_reconnect(&postgres_configuration).await {
Ok(client) => {
connection = client;
}
Err(e) => {
if chrono::Utc::now() - last_successful_persistence
> max_failure_duration
{
error!("failed to reconnect (after multiple retries): {}", e);
break; // Shutdown processor
}
}
};
}
if unpersisted_snapshots.is_empty() {
last_successful_persistence = chrono::Utc::now();
}
}
}
}
});
let result = PersisterProcessor { job };
Ok(Some(result))
}
fn build_persisted_data(
computed_at: chrono::DateTime<Utc>,
component: &HealthComponent,
) -> PersistedData {
match &component.value {
Some(value) => PersistedData {
computed_at: computed_at,
maintenance_ratio: Some(value.maintenance_ratio),
initial_health: Some(value.initial_health),
maintenance_health: Some(value.maintenance_health),
liquidation_end_health: Some(value.liquidation_end_health),
is_being_liquidated: Some(value.is_being_liquidated),
},
None => PersistedData {
computed_at: computed_at,
maintenance_ratio: None,
initial_health: None,
maintenance_health: None,
liquidation_end_health: None,
is_being_liquidated: None,
},
}
}
fn store_snapshot(
previous: &HashMap<Pubkey, PersistedData>,
snapshots: &mut VecDeque<PersisterSnapshot>,
event: &HealthEvent,
periodicity: chrono::Duration,
max_snapshot_count: usize,
) {
let bucket = event
.computed_at
.duration_round(periodicity)
.unwrap_or(chrono::DateTime::<Utc>::MIN_UTC);
let mut previous_snapshot = &PersisterSnapshot {
bucket,
value: HashMap::new(),
};
if !snapshots.is_empty() {
previous_snapshot = &snapshots[snapshots.len() - 1];
}
let updates = event
.components
.iter()
.filter_map(|component| {
let persisted_data = Self::build_persisted_data(event.computed_at, &component);
let should_insert_new_point = Self::should_insert(
&previous,
&component.account,
&persisted_data,
periodicity,
);
let should_update_existing_point =
previous_snapshot.value.contains_key(&component.account);
(should_insert_new_point || should_update_existing_point)
.then(|| (component.account, persisted_data))
})
.collect();
if let Some(existing_snapshot_for_bucket) = (*snapshots)
.iter_mut()
.find(|s| s.bucket == bucket)
.as_mut()
{
for (k, v) in updates {
existing_snapshot_for_bucket.value.insert(k, v);
}
return;
}
if snapshots.len() >= max_snapshot_count {
snapshots.pop_front();
}
let snapshot = PersisterSnapshot {
bucket,
value: updates,
};
snapshots.push_back(snapshot);
}
async fn persist_all_snapshots_and_update_state(
client: &mut Client,
previous: &mut HashMap<Pubkey, PersistedData>,
snapshots: &mut VecDeque<PersisterSnapshot>,
ttl: Duration,
) -> anyhow::Result<()> {
loop {
if snapshots.is_empty() {
break;
}
let snapshot = &snapshots[0];
if snapshot.value.len() == 0 {
snapshots.pop_front();
continue;
}
Self::persist_snapshot(client, &snapshot.value, ttl).await?;
let snapshot = snapshots.pop_front().unwrap();
for (k, v) in snapshot.value {
previous.insert(k, v);
}
}
Ok(())
}
async fn try_to_reconnect(
postgres_configuration: &PostgresConfiguration,
) -> anyhow::Result<Client> {
let client = postgres_connection::connect(&postgres_configuration)
.await?
.0;
Ok(client)
}
async fn load_previous(client: &Client) -> anyhow::Result<HashMap<Pubkey, PersistedData>> {
let rows = client
.query(
"SELECT Pubkey, Timestamp, MaintenanceRatio, Initial, Maintenance, LiquidationEnd, IsBeingLiquidated FROM mango_monitoring.health_current",
&[],
)
.await?;
let mut result = HashMap::<Pubkey, PersistedData>::new();
for row in rows {
let key = Pubkey::from_str(row.get(0))?;
result.insert(
key,
PersistedData {
computed_at: row.get(1),
maintenance_ratio: row.get(2),
initial_health: row.get(3),
maintenance_health: row.get(4),
liquidation_end_health: row.get(5),
is_being_liquidated: row.get(6),
},
);
}
Ok(result)
}
async fn persist_snapshot(
client: &mut Client,
updates: &HashMap<Pubkey, PersistedData>,
ttl: chrono::Duration,
) -> anyhow::Result<()> {
let tx = client.transaction().await?;
Self::insert_history(&tx, &updates).await?;
Self::delete_old_history(&tx, chrono::Utc::now(), ttl).await?;
Self::update_current(&tx).await?;
tx.commit().await?;
Ok(())
}
async fn insert_history<'tx>(
client: &Transaction<'tx>,
updates: &HashMap<Pubkey, PersistedData>,
) -> anyhow::Result<()> {
let col_types = [
Type::VARCHAR,
Type::TIMESTAMPTZ,
Type::FLOAT8,
Type::FLOAT8,
Type::FLOAT8,
Type::FLOAT8,
Type::BOOL,
];
let sink = client.copy_in("COPY mango_monitoring.health_history (Pubkey, Timestamp, MaintenanceRatio, Initial, Maintenance, LiquidationEnd, IsBeingLiquidated) FROM STDIN BINARY").await?;
let writer = BinaryCopyInWriter::new(sink, &col_types);
pin_mut!(writer);
for (key, value) in updates {
let key = key.to_string();
let row: Vec<&'_ (dyn ToSql + Sync)> = vec![
&key,
&value.computed_at,
&value.maintenance_ratio,
&value.initial_health,
&value.maintenance_health,
&value.liquidation_end_health,
&value.is_being_liquidated,
];
writer.as_mut().write(&row).await?;
}
writer.finish().await?;
Ok(())
}
async fn update_current<'tx>(client: &Transaction<'tx>) -> anyhow::Result<()> {
let query =
postgres_query::query!("REFRESH MATERIALIZED VIEW mango_monitoring.health_current");
query.execute(client).await?;
Ok(())
}
async fn delete_old_history<'tx>(
client: &Transaction<'tx>,
now: chrono::DateTime<Utc>,
ttl: chrono::Duration,
) -> anyhow::Result<()> {
let min_ts = now - ttl;
let query = postgres_query::query!(
"DELETE FROM mango_monitoring.health_history WHERE timestamp < $min_ts",
min_ts
);
query.execute(client).await?;
Ok(())
}
fn should_insert(
persisted_data: &HashMap<Pubkey, PersistedData>,
health_component_key: &Pubkey,
health_component: &PersistedData,
periodicity: Duration,
) -> bool {
match persisted_data.get(health_component_key) {
None => true,
Some(previous) => {
let is_old = health_component.computed_at - previous.computed_at >= periodicity;
let between_none_and_some = previous.is_some() != health_component.is_some();
if is_old || between_none_and_some {
true
} else if previous.is_some() && health_component.is_some() {
let changing_flag = health_component.is_being_liquidated.unwrap()
!= previous.is_being_liquidated.unwrap();
let curr = health_component.maintenance_ratio.unwrap();
let prev = previous.maintenance_ratio.unwrap();
let changing_side = (prev <= 0.0 && curr > 0.0) || (prev > 0.0 && curr <= 0.0);
let big_move = prev != 0.0 && (prev - curr).abs() / prev > 0.1;
changing_side || changing_flag || big_move
} else {
false
}
}
}
}
}
struct PersistedData {
pub computed_at: chrono::DateTime<Utc>,
pub maintenance_ratio: Option<f64>,
pub initial_health: Option<f64>,
pub maintenance_health: Option<f64>,
pub liquidation_end_health: Option<f64>,
pub is_being_liquidated: Option<bool>,
}
impl PersistedData {
pub fn is_some(&self) -> bool {
self.maintenance_ratio.is_some()
&& self.initial_health.is_some()
&& self.maintenance_health.is_some()
&& self.liquidation_end_health.is_some()
&& self.is_being_liquidated.is_some()
}
}
struct PersisterSnapshot {
pub value: HashMap<Pubkey, PersistedData>,
pub bucket: chrono::DateTime<Utc>,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::processors::health::HealthComponentValue;
use chrono::SubsecRound;
fn make_value(hr: f64, i: u64, m: u64, le: u64, ibl: bool) -> Option<HealthComponentValue> {
Some(HealthComponentValue {
maintenance_ratio: hr,
initial_health: i as f64,
maintenance_health: m as f64,
liquidation_end_health: le as f64,
is_being_liquidated: ibl,
})
}
fn make_persisted_empty(t_secs: i64) -> PersistedData {
PersistedData {
computed_at: chrono::Utc::now() - chrono::Duration::seconds(t_secs),
maintenance_ratio: None,
initial_health: None,
maintenance_health: None,
liquidation_end_health: None,
is_being_liquidated: None,
}
}
fn make_persisted(t_secs: i64, mr: f64) -> PersistedData {
PersistedData {
computed_at: chrono::Utc::now() - chrono::Duration::seconds(t_secs),
maintenance_ratio: Some(mr),
initial_health: Some(1000f64),
maintenance_health: Some(1000f64),
liquidation_end_health: Some(1f64),
is_being_liquidated: Some(false),
}
}
fn make_persisted_with_liquidated_flag(t_secs: i64, mr: f64) -> PersistedData {
PersistedData {
computed_at: chrono::Utc::now() - chrono::Duration::seconds(t_secs),
maintenance_ratio: Some(mr),
initial_health: Some(1000f64),
maintenance_health: Some(1000f64),
liquidation_end_health: Some(1f64),
is_being_liquidated: Some(true),
}
}
#[test]
fn should_persist_if_there_is_no_previous_point() {
let previous = HashMap::new();
assert!(PersisterProcessor::should_insert(
&previous,
&Pubkey::new_unique(),
&make_persisted(0, 123f64),
chrono::Duration::seconds(60)
));
assert!(PersisterProcessor::should_insert(
&previous,
&Pubkey::new_unique(),
&make_persisted(0, 0f64),
chrono::Duration::seconds(60)
));
assert!(PersisterProcessor::should_insert(
&previous,
&Pubkey::new_unique(),
&make_persisted_empty(0),
chrono::Duration::seconds(60)
));
}
#[test]
fn should_persist_if_previous_point_is_old() {
let mut previous = HashMap::new();
let pk1 = Pubkey::new_unique();
let pk2 = Pubkey::new_unique();
previous.insert(pk1, make_persisted(120, 123.0));
previous.insert(pk2, make_persisted(3, 123.0));
assert!(PersisterProcessor::should_insert(
&previous,
&pk1,
&make_persisted(0, 124.0),
chrono::Duration::seconds(60)
));
assert!(!PersisterProcessor::should_insert(
&previous,
&pk2,
&make_persisted(0, 124.0),
chrono::Duration::seconds(60)
));
}
#[test]
fn should_persist_when_change_is_interesting() {
let mut previous = HashMap::new();
let pk1 = Pubkey::new_unique();
let pk2 = Pubkey::new_unique();
previous.insert(pk1, make_persisted(0, 123f64));
previous.insert(pk2, make_persisted(0, 0.01));
// small move, nop
assert!(!PersisterProcessor::should_insert(
&previous,
&pk1,
&make_persisted(0, 124.0),
chrono::Duration::seconds(60)
));
// big move, insert
assert!(PersisterProcessor::should_insert(
&previous,
&pk1,
&make_persisted(0, 100.0),
chrono::Duration::seconds(60)
));
// small move, but cross 0, insert
assert!(PersisterProcessor::should_insert(
&previous,
&pk2,
&make_persisted(0, -0.001),
chrono::Duration::seconds(60)
));
// small move, does not cross 0, nop
assert!(!PersisterProcessor::should_insert(
&previous,
&pk2,
&make_persisted(0, 0.0099),
chrono::Duration::seconds(60)
));
// no change except flag being liquidated change
assert!(PersisterProcessor::should_insert(
&previous,
&pk2,
&make_persisted_with_liquidated_flag(0, 0.01),
chrono::Duration::seconds(60)
));
}
#[test]
fn should_correctly_convert_event_into_data() {
let computed_at = chrono::Utc::now();
let component = HealthComponent {
account: Pubkey::new_unique(),
value: Some(HealthComponentValue {
maintenance_ratio: 123.0,
initial_health: 1000.0,
maintenance_health: 2000.0,
liquidation_end_health: 3000.0,
is_being_liquidated: false,
}),
};
let converted = PersisterProcessor::build_persisted_data(computed_at, &component);
assert_eq!(converted.computed_at, computed_at);
assert_eq!(converted.maintenance_ratio.unwrap(), 123.0);
assert_eq!(converted.initial_health.unwrap(), 1000.0);
assert_eq!(converted.maintenance_health.unwrap(), 2000.0);
assert_eq!(converted.liquidation_end_health.unwrap(), 3000.0);
assert_eq!(converted.is_being_liquidated.unwrap(), false);
}
#[test]
fn should_store_or_replace_snapshot() {
let pk = Pubkey::new_unique();
let previous = HashMap::new();
let mut snapshots = VecDeque::new();
let event1 = HealthEvent {
computed_at: chrono::Utc::now().trunc_subsecs(0) - chrono::Duration::seconds(300),
components: vec![HealthComponent {
account: pk,
value: make_value(50.25f64, 2, 3, 4, false),
}],
};
let event2 = HealthEvent {
computed_at: chrono::Utc::now().trunc_subsecs(0) - chrono::Duration::seconds(290),
components: vec![HealthComponent {
account: pk,
value: make_value(502.5f64, 20, 30, 40, false),
}],
};
let event3 = HealthEvent {
computed_at: chrono::Utc::now().trunc_subsecs(0) - chrono::Duration::seconds(200),
components: vec![HealthComponent {
account: pk,
value: make_value(5025.0f64, 200, 300, 400, false),
}],
};
let event4 = HealthEvent {
computed_at: chrono::Utc::now().trunc_subsecs(0) - chrono::Duration::seconds(100),
components: vec![HealthComponent {
account: pk,
value: make_value(50250.0f64, 2000, 3000, 4000, false),
}],
};
PersisterProcessor::store_snapshot(
&previous,
&mut snapshots,
&event1,
Duration::seconds(60),
2,
);
assert_eq!(snapshots.len(), 1);
assert_eq!(
snapshots[0]
.value
.iter()
.next()
.unwrap()
.1
.maintenance_health
.unwrap(),
3.0
);
PersisterProcessor::store_snapshot(
&previous,
&mut snapshots,
&event2,
Duration::seconds(60),
2,
);
assert_eq!(snapshots.len(), 1);
assert_eq!(
snapshots[0]
.value
.iter()
.next()
.unwrap()
.1
.maintenance_health
.unwrap(),
30.0
);
PersisterProcessor::store_snapshot(
&previous,
&mut snapshots,
&event3,
Duration::seconds(60),
2,
);
assert_eq!(snapshots.len(), 2);
assert_eq!(
snapshots[0]
.value
.iter()
.next()
.unwrap()
.1
.maintenance_health
.unwrap(),
30.0
);
assert_eq!(
snapshots[1]
.value
.iter()
.next()
.unwrap()
.1
.maintenance_health
.unwrap(),
300.0
);
PersisterProcessor::store_snapshot(
&previous,
&mut snapshots,
&event4,
Duration::seconds(60),
2,
);
assert_eq!(snapshots.len(), 2);
assert_eq!(
snapshots[0]
.value
.iter()
.next()
.unwrap()
.1
.maintenance_health
.unwrap(),
300.0
);
assert_eq!(
snapshots[1]
.value
.iter()
.next()
.unwrap()
.1
.maintenance_health
.unwrap(),
3000.0
);
}
}


@ -0,0 +1 @@

cd/health.toml

@ -0,0 +1,9 @@
app = "mango-health-monitoring"
kill_signal = "SIGTERM"
kill_timeout = 30
[build]
dockerfile = "../Dockerfile"
[experimental]
cmd = ["service-mango-health", "health-config.toml"]


@ -0,0 +1,20 @@
[package]
name = "services-mango-lib"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0"
base64 = "0.21"
tokio = { version = "1", features = ["full"] }
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
tokio-postgres-rustls = "0.9.0"
postgres-types = { version = "0.2", features = ["array-impls", "derive", "with-chrono-0_4"] }
postgres-native-tls = "0.5"
native-tls = "0.2"
rustls = "0.20.8"
postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" }
tracing = { version = "0.1", features = ["log"] }
serde = { version = "1.0.188", features = ["derive"] }


@ -0,0 +1,3 @@
pub mod postgres_configuration;
pub mod postgres_connection;
pub mod retry_counter;


@ -0,0 +1,16 @@
use serde::Deserialize;
#[derive(Clone, Debug, Deserialize, Default)]
pub struct PostgresConfiguration {
pub connection_string: String,
pub allow_invalid_certs: bool,
pub tls: Option<PostgresTlsConfig>,
}
#[derive(Clone, Debug, Deserialize)]
pub struct PostgresTlsConfig {
/// CA Cert file or env var
pub ca_cert_path: String,
/// PKCS12 client cert path
pub client_key_path: String,
}


@ -0,0 +1,66 @@
use crate::postgres_configuration::PostgresConfiguration;
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use std::{env, fs};
use tokio::task::JoinHandle;
use tokio_postgres::Client;
pub async fn connect(
config: &PostgresConfiguration,
) -> anyhow::Result<(Client, JoinHandle<Result<(), tokio_postgres::Error>>)> {
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
let tls = match &config.tls {
Some(tls) => {
use base64::{engine::general_purpose, Engine as _};
let ca_cert = match &tls.ca_cert_path.chars().next().unwrap() {
'$' => general_purpose::STANDARD
.decode(
env::var(&tls.ca_cert_path[1..])
.expect("reading client cert from env")
.into_bytes(),
)
.expect("decoding client cert"),
_ => fs::read(&tls.ca_cert_path).expect("reading client cert from file"),
};
let client_key = match &tls.client_key_path.chars().next().unwrap() {
'$' => general_purpose::STANDARD
.decode(
env::var(&tls.client_key_path[1..])
.expect("reading client key from env")
.into_bytes(),
)
.expect("decoding client key"),
_ => fs::read(&tls.client_key_path).expect("reading client key from file"),
};
MakeTlsConnector::new(
TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
.identity(Identity::from_pkcs12(&client_key, "pass")?)
.danger_accept_invalid_certs(config.allow_invalid_certs)
.build()?,
)
}
None => MakeTlsConnector::new(
TlsConnector::builder()
.danger_accept_invalid_certs(config.allow_invalid_certs)
.build()?,
),
};
let config = config.clone();
let connection_string = match &config.connection_string.chars().next().unwrap() {
'$' => {
env::var(&config.connection_string[1..]).expect("reading connection string from env")
}
_ => config.connection_string.clone(),
};
let (client, connection) = tokio_postgres::connect(&connection_string, tls).await?;
let handle = tokio::spawn(async move { connection.await });
Ok((client, handle))
}


@ -0,0 +1,59 @@
pub struct RetryCounter {
error_count: u64,
max_retry_count: u64,
}
impl RetryCounter {
pub fn new(max_retry_count: u64) -> Self {
RetryCounter {
max_retry_count,
error_count: 0,
}
}
pub fn reset(&mut self) {
self.error_count = 0;
}
/// true if should retry, false if should bail
pub fn on_new_error(&mut self) -> bool {
self.error_count += 1;
self.error_count <= self.max_retry_count
}
pub fn fail_or_ignore<T>(&mut self, result: anyhow::Result<T>) -> anyhow::Result<Option<T>> {
match result {
Err(e) => match self.on_new_error() {
true => Ok(None),
false => {
self.reset();
Err(e)
}
},
Ok(v) => {
self.reset();
Ok(Some(v))
}
}
}
}
#[macro_export]
macro_rules! fail_or_retry {
($retry_counter:expr, $f:expr) => {{
loop {
let result = $retry_counter.fail_or_ignore($f);
match result {
Ok(Some(value)) => {
break Ok(value);
}
Ok(None) => {
continue;
}
Err(e) => {
break Err(e);
}
}
}
}};
}