Cancel heartbeats that are waiting for a peer, rather than hanging Zebra (#3325)

* If the crawler is delayed, delay future crawl intervals by the same amount

* Cancel heartbeats that are waiting for network requests or responses
This commit is contained in:
teor 2022-01-13 05:15:07 +10:00 committed by GitHub
parent d076b999f3
commit ac4ed57751
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 99 additions and 43 deletions

View File

@ -41,6 +41,10 @@ pub enum PeerError {
#[error("Internal peer connection task exited")]
ConnectionTaskExited,
/// Zebra's [`Client`] cancelled its heartbeat task.
#[error("Internal client cancelled its heartbeat task")]
ClientCancelledHeartbeatTask,
/// Zebra's internal heartbeat task exited.
#[error("Internal heartbeat task exited")]
HeartbeatTaskExited,
@ -75,6 +79,7 @@ impl PeerError {
PeerError::ConnectionClosed => "ConnectionClosed".into(),
PeerError::ConnectionDropped => "ConnectionDropped".into(),
PeerError::ClientDropped => "ClientDropped".into(),
PeerError::ClientCancelledHeartbeatTask => "ClientCancelledHeartbeatTask".into(),
PeerError::HeartbeatTaskExited => "HeartbeatTaskExited".into(),
PeerError::ConnectionTaskExited => "ConnectionTaskExited".into(),
PeerError::ClientRequestTimeout => "ClientRequestTimeout".into(),

View File

@ -10,7 +10,7 @@ use std::{
};
use chrono::{TimeZone, Utc};
use futures::{channel::oneshot, future, FutureExt, SinkExt, StreamExt};
use futures::{channel::oneshot, future, pin_mut, FutureExt, SinkExt, StreamExt};
use tokio::{
net::TcpStream,
sync::broadcast,
@ -732,7 +732,7 @@ where
// CORRECTNESS
//
// As a defence-in-depth against hangs, every send or next on stream
// As a defence-in-depth against hangs, every send() or next() on peer_conn
// should be wrapped in a timeout.
let mut peer_conn = Framed::new(
tcp_stream,
@ -933,7 +933,7 @@ where
);
let heartbeat_task = tokio::spawn(
send_periodic_heartbeats(
send_periodic_heartbeats_with_shutdown_handle(
connected_addr,
remote_services,
shutdown_rx,
@ -975,15 +975,74 @@ where
/// heartbeat_ts_collector.
///
/// Returning from this function terminates the connection's heartbeat task.
async fn send_periodic_heartbeats(
async fn send_periodic_heartbeats_with_shutdown_handle(
connected_addr: ConnectedAddr,
remote_services: PeerServices,
mut shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>,
mut server_tx: futures::channel::mpsc::Sender<ClientRequest>,
shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>,
server_tx: futures::channel::mpsc::Sender<ClientRequest>,
mut heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
) {
use futures::future::Either;
let heartbeat_run_loop = send_periodic_heartbeats_run_loop(
connected_addr,
remote_services,
server_tx,
heartbeat_ts_collector.clone(),
);
pin_mut!(shutdown_rx);
pin_mut!(heartbeat_run_loop);
// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// Starvation is impossible here, because interval has a
// slow rate, and shutdown is a oneshot. If both futures
// are ready, we want the shutdown to take priority over
// sending a useless heartbeat.
let _result = match future::select(shutdown_rx, heartbeat_run_loop).await {
Either::Left((Ok(CancelHeartbeatTask), _unused_run_loop)) => {
tracing::trace!("shutting down because Client requested shut down");
handle_heartbeat_shutdown(
PeerError::ClientCancelledHeartbeatTask,
&mut heartbeat_ts_collector,
&connected_addr,
&remote_services,
)
.await
}
Either::Left((Err(oneshot::Canceled), _unused_run_loop)) => {
tracing::trace!("shutting down because Client was dropped");
handle_heartbeat_shutdown(
PeerError::ClientDropped,
&mut heartbeat_ts_collector,
&connected_addr,
&remote_services,
)
.await
}
Either::Right((result, _unused_shutdown)) => {
tracing::trace!("shutting down due to heartbeat failure");
// heartbeat_timeout() already send an error on the timestamp collector channel
result
}
};
}
/// Send periodical heartbeats to `server_tx`, and update the peer status through
/// `heartbeat_ts_collector`.
///
/// See `send_periodic_heartbeats_with_shutdown_handle` for details.
async fn send_periodic_heartbeats_run_loop(
connected_addr: ConnectedAddr,
remote_services: PeerServices,
mut server_tx: futures::channel::mpsc::Sender<ClientRequest>,
mut heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<(), BoxError> {
// Don't send the first heartbeat immediately - we've just completed the handshake!
let mut interval = tokio::time::interval_at(
Instant::now() + constants::HEARTBEAT_INTERVAL,
@ -995,49 +1054,20 @@ async fn send_periodic_heartbeats(
let mut interval_stream = IntervalStream::new(interval);
loop {
let shutdown_rx_ref = Pin::new(&mut shutdown_rx);
// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// Starvation is impossible here, because interval has a
// slow rate, and shutdown is a oneshot. If both futures
// are ready, we want the shutdown to take priority over
// sending a useless heartbeat.
if matches!(
future::select(shutdown_rx_ref, interval_stream.next()).await,
Either::Left(_)
) {
tracing::trace!("shutting down due to Client shut down");
if let Some(book_addr) = connected_addr.get_address_book_addr() {
// awaiting a local task won't hang
let _ = heartbeat_ts_collector
.send(MetaAddr::new_shutdown(&book_addr, remote_services))
.await;
}
return;
}
while let Some(_instant) = interval_stream.next().await {
// We've reached another heartbeat interval without
// shutting down, so do a heartbeat request.
//
// TODO: await heartbeat and shutdown (#3254)
let heartbeat = send_one_heartbeat(&mut server_tx);
if heartbeat_timeout(
heartbeat_timeout(
heartbeat,
&mut heartbeat_ts_collector,
&connected_addr,
&remote_services,
)
.await
.is_err()
{
return;
}
.await?;
}
unreachable!("unexpected IntervalStream termination")
}
/// Send one heartbeat using `server_tx`.
@ -1145,3 +1175,21 @@ where
}
}
}
/// Mark `connected_addr` as shut down using `address_book_updater`.
async fn handle_heartbeat_shutdown(
peer_error: PeerError,
address_book_updater: &mut tokio::sync::mpsc::Sender<MetaAddrChange>,
connected_addr: &ConnectedAddr,
remote_services: &PeerServices,
) -> Result<(), BoxError> {
tracing::debug!(?peer_error, "client shutdown, shutting down heartbeat");
if let Some(book_addr) = connected_addr.get_address_book_addr() {
let _ = address_book_updater
.send(MetaAddr::new_shutdown(&book_addr, *remote_services))
.await;
}
Err(peer_error.into())
}

View File

@ -667,9 +667,12 @@ where
// prevents us from adding items to the stream and checking its length.
handshakes.push(future::pending().boxed());
let mut crawl_timer =
IntervalStream::new(tokio::time::interval(config.crawl_new_peer_interval))
.map(|tick| TimerCrawl { tick });
let mut crawl_timer = tokio::time::interval(config.crawl_new_peer_interval);
// If the crawl is delayed, also delay all future crawls.
// (Shorter intervals just add load, without any benefit.)
crawl_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut crawl_timer = IntervalStream::new(crawl_timer).map(|tick| TimerCrawl { tick });
loop {
metrics::gauge!(