restart attempt counter in ready state

This commit is contained in:
GroovieGermanikus 2024-03-11 15:27:32 +01:00
parent a6ab686fa4
commit 48efa7bbe8
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
3 changed files with 39 additions and 57 deletions

View File

@ -12,7 +12,7 @@ use yellowstone_grpc_proto::tonic::Status;
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected(Attempt),
Connecting(Attempt, JoinHandle<GeyserGrpcClientResult<S>>),
Ready(Attempt, S),
Ready(S),
WaitReconnect(Attempt),
}
@ -32,8 +32,7 @@ pub fn create_geyser_reconnecting_stream(
(state, yield_value) = match state {
ConnectionState::NotConnected(mut attempt) => {
attempt += 1;
ConnectionState::NotConnected(attempt) => {
let connection_task = tokio::spawn({
let addr = grpc_source.grpc_addr.clone();
@ -43,7 +42,7 @@ pub fn create_geyser_reconnecting_stream(
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
let subscribe_filter = subscribe_filter.clone();
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
log!(if attempt >= 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt + 1, addr);
async move {
let connect_result = GeyserGrpcClient::connect_with_timeout(
@ -67,18 +66,18 @@ pub fn create_geyser_reconnecting_stream(
}
});
(ConnectionState::Connecting(attempt, connection_task), Message::Connecting(attempt))
(ConnectionState::Connecting(attempt + 1, connection_task), Message::Connecting(attempt + 1))
}
ConnectionState::Connecting(attempt, connection_task) => {
let subscribe_result = connection_task.await;
match subscribe_result {
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(attempt, subscribed_stream), Message::Connecting(attempt)),
Ok(Ok(subscribed_stream)) => (ConnectionState::Ready(subscribed_stream), Message::Connecting(attempt)),
Ok(Err(geyser_error)) => {
// ATM we consider all errors recoverable
warn!("subscribe failed on {} - retrying: {:?}", grpc_source, geyser_error);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
(ConnectionState::WaitReconnect(attempt + 1), Message::Connecting(attempt))
},
Err(geyser_grpc_task_error) => {
panic!("task aborted - should not happen :{geyser_grpc_task_error}");
@ -87,27 +86,27 @@ pub fn create_geyser_reconnecting_stream(
}
ConnectionState::Ready(attempt, mut geyser_stream) => {
ConnectionState::Ready(mut geyser_stream) => {
let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);
match timeout(receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next()).await {
Ok(Some(Ok(update_message))) => {
trace!("> recv update message from {}", grpc_source);
(ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message)))
(ConnectionState::Ready(geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message)))
}
Ok(Some(Err(tonic_status))) => {
// ATM we consider all errors recoverable
warn!("error on {} - retrying: {:?}", grpc_source, tonic_status);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
(ConnectionState::WaitReconnect(0), Message::Connecting(0))
}
Ok(None) => {
// should not arrive here; reaching this arm means the stream was closed.
warn!("geyser stream closed on {} - retrying", grpc_source);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
(ConnectionState::WaitReconnect(0), Message::Connecting(0))
}
Err(_elapsed) => {
// timeout
warn!("geyser stream timeout on {} - retrying", grpc_source);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
(ConnectionState::WaitReconnect(0), Message::Connecting(0))
}
}

View File

@ -15,8 +15,9 @@ type Attempt = u32;
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
NotConnected(Attempt),
Connected(Attempt, GeyserGrpcClient<F>),
Ready(Attempt, S),
// connected but not subscribed
Connecting(Attempt, GeyserGrpcClient<F>),
Ready(S),
// error states
RecoverableConnectionError(Attempt),
// non-recoverable error
@ -59,8 +60,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
loop {
state = match state {
ConnectionState::NotConnected(mut attempt) => {
attempt += 1;
ConnectionState::NotConnected(attempt) => {
let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone();
@ -68,13 +68,13 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
log!(
if attempt > 1 {
if attempt > 0 {
Level::Warn
} else {
Level::Debug
},
"Connecting attempt #{} to {}",
attempt,
"Connecting attempt {} to {}",
attempt + 1,
addr
);
let connect_result = GeyserGrpcClient::connect_with_timeout(
@ -88,47 +88,39 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
.await;
match connect_result {
Ok(client) => ConnectionState::Connected(attempt, client),
Ok(client) => ConnectionState::Connecting(attempt + 1, client),
Err(GeyserGrpcClientError::InvalidUri(_)) => ConnectionState::FatalError(
attempt,
FatalErrorReason::ConfigurationError,
),
Err(GeyserGrpcClientError::MetadataValueError(_)) => {
ConnectionState::FatalError(
attempt,
FatalErrorReason::ConfigurationError,
)
attempt + 1, FatalErrorReason::ConfigurationError,
), Err(GeyserGrpcClientError::MetadataValueError(_)) => {
ConnectionState::FatalError(attempt + 1, FatalErrorReason::ConfigurationError)
}
Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => {
ConnectionState::FatalError(
attempt,
FatalErrorReason::ConfigurationError,
)
ConnectionState::FatalError(attempt + 1, FatalErrorReason::ConfigurationError)
}
Err(GeyserGrpcClientError::TonicError(tonic_error)) => {
warn!(
"connect failed on {} - aborting: {:?}",
grpc_source, tonic_error
);
ConnectionState::FatalError(attempt, FatalErrorReason::NetworkError)
ConnectionState::FatalError(attempt + 1, FatalErrorReason::NetworkError)
}
Err(GeyserGrpcClientError::TonicStatus(tonic_status)) => {
warn!(
"connect failed on {} - retrying: {:?}",
grpc_source, tonic_status
);
ConnectionState::RecoverableConnectionError(attempt)
ConnectionState::RecoverableConnectionError(attempt + 1)
}
Err(GeyserGrpcClientError::SubscribeSendError(send_error)) => {
warn!(
"connect failed with send error on {} - retrying: {:?}",
grpc_source, send_error
);
ConnectionState::RecoverableConnectionError(attempt)
ConnectionState::RecoverableConnectionError(attempt + 1)
}
}
}
ConnectionState::Connected(attempt, mut client) => {
ConnectionState::Connecting(attempt, mut client) => {
let subscribe_timeout =
grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
let subscribe_filter = subscribe_filter.clone();
@ -143,14 +135,14 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
match subscribe_result_timeout {
Ok(subscribe_result) => {
match subscribe_result {
Ok(geyser_stream) => ConnectionState::Ready(attempt, geyser_stream),
Ok(geyser_stream) => ConnectionState::Ready(geyser_stream),
Err(GeyserGrpcClientError::TonicError(_)) => {
warn!("subscribe failed on {} - retrying", grpc_source);
ConnectionState::RecoverableConnectionError(attempt)
ConnectionState::RecoverableConnectionError(attempt + 1)
}
Err(GeyserGrpcClientError::TonicStatus(_)) => {
warn!("subscribe failed on {} - retrying", grpc_source);
ConnectionState::RecoverableConnectionError(attempt)
ConnectionState::RecoverableConnectionError(attempt + 1)
}
// non-recoverable
Err(unrecoverable_error) => {
@ -158,10 +150,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
"subscribe to {} failed with unrecoverable error: {}",
grpc_source, unrecoverable_error
);
ConnectionState::FatalError(
attempt,
FatalErrorReason::SubscribeError,
)
ConnectionState::FatalError(attempt + 1, FatalErrorReason::SubscribeError)
}
}
}
@ -170,7 +159,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
"subscribe failed with timeout on {} - retrying",
grpc_source
);
ConnectionState::RecoverableConnectionError(attempt)
ConnectionState::RecoverableConnectionError(attempt + 1)
}
}
}
@ -210,7 +199,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
sleep(Duration::from_secs_f32(backoff_secs)).await;
ConnectionState::NotConnected(attempt)
}
ConnectionState::Ready(attempt, mut geyser_stream) => {
ConnectionState::Ready(mut geyser_stream) => {
let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);
'recv_loop: loop {
match timeout(
@ -263,36 +252,30 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
}
Err(_send_error) => {
warn!("downstream receiver closed, message is lost - aborting");
break 'recv_loop ConnectionState::FatalError(
attempt,
FatalErrorReason::DownstreamChannelClosed,
);
break 'recv_loop ConnectionState::FatalError(0, FatalErrorReason::DownstreamChannelClosed);
}
}
}
Err(SendTimeoutError::Closed(_)) => {
warn!("downstream receiver closed - aborting");
break 'recv_loop ConnectionState::FatalError(
attempt,
FatalErrorReason::DownstreamChannelClosed,
);
break 'recv_loop ConnectionState::FatalError(0, FatalErrorReason::DownstreamChannelClosed);
}
}
}
Ok(Some(Err(tonic_status))) => {
// all tonic errors are recoverable
warn!("error on {} - retrying: {:?}", grpc_source, tonic_status);
break 'recv_loop ConnectionState::WaitReconnect(attempt);
break 'recv_loop ConnectionState::WaitReconnect(0);
}
Ok(None) => {
warn!("geyser stream closed on {} - retrying", grpc_source);
break 'recv_loop ConnectionState::WaitReconnect(attempt);
break 'recv_loop ConnectionState::WaitReconnect(0);
}
Err(_elapsed) => {
warn!("timeout on {} - retrying", grpc_source);
break 'recv_loop ConnectionState::WaitReconnect(attempt);
break 'recv_loop ConnectionState::WaitReconnect(0);
}
}
} // -- END match
} // -- END receive loop
}
} // -- END match