timeout for subscription

This commit is contained in:
GroovieGermanikus 2023-12-15 11:59:19 +01:00
parent a7f957a528
commit 761583d3b2
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 36 additions and 18 deletions

View File

@ -1,12 +1,12 @@
use async_stream::stream;
use futures::{Stream, StreamExt};
use log::{debug, info, trace, warn};
use log::{debug, info, log, trace, warn, Level};
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::atomic::{AtomicI32, Ordering};
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
use tokio::time::{sleep, Duration, timeout};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_proto::geyser::{SubscribeRequestFilterBlocks, SubscribeUpdate};
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
@ -21,6 +21,13 @@ trait GrpcConnectionFactory: Clone {
) -> GeyserGrpcClientResult<Pin<Box<dyn Stream<Item = Result<SubscribeUpdate, Status>>>>>;
}
#[derive(Clone, Debug)]
pub struct GrpcConnectionTimeouts {
pub connect_timeout: Duration,
pub request_timeout: Duration,
pub subscribe_timeout: Duration,
}
#[derive(Clone, Debug)]
pub struct GrpcSourceConfig {
// symbolic name used in logs
@ -28,6 +35,7 @@ pub struct GrpcSourceConfig {
grpc_addr: String,
grpc_x_token: Option<String>,
tls_config: Option<ClientTlsConfig>,
timeouts: Option<GrpcConnectionTimeouts>,
}
impl GrpcSourceConfig {
@ -37,6 +45,7 @@ impl GrpcSourceConfig {
grpc_addr,
grpc_x_token,
tls_config: None,
timeouts: None,
}
}
}
@ -69,7 +78,7 @@ pub fn create_geyser_reconnecting_stream(
};
let mut state = ConnectionState::NotConnected;
let connection_attempts = AtomicI32::new(0);
let connection_attempts = AtomicI32::new(1);
// in case of cancellation, we restart from here:
// thus we want to keep the progression in a state object outside the stream! makro
@ -85,13 +94,20 @@ pub fn create_geyser_reconnecting_stream(
let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone();
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
// let (block_filter, blockmeta_filter) = blocks_filters.clone();
info!("Connecting attempt #{} to {}", connection_attempts.fetch_add(1, Ordering::Relaxed), addr);
let attempt = connection_attempts.fetch_add(1, Ordering::Relaxed);
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
async move {
let connect_result = GeyserGrpcClient::connect_with_timeout(
addr, token, config,
Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await;
addr, token, config,
connect_timeout,
request_timeout,
false)
.await;
let mut client = connect_result?;
// TODO make filter configurable for caller
@ -115,18 +131,20 @@ pub fn create_geyser_reconnecting_stream(
// Connected;
client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
blocksmeta_subs,
Some(commitment_level),
Default::default(),
None,
).await
timeout(subscribe_timeout.unwrap_or(Duration::MAX),
client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
blocksmeta_subs,
Some(commitment_level),
Default::default(),
None,
))
.await.expect("timeout on subscribe_once")
}
});