diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs new file mode 100644 index 0000000..c7eb42d --- /dev/null +++ b/examples/stream_blocks_autoconnect.rs @@ -0,0 +1,145 @@ +use futures::{Stream, StreamExt}; +use log::info; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentConfig; +use std::env; +use std::pin::pin; + +use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{ + create_geyser_reconnecting_stream, +}; +use geyser_grpc_connector::grpcmultiplex_fastestwins::{ + create_multiplexed_stream, FromYellowstoneExtractor, +}; +use tokio::time::{sleep, Duration}; +use tracing::warn; +use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; +use yellowstone_grpc_proto::geyser::SubscribeUpdate; +use yellowstone_grpc_proto::prost::Message as _; +use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{create_geyser_autoconnection_task, Message}; +use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; + +fn start_example_blockmini_consumer( + multiplex_stream: impl Stream + Send + 'static, +) { + tokio::spawn(async move { + let mut blockmeta_stream = pin!(multiplex_stream); + while let Some(mini) = blockmeta_stream.next().await { + info!( + "emitted block mini #{}@{} with {} bytes from multiplexer", + mini.slot, mini.commitment_config.commitment, mini.blocksize + ); + } + }); +} + +pub struct BlockMini { + pub blocksize: usize, + pub slot: Slot, + pub commitment_config: CommitmentConfig, +} + +struct BlockMiniExtractor(CommitmentConfig); + +impl FromYellowstoneExtractor for BlockMiniExtractor { + type Target = BlockMini; + fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> { + match update.update_oneof { + Some(UpdateOneof::Block(update_block_message)) => { + let blocksize = update_block_message.encoded_len(); + let slot = update_block_message.slot; + let mini = BlockMini { + blocksize, + slot, + commitment_config: self.0, + }; + Some((slot, mini)) + } + Some(UpdateOneof::BlockMeta(update_blockmeta_message)) => { + let blocksize = update_blockmeta_message.encoded_len(); + let slot = update_blockmeta_message.slot; + let mini = BlockMini { + blocksize, + slot, + commitment_config: self.0, + }; + Some((slot, mini)) + } + _ => None, + } + } +} + +enum TestCases { + Basic, + SlowReceiver, +} + + +#[tokio::main] +pub async fn main() { + // RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace + tracing_subscriber::fmt::init(); + // console_subscriber::init(); + + let test_case = TestCases::SlowReceiver; + + let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green"); + let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok(); + + info!( + "Using grpc source on {} ({})", + grpc_addr_green, + grpc_x_token_green.is_some() + ); + + let timeouts = GrpcConnectionTimeouts { + connect_timeout: Duration::from_secs(5), + request_timeout: Duration::from_secs(5), + subscribe_timeout: Duration::from_secs(5), + }; + + let green_config = + GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); + + info!("Write Block stream.."); + + let (jh_geyser_task, mut green_stream) = create_geyser_autoconnection_task( + green_config.clone(), + GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), + ); + + tokio::spawn(async move { + + if let TestCases::SlowReceiver = test_case { + sleep(Duration::from_secs(5)).await; + } + + while let Some(message) = green_stream.recv().await { + match message { + Message::GeyserSubscribeUpdate(subscriber_update) => { + // info!("got update: {:?}", subscriber_update.update_oneof.); + info!("got update!!!"); + } + Message::Connecting(attempt) => { + warn!("Connection attempt: {}", attempt); + } + } + } + warn!("Stream aborted"); + }); + + // let green_stream = create_geyser_reconnecting_stream( + // green_config.clone(), + // GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), + // // GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), + // ); + // let multiplex_stream = create_multiplexed_stream( + // vec![green_stream], + // BlockMiniExtractor(CommitmentConfig::confirmed()), + // ); + // start_example_blockmini_consumer(multiplex_stream); + + // "infinite" sleep + sleep(Duration::from_secs(1800)).await; +} diff --git a/examples/stream_blocks_single.rs b/examples/stream_blocks_single.rs index 2655598..e490c7c 100644 --- a/examples/stream_blocks_single.rs +++ b/examples/stream_blocks_single.rs @@ -5,9 +5,7 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::env; use std::pin::pin; -use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{ - create_geyser_reconnecting_stream, -}; +use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{create_geyser_reconnecting_stream, Message}; use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, }; @@ -16,7 +14,6 @@ use tracing::warn; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::prost::Message as _; -use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{create_geyser_autoconnection_task, Message}; use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig}; fn start_example_blockmini_consumer( @@ -96,13 +93,15 @@ pub async fn main() { info!("Write Block stream.."); - let (jh_geyser_task, mut green_stream) = create_geyser_autoconnection_task( + let green_stream= create_geyser_reconnecting_stream( green_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), ); + tokio::spawn(async move { - while let Some(message) = green_stream.recv().await { + let mut green_stream = pin!(green_stream); + while let Some(message) = green_stream.next().await { match message { Message::GeyserSubscribeUpdate(subscriber_update) => { // info!("got update: {:?}", subscriber_update.update_oneof.); diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 3fdfc93..288c469 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -71,7 +71,7 @@ pub fn create_geyser_autoconnection_task( let jh_geyser_task = tokio::spawn(async move { let mut state = NotConnected(0); - let mut messages_forwared = 0; + let mut messages_forwarded = 0; loop { state = match state { @@ -206,12 +206,19 @@ pub fn create_geyser_autoconnection_task( // backpressure - should'n we block here? // TODO extract timeout param; TODO respect startup // emit warning if message not received - let warning_threshold = if messages_forwared < 1 { Duration::from_millis(5000) } else { Duration::from_millis(500) }; + // note: first send never blocks + let warning_threshold = if messages_forwarded == 1 { Duration::from_millis(3000) } else { Duration::from_millis(500) }; let started_at = Instant::now(); match sender.send_timeout(Message::GeyserSubscribeUpdate(Box::new(update_message)), warning_threshold).await { Ok(()) => { - messages_forwared += 1; - trace!("sent update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32() * 1000.0); + messages_forwarded += 1; + if messages_forwarded == 1 { + // note: first send never blocks - do not print time as this is a lie + trace!("queued first update message"); + } else { + trace!("queued update message {} in {:.02}ms", + messages_forwarded, started_at.elapsed().as_secs_f32() * 1000.0); + } continue 'recv_loop; } Err(SendTimeoutError::Timeout(the_message)) => { @@ -219,8 +226,9 @@ pub fn create_geyser_autoconnection_task( match sender.send(the_message).await { Ok(()) => { - messages_forwared += 1; - trace!("sent delayed update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32() * 1000.0); + messages_forwarded += 1; + trace!("queued delayed update message {} in {:.02}ms", + messages_forwarded, started_at.elapsed().as_secs_f32() * 1000.0); } Err(_send_error ) => { warn!("downstream receiver closed, message is lost - aborting");