From 56636c85fc901ce9075e0a6039e2abb764486a7d Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Thu, 23 Sep 2021 10:17:06 -0300 Subject: [PATCH] Add missing tests for mempool inbound requests (#2769) * Use `MockService` in inbound test Refactor the `mempool_requsets_for_transactions` test so that it uses a `MockService` instead of the `mock_peer_set` function. * Use `MockService` in the basic mempool test Refactor the `mempool_service_basic` test so that it uses a `MockService` instead of the `mock_peer_set` helper function. * Remove the `mock_peer_set` helper function It is not used anymore, since the usages were replaced with `MockService`s. * add tests for mempool inbound requests * Use MockService for transaction verifier * Refactor creation of mock `peer_set` Use the same style as the mock transaction verifier. * Derive `Eq` for `zebra_network::Request` Make it easy to use the `MockService::expect_request` method. * Return mocked peer set service from `setup` Allow it to be used to respond to requests. * Add bindings for the transaction used for testing Allow them to be moved into futures later. * Respond to transaction download request Make sure that the test transaction appears to the mempool as if it had been downloaded by the peer set service. * Assert that no unexpected requests were received Check that the mempool doesn't send unexpected requests to the peer set service. * add tests for mempool inbound requests * Use MockService for transaction verifier * add missing `expect_no_requests` to `mempool_advertise_transaction_ids` test Co-authored-by: Janito Vaqueiro Ferreira Filho Co-authored-by: Conrado Gouvea --- .../src/protocol/internal/request.rs | 2 +- zebrad/src/components/inbound/tests.rs | 263 ++++++++++++++---- 2 files changed, 210 insertions(+), 55 deletions(-) diff --git a/zebra-network/src/protocol/internal/request.rs b/zebra-network/src/protocol/internal/request.rs index 976b3f96d..84d39afd0 100644 --- a/zebra-network/src/protocol/internal/request.rs +++ b/zebra-network/src/protocol/internal/request.rs @@ -29,7 +29,7 @@ use proptest_derive::Arbitrary; /// while waiting for a response, the peer connection resets its state and makes /// a best-effort attempt to ignore any messages responsive to the cancelled /// request, subject to limitations in the underlying Zcash protocol. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, PartialEq)] #[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub enum Request { /// Requests additional peers from the server. diff --git a/zebrad/src/components/inbound/tests.rs b/zebrad/src/components/inbound/tests.rs index f45a840ce..31bc02f72 100644 --- a/zebrad/src/components/inbound/tests.rs +++ b/zebrad/src/components/inbound/tests.rs @@ -1,73 +1,37 @@ -use std::{collections::HashSet, net::SocketAddr, str::FromStr, sync::Arc}; +use std::{collections::HashSet, iter::FromIterator, net::SocketAddr, str::FromStr, sync::Arc}; use super::mempool::{unmined_transactions_in_blocks, Mempool}; use crate::components::sync::SyncStatus; +use futures::FutureExt; use tokio::sync::oneshot; -use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt}; +use tower::{ + buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, ServiceExt, +}; use tracing::Span; use zebra_chain::{ + block::Block, parameters::Network, + serialization::ZcashDeserializeInto, transaction::{UnminedTx, UnminedTxId}, }; -use zebra_consensus::Config as ConsensusConfig; + +use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig}; use zebra_network::{AddressBook, Request, Response}; use zebra_state::Config as StateConfig; -use zebra_test::mock_service::MockService; +use zebra_test::mock_service::{MockService, PanicAssertion}; #[tokio::test] async fn mempool_requests_for_transactions() { - let network = Network::Mainnet; - let consensus_config = ConsensusConfig::default(); - let state_config = StateConfig::ephemeral(); - let peer_set = MockService::build().for_unit_tests(); - let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none()); - let address_book = Arc::new(std::sync::Mutex::new(address_book)); - let (sync_status, _recent_syncs) = SyncStatus::new(); - let (_state_service, _latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config.clone(), network); + let (inbound_service, added_transactions, _, mut peer_set) = setup(true).await; - let (state, _, _) = zebra_state::init(state_config, network); - let state_service = ServiceBuilder::new().buffer(1).service(state); - - let (block_verifier, transaction_verifier) = - zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) - .await; - - let peer_set_service = ServiceBuilder::new() - .buffer(1) - .service(BoxService::new(peer_set)); - - let mut mempool_service = Mempool::new( - network, - peer_set_service.clone(), - state_service.clone(), - transaction_verifier, - sync_status, - chain_tip_change, - ); - - let added_transactions = add_some_stuff_to_mempool(&mut mempool_service, network); - let added_transaction_ids: Vec = added_transactions.iter().map(|t| t.id).collect(); - - let mempool_service = BoxService::new(mempool_service); - let mempool = ServiceBuilder::new().buffer(1).service(mempool_service); - - let (setup_tx, setup_rx) = oneshot::channel(); - - let inbound_service = ServiceBuilder::new() - .load_shed() - .buffer(1) - .service(super::Inbound::new( - setup_rx, - state_service, - block_verifier.clone(), - )); - - let r = setup_tx.send((peer_set_service, address_book, mempool)); - // We can't expect or unwrap because the returned Result does not implement Debug - assert!(r.is_ok()); + let added_transaction_ids: Vec = added_transactions + .clone() + .unwrap() + .iter() + .map(|t| t.id) + .collect(); // Test `Request::MempoolTransactionIds` let request = inbound_service @@ -92,9 +56,200 @@ async fn mempool_requests_for_transactions() { .await; match request { - Ok(Response::Transactions(response)) => assert_eq!(response, added_transactions), + Ok(Response::Transactions(response)) => assert_eq!(response, added_transactions.unwrap()), _ => unreachable!("`TransactionsById` requests should always respond `Ok(Vec)`"), }; + + peer_set.expect_no_requests().await; +} + +#[tokio::test] +async fn mempool_push_transaction() -> Result<(), crate::BoxError> { + // get a block that has at least one non coinbase transaction + let block: Arc = + zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; + + // use the first transaction that is not coinbase + let tx = block.transactions[1].clone(); + + let (inbound_service, _, mut tx_verifier, mut peer_set) = setup(false).await; + + // Test `Request::PushTransaction` + let request = inbound_service + .clone() + .oneshot(Request::PushTransaction(tx.clone().into())); + // Simulate a successful transaction verification + let verification = tx_verifier.expect_request_that(|_| true).map(|responder| { + let txid = responder.request().tx_id(); + responder.respond(txid); + }); + let (response, _) = futures::join!(request, verification); + match response { + Ok(Response::Nil) => (), + _ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"), + }; + + // Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool + let request = inbound_service + .clone() + .oneshot(Request::MempoolTransactionIds) + .await; + + match request { + Ok(Response::TransactionIds(response)) => assert_eq!(response, vec![tx.unmined_id()]), + _ => unreachable!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" + ), + }; + + peer_set.expect_no_requests().await; + + Ok(()) +} + +#[tokio::test] +async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> { + // get a block that has at least one non coinbase transaction + let block: Block = zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; + + // use the first transaction that is not coinbase + let test_transaction = block + .transactions + .into_iter() + .find(|tx| !tx.has_any_coinbase_inputs()) + .expect("at least one non-coinbase transaction"); + let test_transaction_id = test_transaction.unmined_id(); + let txs = HashSet::from_iter([test_transaction_id]); + + let (inbound_service, _, mut tx_verifier, mut peer_set) = setup(false).await; + + // Test `Request::AdvertiseTransactionIds` + let request = inbound_service + .clone() + .oneshot(Request::AdvertiseTransactionIds(txs.clone())); + // Ensure the mocked peer set responds + let peer_set_responder = + peer_set + .expect_request(Request::TransactionsById(txs)) + .map(|responder| { + let unmined_transaction = UnminedTx { + id: test_transaction_id, + transaction: test_transaction, + }; + + responder.respond(Response::Transactions(vec![unmined_transaction])) + }); + // Simulate a successful transaction verification + let verification = tx_verifier.expect_request_that(|_| true).map(|responder| { + let txid = responder.request().tx_id(); + responder.respond(txid); + }); + let (response, _, _) = futures::join!(request, peer_set_responder, verification); + + match response { + Ok(Response::Nil) => (), + _ => unreachable!("`AdvertiseTransactionIds` requests should always respond `Ok(Nil)`"), + }; + + // Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool + let request = inbound_service + .clone() + .oneshot(Request::MempoolTransactionIds) + .await; + + match request { + Ok(Response::TransactionIds(response)) => { + assert_eq!(response, vec![test_transaction_id]) + } + _ => unreachable!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" + ), + }; + + peer_set.expect_no_requests().await; + + Ok(()) +} + +async fn setup( + add_transactions: bool, +) -> ( + LoadShed>, + Option>, + MockService, + MockService, +) { + let network = Network::Mainnet; + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none()); + let address_book = Arc::new(std::sync::Mutex::new(address_book)); + let (sync_status, _recent_syncs) = SyncStatus::new(); + let (_state_service, _latest_chain_tip, chain_tip_change) = + zebra_state::init(state_config.clone(), network); + + let (state, _, _) = zebra_state::init(state_config, network); + let state_service = ServiceBuilder::new().buffer(1).service(state); + + let (block_verifier, _transaction_verifier) = + zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) + .await; + + let peer_set = MockService::build().for_unit_tests(); + let buffered_peer_set = Buffer::new(BoxService::new(peer_set.clone()), 10); + + let mock_tx_verifier = MockService::build().for_unit_tests(); + let buffered_tx_verifier = Buffer::new(BoxService::new(mock_tx_verifier.clone()), 10); + + let mut mempool_service = Mempool::new( + network, + buffered_peer_set.clone(), + state_service.clone(), + buffered_tx_verifier.clone(), + sync_status, + chain_tip_change, + ); + + let mut added_transactions = None; + if add_transactions { + added_transactions = Some(add_some_stuff_to_mempool(&mut mempool_service, network)); + } + + let mempool_service = BoxService::new(mempool_service); + let mempool = ServiceBuilder::new().buffer(1).service(mempool_service); + + let (setup_tx, setup_rx) = oneshot::channel(); + + let inbound_service = ServiceBuilder::new() + .load_shed() + .buffer(1) + .service(super::Inbound::new( + setup_rx, + state_service.clone(), + block_verifier.clone(), + )); + + let r = setup_tx.send((buffered_peer_set, address_book, mempool)); + // We can't expect or unwrap because the returned Result does not implement Debug + assert!(r.is_ok()); + + // Push the genesis block to the state + let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .oneshot(zebra_state::Request::CommitFinalizedBlock( + genesis_block.clone().into(), + )) + .await + .unwrap(); + + ( + inbound_service, + added_transactions, + mock_tx_verifier, + peer_set, + ) } fn add_some_stuff_to_mempool(mempool_service: &mut Mempool, network: Network) -> Vec {