diff --git a/src/grpc_subscription_autoreconnect.rs b/src/grpc_subscription_autoreconnect.rs index 70909d5..d7e7e83 100644 --- a/src/grpc_subscription_autoreconnect.rs +++ b/src/grpc_subscription_autoreconnect.rs @@ -1,12 +1,12 @@ use async_stream::stream; use futures::{Stream, StreamExt}; -use log::{debug, info, trace, warn}; +use log::{debug, info, log, trace, warn, Level}; use solana_sdk::commitment_config::CommitmentConfig; use std::collections::HashMap; use std::pin::Pin; use std::sync::atomic::{AtomicI32, Ordering}; use tokio::task::JoinHandle; -use tokio::time::{sleep, Duration}; +use tokio::time::{sleep, Duration, timeout}; use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult}; use yellowstone_grpc_proto::geyser::{SubscribeRequestFilterBlocks, SubscribeUpdate}; use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta; @@ -21,6 +21,13 @@ trait GrpcConnectionFactory: Clone { ) -> GeyserGrpcClientResult>>>>; } +#[derive(Clone, Debug)] +pub struct GrpcConnectionTimeouts { + pub connect_timeout: Duration, + pub request_timeout: Duration, + pub subscribe_timeout: Duration, +} + #[derive(Clone, Debug)] pub struct GrpcSourceConfig { // symbolic name used in logs @@ -28,6 +35,7 @@ pub struct GrpcSourceConfig { grpc_addr: String, grpc_x_token: Option, tls_config: Option, + timeouts: Option, } impl GrpcSourceConfig { @@ -37,6 +45,7 @@ impl GrpcSourceConfig { grpc_addr, grpc_x_token, tls_config: None, + timeouts: None, } } } @@ -69,7 +78,7 @@ pub fn create_geyser_reconnecting_stream( }; let mut state = ConnectionState::NotConnected; - let connection_attempts = AtomicI32::new(0); + let connection_attempts = AtomicI32::new(1); // in case of cancellation, we restart from here: // thus we want to keep the progression in a state object outside the stream! makro @@ -85,13 +94,20 @@ pub fn create_geyser_reconnecting_stream( let addr = grpc_source.grpc_addr.clone(); let token = grpc_source.grpc_x_token.clone(); let config = grpc_source.tls_config.clone(); + let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout); + let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout); + let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout); // let (block_filter, blockmeta_filter) = blocks_filters.clone(); - info!("Connecting attempt #{} to {}", connection_attempts.fetch_add(1, Ordering::Relaxed), addr); + let attempt = connection_attempts.fetch_add(1, Ordering::Relaxed); + log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr); async move { let connect_result = GeyserGrpcClient::connect_with_timeout( - addr, token, config, - Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await; + addr, token, config, + connect_timeout, + request_timeout, + false) + .await; let mut client = connect_result?; // TODO make filter configurable for caller @@ -115,18 +131,20 @@ pub fn create_geyser_reconnecting_stream( // Connected; - client - .subscribe_once( - HashMap::new(), - Default::default(), - HashMap::new(), - Default::default(), - blocks_subs, - blocksmeta_subs, - Some(commitment_level), - Default::default(), - None, - ).await + timeout(subscribe_timeout.unwrap_or(Duration::MAX), + client + .subscribe_once( + HashMap::new(), + Default::default(), + HashMap::new(), + Default::default(), + blocks_subs, + blocksmeta_subs, + Some(commitment_level), + Default::default(), + None, + )) + .await.expect("timeout on subscribe_once") } });