temp: start adding prometheus metrics to services

This commit is contained in:
Lou-Kamades 2024-02-22 14:44:44 -08:00
parent 8adb9e8ada
commit ec2243ff8c
No known key found for this signature in database
GPG Key ID: 87A166E4D7C01F30
7 changed files with 95 additions and 34 deletions

3
Cargo.lock generated
View File

@ -3544,9 +3544,11 @@ dependencies = [
"jemallocator",
"jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core-client",
"lazy_static",
"mango-v4",
"mango-v4-client",
"once_cell",
"prometheus",
"pyth-sdk-solana",
"rand 0.7.3",
"regex",
@ -3564,6 +3566,7 @@ dependencies = [
"tokio-stream",
"tokio-tungstenite 0.16.1",
"tracing",
"warp",
]
[[package]]

View File

@ -29,6 +29,7 @@ itertools = "0.10.3"
jemallocator = "0.3.2"
jsonrpc-core = "18.0.0"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http", "tls"] }
lazy_static = "1.4.0"
mango-v4 = { path = "../../programs/mango-v4", features = ["client"] }
mango-v4-client = { path = "../../lib/client" }
once_cell = "1.12.0"
@ -50,4 +51,5 @@ tokio-stream = { version = "0.1.9"}
tokio-tungstenite = "0.16.1"
tracing = "0.1"
regex = "1.9.5"
hdrhistogram = "7.5.4"
hdrhistogram = "7.5.4"
warp = "0.3.3"

View File

@ -562,7 +562,7 @@ impl<'a> LiquidateHelper<'a> {
.client
.send_and_confirm_owner_tx(liq_ixs.to_instructions())
.await
.context("sending liq_token_with_token")?;
.context("sending liq_token_bankruptcy")?;
info!(
liab_token_index,
%txsig,

View File

@ -31,6 +31,12 @@ mod unwrappable_oracle_error;
pub mod util;
use crate::unwrappable_oracle_error::UnwrappableOracleError;
use crate::metrics::serve_metrics;
use crate::metrics::METRIC_CHAIN_DATA_SLOTS;
use crate::metrics::METRIC_CHAIN_DATA_ACCOUNTS;
use crate::metrics::METRIC_CHAIN_DATA_ACCOUNT_WRITE;
use crate::metrics::METRIC_MANGO_ACCOUNTS;
use crate::metrics::METRIC_ACCOUNT_UPDATE_QUEUE_LEN;
use crate::util::{is_mango_account, is_mint_info, is_perp_market};
// jemalloc seems to be better at keeping the memory footprint reasonable over
@ -136,8 +142,6 @@ async fn main() -> anyhow::Result<()> {
info!("startup");
let metrics = metrics::start();
let (account_update_sender, account_update_receiver) =
async_channel::unbounded::<account_update_stream::Message>();
@ -174,7 +178,7 @@ async fn main() -> anyhow::Result<()> {
account_update_sender,
);
start_chain_data_metrics(chain_data.clone(), &metrics);
start_chain_data_metrics(chain_data.clone());
let shared_state = Arc::new(RwLock::new(SharedState::default()));
@ -297,7 +301,7 @@ async fn main() -> anyhow::Result<()> {
.await
.expect("channel not closed");
let current_time = Instant::now();
metric_account_update_queue_len.set(account_update_receiver.len() as u64);
METRIC_ACCOUNT_UPDATE_QUEUE_LEN.set(account_update_receiver.len().try_into().unwrap());
message.update_chain_data(&mut chain_data.write().unwrap());
@ -322,7 +326,7 @@ async fn main() -> anyhow::Result<()> {
// Track all MangoAccounts: we need to iterate over them later
state.mango_accounts.insert(account_write.pubkey);
metric_mango_accounts.set(state.mango_accounts.len() as u64);
METRIC_MANGO_ACCOUNTS.set(state.mango_accounts.len().try_into().unwrap());
}
}
Message::Snapshot(snapshot) => {
@ -360,7 +364,7 @@ async fn main() -> anyhow::Result<()> {
metric_chain_update_latency
.push(current_time - reception_time.unwrap());
}
metric_mango_accounts.set(state.mango_accounts.len() as u64);
METRIC_MANGO_ACCOUNTS.set(state.mango_accounts.len().try_into().unwrap());
state.one_snapshot_done = true;
}
@ -528,7 +532,8 @@ async fn main() -> anyhow::Result<()> {
));
}
use cli_args::{BoolArg, Cli, CliDotenv};
let serve_metrics_job = tokio::spawn(serve_metrics());
use futures::StreamExt;
let mut jobs: futures::stream::FuturesUnordered<_> = vec![
data_job,
@ -536,6 +541,7 @@ async fn main() -> anyhow::Result<()> {
liquidation_job,
token_swap_info_job,
check_changes_for_abort_job,
serve_metrics_job,
]
.into_iter()
.chain(prio_jobs.into_iter())
@ -792,21 +798,17 @@ impl LiquidationState {
}
}
fn start_chain_data_metrics(chain: Arc<RwLock<chain_data::ChainData>>, metrics: &metrics::Metrics) {
fn start_chain_data_metrics(chain: Arc<RwLock<chain_data::ChainData>>) {
let mut interval = mango_v4_client::delay_interval(Duration::from_secs(600));
let mut metric_slots_count = metrics.register_u64("chain_data_slots_count".into());
let mut metric_accounts_count = metrics.register_u64("chain_data_accounts_count".into());
let mut metric_account_write_count =
metrics.register_u64("chain_data_account_write_count".into());
tokio::spawn(async move {
loop {
interval.tick().await;
let chain_lock = chain.read().unwrap();
metric_slots_count.set(chain_lock.slots_count() as u64);
metric_accounts_count.set(chain_lock.accounts_count() as u64);
metric_account_write_count.set(chain_lock.account_writes_count() as u64);
METRIC_CHAIN_DATA_SLOTS.set(chain_lock.slots_count().try_into().unwrap());
METRIC_CHAIN_DATA_ACCOUNTS.set(chain_lock.accounts_count().try_into().unwrap());
METRIC_CHAIN_DATA_ACCOUNT_WRITE.set(chain_lock.account_writes_count().try_into().unwrap());
}
});
}

View File

@ -13,23 +13,73 @@ enum Value {
I64(Arc<atomic::AtomicI64>),
String(Arc<Mutex<String>>),
Latency(Arc<Mutex<Histogram<u64>>>),
use prometheus::{Encoder, IntCounter, IntGauge, Registry};
use warp::Filter;
lazy_static::lazy_static! {
pub static ref METRICS_REGISTRY: Registry = Registry::new_custom(Some("liquidator".to_string()), None).unwrap();
pub static ref METRIC_TOTAL_LIQUIDATION_TRANSACTIONS: IntCounter =
IntCounter::new("total_liquidation_transactions", "Total liquidation transactions").unwrap();
pub static ref METRIC_TOTAL_TOKEN_LIQ_WITH_TOKEN_TRANSACTIONS: IntCounter =
IntCounter::new("total_token_liq_with_token_transactions", "Total token_liq_with_token transactions").unwrap();
pub static ref METRIC_TOTAL_TOKEN_LIQ_BANKRUPTCY_TRANSACTIONS: IntCounter =
IntCounter::new("total_token_liq_bankruptcy_transactions", "Total token_liq_bankruptcy transactions").unwrap();
pub static ref METRIC_TOTAL_PERP_LIQ_BASE_OR_POS_PNL_TRANSACTIONS: IntCounter =
IntCounter::new("total_perp_liq_base_or_positive_pnl_transactions", "Total perp_liq_base_or_positive_pnl transactions").unwrap();
pub static ref METRIC_TOTAL_PERP_LIQ_NEG_PNL_OR_BANKRUPTCY_TRANSACTIONS: IntCounter =
IntCounter::new("total_perp_liq_base_or_positive_pnl_transactions", "Total perp_liq_negative_pnl_or_bankruptcy transactions").unwrap();
pub static ref METRIC_CHAIN_DATA_SLOTS: IntGauge =
IntGauge::new("chain_data_slots_count", "chain_data_slots_count").unwrap();
pub static ref METRIC_CHAIN_DATA_ACCOUNTS: IntGauge =
IntGauge::new("chain_data_accounts_count", "chain_data_accounts_count").unwrap();
pub static ref METRIC_CHAIN_DATA_ACCOUNT_WRITE: IntGauge =
IntGauge::new("chain_data_account_write_count", "chain_data_account_write_count").unwrap();
pub static ref METRIC_ACCOUNT_UPDATE_QUEUE_LEN: IntGauge =
IntGauge::new("account_update_queue_len", "account_update_queue_len").unwrap();
pub static ref METRIC_MANGO_ACCOUNTS: IntGauge =
IntGauge::new("mango_accounts", "mango_accounts").unwrap();
}
#[derive(Debug)]
enum PrevValue {
U64(u64),
I64(i64),
String(String),
}
pub async fn serve_metrics() {
METRICS_REGISTRY
.register(Box::new(METRIC_TOTAL_LIQUIDATION_TRANSACTIONS.clone()))
.unwrap();
METRICS_REGISTRY
.register(Box::new(
METRIC_TOTAL_TOKEN_LIQ_WITH_TOKEN_TRANSACTIONS.clone(),
))
.unwrap();
METRICS_REGISTRY
.register(Box::new(
METRIC_TOTAL_TOKEN_LIQ_BANKRUPTCY_TRANSACTIONS.clone(),
))
.unwrap();
METRICS_REGISTRY
.register(Box::new(
METRIC_TOTAL_PERP_LIQ_BASE_OR_POS_PNL_TRANSACTIONS.clone(),
))
.unwrap();
METRICS_REGISTRY
.register(Box::new(
METRIC_TOTAL_PERP_LIQ_NEG_PNL_OR_BANKRUPTCY_TRANSACTIONS.clone(),
))
.unwrap();
METRICS_REGISTRY
.register(Box::new(METRIC_CHAIN_DATA_SLOTS.clone()))
.unwrap();
METRICS_REGISTRY
.register(Box::new(METRIC_CHAIN_DATA_ACCOUNTS.clone()))
.unwrap();
METRICS_REGISTRY
.register(Box::new(METRIC_CHAIN_DATA_ACCOUNT_WRITE.clone()))
.unwrap();
#[derive(Clone)]
pub struct MetricU64 {
value: Arc<atomic::AtomicU64>,
}
impl MetricU64 {
pub fn value(&self) -> u64 {
self.value.load(atomic::Ordering::Acquire)
}
let metrics_route = warp::path!("metrics").map(|| {
let mut buffer = Vec::<u8>::new();
let encoder = prometheus::TextEncoder::new();
encoder
.encode(&METRICS_REGISTRY.gather(), &mut buffer)
.unwrap();
pub fn set(&mut self, value: u64) {
self.value.store(value, atomic::Ordering::Release);
@ -228,7 +278,8 @@ pub fn start() -> Metrics {
}
}
}
String::from_utf8(buffer.clone()).unwrap()
});
Metrics { registry }
println!("Metrics server starting on port 9091");
warp::serve(metrics_route).run(([0, 0, 0, 0], 9091)).await;
}

View File

@ -217,6 +217,7 @@ async fn main() -> anyhow::Result<()> {
account_fetcher: account_fetcher.clone(),
config: settle_config,
recently_settled: Default::default(),
metrics: &metrics
};
let mut tcs_start = tcs_start::State {

View File

@ -61,6 +61,7 @@ pub struct SettlementState {
pub config: Config,
pub recently_settled: HashMap<Pubkey, Instant>,
pub metrics: &metrics::Metrics,
}
impl SettlementState {
@ -353,6 +354,7 @@ impl<'a> SettleBatchProcessor<'a> {
self.instructions = previous;
let txsig = self.send().await?;
self.instructions.append(new_ixs);
self.metrics.
return Ok(txsig);
}