diff --git a/src/main.rs b/src/main.rs index 1116af4..2b798e9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,7 +43,6 @@ pub async fn start_tracking_banking_stage_errors( map_of_infos: Arc>>, slot_by_errors: Arc>, slot: Arc, - subscribe_to_slots: bool, ) { loop { let token: Option = None; @@ -54,20 +53,11 @@ pub async fn start_tracking_banking_stage_errors( ) .unwrap(); - let slot_subscription: HashMap< - String, - yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots, - > = if subscribe_to_slots { - log::info!("subscribing to slots on grpc banking errors"); - let mut slot_sub = HashMap::new(); - slot_sub.insert( - "slot_sub".to_string(), - yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots {}, - ); - slot_sub - } else { - HashMap::new() - }; + let mut slot_subscription = HashMap::new(); + slot_subscription.insert( + "slot_subscription".to_string(), + yellowstone_grpc_proto::geyser::SubscribeRequestFilterSlots {}, + ); let mut geyser_stream = client .subscribe_once( @@ -84,7 +74,7 @@ pub async fn start_tracking_banking_stage_errors( .await .unwrap(); log::info!("started geyser banking stage subscription"); - while let Some(message) = geyser_stream.next().await { + while let Ok(Some(message)) = tokio::time::timeout(Duration::from_secs(30), geyser_stream.next()).await { let Ok(message) = message else { continue; }; @@ -139,8 +129,7 @@ pub async fn start_tracking_banking_stage_errors( _=>{} } } - error!("geyser banking stage connection failed {}", grpc_address); - tokio::time::sleep(Duration::from_secs(1)).await; + error!("geyser banking stage connection failed {} restarting", grpc_address); } } @@ -247,7 +236,6 @@ async fn main() { let postgres = postgres::Postgres::new().await; let slot = Arc::new(AtomicU64::new(0)); - let no_block_subscription = grpc_block_addr.is_none(); postgres.spawn_transaction_infos_saver(map_of_infos.clone(), slot.clone()); let jhs = args .banking_grpc_addresses @@ -263,7 +251,6 @@ async fn main() { map_of_infos, slot_by_errors, slot, - no_block_subscription, ) .await; })