From f74cec5d1344da248247f32e4f114892cfb2fa06 Mon Sep 17 00:00:00 2001 From: Lou-Kamades Date: Wed, 14 Jun 2023 02:19:27 -0500 Subject: [PATCH] refactor: remove scraper and use existing db schema --- .gitignore | 2 + Cargo.toml | 4 - src/backfill-trades/main.rs | 141 ------------- src/database/fetch.rs | 161 +++++++-------- src/database/initialize.rs | 95 +-------- src/database/insert.rs | 201 +------------------ src/structs/mod.rs | 1 - src/structs/openbook.rs | 107 +--------- src/structs/trader.rs | 8 +- src/structs/transaction.rs | 52 ----- src/worker/candle_batching/minute_candles.rs | 18 +- src/worker/candle_batching/mod.rs | 2 +- src/worker/main.rs | 21 -- src/worker/mod.rs | 1 - src/worker/trade_fetching/mod.rs | 2 - src/worker/trade_fetching/parsing.rs | 91 --------- src/worker/trade_fetching/scrape.rs | 121 ----------- 17 files changed, 95 insertions(+), 933 deletions(-) delete mode 100644 src/backfill-trades/main.rs delete mode 100644 src/structs/transaction.rs delete mode 100644 src/worker/trade_fetching/mod.rs delete mode 100644 src/worker/trade_fetching/parsing.rs delete mode 100644 src/worker/trade_fetching/scrape.rs diff --git a/.gitignore b/.gitignore index fedaa2b..8d062d8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /target .env +*.cer* +*.pks* \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index a4b3607..6604464 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,10 +15,6 @@ path = "src/worker/main.rs" name = "server" path = "src/server/main.rs" -[[bin]] -name = "backfill-trades" -path = "src/backfill-trades/main.rs" - [[bin]] name = "backfill-candles" path = "src/backfill-candles/main.rs" diff --git a/src/backfill-trades/main.rs b/src/backfill-trades/main.rs deleted file mode 100644 index c0a73b5..0000000 --- a/src/backfill-trades/main.rs +++ /dev/null @@ -1,141 +0,0 @@ -use anchor_lang::prelude::Pubkey; -use chrono::{DateTime, Duration, NaiveDateTime, Utc}; -use deadpool_postgres::Pool; -use log::debug; -use openbook_candles::{ - database::{ - initialize::{connect_to_database, setup_database}, - insert::build_transactions_insert_statement, - }, - structs::{ - markets::{fetch_market_infos, load_markets}, - transaction::{PgTransaction, NUM_TRANSACTION_PARTITIONS}, - }, - utils::{AnyhowWrap, Config, OPENBOOK_KEY}, - worker::trade_fetching::scrape::scrape_fills, -}; -use solana_client::{ - nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, -}; -use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature}; -use std::{collections::HashMap, env, str::FromStr}; - -#[tokio::main(flavor = "multi_thread", worker_threads = 10)] -async fn main() -> anyhow::Result<()> { - dotenv::dotenv().ok(); - let args: Vec = env::args().collect(); - assert!(args.len() == 2); - - let path_to_markets_json = &args[1]; - // let num_days = args[2].parse::().unwrap(); // TODO: implement - let num_days = 1; - let rpc_url: String = dotenv::var("RPC_URL").unwrap(); - - let config = Config { - rpc_url: rpc_url.clone(), - }; - let markets = load_markets(path_to_markets_json); - let market_infos = fetch_market_infos(&config, markets.clone()).await?; - let mut target_markets = HashMap::new(); - for m in market_infos.clone() { - target_markets.insert(Pubkey::from_str(&m.address)?, m.name); - } - println!("{:?}", target_markets); - - let pool = connect_to_database().await?; - setup_database(&pool).await?; - - let mut handles = vec![]; - - let rpc_clone = rpc_url.clone(); - let pool_clone = pool.clone(); - 
handles.push(tokio::spawn(async move { - fetch_signatures(rpc_clone, &pool_clone, num_days) - .await - .unwrap(); - })); - - // Low priority improvement: batch fills into 1000's per worker - for id in 0..NUM_TRANSACTION_PARTITIONS { - let rpc_clone = rpc_url.clone(); - let pool_clone = pool.clone(); - let markets_clone = target_markets.clone(); - handles.push(tokio::spawn(async move { - scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone) - .await - .unwrap(); - })); - } - - // TODO: spawn status thread - - futures::future::join_all(handles).await; - Ok(()) -} - -pub async fn fetch_signatures(rpc_url: String, pool: &Pool, num_days: i64) -> anyhow::Result<()> { - let mut before_sig: Option = None; - let mut now_time = Utc::now().timestamp(); - let end_time = (Utc::now() - Duration::days(num_days)).timestamp(); - let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); - - while now_time > end_time { - let rpc_config = GetConfirmedSignaturesForAddress2Config { - before: before_sig, - until: None, - limit: None, - commitment: Some(CommitmentConfig::confirmed()), - }; - - let sigs = match rpc_client - .get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config) - .await - { - Ok(sigs) => sigs, - Err(e) => { - println!("Error fetching signatures: {}", e); - continue; - } - }; - if sigs.is_empty() { - println!("No signatures found, trying again"); - continue; - } - let last = sigs.last().unwrap(); - let last_time = last.block_time.unwrap().clone(); - let last_signature = last.signature.clone(); - let transactions = sigs - .into_iter() - .map(|s| PgTransaction::from_rpc_confirmed_transaction(s)) - .collect::>(); - - if transactions.is_empty() { - println!("No transactions found, trying again"); - } - debug!("writing: {:?} txns to DB\n", transactions.len()); - let upsert_statement = build_transactions_insert_statement(transactions); - let client = pool.get().await?; - client - .execute(&upsert_statement, &[]) - .await - .map_err_anyhow()?; - - now_time = last_time; - before_sig = Some(Signature::from_str(&last_signature)?); - let time_left = backfill_time_left(now_time, end_time); - println!( - "{} minutes ~ {} days remaining in the backfill\n", - time_left.num_minutes(), - time_left.num_days() - ); - } - Ok(()) -} - -fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration { - let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap(); - let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap(); - let cur_date = DateTime::::from_utc(naive_cur, Utc); - let bf_date = DateTime::::from_utc(naive_bf, Utc); - cur_date - bf_date -} diff --git a/src/database/fetch.rs b/src/database/fetch.rs index 36ce206..b7f3162 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -4,7 +4,6 @@ use crate::structs::{ openbook::PgOpenBookFill, resolution::Resolution, trader::PgTrader, - transaction::PgTransaction, }; use chrono::{DateTime, Utc}; use deadpool_postgres::{GenericClient, Pool}; @@ -16,13 +15,12 @@ pub async fn fetch_earliest_fill( let client = pool.get().await?; let stmt = r#"SELECT - time as "time!", - bid as "bid!", - maker as "maker!", - native_qty_paid as "native_qty_paid!", - native_qty_received as "native_qty_received!", - native_fee_or_rebate as "native_fee_or_rebate!" 
- from fills + block_datetime as "time", + bid as "bid", + maker as "maker", + price as "price", + size as "size" + from openbook.openbook_fill_events where market = $1 and maker = true ORDER BY time asc LIMIT 1"#; @@ -44,16 +42,15 @@ pub async fn fetch_fills_from( let client = pool.get().await?; let stmt = r#"SELECT - time as "time!", - bid as "bid!", - maker as "maker!", - native_qty_paid as "native_qty_paid!", - native_qty_received as "native_qty_received!", - native_fee_or_rebate as "native_fee_or_rebate!" - from fills + block_datetime as "time", + bid as "bid", + maker as "maker", + price as "price", + size as "size" + from openbook.openbook_fill_events where market = $1 - and time >= $2::timestamptz - and time < $3::timestamptz + and block_datetime >= $2::timestamptz + and block_datetime < $3::timestamptz and maker = true ORDER BY time asc"#; @@ -71,17 +68,17 @@ pub async fn fetch_latest_finished_candle( let client = pool.get().await?; let stmt = r#"SELECT - market_name as "market_name!", - start_time as "start_time!", - end_time as "end_time!", - resolution as "resolution!", - open as "open!", - close as "close!", - high as "high!", - low as "low!", - volume as "volume!", - complete as "complete!" - from candles + market_name as "market_name", + start_time as "start_time", + end_time as "end_time", + resolution as "resolution", + open as "open", + close as "close", + high as "high", + low as "low", + volume as "volume", + complete as "complete" + from openbook.candles where market_name = $1 and resolution = $2 and complete = true @@ -107,17 +104,17 @@ pub async fn fetch_earliest_candles( let client = pool.get().await?; let stmt = r#"SELECT - market_name as "market_name!", - start_time as "start_time!", - end_time as "end_time!", + market_name as "market_name", + start_time as "start_time", + end_time as "end_time", resolution as "resolution!", - open as "open!", - close as "close!", - high as "high!", - low as "low!", - volume as "volume!", - complete as "complete!" - from candles + open as "open", + close as "close", + high as "high", + low as "low", + volume as "volume", + complete as "complete" + from openbook.candles where market_name = $1 and resolution = $2 ORDER BY start_time asc"#; @@ -139,17 +136,17 @@ pub async fn fetch_candles_from( let client = pool.get().await?; let stmt = r#"SELECT - market_name as "market_name!", - start_time as "start_time!", - end_time as "end_time!", - resolution as "resolution!", - open as "open!", - close as "close!", - high as "high!", - low as "low!", - volume as "volume!", - complete as "complete!" - from candles + market_name as "market_name", + start_time as "start_time", + end_time as "end_time", + resolution as "resolution", + open as "open", + close as "close", + high as "high", + low as "low", + volume as "volume", + complete as "complete" + from openbook.candles where market_name = $1 and resolution = $2 and start_time >= $3 @@ -182,20 +179,20 @@ pub async fn fetch_top_traders_by_base_volume_from( let stmt = r#"SELECT open_orders_owner, sum( - native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END - ) as "raw_ask_size!", + native_quantity_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END + ) as "raw_ask_size", sum( - native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END - ) as "raw_bid_size!" 
- FROM fills + native_quantity_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END + ) as "raw_bid_size" + FROM openbook.openbook_fill_events WHERE market = $1 AND time >= $2 AND time < $3 GROUP BY open_orders_owner ORDER BY - sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END) + sum(native_quantity_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END) + - sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) + sum(native_quantity_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) DESC LIMIT 10000"#; @@ -217,20 +214,20 @@ pub async fn fetch_top_traders_by_quote_volume_from( let stmt = r#"SELECT open_orders_owner, sum( - native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END - ) as "raw_ask_size!", + native_quantity_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END + ) as "raw_ask_size", sum( - native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END - ) as "raw_bid_size!" - FROM fills + native_quantity_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END + ) as "raw_bid_size" + FROM openbook.openbook_fill_events WHERE market = $1 AND time >= $2 AND time < $3 GROUP BY open_orders_owner ORDER BY - sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END) + sum(native_quantity_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END) + - sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) + sum(native_quantity_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) DESC LIMIT 10000"#; @@ -249,20 +246,20 @@ pub async fn fetch_coingecko_24h_volume( let stmt = r#"SELECT t1.market, - COALESCE(t2.native_qty_received, 0) as "raw_base_size!", - COALESCE(t2.native_qty_paid, 0) as "raw_quote_size!" + COALESCE(t2.native_quantity_received, 0) as "raw_base_size", + COALESCE(t2.native_quantity_paid, 0) as "raw_quote_size" FROM ( SELECT distinct on (market) * - FROM fills f + FROM openbook.openbook_fill_events f where bid = true and market = any($1) order by market, "time" desc ) t1 LEFT JOIN ( select market, - sum(native_qty_received) as "native_qty_received", - sum(native_qty_paid) as "native_qty_paid" - from fills + sum(native_quantity_received) as "native_quantity_received", + sum(native_quantity_paid) as "native_quantity_paid" + from openbook.openbook_fill_events where "time" >= current_timestamp - interval '1 day' and bid = true group by market @@ -291,10 +288,10 @@ pub async fn fetch_coingecko_24h_high_low( ( SELECT * from - candles + openbook.candles where (market_name, start_time, resolution) in ( select market_name, max(start_time), resolution - from candles + from openbook.candles where "resolution" = '1M' and market_name = any($1) group by market_name, resolution @@ -307,7 +304,7 @@ pub async fn fetch_coingecko_24h_high_low( max(high) as "high", min(low) as "low" from - candles + openbook.candles where "resolution" = '1M' and "start_time" >= current_timestamp - interval '1 day' @@ -321,23 +318,3 @@ pub async fn fetch_coingecko_24h_high_low( .map(PgCoinGecko24HighLow::from_row) .collect()) } - -/// Fetches unprocessed, non-error transactions for the specified worker partition. -/// Pulls at most 50 transactions at a time. 
-pub async fn fetch_worker_transactions( - worker_id: i32, - pool: &Pool, -) -> anyhow::Result> { - let client = pool.get().await?; - - let stmt = r#"SELECT signature, program_pk, block_datetime, slot, err, "processed", worker_partition - FROM transactions - where worker_partition = $1 - and err = false - and processed = false - LIMIT 50"#; - - let rows = client.query(stmt, &[&worker_id]).await?; - - Ok(rows.into_iter().map(PgTransaction::from_row).collect()) -} diff --git a/src/database/initialize.rs b/src/database/initialize.rs index 6d032db..cd33e67 100644 --- a/src/database/initialize.rs +++ b/src/database/initialize.rs @@ -21,8 +21,8 @@ pub async fn connect_to_database() -> anyhow::Result { // openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks // base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64 - // fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills - // fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills + // fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a APP-NAME + // fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a APP-NAME let tls = if pg_config.pg_use_ssl { pg_config.pg.ssl_mode = Some(SslMode::Require); let ca_cert = fs::read(pg_config.pg_ca_cert_path.expect("reading ca cert from env")) @@ -66,11 +66,7 @@ pub async fn connect_to_database() -> anyhow::Result { } pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> { - let candles_table_fut = create_candles_table(pool); - let transactions_table_fut = create_transactions_table(pool); - let fills_table_fut = create_fills_table(pool); - let result = tokio::try_join!(candles_table_fut, transactions_table_fut, fills_table_fut); - match result { + match create_candles_table(pool).await { Ok(_) => { println!("Successfully configured database"); Ok(()) @@ -87,7 +83,7 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> { client .execute( - "CREATE TABLE IF NOT EXISTS candles ( + "CREATE TABLE IF NOT EXISTS openbook.candles ( id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY, market_name text, start_time timestamptz, @@ -105,90 +101,9 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> { .await?; client.execute( - "CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)", + "CREATE UNIQUE INDEX idx_market_time_resolution ON openbook.candles USING btree (market_name, start_time, resolution);", &[] ).await?; - client.execute( - "DO $$ - BEGIN - IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN - ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution); - END IF; - END $$", &[] - ).await?; - - Ok(()) -} - -pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { - let client = pool.get().await?; - - client - .execute( - "CREATE TABLE IF NOT EXISTS fills ( - signature text not null, - time timestamptz not null, - market text not null, - open_orders text not null, - open_orders_owner text not null, - bid bool not null, - maker bool not null, - native_qty_paid double precision not null, - native_qty_received double precision not null, - native_fee_or_rebate double precision not null, - fee_tier text not null, - order_id text not null, - log_index int4 not null, - CONSTRAINT fills_pk PRIMARY KEY (signature, log_index) - )", - &[], - ) - .await?; - - client - .execute( - "CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)", - &[], - ) - .await?; - Ok(()) -} - -pub async fn 
create_transactions_table(pool: &Pool) -> anyhow::Result<()> { - let client = pool.get().await?; - - client - .execute( - "CREATE TABLE IF NOT EXISTS transactions ( - signature text NOT NULL, - program_pk text NOT NULL, - block_datetime timestamptz NOT NULL, - slot bigint NOT NULL, - err bool NOT NULL, - processed bool NOT NULL, - worker_partition int4 NOT NULL, - CONSTRAINT transactions_pk PRIMARY KEY (signature, worker_partition) - ) PARTITION BY LIST (worker_partition);", - &[], - ) - .await?; - - client.batch_execute( - "CREATE INDEX IF NOT EXISTS transactions_processed_err_idx ON ONLY transactions (signature) WHERE processed IS NOT TRUE and err IS NOT TRUE; - CREATE INDEX IF NOT EXISTS transactions_program_pk_idx ON ONLY transactions USING btree (program_pk, slot DESC); - - CREATE TABLE IF NOT EXISTS transactions_0 PARTITION OF transactions FOR VALUES IN (0); - CREATE TABLE IF NOT EXISTS transactions_1 PARTITION OF transactions FOR VALUES IN (1); - CREATE TABLE IF NOT EXISTS transactions_2 PARTITION OF transactions FOR VALUES IN (2); - CREATE TABLE IF NOT EXISTS transactions_3 PARTITION OF transactions FOR VALUES IN (3); - CREATE TABLE IF NOT EXISTS transactions_4 PARTITION OF transactions FOR VALUES IN (4); - CREATE TABLE IF NOT EXISTS transactions_5 PARTITION OF transactions FOR VALUES IN (5); - CREATE TABLE IF NOT EXISTS transactions_6 PARTITION OF transactions FOR VALUES IN (6); - CREATE TABLE IF NOT EXISTS transactions_7 PARTITION OF transactions FOR VALUES IN (7); - CREATE TABLE IF NOT EXISTS transactions_8 PARTITION OF transactions FOR VALUES IN (8); - CREATE TABLE IF NOT EXISTS transactions_9 PARTITION OF transactions FOR VALUES IN (9);" - ).await?; - Ok(()) } diff --git a/src/database/insert.rs b/src/database/insert.rs index 6fe01ff..52aba85 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -1,153 +1,7 @@ -use deadpool_postgres::Pool; -use log::debug; -use std::collections::HashMap; -use tokio::sync::mpsc::{error::TryRecvError, Receiver}; - -use crate::{ - structs::{candle::Candle, openbook::OpenBookFillEvent, transaction::PgTransaction}, - utils::{to_timestampz, AnyhowWrap}, -}; - -pub async fn insert_fills_atomically( - pool: &Pool, - worker_id: i32, - fills: Vec, - signatures: Vec, -) -> anyhow::Result<()> { - let mut client = pool.get().await?; - - let db_txn = client.build_transaction().start().await?; - - // 1. Insert fills - if fills.len() > 0 { - let fills_statement = build_fills_upsert_statement_not_crazy(fills); - db_txn - .execute(&fills_statement, &[]) - .await - .map_err_anyhow() - .unwrap(); - } - - // 2. 
Update txns table as processed - let transactions_statement = - build_transactions_processed_update_statement(worker_id, signatures); - db_txn - .execute(&transactions_statement, &[]) - .await - .map_err_anyhow() - .unwrap(); - - db_txn.commit().await?; - - Ok(()) -} - -pub async fn persist_fill_events( - pool: &Pool, - fill_receiver: &mut Receiver, -) -> anyhow::Result<()> { - loop { - let mut write_batch = HashMap::new(); - while write_batch.len() < 10 { - match fill_receiver.try_recv() { - Ok(event) => { - write_batch.entry(event).or_insert(0); - } - Err(TryRecvError::Empty) => { - if !write_batch.is_empty() { - break; - } else { - continue; - } - } - Err(TryRecvError::Disconnected) => { - panic!("Fills sender must stay alive") - } - }; - } - - if !write_batch.is_empty() { - debug!("writing: {:?} events to DB\n", write_batch.len()); - let upsert_statement = build_fills_upsert_statement(write_batch); - let client = pool.get().await?; - client - .execute(&upsert_statement, &[]) - .await - .map_err_anyhow() - .unwrap(); - } - } -} - -#[allow(deprecated)] -fn build_fills_upsert_statement(events: HashMap) -> String { - let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); - for (idx, event) in events.keys().enumerate() { - let val_str = format!( - "({}, \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", - event.signature, - to_timestampz(event.block_time as u64).to_rfc3339(), - event.market, - event.open_orders, - event.open_orders_owner, - event.bid, - event.maker, - event.native_qty_paid, - event.native_qty_received, - event.native_fee_or_rebate, - event.fee_tier, - event.order_id, - event.log_index, - ); - - if idx == 0 { - stmt = format!("{} {}", &stmt, val_str); - } else { - stmt = format!("{}, {}", &stmt, val_str); - } - } - - let handle_conflict = "ON CONFLICT DO NOTHING"; - - stmt = format!("{} {}", stmt, handle_conflict); - stmt -} - -fn build_fills_upsert_statement_not_crazy(fills: Vec) -> String { - let mut stmt = String::from("INSERT INTO fills (signature, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id, log_index) VALUES"); - for (idx, fill) in fills.iter().enumerate() { - let val_str = format!( - "(\'{}\', \'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {}, {}, {})", - fill.signature, - to_timestampz(fill.block_time as u64).to_rfc3339(), - fill.market, - fill.open_orders, - fill.open_orders_owner, - fill.bid, - fill.maker, - fill.native_qty_paid, - fill.native_qty_received, - fill.native_fee_or_rebate, - fill.fee_tier, - fill.order_id, - fill.log_index, - ); - - if idx == 0 { - stmt = format!("{} {}", &stmt, val_str); - } else { - stmt = format!("{}, {}", &stmt, val_str); - } - } - - let handle_conflict = "ON CONFLICT DO NOTHING"; - - stmt = format!("{} {}", stmt, handle_conflict); - stmt -} +use crate::structs::candle::Candle; pub fn build_candles_upsert_statement(candles: &Vec) -> String { - let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES"); + let mut stmt = String::from("INSERT INTO openbook.candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES"); for (idx, candle) in candles.iter().enumerate() { let val_str = format!( "(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, 
{}, {}, {}, {})", @@ -183,54 +37,3 @@ pub fn build_candles_upsert_statement(candles: &Vec) -> String { stmt = format!("{} {}", stmt, handle_conflict); stmt } - -pub fn build_transactions_insert_statement(transactions: Vec) -> String { - let mut stmt = String::from("INSERT INTO transactions (signature, program_pk, block_datetime, slot, err, processed, worker_partition) VALUES"); - for (idx, txn) in transactions.iter().enumerate() { - let val_str = format!( - "(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {})", - txn.signature, - txn.program_pk, - txn.block_datetime.to_rfc3339(), - txn.slot, - txn.err, - txn.processed, - txn.worker_partition, - ); - - if idx == 0 { - stmt = format!("{} {}", &stmt, val_str); - } else { - stmt = format!("{}, {}", &stmt, val_str); - } - } - - let handle_conflict = "ON CONFLICT DO NOTHING"; - - stmt = format!("{} {}", stmt, handle_conflict); - stmt -} - -pub fn build_transactions_processed_update_statement( - worker_id: i32, - processed_signatures: Vec, -) -> String { - let mut stmt = String::from( - "UPDATE transactions - SET processed = true - WHERE transactions.signature IN (", - ); - for (idx, sig) in processed_signatures.iter().enumerate() { - let val_str = if idx == processed_signatures.len() - 1 { - format!("\'{}\'", sig,) - } else { - format!("\'{}\',", sig,) - }; - stmt = format!("{} {}", &stmt, val_str); - } - - let worker_stmt = format!(") AND worker_partition = {} ", worker_id); - - stmt = format!("{} {}", stmt, worker_stmt); - stmt -} diff --git a/src/structs/mod.rs b/src/structs/mod.rs index e161e2c..113578c 100644 --- a/src/structs/mod.rs +++ b/src/structs/mod.rs @@ -6,4 +6,3 @@ pub mod resolution; pub mod slab; pub mod trader; pub mod tradingview; -pub mod transaction; diff --git a/src/structs/openbook.rs b/src/structs/openbook.rs index d3c0c0b..9fdf699 100644 --- a/src/structs/openbook.rs +++ b/src/structs/openbook.rs @@ -1,83 +1,15 @@ -use anchor_lang::{event, AnchorDeserialize, AnchorSerialize}; +use anchor_lang::AnchorDeserialize; use chrono::{DateTime, Utc}; use num_traits::Pow; -use solana_sdk::pubkey::Pubkey; use tokio_postgres::Row; -#[event] -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct OpenBookFillEventRaw { - pub market: Pubkey, - pub open_orders: Pubkey, - pub open_orders_owner: Pubkey, - pub bid: bool, - pub maker: bool, - pub native_qty_paid: u64, - pub native_qty_received: u64, - pub native_fee_or_rebate: u64, - pub order_id: u128, - pub owner_slot: u8, - pub fee_tier: u8, - pub client_order_id: Option, - pub referrer_rebate: Option, -} -impl OpenBookFillEventRaw { - pub fn into_event( - self, - signature: String, - block_time: i64, - log_index: usize, - ) -> OpenBookFillEvent { - OpenBookFillEvent { - signature, - market: self.market, - open_orders: self.open_orders, - open_orders_owner: self.open_orders_owner, - bid: self.bid, - maker: self.maker, - native_qty_paid: self.native_qty_paid, - native_qty_received: self.native_qty_received, - native_fee_or_rebate: self.native_fee_or_rebate, - order_id: self.order_id, - owner_slot: self.owner_slot, - fee_tier: self.fee_tier, - client_order_id: self.client_order_id, - referrer_rebate: self.referrer_rebate, - block_time, - log_index, - } - } -} - -#[event] -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct OpenBookFillEvent { - pub signature: String, - pub market: Pubkey, - pub open_orders: Pubkey, - pub open_orders_owner: Pubkey, - pub bid: bool, - pub maker: bool, - pub native_qty_paid: u64, - pub native_qty_received: u64, - pub native_fee_or_rebate: u64, - pub order_id: 
u128, - pub owner_slot: u8, - pub fee_tier: u8, - pub client_order_id: Option, - pub referrer_rebate: Option, - pub block_time: i64, - pub log_index: usize, -} - #[derive(Copy, Clone, Debug, PartialEq)] pub struct PgOpenBookFill { pub time: DateTime, pub bid: bool, pub maker: bool, - pub native_qty_paid: f64, - pub native_qty_received: f64, - pub native_fee_or_rebate: f64, + pub price: f64, + pub size: f64, } impl PgOpenBookFill { pub fn from_row(row: Row) -> Self { @@ -85,9 +17,8 @@ impl PgOpenBookFill { time: row.get(0), bid: row.get(1), maker: row.get(2), - native_qty_paid: row.get(3), - native_qty_received: row.get(4), - native_fee_or_rebate: row.get(5), + price: row.get(3), + size: row.get(4), } } } @@ -147,34 +78,6 @@ pub struct MarketState { pub referrer_rebates_accrued: u64, } -pub fn calculate_fill_price_and_size( - fill: PgOpenBookFill, - base_decimals: u8, - quote_decimals: u8, -) -> (f64, f64) { - if fill.bid { - let price_before_fees = if fill.maker { - fill.native_qty_paid + fill.native_fee_or_rebate - } else { - fill.native_qty_paid - fill.native_fee_or_rebate - }; - let price = (price_before_fees * token_factor(base_decimals)) - / (token_factor(quote_decimals) * fill.native_qty_received); - let size = fill.native_qty_received / token_factor(base_decimals); - (price, size) - } else { - let price_before_fees = if fill.maker { - fill.native_qty_received - fill.native_fee_or_rebate - } else { - fill.native_qty_received + fill.native_fee_or_rebate - }; - let price = (price_before_fees * token_factor(base_decimals)) - / (token_factor(quote_decimals) * fill.native_qty_paid); - let size = fill.native_qty_paid / token_factor(base_decimals); - (price, size) - } -} - pub fn token_factor(decimals: u8) -> f64 { 10f64.pow(decimals as f64) } diff --git a/src/structs/trader.rs b/src/structs/trader.rs index be0b82e..213e79f 100644 --- a/src/structs/trader.rs +++ b/src/structs/trader.rs @@ -9,8 +9,8 @@ use super::openbook::token_factor; #[derive(Clone, Debug, PartialEq)] pub struct PgTrader { pub open_orders_owner: String, - pub raw_ask_size: f64, - pub raw_bid_size: f64, + pub raw_ask_size: i64, + pub raw_bid_size: i64, } impl PgTrader { pub fn from_row(row: Row) -> Self { @@ -52,8 +52,8 @@ pub struct TraderResponse { // Note that the Postgres queries only return volumes in base or quote pub fn calculate_trader_volume(trader: PgTrader, decimals: u8) -> Trader { - let bid_size = trader.raw_bid_size / token_factor(decimals); - let ask_size = trader.raw_ask_size / token_factor(decimals); + let bid_size = (trader.raw_bid_size as f64) / token_factor(decimals); + let ask_size = (trader.raw_ask_size as f64) / token_factor(decimals); Trader { pubkey: trader.open_orders_owner, diff --git a/src/structs/transaction.rs b/src/structs/transaction.rs deleted file mode 100644 index 47909ad..0000000 --- a/src/structs/transaction.rs +++ /dev/null @@ -1,52 +0,0 @@ -use chrono::{DateTime, Utc}; -use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature; -use tokio_postgres::Row; - -use crate::utils::{to_timestampz, OPENBOOK_KEY}; - -#[derive(Clone, Debug, PartialEq)] -pub struct PgTransaction { - pub signature: String, - pub program_pk: String, - pub block_datetime: DateTime, - pub slot: u64, - pub err: bool, - pub processed: bool, - pub worker_partition: i32, -} - -pub const NUM_TRANSACTION_PARTITIONS: u64 = 10; - -impl PgTransaction { - pub fn from_rpc_confirmed_transaction( - rpc_confirmed_transaction: RpcConfirmedTransactionStatusWithSignature, - ) -> Self { - PgTransaction { - signature: 
rpc_confirmed_transaction.signature, - program_pk: OPENBOOK_KEY.to_string(), - block_datetime: to_timestampz(rpc_confirmed_transaction.block_time.unwrap() as u64), - slot: rpc_confirmed_transaction.slot, - err: rpc_confirmed_transaction.err.is_some(), - processed: false, - worker_partition: (rpc_confirmed_transaction.slot % NUM_TRANSACTION_PARTITIONS) as i32, - } - } - - pub fn from_row(row: Row) -> Self { - let slot_raw = row.get::(3); - PgTransaction { - signature: row.get(0), - program_pk: row.get(1), - block_datetime: row.get(2), - slot: slot_raw as u64, - err: row.get(4), - processed: row.get(5), - worker_partition: row.get(6), - } - } -} - -pub enum ProcessState { - Processed, - Unprocessed, -} diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index 1dcc0b8..1d29559 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -9,7 +9,7 @@ use crate::{ structs::{ candle::Candle, markets::MarketInfo, - openbook::{calculate_fill_price_and_size, PgOpenBookFill}, + openbook::PgOpenBookFill, resolution::{day, Resolution}, }, utils::{f64_max, f64_min}, @@ -86,9 +86,7 @@ fn combine_fills_into_1m_candles( Some(p) => p, None => { let first = fills_iter.peek().unwrap(); - let (price, _) = - calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals); - price + first.price } }; @@ -100,15 +98,13 @@ fn combine_fills_into_1m_candles( while matches!(fills_iter.peek(), Some(f) if f.time < end_time) { let fill = fills_iter.next().unwrap(); - let (price, volume) = - calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals); - candles[i].close = price; - candles[i].low = f64_min(price, candles[i].low); - candles[i].high = f64_max(price, candles[i].high); - candles[i].volume += volume; + candles[i].close = fill.price; + candles[i].low = f64_min(fill.price, candles[i].low); + candles[i].high = f64_max(fill.price, candles[i].high); + candles[i].volume += fill.size; - last_price = price; + last_price = fill.price; } candles[i].start_time = start_time; diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs index b6f25a4..04d0550 100644 --- a/src/worker/candle_batching/mod.rs +++ b/src/worker/candle_batching/mod.rs @@ -56,7 +56,7 @@ async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> { .with_label_values(&[market.name.as_str()]) .inc_by(candles.clone().len() as u64); save_candles(pool, candles).await?; - } + } Ok(()) } diff --git a/src/worker/main.rs b/src/worker/main.rs index 35b6161..f37e6d8 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -1,11 +1,9 @@ use log::{error, info}; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; -use openbook_candles::structs::transaction::NUM_TRANSACTION_PARTITIONS; use openbook_candles::utils::Config; use openbook_candles::worker::metrics::{ serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, }; -use openbook_candles::worker::trade_fetching::scrape::{scrape_fills, scrape_signatures}; use openbook_candles::{ database::initialize::{connect_to_database, setup_database}, worker::candle_batching::batch_for_market, @@ -40,25 +38,6 @@ async fn main() -> anyhow::Result<()> { setup_database(&pool).await?; let mut handles = vec![]; - // signature scraping - let rpc_clone = rpc_url.clone(); - let pool_clone = pool.clone(); - handles.push(tokio::spawn(async move { - scrape_signatures(rpc_clone, &pool_clone).await.unwrap(); - })); - - // 
transaction/fill scraping - for id in 0..NUM_TRANSACTION_PARTITIONS { - let rpc_clone = rpc_url.clone(); - let pool_clone = pool.clone(); - let markets_clone = target_markets.clone(); - handles.push(tokio::spawn(async move { - scrape_fills(id as i32, rpc_clone, &pool_clone, &markets_clone) - .await - .unwrap(); - })); - } - // candle batching for market in market_infos.into_iter() { let batch_pool = pool.clone(); diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 53437ac..39ce57d 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -1,3 +1,2 @@ pub mod candle_batching; pub mod metrics; -pub mod trade_fetching; diff --git a/src/worker/trade_fetching/mod.rs b/src/worker/trade_fetching/mod.rs deleted file mode 100644 index 4e93e83..0000000 --- a/src/worker/trade_fetching/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod parsing; -pub mod scrape; diff --git a/src/worker/trade_fetching/parsing.rs b/src/worker/trade_fetching/parsing.rs deleted file mode 100644 index d5d1491..0000000 --- a/src/worker/trade_fetching/parsing.rs +++ /dev/null @@ -1,91 +0,0 @@ -use log::warn; -use solana_client::client_error::Result as ClientResult; -use solana_sdk::pubkey::Pubkey; -use solana_transaction_status::{ - option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta, -}; -use std::{collections::HashMap, io::Error}; - -use crate::{ - structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw}, - worker::metrics::METRIC_RPC_ERRORS_TOTAL, -}; - -const PROGRAM_DATA: &str = "Program data: "; - -pub fn parse_trades_from_openbook_txns( - txns: &mut Vec>, - sig_strings: &Vec, - target_markets: &HashMap, -) -> Vec { - let mut fills_vector = Vec::::new(); - for (idx, txn) in txns.iter_mut().enumerate() { - match txn { - Ok(t) => { - if let Some(m) = &t.transaction.meta { - match &m.log_messages { - OptionSerializer::Some(logs) => { - match parse_openbook_fills_from_logs( - logs, - target_markets, - sig_strings[idx].clone(), - t.block_time.unwrap(), - ) { - Some(mut events) => fills_vector.append(&mut events), - None => {} - } - } - OptionSerializer::None => {} - OptionSerializer::Skip => {} - } - } - } - Err(e) => { - warn!("rpc error in get_transaction {}", e); - METRIC_RPC_ERRORS_TOTAL - .with_label_values(&["getTransaction"]) - .inc(); - } - } - } - fills_vector -} - -fn parse_openbook_fills_from_logs( - logs: &Vec, - target_markets: &HashMap, - signature: String, - block_time: i64, -) -> Option> { - let mut fills_vector = Vec::::new(); - for (idx, l) in logs.iter().enumerate() { - match l.strip_prefix(PROGRAM_DATA) { - Some(log) => { - let borsh_bytes = match anchor_lang::__private::base64::decode(log) { - Ok(borsh_bytes) => borsh_bytes, - _ => continue, - }; - let mut slice: &[u8] = &borsh_bytes[8..]; - let event: Result = - anchor_lang::AnchorDeserialize::deserialize(&mut slice); - - match event { - Ok(e) => { - let fill_event = e.into_event(signature.clone(), block_time, idx); - if target_markets.contains_key(&fill_event.market) { - fills_vector.push(fill_event); - } - } - _ => continue, - } - } - _ => (), - } - } - - if !fills_vector.is_empty() { - Some(fills_vector) - } else { - None - } -} diff --git a/src/worker/trade_fetching/scrape.rs b/src/worker/trade_fetching/scrape.rs deleted file mode 100644 index 19bd514..0000000 --- a/src/worker/trade_fetching/scrape.rs +++ /dev/null @@ -1,121 +0,0 @@ -use deadpool_postgres::Pool; -use futures::future::join_all; -use log::{debug, info, warn}; -use solana_client::{ - nonblocking::rpc_client::RpcClient, 
rpc_client::GetConfirmedSignaturesForAddress2Config, - rpc_config::RpcTransactionConfig, -}; -use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature}; -use solana_transaction_status::UiTransactionEncoding; -use std::{collections::HashMap, str::FromStr}; -use tokio::sync::mpsc::Sender; - -use crate::{ - database::{ - fetch::fetch_worker_transactions, - insert::{build_transactions_insert_statement, insert_fills_atomically}, - }, - structs::{openbook::OpenBookFillEvent, transaction::PgTransaction}, - utils::{AnyhowWrap, Config, OPENBOOK_KEY}, - worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL, METRIC_TRANSACTIONS_TOTAL}, -}; - -use super::parsing::parse_trades_from_openbook_txns; - - -pub async fn scrape_signatures(rpc_url: String, pool: &Pool) -> anyhow::Result<()> { - let rpc_client = RpcClient::new_with_commitment(rpc_url.clone(), CommitmentConfig::confirmed()); - - loop { - let rpc_config = GetConfirmedSignaturesForAddress2Config { - before: None, - until: None, - limit: None, - commitment: Some(CommitmentConfig::confirmed()), - }; - - let sigs = match rpc_client - .get_signatures_for_address_with_config(&OPENBOOK_KEY, rpc_config) - .await - { - Ok(sigs) => sigs, - Err(e) => { - warn!("rpc error in get_signatures_for_address_with_config: {}", e); - METRIC_RPC_ERRORS_TOTAL - .with_label_values(&["getSignaturesForAddress"]) - .inc(); - continue; - } - }; - if sigs.is_empty() { - debug!("No signatures found, trying again"); - continue; - } - let transactions: Vec = sigs - .into_iter() - .map(|s| PgTransaction::from_rpc_confirmed_transaction(s)) - .collect(); - - debug!("Scraper writing: {:?} txns to DB\n", transactions.len()); - let upsert_statement = build_transactions_insert_statement(transactions); - let client = pool.get().await?; - let num_txns = client - .execute(&upsert_statement, &[]) - .await - .map_err_anyhow()?; - METRIC_TRANSACTIONS_TOTAL.inc_by(num_txns); - } - // TODO: graceful shutdown - Ok(()) -} - -pub async fn scrape_fills( - worker_id: i32, - rpc_url: String, - pool: &Pool, - target_markets: &HashMap, -) -> anyhow::Result<()> { - debug!("Worker {} started \n", worker_id); - let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::confirmed()); - - loop { - let transactions = fetch_worker_transactions(worker_id, pool).await?; - if transactions.len() == 0 { - debug!("No signatures found by worker {}", worker_id); - tokio::time::sleep(WaitDuration::from_secs(1)).await; - continue; - }; - - // for each signature, fetch the transaction - let txn_config = RpcTransactionConfig { - encoding: Some(UiTransactionEncoding::Json), - commitment: Some(CommitmentConfig::confirmed()), - max_supported_transaction_version: Some(0), - }; - - let sig_strings = transactions - .iter() - .map(|t| t.signature.clone()) - .collect::>(); - - let signatures: Vec<_> = transactions - .into_iter() - .map(|t| t.signature.parse::().unwrap()) - .collect(); - - let txn_futs: Vec<_> = signatures - .iter() - .map(|s| rpc_client.get_transaction_with_config(s, txn_config)) - .collect(); - - let mut txns = join_all(txn_futs).await; - - // TODO: reenable total fills metric - let fills = parse_trades_from_openbook_txns(&mut txns, &sig_strings, target_markets); - - // Write any fills to the database, and update the transactions as processed - insert_fills_atomically(pool, worker_id, fills, sig_strings).await?; - } - - Ok(()) -}
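
A note on the rewritten fill queries: they alias block_datetime to "time", and PostgreSQL allows ORDER BY to reference an output-column alias, so ORDER BY time asc still sorts on block_datetime after the column rename. A minimal sketch of the earliest-fill read as the patch leaves it, assuming the openbook.openbook_fill_events columns used above (block_datetime, bid, maker, price, size) and the chrono integration the crate already uses for tokio-postgres:

    use chrono::{DateTime, Utc};
    use deadpool_postgres::Pool;

    pub struct PgOpenBookFill {
        pub time: DateTime<Utc>,
        pub bid: bool,
        pub maker: bool,
        pub price: f64,
        pub size: f64,
    }

    pub async fn fetch_earliest_fill(
        pool: &Pool,
        market_address_string: &str,
    ) -> anyhow::Result<Option<PgOpenBookFill>> {
        let client = pool.get().await?;
        // "time" below is an output alias for block_datetime; ORDER BY may
        // legally refer to it, which is why the rename does not break sorting.
        let stmt = r#"SELECT block_datetime as "time", bid, maker, price, size
                      from openbook.openbook_fill_events
                      where market = $1 and maker = true
                      ORDER BY time asc LIMIT 1"#;
        let rows = client.query(stmt, &[&market_address_string]).await?;
        Ok(rows.into_iter().next().map(|row| PgOpenBookFill {
            time: row.get(0),
            bid: row.get(1),
            maker: row.get(2),
            price: row.get(3),
            size: row.get(4),
        }))
    }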
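
One inconsistency worth flagging: the top-trader and 24h-volume queries keep WHERE market = $1 AND time >= $2 AND time < $3 (and order by "time") from the old fills table, while the rewritten fill queries suggest the timestamp column in openbook.openbook_fill_events is block_datetime. If so, those predicates reference a column that no longer exists. A sketch of the base-volume statement under that assumption — block_datetime here is carried over from fetch_fills_from, not something this hunk shows:

    // Assumes the fills timestamp column is block_datetime, matching the
    // rewritten fetch_fills_from; a bare `time` column would otherwise be
    // an undefined reference against this schema.
    const TOP_TRADERS_BY_BASE_VOLUME: &str = r#"SELECT
        open_orders_owner,
        sum(native_quantity_paid * CASE bid WHEN true THEN 0 ELSE 1 END) as "raw_ask_size",
        sum(native_quantity_received * CASE bid WHEN true THEN 1 ELSE 0 END) as "raw_bid_size"
        FROM openbook.openbook_fill_events
        WHERE market = $1
        AND block_datetime >= $2
        AND block_datetime < $3
        GROUP BY open_orders_owner
        ORDER BY
        sum(native_quantity_paid * CASE bid WHEN true THEN 0 ELSE 1 END)
        + sum(native_quantity_received * CASE bid WHEN true THEN 1 ELSE 0 END) DESC
        LIMIT 10000"#;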
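
On the schema side, setup_database now only creates the candles table, and the migration replaces the old non-unique index plus unique_candles constraint with a single unique index. That still supports the candle upsert, because ON CONFLICT can infer its arbiter from a unique index as well as from a named constraint. One caveat: CREATE UNIQUE INDEX without IF NOT EXISTS fails with an "already exists" error the second time setup_database runs. An idempotent variant (IF NOT EXISTS for indexes is available since PostgreSQL 9.5):

    use deadpool_postgres::Pool;

    pub async fn create_candles_index(pool: &Pool) -> anyhow::Result<()> {
        let client = pool.get().await?;
        // IF NOT EXISTS keeps setup_database safely re-runnable; the unique
        // index also serves as the arbiter for an upsert targeting
        // (market_name, start_time, resolution).
        client
            .execute(
                "CREATE UNIQUE INDEX IF NOT EXISTS idx_market_time_resolution \
                 ON openbook.candles USING btree (market_name, start_time, resolution)",
                &[],
            )
            .await?;
        Ok(())
    }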
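
build_candles_upsert_statement still assembles the VALUES list with format!, which is workable because candle fields are internally generated, but it leaves quoting and type rendering to string formatting. For comparison, a parameterized single-row upsert; the ON CONFLICT target and update list are assumptions here, since the candles handle_conflict string sits outside this hunk:

    use chrono::{DateTime, Utc};
    use deadpool_postgres::Pool;

    pub struct Candle {
        pub market_name: String,
        pub start_time: DateTime<Utc>,
        pub end_time: DateTime<Utc>,
        pub resolution: String,
        pub open: f64,
        pub close: f64,
        pub high: f64,
        pub low: f64,
        pub volume: f64,
        pub complete: bool,
    }

    pub async fn upsert_candle(pool: &Pool, c: &Candle) -> anyhow::Result<()> {
        let client = pool.get().await?;
        client
            .execute(
                "INSERT INTO openbook.candles \
                 (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) \
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \
                 ON CONFLICT (market_name, start_time, resolution) DO UPDATE \
                 SET close = EXCLUDED.close, high = EXCLUDED.high, \
                     low = EXCLUDED.low, volume = EXCLUDED.volume, complete = EXCLUDED.complete",
                &[
                    &c.market_name,
                    &c.start_time,
                    &c.end_time,
                    &c.resolution,
                    &c.open,
                    &c.close,
                    &c.high,
                    &c.low,
                    &c.volume,
                    &c.complete,
                ],
            )
            .await?;
        Ok(())
    }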
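
The batching change in minute_candles.rs is the payoff of the schema move: each fill now carries price and size directly, so combine_fills_into_1m_candles no longer needs calculate_fill_price_and_size (native quantities, maker fee adjustment, decimal scaling) at batch time. The per-minute fold reduces to plain OHLCV accumulation; a self-contained sketch with hypothetical Fill and Candle shapes:

    #[derive(Clone, Copy)]
    struct Fill {
        price: f64,
        size: f64,
    }

    #[derive(Debug)]
    struct Candle {
        open: f64,
        high: f64,
        low: f64,
        close: f64,
        volume: f64,
    }

    // Fold the fills of one minute bucket into a single candle, seeding
    // open/high/low/close with the previous bucket's close, as the worker does.
    fn fold_minute(prev_close: f64, fills: &[Fill]) -> Candle {
        let mut c = Candle {
            open: prev_close,
            high: prev_close,
            low: prev_close,
            close: prev_close,
            volume: 0.0,
        };
        for f in fills {
            c.close = f.price;
            c.high = c.high.max(f.price);
            c.low = c.low.min(f.price);
            c.volume += f.size;
        }
        c
    }

    fn main() {
        let fills = [Fill { price: 10.0, size: 2.0 }, Fill { price: 9.5, size: 1.0 }];
        println!("{:?}", fold_minute(10.2, &fills));
    }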
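
PgTrader's raw_ask_size and raw_bid_size switch from f64 to i64, and calculate_trader_volume now casts before scaling. Note that reading the SUM results as i64 only works if Postgres returns bigint — true when the native-quantity columns are integer (sum over int4 yields bigint), whereas sum over bigint yields numeric, which tokio-postgres will not hand out as an i64. The scaling itself, with token_factor written against the standard library rather than num_traits:

    // 10^decimals as an f64; equivalent to the repo's num_traits::Pow version.
    fn token_factor(decimals: u8) -> f64 {
        10f64.powi(decimals as i32)
    }

    // Convert a summed native quantity (bigint) into UI units.
    fn scale_native(raw: i64, decimals: u8) -> f64 {
        (raw as f64) / token_factor(decimals)
    }

    fn main() {
        // 1_500_000 native units at 6 decimals -> 1.5
        assert_eq!(scale_native(1_500_000, 6), 1.5);
    }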
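
Finally, for reviewers tracing what the deletions drop: the old pipeline wrote every OpenBook signature into a transactions table LIST-partitioned by worker_partition, routed fills workers by slot modulo NUM_TRANSACTION_PARTITIONS, and marked rows processed in the same database transaction that inserted the fills. The routing rule, for reference:

    const NUM_TRANSACTION_PARTITIONS: u64 = 10;

    // Each signature landed in one of ten LIST partitions, keyed by slot,
    // so each scrape_fills worker polled a disjoint slice of the backlog.
    fn worker_partition(slot: u64) -> i32 {
        (slot % NUM_TRANSACTION_PARTITIONS) as i32
    }

    fn main() {
        assert_eq!(worker_partition(1234567), 7);
    }

With that pipeline gone, openbook.openbook_fill_events is presumably populated by an existing external writer, which is what the commit subject means by "use existing db schema".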