fixing the issue where the slot subscription lags or stops working

parent e28a1976b3
commit ed134a2679
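
In short: the slot updater moves into a dedicated update_current_slot method that seeds current_slot from an RPC get_slot call, then guards both the slot_subscribe call and every client.next() with a 2-second timeout, breaking out to resubscribe whenever the pubsub stream goes silent. The commit also halves the QUIC connect timeout from 10 s to 5 s, propagates connection errors in make_connection instead of unwrapping, widens the leader fanout window to estimated_slot + fanout, and adds a 50 ms poller that keeps the processed block store fresh. The remaining hunks are rustfmt layout changes.
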
@@ -10,7 +10,7 @@ use prometheus::core::GenericGauge;
 use prometheus::{opts, register_int_gauge};
 use serde_json::json;
 use solana_client::rpc_request::RpcRequest;
-use solana_client::rpc_response::{RpcBlockhash, Response};
+use solana_client::rpc_response::{Response, RpcBlockhash};
 use solana_client::{nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig};
 use solana_sdk::commitment_config::CommitmentConfig;
 use solana_transaction_status::TransactionDetails;
@@ -39,7 +39,6 @@ pub struct BlockStore {
 
 impl BlockStore {
     pub async fn new(rpc_client: &RpcClient) -> anyhow::Result<Self> {
-
         let blocks = Arc::new(DashMap::new());
 
         // fetch in order of least recency so the blockstore is as up to date as it can be on boot
@@ -50,14 +49,14 @@ impl BlockStore {
         let (processed_blockhash, processed_block) =
             Self::fetch_latest_processed(rpc_client).await?;
 
-        blocks.insert(processed_blockhash.clone(),processed_block);
+        blocks.insert(processed_blockhash.clone(), processed_block);
         blocks.insert(confirmed_blockhash.clone(), confirmed_block);
         blocks.insert(finalized_blockhash.clone(), finalized_block);
 
         Ok(Self {
             latest_processed_block: Arc::new(RwLock::new((
                 processed_blockhash.clone(),
-                processed_block
+                processed_block,
             ))),
             latest_confirmed_block: Arc::new(RwLock::new((
                 confirmed_blockhash.clone(),
@@ -75,11 +74,12 @@ impl BlockStore {
     pub async fn fetch_latest_processed(
         rpc_client: &RpcClient,
     ) -> anyhow::Result<(String, BlockInformation)> {
-        let response = rpc_client.send::<Response<RpcBlockhash>>(
-            RpcRequest::GetLatestBlockhash,
-            json!([CommitmentConfig::processed()]),
-        )
-        .await?;
+        let response = rpc_client
+            .send::<Response<RpcBlockhash>>(
+                RpcRequest::GetLatestBlockhash,
+                json!([CommitmentConfig::processed()]),
+            )
+            .await?;
 
         let processed_blockhash = response.value.blockhash;
         let processed_block = BlockInformation {
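
Beyond the rustfmt reflow, note why fetch_latest_processed goes through the raw RpcRequest::GetLatestBlockhash rather than a typed helper: presumably because the Response<RpcBlockhash> wrapper also carries the response context, so the blockhash, the slot it was answered at, and last_valid_block_height all come from one consistent snapshot. A minimal sketch of consuming that response, reusing the imports from the first hunk; the field names are as defined in solana-client:

    let response = rpc_client
        .send::<Response<RpcBlockhash>>(
            RpcRequest::GetLatestBlockhash,
            json!([CommitmentConfig::processed()]),
        )
        .await?;
    let slot = response.context.slot; // slot the node answered at
    let blockhash = response.value.blockhash; // String
    let last_valid_block_height = response.value.last_valid_block_height;
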
@@ -195,7 +195,6 @@ impl BlockStore {
         // save slot copy to avoid borrow issues
         let slot = block_info.slot;
 
-
         // Write to block store first in order to prevent
         // any race condition i.e prevent some one to
         // ask the map what it doesn't have rn
@@ -210,7 +209,9 @@ impl BlockStore {
     }
 
     pub async fn clean(&self, cleanup_duration: Duration) {
-        let latest_processed = self.get_latest_blockhash(CommitmentConfig::processed()).await;
+        let latest_processed = self
+            .get_latest_blockhash(CommitmentConfig::processed())
+            .await;
         let latest_confirmed = self
             .get_latest_blockhash(CommitmentConfig::confirmed())
             .await;

@@ -162,7 +162,6 @@ impl BlockListener {
             CommitmentLevel::Finalized => TransactionConfirmationStatus::Finalized,
             _ => TransactionConfirmationStatus::Confirmed,
         };
 
-
         let timer = if commitment_config.is_finalized() {
             TT_RECV_FIN_BLOCK.start_timer()
@@ -334,7 +333,7 @@ impl BlockListener {
                     slot: slot as i64,
                     leader_id: 0, // TODO: lookup leader
                     parent_slot: parent_slot as i64,
-                    cluster_time: Utc.timestamp_millis_opt(block_time*1000).unwrap(),
+                    cluster_time: Utc.timestamp_millis_opt(block_time * 1000).unwrap(),
                     local_time: block_info.map(|b| b.processed_local_time).flatten(),
                 }))
                 .expect("Error sending block to postgres service");
@@ -496,8 +495,15 @@ impl BlockListener {
             info!("processed block listner started");
 
             loop {
-                let (processed_blockhash, processed_block) = BlockStore::fetch_latest_processed(rpc_client.as_ref()).await?;
-                block_store.add_block(processed_blockhash, processed_block, CommitmentConfig::processed()).await;
+                let (processed_blockhash, processed_block) =
+                    BlockStore::fetch_latest_processed(rpc_client.as_ref()).await?;
+                block_store
+                    .add_block(
+                        processed_blockhash,
+                        processed_block,
+                        CommitmentConfig::processed(),
+                    )
+                    .await;
                 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
             }
         })
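
The new 50 ms loop polls BlockStore::fetch_latest_processed so the processed entry of the block store keeps advancing even if the websocket block feed lags. A possible refinement, not part of this commit: tokio::time::interval holds a fixed cadence even when the fetch itself is slow, whereas sleep waits 50 ms after each fetch completes:

    // sketch only: the same loop on a fixed schedule via tokio::time::interval
    let mut tick = tokio::time::interval(tokio::time::Duration::from_millis(50));
    loop {
        tick.tick().await; // first tick fires immediately
        let (processed_blockhash, processed_block) =
            BlockStore::fetch_latest_processed(rpc_client.as_ref()).await?;
        block_store
            .add_block(
                processed_blockhash,
                processed_block,
                CommitmentConfig::processed(),
            )
            .await;
    }
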
@@ -25,7 +25,7 @@ use tokio::{
 use super::rotating_queue::RotatingQueue;
 
 pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
-const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(10);
+const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(5);
 const CONNECTION_RETRY_COUNT: usize = 10;
 
 lazy_static::lazy_static! {
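
Halving QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC from 10 s to 5 s makes a dead TPU address fail fast; with CONNECTION_RETRY_COUNT unchanged at 10, the worst-case wall time spent retrying one unreachable peer presumably drops by half as well.
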
@@ -56,8 +56,8 @@ impl ActiveConnection {
 
     async fn make_connection(endpoint: Endpoint, addr: SocketAddr) -> anyhow::Result<Connection> {
         let connecting = endpoint.connect(addr, "connect")?;
-        let res = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, connecting).await?;
-        Ok(res.unwrap())
+        let res = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, connecting).await??;
+        Ok(res)
     }
 
     async fn make_connection_0rtt(
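
The make_connection change removes a panic path. tokio::time::timeout wraps the future's output in another Result, so the old code held a nested Result (timeout error outside, connection error inside) and unwrapped the inner value: a handshake that failed within the deadline would panic. The double ? propagates both layers instead. A self-contained sketch of the pattern, assuming only tokio and anyhow:

    use tokio::time::{timeout, Duration};

    async fn fallible() -> Result<u32, std::io::Error> {
        Ok(42)
    }

    async fn demo() -> anyhow::Result<u32> {
        // timeout(...) yields Result<Result<u32, io::Error>, Elapsed>:
        // the first `?` surfaces the timeout, the second the inner error,
        // so neither case can panic the way `res.unwrap()` could.
        let value = timeout(Duration::from_secs(5), fallible()).await??;
        Ok(value)
    }
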
@@ -172,14 +172,15 @@ impl TpuService {
             }
         };
         let fanout = self.fanout_slots;
+        let last_slot = estimated_slot + fanout;
 
         let next_leaders = {
             let leader_schedule = self.leader_schedule.read().await;
             let mut next_leaders = vec![];
             for leader in leader_schedule.iter() {
-                if leader.leader_slot >= load_slot && leader.leader_slot <= load_slot + fanout {
+                if leader.leader_slot >= load_slot && leader.leader_slot <= last_slot {
                     next_leaders.push(leader.contact_info.clone());
-                } else if leader.leader_slot > load_slot + fanout {
+                } else if leader.leader_slot > last_slot {
                     break;
                 }
             }
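
Hoisting the bound into last_slot is more than a rename: the upper edge of the window moves from load_slot + fanout to estimated_slot + fanout. With hypothetical numbers, if load_slot = 100, estimated_slot = 104, and fanout_slots = 16, the old loop stopped collecting leaders at slot 116 while the new one reaches slot 120, so leaders for slots 117 through 120 are no longer skipped when the estimated slot runs ahead of the loaded one.
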
@@ -200,6 +201,76 @@ impl TpuService {
             .await;
     }
 
+    async fn update_current_slot(&self, update_notifier: tokio::sync::mpsc::UnboundedSender<u64>) {
+        loop {
+            let slot = self
+                .rpc_client
+                .get_slot_with_commitment(solana_sdk::commitment_config::CommitmentConfig {
+                    commitment: solana_sdk::commitment_config::CommitmentLevel::Processed,
+                })
+                .await;
+            match slot {
+                Ok(slot) => {
+                    self.current_slot.store(slot, Ordering::Relaxed);
+                }
+                Err(e) => {
+                    // error getting slot
+                    error!("error getting slot {}", e);
+                    tokio::time::sleep(Duration::from_millis(10)).await;
+                    continue;
+                }
+            }
+
+            let res = tokio::time::timeout(
+                Duration::from_millis(2000),
+                self.pubsub_client.slot_subscribe(),
+            )
+            .await;
+            match res {
+                Ok(sub_res) => {
+                    match sub_res {
+                        Ok((mut client, unsub)) => {
+                            loop {
+                                let next = tokio::time::timeout(
+                                    Duration::from_millis(2000),
+                                    client.next(),
+                                )
+                                .await;
+                                match next {
+                                    Ok(slot_info) => {
+                                        if let Some(slot_info) = slot_info {
+                                            if slot_info.slot
+                                                > self.current_slot.load(Ordering::Relaxed)
+                                            {
+                                                self.current_slot
+                                                    .store(slot_info.slot, Ordering::Relaxed);
+                                                CURRENT_SLOT.set(slot_info.slot as i64);
+                                                let _ = update_notifier.send(slot_info.slot);
+                                            }
+                                        }
+                                    }
+                                    Err(_) => {
+                                        // timedout reconnect to pubsub
+                                        warn!("slot pub sub disconnected reconnecting");
+                                        break;
+                                    }
+                                }
+                            }
+                            unsub();
+                        }
+                        Err(e) => {
+                            warn!("slot pub sub disconnected ({}) reconnecting", e);
+                        }
+                    }
+                }
+                Err(_) => {
+                    // timed out
+                    warn!("timedout subscribing to slots");
+                }
+            }
+        }
+    }
+
     pub async fn start(&self) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
         self.update_cluster_nodes().await?;
         self.update_leader_schedule().await?;
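
update_current_slot layers three defenses against a lagging subscription: an RPC get_slot call seeds current_slot before every (re)subscribe, the slot_subscribe call itself is capped at 2 s, and each client.next() is capped at 2 s, breaking back out to resubscribe on silence. A condensed, self-contained sketch of that watchdog pattern over a generic stream (the names here are illustrative, not from the codebase):

    use futures::{stream::BoxStream, StreamExt};
    use tokio::time::{timeout, Duration};

    async fn consume_with_watchdog(mut slots: BoxStream<'_, u64>) {
        loop {
            match timeout(Duration::from_millis(2000), slots.next()).await {
                // a fresh slot arrived in time
                Ok(Some(slot)) => println!("slot update: {slot}"),
                // stream closed: return so the caller can resubscribe
                Ok(None) => break,
                // 2 s of silence: treat the subscription as lagging
                Err(_) => break,
            }
        }
    }
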
@@ -227,41 +298,12 @@ impl TpuService {
             }
         });
 
-        let pubsub_client = self.pubsub_client.clone();
-        let current_slot = self.current_slot.clone();
+        let this = self.clone();
         let (slot_sender, slot_reciever) = tokio::sync::mpsc::unbounded_channel::<Slot>();
 
         let slot_sub_task = tokio::spawn(async move {
-            let pubsub_client = pubsub_client.clone();
-            let current_slot = current_slot.clone();
-            loop {
-                let res = pubsub_client.slot_subscribe().await;
-                if let Ok((mut client, unsub)) = res {
-                    loop {
-                        let next =
-                            tokio::time::timeout(Duration::from_millis(2000), client.next()).await;
-                        match next {
-                            Ok(slot_info) => {
-                                if let Some(slot_info) = slot_info {
-                                    if slot_info.slot > current_slot.load(Ordering::Relaxed) {
-                                        current_slot.store(slot_info.slot, Ordering::Relaxed);
-                                        CURRENT_SLOT.set(slot_info.slot as i64);
-                                        let _ = slot_sender.send(slot_info.slot);
-                                    }
-                                }
-                            }
-                            Err(_) => {
-                                // timedout reconnect to pubsub
-                                warn!("slot pub sub disconnected reconnecting");
-                                break;
-                            }
-                        }
-                    }
-                    unsub();
-                } else if let Err(e) = res {
-                    error!("could not subsribe to the slot {}", e);
-                }
-            }
+            this.update_current_slot(slot_sender).await;
             Ok(())
         });
 
         let estimated_slot = self.estimated_slot.clone();
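
With the loop moved into update_current_slot, the spawned task body shrinks to a single delegating call: the duplicated clone-and-subscribe code that used to live inline is gone, and every reconnect now passes through the RPC-seeded watchdog path shown earlier.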