more metriccs

This commit is contained in:
aniketfuryrocks 2023-02-05 15:43:29 +05:30
parent a20d23e924
commit 4ada8d864f
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
2 changed files with 66 additions and 10 deletions

View File

@ -18,6 +18,7 @@ use log::info;
use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink}; use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink};
use prometheus::{opts, register_counter, Counter};
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction}; use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_rpc_client_api::{ use solana_rpc_client_api::{
config::{RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig}, config::{RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig},
@ -34,6 +35,24 @@ use tokio::{
task::JoinHandle, task::JoinHandle,
}; };
lazy_static::lazy_static! {
static ref RPC_SEND_TX: Counter =
register_counter!(opts!("rpc_send_tx", "RPC call send transaction")).unwrap();
static ref RPC_GET_LATEST_BLOCKHASH: Counter =
register_counter!(opts!("rpc_get_latest_blockhash", "RPC call to get latest block hash")).unwrap();
static ref RPC_IS_BLOCKHASH_VALID: Counter =
register_counter!(opts!("rpc_is_blockhash_valid", "RPC call to check if blockhash is vali calld")).unwrap();
static ref RPC_GET_SIGNATURE_STATUSES: Counter =
register_counter!(opts!("rpc_get_signature_statuses", "RPC call to get signature statuses")).unwrap();
static ref RPC_GET_VERSION: Counter =
register_counter!(opts!("rpc_get_version", "RPC call to version")).unwrap();
static ref RPC_REQUEST_AIRDROP: Counter =
register_counter!(opts!("rpc_airdrop", "RPC call to request airdrop")).unwrap();
static ref RPC_SIGNATURE_SUBSCRIBE: Counter =
register_counter!(opts!("rpc_signature_subscribe", "RPC call to subscribe to signature")).unwrap();
}
/// A bridge between clients and tpu /// A bridge between clients and tpu
pub struct LiteBridge { pub struct LiteBridge {
pub rpc_client: Arc<RpcClient>, pub rpc_client: Arc<RpcClient>,
@ -174,6 +193,8 @@ impl LiteRpcServer for LiteBridge {
tx: String, tx: String,
send_transaction_config: Option<SendTransactionConfig>, send_transaction_config: Option<SendTransactionConfig>,
) -> crate::rpc::Result<String> { ) -> crate::rpc::Result<String> {
RPC_SEND_TX.inc();
let SendTransactionConfig { let SendTransactionConfig {
encoding, encoding,
max_retries: _, max_retries: _,
@ -215,6 +236,8 @@ impl LiteRpcServer for LiteBridge {
&self, &self,
config: Option<RpcContextConfig>, config: Option<RpcContextConfig>,
) -> crate::rpc::Result<RpcResponse<RpcBlockhash>> { ) -> crate::rpc::Result<RpcResponse<RpcBlockhash>> {
RPC_GET_LATEST_BLOCKHASH.inc();
let commitment_config = config let commitment_config = config
.map(|config| config.commitment.unwrap_or_default()) .map(|config| config.commitment.unwrap_or_default())
.unwrap_or_default(); .unwrap_or_default();
@ -241,6 +264,8 @@ impl LiteRpcServer for LiteBridge {
blockhash: String, blockhash: String,
config: Option<IsBlockHashValidConfig>, config: Option<IsBlockHashValidConfig>,
) -> crate::rpc::Result<RpcResponse<bool>> { ) -> crate::rpc::Result<RpcResponse<bool>> {
RPC_IS_BLOCKHASH_VALID.inc();
let commitment = config.unwrap_or_default().commitment.unwrap_or_default(); let commitment = config.unwrap_or_default().commitment.unwrap_or_default();
let commitment = CommitmentConfig { commitment }; let commitment = CommitmentConfig { commitment };
@ -283,6 +308,8 @@ impl LiteRpcServer for LiteBridge {
sigs: Vec<String>, sigs: Vec<String>,
_config: Option<RpcSignatureStatusConfig>, _config: Option<RpcSignatureStatusConfig>,
) -> crate::rpc::Result<RpcResponse<Vec<Option<TransactionStatus>>>> { ) -> crate::rpc::Result<RpcResponse<Vec<Option<TransactionStatus>>>> {
RPC_GET_SIGNATURE_STATUSES.inc();
let sig_statuses = sigs let sig_statuses = sigs
.iter() .iter()
.map(|sig| { .map(|sig| {
@ -308,6 +335,8 @@ impl LiteRpcServer for LiteBridge {
} }
fn get_version(&self) -> crate::rpc::Result<RpcVersionInfo> { fn get_version(&self) -> crate::rpc::Result<RpcVersionInfo> {
RPC_GET_VERSION.inc();
let version = solana_version::Version::default(); let version = solana_version::Version::default();
Ok(RpcVersionInfo { Ok(RpcVersionInfo {
solana_core: version.to_string(), solana_core: version.to_string(),
@ -321,6 +350,8 @@ impl LiteRpcServer for LiteBridge {
lamports: u64, lamports: u64,
config: Option<RpcRequestAirdropConfig>, config: Option<RpcRequestAirdropConfig>,
) -> crate::rpc::Result<String> { ) -> crate::rpc::Result<String> {
RPC_REQUEST_AIRDROP.inc();
let pubkey = match Pubkey::from_str(&pubkey_str) { let pubkey = match Pubkey::from_str(&pubkey_str) {
Ok(pubkey) => pubkey, Ok(pubkey) => pubkey,
Err(err) => { Err(err) => {
@ -352,6 +383,7 @@ impl LiteRpcServer for LiteBridge {
signature: String, signature: String,
_commitment_config: CommitmentConfig, _commitment_config: CommitmentConfig,
) -> SubscriptionResult { ) -> SubscriptionResult {
RPC_SIGNATURE_SUBSCRIBE.inc();
sink.accept()?; sink.accept()?;
self.block_listner.signature_subscribe(signature, sink); self.block_listner.signature_subscribe(signature, sink);
Ok(()) Ok(())

View File

@ -39,6 +39,14 @@ lazy_static::lazy_static! {
"Time to receive finalized block from block subscribe", "Time to receive finalized block from block subscribe",
)) ))
.unwrap(); .unwrap();
static ref FIN_BLOCKS_RECV: Counter =
register_counter!(opts!("fin_blocks_recv", "Number of Finalized Blocks Received")).unwrap();
static ref CON_BLOCKS_RECV: Counter =
register_counter!(opts!("con_blocks_recv", "Number of Confirmed Blocks Received")).unwrap();
static ref INCOMPLETE_FIN_BLOCKS_RECV: Counter =
register_counter!(opts!("incomplete_fin_blocks_recv", "Number of Incomplete Finalized Blocks Received")).unwrap();
static ref INCOMPLETE_CON_BLOCKS_RECV: Counter =
register_counter!(opts!("incomplete_con_blocks_recv", "Number of Incomplete Confirmed Blocks Received")).unwrap();
static ref TXS_CONFIRMED: Counter = static ref TXS_CONFIRMED: Counter =
register_counter!(opts!("txs_confirmed", "Number of Transactions Confirmed")).unwrap(); register_counter!(opts!("txs_confirmed", "Number of Transactions Confirmed")).unwrap();
static ref TXS_FINALIZED: Counter = static ref TXS_FINALIZED: Counter =
@ -92,6 +100,14 @@ impl BlockListener {
self.signature_subscribers.remove(&signature); self.signature_subscribers.remove(&signature);
} }
fn increment_invalid_block_metric(commitment_config: CommitmentConfig) {
if commitment_config.is_finalized() {
INCOMPLETE_FIN_BLOCKS_RECV.inc();
} else {
INCOMPLETE_CON_BLOCKS_RECV.inc();
}
}
pub async fn listen_from_pubsub( pub async fn listen_from_pubsub(
self, self,
pubsub_client: &PubsubClient, pubsub_client: &PubsubClient,
@ -131,27 +147,35 @@ impl BlockListener {
}; };
let Some(block) = recv.as_mut().next().await else { let Some(block) = recv.as_mut().next().await else {
bail!("PubSub broke"); bail!("PubSub broke");
}; };
timer.observe_duration(); timer.observe_duration();
if commitment_config.is_finalized() {
FIN_BLOCKS_RECV.inc();
} else {
CON_BLOCKS_RECV.inc();
};
let slot = block.context.slot; let slot = block.context.slot;
let Some(block) = block.value.block else { let Some(block) = block.value.block else {
continue; Self::increment_invalid_block_metric(commitment_config);
}; continue;
};
let Some(block_height) = block.block_height else { let Some(block_height) = block.block_height else {
continue; Self::increment_invalid_block_metric(commitment_config);
}; continue;
};
let blockhash = block.blockhash;
let Some(transactions) = block.transactions else { let Some(transactions) = block.transactions else {
continue; Self::increment_invalid_block_metric(commitment_config);
}; continue;
};
let blockhash = block.blockhash;
let parent_slot = block.parent_slot; let parent_slot = block.parent_slot;
self.block_store self.block_store