diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 5154843d1..3aef33daf 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -26,6 +26,7 @@ use abscissa_core::{config, Command, FrameworkError, Options, Runnable}; use color_eyre::eyre::{eyre, Report}; use futures::{select, FutureExt}; +use std::collections::HashSet; use tokio::sync::oneshot; use tower::builder::ServiceBuilder; use tower::util::BoxService; @@ -91,14 +92,18 @@ impl StartCmd { ChainSync::new(&config, peer_set.clone(), state.clone(), chain_verifier); info!("initializing mempool"); + + let (mempool_transaction_sender, mempool_transaction_receiver) = + tokio::sync::watch::channel(HashSet::new()); + let mempool_service = BoxService::new(Mempool::new( - config.network.network, peer_set.clone(), state, tx_verifier, sync_status.clone(), latest_chain_tip, chain_tip_change.clone(), + mempool_transaction_sender, )); let mempool = ServiceBuilder::new().buffer(20).service(mempool_service); @@ -114,7 +119,13 @@ impl StartCmd { peer_set.clone(), )); - let mempool_crawler_task_handle = mempool::Crawler::spawn(peer_set, mempool, sync_status); + let mempool_crawler_task_handle = + mempool::Crawler::spawn(peer_set.clone(), mempool, sync_status); + + let tx_gossip_task_handle = tokio::spawn(mempool::gossip_mempool_transaction_id( + mempool_transaction_receiver, + peer_set, + )); select! { sync_result = syncer_error_future.fuse() => sync_result, @@ -126,6 +137,10 @@ impl StartCmd { mempool_crawl_result = mempool_crawler_task_handle.fuse() => mempool_crawl_result .expect("unexpected panic in the mempool crawler") .map_err(|e| eyre!(e)), + + tx_gossip_result = tx_gossip_task_handle.fuse() => tx_gossip_result + .expect("unexpected panic in the transaction gossip task") + .map_err(|e| eyre!(e)), } } } diff --git a/zebrad/src/components/inbound/tests.rs b/zebrad/src/components/inbound/tests.rs index 29538fb6f..97fcebea5 100644 --- a/zebrad/src/components/inbound/tests.rs +++ b/zebrad/src/components/inbound/tests.rs @@ -2,10 +2,7 @@ use std::{collections::HashSet, iter::FromIterator, net::SocketAddr, str::FromSt use futures::FutureExt; use tokio::{sync::oneshot, task::JoinHandle}; -use tower::{ - buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, Service, - ServiceExt, -}; +use tower::{buffer::Buffer, builder::ServiceBuilder, util::BoxService, Service, ServiceExt}; use tracing::Span; use zebra_chain::{ @@ -19,21 +16,26 @@ use zebra_network::{AddressBook, Request, Response}; use zebra_state::Config as StateConfig; use zebra_test::mock_service::{MockService, PanicAssertion}; -use crate::components::{ - mempool::{unmined_transactions_in_blocks, Mempool}, - sync::{self, BlockGossipError, SyncStatus}, +use crate::{ + components::{ + mempool::{self, gossip_mempool_transaction_id, unmined_transactions_in_blocks, Mempool}, + sync::{self, BlockGossipError, SyncStatus}, + }, + BoxError, }; #[tokio::test] async fn mempool_requests_for_transactions() { let ( inbound_service, + _mempool_guard, _committed_blocks, added_transactions, _mock_tx_verifier, mut peer_set, _state_guard, sync_gossip_task_handle, + tx_gossip_task_handle, ) = setup(true).await; let added_transaction_ids: Vec = added_transactions.iter().map(|t| t.id).collect(); @@ -58,6 +60,7 @@ async fn mempool_requests_for_transactions() { .collect::>(); let response = inbound_service + .clone() .oneshot(Request::TransactionsById(hash_set)) .await; @@ -75,6 +78,13 @@ async fn mempool_requests_for_transactions() { "unexpected error or panic in sync gossip task: {:?}", sync_gossip_result, ); + + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); } #[tokio::test] @@ -88,12 +98,14 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> { let ( inbound_service, + _mempool_guard, _committed_blocks, _added_transactions, mut tx_verifier, mut peer_set, _state_guard, sync_gossip_task_handle, + tx_gossip_task_handle, ) = setup(false).await; // Test `Request::PushTransaction` @@ -124,8 +136,14 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> { ), }; - // check that nothing unexpected happened - peer_set.expect_no_requests().await; + // Make sure there is an additional request broadcasting the + // inserted transaction to peers. + let mut hs = HashSet::new(); + hs.insert(tx.unmined_id()); + peer_set + .expect_request(Request::AdvertiseTransactionIds(hs)) + .await + .respond(Response::Nil); let sync_gossip_result = sync_gossip_task_handle.now_or_never(); assert!( @@ -134,6 +152,13 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> { sync_gossip_result, ); + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); + Ok(()) } @@ -153,12 +178,14 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> { let ( inbound_service, + _mempool_guard, _committed_blocks, _added_transactions, mut tx_verifier, mut peer_set, _state_guard, sync_gossip_task_handle, + tx_gossip_task_handle, ) = setup(false).await; // Test `Request::AdvertiseTransactionIds` @@ -170,7 +197,7 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> { peer_set .expect_request(Request::TransactionsById(txs)) .map(|responder| { - let unmined_transaction = UnminedTx::from(test_transaction); + let unmined_transaction = UnminedTx::from(test_transaction.clone()); responder.respond(Response::Transactions(vec![unmined_transaction])) }); // Simulate a successful transaction verification @@ -200,8 +227,14 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> { ), }; - // check that nothing unexpected happened - peer_set.expect_no_requests().await; + // Make sure there is an additional request broadcasting the + // inserted transaction to peers. + let mut hs = HashSet::new(); + hs.insert(test_transaction.unmined_id()); + peer_set + .expect_request(Request::AdvertiseTransactionIds(hs)) + .await + .respond(Response::Nil); let sync_gossip_result = sync_gossip_task_handle.now_or_never(); assert!( @@ -210,6 +243,13 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> { sync_gossip_result, ); + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); + Ok(()) } @@ -233,12 +273,14 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { // Get services let ( inbound_service, + _mempool_guard, _committed_blocks, _added_transactions, mut tx_verifier, mut peer_set, state_service, sync_gossip_task_handle, + tx_gossip_task_handle, ) = setup(false).await; // Push test transaction @@ -283,6 +325,15 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { .await .unwrap(); + // Test transaction 1 is gossiped + let mut hs = HashSet::new(); + hs.insert(tx1_id); + peer_set + .expect_request(Request::AdvertiseTransactionIds(hs)) + .await + .respond(Response::Nil); + + // Block is gossiped then peer_set .expect_request(Request::AdvertiseBlock(block_two.hash())) .await @@ -315,6 +366,7 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { .await .unwrap(); + // Block is gossiped peer_set .expect_request(Request::AdvertiseBlock(block_three.hash())) .await @@ -351,6 +403,14 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { ), }; + // Test transaction 2 is gossiped + let mut hs = HashSet::new(); + hs.insert(tx2_id); + peer_set + .expect_request(Request::AdvertiseTransactionIds(hs)) + .await + .respond(Response::Nil); + // Add all the rest of the continous blocks we have to test tx2 will never expire. let more_blocks: Vec> = vec![ zebra_test::vectors::BLOCK_MAINNET_4_BYTES @@ -384,6 +444,7 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { .await .unwrap(); + // Block is gossiped peer_set .expect_request(Request::AdvertiseBlock(block.hash())) .await @@ -415,26 +476,31 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { sync_gossip_result, ); + let tx_gossip_result = tx_gossip_task_handle.now_or_never(); + assert!( + matches!(tx_gossip_result, None), + "unexpected error or panic in transaction gossip task: {:?}", + tx_gossip_result, + ); + Ok(()) } async fn setup( add_transactions: bool, ) -> ( - LoadShed>, + Buffer< + BoxService, + zebra_network::Request, + >, + Buffer, mempool::Request>, Vec>, Vec, MockService, MockService, - Buffer< - BoxService< - zebra_state::Request, - zebra_state::Response, - Box, - >, - zebra_state::Request, - >, + Buffer, zebra_state::Request>, JoinHandle>, + JoinHandle>, ) { let network = Network::Mainnet; let consensus_config = ConsensusConfig::default(); @@ -491,39 +557,43 @@ async fn setup( .unwrap(); committed_blocks.push(block_one); + let (transaction_sender, transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); + let mut mempool_service = Mempool::new( - network, buffered_peer_set.clone(), state_service.clone(), buffered_tx_verifier.clone(), sync_status.clone(), latest_chain_tip, chain_tip_change.clone(), + transaction_sender, ); // Enable the mempool - let _ = mempool_service.enable(&mut recent_syncs).await; + mempool_service.enable(&mut recent_syncs).await; + // Add transactions to the mempool, skipping verification and broadcast let mut added_transactions = Vec::new(); if add_transactions { added_transactions.extend(add_some_stuff_to_mempool(&mut mempool_service, network)); } let mempool_service = BoxService::new(mempool_service); - let mempool = ServiceBuilder::new().buffer(1).service(mempool_service); + let mempool_service = ServiceBuilder::new().buffer(1).service(mempool_service); let (setup_tx, setup_rx) = oneshot::channel(); let inbound_service = ServiceBuilder::new() .load_shed() - .buffer(1) .service(super::Inbound::new( setup_rx, state_service.clone(), block_verifier.clone(), )); + let inbound_service = BoxService::new(inbound_service); + let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service); - let r = setup_tx.send((buffered_peer_set, address_book, mempool)); + let r = setup_tx.send((buffered_peer_set, address_book, mempool_service.clone())); // We can't expect or unwrap because the returned Result does not implement Debug assert!(r.is_ok(), "unexpected setup channel send failure"); @@ -533,6 +603,11 @@ async fn setup( peer_set.clone(), )); + let tx_gossip_task_handle = tokio::spawn(gossip_mempool_transaction_id( + transaction_receiver, + peer_set.clone(), + )); + // Make sure there is an additional request broadcasting the // committed blocks to peers. // @@ -546,15 +621,20 @@ async fn setup( ( inbound_service, + mempool_service, committed_blocks, added_transactions, mock_tx_verifier, peer_set, state_service, sync_gossip_task_handle, + tx_gossip_task_handle, ) } +/// Manually add a transaction to the mempool storage. +/// +/// Skips some mempool functionality, like transaction verification and peer broadcasts. fn add_some_stuff_to_mempool(mempool_service: &mut Mempool, network: Network) -> Vec { // get the genesis block coinbase transaction from the Zcash blockchain. let genesis_transactions: Vec<_> = unmined_transactions_in_blocks(..=0, network) diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 10c72990b..a83b59f0c 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -12,11 +12,11 @@ use std::{ }; use futures::{future::FutureExt, stream::Stream}; +use tokio::sync::watch; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service}; use zebra_chain::{ chain_tip::ChainTip, - parameters::Network, transaction::{UnminedTx, UnminedTxId}, }; use zebra_consensus::{error::TransactionError, transaction}; @@ -29,6 +29,7 @@ pub use crate::BoxError; mod crawler; pub mod downloads; mod error; +pub mod gossip; mod storage; #[cfg(test)] @@ -42,6 +43,7 @@ pub use self::storage::{ #[cfg(test)] pub use self::storage::tests::unmined_transactions_in_blocks; +pub use gossip::gossip_mempool_transaction_id; use self::downloads::{ Downloads as TxDownloads, Gossip, TransactionDownloadVerifyError, TRANSACTION_DOWNLOAD_TIMEOUT, @@ -146,17 +148,21 @@ pub struct Mempool { /// Handle to the transaction verifier service. /// Used to construct the transaction downloader. tx_verifier: TxVerifier, + + /// Sender part of a gossip transactions channel. + /// Used to broadcast transaction ids to peers. + transaction_sender: watch::Sender>, } impl Mempool { pub(crate) fn new( - _network: Network, outbound: Outbound, state: State, tx_verifier: TxVerifier, sync_status: SyncStatus, latest_chain_tip: zs::LatestChainTip, chain_tip_change: ChainTipChange, + transaction_sender: watch::Sender>, ) -> Self { Mempool { active_state: ActiveState::Disabled, @@ -166,6 +172,7 @@ impl Mempool { outbound, state, tx_verifier, + transaction_sender, } } @@ -254,12 +261,16 @@ impl Service for Mempool { } // Clean up completed download tasks and add to mempool if successful. + // Also, send succesful transactions to peers. + let mut inserted_txids = HashSet::new(); while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) { match r { Ok(tx) => { // Storage handles conflicting transactions or a full mempool internally, // so just ignore the storage result here - let _ = storage.insert(tx); + let _ = storage.insert(tx.clone()); + // Save transaction ids that we will send to peers + inserted_txids.insert(tx.id); } Err((txid, e)) => { reject_if_needed(storage, txid, e); @@ -267,6 +278,10 @@ impl Service for Mempool { } }; } + // Send any newly inserted transactions to peers + if !inserted_txids.is_empty() { + let _ = self.transaction_sender.send(inserted_txids)?; + } // Remove expired transactions from the mempool. if let Some(tip_height) = self.latest_chain_tip.best_tip_height() { diff --git a/zebrad/src/components/mempool/gossip.rs b/zebrad/src/components/mempool/gossip.rs new file mode 100644 index 000000000..5e43eca05 --- /dev/null +++ b/zebrad/src/components/mempool/gossip.rs @@ -0,0 +1,45 @@ +//! A task that gossips [`transaction::UnminedTxId`] that enter the mempool to peers. + +use tower::{timeout::Timeout, Service, ServiceExt}; + +use zebra_network as zn; + +use tokio::sync::watch; +use zebra_chain::transaction::UnminedTxId; + +use std::collections::HashSet; + +use crate::BoxError; + +use crate::components::sync::TIPS_RESPONSE_TIMEOUT; + +/// Run continuously, gossiping new [`transaction::UnminedTxId`] to peers. +/// +/// Broadcast any [`transaction::UnminedTxId`] that gets stored in the mempool to all ready peers. +pub async fn gossip_mempool_transaction_id( + mut receiver: watch::Receiver>, + broadcast_network: ZN, +) -> Result<(), BoxError> +where + ZN: Service + Send + Clone + 'static, + ZN::Future: Send, +{ + info!("initializing transaction gossip task"); + + // use the same timeout as tips requests, + // so broadcasts don't delay the syncer too long + let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT); + + loop { + // once we get new data in the channel, broadcast to peers + receiver.changed().await?; + + let txs = receiver.borrow().clone(); + let request = zn::Request::AdvertiseTransactionIds(txs); + + info!(?request, "sending mempool transaction broadcast"); + + // broadcast requests don't return errors, and we'd just want to ignore them anyway + let _ = broadcast_network.ready_and().await?.call(request).await; + } +} diff --git a/zebrad/src/components/mempool/tests/prop.rs b/zebrad/src/components/mempool/tests/prop.rs index a324ecd0c..efe20c8f2 100644 --- a/zebrad/src/components/mempool/tests/prop.rs +++ b/zebrad/src/components/mempool/tests/prop.rs @@ -1,4 +1,5 @@ use proptest::prelude::*; +use std::collections::HashSet; use tokio::time; use tower::{buffer::Buffer, util::BoxService}; @@ -150,14 +151,16 @@ fn setup( let (sync_status, recent_syncs) = SyncStatus::new(); let (chain_tip_sender, latest_chain_tip, chain_tip_change) = ChainTipSender::new(None, network); + let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); + let mempool = Mempool::new( - network, Buffer::new(BoxService::new(peer_set.clone()), 1), Buffer::new(BoxService::new(state_service.clone()), 1), Buffer::new(BoxService::new(tx_verifier.clone()), 1), sync_status, latest_chain_tip, chain_tip_change, + transaction_sender, ); ( diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index 4101001ad..c60295b87 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -4,7 +4,7 @@ use color_eyre::Report; use tokio::time; use tower::{ServiceBuilder, ServiceExt}; -use zebra_chain::{block::Block, serialization::ZcashDeserializeInto}; +use zebra_chain::{block::Block, parameters::Network, serialization::ZcashDeserializeInto}; use zebra_consensus::Config as ConsensusConfig; use zebra_state::Config as StateConfig; use zebra_test::mock_service::MockService; @@ -36,14 +36,16 @@ async fn mempool_service_basic() -> Result<(), Report> { let more_transactions = unmined_transactions; // Start the mempool service + let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); + let mut service = Mempool::new( - network, Buffer::new(BoxService::new(peer_set), 1), state_service.clone(), tx_verifier, sync_status, latest_chain_tip, chain_tip_change, + transaction_sender, ); // Enable the mempool @@ -163,14 +165,16 @@ async fn mempool_queue() -> Result<(), Report> { let stored_tx = transactions.next_back().unwrap().clone(); // Start the mempool service + let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); + let mut service = Mempool::new( - network, Buffer::new(BoxService::new(peer_set), 1), state_service.clone(), tx_verifier, sync_status, latest_chain_tip, chain_tip_change, + transaction_sender, ); // Enable the mempool @@ -262,14 +266,16 @@ async fn mempool_service_disabled() -> Result<(), Report> { let more_transactions = unmined_transactions; // Start the mempool service + let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); + let mut service = Mempool::new( - network, Buffer::new(BoxService::new(peer_set), 1), state_service.clone(), tx_verifier, sync_status, latest_chain_tip, chain_tip_change, + transaction_sender, ); // Test if mempool is disabled (it should start disabled) @@ -383,14 +389,16 @@ async fn mempool_cancel_mined() -> Result<(), Report> { time::pause(); // Start the mempool service + let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); + let mut mempool = Mempool::new( - network, Buffer::new(BoxService::new(peer_set), 1), state_service.clone(), tx_verifier, sync_status, latest_chain_tip, chain_tip_change, + transaction_sender, ); // Enable the mempool @@ -495,14 +503,16 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> .await; // Start the mempool service + let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); + let mut mempool = Mempool::new( - network, Buffer::new(BoxService::new(peer_set), 1), state_service.clone(), tx_verifier, sync_status, latest_chain_tip, chain_tip_change, + transaction_sender, ); // Enable the mempool @@ -589,14 +599,16 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { time::pause(); // Start the mempool service + let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); + let mut mempool = Mempool::new( - network, Buffer::new(BoxService::new(peer_set.clone()), 1), state_service.clone(), Buffer::new(BoxService::new(tx_verifier.clone()), 1), sync_status, latest_chain_tip, chain_tip_change, + transaction_sender, ); // Enable the mempool @@ -691,14 +703,16 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { time::pause(); // Start the mempool service + let (transaction_sender, _transaction_receiver) = tokio::sync::watch::channel(HashSet::new()); + let mut mempool = Mempool::new( - network, Buffer::new(BoxService::new(peer_set.clone()), 1), state_service.clone(), tx_verifier, sync_status, latest_chain_tip, chain_tip_change, + transaction_sender, ); // Enable the mempool diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index 8857f01bc..018e1a71f 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -76,7 +76,7 @@ const MIN_LOOKAHEAD_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * /// /// If this timeout is set too low, the syncer will sometimes get stuck in a /// failure loop. -const TIPS_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6); +pub const TIPS_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6); /// Controls how long we wait for a block download request to complete. ///