diff --git a/src/bridge.rs b/src/bridge.rs index f94556e4..3898ad73 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -1,4 +1,3 @@ - use crate::{ configs::SendTransactionConfig, encoding::BinaryEncoding, @@ -10,12 +9,13 @@ use std::{ops::Deref, str::FromStr, sync::Arc}; use anyhow::bail; +use dashmap::DashMap; use log::info; use reqwest::Url; use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink}; use solana_client::{ - nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient}, + nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient, tpu_client::TpuClient}, rpc_config::{RpcContextConfig, RpcRequestAirdropConfig}, rpc_response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo}, }; @@ -24,7 +24,7 @@ use solana_sdk::{ pubkey::Pubkey, transaction::VersionedTransaction, }; -use solana_transaction_status::TransactionStatus; +use solana_transaction_status::{TransactionStatus}; use tokio::{net::ToSocketAddrs, task::JoinHandle}; /// A bridge between clients and tpu @@ -35,8 +35,7 @@ pub struct LiteBridge { pub tx_sender: Option, pub finalized_block_listenser: BlockListener, pub confirmed_block_listenser: BlockListener, - #[cfg(feature = "metrics")] - pub txs_sent: Arc>, + pub txs_sent: Arc>>, #[cfg(feature = "metrics")] pub metrics: Arc>, } @@ -52,10 +51,24 @@ impl LiteBridge { let tpu_client = Arc::new(TpuClient::new(rpc_client.clone(), ws_addr, Default::default()).await?); - let finalized_block_listenser = - BlockListener::new(rpc_client.clone(), ws_addr, CommitmentConfig::finalized()).await?; - let confirmed_block_listenser = - BlockListener::new(rpc_client.clone(), ws_addr, CommitmentConfig::confirmed()).await?; + let pub_sub_client = Arc::new(PubsubClient::new(ws_addr).await?); + let txs_sent = Arc::new(DashMap::new()); + + let finalized_block_listenser = BlockListener::new( + pub_sub_client.clone(), + rpc_client.clone(), + txs_sent.clone(), + CommitmentConfig::finalized(), + ) + .await?; + + let confirmed_block_listenser = BlockListener::new( + pub_sub_client, + rpc_client.clone(), + txs_sent.clone(), + CommitmentConfig::confirmed(), + ) + .await?; Ok(Self { tx_sender: if batch_transactions { @@ -67,8 +80,7 @@ impl LiteBridge { confirmed_block_listenser, rpc_url, tpu_client, - #[cfg(feature = "metrics")] - txs_sent: Default::default(), + txs_sent, #[cfg(feature = "metrics")] metrics: Default::default(), }) @@ -76,33 +88,43 @@ impl LiteBridge { #[cfg(feature = "metrics")] pub fn capture_metrics(self) -> JoinHandle> { - let mut one_second = tokio::time::interval(std::time::Duration::from_secs(crate::DEFAULT_METRIC_RESET_TIME_INTERVAL)); + use solana_transaction_status::TransactionConfirmationStatus; + + let mut one_second = tokio::time::interval(std::time::Duration::from_secs(1)); tokio::spawn(async move { info!("Capturing Metrics"); loop { - let txs_sent: Vec = self.txs_sent.iter().map(|v| v.clone()).collect(); - self.txs_sent.clear(); + one_second.tick().await; - let metrics = crate::metrics::Metrics { - total_txs: txs_sent.len(), - txs_confirmed: self - .confirmed_block_listenser - .num_of_sigs_commited(&txs_sent) - .await, - txs_finalized: self - .finalized_block_listenser - .num_of_sigs_commited(&txs_sent) - .await, - in_secs: crate::DEFAULT_METRIC_RESET_TIME_INTERVAL - }; + let total_txs_sent = self.txs_sent.len(); + let mut total_txs_confirmed: usize = 0; + let mut total_txs_finalized: usize = 0; + + for tx in self.txs_sent.iter() { + if let Some(tx) = tx.value() { + match tx.confirmation_status() { + TransactionConfirmationStatus::Confirmed => total_txs_confirmed += 1, + TransactionConfirmationStatus::Finalized => total_txs_finalized += 1, + _ => (), + } + } + } + + let mut metrics = self.metrics.write().await; + + metrics.txs_sent_in_one_sec = total_txs_sent - metrics.total_txs_sent; + metrics.txs_confirmed_in_one_sec = + total_txs_confirmed - metrics.total_txs_confirmed; + metrics.txs_finalized_in_one_sec = + total_txs_finalized - metrics.total_txs_finalized; + + metrics.total_txs_sent = total_txs_sent; + metrics.total_txs_confirmed = total_txs_confirmed; + metrics.total_txs_finalized = total_txs_finalized; log::warn!("{metrics:?}"); - - *self.metrics.write().await = metrics; - - one_second.tick().await; } }) } @@ -197,7 +219,7 @@ impl LiteRpcServer for LiteBridge { .signatures[0]; #[cfg(feature = "metrics")] - self.txs_sent.insert(sig.to_string()); + self.txs_sent.insert(sig.to_string(), None); if let Some(tx_sender) = &self.tx_sender { tx_sender.enqnueue_tx(raw_tx); diff --git a/src/metrics.rs b/src/metrics.rs index 55a444b6..d676b365 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -2,9 +2,11 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Default, Debug, Serialize, Deserialize)] pub struct Metrics { - pub total_txs: usize, - pub txs_confirmed: usize, - pub txs_finalized: usize, - pub in_secs: u64 + pub total_txs_sent: usize, + pub total_txs_confirmed: usize, + pub total_txs_finalized: usize, + pub txs_sent_in_one_sec: usize, + pub txs_confirmed_in_one_sec: usize, + pub txs_finalized_in_one_sec: usize } diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index 262ef520..0d68dab5 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -18,14 +18,15 @@ use solana_sdk::{ use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus}; use tokio::{sync::RwLock, task::JoinHandle}; + + /// Background worker which listen's to new blocks /// and keeps a track of confirmed txs #[derive(Clone)] pub struct BlockListener { pub_sub_client: Arc, commitment_config: CommitmentConfig, - - pub blocks: Arc>, + txs_sent: Arc>>, latest_block_info: Arc>, signature_subscribers: Arc>, } @@ -38,18 +39,18 @@ struct BlockInformation { impl BlockListener { pub async fn new( + pub_sub_client: Arc, rpc_client: Arc, - ws_url: &str, + txs_sent: Arc>>, commitment_config: CommitmentConfig, ) -> anyhow::Result { - let pub_sub_client = Arc::new(PubsubClient::new(ws_url).await?); let (latest_block_hash, block_height) = rpc_client .get_latest_blockhash_with_commitment(commitment_config) .await?; Ok(Self { pub_sub_client, - blocks: Default::default(), + txs_sent, latest_block_info: Arc::new(RwLock::new(BlockInformation { slot: rpc_client.get_slot().await?, blockhash: latest_block_hash.to_string(), @@ -66,7 +67,11 @@ impl BlockListener { let mut commitment_levels = Vec::with_capacity(sigs.len()); for sig in sigs { - commitment_levels.push(self.blocks.get(sig).map(|some| some.value().clone())); + commitment_levels.push( + self.txs_sent + .get(sig) + .map_or_else(|| None, |some| some.value().clone()), + ); } commitment_levels @@ -75,7 +80,7 @@ impl BlockListener { pub async fn num_of_sigs_commited(&self, sigs: &[String]) -> usize { let mut num_of_sigs_commited = 0; for sig in sigs { - if self.blocks.contains_key(sig) { + if self.txs_sent.contains_key(sig) { num_of_sigs_commited += 1; } } @@ -156,8 +161,20 @@ impl BlockListener { }; for sig in signatures { + let Some(mut tx_status) = self.txs_sent.get_mut(&sig) else { + continue; + }; + info!("{comfirmation_status:?} {sig}"); + *tx_status.value_mut() = Some(TransactionStatus { + slot, + confirmations: None, //TODO: talk about this + status: Ok(()), // legacy field + err: None, + confirmation_status: Some(comfirmation_status.clone()), + }); + // subscribers if let Some((sig, mut sink)) = self.signature_subscribers.remove(&sig) { warn!("notification {}", sig); @@ -171,17 +188,6 @@ impl BlockListener { }) .unwrap(); } - - self.blocks.insert( - sig, - TransactionStatus { - slot, - confirmations: None, //TODO: talk about this - status: Ok(()), // legacy field - err: None, - confirmation_status: Some(comfirmation_status.clone()), - }, - ); } } diff --git a/tests/workers.rs b/tests/workers.rs index 30c97606..bdb6e953 100644 --- a/tests/workers.rs +++ b/tests/workers.rs @@ -2,15 +2,19 @@ use std::sync::Arc; use std::time::Duration; use bench_utils::helpers::BenchHelper; +use dashmap::DashMap; use futures::future::try_join_all; use lite_rpc::{ encoding::BinaryEncoding, workers::{BlockListener, TxSender}, DEFAULT_LITE_RPC_ADDR, DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR, }; -use solana_client::nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient}; +use solana_client::nonblocking::{ + pubsub_client::PubsubClient, rpc_client::RpcClient, tpu_client::TpuClient, +}; use solana_sdk::{commitment_config::CommitmentConfig, native_token::LAMPORTS_PER_SOL}; +use solana_transaction_status::TransactionConfirmationStatus; #[tokio::test] async fn send_and_confirm_txs() { @@ -24,9 +28,13 @@ async fn send_and_confirm_txs() { .unwrap(), ); + let pub_sub_client = Arc::new(PubsubClient::new(DEFAULT_WS_ADDR).await.unwrap()); + let txs_sent = Arc::new(DashMap::new()); + let block_listener = BlockListener::new( + pub_sub_client.clone(), rpc_client.clone(), - DEFAULT_WS_ADDR, + txs_sent.clone(), CommitmentConfig::confirmed(), ) .await @@ -56,8 +64,14 @@ async fn send_and_confirm_txs() { let sig = sig.to_string(); for _ in 0..2 { - if block_listener.blocks.contains_key(&sig) { - return; + let tx_status = txs_sent.get(&sig).unwrap(); + + if let Some(tx_status) = tx_status.value() { + if tx_status.confirmation_status() + == TransactionConfirmationStatus::Confirmed + { + return; + } } tokio::time::sleep(Duration::from_millis(800)).await;