From dcdbf050ce4b5b007b94d018c9f60339f20fd1d7 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Mon, 6 Mar 2023 14:25:00 +0100 Subject: [PATCH] adding more tokio metrics, other metrics, forcing new instance of TPUClient in cleanup --- src/bridge.rs | 31 +++++++++---------- src/tpu_manager.rs | 54 ++++++++++++++++++++++------------ src/workers/block_listenser.rs | 28 +++++++++--------- src/workers/cleaner.rs | 10 +++++-- src/workers/metrics_capture.rs | 6 ++-- src/workers/tx_sender.rs | 16 ++++++---- 6 files changed, 86 insertions(+), 59 deletions(-) diff --git a/src/bridge.rs b/src/bridge.rs index e3fbba28..bd9c9b15 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -17,7 +17,7 @@ use log::info; use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink}; -use prometheus::{core::GenericGauge, opts, register_counter, register_int_gauge, Counter}; +use prometheus::{core::GenericGauge, opts, register_int_counter, register_int_gauge, IntCounter}; use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction}; use solana_rpc_client_api::{ config::{RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig}, @@ -35,20 +35,20 @@ use tokio::{ }; lazy_static::lazy_static! { - static ref RPC_SEND_TX: Counter = - register_counter!(opts!("literpc_rpc_send_tx", "RPC call send transaction")).unwrap(); - static ref RPC_GET_LATEST_BLOCKHASH: Counter = - register_counter!(opts!("literpc_rpc_get_latest_blockhash", "RPC call to get latest block hash")).unwrap(); - static ref RPC_IS_BLOCKHASH_VALID: Counter = - register_counter!(opts!("literpc_rpc_is_blockhash_valid", "RPC call to check if blockhash is vali calld")).unwrap(); - static ref RPC_GET_SIGNATURE_STATUSES: Counter = - register_counter!(opts!("literpc_rpc_get_signature_statuses", "RPC call to get signature statuses")).unwrap(); - static ref RPC_GET_VERSION: Counter = - register_counter!(opts!("literpc_rpc_get_version", "RPC call to version")).unwrap(); - static ref RPC_REQUEST_AIRDROP: Counter = - register_counter!(opts!("literpc_rpc_airdrop", "RPC call to request airdrop")).unwrap(); - static ref RPC_SIGNATURE_SUBSCRIBE: Counter = - register_counter!(opts!("literpc_rpc_signature_subscribe", "RPC call to subscribe to signature")).unwrap(); + static ref RPC_SEND_TX: IntCounter = + register_int_counter!(opts!("literpc_rpc_send_tx", "RPC call send transaction")).unwrap(); + static ref RPC_GET_LATEST_BLOCKHASH: IntCounter = + register_int_counter!(opts!("literpc_rpc_get_latest_blockhash", "RPC call to get latest block hash")).unwrap(); + static ref RPC_IS_BLOCKHASH_VALID: IntCounter = + register_int_counter!(opts!("literpc_rpc_is_blockhash_valid", "RPC call to check if blockhash is vali calld")).unwrap(); + static ref RPC_GET_SIGNATURE_STATUSES: IntCounter = + register_int_counter!(opts!("literpc_rpc_get_signature_statuses", "RPC call to get signature statuses")).unwrap(); + static ref RPC_GET_VERSION: IntCounter = + register_int_counter!(opts!("literpc_rpc_get_version", "RPC call to version")).unwrap(); + static ref RPC_REQUEST_AIRDROP: IntCounter = + register_int_counter!(opts!("literpc_rpc_airdrop", "RPC call to request airdrop")).unwrap(); + static ref RPC_SIGNATURE_SUBSCRIBE: IntCounter = + register_int_counter!(opts!("literpc_rpc_signature_subscribe", "RPC call to subscribe to signature")).unwrap(); pub static ref TXS_IN_CHANNEL: GenericGauge = register_int_gauge!(opts!("literpc_txs_in_channel", "Transactions in channel")).unwrap(); } @@ -142,6 +142,7 @@ impl LiteBridge { self.tx_sender.clone(), self.block_listner.clone(), self.block_store.clone(), + self.tpu_manager.clone(), ) .start(clean_interval); diff --git a/src/tpu_manager.rs b/src/tpu_manager.rs index eb5e4ea1..4d3ed485 100644 --- a/src/tpu_manager.rs +++ b/src/tpu_manager.rs @@ -7,6 +7,7 @@ use std::{ }; use log::info; +use prometheus::{opts, register_int_counter, IntCounter}; use solana_quic_client::{QuicConfig, QuicPool}; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::Keypair; @@ -22,6 +23,11 @@ pub type QuicConnectionCache = TpuConnectionCache; const TPU_CONNECTION_CACHE_SIZE: usize = 8; +lazy_static::lazy_static! { +static ref TPU_CONNECTION_RESET: IntCounter = + register_int_counter!(opts!("literpc_tpu_connection_reset", "Number of times tpu connection was reseted")).unwrap(); +} + #[derive(Clone)] pub struct TpuManager { error_count: Arc, @@ -30,7 +36,7 @@ pub struct TpuManager { tpu_client: Arc>>, pub ws_addr: String, fanout_slots: u64, - connection_cache: Arc, + identity: Arc, } impl TpuManager { @@ -48,13 +54,9 @@ impl TpuManager { let connection_cache = QuicConnectionCache::new_with_config(TPU_CONNECTION_CACHE_SIZE, tpu_config); let connection_cache = Arc::new(connection_cache); - let tpu_client = Self::new_tpu_client( - rpc_client.clone(), - &ws_addr, - fanout_slots, - connection_cache.clone(), - ) - .await?; + let tpu_client = + Self::new_tpu_client(rpc_client.clone(), &ws_addr, fanout_slots, connection_cache) + .await?; let tpu_client = Arc::new(RwLock::new(Arc::new(tpu_client))); Ok(Self { @@ -63,7 +65,7 @@ impl TpuManager { ws_addr, fanout_slots, error_count: Default::default(), - connection_cache, + identity: Arc::new(identity), }) } @@ -82,26 +84,40 @@ impl TpuManager { .await?) } + pub async fn reset_tpu_client(&self) -> anyhow::Result<()> { + let mut tpu_config = QuicConfig::new().unwrap(); + tpu_config + .update_client_certificate(&self.identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))) + .unwrap(); + let connection_cache = + QuicConnectionCache::new_with_config(TPU_CONNECTION_CACHE_SIZE, tpu_config); + let connection_cache = Arc::new(connection_cache); + + let tpu_client = Self::new_tpu_client( + self.rpc_client.clone(), + &self.ws_addr, + self.fanout_slots, + connection_cache, + ) + .await?; + self.error_count.store(0, Ordering::Relaxed); + *self.tpu_client.write().await = Arc::new(tpu_client); + TPU_CONNECTION_RESET.inc(); + Ok(()) + } + pub async fn reset(&self) -> anyhow::Result<()> { self.error_count.fetch_add(1, Ordering::Relaxed); if self.error_count.load(Ordering::Relaxed) > 5 { - let tpu_client = Self::new_tpu_client( - self.rpc_client.clone(), - &self.ws_addr, - self.fanout_slots, - self.connection_cache.clone(), - ) - .await?; - self.error_count.store(0, Ordering::Relaxed); - *self.tpu_client.write().await = Arc::new(tpu_client); + self.reset_tpu_client().await?; info!("TPU Reset after 5 errors"); } Ok(()) } - pub async fn get_tpu_client(&self) -> Arc { + async fn get_tpu_client(&self) -> Arc { self.tpu_client.read().await.clone() } diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index 93392846..d482bdc1 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -8,8 +8,8 @@ use dashmap::DashMap; use jsonrpsee::SubscriptionSink; use log::{info, warn}; use prometheus::{ - core::GenericGauge, histogram_opts, opts, register_counter, register_histogram, - register_int_gauge, Counter, Histogram, + core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter, + register_int_gauge, Histogram, IntCounter, }; use solana_rpc_client::nonblocking::rpc_client::RpcClient; @@ -52,18 +52,18 @@ lazy_static::lazy_static! { "Time to receive finalized block from block subscribe", )) .unwrap(); - static ref FIN_BLOCKS_RECV: Counter = - register_counter!(opts!("literpc_fin_blocks_recv", "Number of Finalized Blocks Received")).unwrap(); - static ref CON_BLOCKS_RECV: Counter = - register_counter!(opts!("literpc_con_blocks_recv", "Number of Confirmed Blocks Received")).unwrap(); - static ref INCOMPLETE_FIN_BLOCKS_RECV: Counter = - register_counter!(opts!("literpc_incomplete_fin_blocks_recv", "Number of Incomplete Finalized Blocks Received")).unwrap(); - static ref INCOMPLETE_CON_BLOCKS_RECV: Counter = - register_counter!(opts!("literpc_incomplete_con_blocks_recv", "Number of Incomplete Confirmed Blocks Received")).unwrap(); - static ref TXS_CONFIRMED: Counter = - register_counter!(opts!("literpc_txs_confirmed", "Number of Transactions Confirmed")).unwrap(); - static ref TXS_FINALIZED: Counter = - register_counter!(opts!("literpc_txs_finalized", "Number of Transactions Finalized")).unwrap(); + static ref FIN_BLOCKS_RECV: IntCounter = + register_int_counter!(opts!("literpc_fin_blocks_recv", "Number of Finalized Blocks Received")).unwrap(); + static ref CON_BLOCKS_RECV: IntCounter = + register_int_counter!(opts!("literpc_con_blocks_recv", "Number of Confirmed Blocks Received")).unwrap(); + static ref INCOMPLETE_FIN_BLOCKS_RECV: IntCounter = + register_int_counter!(opts!("literpc_incomplete_fin_blocks_recv", "Number of Incomplete Finalized Blocks Received")).unwrap(); + static ref INCOMPLETE_CON_BLOCKS_RECV: IntCounter = + register_int_counter!(opts!("literpc_incomplete_con_blocks_recv", "Number of Incomplete Confirmed Blocks Received")).unwrap(); + static ref TXS_CONFIRMED: IntCounter = + register_int_counter!(opts!("literpc_txs_confirmed", "Number of Transactions Confirmed")).unwrap(); + static ref TXS_FINALIZED: IntCounter = + register_int_counter!(opts!("literpc_txs_finalized", "Number of Transactions Finalized")).unwrap(); static ref BLOCKS_IN_QUEUE: GenericGauge = register_int_gauge!(opts!("literpc_blocks_in_queue", "Number of blocks waiting to deque")).unwrap(); static ref BLOCKS_IN_RETRY_QUEUE: GenericGauge = register_int_gauge!(opts!("literpc_blocks_in_retry_queue", "Number of blocks waiting in retry")).unwrap(); static ref NUMBER_OF_SIGNATURE_SUBSCRIBERS: GenericGauge = register_int_gauge!(opts!("literpc_number_of_signature_sub", "Number of signature subscriber")).unwrap(); diff --git a/src/workers/cleaner.rs b/src/workers/cleaner.rs index 10ea4b96..c1dc4895 100644 --- a/src/workers/cleaner.rs +++ b/src/workers/cleaner.rs @@ -1,9 +1,9 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use log::info; use tokio::task::JoinHandle; -use crate::block_store::BlockStore; +use crate::{block_store::BlockStore, tpu_manager::TpuManager}; use super::{BlockListener, TxSender}; @@ -13,6 +13,7 @@ pub struct Cleaner { tx_sender: TxSender, block_listenser: BlockListener, block_store: BlockStore, + tpu_manager: Arc, } impl Cleaner { @@ -20,11 +21,13 @@ impl Cleaner { tx_sender: TxSender, block_listenser: BlockListener, block_store: BlockStore, + tpu_manager: Arc, ) -> Self { Self { tx_sender, block_listenser, - block_store: block_store, + block_store, + tpu_manager, } } @@ -60,6 +63,7 @@ impl Cleaner { self.clean_tx_sender(ttl_duration); self.clean_block_listeners(ttl_duration); self.clean_block_store(ttl_duration).await; + let _ = self.tpu_manager.reset_tpu_client().await; } }) } diff --git a/src/workers/metrics_capture.rs b/src/workers/metrics_capture.rs index 1924d8c0..9bd0dea7 100644 --- a/src/workers/metrics_capture.rs +++ b/src/workers/metrics_capture.rs @@ -15,6 +15,7 @@ lazy_static::lazy_static! { static ref TOKIO_INJQUEUEDEPTH: GenericGauge = register_int_gauge!(opts!("literpc_tokio_injection_queue_depth", "Tokio tasks in injection queue in lite rpc")).unwrap(); static ref TOKIO_NB_BLOCKING_THREADS: GenericGauge = register_int_gauge!(opts!("literpc_tokio_blocking_threads", "Tokio blocking threads in lite rpc")).unwrap(); static ref TOKIO_NB_IDLE_THREADS: GenericGauge = register_int_gauge!(opts!("literpc_tokio_idle_threads", "Tokio idle threads in lite rpc")).unwrap(); + static ref TOKIO_REMOTE_SCHEDULED_COUNT: GenericGauge = register_int_gauge!(opts!("literpc_tokio_remote_scheduled", "Tokio remote scheduled tasks")).unwrap(); static ref STD_THREADS: GenericGauge = register_int_gauge!(opts!("literpc_threads", "Nb of threads used by literpc")).unwrap(); } @@ -88,13 +89,14 @@ impl MetricsCapture { metrics.txs_confirmed = txs_confirmed; metrics.txs_finalized = txs_finalized; TXS_IN_STORE.set(txs_sent as i64); - + let metrics = tokio::runtime::Handle::current().metrics(); TOKIO_TASKS.set(metrics.num_workers() as i64); TOKIO_QUEUEDEPTH.set(metrics.blocking_queue_depth() as i64); TOKIO_NB_BLOCKING_THREADS.set(metrics.num_blocking_threads() as i64); TOKIO_NB_IDLE_THREADS.set(metrics.num_idle_blocking_threads() as i64); - TOKIO_INJQUEUEDEPTH.set( metrics.injection_queue_depth() as i64 ); + TOKIO_INJQUEUEDEPTH.set(metrics.injection_queue_depth() as i64); + TOKIO_REMOTE_SCHEDULED_COUNT.set(metrics.remote_schedule_count() as i64); } }) } diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs index 2d12b79f..9dc4eca2 100644 --- a/src/workers/tx_sender.rs +++ b/src/workers/tx_sender.rs @@ -7,7 +7,7 @@ use anyhow::bail; use dashmap::DashMap; use log::{info, warn}; -use prometheus::{register_counter, Counter}; +use prometheus::{core::GenericGauge, opts, register_int_counter, register_int_gauge, IntCounter}; use solana_transaction_status::TransactionStatus; use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle}; @@ -20,8 +20,11 @@ use crate::{ use super::PostgresMpscSend; lazy_static::lazy_static! { - static ref TXS_SENT: Counter = - register_counter!("literpc_txs_sent", "Number of transactions forwarded to tpu").unwrap(); + static ref TXS_SENT: IntCounter = + register_int_counter!("literpc_txs_sent", "Number of transactions forwarded to tpu").unwrap(); + static ref TXS_SENT_ERRORS: IntCounter = + register_int_counter!("literpc_txs_sent_errors", "Number of errors while transactions forwarded to tpu").unwrap(); + static ref TX_BATCH_SIZES: GenericGauge = register_int_gauge!(opts!("literpc_tx_batch_size", "batchsize of tx sent by literpc")).unwrap(); } pub type WireTransaction = Vec; @@ -81,18 +84,19 @@ impl TxSender { txs_sent.insert(sig.to_owned(), TxProps::default()); } // metrics - TXS_SENT.inc_by(sigs_and_slots.len() as f64); + TXS_SENT.inc_by(sigs_and_slots.len() as u64); 1 } Err(err) => { + TXS_SENT_ERRORS.inc_by(sigs_and_slots.len() as u64); warn!("{err}"); 0 } }; if let Some(postgres) = postgres { - let forwarded_slot = tpu_client.get_tpu_client().await.estimated_current_slot(); + let forwarded_slot = tpu_client.estimated_current_slot().await; for (sig, recent_slot) in sigs_and_slots { MESSAGES_IN_POSTGRES_CHANNEL.inc(); @@ -161,7 +165,7 @@ impl TxSender { } } } - + TX_BATCH_SIZES.set(txs.len() as i64); batch_send.send((sigs_and_slots, txs)).await?; } })