From 21f2ef3b7cdc0ae4975b17838e57390340648dbd Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 19 Jan 2024 08:25:05 +0100 Subject: [PATCH] wip --- src/grpc_subscription_autoreconnect_tasks.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 91e3986..9b96e00 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -8,6 +8,7 @@ use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::pin::Pin; use std::time::Duration; +use anyhow::bail; use tokio::sync::broadcast::error::SendError; use tokio::sync::broadcast::Receiver; use tokio::task::JoinHandle; @@ -115,15 +116,16 @@ fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel enum TheState>, F: Interceptor> { NotConnected(Attempt), - // Connected(Attempt, Box>>), Connected(Attempt, GeyserGrpcClient), Ready(Attempt, S), // error states RecoverableConnectionError(Attempt), + // non-recoverable error FatalError(Attempt), WaitReconnect(Attempt), } +/// return handler will exit on fatal error pub fn create_geyser_reconnecting_task( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, @@ -245,7 +247,8 @@ pub fn create_geyser_reconnecting_task( } FatalError(_) => { // TOOD what to do - panic!("Fatal error") + error!("! fatal error grpc connection - aborting"); + bail!("! fatal error grpc connection - aborting"); } TheState::WaitReconnect(attempt) => { let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0); @@ -261,6 +264,8 @@ pub fn create_geyser_reconnecting_task( match geyser_stream.next().await { Some(Ok(update_message)) => { trace!("> recv update message from {}", grpc_source); + // TODO consider extract this + // backpressure - should'n we block here? match sender .send(Message::GeyserSubscribeUpdate(Box::new(update_message))) {