diff --git a/.gitignore b/.gitignore
index fedaa2b..8d062d8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
 /target
 .env
+*.cer*
+*.pks*
\ No newline at end of file
diff --git a/src/backfill-trades/main.rs b/src/backfill-trades/main.rs
index c0a73b5..8db0405 100644
--- a/src/backfill-trades/main.rs
+++ b/src/backfill-trades/main.rs
@@ -102,11 +102,11 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an
             continue;
         }
         let last = sigs.last().unwrap();
-        let last_time = last.block_time.unwrap().clone();
+        let last_time = last.block_time.unwrap();
         let last_signature = last.signature.clone();
         let transactions = sigs
             .into_iter()
-            .map(|s| PgTransaction::from_rpc_confirmed_transaction(s))
+            .map(PgTransaction::from_rpc_confirmed_transaction)
             .collect::<Vec<PgTransaction>>();

         if transactions.is_empty() {
diff --git a/src/database/insert.rs b/src/database/insert.rs
index 6fe01ff..a9d6bc9 100644
--- a/src/database/insert.rs
+++ b/src/database/insert.rs
@@ -1,7 +1,4 @@
 use deadpool_postgres::Pool;
-use log::debug;
-use std::collections::HashMap;
-use tokio::sync::mpsc::{error::TryRecvError, Receiver};

 use crate::{
     structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction},
@@ -19,8 +16,8 @@ pub async fn insert_fills_atomically(
     let db_txn = client.build_transaction().start().await?;

     // 1. Insert fills
-    if fills.len() > 0 {
-        let fills_statement = build_fills_upsert_statement_not_crazy(fills);
+    if !fills.is_empty() {
+        let fills_statement = build_fills_upsert_statement(fills);
         db_txn
             .execute(&fills_statement, &[])
             .await
@@ -42,78 +39,7 @@ pub async fn insert_fills_atomically(
     Ok(())
 }

-pub async fn persist_fill_events(
-    pool: &Pool,
-    fill_receiver: &mut Receiver<OpenBookFillEvent>,
-) -> anyhow::Result<()> {
-    loop {
-        let mut write_batch = HashMap::new();
-        while write_batch.len() < 10 {
-            match fill_receiver.try_recv() {
-                Ok(event) => {
-                    write_batch.entry(event).or_insert(0);
-                }
-                Err(TryRecvError::Empty) => {
-                    if !write_batch.is_empty() {
-                        break;
-                    } else {
-                        continue;
-                    }
-                }
-                Err(TryRecvError::Disconnected) => {
-                    panic!("Fills sender must stay alive")
-                }
-            };
-        }
-
-        if !write_batch.is_empty() {
-            debug!("writing: {:?} events to DB\n", write_batch.len());
-            let upsert_statement = build_fills_upsert_statement(write_batch);
-            let client = pool.get().await?;
-            client
-                .execute(&upsert_statement, &[])
-                .await
-                .map_err_anyhow()
-                .unwrap();
-        }
-    }
-}
-
-#[allow(deprecated)]
-fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
-    let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
-    for (idx, event) in events.keys().enumerate() {
-        let val_str = format!(
-            "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})",
-            event.signature,
-            to_timestampz(event.block_time as u64).to_rfc3339(),
-            event.market,
-            event.open_orders,
-            event.open_orders_owner,
-            event.bid,
-            event.maker,
-            event.native_qty_paid,
-            event.native_qty_received,
-            event.native_fee_or_rebate,
-            event.fee_tier,
-            event.order_id,
-            event.log_index,
-        );
-
-        if idx == 0 {
-            stmt = format!("{} {}", &stmt, val_str);
-        } else {
-            stmt = format!("{}, {}", &stmt, val_str);
-        }
-    }
-
-    let handle_conflict = "ON CONFLICT DO NOTHING";
-
-    stmt = format!("{} {}", stmt, handle_conflict);
-    stmt
-}
-
-fn build_fills_upsert_statement_not_crazy(fills: Vec<OpenBookFillEvent>) -> String {
+fn build_fills_upsert_statement(fills: Vec<OpenBookFillEvent>) -> String {
     let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES");
     for (idx, fill) in fills.iter().enumerate() {
         let val_str = format!(
diff --git a/src/server/main.rs b/src/server/main.rs
index 19a8706..e3ecfee 100644
--- a/src/server/main.rs
+++ b/src/server/main.rs
@@ -55,7 +55,7 @@ async fn main() -> std::io::Result<()> {
         .unwrap();
     // For collecting metrics on the public api, excluding 404s
     let public_metrics = PrometheusMetricsBuilder::new("openbook_candles_server")
-        .registry(registry.clone())
+        .registry(registry)
         .exclude_status(StatusCode::NOT_FOUND)
         .build()
         .unwrap();
diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs
index b6f25a4..e1985f7 100644
--- a/src/worker/candle_batching/mod.rs
+++ b/src/worker/candle_batching/mod.rs
@@ -23,7 +23,7 @@ pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
     let market_clone = market.clone();

     loop {
-        sleep(Duration::milliseconds(2000).to_std()?).await;
+        sleep(Duration::milliseconds(5000).to_std()?).await;
         match batch_inner(pool, &market_clone).await {
             Ok(_) => {}
             Err(e) => {
@@ -61,7 +61,7 @@ async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
 }

 async fn save_candles(pool: &Pool, candles: Vec<Candle>) -> anyhow::Result<()> {
-    if candles.len() == 0 {
+    if candles.is_empty() {
         return Ok(());
     }
     let upsert_statement = build_candles_upsert_statement(&candles);
diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs
index d5d1491..218f6ac 100644
--- a/src/worker/trade_fetching/parsing.rs
+++ b/src/worker/trade_fetching/parsing.rs
@@ -15,10 +15,11 @@ const PROGRAM_DATA: &str = "Program data: ";

 pub fn parse_trades_from_openbook_txns(
     txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
-    sig_strings: &Vec<String>,
+    mut sig_strings: Vec<String>,
     target_markets: &HashMap<Pubkey, String>,
-) -> Vec<OpenBookFillEvent> {
+) -> (Vec<OpenBookFillEvent>, Vec<String>) {
     let mut fills_vector = Vec::<OpenBookFillEvent>::new();
+    let mut failed_sigs = vec![];
     for (idx, txn) in txns.iter_mut().enumerate() {
         match txn {
             Ok(t) => {
@@ -42,13 +43,15 @@ pub fn parse_trades_from_openbook_txns(
             }
             Err(e) => {
                 warn!("rpc error in get_transaction {}", e);
+                failed_sigs.push(sig_strings[idx].clone());
                 METRIC_RPC_ERRORS_TOTAL
                     .with_label_values(&["getTransaction"])
                     .inc();
             }
         }
     }
-    fills_vector
+    sig_strings.retain(|s| !failed_sigs.contains(s));
+    (fills_vector, sig_strings)
 }

 fn parse_openbook_fills_from_logs(
diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs
index 19bd514..617df70 100644
--- a/src/worker/trade_fetching/scrape.rs
+++ b/src/worker/trade_fetching/scrape.rs
@@ -1,28 +1,26 @@
 use deadpool_postgres::Pool;
 use futures::future::join_all;
-use log::{debug, info, warn};
+use log::{debug, warn};
 use solana_client::{
     nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
     rpc_config::RpcTransactionConfig,
 };
 use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
 use solana_transaction_status::UiTransactionEncoding;
-use std::{collections::HashMap, str::FromStr};
-use tokio::sync::mpsc::Sender;
+use std::{collections::HashMap, time::Duration as WaitDuration};

 use crate::{
     database::{
         fetch::fetch_worker_transactions,
         insert::{build_transactions_insert_statement, insert_fills_atomically},
     },
-    structs::{openbook::OpenBookFillEvent, transaction::PgTransaction},
-    utils::{AnyhowWrap, Config, OPENBOOK_KEY},
+    structs::transaction::PgTransaction,
+    utils::{AnyhowWrap, OPENBOOK_KEY},
     worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
 };

 use super::parsing::parse_trades_from_openbook_txns;

-
 pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> {
     let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());

@@ -53,7 +51,7 @@ pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<(
         }
         let transactions: Vec<PgTransaction> = sigs
             .into_iter()
-            .map(|s| PgTransaction::from_rpc_confirmed_transaction(s))
+            .map(PgTransaction::from_rpc_confirmed_transaction)
             .collect();

         debug!("Scraper writing: {:?} txns to DB\n", transactions.len());
@@ -66,7 +64,6 @@ pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<(
         METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns);
     }
     // TODO: graceful shutdown
-    Ok(())
 }

 pub async fn scrape_fills(
@@ -80,7 +77,7 @@ pub async fn scrape_fills(
     loop {
         let transactions = fetch_worker_transactions(worker_id, pool).await?;
-        if transactions.len() == 0 {
+        if transactions.is_empty() {
             debug!("No signatures found by worker {}", worker_id);
             tokio::time::sleep(WaitDuration::from_secs(1)).await;
             continue;
         }
@@ -110,12 +107,13 @@ pub async fn scrape_fills(

         let mut txns = join_all(txn_futs).await;

-        // TODO: reenable total fills metric
-        let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets);
-
-        // Write any fills to the database, and update the transactions as processed
-        insert_fills_atomically(pool, worker_id, fills, sig_strings).await?;
+        let (fills, completed_sigs) =
+            parse_trades_from_openbook_txns(&mut txns, sig_strings, target_markets);
+        for fill in fills.iter() {
+            let market_name = target_markets.get(&fill.market).unwrap();
+            METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
+        }
+        // Write fills to the database, and update properly fetched transactions as processed
+        insert_fills_atomically(pool, worker_id, fills, completed_sigs).await?;
     }
-
-    Ok(())
 }