send works now

This commit is contained in:
GroovieGermanikus 2024-01-18 11:11:46 +01:00
parent c9144ee39f
commit bbba7b1fea
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 31 additions and 16 deletions

View File

@ -7,6 +7,7 @@ use std::fmt::{Debug, Display};
use std::pin::Pin; use std::pin::Pin;
use std::time::Duration; use std::time::Duration;
use futures::channel::mpsc; use futures::channel::mpsc;
use tokio::sync::broadcast::error::SendError;
use tokio::sync::broadcast::Receiver; use tokio::sync::broadcast::Receiver;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout, Timeout}; use tokio::time::{sleep, timeout, Timeout};
@ -293,7 +294,7 @@ pub fn create_geyser_reconnecting_task(
grpc_source: GrpcSourceConfig, grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest, subscribe_filter: SubscribeRequest,
) -> (JoinHandle<()>, Receiver<Message>) { ) -> (JoinHandle<()>, Receiver<Message>) {
let (tx, rx) = tokio::sync::broadcast::channel::<Message>(1000); let (sender, receiver_stream) = tokio::sync::broadcast::channel::<Message>(1000);
let jh_geyser_task = tokio::spawn(async move { let jh_geyser_task = tokio::spawn(async move {
let mut state = NotConnected(0); let mut state = NotConnected(0);
@ -389,20 +390,34 @@ pub fn create_geyser_reconnecting_task(
TheState::NotConnected(attempt) TheState::NotConnected(attempt)
} }
Ready(attempt, mut geyser_stream) => { Ready(attempt, mut geyser_stream) => {
'recv_loop: loop {
match geyser_stream.next().await { match geyser_stream.next().await {
Some(Ok(update_message)) => { Some(Ok(update_message)) => {
trace!("> recv update message from {}", grpc_source); trace!("> recv update message from {}", grpc_source);
TheState::Ready(attempt, geyser_stream) match sender.send(Message::GeyserSubscribeUpdate(Box::new(update_message))) {
Ok(n_subscribers) => {
trace!("sent update message to {} subscribers (buffer={})",
n_subscribers,
sender.len());
continue 'recv_loop;
}
Err(SendError(_)) => {
// note: error does not mean that future sends will also fail!
trace!("no subscribers for update message");
continue 'recv_loop;
}
};
} }
Some(Err(tonic_status)) => { Some(Err(tonic_status)) => {
// ATM we consider all errors recoverable // ATM we consider all errors recoverable
warn!("! error on {} - retrying: {:?}", grpc_source, tonic_status); warn!("! error on {} - retrying: {:?}", grpc_source, tonic_status);
TheState::WaitReconnect(attempt) break 'recv_loop TheState::WaitReconnect(attempt);
} }
None => { None => {
// should not arrive here, Mean the stream close. // should not arrive here, Mean the stream close.
warn!("geyser stream closed on {} - retrying", grpc_source); warn!("geyser stream closed on {} - retrying", grpc_source);
TheState::WaitReconnect(attempt) break 'recv_loop TheState::WaitReconnect(attempt);
}
} }
} }
} }
@ -413,7 +428,7 @@ pub fn create_geyser_reconnecting_task(
}); });
(jh_geyser_task, rx) (jh_geyser_task, receiver_stream)
} }