diff --git a/src/structs/candle.rs b/src/structs/candle.rs index 7cf26b2..733bd36 100644 --- a/src/structs/candle.rs +++ b/src/structs/candle.rs @@ -3,7 +3,7 @@ use tokio_postgres::Row; use super::resolution::Resolution; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct Candle { pub market_name: String, pub start_time: DateTime, diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index d051b47..cf20446 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -5,7 +5,7 @@ use deadpool_postgres::Pool; use log::debug; use crate::{ - database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle}, + database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle, fetch_candles_from}, structs::{ candle::Candle, markets::MarketInfo, @@ -28,6 +28,8 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul Utc::now().duration_trunc(Duration::minutes(1))?, ); let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; + let existing_candles = fetch_candles_from(pool, market_name, Resolution::R1m, candle.start_time, end_time).await?; + let candles = combine_fills_into_1m_candles( &mut fills, market, @@ -35,7 +37,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul end_time, Some(candle.close), ); - Ok(candles) + Ok(filter_redundant_candles(existing_candles, candles.clone())) } None => { let earliest_fill = fetch_earliest_fill(pool, market_address).await?; @@ -122,6 +124,17 @@ fn combine_fills_into_1m_candles( candles } +fn filter_redundant_candles(existing_candles: Vec, mut candles: Vec) -> Vec { + candles.retain(|c| { + !existing_candles.contains(c) + }); + println!("trimmed: {:?}", candles.len()); + // println!("{:?}", candles.last()); + println!("candles: {:?}", existing_candles.len()); + // println!("{:?}", existing_candles.last()); + candles +} + /// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end. pub async fn backfill_batch_1m_candles( pool: &Pool,