From 6e15250b96637eea66c2054a7a0576bcfd3ee1b0 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 22 Jan 2024 10:55:03 +0100 Subject: [PATCH] minor cleanup --- examples/stream_blocks_autoconnect.rs | 18 +++++++++--------- src/grpc_subscription_autoreconnect_tasks.rs | 15 --------------- 2 files changed, 9 insertions(+), 24 deletions(-) diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs index bb0be04..4e21eb3 100644 --- a/examples/stream_blocks_autoconnect.rs +++ b/examples/stream_blocks_autoconnect.rs @@ -5,6 +5,9 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::env; use std::pin::pin; +use geyser_grpc_connector::channel_plugger::{ + spawn_broadcast_channel_plug, spawn_plugger_mpcs_to_broadcast, +}; use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream; use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{ create_geyser_autoconnection_task, Message, @@ -18,7 +21,6 @@ use tracing::warn; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::prost::Message as _; -use geyser_grpc_connector::channel_plugger::spawn_plugger_mpcs_to_broadcast; fn start_example_blockmini_consumer( multiplex_stream: impl Stream + Send + 'static, @@ -79,7 +81,7 @@ enum TestCases { CloseAfterReceiving, AbortTaskFromOutside, } -const TEST_CASE: TestCases = TestCases::TemporaryLaggingReceiver; +const TEST_CASE: TestCases = TestCases::Basic; #[tokio::main] pub async fn main() { @@ -107,13 +109,12 @@ pub async fn main() { info!("Write Block stream.."); - let (broadcast_tx, broadcast_rx) = tokio::sync::broadcast::channel(100); - let (jh_geyser_task, mut message_channel) = create_geyser_autoconnection_task( + let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task( green_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), ); - spawn_plugger_mpcs_to_broadcast(message_channel, broadcast_tx); - let mut message_channel = broadcast_rx; + let mut message_channel = + spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel); tokio::spawn(async move { if let TestCases::SlowReceiverStartup = TEST_CASE { @@ -131,8 +132,7 @@ pub async fn main() { match message { Message::GeyserSubscribeUpdate(subscriber_update) => { message_count += 1; - // info!("got update: {:?}", subscriber_update.update_oneof.); - info!("got update!!!"); + info!("got update - {} bytes", subscriber_update.encoded_len()); if let TestCases::CloseAfterReceiving = TEST_CASE { info!("(testcase) closing stream after receiving"); @@ -155,5 +155,5 @@ pub async fn main() { }); // "infinite" sleep - sleep(Duration::from_secs(1800)).await; + sleep(Duration::from_secs(2000)).await; } diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index b21411e..52c61a5 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -295,21 +295,6 @@ pub fn create_geyser_autoconnection_task( ); } } - // { - // Ok(n_subscribers) => { - // trace!( - // "sent update message to {} subscribers (buffer={})", - // n_subscribers, - // sender.len() - // ); - // continue 'recv_loop; - // } - // Err(SendError(_)) => { - // // note: error does not mean that future sends will also fail! - // trace!("no subscribers for update message"); - // continue 'recv_loop; - // } - // }; } Some(Err(tonic_status)) => { // all tonic errors are recoverable