From 0e821cc3c1b2dba8d9ca9ad73f528c5781ab8cf7 Mon Sep 17 00:00:00 2001 From: dboures Date: Fri, 19 May 2023 14:50:56 -0500 Subject: [PATCH] feat: fill events include block time (fixes backfill issues) --- src/database/insert.rs | 18 ++++++------ src/structs/openbook.rs | 41 +++++++++++++++++++++++++++- src/worker/main.rs | 4 +-- src/worker/trade_fetching/parsing.rs | 24 ++++++++++------ src/worker/trade_fetching/scrape.rs | 6 ++-- 5 files changed, 70 insertions(+), 23 deletions(-) diff --git a/src/database/insert.rs b/src/database/insert.rs index 414c35b..89db430 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -1,4 +1,3 @@ -use chrono::Utc; use deadpool_postgres::Pool; use std::{ collections::{hash_map::DefaultHasher, HashMap}, @@ -7,13 +6,13 @@ use std::{ use tokio::sync::mpsc::{error::TryRecvError, Receiver}; use crate::{ - structs::{candle::Candle, openbook::OpenBookFillEventLog}, - utils::AnyhowWrap, + structs::{candle::Candle, openbook::OpenBookFillEvent}, + utils::{to_timestampz, AnyhowWrap}, }; pub async fn persist_fill_events( pool: &Pool, - fill_receiver: &mut Receiver, + fill_receiver: &mut Receiver, ) -> anyhow::Result<()> { let client = pool.get().await?; loop { @@ -94,7 +93,8 @@ pub async fn persist_candles( } } -fn build_fills_upsert_statement(events: HashMap) -> String { +#[allow(deprecated)] +fn build_fills_upsert_statement(events: HashMap) -> String { let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES"); for (idx, event) in events.keys().enumerate() { let mut hasher = DefaultHasher::new(); @@ -102,7 +102,7 @@ fn build_fills_upsert_statement(events: HashMap) -> St let val_str = format!( "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {})", hasher.finish(), - Utc::now().to_rfc3339(), + to_timestampz(event.block_time as u64).to_rfc3339(), event.market, event.open_orders, event.open_orders_owner, @@ -174,7 +174,7 @@ mod tests { #[test] fn test_event_hashing() { - let event_1 = OpenBookFillEventLog { + let event_1 = OpenBookFillEvent { market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(), open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(), open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw") @@ -189,9 +189,10 @@ mod tests { fee_tier: 0, client_order_id: None, referrer_rebate: Some(841), + block_time: 0, }; - let event_2 = OpenBookFillEventLog { + let event_2 = OpenBookFillEvent { market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(), open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(), open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw") @@ -206,6 +207,7 @@ mod tests { fee_tier: 0, client_order_id: None, referrer_rebate: Some(841), + block_time: 0, }; let mut h1 = DefaultHasher::new(); diff --git a/src/structs/openbook.rs b/src/structs/openbook.rs index 038b543..9fd65ec 100644 --- a/src/structs/openbook.rs +++ b/src/structs/openbook.rs @@ -6,7 +6,7 @@ use tokio_postgres::Row; #[event] #[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct OpenBookFillEventLog { +pub struct OpenBookFillEventRaw { pub market: Pubkey, pub open_orders: Pubkey, pub open_orders_owner: Pubkey, @@ -21,6 +21,45 @@ pub struct OpenBookFillEventLog { pub client_order_id: Option, pub referrer_rebate: Option, } +impl OpenBookFillEventRaw { + pub fn with_time(self, block_time: i64) -> OpenBookFillEvent { + OpenBookFillEvent { + market: self.market, + open_orders: self.open_orders, + open_orders_owner: self.open_orders_owner, + bid: self.bid, + maker: self.maker, + native_qty_paid: self.native_qty_paid, + native_qty_received: self.native_qty_received, + native_fee_or_rebate: self.native_fee_or_rebate, + order_id: self.order_id, + owner_slot: self.owner_slot, + fee_tier: self.fee_tier, + client_order_id: self.client_order_id, + referrer_rebate: self.referrer_rebate, + block_time, + } + } +} + +#[event] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct OpenBookFillEvent { + pub market: Pubkey, + pub open_orders: Pubkey, + pub open_orders_owner: Pubkey, + pub bid: bool, + pub maker: bool, + pub native_qty_paid: u64, + pub native_qty_received: u64, + pub native_fee_or_rebate: u64, + pub order_id: u128, + pub owner_slot: u8, + pub fee_tier: u8, + pub client_order_id: Option, + pub referrer_rebate: Option, + pub block_time: i64, +} #[derive(Copy, Clone, Debug, PartialEq)] pub struct PgOpenBookFill { diff --git a/src/worker/main.rs b/src/worker/main.rs index e8c774f..875708f 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -1,7 +1,7 @@ use dotenv; use openbook_candles::structs::candle::Candle; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; -use openbook_candles::structs::openbook::OpenBookFillEventLog; +use openbook_candles::structs::openbook::OpenBookFillEvent; use openbook_candles::utils::Config; use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::{ @@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> { setup_database(&pool).await?; let mut handles = vec![]; - let (fill_sender, mut fill_receiver) = mpsc::channel::(1000); + let (fill_sender, mut fill_receiver) = mpsc::channel::(1000); handles.push(tokio::spawn(async move { scrape(&config, &fill_sender, &target_markets).await; diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index 4ac699b..e70560d 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -5,22 +5,26 @@ use solana_transaction_status::{ }; use std::{collections::HashMap, io::Error}; -use crate::structs::openbook::OpenBookFillEventLog; +use crate::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw}; const PROGRAM_DATA: &str = "Program data: "; pub fn parse_trades_from_openbook_txns( txns: &mut Vec>, target_markets: &HashMap, -) -> Vec { - let mut fills_vector = Vec::::new(); +) -> Vec { + let mut fills_vector = Vec::::new(); for txn in txns.iter_mut() { match txn { Ok(t) => { if let Some(m) = &t.transaction.meta { match &m.log_messages { OptionSerializer::Some(logs) => { - match parse_openbook_fills_from_logs(logs, target_markets) { + match parse_openbook_fills_from_logs( + logs, + target_markets, + t.block_time.unwrap(), + ) { Some(mut events) => fills_vector.append(&mut events), None => {} } @@ -39,8 +43,9 @@ pub fn parse_trades_from_openbook_txns( fn parse_openbook_fills_from_logs( logs: &Vec, target_markets: &HashMap, -) -> Option> { - let mut fills_vector = Vec::::new(); + block_time: i64, +) -> Option> { + let mut fills_vector = Vec::::new(); for l in logs { match l.strip_prefix(PROGRAM_DATA) { Some(log) => { @@ -49,13 +54,14 @@ fn parse_openbook_fills_from_logs( _ => continue, }; let mut slice: &[u8] = &borsh_bytes[8..]; - let event: Result = + let event: Result = anchor_lang::AnchorDeserialize::deserialize(&mut slice); match event { Ok(e) => { - if target_markets.contains_key(&e.market) { - fills_vector.push(e); + let fill_event = e.with_time(block_time); + if target_markets.contains_key(&fill_event.market) { + fills_vector.push(fill_event); } } _ => continue, diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index 9cacec6..ebd3a02 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -8,13 +8,13 @@ use solana_transaction_status::UiTransactionEncoding; use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc::Sender; -use crate::{structs::openbook::OpenBookFillEventLog, utils::Config}; +use crate::{structs::openbook::OpenBookFillEvent, utils::Config}; use super::parsing::parse_trades_from_openbook_txns; pub async fn scrape( config: &Config, - fill_sender: &Sender, + fill_sender: &Sender, target_markets: &HashMap, ) { let rpc_client = @@ -38,7 +38,7 @@ pub async fn scrape_transactions( rpc_client: &RpcClient, before_sig: Option, limit: Option, - fill_sender: &Sender, + fill_sender: &Sender, target_markets: &HashMap, ) -> Option { let rpc_config = GetConfirmedSignaturesForAddress2Config {