Add missing tests for mempool inbound requests (#2769)
* Use `MockService` in inbound test Refactor the `mempool_requsets_for_transactions` test so that it uses a `MockService` instead of the `mock_peer_set` function. * Use `MockService` in the basic mempool test Refactor the `mempool_service_basic` test so that it uses a `MockService` instead of the `mock_peer_set` helper function. * Remove the `mock_peer_set` helper function It is not used anymore, since the usages were replaced with `MockService`s. * add tests for mempool inbound requests * Use MockService for transaction verifier * Refactor creation of mock `peer_set` Use the same style as the mock transaction verifier. * Derive `Eq` for `zebra_network::Request` Make it easy to use the `MockService::expect_request` method. * Return mocked peer set service from `setup` Allow it to be used to respond to requests. * Add bindings for the transaction used for testing Allow them to be moved into futures later. * Respond to transaction download request Make sure that the test transaction appears to the mempool as if it had been downloaded by the peer set service. * Assert that no unexpected requests were received Check that the mempool doesn't send unexpected requests to the peer set service. * add tests for mempool inbound requests * Use MockService for transaction verifier * add missing `expect_no_requests` to `mempool_advertise_transaction_ids` test Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com> Co-authored-by: Conrado Gouvea <conrado@zfnd.org>
This commit is contained in:
parent
e2cc6e12f8
commit
56636c85fc
|
@ -29,7 +29,7 @@ use proptest_derive::Arbitrary;
|
||||||
/// while waiting for a response, the peer connection resets its state and makes
|
/// while waiting for a response, the peer connection resets its state and makes
|
||||||
/// a best-effort attempt to ignore any messages responsive to the cancelled
|
/// a best-effort attempt to ignore any messages responsive to the cancelled
|
||||||
/// request, subject to limitations in the underlying Zcash protocol.
|
/// request, subject to limitations in the underlying Zcash protocol.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||||
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
|
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
|
||||||
pub enum Request {
|
pub enum Request {
|
||||||
/// Requests additional peers from the server.
|
/// Requests additional peers from the server.
|
||||||
|
|
|
@ -1,73 +1,37 @@
|
||||||
use std::{collections::HashSet, net::SocketAddr, str::FromStr, sync::Arc};
|
use std::{collections::HashSet, iter::FromIterator, net::SocketAddr, str::FromStr, sync::Arc};
|
||||||
|
|
||||||
use super::mempool::{unmined_transactions_in_blocks, Mempool};
|
use super::mempool::{unmined_transactions_in_blocks, Mempool};
|
||||||
use crate::components::sync::SyncStatus;
|
use crate::components::sync::SyncStatus;
|
||||||
|
|
||||||
|
use futures::FutureExt;
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};
|
use tower::{
|
||||||
|
buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, ServiceExt,
|
||||||
|
};
|
||||||
|
|
||||||
use tracing::Span;
|
use tracing::Span;
|
||||||
use zebra_chain::{
|
use zebra_chain::{
|
||||||
|
block::Block,
|
||||||
parameters::Network,
|
parameters::Network,
|
||||||
|
serialization::ZcashDeserializeInto,
|
||||||
transaction::{UnminedTx, UnminedTxId},
|
transaction::{UnminedTx, UnminedTxId},
|
||||||
};
|
};
|
||||||
use zebra_consensus::Config as ConsensusConfig;
|
|
||||||
|
use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig};
|
||||||
use zebra_network::{AddressBook, Request, Response};
|
use zebra_network::{AddressBook, Request, Response};
|
||||||
use zebra_state::Config as StateConfig;
|
use zebra_state::Config as StateConfig;
|
||||||
use zebra_test::mock_service::MockService;
|
use zebra_test::mock_service::{MockService, PanicAssertion};
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn mempool_requests_for_transactions() {
|
async fn mempool_requests_for_transactions() {
|
||||||
let network = Network::Mainnet;
|
let (inbound_service, added_transactions, _, mut peer_set) = setup(true).await;
|
||||||
let consensus_config = ConsensusConfig::default();
|
|
||||||
let state_config = StateConfig::ephemeral();
|
|
||||||
let peer_set = MockService::build().for_unit_tests();
|
|
||||||
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
|
|
||||||
let address_book = Arc::new(std::sync::Mutex::new(address_book));
|
|
||||||
let (sync_status, _recent_syncs) = SyncStatus::new();
|
|
||||||
let (_state_service, _latest_chain_tip, chain_tip_change) =
|
|
||||||
zebra_state::init(state_config.clone(), network);
|
|
||||||
|
|
||||||
let (state, _, _) = zebra_state::init(state_config, network);
|
let added_transaction_ids: Vec<UnminedTxId> = added_transactions
|
||||||
let state_service = ServiceBuilder::new().buffer(1).service(state);
|
.clone()
|
||||||
|
.unwrap()
|
||||||
let (block_verifier, transaction_verifier) =
|
.iter()
|
||||||
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
|
.map(|t| t.id)
|
||||||
.await;
|
.collect();
|
||||||
|
|
||||||
let peer_set_service = ServiceBuilder::new()
|
|
||||||
.buffer(1)
|
|
||||||
.service(BoxService::new(peer_set));
|
|
||||||
|
|
||||||
let mut mempool_service = Mempool::new(
|
|
||||||
network,
|
|
||||||
peer_set_service.clone(),
|
|
||||||
state_service.clone(),
|
|
||||||
transaction_verifier,
|
|
||||||
sync_status,
|
|
||||||
chain_tip_change,
|
|
||||||
);
|
|
||||||
|
|
||||||
let added_transactions = add_some_stuff_to_mempool(&mut mempool_service, network);
|
|
||||||
let added_transaction_ids: Vec<UnminedTxId> = added_transactions.iter().map(|t| t.id).collect();
|
|
||||||
|
|
||||||
let mempool_service = BoxService::new(mempool_service);
|
|
||||||
let mempool = ServiceBuilder::new().buffer(1).service(mempool_service);
|
|
||||||
|
|
||||||
let (setup_tx, setup_rx) = oneshot::channel();
|
|
||||||
|
|
||||||
let inbound_service = ServiceBuilder::new()
|
|
||||||
.load_shed()
|
|
||||||
.buffer(1)
|
|
||||||
.service(super::Inbound::new(
|
|
||||||
setup_rx,
|
|
||||||
state_service,
|
|
||||||
block_verifier.clone(),
|
|
||||||
));
|
|
||||||
|
|
||||||
let r = setup_tx.send((peer_set_service, address_book, mempool));
|
|
||||||
// We can't expect or unwrap because the returned Result does not implement Debug
|
|
||||||
assert!(r.is_ok());
|
|
||||||
|
|
||||||
// Test `Request::MempoolTransactionIds`
|
// Test `Request::MempoolTransactionIds`
|
||||||
let request = inbound_service
|
let request = inbound_service
|
||||||
|
@ -92,9 +56,200 @@ async fn mempool_requests_for_transactions() {
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
match request {
|
match request {
|
||||||
Ok(Response::Transactions(response)) => assert_eq!(response, added_transactions),
|
Ok(Response::Transactions(response)) => assert_eq!(response, added_transactions.unwrap()),
|
||||||
_ => unreachable!("`TransactionsById` requests should always respond `Ok(Vec<UnminedTx>)`"),
|
_ => unreachable!("`TransactionsById` requests should always respond `Ok(Vec<UnminedTx>)`"),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
peer_set.expect_no_requests().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn mempool_push_transaction() -> Result<(), crate::BoxError> {
|
||||||
|
// get a block that has at least one non coinbase transaction
|
||||||
|
let block: Arc<Block> =
|
||||||
|
zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
|
||||||
|
|
||||||
|
// use the first transaction that is not coinbase
|
||||||
|
let tx = block.transactions[1].clone();
|
||||||
|
|
||||||
|
let (inbound_service, _, mut tx_verifier, mut peer_set) = setup(false).await;
|
||||||
|
|
||||||
|
// Test `Request::PushTransaction`
|
||||||
|
let request = inbound_service
|
||||||
|
.clone()
|
||||||
|
.oneshot(Request::PushTransaction(tx.clone().into()));
|
||||||
|
// Simulate a successful transaction verification
|
||||||
|
let verification = tx_verifier.expect_request_that(|_| true).map(|responder| {
|
||||||
|
let txid = responder.request().tx_id();
|
||||||
|
responder.respond(txid);
|
||||||
|
});
|
||||||
|
let (response, _) = futures::join!(request, verification);
|
||||||
|
match response {
|
||||||
|
Ok(Response::Nil) => (),
|
||||||
|
_ => unreachable!("`PushTransaction` requests should always respond `Ok(Nil)`"),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
|
||||||
|
let request = inbound_service
|
||||||
|
.clone()
|
||||||
|
.oneshot(Request::MempoolTransactionIds)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match request {
|
||||||
|
Ok(Response::TransactionIds(response)) => assert_eq!(response, vec![tx.unmined_id()]),
|
||||||
|
_ => unreachable!(
|
||||||
|
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
|
||||||
|
),
|
||||||
|
};
|
||||||
|
|
||||||
|
peer_set.expect_no_requests().await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
|
||||||
|
// get a block that has at least one non coinbase transaction
|
||||||
|
let block: Block = zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?;
|
||||||
|
|
||||||
|
// use the first transaction that is not coinbase
|
||||||
|
let test_transaction = block
|
||||||
|
.transactions
|
||||||
|
.into_iter()
|
||||||
|
.find(|tx| !tx.has_any_coinbase_inputs())
|
||||||
|
.expect("at least one non-coinbase transaction");
|
||||||
|
let test_transaction_id = test_transaction.unmined_id();
|
||||||
|
let txs = HashSet::from_iter([test_transaction_id]);
|
||||||
|
|
||||||
|
let (inbound_service, _, mut tx_verifier, mut peer_set) = setup(false).await;
|
||||||
|
|
||||||
|
// Test `Request::AdvertiseTransactionIds`
|
||||||
|
let request = inbound_service
|
||||||
|
.clone()
|
||||||
|
.oneshot(Request::AdvertiseTransactionIds(txs.clone()));
|
||||||
|
// Ensure the mocked peer set responds
|
||||||
|
let peer_set_responder =
|
||||||
|
peer_set
|
||||||
|
.expect_request(Request::TransactionsById(txs))
|
||||||
|
.map(|responder| {
|
||||||
|
let unmined_transaction = UnminedTx {
|
||||||
|
id: test_transaction_id,
|
||||||
|
transaction: test_transaction,
|
||||||
|
};
|
||||||
|
|
||||||
|
responder.respond(Response::Transactions(vec![unmined_transaction]))
|
||||||
|
});
|
||||||
|
// Simulate a successful transaction verification
|
||||||
|
let verification = tx_verifier.expect_request_that(|_| true).map(|responder| {
|
||||||
|
let txid = responder.request().tx_id();
|
||||||
|
responder.respond(txid);
|
||||||
|
});
|
||||||
|
let (response, _, _) = futures::join!(request, peer_set_responder, verification);
|
||||||
|
|
||||||
|
match response {
|
||||||
|
Ok(Response::Nil) => (),
|
||||||
|
_ => unreachable!("`AdvertiseTransactionIds` requests should always respond `Ok(Nil)`"),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Use `Request::MempoolTransactionIds` to check the transaction was inserted to mempool
|
||||||
|
let request = inbound_service
|
||||||
|
.clone()
|
||||||
|
.oneshot(Request::MempoolTransactionIds)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match request {
|
||||||
|
Ok(Response::TransactionIds(response)) => {
|
||||||
|
assert_eq!(response, vec![test_transaction_id])
|
||||||
|
}
|
||||||
|
_ => unreachable!(
|
||||||
|
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
|
||||||
|
),
|
||||||
|
};
|
||||||
|
|
||||||
|
peer_set.expect_no_requests().await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn setup(
|
||||||
|
add_transactions: bool,
|
||||||
|
) -> (
|
||||||
|
LoadShed<tower::buffer::Buffer<super::Inbound, zebra_network::Request>>,
|
||||||
|
Option<Vec<UnminedTx>>,
|
||||||
|
MockService<transaction::Request, transaction::Response, PanicAssertion, TransactionError>,
|
||||||
|
MockService<Request, Response, PanicAssertion>,
|
||||||
|
) {
|
||||||
|
let network = Network::Mainnet;
|
||||||
|
let consensus_config = ConsensusConfig::default();
|
||||||
|
let state_config = StateConfig::ephemeral();
|
||||||
|
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
|
||||||
|
let address_book = Arc::new(std::sync::Mutex::new(address_book));
|
||||||
|
let (sync_status, _recent_syncs) = SyncStatus::new();
|
||||||
|
let (_state_service, _latest_chain_tip, chain_tip_change) =
|
||||||
|
zebra_state::init(state_config.clone(), network);
|
||||||
|
|
||||||
|
let (state, _, _) = zebra_state::init(state_config, network);
|
||||||
|
let state_service = ServiceBuilder::new().buffer(1).service(state);
|
||||||
|
|
||||||
|
let (block_verifier, _transaction_verifier) =
|
||||||
|
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let peer_set = MockService::build().for_unit_tests();
|
||||||
|
let buffered_peer_set = Buffer::new(BoxService::new(peer_set.clone()), 10);
|
||||||
|
|
||||||
|
let mock_tx_verifier = MockService::build().for_unit_tests();
|
||||||
|
let buffered_tx_verifier = Buffer::new(BoxService::new(mock_tx_verifier.clone()), 10);
|
||||||
|
|
||||||
|
let mut mempool_service = Mempool::new(
|
||||||
|
network,
|
||||||
|
buffered_peer_set.clone(),
|
||||||
|
state_service.clone(),
|
||||||
|
buffered_tx_verifier.clone(),
|
||||||
|
sync_status,
|
||||||
|
chain_tip_change,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut added_transactions = None;
|
||||||
|
if add_transactions {
|
||||||
|
added_transactions = Some(add_some_stuff_to_mempool(&mut mempool_service, network));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mempool_service = BoxService::new(mempool_service);
|
||||||
|
let mempool = ServiceBuilder::new().buffer(1).service(mempool_service);
|
||||||
|
|
||||||
|
let (setup_tx, setup_rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let inbound_service = ServiceBuilder::new()
|
||||||
|
.load_shed()
|
||||||
|
.buffer(1)
|
||||||
|
.service(super::Inbound::new(
|
||||||
|
setup_rx,
|
||||||
|
state_service.clone(),
|
||||||
|
block_verifier.clone(),
|
||||||
|
));
|
||||||
|
|
||||||
|
let r = setup_tx.send((buffered_peer_set, address_book, mempool));
|
||||||
|
// We can't expect or unwrap because the returned Result does not implement Debug
|
||||||
|
assert!(r.is_ok());
|
||||||
|
|
||||||
|
// Push the genesis block to the state
|
||||||
|
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
|
||||||
|
.zcash_deserialize_into()
|
||||||
|
.unwrap();
|
||||||
|
state_service
|
||||||
|
.oneshot(zebra_state::Request::CommitFinalizedBlock(
|
||||||
|
genesis_block.clone().into(),
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
(
|
||||||
|
inbound_service,
|
||||||
|
added_transactions,
|
||||||
|
mock_tx_verifier,
|
||||||
|
peer_set,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_some_stuff_to_mempool(mempool_service: &mut Mempool, network: Network) -> Vec<UnminedTx> {
|
fn add_some_stuff_to_mempool(mempool_service: &mut Mempool, network: Network) -> Vec<UnminedTx> {
|
||||||
|
|
Loading…
Reference in New Issue