fix double-increment of attempt

This commit is contained in:
GroovieGermanikus 2024-03-11 15:36:56 +01:00
parent 48efa7bbe8
commit 4ffe39849f
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
2 changed files with 34 additions and 14 deletions

View File

@ -66,7 +66,7 @@ pub fn create_geyser_reconnecting_stream(
} }
}); });
(ConnectionState::Connecting(attempt + 1, connection_task), Message::Connecting(attempt + 1)) (ConnectionState::Connecting(attempt, connection_task), Message::Connecting(attempt + 1))
} }
ConnectionState::Connecting(attempt, connection_task) => { ConnectionState::Connecting(attempt, connection_task) => {

View File

@ -38,7 +38,8 @@ pub fn create_geyser_autoconnection_task(
) -> (AbortHandle, Receiver<Message>) { ) -> (AbortHandle, Receiver<Message>) {
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1); let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);
let abort_handle = create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender); let abort_handle =
create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender);
(abort_handle, receiver_channel) (abort_handle, receiver_channel)
} }
@ -61,7 +62,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
loop { loop {
state = match state { state = match state {
ConnectionState::NotConnected(attempt) => { ConnectionState::NotConnected(attempt) => {
let addr = grpc_source.grpc_addr.clone(); let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone(); let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone(); let config = grpc_source.tls_config.clone();
@ -88,14 +88,22 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
.await; .await;
match connect_result { match connect_result {
Ok(client) => ConnectionState::Connecting(attempt + 1, client), Ok(client) => ConnectionState::Connecting(attempt, client),
Err(GeyserGrpcClientError::InvalidUri(_)) => ConnectionState::FatalError( Err(GeyserGrpcClientError::InvalidUri(_)) => ConnectionState::FatalError(
attempt + 1, FatalErrorReason::ConfigurationError, attempt + 1,
), Err(GeyserGrpcClientError::MetadataValueError(_)) => { FatalErrorReason::ConfigurationError,
ConnectionState::FatalError(attempt + 1, FatalErrorReason::ConfigurationError) ),
Err(GeyserGrpcClientError::MetadataValueError(_)) => {
ConnectionState::FatalError(
attempt + 1,
FatalErrorReason::ConfigurationError,
)
} }
Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => { Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => {
ConnectionState::FatalError(attempt + 1, FatalErrorReason::ConfigurationError) ConnectionState::FatalError(
attempt + 1,
FatalErrorReason::ConfigurationError,
)
} }
Err(GeyserGrpcClientError::TonicError(tonic_error)) => { Err(GeyserGrpcClientError::TonicError(tonic_error)) => {
warn!( warn!(
@ -135,13 +143,16 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
match subscribe_result_timeout { match subscribe_result_timeout {
Ok(subscribe_result) => { Ok(subscribe_result) => {
match subscribe_result { match subscribe_result {
Ok(geyser_stream) => ConnectionState::Ready(geyser_stream), Ok(geyser_stream) => {
debug!("subscribed to {} after {} failed attempts", grpc_source, attempt);
ConnectionState::Ready(geyser_stream)
},
Err(GeyserGrpcClientError::TonicError(_)) => { Err(GeyserGrpcClientError::TonicError(_)) => {
warn!("subscribe failed on {} - retrying", grpc_source); warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt + 1);
ConnectionState::RecoverableConnectionError(attempt + 1) ConnectionState::RecoverableConnectionError(attempt + 1)
} }
Err(GeyserGrpcClientError::TonicStatus(_)) => { Err(GeyserGrpcClientError::TonicStatus(_)) => {
warn!("subscribe failed on {} - retrying", grpc_source); warn!("subscribe failed on {} {} attempts - retrying", grpc_source, attempt + 1);
ConnectionState::RecoverableConnectionError(attempt + 1) ConnectionState::RecoverableConnectionError(attempt + 1)
} }
// non-recoverable // non-recoverable
@ -150,7 +161,10 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
"subscribe to {} failed with unrecoverable error: {}", "subscribe to {} failed with unrecoverable error: {}",
grpc_source, unrecoverable_error grpc_source, unrecoverable_error
); );
ConnectionState::FatalError(attempt + 1, FatalErrorReason::SubscribeError) ConnectionState::FatalError(
attempt + 1,
FatalErrorReason::SubscribeError,
)
} }
} }
} }
@ -252,13 +266,19 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
} }
Err(_send_error) => { Err(_send_error) => {
warn!("downstream receiver closed, message is lost - aborting"); warn!("downstream receiver closed, message is lost - aborting");
break 'recv_loop ConnectionState::FatalError(0, FatalErrorReason::DownstreamChannelClosed); break 'recv_loop ConnectionState::FatalError(
0,
FatalErrorReason::DownstreamChannelClosed,
);
} }
} }
} }
Err(SendTimeoutError::Closed(_)) => { Err(SendTimeoutError::Closed(_)) => {
warn!("downstream receiver closed - aborting"); warn!("downstream receiver closed - aborting");
break 'recv_loop ConnectionState::FatalError(0, FatalErrorReason::DownstreamChannelClosed); break 'recv_loop ConnectionState::FatalError(
0,
FatalErrorReason::DownstreamChannelClosed,
);
} }
} }
} }