make attempt 1-based

This commit is contained in:
GroovieGermanikus 2024-03-11 15:41:55 +01:00
parent 1fc40f0cd3
commit f1e174c1dc
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 13 additions and 13 deletions

View File

@ -11,6 +11,7 @@ use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::service::Interceptor;
use yellowstone_grpc_proto::tonic::Status;
// 1-based counter
type Attempt = u32;
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
@ -56,7 +57,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
// task will be aborted when downstream receiver gets dropped
let jh_geyser_task = tokio::spawn(async move {
let mut state = ConnectionState::NotConnected(0);
let mut state = ConnectionState::NotConnected(1);
let mut messages_forwarded = 0;
loop {
@ -68,13 +69,13 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
log!(
if attempt > 0 {
if attempt > 1 {
Level::Warn
} else {
Level::Debug
},
"Connecting attempt {} to {}",
attempt + 1,
attempt,
addr
);
let connect_result = GeyserGrpcClient::connect_with_timeout(
@ -144,15 +145,17 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
Ok(subscribe_result) => {
match subscribe_result {
Ok(geyser_stream) => {
debug!("subscribed to {} after {} failed attempts", grpc_source, attempt);
if attempt > 1 {
debug!("subscribed to {} after {} failed attempts", grpc_source, attempt);
}
ConnectionState::Ready(geyser_stream)
},
Err(GeyserGrpcClientError::TonicError(_)) => {
warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt + 1);
warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt);
ConnectionState::RecoverableConnectionError(attempt + 1)
}
Err(GeyserGrpcClientError::TonicStatus(_)) => {
warn!("subscribe failed on {} {} attempts - retrying", grpc_source, attempt + 1);
warn!("subscribe failed on {} after {} attempts - retrying", grpc_source, attempt);
ConnectionState::RecoverableConnectionError(attempt + 1)
}
// non-recoverable
@ -161,10 +164,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
"subscribe to {} failed with unrecoverable error: {}",
grpc_source, unrecoverable_error
);
ConnectionState::FatalError(
attempt + 1,
FatalErrorReason::SubscribeError,
)
ConnectionState::FatalError(attempt + 1, FatalErrorReason::SubscribeError)
}
}
}
@ -285,15 +285,15 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
Ok(Some(Err(tonic_status))) => {
// all tonic errors are recoverable
warn!("error on {} - retrying: {:?}", grpc_source, tonic_status);
break 'recv_loop ConnectionState::WaitReconnect(0);
break 'recv_loop ConnectionState::WaitReconnect(1);
}
Ok(None) => {
warn!("geyser stream closed on {} - retrying", grpc_source);
break 'recv_loop ConnectionState::WaitReconnect(0);
break 'recv_loop ConnectionState::WaitReconnect(1);
}
Err(_elapsed) => {
warn!("timeout on {} - retrying", grpc_source);
break 'recv_loop ConnectionState::WaitReconnect(0);
break 'recv_loop ConnectionState::WaitReconnect(1);
}
} // -- END match
} // -- END receive loop