diff --git a/src/backfill-trades/main.rs b/src/backfill-trades/main.rs index d452a90..ec515d6 100644 --- a/src/backfill-trades/main.rs +++ b/src/backfill-trades/main.rs @@ -177,7 +177,8 @@ pub async fn backfill( let mut txns = join_all(txn_futs).await; - let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); + // TODO: batch fills into groups of 1000 + let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); // Write any fills to the database, and mark the transactions as processed add_fills_atomically(pool, worker_id, fills, sig_strings).await?; diff --git a/src/database/initialize.rs b/src/database/initialize.rs index 29bc4ce..6d032db 100644 --- a/src/database/initialize.rs +++ b/src/database/initialize.rs @@ -127,7 +127,7 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { client .execute( "CREATE TABLE IF NOT EXISTS fills ( - id numeric PRIMARY KEY, + signature text not null, time timestamptz not null, market text not null, open_orders text not null, @@ -139,19 +139,13 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { native_fee_or_rebate double precision not null, fee_tier text not null, order_id text not null, - log_index int4 not null + log_index int4 not null, + CONSTRAINT fills_pk PRIMARY KEY (signature, log_index) )", &[], ) .await?; - client - .execute( - "CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)", - &[], - ) - .await?; - client .execute( "CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)", diff --git a/src/database/insert.rs b/src/database/insert.rs index 6fabab2..e7eedaf 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -1,8 +1,7 @@ use deadpool_postgres::Pool; use log::debug; use std::{ - collections::{hash_map::DefaultHasher, HashMap}, - hash::{Hash, Hasher}, + collections::{HashMap} }; use tokio::sync::mpsc::{error::TryRecvError, Receiver}; @@ -84,13 +83,11 @@ pub async fn persist_fill_events( #[allow(deprecated)] fn build_fills_upsert_statement(events: HashMap) -> String { - let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); + let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); for (idx, event) in events.keys().enumerate() { - let mut hasher = DefaultHasher::new(); - event.hash(&mut hasher); let val_str = format!( "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", - hasher.finish(), + event.signature, to_timestampz(event.block_time as u64).to_rfc3339(), event.market, event.open_orders, @@ -119,13 +116,11 @@ fn build_fills_upsert_statement(events: HashMap) -> Strin } fn build_fills_upsert_statement_not_crazy(fills: Vec) -> String { - let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); + let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); for (idx, fill) in fills.iter().enumerate() { - let mut hasher = DefaultHasher::new(); - fill.hash(&mut hasher); let val_str = format!( - "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", - hasher.finish(), + "(\'{}\', \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", + fill.signature, to_timestampz(fill.block_time as u64).to_rfc3339(), fill.market, fill.open_orders, @@ -241,59 +236,3 @@ pub fn build_transactions_processed_update_statement( stmt = format!("{} {}", stmt, worker_stmt); stmt } - -#[cfg(test)] -mod tests { - use super::*; - use solana_sdk::pubkey::Pubkey; - use std::str::FromStr; - - #[test] - fn test_event_hashing() { - let event_1 = OpenBookFillEvent { - market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(), - open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(), - open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw") - .unwrap(), - bid: false, - maker: false, - native_qty_paid: 200000000, - native_qty_received: 4204317, - native_fee_or_rebate: 1683, - order_id: 387898134381964481824213, - owner_slot: 0, - fee_tier: 0, - client_order_id: None, - referrer_rebate: Some(841), - block_time: 0, - log_index: 1, - }; - - let event_2 = OpenBookFillEvent { - market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(), - open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(), - open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw") - .unwrap(), - bid: false, - maker: false, - native_qty_paid: 200000001, - native_qty_received: 4204317, - native_fee_or_rebate: 1683, - order_id: 387898134381964481824213, - owner_slot: 0, - fee_tier: 0, - client_order_id: None, - referrer_rebate: Some(841), - block_time: 0, - log_index: 1, - }; - - let mut h1 = DefaultHasher::new(); - event_1.hash(&mut h1); - - let mut h2 = DefaultHasher::new(); - event_2.hash(&mut h2); - - assert_ne!(h1.finish(), h2.finish()); - } -} diff --git a/src/structs/openbook.rs b/src/structs/openbook.rs index 4e35bb5..8629ce2 100644 --- a/src/structs/openbook.rs +++ b/src/structs/openbook.rs @@ -22,8 +22,9 @@ pub struct OpenBookFillEventRaw { pub referrer_rebate: Option, } impl OpenBookFillEventRaw { - pub fn into_event(self, block_time: i64, log_index: usize) -> OpenBookFillEvent { + pub fn into_event(self, signature: String, block_time: i64, log_index: usize) -> OpenBookFillEvent { OpenBookFillEvent { + signature, market: self.market, open_orders: self.open_orders, open_orders_owner: self.open_orders_owner, @@ -46,6 +47,7 @@ impl OpenBookFillEventRaw { #[event] #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct OpenBookFillEvent { + pub signature: String, pub market: Pubkey, pub open_orders: Pubkey, pub open_orders_owner: Pubkey, diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index 3f314ae..6add236 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -15,10 +15,11 @@ const PROGRAM_DATA: &str = "Program data: "; pub fn parse_trades_from_openbook_txns( txns: &mut Vec>, + sig_strings: &Vec, target_markets: &HashMap, ) -> Vec { let mut fills_vector = Vec::::new(); - for txn in txns.iter_mut() { + for (idx, txn) in txns.iter_mut().enumerate() { match txn { Ok(t) => { if let Some(m) = &t.transaction.meta { @@ -27,6 +28,7 @@ pub fn parse_trades_from_openbook_txns( match parse_openbook_fills_from_logs( logs, target_markets, + sig_strings[idx].clone(), t.block_time.unwrap(), ) { Some(mut events) => fills_vector.append(&mut events), @@ -52,6 +54,7 @@ pub fn parse_trades_from_openbook_txns( fn parse_openbook_fills_from_logs( logs: &Vec, target_markets: &HashMap, + signature: String, block_time: i64, ) -> Option> { let mut fills_vector = Vec::::new(); @@ -68,7 +71,7 @@ fn parse_openbook_fills_from_logs( match event { Ok(e) => { - let fill_event = e.into_event(block_time, idx); + let fill_event = e.into_event(signature.clone(), block_time, idx); if target_markets.contains_key(&fill_event.market) { fills_vector.push(fill_event); } diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index fcd89c6..7957bb8 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -89,6 +89,11 @@ pub async fn scrape_transactions( max_supported_transaction_version: Some(0), }; + let sig_strings = sigs + .iter() + .map(|t| t.signature.clone()) + .collect::>(); + let signatures: Vec<_> = sigs .into_iter() .map(|sig| sig.signature.parse::().unwrap()) @@ -101,7 +106,7 @@ pub async fn scrape_transactions( let mut txns = join_all(txn_futs).await; - let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); + let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); if !fills.is_empty() { for fill in fills.into_iter() { let market_name = target_markets.get(&fill.market).unwrap();