From ce4e9d1d3914b6ade023cddaa9b1e0d9878d5471 Mon Sep 17 00:00:00 2001 From: dboures Date: Sun, 26 Mar 2023 14:39:25 -0500 Subject: [PATCH 1/3] fix: try to use loops to fix timeouts --- .../candle_batching/higher_order_candles.rs | 2 +- src/worker/candle_batching/mod.rs | 37 ++++++++++--------- src/worker/main.rs | 20 ++++++---- 3 files changed, 33 insertions(+), 26 deletions(-) diff --git a/src/worker/candle_batching/higher_order_candles.rs b/src/worker/candle_batching/higher_order_candles.rs index 9cb0644..8dfc115 100644 --- a/src/worker/candle_batching/higher_order_candles.rs +++ b/src/worker/candle_batching/higher_order_candles.rs @@ -91,7 +91,7 @@ fn combine_into_higher_order_candles( st: DateTime, seed_candle: Candle, ) -> Vec { - println!("target_resolution: {}", target_resolution); + // println!("target_resolution: {}", target_resolution); let duration = target_resolution.get_duration(); diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs index 150f2b9..0cc6998 100644 --- a/src/worker/candle_batching/mod.rs +++ b/src/worker/candle_batching/mod.rs @@ -7,37 +7,40 @@ use strum::IntoEnumIterator; use tokio::{sync::mpsc::Sender, time::sleep}; use crate::{ - worker::candle_batching::minute_candles::batch_1m_candles, structs::{candle::Candle, markets::MarketInfo, resolution::Resolution}, + worker::candle_batching::minute_candles::batch_1m_candles, }; use self::higher_order_candles::batch_higher_order_candles; -pub async fn batch_candles( +pub async fn batch_for_market( pool: Pool, candles_sender: &Sender>, - markets: Vec, + market: &MarketInfo, ) { - let mut handles = vec![]; - for market in markets.into_iter() { + loop { let sender = candles_sender.clone(); let pool_clone = pool.clone(); let market_clone = market.clone(); - handles.push(tokio::spawn(async move { - loop { - batch_for_market(&pool_clone, &sender, &market_clone) - .await - .unwrap(); - - sleep(Duration::milliseconds(2000).to_std().unwrap()).await; - } - })); + loop { + sleep(Duration::milliseconds(2000).to_std().unwrap()).await; + match batch_inner(&pool_clone, &sender, &market_clone).await { + Ok(_) => {} + Err(e) => { + println!( + "Batching thread failed for {:?} with error: {:?}", + market_clone.name.clone(), + e + ); + break; + } + }; + } + println!("Restarting {:?} batching thread", market.name); } - - futures::future::join_all(handles).await; } -async fn batch_for_market( +async fn batch_inner( pool: &Pool, candles_sender: &Sender>, market: &MarketInfo, diff --git a/src/worker/main.rs b/src/worker/main.rs index a1a7ff1..1f11ae7 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -1,6 +1,4 @@ use dotenv; -use openbook_candles::worker::candle_batching::batch_candles; -use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::database::{ initialize::{connect_to_database, setup_database}, insert::{persist_candles, persist_fill_events}, @@ -9,10 +7,12 @@ use openbook_candles::structs::candle::Candle; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::openbook::OpenBookFillEventLog; use openbook_candles::utils::Config; +use openbook_candles::worker::candle_batching::batch_for_market; +use openbook_candles::worker::trade_fetching::scrape::scrape; use solana_sdk::pubkey::Pubkey; +use std::env; use std::{collections::HashMap, str::FromStr}; use tokio::sync::mpsc; -use std::env; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> { }; let markets = load_markets(&path_to_markets_json); - let market_infos = fetch_market_infos(&config, markets).await?; + let market_infos = fetch_market_infos(&config, markets.clone()).await?; let mut target_markets = HashMap::new(); for m in market_infos.clone() { target_markets.insert(Pubkey::from_str(&m.address)?, 0); @@ -59,10 +59,14 @@ async fn main() -> anyhow::Result<()> { let (candle_sender, candle_receiver) = mpsc::channel::>(1000); - let batch_pool = pool.clone(); - handles.push(tokio::spawn(async move { - batch_candles(batch_pool, &candle_sender, market_infos).await; - })); + for market in market_infos.into_iter() { + let sender = candle_sender.clone(); + let batch_pool = pool.clone(); + handles.push(tokio::spawn(async move { + batch_for_market(batch_pool, &sender, &market).await; + println!("SOMETHING WENT WRONG"); + })); + } let persist_pool = pool.clone(); handles.push(tokio::spawn(async move { From bbf73b164b766c83019f4df780c8c968108f55b6 Mon Sep 17 00:00:00 2001 From: dboures Date: Mon, 27 Mar 2023 12:11:58 -0500 Subject: [PATCH 2/3] fix: fix connection timeouts --- src/database/fetch.rs | 34 ++++----- src/database/insert.rs | 80 +++++++++++++------- src/lib.rs | 2 +- src/server/candles.rs | 3 +- src/server/traders.rs | 6 +- src/worker/candle_batching/minute_candles.rs | 12 +-- src/worker/candle_batching/mod.rs | 16 ++-- src/worker/main.rs | 14 ++-- 8 files changed, 99 insertions(+), 68 deletions(-) diff --git a/src/database/fetch.rs b/src/database/fetch.rs index 6896d55..c6aa162 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -1,5 +1,5 @@ use chrono::{DateTime, Utc}; -use sqlx::{Pool, Postgres}; +use sqlx::{pool::PoolConnection, Postgres}; use crate::{ structs::{candle::Candle, openbook::PgOpenBookFill, resolution::Resolution, trader::PgTrader}, @@ -7,7 +7,7 @@ use crate::{ }; pub async fn fetch_earliest_fill( - pool: &Pool, + conn: &mut PoolConnection, market_address_string: &str, ) -> anyhow::Result> { sqlx::query_as!( @@ -25,13 +25,13 @@ pub async fn fetch_earliest_fill( ORDER BY time asc LIMIT 1"#, market_address_string ) - .fetch_optional(pool) + .fetch_optional(conn) .await .map_err_anyhow() } pub async fn fetch_fills_from( - pool: &Pool, + conn: &mut PoolConnection, market_address_string: &str, start_time: DateTime, end_time: DateTime, @@ -55,13 +55,13 @@ pub async fn fetch_fills_from( start_time, end_time ) - .fetch_all(pool) + .fetch_all(conn) .await .map_err_anyhow() } pub async fn fetch_latest_finished_candle( - pool: &Pool, + conn: &mut PoolConnection, market_name: &str, resolution: Resolution, ) -> anyhow::Result> { @@ -86,13 +86,13 @@ pub async fn fetch_latest_finished_candle( market_name, resolution.to_string() ) - .fetch_optional(pool) + .fetch_optional(conn) .await .map_err_anyhow() } pub async fn fetch_earliest_candle( - pool: &Pool, + conn: &mut PoolConnection, market_name: &str, resolution: Resolution, ) -> anyhow::Result> { @@ -116,13 +116,13 @@ pub async fn fetch_earliest_candle( market_name, resolution.to_string() ) - .fetch_optional(pool) + .fetch_optional(conn) .await .map_err_anyhow() } pub async fn fetch_candles_from( - pool: &Pool, + conn: &mut PoolConnection, market_name: &str, resolution: Resolution, start_time: DateTime, @@ -153,13 +153,13 @@ pub async fn fetch_candles_from( start_time, end_time ) - .fetch_all(pool) + .fetch_all(conn) .await .map_err_anyhow() } pub async fn fetch_tradingview_candles( - pool: &Pool, + conn: &mut PoolConnection, market_name: &str, resolution: Resolution, start_time: DateTime, @@ -189,13 +189,13 @@ pub async fn fetch_tradingview_candles( start_time, end_time ) - .fetch_all(pool) + .fetch_all(conn) .await .map_err_anyhow() } pub async fn fetch_top_traders_by_base_volume_from( - pool: &Pool, + conn: &mut PoolConnection, market_address_string: &str, start_time: DateTime, end_time: DateTime, @@ -225,13 +225,13 @@ LIMIT 10000"#, start_time, end_time ) - .fetch_all(pool) + .fetch_all(conn) .await .map_err_anyhow() } pub async fn fetch_top_traders_by_quote_volume_from( - pool: &Pool, + conn: &mut PoolConnection, market_address_string: &str, start_time: DateTime, end_time: DateTime, @@ -261,7 +261,7 @@ LIMIT 10000"#, start_time, end_time ) - .fetch_all(pool) + .fetch_all(conn) .await .map_err_anyhow() } diff --git a/src/database/insert.rs b/src/database/insert.rs index 36053c8..5fce11f 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -1,10 +1,10 @@ -use chrono::Utc; -use sqlx::{Pool, Postgres}; +use chrono::{Utc}; +use sqlx::{Pool, Postgres, Connection}; use std::{ collections::{hash_map::DefaultHasher, HashMap}, hash::{Hash, Hasher}, }; -use tokio::sync::mpsc::{error::TryRecvError, Receiver}; +use tokio::{sync::mpsc::{error::TryRecvError, Receiver}}; use crate::{ structs::{candle::Candle, openbook::OpenBookFillEventLog}, @@ -13,8 +13,9 @@ use crate::{ pub async fn persist_fill_events( pool: &Pool, - mut fill_receiver: Receiver, -) { + fill_receiver: &mut Receiver, +) -> anyhow::Result<()> { + let mut conn = pool.acquire().await.unwrap(); loop { let mut write_batch = HashMap::new(); while write_batch.len() < 10 { @@ -38,38 +39,61 @@ pub async fn persist_fill_events( } if write_batch.len() > 0 { - print!("writing: {:?} events to DB\n", write_batch.len()); - let upsert_statement = build_fills_upsert_statement(write_batch); - sqlx::query(&upsert_statement) - .execute(pool) - .await - .map_err_anyhow() - .unwrap(); + // print!("writing: {:?} events to DB\n", write_batch.len()); + + match conn.ping().await { + Ok(_) => { + let upsert_statement = build_fills_upsert_statement(write_batch); + sqlx::query(&upsert_statement) + .execute(&mut conn) + .await + .map_err_anyhow() + .unwrap(); + } + Err(_) => { + println!("Fills ping failed"); + break; + } + } } } + Ok(()) } -pub async fn persist_candles(pool: Pool, mut candles_receiver: Receiver>) { +pub async fn persist_candles( + pool: Pool, + candles_receiver: &mut Receiver>, +) -> anyhow::Result<()> { + let mut conn = pool.acquire().await.unwrap(); loop { - match candles_receiver.try_recv() { - Ok(candles) => { - if candles.len() == 0 { - continue; - } - print!("writing: {:?} candles to DB\n", candles.len()); - let upsert_statement = build_candes_upsert_statement(candles); - sqlx::query(&upsert_statement) - .execute(&pool) - .await - .map_err_anyhow() - .unwrap(); + match conn.ping().await { + Ok(_) => { + match candles_receiver.try_recv() { + Ok(candles) => { + if candles.len() == 0 { + continue; + } + // print!("writing: {:?} candles to DB\n", candles.len()); + let upsert_statement = build_candes_upsert_statement(candles); + sqlx::query(&upsert_statement) + .execute(&mut conn) + .await + .map_err_anyhow() + .unwrap(); + } + Err(TryRecvError::Empty) => continue, + Err(TryRecvError::Disconnected) => { + panic!("Candles sender must stay alive") + } + }; } - Err(TryRecvError::Empty) => continue, - Err(TryRecvError::Disconnected) => { - panic!("Candles sender must stay alive") + Err(_) => { + println!("Candle ping failed"); + break; } }; } + Ok(()) } fn build_fills_upsert_statement(events: HashMap) -> String { diff --git a/src/lib.rs b/src/lib.rs index 768631b..86b17e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -pub mod worker; pub mod database; pub mod structs; pub mod utils; +pub mod worker; diff --git a/src/server/candles.rs b/src/server/candles.rs index cb24d2f..8500d63 100644 --- a/src/server/candles.rs +++ b/src/server/candles.rs @@ -34,8 +34,9 @@ pub async fn get_candles( let from = to_timestampz(info.from); let to = to_timestampz(info.to); + let mut conn = context.pool.acquire().await.unwrap(); let candles = - match fetch_tradingview_candles(&context.pool, &info.market_name, resolution, from, to) + match fetch_tradingview_candles(&mut conn, &info.market_name, resolution, from, to) .await { Ok(c) => c, diff --git a/src/server/traders.rs b/src/server/traders.rs index 17618d3..50296ad 100644 --- a/src/server/traders.rs +++ b/src/server/traders.rs @@ -31,8 +31,9 @@ pub async fn get_top_traders_by_base_volume( let from = to_timestampz(info.from); let to = to_timestampz(info.to); + let mut conn = context.pool.acquire().await.unwrap(); let raw_traders = match fetch_top_traders_by_base_volume_from( - &context.pool, + &mut conn, &selected_market.address, from, to, @@ -70,8 +71,9 @@ pub async fn get_top_traders_by_quote_volume( let from = to_timestampz(info.from); let to = to_timestampz(info.to); + let mut conn = context.pool.acquire().await.unwrap(); let raw_traders = match fetch_top_traders_by_quote_volume_from( - &context.pool, + &mut conn, &selected_market.address, from, to, diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index 00263e8..baadf72 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -1,7 +1,7 @@ use std::cmp::{max, min}; use chrono::{DateTime, Duration, DurationRound, Utc}; -use sqlx::{types::Decimal, Pool, Postgres}; +use sqlx::{pool::PoolConnection, types::Decimal, Postgres}; use crate::{ database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, @@ -14,12 +14,12 @@ use crate::{ }; pub async fn batch_1m_candles( - pool: &Pool, + conn: &mut PoolConnection, market: &MarketInfo, ) -> anyhow::Result> { let market_name = &market.name; let market_address = &market.address; - let latest_candle = fetch_latest_finished_candle(pool, market_name, Resolution::R1m).await?; + let latest_candle = fetch_latest_finished_candle(conn, market_name, Resolution::R1m).await?; match latest_candle { Some(candle) => { @@ -28,7 +28,7 @@ pub async fn batch_1m_candles( start_time + day(), Utc::now().duration_trunc(Duration::minutes(1))?, ); - let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; + let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?; let candles = combine_fills_into_1m_candles( &mut fills, market, @@ -39,7 +39,7 @@ pub async fn batch_1m_candles( Ok(candles) } None => { - let earliest_fill = fetch_earliest_fill(pool, market_address).await?; + let earliest_fill = fetch_earliest_fill(conn, market_address).await?; if earliest_fill.is_none() { println!("No fills found for: {:?}", market_name); @@ -54,7 +54,7 @@ pub async fn batch_1m_candles( start_time + day(), Utc::now().duration_trunc(Duration::minutes(1))?, ); - let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; + let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?; let candles = combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); Ok(candles) diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs index 0cc6998..f2d9088 100644 --- a/src/worker/candle_batching/mod.rs +++ b/src/worker/candle_batching/mod.rs @@ -2,7 +2,7 @@ pub mod higher_order_candles; pub mod minute_candles; use chrono::Duration; -use sqlx::{Pool, Postgres}; +use sqlx::{pool::PoolConnection, Pool, Postgres}; use strum::IntoEnumIterator; use tokio::{sync::mpsc::Sender, time::sleep}; @@ -17,14 +17,14 @@ pub async fn batch_for_market( pool: Pool, candles_sender: &Sender>, market: &MarketInfo, -) { +) -> anyhow::Result<()> { loop { let sender = candles_sender.clone(); - let pool_clone = pool.clone(); let market_clone = market.clone(); + let mut conn = pool.acquire().await?; loop { - sleep(Duration::milliseconds(2000).to_std().unwrap()).await; - match batch_inner(&pool_clone, &sender, &market_clone).await { + sleep(Duration::milliseconds(2000).to_std()?).await; + match batch_inner(&mut conn, &sender, &market_clone).await { Ok(_) => {} Err(e) => { println!( @@ -41,19 +41,19 @@ pub async fn batch_for_market( } async fn batch_inner( - pool: &Pool, + conn: &mut PoolConnection, candles_sender: &Sender>, market: &MarketInfo, ) -> anyhow::Result<()> { let market_name = &market.name.clone(); - let candles = batch_1m_candles(pool, market).await?; + let candles = batch_1m_candles(conn, market).await?; send_candles(candles, candles_sender).await; for resolution in Resolution::iter() { if resolution == Resolution::R1m { continue; } - let candles = batch_higher_order_candles(pool, market_name, resolution).await?; + let candles = batch_higher_order_candles(conn, market_name, resolution).await?; send_candles(candles, candles_sender).await; } Ok(()) diff --git a/src/worker/main.rs b/src/worker/main.rs index 1f11ae7..aa76f03 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -46,7 +46,7 @@ async fn main() -> anyhow::Result<()> { setup_database(&pool).await?; let mut handles = vec![]; - let (fill_sender, fill_receiver) = mpsc::channel::(1000); + let (fill_sender, mut fill_receiver) = mpsc::channel::(1000); handles.push(tokio::spawn(async move { scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay @@ -54,23 +54,27 @@ async fn main() -> anyhow::Result<()> { let fills_pool = pool.clone(); handles.push(tokio::spawn(async move { - persist_fill_events(&fills_pool, fill_receiver).await; + loop { + persist_fill_events(&fills_pool, &mut fill_receiver).await.unwrap(); + } })); - let (candle_sender, candle_receiver) = mpsc::channel::>(1000); + let (candle_sender, mut candle_receiver) = mpsc::channel::>(1000); for market in market_infos.into_iter() { let sender = candle_sender.clone(); let batch_pool = pool.clone(); handles.push(tokio::spawn(async move { - batch_for_market(batch_pool, &sender, &market).await; + batch_for_market(batch_pool, &sender, &market).await.unwrap(); println!("SOMETHING WENT WRONG"); })); } let persist_pool = pool.clone(); handles.push(tokio::spawn(async move { - persist_candles(persist_pool, candle_receiver).await; + loop { + persist_candles(persist_pool.clone(), &mut candle_receiver).await.unwrap(); + } })); futures::future::join_all(handles).await; From f509df05d5f18dc4b07ff8ddfb1dcedf7b6a9d5f Mon Sep 17 00:00:00 2001 From: dboures Date: Mon, 27 Mar 2023 14:01:48 -0500 Subject: [PATCH 3/3] fix: candle batching works with 0 fills --- sqlx-data.json | 227 ++++++------------ src/database/fetch.rs | 9 +- src/database/insert.rs | 6 +- src/server/candles.rs | 4 +- src/server/traders.rs | 36 ++- .../candle_batching/higher_order_candles.rs | 67 ++---- src/worker/candle_batching/minute_candles.rs | 10 +- src/worker/main.rs | 12 +- 8 files changed, 138 insertions(+), 233 deletions(-) diff --git a/sqlx-data.json b/sqlx-data.json index 08a5120..80a317a 100644 --- a/sqlx-data.json +++ b/sqlx-data.json @@ -151,158 +151,6 @@ }, "query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n ORDER BY start_time asc" }, - "98d4c97eeb4f0d14fd9284a968e88837456117a1c1995df37e615902b482ff10": { - "describe": { - "columns": [ - { - "name": "start_time!", - "ordinal": 0, - "type_info": "Timestamptz" - }, - { - "name": "end_time!", - "ordinal": 1, - "type_info": "Timestamptz" - }, - { - "name": "resolution!", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "market_name!", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "open!", - "ordinal": 4, - "type_info": "Numeric" - }, - { - "name": "close!", - "ordinal": 5, - "type_info": "Numeric" - }, - { - "name": "high!", - "ordinal": 6, - "type_info": "Numeric" - }, - { - "name": "low!", - "ordinal": 7, - "type_info": "Numeric" - }, - { - "name": "volume!", - "ordinal": 8, - "type_info": "Numeric" - }, - { - "name": "complete!", - "ordinal": 9, - "type_info": "Bool" - } - ], - "nullable": [ - true, - true, - true, - true, - true, - true, - true, - true, - true, - true - ], - "parameters": { - "Left": [ - "Text", - "Text" - ] - } - }, - "query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n ORDER BY start_time asc LIMIT 1" - }, - "a69ebc1820a6635f83a99fddc1bf8ae0c7a3184de6d38e129baa6397c61efc60": { - "describe": { - "columns": [ - { - "name": "start_time!", - "ordinal": 0, - "type_info": "Timestamptz" - }, - { - "name": "end_time!", - "ordinal": 1, - "type_info": "Timestamptz" - }, - { - "name": "resolution!", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "market_name!", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "open!", - "ordinal": 4, - "type_info": "Numeric" - }, - { - "name": "close!", - "ordinal": 5, - "type_info": "Numeric" - }, - { - "name": "high!", - "ordinal": 6, - "type_info": "Numeric" - }, - { - "name": "low!", - "ordinal": 7, - "type_info": "Numeric" - }, - { - "name": "volume!", - "ordinal": 8, - "type_info": "Numeric" - }, - { - "name": "complete!", - "ordinal": 9, - "type_info": "Bool" - } - ], - "nullable": [ - true, - true, - true, - true, - true, - true, - true, - true, - true, - true - ], - "parameters": { - "Left": [ - "Text", - "Text", - "Timestamptz", - "Timestamptz" - ] - } - }, - "query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n and complete = true\n ORDER BY start_time asc" - }, "aee3a3e04f837bd62e263452bfbaf5d7dff271799c80d5efd22a54954ac212c4": { "describe": { "columns": [ @@ -449,6 +297,81 @@ }, "query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1 \n and maker = true\n ORDER BY time asc LIMIT 1" }, + "e367fec686c10d361f4a5ac014ca34ce68544dfe9cf32b79a6a78790e8c6d5cc": { + "describe": { + "columns": [ + { + "name": "start_time!", + "ordinal": 0, + "type_info": "Timestamptz" + }, + { + "name": "end_time!", + "ordinal": 1, + "type_info": "Timestamptz" + }, + { + "name": "resolution!", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "market_name!", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "open!", + "ordinal": 4, + "type_info": "Numeric" + }, + { + "name": "close!", + "ordinal": 5, + "type_info": "Numeric" + }, + { + "name": "high!", + "ordinal": 6, + "type_info": "Numeric" + }, + { + "name": "low!", + "ordinal": 7, + "type_info": "Numeric" + }, + { + "name": "volume!", + "ordinal": 8, + "type_info": "Numeric" + }, + { + "name": "complete!", + "ordinal": 9, + "type_info": "Bool" + } + ], + "nullable": [ + true, + true, + true, + true, + true, + true, + true, + true, + true, + true + ], + "parameters": { + "Left": [ + "Text", + "Text" + ] + } + }, + "query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n ORDER BY start_time asc" + }, "e94788e9eb04534dc13a73f80255705fb39789caa6dfb43e8417471f8399bb85": { "describe": { "columns": [], diff --git a/src/database/fetch.rs b/src/database/fetch.rs index c6aa162..a49fdf9 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -91,11 +91,11 @@ pub async fn fetch_latest_finished_candle( .map_err_anyhow() } -pub async fn fetch_earliest_candle( +pub async fn fetch_earliest_candles( conn: &mut PoolConnection, market_name: &str, resolution: Resolution, -) -> anyhow::Result> { +) -> anyhow::Result> { sqlx::query_as!( Candle, r#"SELECT @@ -112,11 +112,11 @@ pub async fn fetch_earliest_candle( from candles where market_name = $1 and resolution = $2 - ORDER BY start_time asc LIMIT 1"#, + ORDER BY start_time asc"#, market_name, resolution.to_string() ) - .fetch_optional(conn) + .fetch_all(conn) .await .map_err_anyhow() } @@ -146,7 +146,6 @@ pub async fn fetch_candles_from( and resolution = $2 and start_time >= $3 and end_time <= $4 - and complete = true ORDER BY start_time asc"#, market_name, resolution.to_string(), diff --git a/src/database/insert.rs b/src/database/insert.rs index 5fce11f..02221af 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -1,10 +1,10 @@ -use chrono::{Utc}; -use sqlx::{Pool, Postgres, Connection}; +use chrono::Utc; +use sqlx::{Connection, Pool, Postgres}; use std::{ collections::{hash_map::DefaultHasher, HashMap}, hash::{Hash, Hasher}, }; -use tokio::{sync::mpsc::{error::TryRecvError, Receiver}}; +use tokio::sync::mpsc::{error::TryRecvError, Receiver}; use crate::{ structs::{candle::Candle, openbook::OpenBookFillEventLog}, diff --git a/src/server/candles.rs b/src/server/candles.rs index 8500d63..37a075e 100644 --- a/src/server/candles.rs +++ b/src/server/candles.rs @@ -36,9 +36,7 @@ pub async fn get_candles( let mut conn = context.pool.acquire().await.unwrap(); let candles = - match fetch_tradingview_candles(&mut conn, &info.market_name, resolution, from, to) - .await - { + match fetch_tradingview_candles(&mut conn, &info.market_name, resolution, from, to).await { Ok(c) => c, Err(_) => return Err(ServerError::DbQueryError), }; diff --git a/src/server/traders.rs b/src/server/traders.rs index 50296ad..399524e 100644 --- a/src/server/traders.rs +++ b/src/server/traders.rs @@ -32,17 +32,13 @@ pub async fn get_top_traders_by_base_volume( let to = to_timestampz(info.to); let mut conn = context.pool.acquire().await.unwrap(); - let raw_traders = match fetch_top_traders_by_base_volume_from( - &mut conn, - &selected_market.address, - from, - to, - ) - .await - { - Ok(c) => c, - Err(_) => return Err(ServerError::DbQueryError), - }; + let raw_traders = + match fetch_top_traders_by_base_volume_from(&mut conn, &selected_market.address, from, to) + .await + { + Ok(c) => c, + Err(_) => return Err(ServerError::DbQueryError), + }; let traders = raw_traders .into_iter() @@ -72,17 +68,13 @@ pub async fn get_top_traders_by_quote_volume( let to = to_timestampz(info.to); let mut conn = context.pool.acquire().await.unwrap(); - let raw_traders = match fetch_top_traders_by_quote_volume_from( - &mut conn, - &selected_market.address, - from, - to, - ) - .await - { - Ok(c) => c, - Err(_) => return Err(ServerError::DbQueryError), - }; + let raw_traders = + match fetch_top_traders_by_quote_volume_from(&mut conn, &selected_market.address, from, to) + .await + { + Ok(c) => c, + Err(_) => return Err(ServerError::DbQueryError), + }; let traders = raw_traders .into_iter() diff --git a/src/worker/candle_batching/higher_order_candles.rs b/src/worker/candle_batching/higher_order_candles.rs index 8dfc115..e05130b 100644 --- a/src/worker/candle_batching/higher_order_candles.rs +++ b/src/worker/candle_batching/higher_order_candles.rs @@ -1,10 +1,9 @@ use chrono::{DateTime, Duration, DurationRound, Utc}; -use num_traits::Zero; -use sqlx::{types::Decimal, Pool, Postgres}; +use sqlx::{pool::PoolConnection, Postgres}; use std::cmp::{max, min}; use crate::{ - database::fetch::{fetch_candles_from, fetch_earliest_candle, fetch_latest_finished_candle}, + database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle}, structs::{ candle::Candle, resolution::{day, Resolution}, @@ -12,18 +11,18 @@ use crate::{ }; pub async fn batch_higher_order_candles( - pool: &Pool, + conn: &mut PoolConnection, market_name: &str, resolution: Resolution, ) -> anyhow::Result> { - let latest_candle = fetch_latest_finished_candle(pool, market_name, resolution).await?; + let latest_candle = fetch_latest_finished_candle(conn, market_name, resolution).await?; match latest_candle { Some(candle) => { let start_time = candle.end_time; let end_time = start_time + day(); let mut constituent_candles = fetch_candles_from( - pool, + conn, market_name, resolution.get_constituent_resolution(), start_time, @@ -42,32 +41,20 @@ pub async fn batch_higher_order_candles( Ok(combined_candles) } None => { - let constituent_candle = - fetch_earliest_candle(pool, market_name, resolution.get_constituent_resolution()) + let mut constituent_candles = + fetch_earliest_candles(conn, market_name, resolution.get_constituent_resolution()) .await?; - if constituent_candle.is_none() { - println!( - "Batching {}, but no candles found for: {:?}, {}", - resolution, - market_name, - resolution.get_constituent_resolution() - ); + if constituent_candles.len() == 0 { + // println!( + // "Batching {}, but no candles found for: {:?}, {}", + // resolution, + // market_name, + // resolution.get_constituent_resolution() + // ); return Ok(Vec::new()); } - let start_time = constituent_candle - .unwrap() - .start_time - .duration_trunc(day())?; - let end_time = start_time + day(); + let start_time = constituent_candles[0].start_time.duration_trunc(day())?; - let mut constituent_candles = fetch_candles_from( - pool, - market_name, - resolution.get_constituent_resolution(), - start_time, - end_time, - ) - .await?; if constituent_candles.len() == 0 { return Ok(Vec::new()); } @@ -80,7 +67,10 @@ pub async fn batch_higher_order_candles( seed_candle, ); - Ok(trim_zero_candles(combined_candles)) + Ok(trim_candles( + combined_candles, + constituent_candles[0].start_time, + )) } } } @@ -101,11 +91,10 @@ fn combine_into_higher_order_candles( ); let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap(); let candle_window = now - st; - let num_candles = if candle_window.num_minutes() % duration.num_minutes() == 0 { - (candle_window.num_minutes() / duration.num_minutes()) as usize + 1 - } else { - (candle_window.num_minutes() / duration.num_minutes()) as usize - }; + let num_candles = max( + 1, + (candle_window.num_minutes() / duration.num_minutes()) as usize + 1, + ); let mut combined_candles = vec![empty_candle; num_candles]; @@ -143,16 +132,10 @@ fn combine_into_higher_order_candles( combined_candles } -fn trim_zero_candles(mut c: Vec) -> Vec { +fn trim_candles(mut c: Vec, start_time: DateTime) -> Vec { let mut i = 0; while i < c.len() { - if c[i].open == Decimal::zero() - && c[i].high == Decimal::zero() - && c[i].low == Decimal::zero() - && c[i].close == Decimal::zero() - && c[i].volume == Decimal::zero() - && c[i].complete == true - { + if c[i].end_time <= start_time { c.remove(i); } else { i += 1; diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index baadf72..d4f8f0d 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -55,9 +55,13 @@ pub async fn batch_1m_candles( Utc::now().duration_trunc(Duration::minutes(1))?, ); let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?; - let candles = - combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); - Ok(candles) + if fills.len() > 0 { + let candles = + combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); + Ok(candles) + } else { + Ok(Vec::new()) + } } } } diff --git a/src/worker/main.rs b/src/worker/main.rs index aa76f03..072d729 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -55,7 +55,9 @@ async fn main() -> anyhow::Result<()> { let fills_pool = pool.clone(); handles.push(tokio::spawn(async move { loop { - persist_fill_events(&fills_pool, &mut fill_receiver).await.unwrap(); + persist_fill_events(&fills_pool, &mut fill_receiver) + .await + .unwrap(); } })); @@ -65,7 +67,9 @@ async fn main() -> anyhow::Result<()> { let sender = candle_sender.clone(); let batch_pool = pool.clone(); handles.push(tokio::spawn(async move { - batch_for_market(batch_pool, &sender, &market).await.unwrap(); + batch_for_market(batch_pool, &sender, &market) + .await + .unwrap(); println!("SOMETHING WENT WRONG"); })); } @@ -73,7 +77,9 @@ async fn main() -> anyhow::Result<()> { let persist_pool = pool.clone(); handles.push(tokio::spawn(async move { loop { - persist_candles(persist_pool.clone(), &mut candle_receiver).await.unwrap(); + persist_candles(persist_pool.clone(), &mut candle_receiver) + .await + .unwrap(); } }));