feat: markets in batcher spawn own threads

This commit is contained in:
dboures 2023-03-14 00:31:00 -05:00
parent 4abadc9589
commit c5d3a1f4ab
No known key found for this signature in database
GPG Key ID: AB3790129D478852
5 changed files with 69 additions and 72 deletions

View File

@ -1,4 +1,4 @@
use chrono::{DateTime, DurationRound, Utc};
use chrono::{DateTime, Duration, DurationRound, Utc};
use num_traits::Zero;
use sqlx::{types::Decimal, Pool, Postgres};
use std::cmp::{max, min};
@ -22,8 +22,6 @@ pub async fn batch_higher_order_candles(
Some(candle) => {
let start_time = candle.end_time;
let end_time = start_time + day();
// println!("candle.end_time: {:?}", candle.end_time);
// println!("start_time: {:?}", start_time);
let mut constituent_candles = fetch_candles_from(
pool,
market_name,
@ -97,16 +95,19 @@ fn combine_into_higher_order_candles(
let duration = target_resolution.get_duration();
let candles_len = constituent_candles.len();
let empty_candle = Candle::create_empty_candle(
constituent_candles[0].market_name.clone(),
target_resolution,
);
let mut combined_candles =
vec![empty_candle; (day().num_minutes() / duration.num_minutes()) as usize];
let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap();
let candle_window = now - st;
let num_candles = if candle_window.num_minutes() % duration.num_minutes() == 0 {
(candle_window.num_minutes() / duration.num_minutes()) as usize + 1
} else {
(candle_window.num_minutes() / duration.num_minutes()) as usize
};
println!("candles_len: {}", candles_len);
let mut combined_candles = vec![empty_candle; num_candles];
let mut con_iter = constituent_candles.iter_mut().peekable();
let mut start_time = st.clone();

View File

@ -1,7 +1,6 @@
use std::cmp::{max, min};
use chrono::{DateTime, Duration, DurationRound, Utc};
use num_traits::{FromPrimitive, Zero};
use sqlx::{types::Decimal, Pool, Postgres};
use crate::{
@ -16,7 +15,7 @@ use crate::{
pub async fn batch_1m_candles(
pool: &Pool<Postgres>,
market: MarketInfo,
market: &MarketInfo,
) -> anyhow::Result<Vec<Candle>> {
let market_name = &market.name;
let market_address = &market.address;
@ -65,12 +64,12 @@ pub async fn batch_1m_candles(
fn combine_fills_into_1m_candles(
fills: &mut Vec<PgOpenBookFill>,
market: MarketInfo,
market: &MarketInfo,
st: DateTime<Utc>,
et: DateTime<Utc>,
maybe_last_price: Option<Decimal>,
) -> Vec<Candle> {
let empty_candle = Candle::create_empty_candle(market.name, Resolution::R1m);
let empty_candle = Candle::create_empty_candle(market.name.clone(), Resolution::R1m);
let minutes = (et - st).num_minutes();
let mut candles = vec![empty_candle; minutes as usize];
@ -79,7 +78,15 @@ fn combine_fills_into_1m_candles(
let mut start_time = st.clone();
let mut end_time = start_time + Duration::minutes(1);
let mut last_price = maybe_last_price.unwrap_or(Decimal::zero()); // TODO: very first open is wrong
let mut last_price = match maybe_last_price {
Some(p) => p,
None => {
let first = fills_iter.peek().clone().unwrap();
let (price, _) =
calculate_fill_price_and_size(**first, market.base_decimals, market.quote_decimals);
price
}
};
for i in 0..candles.len() {
candles[i].open = last_price;

View File

@ -18,36 +18,31 @@ pub async fn batch_candles(
candles_sender: &Sender<Vec<Candle>>,
markets: Vec<MarketInfo>,
) {
// TODO: tokio spawn a taks for every market
let mut handles = vec![];
for market in markets.into_iter() {
let sender = candles_sender.clone();
let pool_clone = pool.clone();
let market_clone = market.clone();
handles.push(tokio::spawn(async move {
loop {
batch_for_market(&pool_clone, &sender, &market_clone)
.await
.unwrap();
loop {
let m = MarketInfo {
name: "BTC/USDC".to_owned(),
address: "A8YFbxQYFVqKZaoYJLLUVcQiWP7G2MeEgW5wsAQgMvFw".to_owned(),
base_decimals: 6,
quote_decimals: 6,
base_mint_key: "GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU".to_owned(),
quote_mint_key: "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_owned(),
base_lot_size: 10_000,
quote_lot_size: 1,
};
batch_for_market(&pool.clone(), candles_sender, m)
.await
.unwrap();
sleep(Duration::milliseconds(500).to_std().unwrap()).await;
sleep(Duration::milliseconds(2000).to_std().unwrap()).await;
}
}));
}
//loop
futures::future::join_all(handles).await;
}
async fn batch_for_market(
pool: &Pool<Postgres>,
candles_sender: &Sender<Vec<Candle>>,
market: MarketInfo,
market: &MarketInfo,
) -> anyhow::Result<()> {
let market_address = &market.address.clone();
let market_name = &market.name.clone();
let candles = batch_1m_candles(pool, market).await?;
send_candles(candles, candles_sender).await;
@ -55,7 +50,7 @@ async fn batch_for_market(
if resolution == Resolution::R1m {
continue;
}
let candles = batch_higher_order_candles(pool, market_address, resolution).await?;
let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
send_candles(candles, candles_sender).await;
}
Ok(())

View File

@ -45,6 +45,29 @@ pub async fn persist_fill_events(
}
}
pub async fn persist_candles(pool: Pool<Postgres>, mut candles_receiver: Receiver<Vec<Candle>>) {
loop {
match candles_receiver.try_recv() {
Ok(candles) => {
if candles.len() == 0 {
continue;
}
print!("writing: {:?} candles to DB\n", candles.len());
let upsert_statement = build_candes_upsert_statement(candles);
sqlx::query(&upsert_statement)
.execute(&pool)
.await
.map_err_anyhow()
.unwrap();
}
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Disconnected) => {
panic!("Candles sender must stay alive")
}
};
}
}
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> String {
let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES");
for (idx, event) in events.keys().enumerate() {
@ -79,35 +102,6 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEventLog, u8>) -> St
stmt
}
pub async fn persist_candles(pool: Pool<Postgres>, mut candles_receiver: Receiver<Vec<Candle>>) {
loop {
match candles_receiver.try_recv() {
Ok(candles) => {
if candles.len() == 0 {
continue;
}
print!("writing: {:?} candles to DB\n", candles.len());
match candles.last() {
Some(c) => {
println!("{:?}\n\n", c.end_time)
}
None => {}
}
let upsert_statement = build_candes_upsert_statement(candles);
sqlx::query(&upsert_statement)
.execute(&pool)
.await
.map_err_anyhow()
.unwrap();
}
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Disconnected) => {
panic!("Candles sender must stay alive")
}
};
}
}
fn build_candes_upsert_statement(candles: Vec<Candle>) -> String {
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
for (idx, candle) in candles.iter().enumerate() {

View File

@ -66,14 +66,14 @@ impl Resolution {
pub fn from_str(v: &str) -> Result<Self, ()> {
match v {
"1" => Ok(Resolution::R1m),
"3" => Ok(Resolution::R3m),
"5" => Ok(Resolution::R5m),
"15" => Ok(Resolution::R15m),
"30" => Ok(Resolution::R30m),
"60" => Ok(Resolution::R1h),
"120" => Ok(Resolution::R2h),
"240" => Ok(Resolution::R4h),
"1M" => Ok(Resolution::R1m),
"3M" => Ok(Resolution::R3m),
"5M" => Ok(Resolution::R5m),
"15M" => Ok(Resolution::R15m),
"30M" => Ok(Resolution::R30m),
"1H" => Ok(Resolution::R1h),
"2H" => Ok(Resolution::R2h),
"4H" => Ok(Resolution::R4h),
"D" => Ok(Resolution::R1d),
_ => Err(()),
}