Merge pull request #117 from blockworks-foundation/fixing_issues_with_slot_subscription

Fixes the issue where the slot subscription lags or stops working, by guarding the subscription with timeouts and resubscribing whenever it stalls.
galactus 2023-04-12 09:45:50 +02:00 committed by GitHub
commit f11c7e4969
4 changed files with 101 additions and 52 deletions
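
The fix lives in `TpuService::update_current_slot` (full diff below): seed the slot from a plain RPC call, then guard both the `slot_subscribe` call and every item read from the stream with a timeout, resubscribing whenever either one stalls. A minimal sketch of that watchdog pattern, with an illustrative channel standing in for the pubsub stream:

```rust
use std::time::Duration;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::time::timeout;

// Illustrative stand-in for pubsub_client.slot_subscribe(): any re-creatable
// stream of slot numbers works for demonstrating the watchdog.
async fn watch_slots(mut resubscribe: impl FnMut() -> UnboundedReceiver<u64>) {
    loop {
        let mut stream = resubscribe(); // (re)subscribe from scratch
        loop {
            match timeout(Duration::from_millis(2000), stream.recv()).await {
                Ok(Some(slot)) => println!("slot {slot}"),
                // Stream closed, or no update within 2s: drop it and resubscribe.
                _ => break,
            }
        }
    }
}
```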

View File

@@ -10,7 +10,7 @@ use prometheus::core::GenericGauge;
use prometheus::{opts, register_int_gauge};
use serde_json::json;
use solana_client::rpc_request::RpcRequest;
-use solana_client::rpc_response::{RpcBlockhash, Response};
+use solana_client::rpc_response::{Response, RpcBlockhash};
use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::TransactionDetails;
@@ -39,7 +39,6 @@ pub struct BlockStore {
impl BlockStore {
pub async fn new(rpc_client: &RpcClient) -> anyhow::Result<Self> {
let blocks = Arc::new(DashMap::new());
// fetch in order of least recency so the blockstore is as up to date as it can be on boot
@@ -50,14 +49,14 @@ impl BlockStore {
let (processed_blockhash, processed_block) =
Self::fetch_latest_processed(rpc_client).await?;
-        blocks.insert(processed_blockhash.clone(),processed_block);
+        blocks.insert(processed_blockhash.clone(), processed_block);
blocks.insert(confirmed_blockhash.clone(), confirmed_block);
blocks.insert(finalized_blockhash.clone(), finalized_block);
Ok(Self {
latest_processed_block: Arc::new(RwLock::new((
processed_blockhash.clone(),
-                processed_block
+                processed_block,
))),
latest_confirmed_block: Arc::new(RwLock::new((
confirmed_blockhash.clone(),
@@ -75,11 +74,12 @@ impl BlockStore {
pub async fn fetch_latest_processed(
rpc_client: &RpcClient,
) -> anyhow::Result<(String, BlockInformation)> {
-        let response = rpc_client.send::<Response<RpcBlockhash>>(
-            RpcRequest::GetLatestBlockhash,
-            json!([CommitmentConfig::processed()]),
-        )
-        .await?;
+        let response = rpc_client
+            .send::<Response<RpcBlockhash>>(
+                RpcRequest::GetLatestBlockhash,
+                json!([CommitmentConfig::processed()]),
+            )
+            .await?;
let processed_blockhash = response.value.blockhash;
let processed_block = BlockInformation {
@@ -195,7 +195,6 @@ impl BlockStore {
// save slot copy to avoid borrow issues
let slot = block_info.slot;
        // Write to the block store first in order to prevent
        // any race condition, i.e. prevent someone from asking
        // the map for what it doesn't have yet
@@ -210,7 +209,9 @@ impl BlockStore {
}
pub async fn clean(&self, cleanup_duration: Duration) {
-        let latest_processed = self.get_latest_blockhash(CommitmentConfig::processed()).await;
+        let latest_processed = self
+            .get_latest_blockhash(CommitmentConfig::processed())
+            .await;
let latest_confirmed = self
.get_latest_blockhash(CommitmentConfig::confirmed())
.await;
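
For context, the write-ordering comment in the `@@ -195,7 +195,6 @@` hunk above describes inserting into the shared map before publishing the new latest blockhash. A minimal sketch of that ordering, with an illustrative `u64` standing in for `BlockInformation`:

```rust
use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::RwLock;

// Insert into the shared map *before* advertising the new latest blockhash,
// so any reader that observes the new hash can always resolve it in the map.
async fn add_block(
    blocks: &DashMap<String, u64>,
    latest: &Arc<RwLock<(String, u64)>>,
    blockhash: String,
    block_info: u64, // stand-in for BlockInformation
) {
    blocks.insert(blockhash.clone(), block_info); // 1. make it resolvable
    *latest.write().await = (blockhash, block_info); // 2. then publish it
}
```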

View File

@@ -162,7 +162,6 @@ impl BlockListener {
CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized,
_ => TransactionConfirmationStatus::Confirmed,
};
let timer = if commitment_config.is_finalized() {
TT_RECV_FIN_BLOCK.start_timer()
@@ -334,7 +333,7 @@ impl BlockListener {
slot: slot as i64,
leader_id: 0, // TODO: lookup leader
parent_slot: parent_slot as i64,
-                    cluster_time: Utc.timestamp_millis_opt(block_time*1000).unwrap(),
+                    cluster_time: Utc.timestamp_millis_opt(block_time * 1000).unwrap(),
local_time: block_info.map(|b| b.processed_local_time).flatten(),
}))
.expect("Error sending block to postgres service");
@@ -496,8 +495,15 @@ impl BlockListener {
            info!("processed block listener started");
loop {
-                let (processed_blockhash, processed_block) = BlockStore::fetch_latest_processed(rpc_client.as_ref()).await?;
-                block_store.add_block(processed_blockhash, processed_block, CommitmentConfig::processed()).await;
+                let (processed_blockhash, processed_block) =
+                    BlockStore::fetch_latest_processed(rpc_client.as_ref()).await?;
+                block_store
+                    .add_block(
+                        processed_blockhash,
+                        processed_block,
+                        CommitmentConfig::processed(),
+                    )
+                    .await;
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
})
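
The added loop fetches the latest processed block and then sleeps 50 ms, so the effective period is 50 ms plus the fetch latency. A hypothetical variant (not part of this PR) using `tokio::time::interval` would keep a fixed cadence instead:

```rust
use std::time::Duration;

// Hypothetical variant: tick at a fixed 50 ms cadence regardless of how long
// each fetch takes, instead of sleeping 50 ms *after* each fetch completes.
async fn poll_processed_blocks() {
    let mut tick = tokio::time::interval(Duration::from_millis(50));
    loop {
        tick.tick().await;
        // BlockStore::fetch_latest_processed + block_store.add_block go here
    }
}
```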

View File

@@ -25,7 +25,7 @@ use tokio::{
use super::rotating_queue::RotatingQueue;
pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
-const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(10);
+const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(5);
const CONNECTION_RETRY_COUNT: usize = 10;
lazy_static::lazy_static! {
@@ -56,8 +56,8 @@ impl ActiveConnection {
async fn make_connection(endpoint: Endpoint, addr: SocketAddr) -> anyhow::Result<Connection> {
let connecting = endpoint.connect(addr, "connect")?;
-        let res = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, connecting).await?;
-        Ok(res.unwrap())
+        let res = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, connecting).await??;
+        Ok(res)
}
async fn make_connection_0rtt(
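
The change above replaces a panic path with error propagation: `tokio::time::timeout` returns the inner future's output wrapped in a `Result` (the error being the elapsed deadline), and here that output is itself a `Result`, so the old `.unwrap()` could crash the task on any failed handshake. A minimal self-contained sketch of the double-`?` idiom, using a stand-in future rather than the QUIC crate's types:

```rust
use std::time::Duration;
use tokio::time::timeout;

// timeout(...) yields Result<F::Output, Elapsed>; when F::Output is itself a
// Result, two `?`s are needed: the first handles the deadline, the second the
// underlying failure.
async fn connect_with_deadline() -> anyhow::Result<&'static str> {
    let connecting = async { Ok::<_, std::io::Error>("connection") }; // stand-in for endpoint.connect(...)
    let conn = timeout(Duration::from_secs(5), connecting).await??;
    Ok(conn)
}
```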

View File

@@ -172,14 +172,15 @@ impl TpuService {
}
};
let fanout = self.fanout_slots;
+        let last_slot = estimated_slot + fanout;
let next_leaders = {
let leader_schedule = self.leader_schedule.read().await;
let mut next_leaders = vec![];
for leader in leader_schedule.iter() {
-                if leader.leader_slot >= load_slot && leader.leader_slot <= load_slot + fanout {
+                if leader.leader_slot >= load_slot && leader.leader_slot <= last_slot {
next_leaders.push(leader.contact_info.clone());
-                } else if leader.leader_slot > load_slot + fanout {
+                } else if leader.leader_slot > last_slot {
break;
}
}
@@ -200,6 +201,76 @@ impl TpuService {
.await;
}
+    async fn update_current_slot(&self, update_notifier: tokio::sync::mpsc::UnboundedSender<u64>) {
+        loop {
+            let slot = self
+                .rpc_client
+                .get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig {
+                    commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
+                })
+                .await;
+            match slot {
+                Ok(slot) => {
+                    self.current_slot.store(slot, Ordering::Relaxed);
+                }
+                Err(e) => {
+                    // error getting slot
+                    error!("error getting slot {}", e);
+                    tokio::time::sleep(Duration::from_millis(10)).await;
+                    continue;
+                }
+            }
+            let res = tokio::time::timeout(
+                Duration::from_millis(2000),
+                self.pubsub_client.slot_subscribe(),
+            )
+            .await;
+            match res {
+                Ok(sub_res) => {
+                    match sub_res {
+                        Ok((mut client, unsub)) => {
+                            loop {
+                                let next = tokio::time::timeout(
+                                    Duration::from_millis(2000),
+                                    client.next(),
+                                )
+                                .await;
+                                match next {
+                                    Ok(slot_info) => {
+                                        if let Some(slot_info) = slot_info {
+                                            if slot_info.slot
+                                                > self.current_slot.load(Ordering::Relaxed)
+                                            {
+                                                self.current_slot
+                                                    .store(slot_info.slot, Ordering::Relaxed);
+                                                CURRENT_SLOT.set(slot_info.slot as i64);
+                                                let _ = update_notifier.send(slot_info.slot);
+                                            }
+                                        }
+                                    }
+                                    Err(_) => {
+                                        // timed out; reconnect to pubsub
+                                        warn!("slot pub sub disconnected, reconnecting");
+                                        break;
+                                    }
+                                }
+                            }
+                            unsub();
+                        }
+                        Err(e) => {
+                            warn!("slot pub sub disconnected ({}), reconnecting", e);
+                        }
+                    }
+                }
+                Err(_) => {
+                    // timed out
+                    warn!("timed out subscribing to slots");
+                }
+            }
+        }
+    }
pub async fn start(&self) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
self.update_cluster_nodes().await?;
self.update_leader_schedule().await?;
@@ -227,41 +298,12 @@ impl TpuService {
}
});
-        let pubsub_client = self.pubsub_client.clone();
-        let current_slot = self.current_slot.clone();
+        let this = self.clone();
let (slot_sender, slot_reciever) = tokio::sync::mpsc::unbounded_channel::<Slot>();
let slot_sub_task = tokio::spawn(async move {
-            let pubsub_client = pubsub_client.clone();
-            let current_slot = current_slot.clone();
-            loop {
-                let res = pubsub_client.slot_subscribe().await;
-                if let Ok((mut client, unsub)) = res {
-                    loop {
-                        let next =
-                            tokio::time::timeout(Duration::from_millis(2000), client.next()).await;
-                        match next {
-                            Ok(slot_info) => {
-                                if let Some(slot_info) = slot_info {
-                                    if slot_info.slot > current_slot.load(Ordering::Relaxed) {
-                                        current_slot.store(slot_info.slot, Ordering::Relaxed);
-                                        CURRENT_SLOT.set(slot_info.slot as i64);
-                                        let _ = slot_sender.send(slot_info.slot);
-                                    }
-                                }
-                            }
-                            Err(_) => {
-                                // timedout reconnect to pubsub
-                                warn!("slot pub sub disconnected reconnecting");
-                                break;
-                            }
-                        }
-                    }
-                    unsub();
-                } else if let Err(e) = res {
-                    error!("could not subsribe to the slot {}", e);
-                }
-            }
+            this.update_current_slot(slot_sender).await;
Ok(())
});
let estimated_slot = self.estimated_slot.clone();
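
For completeness, the `slot_reciever` half of the channel created above (its consumer is not shown in this hunk) carries the slot notifications emitted by `update_current_slot`. A hypothetical consumer of those notifications, with `u64` standing in for `Slot`:

```rust
use tokio::sync::mpsc::UnboundedReceiver;

// Hypothetical consumer sketch: drain the slot notifications emitted by
// update_current_slot and react to each newly observed slot.
async fn consume_slots(mut slot_reciever: UnboundedReceiver<u64>) {
    while let Some(slot) = slot_reciever.recv().await {
        // e.g. bump the estimated slot and refresh upcoming leader targets
        println!("new processed slot: {slot}");
    }
}
```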