diff --git a/zebra-network/src/peer_set/inventory_registry.rs b/zebra-network/src/peer_set/inventory_registry.rs
index 4a6f27e4e..e7af48c01 100644
--- a/zebra-network/src/peer_set/inventory_registry.rs
+++ b/zebra-network/src/peer_set/inventory_registry.rs
@@ -64,8 +64,34 @@ impl InventoryRegistry {
             self.rotate();
         }
 
-        while let Poll::Ready(Some((hash, addr))) = Pin::new(&mut self.inv_stream).poll_next(cx)? {
-            self.register(hash, addr)
+        // This module uses a broadcast channel instead of an mpsc channel, even
+        // though there's a single consumer of inventory advertisements, because
+        // the broadcast channel has ring-buffer behavior: when the channel is
+        // full, sending a new message displaces the oldest message in the
+        // channel.
+        //
+        // This is the behavior we want for inventory advertisements, because we
+        // want to have a bounded buffer of unprocessed advertisements, and we
+        // want to prioritize new inventory (which is likely only at a specific
+        // peer) over old inventory (which is likely more widely distributed).
+        //
+        // The broadcast channel reports dropped messages by returning
+        // `RecvError::Lagged`. It's crucial that we handle that error here
+        // rather than propagating it through the peer set's Service::poll_ready
+        // implementation, where reporting a failure means reporting a permanent
+        // failure of the peer set.
+        use broadcast::RecvError;
+        while let Poll::Ready(Some(channel_result)) = Pin::new(&mut self.inv_stream).poll_next(cx) {
+            match channel_result {
+                Ok((hash, addr)) => self.register(hash, addr),
+                Err(RecvError::Lagged(count)) => {
+                    metrics::counter!("pool.inventory.dropped", 1);
+                    tracing::debug!(count, "dropped lagged inventory advertisements");
+                }
+                // This indicates all senders, including the one in the handshaker,
+                // have been dropped, which really is a permanent failure.
+                Err(RecvError::Closed) => return Err(RecvError::Closed.into()),
+            }
         }
 
         Ok(())