Send `AdvertiseTransactionIds` to peers (#2823)
* bradcast transactions to peers after they get inserted into mempool * remove network argument from mempool init * remove dbg left * remove return value in mempool enable call * rename channel sender and receiver vars * change unwrap() to expect() * change the channel to a hashset * fix build * fix tests * rustfmt * fix tiny space issue inside macro Co-authored-by: teor <teor@riseup.net> * check errors/panics in transaction gossip tests * fix build of newly added tests * Stop dropping the inbound service and mempool in a test Keeping the mempool around avoids a transaction broadcast task error, so we can test that there are no other errors in the task. * Tweak variable names and add comments * Avoid unexpected drops by returning a mempool guard in tests * Use BoxError to simplify service types in tests * Make all returned service types consistent in tests We want to be able to change the setup without changing the tests. Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
parent
0683e0b40b
commit
724967d488
|
@ -26,6 +26,7 @@
|
||||||
use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
|
use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
|
||||||
use color_eyre::eyre::{eyre, Report};
|
use color_eyre::eyre::{eyre, Report};
|
||||||
use futures::{select, FutureExt};
|
use futures::{select, FutureExt};
|
||||||
|
use std::collections::HashSet;
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
use tower::builder::ServiceBuilder;
|
use tower::builder::ServiceBuilder;
|
||||||
use tower::util::BoxService;
|
use tower::util::BoxService;
|
||||||
|
@ -91,14 +92,18 @@ impl StartCmd {
|
||||||
ChainSync::new(&config, peer_set.clone(), state.clone(), chain_verifier);
|
ChainSync::new(&config, peer_set.clone(), state.clone(), chain_verifier);
|
||||||
|
|
||||||
info!("initializing mempool");
|
info!("initializing mempool");
|
||||||
|
|
||||||
|
let (mempool_transaction_sender, mempool_transaction_receiver) =
|
||||||
|
tokio::sync::watch::channel(HashSet::new());
|
||||||
|
|
||||||
let mempool_service = BoxService::new(Mempool::new(
|
let mempool_service = BoxService::new(Mempool::new(
|
||||||
config.network.network,
|
|
||||||
peer_set.clone(),
|
peer_set.clone(),
|
||||||
state,
|
state,
|
||||||
tx_verifier,
|
tx_verifier,
|
||||||
sync_status.clone(),
|
sync_status.clone(),
|
||||||
latest_chain_tip,
|
latest_chain_tip,
|
||||||
chain_tip_change.clone(),
|
chain_tip_change.clone(),
|
||||||
|
mempool_transaction_sender,
|
||||||
));
|
));
|
||||||
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);
|
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);
|
||||||
|
|
||||||
|
@ -114,7 +119,13 @@ impl StartCmd {
|
||||||
peer_set.clone(),
|
peer_set.clone(),
|
||||||
));
|
));
|
||||||
|
|
||||||
let mempool_crawler_task_handle = mempool::Crawler::spawn(peer_set, mempool, sync_status);
|
let mempool_crawler_task_handle =
|
||||||
|
mempool::Crawler::spawn(peer_set.clone(), mempool, sync_status);
|
||||||
|
|
||||||
|
let tx_gossip_task_handle = tokio::spawn(mempool::gossip_mempool_transaction_id(
|
||||||
|
mempool_transaction_receiver,
|
||||||
|
peer_set,
|
||||||
|
));
|
||||||
|
|
||||||
select! {
|
select! {
|
||||||
sync_result = syncer_error_future.fuse() => sync_result,
|
sync_result = syncer_error_future.fuse() => sync_result,
|
||||||
|
@ -126,6 +137,10 @@ impl StartCmd {
|
||||||
mempool_crawl_result = mempool_crawler_task_handle.fuse() => mempool_crawl_result
|
mempool_crawl_result = mempool_crawler_task_handle.fuse() => mempool_crawl_result
|
||||||
.expect("unexpected panic in the mempool crawler")
|
.expect("unexpected panic in the mempool crawler")
|
||||||
.map_err(|e| eyre!(e)),
|
.map_err(|e| eyre!(e)),
|
||||||
|
|
||||||
|
tx_gossip_result = tx_gossip_task_handle.fuse() => tx_gossip_result
|
||||||
|
.expect("unexpected panic in the transaction gossip task")
|
||||||
|
.map_err(|e| eyre!(e)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,10 +2,7 @@ use std::{collections::HashSet, iter::FromIterator, net::SocketAddr, str::FromSt
|
||||||
|
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use tokio::{sync::oneshot, task::JoinHandle};
|
use tokio::{sync::oneshot, task::JoinHandle};
|
||||||
use tower::{
|
use tower::{buffer::Buffer, builder::ServiceBuilder, util::BoxService, Service, ServiceExt};
|
||||||
buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, Service,
|
|
||||||
ServiceExt,
|
|
||||||
};
|
|
||||||
use tracing::Span;
|
use tracing::Span;
|
||||||
|
|
||||||
use zebra_chain::{
|
use zebra_chain::{
|
||||||
|
@ -19,21 +16,26 @@ use zebra_network::{AddressBook, Request, Response};
|
||||||
use zebra_state::Config as StateConfig;
|
use zebra_state::Config as StateConfig;
|
||||||
use zebra_test::mock_service::{MockService, PanicAssertion};
|
use zebra_test::mock_service::{MockService, PanicAssertion};
|
||||||
|
|
||||||
use crate::components::{
|
use crate::{
|
||||||
mempool::{unmined_transactions_in_blocks, Mempool},
|
components::{
|
||||||
sync::{self, BlockGossipError, SyncStatus},
|
mempool::{self, gossip_mempool_transaction_id, unmined_transactions_in_blocks, Mempool},
|
||||||
|
sync::{self, BlockGossipError, SyncStatus},
|
||||||
|
},
|
||||||
|
BoxError,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn mempool_requests_for_transactions() {
|
async fn mempool_requests_for_transactions() {
|
||||||
let (
|
let (
|
||||||
inbound_service,
|
inbound_service,
|
||||||
|
_mempool_guard,
|
||||||
_committed_blocks,
|
_committed_blocks,
|
||||||
added_transactions,
|
added_transactions,
|
||||||
_mock_tx_verifier,
|
_mock_tx_verifier,
|
||||||
mut peer_set,
|
mut peer_set,
|
||||||
_state_guard,
|
_state_guard,
|
||||||
sync_gossip_task_handle,
|
sync_gossip_task_handle,
|
||||||
|
tx_gossip_task_handle,
|
||||||
) = setup(true).await;
|
) = setup(true).await;
|
||||||
|
|
||||||
let added_transaction_ids: Vec<UnminedTxId> = added_transactions.iter().map(|t| t.id).collect();
|
let added_transaction_ids: Vec<UnminedTxId> = added_transactions.iter().map(|t| t.id).collect();
|
||||||
|
@ -58,6 +60,7 @@ async fn mempool_requests_for_transactions() {
|
||||||
.collect::<HashSet<_>>();
|
.collect::<HashSet<_>>();
|
||||||
|
|
||||||
let response = inbound_service
|
let response = inbound_service
|
||||||
|
.clone()
|
||||||
.oneshot(Request::TransactionsById(hash_set))
|
.oneshot(Request::TransactionsById(hash_set))
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
@ -75,6 +78,13 @@ async fn mempool_requests_for_transactions() {
|
||||||
"unexpected error or panic in sync gossip task: {:?}",
|
"unexpected error or panic in sync gossip task: {:?}",
|
||||||
sync_gossip_result,
|
sync_gossip_result,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
|
||||||
|
assert!(
|
||||||
|
matches!(tx_gossip_result, None),
|
||||||
|
"unexpected error or panic in transaction gossip task: {:?}",
|
||||||
|
tx_gossip_result,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -88,12 +98,14 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> {
|
||||||
|
|
||||||
let (
|
let (
|
||||||
inbound_service,
|
inbound_service,
|
||||||
|
_mempool_guard,
|
||||||
_committed_blocks,
|
_committed_blocks,
|
||||||
_added_transactions,
|
_added_transactions,
|
||||||
mut tx_verifier,
|
mut tx_verifier,
|
||||||
mut peer_set,
|
mut peer_set,
|
||||||
_state_guard,
|
_state_guard,
|
||||||
sync_gossip_task_handle,
|
sync_gossip_task_handle,
|
||||||
|
tx_gossip_task_handle,
|
||||||
) = setup(false).await;
|
) = setup(false).await;
|
||||||
|
|
||||||
// Test `Request::PushTransaction`
|
// Test `Request::PushTransaction`
|
||||||
|
@ -124,8 +136,14 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> {
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
|
|
||||||
// check that nothing unexpected happened
|
// Make sure there is an additional request broadcasting the
|
||||||
peer_set.expect_no_requests().await;
|
// inserted transaction to peers.
|
||||||
|
let mut hs = HashSet::new();
|
||||||
|
hs.insert(tx.unmined_id());
|
||||||
|
peer_set
|
||||||
|
.expect_request(Request::AdvertiseTransactionIds(hs))
|
||||||
|
.await
|
||||||
|
.respond(Response::Nil);
|
||||||
|
|
||||||
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
|
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
|
||||||
assert!(
|
assert!(
|
||||||
|
@ -134,6 +152,13 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> {
|
||||||
sync_gossip_result,
|
sync_gossip_result,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
|
||||||
|
assert!(
|
||||||
|
matches!(tx_gossip_result, None),
|
||||||
|
"unexpected error or panic in transaction gossip task: {:?}",
|
||||||
|
tx_gossip_result,
|
||||||
|
);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,12 +178,14 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
|
||||||
|
|
||||||
let (
|
let (
|
||||||
inbound_service,
|
inbound_service,
|
||||||
|
_mempool_guard,
|
||||||
_committed_blocks,
|
_committed_blocks,
|
||||||
_added_transactions,
|
_added_transactions,
|
||||||
mut tx_verifier,
|
mut tx_verifier,
|
||||||
mut peer_set,
|
mut peer_set,
|
||||||
_state_guard,
|
_state_guard,
|
||||||
sync_gossip_task_handle,
|
sync_gossip_task_handle,
|
||||||
|
tx_gossip_task_handle,
|
||||||
) = setup(false).await;
|
) = setup(false).await;
|
||||||
|
|
||||||
// Test `Request::AdvertiseTransactionIds`
|
// Test `Request::AdvertiseTransactionIds`
|
||||||
|
@ -170,7 +197,7 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
|
||||||
peer_set
|
peer_set
|
||||||
.expect_request(Request::TransactionsById(txs))
|
.expect_request(Request::TransactionsById(txs))
|
||||||
.map(|responder| {
|
.map(|responder| {
|
||||||
let unmined_transaction = UnminedTx::from(test_transaction);
|
let unmined_transaction = UnminedTx::from(test_transaction.clone());
|
||||||
responder.respond(Response::Transactions(vec![unmined_transaction]))
|
responder.respond(Response::Transactions(vec![unmined_transaction]))
|
||||||
});
|
});
|
||||||
// Simulate a successful transaction verification
|
// Simulate a successful transaction verification
|
||||||
|
@ -200,8 +227,14 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
|
|
||||||
// check that nothing unexpected happened
|
// Make sure there is an additional request broadcasting the
|
||||||
peer_set.expect_no_requests().await;
|
// inserted transaction to peers.
|
||||||
|
let mut hs = HashSet::new();
|
||||||
|
hs.insert(test_transaction.unmined_id());
|
||||||
|
peer_set
|
||||||
|
.expect_request(Request::AdvertiseTransactionIds(hs))
|
||||||
|
.await
|
||||||
|
.respond(Response::Nil);
|
||||||
|
|
||||||
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
|
let sync_gossip_result = sync_gossip_task_handle.now_or_never();
|
||||||
assert!(
|
assert!(
|
||||||
|
@ -210,6 +243,13 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> {
|
||||||
sync_gossip_result,
|
sync_gossip_result,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
|
||||||
|
assert!(
|
||||||
|
matches!(tx_gossip_result, None),
|
||||||
|
"unexpected error or panic in transaction gossip task: {:?}",
|
||||||
|
tx_gossip_result,
|
||||||
|
);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -233,12 +273,14 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
|
||||||
// Get services
|
// Get services
|
||||||
let (
|
let (
|
||||||
inbound_service,
|
inbound_service,
|
||||||
|
_mempool_guard,
|
||||||
_committed_blocks,
|
_committed_blocks,
|
||||||
_added_transactions,
|
_added_transactions,
|
||||||
mut tx_verifier,
|
mut tx_verifier,
|
||||||
mut peer_set,
|
mut peer_set,
|
||||||
state_service,
|
state_service,
|
||||||
sync_gossip_task_handle,
|
sync_gossip_task_handle,
|
||||||
|
tx_gossip_task_handle,
|
||||||
) = setup(false).await;
|
) = setup(false).await;
|
||||||
|
|
||||||
// Push test transaction
|
// Push test transaction
|
||||||
|
@ -283,6 +325,15 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
// Test transaction 1 is gossiped
|
||||||
|
let mut hs = HashSet::new();
|
||||||
|
hs.insert(tx1_id);
|
||||||
|
peer_set
|
||||||
|
.expect_request(Request::AdvertiseTransactionIds(hs))
|
||||||
|
.await
|
||||||
|
.respond(Response::Nil);
|
||||||
|
|
||||||
|
// Block is gossiped then
|
||||||
peer_set
|
peer_set
|
||||||
.expect_request(Request::AdvertiseBlock(block_two.hash()))
|
.expect_request(Request::AdvertiseBlock(block_two.hash()))
|
||||||
.await
|
.await
|
||||||
|
@ -315,6 +366,7 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
// Block is gossiped
|
||||||
peer_set
|
peer_set
|
||||||
.expect_request(Request::AdvertiseBlock(block_three.hash()))
|
.expect_request(Request::AdvertiseBlock(block_three.hash()))
|
||||||
.await
|
.await
|
||||||
|
@ -351,6 +403,14 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Test transaction 2 is gossiped
|
||||||
|
let mut hs = HashSet::new();
|
||||||
|
hs.insert(tx2_id);
|
||||||
|
peer_set
|
||||||
|
.expect_request(Request::AdvertiseTransactionIds(hs))
|
||||||
|
.await
|
||||||
|
.respond(Response::Nil);
|
||||||
|
|
||||||
// Add all the rest of the continous blocks we have to test tx2 will never expire.
|
// Add all the rest of the continous blocks we have to test tx2 will never expire.
|
||||||
let more_blocks: Vec<Arc<Block>> = vec![
|
let more_blocks: Vec<Arc<Block>> = vec![
|
||||||
zebra_test::vectors::BLOCK_MAINNET_4_BYTES
|
zebra_test::vectors::BLOCK_MAINNET_4_BYTES
|
||||||
|
@ -384,6 +444,7 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
// Block is gossiped
|
||||||
peer_set
|
peer_set
|
||||||
.expect_request(Request::AdvertiseBlock(block.hash()))
|
.expect_request(Request::AdvertiseBlock(block.hash()))
|
||||||
.await
|
.await
|
||||||
|
@ -415,26 +476,31 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
|
||||||
sync_gossip_result,
|
sync_gossip_result,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let tx_gossip_result = tx_gossip_task_handle.now_or_never();
|
||||||
|
assert!(
|
||||||
|
matches!(tx_gossip_result, None),
|
||||||
|
"unexpected error or panic in transaction gossip task: {:?}",
|
||||||
|
tx_gossip_result,
|
||||||
|
);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn setup(
|
async fn setup(
|
||||||
add_transactions: bool,
|
add_transactions: bool,
|
||||||
) -> (
|
) -> (
|
||||||
LoadShed<tower::buffer::Buffer<super::Inbound, zebra_network::Request>>,
|
Buffer<
|
||||||
|
BoxService<zebra_network::Request, zebra_network::Response, BoxError>,
|
||||||
|
zebra_network::Request,
|
||||||
|
>,
|
||||||
|
Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>,
|
||||||
Vec<Arc<Block>>,
|
Vec<Arc<Block>>,
|
||||||
Vec<UnminedTx>,
|
Vec<UnminedTx>,
|
||||||
MockService<transaction::Request, transaction::Response, PanicAssertion, TransactionError>,
|
MockService<transaction::Request, transaction::Response, PanicAssertion, TransactionError>,
|
||||||
MockService<Request, Response, PanicAssertion>,
|
MockService<Request, Response, PanicAssertion>,
|
||||||
Buffer<
|
Buffer<BoxService<zebra_state::Request, zebra_state::Response, BoxError>, zebra_state::Request>,
|
||||||
BoxService<
|
|
||||||
zebra_state::Request,
|
|
||||||
zebra_state::Response,
|
|
||||||
Box<dyn std::error::Error + Send + Sync>,
|
|
||||||
>,
|
|
||||||
zebra_state::Request,
|
|
||||||
>,
|
|
||||||
JoinHandle<Result<(), BlockGossipError>>,
|
JoinHandle<Result<(), BlockGossipError>>,
|
||||||
|
JoinHandle<Result<(), BoxError>>,
|
||||||
) {
|
) {
|
||||||
let network = Network::Mainnet;
|
let network = Network::Mainnet;
|
||||||
let consensus_config = ConsensusConfig::default();
|
let consensus_config = ConsensusConfig::default();
|
||||||
|
@ -491,39 +557,43 @@ async fn setup(
|
||||||
.unwrap();
|
.unwrap();
|
||||||
committed_blocks.push(block_one);
|
committed_blocks.push(block_one);
|
||||||
|
|
||||||
|
let (transaction_sender, transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
|
||||||
|
|
||||||
let mut mempool_service = Mempool::new(
|
let mut mempool_service = Mempool::new(
|
||||||
network,
|
|
||||||
buffered_peer_set.clone(),
|
buffered_peer_set.clone(),
|
||||||
state_service.clone(),
|
state_service.clone(),
|
||||||
buffered_tx_verifier.clone(),
|
buffered_tx_verifier.clone(),
|
||||||
sync_status.clone(),
|
sync_status.clone(),
|
||||||
latest_chain_tip,
|
latest_chain_tip,
|
||||||
chain_tip_change.clone(),
|
chain_tip_change.clone(),
|
||||||
|
transaction_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Enable the mempool
|
// Enable the mempool
|
||||||
let _ = mempool_service.enable(&mut recent_syncs).await;
|
mempool_service.enable(&mut recent_syncs).await;
|
||||||
|
|
||||||
|
// Add transactions to the mempool, skipping verification and broadcast
|
||||||
let mut added_transactions = Vec::new();
|
let mut added_transactions = Vec::new();
|
||||||
if add_transactions {
|
if add_transactions {
|
||||||
added_transactions.extend(add_some_stuff_to_mempool(&mut mempool_service, network));
|
added_transactions.extend(add_some_stuff_to_mempool(&mut mempool_service, network));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mempool_service = BoxService::new(mempool_service);
|
let mempool_service = BoxService::new(mempool_service);
|
||||||
let mempool = ServiceBuilder::new().buffer(1).service(mempool_service);
|
let mempool_service = ServiceBuilder::new().buffer(1).service(mempool_service);
|
||||||
|
|
||||||
let (setup_tx, setup_rx) = oneshot::channel();
|
let (setup_tx, setup_rx) = oneshot::channel();
|
||||||
|
|
||||||
let inbound_service = ServiceBuilder::new()
|
let inbound_service = ServiceBuilder::new()
|
||||||
.load_shed()
|
.load_shed()
|
||||||
.buffer(1)
|
|
||||||
.service(super::Inbound::new(
|
.service(super::Inbound::new(
|
||||||
setup_rx,
|
setup_rx,
|
||||||
state_service.clone(),
|
state_service.clone(),
|
||||||
block_verifier.clone(),
|
block_verifier.clone(),
|
||||||
));
|
));
|
||||||
|
let inbound_service = BoxService::new(inbound_service);
|
||||||
|
let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service);
|
||||||
|
|
||||||
let r = setup_tx.send((buffered_peer_set, address_book, mempool));
|
let r = setup_tx.send((buffered_peer_set, address_book, mempool_service.clone()));
|
||||||
// We can't expect or unwrap because the returned Result does not implement Debug
|
// We can't expect or unwrap because the returned Result does not implement Debug
|
||||||
assert!(r.is_ok(), "unexpected setup channel send failure");
|
assert!(r.is_ok(), "unexpected setup channel send failure");
|
||||||
|
|
||||||
|
@ -533,6 +603,11 @@ async fn setup(
|
||||||
peer_set.clone(),
|
peer_set.clone(),
|
||||||
));
|
));
|
||||||
|
|
||||||
|
let tx_gossip_task_handle = tokio::spawn(gossip_mempool_transaction_id(
|
||||||
|
transaction_receiver,
|
||||||
|
peer_set.clone(),
|
||||||
|
));
|
||||||
|
|
||||||
// Make sure there is an additional request broadcasting the
|
// Make sure there is an additional request broadcasting the
|
||||||
// committed blocks to peers.
|
// committed blocks to peers.
|
||||||
//
|
//
|
||||||
|
@ -546,15 +621,20 @@ async fn setup(
|
||||||
|
|
||||||
(
|
(
|
||||||
inbound_service,
|
inbound_service,
|
||||||
|
mempool_service,
|
||||||
committed_blocks,
|
committed_blocks,
|
||||||
added_transactions,
|
added_transactions,
|
||||||
mock_tx_verifier,
|
mock_tx_verifier,
|
||||||
peer_set,
|
peer_set,
|
||||||
state_service,
|
state_service,
|
||||||
sync_gossip_task_handle,
|
sync_gossip_task_handle,
|
||||||
|
tx_gossip_task_handle,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Manually add a transaction to the mempool storage.
|
||||||
|
///
|
||||||
|
/// Skips some mempool functionality, like transaction verification and peer broadcasts.
|
||||||
fn add_some_stuff_to_mempool(mempool_service: &mut Mempool, network: Network) -> Vec<UnminedTx> {
|
fn add_some_stuff_to_mempool(mempool_service: &mut Mempool, network: Network) -> Vec<UnminedTx> {
|
||||||
// get the genesis block coinbase transaction from the Zcash blockchain.
|
// get the genesis block coinbase transaction from the Zcash blockchain.
|
||||||
let genesis_transactions: Vec<_> = unmined_transactions_in_blocks(..=0, network)
|
let genesis_transactions: Vec<_> = unmined_transactions_in_blocks(..=0, network)
|
||||||
|
|
|
@ -12,11 +12,11 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::{future::FutureExt, stream::Stream};
|
use futures::{future::FutureExt, stream::Stream};
|
||||||
|
use tokio::sync::watch;
|
||||||
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
|
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
|
||||||
|
|
||||||
use zebra_chain::{
|
use zebra_chain::{
|
||||||
chain_tip::ChainTip,
|
chain_tip::ChainTip,
|
||||||
parameters::Network,
|
|
||||||
transaction::{UnminedTx, UnminedTxId},
|
transaction::{UnminedTx, UnminedTxId},
|
||||||
};
|
};
|
||||||
use zebra_consensus::{error::TransactionError, transaction};
|
use zebra_consensus::{error::TransactionError, transaction};
|
||||||
|
@ -29,6 +29,7 @@ pub use crate::BoxError;
|
||||||
mod crawler;
|
mod crawler;
|
||||||
pub mod downloads;
|
pub mod downloads;
|
||||||
mod error;
|
mod error;
|
||||||
|
pub mod gossip;
|
||||||
mod storage;
|
mod storage;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -42,6 +43,7 @@ pub use self::storage::{
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub use self::storage::tests::unmined_transactions_in_blocks;
|
pub use self::storage::tests::unmined_transactions_in_blocks;
|
||||||
|
pub use gossip::gossip_mempool_transaction_id;
|
||||||
|
|
||||||
use self::downloads::{
|
use self::downloads::{
|
||||||
Downloads as TxDownloads, Gossip, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT,
|
Downloads as TxDownloads, Gossip, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT,
|
||||||
|
@ -146,17 +148,21 @@ pub struct Mempool {
|
||||||
/// Handle to the transaction verifier service.
|
/// Handle to the transaction verifier service.
|
||||||
/// Used to construct the transaction downloader.
|
/// Used to construct the transaction downloader.
|
||||||
tx_verifier: TxVerifier,
|
tx_verifier: TxVerifier,
|
||||||
|
|
||||||
|
/// Sender part of a gossip transactions channel.
|
||||||
|
/// Used to broadcast transaction ids to peers.
|
||||||
|
transaction_sender: watch::Sender<HashSet<UnminedTxId>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Mempool {
|
impl Mempool {
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
_network: Network,
|
|
||||||
outbound: Outbound,
|
outbound: Outbound,
|
||||||
state: State,
|
state: State,
|
||||||
tx_verifier: TxVerifier,
|
tx_verifier: TxVerifier,
|
||||||
sync_status: SyncStatus,
|
sync_status: SyncStatus,
|
||||||
latest_chain_tip: zs::LatestChainTip,
|
latest_chain_tip: zs::LatestChainTip,
|
||||||
chain_tip_change: ChainTipChange,
|
chain_tip_change: ChainTipChange,
|
||||||
|
transaction_sender: watch::Sender<HashSet<UnminedTxId>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Mempool {
|
Mempool {
|
||||||
active_state: ActiveState::Disabled,
|
active_state: ActiveState::Disabled,
|
||||||
|
@ -166,6 +172,7 @@ impl Mempool {
|
||||||
outbound,
|
outbound,
|
||||||
state,
|
state,
|
||||||
tx_verifier,
|
tx_verifier,
|
||||||
|
transaction_sender,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -254,12 +261,16 @@ impl Service<Request> for Mempool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up completed download tasks and add to mempool if successful.
|
// Clean up completed download tasks and add to mempool if successful.
|
||||||
|
// Also, send succesful transactions to peers.
|
||||||
|
let mut inserted_txids = HashSet::new();
|
||||||
while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) {
|
while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) {
|
||||||
match r {
|
match r {
|
||||||
Ok(tx) => {
|
Ok(tx) => {
|
||||||
// Storage handles conflicting transactions or a full mempool internally,
|
// Storage handles conflicting transactions or a full mempool internally,
|
||||||
// so just ignore the storage result here
|
// so just ignore the storage result here
|
||||||
let _ = storage.insert(tx);
|
let _ = storage.insert(tx.clone());
|
||||||
|
// Save transaction ids that we will send to peers
|
||||||
|
inserted_txids.insert(tx.id);
|
||||||
}
|
}
|
||||||
Err((txid, e)) => {
|
Err((txid, e)) => {
|
||||||
reject_if_needed(storage, txid, e);
|
reject_if_needed(storage, txid, e);
|
||||||
|
@ -267,6 +278,10 @@ impl Service<Request> for Mempool {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
// Send any newly inserted transactions to peers
|
||||||
|
if !inserted_txids.is_empty() {
|
||||||
|
let _ = self.transaction_sender.send(inserted_txids)?;
|
||||||
|
}
|
||||||
|
|
||||||
// Remove expired transactions from the mempool.
|
// Remove expired transactions from the mempool.
|
||||||
if let Some(tip_height) = self.latest_chain_tip.best_tip_height() {
|
if let Some(tip_height) = self.latest_chain_tip.best_tip_height() {
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
//! A task that gossips [`transaction::UnminedTxId`] that enter the mempool to peers.
|
||||||
|
|
||||||
|
use tower::{timeout::Timeout, Service, ServiceExt};
|
||||||
|
|
||||||
|
use zebra_network as zn;
|
||||||
|
|
||||||
|
use tokio::sync::watch;
|
||||||
|
use zebra_chain::transaction::UnminedTxId;
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
|
|
||||||
|
use crate::BoxError;
|
||||||
|
|
||||||
|
use crate::components::sync::TIPS_RESPONSE_TIMEOUT;
|
||||||
|
|
||||||
|
/// Run continuously, gossiping new [`transaction::UnminedTxId`] to peers.
|
||||||
|
///
|
||||||
|
/// Broadcast any [`transaction::UnminedTxId`] that gets stored in the mempool to all ready peers.
|
||||||
|
pub async fn gossip_mempool_transaction_id<ZN>(
|
||||||
|
mut receiver: watch::Receiver<HashSet<UnminedTxId>>,
|
||||||
|
broadcast_network: ZN,
|
||||||
|
) -> Result<(), BoxError>
|
||||||
|
where
|
||||||
|
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
|
||||||
|
ZN::Future: Send,
|
||||||
|
{
|
||||||
|
info!("initializing transaction gossip task");
|
||||||
|
|
||||||
|
// use the same timeout as tips requests,
|
||||||
|
// so broadcasts don't delay the syncer too long
|
||||||
|
let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// once we get new data in the channel, broadcast to peers
|
||||||
|
receiver.changed().await?;
|
||||||
|
|
||||||
|
let txs = receiver.borrow().clone();
|
||||||
|
let request = zn::Request::AdvertiseTransactionIds(txs);
|
||||||
|
|
||||||
|
info!(?request, "sending mempool transaction broadcast");
|
||||||
|
|
||||||
|
// broadcast requests don't return errors, and we'd just want to ignore them anyway
|
||||||
|
let _ = broadcast_network.ready_and().await?.call(request).await;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,5 @@
|
||||||
use proptest::prelude::*;
|
use proptest::prelude::*;
|
||||||
|
use std::collections::HashSet;
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
use tower::{buffer::Buffer, util::BoxService};
|
use tower::{buffer::Buffer, util::BoxService};
|
||||||
|
|
||||||
|
@ -150,14 +151,16 @@ fn setup(
|
||||||
let (sync_status, recent_syncs) = SyncStatus::new();
|
let (sync_status, recent_syncs) = SyncStatus::new();
|
||||||
let (chain_tip_sender, latest_chain_tip, chain_tip_change) = ChainTipSender::new(None, network);
|
let (chain_tip_sender, latest_chain_tip, chain_tip_change) = ChainTipSender::new(None, network);
|
||||||
|
|
||||||
|
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
|
||||||
|
|
||||||
let mempool = Mempool::new(
|
let mempool = Mempool::new(
|
||||||
network,
|
|
||||||
Buffer::new(BoxService::new(peer_set.clone()), 1),
|
Buffer::new(BoxService::new(peer_set.clone()), 1),
|
||||||
Buffer::new(BoxService::new(state_service.clone()), 1),
|
Buffer::new(BoxService::new(state_service.clone()), 1),
|
||||||
Buffer::new(BoxService::new(tx_verifier.clone()), 1),
|
Buffer::new(BoxService::new(tx_verifier.clone()), 1),
|
||||||
sync_status,
|
sync_status,
|
||||||
latest_chain_tip,
|
latest_chain_tip,
|
||||||
chain_tip_change,
|
chain_tip_change,
|
||||||
|
transaction_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
(
|
(
|
||||||
|
|
|
@ -4,7 +4,7 @@ use color_eyre::Report;
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
use tower::{ServiceBuilder, ServiceExt};
|
use tower::{ServiceBuilder, ServiceExt};
|
||||||
|
|
||||||
use zebra_chain::{block::Block, serialization::ZcashDeserializeInto};
|
use zebra_chain::{block::Block, parameters::Network, serialization::ZcashDeserializeInto};
|
||||||
use zebra_consensus::Config as ConsensusConfig;
|
use zebra_consensus::Config as ConsensusConfig;
|
||||||
use zebra_state::Config as StateConfig;
|
use zebra_state::Config as StateConfig;
|
||||||
use zebra_test::mock_service::MockService;
|
use zebra_test::mock_service::MockService;
|
||||||
|
@ -36,14 +36,16 @@ async fn mempool_service_basic() -> Result<(), Report> {
|
||||||
let more_transactions = unmined_transactions;
|
let more_transactions = unmined_transactions;
|
||||||
|
|
||||||
// Start the mempool service
|
// Start the mempool service
|
||||||
|
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
|
||||||
|
|
||||||
let mut service = Mempool::new(
|
let mut service = Mempool::new(
|
||||||
network,
|
|
||||||
Buffer::new(BoxService::new(peer_set), 1),
|
Buffer::new(BoxService::new(peer_set), 1),
|
||||||
state_service.clone(),
|
state_service.clone(),
|
||||||
tx_verifier,
|
tx_verifier,
|
||||||
sync_status,
|
sync_status,
|
||||||
latest_chain_tip,
|
latest_chain_tip,
|
||||||
chain_tip_change,
|
chain_tip_change,
|
||||||
|
transaction_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Enable the mempool
|
// Enable the mempool
|
||||||
|
@ -163,14 +165,16 @@ async fn mempool_queue() -> Result<(), Report> {
|
||||||
let stored_tx = transactions.next_back().unwrap().clone();
|
let stored_tx = transactions.next_back().unwrap().clone();
|
||||||
|
|
||||||
// Start the mempool service
|
// Start the mempool service
|
||||||
|
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
|
||||||
|
|
||||||
let mut service = Mempool::new(
|
let mut service = Mempool::new(
|
||||||
network,
|
|
||||||
Buffer::new(BoxService::new(peer_set), 1),
|
Buffer::new(BoxService::new(peer_set), 1),
|
||||||
state_service.clone(),
|
state_service.clone(),
|
||||||
tx_verifier,
|
tx_verifier,
|
||||||
sync_status,
|
sync_status,
|
||||||
latest_chain_tip,
|
latest_chain_tip,
|
||||||
chain_tip_change,
|
chain_tip_change,
|
||||||
|
transaction_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Enable the mempool
|
// Enable the mempool
|
||||||
|
@ -262,14 +266,16 @@ async fn mempool_service_disabled() -> Result<(), Report> {
|
||||||
let more_transactions = unmined_transactions;
|
let more_transactions = unmined_transactions;
|
||||||
|
|
||||||
// Start the mempool service
|
// Start the mempool service
|
||||||
|
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
|
||||||
|
|
||||||
let mut service = Mempool::new(
|
let mut service = Mempool::new(
|
||||||
network,
|
|
||||||
Buffer::new(BoxService::new(peer_set), 1),
|
Buffer::new(BoxService::new(peer_set), 1),
|
||||||
state_service.clone(),
|
state_service.clone(),
|
||||||
tx_verifier,
|
tx_verifier,
|
||||||
sync_status,
|
sync_status,
|
||||||
latest_chain_tip,
|
latest_chain_tip,
|
||||||
chain_tip_change,
|
chain_tip_change,
|
||||||
|
transaction_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Test if mempool is disabled (it should start disabled)
|
// Test if mempool is disabled (it should start disabled)
|
||||||
|
@ -383,14 +389,16 @@ async fn mempool_cancel_mined() -> Result<(), Report> {
|
||||||
time::pause();
|
time::pause();
|
||||||
|
|
||||||
// Start the mempool service
|
// Start the mempool service
|
||||||
|
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
|
||||||
|
|
||||||
let mut mempool = Mempool::new(
|
let mut mempool = Mempool::new(
|
||||||
network,
|
|
||||||
Buffer::new(BoxService::new(peer_set), 1),
|
Buffer::new(BoxService::new(peer_set), 1),
|
||||||
state_service.clone(),
|
state_service.clone(),
|
||||||
tx_verifier,
|
tx_verifier,
|
||||||
sync_status,
|
sync_status,
|
||||||
latest_chain_tip,
|
latest_chain_tip,
|
||||||
chain_tip_change,
|
chain_tip_change,
|
||||||
|
transaction_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Enable the mempool
|
// Enable the mempool
|
||||||
|
@ -495,14 +503,16 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report>
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// Start the mempool service
|
// Start the mempool service
|
||||||
|
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
|
||||||
|
|
||||||
let mut mempool = Mempool::new(
|
let mut mempool = Mempool::new(
|
||||||
network,
|
|
||||||
Buffer::new(BoxService::new(peer_set), 1),
|
Buffer::new(BoxService::new(peer_set), 1),
|
||||||
state_service.clone(),
|
state_service.clone(),
|
||||||
tx_verifier,
|
tx_verifier,
|
||||||
sync_status,
|
sync_status,
|
||||||
latest_chain_tip,
|
latest_chain_tip,
|
||||||
chain_tip_change,
|
chain_tip_change,
|
||||||
|
transaction_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Enable the mempool
|
// Enable the mempool
|
||||||
|
@ -589,14 +599,16 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
|
||||||
time::pause();
|
time::pause();
|
||||||
|
|
||||||
// Start the mempool service
|
// Start the mempool service
|
||||||
|
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
|
||||||
|
|
||||||
let mut mempool = Mempool::new(
|
let mut mempool = Mempool::new(
|
||||||
network,
|
|
||||||
Buffer::new(BoxService::new(peer_set.clone()), 1),
|
Buffer::new(BoxService::new(peer_set.clone()), 1),
|
||||||
state_service.clone(),
|
state_service.clone(),
|
||||||
Buffer::new(BoxService::new(tx_verifier.clone()), 1),
|
Buffer::new(BoxService::new(tx_verifier.clone()), 1),
|
||||||
sync_status,
|
sync_status,
|
||||||
latest_chain_tip,
|
latest_chain_tip,
|
||||||
chain_tip_change,
|
chain_tip_change,
|
||||||
|
transaction_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Enable the mempool
|
// Enable the mempool
|
||||||
|
@ -691,14 +703,16 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
|
||||||
time::pause();
|
time::pause();
|
||||||
|
|
||||||
// Start the mempool service
|
// Start the mempool service
|
||||||
|
let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new());
|
||||||
|
|
||||||
let mut mempool = Mempool::new(
|
let mut mempool = Mempool::new(
|
||||||
network,
|
|
||||||
Buffer::new(BoxService::new(peer_set.clone()), 1),
|
Buffer::new(BoxService::new(peer_set.clone()), 1),
|
||||||
state_service.clone(),
|
state_service.clone(),
|
||||||
tx_verifier,
|
tx_verifier,
|
||||||
sync_status,
|
sync_status,
|
||||||
latest_chain_tip,
|
latest_chain_tip,
|
||||||
chain_tip_change,
|
chain_tip_change,
|
||||||
|
transaction_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Enable the mempool
|
// Enable the mempool
|
||||||
|
|
|
@ -76,7 +76,7 @@ const MIN_LOOKAHEAD_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP *
|
||||||
///
|
///
|
||||||
/// If this timeout is set too low, the syncer will sometimes get stuck in a
|
/// If this timeout is set too low, the syncer will sometimes get stuck in a
|
||||||
/// failure loop.
|
/// failure loop.
|
||||||
const TIPS_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);
|
pub const TIPS_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);
|
||||||
|
|
||||||
/// Controls how long we wait for a block download request to complete.
|
/// Controls how long we wait for a block download request to complete.
|
||||||
///
|
///
|
||||||
|
|
Loading…
Reference in New Issue