From 6aa8a883f4090e677d644edfa9de6656fa9a55f1 Mon Sep 17 00:00:00 2001
From: Serge Farny
Date: Thu, 29 Feb 2024 17:05:29 +0100
Subject: [PATCH] service-mango-health: monitors/historizes accounts health
 (#890)

* service-mango-health: add a new service that computes health for all mango
  accounts and persists a history in PostgreSQL

---
 .github/workflows/ci-docker-publish.yml       |   1 +
 Cargo.lock                                    |  66 ++
 Dockerfile                                    |   1 +
 bin/service-mango-fills/Cargo.toml            |   1 +
 .../src/fill_event_postgres_target.rs         |  62 +-
 .../src/postgres_config.rs                    |   9 +-
 bin/service-mango-health/Cargo.toml           |  60 ++
 bin/service-mango-health/README.md            |   4 +
 .../conf/example-config.toml                  |  27 +
 .../conf/template-config.toml                 |  26 +
 .../sql_scripts/setup.sql                     |  26 +
 bin/service-mango-health/src/configuration.rs |  35 +
 bin/service-mango-health/src/main.rs          |  87 +++
 .../src/processors/data.rs                    | 210 ++++++
 .../src/processors/exit.rs                    |  39 +
 .../src/processors/health.rs                  | 177 +++++
 .../src/processors/logger.rs                  |  81 ++
 .../src/processors/mod.rs                     |   5 +
 .../src/processors/persister.rs               | 712 ++++++++++++++++++
 bin/service-mango-health/src/utils/mod.rs     |   1 +
 cd/health.toml                                |   9 +
 lib/services-mango-lib/Cargo.toml             |  20 +
 lib/services-mango-lib/src/lib.rs             |   3 +
 .../src/postgres_configuration.rs             |  16 +
 .../src/postgres_connection.rs                |  66 ++
 lib/services-mango-lib/src/retry_counter.rs   |  59 ++
 26 files changed, 1743 insertions(+), 60 deletions(-)
 create mode 100644 bin/service-mango-health/Cargo.toml
 create mode 100644 bin/service-mango-health/README.md
 create mode 100644 bin/service-mango-health/conf/example-config.toml
 create mode 100644 bin/service-mango-health/conf/template-config.toml
 create mode 100644 bin/service-mango-health/sql_scripts/setup.sql
 create mode 100644 bin/service-mango-health/src/configuration.rs
 create mode 100644 bin/service-mango-health/src/main.rs
 create mode 100644 bin/service-mango-health/src/processors/data.rs
 create mode 100644 bin/service-mango-health/src/processors/exit.rs
 create mode 100644 bin/service-mango-health/src/processors/health.rs
 create mode 100644 bin/service-mango-health/src/processors/logger.rs
 create mode 100644 bin/service-mango-health/src/processors/mod.rs
 create mode 100644 bin/service-mango-health/src/processors/persister.rs
 create mode 100644 bin/service-mango-health/src/utils/mod.rs
 create mode 100644 cd/health.toml
 create mode 100644 lib/services-mango-lib/Cargo.toml
 create mode 100644 lib/services-mango-lib/src/lib.rs
 create mode 100644 lib/services-mango-lib/src/postgres_configuration.rs
 create mode 100644 lib/services-mango-lib/src/postgres_connection.rs
 create mode 100644 lib/services-mango-lib/src/retry_counter.rs

diff --git a/.github/workflows/ci-docker-publish.yml b/.github/workflows/ci-docker-publish.yml
index 5df69bd84..70d1c169e 100644
--- a/.github/workflows/ci-docker-publish.yml
+++ b/.github/workflows/ci-docker-publish.yml
@@ -15,6 +15,7 @@ on:
       'bin/service-mango-fills/**',
       'bin/service-mango-orderbook/**',
       'bin/service-mango-pnl/**',
+      'bin/service-mango-health/**',
     ]
 
   workflow_dispatch:
diff --git a/Cargo.lock b/Cargo.lock
index a5c572ac1..3953e9ce6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5986,6 +5986,7 @@ dependencies = [
  "serde_derive",
  "serde_json",
  "serum_dex 0.5.10 (git+https://github.com/openbook-dex/program.git)",
+ "services-mango-lib",
  "solana-client",
  "solana-logger",
  "solana-sdk",
@@ -5997,6 +5998,53 @@ dependencies = [
  "ws",
 ]
 
+[[package]]
+name = "service-mango-health"
+version = "0.1.0"
+dependencies = [
+ "anchor-client",
+ "anchor-lang",
+ "anyhow",
+ 
"async-channel", + "async-trait", + "base64 0.21.4", + "bs58 0.3.1", + "bytemuck", + "chrono", + "fixed 1.11.0 (git+https://github.com/blockworks-foundation/fixed.git?branch=v1.11.0-borsh0_10-mango)", + "futures 0.3.28", + "futures-channel", + "futures-core", + "futures-util", + "itertools", + "jemallocator", + "log 0.4.20", + "mango-feeds-connector", + "mango-feeds-lib", + "mango-v4", + "mango-v4-client", + "native-tls", + "postgres-native-tls", + "postgres-types", + "postgres_query", + "rustls 0.20.9", + "serde", + "serde_derive", + "serde_json", + "serum_dex 0.5.10 (git+https://github.com/openbook-dex/program.git)", + "services-mango-lib", + "solana-client", + "solana-logger", + "solana-sdk", + "tokio", + "tokio-postgres", + "tokio-postgres-rustls", + "tokio-tungstenite 0.17.2", + "toml", + "tracing", + "ws", +] + [[package]] name = "service-mango-orderbook" version = "0.1.0" @@ -6055,6 +6103,24 @@ dependencies = [ "toml", ] +[[package]] +name = "services-mango-lib" +version = "0.1.0" +dependencies = [ + "anyhow", + "base64 0.21.4", + "native-tls", + "postgres-native-tls", + "postgres-types", + "postgres_query", + "rustls 0.20.9", + "serde", + "tokio", + "tokio-postgres", + "tokio-postgres-rustls", + "tracing", +] + [[package]] name = "sgx-quote" version = "0.1.0" diff --git a/Dockerfile b/Dockerfile index 98dcbdd15..1743b9b9b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,6 +27,7 @@ COPY --from=build /app/target/release/service-mango-* /usr/local/bin/ COPY --from=build /app/bin/service-mango-pnl/conf/template-config.toml ./pnl-config.toml COPY --from=build /app/bin/service-mango-fills/conf/template-config.toml ./fills-config.toml COPY --from=build /app/bin/service-mango-orderbook/conf/template-config.toml ./orderbook-config.toml +COPY --from=build /app/bin/service-mango-health/conf/template-config.toml ./health-config.toml COPY --from=build /app/bin/service-mango-pnl/conf/template-config.toml ./pnl-config.toml COPY --from=build /app/bin/service-mango-fills/conf//template-config.toml ./fills-config.toml diff --git a/bin/service-mango-fills/Cargo.toml b/bin/service-mango-fills/Cargo.toml index acd55d9ba..d3187b1f6 100644 --- a/bin/service-mango-fills/Cargo.toml +++ b/bin/service-mango-fills/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" license = "AGPL-3.0-or-later" [dependencies] +services-mango-lib = { path = "../../lib/services-mango-lib" } mango-feeds-lib = { path = "../../lib/mango-feeds-lib" } mango-feeds-connector = { workspace = true } diff --git a/bin/service-mango-fills/src/fill_event_postgres_target.rs b/bin/service-mango-fills/src/fill_event_postgres_target.rs index d92d7ecb6..de8018b89 100644 --- a/bin/service-mango-fills/src/fill_event_postgres_target.rs +++ b/bin/service-mango-fills/src/fill_event_postgres_target.rs @@ -2,12 +2,10 @@ use crate::postgres_config::PostgresConfig; use chrono::{TimeZone, Utc}; use log::*; use mango_feeds_connector::metrics::{MetricType, MetricU64, Metrics}; -use native_tls::{Certificate, Identity, TlsConnector}; -use postgres_native_tls::MakeTlsConnector; use postgres_query::Caching; use service_mango_fills::*; +use services_mango_lib::postgres_configuration::PostgresConfiguration; use std::{ - env, fs, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -24,56 +22,15 @@ async fn postgres_connection( ) -> anyhow::Result>> { let (tx, rx) = async_channel::unbounded(); - // openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks - // base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64 - // fly secrets set 
PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills - // fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills - let tls = match &config.tls { - Some(tls) => { - use base64::{engine::general_purpose, Engine as _}; - let ca_cert = match &tls.ca_cert_path.chars().next().unwrap() { - '$' => general_purpose::STANDARD - .decode( - env::var(&tls.ca_cert_path[1..]) - .expect("reading client cert from env") - .into_bytes(), - ) - .expect("decoding client cert"), - _ => fs::read(&tls.ca_cert_path).expect("reading client cert from file"), - }; - let client_key = match &tls.client_key_path.chars().next().unwrap() { - '$' => general_purpose::STANDARD - .decode( - env::var(&tls.client_key_path[1..]) - .expect("reading client key from env") - .into_bytes(), - ) - .expect("decoding client key"), - _ => fs::read(&tls.client_key_path).expect("reading client key from file"), - }; - MakeTlsConnector::new( - TlsConnector::builder() - .add_root_certificate(Certificate::from_pem(&ca_cert)?) - .identity(Identity::from_pkcs12(&client_key, "pass")?) - .danger_accept_invalid_certs(config.allow_invalid_certs) - .build()?, - ) - } - None => MakeTlsConnector::new( - TlsConnector::builder() - .danger_accept_invalid_certs(config.allow_invalid_certs) - .build()?, - ), + let config = config.clone(); + let lib_config = PostgresConfiguration { + connection_string: config.connection_string.clone(), + allow_invalid_certs: config.allow_invalid_certs, + tls: config.tls.clone(), }; - let config = config.clone(); - let connection_string = match &config.connection_string.chars().next().unwrap() { - '$' => { - env::var(&config.connection_string[1..]).expect("reading connection string from env") - } - _ => config.connection_string.clone(), - }; - let mut initial = Some(tokio_postgres::connect(&connection_string, tls.clone()).await?); + let mut initial = Some(services_mango_lib::postgres_connection::connect(&lib_config).await?); + let mut metric_retries = metric_retries; let mut metric_live = metric_live; tokio::spawn(async move { @@ -86,7 +43,8 @@ async fn postgres_connection( let (client, connection) = match initial.take() { Some(v) => v, None => { - let result = tokio_postgres::connect(&connection_string, tls.clone()).await; + let result = + services_mango_lib::postgres_connection::connect(&lib_config).await; match result { Ok(v) => v, Err(err) => { diff --git a/bin/service-mango-fills/src/postgres_config.rs b/bin/service-mango-fills/src/postgres_config.rs index b67f674f2..35342673a 100644 --- a/bin/service-mango-fills/src/postgres_config.rs +++ b/bin/service-mango-fills/src/postgres_config.rs @@ -1,4 +1,5 @@ use serde_derive::Deserialize; +use services_mango_lib::postgres_configuration::PostgresTlsConfig; #[derive(Clone, Debug, Deserialize)] pub struct PostgresConfig { @@ -21,11 +22,3 @@ pub struct PostgresConfig { pub allow_invalid_certs: bool, pub tls: Option, } - -#[derive(Clone, Debug, Deserialize)] -pub struct PostgresTlsConfig { - /// CA Cert file or env var - pub ca_cert_path: String, - /// PKCS12 client cert path - pub client_key_path: String, -} diff --git a/bin/service-mango-health/Cargo.toml b/bin/service-mango-health/Cargo.toml new file mode 100644 index 000000000..f5fad86ab --- /dev/null +++ b/bin/service-mango-health/Cargo.toml @@ -0,0 +1,60 @@ +[package] +name = "service-mango-health" +version = "0.1.0" +authors = ["Christian Kamm ", "Maximilian Schneider ", "Serge Farny "] +edition = "2018" +license = "AGPL-3.0-or-later" + +[dependencies] +mango-feeds-lib = { path = "../../lib/mango-feeds-lib" } 
+mango-feeds-connector = { workspace = true }
+services-mango-lib = { path = "../../lib/services-mango-lib" }
+
+solana-client = { workspace = true }
+solana-logger = { workspace = true }
+solana-sdk = { workspace = true }
+
+anchor-lang = { workspace = true }
+anchor-client = { workspace = true }
+
+fixed = { workspace = true, features = ["serde", "borsh"] }
+
+mango-v4 = { path = "../../programs/mango-v4", features = ["client"] }
+mango-v4-client = { path = "../../lib/client" }
+
+serum_dex = { workspace = true }
+
+bs58 = "0.3.1"
+log = "0.4"
+anyhow = "1.0"
+toml = "0.5"
+serde = "1.0.130"
+serde_derive = "1.0.130"
+serde_json = "1.0.68"
+futures = "0.3.17"
+futures-core = "0.3"
+futures-channel = "0.3"
+futures-util = "0.3"
+ws = "^0.9.2"
+async-channel = "1.6"
+async-trait = "0.1"
+bytemuck = "^1.7.2"
+itertools = "0.10.3"
+jemallocator = "0.3.2"
+chrono = "0.4.23"
+base64 = "0.21"
+
+tokio = { version = "1", features = ["full"] }
+tokio-tungstenite = "0.17"
+
+tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
+tokio-postgres-rustls = "0.9.0"
+postgres-types = { version = "0.2", features = ["array-impls", "derive", "with-chrono-0_4"] }
+postgres-native-tls = "0.5"
+native-tls = "0.2"
+rustls = "0.20.8"
+# postgres_query hasn't updated its crate in a while
+postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" }
+tracing = { version = "0.1", features = ["log"] }
diff --git a/bin/service-mango-health/README.md b/bin/service-mango-health/README.md
new file mode 100644
index 000000000..3505e168e
--- /dev/null
+++ b/bin/service-mango-health/README.md
@@ -0,0 +1,4 @@
+# service-mango-health
+
+This service monitors the health of all mango accounts, publishing and persisting each account's health ratio.
+
diff --git a/bin/service-mango-health/conf/example-config.toml b/bin/service-mango-health/conf/example-config.toml
new file mode 100644
index 000000000..edbb9e493
--- /dev/null
+++ b/bin/service-mango-health/conf/example-config.toml
@@ -0,0 +1,27 @@
+rpc_ws_url = "wss://mango.rpcpool.com/"
+rpc_http_url = "http://mango.rpcpool.com/"
+mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX"
+
+# [postgres]
+# connection_string = "$PG_CONNECTION_STRING"
+# allow_invalid_certs = true
+# max_retry_count = 2
+
+# # [postgres.tls]
+# # ca_cert_path = "$PG_CA_CERT"
+# # client_key_path = "$PG_CLIENT_KEY"
+
+[computing_configuration]
+recompute_interval_ms = 100
+
+[logging_configuration]
+log_health_to_stdout = true
+#log_health_for_accounts = ["xxx"]
+
+[persistence_configuration]
+enabled = true
+history_time_to_live_secs = 2678400 # 31 days
+persist_max_periodicity_secs = 60
+snapshot_queue_length = 30 # 30 * persist_max_periodicity_secs secs of backup in queue
+max_failure_duration_secs = 3600
+max_retry_count = 3
\ No newline at end of file
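A note on the `$`-prefixed values in these configs: `$PG_CONNECTION_STRING`, `$PG_CA_CERT`, and friends are not literal secrets but indirections that `services-mango-lib`'s `postgres_connection` (at the end of this patch) resolves by reading the named environment variable whenever a value's first character is `$`. A minimal sketch of that convention (the helper name is illustrative, not part of the patch):

```rust
use std::env;

/// Resolve one configuration value using the convention from
/// services-mango-lib's postgres_connection: a leading '$' means the real
/// value is read from the named environment variable at startup.
fn resolve_config_value(raw: &str) -> String {
    match raw.chars().next() {
        Some('$') => env::var(&raw[1..]).expect("reading value from env"),
        _ => raw.to_string(),
    }
}

fn main() {
    // Plain values pass through untouched...
    assert_eq!(resolve_config_value("host=db user=mango"), "host=db user=mango");
    // ...while "$PG_CONNECTION_STRING" would be looked up in the environment
    // (and panic with "reading value from env" if the variable is unset).
    println!("{}", resolve_config_value("host=db user=mango"));
}
```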
diff --git a/bin/service-mango-health/conf/template-config.toml b/bin/service-mango-health/conf/template-config.toml
new file mode 100644
index 000000000..a7684de85
--- /dev/null
+++ b/bin/service-mango-health/conf/template-config.toml
@@ -0,0 +1,26 @@
+rpc_http_url = "$RPC_HTTP_URL"
+rpc_ws_url = "$RPC_WS_URL"
+mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX"
+
+[postgres]
+connection_string = "$PG_CONNECTION_STRING"
+max_retry_count = 2
+allow_invalid_certs = true
+
+[postgres.tls]
+ca_cert_path = "$PG_CA_CERT"
+client_key_path = "$PG_CLIENT_KEY"
+
+[computing_configuration]
+recompute_interval_ms = 100
+
+[logging_configuration]
+log_health_to_stdout = false
+
+[persistence_configuration]
+enabled = true
+history_time_to_live_secs = 2678400 # 31 days
+persist_max_periodicity_secs = 60
+snapshot_queue_length = 30 # 30 * persist_max_periodicity_secs secs of backup in queue
+max_failure_duration_secs = 3600
+max_retry_count = 3
diff --git a/bin/service-mango-health/sql_scripts/setup.sql b/bin/service-mango-health/sql_scripts/setup.sql
new file mode 100644
index 000000000..150d93b99
--- /dev/null
+++ b/bin/service-mango-health/sql_scripts/setup.sql
@@ -0,0 +1,26 @@
+CREATE SCHEMA IF NOT EXISTS mango_monitoring AUTHORIZATION CURRENT_ROLE;
+CREATE TABLE IF NOT EXISTS mango_monitoring.health_history
+(
+    Pubkey VARCHAR(44) NOT NULL,
+    Timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
+    MaintenanceRatio DOUBLE PRECISION,
+    Maintenance DOUBLE PRECISION,
+    Initial DOUBLE PRECISION,
+    LiquidationEnd DOUBLE PRECISION,
+    IsBeingLiquidated BOOLEAN
+);
+CREATE MATERIALIZED VIEW mango_monitoring.health_current AS
+    SELECT DISTINCT ON (pubkey)
+        *
+    FROM mango_monitoring.health_history
+    ORDER BY pubkey, timestamp DESC;
+
+CREATE INDEX health_history_pubkey_index ON mango_monitoring.health_history
+(
+    Pubkey ASC,
+    Timestamp ASC
+);
+CREATE INDEX health_history_timestamp_index ON mango_monitoring.health_history
+(
+    Timestamp ASC
+);
\ No newline at end of file
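The `health_current` materialized view always holds the latest row per account; it is what the persister reloads on startup (`load_previous` in `persister.rs` below). For a consumer outside the service, a minimal read sketch using `tokio-postgres` (the helper name and column selection are illustrative; a connected `Client` is assumed):

```rust
use tokio_postgres::Client;

/// Read the latest persisted health point for every account from the
/// materialized view created above. Unquoted identifiers fold to lowercase
/// in Postgres, so the CamelCase column names match as written.
async fn latest_health(client: &Client) -> anyhow::Result<Vec<(String, Option<f64>)>> {
    let rows = client
        .query(
            "SELECT Pubkey, MaintenanceRatio FROM mango_monitoring.health_current",
            &[],
        )
        .await?;

    Ok(rows
        .iter()
        .map(|row| (row.get::<_, String>(0), row.get::<_, Option<f64>>(1)))
        .collect())
}
```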
diff --git a/bin/service-mango-health/src/configuration.rs b/bin/service-mango-health/src/configuration.rs
new file mode 100644
index 000000000..9552898b9
--- /dev/null
+++ b/bin/service-mango-health/src/configuration.rs
@@ -0,0 +1,35 @@
+use serde_derive::Deserialize;
+use services_mango_lib::postgres_configuration::PostgresConfiguration;
+use std::collections::HashSet;
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct Configuration {
+    pub postgres: Option<PostgresConfiguration>,
+    pub rpc_http_url: String,
+    pub rpc_ws_url: String,
+    pub mango_group: String,
+    pub computing_configuration: ComputingConfiguration,
+    pub logging_configuration: LoggingConfiguration,
+    pub persistence_configuration: PersistenceConfiguration,
+}
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct ComputingConfiguration {
+    pub recompute_interval_ms: u64,
+}
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct LoggingConfiguration {
+    pub log_health_to_stdout: bool,
+    pub log_health_for_accounts: Option<HashSet<String>>,
+}
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct PersistenceConfiguration {
+    pub enabled: bool,
+    pub history_time_to_live_secs: i64,
+    pub persist_max_periodicity_secs: i64,
+    pub max_failure_duration_secs: i64,
+    pub max_retry_count: u64,
+    pub snapshot_queue_length: usize,
+}
diff --git a/bin/service-mango-health/src/main.rs b/bin/service-mango-health/src/main.rs
new file mode 100644
index 000000000..9b3b5174a
--- /dev/null
+++ b/bin/service-mango-health/src/main.rs
@@ -0,0 +1,87 @@
+mod configuration;
+mod processors;
+mod utils;
+
+use futures_util::StreamExt;
+// use mango_feeds_connector::metrics;
+use mango_v4_client::tracing_subscriber_init;
+use std::fs::File;
+use std::io::Read;
+use std::sync::atomic::Ordering;
+
+use crate::configuration::Configuration;
+use crate::processors::data::DataProcessor;
+use crate::processors::exit::ExitProcessor;
+use crate::processors::health::HealthProcessor;
+use crate::processors::logger::LoggerProcessor;
+use crate::processors::persister::PersisterProcessor;
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let args: Vec<String> = std::env::args().collect();
+
+    if args.len() < 2 {
+        eprintln!("Please enter a config file path argument.");
+        return Ok(());
+    }
+
+    let configuration: Configuration = {
+        let mut file = File::open(&args[1])?;
+        let mut contents = String::new();
+        file.read_to_string(&mut contents)?;
+        toml::from_str(&contents).unwrap()
+    };
+
+    tracing_subscriber_init();
+
+    // TODO FAS Add metrics
+    // let metrics_tx = metrics::start(configuration.metrics.clone(), "health".into());
+
+    let exit_processor = ExitProcessor::init().await?;
+
+    let data_processor: DataProcessor =
+        DataProcessor::init(&configuration, exit_processor.exit.clone()).await?;
+
+    let health_processor = HealthProcessor::init(
+        &data_processor.channel,
+        data_processor.chain_data.clone(),
+        &configuration,
+        exit_processor.exit.clone(),
+    )
+    .await?;
+
+    let logger = LoggerProcessor::init(
+        &health_processor.channel,
+        &configuration,
+        exit_processor.exit.clone(),
+    )
+    .await?;
+
+    let persister = PersisterProcessor::init(
+        &health_processor.channel,
+        &configuration,
+        exit_processor.exit.clone(),
+    )
+    .await?;
+
+    let mut jobs = vec![exit_processor.job, data_processor.job, health_processor.job];
+
+    if let Some(logger) = logger {
+        jobs.push(logger.job)
+    }
+
+    if let Some(persister) = persister {
+        jobs.push(persister.job)
+    }
+
+    let mut jobs: futures::stream::FuturesUnordered<_> = jobs.into_iter().collect();
+
+    while jobs.next().await.is_some() {
+        // if any job exits, stop the other threads & wait
+        exit_processor.exit.store(true, Ordering::Relaxed);
+    }
+
+    // for now, we force exit here because the websocket connection to the RPC node is not properly closed on exit
+    tracing::warn!("killing process");
+    std::process::exit(0x0100);
+}
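The processors are decoupled through `tokio::sync::broadcast` channels of capacity 8192: `DataProcessor` fans account updates out to `HealthProcessor`, which fans health events out to the logger and persister. The `receiver_count() == 0` checks in the processors skip work when nobody has subscribed, and `send` only errors once every receiver is gone, which is what the `break`-on-send-error logic relies on. A self-contained sketch of the pattern (event type and values are illustrative):

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Same pattern as DataProcessor/HealthProcessor: a bounded broadcast
    // channel where every subscriber sees every event.
    let (sender, _) = broadcast::channel::<u64>(8192);

    let mut subscriber = sender.subscribe();
    let consumer = tokio::spawn(async move {
        while let Ok(event) = subscriber.recv().await {
            println!("received event {event}");
        }
        // recv() returns Err(Closed) once the sender is dropped, or
        // Err(Lagged) if this receiver fell too far behind the producer.
    });

    // Producers skip the send entirely when nobody listens.
    if sender.receiver_count() > 0 {
        sender.send(42).unwrap();
    }

    drop(sender);
    consumer.await.unwrap();
}
```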
diff --git a/bin/service-mango-health/src/processors/data.rs b/bin/service-mango-health/src/processors/data.rs
new file mode 100644
index 000000000..c9b3f6ac5
--- /dev/null
+++ b/bin/service-mango-health/src/processors/data.rs
@@ -0,0 +1,210 @@
+use crate::configuration::Configuration;
+use crate::processors::data::DataEvent::{AccountUpdate, Other, Snapshot};
+use async_channel::Receiver;
+use chrono::Utc;
+use itertools::Itertools;
+use mango_v4_client::account_update_stream::Message;
+use mango_v4_client::snapshot_source::is_mango_account;
+use mango_v4_client::{
+    account_update_stream, chain_data, snapshot_source, websocket_source, MangoGroupContext,
+};
+use services_mango_lib::fail_or_retry;
+use services_mango_lib::retry_counter::RetryCounter;
+use solana_client::nonblocking::rpc_client::RpcClient as RpcClientAsync;
+use solana_sdk::commitment_config::CommitmentConfig;
+use solana_sdk::pubkey::Pubkey;
+use std::str::FromStr;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, RwLock};
+use std::time::Duration;
+use tokio::task::JoinHandle;
+use tracing::warn;
+
+pub struct DataProcessor {
+    pub channel: tokio::sync::broadcast::Sender<DataEvent>,
+    pub job: JoinHandle<()>,
+    pub chain_data: Arc<RwLock<chain_data::ChainData>>,
+}
+
+#[derive(Clone, Debug)]
+pub enum DataEvent {
+    Other,
+    Snapshot(SnapshotEvent),
+    AccountUpdate(AccountUpdateEvent),
+}
+
+#[derive(Clone, Debug)]
+pub struct SnapshotEvent {
+    pub received_at: chrono::DateTime<Utc>,
+    pub accounts: Vec<Pubkey>,
+}
+
+#[derive(Clone, Debug)]
+pub struct AccountUpdateEvent {
+    pub received_at: chrono::DateTime<Utc>,
+    pub account: Pubkey,
+}
+
+impl DataProcessor {
+    pub async fn init(
+        configuration: &Configuration,
+        exit: Arc<AtomicBool>,
+    ) -> anyhow::Result<DataProcessor> {
+        let mut retry_counter = RetryCounter::new(2);
+        let mango_group = Pubkey::from_str(&configuration.mango_group)?;
+        let mango_stream =
+            fail_or_retry!(retry_counter, Self::init_mango_source(configuration).await)?;
+        let (sender, _) = tokio::sync::broadcast::channel(8192);
+        let sender_clone = sender.clone();
+
+        // The representation of current on-chain account data
+        let chain_data = Arc::new(RwLock::new(chain_data::ChainData::new()));
+        let chain_data_clone = chain_data.clone();
+
+        let job = tokio::spawn(async move {
+            loop {
+                if exit.load(Ordering::Relaxed) {
+                    warn!("shutting down data processor...");
+                    break;
+                }
+                tokio::select! {
+                    Ok(msg) = mango_stream.recv() => {
+                        let received_at = Utc::now();
+
+                        msg.update_chain_data(&mut chain_data_clone.write().unwrap());
+
+                        if sender_clone.receiver_count() == 0 {
+                            continue;
+                        }
+
+                        let event = Self::parse_message(msg, received_at, mango_group);
+
+                        if event.is_none() {
+                            continue;
+                        }
+
+                        let res = sender_clone.send(event.unwrap());
+                        if res.is_err() {
+                            break;
+                        }
+                    },
+                    else => {
+                        warn!("mango update channel err");
+                        break;
+                    }
+                }
+            }
+        });
+
+        let result = DataProcessor {
+            channel: sender,
+            job,
+            chain_data,
+        };
+
+        Ok(result)
+    }
+
+    fn new_rpc_async(configuration: &Configuration) -> RpcClientAsync {
+        let commitment = CommitmentConfig::processed();
+        RpcClientAsync::new_with_timeout_and_commitment(
+            configuration.rpc_http_url.clone(),
+            Duration::from_secs(60),
+            commitment,
+        )
+    }
+
+    fn parse_message(
+        message: Message,
+        received_at: chrono::DateTime<Utc>,
+        mango_group: Pubkey,
+    ) -> Option<DataEvent> {
+        match message {
+            Message::Account(account_write) => {
+                if is_mango_account(&account_write.account, &mango_group).is_some() {
+                    return Some(AccountUpdate(AccountUpdateEvent {
+                        account: account_write.pubkey,
+                        received_at,
+                    }));
+                }
+            }
+            Message::Snapshot(snapshot) => {
+                let mut result = Vec::new();
+                for update in snapshot.iter() {
+                    if is_mango_account(&update.account, &mango_group).is_some() {
+                        result.push(update.pubkey);
+                    }
+                }
+
+                return Some(Snapshot(SnapshotEvent {
+                    accounts: result,
+                    received_at,
+                }));
+            }
+            _ => {}
+        };
+
+        Some(Other)
+    }
+
+    async fn init_mango_source(configuration: &Configuration) -> anyhow::Result<Receiver<Message>> {
+        //
+        // Client setup
+        //
+        let rpc_async = Self::new_rpc_async(configuration);
+
+        let mango_group = Pubkey::from_str(&configuration.mango_group)?;
+        let group_context = MangoGroupContext::new_from_rpc(&rpc_async, mango_group).await?;
+
+        let mango_oracles = group_context
+            .tokens
+            .values()
+            .map(|value| value.oracle)
+            .chain(group_context.perp_markets.values().map(|p| p.oracle))
+            .unique()
+            .collect::<Vec<Pubkey>>();
+
+        let serum_programs = group_context
+            .serum3_markets
+            .values()
+            .map(|s3| s3.serum_program)
+            .unique()
+            .collect_vec();
+
+        let (account_update_sender, account_update_receiver) =
+            async_channel::unbounded::<Message>();
+
+        websocket_source::start(
+            websocket_source::Config {
+                rpc_ws_url: configuration.rpc_ws_url.clone(),
+                serum_programs,
+                open_orders_authority: mango_group,
+            },
+            mango_oracles.clone(),
+            account_update_sender.clone(),
+        );
+
+        let first_websocket_slot = websocket_source::get_next_create_bank_slot(
+            account_update_receiver.clone(),
+            Duration::from_secs(10),
+        )
+        .await?;
+
+        // Getting solana account snapshots via jsonrpc
+        // FUTURE: of what to fetch a snapshot - should probably take as an input
+        snapshot_source::start(
+            snapshot_source::Config {
+                rpc_http_url: configuration.rpc_http_url.clone(),
+                mango_group,
+                get_multiple_accounts_count: 100,
+                parallel_rpc_requests: 10,
+                snapshot_interval: Duration::from_secs(5 * 60),
+                min_slot: first_websocket_slot + 10,
+            },
+            mango_oracles,
+            account_update_sender,
+        );
+
+        Ok(account_update_receiver)
+    }
+}
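Any downstream consumer gets its own view of this stream by calling `subscribe()` on `DataProcessor::channel`, exactly as `HealthProcessor::init` below does with `data_sender.subscribe()`. A minimal consumer sketch using the types defined above (the function itself is illustrative):

```rust
use crate::processors::data::{DataEvent, DataProcessor};

/// Minimal consumer sketch: subscribe to the DataProcessor broadcast and
/// react to the three event kinds, the same way HealthProcessor does.
async fn consume(data_processor: &DataProcessor) {
    let mut receiver = data_processor.channel.subscribe();
    while let Ok(event) = receiver.recv().await {
        match event {
            DataEvent::Snapshot(snapshot) => {
                println!("snapshot with {} mango accounts", snapshot.accounts.len());
            }
            DataEvent::AccountUpdate(update) => {
                println!("account {} changed at {}", update.account, update.received_at);
            }
            DataEvent::Other => {}
        }
    }
}
```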
diff --git a/bin/service-mango-health/src/processors/exit.rs b/bin/service-mango-health/src/processors/exit.rs
new file mode 100644
index 000000000..8ef8d0ef4
--- /dev/null
+++ b/bin/service-mango-health/src/processors/exit.rs
@@ -0,0 +1,39 @@
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use tokio::task::JoinHandle;
+use tracing::{info, warn};
+
+pub struct ExitProcessor {
+    pub job: JoinHandle<()>,
+    pub exit: Arc<AtomicBool>,
+}
+
+impl ExitProcessor {
+    pub async fn init() -> anyhow::Result<ExitProcessor> {
+        let exit: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
+        let exit_clone = exit.clone();
+
+        let job = tokio::spawn(async move {
+            let mut interval = tokio::time::interval(std::time::Duration::from_millis(1000));
+            loop {
+                if exit_clone.load(Ordering::Relaxed) {
+                    warn!("shutting down exit processor...");
+                    break;
+                }
+                tokio::select! {
+                    _ = interval.tick() => {}
+                    _ = tokio::signal::ctrl_c()=> {
+                        info!("Received SIGINT, shutting down...");
+                        exit_clone.store(true, Ordering::Relaxed);
+                        break;
+                    }
+                }
+            }
+
+            warn!("shutting down exit processor...");
+        });
+
+        let result = ExitProcessor { job, exit };
+        Ok(result)
+    }
+}
diff --git a/bin/service-mango-health/src/processors/health.rs b/bin/service-mango-health/src/processors/health.rs
new file mode 100644
index 000000000..d6dfe6fda
--- /dev/null
+++ b/bin/service-mango-health/src/processors/health.rs
@@ -0,0 +1,177 @@
+use crate::configuration::Configuration;
+use crate::processors::data::DataEvent;
+use chrono::Utc;
+use mango_v4::health::HealthType;
+use mango_v4_client::chain_data::AccountFetcher;
+use mango_v4_client::{chain_data, health_cache, FallbackOracleConfig, MangoGroupContext};
+use solana_client::nonblocking::rpc_client::RpcClient;
+use solana_sdk::pubkey::Pubkey;
+use std::collections::HashSet;
+use std::str::FromStr;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, RwLock};
+use std::time::Instant;
+use tokio::task::JoinHandle;
+use tracing::warn;
+
+pub struct HealthProcessor {
+    pub channel: tokio::sync::broadcast::Sender<HealthEvent>,
+    pub job: JoinHandle<()>,
+}
+
+#[derive(Clone, Debug)]
+pub struct HealthEvent {
+    pub computed_at: chrono::DateTime<Utc>,
+    pub components: Vec<HealthComponent>,
+}
+
+#[derive(Clone, Debug)]
+pub struct HealthComponent {
+    pub account: Pubkey,
+    pub value: Option<HealthComponentValue>,
+}
+
+#[derive(Clone, Debug)]
+pub struct HealthComponentValue {
+    pub maintenance_ratio: f64,
+    pub initial_health: f64,
+    pub maintenance_health: f64,
+    pub liquidation_end_health: f64,
+    pub is_being_liquidated: bool,
+}
+
+impl HealthProcessor {
+    pub async fn init(
+        data_sender: &tokio::sync::broadcast::Sender<DataEvent>,
+        chain_data: Arc<RwLock<chain_data::ChainData>>,
+        configuration: &Configuration,
+        exit: Arc<AtomicBool>,
+    ) -> anyhow::Result<HealthProcessor> {
+        let (sender, _) = tokio::sync::broadcast::channel(8192);
+        let sender_clone = sender.clone();
+        let mut data = data_sender.subscribe();
+        let mut accounts = HashSet::<Pubkey>::new();
+        let mut snapshot_received = false;
+        let mut last_recompute = Instant::now();
+        let recompute_interval = std::time::Duration::from_millis(
+            configuration.computing_configuration.recompute_interval_ms,
+        );
+
+        let account_fetcher = chain_data::AccountFetcher {
+            chain_data: chain_data.clone(),
+            rpc: RpcClient::new(configuration.rpc_http_url.clone()),
+        };
+
+        let mango_group_context = MangoGroupContext::new_from_rpc(
+            &account_fetcher.rpc,
+            Pubkey::from_str(&configuration.mango_group)?,
+        )
+        .await?;
+
+        let job = tokio::spawn(async move {
+            loop {
+                if exit.load(Ordering::Relaxed) {
+                    warn!("shutting down health processor...");
+                    break;
+                }
+
+                tokio::select!
{ + Ok(msg) = data.recv() => { + match msg { + DataEvent::AccountUpdate(upd) => { + accounts.insert(upd.account); + }, + DataEvent::Snapshot(snap) => { + for account in snap.accounts { + accounts.insert(account); + } + snapshot_received = true; + }, + DataEvent::Other => { + } + } + + if sender_clone.receiver_count() == 0 { + continue; + } + + if snapshot_received && last_recompute.elapsed() >= recompute_interval { + last_recompute = Instant::now(); + + let health_event = Self::compute_health(&mango_group_context, + &account_fetcher, + &accounts).await; + + let res = sender_clone.send(health_event); + if res.is_err() { + break; + } + } + }, + else => { + warn!("data update channel err"); + break; + } + } + } + }); + + let result = HealthProcessor { + channel: sender, + job, + }; + + Ok(result) + } + + async fn compute_health( + mango_group_context: &MangoGroupContext, + account_fetcher: &AccountFetcher, + accounts: &HashSet, + ) -> HealthEvent { + let computed_at = Utc::now(); + let mut components = Vec::new(); + + for account in accounts { + let value = + Self::compute_account_health(&mango_group_context, account_fetcher, &account).await; + + components.push({ + HealthComponent { + account: *account, + value: value.ok(), + } + }) + } + + HealthEvent { + computed_at, + components, + } + } + + async fn compute_account_health( + mango_group_context: &&MangoGroupContext, + account_fetcher: &AccountFetcher, + account: &Pubkey, + ) -> anyhow::Result { + let mango_account = account_fetcher.fetch_mango_account(account)?; + let health_cache = health_cache::new( + &mango_group_context, + &FallbackOracleConfig::Never, + &*account_fetcher, + &mango_account, + ) + .await?; + + let res = HealthComponentValue { + maintenance_ratio: health_cache.health_ratio(HealthType::Maint).to_num(), + initial_health: health_cache.health(HealthType::Init).to_num(), + maintenance_health: health_cache.health(HealthType::Maint).to_num(), + liquidation_end_health: health_cache.health(HealthType::LiquidationEnd).to_num(), + is_being_liquidated: mango_account.fixed.being_liquidated(), + }; + + Ok(res) + } +} diff --git a/bin/service-mango-health/src/processors/logger.rs b/bin/service-mango-health/src/processors/logger.rs new file mode 100644 index 000000000..aea15a033 --- /dev/null +++ b/bin/service-mango-health/src/processors/logger.rs @@ -0,0 +1,81 @@ +use crate::configuration::Configuration; +use crate::processors::health::{HealthComponentValue, HealthEvent}; +use solana_sdk::pubkey::Pubkey; +use std::collections::HashSet; +use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::task::JoinHandle; +use tracing::{info, warn}; + +pub struct LoggerProcessor { + pub job: JoinHandle<()>, +} + +impl LoggerProcessor { + pub async fn init( + data_sender: &tokio::sync::broadcast::Sender, + configuration: &Configuration, + exit: Arc, + ) -> anyhow::Result> { + let enable_logging = configuration.logging_configuration.log_health_to_stdout; + if !enable_logging { + return Ok(None); + } + + let mut data = data_sender.subscribe(); + let filter: HashSet = configuration + .logging_configuration + .log_health_for_accounts + .clone() + .unwrap_or_default() + .iter() + .map(|s| Pubkey::from_str(s).unwrap()) + .collect(); + + let job = tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_millis(1000)); + loop { + if exit.load(Ordering::Relaxed) { + warn!("shutting down logger processor..."); + break; + } + tokio::select! 
{ + _ = interval.tick() => { + }, + Ok(msg) = data.recv() => { + for component in msg.components { + if !filter.is_empty() && !filter.contains(&component.account) { + continue; + } + + if component.value.is_some() { + let value: HealthComponentValue = component.value.unwrap(); + + info!( + computed_at = %msg.computed_at, + account = %component.account, + maintenance_ratio = %value.maintenance_ratio, + initial_health = %value.initial_health, + maintenance_health = %value.maintenance_health, + liquidation_end_health = %value.liquidation_end_health, + is_being_liquidated = %value.is_being_liquidated, + ) + } else { + info!( + computed_at = %msg.computed_at, + account = %component.account, + error = "Missing health data" + ) + } + } + }, + } + } + }); + + let result = LoggerProcessor { job }; + + Ok(Some(result)) + } +} diff --git a/bin/service-mango-health/src/processors/mod.rs b/bin/service-mango-health/src/processors/mod.rs new file mode 100644 index 000000000..c3387cddc --- /dev/null +++ b/bin/service-mango-health/src/processors/mod.rs @@ -0,0 +1,5 @@ +pub mod data; +pub mod exit; +pub mod health; +pub mod logger; +pub mod persister; diff --git a/bin/service-mango-health/src/processors/persister.rs b/bin/service-mango-health/src/processors/persister.rs new file mode 100644 index 000000000..7843781b8 --- /dev/null +++ b/bin/service-mango-health/src/processors/persister.rs @@ -0,0 +1,712 @@ +use crate::configuration::Configuration; +use crate::processors::health::{HealthComponent, HealthEvent}; +use anchor_lang::prelude::Pubkey; +use chrono::{Duration, DurationRound, Utc}; +use futures_util::pin_mut; +use postgres_types::{ToSql, Type}; +use services_mango_lib::fail_or_retry; +use services_mango_lib::postgres_configuration::PostgresConfiguration; +use services_mango_lib::postgres_connection; +use services_mango_lib::retry_counter::RetryCounter; +use std::collections::{HashMap, VecDeque}; +use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::task::JoinHandle; +use tokio_postgres::binary_copy::BinaryCopyInWriter; +use tokio_postgres::{Client, Transaction}; +use tracing::{error, warn}; + +pub struct PersisterProcessor { + pub job: JoinHandle<()>, +} + +impl PersisterProcessor { + pub async fn init( + data_sender: &tokio::sync::broadcast::Sender, + configuration: &Configuration, + exit: Arc, + ) -> anyhow::Result> { + let postgres_configuration = configuration.postgres.clone().unwrap_or_default(); + let persistence_configuration = configuration.persistence_configuration.clone(); + let time_to_live = Duration::seconds(persistence_configuration.history_time_to_live_secs); + let periodicity = Duration::seconds(persistence_configuration.persist_max_periodicity_secs); + let max_snapshot_count = persistence_configuration.snapshot_queue_length; + let max_failure_duration = + Duration::seconds(persistence_configuration.max_failure_duration_secs); + + if !persistence_configuration.enabled { + return Ok(None); + } + + let mut data = data_sender.subscribe(); + let mut unpersisted_snapshots = VecDeque::new(); + + let job = tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_millis(1000)); + let mut retry_counter = RetryCounter::new(persistence_configuration.max_retry_count); + let mut last_successful_persistence = chrono::Utc::now(); + + let mut connection = match fail_or_retry!( + retry_counter, + postgres_connection::connect(&postgres_configuration).await + ) { + Err(e) => { + tracing::error!("Failed to connect to 
PostgreSQL: {}", e);
+                    return;
+                }
+                Ok(cnt) => cnt.0,
+            };
+
+            let mut previous =
+                match fail_or_retry!(retry_counter, Self::load_previous(&connection).await) {
+                    Ok(prv) => prv,
+                    Err(e) => {
+                        error!("loading of previous state failed: {}", e);
+                        return;
+                    }
+                };
+
+            loop {
+                if exit.load(Ordering::Relaxed) {
+                    warn!("shutting down persister processor...");
+                    break;
+                }
+
+                tokio::select! {
+                    _ = interval.tick() => {
+                    },
+                    Ok(event) = data.recv() => {
+                        Self::store_snapshot(
+                            &previous,
+                            &mut unpersisted_snapshots,
+                            &event,
+                            periodicity,
+                            max_snapshot_count,
+                        );
+
+                        if let Err(e) = retry_counter.fail_or_ignore(
+                            Self::persist_all_snapshots_and_update_state(
+                                &mut connection,
+                                &mut previous,
+                                &mut unpersisted_snapshots,
+                                time_to_live,
+                            )
+                            .await,
+                        ) {
+                            error!("persistence failed (for {}): {}", chrono::Utc::now() - last_successful_persistence, e);
+
+                            match Self::try_to_reconnect(&postgres_configuration).await {
+                                Ok(client) => {
+                                    connection = client;
+                                }
+                                Err(e) => {
+                                    if chrono::Utc::now() - last_successful_persistence
+                                        > max_failure_duration
+                                    {
+                                        error!("failed to reconnect (after multiple retries): {}", e);
+                                        break; // Shutdown processor
+                                    }
+                                }
+                            };
+                        }
+
+                        if unpersisted_snapshots.is_empty() {
+                            last_successful_persistence = chrono::Utc::now();
+                        }
+                    }
+                }
+            }
+        });
+
+        let result = PersisterProcessor { job };
+
+        Ok(Some(result))
+    }
+
+    fn build_persisted_data(
+        computed_at: chrono::DateTime<Utc>,
+        component: &HealthComponent,
+    ) -> PersistedData {
+        match &component.value {
+            Some(value) => PersistedData {
+                computed_at,
+                maintenance_ratio: Some(value.maintenance_ratio),
+                initial_health: Some(value.initial_health),
+                maintenance_health: Some(value.maintenance_health),
+                liquidation_end_health: Some(value.liquidation_end_health),
+                is_being_liquidated: Some(value.is_being_liquidated),
+            },
+            None => PersistedData {
+                computed_at,
+                maintenance_ratio: None,
+                initial_health: None,
+                maintenance_health: None,
+                liquidation_end_health: None,
+                is_being_liquidated: None,
+            },
+        }
+    }
+
+    fn store_snapshot(
+        previous: &HashMap<Pubkey, PersistedData>,
+        snapshots: &mut VecDeque<PersisterSnapshot>,
+        event: &HealthEvent,
+        periodicity: chrono::Duration,
+        max_snapshot_count: usize,
+    ) {
+        let bucket = event
+            .computed_at
+            .duration_round(periodicity)
+            .unwrap_or(chrono::DateTime::<Utc>::MIN_UTC);
+        let mut previous_snapshot = &PersisterSnapshot {
+            bucket,
+            value: HashMap::new(),
+        };
+        if !snapshots.is_empty() {
+            previous_snapshot = &snapshots[snapshots.len() - 1];
+        }
+
+        let updates = event
+            .components
+            .iter()
+            .filter_map(|component| {
+                let persisted_data = Self::build_persisted_data(event.computed_at, &component);
+                let should_insert_new_point = Self::should_insert(
+                    &previous,
+                    &component.account,
+                    &persisted_data,
+                    periodicity,
+                );
+                let should_update_existing_point =
+                    previous_snapshot.value.contains_key(&component.account);
+
+                (should_insert_new_point || should_update_existing_point)
+                    .then(|| (component.account, persisted_data))
+            })
+            .collect();
+
+        if let Some(existing_snapshot_for_bucket) = (*snapshots)
+            .iter_mut()
+            .find(|s| s.bucket == bucket)
+            .as_mut()
+        {
+            for (k, v) in updates {
+                existing_snapshot_for_bucket.value.insert(k, v);
+            }
+            return;
+        }
+
+        if snapshots.len() >= max_snapshot_count {
+            snapshots.pop_front();
+        }
+
+        let snapshot = PersisterSnapshot {
+            bucket,
+            value: updates,
+        };
+
+        snapshots.push_back(snapshot);
+    }
+
+    async fn persist_all_snapshots_and_update_state(
+        client: &mut Client,
+        previous: &mut HashMap<Pubkey, PersistedData>,
+        snapshots:
&mut VecDeque, + ttl: Duration, + ) -> anyhow::Result<()> { + loop { + if snapshots.is_empty() { + break; + } + + let snapshot = &snapshots[0]; + + if snapshot.value.len() == 0 { + snapshots.pop_front(); + continue; + } + + Self::persist_snapshot(client, &snapshot.value, ttl).await?; + + let snapshot = snapshots.pop_front().unwrap(); + for (k, v) in snapshot.value { + previous.insert(k, v); + } + } + + Ok(()) + } + + async fn try_to_reconnect( + postgres_configuration: &PostgresConfiguration, + ) -> anyhow::Result { + let client = postgres_connection::connect(&postgres_configuration) + .await? + .0; + + Ok(client) + } + + async fn load_previous(client: &Client) -> anyhow::Result> { + let rows = client + .query( + "SELECT Pubkey, Timestamp, MaintenanceRatio, Initial, Maintenance, LiquidationEnd, IsBeingLiquidated FROM mango_monitoring.health_current", + &[], + ) + .await?; + + let mut result = HashMap::::new(); + for row in rows { + let key = Pubkey::from_str(row.get(0))?; + result.insert( + key, + PersistedData { + computed_at: row.get(1), + maintenance_ratio: row.get(2), + initial_health: row.get(3), + maintenance_health: row.get(4), + liquidation_end_health: row.get(5), + is_being_liquidated: row.get(6), + }, + ); + } + + Ok(result) + } + + async fn persist_snapshot( + client: &mut Client, + updates: &HashMap, + ttl: chrono::Duration, + ) -> anyhow::Result<()> { + let tx = client.transaction().await?; + Self::insert_history(&tx, &updates).await?; + Self::delete_old_history(&tx, chrono::Utc::now(), ttl).await?; + Self::update_current(&tx).await?; + tx.commit().await?; + Ok(()) + } + + async fn insert_history<'tx>( + client: &Transaction<'tx>, + updates: &HashMap, + ) -> anyhow::Result<()> { + let col_types = [ + Type::VARCHAR, + Type::TIMESTAMPTZ, + Type::FLOAT8, + Type::FLOAT8, + Type::FLOAT8, + Type::FLOAT8, + Type::BOOL, + ]; + let sink = client.copy_in("COPY mango_monitoring.health_history (Pubkey, Timestamp, MaintenanceRatio, Initial, Maintenance, LiquidationEnd, IsBeingLiquidated) FROM STDIN BINARY").await?; + let writer = BinaryCopyInWriter::new(sink, &col_types); + pin_mut!(writer); + + for (key, value) in updates { + let key = key.to_string(); + let row: Vec<&'_ (dyn ToSql + Sync)> = vec![ + &key, + &value.computed_at, + &value.maintenance_ratio, + &value.initial_health, + &value.maintenance_health, + &value.liquidation_end_health, + &value.is_being_liquidated, + ]; + writer.as_mut().write(&row).await?; + } + + writer.finish().await?; + Ok(()) + } + + async fn update_current<'tx>(client: &Transaction<'tx>) -> anyhow::Result<()> { + let query = + postgres_query::query!("REFRESH MATERIALIZED VIEW mango_monitoring.health_current"); + query.execute(client).await?; + Ok(()) + } + + async fn delete_old_history<'tx>( + client: &Transaction<'tx>, + now: chrono::DateTime, + ttl: chrono::Duration, + ) -> anyhow::Result<()> { + let min_ts = now - ttl; + let query = postgres_query::query!( + "DELETE FROM mango_monitoring.health_history WHERE timestamp < $min_ts", + min_ts + ); + query.execute(client).await?; + Ok(()) + } + + fn should_insert( + persisted_data: &HashMap, + health_component_key: &Pubkey, + health_component: &PersistedData, + periodicity: Duration, + ) -> bool { + match persisted_data.get(health_component_key) { + None => true, + Some(previous) => { + let is_old = health_component.computed_at - previous.computed_at >= periodicity; + let between_none_and_some = previous.is_some() != health_component.is_some(); + + if is_old || between_none_and_some { + true + } else if 
previous.is_some() && health_component.is_some() { + let changing_flag = health_component.is_being_liquidated.unwrap() + != previous.is_being_liquidated.unwrap(); + + let curr = health_component.maintenance_ratio.unwrap(); + let prev = previous.maintenance_ratio.unwrap(); + let changing_side = (prev <= 0.0 && curr > 0.0) || (prev > 0.0 && curr <= 0.0); + let big_move = prev != 0.0 && (prev - curr).abs() / prev > 0.1; + + changing_side || changing_flag || big_move + } else { + false + } + } + } + } +} + +struct PersistedData { + pub computed_at: chrono::DateTime, + pub maintenance_ratio: Option, + pub initial_health: Option, + pub maintenance_health: Option, + pub liquidation_end_health: Option, + pub is_being_liquidated: Option, +} + +impl PersistedData { + pub fn is_some(&self) -> bool { + self.maintenance_ratio.is_some() + && self.initial_health.is_some() + && self.maintenance_health.is_some() + && self.liquidation_end_health.is_some() + && self.is_being_liquidated.is_some() + } +} + +struct PersisterSnapshot { + pub value: HashMap, + pub bucket: chrono::DateTime, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::processors::health::HealthComponentValue; + use chrono::SubsecRound; + + fn make_value(hr: f64, i: u64, m: u64, le: u64, ibl: bool) -> Option { + Some(HealthComponentValue { + maintenance_ratio: hr, + initial_health: i as f64, + maintenance_health: m as f64, + liquidation_end_health: le as f64, + is_being_liquidated: ibl, + }) + } + + fn make_persisted_empty(t_secs: i64) -> PersistedData { + PersistedData { + computed_at: chrono::Utc::now() - chrono::Duration::seconds(t_secs), + maintenance_ratio: None, + initial_health: None, + maintenance_health: None, + liquidation_end_health: None, + is_being_liquidated: None, + } + } + + fn make_persisted(t_secs: i64, mr: f64) -> PersistedData { + PersistedData { + computed_at: chrono::Utc::now() - chrono::Duration::seconds(t_secs), + maintenance_ratio: Some(mr), + initial_health: Some(1000f64), + maintenance_health: Some(1000f64), + liquidation_end_health: Some(1f64), + is_being_liquidated: Some(false), + } + } + + fn make_persisted_with_liquidated_flag(t_secs: i64, mr: f64) -> PersistedData { + PersistedData { + computed_at: chrono::Utc::now() - chrono::Duration::seconds(t_secs), + maintenance_ratio: Some(mr), + initial_health: Some(1000f64), + maintenance_health: Some(1000f64), + liquidation_end_health: Some(1f64), + is_being_liquidated: Some(true), + } + } + + #[test] + fn should_persist_if_there_is_no_previous_point() { + let previous = HashMap::new(); + + assert!(PersisterProcessor::should_insert( + &previous, + &Pubkey::new_unique(), + &make_persisted(0, 123f64), + chrono::Duration::seconds(60) + )); + + assert!(PersisterProcessor::should_insert( + &previous, + &Pubkey::new_unique(), + &make_persisted(0, 0f64), + chrono::Duration::seconds(60) + )); + + assert!(PersisterProcessor::should_insert( + &previous, + &Pubkey::new_unique(), + &make_persisted_empty(0), + chrono::Duration::seconds(60) + )); + } + + #[test] + fn should_persist_if_previous_point_is_old() { + let mut previous = HashMap::new(); + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + previous.insert(pk1, make_persisted(120, 123.0)); + previous.insert(pk2, make_persisted(3, 123.0)); + + assert!(PersisterProcessor::should_insert( + &previous, + &pk1, + &make_persisted(0, 124.0), + chrono::Duration::seconds(60) + )); + + assert!(!PersisterProcessor::should_insert( + &previous, + &pk2, + &make_persisted(0, 124.0), + chrono::Duration::seconds(60) 
+ )); + } + + #[test] + fn should_persist_when_change_is_interesting() { + let mut previous = HashMap::new(); + let pk1 = Pubkey::new_unique(); + let pk2 = Pubkey::new_unique(); + + previous.insert(pk1, make_persisted(0, 123f64)); + + previous.insert(pk2, make_persisted(0, 0.01)); + + // small move, nop + assert!(!PersisterProcessor::should_insert( + &previous, + &pk1, + &make_persisted(0, 124.0), + chrono::Duration::seconds(60) + )); + + // big move, insert + assert!(PersisterProcessor::should_insert( + &previous, + &pk1, + &make_persisted(0, 100.0), + chrono::Duration::seconds(60) + )); + + // small move, but cross 0, insert + assert!(PersisterProcessor::should_insert( + &previous, + &pk2, + &make_persisted(0, -0.001), + chrono::Duration::seconds(60) + )); + + // small move, does not cross 0, nop + assert!(!PersisterProcessor::should_insert( + &previous, + &pk2, + &make_persisted(0, 0.0099), + chrono::Duration::seconds(60) + )); + + // no change except flag being liquidated change + assert!(PersisterProcessor::should_insert( + &previous, + &pk2, + &make_persisted_with_liquidated_flag(0, 0.01), + chrono::Duration::seconds(60) + )); + } + + #[test] + fn should_correctly_convert_event_into_data() { + let computed_at = chrono::Utc::now(); + let component = HealthComponent { + account: Pubkey::new_unique(), + value: Some(HealthComponentValue { + maintenance_ratio: 123.0, + initial_health: 1000.0, + maintenance_health: 2000.0, + liquidation_end_health: 3000.0, + is_being_liquidated: false, + }), + }; + + let converted = PersisterProcessor::build_persisted_data(computed_at, &component); + + assert_eq!(converted.computed_at, computed_at); + assert_eq!(converted.maintenance_ratio.unwrap(), 123.0); + assert_eq!(converted.initial_health.unwrap(), 1000.0); + assert_eq!(converted.maintenance_health.unwrap(), 2000.0); + assert_eq!(converted.liquidation_end_health.unwrap(), 3000.0); + assert_eq!(converted.is_being_liquidated.unwrap(), false); + } + + #[test] + fn should_store_or_replace_snapshot() { + let pk = Pubkey::new_unique(); + let previous = HashMap::new(); + let mut snapshots = VecDeque::new(); + let event1 = HealthEvent { + computed_at: chrono::Utc::now().trunc_subsecs(0) - chrono::Duration::seconds(300), + components: vec![HealthComponent { + account: pk, + value: make_value(50.25f64, 2, 3, 4, false), + }], + }; + let event2 = HealthEvent { + computed_at: chrono::Utc::now().trunc_subsecs(0) - chrono::Duration::seconds(290), + components: vec![HealthComponent { + account: pk, + value: make_value(502.5f64, 20, 30, 40, false), + }], + }; + let event3 = HealthEvent { + computed_at: chrono::Utc::now().trunc_subsecs(0) - chrono::Duration::seconds(200), + components: vec![HealthComponent { + account: pk, + value: make_value(5025.0f64, 200, 300, 400, false), + }], + }; + let event4 = HealthEvent { + computed_at: chrono::Utc::now().trunc_subsecs(0) - chrono::Duration::seconds(100), + components: vec![HealthComponent { + account: pk, + value: make_value(50250.0f64, 2000, 3000, 4000, false), + }], + }; + + PersisterProcessor::store_snapshot( + &previous, + &mut snapshots, + &event1, + Duration::seconds(60), + 2, + ); + assert_eq!(snapshots.len(), 1); + assert_eq!( + snapshots[0] + .value + .iter() + .next() + .unwrap() + .1 + .maintenance_health + .unwrap(), + 3.0 + ); + + PersisterProcessor::store_snapshot( + &previous, + &mut snapshots, + &event2, + Duration::seconds(60), + 2, + ); + assert_eq!(snapshots.len(), 1); + assert_eq!( + snapshots[0] + .value + .iter() + .next() + .unwrap() + .1 + 
.maintenance_health + .unwrap(), + 30.0 + ); + + PersisterProcessor::store_snapshot( + &previous, + &mut snapshots, + &event3, + Duration::seconds(60), + 2, + ); + assert_eq!(snapshots.len(), 2); + assert_eq!( + snapshots[0] + .value + .iter() + .next() + .unwrap() + .1 + .maintenance_health + .unwrap(), + 30.0 + ); + assert_eq!( + snapshots[1] + .value + .iter() + .next() + .unwrap() + .1 + .maintenance_health + .unwrap(), + 300.0 + ); + + PersisterProcessor::store_snapshot( + &previous, + &mut snapshots, + &event4, + Duration::seconds(60), + 2, + ); + assert_eq!(snapshots.len(), 2); + assert_eq!( + snapshots[0] + .value + .iter() + .next() + .unwrap() + .1 + .maintenance_health + .unwrap(), + 300.0 + ); + assert_eq!( + snapshots[1] + .value + .iter() + .next() + .unwrap() + .1 + .maintenance_health + .unwrap(), + 3000.0 + ); + } +} diff --git a/bin/service-mango-health/src/utils/mod.rs b/bin/service-mango-health/src/utils/mod.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/bin/service-mango-health/src/utils/mod.rs @@ -0,0 +1 @@ + diff --git a/cd/health.toml b/cd/health.toml new file mode 100644 index 000000000..66a37eb6d --- /dev/null +++ b/cd/health.toml @@ -0,0 +1,9 @@ +app = "mango-health-monitoring" +kill_signal = "SIGTERM" +kill_timeout = 30 + +[build] + dockerfile = "../Dockerfile" + +[experimental] + cmd = ["service-mango-health", "health-config.toml"] diff --git a/lib/services-mango-lib/Cargo.toml b/lib/services-mango-lib/Cargo.toml new file mode 100644 index 000000000..87911ede0 --- /dev/null +++ b/lib/services-mango-lib/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "services-mango-lib" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0" +base64 = "0.21" +tokio = { version = "1", features = ["full"] } +tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } +tokio-postgres-rustls = "0.9.0" +postgres-types = { version = "0.2", features = ["array-impls", "derive", "with-chrono-0_4"] } +postgres-native-tls = "0.5" +native-tls = "0.2" +rustls = "0.20.8" +postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" } +tracing = { version = "0.1", features = ["log"] } +serde = { version = "1.0.188", features = ["derive"] } diff --git a/lib/services-mango-lib/src/lib.rs b/lib/services-mango-lib/src/lib.rs new file mode 100644 index 000000000..36329b591 --- /dev/null +++ b/lib/services-mango-lib/src/lib.rs @@ -0,0 +1,3 @@ +pub mod postgres_configuration; +pub mod postgres_connection; +pub mod retry_counter; diff --git a/lib/services-mango-lib/src/postgres_configuration.rs b/lib/services-mango-lib/src/postgres_configuration.rs new file mode 100644 index 000000000..85f8f6e75 --- /dev/null +++ b/lib/services-mango-lib/src/postgres_configuration.rs @@ -0,0 +1,16 @@ +use serde::Deserialize; + +#[derive(Clone, Debug, Deserialize, Default)] +pub struct PostgresConfiguration { + pub connection_string: String, + pub allow_invalid_certs: bool, + pub tls: Option, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct PostgresTlsConfig { + /// CA Cert file or env var + pub ca_cert_path: String, + /// PKCS12 client cert path + pub client_key_path: String, +} diff --git a/lib/services-mango-lib/src/postgres_connection.rs b/lib/services-mango-lib/src/postgres_connection.rs new file mode 100644 index 000000000..def70aa49 --- /dev/null +++ 
b/lib/services-mango-lib/src/postgres_connection.rs
@@ -0,0 +1,66 @@
+use crate::postgres_configuration::PostgresConfiguration;
+use native_tls::{Certificate, Identity, TlsConnector};
+use postgres_native_tls::MakeTlsConnector;
+use std::{env, fs};
+use tokio::task::JoinHandle;
+use tokio_postgres::Client;
+
+pub async fn connect(
+    config: &PostgresConfiguration,
+) -> anyhow::Result<(Client, JoinHandle<Result<(), tokio_postgres::Error>>)> {
+    // openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
+    // base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
+    // fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
+    // fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
+    let tls = match &config.tls {
+        Some(tls) => {
+            use base64::{engine::general_purpose, Engine as _};
+            let ca_cert = match &tls.ca_cert_path.chars().next().unwrap() {
+                '$' => general_purpose::STANDARD
+                    .decode(
+                        env::var(&tls.ca_cert_path[1..])
+                            .expect("reading client cert from env")
+                            .into_bytes(),
+                    )
+                    .expect("decoding client cert"),
+                _ => fs::read(&tls.ca_cert_path).expect("reading client cert from file"),
+            };
+            let client_key = match &tls.client_key_path.chars().next().unwrap() {
+                '$' => general_purpose::STANDARD
+                    .decode(
+                        env::var(&tls.client_key_path[1..])
+                            .expect("reading client key from env")
+                            .into_bytes(),
+                    )
+                    .expect("decoding client key"),
+                _ => fs::read(&tls.client_key_path).expect("reading client key from file"),
+            };
+            MakeTlsConnector::new(
+                TlsConnector::builder()
+                    .add_root_certificate(Certificate::from_pem(&ca_cert)?)
+                    .identity(Identity::from_pkcs12(&client_key, "pass")?)
+                    .danger_accept_invalid_certs(config.allow_invalid_certs)
+                    .build()?,
+            )
+        }
+        None => MakeTlsConnector::new(
+            TlsConnector::builder()
+                .danger_accept_invalid_certs(config.allow_invalid_certs)
+                .build()?,
+        ),
+    };
+
+    let config = config.clone();
+    let connection_string = match &config.connection_string.chars().next().unwrap() {
+        '$' => {
+            env::var(&config.connection_string[1..]).expect("reading connection string from env")
+        }
+        _ => config.connection_string.clone(),
+    };
+
+    let (client, connection) = tokio_postgres::connect(&connection_string, tls).await?;
+
+    let handle = tokio::spawn(async move { connection.await });
+
+    Ok((client, handle))
+}
diff --git a/lib/services-mango-lib/src/retry_counter.rs b/lib/services-mango-lib/src/retry_counter.rs
new file mode 100644
index 000000000..a8423b8ee
--- /dev/null
+++ b/lib/services-mango-lib/src/retry_counter.rs
@@ -0,0 +1,59 @@
+pub struct RetryCounter {
+    error_count: u64,
+    max_retry_count: u64,
+}
+
+impl RetryCounter {
+    pub fn new(max_retry_count: u64) -> Self {
+        RetryCounter {
+            max_retry_count,
+            error_count: 0,
+        }
+    }
+
+    pub fn reset(&mut self) {
+        self.error_count = 0;
+    }
+
+    /// true if should retry, false if should bail
+    pub fn on_new_error(&mut self) -> bool {
+        self.error_count += 1;
+        self.error_count <= self.max_retry_count
+    }
+
+    pub fn fail_or_ignore<T>(&mut self, result: anyhow::Result<T>) -> anyhow::Result<Option<T>> {
+        match result {
+            Err(e) => match self.on_new_error() {
+                true => Ok(None),
+                false => {
+                    self.reset();
+                    Err(e)
+                }
+            },
+            Ok(v) => {
+                self.reset();
+                Ok(Some(v))
+            }
+        }
+    }
+}
+
+#[macro_export]
+macro_rules! fail_or_retry {
+    ($retry_counter:expr, $f:expr) => {{
+        loop {
+            let result = $retry_counter.fail_or_ignore($f);
+            match result {
+                Ok(Some(value)) => {
+                    break Ok(value);
+                }
+                Ok(None) => {
+                    continue;
+                }
+                Err(e) => {
+                    break Err(e);
+                }
+            }
+        }
+    }};
+}
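`RetryCounter::fail_or_ignore` converts the first `max_retry_count` errors into `Ok(None)` and only surfaces an error once the budget is exhausted, while `fail_or_retry!` re-evaluates its expression until it gets a value or a final error. A small usage sketch in the spirit of the `connect` call sites above (the flaky operation is illustrative):

```rust
use services_mango_lib::fail_or_retry;
use services_mango_lib::retry_counter::RetryCounter;

// Hypothetical fallible operation standing in for e.g. postgres_connection::connect.
async fn flaky_operation(attempt: &mut u32) -> anyhow::Result<&'static str> {
    *attempt += 1;
    if *attempt < 3 {
        anyhow::bail!("transient failure #{attempt}");
    }
    Ok("connected")
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut retry_counter = RetryCounter::new(2); // tolerate up to 2 errors
    let mut attempt = 0;

    // The macro re-runs the expression on each swallowed error; here the
    // first two calls fail and the third succeeds, so we get Ok("connected").
    let value = fail_or_retry!(retry_counter, flaky_operation(&mut attempt).await)?;
    assert_eq!(value, "connected");
    Ok(())
}
```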