diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index a0c29fe..66d6d61 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,37 +1,29 @@ use crate::{GrpcSourceConfig, Message}; -use anyhow::bail; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; -use solana_sdk::commitment_config::CommitmentConfig; -use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::future::Future; -use std::pin::Pin; use std::time::Duration; -use tokio::sync::mpsc::error::{SendError, SendTimeoutError}; +use tokio::sync::mpsc::error::SendTimeoutError; use tokio::sync::mpsc::Receiver; -use tokio::task::{AbortHandle, JoinHandle}; +use tokio::task::AbortHandle; +use tokio::time::{sleep, timeout, Instant}; use tokio::time::error::Elapsed; -use tokio::time::{sleep, timeout, Instant, Timeout}; -use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult}; -use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; -use yellowstone_grpc_proto::geyser::{ - CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate, -}; -use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta; -use yellowstone_grpc_proto::tonic; -use yellowstone_grpc_proto::tonic::codegen::http::uri::InvalidUri; -use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue; +use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError}; +use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; use yellowstone_grpc_proto::tonic::service::Interceptor; -use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; -use yellowstone_grpc_proto::tonic::{Code, Status}; +use yellowstone_grpc_proto::tonic::Status; type Attempt = u32; -enum ConnectionState>> { +enum ConnectionState>, F: Interceptor> { NotConnected(Attempt), - Connecting(Attempt, JoinHandle>), + Connected(Attempt, GeyserGrpcClient), Ready(Attempt, S), + // error states + RecoverableConnectionError(Attempt), + // non-recoverable error + FatalError(Attempt, FatalErrorReason), WaitReconnect(Attempt), } @@ -44,17 +36,6 @@ enum FatalErrorReason { Misc, } -enum State>, F: Interceptor> { - NotConnected(Attempt), - Connected(Attempt, GeyserGrpcClient), - Ready(Attempt, S), - // error states - RecoverableConnectionError(Attempt), - // non-recoverable error - FatalError(Attempt, FatalErrorReason), - WaitReconnect(Attempt), -} - /// connect to grpc source performing autoconect if required, /// returns mpsc channel; task will abort on fatal error /// @@ -70,12 +51,12 @@ pub fn create_geyser_autoconnection_task( let (sender, receiver_channel) = tokio::sync::mpsc::channel::(1); let jh_geyser_task = tokio::spawn(async move { - let mut state = State::NotConnected(0); + let mut state = ConnectionState::NotConnected(0); let mut messages_forwarded = 0; loop { state = match state { - State::NotConnected(mut attempt) => { + ConnectionState::NotConnected(mut attempt) => { attempt += 1; let addr = grpc_source.grpc_addr.clone(); @@ -104,40 +85,47 @@ pub fn create_geyser_autoconnection_task( .await; match connect_result { - Ok(client) => State::Connected(attempt, client), - Err(GeyserGrpcClientError::InvalidUri(_)) => { - State::FatalError(attempt, FatalErrorReason::ConfigurationError) - } + Ok(client) => ConnectionState::Connected(attempt, client), + Err(GeyserGrpcClientError::InvalidUri(_)) => ConnectionState::FatalError( + attempt, + FatalErrorReason::ConfigurationError, + ), Err(GeyserGrpcClientError::MetadataValueError(_)) => { - State::FatalError(attempt, FatalErrorReason::ConfigurationError) + ConnectionState::FatalError( + attempt, + FatalErrorReason::ConfigurationError, + ) } Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => { - State::FatalError(attempt, FatalErrorReason::ConfigurationError) + ConnectionState::FatalError( + attempt, + FatalErrorReason::ConfigurationError, + ) } Err(GeyserGrpcClientError::TonicError(tonic_error)) => { warn!( "! connect failed on {} - aborting: {:?}", grpc_source, tonic_error ); - State::FatalError(attempt, FatalErrorReason::NetworkError) + ConnectionState::FatalError(attempt, FatalErrorReason::NetworkError) } Err(GeyserGrpcClientError::TonicStatus(tonic_status)) => { warn!( "! connect failed on {} - retrying: {:?}", grpc_source, tonic_status ); - State::RecoverableConnectionError(attempt) + ConnectionState::RecoverableConnectionError(attempt) } Err(GeyserGrpcClientError::SubscribeSendError(send_error)) => { warn!( "! connect failed with send error on {} - retrying: {:?}", grpc_source, send_error ); - State::RecoverableConnectionError(attempt) + ConnectionState::RecoverableConnectionError(attempt) } } } - State::Connected(attempt, mut client) => { + ConnectionState::Connected(attempt, mut client) => { let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout); let subscribe_filter = subscribe_filter.clone(); @@ -152,14 +140,14 @@ pub fn create_geyser_autoconnection_task( match subscribe_result_timeout { Ok(subscribe_result) => { match subscribe_result { - Ok(geyser_stream) => State::Ready(attempt, geyser_stream), + Ok(geyser_stream) => ConnectionState::Ready(attempt, geyser_stream), Err(GeyserGrpcClientError::TonicError(_)) => { warn!("subscribe failed on {} - retrying", grpc_source); - State::RecoverableConnectionError(attempt) + ConnectionState::RecoverableConnectionError(attempt) } Err(GeyserGrpcClientError::TonicStatus(_)) => { warn!("subscribe failed on {} - retrying", grpc_source); - State::RecoverableConnectionError(attempt) + ConnectionState::RecoverableConnectionError(attempt) } // non-recoverable Err(unrecoverable_error) => { @@ -167,7 +155,10 @@ pub fn create_geyser_autoconnection_task( "! subscribe to {} failed with unrecoverable error: {}", grpc_source, unrecoverable_error ); - State::FatalError(attempt, FatalErrorReason::SubscribeError) + ConnectionState::FatalError( + attempt, + FatalErrorReason::SubscribeError, + ) } } } @@ -176,20 +167,20 @@ pub fn create_geyser_autoconnection_task( "! subscribe failed with timeout on {} - retrying", grpc_source ); - State::RecoverableConnectionError(attempt) + ConnectionState::RecoverableConnectionError(attempt) } } } - State::RecoverableConnectionError(attempt) => { + ConnectionState::RecoverableConnectionError(attempt) => { let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0); info!( "! waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source ); sleep(Duration::from_secs_f32(backoff_secs)).await; - State::NotConnected(attempt) + ConnectionState::NotConnected(attempt) } - State::FatalError(_attempt, reason) => match reason { + ConnectionState::FatalError(_attempt, reason) => match reason { FatalErrorReason::DownstreamChannelClosed => { warn!("downstream closed - aborting"); return; @@ -211,25 +202,22 @@ pub fn create_geyser_autoconnection_task( return; } }, - State::WaitReconnect(attempt) => { + ConnectionState::WaitReconnect(attempt) => { let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0); info!( "! waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source ); sleep(Duration::from_secs_f32(backoff_secs)).await; - State::NotConnected(attempt) + ConnectionState::NotConnected(attempt) } - State::Ready(attempt, mut geyser_stream) => { + ConnectionState::Ready(attempt, mut geyser_stream) => { + let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); 'recv_loop: loop { - match geyser_stream.next().await { - Some(Ok(update_message)) => { + match timeout(receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next()).await { + Ok(Some(Ok(update_message))) => { trace!("> recv update message from {}", grpc_source); - // TODO consider extract this - // backpressure - should'n we block here? - // TODO extract timeout param; TODO respect startup - // emit warning if message not received - // note: first send never blocks + // note: first send never blocks as the mpsc channel has capacity 1 let warning_threshold = if messages_forwarded == 1 { Duration::from_millis(3000) } else { @@ -271,7 +259,7 @@ pub fn create_geyser_autoconnection_task( } Err(_send_error) => { warn!("downstream receiver closed, message is lost - aborting"); - break 'recv_loop State::FatalError( + break 'recv_loop ConnectionState::FatalError( attempt, FatalErrorReason::DownstreamChannelClosed, ); @@ -280,21 +268,25 @@ pub fn create_geyser_autoconnection_task( } Err(SendTimeoutError::Closed(_)) => { warn!("downstream receiver closed - aborting"); - break 'recv_loop State::FatalError( + break 'recv_loop ConnectionState::FatalError( attempt, FatalErrorReason::DownstreamChannelClosed, ); } } } - Some(Err(tonic_status)) => { + Ok(Some(Err(tonic_status))) => { // all tonic errors are recoverable warn!("error on {} - retrying: {:?}", grpc_source, tonic_status); - break 'recv_loop State::WaitReconnect(attempt); + break 'recv_loop ConnectionState::WaitReconnect(attempt); } - None => { + Ok(None) => { warn!("geyser stream closed on {} - retrying", grpc_source); - break 'recv_loop State::WaitReconnect(attempt); + break 'recv_loop ConnectionState::WaitReconnect(attempt); + } + Err(_elapsed) => { + warn!("timeout on {} - retrying", grpc_source); + break 'recv_loop ConnectionState::WaitReconnect(attempt); } } } // -- end loop