diff --git a/src/grpc_subscription.rs b/src/grpc_subscription.rs index 477d29f..e4059f4 100644 --- a/src/grpc_subscription.rs +++ b/src/grpc_subscription.rs @@ -3,12 +3,13 @@ // rpc_polling::vote_accounts_and_cluster_info_polling::poll_vote_accounts_and_cluster_info, // }; use anyhow::{bail, Context}; -use futures::StreamExt; +use futures::{Stream, StreamExt}; use solana_sdk::commitment_config::CommitmentConfig; use std::collections::HashMap; -use tokio::sync::broadcast::Sender; +use tokio::sync::broadcast::{Receiver, Sender}; use yellowstone_grpc_client::GeyserGrpcClient; +use yellowstone_grpc_proto::geyser::SubscribeRequest; use yellowstone_grpc_proto::prelude::{ subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeUpdateBlock, diff --git a/src/grpc_subscription_autoreconnect.rs b/src/grpc_subscription_autoreconnect.rs index 57a593f..84b6dbf 100644 --- a/src/grpc_subscription_autoreconnect.rs +++ b/src/grpc_subscription_autoreconnect.rs @@ -1,19 +1,25 @@ use async_stream::stream; use futures::{Stream, StreamExt}; -use log::{debug, info, log, trace, warn, Level}; +use log::{debug, info, log, trace, warn, Level, error}; use solana_sdk::commitment_config::CommitmentConfig; use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::time::Duration; +use futures::channel::mpsc; +use tokio::sync::broadcast::Receiver; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; -use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult}; +use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult}; use yellowstone_grpc_proto::geyser::{ CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate, }; use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta; +use yellowstone_grpc_proto::tonic; +use yellowstone_grpc_proto::tonic::codegen::http::uri::InvalidUri; +use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; use yellowstone_grpc_proto::tonic::Status; +use crate::grpc_subscription_autoreconnect::TheState::{Connected, FatalError, NotConnected, RecoverableConnectionError}; #[derive(Clone, Debug)] pub struct GrpcConnectionTimeouts { @@ -264,6 +270,97 @@ pub fn create_geyser_reconnecting_stream( the_stream } + +enum TheState>> { + NotConnected(Attempt), + RecoverableConnectionError(Attempt), + FatalError(Attempt), + Connected(Attempt, S), + +} + + +pub fn create_geyser_reconnecting_task( + grpc_source: GrpcSourceConfig, + subscribe_filter: SubscribeRequest, +) -> Receiver { + let (tx, rx) = tokio::sync::broadcast::channel::(1000); + + let geyser_task = tokio::spawn(async move { + let mut attempt = 1; + + let mut state = NotConnected(0); + + loop { + + state = match state { + NotConnected(_) => { + let addr = grpc_source.grpc_addr.clone(); + let token = grpc_source.grpc_x_token.clone(); + let config = grpc_source.tls_config.clone(); + let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout); + let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout); + let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout); + let subscribe_filter = subscribe_filter.clone(); + log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr); + let connect_result = GeyserGrpcClient::connect_with_timeout( + addr, token, config, + connect_timeout, + request_timeout, + false) + .await; + let mut client = connect_result?; + + debug!("Subscribe with filter {:?}", subscribe_filter); + + let subscribe_result = timeout(subscribe_timeout.unwrap_or(Duration::MAX), + client + .subscribe_once2(subscribe_filter)) + .await; + + // maybe not optimal + let subscribe_result = subscribe_result.map_err(|_| Status::unknown("unspecific subscribe timeout"))?; + + match subscribe_result { + Ok(geyser_stream) => { + Connected(geyser_stream) + } + Err(GeyserGrpcClientError::TonicError(_)) => { + warn!("! subscribe failed on {} - retrying", grpc_source); + RecoverableConnectionError(attempt) + } + Err(GeyserGrpcClientError::TonicStatus(_)) => { + warn!("! subscribe failed on {} - retrying", grpc_source); + RecoverableConnectionError(attempt) + } + // non-recoverable + Err(unrecoverable_error) => { + error!("! subscribe to {} failed with unrecoverable error: {}", grpc_source, unrecoverable_error); + FatalError(attempt) + } + } + } + RecoverableConnectionError(attempt) => { + let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0); + info!("! waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source); + sleep(Duration::from_secs_f32(backoff_secs)).await; + } + FatalError(_) => { + // TOOD what to do + panic!("Fatal error") + } + } + + } + + }); + + + rx +} + + + #[cfg(test)] mod tests { use super::*;