Compare commits

...

8 Commits

Author SHA1 Message Date
Riordan Panayides 3a515d61a0 Merge remote-tracking branch 'upstream/main' into pan/metrics 2023-05-30 18:03:20 +01:00
Riordan Panayides 35937c9572 Add metrics to worker, add market name to map, cargo fmt, fix RAY/USDC id 2023-05-30 18:01:47 +01:00
Riordan Panayides 9489bd3e78 Add RAY/USDC market 2023-05-30 12:16:20 +01:00
dboures 93d95005f5
chore: remove stale sqlx file + update markets.json 2023-05-29 23:35:47 -05:00
dboures 9c7a42f83e
refactor: don't use prepared statements 2023-05-29 23:35:06 -05:00
dboures b1999c0061
fix: /tickers returns last prices if stale 2023-05-29 23:31:57 -05:00
Riordan Panayides ac2e906688 Enable fly metric polling 2023-05-29 20:48:53 +01:00
Riordan Panayides 522e7a5865 Add mango mainnet markets 2023-05-29 20:42:26 +01:00
17 changed files with 165 additions and 668 deletions

1
Cargo.lock generated
View File

@ -3451,6 +3451,7 @@ dependencies = [
"env_logger 0.10.0",
"futures 0.3.27",
"jsonrpc-core-client",
"lazy_static",
"log 0.4.17",
"native-tls",
"num-traits",

View File

@ -70,3 +70,4 @@ num_enum = "0.6.1"
config = "0.13.1"
prometheus = "0.13.3"
lazy_static = "1.4.0"

View File

@ -17,3 +17,7 @@ kill_timeout = 30
hard_limit = 1024
soft_limit = 1024
type = "connections"
[metrics]
port = 9091
path = "/metrics"

View File

@ -7,3 +7,7 @@ kill_timeout = 30
[experimental]
cmd = ["worker", "markets.json"]
[metrics]
port = 9091
path = "/metrics"

View File

@ -4,43 +4,35 @@
"address" : "8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6"
},
{
"name" : "RLB/USDC",
"address" : "72h8rWaWwfPUL36PAFqyQZU8RT1V3FKG7Nc45aK89xTs"
"name" : "wBTCpo/USDC",
"address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN"
},
{
"name" : "MNGO/USDC",
"address" : "3NnxQvDcZXputNMxaxsGvqiKpqgPfSYXpNigZNFcknmD"
},
{
"name" : "BONK/SOL",
"address" : "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF"
"name": "BONK/SOL",
"address": "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF"
},
{
"name" : "WBTC/USDC",
"address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN"
"name": "DUAL/USDC",
"address": "H6rrYK3SUHF2eguZCyJxnSBMJqjXhUtuaki6PHiutvum"
},
{
"name": "mSOL/USDC",
"address": "9Lyhks5bQQxb9EyyX55NtgKQzpM4WK7JCmeaWuQ5MoXD"
},
{
"name": "SOL/USDT",
"address": "2AdaV97p6SfkuMQJdu8DHhBhmJe7oWdvbm52MJfYQmfA"
},
{
"name": "USDT/USDC",
"address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu"
},
{
"name": "ETH/USDC",
"name": "ETHpo/USDC",
"address": "BbJgE7HZMaDp5NTYvRh5jZSkQPVDTU8ubPFtpogUkEj4"
},
{
"name": "BONK/USDC",
"address": "8PhnCfgqpgFM7ZJvttGdBVMXHuU4Q23ACxCvWkbs1M71"
},
{
"name": "RAY/USDC",
"address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj"
},
{
"name": "RAY/USDT",
"address": "GpHbiJJ9VHiuHVXeoet121Utrbm1CSNNzYrBKB8Xz2oz"
}
]

View File

@ -1,526 +0,0 @@
{
"db": "PostgreSQL",
"21b633d5aec33394129b051ea1df0ee9ab097626d74d8943f6323f9fb42723b5": {
"describe": {
"columns": [
{
"name": "open_orders_owner",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "raw_ask_size!",
"ordinal": 1,
"type_info": "Numeric"
},
{
"name": "raw_bid_size!",
"ordinal": 2,
"type_info": "Numeric"
}
],
"nullable": [
false,
null,
null
],
"parameters": {
"Left": [
"Text",
"Timestamptz",
"Timestamptz"
]
}
},
"query": "SELECT \n open_orders_owner, \n sum(\n native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END\n ) as \"raw_ask_size!\",\n sum(\n native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END\n ) as \"raw_bid_size!\"\n FROM fills\n WHERE market = $1\n AND time >= $2\n AND time < $3\n GROUP BY open_orders_owner\n ORDER BY \n sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END) \n + \n sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) \nDESC \nLIMIT 10000"
},
"35e8220c601aca620da1cfcb978c8b7a64dcbf15550521b418cf509015cd88d8": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE TABLE IF NOT EXISTS fills (\n id numeric PRIMARY KEY,\n time timestamptz not null,\n market text not null,\n open_orders text not null,\n open_orders_owner text not null,\n bid bool not null,\n maker bool not null,\n native_qty_paid numeric not null,\n native_qty_received numeric not null,\n native_fee_or_rebate numeric not null,\n fee_tier text not null,\n order_id text not null\n )"
},
"409267a0a1c925b3723396b1534b17ccdd7a27aac7bbcfaef159dbc6e3005625": {
"describe": {
"columns": [
{
"name": "address!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "raw_quote_size!",
"ordinal": 1,
"type_info": "Numeric"
},
{
"name": "raw_base_size!",
"ordinal": 2,
"type_info": "Numeric"
}
],
"nullable": [
false,
null,
null
],
"parameters": {
"Left": []
}
},
"query": "select market as \"address!\",\n sum(native_qty_paid) as \"raw_quote_size!\",\n sum(native_qty_received) as \"raw_base_size!\"\n from fills \n where \"time\" >= current_timestamp - interval '1 day' \n and bid = true\n group by market"
},
"4bab7d4329b2969b2ba610546c660207740c9bafe644df55fa57101df30e4899": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)"
},
"6658c0121e5a7defbd1fe7c549ca0a957b188b9eb1837573a05d0e6476ef945a": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE TABLE IF NOT EXISTS candles (\n id serial,\n market_name text,\n start_time timestamptz,\n end_time timestamptz,\n resolution text,\n open numeric,\n close numeric,\n high numeric,\n low numeric,\n volume numeric,\n complete bool\n )"
},
"817ee7903cb5095f85fb787beff04ace3a452cf8749205bb230e41d8c9e03c4a": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)"
},
"81d619e2680874afff756031aa4fef16678a7ea226a259e1bdb316bf52478939": {
"describe": {
"columns": [
{
"name": "market_name!",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "high!",
"ordinal": 1,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 2,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 3,
"type_info": "Numeric"
}
],
"nullable": [
true,
null,
null,
true
],
"parameters": {
"Left": []
}
},
"query": "select \n g.market_name as \"market_name!\", \n g.high as \"high!\", \n g.low as \"low!\", \n c.\"close\" as \"close!\"\n from \n (\n SELECT \n market_name, \n max(start_time) as \"start_time\", \n max(high) as \"high\", \n min(low) as \"low\" \n from \n candles \n where \n \"resolution\" = '1M' \n and \"start_time\" >= current_timestamp - interval '1 day' \n group by \n market_name\n ) as g \n join candles c on g.market_name = c.market_name \n and g.start_time = c.start_time \n where \n c.resolution = '1M'"
},
"866773102b03b002e9d0535d3173f36264e1d30a46a5ec8240b0ea8076d6d1c5": {
"describe": {
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market_name!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
"Text",
"Text",
"Timestamptz",
"Timestamptz"
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and start_time >= $3\n and end_time <= $4\n ORDER BY start_time asc"
},
"aee3a3e04f837bd62e263452bfbaf5d7dff271799c80d5efd22a54954ac212c4": {
"describe": {
"columns": [
{
"name": "open_orders_owner",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "raw_ask_size!",
"ordinal": 1,
"type_info": "Numeric"
},
{
"name": "raw_bid_size!",
"ordinal": 2,
"type_info": "Numeric"
}
],
"nullable": [
false,
null,
null
],
"parameters": {
"Left": [
"Text",
"Timestamptz",
"Timestamptz"
]
}
},
"query": "SELECT \n open_orders_owner, \n sum(\n native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END\n ) as \"raw_ask_size!\",\n sum(\n native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END\n ) as \"raw_bid_size!\"\n FROM fills\n WHERE market = $1\n AND time >= $2\n AND time < $3\n GROUP BY open_orders_owner\n ORDER BY \n sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END) \n + \n sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END) \nDESC \nLIMIT 10000"
},
"b259d64b388eb675b727ee511529472177b59ea616041360217afc2d928f33ed": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)"
},
"b71ec8e5c041cec2c83778f31f7dc723cc1b5a8b6bb0e2c7f8ba7ef31f117965": {
"describe": {
"columns": [
{
"name": "time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "bid!",
"ordinal": 1,
"type_info": "Bool"
},
{
"name": "maker!",
"ordinal": 2,
"type_info": "Bool"
},
{
"name": "native_qty_paid!",
"ordinal": 3,
"type_info": "Numeric"
},
{
"name": "native_qty_received!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "native_fee_or_rebate!",
"ordinal": 5,
"type_info": "Numeric"
}
],
"nullable": [
false,
false,
false,
false,
false,
false
],
"parameters": {
"Left": [
"Text",
"Timestamptz",
"Timestamptz"
]
}
},
"query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1\n and time >= $2\n and time < $3 \n and maker = true\n ORDER BY time asc"
},
"dc7c7c04b6870b9617e1e869aa4b7027baddaeeb22f2792f2e9c40f643f863c7": {
"describe": {
"columns": [
{
"name": "time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "bid!",
"ordinal": 1,
"type_info": "Bool"
},
{
"name": "maker!",
"ordinal": 2,
"type_info": "Bool"
},
{
"name": "native_qty_paid!",
"ordinal": 3,
"type_info": "Numeric"
},
{
"name": "native_qty_received!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "native_fee_or_rebate!",
"ordinal": 5,
"type_info": "Numeric"
}
],
"nullable": [
false,
false,
false,
false,
false,
false
],
"parameters": {
"Left": [
"Text"
]
}
},
"query": "SELECT \n time as \"time!\",\n bid as \"bid!\",\n maker as \"maker!\",\n native_qty_paid as \"native_qty_paid!\",\n native_qty_received as \"native_qty_received!\",\n native_fee_or_rebate as \"native_fee_or_rebate!\" \n from fills \n where market = $1 \n and maker = true\n ORDER BY time asc LIMIT 1"
},
"e367fec686c10d361f4a5ac014ca34ce68544dfe9cf32b79a6a78790e8c6d5cc": {
"describe": {
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market_name!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
"Text",
"Text"
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n ORDER BY start_time asc"
},
"e94788e9eb04534dc13a73f80255705fb39789caa6dfb43e8417471f8399bb85": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "DO $$\n BEGIN\n IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN\n ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution);\n END IF;\n END $$"
},
"fc5b19647fbdffb44ab87517ca2a6787f8eab3cc59a1633551524acab44425b6": {
"describe": {
"columns": [
{
"name": "start_time!",
"ordinal": 0,
"type_info": "Timestamptz"
},
{
"name": "end_time!",
"ordinal": 1,
"type_info": "Timestamptz"
},
{
"name": "resolution!",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "market_name!",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "open!",
"ordinal": 4,
"type_info": "Numeric"
},
{
"name": "close!",
"ordinal": 5,
"type_info": "Numeric"
},
{
"name": "high!",
"ordinal": 6,
"type_info": "Numeric"
},
{
"name": "low!",
"ordinal": 7,
"type_info": "Numeric"
},
{
"name": "volume!",
"ordinal": 8,
"type_info": "Numeric"
},
{
"name": "complete!",
"ordinal": 9,
"type_info": "Bool"
}
],
"nullable": [
true,
true,
true,
true,
true,
true,
true,
true,
true,
true
],
"parameters": {
"Left": [
"Text",
"Text"
]
}
},
"query": "SELECT \n start_time as \"start_time!\",\n end_time as \"end_time!\",\n resolution as \"resolution!\",\n market_name as \"market_name!\",\n open as \"open!\",\n close as \"close!\",\n high as \"high!\",\n low as \"low!\",\n volume as \"volume!\",\n complete as \"complete!\"\n from candles\n where market_name = $1\n and resolution = $2\n and complete = true\n ORDER BY start_time desc LIMIT 1"
}
}

View File

@ -1,12 +1,7 @@
use deadpool_postgres::Object;
use openbook_candles::{
database::{
initialize::connect_to_database,
insert::{build_candles_upsert_statement},
},
database::{initialize::connect_to_database, insert::build_candles_upsert_statement},
structs::{
candle::Candle,
markets::{fetch_market_infos, load_markets},
@ -18,10 +13,9 @@ use openbook_candles::{
minute_candles::backfill_batch_1m_candles,
},
};
use std::{env};
use std::env;
use strum::IntoEnumIterator;
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();

View File

@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> {
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new();
for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, 0);
target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
}
println!("{:?}", target_markets);
@ -57,7 +57,7 @@ async fn main() -> anyhow::Result<()> {
pub async fn backfill(
rpc_url: String,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>,
target_markets: &HashMap<Pubkey, String>,
) -> anyhow::Result<()> {
println!("backfill started");
let mut before_sig: Option<Signature> = None;
@ -145,7 +145,7 @@ pub async fn get_transactions(
rpc_client: &RpcClient,
mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>,
target_markets: &HashMap<Pubkey, String>,
) {
sigs.retain(|sig| sig.err.is_none());
if sigs.last().is_none() {

View File

@ -14,9 +14,7 @@ pub async fn fetch_earliest_fill(
) -> anyhow::Result<Option<PgOpenBookFill>> {
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
let stmt = r#"SELECT
time as "time!",
bid as "bid!",
maker as "maker!",
@ -26,11 +24,9 @@ pub async fn fetch_earliest_fill(
from fills
where market = $1
and maker = true
ORDER BY time asc LIMIT 1"#,
)
.await?;
ORDER BY time asc LIMIT 1"#;
let row = client.query_opt(&stmt, &[&market_address_string]).await?;
let row = client.query_opt(stmt, &[&market_address_string]).await?;
match row {
Some(r) => Ok(Some(PgOpenBookFill::from_row(r))),
@ -46,9 +42,7 @@ pub async fn fetch_fills_from(
) -> anyhow::Result<Vec<PgOpenBookFill>> {
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
let stmt = r#"SELECT
time as "time!",
bid as "bid!",
maker as "maker!",
@ -60,17 +54,12 @@ pub async fn fetch_fills_from(
and time >= $2::timestamptz
and time < $3::timestamptz
and maker = true
ORDER BY time asc"#,
)
.await?;
ORDER BY time asc"#;
let rows = client
.query(&stmt, &[&market_address_string, &start_time, &end_time])
.query(stmt, &[&market_address_string, &start_time, &end_time])
.await?;
Ok(rows
.into_iter()
.map(PgOpenBookFill::from_row)
.collect())
Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect())
}
pub async fn fetch_latest_finished_candle(
@ -80,9 +69,7 @@ pub async fn fetch_latest_finished_candle(
) -> anyhow::Result<Option<Candle>> {
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
let stmt = r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
@ -97,12 +84,10 @@ pub async fn fetch_latest_finished_candle(
where market_name = $1
and resolution = $2
and complete = true
ORDER BY start_time desc LIMIT 1"#,
)
.await?;
ORDER BY start_time desc LIMIT 1"#;
let row = client
.query_opt(&stmt, &[&market_name, &resolution.to_string()])
.query_opt(stmt, &[&market_name, &resolution.to_string()])
.await?;
match row {
@ -120,9 +105,7 @@ pub async fn fetch_earliest_candles(
) -> anyhow::Result<Vec<Candle>> {
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
let stmt = r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
@ -136,12 +119,10 @@ pub async fn fetch_earliest_candles(
from candles
where market_name = $1
and resolution = $2
ORDER BY start_time asc"#,
)
.await?;
ORDER BY start_time asc"#;
let rows = client
.query(&stmt, &[&market_name, &resolution.to_string()])
.query(stmt, &[&market_name, &resolution.to_string()])
.await?;
Ok(rows.into_iter().map(Candle::from_row).collect())
@ -156,9 +137,7 @@ pub async fn fetch_candles_from(
) -> anyhow::Result<Vec<Candle>> {
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
let stmt = r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
@ -174,13 +153,11 @@ pub async fn fetch_candles_from(
and resolution = $2
and start_time >= $3
and end_time <= $4
ORDER BY start_time asc"#,
)
.await?;
ORDER BY start_time asc"#;
let rows = client
.query(
&stmt,
stmt,
&[
&market_name,
&resolution.to_string(),
@ -201,9 +178,7 @@ pub async fn fetch_top_traders_by_base_volume_from(
) -> anyhow::Result<Vec<PgTrader>> {
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
let stmt = r#"SELECT
open_orders_owner,
sum(
native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
@ -221,12 +196,10 @@ pub async fn fetch_top_traders_by_base_volume_from(
+
sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#,
)
.await?;
LIMIT 10000"#;
let rows = client
.query(&stmt, &[&market_address_string, &start_time, &end_time])
.query(stmt, &[&market_address_string, &start_time, &end_time])
.await?;
Ok(rows.into_iter().map(PgTrader::from_row).collect())
@ -240,9 +213,7 @@ pub async fn fetch_top_traders_by_quote_volume_from(
) -> anyhow::Result<Vec<PgTrader>> {
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
let stmt = r#"SELECT
open_orders_owner,
sum(
native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
@ -260,12 +231,10 @@ pub async fn fetch_top_traders_by_quote_volume_from(
+
sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#,
)
.await?;
LIMIT 10000"#;
let rows = client
.query(&stmt, &[&market_address_string, &start_time, &end_time])
.query(stmt, &[&market_address_string, &start_time, &end_time])
.await?;
Ok(rows.into_iter().map(PgTrader::from_row).collect())
@ -273,22 +242,32 @@ pub async fn fetch_top_traders_by_quote_volume_from(
pub async fn fetch_coingecko_24h_volume(
pool: &Pool,
market_address_strings: &Vec<&str>,
) -> anyhow::Result<Vec<PgCoinGecko24HourVolume>> {
let client = pool.get().await?;
let stmt = client
.prepare(
r#"select market as "address!",
sum(native_qty_received) as "raw_base_size!",
sum(native_qty_paid) as "raw_quote_size!"
from fills
where "time" >= current_timestamp - interval '1 day'
and bid = true
group by market"#,
)
.await?;
let stmt = r#"SELECT
t1.market,
COALESCE(t2.native_qty_received, 0) as "raw_base_size!",
COALESCE(t2.native_qty_paid, 0) as "raw_quote_size!"
FROM (
SELECT distinct on (market) *
FROM fills f
where bid = true
and market = any($1)
order by market, "time" desc
) t1
LEFT JOIN (
select market,
sum(native_qty_received) as "native_qty_received",
sum(native_qty_paid) as "native_qty_paid"
from fills
where "time" >= current_timestamp - interval '1 day'
and bid = true
group by market
) t2 ON t1.market = t2.market"#;
let rows = client.query(&stmt, &[]).await?;
let rows = client.query(stmt, &[&market_address_strings]).await?;
Ok(rows
.into_iter()
@ -298,39 +277,43 @@ pub async fn fetch_coingecko_24h_volume(
pub async fn fetch_coingecko_24h_high_low(
pool: &Pool,
market_names: &Vec<&str>,
) -> anyhow::Result<Vec<PgCoinGecko24HighLow>> {
let client = pool.get().await?;
let stmt = client
.prepare(
r#"select
g.market_name as "market_name!",
g.high as "high!",
g.low as "low!",
c."close" as "close!"
from
(
SELECT
market_name,
max(start_time) as "start_time",
max(high) as "high",
min(low) as "low"
let stmt = r#"select
r.market_name as "market_name!",
coalesce(c.high, r.high) as "high!",
coalesce(c.low, r.low) as "low!",
r."close" as "close!"
from
candles
where
"resolution" = '1M'
and "start_time" >= current_timestamp - interval '1 day'
group by
market_name
) as g
join candles c on g.market_name = c.market_name
and g.start_time = c.start_time
where
c.resolution = '1M'"#,
)
.await?;
(
SELECT *
from
candles
where (market_name, start_time, resolution) in (
select market_name, max(start_time), resolution
from candles
where "resolution" = '1M'
and market_name = any($1)
group by market_name, resolution
)
) as r
left join (
SELECT
market_name,
max(start_time) as "start_time",
max(high) as "high",
min(low) as "low"
from
candles
where
"resolution" = '1M'
and "start_time" >= current_timestamp - interval '1 day'
group by market_name
) c on r.market_name = c.market_name"#;
let rows = client.query(&stmt, &[]).await?;
let rows = client.query(stmt, &[&market_names]).await?;
Ok(rows
.into_iter()

View File

@ -36,6 +36,7 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
MakeTlsConnector::new(
TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
// TODO: make this configurable
.identity(Identity::from_pkcs12(&client_key, "pass")?)
.danger_accept_invalid_certs(false)
.build()?,

View File

@ -51,10 +51,12 @@ pub async fn pairs(context: web::Data<WebContext>) -> Result<HttpResponse, Serve
pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, ServerError> {
// let client = RpcClient::new(context.rpc_url.clone());
let markets = &context.markets;
let market_names = markets.iter().map(|x| x.name.as_str()).collect();
let market_addresses = markets.iter().map(|x| x.address.as_str()).collect();
// let bba_fut = get_best_bids_and_asks(client, markets);
let volume_fut = fetch_coingecko_24h_volume(&context.pool);
let high_low_fut = fetch_coingecko_24h_high_low(&context.pool);
let volume_fut = fetch_coingecko_24h_volume(&context.pool, &market_addresses);
let high_low_fut = fetch_coingecko_24h_high_low(&context.pool, &market_names);
let (volume_query, high_low_quey) = join!(volume_fut, high_low_fut,);

View File

@ -1,8 +1,9 @@
use actix_web::{
http::StatusCode,
middleware::Logger,
rt::System,
web::{self, Data},
App, HttpServer, http::StatusCode,
App, HttpServer,
};
use actix_web_prom::PrometheusMetricsBuilder;
use candles::get_candles;
@ -92,13 +93,10 @@ async fn main() -> std::io::Result<()> {
// Thread to serve metrics endpoint privately
let private_server = thread::spawn(move || {
let sys = System::new();
let srv = HttpServer::new(move || {
App::new()
.wrap(private_metrics.clone())
})
.bind("0.0.0.0:9091")
.unwrap()
.run();
let srv = HttpServer::new(move || App::new().wrap(private_metrics.clone()))
.bind("0.0.0.0:9091")
.unwrap()
.run();
sys.block_on(srv).unwrap();
});

View File

@ -1,8 +1,8 @@
use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEvent;
use openbook_candles::utils::Config;
use openbook_candles::worker::metrics::serve_metrics;
use openbook_candles::worker::trade_fetching::scrape::scrape;
use openbook_candles::{
database::{
@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> {
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new();
for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, 0);
target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
}
println!("{:?}", target_markets);
@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> {
setup_database(&pool).await?;
let mut handles = vec![];
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(10000);
handles.push(tokio::spawn(async move {
scrape(&config, &fill_sender, &target_markets).await;
@ -56,7 +56,7 @@ async fn main() -> anyhow::Result<()> {
}
}));
let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(100000);
for market in market_infos.into_iter() {
let sender = candle_sender.clone();
@ -78,6 +78,11 @@ async fn main() -> anyhow::Result<()> {
}
}));
handles.push(tokio::spawn(async move {
// TODO: this is ugly af
serve_metrics().await.unwrap().await.unwrap();
}));
futures::future::join_all(handles).await;
Ok(())

33
src/worker/metrics/mod.rs Normal file
View File

@ -0,0 +1,33 @@
use actix_web::{dev::Server, http::StatusCode, App, HttpServer};
use actix_web_prom::PrometheusMetricsBuilder;
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry};
lazy_static! {
    // Private registry for the worker's metrics, namespaced with the
    // "openbook_candles_worker" prefix so series are distinguishable from
    // the server's metrics when scraped by the same Prometheus instance.
    static ref METRIC_REGISTRY: Registry =
        Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap();
    // Counter of fills scraped from chain, labelled by market name
    // (e.g. "SOL/USDC"); incremented in the scrape loop per fill.
    pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
        "fills_total",
        "Total number of fills scraped",
        &["market"],
        METRIC_REGISTRY
    )
    .unwrap();
}
/// Builds an HTTP server exposing this worker's Prometheus metrics on
/// 0.0.0.0:9091 at `/metrics` (the port/path configured in fly.toml's
/// `[metrics]` section).
///
/// The returned [`Server`] has not been started; the caller must `.await` it.
/// Signals are disabled so the server shuts down with the worker process
/// rather than installing its own handlers.
///
/// # Errors
/// Returns an error if the Prometheus middleware cannot be built or if the
/// metrics port cannot be bound — previously these paths panicked via
/// `unwrap()` even though the function already returns `anyhow::Result`.
pub async fn serve_metrics() -> anyhow::Result<Server> {
    let metrics = PrometheusMetricsBuilder::new("openbook_candles_worker")
        .registry(METRIC_REGISTRY.clone())
        // Don't record requests to the metrics endpoint itself, nor 404 noise.
        .exclude("/metrics")
        .exclude_status(StatusCode::NOT_FOUND)
        .endpoint("/metrics")
        .build()
        .map_err(|e| anyhow::anyhow!("failed to build Prometheus middleware: {e}"))?;
    let server = HttpServer::new(move || App::new().wrap(metrics.clone()))
        // Propagate the io::Error (e.g. address already in use) instead of panicking.
        .bind("0.0.0.0:9091")?
        .disable_signals()
        .run();
    Ok(server)
}

View File

@ -1,2 +1,3 @@
pub mod candle_batching;
// Prometheus metrics registry + private /metrics HTTP endpoint for the worker.
pub mod metrics;
pub mod trade_fetching;

View File

@ -11,7 +11,7 @@ const PROGRAM_DATA: &str = "Program data: ";
pub fn parse_trades_from_openbook_txns(
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
target_markets: &HashMap<Pubkey, u8>,
target_markets: &HashMap<Pubkey, String>,
) -> Vec<OpenBookFillEvent> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
for txn in txns.iter_mut() {
@ -42,7 +42,7 @@ pub fn parse_trades_from_openbook_txns(
fn parse_openbook_fills_from_logs(
logs: &Vec<String>,
target_markets: &HashMap<Pubkey, u8>,
target_markets: &HashMap<Pubkey, String>,
block_time: i64,
) -> Option<Vec<OpenBookFillEvent>> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();

View File

@ -8,14 +8,16 @@ use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use tokio::sync::mpsc::Sender;
use crate::{structs::openbook::OpenBookFillEvent, utils::Config};
use crate::{
structs::openbook::OpenBookFillEvent, utils::Config, worker::metrics::METRIC_FILLS_TOTAL,
};
use super::parsing::parse_trades_from_openbook_txns;
pub async fn scrape(
config: &Config,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>,
target_markets: &HashMap<Pubkey, String>,
) {
let rpc_client =
RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed());
@ -39,7 +41,7 @@ pub async fn scrape_transactions(
before_sig: Option<Signature>,
limit: Option<usize>,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>,
target_markets: &HashMap<Pubkey, String>,
) -> Option<Signature> {
let rpc_config = GetConfirmedSignaturesForAddress2Config {
before: before_sig,
@ -96,9 +98,11 @@ pub async fn scrape_transactions(
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
if !fills.is_empty() {
for fill in fills.into_iter() {
let market_name = target_markets.get(&fill.market).unwrap();
if let Err(_) = fill_sender.send(fill).await {
panic!("receiver dropped");
}
METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
}
}