tpu service

This commit is contained in:
aniketfuryrocks 2023-06-12 06:11:49 +05:30
parent 0d6f271156
commit 56acfccc62
No known key found for this signature in database
GPG Key ID: 1B75EA596D89FF06
5 changed files with 79 additions and 86 deletions

View File

@ -7,3 +7,5 @@ pub mod rotating_queue;
pub mod solana_utils;
pub mod structures;
pub mod subscription_handler;
pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;

View File

@ -1,6 +1,7 @@
use crate::structures::identity_stakes::IdentityStakes;
use anyhow::Context;
use futures::StreamExt;
use log::{error, info, warn};
use log::{info, warn};
use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
@ -61,66 +62,40 @@ impl SolanaUtils {
pub async fn poll_slots(
rpc_client: Arc<RpcClient>,
pubsub_client: Arc<PubsubClient>,
rpc_ws_address: &str,
update_slot: impl Fn(u64),
) {
loop {
let slot = rpc_client
.get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig {
commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
})
.await;
match slot {
Ok(slot) => {
update_slot(slot);
}
Err(e) => {
// error getting slot
error!("error getting slot {}", e);
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
}
}
) -> anyhow::Result<()> {
let pubsub_client = PubsubClient::new(rpc_ws_address)
.await
.context("Error creating pubsub client")?;
let res =
tokio::time::timeout(Duration::from_millis(1000), pubsub_client.slot_subscribe())
.await;
match res {
Ok(sub_res) => {
match sub_res {
Ok((mut client, unsub)) => {
loop {
let next = tokio::time::timeout(
Duration::from_millis(2000),
client.next(),
)
.await;
match next {
Ok(slot_info) => {
if let Some(slot_info) = slot_info {
update_slot(slot_info.slot);
}
}
Err(_) => {
// timedout reconnect to pubsub
warn!("slot pub sub disconnected reconnecting");
break;
}
}
}
unsub();
}
Err(e) => {
warn!("slot pub sub disconnected ({}) reconnecting", e);
}
}
}
Err(_) => {
// timed out
warn!("timedout subscribing to slots");
}
let slot = rpc_client
.get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig {
commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
})
.await
.context("error getting slot")?;
update_slot(slot);
let (mut client, unsub) =
tokio::time::timeout(Duration::from_millis(1000), pubsub_client.slot_subscribe())
.await
.context("timedout subscribing to slots")?
.context("slot pub sub disconnected")?;
while let Ok(slot_info) =
tokio::time::timeout(Duration::from_millis(2000), client.next()).await
{
if let Some(slot_info) = slot_info {
update_slot(slot_info.slot);
}
}
warn!("slot pub sub disconnected reconnecting");
unsub();
Ok(())
}
// Estimates the slots, either from polled slot or by forcefully updating after every 400ms

View File

@ -3,7 +3,7 @@ use crate::{
encoding::BinaryEncoding,
postgres::Postgres,
rpc::LiteRpcServer,
AnyhowJoinHandle, DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
};
use solana_lite_rpc_services::{
@ -17,7 +17,10 @@ use solana_lite_rpc_services::{
tx_sender::{TxProps, TxSender, TXS_IN_CHANNEL},
};
use solana_lite_rpc_core::block_store::{BlockInformation, BlockStore};
use solana_lite_rpc_core::{
block_store::{BlockInformation, BlockStore},
AnyhowJoinHandle,
};
use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};

View File

@ -9,8 +9,6 @@ pub mod errors;
pub mod postgres;
pub mod rpc;
pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;
#[from_env]
pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899";
#[from_env]

View File

@ -1,13 +1,14 @@
use anyhow::Result;
use anyhow::bail;
use dashmap::DashMap;
use log::{error, info};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_client::nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::{
leader_schedule::LeaderSchedule, solana_utils::SolanaUtils,
structures::identity_stakes::IdentityStakes,
structures::identity_stakes::IdentityStakes, AnyhowJoinHandle,
};
use solana_sdk::{
pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, signer::Signer, slot_history::Slot,
};
@ -22,7 +23,6 @@ use std::{
};
use tokio::{
sync::RwLock,
task::JoinHandle,
time::{Duration, Instant},
};
@ -54,7 +54,7 @@ pub struct TpuService {
estimated_slot: Arc<AtomicU64>,
fanout_slots: u64,
rpc_client: Arc<RpcClient>,
pubsub_client: Arc<PubsubClient>,
rpc_ws_address: String,
broadcast_sender: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
tpu_connection_manager: Arc<TpuConnectionManager>,
identity: Arc<Keypair>,
@ -72,7 +72,6 @@ impl TpuService {
rpc_ws_address: String,
txs_sent_store: Arc<DashMap<String, TxProps>>,
) -> anyhow::Result<Self> {
let pubsub_client = PubsubClient::new(&rpc_ws_address).await?;
let (sender, _) = tokio::sync::broadcast::channel(MAXIMUM_TRANSACTIONS_IN_QUEUE);
let (certificate, key) = new_self_signed_tls_certificate(
identity.as_ref(),
@ -89,7 +88,7 @@ impl TpuService {
leader_schedule: Arc::new(LeaderSchedule::new(CACHE_NEXT_SLOT_LEADERS_PUBKEY_SIZE)),
fanout_slots,
rpc_client,
pubsub_client: Arc::new(pubsub_client),
rpc_ws_address,
broadcast_sender: Arc::new(sender),
tpu_connection_manager: Arc::new(tpu_connection_manager),
identity,
@ -98,7 +97,7 @@ impl TpuService {
})
}
pub async fn update_current_stakes(&self) -> Result<()> {
pub async fn update_current_stakes(&self) -> anyhow::Result<()> {
// update stakes for identity
// update stakes for the identity
{
@ -117,7 +116,7 @@ impl TpuService {
Ok(())
}
pub async fn update_leader_schedule(&self) -> Result<()> {
pub async fn update_leader_schedule(&self) -> anyhow::Result<()> {
let current_slot = self.current_slot.load(Ordering::Relaxed);
let estimated_slot = self.estimated_slot.load(Ordering::Relaxed);
self.leader_schedule
@ -166,9 +165,11 @@ impl TpuService {
.await;
}
async fn update_current_slot(&self, update_notifier: tokio::sync::mpsc::UnboundedSender<u64>) {
async fn update_current_slot(
&self,
update_notifier: tokio::sync::mpsc::UnboundedSender<u64>,
) -> anyhow::Result<()> {
let current_slot = self.current_slot.clone();
let update_slot = |slot: u64| {
if slot > current_slot.load(Ordering::Relaxed) {
current_slot.store(slot, Ordering::Relaxed);
@ -177,15 +178,23 @@ impl TpuService {
}
};
SolanaUtils::poll_slots(
self.rpc_client.clone(),
self.pubsub_client.clone(),
update_slot,
)
.await;
loop {
let Err(err) = SolanaUtils::poll_slots(
self.rpc_client.clone(),
&self.rpc_ws_address,
update_slot,
)
.await else {
bail!("current slot fetch task exited");
};
error!("slot fetch task error: {err:?}");
tokio::time::sleep(Duration::from_millis(2000)).await;
}
}
pub async fn start(&self) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
pub async fn start(&self) -> anyhow::Result<()> {
self.leader_schedule
.load_cluster_info(self.rpc_client.clone())
.await?;
@ -218,8 +227,8 @@ impl TpuService {
let this = self.clone();
let (slot_sender, slot_reciever) = tokio::sync::mpsc::unbounded_channel::<Slot>();
let slot_sub_task = tokio::spawn(async move {
this.update_current_slot(slot_sender).await;
let slot_sub_task: AnyhowJoinHandle = tokio::spawn(async move {
this.update_current_slot(slot_sender).await?;
Ok(())
});
@ -242,11 +251,17 @@ impl TpuService {
}
});
Ok(vec![
jh_update_leaders,
slot_sub_task,
estimated_slot_calculation,
])
tokio::select! {
res = jh_update_leaders => {
bail!("Leader update service exited unexpectedly {res:?}");
},
res = slot_sub_task => {
bail!("Leader update service exited unexpectedly {res:?}");
},
res = estimated_slot_calculation => {
bail!("Estimated slot calculation service exited unexpectedly {res:?}");
},
}
}
pub fn get_estimated_slot(&self) -> u64 {