network: ensure dropping a Client closes the connection.
This fixes a bug introduced when we added heartbeat support. Recall that we handle the Bitcoin connection state machine on a per-peer basis. Each connection has a task created from the `Connection` struct, and a `Client: tower::Service` "frontend" that passes requests to it via a channel. In the `Connection` event loop, the connection checks whether the request channel has been closed, indicating no further requests from the `Client`, in which case it shuts itself down and cleans up resources. This occurs when all of the senders have been dropped. However, this behavior broke when we introduced heartbeat support, because we spawned an additional task to send heartbeat messages along the request channel. This meant that instead of having a single sender, dropped by the `Client`, we have two senders, the `Client` and the "shadow client" task that generates heartbeat messages. This means that when the `Client` is dropped, we still have a live sender and the connection is not closed. To fix this, the `Client` now uses a `oneshot` to shut down its corresponding heartbeat task. This closes all senders.
This commit is contained in:
parent
b0cd920fad
commit
0dc2d92ad8
|
@ -16,6 +16,9 @@ use super::{ErrorSlot, SharedPeerError};
|
||||||
|
|
||||||
/// The "client" duplex half of a peer connection.
|
/// The "client" duplex half of a peer connection.
|
||||||
pub struct Client {
|
pub struct Client {
|
||||||
|
// Used to shut down the corresponding heartbeat.
|
||||||
|
// This is always Some except when we take it on drop.
|
||||||
|
pub(super) shutdown_tx: Option<oneshot::Sender<()>>,
|
||||||
pub(super) server_tx: mpsc::Sender<ClientRequest>,
|
pub(super) server_tx: mpsc::Sender<ClientRequest>,
|
||||||
pub(super) error_slot: ErrorSlot,
|
pub(super) error_slot: ErrorSlot,
|
||||||
}
|
}
|
||||||
|
@ -85,3 +88,13 @@ impl Service<Request> for Client {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Drop for Client {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let _ = self
|
||||||
|
.shutdown_tx
|
||||||
|
.take()
|
||||||
|
.expect("must not drop twice")
|
||||||
|
.send(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -8,7 +8,10 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use futures::{channel::mpsc, prelude::*};
|
use futures::{
|
||||||
|
channel::{mpsc, oneshot},
|
||||||
|
prelude::*,
|
||||||
|
};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio_util::codec::Framed;
|
use tokio_util::codec::Framed;
|
||||||
use tower::Service;
|
use tower::Service;
|
||||||
|
@ -215,9 +218,11 @@ where
|
||||||
// These channels should not be cloned more than they are
|
// These channels should not be cloned more than they are
|
||||||
// in this block, see constants.rs for more.
|
// in this block, see constants.rs for more.
|
||||||
let (server_tx, server_rx) = mpsc::channel(0);
|
let (server_tx, server_rx) = mpsc::channel(0);
|
||||||
|
let (shutdown_tx, shutdown_rx) = oneshot::channel();
|
||||||
let slot = ErrorSlot::default();
|
let slot = ErrorSlot::default();
|
||||||
|
|
||||||
let client = Client {
|
let client = Client {
|
||||||
|
shutdown_tx: Some(shutdown_tx),
|
||||||
server_tx: server_tx.clone(),
|
server_tx: server_tx.clone(),
|
||||||
error_slot: slot.clone(),
|
error_slot: slot.clone(),
|
||||||
};
|
};
|
||||||
|
@ -283,20 +288,19 @@ where
|
||||||
let heartbeat_span = tracing::debug_span!(parent: connection_span, "heartbeat");
|
let heartbeat_span = tracing::debug_span!(parent: connection_span, "heartbeat");
|
||||||
tokio::spawn(
|
tokio::spawn(
|
||||||
async move {
|
async move {
|
||||||
use futures::channel::oneshot;
|
|
||||||
|
|
||||||
use super::client::ClientRequest;
|
use super::client::ClientRequest;
|
||||||
|
use futures::future::Either;
|
||||||
|
|
||||||
|
let mut shutdown_rx = shutdown_rx;
|
||||||
let mut server_tx = server_tx;
|
let mut server_tx = server_tx;
|
||||||
|
|
||||||
let mut interval_stream = tokio::time::interval(constants::HEARTBEAT_INTERVAL);
|
let mut interval_stream = tokio::time::interval(constants::HEARTBEAT_INTERVAL);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
interval_stream.tick().await;
|
let shutdown_rx_ref = Pin::new(&mut shutdown_rx);
|
||||||
|
match future::select(interval_stream.next(), shutdown_rx_ref).await {
|
||||||
// We discard the server handle because our
|
Either::Left(_) => {
|
||||||
// heartbeat `Ping`s are a special case, and we
|
// We don't wait on a response because heartbeats are checked
|
||||||
// don't actually care about the response here.
|
// internally to the connection logic, we just need a separate
|
||||||
|
// task (this one) to generate them.
|
||||||
let (request_tx, _) = oneshot::channel();
|
let (request_tx, _) = oneshot::channel();
|
||||||
if server_tx
|
if server_tx
|
||||||
.send(ClientRequest {
|
.send(ClientRequest {
|
||||||
|
@ -310,8 +314,12 @@ where
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Either::Right(_) => return, // got shutdown signal
|
||||||
}
|
}
|
||||||
.instrument(heartbeat_span),
|
}
|
||||||
|
}
|
||||||
|
.instrument(heartbeat_span)
|
||||||
|
.boxed(),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(client)
|
Ok(client)
|
||||||
|
|
Loading…
Reference in New Issue