Add log levels, remove unused db pings

This commit is contained in:
Riordan Panayides 2023-05-31 14:32:34 +01:00
parent 95f923f7c7
commit 4d81f6c7f6
7 changed files with 28 additions and 35 deletions

View File

@ -1,4 +1,5 @@
use deadpool_postgres::Pool; use deadpool_postgres::Pool;
use log::debug;
use std::{ use std::{
collections::{hash_map::DefaultHasher, HashMap}, collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher}, hash::{Hash, Hasher},
@ -36,22 +37,14 @@ pub async fn persist_fill_events(
} }
if !write_batch.is_empty() { if !write_batch.is_empty() {
// print!("writing: {:?} events to DB\n", write_batch.len()); debug!("writing: {:?} events to DB\n", write_batch.len());
// match conn.ping().await {
// Ok(_) => {
let upsert_statement = build_fills_upsert_statement(write_batch); let upsert_statement = build_fills_upsert_statement(write_batch);
client client
.execute(&upsert_statement, &[]) .execute(&upsert_statement, &[])
.await .await
.map_err_anyhow() .map_err_anyhow()
.unwrap(); .unwrap();
// }
// Err(_) => {
// println!("Fills ping failed");
// break;
// }
// }
} }
} }
} }
@ -62,14 +55,12 @@ pub async fn persist_candles(
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let client = pool.get().await.unwrap(); let client = pool.get().await.unwrap();
loop { loop {
// match client.ping().await {
// Ok(_) => {
match candles_receiver.try_recv() { match candles_receiver.try_recv() {
Ok(candles) => { Ok(candles) => {
if candles.is_empty() { if candles.is_empty() {
continue; continue;
} }
// print!("writing: {:?} candles to DB\n", candles.len()); debug!("writing: {:?} candles to DB\n", candles.len());
let upsert_statement = build_candles_upsert_statement(candles); let upsert_statement = build_candles_upsert_statement(candles);
client client
.execute(&upsert_statement, &[]) .execute(&upsert_statement, &[])
@ -82,12 +73,6 @@ pub async fn persist_candles(
panic!("Candles sender must stay alive") panic!("Candles sender must stay alive")
} }
}; };
// }
// Err(_) => {
// println!("Candle ping failed");
// break;
// }
// };
} }
} }

View File

@ -1,5 +1,6 @@
use chrono::{DateTime, Duration, DurationRound, Utc}; use chrono::{DateTime, Duration, DurationRound, Utc};
use deadpool_postgres::Pool; use deadpool_postgres::Pool;
use log::debug;
use std::cmp::max; use std::cmp::max;
use crate::{ use crate::{
@ -46,12 +47,12 @@ pub async fn batch_higher_order_candles(
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()) fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution())
.await?; .await?;
if constituent_candles.is_empty() { if constituent_candles.is_empty() {
// println!( debug!(
// "Batching {}, but no candles found for: {:?}, {}", "Batching {}, but no candles found for: {:?}, {}",
// resolution, resolution,
// market_name, market_name,
// resolution.get_constituent_resolution() resolution.get_constituent_resolution()
// ); );
return Ok(Vec::new()); return Ok(Vec::new());
} }
let start_time = constituent_candles[0].start_time.duration_trunc(day())?; let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
@ -82,7 +83,7 @@ fn combine_into_higher_order_candles(
st: DateTime<Utc>, st: DateTime<Utc>,
seed_candle: Candle, seed_candle: Candle,
) -> Vec<Candle> { ) -> Vec<Candle> {
// println!("target_resolution: {}", target_resolution); debug!("combining for target_resolution: {}", target_resolution);
let duration = target_resolution.get_duration(); let duration = target_resolution.get_duration();

View File

@ -2,6 +2,7 @@ use std::cmp::min;
use chrono::{DateTime, Duration, DurationRound, Utc}; use chrono::{DateTime, Duration, DurationRound, Utc};
use deadpool_postgres::Pool; use deadpool_postgres::Pool;
use log::debug;
use crate::{ use crate::{
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
@ -40,7 +41,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
let earliest_fill = fetch_earliest_fill(pool, market_address).await?; let earliest_fill = fetch_earliest_fill(pool, market_address).await?;
if earliest_fill.is_none() { if earliest_fill.is_none() {
println!("No fills found for: {:?}", market_name); debug!("No fills found for: {:?}", market_name);
return Ok(Vec::new()); return Ok(Vec::new());
} }
@ -132,7 +133,7 @@ pub async fn backfill_batch_1m_candles(
let earliest_fill = fetch_earliest_fill(pool, &market.address).await?; let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;
if earliest_fill.is_none() { if earliest_fill.is_none() {
println!("No fills found for: {:?}", &market_name); debug!("No fills found for: {:?}", &market_name);
return Ok(candles); return Ok(candles);
} }

View File

@ -3,6 +3,7 @@ pub mod minute_candles;
use chrono::Duration; use chrono::Duration;
use deadpool_postgres::Pool; use deadpool_postgres::Pool;
use log::{error, warn};
use strum::IntoEnumIterator; use strum::IntoEnumIterator;
use tokio::{sync::mpsc::Sender, time::sleep}; use tokio::{sync::mpsc::Sender, time::sleep};
@ -23,13 +24,13 @@ pub async fn batch_for_market(
loop { loop {
let sender = candles_sender.clone(); let sender = candles_sender.clone();
let market_clone = market.clone(); let market_clone = market.clone();
// let client = pool.get().await?;
loop { loop {
sleep(Duration::milliseconds(2000).to_std()?).await; sleep(Duration::milliseconds(2000).to_std()?).await;
match batch_inner(pool, &sender, &market_clone).await { match batch_inner(pool, &sender, &market_clone).await {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
println!( error!(
"Batching thread failed for {:?} with error: {:?}", "Batching thread failed for {:?} with error: {:?}",
market_clone.name.clone(), market_clone.name.clone(),
e e
@ -38,7 +39,7 @@ pub async fn batch_for_market(
} }
}; };
} }
println!("Restarting {:?} batching thread", market.name); warn!("Restarting {:?} batching thread", market.name);
} }
} }

View File

@ -1,3 +1,4 @@
use log::{error, info};
use openbook_candles::structs::candle::Candle; use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEvent; use openbook_candles::structs::openbook::OpenBookFillEvent;
@ -21,6 +22,7 @@ use tokio::sync::mpsc;
#[tokio::main(flavor = "multi_thread", worker_threads = 10)] #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
env_logger::init();
dotenv::dotenv().ok(); dotenv::dotenv().ok();
let args: Vec<String> = env::args().collect(); let args: Vec<String> = env::args().collect();
@ -41,7 +43,7 @@ async fn main() -> anyhow::Result<()> {
for m in market_infos.clone() { for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, m.name); target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
} }
println!("{:?}", target_markets); info!("{:?}", target_markets);
let pool = connect_to_database().await?; let pool = connect_to_database().await?;
setup_database(&pool).await?; setup_database(&pool).await?;
@ -71,7 +73,7 @@ async fn main() -> anyhow::Result<()> {
batch_for_market(&batch_pool, &sender, &market) batch_for_market(&batch_pool, &sender, &market)
.await .await
.unwrap(); .unwrap();
println!("SOMETHING WENT WRONG"); error!("batching halted for market {}", &market.name);
})); }));
} }

View File

@ -1,3 +1,4 @@
use log::warn;
use solana_client::client_error::Result as ClientResult; use solana_client::client_error::Result as ClientResult;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_transaction_status::{ use solana_transaction_status::{
@ -37,7 +38,8 @@ pub fn parse_trades_from_openbook_txns(
} }
} }
} }
Err(_) => { Err(e) => {
warn!("rpc error in get_transaction {}", e);
METRIC_RPC_ERRORS_TOTAL METRIC_RPC_ERRORS_TOTAL
.with_label_values(&["getTransaction"]) .with_label_values(&["getTransaction"])
.inc(); .inc();

View File

@ -1,4 +1,5 @@
use futures::future::join_all; use futures::future::join_all;
use log::{debug, warn};
use solana_client::{ use solana_client::{
nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::RpcTransactionConfig, rpc_config::RpcTransactionConfig,
@ -61,7 +62,7 @@ pub async fn scrape_transactions(
{ {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
println!("Error in get_signatures_for_address_with_config: {}", e); warn!("rpc error in get_signatures_for_address_with_config: {}", e);
METRIC_RPC_ERRORS_TOTAL METRIC_RPC_ERRORS_TOTAL
.with_label_values(&["getSignaturesForAddress"]) .with_label_values(&["getSignaturesForAddress"])
.inc(); .inc();
@ -70,7 +71,7 @@ pub async fn scrape_transactions(
}; };
if sigs.is_empty() { if sigs.is_empty() {
println!("No signatures found"); debug!("No signatures found");
return before_sig; return before_sig;
} }