From 260a2f104b47acc25bd09592565f4b00c997fb9a Mon Sep 17 00:00:00 2001
From: Godmode Galactus
Date: Fri, 1 Sep 2023 16:40:59 +0200
Subject: [PATCH 1/3] re-adding a few prometheus counters

---
 core/src/tx_store.rs                 |  5 ++-
 services/src/data_caching_service.rs | 48 +++++++++++++++++++++++++---
 2 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/core/src/tx_store.rs b/core/src/tx_store.rs
index 68fbd0c0..104849a7 100644
--- a/core/src/tx_store.rs
+++ b/core/src/tx_store.rs
@@ -25,9 +25,12 @@ pub struct TxStore {
 }
 
 impl TxStore {
-    pub fn update_status(&self, signature: &str, status: TransactionStatus) {
+    pub fn update_status(&self, signature: &str, status: TransactionStatus) -> bool {
         if let Some(mut meta) = self.store.get_mut(signature) {
             meta.status = Some(status);
+            true
+        } else {
+            false
         }
     }
 
diff --git a/services/src/data_caching_service.rs b/services/src/data_caching_service.rs
index eaa13799..06da6164 100644
--- a/services/src/data_caching_service.rs
+++ b/services/src/data_caching_service.rs
@@ -1,6 +1,8 @@
 use std::time::Duration;
 
 use anyhow::{bail, Context};
+use prometheus::core::GenericGauge;
+use prometheus::{register_int_gauge, opts, IntCounter, register_int_counter};
 use solana_lite_rpc_core::block_information_store::BlockInformation;
 use solana_lite_rpc_core::data_cache::DataCache;
 use solana_lite_rpc_core::streams::{
@@ -10,6 +12,27 @@
 use solana_lite_rpc_core::AnyhowJoinHandle;
 use solana_sdk::commitment_config::CommitmentLevel;
 use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
 
+
+lazy_static::lazy_static! {
+    static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
+        register_int_gauge!(opts!("literpc_nb_cluster_nodes", "Number of cluster nodes saved")).unwrap();
+
+    static ref CURRENT_SLOT: GenericGauge<prometheus::core::AtomicI64> =
+        register_int_gauge!(opts!("literpc_current_slot", "Current slot seen by last rpc")).unwrap();
+
+    static ref ESTIMATED_SLOT: GenericGauge<prometheus::core::AtomicI64> =
+        register_int_gauge!(opts!("literpc_estimated_slot", "Estimated slot seen by last rpc")).unwrap();
+
+    static ref TXS_CONFIRMED: IntCounter =
+        register_int_counter!(opts!("literpc_txs_confirmed", "Number of Transactions Confirmed")).unwrap();
+
+    static ref TXS_FINALIZED: IntCounter =
+        register_int_counter!(opts!("literpc_txs_finalized", "Number of Transactions Finalized")).unwrap();
+
+    static ref TXS_PROCESSED: IntCounter =
+        register_int_counter!(opts!("literpc_txs_processed", "Number of Transactions Processed")).unwrap();
+}
+
 pub struct DataCachingService {
     pub data_cache: DataCache,
     pub clean_duration: Duration,
@@ -40,12 +63,13 @@ impl DataCachingService {
                 let confirmation_status = match block.commitment_config.commitment {
                     CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized,
-                    _ => TransactionConfirmationStatus::Confirmed,
+                    CommitmentLevel::Confirmed => TransactionConfirmationStatus::Confirmed,
+                    _ => TransactionConfirmationStatus::Processed,
+
                 };
 
                 for tx in block.txs {
-                    //
-                    data_cache.txs.update_status(
+                    if data_cache.txs.update_status(
                         &tx.signature,
                         TransactionStatus {
                             slot: block.slot,
@@ -54,7 +78,20 @@
                             err: tx.err.clone(),
                             confirmation_status: Some(confirmation_status.clone()),
                         },
-                );
+                ) {
+                    // transaction updated
+                    match confirmation_status {
+                        TransactionConfirmationStatus::Finalized => {
+                            TXS_FINALIZED.inc();
+                        },
+                        TransactionConfirmationStatus::Confirmed => {
+                            TXS_CONFIRMED.inc();
+                        },
+                        TransactionConfirmationStatus::Processed => {
+                            TXS_PROCESSED.inc();
+                        },
+                    }
+                }
                 // notify
                 data_cache
                     .tx_subs
@@ -70,6 +107,8 @@ impl DataCachingService {
             loop {
                 match slot_notification.recv().await {
                     Ok(slot_notification) => {
+                        CURRENT_SLOT.set(slot_notification.processed_slot as i64);
+                        ESTIMATED_SLOT.set(slot_notification.estimated_processed_slot as i64);
                         data_cache.slot_cache.update(slot_notification);
                     }
                     Err(e) => {
@@ -87,6 +126,7 @@ impl DataCachingService {
                     .cluster_info
                     .load_cluster_info(&mut cluster_info_notification)
                     .await?;
+                NB_CLUSTER_NODES.set(data_cache.cluster_info.cluster_nodes.len() as i64);
             }
         });
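The counters above are process-wide: `lazy_static` registers each metric once in prometheus's default registry, and the new `bool` return of `TxStore::update_status` gates the counters so they only tick when the signature was actually present in the store. Below is a minimal, self-contained sketch of the same registration-and-increment pattern; the `demo_*` metric name and the `main` driver are hypothetical, only the `literpc_*` metrics in the patch are real.

```rust
use lazy_static::lazy_static;
use prometheus::{opts, register_int_counter, Encoder, IntCounter, TextEncoder};

lazy_static! {
    // Registered once in the default registry, mirroring the literpc_* metrics.
    static ref DEMO_TXS_CONFIRMED: IntCounter =
        register_int_counter!(opts!("demo_txs_confirmed", "Number of transactions confirmed")).unwrap();
}

fn main() {
    // Increment on each observed event, as the patch does per updated transaction.
    DEMO_TXS_CONFIRMED.inc();

    // Render everything in the default registry in the Prometheus text format.
    let mut buffer = Vec::new();
    TextEncoder::new()
        .encode(&prometheus::gather(), &mut buffer)
        .unwrap();
    print!("{}", String::from_utf8(buffer).unwrap());
}
```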
From 50c3b43a8bdca55f4237f2d1619ef7de0b6d26a3 Mon Sep 17 00:00:00 2001
From: Godmode Galactus
Date: Fri, 1 Sep 2023 16:53:55 +0200
Subject: [PATCH 2/3] cargo fmt

---
 services/src/data_caching_service.rs | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/services/src/data_caching_service.rs b/services/src/data_caching_service.rs
index 06da6164..f290c344 100644
--- a/services/src/data_caching_service.rs
+++ b/services/src/data_caching_service.rs
@@ -2,7 +2,7 @@ use std::time::Duration;
 
 use anyhow::{bail, Context};
 use prometheus::core::GenericGauge;
-use prometheus::{register_int_gauge, opts, IntCounter, register_int_counter};
+use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter};
 use solana_lite_rpc_core::block_information_store::BlockInformation;
 use solana_lite_rpc_core::data_cache::DataCache;
 use solana_lite_rpc_core::streams::{
@@ -12,7 +12,6 @@ use solana_lite_rpc_core::AnyhowJoinHandle;
 use solana_sdk::commitment_config::CommitmentLevel;
 use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
 
-
 lazy_static::lazy_static! {
     static ref NB_CLUSTER_NODES: GenericGauge<prometheus::core::AtomicI64> =
         register_int_gauge!(opts!("literpc_nb_cluster_nodes", "Number of cluster nodes saved")).unwrap();
 
     static ref CURRENT_SLOT: GenericGauge<prometheus::core::AtomicI64> =
         register_int_gauge!(opts!("literpc_current_slot", "Current slot seen by last rpc")).unwrap();
 
     static ref ESTIMATED_SLOT: GenericGauge<prometheus::core::AtomicI64> =
         register_int_gauge!(opts!("literpc_estimated_slot", "Estimated slot seen by last rpc")).unwrap();
@@ -25,10 +24,10 @@ lazy_static::lazy_static! {
     static ref TXS_CONFIRMED: IntCounter =
         register_int_counter!(opts!("literpc_txs_confirmed", "Number of Transactions Confirmed")).unwrap();
-
+
     static ref TXS_FINALIZED: IntCounter =
         register_int_counter!(opts!("literpc_txs_finalized", "Number of Transactions Finalized")).unwrap();
-
+
     static ref TXS_PROCESSED: IntCounter =
         register_int_counter!(opts!("literpc_txs_processed", "Number of Transactions Processed")).unwrap();
 }
@@ -65,7 +64,6 @@ impl DataCachingService {
                     CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized,
                     CommitmentLevel::Confirmed => TransactionConfirmationStatus::Confirmed,
                     _ => TransactionConfirmationStatus::Processed,
-
                 };
 
                 for tx in block.txs {
@@ -83,13 +81,13 @@ impl DataCachingService {
                     match confirmation_status {
                         TransactionConfirmationStatus::Finalized => {
                             TXS_FINALIZED.inc();
-                        },
+                        }
                         TransactionConfirmationStatus::Confirmed => {
                             TXS_CONFIRMED.inc();
-                        },
+                        }
                         TransactionConfirmationStatus::Processed => {
                             TXS_PROCESSED.inc();
-                        },
+                        }
                     }
                 }
                 // notify
From 9fd6af9f98f2feeb473f4bcc5b64c51d63d258d2 Mon Sep 17 00:00:00 2001
From: Godmode Galactus
Date: Sat, 2 Sep 2023 08:29:50 +0200
Subject: [PATCH 3/3] fix slot polling issue

---
 .../src/json_rpc_subscription.rs             |   9 +-
 .../src/rpc_polling/poll_slots.rs            | 108 ++++++++++--------
 2 files changed, 62 insertions(+), 55 deletions(-)

diff --git a/cluster-endpoints/src/json_rpc_subscription.rs b/cluster-endpoints/src/json_rpc_subscription.rs
index 11196a62..ccf354be 100644
--- a/cluster-endpoints/src/json_rpc_subscription.rs
+++ b/cluster-endpoints/src/json_rpc_subscription.rs
@@ -18,13 +18,8 @@ pub fn create_json_rpc_polling_subscription(
     let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10);
     let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10);
 
-    let mut endpoint_tasks = vec![];
-    let slot_polling_task = tokio::spawn(poll_slots(
-        rpc_client.clone(),
-        CommitmentConfig::processed(),
-        slot_sx,
-    ));
-    endpoint_tasks.push(slot_polling_task);
+    let mut endpoint_tasks =
+        poll_slots(rpc_client.clone(), CommitmentConfig::processed(), slot_sx)?;
 
     let mut block_polling_tasks =
         poll_block(rpc_client.clone(), block_sx, slot_notifier.resubscribe());
diff --git a/cluster-endpoints/src/rpc_polling/poll_slots.rs b/cluster-endpoints/src/rpc_polling/poll_slots.rs
index 73812bea..16e625df 100644
--- a/cluster-endpoints/src/rpc_polling/poll_slots.rs
+++ b/cluster-endpoints/src/rpc_polling/poll_slots.rs
@@ -1,8 +1,8 @@
 use std::{sync::Arc, time::Duration};
 
-use anyhow::Context;
+use anyhow::{bail, Context};
 use solana_client::nonblocking::rpc_client::RpcClient;
-use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
+use solana_lite_rpc_core::{structures::slot_notification::SlotNotification, AnyhowJoinHandle};
 use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};
 use tokio::sync::broadcast::Sender;
 const AVERAGE_SLOT_CHANGE_TIME: Duration = Duration::from_millis(400);
@@ -14,74 +14,86 @@ pub async fn poll_commitment_slots(
 ) -> anyhow::Result<()> {
     let mut poll_frequency = tokio::time::interval(Duration::from_millis(10));
     let mut last_slot = 0;
+    let mut errors = 0;
     loop {
-        let slot = rpc_client
-            .get_slot_with_commitment(commitment_config)
-            .await
-            .context("Error getting slot")?;
-        if slot > last_slot {
-            // send
-            slot_tx.send(slot).context("Error sending slot")?;
-            last_slot = slot;
+        let slot = rpc_client.get_slot_with_commitment(commitment_config).await;
+        match slot {
+            Ok(slot) => {
+                if slot > last_slot {
+                    // send
+                    slot_tx.send(slot).context("Error sending slot")?;
+                    last_slot = slot;
+                }
+                errors = 0;
+            }
+            Err(e) => {
+                errors += 1;
+                if errors > 10 {
+                    bail!("Exceeded error count while getting slots from rpc {e:?}");
+                }
+            }
         }
         // wait for next poll, i.e. at least 10ms
         poll_frequency.tick().await;
     }
 }
 
-pub async fn poll_slots(
+pub fn poll_slots(
     rpc_client: Arc<RpcClient>,
     commitment_config: CommitmentConfig,
     sender: Sender<SlotNotification>,
-) -> anyhow::Result<()> {
-    let slot = rpc_client
-        .get_slot_with_commitment(CommitmentConfig::confirmed())
-        .await
-        .context("Error getting slot")?;
-
-    let mut current_slot = slot;
-    let mut estimated_slot = slot;
-
+) -> anyhow::Result<Vec<AnyhowJoinHandle>> {
     // processed slot update task
     let (slot_update_sx, mut slot_update_rx) = tokio::sync::mpsc::unbounded_channel();
-    tokio::spawn(poll_commitment_slots(
-        rpc_client,
+    let task1 = tokio::spawn(poll_commitment_slots(
+        rpc_client.clone(),
         commitment_config,
         slot_update_sx,
     ));
 
-    loop {
-        match tokio::time::timeout(AVERAGE_SLOT_CHANGE_TIME, slot_update_rx.recv()).await {
-            Ok(Some(slot)) => {
-                // slot is latest
-                if slot > current_slot {
-                    current_slot = slot;
-                    if current_slot > estimated_slot {
-                        estimated_slot = slot;
-                    }
-                    sender
-                        .send(SlotNotification {
-                            processed_slot: current_slot,
-                            estimated_processed_slot: estimated_slot,
-                        })
-                        .context("Cannot send slot notification")?;
-                }
-            }
-            Ok(None) => log::error!("got nothing from slot update notifier"),
-            Err(err) => {
-                log::warn!("failed to receive slot update: {err}");
-                // force update the slot
-                // estimated slot should not go ahead more than 32 slots
-                // this is because blocks may be skipped
-                if estimated_slot < current_slot + 32 {
-                    estimated_slot += 1;
-                }
-                sender
-                    .send(SlotNotification {
-                        processed_slot: current_slot,
-                        estimated_processed_slot: estimated_slot,
-                    })
-                    .context("Cannot send slot notification")?;
-            }
-        }
-    }
+    let task2 = tokio::spawn(async move {
+        let slot = rpc_client
+            .get_slot_with_commitment(CommitmentConfig::confirmed())
+            .await
+            .context("Error getting slot")?;
+
+        let mut current_slot = slot;
+        let mut estimated_slot = slot;
+
+        loop {
+            match tokio::time::timeout(AVERAGE_SLOT_CHANGE_TIME, slot_update_rx.recv()).await {
+                Ok(Some(slot)) => {
+                    // slot is latest
+                    if slot > current_slot {
+                        current_slot = slot;
+                        if current_slot > estimated_slot {
+                            estimated_slot = slot;
+                        }
+                        sender
+                            .send(SlotNotification {
+                                processed_slot: current_slot,
+                                estimated_processed_slot: estimated_slot,
+                            })
+                            .context("Cannot send slot notification")?;
+                    }
+                }
+                Ok(None) => log::error!("got nothing from slot update notifier"),
+                Err(err) => {
+                    log::warn!("failed to receive slot update: {err}");
+                    // force update the slot
+                    // estimated slot should not go ahead more than 32 slots
+                    // this is because blocks may be skipped
+                    if estimated_slot < current_slot + 32 {
+                        estimated_slot += 1;
+                    }
+                    sender
+                        .send(SlotNotification {
+                            processed_slot: current_slot,
+                            estimated_processed_slot: estimated_slot,
+                        })
+                        .context("Cannot send slot notification")?;
+                }
+            }
+        }
+    });
+    Ok(vec![task1, task2])
 }
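After this patch `poll_slots` no longer loops in the caller's task: it spawns the raw polling task and the slot-estimation task and returns both join handles, which is why `create_json_rpc_polling_subscription` can collect them straight into `endpoint_tasks`. A rough consumer sketch follows; the crate path for `poll_slots` is assumed from the repository layout above, and the RPC URL is a placeholder.

```rust
use std::sync::Arc;

use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
// Module path assumed from the cluster-endpoints crate shown in the diff.
use solana_lite_rpc_cluster_endpoints::rpc_polling::poll_slots::poll_slots;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let rpc_client = Arc::new(RpcClient::new("http://localhost:8899".to_string()));
    let (sender, mut receiver) = tokio::sync::broadcast::channel(10);

    // poll_slots now hands back its join handles instead of being spawned
    // itself, so the caller owns both tasks and can supervise or abort them.
    let _tasks = poll_slots(rpc_client, CommitmentConfig::processed(), sender)?;

    while let Ok(notification) = receiver.recv().await {
        println!(
            "processed={} estimated={}",
            notification.processed_slot, notification.estimated_processed_slot
        );
    }
    Ok(())
}
```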