Compare commits
2 Commits
3a515d61a0
...
95f923f7c7
Author | SHA1 | Date |
---|---|---|
Riordan Panayides | 95f923f7c7 | |
Riordan Panayides | c055d8b992 |
|
@ -13,6 +13,8 @@ use crate::{
|
||||||
|
|
||||||
use self::higher_order_candles::batch_higher_order_candles;
|
use self::higher_order_candles::batch_higher_order_candles;
|
||||||
|
|
||||||
|
use super::metrics::METRIC_CANDLES_TOTAL;
|
||||||
|
|
||||||
pub async fn batch_for_market(
|
pub async fn batch_for_market(
|
||||||
pool: &Pool,
|
pool: &Pool,
|
||||||
candles_sender: &Sender<Vec<Candle>>,
|
candles_sender: &Sender<Vec<Candle>>,
|
||||||
|
@ -47,14 +49,19 @@ async fn batch_inner(
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let market_name = &market.name.clone();
|
let market_name = &market.name.clone();
|
||||||
let candles = batch_1m_candles(pool, market).await?;
|
let candles = batch_1m_candles(pool, market).await?;
|
||||||
send_candles(candles, candles_sender).await;
|
send_candles(candles.clone(), candles_sender).await;
|
||||||
|
METRIC_CANDLES_TOTAL
|
||||||
|
.with_label_values(&[market.name.as_str()])
|
||||||
|
.inc_by(candles.clone().len() as u64);
|
||||||
for resolution in Resolution::iter() {
|
for resolution in Resolution::iter() {
|
||||||
if resolution == Resolution::R1m {
|
if resolution == Resolution::R1m {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
|
let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
|
||||||
send_candles(candles, candles_sender).await;
|
send_candles(candles.clone(), candles_sender).await;
|
||||||
|
METRIC_CANDLES_TOTAL
|
||||||
|
.with_label_values(&[market.name.as_str()])
|
||||||
|
.inc_by(candles.clone().len() as u64);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,10 @@ use openbook_candles::structs::candle::Candle;
|
||||||
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
|
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
|
||||||
use openbook_candles::structs::openbook::OpenBookFillEvent;
|
use openbook_candles::structs::openbook::OpenBookFillEvent;
|
||||||
use openbook_candles::utils::Config;
|
use openbook_candles::utils::Config;
|
||||||
use openbook_candles::worker::metrics::serve_metrics;
|
use openbook_candles::worker::metrics::{
|
||||||
|
serve_metrics, METRIC_CANDLES_QUEUE_LENGTH, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE,
|
||||||
|
METRIC_FILLS_QUEUE_LENGTH,
|
||||||
|
};
|
||||||
use openbook_candles::worker::trade_fetching::scrape::scrape;
|
use openbook_candles::worker::trade_fetching::scrape::scrape;
|
||||||
use openbook_candles::{
|
use openbook_candles::{
|
||||||
database::{
|
database::{
|
||||||
|
@ -13,7 +16,7 @@ use openbook_candles::{
|
||||||
};
|
};
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::{collections::HashMap, str::FromStr};
|
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
|
||||||
|
@ -29,6 +32,9 @@ async fn main() -> anyhow::Result<()> {
|
||||||
rpc_url: rpc_url.clone(),
|
rpc_url: rpc_url.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let candles_queue_max_size = 10000;
|
||||||
|
let fills_queue_max_size = 10000;
|
||||||
|
|
||||||
let markets = load_markets(path_to_markets_json);
|
let markets = load_markets(path_to_markets_json);
|
||||||
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
|
||||||
let mut target_markets = HashMap::new();
|
let mut target_markets = HashMap::new();
|
||||||
|
@ -41,10 +47,10 @@ async fn main() -> anyhow::Result<()> {
|
||||||
setup_database(&pool).await?;
|
setup_database(&pool).await?;
|
||||||
let mut handles = vec![];
|
let mut handles = vec![];
|
||||||
|
|
||||||
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(10000);
|
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(fills_queue_max_size);
|
||||||
|
let scrape_fill_sender = fill_sender.clone();
|
||||||
handles.push(tokio::spawn(async move {
|
handles.push(tokio::spawn(async move {
|
||||||
scrape(&config, &fill_sender, &target_markets).await;
|
scrape(&config, &scrape_fill_sender, &target_markets).await;
|
||||||
}));
|
}));
|
||||||
|
|
||||||
let fills_pool = pool.clone();
|
let fills_pool = pool.clone();
|
||||||
|
@ -56,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(100000);
|
let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(candles_queue_max_size);
|
||||||
|
|
||||||
for market in market_infos.into_iter() {
|
for market in market_infos.into_iter() {
|
||||||
let sender = candle_sender.clone();
|
let sender = candle_sender.clone();
|
||||||
|
@ -78,6 +84,25 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
let monitor_pool = pool.clone();
|
||||||
|
let monitor_fill_channel = fill_sender.clone();
|
||||||
|
let monitor_candle_channel = candle_sender.clone();
|
||||||
|
handles.push(tokio::spawn(async move {
|
||||||
|
// TODO: maybe break this out into a new function
|
||||||
|
loop {
|
||||||
|
let pool_status = monitor_pool.status();
|
||||||
|
METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64);
|
||||||
|
METRIC_DB_POOL_SIZE.set(pool_status.size as i64);
|
||||||
|
|
||||||
|
METRIC_CANDLES_QUEUE_LENGTH
|
||||||
|
.set((candles_queue_max_size - monitor_candle_channel.capacity()) as i64);
|
||||||
|
METRIC_FILLS_QUEUE_LENGTH
|
||||||
|
.set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64);
|
||||||
|
|
||||||
|
tokio::time::sleep(WaitDuration::from_secs(10)).await;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
handles.push(tokio::spawn(async move {
|
handles.push(tokio::spawn(async move {
|
||||||
// TODO: this is ugly af
|
// TODO: this is ugly af
|
||||||
serve_metrics().await.unwrap().await.unwrap();
|
serve_metrics().await.unwrap().await.unwrap();
|
||||||
|
|
|
@ -1,18 +1,67 @@
|
||||||
use actix_web::{dev::Server, http::StatusCode, App, HttpServer};
|
use actix_web::{dev::Server, http::StatusCode, App, HttpServer};
|
||||||
use actix_web_prom::PrometheusMetricsBuilder;
|
use actix_web_prom::PrometheusMetricsBuilder;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry};
|
use prometheus::{
|
||||||
|
register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec,
|
||||||
|
IntGauge, Registry,
|
||||||
|
};
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref METRIC_REGISTRY: Registry =
|
static ref METRIC_REGISTRY: Registry =
|
||||||
Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap();
|
Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap();
|
||||||
|
pub static ref METRIC_TXS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
|
||||||
|
"txs_total",
|
||||||
|
"Total number of transactions scraped",
|
||||||
|
&["market", "status"],
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
|
pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
|
||||||
"fills_total",
|
"fills_total",
|
||||||
"Total number of fills scraped",
|
"Total number of fills parsed",
|
||||||
&["market"],
|
&["market"],
|
||||||
METRIC_REGISTRY
|
METRIC_REGISTRY
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
pub static ref METRIC_CANDLES_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
|
||||||
|
"candles_total",
|
||||||
|
"Total number of candles generated",
|
||||||
|
&["market"],
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_FILLS_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!(
|
||||||
|
"fills_queue_length",
|
||||||
|
"Current length of the fills write queue",
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_CANDLES_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!(
|
||||||
|
"candles_queue_length",
|
||||||
|
"Current length of the candles write queue",
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec =
|
||||||
|
register_int_counter_vec_with_registry!(
|
||||||
|
"rpc_errors_total",
|
||||||
|
"RPC errors while scraping",
|
||||||
|
&["method"],
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_DB_POOL_SIZE: IntGauge = register_int_gauge_with_registry!(
|
||||||
|
"db_pool_size",
|
||||||
|
"Current size of the DB connection pool",
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_DB_POOL_AVAILABLE: IntGauge = register_int_gauge_with_registry!(
|
||||||
|
"db_pool_available",
|
||||||
|
"Available DB connections in the pool",
|
||||||
|
METRIC_REGISTRY
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn serve_metrics() -> anyhow::Result<Server> {
|
pub async fn serve_metrics() -> anyhow::Result<Server> {
|
||||||
|
|
|
@ -5,7 +5,10 @@ use solana_transaction_status::{
|
||||||
};
|
};
|
||||||
use std::{collections::HashMap, io::Error};
|
use std::{collections::HashMap, io::Error};
|
||||||
|
|
||||||
use crate::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw};
|
use crate::{
|
||||||
|
structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw},
|
||||||
|
worker::metrics::METRIC_RPC_ERRORS_TOTAL,
|
||||||
|
};
|
||||||
|
|
||||||
const PROGRAM_DATA: &str = "Program data: ";
|
const PROGRAM_DATA: &str = "Program data: ";
|
||||||
|
|
||||||
|
@ -34,7 +37,11 @@ pub fn parse_trades_from_openbook_txns(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => {}
|
Err(_) => {
|
||||||
|
METRIC_RPC_ERRORS_TOTAL
|
||||||
|
.with_label_values(&["getTransaction"])
|
||||||
|
.inc();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fills_vector
|
fills_vector
|
||||||
|
|
|
@ -9,7 +9,9 @@ use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
structs::openbook::OpenBookFillEvent, utils::Config, worker::metrics::METRIC_FILLS_TOTAL,
|
structs::openbook::OpenBookFillEvent,
|
||||||
|
utils::Config,
|
||||||
|
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::parsing::parse_trades_from_openbook_txns;
|
use super::parsing::parse_trades_from_openbook_txns;
|
||||||
|
@ -60,6 +62,9 @@ pub async fn scrape_transactions(
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
println!("Error in get_signatures_for_address_with_config: {}", e);
|
println!("Error in get_signatures_for_address_with_config: {}", e);
|
||||||
|
METRIC_RPC_ERRORS_TOTAL
|
||||||
|
.with_label_values(&["getSignaturesForAddress"])
|
||||||
|
.inc();
|
||||||
return before_sig;
|
return before_sig;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue