change(mempool): Re-verify mempool transactions after a chain fork, rather than re-downloading them all (#5841)

* Move Drop from mempool::ActiveState to mempool::Downloads, to avoid bugs

* Re-verify mempool transactions after a fork

And add a marker struct for mempool download cancellation.

* Update README based on recent mitigations for some issues, tidy format

* Make mempool proptests easier to debug

* Make UnminedTx Display text much smaller

* Update tests for mempool transaction re-verification after forks

* Retry all stored and pending transactions

* Fix a test to check for mempool reset retries
This commit is contained in:
teor 2022-12-13 09:19:45 +10:00 committed by GitHub
parent 47073ab30a
commit cb1045ae5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 147 additions and 65 deletions

View File

@ -235,19 +235,13 @@ So Zebra's state should always be valid, unless your OS or disk hardware is corr
There are a few bugs in Zebra that we're still working on fixing:
- Zebra falsely estimates that it's close to the tip when the network connection goes down [#4649](https://github.com/ZcashFoundation/zebra/issues/4649)
- One of the consequences of this issue is that Zebra might add unwanted load
to other peers when the connection goes back up. This load will last only
for a short period of time because Zebra will quickly find out that it's
still not close to the tip.
- Zebra falsely estimates that it's close to the tip when the network connection goes down [#4649](https://github.com/ZcashFoundation/zebra/issues/4649).
- If Zebra fails downloading the Zcash parameters, use [the Zcash parameters download script](https://github.com/zcash/zcash/blob/master/zcutil/fetch-params.sh) instead. This script might be needed on macOS, even with Rust stable.
- No Windows support [#3801](https://github.com/ZcashFoundation/zebra/issues/3801)
- We used to test with Windows Server 2019, but not anymore; see issue for details
- Experimental Tor support is disabled until [`arti-client` upgrades to `x25519-dalek` 2.0.0 or later](https://github.com/ZcashFoundation/zebra/issues/5492)
- This happens due to a Rust dependency conflict, which can only be resolved by changing the dependencies of `x25519-dalek`
- No Windows support [#3801](https://github.com/ZcashFoundation/zebra/issues/3801). We used to test with Windows Server 2019, but not anymore; see the issue for details.
- Experimental Tor support is disabled until [`arti-client` upgrades to `x25519-dalek` 2.0.0 or later](https://github.com/ZcashFoundation/zebra/issues/5492). This happens due to a Rust dependency conflict, which can only be resolved by upgrading to a version of `x25519-dalek` with the dependency fix.
- Output of `help`, `--help` flag, and usage of invalid commands or options are inconsistent. Reports of these issues can be found [here](https://github.com/ZcashFoundation/zebra/issues/5502) and are planned to be fixed in the context of [upgrading Abscissa](https://github.com/ZcashFoundation/zebra/issues/5502).

View File

@ -139,10 +139,19 @@ impl fmt::Display for Transaction {
let mut fmter = f.debug_struct("Transaction");
fmter.field("version", &self.version());
if let Some(network_upgrade) = self.network_upgrade() {
fmter.field("network_upgrade", &network_upgrade);
}
if let Some(lock_time) = self.lock_time() {
fmter.field("lock_time", &lock_time);
}
if let Some(expiry_height) = self.expiry_height() {
fmter.field("expiry_height", &expiry_height);
}
fmter.field("transparent_inputs", &self.inputs().len());
fmter.field("transparent_outputs", &self.outputs().len());
fmter.field("sprout_joinsplits", &self.joinsplit_count());

View File

@ -223,7 +223,7 @@ pub struct UnminedTx {
impl fmt::Display for UnminedTx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("UnminedTx")
.field("transaction", &self.transaction)
.field("transaction", &self.transaction.to_string())
.field("serialized_size", &self.size)
.field("conventional_fee", &self.conventional_fee)
.finish()
@ -327,7 +327,7 @@ pub struct VerifiedUnminedTx {
impl fmt::Display for VerifiedUnminedTx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("VerifiedUnminedTx")
.field("transaction", &self.transaction)
.field("transaction", &self.transaction.to_string())
.field("miner_fee", &self.miner_fee)
.field("legacy_sigop_count", &self.legacy_sigop_count)
.field("unpaid_actions", &self.unpaid_actions)

View File

@ -5,7 +5,7 @@
//! * [LatestChainTip] for efficient access to the current best tip, and
//! * [ChainTipChange] to `await` specific changes to the chain tip.
use std::sync::Arc;
use std::{fmt, sync::Arc};
use chrono::{DateTime, Utc};
use tokio::sync::watch;
@ -73,6 +73,16 @@ pub struct ChainTipBlock {
pub previous_block_hash: block::Hash,
}
impl fmt::Display for ChainTipBlock {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ChainTipBlock")
.field("height", &self.height)
.field("hash", &self.hash)
.field("transactions", &self.transactions.len())
.finish()
}
}
impl From<ContextuallyValidBlock> for ChainTipBlock {
fn from(contextually_valid: ContextuallyValidBlock) -> Self {
let ContextuallyValidBlock {

View File

@ -434,7 +434,8 @@ impl<Request, Response, Error> MockService<Request, Response, PanicAssertion, Er
}
}
/// A helper method to get the next request from the queue.
/// Returns the next request from the queue,
/// or panics if there are no requests after a short timeout.
///
/// Returns the next request in the internal queue or waits at most the max delay time
/// configured by [`MockServiceBuilder::with_max_request_delay`] for a new request to be
@ -687,7 +688,7 @@ impl<Request, Response, Assertion, Error> MockService<Request, Response, Asserti
/// If too many requests are received and the queue fills up, the oldest requests are dropped
/// and ignored. This means that calling this may not receive the next request if the queue is
/// not dimensioned properly with the [`MockServiceBuilder::with_proxy_channel_size`] method.
async fn try_next_request(&mut self) -> Option<ResponseSender<Request, Response, Error>> {
pub async fn try_next_request(&mut self) -> Option<ResponseSender<Request, Response, Error>> {
loop {
match timeout(self.max_request_delay, self.receiver.recv()).await {
Ok(Ok(item)) => {

View File

@ -36,7 +36,7 @@ use zebra_chain::{
};
use zebra_consensus::{error::TransactionError, transaction};
use zebra_network as zn;
use zebra_node_services::mempool::{Request, Response};
use zebra_node_services::mempool::{Gossip, Request, Response};
use zebra_state as zs;
use zebra_state::{ChainTipChange, TipAction};
@ -107,19 +107,32 @@ impl Default for ActiveState {
}
}
impl Drop for ActiveState {
fn drop(&mut self) {
if let ActiveState::Enabled { tx_downloads, .. } = self {
tx_downloads.cancel_all();
}
}
}
impl ActiveState {
/// Returns the current state, leaving [`Self::Disabled`] in its place.
fn take(&mut self) -> Self {
std::mem::take(self)
}
/// Returns a list of requests that will retry every stored and pending transaction.
fn transaction_retry_requests(&self) -> Vec<Gossip> {
match self {
ActiveState::Disabled => Vec::new(),
ActiveState::Enabled {
storage,
tx_downloads,
} => {
let mut transactions = Vec::new();
let storage = storage.transactions().map(|tx| tx.clone().into());
transactions.extend(storage);
let pending = tx_downloads.transaction_requests().cloned();
transactions.extend(pending);
transactions
}
}
}
}
/// Mempool async management and query service.
@ -259,8 +272,8 @@ impl Mempool {
"deactivating mempool: Zebra is syncing lots of blocks"
);
// This drops the previous ActiveState::Enabled,
// cancelling its download tasks.
// This drops the previous ActiveState::Enabled, cancelling its download tasks.
// We don't preserve the previous transactions, because we are syncing lots of blocks.
self.active_state = ActiveState::Disabled
}
@ -319,17 +332,35 @@ impl Service<Request> for Mempool {
"resetting mempool: switched best chain, skipped blocks, or activated network upgrade"
);
let previous_state = self.active_state.take();
let tx_retries = previous_state.transaction_retry_requests();
// Use the same code for dropping and resetting the mempool,
// to avoid subtle bugs.
//
// Drop the current contents of the state,
// cancelling any pending download tasks,
// and dropping completed verification results.
std::mem::drop(self.active_state.take());
std::mem::drop(previous_state);
// Re-initialise an empty state.
self.update_state();
// Re-verify the transactions that were pending or valid at the previous tip.
// This saves us the time and data needed to re-download them.
if let ActiveState::Enabled { tx_downloads, .. } = &mut self.active_state {
info!(
transactions = tx_retries.len(),
"re-verifying mempool transactions after a chain fork"
);
for tx in tx_retries {
// This is just an efficiency optimisation, so we don't care if queueing
// transaction requests fails.
let _result = tx_downloads.download_if_needed_and_verify(tx);
}
}
return Poll::Ready(Ok(()));
}

View File

@ -97,6 +97,10 @@ pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT;
/// Therefore, this attack can be carried out by a single malicious node.
pub const MAX_INBOUND_CONCURRENCY: usize = 10;
/// A marker struct for the oneshot channels which cancel a pending download and verify.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
struct CancelDownloadAndVerify;
/// Errors that can occur while downloading and verifying a transaction.
#[derive(Error, Debug)]
#[allow(dead_code)]
@ -122,7 +126,7 @@ pub enum TransactionDownloadVerifyError {
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZS>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
@ -148,8 +152,8 @@ where
>,
/// A list of channels that can be used to cancel pending transaction download and
/// verify tasks.
cancel_handles: HashMap<UnminedTxId, oneshot::Sender<()>>,
/// verify tasks. Each channel also has the corresponding request.
cancel_handles: HashMap<UnminedTxId, (oneshot::Sender<CancelDownloadAndVerify>, Gossip)>,
}
impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
@ -264,12 +268,14 @@ where
}
// This oneshot is used to signal cancellation to the download task.
let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
let (cancel_tx, mut cancel_rx) = oneshot::channel::<CancelDownloadAndVerify>();
let network = self.network.clone();
let verifier = self.verifier.clone();
let mut state = self.state.clone();
let gossiped_tx_req = gossiped_tx.clone();
let fut = async move {
// Don't download/verify if the transaction is already in the best chain.
Self::transaction_in_best_chain(&mut state, txid).await?;
@ -378,7 +384,9 @@ where
self.pending.push(task);
assert!(
self.cancel_handles.insert(txid, cancel_tx).is_none(),
self.cancel_handles
.insert(txid, (cancel_tx, gossiped_tx_req))
.is_none(),
"transactions are only queued once"
);
@ -411,7 +419,7 @@ where
for txid in removed_txids {
if let Some(handle) = self.cancel_handles.remove(&txid) {
let _ = handle.send(());
let _ = handle.0.send(CancelDownloadAndVerify);
}
}
}
@ -425,7 +433,7 @@ where
// Since we already dropped the JoinHandles above, they should
// fail silently.
for (_hash, cancel) in self.cancel_handles.drain() {
let _ = cancel.send(());
let _ = cancel.0.send(CancelDownloadAndVerify);
}
assert!(self.pending.is_empty());
assert!(self.cancel_handles.is_empty());
@ -442,6 +450,11 @@ where
self.pending.len()
}
/// Get a list of the currently pending transaction requests.
pub fn transaction_requests(&self) -> impl Iterator<Item = &Gossip> {
self.cancel_handles.iter().map(|(_tx_id, (_handle, tx))| tx)
}
/// Check if transaction is already in the best chain.
async fn transaction_in_best_chain(
state: &mut ZS,
@ -467,14 +480,16 @@ where
#[pinned_drop]
impl<ZN, ZV, ZS> PinnedDrop for Downloads<ZN, ZV, ZS>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
fn drop(self: Pin<&mut Self>) {
fn drop(mut self: Pin<&mut Self>) {
self.cancel_all();
metrics::gauge!("mempool.currently.queued.transactions", 0 as f64);
}
}

View File

@ -1,6 +1,6 @@
//! Randomised property tests for the mempool.
use std::env;
use std::{env, fmt};
use proptest::{collection::vec, prelude::*};
use proptest_derive::Arbitrary;
@ -11,6 +11,7 @@ use tower::{buffer::Buffer, util::BoxService};
use zebra_chain::{
block,
fmt::DisplayToDebug,
parameters::{Network, NetworkUpgrade},
transaction::VerifiedUnminedTx,
};
@ -49,17 +50,17 @@ proptest! {
#[test]
fn storage_is_cleared_on_single_chain_reset(
network in any::<Network>(),
transaction in any::<VerifiedUnminedTx>(),
chain_tip in any::<ChainTipBlock>(),
transaction in any::<DisplayToDebug<VerifiedUnminedTx>>(),
chain_tip in any::<DisplayToDebug<ChainTipBlock>>(),
) {
let (runtime, _init_guard) = zebra_test::init_async();
runtime.block_on(async move {
let (
mut mempool,
mut peer_set,
mut state_service,
mut tx_verifier,
_peer_set,
_state_service,
_tx_verifier,
mut recent_syncs,
mut chain_tip_sender,
) = setup(network);
@ -71,7 +72,7 @@ proptest! {
// Insert a dummy transaction.
mempool
.storage()
.insert(transaction)
.insert(transaction.0)
.expect("Inserting a transaction should succeed");
// The first call to `poll_ready` shouldn't clear the storage yet.
@ -80,16 +81,15 @@ proptest! {
prop_assert_eq!(mempool.storage().transaction_count(), 1);
// Simulate a chain reset.
chain_tip_sender.set_finalized_tip(chain_tip);
chain_tip_sender.set_finalized_tip(chain_tip.0);
// This time a call to `poll_ready` should clear the storage.
mempool.dummy_call().await;
prop_assert_eq!(mempool.storage().transaction_count(), 0);
peer_set.expect_no_requests().await?;
state_service.expect_no_requests().await?;
tx_verifier.expect_no_requests().await?;
// The services might or might not get requests,
// depending on how many transactions get re-queued, and if they need downloading.
Ok(())
})?;
@ -99,18 +99,18 @@ proptest! {
#[test]
fn storage_is_cleared_on_multiple_chain_resets(
network in any::<Network>(),
mut previous_chain_tip in any::<ChainTipBlock>(),
mut transactions in vec(any::<VerifiedUnminedTx>(), 0..CHAIN_LENGTH),
fake_chain_tips in vec(any::<FakeChainTip>(), 0..CHAIN_LENGTH),
mut previous_chain_tip in any::<DisplayToDebug<ChainTipBlock>>(),
mut transactions in vec(any::<DisplayToDebug<VerifiedUnminedTx>>(), 0..CHAIN_LENGTH),
fake_chain_tips in vec(any::<DisplayToDebug<FakeChainTip>>(), 0..CHAIN_LENGTH),
) {
let (runtime, _init_guard) = zebra_test::init_async();
runtime.block_on(async move {
let (
mut mempool,
mut peer_set,
mut state_service,
mut tx_verifier,
_peer_set,
_state_service,
_tx_verifier,
mut recent_syncs,
mut chain_tip_sender,
) = setup(network);
@ -120,7 +120,7 @@ proptest! {
mempool.enable(&mut recent_syncs).await;
// Set the initial chain tip.
chain_tip_sender.set_best_non_finalized_tip(previous_chain_tip.clone());
chain_tip_sender.set_best_non_finalized_tip(previous_chain_tip.0.clone());
// Call the mempool so that it is aware of the initial chain tip.
mempool.dummy_call().await;
@ -146,7 +146,7 @@ proptest! {
// Insert the dummy transaction into the mempool.
mempool
.storage()
.insert(transaction.clone())
.insert(transaction.0.clone())
.expect("Inserting a transaction should succeed");
// Set the new chain tip.
@ -155,7 +155,7 @@ proptest! {
// Call the mempool so that it is aware of the new chain tip.
mempool.dummy_call().await;
match fake_chain_tip {
match fake_chain_tip.0 {
FakeChainTip::Grow(_) => {
// The mempool should not be empty because we had a regular chain growth.
prop_assert_ne!(mempool.storage().transaction_count(), 0);
@ -168,12 +168,11 @@ proptest! {
}
// Remember the current chain tip so that the next one can refer to it.
previous_chain_tip = chain_tip;
previous_chain_tip = chain_tip.into();
}
peer_set.expect_no_requests().await?;
state_service.expect_no_requests().await?;
tx_verifier.expect_no_requests().await?;
// The services might or might not get requests,
// depending on how many transactions get re-queued, and if they need downloading.
Ok(())
})?;
@ -274,12 +273,23 @@ fn setup(
}
/// A helper enum for simulating either a chain reset or growth.
#[derive(Arbitrary, Debug, Clone)]
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq)]
enum FakeChainTip {
Grow(ChainTipBlock),
Reset(ChainTipBlock),
}
impl fmt::Display for FakeChainTip {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let (mut f, inner) = match self {
FakeChainTip::Grow(inner) => (f.debug_tuple("FakeChainTip::Grow"), inner),
FakeChainTip::Reset(inner) => (f.debug_tuple("FakeChainTip::Reset"), inner),
};
f.field(&inner).finish()
}
}
impl FakeChainTip {
/// Returns a new [`ChainTipBlock`] placed on top of the previous block if
/// the chain is supposed to grow. Otherwise returns a [`ChainTipBlock`]

View File

@ -536,7 +536,7 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report>
let (
mut mempool,
_peer_set,
mut peer_set,
mut state_service,
mut chain_tip_change,
_tx_verifier,
@ -623,11 +623,23 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report>
);
}
// Ignore all the previous network requests.
while let Some(_request) = peer_set.try_next_request().await {}
// Query the mempool to make it poll chain_tip_change
mempool.dummy_call().await;
// Check if download was cancelled.
assert_eq!(mempool.tx_downloads().in_flight(), 0);
// Check if download was cancelled and transaction was retried.
let request = peer_set
.try_next_request()
.await
.expect("unexpected missing mempool retry");
assert_eq!(
request.request(),
&zebra_network::Request::TransactionsById(iter::once(txid).collect()),
);
assert_eq!(mempool.tx_downloads().in_flight(), 1);
Ok(())
}