handle timeout
This commit is contained in:
parent
f22a0f289e
commit
ec43c2972e
|
@ -1,37 +1,29 @@
|
|||
use crate::{GrpcSourceConfig, Message};
|
||||
use anyhow::bail;
|
||||
use futures::{Stream, StreamExt};
|
||||
use log::{debug, error, info, log, trace, warn, Level};
|
||||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc::error::{SendError, SendTimeoutError};
|
||||
use tokio::sync::mpsc::error::SendTimeoutError;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::task::{AbortHandle, JoinHandle};
|
||||
use tokio::task::AbortHandle;
|
||||
use tokio::time::{sleep, timeout, Instant};
|
||||
use tokio::time::error::Elapsed;
|
||||
use tokio::time::{sleep, timeout, Instant, Timeout};
|
||||
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult};
|
||||
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
||||
use yellowstone_grpc_proto::geyser::{
|
||||
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate,
|
||||
};
|
||||
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
|
||||
use yellowstone_grpc_proto::tonic;
|
||||
use yellowstone_grpc_proto::tonic::codegen::http::uri::InvalidUri;
|
||||
use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue;
|
||||
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError};
|
||||
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
|
||||
use yellowstone_grpc_proto::tonic::service::Interceptor;
|
||||
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
|
||||
use yellowstone_grpc_proto::tonic::{Code, Status};
|
||||
use yellowstone_grpc_proto::tonic::Status;
|
||||
|
||||
type Attempt = u32;
|
||||
|
||||
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
|
||||
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
|
||||
NotConnected(Attempt),
|
||||
Connecting(Attempt, JoinHandle<GeyserGrpcClientResult<S>>),
|
||||
Connected(Attempt, GeyserGrpcClient<F>),
|
||||
Ready(Attempt, S),
|
||||
// error states
|
||||
RecoverableConnectionError(Attempt),
|
||||
// non-recoverable error
|
||||
FatalError(Attempt, FatalErrorReason),
|
||||
WaitReconnect(Attempt),
|
||||
}
|
||||
|
||||
|
@ -44,17 +36,6 @@ enum FatalErrorReason {
|
|||
Misc,
|
||||
}
|
||||
|
||||
enum State<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
|
||||
NotConnected(Attempt),
|
||||
Connected(Attempt, GeyserGrpcClient<F>),
|
||||
Ready(Attempt, S),
|
||||
// error states
|
||||
RecoverableConnectionError(Attempt),
|
||||
// non-recoverable error
|
||||
FatalError(Attempt, FatalErrorReason),
|
||||
WaitReconnect(Attempt),
|
||||
}
|
||||
|
||||
/// connect to grpc source performing autoconect if required,
|
||||
/// returns mpsc channel; task will abort on fatal error
|
||||
///
|
||||
|
@ -70,12 +51,12 @@ pub fn create_geyser_autoconnection_task(
|
|||
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);
|
||||
|
||||
let jh_geyser_task = tokio::spawn(async move {
|
||||
let mut state = State::NotConnected(0);
|
||||
let mut state = ConnectionState::NotConnected(0);
|
||||
let mut messages_forwarded = 0;
|
||||
|
||||
loop {
|
||||
state = match state {
|
||||
State::NotConnected(mut attempt) => {
|
||||
ConnectionState::NotConnected(mut attempt) => {
|
||||
attempt += 1;
|
||||
|
||||
let addr = grpc_source.grpc_addr.clone();
|
||||
|
@ -104,40 +85,47 @@ pub fn create_geyser_autoconnection_task(
|
|||
.await;
|
||||
|
||||
match connect_result {
|
||||
Ok(client) => State::Connected(attempt, client),
|
||||
Err(GeyserGrpcClientError::InvalidUri(_)) => {
|
||||
State::FatalError(attempt, FatalErrorReason::ConfigurationError)
|
||||
}
|
||||
Ok(client) => ConnectionState::Connected(attempt, client),
|
||||
Err(GeyserGrpcClientError::InvalidUri(_)) => ConnectionState::FatalError(
|
||||
attempt,
|
||||
FatalErrorReason::ConfigurationError,
|
||||
),
|
||||
Err(GeyserGrpcClientError::MetadataValueError(_)) => {
|
||||
State::FatalError(attempt, FatalErrorReason::ConfigurationError)
|
||||
ConnectionState::FatalError(
|
||||
attempt,
|
||||
FatalErrorReason::ConfigurationError,
|
||||
)
|
||||
}
|
||||
Err(GeyserGrpcClientError::InvalidXTokenLength(_)) => {
|
||||
State::FatalError(attempt, FatalErrorReason::ConfigurationError)
|
||||
ConnectionState::FatalError(
|
||||
attempt,
|
||||
FatalErrorReason::ConfigurationError,
|
||||
)
|
||||
}
|
||||
Err(GeyserGrpcClientError::TonicError(tonic_error)) => {
|
||||
warn!(
|
||||
"! connect failed on {} - aborting: {:?}",
|
||||
grpc_source, tonic_error
|
||||
);
|
||||
State::FatalError(attempt, FatalErrorReason::NetworkError)
|
||||
ConnectionState::FatalError(attempt, FatalErrorReason::NetworkError)
|
||||
}
|
||||
Err(GeyserGrpcClientError::TonicStatus(tonic_status)) => {
|
||||
warn!(
|
||||
"! connect failed on {} - retrying: {:?}",
|
||||
grpc_source, tonic_status
|
||||
);
|
||||
State::RecoverableConnectionError(attempt)
|
||||
ConnectionState::RecoverableConnectionError(attempt)
|
||||
}
|
||||
Err(GeyserGrpcClientError::SubscribeSendError(send_error)) => {
|
||||
warn!(
|
||||
"! connect failed with send error on {} - retrying: {:?}",
|
||||
grpc_source, send_error
|
||||
);
|
||||
State::RecoverableConnectionError(attempt)
|
||||
ConnectionState::RecoverableConnectionError(attempt)
|
||||
}
|
||||
}
|
||||
}
|
||||
State::Connected(attempt, mut client) => {
|
||||
ConnectionState::Connected(attempt, mut client) => {
|
||||
let subscribe_timeout =
|
||||
grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
|
||||
let subscribe_filter = subscribe_filter.clone();
|
||||
|
@ -152,14 +140,14 @@ pub fn create_geyser_autoconnection_task(
|
|||
match subscribe_result_timeout {
|
||||
Ok(subscribe_result) => {
|
||||
match subscribe_result {
|
||||
Ok(geyser_stream) => State::Ready(attempt, geyser_stream),
|
||||
Ok(geyser_stream) => ConnectionState::Ready(attempt, geyser_stream),
|
||||
Err(GeyserGrpcClientError::TonicError(_)) => {
|
||||
warn!("subscribe failed on {} - retrying", grpc_source);
|
||||
State::RecoverableConnectionError(attempt)
|
||||
ConnectionState::RecoverableConnectionError(attempt)
|
||||
}
|
||||
Err(GeyserGrpcClientError::TonicStatus(_)) => {
|
||||
warn!("subscribe failed on {} - retrying", grpc_source);
|
||||
State::RecoverableConnectionError(attempt)
|
||||
ConnectionState::RecoverableConnectionError(attempt)
|
||||
}
|
||||
// non-recoverable
|
||||
Err(unrecoverable_error) => {
|
||||
|
@ -167,7 +155,10 @@ pub fn create_geyser_autoconnection_task(
|
|||
"! subscribe to {} failed with unrecoverable error: {}",
|
||||
grpc_source, unrecoverable_error
|
||||
);
|
||||
State::FatalError(attempt, FatalErrorReason::SubscribeError)
|
||||
ConnectionState::FatalError(
|
||||
attempt,
|
||||
FatalErrorReason::SubscribeError,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -176,20 +167,20 @@ pub fn create_geyser_autoconnection_task(
|
|||
"! subscribe failed with timeout on {} - retrying",
|
||||
grpc_source
|
||||
);
|
||||
State::RecoverableConnectionError(attempt)
|
||||
ConnectionState::RecoverableConnectionError(attempt)
|
||||
}
|
||||
}
|
||||
}
|
||||
State::RecoverableConnectionError(attempt) => {
|
||||
ConnectionState::RecoverableConnectionError(attempt) => {
|
||||
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
|
||||
info!(
|
||||
"! waiting {} seconds, then reconnect to {}",
|
||||
backoff_secs, grpc_source
|
||||
);
|
||||
sleep(Duration::from_secs_f32(backoff_secs)).await;
|
||||
State::NotConnected(attempt)
|
||||
ConnectionState::NotConnected(attempt)
|
||||
}
|
||||
State::FatalError(_attempt, reason) => match reason {
|
||||
ConnectionState::FatalError(_attempt, reason) => match reason {
|
||||
FatalErrorReason::DownstreamChannelClosed => {
|
||||
warn!("downstream closed - aborting");
|
||||
return;
|
||||
|
@ -211,25 +202,22 @@ pub fn create_geyser_autoconnection_task(
|
|||
return;
|
||||
}
|
||||
},
|
||||
State::WaitReconnect(attempt) => {
|
||||
ConnectionState::WaitReconnect(attempt) => {
|
||||
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
|
||||
info!(
|
||||
"! waiting {} seconds, then reconnect to {}",
|
||||
backoff_secs, grpc_source
|
||||
);
|
||||
sleep(Duration::from_secs_f32(backoff_secs)).await;
|
||||
State::NotConnected(attempt)
|
||||
ConnectionState::NotConnected(attempt)
|
||||
}
|
||||
State::Ready(attempt, mut geyser_stream) => {
|
||||
ConnectionState::Ready(attempt, mut geyser_stream) => {
|
||||
let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);
|
||||
'recv_loop: loop {
|
||||
match geyser_stream.next().await {
|
||||
Some(Ok(update_message)) => {
|
||||
match timeout(receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next()).await {
|
||||
Ok(Some(Ok(update_message))) => {
|
||||
trace!("> recv update message from {}", grpc_source);
|
||||
// TODO consider extract this
|
||||
// backpressure - should'n we block here?
|
||||
// TODO extract timeout param; TODO respect startup
|
||||
// emit warning if message not received
|
||||
// note: first send never blocks
|
||||
// note: first send never blocks as the mpsc channel has capacity 1
|
||||
let warning_threshold = if messages_forwarded == 1 {
|
||||
Duration::from_millis(3000)
|
||||
} else {
|
||||
|
@ -271,7 +259,7 @@ pub fn create_geyser_autoconnection_task(
|
|||
}
|
||||
Err(_send_error) => {
|
||||
warn!("downstream receiver closed, message is lost - aborting");
|
||||
break 'recv_loop State::FatalError(
|
||||
break 'recv_loop ConnectionState::FatalError(
|
||||
attempt,
|
||||
FatalErrorReason::DownstreamChannelClosed,
|
||||
);
|
||||
|
@ -280,21 +268,25 @@ pub fn create_geyser_autoconnection_task(
|
|||
}
|
||||
Err(SendTimeoutError::Closed(_)) => {
|
||||
warn!("downstream receiver closed - aborting");
|
||||
break 'recv_loop State::FatalError(
|
||||
break 'recv_loop ConnectionState::FatalError(
|
||||
attempt,
|
||||
FatalErrorReason::DownstreamChannelClosed,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Err(tonic_status)) => {
|
||||
Ok(Some(Err(tonic_status))) => {
|
||||
// all tonic errors are recoverable
|
||||
warn!("error on {} - retrying: {:?}", grpc_source, tonic_status);
|
||||
break 'recv_loop State::WaitReconnect(attempt);
|
||||
break 'recv_loop ConnectionState::WaitReconnect(attempt);
|
||||
}
|
||||
None => {
|
||||
Ok(None) => {
|
||||
warn!("geyser stream closed on {} - retrying", grpc_source);
|
||||
break 'recv_loop State::WaitReconnect(attempt);
|
||||
break 'recv_loop ConnectionState::WaitReconnect(attempt);
|
||||
}
|
||||
Err(_elapsed) => {
|
||||
warn!("timeout on {} - retrying", grpc_source);
|
||||
break 'recv_loop ConnectionState::WaitReconnect(attempt);
|
||||
}
|
||||
}
|
||||
} // -- end loop
|
||||
|
|
Loading…
Reference in New Issue