Add transactions that failed verification to the mempool rejected list (#2821)
* Add transactions that failed verification to the mempool rejected list * Add tests * Work with recent changes
This commit is contained in:
parent
664d4384d4
commit
dd1f0a6dcc
|
@ -41,7 +41,8 @@ pub use self::storage::{
|
|||
pub use self::storage::tests::unmined_transactions_in_blocks;
|
||||
|
||||
use self::downloads::{
|
||||
Downloads as TxDownloads, Gossip, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
|
||||
Downloads as TxDownloads, Gossip, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT,
|
||||
TRANSACTION_VERIFY_TIMEOUT,
|
||||
};
|
||||
|
||||
use super::sync::SyncStatus;
|
||||
|
@ -227,11 +228,17 @@ impl Service<Request> for Mempool {
|
|||
|
||||
// Clean up completed download tasks and add to mempool if successful.
|
||||
while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) {
|
||||
if let Ok(tx) = r {
|
||||
match r {
|
||||
Ok(tx) => {
|
||||
// Storage handles conflicting transactions or a full mempool internally,
|
||||
// so just ignore the storage result here
|
||||
let _ = storage.insert(tx);
|
||||
}
|
||||
Err((txid, e)) => {
|
||||
reject_if_needed(storage, txid, e);
|
||||
// TODO: should we also log the result?
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Remove expired transactions from the mempool.
|
||||
|
@ -327,3 +334,38 @@ fn remove_expired_transactions(
|
|||
// expiry height is effecting data, so we match by non-malleable TXID
|
||||
storage.remove_same_effects(&txid_set);
|
||||
}
|
||||
|
||||
/// Add a transaction that failed download and verification to the rejected list
|
||||
/// if needed, depending on the reason for the failure.
|
||||
fn reject_if_needed(
|
||||
storage: &mut storage::Storage,
|
||||
txid: UnminedTxId,
|
||||
e: TransactionDownloadVerifyError,
|
||||
) {
|
||||
match e {
|
||||
// Rejecting a transaction already in state would speed up further
|
||||
// download attempts without checking the state. However it would
|
||||
// make the reject list grow forever.
|
||||
// TODO: revisit after reviewing the rejected list cleanup criteria?
|
||||
// TODO: if we decide to reject it, then we need to pass the block hash
|
||||
// to State::Confirmed. This would require the zs::Response::Transaction
|
||||
// to include the hash, which would need to be implemented.
|
||||
TransactionDownloadVerifyError::InState |
|
||||
// An unknown error in the state service, better do nothing
|
||||
TransactionDownloadVerifyError::StateError(_) |
|
||||
// Sync has just started. Mempool shouldn't even be enabled, so will not
|
||||
// happen in practice.
|
||||
TransactionDownloadVerifyError::NoTip |
|
||||
// If download failed, do nothing; the crawler will end up trying to
|
||||
// download it again.
|
||||
TransactionDownloadVerifyError::DownloadFailed(_) |
|
||||
// If it was cancelled then a block was mined, or there was a network
|
||||
// upgrade, etc. No reason to reject it.
|
||||
TransactionDownloadVerifyError::Cancelled => {}
|
||||
// Consensus verification failed. Reject transaction to avoid
|
||||
// having to download and verify it again just for it to fail again.
|
||||
TransactionDownloadVerifyError::Invalid(e) => {
|
||||
storage.reject(txid, ExactTipRejectionError::FailedVerification(e).into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,13 +5,13 @@ use std::{
|
|||
time::Duration,
|
||||
};
|
||||
|
||||
use color_eyre::eyre::eyre;
|
||||
use futures::{
|
||||
future::TryFutureExt,
|
||||
ready,
|
||||
stream::{FuturesUnordered, Stream},
|
||||
};
|
||||
use pin_project::pin_project;
|
||||
use thiserror::Error;
|
||||
use tokio::{sync::oneshot, task::JoinHandle};
|
||||
use tower::{Service, ServiceExt};
|
||||
use tracing_futures::Instrument;
|
||||
|
@ -65,6 +65,29 @@ pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT;
|
|||
/// Therefore, this attack can be carried out by a single malicious node.
|
||||
pub(crate) const MAX_INBOUND_CONCURRENCY: usize = 10;
|
||||
|
||||
/// Errors that can occur while downloading and verifying a transaction.
|
||||
#[derive(Error, Debug)]
|
||||
#[allow(dead_code)]
|
||||
pub enum TransactionDownloadVerifyError {
|
||||
#[error("transaction is already in state")]
|
||||
InState,
|
||||
|
||||
#[error("error in state service")]
|
||||
StateError(#[source] BoxError),
|
||||
|
||||
#[error("transaction not validated because the tip is empty")]
|
||||
NoTip,
|
||||
|
||||
#[error("error downloading transaction")]
|
||||
DownloadFailed(#[source] BoxError),
|
||||
|
||||
#[error("transaction download / verification was cancelled")]
|
||||
Cancelled,
|
||||
|
||||
#[error("transaction did not pass consensus validation")]
|
||||
Invalid(#[from] zebra_consensus::error::TransactionError),
|
||||
}
|
||||
|
||||
/// A gossiped transaction, which can be the transaction itself or just its ID.
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
pub enum Gossip {
|
||||
|
@ -120,7 +143,9 @@ where
|
|||
// Internal downloads state
|
||||
/// A list of pending transaction download and verify tasks.
|
||||
#[pin]
|
||||
pending: FuturesUnordered<JoinHandle<Result<UnminedTx, (BoxError, UnminedTxId)>>>,
|
||||
pending: FuturesUnordered<
|
||||
JoinHandle<Result<UnminedTx, (TransactionDownloadVerifyError, UnminedTxId)>>,
|
||||
>,
|
||||
|
||||
/// A list of channels that can be used to cancel pending transaction download and
|
||||
/// verify tasks.
|
||||
|
@ -136,7 +161,7 @@ where
|
|||
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
||||
ZS::Future: Send,
|
||||
{
|
||||
type Item = Result<UnminedTx, BoxError>;
|
||||
type Item = Result<UnminedTx, (UnminedTxId, TransactionDownloadVerifyError)>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
let this = self.project();
|
||||
|
@ -157,7 +182,7 @@ where
|
|||
}
|
||||
Err((e, hash)) => {
|
||||
this.cancel_handles.remove(&hash);
|
||||
Poll::Ready(Some(Err(e)))
|
||||
Poll::Ready(Some(Err((hash, e))))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -237,21 +262,27 @@ where
|
|||
Self::transaction_in_state(&mut state, txid).await?;
|
||||
|
||||
let height = match state.oneshot(zs::Request::Tip).await {
|
||||
Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()),
|
||||
Ok(zs::Response::Tip(None)) => Err(TransactionDownloadVerifyError::NoTip),
|
||||
Ok(zs::Response::Tip(Some((height, _hash)))) => Ok(height),
|
||||
Ok(_) => unreachable!("wrong response"),
|
||||
Err(e) => Err(e),
|
||||
Err(e) => Err(TransactionDownloadVerifyError::StateError(e)),
|
||||
}?;
|
||||
let height = (height + 1).ok_or_else(|| eyre!("no next height"))?;
|
||||
let height = (height + 1).expect("must have next height");
|
||||
|
||||
let tx = match gossiped_tx {
|
||||
Gossip::Id(txid) => {
|
||||
let req = zn::Request::TransactionsById(std::iter::once(txid).collect());
|
||||
|
||||
let tx = match network.oneshot(req).await? {
|
||||
zn::Response::Transactions(mut txs) => txs
|
||||
.pop()
|
||||
.expect("successful response has the transaction in it"),
|
||||
let tx = match network
|
||||
.oneshot(req)
|
||||
.await
|
||||
.map_err(|e| TransactionDownloadVerifyError::DownloadFailed(e))?
|
||||
{
|
||||
zn::Response::Transactions(mut txs) => txs.pop().ok_or_else(|| {
|
||||
TransactionDownloadVerifyError::DownloadFailed(
|
||||
"no transactions returned".into(),
|
||||
)
|
||||
})?,
|
||||
_ => unreachable!("wrong response to transaction request"),
|
||||
};
|
||||
|
||||
|
@ -274,7 +305,7 @@ where
|
|||
|
||||
tracing::debug!(?txid, ?result, "verified transaction for the mempool");
|
||||
|
||||
result
|
||||
result.map_err(|e| TransactionDownloadVerifyError::Invalid(e.into()))
|
||||
}
|
||||
.map_ok(|tx| {
|
||||
metrics::counter!("gossip.verified.transaction.count", 1);
|
||||
|
@ -292,7 +323,7 @@ where
|
|||
_ = &mut cancel_rx => {
|
||||
tracing::trace!("task cancelled prior to completion");
|
||||
metrics::counter!("gossip.cancelled.count", 1);
|
||||
Err(("canceled".into(), txid))
|
||||
Err((TransactionDownloadVerifyError::Cancelled, txid))
|
||||
}
|
||||
verification = fut => verification,
|
||||
}
|
||||
|
@ -357,18 +388,22 @@ where
|
|||
}
|
||||
|
||||
/// Check if transaction is already in the state.
|
||||
async fn transaction_in_state(state: &mut ZS, txid: UnminedTxId) -> Result<(), BoxError> {
|
||||
async fn transaction_in_state(
|
||||
state: &mut ZS,
|
||||
txid: UnminedTxId,
|
||||
) -> Result<(), TransactionDownloadVerifyError> {
|
||||
// Check if the transaction is already in the state.
|
||||
match state
|
||||
.ready_and()
|
||||
.await?
|
||||
.await
|
||||
.map_err(|e| TransactionDownloadVerifyError::StateError(e))?
|
||||
.call(zs::Request::Transaction(txid.mined_id()))
|
||||
.await
|
||||
{
|
||||
Ok(zs::Response::Transaction(None)) => Ok(()),
|
||||
Ok(zs::Response::Transaction(Some(_))) => Err("already present in state".into()),
|
||||
Ok(zs::Response::Transaction(Some(_))) => Err(TransactionDownloadVerifyError::InState),
|
||||
Ok(_) => unreachable!("wrong response"),
|
||||
Err(e) => Err(e),
|
||||
Err(e) => Err(TransactionDownloadVerifyError::StateError(e)),
|
||||
}?;
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -52,4 +52,7 @@ pub enum MempoolError {
|
|||
|
||||
#[error("mempool is disabled since synchronization is behind the chain tip")]
|
||||
Disabled,
|
||||
|
||||
#[error("error calling a service")]
|
||||
ServiceError,
|
||||
}
|
||||
|
|
|
@ -76,6 +76,18 @@ pub enum SameEffectsChainRejectionError {
|
|||
RandomlyEvicted,
|
||||
}
|
||||
|
||||
/// Storage error that combines all other specific error types.
|
||||
#[derive(Error, Clone, Debug, PartialEq, Eq)]
|
||||
#[allow(dead_code)]
|
||||
pub enum RejectionError {
|
||||
#[error(transparent)]
|
||||
ExactTip(#[from] ExactTipRejectionError),
|
||||
#[error(transparent)]
|
||||
SameEffectsTip(#[from] SameEffectsTipRejectionError),
|
||||
#[error(transparent)]
|
||||
SameEffectsChain(#[from] SameEffectsChainRejectionError),
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Storage {
|
||||
/// The set of verified transactions in the mempool. This is a
|
||||
|
@ -279,6 +291,22 @@ impl Storage {
|
|||
+ self.chain_rejected_same_effects.len()
|
||||
}
|
||||
|
||||
/// Add a transaction to the rejected list for the given reason.
|
||||
pub fn reject(&mut self, txid: UnminedTxId, reason: RejectionError) {
|
||||
match reason {
|
||||
RejectionError::ExactTip(e) => {
|
||||
self.tip_rejected_exact.insert(txid, e);
|
||||
}
|
||||
RejectionError::SameEffectsTip(e) => {
|
||||
self.tip_rejected_same_effects.insert(txid.mined_id(), e);
|
||||
}
|
||||
RejectionError::SameEffectsChain(e) => {
|
||||
self.chain_rejected_same_effects.insert(txid.mined_id(), e);
|
||||
}
|
||||
}
|
||||
self.limit_rejection_list_memory();
|
||||
}
|
||||
|
||||
/// Returns `true` if a [`UnminedTx`] matching an [`UnminedTxId`] is in
|
||||
/// any mempool rejected list.
|
||||
///
|
||||
|
|
|
@ -563,3 +563,204 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report>
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if a transaction that fails verification is rejected by the mempool.
|
||||
#[tokio::test]
|
||||
async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
|
||||
// Using the mainnet for now
|
||||
let network = Network::Mainnet;
|
||||
let consensus_config = ConsensusConfig::default();
|
||||
let state_config = StateConfig::ephemeral();
|
||||
let peer_set = MockService::build().for_unit_tests();
|
||||
let (sync_status, mut recent_syncs) = SyncStatus::new();
|
||||
let (state, latest_chain_tip, chain_tip_change) =
|
||||
zebra_state::init(state_config.clone(), network);
|
||||
|
||||
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
|
||||
let (_chain_verifier, _tx_verifier) =
|
||||
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
|
||||
.await;
|
||||
let mut tx_verifier = MockService::build().for_unit_tests();
|
||||
|
||||
// Get transactions to use in the test
|
||||
let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network);
|
||||
let rejected_tx = unmined_transactions.next().unwrap().clone();
|
||||
|
||||
time::pause();
|
||||
|
||||
// Start the mempool service
|
||||
let mut mempool = Mempool::new(
|
||||
network,
|
||||
Buffer::new(BoxService::new(peer_set.clone()), 1),
|
||||
state_service.clone(),
|
||||
Buffer::new(BoxService::new(tx_verifier.clone()), 1),
|
||||
sync_status,
|
||||
latest_chain_tip,
|
||||
chain_tip_change,
|
||||
);
|
||||
|
||||
// Enable the mempool
|
||||
let _ = mempool.enable(&mut recent_syncs).await;
|
||||
|
||||
// Push the genesis block to the state, since downloader needs a valid tip.
|
||||
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
|
||||
.zcash_deserialize_into()
|
||||
.unwrap();
|
||||
state_service
|
||||
.ready_and()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(zebra_state::Request::CommitFinalizedBlock(
|
||||
genesis_block.clone().into(),
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Queue first transaction for verification
|
||||
// (queue the transaction itself to avoid a download).
|
||||
let request = mempool
|
||||
.ready_and()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(Request::Queue(vec![rejected_tx.clone().into()]));
|
||||
// Make the mock verifier return that the transaction is invalid.
|
||||
let verification = tx_verifier.expect_request_that(|_| true).map(|responder| {
|
||||
responder.respond(Err(TransactionError::BadBalance));
|
||||
});
|
||||
let (response, _) = futures::join!(request, verification);
|
||||
let queued_responses = match response.unwrap() {
|
||||
Response::Queued(queue_responses) => queue_responses,
|
||||
_ => unreachable!("will never happen in this test"),
|
||||
};
|
||||
// Check that the request was enqueued successfully.
|
||||
assert_eq!(queued_responses.len(), 1);
|
||||
assert!(queued_responses[0].is_ok());
|
||||
|
||||
for _ in 0..2 {
|
||||
// Query the mempool just to poll it and make get the downloader/verifier result.
|
||||
mempool.dummy_call().await;
|
||||
// Sleep to avoid starvation and make sure the verification failure is picked up.
|
||||
time::sleep(time::Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
// Try to queue the same transaction by its ID and check if it's correctly
|
||||
// rejected.
|
||||
let response = mempool
|
||||
.ready_and()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(Request::Queue(vec![rejected_tx.id.into()]))
|
||||
.await
|
||||
.unwrap();
|
||||
let queued_responses = match response {
|
||||
Response::Queued(queue_responses) => queue_responses,
|
||||
_ => unreachable!("will never happen in this test"),
|
||||
};
|
||||
assert_eq!(queued_responses.len(), 1);
|
||||
assert!(matches!(
|
||||
queued_responses[0],
|
||||
Err(MempoolError::StorageExactTip(
|
||||
ExactTipRejectionError::FailedVerification(_)
|
||||
))
|
||||
));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if a transaction that fails download is _not_ rejected.
|
||||
#[tokio::test]
|
||||
async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
|
||||
// Using the mainnet for now
|
||||
let network = Network::Mainnet;
|
||||
let consensus_config = ConsensusConfig::default();
|
||||
let state_config = StateConfig::ephemeral();
|
||||
let mut peer_set = MockService::build().for_unit_tests();
|
||||
let (sync_status, mut recent_syncs) = SyncStatus::new();
|
||||
let (state, latest_chain_tip, chain_tip_change) =
|
||||
zebra_state::init(state_config.clone(), network);
|
||||
|
||||
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
|
||||
let (_chain_verifier, tx_verifier) =
|
||||
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
|
||||
.await;
|
||||
|
||||
// Get transactions to use in the test
|
||||
let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network);
|
||||
let rejected_valid_tx = unmined_transactions.next().unwrap().clone();
|
||||
|
||||
time::pause();
|
||||
|
||||
// Start the mempool service
|
||||
let mut mempool = Mempool::new(
|
||||
network,
|
||||
Buffer::new(BoxService::new(peer_set.clone()), 1),
|
||||
state_service.clone(),
|
||||
tx_verifier,
|
||||
sync_status,
|
||||
latest_chain_tip,
|
||||
chain_tip_change,
|
||||
);
|
||||
|
||||
// Enable the mempool
|
||||
let _ = mempool.enable(&mut recent_syncs).await;
|
||||
|
||||
// Push the genesis block to the state, since downloader needs a valid tip.
|
||||
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
|
||||
.zcash_deserialize_into()
|
||||
.unwrap();
|
||||
state_service
|
||||
.ready_and()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(zebra_state::Request::CommitFinalizedBlock(
|
||||
genesis_block.clone().into(),
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Queue second transaction for download and verification.
|
||||
let request = mempool
|
||||
.ready_and()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(Request::Queue(vec![rejected_valid_tx.id.into()]));
|
||||
// Make the mock peer set return that the download failed.
|
||||
let verification = peer_set
|
||||
.expect_request_that(|r| matches!(r, zn::Request::TransactionsById(_)))
|
||||
.map(|responder| {
|
||||
responder.respond(zn::Response::Transactions(vec![]));
|
||||
});
|
||||
let (response, _) = futures::join!(request, verification);
|
||||
let queued_responses = match response.unwrap() {
|
||||
Response::Queued(queue_responses) => queue_responses,
|
||||
_ => unreachable!("will never happen in this test"),
|
||||
};
|
||||
// Check that the request was enqueued successfully.
|
||||
assert_eq!(queued_responses.len(), 1);
|
||||
assert!(queued_responses[0].is_ok());
|
||||
|
||||
for _ in 0..2 {
|
||||
// Query the mempool just to poll it and make get the downloader/verifier result.
|
||||
mempool.dummy_call().await;
|
||||
// Sleep to avoid starvation and make sure the download failure is picked up.
|
||||
time::sleep(time::Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
// Try to queue the same transaction by its ID and check if it's not being
|
||||
// rejected.
|
||||
let response = mempool
|
||||
.ready_and()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(Request::Queue(vec![rejected_valid_tx.id.into()]))
|
||||
.await
|
||||
.unwrap();
|
||||
let queued_responses = match response {
|
||||
Response::Queued(queue_responses) => queue_responses,
|
||||
_ => unreachable!("will never happen in this test"),
|
||||
};
|
||||
assert_eq!(queued_responses.len(), 1);
|
||||
assert!(queued_responses[0].is_ok());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue