refactor: update candle db schema to support market names

This commit is contained in:
dboures 2023-03-12 20:03:39 -05:00
parent 379ba2c9c6
commit f4e1e5c3bb
No known key found for this signature in database
GPG Key ID: AB3790129D478852
8 changed files with 134 additions and 133 deletions

View File

@ -20,16 +20,6 @@
},
"query": "CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)"
},
"61ce67d221cf35cea33940529f7d38af1514961245f4abc95b872e88cc0dc1e0": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "DO $$\n BEGIN\n IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN\n ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market, start_time, resolution);\n END IF;\n END $$"
},
"63fd83accf07c969461c66ca26a28f506dbbc2e8c0b7d74d9a06ccf52aa1b8b6": {
"describe": {
"columns": [
@ -48,6 +38,16 @@
},
"query": "Select COUNT(*) as total from fills"
},
"6658c0121e5a7defbd1fe7c549ca0a957b188b9eb1837573a05d0e6476ef945a": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE TABLE IF NOT EXISTS candles (\n id serial,\n market_name text,\n start_time timestamptz,\n end_time timestamptz,\n resolution text,\n open numeric,\n close numeric,\n high numeric,\n low numeric,\n volume numeric,\n complete bool\n )"
},
"817ee7903cb5095f85fb787beff04ace3a452cf8749205bb230e41d8c9e03c4a": {
"describe": {
"columns": [],
@ -58,7 +58,7 @@
},
"query": "CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)"
},
"830392e8e03b8e34490df87905873ffea0749f6d321bd32a99b90766d3d7e167": {
"98d4c97eeb4f0d14fd9284a968e88837456117a1c1995df37e615902b482ff10": {
"describe": {
"columns": [
{
@ -77,7 +77,7 @@
"type_info": "Text"
},
{
"name": "market!",
"name": "market_name!",
"ordinal": 3,
"type_info": "Text"
},
@ -131,9 +131,9 @@
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market as \"market!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market = $1\n and resolution = $2\n and complete = true\n ORDER BY start_time desc LIMIT 1"
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n ORDER BY start_time asc LIMIT 1"
},
"900d4bd7a81a308648cd47eecb3b86b5e7afbbdc34e93ef35393ceab00fb8552": {
"a69ebc1820a6635f83a99fddc1bf8ae0c7a3184de6d38e129baa6397c61efc60": {
"describe": {
"columns": [
{
@ -152,82 +152,7 @@
"type_info": "Text"
},
{
"name": "market!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
"Text",
"Text"
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market as \"market!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market = $1\n and resolution = $2\n ORDER BY start_time asc LIMIT 1"
},
"aa9adbbc7f215e28cc07b615f30ad9d3f415f25260cc87a1556225f01e0ef3be": {
"describe": {
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market!",
"name": "market_name!",
"ordinal": 3,
"type_info": "Text"
},
@ -283,7 +208,17 @@
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market as \"market!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n and complete = true\n ORDER BY start_time asc"
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n and complete = true\n ORDER BY start_time asc"
},
"b259d64b388eb675b727ee511529472177b59ea616041360217afc2d928f33ed": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)"
},
"b71ec8e5c041cec2c83778f31f7dc723cc1b5a8b6bb0e2c7f8ba7ef31f117965": {
"describe": {
@ -387,7 +322,7 @@
},
"query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1 \n and maker = true\n ORDER BY time asc LIMIT 1"
},
"ebf9f73491ea62c20a25245080abb0be928e22a0d622fafa48bb01db34e84b94": {
"e94788e9eb04534dc13a73f80255705fb39789caa6dfb43e8417471f8399bb85": {
"describe": {
"columns": [],
"nullable": [],
@ -395,16 +330,81 @@
"Left": []
}
},
"query": "CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market, start_time, resolution)"
"query": "DO $$\n BEGIN\n IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN\n ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution);\n END IF;\n END $$"
},
"ef6422f34cc3e649a90fbbbc6ad668de6ee4b0994c52f72d0295985085d7047b": {
"fc5b19647fbdffb44ab87517ca2a6787f8eab3cc59a1633551524acab44425b6": {
"describe": {
"columns": [],
"nullable": [],
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market_name!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": []
"Left": [
"Text",
"Text"
]
}
},
"query": "CREATE TABLE IF NOT EXISTS candles (\n id serial,\n market text,\n start_time timestamptz,\n end_time timestamptz,\n resolution text,\n open numeric,\n close numeric,\n high numeric,\n low numeric,\n volume numeric,\n complete bool\n )"
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and complete = true\n ORDER BY start_time desc LIMIT 1"
}
}

View File

@ -13,11 +13,11 @@ use crate::{
pub async fn batch_higher_order_candles(
pool: &Pool<Postgres>,
market_address_string: &str,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Vec<Candle>> {
let latest_candle =
fetch_latest_finished_candle(pool, market_address_string, resolution).await?;
fetch_latest_finished_candle(pool, market_name, resolution).await?;
match latest_candle {
Some(candle) => {
@ -27,7 +27,7 @@ pub async fn batch_higher_order_candles(
// println!("start_time: {:?}", start_time);
let mut constituent_candles = fetch_candles_from(
pool,
market_address_string,
market_name,
resolution.get_constituent_resolution(),
start_time,
end_time,
@ -47,7 +47,7 @@ pub async fn batch_higher_order_candles(
None => {
let constituent_candle = fetch_earliest_candle(
pool,
market_address_string,
market_name,
resolution.get_constituent_resolution(),
)
.await?;
@ -55,7 +55,7 @@ pub async fn batch_higher_order_candles(
println!(
"Batching {}, but no candles found for: {:?}, {}",
resolution,
market_address_string,
market_name,
resolution.get_constituent_resolution()
);
return Ok(Vec::new());
@ -68,7 +68,7 @@ pub async fn batch_higher_order_candles(
let mut constituent_candles = fetch_candles_from(
pool,
market_address_string,
market_name,
resolution.get_constituent_resolution(),
start_time,
end_time,
@ -104,7 +104,7 @@ fn combine_into_higher_order_candles(
let candles_len = constituent_candles.len();
let empty_candle =
Candle::create_empty_candle(constituent_candles[0].market.clone(), target_resolution);
Candle::create_empty_candle(constituent_candles[0].market_name.clone(), target_resolution);
let mut combined_candles =
vec![empty_candle; (day().num_minutes() / duration.num_minutes()) as usize];

View File

@ -15,9 +15,10 @@ pub async fn batch_1m_candles(
pool: &Pool<Postgres>,
market: MarketInfo,
) -> anyhow::Result<Vec<Candle>> {
let market_address_string = &market.address;
let market_name = &market.name;
let market_address = &market.address;
let latest_candle =
fetch_latest_finished_candle(pool, market_address_string, Resolution::R1m).await?;
fetch_latest_finished_candle(pool, market_name, Resolution::R1m).await?;
match latest_candle {
Some(candle) => {
@ -27,7 +28,7 @@ pub async fn batch_1m_candles(
Utc::now().duration_trunc(Duration::minutes(1))?,
);
let mut fills =
fetch_fills_from(pool, market_address_string, start_time, end_time).await?;
fetch_fills_from(pool, market_address, start_time, end_time).await?;
let candles = combine_fills_into_1m_candles(
&mut fills,
market,
@ -38,10 +39,10 @@ pub async fn batch_1m_candles(
Ok(candles)
}
None => {
let earliest_fill = fetch_earliest_fill(pool, market_address_string).await?;
let earliest_fill = fetch_earliest_fill(pool, market_address).await?;
if earliest_fill.is_none() {
println!("No fills found for: {:?}", market_address_string);
println!("No fills found for: {:?}", market_name);
return Ok(Vec::new());
}
@ -54,7 +55,7 @@ pub async fn batch_1m_candles(
Utc::now().duration_trunc(Duration::minutes(1))?,
);
let mut fills =
fetch_fills_from(pool, market_address_string, start_time, end_time).await?;
fetch_fills_from(pool, market_address, start_time, end_time).await?;
let candles =
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
Ok(candles)
@ -69,7 +70,7 @@ fn combine_fills_into_1m_candles(
et: DateTime<Utc>,
maybe_last_price: Option<Decimal>,
) -> Vec<Candle> {
let empty_candle = Candle::create_empty_candle(market.address, Resolution::R1m);
let empty_candle = Candle::create_empty_candle(market.name, Resolution::R1m);
let minutes = (et - st).num_minutes();
let mut candles = vec![empty_candle; minutes as usize];

View File

@ -1,11 +1,11 @@
use std::{collections::HashMap, str::FromStr};
use openbook_candles::candle_batching::batch_candles;
use openbook_candles::candle_creation::candle_batching::batch_candles;
use openbook_candles::database::{
insert::{persist_candles, persist_fill_events},
initialize::{connect_to_database, setup_database},
Candle,
};
use openbook_candles::trade_fetching::{
use openbook_candles::candle_creation::trade_fetching::{
backfill::backfill,
parsing::OpenBookFillEventLog,
scrape::{fetch_market_infos, scrape},

View File

@ -61,7 +61,7 @@ pub async fn fetch_fills_from(
pub async fn fetch_latest_finished_candle(
pool: &Pool<Postgres>,
market_address_string: &str,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Option<Candle>> {
sqlx::query_as!(
@ -70,7 +70,7 @@ pub async fn fetch_latest_finished_candle(
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
market as "market!",
market_name as "market_name!",
open as "open!",
close as "close!",
high as "high!",
@ -78,11 +78,11 @@ pub async fn fetch_latest_finished_candle(
volume as "volume!",
complete as "complete!"
from candles
where market = $1
where market_name = $1
and resolution = $2
and complete = true
ORDER BY start_time desc LIMIT 1"#,
market_address_string,
market_name,
resolution.to_string()
)
.fetch_optional(pool)
@ -92,7 +92,7 @@ pub async fn fetch_latest_finished_candle(
pub async fn fetch_earliest_candle(
pool: &Pool<Postgres>,
market_address_string: &str,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Option<Candle>> {
sqlx::query_as!(
@ -101,7 +101,7 @@ pub async fn fetch_earliest_candle(
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
market as "market!",
market_name as "market_name!",
open as "open!",
close as "close!",
high as "high!",
@ -109,10 +109,10 @@ pub async fn fetch_earliest_candle(
volume as "volume!",
complete as "complete!"
from candles
where market = $1
where market_name = $1
and resolution = $2
ORDER BY start_time asc LIMIT 1"#,
market_address_string,
market_name,
resolution.to_string()
)
.fetch_optional(pool)
@ -133,7 +133,7 @@ pub async fn fetch_candles_from(
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
market as "market!",
market_name as "market_name!",
open as "open!",
close as "close!",
high as "high!",
@ -141,7 +141,7 @@ pub async fn fetch_candles_from(
volume as "volume!",
complete as "complete!"
from candles
where market = $1
where market_name = $1
and resolution = $2
and start_time >= $3
and end_time <= $4

View File

@ -40,7 +40,7 @@ pub async fn create_candles_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
sqlx::query!(
"CREATE TABLE IF NOT EXISTS candles (
id serial,
market text,
market_name text,
start_time timestamptz,
end_time timestamptz,
resolution text,
@ -56,14 +56,14 @@ pub async fn create_candles_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
.await?;
sqlx::query!(
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market, start_time, resolution)"
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)"
).execute(&mut tx).await?;
sqlx::query!(
"DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN
ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market, start_time, resolution);
ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution);
END IF;
END $$"
)

View File

@ -108,11 +108,11 @@ pub async fn persist_candles(pool: Pool<Postgres>, mut candles_receiver: Receive
}
fn build_candes_upsert_statement(candles: Vec<Candle>) -> String {
let mut stmt = String::from("INSERT INTO candles (market, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
let mut stmt = String::from("INSERT INTO candles (market_name, start_time, end_time, resolution, open, close, high, low, volume, complete) VALUES");
for (idx, candle) in candles.iter().enumerate() {
let val_str = format!(
"(\'{}\', \'{}\', \'{}\', \'{}\', {}, {}, {}, {}, {}, {})",
candle.market,
candle.market_name,
candle.start_time.to_rfc3339(),
candle.end_time.to_rfc3339(),
candle.resolution,
@ -131,7 +131,7 @@ fn build_candes_upsert_statement(candles: Vec<Candle>) -> String {
}
}
let handle_conflict = "ON CONFLICT (market, start_time, resolution)
let handle_conflict = "ON CONFLICT (market_name, start_time, resolution)
DO UPDATE SET
open=excluded.open,
close=excluded.close,

View File

@ -76,7 +76,7 @@ impl Resolution {
#[derive(Clone, Debug)]
pub struct Candle {
pub market: String,
pub market_name: String,
pub start_time: DateTime<Utc>,
pub end_time: DateTime<Utc>,
pub resolution: String,
@ -89,9 +89,9 @@ pub struct Candle {
}
impl Candle {
pub fn create_empty_candle(market: String, resolution: Resolution) -> Candle {
pub fn create_empty_candle(market_name: String, resolution: Resolution) -> Candle {
Candle {
market,
market_name,
start_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
end_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
resolution: resolution.to_string(),