From 87a4bbc4848b635d805e1901cde4ad69ad580237 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Fri, 19 Jan 2024 11:46:19 +0100 Subject: [PATCH] use channel_plugger in test --- examples/stream_blocks_autoconnect.rs | 6 +++++- src/lib.rs | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs index 6b8d2a0..bb0be04 100644 --- a/examples/stream_blocks_autoconnect.rs +++ b/examples/stream_blocks_autoconnect.rs @@ -18,6 +18,7 @@ use tracing::warn; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; use yellowstone_grpc_proto::prost::Message as _; +use geyser_grpc_connector::channel_plugger::spawn_plugger_mpcs_to_broadcast; fn start_example_blockmini_consumer( multiplex_stream: impl Stream + Send + 'static, @@ -106,10 +107,13 @@ pub async fn main() { info!("Write Block stream.."); + let (broadcast_tx, broadcast_rx) = tokio::sync::broadcast::channel(100); let (jh_geyser_task, mut message_channel) = create_geyser_autoconnection_task( green_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_and_txs(), ); + spawn_plugger_mpcs_to_broadcast(message_channel, broadcast_tx); + let mut message_channel = broadcast_rx; tokio::spawn(async move { if let TestCases::SlowReceiverStartup = TEST_CASE { @@ -117,7 +121,7 @@ pub async fn main() { } let mut message_count = 0; - while let Some(message) = message_channel.recv().await { + while let Ok(message) = message_channel.recv().await { if let TestCases::AbortTaskFromOutside = TEST_CASE { if message_count > 5 { info!("(testcase) aborting task from outside"); diff --git a/src/lib.rs b/src/lib.rs index 358d941..9e66b62 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,11 +8,11 @@ use yellowstone_grpc_proto::geyser::{ }; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; -mod channel_plugger; pub mod grpc_subscription; pub mod grpc_subscription_autoreconnect_streams; pub mod grpc_subscription_autoreconnect_tasks; pub mod grpcmultiplex_fastestwins; +pub mod channel_plugger; mod obfuscate; #[derive(Clone, Debug)]