return errors from `send_periodic_heartbeats_with_shutdown_handle` (#4756)

* return errors from `send_periodic_heartbeats_with_shutdown_handle`

* Add string argument to `HeartbeatTaskExited`

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
Alfredo Garcia 2022-07-22 13:25:53 -03:00 committed by GitHub
parent f81e997090
commit 103d5c6326
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 84 additions and 15 deletions

View File

@ -25,6 +25,7 @@ use crate::{
external::{types::Version, InventoryHash},
internal::{Request, Response},
},
BoxError,
};
#[cfg(any(test, feature = "proptest-impl"))]
@ -58,7 +59,7 @@ pub struct Client {
pub(crate) connection_task: JoinHandle<()>,
/// A handle to the task responsible for sending periodic heartbeats.
pub(crate) heartbeat_task: JoinHandle<()>,
pub(crate) heartbeat_task: JoinHandle<Result<(), BoxError>>,
}
/// A signal sent by the [`Client`] half of a peer connection,
@ -427,7 +428,10 @@ impl Client {
.is_ready();
if is_canceled {
return self.set_task_exited_error("heartbeat", PeerError::HeartbeatTaskExited);
return self.set_task_exited_error(
"heartbeat",
PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
);
}
match self.heartbeat_task.poll_unpin(cx) {
@ -435,13 +439,41 @@ impl Client {
// Heartbeat task is still running.
Ok(())
}
Poll::Ready(Ok(())) => {
// Heartbeat task stopped unexpectedly, without panicking.
self.set_task_exited_error("heartbeat", PeerError::HeartbeatTaskExited)
Poll::Ready(Ok(Ok(_))) => {
// Heartbeat task stopped unexpectedly, without panic or error.
self.set_task_exited_error(
"heartbeat",
PeerError::HeartbeatTaskExited(
"Heartbeat task stopped unexpectedly".to_string(),
),
)
}
Poll::Ready(Ok(Err(error))) => {
// Heartbeat task stopped unexpectedly, with error.
self.set_task_exited_error(
"heartbeat",
PeerError::HeartbeatTaskExited(error.to_string()),
)
}
Poll::Ready(Err(error)) => {
// Heartbeat task stopped unexpectedly with a panic.
panic!("heartbeat task has panicked: {}", error);
// Heartbeat task was cancelled.
if error.is_cancelled() {
self.set_task_exited_error(
"heartbeat",
PeerError::HeartbeatTaskExited("Task was cancelled".to_string()),
)
}
// Heartbeat task stopped with panic.
else if error.is_panic() {
panic!("heartbeat task has panicked: {}", error);
}
// Heartbeat task stopped with error.
else {
self.set_task_exited_error(
"heartbeat",
PeerError::HeartbeatTaskExited(error.to_string()),
)
}
}
}
}

View File

@ -18,6 +18,7 @@ use crate::{
peer::{error::SharedPeerError, CancelHeartbeatTask, Client, ClientRequest, ErrorSlot},
peer_set::InventoryChange,
protocol::external::types::Version,
BoxError,
};
#[cfg(test)]
@ -282,7 +283,7 @@ where
let (connection_task, connection_aborter) =
Self::spawn_background_task_or_fallback(self.connection_task);
let (heartbeat_task, heartbeat_aborter) =
Self::spawn_background_task_or_fallback(self.heartbeat_task);
Self::spawn_background_task_or_fallback_with_result(self.heartbeat_task);
let client = Client {
shutdown_tx: Some(shutdown_sender),
@ -332,4 +333,37 @@ where
(task_handle, abort_handle)
}
// TODO: In the context of #4734:
// - Delete `spawn_background_task_or_fallback` and `spawn_background_task`
// - Rename `spawn_background_task_or_fallback_with_result` and `spawn_background_task_with_result` to
// `spawn_background_task_or_fallback` and `spawn_background_task`
// Similar to `spawn_background_task_or_fallback` but returns a `Result`.
fn spawn_background_task_or_fallback_with_result<T>(
task_future: Option<T>,
) -> (JoinHandle<Result<(), BoxError>>, AbortHandle)
where
T: Future<Output = ()> + Send + 'static,
{
match task_future {
Some(future) => Self::spawn_background_task_with_result(future),
None => Self::spawn_background_task_with_result(tokio::time::sleep(
MAX_PEER_CONNECTION_TIME,
)),
}
}
// Similar to `spawn_background_task` but returns a `Result`.
fn spawn_background_task_with_result<T>(
task_future: T,
) -> (JoinHandle<Result<(), BoxError>>, AbortHandle)
where
T: Future<Output = ()> + Send + 'static,
{
let (task, abort_handle) = future::abortable(task_future);
let task_handle = tokio::spawn(task.map(|_result| Ok(())));
(task_handle, abort_handle)
}
}

View File

@ -80,7 +80,7 @@ async fn client_service_ready_heartbeat_exit() {
let (mut client, mut harness) = ClientTestHarness::build().finish();
harness.set_error(PeerError::HeartbeatTaskExited);
harness.set_error(PeerError::HeartbeatTaskExited("some error".to_string()));
harness.drop_heartbeat_shutdown_receiver();
assert!(client.is_failed().await);

View File

@ -57,8 +57,8 @@ pub enum PeerError {
ClientCancelledHeartbeatTask,
/// Zebra's internal heartbeat task exited.
#[error("Internal heartbeat task exited")]
HeartbeatTaskExited,
#[error("Internal heartbeat task exited with message: {0:?}")]
HeartbeatTaskExited(String),
/// Sending a message to a remote peer took too long.
#[error("Sending Client request timed out")]
@ -130,7 +130,7 @@ impl PeerError {
PeerError::ConnectionDropped => "ConnectionDropped".into(),
PeerError::ClientDropped => "ClientDropped".into(),
PeerError::ClientCancelledHeartbeatTask => "ClientCancelledHeartbeatTask".into(),
PeerError::HeartbeatTaskExited => "HeartbeatTaskExited".into(),
PeerError::HeartbeatTaskExited(_) => "HeartbeatTaskExited".into(),
PeerError::ConnectionTaskExited => "ConnectionTaskExited".into(),
PeerError::ConnectionSendTimeout => "ConnectionSendTimeout".into(),
PeerError::ConnectionReceiveTimeout => "ConnectionReceiveTimeout".into(),

View File

@ -996,7 +996,8 @@ where
server_tx.clone(),
address_book_updater.clone(),
)
.instrument(tracing::debug_span!(parent: connection_span, "heartbeat")),
.instrument(tracing::debug_span!(parent: connection_span, "heartbeat"))
.boxed(),
);
let client = Client {
@ -1114,7 +1115,7 @@ async fn send_periodic_heartbeats_with_shutdown_handle(
shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>,
server_tx: futures::channel::mpsc::Sender<ClientRequest>,
mut heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
) {
) -> Result<(), BoxError> {
use futures::future::Either;
let heartbeat_run_loop = send_periodic_heartbeats_run_loop(
@ -1136,7 +1137,7 @@ async fn send_periodic_heartbeats_with_shutdown_handle(
// slow rate, and shutdown is a oneshot. If both futures
// are ready, we want the shutdown to take priority over
// sending a useless heartbeat.
let _result = match future::select(shutdown_rx, heartbeat_run_loop).await {
let result = match future::select(shutdown_rx, heartbeat_run_loop).await {
Either::Left((Ok(CancelHeartbeatTask), _unused_run_loop)) => {
tracing::trace!("shutting down because Client requested shut down");
handle_heartbeat_shutdown(
@ -1164,6 +1165,8 @@ async fn send_periodic_heartbeats_with_shutdown_handle(
result
}
};
result
}
/// Send periodical heartbeats to `server_tx`, and update the peer status through