From 0591f5b3ab45022e84273d7140a49d1d900f8289 Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Mon, 29 May 2023 18:37:03 +0100 Subject: [PATCH 01/12] Add request metrics to server --- Cargo.lock | 35 +++++++++++++++++++++ Cargo.toml | 4 ++- src/server/main.rs | 77 ++++++++++++++++++++++++++++++++++++---------- 3 files changed, 98 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 99f3efb..2093321 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -191,6 +191,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "actix-web-prom" +version = "0.6.0" +source = "git+https://github.com/riordanp/actix-web-prom.git?branch=exclude-paths#614434270cbcfdffa2b3a854aff4c1e49c4973fd" +dependencies = [ + "actix-web", + "futures-core", + "pin-project-lite", + "prometheus", + "regex", +] + [[package]] name = "adler" version = "1.0.2" @@ -3423,6 +3435,7 @@ name = "openbook-candles" version = "0.1.0" dependencies = [ "actix-web", + "actix-web-prom", "anchor-client", "anchor-lang", "anyhow", @@ -3443,6 +3456,7 @@ dependencies = [ "num-traits", "num_enum 0.6.1", "postgres-native-tls", + "prometheus", "serde", "serde_derive", "serde_json", @@ -3966,6 +3980,21 @@ dependencies = [ "yansi", ] +[[package]] +name = "prometheus" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" +dependencies = [ + "cfg-if 1.0.0", + "fnv", + "lazy_static", + "memchr", + "parking_lot 0.12.1", + "protobuf", + "thiserror", +] + [[package]] name = "prost" version = "0.11.6" @@ -4021,6 +4050,12 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "protobuf-src" version = "1.1.0+21.5" diff --git a/Cargo.toml b/Cargo.toml index 002d5fc..cf5ef4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,9 +62,11 @@ serum_dex = { version = "0.5.10", git = "https://github.com/openbook-dex/program anchor-lang = ">=0.25.0" actix-web = "4" +actix-web-prom = { version = "0.6.0", git = "https://github.com/riordanp/actix-web-prom.git", branch = "exclude-paths" } arrayref = "0.3.6" bytemuck = "1.12.3" num_enum = "0.6.1" -config = "0.13.1" \ No newline at end of file +config = "0.13.1" +prometheus = "0.13.3" diff --git a/src/server/main.rs b/src/server/main.rs index d319727..a940956 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -1,9 +1,12 @@ use actix_web::{ middleware::Logger, + rt::System, web::{self, Data}, - App, HttpServer, + App, HttpServer, http::StatusCode, }; +use actix_web_prom::PrometheusMetricsBuilder; use candles::get_candles; +use prometheus::Registry; use markets::get_markets; use openbook_candles::{ @@ -12,6 +15,7 @@ use openbook_candles::{ utils::{Config, WebContext}, }; use std::env; +use std::thread; use traders::{get_top_traders_by_base_volume, get_top_traders_by_quote_volume}; mod candles; @@ -39,6 +43,22 @@ async fn main() -> std::io::Result<()> { let market_infos = fetch_market_infos(&config, markets).await.unwrap(); let pool = connect_to_database().await.unwrap(); + let registry = Registry::new(); + // For serving metrics on a private port + let private_metrics = PrometheusMetricsBuilder::new("openbook_candles_server_private") + .registry(registry.clone()) + .exclude("/metrics") + .exclude_status(StatusCode::NOT_FOUND) + .endpoint("/metrics") + .build() + .unwrap(); + // For collecting metrics 
on the public api, excluding 404s + let public_metrics = PrometheusMetricsBuilder::new("openbook_candles_server") + .registry(registry.clone()) + .exclude_status(StatusCode::NOT_FOUND) + .build() + .unwrap(); + let context = Data::new(WebContext { rpc_url, pool, @@ -46,20 +66,43 @@ async fn main() -> std::io::Result<()> { }); println!("Starting server"); - HttpServer::new(move || { - App::new() - .wrap(Logger::default()) - .app_data(context.clone()) - .service( - web::scope("/api") - .service(get_candles) - .service(get_top_traders_by_base_volume) - .service(get_top_traders_by_quote_volume) - .service(get_markets) - .service(coingecko::service()), - ) - }) - .bind(&bind_addr)? - .run() - .await + // Thread to serve public API + let public_server = thread::spawn(move || { + let sys = System::new(); + let srv = HttpServer::new(move || { + App::new() + .wrap(Logger::default()) + .wrap(public_metrics.clone()) + .app_data(context.clone()) + .service( + web::scope("/api") + .service(get_candles) + .service(get_top_traders_by_base_volume) + .service(get_top_traders_by_quote_volume) + .service(get_markets) + .service(coingecko::service()), + ) + }) + .bind(&bind_addr) + .unwrap() + .run(); + sys.block_on(srv).unwrap(); + }); + + // Thread to serve metrics endpoint privately + let private_server = thread::spawn(move || { + let sys = System::new(); + let srv = HttpServer::new(move || { + App::new() + .wrap(private_metrics.clone()) + }) + .bind("0.0.0.0:9091") + .unwrap() + .run(); + sys.block_on(srv).unwrap(); + }); + + private_server.join().unwrap(); + public_server.join().unwrap(); + Ok(()) } From 522e7a58657b5f2bfbb83a517876acc1da539d56 Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Mon, 29 May 2023 20:42:26 +0100 Subject: [PATCH 02/12] Add mango mainnet markets --- markets.json | 30 +++++++++--------------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/markets.json b/markets.json index a78a50a..e10ce64 100644 --- a/markets.json +++ b/markets.json @@ -4,43 +4,31 @@ "address" : "8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6" }, { - "name" : "RLB/USDC", - "address" : "72h8rWaWwfPUL36PAFqyQZU8RT1V3FKG7Nc45aK89xTs" + "name" : "wBTCpo/USDC", + "address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN" }, { "name" : "MNGO/USDC", "address" : "3NnxQvDcZXputNMxaxsGvqiKpqgPfSYXpNigZNFcknmD" }, { - "name" : "BONK/SOL", - "address" : "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF" + "name": "BONK/SOL", + "address": "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF" }, { - "name" : "WBTC/USDC", - "address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN" + "name": "DUAL/USDC", + "address": "H6rrYK3SUHF2eguZCyJxnSBMJqjXhUtuaki6PHiutvum" }, { "name": "mSOL/USDC", "address": "9Lyhks5bQQxb9EyyX55NtgKQzpM4WK7JCmeaWuQ5MoXD" }, { - "name": "SOL/USDT", - "address": "2AdaV97p6SfkuMQJdu8DHhBhmJe7oWdvbm52MJfYQmfA" - }, - { - "name": "USDT/USDC", - "address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu" - }, - { - "name": "ETH/USDC", + "name": "ETHpo/USDC", "address": "BbJgE7HZMaDp5NTYvRh5jZSkQPVDTU8ubPFtpogUkEj4" }, { - "name": "RAY/USDC", - "address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj" - }, - { - "name": "RAY/USDT", - "address": "GpHbiJJ9VHiuHVXeoet121Utrbm1CSNNzYrBKB8Xz2oz" + "name": "BONK/USDC", + "address": "8PhnCfgqpgFM7ZJvttGdBVMXHuU4Q23ACxCvWkbs1M71" } ] \ No newline at end of file From ac2e9066882e6e8a8457d63614fb252cd704f2a1 Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Mon, 29 May 2023 20:48:53 +0100 Subject: [PATCH 03/12] Enable fly metric polling --- 
cd/server.toml | 4 ++++ cd/worker.toml | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/cd/server.toml b/cd/server.toml index 7b9acb4..966d00d 100644 --- a/cd/server.toml +++ b/cd/server.toml @@ -17,3 +17,7 @@ kill_timeout = 30 hard_limit = 1024 soft_limit = 1024 type = "connections" + +[metrics] + port = 9091 + path = "/metrics" diff --git a/cd/worker.toml b/cd/worker.toml index 8f6da45..258785b 100644 --- a/cd/worker.toml +++ b/cd/worker.toml @@ -7,3 +7,7 @@ kill_timeout = 30 [experimental] cmd = ["worker", "markets.json"] + +[metrics] + port = 9091 + path = "/metrics" \ No newline at end of file From 9489bd3e78e7742e2a94561d3fb304d8d590f4d3 Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Tue, 30 May 2023 12:16:20 +0100 Subject: [PATCH 04/12] Add RAY/USDC market --- markets.json | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/markets.json b/markets.json index e10ce64..b376379 100644 --- a/markets.json +++ b/markets.json @@ -30,5 +30,9 @@ { "name": "BONK/USDC", "address": "8PhnCfgqpgFM7ZJvttGdBVMXHuU4Q23ACxCvWkbs1M71" + }, + { + "name": "RAY/USDC", + "address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxV" } ] \ No newline at end of file From 35937c9572fe57854b89712e5611670822fc518d Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Tue, 30 May 2023 18:01:47 +0100 Subject: [PATCH 05/12] Add metrics to worker, add market name to map, cargo fmt, fix RAY/USDC id --- Cargo.lock | 1 + Cargo.toml | 1 + markets.json | 2 +- src/backfill-candles/main.rs | 10 ++------- src/backfill-trades/main.rs | 6 ++--- src/database/fetch.rs | 5 +---- src/database/initialize.rs | 1 + src/server/main.rs | 14 +++++------- src/worker/main.rs | 13 +++++++---- src/worker/metrics/mod.rs | 33 ++++++++++++++++++++++++++++ src/worker/mod.rs | 1 + src/worker/trade_fetching/parsing.rs | 4 ++-- src/worker/trade_fetching/scrape.rs | 10 ++++++--- 13 files changed, 68 insertions(+), 33 deletions(-) create mode 100644 src/worker/metrics/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 2093321..b17fae6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3451,6 +3451,7 @@ dependencies = [ "env_logger 0.10.0", "futures 0.3.27", "jsonrpc-core-client", + "lazy_static", "log 0.4.17", "native-tls", "num-traits", diff --git a/Cargo.toml b/Cargo.toml index cf5ef4c..a4b3607 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,3 +70,4 @@ num_enum = "0.6.1" config = "0.13.1" prometheus = "0.13.3" +lazy_static = "1.4.0" diff --git a/markets.json b/markets.json index b376379..ef69ff0 100644 --- a/markets.json +++ b/markets.json @@ -33,6 +33,6 @@ }, { "name": "RAY/USDC", - "address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxV" + "address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj" } ] \ No newline at end of file diff --git a/src/backfill-candles/main.rs b/src/backfill-candles/main.rs index 0321d21..e7c2b39 100644 --- a/src/backfill-candles/main.rs +++ b/src/backfill-candles/main.rs @@ -1,12 +1,7 @@ - - use deadpool_postgres::Object; use openbook_candles::{ - database::{ - initialize::connect_to_database, - insert::{build_candles_upsert_statement}, - }, + database::{initialize::connect_to_database, insert::build_candles_upsert_statement}, structs::{ candle::Candle, markets::{fetch_market_infos, load_markets}, @@ -18,10 +13,9 @@ use openbook_candles::{ minute_candles::backfill_batch_1m_candles, }, }; -use std::{env}; +use std::env; use strum::IntoEnumIterator; - #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { dotenv::dotenv().ok(); diff --git a/src/backfill-trades/main.rs 
b/src/backfill-trades/main.rs index 23a0fb4..8ac2ec3 100644 --- a/src/backfill-trades/main.rs +++ b/src/backfill-trades/main.rs @@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> { let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); for m in market_infos.clone() { - target_markets.insert(Pubkey::from_str(&m.address)?, 0); + target_markets.insert(Pubkey::from_str(&m.address)?, m.name); } println!("{:?}", target_markets); @@ -57,7 +57,7 @@ async fn main() -> anyhow::Result<()> { pub async fn backfill( rpc_url: String, fill_sender: &Sender, - target_markets: &HashMap, + target_markets: &HashMap, ) -> anyhow::Result<()> { println!("backfill started"); let mut before_sig: Option = None; @@ -145,7 +145,7 @@ pub async fn get_transactions( rpc_client: &RpcClient, mut sigs: Vec, fill_sender: &Sender, - target_markets: &HashMap, + target_markets: &HashMap, ) { sigs.retain(|sig| sig.err.is_none()); if sigs.last().is_none() { diff --git a/src/database/fetch.rs b/src/database/fetch.rs index d0882ef..9e58f61 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -67,10 +67,7 @@ pub async fn fetch_fills_from( let rows = client .query(&stmt, &[&market_address_string, &start_time, &end_time]) .await?; - Ok(rows - .into_iter() - .map(PgOpenBookFill::from_row) - .collect()) + Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect()) } pub async fn fetch_latest_finished_candle( diff --git a/src/database/initialize.rs b/src/database/initialize.rs index d26bc42..92fd854 100644 --- a/src/database/initialize.rs +++ b/src/database/initialize.rs @@ -36,6 +36,7 @@ pub async fn connect_to_database() -> anyhow::Result { MakeTlsConnector::new( TlsConnector::builder() .add_root_certificate(Certificate::from_pem(&ca_cert)?) + // TODO: make this configurable .identity(Identity::from_pkcs12(&client_key, "pass")?) 
.danger_accept_invalid_certs(false) .build()?, diff --git a/src/server/main.rs b/src/server/main.rs index a940956..19a8706 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -1,8 +1,9 @@ use actix_web::{ + http::StatusCode, middleware::Logger, rt::System, web::{self, Data}, - App, HttpServer, http::StatusCode, + App, HttpServer, }; use actix_web_prom::PrometheusMetricsBuilder; use candles::get_candles; @@ -92,13 +93,10 @@ async fn main() -> std::io::Result<()> { // Thread to serve metrics endpoint privately let private_server = thread::spawn(move || { let sys = System::new(); - let srv = HttpServer::new(move || { - App::new() - .wrap(private_metrics.clone()) - }) - .bind("0.0.0.0:9091") - .unwrap() - .run(); + let srv = HttpServer::new(move || App::new().wrap(private_metrics.clone())) + .bind("0.0.0.0:9091") + .unwrap() + .run(); sys.block_on(srv).unwrap(); }); diff --git a/src/worker/main.rs b/src/worker/main.rs index 000f0e6..95cdef9 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -1,8 +1,8 @@ - use openbook_candles::structs::candle::Candle; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::openbook::OpenBookFillEvent; use openbook_candles::utils::Config; +use openbook_candles::worker::metrics::serve_metrics; use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::{ database::{ @@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> { let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); for m in market_infos.clone() { - target_markets.insert(Pubkey::from_str(&m.address)?, 0); + target_markets.insert(Pubkey::from_str(&m.address)?, m.name); } println!("{:?}", target_markets); @@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> { setup_database(&pool).await?; let mut handles = vec![]; - let (fill_sender, mut fill_receiver) = mpsc::channel::(1000); + let (fill_sender, mut fill_receiver) = mpsc::channel::(10000); handles.push(tokio::spawn(async move { scrape(&config, &fill_sender, &target_markets).await; @@ -56,7 +56,7 @@ async fn main() -> anyhow::Result<()> { } })); - let (candle_sender, mut candle_receiver) = mpsc::channel::>(1000); + let (candle_sender, mut candle_receiver) = mpsc::channel::>(100000); for market in market_infos.into_iter() { let sender = candle_sender.clone(); @@ -78,6 +78,11 @@ async fn main() -> anyhow::Result<()> { } })); + handles.push(tokio::spawn(async move { + // TODO: this is ugly af + serve_metrics().await.unwrap().await.unwrap(); + })); + futures::future::join_all(handles).await; Ok(()) diff --git a/src/worker/metrics/mod.rs b/src/worker/metrics/mod.rs new file mode 100644 index 0000000..6089970 --- /dev/null +++ b/src/worker/metrics/mod.rs @@ -0,0 +1,33 @@ +use actix_web::{dev::Server, http::StatusCode, App, HttpServer}; +use actix_web_prom::PrometheusMetricsBuilder; +use lazy_static::lazy_static; +use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry}; + +lazy_static! 
{ + static ref METRIC_REGISTRY: Registry = + Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap(); + pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "fills_total", + "Total number of fills scraped", + &["market"], + METRIC_REGISTRY + ) + .unwrap(); +} + +pub async fn serve_metrics() -> anyhow::Result { + let metrics = PrometheusMetricsBuilder::new("openbook_candles_worker") + .registry(METRIC_REGISTRY.clone()) + .exclude("/metrics") + .exclude_status(StatusCode::NOT_FOUND) + .endpoint("/metrics") + .build() + .unwrap(); + let server = HttpServer::new(move || App::new().wrap(metrics.clone())) + .bind("0.0.0.0:9091") + .unwrap() + .disable_signals() + .run(); + + Ok(server) +} diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 04e662b..53437ac 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -1,2 +1,3 @@ pub mod candle_batching; +pub mod metrics; pub mod trade_fetching; diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index 02c8d64..22e3fb8 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -11,7 +11,7 @@ const PROGRAM_DATA: &str = "Program data: "; pub fn parse_trades_from_openbook_txns( txns: &mut Vec>, - target_markets: &HashMap, + target_markets: &HashMap, ) -> Vec { let mut fills_vector = Vec::::new(); for txn in txns.iter_mut() { @@ -42,7 +42,7 @@ pub fn parse_trades_from_openbook_txns( fn parse_openbook_fills_from_logs( logs: &Vec, - target_markets: &HashMap, + target_markets: &HashMap, block_time: i64, ) -> Option> { let mut fills_vector = Vec::::new(); diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index 6a5bf09..9c5b9c1 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -8,14 +8,16 @@ use solana_transaction_status::UiTransactionEncoding; use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc::Sender; -use crate::{structs::openbook::OpenBookFillEvent, utils::Config}; +use crate::{ + structs::openbook::OpenBookFillEvent, utils::Config, worker::metrics::METRIC_FILLS_TOTAL, +}; use super::parsing::parse_trades_from_openbook_txns; pub async fn scrape( config: &Config, fill_sender: &Sender, - target_markets: &HashMap, + target_markets: &HashMap, ) { let rpc_client = RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed()); @@ -39,7 +41,7 @@ pub async fn scrape_transactions( before_sig: Option, limit: Option, fill_sender: &Sender, - target_markets: &HashMap, + target_markets: &HashMap, ) -> Option { let rpc_config = GetConfirmedSignaturesForAddress2Config { before: before_sig, @@ -96,9 +98,11 @@ pub async fn scrape_transactions( let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); if !fills.is_empty() { for fill in fills.into_iter() { + let market_name = target_markets.get(&fill.market).unwrap(); if let Err(_) = fill_sender.send(fill).await { panic!("receiver dropped"); } + METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc(); } } From c055d8b992200deb5a2b8e78b739bd366b1066ca Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Wed, 31 May 2023 14:12:26 +0100 Subject: [PATCH 06/12] Add more worker metrics --- src/worker/candle_batching/mod.rs | 9 +++-- src/worker/main.rs | 32 ++++++++++++++---- src/worker/metrics/mod.rs | 49 ++++++++++++++++++++++++++-- src/worker/trade_fetching/parsing.rs | 6 ++-- src/worker/trade_fetching/scrape.rs | 
3 +- 5 files changed, 85 insertions(+), 14 deletions(-) diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs index 56259b8..3a8cd0b 100644 --- a/src/worker/candle_batching/mod.rs +++ b/src/worker/candle_batching/mod.rs @@ -13,6 +13,8 @@ use crate::{ use self::higher_order_candles::batch_higher_order_candles; +use super::metrics::METRIC_CANDLES_TOTAL; + pub async fn batch_for_market( pool: &Pool, candles_sender: &Sender>, @@ -47,14 +49,15 @@ async fn batch_inner( ) -> anyhow::Result<()> { let market_name = &market.name.clone(); let candles = batch_1m_candles(pool, market).await?; - send_candles(candles, candles_sender).await; - + send_candles(candles.clone(), candles_sender).await; + METRIC_CANDLES_TOTAL.with_label_values(&[market.name.as_str()]).inc_by(candles.clone().len() as u64); for resolution in Resolution::iter() { if resolution == Resolution::R1m { continue; } let candles = batch_higher_order_candles(pool, market_name, resolution).await?; - send_candles(candles, candles_sender).await; + send_candles(candles.clone(), candles_sender).await; + METRIC_CANDLES_TOTAL.with_label_values(&[market.name.as_str()]).inc_by(candles.clone().len() as u64); } Ok(()) } diff --git a/src/worker/main.rs b/src/worker/main.rs index 95cdef9..f8b4550 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -2,7 +2,7 @@ use openbook_candles::structs::candle::Candle; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::openbook::OpenBookFillEvent; use openbook_candles::utils::Config; -use openbook_candles::worker::metrics::serve_metrics; +use openbook_candles::worker::metrics::{serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, METRIC_CANDLES_QUEUE_LENGTH, METRIC_FILLS_QUEUE_LENGTH}; use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::{ database::{ @@ -13,7 +13,7 @@ use openbook_candles::{ }; use solana_sdk::pubkey::Pubkey; use std::env; -use std::{collections::HashMap, str::FromStr}; +use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc; #[tokio::main(flavor = "multi_thread", worker_threads = 10)] @@ -29,6 +29,9 @@ async fn main() -> anyhow::Result<()> { rpc_url: rpc_url.clone(), }; + let candles_queue_max_size = 10000; + let fills_queue_max_size = 10000; + let markets = load_markets(path_to_markets_json); let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); @@ -41,10 +44,10 @@ async fn main() -> anyhow::Result<()> { setup_database(&pool).await?; let mut handles = vec![]; - let (fill_sender, mut fill_receiver) = mpsc::channel::(10000); - + let (fill_sender, mut fill_receiver) = mpsc::channel::(fills_queue_max_size); + let scrape_fill_sender = fill_sender.clone(); handles.push(tokio::spawn(async move { - scrape(&config, &fill_sender, &target_markets).await; + scrape(&config, &scrape_fill_sender, &target_markets).await; })); let fills_pool = pool.clone(); @@ -56,7 +59,7 @@ async fn main() -> anyhow::Result<()> { } })); - let (candle_sender, mut candle_receiver) = mpsc::channel::>(100000); + let (candle_sender, mut candle_receiver) = mpsc::channel::>(candles_queue_max_size); for market in market_infos.into_iter() { let sender = candle_sender.clone(); @@ -78,6 +81,23 @@ async fn main() -> anyhow::Result<()> { } })); + let monitor_pool = pool.clone(); + let monitor_fill_channel = fill_sender.clone(); + let monitor_candle_channel = candle_sender.clone(); + 
handles.push(tokio::spawn(async move { + // TODO: maybe break this out into a new function + loop { + let pool_status = monitor_pool.status(); + METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64); + METRIC_DB_POOL_SIZE.set(pool_status.size as i64); + + METRIC_CANDLES_QUEUE_LENGTH.set((candles_queue_max_size - monitor_candle_channel.capacity()) as i64); + METRIC_FILLS_QUEUE_LENGTH.set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64); + + tokio::time::sleep(WaitDuration::from_secs(10)).await; + } + })); + handles.push(tokio::spawn(async move { // TODO: this is ugly af serve_metrics().await.unwrap().await.unwrap(); diff --git a/src/worker/metrics/mod.rs b/src/worker/metrics/mod.rs index 6089970..8b65d48 100644 --- a/src/worker/metrics/mod.rs +++ b/src/worker/metrics/mod.rs @@ -1,18 +1,63 @@ use actix_web::{dev::Server, http::StatusCode, App, HttpServer}; use actix_web_prom::PrometheusMetricsBuilder; use lazy_static::lazy_static; -use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry}; +use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry, IntGauge, register_int_gauge_with_registry, register_int_counter_with_registry, IntCounter}; lazy_static! { static ref METRIC_REGISTRY: Registry = Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap(); + pub static ref METRIC_TXS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "txs_total", + "Total number of transactions scraped", + &["market", "status"], + METRIC_REGISTRY + ) + .unwrap(); pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( "fills_total", - "Total number of fills scraped", + "Total number of fills parsed", &["market"], METRIC_REGISTRY ) .unwrap(); + pub static ref METRIC_CANDLES_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "candles_total", + "Total number of candles generated", + &["market"], + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_FILLS_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!( + "fills_queue_length", + "Current length of the fills write queue", + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_CANDLES_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!( + "candles_queue_length", + "Current length of the candles write queue", + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "rpc_errors_total", + "RPC errors while scraping", + &["method"], + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_DB_POOL_SIZE: IntGauge = register_int_gauge_with_registry!( + "db_pool_size", + "Current size of the DB connection pool", + METRIC_REGISTRY + ) + .unwrap(); + pub static ref METRIC_DB_POOL_AVAILABLE: IntGauge = register_int_gauge_with_registry!( + "db_pool_available", + "Available DB connections in the pool", + METRIC_REGISTRY + ) + .unwrap(); } pub async fn serve_metrics() -> anyhow::Result { diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index 22e3fb8..3d1981b 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -5,7 +5,7 @@ use solana_transaction_status::{ }; use std::{collections::HashMap, io::Error}; -use crate::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw}; +use crate::{structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw}, worker::metrics::METRIC_RPC_ERRORS_TOTAL}; const PROGRAM_DATA: &str = "Program data: "; @@ -34,7 
+34,9 @@ pub fn parse_trades_from_openbook_txns( } } } - Err(_) => {} + Err(_) => { + METRIC_RPC_ERRORS_TOTAL.with_label_values(&["getTransaction"]).inc(); + } } } fills_vector diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index 9c5b9c1..9dd7553 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -9,7 +9,7 @@ use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc::Sender; use crate::{ - structs::openbook::OpenBookFillEvent, utils::Config, worker::metrics::METRIC_FILLS_TOTAL, + structs::openbook::OpenBookFillEvent, utils::Config, worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL}, }; use super::parsing::parse_trades_from_openbook_txns; @@ -60,6 +60,7 @@ pub async fn scrape_transactions( Ok(s) => s, Err(e) => { println!("Error in get_signatures_for_address_with_config: {}", e); + METRIC_RPC_ERRORS_TOTAL.with_label_values(&["getSignaturesForAddress"]).inc(); return before_sig; } }; From 95f923f7c7dd31b64dcd3ad6f578dc6e2c2cb566 Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Wed, 31 May 2023 14:19:29 +0100 Subject: [PATCH 07/12] cargo fmt --- src/worker/candle_batching/mod.rs | 8 ++++++-- src/worker/main.rs | 11 ++++++++--- src/worker/metrics/mod.rs | 20 ++++++++++++-------- src/worker/trade_fetching/parsing.rs | 9 +++++++-- src/worker/trade_fetching/scrape.rs | 8 ++++++-- 5 files changed, 39 insertions(+), 17 deletions(-) diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs index 3a8cd0b..d2c12b1 100644 --- a/src/worker/candle_batching/mod.rs +++ b/src/worker/candle_batching/mod.rs @@ -50,14 +50,18 @@ async fn batch_inner( let market_name = &market.name.clone(); let candles = batch_1m_candles(pool, market).await?; send_candles(candles.clone(), candles_sender).await; - METRIC_CANDLES_TOTAL.with_label_values(&[market.name.as_str()]).inc_by(candles.clone().len() as u64); + METRIC_CANDLES_TOTAL + .with_label_values(&[market.name.as_str()]) + .inc_by(candles.clone().len() as u64); for resolution in Resolution::iter() { if resolution == Resolution::R1m { continue; } let candles = batch_higher_order_candles(pool, market_name, resolution).await?; send_candles(candles.clone(), candles_sender).await; - METRIC_CANDLES_TOTAL.with_label_values(&[market.name.as_str()]).inc_by(candles.clone().len() as u64); + METRIC_CANDLES_TOTAL + .with_label_values(&[market.name.as_str()]) + .inc_by(candles.clone().len() as u64); } Ok(()) } diff --git a/src/worker/main.rs b/src/worker/main.rs index f8b4550..05137d2 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -2,7 +2,10 @@ use openbook_candles::structs::candle::Candle; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::openbook::OpenBookFillEvent; use openbook_candles::utils::Config; -use openbook_candles::worker::metrics::{serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, METRIC_CANDLES_QUEUE_LENGTH, METRIC_FILLS_QUEUE_LENGTH}; +use openbook_candles::worker::metrics::{ + serve_metrics, METRIC_CANDLES_QUEUE_LENGTH, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, + METRIC_FILLS_QUEUE_LENGTH, +}; use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::{ database::{ @@ -91,8 +94,10 @@ async fn main() -> anyhow::Result<()> { METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64); METRIC_DB_POOL_SIZE.set(pool_status.size as i64); - METRIC_CANDLES_QUEUE_LENGTH.set((candles_queue_max_size - 
monitor_candle_channel.capacity()) as i64); - METRIC_FILLS_QUEUE_LENGTH.set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64); + METRIC_CANDLES_QUEUE_LENGTH + .set((candles_queue_max_size - monitor_candle_channel.capacity()) as i64); + METRIC_FILLS_QUEUE_LENGTH + .set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64); tokio::time::sleep(WaitDuration::from_secs(10)).await; } diff --git a/src/worker/metrics/mod.rs b/src/worker/metrics/mod.rs index 8b65d48..2302784 100644 --- a/src/worker/metrics/mod.rs +++ b/src/worker/metrics/mod.rs @@ -1,7 +1,10 @@ use actix_web::{dev::Server, http::StatusCode, App, HttpServer}; use actix_web_prom::PrometheusMetricsBuilder; use lazy_static::lazy_static; -use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry, IntGauge, register_int_gauge_with_registry, register_int_counter_with_registry, IntCounter}; +use prometheus::{ + register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec, + IntGauge, Registry, +}; lazy_static! { static ref METRIC_REGISTRY: Registry = @@ -39,13 +42,14 @@ lazy_static! { METRIC_REGISTRY ) .unwrap(); - pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( - "rpc_errors_total", - "RPC errors while scraping", - &["method"], - METRIC_REGISTRY - ) - .unwrap(); + pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec = + register_int_counter_vec_with_registry!( + "rpc_errors_total", + "RPC errors while scraping", + &["method"], + METRIC_REGISTRY + ) + .unwrap(); pub static ref METRIC_DB_POOL_SIZE: IntGauge = register_int_gauge_with_registry!( "db_pool_size", "Current size of the DB connection pool", diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index 3d1981b..fea4e0d 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -5,7 +5,10 @@ use solana_transaction_status::{ }; use std::{collections::HashMap, io::Error}; -use crate::{structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw}, worker::metrics::METRIC_RPC_ERRORS_TOTAL}; +use crate::{ + structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw}, + worker::metrics::METRIC_RPC_ERRORS_TOTAL, +}; const PROGRAM_DATA: &str = "Program data: "; @@ -35,7 +38,9 @@ pub fn parse_trades_from_openbook_txns( } } Err(_) => { - METRIC_RPC_ERRORS_TOTAL.with_label_values(&["getTransaction"]).inc(); + METRIC_RPC_ERRORS_TOTAL + .with_label_values(&["getTransaction"]) + .inc(); } } } diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index 9dd7553..f038781 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -9,7 +9,9 @@ use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc::Sender; use crate::{ - structs::openbook::OpenBookFillEvent, utils::Config, worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL}, + structs::openbook::OpenBookFillEvent, + utils::Config, + worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL}, }; use super::parsing::parse_trades_from_openbook_txns; @@ -60,7 +62,9 @@ pub async fn scrape_transactions( Ok(s) => s, Err(e) => { println!("Error in get_signatures_for_address_with_config: {}", e); - METRIC_RPC_ERRORS_TOTAL.with_label_values(&["getSignaturesForAddress"]).inc(); + METRIC_RPC_ERRORS_TOTAL + .with_label_values(&["getSignaturesForAddress"]) + .inc(); return before_sig; } }; From 4d81f6c7f65010988b1f01573f69c85eac5205ff Mon Sep 17 
00:00:00 2001 From: Riordan Panayides Date: Wed, 31 May 2023 14:32:34 +0100 Subject: [PATCH 08/12] Add log levels, remove unused db pings --- src/database/insert.rs | 21 +++---------------- .../candle_batching/higher_order_candles.rs | 15 ++++++------- src/worker/candle_batching/minute_candles.rs | 5 +++-- src/worker/candle_batching/mod.rs | 7 ++++--- src/worker/main.rs | 6 ++++-- src/worker/trade_fetching/parsing.rs | 4 +++- src/worker/trade_fetching/scrape.rs | 5 +++-- 7 files changed, 28 insertions(+), 35 deletions(-) diff --git a/src/database/insert.rs b/src/database/insert.rs index d777f96..d9a8115 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -1,4 +1,5 @@ use deadpool_postgres::Pool; +use log::debug; use std::{ collections::{hash_map::DefaultHasher, HashMap}, hash::{Hash, Hasher}, @@ -36,22 +37,14 @@ pub async fn persist_fill_events( } if !write_batch.is_empty() { - // print!("writing: {:?} events to DB\n", write_batch.len()); + debug!("writing: {:?} events to DB\n", write_batch.len()); - // match conn.ping().await { - // Ok(_) => { let upsert_statement = build_fills_upsert_statement(write_batch); client .execute(&upsert_statement, &[]) .await .map_err_anyhow() .unwrap(); - // } - // Err(_) => { - // println!("Fills ping failed"); - // break; - // } - // } } } } @@ -62,14 +55,12 @@ pub async fn persist_candles( ) -> anyhow::Result<()> { let client = pool.get().await.unwrap(); loop { - // match client.ping().await { - // Ok(_) => { match candles_receiver.try_recv() { Ok(candles) => { if candles.is_empty() { continue; } - // print!("writing: {:?} candles to DB\n", candles.len()); + debug!("writing: {:?} candles to DB\n", candles.len()); let upsert_statement = build_candles_upsert_statement(candles); client .execute(&upsert_statement, &[]) @@ -82,12 +73,6 @@ pub async fn persist_candles( panic!("Candles sender must stay alive") } }; - // } - // Err(_) => { - // println!("Candle ping failed"); - // break; - // } - // }; } } diff --git a/src/worker/candle_batching/higher_order_candles.rs b/src/worker/candle_batching/higher_order_candles.rs index f38518b..e621a08 100644 --- a/src/worker/candle_batching/higher_order_candles.rs +++ b/src/worker/candle_batching/higher_order_candles.rs @@ -1,5 +1,6 @@ use chrono::{DateTime, Duration, DurationRound, Utc}; use deadpool_postgres::Pool; +use log::debug; use std::cmp::max; use crate::{ @@ -46,12 +47,12 @@ pub async fn batch_higher_order_candles( fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()) .await?; if constituent_candles.is_empty() { - // println!( - // "Batching {}, but no candles found for: {:?}, {}", - // resolution, - // market_name, - // resolution.get_constituent_resolution() - // ); + debug!( + "Batching {}, but no candles found for: {:?}, {}", + resolution, + market_name, + resolution.get_constituent_resolution() + ); return Ok(Vec::new()); } let start_time = constituent_candles[0].start_time.duration_trunc(day())?; @@ -82,7 +83,7 @@ fn combine_into_higher_order_candles( st: DateTime, seed_candle: Candle, ) -> Vec { - // println!("target_resolution: {}", target_resolution); + debug!("combining for target_resolution: {}", target_resolution); let duration = target_resolution.get_duration(); diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index 24e44f5..d051b47 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -2,6 +2,7 @@ use std::cmp::min; use chrono::{DateTime, 
Duration, DurationRound, Utc}; use deadpool_postgres::Pool; +use log::debug; use crate::{ database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, @@ -40,7 +41,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul let earliest_fill = fetch_earliest_fill(pool, market_address).await?; if earliest_fill.is_none() { - println!("No fills found for: {:?}", market_name); + debug!("No fills found for: {:?}", market_name); return Ok(Vec::new()); } @@ -132,7 +133,7 @@ pub async fn backfill_batch_1m_candles( let earliest_fill = fetch_earliest_fill(pool, &market.address).await?; if earliest_fill.is_none() { - println!("No fills found for: {:?}", &market_name); + debug!("No fills found for: {:?}", &market_name); return Ok(candles); } diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs index d2c12b1..d6303f0 100644 --- a/src/worker/candle_batching/mod.rs +++ b/src/worker/candle_batching/mod.rs @@ -3,6 +3,7 @@ pub mod minute_candles; use chrono::Duration; use deadpool_postgres::Pool; +use log::{error, warn}; use strum::IntoEnumIterator; use tokio::{sync::mpsc::Sender, time::sleep}; @@ -23,13 +24,13 @@ pub async fn batch_for_market( loop { let sender = candles_sender.clone(); let market_clone = market.clone(); - // let client = pool.get().await?; + loop { sleep(Duration::milliseconds(2000).to_std()?).await; match batch_inner(pool, &sender, &market_clone).await { Ok(_) => {} Err(e) => { - println!( + error!( "Batching thread failed for {:?} with error: {:?}", market_clone.name.clone(), e @@ -38,7 +39,7 @@ pub async fn batch_for_market( } }; } - println!("Restarting {:?} batching thread", market.name); + warn!("Restarting {:?} batching thread", market.name); } } diff --git a/src/worker/main.rs b/src/worker/main.rs index 05137d2..3f94a7f 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -1,3 +1,4 @@ +use log::{error, info}; use openbook_candles::structs::candle::Candle; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::openbook::OpenBookFillEvent; @@ -21,6 +22,7 @@ use tokio::sync::mpsc; #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { + env_logger::init(); dotenv::dotenv().ok(); let args: Vec = env::args().collect(); @@ -41,7 +43,7 @@ async fn main() -> anyhow::Result<()> { for m in market_infos.clone() { target_markets.insert(Pubkey::from_str(&m.address)?, m.name); } - println!("{:?}", target_markets); + info!("{:?}", target_markets); let pool = connect_to_database().await?; setup_database(&pool).await?; @@ -71,7 +73,7 @@ async fn main() -> anyhow::Result<()> { batch_for_market(&batch_pool, &sender, &market) .await .unwrap(); - println!("SOMETHING WENT WRONG"); + error!("batching halted for market {}", &market.name); })); } diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index fea4e0d..a31ce5c 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -1,3 +1,4 @@ +use log::warn; use solana_client::client_error::Result as ClientResult; use solana_sdk::pubkey::Pubkey; use solana_transaction_status::{ @@ -37,7 +38,8 @@ pub fn parse_trades_from_openbook_txns( } } } - Err(_) => { + Err(e) => { + warn!("rpc error in get_transaction {}", e); METRIC_RPC_ERRORS_TOTAL .with_label_values(&["getTransaction"]) .inc(); diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index f038781..bbba445 100644 
--- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -1,4 +1,5 @@ use futures::future::join_all; +use log::{debug, warn}; use solana_client::{ nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, rpc_config::RpcTransactionConfig, @@ -61,7 +62,7 @@ pub async fn scrape_transactions( { Ok(s) => s, Err(e) => { - println!("Error in get_signatures_for_address_with_config: {}", e); + warn!("rpc error in get_signatures_for_address_with_config: {}", e); METRIC_RPC_ERRORS_TOTAL .with_label_values(&["getSignaturesForAddress"]) .inc(); @@ -70,7 +71,7 @@ pub async fn scrape_transactions( }; if sigs.is_empty() { - println!("No signatures found"); + debug!("No signatures found"); return before_sig; } From df461e8451f9f9386c9abbe0da56528dd77f27cd Mon Sep 17 00:00:00 2001 From: dboures Date: Thu, 1 Jun 2023 00:58:57 -0500 Subject: [PATCH 09/12] temp: filter redundant 1m candles --- src/structs/candle.rs | 2 +- src/worker/candle_batching/minute_candles.rs | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/structs/candle.rs b/src/structs/candle.rs index 7cf26b2..733bd36 100644 --- a/src/structs/candle.rs +++ b/src/structs/candle.rs @@ -3,7 +3,7 @@ use tokio_postgres::Row; use super::resolution::Resolution; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct Candle { pub market_name: String, pub start_time: DateTime, diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index d051b47..cf20446 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -5,7 +5,7 @@ use deadpool_postgres::Pool; use log::debug; use crate::{ - database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, + database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle, fetch_candles_from}, structs::{ candle::Candle, markets::MarketInfo, @@ -28,6 +28,8 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul Utc::now().duration_trunc(Duration::minutes(1))?, ); let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; + let existing_candles = fetch_candles_from(pool, market_name, Resolution::R1m, candle.start_time, end_time).await?; + let candles = combine_fills_into_1m_candles( &mut fills, market, @@ -35,7 +37,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul end_time, Some(candle.close), ); - Ok(candles) + Ok(filter_redundant_candles(existing_candles, candles.clone())) } None => { let earliest_fill = fetch_earliest_fill(pool, market_address).await?; @@ -122,6 +124,17 @@ fn combine_fills_into_1m_candles( candles } +fn filter_redundant_candles(existing_candles: Vec, mut candles: Vec) -> Vec { + candles.retain(|c| { + !existing_candles.contains(c) + }); + println!("trimmed: {:?}", candles.len()); + // println!("{:?}", candles.last()); + println!("candles: {:?}", existing_candles.len()); + // println!("{:?}", existing_candles.last()); + candles +} + /// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end. 
pub async fn backfill_batch_1m_candles( pool: &Pool, From 89e2fa71780195348508a8ebd86ad30e1a4a98ad Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Thu, 1 Jun 2023 15:30:54 +0100 Subject: [PATCH 10/12] temp: add debugging output to candle batching, mark old candles as completed --- src/worker/candle_batching/minute_candles.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index cf20446..a53bef4 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -22,6 +22,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul match latest_candle { Some(candle) => { + println!("{}: latest finished candle time {}", market_name, candle.end_time); let start_time = candle.end_time; let end_time = min( start_time + day(), @@ -29,6 +30,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul ); let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; let existing_candles = fetch_candles_from(pool, market_name, Resolution::R1m, candle.start_time, end_time).await?; + println!("{}: combining {} fills from {} to {}", market_name, fills.clone().len(), start_time, end_time); let candles = combine_fills_into_1m_candles( &mut fills, @@ -37,9 +39,12 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul end_time, Some(candle.close), ); + + println!("{}: filtering {} new candles on {} existing candles from {} to {}", market_name, candles.clone().len(), existing_candles.clone().len(), start_time, end_time); Ok(filter_redundant_candles(existing_candles, candles.clone())) } None => { + println!("{}: no finished candle", market_name); let earliest_fill = fetch_earliest_fill(pool, market_address).await?; if earliest_fill.is_none() { @@ -56,6 +61,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul Utc::now().duration_trunc(Duration::minutes(1))?, ); let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; + println!("{}: combining {} fills from {} to {}", market_name, fills.clone().len(), start_time, end_time); if !fills.is_empty() { let candles = combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); @@ -101,7 +107,7 @@ fn combine_fills_into_1m_candles( while matches!(fills_iter.peek(), Some(f) if f.time < end_time) { let fill = fills_iter.next().unwrap(); - + println!("adding fill from {}", fill.time); let (price, volume) = calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals); @@ -115,8 +121,16 @@ fn combine_fills_into_1m_candles( candles[i].start_time = start_time; candles[i].end_time = end_time; - candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time); - + candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time) || end_time < Utc::now() - Duration::days(1); + if candles[i].complete { + println!("candle {} complete with end time {}", i, end_time); + } else { + let peeked_fill = fills_iter.peek(); + match peeked_fill { + Some(f) => println!("candle {} incomplete, peeked fill was at {} and end time was {}", i, f.time, end_time), + None => {} + } + } start_time = end_time; end_time += Duration::minutes(1); } From e0d677c241aa66fdaa44d341eec2aea4b5f39261 Mon Sep 17 00:00:00 2001 From: dboures Date: Sat, 3 Jun 2023 12:22:10 -0500 Subject: [PATCH 11/12] 
refactor: remove candles queue --- src/database/insert.rs | 32 +------------- src/worker/candle_batching/minute_candles.rs | 37 +++------------- src/worker/candle_batching/mod.rs | 46 ++++++++------------ src/worker/main.rs | 26 ++--------- src/worker/metrics/mod.rs | 6 --- 5 files changed, 28 insertions(+), 119 deletions(-) diff --git a/src/database/insert.rs b/src/database/insert.rs index d9a8115..ba95e87 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -15,7 +15,6 @@ pub async fn persist_fill_events( pool: &Pool, fill_receiver: &mut Receiver, ) -> anyhow::Result<()> { - let client = pool.get().await?; loop { let mut write_batch = HashMap::new(); while write_batch.len() < 10 { @@ -38,8 +37,8 @@ pub async fn persist_fill_events( if !write_batch.is_empty() { debug!("writing: {:?} events to DB\n", write_batch.len()); - let upsert_statement = build_fills_upsert_statement(write_batch); + let client = pool.get().await?; client .execute(&upsert_statement, &[]) .await @@ -49,33 +48,6 @@ pub async fn persist_fill_events( } } -pub async fn persist_candles( - pool: Pool, - candles_receiver: &mut Receiver>, -) -> anyhow::Result<()> { - let client = pool.get().await.unwrap(); - loop { - match candles_receiver.try_recv() { - Ok(candles) => { - if candles.is_empty() { - continue; - } - debug!("writing: {:?} candles to DB\n", candles.len()); - let upsert_statement = build_candles_upsert_statement(candles); - client - .execute(&upsert_statement, &[]) - .await - .map_err_anyhow() - .unwrap(); - } - Err(TryRecvError::Empty) => continue, - Err(TryRecvError::Disconnected) => { - panic!("Candles sender must stay alive") - } - }; - } -} - #[allow(deprecated)] fn build_fills_upsert_statement(events: HashMap) -> String { let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES"); @@ -111,7 +83,7 @@ fn build_fills_upsert_statement(events: HashMap) -> Strin stmt } -pub fn build_candles_upsert_statement(candles: Vec) -> String { +pub fn build_candles_upsert_statement(candles: &Vec) -> String { let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES"); for (idx, candle) in candles.iter().enumerate() { let val_str = format!( diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index a53bef4..1dcc0b8 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -5,7 +5,7 @@ use deadpool_postgres::Pool; use log::debug; use crate::{ - database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle, fetch_candles_from}, + database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, structs::{ candle::Candle, markets::MarketInfo, @@ -22,15 +22,12 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul match latest_candle { Some(candle) => { - println!("{}: latest finished candle time {}", market_name, candle.end_time); let start_time = candle.end_time; let end_time = min( start_time + day(), - Utc::now().duration_trunc(Duration::minutes(1))?, + (Utc::now() + Duration::minutes(1)).duration_trunc(Duration::minutes(1))?, ); let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; - let existing_candles = fetch_candles_from(pool, market_name, Resolution::R1m, candle.start_time, 
end_time).await?; - println!("{}: combining {} fills from {} to {}", market_name, fills.clone().len(), start_time, end_time); let candles = combine_fills_into_1m_candles( &mut fills, @@ -39,12 +36,9 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul end_time, Some(candle.close), ); - - println!("{}: filtering {} new candles on {} existing candles from {} to {}", market_name, candles.clone().len(), existing_candles.clone().len(), start_time, end_time); - Ok(filter_redundant_candles(existing_candles, candles.clone())) + Ok(candles) } None => { - println!("{}: no finished candle", market_name); let earliest_fill = fetch_earliest_fill(pool, market_address).await?; if earliest_fill.is_none() { @@ -61,7 +55,6 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul Utc::now().duration_trunc(Duration::minutes(1))?, ); let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; - println!("{}: combining {} fills from {} to {}", market_name, fills.clone().len(), start_time, end_time); if !fills.is_empty() { let candles = combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); @@ -107,7 +100,6 @@ fn combine_fills_into_1m_candles( while matches!(fills_iter.peek(), Some(f) if f.time < end_time) { let fill = fills_iter.next().unwrap(); - println!("adding fill from {}", fill.time); let (price, volume) = calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals); @@ -121,16 +113,8 @@ fn combine_fills_into_1m_candles( candles[i].start_time = start_time; candles[i].end_time = end_time; - candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time) || end_time < Utc::now() - Duration::days(1); - if candles[i].complete { - println!("candle {} complete with end time {}", i, end_time); - } else { - let peeked_fill = fills_iter.peek(); - match peeked_fill { - Some(f) => println!("candle {} incomplete, peeked fill was at {} and end time was {}", i, f.time, end_time), - None => {} - } - } + candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time) + || end_time < Utc::now() - Duration::minutes(10); start_time = end_time; end_time += Duration::minutes(1); } @@ -138,17 +122,6 @@ fn combine_fills_into_1m_candles( candles } -fn filter_redundant_candles(existing_candles: Vec, mut candles: Vec) -> Vec { - candles.retain(|c| { - !existing_candles.contains(c) - }); - println!("trimmed: {:?}", candles.len()); - // println!("{:?}", candles.last()); - println!("candles: {:?}", existing_candles.len()); - // println!("{:?}", existing_candles.last()); - candles -} - /// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end. 
pub async fn backfill_batch_1m_candles( pool: &Pool, diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs index d6303f0..310f9e5 100644 --- a/src/worker/candle_batching/mod.rs +++ b/src/worker/candle_batching/mod.rs @@ -5,29 +5,24 @@ use chrono::Duration; use deadpool_postgres::Pool; use log::{error, warn}; use strum::IntoEnumIterator; -use tokio::{sync::mpsc::Sender, time::sleep}; +use tokio::time::sleep; use crate::{ + database::insert::build_candles_upsert_statement, structs::{candle::Candle, markets::MarketInfo, resolution::Resolution}, + utils::AnyhowWrap, worker::candle_batching::minute_candles::batch_1m_candles, }; use self::higher_order_candles::batch_higher_order_candles; -use super::metrics::METRIC_CANDLES_TOTAL; - -pub async fn batch_for_market( - pool: &Pool, - candles_sender: &Sender>, - market: &MarketInfo, -) -> anyhow::Result<()> { +pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> { loop { - let sender = candles_sender.clone(); let market_clone = market.clone(); loop { sleep(Duration::milliseconds(2000).to_std()?).await; - match batch_inner(pool, &sender, &market_clone).await { + match batch_inner(pool, &market_clone).await { Ok(_) => {} Err(e) => { error!( @@ -43,34 +38,29 @@ pub async fn batch_for_market( } } -async fn batch_inner( - pool: &Pool, - candles_sender: &Sender>, - market: &MarketInfo, -) -> anyhow::Result<()> { +async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> { let market_name = &market.name.clone(); let candles = batch_1m_candles(pool, market).await?; - send_candles(candles.clone(), candles_sender).await; - METRIC_CANDLES_TOTAL - .with_label_values(&[market.name.as_str()]) - .inc_by(candles.clone().len() as u64); + save_candles(pool, candles).await?; for resolution in Resolution::iter() { if resolution == Resolution::R1m { continue; } let candles = batch_higher_order_candles(pool, market_name, resolution).await?; - send_candles(candles.clone(), candles_sender).await; - METRIC_CANDLES_TOTAL - .with_label_values(&[market.name.as_str()]) - .inc_by(candles.clone().len() as u64); + save_candles(pool, candles).await?; } Ok(()) } -async fn send_candles(candles: Vec, candles_sender: &Sender>) { - if !candles.is_empty() { - if let Err(_) = candles_sender.send(candles).await { - panic!("candles receiver dropped"); - } +async fn save_candles(pool: &Pool, candles: Vec) -> anyhow::Result<()> { + if candles.len() == 0 { + return Ok(()); } + let upsert_statement = build_candles_upsert_statement(&candles); + let client = pool.get().await.unwrap(); + client + .execute(&upsert_statement, &[]) + .await + .map_err_anyhow()?; + Ok(()) } diff --git a/src/worker/main.rs b/src/worker/main.rs index 3f94a7f..996cc41 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -1,17 +1,15 @@ use log::{error, info}; -use openbook_candles::structs::candle::Candle; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::openbook::OpenBookFillEvent; use openbook_candles::utils::Config; use openbook_candles::worker::metrics::{ - serve_metrics, METRIC_CANDLES_QUEUE_LENGTH, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, - METRIC_FILLS_QUEUE_LENGTH, + serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, METRIC_FILLS_QUEUE_LENGTH, }; use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::{ database::{ initialize::{connect_to_database, setup_database}, - insert::{persist_candles, persist_fill_events}, + 
insert::{persist_fill_events}, }, worker::candle_batching::batch_for_market, }; @@ -34,7 +32,6 @@ async fn main() -> anyhow::Result<()> { rpc_url: rpc_url.clone(), }; - let candles_queue_max_size = 10000; let fills_queue_max_size = 10000; let markets = load_markets(path_to_markets_json); @@ -64,31 +61,16 @@ async fn main() -> anyhow::Result<()> { } })); - let (candle_sender, mut candle_receiver) = mpsc::channel::>(candles_queue_max_size); - for market in market_infos.into_iter() { - let sender = candle_sender.clone(); let batch_pool = pool.clone(); handles.push(tokio::spawn(async move { - batch_for_market(&batch_pool, &sender, &market) - .await - .unwrap(); + batch_for_market(&batch_pool, &market).await.unwrap(); error!("batching halted for market {}", &market.name); })); } - let persist_pool = pool.clone(); - handles.push(tokio::spawn(async move { - loop { - persist_candles(persist_pool.clone(), &mut candle_receiver) - .await - .unwrap(); - } - })); - let monitor_pool = pool.clone(); let monitor_fill_channel = fill_sender.clone(); - let monitor_candle_channel = candle_sender.clone(); handles.push(tokio::spawn(async move { // TODO: maybe break this out into a new function loop { @@ -96,8 +78,6 @@ async fn main() -> anyhow::Result<()> { METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64); METRIC_DB_POOL_SIZE.set(pool_status.size as i64); - METRIC_CANDLES_QUEUE_LENGTH - .set((candles_queue_max_size - monitor_candle_channel.capacity()) as i64); METRIC_FILLS_QUEUE_LENGTH .set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64); diff --git a/src/worker/metrics/mod.rs b/src/worker/metrics/mod.rs index 2302784..09488f3 100644 --- a/src/worker/metrics/mod.rs +++ b/src/worker/metrics/mod.rs @@ -36,12 +36,6 @@ lazy_static! { METRIC_REGISTRY ) .unwrap(); - pub static ref METRIC_CANDLES_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!( - "candles_queue_length", - "Current length of the candles write queue", - METRIC_REGISTRY - ) - .unwrap(); pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( "rpc_errors_total", From 493ced0b00538ad8c9aa2c355e8bcdc725a54e9b Mon Sep 17 00:00:00 2001 From: dboures Date: Sat, 3 Jun 2023 12:22:25 -0500 Subject: [PATCH 12/12] fix: don't use serial in postgres --- src/database/initialize.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/database/initialize.rs b/src/database/initialize.rs index 92fd854..4bc8a9c 100644 --- a/src/database/initialize.rs +++ b/src/database/initialize.rs @@ -87,7 +87,7 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> { client .execute( "CREATE TABLE IF NOT EXISTS candles ( - id serial, + id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY, market_name text, start_time timestamptz, end_time timestamptz,
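
A note on the shape introduced in PATCH 01: the public API and the private metrics listener run on separate OS threads, each inside its own actix System, and both middleware instances clone one shared Registry, so public traffic is recorded while only the private port exposes /metrics. Below is a minimal sketch of that two-server layout with the prometheus middleware left out; the hello handler, the /api/hello route, and port 8080 are illustrative stand-ins, while 9091 matches the patch and the [metrics] stanza added in PATCH 03.

use actix_web::{rt::System, web, App, HttpServer, Responder};
use std::thread;

async fn hello() -> impl Responder {
    "hello"
}

fn main() {
    // Each thread gets its own actix runtime, so the two servers can be
    // bound, run, and joined independently, as in the patch.
    let public = thread::spawn(|| {
        System::new().block_on(async {
            HttpServer::new(|| App::new().route("/api/hello", web::get().to(hello)))
                .bind("0.0.0.0:8080")
                .unwrap()
                .run()
                .await
        })
        .unwrap();
    });

    let private = thread::spawn(|| {
        System::new().block_on(async {
            // In the patch this app carries only the metrics middleware.
            HttpServer::new(|| App::new().route("/metrics", web::get().to(hello)))
                .bind("0.0.0.0:9091")
                .unwrap()
                .run()
                .await
        })
        .unwrap();
    });

    private.join().unwrap();
    public.join().unwrap();
}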
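
PATCH 05's worker metrics module is the standard lazy_static pattern: a namespaced Registry and the collectors registered against it live in module statics, so scrape.rs can increment METRIC_FILLS_TOTAL with no handle plumbing. A condensed, self-contained version follows; the namespace and the two metric names come from the patches, and main exists here only to show the text format the /metrics endpoint ultimately renders.

use lazy_static::lazy_static;
use prometheus::{
    register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec,
    IntGauge, Registry, TextEncoder,
};

lazy_static! {
    static ref METRIC_REGISTRY: Registry =
        Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap();
    static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
        "fills_total",
        "Total number of fills parsed",
        &["market"],
        METRIC_REGISTRY
    )
    .unwrap();
    static ref METRIC_FILLS_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!(
        "fills_queue_length",
        "Current length of the fills write queue",
        METRIC_REGISTRY
    )
    .unwrap();
}

fn main() {
    // Collectors are safe to touch from any thread.
    METRIC_FILLS_TOTAL.with_label_values(&["SOL/USDC"]).inc();
    METRIC_FILLS_QUEUE_LENGTH.set(42);

    // Renders e.g. openbook_candles_worker_fills_total{market="SOL/USDC"} 1
    let text = TextEncoder::new()
        .encode_to_string(&METRIC_REGISTRY.gather())
        .unwrap();
    println!("{}", text);
}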
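
The queue-depth gauges in PATCH 06 are computed entirely on the sender side: a bounded tokio channel reports its spare slots through Sender::capacity, so the monitor loop samples depth as max_size - capacity() every ten seconds. The arithmetic in isolation, with a toy channel size:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let max_size = 4;
    let (tx, mut rx) = mpsc::channel::<u64>(max_size);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();

    // Two messages are buffered, so a gauge sampled now would read 2.
    assert_eq!(max_size - tx.capacity(), 2);

    // Receiving a message frees one slot.
    rx.recv().await.unwrap();
    assert_eq!(max_size - tx.capacity(), 1);
}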
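
The fill writer persist_fill_events keeps the same shape from PATCH 01 through PATCH 11: drain up to ten events from the channel into a HashMap so duplicate events collapse before the write, then issue one batched upsert. A sketch under simplified types; FillEvent and event_key are stand-ins for OpenBookFillEvent and the DefaultHasher-based id that insert.rs derives, and the closing println marks where the real code builds and executes the upsert statement.

use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use tokio::sync::mpsc;

#[derive(Clone, Debug, Hash)]
struct FillEvent {
    order_id: u64,
    market: String,
}

fn event_key(e: &FillEvent) -> u64 {
    let mut h = DefaultHasher::new();
    e.hash(&mut h);
    h.finish()
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(100);
    for i in 0..25u64 {
        let e = FillEvent { order_id: i % 7, market: "SOL/USDC".into() };
        tx.send(e).await.unwrap();
    }
    drop(tx); // close the channel so recv() can return None

    let mut write_batch = HashMap::new();
    while write_batch.len() < 10 {
        match rx.recv().await {
            Some(event) => {
                // Replayed events map to the same key and collapse to one row.
                write_batch.insert(event_key(&event), event);
            }
            None => break,
        }
    }
    // In the worker, build_fills_upsert_statement runs here and the batch
    // is written with a single client.execute call.
    println!("writing {} deduped fills", write_batch.len());
}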
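
The debugging output added in PATCH 10 and removed in PATCH 11 was circling one question, when is a minute candle final, and PATCH 11 settles on a two-part rule: a candle is complete once a later fill proves its window has passed, or once the window is more than ten minutes old. The predicate extracted as a pure function; the name is_complete and the assertions are illustrative, but the boolean expression is the one the patch installs in combine_fills_into_1m_candles.

use chrono::{DateTime, Duration, Utc};

fn is_complete(
    end_time: DateTime<Utc>,
    next_fill_time: Option<DateTime<Utc>>,
    now: DateTime<Utc>,
) -> bool {
    // A later fill closes the window; otherwise fall back to a
    // ten-minute grace period for late-arriving fills.
    matches!(next_fill_time, Some(t) if t > end_time) || end_time < now - Duration::minutes(10)
}

fn main() {
    let now = Utc::now();
    // Ended an hour ago: complete even with no newer fill observed.
    assert!(is_complete(now - Duration::hours(1), None, now));
    // Just ended: stays open until a later fill or the grace period closes it.
    assert!(!is_complete(now, None, now));
    assert!(is_complete(now, Some(now + Duration::seconds(30)), now));
}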
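
PATCH 12 ends the series with a one-line schema change: serial is Postgres shorthand for an integer column backed by an implicitly created sequence, while GENERATED ALWAYS AS IDENTITY is the SQL-standard form recommended since Postgres 10, and the ALWAYS variant additionally rejects manually supplied id values. A sketch of the resulting statement with the columns after end_time elided; the full column list matches the candles upsert in PATCH 11.

// The identity form PATCH 12 switches to. Unlike `id serial`, the backing
// sequence is part of the standard column definition, and GENERATED ALWAYS
// makes Postgres reject manual values for `id` unless OVERRIDING SYSTEM
// VALUE is given. Trailing columns elided here.
const CREATE_CANDLES_TABLE: &str = "CREATE TABLE IF NOT EXISTS candles (
    id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    market_name text,
    start_time timestamptz,
    end_time timestamptz
)";

fn main() {
    // The worker passes a statement of this shape to client.execute at setup.
    println!("{}", CREATE_CANDLES_TABLE);
}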