diff --git a/src/database/fetch.rs b/src/database/fetch.rs index b7f3162..dfa71d4 100644 --- a/src/database/fetch.rs +++ b/src/database/fetch.rs @@ -95,7 +95,7 @@ pub async fn fetch_latest_finished_candle( } /// Fetches all of the candles for the given market and resoultion, starting from the earliest. -/// Note that this function will fetch ALL candles. +/// Note that this function will fetch at most 2000 candles. pub async fn fetch_earliest_candles( pool: &Pool, market_name: &str, @@ -117,7 +117,8 @@ pub async fn fetch_earliest_candles( from openbook.candles where market_name = $1 and resolution = $2 - ORDER BY start_time asc"#; + ORDER BY start_time asc + LIMIT 2000"#; let rows = client .query(stmt, &[&market_name, &resolution.to_string()]) @@ -246,24 +247,27 @@ pub async fn fetch_coingecko_24h_volume( let stmt = r#"SELECT t1.market, - COALESCE(t2.native_quantity_received, 0) as "raw_base_size", - COALESCE(t2.native_quantity_paid, 0) as "raw_quote_size" + COALESCE(t2.base_size, 0) as "base_size", + COALESCE(t3.quote_size, 0) as "quote_size" FROM ( - SELECT distinct on (market) * - FROM openbook.openbook_fill_events f - where bid = true - and market = any($1) - order by market, "time" desc + SELECT unnest($1::text[]) as market ) t1 LEFT JOIN ( select market, - sum(native_quantity_received) as "native_quantity_received", - sum(native_quantity_paid) as "native_quantity_paid" + sum("size") as "base_size" from openbook.openbook_fill_events - where "time" >= current_timestamp - interval '1 day' + where block_datetime >= current_timestamp - interval '1 day' and bid = true group by market - ) t2 ON t1.market = t2.market"#; + ) t2 ON t1.market = t2.market + LEFT JOIN ( + select market, + sum("size" * price) as "quote_size" + from openbook.openbook_fill_events + where block_datetime >= current_timestamp - interval '1 day' + and bid = true + group by market + ) t3 ON t1.market = t3.market"#; let rows = client.query(stmt, &[&market_address_strings]).await?; diff --git a/src/database/initialize.rs b/src/database/initialize.rs index cd33e67..019be0d 100644 --- a/src/database/initialize.rs +++ b/src/database/initialize.rs @@ -101,7 +101,7 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> { .await?; client.execute( - "CREATE UNIQUE INDEX idx_market_time_resolution ON openbook.candles USING btree (market_name, start_time, resolution);", + "CREATE UNIQUE INDEX IF NOT EXISTS idx_market_time_resolution ON openbook.candles USING btree (market_name, start_time, resolution);", &[] ).await?; diff --git a/src/structs/coingecko.rs b/src/structs/coingecko.rs index 132a416..f7c7673 100644 --- a/src/structs/coingecko.rs +++ b/src/structs/coingecko.rs @@ -1,7 +1,7 @@ use serde::Serialize; use tokio_postgres::Row; -use super::{markets::MarketInfo, openbook::token_factor}; +use super::markets::MarketInfo; #[derive(Debug, Clone, Serialize)] pub struct CoinGeckoOrderBook { @@ -35,26 +35,24 @@ pub struct CoinGeckoTicker { pub struct PgCoinGecko24HourVolume { pub address: String, - pub raw_base_size: f64, - pub raw_quote_size: f64, + pub base_size: f64, + pub quote_size: f64, } impl PgCoinGecko24HourVolume { pub fn convert_to_readable(&self, markets: &Vec) -> CoinGecko24HourVolume { let market = markets.iter().find(|m| m.address == self.address).unwrap(); - let base_volume = self.raw_base_size / token_factor(market.base_decimals); - let target_volume = self.raw_quote_size / token_factor(market.quote_decimals); CoinGecko24HourVolume { market_name: market.name.clone(), - base_volume, - target_volume, + base_volume: self.base_size, + target_volume: self.quote_size, } } pub fn from_row(row: Row) -> Self { PgCoinGecko24HourVolume { address: row.get(0), - raw_base_size: row.get(1), - raw_quote_size: row.get(2), + base_size: row.get(1), + quote_size: row.get(2), } } } diff --git a/src/worker/candle_batching/higher_order_candles.rs b/src/worker/candle_batching/higher_order_candles.rs index e621a08..ec72829 100644 --- a/src/worker/candle_batching/higher_order_candles.rs +++ b/src/worker/candle_batching/higher_order_candles.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Duration, DurationRound, Utc}; use deadpool_postgres::Pool; use log::debug; -use std::cmp::max; +use std::cmp::{max, min}; use crate::{ database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle}, @@ -92,7 +92,7 @@ fn combine_into_higher_order_candles( target_resolution, ); let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap(); - let candle_window = now - st; + let candle_window = min(now - st, day()); let num_candles = max( 1, (candle_window.num_minutes() / duration.num_minutes()) as usize + 1, diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs index 1d29559..c129d9f 100644 --- a/src/worker/candle_batching/minute_candles.rs +++ b/src/worker/candle_batching/minute_candles.rs @@ -28,6 +28,9 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul (Utc::now() + Duration::minutes(1)).duration_trunc(Duration::minutes(1))?, ); let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; + if fills.is_empty() { + return Ok(Vec::new()); + } let candles = combine_fills_into_1m_candles( &mut fills, diff --git a/src/worker/candle_batching/mod.rs b/src/worker/candle_batching/mod.rs index 04d0550..346471c 100644 --- a/src/worker/candle_batching/mod.rs +++ b/src/worker/candle_batching/mod.rs @@ -21,7 +21,6 @@ use super::metrics::METRIC_CANDLES_TOTAL; pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> { loop { let market_clone = market.clone(); - loop { sleep(Duration::milliseconds(2000).to_std()?).await; match batch_inner(pool, &market_clone).await { @@ -43,6 +42,9 @@ pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Resul async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> { let market_name = &market.name.clone(); let candles = batch_1m_candles(pool, market).await?; + if candles.is_empty() { + return Ok(()); + } METRIC_CANDLES_TOTAL .with_label_values(&[market.name.as_str()]) .inc_by(candles.clone().len() as u64);