diff --git a/src/grpc_subscription_autoreconnect.rs b/src/grpc_subscription_autoreconnect.rs index f66c8f3..dfad979 100644 --- a/src/grpc_subscription_autoreconnect.rs +++ b/src/grpc_subscription_autoreconnect.rs @@ -7,6 +7,7 @@ use std::fmt::{Debug, Display}; use std::pin::Pin; use std::time::Duration; use futures::channel::mpsc; +use tokio::sync::broadcast::error::SendError; use tokio::sync::broadcast::Receiver; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout, Timeout}; @@ -293,7 +294,7 @@ pub fn create_geyser_reconnecting_task( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, ) -> (JoinHandle<()>, Receiver) { - let (tx, rx) = tokio::sync::broadcast::channel::(1000); + let (sender, receiver_stream) = tokio::sync::broadcast::channel::(1000); let jh_geyser_task = tokio::spawn(async move { let mut state = NotConnected(0); @@ -389,20 +390,34 @@ pub fn create_geyser_reconnecting_task( TheState::NotConnected(attempt) } Ready(attempt, mut geyser_stream) => { - match geyser_stream.next().await { - Some(Ok(update_message)) => { - trace!("> recv update message from {}", grpc_source); - TheState::Ready(attempt, geyser_stream) - } - Some(Err(tonic_status)) => { - // ATM we consider all errors recoverable - warn!("! error on {} - retrying: {:?}", grpc_source, tonic_status); - TheState::WaitReconnect(attempt) - } - None => { - // should not arrive here, Mean the stream close. - warn!("geyser stream closed on {} - retrying", grpc_source); - TheState::WaitReconnect(attempt) + 'recv_loop: loop { + match geyser_stream.next().await { + Some(Ok(update_message)) => { + trace!("> recv update message from {}", grpc_source); + match sender.send(Message::GeyserSubscribeUpdate(Box::new(update_message))) { + Ok(n_subscribers) => { + trace!("sent update message to {} subscribers (buffer={})", + n_subscribers, + sender.len()); + continue 'recv_loop; + } + Err(SendError(_)) => { + // note: error does not mean that future sends will also fail! + trace!("no subscribers for update message"); + continue 'recv_loop; + } + }; + } + Some(Err(tonic_status)) => { + // ATM we consider all errors recoverable + warn!("! error on {} - retrying: {:?}", grpc_source, tonic_status); + break 'recv_loop TheState::WaitReconnect(attempt); + } + None => { + // should not arrive here, Mean the stream close. + warn!("geyser stream closed on {} - retrying", grpc_source); + break 'recv_loop TheState::WaitReconnect(attempt); + } } } } @@ -413,7 +428,7 @@ pub fn create_geyser_reconnecting_task( }); - (jh_geyser_task, rx) + (jh_geyser_task, receiver_stream) }