diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 9b96e00..c9c955f 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -1,19 +1,18 @@ use crate::grpc_subscription_autoreconnect_tasks::TheState::*; -use async_stream::stream; -use futures::channel::mpsc; use futures::{Stream, StreamExt}; use log::{debug, error, info, log, trace, warn, Level}; use solana_sdk::commitment_config::CommitmentConfig; use std::collections::HashMap; use std::fmt::{Debug, Display}; +use std::future::Future; use std::pin::Pin; use std::time::Duration; use anyhow::bail; -use tokio::sync::broadcast::error::SendError; -use tokio::sync::broadcast::Receiver; +use tokio::sync::mpsc::error::{SendError, SendTimeoutError}; +use tokio::sync::mpsc::Receiver; use tokio::task::JoinHandle; use tokio::time::error::Elapsed; -use tokio::time::{sleep, timeout, Timeout}; +use tokio::time::{Instant, sleep, timeout, Timeout}; use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult}; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::{ @@ -130,10 +129,12 @@ pub fn create_geyser_reconnecting_task( grpc_source: GrpcSourceConfig, subscribe_filter: SubscribeRequest, ) -> (JoinHandle<()>, Receiver) { - let (sender, receiver_stream) = tokio::sync::broadcast::channel::(1000); + // read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/ + let (sender, receiver_stream) = tokio::sync::mpsc::channel::(1); let jh_geyser_task = tokio::spawn(async move { let mut state = NotConnected(0); + let mut messages_forwared = 0; loop { state = match state { @@ -266,23 +267,51 @@ pub fn create_geyser_reconnecting_task( trace!("> recv update message from {}", grpc_source); // TODO consider extract this // backpressure - should'n we block here? - match sender - .send(Message::GeyserSubscribeUpdate(Box::new(update_message))) - { - Ok(n_subscribers) => { - trace!( - "sent update message to {} subscribers (buffer={})", - n_subscribers, - sender.len() - ); + // TODO extract timeout param; TODO respect startup + // emit warning if message not received + let warning_threshold = if messages_forwared < 1 { Duration::from_millis(5000) } else { Duration::from_millis(500) }; + let started_at = Instant::now(); + match sender.send_timeout(Message::GeyserSubscribeUpdate(Box::new(update_message)), warning_threshold).await { + Ok(()) => { + messages_forwared += 1; + trace!("sent update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32()); continue 'recv_loop; } - Err(SendError(_)) => { - // note: error does not mean that future sends will also fail! - trace!("no subscribers for update message"); - continue 'recv_loop; + Err(SendTimeoutError::Timeout(_)) => { + warn!("downstream receiver did not pick put message for {}ms - keep waiting", warning_threshold.as_millis()); + + match sender.send(Message::GeyserSubscribeUpdate(Box::new(update_message))).await { + Ok(()) => { + messages_forwared += 1; + trace!("sent delayed update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32()); + } + Err(_send_error ) => { + warn!("downstream receiver closed, message is lost - aborting"); + break 'recv_loop TheState::FatalError(attempt); + } + } + } - }; + Err(SendTimeoutError::Closed(_)) => { + warn!("downstream receiver closed - aborting"); + break 'recv_loop TheState::FatalError(attempt); + } + } + // { + // Ok(n_subscribers) => { + // trace!( + // "sent update message to {} subscribers (buffer={})", + // n_subscribers, + // sender.len() + // ); + // continue 'recv_loop; + // } + // Err(SendError(_)) => { + // // note: error does not mean that future sends will also fail! + // trace!("no subscribers for update message"); + // continue 'recv_loop; + // } + // }; } Some(Err(tonic_status)) => { // all tonic errors are recoverable