openbook-candles/src/candle_creation/main.rs

72 lines
2.3 KiB
Rust
Raw Normal View History

2023-03-12 22:03:58 -07:00
use dotenv;
use openbook_candles::candle_creation::candle_batching::batch_candles;
use openbook_candles::candle_creation::trade_fetching::scrape::{fetch_market_infos, scrape};
2023-03-12 22:03:58 -07:00
use openbook_candles::database::{
initialize::{connect_to_database, setup_database},
insert::{persist_candles, persist_fill_events},
};
2023-03-13 09:51:30 -07:00
use openbook_candles::structs::candle::Candle;
2023-03-12 22:03:58 -07:00
use openbook_candles::structs::markets::load_markets;
use openbook_candles::structs::openbook::OpenBookFillEventLog;
2023-03-13 09:51:30 -07:00
use openbook_candles::utils::Config;
2023-03-11 14:50:22 -08:00
use solana_sdk::pubkey::Pubkey;
2023-03-12 22:03:58 -07:00
use std::{collections::HashMap, str::FromStr};
2023-03-05 22:52:42 -08:00
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
2023-03-13 09:51:30 -07:00
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_WORKER")
.unwrap()
.parse::<u32>()
.unwrap();
let config = Config {
rpc_url: rpc_url.clone(),
database_url,
2023-03-12 22:03:58 -07:00
max_pg_pool_connections,
2023-03-05 22:52:42 -08:00
};
2023-03-12 17:48:53 -07:00
let markets = load_markets("/Users/dboures/dev/openbook-candles/markets.json");
2023-03-09 20:54:51 -08:00
let market_infos = fetch_market_infos(&config, markets).await?;
let mut target_markets = HashMap::new();
for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, 0);
}
println!("{:?}", target_markets);
2023-03-05 22:52:42 -08:00
let pool = connect_to_database(&config).await?;
setup_database(&pool).await?;
let mut handles = vec![];
let (fill_sender, fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
2023-03-05 22:52:42 -08:00
handles.push(tokio::spawn(async move {
scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay
}));
2023-03-12 00:13:57 -08:00
let fills_pool = pool.clone();
handles.push(tokio::spawn(async move {
persist_fill_events(&fills_pool, fill_receiver).await;
}));
2023-03-12 00:13:57 -08:00
let (candle_sender, candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
let batch_pool = pool.clone();
handles.push(tokio::spawn(async move {
batch_candles(batch_pool, &candle_sender, market_infos).await;
}));
2023-03-12 00:13:57 -08:00
let persist_pool = pool.clone();
handles.push(tokio::spawn(async move {
persist_candles(persist_pool, candle_receiver).await;
}));
futures::future::join_all(handles).await;
2023-03-11 14:50:22 -08:00
2023-03-05 22:52:42 -08:00
Ok(())
}