Add Metrics (#2)

* Add request metrics to server

* Add mango mainnet markets

* Enable fly metric polling

* Add RAY/USDC market

* Add metrics to worker, add market name to map

* Add log levels, remove unused db pings

* Filter redundant 1m candles

* Add debugging output to candle batching; mark old candles as completed
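For context, the counters added here all follow the same Prometheus labelled-counter pattern: register an `IntCounterVec` against a custom-namespace registry, then bump the series for each market. A minimal self-contained sketch (the "SOL/USDC" label is illustrative only; the real counters live in src/worker/metrics/mod.rs in this diff):

```rust
use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry};

fn main() {
    // Namespaced registry, as the worker sets up below.
    let registry =
        Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap();

    // One counter, labelled by market, mirroring METRIC_FILLS_TOTAL.
    let fills_total: IntCounterVec = register_int_counter_vec_with_registry!(
        "fills_total",
        "Total number of fills parsed",
        &["market"],
        registry
    )
    .unwrap();

    // Each parsed fill increments the series for its market.
    fills_total.with_label_values(&["SOL/USDC"]).inc();
    assert_eq!(fills_total.with_label_values(&["SOL/USDC"]).get(), 1);
}
```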

---------

Co-authored-by: dboures <deanboures@gmail.com>
riordanp 2023-06-08 14:26:43 +01:00 committed by GitHub
parent 93d95005f5
commit 0d6185a58a
20 changed files with 336 additions and 112 deletions

Cargo.lock (generated; 36 lines changed)

@@ -191,6 +191,18 @@ dependencies = [
  "syn 1.0.109",
 ]

+[[package]]
+name = "actix-web-prom"
+version = "0.6.0"
+source = "git+https://github.com/riordanp/actix-web-prom.git?branch=exclude-paths#614434270cbcfdffa2b3a854aff4c1e49c4973fd"
+dependencies = [
+ "actix-web",
+ "futures-core",
+ "pin-project-lite",
+ "prometheus",
+ "regex",
+]
+
 [[package]]
 name = "adler"
 version = "1.0.2"
@@ -3423,6 +3435,7 @@ name = "openbook-candles"
 version = "0.1.0"
 dependencies = [
  "actix-web",
+ "actix-web-prom",
  "anchor-client",
  "anchor-lang",
  "anyhow",
@@ -3438,11 +3451,13 @@ dependencies = [
  "env_logger 0.10.0",
  "futures 0.3.27",
  "jsonrpc-core-client",
+ "lazy_static",
  "log 0.4.17",
  "native-tls",
  "num-traits",
  "num_enum 0.6.1",
  "postgres-native-tls",
+ "prometheus",
  "serde",
  "serde_derive",
  "serde_json",
@@ -3966,6 +3981,21 @@ dependencies = [
  "yansi",
 ]

+[[package]]
+name = "prometheus"
+version = "0.13.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c"
+dependencies = [
+ "cfg-if 1.0.0",
+ "fnv",
+ "lazy_static",
+ "memchr",
+ "parking_lot 0.12.1",
+ "protobuf",
+ "thiserror",
+]
+
 [[package]]
 name = "prost"
 version = "0.11.6"
@@ -4021,6 +4051,12 @@ dependencies = [
  "prost",
 ]

+[[package]]
+name = "protobuf"
+version = "2.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
+
 [[package]]
 name = "protobuf-src"
 version = "1.1.0+21.5"

Cargo.toml

@@ -62,9 +62,12 @@ serum_dex = { version = "0.5.10", git = "https://github.com/openbook-dex/program
 anchor-lang = ">=0.25.0"
 actix-web = "4"
+actix-web-prom = { version = "0.6.0", git = "https://github.com/riordanp/actix-web-prom.git", branch = "exclude-paths" }
 arrayref = "0.3.6"
 bytemuck = "1.12.3"
 num_enum = "0.6.1"
 config = "0.13.1"
+prometheus = "0.13.3"
+lazy_static = "1.4.0"

(server fly config)

@@ -17,3 +17,7 @@ kill_timeout = 30
 hard_limit = 1024
 soft_limit = 1024
 type = "connections"
+
+[metrics]
+port = 9091
+path = "/metrics"

(worker fly config)

@@ -7,3 +7,7 @@ kill_timeout = 30
 [experimental]
 cmd = ["worker", "markets.json"]
+
+[metrics]
+port = 9091
+path = "/metrics"

markets.json

@@ -4,47 +4,35 @@
     "address" : "8BnEgHoWFysVcuFFX7QztDmzuH8r5ZFvyP3sYwn1XTh6"
   },
   {
-    "name" : "RLB/USDC",
-    "address" : "72h8rWaWwfPUL36PAFqyQZU8RT1V3FKG7Nc45aK89xTs"
+    "name" : "wBTCpo/USDC",
+    "address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN"
   },
   {
     "name" : "MNGO/USDC",
     "address" : "3NnxQvDcZXputNMxaxsGvqiKpqgPfSYXpNigZNFcknmD"
   },
   {
-    "name" : "BONK/SOL",
-    "address" : "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF"
+    "name": "BONK/SOL",
+    "address": "Hs97TCZeuYiJxooo3U73qEHXg3dKpRL4uYKYRryEK9CF"
   },
   {
-    "name" : "BONK/USDC",
-    "address" : "8PhnCfgqpgFM7ZJvttGdBVMXHuU4Q23ACxCvWkbs1M71"
-  },
-  {
-    "name" : "WBTC/USDC",
-    "address" : "3BAKsQd3RuhZKES2DGysMhjBdwjZYKYmxRqnSMtZ4KSN"
+    "name": "DUAL/USDC",
+    "address": "H6rrYK3SUHF2eguZCyJxnSBMJqjXhUtuaki6PHiutvum"
   },
   {
     "name": "mSOL/USDC",
     "address": "9Lyhks5bQQxb9EyyX55NtgKQzpM4WK7JCmeaWuQ5MoXD"
   },
   {
-    "name": "SOL/USDT",
-    "address": "2AdaV97p6SfkuMQJdu8DHhBhmJe7oWdvbm52MJfYQmfA"
-  },
-  {
-    "name": "USDT/USDC",
-    "address": "B2na8Awyd7cpC59iEU43FagJAPLigr3AP3s38KM982bu"
-  },
-  {
-    "name": "ETH/USDC",
+    "name": "ETHpo/USDC",
     "address": "BbJgE7HZMaDp5NTYvRh5jZSkQPVDTU8ubPFtpogUkEj4"
   },
+  {
+    "name": "BONK/USDC",
+    "address": "8PhnCfgqpgFM7ZJvttGdBVMXHuU4Q23ACxCvWkbs1M71"
+  },
   {
     "name": "RAY/USDC",
     "address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj"
-  },
-  {
-    "name": "DUAL/USDC",
-    "address": "H6rrYK3SUHF2eguZCyJxnSBMJqjXhUtuaki6PHiutvum"
   }
 ]

(candle backfill binary)

@@ -1,12 +1,7 @@
 use deadpool_postgres::Object;
 use openbook_candles::{
-    database::{
-        initialize::connect_to_database,
-        insert::{build_candles_upsert_statement},
-    },
+    database::{initialize::connect_to_database, insert::build_candles_upsert_statement},
     structs::{
         candle::Candle,
         markets::{fetch_market_infos, load_markets},
@@ -18,10 +13,9 @@ use openbook_candles::{
         minute_candles::backfill_batch_1m_candles,
     },
 };
-use std::{env};
+use std::env;
 use strum::IntoEnumIterator;

 #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
 async fn main() -> anyhow::Result<()> {
     dotenv::dotenv().ok();

(fills backfill binary)

@@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> {
     let market_infos = fetch_market_infos(&config, markets.clone()).await?;
     let mut target_markets = HashMap::new();
     for m in market_infos.clone() {
-        target_markets.insert(Pubkey::from_str(&m.address)?, 0);
+        target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
     }
     println!("{:?}", target_markets);
@@ -57,7 +57,7 @@ async fn main() -> anyhow::Result<()> {
 pub async fn backfill(
     rpc_url: String,
     fill_sender: &Sender<OpenBookFillEvent>,
-    target_markets: &HashMap<Pubkey, u8>,
+    target_markets: &HashMap<Pubkey, String>,
 ) -> anyhow::Result<()> {
     println!("backfill started");
     let mut before_sig: Option<Signature> = None;
@@ -145,7 +145,7 @@ pub async fn get_transactions(
     rpc_client: &RpcClient,
     mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>,
     fill_sender: &Sender<OpenBookFillEvent>,
-    target_markets: &HashMap<Pubkey, u8>,
+    target_markets: &HashMap<Pubkey, String>,
 ) {
     sigs.retain(|sig| sig.err.is_none());
     if sigs.last().is_none() {

(database fetch)

@@ -59,10 +59,7 @@ pub async fn fetch_fills_from(
     let rows = client
         .query(stmt, &[&market_address_string, &start_time, &end_time])
         .await?;
-    Ok(rows
-        .into_iter()
-        .map(PgOpenBookFill::from_row)
-        .collect())
+    Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect())
 }

 pub async fn fetch_latest_finished_candle(

(database initialize)

@@ -36,6 +36,7 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
         MakeTlsConnector::new(
             TlsConnector::builder()
                 .add_root_certificate(Certificate::from_pem(&ca_cert)?)
+                // TODO: make this configurable
                 .identity(Identity::from_pkcs12(&client_key, "pass")?)
                 .danger_accept_invalid_certs(false)
                 .build()?,

(database insert)

@@ -1,4 +1,5 @@
 use deadpool_postgres::Pool;
+use log::debug;
 use std::{
     collections::{hash_map::DefaultHasher, HashMap},
     hash::{Hash, Hasher},
@@ -36,22 +37,14 @@
         }

         if !write_batch.is_empty() {
-            // print!("writing: {:?} events to DB\n", write_batch.len());
-            // match conn.ping().await {
-            //     Ok(_) => {
+            debug!("writing: {:?} events to DB\n", write_batch.len());
             let upsert_statement = build_fills_upsert_statement(write_batch);
             client
                 .execute(&upsert_statement, &[])
                 .await
                 .map_err_anyhow()
                 .unwrap();
-            //     }
-            //     Err(_) => {
-            //         println!("Fills ping failed");
-            //         break;
-            //     }
-            // }
         }
     }
 }
@@ -62,14 +55,12 @@
 ) -> anyhow::Result<()> {
     let client = pool.get().await.unwrap();
     loop {
-        // match client.ping().await {
-        //     Ok(_) => {
         match candles_receiver.try_recv() {
             Ok(candles) => {
                 if candles.is_empty() {
                     continue;
                 }
-                // print!("writing: {:?} candles to DB\n", candles.len());
+                debug!("writing: {:?} candles to DB\n", candles.len());
                 let upsert_statement = build_candles_upsert_statement(candles);
                 client
                     .execute(&upsert_statement, &[])
@@ -82,12 +73,6 @@
                 panic!("Candles sender must stay alive")
             }
         };
-        //     }
-        //     Err(_) => {
-        //         println!("Candle ping failed");
-        //         break;
-        //     }
-        // };
     }
 }

(server main)

@@ -1,9 +1,13 @@
 use actix_web::{
+    http::StatusCode,
     middleware::Logger,
+    rt::System,
     web::{self, Data},
     App, HttpServer,
 };
+use actix_web_prom::PrometheusMetricsBuilder;
 use candles::get_candles;
+use prometheus::Registry;
 use markets::get_markets;

 use openbook_candles::{
@@ -12,6 +16,7 @@ use openbook_candles::{
     utils::{Config, WebContext},
 };
 use std::env;
+use std::thread;
 use traders::{get_top_traders_by_base_volume, get_top_traders_by_quote_volume};

 mod candles;
@@ -39,6 +44,22 @@ async fn main() -> std::io::Result<()> {
     let market_infos = fetch_market_infos(&config, markets).await.unwrap();
     let pool = connect_to_database().await.unwrap();

+    let registry = Registry::new();
+
+    // For serving metrics on a private port
+    let private_metrics = PrometheusMetricsBuilder::new("openbook_candles_server_private")
+        .registry(registry.clone())
+        .exclude("/metrics")
+        .exclude_status(StatusCode::NOT_FOUND)
+        .endpoint("/metrics")
+        .build()
+        .unwrap();
+
+    // For collecting metrics on the public api, excluding 404s
+    let public_metrics = PrometheusMetricsBuilder::new("openbook_candles_server")
+        .registry(registry.clone())
+        .exclude_status(StatusCode::NOT_FOUND)
+        .build()
+        .unwrap();
+
     let context = Data::new(WebContext {
         rpc_url,
         pool,
@@ -46,20 +67,40 @@
     });

     println!("Starting server");
-    HttpServer::new(move || {
-        App::new()
-            .wrap(Logger::default())
-            .app_data(context.clone())
-            .service(
-                web::scope("/api")
-                    .service(get_candles)
-                    .service(get_top_traders_by_base_volume)
-                    .service(get_top_traders_by_quote_volume)
-                    .service(get_markets)
-                    .service(coingecko::service()),
-            )
-    })
-    .bind(&bind_addr)?
-    .run()
-    .await
+    // Thread to serve public API
+    let public_server = thread::spawn(move || {
+        let sys = System::new();
+        let srv = HttpServer::new(move || {
+            App::new()
+                .wrap(Logger::default())
+                .wrap(public_metrics.clone())
+                .app_data(context.clone())
+                .service(
+                    web::scope("/api")
+                        .service(get_candles)
+                        .service(get_top_traders_by_base_volume)
+                        .service(get_top_traders_by_quote_volume)
+                        .service(get_markets)
+                        .service(coingecko::service()),
+                )
+        })
+        .bind(&bind_addr)
+        .unwrap()
+        .run();
+        sys.block_on(srv).unwrap();
+    });
+
+    // Thread to serve metrics endpoint privately
+    let private_server = thread::spawn(move || {
+        let sys = System::new();
+        let srv = HttpServer::new(move || App::new().wrap(private_metrics.clone()))
+            .bind("0.0.0.0:9091")
+            .unwrap()
+            .run();
+        sys.block_on(srv).unwrap();
+    });
+
+    private_server.join().unwrap();
+    public_server.join().unwrap();
+    Ok(())
 }

(candle struct)

@@ -3,7 +3,7 @@ use tokio_postgres::Row;

 use super::resolution::Resolution;

-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq)]
 pub struct Candle {
     pub market_name: String,
     pub start_time: DateTime<Utc>,

(higher-order candle batching)

@@ -1,5 +1,6 @@
 use chrono::{DateTime, Duration, DurationRound, Utc};
 use deadpool_postgres::Pool;
+use log::debug;
 use std::cmp::max;

 use crate::{
@@ -46,12 +47,12 @@ pub async fn batch_higher_order_candles(
         fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution())
             .await?;
     if constituent_candles.is_empty() {
-        // println!(
-        //     "Batching {}, but no candles found for: {:?}, {}",
-        //     resolution,
-        //     market_name,
-        //     resolution.get_constituent_resolution()
-        // );
+        debug!(
+            "Batching {}, but no candles found for: {:?}, {}",
+            resolution,
+            market_name,
+            resolution.get_constituent_resolution()
+        );
         return Ok(Vec::new());
     }
     let start_time = constituent_candles[0].start_time.duration_trunc(day())?;
@@ -82,7 +83,7 @@ fn combine_into_higher_order_candles(
     st: DateTime<Utc>,
     seed_candle: Candle,
 ) -> Vec<Candle> {
-    // println!("target_resolution: {}", target_resolution);
+    debug!("combining for target_resolution: {}", target_resolution);

     let duration = target_resolution.get_duration();

(1m candle batching)

@@ -2,9 +2,10 @@ use std::cmp::min;

 use chrono::{DateTime, Duration, DurationRound, Utc};
 use deadpool_postgres::Pool;
+use log::debug;

 use crate::{
-    database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
+    database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle, fetch_candles_from},
     structs::{
         candle::Candle,
         markets::MarketInfo,
@@ -21,12 +22,16 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
     match latest_candle {
         Some(candle) => {
+            println!("{}: latest finished candle time {}", market_name, candle.end_time);
             let start_time = candle.end_time;
             let end_time = min(
                 start_time + day(),
                 Utc::now().duration_trunc(Duration::minutes(1))?,
             );
             let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
+            let existing_candles = fetch_candles_from(pool, market_name, Resolution::R1m, candle.start_time, end_time).await?;
+            println!("{}: combining {} fills from {} to {}", market_name, fills.clone().len(), start_time, end_time);
             let candles = combine_fills_into_1m_candles(
                 &mut fills,
                 market,
@@ -34,13 +39,16 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
                 end_time,
                 Some(candle.close),
             );
-            Ok(candles)
+            println!("{}: filtering {} new candles on {} existing candles from {} to {}", market_name, candles.clone().len(), existing_candles.clone().len(), start_time, end_time);
+            Ok(filter_redundant_candles(existing_candles, candles.clone()))
         }
         None => {
+            println!("{}: no finished candle", market_name);
             let earliest_fill = fetch_earliest_fill(pool, market_address).await?;

             if earliest_fill.is_none() {
-                println!("No fills found for: {:?}", market_name);
+                debug!("No fills found for: {:?}", market_name);
                 return Ok(Vec::new());
             }
@@ -53,6 +61,7 @@ pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Resul
                 Utc::now().duration_trunc(Duration::minutes(1))?,
             );
             let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
+            println!("{}: combining {} fills from {} to {}", market_name, fills.clone().len(), start_time, end_time);

             if !fills.is_empty() {
                 let candles =
                     combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
@@ -98,7 +107,7 @@ fn combine_fills_into_1m_candles(
     while matches!(fills_iter.peek(), Some(f) if f.time < end_time) {
         let fill = fills_iter.next().unwrap();
+        println!("adding fill from {}", fill.time);

         let (price, volume) =
             calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
@@ -112,8 +121,16 @@ fn combine_fills_into_1m_candles(
         candles[i].start_time = start_time;
         candles[i].end_time = end_time;
-        candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time);
+        candles[i].complete = matches!(fills_iter.peek(), Some(f) if f.time > end_time)
+            || end_time < Utc::now() - Duration::days(1);
+        if candles[i].complete {
+            println!("candle {} complete with end time {}", i, end_time);
+        } else {
+            let peeked_fill = fills_iter.peek();
+            match peeked_fill {
+                Some(f) => println!("candle {} incomplete, peeked fill was at {} and end time was {}", i, f.time, end_time),
+                None => {}
+            }
+        }
         start_time = end_time;
         end_time += Duration::minutes(1);
     }
@@ -121,6 +138,17 @@ fn combine_fills_into_1m_candles(
     candles
 }

+fn filter_redundant_candles(existing_candles: Vec<Candle>, mut candles: Vec<Candle>) -> Vec<Candle> {
+    candles.retain(|c| {
+        !existing_candles.contains(c)
+    });
+    println!("trimmed: {:?}", candles.len());
+    // println!("{:?}", candles.last());
+    println!("candles: {:?}", existing_candles.len());
+    // println!("{:?}", existing_candles.last());
+    candles
+}
+
 /// Goes from the earliest fill to the most recent. Will mark candles as complete if there are missing gaps of fills between the start and end.
 pub async fn backfill_batch_1m_candles(
     pool: &Pool,
@@ -132,7 +160,7 @@ pub async fn backfill_batch_1m_candles(
     let earliest_fill = fetch_earliest_fill(pool, &market.address).await?;

     if earliest_fill.is_none() {
-        println!("No fills found for: {:?}", &market_name);
+        debug!("No fills found for: {:?}", &market_name);
         return Ok(candles);
     }

(candle batching mod)

@@ -3,6 +3,7 @@ pub mod minute_candles;

 use chrono::Duration;
 use deadpool_postgres::Pool;
+use log::{error, warn};
 use strum::IntoEnumIterator;
 use tokio::{sync::mpsc::Sender, time::sleep};

@@ -13,6 +14,8 @@ use crate::{

 use self::higher_order_candles::batch_higher_order_candles;

+use super::metrics::METRIC_CANDLES_TOTAL;
+
 pub async fn batch_for_market(
     pool: &Pool,
     candles_sender: &Sender<Vec<Candle>>,
@@ -21,13 +24,13 @@ pub async fn batch_for_market(
     loop {
         let sender = candles_sender.clone();
         let market_clone = market.clone();
-        // let client = pool.get().await?;
         loop {
             sleep(Duration::milliseconds(2000).to_std()?).await;
             match batch_inner(pool, &sender, &market_clone).await {
                 Ok(_) => {}
                 Err(e) => {
-                    println!(
+                    error!(
                         "Batching thread failed for {:?} with error: {:?}",
                         market_clone.name.clone(),
                         e
@@ -36,7 +39,7 @@ pub async fn batch_for_market(
                 }
             };
         }
-        println!("Restarting {:?} batching thread", market.name);
+        warn!("Restarting {:?} batching thread", market.name);
     }
 }
@@ -47,14 +50,19 @@ async fn batch_inner(
 ) -> anyhow::Result<()> {
     let market_name = &market.name.clone();
     let candles = batch_1m_candles(pool, market).await?;
-    send_candles(candles, candles_sender).await;
+    send_candles(candles.clone(), candles_sender).await;
+    METRIC_CANDLES_TOTAL
+        .with_label_values(&[market.name.as_str()])
+        .inc_by(candles.clone().len() as u64);

     for resolution in Resolution::iter() {
         if resolution == Resolution::R1m {
             continue;
         }
         let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
-        send_candles(candles, candles_sender).await;
+        send_candles(candles.clone(), candles_sender).await;
+        METRIC_CANDLES_TOTAL
+            .with_label_values(&[market.name.as_str()])
+            .inc_by(candles.clone().len() as u64);
     }
     Ok(())
 }

(worker main)

@@ -1,8 +1,12 @@
+use log::{error, info};
 use openbook_candles::structs::candle::Candle;
 use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
 use openbook_candles::structs::openbook::OpenBookFillEvent;
 use openbook_candles::utils::Config;
+use openbook_candles::worker::metrics::{
+    serve_metrics, METRIC_CANDLES_QUEUE_LENGTH, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE,
+    METRIC_FILLS_QUEUE_LENGTH,
+};
 use openbook_candles::worker::trade_fetching::scrape::scrape;
 use openbook_candles::{
     database::{
@@ -13,11 +17,12 @@ use openbook_candles::{
 };
 use solana_sdk::pubkey::Pubkey;
 use std::env;
-use std::{collections::HashMap, str::FromStr};
+use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
 use tokio::sync::mpsc;

 #[tokio::main(flavor = "multi_thread", worker_threads = 10)]
 async fn main() -> anyhow::Result<()> {
+    env_logger::init();
     dotenv::dotenv().ok();

     let args: Vec<String> = env::args().collect();
let args: Vec<String> = env::args().collect(); let args: Vec<String> = env::args().collect();
@@ -29,22 +34,25 @@ async fn main() -> anyhow::Result<()> {
         rpc_url: rpc_url.clone(),
     };

+    let candles_queue_max_size = 10000;
+    let fills_queue_max_size = 10000;
+
     let markets = load_markets(path_to_markets_json);
     let market_infos = fetch_market_infos(&config, markets.clone()).await?;
     let mut target_markets = HashMap::new();
     for m in market_infos.clone() {
-        target_markets.insert(Pubkey::from_str(&m.address)?, 0);
+        target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
     }
-    println!("{:?}", target_markets);
+    info!("{:?}", target_markets);

     let pool = connect_to_database().await?;
     setup_database(&pool).await?;
     let mut handles = vec![];

-    let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
+    let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(fills_queue_max_size);
+    let scrape_fill_sender = fill_sender.clone();
     handles.push(tokio::spawn(async move {
-        scrape(&config, &fill_sender, &target_markets).await;
+        scrape(&config, &scrape_fill_sender, &target_markets).await;
     }));

     let fills_pool = pool.clone();
@@ -56,7 +64,7 @@ async fn main() -> anyhow::Result<()> {
         }
     }));

-    let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
+    let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(candles_queue_max_size);

     for market in market_infos.into_iter() {
         let sender = candle_sender.clone();
@@ -65,7 +73,7 @@ async fn main() -> anyhow::Result<()> {
             batch_for_market(&batch_pool, &sender, &market)
                 .await
                 .unwrap();
-            println!("SOMETHING WENT WRONG");
+            error!("batching halted for market {}", &market.name);
         }));
     }
@@ -78,6 +86,30 @@ async fn main() -> anyhow::Result<()> {
         }
     }));

+    let monitor_pool = pool.clone();
+    let monitor_fill_channel = fill_sender.clone();
+    let monitor_candle_channel = candle_sender.clone();
+    handles.push(tokio::spawn(async move {
+        // TODO: maybe break this out into a new function
+        loop {
+            let pool_status = monitor_pool.status();
+            METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64);
+            METRIC_DB_POOL_SIZE.set(pool_status.size as i64);
+
+            METRIC_CANDLES_QUEUE_LENGTH
+                .set((candles_queue_max_size - monitor_candle_channel.capacity()) as i64);
+            METRIC_FILLS_QUEUE_LENGTH
+                .set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64);
+
+            tokio::time::sleep(WaitDuration::from_secs(10)).await;
+        }
+    }));
+
+    handles.push(tokio::spawn(async move {
+        // TODO: this is ugly af
+        serve_metrics().await.unwrap().await.unwrap();
+    }));
+
     futures::future::join_all(handles).await;

     Ok(())

src/worker/metrics/mod.rs (new file; 82 lines)

@@ -0,0 +1,82 @@
use actix_web::{dev::Server, http::StatusCode, App, HttpServer};
use actix_web_prom::PrometheusMetricsBuilder;
use lazy_static::lazy_static;
use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec,
IntGauge, Registry,
};
lazy_static! {
static ref METRIC_REGISTRY: Registry =
Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap();
pub static ref METRIC_TXS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"txs_total",
"Total number of transactions scraped",
&["market", "status"],
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"fills_total",
"Total number of fills parsed",
&["market"],
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_CANDLES_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"candles_total",
"Total number of candles generated",
&["market"],
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_FILLS_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!(
"fills_queue_length",
"Current length of the fills write queue",
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_CANDLES_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!(
"candles_queue_length",
"Current length of the candles write queue",
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec =
register_int_counter_vec_with_registry!(
"rpc_errors_total",
"RPC errors while scraping",
&["method"],
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_DB_POOL_SIZE: IntGauge = register_int_gauge_with_registry!(
"db_pool_size",
"Current size of the DB connection pool",
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_DB_POOL_AVAILABLE: IntGauge = register_int_gauge_with_registry!(
"db_pool_available",
"Available DB connections in the pool",
METRIC_REGISTRY
)
.unwrap();
}
pub async fn serve_metrics() -> anyhow::Result<Server> {
let metrics = PrometheusMetricsBuilder::new("openbook_candles_worker")
.registry(METRIC_REGISTRY.clone())
.exclude("/metrics")
.exclude_status(StatusCode::NOT_FOUND)
.endpoint("/metrics")
.build()
.unwrap();
let server = HttpServer::new(move || App::new().wrap(metrics.clone()))
.bind("0.0.0.0:9091")
.unwrap()
.disable_signals()
.run();
Ok(server)
}
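To see what Fly's poller would scrape from this endpoint, here is a minimal hedged sketch that registers one counter the same way and renders the registry to the Prometheus text format (the output line is indicative, not captured from the real service):

```rust
use prometheus::{Encoder, IntCounterVec, Opts, Registry, TextEncoder};

fn main() {
    // Same custom namespace as METRIC_REGISTRY above.
    let registry =
        Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap();
    let fills = IntCounterVec::new(
        Opts::new("fills_total", "Total number of fills parsed"),
        &["market"],
    )
    .unwrap();
    registry.register(Box::new(fills.clone())).unwrap();

    fills.with_label_values(&["SOL/USDC"]).inc();

    // Render what GET /metrics would return for this registry.
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buf)
        .unwrap();
    // e.g. openbook_candles_worker_fills_total{market="SOL/USDC"} 1
    print!("{}", String::from_utf8(buf).unwrap());
}
```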

(worker mod)

@@ -1,2 +1,3 @@
 pub mod candle_batching;
+pub mod metrics;
 pub mod trade_fetching;

(trade parsing)

@@ -1,3 +1,4 @@
+use log::warn;
 use solana_client::client_error::Result as ClientResult;
 use solana_sdk::pubkey::Pubkey;
 use solana_transaction_status::{
@@ -5,13 +6,16 @@
 };
 use std::{collections::HashMap, io::Error};

-use crate::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw};
+use crate::{
+    structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw},
+    worker::metrics::METRIC_RPC_ERRORS_TOTAL,
+};

 const PROGRAM_DATA: &str = "Program data: ";

 pub fn parse_trades_from_openbook_txns(
     txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
-    target_markets: &HashMap<Pubkey, u8>,
+    target_markets: &HashMap<Pubkey, String>,
 ) -> Vec<OpenBookFillEvent> {
     let mut fills_vector = Vec::<OpenBookFillEvent>::new();
     for txn in txns.iter_mut() {
@@ -34,7 +38,12 @@ pub fn parse_trades_from_openbook_txns(
                     }
                 }
             }
-            Err(_) => {}
+            Err(e) => {
+                warn!("rpc error in get_transaction {}", e);
+                METRIC_RPC_ERRORS_TOTAL
+                    .with_label_values(&["getTransaction"])
+                    .inc();
+            }
         }
     }
     fills_vector
@@ -42,7 +51,7 @@ pub fn parse_trades_from_openbook_txns(

 fn parse_openbook_fills_from_logs(
     logs: &Vec<String>,
-    target_markets: &HashMap<Pubkey, u8>,
+    target_markets: &HashMap<Pubkey, String>,
     block_time: i64,
 ) -> Option<Vec<OpenBookFillEvent>> {
     let mut fills_vector = Vec::<OpenBookFillEvent>::new();

(trade scraping)

@@ -1,4 +1,5 @@
 use futures::future::join_all;
+use log::{debug, warn};
 use solana_client::{
     nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config,
     rpc_config::RpcTransactionConfig,
@@ -8,14 +9,18 @@ use solana_transaction_status::UiTransactionEncoding;
 use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
 use tokio::sync::mpsc::Sender;

-use crate::{structs::openbook::OpenBookFillEvent, utils::Config};
+use crate::{
+    structs::openbook::OpenBookFillEvent,
+    utils::Config,
+    worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL},
+};

 use super::parsing::parse_trades_from_openbook_txns;

 pub async fn scrape(
     config: &Config,
     fill_sender: &Sender<OpenBookFillEvent>,
-    target_markets: &HashMap<Pubkey, u8>,
+    target_markets: &HashMap<Pubkey, String>,
 ) {
     let rpc_client =
         RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed());
@@ -39,7 +44,7 @@ pub async fn scrape_transactions(
     before_sig: Option<Signature>,
     limit: Option<usize>,
     fill_sender: &Sender<OpenBookFillEvent>,
-    target_markets: &HashMap<Pubkey, u8>,
+    target_markets: &HashMap<Pubkey, String>,
 ) -> Option<Signature> {
     let rpc_config = GetConfirmedSignaturesForAddress2Config {
         before: before_sig,
@@ -57,13 +62,16 @@
     {
         Ok(s) => s,
         Err(e) => {
-            println!("Error in get_signatures_for_address_with_config: {}", e);
+            warn!("rpc error in get_signatures_for_address_with_config: {}", e);
+            METRIC_RPC_ERRORS_TOTAL
+                .with_label_values(&["getSignaturesForAddress"])
+                .inc();
             return before_sig;
         }
     };

     if sigs.is_empty() {
-        println!("No signatures found");
+        debug!("No signatures found");
         return before_sig;
     }
@@ -96,9 +104,11 @@
     let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
     if !fills.is_empty() {
         for fill in fills.into_iter() {
+            let market_name = target_markets.get(&fill.market).unwrap();
             if let Err(_) = fill_sender.send(fill).await {
                 panic!("receiver dropped");
             }
+            METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
         }
     }