add connection state

This commit is contained in:
GroovieGermanikus 2024-01-18 10:09:32 +01:00
parent 24add74ebc
commit fa05a1bd5f
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
2 changed files with 55 additions and 18 deletions

View File

@ -5,9 +5,7 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::pin::pin;
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, create_geyser_reconnecting_task, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
@ -92,16 +90,32 @@ pub async fn main() {
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
info!("Write Block stream..");
let green_stream = create_geyser_reconnecting_stream(
let (jh_geyser_task, mut green_stream) = create_geyser_reconnecting_task(
green_config.clone(),
GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
// GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
GeyserFilter(CommitmentConfig::confirmed()).blocks(),
);
let multiplex_stream = create_multiplexed_stream(
vec![green_stream],
BlockMiniExtractor(CommitmentConfig::confirmed()),
);
start_example_blockmini_consumer(multiplex_stream);
tokio::spawn(async move {
while let Some(mini) = green_stream.recv().await {
info!(
"emitted block mini #{}@{} with {} bytes from multiplexer",
mini.slot, mini.commitment_config.commitment, mini.blocksize
);
}
});
// let green_stream = create_geyser_reconnecting_stream(
// green_config.clone(),
// GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(),
// // GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(),
// );
// let multiplex_stream = create_multiplexed_stream(
// vec![green_stream],
// BlockMiniExtractor(CommitmentConfig::confirmed()),
// );
// start_example_blockmini_consumer(multiplex_stream);
// "infinite" sleep
sleep(Duration::from_secs(1800)).await;

View File

@ -13,6 +13,7 @@ use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrp
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate,
};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta;
use yellowstone_grpc_proto::tonic;
use yellowstone_grpc_proto::tonic::codegen::http::uri::InvalidUri;
@ -80,6 +81,8 @@ impl GrpcSourceConfig {
type Attempt = u32;
// wraps payload and status messages
// clone is required by broacast channel
#[derive(Clone)]
pub enum Message {
GeyserSubscribeUpdate(Box<SubscribeUpdate>),
// connect (attempt=1) or reconnect(attempt=2..)
@ -283,18 +286,18 @@ enum TheState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
pub fn create_geyser_reconnecting_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
) -> Receiver<Message> {
) -> (JoinHandle<()>, Receiver<Message>) {
let (tx, rx) = tokio::sync::broadcast::channel::<Message>(1000);
let geyser_task = tokio::spawn(async move {
let mut attempt = 1;
let jh_geyser_task = tokio::spawn(async move {
let mut state = NotConnected(0);
loop {
state = match state {
NotConnected(_) => {
NotConnected(mut attempt) => {
attempt += 1;
let addr = grpc_source.grpc_addr.clone();
let token = grpc_source.grpc_x_token.clone();
let config = grpc_source.tls_config.clone();
@ -309,6 +312,7 @@ pub fn create_geyser_reconnecting_task(
request_timeout,
false)
.await;
let mut client = connect_result?;
debug!("Subscribe with filter {:?}", subscribe_filter);
@ -323,7 +327,7 @@ pub fn create_geyser_reconnecting_task(
match subscribe_result {
Ok(geyser_stream) => {
Connected(geyser_stream)
Connected(attempt, geyser_stream)
}
Err(GeyserGrpcClientError::TonicError(_)) => {
warn!("! subscribe failed on {} - retrying", grpc_source);
@ -344,11 +348,30 @@ pub fn create_geyser_reconnecting_task(
let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0);
info!("! waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source);
sleep(Duration::from_secs_f32(backoff_secs)).await;
NotConnected(attempt)
}
FatalError(_) => {
// TOOD what to do
panic!("Fatal error")
}
Connected(attempt, mut geyser_stream) => {
match geyser_stream.next().await {
Some(Ok(update_message)) => {
trace!("> recv update message from {}", grpc_source);
(ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message)))
}
Some(Err(tonic_status)) => {
// ATM we consider all errors recoverable
warn!("! error on {} - retrying: {:?}", grpc_source, tonic_status);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
}
None => {
// should not arrive here, Mean the stream close.
warn!("geyser stream closed on {} - retrying", grpc_source);
(ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt))
}
}
}
}
}
@ -356,7 +379,7 @@ pub fn create_geyser_reconnecting_task(
});
rx
(jh_geyser_task, rx)
}