diff --git a/examples/stream_blocks_single.rs b/examples/stream_blocks_single.rs index 2d7ee51..f1d01f9 100644 --- a/examples/stream_blocks_single.rs +++ b/examples/stream_blocks_single.rs @@ -5,9 +5,7 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::env; use std::pin::pin; -use geyser_grpc_connector::grpc_subscription_autoreconnect::{ - create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, -}; +use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, create_geyser_reconnecting_task, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, }; @@ -92,16 +90,32 @@ pub async fn main() { GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); info!("Write Block stream.."); - let green_stream = create_geyser_reconnecting_stream( + + let (jh_geyser_task, mut green_stream) = create_geyser_reconnecting_task( green_config.clone(), - GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), - // GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), + GeyserFilter(CommitmentConfig::confirmed()).blocks(), ); - let multiplex_stream = create_multiplexed_stream( - vec![green_stream], - BlockMiniExtractor(CommitmentConfig::confirmed()), - ); - start_example_blockmini_consumer(multiplex_stream); + + tokio::spawn(async move { + while let Some(mini) = green_stream.recv().await { + info!( + "emitted block mini #{}@{} with {} bytes from multiplexer", + mini.slot, mini.commitment_config.commitment, mini.blocksize + ); + } + }); + + + // let green_stream = create_geyser_reconnecting_stream( + // green_config.clone(), + // GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), + // // GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), + // ); + // let multiplex_stream = create_multiplexed_stream( + // vec![green_stream], + // BlockMiniExtractor(CommitmentConfig::confirmed()), + // ); + // start_example_blockmini_consumer(multiplex_stream); // "infinite" sleep sleep(Duration::from_secs(1800)).await; diff --git a/src/grpc_subscription_autoreconnect.rs b/src/grpc_subscription_autoreconnect.rs index 84b6dbf..d3adfae 100644 --- a/src/grpc_subscription_autoreconnect.rs +++ b/src/grpc_subscription_autoreconnect.rs @@ -13,6 +13,7 @@ use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrp use yellowstone_grpc_proto::geyser::{ CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeUpdate, }; +use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::prelude::SubscribeRequestFilterBlocksMeta; use yellowstone_grpc_proto::tonic; use yellowstone_grpc_proto::tonic::codegen::http::uri::InvalidUri; @@ -80,6 +81,8 @@ impl GrpcSourceConfig { type Attempt = u32; // wraps payload and status messages +// clone is required by broacast channel +#[derive(Clone)] pub enum Message { GeyserSubscribeUpdate(Box), // connect (attempt=1) or reconnect(attempt=2..) @@ -283,18 +286,18 @@ enum TheState>> { pub fn create_geyser_reconnecting_task( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, -) -> Receiver { +) -> (JoinHandle<()>, Receiver) { let (tx, rx) = tokio::sync::broadcast::channel::(1000); - let geyser_task = tokio::spawn(async move { - let mut attempt = 1; - + let jh_geyser_task = tokio::spawn(async move { let mut state = NotConnected(0); loop { state = match state { - NotConnected(_) => { + NotConnected(mut attempt) => { + attempt += 1; + let addr = grpc_source.grpc_addr.clone(); let token = grpc_source.grpc_x_token.clone(); let config = grpc_source.tls_config.clone(); @@ -309,6 +312,7 @@ pub fn create_geyser_reconnecting_task( request_timeout, false) .await; + let mut client = connect_result?; debug!("Subscribe with filter {:?}", subscribe_filter); @@ -323,7 +327,7 @@ pub fn create_geyser_reconnecting_task( match subscribe_result { Ok(geyser_stream) => { - Connected(geyser_stream) + Connected(attempt, geyser_stream) } Err(GeyserGrpcClientError::TonicError(_)) => { warn!("! subscribe failed on {} - retrying", grpc_source); @@ -344,11 +348,30 @@ pub fn create_geyser_reconnecting_task( let backoff_secs = 1.5_f32.powi(attempt as i32).min(15.0); info!("! waiting {} seconds, then reconnect to {}", backoff_secs, grpc_source); sleep(Duration::from_secs_f32(backoff_secs)).await; + NotConnected(attempt) } FatalError(_) => { // TOOD what to do panic!("Fatal error") } + Connected(attempt, mut geyser_stream) => { + match geyser_stream.next().await { + Some(Ok(update_message)) => { + trace!("> recv update message from {}", grpc_source); + (ConnectionState::Ready(attempt, geyser_stream), Message::GeyserSubscribeUpdate(Box::new(update_message))) + } + Some(Err(tonic_status)) => { + // ATM we consider all errors recoverable + warn!("! error on {} - retrying: {:?}", grpc_source, tonic_status); + (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) + } + None => { + // should not arrive here, Mean the stream close. + warn!("geyser stream closed on {} - retrying", grpc_source); + (ConnectionState::WaitReconnect(attempt), Message::Connecting(attempt)) + } + } + } } } @@ -356,7 +379,7 @@ pub fn create_geyser_reconnecting_task( }); - rx + (jh_geyser_task, rx) }