fix: candle batching works with 0 fills
This commit is contained in:
parent
bbf73b164b
commit
f509df05d5
227
sqlx-data.json
227
sqlx-data.json
|
@ -151,158 +151,6 @@
|
|||
},
|
||||
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n ORDER BY start_time asc"
|
||||
},
|
||||
"98d4c97eeb4f0d14fd9284a968e88837456117a1c1995df37e615902b482ff10": {
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"name": "start_time!",
|
||||
"ordinal": 0,
|
||||
"type_info": "Timestamptz"
|
||||
},
|
||||
{
|
||||
"name": "end_time!",
|
||||
"ordinal": 1,
|
||||
"type_info": "Timestamptz"
|
||||
},
|
||||
{
|
||||
"name": "resolution!",
|
||||
"ordinal": 2,
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"name": "market_name!",
|
||||
"ordinal": 3,
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"name": "open!",
|
||||
"ordinal": 4,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "close!",
|
||||
"ordinal": 5,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "high!",
|
||||
"ordinal": 6,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "low!",
|
||||
"ordinal": 7,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "volume!",
|
||||
"ordinal": 8,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "complete!",
|
||||
"ordinal": 9,
|
||||
"type_info": "Bool"
|
||||
}
|
||||
],
|
||||
"nullable": [
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text",
|
||||
"Text"
|
||||
]
|
||||
}
|
||||
},
|
||||
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n ORDER BY start_time asc LIMIT 1"
|
||||
},
|
||||
"a69ebc1820a6635f83a99fddc1bf8ae0c7a3184de6d38e129baa6397c61efc60": {
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"name": "start_time!",
|
||||
"ordinal": 0,
|
||||
"type_info": "Timestamptz"
|
||||
},
|
||||
{
|
||||
"name": "end_time!",
|
||||
"ordinal": 1,
|
||||
"type_info": "Timestamptz"
|
||||
},
|
||||
{
|
||||
"name": "resolution!",
|
||||
"ordinal": 2,
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"name": "market_name!",
|
||||
"ordinal": 3,
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"name": "open!",
|
||||
"ordinal": 4,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "close!",
|
||||
"ordinal": 5,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "high!",
|
||||
"ordinal": 6,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "low!",
|
||||
"ordinal": 7,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "volume!",
|
||||
"ordinal": 8,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "complete!",
|
||||
"ordinal": 9,
|
||||
"type_info": "Bool"
|
||||
}
|
||||
],
|
||||
"nullable": [
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text",
|
||||
"Text",
|
||||
"Timestamptz",
|
||||
"Timestamptz"
|
||||
]
|
||||
}
|
||||
},
|
||||
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n and complete = true\n ORDER BY start_time asc"
|
||||
},
|
||||
"aee3a3e04f837bd62e263452bfbaf5d7dff271799c80d5efd22a54954ac212c4": {
|
||||
"describe": {
|
||||
"columns": [
|
||||
|
@ -449,6 +297,81 @@
|
|||
},
|
||||
"query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1 \n and maker = true\n ORDER BY time asc LIMIT 1"
|
||||
},
|
||||
"e367fec686c10d361f4a5ac014ca34ce68544dfe9cf32b79a6a78790e8c6d5cc": {
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"name": "start_time!",
|
||||
"ordinal": 0,
|
||||
"type_info": "Timestamptz"
|
||||
},
|
||||
{
|
||||
"name": "end_time!",
|
||||
"ordinal": 1,
|
||||
"type_info": "Timestamptz"
|
||||
},
|
||||
{
|
||||
"name": "resolution!",
|
||||
"ordinal": 2,
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"name": "market_name!",
|
||||
"ordinal": 3,
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"name": "open!",
|
||||
"ordinal": 4,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "close!",
|
||||
"ordinal": 5,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "high!",
|
||||
"ordinal": 6,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "low!",
|
||||
"ordinal": 7,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "volume!",
|
||||
"ordinal": 8,
|
||||
"type_info": "Numeric"
|
||||
},
|
||||
{
|
||||
"name": "complete!",
|
||||
"ordinal": 9,
|
||||
"type_info": "Bool"
|
||||
}
|
||||
],
|
||||
"nullable": [
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true,
|
||||
true
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text",
|
||||
"Text"
|
||||
]
|
||||
}
|
||||
},
|
||||
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n ORDER BY start_time asc"
|
||||
},
|
||||
"e94788e9eb04534dc13a73f80255705fb39789caa6dfb43e8417471f8399bb85": {
|
||||
"describe": {
|
||||
"columns": [],
|
||||
|
|
|
@ -91,11 +91,11 @@ pub async fn fetch_latest_finished_candle(
|
|||
.map_err_anyhow()
|
||||
}
|
||||
|
||||
pub async fn fetch_earliest_candle(
|
||||
pub async fn fetch_earliest_candles(
|
||||
conn: &mut PoolConnection<Postgres>,
|
||||
market_name: &str,
|
||||
resolution: Resolution,
|
||||
) -> anyhow::Result<Option<Candle>> {
|
||||
) -> anyhow::Result<Vec<Candle>> {
|
||||
sqlx::query_as!(
|
||||
Candle,
|
||||
r#"SELECT
|
||||
|
@ -112,11 +112,11 @@ pub async fn fetch_earliest_candle(
|
|||
from candles
|
||||
where market_name = $1
|
||||
and resolution = $2
|
||||
ORDER BY start_time asc LIMIT 1"#,
|
||||
ORDER BY start_time asc"#,
|
||||
market_name,
|
||||
resolution.to_string()
|
||||
)
|
||||
.fetch_optional(conn)
|
||||
.fetch_all(conn)
|
||||
.await
|
||||
.map_err_anyhow()
|
||||
}
|
||||
|
@ -146,7 +146,6 @@ pub async fn fetch_candles_from(
|
|||
and resolution = $2
|
||||
and start_time >= $3
|
||||
and end_time <= $4
|
||||
and complete = true
|
||||
ORDER BY start_time asc"#,
|
||||
market_name,
|
||||
resolution.to_string(),
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
use chrono::{Utc};
|
||||
use sqlx::{Pool, Postgres, Connection};
|
||||
use chrono::Utc;
|
||||
use sqlx::{Connection, Pool, Postgres};
|
||||
use std::{
|
||||
collections::{hash_map::DefaultHasher, HashMap},
|
||||
hash::{Hash, Hasher},
|
||||
};
|
||||
use tokio::{sync::mpsc::{error::TryRecvError, Receiver}};
|
||||
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
|
||||
|
||||
use crate::{
|
||||
structs::{candle::Candle, openbook::OpenBookFillEventLog},
|
||||
|
|
|
@ -36,9 +36,7 @@ pub async fn get_candles(
|
|||
|
||||
let mut conn = context.pool.acquire().await.unwrap();
|
||||
let candles =
|
||||
match fetch_tradingview_candles(&mut conn, &info.market_name, resolution, from, to)
|
||||
.await
|
||||
{
|
||||
match fetch_tradingview_candles(&mut conn, &info.market_name, resolution, from, to).await {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Err(ServerError::DbQueryError),
|
||||
};
|
||||
|
|
|
@ -32,12 +32,8 @@ pub async fn get_top_traders_by_base_volume(
|
|||
let to = to_timestampz(info.to);
|
||||
|
||||
let mut conn = context.pool.acquire().await.unwrap();
|
||||
let raw_traders = match fetch_top_traders_by_base_volume_from(
|
||||
&mut conn,
|
||||
&selected_market.address,
|
||||
from,
|
||||
to,
|
||||
)
|
||||
let raw_traders =
|
||||
match fetch_top_traders_by_base_volume_from(&mut conn, &selected_market.address, from, to)
|
||||
.await
|
||||
{
|
||||
Ok(c) => c,
|
||||
|
@ -72,12 +68,8 @@ pub async fn get_top_traders_by_quote_volume(
|
|||
let to = to_timestampz(info.to);
|
||||
|
||||
let mut conn = context.pool.acquire().await.unwrap();
|
||||
let raw_traders = match fetch_top_traders_by_quote_volume_from(
|
||||
&mut conn,
|
||||
&selected_market.address,
|
||||
from,
|
||||
to,
|
||||
)
|
||||
let raw_traders =
|
||||
match fetch_top_traders_by_quote_volume_from(&mut conn, &selected_market.address, from, to)
|
||||
.await
|
||||
{
|
||||
Ok(c) => c,
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
use chrono::{DateTime, Duration, DurationRound, Utc};
|
||||
use num_traits::Zero;
|
||||
use sqlx::{types::Decimal, Pool, Postgres};
|
||||
use sqlx::{pool::PoolConnection, Postgres};
|
||||
use std::cmp::{max, min};
|
||||
|
||||
use crate::{
|
||||
database::fetch::{fetch_candles_from, fetch_earliest_candle, fetch_latest_finished_candle},
|
||||
database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle},
|
||||
structs::{
|
||||
candle::Candle,
|
||||
resolution::{day, Resolution},
|
||||
|
@ -12,18 +11,18 @@ use crate::{
|
|||
};
|
||||
|
||||
pub async fn batch_higher_order_candles(
|
||||
pool: &Pool<Postgres>,
|
||||
conn: &mut PoolConnection<Postgres>,
|
||||
market_name: &str,
|
||||
resolution: Resolution,
|
||||
) -> anyhow::Result<Vec<Candle>> {
|
||||
let latest_candle = fetch_latest_finished_candle(pool, market_name, resolution).await?;
|
||||
let latest_candle = fetch_latest_finished_candle(conn, market_name, resolution).await?;
|
||||
|
||||
match latest_candle {
|
||||
Some(candle) => {
|
||||
let start_time = candle.end_time;
|
||||
let end_time = start_time + day();
|
||||
let mut constituent_candles = fetch_candles_from(
|
||||
pool,
|
||||
conn,
|
||||
market_name,
|
||||
resolution.get_constituent_resolution(),
|
||||
start_time,
|
||||
|
@ -42,32 +41,20 @@ pub async fn batch_higher_order_candles(
|
|||
Ok(combined_candles)
|
||||
}
|
||||
None => {
|
||||
let constituent_candle =
|
||||
fetch_earliest_candle(pool, market_name, resolution.get_constituent_resolution())
|
||||
let mut constituent_candles =
|
||||
fetch_earliest_candles(conn, market_name, resolution.get_constituent_resolution())
|
||||
.await?;
|
||||
if constituent_candle.is_none() {
|
||||
println!(
|
||||
"Batching {}, but no candles found for: {:?}, {}",
|
||||
resolution,
|
||||
market_name,
|
||||
resolution.get_constituent_resolution()
|
||||
);
|
||||
if constituent_candles.len() == 0 {
|
||||
// println!(
|
||||
// "Batching {}, but no candles found for: {:?}, {}",
|
||||
// resolution,
|
||||
// market_name,
|
||||
// resolution.get_constituent_resolution()
|
||||
// );
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let start_time = constituent_candle
|
||||
.unwrap()
|
||||
.start_time
|
||||
.duration_trunc(day())?;
|
||||
let end_time = start_time + day();
|
||||
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
|
||||
|
||||
let mut constituent_candles = fetch_candles_from(
|
||||
pool,
|
||||
market_name,
|
||||
resolution.get_constituent_resolution(),
|
||||
start_time,
|
||||
end_time,
|
||||
)
|
||||
.await?;
|
||||
if constituent_candles.len() == 0 {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
@ -80,7 +67,10 @@ pub async fn batch_higher_order_candles(
|
|||
seed_candle,
|
||||
);
|
||||
|
||||
Ok(trim_zero_candles(combined_candles))
|
||||
Ok(trim_candles(
|
||||
combined_candles,
|
||||
constituent_candles[0].start_time,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -101,11 +91,10 @@ fn combine_into_higher_order_candles(
|
|||
);
|
||||
let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap();
|
||||
let candle_window = now - st;
|
||||
let num_candles = if candle_window.num_minutes() % duration.num_minutes() == 0 {
|
||||
(candle_window.num_minutes() / duration.num_minutes()) as usize + 1
|
||||
} else {
|
||||
(candle_window.num_minutes() / duration.num_minutes()) as usize
|
||||
};
|
||||
let num_candles = max(
|
||||
1,
|
||||
(candle_window.num_minutes() / duration.num_minutes()) as usize + 1,
|
||||
);
|
||||
|
||||
let mut combined_candles = vec![empty_candle; num_candles];
|
||||
|
||||
|
@ -143,16 +132,10 @@ fn combine_into_higher_order_candles(
|
|||
combined_candles
|
||||
}
|
||||
|
||||
fn trim_zero_candles(mut c: Vec<Candle>) -> Vec<Candle> {
|
||||
fn trim_candles(mut c: Vec<Candle>, start_time: DateTime<Utc>) -> Vec<Candle> {
|
||||
let mut i = 0;
|
||||
while i < c.len() {
|
||||
if c[i].open == Decimal::zero()
|
||||
&& c[i].high == Decimal::zero()
|
||||
&& c[i].low == Decimal::zero()
|
||||
&& c[i].close == Decimal::zero()
|
||||
&& c[i].volume == Decimal::zero()
|
||||
&& c[i].complete == true
|
||||
{
|
||||
if c[i].end_time <= start_time {
|
||||
c.remove(i);
|
||||
} else {
|
||||
i += 1;
|
||||
|
|
|
@ -55,9 +55,13 @@ pub async fn batch_1m_candles(
|
|||
Utc::now().duration_trunc(Duration::minutes(1))?,
|
||||
);
|
||||
let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?;
|
||||
if fills.len() > 0 {
|
||||
let candles =
|
||||
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
|
||||
Ok(candles)
|
||||
} else {
|
||||
Ok(Vec::new())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,7 +55,9 @@ async fn main() -> anyhow::Result<()> {
|
|||
let fills_pool = pool.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
loop {
|
||||
persist_fill_events(&fills_pool, &mut fill_receiver).await.unwrap();
|
||||
persist_fill_events(&fills_pool, &mut fill_receiver)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}));
|
||||
|
||||
|
@ -65,7 +67,9 @@ async fn main() -> anyhow::Result<()> {
|
|||
let sender = candle_sender.clone();
|
||||
let batch_pool = pool.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
batch_for_market(batch_pool, &sender, &market).await.unwrap();
|
||||
batch_for_market(batch_pool, &sender, &market)
|
||||
.await
|
||||
.unwrap();
|
||||
println!("SOMETHING WENT WRONG");
|
||||
}));
|
||||
}
|
||||
|
@ -73,7 +77,9 @@ async fn main() -> anyhow::Result<()> {
|
|||
let persist_pool = pool.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
loop {
|
||||
persist_candles(persist_pool.clone(), &mut candle_receiver).await.unwrap();
|
||||
persist_candles(persist_pool.clone(), &mut candle_receiver)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}));
|
||||
|
||||
|
|
Loading…
Reference in New Issue