diff --git a/src/bridge.rs b/src/bridge.rs index da7f3134..6c39f3d2 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -18,6 +18,7 @@ use log::info; use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink}; +use prometheus::{opts, register_counter, Counter}; use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction}; use solana_rpc_client_api::{ config::{RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig}, @@ -34,6 +35,24 @@ use tokio::{ task::JoinHandle, }; +lazy_static::lazy_static! { + static ref RPC_SEND_TX: Counter = + register_counter!(opts!("rpc_send_tx", "RPC call send transaction")).unwrap(); + static ref RPC_GET_LATEST_BLOCKHASH: Counter = + register_counter!(opts!("rpc_get_latest_blockhash", "RPC call to get latest block hash")).unwrap(); + static ref RPC_IS_BLOCKHASH_VALID: Counter = + register_counter!(opts!("rpc_is_blockhash_valid", "RPC call to check if blockhash is vali calld")).unwrap(); + static ref RPC_GET_SIGNATURE_STATUSES: Counter = + register_counter!(opts!("rpc_get_signature_statuses", "RPC call to get signature statuses")).unwrap(); + static ref RPC_GET_VERSION: Counter = + register_counter!(opts!("rpc_get_version", "RPC call to version")).unwrap(); + static ref RPC_REQUEST_AIRDROP: Counter = + register_counter!(opts!("rpc_airdrop", "RPC call to request airdrop")).unwrap(); + static ref RPC_SIGNATURE_SUBSCRIBE: Counter = + register_counter!(opts!("rpc_signature_subscribe", "RPC call to subscribe to signature")).unwrap(); + +} + /// A bridge between clients and tpu pub struct LiteBridge { pub rpc_client: Arc, @@ -174,6 +193,8 @@ impl LiteRpcServer for LiteBridge { tx: String, send_transaction_config: Option, ) -> crate::rpc::Result { + RPC_SEND_TX.inc(); + let SendTransactionConfig { encoding, max_retries: _, @@ -215,6 +236,8 @@ impl LiteRpcServer for LiteBridge { &self, config: Option, ) -> crate::rpc::Result> { + RPC_GET_LATEST_BLOCKHASH.inc(); + let commitment_config = config .map(|config| config.commitment.unwrap_or_default()) .unwrap_or_default(); @@ -241,6 +264,8 @@ impl LiteRpcServer for LiteBridge { blockhash: String, config: Option, ) -> crate::rpc::Result> { + RPC_IS_BLOCKHASH_VALID.inc(); + let commitment = config.unwrap_or_default().commitment.unwrap_or_default(); let commitment = CommitmentConfig { commitment }; @@ -283,6 +308,8 @@ impl LiteRpcServer for LiteBridge { sigs: Vec, _config: Option, ) -> crate::rpc::Result>>> { + RPC_GET_SIGNATURE_STATUSES.inc(); + let sig_statuses = sigs .iter() .map(|sig| { @@ -308,6 +335,8 @@ impl LiteRpcServer for LiteBridge { } fn get_version(&self) -> crate::rpc::Result { + RPC_GET_VERSION.inc(); + let version = solana_version::Version::default(); Ok(RpcVersionInfo { solana_core: version.to_string(), @@ -321,6 +350,8 @@ impl LiteRpcServer for LiteBridge { lamports: u64, config: Option, ) -> crate::rpc::Result { + RPC_REQUEST_AIRDROP.inc(); + let pubkey = match Pubkey::from_str(&pubkey_str) { Ok(pubkey) => pubkey, Err(err) => { @@ -352,6 +383,7 @@ impl LiteRpcServer for LiteBridge { signature: String, _commitment_config: CommitmentConfig, ) -> SubscriptionResult { + RPC_SIGNATURE_SUBSCRIBE.inc(); sink.accept()?; self.block_listner.signature_subscribe(signature, sink); Ok(()) diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index bd0172b7..78ff4a8b 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -39,6 +39,14 @@ lazy_static::lazy_static! { "Time to receive finalized block from block subscribe", )) .unwrap(); + static ref FIN_BLOCKS_RECV: Counter = + register_counter!(opts!("fin_blocks_recv", "Number of Finalized Blocks Received")).unwrap(); + static ref CON_BLOCKS_RECV: Counter = + register_counter!(opts!("con_blocks_recv", "Number of Confirmed Blocks Received")).unwrap(); + static ref INCOMPLETE_FIN_BLOCKS_RECV: Counter = + register_counter!(opts!("incomplete_fin_blocks_recv", "Number of Incomplete Finalized Blocks Received")).unwrap(); + static ref INCOMPLETE_CON_BLOCKS_RECV: Counter = + register_counter!(opts!("incomplete_con_blocks_recv", "Number of Incomplete Confirmed Blocks Received")).unwrap(); static ref TXS_CONFIRMED: Counter = register_counter!(opts!("txs_confirmed", "Number of Transactions Confirmed")).unwrap(); static ref TXS_FINALIZED: Counter = @@ -92,6 +100,14 @@ impl BlockListener { self.signature_subscribers.remove(&signature); } + fn increment_invalid_block_metric(commitment_config: CommitmentConfig) { + if commitment_config.is_finalized() { + INCOMPLETE_FIN_BLOCKS_RECV.inc(); + } else { + INCOMPLETE_CON_BLOCKS_RECV.inc(); + } + } + pub async fn listen_from_pubsub( self, pubsub_client: &PubsubClient, @@ -131,27 +147,35 @@ impl BlockListener { }; let Some(block) = recv.as_mut().next().await else { - bail!("PubSub broke"); - }; + bail!("PubSub broke"); + }; timer.observe_duration(); + if commitment_config.is_finalized() { + FIN_BLOCKS_RECV.inc(); + } else { + CON_BLOCKS_RECV.inc(); + }; + let slot = block.context.slot; let Some(block) = block.value.block else { - continue; - }; + Self::increment_invalid_block_metric(commitment_config); + continue; + }; let Some(block_height) = block.block_height else { - continue; - }; - - let blockhash = block.blockhash; + Self::increment_invalid_block_metric(commitment_config); + continue; + }; let Some(transactions) = block.transactions else { - continue; - }; + Self::increment_invalid_block_metric(commitment_config); + continue; + }; + let blockhash = block.blockhash; let parent_slot = block.parent_slot; self.block_store