diff --git a/Cargo.lock b/Cargo.lock index 99f3efb..b17fae6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -191,6 +191,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "actix-web-prom" +version = "0.6.0" +source = "git+https://github.com/riordanp/actix-web-prom.git?branch=exclude-paths#614434270cbcfdffa2b3a854aff4c1e49c4973fd" +dependencies = [ + "actix-web", + "futures-core", + "pin-project-lite", + "prometheus", + "regex", +] + [[package]] name = "adler" version = "1.0.2" @@ -3423,6 +3435,7 @@ name = "openbook-candles" version = "0.1.0" dependencies = [ "actix-web", + "actix-web-prom", "anchor-client", "anchor-lang", "anyhow", @@ -3438,11 +3451,13 @@ dependencies = [ "env_logger 0.10.0", "futures 0.3.27", "jsonrpc-core-client", + "lazy_static", "log 0.4.17", "native-tls", "num-traits", "num_enum 0.6.1", "postgres-native-tls", + "prometheus", "serde", "serde_derive", "serde_json", @@ -3966,6 +3981,21 @@ dependencies = [ "yansi", ] +[[package]] +name = "prometheus" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" +dependencies = [ + "cfg-if 1.0.0", + "fnv", + "lazy_static", + "memchr", + "parking_lot 0.12.1", + "protobuf", + "thiserror", +] + [[package]] name = "prost" version = "0.11.6" @@ -4021,6 +4051,12 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "protobuf-src" version = "1.1.0+21.5" diff --git a/Cargo.toml b/Cargo.toml index 002d5fc..a4b3607 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,9 +62,12 @@ serum_dex = { version = "0.5.10", git = "https://github.com/openbook-dex/program anchor-lang = ">=0.25.0" actix-web = "4" +actix-web-prom = { version = "0.6.0", git = "https://github.com/riordanp/actix-web-prom.git", branch = "exclude-paths" } arrayref = "0.3.6" bytemuck = "1.12.3" num_enum = "0.6.1" -config = "0.13.1" \ No newline at end of file +config = "0.13.1" +prometheus = "0.13.3" +lazy_static = "1.4.0" diff --git a/cd/server.toml b/cd/server.toml index 7b9acb4..966d00d 100644 --- a/cd/server.toml +++ b/cd/server.toml @@ -17,3 +17,7 @@ kill_timeout = 30 hard_limit = 1024 soft_limit = 1024 type = "connections" + +[metrics] + port = 9091 + path = "/metrics" diff --git a/cd/worker.toml b/cd/worker.toml index 8f6da45..258785b 100644 --- a/cd/worker.toml +++ b/cd/worker.toml @@ -7,3 +7,7 @@ kill_timeout = 30 [experimental] cmd = ["worker", "markets.json"] + +[metrics] + port = 9091 + path = "/metrics" \ No newline at end of file diff --git a/markets.json b/markets.json index 9f7d11b..ef69ff0 100644 --- a/markets.json +++ b/markets.json @@ -4,47 +4,35 @@ "address" : "8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6" }, { - "name" : "RLB/USDC", - "address" : "72h8rWaWwfPUL36PAFqyQZU8RT1V3FKG7Nc45aK89xTs" + "name" : "wBTCpo/USDC", + "address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN" }, { "name" : "MNGO/USDC", "address" : "3NnxQvDcZXputNMxaxsGvqiKpqgPfSYXpNigZNFcknmD" }, { - "name" : "BONK/SOL", - "address" : "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF" + "name": "BONK/SOL", + "address": "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF" }, { - "name" : "BONK/USDC", - "address" : "8PhnCfgqpgFM7ZJvttGdBVMXHuU4Q23ACxCvWkbs1M71" - }, - { - "name" : "WBTC/USDC", - "address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN" + "name": "DUAL/USDC", + "address": "H6rrYK3SUHF2eguZCyJxnSBMJqjXhUtuaki6PHiutvum" }, { "name": "mSOL/USDC", "address": "9Lyhks5bQQxb9EyyX55NtgKQzpM4WK7JCmeaWuQ5MoXD" }, { - "name": "SOL/USDT", - "address": "2AdaV97p6SfkuMQJdu8DHhBhmJe7oWdvbm52MJfYQmfA" - }, - { - "name": "USDT/USDC", - "address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu" - }, - { - "name": "ETH/USDC", + "name": "ETHpo/USDC", "address": "BbJgE7HZMaDp5NTYvRh5jZSkQPVDTU8ubPFtpogUkEj4" }, + { + "name": "BONK/USDC", + "address": "8PhnCfgqpgFM7ZJvttGdBVMXHuU4Q23ACxCvWkbs1M71" + }, { "name": "RAY/USDC", "address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj" - }, - { - "name": "DUAL/USDC", - "address": "H6rrYK3SUHF2eguZCyJxnSBMJqjXhUtuaki6PHiutvum" } ] \ No newline at end of file diff --git a/src/backfill-candles/main.rs b/src/backfill-candles/main.rs index 0321d21..e7c2b39 100644 --- a/src/backfill-candles/main.rs +++ b/src/backfill-candles/main.rs @@ -1,12 +1,7 @@ - - use deadpool_postgres::Object; use openbook_candles::{ - database::{ - initialize::connect_to_database, - insert::{build_candles_upsert_statement}, - }, + database::{initialize::connect_to_database, insert::build_candles_upsert_statement}, structs::{ candle::Candle, markets::{fetch_market_infos, load_markets}, @@ -18,10 +13,9 @@ use openbook_candles::{ minute_candles::backfill_batch_1m_candles, }, }; -use std::{env}; +use std::env; use strum::IntoEnumIterator; - #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { dotenv::dotenv().ok(); diff --git a/src/backfill-trades/main.rs b/src/backfill-trades/main.rs index 23a0fb4..8ac2ec3 100644 --- a/src/backfill-trades/main.rs +++ b/src/backfill-trades/main.rs @@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> { let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); for m in market_infos.clone() { - target_markets.insert(Pubkey::from_str(&m.address)?, 0); + target_markets.insert(Pubkey::from_str(&m.address)?, m.name); } println!("{:?}", target_markets); @@ -57,7 +57,7 @@ async fn main() -> anyhow::Result<()> { pub async fn backfill( rpc_url: String, fill_sender: &Sender, - target_markets: &HashMap, + target_markets: &HashMap, ) -> anyhow::Result<()> { println!("backfill started"); let mut before_sig: Option = None; @@ -145,7 +145,7 @@ pub async fn get_transactions( rpc_client: &RpcClient, mut sigs: Vec, fill_sender: &Sender, - target_markets: &HashMap, + target_markets: &HashMap, ) { sigs.retain(|sig| sig.err.is_none()); if sigs.last().is_none() { diff --git a/src/database/fetch.rs b/src/database/fetch.rs index 11b2470..37f6043 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -59,10 +59,7 @@ pub async fn fetch_fills_from( let rows = client .query(stmt, &[&market_address_string, &start_time, &end_time]) .await?; - Ok(rows - .into_iter() - .map(PgOpenBookFill::from_row) - .collect()) + Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect()) } pub async fn fetch_latest_finished_candle( diff --git a/src/database/initialize.rs b/src/database/initialize.rs index d26bc42..4bc8a9c 100644 --- a/src/database/initialize.rs +++ b/src/database/initialize.rs @@ -36,6 +36,7 @@ pub async fn connect_to_database() -> anyhow::Result { MakeTlsConnector::new( TlsConnector::builder() .add_root_certificate(Certificate::from_pem(&ca_cert)?) + // TODO: make this configurable .identity(Identity::from_pkcs12(&client_key, "pass")?) .danger_accept_invalid_certs(false) .build()?, @@ -86,7 +87,7 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> { client .execute( "CREATE TABLE IF NOT EXISTS candles ( - id serial, + id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY, market_name text, start_time timestamptz, end_time timestamptz, diff --git a/src/database/insert.rs b/src/database/insert.rs index d777f96..ba95e87 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -1,4 +1,5 @@ use deadpool_postgres::Pool; +use log::debug; use std::{ collections::{hash_map::DefaultHasher, HashMap}, hash::{Hash, Hasher}, @@ -14,7 +15,6 @@ pub async fn persist_fill_events( pool: &Pool, fill_receiver: &mut Receiver, ) -> anyhow::Result<()> { - let client = pool.get().await?; loop { let mut write_batch = HashMap::new(); while write_batch.len() < 10 { @@ -36,61 +36,18 @@ pub async fn persist_fill_events( } if !write_batch.is_empty() { - // print!("writing: {:?} events to DB\n", write_batch.len()); - - // match conn.ping().await { - // Ok(_) => { + debug!("writing: {:?} events to DB\n", write_batch.len()); let upsert_statement = build_fills_upsert_statement(write_batch); + let client = pool.get().await?; client .execute(&upsert_statement, &[]) .await .map_err_anyhow() .unwrap(); - // } - // Err(_) => { - // println!("Fills ping failed"); - // break; - // } - // } } } } -pub async fn persist_candles( - pool: Pool, - candles_receiver: &mut Receiver>, -) -> anyhow::Result<()> { - let client = pool.get().await.unwrap(); - loop { - // match client.ping().await { - // Ok(_) => { - match candles_receiver.try_recv() { - Ok(candles) => { - if candles.is_empty() { - continue; - } - // print!("writing: {:?} candles to DB\n", candles.len()); - let upsert_statement = build_candles_upsert_statement(candles); - client - .execute(&upsert_statement, &[]) - .await - .map_err_anyhow() - .unwrap(); - } - Err(TryRecvError::Empty) => continue, - Err(TryRecvError::Disconnected) => { - panic!("Candles sender must stay alive") - } - }; - // } - // Err(_) => { - // println!("Candle ping failed"); - // break; - // } - // }; - } -} - #[allow(deprecated)] fn build_fills_upsert_statement(events: HashMap) -> String { let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES"); @@ -126,7 +83,7 @@ fn build_fills_upsert_statement(events: HashMap) -> Strin stmt } -pub fn build_candles_upsert_statement(candles: Vec) -> String { +pub fn build_candles_upsert_statement(candles: &Vec) -> String { let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES"); for (idx, candle) in candles.iter().enumerate() { let val_str = format!( diff --git a/src/server/main.rs b/src/server/main.rs index d319727..19a8706 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -1,9 +1,13 @@ use actix_web::{ + http::StatusCode, middleware::Logger, + rt::System, web::{self, Data}, App, HttpServer, }; +use actix_web_prom::PrometheusMetricsBuilder; use candles::get_candles; +use prometheus::Registry; use markets::get_markets; use openbook_candles::{ @@ -12,6 +16,7 @@ use openbook_candles::{ utils::{Config, WebContext}, }; use std::env; +use std::thread; use traders::{get_top_traders_by_base_volume, get_top_traders_by_quote_volume}; mod candles; @@ -39,6 +44,22 @@ async fn main() -> std::io::Result<()> { let market_infos = fetch_market_infos(&config, markets).await.unwrap(); let pool = connect_to_database().await.unwrap(); + let registry = Registry::new(); + // For serving metrics on a private port + let private_metrics = PrometheusMetricsBuilder::new("openbook_candles_server_private") + .registry(registry.clone()) + .exclude("/metrics") + .exclude_status(StatusCode::NOT_FOUND) + .endpoint("/metrics") + .build() + .unwrap(); + // For collecting metrics on the public api, excluding 404s + let public_metrics = PrometheusMetricsBuilder::new("openbook_candles_server") + .registry(registry.clone()) + .exclude_status(StatusCode::NOT_FOUND) + .build() + .unwrap(); + let context = Data::new(WebContext { rpc_url, pool, @@ -46,20 +67,40 @@ async fn main() -> std::io::Result<()> { }); println!("Starting server"); - HttpServer::new(move || { - App::new() - .wrap(Logger::default()) - .app_data(context.clone()) - .service( - web::scope("/api") - .service(get_candles) - .service(get_top_traders_by_base_volume) - .service(get_top_traders_by_quote_volume) - .service(get_markets) - .service(coingecko::service()), - ) - }) - .bind(&bind_addr)? - .run() - .await + // Thread to serve public API + let public_server = thread::spawn(move || { + let sys = System::new(); + let srv = HttpServer::new(move || { + App::new() + .wrap(Logger::default()) + .wrap(public_metrics.clone()) + .app_data(context.clone()) + .service( + web::scope("/api") + .service(get_candles) + .service(get_top_traders_by_base_volume) + .service(get_top_traders_by_quote_volume) + .service(get_markets) + .service(coingecko::service()), + ) + }) + .bind(&bind_addr) + .unwrap() + .run(); + sys.block_on(srv).unwrap(); + }); + + // Thread to serve metrics endpoint privately + let private_server = thread::spawn(move || { + let sys = System::new(); + let srv = HttpServer::new(move || App::new().wrap(private_metrics.clone())) + .bind("0.0.0.0:9091") + .unwrap() + .run(); + sys.block_on(srv).unwrap(); + }); + + private_server.join().unwrap(); + public_server.join().unwrap(); + Ok(()) } diff --git a/src/structs/candle.rs b/src/structs/candle.rs index 7cf26b2..733bd36 100644 --- a/src/structs/candle.rs +++ b/src/structs/candle.rs @@ -3,7 +3,7 @@ use tokio_postgres::Row; use super::resolution::Resolution; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct Candle { pub market_name: String, pub start_time: DateTime, diff --git a/src/worker/candle_batching/higher_order_candles.rs b/src/worker/candle_batching/higher_order_candles.rs index f38518b..e621a08 100644 --- a/src/worker/candle_batching/higher_order_candles.rs +++ b/src/worker/candle_batching/higher_order_candles.rs @@ -1,5 +1,6 @@ use chrono::{DateTime, Duration, DurationRound, Utc}; use deadpool_postgres::Pool; +use log::debug; use std::cmp::max; use crate::{ @@ -46,12 +47,12 @@ pub async fn batch_higher_order_candles( fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()) .await?; if constituent_candles.is_empty() { - // println!( - // "Batching {}, but no candles found for: {:?}, {}", - // resolution, - // market_name, - // resolution.get_constituent_resolution() - // ); + debug!( + "Batching {}, but no candles found for: {:?}, {}", + resolution, + market_name, + resolution.get_constituent_resolution() + ); return Ok(Vec::new()); } let start_time = constituent_candles[0].start_time.duration_trunc(day())?; @@ -82,7 +83,7 @@ fn combine_into_higher_order_candles( st: DateTime, seed_candle: Candle, ) -> Vec { - // println!("target_resolution: {}", target_resolution); + debug!("combining for target_resolution: {}", target_resolution); let duration = target_resolution.get_duration(); diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index 24e44f5..1dcc0b8 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -2,6 +2,7 @@ use std::cmp::min; use chrono::{DateTime, Duration, DurationRound, Utc}; use deadpool_postgres::Pool; +use log::debug; use crate::{ database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, @@ -24,9 +25,10 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul let start_time = candle.end_time; let end_time = min( start_time + day(), - Utc::now().duration_trunc(Duration::minutes(1))?, + (Utc::now() + Duration::minutes(1)).duration_trunc(Duration::minutes(1))?, ); let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; + let candles = combine_fills_into_1m_candles( &mut fills, market, @@ -40,7 +42,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul let earliest_fill = fetch_earliest_fill(pool, market_address).await?; if earliest_fill.is_none() { - println!("No fills found for: {:?}", market_name); + debug!("No fills found for: {:?}", market_name); return Ok(Vec::new()); } @@ -98,7 +100,6 @@ fn combine_fills_into_1m_candles( while matches!(fills_iter.peek(), Some(f) if f.time < end_time) { let fill = fills_iter.next().unwrap(); - let (price, volume) = calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals); @@ -112,8 +113,8 @@ fn combine_fills_into_1m_candles( candles[i].start_time = start_time; candles[i].end_time = end_time; - candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time); - + candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time) + || end_time < Utc::now() - Duration::minutes(10); start_time = end_time; end_time += Duration::minutes(1); } @@ -132,7 +133,7 @@ pub async fn backfill_batch_1m_candles( let earliest_fill = fetch_earliest_fill(pool, &market.address).await?; if earliest_fill.is_none() { - println!("No fills found for: {:?}", &market_name); + debug!("No fills found for: {:?}", &market_name); return Ok(candles); } diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs index 56259b8..310f9e5 100644 --- a/src/worker/candle_batching/mod.rs +++ b/src/worker/candle_batching/mod.rs @@ -3,31 +3,29 @@ pub mod minute_candles; use chrono::Duration; use deadpool_postgres::Pool; +use log::{error, warn}; use strum::IntoEnumIterator; -use tokio::{sync::mpsc::Sender, time::sleep}; +use tokio::time::sleep; use crate::{ + database::insert::build_candles_upsert_statement, structs::{candle::Candle, markets::MarketInfo, resolution::Resolution}, + utils::AnyhowWrap, worker::candle_batching::minute_candles::batch_1m_candles, }; use self::higher_order_candles::batch_higher_order_candles; -pub async fn batch_for_market( - pool: &Pool, - candles_sender: &Sender>, - market: &MarketInfo, -) -> anyhow::Result<()> { +pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> { loop { - let sender = candles_sender.clone(); let market_clone = market.clone(); - // let client = pool.get().await?; + loop { sleep(Duration::milliseconds(2000).to_std()?).await; - match batch_inner(pool, &sender, &market_clone).await { + match batch_inner(pool, &market_clone).await { Ok(_) => {} Err(e) => { - println!( + error!( "Batching thread failed for {:?} with error: {:?}", market_clone.name.clone(), e @@ -36,33 +34,33 @@ pub async fn batch_for_market( } }; } - println!("Restarting {:?} batching thread", market.name); + warn!("Restarting {:?} batching thread", market.name); } } -async fn batch_inner( - pool: &Pool, - candles_sender: &Sender>, - market: &MarketInfo, -) -> anyhow::Result<()> { +async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> { let market_name = &market.name.clone(); let candles = batch_1m_candles(pool, market).await?; - send_candles(candles, candles_sender).await; - + save_candles(pool, candles).await?; for resolution in Resolution::iter() { if resolution == Resolution::R1m { continue; } let candles = batch_higher_order_candles(pool, market_name, resolution).await?; - send_candles(candles, candles_sender).await; + save_candles(pool, candles).await?; } Ok(()) } -async fn send_candles(candles: Vec, candles_sender: &Sender>) { - if !candles.is_empty() { - if let Err(_) = candles_sender.send(candles).await { - panic!("candles receiver dropped"); - } +async fn save_candles(pool: &Pool, candles: Vec) -> anyhow::Result<()> { + if candles.len() == 0 { + return Ok(()); } + let upsert_statement = build_candles_upsert_statement(&candles); + let client = pool.get().await.unwrap(); + client + .execute(&upsert_statement, &[]) + .await + .map_err_anyhow()?; + Ok(()) } diff --git a/src/worker/main.rs b/src/worker/main.rs index 000f0e6..996cc41 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -1,23 +1,26 @@ - -use openbook_candles::structs::candle::Candle; +use log::{error, info}; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::openbook::OpenBookFillEvent; use openbook_candles::utils::Config; +use openbook_candles::worker::metrics::{ + serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, METRIC_FILLS_QUEUE_LENGTH, +}; use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::{ database::{ initialize::{connect_to_database, setup_database}, - insert::{persist_candles, persist_fill_events}, + insert::{persist_fill_events}, }, worker::candle_batching::batch_for_market, }; use solana_sdk::pubkey::Pubkey; use std::env; -use std::{collections::HashMap, str::FromStr}; +use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc; #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { + env_logger::init(); dotenv::dotenv().ok(); let args: Vec = env::args().collect(); @@ -29,22 +32,24 @@ async fn main() -> anyhow::Result<()> { rpc_url: rpc_url.clone(), }; + let fills_queue_max_size = 10000; + let markets = load_markets(path_to_markets_json); let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); for m in market_infos.clone() { - target_markets.insert(Pubkey::from_str(&m.address)?, 0); + target_markets.insert(Pubkey::from_str(&m.address)?, m.name); } - println!("{:?}", target_markets); + info!("{:?}", target_markets); let pool = connect_to_database().await?; setup_database(&pool).await?; let mut handles = vec![]; - let (fill_sender, mut fill_receiver) = mpsc::channel::(1000); - + let (fill_sender, mut fill_receiver) = mpsc::channel::(fills_queue_max_size); + let scrape_fill_sender = fill_sender.clone(); handles.push(tokio::spawn(async move { - scrape(&config, &fill_sender, &target_markets).await; + scrape(&config, &scrape_fill_sender, &target_markets).await; })); let fills_pool = pool.clone(); @@ -56,28 +61,35 @@ async fn main() -> anyhow::Result<()> { } })); - let (candle_sender, mut candle_receiver) = mpsc::channel::>(1000); - for market in market_infos.into_iter() { - let sender = candle_sender.clone(); let batch_pool = pool.clone(); handles.push(tokio::spawn(async move { - batch_for_market(&batch_pool, &sender, &market) - .await - .unwrap(); - println!("SOMETHING WENT WRONG"); + batch_for_market(&batch_pool, &market).await.unwrap(); + error!("batching halted for market {}", &market.name); })); } - let persist_pool = pool.clone(); + let monitor_pool = pool.clone(); + let monitor_fill_channel = fill_sender.clone(); handles.push(tokio::spawn(async move { + // TODO: maybe break this out into a new function loop { - persist_candles(persist_pool.clone(), &mut candle_receiver) - .await - .unwrap(); + let pool_status = monitor_pool.status(); + METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64); + METRIC_DB_POOL_SIZE.set(pool_status.size as i64); + + METRIC_FILLS_QUEUE_LENGTH + .set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64); + + tokio::time::sleep(WaitDuration::from_secs(10)).await; } })); + handles.push(tokio::spawn(async move { + // TODO: this is ugly af + serve_metrics().await.unwrap().await.unwrap(); + })); + futures::future::join_all(handles).await; Ok(()) diff --git a/src/worker/metrics/mod.rs b/src/worker/metrics/mod.rs new file mode 100644 index 0000000..09488f3 --- /dev/null +++ b/src/worker/metrics/mod.rs @@ -0,0 +1,76 @@ +use actix_web::{dev::Server, http::StatusCode, App, HttpServer}; +use actix_web_prom::PrometheusMetricsBuilder; +use lazy_static::lazy_static; +use prometheus::{ + register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec, + IntGauge, Registry, +}; + +lazy_static! { + static ref METRIC_REGISTRY: Registry = + Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap(); + pub static ref METRIC_TXS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "txs_total", + "Total number of transactions scraped", + &["market", "status"], + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "fills_total", + "Total number of fills parsed", + &["market"], + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_CANDLES_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "candles_total", + "Total number of candles generated", + &["market"], + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_FILLS_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!( + "fills_queue_length", + "Current length of the fills write queue", + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec = + register_int_counter_vec_with_registry!( + "rpc_errors_total", + "RPC errors while scraping", + &["method"], + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_DB_POOL_SIZE: IntGauge = register_int_gauge_with_registry!( + "db_pool_size", + "Current size of the DB connection pool", + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_DB_POOL_AVAILABLE: IntGauge = register_int_gauge_with_registry!( + "db_pool_available", + "Available DB connections in the pool", + METRIC_REGISTRY + ) + .unwrap(); +} + +pub async fn serve_metrics() -> anyhow::Result { + let metrics = PrometheusMetricsBuilder::new("openbook_candles_worker") + .registry(METRIC_REGISTRY.clone()) + .exclude("/metrics") + .exclude_status(StatusCode::NOT_FOUND) + .endpoint("/metrics") + .build() + .unwrap(); + let server = HttpServer::new(move || App::new().wrap(metrics.clone())) + .bind("0.0.0.0:9091") + .unwrap() + .disable_signals() + .run(); + + Ok(server) +} diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 04e662b..53437ac 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -1,2 +1,3 @@ pub mod candle_batching; +pub mod metrics; pub mod trade_fetching; diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index 02c8d64..a31ce5c 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -1,3 +1,4 @@ +use log::warn; use solana_client::client_error::Result as ClientResult; use solana_sdk::pubkey::Pubkey; use solana_transaction_status::{ @@ -5,13 +6,16 @@ use solana_transaction_status::{ }; use std::{collections::HashMap, io::Error}; -use crate::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw}; +use crate::{ + structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw}, + worker::metrics::METRIC_RPC_ERRORS_TOTAL, +}; const PROGRAM_DATA: &str = "Program data: "; pub fn parse_trades_from_openbook_txns( txns: &mut Vec>, - target_markets: &HashMap, + target_markets: &HashMap, ) -> Vec { let mut fills_vector = Vec::::new(); for txn in txns.iter_mut() { @@ -34,7 +38,12 @@ pub fn parse_trades_from_openbook_txns( } } } - Err(_) => {} + Err(e) => { + warn!("rpc error in get_transaction {}", e); + METRIC_RPC_ERRORS_TOTAL + .with_label_values(&["getTransaction"]) + .inc(); + } } } fills_vector @@ -42,7 +51,7 @@ pub fn parse_trades_from_openbook_txns( fn parse_openbook_fills_from_logs( logs: &Vec, - target_markets: &HashMap, + target_markets: &HashMap, block_time: i64, ) -> Option> { let mut fills_vector = Vec::::new(); diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index 6a5bf09..bbba445 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -1,4 +1,5 @@ use futures::future::join_all; +use log::{debug, warn}; use solana_client::{ nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, rpc_config::RpcTransactionConfig, @@ -8,14 +9,18 @@ use solana_transaction_status::UiTransactionEncoding; use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc::Sender; -use crate::{structs::openbook::OpenBookFillEvent, utils::Config}; +use crate::{ + structs::openbook::OpenBookFillEvent, + utils::Config, + worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL}, +}; use super::parsing::parse_trades_from_openbook_txns; pub async fn scrape( config: &Config, fill_sender: &Sender, - target_markets: &HashMap, + target_markets: &HashMap, ) { let rpc_client = RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed()); @@ -39,7 +44,7 @@ pub async fn scrape_transactions( before_sig: Option, limit: Option, fill_sender: &Sender, - target_markets: &HashMap, + target_markets: &HashMap, ) -> Option { let rpc_config = GetConfirmedSignaturesForAddress2Config { before: before_sig, @@ -57,13 +62,16 @@ pub async fn scrape_transactions( { Ok(s) => s, Err(e) => { - println!("Error in get_signatures_for_address_with_config: {}", e); + warn!("rpc error in get_signatures_for_address_with_config: {}", e); + METRIC_RPC_ERRORS_TOTAL + .with_label_values(&["getSignaturesForAddress"]) + .inc(); return before_sig; } }; if sigs.is_empty() { - println!("No signatures found"); + debug!("No signatures found"); return before_sig; } @@ -96,9 +104,11 @@ pub async fn scrape_transactions( let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); if !fills.is_empty() { for fill in fills.into_iter() { + let market_name = target_markets.get(&fill.market).unwrap(); if let Err(_) = fill_sender.send(fill).await { panic!("receiver dropped"); } + METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc(); } }