fix: restart per-market batching loops on error to work around timeouts

This commit is contained in:
dboures 2023-03-26 14:39:25 -05:00
parent 23aea990d5
commit ce4e9d1d39
No known key found for this signature in database
GPG Key ID: AB3790129D478852
3 changed files with 33 additions and 26 deletions

View File

@ -91,7 +91,7 @@ fn combine_into_higher_order_candles(
st: DateTime<Utc>,
seed_candle: Candle,
) -> Vec<Candle> {
println!("target_resolution: {}", target_resolution);
// println!("target_resolution: {}", target_resolution);
let duration = target_resolution.get_duration();

View File

@ -7,37 +7,40 @@ use strum::IntoEnumIterator;
use tokio::{sync::mpsc::Sender, time::sleep};
use crate::{
worker::candle_batching::minute_candles::batch_1m_candles,
structs::{candle::Candle, markets::MarketInfo, resolution::Resolution},
worker::candle_batching::minute_candles::batch_1m_candles,
};
use self::higher_order_candles::batch_higher_order_candles;
pub async fn batch_candles(
pub async fn batch_for_market(
pool: Pool<Postgres>,
candles_sender: &Sender<Vec<Candle>>,
markets: Vec<MarketInfo>,
market: &MarketInfo,
) {
let mut handles = vec![];
for market in markets.into_iter() {
loop {
let sender = candles_sender.clone();
let pool_clone = pool.clone();
let market_clone = market.clone();
handles.push(tokio::spawn(async move {
loop {
batch_for_market(&pool_clone, &sender, &market_clone)
.await
.unwrap();
sleep(Duration::milliseconds(2000).to_std().unwrap()).await;
}
}));
loop {
sleep(Duration::milliseconds(2000).to_std().unwrap()).await;
match batch_inner(&pool_clone, &sender, &market_clone).await {
Ok(_) => {}
Err(e) => {
println!(
"Batching thread failed for {:?} with error: {:?}",
market_clone.name.clone(),
e
);
break;
}
};
}
println!("Restarting {:?} batching thread", market.name);
}
futures::future::join_all(handles).await;
}
async fn batch_for_market(
async fn batch_inner(
pool: &Pool<Postgres>,
candles_sender: &Sender<Vec<Candle>>,
market: &MarketInfo,

View File

@ -1,6 +1,4 @@
use dotenv;
use openbook_candles::worker::candle_batching::batch_candles;
use openbook_candles::worker::trade_fetching::scrape::scrape;
use openbook_candles::database::{
initialize::{connect_to_database, setup_database},
insert::{persist_candles, persist_fill_events},
@ -9,10 +7,12 @@ use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEventLog;
use openbook_candles::utils::Config;
use openbook_candles::worker::candle_batching::batch_for_market;
use openbook_candles::worker::trade_fetching::scrape::scrape;
use solana_sdk::pubkey::Pubkey;
use std::env;
use std::{collections::HashMap, str::FromStr};
use tokio::sync::mpsc;
use std::env;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> {
};
let markets = load_markets(&path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets).await?;
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new();
for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, 0);
@ -59,10 +59,14 @@ async fn main() -> anyhow::Result<()> {
let (candle_sender, candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
let batch_pool = pool.clone();
handles.push(tokio::spawn(async move {
batch_candles(batch_pool, &candle_sender, market_infos).await;
}));
for market in market_infos.into_iter() {
let sender = candle_sender.clone();
let batch_pool = pool.clone();
handles.push(tokio::spawn(async move {
batch_for_market(batch_pool, &sender, &market).await;
println!("SOMETHING WENT WRONG");
}));
}
let persist_pool = pool.clone();
handles.push(tokio::spawn(async move {