Refactor to create a new `zebra-node-services` crate (#3648)

* Create new empty `zebra-node-services` crate

The goal is to store the mempool `Request` and `Response` types so that
the `zebra-rpc` crate can interface with the mempool service without
having to import `zebrad`.

* Move `Gossip` mempool type into new crate

It is required by the `Request` type, which will be moved next.

* Add documentation to `Gossip` variants

Avoid having to add an exception to allow undocumented code.

* Move `mempool::Request` type to new crate

The first part of the service interface definition. Usages have been
changed to refer to the new crate directly, and since this refactor is
still incomplete, some `mp` aliases are used in a few places to refer to
the old module.

* Create an `UnboxMempoolError` helper trait

Centralize some common code to extract and downcast boxed mempool
errors. The `mempool::Response` will need to contain a `BoxError`
instead of a `MempoolError` when it is moved to the
`zebra-node-services` crate, so this prepares the tests to be updated
with less changes.

* Use `UnboxMempoolError` in tests

Make the necessary changes so that the tests are ready to support a
`BoxError` in the `mempool::Response` type.

* Use `BoxError` in `mempool::Response`

Prepare it to be moved to the `zebra-node-services` crate.

* Move `mempool::Response` to `zebra-node-services`

Update usages to import from the new crate directly.

* Remove `mp` aliases for mempool component module

Use any internal types directly instead.

* Replace `tower::BoxService` with custom alias

Remove the dependency of `zebra-node-services` on `tower`.

* Move `Gossip` into a separate `sub-module`

Keep only the main `Request` and `Response` types in the `mempool`
module.

* Use `crate::BoxError` instead of `tower::BoxError`

Follow the existing convention.

* Add missing `gossip.rs` module file

It was missing from a previous refactor commit.
This commit is contained in:
Janito Vaqueiro Ferreira Filho 2022-02-25 18:43:21 -03:00 committed by GitHub
parent 4fc10e5257
commit c24ea1fc3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 275 additions and 160 deletions

8
Cargo.lock generated
View File

@ -5688,6 +5688,13 @@ dependencies = [
"zebra-test",
]
[[package]]
name = "zebra-node-services"
version = "1.0.0-beta.5"
dependencies = [
"zebra-chain",
]
[[package]]
name = "zebra-rpc"
version = "1.0.0-beta.0"
@ -5831,6 +5838,7 @@ dependencies = [
"zebra-chain",
"zebra-consensus",
"zebra-network",
"zebra-node-services",
"zebra-rpc",
"zebra-state",
"zebra-test",

View File

@ -8,6 +8,7 @@ members = [
"zebra-consensus",
"zebra-rpc",
"zebra-client",
"zebra-node-services",
"zebra-test",
"zebra-utils",
"tower-batch",

View File

@ -0,0 +1,10 @@
[package]
name = "zebra-node-services"
authors = ["Zcash Foundation <zebra@zfnd.org>"]
license = "MIT OR Apache-2.0"
version = "1.0.0-beta.5"
edition = "2021"
repository = "https://github.com/ZcashFoundation/zebra"
[dependencies]
zebra-chain = { path = "../zebra-chain" }

View File

@ -0,0 +1,10 @@
//! The interfaces of some Zebra node services.
pub mod mempool;
/// Error type alias to make working with tower traits easier.
///
/// Note: the 'static lifetime bound means that the *type* cannot have any
/// non-'static lifetimes, (e.g., when a type contains a borrow and is
/// parameterized by 'a), *not* that the object itself has 'static lifetime.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

View File

@ -0,0 +1,93 @@
//! The Zebra mempool.
//!
//! A service that manages known unmined Zcash transactions.
use std::collections::HashSet;
use zebra_chain::transaction::{UnminedTx, UnminedTxId};
use crate::BoxError;
mod gossip;
pub use self::gossip::Gossip;
/// A mempool service request.
///
/// Requests can query the current set of mempool transactions,
/// queue transactions to be downloaded and verified, or
/// run the mempool to check for newly verified transactions.
///
/// Requests can't modify the mempool directly,
/// because all mempool transactions must be verified.
#[derive(Debug, Eq, PartialEq)]
#[allow(dead_code)]
pub enum Request {
/// Query all transaction IDs in the mempool.
TransactionIds,
/// Query matching transactions in the mempool,
/// using a unique set of [`UnminedTxId`]s.
TransactionsById(HashSet<UnminedTxId>),
/// Query matching cached rejected transaction IDs in the mempool,
/// using a unique set of [`UnminedTxId`]s.
RejectedTransactionIds(HashSet<UnminedTxId>),
/// Queue a list of gossiped transactions or transaction IDs, or
/// crawled transaction IDs.
///
/// The transaction downloader checks for duplicates across IDs and transactions.
Queue(Vec<Gossip>),
/// Check for newly verified transactions.
///
/// The transaction downloader does not push transactions into the mempool.
/// So a task should send this request regularly (every 5-10 seconds).
///
/// These checks also happen for other request variants,
/// but we can't rely on peers to send queries regularly,
/// and crawler queue requests depend on peer responses.
/// Also, crawler requests aren't frequent enough for transaction propagation.
///
/// # Correctness
///
/// This request is required to avoid hangs in the mempool.
///
/// The queue checker task can't call `poll_ready` directly on the [`Mempool`] service,
/// because the mempool service is wrapped in a `Buffer`.
/// Calling [`Buffer::poll_ready`] reserves a buffer slot, which can cause hangs when
/// too many slots are reserved but unused:
/// <https://docs.rs/tower/0.4.10/tower/buffer/struct.Buffer.html#a-note-on-choosing-a-bound>
CheckForVerifiedTransactions,
}
/// A response to a mempool service request.
///
/// Responses can read the current set of mempool transactions,
/// check the queued status of transactions to be downloaded and verified, or
/// confirm that the mempool has been checked for newly verified transactions.
#[derive(Debug)]
pub enum Response {
/// Returns all transaction IDs from the mempool.
TransactionIds(HashSet<UnminedTxId>),
/// Returns matching transactions from the mempool.
///
/// Since the [`TransactionsById`] request is unique,
/// the response transactions are also unique.
Transactions(Vec<UnminedTx>),
/// Returns matching cached rejected transaction IDs from the mempool,
RejectedTransactionIds(HashSet<UnminedTxId>),
/// Returns a list of queue results.
///
/// These are the results of the initial queue checks.
/// The transaction may also fail download or verification later.
///
/// Each result matches the request at the corresponding vector index.
Queued(Vec<Result<(), BoxError>>),
/// Confirms that the mempool has checked for recently verified transactions.
CheckedForVerifiedTransactions,
}

View File

@ -0,0 +1,35 @@
//! Representation of a gossiped transaction to send to the mempool.
use zebra_chain::transaction::{UnminedTx, UnminedTxId};
/// A gossiped transaction, which can be the transaction itself or just its ID.
#[derive(Debug, Eq, PartialEq)]
pub enum Gossip {
/// Just the ID of an unmined transaction.
Id(UnminedTxId),
/// The full contents of an unmined transaction.
Tx(UnminedTx),
}
impl Gossip {
/// Return the [`UnminedTxId`] of a gossiped transaction.
pub fn id(&self) -> UnminedTxId {
match self {
Gossip::Id(txid) => *txid,
Gossip::Tx(tx) => tx.id,
}
}
}
impl From<UnminedTxId> for Gossip {
fn from(txid: UnminedTxId) -> Self {
Gossip::Id(txid)
}
}
impl From<UnminedTx> for Gossip {
fn from(tx: UnminedTx) -> Self {
Gossip::Tx(tx)
}
}

View File

@ -13,6 +13,7 @@ default-run = "zebrad"
zebra-chain = { path = "../zebra-chain" }
zebra-consensus = { path = "../zebra-consensus/" }
zebra-network = { path = "../zebra-network" }
zebra-node-services = { path = "../zebra-node-services" }
zebra-rpc = { path = "../zebra-rpc" }
zebra-state = { path = "../zebra-state" }

View File

@ -33,12 +33,11 @@ use zebra_network::{
constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
AddressBook, InventoryResponse,
};
use zebra_node_services::mempool;
// Re-use the syncer timeouts for consistency.
use super::{
mempool, mempool as mp,
sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
};
use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
use crate::BoxError;
use InventoryResponse::*;
@ -52,7 +51,7 @@ use downloads::Downloads as BlockDownloads;
type BlockDownloadPeerSet =
Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
type Mempool = Buffer<BoxService<mp::Request, mp::Response, mp::BoxError>, mp::Request>;
type Mempool = Buffer<BoxService<mempool::Request, mempool::Response, BoxError>, mempool::Request>;
type BlockVerifier = Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>;
type GossipedBlockDownloads =
BlockDownloads<Timeout<BlockDownloadPeerSet>, Timeout<BlockVerifier>, State>;

View File

@ -23,13 +23,17 @@ use zebra_chain::{
};
use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig};
use zebra_network::{AddressBook, InventoryResponse, Request, Response};
use zebra_node_services::mempool;
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::{MockService, PanicAssertion};
use crate::{
components::{
inbound::{Inbound, InboundSetupData},
mempool::{self, gossip_mempool_transaction_id, unmined_transactions_in_blocks, Mempool},
mempool::{
gossip_mempool_transaction_id, unmined_transactions_in_blocks, Config as MempoolConfig,
Mempool, MempoolError, SameEffectsChainRejectionError, UnboxMempoolError,
},
sync::{self, BlockGossipError, SyncStatus},
},
BoxError,
@ -487,10 +491,12 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> {
assert_eq!(queued_responses.len(), 1);
assert_eq!(
queued_responses[0],
Err(mempool::MempoolError::StorageEffectsChain(
mempool::SameEffectsChainRejectionError::Expired
))
queued_responses
.into_iter()
.next()
.unwrap()
.unbox_mempool_error(),
MempoolError::StorageEffectsChain(SameEffectsChainRejectionError::Expired)
);
// Test transaction 2 is gossiped
@ -750,7 +756,7 @@ async fn setup(
committed_blocks.push(block_one);
let (mut mempool_service, transaction_receiver) = Mempool::new(
&mempool::Config::default(),
&MempoolConfig::default(),
buffered_peer_set.clone(),
state_service.clone(),
buffered_tx_verifier.clone(),

View File

@ -22,13 +22,14 @@ use zebra_network::{
connect_isolated_tcp_direct_with_inbound, types::InventoryHash, Config as NetworkConfig,
InventoryResponse, PeerError, Request, Response, SharedPeerError,
};
use zebra_node_services::mempool;
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::{MockService, PanicAssertion};
use crate::{
components::{
inbound::{Inbound, InboundSetupData},
mempool::{self, gossip_mempool_transaction_id, Mempool},
mempool::{gossip_mempool_transaction_id, Config as MempoolConfig, Mempool},
sync::{self, BlockGossipError, SyncStatus},
},
BoxError,
@ -697,7 +698,7 @@ async fn setup(
.service(BoxService::new(mock_tx_verifier.clone()));
// Mempool
let mempool_config = mempool::Config::default();
let mempool_config = MempoolConfig::default();
let (mut mempool_service, transaction_receiver) = Mempool::new(
&mempool_config,
peer_set.clone(),

View File

@ -30,13 +30,10 @@ use futures::{future::FutureExt, stream::Stream};
use tokio::sync::watch;
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
use zebra_chain::{
block::Height,
chain_tip::ChainTip,
transaction::{UnminedTx, UnminedTxId},
};
use zebra_chain::{block::Height, chain_tip::ChainTip, transaction::UnminedTxId};
use zebra_consensus::{error::TransactionError, transaction};
use zebra_network as zn;
use zebra_node_services::mempool::{Request, Response};
use zebra_state as zs;
use zebra_state::{ChainTipChange, TipAction};
@ -65,10 +62,10 @@ pub use storage::{
};
#[cfg(test)]
pub use storage::tests::unmined_transactions_in_blocks;
pub use self::{storage::tests::unmined_transactions_in_blocks, tests::UnboxMempoolError};
use downloads::{
Downloads as TxDownloads, Gossip, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
};
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
@ -79,87 +76,6 @@ type TxVerifier = Buffer<
>;
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
/// A mempool service request.
///
/// Requests can query the current set of mempool transactions,
/// queue transactions to be downloaded and verified, or
/// run the mempool to check for newly verified transactions.
///
/// Requests can't modify the mempool directly,
/// because all mempool transactions must be verified.
#[derive(Debug, Eq, PartialEq)]
#[allow(dead_code)]
pub enum Request {
/// Query all transaction IDs in the mempool.
TransactionIds,
/// Query matching transactions in the mempool,
/// using a unique set of [`UnminedTxId`]s.
TransactionsById(HashSet<UnminedTxId>),
/// Query matching cached rejected transaction IDs in the mempool,
/// using a unique set of [`UnminedTxId`]s.
RejectedTransactionIds(HashSet<UnminedTxId>),
/// Queue a list of gossiped transactions or transaction IDs, or
/// crawled transaction IDs.
///
/// The transaction downloader checks for duplicates across IDs and transactions.
Queue(Vec<Gossip>),
/// Check for newly verified transactions.
///
/// The transaction downloader does not push transactions into the mempool.
/// So a task should send this request regularly (every 5-10 seconds).
///
/// These checks also happen for other request variants,
/// but we can't rely on peers to send queries regularly,
/// and crawler queue requests depend on peer responses.
/// Also, crawler requests aren't frequent enough for transaction propagation.
///
/// # Correctness
///
/// This request is required to avoid hangs in the mempool.
///
/// The queue checker task can't call `poll_ready` directly on the [`Mempool`] service,
/// because the mempool service is wrapped in a `Buffer`.
/// Calling [`Buffer::poll_ready`] reserves a buffer slot, which can cause hangs when
/// too many slots are reserved but unused:
/// <https://docs.rs/tower/0.4.10/tower/buffer/struct.Buffer.html#a-note-on-choosing-a-bound>
CheckForVerifiedTransactions,
}
/// A response to a mempool service request.
///
/// Responses can read the current set of mempool transactions,
/// check the queued status of transactions to be downloaded and verified, or
/// confirm that the mempool has been checked for newly verified transactions.
#[derive(Debug)]
pub enum Response {
/// Returns all transaction IDs from the mempool.
TransactionIds(HashSet<UnminedTxId>),
/// Returns matching transactions from the mempool.
///
/// Since the [`TransactionsById`] request is unique,
/// the response transactions are also unique.
Transactions(Vec<UnminedTx>),
/// Returns matching cached rejected transaction IDs from the mempool,
RejectedTransactionIds(HashSet<UnminedTxId>),
/// Returns a list of queue results.
///
/// These are the results of the initial queue checks.
/// The transaction may also fail download or verification later.
///
/// Each result matches the request at the corresponding vector index.
Queued(Vec<Result<(), MempoolError>>),
/// Confirms that the mempool has checked for recently verified transactions.
CheckedForVerifiedTransactions,
}
/// The state of the mempool.
///
/// Indicates whether it is enabled or disabled and, if enabled, contains
@ -489,13 +405,14 @@ impl Service<Request> for Mempool {
// Queue mempool candidates
Request::Queue(gossiped_txs) => {
let rsp: Vec<Result<(), MempoolError>> = gossiped_txs
let rsp: Vec<Result<(), BoxError>> = gossiped_txs
.into_iter()
.map(|gossiped_tx| {
.map(|gossiped_tx| -> Result<(), MempoolError> {
storage.should_download_or_verify(gossiped_tx.id())?;
tx_downloads.download_if_needed_and_verify(gossiped_tx)?;
Ok(())
})
.map(|result| result.map_err(BoxError::from))
.collect();
async move { Ok(Response::Queued(rsp)) }.boxed()
}
@ -522,8 +439,10 @@ impl Service<Request> for Mempool {
Request::Queue(gossiped_txs) => Response::Queued(
// Special case; we can signal the error inside the response,
// because the inbound service ignores inner errors.
iter::repeat(Err(MempoolError::Disabled))
iter::repeat(MempoolError::Disabled)
.take(gossiped_txs.len())
.map(BoxError::from)
.map(Err)
.collect(),
),

View File

@ -56,10 +56,11 @@ use tracing_futures::Instrument;
use zebra_chain::{block::Height, transaction::UnminedTxId};
use zebra_network as zn;
use zebra_node_services::mempool::Gossip;
use zebra_state::ChainTipChange;
use crate::components::{
mempool::{self, downloads::Gossip, Config},
mempool::{self, Config},
sync::SyncStatus,
};

View File

@ -10,18 +10,21 @@ use tokio::time;
use zebra_chain::{parameters::Network, transaction::UnminedTxId};
use zebra_network as zn;
use zebra_node_services::mempool::Gossip;
use zebra_state::ChainTipSender;
use zebra_test::mock_service::{MockService, PropTestAssertion};
use crate::components::{
mempool::{
self,
crawler::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY},
downloads::Gossip,
error::MempoolError,
Config,
use crate::{
components::{
mempool::{
self,
crawler::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY},
error::MempoolError,
Config,
},
sync::RecentSyncLengths,
},
sync::RecentSyncLengths,
BoxError,
};
/// The number of iterations to crawl while testing.
@ -310,8 +313,13 @@ async fn crawler_iteration(
async fn respond_to_queue_request(
mempool: &mut MockMempool,
expected_transaction_ids: HashSet<UnminedTxId>,
response: Vec<Result<(), MempoolError>>,
response: impl IntoIterator<Item = Result<(), MempoolError>>,
) -> Result<(), TestCaseError> {
let response = response
.into_iter()
.map(|result| result.map_err(BoxError::from))
.collect();
mempool
.expect_request_that(|req| {
if let mempool::Request::Queue(req) = req {

View File

@ -39,9 +39,10 @@ use tokio::{sync::oneshot, task::JoinHandle};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::transaction::{self, UnminedTx, UnminedTxId, VerifiedUnminedTx};
use zebra_chain::transaction::{self, UnminedTxId, VerifiedUnminedTx};
use zebra_consensus::transaction as tx;
use zebra_network as zn;
use zebra_node_services::mempool::Gossip;
use zebra_state as zs;
use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
@ -111,35 +112,6 @@ pub enum TransactionDownloadVerifyError {
Invalid(#[from] zebra_consensus::error::TransactionError),
}
/// A gossiped transaction, which can be the transaction itself or just its ID.
#[derive(Debug, Eq, PartialEq)]
pub enum Gossip {
Id(UnminedTxId),
Tx(UnminedTx),
}
impl Gossip {
/// Return the [`UnminedTxId`] of a gossiped transaction.
pub fn id(&self) -> UnminedTxId {
match self {
Gossip::Id(txid) => *txid,
Gossip::Tx(tx) => tx.id,
}
}
}
impl From<UnminedTxId> for Gossip {
fn from(txid: UnminedTxId) -> Self {
Gossip::Id(txid)
}
}
impl From<UnminedTx> for Gossip {
fn from(tx: UnminedTx) -> Self {
Gossip::Tx(tx)
}
}
/// Represents a [`Stream`] of download and verification tasks.
#[pin_project(PinnedDrop)]
#[derive(Debug)]

View File

@ -2,8 +2,13 @@ use std::pin::Pin;
use tower::ServiceExt;
use super::{storage::Storage, ActiveState, InboundTxDownloads, Mempool, Request};
use crate::components::sync::{RecentSyncLengths, SyncStatus};
use super::{
error::MempoolError, storage::Storage, ActiveState, InboundTxDownloads, Mempool, Request,
};
use crate::{
components::sync::{RecentSyncLengths, SyncStatus},
BoxError,
};
mod prop;
mod vector;
@ -48,3 +53,41 @@ impl Mempool {
.expect("unexpected failure when checking for verified transactions");
}
}
/// Helper trait to extract the [`MempoolError`] from a [`BoxError`].
pub trait UnboxMempoolError {
/// Extract and unbox the [`MempoolError`] stored inside `self`.
///
/// # Panics
///
/// If the `boxed_error` is not a boxed [`MempoolError`].
fn unbox_mempool_error(self) -> MempoolError;
}
impl UnboxMempoolError for MempoolError {
fn unbox_mempool_error(self) -> MempoolError {
self
}
}
impl UnboxMempoolError for BoxError {
fn unbox_mempool_error(self) -> MempoolError {
self.downcast::<MempoolError>()
.expect("error is not an expected `MempoolError`")
// TODO: use `Box::into_inner` when it becomes stabilized.
.as_ref()
.clone()
}
}
impl<T, E> UnboxMempoolError for Result<T, E>
where
E: UnboxMempoolError,
{
fn unbox_mempool_error(self) -> MempoolError {
match self {
Ok(_) => panic!("expected a mempool error, but got a success instead"),
Err(error) => error.unbox_mempool_error(),
}
}
}

View File

@ -11,6 +11,7 @@ use zebra_consensus::transaction as tx;
use zebra_state::Config as StateConfig;
use zebra_test::mock_service::{MockService, PanicAssertion};
use super::UnboxMempoolError;
use crate::components::{
mempool::{self, storage::tests::unmined_transactions_in_blocks, *},
sync::RecentSyncLengths,
@ -228,15 +229,12 @@ async fn mempool_queue_single() -> Result<(), Report> {
let mut in_mempool_count = 0;
let mut evicted_count = 0;
for response in queued_responses {
match response {
Ok(_) => panic!("all transactions should have been rejected"),
Err(e) => match e {
MempoolError::StorageEffectsChain(
SameEffectsChainRejectionError::RandomlyEvicted,
) => evicted_count += 1,
MempoolError::InMempool => in_mempool_count += 1,
_ => panic!("transaction should not be rejected with reason {:?}", e),
},
match response.unbox_mempool_error() {
MempoolError::StorageEffectsChain(SameEffectsChainRejectionError::RandomlyEvicted) => {
evicted_count += 1
}
MempoolError::InMempool => in_mempool_count += 1,
error => panic!("transaction should not be rejected with reason {:?}", error),
}
}
assert_eq!(in_mempool_count, transactions.len() - 1);
@ -339,8 +337,16 @@ async fn mempool_service_disabled() -> Result<(), Report> {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert_eq!(queued_responses[0], Err(MempoolError::Disabled));
assert_eq!(
queued_responses
.into_iter()
.next()
.unwrap()
.unbox_mempool_error(),
MempoolError::Disabled
);
Ok(())
}
@ -588,10 +594,12 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
};
assert_eq!(queued_responses.len(), 1);
assert!(matches!(
queued_responses[0],
Err(MempoolError::StorageExactTip(
ExactTipRejectionError::FailedVerification(_)
))
queued_responses
.into_iter()
.next()
.unwrap()
.unbox_mempool_error(),
MempoolError::StorageExactTip(ExactTipRejectionError::FailedVerification(_))
));
Ok(())