From a6ab686fa4607e95550d29f77f08ea074b90e91f Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Thu, 15 Feb 2024 14:56:10 +0100 Subject: [PATCH] add create_geyser_autoconnection_task_with_mpsc --- src/grpc_subscription_autoreconnect_tasks.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 67cd4c4..bf786e5 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -42,20 +42,17 @@ pub fn create_geyser_autoconnection_task( (abort_handle, receiver_channel) } -/// connect to grpc source performing autoconect if required, +/// connect to grpc source performing autoconnect if required, /// returns mpsc channel; task will abort on fatal error -/// -/// implementation hints: -/// * no panic/unwrap -/// * do not use "?" -/// * do not "return" unless you really want to abort the task +/// will shut down when receiver is dropped pub fn create_geyser_autoconnection_task_with_mpsc( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, - mpsc_sender: tokio::sync::mpsc::Sender, + mpsc_downstream: tokio::sync::mpsc::Sender, ) -> AbortHandle { // read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ + // task will be aborted when downstream receiver gets dropped let jh_geyser_task = tokio::spawn(async move { let mut state = ConnectionState::NotConnected(0); let mut messages_forwarded = 0; @@ -231,7 +228,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( Duration::from_millis(500) }; let started_at = Instant::now(); - match mpsc_sender + match mpsc_downstream .send_timeout( Message::GeyserSubscribeUpdate(Box::new(update_message)), warning_threshold, @@ -255,7 +252,7 @@ pub fn create_geyser_autoconnection_task_with_mpsc( Err(SendTimeoutError::Timeout(the_message)) => { warn!("downstream receiver did not pick up message for {}ms - keep waiting", warning_threshold.as_millis()); - match mpsc_sender.send(the_message).await { + match mpsc_downstream.send(the_message).await { Ok(()) => { messages_forwarded += 1; trace!(