diff --git a/examples/stream_blocks_autoconnect.rs b/examples/stream_blocks_autoconnect.rs index 02ddda6..09214be 100644 --- a/examples/stream_blocks_autoconnect.rs +++ b/examples/stream_blocks_autoconnect.rs @@ -2,8 +2,7 @@ use log::info; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use std::env; -use std::sync::Arc; -use tokio::sync::Notify; +use tokio::sync::broadcast; use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug; use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task; @@ -89,7 +88,7 @@ pub async fn main() { info!("Write Block stream.."); - let exit_notify = Arc::new(Notify::new()); + let (_, exit_notify) = broadcast::channel(1); let (jh_geyser_task, message_channel) = create_geyser_autoconnection_task( green_config.clone(), diff --git a/examples/stream_blocks_mainnet_task.rs b/examples/stream_blocks_mainnet_task.rs index 265bbdf..380a937 100644 --- a/examples/stream_blocks_mainnet_task.rs +++ b/examples/stream_blocks_mainnet_task.rs @@ -2,8 +2,6 @@ use log::{info, warn}; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use std::env; -use std::sync::Arc; -use tokio::sync::Notify; use base64::Engine; use itertools::Itertools; @@ -120,7 +118,7 @@ pub async fn main() { subscribe_timeout: Duration::from_secs(5), receive_timeout: Duration::from_secs(5), }; - let exit_notify = Arc::new(Notify::new()); + let (_, exit_notify) = tokio::sync::broadcast::channel(1); let green_config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone()); @@ -134,19 +132,19 @@ pub async fn main() { green_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), autoconnect_tx.clone(), - exit_notify.clone(), + exit_notify.resubscribe(), ); let _blue_stream_ah = create_geyser_autoconnection_task_with_mpsc( blue_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), autoconnect_tx.clone(), - exit_notify.clone(), + exit_notify.resubscribe(), ); let _toxiproxy_stream_ah = create_geyser_autoconnection_task_with_mpsc( toxiproxy_config.clone(), GeyserFilter(CommitmentConfig::confirmed()).blocks_meta(), autoconnect_tx.clone(), - exit_notify.clone(), + exit_notify, ); start_example_blockmeta_consumer(blockmeta_rx); diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 37d37f2..be0cd7b 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,11 +1,10 @@ use crate::{yellowstone_grpc_util, Attempt, GrpcSourceConfig, Message}; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; -use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::error::SendTimeoutError; use tokio::sync::mpsc::Receiver; -use tokio::sync::Notify; +use tokio::sync::broadcast; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout, Instant}; use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError}; @@ -35,7 +34,7 @@ enum FatalErrorReason { pub fn create_geyser_autoconnection_task( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, - exit_notify: Arc, + exit_notify: broadcast::Receiver<()>, ) -> (JoinHandle<()>, Receiver) { let (sender, receiver_channel) = tokio::sync::mpsc::channel::(1); @@ -56,7 +55,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, mpsc_downstream: tokio::sync::mpsc::Sender, - exit_notify: Arc, + mut exit_notify: broadcast::Receiver<()>, ) -> JoinHandle<()> { // read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ @@ -97,7 +96,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( ) => { res }, - _ = exit_notify.notified() => { + _ = exit_notify.recv() => { break 'main_loop; } }; @@ -156,7 +155,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( ) => { res }, - _ = exit_notify.notified() => { + _ = exit_notify.recv() => { break 'main_loop; } }; @@ -219,7 +218,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( _ = sleep(Duration::from_secs_f32(backoff_secs)) => { //slept }, - _ = exit_notify.notified() => { + _ = exit_notify.recv() => { break 'main_loop; } }; @@ -253,7 +252,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( _ = sleep(Duration::from_secs_f32(backoff_secs)) => { //slept }, - _ = exit_notify.notified() => { + _ = exit_notify.recv() => { break 'main_loop; } }; @@ -269,7 +268,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( ) => { res }, - _ = exit_notify.notified() => { + _ = exit_notify.recv() => { break 'main_loop; } }; @@ -292,7 +291,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( ) => { res }, - _ = exit_notify.notified() => { + _ = exit_notify.recv() => { break 'main_loop; } }; @@ -319,7 +318,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( res = mpsc_downstream.send(the_message)=> { res }, - _ = exit_notify.notified() => { + _ = exit_notify.recv() => { break 'main_loop; } };