fix slot polling issue

This commit is contained in:
Godmode Galactus 2023-09-02 08:29:50 +02:00
parent 50c3b43a8b
commit 9fd6af9f98
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
2 changed files with 62 additions and 55 deletions

View File

@ -18,13 +18,8 @@ pub fn create_json_rpc_polling_subscription(
let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10); let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10);
let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10); let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10);
let mut endpoint_tasks = vec![]; let mut endpoint_tasks =
let slot_polling_task = tokio::spawn(poll_slots( poll_slots(rpc_client.clone(), CommitmentConfig::processed(), slot_sx)?;
rpc_client.clone(),
CommitmentConfig::processed(),
slot_sx,
));
endpoint_tasks.push(slot_polling_task);
let mut block_polling_tasks = let mut block_polling_tasks =
poll_block(rpc_client.clone(), block_sx, slot_notifier.resubscribe()); poll_block(rpc_client.clone(), block_sx, slot_notifier.resubscribe());

View File

@ -1,8 +1,8 @@
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use anyhow::Context; use anyhow::{bail, Context};
use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::structures::slot_notification::SlotNotification; use solana_lite_rpc_core::{structures::slot_notification::SlotNotification, AnyhowJoinHandle};
use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot}; use solana_sdk::{commitment_config::CommitmentConfig, slot_history::Slot};
use tokio::sync::broadcast::Sender; use tokio::sync::broadcast::Sender;
const AVERAGE_SLOT_CHANGE_TIME: Duration = Duration::from_millis(400); const AVERAGE_SLOT_CHANGE_TIME: Duration = Duration::from_millis(400);
@ -14,74 +14,86 @@ pub async fn poll_commitment_slots(
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut poll_frequency = tokio::time::interval(Duration::from_millis(10)); let mut poll_frequency = tokio::time::interval(Duration::from_millis(10));
let mut last_slot = 0; let mut last_slot = 0;
let mut errors = 0;
loop { loop {
let slot = rpc_client let slot = rpc_client.get_slot_with_commitment(commitment_config).await;
.get_slot_with_commitment(commitment_config) match slot {
.await Ok(slot) => {
.context("Error getting slot")?; if slot > last_slot {
if slot > last_slot { // send
// send slot_tx.send(slot).context("Error sending slot")?;
slot_tx.send(slot).context("Error sending slot")?; last_slot = slot;
last_slot = slot; }
errors = 0;
}
Err(e) => {
errors += 1;
if errors > 10 {
bail!("Exceeded error count to get slots from rpc {e:?}");
}
}
} }
// wait for next poll i.e at least 50ms // wait for next poll i.e at least 50ms
poll_frequency.tick().await; poll_frequency.tick().await;
} }
} }
pub async fn poll_slots( pub fn poll_slots(
rpc_client: Arc<RpcClient>, rpc_client: Arc<RpcClient>,
commitment_config: CommitmentConfig, commitment_config: CommitmentConfig,
sender: Sender<SlotNotification>, sender: Sender<SlotNotification>,
) -> anyhow::Result<()> { ) -> anyhow::Result<Vec<AnyhowJoinHandle>> {
let slot = rpc_client
.get_slot_with_commitment(CommitmentConfig::confirmed())
.await
.context("Error getting slot")?;
let mut current_slot = slot;
let mut estimated_slot = slot;
// processed slot update task // processed slot update task
let (slot_update_sx, mut slot_update_rx) = tokio::sync::mpsc::unbounded_channel(); let (slot_update_sx, mut slot_update_rx) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(poll_commitment_slots( let task1 = tokio::spawn(poll_commitment_slots(
rpc_client, rpc_client.clone(),
commitment_config, commitment_config,
slot_update_sx, slot_update_sx,
)); ));
loop { let task2 = tokio::spawn(async move {
match tokio::time::timeout(AVERAGE_SLOT_CHANGE_TIME, slot_update_rx.recv()).await { let slot = rpc_client
Ok(Some(slot)) => { .get_slot_with_commitment(CommitmentConfig::confirmed())
// slot is latest .await
if slot > current_slot { .context("Error getting slot")?;
current_slot = slot;
if current_slot > estimated_slot { let mut current_slot = slot;
estimated_slot = slot; let mut estimated_slot = slot;
loop {
match tokio::time::timeout(AVERAGE_SLOT_CHANGE_TIME, slot_update_rx.recv()).await {
Ok(Some(slot)) => {
// slot is latest
if slot > current_slot {
current_slot = slot;
if current_slot > estimated_slot {
estimated_slot = slot;
}
sender
.send(SlotNotification {
processed_slot: current_slot,
estimated_processed_slot: estimated_slot,
})
.context("Cannot send slot notification")?;
}
}
Ok(None) => log::error!("got nothing from slot update notifier"),
Err(err) => {
log::warn!("failed to receive slot update: {err}");
// force update the slot
// estimated slot should not go ahead more than 32 slots
// this is because it may be a slot block
if estimated_slot < current_slot + 32 {
estimated_slot += 1;
} }
sender sender
.send(SlotNotification { .send(SlotNotification {
processed_slot: current_slot, processed_slot: current_slot,
estimated_processed_slot: estimated_slot, estimated_processed_slot: estimated_slot,
}) })
.context("Cannot send slot notification")?; .context("Connot send slot notification")?;
} }
} }
Ok(None) => log::error!("got nothing from slot update notifier"),
Err(err) => {
log::warn!("failed to receive slot update: {err}");
// force update the slot
// estimated slot should not go ahead more than 32 slots
// this is because it may be a slot block
if estimated_slot < current_slot + 32 {
estimated_slot += 1;
}
sender
.send(SlotNotification {
processed_slot: current_slot,
estimated_processed_slot: estimated_slot,
})
.context("Connot send slot notification")?;
}
} }
} });
Ok(vec![task1, task2])
} }