From c055d8b992200deb5a2b8e78b739bd366b1066ca Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Wed, 31 May 2023 14:12:26 +0100 Subject: [PATCH] Add more worker metrics --- src/worker/candle_batching/mod.rs | 9 +++-- src/worker/main.rs | 32 ++++++++++++++---- src/worker/metrics/mod.rs | 49 ++++++++++++++++++++++++++-- src/worker/trade_fetching/parsing.rs | 6 ++-- src/worker/trade_fetching/scrape.rs | 3 +- 5 files changed, 85 insertions(+), 14 deletions(-) diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs index 56259b8..3a8cd0b 100644 --- a/src/worker/candle_batching/mod.rs +++ b/src/worker/candle_batching/mod.rs @@ -13,6 +13,8 @@ use crate::{ use self::higher_order_candles::batch_higher_order_candles; +use super::metrics::METRIC_CANDLES_TOTAL; + pub async fn batch_for_market( pool: &Pool, candles_sender: &Sender>, @@ -47,14 +49,15 @@ async fn batch_inner( ) -> anyhow::Result<()> { let market_name = &market.name.clone(); let candles = batch_1m_candles(pool, market).await?; - send_candles(candles, candles_sender).await; - + send_candles(candles.clone(), candles_sender).await; + METRIC_CANDLES_TOTAL.with_label_values(&[market.name.as_str()]).inc_by(candles.clone().len() as u64); for resolution in Resolution::iter() { if resolution == Resolution::R1m { continue; } let candles = batch_higher_order_candles(pool, market_name, resolution).await?; - send_candles(candles, candles_sender).await; + send_candles(candles.clone(), candles_sender).await; + METRIC_CANDLES_TOTAL.with_label_values(&[market.name.as_str()]).inc_by(candles.clone().len() as u64); } Ok(()) } diff --git a/src/worker/main.rs b/src/worker/main.rs index 95cdef9..f8b4550 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -2,7 +2,7 @@ use openbook_candles::structs::candle::Candle; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::openbook::OpenBookFillEvent; use openbook_candles::utils::Config; -use openbook_candles::worker::metrics::serve_metrics; +use openbook_candles::worker::metrics::{serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, METRIC_CANDLES_QUEUE_LENGTH, METRIC_FILLS_QUEUE_LENGTH}; use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::{ database::{ @@ -13,7 +13,7 @@ use openbook_candles::{ }; use solana_sdk::pubkey::Pubkey; use std::env; -use std::{collections::HashMap, str::FromStr}; +use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc; #[tokio::main(flavor = "multi_thread", worker_threads = 10)] @@ -29,6 +29,9 @@ async fn main() -> anyhow::Result<()> { rpc_url: rpc_url.clone(), }; + let candles_queue_max_size = 10000; + let fills_queue_max_size = 10000; + let markets = load_markets(path_to_markets_json); let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); @@ -41,10 +44,10 @@ async fn main() -> anyhow::Result<()> { setup_database(&pool).await?; let mut handles = vec![]; - let (fill_sender, mut fill_receiver) = mpsc::channel::(10000); - + let (fill_sender, mut fill_receiver) = mpsc::channel::(fills_queue_max_size); + let scrape_fill_sender = fill_sender.clone(); handles.push(tokio::spawn(async move { - scrape(&config, &fill_sender, &target_markets).await; + scrape(&config, &scrape_fill_sender, &target_markets).await; })); let fills_pool = pool.clone(); @@ -56,7 +59,7 @@ async fn main() -> anyhow::Result<()> { } })); - let (candle_sender, mut candle_receiver) = mpsc::channel::>(100000); + let (candle_sender, mut candle_receiver) = mpsc::channel::>(candles_queue_max_size); for market in market_infos.into_iter() { let sender = candle_sender.clone(); @@ -78,6 +81,23 @@ async fn main() -> anyhow::Result<()> { } })); + let monitor_pool = pool.clone(); + let monitor_fill_channel = fill_sender.clone(); + let monitor_candle_channel = candle_sender.clone(); + handles.push(tokio::spawn(async move { + // TODO: maybe break this out into a new function + loop { + let pool_status = monitor_pool.status(); + METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64); + METRIC_DB_POOL_SIZE.set(pool_status.size as i64); + + METRIC_CANDLES_QUEUE_LENGTH.set((candles_queue_max_size - monitor_candle_channel.capacity()) as i64); + METRIC_FILLS_QUEUE_LENGTH.set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64); + + tokio::time::sleep(WaitDuration::from_secs(10)).await; + } + })); + handles.push(tokio::spawn(async move { // TODO: this is ugly af serve_metrics().await.unwrap().await.unwrap(); diff --git a/src/worker/metrics/mod.rs b/src/worker/metrics/mod.rs index 6089970..8b65d48 100644 --- a/src/worker/metrics/mod.rs +++ b/src/worker/metrics/mod.rs @@ -1,18 +1,63 @@ use actix_web::{dev::Server, http::StatusCode, App, HttpServer}; use actix_web_prom::PrometheusMetricsBuilder; use lazy_static::lazy_static; -use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry}; +use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry, IntGauge, register_int_gauge_with_registry, register_int_counter_with_registry, IntCounter}; lazy_static! { static ref METRIC_REGISTRY: Registry = Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap(); + pub static ref METRIC_TXS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "txs_total", + "Total number of transactions scraped", + &["market", "status"], + METRIC_REGISTRY + ) + .unwrap(); pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( "fills_total", - "Total number of fills scraped", + "Total number of fills parsed", &["market"], METRIC_REGISTRY ) .unwrap(); + pub static ref METRIC_CANDLES_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "candles_total", + "Total number of candles generated", + &["market"], + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_FILLS_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!( + "fills_queue_length", + "Current length of the fills write queue", + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_CANDLES_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!( + "candles_queue_length", + "Current length of the candles write queue", + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "rpc_errors_total", + "RPC errors while scraping", + &["method"], + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_DB_POOL_SIZE: IntGauge = register_int_gauge_with_registry!( + "db_pool_size", + "Current size of the DB connection pool", + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_DB_POOL_AVAILABLE: IntGauge = register_int_gauge_with_registry!( + "db_pool_available", + "Available DB connections in the pool", + METRIC_REGISTRY + ) + .unwrap(); } pub async fn serve_metrics() -> anyhow::Result { diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index 22e3fb8..3d1981b 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -5,7 +5,7 @@ use solana_transaction_status::{ }; use std::{collections::HashMap, io::Error}; -use crate::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw}; +use crate::{structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw}, worker::metrics::METRIC_RPC_ERRORS_TOTAL}; const PROGRAM_DATA: &str = "Program data: "; @@ -34,7 +34,9 @@ pub fn parse_trades_from_openbook_txns( } } } - Err(_) => {} + Err(_) => { + METRIC_RPC_ERRORS_TOTAL.with_label_values(&["getTransaction"]).inc(); + } } } fills_vector diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index 9c5b9c1..9dd7553 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -9,7 +9,7 @@ use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc::Sender; use crate::{ - structs::openbook::OpenBookFillEvent, utils::Config, worker::metrics::METRIC_FILLS_TOTAL, + structs::openbook::OpenBookFillEvent, utils::Config, worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL}, }; use super::parsing::parse_trades_from_openbook_txns; @@ -60,6 +60,7 @@ pub async fn scrape_transactions( Ok(s) => s, Err(e) => { println!("Error in get_signatures_for_address_with_config: {}", e); + METRIC_RPC_ERRORS_TOTAL.with_label_values(&["getSignaturesForAddress"]).inc(); return before_sig; } };