diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index cb1b26d..b945e90 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -66,7 +66,7 @@ pub fn create_geyser_reconnecting_stream( } }); - (ConnectionState::Connecting(attempt + 1, connection_task), Message::Connecting(attempt + 1)) + (ConnectionState::Connecting(attempt, connection_task), Message::Connecting(attempt + 1)) } ConnectionState::Connecting(attempt, connection_task) => { diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index a519935..34d3083 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -38,7 +38,8 @@ pub fn create_geyser_autoconnection_task( ) -> (AbortHandle, Receiver) { let (sender, receiver_channel) = tokio::sync::mpsc::channel::(1); - let abort_handle = create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender); + let abort_handle = + create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender); (abort_handle, receiver_channel) } @@ -61,7 +62,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc( loop { state = match state { ConnectionState::NotConnected(attempt) => { - let addr = grpc_source.grpc_addr.clone(); let token = grpc_source.grpc_x_token.clone(); let config = grpc_source.tls_config.clone(); @@ -88,14 +88,22 @@ pub fn create_geyser_autoconnection_task_with_mpsc( .await; match connect_result { - Ok(client) => ConnectionState::Connecting(attempt + 1, client), + Ok(client) => ConnectionState::Connecting(attempt, client), Err(GeyserGrpcClientError::InvalidUri(_)) => ConnectionState::FatalError( - attempt + 1, FatalErrorReason::ConfigurationError, - ), Err(GeyserGrpcClientError::MetadataValueError(_)) => { - ConnectionState::FatalError(attempt + 1, FatalErrorReason::ConfigurationError) + attempt + 1, + FatalErrorReason::ConfigurationError, + ), + Err(GeyserGrpcClientError::MetadataValueError(_)) => { + ConnectionState::FatalError( + attempt + 1, + FatalErrorReason::ConfigurationError, + ) } Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => { - ConnectionState::FatalError(attempt + 1, FatalErrorReason::ConfigurationError) + ConnectionState::FatalError( + attempt + 1, + FatalErrorReason::ConfigurationError, + ) } Err(GeyserGrpcClientError::TonicError(tonic_error)) => { warn!( @@ -135,13 +143,16 @@ pub fn create_geyser_autoconnection_task_with_mpsc( match subscribe_result_timeout { Ok(subscribe_result) => { match subscribe_result { - Ok(geyser_stream) => ConnectionState::Ready(geyser_stream), + Ok(geyser_stream) => { + debug!("subscribed to {} after {} failed attempts", grpc_source, attempt); + ConnectionState::Ready(geyser_stream) + }, Err(GeyserGrpcClientError::TonicError(_)) => { - warn!("subscribe failed on {} - retrying", grpc_source); + warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt + 1); ConnectionState::RecoverableConnectionError(attempt + 1) } Err(GeyserGrpcClientError::TonicStatus(_)) => { - warn!("subscribe failed on {} - retrying", grpc_source); + warn!("subscribe failed on {} {} attempts - retrying", grpc_source, attempt + 1); ConnectionState::RecoverableConnectionError(attempt + 1) } // non-recoverable @@ -150,7 +161,10 @@ pub fn create_geyser_autoconnection_task_with_mpsc( "subscribe to {} failed with unrecoverable error: {}", grpc_source, unrecoverable_error ); - ConnectionState::FatalError(attempt + 1, FatalErrorReason::SubscribeError) + ConnectionState::FatalError( + attempt + 1, + FatalErrorReason::SubscribeError, + ) } } } @@ -252,13 +266,19 @@ pub fn create_geyser_autoconnection_task_with_mpsc( } Err(_send_error) => { warn!("downstream receiver closed, message is lost - aborting"); - break 'recv_loop ConnectionState::FatalError(0, FatalErrorReason::DownstreamChannelClosed); + break 'recv_loop ConnectionState::FatalError( + 0, + FatalErrorReason::DownstreamChannelClosed, + ); } } } Err(SendTimeoutError::Closed(_)) => { warn!("downstream receiver closed - aborting"); - break 'recv_loop ConnectionState::FatalError(0, FatalErrorReason::DownstreamChannelClosed); + break 'recv_loop ConnectionState::FatalError( + 0, + FatalErrorReason::DownstreamChannelClosed, + ); } } }