diff --git a/src/database/insert.rs b/src/database/insert.rs index d9a8115..ba95e87 100644 --- a/src/database/insert.rs +++ b/src/database/insert.rs @@ -15,7 +15,6 @@ pub async fn persist_fill_events( pool: &Pool, fill_receiver: &mut Receiver, ) -> anyhow::Result<()> { - let client = pool.get().await?; loop { let mut write_batch = HashMap::new(); while write_batch.len() < 10 { @@ -38,8 +37,8 @@ pub async fn persist_fill_events( if !write_batch.is_empty() { debug!("writing: {:?} events to DB\n", write_batch.len()); - let upsert_statement = build_fills_upsert_statement(write_batch); + let client = pool.get().await?; client .execute(&upsert_statement, &[]) .await @@ -49,33 +48,6 @@ pub async fn persist_fill_events( } } -pub async fn persist_candles( - pool: Pool, - candles_receiver: &mut Receiver>, -) -> anyhow::Result<()> { - let client = pool.get().await.unwrap(); - loop { - match candles_receiver.try_recv() { - Ok(candles) => { - if candles.is_empty() { - continue; - } - debug!("writing: {:?} candles to DB\n", candles.len()); - let upsert_statement = build_candles_upsert_statement(candles); - client - .execute(&upsert_statement, &[]) - .await - .map_err_anyhow() - .unwrap(); - } - Err(TryRecvError::Empty) => continue, - Err(TryRecvError::Disconnected) => { - panic!("Candles sender must stay alive") - } - }; - } -} - #[allow(deprecated)] fn build_fills_upsert_statement(events: HashMap) -> String { let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES"); @@ -111,7 +83,7 @@ fn build_fills_upsert_statement(events: HashMap) -> Strin stmt } -pub fn build_candles_upsert_statement(candles: Vec) -> String { +pub fn build_candles_upsert_statement(candles: &Vec) -> String { let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES"); for (idx, 
candle) in candles.iter().enumerate() { let val_str = format!( diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index a53bef4..1dcc0b8 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -5,7 +5,7 @@ use deadpool_postgres::Pool; use log::debug; use crate::{ - database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle, fetch_candles_from}, + database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, structs::{ candle::Candle, markets::MarketInfo, @@ -22,15 +22,12 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul match latest_candle { Some(candle) => { - println!("{}: latest finished candle time {}", market_name, candle.end_time); let start_time = candle.end_time; let end_time = min( start_time + day(), - Utc::now().duration_trunc(Duration::minutes(1))?, + (Utc::now() + Duration::minutes(1)).duration_trunc(Duration::minutes(1))?, ); let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; - let existing_candles = fetch_candles_from(pool, market_name, Resolution::R1m, candle.start_time, end_time).await?; - println!("{}: combining {} fills from {} to {}", market_name, fills.clone().len(), start_time, end_time); let candles = combine_fills_into_1m_candles( &mut fills, @@ -39,12 +36,9 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul end_time, Some(candle.close), ); - - println!("{}: filtering {} new candles on {} existing candles from {} to {}", market_name, candles.clone().len(), existing_candles.clone().len(), start_time, end_time); - Ok(filter_redundant_candles(existing_candles, candles.clone())) + Ok(candles) } None => { - println!("{}: no finished candle", market_name); let earliest_fill = fetch_earliest_fill(pool, market_address).await?; if earliest_fill.is_none() { @@ -61,7 +55,6 @@ pub async fn 
batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul Utc::now().duration_trunc(Duration::minutes(1))?, ); let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; - println!("{}: combining {} fills from {} to {}", market_name, fills.clone().len(), start_time, end_time); if !fills.is_empty() { let candles = combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); @@ -107,7 +100,6 @@ fn combine_fills_into_1m_candles( while matches!(fills_iter.peek(), Some(f) if f.time < end_time) { let fill = fills_iter.next().unwrap(); - println!("adding fill from {}", fill.time); let (price, volume) = calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals); @@ -121,16 +113,8 @@ fn combine_fills_into_1m_candles( candles[i].start_time = start_time; candles[i].end_time = end_time; - candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time) || end_time < Utc::now() - Duration::days(1); - if candles[i].complete { - println!("candle {} complete with end time {}", i, end_time); - } else { - let peeked_fill = fills_iter.peek(); - match peeked_fill { - Some(f) => println!("candle {} incomplete, peeked fill was at {} and end time was {}", i, f.time, end_time), - None => {} - } - } + candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time) + || end_time < Utc::now() - Duration::minutes(10); start_time = end_time; end_time += Duration::minutes(1); } @@ -138,17 +122,6 @@ fn combine_fills_into_1m_candles( candles } -fn filter_redundant_candles(existing_candles: Vec, mut candles: Vec) -> Vec { - candles.retain(|c| { - !existing_candles.contains(c) - }); - println!("trimmed: {:?}", candles.len()); - // println!("{:?}", candles.last()); - println!("candles: {:?}", existing_candles.len()); - // println!("{:?}", existing_candles.last()); - candles -} - /// Goes from the earliest fill to the most recent. 
Will mark candles as complete if there are missing gaps of fills between the start and end. pub async fn backfill_batch_1m_candles( pool: &Pool, diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs index d6303f0..310f9e5 100644 --- a/src/worker/candle_batching/mod.rs +++ b/src/worker/candle_batching/mod.rs @@ -5,29 +5,24 @@ use chrono::Duration; use deadpool_postgres::Pool; use log::{error, warn}; use strum::IntoEnumIterator; -use tokio::{sync::mpsc::Sender, time::sleep}; +use tokio::time::sleep; use crate::{ + database::insert::build_candles_upsert_statement, structs::{candle::Candle, markets::MarketInfo, resolution::Resolution}, + utils::AnyhowWrap, worker::candle_batching::minute_candles::batch_1m_candles, }; use self::higher_order_candles::batch_higher_order_candles; -use super::metrics::METRIC_CANDLES_TOTAL; - -pub async fn batch_for_market( - pool: &Pool, - candles_sender: &Sender>, - market: &MarketInfo, -) -> anyhow::Result<()> { +pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> { loop { - let sender = candles_sender.clone(); let market_clone = market.clone(); loop { sleep(Duration::milliseconds(2000).to_std()?).await; - match batch_inner(pool, &sender, &market_clone).await { + match batch_inner(pool, &market_clone).await { Ok(_) => {} Err(e) => { error!( @@ -43,34 +38,29 @@ pub async fn batch_for_market( } } -async fn batch_inner( - pool: &Pool, - candles_sender: &Sender>, - market: &MarketInfo, -) -> anyhow::Result<()> { +async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> { let market_name = &market.name.clone(); let candles = batch_1m_candles(pool, market).await?; - send_candles(candles.clone(), candles_sender).await; - METRIC_CANDLES_TOTAL - .with_label_values(&[market.name.as_str()]) - .inc_by(candles.clone().len() as u64); + save_candles(pool, candles).await?; for resolution in Resolution::iter() { if resolution == Resolution::R1m { continue; } let candles = 
batch_higher_order_candles(pool, market_name, resolution).await?; - send_candles(candles.clone(), candles_sender).await; - METRIC_CANDLES_TOTAL - .with_label_values(&[market.name.as_str()]) - .inc_by(candles.clone().len() as u64); + save_candles(pool, candles).await?; } Ok(()) } -async fn send_candles(candles: Vec, candles_sender: &Sender>) { - if !candles.is_empty() { - if let Err(_) = candles_sender.send(candles).await { - panic!("candles receiver dropped"); - } +async fn save_candles(pool: &Pool, candles: Vec) -> anyhow::Result<()> { + if candles.is_empty() { + return Ok(()); } + let upsert_statement = build_candles_upsert_statement(&candles); + let client = pool.get().await?; + client + .execute(&upsert_statement, &[]) + .await + .map_err_anyhow()?; + Ok(()) } diff --git a/src/worker/main.rs b/src/worker/main.rs index 3f94a7f..996cc41 100644 --- a/src/worker/main.rs +++ b/src/worker/main.rs @@ -1,17 +1,15 @@ use log::{error, info}; -use openbook_candles::structs::candle::Candle; use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::openbook::OpenBookFillEvent; use openbook_candles::utils::Config; use openbook_candles::worker::metrics::{ - serve_metrics, METRIC_CANDLES_QUEUE_LENGTH, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, - METRIC_FILLS_QUEUE_LENGTH, + serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, METRIC_FILLS_QUEUE_LENGTH, }; use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::{ database::{ initialize::{connect_to_database, setup_database}, - insert::{persist_candles, persist_fill_events}, + insert::{persist_fill_events}, }, worker::candle_batching::batch_for_market, }; @@ -34,7 +32,6 @@ async fn main() -> anyhow::Result<()> { rpc_url: rpc_url.clone(), }; - let candles_queue_max_size = 10000; let fills_queue_max_size = 10000; let markets = load_markets(path_to_markets_json); @@ -64,31 +61,16 @@ async fn main() -> anyhow::Result<()> { } })); - let
(candle_sender, mut candle_receiver) = mpsc::channel::>(candles_queue_max_size); - for market in market_infos.into_iter() { - let sender = candle_sender.clone(); let batch_pool = pool.clone(); handles.push(tokio::spawn(async move { - batch_for_market(&batch_pool, &sender, &market) - .await - .unwrap(); + batch_for_market(&batch_pool, &market).await.unwrap(); error!("batching halted for market {}", &market.name); })); } - let persist_pool = pool.clone(); - handles.push(tokio::spawn(async move { - loop { - persist_candles(persist_pool.clone(), &mut candle_receiver) - .await - .unwrap(); - } - })); - let monitor_pool = pool.clone(); let monitor_fill_channel = fill_sender.clone(); - let monitor_candle_channel = candle_sender.clone(); handles.push(tokio::spawn(async move { // TODO: maybe break this out into a new function loop { @@ -96,8 +78,6 @@ async fn main() -> anyhow::Result<()> { METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64); METRIC_DB_POOL_SIZE.set(pool_status.size as i64); - METRIC_CANDLES_QUEUE_LENGTH - .set((candles_queue_max_size - monitor_candle_channel.capacity()) as i64); METRIC_FILLS_QUEUE_LENGTH .set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64); diff --git a/src/worker/metrics/mod.rs b/src/worker/metrics/mod.rs index 2302784..09488f3 100644 --- a/src/worker/metrics/mod.rs +++ b/src/worker/metrics/mod.rs @@ -36,12 +36,6 @@ lazy_static! { METRIC_REGISTRY ) .unwrap(); - pub static ref METRIC_CANDLES_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!( - "candles_queue_length", - "Current length of the candles write queue", - METRIC_REGISTRY - ) - .unwrap(); pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!( "rpc_errors_total",