diff --git a/examples/stream_blocks_mainnet.rs b/examples/stream_blocks_mainnet.rs index c7017fb..157b6a6 100644 --- a/examples/stream_blocks_mainnet.rs +++ b/examples/stream_blocks_mainnet.rs @@ -5,17 +5,17 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::pin::pin; use geyser_grpc_connector::experimental::mock_literpc_core::{map_produced_block, ProducedBlock}; -use geyser_grpc_connector::grpc_subscription_autoreconnect::GrpcSourceConfig; +use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, GrpcSourceConfig}; use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, FromYellowstoneMapper}; use tokio::time::{sleep, Duration}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; -fn start_example_consumer(block_stream: impl Stream + Send + 'static) { +fn start_example_consumer(multiplex_stream: impl Stream + Send + 'static) { tokio::spawn(async move { - let mut block_stream = pin!(block_stream); + let mut block_stream = pin!(multiplex_stream); while let Some(block) = block_stream.next().await { - info!("received block #{}", block.slot,); + info!("received block #{} from multiplexer", block.slot,); } }); } @@ -37,7 +37,7 @@ impl FromYellowstoneMapper for ExtractBlock { #[tokio::main] pub async fn main() { - // RUST_LOG=info,grpc_using_streams=debug + // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace tracing_subscriber::fmt::init(); // console_subscriber::init(); @@ -56,10 +56,13 @@ pub async fn main() { let toxiproxy_config = GrpcSourceConfig::new("toxiproxy".to_string(), grpc_addr_mainnet_triton_toxi, None); + let green_stream = create_geyser_reconnecting_stream(green_config.clone(), CommitmentConfig::finalized()); + let blue_stream = create_geyser_reconnecting_stream(blue_config.clone(), CommitmentConfig::finalized()); + let toxiproxy_stream = create_geyser_reconnecting_stream(toxiproxy_config.clone(), CommitmentConfig::finalized()); let multiplex_stream = create_multiplex( - vec![green_config, blue_config, toxiproxy_config], + vec![green_stream, blue_stream, toxiproxy_stream], CommitmentConfig::finalized(), - ExtractBlock(CommitmentConfig::confirmed()), + ExtractBlock(CommitmentConfig::finalized()), ); start_example_consumer(multiplex_stream); diff --git a/src/grpc_subscription_autoreconnect.rs b/src/grpc_subscription_autoreconnect.rs index 41fb8b0..70909d5 100644 --- a/src/grpc_subscription_autoreconnect.rs +++ b/src/grpc_subscription_autoreconnect.rs @@ -4,6 +4,7 @@ use log::{debug, info, trace, warn}; use solana_sdk::commitment_config::CommitmentConfig; use std::collections::HashMap; use std::pin::Pin; +use std::sync::atomic::{AtomicI32, Ordering}; use tokio::task::JoinHandle; use tokio::time::{sleep, Duration}; use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult}; @@ -67,15 +68,17 @@ pub fn create_geyser_reconnecting_stream( _ => panic!("Only CONFIRMED and FINALIZED is supported/suggested"), }; - // NOT_CONNECTED; CONNECTING let mut state = ConnectionState::NotConnected; + let connection_attempts = AtomicI32::new(0); // in case of cancellation, we restart from here: // thus we want to keep the progression in a state object outside the stream! makro stream! { - loop{ + loop { let yield_value; + (state, yield_value) = match state { + ConnectionState::NotConnected => { let connection_task = tokio::spawn({ @@ -83,6 +86,7 @@ pub fn create_geyser_reconnecting_stream( let token = grpc_source.grpc_x_token.clone(); let config = grpc_source.tls_config.clone(); // let (block_filter, blockmeta_filter) = blocks_filters.clone(); + info!("Connecting attempt #{} to {}", connection_attempts.fetch_add(1, Ordering::Relaxed), addr); async move { let connect_result = GeyserGrpcClient::connect_with_timeout( @@ -128,6 +132,7 @@ pub fn create_geyser_reconnecting_stream( (ConnectionState::Connecting(connection_task), None) } + ConnectionState::Connecting(connection_task) => { let subscribe_result = connection_task.await; @@ -144,11 +149,12 @@ pub fn create_geyser_reconnecting_stream( } } + ConnectionState::Ready(mut geyser_stream) => { match geyser_stream.next().await { Some(Ok(update_message)) => { - trace!("> update message on {}", label); + trace!("> recv update message from {}", label); (ConnectionState::Ready(geyser_stream), Some(update_message)) } Some(Err(tonic_status)) => { @@ -164,12 +170,15 @@ pub fn create_geyser_reconnecting_stream( } } + ConnectionState::WaitReconnect => { - // TODO implement backoff + info!("Waiting a bit, then connect to {}", label); sleep(Duration::from_secs(1)).await; (ConnectionState::NotConnected, None) } + }; // -- match + yield yield_value } diff --git a/src/grpcmultiplex_fastestwins.rs b/src/grpcmultiplex_fastestwins.rs index 5f6e7f3..3cd6a01 100644 --- a/src/grpcmultiplex_fastestwins.rs +++ b/src/grpcmultiplex_fastestwins.rs @@ -13,14 +13,15 @@ pub trait FromYellowstoneMapper { fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)>; } -pub fn create_multiplex( - // TODO provide list of streams - grpc_sources: Vec, +/// use streams created by ``create_geyser_reconnecting_stream`` +/// note: this is agnostic to the type of the stream +pub fn create_multiplex( + grpc_source_streams: Vec>>, commitment_config: CommitmentConfig, - extractor: E, -) -> impl Stream + mapper: M, +) -> impl Stream where - E: FromYellowstoneMapper, + M: FromYellowstoneMapper, { assert!( commitment_config == CommitmentConfig::confirmed() @@ -29,29 +30,23 @@ where ); // note: PROCESSED blocks are not sequential in presense of forks; this will break the logic - if grpc_sources.is_empty() { + if grpc_source_streams.is_empty() { panic!("Must have at least one source"); } info!( - "Starting multiplexer with {} sources: {}", - grpc_sources.len(), - grpc_sources - .iter() - .map(|source| source.label.clone()) - .join(", ") + "Starting multiplexer with {} sources", + grpc_source_streams.len(), ); + // use merge let mut futures = futures::stream::SelectAll::new(); - for grpc_source in grpc_sources { - futures.push(Box::pin(create_geyser_reconnecting_stream( - grpc_source.clone(), - commitment_config, - ))); + for grpc_source in grpc_source_streams { + futures.push(Box::pin(grpc_source)); } - map_updates(futures, extractor) + map_updates(futures, mapper) } fn map_updates(geyser_stream: S, mapper: E) -> impl Stream