From 46c61bc48ef668318dbc7d53155bddbc9027cbf1 Mon Sep 17 00:00:00 2001 From: aniketfuryrocks Date: Wed, 22 Mar 2023 18:29:30 +0530 Subject: [PATCH] metrics --- src/workers/block_listenser.rs | 58 ++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index 8b4e9f2f..bcdf7ce6 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -66,7 +66,10 @@ lazy_static::lazy_static! { register_int_counter!(opts!("literpc_txs_finalized", "Number of Transactions Finalized")).unwrap(); static ref ERRORS_WHILE_FETCHING_SLOTS: IntCounter = register_int_counter!(opts!("literpc_error_while_fetching_slots", "Number of errors while fetching slots")).unwrap(); - static ref BLOCKS_IN_QUEUE: GenericGauge = register_int_gauge!(opts!("literpc_blocks_in_queue", "Number of blocks waiting to deque")).unwrap(); + + static ref BLOCKS_IN_CONFIRMED_QUEUE: GenericGauge = register_int_gauge!(opts!("literpc_blocks_in_confirmed_queue", "Number of confirmed blocks waiting to deque")).unwrap(); + static ref BLOCKS_IN_FINALIZED_QUEUE: GenericGauge = register_int_gauge!(opts!("literpc_blocks_in_finalized_queue", "Number of finalized blocks waiting to deque")).unwrap(); + static ref BLOCKS_IN_RETRY_QUEUE: GenericGauge = register_int_gauge!(opts!("literpc_blocks_in_retry_queue", "Number of blocks waiting in retry")).unwrap(); static ref NUMBER_OF_SIGNATURE_SUBSCRIBERS: GenericGauge = register_int_gauge!(opts!("literpc_number_of_signature_sub", "Number of signature subscriber")).unwrap(); } @@ -194,8 +197,8 @@ impl BlockListener { }; let Some(transactions) = block.transactions else { - Self::increment_invalid_block_metric(commitment_config); - return Ok(()); + Self::increment_invalid_block_metric(commitment_config); + return Ok(()); }; let blockhash = block.blockhash; @@ -379,40 +382,35 @@ impl BlockListener { // a task that will queue back the slots to be retried after a certain delay let recent_slot = Arc::new(AtomicU64::new(0)); + { let slots_task_queue = slots_task_queue.clone(); let recent_slot = recent_slot.clone(); tokio::spawn(async move { - loop { - match slot_retry_queue_rx.recv().await { - Some((slot, error_count, instant)) => { - BLOCKS_IN_RETRY_QUEUE.dec(); - let recent_slot = - recent_slot.load(std::sync::atomic::Ordering::Relaxed); - // if slot is too old ignore - if recent_slot.saturating_sub(slot) > 256 { - // slot too old to retry - // most probably its an empty slot - continue; - } - - let now = tokio::time::Instant::now(); - if now < instant { - tokio::time::sleep_until(instant).await; - } - let mut queue = slots_task_queue.lock().await; - queue.push_back((slot, error_count + 1)); - } - None => { - break; - } + while let Some((slot, error_count, instant)) = slot_retry_queue_rx.recv().await { + BLOCKS_IN_RETRY_QUEUE.dec(); + let recent_slot = recent_slot.load(std::sync::atomic::Ordering::Relaxed); + // if slot is too old ignore + if recent_slot.saturating_sub(slot) > 256 { + // slot too old to retry + // most probably its an empty slot + continue; } + + let now = tokio::time::Instant::now(); + if now < instant { + tokio::time::sleep_until(instant).await; + } + let mut queue = slots_task_queue.lock().await; + queue.push_back((slot, error_count + 1)); } }); } let rpc_client = self.rpc_client.clone(); tokio::spawn(async move { + info!("{commitment_config:?} block listner started"); + let slots_task_queue = slots_task_queue.clone(); let last_latest_slot = self .block_store @@ -450,11 +448,17 @@ impl BlockListener { for slot in new_block_slots { lock.push_back((slot, 0)); } - BLOCKS_IN_QUEUE.set(lock.len() as i64); + + if commitment_config.is_finalized() { + BLOCKS_IN_FINALIZED_QUEUE.set(lock.len() as i64); + } else { + BLOCKS_IN_CONFIRMED_QUEUE.set(lock.len() as i64); + } } last_latest_slot = new_slot; recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; } })