|
|
|
@ -5,7 +5,7 @@ use deadpool_postgres::Pool;
|
|
|
|
|
use log::debug;
|
|
|
|
|
|
|
|
|
|
use crate::{
|
|
|
|
|
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
|
|
|
|
|
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle, fetch_candles_from},
|
|
|
|
|
structs::{
|
|
|
|
|
candle::Candle,
|
|
|
|
|
markets::MarketInfo,
|
|
|
|
@ -22,12 +22,16 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
|
|
|
|
|
|
|
|
|
|
match latest_candle {
|
|
|
|
|
Some(candle) => {
|
|
|
|
|
println!("{}: latest finished candle time {}", market_name, candle.end_time);
|
|
|
|
|
let start_time = candle.end_time;
|
|
|
|
|
let end_time = min(
|
|
|
|
|
start_time + day(),
|
|
|
|
|
Utc::now().duration_trunc(Duration::minutes(1))?,
|
|
|
|
|
);
|
|
|
|
|
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
|
|
|
|
let existing_candles = fetch_candles_from(pool, market_name, Resolution::R1m, candle.start_time, end_time).await?;
|
|
|
|
|
println!("{}: combining {} fills from {} to {}", market_name, fills.clone().len(), start_time, end_time);
|
|
|
|
|
|
|
|
|
|
let candles = combine_fills_into_1m_candles(
|
|
|
|
|
&mut fills,
|
|
|
|
|
market,
|
|
|
|
@ -35,9 +39,12 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
|
|
|
|
|
end_time,
|
|
|
|
|
Some(candle.close),
|
|
|
|
|
);
|
|
|
|
|
Ok(candles)
|
|
|
|
|
|
|
|
|
|
println!("{}: filtering {} new candles on {} existing candles from {} to {}", market_name, candles.clone().len(), existing_candles.clone().len(), start_time, end_time);
|
|
|
|
|
Ok(filter_redundant_candles(existing_candles, candles.clone()))
|
|
|
|
|
}
|
|
|
|
|
None => {
|
|
|
|
|
println!("{}: no finished candle", market_name);
|
|
|
|
|
let earliest_fill = fetch_earliest_fill(pool, market_address).await?;
|
|
|
|
|
|
|
|
|
|
if earliest_fill.is_none() {
|
|
|
|
@ -54,6 +61,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
|
|
|
|
|
Utc::now().duration_trunc(Duration::minutes(1))?,
|
|
|
|
|
);
|
|
|
|
|
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
|
|
|
|
println!("{}: combining {} fills from {} to {}", market_name, fills.clone().len(), start_time, end_time);
|
|
|
|
|
if !fills.is_empty() {
|
|
|
|
|
let candles =
|
|
|
|
|
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
|
|
|
|
@ -99,7 +107,7 @@ fn combine_fills_into_1m_candles(
|
|
|
|
|
|
|
|
|
|
while matches!(fills_iter.peek(), Some(f) if f.time < end_time) {
|
|
|
|
|
let fill = fills_iter.next().unwrap();
|
|
|
|
|
|
|
|
|
|
println!("adding fill from {}", fill.time);
|
|
|
|
|
let (price, volume) =
|
|
|
|
|
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
|
|
|
|
|
|
|
|
|
@ -113,8 +121,16 @@ fn combine_fills_into_1m_candles(
|
|
|
|
|
|
|
|
|
|
candles[i].start_time = start_time;
|
|
|
|
|
candles[i].end_time = end_time;
|
|
|
|
|
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time);
|
|
|
|
|
|
|
|
|
|
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time) || end_time < Utc::now() - Duration::days(1);
|
|
|
|
|
if candles[i].complete {
|
|
|
|
|
println!("candle {} complete with end time {}", i, end_time);
|
|
|
|
|
} else {
|
|
|
|
|
let peeked_fill = fills_iter.peek();
|
|
|
|
|
match peeked_fill {
|
|
|
|
|
Some(f) => println!("candle {} incomplete, peeked fill was at {} and end time was {}", i, f.time, end_time),
|
|
|
|
|
None => {}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
start_time = end_time;
|
|
|
|
|
end_time += Duration::minutes(1);
|
|
|
|
|
}
|
|
|
|
@ -122,6 +138,17 @@ fn combine_fills_into_1m_candles(
|
|
|
|
|
candles
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn filter_redundant_candles(existing_candles: Vec<Candle>, mut candles: Vec<Candle>) -> Vec<Candle> {
|
|
|
|
|
candles.retain(|c| {
|
|
|
|
|
!existing_candles.contains(c)
|
|
|
|
|
});
|
|
|
|
|
println!("trimmed: {:?}", candles.len());
|
|
|
|
|
// println!("{:?}", candles.last());
|
|
|
|
|
println!("candles: {:?}", existing_candles.len());
|
|
|
|
|
// println!("{:?}", existing_candles.last());
|
|
|
|
|
candles
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end.
|
|
|
|
|
pub async fn backfill_batch_1m_candles(
|
|
|
|
|
pool: &Pool,
|
|
|
|
|