From fd4eebc034b76016b508fede25df0afaa25b4365 Mon Sep 17 00:00:00 2001
From: dboures
Date: Wed, 7 Jun 2023 23:03:54 -0500
Subject: [PATCH 1/7] refactor: backfilling uses a transactions table

---
 src/backfill-candles/main.rs         |   2 +-
 src/backfill-trades/main.rs          | 261 ++++++++++++++-------------
 src/database/fetch.rs                |  21 +++
 src/database/initialize.rs           |  44 ++++-
 src/database/insert.rs               | 131 +++++++++++++-
 src/structs/mod.rs                   |   1 +
 src/structs/openbook.rs              |   4 +-
 src/structs/transaction.rs           |  52 ++++++
 src/utils/mod.rs                     |   4 +
 src/worker/main.rs                   |   2 +-
 src/worker/trade_fetching/parsing.rs |   4 +-
 src/worker/trade_fetching/scrape.rs  |   4 +-
 12 files changed, 390 insertions(+), 140 deletions(-)
 create mode 100644 src/structs/transaction.rs

diff --git a/src/backfill-candles/main.rs b/src/backfill-candles/main.rs
index e7c2b39..9901fe4 100644
--- a/src/backfill-candles/main.rs
+++ b/src/backfill-candles/main.rs
@@ -53,7 +53,7 @@ async fn main() -> anyhow::Result<()> {
 
 async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
     if !candles.is_empty() {
-        let upsert_statement = build_candles_upsert_statement(candles);
+        let upsert_statement = build_candles_upsert_statement(&candles);
         client
             .execute(&upsert_statement, &[])
             .await
diff --git a/src/backfill-trades/main.rs b/src/backfill-trades/main.rs
index 8ac2ec3..d452a90 100644
--- a/src/backfill-trades/main.rs
+++ b/src/backfill-trades/main.rs
@@ -1,23 +1,28 @@
 use anchor_lang::prelude::Pubkey;
 use chrono::{DateTime, Duration, NaiveDateTime, Utc};
+use deadpool_postgres::Pool;
 use futures::future::join_all;
+use log::debug;
 use openbook_candles::{
-    database::{initialize::connect_to_database, insert::persist_fill_events},
+    database::{
+        fetch::fetch_worker_transactions,
+        initialize::{connect_to_database, setup_database},
+        insert::{add_fills_atomically, build_transactions_insert_statement},
+    },
     structs::{
         markets::{fetch_market_infos, load_markets},
-        openbook::OpenBookFillEvent,
+        transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS},
     },
-    utils::Config,
+    utils::{AnyhowWrap, Config, OPENBOOK_KEY},
     worker::trade_fetching::parsing::parse_trades_from_openbook_txns,
 };
 use solana_client::{
     nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
-    rpc_config::RpcTransactionConfig, rpc_response::RpcConfirmedTransactionStatusWithSignature,
+    rpc_config::RpcTransactionConfig
 };
 use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
 use solana_transaction_status::UiTransactionEncoding;
-use std::{collections::HashMap, env, str::FromStr};
-use tokio::sync::mpsc::{self, Sender};
+use std::{collections::HashMap, env, str::FromStr,time::Duration as WaitDuration };
 
 #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
 async fn main() -> anyhow::Result<()> {
@@ -26,6 +31,8 @@ async fn main() -> anyhow::Result<()> {
     assert!(args.len() == 2);
 
     let path_to_markets_json = &args[1];
+    // let num_days = args[2].parse::<i64>().unwrap(); // TODO: implement
+    let num_days = 1;
     let rpc_url: String = dotenv::var("RPC_URL").unwrap();
 
     let config = Config {
@@ -40,147 +47,147 @@ async fn main() -> anyhow::Result<()> {
     println!("{:?}", target_markets);
 
     let pool = connect_to_database().await?;
-    let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
+    setup_database(&pool).await?;
 
-    tokio::spawn(async move {
-        loop {
-            persist_fill_events(&pool, &mut fill_receiver)
+    let mut handles = vec![];
+
+    let rpc_clone = rpc_url.clone();
+    let pool_clone = pool.clone();
+    handles.push(tokio::spawn(async move 
{ + fetch_signatures(rpc_clone, &pool_clone, num_days).await.unwrap(); + })); + + for id in 0..NUM_TRANSACTION_PARTITIONS { + let rpc_clone = rpc_url.clone(); + let pool_clone = pool.clone(); + let markets_clone = target_markets.clone(); + handles.push(tokio::spawn(async move { + backfill(id as i32, rpc_clone, &pool_clone, &markets_clone) .await .unwrap(); - } - }); + })); + } - backfill(rpc_url, &fill_sender, &target_markets).await?; + // TODO: spawn status thread + + futures::future::join_all(handles).await; + Ok(()) +} + +pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> anyhow::Result<()> { + let mut before_sig: Option = None; + let mut now_time = Utc::now().timestamp(); + let end_time = (Utc::now() - Duration::days(num_days)).timestamp(); + let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); + + while now_time > end_time { + let rpc_config = GetConfirmedSignaturesForAddress2Config { + before: before_sig, + until: None, + limit: None, + commitment: Some(CommitmentConfig::confirmed()), + }; + + let sigs = match rpc_client + .get_signatures_for_address_with_config( + &OPENBOOK_KEY, + rpc_config, + ) + .await + { + Ok(sigs) => sigs, + Err(e) => { + println!("Error fetching signatures: {}", e); + continue; + } + }; + if sigs.is_empty() { + println!("No signatures found, trying again"); + continue; + } + let last = sigs.last().unwrap(); + let last_time = last.block_time.unwrap().clone(); + let last_signature = last.signature.clone(); + let transactions = sigs + .into_iter() + .map(|s| PgTransaction::from_rpc_confirmed_transaction(s)) + .collect::>(); + + if transactions.is_empty() { + println!("No transactions found, trying again"); + } + debug!("writing: {:?} txns to DB\n", transactions.len()); + let upsert_statement = build_transactions_insert_statement(transactions); + let client = pool.get().await?; + client + .execute(&upsert_statement, &[]) + .await + .map_err_anyhow()?; + + now_time = last_time; + before_sig = Some(Signature::from_str(&last_signature)?); + let time_left = backfill_time_left(now_time, end_time); + println!( + "{} minutes ~ {} days remaining in the backfill\n", + time_left.num_minutes(), + time_left.num_days() + ); + } Ok(()) } pub async fn backfill( + worker_id: i32, rpc_url: String, - fill_sender: &Sender, + pool: &Pool, target_markets: &HashMap, ) -> anyhow::Result<()> { - println!("backfill started"); - let mut before_sig: Option = None; - let mut now_time = Utc::now().timestamp(); - let end_time = (Utc::now() - Duration::days(1)).timestamp(); + println!("Worker {} up \n", worker_id); + let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); - let mut handles = vec![]; + loop { + let transactions = fetch_worker_transactions(worker_id, pool).await?; + if transactions.len() == 0 { + println!("No signatures found by worker {}", worker_id); + tokio::time::sleep(WaitDuration::from_secs(1)).await; + continue; + }; - while now_time > end_time { - let rpc_client = - RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); - let maybe_r = get_signatures(&rpc_client, before_sig).await; + // for each signature, fetch the transaction + let txn_config = RpcTransactionConfig { + encoding: Some(UiTransactionEncoding::Json), + commitment: Some(CommitmentConfig::confirmed()), + max_supported_transaction_version: Some(0), + }; - match maybe_r { - Some((last, time, sigs)) => { - now_time = time; - before_sig = Some(last); - let time_left = 
backfill_time_left(now_time, end_time); - println!( - "{} minutes ~ {} days remaining in the backfill\n", - time_left.num_minutes(), - time_left.num_days() - ); + let sig_strings = transactions + .iter() + .map(|t| t.signature.clone()) + .collect::>(); - let cloned_markets = target_markets.clone(); - let cloned_sender = fill_sender.clone(); - let handle = tokio::spawn(async move { - get_transactions(&rpc_client, sigs, &cloned_sender, &cloned_markets).await; - }); - handles.push(handle); - } - None => {} - } + let signatures: Vec<_> = transactions + .into_iter() + .map(|t| t.signature.parse::().unwrap()) + .collect(); + + let txn_futs: Vec<_> = signatures + .iter() + .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) + .collect(); + + let mut txns = join_all(txn_futs).await; + + let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); + + // Write any fills to the database, and mark the transactions as processed + add_fills_atomically(pool, worker_id, fills, sig_strings).await?; } - futures::future::join_all(handles).await; - - println!("Backfill complete \n"); + // TODO: graceful shutdown + // println!("Worker {} down \n", worker_id); Ok(()) } -pub async fn get_signatures( - rpc_client: &RpcClient, - before_sig: Option, -) -> Option<( - Signature, - i64, - Vec, -)> { - let rpc_config = GetConfirmedSignaturesForAddress2Config { - before: before_sig, - until: None, - limit: None, - commitment: Some(CommitmentConfig::confirmed()), - }; - - let sigs = match rpc_client - .get_signatures_for_address_with_config( - &Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(), - rpc_config, - ) - .await - { - Ok(s) => s, - Err(e) => { - println!("Error in get_signatures_for_address_with_config: {}", e); - return None; - } - }; - - if sigs.is_empty() { - println!("No signatures found"); - return None; - } - let last = sigs.last().unwrap(); - // println!("{:?}", last.block_time.unwrap()); - Some(( - Signature::from_str(&last.signature).unwrap(), - last.block_time.unwrap(), - sigs, - )) -} - -pub async fn get_transactions( - rpc_client: &RpcClient, - mut sigs: Vec, - fill_sender: &Sender, - target_markets: &HashMap, -) { - sigs.retain(|sig| sig.err.is_none()); - if sigs.last().is_none() { - return; - } - - let txn_config = RpcTransactionConfig { - encoding: Some(UiTransactionEncoding::Json), - commitment: Some(CommitmentConfig::confirmed()), - max_supported_transaction_version: Some(0), - }; - - let signatures: Vec<_> = sigs - .into_iter() - .map(|sig| sig.signature.parse::().unwrap()) - .collect(); - - let txn_futs: Vec<_> = signatures - .iter() - .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) - .collect(); - - let mut txns = join_all(txn_futs).await; - - let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); - if !fills.is_empty() { - for fill in fills.into_iter() { - // println!("Sending fill {:?}", fill); - if let Err(_) = fill_sender.send(fill).await { - panic!("receiver dropped"); - } - } - } -} - fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration { let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap(); let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap(); diff --git a/src/database/fetch.rs b/src/database/fetch.rs index 37f6043..36ce206 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -4,6 +4,7 @@ use crate::structs::{ openbook::PgOpenBookFill, resolution::Resolution, trader::PgTrader, + transaction::PgTransaction, }; use 
chrono::{DateTime, Utc};
 use deadpool_postgres::{GenericClient, Pool};
@@ -320,3 +321,23 @@ pub async fn fetch_coingecko_24h_high_low(
         .map(PgCoinGecko24HighLow::from_row)
         .collect())
 }
+
+/// Fetches unprocessed, non-error transactions for the specified worker partition.
+/// Pulls at most 50 transactions at a time.
+pub async fn fetch_worker_transactions(
+    worker_id: i32,
+    pool: &Pool,
+) -> anyhow::Result<Vec<PgTransaction>> {
+    let client = pool.get().await?;
+
+    let stmt = r#"SELECT signature, program_pk, block_datetime, slot, err, "processed", worker_partition
+        FROM transactions
+        where worker_partition = $1
+        and err = false
+        and processed = false
+        LIMIT 50"#;
+
+    let rows = client.query(stmt, &[&worker_id]).await?;
+
+    Ok(rows.into_iter().map(PgTransaction::from_row).collect())
+}
diff --git a/src/database/initialize.rs b/src/database/initialize.rs
index 4bc8a9c..29bc4ce 100644
--- a/src/database/initialize.rs
+++ b/src/database/initialize.rs
@@ -67,8 +67,9 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
 
 pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> {
     let candles_table_fut = create_candles_table(pool);
+    let transactions_table_fut = create_transactions_table(pool);
     let fills_table_fut = create_fills_table(pool);
-    let result = tokio::try_join!(candles_table_fut, fills_table_fut);
+    let result = tokio::try_join!(candles_table_fut, transactions_table_fut, fills_table_fut);
     match result {
         Ok(_) => {
             println!("Successfully configured database");
@@ -137,7 +138,8 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
             native_qty_received double precision not null,
             native_fee_or_rebate double precision not null,
             fee_tier text not null,
-            order_id text not null
+            order_id text not null,
+            log_index int4 not null
         )",
             &[],
         )
         .await?;
@@ -158,3 +160,41 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
         .await?;
     Ok(())
 }
+
+pub async fn create_transactions_table(pool: &Pool) -> anyhow::Result<()> {
+    let client = pool.get().await?;
+
+    client
+        .execute(
+            "CREATE TABLE IF NOT EXISTS transactions (
+            signature text NOT NULL,
+            program_pk text NOT NULL,
+            block_datetime timestamptz NOT NULL,
+            slot bigint NOT NULL,
+            err bool NOT NULL,
+            processed bool NOT NULL,
+            worker_partition int4 NOT NULL,
+            CONSTRAINT transactions_pk PRIMARY KEY (signature, worker_partition)
+        ) PARTITION BY LIST (worker_partition);",
+            &[],
+        )
+        .await?;
+
+    client.batch_execute(
+        "CREATE INDEX IF NOT EXISTS transactions_processed_err_idx ON ONLY transactions (signature) WHERE processed IS NOT TRUE and err IS NOT TRUE;
+        CREATE INDEX IF NOT EXISTS transactions_program_pk_idx ON ONLY transactions USING btree (program_pk, slot DESC);
+
+        CREATE TABLE IF NOT EXISTS transactions_0 PARTITION OF transactions FOR VALUES IN (0);
+        CREATE TABLE IF NOT EXISTS transactions_1 PARTITION OF transactions FOR VALUES IN (1);
+        CREATE TABLE IF NOT EXISTS transactions_2 PARTITION OF transactions FOR VALUES IN (2);
+        CREATE TABLE IF NOT EXISTS transactions_3 PARTITION OF transactions FOR VALUES IN (3);
+        CREATE TABLE IF NOT EXISTS transactions_4 PARTITION OF transactions FOR VALUES IN (4);
+        CREATE TABLE IF NOT EXISTS transactions_5 PARTITION OF transactions FOR VALUES IN (5);
+        CREATE TABLE IF NOT EXISTS transactions_6 PARTITION OF transactions FOR VALUES IN (6);
+        CREATE TABLE IF NOT EXISTS transactions_7 PARTITION OF transactions FOR VALUES IN (7);
+        CREATE TABLE IF NOT EXISTS transactions_8 PARTITION OF transactions FOR VALUES IN (8);
+        CREATE TABLE IF NOT EXISTS transactions_9 PARTITION OF 
transactions FOR VALUES IN (9);" + ).await?; + + Ok(()) +} diff --git a/src/database/insert.rs b/src/database/insert.rs index ba95e87..6fabab2 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -7,10 +7,44 @@ use std::{ use tokio::sync::mpsc::{error::TryRecvError, Receiver}; use crate::{ - structs::{candle::Candle, openbook::OpenBookFillEvent}, + structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction}, utils::{to_timestampz, AnyhowWrap}, }; +pub async fn add_fills_atomically( + pool: &Pool, + worker_id: i32, + fills: Vec, + signatures: Vec, +) -> anyhow::Result<()> { + let mut client = pool.get().await?; + + let db_txn = client.build_transaction().start().await?; + + // 1. Insert fills + if fills.len() > 0 { + let fills_statement = build_fills_upsert_statement_not_crazy(fills); + db_txn + .execute(&fills_statement, &[]) + .await + .map_err_anyhow() + .unwrap(); + } + + // 2. Update txns table as processed + let transactions_statement = + build_transactions_processed_update_statement(worker_id, signatures); + db_txn + .execute(&transactions_statement, &[]) + .await + .map_err_anyhow() + .unwrap(); + + db_txn.commit().await?; + + Ok(()) +} + pub async fn persist_fill_events( pool: &Pool, fill_receiver: &mut Receiver, @@ -50,12 +84,12 @@ pub async fn persist_fill_events( #[allow(deprecated)] fn build_fills_upsert_statement(events: HashMap) -> String { - let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES"); + let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); for (idx, event) in events.keys().enumerate() { let mut hasher = DefaultHasher::new(); event.hash(&mut hasher); let val_str = format!( - "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {})", + "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", hasher.finish(), to_timestampz(event.block_time as u64).to_rfc3339(), event.market, @@ -68,6 +102,7 @@ fn build_fills_upsert_statement(events: HashMap) -> Strin event.native_fee_or_rebate, event.fee_tier, event.order_id, + event.log_index, ); if idx == 0 { @@ -77,7 +112,42 @@ fn build_fills_upsert_statement(events: HashMap) -> Strin } } - let handle_conflict = "ON CONFLICT (id) DO UPDATE SET market=excluded.market"; + let handle_conflict = "ON CONFLICT DO NOTHING"; + + stmt = format!("{} {}", stmt, handle_conflict); + stmt +} + +fn build_fills_upsert_statement_not_crazy(fills: Vec) -> String { + let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); + for (idx, fill) in fills.iter().enumerate() { + let mut hasher = DefaultHasher::new(); + fill.hash(&mut hasher); + let val_str = format!( + "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", + hasher.finish(), + to_timestampz(fill.block_time as u64).to_rfc3339(), + fill.market, + fill.open_orders, + fill.open_orders_owner, + fill.bid, + fill.maker, + fill.native_qty_paid, + fill.native_qty_received, + fill.native_fee_or_rebate, + fill.fee_tier, + fill.order_id, + fill.log_index, + ); + + if idx == 0 { + stmt = format!("{} {}", &stmt, val_str); + } else { + stmt = format!("{}, {}", &stmt, val_str); + } 
+ } + + let handle_conflict = "ON CONFLICT DO NOTHING"; stmt = format!("{} {}", stmt, handle_conflict); stmt @@ -121,6 +191,57 @@ pub fn build_candles_upsert_statement(candles: &Vec) -> String { stmt } +pub fn build_transactions_insert_statement(transactions: Vec) -> String { + let mut stmt = String::from("INSERT INTO transactions (signature, program_pk, block_datetime, slot, err, processed, worker_partition) VALUES"); + for (idx, txn) in transactions.iter().enumerate() { + let val_str = format!( + "(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {})", + txn.signature, + txn.program_pk, + txn.block_datetime.to_rfc3339(), + txn.slot, + txn.err, + txn.processed, + txn.worker_partition, + ); + + if idx == 0 { + stmt = format!("{} {}", &stmt, val_str); + } else { + stmt = format!("{}, {}", &stmt, val_str); + } + } + + let handle_conflict = "ON CONFLICT DO NOTHING"; + + stmt = format!("{} {}", stmt, handle_conflict); + stmt +} + +pub fn build_transactions_processed_update_statement( + worker_id: i32, + processed_signatures: Vec, +) -> String { + let mut stmt = String::from( + "UPDATE transactions + SET processed = true + WHERE transactions.signature IN (", + ); + for (idx, sig) in processed_signatures.iter().enumerate() { + let val_str = if idx == processed_signatures.len() - 1 { + format!("\'{}\'", sig,) + } else { + format!("\'{}\',", sig,) + }; + stmt = format!("{} {}", &stmt, val_str); + } + + let worker_stmt = format!(") AND worker_partition = {} ", worker_id); + + stmt = format!("{} {}", stmt, worker_stmt); + stmt +} + #[cfg(test)] mod tests { use super::*; @@ -145,6 +266,7 @@ mod tests { client_order_id: None, referrer_rebate: Some(841), block_time: 0, + log_index: 1, }; let event_2 = OpenBookFillEvent { @@ -163,6 +285,7 @@ mod tests { client_order_id: None, referrer_rebate: Some(841), block_time: 0, + log_index: 1, }; let mut h1 = DefaultHasher::new(); diff --git a/src/structs/mod.rs b/src/structs/mod.rs index 113578c..e161e2c 100644 --- a/src/structs/mod.rs +++ b/src/structs/mod.rs @@ -6,3 +6,4 @@ pub mod resolution; pub mod slab; pub mod trader; pub mod tradingview; +pub mod transaction; diff --git a/src/structs/openbook.rs b/src/structs/openbook.rs index 9fd65ec..4e35bb5 100644 --- a/src/structs/openbook.rs +++ b/src/structs/openbook.rs @@ -22,7 +22,7 @@ pub struct OpenBookFillEventRaw { pub referrer_rebate: Option, } impl OpenBookFillEventRaw { - pub fn with_time(self, block_time: i64) -> OpenBookFillEvent { + pub fn into_event(self, block_time: i64, log_index: usize) -> OpenBookFillEvent { OpenBookFillEvent { market: self.market, open_orders: self.open_orders, @@ -38,6 +38,7 @@ impl OpenBookFillEventRaw { client_order_id: self.client_order_id, referrer_rebate: self.referrer_rebate, block_time, + log_index, } } } @@ -59,6 +60,7 @@ pub struct OpenBookFillEvent { pub client_order_id: Option, pub referrer_rebate: Option, pub block_time: i64, + pub log_index: usize } #[derive(Copy, Clone, Debug, PartialEq)] diff --git a/src/structs/transaction.rs b/src/structs/transaction.rs new file mode 100644 index 0000000..47909ad --- /dev/null +++ b/src/structs/transaction.rs @@ -0,0 +1,52 @@ +use chrono::{DateTime, Utc}; +use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature; +use tokio_postgres::Row; + +use crate::utils::{to_timestampz, OPENBOOK_KEY}; + +#[derive(Clone, Debug, PartialEq)] +pub struct PgTransaction { + pub signature: String, + pub program_pk: String, + pub block_datetime: DateTime, + pub slot: u64, + pub err: bool, + pub processed: bool, + pub worker_partition: i32, 
+}
+
+pub const NUM_TRANSACTION_PARTITIONS: u64 = 10;
+
+impl PgTransaction {
+    pub fn from_rpc_confirmed_transaction(
+        rpc_confirmed_transaction: RpcConfirmedTransactionStatusWithSignature,
+    ) -> Self {
+        PgTransaction {
+            signature: rpc_confirmed_transaction.signature,
+            program_pk: OPENBOOK_KEY.to_string(),
+            block_datetime: to_timestampz(rpc_confirmed_transaction.block_time.unwrap() as u64),
+            slot: rpc_confirmed_transaction.slot,
+            err: rpc_confirmed_transaction.err.is_some(),
+            processed: false,
+            worker_partition: (rpc_confirmed_transaction.slot % NUM_TRANSACTION_PARTITIONS) as i32,
+        }
+    }
+
+    pub fn from_row(row: Row) -> Self {
+        let slot_raw = row.get::<usize, i64>(3);
+        PgTransaction {
+            signature: row.get(0),
+            program_pk: row.get(1),
+            block_datetime: row.get(2),
+            slot: slot_raw as u64,
+            err: row.get(4),
+            processed: row.get(5),
+            worker_partition: row.get(6),
+        }
+    }
+}
+
+pub enum ProcessState {
+    Processed,
+    Unprocessed,
+}
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index aab0aec..2ce6bf6 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -1,9 +1,13 @@
+use anchor_lang::prelude::Pubkey;
 use chrono::{NaiveDateTime, Utc};
 use deadpool_postgres::Pool;
 use serde_derive::Deserialize;
+use solana_sdk::pubkey;
 
 use crate::structs::markets::MarketInfo;
 
+pub const OPENBOOK_KEY: Pubkey = pubkey!("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX");
+
 pub trait AnyhowWrap {
     type Value;
     fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
diff --git a/src/worker/main.rs b/src/worker/main.rs
index 996cc41..77581f9 100644
--- a/src/worker/main.rs
+++ b/src/worker/main.rs
@@ -9,7 +9,7 @@ use openbook_candles::worker::trade_fetching::scrape::scrape;
 use openbook_candles::{
     database::{
         initialize::{connect_to_database, setup_database},
-        insert::{persist_fill_events},
+        insert::persist_fill_events,
     },
     worker::candle_batching::batch_for_market,
 };
diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs
index a31ce5c..3f314ae 100644
--- a/src/worker/trade_fetching/parsing.rs
+++ b/src/worker/trade_fetching/parsing.rs
@@ -55,7 +55,7 @@ fn parse_openbook_fills_from_logs(
     block_time: i64,
 ) -> Option<Vec<OpenBookFillEvent>> {
     let mut fills_vector = Vec::<OpenBookFillEvent>::new();
-    for l in logs {
+    for (idx, l) in logs.iter().enumerate() {
         match l.strip_prefix(PROGRAM_DATA) {
             Some(log) => {
                 let borsh_bytes = match anchor_lang::__private::base64::decode(log) {
@@ -68,7 +68,7 @@ fn parse_openbook_fills_from_logs(
 
                 match event {
                     Ok(e) => {
-                        let fill_event = e.with_time(block_time);
+                        let fill_event = e.into_event(block_time, idx);
                         if target_markets.contains_key(&fill_event.market) {
                             fills_vector.push(fill_event);
                         }
diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs
index bbba445..fcd89c6 100644
--- a/src/worker/trade_fetching/scrape.rs
+++ b/src/worker/trade_fetching/scrape.rs
@@ -11,7 +11,7 @@ use tokio::sync::mpsc::Sender;
 
 use crate::{
     structs::openbook::OpenBookFillEvent,
-    utils::Config,
+    utils::{Config, OPENBOOK_KEY},
     worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL},
 };
 
@@ -55,7 +55,7 @@ pub async fn scrape_transactions(
 
     let mut sigs = match rpc_client
         .get_signatures_for_address_with_config(
-            &Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(),
+            &OPENBOOK_KEY,
             rpc_config,
         )
         .await

From dc1726af431f98ddfd6017da71b939bde585f563 Mon Sep 17 00:00:00 2001
From: dboures
Date: Thu, 8 Jun 2023 01:13:41 -0500
Subject: [PATCH 2/7] refactor: replace id with signature in fills table

---
 src/backfill-trades/main.rs          |  3 +-
 src/database/initialize.rs           | 
12 ++--- src/database/insert.rs | 73 +++------------------------- src/structs/openbook.rs | 4 +- src/worker/trade_fetching/parsing.rs | 7 ++- src/worker/trade_fetching/scrape.rs | 7 ++- 6 files changed, 25 insertions(+), 81 deletions(-) diff --git a/src/backfill-trades/main.rs b/src/backfill-trades/main.rs index d452a90..ec515d6 100644 --- a/src/backfill-trades/main.rs +++ b/src/backfill-trades/main.rs @@ -177,7 +177,8 @@ pub async fn backfill( let mut txns = join_all(txn_futs).await; - let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); + // TODO: batch fills into groups of 1000 + let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); // Write any fills to the database, and mark the transactions as processed add_fills_atomically(pool, worker_id, fills, sig_strings).await?; diff --git a/src/database/initialize.rs b/src/database/initialize.rs index 29bc4ce..6d032db 100644 --- a/src/database/initialize.rs +++ b/src/database/initialize.rs @@ -127,7 +127,7 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { client .execute( "CREATE TABLE IF NOT EXISTS fills ( - id numeric PRIMARY KEY, + signature text not null, time timestamptz not null, market text not null, open_orders text not null, @@ -139,19 +139,13 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { native_fee_or_rebate double precision not null, fee_tier text not null, order_id text not null, - log_index int4 not null + log_index int4 not null, + CONSTRAINT fills_pk PRIMARY KEY (signature, log_index) )", &[], ) .await?; - client - .execute( - "CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)", - &[], - ) - .await?; - client .execute( "CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)", diff --git a/src/database/insert.rs b/src/database/insert.rs index 6fabab2..e7eedaf 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -1,8 +1,7 @@ use deadpool_postgres::Pool; use log::debug; use std::{ - collections::{hash_map::DefaultHasher, HashMap}, - hash::{Hash, Hasher}, + collections::{HashMap} }; use tokio::sync::mpsc::{error::TryRecvError, Receiver}; @@ -84,13 +83,11 @@ pub async fn persist_fill_events( #[allow(deprecated)] fn build_fills_upsert_statement(events: HashMap) -> String { - let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); + let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); for (idx, event) in events.keys().enumerate() { - let mut hasher = DefaultHasher::new(); - event.hash(&mut hasher); let val_str = format!( "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", - hasher.finish(), + event.signature, to_timestampz(event.block_time as u64).to_rfc3339(), event.market, event.open_orders, @@ -119,13 +116,11 @@ fn build_fills_upsert_statement(events: HashMap) -> Strin } fn build_fills_upsert_statement_not_crazy(fills: Vec) -> String { - let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); + let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, 
native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); for (idx, fill) in fills.iter().enumerate() { - let mut hasher = DefaultHasher::new(); - fill.hash(&mut hasher); let val_str = format!( - "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", - hasher.finish(), + "(\'{}\', \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", + fill.signature, to_timestampz(fill.block_time as u64).to_rfc3339(), fill.market, fill.open_orders, @@ -241,59 +236,3 @@ pub fn build_transactions_processed_update_statement( stmt = format!("{} {}", stmt, worker_stmt); stmt } - -#[cfg(test)] -mod tests { - use super::*; - use solana_sdk::pubkey::Pubkey; - use std::str::FromStr; - - #[test] - fn test_event_hashing() { - let event_1 = OpenBookFillEvent { - market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(), - open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(), - open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw") - .unwrap(), - bid: false, - maker: false, - native_qty_paid: 200000000, - native_qty_received: 4204317, - native_fee_or_rebate: 1683, - order_id: 387898134381964481824213, - owner_slot: 0, - fee_tier: 0, - client_order_id: None, - referrer_rebate: Some(841), - block_time: 0, - log_index: 1, - }; - - let event_2 = OpenBookFillEvent { - market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(), - open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(), - open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw") - .unwrap(), - bid: false, - maker: false, - native_qty_paid: 200000001, - native_qty_received: 4204317, - native_fee_or_rebate: 1683, - order_id: 387898134381964481824213, - owner_slot: 0, - fee_tier: 0, - client_order_id: None, - referrer_rebate: Some(841), - block_time: 0, - log_index: 1, - }; - - let mut h1 = DefaultHasher::new(); - event_1.hash(&mut h1); - - let mut h2 = DefaultHasher::new(); - event_2.hash(&mut h2); - - assert_ne!(h1.finish(), h2.finish()); - } -} diff --git a/src/structs/openbook.rs b/src/structs/openbook.rs index 4e35bb5..8629ce2 100644 --- a/src/structs/openbook.rs +++ b/src/structs/openbook.rs @@ -22,8 +22,9 @@ pub struct OpenBookFillEventRaw { pub referrer_rebate: Option, } impl OpenBookFillEventRaw { - pub fn into_event(self, block_time: i64, log_index: usize) -> OpenBookFillEvent { + pub fn into_event(self, signature: String, block_time: i64, log_index: usize) -> OpenBookFillEvent { OpenBookFillEvent { + signature, market: self.market, open_orders: self.open_orders, open_orders_owner: self.open_orders_owner, @@ -46,6 +47,7 @@ impl OpenBookFillEventRaw { #[event] #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct OpenBookFillEvent { + pub signature: String, pub market: Pubkey, pub open_orders: Pubkey, pub open_orders_owner: Pubkey, diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index 3f314ae..6add236 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -15,10 +15,11 @@ const PROGRAM_DATA: &str = "Program data: "; pub fn parse_trades_from_openbook_txns( txns: &mut Vec>, + sig_strings: &Vec, target_markets: &HashMap, ) -> Vec { let mut fills_vector = Vec::::new(); - for txn in txns.iter_mut() { + for (idx, txn) in txns.iter_mut().enumerate() { match txn { Ok(t) => { if let Some(m) = &t.transaction.meta { @@ -27,6 +28,7 @@ pub fn 
parse_trades_from_openbook_txns( match parse_openbook_fills_from_logs( logs, target_markets, + sig_strings[idx].clone(), t.block_time.unwrap(), ) { Some(mut events) => fills_vector.append(&mut events), @@ -52,6 +54,7 @@ pub fn parse_trades_from_openbook_txns( fn parse_openbook_fills_from_logs( logs: &Vec, target_markets: &HashMap, + signature: String, block_time: i64, ) -> Option> { let mut fills_vector = Vec::::new(); @@ -68,7 +71,7 @@ fn parse_openbook_fills_from_logs( match event { Ok(e) => { - let fill_event = e.into_event(block_time, idx); + let fill_event = e.into_event(signature.clone(), block_time, idx); if target_markets.contains_key(&fill_event.market) { fills_vector.push(fill_event); } diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index fcd89c6..7957bb8 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -89,6 +89,11 @@ pub async fn scrape_transactions( max_supported_transaction_version: Some(0), }; + let sig_strings = sigs + .iter() + .map(|t| t.signature.clone()) + .collect::>(); + let signatures: Vec<_> = sigs .into_iter() .map(|sig| sig.signature.parse::().unwrap()) @@ -101,7 +106,7 @@ pub async fn scrape_transactions( let mut txns = join_all(txn_futs).await; - let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); + let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); if !fills.is_empty() { for fill in fills.into_iter() { let market_name = target_markets.get(&fill.market).unwrap(); From 67b3583ba6efa1bce023e54ea23347a763d6c5a1 Mon Sep 17 00:00:00 2001 From: dboures Date: Thu, 8 Jun 2023 02:47:05 -0500 Subject: [PATCH 3/7] refactor: worker uses a transactions table --- src/backfill-trades/main.rs | 87 ++---------- src/database/insert.rs | 6 +- src/structs/openbook.rs | 9 +- src/worker/main.rs | 41 +++--- src/worker/metrics/mod.rs | 10 +- src/worker/trade_fetching/parsing.rs | 2 +- src/worker/trade_fetching/scrape.rs | 196 +++++++++++++-------------- 7 files changed, 146 insertions(+), 205 deletions(-) diff --git a/src/backfill-trades/main.rs b/src/backfill-trades/main.rs index ec515d6..c0a73b5 100644 --- a/src/backfill-trades/main.rs +++ b/src/backfill-trades/main.rs @@ -1,28 +1,24 @@ use anchor_lang::prelude::Pubkey; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use deadpool_postgres::Pool; -use futures::future::join_all; use log::debug; use openbook_candles::{ database::{ - fetch::fetch_worker_transactions, initialize::{connect_to_database, setup_database}, - insert::{add_fills_atomically, build_transactions_insert_statement}, + insert::build_transactions_insert_statement, }, structs::{ markets::{fetch_market_infos, load_markets}, transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS}, }, utils::{AnyhowWrap, Config, OPENBOOK_KEY}, - worker::trade_fetching::parsing::parse_trades_from_openbook_txns, + worker::trade_fetching::scrape::scrape_fills, }; use solana_client::{ nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, - rpc_config::RpcTransactionConfig }; use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature}; -use solana_transaction_status::UiTransactionEncoding; -use std::{collections::HashMap, env, str::FromStr,time::Duration as WaitDuration }; +use std::{collections::HashMap, env, str::FromStr}; #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { @@ -54,15 +50,18 @@ async fn main() -> anyhow::Result<()> { let rpc_clone = 
rpc_url.clone(); let pool_clone = pool.clone(); handles.push(tokio::spawn(async move { - fetch_signatures(rpc_clone, &pool_clone, num_days).await.unwrap(); + fetch_signatures(rpc_clone, &pool_clone, num_days) + .await + .unwrap(); })); + // Low priority improvement: batch fills into 1000's per worker for id in 0..NUM_TRANSACTION_PARTITIONS { let rpc_clone = rpc_url.clone(); let pool_clone = pool.clone(); let markets_clone = target_markets.clone(); handles.push(tokio::spawn(async move { - backfill(id as i32, rpc_clone, &pool_clone, &markets_clone) + scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone) .await .unwrap(); })); @@ -89,10 +88,7 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an }; let sigs = match rpc_client - .get_signatures_for_address_with_config( - &OPENBOOK_KEY, - rpc_config, - ) + .get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config) .await { Ok(sigs) => sigs, @@ -114,15 +110,15 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an .collect::>(); if transactions.is_empty() { - println!("No transactions found, trying again"); + println!("No transactions found, trying again"); } debug!("writing: {:?} txns to DB\n", transactions.len()); let upsert_statement = build_transactions_insert_statement(transactions); - let client = pool.get().await?; - client - .execute(&upsert_statement, &[]) - .await - .map_err_anyhow()?; + let client = pool.get().await?; + client + .execute(&upsert_statement, &[]) + .await + .map_err_anyhow()?; now_time = last_time; before_sig = Some(Signature::from_str(&last_signature)?); @@ -136,59 +132,6 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an Ok(()) } -pub async fn backfill( - worker_id: i32, - rpc_url: String, - pool: &Pool, - target_markets: &HashMap, -) -> anyhow::Result<()> { - println!("Worker {} up \n", worker_id); - let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); - - loop { - let transactions = fetch_worker_transactions(worker_id, pool).await?; - if transactions.len() == 0 { - println!("No signatures found by worker {}", worker_id); - tokio::time::sleep(WaitDuration::from_secs(1)).await; - continue; - }; - - // for each signature, fetch the transaction - let txn_config = RpcTransactionConfig { - encoding: Some(UiTransactionEncoding::Json), - commitment: Some(CommitmentConfig::confirmed()), - max_supported_transaction_version: Some(0), - }; - - let sig_strings = transactions - .iter() - .map(|t| t.signature.clone()) - .collect::>(); - - let signatures: Vec<_> = transactions - .into_iter() - .map(|t| t.signature.parse::().unwrap()) - .collect(); - - let txn_futs: Vec<_> = signatures - .iter() - .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) - .collect(); - - let mut txns = join_all(txn_futs).await; - - // TODO: batch fills into groups of 1000 - let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); - - // Write any fills to the database, and mark the transactions as processed - add_fills_atomically(pool, worker_id, fills, sig_strings).await?; - } - - // TODO: graceful shutdown - // println!("Worker {} down \n", worker_id); - Ok(()) -} - fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration { let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap(); let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap(); diff --git a/src/database/insert.rs b/src/database/insert.rs index 
e7eedaf..6fe01ff 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -1,8 +1,6 @@ use deadpool_postgres::Pool; use log::debug; -use std::{ - collections::{HashMap} -}; +use std::collections::HashMap; use tokio::sync::mpsc::{error::TryRecvError, Receiver}; use crate::{ @@ -10,7 +8,7 @@ use crate::{ utils::{to_timestampz, AnyhowWrap}, }; -pub async fn add_fills_atomically( +pub async fn insert_fills_atomically( pool: &Pool, worker_id: i32, fills: Vec, diff --git a/src/structs/openbook.rs b/src/structs/openbook.rs index 8629ce2..d3c0c0b 100644 --- a/src/structs/openbook.rs +++ b/src/structs/openbook.rs @@ -22,7 +22,12 @@ pub struct OpenBookFillEventRaw { pub referrer_rebate: Option, } impl OpenBookFillEventRaw { - pub fn into_event(self, signature: String, block_time: i64, log_index: usize) -> OpenBookFillEvent { + pub fn into_event( + self, + signature: String, + block_time: i64, + log_index: usize, + ) -> OpenBookFillEvent { OpenBookFillEvent { signature, market: self.market, @@ -62,7 +67,7 @@ pub struct OpenBookFillEvent { pub client_order_id: Option, pub referrer_rebate: Option, pub block_time: i64, - pub log_index: usize + pub log_index: usize, } #[derive(Copy, Clone, Debug, PartialEq)] diff --git a/src/worker/main.rs b/src/worker/main.rs index 77581f9..35b6161 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -1,22 +1,18 @@ use log::{error, info}; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; -use openbook_candles::structs::openbook::OpenBookFillEvent; +use openbook_candles::structs::transaction::NUM_TRANSACTION_PARTITIONS; use openbook_candles::utils::Config; use openbook_candles::worker::metrics::{ - serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, METRIC_FILLS_QUEUE_LENGTH, + serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, }; -use openbook_candles::worker::trade_fetching::scrape::scrape; +use openbook_candles::worker::trade_fetching::scrape::{scrape_fills, scrape_signatures}; use openbook_candles::{ - database::{ - initialize::{connect_to_database, setup_database}, - insert::persist_fill_events, - }, + database::initialize::{connect_to_database, setup_database}, worker::candle_batching::batch_for_market, }; use solana_sdk::pubkey::Pubkey; use std::env; use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; -use tokio::sync::mpsc; #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { @@ -32,8 +28,6 @@ async fn main() -> anyhow::Result<()> { rpc_url: rpc_url.clone(), }; - let fills_queue_max_size = 10000; - let markets = load_markets(path_to_markets_json); let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); @@ -46,21 +40,26 @@ async fn main() -> anyhow::Result<()> { setup_database(&pool).await?; let mut handles = vec![]; - let (fill_sender, mut fill_receiver) = mpsc::channel::(fills_queue_max_size); - let scrape_fill_sender = fill_sender.clone(); + // signature scraping + let rpc_clone = rpc_url.clone(); + let pool_clone = pool.clone(); handles.push(tokio::spawn(async move { - scrape(&config, &scrape_fill_sender, &target_markets).await; + scrape_signatures(rpc_clone, &pool_clone).await.unwrap(); })); - let fills_pool = pool.clone(); - handles.push(tokio::spawn(async move { - loop { - persist_fill_events(&fills_pool, &mut fill_receiver) + // transaction/fill scraping + for id in 0..NUM_TRANSACTION_PARTITIONS { + let rpc_clone = rpc_url.clone(); + let pool_clone = 
pool.clone(); + let markets_clone = target_markets.clone(); + handles.push(tokio::spawn(async move { + scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone) .await .unwrap(); - } - })); + })); + } + // candle batching for market in market_infos.into_iter() { let batch_pool = pool.clone(); handles.push(tokio::spawn(async move { @@ -70,7 +69,6 @@ async fn main() -> anyhow::Result<()> { } let monitor_pool = pool.clone(); - let monitor_fill_channel = fill_sender.clone(); handles.push(tokio::spawn(async move { // TODO: maybe break this out into a new function loop { @@ -78,9 +76,6 @@ async fn main() -> anyhow::Result<()> { METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64); METRIC_DB_POOL_SIZE.set(pool_status.size as i64); - METRIC_FILLS_QUEUE_LENGTH - .set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64); - tokio::time::sleep(WaitDuration::from_secs(10)).await; } })); diff --git a/src/worker/metrics/mod.rs b/src/worker/metrics/mod.rs index 09488f3..7f76add 100644 --- a/src/worker/metrics/mod.rs +++ b/src/worker/metrics/mod.rs @@ -2,8 +2,8 @@ use actix_web::{dev::Server, http::StatusCode, App, HttpServer}; use actix_web_prom::PrometheusMetricsBuilder; use lazy_static::lazy_static; use prometheus::{ - register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec, - IntGauge, Registry, + register_int_counter_vec_with_registry, register_int_counter_with_registry, + register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry, }; lazy_static! { @@ -30,9 +30,9 @@ lazy_static! { METRIC_REGISTRY ) .unwrap(); - pub static ref METRIC_FILLS_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!( - "fills_queue_length", - "Current length of the fills write queue", + pub static ref METRIC_TRANSACTIONS_TOTAL: IntCounter = register_int_counter_with_registry!( + "transactions_total", + "Total number of transaction signatures scraped", METRIC_REGISTRY ) .unwrap(); diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index 6add236..d5d1491 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -15,7 +15,7 @@ const PROGRAM_DATA: &str = "Program data: "; pub fn parse_trades_from_openbook_txns( txns: &mut Vec>, - sig_strings: &Vec, + sig_strings: &Vec, target_markets: &HashMap, ) -> Vec { let mut fills_vector = Vec::::new(); diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index 7957bb8..fa1fd10 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -1,5 +1,6 @@ +use deadpool_postgres::Pool; use futures::future::join_all; -use log::{debug, warn}; +use log::{debug, info, warn}; use solana_client::{ nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, rpc_config::RpcTransactionConfig, @@ -10,112 +11,111 @@ use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc::Sender; use crate::{ - structs::openbook::OpenBookFillEvent, - utils::{Config, OPENBOOK_KEY}, - worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL}, + database::{ + fetch::fetch_worker_transactions, + insert::{build_transactions_insert_statement, insert_fills_atomically}, + }, + structs::{openbook::OpenBookFillEvent, transaction::PgTransaction}, + utils::{AnyhowWrap, Config, OPENBOOK_KEY}, + worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL}, }; use super::parsing::parse_trades_from_openbook_txns; 
-pub async fn scrape( - config: &Config, - fill_sender: &Sender, - target_markets: &HashMap, -) { - let rpc_client = - RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed()); - let before_slot = None; +pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> { + let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); + loop { - scrape_transactions( - &rpc_client, - before_slot, - Some(150), - fill_sender, - target_markets, - ) - .await; - tokio::time::sleep(WaitDuration::from_millis(250)).await; - } -} + let rpc_config = GetConfirmedSignaturesForAddress2Config { + before: None, + until: None, + limit: None, + commitment: Some(CommitmentConfig::confirmed()), + }; -pub async fn scrape_transactions( - rpc_client: &RpcClient, - before_sig: Option, - limit: Option, - fill_sender: &Sender, - target_markets: &HashMap, -) -> Option { - let rpc_config = GetConfirmedSignaturesForAddress2Config { - before: before_sig, - until: None, - limit, - commitment: Some(CommitmentConfig::confirmed()), - }; - - let mut sigs = match rpc_client - .get_signatures_for_address_with_config( - &OPENBOOK_KEY, - rpc_config, - ) - .await - { - Ok(s) => s, - Err(e) => { - warn!("rpc error in get_signatures_for_address_with_config: {}", e); - METRIC_RPC_ERRORS_TOTAL - .with_label_values(&["getSignaturesForAddress"]) - .inc(); - return before_sig; - } - }; - - if sigs.is_empty() { - debug!("No signatures found"); - return before_sig; - } - - let last = sigs.last().unwrap(); - let request_last_sig = Signature::from_str(&last.signature).unwrap(); - - sigs.retain(|sig| sig.err.is_none()); - if sigs.last().is_none() { - return Some(request_last_sig); - } - - let txn_config = RpcTransactionConfig { - encoding: Some(UiTransactionEncoding::Json), - commitment: Some(CommitmentConfig::confirmed()), - max_supported_transaction_version: Some(0), - }; - - let sig_strings = sigs - .iter() - .map(|t| t.signature.clone()) - .collect::>(); - - let signatures: Vec<_> = sigs - .into_iter() - .map(|sig| sig.signature.parse::().unwrap()) - .collect(); - - let txn_futs: Vec<_> = signatures - .iter() - .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) - .collect(); - - let mut txns = join_all(txn_futs).await; - - let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); - if !fills.is_empty() { - for fill in fills.into_iter() { - let market_name = target_markets.get(&fill.market).unwrap(); - if let Err(_) = fill_sender.send(fill).await { - panic!("receiver dropped"); + let sigs = match rpc_client + .get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config) + .await + { + Ok(sigs) => sigs, + Err(e) => { + warn!("rpc error in get_signatures_for_address_with_config: {}", e); + METRIC_RPC_ERRORS_TOTAL + .with_label_values(&["getSignaturesForAddress"]) + .inc(); + continue; } - METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc(); + }; + if sigs.is_empty() { + debug!("No signatures found, trying again"); + continue; } + let transactions: Vec = sigs + .into_iter() + .map(|s| PgTransaction::from_rpc_confirmed_transaction(s)) + .collect(); + + debug!("Scraper writing: {:?} txns to DB\n", transactions.len()); + let upsert_statement = build_transactions_insert_statement(transactions); + let client = pool.get().await?; + let num_txns = client + .execute(&upsert_statement, &[]) + .await + .map_err_anyhow()?; + METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns); + } + // TODO: graceful shutdown + Ok(()) +} + +pub 
async fn scrape_fills( + worker_id: i32, + rpc_url: String, + pool: &Pool, + target_markets: &HashMap, +) -> anyhow::Result<()> { + debug!("Worker {} started \n", worker_id); + let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); + + loop { + let transactions = fetch_worker_transactions(worker_id, pool).await?; + if transactions.len() == 0 { + debug!("No signatures found by worker {}", worker_id); + tokio::time::sleep(WaitDuration::from_secs(1)).await; + continue; + }; + + // for each signature, fetch the transaction + let txn_config = RpcTransactionConfig { + encoding: Some(UiTransactionEncoding::Json), + commitment: Some(CommitmentConfig::confirmed()), + max_supported_transaction_version: Some(0), + }; + + let sig_strings = transactions + .iter() + .map(|t| t.signature.clone()) + .collect::>(); + + let signatures: Vec<_> = transactions + .into_iter() + .map(|t| t.signature.parse::().unwrap()) + .collect(); + + let txn_futs: Vec<_> = signatures + .iter() + .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) + .collect(); + + let mut txns = join_all(txn_futs).await; + + // TODO: reenable total fills metric + let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); + + // Write any fills to the database, and update the transactions as processed + insert_fills_atomically(pool, worker_id, fills, sig_strings).await?; } - Some(request_last_sig) + Ok(()) } From 4ba8fd5ec52dd0e416f6020a059baa5b9e852cc5 Mon Sep 17 00:00:00 2001 From: dboures Date: Thu, 8 Jun 2023 22:19:53 -0500 Subject: [PATCH 4/7] fix: don't mark unfetched txns as processed --- src/worker/trade_fetching/parsing.rs | 9 ++++++--- src/worker/trade_fetching/scrape.rs | 6 +++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index d5d1491..0ead868 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -15,10 +15,11 @@ const PROGRAM_DATA: &str = "Program data: "; pub fn parse_trades_from_openbook_txns( txns: &mut Vec>, - sig_strings: &Vec, + mut sig_strings: Vec, target_markets: &HashMap, -) -> Vec { +) -> (Vec, Vec) { let mut fills_vector = Vec::::new(); + let mut failed_sigs = vec![]; for (idx, txn) in txns.iter_mut().enumerate() { match txn { Ok(t) => { @@ -42,13 +43,15 @@ pub fn parse_trades_from_openbook_txns( } Err(e) => { warn!("rpc error in get_transaction {}", e); + failed_sigs.push(sig_strings[idx].clone()); METRIC_RPC_ERRORS_TOTAL .with_label_values(&["getTransaction"]) .inc(); } } } - fills_vector + sig_strings.retain(|s| !failed_sigs.contains(&s)); + (fills_vector, sig_strings) } fn parse_openbook_fills_from_logs( diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index fa1fd10..9f79a1e 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -111,10 +111,10 @@ pub async fn scrape_fills( let mut txns = join_all(txn_futs).await; // TODO: reenable total fills metric - let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); + let (fills, completed_sigs) = parse_trades_from_openbook_txns(&mut txns, sig_strings, target_markets); - // Write any fills to the database, and update the transactions as processed - insert_fills_atomically(pool, worker_id, fills, sig_strings).await?; + // Write fills to the database, and update properly fetched transactions as processed + insert_fills_atomically(pool, worker_id, fills, 
completed_sigs).await?; } Ok(()) From c7ee74b872fd5e2ba88b0ce7e9db3c9a125f7a67 Mon Sep 17 00:00:00 2001 From: dboures Date: Thu, 8 Jun 2023 22:56:49 -0500 Subject: [PATCH 5/7] style: clippy and dead code cleanup --- .gitignore | 2 + src/backfill-trades/main.rs | 4 +- src/database/insert.rs | 83 ++-------------------------- src/server/main.rs | 2 +- src/worker/candle_batching/mod.rs | 2 +- src/worker/trade_fetching/parsing.rs | 2 +- src/worker/trade_fetching/scrape.rs | 19 +++---- 7 files changed, 21 insertions(+), 93 deletions(-) diff --git a/.gitignore b/.gitignore index fedaa2b..8d062d8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /target .env +*.cer* +*.pks* \ No newline at end of file diff --git a/src/backfill-trades/main.rs b/src/backfill-trades/main.rs index c0a73b5..8db0405 100644 --- a/src/backfill-trades/main.rs +++ b/src/backfill-trades/main.rs @@ -102,11 +102,11 @@ pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> an continue; } let last = sigs.last().unwrap(); - let last_time = last.block_time.unwrap().clone(); + let last_time = last.block_time.unwrap(); let last_signature = last.signature.clone(); let transactions = sigs .into_iter() - .map(|s| PgTransaction::from_rpc_confirmed_transaction(s)) + .map(PgTransaction::from_rpc_confirmed_transaction) .collect::>(); if transactions.is_empty() { diff --git a/src/database/insert.rs b/src/database/insert.rs index 6fe01ff..ab4d613 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -1,7 +1,7 @@ use deadpool_postgres::Pool; -use log::debug; -use std::collections::HashMap; -use tokio::sync::mpsc::{error::TryRecvError, Receiver}; + + + use crate::{ structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction}, @@ -19,8 +19,8 @@ pub async fn insert_fills_atomically( let db_txn = client.build_transaction().start().await?; // 1. 
Insert fills - if fills.len() > 0 { - let fills_statement = build_fills_upsert_statement_not_crazy(fills); + if !fills.is_empty() { + let fills_statement = build_fills_upsert_statement(fills); db_txn .execute(&fills_statement, &[]) .await @@ -42,78 +42,7 @@ pub async fn insert_fills_atomically( Ok(()) } -pub async fn persist_fill_events( - pool: &Pool, - fill_receiver: &mut Receiver, -) -> anyhow::Result<()> { - loop { - let mut write_batch = HashMap::new(); - while write_batch.len() < 10 { - match fill_receiver.try_recv() { - Ok(event) => { - write_batch.entry(event).or_insert(0); - } - Err(TryRecvError::Empty) => { - if !write_batch.is_empty() { - break; - } else { - continue; - } - } - Err(TryRecvError::Disconnected) => { - panic!("Fills sender must stay alive") - } - }; - } - - if !write_batch.is_empty() { - debug!("writing: {:?} events to DB\n", write_batch.len()); - let upsert_statement = build_fills_upsert_statement(write_batch); - let client = pool.get().await?; - client - .execute(&upsert_statement, &[]) - .await - .map_err_anyhow() - .unwrap(); - } - } -} - -#[allow(deprecated)] -fn build_fills_upsert_statement(events: HashMap) -> String { - let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); - for (idx, event) in events.keys().enumerate() { - let val_str = format!( - "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", - event.signature, - to_timestampz(event.block_time as u64).to_rfc3339(), - event.market, - event.open_orders, - event.open_orders_owner, - event.bid, - event.maker, - event.native_qty_paid, - event.native_qty_received, - event.native_fee_or_rebate, - event.fee_tier, - event.order_id, - event.log_index, - ); - - if idx == 0 { - stmt = format!("{} {}", &stmt, val_str); - } else { - stmt = format!("{}, {}", &stmt, val_str); - } - } - - let handle_conflict = "ON CONFLICT DO NOTHING"; - - stmt = format!("{} {}", stmt, handle_conflict); - stmt -} - -fn build_fills_upsert_statement_not_crazy(fills: Vec) -> String { +fn build_fills_upsert_statement(fills: Vec) -> String { let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); for (idx, fill) in fills.iter().enumerate() { let val_str = format!( diff --git a/src/server/main.rs b/src/server/main.rs index 19a8706..e3ecfee 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -55,7 +55,7 @@ async fn main() -> std::io::Result<()> { .unwrap(); // For collecting metrics on the public api, excluding 404s let public_metrics = PrometheusMetricsBuilder::new("openbook_candles_server") - .registry(registry.clone()) + .registry(registry) .exclude_status(StatusCode::NOT_FOUND) .build() .unwrap(); diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs index 310f9e5..6699fae 100644 --- a/src/worker/candle_batching/mod.rs +++ b/src/worker/candle_batching/mod.rs @@ -53,7 +53,7 @@ async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> { } async fn save_candles(pool: &Pool, candles: Vec) -> anyhow::Result<()> { - if candles.len() == 0 { + if candles.is_empty() { return Ok(()); } let upsert_statement = build_candles_upsert_statement(&candles); diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index 
From d4afc71c04c82906eb5e82942d39f839763bcd3d Mon Sep 17 00:00:00 2001
From: dboures
Date: Thu, 8 Jun 2023 23:50:48 -0500
Subject: [PATCH 6/7] reenable total fills metric

---
 src/database/insert.rs              |  3 ---
 src/worker/trade_fetching/scrape.rs | 15 ++++++++-------
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/src/database/insert.rs b/src/database/insert.rs
index ab4d613..a9d6bc9 100644
--- a/src/database/insert.rs
+++ b/src/database/insert.rs
@@ -1,8 +1,5 @@
 use deadpool_postgres::Pool;
-
-
-
 use crate::{
     structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction},
     utils::{to_timestampz, AnyhowWrap},
diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs
index dfe1a11..617df70 100644
--- a/src/worker/trade_fetching/scrape.rs
+++ b/src/worker/trade_fetching/scrape.rs
@@ -9,20 +9,18 @@ use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
 use solana_transaction_status::UiTransactionEncoding;
 use std::{collections::HashMap, time::Duration as WaitDuration};
 
-
 use crate::{
     database::{
         fetch::fetch_worker_transactions,
         insert::{build_transactions_insert_statement, insert_fills_atomically},
     },
-    structs::{transaction::PgTransaction},
+    structs::transaction::PgTransaction,
     utils::{AnyhowWrap, OPENBOOK_KEY},
-    worker::metrics::{METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
+    worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL},
 };
 
 use super::parsing::parse_trades_from_openbook_txns;
-
 pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> {
     let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed());
@@ -109,9 +107,12 @@ pub async fn scrape_fills(
 
     let mut txns = join_all(txn_futs).await;
 
-    // TODO: reenable total fills metric
-    let (fills, completed_sigs) = parse_trades_from_openbook_txns(&mut txns, sig_strings, target_markets);
-
+    let (fills, completed_sigs) =
+        parse_trades_from_openbook_txns(&mut txns, sig_strings, target_markets);
+    for fill in fills.iter() {
+        let market_name = target_markets.get(&fill.market).unwrap();
+        METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
+    }
     // Write fills to the database, and update properly fetched transactions as processed
     insert_fills_atomically(pool, worker_id, fills, completed_sigs).await?;
 }
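PATCH 6/7 restores per-market fill accounting: after parsing, each fill increments METRIC_FILLS_TOTAL with its market's name as a Prometheus label. The real counter is declared in src/worker/metrics.rs, which this series does not touch, so the sketch below is only a guess at its shape: a minimal labeled counter built with the prometheus and lazy_static crates, with an assumed metric name and label.

    use lazy_static::lazy_static;
    use prometheus::{register_int_counter_vec, IntCounterVec};

    lazy_static! {
        // One time series per distinct "market" label value.
        static ref FILLS_TOTAL: IntCounterVec = register_int_counter_vec!(
            "fills_total",
            "Total fill events scraped, by market",
            &["market"]
        )
        .unwrap();
    }

    fn main() {
        // Mirrors the loop added above: one increment per parsed fill.
        FILLS_TOTAL.with_label_values(&["SOL/USDC"]).inc();
    }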
From 8b3e4b95a7864cfe745d72bc7429c80614850000 Mon Sep 17 00:00:00 2001
From: dboures
Date: Fri, 9 Jun 2023 07:55:41 -0500
Subject: [PATCH 7/7] increase candle cooldown to 5 seconds

---
 src/worker/candle_batching/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs
index 6699fae..49ddbaf 100644
--- a/src/worker/candle_batching/mod.rs
+++ b/src/worker/candle_batching/mod.rs
@@ -21,7 +21,7 @@ pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
     let market_clone = market.clone();
 
     loop {
-        sleep(Duration::milliseconds(2000).to_std()?).await;
+        sleep(Duration::milliseconds(5000).to_std()?).await;
         match batch_inner(pool, &market_clone).await {
             Ok(_) => {}
             Err(e) => {