From cb1045ae5f21fd8faac630f4f9ddf24f3cc7dbba Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 13 Dec 2022 09:19:45 +1000 Subject: [PATCH] change(mempool): Re-verify mempool transactions after a chain fork, rather than re-downloading them all (#5841) * Move Drop from mempool::ActiveState to mempool::Downloads, to avoid bugs * Re-verify mempool transactions after a fork And add a marker struct for mempool download cancellation. * Update README based on recent mitigations for some issues, tidy format * Make mempool proptests easier to debug * Make UnminedTx Display text much smaller * Update tests for mempool transaction re-verification after forks * Retry all stored and pending transactions * Fix a test to check for mempool reset retries --- README.md | 14 ++--- zebra-chain/src/transaction.rs | 9 +++ zebra-chain/src/transaction/unmined.rs | 4 +- zebra-state/src/service/chain_tip.rs | 12 +++- zebra-test/src/mock_service.rs | 5 +- zebrad/src/components/mempool.rs | 57 ++++++++++++++---- zebrad/src/components/mempool/downloads.rs | 33 +++++++--- zebrad/src/components/mempool/tests/prop.rs | 60 +++++++++++-------- zebrad/src/components/mempool/tests/vector.rs | 18 +++++- 9 files changed, 147 insertions(+), 65 deletions(-) diff --git a/README.md b/README.md index f319a3380..bdcea5bf9 100644 --- a/README.md +++ b/README.md @@ -235,19 +235,13 @@ So Zebra's state should always be valid, unless your OS or disk hardware is corr There are a few bugs in Zebra that we're still working on fixing: -- Zebra falsely estimates that it's close to the tip when the network connection goes down [#4649](https://github.com/ZcashFoundation/zebra/issues/4649) - - - One of the consequences of this issue is that Zebra might add unwanted load - to other peers when the connection goes back up. This load will last only - for a short period of time because Zebra will quickly find out that it's - still not close to the tip. +- Zebra falsely estimates that it's close to the tip when the network connection goes down [#4649](https://github.com/ZcashFoundation/zebra/issues/4649). - If Zebra fails downloading the Zcash parameters, use [the Zcash parameters download script](https://github.com/zcash/zcash/blob/master/zcutil/fetch-params.sh) instead. This script might be needed on macOS, even with Rust stable. -- No Windows support [#3801](https://github.com/ZcashFoundation/zebra/issues/3801) - - We used to test with Windows Server 2019, but not anymore; see issue for details -- Experimental Tor support is disabled until [`arti-client` upgrades to `x25519-dalek` 2.0.0 or later](https://github.com/ZcashFoundation/zebra/issues/5492) - - This happens due to a Rust dependency conflict, which can only be resolved by changing the dependencies of `x25519-dalek` +- No Windows support [#3801](https://github.com/ZcashFoundation/zebra/issues/3801). We used to test with Windows Server 2019, but not anymore; see the issue for details. + +- Experimental Tor support is disabled until [`arti-client` upgrades to `x25519-dalek` 2.0.0 or later](https://github.com/ZcashFoundation/zebra/issues/5492). This happens due to a Rust dependency conflict, which can only be resolved by upgrading to a version of `x25519-dalek` with the dependency fix. - Output of `help`, `--help` flag, and usage of invalid commands or options are inconsistent. Reports of these issues can be found [here](https://github.com/ZcashFoundation/zebra/issues/5502) and are planned to be fixed in the context of [upgrading Abscissa](https://github.com/ZcashFoundation/zebra/issues/5502). diff --git a/zebra-chain/src/transaction.rs b/zebra-chain/src/transaction.rs index 88770bcfd..828c740aa 100644 --- a/zebra-chain/src/transaction.rs +++ b/zebra-chain/src/transaction.rs @@ -139,10 +139,19 @@ impl fmt::Display for Transaction { let mut fmter = f.debug_struct("Transaction"); fmter.field("version", &self.version()); + if let Some(network_upgrade) = self.network_upgrade() { fmter.field("network_upgrade", &network_upgrade); } + if let Some(lock_time) = self.lock_time() { + fmter.field("lock_time", &lock_time); + } + + if let Some(expiry_height) = self.expiry_height() { + fmter.field("expiry_height", &expiry_height); + } + fmter.field("transparent_inputs", &self.inputs().len()); fmter.field("transparent_outputs", &self.outputs().len()); fmter.field("sprout_joinsplits", &self.joinsplit_count()); diff --git a/zebra-chain/src/transaction/unmined.rs b/zebra-chain/src/transaction/unmined.rs index e5707f70a..14390ec09 100644 --- a/zebra-chain/src/transaction/unmined.rs +++ b/zebra-chain/src/transaction/unmined.rs @@ -223,7 +223,7 @@ pub struct UnminedTx { impl fmt::Display for UnminedTx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("UnminedTx") - .field("transaction", &self.transaction) + .field("transaction", &self.transaction.to_string()) .field("serialized_size", &self.size) .field("conventional_fee", &self.conventional_fee) .finish() @@ -327,7 +327,7 @@ pub struct VerifiedUnminedTx { impl fmt::Display for VerifiedUnminedTx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("VerifiedUnminedTx") - .field("transaction", &self.transaction) + .field("transaction", &self.transaction.to_string()) .field("miner_fee", &self.miner_fee) .field("legacy_sigop_count", &self.legacy_sigop_count) .field("unpaid_actions", &self.unpaid_actions) diff --git a/zebra-state/src/service/chain_tip.rs b/zebra-state/src/service/chain_tip.rs index 5435483cd..db85536fb 100644 --- a/zebra-state/src/service/chain_tip.rs +++ b/zebra-state/src/service/chain_tip.rs @@ -5,7 +5,7 @@ //! * [LatestChainTip] for efficient access to the current best tip, and //! * [ChainTipChange] to `await` specific changes to the chain tip. -use std::sync::Arc; +use std::{fmt, sync::Arc}; use chrono::{DateTime, Utc}; use tokio::sync::watch; @@ -73,6 +73,16 @@ pub struct ChainTipBlock { pub previous_block_hash: block::Hash, } +impl fmt::Display for ChainTipBlock { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ChainTipBlock") + .field("height", &self.height) + .field("hash", &self.hash) + .field("transactions", &self.transactions.len()) + .finish() + } +} + impl From for ChainTipBlock { fn from(contextually_valid: ContextuallyValidBlock) -> Self { let ContextuallyValidBlock { diff --git a/zebra-test/src/mock_service.rs b/zebra-test/src/mock_service.rs index 3d4462286..e919a36dd 100644 --- a/zebra-test/src/mock_service.rs +++ b/zebra-test/src/mock_service.rs @@ -434,7 +434,8 @@ impl MockService MockService Option> { + pub async fn try_next_request(&mut self) -> Option> { loop { match timeout(self.max_request_delay, self.receiver.recv()).await { Ok(Ok(item)) => { diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 6c2713378..de42ccc2d 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -36,7 +36,7 @@ use zebra_chain::{ }; use zebra_consensus::{error::TransactionError, transaction}; use zebra_network as zn; -use zebra_node_services::mempool::{Request, Response}; +use zebra_node_services::mempool::{Gossip, Request, Response}; use zebra_state as zs; use zebra_state::{ChainTipChange, TipAction}; @@ -107,19 +107,32 @@ impl Default for ActiveState { } } -impl Drop for ActiveState { - fn drop(&mut self) { - if let ActiveState::Enabled { tx_downloads, .. } = self { - tx_downloads.cancel_all(); - } - } -} - impl ActiveState { /// Returns the current state, leaving [`Self::Disabled`] in its place. fn take(&mut self) -> Self { std::mem::take(self) } + + /// Returns a list of requests that will retry every stored and pending transaction. + fn transaction_retry_requests(&self) -> Vec { + match self { + ActiveState::Disabled => Vec::new(), + ActiveState::Enabled { + storage, + tx_downloads, + } => { + let mut transactions = Vec::new(); + + let storage = storage.transactions().map(|tx| tx.clone().into()); + transactions.extend(storage); + + let pending = tx_downloads.transaction_requests().cloned(); + transactions.extend(pending); + + transactions + } + } + } } /// Mempool async management and query service. @@ -259,8 +272,8 @@ impl Mempool { "deactivating mempool: Zebra is syncing lots of blocks" ); - // This drops the previous ActiveState::Enabled, - // cancelling its download tasks. + // This drops the previous ActiveState::Enabled, cancelling its download tasks. + // We don't preserve the previous transactions, because we are syncing lots of blocks. self.active_state = ActiveState::Disabled } @@ -319,17 +332,35 @@ impl Service for Mempool { "resetting mempool: switched best chain, skipped blocks, or activated network upgrade" ); + let previous_state = self.active_state.take(); + let tx_retries = previous_state.transaction_retry_requests(); + // Use the same code for dropping and resetting the mempool, // to avoid subtle bugs. - + // // Drop the current contents of the state, // cancelling any pending download tasks, // and dropping completed verification results. - std::mem::drop(self.active_state.take()); + std::mem::drop(previous_state); // Re-initialise an empty state. self.update_state(); + // Re-verify the transactions that were pending or valid at the previous tip. + // This saves us the time and data needed to re-download them. + if let ActiveState::Enabled { tx_downloads, .. } = &mut self.active_state { + info!( + transactions = tx_retries.len(), + "re-verifying mempool transactions after a chain fork" + ); + + for tx in tx_retries { + // This is just an efficiency optimisation, so we don't care if queueing + // transaction requests fails. + let _result = tx_downloads.download_if_needed_and_verify(tx); + } + } + return Poll::Ready(Ok(())); } diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 16fbcf578..99d7dc5fe 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -97,6 +97,10 @@ pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT; /// Therefore, this attack can be carried out by a single malicious node. pub const MAX_INBOUND_CONCURRENCY: usize = 10; +/// A marker struct for the oneshot channels which cancel a pending download and verify. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +struct CancelDownloadAndVerify; + /// Errors that can occur while downloading and verifying a transaction. #[derive(Error, Debug)] #[allow(dead_code)] @@ -122,7 +126,7 @@ pub enum TransactionDownloadVerifyError { #[derive(Debug)] pub struct Downloads where - ZN: Service + Send + 'static, + ZN: Service + Send + Clone + 'static, ZN::Future: Send, ZV: Service + Send + Clone + 'static, ZV::Future: Send, @@ -148,8 +152,8 @@ where >, /// A list of channels that can be used to cancel pending transaction download and - /// verify tasks. - cancel_handles: HashMap>, + /// verify tasks. Each channel also has the corresponding request. + cancel_handles: HashMap, Gossip)>, } impl Stream for Downloads @@ -264,12 +268,14 @@ where } // This oneshot is used to signal cancellation to the download task. - let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>(); + let (cancel_tx, mut cancel_rx) = oneshot::channel::(); let network = self.network.clone(); let verifier = self.verifier.clone(); let mut state = self.state.clone(); + let gossiped_tx_req = gossiped_tx.clone(); + let fut = async move { // Don't download/verify if the transaction is already in the best chain. Self::transaction_in_best_chain(&mut state, txid).await?; @@ -378,7 +384,9 @@ where self.pending.push(task); assert!( - self.cancel_handles.insert(txid, cancel_tx).is_none(), + self.cancel_handles + .insert(txid, (cancel_tx, gossiped_tx_req)) + .is_none(), "transactions are only queued once" ); @@ -411,7 +419,7 @@ where for txid in removed_txids { if let Some(handle) = self.cancel_handles.remove(&txid) { - let _ = handle.send(()); + let _ = handle.0.send(CancelDownloadAndVerify); } } } @@ -425,7 +433,7 @@ where // Since we already dropped the JoinHandles above, they should // fail silently. for (_hash, cancel) in self.cancel_handles.drain() { - let _ = cancel.send(()); + let _ = cancel.0.send(CancelDownloadAndVerify); } assert!(self.pending.is_empty()); assert!(self.cancel_handles.is_empty()); @@ -442,6 +450,11 @@ where self.pending.len() } + /// Get a list of the currently pending transaction requests. + pub fn transaction_requests(&self) -> impl Iterator { + self.cancel_handles.iter().map(|(_tx_id, (_handle, tx))| tx) + } + /// Check if transaction is already in the best chain. async fn transaction_in_best_chain( state: &mut ZS, @@ -467,14 +480,16 @@ where #[pinned_drop] impl PinnedDrop for Downloads where - ZN: Service + Send + 'static, + ZN: Service + Send + Clone + 'static, ZN::Future: Send, ZV: Service + Send + Clone + 'static, ZV::Future: Send, ZS: Service + Send + Clone + 'static, ZS::Future: Send, { - fn drop(self: Pin<&mut Self>) { + fn drop(mut self: Pin<&mut Self>) { + self.cancel_all(); + metrics::gauge!("mempool.currently.queued.transactions", 0 as f64); } } diff --git a/zebrad/src/components/mempool/tests/prop.rs b/zebrad/src/components/mempool/tests/prop.rs index 7f8a24dc1..562747a3f 100644 --- a/zebrad/src/components/mempool/tests/prop.rs +++ b/zebrad/src/components/mempool/tests/prop.rs @@ -1,6 +1,6 @@ //! Randomised property tests for the mempool. -use std::env; +use std::{env, fmt}; use proptest::{collection::vec, prelude::*}; use proptest_derive::Arbitrary; @@ -11,6 +11,7 @@ use tower::{buffer::Buffer, util::BoxService}; use zebra_chain::{ block, + fmt::DisplayToDebug, parameters::{Network, NetworkUpgrade}, transaction::VerifiedUnminedTx, }; @@ -49,17 +50,17 @@ proptest! { #[test] fn storage_is_cleared_on_single_chain_reset( network in any::(), - transaction in any::(), - chain_tip in any::(), + transaction in any::>(), + chain_tip in any::>(), ) { let (runtime, _init_guard) = zebra_test::init_async(); runtime.block_on(async move { let ( mut mempool, - mut peer_set, - mut state_service, - mut tx_verifier, + _peer_set, + _state_service, + _tx_verifier, mut recent_syncs, mut chain_tip_sender, ) = setup(network); @@ -71,7 +72,7 @@ proptest! { // Insert a dummy transaction. mempool .storage() - .insert(transaction) + .insert(transaction.0) .expect("Inserting a transaction should succeed"); // The first call to `poll_ready` shouldn't clear the storage yet. @@ -80,16 +81,15 @@ proptest! { prop_assert_eq!(mempool.storage().transaction_count(), 1); // Simulate a chain reset. - chain_tip_sender.set_finalized_tip(chain_tip); + chain_tip_sender.set_finalized_tip(chain_tip.0); // This time a call to `poll_ready` should clear the storage. mempool.dummy_call().await; prop_assert_eq!(mempool.storage().transaction_count(), 0); - peer_set.expect_no_requests().await?; - state_service.expect_no_requests().await?; - tx_verifier.expect_no_requests().await?; + // The services might or might not get requests, + // depending on how many transactions get re-queued, and if they need downloading. Ok(()) })?; @@ -99,18 +99,18 @@ proptest! { #[test] fn storage_is_cleared_on_multiple_chain_resets( network in any::(), - mut previous_chain_tip in any::(), - mut transactions in vec(any::(), 0..CHAIN_LENGTH), - fake_chain_tips in vec(any::(), 0..CHAIN_LENGTH), + mut previous_chain_tip in any::>(), + mut transactions in vec(any::>(), 0..CHAIN_LENGTH), + fake_chain_tips in vec(any::>(), 0..CHAIN_LENGTH), ) { let (runtime, _init_guard) = zebra_test::init_async(); runtime.block_on(async move { let ( mut mempool, - mut peer_set, - mut state_service, - mut tx_verifier, + _peer_set, + _state_service, + _tx_verifier, mut recent_syncs, mut chain_tip_sender, ) = setup(network); @@ -120,7 +120,7 @@ proptest! { mempool.enable(&mut recent_syncs).await; // Set the initial chain tip. - chain_tip_sender.set_best_non_finalized_tip(previous_chain_tip.clone()); + chain_tip_sender.set_best_non_finalized_tip(previous_chain_tip.0.clone()); // Call the mempool so that it is aware of the initial chain tip. mempool.dummy_call().await; @@ -146,7 +146,7 @@ proptest! { // Insert the dummy transaction into the mempool. mempool .storage() - .insert(transaction.clone()) + .insert(transaction.0.clone()) .expect("Inserting a transaction should succeed"); // Set the new chain tip. @@ -155,7 +155,7 @@ proptest! { // Call the mempool so that it is aware of the new chain tip. mempool.dummy_call().await; - match fake_chain_tip { + match fake_chain_tip.0 { FakeChainTip::Grow(_) => { // The mempool should not be empty because we had a regular chain growth. prop_assert_ne!(mempool.storage().transaction_count(), 0); @@ -168,12 +168,11 @@ proptest! { } // Remember the current chain tip so that the next one can refer to it. - previous_chain_tip = chain_tip; + previous_chain_tip = chain_tip.into(); } - peer_set.expect_no_requests().await?; - state_service.expect_no_requests().await?; - tx_verifier.expect_no_requests().await?; + // The services might or might not get requests, + // depending on how many transactions get re-queued, and if they need downloading. Ok(()) })?; @@ -274,12 +273,23 @@ fn setup( } /// A helper enum for simulating either a chain reset or growth. -#[derive(Arbitrary, Debug, Clone)] +#[derive(Arbitrary, Clone, Debug, Eq, PartialEq)] enum FakeChainTip { Grow(ChainTipBlock), Reset(ChainTipBlock), } +impl fmt::Display for FakeChainTip { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let (mut f, inner) = match self { + FakeChainTip::Grow(inner) => (f.debug_tuple("FakeChainTip::Grow"), inner), + FakeChainTip::Reset(inner) => (f.debug_tuple("FakeChainTip::Reset"), inner), + }; + + f.field(&inner).finish() + } +} + impl FakeChainTip { /// Returns a new [`ChainTipBlock`] placed on top of the previous block if /// the chain is supposed to grow. Otherwise returns a [`ChainTipBlock`] diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index c48ca2d18..b75d19974 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -536,7 +536,7 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> let ( mut mempool, - _peer_set, + mut peer_set, mut state_service, mut chain_tip_change, _tx_verifier, @@ -623,11 +623,23 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> ); } + // Ignore all the previous network requests. + while let Some(_request) = peer_set.try_next_request().await {} + // Query the mempool to make it poll chain_tip_change mempool.dummy_call().await; - // Check if download was cancelled. - assert_eq!(mempool.tx_downloads().in_flight(), 0); + // Check if download was cancelled and transaction was retried. + let request = peer_set + .try_next_request() + .await + .expect("unexpected missing mempool retry"); + + assert_eq!( + request.request(), + &zebra_network::Request::TransactionsById(iter::once(txid).collect()), + ); + assert_eq!(mempool.tx_downloads().in_flight(), 1); Ok(()) }