diff --git a/zebra-consensus/src/transaction/tests.rs b/zebra-consensus/src/transaction/tests.rs index eb7a37944..82d4f1a9d 100644 --- a/zebra-consensus/src/transaction/tests.rs +++ b/zebra-consensus/src/transaction/tests.rs @@ -214,7 +214,7 @@ fn v5_coinbase_transaction_without_enable_spends_flag_passes_validation() { insert_fake_orchard_shielded_data(&mut transaction); - assert!(check::coinbase_tx_no_prevout_joinsplit_spend(&transaction).is_ok(),); + assert!(check::coinbase_tx_no_prevout_joinsplit_spend(&transaction).is_ok()); } #[test] diff --git a/zebra-network/src/isolated/tor/tests/vectors.rs b/zebra-network/src/isolated/tor/tests/vectors.rs index b74ade718..c2a622802 100644 --- a/zebra-network/src/isolated/tor/tests/vectors.rs +++ b/zebra-network/src/isolated/tor/tests/vectors.rs @@ -86,7 +86,7 @@ async fn connect_isolated_run_tor_once_with(network: Network, hostname: String) // We make the test pass if there are network errors, if we get a valid running service, // or if we are still waiting for Tor or the handshake. let outbound_result = outbound_join_handle_timeout.await; - assert!(matches!(outbound_result, Ok(Ok(_)) | Err(_),)); + assert!(matches!(outbound_result, Ok(Ok(_)) | Err(_))); outbound_join_handle.abort(); } diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index d8fb5f8f4..4074dfc1b 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -29,6 +29,7 @@ use zebra_chain::{ block, chain_tip::{ChainTip, NoChainTip}, parameters::Network, + serialization::SerializationError, }; use crate::{ @@ -917,57 +918,7 @@ where .then(move |msg| { let inv_collector = inv_collector.clone(); let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter"); - async move { - if let (Ok(Message::Inv(hashes)), Some(transient_addr)) = - (&msg, connected_addr.get_transient_addr()) - { - // We ignore inventory messages with more than one - // block, because they are most likely replies to a - // query, rather than a newly gossiped block. - // - // (We process inventory messages with any number of - // transactions.) - // - // https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring - // - // Note: zcashd has a bug where it merges queued inv messages of - // the same or different types. Zebra compensates by sending `notfound` - // responses to the inv collector. (#2156, #1768) - // - // (We can't split `inv`s, because that fills the inventory registry - // with useless entries that the whole network has, making it large and slow.) - match hashes.as_slice() { - [hash @ InventoryHash::Block(_)] => { - debug!(?hash, "registering gossiped block inventory for peer"); - - // The peer set and inv collector use the peer's remote - // address as an identifier - let _ = inv_collector.send(InventoryChange::new_advertised( - *hash, - transient_addr, - )); - } - [hashes @ ..] => { - let hashes = - hashes.iter().filter(|hash| hash.unmined_tx_id().is_some()); - - debug!( - ?hashes, - "registering unmined transaction inventory for peer" - ); - - if let Some(change) = InventoryChange::new_advertised_multi( - hashes, - transient_addr, - ) { - let _ = inv_collector.send(change); - } - } - } - } - msg - } - .instrument(span) + register_inventory_status(msg, connected_addr, inv_collector).instrument(span) }) .boxed(); @@ -1017,6 +968,76 @@ where } } +/// Register any advertised or missing inventory in `msg` for `connected_addr`. +async fn register_inventory_status( + msg: Result, + connected_addr: ConnectedAddr, + inv_collector: broadcast::Sender, +) -> Result { + match (&msg, connected_addr.get_transient_addr()) { + (Ok(Message::Inv(advertised)), Some(transient_addr)) => { + // We ignore inventory messages with more than one + // block, because they are most likely replies to a + // query, rather than a newly gossiped block. + // + // (We process inventory messages with any number of + // transactions.) + // + // https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring + // + // Note: zcashd has a bug where it merges queued inv messages of + // the same or different types. Zebra compensates by sending `notfound` + // responses to the inv collector. (#2156, #1768) + // + // (We can't split `inv`s, because that fills the inventory registry + // with useless entries that the whole network has, making it large and slow.) + match advertised.as_slice() { + [advertised @ InventoryHash::Block(_)] => { + debug!( + ?advertised, + "registering gossiped advertised block inventory for peer" + ); + + // The peer set and inv collector use the peer's remote + // address as an identifier + // If all receivers have been dropped, `send` returns an error. + // When that happens, Zebra is shutting down, so we want to ignore this error. + let _ = inv_collector + .send(InventoryChange::new_advertised(*advertised, transient_addr)); + } + [advertised @ ..] => { + let advertised = advertised + .iter() + .filter(|advertised| advertised.unmined_tx_id().is_some()); + + debug!( + ?advertised, + "registering advertised unmined transaction inventory for peer", + ); + + if let Some(change) = + InventoryChange::new_advertised_multi(advertised, transient_addr) + { + // Ignore channel errors that should only happen during shutdown. + let _ = inv_collector.send(change); + } + } + } + } + + (Ok(Message::NotFound(missing)), Some(transient_addr)) => { + debug!(?missing, "registering missing inventory for peer"); + + if let Some(change) = InventoryChange::new_missing_multi(missing, transient_addr) { + let _ = inv_collector.send(change); + } + } + _ => {} + } + + msg +} + /// Send periodical heartbeats to `server_tx`, and update the peer status through /// `heartbeat_ts_collector`. /// diff --git a/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs b/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs index f54970b62..e89814d6b 100644 --- a/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs +++ b/zebra-network/src/peer_set/inventory_registry/tests/vectors.rs @@ -58,6 +58,7 @@ async fn inv_registry_one_advertised_ok() { inv_registry.advertising_peers(test_hash).next(), Some(&test_peer), ); + assert_eq!(inv_registry.advertising_peers(test_hash).count(), 1); assert_eq!(inv_registry.missing_peers(test_hash).count(), 0); } @@ -87,6 +88,7 @@ async fn inv_registry_one_missing_ok() { inv_registry.missing_peers(test_hash).next(), Some(&test_peer), ); + assert_eq!(inv_registry.missing_peers(test_hash).count(), 1); } /// Check inventory registration for one hash/peer prefers missing over advertised. @@ -131,6 +133,7 @@ async fn inv_registry_prefer_missing_order(missing_first: bool) { inv_registry.missing_peers(test_hash).next(), Some(&test_peer), ); + assert_eq!(inv_registry.missing_peers(test_hash).count(), 1); } /// Check inventory registration for one hash/peer prefers current over previous. @@ -179,11 +182,13 @@ async fn inv_registry_prefer_current_order(missing_current: bool) { inv_registry.missing_peers(test_hash).next(), Some(&test_peer), ); + assert_eq!(inv_registry.missing_peers(test_hash).count(), 1); } else { assert_eq!( inv_registry.advertising_peers(test_hash).next(), Some(&test_peer), ); + assert_eq!(inv_registry.advertising_peers(test_hash).count(), 1); assert_eq!(inv_registry.missing_peers(test_hash).count(), 0); } } diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 3277f5ada..e8e8a6f91 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -130,7 +130,7 @@ use crate::{ external::InventoryHash, internal::{Request, Response}, }, - BoxError, Config, + BoxError, Config, PeerError, SharedPeerError, }; #[cfg(test)] @@ -650,17 +650,22 @@ where fut.map_err(Into::into).boxed() } - /// Tries to route a request to a peer that advertised that inventory, - /// falling back to P2C if there is no ready peer. + /// Tries to route a request to a ready peer that advertised that inventory, + /// falling back to a ready peer that isn't missing the inventory. + /// + /// If all ready peers are missing the inventory, + /// returns a [`NotFound`](PeerError::NotFound) error. + /// + /// Uses P2C to route requests to the least loaded peer in each list. fn route_inv( &mut self, req: Request, hash: InventoryHash, ) -> >::Future { - let inventory_peer_list = self + let advertising_peer_list = self .inventory_registry .advertising_peers(hash) - .filter(|&key| self.ready_services.contains_key(key)) + .filter(|&addr| self.ready_services.contains_key(addr)) .copied() .collect(); @@ -672,21 +677,57 @@ where // peers would be able to influence our choice by switching addresses. // But we need the choice to be random, // so that a peer can't provide all our inventory responses. - let peer = self.select_p2c_peer_from_list(&inventory_peer_list); + let peer = self.select_p2c_peer_from_list(&advertising_peer_list); - match peer.and_then(|key| self.take_ready_service(&key)) { - Some(mut svc) => { - let peer = peer.expect("just checked peer is Some"); - tracing::trace!(?hash, ?peer, "routing based on inventory"); - let fut = svc.call(req); - self.push_unready(peer, svc); - fut.map_err(Into::into).boxed() - } - None => { - tracing::trace!(?hash, "no ready peer for inventory, falling back to p2c"); - self.route_p2c(req) - } + if let Some(mut svc) = peer.and_then(|key| self.take_ready_service(&key)) { + let peer = peer.expect("just checked peer is Some"); + tracing::trace!(?hash, ?peer, "routing to a peer which advertised inventory"); + let fut = svc.call(req); + self.push_unready(peer, svc); + return fut.map_err(Into::into).boxed(); } + + let missing_peer_list: HashSet = self + .inventory_registry + .missing_peers(hash) + .copied() + .collect(); + let maybe_peer_list = self + .ready_services + .keys() + .filter(|addr| !missing_peer_list.contains(addr)) + .copied() + .collect(); + + // Security: choose a random, less-loaded peer that might have the inventory. + let peer = self.select_p2c_peer_from_list(&maybe_peer_list); + + if let Some(mut svc) = peer.and_then(|key| self.take_ready_service(&key)) { + let peer = peer.expect("just checked peer is Some"); + tracing::trace!(?hash, ?peer, "routing to a peer that might have inventory"); + let fut = svc.call(req); + self.push_unready(peer, svc); + return fut.map_err(Into::into).boxed(); + } + + // TODO: reduce this log level after testing #2156 and #2726 + tracing::info!( + ?hash, + "all ready peers are missing inventory, failing request" + ); + async move { + // Let other tasks run, so a retry request might get different ready peers. + tokio::task::yield_now().await; + + // # Security + // + // Avoid routing requests to peers that are missing inventory. + // If we kept trying doomed requests, peers that are missing our requested inventory + // could take up a large amount of our bandwidth and retry limits. + Err(SharedPeerError::from(PeerError::NotFound(vec![hash]))) + } + .map_err(Into::into) + .boxed() } /// Routes the same request to up to `max_peers` ready peers, ignoring return values. diff --git a/zebra-network/src/peer_set/set/tests/vectors.rs b/zebra-network/src/peer_set/set/tests/vectors.rs index f4cdc55a5..2b02091c1 100644 --- a/zebra-network/src/peer_set/set/tests/vectors.rs +++ b/zebra-network/src/peer_set/set/tests/vectors.rs @@ -13,7 +13,7 @@ use crate::{ peer::{ClientRequest, MinimumPeerVersion}, peer_set::inventory_registry::InventoryStatus, protocol::external::{types::Version, InventoryHash}, - Request, + Request, SharedPeerError, }; #[test] @@ -236,15 +236,24 @@ fn peer_set_route_inv_empty_registry() { /// Check that a peer set routes inventory requests to a peer that has advertised that inventory. #[test] -fn peer_set_route_inv_via_registry() { +fn peer_set_route_inv_advertised_registry() { + peer_set_route_inv_advertised_registry_order(true); + peer_set_route_inv_advertised_registry_order(false); +} + +fn peer_set_route_inv_advertised_registry_order(advertised_first: bool) { let test_hash = block::Hash([0; 32]); let test_inv = InventoryHash::Block(test_hash); // Hard-code the fixed test address created by mock_peer_discovery // TODO: add peer test addresses to ClientTestHarness - let test_peer = "127.0.0.1:1" - .parse() - .expect("unexpected invalid peer address"); + let test_peer = if advertised_first { + "127.0.0.1:1" + } else { + "127.0.0.1:2" + } + .parse() + .expect("unexpected invalid peer address"); let test_change = InventoryStatus::new_advertised(test_inv, test_peer); @@ -301,7 +310,11 @@ fn peer_set_route_inv_via_registry() { let _fut = peer_ready.call(sent_request.clone()); // Check that the client that advertised the inventory received the request - let advertised_handle = &mut handles[0]; + let advertised_handle = if advertised_first { + &mut handles[0] + } else { + &mut handles[1] + }; if let Some(ClientRequest { request, .. }) = advertised_handle .try_to_receive_outbound_client_request() @@ -312,7 +325,11 @@ fn peer_set_route_inv_via_registry() { panic!("inv request not routed to advertised peer"); } - let other_handle = &mut handles[1]; + let other_handle = if advertised_first { + &mut handles[1] + } else { + &mut handles[0] + }; assert!( matches!( @@ -325,3 +342,207 @@ fn peer_set_route_inv_via_registry() { ); }); } + +/// Check that a peer set routes inventory requests to peers that are not missing that inventory. +#[test] +fn peer_set_route_inv_missing_registry() { + peer_set_route_inv_missing_registry_order(true); + peer_set_route_inv_missing_registry_order(false); +} + +fn peer_set_route_inv_missing_registry_order(missing_first: bool) { + let test_hash = block::Hash([0; 32]); + let test_inv = InventoryHash::Block(test_hash); + + // Hard-code the fixed test address created by mock_peer_discovery + // TODO: add peer test addresses to ClientTestHarness + let test_peer = if missing_first { + "127.0.0.1:1" + } else { + "127.0.0.1:2" + } + .parse() + .expect("unexpected invalid peer address"); + + let test_change = InventoryStatus::new_missing(test_inv, test_peer); + + // Use two peers with the same version + let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Canopy); + let peer_versions = PeerVersions { + peer_versions: vec![peer_version, peer_version], + }; + + // Start the runtime + let runtime = zebra_test::init_async(); + let _guard = runtime.enter(); + + // Pause the runtime's timer so that it advances automatically. + // + // CORRECTNESS: This test does not depend on external resources that could really timeout, like + // real network connections. + tokio::time::pause(); + + // Get peers and client handles of them + let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery(); + let (minimum_peer_version, _best_tip_height) = + MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet); + + // Make sure we have the right number of peers + assert_eq!(handles.len(), 2); + + runtime.block_on(async move { + // Build a peerset + let (mut peer_set, mut peer_set_guard) = PeerSetBuilder::new() + .with_discover(discovered_peers) + .with_minimum_peer_version(minimum_peer_version.clone()) + .build(); + + // Mark some inventory as missing + peer_set_guard + .inventory_sender() + .as_mut() + .expect("unexpected missing inv sender") + .send(test_change) + .expect("unexpected dropped receiver"); + + // Get peerset ready + let peer_ready = peer_set + .ready() + .await + .expect("peer set service is always ready"); + + // Check we have the right amount of ready services + assert_eq!(peer_ready.ready_services.len(), 2); + + // Send an inventory-based request + let sent_request = Request::BlocksByHash(iter::once(test_hash).collect()); + let _fut = peer_ready.call(sent_request.clone()); + + // Check that the client missing the inventory did not receive the request + let missing_handle = if missing_first { + &mut handles[0] + } else { + &mut handles[1] + }; + + assert!( + matches!( + missing_handle + .try_to_receive_outbound_client_request() + .request(), + None + ), + "request routed to missing peer", + ); + + // Check that the client that was not missing the inventory received the request + let other_handle = if missing_first { + &mut handles[1] + } else { + &mut handles[0] + }; + + if let Some(ClientRequest { request, .. }) = other_handle + .try_to_receive_outbound_client_request() + .request() + { + assert_eq!(sent_request, request); + } else { + panic!( + "inv request should have been routed to the only peer not missing the inventory" + ); + } + }); +} + +/// Check that a peer set fails inventory requests if all peers are missing that inventory. +#[test] +fn peer_set_route_inv_all_missing_fail() { + let test_hash = block::Hash([0; 32]); + let test_inv = InventoryHash::Block(test_hash); + + // Hard-code the fixed test address created by mock_peer_discovery + // TODO: add peer test addresses to ClientTestHarness + let test_peer = "127.0.0.1:1" + .parse() + .expect("unexpected invalid peer address"); + + let test_change = InventoryStatus::new_missing(test_inv, test_peer); + + // Use one peer + let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Canopy); + let peer_versions = PeerVersions { + peer_versions: vec![peer_version], + }; + + // Start the runtime + let runtime = zebra_test::init_async(); + let _guard = runtime.enter(); + + // Pause the runtime's timer so that it advances automatically. + // + // CORRECTNESS: This test does not depend on external resources that could really timeout, like + // real network connections. + tokio::time::pause(); + + // Get the peer and its client handle + let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery(); + let (minimum_peer_version, _best_tip_height) = + MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet); + + // Make sure we have the right number of peers + assert_eq!(handles.len(), 1); + + runtime.block_on(async move { + // Build a peerset + let (mut peer_set, mut peer_set_guard) = PeerSetBuilder::new() + .with_discover(discovered_peers) + .with_minimum_peer_version(minimum_peer_version.clone()) + .build(); + + // Mark the inventory as missing for all peers + peer_set_guard + .inventory_sender() + .as_mut() + .expect("unexpected missing inv sender") + .send(test_change) + .expect("unexpected dropped receiver"); + + // Get peerset ready + let peer_ready = peer_set + .ready() + .await + .expect("peer set service is always ready"); + + // Check we have the right amount of ready services + assert_eq!(peer_ready.ready_services.len(), 1); + + // Send an inventory-based request + let sent_request = Request::BlocksByHash(iter::once(test_hash).collect()); + let response_fut = peer_ready.call(sent_request.clone()); + + // Check that the client missing the inventory did not receive the request + let missing_handle = &mut handles[0]; + + assert!( + matches!( + missing_handle + .try_to_receive_outbound_client_request() + .request(), + None + ), + "request routed to missing peer", + ); + + // Check that the response is a synthetic error + let response = response_fut.await; + assert_eq!( + response + .expect_err("peer set should return an error (not a Response)") + .downcast_ref::() + .expect("peer set should return a boxed SharedPeerError") + .inner_debug(), + "NotFound([Block(block::Hash(\"0000000000000000000000000000000000000000000000000000000000000000\"))])" + ); + }); +} diff --git a/zebra-state/src/service/non_finalized_state/queued_blocks.rs b/zebra-state/src/service/non_finalized_state/queued_blocks.rs index a990bcf32..839cc9fee 100644 --- a/zebra-state/src/service/non_finalized_state/queued_blocks.rs +++ b/zebra-state/src/service/non_finalized_state/queued_blocks.rs @@ -144,7 +144,7 @@ impl QueuedBlocks { } } - tracing::trace!(num_blocks = %self.blocks.len(), "Finished pruning blocks at or beneath the finalized tip height",); + tracing::trace!(num_blocks = %self.blocks.len(), "Finished pruning blocks at or beneath the finalized tip height"); self.update_metrics(); }