Test if the mempool storage is cleared (#2815)

* Move mempool tests into `tests::vector` sub-module

Make it consistent with other test modules and prepare for adding
property tests.

* Reorder imports

Make it consistent with the general guidelines followed on other
modules.

* Export `ChainTipBlock` and `ChainTipSender`

Allow these types to be used in other crates for testing purposes.

* Derive `Arbitrary` for `ChainTipBlock`

Make it easy to generate random `ChainTipBlock`s for usage in property
tests.

* Refactor to move test methods into `tests` module

Reduce the repeated test configuration attributes and make it easier to
see what is test specific and what is part of the general
implementation.

* Add a `Mempool::dummy_call` test helper method

Performs a dummy call just so that `poll_ready` gets called.

* Use `dummy_call` in existing tests

Replace the custom dummy requests with the helper method.

* Test if the mempool is cleared on chain reset

A chain reset should force the mempool storage to be cleared so that
transaction verification can restart using the new chain tip.

* Test if mempool is cleared on syncer restart

If the block synchronizer falls behind and then starts catching up
again, the mempool should be disabled and therefore the storage should
be cleared.
This commit is contained in:
Janito Vaqueiro Ferreira Filho 2021-10-01 11:44:25 -03:00 committed by GitHub
parent 966f52a280
commit 50a5728d0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 783 additions and 623 deletions

View File

@ -6,7 +6,7 @@ license = "MIT OR Apache-2.0"
edition = "2018"
[features]
proptest-impl = ["proptest", "zebra-test"]
proptest-impl = ["proptest", "proptest-derive", "zebra-test"]
[dependencies]
zebra-chain = { path = "../zebra-chain" }
@ -34,6 +34,7 @@ rlimit = "0.5.4"
multiset = "0.0.5"
proptest = { version = "0.10.1", optional = true }
proptest-derive = { version = "0.3", optional = true }
zebra-test = { path = "../zebra-test/", optional = true }
[dev-dependencies]

View File

@ -41,6 +41,9 @@ pub use service::{
};
#[cfg(any(test, feature = "proptest-impl"))]
pub use service::init_test;
pub use service::{
chain_tip::{ChainTipBlock, ChainTipSender},
init_test,
};
pub(crate) use request::ContextuallyValidBlock;

View File

@ -10,6 +10,9 @@ use std::sync::Arc;
use tokio::sync::watch;
use tracing::instrument;
#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
use zebra_chain::{
block,
chain_tip::ChainTip,
@ -33,6 +36,7 @@ type ChainTipData = Option<ChainTipBlock>;
/// Used to efficiently update [`ChainTipSender`], [`LatestChainTip`],
/// and [`ChainTipChange`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
pub struct ChainTipBlock {
/// The hash of the best chain tip block.
pub hash: block::Hash,

View File

@ -61,6 +61,7 @@ proptest = "0.10"
proptest-derive = "0.3"
zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] }
zebra-state = { path = "../zebra-state", features = ["proptest-impl"] }
zebra-test = { path = "../zebra-test" }
[features]

View File

@ -40,8 +40,6 @@ use self::downloads::{
Downloads as TxDownloads, Gossip, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
};
#[cfg(test)]
use super::sync::RecentSyncLengths;
use super::sync::SyncStatus;
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
@ -175,44 +173,6 @@ impl Mempool {
}
}
/// Get the storage field of the mempool for testing purposes.
#[cfg(test)]
pub fn storage(&mut self) -> &mut storage::Storage {
match &mut self.active_state {
ActiveState::Disabled => panic!("mempool must be enabled"),
ActiveState::Enabled { storage, .. } => storage,
}
}
/// Get the transaction downloader of the mempool for testing purposes.
#[cfg(test)]
pub fn tx_downloads(&self) -> &Pin<Box<InboundTxDownloads>> {
match &self.active_state {
ActiveState::Disabled => panic!("mempool must be enabled"),
ActiveState::Enabled { tx_downloads, .. } => tx_downloads,
}
}
/// Enable the mempool by pretending the synchronization is close to the tip.
#[cfg(test)]
pub async fn enable(&mut self, recent_syncs: &mut RecentSyncLengths) {
use tower::ServiceExt;
// Pretend we're close to tip
SyncStatus::sync_close_to_tip(recent_syncs);
// Make a dummy request to poll the mempool and make it enable itself
let _ = self.oneshot(Request::TransactionIds).await;
}
/// Disable the mempool by pretending the synchronization is far from the tip.
#[cfg(test)]
pub async fn disable(&mut self, recent_syncs: &mut RecentSyncLengths) {
use tower::ServiceExt;
// Pretend we're far from the tip
SyncStatus::sync_far_from_tip(recent_syncs);
// Make a dummy request to poll the mempool and make it disable itself
let _ = self.oneshot(Request::TransactionIds).await;
}
/// Check if transaction should be downloaded and/or verified.
///
/// If it is already in the mempool (or in its rejected list)

View File

@ -1,590 +1,50 @@
use super::*;
use color_eyre::Report;
use std::{collections::HashSet, sync::Arc};
use storage::tests::unmined_transactions_in_blocks;
use tokio::time;
use tower::{ServiceBuilder, ServiceExt};
use std::pin::Pin;
use zebra_chain::block::Block;
use zebra_chain::serialization::ZcashDeserializeInto;
use zebra_consensus::Config as ConsensusConfig;
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::MockService;
use tower::ServiceExt;
#[tokio::test]
async fn mempool_service_basic() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
use super::{storage::Storage, ActiveState, InboundTxDownloads, Mempool, Request};
use crate::components::sync::{RecentSyncLengths, SyncStatus};
let state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
mod prop;
mod vector;
// get the genesis block transactions from the Zcash blockchain.
let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network);
let genesis_transaction = unmined_transactions
.next()
.expect("Missing genesis transaction");
let txid = unmined_transactions.next_back().unwrap().id;
let more_transactions = unmined_transactions;
// Start the mempool service
let mut service = Mempool::new(
network,
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
);
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage().insert(genesis_transaction.clone())?;
// Test `Request::TransactionIds`
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
let genesis_transaction_ids = match response {
Response::TransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};
// Test `Request::TransactionsById`
let genesis_transactions_hash_set = genesis_transaction_ids
.iter()
.copied()
.collect::<HashSet<_>>();
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionsById(
genesis_transactions_hash_set.clone(),
))
.await
.unwrap();
let transactions = match response {
Response::Transactions(transactions) => transactions,
_ => unreachable!("will never happen in this test"),
};
// Make sure the transaction from the blockchain test vector is the same as the
// response of `Request::TransactionsById`
assert_eq!(genesis_transaction, transactions[0]);
// Insert more transactions into the mempool storage.
// This will cause the genesis transaction to be moved into rejected.
// Skip the last (will be used later)
for tx in more_transactions {
service.storage().insert(tx.clone())?;
}
// Test `Request::RejectedTransactionIds`
let response = service
.ready_and()
.await
.unwrap()
.call(Request::RejectedTransactionIds(
genesis_transactions_hash_set,
))
.await
.unwrap();
let rejected_ids = match response {
Response::RejectedTransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(rejected_ids, genesis_transaction_ids);
// Test `Request::Queue`
// Use the ID of the last transaction in the list
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(service.tx_downloads().in_flight(), 1);
Ok(())
}
#[tokio::test]
async fn mempool_queue() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
// Get transactions to use in the test
let unmined_transactions = unmined_transactions_in_blocks(..=10, network);
let mut transactions = unmined_transactions;
// Split unmined_transactions into:
// [rejected_tx, transactions..., stored_tx, new_tx]
//
// The first transaction to be added in the mempool which will be eventually
// put in the rejected list
let rejected_tx = transactions.next().unwrap().clone();
// A transaction not in the mempool that will be Queued
let new_tx = transactions.next_back().unwrap();
// The last transaction that will be added in the mempool (and thus not rejected)
let stored_tx = transactions.next_back().unwrap().clone();
// Start the mempool service
let mut service = Mempool::new(
network,
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
);
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
// Insert [rejected_tx, transactions..., stored_tx] into the mempool storage.
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage().insert(rejected_tx.clone())?;
// Insert more transactions into the mempool storage.
// This will cause the `rejected_tx` to be moved into rejected.
for tx in transactions {
service.storage().insert(tx.clone())?;
}
service.storage().insert(stored_tx.clone())?;
// Test `Request::Queue` for a new transaction
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![new_tx.id.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
// Test `Request::Queue` for a transaction already in the mempool
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![stored_tx.id.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert_eq!(queued_responses[0], Err(MempoolError::InMempool));
// Test `Request::Queue` for a transaction rejected by the mempool
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![rejected_tx.id.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert_eq!(queued_responses[0], Err(MempoolError::Rejected));
Ok(())
}
#[tokio::test]
async fn mempool_service_disabled() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
// get the genesis block transactions from the Zcash blockchain.
let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network);
let genesis_transaction = unmined_transactions
.next()
.expect("Missing genesis transaction");
let more_transactions = unmined_transactions;
// Start the mempool service
let mut service = Mempool::new(
network,
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
);
// Test if mempool is disabled (it should start disabled)
assert!(!service.is_enabled());
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
assert!(service.is_enabled());
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage().insert(genesis_transaction.clone())?;
// Test if the mempool answers correctly (i.e. is enabled)
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
let _genesis_transaction_ids = match response {
Response::TransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};
// Queue a transaction for download
// Use the ID of the last transaction in the list
let txid = more_transactions.last().unwrap().id;
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(service.tx_downloads().in_flight(), 1);
// Disable the mempool
let _ = service.disable(&mut recent_syncs).await;
// Test if mempool is disabled again
assert!(!service.is_enabled());
// Test if the mempool returns no transactions when disabled
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
match response {
Response::TransactionIds(ids) => {
assert_eq!(
ids.len(),
0,
"mempool should return no transactions when disabled"
)
impl Mempool {
/// Get the storage field of the mempool for testing purposes.
pub fn storage(&mut self) -> &mut Storage {
match &mut self.active_state {
ActiveState::Disabled => panic!("mempool must be enabled"),
ActiveState::Enabled { storage, .. } => storage,
}
_ => unreachable!("will never happen in this test"),
};
// Test if the mempool returns to Queue requests correctly when disabled
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert_eq!(queued_responses[0], Err(MempoolError::Disabled));
Ok(())
}
#[tokio::test]
async fn mempool_cancel_mined() -> Result<(), Report> {
let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
.zcash_deserialize_into()
.unwrap();
let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
.zcash_deserialize_into()
.unwrap();
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
time::pause();
// Start the mempool service
let mut mempool = Mempool::new(
network,
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
);
// Enable the mempool
let _ = mempool.enable(&mut recent_syncs).await;
assert!(mempool.is_enabled());
// Push the genesis block to the state
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.ready_and()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();
// Query the mempool to make it poll chain_tip_change
let _response = mempool
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
// Push block 1 to the state
state_service
.ready_and()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
block1.clone().into(),
))
.await
.unwrap();
// Query the mempool to make it poll chain_tip_change
let _response = mempool
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
// Queue transaction from block 2 for download.
// It can't be queued before because block 1 triggers a network upgrade,
// which cancels all downloads.
let txid = block2.transactions[0].unmined_id();
let response = mempool
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(mempool.tx_downloads().in_flight(), 1);
// Push block 2 to the state
state_service
.oneshot(zebra_state::Request::CommitFinalizedBlock(
block2.clone().into(),
))
.await
.unwrap();
// This is done twice because after the first query the cancellation
// is picked up by select!, and after the second the mempool gets the
// result and the download future is removed.
for _ in 0..2 {
// Query the mempool just to poll it and make it cancel the download.
let _response = mempool
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
// Sleep to avoid starvation and make sure the cancellation is picked up.
time::sleep(time::Duration::from_millis(100)).await;
}
// Check if download was cancelled.
assert_eq!(mempool.tx_downloads().in_flight(), 0);
/// Get the transaction downloader of the mempool for testing purposes.
pub fn tx_downloads(&self) -> &Pin<Box<InboundTxDownloads>> {
match &self.active_state {
ActiveState::Disabled => panic!("mempool must be enabled"),
ActiveState::Enabled { tx_downloads, .. } => tx_downloads,
}
}
Ok(())
}
#[tokio::test]
async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> {
let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
.zcash_deserialize_into()
.unwrap();
let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
.zcash_deserialize_into()
.unwrap();
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
// Start the mempool service
let mut mempool = Mempool::new(
network,
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
);
// Enable the mempool
let _ = mempool.enable(&mut recent_syncs).await;
assert!(mempool.is_enabled());
// Push the genesis block to the state
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.ready_and()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();
// Queue transaction from block 2 for download
let txid = block2.transactions[0].unmined_id();
let response = mempool
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(mempool.tx_downloads().in_flight(), 1);
// Query the mempool to make it poll chain_tip_change
let _response = mempool
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
// Push block 1 to the state. This is considered a network upgrade,
// and thus must cancel all pending transaction downloads.
state_service
.ready_and()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
block1.clone().into(),
))
.await
.unwrap();
// Query the mempool to make it poll chain_tip_change
let _response = mempool
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
// Check if download was cancelled.
assert_eq!(mempool.tx_downloads().in_flight(), 0);
Ok(())
/// Enable the mempool by pretending the synchronization is close to the tip.
pub async fn enable(&mut self, recent_syncs: &mut RecentSyncLengths) {
// Pretend we're close to tip
SyncStatus::sync_close_to_tip(recent_syncs);
// Make a dummy request to poll the mempool and make it enable itself
self.dummy_call().await;
}
/// Disable the mempool by pretending the synchronization is far from the tip.
pub async fn disable(&mut self, recent_syncs: &mut RecentSyncLengths) {
// Pretend we're far from the tip
SyncStatus::sync_far_from_tip(recent_syncs);
// Make a dummy request to poll the mempool and make it disable itself
self.dummy_call().await;
}
/// Perform a dummy service call so that `poll_ready` is called.
pub async fn dummy_call(&mut self) {
self.oneshot(Request::Queue(vec![]))
.await
.expect("Queuing no transactions shouldn't fail");
}
}

View File

@ -0,0 +1,171 @@
use proptest::prelude::*;
use tokio::time;
use tower::{buffer::Buffer, util::BoxService};
use zebra_chain::{parameters::Network, transaction::UnminedTx};
use zebra_consensus::{error::TransactionError, transaction as tx};
use zebra_network as zn;
use zebra_state::{self as zs, ChainTipBlock, ChainTipSender};
use zebra_test::mock_service::{MockService, PropTestAssertion};
use super::super::Mempool;
use crate::components::sync::{RecentSyncLengths, SyncStatus};
/// A [`MockService`] representing the network service.
type MockPeerSet = MockService<zn::Request, zn::Response, PropTestAssertion>;
/// A [`MockService`] representing the Zebra state service.
type MockState = MockService<zs::Request, zs::Response, PropTestAssertion>;
/// A [`MockService`] representing the Zebra transaction verifier service.
type MockTxVerifier = MockService<tx::Request, tx::Response, PropTestAssertion, TransactionError>;
proptest! {
/// Test if the mempool storage is cleared on a chain reset.
#[test]
fn storage_is_cleared_on_chain_reset(
network in any::<Network>(),
transaction in any::<UnminedTx>(),
chain_tip in any::<ChainTipBlock>(),
) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");
let _guard = runtime.enter();
runtime.block_on(async move {
let (
mut mempool,
mut peer_set,
mut state_service,
mut tx_verifier,
mut recent_syncs,
mut chain_tip_sender,
) = setup(network);
time::pause();
mempool.enable(&mut recent_syncs).await;
// Insert a dummy transaction.
mempool
.storage()
.insert(transaction)
.expect("Inserting a transaction should succeed");
// The first call to `poll_ready` shouldn't clear the storage yet.
mempool.dummy_call().await;
prop_assert_eq!(mempool.storage().tx_ids().len(), 1);
// Simulate a chain reset.
chain_tip_sender.set_finalized_tip(chain_tip);
// This time a call to `poll_ready` should clear the storage.
mempool.dummy_call().await;
prop_assert!(mempool.storage().tx_ids().is_empty());
peer_set.expect_no_requests().await?;
state_service.expect_no_requests().await?;
tx_verifier.expect_no_requests().await?;
Ok(())
})?;
}
/// Test if the mempool storage is cleared if the syncer falls behind and starts to catch up.
#[test]
fn storage_is_cleared_if_syncer_falls_behind(
network in any::<Network>(),
transaction in any::<UnminedTx>(),
) {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");
let _guard = runtime.enter();
runtime.block_on(async move {
let (
mut mempool,
mut peer_set,
mut state_service,
mut tx_verifier,
mut recent_syncs,
_chain_tip_sender,
) = setup(network);
time::pause();
mempool.enable(&mut recent_syncs).await;
// Insert a dummy transaction.
mempool
.storage()
.insert(transaction)
.expect("Inserting a transaction should succeed");
// The first call to `poll_ready` shouldn't clear the storage yet.
mempool.dummy_call().await;
prop_assert_eq!(mempool.storage().tx_ids().len(), 1);
// Simulate the synchronizer catching up to the network chain tip.
mempool.disable(&mut recent_syncs).await;
// This time a call to `poll_ready` should clear the storage.
mempool.dummy_call().await;
// Enable the mempool again so the storage can be accessed.
mempool.enable(&mut recent_syncs).await;
prop_assert!(mempool.storage().tx_ids().is_empty());
peer_set.expect_no_requests().await?;
state_service.expect_no_requests().await?;
tx_verifier.expect_no_requests().await?;
Ok(())
})?;
}
}
/// Create a new [`Mempool`] instance using mocked services.
fn setup(
network: Network,
) -> (
Mempool,
MockPeerSet,
MockState,
MockTxVerifier,
RecentSyncLengths,
ChainTipSender,
) {
let peer_set = MockService::build().for_prop_tests();
let state_service = MockService::build().for_prop_tests();
let tx_verifier = MockService::build().for_prop_tests();
let (sync_status, recent_syncs) = SyncStatus::new();
let (chain_tip_sender, latest_chain_tip, chain_tip_change) = ChainTipSender::new(None, network);
let mempool = Mempool::new(
network,
Buffer::new(BoxService::new(peer_set.clone()), 1),
Buffer::new(BoxService::new(state_service.clone()), 1),
Buffer::new(BoxService::new(tx_verifier.clone()), 1),
sync_status,
latest_chain_tip,
chain_tip_change,
);
(
mempool,
peer_set,
state_service,
tx_verifier,
recent_syncs,
chain_tip_sender,
)
}

View File

@ -0,0 +1,560 @@
use std::{collections::HashSet, sync::Arc};
use color_eyre::Report;
use tokio::time;
use tower::{ServiceBuilder, ServiceExt};
use zebra_chain::{block::Block, serialization::ZcashDeserializeInto};
use zebra_consensus::Config as ConsensusConfig;
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::MockService;
use super::super::{storage::tests::unmined_transactions_in_blocks, *};
#[tokio::test]
async fn mempool_service_basic() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
// get the genesis block transactions from the Zcash blockchain.
let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network);
let genesis_transaction = unmined_transactions
.next()
.expect("Missing genesis transaction");
let last_transaction = unmined_transactions.next_back().unwrap();
let more_transactions = unmined_transactions;
// Start the mempool service
let mut service = Mempool::new(
network,
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
);
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage().insert(genesis_transaction.clone())?;
// Test `Request::TransactionIds`
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
let genesis_transaction_ids = match response {
Response::TransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};
// Test `Request::TransactionsById`
let genesis_transactions_hash_set = genesis_transaction_ids
.iter()
.copied()
.collect::<HashSet<_>>();
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionsById(
genesis_transactions_hash_set.clone(),
))
.await
.unwrap();
let transactions = match response {
Response::Transactions(transactions) => transactions,
_ => unreachable!("will never happen in this test"),
};
// Make sure the transaction from the blockchain test vector is the same as the
// response of `Request::TransactionsById`
assert_eq!(genesis_transaction, transactions[0]);
// Insert more transactions into the mempool storage.
// This will cause the genesis transaction to be moved into rejected.
// Skip the last (will be used later)
for tx in more_transactions {
service.storage().insert(tx.clone())?;
}
// Test `Request::RejectedTransactionIds`
let response = service
.ready_and()
.await
.unwrap()
.call(Request::RejectedTransactionIds(
genesis_transactions_hash_set,
))
.await
.unwrap();
let rejected_ids = match response {
Response::RejectedTransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(rejected_ids, genesis_transaction_ids);
// Test `Request::Queue`
// Use the ID of the last transaction in the list
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![last_transaction.id.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(service.tx_downloads().in_flight(), 1);
Ok(())
}
#[tokio::test]
async fn mempool_queue() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
// Get transactions to use in the test
let unmined_transactions = unmined_transactions_in_blocks(..=10, network);
let mut transactions = unmined_transactions;
// Split unmined_transactions into:
// [rejected_tx, transactions..., stored_tx, new_tx]
//
// The first transaction to be added in the mempool which will be eventually
// put in the rejected list
let rejected_tx = transactions.next().unwrap().clone();
// A transaction not in the mempool that will be Queued
let new_tx = transactions.next_back().unwrap();
// The last transaction that will be added in the mempool (and thus not rejected)
let stored_tx = transactions.next_back().unwrap().clone();
// Start the mempool service
let mut service = Mempool::new(
network,
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
);
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
// Insert [rejected_tx, transactions..., stored_tx] into the mempool storage.
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage().insert(rejected_tx.clone())?;
// Insert more transactions into the mempool storage.
// This will cause the `rejected_tx` to be moved into rejected.
for tx in transactions {
service.storage().insert(tx.clone())?;
}
service.storage().insert(stored_tx.clone())?;
// Test `Request::Queue` for a new transaction
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![new_tx.id.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
// Test `Request::Queue` for a transaction already in the mempool
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![stored_tx.id.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert_eq!(queued_responses[0], Err(MempoolError::InMempool));
// Test `Request::Queue` for a transaction rejected by the mempool
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![rejected_tx.id.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert_eq!(queued_responses[0], Err(MempoolError::Rejected));
Ok(())
}
#[tokio::test]
async fn mempool_service_disabled() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
// get the genesis block transactions from the Zcash blockchain.
let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network);
let genesis_transaction = unmined_transactions
.next()
.expect("Missing genesis transaction");
let more_transactions = unmined_transactions;
// Start the mempool service
let mut service = Mempool::new(
network,
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
);
// Test if mempool is disabled (it should start disabled)
assert!(!service.is_enabled());
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
assert!(service.is_enabled());
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage().insert(genesis_transaction.clone())?;
// Test if the mempool answers correctly (i.e. is enabled)
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
let _genesis_transaction_ids = match response {
Response::TransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};
// Queue a transaction for download
// Use the ID of the last transaction in the list
let txid = more_transactions.last().unwrap().id;
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(service.tx_downloads().in_flight(), 1);
// Disable the mempool
let _ = service.disable(&mut recent_syncs).await;
// Test if mempool is disabled again
assert!(!service.is_enabled());
// Test if the mempool returns no transactions when disabled
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
match response {
Response::TransactionIds(ids) => {
assert_eq!(
ids.len(),
0,
"mempool should return no transactions when disabled"
)
}
_ => unreachable!("will never happen in this test"),
};
// Test if the mempool returns to Queue requests correctly when disabled
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert_eq!(queued_responses[0], Err(MempoolError::Disabled));
Ok(())
}
#[tokio::test]
async fn mempool_cancel_mined() -> Result<(), Report> {
let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
.zcash_deserialize_into()
.unwrap();
let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
.zcash_deserialize_into()
.unwrap();
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
time::pause();
// Start the mempool service
let mut mempool = Mempool::new(
network,
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
);
// Enable the mempool
let _ = mempool.enable(&mut recent_syncs).await;
assert!(mempool.is_enabled());
// Push the genesis block to the state
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.ready_and()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();
// Query the mempool to make it poll chain_tip_change
mempool.dummy_call().await;
// Push block 1 to the state
state_service
.ready_and()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
block1.clone().into(),
))
.await
.unwrap();
// Query the mempool to make it poll chain_tip_change
mempool.dummy_call().await;
// Queue transaction from block 2 for download.
// It can't be queued before because block 1 triggers a network upgrade,
// which cancels all downloads.
let txid = block2.transactions[0].unmined_id();
let response = mempool
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(mempool.tx_downloads().in_flight(), 1);
// Push block 2 to the state
state_service
.oneshot(zebra_state::Request::CommitFinalizedBlock(
block2.clone().into(),
))
.await
.unwrap();
// This is done twice because after the first query the cancellation
// is picked up by select!, and after the second the mempool gets the
// result and the download future is removed.
for _ in 0..2 {
// Query the mempool just to poll it and make it cancel the download.
mempool.dummy_call().await;
// Sleep to avoid starvation and make sure the cancellation is picked up.
time::sleep(time::Duration::from_millis(100)).await;
}
// Check if download was cancelled.
assert_eq!(mempool.tx_downloads().in_flight(), 0);
Ok(())
}
#[tokio::test]
async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> {
let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
.zcash_deserialize_into()
.unwrap();
let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
.zcash_deserialize_into()
.unwrap();
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
// Start the mempool service
let mut mempool = Mempool::new(
network,
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
);
// Enable the mempool
let _ = mempool.enable(&mut recent_syncs).await;
assert!(mempool.is_enabled());
// Push the genesis block to the state
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.ready_and()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();
// Queue transaction from block 2 for download
let txid = block2.transactions[0].unmined_id();
let response = mempool
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(mempool.tx_downloads().in_flight(), 1);
// Query the mempool to make it poll chain_tip_change
mempool.dummy_call().await;
// Push block 1 to the state. This is considered a network upgrade,
// and thus must cancel all pending transaction downloads.
state_service
.ready_and()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
block1.clone().into(),
))
.await
.unwrap();
// Query the mempool to make it poll chain_tip_change
mempool.dummy_call().await;
// Check if download was cancelled.
assert_eq!(mempool.tx_downloads().in_flight(), 0);
Ok(())
}