diff --git a/candle-creator/src/database/database.rs b/candle-creator/src/database/database.rs index 8978092..9641b5f 100644 --- a/candle-creator/src/database/database.rs +++ b/candle-creator/src/database/database.rs @@ -1,8 +1,5 @@ use chrono::Utc; -use sqlx::{ - postgres::{PgPoolOptions, PgQueryResult}, - Executor, Pool, Postgres, -}; +use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, @@ -12,9 +9,11 @@ use tokio::sync::mpsc::{error::TryRecvError, Receiver}; use crate::{ trade_fetching::parsing::OpenBookFillEventLog, - utils::{AnyhowWrap, Config, MarketInfo}, + utils::{AnyhowWrap, Config}, }; +use super::MarketInfo; + pub async fn connect_to_database(config: &Config) -> anyhow::Result> { loop { let pool = PgPoolOptions::new() @@ -33,8 +32,7 @@ pub async fn connect_to_database(config: &Config) -> anyhow::Result, markets: Vec) -> anyhow::Result<()> { let candles_table_fut = create_candles_table(pool); let fills_table_fut = create_fills_table(pool); - let markets_table_fut = create_markets_table(pool, markets); - let result = tokio::try_join!(candles_table_fut, fills_table_fut, markets_table_fut); + let result = tokio::try_join!(candles_table_fut, fills_table_fut); match result { Ok(_) => { println!("Successfully configured database"); @@ -62,7 +60,7 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> { high numeric, low numeric, volume numeric, - vwap numeric + complete bool )", ) .execute(&mut tx) @@ -78,90 +76,38 @@ pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> { pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> { let mut tx = pool.begin().await.map_err_anyhow()?; - sqlx::query( + sqlx::query!( "CREATE TABLE IF NOT EXISTS fills ( id numeric PRIMARY KEY, - time timestamptz, - market text, - open_orders text, - open_orders_owner text, - bid bool, - maker bool, - native_qty_paid numeric, - native_qty_received numeric, - native_fee_or_rebate numeric, - fee_tier text, - order_id text, - client_order_id numeric, - referrer_rebate numeric + time timestamptz not null, + market text not null, + open_orders text not null, + open_orders_owner text not null, + bid bool not null, + maker bool not null, + native_qty_paid numeric not null, + native_qty_received numeric not null, + native_fee_or_rebate numeric not null, + fee_tier text not null, + order_id text not null, + client_order_id numeric not null, + referrer_rebate numeric not null )", ) .execute(&mut tx) .await?; - sqlx::query("CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)") + sqlx::query!("CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)") .execute(&mut tx) .await?; - sqlx::query("CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)") + sqlx::query!("CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)") .execute(&mut tx) .await?; tx.commit().await.map_err_anyhow() } -pub async fn create_markets_table( - pool: &Pool, - markets: Vec, -) -> anyhow::Result<()> { - let mut tx = pool.begin().await.map_err_anyhow()?; - - sqlx::query( - "CREATE TABLE IF NOT EXISTS markets ( - market_name text PRIMARY KEY, - address text, - base_decimals numeric, - quote_decimals numeric, - base_lot_size numeric, - quote_lot_size numeric - )", - ) - .execute(&mut tx) - .await?; - - let insert_statement = build_markets_insert_statement(markets); - sqlx::query(&insert_statement).execute(&mut tx).await?; - - tx.commit().await.map_err_anyhow() -} - -fn build_markets_insert_statement(markets: Vec) -> String { - let mut stmt = String::from("INSERT INTO markets (market_name, address, base_decimals, quote_decimals, base_lot_size, quote_lot_size) VALUES"); - for (idx, market) in markets.iter().enumerate() { - let val_str = format!( - "(\'{}\', \'{}\', {}, {}, {}, {})", - market.name, - market.address, - market.base_decimals, - market.quote_decimals, - market.base_lot_size, - market.quote_lot_size, - ); - - if idx == 0 { - stmt = format!("{} {}", &stmt, val_str); - } else { - stmt = format!("{}, {}", &stmt, val_str); - } - } - - let handle_conflict = "ON CONFLICT (market_name) DO UPDATE SET address=excluded.address"; - - stmt = format!("{} {}", stmt, handle_conflict); - print!("{}", stmt); - stmt -} - pub async fn save_candles() { unimplemented!("TODO"); } diff --git a/candle-creator/src/utils/mod.rs b/candle-creator/src/utils/mod.rs index d2a9f79..8985279 100644 --- a/candle-creator/src/utils/mod.rs +++ b/candle-creator/src/utils/mod.rs @@ -26,16 +26,6 @@ pub struct MarketConfig { pub address: String, } -#[derive(Debug)] -pub struct MarketInfo { - pub name: String, - pub address: String, - pub base_decimals: u8, - pub quote_decimals: u8, - pub base_lot_size: u64, - pub quote_lot_size: u64, -} - pub fn load_markets(path: &str) -> Vec { let reader = File::open(path).unwrap(); serde_json::from_reader(reader).unwrap()