diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index b945e90..2098dce 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -22,7 +22,7 @@ pub fn create_geyser_reconnecting_stream( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, ) -> impl Stream { - let mut state = ConnectionState::NotConnected(0); + let mut state = ConnectionState::NotConnected(1); // in case of cancellation, we restart from here: // thus we want to keep the progression in a state object outside the stream! makro @@ -42,7 +42,7 @@ pub fn create_geyser_reconnecting_stream( let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout); let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout); let subscribe_filter = subscribe_filter.clone(); - log!(if attempt >= 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt + 1, addr); + log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr); async move { let connect_result = GeyserGrpcClient::connect_with_timeout( @@ -66,7 +66,7 @@ pub fn create_geyser_reconnecting_stream( } }); - (ConnectionState::Connecting(attempt, connection_task), Message::Connecting(attempt + 1)) + (ConnectionState::Connecting(attempt + 1, connection_task), Message::Connecting(attempt + 1)) } ConnectionState::Connecting(attempt, connection_task) => { @@ -77,7 +77,7 @@ pub fn create_geyser_reconnecting_stream( Ok(Err(geyser_error)) => { // ATM we consider all errors recoverable warn!("subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error); - (ConnectionState::WaitReconnect(attempt + 1), Message::Connecting(attempt)) + (ConnectionState::WaitReconnect(attempt + 1), Message::Connecting(attempt + 1)) }, Err(geyser_grpc_task_error) => { panic!("task aborted - should not happen :{geyser_grpc_task_error}"); @@ -96,17 +96,17 @@ pub fn create_geyser_reconnecting_stream( Ok(Some(Err(tonic_status))) => { // ATM we consider all errors recoverable warn!("error on {} - retrying: {:?}", grpc_source, tonic_status); - (ConnectionState::WaitReconnect(0), Message::Connecting(0)) + (ConnectionState::WaitReconnect(1), Message::Connecting(1)) } Ok(None) => { // should not arrive here, Mean the stream close. warn!("geyser stream closed on {} - retrying", grpc_source); - (ConnectionState::WaitReconnect(0), Message::Connecting(0)) + (ConnectionState::WaitReconnect(1), Message::Connecting(1)) } Err(_elapsed) => { // timeout warn!("geyser stream timeout on {} - retrying", grpc_source); - (ConnectionState::WaitReconnect(0), Message::Connecting(0)) + (ConnectionState::WaitReconnect(1), Message::Connecting(1)) } } diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index b196a29..6aa725f 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,4 +1,4 @@ -use crate::{GrpcSourceConfig, Message}; +use crate::{Attempt, GrpcSourceConfig, Message}; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; use std::time::Duration; @@ -11,9 +11,6 @@ use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; use yellowstone_grpc_proto::tonic::service::Interceptor; use yellowstone_grpc_proto::tonic::Status; -// 1-based counter -type Attempt = u32; - enum ConnectionState>, F: Interceptor> { NotConnected(Attempt), // connected but not subscribed diff --git a/src/lib.rs b/src/lib.rs index 0cf09c1..a5c1690 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ pub mod grpc_subscription_autoreconnect_tasks; pub mod grpcmultiplex_fastestwins; mod obfuscate; +// 1-based attempt counter type Attempt = u32; // wraps payload and status messages