From b714b2b3b623e1e980aac2359d1ffcca2dcfc6a4 Mon Sep 17 00:00:00 2001 From: Janito Vaqueiro Ferreira Filho Date: Tue, 21 Sep 2021 14:44:59 -0300 Subject: [PATCH] Create a helper `MockService` type to help with writing tests that use mock `tower::Service`s (#2748) * Implement initial service mocking helpers Adds a [`MockService`] type, which can be configured and built for usage in unit tests or proptests. The mocked service can then be used to intercept requests and respond indivdiually to them. * Use `MockService in the `mempool::Crawler` test Refactor it to remove the helper mock function, and use the new `MockService` helper type. * Use `MockService` in `CandidateSet` test vectors Refactor to remove the manual mocking of the peer set service. * Panic if a response is not sent by `MockService` Change the current semantics to require all `MockService` usages to respond to every intercepted request. A `must_use` attribute was added to the `ResponseSender` so that the compiler can warn when this doesn't happen. * Allow generic error types in `MockService` Replace the hard-coded `BoxError` as the `Service`'s error type with a generic type parameter. This allows mocking services in locations that require specific error types. * Add a `ResponseSender::request` getter Allow inspecting the request again before responding, and using information from the request in the response. Co-authored-by: Conrado Gouvea --- .../peer_set/candidate_set/tests/vectors.rs | 98 +-- zebra-test/src/lib.rs | 1 + zebra-test/src/mock_service.rs | 743 ++++++++++++++++++ .../src/components/mempool/crawler/tests.rs | 39 +- 4 files changed, 798 insertions(+), 83 deletions(-) create mode 100644 zebra-test/src/mock_service.rs diff --git a/zebra-network/src/peer_set/candidate_set/tests/vectors.rs b/zebra-network/src/peer_set/candidate_set/tests/vectors.rs index 0ff7c82ae..2c29a8f4f 100644 --- a/zebra-network/src/peer_set/candidate_set/tests/vectors.rs +++ b/zebra-network/src/peer_set/candidate_set/tests/vectors.rs @@ -1,7 +1,5 @@ use std::{ - collections::VecDeque, convert::TryInto, - iter, net::{IpAddr, SocketAddr}, str::FromStr, sync::Arc, @@ -9,16 +7,14 @@ use std::{ }; use chrono::{DateTime, Duration, Utc}; -use futures::future; use tokio::{ runtime::Runtime, - sync::watch, time::{self, Instant}, }; -use tower::Service; use tracing::Span; use zebra_chain::serialization::DateTime32; +use zebra_test::mock_service::{MockService, PanicAssertion}; use super::super::{validate_addrs, CandidateSet}; use crate::{ @@ -146,9 +142,11 @@ fn candidate_set_updates_are_rate_limited() { let _guard = runtime.enter(); let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none()); - let (peer_service, call_count) = mock_peer_service(); - let mut candidate_set = - CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service); + let mut peer_service = MockService::build().for_unit_tests(); + let mut candidate_set = CandidateSet::new( + Arc::new(std::sync::Mutex::new(address_book)), + peer_service.clone(), + ); runtime.block_on(async move { time::pause(); @@ -156,6 +154,7 @@ fn candidate_set_updates_are_rate_limited() { let time_limit = Instant::now() + INTERVALS_TO_RUN * MIN_PEER_GET_ADDR_INTERVAL + StdDuration::from_secs(1); + let mut next_allowed_request_time = Instant::now(); while Instant::now() <= time_limit { candidate_set @@ -163,13 +162,16 @@ fn candidate_set_updates_are_rate_limited() { .await .expect("Call to CandidateSet::update should not fail"); + if Instant::now() >= next_allowed_request_time { + verify_fanned_out_requests(&mut peer_service).await; + + next_allowed_request_time = Instant::now() + MIN_PEER_GET_ADDR_INTERVAL; + } else { + peer_service.expect_no_requests().await; + } + time::advance(MIN_PEER_GET_ADDR_INTERVAL / POLL_FREQUENCY_FACTOR).await; } - - assert_eq!( - *call_count.borrow(), - INTERVALS_TO_RUN as usize * GET_ADDR_FANOUT - ); }); } @@ -181,9 +183,11 @@ fn candidate_set_update_after_update_initial_is_rate_limited() { let _guard = runtime.enter(); let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none()); - let (peer_service, call_count) = mock_peer_service(); - let mut candidate_set = - CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service); + let mut peer_service = MockService::build().for_unit_tests(); + let mut candidate_set = CandidateSet::new( + Arc::new(std::sync::Mutex::new(address_book)), + peer_service.clone(), + ); runtime.block_on(async move { time::pause(); @@ -194,7 +198,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() { .await .expect("Call to CandidateSet::update should not fail"); - assert_eq!(*call_count.borrow(), GET_ADDR_FANOUT); + verify_fanned_out_requests(&mut peer_service).await; // The following two calls to `update` should be skipped candidate_set @@ -207,7 +211,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() { .await .expect("Call to CandidateSet::update should not fail"); - assert_eq!(*call_count.borrow(), GET_ADDR_FANOUT); + peer_service.expect_no_requests().await; // After waiting for at least the minimum interval the call to `update` should succeed time::advance(MIN_PEER_GET_ADDR_INTERVAL).await; @@ -216,7 +220,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() { .await .expect("Call to CandidateSet::update should not fail"); - assert_eq!(*call_count.borrow(), 2 * GET_ADDR_FANOUT); + verify_fanned_out_requests(&mut peer_service).await; }); } @@ -243,49 +247,21 @@ fn mock_gossiped_peers(last_seen_times: impl IntoIterator>) .collect() } -/// Create a mock `PeerSet` service that checks that requests to it are rate limited. +/// Verify that a batch of fanned out requests are sent by the candidate set. /// -/// The function also returns a call count watcher, that can be used for checking how many times the -/// service was called. -fn mock_peer_service() -> ( - impl Service< - Request, - Response = Response, - Future = future::Ready>, - Error = E, - > + 'static, - watch::Receiver, +/// # Panics +/// +/// This will panic (causing the test to fail) if more or less requests are received than the +/// expected [`GET_ADDR_FANOUT`] amount. +async fn verify_fanned_out_requests( + peer_service: &mut MockService, ) { - let rate_limit_interval = MIN_PEER_GET_ADDR_INTERVAL; + for _ in 0..GET_ADDR_FANOUT { + peer_service + .expect_request_that(|request| matches!(request, Request::Peers)) + .await + .respond(Response::Peers(vec![])); + } - let mut call_counter = 0; - let (call_count_sender, call_count_receiver) = watch::channel(call_counter); - - let mut peer_request_tracker: VecDeque<_> = - iter::repeat(Instant::now()).take(GET_ADDR_FANOUT).collect(); - - let service = tower::service_fn(move |request| { - match request { - Request::Peers => { - // Get time from queue that the request is authorized to be sent - let authorized_request_time = peer_request_tracker - .pop_front() - .expect("peer_request_tracker should always have GET_ADDR_FANOUT elements"); - // Check that the request was rate limited - assert!(Instant::now() >= authorized_request_time); - // Push a new authorization, updated by the rate limit interval - peer_request_tracker.push_back(Instant::now() + rate_limit_interval); - - // Increment count of calls - call_counter += 1; - let _ = call_count_sender.send(call_counter); - - // Return an empty list of peer addresses - future::ok(Response::Peers(vec![])) - } - _ => unreachable!("Received an unexpected internal message: {:?}", request), - } - }); - - (service, call_count_receiver) + peer_service.expect_no_requests().await; } diff --git a/zebra-test/src/lib.rs b/zebra-test/src/lib.rs index 5d3f21c11..4fb8a9eaf 100644 --- a/zebra-test/src/lib.rs +++ b/zebra-test/src/lib.rs @@ -20,6 +20,7 @@ use std::sync::Once; #[allow(missing_docs)] pub mod command; +pub mod mock_service; pub mod net; pub mod prelude; pub mod transcript; diff --git a/zebra-test/src/mock_service.rs b/zebra-test/src/mock_service.rs new file mode 100644 index 000000000..984f89572 --- /dev/null +++ b/zebra-test/src/mock_service.rs @@ -0,0 +1,743 @@ +//! Some helpers to make it simpler to mock Tower services. +//! +//! A [`MockService`] is a generic [`tower::Service`] implementation that allows intercepting +//! requests, responding to them individually, and checking that there are no requests to be +//! received (at least during a period of time). The [`MockService`] can be built for proptests or +//! for normal Rust unit tests. +//! +//! # Example +//! +//! ``` +//! use zebra_test::mock_service::MockService; +//! # use tower::ServiceExt; +//! +//! # let reactor = tokio::runtime::Builder::new_current_thread() +//! # .enable_all() +//! # .build() +//! # .expect("Failed to build Tokio runtime"); +//! # +//! # reactor.block_on(async { +//! let mut mock_service = MockService::build().for_unit_tests(); +//! let mut service = mock_service.clone(); +//! # +//! # // Add types to satisfy the compiler's type inference for the `Error` type. +//! # let _typed_mock_service: MockService<_, _, _> = mock_service.clone(); +//! +//! let call = tokio::spawn(mock_service.clone().oneshot("hello")); +//! +//! mock_service +//! .expect_request("hello").await +//! .respond("hi!"); +//! +//! mock_service.expect_no_requests().await; +//! +//! let response = call +//! .await +//! .expect("Failed to run call on the background") +//! .expect("Failed to receive response from service"); +//! +//! assert_eq!(response, "hi!"); +//! # }); +//! ``` + +use std::{ + fmt::Debug, + marker::PhantomData, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{future::BoxFuture, FutureExt}; +use proptest::prelude::*; +use tokio::{ + sync::{ + broadcast::{self, error::RecvError}, + oneshot, Mutex, + }, + time::timeout, +}; +use tower::{BoxError, Service}; + +/// The default size of the channel that forwards received requests. +/// +/// If requests are received faster than the test code can consume them, some requests may be +/// ignored. +/// +/// This value can be configured in the [`MockService`] using +/// [`MockServiceBuilder::with_proxy_channel_size`]. +const DEFAULT_PROXY_CHANNEL_SIZE: usize = 100; + +/// The default timeout before considering a request has not been received. +/// +/// This is the time that the mocked service waits before considering a request will not be +/// received. It can be configured in the [`MockService`] using +/// [`MockServiceBuilder::with_max_request_delay`]. +/// +/// Note that if a test checks that no requests are received, each check has to wait for this +/// amount of time, so this may affect the test execution time. +const DEFAULT_MAX_REQUEST_DELAY: Duration = Duration::from_millis(25); + +/// An internal type representing the item that's sent in the [`broadcast`] channel. +/// +/// The actual type that matters is the [`ResponseSender`] but since there could be more than one +/// [`MockService`] verifying requests, the type must be wrapped so that it can be shared by all +/// receivers: +/// +/// - The [`Arc`] makes sure the instance is on the heap, and can be shared properly between +/// threads and dropped when no longer needed. +/// - The [`Mutex`] ensures only one [`MockService`] instance can reply to the received request. +/// - The [`Option`] forces the [`MockService`] that handles the request to take ownership of it +/// because sending a response also forces the [`ResponseSender`] to be dropped. +type ProxyItem = + Arc>>>; + +/// A service implementation that allows intercepting requests for checking them. +/// +/// The type is generic over the request and response types, and also has an extra generic type +/// parameter that's used as a tag to determine if the internal assertions should panic or return +/// errors for proptest minimization. See [`AssertionType`] for more information. +/// +/// The mock service can be cloned, and provides methods for checking the received requests as well +/// as responding to them individually. +/// +/// Internally, the instance that's operating as the service will forward requests to a +/// [`broadcast`] channel that the other instances listen to. +/// +/// See the [module-level documentation][`super::mock_service`] for an example. +pub struct MockService { + receiver: broadcast::Receiver>, + sender: broadcast::Sender>, + max_request_delay: Duration, + _assertion_type: PhantomData, +} + +/// A builder type to create a [`MockService`]. +/// +/// Allows changing specific parameters used by the [`MockService`], if necessary. The default +/// parameters should be reasonable for most cases. +#[derive(Default)] +pub struct MockServiceBuilder { + proxy_channel_size: Option, + max_request_delay: Option, +} + +/// A helper type for responding to incoming requests. +/// +/// An instance of this type is created for each request received by the [`MockService`]. It +/// contains the received request and a [`oneshot::Sender`] that can be used to respond to the +/// request. +/// +/// If a response is not sent, the channel is closed and a [`BoxError`] is returned by the service +/// to the caller that sent the request. +#[must_use = "Tests may fail if a response is not sent back to the caller"] +pub struct ResponseSender { + request: Request, + response_sender: oneshot::Sender>, +} + +/// The [`tower::Service`] implementation of the [`MockService`]. +/// +/// The [`MockService`] is always ready, and it intercepts the requests wrapping them in a +/// [`ResponseSender`] which can be used to send a response. +impl Service + for MockService +where + Response: Send + 'static, + Error: Send + 'static, +{ + type Response = Response; + type Error = Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _context: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: Request) -> Self::Future { + let (response_sender, response_receiver) = ResponseSender::new(request); + let proxy_item = Arc::new(Mutex::new(Some(response_sender))); + + let _ = self.sender.send(proxy_item); + + response_receiver + .map(|response| { + response.expect("A response was not sent by the `MockService` for a request") + }) + .boxed() + } +} + +/// An entry point for starting the [`MockServiceBuilder`]. +/// +/// This `impl` block exists for ergonomic reasons. The generic type paramaters don't matter, +/// because they are actually set by [`MockServiceBuilder::finish`]. +impl MockService<(), (), ()> { + /// Create a [`MockServiceBuilder`] to help with the creation of a [`MockService`]. + pub fn build() -> MockServiceBuilder { + MockServiceBuilder::default() + } +} + +impl MockServiceBuilder { + /// Configure the size of the proxy channel used for sending intercepted requests. + /// + /// This determines the maximum amount of requests that are kept in queue before the oldest + /// request is dropped. This means that any tests that receive too many requests might ignore + /// some requests if this parameter isn't properly configured. + /// + /// The default value of 100 should be enough for most cases. + /// + /// # Example + /// + /// ``` + /// # use zebra_test::mock_service::MockService; + /// # + /// let mock_service = MockService::build() + /// .with_proxy_channel_size(100) + /// .for_prop_tests(); + /// # + /// # // Add types to satisfy the compiler's type inference. + /// # let typed_mock_service: MockService<(), (), _> = mock_service; + /// ``` + pub fn with_proxy_channel_size(mut self, size: usize) -> Self { + self.proxy_channel_size = Some(size); + self + } + + /// Configure the time to wait for a request before considering no requests will be received. + /// + /// This determines the maximum amount of time that the [`MockService`] will wait for a request + /// to be received before considering that a request will not be received. + /// + /// The default value of 25 ms should be enough for most cases. + /// + /// # Example + /// + /// ``` + /// # use std::time::Duration; + /// # + /// # use zebra_test::mock_service::MockService; + /// # + /// let mock_service = MockService::build() + /// .with_max_request_delay(Duration::from_millis(25)) + /// .for_unit_tests(); + /// # + /// # // Add types to satisfy the compiler's type inference. + /// # let typed_mock_service: MockService<(), (), _> = mock_service; + /// ``` + pub fn with_max_request_delay(mut self, max_request_delay: Duration) -> Self { + self.max_request_delay = Some(max_request_delay); + self + } + + /// Create a [`MockService`] to be used in `proptest`s. + /// + /// The assertions performed by [`MockService`] use the macros provided by [`proptest`], like + /// [`prop_assert`]. + pub fn for_prop_tests( + self, + ) -> MockService { + self.finish() + } + + /// Create a [`MockService`] to be used in Rust unit tests. + /// + /// The assertions performed by [`MockService`] use the macros provided by default in Rust, + /// like [`assert`]. + pub fn for_unit_tests( + self, + ) -> MockService { + self.finish() + } + + /// An internal helper method to create the actual [`MockService`]. + /// + /// Note that this is used by both [`Self::for_prop_tests`] and [`Self::for_unit_tests`], the + /// only difference being the `Assertion` generic type parameter, which Rust infers + /// automatically. + fn finish( + self, + ) -> MockService { + let proxy_channel_size = self + .proxy_channel_size + .unwrap_or(DEFAULT_PROXY_CHANNEL_SIZE); + let (sender, receiver) = broadcast::channel(proxy_channel_size); + + MockService { + receiver, + sender, + max_request_delay: self.max_request_delay.unwrap_or(DEFAULT_MAX_REQUEST_DELAY), + _assertion_type: PhantomData, + } + } +} + +/// Implementation of [`MockService`] methods that use standard Rust panicking assertions. +impl MockService { + /// Expect a specific request to be received. + /// + /// The expected request should be the next one in the internal queue, or if the queue is + /// empty, it should be received in at most the max delay time configured by + /// [`MockServiceBuilder::with_max_request_delay`]. + /// + /// If the received request matches the expected request, a [`ResponseSender`] is returned + /// which can be used to inspect the request and respond to it. If no response is sent, the + /// sender of the requests receives an error. + /// + /// # Panics + /// + /// If no request is received or if a request is received that's not equal to the expected + /// request, this method panics. + /// + /// # Example + /// + /// ``` + /// # use zebra_test::mock_service::MockService; + /// # use tower::ServiceExt; + /// # + /// # let reactor = tokio::runtime::Builder::new_current_thread() + /// # .enable_all() + /// # .build() + /// # .expect("Failed to build Tokio runtime"); + /// # + /// # reactor.block_on(async { + /// # let mut mock_service: MockService<_, _, _> = MockService::build().for_unit_tests(); + /// # let mut service = mock_service.clone(); + /// # + /// let call = tokio::spawn(mock_service.clone().oneshot("request")); + /// + /// mock_service.expect_request("request").await.respond("response"); + /// + /// assert!(matches!(call.await, Ok(Ok("response")))); + /// # }); + /// ``` + pub async fn expect_request( + &mut self, + expected: Request, + ) -> ResponseSender + where + Request: PartialEq + Debug, + { + let response_sender = self.next_request().await; + + assert_eq!(response_sender.request, expected); + + response_sender + } + + /// Expect a request to be received that matches a specified condition. + /// + /// There should be a request already in the internal queue, or a request should be received in + /// at most the max delay time configured by [`MockServiceBuilder::with_max_request_delay`]. + /// + /// The received request is passed to the `condition` function, which should return `true` if + /// it matches the expected condition or `false` otherwise. If `true` is returned, a + /// [`ResponseSender`] is returned which can be used to inspect the request again and respond + /// to it. If no response is sent, the sender of the requests receives an error. + /// + /// # Panics + /// + /// If the `condition` function returns `false`, this method panics. + /// + /// # Example + /// + /// ``` + /// # use zebra_test::mock_service::MockService; + /// # use tower::ServiceExt; + /// # + /// # let reactor = tokio::runtime::Builder::new_current_thread() + /// # .enable_all() + /// # .build() + /// # .expect("Failed to build Tokio runtime"); + /// # + /// # reactor.block_on(async { + /// # let mut mock_service: MockService<_, _, _> = MockService::build().for_unit_tests(); + /// # let mut service = mock_service.clone(); + /// # + /// let call = tokio::spawn(mock_service.clone().oneshot(1)); + /// + /// mock_service.expect_request_that(|request| *request > 0).await.respond("response"); + /// + /// assert!(matches!(call.await, Ok(Ok("response")))); + /// # }); + /// ``` + pub async fn expect_request_that( + &mut self, + condition: impl FnOnce(&Request) -> bool, + ) -> ResponseSender { + let response_sender = self.next_request().await; + + assert!(condition(&response_sender.request)); + + response_sender + } + + /// Expect no requests to be received. + /// + /// The internal queue of received requests should be empty, and no new requests should arrive + /// for the max delay time configured by [`MockServiceBuilder::with_max_request_delay`]. + /// + /// # Panics + /// + /// If the queue is not empty or if a request is received before the max request delay timeout + /// expires. + /// + /// # Example + /// + /// ``` + /// # use zebra_test::mock_service::MockService; + /// # use tower::ServiceExt; + /// # + /// # let reactor = tokio::runtime::Builder::new_current_thread() + /// # .enable_all() + /// # .build() + /// # .expect("Failed to build Tokio runtime"); + /// # + /// # reactor.block_on(async { + /// # let mut mock_service: MockService<(), (), _> = MockService::build().for_unit_tests(); + /// # + /// mock_service.expect_no_requests().await; + /// # }); + /// ``` + pub async fn expect_no_requests(&mut self) + where + Request: Debug, + { + if let Some(response_sender) = self.try_next_request().await { + panic!( + "Received an unexpected request: {:?}", + response_sender.request + ); + } + } + + /// A helper method to get the next request from the queue. + /// + /// Returns the next request in the internal queue or waits at most the max delay time + /// configured by [`MockServiceBuilder::with_max_request_delay`] for a new request to be + /// received, and then returns that. + /// + /// # Panics + /// + /// If the queue is empty and a request is not received before the max request delay timeout + /// expires. + async fn next_request(&mut self) -> ResponseSender { + match self.try_next_request().await { + Some(request) => request, + None => panic!("Timeout while waiting for a request"), + } + } +} + +/// Implementation of [`MockService`] methods that use [`proptest`] assertions. +impl MockService { + /// Expect a specific request to be received. + /// + /// The expected request should be the next one in the internal queue, or if the queue is + /// empty, it should be received in at most the max delay time configured by + /// [`MockServiceBuilder::with_max_request_delay`]. + /// + /// If the received request matches the expected request, a [`ResponseSender`] is returned + /// which can be used to inspect the request and respond to it. If no response is sent, the + /// sender of the requests receives an error. + /// + /// If no request is received or if a request is received that's not equal to the expected + /// request, this method returns an error generated by a [`proptest`] assertion. + /// + /// # Example + /// + /// ``` + /// # use proptest::prelude::*; + /// # use tower::ServiceExt; + /// # + /// # use zebra_test::mock_service::MockService; + /// # + /// # let reactor = tokio::runtime::Builder::new_current_thread() + /// # .enable_all() + /// # .build() + /// # .expect("Failed to build Tokio runtime"); + /// # + /// # reactor.block_on(async { + /// # let test_code = || async { + /// # let mut mock_service: MockService<_, _, _> = + /// # MockService::build().for_prop_tests(); + /// # let mut service = mock_service.clone(); + /// # + /// let call = tokio::spawn(mock_service.clone().oneshot("request")); + /// + /// // NOTE: The try operator `?` is required for errors to be handled by proptest. + /// mock_service + /// .expect_request("request").await? + /// .respond("response"); + /// + /// prop_assert!(matches!(call.await, Ok(Ok("response")))); + /// # + /// # Ok::<(), TestCaseError>(()) + /// # }; + /// # test_code().await + /// # }).unwrap(); + /// ``` + pub async fn expect_request( + &mut self, + expected: Request, + ) -> Result, TestCaseError> + where + Request: PartialEq + Debug, + { + let response_sender = self.next_request().await?; + + prop_assert_eq!(&response_sender.request, &expected); + + Ok(response_sender) + } + + /// Expect a request to be received that matches a specified condition. + /// + /// There should be a request already in the internal queue, or a request should be received in + /// at most the max delay time configured by [`MockServiceBuilder::with_max_request_delay`]. + /// + /// The received request is passed to the `condition` function, which should return `true` if + /// it matches the expected condition or `false` otherwise. If `true` is returned, a + /// [`ResponseSender`] is returned which can be used to inspect the request again and respond + /// to it. If no response is sent, the sender of the requests receives an error. + /// + /// If the `condition` function returns `false`, this method returns an error generated by a + /// [`proptest`] assertion. + /// + /// # Example + /// + /// ``` + /// # use proptest::prelude::*; + /// # use tower::ServiceExt; + /// # + /// # use zebra_test::mock_service::MockService; + /// # + /// # let reactor = tokio::runtime::Builder::new_current_thread() + /// # .enable_all() + /// # .build() + /// # .expect("Failed to build Tokio runtime"); + /// # + /// # reactor.block_on(async { + /// # let test_code = || async { + /// # let mut mock_service: MockService<_, _, _> = + /// # MockService::build().for_prop_tests(); + /// # let mut service = mock_service.clone(); + /// # + /// let call = tokio::spawn(mock_service.clone().oneshot(1)); + /// + /// // NOTE: The try operator `?` is required for errors to be handled by proptest. + /// mock_service + /// .expect_request_that(|request| *request > 0).await? + /// .respond("OK"); + /// + /// prop_assert!(matches!(call.await, Ok(Ok("OK")))); + /// # + /// # Ok::<(), TestCaseError>(()) + /// # }; + /// # test_code().await + /// # }).unwrap(); + /// ``` + pub async fn expect_request_that( + &mut self, + condition: impl FnOnce(&Request) -> bool, + ) -> Result, TestCaseError> { + let response_sender = self.next_request().await?; + + prop_assert!(condition(&response_sender.request)); + + Ok(response_sender) + } + + /// Expect no requests to be received. + /// + /// The internal queue of received requests should be empty, and no new requests should arrive + /// for the max delay time configured by [`MockServiceBuilder::with_max_request_delay`]. + /// + /// If the queue is not empty or if a request is received before the max request delay timeout + /// expires, an error generated by a [`proptest`] assertion is returned. + /// + /// # Example + /// + /// ``` + /// # use proptest::prelude::TestCaseError; + /// # use tower::ServiceExt; + /// # + /// # use zebra_test::mock_service::MockService; + /// # + /// # let reactor = tokio::runtime::Builder::new_current_thread() + /// # .enable_all() + /// # .build() + /// # .expect("Failed to build Tokio runtime"); + /// # + /// # reactor.block_on(async { + /// # let test_code = || async { + /// # let mut mock_service: MockService<(), (), _> = + /// # MockService::build().for_prop_tests(); + /// # + /// // NOTE: The try operator `?` is required for errors to be handled by proptest. + /// mock_service.expect_no_requests().await?; + /// # + /// # Ok::<(), TestCaseError>(()) + /// # }; + /// # test_code().await + /// # }).unwrap(); + /// ``` + pub async fn expect_no_requests(&mut self) -> Result<(), TestCaseError> + where + Request: Debug, + { + match self.try_next_request().await { + Some(response_sender) => { + prop_assert!( + false, + "Received an unexpected request: {:?}", + response_sender.request + ); + unreachable!("prop_assert!(false) returns an early error"); + } + None => Ok(()), + } + } + + /// A helper method to get the next request from the queue. + /// + /// Returns the next request in the internal queue or waits at most the max delay time + /// configured by [`MockServiceBuilder::with_max_request_delay`] for a new request to be + /// received, and then returns that. + /// + /// If the queue is empty and a request is not received before the max request delay timeout + /// expires, an error generated by a [`proptest`] assertion is returned. + async fn next_request( + &mut self, + ) -> Result, TestCaseError> { + match self.try_next_request().await { + Some(request) => Ok(request), + None => { + prop_assert!(false, "Timeout while waiting for a request"); + unreachable!("prop_assert!(false) returns an early error"); + } + } + } +} + +/// Code that is independent of the assertions used in [`MockService`]. +impl MockService { + /// Try to get the next request received. + /// + /// Returns the next element in the queue. If the queue is empty, waits at most the max request + /// delay configured by [`MockServiceBuilder::with_max_request_delay`] for a request, and + /// returns it. + /// + /// If no request is received, returns `None`. + /// + /// If too many requests are received and the queue fills up, the oldest requests are dropped + /// and ignored. This means that calling this may not receive the next request if the queue is + /// not dimensioned properly with the [`MockServiceBuilder::with_proxy_channel_size`] method. + async fn try_next_request(&mut self) -> Option> { + loop { + match timeout(self.max_request_delay, self.receiver.recv()).await { + Ok(Ok(item)) => { + if let Some(proxy_item) = item.lock().await.take() { + return Some(proxy_item); + } + } + Ok(Err(RecvError::Lagged(_))) => continue, + Ok(Err(RecvError::Closed)) => unreachable!("Sender is never closed"), + Err(_timeout) => return None, + } + } + } +} + +impl Clone + for MockService +{ + /// Clones the [`MockService`]. + /// + /// This is a cheap operation, because it simply clones the [`broadcast`] channel endpoints. + fn clone(&self) -> Self { + MockService { + receiver: self.sender.subscribe(), + sender: self.sender.clone(), + max_request_delay: self.max_request_delay, + _assertion_type: PhantomData, + } + } +} + +impl ResponseSender { + /// Create a [`ResponseSender`] for a given `request`. + fn new(request: Request) -> (Self, oneshot::Receiver>) { + let (response_sender, response_receiver) = oneshot::channel(); + + ( + ResponseSender { + request, + response_sender, + }, + response_receiver, + ) + } + + /// Access the `request` that's awaiting a response. + pub fn request(&self) -> &Request { + &self.request + } + + /// Respond to the request. + /// + /// The `response` can be of the `Response` type or a [`Result`]. This allows sending an error + /// representing an error while processing the request. + /// + /// This method takes ownership of the [`ResponseSender`] so that only one response can be + /// sent. + /// + /// If this method is not called, the caller will panic. + pub fn respond(self, response: impl ResponseResult) { + let _ = self.response_sender.send(response.into_result()); + } +} + +/// A representation of an assertion type. +/// +/// This trait is used to group the types of assertions that the [`MockService`] can do. There are +/// currently two types that are used as type-system tags on the [`MockService`]: +/// +/// - [`PanicAssertion`] +/// - [`PropTestAssertion`] +trait AssertionType {} + +/// Represents normal Rust assertions that panic, like [`assert_eq`]. +pub enum PanicAssertion {} + +/// Represents [`proptest`] assertions that return errors, like [`prop_assert_eq`]. +pub enum PropTestAssertion {} + +impl AssertionType for PanicAssertion {} + +impl AssertionType for PropTestAssertion {} + +/// A helper trait to improve ergonomics when sending a response. +/// +/// This allows the [`ResponseSender::respond`] method to receive either a [`Result`] or just the +/// response type that is wrapped in an `Ok` variant. +pub trait ResponseResult { + /// Converts the type into a [`Result`] that can be sent as a response. + fn into_result(self) -> Result; +} + +impl ResponseResult for Response { + fn into_result(self) -> Result { + Ok(self) + } +} + +impl ResponseResult for Result +where + Error: std::error::Error + Send + Sync + 'static, +{ + fn into_result(self) -> Result { + self + } +} diff --git a/zebrad/src/components/mempool/crawler/tests.rs b/zebrad/src/components/mempool/crawler/tests.rs index ee91ca6cd..56d34b7cc 100644 --- a/zebrad/src/components/mempool/crawler/tests.rs +++ b/zebrad/src/components/mempool/crawler/tests.rs @@ -1,26 +1,20 @@ use std::time::Duration; use proptest::prelude::*; -use tokio::time::{self, timeout}; +use tokio::time; -use zebra_network::Request; - -use crate::components::tests::mock_peer_set; +use zebra_network::{Request, Response}; +use zebra_test::mock_service::MockService; use super::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY}; /// The number of iterations to crawl while testing. /// /// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test. -/// See more information in [`MAX_REQUEST_DELAY`]. -const CRAWL_ITERATIONS: usize = 4; - -/// The maximum time to wait for a request to arrive before considering it won't arrive. -/// -/// Note that this affects the total run time of the [`crawler_requests_for_transaction_ids`] test. /// There are [`CRAWL_ITERATIONS`] requests that are expected to not be sent, so the test runs for -/// at least `MAX_REQUEST_DELAY * CRAWL_ITERATIONS`. -const MAX_REQUEST_DELAY: Duration = Duration::from_millis(25); +/// at least `CRAWL_ITERATIONS` times the timeout for receiving a request (see more information in +/// [`MockServiceBuilder::with_max_request_delay`]). +const CRAWL_ITERATIONS: usize = 4; /// The amount of time to advance beyond the expected instant that the crawler wakes up. const ERROR_MARGIN: Duration = Duration::from_millis(100); @@ -38,30 +32,31 @@ proptest! { sync_lengths.push(0); runtime.block_on(async move { - let (peer_set, mut requests) = mock_peer_set(); + let mut peer_set = MockService::build().for_prop_tests(); let (sync_status, mut recent_sync_lengths) = SyncStatus::new(); time::pause(); - Crawler::spawn(peer_set, sync_status.clone()); + Crawler::spawn(peer_set.clone(), sync_status.clone()); for sync_length in sync_lengths { let mempool_is_enabled = sync_status.is_close_to_tip(); for _ in 0..CRAWL_ITERATIONS { for _ in 0..FANOUT { - let request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; - if mempool_is_enabled { - prop_assert!(matches!(request, Ok(Some(Request::MempoolTransactionIds)))); + peer_set + .expect_request_that(|request| { + matches!(request, Request::MempoolTransactionIds) + }) + .await? + .respond(Response::TransactionIds(vec![])); } else { - prop_assert!(request.is_err()); + peer_set.expect_no_requests().await?; } } - let extra_request = timeout(MAX_REQUEST_DELAY, requests.recv()).await; - - prop_assert!(extra_request.is_err()); + peer_set.expect_no_requests().await?; time::sleep(RATE_LIMIT_DELAY + ERROR_MARGIN).await; } @@ -72,7 +67,7 @@ proptest! { recent_sync_lengths.push_extend_tips_length(sync_length); } - Ok(()) + Ok::<(), TestCaseError>(()) })?; } }