From 0dc2d92ad8904899a8c1790ba48ac65b14d55065 Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Tue, 21 Jul 2020 15:18:22 -0700 Subject: [PATCH] network: ensure dropping a Client closes the connection. This fixes a bug introduced when we added heartbeat support. Recall that we handle the Bitcoin connection state machine on a per-peer basis. Each connection has a task created from the `Connection` struct, and a `Client: tower::Service` "frontend" that passes requests to it via a channel. In the `Connection` event loop, the connection checks whether the request channel has been closed, indicating no further requests from the `Client`, in which case it shuts itself down and cleans up resources. This occurs when all of the senders have been dropped. However, this behavior broke when we introduced heartbeat support, because we spawned an additional task to send heartbeat messages along the request channel. This meant that instead of having a single sender, dropped by the `Client`, we have two senders, the `Client` and the "shadow client" task that generates heartbeat messages. This means that when the `Client` is dropped, we still have a live sender and the connection is not closed. To fix this, the `Client` now uses a `oneshot` to shut down its corresponding heartbeat task. This closes all senders. --- zebra-network/src/peer/client.rs | 13 ++++++++ zebra-network/src/peer/handshake.rs | 52 +++++++++++++++++------------ 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 7f7852246..f925a55a8 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -16,6 +16,9 @@ use super::{ErrorSlot, SharedPeerError}; /// The "client" duplex half of a peer connection. pub struct Client { + // Used to shut down the corresponding heartbeat. + // This is always Some except when we take it on drop. + pub(super) shutdown_tx: Option>, pub(super) server_tx: mpsc::Sender, pub(super) error_slot: ErrorSlot, } @@ -85,3 +88,13 @@ impl Service for Client { } } } + +impl Drop for Client { + fn drop(&mut self) { + let _ = self + .shutdown_tx + .take() + .expect("must not drop twice") + .send(()); + } +} diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 17e74dbbd..5e73f2eb8 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -8,7 +8,10 @@ use std::{ }; use chrono::Utc; -use futures::{channel::mpsc, prelude::*}; +use futures::{ + channel::{mpsc, oneshot}, + prelude::*, +}; use tokio::net::TcpStream; use tokio_util::codec::Framed; use tower::Service; @@ -215,9 +218,11 @@ where // These channels should not be cloned more than they are // in this block, see constants.rs for more. let (server_tx, server_rx) = mpsc::channel(0); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); let slot = ErrorSlot::default(); let client = Client { + shutdown_tx: Some(shutdown_tx), server_tx: server_tx.clone(), error_slot: slot.clone(), }; @@ -283,35 +288,38 @@ where let heartbeat_span = tracing::debug_span!(parent: connection_span, "heartbeat"); tokio::spawn( async move { - use futures::channel::oneshot; - use super::client::ClientRequest; + use futures::future::Either; + let mut shutdown_rx = shutdown_rx; let mut server_tx = server_tx; - let mut interval_stream = tokio::time::interval(constants::HEARTBEAT_INTERVAL); - loop { - interval_stream.tick().await; - - // We discard the server handle because our - // heartbeat `Ping`s are a special case, and we - // don't actually care about the response here. - let (request_tx, _) = oneshot::channel(); - if server_tx - .send(ClientRequest { - request: Request::Ping(Nonce::default()), - tx: request_tx, - span: tracing::Span::current(), - }) - .await - .is_err() - { - return; + let shutdown_rx_ref = Pin::new(&mut shutdown_rx); + match future::select(interval_stream.next(), shutdown_rx_ref).await { + Either::Left(_) => { + // We don't wait on a response because heartbeats are checked + // internally to the connection logic, we just need a separate + // task (this one) to generate them. + let (request_tx, _) = oneshot::channel(); + if server_tx + .send(ClientRequest { + request: Request::Ping(Nonce::default()), + tx: request_tx, + span: tracing::Span::current(), + }) + .await + .is_err() + { + return; + } + } + Either::Right(_) => return, // got shutdown signal } } } - .instrument(heartbeat_span), + .instrument(heartbeat_span) + .boxed(), ); Ok(client)