Add metrics to worker, add market name to map, cargo fmt, fix RAY/USDC id

This commit is contained in:
Riordan Panayides 2023-05-30 18:01:47 +01:00
parent 9489bd3e78
commit 35937c9572
13 changed files with 68 additions and 33 deletions

1
Cargo.lock generated
View File

@ -3451,6 +3451,7 @@ dependencies = [
"env_logger 0.10.0",
"futures 0.3.27",
"jsonrpc-core-client",
"lazy_static",
"log 0.4.17",
"native-tls",
"num-traits",

View File

@ -70,3 +70,4 @@ num_enum = "0.6.1"
config = "0.13.1"
prometheus = "0.13.3"
lazy_static = "1.4.0"

View File

@ -33,6 +33,6 @@
},
{
"name": "RAY/USDC",
"address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxV"
"address": "DZjbn4XC8qoHKikZqzmhemykVzmossoayV9ffbsUqxVj"
}
]

View File

@ -1,12 +1,7 @@
use deadpool_postgres::Object;
use openbook_candles::{
database::{
initialize::connect_to_database,
insert::{build_candles_upsert_statement},
},
database::{initialize::connect_to_database, insert::build_candles_upsert_statement},
structs::{
candle::Candle,
markets::{fetch_market_infos, load_markets},
@ -18,10 +13,9 @@ use openbook_candles::{
minute_candles::backfill_batch_1m_candles,
},
};
use std::{env};
use std::env;
use strum::IntoEnumIterator;
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();

View File

@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> {
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new();
for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, 0);
target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
}
println!("{:?}", target_markets);
@ -57,7 +57,7 @@ async fn main() -> anyhow::Result<()> {
pub async fn backfill(
rpc_url: String,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>,
target_markets: &HashMap<Pubkey, String>,
) -> anyhow::Result<()> {
println!("backfill started");
let mut before_sig: Option<Signature> = None;
@ -145,7 +145,7 @@ pub async fn get_transactions(
rpc_client: &RpcClient,
mut sigs: Vec<RpcConfirmedTransactionStatusWithSignature>,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>,
target_markets: &HashMap<Pubkey, String>,
) {
sigs.retain(|sig| sig.err.is_none());
if sigs.last().is_none() {

View File

@ -67,10 +67,7 @@ pub async fn fetch_fills_from(
let rows = client
.query(&stmt, &[&market_address_string, &start_time, &end_time])
.await?;
Ok(rows
.into_iter()
.map(PgOpenBookFill::from_row)
.collect())
Ok(rows.into_iter().map(PgOpenBookFill::from_row).collect())
}
pub async fn fetch_latest_finished_candle(

View File

@ -36,6 +36,7 @@ pub async fn connect_to_database() -> anyhow::Result<Pool> {
MakeTlsConnector::new(
TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
// TODO: make this configurable
.identity(Identity::from_pkcs12(&client_key, "pass")?)
.danger_accept_invalid_certs(false)
.build()?,

View File

@ -1,8 +1,9 @@
use actix_web::{
http::StatusCode,
middleware::Logger,
rt::System,
web::{self, Data},
App, HttpServer, http::StatusCode,
App, HttpServer,
};
use actix_web_prom::PrometheusMetricsBuilder;
use candles::get_candles;
@ -92,13 +93,10 @@ async fn main() -> std::io::Result<()> {
// Thread to serve metrics endpoint privately
let private_server = thread::spawn(move || {
let sys = System::new();
let srv = HttpServer::new(move || {
App::new()
.wrap(private_metrics.clone())
})
.bind("0.0.0.0:9091")
.unwrap()
.run();
let srv = HttpServer::new(move || App::new().wrap(private_metrics.clone()))
.bind("0.0.0.0:9091")
.unwrap()
.run();
sys.block_on(srv).unwrap();
});

View File

@ -1,8 +1,8 @@
use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEvent;
use openbook_candles::utils::Config;
use openbook_candles::worker::metrics::serve_metrics;
use openbook_candles::worker::trade_fetching::scrape::scrape;
use openbook_candles::{
database::{
@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> {
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new();
for m in market_infos.clone() {
target_markets.insert(Pubkey::from_str(&m.address)?, 0);
target_markets.insert(Pubkey::from_str(&m.address)?, m.name);
}
println!("{:?}", target_markets);
@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> {
setup_database(&pool).await?;
let mut handles = vec![];
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(1000);
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(10000);
handles.push(tokio::spawn(async move {
scrape(&config, &fill_sender, &target_markets).await;
@ -56,7 +56,7 @@ async fn main() -> anyhow::Result<()> {
}
}));
let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(1000);
let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(100000);
for market in market_infos.into_iter() {
let sender = candle_sender.clone();
@ -78,6 +78,11 @@ async fn main() -> anyhow::Result<()> {
}
}));
handles.push(tokio::spawn(async move {
// TODO: this is ugly af
serve_metrics().await.unwrap().await.unwrap();
}));
futures::future::join_all(handles).await;
Ok(())

33
src/worker/metrics/mod.rs Normal file
View File

@ -0,0 +1,33 @@
use actix_web::{dev::Server, http::StatusCode, App, HttpServer};
use actix_web_prom::PrometheusMetricsBuilder;
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry};
lazy_static! {
static ref METRIC_REGISTRY: Registry =
Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap();
pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"fills_total",
"Total number of fills scraped",
&["market"],
METRIC_REGISTRY
)
.unwrap();
}
pub async fn serve_metrics() -> anyhow::Result<Server> {
let metrics = PrometheusMetricsBuilder::new("openbook_candles_worker")
.registry(METRIC_REGISTRY.clone())
.exclude("/metrics")
.exclude_status(StatusCode::NOT_FOUND)
.endpoint("/metrics")
.build()
.unwrap();
let server = HttpServer::new(move || App::new().wrap(metrics.clone()))
.bind("0.0.0.0:9091")
.unwrap()
.disable_signals()
.run();
Ok(server)
}

View File

@ -1,2 +1,3 @@
pub mod candle_batching;
pub mod metrics;
pub mod trade_fetching;

View File

@ -11,7 +11,7 @@ const PROGRAM_DATA: &str = "Program data: ";
pub fn parse_trades_from_openbook_txns(
txns: &mut Vec<ClientResult<EncodedConfirmedTransactionWithStatusMeta>>,
target_markets: &HashMap<Pubkey, u8>,
target_markets: &HashMap<Pubkey, String>,
) -> Vec<OpenBookFillEvent> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();
for txn in txns.iter_mut() {
@ -42,7 +42,7 @@ pub fn parse_trades_from_openbook_txns(
fn parse_openbook_fills_from_logs(
logs: &Vec<String>,
target_markets: &HashMap<Pubkey, u8>,
target_markets: &HashMap<Pubkey, String>,
block_time: i64,
) -> Option<Vec<OpenBookFillEvent>> {
let mut fills_vector = Vec::<OpenBookFillEvent>::new();

View File

@ -8,14 +8,16 @@ use solana_transaction_status::UiTransactionEncoding;
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use tokio::sync::mpsc::Sender;
use crate::{structs::openbook::OpenBookFillEvent, utils::Config};
use crate::{
structs::openbook::OpenBookFillEvent, utils::Config, worker::metrics::METRIC_FILLS_TOTAL,
};
use super::parsing::parse_trades_from_openbook_txns;
pub async fn scrape(
config: &Config,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>,
target_markets: &HashMap<Pubkey, String>,
) {
let rpc_client =
RpcClient::new_with_commitment(config.rpc_url.clone(), CommitmentConfig::processed());
@ -39,7 +41,7 @@ pub async fn scrape_transactions(
before_sig: Option<Signature>,
limit: Option<usize>,
fill_sender: &Sender<OpenBookFillEvent>,
target_markets: &HashMap<Pubkey, u8>,
target_markets: &HashMap<Pubkey, String>,
) -> Option<Signature> {
let rpc_config = GetConfirmedSignaturesForAddress2Config {
before: before_sig,
@ -96,9 +98,11 @@ pub async fn scrape_transactions(
let fills = parse_trades_from_openbook_txns(&mut txns, target_markets);
if !fills.is_empty() {
for fill in fills.into_iter() {
let market_name = target_markets.get(&fill.market).unwrap();
if let Err(_) = fill_sender.send(fill).await {
panic!("receiver dropped");
}
METRIC_FILLS_TOTAL.with_label_values(&[market_name]).inc();
}
}