This commit is contained in:
aniketfuryrocks 2023-03-22 18:29:30 +05:30
parent 8e5cf8a567
commit 46c61bc48e
No known key found for this signature in database
GPG Key ID: FA6BFCFAA7D4B764
1 changed files with 31 additions and 27 deletions

View File

@ -66,7 +66,10 @@ lazy_static::lazy_static! {
register_int_counter!(opts!("literpc_txs_finalized", "Number of Transactions Finalized")).unwrap();
static ref ERRORS_WHILE_FETCHING_SLOTS: IntCounter =
register_int_counter!(opts!("literpc_error_while_fetching_slots", "Number of errors while fetching slots")).unwrap();
static ref BLOCKS_IN_QUEUE: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_blocks_in_queue", "Number of blocks waiting to deque")).unwrap();
static ref BLOCKS_IN_CONFIRMED_QUEUE: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_blocks_in_confirmed_queue", "Number of confirmed blocks waiting to deque")).unwrap();
static ref BLOCKS_IN_FINALIZED_QUEUE: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_blocks_in_finalized_queue", "Number of finalized blocks waiting to deque")).unwrap();
static ref BLOCKS_IN_RETRY_QUEUE: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_blocks_in_retry_queue", "Number of blocks waiting in retry")).unwrap();
static ref NUMBER_OF_SIGNATURE_SUBSCRIBERS: GenericGauge<prometheus::core::AtomicI64> = register_int_gauge!(opts!("literpc_number_of_signature_sub", "Number of signature subscriber")).unwrap();
}
@ -194,8 +197,8 @@ impl BlockListener {
};
let Some(transactions) = block.transactions else {
Self::increment_invalid_block_metric(commitment_config);
return Ok(());
Self::increment_invalid_block_metric(commitment_config);
return Ok(());
};
let blockhash = block.blockhash;
@ -379,40 +382,35 @@ impl BlockListener {
// a task that will queue back the slots to be retried after a certain delay
let recent_slot = Arc::new(AtomicU64::new(0));
{
let slots_task_queue = slots_task_queue.clone();
let recent_slot = recent_slot.clone();
tokio::spawn(async move {
loop {
match slot_retry_queue_rx.recv().await {
Some((slot, error_count, instant)) => {
BLOCKS_IN_RETRY_QUEUE.dec();
let recent_slot =
recent_slot.load(std::sync::atomic::Ordering::Relaxed);
// if slot is too old ignore
if recent_slot.saturating_sub(slot) > 256 {
// slot too old to retry
// most probably its an empty slot
continue;
}
let now = tokio::time::Instant::now();
if now < instant {
tokio::time::sleep_until(instant).await;
}
let mut queue = slots_task_queue.lock().await;
queue.push_back((slot, error_count + 1));
}
None => {
break;
}
while let Some((slot, error_count, instant)) = slot_retry_queue_rx.recv().await {
BLOCKS_IN_RETRY_QUEUE.dec();
let recent_slot = recent_slot.load(std::sync::atomic::Ordering::Relaxed);
// if slot is too old ignore
if recent_slot.saturating_sub(slot) > 256 {
// slot too old to retry
// most probably its an empty slot
continue;
}
let now = tokio::time::Instant::now();
if now < instant {
tokio::time::sleep_until(instant).await;
}
let mut queue = slots_task_queue.lock().await;
queue.push_back((slot, error_count + 1));
}
});
}
let rpc_client = self.rpc_client.clone();
tokio::spawn(async move {
info!("{commitment_config:?} block listner started");
let slots_task_queue = slots_task_queue.clone();
let last_latest_slot = self
.block_store
@ -450,11 +448,17 @@ impl BlockListener {
for slot in new_block_slots {
lock.push_back((slot, 0));
}
BLOCKS_IN_QUEUE.set(lock.len() as i64);
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.set(lock.len() as i64);
} else {
BLOCKS_IN_CONFIRMED_QUEUE.set(lock.len() as i64);
}
}
last_latest_slot = new_slot;
recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed);
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
}
})