This commit is contained in:
GroovieGermanikus 2024-01-18 08:45:55 +01:00
parent 0aa9f4abaf
commit 24add74ebc
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
2 changed files with 102 additions and 4 deletions

View File

@ -3,12 +3,13 @@
// rpc_polling::vote_accounts_and_cluster_info_polling::poll_vote_accounts_and_cluster_info,
// };
use anyhow::{bail, Context};
use futures::StreamExt;
use futures::{Stream, StreamExt};
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use tokio::sync::broadcast::Sender;
use tokio::sync::broadcast::{Receiver, Sender};
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::SubscribeRequest;
use yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks,
SubscribeUpdateBlock,

View File

@ -1,19 +1,25 @@
use async_stream::stream;
use futures::{Stream, StreamExt};
use log::{debug, info, log, trace, warn, Level};
use log::{debug, info, log, trace, warn, Level, error};
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::time::Duration;
use futures::channel::mpsc;
use tokio::sync::broadcast::Receiver;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult};
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate,
};
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
use yellowstone_grpc_proto::tonic;
use yellowstone_grpc_proto::tonic::codegen::http::uri::InvalidUri;
use yellowstone_grpc_proto::tonic::metadata::errors::InvalidMetadataValue;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use yellowstone_grpc_proto::tonic::Status;
use crate::grpc_subscription_autoreconnect::TheState::{Connected, FatalError, NotConnected, RecoverableConnectionError};
#[derive(Clone, Debug)]
pub struct GrpcConnectionTimeouts {
@ -264,6 +270,97 @@ pub fn create_geyser_reconnecting_stream(
the_stream
}
enum TheState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
NotConnected(Attempt),
RecoverableConnectionError(Attempt),
FatalError(Attempt),
Connected(Attempt, S),
}
pub fn create_geyser_reconnecting_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
) -> Receiver<Message> {
let (tx, rx) = tokio::sync::broadcast::channel::<Message>(1000);
let geyser_task = tokio::spawn(async move {
let mut attempt = 1;
let mut state = NotConnected(0);
loop {
state = match state {
NotConnected(_) => {
let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone();
let connect_timeout = grpc_source.timeouts.as_ref().map(|t| t.connect_timeout);
let request_timeout = grpc_source.timeouts.as_ref().map(|t| t.request_timeout);
let subscribe_timeout = grpc_source.timeouts.as_ref().map(|t| t.subscribe_timeout);
let subscribe_filter = subscribe_filter.clone();
log!(if attempt > 1 { Level::Warn } else { Level::Debug }, "Connecting attempt #{} to {}", attempt, addr);
let connect_result = GeyserGrpcClient::connect_with_timeout(
addr, token, config,
connect_timeout,
request_timeout,
false)
.await;
let mut client = connect_result?;
debug!("Subscribe with filter {:?}", subscribe_filter);
let subscribe_result = timeout(subscribe_timeout.unwrap_or(Duration::MAX),
client
.subscribe_once2(subscribe_filter))
.await;
// maybe not optimal
let subscribe_result = subscribe_result.map_err(|_| Status::unknown("unspecific subscribe timeout"))?;
match subscribe_result {
Ok(geyser_stream) => {
Connected(geyser_stream)
}
Err(GeyserGrpcClientError::TonicError(_)) => {
warn!("! subscribe failed on {} - retrying", grpc_source);
RecoverableConnectionError(attempt)
}
Err(GeyserGrpcClientError::TonicStatus(_)) => {
warn!("! subscribe failed on {} - retrying", grpc_source);
RecoverableConnectionError(attempt)
}
// non-recoverable
Err(unrecoverable_error) => {
error!("! subscribe to {} failed with unrecoverable error: {}", grpc_source, unrecoverable_error);
FatalError(attempt)
}
}
}
RecoverableConnectionError(attempt) => {
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
info!("! waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source);
sleep(Duration::from_secs_f32(backoff_secs)).await;
}
FatalError(_) => {
// TOOD what to do
panic!("Fatal error")
}
}
}
});
rx
}
#[cfg(test)]
mod tests {
use super::*;