From db034040e86dd9c6606769b6f7752d69166ca57b Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 26 Jan 2024 19:19:05 +0100 Subject: [PATCH] handle timeout for stream version --- ...grpc_subscription_autoreconnect_streams.rs | 38 +++++++------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/src/grpc_subscription_autoreconnect_streams.rs b/src/grpc_subscription_autoreconnect_streams.rs index dfea64f..7d403a6 100644 --- a/src/grpc_subscription_autoreconnect_streams.rs +++ b/src/grpc_subscription_autoreconnect_streams.rs @@ -1,30 +1,15 @@ use crate::{Attempt, GrpcSourceConfig, Message}; use async_stream::stream; -use futures::channel::mpsc; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; -use solana_sdk::commitment_config::CommitmentConfig; -use std::collections::HashMap; use std::fmt::{Debug, Display}; -use std::pin::Pin; use std::time::Duration; -use tokio::sync::broadcast::error::SendError; -use tokio::sync::broadcast::Receiver; use tokio::task::JoinHandle; -use tokio::time::error::Elapsed; -use tokio::time::{sleep, timeout, Timeout}; -use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult}; -use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; -use yellowstone_grpc_proto::geyser::{ - CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate, -}; -use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta; -use yellowstone_grpc_proto::tonic; -use yellowstone_grpc_proto::tonic::codegen::http::uri::InvalidUri; -use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue; +use tokio::time::{sleep, timeout}; +use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult}; +use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate}; use yellowstone_grpc_proto::tonic::service::Interceptor; -use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; -use yellowstone_grpc_proto::tonic::{Code, Status}; +use yellowstone_grpc_proto::tonic::Status; enum ConnectionState>> { NotConnected(Attempt), @@ -105,22 +90,27 @@ pub fn create_geyser_reconnecting_stream( } ConnectionState::Ready(attempt, mut geyser_stream) => { - - match geyser_stream.next().await { - Some(Ok(update_message)) => { + let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); + match timeout(receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next()).await { + Ok(Some(Ok(update_message))) => { trace!("> recv update message from {}", grpc_source); (ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message))) } - Some(Err(tonic_status)) => { + Ok(Some(Err(tonic_status))) => { // ATM we consider all errors recoverable warn!("error on {} - retrying: {:?}", grpc_source, tonic_status); (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) } - None => { + Ok(None) => { // should not arrive here, Mean the stream close. warn!("geyser stream closed on {} - retrying", grpc_source); (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) } + Err(_elapsed) => { + // timeout + warn!("geyser stream timeout on {} - retrying", grpc_source); + (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) + } } }