From 7bba73b7f3e0402acc4b6d062241644a8aff74f7 Mon Sep 17 00:00:00 2001 From: Lou-Kamades Date: Wed, 26 Jul 2023 20:25:19 -0500 Subject: [PATCH] refactor: make backfilling faster --- Cargo.lock | 36 ++++-- Cargo.toml | 1 + README.md | 2 +- src/backfill-candles/main.rs | 49 +++----- src/database/backfill.rs | 82 +++++++++++++ src/database/fetch.rs | 4 +- src/database/mod.rs | 1 + src/structs/openbook.rs | 12 +- .../candle_batching/higher_order_candles.rs | 97 ++++++++------- src/worker/candle_batching/minute_candles.rs | 114 ++++++++++++++---- 10 files changed, 281 insertions(+), 117 deletions(-) create mode 100644 src/database/backfill.rs diff --git a/Cargo.lock b/Cargo.lock index b17fae6..e9cbfe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2615,6 +2615,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.5" @@ -3450,6 +3459,7 @@ dependencies = [ "dotenv", "env_logger 0.10.0", "futures 0.3.27", + "itertools 0.11.0", "jsonrpc-core-client", "lazy_static", "log 0.4.17", @@ -4014,7 +4024,7 @@ checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e" dependencies = [ "bytes 1.3.0", "heck 0.4.0", - "itertools", + "itertools 0.10.5", "lazy_static", "log 0.4.17", "multimap", @@ -4035,7 +4045,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bda8c0881ea9f722eb9629376db3d0b903b462477c1aafcb0566610ac28ac5d" dependencies = [ "anyhow", - "itertools", + "itertools 0.10.5", "proc-macro2 1.0.56", "quote 1.0.26", "syn 1.0.109", @@ -4877,7 +4887,7 @@ dependencies = [ "default-env", "enumflags2", "field-offset", - "itertools", + "itertools 0.10.5", "num-traits", "num_enum 0.5.7", "safe-transmute", @@ -5252,7 +5262,7 @@ dependencies = [ "futures-util", "indexmap", "indicatif", - "itertools", + "itertools 0.10.5", "jsonrpc-core", "lazy_static", "log 0.4.17", @@ -5416,7 +5426,7 @@ dependencies = [ "crossbeam-channel", "flate2", "indexmap", - "itertools", + "itertools 0.10.5", "log 0.4.17", "lru", "matches", @@ -5465,7 +5475,7 @@ dependencies = [ "dashmap", "fs_extra", "futures 0.3.27", - "itertools", + "itertools 0.10.5", "lazy_static", "libc", "log 0.4.17", @@ -5643,7 +5653,7 @@ dependencies = [ "console_log", "curve25519-dalek", "getrandom 0.2.8", - "itertools", + "itertools 0.10.5", "js-sys", "lazy_static", "libc", @@ -5682,7 +5692,7 @@ dependencies = [ "bincode", "eager", "enum-iterator", - "itertools", + "itertools 0.10.5", "libc", "libloading", "log 0.4.17", @@ -5739,7 +5749,7 @@ dependencies = [ "bs58 0.4.0", "crossbeam-channel", "dashmap", - "itertools", + "itertools 0.10.5", "jsonrpc-core", "jsonrpc-core-client", "jsonrpc-derive", @@ -5801,7 +5811,7 @@ dependencies = [ "fnv", "im", "index_list", - "itertools", + "itertools 0.10.5", "lazy_static", "log 0.4.17", "lru", @@ -5863,7 +5873,7 @@ dependencies = [ "ed25519-dalek-bip32", "generic-array", "hmac 0.12.1", - "itertools", + "itertools 0.10.5", "js-sys", "lazy_static", "libsecp256k1", @@ -6011,7 +6021,7 @@ dependencies = [ "futures-util", "histogram", "indexmap", - "itertools", + "itertools 0.10.5", "libc", "log 0.4.17", "nix", @@ -6143,7 +6153,7 @@ dependencies = [ "cipher 0.4.3", "curve25519-dalek", "getrandom 0.1.16", - "itertools", + "itertools 0.10.5", "lazy_static", "merlin", "num-derive", diff --git a/Cargo.toml b/Cargo.toml index 
6604464..62a810d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -67,3 +67,4 @@ num_enum = "0.6.1"
 config = "0.13.1"
 prometheus = "0.13.3"
 lazy_static = "1.4.0"
+itertools = "0.11.0"
diff --git a/README.md b/README.md
index 1138778..c9eea59 100644
--- a/README.md
+++ b/README.md
@@ -296,7 +296,7 @@ Returns 24-hour pricing and volume information on each market available.
 
 **Request:**
 
-`GET /api/coingecko/orderbook/?ticker_id={ticker_id}&depth={depth}`
+`GET /api/coingecko/orderbook?ticker_id={ticker_id}&depth={depth}`
 
 Returns order book information with a specified depth for a given market.
 
diff --git a/src/backfill-candles/main.rs b/src/backfill-candles/main.rs
index 9901fe4..e26550a 100644
--- a/src/backfill-candles/main.rs
+++ b/src/backfill-candles/main.rs
@@ -1,20 +1,17 @@
-use deadpool_postgres::Object;
+
 use openbook_candles::{
-    database::{initialize::connect_to_database, insert::build_candles_upsert_statement},
+    database::{initialize::connect_to_database},
     structs::{
-        candle::Candle,
         markets::{fetch_market_infos, load_markets},
-        resolution::Resolution,
     },
-    utils::{AnyhowWrap, Config},
+    utils::{Config},
     worker::candle_batching::{
-        higher_order_candles::backfill_batch_higher_order_candles,
-        minute_candles::backfill_batch_1m_candles,
+        higher_order_candles::backfill_batch_higher_order_candles, minute_candles::backfill_batch_1m_candles,
     },
 };
 use std::env;
-use strum::IntoEnumIterator;
+
 
 #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
 async fn main() -> anyhow::Result<()> {
@@ -33,31 +30,19 @@ async fn main() -> anyhow::Result<()> {
     println!("Backfilling candles for {:?}", markets);
     let pool = connect_to_database().await?;
 
-    for market in market_infos.into_iter() {
-        let client = pool.get().await?;
-        let minute_candles = backfill_batch_1m_candles(&pool, &market).await?;
-        save_candles(minute_candles, client).await?;
+    backfill_batch_1m_candles(&pool, market_infos.clone()).await?;
 
-        for resolution in Resolution::iter() {
-            if resolution == Resolution::R1m {
-                continue;
-            }
-            let higher_order_candles =
-                backfill_batch_higher_order_candles(&pool, &market.name, resolution).await?;
-            let client = pool.get().await?;
-            save_candles(higher_order_candles, client).await?;
-        }
-    }
-    Ok(())
-}
-
-async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
-    if !candles.is_empty() {
-        let upsert_statement = build_candles_upsert_statement(&candles);
-        client
-            .execute(&upsert_statement, &[])
-            .await
-            .map_err_anyhow()?;
+    let mut handles = vec![];
+    let mi = market_infos.clone();
+    for market in mi.into_iter() {
+        let pc = pool.clone();
+        handles.push(tokio::spawn(async move {
+            backfill_batch_higher_order_candles(&pc, &market.name)
+                .await
+                .unwrap();
+        }));
     }
+
+    futures::future::join_all(handles).await;
     Ok(())
 }
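
Note on the fan-out above: each spawned task `unwrap()`s its own result, so a failing market only panics its task, and `futures::future::join_all` quietly discards the resulting `JoinError`s. If one market's failure should abort the whole backfill, the join results need to be checked. A minimal sketch of that variant (illustrative only; `do_backfill` is a hypothetical stand-in for the real per-market call):

    use anyhow::Result;

    async fn do_backfill(market_name: String) -> Result<()> {
        // per-market work would go here
        Ok(())
    }

    async fn run_all(markets: Vec<String>) -> Result<()> {
        let handles: Vec<_> = markets
            .into_iter()
            .map(|m| tokio::spawn(do_backfill(m)))
            .collect();
        for handle in handles {
            // first `?` surfaces task panics (JoinError), second `?` the task's own error
            handle.await??;
        }
        Ok(())
    }
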
diff --git a/src/database/backfill.rs b/src/database/backfill.rs
new file mode 100644
index 0000000..c150377
--- /dev/null
+++ b/src/database/backfill.rs
@@ -0,0 +1,82 @@
+use crate::structs::{candle::Candle, openbook::PgOpenBookFill};
+use chrono::{DateTime, Utc};
+use deadpool_postgres::{GenericClient, Object};
+
+pub async fn fetch_earliest_fill_multiple_markets(
+    conn_object: &Object,
+    market_address_strings: &Vec<String>,
+) -> anyhow::Result<Option<PgOpenBookFill>> {
+    let stmt = r#"SELECT
+         block_datetime as "time",
+         market as "market_key",
+         bid as "bid",
+         maker as "maker",
+         price as "price",
+         size as "size"
+        from openbook.openbook_fill_events
+        where market = ANY($1)
+        and maker = true
+        ORDER BY time asc LIMIT 1"#;
+
+    let row = conn_object
+        .query_opt(stmt, &[&market_address_strings])
+        .await?;
+
+    match row {
+        Some(r) => Ok(Some(PgOpenBookFill::from_row(r))),
+        None => Ok(None),
+    }
+}
+
+pub async fn fetch_fills_multiple_markets_from(
+    conn_object: &Object,
+    market_address_strings: &Vec<String>,
+    start_time: DateTime<Utc>,
+    end_time: DateTime<Utc>,
+) -> anyhow::Result<Vec<PgOpenBookFill>> {
+    let stmt = r#"SELECT
+         block_datetime as "time",
+         market as "market_key",
+         bid as "bid",
+         maker as "maker",
+         price as "price",
+         size as "size"
+        from openbook.openbook_fill_events
+        where market = ANY($1)
+        and block_datetime >= $2::timestamptz
+        and block_datetime < $3::timestamptz
+        and maker = true
+        ORDER BY time asc"#;
+
+    let rows = conn_object
+        .query(stmt, &[&market_address_strings, &start_time, &end_time])
+        .await?;
+    Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect())
+}
+
+pub async fn fetch_last_minute_candles(conn_object: &Object) -> anyhow::Result<Vec<Candle>> {
+    let stmt = r#"SELECT
+        c.market_name as "market_name",
+        c.start_time as "start_time",
+        c.end_time as "end_time",
+        c.resolution as "resolution",
+        c.open as "open",
+        c.close as "close",
+        c.high as "high",
+        c.low as "low",
+        c.volume as "volume",
+        c.complete as "complete"
+        from
+        (
+            select market_name, max(start_time) as max_start_time from openbook.candles
+            where resolution = '1M'
+            group by market_name
+        ) mkts
+        left join openbook.candles c
+        on mkts.market_name = c.market_name
+        and mkts.max_start_time = c.start_time
+        where c.resolution = '1M'"#;
+
+    let rows = conn_object.query(stmt, &[]).await?;
+    Ok(rows.into_iter().map(Candle::from_row).collect())
+}
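
These helpers take a single checked-out connection (`Object`) rather than the pool, and filter with `market = ANY($1)`: tokio-postgres binds a `&Vec<String>` as a Postgres `text[]`, so one round trip covers every market at once. A minimal usage sketch (assumes a configured deadpool-postgres pool; the market address is a placeholder, not a real market):

    use deadpool_postgres::Pool;
    use openbook_candles::database::backfill::fetch_earliest_fill_multiple_markets;

    async fn example(pool: &Pool) -> anyhow::Result<()> {
        let market_addresses = vec!["MarketAddress111111111111111111111111111111".to_string()];
        let client = pool.get().await?;
        if let Some(fill) =
            fetch_earliest_fill_multiple_markets(&client, &market_addresses).await?
        {
            println!("earliest fill across all markets: {:?}", fill.time);
        }
        Ok(())
    }
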
diff --git a/src/database/fetch.rs b/src/database/fetch.rs
index dfa71d4..bdb257b 100644
--- a/src/database/fetch.rs
+++ b/src/database/fetch.rs
@@ -16,6 +16,7 @@ pub async fn fetch_earliest_fill(
     let stmt = r#"SELECT
          block_datetime as "time",
+         market as "market_key",
          bid as "bid",
          maker as "maker",
          price as "price",
@@ -43,6 +44,7 @@ pub async fn fetch_fills_from(
     let stmt = r#"SELECT
          block_datetime as "time",
+         market as "market_key",
          bid as "bid",
          maker as "maker",
          price as "price",
@@ -94,7 +96,7 @@ pub async fn fetch_latest_finished_candle(
     }
 }
 
-/// Fetches all of the candles for the given market and resoultion, starting from the earliest.
+/// Fetches all of the candles for the given market and resolution, starting from the earliest.
 /// Note that this function will fetch at most 2000 candles.
 pub async fn fetch_earliest_candles(
     pool: &Pool,
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 1d33d3a..09df44d 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -1,3 +1,4 @@
+pub mod backfill;
 pub mod fetch;
 pub mod initialize;
 pub mod insert;
diff --git a/src/structs/openbook.rs b/src/structs/openbook.rs
index 9fdf699..4d02a6b 100644
--- a/src/structs/openbook.rs
+++ b/src/structs/openbook.rs
@@ -3,9 +3,10 @@ use chrono::{DateTime, Utc};
 use num_traits::Pow;
 use tokio_postgres::Row;
 
-#[derive(Copy, Clone, Debug, PartialEq)]
+#[derive(Clone, Debug, PartialEq)]
 pub struct PgOpenBookFill {
     pub time: DateTime<Utc>,
+    pub market_key: String,
     pub bid: bool,
     pub maker: bool,
     pub price: f64,
@@ -15,10 +16,11 @@ impl PgOpenBookFill {
     pub fn from_row(row: Row) -> Self {
         PgOpenBookFill {
             time: row.get(0),
-            bid: row.get(1),
-            maker: row.get(2),
-            price: row.get(3),
-            size: row.get(4),
+            market_key: row.get(1),
+            bid: row.get(2),
+            maker: row.get(3),
+            price: row.get(4),
+            size: row.get(5),
         }
     }
 }
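
`from_row` maps columns by position, so every SELECT that feeds `PgOpenBookFill::from_row` must keep the order time, market_key, bid, maker, price, size; the queries in fetch.rs and backfill.rs above all add `market as "market_key"` at index 1 for exactly this reason. A name-based lookup would tolerate reordering; a hypothetical sketch (not what this patch does):

    use tokio_postgres::Row;

    // Hypothetical: look a field up by name instead of position.
    fn market_key_by_name(row: &Row) -> Result<String, tokio_postgres::Error> {
        row.try_get("market_key")
    }
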
-        combined_candles[i].close = last_candle.close;
-        combined_candles[i].high = last_candle.close;
+        combined_candles[i].open = last_close;
+        combined_candles[i].low = last_close;
+        combined_candles[i].close = last_close;
+        combined_candles[i].high = last_close;
 
         while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) {
             let unit_candle = con_iter.next().unwrap();
@@ -128,7 +124,7 @@ fn combine_into_higher_order_candles(
         start_time = end_time;
         end_time += duration;
 
-        last_candle = combined_candles[i].clone();
+        last_close = combined_candles[i].close;
     }
 
     combined_candles
@@ -149,25 +145,45 @@ fn trim_candles(mut c: Vec<Candle>, start_time: DateTime<Utc>) -> Vec<Candle> {
 pub async fn backfill_batch_higher_order_candles(
     pool: &Pool,
     market_name: &str,
-    resolution: Resolution,
-) -> anyhow::Result<Vec<Candle>> {
-    let mut constituent_candles =
-        fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()).await?;
-    if constituent_candles.is_empty() {
-        return Ok(vec![]);
+) -> anyhow::Result<()> {
+    let earliest_candles = fetch_earliest_candles(pool, market_name, Resolution::R1m).await?;
+    if earliest_candles.is_empty() {
+        return Ok(());
+    }
+    let mut start_time = earliest_candles[0].start_time.duration_trunc(day())?;
+    while start_time < Utc::now() {
+        let mut candles = vec![];
+        let constituent_candles = fetch_candles_from(
+            pool,
+            market_name,
+            Resolution::R1m,
+            start_time,
+            start_time + day(),
+        )
+        .await?;
+        // a day without any 1m candles would otherwise panic below on
+        // constituent_candles[0]
+        if constituent_candles.is_empty() {
+            start_time += day();
+            continue;
+        }
+
+        for resolution in Resolution::iter() {
+            if resolution == Resolution::R1m {
+                continue;
+            }
+            let mut combined_candles =
+                combine_into_higher_order_candles(&constituent_candles, resolution, start_time);
+            candles.append(&mut combined_candles);
+        }
+
+        let upsert_statement = build_candles_upsert_statement(&candles);
+        let client = pool.get().await?;
+        client
+            .execute(&upsert_statement, &[])
+            .await
+            .map_err_anyhow()?;
+        start_time += day();
     }
-    let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
-    let seed_candle = constituent_candles[0].clone();
-    let combined_candles = combine_into_higher_order_candles(
-        &mut constituent_candles,
-        resolution,
-        start_time,
-        seed_candle,
-    );
-
-    Ok(trim_candles(
-        combined_candles,
-        constituent_candles[0].start_time,
-    ))
+    Ok(())
 }
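
The rewritten backfill walks one UTC day at a time: truncate the earliest 1m candle's start to midnight, combine that day's 1m candles into every higher resolution, upsert, and step forward. (One behavioral side effect of dropping the seed candle: in the batch path the first combined candle now opens at the first constituent's close rather than the previous period's close.) The window arithmetic itself is easy to check in isolation; a self-contained sketch, chrono only, assuming `day()` is equivalent to `Duration::days(1)`:

    use chrono::{DateTime, Duration, DurationRound, TimeZone, Utc};

    fn day_windows(first: DateTime<Utc>, now: DateTime<Utc>) -> Vec<(DateTime<Utc>, DateTime<Utc>)> {
        // truncate the first timestamp to midnight, then step in whole days
        let mut start = first.duration_trunc(Duration::days(1)).unwrap();
        let mut windows = vec![];
        while start < now {
            windows.push((start, start + Duration::days(1)));
            start += Duration::days(1);
        }
        windows
    }

    fn main() {
        let first = Utc.with_ymd_and_hms(2023, 7, 1, 13, 45, 0).unwrap();
        let now = Utc.with_ymd_and_hms(2023, 7, 4, 0, 0, 0).unwrap();
        // three whole-day windows: Jul 1, Jul 2, Jul 3
        assert_eq!(day_windows(first, now).len(), 3);
    }
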
diff --git a/src/worker/candle_batching/minute_candles.rs b/src/worker/candle_batching/minute_candles.rs
index c129d9f..b092221 100644
--- a/src/worker/candle_batching/minute_candles.rs
+++ b/src/worker/candle_batching/minute_candles.rs
@@ -1,18 +1,26 @@
-use std::cmp::min;
+use std::{cmp::min, collections::HashMap};
 
 use chrono::{DateTime, Duration, DurationRound, Utc};
 use deadpool_postgres::Pool;
+use itertools::Itertools;
 use log::debug;
 
+use crate::database::backfill::{
+    fetch_earliest_fill_multiple_markets, fetch_fills_multiple_markets_from,
+    fetch_last_minute_candles,
+};
 use crate::{
-    database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
+    database::{
+        fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
+        insert::build_candles_upsert_statement,
+    },
     structs::{
-        candle::Candle,
+        candle::{Candle},
         markets::MarketInfo,
         openbook::PgOpenBookFill,
         resolution::{day, Resolution},
     },
-    utils::{f64_max, f64_min},
+    utils::{f64_max, f64_min, AnyhowWrap},
 };
 
 pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Result<Vec<Candle>> {
@@ -28,9 +36,6 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Result<Vec<Candle>> {
             (Utc::now() + Duration::minutes(1)).duration_trunc(Duration::minutes(1))?,
         );
         let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
-        if fills.is_empty() {
-            return Ok(Vec::new());
-        }
 
         let candles = combine_fills_into_1m_candles(
             &mut fills,
@@ -124,17 +129,19 @@ fn combine_fills_into_1m_candles(
 /// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end.
 pub async fn backfill_batch_1m_candles(
     pool: &Pool,
-    market: &MarketInfo,
-) -> anyhow::Result<Vec<Candle>> {
-    let market_name = &market.name;
-    let market_address = &market.address;
-    let mut candles = vec![];
+    markets: Vec<MarketInfo>,
+) -> anyhow::Result<()> {
+    let market_address_strings: Vec<String> = markets.iter().map(|m| m.address.clone()).collect();
+    let mut candle_container = HashMap::new();
+    let client = pool.get().await?;
 
-    let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;
+    let earliest_fill =
+        fetch_earliest_fill_multiple_markets(&client, &market_address_strings).await?;
     if earliest_fill.is_none() {
-        debug!("No fills found for: {:?}", &market_name);
-        return Ok(candles);
+        println!("No fills found for backfill");
+        return Ok(());
     }
+    println!("Found earliest fill for backfill");
 
     let mut start_time = earliest_fill
         .unwrap()
@@ -145,13 +152,73 @@ pub async fn backfill_batch_1m_candles(
             start_time + day(),
             Utc::now().duration_trunc(Duration::minutes(1))?,
         );
-        let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
-        if !fills.is_empty() {
-            let mut minute_candles =
+        let last_candles = fetch_last_minute_candles(&client).await?;
+        let all_fills = fetch_fills_multiple_markets_from(
+            &client,
+            &market_address_strings,
+            start_time,
+            end_time,
+        )
+        .await?;
+
+        let fills_groups = all_fills
+            .into_iter()
+            .sorted_by(|a, b| Ord::cmp(&a.market_key, &b.market_key))
+            .group_by(|f| f.market_key.clone());
+
+        let fills_by_market: Vec<(String, Vec<PgOpenBookFill>)> = fills_groups
+            .into_iter()
+            .map(|(m, group)| (m, group.collect()))
+            .collect();
+
+        println!("{} markets have fills in this window", fills_by_market.len());
+        // make 1m candles for every market that had fills
+        for (_, mut fills) in fills_by_market {
+            let market = markets
+                .iter()
+                .find(|m| m.address == fills[0].market_key)
+                .unwrap();
+            let minute_candles =
                 combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
-            candles.append(&mut minute_candles);
+            candle_container.insert(&market.address, minute_candles);
         }
-        start_time += day()
+
+        // markets with no fills in this window still get empty candles,
+        // carried forward from their last stored close
+        for (k, v) in candle_container.iter_mut() {
+            if v.is_empty() {
+                let market = markets.iter().find(|m| &m.address == *k).unwrap();
+                let last_candle = last_candles
+                    .iter()
+                    .find(|c| c.market_name == market.name)
+                    .unwrap();
+                let empty_candles = combine_fills_into_1m_candles(
+                    &mut vec![],
+                    market,
+                    start_time,
+                    end_time,
+                    Some(last_candle.close),
+                );
+                *v = empty_candles;
+            }
+        }
+
+        // insert candles in batches
+        for candles in candle_container.values() {
+            let candle_chunks: Vec<Vec<Candle>> =
+                candles.chunks(1500).map(|chunk| chunk.to_vec()).collect(); // 1440 minutes in a day
+            for c in candle_chunks {
+                let upsert_statement = build_candles_upsert_statement(&c);
+                client
+                    .execute(&upsert_statement, &[])
+                    .await
+                    .map_err_anyhow()?;
+            }
+        }
+        // reset entries but keep the markets we've seen, so later gap days
+        // still get blank candles
+        for (_, v) in candle_container.iter_mut() {
+            *v = vec![];
+        }
+        println!("day starting {} done", start_time);
+        start_time += day();
     }
-    Ok(candles)
+    Ok(())
 }
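
Two details worth calling out in the loop above. First, `build_candles_upsert_statement` inlines its values (note the empty parameter list passed to `execute`), so the 1500-row chunking mainly bounds statement size while guaranteeing a full day of 1m candles (1440) fits in one statement. Second, the reset-but-keep-keys step is what lets a later day with zero fills still produce empty candles: the map retains every market seen so far, and the empty-candle pass fills those entries from the last stored close. A toy sketch of that pattern:

    use std::collections::HashMap;

    fn main() {
        let mut container: HashMap<&str, Vec<u32>> = HashMap::new();
        container.insert("market_a", vec![1, 2, 3]); // day 1: had fills

        // end of day: clear values but keep the keys
        for v in container.values_mut() {
            *v = vec![];
        }

        // day 2: no fills for market_a, but the key is still present, so
        // the empty-candle pass knows to carry its last close forward
        assert!(container["market_a"].is_empty());
    }
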