From d7969a808702cb6be03d7eec7cba3cd6bc0b87e8 Mon Sep 17 00:00:00 2001 From: galactus <96341601+godmodegalactus@users.noreply.github.com> Date: Sat, 9 Dec 2023 21:50:02 +0100 Subject: [PATCH] CHecking stream for timeouts. (#18) --- src/main.rs | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/src/main.rs b/src/main.rs index 7844673..c8850fa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ use std::{ sync::{atomic::AtomicU64, Arc}, time::Duration, }; +use yellowstone_grpc_proto_original::geyser::SubscribeRequestPing; use crate::prometheus_sync::PrometheusSync; use block_info::BlockInfo; @@ -135,7 +136,6 @@ pub async fn start_tracking_banking_stage_errors( } }, _=>{ - break; } } } @@ -189,11 +189,26 @@ async fn start_tracking_blocks( Default::default(), None, Default::default(), - None, + Some(SubscribeRequestPing { id: 0 }), ) .await .unwrap(); - while let Some(message) = geyser_stream.next().await { + loop { + let res = tokio::time::timeout(Duration::from_secs(10), geyser_stream.next()).await; + let message_res = if let Ok(message) = res { + message + } else { + // restarting geyser block subscription because of timeout + error!("Restarting geyser block subscription because of timeout"); + break; + }; + let message = if let Some(message) = message_res { + message + } else { + error!("Restarting geyser block subscription because it is broken"); + break; + }; + let Ok(message) = message else { continue; }; @@ -246,9 +261,7 @@ async fn start_tracking_blocks( }); // delay queue so that we get all the banking stage errors before processing block } - _ => { - break; - } + _ => {} }; } log::error!("stopping the sidecar, geyser block stream is broken");