diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 969b4f3b9..dbc5beba0 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -372,20 +372,36 @@ impl Service for Mempool { // Collect inserted transaction ids. let mut send_to_peers_ids = HashSet::<_>::new(); + let best_tip_height = self.latest_chain_tip.best_tip_height(); + // Clean up completed download tasks and add to mempool if successful. while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) { match r { - Ok(tx) => { - let insert_result = storage.insert(tx.clone()); + Ok((tx, expected_tip_height)) => { + // # Correctness: + // + // It's okay to use tip height here instead of the tip hash since + // chain_tip_change.last_tip_change() returns a `TipAction::Reset` when + // the best chain changes (which is the only way to stay at the same height), and the + // mempool re-verifies all pending tx_downloads when there's a `TipAction::Reset`. + if best_tip_height == expected_tip_height { + let insert_result = storage.insert(tx.clone()); - tracing::trace!( - ?insert_result, - "got Ok(_) transaction verify, tried to store", - ); + tracing::trace!( + ?insert_result, + "got Ok(_) transaction verify, tried to store", + ); - if let Ok(inserted_id) = insert_result { - // Save transaction ids that we will send to peers - send_to_peers_ids.insert(inserted_id); + if let Ok(inserted_id) = insert_result { + // Save transaction ids that we will send to peers + send_to_peers_ids.insert(inserted_id); + } + } else { + tracing::trace!("chain grew during tx verification, retrying ..",); + + // We don't care if re-queueing the transaction request fails. + let _result = + tx_downloads.download_if_needed_and_verify(tx.transaction.into()); } } Err((txid, error)) => { @@ -416,7 +432,7 @@ impl Service for Mempool { // // Lock times never expire, because block times are strictly increasing. // So we don't need to check them here. - if let Some(tip_height) = self.latest_chain_tip.best_tip_height() { + if let Some(tip_height) = best_tip_height { let expired_transactions = storage.remove_expired_transactions(tip_height); // Remove transactions that are expired from the peers list send_to_peers_ids = diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 99d7dc5fe..7a83d91db 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -148,7 +148,12 @@ where /// A list of pending transaction download and verify tasks. #[pin] pending: FuturesUnordered< - JoinHandle>, + JoinHandle< + Result< + (VerifiedUnminedTx, Option), + (TransactionDownloadVerifyError, UnminedTxId), + >, + >, >, /// A list of channels that can be used to cancel pending transaction download and @@ -165,7 +170,8 @@ where ZS: Service + Send + Clone + 'static, ZS::Future: Send, { - type Item = Result; + type Item = + Result<(VerifiedUnminedTx, Option), (UnminedTxId, TransactionDownloadVerifyError)>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.project(); @@ -180,9 +186,9 @@ where // TODO: this would be cleaner with poll_map (#2693) if let Some(join_result) = ready!(this.pending.poll_next(cx)) { match join_result.expect("transaction download and verify tasks must not panic") { - Ok(tx) => { + Ok((tx, tip_height)) => { this.cancel_handles.remove(&tx.transaction.id); - Poll::Ready(Some(Ok(tx))) + Poll::Ready(Some(Ok((tx, tip_height)))) } Err((e, hash)) => { this.cancel_handles.remove(&hash); @@ -282,12 +288,12 @@ where trace!(?txid, "transaction is not in best chain"); - let next_height = match state.oneshot(zs::Request::Tip).await { - Ok(zs::Response::Tip(None)) => Ok(Height(0)), + let (tip_height, next_height) = match state.oneshot(zs::Request::Tip).await { + Ok(zs::Response::Tip(None)) => Ok((None, Height(0))), Ok(zs::Response::Tip(Some((height, _hash)))) => { let next_height = (height + 1).expect("valid heights are far below the maximum"); - Ok(next_height) + Ok((Some(height), next_height)) } Ok(_) => unreachable!("wrong response"), Err(e) => Err(TransactionDownloadVerifyError::StateError(e)), @@ -341,8 +347,8 @@ where height: next_height, }) .map_ok(|rsp| { - rsp.into_mempool_transaction() - .expect("unexpected non-mempool response to mempool request") + (rsp.into_mempool_transaction() + .expect("unexpected non-mempool response to mempool request"), tip_height) }) .await; @@ -351,13 +357,13 @@ where result.map_err(|e| TransactionDownloadVerifyError::Invalid(e.into())) } - .map_ok(|tx| { + .map_ok(|(tx, tip_height)| { metrics::counter!( "mempool.verified.transactions.total", 1, "version" => format!("{}", tx.transaction.transaction.version()), ); - tx + (tx, tip_height) }) // Tack the hash onto the error so we can remove the cancel handle // on failure as well as on success. diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index b75d19974..aa4b8b363 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -7,7 +7,8 @@ use tokio::time::{self, timeout}; use tower::{ServiceBuilder, ServiceExt}; use zebra_chain::{ - block::Block, fmt::humantime_seconds, parameters::Network, serialization::ZcashDeserializeInto, + amount::Amount, block::Block, fmt::humantime_seconds, parameters::Network, + serialization::ZcashDeserializeInto, transaction::VerifiedUnminedTx, }; use zebra_consensus::transaction as tx; use zebra_state::{Config as StateConfig, CHAIN_TIP_UPDATE_WAIT_LIMIT}; @@ -794,6 +795,192 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { Ok(()) } +/// Check that transactions are re-verified if the tip changes +/// during verification. +#[tokio::test(flavor = "multi_thread")] +async fn mempool_reverifies_after_tip_change() -> Result<(), Report> { + let network = Network::Mainnet; + + let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES + .zcash_deserialize_into() + .unwrap(); + let block1: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES + .zcash_deserialize_into() + .unwrap(); + let block2: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES + .zcash_deserialize_into() + .unwrap(); + let block3: Arc = zebra_test::vectors::BLOCK_MAINNET_3_BYTES + .zcash_deserialize_into() + .unwrap(); + + let ( + mut mempool, + mut peer_set, + mut state_service, + mut chain_tip_change, + mut tx_verifier, + mut recent_syncs, + ) = setup(network, u64::MAX).await; + + // Enable the mempool + mempool.enable(&mut recent_syncs).await; + assert!(mempool.is_enabled()); + + // Push the genesis block to the state + state_service + .ready() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + genesis_block.clone().into(), + )) + .await + .unwrap(); + + // Wait for the chain tip update without a timeout + // (skipping the chain tip change here may cause the test to + // pass without reverifying transactions for `TipAction::Grow`) + chain_tip_change + .wait_for_tip_change() + .await + .expect("unexpected chain tip update failure"); + + // Queue transaction from block 3 for download + let tx = block3.transactions[0].clone(); + let txid = block3.transactions[0].unmined_id(); + let response = mempool + .ready() + .await + .unwrap() + .call(Request::Queue(vec![txid.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + assert_eq!(mempool.tx_downloads().in_flight(), 1); + + // Verify the transaction + + peer_set + .expect_request_that(|req| matches!(req, zn::Request::TransactionsById(_))) + .map(|responder| { + responder.respond(zn::Response::Transactions(vec![ + zn::InventoryResponse::Available(tx.clone().into()), + ])); + }) + .await; + + tx_verifier + .expect_request_that(|_| true) + .map(|responder| { + let transaction = responder + .request() + .clone() + .mempool_transaction() + .expect("unexpected non-mempool request"); + + // Set a dummy fee and sigops. + responder.respond(transaction::Response::from(VerifiedUnminedTx::new( + transaction, + Amount::zero(), + 0, + ))); + }) + .await; + + // Push block 1 to the state. This is considered a network upgrade, + // and must cancel all pending transaction downloads with a `TipAction::Reset`. + state_service + .ready() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + block1.clone().into(), + )) + .await + .unwrap(); + + // Wait for the chain tip update without a timeout + // (skipping the chain tip change here will fail the test) + chain_tip_change + .wait_for_tip_change() + .await + .expect("unexpected chain tip update failure"); + + // Query the mempool to make it poll chain_tip_change and try reverifying its state for the `TipAction::Reset` + mempool.dummy_call().await; + + // Check that there is still an in-flight tx_download and that + // no transactions were inserted in the mempool. + assert_eq!(mempool.tx_downloads().in_flight(), 1); + assert_eq!(mempool.storage().transaction_count(), 0); + + // Verify the transaction again + + peer_set + .expect_request_that(|req| matches!(req, zn::Request::TransactionsById(_))) + .map(|responder| { + responder.respond(zn::Response::Transactions(vec![ + zn::InventoryResponse::Available(tx.into()), + ])); + }) + .await; + + // Verify the transaction now that the mempool has already checked chain_tip_change + tx_verifier + .expect_request_that(|_| true) + .map(|responder| { + let transaction = responder + .request() + .clone() + .mempool_transaction() + .expect("unexpected non-mempool request"); + + // Set a dummy fee and sigops. + responder.respond(transaction::Response::from(VerifiedUnminedTx::new( + transaction, + Amount::zero(), + 0, + ))); + }) + .await; + + // Push block 2 to the state. This will increase the tip height past the expected + // tip height that the the tx was verified at. + state_service + .ready() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + block2.clone().into(), + )) + .await + .unwrap(); + + // Wait for the chain tip update without a timeout + // (skipping the chain tip change here will fail the test) + chain_tip_change + .wait_for_tip_change() + .await + .expect("unexpected chain tip update failure"); + + // Query the mempool to make it poll tx_downloads.pending and try reverifying transactions + // because the tip height has changed. + mempool.dummy_call().await; + + // Check that there is still an in-flight tx_download and that + // no transactions were inserted in the mempool. + assert_eq!(mempool.tx_downloads().in_flight(), 1); + assert_eq!(mempool.storage().transaction_count(), 0); + + Ok(()) +} + /// Create a new [`Mempool`] instance using mocked services. async fn setup( network: Network,