diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 8def31c22..f5e4e2ea2 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -12,7 +12,7 @@ use tower::Service; use crate::protocol::internal::{Request, Response}; -use super::{ErrorSlot, SharedPeerError}; +use super::{ErrorSlot, PeerError, SharedPeerError}; /// The "client" duplex half of a peer connection. pub struct Client { @@ -148,6 +148,8 @@ impl Service for Client { }) { Err(e) => { if e.is_disconnected() { + let ClientRequest { tx, .. } = e.into_inner(); + let _ = tx.send(Err(PeerError::ConnectionClosed.into())); future::ready(Err(self .error_slot .try_get_error() diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 5bdfcd03f..af7ec7fd3 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -30,7 +30,7 @@ use crate::{ BoxError, Config, }; -use super::{Client, Connection, ErrorSlot, HandshakeError}; +use super::{Client, Connection, ErrorSlot, HandshakeError, PeerError}; /// A [`Service`] that handshakes with a remote peer and constructs a /// client/server pair. @@ -464,19 +464,39 @@ where let (tx, rx) = oneshot::channel(); let request = Request::Ping(Nonce::default()); tracing::trace!(?request, "queueing heartbeat request"); - if server_tx - .send(ClientRequest { - request, - tx: tx.into(), - span: tracing::Span::current(), - }) - .await - .is_err() - { - tracing::trace!( - "error sending heartbeat request, shutting down" - ); - return; + match server_tx.try_send(ClientRequest { + request, + tx: tx.into(), + span: tracing::Span::current(), + }) { + Ok(()) => { + match server_tx.flush().await { + Ok(()) => {} + Err(e) => { + // TODO: we can't get the client request for this failure, + // so we can't ensure the invariant holds + panic!("flushing client request failed: {:?}", e); + } + } + } + Err(e) => { + tracing::trace!( + ?e, + "error sending heartbeat request, shutting down" + ); + if e.is_disconnected() { + let ClientRequest { tx, .. } = e.into_inner(); + let _ = + tx.send(Err(PeerError::ConnectionClosed.into())); + } else if e.is_full() { + let ClientRequest { tx, .. } = e.into_inner(); + let _ = tx.send(Err(PeerError::Overloaded.into())); + } else { + // we need to map unexpected error types to PeerErrors + panic!("unexpected try_send error: {:?}", e); + } + return; + } } // Heartbeats are checked internally to the // connection logic, but we need to wait on the