2023-03-12 00:13:57 -08:00
|
|
|
pub mod higher_order_candles;
|
|
|
|
pub mod minute_candles;
|
|
|
|
|
2023-03-12 16:32:12 -07:00
|
|
|
use chrono::Duration;
|
2023-05-14 00:15:10 -07:00
|
|
|
use deadpool_postgres::Pool;
|
2023-05-31 06:32:34 -07:00
|
|
|
use log::{error, warn};
|
2023-03-12 00:13:57 -08:00
|
|
|
use strum::IntoEnumIterator;
|
2023-06-03 10:22:10 -07:00
|
|
|
use tokio::time::sleep;
|
2023-03-12 00:13:57 -08:00
|
|
|
|
|
|
|
use crate::{
|
2023-06-03 10:22:10 -07:00
|
|
|
database::insert::build_candles_upsert_statement,
|
2023-03-13 09:51:30 -07:00
|
|
|
structs::{candle::Candle, markets::MarketInfo, resolution::Resolution},
|
2023-06-03 10:22:10 -07:00
|
|
|
utils::AnyhowWrap,
|
2023-03-26 12:39:25 -07:00
|
|
|
worker::candle_batching::minute_candles::batch_1m_candles,
|
2023-03-12 00:13:57 -08:00
|
|
|
};
|
|
|
|
|
|
|
|
use self::higher_order_candles::batch_higher_order_candles;
|
|
|
|
|
2023-06-03 10:22:10 -07:00
|
|
|
pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
2023-03-26 12:39:25 -07:00
|
|
|
loop {
|
2023-03-13 22:31:00 -07:00
|
|
|
let market_clone = market.clone();
|
2023-05-31 06:32:34 -07:00
|
|
|
|
2023-03-26 12:39:25 -07:00
|
|
|
loop {
|
2023-03-27 10:11:58 -07:00
|
|
|
sleep(Duration::milliseconds(2000).to_std()?).await;
|
2023-06-03 10:22:10 -07:00
|
|
|
match batch_inner(pool, &market_clone).await {
|
2023-03-26 12:39:25 -07:00
|
|
|
Ok(_) => {}
|
|
|
|
Err(e) => {
|
2023-05-31 06:32:34 -07:00
|
|
|
error!(
|
2023-03-26 12:39:25 -07:00
|
|
|
"Batching thread failed for {:?} with error: {:?}",
|
|
|
|
market_clone.name.clone(),
|
|
|
|
e
|
|
|
|
);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
2023-05-31 06:32:34 -07:00
|
|
|
warn!("Restarting {:?} batching thread", market.name);
|
2023-03-12 00:13:57 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-06-03 10:22:10 -07:00
|
|
|
async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
2023-03-13 22:31:00 -07:00
|
|
|
let market_name = &market.name.clone();
|
2023-05-14 00:15:10 -07:00
|
|
|
let candles = batch_1m_candles(pool, market).await?;
|
2023-06-03 10:22:10 -07:00
|
|
|
save_candles(pool, candles).await?;
|
2023-03-12 00:13:57 -08:00
|
|
|
for resolution in Resolution::iter() {
|
|
|
|
if resolution == Resolution::R1m {
|
|
|
|
continue;
|
|
|
|
}
|
2023-05-14 00:15:10 -07:00
|
|
|
let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
|
2023-06-03 10:22:10 -07:00
|
|
|
save_candles(pool, candles).await?;
|
2023-03-12 00:13:57 -08:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-06-03 10:22:10 -07:00
|
|
|
async fn save_candles(pool: &Pool, candles: Vec<Candle>) -> anyhow::Result<()> {
|
|
|
|
if candles.len() == 0 {
|
|
|
|
return Ok(());
|
2023-03-12 00:13:57 -08:00
|
|
|
}
|
2023-06-03 10:22:10 -07:00
|
|
|
let upsert_statement = build_candles_upsert_statement(&candles);
|
|
|
|
let client = pool.get().await.unwrap();
|
|
|
|
client
|
|
|
|
.execute(&upsert_statement, &[])
|
|
|
|
.await
|
|
|
|
.map_err_anyhow()?;
|
|
|
|
Ok(())
|
2023-03-12 00:13:57 -08:00
|
|
|
}
|