Merge pull request #1 from dboures/vanilla-pooling

Vanilla pooling
This commit is contained in:
dboures 2023-03-27 14:17:17 -05:00 committed by GitHub
commit a4c275974a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 255 additions and 312 deletions

View File

@ -151,158 +151,6 @@
}, },
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n ORDER BY start_time asc" "query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n ORDER BY start_time asc"
}, },
"98d4c97eeb4f0d14fd9284a968e88837456117a1c1995df37e615902b482ff10": {
"describe": {
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market_name!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
"Text",
"Text"
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n ORDER BY start_time asc LIMIT 1"
},
"a69ebc1820a6635f83a99fddc1bf8ae0c7a3184de6d38e129baa6397c61efc60": {
"describe": {
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market_name!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
"Text",
"Text",
"Timestamptz",
"Timestamptz"
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n and complete = true\n ORDER BY start_time asc"
},
"aee3a3e04f837bd62e263452bfbaf5d7dff271799c80d5efd22a54954ac212c4": { "aee3a3e04f837bd62e263452bfbaf5d7dff271799c80d5efd22a54954ac212c4": {
"describe": { "describe": {
"columns": [ "columns": [
@ -449,6 +297,81 @@
}, },
"query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1 \n and maker = true\n ORDER BY time asc LIMIT 1" "query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1 \n and maker = true\n ORDER BY time asc LIMIT 1"
}, },
"e367fec686c10d361f4a5ac014ca34ce68544dfe9cf32b79a6a78790e8c6d5cc": {
"describe": {
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market_name!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
"Text",
"Text"
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n ORDER BY start_time asc"
},
"e94788e9eb04534dc13a73f80255705fb39789caa6dfb43e8417471f8399bb85": { "e94788e9eb04534dc13a73f80255705fb39789caa6dfb43e8417471f8399bb85": {
"describe": { "describe": {
"columns": [], "columns": [],

View File

@ -1,5 +1,5 @@
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use sqlx::{Pool, Postgres}; use sqlx::{pool::PoolConnection, Postgres};
use crate::{ use crate::{
structs::{candle::Candle, openbook::PgOpenBookFill, resolution::Resolution, trader::PgTrader}, structs::{candle::Candle, openbook::PgOpenBookFill, resolution::Resolution, trader::PgTrader},
@ -7,7 +7,7 @@ use crate::{
}; };
pub async fn fetch_earliest_fill( pub async fn fetch_earliest_fill(
pool: &Pool<Postgres>, conn: &mut PoolConnection<Postgres>,
market_address_string: &str, market_address_string: &str,
) -> anyhow::Result<Option<PgOpenBookFill>> { ) -> anyhow::Result<Option<PgOpenBookFill>> {
sqlx::query_as!( sqlx::query_as!(
@ -25,13 +25,13 @@ pub async fn fetch_earliest_fill(
ORDER BY time asc LIMIT 1"#, ORDER BY time asc LIMIT 1"#,
market_address_string market_address_string
) )
.fetch_optional(pool) .fetch_optional(conn)
.await .await
.map_err_anyhow() .map_err_anyhow()
} }
pub async fn fetch_fills_from( pub async fn fetch_fills_from(
pool: &Pool<Postgres>, conn: &mut PoolConnection<Postgres>,
market_address_string: &str, market_address_string: &str,
start_time: DateTime<Utc>, start_time: DateTime<Utc>,
end_time: DateTime<Utc>, end_time: DateTime<Utc>,
@ -55,13 +55,13 @@ pub async fn fetch_fills_from(
start_time, start_time,
end_time end_time
) )
.fetch_all(pool) .fetch_all(conn)
.await .await
.map_err_anyhow() .map_err_anyhow()
} }
pub async fn fetch_latest_finished_candle( pub async fn fetch_latest_finished_candle(
pool: &Pool<Postgres>, conn: &mut PoolConnection<Postgres>,
market_name: &str, market_name: &str,
resolution: Resolution, resolution: Resolution,
) -> anyhow::Result<Option<Candle>> { ) -> anyhow::Result<Option<Candle>> {
@ -86,16 +86,16 @@ pub async fn fetch_latest_finished_candle(
market_name, market_name,
resolution.to_string() resolution.to_string()
) )
.fetch_optional(pool) .fetch_optional(conn)
.await .await
.map_err_anyhow() .map_err_anyhow()
} }
pub async fn fetch_earliest_candle( pub async fn fetch_earliest_candles(
pool: &Pool<Postgres>, conn: &mut PoolConnection<Postgres>,
market_name: &str, market_name: &str,
resolution: Resolution, resolution: Resolution,
) -> anyhow::Result<Option<Candle>> { ) -> anyhow::Result<Vec<Candle>> {
sqlx::query_as!( sqlx::query_as!(
Candle, Candle,
r#"SELECT r#"SELECT
@ -112,17 +112,17 @@ pub async fn fetch_earliest_candle(
from candles from candles
where market_name = $1 where market_name = $1
and resolution = $2 and resolution = $2
ORDER BY start_time asc LIMIT 1"#, ORDER BY start_time asc"#,
market_name, market_name,
resolution.to_string() resolution.to_string()
) )
.fetch_optional(pool) .fetch_all(conn)
.await .await
.map_err_anyhow() .map_err_anyhow()
} }
pub async fn fetch_candles_from( pub async fn fetch_candles_from(
pool: &Pool<Postgres>, conn: &mut PoolConnection<Postgres>,
market_name: &str, market_name: &str,
resolution: Resolution, resolution: Resolution,
start_time: DateTime<Utc>, start_time: DateTime<Utc>,
@ -146,20 +146,19 @@ pub async fn fetch_candles_from(
and resolution = $2 and resolution = $2
and start_time >= $3 and start_time >= $3
and end_time <= $4 and end_time <= $4
and complete = true
ORDER BY start_time asc"#, ORDER BY start_time asc"#,
market_name, market_name,
resolution.to_string(), resolution.to_string(),
start_time, start_time,
end_time end_time
) )
.fetch_all(pool) .fetch_all(conn)
.await .await
.map_err_anyhow() .map_err_anyhow()
} }
pub async fn fetch_tradingview_candles( pub async fn fetch_tradingview_candles(
pool: &Pool<Postgres>, conn: &mut PoolConnection<Postgres>,
market_name: &str, market_name: &str,
resolution: Resolution, resolution: Resolution,
start_time: DateTime<Utc>, start_time: DateTime<Utc>,
@ -189,13 +188,13 @@ pub async fn fetch_tradingview_candles(
start_time, start_time,
end_time end_time
) )
.fetch_all(pool) .fetch_all(conn)
.await .await
.map_err_anyhow() .map_err_anyhow()
} }
pub async fn fetch_top_traders_by_base_volume_from( pub async fn fetch_top_traders_by_base_volume_from(
pool: &Pool<Postgres>, conn: &mut PoolConnection<Postgres>,
market_address_string: &str, market_address_string: &str,
start_time: DateTime<Utc>, start_time: DateTime<Utc>,
end_time: DateTime<Utc>, end_time: DateTime<Utc>,
@ -225,13 +224,13 @@ LIMIT 10000"#,
start_time, start_time,
end_time end_time
) )
.fetch_all(pool) .fetch_all(conn)
.await .await
.map_err_anyhow() .map_err_anyhow()
} }
pub async fn fetch_top_traders_by_quote_volume_from( pub async fn fetch_top_traders_by_quote_volume_from(
pool: &Pool<Postgres>, conn: &mut PoolConnection<Postgres>,
market_address_string: &str, market_address_string: &str,
start_time: DateTime<Utc>, start_time: DateTime<Utc>,
end_time: DateTime<Utc>, end_time: DateTime<Utc>,
@ -261,7 +260,7 @@ LIMIT 10000"#,
start_time, start_time,
end_time end_time
) )
.fetch_all(pool) .fetch_all(conn)
.await .await
.map_err_anyhow() .map_err_anyhow()
} }

View File

@ -1,5 +1,5 @@
use chrono::Utc; use chrono::Utc;
use sqlx::{Pool, Postgres}; use sqlx::{Connection, Pool, Postgres};
use std::{ use std::{
collections::{hash_map::DefaultHasher, HashMap}, collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher}, hash::{Hash, Hasher},
@ -13,8 +13,9 @@ use crate::{
pub async fn persist_fill_events( pub async fn persist_fill_events(
pool: &Pool<Postgres>, pool: &Pool<Postgres>,
mut fill_receiver: Receiver<OpenBookFillEventLog>, fill_receiver: &mut Receiver<OpenBookFillEventLog>,
) { ) -> anyhow::Result<()> {
let mut conn = pool.acquire().await.unwrap();
loop { loop {
let mut write_batch = HashMap::new(); let mut write_batch = HashMap::new();
while write_batch.len() < 10 { while write_batch.len() < 10 {
@ -38,38 +39,61 @@ pub async fn persist_fill_events(
} }
if write_batch.len() > 0 { if write_batch.len() > 0 {
print!("writing: {:?} events to DB\n", write_batch.len()); // print!("writing: {:?} events to DB\n", write_batch.len());
let upsert_statement = build_fills_upsert_statement(write_batch);
sqlx::query(&upsert_statement) match conn.ping().await {
.execute(pool) Ok(_) => {
.await let upsert_statement = build_fills_upsert_statement(write_batch);
.map_err_anyhow() sqlx::query(&upsert_statement)
.unwrap(); .execute(&mut conn)
.await
.map_err_anyhow()
.unwrap();
}
Err(_) => {
println!("Fills ping failed");
break;
}
}
} }
} }
Ok(())
} }
pub async fn persist_candles(pool: Pool<Postgres>, mut candles_receiver: Receiver<Vec<Candle>>) { pub async fn persist_candles(
pool: Pool<Postgres>,
candles_receiver: &mut Receiver<Vec<Candle>>,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await.unwrap();
loop { loop {
match candles_receiver.try_recv() { match conn.ping().await {
Ok(candles) => { Ok(_) => {
if candles.len() == 0 { match candles_receiver.try_recv() {
continue; Ok(candles) => {
} if candles.len() == 0 {
print!("writing: {:?} candles to DB\n", candles.len()); continue;
let upsert_statement = build_candes_upsert_statement(candles); }
sqlx::query(&upsert_statement) // print!("writing: {:?} candles to DB\n", candles.len());
.execute(&pool) let upsert_statement = build_candes_upsert_statement(candles);
.await sqlx::query(&upsert_statement)
.map_err_anyhow() .execute(&mut conn)
.unwrap(); .await
.map_err_anyhow()
.unwrap();
}
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Disconnected) => {
panic!("Candles sender must stay alive")
}
};
} }
Err(TryRecvError::Empty) => continue, Err(_) => {
Err(TryRecvError::Disconnected) => { println!("Candle ping failed");
panic!("Candles sender must stay alive") break;
} }
}; };
} }
Ok(())
} }
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> String { fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> String {

View File

@ -1,4 +1,4 @@
pub mod worker;
pub mod database; pub mod database;
pub mod structs; pub mod structs;
pub mod utils; pub mod utils;
pub mod worker;

View File

@ -34,10 +34,9 @@ pub async fn get_candles(
let from = to_timestampz(info.from); let from = to_timestampz(info.from);
let to = to_timestampz(info.to); let to = to_timestampz(info.to);
let mut conn = context.pool.acquire().await.unwrap();
let candles = let candles =
match fetch_tradingview_candles(&context.pool, &info.market_name, resolution, from, to) match fetch_tradingview_candles(&mut conn, &info.market_name, resolution, from, to).await {
.await
{
Ok(c) => c, Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError), Err(_) => return Err(ServerError::DbQueryError),
}; };

View File

@ -31,17 +31,14 @@ pub async fn get_top_traders_by_base_volume(
let from = to_timestampz(info.from); let from = to_timestampz(info.from);
let to = to_timestampz(info.to); let to = to_timestampz(info.to);
let raw_traders = match fetch_top_traders_by_base_volume_from( let mut conn = context.pool.acquire().await.unwrap();
&context.pool, let raw_traders =
&selected_market.address, match fetch_top_traders_by_base_volume_from(&mut conn, &selected_market.address, from, to)
from, .await
to, {
) Ok(c) => c,
.await Err(_) => return Err(ServerError::DbQueryError),
{ };
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};
let traders = raw_traders let traders = raw_traders
.into_iter() .into_iter()
@ -70,17 +67,14 @@ pub async fn get_top_traders_by_quote_volume(
let from = to_timestampz(info.from); let from = to_timestampz(info.from);
let to = to_timestampz(info.to); let to = to_timestampz(info.to);
let raw_traders = match fetch_top_traders_by_quote_volume_from( let mut conn = context.pool.acquire().await.unwrap();
&context.pool, let raw_traders =
&selected_market.address, match fetch_top_traders_by_quote_volume_from(&mut conn, &selected_market.address, from, to)
from, .await
to, {
) Ok(c) => c,
.await Err(_) => return Err(ServerError::DbQueryError),
{ };
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};
let traders = raw_traders let traders = raw_traders
.into_iter() .into_iter()

View File

@ -1,10 +1,9 @@
use chrono::{DateTime, Duration, DurationRound, Utc}; use chrono::{DateTime, Duration, DurationRound, Utc};
use num_traits::Zero; use sqlx::{pool::PoolConnection, Postgres};
use sqlx::{types::Decimal, Pool, Postgres};
use std::cmp::{max, min}; use std::cmp::{max, min};
use crate::{ use crate::{
database::fetch::{fetch_candles_from, fetch_earliest_candle, fetch_latest_finished_candle}, database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle},
structs::{ structs::{
candle::Candle, candle::Candle,
resolution::{day, Resolution}, resolution::{day, Resolution},
@ -12,18 +11,18 @@ use crate::{
}; };
pub async fn batch_higher_order_candles( pub async fn batch_higher_order_candles(
pool: &Pool<Postgres>, conn: &mut PoolConnection<Postgres>,
market_name: &str, market_name: &str,
resolution: Resolution, resolution: Resolution,
) -> anyhow::Result<Vec<Candle>> { ) -> anyhow::Result<Vec<Candle>> {
let latest_candle = fetch_latest_finished_candle(pool, market_name, resolution).await?; let latest_candle = fetch_latest_finished_candle(conn, market_name, resolution).await?;
match latest_candle { match latest_candle {
Some(candle) => { Some(candle) => {
let start_time = candle.end_time; let start_time = candle.end_time;
let end_time = start_time + day(); let end_time = start_time + day();
let mut constituent_candles = fetch_candles_from( let mut constituent_candles = fetch_candles_from(
pool, conn,
market_name, market_name,
resolution.get_constituent_resolution(), resolution.get_constituent_resolution(),
start_time, start_time,
@ -42,32 +41,20 @@ pub async fn batch_higher_order_candles(
Ok(combined_candles) Ok(combined_candles)
} }
None => { None => {
let constituent_candle = let mut constituent_candles =
fetch_earliest_candle(pool, market_name, resolution.get_constituent_resolution()) fetch_earliest_candles(conn, market_name, resolution.get_constituent_resolution())
.await?; .await?;
if constituent_candle.is_none() { if constituent_candles.len() == 0 {
println!( // println!(
"Batching {}, but no candles found for: {:?}, {}", // "Batching {}, but no candles found for: {:?}, {}",
resolution, // resolution,
market_name, // market_name,
resolution.get_constituent_resolution() // resolution.get_constituent_resolution()
); // );
return Ok(Vec::new()); return Ok(Vec::new());
} }
let start_time = constituent_candle let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
.unwrap()
.start_time
.duration_trunc(day())?;
let end_time = start_time + day();
let mut constituent_candles = fetch_candles_from(
pool,
market_name,
resolution.get_constituent_resolution(),
start_time,
end_time,
)
.await?;
if constituent_candles.len() == 0 { if constituent_candles.len() == 0 {
return Ok(Vec::new()); return Ok(Vec::new());
} }
@ -80,7 +67,10 @@ pub async fn batch_higher_order_candles(
seed_candle, seed_candle,
); );
Ok(trim_zero_candles(combined_candles)) Ok(trim_candles(
combined_candles,
constituent_candles[0].start_time,
))
} }
} }
} }
@ -91,7 +81,7 @@ fn combine_into_higher_order_candles(
st: DateTime<Utc>, st: DateTime<Utc>,
seed_candle: Candle, seed_candle: Candle,
) -> Vec<Candle> { ) -> Vec<Candle> {
println!("target_resolution: {}", target_resolution); // println!("target_resolution: {}", target_resolution);
let duration = target_resolution.get_duration(); let duration = target_resolution.get_duration();
@ -101,11 +91,10 @@ fn combine_into_higher_order_candles(
); );
let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap(); let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap();
let candle_window = now - st; let candle_window = now - st;
let num_candles = if candle_window.num_minutes() % duration.num_minutes() == 0 { let num_candles = max(
(candle_window.num_minutes() / duration.num_minutes()) as usize + 1 1,
} else { (candle_window.num_minutes() / duration.num_minutes()) as usize + 1,
(candle_window.num_minutes() / duration.num_minutes()) as usize );
};
let mut combined_candles = vec![empty_candle; num_candles]; let mut combined_candles = vec![empty_candle; num_candles];
@ -143,16 +132,10 @@ fn combine_into_higher_order_candles(
combined_candles combined_candles
} }
fn trim_zero_candles(mut c: Vec<Candle>) -> Vec<Candle> { fn trim_candles(mut c: Vec<Candle>, start_time: DateTime<Utc>) -> Vec<Candle> {
let mut i = 0; let mut i = 0;
while i < c.len() { while i < c.len() {
if c[i].open == Decimal::zero() if c[i].end_time <= start_time {
&& c[i].high == Decimal::zero()
&& c[i].low == Decimal::zero()
&& c[i].close == Decimal::zero()
&& c[i].volume == Decimal::zero()
&& c[i].complete == true
{
c.remove(i); c.remove(i);
} else { } else {
i += 1; i += 1;

View File

@ -1,7 +1,7 @@
use std::cmp::{max, min}; use std::cmp::{max, min};
use chrono::{DateTime, Duration, DurationRound, Utc}; use chrono::{DateTime, Duration, DurationRound, Utc};
use sqlx::{types::Decimal, Pool, Postgres}; use sqlx::{pool::PoolConnection, types::Decimal, Postgres};
use crate::{ use crate::{
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
@ -14,12 +14,12 @@ use crate::{
}; };
pub async fn batch_1m_candles( pub async fn batch_1m_candles(
pool: &Pool<Postgres>, conn: &mut PoolConnection<Postgres>,
market: &MarketInfo, market: &MarketInfo,
) -> anyhow::Result<Vec<Candle>> { ) -> anyhow::Result<Vec<Candle>> {
let market_name = &market.name; let market_name = &market.name;
let market_address = &market.address; let market_address = &market.address;
let latest_candle = fetch_latest_finished_candle(pool, market_name, Resolution::R1m).await?; let latest_candle = fetch_latest_finished_candle(conn, market_name, Resolution::R1m).await?;
match latest_candle { match latest_candle {
Some(candle) => { Some(candle) => {
@ -28,7 +28,7 @@ pub async fn batch_1m_candles(
start_time + day(), start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?, Utc::now().duration_trunc(Duration::minutes(1))?,
); );
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?;
let candles = combine_fills_into_1m_candles( let candles = combine_fills_into_1m_candles(
&mut fills, &mut fills,
market, market,
@ -39,7 +39,7 @@ pub async fn batch_1m_candles(
Ok(candles) Ok(candles)
} }
None => { None => {
let earliest_fill = fetch_earliest_fill(pool, market_address).await?; let earliest_fill = fetch_earliest_fill(conn, market_address).await?;
if earliest_fill.is_none() { if earliest_fill.is_none() {
println!("No fills found for: {:?}", market_name); println!("No fills found for: {:?}", market_name);
@ -54,10 +54,14 @@ pub async fn batch_1m_candles(
start_time + day(), start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?, Utc::now().duration_trunc(Duration::minutes(1))?,
); );
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?;
let candles = if fills.len() > 0 {
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); let candles =
Ok(candles) combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
Ok(candles)
} else {
Ok(Vec::new())
}
} }
} }
} }

View File

@ -2,55 +2,58 @@ pub mod higher_order_candles;
pub mod minute_candles; pub mod minute_candles;
use chrono::Duration; use chrono::Duration;
use sqlx::{Pool, Postgres}; use sqlx::{pool::PoolConnection, Pool, Postgres};
use strum::IntoEnumIterator; use strum::IntoEnumIterator;
use tokio::{sync::mpsc::Sender, time::sleep}; use tokio::{sync::mpsc::Sender, time::sleep};
use crate::{ use crate::{
worker::candle_batching::minute_candles::batch_1m_candles,
structs::{candle::Candle, markets::MarketInfo, resolution::Resolution}, structs::{candle::Candle, markets::MarketInfo, resolution::Resolution},
worker::candle_batching::minute_candles::batch_1m_candles,
}; };
use self::higher_order_candles::batch_higher_order_candles; use self::higher_order_candles::batch_higher_order_candles;
pub async fn batch_candles( pub async fn batch_for_market(
pool: Pool<Postgres>, pool: Pool<Postgres>,
candles_sender: &Sender<Vec<Candle>>, candles_sender: &Sender<Vec<Candle>>,
markets: Vec<MarketInfo>, market: &MarketInfo,
) { ) -> anyhow::Result<()> {
let mut handles = vec![]; loop {
for market in markets.into_iter() {
let sender = candles_sender.clone(); let sender = candles_sender.clone();
let pool_clone = pool.clone();
let market_clone = market.clone(); let market_clone = market.clone();
handles.push(tokio::spawn(async move { let mut conn = pool.acquire().await?;
loop { loop {
batch_for_market(&pool_clone, &sender, &market_clone) sleep(Duration::milliseconds(2000).to_std()?).await;
.await match batch_inner(&mut conn, &sender, &market_clone).await {
.unwrap(); Ok(_) => {}
Err(e) => {
sleep(Duration::milliseconds(2000).to_std().unwrap()).await; println!(
} "Batching thread failed for {:?} with error: {:?}",
})); market_clone.name.clone(),
e
);
break;
}
};
}
println!("Restarting {:?} batching thread", market.name);
} }
futures::future::join_all(handles).await;
} }
async fn batch_for_market( async fn batch_inner(
pool: &Pool<Postgres>, conn: &mut PoolConnection<Postgres>,
candles_sender: &Sender<Vec<Candle>>, candles_sender: &Sender<Vec<Candle>>,
market: &MarketInfo, market: &MarketInfo,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let market_name = &market.name.clone(); let market_name = &market.name.clone();
let candles = batch_1m_candles(pool, market).await?; let candles = batch_1m_candles(conn, market).await?;
send_candles(candles, candles_sender).await; send_candles(candles, candles_sender).await;
for resolution in Resolution::iter() { for resolution in Resolution::iter() {
if resolution == Resolution::R1m { if resolution == Resolution::R1m {
continue; continue;
} }
let candles = batch_higher_order_candles(pool, market_name, resolution).await?; let candles = batch_higher_order_candles(conn, market_name, resolution).await?;
send_candles(candles, candles_sender).await; send_candles(candles, candles_sender).await;
} }
Ok(()) Ok(())

View File

@ -1,6 +1,4 @@
use dotenv; use dotenv;
use openbook_candles::worker::candle_batching::batch_candles;
use openbook_candles::worker::trade_fetching::scrape::scrape;
use openbook_candles::database::{ use openbook_candles::database::{
initialize::{connect_to_database, setup_database}, initialize::{connect_to_database, setup_database},
insert::{persist_candles, persist_fill_events}, insert::{persist_candles, persist_fill_events},
@ -9,10 +7,12 @@ use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEventLog; use openbook_candles::structs::openbook::OpenBookFillEventLog;
use openbook_candles::utils::Config; use openbook_candles::utils::Config;
use openbook_candles::worker::candle_batching::batch_for_market;
use openbook_candles::worker::trade_fetching::scrape::scrape;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::env;
use std::{collections::HashMap, str::FromStr}; use std::{collections::HashMap, str::FromStr};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use std::env;
#[tokio::main] #[tokio::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> {
}; };
let markets = load_markets(&path_to_markets_json); let markets = load_markets(&path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets).await?; let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new(); let mut target_markets = HashMap::new();
for m in market_infos.clone() { for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, 0); target_markets.insert(Pubkey::from_str(&m.address)?, 0);
@ -46,7 +46,7 @@ async fn main() -> anyhow::Result<()> {
setup_database(&pool).await?; setup_database(&pool).await?;
let mut handles = vec![]; let mut handles = vec![];
let (fill_sender, fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000); let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay
@ -54,19 +54,33 @@ async fn main() -> anyhow::Result<()> {
let fills_pool = pool.clone(); let fills_pool = pool.clone();
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
persist_fill_events(&fills_pool, fill_receiver).await; loop {
persist_fill_events(&fills_pool, &mut fill_receiver)
.await
.unwrap();
}
})); }));
let (candle_sender, candle_receiver) = mpsc::channel::<Vec<Candle>>(1000); let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
let batch_pool = pool.clone(); for market in market_infos.into_iter() {
handles.push(tokio::spawn(async move { let sender = candle_sender.clone();
batch_candles(batch_pool, &candle_sender, market_infos).await; let batch_pool = pool.clone();
})); handles.push(tokio::spawn(async move {
batch_for_market(batch_pool, &sender, &market)
.await
.unwrap();
println!("SOMETHING WENT WRONG");
}));
}
let persist_pool = pool.clone(); let persist_pool = pool.clone();
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
persist_candles(persist_pool, candle_receiver).await; loop {
persist_candles(persist_pool.clone(), &mut candle_receiver)
.await
.unwrap();
}
})); }));
futures::future::join_all(handles).await; futures::future::join_all(handles).await;