Compare commits

...

8 Commits

Author SHA1 Message Date
Riordan Panayides 3a515d61a0 Merge remote-tracking branch 'upstream/main' into pan/metrics 2023-05-30 18:03:20 +01:00
Riordan Panayides 35937c9572 Add metrics to worker, add market name to map, cargo fmt, fix RAY/USDC id 2023-05-30 18:01:47 +01:00
Riordan Panayides 9489bd3e78 Add RAY/USDC market 2023-05-30 12:16:20 +01:00
dboures 93d95005f5
chore: remove stale sqlx file + update markets.json 2023-05-29 23:35:47 -05:00
dboures 9c7a42f83e
refactor: don't use prepared statements 2023-05-29 23:35:06 -05:00
dboures b1999c0061
fix: /tickers returns last prices if stale 2023-05-29 23:31:57 -05:00
Riordan Panayides ac2e906688 Enable fly metric polling 2023-05-29 20:48:53 +01:00
Riordan Panayides 522e7a5865 Add mango mainnet markets 2023-05-29 20:42:26 +01:00
17 changed files with 165 additions and 668 deletions

1
Cargo.lock generated
View File

@ -3451,6 +3451,7 @@ dependencies = [
"env_logger 0.10.0", "env_logger 0.10.0",
"futures 0.3.27", "futures 0.3.27",
"jsonrpc-core-client", "jsonrpc-core-client",
"lazy_static",
"log 0.4.17", "log 0.4.17",
"native-tls", "native-tls",
"num-traits", "num-traits",

View File

@ -70,3 +70,4 @@ num_enum = "0.6.1"
config = "0.13.1" config = "0.13.1"
prometheus = "0.13.3" prometheus = "0.13.3"
lazy_static = "1.4.0"

View File

@ -17,3 +17,7 @@ kill_timeout = 30
hard_limit = 1024 hard_limit = 1024
soft_limit = 1024 soft_limit = 1024
type = "connections" type = "connections"
[metrics]
port = 9091
path = "/metrics"

View File

@ -7,3 +7,7 @@ kill_timeout = 30
[experimental] [experimental]
cmd = ["worker", "markets.json"] cmd = ["worker", "markets.json"]
[metrics]
port = 9091
path = "/metrics"

View File

@ -4,43 +4,35 @@
"address" : "8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6" "address" : "8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6"
}, },
{ {
"name" : "RLB/USDC", "name" : "wBTCpo/USDC",
"address" : "72h8rWaWwfPUL36PAFqyQZU8RT1V3FKG7Nc45aK89xTs" "address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN"
}, },
{ {
"name" : "MNGO/USDC", "name" : "MNGO/USDC",
"address" : "3NnxQvDcZXputNMxaxsGvqiKpqgPfSYXpNigZNFcknmD" "address" : "3NnxQvDcZXputNMxaxsGvqiKpqgPfSYXpNigZNFcknmD"
}, },
{ {
"name" : "BONK/SOL", "name": "BONK/SOL",
"address" : "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF" "address": "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF"
}, },
{ {
"name" : "WBTC/USDC", "name": "DUAL/USDC",
"address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN" "address": "H6rrYK3SUHF2eguZCyJxnSBMJqjXhUtuaki6PHiutvum"
}, },
{ {
"name": "mSOL/USDC", "name": "mSOL/USDC",
"address": "9Lyhks5bQQxb9EyyX55NtgKQzpM4WK7JCmeaWuQ5MoXD" "address": "9Lyhks5bQQxb9EyyX55NtgKQzpM4WK7JCmeaWuQ5MoXD"
}, },
{ {
"name": "SOL/USDT", "name": "ETHpo/USDC",
"address": "2AdaV97p6SfkuMQJdu8DHhBhmJe7oWdvbm52MJfYQmfA"
},
{
"name": "USDT/USDC",
"address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu"
},
{
"name": "ETH/USDC",
"address": "BbJgE7HZMaDp5NTYvRh5jZSkQPVDTU8ubPFtpogUkEj4" "address": "BbJgE7HZMaDp5NTYvRh5jZSkQPVDTU8ubPFtpogUkEj4"
}, },
{
"name": "BONK/USDC",
"address": "8PhnCfgqpgFM7ZJvttGdBVMXHuU4Q23ACxCvWkbs1M71"
},
{ {
"name": "RAY/USDC", "name": "RAY/USDC",
"address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj" "address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj"
},
{
"name": "RAY/USDT",
"address": "GpHbiJJ9VHiuHVXeoet121Utrbm1CSNNzYrBKB8Xz2oz"
} }
] ]

View File

@ -1,526 +0,0 @@
{
"db": "PostgreSQL",
"21b633d5aec33394129b051ea1df0ee9ab097626d74d8943f6323f9fb42723b5": {
"describe": {
"columns": [
{
"name": "open_orders_owner",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "raw_ask_size!",
"ordinal": 1,
"type_info": "Numeric"
},
{
"name": "raw_bid_size!",
"ordinal": 2,
"type_info": "Numeric"
}
],
"nullable": [
false,
null,
null
],
"parameters": {
"Left": [
"Text",
"Timestamptz",
"Timestamptz"
]
}
},
"query": "SELECT \n open_orders_owner, \n sum(\n native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END\n ) as \"raw_ask_size!\",\n sum(\n native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END\n ) as \"raw_bid_size!\"\n FROM fills\n WHERE market = $1\n AND time >= $2\n AND time < $3\n GROUP BY open_orders_owner\n ORDER BY \n sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END) \n + \n sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) \nDESC \nLIMIT 10000"
},
"35e8220c601aca620da1cfcb978c8b7a64dcbf15550521b418cf509015cd88d8": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE TABLE IF NOT EXISTS fills (\n id numeric PRIMARY KEY,\n time timestamptz not null,\n market text not null,\n open_orders text not null,\n open_orders_owner text not null,\n bid bool not null,\n maker bool not null,\n native_qty_paid numeric not null,\n native_qty_received numeric not null,\n native_fee_or_rebate numeric not null,\n fee_tier text not null,\n order_id text not null\n )"
},
"409267a0a1c925b3723396b1534b17ccdd7a27aac7bbcfaef159dbc6e3005625": {
"describe": {
"columns": [
{
"name": "address!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "raw_quote_size!",
"ordinal": 1,
"type_info": "Numeric"
},
{
"name": "raw_base_size!",
"ordinal": 2,
"type_info": "Numeric"
}
],
"nullable": [
false,
null,
null
],
"parameters": {
"Left": []
}
},
"query": "select market as \"address!\",\n sum(native_qty_paid) as \"raw_quote_size!\",\n sum(native_qty_received) as \"raw_base_size!\"\n from fills \n where \"time\" >= current_timestamp - interval '1 day' \n and bid = true\n group by market"
},
"4bab7d4329b2969b2ba610546c660207740c9bafe644df55fa57101df30e4899": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)"
},
"6658c0121e5a7defbd1fe7c549ca0a957b188b9eb1837573a05d0e6476ef945a": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE TABLE IF NOT EXISTS candles (\n id serial,\n market_name text,\n start_time timestamptz,\n end_time timestamptz,\n resolution text,\n open numeric,\n close numeric,\n high numeric,\n low numeric,\n volume numeric,\n complete bool\n )"
},
"817ee7903cb5095f85fb787beff04ace3a452cf8749205bb230e41d8c9e03c4a": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)"
},
"81d619e2680874afff756031aa4fef16678a7ea226a259e1bdb316bf52478939": {
"describe": {
"columns": [
{
"name": "market_name!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "high!",
"ordinal": 1,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 2,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 3,
"type_info": "Numeric"
}
],
"nullable": [
true,
null,
null,
true
],
"parameters": {
"Left": []
}
},
"query": "select \n g.market_name as \"market_name!\", \n g.high as \"high!\", \n g.low as \"low!\", \n c.\"close\" as \"close!\"\n from \n (\n SELECT \n market_name, \n max(start_time) as \"start_time\", \n max(high) as \"high\", \n min(low) as \"low\" \n from \n candles \n where \n \"resolution\" = '1M' \n and \"start_time\" >= current_timestamp - interval '1 day' \n group by \n market_name\n ) as g \n join candles c on g.market_name = c.market_name \n and g.start_time = c.start_time \n where \n c.resolution = '1M'"
},
"866773102b03b002e9d0535d3173f36264e1d30a46a5ec8240b0ea8076d6d1c5": {
"describe": {
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market_name!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
"Text",
"Text",
"Timestamptz",
"Timestamptz"
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n ORDER BY start_time asc"
},
"aee3a3e04f837bd62e263452bfbaf5d7dff271799c80d5efd22a54954ac212c4": {
"describe": {
"columns": [
{
"name": "open_orders_owner",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "raw_ask_size!",
"ordinal": 1,
"type_info": "Numeric"
},
{
"name": "raw_bid_size!",
"ordinal": 2,
"type_info": "Numeric"
}
],
"nullable": [
false,
null,
null
],
"parameters": {
"Left": [
"Text",
"Timestamptz",
"Timestamptz"
]
}
},
"query": "SELECT \n open_orders_owner, \n sum(\n native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END\n ) as \"raw_ask_size!\",\n sum(\n native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END\n ) as \"raw_bid_size!\"\n FROM fills\n WHERE market = $1\n AND time >= $2\n AND time < $3\n GROUP BY open_orders_owner\n ORDER BY \n sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END) \n + \n sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) \nDESC \nLIMIT 10000"
},
"b259d64b388eb675b727ee511529472177b59ea616041360217afc2d928f33ed": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)"
},
"b71ec8e5c041cec2c83778f31f7dc723cc1b5a8b6bb0e2c7f8ba7ef31f117965": {
"describe": {
"columns": [
{
"name": "time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "bid!",
"ordinal": 1,
"type_info": "Bool"
},
{
"name": "maker!",
"ordinal": 2,
"type_info": "Bool"
},
{
"name": "native_qty_paid!",
"ordinal": 3,
"type_info": "Numeric"
},
{
"name": "native_qty_received!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "native_fee_or_rebate!",
"ordinal": 5,
"type_info": "Numeric"
}
],
"nullable": [
false,
false,
false,
false,
false,
false
],
"parameters": {
"Left": [
"Text",
"Timestamptz",
"Timestamptz"
]
}
},
"query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1\n and time >= $2\n and time < $3 \n and maker = true\n ORDER BY time asc"
},
"dc7c7c04b6870b9617e1e869aa4b7027baddaeeb22f2792f2e9c40f643f863c7": {
"describe": {
"columns": [
{
"name": "time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "bid!",
"ordinal": 1,
"type_info": "Bool"
},
{
"name": "maker!",
"ordinal": 2,
"type_info": "Bool"
},
{
"name": "native_qty_paid!",
"ordinal": 3,
"type_info": "Numeric"
},
{
"name": "native_qty_received!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "native_fee_or_rebate!",
"ordinal": 5,
"type_info": "Numeric"
}
],
"nullable": [
false,
false,
false,
false,
false,
false
],
"parameters": {
"Left": [
"Text"
]
}
},
"query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1 \n and maker = true\n ORDER BY time asc LIMIT 1"
},
"e367fec686c10d361f4a5ac014ca34ce68544dfe9cf32b79a6a78790e8c6d5cc": {
"describe": {
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market_name!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
"Text",
"Text"
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n ORDER BY start_time asc"
},
"e94788e9eb04534dc13a73f80255705fb39789caa6dfb43e8417471f8399bb85": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "DO $$\n BEGIN\n IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN\n ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution);\n END IF;\n END $$"
},
"fc5b19647fbdffb44ab87517ca2a6787f8eab3cc59a1633551524acab44425b6": {
"describe": {
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market_name!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
"Text",
"Text"
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and complete = true\n ORDER BY start_time desc LIMIT 1"
}
}

View File

@ -1,12 +1,7 @@
use deadpool_postgres::Object; use deadpool_postgres::Object;
use openbook_candles::{ use openbook_candles::{
database::{ database::{initialize::connect_to_database, insert::build_candles_upsert_statement},
initialize::connect_to_database,
insert::{build_candles_upsert_statement},
},
structs::{ structs::{
candle::Candle, candle::Candle,
markets::{fetch_market_infos, load_markets}, markets::{fetch_market_infos, load_markets},
@ -18,10 +13,9 @@ use openbook_candles::{
minute_candles::backfill_batch_1m_candles, minute_candles::backfill_batch_1m_candles,
}, },
}; };
use std::{env}; use std::env;
use strum::IntoEnumIterator; use strum::IntoEnumIterator;
#[tokio::main(flavor = "multi_thread", worker_threads = 10)] #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok(); dotenv::dotenv().ok();

View File

@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> {
let market_infos = fetch_market_infos(&config, markets.clone()).await?; let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new(); let mut target_markets = HashMap::new();
for m in market_infos.clone() { for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, 0); target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
} }
println!("{:?}", target_markets); println!("{:?}", target_markets);
@ -57,7 +57,7 @@ async fn main() -> anyhow::Result<()> {
pub async fn backfill( pub async fn backfill(
rpc_url: String, rpc_url: String,
fill_sender: &Sender<OpenBookFillEvent>, fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, String>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
println!("backfill started"); println!("backfill started");
let mut before_sig: Option<Signature> = None; let mut before_sig: Option<Signature> = None;
@ -145,7 +145,7 @@ pub async fn get_transactions(
rpc_client: &RpcClient, rpc_client: &RpcClient,
mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>, mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>,
fill_sender: &Sender<OpenBookFillEvent>, fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, String>,
) { ) {
sigs.retain(|sig| sig.err.is_none()); sigs.retain(|sig| sig.err.is_none());
if sigs.last().is_none() { if sigs.last().is_none() {

View File

@ -14,9 +14,7 @@ pub async fn fetch_earliest_fill(
) -> anyhow::Result<Option<PgOpenBookFill>> { ) -> anyhow::Result<Option<PgOpenBookFill>> {
let client = pool.get().await?; let client = pool.get().await?;
let stmt = client let stmt = r#"SELECT
.prepare(
r#"SELECT
time as "time!", time as "time!",
bid as "bid!", bid as "bid!",
maker as "maker!", maker as "maker!",
@ -26,11 +24,9 @@ pub async fn fetch_earliest_fill(
from fills from fills
where market = $1 where market = $1
and maker = true and maker = true
ORDER BY time asc LIMIT 1"#, ORDER BY time asc LIMIT 1"#;
)
.await?;
let row = client.query_opt(&stmt, &[&market_address_string]).await?; let row = client.query_opt(stmt, &[&market_address_string]).await?;
match row { match row {
Some(r) => Ok(Some(PgOpenBookFill::from_row(r))), Some(r) => Ok(Some(PgOpenBookFill::from_row(r))),
@ -46,9 +42,7 @@ pub async fn fetch_fills_from(
) -> anyhow::Result<Vec<PgOpenBookFill>> { ) -> anyhow::Result<Vec<PgOpenBookFill>> {
let client = pool.get().await?; let client = pool.get().await?;
let stmt = client let stmt = r#"SELECT
.prepare(
r#"SELECT
time as "time!", time as "time!",
bid as "bid!", bid as "bid!",
maker as "maker!", maker as "maker!",
@ -60,17 +54,12 @@ pub async fn fetch_fills_from(
and time >= $2::timestamptz and time >= $2::timestamptz
and time < $3::timestamptz and time < $3::timestamptz
and maker = true and maker = true
ORDER BY time asc"#, ORDER BY time asc"#;
)
.await?;
let rows = client let rows = client
.query(&stmt, &[&market_address_string, &start_time, &end_time]) .query(stmt, &[&market_address_string, &start_time, &end_time])
.await?; .await?;
Ok(rows Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect())
.into_iter()
.map(PgOpenBookFill::from_row)
.collect())
} }
pub async fn fetch_latest_finished_candle( pub async fn fetch_latest_finished_candle(
@ -80,9 +69,7 @@ pub async fn fetch_latest_finished_candle(
) -> anyhow::Result<Option<Candle>> { ) -> anyhow::Result<Option<Candle>> {
let client = pool.get().await?; let client = pool.get().await?;
let stmt = client let stmt = r#"SELECT
.prepare(
r#"SELECT
market_name as "market_name!", market_name as "market_name!",
start_time as "start_time!", start_time as "start_time!",
end_time as "end_time!", end_time as "end_time!",
@ -97,12 +84,10 @@ pub async fn fetch_latest_finished_candle(
where market_name = $1 where market_name = $1
and resolution = $2 and resolution = $2
and complete = true and complete = true
ORDER BY start_time desc LIMIT 1"#, ORDER BY start_time desc LIMIT 1"#;
)
.await?;
let row = client let row = client
.query_opt(&stmt, &[&market_name, &resolution.to_string()]) .query_opt(stmt, &[&market_name, &resolution.to_string()])
.await?; .await?;
match row { match row {
@ -120,9 +105,7 @@ pub async fn fetch_earliest_candles(
) -> anyhow::Result<Vec<Candle>> { ) -> anyhow::Result<Vec<Candle>> {
let client = pool.get().await?; let client = pool.get().await?;
let stmt = client let stmt = r#"SELECT
.prepare(
r#"SELECT
market_name as "market_name!", market_name as "market_name!",
start_time as "start_time!", start_time as "start_time!",
end_time as "end_time!", end_time as "end_time!",
@ -136,12 +119,10 @@ pub async fn fetch_earliest_candles(
from candles from candles
where market_name = $1 where market_name = $1
and resolution = $2 and resolution = $2
ORDER BY start_time asc"#, ORDER BY start_time asc"#;
)
.await?;
let rows = client let rows = client
.query(&stmt, &[&market_name, &resolution.to_string()]) .query(stmt, &[&market_name, &resolution.to_string()])
.await?; .await?;
Ok(rows.into_iter().map(Candle::from_row).collect()) Ok(rows.into_iter().map(Candle::from_row).collect())
@ -156,9 +137,7 @@ pub async fn fetch_candles_from(
) -> anyhow::Result<Vec<Candle>> { ) -> anyhow::Result<Vec<Candle>> {
let client = pool.get().await?; let client = pool.get().await?;
let stmt = client let stmt = r#"SELECT
.prepare(
r#"SELECT
market_name as "market_name!", market_name as "market_name!",
start_time as "start_time!", start_time as "start_time!",
end_time as "end_time!", end_time as "end_time!",
@ -174,13 +153,11 @@ pub async fn fetch_candles_from(
and resolution = $2 and resolution = $2
and start_time >= $3 and start_time >= $3
and end_time <= $4 and end_time <= $4
ORDER BY start_time asc"#, ORDER BY start_time asc"#;
)
.await?;
let rows = client let rows = client
.query( .query(
&stmt, stmt,
&[ &[
&market_name, &market_name,
&resolution.to_string(), &resolution.to_string(),
@ -201,9 +178,7 @@ pub async fn fetch_top_traders_by_base_volume_from(
) -> anyhow::Result<Vec<PgTrader>> { ) -> anyhow::Result<Vec<PgTrader>> {
let client = pool.get().await?; let client = pool.get().await?;
let stmt = client let stmt = r#"SELECT
.prepare(
r#"SELECT
open_orders_owner, open_orders_owner,
sum( sum(
native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
@ -221,12 +196,10 @@ pub async fn fetch_top_traders_by_base_volume_from(
+ +
sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC DESC
LIMIT 10000"#, LIMIT 10000"#;
)
.await?;
let rows = client let rows = client
.query(&stmt, &[&market_address_string, &start_time, &end_time]) .query(stmt, &[&market_address_string, &start_time, &end_time])
.await?; .await?;
Ok(rows.into_iter().map(PgTrader::from_row).collect()) Ok(rows.into_iter().map(PgTrader::from_row).collect())
@ -240,9 +213,7 @@ pub async fn fetch_top_traders_by_quote_volume_from(
) -> anyhow::Result<Vec<PgTrader>> { ) -> anyhow::Result<Vec<PgTrader>> {
let client = pool.get().await?; let client = pool.get().await?;
let stmt = client let stmt = r#"SELECT
.prepare(
r#"SELECT
open_orders_owner, open_orders_owner,
sum( sum(
native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
@ -260,12 +231,10 @@ pub async fn fetch_top_traders_by_quote_volume_from(
+ +
sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC DESC
LIMIT 10000"#, LIMIT 10000"#;
)
.await?;
let rows = client let rows = client
.query(&stmt, &[&market_address_string, &start_time, &end_time]) .query(stmt, &[&market_address_string, &start_time, &end_time])
.await?; .await?;
Ok(rows.into_iter().map(PgTrader::from_row).collect()) Ok(rows.into_iter().map(PgTrader::from_row).collect())
@ -273,22 +242,32 @@ pub async fn fetch_top_traders_by_quote_volume_from(
pub async fn fetch_coingecko_24h_volume( pub async fn fetch_coingecko_24h_volume(
pool: &Pool, pool: &Pool,
market_address_strings: &Vec<&str>,
) -> anyhow::Result<Vec<PgCoinGecko24HourVolume>> { ) -> anyhow::Result<Vec<PgCoinGecko24HourVolume>> {
let client = pool.get().await?; let client = pool.get().await?;
let stmt = client let stmt = r#"SELECT
.prepare( t1.market,
r#"select market as "address!", COALESCE(t2.native_qty_received, 0) as "raw_base_size!",
sum(native_qty_received) as "raw_base_size!", COALESCE(t2.native_qty_paid, 0) as "raw_quote_size!"
sum(native_qty_paid) as "raw_quote_size!" FROM (
from fills SELECT distinct on (market) *
where "time" >= current_timestamp - interval '1 day' FROM fills f
and bid = true where bid = true
group by market"#, and market = any($1)
) order by market, "time" desc
.await?; ) t1
LEFT JOIN (
select market,
sum(native_qty_received) as "native_qty_received",
sum(native_qty_paid) as "native_qty_paid"
from fills
where "time" >= current_timestamp - interval '1 day'
and bid = true
group by market
) t2 ON t1.market = t2.market"#;
let rows = client.query(&stmt, &[]).await?; let rows = client.query(stmt, &[&market_address_strings]).await?;
Ok(rows Ok(rows
.into_iter() .into_iter()
@ -298,39 +277,43 @@ pub async fn fetch_coingecko_24h_volume(
pub async fn fetch_coingecko_24h_high_low( pub async fn fetch_coingecko_24h_high_low(
pool: &Pool, pool: &Pool,
market_names: &Vec<&str>,
) -> anyhow::Result<Vec<PgCoinGecko24HighLow>> { ) -> anyhow::Result<Vec<PgCoinGecko24HighLow>> {
let client = pool.get().await?; let client = pool.get().await?;
let stmt = client let stmt = r#"select
.prepare( r.market_name as "market_name!",
r#"select coalesce(c.high, r.high) as "high!",
g.market_name as "market_name!", coalesce(c.low, r.low) as "low!",
g.high as "high!", r."close" as "close!"
g.low as "low!",
c."close" as "close!"
from
(
SELECT
market_name,
max(start_time) as "start_time",
max(high) as "high",
min(low) as "low"
from from
candles (
where SELECT *
"resolution" = '1M' from
and "start_time" >= current_timestamp - interval '1 day' candles
group by where (market_name, start_time, resolution) in (
market_name select market_name, max(start_time), resolution
) as g from candles
join candles c on g.market_name = c.market_name where "resolution" = '1M'
and g.start_time = c.start_time and market_name = any($1)
where group by market_name, resolution
c.resolution = '1M'"#, )
) ) as r
.await?; left join (
SELECT
market_name,
max(start_time) as "start_time",
max(high) as "high",
min(low) as "low"
from
candles
where
"resolution" = '1M'
and "start_time" >= current_timestamp - interval '1 day'
group by market_name
) c on r.market_name = c.market_name"#;
let rows = client.query(&stmt, &[]).await?; let rows = client.query(stmt, &[&market_names]).await?;
Ok(rows Ok(rows
.into_iter() .into_iter()

View File

@ -36,6 +36,7 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
MakeTlsConnector::new( MakeTlsConnector::new(
TlsConnector::builder() TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_cert)?) .add_root_certificate(Certificate::from_pem(&ca_cert)?)
// TODO: make this configurable
.identity(Identity::from_pkcs12(&client_key, "pass")?) .identity(Identity::from_pkcs12(&client_key, "pass")?)
.danger_accept_invalid_certs(false) .danger_accept_invalid_certs(false)
.build()?, .build()?,

View File

@ -51,10 +51,12 @@ pub async fn pairs(context: web::Data<WebContext>) -> Result<HttpResponse, Serve
pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, ServerError> { pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, ServerError> {
// let client = RpcClient::new(context.rpc_url.clone()); // let client = RpcClient::new(context.rpc_url.clone());
let markets = &context.markets; let markets = &context.markets;
let market_names = markets.iter().map(|x| x.name.as_str()).collect();
let market_addresses = markets.iter().map(|x| x.address.as_str()).collect();
// let bba_fut = get_best_bids_and_asks(client, markets); // let bba_fut = get_best_bids_and_asks(client, markets);
let volume_fut = fetch_coingecko_24h_volume(&context.pool); let volume_fut = fetch_coingecko_24h_volume(&context.pool, &market_addresses);
let high_low_fut = fetch_coingecko_24h_high_low(&context.pool); let high_low_fut = fetch_coingecko_24h_high_low(&context.pool, &market_names);
let (volume_query, high_low_quey) = join!(volume_fut, high_low_fut,); let (volume_query, high_low_quey) = join!(volume_fut, high_low_fut,);

View File

@ -1,8 +1,9 @@
use actix_web::{ use actix_web::{
http::StatusCode,
middleware::Logger, middleware::Logger,
rt::System, rt::System,
web::{self, Data}, web::{self, Data},
App, HttpServer, http::StatusCode, App, HttpServer,
}; };
use actix_web_prom::PrometheusMetricsBuilder; use actix_web_prom::PrometheusMetricsBuilder;
use candles::get_candles; use candles::get_candles;
@ -92,13 +93,10 @@ async fn main() -> std::io::Result<()> {
// Thread to serve metrics endpoint privately // Thread to serve metrics endpoint privately
let private_server = thread::spawn(move || { let private_server = thread::spawn(move || {
let sys = System::new(); let sys = System::new();
let srv = HttpServer::new(move || { let srv = HttpServer::new(move || App::new().wrap(private_metrics.clone()))
App::new() .bind("0.0.0.0:9091")
.wrap(private_metrics.clone()) .unwrap()
}) .run();
.bind("0.0.0.0:9091")
.unwrap()
.run();
sys.block_on(srv).unwrap(); sys.block_on(srv).unwrap();
}); });

View File

@ -1,8 +1,8 @@
use openbook_candles::structs::candle::Candle; use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets}; use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEvent; use openbook_candles::structs::openbook::OpenBookFillEvent;
use openbook_candles::utils::Config; use openbook_candles::utils::Config;
use openbook_candles::worker::metrics::serve_metrics;
use openbook_candles::worker::trade_fetching::scrape::scrape; use openbook_candles::worker::trade_fetching::scrape::scrape;
use openbook_candles::{ use openbook_candles::{
database::{ database::{
@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> {
let market_infos = fetch_market_infos(&config, markets.clone()).await?; let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new(); let mut target_markets = HashMap::new();
for m in market_infos.clone() { for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, 0); target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
} }
println!("{:?}", target_markets); println!("{:?}", target_markets);
@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> {
setup_database(&pool).await?; setup_database(&pool).await?;
let mut handles = vec![]; let mut handles = vec![];
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000); let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(10000);
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
scrape(&config, &fill_sender, &target_markets).await; scrape(&config, &fill_sender, &target_markets).await;
@ -56,7 +56,7 @@ async fn main() -> anyhow::Result<()> {
} }
})); }));
let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(1000); let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(100000);
for market in market_infos.into_iter() { for market in market_infos.into_iter() {
let sender = candle_sender.clone(); let sender = candle_sender.clone();
@ -78,6 +78,11 @@ async fn main() -> anyhow::Result<()> {
} }
})); }));
handles.push(tokio::spawn(async move {
// TODO: this is ugly af
serve_metrics().await.unwrap().await.unwrap();
}));
futures::future::join_all(handles).await; futures::future::join_all(handles).await;
Ok(()) Ok(())

33
src/worker/metrics/mod.rs Normal file
View File

@ -0,0 +1,33 @@
use actix_web::{dev::Server, http::StatusCode, App, HttpServer};
use actix_web_prom::PrometheusMetricsBuilder;
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry};
lazy_static! {
static ref METRIC_REGISTRY: Registry =
Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap();
pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"fills_total",
"Total number of fills scraped",
&["market"],
METRIC_REGISTRY
)
.unwrap();
}
pub async fn serve_metrics() -> anyhow::Result<Server> {
let metrics = PrometheusMetricsBuilder::new("openbook_candles_worker")
.registry(METRIC_REGISTRY.clone())
.exclude("/metrics")
.exclude_status(StatusCode::NOT_FOUND)
.endpoint("/metrics")
.build()
.unwrap();
let server = HttpServer::new(move || App::new().wrap(metrics.clone()))
.bind("0.0.0.0:9091")
.unwrap()
.disable_signals()
.run();
Ok(server)
}

View File

@ -1,2 +1,3 @@
pub mod candle_batching; pub mod candle_batching;
pub mod metrics;
pub mod trade_fetching; pub mod trade_fetching;

View File

@ -11,7 +11,7 @@ const PROGRAM_DATA: &str = "Program data: ";
pub fn parse_trades_from_openbook_txns( pub fn parse_trades_from_openbook_txns(
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>, txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, String>,
) -> Vec<OpenBookFillEvent> { ) -> Vec<OpenBookFillEvent> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new(); let mut fills_vector = Vec::<OpenBookFillEvent>::new();
for txn in txns.iter_mut() { for txn in txns.iter_mut() {
@ -42,7 +42,7 @@ pub fn parse_trades_from_openbook_txns(
fn parse_openbook_fills_from_logs( fn parse_openbook_fills_from_logs(
logs: &Vec<String>, logs: &Vec<String>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, String>,
block_time: i64, block_time: i64,
) -> Option<Vec<OpenBookFillEvent>> { ) -> Option<Vec<OpenBookFillEvent>> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new(); let mut fills_vector = Vec::<OpenBookFillEvent>::new();

View File

@ -8,14 +8,16 @@ use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration}; use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use crate::{structs::openbook::OpenBookFillEvent, utils::Config}; use crate::{
structs::openbook::OpenBookFillEvent, utils::Config, worker::metrics::METRIC_FILLS_TOTAL,
};
use super::parsing::parse_trades_from_openbook_txns; use super::parsing::parse_trades_from_openbook_txns;
pub async fn scrape( pub async fn scrape(
config: &Config, config: &Config,
fill_sender: &Sender<OpenBookFillEvent>, fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, String>,
) { ) {
let rpc_client = let rpc_client =
RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed()); RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed());
@ -39,7 +41,7 @@ pub async fn scrape_transactions(
before_sig: Option<Signature>, before_sig: Option<Signature>,
limit: Option<usize>, limit: Option<usize>,
fill_sender: &Sender<OpenBookFillEvent>, fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>, target_markets: &HashMap<Pubkey, String>,
) -> Option<Signature> { ) -> Option<Signature> {
let rpc_config = GetConfirmedSignaturesForAddress2Config { let rpc_config = GetConfirmedSignaturesForAddress2Config {
before: before_sig, before: before_sig,
@ -96,9 +98,11 @@ pub async fn scrape_transactions(
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets); let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
if !fills.is_empty() { if !fills.is_empty() {
for fill in fills.into_iter() { for fill in fills.into_iter() {
let market_name = target_markets.get(&fill.market).unwrap();
if let Err(_) = fill_sender.send(fill).await { if let Err(_) = fill_sender.send(fill).await {
panic!("receiver dropped"); panic!("receiver dropped");
} }
METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
} }
} }