Restore and update mempool tests (#2966)

* Restore mempool_storage_basic

* Restore storage_is_cleared_on_chain_resets

* Restore mempool_service_basic() and mempool_queue()

* Fix tests and repeat multiple times to catch intermittent bugs

Co-authored-by: Deirdre Connolly <deirdre@zfnd.org>
Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
This commit is contained in:
Conrado Gouvea 2021-10-28 17:55:05 -03:00 committed by GitHub
parent 8f04c9a243
commit df65b8cb65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 436 additions and 8 deletions

View File

@ -18,6 +18,9 @@ use crate::components::mempool::{
/// so we use a large enough value that will never be reached in the tests.
const EVICTION_MEMORY_TIME: Duration = Duration::from_secs(60 * 60);
/// Transaction count used in some tests to derive the mempool test size.
const MEMPOOL_TX_COUNT: usize = 4;
#[test]
fn mempool_storage_crud_exact_mainnet() {
zebra_test::init();
@ -50,6 +53,101 @@ fn mempool_storage_crud_exact_mainnet() {
assert!(!storage.contains_transaction_exact(&unmined_tx.transaction.id));
}
#[test]
fn mempool_storage_basic() -> Result<()> {
zebra_test::init();
// Test multiple times to catch intermittent bugs since eviction is randomized
for _ in 0..10 {
mempool_storage_basic_for_network(Network::Mainnet)?;
mempool_storage_basic_for_network(Network::Testnet)?;
}
Ok(())
}
fn mempool_storage_basic_for_network(network: Network) -> Result<()> {
// Get transactions from the first 10 blocks of the Zcash blockchain
let unmined_transactions: Vec<_> = unmined_transactions_in_blocks(..=10, network).collect();
assert!(
MEMPOOL_TX_COUNT < unmined_transactions.len(),
"inconsistent MEMPOOL_TX_COUNT value for this test; decrease it"
);
// Use the sum of the costs of the first `MEMPOOL_TX_COUNT` transactions
// as the cost limit
let tx_cost_limit = unmined_transactions
.iter()
.take(MEMPOOL_TX_COUNT)
.map(|tx| tx.cost())
.sum();
// Create an empty storage
let mut storage: Storage = Storage::new(&config::Config {
tx_cost_limit,
..Default::default()
});
// Insert them all to the storage
let mut maybe_inserted_transactions = Vec::new();
let mut some_rejected_transactions = Vec::new();
for unmined_transaction in unmined_transactions.clone() {
let result = storage.insert(unmined_transaction.clone());
match result {
Ok(_) => {
// While the transaction was inserted here, it can be rejected later.
maybe_inserted_transactions.push(unmined_transaction);
}
Err(_) => {
// Other transactions can be rejected on a successful insert,
// so not all rejected transactions will be added.
// Note that `some_rejected_transactions` can be empty since `insert` only
// returns a rejection error if the transaction being inserted is the one
// that was randomly evicted.
some_rejected_transactions.push(unmined_transaction);
}
}
}
// Since transactions are rejected randomly we can't test exact numbers.
// We know the first MEMPOOL_TX_COUNT must have been inserted successfully.
assert!(maybe_inserted_transactions.len() >= MEMPOOL_TX_COUNT);
assert_eq!(
some_rejected_transactions.len() + maybe_inserted_transactions.len(),
unmined_transactions.len()
);
// Test if the actual number of inserted/rejected transactions is consistent.
assert!(storage.verified.transaction_count() <= maybe_inserted_transactions.len());
assert!(storage.rejected_transaction_count() >= some_rejected_transactions.len());
// Test if rejected transactions were actually rejected.
for tx in some_rejected_transactions.iter() {
assert!(!storage.contains_transaction_exact(&tx.transaction.id));
}
// Query all the ids we have for rejected, get back `total - MEMPOOL_SIZE`
let all_ids: HashSet<UnminedTxId> = unmined_transactions
.iter()
.map(|tx| tx.transaction.id)
.collect();
// Convert response to a `HashSet`, because the order of the response doesn't matter.
let all_rejected_ids: HashSet<UnminedTxId> =
storage.rejected_transactions(all_ids).into_iter().collect();
let some_rejected_ids = some_rejected_transactions
.iter()
.map(|tx| tx.transaction.id)
.collect::<HashSet<_>>();
// Test if the rejected transactions we have are a subset of the actually
// rejected transactions.
assert!(some_rejected_ids.is_subset(&all_rejected_ids));
Ok(())
}
#[test]
fn mempool_storage_crud_same_effects_mainnet() {
zebra_test::init();

View File

@ -1,12 +1,13 @@
//! Randomised property tests for the mempool.
use proptest::collection::vec;
use proptest::prelude::*;
use proptest_derive::Arbitrary;
use tokio::time;
use tower::{buffer::Buffer, util::BoxService};
use zebra_chain::{parameters::Network, transaction::VerifiedUnminedTx};
use zebra_chain::{block, parameters::Network, transaction::VerifiedUnminedTx};
use zebra_consensus::{error::TransactionError, transaction as tx};
use zebra_network as zn;
use zebra_state::{self as zs, ChainTipBlock, ChainTipSender};
@ -26,6 +27,8 @@ type MockState = MockService<zs::Request, zs::Response, PropTestAssertion>;
/// A [`MockService`] representing the Zebra transaction verifier service.
type MockTxVerifier = MockService<tx::Request, tx::Response, PropTestAssertion, TransactionError>;
const CHAIN_LENGTH: usize = 10;
proptest! {
/// Test if the mempool storage is cleared on a chain reset.
#[test]
@ -81,6 +84,94 @@ proptest! {
})?;
}
/// Test if the mempool storage is cleared on multiple chain resets.
#[test]
fn storage_is_cleared_on_chain_resets(
network in any::<Network>(),
mut previous_chain_tip in any::<ChainTipBlock>(),
mut transactions in vec(any::<VerifiedUnminedTx>(), 0..CHAIN_LENGTH),
fake_chain_tips in vec(any::<FakeChainTip>(), 0..CHAIN_LENGTH),
) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");
let _guard = runtime.enter();
runtime.block_on(async move {
let (
mut mempool,
mut peer_set,
mut state_service,
mut tx_verifier,
mut recent_syncs,
mut chain_tip_sender,
) = setup(network);
time::pause();
mempool.enable(&mut recent_syncs).await;
// Set the initial chain tip.
chain_tip_sender.set_best_non_finalized_tip(previous_chain_tip.clone());
// Call the mempool so that it is aware of the initial chain tip.
mempool.dummy_call().await;
for (fake_chain_tip, transaction) in fake_chain_tips.iter().zip(transactions.iter_mut()) {
// Obtain a new chain tip based on the previous one.
let chain_tip = fake_chain_tip.to_chain_tip_block(&previous_chain_tip);
// Adjust the transaction expiry height based on the new chain
// tip height so that the mempool does not evict the transaction
// when there is a chain growth.
if let Some(expiry_height) = transaction.transaction.transaction.expiry_height() {
if chain_tip.height >= expiry_height {
let mut tmp_tx = (*transaction.transaction.transaction).clone();
// Set a new expiry height that is greater than the
// height of the current chain tip.
*tmp_tx.expiry_height_mut() = block::Height(chain_tip.height.0 + 1);
transaction.transaction = tmp_tx.into();
}
}
// Insert the dummy transaction into the mempool.
mempool
.storage()
.insert(transaction.clone())
.expect("Inserting a transaction should succeed");
// Set the new chain tip.
chain_tip_sender.set_best_non_finalized_tip(chain_tip.clone());
// Call the mempool so that it is aware of the new chain tip.
mempool.dummy_call().await;
match fake_chain_tip {
FakeChainTip::Grow(_) => {
// The mempool should not be empty because we had a regular chain growth.
prop_assert_ne!(mempool.storage().transaction_count(), 0);
}
FakeChainTip::Reset(_) => {
// The mempool should be empty because we had a chain tip reset.
prop_assert_eq!(mempool.storage().transaction_count(), 0);
},
}
// Remember the current chain tip so that the next one can refer to it.
previous_chain_tip = chain_tip;
}
peer_set.expect_no_requests().await?;
state_service.expect_no_requests().await?;
tx_verifier.expect_no_requests().await?;
Ok(())
})?;
}
/// Test if the mempool storage is cleared if the syncer falls behind and starts to catch up.
#[test]
fn storage_is_cleared_if_syncer_falls_behind(
@ -185,3 +276,21 @@ enum FakeChainTip {
Grow(ChainTipBlock),
Reset(ChainTipBlock),
}
impl FakeChainTip {
/// Returns a new [`ChainTipBlock`] placed on top of the previous block if
/// the chain is supposed to grow. Otherwise returns a [`ChainTipBlock`]
/// that does not reference the previous one.
fn to_chain_tip_block(&self, previous: &ChainTipBlock) -> ChainTipBlock {
match self {
Self::Grow(chain_tip_block) => ChainTipBlock {
hash: chain_tip_block.hash,
height: block::Height(previous.height.0 + 1),
transaction_hashes: chain_tip_block.transaction_hashes.clone(),
previous_block_hash: previous.hash,
},
Self::Reset(chain_tip_block) => chain_tip_block.clone(),
}
}
}

View File

@ -1,6 +1,6 @@
//! Fixed test vectors for the mempool.
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};
use color_eyre::Report;
use tokio::time;
@ -25,13 +25,233 @@ type StateService = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>,
/// A [`MockService`] representing the Zebra transaction verifier service.
type MockTxVerifier = MockService<tx::Request, tx::Response, PanicAssertion, TransactionError>;
#[tokio::test]
async fn mempool_service_basic() -> Result<(), Report> {
// Test multiple times to catch intermittent bugs since eviction is randomized
for _ in 0..10 {
mempool_service_basic_single().await?;
}
Ok(())
}
async fn mempool_service_basic_single() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
// get the genesis block transactions from the Zcash blockchain.
let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network);
let genesis_transaction = unmined_transactions
.next()
.expect("Missing genesis transaction");
let last_transaction = unmined_transactions.next_back().unwrap();
let more_transactions = unmined_transactions.collect::<Vec<_>>();
// Use as cost limit the costs of all transactions that will be
// inserted except one (the genesis block transaction).
let cost_limit = more_transactions.iter().map(|tx| tx.cost()).sum();
let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) =
setup(network, cost_limit).await;
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
// Insert the genesis block coinbase transaction into the mempool storage.
let mut inserted_ids = HashSet::new();
service.storage().insert(genesis_transaction.clone())?;
inserted_ids.insert(genesis_transaction.transaction.id);
// Test `Request::TransactionIds`
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
let genesis_transaction_ids = match response {
Response::TransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};
// Test `Request::TransactionsById`
let genesis_transactions_hash_set = genesis_transaction_ids
.iter()
.copied()
.collect::<HashSet<_>>();
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionsById(
genesis_transactions_hash_set.clone(),
))
.await
.unwrap();
let transactions = match response {
Response::Transactions(transactions) => transactions,
_ => unreachable!("will never happen in this test"),
};
// Make sure the transaction from the blockchain test vector is the same as the
// response of `Request::TransactionsById`
assert_eq!(genesis_transaction.transaction, transactions[0]);
// Insert more transactions into the mempool storage.
// This will cause the genesis transaction to be moved into rejected.
// Skip the last (will be used later)
for tx in more_transactions {
inserted_ids.insert(tx.transaction.id);
// Error must be ignored because a insert can trigger an eviction and
// an error is returned if the transaction being inserted in chosen.
let _ = service.storage().insert(tx.clone());
}
// Test `Request::RejectedTransactionIds`
let response = service
.ready_and()
.await
.unwrap()
.call(Request::RejectedTransactionIds(
genesis_transactions_hash_set,
))
.await
.unwrap();
let rejected_ids = match response {
Response::RejectedTransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};
assert!(rejected_ids.is_subset(&inserted_ids));
// Test `Request::Queue`
// Use the ID of the last transaction in the list
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![last_transaction.transaction.id.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(service.tx_downloads().in_flight(), 1);
Ok(())
}
#[tokio::test]
async fn mempool_queue() -> Result<(), Report> {
// Test multiple times to catch intermittent bugs since eviction is randomized
for _ in 0..10 {
mempool_queue_single().await?;
}
Ok(())
}
async fn mempool_queue_single() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
// Get transactions to use in the test
let unmined_transactions = unmined_transactions_in_blocks(..=10, network);
let mut transactions = unmined_transactions.collect::<Vec<_>>();
// Split unmined_transactions into:
// [transactions..., new_tx]
// A transaction not in the mempool that will be Queued
let new_tx = transactions.pop().unwrap();
// Use as cost limit the costs of all transactions that will be
// inserted except the last.
let cost_limit = transactions
.iter()
.take(transactions.len() - 1)
.map(|tx| tx.cost())
.sum();
let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) =
setup(network, cost_limit).await;
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
// Insert [transactions...] into the mempool storage.
// This will cause the at least one transaction to be rejected, since
// the cost limit is the sum of all costs except of the last transaction.
for tx in transactions.iter() {
// Error must be ignored because a insert can trigger an eviction and
// an error is returned if the transaction being inserted in chosen.
let _ = service.storage().insert(tx.clone());
}
// Test `Request::Queue` for a new transaction
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![new_tx.transaction.id.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
// Test `Request::Queue` with all previously inserted transactions.
// They should all be rejected; either because they are already in the mempool,
// or because they are in the recently evicted list.
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(
transactions
.iter()
.map(|tx| tx.transaction.id.into())
.collect(),
))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), transactions.len());
// Check if the responses are consistent
let mut in_mempool_count = 0;
let mut evicted_count = 0;
for response in queued_responses {
match response {
Ok(_) => panic!("all transactions should have been rejected"),
Err(e) => match e {
MempoolError::StorageEffectsChain(
SameEffectsChainRejectionError::RandomlyEvicted,
) => evicted_count += 1,
MempoolError::InMempool => in_mempool_count += 1,
_ => panic!("transaction should not be rejected with reason {:?}", e),
},
}
}
assert_eq!(in_mempool_count, transactions.len() - 1);
assert_eq!(evicted_count, 1);
Ok(())
}
#[tokio::test]
async fn mempool_service_disabled() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) =
setup(network).await;
setup(network, u64::MAX).await;
// get the genesis block transactions from the Zcash blockchain.
let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network);
@ -138,7 +358,7 @@ async fn mempool_cancel_mined() -> Result<(), Report> {
let network = Network::Mainnet;
let (mut mempool, _peer_set, mut state_service, _tx_verifier, mut recent_syncs) =
setup(network).await;
setup(network, u64::MAX).await;
time::pause();
@ -233,7 +453,7 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report>
let network = Network::Mainnet;
let (mut mempool, _peer_set, mut state_service, _tx_verifier, mut recent_syncs) =
setup(network).await;
setup(network, u64::MAX).await;
// Enable the mempool
let _ = mempool.enable(&mut recent_syncs).await;
@ -301,7 +521,7 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
let network = Network::Mainnet;
let (mut mempool, _peer_set, mut state_service, mut tx_verifier, mut recent_syncs) =
setup(network).await;
setup(network, u64::MAX).await;
// Get transactions to use in the test
let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network);
@ -384,7 +604,7 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
let network = Network::Mainnet;
let (mut mempool, mut peer_set, mut state_service, _tx_verifier, mut recent_syncs) =
setup(network).await;
setup(network, u64::MAX).await;
// Get transactions to use in the test
let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network);
@ -465,6 +685,7 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
/// Create a new [`Mempool`] instance using mocked services.
async fn setup(
network: Network,
tx_cost_limit: u64,
) -> (
Mempool,
MockPeerSet,
@ -484,7 +705,7 @@ async fn setup(
let (mempool, _mempool_transaction_receiver) = Mempool::new(
&mempool::Config {
tx_cost_limit: u64::MAX,
tx_cost_limit,
..Default::default()
},
Buffer::new(BoxService::new(peer_set.clone()), 1),