This commit is contained in:
Aniket Prajapati 2023-01-07 18:10:56 +05:30
parent 14a3b51e49
commit a8e4daba5a
No known key found for this signature in database
GPG Key ID: D4346D8C9C5398F2
6 changed files with 98 additions and 2 deletions

View File

@ -4,6 +4,10 @@ version = "0.1.0"
edition = "2021"
description = "A lite version of solana rpc to send and confirm transactions"
[features]
default = []
metrics = []
[workspace]
members = [
"bench-utils"

View File

@ -1,3 +1,4 @@
use crate::{
configs::SendTransactionConfig,
encoding::BinaryEncoding,
@ -8,6 +9,7 @@ use crate::{
use std::{ops::Deref, str::FromStr, sync::Arc};
use anyhow::bail;
use log::info;
use reqwest::Url;
@ -33,6 +35,10 @@ pub struct LiteBridge {
pub tx_sender: Option<TxSender>,
pub finalized_block_listenser: BlockListener,
pub confirmed_block_listenser: BlockListener,
#[cfg(feature = "metrics")]
pub txs_sent: Arc<dashmap::DashSet<String>>,
#[cfg(feature = "metrics")]
pub metrics: Arc<tokio::sync::RwLock<crate::metrics::Metrics>>,
}
impl LiteBridge {
@ -61,6 +67,43 @@ impl LiteBridge {
confirmed_block_listenser,
rpc_url,
tpu_client,
#[cfg(feature = "metrics")]
txs_sent: Default::default(),
#[cfg(feature = "metrics")]
metrics: Default::default(),
})
}
#[cfg(feature = "metrics")]
pub fn capture_metrics(self) -> JoinHandle<anyhow::Result<()>> {
let mut one_second = tokio::time::interval(std::time::Duration::from_secs(crate::DEFAULT_METRIC_RESET_TIME_INTERVAL));
tokio::spawn(async move {
info!("Capturing Metrics");
loop {
let txs_sent: Vec<String> = self.txs_sent.iter().map(|v| v.clone()).collect();
self.txs_sent.clear();
let metrics = crate::metrics::Metrics {
total_txs: txs_sent.len(),
txs_confirmed: self
.confirmed_block_listenser
.num_of_sigs_commited(&txs_sent)
.await,
txs_finalized: self
.finalized_block_listenser
.num_of_sigs_commited(&txs_sent)
.await,
in_secs: crate::DEFAULT_METRIC_RESET_TIME_INTERVAL
};
log::warn!("{metrics:?}");
*self.metrics.write().await = metrics;
one_second.tick().await;
}
})
}
@ -94,7 +137,7 @@ impl LiteBridge {
.http_only()
.build(http_addr.clone())
.await?
.start(self.into_rpc())?;
.start(self.clone().into_rpc())?;
let ws_server = tokio::spawn(async move {
info!("Websocket Server started at {ws_addr:?}");
@ -119,12 +162,24 @@ impl LiteBridge {
services.push(tx_sender.execute());
}
#[cfg(feature = "metrics")]
services.push(self.capture_metrics());
Ok(services)
}
}
#[jsonrpsee::core::async_trait]
impl LiteRpcServer for LiteBridge {
#[allow(unreachable_code)]
async fn get_metrics(&self) -> crate::rpc::Result<crate::metrics::Metrics> {
#[cfg(feature = "metrics")]
{
return Ok(self.metrics.read().await.to_owned());
}
panic!("server not compiled with metrics support")
}
async fn send_transaction(
&self,
tx: String,
@ -141,6 +196,9 @@ impl LiteRpcServer for LiteBridge {
.unwrap()
.signatures[0];
#[cfg(feature = "metrics")]
self.txs_sent.insert(sig.to_string());
if let Some(tx_sender) = &self.tx_sender {
tx_sender.enqnueue_tx(raw_tx);
} else {

View File

@ -8,6 +8,7 @@ pub mod encoding;
pub mod errors;
pub mod rpc;
pub mod workers;
pub mod metrics;
pub type WireTransaction = Vec<u8>;
@ -23,5 +24,9 @@ pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
pub const TX_MAX_RETRIES_UPPER_LIMIT: u16 = 5;
#[from_env]
pub const DEFAULT_TX_RETRY_BATCH_SIZE: usize = 20;
#[cfg(feature = "metrics")]
#[from_env]
pub const DEFAULT_METRIC_RESET_TIME_INTERVAL: u64 = 12;
pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus =
TransactionConfirmationStatus::Finalized;

10
src/metrics.rs Normal file
View File

@ -0,0 +1,10 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct Metrics {
pub total_txs: usize,
pub txs_confirmed: usize,
pub txs_finalized: usize,
pub in_secs: u64
}

View File

@ -8,6 +8,7 @@ use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::TransactionStatus;
use crate::configs::SendTransactionConfig;
use crate::metrics::Metrics;
pub type Result<T> = std::result::Result<T, Error>;
@ -44,6 +45,9 @@ pub trait LiteRpc {
config: Option<RpcRequestAirdropConfig>,
) -> Result<String>;
#[method(name = "getMetrics")]
async fn get_metrics(&self) -> Result<Metrics>;
#[subscription(name = "signatureSubscribe" => "signatureNotification", unsubscribe="signatureUnsubscribe", item=RpcResponse<serde_json::Value>)]
fn signature_subscribe(&self, signature: String, commitment_config: CommitmentConfig);
}

View File

@ -23,8 +23,9 @@ use tokio::{sync::RwLock, task::JoinHandle};
#[derive(Clone)]
pub struct BlockListener {
pub_sub_client: Arc<PubsubClient>,
pub blocks: Arc<DashMap<String, TransactionStatus>>,
commitment_config: CommitmentConfig,
pub blocks: Arc<DashMap<String, TransactionStatus>>,
latest_block_info: Arc<RwLock<BlockInformation>>,
signature_subscribers: Arc<DashMap<String, SubscriptionSink>>,
}
@ -71,6 +72,16 @@ impl BlockListener {
commitment_levels
}
pub async fn num_of_sigs_commited(&self, sigs: &[String]) -> usize {
let mut num_of_sigs_commited = 0;
for sig in sigs {
if self.blocks.contains_key(sig) {
num_of_sigs_commited += 1;
}
}
num_of_sigs_commited
}
pub async fn get_slot(&self) -> u64 {
self.latest_block_info.read().await.slot
}
@ -177,4 +188,8 @@ impl BlockListener {
bail!("Stopped Listening to {commitment:?} blocks")
})
}
pub fn capture_metrics(self) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn(async move { Ok(()) })
}
}