diff --git a/src/backfill-candles/main.rs b/src/backfill-candles/main.rs index e7c2b39..9901fe4 100644 --- a/src/backfill-candles/main.rs +++ b/src/backfill-candles/main.rs @@ -53,7 +53,7 @@ async fn main() -> anyhow::Result<()> { async fn save_candles(candles: Vec, client: Object) -> anyhow::Result<()> { if !candles.is_empty() { - let upsert_statement = build_candles_upsert_statement(candles); + let upsert_statement = build_candles_upsert_statement(&candles); client .execute(&upsert_statement, &[]) .await diff --git a/src/backfill-trades/main.rs b/src/backfill-trades/main.rs index 8ac2ec3..c0a73b5 100644 --- a/src/backfill-trades/main.rs +++ b/src/backfill-trades/main.rs @@ -1,23 +1,24 @@ use anchor_lang::prelude::Pubkey; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; -use futures::future::join_all; +use deadpool_postgres::Pool; +use log::debug; use openbook_candles::{ - database::{initialize::connect_to_database, insert::persist_fill_events}, + database::{ + initialize::{connect_to_database, setup_database}, + insert::build_transactions_insert_statement, + }, structs::{ markets::{fetch_market_infos, load_markets}, - openbook::OpenBookFillEvent, + transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS}, }, - utils::Config, - worker::trade_fetching::parsing::parse_trades_from_openbook_txns, + utils::{AnyhowWrap, Config, OPENBOOK_KEY}, + worker::trade_fetching::scrape::scrape_fills, }; use solana_client::{ nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, - rpc_config::RpcTransactionConfig, rpc_response::RpcConfirmedTransactionStatusWithSignature, }; use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature}; -use solana_transaction_status::UiTransactionEncoding; use std::{collections::HashMap, env, str::FromStr}; -use tokio::sync::mpsc::{self, Sender}; #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { @@ -26,6 +27,8 @@ async fn main() -> anyhow::Result<()> { assert!(args.len() == 2); let path_to_markets_json = &args[1]; + // let num_days = args[2].parse::().unwrap(); // TODO: implement + let num_days = 1; let rpc_url: String = dotenv::var("RPC_URL").unwrap(); let config = Config { @@ -40,145 +43,93 @@ async fn main() -> anyhow::Result<()> { println!("{:?}", target_markets); let pool = connect_to_database().await?; - let (fill_sender, mut fill_receiver) = mpsc::channel::(1000); - - tokio::spawn(async move { - loop { - persist_fill_events(&pool, &mut fill_receiver) - .await - .unwrap(); - } - }); - - backfill(rpc_url, &fill_sender, &target_markets).await?; - Ok(()) -} - -pub async fn backfill( - rpc_url: String, - fill_sender: &Sender, - target_markets: &HashMap, -) -> anyhow::Result<()> { - println!("backfill started"); - let mut before_sig: Option = None; - let mut now_time = Utc::now().timestamp(); - let end_time = (Utc::now() - Duration::days(1)).timestamp(); + setup_database(&pool).await?; let mut handles = vec![]; - while now_time > end_time { - let rpc_client = - RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); - let maybe_r = get_signatures(&rpc_client, before_sig).await; + let rpc_clone = rpc_url.clone(); + let pool_clone = pool.clone(); + handles.push(tokio::spawn(async move { + fetch_signatures(rpc_clone, &pool_clone, num_days) + .await + .unwrap(); + })); - match maybe_r { - Some((last, time, sigs)) => { - now_time = time; - before_sig = Some(last); - let time_left = backfill_time_left(now_time, end_time); - println!( - "{} minutes ~ {} days remaining in the backfill\n", - time_left.num_minutes(), - time_left.num_days() - ); - - let cloned_markets = target_markets.clone(); - let cloned_sender = fill_sender.clone(); - let handle = tokio::spawn(async move { - get_transactions(&rpc_client, sigs, &cloned_sender, &cloned_markets).await; - }); - handles.push(handle); - } - None => {} - } + // Low priority improvement: batch fills into 1000's per worker + for id in 0..NUM_TRANSACTION_PARTITIONS { + let rpc_clone = rpc_url.clone(); + let pool_clone = pool.clone(); + let markets_clone = target_markets.clone(); + handles.push(tokio::spawn(async move { + scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone) + .await + .unwrap(); + })); } - futures::future::join_all(handles).await; + // TODO: spawn status thread - println!("Backfill complete \n"); + futures::future::join_all(handles).await; Ok(()) } -pub async fn get_signatures( - rpc_client: &RpcClient, - before_sig: Option, -) -> Option<( - Signature, - i64, - Vec, -)> { - let rpc_config = GetConfirmedSignaturesForAddress2Config { - before: before_sig, - until: None, - limit: None, - commitment: Some(CommitmentConfig::confirmed()), - }; +pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> anyhow::Result<()> { + let mut before_sig: Option = None; + let mut now_time = Utc::now().timestamp(); + let end_time = (Utc::now() - Duration::days(num_days)).timestamp(); + let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); - let sigs = match rpc_client - .get_signatures_for_address_with_config( - &Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(), - rpc_config, - ) - .await - { - Ok(s) => s, - Err(e) => { - println!("Error in get_signatures_for_address_with_config: {}", e); - return None; - } - }; + while now_time > end_time { + let rpc_config = GetConfirmedSignaturesForAddress2Config { + before: before_sig, + until: None, + limit: None, + commitment: Some(CommitmentConfig::confirmed()), + }; - if sigs.is_empty() { - println!("No signatures found"); - return None; - } - let last = sigs.last().unwrap(); - // println!("{:?}", last.block_time.unwrap()); - Some(( - Signature::from_str(&last.signature).unwrap(), - last.block_time.unwrap(), - sigs, - )) -} - -pub async fn get_transactions( - rpc_client: &RpcClient, - mut sigs: Vec, - fill_sender: &Sender, - target_markets: &HashMap, -) { - sigs.retain(|sig| sig.err.is_none()); - if sigs.last().is_none() { - return; - } - - let txn_config = RpcTransactionConfig { - encoding: Some(UiTransactionEncoding::Json), - commitment: Some(CommitmentConfig::confirmed()), - max_supported_transaction_version: Some(0), - }; - - let signatures: Vec<_> = sigs - .into_iter() - .map(|sig| sig.signature.parse::().unwrap()) - .collect(); - - let txn_futs: Vec<_> = signatures - .iter() - .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) - .collect(); - - let mut txns = join_all(txn_futs).await; - - let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); - if !fills.is_empty() { - for fill in fills.into_iter() { - // println!("Sending fill {:?}", fill); - if let Err(_) = fill_sender.send(fill).await { - panic!("receiver dropped"); + let sigs = match rpc_client + .get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config) + .await + { + Ok(sigs) => sigs, + Err(e) => { + println!("Error fetching signatures: {}", e); + continue; } + }; + if sigs.is_empty() { + println!("No signatures found, trying again"); + continue; } + let last = sigs.last().unwrap(); + let last_time = last.block_time.unwrap().clone(); + let last_signature = last.signature.clone(); + let transactions = sigs + .into_iter() + .map(|s| PgTransaction::from_rpc_confirmed_transaction(s)) + .collect::>(); + + if transactions.is_empty() { + println!("No transactions found, trying again"); + } + debug!("writing: {:?} txns to DB\n", transactions.len()); + let upsert_statement = build_transactions_insert_statement(transactions); + let client = pool.get().await?; + client + .execute(&upsert_statement, &[]) + .await + .map_err_anyhow()?; + + now_time = last_time; + before_sig = Some(Signature::from_str(&last_signature)?); + let time_left = backfill_time_left(now_time, end_time); + println!( + "{} minutes ~ {} days remaining in the backfill\n", + time_left.num_minutes(), + time_left.num_days() + ); } + Ok(()) } fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration { diff --git a/src/database/fetch.rs b/src/database/fetch.rs index 37f6043..36ce206 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -4,6 +4,7 @@ use crate::structs::{ openbook::PgOpenBookFill, resolution::Resolution, trader::PgTrader, + transaction::PgTransaction, }; use chrono::{DateTime, Utc}; use deadpool_postgres::{GenericClient, Pool}; @@ -320,3 +321,23 @@ pub async fn fetch_coingecko_24h_high_low( .map(PgCoinGecko24HighLow::from_row) .collect()) } + +/// Fetches unprocessed, non-error transactions for the specified worker partition. +/// Pulls at most 50 transactions at a time. +pub async fn fetch_worker_transactions( + worker_id: i32, + pool: &Pool, +) -> anyhow::Result> { + let client = pool.get().await?; + + let stmt = r#"SELECT signature, program_pk, block_datetime, slot, err, "processed", worker_partition + FROM transactions + where worker_partition = $1 + and err = false + and processed = false + LIMIT 50"#; + + let rows = client.query(stmt, &[&worker_id]).await?; + + Ok(rows.into_iter().map(PgTransaction::from_row).collect()) +} diff --git a/src/database/initialize.rs b/src/database/initialize.rs index 4bc8a9c..6d032db 100644 --- a/src/database/initialize.rs +++ b/src/database/initialize.rs @@ -67,8 +67,9 @@ pub async fn connect_to_database() -> anyhow::Result { pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> { let candles_table_fut = create_candles_table(pool); + let transactions_table_fut = create_transactions_table(pool); let fills_table_fut = create_fills_table(pool); - let result = tokio::try_join!(candles_table_fut, fills_table_fut); + let result = tokio::try_join!(candles_table_fut, transactions_table_fut, fills_table_fut); match result { Ok(_) => { println!("Successfully configured database"); @@ -126,7 +127,7 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { client .execute( "CREATE TABLE IF NOT EXISTS fills ( - id numeric PRIMARY KEY, + signature text not null, time timestamptz not null, market text not null, open_orders text not null, @@ -137,19 +138,14 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { native_qty_received double precision not null, native_fee_or_rebate double precision not null, fee_tier text not null, - order_id text not null + order_id text not null, + log_index int4 not null, + CONSTRAINT fills_pk PRIMARY KEY (signature, log_index) )", &[], ) .await?; - client - .execute( - "CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)", - &[], - ) - .await?; - client .execute( "CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)", @@ -158,3 +154,41 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { .await?; Ok(()) } + +pub async fn create_transactions_table(pool: &Pool) -> anyhow::Result<()> { + let client = pool.get().await?; + + client + .execute( + "CREATE TABLE IF NOT EXISTS transactions ( + signature text NOT NULL, + program_pk text NOT NULL, + block_datetime timestamptz NOT NULL, + slot bigint NOT NULL, + err bool NOT NULL, + processed bool NOT NULL, + worker_partition int4 NOT NULL, + CONSTRAINT transactions_pk PRIMARY KEY (signature, worker_partition) + ) PARTITION BY LIST (worker_partition);", + &[], + ) + .await?; + + client.batch_execute( + "CREATE INDEX IF NOT EXISTS transactions_processed_err_idx ON ONLY transactions (signature) WHERE processed IS NOT TRUE and err IS NOT TRUE; + CREATE INDEX IF NOT EXISTS transactions_program_pk_idx ON ONLY transactions USING btree (program_pk, slot DESC); + + CREATE TABLE IF NOT EXISTS transactions_0 PARTITION OF transactions FOR VALUES IN (0); + CREATE TABLE IF NOT EXISTS transactions_1 PARTITION OF transactions FOR VALUES IN (1); + CREATE TABLE IF NOT EXISTS transactions_2 PARTITION OF transactions FOR VALUES IN (2); + CREATE TABLE IF NOT EXISTS transactions_3 PARTITION OF transactions FOR VALUES IN (3); + CREATE TABLE IF NOT EXISTS transactions_4 PARTITION OF transactions FOR VALUES IN (4); + CREATE TABLE IF NOT EXISTS transactions_5 PARTITION OF transactions FOR VALUES IN (5); + CREATE TABLE IF NOT EXISTS transactions_6 PARTITION OF transactions FOR VALUES IN (6); + CREATE TABLE IF NOT EXISTS transactions_7 PARTITION OF transactions FOR VALUES IN (7); + CREATE TABLE IF NOT EXISTS transactions_8 PARTITION OF transactions FOR VALUES IN (8); + CREATE TABLE IF NOT EXISTS transactions_9 PARTITION OF transactions FOR VALUES IN (9);" + ).await?; + + Ok(()) +} diff --git a/src/database/insert.rs b/src/database/insert.rs index ba95e87..6fe01ff 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -1,16 +1,47 @@ use deadpool_postgres::Pool; use log::debug; -use std::{ - collections::{hash_map::DefaultHasher, HashMap}, - hash::{Hash, Hasher}, -}; +use std::collections::HashMap; use tokio::sync::mpsc::{error::TryRecvError, Receiver}; use crate::{ - structs::{candle::Candle, openbook::OpenBookFillEvent}, + structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction}, utils::{to_timestampz, AnyhowWrap}, }; +pub async fn insert_fills_atomically( + pool: &Pool, + worker_id: i32, + fills: Vec, + signatures: Vec, +) -> anyhow::Result<()> { + let mut client = pool.get().await?; + + let db_txn = client.build_transaction().start().await?; + + // 1. Insert fills + if fills.len() > 0 { + let fills_statement = build_fills_upsert_statement_not_crazy(fills); + db_txn + .execute(&fills_statement, &[]) + .await + .map_err_anyhow() + .unwrap(); + } + + // 2. Update txns table as processed + let transactions_statement = + build_transactions_processed_update_statement(worker_id, signatures); + db_txn + .execute(&transactions_statement, &[]) + .await + .map_err_anyhow() + .unwrap(); + + db_txn.commit().await?; + + Ok(()) +} + pub async fn persist_fill_events( pool: &Pool, fill_receiver: &mut Receiver, @@ -50,13 +81,11 @@ pub async fn persist_fill_events( #[allow(deprecated)] fn build_fills_upsert_statement(events: HashMap) -> String { - let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES"); + let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); for (idx, event) in events.keys().enumerate() { - let mut hasher = DefaultHasher::new(); - event.hash(&mut hasher); let val_str = format!( - "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {})", - hasher.finish(), + "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", + event.signature, to_timestampz(event.block_time as u64).to_rfc3339(), event.market, event.open_orders, @@ -68,6 +97,7 @@ fn build_fills_upsert_statement(events: HashMap) -> Strin event.native_fee_or_rebate, event.fee_tier, event.order_id, + event.log_index, ); if idx == 0 { @@ -77,7 +107,40 @@ fn build_fills_upsert_statement(events: HashMap) -> Strin } } - let handle_conflict = "ON CONFLICT (id) DO UPDATE SET market=excluded.market"; + let handle_conflict = "ON CONFLICT DO NOTHING"; + + stmt = format!("{} {}", stmt, handle_conflict); + stmt +} + +fn build_fills_upsert_statement_not_crazy(fills: Vec) -> String { + let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); + for (idx, fill) in fills.iter().enumerate() { + let val_str = format!( + "(\'{}\', \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", + fill.signature, + to_timestampz(fill.block_time as u64).to_rfc3339(), + fill.market, + fill.open_orders, + fill.open_orders_owner, + fill.bid, + fill.maker, + fill.native_qty_paid, + fill.native_qty_received, + fill.native_fee_or_rebate, + fill.fee_tier, + fill.order_id, + fill.log_index, + ); + + if idx == 0 { + stmt = format!("{} {}", &stmt, val_str); + } else { + stmt = format!("{}, {}", &stmt, val_str); + } + } + + let handle_conflict = "ON CONFLICT DO NOTHING"; stmt = format!("{} {}", stmt, handle_conflict); stmt @@ -121,56 +184,53 @@ pub fn build_candles_upsert_statement(candles: &Vec) -> String { stmt } -#[cfg(test)] -mod tests { - use super::*; - use solana_sdk::pubkey::Pubkey; - use std::str::FromStr; +pub fn build_transactions_insert_statement(transactions: Vec) -> String { + let mut stmt = String::from("INSERT INTO transactions (signature, program_pk, block_datetime, slot, err, processed, worker_partition) VALUES"); + for (idx, txn) in transactions.iter().enumerate() { + let val_str = format!( + "(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {})", + txn.signature, + txn.program_pk, + txn.block_datetime.to_rfc3339(), + txn.slot, + txn.err, + txn.processed, + txn.worker_partition, + ); - #[test] - fn test_event_hashing() { - let event_1 = OpenBookFillEvent { - market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(), - open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(), - open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw") - .unwrap(), - bid: false, - maker: false, - native_qty_paid: 200000000, - native_qty_received: 4204317, - native_fee_or_rebate: 1683, - order_id: 387898134381964481824213, - owner_slot: 0, - fee_tier: 0, - client_order_id: None, - referrer_rebate: Some(841), - block_time: 0, - }; - - let event_2 = OpenBookFillEvent { - market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(), - open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(), - open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw") - .unwrap(), - bid: false, - maker: false, - native_qty_paid: 200000001, - native_qty_received: 4204317, - native_fee_or_rebate: 1683, - order_id: 387898134381964481824213, - owner_slot: 0, - fee_tier: 0, - client_order_id: None, - referrer_rebate: Some(841), - block_time: 0, - }; - - let mut h1 = DefaultHasher::new(); - event_1.hash(&mut h1); - - let mut h2 = DefaultHasher::new(); - event_2.hash(&mut h2); - - assert_ne!(h1.finish(), h2.finish()); + if idx == 0 { + stmt = format!("{} {}", &stmt, val_str); + } else { + stmt = format!("{}, {}", &stmt, val_str); + } } + + let handle_conflict = "ON CONFLICT DO NOTHING"; + + stmt = format!("{} {}", stmt, handle_conflict); + stmt +} + +pub fn build_transactions_processed_update_statement( + worker_id: i32, + processed_signatures: Vec, +) -> String { + let mut stmt = String::from( + "UPDATE transactions + SET processed = true + WHERE transactions.signature IN (", + ); + for (idx, sig) in processed_signatures.iter().enumerate() { + let val_str = if idx == processed_signatures.len() - 1 { + format!("\'{}\'", sig,) + } else { + format!("\'{}\',", sig,) + }; + stmt = format!("{} {}", &stmt, val_str); + } + + let worker_stmt = format!(") AND worker_partition = {} ", worker_id); + + stmt = format!("{} {}", stmt, worker_stmt); + stmt } diff --git a/src/structs/mod.rs b/src/structs/mod.rs index 113578c..e161e2c 100644 --- a/src/structs/mod.rs +++ b/src/structs/mod.rs @@ -6,3 +6,4 @@ pub mod resolution; pub mod slab; pub mod trader; pub mod tradingview; +pub mod transaction; diff --git a/src/structs/openbook.rs b/src/structs/openbook.rs index 9fd65ec..d3c0c0b 100644 --- a/src/structs/openbook.rs +++ b/src/structs/openbook.rs @@ -22,8 +22,14 @@ pub struct OpenBookFillEventRaw { pub referrer_rebate: Option, } impl OpenBookFillEventRaw { - pub fn with_time(self, block_time: i64) -> OpenBookFillEvent { + pub fn into_event( + self, + signature: String, + block_time: i64, + log_index: usize, + ) -> OpenBookFillEvent { OpenBookFillEvent { + signature, market: self.market, open_orders: self.open_orders, open_orders_owner: self.open_orders_owner, @@ -38,6 +44,7 @@ impl OpenBookFillEventRaw { client_order_id: self.client_order_id, referrer_rebate: self.referrer_rebate, block_time, + log_index, } } } @@ -45,6 +52,7 @@ impl OpenBookFillEventRaw { #[event] #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct OpenBookFillEvent { + pub signature: String, pub market: Pubkey, pub open_orders: Pubkey, pub open_orders_owner: Pubkey, @@ -59,6 +67,7 @@ pub struct OpenBookFillEvent { pub client_order_id: Option, pub referrer_rebate: Option, pub block_time: i64, + pub log_index: usize, } #[derive(Copy, Clone, Debug, PartialEq)] diff --git a/src/structs/transaction.rs b/src/structs/transaction.rs new file mode 100644 index 0000000..47909ad --- /dev/null +++ b/src/structs/transaction.rs @@ -0,0 +1,52 @@ +use chrono::{DateTime, Utc}; +use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature; +use tokio_postgres::Row; + +use crate::utils::{to_timestampz, OPENBOOK_KEY}; + +#[derive(Clone, Debug, PartialEq)] +pub struct PgTransaction { + pub signature: String, + pub program_pk: String, + pub block_datetime: DateTime, + pub slot: u64, + pub err: bool, + pub processed: bool, + pub worker_partition: i32, +} + +pub const NUM_TRANSACTION_PARTITIONS: u64 = 10; + +impl PgTransaction { + pub fn from_rpc_confirmed_transaction( + rpc_confirmed_transaction: RpcConfirmedTransactionStatusWithSignature, + ) -> Self { + PgTransaction { + signature: rpc_confirmed_transaction.signature, + program_pk: OPENBOOK_KEY.to_string(), + block_datetime: to_timestampz(rpc_confirmed_transaction.block_time.unwrap() as u64), + slot: rpc_confirmed_transaction.slot, + err: rpc_confirmed_transaction.err.is_some(), + processed: false, + worker_partition: (rpc_confirmed_transaction.slot % NUM_TRANSACTION_PARTITIONS) as i32, + } + } + + pub fn from_row(row: Row) -> Self { + let slot_raw = row.get::(3); + PgTransaction { + signature: row.get(0), + program_pk: row.get(1), + block_datetime: row.get(2), + slot: slot_raw as u64, + err: row.get(4), + processed: row.get(5), + worker_partition: row.get(6), + } + } +} + +pub enum ProcessState { + Processed, + Unprocessed, +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index aab0aec..2ce6bf6 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,9 +1,13 @@ +use anchor_lang::prelude::Pubkey; use chrono::{NaiveDateTime, Utc}; use deadpool_postgres::Pool; use serde_derive::Deserialize; +use solana_sdk::pubkey; use crate::structs::markets::MarketInfo; +pub const OPENBOOK_KEY: Pubkey = pubkey!("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX"); + pub trait AnyhowWrap { type Value; fn map_err_anyhow(self) -> anyhow::Result; diff --git a/src/worker/main.rs b/src/worker/main.rs index 996cc41..35b6161 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -1,22 +1,18 @@ use log::{error, info}; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; -use openbook_candles::structs::openbook::OpenBookFillEvent; +use openbook_candles::structs::transaction::NUM_TRANSACTION_PARTITIONS; use openbook_candles::utils::Config; use openbook_candles::worker::metrics::{ - serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, METRIC_FILLS_QUEUE_LENGTH, + serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, }; -use openbook_candles::worker::trade_fetching::scrape::scrape; +use openbook_candles::worker::trade_fetching::scrape::{scrape_fills, scrape_signatures}; use openbook_candles::{ - database::{ - initialize::{connect_to_database, setup_database}, - insert::{persist_fill_events}, - }, + database::initialize::{connect_to_database, setup_database}, worker::candle_batching::batch_for_market, }; use solana_sdk::pubkey::Pubkey; use std::env; use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; -use tokio::sync::mpsc; #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { @@ -32,8 +28,6 @@ async fn main() -> anyhow::Result<()> { rpc_url: rpc_url.clone(), }; - let fills_queue_max_size = 10000; - let markets = load_markets(path_to_markets_json); let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); @@ -46,21 +40,26 @@ async fn main() -> anyhow::Result<()> { setup_database(&pool).await?; let mut handles = vec![]; - let (fill_sender, mut fill_receiver) = mpsc::channel::(fills_queue_max_size); - let scrape_fill_sender = fill_sender.clone(); + // signature scraping + let rpc_clone = rpc_url.clone(); + let pool_clone = pool.clone(); handles.push(tokio::spawn(async move { - scrape(&config, &scrape_fill_sender, &target_markets).await; + scrape_signatures(rpc_clone, &pool_clone).await.unwrap(); })); - let fills_pool = pool.clone(); - handles.push(tokio::spawn(async move { - loop { - persist_fill_events(&fills_pool, &mut fill_receiver) + // transaction/fill scraping + for id in 0..NUM_TRANSACTION_PARTITIONS { + let rpc_clone = rpc_url.clone(); + let pool_clone = pool.clone(); + let markets_clone = target_markets.clone(); + handles.push(tokio::spawn(async move { + scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone) .await .unwrap(); - } - })); + })); + } + // candle batching for market in market_infos.into_iter() { let batch_pool = pool.clone(); handles.push(tokio::spawn(async move { @@ -70,7 +69,6 @@ async fn main() -> anyhow::Result<()> { } let monitor_pool = pool.clone(); - let monitor_fill_channel = fill_sender.clone(); handles.push(tokio::spawn(async move { // TODO: maybe break this out into a new function loop { @@ -78,9 +76,6 @@ async fn main() -> anyhow::Result<()> { METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64); METRIC_DB_POOL_SIZE.set(pool_status.size as i64); - METRIC_FILLS_QUEUE_LENGTH - .set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64); - tokio::time::sleep(WaitDuration::from_secs(10)).await; } })); diff --git a/src/worker/metrics/mod.rs b/src/worker/metrics/mod.rs index 09488f3..7f76add 100644 --- a/src/worker/metrics/mod.rs +++ b/src/worker/metrics/mod.rs @@ -2,8 +2,8 @@ use actix_web::{dev::Server, http::StatusCode, App, HttpServer}; use actix_web_prom::PrometheusMetricsBuilder; use lazy_static::lazy_static; use prometheus::{ - register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec, - IntGauge, Registry, + register_int_counter_vec_with_registry, register_int_counter_with_registry, + register_int_gauge_with_registry, IntCounter, IntCounterVec, IntGauge, Registry, }; lazy_static! { @@ -30,9 +30,9 @@ lazy_static! { METRIC_REGISTRY ) .unwrap(); - pub static ref METRIC_FILLS_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!( - "fills_queue_length", - "Current length of the fills write queue", + pub static ref METRIC_TRANSACTIONS_TOTAL: IntCounter = register_int_counter_with_registry!( + "transactions_total", + "Total number of transaction signatures scraped", METRIC_REGISTRY ) .unwrap(); diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index a31ce5c..d5d1491 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -15,10 +15,11 @@ const PROGRAM_DATA: &str = "Program data: "; pub fn parse_trades_from_openbook_txns( txns: &mut Vec>, + sig_strings: &Vec, target_markets: &HashMap, ) -> Vec { let mut fills_vector = Vec::::new(); - for txn in txns.iter_mut() { + for (idx, txn) in txns.iter_mut().enumerate() { match txn { Ok(t) => { if let Some(m) = &t.transaction.meta { @@ -27,6 +28,7 @@ pub fn parse_trades_from_openbook_txns( match parse_openbook_fills_from_logs( logs, target_markets, + sig_strings[idx].clone(), t.block_time.unwrap(), ) { Some(mut events) => fills_vector.append(&mut events), @@ -52,10 +54,11 @@ pub fn parse_trades_from_openbook_txns( fn parse_openbook_fills_from_logs( logs: &Vec, target_markets: &HashMap, + signature: String, block_time: i64, ) -> Option> { let mut fills_vector = Vec::::new(); - for l in logs { + for (idx, l) in logs.iter().enumerate() { match l.strip_prefix(PROGRAM_DATA) { Some(log) => { let borsh_bytes = match anchor_lang::__private::base64::decode(log) { @@ -68,7 +71,7 @@ fn parse_openbook_fills_from_logs( match event { Ok(e) => { - let fill_event = e.with_time(block_time); + let fill_event = e.into_event(signature.clone(), block_time, idx); if target_markets.contains_key(&fill_event.market) { fills_vector.push(fill_event); } diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index dca1b78..19bd514 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -1,5 +1,6 @@ +use deadpool_postgres::Pool; use futures::future::join_all; -use log::{debug, warn}; +use log::{debug, info, warn}; use solana_client::{ nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, rpc_config::RpcTransactionConfig, @@ -10,106 +11,111 @@ use std::{collections::HashMap, str::FromStr}; use tokio::sync::mpsc::Sender; use crate::{ - structs::openbook::OpenBookFillEvent, - utils::Config, - worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL}, + database::{ + fetch::fetch_worker_transactions, + insert::{build_transactions_insert_statement, insert_fills_atomically}, + }, + structs::{openbook::OpenBookFillEvent, transaction::PgTransaction}, + utils::{AnyhowWrap, Config, OPENBOOK_KEY}, + worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL}, }; use super::parsing::parse_trades_from_openbook_txns; -pub async fn scrape( - config: &Config, - fill_sender: &Sender, - target_markets: &HashMap, -) { - let rpc_client = - RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed()); - let before_slot = None; +pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> { + let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); + loop { - scrape_transactions( - &rpc_client, - before_slot, - Some(1000), - fill_sender, - target_markets, - ) - .await; - } -} + let rpc_config = GetConfirmedSignaturesForAddress2Config { + before: None, + until: None, + limit: None, + commitment: Some(CommitmentConfig::confirmed()), + }; -pub async fn scrape_transactions( - rpc_client: &RpcClient, - before_sig: Option, - limit: Option, - fill_sender: &Sender, - target_markets: &HashMap, -) -> Option { - let rpc_config = GetConfirmedSignaturesForAddress2Config { - before: before_sig, - until: None, - limit, - commitment: Some(CommitmentConfig::confirmed()), - }; - - let mut sigs = match rpc_client - .get_signatures_for_address_with_config( - &Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(), - rpc_config, - ) - .await - { - Ok(s) => s, - Err(e) => { - warn!("rpc error in get_signatures_for_address_with_config: {}", e); - METRIC_RPC_ERRORS_TOTAL - .with_label_values(&["getSignaturesForAddress"]) - .inc(); - return before_sig; - } - }; - - if sigs.is_empty() { - debug!("No signatures found"); - return before_sig; - } - - let last = sigs.last().unwrap(); - let request_last_sig = Signature::from_str(&last.signature).unwrap(); - - sigs.retain(|sig| sig.err.is_none()); - if sigs.last().is_none() { - return Some(request_last_sig); - } - - let txn_config = RpcTransactionConfig { - encoding: Some(UiTransactionEncoding::Json), - commitment: Some(CommitmentConfig::confirmed()), - max_supported_transaction_version: Some(0), - }; - - let signatures: Vec<_> = sigs - .into_iter() - .map(|sig| sig.signature.parse::().unwrap()) - .collect(); - - let txn_futs: Vec<_> = signatures - .iter() - .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) - .collect(); - - let mut txns = join_all(txn_futs).await; - - let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); - if !fills.is_empty() { - for fill in fills.into_iter() { - let market_name = target_markets.get(&fill.market).unwrap(); - if let Err(_) = fill_sender.send(fill).await { - panic!("receiver dropped"); + let sigs = match rpc_client + .get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config) + .await + { + Ok(sigs) => sigs, + Err(e) => { + warn!("rpc error in get_signatures_for_address_with_config: {}", e); + METRIC_RPC_ERRORS_TOTAL + .with_label_values(&["getSignaturesForAddress"]) + .inc(); + continue; } - METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc(); + }; + if sigs.is_empty() { + debug!("No signatures found, trying again"); + continue; } + let transactions: Vec = sigs + .into_iter() + .map(|s| PgTransaction::from_rpc_confirmed_transaction(s)) + .collect(); + + debug!("Scraper writing: {:?} txns to DB\n", transactions.len()); + let upsert_statement = build_transactions_insert_statement(transactions); + let client = pool.get().await?; + let num_txns = client + .execute(&upsert_statement, &[]) + .await + .map_err_anyhow()?; + METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns); + } + // TODO: graceful shutdown + Ok(()) +} + +pub async fn scrape_fills( + worker_id: i32, + rpc_url: String, + pool: &Pool, + target_markets: &HashMap, +) -> anyhow::Result<()> { + debug!("Worker {} started \n", worker_id); + let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); + + loop { + let transactions = fetch_worker_transactions(worker_id, pool).await?; + if transactions.len() == 0 { + debug!("No signatures found by worker {}", worker_id); + tokio::time::sleep(WaitDuration::from_secs(1)).await; + continue; + }; + + // for each signature, fetch the transaction + let txn_config = RpcTransactionConfig { + encoding: Some(UiTransactionEncoding::Json), + commitment: Some(CommitmentConfig::confirmed()), + max_supported_transaction_version: Some(0), + }; + + let sig_strings = transactions + .iter() + .map(|t| t.signature.clone()) + .collect::>(); + + let signatures: Vec<_> = transactions + .into_iter() + .map(|t| t.signature.parse::().unwrap()) + .collect(); + + let txn_futs: Vec<_> = signatures + .iter() + .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) + .collect(); + + let mut txns = join_all(txn_futs).await; + + // TODO: reenable total fills metric + let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); + + // Write any fills to the database, and update the transactions as processed + insert_fills_atomically(pool, worker_id, fills, sig_strings).await?; } - Some(request_last_sig) + Ok(()) }