diff --git a/Cargo.toml b/Cargo.toml index 50a543e..002d5fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,8 +16,12 @@ name = "server" path = "src/server/main.rs" [[bin]] -name = "backfill" -path = "src/backfill/main.rs" +name = "backfill-trades" +path = "src/backfill-trades/main.rs" + +[[bin]] +name = "backfill-candles" +path = "src/backfill-candles/main.rs" [dependencies] tokio = { version = "1", features = ["full"] } diff --git a/src/backfill-candles/main.rs b/src/backfill-candles/main.rs new file mode 100644 index 0000000..6251a9a --- /dev/null +++ b/src/backfill-candles/main.rs @@ -0,0 +1,70 @@ +use anchor_lang::prelude::Pubkey; +use chrono::{DateTime, Duration, NaiveDateTime, Utc}; +use deadpool_postgres::Object; +use futures::future::join_all; +use openbook_candles::{ + database::{ + initialize::connect_to_database, + insert::{build_candles_upsert_statement, persist_candles}, + }, + structs::{ + candle::Candle, + markets::{fetch_market_infos, load_markets}, + openbook::OpenBookFillEvent, + resolution::Resolution, + }, + utils::{AnyhowWrap, Config}, + worker::candle_batching::{ + higher_order_candles::backfill_batch_higher_order_candles, + minute_candles::backfill_batch_1m_candles, + }, +}; +use std::{collections::HashMap, env, str::FromStr}; +use strum::IntoEnumIterator; +use tokio::sync::mpsc::{self, Sender}; + +#[tokio::main(flavor = "multi_thread", worker_threads = 10)] +async fn main() -> anyhow::Result<()> { + dotenv::dotenv().ok(); + let args: Vec = env::args().collect(); + assert!(args.len() == 2); + + let path_to_markets_json = &args[1]; + let rpc_url: String = dotenv::var("RPC_URL").unwrap(); + + let config = Config { + rpc_url: rpc_url.clone(), + }; + let markets = load_markets(&path_to_markets_json); + let market_infos = fetch_market_infos(&config, markets.clone()).await?; + println!("Backfilling candles for {:?}", markets); + + let pool = connect_to_database().await?; + for market in market_infos.into_iter() { + let client = pool.get().await?; + let minute_candles = backfill_batch_1m_candles(&pool, &market).await?; + save_candles(minute_candles, client).await?; + + for resolution in Resolution::iter() { + if resolution == Resolution::R1m { + continue; + } + let higher_order_candles = + backfill_batch_higher_order_candles(&pool, &market.name, resolution).await?; + let client = pool.get().await?; + save_candles(higher_order_candles, client).await?; + } + } + Ok(()) +} + +async fn save_candles(candles: Vec, client: Object) -> anyhow::Result<()> { + if candles.len() > 0 { + let upsert_statement = build_candles_upsert_statement(candles); + client + .execute(&upsert_statement, &[]) + .await + .map_err_anyhow()?; + } + Ok(()) +} diff --git a/src/backfill/main.rs b/src/backfill-trades/main.rs similarity index 94% rename from src/backfill/main.rs rename to src/backfill-trades/main.rs index f772ff2..ec9534d 100644 --- a/src/backfill/main.rs +++ b/src/backfill-trades/main.rs @@ -5,7 +5,7 @@ use openbook_candles::{ database::{initialize::connect_to_database, insert::persist_fill_events}, structs::{ markets::{fetch_market_infos, load_markets}, - openbook::OpenBookFillEventLog, + openbook::OpenBookFillEvent, }, utils::Config, worker::trade_fetching::parsing::parse_trades_from_openbook_txns, @@ -19,7 +19,7 @@ use solana_transaction_status::UiTransactionEncoding; use std::{collections::HashMap, env, str::FromStr}; use tokio::sync::mpsc::{self, Sender}; -#[tokio::main] +#[tokio::main(flavor = "multi_thread", worker_threads = 10)] async fn main() -> anyhow::Result<()> { dotenv::dotenv().ok(); let args: Vec = env::args().collect(); @@ -40,7 +40,7 @@ async fn main() -> anyhow::Result<()> { println!("{:?}", target_markets); let pool = connect_to_database().await?; - let (fill_sender, mut fill_receiver) = mpsc::channel::(1000); + let (fill_sender, mut fill_receiver) = mpsc::channel::(1000); tokio::spawn(async move { loop { @@ -56,7 +56,7 @@ async fn main() -> anyhow::Result<()> { pub async fn backfill( rpc_url: String, - fill_sender: &Sender, + fill_sender: &Sender, target_markets: &HashMap, ) -> anyhow::Result<()> { println!("backfill started"); @@ -75,7 +75,6 @@ pub async fn backfill( Some((last, time, sigs)) => { now_time = time; before_sig = Some(last); - let time_left = backfill_time_left(now_time, end_time); println!( "{} minutes ~ {} days remaining in the backfill\n", @@ -134,6 +133,7 @@ pub async fn get_signatures( return None; } let last = sigs.last().unwrap(); + // println!("{:?}", last.block_time.unwrap()); return Some(( Signature::from_str(&last.signature).unwrap(), last.block_time.unwrap(), @@ -144,7 +144,7 @@ pub async fn get_signatures( pub async fn get_transactions( rpc_client: &RpcClient, mut sigs: Vec, - fill_sender: &Sender, + fill_sender: &Sender, target_markets: &HashMap, ) { sigs.retain(|sig| sig.err.is_none()); @@ -173,6 +173,7 @@ pub async fn get_transactions( let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); if fills.len() > 0 { for fill in fills.into_iter() { + // println!("Sending fill {:?}", fill); if let Err(_) = fill_sender.send(fill).await { panic!("receiver dropped"); } diff --git a/src/database/fetch.rs b/src/database/fetch.rs index 05ef59d..1fe3f6e 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -111,6 +111,8 @@ pub async fn fetch_latest_finished_candle( } } +/// Fetches all of the candles for the given market and resoultion, starting from the earliest. +/// Note that this function will fetch ALL candles. pub async fn fetch_earliest_candles( pool: &Pool, market_name: &str, diff --git a/src/database/insert.rs b/src/database/insert.rs index 89db430..cab9a10 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -72,7 +72,7 @@ pub async fn persist_candles( continue; } // print!("writing: {:?} candles to DB\n", candles.len()); - let upsert_statement = build_candes_upsert_statement(candles); + let upsert_statement = build_candles_upsert_statement(candles); client .execute(&upsert_statement, &[]) .await @@ -128,7 +128,7 @@ fn build_fills_upsert_statement(events: HashMap) -> Strin stmt } -fn build_candes_upsert_statement(candles: Vec) -> String { +pub fn build_candles_upsert_statement(candles: Vec) -> String { let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES"); for (idx, candle) in candles.iter().enumerate() { let val_str = format!( diff --git a/src/worker/candle_batching/higher_order_candles.rs b/src/worker/candle_batching/higher_order_candles.rs index 913b365..f1edbf1 100644 --- a/src/worker/candle_batching/higher_order_candles.rs +++ b/src/worker/candle_batching/higher_order_candles.rs @@ -144,3 +144,29 @@ fn trim_candles(mut c: Vec, start_time: DateTime) -> Vec { } c } + +pub async fn backfill_batch_higher_order_candles( + pool: &Pool, + market_name: &str, + resolution: Resolution, +) -> anyhow::Result> { + let mut constituent_candles = + fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()).await?; + if constituent_candles.len() == 0 { + return Ok(vec![]); + } + let start_time = constituent_candles[0].start_time.duration_trunc(day())?; + + let seed_candle = constituent_candles[0].clone(); + let combined_candles = combine_into_higher_order_candles( + &mut constituent_candles, + resolution, + start_time, + seed_candle, + ); + + Ok(trim_candles( + combined_candles, + constituent_candles[0].start_time, + )) +} diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index fa6710f..31eb78e 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -120,3 +120,38 @@ fn combine_fills_into_1m_candles( candles } + +/// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end. +pub async fn backfill_batch_1m_candles( + pool: &Pool, + market: &MarketInfo, +) -> anyhow::Result> { + let market_name = &market.name; + let market_address = &market.address; + let mut candles = vec![]; + + let earliest_fill = fetch_earliest_fill(pool, &market.address).await?; + if earliest_fill.is_none() { + println!("No fills found for: {:?}", &market_name); + return Ok(candles); + } + + let mut start_time = earliest_fill + .unwrap() + .time + .duration_trunc(Duration::minutes(1))?; + while start_time < Utc::now() { + let end_time = min( + start_time + day(), + Utc::now().duration_trunc(Duration::minutes(1))?, + ); + let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; + if fills.len() > 0 { + let mut minute_candles = + combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); + candles.append(&mut minute_candles); + } + start_time += day() + } + Ok(candles) +}