clarify await_or_exit behavior on channel close

This commit is contained in:
GroovieGermanikus 2024-04-29 09:08:09 +02:00
parent 1245d8be37
commit 1d1a01c8ae
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 15 additions and 3 deletions

View File

@ -6,6 +6,7 @@ use std::time::Duration;
use tokio::sync::mpsc::error::SendTimeoutError; use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout, Instant}; use tokio::time::{sleep, timeout, Instant};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError}; use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError};
@ -379,16 +380,27 @@ enum MaybeExit<T> {
Exit, Exit,
} }
async fn await_or_exit<F, E>(future: F, exit_notify: E) -> MaybeExit<F::Output> async fn await_or_exit<F, E, T>(future: F, exit_notify: E) -> MaybeExit<F::Output>
where where
F: Future, F: Future,
E: Future, E: Future<Output = Result<T, RecvError>>,
{ {
tokio::select! { tokio::select! {
res = future => { res = future => {
MaybeExit::Continue(res) MaybeExit::Continue(res)
}, },
_ = exit_notify => { res = exit_notify => {
match res {
Ok(_) => {
debug!("exit on signal");
}
Err(RecvError::Lagged(_)) => {
warn!("exit on signal (lag)");
}
Err(RecvError::Closed) => {
warn!("exit on signal (channel close)");
}
}
MaybeExit::Exit MaybeExit::Exit
} }
} }