From b71833292de6295703fb967aa5a02d8319721f08 Mon Sep 17 00:00:00 2001 From: Janito Vaqueiro Ferreira Filho Date: Tue, 21 Dec 2021 17:13:26 -0300 Subject: [PATCH] Use `MockedClientHandle` in other tests (#3241) * Move `MockedClientHandle` to `peer` module It's more closely related to a `Client` than the `PeerSet`, and this prepares it to be used by other tests. * Rename `MockedClientHandle` to `ClientTestHarness` Reduce confusion, and clarify that the client is not mocked. Co-authored-by: teor * Add clarification to `mock_peers` documentation Explicitly say how the generated data is returned. * Rename method to `wants_connection_heartbeats` The `Client` service only represents one direction of a connection, so `is_connected` is not the exact term. Co-authored-by: teor * Mock `Client` instead of `LoadTrackedClient` Move where the conversion from mocked `Client` to mocked `LoadTrackedClient` in order to make the test helper more easily used by other tests. * Use `ClientTestHarness` in `initialize` tests Replace the boilerplate code to create a fake `Client` instance with usages of the `ClientTestHarness` constructor. * Allow receiving requests from `Client` instance Create a helper type to wrap the result, to make it easier to assert on specific events after trying to receive a request. * Allow inspecting the current error in the slot Share the `ErrorSlot` between the `Client` and the handle, so that the handle can be used to inspect the contents of the `ErrorSlot`. * Allow placing an error into the `ErrorSlot` Assuming it is initially empty. If it already has an error, the code will panic. * Allow gracefully closing the request receiver Close the endpoint with the appropriate call to the `close()` method. * Allow dropping the request receiver endpoint Forcefully closes the endpoint. * Rename field to `client_request_receiver` Also rename the related methods to include `outbound_client_request_receiver` to make it more precise. Co-authored-by: teor * Allow dropping the heartbeat shutdown receiver Allows the `Client` to detect that the channel has been closed. * Rename fn. to `drop_heartbeat_shutdown_receiver` Make it clear that it affects the heartbeat task. Co-authored-by: teor * Move `NowOrLater` into a new `now-or-later` crate Make it easily accessible to other crates. * Add `IsReady` extension trait for `Service` Simplifies checking if a service is immediately ready to be called. * Add extension method to check for readiness error Checks if the `Service` isn't immediately ready because a call to `ready` immediately returns an error. * Rename method to `is_failed` Avoid negated method names. Co-authored-by: teor * Add a `IsReady::is_pending` extension method Checks if a `Service` is not ready to be called. * Use `ClientTestHarness` in `Client` test vectors Reduce repeated code and try to improve readability. * Create a new `ClientTestHarnessBuilder` type A builder to create test `Client` instances using mock data which can be tracked and manipulated through a `ClientTestHarness`. * Allow configuring the `Client`'s mocked version Add a `with_version` builder method. * Use `ClientTestHarnessBuilder` in `PeerVersions` Use the builder to set the peer version, so that the `version` parameter can be removed from the constructor later. * Use a default mock version where possible Reduce noise when setting up the harness for tests that don't really care about the remote peer version. * Remove `Version` parameter from the `build` method The `with_version` builder method should be used instead. * Fix some typos and outdated info in the release checklist * Add extra client tests for zero and multiple readiness checks (#3273) And document existing tests. * Replace `NowOrLater` with `futures::poll!` (#3272) * Replace NowOrLater with the futures::poll! macro in zebrad * Replace NowOrLater with the futures::poll! macro in zebra-test * Remove the now-or-later crate * remove unused imports * rustfmt Co-authored-by: teor Co-authored-by: Alfredo Garcia --- .../release-checklist.md | 4 +- zebra-network/src/peer.rs | 2 + zebra-network/src/peer/client.rs | 4 +- zebra-network/src/peer/client/tests.rs | 188 ++++++++++++- .../src/peer/client/tests/vectors.rs | 265 ++++++------------ .../src/peer_set/initialize/tests/vectors.rs | 97 +------ zebra-network/src/peer_set/set/tests.rs | 82 ++---- zebra-network/src/peer_set/set/tests/prop.rs | 30 +- zebra-test/src/lib.rs | 1 + zebra-test/src/service_extensions.rs | 48 ++++ zebrad/src/async_ext.rs | 5 - zebrad/src/async_ext/now_or_later.rs | 63 ----- zebrad/src/components/sync.rs | 7 +- zebrad/src/lib.rs | 1 - 14 files changed, 379 insertions(+), 418 deletions(-) create mode 100644 zebra-test/src/service_extensions.rs delete mode 100644 zebrad/src/async_ext.rs delete mode 100644 zebrad/src/async_ext/now_or_later.rs diff --git a/.github/PULL_REQUEST_TEMPLATE/release-checklist.md b/.github/PULL_REQUEST_TEMPLATE/release-checklist.md index acdfc9bd8..a9e509fc8 100644 --- a/.github/PULL_REQUEST_TEMPLATE/release-checklist.md +++ b/.github/PULL_REQUEST_TEMPLATE/release-checklist.md @@ -69,11 +69,9 @@ After you have your changes pushed start a PR with them using this template by a You can use `fastmod` to interactively find and replace versions. -For example, for `zebra-1.0.0-alpha.12`, we did: +For example, you can do something like: ``` fastmod --extensions rs,toml,md --fixed-strings '1.0.0-alpha.12' '1.0.0-alpha.13' -fastmod --extensions rs,toml,md --fixed-strings '1.0.0-alpha.11' '1.0.0-alpha.12' -fastmod --extensions rs,toml,md --fixed-strings '1.0.0-alpha.10' '1.0.0-alpha.11' fastmod --extensions rs,toml,md --fixed-strings '0.2.9' '0.2.10' tower-batch fastmod --extensions rs,toml,md --fixed-strings '0.2.8' '0.2.9' tower-fallback ``` diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs index 8f4be9a86..1a10a96e2 100644 --- a/zebra-network/src/peer.rs +++ b/zebra-network/src/peer.rs @@ -15,6 +15,8 @@ mod load_tracked_client; /// Watches for chain tip height updates to determine the minimum support peer protocol version. mod minimum_peer_version; +#[cfg(any(test, feature = "proptest-impl"))] +pub use client::tests::ClientTestHarness; #[cfg(not(test))] use client::ClientRequest; #[cfg(test)] diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs index 6e55775b9..da0e32b58 100644 --- a/zebra-network/src/peer/client.rs +++ b/zebra-network/src/peer/client.rs @@ -21,8 +21,8 @@ use crate::{ use super::{ErrorSlot, PeerError, SharedPeerError}; -#[cfg(test)] -mod tests; +#[cfg(any(test, feature = "proptest-impl"))] +pub mod tests; /// The "client" duplex half of a peer connection. pub struct Client { diff --git a/zebra-network/src/peer/client/tests.rs b/zebra-network/src/peer/client/tests.rs index 78babc731..e99807283 100644 --- a/zebra-network/src/peer/client/tests.rs +++ b/zebra-network/src/peer/client/tests.rs @@ -1,3 +1,189 @@ -//! Tests for the [`Client`] part of peer connections +//! Tests for the [`Client`] part of peer connections, and some test utilities for mocking +//! [`Client`] instances. mod vectors; + +use futures::channel::{mpsc, oneshot}; + +use crate::{ + peer::{error::SharedPeerError, CancelHeartbeatTask, Client, ClientRequest, ErrorSlot}, + protocol::external::types::Version, +}; + +/// A harness with mocked channels for testing a [`Client`] instance. +pub struct ClientTestHarness { + client_request_receiver: Option>, + shutdown_receiver: Option>, + error_slot: ErrorSlot, + version: Version, +} + +impl ClientTestHarness { + /// Create a [`ClientTestHarnessBuilder`] instance to help create a new [`Client`] instance + /// and a [`ClientTestHarness`] to track it. + pub fn build() -> ClientTestHarnessBuilder { + ClientTestHarnessBuilder { version: None } + } + + /// Gets the peer protocol version associated to the [`Client`]. + pub fn version(&self) -> Version { + self.version + } + + /// Returns true if the [`Client`] instance still wants connection heartbeats to be sent. + /// + /// Checks that the client: + /// - has not been dropped, + /// - has not closed or dropped the mocked heartbeat task channel, and + /// - has not asked the mocked heartbeat task to shut down. + pub fn wants_connection_heartbeats(&mut self) -> bool { + let receive_result = self + .shutdown_receiver + .as_mut() + .expect("heartbeat shutdown receiver endpoint has been dropped") + .try_recv(); + + match receive_result { + Ok(None) => true, + Ok(Some(CancelHeartbeatTask)) | Err(oneshot::Canceled) => false, + } + } + + /// Drops the mocked heartbeat shutdown receiver endpoint. + pub fn drop_heartbeat_shutdown_receiver(&mut self) { + let _ = self + .shutdown_receiver + .take() + .expect("heartbeat shutdown receiver endpoint has already been dropped"); + } + + /// Closes the receiver endpoint of [`ClientRequests`] that are supposed to be sent to the + /// remote peer. + /// + /// The remote peer that would receive the requests is mocked for testing. + pub fn close_outbound_client_request_receiver(&mut self) { + self.client_request_receiver + .as_mut() + .expect("request receiver endpoint has been dropped") + .close(); + } + + /// Drops the receiver endpoint of [`ClientRequests`], forcefully closing the channel. + /// + /// The remote peer that would receive the requests is mocked for testing. + pub fn drop_outbound_client_request_receiver(&mut self) { + self.client_request_receiver + .take() + .expect("request receiver endpoint has already been dropped"); + } + + /// Tries to receive a [`ClientRequest`] sent by the [`Client`] instance. + /// + /// The remote peer that would receive the requests is mocked for testing. + pub(crate) fn try_to_receive_outbound_client_request(&mut self) -> ReceiveRequestAttempt { + let receive_result = self + .client_request_receiver + .as_mut() + .expect("request receiver endpoint has been dropped") + .try_next(); + + match receive_result { + Ok(Some(request)) => ReceiveRequestAttempt::Request(request), + Ok(None) => ReceiveRequestAttempt::Closed, + Err(_) => ReceiveRequestAttempt::Empty, + } + } + + /// Returns the current error in the [`ErrorSlot`], if there is one. + pub fn current_error(&self) -> Option { + self.error_slot.try_get_error() + } + + /// Sets the error in the [`ErrorSlot`], assuming there isn't one already. + /// + /// # Panics + /// + /// If there's already an error in the [`ErrorSlot`]. + pub fn set_error(&self, error: impl Into) { + self.error_slot + .try_update_error(error.into()) + .expect("unexpected earlier error in error slot") + } +} + +/// The result of an attempt to receive a [`ClientRequest`] sent by the [`Client`] instance. +/// +/// The remote peer that would receive the request is mocked for testing. +pub(crate) enum ReceiveRequestAttempt { + /// The [`Client`] instance has closed the sender endpoint of the channel. + Closed, + + /// There were no queued requests in the channel. + Empty, + + /// One request was successfully received. + Request(ClientRequest), +} + +impl ReceiveRequestAttempt { + /// Check if the attempt to receive resulted in discovering that the sender endpoint had been + /// closed. + pub fn is_closed(&self) -> bool { + matches!(self, ReceiveRequestAttempt::Closed) + } + + /// Check if the attempt to receive resulted in no requests. + pub fn is_empty(&self) -> bool { + matches!(self, ReceiveRequestAttempt::Empty) + } + + /// Returns the received request, if there was one. + #[allow(dead_code)] + pub fn request(self) -> Option { + match self { + ReceiveRequestAttempt::Request(request) => Some(request), + ReceiveRequestAttempt::Closed | ReceiveRequestAttempt::Empty => None, + } + } +} + +/// A builder for a [`Client`] and [`ClientTestHarness`] instance. +/// +/// Mocked data is used to construct a real [`Client`] instance. The mocked data is initialized by +/// the [`ClientTestHarnessBuilder`], and can be accessed and changed through the +/// [`ClientTestHarness`]. +pub struct ClientTestHarnessBuilder { + version: Option, +} + +impl ClientTestHarnessBuilder { + /// Configure the mocked version for the peer. + pub fn with_version(mut self, version: Version) -> Self { + self.version = Some(version); + self + } + + /// Build a [`Client`] instance with the mocked data and a [`ClientTestHarness`] to track it. + pub fn finish(self) -> (Client, ClientTestHarness) { + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + let (client_request_sender, client_request_receiver) = mpsc::channel(1); + let error_slot = ErrorSlot::default(); + let version = self.version.unwrap_or(Version(0)); + + let client = Client { + shutdown_tx: Some(shutdown_sender), + server_tx: client_request_sender, + error_slot: error_slot.clone(), + version, + }; + + let harness = ClientTestHarness { + client_request_receiver: Some(client_request_receiver), + shutdown_receiver: Some(shutdown_receiver), + error_slot, + version, + }; + + (client, harness) + } +} diff --git a/zebra-network/src/peer/client/tests/vectors.rs b/zebra-network/src/peer/client/tests/vectors.rs index 3d18d790e..5a331973e 100644 --- a/zebra-network/src/peer/client/tests/vectors.rs +++ b/zebra-network/src/peer/client/tests/vectors.rs @@ -1,233 +1,152 @@ //! Fixed peer [`Client`] test vectors. -use futures::{ - channel::{mpsc, oneshot}, - FutureExt, -}; use tower::ServiceExt; -use crate::{ - peer::{CancelHeartbeatTask, Client, ErrorSlot}, - protocol::external::types::Version, - PeerError, -}; +use zebra_test::service_extensions::IsReady; +use crate::{peer::ClientTestHarness, PeerError}; + +/// Test that a newly initialized client functions correctly before it is polled. +#[tokio::test] +async fn client_service_ok_without_readiness_check() { + zebra_test::init(); + + let (_client, mut harness) = ClientTestHarness::build().finish(); + + assert!(harness.current_error().is_none()); + assert!(harness.wants_connection_heartbeats()); + assert!(harness.try_to_receive_outbound_client_request().is_empty()); +} + +/// Test that a newly initialized client functions correctly after it is polled. #[tokio::test] async fn client_service_ready_ok() { zebra_test::init(); - let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); - let (server_tx, mut server_rx) = mpsc::channel(1); + let (mut client, mut harness) = ClientTestHarness::build().finish(); - let shared_error_slot = ErrorSlot::default(); - - let mut client = Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot: shared_error_slot.clone(), - version: Version(0), - }; - - let result = client.ready().now_or_never(); - assert!(matches!(result, Some(Ok(Client { .. })))); - - let error = shared_error_slot.try_get_error(); - assert!(matches!(error, None)); - - let result = shutdown_rx.try_recv(); - assert!(matches!(result, Ok(None))); - - // Unlike oneshots, open futures::mpsc channels return Err when empty - let result = server_rx.try_next(); - assert!(matches!(result, Err(_))); + assert!(client.is_ready().await); + assert!(harness.current_error().is_none()); + assert!(harness.wants_connection_heartbeats()); + assert!(harness.try_to_receive_outbound_client_request().is_empty()); } +/// Test that a client functions correctly if its readiness future is dropped. +#[tokio::test] +async fn client_service_ready_drop_ok() { + zebra_test::init(); + + let (mut client, mut harness) = ClientTestHarness::build().finish(); + + std::mem::drop(client.ready()); + + assert!(client.is_ready().await); + assert!(harness.current_error().is_none()); + assert!(harness.wants_connection_heartbeats()); + assert!(harness.try_to_receive_outbound_client_request().is_empty()); +} + +/// Test that a client functions correctly if it is polled for readiness multiple times. +#[tokio::test] +async fn client_service_ready_multiple_ok() { + zebra_test::init(); + + let (mut client, mut harness) = ClientTestHarness::build().finish(); + + assert!(client.is_ready().await); + assert!(client.is_ready().await); + + assert!(harness.current_error().is_none()); + assert!(harness.wants_connection_heartbeats()); + assert!(harness.try_to_receive_outbound_client_request().is_empty()); +} + +/// Test that clients propagate errors from their heartbeat tasks. #[tokio::test] async fn client_service_ready_heartbeat_exit() { zebra_test::init(); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let (server_tx, mut server_rx) = mpsc::channel(1); + let (mut client, mut harness) = ClientTestHarness::build().finish(); - let shared_error_slot = ErrorSlot::default(); + harness.set_error(PeerError::HeartbeatTaskExited); + harness.drop_heartbeat_shutdown_receiver(); - let mut client = Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot: shared_error_slot.clone(), - version: Version(0), - }; - - shared_error_slot - .try_update_error(PeerError::HeartbeatTaskExited.into()) - .expect("unexpected earlier error in tests"); - std::mem::drop(shutdown_rx); - - let result = client.ready().now_or_never(); - assert!(matches!(result, Some(Err(_)))); - - let error = shared_error_slot.try_get_error(); - assert!(matches!(error, Some(_))); - - // Unlike oneshots, closed futures::mpsc channels return None - let result = server_rx.try_next(); - assert!(matches!(result, Ok(None))); + assert!(client.is_failed().await); + assert!(harness.current_error().is_some()); + assert!(harness.try_to_receive_outbound_client_request().is_closed()); } +/// Test that clients propagate errors from their connection tasks. #[tokio::test] async fn client_service_ready_request_drop() { zebra_test::init(); - let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); - let (server_tx, server_rx) = mpsc::channel(1); + let (mut client, mut harness) = ClientTestHarness::build().finish(); - let shared_error_slot = ErrorSlot::default(); + harness.set_error(PeerError::ConnectionDropped); + harness.drop_outbound_client_request_receiver(); - let mut client = Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot: shared_error_slot.clone(), - version: Version(0), - }; - - shared_error_slot - .try_update_error(PeerError::ConnectionDropped.into()) - .expect("unexpected earlier error in tests"); - std::mem::drop(server_rx); - - let result = client.ready().now_or_never(); - assert!(matches!(result, Some(Err(_)))); - - let error = shared_error_slot.try_get_error(); - assert!(matches!(error, Some(_))); - - let result = shutdown_rx.try_recv(); - assert!(matches!(result, Ok(Some(CancelHeartbeatTask)))); + assert!(client.is_failed().await); + assert!(harness.current_error().is_some()); + assert!(!harness.wants_connection_heartbeats()); } +/// Test that clients error when their connection task closes the request channel. #[tokio::test] async fn client_service_ready_request_close() { zebra_test::init(); - let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); - let (server_tx, mut server_rx) = mpsc::channel(1); + let (mut client, mut harness) = ClientTestHarness::build().finish(); - let shared_error_slot = ErrorSlot::default(); + harness.set_error(PeerError::ConnectionClosed); + harness.close_outbound_client_request_receiver(); - let mut client = Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot: shared_error_slot.clone(), - version: Version(0), - }; - - shared_error_slot - .try_update_error(PeerError::ConnectionClosed.into()) - .expect("unexpected earlier error in tests"); - server_rx.close(); - - let result = client.ready().now_or_never(); - assert!(matches!(result, Some(Err(_)))); - - let error = shared_error_slot.try_get_error(); - assert!(matches!(error, Some(_))); - - let result = shutdown_rx.try_recv(); - assert!(matches!(result, Ok(Some(CancelHeartbeatTask)))); - - let result = server_rx.try_next(); - assert!(matches!(result, Ok(None))); + assert!(client.is_failed().await); + assert!(harness.current_error().is_some()); + assert!(!harness.wants_connection_heartbeats()); + assert!(harness.try_to_receive_outbound_client_request().is_closed()); } #[tokio::test] async fn client_service_ready_error_in_slot() { zebra_test::init(); - let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); - let (server_tx, mut server_rx) = mpsc::channel(1); + let (mut client, mut harness) = ClientTestHarness::build().finish(); - let shared_error_slot = ErrorSlot::default(); + harness.set_error(PeerError::Overloaded); - let mut client = Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot: shared_error_slot.clone(), - version: Version(0), - }; - - shared_error_slot - .try_update_error(PeerError::Overloaded.into()) - .expect("unexpected earlier error in tests"); - - let result = client.ready().now_or_never(); - assert!(matches!(result, Some(Err(_)))); - - let error = shared_error_slot.try_get_error(); - assert!(matches!(error, Some(_))); - - let result = shutdown_rx.try_recv(); - assert!(matches!(result, Ok(Some(CancelHeartbeatTask)))); - - let result = server_rx.try_next(); - assert!(matches!(result, Ok(None))); + assert!(client.is_failed().await); + assert!(harness.current_error().is_some()); + assert!(!harness.wants_connection_heartbeats()); + assert!(harness.try_to_receive_outbound_client_request().is_closed()); } +/// Test that clients error when multiple error conditions occur at the same time. #[tokio::test] async fn client_service_ready_multiple_errors() { zebra_test::init(); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let (server_tx, mut server_rx) = mpsc::channel(1); + let (mut client, mut harness) = ClientTestHarness::build().finish(); - let shared_error_slot = ErrorSlot::default(); + harness.set_error(PeerError::DuplicateHandshake); + harness.drop_heartbeat_shutdown_receiver(); + harness.close_outbound_client_request_receiver(); - let mut client = Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot: shared_error_slot.clone(), - version: Version(0), - }; - - shared_error_slot - .try_update_error(PeerError::DuplicateHandshake.into()) - .expect("unexpected earlier error in tests"); - std::mem::drop(shutdown_rx); - server_rx.close(); - - let result = client.ready().now_or_never(); - assert!(matches!(result, Some(Err(_)))); - - let error = shared_error_slot.try_get_error(); - assert!(matches!(error, Some(_))); - - let result = server_rx.try_next(); - assert!(matches!(result, Ok(None))); + assert!(client.is_failed().await); + assert!(harness.current_error().is_some()); + assert!(harness.try_to_receive_outbound_client_request().is_closed()); } +/// Test that clients register an error and cleanup channels correctly when the client is dropped. #[tokio::test] async fn client_service_drop_cleanup() { zebra_test::init(); - let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); - let (server_tx, mut server_rx) = mpsc::channel(1); - - let shared_error_slot = ErrorSlot::default(); - - let client = Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot: shared_error_slot.clone(), - version: Version(0), - }; + let (client, mut harness) = ClientTestHarness::build().finish(); std::mem::drop(client); - let error = shared_error_slot.try_get_error(); - assert!(matches!(error, Some(_))); - - let result = shutdown_rx.try_recv(); - assert!(matches!(result, Ok(Some(CancelHeartbeatTask)))); - - let result = server_rx.try_next(); - assert!(matches!(result, Ok(None))); + assert!(harness.current_error().is_some()); + assert!(!harness.wants_connection_heartbeats()); + assert!(harness.try_to_receive_outbound_client_request().is_closed()); } diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index de5ca0104..cd33a20e6 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -21,10 +21,7 @@ use std::{ }; use chrono::Utc; -use futures::{ - channel::{mpsc, oneshot}, - FutureExt, StreamExt, -}; +use futures::{channel::mpsc, FutureExt, StreamExt}; use tokio::{net::TcpStream, task::JoinHandle}; use tower::{service_fn, Service}; use tracing::Span; @@ -36,7 +33,7 @@ use crate::{ address_book_updater::AddressBookUpdater, constants, init, meta_addr::MetaAddr, - peer::{self, ErrorSlot, HandshakeRequest, OutboundConnectorRequest}, + peer::{self, ClientTestHarness, HandshakeRequest, OutboundConnectorRequest}, peer_set::{ initialize::{ accept_inbound_connections, add_initial_peers, crawl_and_dial, open_listener, @@ -45,7 +42,7 @@ use crate::{ set::MorePeers, ActiveConnectionCounter, CandidateSet, }, - protocol::{external::types::Version, types::PeerServices}, + protocol::types::PeerServices, AddressBook, BoxError, Config, Request, Response, }; @@ -351,16 +348,7 @@ async fn crawler_peer_limit_one_connect_ok_then_drop() { connection_tracker, } = req; - let (server_tx, _server_rx) = mpsc::channel(0); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); - let error_slot = ErrorSlot::default(); - - let fake_client = peer::Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot, - version: Version(1), - }; + let (fake_client, _harness) = ClientTestHarness::build().finish(); // Fake the connection closing. std::mem::drop(connection_tracker); @@ -424,16 +412,7 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() { connection_tracker, } = req; - let (server_tx, _server_rx) = mpsc::channel(0); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); - let error_slot = ErrorSlot::default(); - - let fake_client = peer::Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot, - version: Version(1), - }; + let (fake_client, _harness) = ClientTestHarness::build().finish(); // Make the connection staying open. peer_tracker_tx @@ -544,16 +523,7 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() { connection_tracker, } = req; - let (server_tx, _server_rx) = mpsc::channel(0); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); - let error_slot = ErrorSlot::default(); - - let fake_client = peer::Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot, - version: Version(1), - }; + let (fake_client, _harness) = ClientTestHarness::build().finish(); // Fake the connection closing. std::mem::drop(connection_tracker); @@ -619,16 +589,7 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { connection_tracker, } = req; - let (server_tx, _server_rx) = mpsc::channel(0); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); - let error_slot = ErrorSlot::default(); - - let fake_client = peer::Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot, - version: Version(1), - }; + let (fake_client, _harness) = ClientTestHarness::build().finish(); // Make the connection staying open. peer_tracker_tx @@ -771,16 +732,7 @@ async fn listener_peer_limit_one_handshake_ok_then_drop() { connection_tracker, } = req; - let (server_tx, _server_rx) = mpsc::channel(0); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); - let error_slot = ErrorSlot::default(); - - let fake_client = peer::Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot, - version: Version(1), - }; + let (fake_client, _harness) = ClientTestHarness::build().finish(); // Actually close the connection. std::mem::drop(connection_tracker); @@ -848,16 +800,7 @@ async fn listener_peer_limit_one_handshake_ok_stay_open() { connection_tracker, } = req; - let (server_tx, _server_rx) = mpsc::channel(0); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); - let error_slot = ErrorSlot::default(); - - let fake_client = peer::Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot, - version: Version(1), - }; + let (fake_client, _harness) = ClientTestHarness::build().finish(); // Make the connection staying open. peer_tracker_tx @@ -977,16 +920,7 @@ async fn listener_peer_limit_default_handshake_ok_then_drop() { connection_tracker, } = req; - let (server_tx, _server_rx) = mpsc::channel(0); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); - let error_slot = ErrorSlot::default(); - - let fake_client = peer::Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot, - version: Version(1), - }; + let (fake_client, _harness) = ClientTestHarness::build().finish(); // Actually close the connection. std::mem::drop(connection_tracker); @@ -1054,16 +988,7 @@ async fn listener_peer_limit_default_handshake_ok_stay_open() { connection_tracker, } = req; - let (server_tx, _server_rx) = mpsc::channel(0); - let (shutdown_tx, _shutdown_rx) = oneshot::channel(); - let error_slot = ErrorSlot::default(); - - let fake_client = peer::Client { - shutdown_tx: Some(shutdown_tx), - server_tx, - error_slot, - version: Version(1), - }; + let (fake_client, _harness) = ClientTestHarness::build().finish(); // Make the connection staying open. peer_tracker_tx diff --git a/zebra-network/src/peer_set/set/tests.rs b/zebra-network/src/peer_set/set/tests.rs index f0696570f..5226ca2c7 100644 --- a/zebra-network/src/peer_set/set/tests.rs +++ b/zebra-network/src/peer_set/set/tests.rs @@ -1,9 +1,6 @@ use std::{net::SocketAddr, sync::Arc}; -use futures::{ - channel::{mpsc, oneshot}, - stream, Stream, StreamExt, -}; +use futures::{channel::mpsc, stream, Stream, StreamExt}; use proptest::{collection::vec, prelude::*}; use proptest_derive::Arbitrary; use tokio::{ @@ -25,10 +22,7 @@ use zebra_chain::{ use super::MorePeers; use crate::{ address_book::AddressMetrics, - peer::{ - CancelHeartbeatTask, Client, ClientRequest, ErrorSlot, LoadTrackedClient, - MinimumPeerVersion, - }, + peer::{ClientTestHarness, LoadTrackedClient, MinimumPeerVersion}, peer_set::PeerSet, protocol::external::{types::Version, InventoryHash}, AddressBook, Config, @@ -42,50 +36,6 @@ mod prop; /// This affects the maximum number of peer connections added to the [`PeerSet`] during the tests. const MAX_PEERS: usize = 20; -/// A handle to a mocked [`Client`] instance. -struct MockedClientHandle { - _request_receiver: mpsc::Receiver, - shutdown_receiver: oneshot::Receiver, - version: Version, -} - -impl MockedClientHandle { - /// Create a new mocked [`Client`] instance, returning it together with a handle to track it. - pub fn new(version: Version) -> (Self, LoadTrackedClient) { - let (shutdown_sender, shutdown_receiver) = oneshot::channel(); - let (request_sender, _request_receiver) = mpsc::channel(1); - - let client = Client { - shutdown_tx: Some(shutdown_sender), - server_tx: request_sender, - error_slot: ErrorSlot::default(), - version, - }; - - let handle = MockedClientHandle { - _request_receiver, - shutdown_receiver, - version, - }; - - (handle, client.into()) - } - - /// Gets the peer protocol version associated to the [`Client`]. - pub fn version(&self) -> Version { - self.version - } - - /// Checks if the [`Client`] instance has not been dropped, which would have disconnected from - /// the peer. - pub fn is_connected(&mut self) -> bool { - match self.shutdown_receiver.try_recv() { - Ok(None) => true, - Ok(Some(CancelHeartbeatTask)) | Err(oneshot::Canceled) => false, - } - } -} - /// A helper type to generate arbitrary peer versions which can then become mock peer services. #[derive(Arbitrary, Debug)] struct PeerVersions { @@ -98,40 +48,44 @@ impl PeerVersions { /// /// Each peer versions results in a mock peer service, which is returned as a tuple. The first /// element is the [`LeadTrackedClient`], which is the actual service for the peer connection. - /// The second element is a [`MockedClientHandle`], which contains the open endpoints of the + /// The second element is a [`ClientTestHarness`], which contains the open endpoints of the /// mock channels used by the peer service. - pub fn mock_peers(&self) -> (Vec, Vec) { + /// + /// The clients and the harnesses are collected into separate [`Vec`] lists and returned. + pub fn mock_peers(&self) -> (Vec, Vec) { let mut clients = Vec::with_capacity(self.peer_versions.len()); - let mut handles = Vec::with_capacity(self.peer_versions.len()); + let mut harnesses = Vec::with_capacity(self.peer_versions.len()); for peer_version in &self.peer_versions { - let (handle, client) = MockedClientHandle::new(*peer_version); + let (client, harness) = ClientTestHarness::build() + .with_version(*peer_version) + .finish(); - clients.push(client); - handles.push(handle); + clients.push(client.into()); + harnesses.push(harness); } - (clients, handles) + (clients, harnesses) } /// Convert the arbitrary peer versions into mock peer services available through a /// [`Discover`] compatible stream. /// /// A tuple is returned, where the first item is a stream with the mock peers available through - /// a [`Discover`] interface, and the second is a list of handles to the mocked services. + /// a [`Discover`] interface, and the second is a list of harnesses to the mocked services. /// /// The returned stream never finishes, so it is ready to be passed to the [`PeerSet`] /// constructor. /// - /// See [`Self::mock_peers`] for details on how the peers are mocked and on what the handles + /// See [`Self::mock_peers`] for details on how the peers are mocked and on what the harnesses /// contain. pub fn mock_peer_discovery( &self, ) -> ( impl Stream, BoxError>>, - Vec, + Vec, ) { - let (clients, handles) = self.mock_peers(); + let (clients, harnesses) = self.mock_peers(); let fake_ports = 1_u16..; let discovered_peers_iterator = fake_ports.zip(clients).map(|(port, client)| { @@ -142,7 +96,7 @@ impl PeerVersions { let discovered_peers = stream::iter(discovered_peers_iterator).chain(stream::pending()); - (discovered_peers, handles) + (discovered_peers, harnesses) } } diff --git a/zebra-network/src/peer_set/set/tests/prop.rs b/zebra-network/src/peer_set/set/tests/prop.rs index 3ca138c16..949d5858d 100644 --- a/zebra-network/src/peer_set/set/tests/prop.rs +++ b/zebra-network/src/peer_set/set/tests/prop.rs @@ -6,11 +6,9 @@ use tower::{discover::Discover, BoxError, ServiceExt}; use zebra_chain::{block, chain_tip::ChainTip, parameters::Network}; -use super::{ - BlockHeightPairAcrossNetworkUpgrades, MockedClientHandle, PeerSetBuilder, PeerVersions, -}; +use super::{BlockHeightPairAcrossNetworkUpgrades, PeerSetBuilder, PeerVersions}; use crate::{ - peer::{LoadTrackedClient, MinimumPeerVersion}, + peer::{ClientTestHarness, LoadTrackedClient, MinimumPeerVersion}, peer_set::PeerSet, protocol::external::types::Version, }; @@ -25,7 +23,7 @@ proptest! { ) { let runtime = zebra_test::init_async(); - let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery(); + let (discovered_peers, mut harnesses) = peer_versions.mock_peer_discovery(); let (mut minimum_peer_version, best_tip_height) = MinimumPeerVersion::with_mock_chain_tip(network); @@ -43,7 +41,7 @@ proptest! { check_if_only_up_to_date_peers_are_live( &mut peer_set, - &mut handles, + &mut harnesses, current_minimum_version, )?; @@ -59,7 +57,7 @@ proptest! { ) { let runtime = zebra_test::init_async(); - let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery(); + let (discovered_peers, mut harnesses) = peer_versions.mock_peer_discovery(); let (mut minimum_peer_version, best_tip_height) = MinimumPeerVersion::with_mock_chain_tip(block_heights.network); @@ -75,7 +73,7 @@ proptest! { check_if_only_up_to_date_peers_are_live( &mut peer_set, - &mut handles, + &mut harnesses, minimum_peer_version.current(), )?; @@ -85,7 +83,7 @@ proptest! { check_if_only_up_to_date_peers_are_live( &mut peer_set, - &mut handles, + &mut harnesses, minimum_peer_version.current(), )?; @@ -97,10 +95,10 @@ proptest! { /// Check if only peers with up-to-date protocol versions are live. /// /// This will poll the `peer_set` to allow it to drop outdated peers, and then check the peer -/// `handles` to assert that only up-to-date peers are kept by the `peer_set`. +/// `harnesses` to assert that only up-to-date peers are kept by the `peer_set`. fn check_if_only_up_to_date_peers_are_live( peer_set: &mut PeerSet, - handles: &mut Vec, + harnesses: &mut Vec, minimum_version: Version, ) -> Result<(), TestCaseError> where @@ -110,9 +108,9 @@ where { // Force `poll_discover` to be called to process all discovered peers. let poll_result = peer_set.ready().now_or_never(); - let all_peers_are_outdated = handles + let all_peers_are_outdated = harnesses .iter() - .all(|handle| handle.version() < minimum_version); + .all(|harness| harness.version() < minimum_version); if all_peers_are_outdated { prop_assert!(matches!(poll_result, None)); @@ -120,9 +118,9 @@ where prop_assert!(matches!(poll_result, Some(Ok(_)))); } - for handle in handles { - let is_outdated = handle.version() < minimum_version; - let is_connected = handle.is_connected(); + for harness in harnesses { + let is_outdated = harness.version() < minimum_version; + let is_connected = harness.wants_connection_heartbeats(); prop_assert!( is_connected != is_outdated, diff --git a/zebra-test/src/lib.rs b/zebra-test/src/lib.rs index 098c57a6e..fb9d0f050 100644 --- a/zebra-test/src/lib.rs +++ b/zebra-test/src/lib.rs @@ -24,6 +24,7 @@ pub mod mock_service; pub mod net; pub mod network_addr; pub mod prelude; +pub mod service_extensions; pub mod transcript; pub mod vectors; pub mod zip0143; diff --git a/zebra-test/src/service_extensions.rs b/zebra-test/src/service_extensions.rs new file mode 100644 index 000000000..0aa9648e3 --- /dev/null +++ b/zebra-test/src/service_extensions.rs @@ -0,0 +1,48 @@ +//! Extension traits for [`Service`] types to help with testing. + +use std::task::Poll; + +use futures::future::{BoxFuture, FutureExt}; +use tower::{Service, ServiceExt}; + +/// An extension trait to check if a [`Service`] is immediately ready to be called. +pub trait IsReady: Service { + /// Poll the [`Service`] once, and return true if it is immediately ready to be called. + fn is_ready(&mut self) -> BoxFuture; + + /// Poll the [`Service`] once, and return true if it is pending. + fn is_pending(&mut self) -> BoxFuture; + + /// Poll the [`Service`] once, and return true if it has failed. + fn is_failed(&mut self) -> BoxFuture; +} + +impl IsReady for S +where + S: Service + Send, + Request: 'static, +{ + fn is_ready(&mut self) -> BoxFuture { + async move { + let ready_result = futures::poll!(self.ready()); + matches!(ready_result, Poll::Ready(Ok(_))) + } + .boxed() + } + + fn is_pending(&mut self) -> BoxFuture { + async move { + let ready_result = futures::poll!(self.ready()); + ready_result.is_pending() + } + .boxed() + } + + fn is_failed(&mut self) -> BoxFuture { + async move { + let ready_result = futures::poll!(self.ready()); + matches!(ready_result, Poll::Ready(Err(_))) + } + .boxed() + } +} diff --git a/zebrad/src/async_ext.rs b/zebrad/src/async_ext.rs deleted file mode 100644 index e316689d4..000000000 --- a/zebrad/src/async_ext.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! Extensions used in [`Future`]s and async code. - -mod now_or_later; - -pub use self::now_or_later::NowOrLater; diff --git a/zebrad/src/async_ext/now_or_later.rs b/zebrad/src/async_ext/now_or_later.rs deleted file mode 100644 index 0941dba35..000000000 --- a/zebrad/src/async_ext/now_or_later.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -use pin_project::pin_project; - -/// A helper [`Future`] wrapper that will always return [`Poll::Ready`]. -/// -/// If the inner [`Future`] `F` is ready and produces an output `value`, then [`NowOrNever`] will -/// also be ready but with an output `Some(value)`. -/// -/// If the inner [`Future`] `F` is not ready, then: -/// -/// - [`NowOrNever`] will be still be ready but with an output `None`, -/// - and the task associated with the future will be scheduled to awake whenever the inner -/// [`Future`] `F` becomes ready. -/// -/// This is different from [`FutureExt::now_or_never`] because `now_or_never` uses a fake task -/// [`Context`], which means that calling `now_or_never` inside an `async` function doesn't -/// schedule the generated future to be polled again when the inner future becomes ready. -/// -/// # Examples -/// -/// ``` -/// use futures::{FutureExt, future}; -/// # use zebrad::async_ext::NowOrLater; -/// -/// let inner_future = future::ready(()); -/// -/// # let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime"); -/// # -/// # runtime.block_on(async move { -/// assert_eq!(NowOrLater(inner_future).await, Some(())); -/// # }); -/// ``` -/// -/// ``` -/// use futures::{FutureExt, future}; -/// # use zebrad::async_ext::NowOrLater; -/// -/// let inner_future = future::pending::<()>(); -/// -/// # let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime"); -/// # -/// # runtime.block_on(async move { -/// assert_eq!(NowOrLater(inner_future).await, None); -/// # }); -/// ``` -#[pin_project] -pub struct NowOrLater(#[pin] pub F); - -impl Future for NowOrLater { - type Output = Option; - - fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll { - match self.project().0.poll(context) { - Poll::Ready(value) => Poll::Ready(Some(value)), - Poll::Pending => Poll::Ready(None), - } - } -} diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index d3e74a3b5..017a6198b 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -2,7 +2,7 @@ //! //! It is used when Zebra is a long way behind the current chain tip. -use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration}; +use std::{collections::HashSet, pin::Pin, sync::Arc, task::Poll, time::Duration}; use color_eyre::eyre::{eyre, Report}; use futures::stream::{FuturesUnordered, StreamExt}; @@ -24,8 +24,7 @@ use zebra_state as zs; use zs::LatestChainTip; use crate::{ - async_ext::NowOrLater, components::sync::downloads::BlockDownloadVerifyError, - config::ZebradConfig, BoxError, + components::sync::downloads::BlockDownloadVerifyError, config::ZebradConfig, BoxError, }; mod downloads; @@ -345,7 +344,7 @@ where while !self.prospective_tips.is_empty() { // Check whether any block tasks are currently ready: - while let Some(Some(rsp)) = NowOrLater(self.downloads.next()).await { + while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) { match rsp { Ok(hash) => { tracing::trace!(?hash, "verified and committed block to state"); diff --git a/zebrad/src/lib.rs b/zebrad/src/lib.rs index 81df250cc..ab9ec41b8 100644 --- a/zebrad/src/lib.rs +++ b/zebrad/src/lib.rs @@ -37,7 +37,6 @@ extern crate tracing; pub type BoxError = Box; pub mod application; -pub mod async_ext; pub mod commands; pub mod components; pub mod config;