adds an argument to `router::init` for receiving a handle to the mempool service.

This commit is contained in:
Arya 2024-09-02 12:43:51 -04:00
parent f2b325e541
commit 8bd9ba823d
9 changed files with 106 additions and 26 deletions

View File

@ -21,7 +21,7 @@ use std::{
use futures::{FutureExt, TryFutureExt};
use thiserror::Error;
use tokio::task::JoinHandle;
use tokio::{sync::oneshot, task::JoinHandle};
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use tracing::{instrument, Instrument, Span};
@ -30,6 +30,7 @@ use zebra_chain::{
parameters::Network,
};
use zebra_node_services::mempool;
use zebra_state as zs;
use crate::{
@ -230,11 +231,14 @@ where
/// Block and transaction verification requests should be wrapped in a timeout,
/// so that out-of-order and invalid requests do not hang indefinitely.
/// See the [`router`](`crate::router`) module documentation for details.
#[instrument(skip(state_service))]
#[instrument(skip(state_service, _mempool))]
pub async fn init<S>(
config: Config,
network: &Network,
mut state_service: S,
_mempool: oneshot::Receiver<
Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>,
>,
) -> (
Buffer<BoxService<Request, block::Hash, RouterError>, Request>,
Buffer<

View File

@ -68,7 +68,13 @@ async fn verifiers_from_network(
_transaction_verifier,
_groth16_download_handle,
_max_checkpoint_height,
) = crate::router::init(Config::default(), &network, state_service.clone()).await;
) = crate::router::init(
Config::default(),
&network,
state_service.clone(),
oneshot::channel().1,
)
.await;
// We can drop the download task handle here, because:
// - if the download task fails, the tests will panic, and
@ -169,7 +175,13 @@ async fn verify_checkpoint(config: Config) -> Result<(), Report> {
_transaction_verifier,
_groth16_download_handle,
_max_checkpoint_height,
) = super::init(config.clone(), &network, zs::init_test(&network)).await;
) = super::init(
config.clone(),
&network,
zs::init_test(&network),
oneshot::channel().1,
)
.await;
// Add a timeout layer
let block_verifier_router =

View File

@ -13,6 +13,7 @@ use std::{
use hex::FromHex;
use insta::Settings;
use jsonrpc_core::Result;
use tokio::sync::oneshot;
use tower::{buffer::Buffer, Service};
use zebra_chain::{
@ -86,8 +87,13 @@ pub async fn test_responses<State, ReadState>(
_transaction_verifier,
_parameter_download_task_handle,
_max_checkpoint_height,
) = zebra_consensus::router::init(zebra_consensus::Config::default(), network, state.clone())
.await;
) = zebra_consensus::router::init(
zebra_consensus::Config::default(),
network,
state.clone(),
oneshot::channel().1,
)
.await;
let mut mock_sync_status = MockSyncStatus::default();
mock_sync_status.set_is_close_to_tip(true);

View File

@ -17,6 +17,9 @@ use zebra_node_services::BoxError;
use zebra_state::{LatestChainTip, ReadStateService};
use zebra_test::mock_service::MockService;
#[cfg(feature = "getblocktemplate-rpcs")]
use tokio::sync::oneshot;
use super::super::*;
#[tokio::test(flavor = "multi_thread")]
@ -921,8 +924,13 @@ async fn rpc_getblockcount() {
_transaction_verifier,
_parameter_download_task_handle,
_max_checkpoint_height,
) = zebra_consensus::router::init(zebra_consensus::Config::default(), &Mainnet, state.clone())
.await;
) = zebra_consensus::router::init(
zebra_consensus::Config::default(),
&Mainnet,
state.clone(),
oneshot::channel().1,
)
.await;
// Init RPC
let get_block_template_rpc = GetBlockTemplateRpcImpl::new(
@ -966,8 +974,13 @@ async fn rpc_getblockcount_empty_state() {
_transaction_verifier,
_parameter_download_task_handle,
_max_checkpoint_height,
) = zebra_consensus::router::init(zebra_consensus::Config::default(), &Mainnet, state.clone())
.await;
) = zebra_consensus::router::init(
zebra_consensus::Config::default(),
&Mainnet,
state.clone(),
oneshot::channel().1,
)
.await;
// Init RPC
let get_block_template_rpc = get_block_template_rpcs::GetBlockTemplateRpcImpl::new(
@ -1013,8 +1026,13 @@ async fn rpc_getpeerinfo() {
_transaction_verifier,
_parameter_download_task_handle,
_max_checkpoint_height,
) = zebra_consensus::router::init(zebra_consensus::Config::default(), &network, state.clone())
.await;
) = zebra_consensus::router::init(
zebra_consensus::Config::default(),
&network,
state.clone(),
oneshot::channel().1,
)
.await;
let mock_peer_address = zebra_network::types::MetaAddr::new_initial_peer(
std::net::SocketAddr::new(
@ -1083,8 +1101,13 @@ async fn rpc_getblockhash() {
_transaction_verifier,
_parameter_download_task_handle,
_max_checkpoint_height,
) = zebra_consensus::router::init(zebra_consensus::Config::default(), &Mainnet, state.clone())
.await;
) = zebra_consensus::router::init(
zebra_consensus::Config::default(),
&Mainnet,
state.clone(),
oneshot::channel().1,
)
.await;
// Init RPC
let get_block_template_rpc = get_block_template_rpcs::GetBlockTemplateRpcImpl::new(
@ -1569,8 +1592,13 @@ async fn rpc_submitblock_errors() {
_transaction_verifier,
_parameter_download_task_handle,
_max_checkpoint_height,
) = zebra_consensus::router::init(zebra_consensus::Config::default(), &Mainnet, state.clone())
.await;
) = zebra_consensus::router::init(
zebra_consensus::Config::default(),
&Mainnet,
state.clone(),
oneshot::channel().1,
)
.await;
// Init RPC
let get_block_template_rpc = GetBlockTemplateRpcImpl::new(

View File

@ -8,6 +8,7 @@ use zebra_chain::transparent;
use crate::{BoxError, Response};
/// Pending UTXO tracker, used in state service and mempool.
#[derive(Debug, Default)]
pub struct PendingUtxos(HashMap<transparent::OutPoint, broadcast::Sender<transparent::Utxo>>);
@ -70,4 +71,9 @@ impl PendingUtxos {
pub fn len(&self) -> usize {
self.0.len()
}
/// Returns true if there are no utxos being waited on.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}

View File

@ -160,14 +160,14 @@ impl StartCmd {
// or enable denial of service attacks.
//
// See `zebra_network::Connection::drive_peer_request()` for details.
let (setup_tx, setup_rx) = oneshot::channel();
let (inbound_setup_tx, inbound_setup_rx) = oneshot::channel();
let inbound = ServiceBuilder::new()
.load_shed()
.buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
.timeout(MAX_INBOUND_RESPONSE_TIME)
.service(Inbound::new(
config.sync.full_verify_concurrency_limit,
setup_rx,
inbound_setup_rx,
));
let (peer_set, address_book) = zebra_network::init(
@ -179,11 +179,13 @@ impl StartCmd {
.await;
info!("initializing verifiers");
let (tx_verifier_setup_tx, tx_verifier_setup_rx) = oneshot::channel();
let (block_verifier_router, tx_verifier, consensus_task_handles, max_checkpoint_height) =
zebra_consensus::router::init(
config.consensus.clone(),
&config.network.network,
state.clone(),
tx_verifier_setup_rx,
)
.await;
@ -212,6 +214,10 @@ impl StartCmd {
.buffer(mempool::downloads::MAX_INBOUND_CONCURRENCY)
.service(mempool);
tx_verifier_setup_tx
.send(mempool.clone())
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
info!("fully initializing inbound peer request handler");
// Fully start the inbound service as soon as possible
let setup_data = InboundSetupData {
@ -222,7 +228,7 @@ impl StartCmd {
state: state.clone(),
latest_chain_tip: latest_chain_tip.clone(),
};
setup_tx
inbound_setup_tx
.send(setup_data)
.map_err(|_| eyre!("could not send setup data to inbound service"))?;
// And give it time to clear its queue

View File

@ -789,6 +789,7 @@ async fn caches_getaddr_response() {
consensus_config.clone(),
&network,
state_service.clone(),
oneshot::channel().1,
)
.await;
@ -894,8 +895,13 @@ async fn setup(
// Download task panics and timeouts are propagated to the tests that use Groth16 verifiers.
let (block_verifier, _transaction_verifier, _groth16_download_handle, _max_checkpoint_height) =
zebra_consensus::router::init(consensus_config.clone(), &network, state_service.clone())
.await;
zebra_consensus::router::init(
consensus_config.clone(),
&network,
state_service.clone(),
oneshot::channel().1,
)
.await;
let mut peer_set = MockService::build()
.with_max_request_delay(MAX_PEER_SET_REQUEST_DELAY)

View File

@ -120,7 +120,7 @@ pub struct Storage {
// Pending UTXO Request Tracking
//
/// The set of outpoints with pending requests for their associated transparent::Output.
pending_utxos: PendingUtxos,
_pending_utxos: PendingUtxos,
/// The set of transactions rejected due to bad authorizations, or for other
/// reasons, and their rejection reasons. These rejections only apply to the
@ -169,7 +169,7 @@ impl Storage {
pub(crate) fn new(config: &config::Config) -> Self {
Self {
tx_cost_limit: config.tx_cost_limit,
pending_utxos: PendingUtxos::default(),
_pending_utxos: PendingUtxos::default(),
eviction_memory_time: config.eviction_memory_time,
verified: Default::default(),
tip_rejected_exact: Default::default(),

View File

@ -153,6 +153,7 @@ use color_eyre::{
use semver::Version;
use serde_json::Value;
use tokio::sync::oneshot;
use tower::ServiceExt;
use zebra_chain::{
block::{self, genesis::regtest_genesis_block, Height},
@ -2910,7 +2911,13 @@ async fn validate_regtest_genesis_block() {
_transaction_verifier,
_parameter_download_task_handle,
_max_checkpoint_height,
) = zebra_consensus::router::init(zebra_consensus::Config::default(), &network, state).await;
) = zebra_consensus::router::init(
zebra_consensus::Config::default(),
&network,
state,
oneshot::channel().1,
)
.await;
let genesis_hash = block_verifier_router
.oneshot(zebra_consensus::Request::Commit(regtest_genesis_block()))
@ -3310,8 +3317,13 @@ async fn nu6_funding_streams_and_coinbase_balance() -> Result<()> {
_transaction_verifier,
_parameter_download_task_handle,
_max_checkpoint_height,
) = zebra_consensus::router::init(zebra_consensus::Config::default(), &network, state.clone())
.await;
) = zebra_consensus::router::init(
zebra_consensus::Config::default(),
&network,
state.clone(),
oneshot::channel().1,
)
.await;
tracing::info!("started state service and block verifier, committing Regtest genesis block");