global metrics store

This commit is contained in:
Aniket Prajapati 2023-01-08 19:43:33 +05:30
parent a8e4daba5a
commit f3bc5e873b
No known key found for this signature in database
GPG Key ID: D4346D8C9C5398F2
4 changed files with 101 additions and 57 deletions

View File

@ -1,4 +1,3 @@
use crate::{ use crate::{
configs::SendTransactionConfig, configs::SendTransactionConfig,
encoding::BinaryEncoding, encoding::BinaryEncoding,
@ -10,12 +9,13 @@ use std::{ops::Deref, str::FromStr, sync::Arc};
use anyhow::bail; use anyhow::bail;
use dashmap::DashMap;
use log::info; use log::info;
use reqwest::Url; use reqwest::Url;
use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink}; use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink};
use solana_client::{ use solana_client::{
nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient}, nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient, tpu_client::TpuClient},
rpc_config::{RpcContextConfig, RpcRequestAirdropConfig}, rpc_config::{RpcContextConfig, RpcRequestAirdropConfig},
rpc_response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo}, rpc_response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo},
}; };
@ -24,7 +24,7 @@ use solana_sdk::{
pubkey::Pubkey, pubkey::Pubkey,
transaction::VersionedTransaction, transaction::VersionedTransaction,
}; };
use solana_transaction_status::TransactionStatus; use solana_transaction_status::{TransactionStatus};
use tokio::{net::ToSocketAddrs, task::JoinHandle}; use tokio::{net::ToSocketAddrs, task::JoinHandle};
/// A bridge between clients and tpu /// A bridge between clients and tpu
@ -35,8 +35,7 @@ pub struct LiteBridge {
pub tx_sender: Option<TxSender>, pub tx_sender: Option<TxSender>,
pub finalized_block_listenser: BlockListener, pub finalized_block_listenser: BlockListener,
pub confirmed_block_listenser: BlockListener, pub confirmed_block_listenser: BlockListener,
#[cfg(feature = "metrics")] pub txs_sent: Arc<DashMap<String, Option<TransactionStatus>>>,
pub txs_sent: Arc<dashmap::DashSet<String>>,
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
pub metrics: Arc<tokio::sync::RwLock<crate::metrics::Metrics>>, pub metrics: Arc<tokio::sync::RwLock<crate::metrics::Metrics>>,
} }
@ -52,10 +51,24 @@ impl LiteBridge {
let tpu_client = let tpu_client =
Arc::new(TpuClient::new(rpc_client.clone(), ws_addr, Default::default()).await?); Arc::new(TpuClient::new(rpc_client.clone(), ws_addr, Default::default()).await?);
let finalized_block_listenser = let pub_sub_client = Arc::new(PubsubClient::new(ws_addr).await?);
BlockListener::new(rpc_client.clone(), ws_addr, CommitmentConfig::finalized()).await?; let txs_sent = Arc::new(DashMap::new());
let confirmed_block_listenser =
BlockListener::new(rpc_client.clone(), ws_addr, CommitmentConfig::confirmed()).await?; let finalized_block_listenser = BlockListener::new(
pub_sub_client.clone(),
rpc_client.clone(),
txs_sent.clone(),
CommitmentConfig::finalized(),
)
.await?;
let confirmed_block_listenser = BlockListener::new(
pub_sub_client,
rpc_client.clone(),
txs_sent.clone(),
CommitmentConfig::confirmed(),
)
.await?;
Ok(Self { Ok(Self {
tx_sender: if batch_transactions { tx_sender: if batch_transactions {
@ -67,8 +80,7 @@ impl LiteBridge {
confirmed_block_listenser, confirmed_block_listenser,
rpc_url, rpc_url,
tpu_client, tpu_client,
#[cfg(feature = "metrics")] txs_sent,
txs_sent: Default::default(),
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
metrics: Default::default(), metrics: Default::default(),
}) })
@ -76,33 +88,43 @@ impl LiteBridge {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
pub fn capture_metrics(self) -> JoinHandle<anyhow::Result<()>> { pub fn capture_metrics(self) -> JoinHandle<anyhow::Result<()>> {
let mut one_second = tokio::time::interval(std::time::Duration::from_secs(crate::DEFAULT_METRIC_RESET_TIME_INTERVAL)); use solana_transaction_status::TransactionConfirmationStatus;
let mut one_second = tokio::time::interval(std::time::Duration::from_secs(1));
tokio::spawn(async move { tokio::spawn(async move {
info!("Capturing Metrics"); info!("Capturing Metrics");
loop { loop {
let txs_sent: Vec<String> = self.txs_sent.iter().map(|v| v.clone()).collect(); one_second.tick().await;
self.txs_sent.clear();
let metrics = crate::metrics::Metrics { let total_txs_sent = self.txs_sent.len();
total_txs: txs_sent.len(), let mut total_txs_confirmed: usize = 0;
txs_confirmed: self let mut total_txs_finalized: usize = 0;
.confirmed_block_listenser
.num_of_sigs_commited(&txs_sent) for tx in self.txs_sent.iter() {
.await, if let Some(tx) = tx.value() {
txs_finalized: self match tx.confirmation_status() {
.finalized_block_listenser TransactionConfirmationStatus::Confirmed => total_txs_confirmed += 1,
.num_of_sigs_commited(&txs_sent) TransactionConfirmationStatus::Finalized => total_txs_finalized += 1,
.await, _ => (),
in_secs: crate::DEFAULT_METRIC_RESET_TIME_INTERVAL }
}; }
}
let mut metrics = self.metrics.write().await;
metrics.txs_sent_in_one_sec = total_txs_sent - metrics.total_txs_sent;
metrics.txs_confirmed_in_one_sec =
total_txs_confirmed - metrics.total_txs_confirmed;
metrics.txs_finalized_in_one_sec =
total_txs_finalized - metrics.total_txs_finalized;
metrics.total_txs_sent = total_txs_sent;
metrics.total_txs_confirmed = total_txs_confirmed;
metrics.total_txs_finalized = total_txs_finalized;
log::warn!("{metrics:?}"); log::warn!("{metrics:?}");
*self.metrics.write().await = metrics;
one_second.tick().await;
} }
}) })
} }
@ -197,7 +219,7 @@ impl LiteRpcServer for LiteBridge {
.signatures[0]; .signatures[0];
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
self.txs_sent.insert(sig.to_string()); self.txs_sent.insert(sig.to_string(), None);
if let Some(tx_sender) = &self.tx_sender { if let Some(tx_sender) = &self.tx_sender {
tx_sender.enqnueue_tx(raw_tx); tx_sender.enqnueue_tx(raw_tx);

View File

@ -2,9 +2,11 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Default, Debug, Serialize, Deserialize)] #[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct Metrics { pub struct Metrics {
pub total_txs: usize, pub total_txs_sent: usize,
pub txs_confirmed: usize, pub total_txs_confirmed: usize,
pub txs_finalized: usize, pub total_txs_finalized: usize,
pub in_secs: u64 pub txs_sent_in_one_sec: usize,
pub txs_confirmed_in_one_sec: usize,
pub txs_finalized_in_one_sec: usize
} }

View File

@ -18,14 +18,15 @@ use solana_sdk::{
use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus}; use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
use tokio::{sync::RwLock, task::JoinHandle}; use tokio::{sync::RwLock, task::JoinHandle};
/// Background worker which listen's to new blocks /// Background worker which listen's to new blocks
/// and keeps a track of confirmed txs /// and keeps a track of confirmed txs
#[derive(Clone)] #[derive(Clone)]
pub struct BlockListener { pub struct BlockListener {
pub_sub_client: Arc<PubsubClient>, pub_sub_client: Arc<PubsubClient>,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
txs_sent: Arc<DashMap<String, Option<TransactionStatus>>>,
pub blocks: Arc<DashMap<String, TransactionStatus>>,
latest_block_info: Arc<RwLock<BlockInformation>>, latest_block_info: Arc<RwLock<BlockInformation>>,
signature_subscribers: Arc<DashMap<String, SubscriptionSink>>, signature_subscribers: Arc<DashMap<String, SubscriptionSink>>,
} }
@ -38,18 +39,18 @@ struct BlockInformation {
impl BlockListener { impl BlockListener {
pub async fn new( pub async fn new(
pub_sub_client: Arc<PubsubClient>,
rpc_client: Arc<RpcClient>, rpc_client: Arc<RpcClient>,
ws_url: &str, txs_sent: Arc<DashMap<String, Option<TransactionStatus>>>,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let pub_sub_client = Arc::new(PubsubClient::new(ws_url).await?);
let (latest_block_hash, block_height) = rpc_client let (latest_block_hash, block_height) = rpc_client
.get_latest_blockhash_with_commitment(commitment_config) .get_latest_blockhash_with_commitment(commitment_config)
.await?; .await?;
Ok(Self { Ok(Self {
pub_sub_client, pub_sub_client,
blocks: Default::default(), txs_sent,
latest_block_info: Arc::new(RwLock::new(BlockInformation { latest_block_info: Arc::new(RwLock::new(BlockInformation {
slot: rpc_client.get_slot().await?, slot: rpc_client.get_slot().await?,
blockhash: latest_block_hash.to_string(), blockhash: latest_block_hash.to_string(),
@ -66,7 +67,11 @@ impl BlockListener {
let mut commitment_levels = Vec::with_capacity(sigs.len()); let mut commitment_levels = Vec::with_capacity(sigs.len());
for sig in sigs { for sig in sigs {
commitment_levels.push(self.blocks.get(sig).map(|some| some.value().clone())); commitment_levels.push(
self.txs_sent
.get(sig)
.map_or_else(|| None, |some| some.value().clone()),
);
} }
commitment_levels commitment_levels
@ -75,7 +80,7 @@ impl BlockListener {
pub async fn num_of_sigs_commited(&self, sigs: &[String]) -> usize { pub async fn num_of_sigs_commited(&self, sigs: &[String]) -> usize {
let mut num_of_sigs_commited = 0; let mut num_of_sigs_commited = 0;
for sig in sigs { for sig in sigs {
if self.blocks.contains_key(sig) { if self.txs_sent.contains_key(sig) {
num_of_sigs_commited += 1; num_of_sigs_commited += 1;
} }
} }
@ -156,8 +161,20 @@ impl BlockListener {
}; };
for sig in signatures { for sig in signatures {
let Some(mut tx_status) = self.txs_sent.get_mut(&sig) else {
continue;
};
info!("{comfirmation_status:?} {sig}"); info!("{comfirmation_status:?} {sig}");
*tx_status.value_mut() = Some(TransactionStatus {
slot,
confirmations: None, //TODO: talk about this
status: Ok(()), // legacy field
err: None,
confirmation_status: Some(comfirmation_status.clone()),
});
// subscribers // subscribers
if let Some((sig, mut sink)) = self.signature_subscribers.remove(&sig) { if let Some((sig, mut sink)) = self.signature_subscribers.remove(&sig) {
warn!("notification {}", sig); warn!("notification {}", sig);
@ -171,17 +188,6 @@ impl BlockListener {
}) })
.unwrap(); .unwrap();
} }
self.blocks.insert(
sig,
TransactionStatus {
slot,
confirmations: None, //TODO: talk about this
status: Ok(()), // legacy field
err: None,
confirmation_status: Some(comfirmation_status.clone()),
},
);
} }
} }

View File

@ -2,15 +2,19 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use bench_utils::helpers::BenchHelper; use bench_utils::helpers::BenchHelper;
use dashmap::DashMap;
use futures::future::try_join_all; use futures::future::try_join_all;
use lite_rpc::{ use lite_rpc::{
encoding::BinaryEncoding, encoding::BinaryEncoding,
workers::{BlockListener, TxSender}, workers::{BlockListener, TxSender},
DEFAULT_LITE_RPC_ADDR, DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR, DEFAULT_LITE_RPC_ADDR, DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR,
}; };
use solana_client::nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient}; use solana_client::nonblocking::{
pubsub_client::PubsubClient, rpc_client::RpcClient, tpu_client::TpuClient,
};
use solana_sdk::{commitment_config::CommitmentConfig, native_token::LAMPORTS_PER_SOL}; use solana_sdk::{commitment_config::CommitmentConfig, native_token::LAMPORTS_PER_SOL};
use solana_transaction_status::TransactionConfirmationStatus;
#[tokio::test] #[tokio::test]
async fn send_and_confirm_txs() { async fn send_and_confirm_txs() {
@ -24,9 +28,13 @@ async fn send_and_confirm_txs() {
.unwrap(), .unwrap(),
); );
let pub_sub_client = Arc::new(PubsubClient::new(DEFAULT_WS_ADDR).await.unwrap());
let txs_sent = Arc::new(DashMap::new());
let block_listener = BlockListener::new( let block_listener = BlockListener::new(
pub_sub_client.clone(),
rpc_client.clone(), rpc_client.clone(),
DEFAULT_WS_ADDR, txs_sent.clone(),
CommitmentConfig::confirmed(), CommitmentConfig::confirmed(),
) )
.await .await
@ -56,9 +64,15 @@ async fn send_and_confirm_txs() {
let sig = sig.to_string(); let sig = sig.to_string();
for _ in 0..2 { for _ in 0..2 {
if block_listener.blocks.contains_key(&sig) { let tx_status = txs_sent.get(&sig).unwrap();
if let Some(tx_status) = tx_status.value() {
if tx_status.confirmation_status()
== TransactionConfirmationStatus::Confirmed
{
return; return;
} }
}
tokio::time::sleep(Duration::from_millis(800)).await; tokio::time::sleep(Duration::from_millis(800)).await;
} }