diff --git a/src/backfill-candles/main.rs b/src/backfill-candles/main.rs index e7c2b39..9901fe4 100644 --- a/src/backfill-candles/main.rs +++ b/src/backfill-candles/main.rs @@ -53,7 +53,7 @@ async fn main() -> anyhow::Result<()> { async fn save_candles(candles: Vec, client: Object) -> anyhow::Result<()> { if !candles.is_empty() { - let upsert_statement = build_candles_upsert_statement(candles); + let upsert_statement = build_candles_upsert_statement(&candles); client .execute(&upsert_statement, &[]) .await diff --git a/src/backfill-trades/main.rs b/src/backfill-trades/main.rs index 8ac2ec3..d452a90 100644 --- a/src/backfill-trades/main.rs +++ b/src/backfill-trades/main.rs @@ -1,23 +1,28 @@ use anchor_lang::prelude::Pubkey; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; +use deadpool_postgres::Pool; use futures::future::join_all; +use log::debug; use openbook_candles::{ - database::{initialize::connect_to_database, insert::persist_fill_events}, + database::{ + fetch::fetch_worker_transactions, + initialize::{connect_to_database, setup_database}, + insert::{add_fills_atomically, build_transactions_insert_statement}, + }, structs::{ markets::{fetch_market_infos, load_markets}, - openbook::OpenBookFillEvent, + transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS}, }, - utils::Config, + utils::{AnyhowWrap, Config, OPENBOOK_KEY}, worker::trade_fetching::parsing::parse_trades_from_openbook_txns, }; use solana_client::{ nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, - rpc_config::RpcTransactionConfig, rpc_response::RpcConfirmedTransactionStatusWithSignature, + rpc_config::RpcTransactionConfig }; use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature}; use solana_transaction_status::UiTransactionEncoding; -use std::{collections::HashMap, env, str::FromStr}; -use tokio::sync::mpsc::{self, Sender}; +use std::{collections::HashMap, env, str::FromStr,time::Duration as WaitDuration }; #[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { @@ -26,6 +31,8 @@ async fn main() -> anyhow::Result<()> { assert!(args.len() == 2); let path_to_markets_json = &args[1]; + // let num_days = args[2].parse::().unwrap(); // TODO: implement + let num_days = 1; let rpc_url: String = dotenv::var("RPC_URL").unwrap(); let config = Config { @@ -40,147 +47,147 @@ async fn main() -> anyhow::Result<()> { println!("{:?}", target_markets); let pool = connect_to_database().await?; - let (fill_sender, mut fill_receiver) = mpsc::channel::(1000); + setup_database(&pool).await?; - tokio::spawn(async move { - loop { - persist_fill_events(&pool, &mut fill_receiver) + let mut handles = vec![]; + + let rpc_clone = rpc_url.clone(); + let pool_clone = pool.clone(); + handles.push(tokio::spawn(async move { + fetch_signatures(rpc_clone, &pool_clone, num_days).await.unwrap(); + })); + + for id in 0..NUM_TRANSACTION_PARTITIONS { + let rpc_clone = rpc_url.clone(); + let pool_clone = pool.clone(); + let markets_clone = target_markets.clone(); + handles.push(tokio::spawn(async move { + backfill(id as i32, rpc_clone, &pool_clone, &markets_clone) .await .unwrap(); - } - }); + })); + } - backfill(rpc_url, &fill_sender, &target_markets).await?; + // TODO: spawn status thread + + futures::future::join_all(handles).await; + Ok(()) +} + +pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> anyhow::Result<()> { + let mut before_sig: Option = None; + let mut now_time = Utc::now().timestamp(); + let end_time = (Utc::now() - Duration::days(num_days)).timestamp(); + let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); + + while now_time > end_time { + let rpc_config = GetConfirmedSignaturesForAddress2Config { + before: before_sig, + until: None, + limit: None, + commitment: Some(CommitmentConfig::confirmed()), + }; + + let sigs = match rpc_client + .get_signatures_for_address_with_config( + &OPENBOOK_KEY, + rpc_config, + ) + .await + { + Ok(sigs) => sigs, + Err(e) => { + println!("Error fetching signatures: {}", e); + continue; + } + }; + if sigs.is_empty() { + println!("No signatures found, trying again"); + continue; + } + let last = sigs.last().unwrap(); + let last_time = last.block_time.unwrap().clone(); + let last_signature = last.signature.clone(); + let transactions = sigs + .into_iter() + .map(|s| PgTransaction::from_rpc_confirmed_transaction(s)) + .collect::>(); + + if transactions.is_empty() { + println!("No transactions found, trying again"); + } + debug!("writing: {:?} txns to DB\n", transactions.len()); + let upsert_statement = build_transactions_insert_statement(transactions); + let client = pool.get().await?; + client + .execute(&upsert_statement, &[]) + .await + .map_err_anyhow()?; + + now_time = last_time; + before_sig = Some(Signature::from_str(&last_signature)?); + let time_left = backfill_time_left(now_time, end_time); + println!( + "{} minutes ~ {} days remaining in the backfill\n", + time_left.num_minutes(), + time_left.num_days() + ); + } Ok(()) } pub async fn backfill( + worker_id: i32, rpc_url: String, - fill_sender: &Sender, + pool: &Pool, target_markets: &HashMap, ) -> anyhow::Result<()> { - println!("backfill started"); - let mut before_sig: Option = None; - let mut now_time = Utc::now().timestamp(); - let end_time = (Utc::now() - Duration::days(1)).timestamp(); + println!("Worker {} up \n", worker_id); + let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); - let mut handles = vec![]; + loop { + let transactions = fetch_worker_transactions(worker_id, pool).await?; + if transactions.len() == 0 { + println!("No signatures found by worker {}", worker_id); + tokio::time::sleep(WaitDuration::from_secs(1)).await; + continue; + }; - while now_time > end_time { - let rpc_client = - RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); - let maybe_r = get_signatures(&rpc_client, before_sig).await; + // for each signature, fetch the transaction + let txn_config = RpcTransactionConfig { + encoding: Some(UiTransactionEncoding::Json), + commitment: Some(CommitmentConfig::confirmed()), + max_supported_transaction_version: Some(0), + }; - match maybe_r { - Some((last, time, sigs)) => { - now_time = time; - before_sig = Some(last); - let time_left = backfill_time_left(now_time, end_time); - println!( - "{} minutes ~ {} days remaining in the backfill\n", - time_left.num_minutes(), - time_left.num_days() - ); + let sig_strings = transactions + .iter() + .map(|t| t.signature.clone()) + .collect::>(); - let cloned_markets = target_markets.clone(); - let cloned_sender = fill_sender.clone(); - let handle = tokio::spawn(async move { - get_transactions(&rpc_client, sigs, &cloned_sender, &cloned_markets).await; - }); - handles.push(handle); - } - None => {} - } + let signatures: Vec<_> = transactions + .into_iter() + .map(|t| t.signature.parse::().unwrap()) + .collect(); + + let txn_futs: Vec<_> = signatures + .iter() + .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) + .collect(); + + let mut txns = join_all(txn_futs).await; + + let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); + + // Write any fills to the database, and mark the transactions as processed + add_fills_atomically(pool, worker_id, fills, sig_strings).await?; } - futures::future::join_all(handles).await; - - println!("Backfill complete \n"); + // TODO: graceful shutdown + // println!("Worker {} down \n", worker_id); Ok(()) } -pub async fn get_signatures( - rpc_client: &RpcClient, - before_sig: Option, -) -> Option<( - Signature, - i64, - Vec, -)> { - let rpc_config = GetConfirmedSignaturesForAddress2Config { - before: before_sig, - until: None, - limit: None, - commitment: Some(CommitmentConfig::confirmed()), - }; - - let sigs = match rpc_client - .get_signatures_for_address_with_config( - &Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(), - rpc_config, - ) - .await - { - Ok(s) => s, - Err(e) => { - println!("Error in get_signatures_for_address_with_config: {}", e); - return None; - } - }; - - if sigs.is_empty() { - println!("No signatures found"); - return None; - } - let last = sigs.last().unwrap(); - // println!("{:?}", last.block_time.unwrap()); - Some(( - Signature::from_str(&last.signature).unwrap(), - last.block_time.unwrap(), - sigs, - )) -} - -pub async fn get_transactions( - rpc_client: &RpcClient, - mut sigs: Vec, - fill_sender: &Sender, - target_markets: &HashMap, -) { - sigs.retain(|sig| sig.err.is_none()); - if sigs.last().is_none() { - return; - } - - let txn_config = RpcTransactionConfig { - encoding: Some(UiTransactionEncoding::Json), - commitment: Some(CommitmentConfig::confirmed()), - max_supported_transaction_version: Some(0), - }; - - let signatures: Vec<_> = sigs - .into_iter() - .map(|sig| sig.signature.parse::().unwrap()) - .collect(); - - let txn_futs: Vec<_> = signatures - .iter() - .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) - .collect(); - - let mut txns = join_all(txn_futs).await; - - let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); - if !fills.is_empty() { - for fill in fills.into_iter() { - // println!("Sending fill {:?}", fill); - if let Err(_) = fill_sender.send(fill).await { - panic!("receiver dropped"); - } - } - } -} - fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration { let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap(); let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap(); diff --git a/src/database/fetch.rs b/src/database/fetch.rs index 37f6043..36ce206 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -4,6 +4,7 @@ use crate::structs::{ openbook::PgOpenBookFill, resolution::Resolution, trader::PgTrader, + transaction::PgTransaction, }; use chrono::{DateTime, Utc}; use deadpool_postgres::{GenericClient, Pool}; @@ -320,3 +321,23 @@ pub async fn fetch_coingecko_24h_high_low( .map(PgCoinGecko24HighLow::from_row) .collect()) } + +/// Fetches unprocessed, non-error transactions for the specified worker partition. +/// Pulls at most 50 transactions at a time. +pub async fn fetch_worker_transactions( + worker_id: i32, + pool: &Pool, +) -> anyhow::Result> { + let client = pool.get().await?; + + let stmt = r#"SELECT signature, program_pk, block_datetime, slot, err, "processed", worker_partition + FROM transactions + where worker_partition = $1 + and err = false + and processed = false + LIMIT 50"#; + + let rows = client.query(stmt, &[&worker_id]).await?; + + Ok(rows.into_iter().map(PgTransaction::from_row).collect()) +} diff --git a/src/database/initialize.rs b/src/database/initialize.rs index 4bc8a9c..29bc4ce 100644 --- a/src/database/initialize.rs +++ b/src/database/initialize.rs @@ -67,8 +67,9 @@ pub async fn connect_to_database() -> anyhow::Result { pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> { let candles_table_fut = create_candles_table(pool); + let transactions_table_fut = create_transactions_table(pool); let fills_table_fut = create_fills_table(pool); - let result = tokio::try_join!(candles_table_fut, fills_table_fut); + let result = tokio::try_join!(candles_table_fut, transactions_table_fut, fills_table_fut); match result { Ok(_) => { println!("Successfully configured database"); @@ -137,7 +138,8 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { native_qty_received double precision not null, native_fee_or_rebate double precision not null, fee_tier text not null, - order_id text not null + order_id text not null, + log_index int4 not null )", &[], ) @@ -158,3 +160,41 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { .await?; Ok(()) } + +pub async fn create_transactions_table(pool: &Pool) -> anyhow::Result<()> { + let client = pool.get().await?; + + client + .execute( + "CREATE TABLE IF NOT EXISTS transactions ( + signature text NOT NULL, + program_pk text NOT NULL, + block_datetime timestamptz NOT NULL, + slot bigint NOT NULL, + err bool NOT NULL, + processed bool NOT NULL, + worker_partition int4 NOT NULL, + CONSTRAINT transactions_pk PRIMARY KEY (signature, worker_partition) + ) PARTITION BY LIST (worker_partition);", + &[], + ) + .await?; + + client.batch_execute( + "CREATE INDEX IF NOT EXISTS transactions_processed_err_idx ON ONLY transactions (signature) WHERE processed IS NOT TRUE and err IS NOT TRUE; + CREATE INDEX IF NOT EXISTS transactions_program_pk_idx ON ONLY transactions USING btree (program_pk, slot DESC); + + CREATE TABLE IF NOT EXISTS transactions_0 PARTITION OF transactions FOR VALUES IN (0); + CREATE TABLE IF NOT EXISTS transactions_1 PARTITION OF transactions FOR VALUES IN (1); + CREATE TABLE IF NOT EXISTS transactions_2 PARTITION OF transactions FOR VALUES IN (2); + CREATE TABLE IF NOT EXISTS transactions_3 PARTITION OF transactions FOR VALUES IN (3); + CREATE TABLE IF NOT EXISTS transactions_4 PARTITION OF transactions FOR VALUES IN (4); + CREATE TABLE IF NOT EXISTS transactions_5 PARTITION OF transactions FOR VALUES IN (5); + CREATE TABLE IF NOT EXISTS transactions_6 PARTITION OF transactions FOR VALUES IN (6); + CREATE TABLE IF NOT EXISTS transactions_7 PARTITION OF transactions FOR VALUES IN (7); + CREATE TABLE IF NOT EXISTS transactions_8 PARTITION OF transactions FOR VALUES IN (8); + CREATE TABLE IF NOT EXISTS transactions_9 PARTITION OF transactions FOR VALUES IN (9);" + ).await?; + + Ok(()) +} diff --git a/src/database/insert.rs b/src/database/insert.rs index ba95e87..6fabab2 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -7,10 +7,44 @@ use std::{ use tokio::sync::mpsc::{error::TryRecvError, Receiver}; use crate::{ - structs::{candle::Candle, openbook::OpenBookFillEvent}, + structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction}, utils::{to_timestampz, AnyhowWrap}, }; +pub async fn add_fills_atomically( + pool: &Pool, + worker_id: i32, + fills: Vec, + signatures: Vec, +) -> anyhow::Result<()> { + let mut client = pool.get().await?; + + let db_txn = client.build_transaction().start().await?; + + // 1. Insert fills + if fills.len() > 0 { + let fills_statement = build_fills_upsert_statement_not_crazy(fills); + db_txn + .execute(&fills_statement, &[]) + .await + .map_err_anyhow() + .unwrap(); + } + + // 2. Update txns table as processed + let transactions_statement = + build_transactions_processed_update_statement(worker_id, signatures); + db_txn + .execute(&transactions_statement, &[]) + .await + .map_err_anyhow() + .unwrap(); + + db_txn.commit().await?; + + Ok(()) +} + pub async fn persist_fill_events( pool: &Pool, fill_receiver: &mut Receiver, @@ -50,12 +84,12 @@ pub async fn persist_fill_events( #[allow(deprecated)] fn build_fills_upsert_statement(events: HashMap) -> String { - let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES"); + let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); for (idx, event) in events.keys().enumerate() { let mut hasher = DefaultHasher::new(); event.hash(&mut hasher); let val_str = format!( - "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {})", + "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", hasher.finish(), to_timestampz(event.block_time as u64).to_rfc3339(), event.market, @@ -68,6 +102,7 @@ fn build_fills_upsert_statement(events: HashMap) -> Strin event.native_fee_or_rebate, event.fee_tier, event.order_id, + event.log_index, ); if idx == 0 { @@ -77,7 +112,42 @@ fn build_fills_upsert_statement(events: HashMap) -> Strin } } - let handle_conflict = "ON CONFLICT (id) DO UPDATE SET market=excluded.market"; + let handle_conflict = "ON CONFLICT DO NOTHING"; + + stmt = format!("{} {}", stmt, handle_conflict); + stmt +} + +fn build_fills_upsert_statement_not_crazy(fills: Vec) -> String { + let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); + for (idx, fill) in fills.iter().enumerate() { + let mut hasher = DefaultHasher::new(); + fill.hash(&mut hasher); + let val_str = format!( + "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", + hasher.finish(), + to_timestampz(fill.block_time as u64).to_rfc3339(), + fill.market, + fill.open_orders, + fill.open_orders_owner, + fill.bid, + fill.maker, + fill.native_qty_paid, + fill.native_qty_received, + fill.native_fee_or_rebate, + fill.fee_tier, + fill.order_id, + fill.log_index, + ); + + if idx == 0 { + stmt = format!("{} {}", &stmt, val_str); + } else { + stmt = format!("{}, {}", &stmt, val_str); + } + } + + let handle_conflict = "ON CONFLICT DO NOTHING"; stmt = format!("{} {}", stmt, handle_conflict); stmt @@ -121,6 +191,57 @@ pub fn build_candles_upsert_statement(candles: &Vec) -> String { stmt } +pub fn build_transactions_insert_statement(transactions: Vec) -> String { + let mut stmt = String::from("INSERT INTO transactions (signature, program_pk, block_datetime, slot, err, processed, worker_partition) VALUES"); + for (idx, txn) in transactions.iter().enumerate() { + let val_str = format!( + "(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {})", + txn.signature, + txn.program_pk, + txn.block_datetime.to_rfc3339(), + txn.slot, + txn.err, + txn.processed, + txn.worker_partition, + ); + + if idx == 0 { + stmt = format!("{} {}", &stmt, val_str); + } else { + stmt = format!("{}, {}", &stmt, val_str); + } + } + + let handle_conflict = "ON CONFLICT DO NOTHING"; + + stmt = format!("{} {}", stmt, handle_conflict); + stmt +} + +pub fn build_transactions_processed_update_statement( + worker_id: i32, + processed_signatures: Vec, +) -> String { + let mut stmt = String::from( + "UPDATE transactions + SET processed = true + WHERE transactions.signature IN (", + ); + for (idx, sig) in processed_signatures.iter().enumerate() { + let val_str = if idx == processed_signatures.len() - 1 { + format!("\'{}\'", sig,) + } else { + format!("\'{}\',", sig,) + }; + stmt = format!("{} {}", &stmt, val_str); + } + + let worker_stmt = format!(") AND worker_partition = {} ", worker_id); + + stmt = format!("{} {}", stmt, worker_stmt); + stmt +} + #[cfg(test)] mod tests { use super::*; @@ -145,6 +266,7 @@ mod tests { client_order_id: None, referrer_rebate: Some(841), block_time: 0, + log_index: 1, }; let event_2 = OpenBookFillEvent { @@ -163,6 +285,7 @@ mod tests { client_order_id: None, referrer_rebate: Some(841), block_time: 0, + log_index: 1, }; let mut h1 = DefaultHasher::new(); diff --git a/src/structs/mod.rs b/src/structs/mod.rs index 113578c..e161e2c 100644 --- a/src/structs/mod.rs +++ b/src/structs/mod.rs @@ -6,3 +6,4 @@ pub mod resolution; pub mod slab; pub mod trader; pub mod tradingview; +pub mod transaction; diff --git a/src/structs/openbook.rs b/src/structs/openbook.rs index 9fd65ec..4e35bb5 100644 --- a/src/structs/openbook.rs +++ b/src/structs/openbook.rs @@ -22,7 +22,7 @@ pub struct OpenBookFillEventRaw { pub referrer_rebate: Option, } impl OpenBookFillEventRaw { - pub fn with_time(self, block_time: i64) -> OpenBookFillEvent { + pub fn into_event(self, block_time: i64, log_index: usize) -> OpenBookFillEvent { OpenBookFillEvent { market: self.market, open_orders: self.open_orders, @@ -38,6 +38,7 @@ impl OpenBookFillEventRaw { client_order_id: self.client_order_id, referrer_rebate: self.referrer_rebate, block_time, + log_index, } } } @@ -59,6 +60,7 @@ pub struct OpenBookFillEvent { pub client_order_id: Option, pub referrer_rebate: Option, pub block_time: i64, + pub log_index: usize } #[derive(Copy, Clone, Debug, PartialEq)] diff --git a/src/structs/transaction.rs b/src/structs/transaction.rs new file mode 100644 index 0000000..47909ad --- /dev/null +++ b/src/structs/transaction.rs @@ -0,0 +1,52 @@ +use chrono::{DateTime, Utc}; +use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature; +use tokio_postgres::Row; + +use crate::utils::{to_timestampz, OPENBOOK_KEY}; + +#[derive(Clone, Debug, PartialEq)] +pub struct PgTransaction { + pub signature: String, + pub program_pk: String, + pub block_datetime: DateTime, + pub slot: u64, + pub err: bool, + pub processed: bool, + pub worker_partition: i32, +} + +pub const NUM_TRANSACTION_PARTITIONS: u64 = 10; + +impl PgTransaction { + pub fn from_rpc_confirmed_transaction( + rpc_confirmed_transaction: RpcConfirmedTransactionStatusWithSignature, + ) -> Self { + PgTransaction { + signature: rpc_confirmed_transaction.signature, + program_pk: OPENBOOK_KEY.to_string(), + block_datetime: to_timestampz(rpc_confirmed_transaction.block_time.unwrap() as u64), + slot: rpc_confirmed_transaction.slot, + err: rpc_confirmed_transaction.err.is_some(), + processed: false, + worker_partition: (rpc_confirmed_transaction.slot % NUM_TRANSACTION_PARTITIONS) as i32, + } + } + + pub fn from_row(row: Row) -> Self { + let slot_raw = row.get::(3); + PgTransaction { + signature: row.get(0), + program_pk: row.get(1), + block_datetime: row.get(2), + slot: slot_raw as u64, + err: row.get(4), + processed: row.get(5), + worker_partition: row.get(6), + } + } +} + +pub enum ProcessState { + Processed, + Unprocessed, +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index aab0aec..2ce6bf6 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,9 +1,13 @@ +use anchor_lang::prelude::Pubkey; use chrono::{NaiveDateTime, Utc}; use deadpool_postgres::Pool; use serde_derive::Deserialize; +use solana_sdk::pubkey; use crate::structs::markets::MarketInfo; +pub const OPENBOOK_KEY: Pubkey = pubkey!("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX"); + pub trait AnyhowWrap { type Value; fn map_err_anyhow(self) -> anyhow::Result; diff --git a/src/worker/main.rs b/src/worker/main.rs index 996cc41..77581f9 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -9,7 +9,7 @@ use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::{ database::{ initialize::{connect_to_database, setup_database}, - insert::{persist_fill_events}, + insert::persist_fill_events, }, worker::candle_batching::batch_for_market, }; diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs index a31ce5c..3f314ae 100644 --- a/src/worker/trade_fetching/parsing.rs +++ b/src/worker/trade_fetching/parsing.rs @@ -55,7 +55,7 @@ fn parse_openbook_fills_from_logs( block_time: i64, ) -> Option> { let mut fills_vector = Vec::::new(); - for l in logs { + for (idx, l) in logs.iter().enumerate() { match l.strip_prefix(PROGRAM_DATA) { Some(log) => { let borsh_bytes = match anchor_lang::__private::base64::decode(log) { @@ -68,7 +68,7 @@ fn parse_openbook_fills_from_logs( match event { Ok(e) => { - let fill_event = e.with_time(block_time); + let fill_event = e.into_event(block_time, idx); if target_markets.contains_key(&fill_event.market) { fills_vector.push(fill_event); } diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs index bbba445..fcd89c6 100644 --- a/src/worker/trade_fetching/scrape.rs +++ b/src/worker/trade_fetching/scrape.rs @@ -11,7 +11,7 @@ use tokio::sync::mpsc::Sender; use crate::{ structs::openbook::OpenBookFillEvent, - utils::Config, + utils::{Config, OPENBOOK_KEY}, worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL}, }; @@ -55,7 +55,7 @@ pub async fn scrape_transactions( let mut sigs = match rpc_client .get_signatures_for_address_with_config( - &Pubkey::from_str("srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX").unwrap(), + &OPENBOOK_KEY, rpc_config, ) .await