diff --git a/candle-creator/Cargo.lock b/candle-creator/Cargo.lock index 8883825..45f6815 100644 --- a/candle-creator/Cargo.lock +++ b/candle-creator/Cargo.lock @@ -1570,6 +1570,9 @@ name = "either" version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" +dependencies = [ + "serde", +] [[package]] name = "encode_unicode" @@ -5982,9 +5985,12 @@ dependencies = [ "dotenvy", "either", "heck 0.4.0", + "hex", "once_cell", "proc-macro2 1.0.50", "quote 1.0.23", + "serde", + "serde_json", "sha2 0.10.6", "sqlx-core", "sqlx-rt", diff --git a/candle-creator/Cargo.toml b/candle-creator/Cargo.toml index 08880f6..a318338 100644 --- a/candle-creator/Cargo.toml +++ b/candle-creator/Cargo.toml @@ -11,7 +11,7 @@ tokio-stream = "0.1" jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] } -sqlx = { version = "0.6", features = [ "runtime-tokio-native-tls" , "postgres", "chrono", "decimal" ] } +sqlx = { version = "0.6", features = [ "runtime-tokio-native-tls" , "postgres", "chrono", "decimal", "offline" ] } chrono = "0.4.23" solana-client = "=1.14.13" diff --git a/candle-creator/sqlx-data.json b/candle-creator/sqlx-data.json new file mode 100644 index 0000000..351deeb --- /dev/null +++ b/candle-creator/sqlx-data.json @@ -0,0 +1,392 @@ +{ + "db": "PostgreSQL", + "101e71cebdce376dec54e861042d6ee0afd56ecf79738886d85fe289059c5902": { + "describe": { + "columns": [ + { + "name": "time!", + "ordinal": 0, + "type_info": "Timestamptz" + }, + { + "name": "bid!", + "ordinal": 1, + "type_info": "Bool" + }, + { + "name": "maker!", + "ordinal": 2, + "type_info": "Bool" + }, + { + "name": "native_qty_paid!", + "ordinal": 3, + "type_info": "Numeric" + }, + { + "name": "native_qty_received!", + "ordinal": 4, + "type_info": "Numeric" + }, + { + "name": "native_fee_or_rebate!", + "ordinal": 5, + "type_info": "Numeric" + } + ], + "nullable": [ + false, + false, + false, + false, + false, + false + ], + "parameters": { + "Left": [ + "Text", + "Timestamptz", + "Timestamptz" + ] + } + }, + "query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1\n and time >= $2\n and time < $3 \n ORDER BY time asc" + }, + "35e8220c601aca620da1cfcb978c8b7a64dcbf15550521b418cf509015cd88d8": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [] + } + }, + "query": "CREATE TABLE IF NOT EXISTS fills (\n id numeric PRIMARY KEY,\n time timestamptz not null,\n market text not null,\n open_orders text not null,\n open_orders_owner text not null,\n bid bool not null,\n maker bool not null,\n native_qty_paid numeric not null,\n native_qty_received numeric not null,\n native_fee_or_rebate numeric not null,\n fee_tier text not null,\n order_id text not null\n )" + }, + "4bab7d4329b2969b2ba610546c660207740c9bafe644df55fa57101df30e4899": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [] + } + }, + "query": "CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)" + }, + "61ce67d221cf35cea33940529f7d38af1514961245f4abc95b872e88cc0dc1e0": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [] + } + }, + "query": "DO $$\n BEGIN\n IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN\n ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market, start_time, resolution);\n END IF;\n END $$" + }, + "817ee7903cb5095f85fb787beff04ace3a452cf8749205bb230e41d8c9e03c4a": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [] + } + }, + "query": "CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)" + }, + "830392e8e03b8e34490df87905873ffea0749f6d321bd32a99b90766d3d7e167": { + "describe": { + "columns": [ + { + "name": "start_time!", + "ordinal": 0, + "type_info": "Timestamptz" + }, + { + "name": "end_time!", + "ordinal": 1, + "type_info": "Timestamptz" + }, + { + "name": "resolution!", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "market!", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "open!", + "ordinal": 4, + "type_info": "Numeric" + }, + { + "name": "close!", + "ordinal": 5, + "type_info": "Numeric" + }, + { + "name": "high!", + "ordinal": 6, + "type_info": "Numeric" + }, + { + "name": "low!", + "ordinal": 7, + "type_info": "Numeric" + }, + { + "name": "volume!", + "ordinal": 8, + "type_info": "Numeric" + }, + { + "name": "complete!", + "ordinal": 9, + "type_info": "Bool" + } + ], + "nullable": [ + true, + true, + true, + true, + true, + true, + true, + true, + true, + true + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + } + }, + "query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market as \"market!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market = $1\n and resolution = $2\n and complete = true\n ORDER BY start_time desc LIMIT 1" + }, + "900d4bd7a81a308648cd47eecb3b86b5e7afbbdc34e93ef35393ceab00fb8552": { + "describe": { + "columns": [ + { + "name": "start_time!", + "ordinal": 0, + "type_info": "Timestamptz" + }, + { + "name": "end_time!", + "ordinal": 1, + "type_info": "Timestamptz" + }, + { + "name": "resolution!", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "market!", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "open!", + "ordinal": 4, + "type_info": "Numeric" + }, + { + "name": "close!", + "ordinal": 5, + "type_info": "Numeric" + }, + { + "name": "high!", + "ordinal": 6, + "type_info": "Numeric" + }, + { + "name": "low!", + "ordinal": 7, + "type_info": "Numeric" + }, + { + "name": "volume!", + "ordinal": 8, + "type_info": "Numeric" + }, + { + "name": "complete!", + "ordinal": 9, + "type_info": "Bool" + } + ], + "nullable": [ + true, + true, + true, + true, + true, + true, + true, + true, + true, + true + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + } + }, + "query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market as \"market!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market = $1\n and resolution = $2\n ORDER BY start_time asc LIMIT 1" + }, + "aa9adbbc7f215e28cc07b615f30ad9d3f415f25260cc87a1556225f01e0ef3be": { + "describe": { + "columns": [ + { + "name": "start_time!", + "ordinal": 0, + "type_info": "Timestamptz" + }, + { + "name": "end_time!", + "ordinal": 1, + "type_info": "Timestamptz" + }, + { + "name": "resolution!", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "market!", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "open!", + "ordinal": 4, + "type_info": "Numeric" + }, + { + "name": "close!", + "ordinal": 5, + "type_info": "Numeric" + }, + { + "name": "high!", + "ordinal": 6, + "type_info": "Numeric" + }, + { + "name": "low!", + "ordinal": 7, + "type_info": "Numeric" + }, + { + "name": "volume!", + "ordinal": 8, + "type_info": "Numeric" + }, + { + "name": "complete!", + "ordinal": 9, + "type_info": "Bool" + } + ], + "nullable": [ + true, + true, + true, + true, + true, + true, + true, + true, + true, + true + ], + "parameters": { + "Left": [ + "Text", + "Text", + "Timestamptz", + "Timestamptz" + ] + } + }, + "query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market as \"market!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n and complete = true\n ORDER BY start_time asc" + }, + "c1ed9567bf732245975182fbf216da5858f89df285f8a5f17fb58ac1f33cc0e9": { + "describe": { + "columns": [ + { + "name": "time!", + "ordinal": 0, + "type_info": "Timestamptz" + }, + { + "name": "bid!", + "ordinal": 1, + "type_info": "Bool" + }, + { + "name": "maker!", + "ordinal": 2, + "type_info": "Bool" + }, + { + "name": "native_qty_paid!", + "ordinal": 3, + "type_info": "Numeric" + }, + { + "name": "native_qty_received!", + "ordinal": 4, + "type_info": "Numeric" + }, + { + "name": "native_fee_or_rebate!", + "ordinal": 5, + "type_info": "Numeric" + } + ], + "nullable": [ + false, + false, + false, + false, + false, + false + ], + "parameters": { + "Left": [ + "Text" + ] + } + }, + "query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1 \n ORDER BY time asc LIMIT 1" + }, + "ebf9f73491ea62c20a25245080abb0be928e22a0d622fafa48bb01db34e84b94": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [] + } + }, + "query": "CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market, start_time, resolution)" + }, + "ef6422f34cc3e649a90fbbbc6ad668de6ee4b0994c52f72d0295985085d7047b": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [] + } + }, + "query": "CREATE TABLE IF NOT EXISTS candles (\n id serial,\n market text,\n start_time timestamptz,\n end_time timestamptz,\n resolution text,\n open numeric,\n close numeric,\n high numeric,\n low numeric,\n volume numeric,\n complete bool\n )" + } +} \ No newline at end of file diff --git a/candle-creator/src/candle_batching/higher_order_candles.rs b/candle-creator/src/candle_batching/higher_order_candles.rs index 8853d6a..9b5024c 100644 --- a/candle-creator/src/candle_batching/higher_order_candles.rs +++ b/candle-creator/src/candle_batching/higher_order_candles.rs @@ -4,7 +4,7 @@ use sqlx::{types::Decimal, Pool, Postgres}; use std::cmp::{max, min}; use crate::{ - candle_batching::DAY, + candle_batching::day, database::{ fetch::{fetch_candles_from, fetch_earliest_candle, fetch_latest_finished_candle}, Candle, Resolution, @@ -22,7 +22,7 @@ pub async fn batch_higher_order_candles( match latest_candle { Some(candle) => { let start_time = candle.end_time; - let end_time = start_time + DAY(); + let end_time = start_time + day(); // println!("candle.end_time: {:?}", candle.end_time); // println!("start_time: {:?}", start_time); let mut constituent_candles = fetch_candles_from( @@ -63,8 +63,8 @@ pub async fn batch_higher_order_candles( let start_time = constituent_candle .unwrap() .start_time - .duration_trunc(DAY())?; - let end_time = start_time + DAY(); + .duration_trunc(day())?; + let end_time = start_time + day(); let mut constituent_candles = fetch_candles_from( pool, @@ -106,7 +106,7 @@ fn combine_into_higher_order_candles( let empty_candle = Candle::create_empty_candle(constituent_candles[0].market.clone(), target_resolution); let mut combined_candles = - vec![empty_candle; (DAY().num_minutes() / duration.num_minutes()) as usize]; + vec![empty_candle; (day().num_minutes() / duration.num_minutes()) as usize]; println!("candles_len: {}", candles_len); diff --git a/candle-creator/src/candle_batching/minute_candles.rs b/candle-creator/src/candle_batching/minute_candles.rs index b6e1f02..e15687e 100644 --- a/candle-creator/src/candle_batching/minute_candles.rs +++ b/candle-creator/src/candle_batching/minute_candles.rs @@ -9,7 +9,7 @@ use crate::database::{ Candle, MarketInfo, PgOpenBookFill, Resolution, }; -use super::DAY; +use super::day; pub async fn batch_1m_candles( pool: &Pool, @@ -23,7 +23,7 @@ pub async fn batch_1m_candles( Some(candle) => { let start_time = candle.end_time; let end_time = min( - start_time + DAY(), + start_time + day(), Utc::now().duration_trunc(Duration::minutes(1))?, ); let mut fills = @@ -50,7 +50,7 @@ pub async fn batch_1m_candles( .time .duration_trunc(Duration::minutes(1))?; let end_time = min( - start_time + DAY(), + start_time + day(), Utc::now().duration_trunc(Duration::minutes(1))?, ); let mut fills = diff --git a/candle-creator/src/candle_batching/mod.rs b/candle-creator/src/candle_batching/mod.rs index bbab621..9fd7025 100644 --- a/candle-creator/src/candle_batching/mod.rs +++ b/candle-creator/src/candle_batching/mod.rs @@ -1,27 +1,19 @@ pub mod higher_order_candles; pub mod minute_candles; -use std::cmp::{max, min}; - -use chrono::{DateTime, Duration, DurationRound, Utc}; -use num_traits::{FromPrimitive, Zero}; -use sqlx::{types::Decimal, Pool, Postgres}; +use chrono::Duration; +use sqlx::{Pool, Postgres}; use strum::IntoEnumIterator; use tokio::{sync::mpsc::Sender, time::sleep}; use crate::{ candle_batching::minute_candles::batch_1m_candles, - database::{ - fetch::{ - fetch_candles_from, fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle, - }, - Candle, MarketInfo, PgOpenBookFill, Resolution, - }, + database::{Candle, MarketInfo, Resolution}, }; use self::higher_order_candles::batch_higher_order_candles; -pub fn DAY() -> Duration { +pub fn day() -> Duration { Duration::days(1) } @@ -30,7 +22,7 @@ pub async fn batch_candles( candles_sender: &Sender>, markets: Vec, ) { - // tokio spawn a taks for every market + // TODO: tokio spawn a taks for every market loop { let m = MarketInfo { diff --git a/candle-creator/src/database/initialize.rs b/candle-creator/src/database/initialize.rs index bf1c35e..55f379c 100644 --- a/candle-creator/src/database/initialize.rs +++ b/candle-creator/src/database/initialize.rs @@ -1,18 +1,7 @@ -use chrono::Utc; use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; -use std::{ - collections::hash_map::DefaultHasher, - hash::{Hash, Hasher}, - time::{Duration, Instant}, -}; -use tokio::sync::mpsc::{error::TryRecvError, Receiver}; +use std::time::Duration; -use crate::{ - trade_fetching::parsing::OpenBookFillEventLog, - utils::{AnyhowWrap, Config}, -}; - -use super::MarketInfo; +use crate::utils::{AnyhowWrap, Config}; pub async fn connect_to_database(config: &Config) -> anyhow::Result> { loop { @@ -29,7 +18,7 @@ pub async fn connect_to_database(config: &Config) -> anyhow::Result, markets: Vec) -> anyhow::Result<()> { +pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> { let candles_table_fut = create_candles_table(pool); let fills_table_fut = create_fills_table(pool); let result = tokio::try_join!(candles_table_fut, fills_table_fut); @@ -71,7 +60,12 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> { ).execute(&mut tx).await?; sqlx::query!( - "ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market, start_time, resolution)" + "DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN + ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market, start_time, resolution); + END IF; + END $$" ) .execute(&mut tx) .await?; @@ -111,7 +105,3 @@ pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { tx.commit().await.map_err_anyhow() } - -pub async fn save_candles() { - unimplemented!("TODO"); -} diff --git a/candle-creator/src/database/insert.rs b/candle-creator/src/database/insert.rs index dfd2f68..4ec62a3 100644 --- a/candle-creator/src/database/insert.rs +++ b/candle-creator/src/database/insert.rs @@ -1,16 +1,13 @@ use chrono::Utc; -use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; +use sqlx::{Pool, Postgres}; use std::{ - collections::hash_map::DefaultHasher, + collections::{hash_map::DefaultHasher, HashMap}, hash::{Hash, Hasher}, - time::{Duration, Instant}, + time::Instant, }; use tokio::sync::mpsc::{error::TryRecvError, Receiver}; -use crate::{ - trade_fetching::parsing::OpenBookFillEventLog, - utils::{AnyhowWrap, Config}, -}; +use crate::{trade_fetching::parsing::OpenBookFillEventLog, utils::AnyhowWrap}; use super::Candle; @@ -20,13 +17,12 @@ pub async fn persist_fill_events( ) { loop { let start = Instant::now(); - let mut write_batch = Vec::new(); + let mut write_batch = HashMap::new(); while write_batch.len() < 10 || start.elapsed().as_secs() > 10 { match fill_receiver.try_recv() { Ok(event) => { - if !write_batch.contains(&event) { - // O(n) - write_batch.push(event) + if !write_batch.contains_key(&event) { + write_batch.insert(event, 0); } } Err(TryRecvError::Empty) => break, @@ -48,9 +44,9 @@ pub async fn persist_fill_events( } } -fn build_fills_upsert_statement(events: Vec) -> String { +fn build_fills_upsert_statement(events: HashMap) -> String { let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES"); - for (idx, event) in events.iter().enumerate() { + for (idx, event) in events.keys().enumerate() { let mut hasher = DefaultHasher::new(); event.hash(&mut hasher); let val_str = format!( @@ -79,7 +75,6 @@ fn build_fills_upsert_statement(events: Vec) -> String { let handle_conflict = "ON CONFLICT (id) DO UPDATE SET market=excluded.market"; stmt = format!("{} {}", stmt, handle_conflict); - print!("{}", stmt); stmt } diff --git a/candle-creator/src/database/mod.rs b/candle-creator/src/database/mod.rs index 4de60e3..bee634b 100644 --- a/candle-creator/src/database/mod.rs +++ b/candle-creator/src/database/mod.rs @@ -5,7 +5,7 @@ use num_traits::Zero; use sqlx::types::Decimal; use strum::EnumIter; -use crate::candle_batching::DAY; +use crate::candle_batching::day; pub mod fetch; pub mod initialize; @@ -69,7 +69,7 @@ impl Resolution { Resolution::R1h => Duration::hours(1), Resolution::R2h => Duration::hours(2), Resolution::R4h => Duration::hours(4), - Resolution::R1d => DAY(), + Resolution::R1d => day(), } } } @@ -115,7 +115,7 @@ pub struct PgOpenBookFill { pub native_fee_or_rebate: Decimal, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MarketInfo { pub name: String, pub address: String, diff --git a/candle-creator/src/main.rs b/candle-creator/src/main.rs index a8ce495..817a4be 100644 --- a/candle-creator/src/main.rs +++ b/candle-creator/src/main.rs @@ -1,13 +1,19 @@ +use std::{collections::HashMap, str::FromStr}; + use crate::{ candle_batching::batch_candles, - database::{fetch::fetch_latest_finished_candle, insert::persist_candles, Candle, Resolution}, - trade_fetching::{parsing::OpenBookFillEventLog, scrape::fetch_market_infos}, + database::{ + insert::{persist_candles, persist_fill_events}, + Candle, + }, + trade_fetching::{ + backfill::backfill, + parsing::OpenBookFillEventLog, + scrape::{fetch_market_infos, scrape}, + }, utils::Config, }; -use database::{ - fetch::fetch_earliest_fill, - initialize::{connect_to_database, setup_database}, -}; +use database::initialize::{connect_to_database, setup_database}; use dotenv; use solana_sdk::pubkey::Pubkey; use tokio::sync::mpsc; @@ -25,41 +31,52 @@ async fn main() -> anyhow::Result<()> { let database_url: String = dotenv::var("DATABASE_URL").unwrap(); let config = Config { - rpc_url, + rpc_url: rpc_url.clone(), database_url, max_pg_pool_connections: 5, }; let markets = utils::load_markets("/Users/dboures/dev/openbook-candles/markets.json"); let market_infos = fetch_market_infos(&config, markets).await?; - println!("{:?}", market_infos); + let mut target_markets = HashMap::new(); + for m in market_infos.clone() { + target_markets.insert(Pubkey::from_str(&m.address)?, 0); + } + println!("{:?}", target_markets); let pool = connect_to_database(&config).await?; - // setup_database(&pool, market_infos).await?; + setup_database(&pool).await?; - // let (fill_sender, fill_receiver) = mpsc::channel::(1000); + let (fill_sender, fill_receiver) = mpsc::channel::(1000); - // tokio::spawn(async move { - // trade_fetching::scrape::scrape(&config, fill_sender.clone()).await; TODO: send the vec, it's okay - // }); - - // database::database::handle_fill_events(&pool, fill_receiver).await; - - // trade_fetching::websocket::listen_logs().await?; - - let (candle_sender, candle_receiver) = mpsc::channel::>(1000); - - let batch_pool = pool.clone(); + let bf_sender = fill_sender.clone(); + let targets = target_markets.clone(); tokio::spawn(async move { - batch_candles(batch_pool, &candle_sender, market_infos).await; + backfill(&rpc_url.clone(), &bf_sender, &targets).await; }); - let persist_pool = pool.clone(); + tokio::spawn(async move { + scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay + }); + + let fills_pool = pool.clone(); + tokio::spawn(async move { + persist_fill_events(&fills_pool, fill_receiver).await; + }); + + // let (candle_sender, candle_receiver) = mpsc::channel::>(1000); + + // let batch_pool = pool.clone(); // tokio::spawn(async move { - persist_candles(persist_pool, candle_receiver).await; + // batch_candles(batch_pool, &candle_sender, market_infos).await; // }); - loop {} + // let persist_pool = pool.clone(); + // // tokio::spawn(async move { + // persist_candles(persist_pool, candle_receiver).await; + // // }); + + loop {} // tokio drop if one thread drops or something Ok(()) } diff --git a/candle-creator/src/trade_fetching/backfill.rs b/candle-creator/src/trade_fetching/backfill.rs new file mode 100644 index 0000000..c1734e4 --- /dev/null +++ b/candle-creator/src/trade_fetching/backfill.rs @@ -0,0 +1,66 @@ +use chrono::{DateTime, Duration, NaiveDateTime, Utc}; +use solana_client::{rpc_client::RpcClient, rpc_config::RpcTransactionConfig}; +use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature}; +use solana_transaction_status::UiTransactionEncoding; +use std::collections::HashMap; +use tokio::sync::mpsc::Sender; + +use crate::trade_fetching::scrape::scrape_transactions; + +use super::parsing::OpenBookFillEventLog; + +pub async fn backfill( + rpc_url: &String, + fill_sender: &Sender, + target_markets: &HashMap, +) { + let rpc_client = RpcClient::new_with_commitment(rpc_url, CommitmentConfig::processed()); + + println!("backfill started"); + let mut before_slot: Option = None; + let end_time = (Utc::now() - Duration::days(1)).timestamp(); + loop { + let last_sig_option = + scrape_transactions(&rpc_client, before_slot, None, fill_sender, target_markets).await; + + if last_sig_option.is_none() { + println!("last sig is none"); + continue; + } + + let txn_config = RpcTransactionConfig { + encoding: Some(UiTransactionEncoding::Base64), + commitment: Some(CommitmentConfig::confirmed()), + max_supported_transaction_version: Some(0), + }; + match rpc_client.get_transaction_with_config(&last_sig_option.unwrap(), txn_config) { + Ok(txn) => { + let unix_sig_time = rpc_client.get_block_time(txn.slot).unwrap(); + if unix_sig_time < end_time { + break; + } + let time_left = backfill_time_left(unix_sig_time, end_time); + println!( + "{} minutes ~ {} days remaining in the backfill\n", + time_left.num_minutes(), + time_left.num_days() + ); + } + Err(e) => { + println!("error: {:?}", e); + continue; + } + } + before_slot = last_sig_option; + } + + print!("Backfill complete \n"); +} + +fn backfill_time_left(current_time: i64, backfill_end: i64) -> Duration { + let naive_cur = NaiveDateTime::from_timestamp_millis(current_time * 1000).unwrap(); + let naive_bf = NaiveDateTime::from_timestamp_millis(backfill_end * 1000).unwrap(); + let cur_date = DateTime::::from_utc(naive_cur, Utc); + let bf_date = DateTime::::from_utc(naive_bf, Utc); + cur_date - bf_date +} diff --git a/candle-creator/src/trade_fetching/mod.rs b/candle-creator/src/trade_fetching/mod.rs index 07ba1f9..fc1f1bc 100644 --- a/candle-creator/src/trade_fetching/mod.rs +++ b/candle-creator/src/trade_fetching/mod.rs @@ -1,3 +1,4 @@ +pub mod backfill; pub mod parsing; pub mod scrape; pub mod websocket; diff --git a/candle-creator/src/trade_fetching/parsing.rs b/candle-creator/src/trade_fetching/parsing.rs index 6eb7a49..45c9f15 100644 --- a/candle-creator/src/trade_fetching/parsing.rs +++ b/candle-creator/src/trade_fetching/parsing.rs @@ -2,7 +2,7 @@ use solana_client::client_error::Result as ClientResult; use solana_transaction_status::{ option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta, }; -use std::io::Error; +use std::{collections::HashMap, io::Error}; use anchor_lang::{event, AnchorDeserialize, AnchorSerialize}; use solana_sdk::pubkey::Pubkey; @@ -10,7 +10,7 @@ use solana_sdk::pubkey::Pubkey; const PROGRAM_DATA: &str = "Program data: "; #[event] -#[derive(Debug, Clone, PartialEq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct OpenBookFillEventLog { pub market: Pubkey, pub open_orders: Pubkey, @@ -84,6 +84,7 @@ pub struct MarketState { pub fn parse_trades_from_openbook_txns( txns: &mut Vec>, + target_markets: &HashMap, ) -> Vec { let mut fills_vector = Vec::::new(); for txn in txns.iter_mut() { @@ -92,7 +93,7 @@ pub fn parse_trades_from_openbook_txns( if let Some(m) = &t.transaction.meta { match &m.log_messages { OptionSerializer::Some(logs) => { - match parse_openbook_fills_from_logs(logs) { + match parse_openbook_fills_from_logs(logs, target_markets) { Some(mut events) => fills_vector.append(&mut events), None => {} } @@ -108,7 +109,10 @@ pub fn parse_trades_from_openbook_txns( fills_vector } -fn parse_openbook_fills_from_logs(logs: &Vec) -> Option> { +fn parse_openbook_fills_from_logs( + logs: &Vec, + target_markets: &HashMap, +) -> Option> { let mut fills_vector = Vec::::new(); for l in logs { match l.strip_prefix(PROGRAM_DATA) { @@ -123,7 +127,9 @@ fn parse_openbook_fills_from_logs(logs: &Vec) -> Option { - fills_vector.push(e); + if target_markets.contains_key(&e.market) { + fills_vector.push(e); + } } _ => continue, } diff --git a/candle-creator/src/trade_fetching/scrape.rs b/candle-creator/src/trade_fetching/scrape.rs index c840728..766ce2a 100644 --- a/candle-creator/src/trade_fetching/scrape.rs +++ b/candle-creator/src/trade_fetching/scrape.rs @@ -10,7 +10,7 @@ use solana_sdk::{ }; use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding}; use spl_token::state::Mint; -use std::{collections::HashMap, str::FromStr, time::Duration}; +use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use tokio::sync::mpsc::Sender; use crate::{ @@ -21,58 +21,39 @@ use crate::{ use super::parsing::{parse_trades_from_openbook_txns, OpenBookFillEventLog}; -pub async fn scrape(config: &Config, fill_sender: Sender) { +pub async fn scrape( + config: &Config, + fill_sender: &Sender, + target_markets: &HashMap, +) { let url = &config.rpc_url; let rpc_client = RpcClient::new_with_commitment(url, CommitmentConfig::processed()); let before_slot = None; loop { - scrape_transactions(&rpc_client, before_slot, &fill_sender).await; - - print!("Ding fires are done \n\n"); - tokio::time::sleep(Duration::from_millis(500)).await; + scrape_transactions( + &rpc_client, + before_slot, + Some(150), + fill_sender, + target_markets, + ) + .await; + tokio::time::sleep(WaitDuration::from_millis(250)).await; } } -// pub async fn backfill(config: &Config, fill_sender: Sender) { -// let url = &config.rpc_url; -// let rpc_client = RpcClient::new_with_commitment(url, CommitmentConfig::processed()); - -// let mut before_slot: Option = None; - -// loop { -// let last_sig_option = scrape_transactions(&rpc_client, before_slot, &fill_sender).await; - -// if last_sig_option.is_none() { -// continue; -// } - -// match rpc_client.get_transaction(&last_sig_option.unwrap(), UiTransactionEncoding::Json) { -// Ok(txn) => { -// let unix_sig_time = rpc_client.get_block_time(txn.slot).unwrap(); -// if unix_sig_time > 0 { -// // TODO: is 3 months in past -// break; -// } -// println!("backfill at {}", unix_sig_time); -// } -// Err(_) => continue, -// } -// before_slot = last_sig_option; -// } - -// print!("Backfill complete \n"); -// } - pub async fn scrape_transactions( rpc_client: &RpcClient, - before_slot: Option, + before_sig: Option, + limit: Option, fill_sender: &Sender, + target_markets: &HashMap, ) -> Option { let rpc_config = GetConfirmedSignaturesForAddress2Config { - before: before_slot, + before: before_sig, until: None, - limit: Some(150), + limit, commitment: Some(CommitmentConfig::confirmed()), }; @@ -81,11 +62,24 @@ pub async fn scrape_transactions( rpc_config, ) { Ok(s) => s, - Err(_) => return before_slot, // TODO: add error log + Err(e) => { + println!("Error in get_signatures_for_address_with_config: {}", e); + return before_sig; + } }; + if sigs.len() == 0 { + println!("No signatures found"); + return before_sig; + } + + let last = sigs.last().clone().unwrap(); + let request_last_sig = Signature::from_str(&last.signature).unwrap(); + sigs.retain(|sig| sig.err.is_none()); - let last_sig = sigs.last().unwrap().clone(); // Failed here + if sigs.last().is_none() { + return Some(request_last_sig); + } let txn_config = RpcTransactionConfig { encoding: Some(UiTransactionEncoding::Json), @@ -103,7 +97,7 @@ pub async fn scrape_transactions( }) .collect::>>(); // TODO: am I actually getting all the txns? - let fills = parse_trades_from_openbook_txns(&mut txns); + let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); if fills.len() > 0 { for fill in fills.into_iter() { if let Err(_) = fill_sender.send(fill).await { @@ -112,7 +106,7 @@ pub async fn scrape_transactions( } } - Some(Signature::from_str(&last_sig.signature).unwrap()) + Some(request_last_sig) } pub async fn fetch_market_infos( @@ -181,9 +175,6 @@ pub async fn fetch_market_infos( .get_multiple_accounts_with_config(&mint_keys, rpc_config) .unwrap() .value; - // println!("{:?}", mint_results); - // println!("{:?}", mint_keys); - // println!("{:?}", mint_results.len()); for i in 0..mint_results.len() { let mut mint_account = mint_results[i].as_ref().unwrap().clone(); let mut mint_bytes: &[u8] = &mut mint_account.data[..]; diff --git a/candle-creator/src/trade_fetching/websocket.rs b/candle-creator/src/trade_fetching/websocket.rs index e66e287..2030c35 100644 --- a/candle-creator/src/trade_fetching/websocket.rs +++ b/candle-creator/src/trade_fetching/websocket.rs @@ -1,28 +1,28 @@ -use jsonrpc_core_client::transports::ws; +// use jsonrpc_core_client::transports::ws; -use anchor_client::{ - anchor_lang::{self, event, AnchorDeserialize, AnchorSerialize, Discriminator}, - ClientError as AnchorClientError, Cluster, -}; -use log::*; -use solana_account_decoder::UiAccountEncoding; -use solana_client::{ - pubsub_client::{PubsubClient, PubsubClientSubscription}, - rpc_config::{ - RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig, - RpcTransactionLogsFilter, - }, - rpc_response::{Response, RpcKeyedAccount, RpcLogsResponse}, -}; -use solana_rpc::rpc_pubsub::RpcSolPubSubClient; -use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair}; -use std::{io::Error, rc::Rc, str::FromStr, time::Duration}; +// use anchor_client::{ +// anchor_lang::{self, event, AnchorDeserialize, AnchorSerialize, Discriminator}, +// ClientError as AnchorClientError, Cluster, +// }; +// use log::*; +// use solana_account_decoder::UiAccountEncoding; +// use solana_client::{ +// pubsub_client::{PubsubClient, PubsubClientSubscription}, +// rpc_config::{ +// RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig, +// RpcTransactionLogsFilter, +// }, +// rpc_response::{Response, RpcKeyedAccount, RpcLogsResponse}, +// }; +// use solana_rpc::rpc_pubsub::RpcSolPubSubClient; +// use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair}; +// use std::{io::Error, rc::Rc, str::FromStr, time::Duration}; -use crate::utils::AnyhowWrap; -use crate::{ - database::initialize::{connect_to_database, setup_database}, - utils::Config, -}; +// use crate::utils::AnyhowWrap; +// use crate::{ +// database::initialize::{connect_to_database, setup_database}, +// utils::Config, +// }; // use super::parsing::parse_and_save_logs; diff --git a/candle-creator/src/utils/mod.rs b/candle-creator/src/utils/mod.rs index 8985279..0165580 100644 --- a/candle-creator/src/utils/mod.rs +++ b/candle-creator/src/utils/mod.rs @@ -1,5 +1,5 @@ use serde_derive::Deserialize; -use std::{fs::File, io::Read}; +use std::fs::File; pub trait AnyhowWrap { type Value;