From 53f55aa66940ef181c5e3f90073f2e18541490ca Mon Sep 17 00:00:00 2001 From: dboures Date: Sun, 12 Mar 2023 03:13:57 -0500 Subject: [PATCH] feat: can batch higher order candles --- .../candle_batching/higher_order_candles.rs | 163 +++++++++++++++++ .../{batcher.rs => minute_candles.rs} | 48 +---- candle-creator/src/candle_batching/mod.rs | 83 ++++++++- .../src/database/{fetchers.rs => fetch.rs} | 69 ++++++- candle-creator/src/database/initialize.rs | 117 ++++++++++++ .../src/database/{database.rs => insert.rs} | 169 ++++++++---------- candle-creator/src/database/mod.rs | 35 ++-- candle-creator/src/main.rs | 23 ++- .../src/trade_fetching/websocket.rs | 2 +- 9 files changed, 541 insertions(+), 168 deletions(-) create mode 100644 candle-creator/src/candle_batching/higher_order_candles.rs rename candle-creator/src/candle_batching/{batcher.rs => minute_candles.rs} (75%) rename candle-creator/src/database/{fetchers.rs => fetch.rs} (58%) create mode 100644 candle-creator/src/database/initialize.rs rename candle-creator/src/database/{database.rs => insert.rs} (60%) diff --git a/candle-creator/src/candle_batching/higher_order_candles.rs b/candle-creator/src/candle_batching/higher_order_candles.rs new file mode 100644 index 0000000..8853d6a --- /dev/null +++ b/candle-creator/src/candle_batching/higher_order_candles.rs @@ -0,0 +1,163 @@ +use chrono::{DateTime, DurationRound, Utc}; +use num_traits::Zero; +use sqlx::{types::Decimal, Pool, Postgres}; +use std::cmp::{max, min}; + +use crate::{ + candle_batching::DAY, + database::{ + fetch::{fetch_candles_from, fetch_earliest_candle, fetch_latest_finished_candle}, + Candle, Resolution, + }, +}; + +pub async fn batch_higher_order_candles( + pool: &Pool, + market_address_string: &str, + resolution: Resolution, +) -> anyhow::Result> { + let latest_candle = + fetch_latest_finished_candle(pool, market_address_string, resolution).await?; + + match latest_candle { + Some(candle) => { + let start_time = candle.end_time; + let end_time = start_time + DAY(); + // println!("candle.end_time: {:?}", candle.end_time); + // println!("start_time: {:?}", start_time); + let mut constituent_candles = fetch_candles_from( + pool, + market_address_string, + resolution.get_constituent_resolution(), + start_time, + end_time, + ) + .await?; + if constituent_candles.len() == 0 { + return Ok(Vec::new()); + } + let combined_candles = combine_into_higher_order_candles( + &mut constituent_candles, + resolution, + start_time, + candle, + ); + Ok(combined_candles) + } + None => { + let constituent_candle = fetch_earliest_candle( + pool, + market_address_string, + resolution.get_constituent_resolution(), + ) + .await?; + if constituent_candle.is_none() { + println!( + "Batching {}, but no candles found for: {:?}, {}", + resolution, + market_address_string, + resolution.get_constituent_resolution() + ); + return Ok(Vec::new()); + } + let start_time = constituent_candle + .unwrap() + .start_time + .duration_trunc(DAY())?; + let end_time = start_time + DAY(); + + let mut constituent_candles = fetch_candles_from( + pool, + market_address_string, + resolution.get_constituent_resolution(), + start_time, + end_time, + ) + .await?; + if constituent_candles.len() == 0 { + return Ok(Vec::new()); + } + + let seed_candle = constituent_candles[0].clone(); + let combined_candles = combine_into_higher_order_candles( + &mut constituent_candles, + resolution, + start_time, + seed_candle, + ); + + Ok(trim_zero_candles(combined_candles)) + } + } +} + +fn combine_into_higher_order_candles( + constituent_candles: &mut Vec, + target_resolution: Resolution, + st: DateTime, + seed_candle: Candle, +) -> Vec { + println!("target_resolution: {}", target_resolution); + + let duration = target_resolution.get_duration(); + + let candles_len = constituent_candles.len(); + + let empty_candle = + Candle::create_empty_candle(constituent_candles[0].market.clone(), target_resolution); + let mut combined_candles = + vec![empty_candle; (DAY().num_minutes() / duration.num_minutes()) as usize]; + + println!("candles_len: {}", candles_len); + + let mut con_iter = constituent_candles.iter_mut().peekable(); + let mut start_time = st.clone(); + let mut end_time = start_time + duration; + + let mut last_candle = seed_candle; + + for i in 0..combined_candles.len() { + combined_candles[i].open = last_candle.close; + combined_candles[i].low = last_candle.close; + combined_candles[i].close = last_candle.close; + combined_candles[i].high = last_candle.close; + + while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) { + let unit_candle = con_iter.next().unwrap(); + combined_candles[i].high = max(combined_candles[i].high, unit_candle.high); + combined_candles[i].low = min(combined_candles[i].low, unit_candle.low); + combined_candles[i].close = unit_candle.close; + combined_candles[i].volume += unit_candle.volume; + combined_candles[i].complete = unit_candle.complete; + combined_candles[i].end_time = unit_candle.end_time; + } + + combined_candles[i].start_time = start_time; + combined_candles[i].end_time = end_time; + + start_time = end_time; + end_time = end_time + duration; + + last_candle = combined_candles[i].clone(); + } + + combined_candles +} + +fn trim_zero_candles(mut c: Vec) -> Vec { + let mut i = 0; + while i < c.len() { + if c[i].open == Decimal::zero() + && c[i].high == Decimal::zero() + && c[i].low == Decimal::zero() + && c[i].close == Decimal::zero() + && c[i].volume == Decimal::zero() + && c[i].complete == true + { + c.remove(i); + } else { + i += 1; + } + } + c +} diff --git a/candle-creator/src/candle_batching/batcher.rs b/candle-creator/src/candle_batching/minute_candles.rs similarity index 75% rename from candle-creator/src/candle_batching/batcher.rs rename to candle-creator/src/candle_batching/minute_candles.rs index 734782d..b6e1f02 100644 --- a/candle-creator/src/candle_batching/batcher.rs +++ b/candle-creator/src/candle_batching/minute_candles.rs @@ -1,47 +1,17 @@ use std::cmp::{max, min}; -use chrono::{DateTime, Duration, DurationRound, SubsecRound, Utc}; +use chrono::{DateTime, Duration, DurationRound, Utc}; use num_traits::{FromPrimitive, Zero}; use sqlx::{types::Decimal, Pool, Postgres}; use crate::database::{ - fetchers::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, + fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, Candle, MarketInfo, PgOpenBookFill, Resolution, }; -pub async fn batch_candles(pool: &Pool, markets: Vec) { - let m = MarketInfo { - name: "BTC/USDC".to_owned(), - address: "A8YFbxQYFVqKZaoYJLLUVcQiWP7G2MeEgW5wsAQgMvFw".to_owned(), - base_decimals: 6, - quote_decimals: 6, - base_mint_key: "GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU".to_owned(), - quote_mint_key: "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_owned(), - base_lot_size: 10_000, - quote_lot_size: 1, - }; - batch_for_market(&pool.clone(), m).await.unwrap(); +use super::DAY; - // for market in markets.iter() { - // tokio::spawn(async move { - // loop { - // batch_for_market(&pool.clone(), &market.address.clone()).await; - // } - // }); - // } -} - -async fn batch_for_market(pool: &Pool, market: MarketInfo) -> anyhow::Result<()> { - let candles = batch_1m_candles(pool, market).await?; - // println!("candles {:?}", candles[10]); - // database::insert_candles(pool, candles) - - // for resolution in Resolution.iter - - Ok(()) -} - -async fn batch_1m_candles( +pub async fn batch_1m_candles( pool: &Pool, market: MarketInfo, ) -> anyhow::Result> { @@ -53,7 +23,7 @@ async fn batch_1m_candles( Some(candle) => { let start_time = candle.end_time; let end_time = min( - start_time + Duration::hours(6), + start_time + DAY(), Utc::now().duration_trunc(Duration::minutes(1))?, ); let mut fills = @@ -80,7 +50,7 @@ async fn batch_1m_candles( .time .duration_trunc(Duration::minutes(1))?; let end_time = min( - start_time + Duration::hours(6), + start_time + DAY(), Utc::now().duration_trunc(Duration::minutes(1))?, ); let mut fills = @@ -108,7 +78,7 @@ fn combine_fills_into_1m_candles( let mut start_time = st.clone(); let mut end_time = start_time + Duration::minutes(1); - let mut last_price = maybe_last_price.unwrap_or(Decimal::zero()); + let mut last_price = maybe_last_price.unwrap_or(Decimal::zero()); // TODO: very first open is wrong for i in 0..candles.len() { candles[i].open = last_price; @@ -122,8 +92,6 @@ fn combine_fills_into_1m_candles( let (price, volume) = calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals); - println!("{:?}", price); - candles[i].close = price; candles[i].low = min(price, candles[i].low); candles[i].high = max(price, candles[i].high); @@ -136,8 +104,6 @@ fn combine_fills_into_1m_candles( candles[i].end_time = end_time; candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time); - println!("{:?}", candles[i]); - start_time = end_time; end_time = end_time + Duration::minutes(1); } diff --git a/candle-creator/src/candle_batching/mod.rs b/candle-creator/src/candle_batching/mod.rs index de0a2da..bbab621 100644 --- a/candle-creator/src/candle_batching/mod.rs +++ b/candle-creator/src/candle_batching/mod.rs @@ -1 +1,82 @@ -pub mod batcher; +pub mod higher_order_candles; +pub mod minute_candles; + +use std::cmp::{max, min}; + +use chrono::{DateTime, Duration, DurationRound, Utc}; +use num_traits::{FromPrimitive, Zero}; +use sqlx::{types::Decimal, Pool, Postgres}; +use strum::IntoEnumIterator; +use tokio::{sync::mpsc::Sender, time::sleep}; + +use crate::{ + candle_batching::minute_candles::batch_1m_candles, + database::{ + fetch::{ + fetch_candles_from, fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle, + }, + Candle, MarketInfo, PgOpenBookFill, Resolution, + }, +}; + +use self::higher_order_candles::batch_higher_order_candles; + +pub fn DAY() -> Duration { + Duration::days(1) +} + +pub async fn batch_candles( + pool: Pool, + candles_sender: &Sender>, + markets: Vec, +) { + // tokio spawn a taks for every market + + loop { + let m = MarketInfo { + name: "BTC/USDC".to_owned(), + address: "A8YFbxQYFVqKZaoYJLLUVcQiWP7G2MeEgW5wsAQgMvFw".to_owned(), + base_decimals: 6, + quote_decimals: 6, + base_mint_key: "GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU".to_owned(), + quote_mint_key: "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_owned(), + base_lot_size: 10_000, + quote_lot_size: 1, + }; + + batch_for_market(&pool.clone(), candles_sender, m) + .await + .unwrap(); + + sleep(Duration::milliseconds(500).to_std().unwrap()).await; + } + + //loop +} + +async fn batch_for_market( + pool: &Pool, + candles_sender: &Sender>, + market: MarketInfo, +) -> anyhow::Result<()> { + let market_address = &market.address.clone(); + let candles = batch_1m_candles(pool, market).await?; + send_candles(candles, candles_sender).await; + + for resolution in Resolution::iter() { + if resolution == Resolution::R1m { + continue; + } + let candles = batch_higher_order_candles(pool, market_address, resolution).await?; + send_candles(candles, candles_sender).await; + } + Ok(()) +} + +async fn send_candles(candles: Vec, candles_sender: &Sender>) { + if candles.len() > 0 { + if let Err(_) = candles_sender.send(candles).await { + panic!("candles receiver dropped"); + } + } +} diff --git a/candle-creator/src/database/fetchers.rs b/candle-creator/src/database/fetch.rs similarity index 58% rename from candle-creator/src/database/fetchers.rs rename to candle-creator/src/database/fetch.rs index 57bfedd..02e8823 100644 --- a/candle-creator/src/database/fetchers.rs +++ b/candle-creator/src/database/fetch.rs @@ -5,8 +5,6 @@ use crate::{database::PgOpenBookFill, utils::AnyhowWrap}; use super::{Candle, Resolution}; -// use super::PgMarketInfo; - pub async fn fetch_earliest_fill( pool: &Pool, market_address_string: &str, @@ -90,4 +88,69 @@ pub async fn fetch_latest_finished_candle( .map_err_anyhow() } -// fetch_candles +pub async fn fetch_earliest_candle( + pool: &Pool, + market_address_string: &str, + resolution: Resolution, +) -> anyhow::Result> { + sqlx::query_as!( + Candle, + r#"SELECT + start_time as "start_time!", + end_time as "end_time!", + resolution as "resolution!", + market as "market!", + open as "open!", + close as "close!", + high as "high!", + low as "low!", + volume as "volume!", + complete as "complete!" + from candles + where market = $1 + and resolution = $2 + ORDER BY start_time asc LIMIT 1"#, + market_address_string, + resolution.to_string() + ) + .fetch_optional(pool) + .await + .map_err_anyhow() +} + +pub async fn fetch_candles_from( + pool: &Pool, + market_address_string: &str, + resolution: Resolution, + start_time: DateTime, + end_time: DateTime, +) -> anyhow::Result> { + sqlx::query_as!( + Candle, + r#"SELECT + start_time as "start_time!", + end_time as "end_time!", + resolution as "resolution!", + market as "market!", + open as "open!", + close as "close!", + high as "high!", + low as "low!", + volume as "volume!", + complete as "complete!" + from candles + where market = $1 + and resolution = $2 + and start_time >= $3 + and end_time <= $4 + and complete = true + ORDER BY start_time asc"#, + market_address_string, + resolution.to_string(), + start_time, + end_time + ) + .fetch_all(pool) + .await + .map_err_anyhow() +} diff --git a/candle-creator/src/database/initialize.rs b/candle-creator/src/database/initialize.rs new file mode 100644 index 0000000..bf1c35e --- /dev/null +++ b/candle-creator/src/database/initialize.rs @@ -0,0 +1,117 @@ +use chrono::Utc; +use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, + time::{Duration, Instant}, +}; +use tokio::sync::mpsc::{error::TryRecvError, Receiver}; + +use crate::{ + trade_fetching::parsing::OpenBookFillEventLog, + utils::{AnyhowWrap, Config}, +}; + +use super::MarketInfo; + +pub async fn connect_to_database(config: &Config) -> anyhow::Result> { + loop { + let pool = PgPoolOptions::new() + .max_connections(config.max_pg_pool_connections) + .connect(&config.database_url) + .await; + if pool.is_ok() { + println!("Database connected"); + return pool.map_err_anyhow(); + } + println!("Failed to connect to database, retrying"); + tokio::time::sleep(Duration::from_millis(500)).await; + } +} + +pub async fn setup_database(pool: &Pool, markets: Vec) -> anyhow::Result<()> { + let candles_table_fut = create_candles_table(pool); + let fills_table_fut = create_fills_table(pool); + let result = tokio::try_join!(candles_table_fut, fills_table_fut); + match result { + Ok(_) => { + println!("Successfully configured database"); + Ok(()) + } + Err(e) => { + println!("Failed to configure database: {e}"); + Err(e) + } + } +} + +pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> { + let mut tx = pool.begin().await.map_err_anyhow()?; + + sqlx::query!( + "CREATE TABLE IF NOT EXISTS candles ( + id serial, + market text, + start_time timestamptz, + end_time timestamptz, + resolution text, + open numeric, + close numeric, + high numeric, + low numeric, + volume numeric, + complete bool + )", + ) + .execute(&mut tx) + .await?; + + sqlx::query!( + "CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market, start_time, resolution)" + ).execute(&mut tx).await?; + + sqlx::query!( + "ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market, start_time, resolution)" + ) + .execute(&mut tx) + .await?; + + tx.commit().await.map_err_anyhow() +} + +pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { + let mut tx = pool.begin().await.map_err_anyhow()?; + + sqlx::query!( + "CREATE TABLE IF NOT EXISTS fills ( + id numeric PRIMARY KEY, + time timestamptz not null, + market text not null, + open_orders text not null, + open_orders_owner text not null, + bid bool not null, + maker bool not null, + native_qty_paid numeric not null, + native_qty_received numeric not null, + native_fee_or_rebate numeric not null, + fee_tier text not null, + order_id text not null + )", + ) + .execute(&mut tx) + .await?; + + sqlx::query!("CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)") + .execute(&mut tx) + .await?; + + sqlx::query!("CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)") + .execute(&mut tx) + .await?; + + tx.commit().await.map_err_anyhow() +} + +pub async fn save_candles() { + unimplemented!("TODO"); +} diff --git a/candle-creator/src/database/database.rs b/candle-creator/src/database/insert.rs similarity index 60% rename from candle-creator/src/database/database.rs rename to candle-creator/src/database/insert.rs index 089b67c..dfd2f68 100644 --- a/candle-creator/src/database/database.rs +++ b/candle-creator/src/database/insert.rs @@ -12,105 +12,9 @@ use crate::{ utils::{AnyhowWrap, Config}, }; -use super::MarketInfo; +use super::Candle; -pub async fn connect_to_database(config: &Config) -> anyhow::Result> { - loop { - let pool = PgPoolOptions::new() - .max_connections(config.max_pg_pool_connections) - .connect(&config.database_url) - .await; - if pool.is_ok() { - println!("Database connected"); - return pool.map_err_anyhow(); - } - println!("Failed to connect to database, retrying"); - tokio::time::sleep(Duration::from_millis(500)).await; - } -} - -pub async fn setup_database(pool: &Pool, markets: Vec) -> anyhow::Result<()> { - let candles_table_fut = create_candles_table(pool); - let fills_table_fut = create_fills_table(pool); - let result = tokio::try_join!(candles_table_fut, fills_table_fut); - match result { - Ok(_) => { - println!("Successfully configured database"); - Ok(()) - } - Err(e) => { - println!("Failed to configure database: {e}"); - Err(e) - } - } -} - -pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> { - let mut tx = pool.begin().await.map_err_anyhow()?; - - sqlx::query( - "CREATE TABLE IF NOT EXISTS candles ( - id serial, - market text, - start_time timestamptz, - end_time timestamptz, - resolution text, - open numeric, - close numeric, - high numeric, - low numeric, - volume numeric, - complete bool - )", - ) - .execute(&mut tx) - .await?; - - sqlx::query( - "CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market, start_time, resolution)" - ).execute(&mut tx).await?; - - tx.commit().await.map_err_anyhow() -} - -pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { - let mut tx = pool.begin().await.map_err_anyhow()?; - - sqlx::query!( - "CREATE TABLE IF NOT EXISTS fills ( - id numeric PRIMARY KEY, - time timestamptz not null, - market text not null, - open_orders text not null, - open_orders_owner text not null, - bid bool not null, - maker bool not null, - native_qty_paid numeric not null, - native_qty_received numeric not null, - native_fee_or_rebate numeric not null, - fee_tier text not null, - order_id text not null - )", - ) - .execute(&mut tx) - .await?; - - sqlx::query!("CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)") - .execute(&mut tx) - .await?; - - sqlx::query!("CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)") - .execute(&mut tx) - .await?; - - tx.commit().await.map_err_anyhow() -} - -pub async fn save_candles() { - unimplemented!("TODO"); -} - -pub async fn handle_fill_events( +pub async fn persist_fill_events( pool: &Pool, mut fill_receiver: Receiver, ) { @@ -127,7 +31,7 @@ pub async fn handle_fill_events( } Err(TryRecvError::Empty) => break, Err(TryRecvError::Disconnected) => { - panic!("sender must stay alive") + panic!("Fills sender must stay alive") } }; } @@ -179,6 +83,73 @@ fn build_fills_upsert_statement(events: Vec) -> String { stmt } +pub async fn persist_candles(pool: Pool, mut candles_receiver: Receiver>) { + loop { + match candles_receiver.try_recv() { + Ok(candles) => { + if candles.len() == 0 { + continue; + } + print!("writing: {:?} candles to DB\n", candles.len()); + match candles.last() { + Some(c) => { + println!("{:?}\n\n", c.end_time) + } + None => {} + } + let upsert_statement = build_candes_upsert_statement(candles); + sqlx::query(&upsert_statement) + .execute(&pool) + .await + .map_err_anyhow() + .unwrap(); + } + Err(TryRecvError::Empty) => continue, + Err(TryRecvError::Disconnected) => { + panic!("Candles sender must stay alive") + } + }; + } +} + +fn build_candes_upsert_statement(candles: Vec) -> String { + let mut stmt = String::from("INSERT INTO candles (market, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES"); + for (idx, candle) in candles.iter().enumerate() { + let val_str = format!( + "(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {})", + candle.market, + candle.start_time.to_rfc3339(), + candle.end_time.to_rfc3339(), + candle.resolution, + candle.open, + candle.close, + candle.high, + candle.low, + candle.volume, + candle.complete, + ); + + if idx == 0 { + stmt = format!("{} {}", &stmt, val_str); + } else { + stmt = format!("{}, {}", &stmt, val_str); + } + } + + let handle_conflict = "ON CONFLICT (market, start_time, resolution) + DO UPDATE SET + open=excluded.open, + close=excluded.close, + high=excluded.high, + low=excluded.low, + volume=excluded.volume, + complete=excluded.complete + "; + + stmt = format!("{} {}", stmt, handle_conflict); + stmt +} + #[cfg(test)] mod tests { use super::*; diff --git a/candle-creator/src/database/mod.rs b/candle-creator/src/database/mod.rs index e0de84b..4de60e3 100644 --- a/candle-creator/src/database/mod.rs +++ b/candle-creator/src/database/mod.rs @@ -1,18 +1,21 @@ use std::fmt; -use chrono::{DateTime, NaiveDateTime, Utc}; +use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use num_traits::Zero; use sqlx::types::Decimal; use strum::EnumIter; -pub mod database; -pub mod fetchers; +use crate::candle_batching::DAY; + +pub mod fetch; +pub mod initialize; +pub mod insert; pub trait Summary { fn summarize(&self) -> String; } -#[derive(EnumIter)] +#[derive(EnumIter, Copy, Clone, Eq, PartialEq)] pub enum Resolution { R1m, R3m, @@ -23,7 +26,6 @@ pub enum Resolution { R2h, R4h, R1d, - R1w, } impl fmt::Display for Resolution { @@ -38,7 +40,6 @@ impl fmt::Display for Resolution { Resolution::R2h => write!(f, "2H"), Resolution::R4h => write!(f, "4H"), Resolution::R1d => write!(f, "1D"), - Resolution::R1w => write!(f, "1W"), } } } @@ -55,22 +56,20 @@ impl Resolution { Resolution::R2h => Resolution::R1h, Resolution::R4h => Resolution::R2h, Resolution::R1d => Resolution::R4h, - Resolution::R1w => Resolution::R1d, } } - pub fn get_constituent_resolution_factor(self) -> u8 { + pub fn get_duration(self) -> Duration { match self { - Resolution::R1m => panic!("have to use fills to make 1M candles"), - Resolution::R3m => 3, - Resolution::R5m => 5, - Resolution::R15m => 3, - Resolution::R30m => 2, - Resolution::R1h => 2, - Resolution::R2h => 2, - Resolution::R4h => 2, - Resolution::R1d => 6, - Resolution::R1w => 7, + Resolution::R1m => Duration::minutes(1), + Resolution::R3m => Duration::minutes(3), + Resolution::R5m => Duration::minutes(5), + Resolution::R15m => Duration::minutes(15), + Resolution::R30m => Duration::minutes(30), + Resolution::R1h => Duration::hours(1), + Resolution::R2h => Duration::hours(2), + Resolution::R4h => Duration::hours(4), + Resolution::R1d => DAY(), } } } diff --git a/candle-creator/src/main.rs b/candle-creator/src/main.rs index 6452989..a8ce495 100644 --- a/candle-creator/src/main.rs +++ b/candle-creator/src/main.rs @@ -1,11 +1,12 @@ use crate::{ - database::{fetchers::fetch_latest_finished_candle, Resolution}, + candle_batching::batch_candles, + database::{fetch::fetch_latest_finished_candle, insert::persist_candles, Candle, Resolution}, trade_fetching::{parsing::OpenBookFillEventLog, scrape::fetch_market_infos}, utils::Config, }; use database::{ - database::{connect_to_database, setup_database}, - fetchers::fetch_earliest_fill, + fetch::fetch_earliest_fill, + initialize::{connect_to_database, setup_database}, }; use dotenv; use solana_sdk::pubkey::Pubkey; @@ -39,14 +40,26 @@ async fn main() -> anyhow::Result<()> { // let (fill_sender, fill_receiver) = mpsc::channel::(1000); // tokio::spawn(async move { - // trade_fetching::scrape::scrape(&config, fill_sender.clone()).await; + // trade_fetching::scrape::scrape(&config, fill_sender.clone()).await; TODO: send the vec, it's okay // }); // database::database::handle_fill_events(&pool, fill_receiver).await; // trade_fetching::websocket::listen_logs().await?; - candle_batching::batcher::batch_candles(&pool, market_infos).await; + let (candle_sender, candle_receiver) = mpsc::channel::>(1000); + + let batch_pool = pool.clone(); + tokio::spawn(async move { + batch_candles(batch_pool, &candle_sender, market_infos).await; + }); + + let persist_pool = pool.clone(); + // tokio::spawn(async move { + persist_candles(persist_pool, candle_receiver).await; + // }); + + loop {} Ok(()) } diff --git a/candle-creator/src/trade_fetching/websocket.rs b/candle-creator/src/trade_fetching/websocket.rs index f91787e..e66e287 100644 --- a/candle-creator/src/trade_fetching/websocket.rs +++ b/candle-creator/src/trade_fetching/websocket.rs @@ -20,7 +20,7 @@ use std::{io::Error, rc::Rc, str::FromStr, time::Duration}; use crate::utils::AnyhowWrap; use crate::{ - database::database::{connect_to_database, setup_database}, + database::initialize::{connect_to_database, setup_database}, utils::Config, };