replacing slot pubsub because it is not working well
This commit is contained in:
parent
4eca04bfa3
commit
04868028b8
|
@ -1,8 +1,6 @@
|
|||
use crate::structures::identity_stakes::IdentityStakes;
|
||||
use anyhow::{bail, Context};
|
||||
use futures::StreamExt;
|
||||
use anyhow::Context;
|
||||
use log::info;
|
||||
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
|
||||
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_streamer::nonblocking::quic::ConnectionPeerType;
|
||||
|
@ -62,39 +60,19 @@ impl SolanaUtils {
|
|||
|
||||
pub async fn poll_slots(
|
||||
rpc_client: &RpcClient,
|
||||
rpc_ws_address: &str,
|
||||
update_slot: impl Fn(u64),
|
||||
) -> anyhow::Result<()> {
|
||||
let pubsub_client = PubsubClient::new(rpc_ws_address)
|
||||
.await
|
||||
.context("Error creating pubsub client")?;
|
||||
|
||||
let slot = rpc_client
|
||||
.get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig {
|
||||
commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
|
||||
})
|
||||
.await
|
||||
.context("Error getting slot")?;
|
||||
|
||||
update_slot(slot);
|
||||
|
||||
let (mut client, unsub) =
|
||||
tokio::time::timeout(Duration::from_millis(1000), pubsub_client.slot_subscribe())
|
||||
let mut poll_frequency = tokio::time::interval(Duration::from_millis(50));
|
||||
loop {
|
||||
let slot = rpc_client
|
||||
.get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig {
|
||||
commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
|
||||
})
|
||||
.await
|
||||
.context("Timeout subscribing to slots")?
|
||||
.context("Slot pub sub disconnected")?;
|
||||
|
||||
while let Ok(slot_info) =
|
||||
tokio::time::timeout(Duration::from_millis(2000), client.next()).await
|
||||
{
|
||||
if let Some(slot_info) = slot_info {
|
||||
update_slot(slot_info.slot);
|
||||
}
|
||||
.context("Error getting slot")?;
|
||||
update_slot(slot);
|
||||
poll_frequency.tick().await;
|
||||
}
|
||||
|
||||
unsub();
|
||||
|
||||
bail!("Slot pub sub disconnected")
|
||||
}
|
||||
|
||||
// Estimates the slots, either from polled slot or by forcefully updating after every 400ms
|
||||
|
|
|
@ -88,7 +88,7 @@ pub struct LiteBridge {
|
|||
impl LiteBridge {
|
||||
pub async fn new(
|
||||
rpc_url: String,
|
||||
ws_addr: String,
|
||||
_ws_addr: String,
|
||||
fanout_slots: u64,
|
||||
identity: Keypair,
|
||||
retry_after: Duration,
|
||||
|
@ -122,7 +122,6 @@ impl LiteBridge {
|
|||
Arc::new(identity),
|
||||
current_slot,
|
||||
rpc_client.clone(),
|
||||
ws_addr,
|
||||
tx_store.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
|
|
@ -57,7 +57,6 @@ pub struct TpuService {
|
|||
current_slot: Arc<AtomicU64>,
|
||||
estimated_slot: Arc<AtomicU64>,
|
||||
rpc_client: Arc<RpcClient>,
|
||||
rpc_ws_address: String,
|
||||
broadcast_sender: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
|
||||
tpu_connection_manager: Arc<TpuConnectionManager>,
|
||||
identity_stakes: Arc<RwLock<IdentityStakes>>,
|
||||
|
@ -73,7 +72,6 @@ impl TpuService {
|
|||
identity: Arc<Keypair>,
|
||||
current_slot: Slot,
|
||||
rpc_client: Arc<RpcClient>,
|
||||
rpc_ws_address: String,
|
||||
txs_sent_store: TxStore,
|
||||
) -> anyhow::Result<Self> {
|
||||
let (sender, _) = tokio::sync::broadcast::channel(config.maximum_transaction_in_queue);
|
||||
|
@ -91,7 +89,6 @@ impl TpuService {
|
|||
estimated_slot: Arc::new(AtomicU64::new(current_slot)),
|
||||
leader_schedule: Arc::new(LeaderSchedule::new(config.number_of_leaders_to_cache)),
|
||||
rpc_client,
|
||||
rpc_ws_address,
|
||||
broadcast_sender: Arc::new(sender),
|
||||
tpu_connection_manager: Arc::new(tpu_connection_manager),
|
||||
identity_stakes: Arc::new(RwLock::new(IdentityStakes::default())),
|
||||
|
@ -173,7 +170,6 @@ impl TpuService {
|
|||
) -> AnyhowJoinHandle {
|
||||
let current_slot = self.current_slot.clone();
|
||||
let rpc_client = self.rpc_client.clone();
|
||||
let rpc_ws_address = self.rpc_ws_address.clone();
|
||||
|
||||
let update_slot = move |slot: u64| {
|
||||
if slot > current_slot.load(Ordering::Relaxed) {
|
||||
|
@ -188,7 +184,7 @@ impl TpuService {
|
|||
|
||||
while nb_error < max_nb_errors {
|
||||
// always loop update the current slots as it is central to working of TPU
|
||||
let Err(err) = SolanaUtils::poll_slots(&rpc_client, &rpc_ws_address, &update_slot).await else {
|
||||
let Err(err) = SolanaUtils::poll_slots(&rpc_client, &update_slot).await else {
|
||||
nb_error = 0;
|
||||
continue;
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue