diff --git a/Cargo.lock b/Cargo.lock index 2093321..b17fae6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3451,6 +3451,7 @@ dependencies = [ "env_logger 0.10.0", "futures 0.3.27", "jsonrpc-core-client", + "lazy_static", "log 0.4.17", "native-tls", "num-traits", diff --git a/Cargo.toml b/Cargo.toml index cf5ef4c..a4b3607 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,3 +70,4 @@ num_enum = "0.6.1" config = "0.13.1" prometheus = "0.13.3" +lazy_static = "1.4.0" diff --git a/markets.json b/markets.json index b376379..ef69ff0 100644 --- a/markets.json +++ b/markets.json @@ -33,6 +33,6 @@ }, { "name": "RAY/USDC", - "address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxV" + "address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj" } ] \ No newline at end of file diff --git a/src/backfill-candles/main.rs b/src/backfill-candles/main.rs index 0321d21..e7c2b39 100644 --- a/src/backfill-candles/main.rs +++ b/src/backfill-candles/main.rs @@ -1,12 +1,7 @@ - - use deadpool_postgres::Object; use openbook_candles::{ - database::{ - initialize::connect_to_database, - insert::{build_candles_upsert_statement}, - }, + database::{initialize::connect_to_database, insert::build_candles_upsert_statement}, structs::{ candle::Candle, markets::{fetch_market_infos, load_markets}, @@ -18,10 +13,9 @@ use openbook_candles::{ minute_candles::backfill_batch_1m_candles, }, }; -use std::{env}; +use std::env; use strum::IntoEnumIterator; - #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { dotenv::dotenv().ok(); diff --git a/src/backfill-trades/main.rs b/src/backfill-trades/main.rs index 23a0fb4..8ac2ec3 100644 --- a/src/backfill-trades/main.rs +++ b/src/backfill-trades/main.rs @@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> { let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); for m in market_infos.clone() { - target_markets.insert(Pubkey::from_str(&m.address)?, 0); + target_markets.insert(Pubkey::from_str(&m.address)?, m.name); } println!("{:?}", target_markets); @@ -57,7 +57,7 @@ async fn main() -> anyhow::Result<()> { pub async fn backfill( rpc_url: String, fill_sender: &Sender, - target_markets: &HashMap, + target_markets: &HashMap, ) -> anyhow::Result<()> { println!("backfill started"); let mut before_sig: Option = None; @@ -145,7 +145,7 @@ pub async fn get_transactions( rpc_client: &RpcClient, mut sigs: Vec, fill_sender: &Sender, - target_markets: &HashMap, + target_markets: &HashMap, ) { sigs.retain(|sig| sig.err.is_none()); if sigs.last().is_none() { diff --git a/src/database/fetch.rs b/src/database/fetch.rs index d0882ef..9e58f61 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -67,10 +67,7 @@ pub async fn fetch_fills_from( let rows = client .query(&stmt, &[&market_address_string, &start_time, &end_time]) .await?; - Ok(rows - .into_iter() - .map(PgOpenBookFill::from_row) - .collect()) + Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect()) } pub async fn fetch_latest_finished_candle( diff --git a/src/database/initialize.rs b/src/database/initialize.rs index d26bc42..92fd854 100644 --- a/src/database/initialize.rs +++ b/src/database/initialize.rs @@ -36,6 +36,7 @@ pub async fn connect_to_database() -> anyhow::Result { MakeTlsConnector::new( TlsConnector::builder() .add_root_certificate(Certificate::from_pem(&ca_cert)?) + // TODO: make this configurable .identity(Identity::from_pkcs12(&client_key, "pass")?) .danger_accept_invalid_certs(false) .build()?, diff --git a/src/server/main.rs b/src/server/main.rs index a940956..19a8706 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -1,8 +1,9 @@ use actix_web::{ + http::StatusCode, middleware::Logger, rt::System, web::{self, Data}, - App, HttpServer, http::StatusCode, + App, HttpServer, }; use actix_web_prom::PrometheusMetricsBuilder; use candles::get_candles; @@ -92,13 +93,10 @@ async fn main() -> std::io::Result<()> { // Thread to serve metrics endpoint privately let private_server = thread::spawn(move || { let sys = System::new(); - let srv = HttpServer::new(move || { - App::new() - .wrap(private_metrics.clone()) - }) - .bind("0.0.0.0:9091") - .unwrap() - .run(); + let srv = HttpServer::new(move || App::new().wrap(private_metrics.clone())) + .bind("0.0.0.0:9091") + .unwrap() + .run(); sys.block_on(srv).unwrap(); }); diff --git a/src/worker/main.rs b/src/worker/main.rs index 000f0e6..95cdef9 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -1,8 +1,8 @@ - use openbook_candles::structs::candle::Candle; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::openbook::OpenBookFillEvent; use openbook_candles::utils::Config; +use openbook_candles::worker::metrics::serve_metrics; use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::{ database::{ @@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> { let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); for m in market_infos.clone() { - target_markets.insert(Pubkey::from_str(&m.address)?, 0); + target_markets.insert(Pubkey::from_str(&m.address)?, m.name); } println!("{:?}", target_markets); @@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> { setup_database(&pool).await?; let mut handles = vec![]; - let (fill_sender, mut fill_receiver) = mpsc::channel::(1000); + let (fill_sender, mut fill_receiver) = mpsc::channel::(10000); handles.push(tokio::spawn(async move { scrape(&config, &fill_sender, &target_markets).await; @@ -56,7 +56,7 @@ async fn main() -> anyhow::Result<()> { } })); - let (candle_sender, mut candle_receiver) = mpsc::channel::>(1000); + let (candle_sender, mut candle_receiver) = mpsc::channel::>(100000); for market in market_infos.into_iter() { let sender = candle_sender.clone(); @@ -78,6 +78,11 @@ async fn main() -> anyhow::Result<()> { } })); + handles.push(tokio::spawn(async move { + // TODO: this is ugly af + serve_metrics().await.unwrap().await.unwrap(); + })); + futures::future::join_all(handles).await; Ok(()) diff --git a/src/worker/metrics/mod.rs b/src/worker/metrics/mod.rs new file mode 100644 index 0000000..6089970 --- /dev/null +++ b/src/worker/metrics/mod.rs @@ -0,0 +1,33 @@ +use actix_web::{dev::Server, http::StatusCode, App, HttpServer}; +use actix_web_prom::PrometheusMetricsBuilder; +use lazy_static::lazy_static; +use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry}; + +lazy_static! { + static ref METRIC_REGISTRY: Registry = + Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap(); + pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( + "fills_total", + "Total number of fills scraped", + &["market"], + METRIC_REGISTRY + ) + .unwrap(); +} + +pub async fn serve_metrics() -> anyhow::Result { + let metrics = PrometheusMetricsBuilder::new("openbook_candles_worker") + .registry(METRIC_REGISTRY.clone()) + .exclude("/metrics") + .exclude_status(StatusCode::NOT_FOUND) + .endpoint("/metrics") + .build() + .unwrap(); + let server = HttpServer::new(move || App::new().wrap(metrics.clone())) + .bind("0.0.0.0:9091") + .unwrap() + .disable_signals() + .run(); + + Ok(server) +} diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 04e662b..53437ac 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -1,2 +1,3 @@ pub mod candle_batching; +pub mod metrics; pub mod trade_fetching; diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index 02c8d64..22e3fb8 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -11,7 +11,7 @@ const PROGRAM_DATA: &str = "Program data: "; pub fn parse_trades_from_openbook_txns( txns: &mut Vec>, - target_markets: &HashMap, + target_markets: &HashMap, ) -> Vec { let mut fills_vector = Vec::::new(); for txn in txns.iter_mut() { @@ -42,7 +42,7 @@ pub fn parse_trades_from_openbook_txns( fn parse_openbook_fills_from_logs( logs: &Vec, - target_markets: &HashMap, + target_markets: &HashMap, block_time: i64, ) -> Option> { let mut fills_vector = Vec::::new(); diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index 6a5bf09..9c5b9c1 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -8,14 +8,16 @@ use solana_transaction_status::UiTransactionEncoding; use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc::Sender; -use crate::{structs::openbook::OpenBookFillEvent, utils::Config}; +use crate::{ + structs::openbook::OpenBookFillEvent, utils::Config, worker::metrics::METRIC_FILLS_TOTAL, +}; use super::parsing::parse_trades_from_openbook_txns; pub async fn scrape( config: &Config, fill_sender: &Sender, - target_markets: &HashMap, + target_markets: &HashMap, ) { let rpc_client = RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed()); @@ -39,7 +41,7 @@ pub async fn scrape_transactions( before_sig: Option, limit: Option, fill_sender: &Sender, - target_markets: &HashMap, + target_markets: &HashMap, ) -> Option { let rpc_config = GetConfirmedSignaturesForAddress2Config { before: before_sig, @@ -96,9 +98,11 @@ pub async fn scrape_transactions( let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); if !fills.is_empty() { for fill in fills.into_iter() { + let market_name = target_markets.get(&fill.market).unwrap(); if let Err(_) = fill_sender.send(fill).await { panic!("receiver dropped"); } + METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc(); } }