refactor: markets are held in memory
This commit is contained in:
parent
1036d2b26e
commit
16789aa0b0
|
@ -1,8 +1,5 @@
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use sqlx::{
|
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
|
||||||
postgres::{PgPoolOptions, PgQueryResult},
|
|
||||||
Executor, Pool, Postgres,
|
|
||||||
};
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::hash_map::DefaultHasher,
|
collections::hash_map::DefaultHasher,
|
||||||
hash::{Hash, Hasher},
|
hash::{Hash, Hasher},
|
||||||
|
@ -12,9 +9,11 @@ use tokio::sync::mpsc::{error::TryRecvError, Receiver};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
trade_fetching::parsing::OpenBookFillEventLog,
|
trade_fetching::parsing::OpenBookFillEventLog,
|
||||||
utils::{AnyhowWrap, Config, MarketInfo},
|
utils::{AnyhowWrap, Config},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use super::MarketInfo;
|
||||||
|
|
||||||
pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool<Postgres>> {
|
pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool<Postgres>> {
|
||||||
loop {
|
loop {
|
||||||
let pool = PgPoolOptions::new()
|
let pool = PgPoolOptions::new()
|
||||||
|
@ -33,8 +32,7 @@ pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool<Postgre
|
||||||
pub async fn setup_database(pool: &Pool<Postgres>, markets: Vec<MarketInfo>) -> anyhow::Result<()> {
|
pub async fn setup_database(pool: &Pool<Postgres>, markets: Vec<MarketInfo>) -> anyhow::Result<()> {
|
||||||
let candles_table_fut = create_candles_table(pool);
|
let candles_table_fut = create_candles_table(pool);
|
||||||
let fills_table_fut = create_fills_table(pool);
|
let fills_table_fut = create_fills_table(pool);
|
||||||
let markets_table_fut = create_markets_table(pool, markets);
|
let result = tokio::try_join!(candles_table_fut, fills_table_fut);
|
||||||
let result = tokio::try_join!(candles_table_fut, fills_table_fut, markets_table_fut);
|
|
||||||
match result {
|
match result {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
println!("Successfully configured database");
|
println!("Successfully configured database");
|
||||||
|
@ -62,7 +60,7 @@ pub async fn create_candles_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
|
||||||
high numeric,
|
high numeric,
|
||||||
low numeric,
|
low numeric,
|
||||||
volume numeric,
|
volume numeric,
|
||||||
vwap numeric
|
complete bool
|
||||||
)",
|
)",
|
||||||
)
|
)
|
||||||
.execute(&mut tx)
|
.execute(&mut tx)
|
||||||
|
@ -78,90 +76,38 @@ pub async fn create_candles_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
|
||||||
pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
|
pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
|
||||||
let mut tx = pool.begin().await.map_err_anyhow()?;
|
let mut tx = pool.begin().await.map_err_anyhow()?;
|
||||||
|
|
||||||
sqlx::query(
|
sqlx::query!(
|
||||||
"CREATE TABLE IF NOT EXISTS fills (
|
"CREATE TABLE IF NOT EXISTS fills (
|
||||||
id numeric PRIMARY KEY,
|
id numeric PRIMARY KEY,
|
||||||
time timestamptz,
|
time timestamptz not null,
|
||||||
market text,
|
market text not null,
|
||||||
open_orders text,
|
open_orders text not null,
|
||||||
open_orders_owner text,
|
open_orders_owner text not null,
|
||||||
bid bool,
|
bid bool not null,
|
||||||
maker bool,
|
maker bool not null,
|
||||||
native_qty_paid numeric,
|
native_qty_paid numeric not null,
|
||||||
native_qty_received numeric,
|
native_qty_received numeric not null,
|
||||||
native_fee_or_rebate numeric,
|
native_fee_or_rebate numeric not null,
|
||||||
fee_tier text,
|
fee_tier text not null,
|
||||||
order_id text,
|
order_id text not null,
|
||||||
client_order_id numeric,
|
client_order_id numeric not null,
|
||||||
referrer_rebate numeric
|
referrer_rebate numeric not null
|
||||||
)",
|
)",
|
||||||
)
|
)
|
||||||
.execute(&mut tx)
|
.execute(&mut tx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)")
|
sqlx::query!("CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)")
|
||||||
.execute(&mut tx)
|
.execute(&mut tx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)")
|
sqlx::query!("CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)")
|
||||||
.execute(&mut tx)
|
.execute(&mut tx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
tx.commit().await.map_err_anyhow()
|
tx.commit().await.map_err_anyhow()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn create_markets_table(
|
|
||||||
pool: &Pool<Postgres>,
|
|
||||||
markets: Vec<MarketInfo>,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let mut tx = pool.begin().await.map_err_anyhow()?;
|
|
||||||
|
|
||||||
sqlx::query(
|
|
||||||
"CREATE TABLE IF NOT EXISTS markets (
|
|
||||||
market_name text PRIMARY KEY,
|
|
||||||
address text,
|
|
||||||
base_decimals numeric,
|
|
||||||
quote_decimals numeric,
|
|
||||||
base_lot_size numeric,
|
|
||||||
quote_lot_size numeric
|
|
||||||
)",
|
|
||||||
)
|
|
||||||
.execute(&mut tx)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let insert_statement = build_markets_insert_statement(markets);
|
|
||||||
sqlx::query(&insert_statement).execute(&mut tx).await?;
|
|
||||||
|
|
||||||
tx.commit().await.map_err_anyhow()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn build_markets_insert_statement(markets: Vec<MarketInfo>) -> String {
|
|
||||||
let mut stmt = String::from("INSERT INTO markets (market_name, address, base_decimals, quote_decimals, base_lot_size, quote_lot_size) VALUES");
|
|
||||||
for (idx, market) in markets.iter().enumerate() {
|
|
||||||
let val_str = format!(
|
|
||||||
"(\'{}\', \'{}\', {}, {}, {}, {})",
|
|
||||||
market.name,
|
|
||||||
market.address,
|
|
||||||
market.base_decimals,
|
|
||||||
market.quote_decimals,
|
|
||||||
market.base_lot_size,
|
|
||||||
market.quote_lot_size,
|
|
||||||
);
|
|
||||||
|
|
||||||
if idx == 0 {
|
|
||||||
stmt = format!("{} {}", &stmt, val_str);
|
|
||||||
} else {
|
|
||||||
stmt = format!("{}, {}", &stmt, val_str);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let handle_conflict = "ON CONFLICT (market_name) DO UPDATE SET address=excluded.address";
|
|
||||||
|
|
||||||
stmt = format!("{} {}", stmt, handle_conflict);
|
|
||||||
print!("{}", stmt);
|
|
||||||
stmt
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn save_candles() {
|
pub async fn save_candles() {
|
||||||
unimplemented!("TODO");
|
unimplemented!("TODO");
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,16 +26,6 @@ pub struct MarketConfig {
|
||||||
pub address: String,
|
pub address: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct MarketInfo {
|
|
||||||
pub name: String,
|
|
||||||
pub address: String,
|
|
||||||
pub base_decimals: u8,
|
|
||||||
pub quote_decimals: u8,
|
|
||||||
pub base_lot_size: u64,
|
|
||||||
pub quote_lot_size: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn load_markets(path: &str) -> Vec<MarketConfig> {
|
pub fn load_markets(path: &str) -> Vec<MarketConfig> {
|
||||||
let reader = File::open(path).unwrap();
|
let reader = File::open(path).unwrap();
|
||||||
serde_json::from_reader(reader).unwrap()
|
serde_json::from_reader(reader).unwrap()
|
||||||
|
|
Loading…
Reference in New Issue