From c5d3a1f4ab9f8b73a7756cd4e4447fe872194010 Mon Sep 17 00:00:00 2001 From: dboures Date: Tue, 14 Mar 2023 00:31:00 -0500 Subject: [PATCH] feat: markets in batcher spawn own threads --- .../candle_batching/higher_order_candles.rs | 17 +++--- .../candle_batching/minute_candles.rs | 17 ++++-- src/candle_creation/candle_batching/mod.rs | 39 ++++++-------- src/database/insert.rs | 52 ++++++++----------- src/structs/resolution.rs | 16 +++--- 5 files changed, 69 insertions(+), 72 deletions(-) diff --git a/src/candle_creation/candle_batching/higher_order_candles.rs b/src/candle_creation/candle_batching/higher_order_candles.rs index effe564..9cb0644 100644 --- a/src/candle_creation/candle_batching/higher_order_candles.rs +++ b/src/candle_creation/candle_batching/higher_order_candles.rs @@ -1,4 +1,4 @@ -use chrono::{DateTime, DurationRound, Utc}; +use chrono::{DateTime, Duration, DurationRound, Utc}; use num_traits::Zero; use sqlx::{types::Decimal, Pool, Postgres}; use std::cmp::{max, min}; @@ -22,8 +22,6 @@ pub async fn batch_higher_order_candles( Some(candle) => { let start_time = candle.end_time; let end_time = start_time + day(); - // println!("candle.end_time: {:?}", candle.end_time); - // println!("start_time: {:?}", start_time); let mut constituent_candles = fetch_candles_from( pool, market_name, @@ -97,16 +95,19 @@ fn combine_into_higher_order_candles( let duration = target_resolution.get_duration(); - let candles_len = constituent_candles.len(); - let empty_candle = Candle::create_empty_candle( constituent_candles[0].market_name.clone(), target_resolution, ); - let mut combined_candles = - vec![empty_candle; (day().num_minutes() / duration.num_minutes()) as usize]; + let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap(); + let candle_window = now - st; + let num_candles = if candle_window.num_minutes() % duration.num_minutes() == 0 { + (candle_window.num_minutes() / duration.num_minutes()) as usize + 1 + } else { + (candle_window.num_minutes() / duration.num_minutes()) as usize + }; - println!("candles_len: {}", candles_len); + let mut combined_candles = vec![empty_candle; num_candles]; let mut con_iter = constituent_candles.iter_mut().peekable(); let mut start_time = st.clone(); diff --git a/src/candle_creation/candle_batching/minute_candles.rs b/src/candle_creation/candle_batching/minute_candles.rs index 80be14f..c45de28 100644 --- a/src/candle_creation/candle_batching/minute_candles.rs +++ b/src/candle_creation/candle_batching/minute_candles.rs @@ -1,7 +1,6 @@ use std::cmp::{max, min}; use chrono::{DateTime, Duration, DurationRound, Utc}; -use num_traits::{FromPrimitive, Zero}; use sqlx::{types::Decimal, Pool, Postgres}; use crate::{ @@ -16,7 +15,7 @@ use crate::{ pub async fn batch_1m_candles( pool: &Pool, - market: MarketInfo, + market: &MarketInfo, ) -> anyhow::Result> { let market_name = &market.name; let market_address = &market.address; @@ -65,12 +64,12 @@ pub async fn batch_1m_candles( fn combine_fills_into_1m_candles( fills: &mut Vec, - market: MarketInfo, + market: &MarketInfo, st: DateTime, et: DateTime, maybe_last_price: Option, ) -> Vec { - let empty_candle = Candle::create_empty_candle(market.name, Resolution::R1m); + let empty_candle = Candle::create_empty_candle(market.name.clone(), Resolution::R1m); let minutes = (et - st).num_minutes(); let mut candles = vec![empty_candle; minutes as usize]; @@ -79,7 +78,15 @@ fn combine_fills_into_1m_candles( let mut start_time = st.clone(); let mut end_time = start_time + Duration::minutes(1); - let mut last_price = maybe_last_price.unwrap_or(Decimal::zero()); // TODO: very first open is wrong + let mut last_price = match maybe_last_price { + Some(p) => p, + None => { + let first = fills_iter.peek().clone().unwrap(); + let (price, _) = + calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals); + price + } + }; for i in 0..candles.len() { candles[i].open = last_price; diff --git a/src/candle_creation/candle_batching/mod.rs b/src/candle_creation/candle_batching/mod.rs index 231fb8d..bf535d6 100644 --- a/src/candle_creation/candle_batching/mod.rs +++ b/src/candle_creation/candle_batching/mod.rs @@ -18,36 +18,31 @@ pub async fn batch_candles( candles_sender: &Sender>, markets: Vec, ) { - // TODO: tokio spawn a taks for every market + let mut handles = vec![]; + for market in markets.into_iter() { + let sender = candles_sender.clone(); + let pool_clone = pool.clone(); + let market_clone = market.clone(); + handles.push(tokio::spawn(async move { + loop { + batch_for_market(&pool_clone, &sender, &market_clone) + .await + .unwrap(); - loop { - let m = MarketInfo { - name: "BTC/USDC".to_owned(), - address: "A8YFbxQYFVqKZaoYJLLUVcQiWP7G2MeEgW5wsAQgMvFw".to_owned(), - base_decimals: 6, - quote_decimals: 6, - base_mint_key: "GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU".to_owned(), - quote_mint_key: "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_owned(), - base_lot_size: 10_000, - quote_lot_size: 1, - }; - - batch_for_market(&pool.clone(), candles_sender, m) - .await - .unwrap(); - - sleep(Duration::milliseconds(500).to_std().unwrap()).await; + sleep(Duration::milliseconds(2000).to_std().unwrap()).await; + } + })); } - //loop + futures::future::join_all(handles).await; } async fn batch_for_market( pool: &Pool, candles_sender: &Sender>, - market: MarketInfo, + market: &MarketInfo, ) -> anyhow::Result<()> { - let market_address = &market.address.clone(); + let market_name = &market.name.clone(); let candles = batch_1m_candles(pool, market).await?; send_candles(candles, candles_sender).await; @@ -55,7 +50,7 @@ async fn batch_for_market( if resolution == Resolution::R1m { continue; } - let candles = batch_higher_order_candles(pool, market_address, resolution).await?; + let candles = batch_higher_order_candles(pool, market_name, resolution).await?; send_candles(candles, candles_sender).await; } Ok(()) diff --git a/src/database/insert.rs b/src/database/insert.rs index b98dc2d..12d7ea4 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -45,6 +45,29 @@ pub async fn persist_fill_events( } } +pub async fn persist_candles(pool: Pool, mut candles_receiver: Receiver>) { + loop { + match candles_receiver.try_recv() { + Ok(candles) => { + if candles.len() == 0 { + continue; + } + print!("writing: {:?} candles to DB\n", candles.len()); + let upsert_statement = build_candes_upsert_statement(candles); + sqlx::query(&upsert_statement) + .execute(&pool) + .await + .map_err_anyhow() + .unwrap(); + } + Err(TryRecvError::Empty) => continue, + Err(TryRecvError::Disconnected) => { + panic!("Candles sender must stay alive") + } + }; + } +} + fn build_fills_upsert_statement(events: HashMap) -> String { let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES"); for (idx, event) in events.keys().enumerate() { @@ -79,35 +102,6 @@ fn build_fills_upsert_statement(events: HashMap) -> St stmt } -pub async fn persist_candles(pool: Pool, mut candles_receiver: Receiver>) { - loop { - match candles_receiver.try_recv() { - Ok(candles) => { - if candles.len() == 0 { - continue; - } - print!("writing: {:?} candles to DB\n", candles.len()); - match candles.last() { - Some(c) => { - println!("{:?}\n\n", c.end_time) - } - None => {} - } - let upsert_statement = build_candes_upsert_statement(candles); - sqlx::query(&upsert_statement) - .execute(&pool) - .await - .map_err_anyhow() - .unwrap(); - } - Err(TryRecvError::Empty) => continue, - Err(TryRecvError::Disconnected) => { - panic!("Candles sender must stay alive") - } - }; - } -} - fn build_candes_upsert_statement(candles: Vec) -> String { let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES"); for (idx, candle) in candles.iter().enumerate() { diff --git a/src/structs/resolution.rs b/src/structs/resolution.rs index 8ffa9dc..13c2ebb 100644 --- a/src/structs/resolution.rs +++ b/src/structs/resolution.rs @@ -66,14 +66,14 @@ impl Resolution { pub fn from_str(v: &str) -> Result { match v { - "1" => Ok(Resolution::R1m), - "3" => Ok(Resolution::R3m), - "5" => Ok(Resolution::R5m), - "15" => Ok(Resolution::R15m), - "30" => Ok(Resolution::R30m), - "60" => Ok(Resolution::R1h), - "120" => Ok(Resolution::R2h), - "240" => Ok(Resolution::R4h), + "1M" => Ok(Resolution::R1m), + "3M" => Ok(Resolution::R3m), + "5M" => Ok(Resolution::R5m), + "15M" => Ok(Resolution::R15m), + "30M" => Ok(Resolution::R30m), + "1H" => Ok(Resolution::R1h), + "2H" => Ok(Resolution::R2h), + "4H" => Ok(Resolution::R4h), "D" => Ok(Resolution::R1d), _ => Err(()), }