Compare commits

..

No commits in common. "95f923f7c7dd31b64dcd3ad6f578dc6e2c2cb566" and "3a515d61a0fd8906f323586bb29d032fc50d6a14" have entirely different histories.

5 changed files with 14 additions and 107 deletions

View File

@ -13,8 +13,6 @@ use crate::{
use self::higher_order_candles::batch_higher_order_candles;
use super::metrics::METRIC_CANDLES_TOTAL;
pub async fn batch_for_market(
pool: &Pool,
candles_sender: &Sender<Vec<Candle>>,
@ -49,19 +47,14 @@ async fn batch_inner(
) -> anyhow::Result<()> {
let market_name = &market.name.clone();
let candles = batch_1m_candles(pool, market).await?;
send_candles(candles.clone(), candles_sender).await;
METRIC_CANDLES_TOTAL
.with_label_values(&[market.name.as_str()])
.inc_by(candles.clone().len() as u64);
send_candles(candles, candles_sender).await;
for resolution in Resolution::iter() {
if resolution == Resolution::R1m {
continue;
}
let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
send_candles(candles.clone(), candles_sender).await;
METRIC_CANDLES_TOTAL
.with_label_values(&[market.name.as_str()])
.inc_by(candles.clone().len() as u64);
send_candles(candles, candles_sender).await;
}
Ok(())
}

View File

@ -2,10 +2,7 @@ use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEvent;
use openbook_candles::utils::Config;
use openbook_candles::worker::metrics::{
serve_metrics, METRIC_CANDLES_QUEUE_LENGTH, METRIC_DB_POOL_AVAILABLE, METRIC_DB_POOL_SIZE,
METRIC_FILLS_QUEUE_LENGTH,
};
use openbook_candles::worker::metrics::serve_metrics;
use openbook_candles::worker::trade_fetching::scrape::scrape;
use openbook_candles::{
database::{
@ -16,7 +13,7 @@ use openbook_candles::{
};
use solana_sdk::pubkey::Pubkey;
use std::env;
use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use std::{collections::HashMap, str::FromStr};
use tokio::sync::mpsc;
#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
@ -32,9 +29,6 @@ async fn main() -> anyhow::Result<()> {
rpc_url: rpc_url.clone(),
};
let candles_queue_max_size = 10000;
let fills_queue_max_size = 10000;
let markets = load_markets(path_to_markets_json);
let market_infos = fetch_market_infos(&config, markets.clone()).await?;
let mut target_markets = HashMap::new();
@ -47,10 +41,10 @@ async fn main() -> anyhow::Result<()> {
setup_database(&pool).await?;
let mut handles = vec![];
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(fills_queue_max_size);
let scrape_fill_sender = fill_sender.clone();
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEvent>(10000);
handles.push(tokio::spawn(async move {
scrape(&config, &scrape_fill_sender, &target_markets).await;
scrape(&config, &fill_sender, &target_markets).await;
}));
let fills_pool = pool.clone();
@ -62,7 +56,7 @@ async fn main() -> anyhow::Result<()> {
}
}));
let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(candles_queue_max_size);
let (candle_sender, mut candle_receiver) = mpsc::channel::<Vec<Candle>>(100000);
for market in market_infos.into_iter() {
let sender = candle_sender.clone();
@ -84,25 +78,6 @@ async fn main() -> anyhow::Result<()> {
}
}));
let monitor_pool = pool.clone();
let monitor_fill_channel = fill_sender.clone();
let monitor_candle_channel = candle_sender.clone();
handles.push(tokio::spawn(async move {
// TODO: maybe break this out into a new function
loop {
let pool_status = monitor_pool.status();
METRIC_DB_POOL_AVAILABLE.set(pool_status.available as i64);
METRIC_DB_POOL_SIZE.set(pool_status.size as i64);
METRIC_CANDLES_QUEUE_LENGTH
.set((candles_queue_max_size - monitor_candle_channel.capacity()) as i64);
METRIC_FILLS_QUEUE_LENGTH
.set((fills_queue_max_size - monitor_fill_channel.capacity()) as i64);
tokio::time::sleep(WaitDuration::from_secs(10)).await;
}
}));
handles.push(tokio::spawn(async move {
// TODO: this is ugly af
serve_metrics().await.unwrap().await.unwrap();

View File

@ -1,67 +1,18 @@
use actix_web::{dev::Server, http::StatusCode, App, HttpServer};
use actix_web_prom::PrometheusMetricsBuilder;
use lazy_static::lazy_static;
use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec,
IntGauge, Registry,
};
use prometheus::{register_int_counter_vec_with_registry, IntCounterVec, Registry};
lazy_static! {
static ref METRIC_REGISTRY: Registry =
Registry::new_custom(Some("openbook_candles_worker".to_string()), None).unwrap();
pub static ref METRIC_TXS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"txs_total",
"Total number of transactions scraped",
&["market", "status"],
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_FILLS_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"fills_total",
"Total number of fills parsed",
"Total number of fills scraped",
&["market"],
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_CANDLES_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"candles_total",
"Total number of candles generated",
&["market"],
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_FILLS_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!(
"fills_queue_length",
"Current length of the fills write queue",
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_CANDLES_QUEUE_LENGTH: IntGauge = register_int_gauge_with_registry!(
"candles_queue_length",
"Current length of the candles write queue",
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_RPC_ERRORS_TOTAL: IntCounterVec =
register_int_counter_vec_with_registry!(
"rpc_errors_total",
"RPC errors while scraping",
&["method"],
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_DB_POOL_SIZE: IntGauge = register_int_gauge_with_registry!(
"db_pool_size",
"Current size of the DB connection pool",
METRIC_REGISTRY
)
.unwrap();
pub static ref METRIC_DB_POOL_AVAILABLE: IntGauge = register_int_gauge_with_registry!(
"db_pool_available",
"Available DB connections in the pool",
METRIC_REGISTRY
)
.unwrap();
}
pub async fn serve_metrics() -> anyhow::Result<Server> {

View File

@ -5,10 +5,7 @@ use solana_transaction_status::{
};
use std::{collections::HashMap, io::Error};
use crate::{
structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw},
worker::metrics::METRIC_RPC_ERRORS_TOTAL,
};
use crate::structs::openbook::{OpenBookFillEvent, OpenBookFillEventRaw};
const PROGRAM_DATA: &str = "Program data: ";
@ -37,11 +34,7 @@ pub fn parse_trades_from_openbook_txns(
}
}
}
Err(_) => {
METRIC_RPC_ERRORS_TOTAL
.with_label_values(&["getTransaction"])
.inc();
}
Err(_) => {}
}
}
fills_vector

View File

@ -9,9 +9,7 @@ use std::{collections::HashMap, str::FromStr, time::Duration as WaitDuration};
use tokio::sync::mpsc::Sender;
use crate::{
structs::openbook::OpenBookFillEvent,
utils::Config,
worker::metrics::{METRIC_FILLS_TOTAL, METRIC_RPC_ERRORS_TOTAL},
structs::openbook::OpenBookFillEvent, utils::Config, worker::metrics::METRIC_FILLS_TOTAL,
};
use super::parsing::parse_trades_from_openbook_txns;
@ -62,9 +60,6 @@ pub async fn scrape_transactions(
Ok(s) => s,
Err(e) => {
println!("Error in get_signatures_for_address_with_config: {}", e);
METRIC_RPC_ERRORS_TOTAL
.with_label_values(&["getSignaturesForAddress"])
.inc();
return before_sig;
}
};