Create a helper `MockService` type to help with writing tests that use mock `tower::Service`s (#2748)

* Implement initial service mocking helpers

Adds a [`MockService`] type, which can be configured and built for usage
in unit tests or proptests. The mocked service can then be used to
intercept requests and respond indivdiually to them.

* Use `MockService in the `mempool::Crawler` test

Refactor it to remove the helper mock function, and use the new
`MockService` helper type.

* Use `MockService` in `CandidateSet` test vectors

Refactor to remove the manual mocking of the peer set service.

* Panic if a response is not sent by `MockService`

Change the current semantics to require all `MockService` usages to
respond to every intercepted request.

A `must_use` attribute was added to the `ResponseSender` so that the
compiler can warn when this doesn't happen.

* Allow generic error types in `MockService`

Replace the hard-coded `BoxError` as the `Service`'s error type with a
generic type parameter. This allows mocking services in locations that
require specific error types.

* Add a `ResponseSender::request` getter

Allow inspecting the request again before responding, and using
information from the request in the response.

Co-authored-by: Conrado Gouvea <conrado@zfnd.org>
This commit is contained in:
Janito Vaqueiro Ferreira Filho 2021-09-21 14:44:59 -03:00 committed by GitHub
parent 061ad55144
commit b714b2b3b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 798 additions and 83 deletions

View File

@ -1,7 +1,5 @@
use std::{
collections::VecDeque,
convert::TryInto,
iter,
net::{IpAddr, SocketAddr},
str::FromStr,
sync::Arc,
@ -9,16 +7,14 @@ use std::{
};
use chrono::{DateTime, Duration, Utc};
use futures::future;
use tokio::{
runtime::Runtime,
sync::watch,
time::{self, Instant},
};
use tower::Service;
use tracing::Span;
use zebra_chain::serialization::DateTime32;
use zebra_test::mock_service::{MockService, PanicAssertion};
use super::super::{validate_addrs, CandidateSet};
use crate::{
@ -146,9 +142,11 @@ fn candidate_set_updates_are_rate_limited() {
let _guard = runtime.enter();
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let (peer_service, call_count) = mock_peer_service();
let mut candidate_set =
CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);
let mut peer_service = MockService::build().for_unit_tests();
let mut candidate_set = CandidateSet::new(
Arc::new(std::sync::Mutex::new(address_book)),
peer_service.clone(),
);
runtime.block_on(async move {
time::pause();
@ -156,6 +154,7 @@ fn candidate_set_updates_are_rate_limited() {
let time_limit = Instant::now()
+ INTERVALS_TO_RUN * MIN_PEER_GET_ADDR_INTERVAL
+ StdDuration::from_secs(1);
let mut next_allowed_request_time = Instant::now();
while Instant::now() <= time_limit {
candidate_set
@ -163,13 +162,16 @@ fn candidate_set_updates_are_rate_limited() {
.await
.expect("Call to CandidateSet::update should not fail");
if Instant::now() >= next_allowed_request_time {
verify_fanned_out_requests(&mut peer_service).await;
next_allowed_request_time = Instant::now() + MIN_PEER_GET_ADDR_INTERVAL;
} else {
peer_service.expect_no_requests().await;
}
time::advance(MIN_PEER_GET_ADDR_INTERVAL / POLL_FREQUENCY_FACTOR).await;
}
assert_eq!(
*call_count.borrow(),
INTERVALS_TO_RUN as usize * GET_ADDR_FANOUT
);
});
}
@ -181,9 +183,11 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
let _guard = runtime.enter();
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let (peer_service, call_count) = mock_peer_service();
let mut candidate_set =
CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);
let mut peer_service = MockService::build().for_unit_tests();
let mut candidate_set = CandidateSet::new(
Arc::new(std::sync::Mutex::new(address_book)),
peer_service.clone(),
);
runtime.block_on(async move {
time::pause();
@ -194,7 +198,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
.await
.expect("Call to CandidateSet::update should not fail");
assert_eq!(*call_count.borrow(), GET_ADDR_FANOUT);
verify_fanned_out_requests(&mut peer_service).await;
// The following two calls to `update` should be skipped
candidate_set
@ -207,7 +211,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
.await
.expect("Call to CandidateSet::update should not fail");
assert_eq!(*call_count.borrow(), GET_ADDR_FANOUT);
peer_service.expect_no_requests().await;
// After waiting for at least the minimum interval the call to `update` should succeed
time::advance(MIN_PEER_GET_ADDR_INTERVAL).await;
@ -216,7 +220,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
.await
.expect("Call to CandidateSet::update should not fail");
assert_eq!(*call_count.borrow(), 2 * GET_ADDR_FANOUT);
verify_fanned_out_requests(&mut peer_service).await;
});
}
@ -243,49 +247,21 @@ fn mock_gossiped_peers(last_seen_times: impl IntoIterator<Item = DateTime<Utc>>)
.collect()
}
/// Create a mock `PeerSet` service that checks that requests to it are rate limited.
/// Verify that a batch of fanned out requests are sent by the candidate set.
///
/// The function also returns a call count watcher, that can be used for checking how many times the
/// service was called.
fn mock_peer_service<E>() -> (
impl Service<
Request,
Response = Response,
Future = future::Ready<Result<Response, E>>,
Error = E,
> + 'static,
watch::Receiver<usize>,
/// # Panics
///
/// This will panic (causing the test to fail) if more or less requests are received than the
/// expected [`GET_ADDR_FANOUT`] amount.
async fn verify_fanned_out_requests(
peer_service: &mut MockService<Request, Response, PanicAssertion>,
) {
let rate_limit_interval = MIN_PEER_GET_ADDR_INTERVAL;
for _ in 0..GET_ADDR_FANOUT {
peer_service
.expect_request_that(|request| matches!(request, Request::Peers))
.await
.respond(Response::Peers(vec![]));
}
let mut call_counter = 0;
let (call_count_sender, call_count_receiver) = watch::channel(call_counter);
let mut peer_request_tracker: VecDeque<_> =
iter::repeat(Instant::now()).take(GET_ADDR_FANOUT).collect();
let service = tower::service_fn(move |request| {
match request {
Request::Peers => {
// Get time from queue that the request is authorized to be sent
let authorized_request_time = peer_request_tracker
.pop_front()
.expect("peer_request_tracker should always have GET_ADDR_FANOUT elements");
// Check that the request was rate limited
assert!(Instant::now() >= authorized_request_time);
// Push a new authorization, updated by the rate limit interval
peer_request_tracker.push_back(Instant::now() + rate_limit_interval);
// Increment count of calls
call_counter += 1;
let _ = call_count_sender.send(call_counter);
// Return an empty list of peer addresses
future::ok(Response::Peers(vec![]))
}
_ => unreachable!("Received an unexpected internal message: {:?}", request),
}
});
(service, call_count_receiver)
peer_service.expect_no_requests().await;
}

View File

@ -20,6 +20,7 @@ use std::sync::Once;
#[allow(missing_docs)]
pub mod command;
pub mod mock_service;
pub mod net;
pub mod prelude;
pub mod transcript;

View File

@ -0,0 +1,743 @@
//! Some helpers to make it simpler to mock Tower services.
//!
//! A [`MockService`] is a generic [`tower::Service`] implementation that allows intercepting
//! requests, responding to them individually, and checking that there are no requests to be
//! received (at least during a period of time). The [`MockService`] can be built for proptests or
//! for normal Rust unit tests.
//!
//! # Example
//!
//! ```
//! use zebra_test::mock_service::MockService;
//! # use tower::ServiceExt;
//!
//! # let reactor = tokio::runtime::Builder::new_current_thread()
//! # .enable_all()
//! # .build()
//! # .expect("Failed to build Tokio runtime");
//! #
//! # reactor.block_on(async {
//! let mut mock_service = MockService::build().for_unit_tests();
//! let mut service = mock_service.clone();
//! #
//! # // Add types to satisfy the compiler's type inference for the `Error` type.
//! # let _typed_mock_service: MockService<_, _, _> = mock_service.clone();
//!
//! let call = tokio::spawn(mock_service.clone().oneshot("hello"));
//!
//! mock_service
//! .expect_request("hello").await
//! .respond("hi!");
//!
//! mock_service.expect_no_requests().await;
//!
//! let response = call
//! .await
//! .expect("Failed to run call on the background")
//! .expect("Failed to receive response from service");
//!
//! assert_eq!(response, "hi!");
//! # });
//! ```
use std::{
fmt::Debug,
marker::PhantomData,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use futures::{future::BoxFuture, FutureExt};
use proptest::prelude::*;
use tokio::{
sync::{
broadcast::{self, error::RecvError},
oneshot, Mutex,
},
time::timeout,
};
use tower::{BoxError, Service};
/// The default size of the channel that forwards received requests.
///
/// If requests are received faster than the test code can consume them, some requests may be
/// ignored.
///
/// This value can be configured in the [`MockService`] using
/// [`MockServiceBuilder::with_proxy_channel_size`].
const DEFAULT_PROXY_CHANNEL_SIZE: usize = 100;
/// The default timeout before considering a request has not been received.
///
/// This is the time that the mocked service waits before considering a request will not be
/// received. It can be configured in the [`MockService`] using
/// [`MockServiceBuilder::with_max_request_delay`].
///
/// Note that if a test checks that no requests are received, each check has to wait for this
/// amount of time, so this may affect the test execution time.
const DEFAULT_MAX_REQUEST_DELAY: Duration = Duration::from_millis(25);
/// An internal type representing the item that's sent in the [`broadcast`] channel.
///
/// The actual type that matters is the [`ResponseSender`] but since there could be more than one
/// [`MockService`] verifying requests, the type must be wrapped so that it can be shared by all
/// receivers:
///
/// - The [`Arc`] makes sure the instance is on the heap, and can be shared properly between
/// threads and dropped when no longer needed.
/// - The [`Mutex`] ensures only one [`MockService`] instance can reply to the received request.
/// - The [`Option`] forces the [`MockService`] that handles the request to take ownership of it
/// because sending a response also forces the [`ResponseSender`] to be dropped.
type ProxyItem<Request, Response, Error> =
Arc<Mutex<Option<ResponseSender<Request, Response, Error>>>>;
/// A service implementation that allows intercepting requests for checking them.
///
/// The type is generic over the request and response types, and also has an extra generic type
/// parameter that's used as a tag to determine if the internal assertions should panic or return
/// errors for proptest minimization. See [`AssertionType`] for more information.
///
/// The mock service can be cloned, and provides methods for checking the received requests as well
/// as responding to them individually.
///
/// Internally, the instance that's operating as the service will forward requests to a
/// [`broadcast`] channel that the other instances listen to.
///
/// See the [module-level documentation][`super::mock_service`] for an example.
pub struct MockService<Request, Response, Assertion, Error = BoxError> {
receiver: broadcast::Receiver<ProxyItem<Request, Response, Error>>,
sender: broadcast::Sender<ProxyItem<Request, Response, Error>>,
max_request_delay: Duration,
_assertion_type: PhantomData<Assertion>,
}
/// A builder type to create a [`MockService`].
///
/// Allows changing specific parameters used by the [`MockService`], if necessary. The default
/// parameters should be reasonable for most cases.
#[derive(Default)]
pub struct MockServiceBuilder {
proxy_channel_size: Option<usize>,
max_request_delay: Option<Duration>,
}
/// A helper type for responding to incoming requests.
///
/// An instance of this type is created for each request received by the [`MockService`]. It
/// contains the received request and a [`oneshot::Sender`] that can be used to respond to the
/// request.
///
/// If a response is not sent, the channel is closed and a [`BoxError`] is returned by the service
/// to the caller that sent the request.
#[must_use = "Tests may fail if a response is not sent back to the caller"]
pub struct ResponseSender<Request, Response, Error> {
request: Request,
response_sender: oneshot::Sender<Result<Response, Error>>,
}
/// The [`tower::Service`] implementation of the [`MockService`].
///
/// The [`MockService`] is always ready, and it intercepts the requests wrapping them in a
/// [`ResponseSender`] which can be used to send a response.
impl<Request, Response, Assertion, Error> Service<Request>
for MockService<Request, Response, Assertion, Error>
where
Response: Send + 'static,
Error: Send + 'static,
{
type Response = Response;
type Error = Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _context: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, request: Request) -> Self::Future {
let (response_sender, response_receiver) = ResponseSender::new(request);
let proxy_item = Arc::new(Mutex::new(Some(response_sender)));
let _ = self.sender.send(proxy_item);
response_receiver
.map(|response| {
response.expect("A response was not sent by the `MockService` for a request")
})
.boxed()
}
}
/// An entry point for starting the [`MockServiceBuilder`].
///
/// This `impl` block exists for ergonomic reasons. The generic type paramaters don't matter,
/// because they are actually set by [`MockServiceBuilder::finish`].
impl MockService<(), (), ()> {
/// Create a [`MockServiceBuilder`] to help with the creation of a [`MockService`].
pub fn build() -> MockServiceBuilder {
MockServiceBuilder::default()
}
}
impl MockServiceBuilder {
/// Configure the size of the proxy channel used for sending intercepted requests.
///
/// This determines the maximum amount of requests that are kept in queue before the oldest
/// request is dropped. This means that any tests that receive too many requests might ignore
/// some requests if this parameter isn't properly configured.
///
/// The default value of 100 should be enough for most cases.
///
/// # Example
///
/// ```
/// # use zebra_test::mock_service::MockService;
/// #
/// let mock_service = MockService::build()
/// .with_proxy_channel_size(100)
/// .for_prop_tests();
/// #
/// # // Add types to satisfy the compiler's type inference.
/// # let typed_mock_service: MockService<(), (), _> = mock_service;
/// ```
pub fn with_proxy_channel_size(mut self, size: usize) -> Self {
self.proxy_channel_size = Some(size);
self
}
/// Configure the time to wait for a request before considering no requests will be received.
///
/// This determines the maximum amount of time that the [`MockService`] will wait for a request
/// to be received before considering that a request will not be received.
///
/// The default value of 25 ms should be enough for most cases.
///
/// # Example
///
/// ```
/// # use std::time::Duration;
/// #
/// # use zebra_test::mock_service::MockService;
/// #
/// let mock_service = MockService::build()
/// .with_max_request_delay(Duration::from_millis(25))
/// .for_unit_tests();
/// #
/// # // Add types to satisfy the compiler's type inference.
/// # let typed_mock_service: MockService<(), (), _> = mock_service;
/// ```
pub fn with_max_request_delay(mut self, max_request_delay: Duration) -> Self {
self.max_request_delay = Some(max_request_delay);
self
}
/// Create a [`MockService`] to be used in `proptest`s.
///
/// The assertions performed by [`MockService`] use the macros provided by [`proptest`], like
/// [`prop_assert`].
pub fn for_prop_tests<Request, Response, Error>(
self,
) -> MockService<Request, Response, PropTestAssertion, Error> {
self.finish()
}
/// Create a [`MockService`] to be used in Rust unit tests.
///
/// The assertions performed by [`MockService`] use the macros provided by default in Rust,
/// like [`assert`].
pub fn for_unit_tests<Request, Response, Error>(
self,
) -> MockService<Request, Response, PanicAssertion, Error> {
self.finish()
}
/// An internal helper method to create the actual [`MockService`].
///
/// Note that this is used by both [`Self::for_prop_tests`] and [`Self::for_unit_tests`], the
/// only difference being the `Assertion` generic type parameter, which Rust infers
/// automatically.
fn finish<Request, Response, Assertion, Error>(
self,
) -> MockService<Request, Response, Assertion, Error> {
let proxy_channel_size = self
.proxy_channel_size
.unwrap_or(DEFAULT_PROXY_CHANNEL_SIZE);
let (sender, receiver) = broadcast::channel(proxy_channel_size);
MockService {
receiver,
sender,
max_request_delay: self.max_request_delay.unwrap_or(DEFAULT_MAX_REQUEST_DELAY),
_assertion_type: PhantomData,
}
}
}
/// Implementation of [`MockService`] methods that use standard Rust panicking assertions.
impl<Request, Response, Error> MockService<Request, Response, PanicAssertion, Error> {
/// Expect a specific request to be received.
///
/// The expected request should be the next one in the internal queue, or if the queue is
/// empty, it should be received in at most the max delay time configured by
/// [`MockServiceBuilder::with_max_request_delay`].
///
/// If the received request matches the expected request, a [`ResponseSender`] is returned
/// which can be used to inspect the request and respond to it. If no response is sent, the
/// sender of the requests receives an error.
///
/// # Panics
///
/// If no request is received or if a request is received that's not equal to the expected
/// request, this method panics.
///
/// # Example
///
/// ```
/// # use zebra_test::mock_service::MockService;
/// # use tower::ServiceExt;
/// #
/// # let reactor = tokio::runtime::Builder::new_current_thread()
/// # .enable_all()
/// # .build()
/// # .expect("Failed to build Tokio runtime");
/// #
/// # reactor.block_on(async {
/// # let mut mock_service: MockService<_, _, _> = MockService::build().for_unit_tests();
/// # let mut service = mock_service.clone();
/// #
/// let call = tokio::spawn(mock_service.clone().oneshot("request"));
///
/// mock_service.expect_request("request").await.respond("response");
///
/// assert!(matches!(call.await, Ok(Ok("response"))));
/// # });
/// ```
pub async fn expect_request(
&mut self,
expected: Request,
) -> ResponseSender<Request, Response, Error>
where
Request: PartialEq + Debug,
{
let response_sender = self.next_request().await;
assert_eq!(response_sender.request, expected);
response_sender
}
/// Expect a request to be received that matches a specified condition.
///
/// There should be a request already in the internal queue, or a request should be received in
/// at most the max delay time configured by [`MockServiceBuilder::with_max_request_delay`].
///
/// The received request is passed to the `condition` function, which should return `true` if
/// it matches the expected condition or `false` otherwise. If `true` is returned, a
/// [`ResponseSender`] is returned which can be used to inspect the request again and respond
/// to it. If no response is sent, the sender of the requests receives an error.
///
/// # Panics
///
/// If the `condition` function returns `false`, this method panics.
///
/// # Example
///
/// ```
/// # use zebra_test::mock_service::MockService;
/// # use tower::ServiceExt;
/// #
/// # let reactor = tokio::runtime::Builder::new_current_thread()
/// # .enable_all()
/// # .build()
/// # .expect("Failed to build Tokio runtime");
/// #
/// # reactor.block_on(async {
/// # let mut mock_service: MockService<_, _, _> = MockService::build().for_unit_tests();
/// # let mut service = mock_service.clone();
/// #
/// let call = tokio::spawn(mock_service.clone().oneshot(1));
///
/// mock_service.expect_request_that(|request| *request > 0).await.respond("response");
///
/// assert!(matches!(call.await, Ok(Ok("response"))));
/// # });
/// ```
pub async fn expect_request_that(
&mut self,
condition: impl FnOnce(&Request) -> bool,
) -> ResponseSender<Request, Response, Error> {
let response_sender = self.next_request().await;
assert!(condition(&response_sender.request));
response_sender
}
/// Expect no requests to be received.
///
/// The internal queue of received requests should be empty, and no new requests should arrive
/// for the max delay time configured by [`MockServiceBuilder::with_max_request_delay`].
///
/// # Panics
///
/// If the queue is not empty or if a request is received before the max request delay timeout
/// expires.
///
/// # Example
///
/// ```
/// # use zebra_test::mock_service::MockService;
/// # use tower::ServiceExt;
/// #
/// # let reactor = tokio::runtime::Builder::new_current_thread()
/// # .enable_all()
/// # .build()
/// # .expect("Failed to build Tokio runtime");
/// #
/// # reactor.block_on(async {
/// # let mut mock_service: MockService<(), (), _> = MockService::build().for_unit_tests();
/// #
/// mock_service.expect_no_requests().await;
/// # });
/// ```
pub async fn expect_no_requests(&mut self)
where
Request: Debug,
{
if let Some(response_sender) = self.try_next_request().await {
panic!(
"Received an unexpected request: {:?}",
response_sender.request
);
}
}
/// A helper method to get the next request from the queue.
///
/// Returns the next request in the internal queue or waits at most the max delay time
/// configured by [`MockServiceBuilder::with_max_request_delay`] for a new request to be
/// received, and then returns that.
///
/// # Panics
///
/// If the queue is empty and a request is not received before the max request delay timeout
/// expires.
async fn next_request(&mut self) -> ResponseSender<Request, Response, Error> {
match self.try_next_request().await {
Some(request) => request,
None => panic!("Timeout while waiting for a request"),
}
}
}
/// Implementation of [`MockService`] methods that use [`proptest`] assertions.
impl<Request, Response, Error> MockService<Request, Response, PropTestAssertion, Error> {
/// Expect a specific request to be received.
///
/// The expected request should be the next one in the internal queue, or if the queue is
/// empty, it should be received in at most the max delay time configured by
/// [`MockServiceBuilder::with_max_request_delay`].
///
/// If the received request matches the expected request, a [`ResponseSender`] is returned
/// which can be used to inspect the request and respond to it. If no response is sent, the
/// sender of the requests receives an error.
///
/// If no request is received or if a request is received that's not equal to the expected
/// request, this method returns an error generated by a [`proptest`] assertion.
///
/// # Example
///
/// ```
/// # use proptest::prelude::*;
/// # use tower::ServiceExt;
/// #
/// # use zebra_test::mock_service::MockService;
/// #
/// # let reactor = tokio::runtime::Builder::new_current_thread()
/// # .enable_all()
/// # .build()
/// # .expect("Failed to build Tokio runtime");
/// #
/// # reactor.block_on(async {
/// # let test_code = || async {
/// # let mut mock_service: MockService<_, _, _> =
/// # MockService::build().for_prop_tests();
/// # let mut service = mock_service.clone();
/// #
/// let call = tokio::spawn(mock_service.clone().oneshot("request"));
///
/// // NOTE: The try operator `?` is required for errors to be handled by proptest.
/// mock_service
/// .expect_request("request").await?
/// .respond("response");
///
/// prop_assert!(matches!(call.await, Ok(Ok("response"))));
/// #
/// # Ok::<(), TestCaseError>(())
/// # };
/// # test_code().await
/// # }).unwrap();
/// ```
pub async fn expect_request(
&mut self,
expected: Request,
) -> Result<ResponseSender<Request, Response, Error>, TestCaseError>
where
Request: PartialEq + Debug,
{
let response_sender = self.next_request().await?;
prop_assert_eq!(&response_sender.request, &expected);
Ok(response_sender)
}
/// Expect a request to be received that matches a specified condition.
///
/// There should be a request already in the internal queue, or a request should be received in
/// at most the max delay time configured by [`MockServiceBuilder::with_max_request_delay`].
///
/// The received request is passed to the `condition` function, which should return `true` if
/// it matches the expected condition or `false` otherwise. If `true` is returned, a
/// [`ResponseSender`] is returned which can be used to inspect the request again and respond
/// to it. If no response is sent, the sender of the requests receives an error.
///
/// If the `condition` function returns `false`, this method returns an error generated by a
/// [`proptest`] assertion.
///
/// # Example
///
/// ```
/// # use proptest::prelude::*;
/// # use tower::ServiceExt;
/// #
/// # use zebra_test::mock_service::MockService;
/// #
/// # let reactor = tokio::runtime::Builder::new_current_thread()
/// # .enable_all()
/// # .build()
/// # .expect("Failed to build Tokio runtime");
/// #
/// # reactor.block_on(async {
/// # let test_code = || async {
/// # let mut mock_service: MockService<_, _, _> =
/// # MockService::build().for_prop_tests();
/// # let mut service = mock_service.clone();
/// #
/// let call = tokio::spawn(mock_service.clone().oneshot(1));
///
/// // NOTE: The try operator `?` is required for errors to be handled by proptest.
/// mock_service
/// .expect_request_that(|request| *request > 0).await?
/// .respond("OK");
///
/// prop_assert!(matches!(call.await, Ok(Ok("OK"))));
/// #
/// # Ok::<(), TestCaseError>(())
/// # };
/// # test_code().await
/// # }).unwrap();
/// ```
pub async fn expect_request_that(
&mut self,
condition: impl FnOnce(&Request) -> bool,
) -> Result<ResponseSender<Request, Response, Error>, TestCaseError> {
let response_sender = self.next_request().await?;
prop_assert!(condition(&response_sender.request));
Ok(response_sender)
}
/// Expect no requests to be received.
///
/// The internal queue of received requests should be empty, and no new requests should arrive
/// for the max delay time configured by [`MockServiceBuilder::with_max_request_delay`].
///
/// If the queue is not empty or if a request is received before the max request delay timeout
/// expires, an error generated by a [`proptest`] assertion is returned.
///
/// # Example
///
/// ```
/// # use proptest::prelude::TestCaseError;
/// # use tower::ServiceExt;
/// #
/// # use zebra_test::mock_service::MockService;
/// #
/// # let reactor = tokio::runtime::Builder::new_current_thread()
/// # .enable_all()
/// # .build()
/// # .expect("Failed to build Tokio runtime");
/// #
/// # reactor.block_on(async {
/// # let test_code = || async {
/// # let mut mock_service: MockService<(), (), _> =
/// # MockService::build().for_prop_tests();
/// #
/// // NOTE: The try operator `?` is required for errors to be handled by proptest.
/// mock_service.expect_no_requests().await?;
/// #
/// # Ok::<(), TestCaseError>(())
/// # };
/// # test_code().await
/// # }).unwrap();
/// ```
pub async fn expect_no_requests(&mut self) -> Result<(), TestCaseError>
where
Request: Debug,
{
match self.try_next_request().await {
Some(response_sender) => {
prop_assert!(
false,
"Received an unexpected request: {:?}",
response_sender.request
);
unreachable!("prop_assert!(false) returns an early error");
}
None => Ok(()),
}
}
/// A helper method to get the next request from the queue.
///
/// Returns the next request in the internal queue or waits at most the max delay time
/// configured by [`MockServiceBuilder::with_max_request_delay`] for a new request to be
/// received, and then returns that.
///
/// If the queue is empty and a request is not received before the max request delay timeout
/// expires, an error generated by a [`proptest`] assertion is returned.
async fn next_request(
&mut self,
) -> Result<ResponseSender<Request, Response, Error>, TestCaseError> {
match self.try_next_request().await {
Some(request) => Ok(request),
None => {
prop_assert!(false, "Timeout while waiting for a request");
unreachable!("prop_assert!(false) returns an early error");
}
}
}
}
/// Code that is independent of the assertions used in [`MockService`].
impl<Request, Response, Assertion, Error> MockService<Request, Response, Assertion, Error> {
/// Try to get the next request received.
///
/// Returns the next element in the queue. If the queue is empty, waits at most the max request
/// delay configured by [`MockServiceBuilder::with_max_request_delay`] for a request, and
/// returns it.
///
/// If no request is received, returns `None`.
///
/// If too many requests are received and the queue fills up, the oldest requests are dropped
/// and ignored. This means that calling this may not receive the next request if the queue is
/// not dimensioned properly with the [`MockServiceBuilder::with_proxy_channel_size`] method.
async fn try_next_request(&mut self) -> Option<ResponseSender<Request, Response, Error>> {
loop {
match timeout(self.max_request_delay, self.receiver.recv()).await {
Ok(Ok(item)) => {
if let Some(proxy_item) = item.lock().await.take() {
return Some(proxy_item);
}
}
Ok(Err(RecvError::Lagged(_))) => continue,
Ok(Err(RecvError::Closed)) => unreachable!("Sender is never closed"),
Err(_timeout) => return None,
}
}
}
}
impl<Request, Response, Assertion, Error> Clone
for MockService<Request, Response, Assertion, Error>
{
/// Clones the [`MockService`].
///
/// This is a cheap operation, because it simply clones the [`broadcast`] channel endpoints.
fn clone(&self) -> Self {
MockService {
receiver: self.sender.subscribe(),
sender: self.sender.clone(),
max_request_delay: self.max_request_delay,
_assertion_type: PhantomData,
}
}
}
impl<Request, Response, Error> ResponseSender<Request, Response, Error> {
/// Create a [`ResponseSender`] for a given `request`.
fn new(request: Request) -> (Self, oneshot::Receiver<Result<Response, Error>>) {
let (response_sender, response_receiver) = oneshot::channel();
(
ResponseSender {
request,
response_sender,
},
response_receiver,
)
}
/// Access the `request` that's awaiting a response.
pub fn request(&self) -> &Request {
&self.request
}
/// Respond to the request.
///
/// The `response` can be of the `Response` type or a [`Result`]. This allows sending an error
/// representing an error while processing the request.
///
/// This method takes ownership of the [`ResponseSender`] so that only one response can be
/// sent.
///
/// If this method is not called, the caller will panic.
pub fn respond(self, response: impl ResponseResult<Response, Error>) {
let _ = self.response_sender.send(response.into_result());
}
}
/// A representation of an assertion type.
///
/// This trait is used to group the types of assertions that the [`MockService`] can do. There are
/// currently two types that are used as type-system tags on the [`MockService`]:
///
/// - [`PanicAssertion`]
/// - [`PropTestAssertion`]
trait AssertionType {}
/// Represents normal Rust assertions that panic, like [`assert_eq`].
pub enum PanicAssertion {}
/// Represents [`proptest`] assertions that return errors, like [`prop_assert_eq`].
pub enum PropTestAssertion {}
impl AssertionType for PanicAssertion {}
impl AssertionType for PropTestAssertion {}
/// A helper trait to improve ergonomics when sending a response.
///
/// This allows the [`ResponseSender::respond`] method to receive either a [`Result`] or just the
/// response type that is wrapped in an `Ok` variant.
pub trait ResponseResult<Response, Error> {
/// Converts the type into a [`Result`] that can be sent as a response.
fn into_result(self) -> Result<Response, Error>;
}
impl<Response, Error> ResponseResult<Response, Error> for Response {
fn into_result(self) -> Result<Response, Error> {
Ok(self)
}
}
impl<Response, Error> ResponseResult<Response, Error> for Result<Response, Error>
where
Error: std::error::Error + Send + Sync + 'static,
{
fn into_result(self) -> Result<Response, Error> {
self
}
}

View File

@ -1,26 +1,20 @@
use std::time::Duration;
use proptest::prelude::*;
use tokio::time::{self, timeout};
use tokio::time;
use zebra_network::Request;
use crate::components::tests::mock_peer_set;
use zebra_network::{Request, Response};
use zebra_test::mock_service::MockService;
use super::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY};
/// The number of iterations to crawl while testing.
///
/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test.
/// See more information in [`MAX_REQUEST_DELAY`].
const CRAWL_ITERATIONS: usize = 4;
/// The maximum time to wait for a request to arrive before considering it won't arrive.
///
/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test.
/// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for
/// at least `MAX_REQUEST_DELAY * CRAWL_ITERATIONS`.
const MAX_REQUEST_DELAY: Duration = Duration::from_millis(25);
/// at least `CRAWL_ITERATIONS` times the timeout for receiving a request (see more information in
/// [`MockServiceBuilder::with_max_request_delay`]).
const CRAWL_ITERATIONS: usize = 4;
/// The amount of time to advance beyond the expected instant that the crawler wakes up.
const ERROR_MARGIN: Duration = Duration::from_millis(100);
@ -38,30 +32,31 @@ proptest! {
sync_lengths.push(0);
runtime.block_on(async move {
let (peer_set, mut requests) = mock_peer_set();
let mut peer_set = MockService::build().for_prop_tests();
let (sync_status, mut recent_sync_lengths) = SyncStatus::new();
time::pause();
Crawler::spawn(peer_set, sync_status.clone());
Crawler::spawn(peer_set.clone(), sync_status.clone());
for sync_length in sync_lengths {
let mempool_is_enabled = sync_status.is_close_to_tip();
for _ in 0..CRAWL_ITERATIONS {
for _ in 0..FANOUT {
let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;
if mempool_is_enabled {
prop_assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds))));
peer_set
.expect_request_that(|request| {
matches!(request, Request::MempoolTransactionIds)
})
.await?
.respond(Response::TransactionIds(vec![]));
} else {
prop_assert!(request.is_err());
peer_set.expect_no_requests().await?;
}
}
let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await;
prop_assert!(extra_request.is_err());
peer_set.expect_no_requests().await?;
time::sleep(RATE_LIMIT_DELAY + ERROR_MARGIN).await;
}
@ -72,7 +67,7 @@ proptest! {
recent_sync_lengths.push_extend_tips_length(sync_length);
}
Ok(())
Ok::<(), TestCaseError>(())
})?;
}
}