2. Route peer requests based on missing inventory (#3465)

* feat(network): send notfound messages to the inventory registry

* refactor(network): move the inventory filter into an async function

* feat(network): avoid routing requests to peers that are missing inventory

* test(network): advertised routing is independent of numeric address value

* test(network): peer set routes requests to peers not missing that inventory

* test(network): peer set fails requests if all ready peers are missing that inventory

* fix(clippy): needless-borrow in the peer set

* fix(lint): remove redundant trailing commas in macro calls

There is no clippy lint for this, maybe because some macros
are sensitive to trailing commas (see the sketch after this list).
(The macros changed in this commit are not sensitive to them.)

* test(network): check the exact number of inventory peers

* doc(network): explain why we ignore inventory send failures

* docs(network): explain why a channel error is ignored
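
To illustrate the trailing-comma note above (a hypothetical example, not code from this PR): declarative macros whose patterns do not allow an optional trailing comma, for example via `$(,)?`, reject one outright, while standard macros such as `assert!` and `vec!` accept it.

// Hypothetical macro whose pattern has no `$(,)?`, so a trailing comma fails to match.
macro_rules! pair {
    ($a:expr, $b:expr) => {
        ($a, $b)
    };
}

fn main() {
    let _ok = pair!(1, 2);
    // let _err = pair!(1, 2,); // error: no rules expected the token `,`

    // Standard macros accept an optional trailing comma:
    assert!(true,);
    let _v = vec![1, 2, 3,];
}
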
teor 2022-02-08 11:16:41 +10:00 committed by GitHub
parent 1a14baf8e7
commit 9be13a4fb7
7 changed files with 367 additions and 79 deletions
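
The behaviour change at the heart of this PR is easiest to see as a small, self-contained sketch. The types and the deterministic selection below are hypothetical; the real `route_inv` in zebra-network picks randomly among less-loaded peers via P2C. The fallback order is: prefer a ready peer that advertised the inventory, fall back to a ready peer not known to be missing it, and fail the request only when every ready peer is missing it.

use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

// Hypothetical stand-ins for the real peer set and inventory registry state.
struct InventoryView {
    advertising: HashMap<[u8; 32], HashSet<SocketAddr>>,
    missing: HashMap<[u8; 32], HashSet<SocketAddr>>,
    ready: HashSet<SocketAddr>,
}

// Sketch of the routing decision; names and deterministic `find` are
// illustrative, the real code selects randomly among less-loaded peers.
fn route_inv_sketch(view: &InventoryView, hash: [u8; 32]) -> Result<SocketAddr, String> {
    // 1. Prefer a ready peer that advertised the inventory.
    if let Some(peer) = view
        .advertising
        .get(&hash)
        .into_iter()
        .flatten()
        .find(|peer| view.ready.contains(*peer))
    {
        return Ok(*peer);
    }

    // 2. Fall back to any ready peer that is not known to be missing it.
    let missing = view.missing.get(&hash);
    if let Some(peer) = view
        .ready
        .iter()
        .find(|peer| missing.map_or(true, |m| !m.contains(*peer)))
    {
        return Ok(*peer);
    }

    // 3. All ready peers are missing the inventory: fail the request,
    //    mirroring the `PeerError::NotFound` behaviour added in this PR.
    Err(format!("NotFound({:?})", hash))
}

fn main() {
    let peer: SocketAddr = "127.0.0.1:1".parse().expect("valid address");
    let hash = [0u8; 32];

    // The only ready peer is missing the inventory, so routing fails.
    let view = InventoryView {
        advertising: HashMap::new(),
        missing: HashMap::from([(hash, HashSet::from([peer]))]),
        ready: HashSet::from([peer]),
    };
    assert!(route_inv_sketch(&view, hash).is_err());
}

The real implementation also yields to the runtime before returning the error, so a retried request may see a different set of ready peers.
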

View File

@@ -214,7 +214,7 @@ fn v5_coinbase_transaction_without_enable_spends_flag_passes_validation() {
insert_fake_orchard_shielded_data(&mut transaction);
assert!(check::coinbase_tx_no_prevout_joinsplit_spend(&transaction).is_ok(),);
assert!(check::coinbase_tx_no_prevout_joinsplit_spend(&transaction).is_ok());
}
#[test]

View File

@@ -86,7 +86,7 @@ async fn connect_isolated_run_tor_once_with(network: Network, hostname: String)
// We make the test pass if there are network errors, if we get a valid running service,
// or if we are still waiting for Tor or the handshake.
let outbound_result = outbound_join_handle_timeout.await;
assert!(matches!(outbound_result, Ok(Ok(_)) | Err(_),));
assert!(matches!(outbound_result, Ok(Ok(_)) | Err(_)));
outbound_join_handle.abort();
}

View File

@@ -29,6 +29,7 @@ use zebra_chain::{
block,
chain_tip::{ChainTip, NoChainTip},
parameters::Network,
serialization::SerializationError,
};
use crate::{
@@ -917,57 +918,7 @@ where
.then(move |msg| {
let inv_collector = inv_collector.clone();
let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter");
async move {
if let (Ok(Message::Inv(hashes)), Some(transient_addr)) =
(&msg, connected_addr.get_transient_addr())
{
// We ignore inventory messages with more than one
// block, because they are most likely replies to a
// query, rather than a newly gossiped block.
//
// (We process inventory messages with any number of
// transactions.)
//
// https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring
//
// Note: zcashd has a bug where it merges queued inv messages of
// the same or different types. Zebra compensates by sending `notfound`
// responses to the inv collector. (#2156, #1768)
//
// (We can't split `inv`s, because that fills the inventory registry
// with useless entries that the whole network has, making it large and slow.)
match hashes.as_slice() {
[hash @ InventoryHash::Block(_)] => {
debug!(?hash, "registering gossiped block inventory for peer");
// The peer set and inv collector use the peer's remote
// address as an identifier
let _ = inv_collector.send(InventoryChange::new_advertised(
*hash,
transient_addr,
));
}
[hashes @ ..] => {
let hashes =
hashes.iter().filter(|hash| hash.unmined_tx_id().is_some());
debug!(
?hashes,
"registering unmined transaction inventory for peer"
);
if let Some(change) = InventoryChange::new_advertised_multi(
hashes,
transient_addr,
) {
let _ = inv_collector.send(change);
}
}
}
}
msg
}
.instrument(span)
register_inventory_status(msg, connected_addr, inv_collector).instrument(span)
})
.boxed();
@@ -1017,6 +968,76 @@ where
}
}
/// Register any advertised or missing inventory in `msg` for `connected_addr`.
async fn register_inventory_status(
msg: Result<Message, SerializationError>,
connected_addr: ConnectedAddr,
inv_collector: broadcast::Sender<InventoryChange>,
) -> Result<Message, SerializationError> {
match (&msg, connected_addr.get_transient_addr()) {
(Ok(Message::Inv(advertised)), Some(transient_addr)) => {
// We ignore inventory messages with more than one
// block, because they are most likely replies to a
// query, rather than a newly gossiped block.
//
// (We process inventory messages with any number of
// transactions.)
//
// https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring
//
// Note: zcashd has a bug where it merges queued inv messages of
// the same or different types. Zebra compensates by sending `notfound`
// responses to the inv collector. (#2156, #1768)
//
// (We can't split `inv`s, because that fills the inventory registry
// with useless entries that the whole network has, making it large and slow.)
match advertised.as_slice() {
[advertised @ InventoryHash::Block(_)] => {
debug!(
?advertised,
"registering gossiped advertised block inventory for peer"
);
// The peer set and inv collector use the peer's remote
// address as an identifier
// If all receivers have been dropped, `send` returns an error.
// When that happens, Zebra is shutting down, so we want to ignore this error.
let _ = inv_collector
.send(InventoryChange::new_advertised(*advertised, transient_addr));
}
[advertised @ ..] => {
let advertised = advertised
.iter()
.filter(|advertised| advertised.unmined_tx_id().is_some());
debug!(
?advertised,
"registering advertised unmined transaction inventory for peer",
);
if let Some(change) =
InventoryChange::new_advertised_multi(advertised, transient_addr)
{
// Ignore channel errors that should only happen during shutdown.
let _ = inv_collector.send(change);
}
}
}
}
(Ok(Message::NotFound(missing)), Some(transient_addr)) => {
debug!(?missing, "registering missing inventory for peer");
if let Some(change) = InventoryChange::new_missing_multi(missing, transient_addr) {
let _ = inv_collector.send(change);
}
}
_ => {}
}
msg
}
/// Send periodical heartbeats to `server_tx`, and update the peer status through
/// `heartbeat_ts_collector`.
///

View File

@@ -58,6 +58,7 @@ async fn inv_registry_one_advertised_ok() {
inv_registry.advertising_peers(test_hash).next(),
Some(&test_peer),
);
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 1);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
}
@@ -87,6 +88,7 @@ async fn inv_registry_one_missing_ok() {
inv_registry.missing_peers(test_hash).next(),
Some(&test_peer),
);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 1);
}
/// Check inventory registration for one hash/peer prefers missing over advertised.
@@ -131,6 +133,7 @@ async fn inv_registry_prefer_missing_order(missing_first: bool) {
inv_registry.missing_peers(test_hash).next(),
Some(&test_peer),
);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 1);
}
/// Check inventory registration for one hash/peer prefers current over previous.
@@ -179,11 +182,13 @@ async fn inv_registry_prefer_current_order(missing_current: bool) {
inv_registry.missing_peers(test_hash).next(),
Some(&test_peer),
);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 1);
} else {
assert_eq!(
inv_registry.advertising_peers(test_hash).next(),
Some(&test_peer),
);
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 1);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
}
}

View File

@@ -130,7 +130,7 @@ use crate::{
external::InventoryHash,
internal::{Request, Response},
},
BoxError, Config,
BoxError, Config, PeerError, SharedPeerError,
};
#[cfg(test)]
@@ -650,17 +650,22 @@ where
fut.map_err(Into::into).boxed()
}
/// Tries to route a request to a peer that advertised that inventory,
/// falling back to P2C if there is no ready peer.
/// Tries to route a request to a ready peer that advertised that inventory,
/// falling back to a ready peer that isn't missing the inventory.
///
/// If all ready peers are missing the inventory,
/// returns a [`NotFound`](PeerError::NotFound) error.
///
/// Uses P2C to route requests to the least loaded peer in each list.
fn route_inv(
&mut self,
req: Request,
hash: InventoryHash,
) -> <Self as tower::Service<Request>>::Future {
let inventory_peer_list = self
let advertising_peer_list = self
.inventory_registry
.advertising_peers(hash)
.filter(|&key| self.ready_services.contains_key(key))
.filter(|&addr| self.ready_services.contains_key(addr))
.copied()
.collect();
@@ -672,21 +677,57 @@ where
// peers would be able to influence our choice by switching addresses.
// But we need the choice to be random,
// so that a peer can't provide all our inventory responses.
let peer = self.select_p2c_peer_from_list(&inventory_peer_list);
let peer = self.select_p2c_peer_from_list(&advertising_peer_list);
match peer.and_then(|key| self.take_ready_service(&key)) {
Some(mut svc) => {
let peer = peer.expect("just checked peer is Some");
tracing::trace!(?hash, ?peer, "routing based on inventory");
let fut = svc.call(req);
self.push_unready(peer, svc);
fut.map_err(Into::into).boxed()
}
None => {
tracing::trace!(?hash, "no ready peer for inventory, falling back to p2c");
self.route_p2c(req)
}
if let Some(mut svc) = peer.and_then(|key| self.take_ready_service(&key)) {
let peer = peer.expect("just checked peer is Some");
tracing::trace!(?hash, ?peer, "routing to a peer which advertised inventory");
let fut = svc.call(req);
self.push_unready(peer, svc);
return fut.map_err(Into::into).boxed();
}
let missing_peer_list: HashSet<SocketAddr> = self
.inventory_registry
.missing_peers(hash)
.copied()
.collect();
let maybe_peer_list = self
.ready_services
.keys()
.filter(|addr| !missing_peer_list.contains(addr))
.copied()
.collect();
// Security: choose a random, less-loaded peer that might have the inventory.
let peer = self.select_p2c_peer_from_list(&maybe_peer_list);
if let Some(mut svc) = peer.and_then(|key| self.take_ready_service(&key)) {
let peer = peer.expect("just checked peer is Some");
tracing::trace!(?hash, ?peer, "routing to a peer that might have inventory");
let fut = svc.call(req);
self.push_unready(peer, svc);
return fut.map_err(Into::into).boxed();
}
// TODO: reduce this log level after testing #2156 and #2726
tracing::info!(
?hash,
"all ready peers are missing inventory, failing request"
);
async move {
// Let other tasks run, so a retry request might get different ready peers.
tokio::task::yield_now().await;
// # Security
//
// Avoid routing requests to peers that are missing inventory.
// If we kept trying doomed requests, peers that are missing our requested inventory
// could take up a large amount of our bandwidth and retry limits.
Err(SharedPeerError::from(PeerError::NotFound(vec![hash])))
}
.map_err(Into::into)
.boxed()
}
/// Routes the same request to up to `max_peers` ready peers, ignoring return values.
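
For readers unfamiliar with the P2C mentioned in the `route_inv` doc comment above: power-of-two-choices sampling picks two candidates at random and keeps the less loaded one. The snippet below is a generic, hypothetical illustration, not the zebra-network peer set implementation.

// Generic power-of-two-choices sketch: given two candidates sampled at
// random, keep the one with the lower load. Hypothetical, for illustration.
fn p2c_pick<'a>(a: (&'a str, usize), b: (&'a str, usize)) -> &'a str {
    if a.1 <= b.1 {
        a.0
    } else {
        b.0
    }
}

fn main() {
    // Peer addresses paired with their current in-flight request counts.
    let less_loaded = p2c_pick(("127.0.0.1:1", 3), ("127.0.0.1:2", 1));
    assert_eq!(less_loaded, "127.0.0.1:2");
}
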

View File

@@ -13,7 +13,7 @@ use crate::{
peer::{ClientRequest, MinimumPeerVersion},
peer_set::inventory_registry::InventoryStatus,
protocol::external::{types::Version, InventoryHash},
Request,
Request, SharedPeerError,
};
#[test]
@@ -236,15 +236,24 @@ fn peer_set_route_inv_empty_registry() {
/// Check that a peer set routes inventory requests to a peer that has advertised that inventory.
#[test]
fn peer_set_route_inv_via_registry() {
fn peer_set_route_inv_advertised_registry() {
peer_set_route_inv_advertised_registry_order(true);
peer_set_route_inv_advertised_registry_order(false);
}
fn peer_set_route_inv_advertised_registry_order(advertised_first: bool) {
let test_hash = block::Hash([0; 32]);
let test_inv = InventoryHash::Block(test_hash);
// Hard-code the fixed test address created by mock_peer_discovery
// TODO: add peer test addresses to ClientTestHarness
let test_peer = "127.0.0.1:1"
.parse()
.expect("unexpected invalid peer address");
let test_peer = if advertised_first {
"127.0.0.1:1"
} else {
"127.0.0.1:2"
}
.parse()
.expect("unexpected invalid peer address");
let test_change = InventoryStatus::new_advertised(test_inv, test_peer);
@@ -301,7 +310,11 @@ fn peer_set_route_inv_via_registry() {
let _fut = peer_ready.call(sent_request.clone());
// Check that the client that advertised the inventory received the request
let advertised_handle = &mut handles[0];
let advertised_handle = if advertised_first {
&mut handles[0]
} else {
&mut handles[1]
};
if let Some(ClientRequest { request, .. }) = advertised_handle
.try_to_receive_outbound_client_request()
@@ -312,7 +325,11 @@ fn peer_set_route_inv_via_registry() {
panic!("inv request not routed to advertised peer");
}
let other_handle = &mut handles[1];
let other_handle = if advertised_first {
&mut handles[1]
} else {
&mut handles[0]
};
assert!(
matches!(
@@ -325,3 +342,207 @@ fn peer_set_route_inv_via_registry() {
);
});
}
/// Check that a peer set routes inventory requests to peers that are not missing that inventory.
#[test]
fn peer_set_route_inv_missing_registry() {
peer_set_route_inv_missing_registry_order(true);
peer_set_route_inv_missing_registry_order(false);
}
fn peer_set_route_inv_missing_registry_order(missing_first: bool) {
let test_hash = block::Hash([0; 32]);
let test_inv = InventoryHash::Block(test_hash);
// Hard-code the fixed test address created by mock_peer_discovery
// TODO: add peer test addresses to ClientTestHarness
let test_peer = if missing_first {
"127.0.0.1:1"
} else {
"127.0.0.1:2"
}
.parse()
.expect("unexpected invalid peer address");
let test_change = InventoryStatus::new_missing(test_inv, test_peer);
// Use two peers with the same version
let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Canopy);
let peer_versions = PeerVersions {
peer_versions: vec![peer_version, peer_version],
};
// Start the runtime
let runtime = zebra_test::init_async();
let _guard = runtime.enter();
// Pause the runtime's timer so that it advances automatically.
//
// CORRECTNESS: This test does not depend on external resources that could really timeout, like
// real network connections.
tokio::time::pause();
// Get peers and client handles of them
let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet);
// Make sure we have the right number of peers
assert_eq!(handles.len(), 2);
runtime.block_on(async move {
// Build a peerset
let (mut peer_set, mut peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.build();
// Mark some inventory as missing
peer_set_guard
.inventory_sender()
.as_mut()
.expect("unexpected missing inv sender")
.send(test_change)
.expect("unexpected dropped receiver");
// Get peerset ready
let peer_ready = peer_set
.ready()
.await
.expect("peer set service is always ready");
// Check we have the right amount of ready services
assert_eq!(peer_ready.ready_services.len(), 2);
// Send an inventory-based request
let sent_request = Request::BlocksByHash(iter::once(test_hash).collect());
let _fut = peer_ready.call(sent_request.clone());
// Check that the client missing the inventory did not receive the request
let missing_handle = if missing_first {
&mut handles[0]
} else {
&mut handles[1]
};
assert!(
matches!(
missing_handle
.try_to_receive_outbound_client_request()
.request(),
None
),
"request routed to missing peer",
);
// Check that the client that was not missing the inventory received the request
let other_handle = if missing_first {
&mut handles[1]
} else {
&mut handles[0]
};
if let Some(ClientRequest { request, .. }) = other_handle
.try_to_receive_outbound_client_request()
.request()
{
assert_eq!(sent_request, request);
} else {
panic!(
"inv request should have been routed to the only peer not missing the inventory"
);
}
});
}
/// Check that a peer set fails inventory requests if all peers are missing that inventory.
#[test]
fn peer_set_route_inv_all_missing_fail() {
let test_hash = block::Hash([0; 32]);
let test_inv = InventoryHash::Block(test_hash);
// Hard-code the fixed test address created by mock_peer_discovery
// TODO: add peer test addresses to ClientTestHarness
let test_peer = "127.0.0.1:1"
.parse()
.expect("unexpected invalid peer address");
let test_change = InventoryStatus::new_missing(test_inv, test_peer);
// Use one peer
let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Canopy);
let peer_versions = PeerVersions {
peer_versions: vec![peer_version],
};
// Start the runtime
let runtime = zebra_test::init_async();
let _guard = runtime.enter();
// Pause the runtime's timer so that it advances automatically.
//
// CORRECTNESS: This test does not depend on external resources that could really timeout, like
// real network connections.
tokio::time::pause();
// Get the peer and its client handle
let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery();
let (minimum_peer_version, _best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet);
// Make sure we have the right number of peers
assert_eq!(handles.len(), 1);
runtime.block_on(async move {
// Build a peerset
let (mut peer_set, mut peer_set_guard) = PeerSetBuilder::new()
.with_discover(discovered_peers)
.with_minimum_peer_version(minimum_peer_version.clone())
.build();
// Mark the inventory as missing for all peers
peer_set_guard
.inventory_sender()
.as_mut()
.expect("unexpected missing inv sender")
.send(test_change)
.expect("unexpected dropped receiver");
// Get peerset ready
let peer_ready = peer_set
.ready()
.await
.expect("peer set service is always ready");
// Check we have the right amount of ready services
assert_eq!(peer_ready.ready_services.len(), 1);
// Send an inventory-based request
let sent_request = Request::BlocksByHash(iter::once(test_hash).collect());
let response_fut = peer_ready.call(sent_request.clone());
// Check that the client missing the inventory did not receive the request
let missing_handle = &mut handles[0];
assert!(
matches!(
missing_handle
.try_to_receive_outbound_client_request()
.request(),
None
),
"request routed to missing peer",
);
// Check that the response is a synthetic error
let response = response_fut.await;
assert_eq!(
response
.expect_err("peer set should return an error (not a Response)")
.downcast_ref::<SharedPeerError>()
.expect("peer set should return a boxed SharedPeerError")
.inner_debug(),
"NotFound([Block(block::Hash(\"0000000000000000000000000000000000000000000000000000000000000000\"))])"
);
});
}

View File

@@ -144,7 +144,7 @@ impl QueuedBlocks {
}
}
tracing::trace!(num_blocks = %self.blocks.len(), "Finished pruning blocks at or beneath the finalized tip height",);
tracing::trace!(num_blocks = %self.blocks.len(), "Finished pruning blocks at or beneath the finalized tip height");
self.update_metrics();
}