feat: fill events include block time (fixes backfill issues)

This commit is contained in:
dboures 2023-05-19 14:50:56 -05:00
parent 872e6fcdb2
commit 0e821cc3c1
No known key found for this signature in database
GPG Key ID: AB3790129D478852
5 changed files with 70 additions and 23 deletions

View File

@ -1,4 +1,3 @@
use chrono::Utc;
use deadpool_postgres::Pool; use deadpool_postgres::Pool;
use std::{ use std::{
collections::{hash_map::DefaultHasher, HashMap}, collections::{hash_map::DefaultHasher, HashMap},
@ -7,13 +6,13 @@ use std::{
use tokio::sync::mpsc::{error::TryRecvError, Receiver}; use tokio::sync::mpsc::{error::TryRecvError, Receiver};
use crate::{ use crate::{
structs::{candle::Candle, openbook::OpenBookFillEventLog}, structs::{candle::Candle, openbook::OpenBookFillEvent},
utils::AnyhowWrap, utils::{to_timestampz, AnyhowWrap},
}; };
pub async fn persist_fill_events( pub async fn persist_fill_events(
pool: &Pool, pool: &Pool,
fill_receiver: &mut Receiver<OpenBookFillEventLog>, fill_receiver: &mut Receiver<OpenBookFillEvent>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let client = pool.get().await?; let client = pool.get().await?;
loop { loop {
@ -94,7 +93,8 @@ pub async fn persist_candles(
} }
} }
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> String { #[allow(deprecated)]
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES"); let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES");
for (idx, event) in events.keys().enumerate() { for (idx, event) in events.keys().enumerate() {
let mut hasher = DefaultHasher::new(); let mut hasher = DefaultHasher::new();
@ -102,7 +102,7 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> St
let val_str = format!( let val_str = format!(
"({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {})", "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {})",
hasher.finish(), hasher.finish(),
Utc::now().to_rfc3339(), to_timestampz(event.block_time as u64).to_rfc3339(),
event.market, event.market,
event.open_orders, event.open_orders,
event.open_orders_owner, event.open_orders_owner,
@ -174,7 +174,7 @@ mod tests {
#[test] #[test]
fn test_event_hashing() { fn test_event_hashing() {
let event_1 = OpenBookFillEventLog { let event_1 = OpenBookFillEvent {
market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(), market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(), open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw") open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
@ -189,9 +189,10 @@ mod tests {
fee_tier: 0, fee_tier: 0,
client_order_id: None, client_order_id: None,
referrer_rebate: Some(841), referrer_rebate: Some(841),
block_time: 0,
}; };
let event_2 = OpenBookFillEventLog { let event_2 = OpenBookFillEvent {
market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(), market: Pubkey::from_str("8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6").unwrap(),
open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(), open_orders: Pubkey::from_str("CKo9nGfgekYYfjHw4K22qMAtVeqBXET3pSGm8k5DSJi7").unwrap(),
open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw") open_orders_owner: Pubkey::from_str("JCNCMFXo5M5qwUPg2Utu1u6YWp3MbygxqBsBeXXJfrw")
@ -206,6 +207,7 @@ mod tests {
fee_tier: 0, fee_tier: 0,
client_order_id: None, client_order_id: None,
referrer_rebate: Some(841), referrer_rebate: Some(841),
block_time: 0,
}; };
let mut h1 = DefaultHasher::new(); let mut h1 = DefaultHasher::new();

View File

@ -6,7 +6,7 @@ use tokio_postgres::Row;
#[event] #[event]
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpenBookFillEventLog { pub struct OpenBookFillEventRaw {
pub market: Pubkey, pub market: Pubkey,
pub open_orders: Pubkey, pub open_orders: Pubkey,
pub open_orders_owner: Pubkey, pub open_orders_owner: Pubkey,
@ -21,6 +21,45 @@ pub struct OpenBookFillEventLog {
pub client_order_id: Option<u64>, pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>, pub referrer_rebate: Option<u64>,
} }
impl OpenBookFillEventRaw {
pub fn with_time(self, block_time: i64) -> OpenBookFillEvent {
OpenBookFillEvent {
market: self.market,
open_orders: self.open_orders,
open_orders_owner: self.open_orders_owner,
bid: self.bid,
maker: self.maker,
native_qty_paid: self.native_qty_paid,
native_qty_received: self.native_qty_received,
native_fee_or_rebate: self.native_fee_or_rebate,
order_id: self.order_id,
owner_slot: self.owner_slot,
fee_tier: self.fee_tier,
client_order_id: self.client_order_id,
referrer_rebate: self.referrer_rebate,
block_time,
}
}
}
#[event]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OpenBookFillEvent {
pub market: Pubkey,
pub open_orders: Pubkey,
pub open_orders_owner: Pubkey,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: u64,
pub native_qty_received: u64,
pub native_fee_or_rebate: u64,
pub order_id: u128,
pub owner_slot: u8,
pub fee_tier: u8,
pub client_order_id: Option<u64>,
pub referrer_rebate: Option<u64>,
pub block_time: i64,
}
#[derive(Copy, Clone, Debug, PartialEq)] #[derive(Copy, Clone, Debug, PartialEq)]
pub struct PgOpenBookFill { pub struct PgOpenBookFill {

View File

@ -1,7 +1,7 @@
use dotenv; use dotenv;
use openbook_candles::structs::candle::Candle; use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEventLog; use openbook_candles::structs::openbook::OpenBookFillEvent;
use openbook_candles::utils::Config; use openbook_candles::utils::Config;
use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::worker::trade_fetching::scrape::scrape;
use openbook_candles::{ use openbook_candles::{
@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> {
setup_database(&pool).await?; setup_database(&pool).await?;
let mut handles = vec![]; let mut handles = vec![];
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000); let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
scrape(&config, &fill_sender, &target_markets).await; scrape(&config, &fill_sender, &target_markets).await;

View File

@ -5,22 +5,26 @@ use solana_transaction_status::{
}; };
use std::{collections::HashMap, io::Error}; use std::{collections::HashMap, io::Error};
use crate::structs::openbook::OpenBookFillEventLog; use crate::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw};
const PROGRAM_DATA: &str = "Program data: "; const PROGRAM_DATA: &str = "Program data: ";
pub fn parse_trades_from_openbook_txns( pub fn parse_trades_from_openbook_txns(
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>, txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, u8>,
) -> Vec<OpenBookFillEventLog> { ) -> Vec<OpenBookFillEvent> {
let mut fills_vector = Vec::<OpenBookFillEventLog>::new(); let mut fills_vector = Vec::<OpenBookFillEvent>::new();
for txn in txns.iter_mut() { for txn in txns.iter_mut() {
match txn { match txn {
Ok(t) => { Ok(t) => {
if let Some(m) = &t.transaction.meta { if let Some(m) = &t.transaction.meta {
match &m.log_messages { match &m.log_messages {
OptionSerializer::Some(logs) => { OptionSerializer::Some(logs) => {
match parse_openbook_fills_from_logs(logs, target_markets) { match parse_openbook_fills_from_logs(
logs,
target_markets,
t.block_time.unwrap(),
) {
Some(mut events) => fills_vector.append(&mut events), Some(mut events) => fills_vector.append(&mut events),
None => {} None => {}
} }
@ -39,8 +43,9 @@ pub fn parse_trades_from_openbook_txns(
fn parse_openbook_fills_from_logs( fn parse_openbook_fills_from_logs(
logs: &Vec<String>, logs: &Vec<String>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, u8>,
) -> Option<Vec<OpenBookFillEventLog>> { block_time: i64,
let mut fills_vector = Vec::<OpenBookFillEventLog>::new(); ) -> Option<Vec<OpenBookFillEvent>> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
for l in logs { for l in logs {
match l.strip_prefix(PROGRAM_DATA) { match l.strip_prefix(PROGRAM_DATA) {
Some(log) => { Some(log) => {
@ -49,13 +54,14 @@ fn parse_openbook_fills_from_logs(
_ => continue, _ => continue,
}; };
let mut slice: &[u8] = &borsh_bytes[8..]; let mut slice: &[u8] = &borsh_bytes[8..];
let event: Result<OpenBookFillEventLog, Error> = let event: Result<OpenBookFillEventRaw, Error> =
anchor_lang::AnchorDeserialize::deserialize(&mut slice); anchor_lang::AnchorDeserialize::deserialize(&mut slice);
match event { match event {
Ok(e) => { Ok(e) => {
if target_markets.contains_key(&e.market) { let fill_event = e.with_time(block_time);
fills_vector.push(e); if target_markets.contains_key(&fill_event.market) {
fills_vector.push(fill_event);
} }
} }
_ => continue, _ => continue,

View File

@ -8,13 +8,13 @@ use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use crate::{structs::openbook::OpenBookFillEventLog, utils::Config}; use crate::{structs::openbook::OpenBookFillEvent, utils::Config};
use super::parsing::parse_trades_from_openbook_txns; use super::parsing::parse_trades_from_openbook_txns;
pub async fn scrape( pub async fn scrape(
config: &Config, config: &Config,
fill_sender: &Sender<OpenBookFillEventLog>, fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, u8>,
) { ) {
let rpc_client = let rpc_client =
@ -38,7 +38,7 @@ pub async fn scrape_transactions(
rpc_client: &RpcClient, rpc_client: &RpcClient,
before_sig: Option<Signature>, before_sig: Option<Signature>,
limit: Option<usize>, limit: Option<usize>,
fill_sender: &Sender<OpenBookFillEventLog>, fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, u8>,
) -> Option<Signature> { ) -> Option<Signature> {
let rpc_config = GetConfirmedSignaturesForAddress2Config { let rpc_config = GetConfirmedSignaturesForAddress2Config {