network: fix bug in inventory advertisement handling (#1022)
* network: fix bug in inventory advertisement handling

The RFC https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html described the use of a `broadcast` channel in place of an `mpsc` channel to get ring-buffer behavior: keeping a bound on the size of the channel, but dropping old entries when the channel is full. However, it didn't explicitly describe how this works (the `broadcast` channel returns a `RecvError::Lagged(u64)` to inform receivers that they lost messages), so the lag handling wasn't implemented and I didn't notice in review. Instead, the `?` operator bubbled the lag error all the way up from `InventoryRegistry::poll_inventory` through `<PeerSet as Service>::poll_ready` through various Tower wrappers to users of the peer set.

The error propagation is bad enough, because it caused client errors that shouldn't have happened, but there's a worse interaction. The `Service` contract distinguishes between request errors (from `Service::call`, scoped to the request) and service errors (from `Service::poll_ready`, scoped to the service), and it specifies that once a service returns an error from `poll_ready`, the service can be assumed to have failed permanently. I believe (but haven't tested or carefully worked through the details) that this caused various Tower middleware to report the entire peer set service as permanently failed due to a transient inventory "error" (really more of an indicator than an error), and I suspect that this is the cause of #1003, where all of the sync component's requests end up failing because the peer set reported that it failed permanently. I am able to reproduce #1003 locally before this change and unable to reproduce it locally after this change, though I have not tested exhaustively.

* network: add metric for dropped inventory advertisements

Co-authored-by: teor <teor@riseup.net>
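The ring-buffer semantics the commit message describes (when the channel is full, a new message displaces the oldest one, and the receiver is later told how many messages it lost, as `RecvError::Lagged(u64)` does) can be sketched with a plain `VecDeque`. This is a hypothetical illustration of the behavior, not the actual tokio `broadcast` implementation; `RingBuffer` and its methods are invented names for the sketch:

```rust
use std::collections::VecDeque;

/// Hypothetical sketch of a lagging ring buffer: bounded, drops the
/// *oldest* entry when full, and reports the number of dropped entries
/// to the receiver (analogous to `RecvError::Lagged(count)`).
struct RingBuffer<T> {
    buf: VecDeque<T>,
    capacity: usize,
    lagged: u64,
}

impl<T> RingBuffer<T> {
    fn new(capacity: usize) -> Self {
        RingBuffer { buf: VecDeque::new(), capacity, lagged: 0 }
    }

    /// Push a new item, displacing the oldest one if the buffer is full.
    fn push(&mut self, item: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.lagged += 1;
        }
        self.buf.push_back(item);
    }

    /// Take the oldest buffered item, plus the lag count accumulated
    /// since the last receive (zero means nothing was dropped).
    fn recv(&mut self) -> (Option<T>, u64) {
        let lag = std::mem::take(&mut self.lagged);
        (self.buf.pop_front(), lag)
    }
}

fn main() {
    let mut rb = RingBuffer::new(2);
    rb.push("inv-a");
    rb.push("inv-b");
    rb.push("inv-c"); // buffer full: "inv-a" is displaced, lag becomes 1
    let (item, lag) = rb.recv();
    assert_eq!(item, Some("inv-b"));
    assert_eq!(lag, 1);
    println!("received {:?}, lagged {}", item, lag);
}
```

This is why old inventory (likely already widely distributed) is the right thing to drop: the newest advertisements are the ones most likely to be available only at a specific peer.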
This commit is contained in:
parent ca1a451895
commit cad38415b2
@@ -64,8 +64,34 @@ impl InventoryRegistry {
             self.rotate();
         }
 
-        while let Poll::Ready(Some((hash, addr))) = Pin::new(&mut self.inv_stream).poll_next(cx)? {
-            self.register(hash, addr)
+        // This module uses a broadcast channel instead of an mpsc channel, even
+        // though there's a single consumer of inventory advertisements, because
+        // the broadcast channel has ring-buffer behavior: when the channel is
+        // full, sending a new message displaces the oldest message in the
+        // channel.
+        //
+        // This is the behavior we want for inventory advertisements, because we
+        // want to have a bounded buffer of unprocessed advertisements, and we
+        // want to prioritize new inventory (which is likely only at a specific
+        // peer) over old inventory (which is likely more widely distributed).
+        //
+        // The broadcast channel reports dropped messages by returning
+        // `RecvError::Lagged`. It's crucial that we handle that error here
+        // rather than propagating it through the peer set's Service::poll_ready
+        // implementation, where reporting a failure means reporting a permanent
+        // failure of the peer set.
+        use broadcast::RecvError;
+        while let Poll::Ready(Some(channel_result)) = Pin::new(&mut self.inv_stream).poll_next(cx) {
+            match channel_result {
+                Ok((hash, addr)) => self.register(hash, addr),
+                Err(RecvError::Lagged(count)) => {
+                    metrics::counter!("pool.inventory.dropped", 1);
+                    tracing::debug!(count, "dropped lagged inventory advertisements");
+                }
+                // This indicates all senders, including the one in the handshaker,
+                // have been dropped, which really is a permanent failure.
+                Err(RecvError::Closed) => return Err(RecvError::Closed.into()),
+            }
         }
 
         Ok(())
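The `Service`-contract interaction the commit message suspects behind #1003 can be sketched with a simplified, hypothetical stand-in for Tower's `Service` trait. The real trait is poll-based and asynchronous; this synchronous sketch (invented names `SimpleService`, `LaggyPeerSet`, `drive`) only models the error-scoping rule: a `call` error is per-request, while a `poll_ready` error marks the whole service as permanently failed.

```rust
/// Hypothetical, synchronous stand-in for Tower's `Service` trait,
/// kept only to illustrate the error-scoping rule described above.
trait SimpleService<Request> {
    type Response;
    /// Per the contract: an error here means the whole service has
    /// failed permanently.
    fn poll_ready(&mut self) -> Result<(), String>;
    /// An error here is scoped to this single request.
    fn call(&mut self, req: Request) -> Result<Self::Response, String>;
}

/// A peer set whose readiness check leaks a transient lag "error",
/// as the pre-fix code did by propagating `RecvError::Lagged` with `?`.
struct LaggyPeerSet {
    lag_pending: bool,
}

impl SimpleService<u32> for LaggyPeerSet {
    type Response = u32;
    fn poll_ready(&mut self) -> Result<(), String> {
        if self.lag_pending {
            // A transient condition reported through the wrong channel.
            self.lag_pending = false;
            return Err("inventory stream lagged".into());
        }
        Ok(())
    }
    fn call(&mut self, req: u32) -> Result<u32, String> {
        Ok(req)
    }
}

/// A contract-following caller: once `poll_ready` errors, it treats the
/// service as dead and fails every remaining request, mirroring how all
/// of the sync component's requests end up failing in #1003.
fn drive<S: SimpleService<u32, Response = u32>>(svc: &mut S, reqs: &[u32]) -> Vec<Result<u32, String>> {
    let mut dead = false;
    reqs.iter()
        .map(|&r| {
            if dead {
                return Err("service failed permanently".into());
            }
            if let Err(e) = svc.poll_ready() {
                dead = true;
                return Err(e);
            }
            svc.call(r)
        })
        .collect()
}

fn main() {
    let mut peers = LaggyPeerSet { lag_pending: true };
    let results = drive(&mut peers, &[1, 2, 3]);
    // One transient lag turns into failure of every subsequent request.
    assert!(results.iter().all(|r| r.is_err()));
    println!("{:?}", results);
}
```

Handling `RecvError::Lagged` inside `poll_inventory`, as the patch does, keeps the transient condition from ever reaching `poll_ready`, so the contract-following caller never writes the service off.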