feat: can batch higher order candles
This commit is contained in:
parent
9216cc17d2
commit
53f55aa669
|
@ -0,0 +1,163 @@
|
|||
use chrono::{DateTime, DurationRound, Utc};
|
||||
use num_traits::Zero;
|
||||
use sqlx::{types::Decimal, Pool, Postgres};
|
||||
use std::cmp::{max, min};
|
||||
|
||||
use crate::{
|
||||
candle_batching::DAY,
|
||||
database::{
|
||||
fetch::{fetch_candles_from, fetch_earliest_candle, fetch_latest_finished_candle},
|
||||
Candle, Resolution,
|
||||
},
|
||||
};
|
||||
|
||||
pub async fn batch_higher_order_candles(
|
||||
pool: &Pool<Postgres>,
|
||||
market_address_string: &str,
|
||||
resolution: Resolution,
|
||||
) -> anyhow::Result<Vec<Candle>> {
|
||||
let latest_candle =
|
||||
fetch_latest_finished_candle(pool, market_address_string, resolution).await?;
|
||||
|
||||
match latest_candle {
|
||||
Some(candle) => {
|
||||
let start_time = candle.end_time;
|
||||
let end_time = start_time + DAY();
|
||||
// println!("candle.end_time: {:?}", candle.end_time);
|
||||
// println!("start_time: {:?}", start_time);
|
||||
let mut constituent_candles = fetch_candles_from(
|
||||
pool,
|
||||
market_address_string,
|
||||
resolution.get_constituent_resolution(),
|
||||
start_time,
|
||||
end_time,
|
||||
)
|
||||
.await?;
|
||||
if constituent_candles.len() == 0 {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let combined_candles = combine_into_higher_order_candles(
|
||||
&mut constituent_candles,
|
||||
resolution,
|
||||
start_time,
|
||||
candle,
|
||||
);
|
||||
Ok(combined_candles)
|
||||
}
|
||||
None => {
|
||||
let constituent_candle = fetch_earliest_candle(
|
||||
pool,
|
||||
market_address_string,
|
||||
resolution.get_constituent_resolution(),
|
||||
)
|
||||
.await?;
|
||||
if constituent_candle.is_none() {
|
||||
println!(
|
||||
"Batching {}, but no candles found for: {:?}, {}",
|
||||
resolution,
|
||||
market_address_string,
|
||||
resolution.get_constituent_resolution()
|
||||
);
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let start_time = constituent_candle
|
||||
.unwrap()
|
||||
.start_time
|
||||
.duration_trunc(DAY())?;
|
||||
let end_time = start_time + DAY();
|
||||
|
||||
let mut constituent_candles = fetch_candles_from(
|
||||
pool,
|
||||
market_address_string,
|
||||
resolution.get_constituent_resolution(),
|
||||
start_time,
|
||||
end_time,
|
||||
)
|
||||
.await?;
|
||||
if constituent_candles.len() == 0 {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let seed_candle = constituent_candles[0].clone();
|
||||
let combined_candles = combine_into_higher_order_candles(
|
||||
&mut constituent_candles,
|
||||
resolution,
|
||||
start_time,
|
||||
seed_candle,
|
||||
);
|
||||
|
||||
Ok(trim_zero_candles(combined_candles))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn combine_into_higher_order_candles(
|
||||
constituent_candles: &mut Vec<Candle>,
|
||||
target_resolution: Resolution,
|
||||
st: DateTime<Utc>,
|
||||
seed_candle: Candle,
|
||||
) -> Vec<Candle> {
|
||||
println!("target_resolution: {}", target_resolution);
|
||||
|
||||
let duration = target_resolution.get_duration();
|
||||
|
||||
let candles_len = constituent_candles.len();
|
||||
|
||||
let empty_candle =
|
||||
Candle::create_empty_candle(constituent_candles[0].market.clone(), target_resolution);
|
||||
let mut combined_candles =
|
||||
vec![empty_candle; (DAY().num_minutes() / duration.num_minutes()) as usize];
|
||||
|
||||
println!("candles_len: {}", candles_len);
|
||||
|
||||
let mut con_iter = constituent_candles.iter_mut().peekable();
|
||||
let mut start_time = st.clone();
|
||||
let mut end_time = start_time + duration;
|
||||
|
||||
let mut last_candle = seed_candle;
|
||||
|
||||
for i in 0..combined_candles.len() {
|
||||
combined_candles[i].open = last_candle.close;
|
||||
combined_candles[i].low = last_candle.close;
|
||||
combined_candles[i].close = last_candle.close;
|
||||
combined_candles[i].high = last_candle.close;
|
||||
|
||||
while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) {
|
||||
let unit_candle = con_iter.next().unwrap();
|
||||
combined_candles[i].high = max(combined_candles[i].high, unit_candle.high);
|
||||
combined_candles[i].low = min(combined_candles[i].low, unit_candle.low);
|
||||
combined_candles[i].close = unit_candle.close;
|
||||
combined_candles[i].volume += unit_candle.volume;
|
||||
combined_candles[i].complete = unit_candle.complete;
|
||||
combined_candles[i].end_time = unit_candle.end_time;
|
||||
}
|
||||
|
||||
combined_candles[i].start_time = start_time;
|
||||
combined_candles[i].end_time = end_time;
|
||||
|
||||
start_time = end_time;
|
||||
end_time = end_time + duration;
|
||||
|
||||
last_candle = combined_candles[i].clone();
|
||||
}
|
||||
|
||||
combined_candles
|
||||
}
|
||||
|
||||
fn trim_zero_candles(mut c: Vec<Candle>) -> Vec<Candle> {
|
||||
let mut i = 0;
|
||||
while i < c.len() {
|
||||
if c[i].open == Decimal::zero()
|
||||
&& c[i].high == Decimal::zero()
|
||||
&& c[i].low == Decimal::zero()
|
||||
&& c[i].close == Decimal::zero()
|
||||
&& c[i].volume == Decimal::zero()
|
||||
&& c[i].complete == true
|
||||
{
|
||||
c.remove(i);
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
c
|
||||
}
|
|
@ -1,47 +1,17 @@
|
|||
use std::cmp::{max, min};
|
||||
|
||||
use chrono::{DateTime, Duration, DurationRound, SubsecRound, Utc};
|
||||
use chrono::{DateTime, Duration, DurationRound, Utc};
|
||||
use num_traits::{FromPrimitive, Zero};
|
||||
use sqlx::{types::Decimal, Pool, Postgres};
|
||||
|
||||
use crate::database::{
|
||||
fetchers::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
|
||||
fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
|
||||
Candle, MarketInfo, PgOpenBookFill, Resolution,
|
||||
};
|
||||
|
||||
pub async fn batch_candles(pool: &Pool<Postgres>, markets: Vec<MarketInfo>) {
|
||||
let m = MarketInfo {
|
||||
name: "BTC/USDC".to_owned(),
|
||||
address: "A8YFbxQYFVqKZaoYJLLUVcQiWP7G2MeEgW5wsAQgMvFw".to_owned(),
|
||||
base_decimals: 6,
|
||||
quote_decimals: 6,
|
||||
base_mint_key: "GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU".to_owned(),
|
||||
quote_mint_key: "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_owned(),
|
||||
base_lot_size: 10_000,
|
||||
quote_lot_size: 1,
|
||||
};
|
||||
batch_for_market(&pool.clone(), m).await.unwrap();
|
||||
use super::DAY;
|
||||
|
||||
// for market in markets.iter() {
|
||||
// tokio::spawn(async move {
|
||||
// loop {
|
||||
// batch_for_market(&pool.clone(), &market.address.clone()).await;
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
}
|
||||
|
||||
async fn batch_for_market(pool: &Pool<Postgres>, market: MarketInfo) -> anyhow::Result<()> {
|
||||
let candles = batch_1m_candles(pool, market).await?;
|
||||
// println!("candles {:?}", candles[10]);
|
||||
// database::insert_candles(pool, candles)
|
||||
|
||||
// for resolution in Resolution.iter
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn batch_1m_candles(
|
||||
pub async fn batch_1m_candles(
|
||||
pool: &Pool<Postgres>,
|
||||
market: MarketInfo,
|
||||
) -> anyhow::Result<Vec<Candle>> {
|
||||
|
@ -53,7 +23,7 @@ async fn batch_1m_candles(
|
|||
Some(candle) => {
|
||||
let start_time = candle.end_time;
|
||||
let end_time = min(
|
||||
start_time + Duration::hours(6),
|
||||
start_time + DAY(),
|
||||
Utc::now().duration_trunc(Duration::minutes(1))?,
|
||||
);
|
||||
let mut fills =
|
||||
|
@ -80,7 +50,7 @@ async fn batch_1m_candles(
|
|||
.time
|
||||
.duration_trunc(Duration::minutes(1))?;
|
||||
let end_time = min(
|
||||
start_time + Duration::hours(6),
|
||||
start_time + DAY(),
|
||||
Utc::now().duration_trunc(Duration::minutes(1))?,
|
||||
);
|
||||
let mut fills =
|
||||
|
@ -108,7 +78,7 @@ fn combine_fills_into_1m_candles(
|
|||
let mut start_time = st.clone();
|
||||
let mut end_time = start_time + Duration::minutes(1);
|
||||
|
||||
let mut last_price = maybe_last_price.unwrap_or(Decimal::zero());
|
||||
let mut last_price = maybe_last_price.unwrap_or(Decimal::zero()); // TODO: very first open is wrong
|
||||
|
||||
for i in 0..candles.len() {
|
||||
candles[i].open = last_price;
|
||||
|
@ -122,8 +92,6 @@ fn combine_fills_into_1m_candles(
|
|||
let (price, volume) =
|
||||
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
|
||||
|
||||
println!("{:?}", price);
|
||||
|
||||
candles[i].close = price;
|
||||
candles[i].low = min(price, candles[i].low);
|
||||
candles[i].high = max(price, candles[i].high);
|
||||
|
@ -136,8 +104,6 @@ fn combine_fills_into_1m_candles(
|
|||
candles[i].end_time = end_time;
|
||||
candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time);
|
||||
|
||||
println!("{:?}", candles[i]);
|
||||
|
||||
start_time = end_time;
|
||||
end_time = end_time + Duration::minutes(1);
|
||||
}
|
|
@ -1 +1,82 @@
|
|||
pub mod batcher;
|
||||
pub mod higher_order_candles;
|
||||
pub mod minute_candles;
|
||||
|
||||
use std::cmp::{max, min};
|
||||
|
||||
use chrono::{DateTime, Duration, DurationRound, Utc};
|
||||
use num_traits::{FromPrimitive, Zero};
|
||||
use sqlx::{types::Decimal, Pool, Postgres};
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio::{sync::mpsc::Sender, time::sleep};
|
||||
|
||||
use crate::{
|
||||
candle_batching::minute_candles::batch_1m_candles,
|
||||
database::{
|
||||
fetch::{
|
||||
fetch_candles_from, fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle,
|
||||
},
|
||||
Candle, MarketInfo, PgOpenBookFill, Resolution,
|
||||
},
|
||||
};
|
||||
|
||||
use self::higher_order_candles::batch_higher_order_candles;
|
||||
|
||||
pub fn DAY() -> Duration {
|
||||
Duration::days(1)
|
||||
}
|
||||
|
||||
pub async fn batch_candles(
|
||||
pool: Pool<Postgres>,
|
||||
candles_sender: &Sender<Vec<Candle>>,
|
||||
markets: Vec<MarketInfo>,
|
||||
) {
|
||||
// tokio spawn a taks for every market
|
||||
|
||||
loop {
|
||||
let m = MarketInfo {
|
||||
name: "BTC/USDC".to_owned(),
|
||||
address: "A8YFbxQYFVqKZaoYJLLUVcQiWP7G2MeEgW5wsAQgMvFw".to_owned(),
|
||||
base_decimals: 6,
|
||||
quote_decimals: 6,
|
||||
base_mint_key: "GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU".to_owned(),
|
||||
quote_mint_key: "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_owned(),
|
||||
base_lot_size: 10_000,
|
||||
quote_lot_size: 1,
|
||||
};
|
||||
|
||||
batch_for_market(&pool.clone(), candles_sender, m)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
sleep(Duration::milliseconds(500).to_std().unwrap()).await;
|
||||
}
|
||||
|
||||
//loop
|
||||
}
|
||||
|
||||
async fn batch_for_market(
|
||||
pool: &Pool<Postgres>,
|
||||
candles_sender: &Sender<Vec<Candle>>,
|
||||
market: MarketInfo,
|
||||
) -> anyhow::Result<()> {
|
||||
let market_address = &market.address.clone();
|
||||
let candles = batch_1m_candles(pool, market).await?;
|
||||
send_candles(candles, candles_sender).await;
|
||||
|
||||
for resolution in Resolution::iter() {
|
||||
if resolution == Resolution::R1m {
|
||||
continue;
|
||||
}
|
||||
let candles = batch_higher_order_candles(pool, market_address, resolution).await?;
|
||||
send_candles(candles, candles_sender).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_candles(candles: Vec<Candle>, candles_sender: &Sender<Vec<Candle>>) {
|
||||
if candles.len() > 0 {
|
||||
if let Err(_) = candles_sender.send(candles).await {
|
||||
panic!("candles receiver dropped");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,8 +5,6 @@ use crate::{database::PgOpenBookFill, utils::AnyhowWrap};
|
|||
|
||||
use super::{Candle, Resolution};
|
||||
|
||||
// use super::PgMarketInfo;
|
||||
|
||||
pub async fn fetch_earliest_fill(
|
||||
pool: &Pool<Postgres>,
|
||||
market_address_string: &str,
|
||||
|
@ -90,4 +88,69 @@ pub async fn fetch_latest_finished_candle(
|
|||
.map_err_anyhow()
|
||||
}
|
||||
|
||||
// fetch_candles
|
||||
pub async fn fetch_earliest_candle(
|
||||
pool: &Pool<Postgres>,
|
||||
market_address_string: &str,
|
||||
resolution: Resolution,
|
||||
) -> anyhow::Result<Option<Candle>> {
|
||||
sqlx::query_as!(
|
||||
Candle,
|
||||
r#"SELECT
|
||||
start_time as "start_time!",
|
||||
end_time as "end_time!",
|
||||
resolution as "resolution!",
|
||||
market as "market!",
|
||||
open as "open!",
|
||||
close as "close!",
|
||||
high as "high!",
|
||||
low as "low!",
|
||||
volume as "volume!",
|
||||
complete as "complete!"
|
||||
from candles
|
||||
where market = $1
|
||||
and resolution = $2
|
||||
ORDER BY start_time asc LIMIT 1"#,
|
||||
market_address_string,
|
||||
resolution.to_string()
|
||||
)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
.map_err_anyhow()
|
||||
}
|
||||
|
||||
pub async fn fetch_candles_from(
|
||||
pool: &Pool<Postgres>,
|
||||
market_address_string: &str,
|
||||
resolution: Resolution,
|
||||
start_time: DateTime<Utc>,
|
||||
end_time: DateTime<Utc>,
|
||||
) -> anyhow::Result<Vec<Candle>> {
|
||||
sqlx::query_as!(
|
||||
Candle,
|
||||
r#"SELECT
|
||||
start_time as "start_time!",
|
||||
end_time as "end_time!",
|
||||
resolution as "resolution!",
|
||||
market as "market!",
|
||||
open as "open!",
|
||||
close as "close!",
|
||||
high as "high!",
|
||||
low as "low!",
|
||||
volume as "volume!",
|
||||
complete as "complete!"
|
||||
from candles
|
||||
where market = $1
|
||||
and resolution = $2
|
||||
and start_time >= $3
|
||||
and end_time <= $4
|
||||
and complete = true
|
||||
ORDER BY start_time asc"#,
|
||||
market_address_string,
|
||||
resolution.to_string(),
|
||||
start_time,
|
||||
end_time
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.map_err_anyhow()
|
||||
}
|
|
@ -0,0 +1,117 @@
|
|||
use chrono::Utc;
|
||||
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
|
||||
use std::{
|
||||
collections::hash_map::DefaultHasher,
|
||||
hash::{Hash, Hasher},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::sync::mpsc::{error::TryRecvError, Receiver};
|
||||
|
||||
use crate::{
|
||||
trade_fetching::parsing::OpenBookFillEventLog,
|
||||
utils::{AnyhowWrap, Config},
|
||||
};
|
||||
|
||||
use super::MarketInfo;
|
||||
|
||||
pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool<Postgres>> {
|
||||
loop {
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(config.max_pg_pool_connections)
|
||||
.connect(&config.database_url)
|
||||
.await;
|
||||
if pool.is_ok() {
|
||||
println!("Database connected");
|
||||
return pool.map_err_anyhow();
|
||||
}
|
||||
println!("Failed to connect to database, retrying");
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn setup_database(pool: &Pool<Postgres>, markets: Vec<MarketInfo>) -> anyhow::Result<()> {
|
||||
let candles_table_fut = create_candles_table(pool);
|
||||
let fills_table_fut = create_fills_table(pool);
|
||||
let result = tokio::try_join!(candles_table_fut, fills_table_fut);
|
||||
match result {
|
||||
Ok(_) => {
|
||||
println!("Successfully configured database");
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Failed to configure database: {e}");
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_candles_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
|
||||
let mut tx = pool.begin().await.map_err_anyhow()?;
|
||||
|
||||
sqlx::query!(
|
||||
"CREATE TABLE IF NOT EXISTS candles (
|
||||
id serial,
|
||||
market text,
|
||||
start_time timestamptz,
|
||||
end_time timestamptz,
|
||||
resolution text,
|
||||
open numeric,
|
||||
close numeric,
|
||||
high numeric,
|
||||
low numeric,
|
||||
volume numeric,
|
||||
complete bool
|
||||
)",
|
||||
)
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query!(
|
||||
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market, start_time, resolution)"
|
||||
).execute(&mut tx).await?;
|
||||
|
||||
sqlx::query!(
|
||||
"ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market, start_time, resolution)"
|
||||
)
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await.map_err_anyhow()
|
||||
}
|
||||
|
||||
pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
|
||||
let mut tx = pool.begin().await.map_err_anyhow()?;
|
||||
|
||||
sqlx::query!(
|
||||
"CREATE TABLE IF NOT EXISTS fills (
|
||||
id numeric PRIMARY KEY,
|
||||
time timestamptz not null,
|
||||
market text not null,
|
||||
open_orders text not null,
|
||||
open_orders_owner text not null,
|
||||
bid bool not null,
|
||||
maker bool not null,
|
||||
native_qty_paid numeric not null,
|
||||
native_qty_received numeric not null,
|
||||
native_fee_or_rebate numeric not null,
|
||||
fee_tier text not null,
|
||||
order_id text not null
|
||||
)",
|
||||
)
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query!("CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)")
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query!("CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)")
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await.map_err_anyhow()
|
||||
}
|
||||
|
||||
pub async fn save_candles() {
|
||||
unimplemented!("TODO");
|
||||
}
|
|
@ -12,105 +12,9 @@ use crate::{
|
|||
utils::{AnyhowWrap, Config},
|
||||
};
|
||||
|
||||
use super::MarketInfo;
|
||||
use super::Candle;
|
||||
|
||||
pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool<Postgres>> {
|
||||
loop {
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(config.max_pg_pool_connections)
|
||||
.connect(&config.database_url)
|
||||
.await;
|
||||
if pool.is_ok() {
|
||||
println!("Database connected");
|
||||
return pool.map_err_anyhow();
|
||||
}
|
||||
println!("Failed to connect to database, retrying");
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn setup_database(pool: &Pool<Postgres>, markets: Vec<MarketInfo>) -> anyhow::Result<()> {
|
||||
let candles_table_fut = create_candles_table(pool);
|
||||
let fills_table_fut = create_fills_table(pool);
|
||||
let result = tokio::try_join!(candles_table_fut, fills_table_fut);
|
||||
match result {
|
||||
Ok(_) => {
|
||||
println!("Successfully configured database");
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Failed to configure database: {e}");
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_candles_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
|
||||
let mut tx = pool.begin().await.map_err_anyhow()?;
|
||||
|
||||
sqlx::query(
|
||||
"CREATE TABLE IF NOT EXISTS candles (
|
||||
id serial,
|
||||
market text,
|
||||
start_time timestamptz,
|
||||
end_time timestamptz,
|
||||
resolution text,
|
||||
open numeric,
|
||||
close numeric,
|
||||
high numeric,
|
||||
low numeric,
|
||||
volume numeric,
|
||||
complete bool
|
||||
)",
|
||||
)
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market, start_time, resolution)"
|
||||
).execute(&mut tx).await?;
|
||||
|
||||
tx.commit().await.map_err_anyhow()
|
||||
}
|
||||
|
||||
pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
|
||||
let mut tx = pool.begin().await.map_err_anyhow()?;
|
||||
|
||||
sqlx::query!(
|
||||
"CREATE TABLE IF NOT EXISTS fills (
|
||||
id numeric PRIMARY KEY,
|
||||
time timestamptz not null,
|
||||
market text not null,
|
||||
open_orders text not null,
|
||||
open_orders_owner text not null,
|
||||
bid bool not null,
|
||||
maker bool not null,
|
||||
native_qty_paid numeric not null,
|
||||
native_qty_received numeric not null,
|
||||
native_fee_or_rebate numeric not null,
|
||||
fee_tier text not null,
|
||||
order_id text not null
|
||||
)",
|
||||
)
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query!("CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)")
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query!("CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)")
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await.map_err_anyhow()
|
||||
}
|
||||
|
||||
pub async fn save_candles() {
|
||||
unimplemented!("TODO");
|
||||
}
|
||||
|
||||
pub async fn handle_fill_events(
|
||||
pub async fn persist_fill_events(
|
||||
pool: &Pool<Postgres>,
|
||||
mut fill_receiver: Receiver<OpenBookFillEventLog>,
|
||||
) {
|
||||
|
@ -127,7 +31,7 @@ pub async fn handle_fill_events(
|
|||
}
|
||||
Err(TryRecvError::Empty) => break,
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
panic!("sender must stay alive")
|
||||
panic!("Fills sender must stay alive")
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -179,6 +83,73 @@ fn build_fills_upsert_statement(events: Vec<OpenBookFillEventLog>) -> String {
|
|||
stmt
|
||||
}
|
||||
|
||||
pub async fn persist_candles(pool: Pool<Postgres>, mut candles_receiver: Receiver<Vec<Candle>>) {
|
||||
loop {
|
||||
match candles_receiver.try_recv() {
|
||||
Ok(candles) => {
|
||||
if candles.len() == 0 {
|
||||
continue;
|
||||
}
|
||||
print!("writing: {:?} candles to DB\n", candles.len());
|
||||
match candles.last() {
|
||||
Some(c) => {
|
||||
println!("{:?}\n\n", c.end_time)
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
let upsert_statement = build_candes_upsert_statement(candles);
|
||||
sqlx::query(&upsert_statement)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.map_err_anyhow()
|
||||
.unwrap();
|
||||
}
|
||||
Err(TryRecvError::Empty) => continue,
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
panic!("Candles sender must stay alive")
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn build_candes_upsert_statement(candles: Vec<Candle>) -> String {
|
||||
let mut stmt = String::from("INSERT INTO candles (market, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
|
||||
for (idx, candle) in candles.iter().enumerate() {
|
||||
let val_str = format!(
|
||||
"(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {})",
|
||||
candle.market,
|
||||
candle.start_time.to_rfc3339(),
|
||||
candle.end_time.to_rfc3339(),
|
||||
candle.resolution,
|
||||
candle.open,
|
||||
candle.close,
|
||||
candle.high,
|
||||
candle.low,
|
||||
candle.volume,
|
||||
candle.complete,
|
||||
);
|
||||
|
||||
if idx == 0 {
|
||||
stmt = format!("{} {}", &stmt, val_str);
|
||||
} else {
|
||||
stmt = format!("{}, {}", &stmt, val_str);
|
||||
}
|
||||
}
|
||||
|
||||
let handle_conflict = "ON CONFLICT (market, start_time, resolution)
|
||||
DO UPDATE SET
|
||||
open=excluded.open,
|
||||
close=excluded.close,
|
||||
high=excluded.high,
|
||||
low=excluded.low,
|
||||
volume=excluded.volume,
|
||||
complete=excluded.complete
|
||||
";
|
||||
|
||||
stmt = format!("{} {}", stmt, handle_conflict);
|
||||
stmt
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
|
@ -1,18 +1,21 @@
|
|||
use std::fmt;
|
||||
|
||||
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
||||
use num_traits::Zero;
|
||||
use sqlx::types::Decimal;
|
||||
use strum::EnumIter;
|
||||
|
||||
pub mod database;
|
||||
pub mod fetchers;
|
||||
use crate::candle_batching::DAY;
|
||||
|
||||
pub mod fetch;
|
||||
pub mod initialize;
|
||||
pub mod insert;
|
||||
|
||||
pub trait Summary {
|
||||
fn summarize(&self) -> String;
|
||||
}
|
||||
|
||||
#[derive(EnumIter)]
|
||||
#[derive(EnumIter, Copy, Clone, Eq, PartialEq)]
|
||||
pub enum Resolution {
|
||||
R1m,
|
||||
R3m,
|
||||
|
@ -23,7 +26,6 @@ pub enum Resolution {
|
|||
R2h,
|
||||
R4h,
|
||||
R1d,
|
||||
R1w,
|
||||
}
|
||||
|
||||
impl fmt::Display for Resolution {
|
||||
|
@ -38,7 +40,6 @@ impl fmt::Display for Resolution {
|
|||
Resolution::R2h => write!(f, "2H"),
|
||||
Resolution::R4h => write!(f, "4H"),
|
||||
Resolution::R1d => write!(f, "1D"),
|
||||
Resolution::R1w => write!(f, "1W"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -55,22 +56,20 @@ impl Resolution {
|
|||
Resolution::R2h => Resolution::R1h,
|
||||
Resolution::R4h => Resolution::R2h,
|
||||
Resolution::R1d => Resolution::R4h,
|
||||
Resolution::R1w => Resolution::R1d,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_constituent_resolution_factor(self) -> u8 {
|
||||
pub fn get_duration(self) -> Duration {
|
||||
match self {
|
||||
Resolution::R1m => panic!("have to use fills to make 1M candles"),
|
||||
Resolution::R3m => 3,
|
||||
Resolution::R5m => 5,
|
||||
Resolution::R15m => 3,
|
||||
Resolution::R30m => 2,
|
||||
Resolution::R1h => 2,
|
||||
Resolution::R2h => 2,
|
||||
Resolution::R4h => 2,
|
||||
Resolution::R1d => 6,
|
||||
Resolution::R1w => 7,
|
||||
Resolution::R1m => Duration::minutes(1),
|
||||
Resolution::R3m => Duration::minutes(3),
|
||||
Resolution::R5m => Duration::minutes(5),
|
||||
Resolution::R15m => Duration::minutes(15),
|
||||
Resolution::R30m => Duration::minutes(30),
|
||||
Resolution::R1h => Duration::hours(1),
|
||||
Resolution::R2h => Duration::hours(2),
|
||||
Resolution::R4h => Duration::hours(4),
|
||||
Resolution::R1d => DAY(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
use crate::{
|
||||
database::{fetchers::fetch_latest_finished_candle, Resolution},
|
||||
candle_batching::batch_candles,
|
||||
database::{fetch::fetch_latest_finished_candle, insert::persist_candles, Candle, Resolution},
|
||||
trade_fetching::{parsing::OpenBookFillEventLog, scrape::fetch_market_infos},
|
||||
utils::Config,
|
||||
};
|
||||
use database::{
|
||||
database::{connect_to_database, setup_database},
|
||||
fetchers::fetch_earliest_fill,
|
||||
fetch::fetch_earliest_fill,
|
||||
initialize::{connect_to_database, setup_database},
|
||||
};
|
||||
use dotenv;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
|
@ -39,14 +40,26 @@ async fn main() -> anyhow::Result<()> {
|
|||
// let (fill_sender, fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
|
||||
|
||||
// tokio::spawn(async move {
|
||||
// trade_fetching::scrape::scrape(&config, fill_sender.clone()).await;
|
||||
// trade_fetching::scrape::scrape(&config, fill_sender.clone()).await; TODO: send the vec, it's okay
|
||||
// });
|
||||
|
||||
// database::database::handle_fill_events(&pool, fill_receiver).await;
|
||||
|
||||
// trade_fetching::websocket::listen_logs().await?;
|
||||
|
||||
candle_batching::batcher::batch_candles(&pool, market_infos).await;
|
||||
let (candle_sender, candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
|
||||
|
||||
let batch_pool = pool.clone();
|
||||
tokio::spawn(async move {
|
||||
batch_candles(batch_pool, &candle_sender, market_infos).await;
|
||||
});
|
||||
|
||||
let persist_pool = pool.clone();
|
||||
// tokio::spawn(async move {
|
||||
persist_candles(persist_pool, candle_receiver).await;
|
||||
// });
|
||||
|
||||
loop {}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ use std::{io::Error, rc::Rc, str::FromStr, time::Duration};
|
|||
|
||||
use crate::utils::AnyhowWrap;
|
||||
use crate::{
|
||||
database::database::{connect_to_database, setup_database},
|
||||
database::initialize::{connect_to_database, setup_database},
|
||||
utils::Config,
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue