From 4b7dccbece8a2f2be0962161f31885f3a9c002a2 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Thu, 23 Nov 2023 11:10:20 +0100 Subject: [PATCH] add prometheus --- src/cli.rs | 3 +++ src/main.rs | 25 +++++++++++++++--- src/prometheus_sync.rs | 57 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 3 deletions(-) create mode 100644 src/prometheus_sync.rs diff --git a/src/cli.rs b/src/cli.rs index f483f09..01ab73f 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -5,4 +5,7 @@ use clap::Parser; pub struct Args { #[arg(short, long, default_value_t = String::from("http://127.0.0.1:10000"))] pub grpc_address: String, + /// enable metrics to prometheus at addr + #[arg(short = 'm', long, default_value_t = String::from("[::]:9091"))] + pub prometheus_addr: String, } diff --git a/src/main.rs b/src/main.rs index da345b4..bfe6543 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,23 +10,37 @@ use cli::Args; use dashmap::DashMap; use futures::StreamExt; use log::info; +use prometheus::{IntCounter, IntGauge, opts, register_int_counter, register_int_gauge}; use solana_sdk::signature::Signature; use transaction_info::TransactionInfo; use yellowstone_grpc_client::GeyserGrpcClient; use yellowstone_grpc_proto::prelude::{ subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeUpdateBlock, }; +use crate::prometheus_sync::PrometheusSync; mod block_info; mod cli; mod postgres; mod transaction_info; +mod prometheus_sync; + +lazy_static::lazy_static! 
{ + static ref BANKING_STAGE_ERROR_COUNT: IntGauge = + register_int_gauge!(opts!("bankingstage_banking_errors", "banking_stage errors in block")).unwrap(); + static ref TXERROR_COUNT: IntGauge = + register_int_gauge!(opts!("bankingstage_txerrors", "transaction errors in block")).unwrap(); +} #[tokio::main()] async fn main() { tracing_subscriber::fmt::init(); let args = Args::parse(); + + let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone()); + + let grpc_addr = args.grpc_address; let mut client = GeyserGrpcClient::connect(grpc_addr, None::<&'static str>, None).unwrap(); let map_of_infos = Arc::new(DashMap::::new()); @@ -71,7 +85,9 @@ async fn main() { // process blocks with 2 mins delay so that we process all the banking stage errors before processing blocks tokio::spawn(async move { while let Some((wait_until, block)) = recv_block.recv().await { - println!("b"); + if wait_until > Instant::now() + Duration::from_secs(5) { + info!("wait until {:?} to collect errors for block {}", wait_until, block.slot); + } tokio::time::sleep_until(wait_until).await; for transaction in &block.transactions { let Some(tx) = &transaction.transaction else { @@ -84,9 +100,12 @@ async fn main() { } let block_info = BlockInfo::new(&block, &slot_by_error_task); + BANKING_STAGE_ERROR_COUNT.set(block_info.banking_stage_errors); + TXERROR_COUNT.set(block_info.processed_transactions - block_info.successful_transactions); if let Err(e) = postgres.save_block_info(block_info).await { log::error!("Error saving block {}", e); } + info!("saved block {}", block.slot); } }); @@ -105,7 +124,7 @@ async fn main() { if transaction.error.is_none() { continue; } - log::info!("got banking stage transaction erros"); + info!("got banking stage transaction errors"); let sig = transaction.signature.to_string(); match slot_by_errors.get_mut(&transaction.slot) { Some(mut value) => { @@ -127,7 +146,7 @@ async fn main() { } } UpdateOneof::Block(block) => { - log::info!("got block"); + 
log::debug!("got block {}", block.slot); slot.store(block.slot, std::sync::atomic::Ordering::Relaxed); send_block.send(( Instant::now() + Duration::from_secs(30), block)).expect("should works");
diff --git a/src/prometheus_sync.rs b/src/prometheus_sync.rs
new file mode 100644
index 0000000..7339438
--- /dev/null
+++ b/src/prometheus_sync.rs
@@ -0,0 +1,87 @@
+//! Minimal HTTP endpoint serving the default prometheus registry.
+
+use std::time::Duration;
+
+use log::error;
+use prometheus::{Encoder, TextEncoder};
+use tokio::{
+    io::AsyncWriteExt,
+    net::{TcpListener, TcpStream, ToSocketAddrs},
+};
+use tokio::task::JoinHandle;
+
+/// Namespace for the metrics endpoint; see [`PrometheusSync::sync`].
+pub struct PrometheusSync;
+
+impl PrometheusSync {
+    /// Builds a complete `HTTP/1.1 200 OK` response around `payload`.
+    ///
+    /// `content_type` is sent as the `Content-Type` header and
+    /// `Content-Length` is the payload size in bytes. `Connection: close`
+    /// is announced because the socket is dropped after one response.
+    fn create_response(content_type: &str, payload: &str) -> String {
+        format!(
+            "HTTP/1.1 200 OK\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
+            content_type,
+            payload.len(),
+            payload
+        )
+    }
+
+    /// Writes the text-encoded metrics of the default registry to `stream`.
+    ///
+    /// The request is not read: every connection receives the metrics,
+    /// whatever method or path was sent.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the metrics cannot be encoded or the socket write fails.
+    async fn handle_stream(stream: &mut TcpStream) -> anyhow::Result<()> {
+        let encoder = TextEncoder::new();
+        let metric_families = prometheus::gather();
+
+        let mut metrics_buffer = Vec::new();
+        encoder
+            .encode(&metric_families, &mut metrics_buffer)
+            .map_err(|e| anyhow::anyhow!("failed to encode prometheus metrics: {}", e))?;
+        let payload = String::from_utf8(metrics_buffer)
+            .map_err(|e| anyhow::anyhow!("prometheus metrics are not valid UTF-8: {}", e))?;
+
+        let response = Self::create_response(encoder.format_type(), &payload);
+
+        // `write_all` waits for the socket to become writable on its own.
+        stream.write_all(response.as_bytes()).await?;
+        stream.flush().await?;
+
+        Ok(())
+    }
+
+    /// Spawns a task that binds `addr` and serves metrics on every connection.
+    ///
+    /// The returned handle resolves to an error if binding `addr` fails;
+    /// once bound, the accept loop runs until the task is aborted.
+    pub fn sync(addr: impl ToSocketAddrs + Send + 'static) -> JoinHandle<anyhow::Result<()>> {
+        tokio::spawn(async move {
+            let listener = TcpListener::bind(addr).await?;
+
+            loop {
+                let mut stream = match listener.accept().await {
+                    Ok((stream, _peer)) => stream,
+                    Err(e) => {
+                        error!("Error accepting prometheus stream: {}", e);
+                        tokio::time::sleep(Duration::from_millis(1)).await;
+                        continue;
+                    }
+                };
+
+                // Serve each scrape on its own task so that one stalled client
+                // cannot block the accept loop; failures are logged, not dropped.
+                tokio::spawn(async move {
+                    if let Err(e) = Self::handle_stream(&mut stream).await {
+                        error!("Error serving prometheus metrics: {}", e);
+                    }
+                });
+            }
+        })
+    }
+}