add lifeness prometheus probe

This commit is contained in:
GroovieGermanikus 2023-11-23 11:16:00 +01:00
parent 4b7dccbece
commit 1b0c4f25d9
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
2 changed files with 7 additions and 5 deletions

View File

@ -26,6 +26,8 @@ mod transaction_info;
mod prometheus_sync;
lazy_static::lazy_static! {
static ref BLOCK_TXS: IntGauge =
register_int_gauge!(opts!("block_arrived", "block seen with n transactions (use for liveness check)")).unwrap();
static ref BANKING_STAGE_ERROR_COUNT: IntGauge =
register_int_gauge!(opts!("bankingstage_banking_errors", "banking_stage errors in block")).unwrap();
static ref TXERROR_COUNT: IntGauge =
@ -40,7 +42,6 @@ async fn main() {
let _prometheus_jh = PrometheusSync::sync(args.prometheus_addr.clone());
let grpc_addr = args.grpc_address;
let mut client = GeyserGrpcClient::connect(grpc_addr, None::<&'static str>, None).unwrap();
let map_of_infos = Arc::new(DashMap::<String, TransactionInfo>::new());
@ -76,7 +77,7 @@ async fn main() {
.await
.unwrap();
postgres.start_saving_transaction(map_of_infos.clone(), slot.clone());
postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone());
let (send_block, mut recv_block) = tokio::sync::mpsc::unbounded_channel::<(Instant, SubscribeUpdateBlock)>();
let slot_by_error_task = slot_by_errors.clone();
@ -147,8 +148,8 @@ async fn main() {
}
UpdateOneof::Block(block) => {
log::debug!("got block {}", block.slot);
BLOCK_TXS.set(block.transactions.len() as i64);
slot.store(block.slot, std::sync::atomic::Ordering::Relaxed);
send_block.send(( Instant::now() + Duration::from_secs(30), block)).expect("should works");
// delay queue so that we get all the banking stage errors before processing block
}

View File

@ -7,6 +7,7 @@ use anyhow::Context;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use itertools::Itertools;
use log::debug;
use tokio_postgres::{tls::MakeTlsConnect, types::ToSql, Client, NoTls, Socket};
use crate::{block_info::BlockInfo, transaction_info::TransactionInfo};
@ -158,7 +159,7 @@ impl Postgres {
}
}
pub fn start_saving_transaction(
pub fn spawn_transaction_infos_saver(
&self,
map_of_transaction: Arc<DashMap<String, TransactionInfo>>,
slots: Arc<AtomicU64>,
@ -176,7 +177,7 @@ impl Postgres {
}
if !txs_to_store.is_empty() {
println!("saving {}", txs_to_store.len());
debug!("saving transaction infos for {}", txs_to_store.len());
for tx in &txs_to_store {
map_of_transaction.remove(&tx.signature);
}