refactor: remove candles queue

This commit is contained in:
dboures 2023-06-03 12:22:10 -05:00
parent 89e2fa7178
commit e0d677c241
No known key found for this signature in database
GPG Key ID: AB3790129D478852
5 changed files with 28 additions and 119 deletions

View File

@ -15,7 +15,6 @@ pub async fn persist_fill_events(
pool: &Pool, pool: &Pool,
fill_receiver: &mut Receiver<OpenBookFillEvent>, fill_receiver: &mut Receiver<OpenBookFillEvent>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let client = pool.get().await?;
loop { loop {
let mut write_batch = HashMap::new(); let mut write_batch = HashMap::new();
while write_batch.len() < 10 { while write_batch.len() < 10 {
@ -38,8 +37,8 @@ pub async fn persist_fill_events(
if !write_batch.is_empty() { if !write_batch.is_empty() {
debug!("writing: {:?} events to DB\n", write_batch.len()); debug!("writing: {:?} events to DB\n", write_batch.len());
let upsert_statement = build_fills_upsert_statement(write_batch); let upsert_statement = build_fills_upsert_statement(write_batch);
let client = pool.get().await?;
client client
.execute(&upsert_statement, &[]) .execute(&upsert_statement, &[])
.await .await
@ -49,33 +48,6 @@ pub async fn persist_fill_events(
} }
} }
pub async fn persist_candles(
pool: Pool,
candles_receiver: &mut Receiver<Vec<Candle>>,
) -> anyhow::Result<()> {
let client = pool.get().await.unwrap();
loop {
match candles_receiver.try_recv() {
Ok(candles) => {
if candles.is_empty() {
continue;
}
debug!("writing: {:?} candles to DB\n", candles.len());
let upsert_statement = build_candles_upsert_statement(candles);
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()
.unwrap();
}
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Disconnected) => {
panic!("Candles sender must stay alive")
}
};
}
}
#[allow(deprecated)] #[allow(deprecated)]
fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String { fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> String {
let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES"); let mut stmt = String::from("INSERT INTO fills (id, time, market, open_orders, open_orders_owner, bid, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, fee_tier, order_id) VALUES");
@ -111,7 +83,7 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> Strin
stmt stmt
} }
pub fn build_candles_upsert_statement(candles: Vec<Candle>) -> String { pub fn build_candles_upsert_statement(candles: &Vec<Candle>) -> String {
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES"); let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
for (idx, candle) in candles.iter().enumerate() { for (idx, candle) in candles.iter().enumerate() {
let val_str = format!( let val_str = format!(

View File

@ -5,7 +5,7 @@ use deadpool_postgres::Pool;
use log::debug; use log::debug;
use crate::{ use crate::{
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle, fetch_candles_from}, database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
structs::{ structs::{
candle::Candle, candle::Candle,
markets::MarketInfo, markets::MarketInfo,
@ -22,15 +22,12 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
match latest_candle { match latest_candle {
Some(candle) => { Some(candle) => {
println!("{}: latest finished candle time {}", market_name, candle.end_time);
let start_time = candle.end_time; let start_time = candle.end_time;
let end_time = min( let end_time = min(
start_time + day(), start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?, (Utc::now() + Duration::minutes(1)).duration_trunc(Duration::minutes(1))?,
); );
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
let existing_candles = fetch_candles_from(pool, market_name, Resolution::R1m, candle.start_time, end_time).await?;
println!("{}: combining {} fills from {} to {}", market_name, fills.clone().len(), start_time, end_time);
let candles = combine_fills_into_1m_candles( let candles = combine_fills_into_1m_candles(
&mut fills, &mut fills,
@ -39,12 +36,9 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
end_time, end_time,
Some(candle.close), Some(candle.close),
); );
Ok(candles)
println!("{}: filtering {} new candles on {} existing candles from {} to {}", market_name, candles.clone().len(), existing_candles.clone().len(), start_time, end_time);
Ok(filter_redundant_candles(existing_candles, candles.clone()))
} }
None => { None => {
println!("{}: no finished candle", market_name);
let earliest_fill = fetch_earliest_fill(pool, market_address).await?; let earliest_fill = fetch_earliest_fill(pool, market_address).await?;
if earliest_fill.is_none() { if earliest_fill.is_none() {
@ -61,7 +55,6 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
Utc::now().duration_trunc(Duration::minutes(1))?, Utc::now().duration_trunc(Duration::minutes(1))?,
); );
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?; let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
println!("{}: combining {} fills from {} to {}", market_name, fills.clone().len(), start_time, end_time);
if !fills.is_empty() { if !fills.is_empty() {
let candles = let candles =
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None); combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
@ -107,7 +100,6 @@ fn combine_fills_into_1m_candles(
while matches!(fills_iter.peek(), Some(f) if f.time < end_time) { while matches!(fills_iter.peek(), Some(f) if f.time < end_time) {
let fill = fills_iter.next().unwrap(); let fill = fills_iter.next().unwrap();
println!("adding fill from {}", fill.time);
let (price, volume) = let (price, volume) =
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals); calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
@ -121,16 +113,8 @@ fn combine_fills_into_1m_candles(
candles[i].start_time = start_time; candles[i].start_time = start_time;
candles[i].end_time = end_time; candles[i].end_time = end_time;
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time) || end_time < Utc::now() - Duration::days(1); candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time)
if candles[i].complete { || end_time < Utc::now() - Duration::minutes(10);
println!("candle {} complete with end time {}", i, end_time);
} else {
let peeked_fill = fills_iter.peek();
match peeked_fill {
Some(f) => println!("candle {} incomplete, peeked fill was at {} and end time was {}", i, f.time, end_time),
None => {}
}
}
start_time = end_time; start_time = end_time;
end_time += Duration::minutes(1); end_time += Duration::minutes(1);
} }
@ -138,17 +122,6 @@ fn combine_fills_into_1m_candles(
candles candles
} }
fn filter_redundant_candles(existing_candles: Vec<Candle>, mut candles: Vec<Candle>) -> Vec<Candle> {
candles.retain(|c| {
!existing_candles.contains(c)
});
println!("trimmed: {:?}", candles.len());
// println!("{:?}", candles.last());
println!("candles: {:?}", existing_candles.len());
// println!("{:?}", existing_candles.last());
candles
}
/// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end. /// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end.
pub async fn backfill_batch_1m_candles( pub async fn backfill_batch_1m_candles(
pool: &Pool, pool: &Pool,

View File

@ -5,29 +5,24 @@ use chrono::Duration;
use deadpool_postgres::Pool; use deadpool_postgres::Pool;
use log::{error, warn}; use log::{error, warn};
use strum::IntoEnumIterator; use strum::IntoEnumIterator;
use tokio::{sync::mpsc::Sender, time::sleep}; use tokio::time::sleep;
use crate::{ use crate::{
database::insert::build_candles_upsert_statement,
structs::{candle::Candle, markets::MarketInfo, resolution::Resolution}, structs::{candle::Candle, markets::MarketInfo, resolution::Resolution},
utils::AnyhowWrap,
worker::candle_batching::minute_candles::batch_1m_candles, worker::candle_batching::minute_candles::batch_1m_candles,
}; };
use self::higher_order_candles::batch_higher_order_candles; use self::higher_order_candles::batch_higher_order_candles;
use super::metrics::METRIC_CANDLES_TOTAL; pub async fn batch_for_market(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
pub async fn batch_for_market(
pool: &Pool,
candles_sender: &Sender<Vec<Candle>>,
market: &MarketInfo,
) -> anyhow::Result<()> {
loop { loop {
let sender = candles_sender.clone();
let market_clone = market.clone(); let market_clone = market.clone();
loop { loop {
sleep(Duration::milliseconds(2000).to_std()?).await; sleep(Duration::milliseconds(2000).to_std()?).await;
match batch_inner(pool, &sender, &market_clone).await { match batch_inner(pool, &market_clone).await {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
error!( error!(
@ -43,34 +38,29 @@ pub async fn batch_for_market(
} }
} }
async fn batch_inner( async fn batch_inner(pool: &Pool, market: &MarketInfo) -> anyhow::Result<()> {
pool: &Pool,
candles_sender: &Sender<Vec<Candle>>,
market: &MarketInfo,
) -> anyhow::Result<()> {
let market_name = &market.name.clone(); let market_name = &market.name.clone();
let candles = batch_1m_candles(pool, market).await?; let candles = batch_1m_candles(pool, market).await?;
send_candles(candles.clone(), candles_sender).await; save_candles(pool, candles).await?;
METRIC_CANDLES_TOTAL
.with_label_values(&[market.name.as_str()])
.inc_by(candles.clone().len() as u64);
for resolution in Resolution::iter() { for resolution in Resolution::iter() {
if resolution == Resolution::R1m { if resolution == Resolution::R1m {
continue; continue;
} }
let candles = batch_higher_order_candles(pool, market_name, resolution).await?; let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
send_candles(candles.clone(), candles_sender).await; save_candles(pool, candles).await?;
METRIC_CANDLES_TOTAL
.with_label_values(&[market.name.as_str()])
.inc_by(candles.clone().len() as u64);
} }
Ok(()) Ok(())
} }
async fn send_candles(candles: Vec<Candle>, candles_sender: &Sender<Vec<Candle>>) { async fn save_candles(pool: &Pool, candles: Vec<Candle>) -> anyhow::Result<()> {
if !candles.is_empty() { if candles.len() == 0 {
if let Err(_) = candles_sender.send(candles).await { return Ok(());
panic!("candles receiver dropped");
}
} }
let upsert_statement = build_candles_upsert_statement(&candles);
let client = pool.get().await.unwrap();
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
Ok(())
} }

View File

@ -1,17 +1,15 @@
use log::{error, info}; use log::{error, info};
use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEvent; use openbook_candles::structs::openbook::OpenBookFillEvent;
use openbook_candles::utils::Config; use openbook_candles::utils::Config;
use openbook_candles::worker::metrics::{ use openbook_candles::worker::metrics::{
serve_metrics, METRIC_CANDLES_QUEUE_LENGTH, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, serve_metrics, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE, METRIC_FILLS_QUEUE_LENGTH,
METRIC_FILLS_QUEUE_LENGTH,
}; };
use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::worker::trade_fetching::scrape::scrape;
use openbook_candles::{ use openbook_candles::{
database::{ database::{
initialize::{connect_to_database, setup_database}, initialize::{connect_to_database, setup_database},
insert::{persist_candles, persist_fill_events}, insert::{persist_fill_events},
}, },
worker::candle_batching::batch_for_market, worker::candle_batching::batch_for_market,
}; };
@ -34,7 +32,6 @@ async fn main() -> anyhow::Result<()> {
rpc_url: rpc_url.clone(), rpc_url: rpc_url.clone(),
}; };
let candles_queue_max_size = 10000;
let fills_queue_max_size = 10000; let fills_queue_max_size = 10000;
let markets = load_markets(path_to_markets_json); let markets = load_markets(path_to_markets_json);
@ -64,31 +61,16 @@ async fn main() -> anyhow::Result<()> {
} }
})); }));
let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(candles_queue_max_size);
for market in market_infos.into_iter() { for market in market_infos.into_iter() {
let sender = candle_sender.clone();
let batch_pool = pool.clone(); let batch_pool = pool.clone();
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
batch_for_market(&batch_pool, &sender, &market) batch_for_market(&batch_pool, &market).await.unwrap();
.await
.unwrap();
error!("batching halted for market {}", &market.name); error!("batching halted for market {}", &market.name);
})); }));
} }
let persist_pool = pool.clone();
handles.push(tokio::spawn(async move {
loop {
persist_candles(persist_pool.clone(), &mut candle_receiver)
.await
.unwrap();
}
}));
let monitor_pool = pool.clone(); let monitor_pool = pool.clone();
let monitor_fill_channel = fill_sender.clone(); let monitor_fill_channel = fill_sender.clone();
let monitor_candle_channel = candle_sender.clone();
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
// TODO: maybe break this out into a new function // TODO: maybe break this out into a new function
loop { loop {
@ -96,8 +78,6 @@ async fn main() -> anyhow::Result<()> {
METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64); METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64);
METRIC_DB_POOL_SIZE.set(pool_status.size as i64); METRIC_DB_POOL_SIZE.set(pool_status.size as i64);
METRIC_CANDLES_QUEUE_LENGTH
.set((candles_queue_max_size - monitor_candle_channel.capacity()) as i64);
METRIC_FILLS_QUEUE_LENGTH METRIC_FILLS_QUEUE_LENGTH
.set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64); .set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64);

View File

@ -36,12 +36,6 @@ lazy_static! {
METRIC_REGISTRY METRIC_REGISTRY
) )
.unwrap(); .unwrap();
pub static ref METRIC_CANDLES_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!(
"candles_queue_length",
"Current length of the candles write queue",
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec = pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec =
register_int_counter_vec_with_registry!( register_int_counter_vec_with_registry!(
"rpc_errors_total", "rpc_errors_total",