Use `MockedClientHandle` in other tests (#3241)

* Move `MockedClientHandle` to `peer` module

It's more closely related to a `Client` than to the `PeerSet`, and this
prepares it to be used by other tests.

* Rename `MockedClientHandle` to `ClientTestHarness`

Reduce confusion, and clarify that the client is not mocked.

Co-authored-by: teor <teor@riseup.net>

* Add clarification to `mock_peers` documentation

Explicitly say how the generated data is returned.

* Rename method to `wants_connection_heartbeats`

The `Client` service only represents one direction of a connection, so
`is_connected` is not an accurate term.

Co-authored-by: teor <teor@riseup.net>

* Mock `Client` instead of `LoadTrackedClient`

Move where the conversion from mocked `Client` to mocked
`LoadTrackedClient` happens, so that the test helper can be used more
easily by other tests.

* Use `ClientTestHarness` in `initialize` tests

Replace the boilerplate code to create a fake `Client` instance with
usages of the `ClientTestHarness` constructor.

* Allow receiving requests from `Client` instance

Create a helper type to wrap the result, to make it easier to assert on
specific events after trying to receive a request.
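The wrapper can be sketched with a std-only analogue. The real type (`ReceiveRequestAttempt` in `zebra-network`, shown later in this diff) wraps a `futures::mpsc` receiver; the simplified version below uses `std::sync::mpsc` purely to illustrate the same three observable events:

```rust
use std::sync::mpsc::{self, TryRecvError};

/// Simplified analogue of `ReceiveRequestAttempt`: the outcome of one
/// non-blocking receive attempt on the mocked request channel.
enum ReceiveAttempt<T> {
    /// The sender endpoint was dropped, so the channel is closed.
    Closed,
    /// The channel is open but no request was queued.
    Empty,
    /// One request was successfully received.
    Request(T),
}

impl<T> ReceiveAttempt<T> {
    fn is_empty(&self) -> bool {
        matches!(self, ReceiveAttempt::Empty)
    }

    fn is_closed(&self) -> bool {
        matches!(self, ReceiveAttempt::Closed)
    }
}

/// Map a non-blocking receive into the three observable events.
fn try_receive<T>(receiver: &mpsc::Receiver<T>) -> ReceiveAttempt<T> {
    match receiver.try_recv() {
        Ok(request) => ReceiveAttempt::Request(request),
        Err(TryRecvError::Empty) => ReceiveAttempt::Empty,
        Err(TryRecvError::Disconnected) => ReceiveAttempt::Closed,
    }
}

fn main() {
    let (sender, receiver) = mpsc::channel();
    assert!(try_receive(&receiver).is_empty());
    sender.send("request").unwrap();
    assert!(matches!(try_receive(&receiver), ReceiveAttempt::Request("request")));
    drop(sender);
    assert!(try_receive(&receiver).is_closed());
}
```

This is why a dedicated wrapper beats returning the raw `try_next` result: the test reads as `is_empty()` or `is_closed()` instead of pattern-matching on nested `Result`/`Option` values.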

* Allow inspecting the current error in the slot

Share the `ErrorSlot` between the `Client` and the handle, so that the
handle can be used to inspect the contents of the `ErrorSlot`.

* Allow placing an error into the `ErrorSlot`

This assumes the slot is initially empty. If it already contains an
error, the code will panic.
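The sharing pattern behind these two commits can be illustrated with a simplified, std-only stand-in for `ErrorSlot` (the real type lives in `zebra-network` and stores a `SharedPeerError`; the `String` payload here is just for illustration):

```rust
use std::sync::{Arc, Mutex};

/// Simplified analogue of `ErrorSlot`: a shared, write-once error cell.
/// Cloning shares the same underlying slot, like the real type.
#[derive(Clone, Default)]
struct ErrorSlot(Arc<Mutex<Option<String>>>);

impl ErrorSlot {
    /// Returns a copy of the current error, if there is one.
    fn try_get_error(&self) -> Option<String> {
        self.0.lock().unwrap().clone()
    }

    /// Store `error` if the slot is empty; otherwise return the existing error.
    fn try_update_error(&self, error: String) -> Result<(), String> {
        let mut slot = self.0.lock().unwrap();
        match &*slot {
            Some(existing) => Err(existing.clone()),
            None => {
                *slot = Some(error);
                Ok(())
            }
        }
    }
}

fn main() {
    let client_side = ErrorSlot::default();
    let harness_side = client_side.clone();

    // The harness can inspect the slot before any error is set...
    assert!(harness_side.try_get_error().is_none());

    // ...place an error into the empty slot...
    harness_side
        .try_update_error("heartbeat task exited".into())
        .unwrap();

    // ...and both sides observe it; a second update fails.
    assert_eq!(
        client_side.try_get_error().as_deref(),
        Some("heartbeat task exited")
    );
    assert!(client_side.try_update_error("second error".into()).is_err());
}
```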

* Allow gracefully closing the request receiver

Close the endpoint with the appropriate call to the `close()` method.

* Allow dropping the request receiver endpoint

Forcefully closes the endpoint.

* Rename field to `client_request_receiver`

Also rename the related methods to include
`outbound_client_request_receiver`, to make them more precise.

Co-authored-by: teor <teor@riseup.net>

* Allow dropping the heartbeat shutdown receiver

Allows the `Client` to detect that the channel has been closed.

* Rename fn. to `drop_heartbeat_shutdown_receiver`

Make it clear that it affects the heartbeat task.

Co-authored-by: teor <teor@riseup.net>

* Move `NowOrLater` into a new `now-or-later` crate

Make it easily accessible to other crates.

* Add `IsReady` extension trait for `Service`

Simplifies checking if a service is immediately ready to be called.

* Add extension method to check for readiness error

Checks if the `Service` isn't immediately ready because a call to
`ready` immediately returns an error.

* Rename method to `is_failed`

Avoid negated method names.

Co-authored-by: teor <teor@riseup.net>

* Add an `IsReady::is_pending` extension method

Checks if a `Service` is not ready to be called.
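A minimal sketch of the idea behind these three extension methods, using a hand-rolled `Readiness` trait instead of `tower::Service` so the example is self-contained (the real `IsReady` trait in `zebra-test` polls an actual service once via `ServiceExt::ready`):

```rust
use std::task::Poll;

/// Minimal stand-in for a `tower::Service`'s readiness poll.
trait Readiness {
    fn poll_ready(&mut self) -> Poll<Result<(), String>>;
}

/// Extension methods mirroring `IsReady`: classify one readiness poll.
trait IsReady: Readiness {
    /// The service is immediately ready to be called.
    fn is_ready(&mut self) -> bool {
        matches!(self.poll_ready(), Poll::Ready(Ok(())))
    }

    /// The readiness check immediately returned an error.
    fn is_failed(&mut self) -> bool {
        matches!(self.poll_ready(), Poll::Ready(Err(_)))
    }

    /// The service is not ready to be called yet.
    fn is_pending(&mut self) -> bool {
        self.poll_ready().is_pending()
    }
}

// Blanket impl: any service-like type gets the helpers for free.
impl<S: Readiness> IsReady for S {}

/// A toy service whose readiness outcome is fixed in advance.
struct FixedService(Poll<Result<(), String>>);

impl Readiness for FixedService {
    fn poll_ready(&mut self) -> Poll<Result<(), String>> {
        self.0.clone()
    }
}

fn main() {
    assert!(FixedService(Poll::Ready(Ok(()))).is_ready());
    assert!(FixedService(Poll::Ready(Err("broken".into()))).is_failed());
    assert!(FixedService(Poll::Pending).is_pending());
}
```

The three methods are deliberately exhaustive over the possible poll outcomes, so a test asserts exactly one of them.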

* Use `ClientTestHarness` in `Client` test vectors

Reduce repeated code and try to improve readability.

* Create a new `ClientTestHarnessBuilder` type

A builder to create test `Client` instances using mock data which can be
tracked and manipulated through a `ClientTestHarness`.

* Allow configuring the `Client`'s mocked version

Add a `with_version` builder method.

* Use `ClientTestHarnessBuilder` in `PeerVersions`

Use the builder to set the peer version, so that the `version` parameter
can be removed from the constructor later.

* Use a default mock version where possible

Reduce noise when setting up the harness for tests that don't really
care about the remote peer version.

* Remove `Version` parameter from the `build` method

The `with_version` builder method should be used instead.
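The builder shape described in the last few commits can be sketched in a self-contained form (the `HarnessBuilder`, `Harness`, and `Version` types below are simplified stand-ins for the real `zebra-network` types):

```rust
/// Stand-in for the peer protocol version type.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Version(u32);

/// Simplified sketch of `ClientTestHarnessBuilder`: unset fields fall
/// back to defaults when `finish` assembles the harness.
#[derive(Default)]
struct HarnessBuilder {
    version: Option<Version>,
}

struct Harness {
    version: Version,
}

impl HarnessBuilder {
    /// Configure the mocked version for the peer.
    fn with_version(mut self, version: Version) -> Self {
        self.version = Some(version);
        self
    }

    /// Build the harness, using a default mock version if none was set.
    fn finish(self) -> Harness {
        Harness {
            version: self.version.unwrap_or(Version(0)),
        }
    }
}

fn main() {
    // Tests that don't care about the remote peer version get the default...
    assert_eq!(HarnessBuilder::default().finish().version, Version(0));

    // ...while version-sensitive tests set it explicitly.
    let harness = HarnessBuilder::default()
        .with_version(Version(170_013))
        .finish();
    assert_eq!(harness.version, Version(170_013));
}
```

Moving the version into a builder method is what lets the `Version` parameter disappear from `build`/`finish` without breaking callers that don't care about it.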

* Fix some typos and outdated info in the release checklist

* Add extra client tests for zero and multiple readiness checks (#3273)

And document existing tests.

* Replace `NowOrLater` with `futures::poll!` (#3272)

* Replace NowOrLater with the futures::poll! macro in zebrad

* Replace NowOrLater with the futures::poll! macro in zebra-test

* Remove the now-or-later crate

* remove unused imports

* rustfmt

Co-authored-by: teor <teor@riseup.net>
Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
Janito Vaqueiro Ferreira Filho 2021-12-21 17:13:26 -03:00 committed by GitHub
parent 8db0528165
commit b71833292d
14 changed files with 379 additions and 418 deletions


@ -69,11 +69,9 @@ After you have your changes pushed start a PR with them using this template by a
You can use `fastmod` to interactively find and replace versions.
For example, for `zebra-1.0.0-alpha.12`, we did:
For example, you can do something like:
```
fastmod --extensions rs,toml,md --fixed-strings '1.0.0-alpha.12' '1.0.0-alpha.13'
fastmod --extensions rs,toml,md --fixed-strings '1.0.0-alpha.11' '1.0.0-alpha.12'
fastmod --extensions rs,toml,md --fixed-strings '1.0.0-alpha.10' '1.0.0-alpha.11'
fastmod --extensions rs,toml,md --fixed-strings '0.2.9' '0.2.10' tower-batch
fastmod --extensions rs,toml,md --fixed-strings '0.2.8' '0.2.9' tower-fallback
```


@ -15,6 +15,8 @@ mod load_tracked_client;
/// Watches for chain tip height updates to determine the minimum support peer protocol version.
mod minimum_peer_version;
#[cfg(any(test, feature = "proptest-impl"))]
pub use client::tests::ClientTestHarness;
#[cfg(not(test))]
use client::ClientRequest;
#[cfg(test)]


@ -21,8 +21,8 @@ use crate::{
use super::{ErrorSlot, PeerError, SharedPeerError};
#[cfg(test)]
mod tests;
#[cfg(any(test, feature = "proptest-impl"))]
pub mod tests;
/// The "client" duplex half of a peer connection.
pub struct Client {


@ -1,3 +1,189 @@
//! Tests for the [`Client`] part of peer connections
//! Tests for the [`Client`] part of peer connections, and some test utilities for mocking
//! [`Client`] instances.
mod vectors;
use futures::channel::{mpsc, oneshot};
use crate::{
peer::{error::SharedPeerError, CancelHeartbeatTask, Client, ClientRequest, ErrorSlot},
protocol::external::types::Version,
};
/// A harness with mocked channels for testing a [`Client`] instance.
pub struct ClientTestHarness {
client_request_receiver: Option<mpsc::Receiver<ClientRequest>>,
shutdown_receiver: Option<oneshot::Receiver<CancelHeartbeatTask>>,
error_slot: ErrorSlot,
version: Version,
}
impl ClientTestHarness {
/// Create a [`ClientTestHarnessBuilder`] instance to help create a new [`Client`] instance
/// and a [`ClientTestHarness`] to track it.
pub fn build() -> ClientTestHarnessBuilder {
ClientTestHarnessBuilder { version: None }
}
/// Gets the peer protocol version associated to the [`Client`].
pub fn version(&self) -> Version {
self.version
}
/// Returns true if the [`Client`] instance still wants connection heartbeats to be sent.
///
/// Checks that the client:
/// - has not been dropped,
/// - has not closed or dropped the mocked heartbeat task channel, and
/// - has not asked the mocked heartbeat task to shut down.
pub fn wants_connection_heartbeats(&mut self) -> bool {
let receive_result = self
.shutdown_receiver
.as_mut()
.expect("heartbeat shutdown receiver endpoint has been dropped")
.try_recv();
match receive_result {
Ok(None) => true,
Ok(Some(CancelHeartbeatTask)) | Err(oneshot::Canceled) => false,
}
}
/// Drops the mocked heartbeat shutdown receiver endpoint.
pub fn drop_heartbeat_shutdown_receiver(&mut self) {
let _ = self
.shutdown_receiver
.take()
.expect("heartbeat shutdown receiver endpoint has already been dropped");
}
/// Closes the receiver endpoint of [`ClientRequests`] that are supposed to be sent to the
/// remote peer.
///
/// The remote peer that would receive the requests is mocked for testing.
pub fn close_outbound_client_request_receiver(&mut self) {
self.client_request_receiver
.as_mut()
.expect("request receiver endpoint has been dropped")
.close();
}
/// Drops the receiver endpoint of [`ClientRequests`], forcefully closing the channel.
///
/// The remote peer that would receive the requests is mocked for testing.
pub fn drop_outbound_client_request_receiver(&mut self) {
self.client_request_receiver
.take()
.expect("request receiver endpoint has already been dropped");
}
/// Tries to receive a [`ClientRequest`] sent by the [`Client`] instance.
///
/// The remote peer that would receive the requests is mocked for testing.
pub(crate) fn try_to_receive_outbound_client_request(&mut self) -> ReceiveRequestAttempt {
let receive_result = self
.client_request_receiver
.as_mut()
.expect("request receiver endpoint has been dropped")
.try_next();
match receive_result {
Ok(Some(request)) => ReceiveRequestAttempt::Request(request),
Ok(None) => ReceiveRequestAttempt::Closed,
Err(_) => ReceiveRequestAttempt::Empty,
}
}
/// Returns the current error in the [`ErrorSlot`], if there is one.
pub fn current_error(&self) -> Option<SharedPeerError> {
self.error_slot.try_get_error()
}
/// Sets the error in the [`ErrorSlot`], assuming there isn't one already.
///
/// # Panics
///
/// If there's already an error in the [`ErrorSlot`].
pub fn set_error(&self, error: impl Into<SharedPeerError>) {
self.error_slot
.try_update_error(error.into())
.expect("unexpected earlier error in error slot")
}
}
/// The result of an attempt to receive a [`ClientRequest`] sent by the [`Client`] instance.
///
/// The remote peer that would receive the request is mocked for testing.
pub(crate) enum ReceiveRequestAttempt {
/// The [`Client`] instance has closed the sender endpoint of the channel.
Closed,
/// There were no queued requests in the channel.
Empty,
/// One request was successfully received.
Request(ClientRequest),
}
impl ReceiveRequestAttempt {
/// Check if the attempt to receive resulted in discovering that the sender endpoint had been
/// closed.
pub fn is_closed(&self) -> bool {
matches!(self, ReceiveRequestAttempt::Closed)
}
/// Check if the attempt to receive resulted in no requests.
pub fn is_empty(&self) -> bool {
matches!(self, ReceiveRequestAttempt::Empty)
}
/// Returns the received request, if there was one.
#[allow(dead_code)]
pub fn request(self) -> Option<ClientRequest> {
match self {
ReceiveRequestAttempt::Request(request) => Some(request),
ReceiveRequestAttempt::Closed | ReceiveRequestAttempt::Empty => None,
}
}
}
/// A builder for a [`Client`] and [`ClientTestHarness`] instance.
///
/// Mocked data is used to construct a real [`Client`] instance. The mocked data is initialized by
/// the [`ClientTestHarnessBuilder`], and can be accessed and changed through the
/// [`ClientTestHarness`].
pub struct ClientTestHarnessBuilder {
version: Option<Version>,
}
impl ClientTestHarnessBuilder {
/// Configure the mocked version for the peer.
pub fn with_version(mut self, version: Version) -> Self {
self.version = Some(version);
self
}
/// Build a [`Client`] instance with the mocked data and a [`ClientTestHarness`] to track it.
pub fn finish(self) -> (Client, ClientTestHarness) {
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let (client_request_sender, client_request_receiver) = mpsc::channel(1);
let error_slot = ErrorSlot::default();
let version = self.version.unwrap_or(Version(0));
let client = Client {
shutdown_tx: Some(shutdown_sender),
server_tx: client_request_sender,
error_slot: error_slot.clone(),
version,
};
let harness = ClientTestHarness {
client_request_receiver: Some(client_request_receiver),
shutdown_receiver: Some(shutdown_receiver),
error_slot,
version,
};
(client, harness)
}
}


@ -1,233 +1,152 @@
//! Fixed peer [`Client`] test vectors.
use futures::{
channel::{mpsc, oneshot},
FutureExt,
};
use tower::ServiceExt;
use crate::{
peer::{CancelHeartbeatTask, Client, ErrorSlot},
protocol::external::types::Version,
PeerError,
};
use zebra_test::service_extensions::IsReady;
use crate::{peer::ClientTestHarness, PeerError};
/// Test that a newly initialized client functions correctly before it is polled.
#[tokio::test]
async fn client_service_ok_without_readiness_check() {
zebra_test::init();
let (_client, mut harness) = ClientTestHarness::build().finish();
assert!(harness.current_error().is_none());
assert!(harness.wants_connection_heartbeats());
assert!(harness.try_to_receive_outbound_client_request().is_empty());
}
/// Test that a newly initialized client functions correctly after it is polled.
#[tokio::test]
async fn client_service_ready_ok() {
zebra_test::init();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let (server_tx, mut server_rx) = mpsc::channel(1);
let (mut client, mut harness) = ClientTestHarness::build().finish();
let shared_error_slot = ErrorSlot::default();
let mut client = Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot: shared_error_slot.clone(),
version: Version(0),
};
let result = client.ready().now_or_never();
assert!(matches!(result, Some(Ok(Client { .. }))));
let error = shared_error_slot.try_get_error();
assert!(matches!(error, None));
let result = shutdown_rx.try_recv();
assert!(matches!(result, Ok(None)));
// Unlike oneshots, open futures::mpsc channels return Err when empty
let result = server_rx.try_next();
assert!(matches!(result, Err(_)));
assert!(client.is_ready().await);
assert!(harness.current_error().is_none());
assert!(harness.wants_connection_heartbeats());
assert!(harness.try_to_receive_outbound_client_request().is_empty());
}
/// Test that a client functions correctly if its readiness future is dropped.
#[tokio::test]
async fn client_service_ready_drop_ok() {
zebra_test::init();
let (mut client, mut harness) = ClientTestHarness::build().finish();
std::mem::drop(client.ready());
assert!(client.is_ready().await);
assert!(harness.current_error().is_none());
assert!(harness.wants_connection_heartbeats());
assert!(harness.try_to_receive_outbound_client_request().is_empty());
}
/// Test that a client functions correctly if it is polled for readiness multiple times.
#[tokio::test]
async fn client_service_ready_multiple_ok() {
zebra_test::init();
let (mut client, mut harness) = ClientTestHarness::build().finish();
assert!(client.is_ready().await);
assert!(client.is_ready().await);
assert!(harness.current_error().is_none());
assert!(harness.wants_connection_heartbeats());
assert!(harness.try_to_receive_outbound_client_request().is_empty());
}
/// Test that clients propagate errors from their heartbeat tasks.
#[tokio::test]
async fn client_service_ready_heartbeat_exit() {
zebra_test::init();
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let (server_tx, mut server_rx) = mpsc::channel(1);
let (mut client, mut harness) = ClientTestHarness::build().finish();
let shared_error_slot = ErrorSlot::default();
harness.set_error(PeerError::HeartbeatTaskExited);
harness.drop_heartbeat_shutdown_receiver();
let mut client = Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot: shared_error_slot.clone(),
version: Version(0),
};
shared_error_slot
.try_update_error(PeerError::HeartbeatTaskExited.into())
.expect("unexpected earlier error in tests");
std::mem::drop(shutdown_rx);
let result = client.ready().now_or_never();
assert!(matches!(result, Some(Err(_))));
let error = shared_error_slot.try_get_error();
assert!(matches!(error, Some(_)));
// Unlike oneshots, closed futures::mpsc channels return None
let result = server_rx.try_next();
assert!(matches!(result, Ok(None)));
assert!(client.is_failed().await);
assert!(harness.current_error().is_some());
assert!(harness.try_to_receive_outbound_client_request().is_closed());
}
/// Test that clients propagate errors from their connection tasks.
#[tokio::test]
async fn client_service_ready_request_drop() {
zebra_test::init();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let (server_tx, server_rx) = mpsc::channel(1);
let (mut client, mut harness) = ClientTestHarness::build().finish();
let shared_error_slot = ErrorSlot::default();
harness.set_error(PeerError::ConnectionDropped);
harness.drop_outbound_client_request_receiver();
let mut client = Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot: shared_error_slot.clone(),
version: Version(0),
};
shared_error_slot
.try_update_error(PeerError::ConnectionDropped.into())
.expect("unexpected earlier error in tests");
std::mem::drop(server_rx);
let result = client.ready().now_or_never();
assert!(matches!(result, Some(Err(_))));
let error = shared_error_slot.try_get_error();
assert!(matches!(error, Some(_)));
let result = shutdown_rx.try_recv();
assert!(matches!(result, Ok(Some(CancelHeartbeatTask))));
assert!(client.is_failed().await);
assert!(harness.current_error().is_some());
assert!(!harness.wants_connection_heartbeats());
}
/// Test that clients error when their connection task closes the request channel.
#[tokio::test]
async fn client_service_ready_request_close() {
zebra_test::init();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let (server_tx, mut server_rx) = mpsc::channel(1);
let (mut client, mut harness) = ClientTestHarness::build().finish();
let shared_error_slot = ErrorSlot::default();
harness.set_error(PeerError::ConnectionClosed);
harness.close_outbound_client_request_receiver();
let mut client = Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot: shared_error_slot.clone(),
version: Version(0),
};
shared_error_slot
.try_update_error(PeerError::ConnectionClosed.into())
.expect("unexpected earlier error in tests");
server_rx.close();
let result = client.ready().now_or_never();
assert!(matches!(result, Some(Err(_))));
let error = shared_error_slot.try_get_error();
assert!(matches!(error, Some(_)));
let result = shutdown_rx.try_recv();
assert!(matches!(result, Ok(Some(CancelHeartbeatTask))));
let result = server_rx.try_next();
assert!(matches!(result, Ok(None)));
assert!(client.is_failed().await);
assert!(harness.current_error().is_some());
assert!(!harness.wants_connection_heartbeats());
assert!(harness.try_to_receive_outbound_client_request().is_closed());
}
#[tokio::test]
async fn client_service_ready_error_in_slot() {
zebra_test::init();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let (server_tx, mut server_rx) = mpsc::channel(1);
let (mut client, mut harness) = ClientTestHarness::build().finish();
let shared_error_slot = ErrorSlot::default();
harness.set_error(PeerError::Overloaded);
let mut client = Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot: shared_error_slot.clone(),
version: Version(0),
};
shared_error_slot
.try_update_error(PeerError::Overloaded.into())
.expect("unexpected earlier error in tests");
let result = client.ready().now_or_never();
assert!(matches!(result, Some(Err(_))));
let error = shared_error_slot.try_get_error();
assert!(matches!(error, Some(_)));
let result = shutdown_rx.try_recv();
assert!(matches!(result, Ok(Some(CancelHeartbeatTask))));
let result = server_rx.try_next();
assert!(matches!(result, Ok(None)));
assert!(client.is_failed().await);
assert!(harness.current_error().is_some());
assert!(!harness.wants_connection_heartbeats());
assert!(harness.try_to_receive_outbound_client_request().is_closed());
}
/// Test that clients error when multiple error conditions occur at the same time.
#[tokio::test]
async fn client_service_ready_multiple_errors() {
zebra_test::init();
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let (server_tx, mut server_rx) = mpsc::channel(1);
let (mut client, mut harness) = ClientTestHarness::build().finish();
let shared_error_slot = ErrorSlot::default();
harness.set_error(PeerError::DuplicateHandshake);
harness.drop_heartbeat_shutdown_receiver();
harness.close_outbound_client_request_receiver();
let mut client = Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot: shared_error_slot.clone(),
version: Version(0),
};
shared_error_slot
.try_update_error(PeerError::DuplicateHandshake.into())
.expect("unexpected earlier error in tests");
std::mem::drop(shutdown_rx);
server_rx.close();
let result = client.ready().now_or_never();
assert!(matches!(result, Some(Err(_))));
let error = shared_error_slot.try_get_error();
assert!(matches!(error, Some(_)));
let result = server_rx.try_next();
assert!(matches!(result, Ok(None)));
assert!(client.is_failed().await);
assert!(harness.current_error().is_some());
assert!(harness.try_to_receive_outbound_client_request().is_closed());
}
/// Test that clients register an error and cleanup channels correctly when the client is dropped.
#[tokio::test]
async fn client_service_drop_cleanup() {
zebra_test::init();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let (server_tx, mut server_rx) = mpsc::channel(1);
let shared_error_slot = ErrorSlot::default();
let client = Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot: shared_error_slot.clone(),
version: Version(0),
};
let (client, mut harness) = ClientTestHarness::build().finish();
std::mem::drop(client);
let error = shared_error_slot.try_get_error();
assert!(matches!(error, Some(_)));
let result = shutdown_rx.try_recv();
assert!(matches!(result, Ok(Some(CancelHeartbeatTask))));
let result = server_rx.try_next();
assert!(matches!(result, Ok(None)));
assert!(harness.current_error().is_some());
assert!(!harness.wants_connection_heartbeats());
assert!(harness.try_to_receive_outbound_client_request().is_closed());
}


@ -21,10 +21,7 @@ use std::{
};
use chrono::Utc;
use futures::{
channel::{mpsc, oneshot},
FutureExt, StreamExt,
};
use futures::{channel::mpsc, FutureExt, StreamExt};
use tokio::{net::TcpStream, task::JoinHandle};
use tower::{service_fn, Service};
use tracing::Span;
@ -36,7 +33,7 @@ use crate::{
address_book_updater::AddressBookUpdater,
constants, init,
meta_addr::MetaAddr,
peer::{self, ErrorSlot, HandshakeRequest, OutboundConnectorRequest},
peer::{self, ClientTestHarness, HandshakeRequest, OutboundConnectorRequest},
peer_set::{
initialize::{
accept_inbound_connections, add_initial_peers, crawl_and_dial, open_listener,
@ -45,7 +42,7 @@ use crate::{
set::MorePeers,
ActiveConnectionCounter, CandidateSet,
},
protocol::{external::types::Version, types::PeerServices},
protocol::types::PeerServices,
AddressBook, BoxError, Config, Request, Response,
};
@ -351,16 +348,7 @@ async fn crawler_peer_limit_one_connect_ok_then_drop() {
connection_tracker,
} = req;
let (server_tx, _server_rx) = mpsc::channel(0);
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default();
let fake_client = peer::Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot,
version: Version(1),
};
let (fake_client, _harness) = ClientTestHarness::build().finish();
// Fake the connection closing.
std::mem::drop(connection_tracker);
@ -424,16 +412,7 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() {
connection_tracker,
} = req;
let (server_tx, _server_rx) = mpsc::channel(0);
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default();
let fake_client = peer::Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot,
version: Version(1),
};
let (fake_client, _harness) = ClientTestHarness::build().finish();
// Make the connection staying open.
peer_tracker_tx
@ -544,16 +523,7 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() {
connection_tracker,
} = req;
let (server_tx, _server_rx) = mpsc::channel(0);
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default();
let fake_client = peer::Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot,
version: Version(1),
};
let (fake_client, _harness) = ClientTestHarness::build().finish();
// Fake the connection closing.
std::mem::drop(connection_tracker);
@ -619,16 +589,7 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() {
connection_tracker,
} = req;
let (server_tx, _server_rx) = mpsc::channel(0);
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default();
let fake_client = peer::Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot,
version: Version(1),
};
let (fake_client, _harness) = ClientTestHarness::build().finish();
// Make the connection staying open.
peer_tracker_tx
@ -771,16 +732,7 @@ async fn listener_peer_limit_one_handshake_ok_then_drop() {
connection_tracker,
} = req;
let (server_tx, _server_rx) = mpsc::channel(0);
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default();
let fake_client = peer::Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot,
version: Version(1),
};
let (fake_client, _harness) = ClientTestHarness::build().finish();
// Actually close the connection.
std::mem::drop(connection_tracker);
@ -848,16 +800,7 @@ async fn listener_peer_limit_one_handshake_ok_stay_open() {
connection_tracker,
} = req;
let (server_tx, _server_rx) = mpsc::channel(0);
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default();
let fake_client = peer::Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot,
version: Version(1),
};
let (fake_client, _harness) = ClientTestHarness::build().finish();
// Make the connection staying open.
peer_tracker_tx
@ -977,16 +920,7 @@ async fn listener_peer_limit_default_handshake_ok_then_drop() {
connection_tracker,
} = req;
let (server_tx, _server_rx) = mpsc::channel(0);
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default();
let fake_client = peer::Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot,
version: Version(1),
};
let (fake_client, _harness) = ClientTestHarness::build().finish();
// Actually close the connection.
std::mem::drop(connection_tracker);
@ -1054,16 +988,7 @@ async fn listener_peer_limit_default_handshake_ok_stay_open() {
connection_tracker,
} = req;
let (server_tx, _server_rx) = mpsc::channel(0);
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default();
let fake_client = peer::Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot,
version: Version(1),
};
let (fake_client, _harness) = ClientTestHarness::build().finish();
// Make the connection staying open.
peer_tracker_tx


@ -1,9 +1,6 @@
use std::{net::SocketAddr, sync::Arc};
use futures::{
channel::{mpsc, oneshot},
stream, Stream, StreamExt,
};
use futures::{channel::mpsc, stream, Stream, StreamExt};
use proptest::{collection::vec, prelude::*};
use proptest_derive::Arbitrary;
use tokio::{
@ -25,10 +22,7 @@ use zebra_chain::{
use super::MorePeers;
use crate::{
address_book::AddressMetrics,
peer::{
CancelHeartbeatTask, Client, ClientRequest, ErrorSlot, LoadTrackedClient,
MinimumPeerVersion,
},
peer::{ClientTestHarness, LoadTrackedClient, MinimumPeerVersion},
peer_set::PeerSet,
protocol::external::{types::Version, InventoryHash},
AddressBook, Config,
@ -42,50 +36,6 @@ mod prop;
/// This affects the maximum number of peer connections added to the [`PeerSet`] during the tests.
const MAX_PEERS: usize = 20;
/// A handle to a mocked [`Client`] instance.
struct MockedClientHandle {
_request_receiver: mpsc::Receiver<ClientRequest>,
shutdown_receiver: oneshot::Receiver<CancelHeartbeatTask>,
version: Version,
}
impl MockedClientHandle {
/// Create a new mocked [`Client`] instance, returning it together with a handle to track it.
pub fn new(version: Version) -> (Self, LoadTrackedClient) {
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let (request_sender, _request_receiver) = mpsc::channel(1);
let client = Client {
shutdown_tx: Some(shutdown_sender),
server_tx: request_sender,
error_slot: ErrorSlot::default(),
version,
};
let handle = MockedClientHandle {
_request_receiver,
shutdown_receiver,
version,
};
(handle, client.into())
}
/// Gets the peer protocol version associated to the [`Client`].
pub fn version(&self) -> Version {
self.version
}
/// Checks if the [`Client`] instance has not been dropped, which would have disconnected from
/// the peer.
pub fn is_connected(&mut self) -> bool {
match self.shutdown_receiver.try_recv() {
Ok(None) => true,
Ok(Some(CancelHeartbeatTask)) | Err(oneshot::Canceled) => false,
}
}
}
/// A helper type to generate arbitrary peer versions which can then become mock peer services.
#[derive(Arbitrary, Debug)]
struct PeerVersions {
@ -98,40 +48,44 @@ impl PeerVersions {
///
/// Each peer versions results in a mock peer service, which is returned as a tuple. The first
/// element is the [`LeadTrackedClient`], which is the actual service for the peer connection.
/// The second element is a [`MockedClientHandle`], which contains the open endpoints of the
/// The second element is a [`ClientTestHarness`], which contains the open endpoints of the
/// mock channels used by the peer service.
pub fn mock_peers(&self) -> (Vec<LoadTrackedClient>, Vec<MockedClientHandle>) {
///
/// The clients and the harnesses are collected into separate [`Vec`] lists and returned.
pub fn mock_peers(&self) -> (Vec<LoadTrackedClient>, Vec<ClientTestHarness>) {
let mut clients = Vec::with_capacity(self.peer_versions.len());
let mut handles = Vec::with_capacity(self.peer_versions.len());
let mut harnesses = Vec::with_capacity(self.peer_versions.len());
for peer_version in &self.peer_versions {
let (handle, client) = MockedClientHandle::new(*peer_version);
let (client, harness) = ClientTestHarness::build()
.with_version(*peer_version)
.finish();
clients.push(client);
handles.push(handle);
clients.push(client.into());
harnesses.push(harness);
}
(clients, handles)
(clients, harnesses)
}
/// Convert the arbitrary peer versions into mock peer services available through a
/// [`Discover`] compatible stream.
///
/// A tuple is returned, where the first item is a stream with the mock peers available through
/// a [`Discover`] interface, and the second is a list of handles to the mocked services.
/// a [`Discover`] interface, and the second is a list of harnesses to the mocked services.
///
/// The returned stream never finishes, so it is ready to be passed to the [`PeerSet`]
/// constructor.
///
/// See [`Self::mock_peers`] for details on how the peers are mocked and on what the handles
/// See [`Self::mock_peers`] for details on how the peers are mocked and on what the harnesses
/// contain.
pub fn mock_peer_discovery(
&self,
) -> (
impl Stream<Item = Result<Change<SocketAddr, LoadTrackedClient>, BoxError>>,
Vec<MockedClientHandle>,
Vec<ClientTestHarness>,
) {
let (clients, handles) = self.mock_peers();
let (clients, harnesses) = self.mock_peers();
let fake_ports = 1_u16..;
let discovered_peers_iterator = fake_ports.zip(clients).map(|(port, client)| {
@ -142,7 +96,7 @@ impl PeerVersions {
let discovered_peers = stream::iter(discovered_peers_iterator).chain(stream::pending());
(discovered_peers, handles)
(discovered_peers, harnesses)
}
}


@ -6,11 +6,9 @@ use tower::{discover::Discover, BoxError, ServiceExt};
use zebra_chain::{block, chain_tip::ChainTip, parameters::Network};
use super::{
BlockHeightPairAcrossNetworkUpgrades, MockedClientHandle, PeerSetBuilder, PeerVersions,
};
use super::{BlockHeightPairAcrossNetworkUpgrades, PeerSetBuilder, PeerVersions};
use crate::{
peer::{LoadTrackedClient, MinimumPeerVersion},
peer::{ClientTestHarness, LoadTrackedClient, MinimumPeerVersion},
peer_set::PeerSet,
protocol::external::types::Version,
};
@ -25,7 +23,7 @@ proptest! {
) {
let runtime = zebra_test::init_async();
let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery();
let (discovered_peers, mut harnesses) = peer_versions.mock_peer_discovery();
let (mut minimum_peer_version, best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(network);
@ -43,7 +41,7 @@ proptest! {
check_if_only_up_to_date_peers_are_live(
&mut peer_set,
&mut handles,
&mut harnesses,
current_minimum_version,
)?;
@ -59,7 +57,7 @@ proptest! {
) {
let runtime = zebra_test::init_async();
let (discovered_peers, mut handles) = peer_versions.mock_peer_discovery();
let (discovered_peers, mut harnesses) = peer_versions.mock_peer_discovery();
let (mut minimum_peer_version, best_tip_height) =
MinimumPeerVersion::with_mock_chain_tip(block_heights.network);
@ -75,7 +73,7 @@ proptest! {
check_if_only_up_to_date_peers_are_live(
&mut peer_set,
&mut handles,
&mut harnesses,
minimum_peer_version.current(),
)?;
@@ -85,7 +83,7 @@
 
         check_if_only_up_to_date_peers_are_live(
             &mut peer_set,
-            &mut handles,
+            &mut harnesses,
             minimum_peer_version.current(),
         )?;
@@ -97,10 +95,10 @@
 /// Check if only peers with up-to-date protocol versions are live.
 ///
 /// This will poll the `peer_set` to allow it to drop outdated peers, and then check the peer
-/// `handles` to assert that only up-to-date peers are kept by the `peer_set`.
+/// `harnesses` to assert that only up-to-date peers are kept by the `peer_set`.
 fn check_if_only_up_to_date_peers_are_live<D, C>(
     peer_set: &mut PeerSet<D, C>,
-    handles: &mut Vec<MockedClientHandle>,
+    harnesses: &mut Vec<ClientTestHarness>,
     minimum_version: Version,
 ) -> Result<(), TestCaseError>
 where
@@ -110,9 +108,9 @@ where
 {
     // Force `poll_discover` to be called to process all discovered peers.
     let poll_result = peer_set.ready().now_or_never();
 
-    let all_peers_are_outdated = handles
+    let all_peers_are_outdated = harnesses
         .iter()
-        .all(|handle| handle.version() < minimum_version);
+        .all(|harness| harness.version() < minimum_version);
 
     if all_peers_are_outdated {
         prop_assert!(matches!(poll_result, None));
@@ -120,9 +118,9 @@ where
         prop_assert!(matches!(poll_result, Some(Ok(_))));
     }
-    for handle in handles {
-        let is_outdated = handle.version() < minimum_version;
-        let is_connected = handle.is_connected();
+    for harness in harnesses {
+        let is_outdated = harness.version() < minimum_version;
+        let is_connected = harness.wants_connection_heartbeats();
 
         prop_assert!(
             is_connected != is_outdated,

@@ -24,6 +24,7 @@ pub mod mock_service;
 pub mod net;
 pub mod network_addr;
 pub mod prelude;
+pub mod service_extensions;
 pub mod transcript;
 pub mod vectors;
 pub mod zip0143;


@@ -0,0 +1,48 @@
+//! Extension traits for [`Service`] types to help with testing.
+
+use std::task::Poll;
+
+use futures::future::{BoxFuture, FutureExt};
+use tower::{Service, ServiceExt};
+
+/// An extension trait to check if a [`Service`] is immediately ready to be called.
+pub trait IsReady<Request>: Service<Request> {
+    /// Poll the [`Service`] once, and return true if it is immediately ready to be called.
+    fn is_ready(&mut self) -> BoxFuture<bool>;
+
+    /// Poll the [`Service`] once, and return true if it is pending.
+    fn is_pending(&mut self) -> BoxFuture<bool>;
+
+    /// Poll the [`Service`] once, and return true if it has failed.
+    fn is_failed(&mut self) -> BoxFuture<bool>;
+}
+
+impl<S, Request> IsReady<Request> for S
+where
+    S: Service<Request> + Send,
+    Request: 'static,
+{
+    fn is_ready(&mut self) -> BoxFuture<bool> {
+        async move {
+            let ready_result = futures::poll!(self.ready());
+            matches!(ready_result, Poll::Ready(Ok(_)))
+        }
+        .boxed()
+    }
+
+    fn is_pending(&mut self) -> BoxFuture<bool> {
+        async move {
+            let ready_result = futures::poll!(self.ready());
+            ready_result.is_pending()
+        }
+        .boxed()
+    }
+
+    fn is_failed(&mut self) -> BoxFuture<bool> {
+        async move {
+            let ready_result = futures::poll!(self.ready());
+            matches!(ready_result, Poll::Ready(Err(_)))
+        }
+        .boxed()
+    }
+}
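The three helpers partition a single `poll_ready` outcome: it is either ready, pending, or failed, and exactly one method returns true. A std-only sketch of that three-way split (the `classify` function is a hypothetical illustration mirroring the `matches!` checks, not part of the trait):

```rust
use std::task::Poll;

/// Label a single poll outcome the way `IsReady`'s helpers do:
/// exactly one of "ready", "failed", or "pending" applies.
fn classify(outcome: Poll<Result<(), &'static str>>) -> &'static str {
    match outcome {
        Poll::Ready(Ok(())) => "ready",
        Poll::Ready(Err(_)) => "failed",
        Poll::Pending => "pending",
    }
}

fn main() {
    assert_eq!(classify(Poll::Ready(Ok(()))), "ready");
    assert_eq!(classify(Poll::Ready(Err("overloaded"))), "failed");
    assert_eq!(classify(Poll::Pending), "pending");
    println!("ok");
}
```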


@@ -1,5 +0,0 @@
-//! Extensions used in [`Future`]s and async code.
-
-mod now_or_later;
-
-pub use self::now_or_later::NowOrLater;


@@ -1,63 +0,0 @@
-use std::{
-    future::Future,
-    pin::Pin,
-    task::{Context, Poll},
-};
-
-use pin_project::pin_project;
-
-/// A helper [`Future`] wrapper that will always return [`Poll::Ready`].
-///
-/// If the inner [`Future`] `F` is ready and produces an output `value`, then [`NowOrLater`] will
-/// also be ready but with an output `Some(value)`.
-///
-/// If the inner [`Future`] `F` is not ready, then:
-///
-/// - [`NowOrLater`] will still be ready but with an output `None`,
-/// - and the task associated with the future will be scheduled to awake whenever the inner
-///   [`Future`] `F` becomes ready.
-///
-/// This is different from [`FutureExt::now_or_never`] because `now_or_never` uses a fake task
-/// [`Context`], which means that calling `now_or_never` inside an `async` function doesn't
-/// schedule the generated future to be polled again when the inner future becomes ready.
-///
-/// # Examples
-///
-/// ```
-/// use futures::{FutureExt, future};
-/// # use zebrad::async_ext::NowOrLater;
-///
-/// let inner_future = future::ready(());
-///
-/// # let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
-/// #
-/// # runtime.block_on(async move {
-/// assert_eq!(NowOrLater(inner_future).await, Some(()));
-/// # });
-/// ```
-///
-/// ```
-/// use futures::{FutureExt, future};
-/// # use zebrad::async_ext::NowOrLater;
-///
-/// let inner_future = future::pending::<()>();
-///
-/// # let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
-/// #
-/// # runtime.block_on(async move {
-/// assert_eq!(NowOrLater(inner_future).await, None);
-/// # });
-/// ```
-#[pin_project]
-pub struct NowOrLater<F>(#[pin] pub F);
-
-impl<F: Future> Future for NowOrLater<F> {
-    type Output = Option<F::Output>;
-
-    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
-        match self.project().0.poll(context) {
-            Poll::Ready(value) => Poll::Ready(Some(value)),
-            Poll::Pending => Poll::Ready(None),
-        }
-    }
-}
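The core mapping `NowOrLater` implemented, poll the inner future once, turning `Ready(value)` into `Some(value)` and `Pending` into `None`, can be sketched with only the standard library. Note the caveat: this sketch polls with a noop waker, so like `now_or_never` it does not schedule a real wakeup; `NowOrLater` differed precisely by passing through the caller's task context. The `poll_once` helper and noop waker below are illustrative stand-ins, not code from the PR:

```rust
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
};

/// A waker that does nothing; a std-only stand-in for `futures::task::noop_waker`.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);

    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Poll `future` exactly once: `Some(value)` if it was ready, `None` otherwise.
fn poll_once<F: Future + Unpin>(future: &mut F) -> Option<F::Output> {
    let waker = noop_waker();
    let mut context = Context::from_waker(&waker);

    match Pin::new(future).poll(&mut context) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

fn main() {
    // A ready future yields its output wrapped in `Some`.
    let mut ready = std::future::ready(42);
    assert_eq!(poll_once(&mut ready), Some(42));

    // A pending future yields `None` instead of blocking.
    let mut pending = std::future::pending::<()>();
    assert_eq!(poll_once(&mut pending), None);
    println!("ok");
}
```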


@@ -2,7 +2,7 @@
 //!
 //! It is used when Zebra is a long way behind the current chain tip.
 
-use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
+use std::{collections::HashSet, pin::Pin, sync::Arc, task::Poll, time::Duration};
 
 use color_eyre::eyre::{eyre, Report};
 use futures::stream::{FuturesUnordered, StreamExt};
@@ -24,8 +24,7 @@ use zebra_state as zs;
 use zs::LatestChainTip;
 
 use crate::{
-    async_ext::NowOrLater, components::sync::downloads::BlockDownloadVerifyError,
-    config::ZebradConfig, BoxError,
+    components::sync::downloads::BlockDownloadVerifyError, config::ZebradConfig, BoxError,
 };
 
 mod downloads;
@@ -345,7 +344,7 @@ where
         while !self.prospective_tips.is_empty() {
             // Check whether any block tasks are currently ready:
-            while let Some(Some(rsp)) = NowOrLater(self.downloads.next()).await {
+            while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
                 match rsp {
                     Ok(hash) => {
                         tracing::trace!(?hash, "verified and committed block to state");
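The replacement loop drains every download result that is already ready without waiting for the rest, then falls through to the slower sync logic. The same drain-only-what's-ready shape can be sketched with a std channel, where `try_recv` plays the role of `futures::poll!` on the stream (this `drain_ready` helper is an illustration, not code from the PR):

```rust
use std::sync::mpsc;

/// Drain everything already available without blocking, analogous to
/// `while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next())`.
fn drain_ready(receiver: &mpsc::Receiver<u32>) -> Vec<u32> {
    let mut ready = Vec::new();

    // `try_recv` returns immediately, so the loop stops at the first
    // not-yet-ready result instead of waiting for it.
    while let Ok(value) = receiver.try_recv() {
        ready.push(value);
    }

    ready
}

fn main() {
    let (sender, receiver) = mpsc::channel();
    for value in [1, 2, 3] {
        sender.send(value).expect("receiver is alive");
    }

    // Only the three already-sent values are drained; the call never blocks.
    assert_eq!(drain_ready(&receiver), vec![1, 2, 3]);
    assert_eq!(drain_ready(&receiver), Vec::<u32>::new());
    println!("ok");
}
```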


@@ -37,7 +37,6 @@ extern crate tracing;
 pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
 
 pub mod application;
-pub mod async_ext;
 pub mod commands;
 pub mod components;
 pub mod config;