Merge pull request #152 from blockworks-foundation/replacing_slot_pubsub_with_rpc_polling

replacing slot pubsub because it is not working well
This commit is contained in:
galactus 2023-06-23 13:00:13 +02:00 committed by GitHub
commit 7219359326
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 39 deletions

View File

@ -1,8 +1,6 @@
use crate::structures::identity_stakes::IdentityStakes;
use anyhow::{bail, Context};
use futures::StreamExt;
use anyhow::Context;
use log::info;
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::ConnectionPeerType;
@ -62,39 +60,19 @@ impl SolanaUtils {
pub async fn poll_slots(
rpc_client: &RpcClient,
rpc_ws_address: &str,
update_slot: impl Fn(u64),
) -> anyhow::Result<()> {
let pubsub_client = PubsubClient::new(rpc_ws_address)
.await
.context("Error creating pubsub client")?;
let slot = rpc_client
.get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig {
commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
})
.await
.context("Error getting slot")?;
update_slot(slot);
let (mut client, unsub) =
tokio::time::timeout(Duration::from_millis(1000), pubsub_client.slot_subscribe())
let mut poll_frequency = tokio::time::interval(Duration::from_millis(50));
loop {
let slot = rpc_client
.get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig {
commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
})
.await
.context("Timeout subscribing to slots")?
.context("Slot pub sub disconnected")?;
while let Ok(slot_info) =
tokio::time::timeout(Duration::from_millis(2000), client.next()).await
{
if let Some(slot_info) = slot_info {
update_slot(slot_info.slot);
}
.context("Error getting slot")?;
update_slot(slot);
poll_frequency.tick().await;
}
unsub();
bail!("Slot pub sub disconnected")
}
// Estimates the slots, either from polled slot or by forcefully updating after every 400ms

View File

@ -88,7 +88,7 @@ pub struct LiteBridge {
impl LiteBridge {
pub async fn new(
rpc_url: String,
ws_addr: String,
_ws_addr: String,
fanout_slots: u64,
identity: Keypair,
retry_after: Duration,
@ -122,7 +122,6 @@ impl LiteBridge {
Arc::new(identity),
current_slot,
rpc_client.clone(),
ws_addr,
tx_store.clone(),
)
.await?;

View File

@ -57,7 +57,6 @@ pub struct TpuService {
current_slot: Arc<AtomicU64>,
estimated_slot: Arc<AtomicU64>,
rpc_client: Arc<RpcClient>,
rpc_ws_address: String,
broadcast_sender: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
tpu_connection_manager: Arc<TpuConnectionManager>,
identity_stakes: Arc<RwLock<IdentityStakes>>,
@ -73,7 +72,6 @@ impl TpuService {
identity: Arc<Keypair>,
current_slot: Slot,
rpc_client: Arc<RpcClient>,
rpc_ws_address: String,
txs_sent_store: TxStore,
) -> anyhow::Result<Self> {
let (sender, _) = tokio::sync::broadcast::channel(config.maximum_transaction_in_queue);
@ -91,7 +89,6 @@ impl TpuService {
estimated_slot: Arc::new(AtomicU64::new(current_slot)),
leader_schedule: Arc::new(LeaderSchedule::new(config.number_of_leaders_to_cache)),
rpc_client,
rpc_ws_address,
broadcast_sender: Arc::new(sender),
tpu_connection_manager: Arc::new(tpu_connection_manager),
identity_stakes: Arc::new(RwLock::new(IdentityStakes::default())),
@ -173,7 +170,6 @@ impl TpuService {
) -> AnyhowJoinHandle {
let current_slot = self.current_slot.clone();
let rpc_client = self.rpc_client.clone();
let rpc_ws_address = self.rpc_ws_address.clone();
let update_slot = move |slot: u64| {
if slot > current_slot.load(Ordering::Relaxed) {
@ -188,7 +184,7 @@ impl TpuService {
while nb_error < max_nb_errors {
// always loop update the current slots as it is central to working of TPU
let Err(err) = SolanaUtils::poll_slots(&rpc_client, &rpc_ws_address, &update_slot).await else {
let Err(err) = SolanaUtils::poll_slots(&rpc_client, &update_slot).await else {
nb_error = 0;
continue;
};