Avoid repeated requests to peers after partial responses or errors (#3505)

* fix(network): split synthetic NotFoundRegistry from message NotFoundResponse

* docs(network): Improve `notfound` message documentation

* refactor(network): Rename MustUseOneshotSender to MustUseClientResponseSender

```sh
fastmod MustUseOneshotSender MustUseClientResponseSender zebra*
```

* docs(network): fix a comment typo

* refactor(network): remove generics from MustUseClientResponseSender

* refactor(network): add an inventory collector to Client, but don't use it yet

* feat(network): register missing peer responses as missing inventory

We register this missing inventory based on peer responses, connection errors, and timeouts.

Inbound message inventory tracking requires peers to send `notfound` messages,
but `zcashd` skips `notfound` for blocks, so we can't rely on peer messages alone.
This missing inventory tracking works regardless of whether peers send `notfound`.
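A minimal sketch of the idea, using simplified stand-in types rather than the actual `zebra-network` internals: when a request fails or times out, the requested hashes are broadcast as missing inventory, so the peer set can route retries to other peers.

```rust
use std::net::SocketAddr;
use tokio::sync::broadcast;

/// Simplified stand-in for zebra-network's inventory change type.
#[derive(Clone, Debug)]
enum InventoryChange {
    Missing { hashes: Vec<[u8; 32]>, peer: SocketAddr },
}

/// Register the hashes from a failed or timed-out request as missing on `peer`.
fn register_missing(
    collector: &broadcast::Sender<InventoryChange>,
    hashes: Vec<[u8; 32]>,
    peer: SocketAddr,
) {
    // If every receiver has been dropped (tests, isolated connections),
    // there is no registry to notify, so ignore the send error.
    let _ = collector.send(InventoryChange::Missing { hashes, peer });
}
```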

* refactor(network): rename ResponseStatus to InventoryResponse

```sh
fastmod ResponseStatus InventoryResponse zebra*
```

* refactor(network): rename InventoryStatus::inner() to to_inner()

* fix(network): remove a redundant runtime.enter() in a test

* doc(network): the exact time used to filter outbound peers doesn't matter

* fix(network): handle block requests slightly more efficiently

* doc(network): fix a typo

* fmt(network): `cargo fmt` after renaming ResponseStatus to InventoryResponse

* doc(test): clarify some test comments

* test(network): test synthetic notfound from connection errors and peer inventory routing

* test(network): improve inbound test diagnostics

* feat(network): add a proptest-impl feature to zebra-network

* feat(network): add a test-only connect_isolated_with_inbound function

* test(network): allow a response on the isolated peer test connection

* test(network): fix failures in the synthetic notfound tests

* test(network): Simplify SharedPeerError test assertions

* test(network): test synthetic notfound from partially successful requests

* test(network): MissingInventoryCollector ignores local NotFoundRegistry errors

* fix(network): decrease the inventory rotation interval

This stops us from waiting 3-4 sync resets (4 minutes) before retrying a missing block.

Now we wait 1-2 sync resets (2 minutes), which is still a reasonable rate limit.
This should speed up syncing near the tip, and on testnet.
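The timing arithmetic, as a small sketch (the 53-second value is the `INVENTORY_ROTATION_INTERVAL` constant added below; sync reset lengths are approximate):

```rust
fn main() {
    // Rotation interval added by this change (see constants.rs below).
    let rotation_secs: u64 = 53;

    // Available and missing inventory entries expire after two rotations.
    let expiry_secs = 2 * rotation_secs;

    // 106 seconds is just under 2 minutes: roughly 1-2 sync resets,
    // instead of the previous 3-4 (about 4 minutes).
    assert_eq!(expiry_secs, 106);
    println!("inventory expires after {expiry_secs}s");
}
```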

* fmt(network): `cargo fmt --all`

* cleanup(network): remove unnecessary allow(dead_code)

* cleanup(network): stop importing the whole sync module into tests

* doc(network): clarify syncer inventory retry constraint

* doc(network): add a TODO for a fix to ensure API behaviour remains consistent

* doc(network): fix a function doc typo

* doc(network): clarify how we handle peers that don't send `notfound`

* docs(network): clarify a test comment

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
teor committed 2022-02-15 11:44:33 +10:00
commit a4dd3b7396 (parent b4d708089b)
37 changed files with 1024 additions and 238 deletions


@ -55,7 +55,7 @@ zcash_note_encryption = { git = "https://github.com/ZcashFoundation/librustzcash
zcash_primitives = { git = "https://github.com/ZcashFoundation/librustzcash.git", tag = "0.5.1-zebra-v1.0.0-beta.4" }
zcash_history = { git = "https://github.com/ZcashFoundation/librustzcash.git", tag = "0.5.1-zebra-v1.0.0-beta.4" }
proptest = { version = "0.10", optional = true }
proptest = { version = "0.10.1", optional = true }
proptest-derive = { version = "0.3.0", optional = true }
rand = { version = "0.8", optional = true }
@ -76,8 +76,8 @@ itertools = "0.10.3"
spandoc = "0.2"
tracing = "0.1.29"
proptest = "0.10"
proptest-derive = "0.3"
proptest = "0.10.1"
proptest-derive = "0.3.0"
rand = "0.8"
rand_chacha = "0.3"


@ -47,13 +47,13 @@ zebra-chain = { path = "../zebra-chain" }
zebra-state = { path = "../zebra-state" }
zebra-script = { path = "../zebra-script" }
proptest = { version = "0.10", optional = true }
proptest = { version = "0.10.1", optional = true }
proptest-derive = { version = "0.3.0", optional = true }
[dev-dependencies]
color-eyre = "0.5.11"
hex = "0.4.3"
proptest = "0.10"
proptest = "0.10.1"
proptest-derive = "0.3.0"
rand07 = { package = "rand", version = "0.7" }
spandoc = "0.2"


@ -10,6 +10,7 @@ edition = "2021"
[features]
default = []
tor = ["arti-client", "tor-rtcompat"]
proptest-impl = ["proptest", "proptest-derive", "zebra-chain/proptest-impl"]
[dependencies]
bitflags = "1.2"
@ -40,11 +41,16 @@ tracing-error = { version = "0.1.2", features = ["traced-error"] }
arti-client = { version = "0.0.2", optional = true }
tor-rtcompat = { version = "0.0.2", optional = true }
# proptest dependencies
proptest = { version = "0.10.1", optional = true }
proptest-derive = { version = "0.3.0", optional = true }
zebra-chain = { path = "../zebra-chain" }
[dev-dependencies]
proptest = "0.10"
proptest-derive = "0.3"
proptest = "0.10.1"
proptest-derive = "0.3.0"
static_assertions = "1.1.0"
tokio = { version = "1.16.1", features = ["test-util"] }
toml = "0.5"


@ -95,6 +95,11 @@ pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(4);
/// specific manner that matches up with this math.
pub const MIN_PEER_RECONNECTION_DELAY: Duration = Duration::from_secs(59 + 20 + 20 + 20);
/// Zebra rotates its peer inventory registry every time this interval elapses.
///
/// After 2 of these intervals, Zebra's local available and missing inventory entries expire.
pub const INVENTORY_ROTATION_INTERVAL: Duration = Duration::from_secs(53);
/// The default peer address crawler interval.
///
/// This should be at least [`HANDSHAKE_TIMEOUT`](constants::HANDSHAKE_TIMEOUT)
@ -309,6 +314,8 @@ mod tests {
use std::convert::TryFrom;
use zebra_chain::parameters::POST_BLOSSOM_POW_TARGET_SPACING;
use super::*;
/// This assures that the `Duration` value we are computing for
@ -394,4 +401,20 @@ mod tests {
"the address book limit should actually be used"
);
}
/// Make sure inventory registry rotation is consistent with the target block interval.
#[test]
fn ensure_inventory_rotation_consistent() {
zebra_test::init();
assert!(
INVENTORY_ROTATION_INTERVAL
< Duration::from_secs(
POST_BLOSSOM_POW_TARGET_SPACING
.try_into()
.expect("non-negative"),
),
"we should expire inventory every time 1-2 new blocks get generated"
);
}
}


@ -6,7 +6,7 @@ use futures::future::TryFutureExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::{
util::{BoxService, Oneshot},
ServiceExt,
Service, ServiceExt,
};
use zebra_chain::{chain_tip::NoChainTip, parameters::Network};
@ -32,11 +32,11 @@ mod tests;
/// this low-level API is useful for custom network crawlers or Tor connections.
///
/// In addition to being completely isolated from all other node state, this
/// method also aims to be minimally distinguishable from other clients.
/// function also aims to be minimally distinguishable from other clients.
///
/// SECURITY TODO: check if the timestamp field can be zeroed, to remove another distinguisher (#3300)
///
/// Note that this method does not implement any timeout behavior, so callers may
/// Note that this function does not implement any timeout behavior, so callers may
/// want to layer it with a timeout as appropriate for their application.
///
/// # Inputs
@ -54,6 +54,37 @@ pub fn connect_isolated<PeerTransport>(
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>>
where
PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let nil_inbound_service =
tower::service_fn(|_req| async move { Ok::<Response, BoxError>(Response::Nil) });
connect_isolated_with_inbound(network, data_stream, user_agent, nil_inbound_service)
}
/// Creates an isolated Zcash peer connection using the provided data stream.
/// This function is for testing purposes only.
///
/// See [`connect_isolated`] for details.
///
/// # Additional Inputs
///
/// - `inbound_service`: a [`tower::Service`] that answers inbound requests from the connected peer.
///
/// # Privacy
///
/// This function can make the isolated connection send different responses to peers,
/// which makes it stand out from other isolated connections from other peers.
pub fn connect_isolated_with_inbound<PeerTransport, InboundService>(
network: Network,
data_stream: PeerTransport,
user_agent: String,
inbound_service: InboundService,
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>>
where
PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
InboundService:
Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
InboundService::Future: Send,
{
let config = Config {
network,
@ -62,9 +93,7 @@ where
let handshake = peer::Handshake::builder()
.with_config(config)
.with_inbound_service(tower::service_fn(|_req| async move {
Ok::<Response, BoxError>(Response::Nil)
}))
.with_inbound_service(inbound_service)
.with_user_agent(user_agent)
.with_latest_chain_tip(NoChainTip)
.finish()
@ -101,7 +130,35 @@ pub fn connect_isolated_tcp_direct(
addr: SocketAddr,
user_agent: String,
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>> {
let nil_inbound_service =
tower::service_fn(|_req| async move { Ok::<Response, BoxError>(Response::Nil) });
connect_isolated_tcp_direct_with_inbound(network, addr, user_agent, nil_inbound_service)
}
/// Creates an isolated Zcash peer connection using the provided data stream.
/// This function is for testing purposes only.
///
/// See [`connect_isolated_with_inbound`] and [`connect_isolated_tcp_direct`] for details.
///
/// # Privacy
///
/// This function can make the isolated connection send different responses to peers,
/// which makes it stand out from other isolated connections from other peers.
pub fn connect_isolated_tcp_direct_with_inbound<InboundService>(
network: Network,
addr: SocketAddr,
user_agent: String,
inbound_service: InboundService,
) -> impl Future<Output = Result<BoxService<Request, Response, BoxError>, BoxError>>
where
InboundService:
Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
InboundService::Future: Send,
{
tokio::net::TcpStream::connect(addr)
.err_into()
.and_then(move |tcp_stream| connect_isolated(network, tcp_stream, user_agent))
.and_then(move |tcp_stream| {
connect_isolated_with_inbound(network, tcp_stream, user_agent, inbound_service)
})
}
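A hedged usage sketch of the new test-only entry point (assumes the `proptest-impl` feature, an already-connected `TcpStream`, and a placeholder user agent; the nil inbound service mirrors the one used above):

```rust
use tower::service_fn;
use zebra_chain::parameters::Network;
use zebra_network::{connect_isolated_with_inbound, BoxError, Request, Response};

async fn isolated_client_with_nil_inbound(
    tcp_stream: tokio::net::TcpStream,
) -> Result<(), BoxError> {
    // Answer every inbound peer request with an empty response.
    let inbound =
        service_fn(|_req: Request| async { Ok::<Response, BoxError>(Response::Nil) });

    let _client = connect_isolated_with_inbound(
        Network::Mainnet,
        tcp_stream,
        "/Zebra:example/".to_string(),
        inbound,
    )
    .await?;

    Ok(())
}
```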


@ -2,13 +2,13 @@
use std::sync::{Arc, Mutex};
use arti_client::{TorAddr, TorClient, TorClientConfig};
use arti_client::{DataStream, TorAddr, TorClient, TorClientConfig};
use tor_rtcompat::tokio::TokioRuntimeHandle;
use tower::util::BoxService;
use tower::{util::BoxService, Service};
use zebra_chain::parameters::Network;
use crate::{connect_isolated, BoxError, Request, Response};
use crate::{connect_isolated, connect_isolated_with_inbound, BoxError, Request, Response};
#[cfg(test)]
mod tests;
@ -45,6 +45,44 @@ pub async fn connect_isolated_tor(
hostname: String,
user_agent: String,
) -> Result<BoxService<Request, Response, BoxError>, BoxError> {
let tor_stream = new_tor_stream(hostname).await?;
// Calling connect_isolated_tor_with_inbound causes lifetime issues.
//
// TODO: fix the lifetime issues, and call connect_isolated_tor_with_inbound
// so the behaviour of both functions is consistent.
connect_isolated(network, tor_stream, user_agent).await
}
/// Creates an isolated Zcash peer connection to `hostname` via Tor.
/// This function is for testing purposes only.
///
/// See [`connect_isolated_with_inbound`] and [`connect_isolated_tor`] for details.
///
/// # Privacy
///
/// This function can make the isolated connection send different responses to peers,
/// which makes it stand out from other isolated connections from other peers.
pub async fn connect_isolated_tor_with_inbound<InboundService>(
network: Network,
hostname: String,
user_agent: String,
inbound_service: InboundService,
) -> Result<BoxService<Request, Response, BoxError>, BoxError>
where
InboundService:
Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
InboundService::Future: Send,
{
let tor_stream = new_tor_stream(hostname).await?;
connect_isolated_with_inbound(network, tor_stream, user_agent, inbound_service).await
}
/// Creates a Zcash peer connection to `hostname` via Tor, and returns a tor stream.
///
/// See [`connect_isolated`] for details.
async fn new_tor_stream(hostname: String) -> Result<DataStream, BoxError> {
let addr = TorAddr::from(hostname)?;
// Initialize or clone the shared tor client instance
@ -55,7 +93,7 @@ pub async fn connect_isolated_tor(
let tor_stream = tor_client.connect(addr, None).await?;
connect_isolated(network, tor_stream, user_agent).await
Ok(tor_stream)
}
/// Returns a new tor client instance, and updates [`SHARED_TOR_CLIENT`].


@ -150,6 +150,14 @@ mod protocol;
#[cfg(feature = "tor")]
pub use crate::isolated::tor::connect_isolated_tor;
#[cfg(all(feature = "tor", any(test, feature = "proptest-impl")))]
pub use crate::isolated::tor::connect_isolated_tor_with_inbound;
#[cfg(any(test, feature = "proptest-impl"))]
pub use crate::isolated::{
connect_isolated_tcp_direct_with_inbound, connect_isolated_with_inbound,
};
pub use crate::{
address_book::AddressBook,
config::Config,
@ -158,10 +166,13 @@ pub use crate::{
peer::{HandshakeError, PeerError, SharedPeerError},
peer_set::init,
policies::RetryLimit,
protocol::internal::{Request, Response, ResponseStatus},
protocol::internal::{InventoryResponse, Request, Response},
};
/// Types used in the definition of [`Request`] and [`Response`] messages.
pub mod types {
pub use crate::{meta_addr::MetaAddr, protocol::types::PeerServices};
#[cfg(any(test, feature = "proptest-impl"))]
pub use crate::protocol::external::InventoryHash;
}


@ -12,12 +12,14 @@ use super::{MetaAddr, MetaAddrChange, PeerServices};
///
/// This should be at least twice the number of [`PeerAddrState`]s, so the tests
/// can cover multiple transitions through every state.
#[allow(dead_code)]
pub const MAX_ADDR_CHANGE: usize = 15;
/// The largest number of random addresses we want to add to an [`AddressBook`].
///
/// This should be at least the number of [`PeerAddrState`]s, so the tests can
/// cover interactions between addresses in different states.
#[allow(dead_code)]
pub const MAX_META_ADDR: usize = 8;
impl MetaAddr {


@ -16,7 +16,7 @@ pub(crate) use client::tests::ReceiveRequestAttempt;
#[cfg(test)]
pub(crate) use handshake::register_inventory_status;
use client::{ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};
use client::{ClientRequestReceiver, InProgressClientRequest, MustUseClientResponseSender};
pub(crate) use client::{CancelHeartbeatTask, ClientRequest};


@ -1,7 +1,10 @@
//! Handles outbound requests from our node to the network.
use std::{
collections::HashSet,
future::Future,
iter,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
@ -12,13 +15,14 @@ use futures::{
stream::{Stream, StreamExt},
FutureExt,
};
use tokio::task::JoinHandle;
use tokio::{sync::broadcast, task::JoinHandle};
use tower::Service;
use crate::{
peer::error::AlreadyErrored,
peer_set::InventoryChange,
protocol::{
external::types::Version,
external::{types::Version, InventoryHash},
internal::{Request, Response},
},
};
@ -37,6 +41,13 @@ pub struct Client {
/// Used to send [`Request`]s to the remote peer.
pub(crate) server_tx: mpsc::Sender<ClientRequest>,
/// Used to register missing inventory in client [`Response`]s,
/// so that the peer set can route retries to other clients.
pub(crate) inv_collector: broadcast::Sender<InventoryChange>,
/// The peer address for registering missing inventory.
pub(crate) transient_addr: Option<SocketAddr>,
/// A slot for an error shared between the Connection and the Client that uses it.
///
/// `None` unless the connection or client have errored.
@ -69,6 +80,13 @@ pub(crate) struct ClientRequest {
/// future that may be moved around before it resolves.
pub tx: oneshot::Sender<Result<Response, SharedPeerError>>,
/// Used to register missing inventory in responses on `tx`,
/// so that the peer set can route retries to other clients.
pub inv_collector: Option<broadcast::Sender<InventoryChange>>,
/// The peer address for registering missing inventory.
pub transient_addr: Option<SocketAddr>,
/// The tracing context for the request, so that work the connection task does
/// processing messages in the context of this request will have correct context.
pub span: tracing::Span,
@ -89,6 +107,7 @@ pub(super) struct ClientRequestReceiver {
pub(super) struct InProgressClientRequest {
/// The actual request.
pub request: Request,
/// The return message channel, included because `peer::Client::call` returns a
/// future that may be moved around before it resolves.
///
@ -99,27 +118,53 @@ pub(super) struct InProgressClientRequest {
/// `Ok(())`, it will assume that it is safe to unconditionally poll the
/// `Receiver` tied to the `Sender` used to create the `ClientRequest`.
///
/// We also take advantage of this invariant to route inventory requests
/// away from peers that did not respond with that inventory.
///
/// We enforce this invariant via the type system, by converting
/// `ClientRequest`s to `InProgressClientRequest`s when they are received by
/// the background task. These conversions are implemented by
/// `ClientRequestReceiver`.
pub tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
pub tx: MustUseClientResponseSender,
/// The tracing context for the request, so that work the connection task does
/// processing messages in the context of this request will have correct context.
pub span: tracing::Span,
}
/// A oneshot::Sender that must be used by calling `send()`.
/// A `oneshot::Sender` for client responses, that must be used by calling `send()`.
/// Also handles forwarding missing inventory to the inventory registry.
///
/// Panics on drop if `tx` has not been used or canceled.
/// Panics if `tx.send()` is used more than once.
#[derive(Debug)]
#[must_use = "tx.send() must be called before drop"]
pub(super) struct MustUseOneshotSender<T: std::fmt::Debug> {
/// The sender for the oneshot channel.
pub(super) struct MustUseClientResponseSender {
/// The sender for the oneshot client response channel.
///
/// `None` if `tx.send()` has been used.
pub tx: Option<oneshot::Sender<T>>,
pub tx: Option<oneshot::Sender<Result<Response, SharedPeerError>>>,
/// Forwards missing inventory in the response to the inventory collector.
///
/// Boxed to reduce the size of containing structures.
pub missing_inv: Option<Box<MissingInventoryCollector>>,
}
/// Forwards missing inventory in the response to the inventory registry.
#[derive(Debug)]
pub(super) struct MissingInventoryCollector {
/// A clone of the original request, if it is an inventory request.
///
/// This struct is only ever created with inventory requests.
request: Request,
/// Used to register missing inventory from responses,
/// so that the peer set can route retries to other clients.
collector: broadcast::Sender<InventoryChange>,
/// The peer address for registering missing inventory.
transient_addr: SocketAddr,
}
impl std::fmt::Debug for Client {
@ -133,12 +178,17 @@ impl std::fmt::Debug for Client {
impl From<ClientRequest> for InProgressClientRequest {
fn from(client_request: ClientRequest) -> Self {
let ClientRequest { request, tx, span } = client_request;
InProgressClientRequest {
let ClientRequest {
request,
tx: tx.into(),
tx,
inv_collector,
transient_addr,
span,
}
} = client_request;
let tx = MustUseClientResponseSender::new(tx, &request, inv_collector, transient_addr);
InProgressClientRequest { request, tx, span }
}
}
@ -199,26 +249,52 @@ impl From<mpsc::Receiver<ClientRequest>> for ClientRequestReceiver {
}
}
impl<T: std::fmt::Debug> MustUseOneshotSender<T> {
/// Forwards `t` to `tx.send()`, and marks this sender as used.
impl MustUseClientResponseSender {
/// Returns a newly created client response sender for `tx`.
///
/// If `request` or the response contains missing inventory,
/// it is forwarded to the `inv_collector`, for the peer at `transient_addr`.
pub fn new(
tx: oneshot::Sender<Result<Response, SharedPeerError>>,
request: &Request,
inv_collector: Option<broadcast::Sender<InventoryChange>>,
transient_addr: Option<SocketAddr>,
) -> Self {
Self {
tx: Some(tx),
missing_inv: MissingInventoryCollector::new(request, inv_collector, transient_addr),
}
}
/// Forwards `response` to `tx.send()`, and missing inventory to `inv_collector`,
/// and marks this sender as used.
///
/// Panics if `tx.send()` is used more than once.
pub fn send(mut self, t: T) -> Result<(), T> {
pub fn send(
mut self,
response: Result<Response, SharedPeerError>,
) -> Result<(), Result<Response, SharedPeerError>> {
// Forward any missing inventory to the registry.
if let Some(missing_inv) = self.missing_inv.take() {
missing_inv.send(&response);
}
// Forward the response to the internal requester.
self.tx
.take()
.unwrap_or_else(|| {
panic!(
"multiple uses of oneshot sender: oneshot must be used exactly once: {:?}",
"multiple uses of response sender: response must be sent exactly once: {:?}",
self
)
})
.send(t)
.send(response)
}
/// Returns `tx.cancellation()`.
///
/// Panics if `tx.send()` has previously been used.
pub fn cancellation(&mut self) -> oneshot::Cancellation<'_, T> {
pub fn cancellation(&mut self) -> oneshot::Cancellation<'_, Result<Response, SharedPeerError>> {
self.tx
.as_mut()
.map(|tx| tx.cancellation())
@ -239,13 +315,7 @@ impl<T: std::fmt::Debug> MustUseOneshotSender<T> {
}
}
impl<T: std::fmt::Debug> From<oneshot::Sender<T>> for MustUseOneshotSender<T> {
fn from(sender: oneshot::Sender<T>) -> Self {
MustUseOneshotSender { tx: Some(sender) }
}
}
impl<T: std::fmt::Debug> Drop for MustUseOneshotSender<T> {
impl Drop for MustUseClientResponseSender {
#[instrument(skip(self))]
fn drop(&mut self) {
// we don't panic if we are shutting down anyway
@ -253,13 +323,100 @@ impl<T: std::fmt::Debug> Drop for MustUseOneshotSender<T> {
// is_canceled() will not panic, because we check is_none() first
assert!(
self.tx.is_none() || self.is_canceled(),
"unused oneshot sender: oneshot must be used or canceled: {:?}",
"unused client response sender: oneshot must be used or canceled: {:?}",
self
);
}
}
}
impl MissingInventoryCollector {
/// Returns a newly created missing inventory collector, if needed.
///
/// If `request` or the response contains missing inventory,
/// it is forwarded to the `inv_collector`, for the peer at `transient_addr`.
pub fn new(
request: &Request,
inv_collector: Option<broadcast::Sender<InventoryChange>>,
transient_addr: Option<SocketAddr>,
) -> Option<Box<MissingInventoryCollector>> {
if !request.is_inventory_download() {
return None;
}
if let (Some(inv_collector), Some(transient_addr)) = (inv_collector, transient_addr) {
Some(Box::new(MissingInventoryCollector {
request: request.clone(),
collector: inv_collector,
transient_addr,
}))
} else {
None
}
}
/// Forwards any missing inventory to the registry.
///
/// `zcashd` doesn't send `notfound` messages for blocks,
/// so we need to track missing blocks ourselves.
///
/// This can sometimes send duplicate missing inventory,
/// but the registry ignores duplicates anyway.
pub fn send(self, response: &Result<Response, SharedPeerError>) {
let missing_inv: HashSet<InventoryHash> = match (self.request, response) {
// Missing block hashes from partial responses.
(_, Ok(Response::Blocks(block_statuses))) => block_statuses
.iter()
.filter_map(|b| b.missing())
.map(InventoryHash::Block)
.collect(),
// Missing transaction IDs from partial responses.
(_, Ok(Response::Transactions(tx_statuses))) => tx_statuses
.iter()
.filter_map(|tx| tx.missing())
.map(|tx| tx.into())
.collect(),
// Other response types never contain missing inventory.
(_, Ok(_)) => iter::empty().collect(),
// We don't forward NotFoundRegistry errors,
// because the errors are generated locally from the registry,
// so those statuses are already in the registry.
//
// Unfortunately, we can't access the inner error variant here,
// due to TracedError.
(_, Err(e)) if e.inner_debug().contains("NotFoundRegistry") => iter::empty().collect(),
// Missing inventory from other errors, including NotFoundResponse, timeouts,
// and dropped connections.
(request, Err(_)) => {
// The request either contains blocks or transactions,
// but this is a convenient way to collect them both.
let missing_blocks = request
.block_hash_inventory()
.into_iter()
.map(InventoryHash::Block);
let missing_txs = request
.transaction_id_inventory()
.into_iter()
.map(InventoryHash::from);
missing_blocks.chain(missing_txs).collect()
}
};
if let Some(missing_inv) =
InventoryChange::new_missing_multi(missing_inv.iter(), self.transient_addr)
{
// if all the receivers are closed, assume we're in tests or an isolated connection
let _ = self.collector.send(missing_inv);
}
}
}
impl Client {
/// Check if this connection's heartbeat task has exited.
fn check_heartbeat(&mut self, cx: &mut Context<'_>) -> Result<(), SharedPeerError> {
@ -401,7 +558,13 @@ impl Service<Request> for Client {
// request.
let span = tracing::Span::current();
match self.server_tx.try_send(ClientRequest { request, tx, span }) {
match self.server_tx.try_send(ClientRequest {
request,
tx,
inv_collector: Some(self.inv_collector.clone()),
transient_addr: self.transient_addr,
span,
}) {
Err(e) => {
if e.is_disconnected() {
let ClientRequest { tx, .. } = e.into_inner();


@ -1,7 +1,7 @@
//! Tests for the [`Client`] part of peer connections, and some test utilities for mocking
//! [`Client`] instances.
mod vectors;
#![cfg_attr(feature = "proptest-impl", allow(dead_code))]
use std::time::Duration;
@ -9,13 +9,20 @@ use futures::{
channel::{mpsc, oneshot},
future::{self, AbortHandle, Future, FutureExt},
};
use tokio::task::JoinHandle;
use tokio::{
sync::broadcast::{self, error::TryRecvError},
task::JoinHandle,
};
use crate::{
peer::{error::SharedPeerError, CancelHeartbeatTask, Client, ClientRequest, ErrorSlot},
peer_set::InventoryChange,
protocol::external::types::Version,
};
#[cfg(test)]
mod vectors;
/// The maximum time a mocked peer connection should be alive during a test.
const MAX_PEER_CONNECTION_TIME: Duration = Duration::from_secs(10);
@ -23,6 +30,8 @@ const MAX_PEER_CONNECTION_TIME: Duration = Duration::from_secs(10);
pub struct ClientTestHarness {
client_request_receiver: Option<mpsc::Receiver<ClientRequest>>,
shutdown_receiver: Option<oneshot::Receiver<CancelHeartbeatTask>>,
#[allow(dead_code)]
inv_receiver: Option<broadcast::Receiver<InventoryChange>>,
error_slot: ErrorSlot,
version: Version,
connection_aborter: AbortHandle,
@ -109,6 +118,42 @@ impl ClientTestHarness {
}
}
/// Drops the receiver endpoint of [`InventoryChange`]s, forcefully closing the channel.
///
/// The inventory registry that would track the changes is mocked for testing.
///
/// Note: this closes the broadcast receiver, because it doesn't have a separate `close()` method.
#[allow(dead_code)]
pub fn drop_inventory_change_receiver(&mut self) {
self.inv_receiver
.take()
.expect("inventory change receiver endpoint has already been dropped");
}
/// Tries to receive an [`InventoryChange`] sent by the [`Client`] instance.
///
/// This method acts like a mock inventory registry, allowing tests to track the changes.
///
/// TODO: make ReceiveRequestAttempt generic, and use it here.
#[allow(dead_code)]
pub(crate) fn try_to_receive_inventory_change(&mut self) -> Option<InventoryChange> {
let receive_result = self
.inv_receiver
.as_mut()
.expect("inventory change receiver endpoint has been dropped")
.try_recv();
match receive_result {
Ok(change) => Some(change),
Err(TryRecvError::Empty) => None,
Err(TryRecvError::Closed) => None,
Err(TryRecvError::Lagged(skipped_messages)) => unreachable!(
"unexpected lagged inventory receiver in tests, skipped {} messages",
skipped_messages,
),
}
}
/// Returns the current error in the [`ErrorSlot`], if there is one.
pub fn current_error(&self) -> Option<SharedPeerError> {
self.error_slot.try_get_error()
@ -228,6 +273,8 @@ where
pub fn finish(self) -> (Client, ClientTestHarness) {
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let (client_request_sender, client_request_receiver) = mpsc::channel(1);
let (inv_sender, inv_receiver) = broadcast::channel(5);
let error_slot = ErrorSlot::default();
let version = self.version.unwrap_or(Version(0));
@ -239,6 +286,8 @@ where
let client = Client {
shutdown_tx: Some(shutdown_sender),
server_tx: client_request_sender,
inv_collector: inv_sender,
transient_addr: None,
error_slot: error_slot.clone(),
version,
connection_task,
@ -248,6 +297,7 @@ where
let harness = ClientTestHarness {
client_request_receiver: Some(client_request_receiver),
shutdown_receiver: Some(shutdown_receiver),
inv_receiver: Some(inv_receiver),
error_slot,
version,
connection_aborter,


@ -1,11 +1,19 @@
//! Fixed peer [`Client`] test vectors.
use std::iter;
use futures::poll;
use tokio::sync::broadcast;
use tower::ServiceExt;
use zebra_chain::block;
use zebra_test::service_extensions::IsReady;
use crate::{peer::ClientTestHarness, PeerError};
use crate::{
peer::{client::MissingInventoryCollector, ClientTestHarness},
protocol::external::InventoryHash,
PeerError, Request, SharedPeerError,
};
/// Test that a newly initialized client functions correctly before it is polled.
#[tokio::test]
@ -217,3 +225,36 @@ async fn client_service_propagates_panic_from_heartbeat_task() {
let _ = poll!(client.ready());
}
/// Make sure MissingInventoryCollector ignores NotFoundRegistry errors.
///
/// ## Correctness
///
/// If the MissingInventoryCollector registered these locally generated errors,
/// our missing inventory errors could get constantly refreshed locally,
/// and we would never ask the peer if it has received the inventory.
#[test]
fn missing_inv_collector_ignores_local_registry_errors() {
zebra_test::init();
let block_hash = block::Hash([0; 32]);
let request = Request::BlocksByHash(iter::once(block_hash).collect());
let response = Err(SharedPeerError::from(PeerError::NotFoundRegistry(vec![
InventoryHash::from(block_hash),
])));
let (inv_collector, mut inv_receiver) = broadcast::channel(1);
let transient_addr = "0.0.0.0:0".parse().unwrap();
// Keep the channel open, so we don't get a `Closed` error.
let _inv_channel_guard = inv_collector.clone();
let missing_inv =
MissingInventoryCollector::new(&request, Some(inv_collector), Some(transient_addr))
.expect("unexpected invalid collector: arguments should be valid");
missing_inv.send(&response);
let recv_result = inv_receiver.try_recv();
assert_eq!(recv_result, Err(broadcast::error::TryRecvError::Empty));
}


@ -29,18 +29,18 @@ use crate::{
meta_addr::MetaAddr,
peer::{
connection::peer_tx::PeerTx, error::AlreadyErrored, ClientRequest, ClientRequestReceiver,
ConnectedAddr, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError,
ConnectedAddr, ErrorSlot, InProgressClientRequest, MustUseClientResponseSender, PeerError,
SharedPeerError,
},
peer_set::ConnectionTracker,
protocol::{
external::{types::Nonce, InventoryHash, Message},
internal::{Request, Response, ResponseStatus},
internal::{InventoryResponse, Request, Response},
},
BoxError,
};
use ResponseStatus::*;
use InventoryResponse::*;
mod peer_tx;
@ -151,7 +151,7 @@ impl Handler {
(Handler::Peers, Message::Addr(addrs)) => Handler::Finished(Ok(Response::Peers(addrs))),
// `zcashd` returns requested transactions in a single batch of messages.
// Other transaction or non-transaction messages can come before or after the batch.
// After the transaction batch, `zcashd` sends `NotFound` if any transactions are missing:
// After the transaction batch, `zcashd` sends `notfound` if any transactions are missing:
// https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5617
(
Handler::TransactionsById {
@ -163,17 +163,17 @@ impl Handler {
// assumptions:
// - the transaction messages are sent in a single continuous batch
// - missing transactions are silently skipped
// (there is no `NotFound` message at the end of the batch)
// (there is no `notfound` message at the end of the batch)
if pending_ids.remove(&transaction.id) {
// we are in the middle of the continuous transaction messages
transactions.push(transaction);
} else {
// We got a transaction we didn't ask for. If the caller doesn't know any of the
// transactions, they should have sent a `NotFound` with all the hashes, rather
// transactions, they should have sent a `notfound` with all the hashes, rather
// than an unsolicited transaction.
//
// So either:
// 1. The peer implements the protocol badly, skipping `NotFound`.
// 1. The peer implements the protocol badly, skipping `notfound`.
// We should cancel the request, so we don't hang waiting for transactions
// that will never arrive.
// 2. The peer sent an unsolicited transaction.
@ -188,11 +188,11 @@ impl Handler {
if ignored_msg.is_some() && transactions.is_empty() {
// If we didn't get anything we wanted, retry the request.
let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids)))
Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids)))
} else if pending_ids.is_empty() || ignored_msg.is_some() {
// If we got some of what we wanted, let the internal client know.
let available = transactions.into_iter().map(ResponseStatus::Available);
let missing = pending_ids.into_iter().map(ResponseStatus::Missing);
let available = transactions.into_iter().map(InventoryResponse::Available);
let missing = pending_ids.into_iter().map(InventoryResponse::Missing);
Handler::Finished(Ok(Response::Transactions(
available.chain(missing).collect(),
@ -214,12 +214,12 @@ impl Handler {
Message::NotFound(missing_invs),
) => {
// assumptions:
// - the peer eventually returns a transaction or a `NotFound` entry
// - the peer eventually returns a transaction or a `notfound` entry
// for each hash
// - all `NotFound` entries are contained in a single message
// - the `NotFound` message comes after the transaction messages
// - all `notfound` entries are contained in a single message
// - the `notfound` message comes after the transaction messages
//
// If we're in sync with the peer, then the `NotFound` should contain the remaining
// If we're in sync with the peer, then the `notfound` should contain the remaining
// hashes from the handler. If we're not in sync with the peer, we should return
// what we got so far.
let missing_transaction_ids: HashSet<_> = transaction_ids(&missing_invs).collect();
@ -236,11 +236,11 @@ impl Handler {
if transactions.is_empty() {
// If we didn't get anything we wanted, retry the request.
let missing_transaction_ids = pending_ids.into_iter().map(Into::into).collect();
Handler::Finished(Err(PeerError::NotFound(missing_transaction_ids)))
Handler::Finished(Err(PeerError::NotFoundResponse(missing_transaction_ids)))
} else {
// If we got some of what we wanted, let the internal client know.
let available = transactions.into_iter().map(ResponseStatus::Available);
let missing = pending_ids.into_iter().map(ResponseStatus::Missing);
let available = transactions.into_iter().map(InventoryResponse::Available);
let missing = pending_ids.into_iter().map(InventoryResponse::Missing);
Handler::Finished(Ok(Response::Transactions(
available.chain(missing).collect(),
@ -249,7 +249,7 @@ impl Handler {
}
// `zcashd` returns requested blocks in a single batch of messages.
// Other blocks or non-blocks messages can come before or after the batch.
// `zcashd` silently skips missing blocks, rather than sending a final `NotFound` message.
// `zcashd` silently skips missing blocks, rather than sending a final `notfound` message.
// https://github.com/zcash/zcash/blob/e7b425298f6d9a54810cb7183f00be547e4d9415/src/main.cpp#L5523
(
Handler::BlocksByHash {
@ -261,7 +261,7 @@ impl Handler {
// assumptions:
// - the block messages are sent in a single continuous batch
// - missing blocks are silently skipped
// (there is no `NotFound` message at the end of the batch)
// (there is no `notfound` message at the end of the batch)
if pending_hashes.remove(&block.hash()) {
// we are in the middle of the continuous block messages
blocks.push(block);
@ -286,16 +286,20 @@ impl Handler {
// when the response for the second request arrives.
//
// Ignoring the message gives us a chance to synchronize back to the correct
// request.
// request. If that doesn't happen, this request times out.
//
// Peers can avoid these cascading errors by sending an explicit `notfound`.
// Zebra sends `notfound`, but `zcashd` doesn't.
// In case 2, if peers respond with a `notfound` message,
// the cascading errors don't happen. The `notfound` message cancels our request,
// and we know we are in sync with the peer.
//
// Zebra sends `notfound` in response to block requests, but `zcashd` doesn't.
// So we need this message workaround, and the related inventory workarounds.
ignored_msg = Some(Message::Block(block));
}
if pending_hashes.is_empty() {
// If we got everything we wanted, let the internal client know.
let available = blocks.into_iter().map(ResponseStatus::Available);
let available = blocks.into_iter().map(InventoryResponse::Available);
Handler::Finished(Ok(Response::Blocks(available.collect())))
} else {
// Keep on waiting for all the blocks we wanted, until we get them or time out.
@ -314,12 +318,12 @@ impl Handler {
Message::NotFound(missing_invs),
) => {
// assumptions:
// - the peer eventually returns a block or a `NotFound` entry
// - the peer eventually returns a block or a `notfound` entry
// for each hash
// - all `NotFound` entries are contained in a single message
// - the `NotFound` message comes after the block messages
// - all `notfound` entries are contained in a single message
// - the `notfound` message comes after the block messages
//
// If we're in sync with the peer, then the `NotFound` should contain the remaining
// If we're in sync with the peer, then the `notfound` should contain the remaining
// hashes from the handler. If we're not in sync with the peer, we should return
// what we got so far, and log an error.
let missing_blocks: HashSet<_> = block_hashes(&missing_invs).collect();
@ -336,11 +340,11 @@ impl Handler {
if blocks.is_empty() {
// If we didn't get anything we wanted, retry the request.
let missing_block_hashes = pending_hashes.into_iter().map(Into::into).collect();
Handler::Finished(Err(PeerError::NotFound(missing_block_hashes)))
Handler::Finished(Err(PeerError::NotFoundResponse(missing_block_hashes)))
} else {
// If we got some of what we wanted, let the internal client know.
let available = blocks.into_iter().map(ResponseStatus::Available);
let missing = pending_hashes.into_iter().map(ResponseStatus::Missing);
let available = blocks.into_iter().map(InventoryResponse::Available);
let missing = pending_hashes.into_iter().map(InventoryResponse::Missing);
Handler::Finished(Ok(Response::Blocks(available.chain(missing).collect())))
}
@ -387,7 +391,7 @@ pub(super) enum State {
/// Awaiting a peer message we can interpret as a client request.
AwaitingResponse {
handler: Handler,
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
tx: MustUseClientResponseSender,
span: tracing::Span,
},
/// A failure has occurred and we are shutting down the connection.
@ -470,7 +474,7 @@ pub struct Connection<S, Tx> {
pub(super) client_rx: ClientRequestReceiver,
/// A slot for an error shared between the Connection and the Client that uses it.
//
///
/// `None` unless the connection or client have errored.
pub(super) error_slot: ErrorSlot,
@ -1243,7 +1247,7 @@ where
}
Response::Blocks(blocks) => {
// Generate one tx message per block,
// then a notfound% message with all the missing block hashes.
// then a notfound message with all the missing block hashes.
let mut missing_hashes = Vec::new();
for block in blocks.into_iter() {


@ -20,11 +20,11 @@ use zebra_test::mock_service::{MockService, PropTestAssertion};
use crate::{
peer::{connection::Connection, ClientRequest, ErrorSlot},
protocol::external::Message,
protocol::internal::ResponseStatus,
protocol::internal::InventoryResponse,
Request, Response, SharedPeerError,
};
use ResponseStatus::*;
use InventoryResponse::*;
proptest! {
// The default value of proptest cases (256) causes this test to take more than ten seconds on
@ -156,6 +156,9 @@ async fn send_block_request(
let client_request = ClientRequest {
request,
tx: response_sender,
// we skip inventory collection in these tests
inv_collector: None,
transient_addr: None,
span: Span::none(),
};


@ -132,6 +132,8 @@ async fn connection_run_loop_message_ok() {
let request = ClientRequest {
request: Request::Peers,
tx: request_tx,
inv_collector: None,
transient_addr: None,
span: Span::current(),
};
@ -459,6 +461,8 @@ async fn connection_run_loop_send_timeout_nil_response() {
let request = ClientRequest {
request: Request::AdvertiseTransactionIds(HashSet::new()),
tx: request_tx,
inv_collector: None,
transient_addr: None,
span: Span::current(),
};
@ -532,6 +536,8 @@ async fn connection_run_loop_send_timeout_expect_response() {
let request = ClientRequest {
request: Request::Peers,
tx: request_tx,
inv_collector: None,
transient_addr: None,
span: Span::current(),
};
@ -605,6 +611,8 @@ async fn connection_run_loop_receive_timeout() {
let request = ClientRequest {
request: Request::Peers,
tx: request_tx,
inv_collector: None,
transient_addr: None,
span: Span::current(),
};


@ -82,9 +82,40 @@ pub enum PeerError {
#[error("Internal services over capacity")]
Overloaded,
/// We requested data that the peer couldn't find.
#[error("Remote peer could not find items: {0:?}")]
NotFound(Vec<InventoryHash>),
/// We requested data, but the peer replied with a `notfound` message.
/// (Or it didn't respond before the request finished.)
///
/// This error happens when the peer doesn't have any of the requested data,
/// so that the original request can be retried.
///
/// This is a temporary error.
///
/// Zebra can try different peers if the request is retried,
/// or peers can download and verify the missing data.
///
/// If the peer has some of the data, the request returns an [`Ok`] response,
/// with any `notfound` data marked as [`Missing`](InventoryResponse::Missing).
#[error("Remote peer could not find any of the items: {0:?}")]
NotFoundResponse(Vec<InventoryHash>),
/// We requested data, but all our ready peers are marked in our local
/// inventory registry as recently [`Missing`](InventoryResponse::Missing) that data.
///
/// This is a temporary error.
///
/// Peers with the inventory can finish their requests and become ready,
/// or other peers can download and verify the missing data.
///
/// # Correctness
///
/// This error is produced using Zebra's local inventory registry,
/// without contacting any peers.
///
/// Client responses containing this error must not be used to update the inventory registry.
/// This makes sure that we eventually expire our local cache of missing inventory,
/// and send requests to peers again.
#[error("All ready peers are registered as recently missing these items: {0:?}")]
NotFoundRegistry(Vec<InventoryHash>),
}
impl PeerError {
@ -103,7 +134,8 @@ impl PeerError {
PeerError::Serialization(inner) => format!("Serialization({})", inner).into(),
PeerError::DuplicateHandshake => "DuplicateHandshake".into(),
PeerError::Overloaded => "Overloaded".into(),
PeerError::NotFound(_) => "NotFound".into(),
PeerError::NotFoundResponse(_) => "NotFoundResponse".into(),
PeerError::NotFoundRegistry(_) => "NotFoundRegistry".into(),
}
}
}
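A brief sketch of how a caller might treat the two variants differently when deciding whether to record missing inventory (hypothetical helper, not part of this commit; the variant names are as defined above):

```rust
use zebra_network::PeerError;

/// Returns true if `error` should be recorded as missing inventory.
fn should_register_as_missing(error: &PeerError) -> bool {
    match error {
        // A real peer response (or a request that finished without one):
        // safe to record as missing.
        PeerError::NotFoundResponse(_) => true,
        // Generated locally from the registry: recording it again would keep
        // refreshing our own cache, so the request would never reach a peer.
        PeerError::NotFoundRegistry(_) => false,
        _ => false,
    }
}
```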


@ -882,7 +882,7 @@ where
// So we can just track peer activity based on Ping and Pong.
// (This significantly improves performance, by reducing time system calls.)
let inbound_ts_collector = address_book_updater.clone();
let inv_collector = inv_collector.clone();
let inbound_inv_collector = inv_collector.clone();
let ts_inner_conn_span = connection_span.clone();
let inv_inner_conn_span = connection_span.clone();
let peer_rx = peer_rx
@ -892,6 +892,7 @@ where
let inbound_ts_collector = inbound_ts_collector.clone();
let span =
debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector");
async move {
match &msg {
Ok(msg) => {
@ -935,9 +936,10 @@ where
.instrument(span)
})
.then(move |msg| {
let inv_collector = inv_collector.clone();
let inbound_inv_collector = inbound_inv_collector.clone();
let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter");
register_inventory_status(msg, connected_addr, inv_collector).instrument(span)
register_inventory_status(msg, connected_addr, inbound_inv_collector)
.instrument(span)
})
.boxed();
@ -971,6 +973,8 @@ where
let client = Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
inv_collector,
transient_addr: connected_addr.get_transient_addr(),
error_slot,
version: remote_version,
connection_task,
@ -1184,6 +1188,9 @@ async fn send_one_heartbeat(
match server_tx.try_send(ClientRequest {
request,
tx,
// we're not requesting inventory, so we don't need to update the registry
inv_collector: None,
transient_addr: None,
span: tracing::Span::current(),
}) {
Ok(()) => {}


@ -1,5 +1,7 @@
//! Test utilities and tests for minimum network peer version requirements.
#![cfg_attr(feature = "proptest-impl", allow(dead_code))]
use zebra_chain::{
chain_tip::mock::{MockChainTip, MockChainTipSender},
parameters::Network,


@ -8,7 +8,6 @@ use std::{
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use futures::{FutureExt, Stream, StreamExt};
@ -18,17 +17,18 @@ use tokio::{
};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream, IntervalStream};
use zebra_chain::{parameters::POST_BLOSSOM_POW_TARGET_SPACING, serialization::AtLeastOne};
use zebra_chain::serialization::AtLeastOne;
use crate::{
protocol::{external::InventoryHash, internal::ResponseStatus},
constants::INVENTORY_ROTATION_INTERVAL,
protocol::{external::InventoryHash, internal::InventoryResponse},
BoxError,
};
use self::update::Update;
/// Underlying type for the alias InventoryStatus::*
use ResponseStatus::*;
use InventoryResponse::*;
pub mod update;
@ -36,7 +36,7 @@ pub mod update;
mod tests;
/// A peer inventory status, which tracks a hash for both available and missing inventory.
pub type InventoryStatus<T> = ResponseStatus<T, T>;
pub type InventoryStatus<T> = InventoryResponse<T, T>;
/// A peer inventory status change, used in the inventory status channel.
///
@ -106,7 +106,6 @@ impl InventoryChange {
}
/// Returns a new missing multiple inventory change, if `hashes` contains at least one change.
#[allow(dead_code)]
pub fn new_missing_multi<'a>(
hashes: impl IntoIterator<Item = &'a InventoryHash>,
peer: SocketAddr,
@ -135,8 +134,8 @@ impl<T> InventoryStatus<T> {
}
impl<T: Clone> InventoryStatus<T> {
/// Get the inner item, regardless of status.
pub fn inner(&self) -> T {
/// Returns a clone of the inner item, regardless of status.
pub fn to_inner(&self) -> T {
match self {
Available(item) | Missing(item) => item.clone(),
}
@ -146,11 +145,7 @@ impl<T: Clone> InventoryStatus<T> {
impl InventoryRegistry {
/// Returns a new Inventory Registry for `inv_stream`.
pub fn new(inv_stream: broadcast::Receiver<InventoryChange>) -> Self {
let interval = Duration::from_secs(
POST_BLOSSOM_POW_TARGET_SPACING
.try_into()
.expect("non-negative"),
);
let interval = INVENTORY_ROTATION_INTERVAL;
// Don't do an immediate rotation, current and prev are already empty.
let mut interval = tokio::time::interval_at(Instant::now() + interval, interval);
@ -270,7 +265,7 @@ impl InventoryRegistry {
/// `Missing` markers are not updated until the registry rotates, for security reasons.
fn register(&mut self, change: InventoryChange) {
let new_status = change.marker();
let (invs, addr) = change.inner();
let (invs, addr) = change.to_inner();
for inv in invs {
use InventoryHash::*;


@ -29,7 +29,6 @@ proptest! {
// Start the runtime
let runtime = zebra_test::init_async();
let _guard = runtime.enter();
runtime.block_on(async move {
// Check all combinations of:


@ -654,7 +654,7 @@ where
/// falling back to a ready peer that isn't missing the inventory.
///
/// If all ready peers are missing the inventory,
/// returns a [`NotFound`](PeerError::NotFound) error.
/// returns a synthetic [`NotFoundRegistry`](PeerError::NotFoundRegistry) error.
///
/// Uses P2C to route requests to the least loaded peer in each list.
fn route_inv(
@ -724,7 +724,9 @@ where
// Avoid routing requests to peers that are missing inventory.
// If we kept trying doomed requests, peers that are missing our requested inventory
// could take up a large amount of our bandwidth and retry limits.
Err(SharedPeerError::from(PeerError::NotFound(vec![hash])))
Err(SharedPeerError::from(PeerError::NotFoundRegistry(vec![
hash,
])))
}
.map_err(Into::into)
.boxed()


@ -542,7 +542,7 @@ fn peer_set_route_inv_all_missing_fail() {
.downcast_ref::<SharedPeerError>()
.expect("peer set should return a boxed SharedPeerError")
.inner_debug(),
"NotFound([Block(block::Hash(\"0000000000000000000000000000000000000000000000000000000000000000\"))])"
"NotFoundRegistry([Block(block::Hash(\"0000000000000000000000000000000000000000000000000000000000000000\"))])"
);
});
}


@ -20,6 +20,7 @@ pub use in_version::AddrInVersion;
pub(super) use v1::AddrV1;
pub(super) use v2::AddrV2;
#[allow(unused_imports)]
#[cfg(any(test, feature = "proptest-impl"))]
pub(super) use v1::{ipv6_mapped_socket_addr, ADDR_V1_SIZE};


@ -230,7 +230,7 @@ pub enum Message {
///
/// `zcashd` returns requested items in a single batch of messages.
/// Missing blocks are silently skipped. Missing transaction hashes are
/// included in a single `NotFound` message following the transactions.
/// included in a single `notfound` message following the transactions.
/// Other item or non-item messages can come before or after the batch.
///
/// The list contains zero or more inventory hashes.
@ -254,13 +254,15 @@ pub enum Message {
/// A `notfound` message.
///
/// Zebra responds with this message when it doesn't have the requested blocks or transactions.
///
/// When a peer requests a list of transaction hashes, `zcashd` returns:
/// - a batch of messages containing found transactions, then
/// - a `NotFound` message containing a list of transaction hashes that
/// - a `notfound` message containing a list of transaction hashes that
/// aren't available in its mempool or state.
///
/// But when a peer requests blocks or headers, any missing items are
/// silently skipped, without any `NotFound` messages.
/// silently skipped, without any `notfound` messages.
///
/// The list contains zero or more inventory hashes.
///


@ -1,7 +1,3 @@
#![allow(clippy::unit_arg)]
use crate::constants::{self, magics};
use std::{cmp::max, fmt};
use zebra_chain::{
@ -12,7 +8,9 @@ use zebra_chain::{
},
};
#[cfg(test)]
use crate::constants::{self, magics};
#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
/// A magic number identifying the network.


@ -4,4 +4,4 @@ mod response_status;
pub use request::Request;
pub use response::Response;
pub use response_status::ResponseStatus;
pub use response_status::InventoryResponse;


@ -249,4 +249,30 @@ impl Request {
Request::MempoolTransactionIds => "MempoolTransactionIds",
}
}
/// Returns true if the request is for block or transaction inventory downloads.
pub fn is_inventory_download(&self) -> bool {
matches!(
self,
Request::BlocksByHash(_) | Request::TransactionsById(_)
)
}
/// Returns the block hash inventory downloads from the request, if any.
pub fn block_hash_inventory(&self) -> HashSet<block::Hash> {
if let Request::BlocksByHash(block_hashes) = self {
block_hashes.clone()
} else {
HashSet::new()
}
}
/// Returns the transaction ID inventory downloads from the request, if any.
pub fn transaction_id_inventory(&self) -> HashSet<UnminedTxId> {
if let Request::TransactionsById(transaction_ids) = self {
transaction_ids.clone()
} else {
HashSet::new()
}
}
}
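A short usage sketch of the new `Request` helpers, using the same types as the tests elsewhere in this diff:

```rust
use std::iter;
use zebra_chain::block;
use zebra_network::Request;

fn main() {
    let hash = block::Hash([0; 32]);
    let request = Request::BlocksByHash(iter::once(hash).collect());

    // Only block and transaction downloads count as inventory downloads.
    assert!(request.is_inventory_download());
    assert_eq!(request.block_hash_inventory().len(), 1);
    assert!(request.transaction_id_inventory().is_empty());
}
```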


@ -7,12 +7,12 @@ use zebra_chain::{
transaction::{UnminedTx, UnminedTxId},
};
use crate::{meta_addr::MetaAddr, protocol::internal::ResponseStatus};
use crate::{meta_addr::MetaAddr, protocol::internal::InventoryResponse};
#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
use ResponseStatus::*;
use InventoryResponse::*;
/// A response to a network request, represented in internal format.
#[derive(Clone, Debug, Eq, PartialEq)]
@ -66,15 +66,15 @@ pub enum Response {
/// When Zebra doesn't have a block or transaction, it always sends `notfound`.
/// `zcashd` sometimes sends no response, and sometimes sends `notfound`.
//
// TODO: make this into a HashMap<block::Hash, ResponseStatus<Arc<Block>, ()>> - a unique list (#2244)
Blocks(Vec<ResponseStatus<Arc<Block>, block::Hash>>),
// TODO: make this into a HashMap<block::Hash, InventoryResponse<Arc<Block>, ()>> - a unique list (#2244)
Blocks(Vec<InventoryResponse<Arc<Block>, block::Hash>>),
/// A list of found unmined transactions, and missing unmined transaction IDs.
///
/// Each list contains zero or more entries.
//
// TODO: make this into a HashMap<UnminedTxId, ResponseStatus<UnminedTx, ()>> - a unique list (#2244)
Transactions(Vec<ResponseStatus<UnminedTx, UnminedTxId>>),
// TODO: make this into a HashMap<UnminedTxId, InventoryResponse<UnminedTx, ()>> - a unique list (#2244)
Transactions(Vec<InventoryResponse<UnminedTx, UnminedTxId>>),
}
impl fmt::Display for Response {
@ -136,4 +136,9 @@ impl Response {
Response::Transactions(_) => "Transactions",
}
}
/// Returns true if the response is a block or transaction inventory download.
pub fn is_inventory_download(&self) -> bool {
matches!(self, Response::Blocks(_) | Response::Transactions(_))
}
}


@ -5,7 +5,7 @@ use std::fmt;
#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
use ResponseStatus::*;
use InventoryResponse::*;
/// A generic peer inventory response status.
///
@ -13,7 +13,7 @@ use ResponseStatus::*;
/// and `Missing` is used for inventory that is missing from the response.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
pub enum ResponseStatus<A, M> {
pub enum InventoryResponse<A, M> {
/// An available inventory item.
Available(A),
@ -21,37 +21,34 @@ pub enum ResponseStatus<A, M> {
Missing(M),
}
impl<A, M> fmt::Display for ResponseStatus<A, M> {
impl<A, M> fmt::Display for InventoryResponse<A, M> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(self.command())
}
}
impl<A, M> ResponseStatus<A, M> {
impl<A, M> InventoryResponse<A, M> {
/// Returns the response status type as a string.
pub fn command(&self) -> &'static str {
match self {
ResponseStatus::Available(_) => "Available",
ResponseStatus::Missing(_) => "Missing",
InventoryResponse::Available(_) => "Available",
InventoryResponse::Missing(_) => "Missing",
}
}
/// Returns true if the inventory item was available.
#[allow(dead_code)]
pub fn is_available(&self) -> bool {
matches!(self, Available(_))
}
/// Returns true if the inventory item was missing.
#[allow(dead_code)]
pub fn is_missing(&self) -> bool {
matches!(self, Missing(_))
}
/// Maps a `ResponseStatus<A, M>` to `ResponseStatus<B, M>` by applying a function to a
/// Maps a `InventoryResponse<A, M>` to `InventoryResponse<B, M>` by applying a function to a
/// contained [`Available`] value, leaving the [`Missing`] value untouched.
#[allow(dead_code)]
pub fn map_available<B, F: FnOnce(A) -> B>(self, f: F) -> ResponseStatus<B, M> {
pub fn map_available<B, F: FnOnce(A) -> B>(self, f: F) -> InventoryResponse<B, M> {
// Based on Result::map from https://doc.rust-lang.org/src/core/result.rs.html#765
match self {
Available(a) => Available(f(a)),
@ -59,10 +56,9 @@ impl<A, M> ResponseStatus<A, M> {
}
}
/// Maps a `ResponseStatus<A, M>` to `ResponseStatus<A, N>` by applying a function to a
/// Maps a `InventoryResponse<A, M>` to `InventoryResponse<A, N>` by applying a function to a
/// contained [`Missing`] value, leaving the [`Available`] value untouched.
#[allow(dead_code)]
pub fn map_missing<N, F: FnOnce(M) -> N>(self, f: F) -> ResponseStatus<A, N> {
pub fn map_missing<N, F: FnOnce(M) -> N>(self, f: F) -> InventoryResponse<A, N> {
// Based on Result::map_err from https://doc.rust-lang.org/src/core/result.rs.html#850
match self {
Available(a) => Available(a),
@ -70,8 +66,8 @@ impl<A, M> ResponseStatus<A, M> {
}
}
/// Converts from `&ResponseStatus<A, M>` to `ResponseStatus<&A, &M>`.
pub fn as_ref(&self) -> ResponseStatus<&A, &M> {
/// Converts from `&InventoryResponse<A, M>` to `InventoryResponse<&A, &M>`.
pub fn as_ref(&self) -> InventoryResponse<&A, &M> {
match self {
Available(item) => Available(item),
Missing(item) => Missing(item),
@ -79,7 +75,7 @@ impl<A, M> ResponseStatus<A, M> {
}
}
impl<A: Clone, M: Clone> ResponseStatus<A, M> {
impl<A: Clone, M: Clone> InventoryResponse<A, M> {
/// Get the available inventory item, if present.
pub fn available(&self) -> Option<A> {
if let Available(item) = self {
@ -90,7 +86,6 @@ impl<A: Clone, M: Clone> ResponseStatus<A, M> {
}
/// Get the missing inventory item, if present.
#[allow(dead_code)]
pub fn missing(&self) -> Option<M> {
if let Missing(item) = self {
Some(item.clone())
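
The renamed combinators mirror `Result`'s `map`/`map_err`/`as_ref`. A standalone sketch (editor illustration, not part of this diff; assumes `zebra-network` as a dependency, with plain `String`/`u32` type parameters):

```rust
use zebra_network::InventoryResponse::{self, Available, Missing};

fn main() {
    let found: InventoryResponse<String, u32> = Available("block data".to_string());
    let lost: InventoryResponse<String, u32> = Missing(42);

    // map_available transforms the Available value...
    assert_eq!(found.clone().map_available(|s| s.len()), Available(10));
    // ...and leaves Missing values untouched.
    assert_eq!(lost.clone().map_available(|s| s.len()), Missing(42));

    // as_ref borrows both sides, like Option::as_ref.
    assert!(found.as_ref().is_available());
    // missing() clones out the missing item, if there is one.
    assert_eq!(lost.missing(), Some(42));
}
```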

View File

@ -24,7 +24,7 @@ metrics = "0.17.1"
# The fix should be included in multiset 0.0.6.
multiset = { git = "https://github.com/jmitchell/multiset", rev = "91ef8550b518f75ae87ae0d8771150f259fd34d5" }
proptest = { version = "0.10.1", optional = true }
proptest-derive = { version = "0.3", optional = true }
proptest-derive = { version = "0.3.0", optional = true }
regex = "1"
rlimit = "0.5.4"
rocksdb = "0.17.0"
@ -47,7 +47,7 @@ halo2 = "=0.1.0-beta.1"
itertools = "0.10.3"
jubjub = "0.8.0"
proptest = "0.10.1"
proptest-derive = "0.3"
proptest-derive = "0.3.0"
spandoc = "0.2"
tokio = { version = "1.16.1", features = ["full"] }

View File

@ -61,11 +61,12 @@ semver = "1.0.5"
tempfile = "3.3.0"
tokio = { version = "1.16.1", features = ["full", "test-util"] }
proptest = "0.10"
proptest-derive = "0.3"
proptest = "0.10.1"
proptest-derive = "0.3.0"
zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] }
zebra-consensus = { path = "../zebra-consensus/", features = ["proptest-impl"] }
zebra-network = { path = "../zebra-network", features = ["proptest-impl"] }
zebra-state = { path = "../zebra-state", features = ["proptest-impl"] }
zebra-test = { path = "../zebra-test" }

View File

@ -31,7 +31,7 @@ use zebra_chain::{
use zebra_consensus::chain::VerifyChainError;
use zebra_network::{
constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
AddressBook, ResponseStatus,
AddressBook, InventoryResponse,
};
// Re-use the syncer timeouts for consistency.
@ -40,7 +40,7 @@ use super::{
sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
};
use ResponseStatus::*;
use InventoryResponse::*;
pub(crate) mod downloads;
@ -319,6 +319,9 @@ impl Service<zn::Request> for Inbound {
async move {
// Correctness: get the current time after acquiring the address book lock.
//
// This time is used to filter outdated peers, so it doesn't really matter
// if we get it when the future is created, or when it starts running.
let now = Utc::now();
// Send a sanitized response
@ -358,17 +361,14 @@ impl Service<zn::Request> for Inbound {
// https://github.com/tower-rs/tower/blob/master/tower/src/util/call_all/common.rs#L112
use futures::stream::TryStreamExt;
hashes
.clone()
.into_iter()
.iter()
.cloned()
.map(|hash| zs::Request::Block(hash.into()))
.map(|request| state.clone().oneshot(request))
.collect::<futures::stream::FuturesOrdered<_>>()
.try_filter_map(|response| async move {
Ok(match response {
zs::Response::Block(Some(block)) => Some(block),
// `zcashd` ignores missing blocks in GetData responses,
// rather than including them in a trailing `NotFound`
// message
zs::Response::Block(None) => None,
_ => unreachable!("wrong response from state"),
})

View File

@ -22,7 +22,7 @@ use zebra_chain::{
transaction::{UnminedTx, UnminedTxId, VerifiedUnminedTx},
};
use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig};
use zebra_network::{AddressBook, Request, Response, ResponseStatus};
use zebra_network::{AddressBook, InventoryResponse, Request, Response};
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::{MockService, PanicAssertion};
@ -35,7 +35,7 @@ use crate::{
BoxError,
};
use ResponseStatus::*;
use InventoryResponse::*;
/// Maximum time to wait for a network service request.
///

View File

@ -14,12 +14,13 @@ use tower::{
use zebra_chain::{
block::{self, Block},
parameters::Network,
transaction::{AuthDigest, Hash as TxHash, UnminedTxId, WtxId},
serialization::ZcashDeserializeInto,
transaction::{AuthDigest, Hash as TxHash, Transaction, UnminedTx, UnminedTxId, WtxId},
};
use zebra_consensus::{chain::VerifyChainError, error::TransactionError, transaction};
use zebra_network::{
connect_isolated_tcp_direct, Config as NetworkConfig, Request, Response, ResponseStatus,
SharedPeerError,
connect_isolated_tcp_direct_with_inbound, types::InventoryHash, Config as NetworkConfig,
InventoryResponse, PeerError, Request, Response, SharedPeerError,
};
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::{MockService, PanicAssertion};
@ -33,7 +34,7 @@ use crate::{
BoxError,
};
use ResponseStatus::*;
use InventoryResponse::*;
/// Check that a network stack with an empty address book only contains the local listener port,
/// by querying the inbound service via a local TCP connection.
@ -57,9 +58,9 @@ async fn inbound_peers_empty_address_book() -> Result<(), crate::BoxError> {
tx_gossip_task_handle,
// real open socket addresses
listen_addr,
) = setup().await;
) = setup(None).await;
// Use inbound directly
// Send a request to inbound directly
let request = inbound_service.clone().oneshot(Request::Peers);
let response = request.await;
match response.as_ref() {
@ -78,7 +79,7 @@ async fn inbound_peers_empty_address_book() -> Result<(), crate::BoxError> {
),
};
// Use the connected peer via a local TCP connection
// Send a request via the connected peer, via a local TCP connection, to the inbound service
let request = connected_peer_service.clone().oneshot(Request::Peers);
let response = request.await;
match response.as_ref() {
@ -134,11 +135,11 @@ async fn inbound_block_empty_state_notfound() -> Result<(), crate::BoxError> {
tx_gossip_task_handle,
// real open socket addresses
_listen_addr,
) = setup().await;
) = setup(None).await;
let test_block = block::Hash([0x11; 32]);
// Use inbound directly
// Send a request to inbound directly
let request = inbound_service
.clone()
.oneshot(Request::BlocksByHash(iter::once(test_block).collect()));
@ -159,26 +160,27 @@ async fn inbound_block_empty_state_notfound() -> Result<(), crate::BoxError> {
),
};
// Use the connected peer via a local TCP connection
let request = connected_peer_service
// Send a request via the connected peer, via a local TCP connection, to the inbound service
let response = connected_peer_service
.clone()
.oneshot(Request::BlocksByHash(iter::once(test_block).collect()));
let response = request.await;
match response.as_ref() {
Err(missing_error) => {
let missing_error = missing_error
.downcast_ref::<SharedPeerError>()
.expect("unexpected inner error type, expected SharedPeerError");
assert_eq!(
missing_error.inner_debug(),
"NotFound([Block(block::Hash(\"1111111111111111111111111111111111111111111111111111111111111111\"))])"
);
}
_ => unreachable!(
"peer::Connection should map missing `BlocksByHash` responses as `Err(SharedPeerError(NotFound(_)))`, \
.oneshot(Request::BlocksByHash(iter::once(test_block).collect()))
.await;
if let Err(missing_error) = response.as_ref() {
let missing_error = missing_error
.downcast_ref::<SharedPeerError>()
.expect("unexpected inner error type, expected SharedPeerError");
// Unfortunately, we can't access SharedPeerError's inner type,
// so we can't compare the actual responses.
let expected = PeerError::NotFoundResponse(vec![InventoryHash::Block(test_block)]);
let expected = SharedPeerError::from(expected);
assert_eq!(missing_error.inner_debug(), expected.inner_debug());
} else {
unreachable!(
"peer::Connection should map missing `BlocksByHash` responses as `Err(SharedPeerError(NotFoundResponse(_)))`, \
actual result: {:?}",
response
),
response
)
};
let block_gossip_result = block_gossip_task_handle.now_or_never();
@ -201,8 +203,6 @@ async fn inbound_block_empty_state_notfound() -> Result<(), crate::BoxError> {
/// Check that a network stack with an empty state responds to single transaction requests with `notfound`.
///
/// Uses a real Zebra network stack, with an isolated Zebra inbound TCP connection.
///
/// TODO: test a response with some Available and some Missing transactions.
#[tokio::test]
async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> {
let (
@ -220,7 +220,7 @@ async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> {
tx_gossip_task_handle,
// real open socket addresses
_listen_addr,
) = setup().await;
) = setup(None).await;
let test_tx = UnminedTxId::from_legacy_id(TxHash([0x22; 32]));
let test_wtx: UnminedTxId = WtxId {
@ -231,7 +231,7 @@ async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> {
// Test both transaction ID variants, separately and together
for txs in [vec![test_tx], vec![test_wtx], vec![test_tx, test_wtx]] {
// Use inbound directly
// Send a request to inbound directly
let request = inbound_service
.clone()
.oneshot(Request::TransactionsById(txs.iter().copied().collect()));
@ -256,50 +256,55 @@ async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> {
),
};
// Use the connected peer via a local TCP connection
let request = connected_peer_service
// Send a request via the connected peer, via a local TCP connection, to the inbound service
let response = connected_peer_service
.clone()
.oneshot(Request::TransactionsById(txs.iter().copied().collect()));
let response = request.await;
match response.as_ref() {
Err(missing_error) => {
let missing_error = missing_error
.downcast_ref::<SharedPeerError>()
.expect("unexpected inner error type, expected SharedPeerError");
.oneshot(Request::TransactionsById(txs.iter().copied().collect()))
.await;
if let Err(missing_error) = response.as_ref() {
let missing_error = missing_error
.downcast_ref::<SharedPeerError>()
.expect("unexpected inner error type, expected SharedPeerError");
// Unfortunately, we can't access SharedPeerError's inner type,
// so we can't compare the actual responses.
if txs == vec![test_tx] {
assert_eq!(
missing_error.inner_debug(),
"NotFound([Tx(transaction::Hash(\"2222222222222222222222222222222222222222222222222222222222222222\"))])",
);
} else if txs == vec![test_wtx] {
assert_eq!(
missing_error.inner_debug(),
"NotFound([Wtx(WtxId { id: transaction::Hash(\"3333333333333333333333333333333333333333333333333333333333333333\"), auth_digest: AuthDigest(\"4444444444444444444444444444444444444444444444444444444444444444\") })])",
);
} else if txs == vec![test_tx, test_wtx] {
// The response order is unstable, because it depends on concurrent inbound futures.
// In #2244 we will fix this by replacing response Vecs with HashSets.
assert!(
missing_error.inner_debug() ==
"NotFound([Tx(transaction::Hash(\"2222222222222222222222222222222222222222222222222222222222222222\")), Wtx(WtxId { id: transaction::Hash(\"3333333333333333333333333333333333333333333333333333333333333333\"), auth_digest: AuthDigest(\"4444444444444444444444444444444444444444444444444444444444444444\") })])"
||
missing_error.inner_debug() ==
"NotFound([Wtx(WtxId { id: transaction::Hash(\"3333333333333333333333333333333333333333333333333333333333333333\"), auth_digest: AuthDigest(\"4444444444444444444444444444444444444444444444444444444444444444\") }), Tx(transaction::Hash(\"2222222222222222222222222222222222222222222222222222222222222222\"))])",
"unexpected response: {:?}",
missing_error.inner_debug(),
);
} else {
unreachable!("unexpected test case");
}
// Unfortunately, we can't access SharedPeerError's inner type,
// so we can't compare the actual responses.
if txs.len() <= 1 {
let expected = PeerError::NotFoundResponse(
txs.iter().copied().map(InventoryHash::from).collect(),
);
let expected = SharedPeerError::from(expected);
assert_eq!(missing_error.inner_debug(), expected.inner_debug());
} else {
// The response order is unstable, because it depends on concurrent inbound futures.
// In #2244 we will fix this by replacing response Vecs with HashSets.
//
// Assume there are only 2 transactions.
let expected1: Vec<InventoryHash> =
txs.iter().copied().map(InventoryHash::from).collect();
let expected2: Vec<InventoryHash> =
txs.iter().rev().copied().map(InventoryHash::from).collect();
let expected: Vec<String> = [expected1, expected2]
.into_iter()
.map(PeerError::NotFoundResponse)
.map(|error| SharedPeerError::from(error).inner_debug())
.collect();
let actual = missing_error.inner_debug();
assert!(
expected.iter().any(|expected| expected == &actual),
"unexpected response: {:?} \
expected one of: {:?}",
actual,
expected,
);
}
_ => unreachable!(
"peer::Connection should map missing `TransactionsById` responses as `Err(SharedPeerError(NotFound(_)))`, \
} else {
unreachable!(
"peer::Connection should map missing `TransactionsById` responses as `Err(SharedPeerError(NotFoundResponse(_)))`, \
actual result: {:?}",
response
),
response
)
};
}
@ -320,12 +325,286 @@ async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> {
Ok(())
}
/// Check that a network stack:
/// - returns a `NotFoundResponse` error when a peer responds with an unrelated transaction, and
/// - returns a `NotFoundRegistry` error for repeated requests to a non-responsive peer.
///
/// The requests are coming from the full stack to the isolated peer,
/// so this is the reverse of the previous tests.
///
/// Uses a Zebra network stack's peer set to query an isolated Zebra TCP connection,
/// with an unrelated transaction test responder.
#[tokio::test]
async fn outbound_tx_unrelated_response_notfound() -> Result<(), crate::BoxError> {
// We respond with an unrelated transaction, so the peer gives up on the request.
let unrelated_response: Transaction =
zebra_test::vectors::DUMMY_TX1.zcash_deserialize_into()?;
let unrelated_response = Response::Transactions(vec![Available(unrelated_response.into())]);
let (
// real services
_connected_peer_service,
_inbound_service,
peer_set,
_mempool_service,
_state_service,
// mocked services
_mock_block_verifier,
_mock_tx_verifier,
// real tasks
block_gossip_task_handle,
tx_gossip_task_handle,
// real open socket addresses
_listen_addr,
) = setup(Some(unrelated_response)).await;
let test_tx5 = UnminedTxId::from_legacy_id(TxHash([0x55; 32]));
let test_wtx67: UnminedTxId = WtxId {
id: TxHash([0x66; 32]),
auth_digest: AuthDigest([0x77; 32]),
}
.into();
let test_tx8 = UnminedTxId::from_legacy_id(TxHash([0x88; 32]));
let test_wtx91: UnminedTxId = WtxId {
id: TxHash([0x99; 32]),
auth_digest: AuthDigest([0x11; 32]),
}
.into();
// Test both transaction ID variants, separately and together.
// These IDs all need to be different, to avoid polluting the inventory registry between tests.
for txs in [vec![test_tx5], vec![test_wtx67], vec![test_tx8, test_wtx91]] {
// Send a request via the peer set, via a local TCP connection,
// to the isolated peer's `unrelated_response` inbound service
let response = peer_set
.clone()
.oneshot(Request::TransactionsById(txs.iter().copied().collect()))
.await;
// Unfortunately, we can't access SharedPeerError's inner type,
// so we can't compare the actual responses.
if let Err(missing_error) = response.as_ref() {
let missing_error = missing_error
.downcast_ref::<SharedPeerError>()
.expect("unexpected inner error type, expected SharedPeerError");
if txs.len() <= 1 {
let expected = PeerError::NotFoundResponse(
txs.iter().copied().map(InventoryHash::from).collect(),
);
let expected = SharedPeerError::from(expected);
assert_eq!(missing_error.inner_debug(), expected.inner_debug());
} else {
// The response order is unstable, because it depends on concurrent inbound futures.
// In #2244 we will fix this by replacing response Vecs with HashSets.
//
// Assume there are only 2 transactions.
let expected1: Vec<InventoryHash> =
txs.iter().copied().map(InventoryHash::from).collect();
let expected2: Vec<InventoryHash> =
txs.iter().rev().copied().map(InventoryHash::from).collect();
let expected: Vec<String> = [expected1, expected2]
.into_iter()
.map(PeerError::NotFoundResponse)
.map(|error| SharedPeerError::from(error).inner_debug())
.collect();
let actual = missing_error.inner_debug();
assert!(
expected.iter().any(|expected| expected == &actual),
"unexpected response: {:?} \
expected one of: {:?}",
actual,
expected,
);
}
} else {
unreachable!(
"peer::Connection should map missing `TransactionsById` responses as `Err(SharedPeerError(NotFoundResponse(_)))`, \
actual result: {:?}",
response
)
};
// The peer set only does routing for single-transaction requests.
// (But the inventory tracker tracks the response to requests of any size.)
for tx in &txs {
// Now send the same request to the peer set,
// but expect a local failure from the inventory registry.
let response = peer_set
.clone()
.oneshot(Request::TransactionsById(iter::once(tx).copied().collect()))
.await;
// The only ready peer in the PeerSet failed the same request,
// so we expect the peer set to return a `NotFoundRegistry` error immediately.
//
// If these asserts fail, then the PeerSet isn't returning inv routing error responses.
// (Or the missing inventory from the previous timeout wasn't registered correctly.)
if let Err(missing_error) = response.as_ref() {
let missing_error = missing_error
.downcast_ref::<SharedPeerError>()
.expect("unexpected inner error type, expected SharedPeerError");
// Unfortunately, we can't access SharedPeerError's inner type,
// so we can't compare the actual responses.
let expected = PeerError::NotFoundRegistry(vec![InventoryHash::from(*tx)]);
let expected = SharedPeerError::from(expected);
assert_eq!(missing_error.inner_debug(), expected.inner_debug());
} else {
unreachable!(
"peer::Connection should map missing `TransactionsById` responses as `Err(SharedPeerError(NotFoundRegistry(_)))`, \
actual result: {:?}",
response
)
};
}
}
let block_gossip_result = block_gossip_task_handle.now_or_never();
assert!(
matches!(block_gossip_result, None),
"unexpected error or panic in block gossip task: {:?}",
block_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
Ok(())
}
/// Check that a network stack:
/// - returns a partial `notfound` response when a peer partially responds to a multi-transaction request, and
/// - returns a `NotFoundRegistry` error for repeated requests to a non-responsive peer.
///
/// The requests are coming from the full stack to the isolated peer.
#[tokio::test]
async fn outbound_tx_partial_response_notfound() -> Result<(), crate::BoxError> {
// We respond with the same transaction twice, so the requesting peer gives up when it receives the duplicate.
let repeated_tx: Transaction = zebra_test::vectors::DUMMY_TX1.zcash_deserialize_into()?;
let repeated_tx: UnminedTx = repeated_tx.into();
let repeated_response = Response::Transactions(vec![
Available(repeated_tx.clone()),
Available(repeated_tx.clone()),
]);
let (
// real services
_connected_peer_service,
_inbound_service,
peer_set,
_mempool_service,
_state_service,
// mocked services
_mock_block_verifier,
_mock_tx_verifier,
// real tasks
block_gossip_task_handle,
tx_gossip_task_handle,
// real open socket addresses
_listen_addr,
) = setup(Some(repeated_response)).await;
let missing_tx_id = UnminedTxId::from_legacy_id(TxHash([0x22; 32]));
let txs = [missing_tx_id, repeated_tx.id];
// Send a request via the peer set, via a local TCP connection,
// to the isolated peer's `repeated_response` inbound service
let response = peer_set
.clone()
.oneshot(Request::TransactionsById(txs.iter().copied().collect()))
.await;
if let Ok(Response::Transactions(tx_response)) = response.as_ref() {
let available: Vec<UnminedTx> = tx_response
.iter()
.filter_map(InventoryResponse::available)
.collect();
let missing: Vec<UnminedTxId> = tx_response
.iter()
.filter_map(InventoryResponse::missing)
.collect();
assert_eq!(available, vec![repeated_tx]);
assert_eq!(missing, vec![missing_tx_id]);
} else {
unreachable!(
"peer::Connection should map partial `TransactionsById` responses as `Ok(Response::Transactions(_))`, \
actual result: {:?}",
response
)
};
// Now send another request to the peer set with only the missing transaction,
// but expect a local failure from the inventory registry.
//
// The peer set only does routing for single-transaction requests.
// (But the inventory tracker tracks the response to requests of any size.)
let response = peer_set
.clone()
.oneshot(Request::TransactionsById(
iter::once(missing_tx_id).collect(),
))
.await;
// The only ready peer in the PeerSet failed the same request,
// so we expect the peer set to return a `NotFoundRegistry` error immediately.
//
// If these asserts fail, then the PeerSet isn't returning inv routing error responses.
// (Or the missing inventory from the previous timeout wasn't registered correctly.)
if let Err(missing_error) = response.as_ref() {
let missing_error = missing_error
.downcast_ref::<SharedPeerError>()
.expect("unexpected inner error type, expected SharedPeerError");
// Unfortunately, we can't access SharedPeerError's inner type,
// so we can't compare the actual responses.
let expected = PeerError::NotFoundRegistry(vec![InventoryHash::from(missing_tx_id)]);
let expected = SharedPeerError::from(expected);
assert_eq!(missing_error.inner_debug(), expected.inner_debug());
} else {
unreachable!(
"peer::Connection should map missing `TransactionsById` responses as `Err(SharedPeerError(NotFoundRegistry(_)))`, \
actual result: {:?}",
response
)
};
let block_gossip_result = block_gossip_task_handle.now_or_never();
assert!(
matches!(block_gossip_result, None),
"unexpected error or panic in block gossip task: {:?}",
block_gossip_result,
);
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
assert!(
matches!(tx_gossip_result, None),
"unexpected error or panic in transaction gossip task: {:?}",
tx_gossip_result,
);
Ok(())
}
/// Setup a real Zebra network stack, with a connected peer using a real isolated network stack.
///
/// The isolated peer responds to every request with `isolated_peer_response`.
/// (If no response is provided, the isolated peer answers every request with `Response::Nil`.)
///
/// Uses fake verifiers, and does not run a block syncer task.
async fn setup() -> (
async fn setup(
isolated_peer_response: Option<Response>,
) -> (
// real services
// connected peer
// connected peer which responds with isolated_peer_response
Buffer<
BoxService<zebra_network::Request, zebra_network::Response, BoxError>,
zebra_network::Request,
@ -462,11 +741,23 @@ async fn setup() -> (
peer_set.clone(),
));
// Set up the inbound service response for the isolated peer
let isolated_peer_response = isolated_peer_response.unwrap_or(Response::Nil);
let response_inbound_service = tower::service_fn(move |_req| {
let isolated_peer_response = isolated_peer_response.clone();
async move { Ok::<Response, BoxError>(isolated_peer_response) }
});
let user_agent = "test".to_string();
// Open a fake peer connection to the inbound listener, using the isolated connection API
let connected_peer_service =
connect_isolated_tcp_direct(network, listen_addr, "test".to_string())
.await
.expect("local listener connection succeeds");
let connected_peer_service = connect_isolated_tcp_direct_with_inbound(
network,
listen_addr,
user_agent,
response_inbound_service,
)
.await
.expect("local listener connection succeeds");
let connected_peer_service = ServiceBuilder::new()
.buffer(10)
.service(connected_peer_service);

View File

@ -853,6 +853,8 @@ where
BlockDownloadVerifyError::DownloadFailed(ref source)
if format!("{:?}", source).contains("NotFound") =>
{
// Covers both NotFoundResponse and NotFoundRegistry errors.
//
// TODO: improve this by checking the type (#2908)
// restart after a certain number of NotFound errors?
debug!(error = ?e, "block was not found, possibly from a peer that doesn't have the block yet, continuing");

View File

@ -10,10 +10,18 @@ use futures::future;
use tokio::time::{timeout, Duration};
use zebra_chain::parameters::{Network, POST_BLOSSOM_POW_TARGET_SPACING};
use zebra_network::constants::{DEFAULT_CRAWL_NEW_PEER_INTERVAL, HANDSHAKE_TIMEOUT};
use zebra_network::constants::{
DEFAULT_CRAWL_NEW_PEER_INTERVAL, HANDSHAKE_TIMEOUT, INVENTORY_ROTATION_INTERVAL,
};
use zebra_state::ChainTipSender;
use super::super::*;
use crate::config::ZebradConfig;
use crate::{
components::sync::{
ChainSync, BLOCK_DOWNLOAD_RETRY_LIMIT, BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT,
GENESIS_TIMEOUT_RETRY, SYNC_RESTART_DELAY,
},
config::ZebradConfig,
};
/// Make sure the timeout values are consistent with each other.
#[test]
@ -78,6 +86,20 @@ fn ensure_timeouts_consistent() {
"a syncer tip crawl should complete before most new blocks"
);
// This is a compromise between two failure modes:
// - some peers have the inventory, but they weren't ready last time we checked,
// so we want to retry soon
// - all peers are missing the inventory, so we want to wait for a while before retrying
assert!(
INVENTORY_ROTATION_INTERVAL < SYNC_RESTART_DELAY,
"we should expire some inventory every time the syncer resets"
);
assert!(
SYNC_RESTART_DELAY < 2 * INVENTORY_ROTATION_INTERVAL,
"we should give the syncer at least one retry attempt, \
before we expire all inventory"
);
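
As a worked example of the two asserts above (editor illustration with made-up durations; the real constants are defined in `zebra-network` and `zebrad`):

```rust
use std::time::Duration;

fn main() {
    // Hypothetical values that satisfy the constraint:
    // the rotation interval must be between half and all of the restart delay.
    let inventory_rotation_interval = Duration::from_secs(40);
    let sync_restart_delay = Duration::from_secs(60);

    // Some inventory expires during every syncer reset...
    assert!(inventory_rotation_interval < sync_restart_delay);
    // ...but full expiry (two rotation intervals) outlasts one reset,
    // so the syncer gets at least one retry before all inventory expires.
    assert!(sync_restart_delay < 2 * inventory_rotation_interval);
}
```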
// The default peer crawler interval should be at least
// `HANDSHAKE_TIMEOUT` lower than all other crawler intervals.
//
@ -133,7 +155,7 @@ fn request_genesis_is_rate_limited() {
});
// create an empty latest chain tip
let (_sender, latest_chain_tip, _change) = zs::ChainTipSender::new(None, Network::Mainnet);
let (_sender, latest_chain_tip, _change) = ChainTipSender::new(None, Network::Mainnet);
// create a verifier service that will always panic as it will never be called
let verifier_service =

View File

@ -11,7 +11,7 @@ use zebra_chain::{
serialization::ZcashDeserializeInto,
};
use zebra_consensus::Config as ConsensusConfig;
use zebra_network::ResponseStatus;
use zebra_network::InventoryResponse;
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::{MockService, PanicAssertion};
@ -26,7 +26,7 @@ use crate::{
config::ZebradConfig,
};
use ResponseStatus::*;
use InventoryResponse::*;
/// Maximum time to wait for a request to any test service.
///