From 9fd6af9f98f2feeb473f4bcc5b64c51d63d258d2 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Sat, 2 Sep 2023 08:29:50 +0200 Subject: [PATCH] fix slot polling issue --- .../src/json_rpc_subscription.rs | 9 +- .../src/rpc_polling/poll_slots.rs | 108 ++++++++++-------- 2 files changed, 62 insertions(+), 55 deletions(-) diff --git a/cluster-endpoints/src/json_rpc_subscription.rs b/cluster-endpoints/src/json_rpc_subscription.rs index 11196a62..ccf354be 100644 --- a/cluster-endpoints/src/json_rpc_subscription.rs +++ b/cluster-endpoints/src/json_rpc_subscription.rs @@ -18,13 +18,8 @@ pub fn create_json_rpc_polling_subscription( let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10); let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10); - let mut endpoint_tasks = vec![]; - let slot_polling_task = tokio::spawn(poll_slots( - rpc_client.clone(), - CommitmentConfig::processed(), - slot_sx, - )); - endpoint_tasks.push(slot_polling_task); + let mut endpoint_tasks = + poll_slots(rpc_client.clone(), CommitmentConfig::processed(), slot_sx)?; let mut block_polling_tasks = poll_block(rpc_client.clone(), block_sx, slot_notifier.resubscribe()); diff --git a/cluster-endpoints/src/rpc_polling/poll_slots.rs b/cluster-endpoints/src/rpc_polling/poll_slots.rs index 73812bea..16e625df 100644 --- a/cluster-endpoints/src/rpc_polling/poll_slots.rs +++ b/cluster-endpoints/src/rpc_polling/poll_slots.rs @@ -1,8 +1,8 @@ use std::{sync::Arc, time::Duration}; -use anyhow::Context; +use anyhow::{bail, Context}; use solana_client::nonblocking::rpc_client::RpcClient; -use solana_lite_rpc_core::structures::slot_notification::SlotNotification; +use solana_lite_rpc_core::{structures::slot_notification::SlotNotification, AnyhowJoinHandle}; use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot}; use tokio::sync::broadcast::Sender; const AVERAGE_SLOT_CHANGE_TIME: Duration = Duration::from_millis(400); @@ -14,74 +14,86 @@ pub async fn poll_commitment_slots( ) -> anyhow::Result<()> { let mut poll_frequency = tokio::time::interval(Duration::from_millis(10)); let mut last_slot = 0; + let mut errors = 0; loop { - let slot = rpc_client - .get_slot_with_commitment(commitment_config) - .await - .context("Error getting slot")?; - if slot > last_slot { - // send - slot_tx.send(slot).context("Error sending slot")?; - last_slot = slot; + let slot = rpc_client.get_slot_with_commitment(commitment_config).await; + match slot { + Ok(slot) => { + if slot > last_slot { + // send + slot_tx.send(slot).context("Error sending slot")?; + last_slot = slot; + } + errors = 0; + } + Err(e) => { + errors += 1; + if errors > 10 { + bail!("Exceeded error count to get slots from rpc {e:?}"); + } + } } // wait for next poll i.e at least 50ms poll_frequency.tick().await; } } -pub async fn poll_slots( +pub fn poll_slots( rpc_client: Arc, commitment_config: CommitmentConfig, sender: Sender, -) -> anyhow::Result<()> { - let slot = rpc_client - .get_slot_with_commitment(CommitmentConfig::confirmed()) - .await - .context("Error getting slot")?; - - let mut current_slot = slot; - let mut estimated_slot = slot; - +) -> anyhow::Result> { // processed slot update task let (slot_update_sx, mut slot_update_rx) = tokio::sync::mpsc::unbounded_channel(); - tokio::spawn(poll_commitment_slots( - rpc_client, + let task1 = tokio::spawn(poll_commitment_slots( + rpc_client.clone(), commitment_config, slot_update_sx, )); - loop { - match tokio::time::timeout(AVERAGE_SLOT_CHANGE_TIME, slot_update_rx.recv()).await { - Ok(Some(slot)) => { - // slot is latest - if slot > current_slot { - current_slot = slot; - if current_slot > estimated_slot { - estimated_slot = slot; + let task2 = tokio::spawn(async move { + let slot = rpc_client + .get_slot_with_commitment(CommitmentConfig::confirmed()) + .await + .context("Error getting slot")?; + + let mut current_slot = slot; + let mut estimated_slot = slot; + + loop { + match tokio::time::timeout(AVERAGE_SLOT_CHANGE_TIME, slot_update_rx.recv()).await { + Ok(Some(slot)) => { + // slot is latest + if slot > current_slot { + current_slot = slot; + if current_slot > estimated_slot { + estimated_slot = slot; + } + sender + .send(SlotNotification { + processed_slot: current_slot, + estimated_processed_slot: estimated_slot, + }) + .context("Cannot send slot notification")?; + } + } + Ok(None) => log::error!("got nothing from slot update notifier"), + Err(err) => { + log::warn!("failed to receive slot update: {err}"); + // force update the slot + // estimated slot should not go ahead more than 32 slots + // this is because it may be a slot block + if estimated_slot < current_slot + 32 { + estimated_slot += 1; } sender .send(SlotNotification { processed_slot: current_slot, estimated_processed_slot: estimated_slot, }) - .context("Cannot send slot notification")?; + .context("Connot send slot notification")?; } } - Ok(None) => log::error!("got nothing from slot update notifier"), - Err(err) => { - log::warn!("failed to receive slot update: {err}"); - // force update the slot - // estimated slot should not go ahead more than 32 slots - // this is because it may be a slot block - if estimated_slot < current_slot + 32 { - estimated_slot += 1; - } - sender - .send(SlotNotification { - processed_slot: current_slot, - estimated_processed_slot: estimated_slot, - }) - .context("Connot send slot notification")?; - } } - } + }); + Ok(vec![task1, task2]) }