Add more sends before dropping ClientRequests

This fix also changes heartbeat behaviour in the following ways:
* if the queue is full, the connection is closed. Previously, the sender
  would wait until the queue had emptied
* if the queue flush fails, Zebra panics, because it can't send an error
  on the ClientRequest sender, so the invariant is broken
This commit is contained in:
teor 2021-01-05 11:00:04 +10:00 committed by Jane Lusby
parent 3e711ccc8a
commit f8ff2e9c0b
2 changed files with 37 additions and 15 deletions

View File

@ -12,7 +12,7 @@ use tower::Service;
use crate::protocol::internal::{Request, Response};
use super::{ErrorSlot, SharedPeerError};
use super::{ErrorSlot, PeerError, SharedPeerError};
/// The "client" duplex half of a peer connection.
pub struct Client {
@ -148,6 +148,8 @@ impl Service<Request> for Client {
}) {
Err(e) => {
if e.is_disconnected() {
let ClientRequest { tx, .. } = e.into_inner();
let _ = tx.send(Err(PeerError::ConnectionClosed.into()));
future::ready(Err(self
.error_slot
.try_get_error()

View File

@ -30,7 +30,7 @@ use crate::{
BoxError, Config,
};
use super::{Client, Connection, ErrorSlot, HandshakeError};
use super::{Client, Connection, ErrorSlot, HandshakeError, PeerError};
/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
@ -464,19 +464,39 @@ where
let (tx, rx) = oneshot::channel();
let request = Request::Ping(Nonce::default());
tracing::trace!(?request, "queueing heartbeat request");
if server_tx
.send(ClientRequest {
request,
tx: tx.into(),
span: tracing::Span::current(),
})
.await
.is_err()
{
tracing::trace!(
"error sending heartbeat request, shutting down"
);
return;
match server_tx.try_send(ClientRequest {
request,
tx: tx.into(),
span: tracing::Span::current(),
}) {
Ok(()) => {
match server_tx.flush().await {
Ok(()) => {}
Err(e) => {
// TODO: we can't get the client request for this failure,
// so we can't ensure the invariant holds
panic!("flushing client request failed: {:?}", e);
}
}
}
Err(e) => {
tracing::trace!(
?e,
"error sending heartbeat request, shutting down"
);
if e.is_disconnected() {
let ClientRequest { tx, .. } = e.into_inner();
let _ =
tx.send(Err(PeerError::ConnectionClosed.into()));
} else if e.is_full() {
let ClientRequest { tx, .. } = e.into_inner();
let _ = tx.send(Err(PeerError::Overloaded.into()));
} else {
// we need to map unexpected error types to PeerErrors
panic!("unexpected try_send error: {:?}", e);
}
return;
}
}
// Heartbeats are checked internally to the
// connection logic, but we need to wait on the