1. Create an API for a missing inventory registry, but don't register any missing inventory yet (#3255)

* feat(network): create an API for registering missing inventory, but don't use it yet

* feat(constraint): implement AtLeastOne::iter_mut()

* refactor(network): add InventoryStatus::marker() method to remove associated data

* fix(network): prefer current inventory, and missing inventory statuses

* fix(network): if an inventory rotation is missed, delay future rotations

* fix(network): don't immediately rotate a new empty inventory registry

* fix(network): assert that only expected inventory variants are stored in the registry

* test(network): add a basic empty inventory registry test

Also adds an inventory registry update future,
which makes it easier to call from an async context.

* refactor(network): add a convenience API for new InventoryChanges

* feat(network): improve inventory registry logging and metrics

* test(network): make sure advertised and missing inventory is correctly registered

* test(network): check that missing inventory is preferred over advertised

* test(network): check that current inventory is preferred over previous

* test(network): check peer set routes inv requests to advertised peers

* refactor(network): make the InventoryChange API more flexible

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
teor 2022-02-07 09:05:52 +10:00 committed by GitHub
parent 7b401ddeb0
commit 98502d6181
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 771 additions and 63 deletions

View File

@ -147,7 +147,7 @@ where
}
}
// Deref (but not DerefMut, because that could break the constraint)
// Deref and AsRef (but not DerefMut or AsMut, because that could break the constraint)
impl<T> Deref for AtLeastOne<T> {
type Target = Vec<T>;
@ -157,6 +157,12 @@ impl<T> Deref for AtLeastOne<T> {
}
}
impl<T> AsRef<[T]> for AtLeastOne<T> {
fn as_ref(&self) -> &[T] {
self.inner.as_ref()
}
}
// Extracting one or more items
impl<T> From<AtLeastOne<T>> for Vec<T> {
@ -165,7 +171,35 @@ impl<T> From<AtLeastOne<T>> for Vec<T> {
}
}
// `IntoIterator` for `T` and `&mut T`, because iterators can't remove items
impl<T> IntoIterator for AtLeastOne<T> {
type Item = T;
type IntoIter = std::vec::IntoIter<T>;
fn into_iter(self) -> std::vec::IntoIter<T> {
self.inner.into_iter()
}
}
impl<T> AtLeastOne<T> {
/// Returns an iterator that allows modifying each value.
pub fn iter_mut(&mut self) -> std::slice::IterMut<'_, T> {
self.inner.iter_mut()
}
}
impl<T> AtLeastOne<T> {
/// Returns a new `AtLeastOne`, containing a single `item`.
///
/// Skips the `TrustedPreallocate` memory denial of service checks.
/// (`TrustedPreallocate` can not defend against a single item
/// that causes a denial of service by itself.)
pub fn from_one(item: T) -> AtLeastOne<T> {
AtLeastOne { inner: vec![item] }
}
/// Returns a reference to the inner vector.
pub fn as_vec(&self) -> &Vec<T> {
&self.inner

View File

@ -38,7 +38,7 @@ use crate::{
CancelHeartbeatTask, Client, ClientRequest, Connection, ErrorSlot, HandshakeError,
MinimumPeerVersion, PeerError,
},
peer_set::ConnectionTracker,
peer_set::{ConnectionTracker, InventoryChange},
protocol::{
external::{types::*, AddrInVersion, Codec, InventoryHash, Message},
internal::{Request, Response},
@ -68,7 +68,7 @@ where
inbound_service: S,
address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>,
inv_collector: broadcast::Sender<InventoryChange>,
minimum_peer_version: MinimumPeerVersion<C>,
nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
@ -349,7 +349,7 @@ where
inbound_service: Option<S>,
address_book_updater: Option<tokio::sync::mpsc::Sender<MetaAddrChange>>,
inv_collector: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>,
inv_collector: Option<broadcast::Sender<InventoryChange>>,
latest_chain_tip: C,
}
@ -377,7 +377,7 @@ where
/// to look up peers that have specific inventory.
pub fn with_inventory_collector(
mut self,
inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>,
inv_collector: broadcast::Sender<InventoryChange>,
) -> Self {
self.inv_collector = Some(inv_collector);
self
@ -930,24 +930,37 @@ where
//
// https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring
//
// TODO: zcashd has a bug where it merges queued inv messages of
// the same or different types. So Zebra should split small
// merged inv messages into separate inv messages. (#1768)
// Note: zcashd has a bug where it merges queued inv messages of
// the same or different types. Zebra compensates by sending `notfound`
// responses to the inv collector. (#2156, #1768)
//
// (We can't split `inv`s, because that fills the inventory registry
// with useless entries that the whole network has, making it large and slow.)
match hashes.as_slice() {
[hash @ InventoryHash::Block(_)] => {
debug!(?hash, "registering gossiped block inventory for peer");
let _ = inv_collector.send((*hash, transient_addr));
// The peer set and inv collector use the peer's remote
// address as an identifier
let _ = inv_collector.send(InventoryChange::new_advertised(
*hash,
transient_addr,
));
}
[hashes @ ..] => {
for hash in hashes {
if let Some(unmined_tx_id) = hash.unmined_tx_id() {
debug!(?unmined_tx_id, "registering unmined transaction inventory for peer");
// The peer set and inv collector use the peer's remote
// address as an identifier
let _ = inv_collector.send((*hash, transient_addr));
} else {
trace!(?hash, "ignoring non-transaction inventory hash in multi-hash list")
}
let hashes =
hashes.iter().filter(|hash| hash.unmined_tx_id().is_some());
debug!(
?hashes,
"registering unmined transaction inventory for peer"
);
if let Some(change) = InventoryChange::new_advertised_multi(
hashes,
transient_addr,
) {
let _ = inv_collector.send(change);
}
}
}

View File

@ -6,6 +6,7 @@ mod set;
mod unready_service;
pub(crate) use candidate_set::CandidateSet;
pub(crate) use inventory_registry::InventoryChange;
pub(crate) use limit::{ActiveConnectionCounter, ConnectionTracker};
use inventory_registry::InventoryRegistry;

View File

@ -3,7 +3,8 @@
//! [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html
use std::{
collections::{HashMap, HashSet},
collections::HashMap,
convert::TryInto,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
@ -11,31 +12,71 @@ use std::{
};
use futures::{FutureExt, Stream, StreamExt};
use tokio::{sync::broadcast, time};
use tokio::{
sync::broadcast,
time::{self, Instant},
};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream, IntervalStream};
use zebra_chain::{parameters::POST_BLOSSOM_POW_TARGET_SPACING, serialization::AtLeastOne};
use crate::{protocol::external::InventoryHash, BoxError};
/// An Inventory Registry for tracking recent inventory advertisements by peer.
use self::update::Update;
use InventoryStatus::*;
pub mod update;
#[cfg(test)]
mod tests;
/// A peer inventory status change, used in the inventory status channel.
pub type InventoryChange = InventoryStatus<(AtLeastOne<InventoryHash>, SocketAddr)>;
/// An internal marker used in inventory status hash maps.
type InventoryMarker = InventoryStatus<()>;
/// A generic peer inventory status.
///
/// `Advertised` is used for inventory that peers claim to have,
/// and `Missing` is used for inventory they didn't provide when we requested it.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum InventoryStatus<T: Clone> {
/// An advertised inventory hash.
///
/// For performance reasons, advertisements should only be tracked
/// for hashes that are rare on the network.
/// So Zebra only tracks single-block inventory messages.
Advertised(T),
/// An inventory hash rejected by a peer.
///
/// For security reasons, all `notfound` rejections should be tracked.
/// This also helps with performance, if the hash is rare on the network.
#[allow(dead_code)]
Missing(T),
}
/// An Inventory Registry for tracking recent inventory advertisements and missing inventory.
///
/// For more details please refer to the [RFC].
///
/// [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html
pub struct InventoryRegistry {
/// Map tracking the inventory advertisements from the current interval
/// period
current: HashMap<InventoryHash, HashSet<SocketAddr>>,
/// Map tracking inventory advertisements from the previous interval period
prev: HashMap<InventoryHash, HashSet<SocketAddr>>,
/// Stream of incoming inventory hashes to register
/// Map tracking the latest inventory status from the current interval
/// period.
current: HashMap<InventoryHash, HashMap<SocketAddr, InventoryMarker>>,
/// Map tracking inventory statuses from the previous interval period.
prev: HashMap<InventoryHash, HashMap<SocketAddr, InventoryMarker>>,
/// Stream of incoming inventory statuses to register.
inv_stream: Pin<
Box<
dyn Stream<Item = Result<(InventoryHash, SocketAddr), BroadcastStreamRecvError>>
+ Send
+ 'static,
>,
Box<dyn Stream<Item = Result<InventoryChange, BroadcastStreamRecvError>> + Send + 'static>,
>,
/// Interval tracking how frequently we should rotate our maps
/// Interval tracking when we should next rotate our maps.
interval: IntervalStream,
}
@ -48,24 +89,176 @@ impl std::fmt::Debug for InventoryRegistry {
}
}
impl InventoryChange {
/// Returns a new advertised inventory change from a single hash.
pub fn new_advertised(hash: InventoryHash, peer: SocketAddr) -> Self {
InventoryStatus::Advertised((AtLeastOne::from_one(hash), peer))
}
/// Returns a new missing inventory change from a single hash.
#[allow(dead_code)]
pub fn new_missing(hash: InventoryHash, peer: SocketAddr) -> Self {
InventoryStatus::Missing((AtLeastOne::from_one(hash), peer))
}
/// Returns a new advertised multiple inventory change, if `hashes` contains at least one change.
pub fn new_advertised_multi<'a>(
hashes: impl IntoIterator<Item = &'a InventoryHash>,
peer: SocketAddr,
) -> Option<Self> {
let hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();
let hashes = hashes.try_into().ok();
hashes.map(|hashes| InventoryStatus::Advertised((hashes, peer)))
}
/// Returns a new missing multiple inventory change, if `hashes` contains at least one change.
#[allow(dead_code)]
pub fn new_missing_multi<'a>(
hashes: impl IntoIterator<Item = &'a InventoryHash>,
peer: SocketAddr,
) -> Option<Self> {
let hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();
let hashes = hashes.try_into().ok();
hashes.map(|hashes| InventoryStatus::Missing((hashes, peer)))
}
}
impl<T: Clone> InventoryStatus<T> {
/// Returns true if the inventory item was advertised.
#[allow(dead_code)]
pub fn is_advertised(&self) -> bool {
matches!(self, Advertised(_))
}
/// Returns true if the inventory item was missing.
#[allow(dead_code)]
pub fn is_missing(&self) -> bool {
matches!(self, Missing(_))
}
/// Get the advertised inventory item, if present.
pub fn advertised(&self) -> Option<T> {
if let Advertised(item) = self {
Some(item.clone())
} else {
None
}
}
/// Get the rejected inventory item, if present.
#[allow(dead_code)]
pub fn missing(&self) -> Option<T> {
if let Missing(item) = self {
Some(item.clone())
} else {
None
}
}
/// Get the inner item, regardless of status.
pub fn inner(&self) -> T {
match self {
Advertised(item) | Missing(item) => item.clone(),
}
}
/// Get a marker for the status, without any associated data.
pub fn marker(&self) -> InventoryMarker {
self.as_ref().map(|_inner| ())
}
/// Maps an `InventoryStatus<T>` to `InventoryStatus<U>` by applying a function to a contained value.
pub fn map<U: Clone, F: FnOnce(T) -> U>(self, f: F) -> InventoryStatus<U> {
// Based on Option::map from https://doc.rust-lang.org/src/core/option.rs.html#829
match self {
Advertised(item) => Advertised(f(item)),
Missing(item) => Missing(f(item)),
}
}
/// Converts from `&InventoryStatus<T>` to `InventoryStatus<&T>`.
pub fn as_ref(&self) -> InventoryStatus<&T> {
match self {
Advertised(item) => Advertised(item),
Missing(item) => Missing(item),
}
}
}
impl InventoryRegistry {
/// Returns an Inventory Registry
pub fn new(inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>) -> Self {
/// Returns a new Inventory Registry for `inv_stream`.
pub fn new(inv_stream: broadcast::Receiver<InventoryChange>) -> Self {
let interval = Duration::from_secs(
POST_BLOSSOM_POW_TARGET_SPACING
.try_into()
.expect("non-negative"),
);
// Don't do an immediate rotation, current and prev are already empty.
let mut interval = tokio::time::interval_at(Instant::now() + interval, interval);
// SECURITY: if the rotation time is late, delay future rotations by the same amount
interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
Self {
current: Default::default(),
prev: Default::default(),
inv_stream: BroadcastStream::new(inv_stream).boxed(),
interval: IntervalStream::new(time::interval(Duration::from_secs(75))),
interval: IntervalStream::new(interval),
}
}
/// Returns an iterator over addrs of peers that have recently advertised
/// having `hash` in their inventory.
pub fn peers(&self, hash: &InventoryHash) -> impl Iterator<Item = &SocketAddr> {
let prev = self.prev.get(hash).into_iter();
let current = self.current.get(hash).into_iter();
/// Returns an iterator over addrs of peers that have recently advertised `hash` in their inventory.
pub fn advertising_peers(&self, hash: InventoryHash) -> impl Iterator<Item = &SocketAddr> {
self.status_peers(hash)
.filter_map(|addr_status| addr_status.advertised())
}
prev.chain(current).flatten()
/// Returns an iterator over addrs of peers that have recently missed `hash` in their inventory.
#[allow(dead_code)]
pub fn missing_peers(&self, hash: InventoryHash) -> impl Iterator<Item = &SocketAddr> {
self.status_peers(hash)
.filter_map(|addr_status| addr_status.missing())
}
/// Returns an iterator over peer inventory statuses for `hash`.
///
/// Prefers current statuses to previously rotated statuses for the same peer.
pub fn status_peers(
&self,
hash: InventoryHash,
) -> impl Iterator<Item = InventoryStatus<&SocketAddr>> {
let prev = self.prev.get(&hash);
let current = self.current.get(&hash);
// # Security
//
// Prefer `current` statuses for the same peer over previously rotated statuses.
// This makes sure Zebra is using the most up-to-date network information.
let prev = prev
.into_iter()
.flatten()
.filter(move |(addr, _status)| !self.has_current_status(hash, **addr));
let current = current.into_iter().flatten();
current
.chain(prev)
.map(|(addr, status)| status.map(|()| addr))
}
/// Returns true if there is a current status entry for `hash` and `addr`.
pub fn has_current_status(&self, hash: InventoryHash, addr: SocketAddr) -> bool {
self.current
.get(&hash)
.and_then(|current| current.get(&addr))
.is_some()
}
/// Returns a future that polls once for new registry updates.
#[allow(dead_code)]
pub fn update(&mut self) -> Update {
Update::new(self)
}
/// Drive periodic inventory tasks
@ -75,6 +268,7 @@ impl InventoryRegistry {
/// - rotates HashMaps based on interval events
/// - drains the inv_stream channel and registers all advertised inventory
pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Result<(), BoxError> {
// Correctness: Registers the current task for wakeup when the timer next becomes ready.
while Pin::new(&mut self.interval).poll_next(cx).is_ready() {
self.rotate();
}
@ -97,10 +291,14 @@ impl InventoryRegistry {
// failure of the peer set.
while let Poll::Ready(channel_result) = self.inv_stream.next().poll_unpin(cx) {
match channel_result {
Some(Ok((hash, addr))) => self.register(hash, addr),
Some(Ok(change)) => self.register(change),
Some(Err(BroadcastStreamRecvError::Lagged(count))) => {
metrics::counter!("pool.inventory.dropped", 1);
tracing::debug!(count, "dropped lagged inventory advertisements");
metrics::counter!("pool.inventory.dropped.messages", count);
// If this message happens a lot, we should improve inventory registry performance,
// or poll the registry or peer set in a separate task.
info!(count, "dropped lagged inventory advertisements");
}
// This indicates all senders, including the one in the handshaker,
// have been dropped, which really is a permanent failure.
@ -111,9 +309,52 @@ impl InventoryRegistry {
Ok(())
}
/// Record that the given inventory `hash` is available from the peer `addr`
fn register(&mut self, hash: InventoryHash, addr: SocketAddr) {
self.current.entry(hash).or_default().insert(addr);
/// Record the given inventory `change` for the peer `addr`.
///
/// `Missing` markers are not updated until the registry rotates, for security reasons.
fn register(&mut self, change: InventoryChange) {
let new_status = change.marker();
let (invs, addr) = change.inner();
for inv in invs {
use InventoryHash::*;
assert!(
matches!(inv, Block(_) | Tx(_) | Wtx(_)),
"unexpected inventory type: {:?} from peer: {:?}",
inv,
addr,
);
let current = self.current.entry(inv).or_default();
// # Security
//
// Prefer `missing` over `advertised`, so malicious peers can't reset their own entries,
// and funnel multiple failing requests to themselves.
if let Some(old_status) = current.get(&addr) {
if old_status.is_missing() && new_status.is_advertised() {
debug!(?new_status, ?old_status, ?addr, ?inv, "skipping new status");
continue;
}
debug!(
?new_status,
?old_status,
?addr,
?inv,
"keeping both new and old status"
);
}
let replaced_status = current.insert(addr, new_status);
debug!(
?new_status,
?replaced_status,
?addr,
?inv,
"inserted new status"
);
}
}
/// Replace the prev HashMap with current's and replace current with an empty

View File

@ -0,0 +1,3 @@
//! Tests for the inventory registry.
mod vectors;

View File

@ -0,0 +1,198 @@
//! Fixed test vectors for the inventory registry.
use tokio::sync::broadcast;
use zebra_chain::block;
use crate::{
peer_set::{
inventory_registry::{InventoryRegistry, InventoryStatus},
InventoryChange,
},
protocol::external::InventoryHash,
};
/// The number of changes that can be pending in the inventory channel, before it starts lagging.
///
/// Lagging drops messages, so tests should avoid filling the channel.
pub const MAX_PENDING_CHANGES: usize = 32;
/// Check an empty inventory registry works as expected.
#[tokio::test]
async fn inv_registry_empty_ok() {
let fake_hash = InventoryHash::Error;
let (mut inv_registry, _inv_stream_tx) = new_inv_registry();
inv_registry
.update()
.await
.expect("unexpected dropped registry sender channel");
assert_eq!(inv_registry.advertising_peers(fake_hash).count(), 0);
assert_eq!(inv_registry.missing_peers(fake_hash).count(), 0);
}
/// Check inventory registration for one advertised hash/peer.
#[tokio::test]
async fn inv_registry_one_advertised_ok() {
let test_hash = InventoryHash::Block(block::Hash([0; 32]));
let test_peer = "1.1.1.1:1"
.parse()
.expect("unexpected invalid peer address");
let test_change = InventoryStatus::new_advertised(test_hash, test_peer);
let (mut inv_registry, inv_stream_tx) = new_inv_registry();
let receiver_count = inv_stream_tx
.send(test_change)
.expect("unexpected failed inventory status send");
assert_eq!(receiver_count, 1);
inv_registry
.update()
.await
.expect("unexpected dropped registry sender channel");
assert_eq!(
inv_registry.advertising_peers(test_hash).next(),
Some(&test_peer),
);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
}
/// Check inventory registration for one missing hash/peer.
#[tokio::test]
async fn inv_registry_one_missing_ok() {
let test_hash = InventoryHash::Block(block::Hash([0; 32]));
let test_peer = "1.1.1.1:1"
.parse()
.expect("unexpected invalid peer address");
let test_change = InventoryStatus::new_missing(test_hash, test_peer);
let (mut inv_registry, inv_stream_tx) = new_inv_registry();
let receiver_count = inv_stream_tx
.send(test_change)
.expect("unexpected failed inventory status send");
assert_eq!(receiver_count, 1);
inv_registry
.update()
.await
.expect("unexpected dropped registry sender channel");
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0);
assert_eq!(
inv_registry.missing_peers(test_hash).next(),
Some(&test_peer),
);
}
/// Check inventory registration for one hash/peer prefers missing over advertised.
#[tokio::test]
async fn inv_registry_prefer_missing_ok() {
inv_registry_prefer_missing_order(true).await;
inv_registry_prefer_missing_order(false).await;
}
async fn inv_registry_prefer_missing_order(missing_first: bool) {
let test_hash = InventoryHash::Block(block::Hash([0; 32]));
let test_peer = "1.1.1.1:1"
.parse()
.expect("unexpected invalid peer address");
let missing_change = InventoryStatus::new_missing(test_hash, test_peer);
let advertised_change = InventoryStatus::new_advertised(test_hash, test_peer);
let (mut inv_registry, inv_stream_tx) = new_inv_registry();
let changes = if missing_first {
[missing_change, advertised_change]
} else {
[advertised_change, missing_change]
};
for change in changes {
let receiver_count = inv_stream_tx
.send(change)
.expect("unexpected failed inventory status send");
assert_eq!(receiver_count, 1);
}
// TODO: also test with updates after each change
inv_registry
.update()
.await
.expect("unexpected dropped registry sender channel");
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0);
assert_eq!(
inv_registry.missing_peers(test_hash).next(),
Some(&test_peer),
);
}
/// Check inventory registration for one hash/peer prefers current over previous.
#[tokio::test]
async fn inv_registry_prefer_current_ok() {
inv_registry_prefer_current_order(true).await;
inv_registry_prefer_current_order(false).await;
}
async fn inv_registry_prefer_current_order(missing_current: bool) {
let test_hash = InventoryHash::Block(block::Hash([0; 32]));
let test_peer = "1.1.1.1:1"
.parse()
.expect("unexpected invalid peer address");
let missing_change = InventoryStatus::new_missing(test_hash, test_peer);
let advertised_change = InventoryStatus::new_advertised(test_hash, test_peer);
let (mut inv_registry, inv_stream_tx) = new_inv_registry();
let changes = if missing_current {
[advertised_change, missing_change]
} else {
[missing_change, advertised_change]
};
for change in changes {
// This rotation has no effect in the first loop iteration, because the registry is empty.
inv_registry.rotate();
let receiver_count = inv_stream_tx
.send(change)
.expect("unexpected failed inventory status send");
assert_eq!(receiver_count, 1);
// We must update after each change, so the rotation puts the first change in `prev`.
inv_registry
.update()
.await
.expect("unexpected dropped registry sender channel");
}
if missing_current {
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 0);
assert_eq!(
inv_registry.missing_peers(test_hash).next(),
Some(&test_peer),
);
} else {
assert_eq!(
inv_registry.advertising_peers(test_hash).next(),
Some(&test_peer),
);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
}
}
/// Returns a newly initialised inventory registry, and a sender for its inventory channel.
fn new_inv_registry() -> (InventoryRegistry, broadcast::Sender<InventoryChange>) {
let (inv_stream_tx, inv_stream_rx) = broadcast::channel(MAX_PENDING_CHANGES);
let inv_registry = InventoryRegistry::new(inv_stream_rx);
(inv_registry, inv_stream_tx)
}

View File

@ -0,0 +1,36 @@
//! Inventory registry update future.
use std::pin::Pin;
use futures::{
future::Future,
task::{Context, Poll},
};
use crate::{peer_set::InventoryRegistry, BoxError};
/// Future for the [`update`](super::InventoryRegistry::update) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Update<'a> {
registry: &'a mut InventoryRegistry,
}
impl Unpin for Update<'_> {}
impl<'a> Update<'a> {
#[allow(dead_code)]
pub(super) fn new(registry: &'a mut InventoryRegistry) -> Self {
Self { registry }
}
}
impl Future for Update<'_> {
type Output = Result<(), BoxError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// TODO: should the future wait until new changes arrive?
// or for the rotation timer?
Poll::Ready(self.registry.poll_inventory(cx))
}
}

View File

@ -124,7 +124,7 @@ use crate::{
peer::{LoadTrackedClient, MinimumPeerVersion},
peer_set::{
unready_service::{Error as UnreadyError, UnreadyService},
InventoryRegistry,
InventoryChange, InventoryRegistry,
},
protocol::{
external::InventoryHash,
@ -256,7 +256,7 @@ where
/// - `handle_rx`: receives background task handles,
/// monitors them to make sure they're still running,
/// and shuts down all the tasks as soon as one task exits;
/// - `inv_stream`: receives inventory advertisements for peers,
/// - `inv_stream`: receives inventory changes from peers,
/// allowing the peer set to direct inventory requests;
/// - `address_book`: when peer set is busy, it logs address book diagnostics.
pub fn new(
@ -264,7 +264,7 @@ where
discover: D,
demand_signal: mpsc::Sender<MorePeers>,
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>,
inv_stream: broadcast::Receiver<InventoryChange>,
address_metrics: watch::Receiver<AddressMetrics>,
minimum_peer_version: MinimumPeerVersion<C>,
) -> Self {
@ -659,7 +659,7 @@ where
) -> <Self as tower::Service<Request>>::Future {
let inventory_peer_list = self
.inventory_registry
.peers(&hash)
.advertising_peers(hash)
.filter(|&key| self.ready_services.contains_key(key))
.copied()
.collect();

View File

@ -19,12 +19,11 @@ use zebra_chain::{
parameters::{Network, NetworkUpgrade},
};
use super::MorePeers;
use crate::{
address_book::AddressMetrics,
peer::{ClientTestHarness, LoadTrackedClient, MinimumPeerVersion},
peer_set::PeerSet,
protocol::external::{types::Version, InventoryHash},
peer_set::{set::MorePeers, InventoryChange, PeerSet},
protocol::external::types::Version,
AddressBook, Config,
};
@ -113,7 +112,7 @@ struct PeerSetBuilder<D, C> {
discover: Option<D>,
demand_signal: Option<mpsc::Sender<MorePeers>>,
handle_rx: Option<tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>>,
inv_stream: Option<broadcast::Receiver<(InventoryHash, SocketAddr)>>,
inv_stream: Option<broadcast::Receiver<InventoryChange>>,
address_book: Option<Arc<std::sync::Mutex<AddressBook>>>,
minimum_peer_version: Option<MinimumPeerVersion<C>>,
}
@ -207,7 +206,7 @@ pub struct PeerSetGuard {
background_tasks_sender:
Option<tokio::sync::oneshot::Sender<Vec<JoinHandle<Result<(), BoxError>>>>>,
demand_receiver: Option<mpsc::Receiver<MorePeers>>,
inventory_sender: Option<broadcast::Sender<(InventoryHash, SocketAddr)>>,
inventory_sender: Option<broadcast::Sender<InventoryChange>>,
address_book: Option<Arc<std::sync::Mutex<AddressBook>>>,
}
@ -217,6 +216,31 @@ impl PeerSetGuard {
PeerSetGuard::default()
}
/// Return a mutable reference to the background tasks sender, if present.
#[allow(dead_code)]
pub fn background_tasks_sender(
&mut self,
) -> &mut Option<tokio::sync::oneshot::Sender<Vec<JoinHandle<Result<(), BoxError>>>>> {
&mut self.background_tasks_sender
}
/// Return a mutable reference to the background tasks sender, if present.
#[allow(dead_code)]
pub fn demand_receiver(&mut self) -> &mut Option<mpsc::Receiver<MorePeers>> {
&mut self.demand_receiver
}
/// Return a mutable reference to the background tasks sender, if present.
pub fn inventory_sender(&mut self) -> &mut Option<broadcast::Sender<InventoryChange>> {
&mut self.inventory_sender
}
/// Return a mutable reference to the background tasks sender, if present.
#[allow(dead_code)]
pub fn address_book(&mut self) -> &mut Option<Arc<std::sync::Mutex<AddressBook>>> {
&mut self.address_book
}
/// Create a dummy channel for the background tasks sent to the [`PeerSet`].
///
/// The sender is stored inside the [`PeerSetGuard`], while the receiver is returned to be
@ -247,9 +271,7 @@ impl PeerSetGuard {
///
/// The sender is stored inside the [`PeerSetGuard`], while the receiver is returned to be
/// passed to the [`PeerSet`] constructor.
pub fn create_inventory_receiver(
&mut self,
) -> broadcast::Receiver<(InventoryHash, SocketAddr)> {
pub fn create_inventory_receiver(&mut self) -> broadcast::Receiver<InventoryChange> {
let (sender, receiver) = broadcast::channel(1);
self.inventory_sender = Some(sender);

View File

@ -1,13 +1,18 @@
use std::time::Duration;
use std::{iter, time::Duration};
use tokio::time::timeout;
use tower::{Service, ServiceExt};
use zebra_chain::parameters::{Network, NetworkUpgrade};
use zebra_chain::{
block,
parameters::{Network, NetworkUpgrade},
};
use super::{PeerSetBuilder, PeerVersions};
use crate::{
peer::{ClientRequest, MinimumPeerVersion},
protocol::external::types::Version,
peer_set::inventory_registry::InventoryStatus,
protocol::external::{types::Version, InventoryHash},
Request,
};
@ -164,3 +169,159 @@ fn peer_set_ready_multiple_connections() {
assert!(timeout(Duration::from_secs(10), peer_ready).await.is_err());
});
}
/// Check that a peer set with an empty inventory registry routes requests to a random ready peer.
#[test]
fn peer_set_route_inv_empty_registry() {
let test_hash = block::Hash([0; 32]);
// Use two peers with the same version
let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Canopy);
let peer_versions = PeerVersions {
peer_versions: vec![peer_version, peer_version],
};
// Start the runtime
let runtime = zebra_test::init_async();
let _guard = runtime.enter();
// Pause the runtime's timer so that it advances automatically.
//
// CORRECTNESS: This test does not depend on external resources that could really timeout, like
// real network connections.
tokio::time::pause();
// Get peers and client handles of them
let (discovered_peers, handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet);
// Make sure we have the right number of peers
assert_eq!(handles.len(), 2);
runtime.block_on(async move {
// Build a peerset
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.build();
// Get peerset ready
let peer_ready = peer_set
.ready()
.await
.expect("peer set service is always ready");
// Check we have the right amount of ready services
assert_eq!(peer_ready.ready_services.len(), 2);
// Send an inventory-based request
let sent_request = Request::BlocksByHash(iter::once(test_hash).collect());
let _fut = peer_ready.call(sent_request.clone());
// Check that one of the clients received the request
let mut received_count = 0;
for mut handle in handles {
if let Some(ClientRequest { request, .. }) =
handle.try_to_receive_outbound_client_request().request()
{
assert_eq!(sent_request, request);
received_count += 1;
}
}
assert_eq!(received_count, 1);
});
}
/// Check that a peer set routes inventory requests to a peer that has advertised that inventory.
#[test]
fn peer_set_route_inv_via_registry() {
let test_hash = block::Hash([0; 32]);
let test_inv = InventoryHash::Block(test_hash);
// Hard-code the fixed test address created by mock_peer_discovery
// TODO: add peer test addresses to ClientTestHarness
let test_peer = "127.0.0.1:1"
.parse()
.expect("unexpected invalid peer address");
let test_change = InventoryStatus::new_advertised(test_inv, test_peer);
// Use two peers with the same version
let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Canopy);
let peer_versions = PeerVersions {
peer_versions: vec![peer_version, peer_version],
};
// Start the runtime
let runtime = zebra_test::init_async();
let _guard = runtime.enter();
// Pause the runtime's timer so that it advances automatically.
//
// CORRECTNESS: This test does not depend on external resources that could really timeout, like
// real network connections.
tokio::time::pause();
// Get peers and client handles of them
let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet);
// Make sure we have the right number of peers
assert_eq!(handles.len(), 2);
runtime.block_on(async move {
// Build a peerset
let (mut peer_set, mut peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.build();
// Advertise some inventory
peer_set_guard
.inventory_sender()
.as_mut()
.expect("unexpected missing inv sender")
.send(test_change)
.expect("unexpected dropped receiver");
// Get peerset ready
let peer_ready = peer_set
.ready()
.await
.expect("peer set service is always ready");
// Check we have the right amount of ready services
assert_eq!(peer_ready.ready_services.len(), 2);
// Send an inventory-based request
let sent_request = Request::BlocksByHash(iter::once(test_hash).collect());
let _fut = peer_ready.call(sent_request.clone());
// Check that the client that advertised the inventory received the request
let advertised_handle = &mut handles[0];
if let Some(ClientRequest { request, .. }) = advertised_handle
.try_to_receive_outbound_client_request()
.request()
{
assert_eq!(sent_request, request);
} else {
panic!("inv request not routed to advertised peer");
}
let other_handle = &mut handles[1];
assert!(
matches!(
other_handle
.try_to_receive_outbound_client_request()
.request(),
None
),
"request routed to non-advertised peer",
);
});
}

View File

@ -50,7 +50,6 @@ pub enum InventoryHash {
/// [auth_digest]: https://zips.z.cash/zip-0244#authorizing-data-commitment
/// [zip239]: https://zips.z.cash/zip-0239
/// [bip339]: https://github.com/bitcoin/bips/blob/master/bip-0339.mediawiki
// TODO: Actually handle this variant once the mempool is implemented (#2449)
Wtx(transaction::WtxId),
}