From b3808da95e98a36475e3bec6e72ea32e692ff4fe Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Mon, 22 Jan 2024 10:56:07 +0100 Subject: [PATCH] example --- src/channel_plugger.rs | 23 +++++++++++++++++++++-- src/lib.rs | 2 +- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/channel_plugger.rs b/src/channel_plugger.rs index 76c316b..83baf7f 100644 --- a/src/channel_plugger.rs +++ b/src/channel_plugger.rs @@ -4,13 +4,26 @@ use tokio::sync::broadcast::error::RecvError; use tokio::sync::mpsc::error::SendTimeoutError; use tokio::time::{sleep, timeout}; +/// usage: see plug_pattern test +pub fn spawn_broadcast_channel_plug( + downstream_broadcast: ( + tokio::sync::broadcast::Sender, + tokio::sync::broadcast::Receiver, + ), + upstream: tokio::sync::mpsc::Receiver, +) -> tokio::sync::broadcast::Receiver { + spawn_plugger_mpcs_to_broadcast(upstream, downstream_broadcast.0); + downstream_broadcast.1 +} +/// note: backpressure will NOT get propagated to upstream pub fn spawn_plugger_mpcs_to_broadcast( mut upstream: tokio::sync::mpsc::Receiver, downstream: tokio::sync::broadcast::Sender, + // TODO allow multiple downstreams + fanout ) { // abort forwarder by closing the sender - let _donothing = tokio::spawn(async move { + let _private_handler = tokio::spawn(async move { while let Some(value) = upstream.recv().await { match downstream.send(value) { Ok(n_subscribers) => { @@ -26,11 +39,17 @@ pub fn spawn_plugger_mpcs_to_broadcast( }); } - #[cfg(test)] mod tests { use super::*; + #[tokio::test] + async fn plug_pattern() { + let (jh_task, message_channel) = tokio::sync::mpsc::channel::(1); + let broadcast_rx = + spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel); + } + #[tokio::test] async fn connect_broadcast_to_mpsc() { solana_logger::setup_with_default("debug"); diff --git a/src/lib.rs b/src/lib.rs index 9e66b62..5de9710 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,11 +8,11 @@ use yellowstone_grpc_proto::geyser::{ }; use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig; +pub mod channel_plugger; pub mod grpc_subscription; pub mod grpc_subscription_autoreconnect_streams; pub mod grpc_subscription_autoreconnect_tasks; pub mod grpcmultiplex_fastestwins; -pub mod channel_plugger; mod obfuscate; #[derive(Clone, Debug)]