switch to mpsc

This commit is contained in:
GroovieGermanikus 2024-01-19 08:43:47 +01:00
parent 21f2ef3b7c
commit dc53a50e57
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 49 additions and 20 deletions

View File

@ -1,19 +1,18 @@
use crate::grpc_subscription_autoreconnect_tasks::TheState::*;
use async_stream::stream;
use futures::channel::mpsc;
use futures::{Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level};
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use anyhow::bail;
use tokio::sync::broadcast::error::SendError;
use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::error::{SendError, SendTimeoutError};
use tokio::sync::mpsc::Receiver;
use tokio::task::JoinHandle;
use tokio::time::error::Elapsed;
use tokio::time::{sleep, timeout, Timeout};
use tokio::time::{Instant, sleep, timeout, Timeout};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, GeyserGrpcClientResult};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
@ -130,10 +129,12 @@ pub fn create_geyser_reconnecting_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
) -> (JoinHandle<()>, Receiver<Message>) {
let (sender, receiver_stream) = tokio::sync::broadcast::channel::<Message>(1000);
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
let (sender, receiver_stream) = tokio::sync::mpsc::channel::<Message>(1);
let jh_geyser_task = tokio::spawn(async move {
let mut state = NotConnected(0);
let mut messages_forwared = 0;
loop {
state = match state {
@ -266,23 +267,51 @@ pub fn create_geyser_reconnecting_task(
trace!("> recv update message from {}", grpc_source);
// TODO consider extract this
// backpressure - should'n we block here?
match sender
.send(Message::GeyserSubscribeUpdate(Box::new(update_message)))
{
Ok(n_subscribers) => {
trace!(
"sent update message to {} subscribers (buffer={})",
n_subscribers,
sender.len()
);
// TODO extract timeout param; TODO respect startup
// emit warning if message not received
let warning_threshold = if messages_forwared < 1 { Duration::from_millis(5000) } else { Duration::from_millis(500) };
let started_at = Instant::now();
match sender.send_timeout(Message::GeyserSubscribeUpdate(Box::new(update_message)), warning_threshold).await {
Ok(()) => {
messages_forwared += 1;
trace!("sent update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32());
continue 'recv_loop;
}
Err(SendError(_)) => {
// note: error does not mean that future sends will also fail!
trace!("no subscribers for update message");
continue 'recv_loop;
Err(SendTimeoutError::Timeout(_)) => {
warn!("downstream receiver did not pick put message for {}ms - keep waiting", warning_threshold.as_millis());
match sender.send(Message::GeyserSubscribeUpdate(Box::new(update_message))).await {
Ok(()) => {
messages_forwared += 1;
trace!("sent delayed update message to channel in {:.02}ms", started_at.elapsed().as_secs_f32());
}
Err(_send_error ) => {
warn!("downstream receiver closed, message is lost - aborting");
break 'recv_loop TheState::FatalError(attempt);
}
}
}
};
Err(SendTimeoutError::Closed(_)) => {
warn!("downstream receiver closed - aborting");
break 'recv_loop TheState::FatalError(attempt);
}
}
// {
// Ok(n_subscribers) => {
// trace!(
// "sent update message to {} subscribers (buffer={})",
// n_subscribers,
// sender.len()
// );
// continue 'recv_loop;
// }
// Err(SendError(_)) => {
// // note: error does not mean that future sends will also fail!
// trace!("no subscribers for update message");
// continue 'recv_loop;
// }
// };
}
Some(Err(tonic_status)) => {
// all tonic errors are recoverable