compiles
This commit is contained in:
parent
fa05a1bd5f
commit
c9144ee39f
|
@ -4,11 +4,13 @@ use log::{debug, info, log, trace, warn, Level, error};
|
|||
use solana_sdk::commitment_config::CommitmentConfig;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
use futures::channel::mpsc;
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{sleep, timeout};
|
||||
use tokio::time::{sleep, timeout, Timeout};
|
||||
use tokio::time::error::Elapsed;
|
||||
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult};
|
||||
use yellowstone_grpc_proto::geyser::{
|
||||
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate,
|
||||
|
@ -18,9 +20,10 @@ use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
|
|||
use yellowstone_grpc_proto::tonic;
|
||||
use yellowstone_grpc_proto::tonic::codegen::http::uri::InvalidUri;
|
||||
use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue;
|
||||
use yellowstone_grpc_proto::tonic::service::Interceptor;
|
||||
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
|
||||
use yellowstone_grpc_proto::tonic::Status;
|
||||
use crate::grpc_subscription_autoreconnect::TheState::{Connected, FatalError, NotConnected, RecoverableConnectionError};
|
||||
use crate::grpc_subscription_autoreconnect::TheState::*;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct GrpcConnectionTimeouts {
|
||||
|
@ -274,12 +277,15 @@ pub fn create_geyser_reconnecting_stream(
|
|||
}
|
||||
|
||||
|
||||
enum TheState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
|
||||
enum TheState<S: Stream<Item = Result<SubscribeUpdate, Status>>, F: Interceptor> {
|
||||
NotConnected(Attempt),
|
||||
// Connected(Attempt, Box<Pin<GeyserGrpcClient<F>>>),
|
||||
Connected(Attempt, GeyserGrpcClient<F>),
|
||||
Ready(Attempt, S),
|
||||
// error states
|
||||
RecoverableConnectionError(Attempt),
|
||||
FatalError(Attempt),
|
||||
Connected(Attempt, S),
|
||||
|
||||
WaitReconnect(Attempt),
|
||||
}
|
||||
|
||||
|
||||
|
@ -313,21 +319,43 @@ pub fn create_geyser_reconnecting_task(
|
|||
false)
|
||||
.await;
|
||||
|
||||
let mut client = connect_result?;
|
||||
|
||||
match connect_result {
|
||||
Ok(client) => {
|
||||
Connected(attempt, client)
|
||||
}
|
||||
Err(_) => {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Connected(attempt, mut client) => {
|
||||
let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
|
||||
let subscribe_filter = subscribe_filter.clone();
|
||||
debug!("Subscribe with filter {:?}", subscribe_filter);
|
||||
|
||||
let subscribe_result = timeout(subscribe_timeout.unwrap_or(Duration::MAX),
|
||||
client
|
||||
.subscribe_once2(subscribe_filter))
|
||||
let subscribe_result_timeout =
|
||||
timeout(subscribe_timeout.unwrap_or(Duration::MAX),
|
||||
client.subscribe_once2(subscribe_filter))
|
||||
.await;
|
||||
|
||||
let subscribe_result;
|
||||
match subscribe_result_timeout.map_err(|_| Status::unknown("unspecific subscribe timeout")) {
|
||||
Ok(fooo) => {
|
||||
subscribe_result = fooo;
|
||||
}
|
||||
Err(_elapsed) => {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
// maybe not optimal
|
||||
let subscribe_result = subscribe_result.map_err(|_| Status::unknown("unspecific subscribe timeout"))?;
|
||||
// let subscribe_result = subscribe_result_timeout.map_err(|_| Status::unknown("unspecific subscribe timeout"));
|
||||
|
||||
match subscribe_result {
|
||||
Ok(geyser_stream) => {
|
||||
Connected(attempt, geyser_stream)
|
||||
Ready(attempt, geyser_stream)
|
||||
}
|
||||
Err(GeyserGrpcClientError::TonicError(_)) => {
|
||||
warn!("! subscribe failed on {} - retrying", grpc_source);
|
||||
|
@ -354,21 +382,27 @@ pub fn create_geyser_reconnecting_task(
|
|||
// TOOD what to do
|
||||
panic!("Fatal error")
|
||||
}
|
||||
Connected(attempt, mut geyser_stream) => {
|
||||
TheState::WaitReconnect(attempt) => {
|
||||
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
|
||||
info!("! waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source);
|
||||
sleep(Duration::from_secs_f32(backoff_secs)).await;
|
||||
TheState::NotConnected(attempt)
|
||||
}
|
||||
Ready(attempt, mut geyser_stream) => {
|
||||
match geyser_stream.next().await {
|
||||
Some(Ok(update_message)) => {
|
||||
trace!("> recv update message from {}", grpc_source);
|
||||
(ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message)))
|
||||
TheState::Ready(attempt, geyser_stream)
|
||||
}
|
||||
Some(Err(tonic_status)) => {
|
||||
// ATM we consider all errors recoverable
|
||||
warn!("! error on {} - retrying: {:?}", grpc_source, tonic_status);
|
||||
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
|
||||
TheState::WaitReconnect(attempt)
|
||||
}
|
||||
None => {
|
||||
// should not arrive here, Mean the stream close.
|
||||
warn!("geyser stream closed on {} - retrying", grpc_source);
|
||||
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
|
||||
TheState::WaitReconnect(attempt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue