2023-03-12 00:13:57 -08:00
|
|
|
pub mod higher_order_candles;
|
|
|
|
pub mod minute_candles;
|
|
|
|
|
2023-03-12 16:32:12 -07:00
|
|
|
use chrono::Duration;
|
|
|
|
use sqlx::{Pool, Postgres};
|
2023-03-12 00:13:57 -08:00
|
|
|
use strum::IntoEnumIterator;
|
|
|
|
use tokio::{sync::mpsc::Sender, time::sleep};
|
|
|
|
|
|
|
|
use crate::{
|
2023-03-14 14:57:42 -07:00
|
|
|
worker::candle_batching::minute_candles::batch_1m_candles,
|
2023-03-13 09:51:30 -07:00
|
|
|
structs::{candle::Candle, markets::MarketInfo, resolution::Resolution},
|
2023-03-12 00:13:57 -08:00
|
|
|
};
|
|
|
|
|
|
|
|
use self::higher_order_candles::batch_higher_order_candles;
|
|
|
|
|
|
|
|
pub async fn batch_candles(
|
|
|
|
pool: Pool<Postgres>,
|
|
|
|
candles_sender: &Sender<Vec<Candle>>,
|
|
|
|
markets: Vec<MarketInfo>,
|
|
|
|
) {
|
2023-03-13 22:31:00 -07:00
|
|
|
let mut handles = vec![];
|
|
|
|
for market in markets.into_iter() {
|
|
|
|
let sender = candles_sender.clone();
|
|
|
|
let pool_clone = pool.clone();
|
|
|
|
let market_clone = market.clone();
|
|
|
|
handles.push(tokio::spawn(async move {
|
|
|
|
loop {
|
|
|
|
batch_for_market(&pool_clone, &sender, &market_clone)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
2023-03-12 00:13:57 -08:00
|
|
|
|
2023-03-13 22:31:00 -07:00
|
|
|
sleep(Duration::milliseconds(2000).to_std().unwrap()).await;
|
|
|
|
}
|
|
|
|
}));
|
2023-03-12 00:13:57 -08:00
|
|
|
}
|
|
|
|
|
2023-03-13 22:31:00 -07:00
|
|
|
futures::future::join_all(handles).await;
|
2023-03-12 00:13:57 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn batch_for_market(
|
|
|
|
pool: &Pool<Postgres>,
|
|
|
|
candles_sender: &Sender<Vec<Candle>>,
|
2023-03-13 22:31:00 -07:00
|
|
|
market: &MarketInfo,
|
2023-03-12 00:13:57 -08:00
|
|
|
) -> anyhow::Result<()> {
|
2023-03-13 22:31:00 -07:00
|
|
|
let market_name = &market.name.clone();
|
2023-03-12 00:13:57 -08:00
|
|
|
let candles = batch_1m_candles(pool, market).await?;
|
|
|
|
send_candles(candles, candles_sender).await;
|
|
|
|
|
|
|
|
for resolution in Resolution::iter() {
|
|
|
|
if resolution == Resolution::R1m {
|
|
|
|
continue;
|
|
|
|
}
|
2023-03-13 22:31:00 -07:00
|
|
|
let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
|
2023-03-12 00:13:57 -08:00
|
|
|
send_candles(candles, candles_sender).await;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn send_candles(candles: Vec<Candle>, candles_sender: &Sender<Vec<Candle>>) {
|
|
|
|
if candles.len() > 0 {
|
|
|
|
if let Err(_) = candles_sender.send(candles).await {
|
|
|
|
panic!("candles receiver dropped");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|