fix(mempool): Re-verify transactions that were verified at a different tip height (#6154)

* checks tip height before mempool insertions

* adds unit test for reverifying txs

* Adds TODO

* Adds correctness note

* dedup best_tip_height() calls

* Update zebrad/src/components/mempool.rs

Co-authored-by: teor <teor@riseup.net>

* uses Option for expected tip height

* removes misplaced dummy_call()

* calls wait_for_chain_tip without a timeout where it doesn't matter and skips instead of panicking where it doesn't

* Update zebrad/src/components/mempool/tests/vector.rs

* removes whitespace for rustfmt

---------

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Arya 2023-02-16 15:06:42 -05:00 committed by GitHub
parent fddd361d3e
commit f253213e34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 231 additions and 22 deletions

View File

@ -372,20 +372,36 @@ impl Service<Request> for Mempool {
// Collect inserted transaction ids.
let mut send_to_peers_ids = HashSet::<_>::new();
let best_tip_height = self.latest_chain_tip.best_tip_height();
// Clean up completed download tasks and add to mempool if successful.
while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) {
match r {
Ok(tx) => {
let insert_result = storage.insert(tx.clone());
Ok((tx, expected_tip_height)) => {
// # Correctness:
//
// It's okay to use tip height here instead of the tip hash since
// chain_tip_change.last_tip_change() returns a `TipAction::Reset` when
// the best chain changes (which is the only way to stay at the same height), and the
// mempool re-verifies all pending tx_downloads when there's a `TipAction::Reset`.
if best_tip_height == expected_tip_height {
let insert_result = storage.insert(tx.clone());
tracing::trace!(
?insert_result,
"got Ok(_) transaction verify, tried to store",
);
tracing::trace!(
?insert_result,
"got Ok(_) transaction verify, tried to store",
);
if let Ok(inserted_id) = insert_result {
// Save transaction ids that we will send to peers
send_to_peers_ids.insert(inserted_id);
if let Ok(inserted_id) = insert_result {
// Save transaction ids that we will send to peers
send_to_peers_ids.insert(inserted_id);
}
} else {
tracing::trace!("chain grew during tx verification, retrying ..",);
// We don't care if re-queueing the transaction request fails.
let _result =
tx_downloads.download_if_needed_and_verify(tx.transaction.into());
}
}
Err((txid, error)) => {
@ -416,7 +432,7 @@ impl Service<Request> for Mempool {
//
// Lock times never expire, because block times are strictly increasing.
// So we don't need to check them here.
if let Some(tip_height) = self.latest_chain_tip.best_tip_height() {
if let Some(tip_height) = best_tip_height {
let expired_transactions = storage.remove_expired_transactions(tip_height);
// Remove transactions that are expired from the peers list
send_to_peers_ids =

View File

@ -148,7 +148,12 @@ where
/// A list of pending transaction download and verify tasks.
#[pin]
pending: FuturesUnordered<
JoinHandle<Result<VerifiedUnminedTx, (TransactionDownloadVerifyError, UnminedTxId)>>,
JoinHandle<
Result<
(VerifiedUnminedTx, Option<Height>),
(TransactionDownloadVerifyError, UnminedTxId),
>,
>,
>,
/// A list of channels that can be used to cancel pending transaction download and
@ -165,7 +170,8 @@ where
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
type Item = Result<VerifiedUnminedTx, (UnminedTxId, TransactionDownloadVerifyError)>;
type Item =
Result<(VerifiedUnminedTx, Option<Height>), (UnminedTxId, TransactionDownloadVerifyError)>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
@ -180,9 +186,9 @@ where
// TODO: this would be cleaner with poll_map (#2693)
if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
match join_result.expect("transaction download and verify tasks must not panic") {
Ok(tx) => {
Ok((tx, tip_height)) => {
this.cancel_handles.remove(&tx.transaction.id);
Poll::Ready(Some(Ok(tx)))
Poll::Ready(Some(Ok((tx, tip_height))))
}
Err((e, hash)) => {
this.cancel_handles.remove(&hash);
@ -282,12 +288,12 @@ where
trace!(?txid, "transaction is not in best chain");
let next_height = match state.oneshot(zs::Request::Tip).await {
Ok(zs::Response::Tip(None)) => Ok(Height(0)),
let (tip_height, next_height) = match state.oneshot(zs::Request::Tip).await {
Ok(zs::Response::Tip(None)) => Ok((None, Height(0))),
Ok(zs::Response::Tip(Some((height, _hash)))) => {
let next_height =
(height + 1).expect("valid heights are far below the maximum");
Ok(next_height)
Ok((Some(height), next_height))
}
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e)),
@ -341,8 +347,8 @@ where
height: next_height,
})
.map_ok(|rsp| {
rsp.into_mempool_transaction()
.expect("unexpected non-mempool response to mempool request")
(rsp.into_mempool_transaction()
.expect("unexpected non-mempool response to mempool request"), tip_height)
})
.await;
@ -351,13 +357,13 @@ where
result.map_err(|e| TransactionDownloadVerifyError::Invalid(e.into()))
}
.map_ok(|tx| {
.map_ok(|(tx, tip_height)| {
metrics::counter!(
"mempool.verified.transactions.total",
1,
"version" => format!("{}", tx.transaction.transaction.version()),
);
tx
(tx, tip_height)
})
// Tack the hash onto the error so we can remove the cancel handle
// on failure as well as on success.

View File

@ -7,7 +7,8 @@ use tokio::time::{self, timeout};
use tower::{ServiceBuilder, ServiceExt};
use zebra_chain::{
block::Block, fmt::humantime_seconds, parameters::Network, serialization::ZcashDeserializeInto,
amount::Amount, block::Block, fmt::humantime_seconds, parameters::Network,
serialization::ZcashDeserializeInto, transaction::VerifiedUnminedTx,
};
use zebra_consensus::transaction as tx;
use zebra_state::{Config as StateConfig, CHAIN_TIP_UPDATE_WAIT_LIMIT};
@ -794,6 +795,192 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
Ok(())
}
/// Check that transactions are re-verified if the tip changes
/// during verification.
#[tokio::test(flavor = "multi_thread")]
async fn mempool_reverifies_after_tip_change() -> Result<(), Report> {
let network = Network::Mainnet;
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
.zcash_deserialize_into()
.unwrap();
let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
.zcash_deserialize_into()
.unwrap();
let block3: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_3_BYTES
.zcash_deserialize_into()
.unwrap();
let (
mut mempool,
mut peer_set,
mut state_service,
mut chain_tip_change,
mut tx_verifier,
mut recent_syncs,
) = setup(network, u64::MAX).await;
// Enable the mempool
mempool.enable(&mut recent_syncs).await;
assert!(mempool.is_enabled());
// Push the genesis block to the state
state_service
.ready()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();
// Wait for the chain tip update without a timeout
// (skipping the chain tip change here may cause the test to
// pass without reverifying transactions for `TipAction::Grow`)
chain_tip_change
.wait_for_tip_change()
.await
.expect("unexpected chain tip update failure");
// Queue transaction from block 3 for download
let tx = block3.transactions[0].clone();
let txid = block3.transactions[0].unmined_id();
let response = mempool
.ready()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(mempool.tx_downloads().in_flight(), 1);
// Verify the transaction
peer_set
.expect_request_that(|req| matches!(req, zn::Request::TransactionsById(_)))
.map(|responder| {
responder.respond(zn::Response::Transactions(vec![
zn::InventoryResponse::Available(tx.clone().into()),
]));
})
.await;
tx_verifier
.expect_request_that(|_| true)
.map(|responder| {
let transaction = responder
.request()
.clone()
.mempool_transaction()
.expect("unexpected non-mempool request");
// Set a dummy fee and sigops.
responder.respond(transaction::Response::from(VerifiedUnminedTx::new(
transaction,
Amount::zero(),
0,
)));
})
.await;
// Push block 1 to the state. This is considered a network upgrade,
// and must cancel all pending transaction downloads with a `TipAction::Reset`.
state_service
.ready()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
block1.clone().into(),
))
.await
.unwrap();
// Wait for the chain tip update without a timeout
// (skipping the chain tip change here will fail the test)
chain_tip_change
.wait_for_tip_change()
.await
.expect("unexpected chain tip update failure");
// Query the mempool to make it poll chain_tip_change and try reverifying its state for the `TipAction::Reset`
mempool.dummy_call().await;
// Check that there is still an in-flight tx_download and that
// no transactions were inserted in the mempool.
assert_eq!(mempool.tx_downloads().in_flight(), 1);
assert_eq!(mempool.storage().transaction_count(), 0);
// Verify the transaction again
peer_set
.expect_request_that(|req| matches!(req, zn::Request::TransactionsById(_)))
.map(|responder| {
responder.respond(zn::Response::Transactions(vec![
zn::InventoryResponse::Available(tx.into()),
]));
})
.await;
// Verify the transaction now that the mempool has already checked chain_tip_change
tx_verifier
.expect_request_that(|_| true)
.map(|responder| {
let transaction = responder
.request()
.clone()
.mempool_transaction()
.expect("unexpected non-mempool request");
// Set a dummy fee and sigops.
responder.respond(transaction::Response::from(VerifiedUnminedTx::new(
transaction,
Amount::zero(),
0,
)));
})
.await;
// Push block 2 to the state. This will increase the tip height past the expected
// tip height that the the tx was verified at.
state_service
.ready()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
block2.clone().into(),
))
.await
.unwrap();
// Wait for the chain tip update without a timeout
// (skipping the chain tip change here will fail the test)
chain_tip_change
.wait_for_tip_change()
.await
.expect("unexpected chain tip update failure");
// Query the mempool to make it poll tx_downloads.pending and try reverifying transactions
// because the tip height has changed.
mempool.dummy_call().await;
// Check that there is still an in-flight tx_download and that
// no transactions were inserted in the mempool.
assert_eq!(mempool.tx_downloads().in_flight(), 1);
assert_eq!(mempool.storage().transaction_count(), 0);
Ok(())
}
/// Create a new [`Mempool`] instance using mocked services.
async fn setup(
network: Network,