diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 55f6a4b..67cd4c4 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -31,6 +31,17 @@ enum FatalErrorReason { SubscribeError, } +pub fn create_geyser_autoconnection_task( + grpc_source: GrpcSourceConfig, + subscribe_filter: SubscribeRequest, +) -> (AbortHandle, Receiver) { + let (sender, receiver_channel) = tokio::sync::mpsc::channel::(1); + + let abort_handle = create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender); + + (abort_handle, receiver_channel) +} + /// connect to grpc source performing autoconect if required, /// returns mpsc channel; task will abort on fatal error /// @@ -38,12 +49,12 @@ enum FatalErrorReason { /// * no panic/unwrap /// * do not use "?" /// * do not "return" unless you really want to abort the task -pub fn create_geyser_autoconnection_task( +pub fn create_geyser_autoconnection_task_with_mpsc( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, -) -> (AbortHandle, Receiver) { + mpsc_sender: tokio::sync::mpsc::Sender, +) -> AbortHandle { // read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ - let (sender, receiver_channel) = tokio::sync::mpsc::channel::(1); let jh_geyser_task = tokio::spawn(async move { let mut state = ConnectionState::NotConnected(0); @@ -220,7 +231,7 @@ pub fn create_geyser_autoconnection_task( Duration::from_millis(500) }; let started_at = Instant::now(); - match sender + match mpsc_sender .send_timeout( Message::GeyserSubscribeUpdate(Box::new(update_message)), warning_threshold, @@ -242,9 +253,9 @@ pub fn create_geyser_autoconnection_task( continue 'recv_loop; } Err(SendTimeoutError::Timeout(the_message)) => { - warn!("downstream receiver did not pick put message for {}ms - keep waiting", warning_threshold.as_millis()); + warn!("downstream receiver did not pick up message for {}ms - keep waiting", warning_threshold.as_millis()); - match sender.send(the_message).await { + match mpsc_sender.send(the_message).await { Ok(()) => { messages_forwarded += 1; trace!( @@ -291,7 +302,7 @@ pub fn create_geyser_autoconnection_task( } // -- endless state loop }); - (jh_geyser_task.abort_handle(), receiver_channel) + jh_geyser_task.abort_handle() } #[cfg(test)]