handle timeout for stream version
This commit is contained in:
parent
ec43c2972e
commit
db034040e8
|
@ -1,30 +1,15 @@
|
||||||
use crate::{Attempt, GrpcSourceConfig, Message};
|
use crate::{Attempt, GrpcSourceConfig, Message};
|
||||||
use async_stream::stream;
|
use async_stream::stream;
|
||||||
use futures::channel::mpsc;
|
|
||||||
use futures::{Stream, StreamExt};
|
use futures::{Stream, StreamExt};
|
||||||
use log::{debug, error, info, log, trace, warn, Level};
|
use log::{debug, error, info, log, trace, warn, Level};
|
||||||
use solana_sdk::commitment_config::CommitmentConfig;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::fmt::{Debug, Display};
|
use std::fmt::{Debug, Display};
|
||||||
use std::pin::Pin;
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::broadcast::error::SendError;
|
|
||||||
use tokio::sync::broadcast::Receiver;
|
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
use tokio::time::error::Elapsed;
|
use tokio::time::{sleep, timeout};
|
||||||
use tokio::time::{sleep, timeout, Timeout};
|
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
|
||||||
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult};
|
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
|
||||||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
|
||||||
use yellowstone_grpc_proto::geyser::{
|
|
||||||
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate,
|
|
||||||
};
|
|
||||||
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
|
|
||||||
use yellowstone_grpc_proto::tonic;
|
|
||||||
use yellowstone_grpc_proto::tonic::codegen::http::uri::InvalidUri;
|
|
||||||
use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue;
|
|
||||||
use yellowstone_grpc_proto::tonic::service::Interceptor;
|
use yellowstone_grpc_proto::tonic::service::Interceptor;
|
||||||
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
|
use yellowstone_grpc_proto::tonic::Status;
|
||||||
use yellowstone_grpc_proto::tonic::{Code, Status};
|
|
||||||
|
|
||||||
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
|
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
|
||||||
NotConnected(Attempt),
|
NotConnected(Attempt),
|
||||||
|
@ -105,22 +90,27 @@ pub fn create_geyser_reconnecting_stream(
|
||||||
}
|
}
|
||||||
|
|
||||||
ConnectionState::Ready(attempt, mut geyser_stream) => {
|
ConnectionState::Ready(attempt, mut geyser_stream) => {
|
||||||
|
let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);
|
||||||
match geyser_stream.next().await {
|
match timeout(receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next()).await {
|
||||||
Some(Ok(update_message)) => {
|
Ok(Some(Ok(update_message))) => {
|
||||||
trace!("> recv update message from {}", grpc_source);
|
trace!("> recv update message from {}", grpc_source);
|
||||||
(ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message)))
|
(ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message)))
|
||||||
}
|
}
|
||||||
Some(Err(tonic_status)) => {
|
Ok(Some(Err(tonic_status))) => {
|
||||||
// ATM we consider all errors recoverable
|
// ATM we consider all errors recoverable
|
||||||
warn!("error on {} - retrying: {:?}", grpc_source, tonic_status);
|
warn!("error on {} - retrying: {:?}", grpc_source, tonic_status);
|
||||||
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
|
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
|
||||||
}
|
}
|
||||||
None => {
|
Ok(None) => {
|
||||||
// should not arrive here, Mean the stream close.
|
// should not arrive here, Mean the stream close.
|
||||||
warn!("geyser stream closed on {} - retrying", grpc_source);
|
warn!("geyser stream closed on {} - retrying", grpc_source);
|
||||||
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
|
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
|
||||||
}
|
}
|
||||||
|
Err(_elapsed) => {
|
||||||
|
// timeout
|
||||||
|
warn!("geyser stream timeout on {} - retrying", grpc_source);
|
||||||
|
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue