make attempt counter 1-based

This commit is contained in:
GroovieGermanikus 2024-03-11 15:45:28 +01:00
parent f1e174c1dc
commit 26bc7a3683
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
3 changed files with 9 additions and 11 deletions

View File

@ -22,7 +22,7 @@ pub fn create_geyser_reconnecting_stream(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
) -> impl Stream<Item = Message> {
let mut state = ConnectionState::NotConnected(0);
let mut state = ConnectionState::NotConnected(1);
// in case of cancellation, we restart from here:
// thus we want to keep the progression in a state object outside the stream! makro
@ -42,7 +42,7 @@ pub fn create_geyser_reconnecting_stream(
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
let subscribe_filter = subscribe_filter.clone();
log!(if attempt >= 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt + 1, addr);
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
async move {
let connect_result = GeyserGrpcClient::connect_with_timeout(
@ -66,7 +66,7 @@ pub fn create_geyser_reconnecting_stream(
}
});
(ConnectionState::Connecting(attempt, connection_task), Message::Connecting(attempt + 1))
(ConnectionState::Connecting(attempt + 1, connection_task), Message::Connecting(attempt + 1))
}
ConnectionState::Connecting(attempt, connection_task) => {
@ -77,7 +77,7 @@ pub fn create_geyser_reconnecting_stream(
Ok(Err(geyser_error)) => {
// ATM we consider all errors recoverable
warn!("subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error);
(ConnectionState::WaitReconnect(attempt + 1), Message::Connecting(attempt))
(ConnectionState::WaitReconnect(attempt + 1), Message::Connecting(attempt + 1))
},
Err(geyser_grpc_task_error) => {
panic!("task aborted - should not happen :{geyser_grpc_task_error}");
@ -96,17 +96,17 @@ pub fn create_geyser_reconnecting_stream(
Ok(Some(Err(tonic_status))) => {
// ATM we consider all errors recoverable
warn!("error on {} - retrying: {:?}", grpc_source, tonic_status);
(ConnectionState::WaitReconnect(0), Message::Connecting(0))
(ConnectionState::WaitReconnect(1), Message::Connecting(1))
}
Ok(None) => {
// should not arrive here, Mean the stream close.
warn!("geyser stream closed on {} - retrying", grpc_source);
(ConnectionState::WaitReconnect(0), Message::Connecting(0))
(ConnectionState::WaitReconnect(1), Message::Connecting(1))
}
Err(_elapsed) => {
// timeout
warn!("geyser stream timeout on {} - retrying", grpc_source);
(ConnectionState::WaitReconnect(0), Message::Connecting(0))
(ConnectionState::WaitReconnect(1), Message::Connecting(1))
}
}

View File

@ -1,4 +1,4 @@
use crate::{GrpcSourceConfig, Message};
use crate::{Attempt, GrpcSourceConfig, Message};
use futures::{Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level};
use std::time::Duration;
@ -11,9 +11,6 @@ use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::service::Interceptor;
use yellowstone_grpc_proto::tonic::Status;
// 1-based counter
type Attempt = u32;
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
NotConnected(Attempt),
// connected but not subscribed

View File

@ -14,6 +14,7 @@ pub mod grpc_subscription_autoreconnect_tasks;
pub mod grpcmultiplex_fastestwins;
mod obfuscate;
// 1-based attempt counter
type Attempt = u32;
// wraps payload and status messages