diff --git a/services/src/data_caching_service.rs b/services/src/data_caching_service.rs index 05315ea3..916fe85d 100644 --- a/services/src/data_caching_service.rs +++ b/services/src/data_caching_service.rs @@ -1,6 +1,7 @@ use std::time::Duration; use anyhow::{bail, Context}; +use log::warn; use prometheus::core::GenericGauge; use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter}; use solana_lite_rpc_core::stores::{ @@ -11,6 +12,7 @@ use solana_lite_rpc_core::AnyhowJoinHandle; use solana_sdk::clock::MAX_RECENT_BLOCKHASHES; use solana_sdk::commitment_config::CommitmentLevel; use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus}; +use tokio::sync::broadcast::error::RecvError; lazy_static::lazy_static! { static ref NB_CLUSTER_NODES: GenericGauge = @@ -51,7 +53,16 @@ impl DataCachingService { let block_cache_jh = tokio::spawn(async move { let mut block_notifier = block_notifier; loop { - let block = block_notifier.recv().await.expect("Should recv blocks"); + let block = match block_notifier.recv().await { + Ok(block) => block, + Err(RecvError::Lagged(blocks_lagged)) => { + warn!("Lagged {} blocks - continue", blocks_lagged); + continue; + } + Err(RecvError::Closed) => { + bail!("Block stream has been closed - abort"); + } + }; data_cache .block_information_store