Cancel download and verify tasks when the mempool is deactivated (#2764)

* Cancel download and verify tasks when the mempool is deactivated

* Refactor enable/disable logic to use a state enum

* Add helper test functions to enable/disable the mempool

* Add documentation about errors on service calls

* Improvements from review

* Improve documentation

* Fix bug in test

* Apply suggestions from code review

Co-authored-by: teor <teor@riseup.net>

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Conrado Gouvea 2021-09-28 20:06:40 -03:00 committed by GitHub
parent 1601c9fbb3
commit c6878d9b63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 389 additions and 88 deletions

View File

@ -237,6 +237,10 @@ impl Service<zn::Request> for Inbound {
Poll::Ready(result)
}
/// Call the inbound service.
///
/// Errors indicate that the peer has done something wrong or unexpected,
/// and will cause callers to disconnect from the remote peer.
#[instrument(name = "inbound", skip(self, req))]
fn call(&mut self, req: zn::Request) -> Self::Future {
match req {

View File

@ -6,7 +6,8 @@ use crate::components::sync::SyncStatus;
use futures::FutureExt;
use tokio::sync::oneshot;
use tower::{
buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, ServiceExt,
buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, Service,
ServiceExt,
};
use tracing::Span;
@ -34,15 +35,16 @@ async fn mempool_requests_for_transactions() {
.collect();
// Test `Request::MempoolTransactionIds`
let request = inbound_service
let response = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
match response {
Ok(Response::TransactionIds(response)) => assert_eq!(response, added_transaction_ids),
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
_ => unreachable!(format!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`, got {:?}",
response
)),
};
// Test `Request::TransactionsById`
@ -51,11 +53,11 @@ async fn mempool_requests_for_transactions() {
.copied()
.collect::<HashSet<_>>();
let request = inbound_service
let response = inbound_service
.oneshot(Request::TransactionsById(hash_set))
.await;
match request {
match response {
Ok(Response::Transactions(response)) => assert_eq!(response, added_transactions.unwrap()),
_ => unreachable!("`TransactionsById` requests should always respond `Ok(Vec<UnminedTx>)`"),
};
@ -184,12 +186,11 @@ async fn setup(
let state_config = StateConfig::ephemeral();
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let address_book = Arc::new(std::sync::Mutex::new(address_book));
let (sync_status, _recent_syncs) = SyncStatus::new();
let (_state_service, _latest_chain_tip, chain_tip_change) =
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, _latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
let (state, _, _) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
let (block_verifier, _transaction_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
@ -201,6 +202,22 @@ async fn setup(
let mock_tx_verifier = MockService::build().for_unit_tests();
let buffered_tx_verifier = Buffer::new(BoxService::new(mock_tx_verifier.clone()), 10);
// Push the genesis block to the state.
// This must be done before creating the mempool to avoid `chain_tip_change`
// returning "reset" which would clear the mempool.
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.ready_and()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();
let mut mempool_service = Mempool::new(
network,
buffered_peer_set.clone(),
@ -210,6 +227,9 @@ async fn setup(
chain_tip_change,
);
// Enable the mempool
let _ = mempool_service.enable(&mut recent_syncs).await;
let mut added_transactions = None;
if add_transactions {
added_transactions = Some(add_some_stuff_to_mempool(&mut mempool_service, network));
@ -233,17 +253,6 @@ async fn setup(
// We can't expect or unwrap because the returned Result does not implement Debug
assert!(r.is_ok());
// Push the genesis block to the state
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.oneshot(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();
(
inbound_service,
added_transactions,

View File

@ -3,6 +3,7 @@
use std::{
collections::HashSet,
future::Future,
iter,
pin::Pin,
task::{Context, Poll},
};
@ -38,6 +39,8 @@ use self::downloads::{
Downloads as TxDownloads, Gossip, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
};
#[cfg(test)]
use super::sync::RecentSyncLengths;
use super::sync::SyncStatus;
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
@ -65,20 +68,33 @@ pub enum Response {
Queued(Vec<Result<(), MempoolError>>),
}
/// The state of the mempool.
///
/// Indicates wether it is enabled or disabled and, if enabled, contains
/// the necessary data to run it.
enum ActiveState {
/// The Mempool is disabled.
Disabled,
/// The Mempool is enabled.
Enabled {
/// The Mempool storage itself.
///
/// ##: Correctness: only components internal to the [`Mempool`] struct are allowed to
/// inject transactions into `storage`, as transactions must be verified beforehand.
storage: storage::Storage,
/// The transaction dowload and verify stream.
tx_downloads: Pin<Box<InboundTxDownloads>>,
},
}
/// Mempool async management and query service.
///
/// The mempool is the set of all verified transactions that this node is aware
/// of that have yet to be confirmed by the Zcash network. A transaction is
/// confirmed when it has been included in a block ('mined').
pub struct Mempool {
/// The Mempool storage itself.
///
/// ##: Correctness: only components internal to the [`Mempool`] struct are allowed to
/// inject transactions into `storage`, as transactions must be verified beforehand.
storage: storage::Storage,
/// The transaction dowload and verify stream.
tx_downloads: Pin<Box<InboundTxDownloads>>,
/// The state of the mempool.
active_state: ActiveState,
/// Allows checking if we are near the tip to enable/disable the mempool.
#[allow(dead_code)]
@ -87,6 +103,18 @@ pub struct Mempool {
/// Allows the detection of chain tip resets.
#[allow(dead_code)]
chain_tip_change: ChainTipChange,
/// Handle to the outbound service.
/// Used to construct the transaction downloader.
outbound: Outbound,
/// Handle to the state service.
/// Used to construct the transaction downloader.
state: State,
/// Handle to the transaction verifier service.
/// Used to construct the transaction downloader.
tx_verifier: TxVerifier,
}
impl Mempool {
@ -99,36 +127,100 @@ impl Mempool {
sync_status: SyncStatus,
chain_tip_change: ChainTipChange,
) -> Self {
let tx_downloads = Box::pin(TxDownloads::new(
Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT),
Timeout::new(tx_verifier, TRANSACTION_VERIFY_TIMEOUT),
state,
));
Mempool {
storage: Default::default(),
tx_downloads,
active_state: ActiveState::Disabled,
sync_status,
chain_tip_change,
outbound,
state,
tx_verifier,
}
}
/// Update the mempool state (enabled / disabled) depending on how close to
/// the tip is the synchronization, including side effects to state changes.
fn update_state(&mut self) {
let is_close_to_tip = self.sync_status.is_close_to_tip();
if self.is_enabled() == is_close_to_tip {
// the active state is up to date
return;
}
// Update enabled / disabled state
if is_close_to_tip {
let tx_downloads = Box::pin(TxDownloads::new(
Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT),
Timeout::new(self.tx_verifier.clone(), TRANSACTION_VERIFY_TIMEOUT),
self.state.clone(),
));
self.active_state = ActiveState::Enabled {
storage: Default::default(),
tx_downloads,
};
} else {
self.active_state = ActiveState::Disabled
}
}
/// Return whether the mempool is enabled or not.
pub fn is_enabled(&self) -> bool {
match self.active_state {
ActiveState::Disabled => false,
ActiveState::Enabled { .. } => true,
}
}
/// Get the storage field of the mempool for testing purposes.
#[cfg(test)]
pub fn storage(&mut self) -> &mut storage::Storage {
&mut self.storage
match &mut self.active_state {
ActiveState::Disabled => panic!("mempool must be enabled"),
ActiveState::Enabled { storage, .. } => storage,
}
}
/// Get the transaction downloader of the mempool for testing purposes.
#[cfg(test)]
pub fn tx_downloads(&self) -> &Pin<Box<InboundTxDownloads>> {
match &self.active_state {
ActiveState::Disabled => panic!("mempool must be enabled"),
ActiveState::Enabled { tx_downloads, .. } => tx_downloads,
}
}
/// Enable the mempool by pretending the synchronization is close to the tip.
#[cfg(test)]
pub async fn enable(&mut self, recent_syncs: &mut RecentSyncLengths) {
use tower::ServiceExt;
// Pretend we're close to tip
SyncStatus::sync_close_to_tip(recent_syncs);
// Make a dummy request to poll the mempool and make it enable itself
let _ = self.oneshot(Request::TransactionIds).await;
}
/// Disable the mempool by pretending the synchronization is far from the tip.
#[cfg(test)]
pub async fn disable(&mut self, recent_syncs: &mut RecentSyncLengths) {
use tower::ServiceExt;
// Pretend we're far from the tip
SyncStatus::sync_far_from_tip(recent_syncs);
// Make a dummy request to poll the mempool and make it disable itself
let _ = self.oneshot(Request::TransactionIds).await;
}
/// Check if transaction should be downloaded and/or verified.
///
/// If it is already in the mempool (or in its rejected list)
/// then it shouldn't be downloaded/verified.
fn should_download_or_verify(&mut self, txid: UnminedTxId) -> Result<(), MempoolError> {
fn should_download_or_verify(
storage: &mut storage::Storage,
txid: UnminedTxId,
) -> Result<(), MempoolError> {
// Check if the transaction is already in the mempool.
if self.storage.contains(&txid) {
if storage.contains(&txid) {
return Err(MempoolError::InMempool);
}
if self.storage.contains_rejected(&txid) {
if storage.contains_rejected(&txid) {
return Err(MempoolError::Rejected);
}
Ok(())
@ -142,48 +234,90 @@ impl Service<Request> for Mempool {
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Clear the mempool if there has been a chain tip reset.
if let Some(TipAction::Reset { .. }) = self.chain_tip_change.last_tip_change() {
self.storage.clear();
}
self.update_state();
// Clean up completed download tasks and add to mempool if successful
while let Poll::Ready(Some(r)) = self.tx_downloads.as_mut().poll_next(cx) {
if let Ok(tx) = r {
// TODO: should we do something with the result?
let _ = self.storage.insert(tx);
match &mut self.active_state {
ActiveState::Enabled {
storage,
tx_downloads,
} => {
// Clear the mempool if there has been a chain tip reset.
if let Some(TipAction::Reset { .. }) = self.chain_tip_change.last_tip_change() {
storage.clear();
}
// Clean up completed download tasks and add to mempool if successful
while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) {
if let Ok(tx) = r {
// Storage handles conflicting transactions or a full mempool internally,
// so just ignore the storage result here
let _ = storage.insert(tx);
}
}
}
ActiveState::Disabled => {
// When the mempool is disabled we still return that the service is ready.
// Otherwise, callers could block waiting for the mempool to be enabled,
// which may not be the desired behaviour.
}
}
Poll::Ready(Ok(()))
}
/// Call the mempool service.
///
/// Errors indicate that the peer has done something wrong or unexpected,
/// and will cause callers to disconnect from the remote peer.
#[instrument(name = "mempool", skip(self, req))]
fn call(&mut self, req: Request) -> Self::Future {
match req {
Request::TransactionIds => {
let res = self.storage.tx_ids();
async move { Ok(Response::TransactionIds(res)) }.boxed()
}
Request::TransactionsById(ids) => {
let rsp = Ok(self.storage.transactions(ids)).map(Response::Transactions);
async move { rsp }.boxed()
}
Request::RejectedTransactionIds(ids) => {
let rsp = Ok(self.storage.rejected_transactions(ids))
.map(Response::RejectedTransactionIds);
async move { rsp }.boxed()
}
Request::Queue(gossiped_txs) => {
let rsp: Vec<Result<(), MempoolError>> = gossiped_txs
.into_iter()
.map(|gossiped_tx| {
self.should_download_or_verify(gossiped_tx.id())?;
self.tx_downloads
.download_if_needed_and_verify(gossiped_tx)?;
Ok(())
})
.collect();
async move { Ok(Response::Queued(rsp)) }.boxed()
match &mut self.active_state {
ActiveState::Enabled {
storage,
tx_downloads,
} => match req {
Request::TransactionIds => {
let res = storage.tx_ids();
async move { Ok(Response::TransactionIds(res)) }.boxed()
}
Request::TransactionsById(ids) => {
let rsp = Ok(storage.transactions(ids)).map(Response::Transactions);
async move { rsp }.boxed()
}
Request::RejectedTransactionIds(ids) => {
let rsp = Ok(storage.rejected_transactions(ids))
.map(Response::RejectedTransactionIds);
async move { rsp }.boxed()
}
Request::Queue(gossiped_txs) => {
let rsp: Vec<Result<(), MempoolError>> = gossiped_txs
.into_iter()
.map(|gossiped_tx| {
Self::should_download_or_verify(storage, gossiped_tx.id())?;
tx_downloads.download_if_needed_and_verify(gossiped_tx)?;
Ok(())
})
.collect();
async move { Ok(Response::Queued(rsp)) }.boxed()
}
},
ActiveState::Disabled => {
// We can't return an error since that will cause a disconnection
// by the peer connection handler. Therefore, return successful
// empty responses.
let resp = match req {
Request::TransactionIds => Response::TransactionIds(Default::default()),
Request::TransactionsById(_) => Response::Transactions(Default::default()),
Request::RejectedTransactionIds(_) => {
Response::RejectedTransactionIds(Default::default())
}
// Special case; we can signal the error inside the response.
Request::Queue(gossiped_txs) => Response::Queued(
iter::repeat(Err(MempoolError::Disabled))
.take(gossiped_txs.len())
.collect(),
),
};
async move { Ok(resp) }.boxed()
}
}
}

View File

@ -315,6 +315,13 @@ where
Ok(())
}
/// Get the number of currently in-flight download tasks.
// Note: copied from zebrad/src/components/sync/downloads.rs
#[allow(dead_code)]
pub fn in_flight(&self) -> usize {
self.pending.len()
}
/// Check if transaction is already in the state.
async fn transaction_in_state(state: &mut ZS, txid: UnminedTxId) -> Result<(), BoxError> {
// Check if the transaction is already in the state.

View File

@ -50,4 +50,7 @@ pub enum MempoolError {
its inputs"
)]
SpendConflict,
#[error("mempool is disabled since synchronization is behind the chain tip")]
Disabled,
}

View File

@ -15,7 +15,7 @@ async fn mempool_service_basic() -> Result<(), Report> {
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, _recent_syncs) = SyncStatus::new();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, _latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
@ -29,8 +29,8 @@ async fn mempool_service_basic() -> Result<(), Report> {
let genesis_transaction = unmined_transactions
.next()
.expect("Missing genesis transaction");
let mut more_transactions = unmined_transactions;
let last_transaction = more_transactions.next_back().unwrap();
let txid = unmined_transactions.next_back().unwrap().id;
let more_transactions = unmined_transactions;
// Start the mempool service
let mut service = Mempool::new(
@ -41,8 +41,12 @@ async fn mempool_service_basic() -> Result<(), Report> {
sync_status,
chain_tip_change,
);
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage.insert(genesis_transaction.clone())?;
service.storage().insert(genesis_transaction.clone())?;
// Test `Request::TransactionIds`
let response = service
@ -84,7 +88,7 @@ async fn mempool_service_basic() -> Result<(), Report> {
// This will cause the genesis transaction to be moved into rejected.
// Skip the last (will be used later)
for tx in more_transactions {
service.storage.insert(tx.clone())?;
service.storage().insert(tx.clone())?;
}
// Test `Request::RejectedTransactionIds`
@ -110,7 +114,7 @@ async fn mempool_service_basic() -> Result<(), Report> {
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![last_transaction.id.into()]))
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
@ -119,6 +123,7 @@ async fn mempool_service_basic() -> Result<(), Report> {
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(service.tx_downloads().in_flight(), 1);
Ok(())
}
@ -130,7 +135,7 @@ async fn mempool_queue() -> Result<(), Report> {
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, _recent_syncs) = SyncStatus::new();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, _latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);
@ -162,15 +167,19 @@ async fn mempool_queue() -> Result<(), Report> {
sync_status,
chain_tip_change,
);
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
// Insert [rejected_tx, transactions..., stored_tx] into the mempool storage.
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage.insert(rejected_tx.clone())?;
service.storage().insert(rejected_tx.clone())?;
// Insert more transactions into the mempool storage.
// This will cause the `rejected_tx` to be moved into rejected.
for tx in transactions {
service.storage.insert(tx.clone())?;
service.storage().insert(tx.clone())?;
}
service.storage.insert(stored_tx.clone())?;
service.storage().insert(stored_tx.clone())?;
// Test `Request::Queue` for a new transaction
let response = service
@ -219,3 +228,120 @@ async fn mempool_queue() -> Result<(), Report> {
Ok(())
}
#[tokio::test]
async fn mempool_service_disabled() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let peer_set = MockService::build().for_unit_tests();
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, _latest_chain_tip, chain_tip_change) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
// get the genesis block transactions from the Zcash blockchain.
let mut unmined_transactions = unmined_transactions_in_blocks(..=10, network);
let genesis_transaction = unmined_transactions
.next()
.expect("Missing genesis transaction");
let more_transactions = unmined_transactions;
// Start the mempool service
let mut service = Mempool::new(
network,
Buffer::new(BoxService::new(peer_set), 1),
state_service.clone(),
tx_verifier,
sync_status,
chain_tip_change,
);
// Test if mempool is disabled (it should start disabled)
assert!(!service.is_enabled());
// Enable the mempool
let _ = service.enable(&mut recent_syncs).await;
assert!(service.is_enabled());
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage().insert(genesis_transaction.clone())?;
// Test if the mempool answers correctly (i.e. is enabled)
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
let _genesis_transaction_ids = match response {
Response::TransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};
// Queue a transaction for download
// Use the ID of the last transaction in the list
let txid = more_transactions.last().unwrap().id;
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(service.tx_downloads().in_flight(), 1);
// Disable the mempool
let _ = service.disable(&mut recent_syncs).await;
// Test if mempool is disabled again
assert!(!service.is_enabled());
// Test if the mempool returns no transactions when disabled
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
match response {
Response::TransactionIds(ids) => {
assert_eq!(
ids.len(),
0,
"mempool should return no transactions when disabled"
)
}
_ => unreachable!("will never happen in this test"),
};
// Test if the mempool returns to Queue requests correctly when disabled
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert_eq!(queued_responses[0], Err(MempoolError::Disabled));
Ok(())
}

View File

@ -28,7 +28,7 @@ mod status;
mod tests;
use downloads::{AlwaysHedge, Downloads};
use recent_sync_lengths::RecentSyncLengths;
pub use recent_sync_lengths::RecentSyncLengths;
pub use status::SyncStatus;
/// Controls the number of peers used for each ObtainTips and ExtendTips request.

View File

@ -69,4 +69,22 @@ impl SyncStatus {
// average sync length falls below the threshold.
avg < Self::MIN_DIST_FROM_TIP
}
/// Feed the given [`RecentSyncLengths`] it order to make the matching
/// [`SyncStatus`] report that it's close to the tip.
#[cfg(test)]
pub(crate) fn sync_close_to_tip(recent_syncs: &mut RecentSyncLengths) {
for _ in 0..RecentSyncLengths::MAX_RECENT_LENGTHS {
recent_syncs.push_extend_tips_length(1);
}
}
/// Feed the given [`RecentSyncLengths`] it order to make the matching
/// [`SyncStatus`] report that it's not close to the tip.
#[cfg(test)]
pub(crate) fn sync_far_from_tip(recent_syncs: &mut RecentSyncLengths) {
for _ in 0..RecentSyncLengths::MAX_RECENT_LENGTHS {
recent_syncs.push_extend_tips_length(Self::MIN_DIST_FROM_TIP * 10);
}
}
}