diff --git a/core/src/solana_utils.rs b/core/src/solana_utils.rs index 37eb8d31..21bdc860 100644 --- a/core/src/solana_utils.rs +++ b/core/src/solana_utils.rs @@ -1,8 +1,6 @@ use crate::structures::identity_stakes::IdentityStakes; -use anyhow::{bail, Context}; -use futures::StreamExt; +use anyhow::Context; use log::info; -use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::pubkey::Pubkey; use solana_streamer::nonblocking::quic::ConnectionPeerType; @@ -62,39 +60,19 @@ impl SolanaUtils { pub async fn poll_slots( rpc_client: &RpcClient, - rpc_ws_address: &str, update_slot: impl Fn(u64), ) -> anyhow::Result<()> { - let pubsub_client = PubsubClient::new(rpc_ws_address) - .await - .context("Error creating pubsub client")?; - - let slot = rpc_client - .get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig { - commitment: solana_sdk::commitment_config::CommitmentLevel::Processed, - }) - .await - .context("Error getting slot")?; - - update_slot(slot); - - let (mut client, unsub) = - tokio::time::timeout(Duration::from_millis(1000), pubsub_client.slot_subscribe()) + let mut poll_frequency = tokio::time::interval(Duration::from_millis(50)); + loop { + let slot = rpc_client + .get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig { + commitment: solana_sdk::commitment_config::CommitmentLevel::Processed, + }) .await - .context("Timeout subscribing to slots")? - .context("Slot pub sub disconnected")?; - - while let Ok(slot_info) = - tokio::time::timeout(Duration::from_millis(2000), client.next()).await - { - if let Some(slot_info) = slot_info { - update_slot(slot_info.slot); - } + .context("Error getting slot")?; + update_slot(slot); + poll_frequency.tick().await; } - - unsub(); - - bail!("Slot pub sub disconnected") } // Estimates the slots, either from polled slot or by forcefully updating after every 400ms diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index 8deaa2ce..6ec5e5c0 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -88,7 +88,7 @@ pub struct LiteBridge { impl LiteBridge { pub async fn new( rpc_url: String, - ws_addr: String, + _ws_addr: String, fanout_slots: u64, identity: Keypair, retry_after: Duration, @@ -122,7 +122,6 @@ impl LiteBridge { Arc::new(identity), current_slot, rpc_client.clone(), - ws_addr, tx_store.clone(), ) .await?; diff --git a/services/src/tpu_utils/tpu_service.rs b/services/src/tpu_utils/tpu_service.rs index c12e0fe4..f646c3ae 100644 --- a/services/src/tpu_utils/tpu_service.rs +++ b/services/src/tpu_utils/tpu_service.rs @@ -57,7 +57,6 @@ pub struct TpuService { current_slot: Arc, estimated_slot: Arc, rpc_client: Arc, - rpc_ws_address: String, broadcast_sender: Arc)>>, tpu_connection_manager: Arc, identity_stakes: Arc>, @@ -73,7 +72,6 @@ impl TpuService { identity: Arc, current_slot: Slot, rpc_client: Arc, - rpc_ws_address: String, txs_sent_store: TxStore, ) -> anyhow::Result { let (sender, _) = tokio::sync::broadcast::channel(config.maximum_transaction_in_queue); @@ -91,7 +89,6 @@ impl TpuService { estimated_slot: Arc::new(AtomicU64::new(current_slot)), leader_schedule: Arc::new(LeaderSchedule::new(config.number_of_leaders_to_cache)), rpc_client, - rpc_ws_address, broadcast_sender: Arc::new(sender), tpu_connection_manager: Arc::new(tpu_connection_manager), identity_stakes: Arc::new(RwLock::new(IdentityStakes::default())), @@ -173,7 +170,6 @@ impl TpuService { ) -> AnyhowJoinHandle { let current_slot = self.current_slot.clone(); let rpc_client = self.rpc_client.clone(); - let rpc_ws_address = self.rpc_ws_address.clone(); let update_slot = move |slot: u64| { if slot > current_slot.load(Ordering::Relaxed) { @@ -188,7 +184,7 @@ impl TpuService { while nb_error < max_nb_errors { // always loop update the current slots as it is central to working of TPU - let Err(err) = SolanaUtils::poll_slots(&rpc_client, &rpc_ws_address, &update_slot).await else { + let Err(err) = SolanaUtils::poll_slots(&rpc_client, &update_slot).await else { nb_error = 0; continue; };