Pass the mempool config to the mempool (#2861)

* Split mempool config into its own module

Also:
- expand config docs
- clean up mempool imports

* Pass the mempool config to the mempool

* Create the transaction sender channel inside the mempool 1/2

This simplifies all the code that calls the mempool.

Also:
- update the mempool enabled state before returning the new mempool
- add some test module doc comments

* Refactor a setup function out of the mempool unit tests 2/2

Also:
- update the setup function to handle the latest mempool changes

* Clarify a comment

Co-authored-by: Conrado Gouvea <conrado@zfnd.org>
This commit is contained in:
teor 2021-10-13 03:31:54 +10:00 committed by GitHub
parent 09f23cb2e0
commit b274ee4066
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 152 additions and 226 deletions

View File

@ -26,10 +26,8 @@
use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
use color_eyre::eyre::{eyre, Report};
use futures::{select, FutureExt};
use std::collections::HashSet;
use tokio::sync::oneshot;
use tower::builder::ServiceBuilder;
use tower::util::BoxService;
use tower::{builder::ServiceBuilder, util::BoxService};
use crate::{
components::{
@ -92,20 +90,17 @@ impl StartCmd {
ChainSync::new(&config, peer_set.clone(), state.clone(), chain_verifier);
info!("initializing mempool");
let (mempool_transaction_sender, mempool_transaction_receiver) =
tokio::sync::watch::channel(HashSet::new());
let mempool_service = BoxService::new(Mempool::new(
let (mempool, mempool_transaction_receiver) = Mempool::new(
&config.mempool,
peer_set.clone(),
state,
tx_verifier,
sync_status.clone(),
latest_chain_tip,
chain_tip_change.clone(),
mempool_transaction_sender,
));
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);
);
let mempool = BoxService::new(mempool);
let mempool = ServiceBuilder::new().buffer(20).service(mempool);
setup_tx
.send((peer_set.clone(), address_book, mempool.clone()))

View File

@ -1,3 +1,5 @@
//! Inbound service tests.
use std::{collections::HashSet, iter::FromIterator, net::SocketAddr, str::FromStr, sync::Arc};
use futures::FutureExt;
@ -576,16 +578,14 @@ async fn setup(
.unwrap();
committed_blocks.push(block_one);
let (transaction_sender, transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
let mut mempool_service = Mempool::new(
let (mut mempool_service, transaction_receiver) = Mempool::new(
&mempool::Config::default(),
buffered_peer_set.clone(),
state_service.clone(),
buffered_tx_verifier.clone(),
sync_status.clone(),
latest_chain_tip,
chain_tip_change.clone(),
transaction_sender,
);
// Enable the mempool

View File

@ -1,14 +1,11 @@
//! Zebra mempool.
use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
future::Future,
iter,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use futures::{future::FutureExt, stream::Stream};
@ -24,8 +21,9 @@ use zebra_network as zn;
use zebra_state as zs;
use zebra_state::{ChainTipChange, TipAction};
pub use crate::BoxError;
use crate::components::sync::SyncStatus;
mod config;
mod crawler;
pub mod downloads;
mod error;
@ -35,23 +33,24 @@ mod storage;
#[cfg(test)]
mod tests;
pub use self::crawler::Crawler;
pub use self::error::MempoolError;
pub use self::storage::{
pub use crate::BoxError;
pub use config::Config;
pub use crawler::Crawler;
pub use error::MempoolError;
pub use gossip::gossip_mempool_transaction_id;
pub use storage::{
ExactTipRejectionError, SameEffectsChainRejectionError, SameEffectsTipRejectionError,
};
#[cfg(test)]
pub use self::storage::tests::unmined_transactions_in_blocks;
pub use gossip::gossip_mempool_transaction_id;
pub use storage::tests::unmined_transactions_in_blocks;
use self::downloads::{
use downloads::{
Downloads as TxDownloads, Gossip, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT,
TRANSACTION_VERIFY_TIMEOUT,
};
use super::sync::SyncStatus;
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
type TxVerifier = Buffer<
@ -97,30 +96,6 @@ enum ActiveState {
},
}
/// Mempool configuration section.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Config {
/// The transaction cost limit
pub tx_cost_limit: u32,
/// Max amount of minutes for transactions to be in recently evicted
pub eviction_memory_time: Duration,
}
/// Consensus rules:
///
/// - There MUST be a configuration option mempooltxcostlimit, which SHOULD default to 80000000.
/// - There MUST be a configuration option mempoolevictionmemoryminutes, which SHOULD default to 60.
///
/// https://zips.z.cash/zip-0401#specification
impl Default for Config {
fn default() -> Self {
Self {
tx_cost_limit: 80_000_000,
eviction_memory_time: Duration::from_secs(60 * 60),
}
}
}
/// Mempool async management and query service.
///
/// The mempool is the set of all verified transactions that this node is aware
@ -157,15 +132,18 @@ pub struct Mempool {
impl Mempool {
pub(crate) fn new(
_config: &Config,
outbound: Outbound,
state: State,
tx_verifier: TxVerifier,
sync_status: SyncStatus,
latest_chain_tip: zs::LatestChainTip,
chain_tip_change: ChainTipChange,
transaction_sender: watch::Sender<HashSet<UnminedTxId>>,
) -> Self {
Mempool {
) -> (Self, watch::Receiver<HashSet<UnminedTxId>>) {
let (transaction_sender, transaction_receiver) =
tokio::sync::watch::channel(HashSet::new());
let mut service = Mempool {
active_state: ActiveState::Disabled,
sync_status,
latest_chain_tip,
@ -174,7 +152,13 @@ impl Mempool {
state,
tx_verifier,
transaction_sender,
}
};
// Make sure `is_enabled` is accurate.
// Otherwise, it is only updated in `poll_ready`, right before each service call.
service.update_state();
(service, transaction_receiver)
}
/// Update the mempool state (enabled / disabled) depending on how close to

View File

@ -0,0 +1,46 @@
//! User-configurable mempool parameters.
use std::time::Duration;
use serde::{Deserialize, Serialize};
/// Mempool configuration section.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Config {
/// The mempool transaction cost limit.
///
/// This limits the total serialized byte size of all transactions in the mempool.
///
/// This corresponds to `mempooltxcostlimit` from [ZIP-401](https://zips.z.cash/zip-0401#specification).
pub tx_cost_limit: u32,
/// The mempool transaction eviction age limit.
///
/// This limits the maximum amount of time evicted transaction IDs stay in the mempool rejection list.
/// Transactions are randomly evicted from the mempool when the mempool reaches [`tx_cost_limit`].
///
/// (Transactions can also be rejected by the mempool for other reasons.
/// Different rejection reasons can have different age limits.)
///
/// This corresponds to `mempoolevictionmemoryminutes` from
/// [ZIP-401](https://zips.z.cash/zip-0401#specification).
pub eviction_memory_time: Duration,
}
/// Consensus rules:
///
/// > There MUST be a configuration option mempooltxcostlimit,
/// > which SHOULD default to 80000000.
/// >
/// > There MUST be a configuration option mempoolevictionmemoryminutes,
/// > which SHOULD default to 60 [minutes].
///
/// https://zips.z.cash/zip-0401#specification
impl Default for Config {
fn default() -> Self {
Self {
tx_cost_limit: 80_000_000,
eviction_memory_time: Duration::from_secs(60 * 60),
}
}
}

View File

@ -1,5 +1,6 @@
//! Randomised property tests for the mempool.
use proptest::prelude::*;
use std::collections::HashSet;
use tokio::time;
use tower::{buffer::Buffer, util::BoxService};
@ -9,8 +10,10 @@ use zebra_network as zn;
use zebra_state::{self as zs, ChainTipBlock, ChainTipSender};
use zebra_test::mock_service::{MockService, PropTestAssertion};
use super::super::Mempool;
use crate::components::sync::{RecentSyncLengths, SyncStatus};
use crate::components::{
mempool::{self, Mempool},
sync::{RecentSyncLengths, SyncStatus},
};
/// A [`MockService`] representing the network service.
type MockPeerSet = MockService<zn::Request, zn::Response, PropTestAssertion>;
@ -151,16 +154,14 @@ fn setup(
let (sync_status, recent_syncs) = SyncStatus::new();
let (chain_tip_sender, latest_chain_tip, chain_tip_change) = ChainTipSender::new(None, network);
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
let mempool = Mempool::new(
let (mempool, _transaction_receiver) = Mempool::new(
&mempool::Config::default(),
Buffer::new(BoxService::new(peer_set.clone()), 1),
Buffer::new(BoxService::new(state_service.clone()), 1),
Buffer::new(BoxService::new(tx_verifier.clone()), 1),
sync_status,
latest_chain_tip,
chain_tip_change,
transaction_sender,
);
(

View File

@ -1,3 +1,5 @@
//! Fixed test vectors for the mempool.
use std::{collections::HashSet, sync::Arc};
use color_eyre::Report;
@ -5,27 +7,31 @@ use tokio::time;
use tower::{ServiceBuilder, ServiceExt};
use zebra_chain::{block::Block, parameters::Network, serialization::ZcashDeserializeInto};
use zebra_consensus::Config as ConsensusConfig;
use zebra_consensus::transaction as tx;
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::MockService;
use zebra_test::mock_service::{MockService, PanicAssertion};
use super::super::{storage::tests::unmined_transactions_in_blocks, *};
use crate::components::{
mempool::{self, storage::tests::unmined_transactions_in_blocks, *},
sync::RecentSyncLengths,
};
/// A [`MockService`] representing the network service.
type MockPeerSet = MockService<zn::Request, zn::Response, PanicAssertion>;
/// The unmocked Zebra state service's type.
type StateService = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
/// A [`MockService`] representing the Zebra transaction verifier service.
type MockTxVerifier = MockService<tx::Request, tx::Response, PanicAssertion, TransactionError>;
#[tokio::test]
async fn mempool_service_basic() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) =
setup(network).await;
// get the genesis block transactions from the Zcash blockchain.
let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network);
@ -35,19 +41,6 @@ async fn mempool_service_basic() -> Result<(), Report> {
let last_transaction = unmined_transactions.next_back().unwrap();
let more_transactions = unmined_transactions;
// Start the mempool service
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
let mut service = Mempool::new(
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
transaction_sender,
);
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
@ -138,17 +131,9 @@ async fn mempool_service_basic() -> Result<(), Report> {
async fn mempool_queue() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) =
setup(network).await;
// Get transactions to use in the test
let unmined_transactions = unmined_transactions_in_blocks(..=10, network);
@ -164,19 +149,6 @@ async fn mempool_queue() -> Result<(), Report> {
// The last transaction that will be added in the mempool (and thus not rejected)
let stored_tx = transactions.next_back().unwrap().clone();
// Start the mempool service
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
let mut service = Mempool::new(
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
transaction_sender,
);
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
@ -247,16 +219,9 @@ async fn mempool_queue() -> Result<(), Report> {
async fn mempool_service_disabled() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) =
setup(network).await;
// get the genesis block transactions from the Zcash blockchain.
let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network);
@ -265,19 +230,6 @@ async fn mempool_service_disabled() -> Result<(), Report> {
.expect("Missing genesis transaction");
let more_transactions = unmined_transactions;
// Start the mempool service
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
let mut service = Mempool::new(
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
transaction_sender,
);
// Test if mempool is disabled (it should start disabled)
assert!(!service.is_enabled());
@ -374,33 +326,12 @@ async fn mempool_cancel_mined() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
let (mut mempool, _peer_set, mut state_service, _tx_verifier, mut recent_syncs) =
setup(network).await;
time::pause();
// Start the mempool service
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
let mut mempool = Mempool::new(
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
transaction_sender,
);
// Enable the mempool
let _ = mempool.enable(&mut recent_syncs).await;
assert!(mempool.is_enabled());
@ -490,30 +421,9 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report>
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
// Start the mempool service
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
let mut mempool = Mempool::new(
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
transaction_sender,
);
let (mut mempool, _peer_set, mut state_service, _tx_verifier, mut recent_syncs) =
setup(network).await;
// Enable the mempool
let _ = mempool.enable(&mut recent_syncs).await;
@ -579,18 +489,9 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report>
async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, _tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
let mut tx_verifier = MockService::build().for_unit_tests();
let (mut mempool, _peer_set, mut state_service, mut tx_verifier, mut recent_syncs) =
setup(network).await;
// Get transactions to use in the test
let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network);
@ -598,19 +499,6 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
time::pause();
// Start the mempool service
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
let mut mempool = Mempool::new(
Buffer::new(BoxService::new(peer_set.clone()), 1),
state_service.clone(),
Buffer::new(BoxService::new(tx_verifier.clone()), 1),
sync_status,
latest_chain_tip,
chain_tip_change,
transaction_sender,
);
// Enable the mempool
let _ = mempool.enable(&mut recent_syncs).await;
@ -684,17 +572,9 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let mut peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
let (mut mempool, mut peer_set, mut state_service, _tx_verifier, mut recent_syncs) =
setup(network).await;
// Get transactions to use in the test
let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network);
@ -702,19 +582,6 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
time::pause();
// Start the mempool service
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
let mut mempool = Mempool::new(
Buffer::new(BoxService::new(peer_set.clone()), 1),
state_service.clone(),
tx_verifier,
sync_status,
latest_chain_tip,
chain_tip_change,
transaction_sender,
);
// Enable the mempool
let _ = mempool.enable(&mut recent_syncs).await;
@ -778,3 +645,36 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
Ok(())
}
/// Create a new [`Mempool`] instance using mocked services.
async fn setup(
network: Network,
) -> (
Mempool,
MockPeerSet,
StateService,
MockTxVerifier,
RecentSyncLengths,
) {
let peer_set = MockService::build().for_unit_tests();
let state_config = StateConfig::ephemeral();
let (state, latest_chain_tip, chain_tip_change) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let tx_verifier = MockService::build().for_unit_tests();
let (sync_status, recent_syncs) = SyncStatus::new();
let (mempool, _mempool_transaction_receiver) = Mempool::new(
&mempool::Config::default(),
Buffer::new(BoxService::new(peer_set.clone()), 1),
state_service.clone(),
Buffer::new(BoxService::new(tx_verifier.clone()), 1),
sync_status,
latest_chain_tip,
chain_tip_change,
);
(mempool, peer_set, state_service, tx_verifier, recent_syncs)
}