refactor: make backfilling faster

This commit is contained in:
Lou-Kamades 2023-07-26 20:25:19 -05:00
parent 34dc341059
commit 7bba73b7f3
No known key found for this signature in database
GPG Key ID: 87A166E4D7C01F30
10 changed files with 281 additions and 117 deletions

36
Cargo.lock generated
View File

@ -2615,6 +2615,15 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.5"
@ -3450,6 +3459,7 @@ dependencies = [
"dotenv",
"env_logger 0.10.0",
"futures 0.3.27",
"itertools 0.11.0",
"jsonrpc-core-client",
"lazy_static",
"log 0.4.17",
@ -4014,7 +4024,7 @@ checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e"
dependencies = [
"bytes 1.3.0",
"heck 0.4.0",
"itertools",
"itertools 0.10.5",
"lazy_static",
"log 0.4.17",
"multimap",
@ -4035,7 +4045,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bda8c0881ea9f722eb9629376db3d0b903b462477c1aafcb0566610ac28ac5d"
dependencies = [
"anyhow",
"itertools",
"itertools 0.10.5",
"proc-macro2 1.0.56",
"quote 1.0.26",
"syn 1.0.109",
@ -4877,7 +4887,7 @@ dependencies = [
"default-env",
"enumflags2",
"field-offset",
"itertools",
"itertools 0.10.5",
"num-traits",
"num_enum 0.5.7",
"safe-transmute",
@ -5252,7 +5262,7 @@ dependencies = [
"futures-util",
"indexmap",
"indicatif",
"itertools",
"itertools 0.10.5",
"jsonrpc-core",
"lazy_static",
"log 0.4.17",
@ -5416,7 +5426,7 @@ dependencies = [
"crossbeam-channel",
"flate2",
"indexmap",
"itertools",
"itertools 0.10.5",
"log 0.4.17",
"lru",
"matches",
@ -5465,7 +5475,7 @@ dependencies = [
"dashmap",
"fs_extra",
"futures 0.3.27",
"itertools",
"itertools 0.10.5",
"lazy_static",
"libc",
"log 0.4.17",
@ -5643,7 +5653,7 @@ dependencies = [
"console_log",
"curve25519-dalek",
"getrandom 0.2.8",
"itertools",
"itertools 0.10.5",
"js-sys",
"lazy_static",
"libc",
@ -5682,7 +5692,7 @@ dependencies = [
"bincode",
"eager",
"enum-iterator",
"itertools",
"itertools 0.10.5",
"libc",
"libloading",
"log 0.4.17",
@ -5739,7 +5749,7 @@ dependencies = [
"bs58 0.4.0",
"crossbeam-channel",
"dashmap",
"itertools",
"itertools 0.10.5",
"jsonrpc-core",
"jsonrpc-core-client",
"jsonrpc-derive",
@ -5801,7 +5811,7 @@ dependencies = [
"fnv",
"im",
"index_list",
"itertools",
"itertools 0.10.5",
"lazy_static",
"log 0.4.17",
"lru",
@ -5863,7 +5873,7 @@ dependencies = [
"ed25519-dalek-bip32",
"generic-array",
"hmac 0.12.1",
"itertools",
"itertools 0.10.5",
"js-sys",
"lazy_static",
"libsecp256k1",
@ -6011,7 +6021,7 @@ dependencies = [
"futures-util",
"histogram",
"indexmap",
"itertools",
"itertools 0.10.5",
"libc",
"log 0.4.17",
"nix",
@ -6143,7 +6153,7 @@ dependencies = [
"cipher 0.4.3",
"curve25519-dalek",
"getrandom 0.1.16",
"itertools",
"itertools 0.10.5",
"lazy_static",
"merlin",
"num-derive",

View File

@ -67,3 +67,4 @@ num_enum = "0.6.1"
config = "0.13.1"
prometheus = "0.13.3"
lazy_static = "1.4.0"
itertools = "0.11.0"

View File

@ -296,7 +296,7 @@ Returns 24-hour pricing and volume information on each market available.
**Request:**
`GET /api/coingecko/orderbook/?ticker_id={ticker_id}&depth={depth}`
`GET /api/coingecko/orderbook?ticker_id={ticker_id}&depth={depth}`
Returns order book information with a specified depth for a given market.

View File

@ -1,20 +1,17 @@
use deadpool_postgres::Object;
use openbook_candles::{
database::{initialize::connect_to_database, insert::build_candles_upsert_statement},
database::{initialize::connect_to_database},
structs::{
candle::Candle,
markets::{fetch_market_infos, load_markets},
resolution::Resolution,
},
utils::{AnyhowWrap, Config},
utils::{Config},
worker::candle_batching::{
higher_order_candles::backfill_batch_higher_order_candles,
minute_candles::backfill_batch_1m_candles,
higher_order_candles::backfill_batch_higher_order_candles, minute_candles::backfill_batch_1m_candles,
},
};
use std::env;
use strum::IntoEnumIterator;
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
@ -33,31 +30,19 @@ async fn main() -> anyhow::Result<()> {
println!("Backfilling candles for {:?}", markets);
let pool = connect_to_database().await?;
for market in market_infos.into_iter() {
let client = pool.get().await?;
let minute_candles = backfill_batch_1m_candles(&pool, &market).await?;
save_candles(minute_candles, client).await?;
backfill_batch_1m_candles(&pool, market_infos.clone()).await?;
for resolution in Resolution::iter() {
if resolution == Resolution::R1m {
continue;
}
let higher_order_candles =
backfill_batch_higher_order_candles(&pool, &market.name, resolution).await?;
let client = pool.get().await?;
save_candles(higher_order_candles, client).await?;
}
}
Ok(())
}
async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
if !candles.is_empty() {
let upsert_statement = build_candles_upsert_statement(&candles);
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
let mut handles = vec![];
let mi = market_infos.clone();
for market in mi.into_iter() {
let pc = pool.clone();
handles.push(tokio::spawn(async move {
backfill_batch_higher_order_candles(&pc, &market.name)
.await
.unwrap();
}));
}
futures::future::join_all(handles).await;
Ok(())
}

82
src/database/backfill.rs Normal file
View File

@ -0,0 +1,82 @@
use crate::structs::{candle::Candle, openbook::PgOpenBookFill};
use chrono::{DateTime, Utc};
use deadpool_postgres::{GenericClient, Object};
/// Fetches the single earliest maker fill across any of the given markets.
///
/// Returns `Ok(None)` when none of the markets have any recorded fills yet;
/// the backfill entry point uses this to decide whether there is work to do.
///
/// NOTE(review): only maker-side fills are considered (`maker = true`), which
/// matches the other fill queries in this module.
pub async fn fetch_earliest_fill_multiple_markets(
    conn_object: &Object,
    // `&[String]` instead of `&Vec<String>` (clippy `ptr_arg`); existing
    // call sites passing `&vec` still compile via deref coercion.
    market_address_strings: &[String],
) -> anyhow::Result<Option<PgOpenBookFill>> {
    // Column order must match `PgOpenBookFill::from_row`:
    // (time, market_key, bid, maker, price, size).
    let stmt = r#"SELECT
        block_datetime as "time",
        market as "market_key",
        bid as "bid",
        maker as "maker",
        price as "price",
        size as "size"
        from openbook.openbook_fill_events
        where market = ANY($1)
        and maker = true
        ORDER BY time asc LIMIT 1"#;

    let row = conn_object
        .query_opt(stmt, &[&market_address_strings])
        .await?;

    // `query_opt` already yields an Option<Row>; map it straight into the
    // struct rather than matching by hand.
    Ok(row.map(PgOpenBookFill::from_row))
}
/// Fetches all maker fills for the given markets inside the half-open time
/// window `[start_time, end_time)`, ordered by block time ascending.
///
/// The caller (batch backfill) groups the returned fills by `market_key`, so
/// the column order here must stay in sync with `PgOpenBookFill::from_row`:
/// (time, market_key, bid, maker, price, size).
pub async fn fetch_fills_multiple_markets_from(
    conn_object: &Object,
    // `&[String]` instead of `&Vec<String>` (clippy `ptr_arg`); existing
    // call sites passing `&vec` still compile via deref coercion.
    market_address_strings: &[String],
    start_time: DateTime<Utc>,
    end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<PgOpenBookFill>> {
    let stmt = r#"SELECT
        block_datetime as "time",
        market as "market_key",
        bid as "bid",
        maker as "maker",
        price as "price",
        size as "size"
        from openbook.openbook_fill_events
        where market = ANY($1)
        and block_datetime >= $2::timestamptz
        and block_datetime < $3::timestamptz
        and maker = true
        ORDER BY time asc"#;

    let rows = conn_object
        .query(stmt, &[&market_address_strings, &start_time, &end_time])
        .await?;

    Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect())
}
/// Returns, for every market that has 1-minute candles, the candle with the
/// latest `start_time` (i.e. the most recent '1M' candle per market).
///
/// Used by the backfill to seed empty-day candles from each market's last
/// known close price.
pub async fn fetch_last_minute_candles(conn_object: &Object) -> anyhow::Result<Vec<Candle>> {
    // Inner subquery picks max(start_time) per market for resolution '1M';
    // the join then pulls the full candle row for that timestamp.
    let stmt = r#"SELECT
        c.market_name as "market_name",
        c.start_time as "start_time",
        c.end_time as "end_time",
        c.resolution as "resolution",
        c.open as "open",
        c.close as "close",
        c.high as "high",
        c.low as "low",
        c.volume as "volume",
        c.complete as "complete"
        from
        (
        select market_name, max(start_time) as max_start_time from openbook.candles
        where resolution = '1M'
        group by market_name
        ) mkts
        left join openbook.candles c
        on mkts.market_name = c.market_name
        and mkts.max_start_time = c.start_time
        where c.resolution ='1M'"#;

    let rows = conn_object.query(stmt, &[]).await?;

    let mut candles = Vec::with_capacity(rows.len());
    for row in rows {
        candles.push(Candle::from_row(row));
    }
    Ok(candles)
}

View File

@ -16,6 +16,7 @@ pub async fn fetch_earliest_fill(
let stmt = r#"SELECT
block_datetime as "time",
market as "market_key",
bid as "bid",
maker as "maker",
price as "price",
@ -43,6 +44,7 @@ pub async fn fetch_fills_from(
let stmt = r#"SELECT
block_datetime as "time",
market as "market_key",
bid as "bid",
maker as "maker",
price as "price",
@ -94,7 +96,7 @@ pub async fn fetch_latest_finished_candle(
}
}
/// Fetches all of the candles for the given market and resoultion, starting from the earliest.
/// Fetches all of the candles for the given market and resolution, starting from the earliest.
/// Note that this function will fetch at most 2000 candles.
pub async fn fetch_earliest_candles(
pool: &Pool,

View File

@ -1,3 +1,4 @@
pub mod backfill;
pub mod fetch;
pub mod initialize;
pub mod insert;

View File

@ -3,9 +3,10 @@ use chrono::{DateTime, Utc};
use num_traits::Pow;
use tokio_postgres::Row;
#[derive(Copy, Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
pub struct PgOpenBookFill {
pub time: DateTime<Utc>,
pub market_key: String,
pub bid: bool,
pub maker: bool,
pub price: f64,
@ -15,10 +16,11 @@ impl PgOpenBookFill {
pub fn from_row(row: Row) -> Self {
PgOpenBookFill {
time: row.get(0),
bid: row.get(1),
maker: row.get(2),
price: row.get(3),
size: row.get(4),
market_key: row.get(1),
bid: row.get(2),
maker: row.get(3),
price: row.get(4),
size: row.get(5),
}
}
}

View File

@ -2,14 +2,21 @@ use chrono::{DateTime, Duration, DurationRound, Utc};
use deadpool_postgres::Pool;
use log::debug;
use std::cmp::{max, min};
use strum::IntoEnumIterator;
use crate::{
database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle},
database::{
fetch::{
fetch_candles_from, fetch_earliest_candles,
fetch_latest_finished_candle,
},
insert::build_candles_upsert_statement,
},
structs::{
candle::Candle,
resolution::{day, Resolution},
},
utils::{f64_max, f64_min},
utils::{f64_max, f64_min, AnyhowWrap},
};
pub async fn batch_higher_order_candles(
@ -34,12 +41,8 @@ pub async fn batch_higher_order_candles(
if constituent_candles.is_empty() {
return Ok(Vec::new());
}
let combined_candles = combine_into_higher_order_candles(
&mut constituent_candles,
resolution,
start_time,
candle,
);
let combined_candles =
combine_into_higher_order_candles(&mut constituent_candles, resolution, start_time);
Ok(combined_candles)
}
None => {
@ -61,13 +64,8 @@ pub async fn batch_higher_order_candles(
return Ok(Vec::new());
}
let seed_candle = constituent_candles[0].clone();
let combined_candles = combine_into_higher_order_candles(
&mut constituent_candles,
resolution,
start_time,
seed_candle,
);
let combined_candles =
combine_into_higher_order_candles(&mut constituent_candles, resolution, start_time);
Ok(trim_candles(
combined_candles,
@ -78,10 +76,9 @@ pub async fn batch_higher_order_candles(
}
fn combine_into_higher_order_candles(
constituent_candles: &mut Vec<Candle>,
constituent_candles: &Vec<Candle>,
target_resolution: Resolution,
st: DateTime<Utc>,
seed_candle: Candle,
) -> Vec<Candle> {
debug!("combining for target_resolution: {}", target_resolution);
@ -100,17 +97,16 @@ fn combine_into_higher_order_candles(
let mut combined_candles = vec![empty_candle; num_candles];
let mut con_iter = constituent_candles.iter_mut().peekable();
let mut last_close = constituent_candles[0].close;
let mut con_iter = constituent_candles.iter().peekable();
let mut start_time = st;
let mut end_time = start_time + duration;
let mut last_candle = seed_candle;
for i in 0..combined_candles.len() {
combined_candles[i].open = last_candle.close;
combined_candles[i].low = last_candle.close;
combined_candles[i].close = last_candle.close;
combined_candles[i].high = last_candle.close;
combined_candles[i].open = last_close;
combined_candles[i].low = last_close;
combined_candles[i].close = last_close;
combined_candles[i].high = last_close;
while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) {
let unit_candle = con_iter.next().unwrap();
@ -128,7 +124,7 @@ fn combine_into_higher_order_candles(
start_time = end_time;
end_time += duration;
last_candle = combined_candles[i].clone();
last_close = combined_candles[i].close;
}
combined_candles
@ -149,25 +145,38 @@ fn trim_candles(mut c: Vec<Candle>, start_time: DateTime<Utc>) -> Vec<Candle> {
pub async fn backfill_batch_higher_order_candles(
pool: &Pool,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Vec<Candle>> {
let mut constituent_candles =
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()).await?;
if constituent_candles.is_empty() {
return Ok(vec![]);
) -> anyhow::Result<()> {
let earliest_candles = fetch_earliest_candles(pool, market_name, Resolution::R1m).await?;
let mut start_time = earliest_candles[0].start_time.duration_trunc(day())?;
while start_time < Utc::now() {
let mut candles = vec![];
let mut constituent_candles = fetch_candles_from(
pool,
market_name,
Resolution::R1m,
start_time,
start_time + day(),
)
.await?;
for resolution in Resolution::iter() {
if resolution == Resolution::R1m {
continue;
}
let mut combined_candles =
combine_into_higher_order_candles(&mut constituent_candles, resolution, start_time);
candles.append(&mut combined_candles);
}
let upsert_statement = build_candles_upsert_statement(&candles);
let client = pool.get().await.unwrap();
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
// println!("{:?} {:?} done", market_name, start_time);
start_time += day();
}
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
let seed_candle = constituent_candles[0].clone();
let combined_candles = combine_into_higher_order_candles(
&mut constituent_candles,
resolution,
start_time,
seed_candle,
);
Ok(trim_candles(
combined_candles,
constituent_candles[0].start_time,
))
Ok(())
}

View File

@ -1,18 +1,26 @@
use std::cmp::min;
use std::{cmp::min, collections::HashMap};
use chrono::{DateTime, Duration, DurationRound, Utc};
use deadpool_postgres::Pool;
use itertools::Itertools;
use log::debug;
use crate::database::backfill::{
fetch_earliest_fill_multiple_markets, fetch_fills_multiple_markets_from,
fetch_last_minute_candles,
};
use crate::{
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
database::{
fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
insert::build_candles_upsert_statement,
},
structs::{
candle::Candle,
candle::{Candle},
markets::MarketInfo,
openbook::PgOpenBookFill,
resolution::{day, Resolution},
},
utils::{f64_max, f64_min},
utils::{f64_max, f64_min, AnyhowWrap},
};
pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Result<Vec<Candle>> {
@ -28,9 +36,6 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
(Utc::now() + Duration::minutes(1)).duration_trunc(Duration::minutes(1))?,
);
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
if fills.is_empty() {
return Ok(Vec::new());
}
let candles = combine_fills_into_1m_candles(
&mut fills,
@ -124,17 +129,19 @@ fn combine_fills_into_1m_candles(
/// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end.
pub async fn backfill_batch_1m_candles(
pool: &Pool,
market: &MarketInfo,
) -> anyhow::Result<Vec<Candle>> {
let market_name = &market.name;
let market_address = &market.address;
let mut candles = vec![];
markets: Vec<MarketInfo>,
) -> anyhow::Result<()> {
let market_address_strings: Vec<String> = markets.iter().map(|m| m.address.clone()).collect();
let mut candle_container = HashMap::new();
let client = pool.get().await?;
let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;
let earliest_fill =
fetch_earliest_fill_multiple_markets(&client, &market_address_strings).await?;
if earliest_fill.is_none() {
debug!("No fills found for: {:?}", &market_name);
return Ok(candles);
println!("No fills found for backfill");
return Ok(());
}
println!("Found earliest fill for backfill");
let mut start_time = earliest_fill
.unwrap()
@ -145,13 +152,78 @@ pub async fn backfill_batch_1m_candles(
start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?,
);
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
if !fills.is_empty() {
let mut minute_candles =
let last_candles = fetch_last_minute_candles(&client).await?;
let all_fills = fetch_fills_multiple_markets_from(
&client,
&market_address_strings,
start_time,
end_time,
)
.await?;
// println!("{:?} {:?}", start_time, end_time);
// println!("all fills len : {:?}", all_fills.len());
println!("{:?}", all_fills[0]);
// println!("{:?}", all_fills[1]);
// println!("Fetched multiple fills for backfill");
let fills_groups = all_fills
.into_iter()
.sorted_by(|a, b| Ord::cmp(&a.market_key, &b.market_key))
.group_by(|f| f.market_key.clone());
let fills_by_market: Vec<(String, Vec<PgOpenBookFill>)> = fills_groups
.into_iter()
.map(|(m, group)| (m, group.collect()))
.collect();
println!("fbm len : {:?}", fills_by_market.len());
// sort fills by market, make candles
for (_, mut fills) in fills_by_market {
let market = markets
.iter()
.find(|m| m.address == fills[0].market_key)
.unwrap();
let minute_candles =
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
candles.append(&mut minute_candles);
candle_container.insert(&market.address, minute_candles);
}
start_time += day()
// where no candles, make empty ones
for (k, v) in candle_container.iter_mut() {
if v.is_empty() {
let market = markets.iter().find(|m| &m.address == *k).unwrap();
let last_candle = last_candles
.iter()
.find(|c| c.market_name == market.name)
.unwrap();
let empty_candles = combine_fills_into_1m_candles(
&mut vec![],
market,
start_time,
end_time,
Some(last_candle.close),
);
*v = empty_candles;
}
}
// insert candles in batches
for candles in candle_container.values() {
let candle_chunks: Vec<Vec<Candle>> =
candles.chunks(1500).map(|chunk| chunk.to_vec()).collect(); // 1440 minutes in a day
for c in candle_chunks {
let upsert_statement = build_candles_upsert_statement(&c);
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()?;
}
}
// reset entries but keep markets we've seen for blank candles
for (_, v) in candle_container.iter_mut() {
*v = vec![];
}
println!("day done");
start_time += day();
}
Ok(candles)
Ok(())
}