feat: split backfills into trades and candles
This commit is contained in:
parent
8bbf89677c
commit
7ae8339ebb
|
@ -16,8 +16,12 @@ name = "server"
|
||||||
path = "src/server/main.rs"
|
path = "src/server/main.rs"
|
||||||
|
|
||||||
[[bin]]
|
[[bin]]
|
||||||
name = "backfill"
|
name = "backfill-trades"
|
||||||
path = "src/backfill/main.rs"
|
path = "src/backfill-trades/main.rs"
|
||||||
|
|
||||||
|
[[bin]]
|
||||||
|
name = "backfill-candles"
|
||||||
|
path = "src/backfill-candles/main.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
|
|
@ -0,0 +1,70 @@
|
||||||
|
use anchor_lang::prelude::Pubkey;
|
||||||
|
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
||||||
|
use deadpool_postgres::Object;
|
||||||
|
use futures::future::join_all;
|
||||||
|
use openbook_candles::{
|
||||||
|
database::{
|
||||||
|
initialize::connect_to_database,
|
||||||
|
insert::{build_candles_upsert_statement, persist_candles},
|
||||||
|
},
|
||||||
|
structs::{
|
||||||
|
candle::Candle,
|
||||||
|
markets::{fetch_market_infos, load_markets},
|
||||||
|
openbook::OpenBookFillEvent,
|
||||||
|
resolution::Resolution,
|
||||||
|
},
|
||||||
|
utils::{AnyhowWrap, Config},
|
||||||
|
worker::candle_batching::{
|
||||||
|
higher_order_candles::backfill_batch_higher_order_candles,
|
||||||
|
minute_candles::backfill_batch_1m_candles,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use std::{collections::HashMap, env, str::FromStr};
|
||||||
|
use strum::IntoEnumIterator;
|
||||||
|
use tokio::sync::mpsc::{self, Sender};
|
||||||
|
|
||||||
|
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
dotenv::dotenv().ok();
|
||||||
|
let args: Vec<String> = env::args().collect();
|
||||||
|
assert!(args.len() == 2);
|
||||||
|
|
||||||
|
let path_to_markets_json = &args[1];
|
||||||
|
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
|
||||||
|
|
||||||
|
let config = Config {
|
||||||
|
rpc_url: rpc_url.clone(),
|
||||||
|
};
|
||||||
|
let markets = load_markets(&path_to_markets_json);
|
||||||
|
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
||||||
|
println!("Backfilling candles for {:?}", markets);
|
||||||
|
|
||||||
|
let pool = connect_to_database().await?;
|
||||||
|
for market in market_infos.into_iter() {
|
||||||
|
let client = pool.get().await?;
|
||||||
|
let minute_candles = backfill_batch_1m_candles(&pool, &market).await?;
|
||||||
|
save_candles(minute_candles, client).await?;
|
||||||
|
|
||||||
|
for resolution in Resolution::iter() {
|
||||||
|
if resolution == Resolution::R1m {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let higher_order_candles =
|
||||||
|
backfill_batch_higher_order_candles(&pool, &market.name, resolution).await?;
|
||||||
|
let client = pool.get().await?;
|
||||||
|
save_candles(higher_order_candles, client).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn save_candles(candles: Vec<Candle>, client: Object) -> anyhow::Result<()> {
|
||||||
|
if candles.len() > 0 {
|
||||||
|
let upsert_statement = build_candles_upsert_statement(candles);
|
||||||
|
client
|
||||||
|
.execute(&upsert_statement, &[])
|
||||||
|
.await
|
||||||
|
.map_err_anyhow()?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -5,7 +5,7 @@ use openbook_candles::{
|
||||||
database::{initialize::connect_to_database, insert::persist_fill_events},
|
database::{initialize::connect_to_database, insert::persist_fill_events},
|
||||||
structs::{
|
structs::{
|
||||||
markets::{fetch_market_infos, load_markets},
|
markets::{fetch_market_infos, load_markets},
|
||||||
openbook::OpenBookFillEventLog,
|
openbook::OpenBookFillEvent,
|
||||||
},
|
},
|
||||||
utils::Config,
|
utils::Config,
|
||||||
worker::trade_fetching::parsing::parse_trades_from_openbook_txns,
|
worker::trade_fetching::parsing::parse_trades_from_openbook_txns,
|
||||||
|
@ -19,7 +19,7 @@ use solana_transaction_status::UiTransactionEncoding;
|
||||||
use std::{collections::HashMap, env, str::FromStr};
|
use std::{collections::HashMap, env, str::FromStr};
|
||||||
use tokio::sync::mpsc::{self, Sender};
|
use tokio::sync::mpsc::{self, Sender};
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
dotenv::dotenv().ok();
|
dotenv::dotenv().ok();
|
||||||
let args: Vec<String> = env::args().collect();
|
let args: Vec<String> = env::args().collect();
|
||||||
|
@ -40,7 +40,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
println!("{:?}", target_markets);
|
println!("{:?}", target_markets);
|
||||||
|
|
||||||
let pool = connect_to_database().await?;
|
let pool = connect_to_database().await?;
|
||||||
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
|
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
|
@ -56,7 +56,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
pub async fn backfill(
|
pub async fn backfill(
|
||||||
rpc_url: String,
|
rpc_url: String,
|
||||||
fill_sender: &Sender<OpenBookFillEventLog>,
|
fill_sender: &Sender<OpenBookFillEvent>,
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
target_markets: &HashMap<Pubkey, u8>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
println!("backfill started");
|
println!("backfill started");
|
||||||
|
@ -75,7 +75,6 @@ pub async fn backfill(
|
||||||
Some((last, time, sigs)) => {
|
Some((last, time, sigs)) => {
|
||||||
now_time = time;
|
now_time = time;
|
||||||
before_sig = Some(last);
|
before_sig = Some(last);
|
||||||
|
|
||||||
let time_left = backfill_time_left(now_time, end_time);
|
let time_left = backfill_time_left(now_time, end_time);
|
||||||
println!(
|
println!(
|
||||||
"{} minutes ~ {} days remaining in the backfill\n",
|
"{} minutes ~ {} days remaining in the backfill\n",
|
||||||
|
@ -134,6 +133,7 @@ pub async fn get_signatures(
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
let last = sigs.last().unwrap();
|
let last = sigs.last().unwrap();
|
||||||
|
// println!("{:?}", last.block_time.unwrap());
|
||||||
return Some((
|
return Some((
|
||||||
Signature::from_str(&last.signature).unwrap(),
|
Signature::from_str(&last.signature).unwrap(),
|
||||||
last.block_time.unwrap(),
|
last.block_time.unwrap(),
|
||||||
|
@ -144,7 +144,7 @@ pub async fn get_signatures(
|
||||||
pub async fn get_transactions(
|
pub async fn get_transactions(
|
||||||
rpc_client: &RpcClient,
|
rpc_client: &RpcClient,
|
||||||
mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>,
|
mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>,
|
||||||
fill_sender: &Sender<OpenBookFillEventLog>,
|
fill_sender: &Sender<OpenBookFillEvent>,
|
||||||
target_markets: &HashMap<Pubkey, u8>,
|
target_markets: &HashMap<Pubkey, u8>,
|
||||||
) {
|
) {
|
||||||
sigs.retain(|sig| sig.err.is_none());
|
sigs.retain(|sig| sig.err.is_none());
|
||||||
|
@ -173,6 +173,7 @@ pub async fn get_transactions(
|
||||||
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
|
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
|
||||||
if fills.len() > 0 {
|
if fills.len() > 0 {
|
||||||
for fill in fills.into_iter() {
|
for fill in fills.into_iter() {
|
||||||
|
// println!("Sending fill {:?}", fill);
|
||||||
if let Err(_) = fill_sender.send(fill).await {
|
if let Err(_) = fill_sender.send(fill).await {
|
||||||
panic!("receiver dropped");
|
panic!("receiver dropped");
|
||||||
}
|
}
|
|
@ -111,6 +111,8 @@ pub async fn fetch_latest_finished_candle(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fetches all of the candles for the given market and resoultion, starting from the earliest.
|
||||||
|
/// Note that this function will fetch ALL candles.
|
||||||
pub async fn fetch_earliest_candles(
|
pub async fn fetch_earliest_candles(
|
||||||
pool: &Pool,
|
pool: &Pool,
|
||||||
market_name: &str,
|
market_name: &str,
|
||||||
|
|
|
@ -72,7 +72,7 @@ pub async fn persist_candles(
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// print!("writing: {:?} candles to DB\n", candles.len());
|
// print!("writing: {:?} candles to DB\n", candles.len());
|
||||||
let upsert_statement = build_candes_upsert_statement(candles);
|
let upsert_statement = build_candles_upsert_statement(candles);
|
||||||
client
|
client
|
||||||
.execute(&upsert_statement, &[])
|
.execute(&upsert_statement, &[])
|
||||||
.await
|
.await
|
||||||
|
@ -128,7 +128,7 @@ fn build_fills_upsert_statement(events: HashMap<OpenBookFillEvent, u8>) -> Strin
|
||||||
stmt
|
stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_candes_upsert_statement(candles: Vec<Candle>) -> String {
|
pub fn build_candles_upsert_statement(candles: Vec<Candle>) -> String {
|
||||||
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
|
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
|
||||||
for (idx, candle) in candles.iter().enumerate() {
|
for (idx, candle) in candles.iter().enumerate() {
|
||||||
let val_str = format!(
|
let val_str = format!(
|
||||||
|
|
|
@ -144,3 +144,29 @@ fn trim_candles(mut c: Vec<Candle>, start_time: DateTime<Utc>) -> Vec<Candle> {
|
||||||
}
|
}
|
||||||
c
|
c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn backfill_batch_higher_order_candles(
|
||||||
|
pool: &Pool,
|
||||||
|
market_name: &str,
|
||||||
|
resolution: Resolution,
|
||||||
|
) -> anyhow::Result<Vec<Candle>> {
|
||||||
|
let mut constituent_candles =
|
||||||
|
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution()).await?;
|
||||||
|
if constituent_candles.len() == 0 {
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
|
||||||
|
|
||||||
|
let seed_candle = constituent_candles[0].clone();
|
||||||
|
let combined_candles = combine_into_higher_order_candles(
|
||||||
|
&mut constituent_candles,
|
||||||
|
resolution,
|
||||||
|
start_time,
|
||||||
|
seed_candle,
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(trim_candles(
|
||||||
|
combined_candles,
|
||||||
|
constituent_candles[0].start_time,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
|
@ -120,3 +120,38 @@ fn combine_fills_into_1m_candles(
|
||||||
|
|
||||||
candles
|
candles
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end.
|
||||||
|
pub async fn backfill_batch_1m_candles(
|
||||||
|
pool: &Pool,
|
||||||
|
market: &MarketInfo,
|
||||||
|
) -> anyhow::Result<Vec<Candle>> {
|
||||||
|
let market_name = &market.name;
|
||||||
|
let market_address = &market.address;
|
||||||
|
let mut candles = vec![];
|
||||||
|
|
||||||
|
let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;
|
||||||
|
if earliest_fill.is_none() {
|
||||||
|
println!("No fills found for: {:?}", &market_name);
|
||||||
|
return Ok(candles);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut start_time = earliest_fill
|
||||||
|
.unwrap()
|
||||||
|
.time
|
||||||
|
.duration_trunc(Duration::minutes(1))?;
|
||||||
|
while start_time < Utc::now() {
|
||||||
|
let end_time = min(
|
||||||
|
start_time + day(),
|
||||||
|
Utc::now().duration_trunc(Duration::minutes(1))?,
|
||||||
|
);
|
||||||
|
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
|
||||||
|
if fills.len() > 0 {
|
||||||
|
let mut minute_candles =
|
||||||
|
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
|
||||||
|
candles.append(&mut minute_candles);
|
||||||
|
}
|
||||||
|
start_time += day()
|
||||||
|
}
|
||||||
|
Ok(candles)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue