diff --git a/src/database/insert.rs b/src/database/insert.rs
index d777f96..d9a8115 100644
--- a/src/database/insert.rs
+++ b/src/database/insert.rs
@@ -1,4 +1,5 @@
 use deadpool_postgres::Pool;
+use log::debug;
 use std::{
     collections::{hash_map::DefaultHasher, HashMap},
     hash::{Hash, Hasher},
@@ -36,22 +37,14 @@ pub async fn persist_fill_events(
         }
         if !write_batch.is_empty() {
-            // print!("writing: {:?} events to DB\n", write_batch.len());
+            debug!("writing {} events to DB", write_batch.len());
-            // match conn.ping().await {
-            //     Ok(_) => {
             let upsert_statement = build_fills_upsert_statement(write_batch);
             client
                 .execute(&upsert_statement, &[])
                 .await
                 .map_err_anyhow()
                 .unwrap();
-            //     }
-            //     Err(_) => {
-            //         println!("Fills ping failed");
-            //         break;
-            //     }
-            // }
         }
     }
 }
@@ -62,14 +55,12 @@ pub async fn persist_candles(
 ) -> anyhow::Result<()> {
     let client = pool.get().await.unwrap();
     loop {
-        // match client.ping().await {
-        //     Ok(_) => {
         match candles_receiver.try_recv() {
             Ok(candles) => {
                 if candles.is_empty() {
                     continue;
                 }
-                // print!("writing: {:?} candles to DB\n", candles.len());
+                debug!("writing {} candles to DB", candles.len());
                 let upsert_statement = build_candles_upsert_statement(candles);
                 client
                     .execute(&upsert_statement, &[])
@@ -82,12 +73,6 @@ pub async fn persist_candles(
                 panic!("Candles sender must stay alive")
             }
         };
-        // }
-        // Err(_) => {
-        //     println!("Candle ping failed");
-        //     break;
-        // }
-        // };
     }
 }
diff --git a/src/worker/candle_batching/higher_order_candles.rs b/src/worker/candle_batching/higher_order_candles.rs
index f38518b..e621a08 100644
--- a/src/worker/candle_batching/higher_order_candles.rs
+++ b/src/worker/candle_batching/higher_order_candles.rs
@@ -1,5 +1,6 @@
 use chrono::{DateTime, Duration, DurationRound, Utc};
 use deadpool_postgres::Pool;
+use log::debug;
 use std::cmp::max;
 
 use crate::{
@@ -46,12 +47,12 @@ pub async fn batch_higher_order_candles(
         fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution())
            .await?;
     if constituent_candles.is_empty() {
-        // println!(
-        //     "Batching {}, but no candles found for: {:?}, {}",
-        //     resolution,
-        //     market_name,
-        //     resolution.get_constituent_resolution()
-        // );
+        debug!(
+            "Batching {}, but no candles found for: {:?}, {}",
+            resolution,
+            market_name,
+            resolution.get_constituent_resolution()
+        );
         return Ok(Vec::new());
     }
     let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
@@ -82,7 +83,7 @@ fn combine_into_higher_order_candles(
     st: DateTime<Utc>,
     seed_candle: Candle,
 ) -> Vec<Candle> {
-    // println!("target_resolution: {}", target_resolution);
+    debug!("combining for target_resolution: {}", target_resolution);
 
     let duration = target_resolution.get_duration();
diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs
index 24e44f5..d051b47 100644
--- a/src/worker/candle_batching/minute_candles.rs
+++ b/src/worker/candle_batching/minute_candles.rs
@@ -2,6 +2,7 @@ use std::cmp::min;
 
 use chrono::{DateTime, Duration, DurationRound, Utc};
 use deadpool_postgres::Pool;
+use log::debug;
 
 use crate::{
     database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
@@ -40,7 +41,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Result<Vec<Candle>> {
     let earliest_fill = fetch_earliest_fill(pool, market_address).await?;
 
     if earliest_fill.is_none() {
-        println!("No fills found for: {:?}", market_name);
+        debug!("No fills found for: {:?}", market_name);
         return Ok(Vec::new());
     }
 
@@ -132,7 +133,7 @@ pub async fn backfill_batch_1m_candles(
     let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;
 
     if earliest_fill.is_none() {
-        println!("No fills found for: {:?}", &market_name);
+        debug!("No fills found for: {:?}", &market_name);
         return Ok(candles);
     }
diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs
index d2c12b1..d6303f0 100644
--- a/src/worker/candle_batching/mod.rs
+++ b/src/worker/candle_batching/mod.rs
@@ -3,6 +3,7 @@ pub mod minute_candles;
 
 use chrono::Duration;
 use deadpool_postgres::Pool;
+use log::{error, warn};
 use strum::IntoEnumIterator;
 use tokio::{sync::mpsc::Sender, time::sleep};
 
@@ -23,13 +24,13 @@ pub async fn batch_for_market(
     loop {
         let sender = candles_sender.clone();
         let market_clone = market.clone();
-        // let client = pool.get().await?;
+
         loop {
             sleep(Duration::milliseconds(2000).to_std()?).await;
             match batch_inner(pool, &sender, &market_clone).await {
                 Ok(_) => {}
                 Err(e) => {
-                    println!(
+                    error!(
                         "Batching thread failed for {:?} with error: {:?}",
                         market_clone.name.clone(),
                         e
@@ -38,7 +39,7 @@ pub async fn batch_for_market(
                     }
                 }
             };
         }
-        println!("Restarting {:?} batching thread", market.name);
+        warn!("Restarting {:?} batching thread", market.name);
     }
 }
diff --git a/src/worker/main.rs b/src/worker/main.rs
index 05137d2..3f94a7f 100644
--- a/src/worker/main.rs
+++ b/src/worker/main.rs
@@ -1,3 +1,4 @@
+use log::{error, info};
 use openbook_candles::structs::candle::Candle;
 use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
 use openbook_candles::structs::openbook::OpenBookFillEvent;
@@ -21,6 +22,7 @@ use tokio::sync::mpsc;
 
 #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
 async fn main() -> anyhow::Result<()> {
+    env_logger::init();
     dotenv::dotenv().ok();
     let args: Vec<String> = env::args().collect();
 
@@ -41,7 +43,7 @@ async fn main() -> anyhow::Result<()> {
     for m in market_infos.clone() {
         target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
     }
-    println!("{:?}", target_markets);
+    info!("target markets: {:?}", target_markets);
 
     let pool = connect_to_database().await?;
     setup_database(&pool).await?;
@@ -71,7 +73,7 @@ async fn main() -> anyhow::Result<()> {
             batch_for_market(&batch_pool, &sender, &market)
                 .await
                 .unwrap();
-            println!("SOMETHING WENT WRONG");
+            error!("Batching halted for market {}", &market.name);
         }));
     }
 
diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs
index fea4e0d..a31ce5c 100644
--- a/src/worker/trade_fetching/parsing.rs
+++ b/src/worker/trade_fetching/parsing.rs
@@ -1,3 +1,4 @@
+use log::warn;
 use solana_client::client_error::Result as ClientResult;
 use solana_sdk::pubkey::Pubkey;
 use solana_transaction_status::{
@@ -37,7 +38,8 @@ pub fn parse_trades_from_openbook_txns(
                 }
             }
         }
-        Err(_) => {
+        Err(e) => {
+            warn!("rpc error in get_transaction: {}", e);
             METRIC_RPC_ERRORS_TOTAL
                 .with_label_values(&["getTransaction"])
                 .inc();
diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs
index f038781..bbba445 100644
--- a/src/worker/trade_fetching/scrape.rs
+++ b/src/worker/trade_fetching/scrape.rs
@@ -1,4 +1,5 @@
 use futures::future::join_all;
+use log::{debug, warn};
 use solana_client::{
     nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
     rpc_config::RpcTransactionConfig,
@@ -61,7 +62,7 @@ pub async fn scrape_transactions(
     {
         Ok(s) => s,
         Err(e) => {
-            println!("Error in get_signatures_for_address_with_config: {}", e);
+            warn!("rpc error in get_signatures_for_address_with_config: {}", e);
             METRIC_RPC_ERRORS_TOTAL
                 .with_label_values(&["getSignaturesForAddress"])
                 .inc();
@@ -70,7 +71,7 @@ pub async fn scrape_transactions(
     };
 
     if sigs.is_empty() {
-        println!("No signatures found");
+        debug!("No signatures found");
         return before_sig;
     }
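
Note on enabling the new output: the bare env_logger::init() added in src/worker/main.rs falls back to the `error` filter when RUST_LOG is unset, so the info!/debug! lines introduced above are silent by default. Either run the worker with RUST_LOG set (for example RUST_LOG=info, or a crate-scoped target such as RUST_LOG=openbook_candles=debug; the target name is illustrative), or seed a default filter at startup. A minimal sketch of the latter, using the stock env_logger builder API; the `info` default chosen here is an assumption, not part of the patch:

    // Sketch only, not part of the diff above: default the filter to `info`
    // when RUST_LOG is unset, while still letting the environment override it.
    use env_logger::Env;
    use log::{debug, info};

    fn main() {
        // from_env + default_filter_or: RUST_LOG wins if present, else "info".
        env_logger::Builder::from_env(Env::default().default_filter_or("info")).init();

        info!("visible even without RUST_LOG");
        debug!("emitted only when RUST_LOG=debug or finer");
    }

This change also assumes the `log` and `env_logger` crates are declared in Cargo.toml; if they are not already dependencies, the patch needs the corresponding additions.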