From f509df05d5f18dc4b07ff8ddfb1dcedf7b6a9d5f Mon Sep 17 00:00:00 2001 From: dboures Date: Mon, 27 Mar 2023 14:01:48 -0500 Subject: [PATCH] fix: candle batching works with 0 fills --- sqlx-data.json | 227 ++++++------------ src/database/fetch.rs | 9 +- src/database/insert.rs | 6 +- src/server/candles.rs | 4 +- src/server/traders.rs | 36 ++- .../candle_batching/higher_order_candles.rs | 67 ++---- src/worker/candle_batching/minute_candles.rs | 10 +- src/worker/main.rs | 12 +- 8 files changed, 138 insertions(+), 233 deletions(-) diff --git a/sqlx-data.json b/sqlx-data.json index 08a5120..80a317a 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -151,158 +151,6 @@ }, "query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n ORDER BY start_time asc" }, - "98d4c97eeb4f0d14fd9284a968e88837456117a1c1995df37e615902b482ff10": { - "describe": { - "columns": [ - { - "name": "start_time!", - "ordinal": 0, - "type_info": "Timestamptz" - }, - { - "name": "end_time!", - "ordinal": 1, - "type_info": "Timestamptz" - }, - { - "name": "resolution!", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "market_name!", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "open!", - "ordinal": 4, - "type_info": "Numeric" - }, - { - "name": "close!", - "ordinal": 5, - "type_info": "Numeric" - }, - { - "name": "high!", - "ordinal": 6, - "type_info": "Numeric" - }, - { - "name": "low!", - "ordinal": 7, - "type_info": "Numeric" - }, - { - "name": "volume!", - "ordinal": 8, - "type_info": "Numeric" - }, - { - "name": "complete!", - "ordinal": 9, - "type_info": "Bool" - } - ], - "nullable": [ - true, - true, - true, - true, - true, - true, - true, - true, - true, - true - ], - "parameters": { - "Left": [ - "Text", - "Text" - ] - } - }, - "query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n ORDER BY start_time asc LIMIT 1" - }, - "a69ebc1820a6635f83a99fddc1bf8ae0c7a3184de6d38e129baa6397c61efc60": { - "describe": { - "columns": [ - { - "name": "start_time!", - "ordinal": 0, - "type_info": "Timestamptz" - }, - { - "name": "end_time!", - "ordinal": 1, - "type_info": "Timestamptz" - }, - { - "name": "resolution!", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "market_name!", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "open!", - "ordinal": 4, - "type_info": "Numeric" - }, - { - "name": "close!", - "ordinal": 5, - "type_info": "Numeric" - }, - { - "name": "high!", - "ordinal": 6, - "type_info": "Numeric" - }, - { - "name": "low!", - "ordinal": 7, - "type_info": "Numeric" - }, - { - "name": "volume!", - "ordinal": 8, - "type_info": "Numeric" - }, - { - "name": "complete!", - "ordinal": 9, - "type_info": "Bool" - } - ], - "nullable": [ - true, - true, - true, - true, - true, - true, - true, - true, - true, - true - ], - "parameters": { - "Left": [ - "Text", - "Text", - "Timestamptz", - "Timestamptz" - ] - } - }, - "query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n and complete = true\n ORDER BY start_time asc" - }, "aee3a3e04f837bd62e263452bfbaf5d7dff271799c80d5efd22a54954ac212c4": { "describe": { "columns": [ @@ -449,6 +297,81 @@ }, "query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1 \n and maker = true\n ORDER BY time asc LIMIT 1" }, + "e367fec686c10d361f4a5ac014ca34ce68544dfe9cf32b79a6a78790e8c6d5cc": { + "describe": { + "columns": [ + { + "name": "start_time!", + "ordinal": 0, + "type_info": "Timestamptz" + }, + { + "name": "end_time!", + "ordinal": 1, + "type_info": "Timestamptz" + }, + { + "name": "resolution!", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "market_name!", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "open!", + "ordinal": 4, + "type_info": "Numeric" + }, + { + "name": "close!", + "ordinal": 5, + "type_info": "Numeric" + }, + { + "name": "high!", + "ordinal": 6, + "type_info": "Numeric" + }, + { + "name": "low!", + "ordinal": 7, + "type_info": "Numeric" + }, + { + "name": "volume!", + "ordinal": 8, + "type_info": "Numeric" + }, + { + "name": "complete!", + "ordinal": 9, + "type_info": "Bool" + } + ], + "nullable": [ + true, + true, + true, + true, + true, + true, + true, + true, + true, + true + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + } + }, + "query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n ORDER BY start_time asc" + }, "e94788e9eb04534dc13a73f80255705fb39789caa6dfb43e8417471f8399bb85": { "describe": { "columns": [], diff --git a/src/database/fetch.rs b/src/database/fetch.rs index c6aa162..a49fdf9 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -91,11 +91,11 @@ pub async fn fetch_latest_finished_candle( .map_err_anyhow() } -pub async fn fetch_earliest_candle( +pub async fn fetch_earliest_candles( conn: &mut PoolConnection, market_name: &str, resolution: Resolution, -) -> anyhow::Result> { +) -> anyhow::Result> { sqlx::query_as!( Candle, r#"SELECT @@ -112,11 +112,11 @@ pub async fn fetch_earliest_candle( from candles where market_name = $1 and resolution = $2 - ORDER BY start_time asc LIMIT 1"#, + ORDER BY start_time asc"#, market_name, resolution.to_string() ) - .fetch_optional(conn) + .fetch_all(conn) .await .map_err_anyhow() } @@ -146,7 +146,6 @@ pub async fn fetch_candles_from( and resolution = $2 and start_time >= $3 and end_time <= $4 - and complete = true ORDER BY start_time asc"#, market_name, resolution.to_string(), diff --git a/src/database/insert.rs b/src/database/insert.rs index 5fce11f..02221af 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -1,10 +1,10 @@ -use chrono::{Utc}; -use sqlx::{Pool, Postgres, Connection}; +use chrono::Utc; +use sqlx::{Connection, Pool, Postgres}; use std::{ collections::{hash_map::DefaultHasher, HashMap}, hash::{Hash, Hasher}, }; -use tokio::{sync::mpsc::{error::TryRecvError, Receiver}}; +use tokio::sync::mpsc::{error::TryRecvError, Receiver}; use crate::{ structs::{candle::Candle, openbook::OpenBookFillEventLog}, diff --git a/src/server/candles.rs b/src/server/candles.rs index 8500d63..37a075e 100644 --- a/src/server/candles.rs +++ b/src/server/candles.rs @@ -36,9 +36,7 @@ pub async fn get_candles( let mut conn = context.pool.acquire().await.unwrap(); let candles = - match fetch_tradingview_candles(&mut conn, &info.market_name, resolution, from, to) - .await - { + match fetch_tradingview_candles(&mut conn, &info.market_name, resolution, from, to).await { Ok(c) => c, Err(_) => return Err(ServerError::DbQueryError), }; diff --git a/src/server/traders.rs b/src/server/traders.rs index 50296ad..399524e 100644 --- a/src/server/traders.rs +++ b/src/server/traders.rs @@ -32,17 +32,13 @@ pub async fn get_top_traders_by_base_volume( let to = to_timestampz(info.to); let mut conn = context.pool.acquire().await.unwrap(); - let raw_traders = match fetch_top_traders_by_base_volume_from( - &mut conn, - &selected_market.address, - from, - to, - ) - .await - { - Ok(c) => c, - Err(_) => return Err(ServerError::DbQueryError), - }; + let raw_traders = + match fetch_top_traders_by_base_volume_from(&mut conn, &selected_market.address, from, to) + .await + { + Ok(c) => c, + Err(_) => return Err(ServerError::DbQueryError), + }; let traders = raw_traders .into_iter() @@ -72,17 +68,13 @@ pub async fn get_top_traders_by_quote_volume( let to = to_timestampz(info.to); let mut conn = context.pool.acquire().await.unwrap(); - let raw_traders = match fetch_top_traders_by_quote_volume_from( - &mut conn, - &selected_market.address, - from, - to, - ) - .await - { - Ok(c) => c, - Err(_) => return Err(ServerError::DbQueryError), - }; + let raw_traders = + match fetch_top_traders_by_quote_volume_from(&mut conn, &selected_market.address, from, to) + .await + { + Ok(c) => c, + Err(_) => return Err(ServerError::DbQueryError), + }; let traders = raw_traders .into_iter() diff --git a/src/worker/candle_batching/higher_order_candles.rs b/src/worker/candle_batching/higher_order_candles.rs index 8dfc115..e05130b 100644 --- a/src/worker/candle_batching/higher_order_candles.rs +++ b/src/worker/candle_batching/higher_order_candles.rs @@ -1,10 +1,9 @@ use chrono::{DateTime, Duration, DurationRound, Utc}; -use num_traits::Zero; -use sqlx::{types::Decimal, Pool, Postgres}; +use sqlx::{pool::PoolConnection, Postgres}; use std::cmp::{max, min}; use crate::{ - database::fetch::{fetch_candles_from, fetch_earliest_candle, fetch_latest_finished_candle}, + database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle}, structs::{ candle::Candle, resolution::{day, Resolution}, @@ -12,18 +11,18 @@ use crate::{ }; pub async fn batch_higher_order_candles( - pool: &Pool, + conn: &mut PoolConnection, market_name: &str, resolution: Resolution, ) -> anyhow::Result> { - let latest_candle = fetch_latest_finished_candle(pool, market_name, resolution).await?; + let latest_candle = fetch_latest_finished_candle(conn, market_name, resolution).await?; match latest_candle { Some(candle) => { let start_time = candle.end_time; let end_time = start_time + day(); let mut constituent_candles = fetch_candles_from( - pool, + conn, market_name, resolution.get_constituent_resolution(), start_time, @@ -42,32 +41,20 @@ pub async fn batch_higher_order_candles( Ok(combined_candles) } None => { - let constituent_candle = - fetch_earliest_candle(pool, market_name, resolution.get_constituent_resolution()) + let mut constituent_candles = + fetch_earliest_candles(conn, market_name, resolution.get_constituent_resolution()) .await?; - if constituent_candle.is_none() { - println!( - "Batching {}, but no candles found for: {:?}, {}", - resolution, - market_name, - resolution.get_constituent_resolution() - ); + if constituent_candles.len() == 0 { + // println!( + // "Batching {}, but no candles found for: {:?}, {}", + // resolution, + // market_name, + // resolution.get_constituent_resolution() + // ); return Ok(Vec::new()); } - let start_time = constituent_candle - .unwrap() - .start_time - .duration_trunc(day())?; - let end_time = start_time + day(); + let start_time = constituent_candles[0].start_time.duration_trunc(day())?; - let mut constituent_candles = fetch_candles_from( - pool, - market_name, - resolution.get_constituent_resolution(), - start_time, - end_time, - ) - .await?; if constituent_candles.len() == 0 { return Ok(Vec::new()); } @@ -80,7 +67,10 @@ pub async fn batch_higher_order_candles( seed_candle, ); - Ok(trim_zero_candles(combined_candles)) + Ok(trim_candles( + combined_candles, + constituent_candles[0].start_time, + )) } } } @@ -101,11 +91,10 @@ fn combine_into_higher_order_candles( ); let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap(); let candle_window = now - st; - let num_candles = if candle_window.num_minutes() % duration.num_minutes() == 0 { - (candle_window.num_minutes() / duration.num_minutes()) as usize + 1 - } else { - (candle_window.num_minutes() / duration.num_minutes()) as usize - }; + let num_candles = max( + 1, + (candle_window.num_minutes() / duration.num_minutes()) as usize + 1, + ); let mut combined_candles = vec![empty_candle; num_candles]; @@ -143,16 +132,10 @@ fn combine_into_higher_order_candles( combined_candles } -fn trim_zero_candles(mut c: Vec) -> Vec { +fn trim_candles(mut c: Vec, start_time: DateTime) -> Vec { let mut i = 0; while i < c.len() { - if c[i].open == Decimal::zero() - && c[i].high == Decimal::zero() - && c[i].low == Decimal::zero() - && c[i].close == Decimal::zero() - && c[i].volume == Decimal::zero() - && c[i].complete == true - { + if c[i].end_time <= start_time { c.remove(i); } else { i += 1; diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index baadf72..d4f8f0d 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -55,9 +55,13 @@ pub async fn batch_1m_candles( Utc::now().duration_trunc(Duration::minutes(1))?, ); let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?; - let candles = - combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); - Ok(candles) + if fills.len() > 0 { + let candles = + combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); + Ok(candles) + } else { + Ok(Vec::new()) + } } } } diff --git a/src/worker/main.rs b/src/worker/main.rs index aa76f03..072d729 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -55,7 +55,9 @@ async fn main() -> anyhow::Result<()> { let fills_pool = pool.clone(); handles.push(tokio::spawn(async move { loop { - persist_fill_events(&fills_pool, &mut fill_receiver).await.unwrap(); + persist_fill_events(&fills_pool, &mut fill_receiver) + .await + .unwrap(); } })); @@ -65,7 +67,9 @@ async fn main() -> anyhow::Result<()> { let sender = candle_sender.clone(); let batch_pool = pool.clone(); handles.push(tokio::spawn(async move { - batch_for_market(batch_pool, &sender, &market).await.unwrap(); + batch_for_market(batch_pool, &sender, &market) + .await + .unwrap(); println!("SOMETHING WENT WRONG"); })); } @@ -73,7 +77,9 @@ async fn main() -> anyhow::Result<()> { let persist_pool = pool.clone(); handles.push(tokio::spawn(async move { loop { - persist_candles(persist_pool.clone(), &mut candle_receiver).await.unwrap(); + persist_candles(persist_pool.clone(), &mut candle_receiver) + .await + .unwrap(); } }));