create_geyser_autoconnection_task allows caller-provided mpsc

This commit is contained in:
GroovieGermanikus 2024-02-15 12:00:19 +01:00
parent 6c26802864
commit 66c4256438
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 18 additions and 7 deletions

View File

@ -31,6 +31,17 @@ enum FatalErrorReason {
SubscribeError,
}
pub fn create_geyser_autoconnection_task(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
) -> (AbortHandle, Receiver<Message>) {
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);
let abort_handle = create_geyser_autoconnection_task_with_mpsc(grpc_source, subscribe_filter, sender);
(abort_handle, receiver_channel)
}
/// connect to grpc source performing autoconect if required,
/// returns mpsc channel; task will abort on fatal error
///
@ -38,12 +49,12 @@ enum FatalErrorReason {
/// * no panic/unwrap
/// * do not use "?"
/// * do not "return" unless you really want to abort the task
pub fn create_geyser_autoconnection_task(
pub fn create_geyser_autoconnection_task_with_mpsc(
grpc_source: GrpcSourceConfig,
subscribe_filter: SubscribeRequest,
) -> (AbortHandle, Receiver<Message>) {
mpsc_sender: tokio::sync::mpsc::Sender<Message>,
) -> AbortHandle {
// read this for argument: http://www.randomhacks.net/2019/03/08/should-rust-channels-panic-on-send/
let (sender, receiver_channel) = tokio::sync::mpsc::channel::<Message>(1);
let jh_geyser_task = tokio::spawn(async move {
let mut state = ConnectionState::NotConnected(0);
@ -220,7 +231,7 @@ pub fn create_geyser_autoconnection_task(
Duration::from_millis(500)
};
let started_at = Instant::now();
match sender
match mpsc_sender
.send_timeout(
Message::GeyserSubscribeUpdate(Box::new(update_message)),
warning_threshold,
@ -242,9 +253,9 @@ pub fn create_geyser_autoconnection_task(
continue 'recv_loop;
}
Err(SendTimeoutError::Timeout(the_message)) => {
warn!("downstream receiver did not pick put message for {}ms - keep waiting", warning_threshold.as_millis());
warn!("downstream receiver did not pick up message for {}ms - keep waiting", warning_threshold.as_millis());
match sender.send(the_message).await {
match mpsc_sender.send(the_message).await {
Ok(()) => {
messages_forwarded += 1;
trace!(
@ -291,7 +302,7 @@ pub fn create_geyser_autoconnection_task(
} // -- endless state loop
});
(jh_geyser_task.abort_handle(), receiver_channel)
jh_geyser_task.abort_handle()
}
#[cfg(test)]