From a20d23e924a1cb7c3d36b9f384b2c25f02e273dd Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Sun, 5 Feb 2023 15:08:17 +0530 Subject: [PATCH] reset pub_sub client --- src/bridge.rs | 12 +- src/workers/block_listenser.rs | 335 +++++++++++++++++---------------- src/workers/tx_sender.rs | 2 +- tests/workers.rs | 5 +- 4 files changed, 176 insertions(+), 178 deletions(-) diff --git a/src/bridge.rs b/src/bridge.rs index 2d73d9f3..da7f3134 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -18,16 +18,13 @@ use log::info; use jsonrpsee::{server::ServerBuilder, types::SubscriptionResult, SubscriptionSink}; -use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient; use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction}; use solana_rpc_client_api::{ config::{RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig}, response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo}, }; use solana_sdk::{ - commitment_config::{CommitmentConfig}, - hash::Hash, - pubkey::Pubkey, + commitment_config::CommitmentConfig, hash::Hash, pubkey::Pubkey, transaction::VersionedTransaction, }; use solana_transaction_status::TransactionStatus; @@ -51,7 +48,6 @@ pub struct LiteBridge { impl LiteBridge { pub async fn new(rpc_url: String, ws_addr: String, fanout_slots: u64) -> anyhow::Result { let rpc_client = Arc::new(RpcClient::new(rpc_url.clone())); - let pub_sub_client = Arc::new(PubsubClient::new(&ws_addr).await?); let tpu_manager = Arc::new(TpuManager::new(rpc_client.clone(), ws_addr, fanout_slots).await?); @@ -60,11 +56,7 @@ impl LiteBridge { let block_store = BlockStore::new(&rpc_client).await?; - let block_listner = BlockListener::new( - pub_sub_client.clone(), - tx_sender.clone(), - block_store.clone(), - ); + let block_listner = BlockListener::new(tx_sender.clone(), block_store.clone()); Ok(Self { rpc_client, diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index f0d86174..bd0172b7 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -4,7 +4,7 @@ use anyhow::{bail, Context}; use dashmap::DashMap; use futures::StreamExt; use jsonrpsee::SubscriptionSink; -use log::info; +use log::{info, warn}; use prometheus::{histogram_opts, opts, register_counter, register_histogram, Counter, Histogram}; use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient; use solana_rpc_client::rpc_client::SerializableTransaction; @@ -49,7 +49,6 @@ lazy_static::lazy_static! { /// and keeps a track of confirmed txs #[derive(Clone)] pub struct BlockListener { - pub_sub_client: Arc, tx_sender: TxSender, block_store: BlockStore, pub signature_subscribers: Arc>, @@ -67,13 +66,8 @@ pub struct BlockListnerNotificatons { } impl BlockListener { - pub fn new( - pub_sub_client: Arc, - tx_sender: TxSender, - block_store: BlockStore, - ) -> Self { + pub fn new(tx_sender: TxSender, block_store: BlockStore) -> Self { Self { - pub_sub_client, tx_sender, block_store, signature_subscribers: Default::default(), @@ -98,168 +92,183 @@ impl BlockListener { self.signature_subscribers.remove(&signature); } + pub async fn listen_from_pubsub( + self, + pubsub_client: &PubsubClient, + commitment_config: CommitmentConfig, + postgres: &Option, + ) -> anyhow::Result<()> { + let commitment = commitment_config.commitment; + + let comfirmation_status = match commitment { + CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized, + _ => TransactionConfirmationStatus::Confirmed, + }; + + info!("Subscribing to {commitment:?} blocks"); + + let (mut recv, _) = pubsub_client + .block_subscribe( + RpcBlockSubscribeFilter::All, + Some(RpcBlockSubscribeConfig { + commitment: Some(commitment_config), + encoding: None, + transaction_details: Some(solana_transaction_status::TransactionDetails::Full), + show_rewards: None, + max_supported_transaction_version: None, + }), + ) + .await + .context("Error calling block_subscribe")?; + + info!("Listening to {commitment:?} blocks"); + + loop { + let timer = if commitment_config.is_finalized() { + TT_RECV_FIN_BLOCK.start_timer() + } else { + TT_RECV_CON_BLOCK.start_timer() + }; + + let Some(block) = recv.as_mut().next().await else { + bail!("PubSub broke"); + }; + + timer.observe_duration(); + + let slot = block.context.slot; + + let Some(block) = block.value.block else { + continue; + }; + + let Some(block_height) = block.block_height else { + continue; + }; + + let blockhash = block.blockhash; + + let Some(transactions) = block.transactions else { + continue; + }; + + let parent_slot = block.parent_slot; + + self.block_store + .add_block( + blockhash.clone(), + BlockInformation { slot, block_height }, + commitment_config, + ) + .await; + + if let Some(postgres) = &postgres { + let Some(rewards) = block.rewards else { + continue; + }; + + let Some(leader_reward) = rewards + .iter() + .find(|reward| Some(RewardType::Fee) == reward.reward_type) else { + continue; + }; + + let _leader_id = &leader_reward.pubkey; + + postgres + .send(PostgresMsg::PostgresBlock(PostgresBlock { + slot: slot as i64, + leader_id: 0, //FIX: + parent_slot: parent_slot as i64, + })) + .expect("Error sending block to postgres service"); + } + + for tx in transactions { + let Some(UiTransactionStatusMeta { err, status, compute_units_consumed ,.. }) = tx.meta else { + info!("tx with no meta"); + continue; + }; + + let Some(tx) = tx.transaction.decode() else { + info!("unable to decode tx"); + continue; + }; + + let sig = tx.get_signature().to_string(); + + if let Some(mut tx_status) = self.tx_sender.txs_sent.get_mut(&sig) { + // + // Metrics + // + if status.is_ok() { + if commitment_config.is_finalized() { + TXS_FINALIZED.inc(); + } else { + TXS_CONFIRMED.inc(); + } + } + + tx_status.value_mut().status = Some(TransactionStatus { + slot, + confirmations: None, + status, + err: err.clone(), + confirmation_status: Some(comfirmation_status.clone()), + }); + + // + // Write to postgres + // + if let Some(postgres) = &postgres { + let cu_consumed = match compute_units_consumed { + OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64), + _ => None, + }; + + postgres + .send(PostgresMsg::PostgresUpdateTx( + PostgresUpdateTx { + processed_slot: slot as i64, + cu_consumed, + cu_requested: None, //TODO: cu requested + }, + sig.clone(), + )) + .unwrap(); + } + }; + + // subscribers + if let Some((_sig, mut sink)) = self.signature_subscribers.remove(&sig) { + // none if transaction succeeded + sink.send(&RpcResponse { + context: RpcResponseContext { + slot, + api_version: None, + }, + value: serde_json::json!({ "err": err }), + })?; + } + } + } + } + pub fn listen( self, commitment_config: CommitmentConfig, postgres: Option, ) -> JoinHandle> { tokio::spawn(async move { - let commitment = commitment_config.commitment; - - let comfirmation_status = match commitment { - CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized, - _ => TransactionConfirmationStatus::Confirmed, - }; - - info!("Subscribing to {commitment:?} blocks"); - - let (mut recv, _) = self - .pub_sub_client - .block_subscribe( - RpcBlockSubscribeFilter::All, - Some(RpcBlockSubscribeConfig { - commitment: Some(commitment_config), - encoding: None, - transaction_details: Some( - solana_transaction_status::TransactionDetails::Full, - ), - show_rewards: None, - max_supported_transaction_version: None, - }), - ) - .await - .context("Error calling block_subscribe")?; - - info!("Listening to {commitment:?} blocks"); - loop { - let timer = if commitment_config.is_finalized() { - TT_RECV_FIN_BLOCK.start_timer() - } else { - TT_RECV_CON_BLOCK.start_timer() - }; - - let Some(block) = recv.as_mut().next().await else { - bail!("PubSub broke"); - }; - - timer.observe_duration(); - - let slot = block.context.slot; - - let Some(block) = block.value.block else { - continue; - }; - - let Some(block_height) = block.block_height else { - continue; - }; - - let blockhash = block.blockhash; - - let Some(transactions) = block.transactions else { - continue; - }; - - let parent_slot = block.parent_slot; - - self.block_store - .add_block( - blockhash.clone(), - BlockInformation { slot, block_height }, - commitment_config, - ) - .await; - - if let Some(postgres) = &postgres { - let Some(rewards) = block.rewards else { - continue; - }; - - let Some(leader_reward) = rewards - .iter() - .find(|reward| Some(RewardType::Fee) == reward.reward_type) else { - continue; - }; - - let _leader_id = &leader_reward.pubkey; - - postgres - .send(PostgresMsg::PostgresBlock(PostgresBlock { - slot: slot as i64, - leader_id: 0, //FIX: - parent_slot: parent_slot as i64, - })) - .expect("Error sending block to postgres service"); - } - - for tx in transactions { - let Some(UiTransactionStatusMeta { err, status, compute_units_consumed ,.. }) = tx.meta else { - info!("tx with no meta"); - continue; - }; - - let Some(tx) = tx.transaction.decode() else { - info!("unable to decode tx"); - continue; - }; - - let sig = tx.get_signature().to_string(); - - if let Some(mut tx_status) = self.tx_sender.txs_sent.get_mut(&sig) { - // - // Metrics - // - if status.is_ok() { - if commitment_config.is_finalized() { - TXS_FINALIZED.inc(); - } else { - TXS_CONFIRMED.inc(); - } - } - - tx_status.value_mut().status = Some(TransactionStatus { - slot, - confirmations: None, - status, - err: err.clone(), - confirmation_status: Some(comfirmation_status.clone()), - }); - - // - // Write to postgres - // - if let Some(postgres) = &postgres { - let cu_consumed = match compute_units_consumed { - OptionSerializer::Some(cu_consumed) => Some(cu_consumed as i64), - _ => None, - }; - - postgres - .send(PostgresMsg::PostgresUpdateTx( - PostgresUpdateTx { - processed_slot: slot as i64, - cu_consumed, - cu_requested: None, //TODO: cu requested - }, - sig.clone(), - )) - .unwrap(); - } - }; - - // subscribers - if let Some((_sig, mut sink)) = self.signature_subscribers.remove(&sig) { - // none if transaction succeeded - sink.send(&RpcResponse { - context: RpcResponseContext { - slot, - api_version: None, - }, - value: serde_json::json!({ "err": err }), - })?; - } - } + let ws_addr = &self.tx_sender.tpu_manager.ws_addr; + let pub_sub_client = PubsubClient::new(ws_addr).await?; + let err = self + .clone() + .listen_from_pubsub(&pub_sub_client, commitment_config, &postgres) + .await + .unwrap_err(); + warn!("{commitment_config:?} Block Subscribe error {err}"); } }) } diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs index 836e20a4..318bfdd8 100644 --- a/src/workers/tx_sender.rs +++ b/src/workers/tx_sender.rs @@ -34,7 +34,7 @@ pub struct TxSender { /// Tx(s) forwarded to tpu pub txs_sent: Arc>, /// TpuClient to call the tpu port - tpu_manager: Arc, + pub tpu_manager: Arc, } /// Transaction Properties diff --git a/tests/workers.rs b/tests/workers.rs index d1dd5c08..6210dfe4 100644 --- a/tests/workers.rs +++ b/tests/workers.rs @@ -10,7 +10,6 @@ use lite_rpc::{ DEFAULT_LITE_RPC_ADDR, DEFAULT_RPC_ADDR, DEFAULT_TX_BATCH_INTERVAL_MS, DEFAULT_TX_BATCH_SIZE, DEFAULT_WS_ADDR, }; -use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::commitment_config::CommitmentConfig; @@ -33,12 +32,10 @@ async fn send_and_confirm_txs() { .unwrap(), ); - let pub_sub_client = Arc::new(PubsubClient::new(DEFAULT_WS_ADDR).await.unwrap()); - let tx_sender = TxSender::new(tpu_client); let block_store = BlockStore::new(&rpc_client).await.unwrap(); - let block_listener = BlockListener::new(pub_sub_client.clone(), tx_sender.clone(), block_store); + let block_listener = BlockListener::new(tx_sender.clone(), block_store); let (tx_send, tx_recv) = mpsc::unbounded_channel();