Remove duplicate IDs in mempool requests and responses (#2887)

* Guarantee unique IDs in mempool service responses

* Guarantee unique IDs in crawler task mempool Queue requests

Also update the tests to use unique IDs.

Co-authored-by: Conrado Gouvea <conrado@zfnd.org>
teor 2021-10-19 01:31:11 +10:00 committed by GitHub
parent fb02ce5925
commit 40c907dd09
4 changed files with 70 additions and 34 deletions
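
The fix replaces `Vec<UnminedTxId>` with `HashSet<UnminedTxId>` in the mempool response types and in the crawler's queue path, so duplicates are removed by the collection itself rather than by ad-hoc filtering. A minimal, self-contained sketch of that effect, using a hypothetical `TxId` type standing in for `UnminedTxId` (not Zebra's actual type):

use std::collections::HashSet;

// Hypothetical stand-in for `zebra_chain::transaction::UnminedTxId`,
// used only to illustrate how collecting into a HashSet drops duplicates.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct TxId(u64);

fn main() {
    // IDs as they might arrive from several peers, with repeats.
    let gossiped = vec![TxId(1), TxId(2), TxId(2), TxId(3), TxId(1)];

    // A Vec keeps every copy, so downstream code can see the same ID twice.
    let as_vec: Vec<TxId> = gossiped.clone();
    assert_eq!(as_vec.len(), 5);

    // Collecting into a HashSet guarantees each ID appears at most once.
    let as_set: HashSet<TxId> = gossiped.into_iter().collect();
    assert_eq!(as_set.len(), 3);
}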


@@ -396,7 +396,7 @@ impl Service<zn::Request> for Inbound {
                 if let Setup::Initialized { mempool, .. } = &mut self.network_setup {
                     mempool.clone().oneshot(mempool::Request::TransactionIds).map_ok(|resp| match resp {
                         mempool::Response::TransactionIds(transaction_ids) if transaction_ids.is_empty() => zn::Response::Nil,
-                        mempool::Response::TransactionIds(transaction_ids) => zn::Response::TransactionIds(transaction_ids),
+                        mempool::Response::TransactionIds(transaction_ids) => zn::Response::TransactionIds(transaction_ids.into_iter().collect()),
                         _ => unreachable!("Mempool component should always respond to a `TransactionIds` request with a `TransactionIds` response"),
                     })
                     .boxed()


@@ -71,8 +71,8 @@ pub enum Request {
 #[derive(Debug)]
 pub enum Response {
     Transactions(Vec<UnminedTx>),
-    TransactionIds(Vec<UnminedTxId>),
-    RejectedTransactionIds(Vec<UnminedTxId>),
+    TransactionIds(HashSet<UnminedTxId>),
+    RejectedTransactionIds(HashSet<UnminedTxId>),
     Queued(Vec<Result<(), MempoolError>>),
 }


@@ -2,13 +2,13 @@
 //!
 //! The crawler periodically requests transactions from peers in order to populate the mempool.

-use std::time::Duration;
+use std::{collections::HashSet, time::Duration};

 use futures::{future, pin_mut, stream::FuturesUnordered, StreamExt};
 use tokio::{sync::watch, task::JoinHandle, time::sleep};
 use tower::{timeout::Timeout, BoxError, Service, ServiceExt};

-use zebra_chain::block::Height;
+use zebra_chain::{block::Height, transaction::UnminedTxId};
 use zebra_network as zn;
 use zebra_state::ChainTipChange;

@@ -171,8 +171,8 @@ where
     /// Handle a peer's response to the crawler's request for transactions.
     async fn handle_response(&mut self, response: zn::Response) -> Result<(), BoxError> {
-        let transaction_ids: Vec<_> = match response {
-            zn::Response::TransactionIds(ids) => ids.into_iter().map(Gossip::Id).collect(),
+        let transaction_ids: HashSet<_> = match response {
+            zn::Response::TransactionIds(ids) => ids.into_iter().collect(),
             _ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),
         };

@@ -189,7 +189,12 @@ where
     }

     /// Forward the crawled transactions IDs to the mempool transaction downloader.
-    async fn queue_transactions(&mut self, transaction_ids: Vec<Gossip>) -> Result<(), BoxError> {
+    async fn queue_transactions(
+        &mut self,
+        transaction_ids: HashSet<UnminedTxId>,
+    ) -> Result<(), BoxError> {
+        let transaction_ids = transaction_ids.into_iter().map(Gossip::Id).collect();
+
         let call_result = self
             .mempool
             .ready_and()


@@ -1,6 +1,9 @@
-use std::time::Duration;
+use std::{collections::HashSet, time::Duration};

-use proptest::{collection::vec, prelude::*};
+use proptest::{
+    collection::{hash_set, vec},
+    prelude::*,
+};
 use tokio::time;

 use zebra_chain::{parameters::Network, transaction::UnminedTxId};
@@ -68,7 +71,7 @@ proptest! {
         for _ in 0..CRAWL_ITERATIONS {
             for _ in 0..FANOUT {
                 if mempool_is_enabled {
-                    respond_with_transaction_ids(&mut peer_set, vec![]).await?;
+                    respond_with_transaction_ids(&mut peer_set, HashSet::new()).await?;
                 } else {
                     peer_set.expect_no_requests().await?;
                 }
@@ -96,7 +99,7 @@ proptest! {
     /// the mempool.
     #[test]
     fn crawled_transactions_are_forwarded_to_downloader(
-        transaction_ids in vec(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
+        transaction_ids in hash_set(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
     ) {
         let runtime = tokio::runtime::Builder::new_current_thread()
             .enable_all()
@@ -136,11 +139,15 @@ proptest! {
     #[test]
     fn transaction_id_forwarding_errors_dont_stop_the_crawler(
         service_call_error in any::<MempoolError>(),
-        transaction_ids_for_call_failure in vec(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
+        transaction_ids_for_call_failure in hash_set(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
         transaction_ids_and_responses in
             vec(any::<(UnminedTxId, Result<(), MempoolError>)>(), 1..MAX_CRAWLED_TX),
-        transaction_ids_for_return_to_normal in vec(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
+        transaction_ids_for_return_to_normal in hash_set(any::<UnminedTxId>(), 1..MAX_CRAWLED_TX),
     ) {
+        // Make transaction_ids_and_responses unique
+        let unique_transaction_ids_and_responses: HashSet<UnminedTxId> = transaction_ids_and_responses.iter().map(|(id, _result)| id).copied().collect();
+        let transaction_ids_and_responses: Vec<(UnminedTxId, Result<(), MempoolError>)> = unique_transaction_ids_and_responses.iter().map(|unique_id| transaction_ids_and_responses.iter().find(|(id, _result)| id == unique_id).unwrap()).cloned().collect();
+
         let runtime = tokio::runtime::Builder::new_current_thread()
             .enable_all()
             .build()
@@ -158,11 +165,11 @@ proptest! {
             // Prepare to simulate download errors.
             let download_result_count = transaction_ids_and_responses.len();
-            let mut transaction_ids_for_download_errors = Vec::with_capacity(download_result_count);
+            let mut transaction_ids_for_download_errors = HashSet::with_capacity(download_result_count);
             let mut download_result_list = Vec::with_capacity(download_result_count);

             for (transaction_id, result) in transaction_ids_and_responses {
-                transaction_ids_for_download_errors.push(transaction_id);
+                transaction_ids_for_download_errors.insert(transaction_id);
                 download_result_list.push(result);
             }
@@ -257,12 +264,14 @@ fn setup_crawler() -> (
 /// Intercept a request for mempool transaction IDs and respond with the `transaction_ids` list.
 async fn respond_with_transaction_ids(
     peer_set: &mut MockPeerSet,
-    transaction_ids: Vec<UnminedTxId>,
+    transaction_ids: HashSet<UnminedTxId>,
 ) -> Result<(), TestCaseError> {
     peer_set
         .expect_request(zn::Request::MempoolTransactionIds)
         .await?
-        .respond(zn::Response::TransactionIds(transaction_ids));
+        .respond(zn::Response::TransactionIds(
+            transaction_ids.into_iter().collect(),
+        ));

     Ok(())
 }
@@ -280,7 +289,7 @@ async fn respond_with_transaction_ids(
 /// If `responses` contains more items than the [`FANOUT`] number.
 async fn crawler_iteration(
     peer_set: &mut MockPeerSet,
-    responses: Vec<Vec<UnminedTxId>>,
+    responses: Vec<HashSet<UnminedTxId>>,
 ) -> Result<(), TestCaseError> {
     let empty_responses = FANOUT
         .checked_sub(responses.len())
} }
for _ in 0..empty_responses { for _ in 0..empty_responses {
respond_with_transaction_ids(peer_set, vec![]).await?; respond_with_transaction_ids(peer_set, HashSet::new()).await?;
} }
peer_set.expect_no_requests().await?; peer_set.expect_no_requests().await?;
@@ -310,16 +319,27 @@ async fn crawler_iteration(
 /// If `response` and `expected_transaction_ids` have different sizes.
 async fn respond_to_queue_request(
     mempool: &mut MockMempool,
-    expected_transaction_ids: Vec<UnminedTxId>,
+    expected_transaction_ids: HashSet<UnminedTxId>,
     response: Vec<Result<(), MempoolError>>,
 ) -> Result<(), TestCaseError> {
-    let request_parameter = expected_transaction_ids
-        .into_iter()
-        .map(Gossip::Id)
-        .collect();
-
     mempool
-        .expect_request(mempool::Request::Queue(request_parameter))
+        .expect_request_that(|req| {
+            if let mempool::Request::Queue(req) = req {
+                let ids: HashSet<UnminedTxId> = req
+                    .iter()
+                    .filter_map(|gossip| {
+                        if let Gossip::Id(id) = gossip {
+                            Some(*id)
+                        } else {
+                            None
+                        }
+                    })
+                    .collect();
+                ids == expected_transaction_ids
+            } else {
+                false
+            }
+        })
         .await?
         .respond(mempool::Response::Queued(response));
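
Because the queue request is now built from a `HashSet`, the order of the `Gossip::Id` entries it carries is not deterministic, so the mock mempool matches the request with an `expect_request_that` predicate that compares the IDs as a set rather than expecting one exact, ordered `Queue` value. A small sketch of the same order-insensitive matching, using hypothetical plain types rather than Zebra's mock service API:

use std::collections::HashSet;

// Hypothetical matcher: accept a queue request if it carries exactly the
// expected IDs, regardless of the order they were collected in.
fn queue_request_matches(request_ids: &[u64], expected: &HashSet<u64>) -> bool {
    let ids: HashSet<u64> = request_ids.iter().copied().collect();
    ids == *expected
}

fn main() {
    let expected: HashSet<u64> = [1, 2, 3].iter().copied().collect();

    // Membership matters, order does not.
    assert!(queue_request_matches(&[3, 1, 2], &expected));

    // Missing or extra IDs are rejected.
    assert!(!queue_request_matches(&[1, 2], &expected));
    assert!(!queue_request_matches(&[1, 2, 3, 4], &expected));
}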
@@ -333,16 +353,27 @@ async fn respond_to_queue_request(
 /// from queuing the transactions for downloading.
 async fn respond_to_queue_request_with_error(
     mempool: &mut MockMempool,
-    expected_transaction_ids: Vec<UnminedTxId>,
+    expected_transaction_ids: HashSet<UnminedTxId>,
     error: MempoolError,
 ) -> Result<(), TestCaseError> {
-    let request_parameter = expected_transaction_ids
-        .into_iter()
-        .map(Gossip::Id)
-        .collect();
-
     mempool
-        .expect_request(mempool::Request::Queue(request_parameter))
+        .expect_request_that(|req| {
+            if let mempool::Request::Queue(req) = req {
+                let ids: HashSet<UnminedTxId> = req
+                    .iter()
+                    .filter_map(|gossip| {
+                        if let Gossip::Id(id) = gossip {
+                            Some(*id)
+                        } else {
+                            None
+                        }
+                    })
+                    .collect();
+                ids == expected_transaction_ids
+            } else {
+                false
+            }
+        })
         .await?
         .respond(Err(error));