bugfixes and performance improvements
This commit is contained in:
parent
f74cec5d13
commit
89915bf249
|
@ -95,7 +95,7 @@ pub async fn fetch_latest_finished_candle(
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetches all of the candles for the given market and resoultion, starting from the earliest.
|
/// Fetches all of the candles for the given market and resoultion, starting from the earliest.
|
||||||
/// Note that this function will fetch ALL candles.
|
/// Note that this function will fetch at most 2000 candles.
|
||||||
pub async fn fetch_earliest_candles(
|
pub async fn fetch_earliest_candles(
|
||||||
pool: &Pool,
|
pool: &Pool,
|
||||||
market_name: &str,
|
market_name: &str,
|
||||||
|
@ -117,7 +117,8 @@ pub async fn fetch_earliest_candles(
|
||||||
from openbook.candles
|
from openbook.candles
|
||||||
where market_name = $1
|
where market_name = $1
|
||||||
and resolution = $2
|
and resolution = $2
|
||||||
ORDER BY start_time asc"#;
|
ORDER BY start_time asc
|
||||||
|
LIMIT 2000"#;
|
||||||
|
|
||||||
let rows = client
|
let rows = client
|
||||||
.query(stmt, &[&market_name, &resolution.to_string()])
|
.query(stmt, &[&market_name, &resolution.to_string()])
|
||||||
|
@ -246,24 +247,27 @@ pub async fn fetch_coingecko_24h_volume(
|
||||||
|
|
||||||
let stmt = r#"SELECT
|
let stmt = r#"SELECT
|
||||||
t1.market,
|
t1.market,
|
||||||
COALESCE(t2.native_quantity_received, 0) as "raw_base_size",
|
COALESCE(t2.base_size, 0) as "base_size",
|
||||||
COALESCE(t2.native_quantity_paid, 0) as "raw_quote_size"
|
COALESCE(t3.quote_size, 0) as "quote_size"
|
||||||
FROM (
|
FROM (
|
||||||
SELECT distinct on (market) *
|
SELECT unnest($1::text[]) as market
|
||||||
FROM openbook.openbook_fill_events f
|
|
||||||
where bid = true
|
|
||||||
and market = any($1)
|
|
||||||
order by market, "time" desc
|
|
||||||
) t1
|
) t1
|
||||||
LEFT JOIN (
|
LEFT JOIN (
|
||||||
select market,
|
select market,
|
||||||
sum(native_quantity_received) as "native_quantity_received",
|
sum("size") as "base_size"
|
||||||
sum(native_quantity_paid) as "native_quantity_paid"
|
|
||||||
from openbook.openbook_fill_events
|
from openbook.openbook_fill_events
|
||||||
where "time" >= current_timestamp - interval '1 day'
|
where block_datetime >= current_timestamp - interval '1 day'
|
||||||
and bid = true
|
and bid = true
|
||||||
group by market
|
group by market
|
||||||
) t2 ON t1.market = t2.market"#;
|
) t2 ON t1.market = t2.market
|
||||||
|
LEFT JOIN (
|
||||||
|
select market,
|
||||||
|
sum("size" * price) as "quote_size"
|
||||||
|
from openbook.openbook_fill_events
|
||||||
|
where block_datetime >= current_timestamp - interval '1 day'
|
||||||
|
and bid = true
|
||||||
|
group by market
|
||||||
|
) t3 ON t1.market = t3.market"#;
|
||||||
|
|
||||||
let rows = client.query(stmt, &[&market_address_strings]).await?;
|
let rows = client.query(stmt, &[&market_address_strings]).await?;
|
||||||
|
|
||||||
|
|
|
@ -101,7 +101,7 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
client.execute(
|
client.execute(
|
||||||
"CREATE UNIQUE INDEX idx_market_time_resolution ON openbook.candles USING btree (market_name, start_time, resolution);",
|
"CREATE UNIQUE INDEX IF NOT EXISTS idx_market_time_resolution ON openbook.candles USING btree (market_name, start_time, resolution);",
|
||||||
&[]
|
&[]
|
||||||
).await?;
|
).await?;
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tokio_postgres::Row;
|
use tokio_postgres::Row;
|
||||||
|
|
||||||
use super::{markets::MarketInfo, openbook::token_factor};
|
use super::markets::MarketInfo;
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
pub struct CoinGeckoOrderBook {
|
pub struct CoinGeckoOrderBook {
|
||||||
|
@ -35,26 +35,24 @@ pub struct CoinGeckoTicker {
|
||||||
|
|
||||||
pub struct PgCoinGecko24HourVolume {
|
pub struct PgCoinGecko24HourVolume {
|
||||||
pub address: String,
|
pub address: String,
|
||||||
pub raw_base_size: f64,
|
pub base_size: f64,
|
||||||
pub raw_quote_size: f64,
|
pub quote_size: f64,
|
||||||
}
|
}
|
||||||
impl PgCoinGecko24HourVolume {
|
impl PgCoinGecko24HourVolume {
|
||||||
pub fn convert_to_readable(&self, markets: &Vec<MarketInfo>) -> CoinGecko24HourVolume {
|
pub fn convert_to_readable(&self, markets: &Vec<MarketInfo>) -> CoinGecko24HourVolume {
|
||||||
let market = markets.iter().find(|m| m.address == self.address).unwrap();
|
let market = markets.iter().find(|m| m.address == self.address).unwrap();
|
||||||
let base_volume = self.raw_base_size / token_factor(market.base_decimals);
|
|
||||||
let target_volume = self.raw_quote_size / token_factor(market.quote_decimals);
|
|
||||||
CoinGecko24HourVolume {
|
CoinGecko24HourVolume {
|
||||||
market_name: market.name.clone(),
|
market_name: market.name.clone(),
|
||||||
base_volume,
|
base_volume: self.base_size,
|
||||||
target_volume,
|
target_volume: self.quote_size,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_row(row: Row) -> Self {
|
pub fn from_row(row: Row) -> Self {
|
||||||
PgCoinGecko24HourVolume {
|
PgCoinGecko24HourVolume {
|
||||||
address: row.get(0),
|
address: row.get(0),
|
||||||
raw_base_size: row.get(1),
|
base_size: row.get(1),
|
||||||
raw_quote_size: row.get(2),
|
quote_size: row.get(2),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use chrono::{DateTime, Duration, DurationRound, Utc};
|
use chrono::{DateTime, Duration, DurationRound, Utc};
|
||||||
use deadpool_postgres::Pool;
|
use deadpool_postgres::Pool;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use std::cmp::max;
|
use std::cmp::{max, min};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle},
|
database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle},
|
||||||
|
@ -92,7 +92,7 @@ fn combine_into_higher_order_candles(
|
||||||
target_resolution,
|
target_resolution,
|
||||||
);
|
);
|
||||||
let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap();
|
let now = Utc::now().duration_trunc(Duration::minutes(1)).unwrap();
|
||||||
let candle_window = now - st;
|
let candle_window = min(now - st, day());
|
||||||
let num_candles = max(
|
let num_candles = max(
|
||||||
1,
|
1,
|
||||||
(candle_window.num_minutes() / duration.num_minutes()) as usize + 1,
|
(candle_window.num_minutes() / duration.num_minutes()) as usize + 1,
|
||||||
|
|
|
@ -28,6 +28,9 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
|
||||||
(Utc::now() + Duration::minutes(1)).duration_trunc(Duration::minutes(1))?,
|
(Utc::now() + Duration::minutes(1)).duration_trunc(Duration::minutes(1))?,
|
||||||
);
|
);
|
||||||
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
||||||
|
if fills.is_empty() {
|
||||||
|
return Ok(Vec::new());
|
||||||
|
}
|
||||||
|
|
||||||
let candles = combine_fills_into_1m_candles(
|
let candles = combine_fills_into_1m_candles(
|
||||||
&mut fills,
|
&mut fills,
|
||||||
|
|
|
@ -21,7 +21,6 @@ use super::metrics::METRIC_CANDLES_TOTAL;
|
||||||
pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
||||||
loop {
|
loop {
|
||||||
let market_clone = market.clone();
|
let market_clone = market.clone();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
sleep(Duration::milliseconds(2000).to_std()?).await;
|
sleep(Duration::milliseconds(2000).to_std()?).await;
|
||||||
match batch_inner(pool, &market_clone).await {
|
match batch_inner(pool, &market_clone).await {
|
||||||
|
@ -43,6 +42,9 @@ pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
|
||||||
async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
|
||||||
let market_name = &market.name.clone();
|
let market_name = &market.name.clone();
|
||||||
let candles = batch_1m_candles(pool, market).await?;
|
let candles = batch_1m_candles(pool, market).await?;
|
||||||
|
if candles.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
METRIC_CANDLES_TOTAL
|
METRIC_CANDLES_TOTAL
|
||||||
.with_label_values(&[market.name.as_str()])
|
.with_label_values(&[market.name.as_str()])
|
||||||
.inc_by(candles.clone().len() as u64);
|
.inc_by(candles.clone().len() as u64);
|
||||||
|
|
Loading…
Reference in New Issue