From 8bd9ba823d6f75842fcec445a4daa1b8cfc7f80b Mon Sep 17 00:00:00 2001 From: Arya Date: Mon, 2 Sep 2024 12:43:51 -0400 Subject: [PATCH] adds an argument to `router::init` for receiving a handle to the mempool service. --- zebra-consensus/src/router.rs | 8 +++- zebra-consensus/src/router/tests.rs | 16 ++++++- .../tests/snapshot/get_block_template_rpcs.rs | 10 +++- zebra-rpc/src/methods/tests/vectors.rs | 48 +++++++++++++++---- zebra-state/src/service/pending_utxos.rs | 6 +++ zebrad/src/commands/start.rs | 12 +++-- .../components/inbound/tests/fake_peer_set.rs | 10 +++- zebrad/src/components/mempool/storage.rs | 4 +- zebrad/tests/acceptance.rs | 18 +++++-- 9 files changed, 106 insertions(+), 26 deletions(-) diff --git a/zebra-consensus/src/router.rs b/zebra-consensus/src/router.rs index ba42896e5..3de8da98b 100644 --- a/zebra-consensus/src/router.rs +++ b/zebra-consensus/src/router.rs @@ -21,7 +21,7 @@ use std::{ use futures::{FutureExt, TryFutureExt}; use thiserror::Error; -use tokio::task::JoinHandle; +use tokio::{sync::oneshot, task::JoinHandle}; use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; use tracing::{instrument, Instrument, Span}; @@ -30,6 +30,7 @@ use zebra_chain::{ parameters::Network, }; +use zebra_node_services::mempool; use zebra_state as zs; use crate::{ @@ -230,11 +231,14 @@ where /// Block and transaction verification requests should be wrapped in a timeout, /// so that out-of-order and invalid requests do not hang indefinitely. /// See the [`router`](`crate::router`) module documentation for details. -#[instrument(skip(state_service))] +#[instrument(skip(state_service, _mempool))] pub async fn init( config: Config, network: &Network, mut state_service: S, + _mempool: oneshot::Receiver< + Buffer, mempool::Request>, + >, ) -> ( Buffer, Request>, Buffer< diff --git a/zebra-consensus/src/router/tests.rs b/zebra-consensus/src/router/tests.rs index 8fe304e33..f6265b13d 100644 --- a/zebra-consensus/src/router/tests.rs +++ b/zebra-consensus/src/router/tests.rs @@ -68,7 +68,13 @@ async fn verifiers_from_network( _transaction_verifier, _groth16_download_handle, _max_checkpoint_height, - ) = crate::router::init(Config::default(), &network, state_service.clone()).await; + ) = crate::router::init( + Config::default(), + &network, + state_service.clone(), + oneshot::channel().1, + ) + .await; // We can drop the download task handle here, because: // - if the download task fails, the tests will panic, and @@ -169,7 +175,13 @@ async fn verify_checkpoint(config: Config) -> Result<(), Report> { _transaction_verifier, _groth16_download_handle, _max_checkpoint_height, - ) = super::init(config.clone(), &network, zs::init_test(&network)).await; + ) = super::init( + config.clone(), + &network, + zs::init_test(&network), + oneshot::channel().1, + ) + .await; // Add a timeout layer let block_verifier_router = diff --git a/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs b/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs index 8afb7dd31..67284b196 100644 --- a/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs +++ b/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs @@ -13,6 +13,7 @@ use std::{ use hex::FromHex; use insta::Settings; use jsonrpc_core::Result; +use tokio::sync::oneshot; use tower::{buffer::Buffer, Service}; use zebra_chain::{ @@ -86,8 +87,13 @@ pub async fn test_responses( _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), network, state.clone()) - .await; + ) = zebra_consensus::router::init( + zebra_consensus::Config::default(), + network, + state.clone(), + oneshot::channel().1, + ) + .await; let mut mock_sync_status = MockSyncStatus::default(); mock_sync_status.set_is_close_to_tip(true); diff --git a/zebra-rpc/src/methods/tests/vectors.rs b/zebra-rpc/src/methods/tests/vectors.rs index 5b5a21e23..3e9dd2b1e 100644 --- a/zebra-rpc/src/methods/tests/vectors.rs +++ b/zebra-rpc/src/methods/tests/vectors.rs @@ -17,6 +17,9 @@ use zebra_node_services::BoxError; use zebra_state::{LatestChainTip, ReadStateService}; use zebra_test::mock_service::MockService; +#[cfg(feature = "getblocktemplate-rpcs")] +use tokio::sync::oneshot; + use super::super::*; #[tokio::test(flavor = "multi_thread")] @@ -921,8 +924,13 @@ async fn rpc_getblockcount() { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &Mainnet, state.clone()) - .await; + ) = zebra_consensus::router::init( + zebra_consensus::Config::default(), + &Mainnet, + state.clone(), + oneshot::channel().1, + ) + .await; // Init RPC let get_block_template_rpc = GetBlockTemplateRpcImpl::new( @@ -966,8 +974,13 @@ async fn rpc_getblockcount_empty_state() { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &Mainnet, state.clone()) - .await; + ) = zebra_consensus::router::init( + zebra_consensus::Config::default(), + &Mainnet, + state.clone(), + oneshot::channel().1, + ) + .await; // Init RPC let get_block_template_rpc = get_block_template_rpcs::GetBlockTemplateRpcImpl::new( @@ -1013,8 +1026,13 @@ async fn rpc_getpeerinfo() { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &network, state.clone()) - .await; + ) = zebra_consensus::router::init( + zebra_consensus::Config::default(), + &network, + state.clone(), + oneshot::channel().1, + ) + .await; let mock_peer_address = zebra_network::types::MetaAddr::new_initial_peer( std::net::SocketAddr::new( @@ -1083,8 +1101,13 @@ async fn rpc_getblockhash() { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &Mainnet, state.clone()) - .await; + ) = zebra_consensus::router::init( + zebra_consensus::Config::default(), + &Mainnet, + state.clone(), + oneshot::channel().1, + ) + .await; // Init RPC let get_block_template_rpc = get_block_template_rpcs::GetBlockTemplateRpcImpl::new( @@ -1569,8 +1592,13 @@ async fn rpc_submitblock_errors() { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &Mainnet, state.clone()) - .await; + ) = zebra_consensus::router::init( + zebra_consensus::Config::default(), + &Mainnet, + state.clone(), + oneshot::channel().1, + ) + .await; // Init RPC let get_block_template_rpc = GetBlockTemplateRpcImpl::new( diff --git a/zebra-state/src/service/pending_utxos.rs b/zebra-state/src/service/pending_utxos.rs index c60719825..08eca952e 100644 --- a/zebra-state/src/service/pending_utxos.rs +++ b/zebra-state/src/service/pending_utxos.rs @@ -8,6 +8,7 @@ use zebra_chain::transparent; use crate::{BoxError, Response}; +/// Pending UTXO tracker, used in state service and mempool. #[derive(Debug, Default)] pub struct PendingUtxos(HashMap>); @@ -70,4 +71,9 @@ impl PendingUtxos { pub fn len(&self) -> usize { self.0.len() } + + /// Returns true if there are no utxos being waited on. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 887f1cc02..7106dad2f 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -160,14 +160,14 @@ impl StartCmd { // or enable denial of service attacks. // // See `zebra_network::Connection::drive_peer_request()` for details. - let (setup_tx, setup_rx) = oneshot::channel(); + let (inbound_setup_tx, inbound_setup_rx) = oneshot::channel(); let inbound = ServiceBuilder::new() .load_shed() .buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY) .timeout(MAX_INBOUND_RESPONSE_TIME) .service(Inbound::new( config.sync.full_verify_concurrency_limit, - setup_rx, + inbound_setup_rx, )); let (peer_set, address_book) = zebra_network::init( @@ -179,11 +179,13 @@ impl StartCmd { .await; info!("initializing verifiers"); + let (tx_verifier_setup_tx, tx_verifier_setup_rx) = oneshot::channel(); let (block_verifier_router, tx_verifier, consensus_task_handles, max_checkpoint_height) = zebra_consensus::router::init( config.consensus.clone(), &config.network.network, state.clone(), + tx_verifier_setup_rx, ) .await; @@ -212,6 +214,10 @@ impl StartCmd { .buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY) .service(mempool); + tx_verifier_setup_tx + .send(mempool.clone()) + .map_err(|_| eyre!("could not send setup data to inbound service"))?; + info!("fully initializing inbound peer request handler"); // Fully start the inbound service as soon as possible let setup_data = InboundSetupData { @@ -222,7 +228,7 @@ impl StartCmd { state: state.clone(), latest_chain_tip: latest_chain_tip.clone(), }; - setup_tx + inbound_setup_tx .send(setup_data) .map_err(|_| eyre!("could not send setup data to inbound service"))?; // And give it time to clear its queue diff --git a/zebrad/src/components/inbound/tests/fake_peer_set.rs b/zebrad/src/components/inbound/tests/fake_peer_set.rs index 3ca30c575..f7a7d626f 100644 --- a/zebrad/src/components/inbound/tests/fake_peer_set.rs +++ b/zebrad/src/components/inbound/tests/fake_peer_set.rs @@ -789,6 +789,7 @@ async fn caches_getaddr_response() { consensus_config.clone(), &network, state_service.clone(), + oneshot::channel().1, ) .await; @@ -894,8 +895,13 @@ async fn setup( // Download task panics and timeouts are propagated to the tests that use Groth16 verifiers. let (block_verifier, _transaction_verifier, _groth16_download_handle, _max_checkpoint_height) = - zebra_consensus::router::init(consensus_config.clone(), &network, state_service.clone()) - .await; + zebra_consensus::router::init( + consensus_config.clone(), + &network, + state_service.clone(), + oneshot::channel().1, + ) + .await; let mut peer_set = MockService::build() .with_max_request_delay(MAX_PEER_SET_REQUEST_DELAY) diff --git a/zebrad/src/components/mempool/storage.rs b/zebrad/src/components/mempool/storage.rs index f567e43a3..cecd4b3f3 100644 --- a/zebrad/src/components/mempool/storage.rs +++ b/zebrad/src/components/mempool/storage.rs @@ -120,7 +120,7 @@ pub struct Storage { // Pending UTXO Request Tracking // /// The set of outpoints with pending requests for their associated transparent::Output. - pending_utxos: PendingUtxos, + _pending_utxos: PendingUtxos, /// The set of transactions rejected due to bad authorizations, or for other /// reasons, and their rejection reasons. These rejections only apply to the @@ -169,7 +169,7 @@ impl Storage { pub(crate) fn new(config: &config::Config) -> Self { Self { tx_cost_limit: config.tx_cost_limit, - pending_utxos: PendingUtxos::default(), + _pending_utxos: PendingUtxos::default(), eviction_memory_time: config.eviction_memory_time, verified: Default::default(), tip_rejected_exact: Default::default(), diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index c21b0a0e3..bc0ea0271 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -153,6 +153,7 @@ use color_eyre::{ use semver::Version; use serde_json::Value; +use tokio::sync::oneshot; use tower::ServiceExt; use zebra_chain::{ block::{self, genesis::regtest_genesis_block, Height}, @@ -2910,7 +2911,13 @@ async fn validate_regtest_genesis_block() { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &network, state).await; + ) = zebra_consensus::router::init( + zebra_consensus::Config::default(), + &network, + state, + oneshot::channel().1, + ) + .await; let genesis_hash = block_verifier_router .oneshot(zebra_consensus::Request::Commit(regtest_genesis_block())) @@ -3310,8 +3317,13 @@ async fn nu6_funding_streams_and_coinbase_balance() -> Result<()> { _transaction_verifier, _parameter_download_task_handle, _max_checkpoint_height, - ) = zebra_consensus::router::init(zebra_consensus::Config::default(), &network, state.clone()) - .await; + ) = zebra_consensus::router::init( + zebra_consensus::Config::default(), + &network, + state.clone(), + oneshot::channel().1, + ) + .await; tracing::info!("started state service and block verifier, committing Regtest genesis block");