This commit is contained in:
GroovieGermanikus 2024-01-19 08:25:05 +01:00
parent 33cb8cbfa7
commit 21f2ef3b7c
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 7 additions and 2 deletions

View File

@ -8,6 +8,7 @@ use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::pin::Pin;
use std::time::Duration;
use anyhow::bail;
use tokio::sync::broadcast::error::SendError;
use tokio::sync::broadcast::Receiver;
use tokio::task::JoinHandle;
@ -115,15 +116,16 @@ fn map_commitment_level(commitment_config: CommitmentConfig) -> CommitmentLevel
enum TheState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
NotConnected(Attempt),
// Connected(Attempt, Box<Pin<GeyserGrpcClient<F>>>),
Connected(Attempt, GeyserGrpcClient<F>),
Ready(Attempt, S),
// error states
RecoverableConnectionError(Attempt),
// non-recoverable error
FatalError(Attempt),
WaitReconnect(Attempt),
}
/// return handler will exit on fatal error
pub fn create_geyser_reconnecting_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
@ -245,7 +247,8 @@ pub fn create_geyser_reconnecting_task(
}
FatalError(_) => {
// TOOD what to do
panic!("Fatal error")
error!("! fatal error grpc connection - aborting");
bail!("! fatal error grpc connection - aborting");
}
TheState::WaitReconnect(attempt) => {
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
@ -261,6 +264,8 @@ pub fn create_geyser_reconnecting_task(
match geyser_stream.next().await {
Some(Ok(update_message)) => {
trace!("> recv update message from {}", grpc_source);
// TODO consider extract this
// backpressure - should'n we block here?
match sender
.send(Message::GeyserSubscribeUpdate(Box::new(update_message)))
{