Merge pull request #176 from blockworks-foundation/readding_prometheus_counter

readding a few prometheus counters
galactus 2023-09-02 10:39:20 +02:00 committed by GitHub
commit 8580eded2a
4 changed files with 108 additions and 60 deletions

View File

@@ -18,13 +18,8 @@ pub fn create_json_rpc_polling_subscription(
let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10);
let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10);
let mut endpoint_tasks = vec![];
let slot_polling_task = tokio::spawn(poll_slots(
rpc_client.clone(),
CommitmentConfig::processed(),
slot_sx,
));
endpoint_tasks.push(slot_polling_task);
let mut endpoint_tasks =
poll_slots(rpc_client.clone(), CommitmentConfig::processed(), slot_sx)?;
let mut block_polling_tasks =
poll_block(rpc_client.clone(), block_sx, slot_notifier.resubscribe());
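
The change above inverts ownership of the slot polling task: instead of the subscription builder spawning poll_slots with tokio::spawn and pushing a single JoinHandle, poll_slots is now a synchronous constructor that spawns its own tasks and returns their handles. A minimal, self-contained sketch of that pattern (illustrative names, not the repository's real API):

use tokio::task::JoinHandle;

fn spawn_poller(label: &'static str) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
    // each poller spawns whatever tasks it needs and hands the handles back
    let task = tokio::spawn(async move {
        println!("{label} poller started");
        anyhow::Ok(())
    });
    Ok(vec![task])
}

fn build_subscription() -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
    // the builder only concatenates handles, mirroring how endpoint_tasks is
    // assembled from poll_slots (and, presumably, poll_block) in the hunk above
    let mut endpoint_tasks = spawn_poller("slots")?;
    endpoint_tasks.append(&mut spawn_poller("blocks")?);
    Ok(endpoint_tasks)
}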

View File

@@ -1,8 +1,8 @@
use std::{sync::Arc, time::Duration};
use anyhow::Context;
use anyhow::{bail, Context};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
use solana_lite_rpc_core::{structures::slot_notification::SlotNotification, AnyhowJoinHandle};
use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};
use tokio::sync::broadcast::Sender;
const AVERAGE_SLOT_CHANGE_TIME: Duration = Duration::from_millis(400);
@@ -14,74 +14,86 @@ pub async fn poll_commitment_slots(
) -> anyhow::Result<()> {
let mut poll_frequency = tokio::time::interval(Duration::from_millis(10));
let mut last_slot = 0;
let mut errors = 0;
loop {
let slot = rpc_client
.get_slot_with_commitment(commitment_config)
.await
.context("Error getting slot")?;
if slot > last_slot {
// send
slot_tx.send(slot).context("Error sending slot")?;
last_slot = slot;
let slot = rpc_client.get_slot_with_commitment(commitment_config).await;
match slot {
Ok(slot) => {
if slot > last_slot {
// send
slot_tx.send(slot).context("Error sending slot")?;
last_slot = slot;
}
errors = 0;
}
Err(e) => {
errors += 1;
if errors > 10 {
bail!("Exceeded error count to get slots from rpc {e:?}");
}
}
}
// wait for next poll i.e at least 50ms
poll_frequency.tick().await;
}
}
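
The rewritten loop above replaces the old fail-fast `?` with an error budget: RPC failures are counted, the counter resets to zero on the first successful poll, and only more than ten consecutive failures abort the task via bail!. The same idea, restated as a small generic retry helper (a sketch with assumed names, not code from the PR):

use anyhow::bail;
use std::future::Future;

// tolerate up to `budget` consecutive failures before giving up
async fn retry_with_budget<T, F, Fut>(budget: usize, mut call: F) -> anyhow::Result<T>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = anyhow::Result<T>>,
{
    let mut errors = 0usize;
    loop {
        match call().await {
            Ok(value) => return Ok(value),
            Err(e) => {
                errors += 1;
                if errors > budget {
                    bail!("giving up after {errors} consecutive errors: {e:?}");
                }
            }
        }
    }
}
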
pub async fn poll_slots(
pub fn poll_slots(
rpc_client: Arc<RpcClient>,
commitment_config: CommitmentConfig,
sender: Sender<SlotNotification>,
) -> anyhow::Result<()> {
let slot = rpc_client
.get_slot_with_commitment(CommitmentConfig::confirmed())
.await
.context("Error getting slot")?;
let mut current_slot = slot;
let mut estimated_slot = slot;
) -> anyhow::Result<Vec<AnyhowJoinHandle>> {
// processed slot update task
let (slot_update_sx, mut slot_update_rx) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(poll_commitment_slots(
rpc_client,
let task1 = tokio::spawn(poll_commitment_slots(
rpc_client.clone(),
commitment_config,
slot_update_sx,
));
loop {
match tokio::time::timeout(AVERAGE_SLOT_CHANGE_TIME, slot_update_rx.recv()).await {
Ok(Some(slot)) => {
// slot is latest
if slot > current_slot {
current_slot = slot;
if current_slot > estimated_slot {
estimated_slot = slot;
let task2 = tokio::spawn(async move {
let slot = rpc_client
.get_slot_with_commitment(CommitmentConfig::confirmed())
.await
.context("Error getting slot")?;
let mut current_slot = slot;
let mut estimated_slot = slot;
loop {
match tokio::time::timeout(AVERAGE_SLOT_CHANGE_TIME, slot_update_rx.recv()).await {
Ok(Some(slot)) => {
// slot is latest
if slot > current_slot {
current_slot = slot;
if current_slot > estimated_slot {
estimated_slot = slot;
}
sender
.send(SlotNotification {
processed_slot: current_slot,
estimated_processed_slot: estimated_slot,
})
.context("Cannot send slot notification")?;
}
}
Ok(None) => log::error!("got nothing from slot update notifier"),
Err(err) => {
log::warn!("failed to receive slot update: {err}");
// force update the slot
// estimated slot should not go ahead more than 32 slots
// this is because it may be a slot block
if estimated_slot < current_slot + 32 {
estimated_slot += 1;
}
sender
.send(SlotNotification {
processed_slot: current_slot,
estimated_processed_slot: estimated_slot,
})
.context("Cannot send slot notification")?;
.context("Connot send slot notification")?;
}
}
Ok(None) => log::error!("got nothing from slot update notifier"),
Err(err) => {
log::warn!("failed to receive slot update: {err}");
// force update the slot
// estimated slot should not go ahead more than 32 slots
// this is because it may be a slot block
if estimated_slot < current_slot + 32 {
estimated_slot += 1;
}
sender
.send(SlotNotification {
processed_slot: current_slot,
estimated_processed_slot: estimated_slot,
})
.context("Connot send slot notification")?;
}
}
}
});
Ok(vec![task1, task2])
}
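
With poll_slots now returning Vec<AnyhowJoinHandle>, the caller has to decide how to supervise the two spawned tasks. A hedged sketch of one way to surface the first failure, assuming AnyhowJoinHandle is a type alias for tokio::task::JoinHandle<anyhow::Result<()>> and that the futures crate is available:

async fn wait_for_first_failure(
    tasks: Vec<tokio::task::JoinHandle<anyhow::Result<()>>>,
) -> anyhow::Result<()> {
    // resolve as soon as any polling task exits; the remaining handles keep running
    let (finished, _index, _remaining) = futures::future::select_all(tasks).await;
    // outer ? maps a JoinError (panic/cancellation), inner ? maps the task's own error
    finished??;
    Ok(())
}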

View File

@@ -25,9 +25,12 @@ pub struct TxStore {
}
impl TxStore {
pub fn update_status(&self, signature: &str, status: TransactionStatus) {
pub fn update_status(&self, signature: &str, status: TransactionStatus) -> bool {
if let Some(mut meta) = self.store.get_mut(signature) {
meta.status = Some(status);
true
} else {
false
}
}
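
update_status now reports whether the signature was already present in the store, i.e. whether this instance ever saw the transaction. A short usage sketch of why that matters (hypothetical caller; the names are taken from the data_caching_service hunk further down):

// only transactions this instance is tracking should bump the new Prometheus
// counters; blocks are full of transactions submitted by other clients
if data_cache.txs.update_status(&tx.signature, tx_status) {
    TXS_CONFIRMED.inc();
}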

View File

@@ -1,6 +1,8 @@
use std::time::Duration;
use anyhow::{bail, Context};
use prometheus::core::GenericGauge;
use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter};
use solana_lite_rpc_core::block_information_store::BlockInformation;
use solana_lite_rpc_core::data_cache::DataCache;
use solana_lite_rpc_core::streams::{
@@ -10,6 +12,26 @@ use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::commitment_config::CommitmentLevel;
use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
lazy_static::lazy_static! {
static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_cluster_nodes", "Number of cluster nodes in saved")).unwrap();
static ref CURRENT_SLOT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_current_slot", "Current slot seen by last rpc")).unwrap();
static ref ESTIMATED_SLOT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_estimated_slot", "Estimated slot seen by last rpc")).unwrap();
static ref TXS_CONFIRMED: IntCounter =
register_int_counter!(opts!("literpc_txs_confirmed", "Number of Transactions Confirmed")).unwrap();
static ref TXS_FINALIZED: IntCounter =
register_int_counter!(opts!("literpc_txs_finalized", "Number of Transactions Finalized")).unwrap();
static ref TXS_PROCESSED: IntCounter =
register_int_counter!(opts!("literpc_txs_processed", "Number of Transactions Processed")).unwrap();
}
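
All six metrics register against the prometheus crate's default registry, so whatever scrape endpoint the service already exposes only needs to gather and encode them. A minimal sketch of that step (the actual exporter wiring in lite-rpc is not shown in this diff):

use prometheus::{Encoder, TextEncoder};

fn render_metrics() -> anyhow::Result<String> {
    // snapshot of the default registry used by register_int_gauge!/register_int_counter!
    let metric_families = prometheus::gather();
    let mut buffer = Vec::new();
    TextEncoder::new().encode(&metric_families, &mut buffer)?;
    Ok(String::from_utf8(buffer)?)
}
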
pub struct DataCachingService {
pub data_cache: DataCache,
pub clean_duration: Duration,
@@ -40,12 +62,12 @@ impl DataCachingService {
let confirmation_status = match block.commitment_config.commitment {
CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized,
_ => TransactionConfirmationStatus::Confirmed,
CommitmentLevel::Confirmed => TransactionConfirmationStatus::Confirmed,
_ => TransactionConfirmationStatus::Processed,
};
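
Previously every non-finalized block fell into the Confirmed arm; the new match keeps Finalized and Confirmed one-to-one and maps everything else to Processed, which is what lets the three new counters below stay accurate. Restated as a free function (a hedged restatement; the diff writes the match inline):

use solana_sdk::commitment_config::CommitmentLevel;
use solana_transaction_status::TransactionConfirmationStatus;

fn to_confirmation_status(level: CommitmentLevel) -> TransactionConfirmationStatus {
    match level {
        CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized,
        CommitmentLevel::Confirmed => TransactionConfirmationStatus::Confirmed,
        // processed (or any legacy commitment level) is reported as Processed
        _ => TransactionConfirmationStatus::Processed,
    }
}
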
for tx in block.txs {
//
data_cache.txs.update_status(
if data_cache.txs.update_status(
&tx.signature,
TransactionStatus {
slot: block.slot,
@@ -54,7 +76,20 @@ impl DataCachingService {
err: tx.err.clone(),
confirmation_status: Some(confirmation_status.clone()),
},
);
) {
// transaction updated
match confirmation_status {
TransactionConfirmationStatus::Finalized => {
TXS_FINALIZED.inc();
}
TransactionConfirmationStatus::Confirmed => {
TXS_CONFIRMED.inc();
}
TransactionConfirmationStatus::Processed => {
TXS_PROCESSED.inc();
}
}
}
// notify
data_cache
.tx_subs
@@ -70,6 +105,8 @@ impl DataCachingService {
loop {
match slot_notification.recv().await {
Ok(slot_notification) => {
CURRENT_SLOT.set(slot_notification.processed_slot as i64);
ESTIMATED_SLOT.set(slot_notification.estimated_processed_slot as i64);
data_cache.slot_cache.update(slot_notification);
}
Err(e) => {
@@ -87,6 +124,7 @@ impl DataCachingService {
.cluster_info
.load_cluster_info(&mut cluster_info_notification)
.await?;
NB_CLUSTER_NODES.set(data_cache.cluster_info.cluster_nodes.len() as i64);
}
});
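
Because these are plain process-wide gauges and counters from the prometheus crate, their current values can also be read back directly, for example in a smoke test or a periodic debug line, without scraping the HTTP endpoint. A hypothetical helper inside the same module, assuming the log crate that the polling code above already uses:

fn log_metrics_snapshot() {
    // reads the statics registered in the lazy_static! block above
    log::info!(
        "slot={} estimated={} nodes={} confirmed={} finalized={} processed={}",
        CURRENT_SLOT.get(),
        ESTIMATED_SLOT.get(),
        NB_CLUSTER_NODES.get(),
        TXS_CONFIRMED.get(),
        TXS_FINALIZED.get(),
        TXS_PROCESSED.get(),
    );
}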