Add transaction downloader and verifier (#2679)

* Add transaction downloader

* Changed mempool downloader to be like inbound

* Verifier working (logs result)

* Apply suggestions from code review

Co-authored-by: teor <teor@riseup.net>

* Apply suggestions from code review

Co-authored-by: teor <teor@riseup.net>

* Fix coinbase check for mempool, improve is_coinbase() docs

* Change other downloads.rs docs to reflect the mempool downloads.rs changes

* Change TIMEOUTs to downloads.rs; add docs

* Renamed is_coinbase() to has_valid_coinbase_transaction_inputs() and contains_coinbase_input() to has_any_coinbase_inputs(); reorder checks

* Validate network upgrade for V4 transactions; check before computing sighash (for V5 too)

* Add block_ prefix to downloads and verifier

* Update zebra-consensus/src/transaction.rs

Co-authored-by: teor <teor@riseup.net>

* Add consensus doc; add more Block prefixes

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Conrado Gouvea 2021-09-01 21:06:20 -03:00 committed by GitHub
parent b6fe816473
commit 1ccb2de7c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 662 additions and 58 deletions

View File

@ -0,0 +1,207 @@
{
"__inputs": [
{
"name": "DS_PROMETHEUS-ZEBRA",
"label": "Prometheus-Zebra",
"description": "",
"type": "datasource",
"pluginId": "prometheus",
"pluginName": "Prometheus"
}
],
"__requires": [
{
"type": "grafana",
"id": "grafana",
"name": "Grafana",
"version": "8.1.2"
},
{
"type": "panel",
"id": "graph",
"name": "Graph (old)",
"version": ""
},
{
"type": "datasource",
"id": "prometheus",
"name": "Prometheus",
"version": "1.0.0"
}
],
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"target": {
"limit": 100,
"matchAny": false,
"tags": [],
"type": "dashboard"
},
"type": "dashboard"
}
]
},
"editable": true,
"gnetId": null,
"graphTooltip": 0,
"id": null,
"iteration": 1630092146360,
"links": [],
"panels": [
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 7,
"w": 24,
"x": 0,
"y": 0
},
"hiddenSeries": false,
"id": 6,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "8.1.2",
"pointradius": 2,
"points": false,
"renderer": "flot",
"repeatDirection": "h",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "rate(gossip_downloaded_transaction_count{job=\"$job\"}[1m]) * 60",
"interval": "",
"legendFormat": "gossip_downloaded_transaction_count per min",
"refId": "C"
},
{
"exemplar": true,
"expr": "rate(gossip_verified_transaction_count{job=\"$job\"}[1m]) * 60",
"interval": "",
"legendFormat": "gossip_verified_transaction_count per min",
"refId": "D"
},
{
"exemplar": true,
"expr": "gossip_queued_transaction_count{job=\"$job\"}",
"interval": "",
"legendFormat": "gossip_queued_transaction_count",
"refId": "E"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Transaction Verifier Gossip Count - $job",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"refresh": "5s",
"schemaVersion": 30,
"style": "dark",
"tags": [],
"templating": {
"list": [
{
"allValue": null,
"current": {},
"datasource": "${DS_PROMETHEUS-ZEBRA}",
"definition": "label_values(zcash_chain_verified_block_height, job)",
"description": null,
"error": null,
"hide": 0,
"includeAll": true,
"label": null,
"multi": true,
"name": "job",
"options": [],
"query": {
"query": "label_values(zcash_chain_verified_block_height, job)",
"refId": "StandardVariableQuery"
},
"refresh": 1,
"regex": "",
"skipUrlSync": false,
"sort": 1,
"tagValuesQuery": "",
"tagsQuery": "",
"type": "query",
"useTags": false
}
]
},
"time": {
"from": "now-30m",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "transaction verification",
"uid": "oBEHvS4nz",
"version": 2
}

View File

@ -376,8 +376,9 @@ impl Transaction {
}
}
/// Returns `true` if this transaction is a coinbase transaction.
pub fn is_coinbase(&self) -> bool {
/// Returns `true` if this transaction has valid inputs for a coinbase
/// transaction, that is, has a single input and it is a coinbase input.
pub fn has_valid_coinbase_transaction_inputs(&self) -> bool {
self.inputs().len() == 1
&& matches!(
self.inputs().get(0),
@ -386,7 +387,7 @@ impl Transaction {
}
/// Returns `true` if transaction contains any coinbase inputs.
pub fn contains_coinbase_input(&self) -> bool {
pub fn has_any_coinbase_inputs(&self) -> bool {
self.inputs()
.iter()
.any(|input| matches!(input, transparent::Input::Coinbase { .. }))

View File

@ -428,7 +428,7 @@ impl Transaction {
&mut self,
outputs: &HashMap<transparent::OutPoint, transparent::Output>,
) -> Result<Amount<NonNegative>, ValueBalanceError> {
if self.is_coinbase() {
if self.has_valid_coinbase_transaction_inputs() {
// TODO: if needed, fixup coinbase:
// - miner subsidy
// - founders reward or funding streams (hopefully not?)

View File

@ -12,7 +12,7 @@
//! unmined transactions. They can be used to handle transactions regardless of version,
//! and get the [`WtxId`] or [`Hash`] when required.
use std::sync::Arc;
use std::{fmt, sync::Arc};
#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
@ -93,6 +93,15 @@ impl From<&WtxId> for UnminedTxId {
}
}
impl fmt::Display for UnminedTxId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Legacy(hash) => hash.fmt(f),
Witnessed(id) => id.fmt(f),
}
}
}
impl UnminedTxId {
/// Create a new `UnminedTxId` using a v1-v4 legacy transaction ID.
///

View File

@ -157,7 +157,7 @@ pub(crate) fn new_transaction_ordered_outputs(
) -> HashMap<transparent::OutPoint, OrderedUtxo> {
let mut new_ordered_outputs = HashMap::new();
let from_coinbase = transaction.is_coinbase();
let from_coinbase = transaction.has_valid_coinbase_transaction_inputs();
for (output_index_in_transaction, output) in transaction.outputs().iter().cloned().enumerate() {
let output_index_in_transaction = output_index_in_transaction
.try_into()

View File

@ -27,10 +27,10 @@ pub fn coinbase_is_first(block: &Block) -> Result<(), BlockError> {
.get(0)
.ok_or(BlockError::NoTransactions)?;
let mut rest = block.transactions.iter().skip(1);
if !first.is_coinbase() {
if !first.has_valid_coinbase_transaction_inputs() {
return Err(TransactionError::CoinbasePosition)?;
}
if rest.any(|tx| tx.contains_coinbase_input()) {
if rest.any(|tx| tx.has_any_coinbase_inputs()) {
return Err(TransactionError::CoinbaseAfterFirst)?;
}

View File

@ -41,6 +41,9 @@ pub enum TransactionError {
#[error("coinbase transaction MUST NOT have the EnableSpendsOrchard flag set")]
CoinbaseHasEnableSpendsOrchard,
#[error("coinbase inputs MUST NOT exist in mempool")]
CoinbaseInMempool,
#[error("coinbase transaction failed subsidy validation")]
Subsidy(#[from] SubsidyError),

View File

@ -47,7 +47,7 @@ mod config;
mod parameters;
mod primitives;
mod script;
mod transaction;
pub mod transaction;
pub mod chain;
#[allow(missing_docs)]

View File

@ -1,3 +1,5 @@
//! Asynchronous verification of transactions.
//!
use std::{
collections::HashMap,
future::Future,
@ -50,6 +52,7 @@ where
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send + 'static,
{
/// Create a new transaction verifier.
pub fn new(network: Network, script_verifier: script::Verifier<ZS>) -> Self {
Self {
network,
@ -165,11 +168,6 @@ where
// TODO: break up each chunk into its own method
fn call(&mut self, req: Request) -> Self::Future {
if req.is_mempool() {
// XXX determine exactly which rules apply to mempool transactions
unimplemented!("Zebra does not yet have a mempool (#2309)");
}
let script_verifier = self.script_verifier.clone();
let network = self.network;
@ -183,7 +181,10 @@ where
// Do basic checks first
check::has_inputs_and_outputs(&tx)?;
if tx.is_coinbase() {
if req.is_mempool() && tx.has_any_coinbase_inputs() {
return Err(TransactionError::CoinbaseInMempool);
}
if tx.has_valid_coinbase_transaction_inputs() {
check::coinbase_tx_no_prevout_joinsplit_spend(&tx)?;
}
@ -278,6 +279,9 @@ where
) -> Result<AsyncChecks, TransactionError> {
let tx = request.transaction();
let upgrade = request.upgrade(network);
Self::verify_v4_transaction_network_upgrade(&tx, upgrade)?;
let shielded_sighash = tx.sighash(upgrade, HashType::ALL, None);
Ok(
@ -298,6 +302,36 @@ where
)
}
/// Verifies if a V4 `transaction` is supported by `network_upgrade`.
fn verify_v4_transaction_network_upgrade(
transaction: &Transaction,
network_upgrade: NetworkUpgrade,
) -> Result<(), TransactionError> {
match network_upgrade {
// Supports V4 transactions
//
// Consensus rules:
// > [Sapling to Canopy inclusive, pre-NU5] The transaction version number MUST be 4, ...
// >
// > [NU5 onward] The transaction version number MUST be 4 or 5.
//
// https://zips.z.cash/protocol/protocol.pdf#txnconsensus
NetworkUpgrade::Sapling
| NetworkUpgrade::Blossom
| NetworkUpgrade::Heartwood
| NetworkUpgrade::Canopy
| NetworkUpgrade::Nu5 => Ok(()),
// Does not support V4 transactions
NetworkUpgrade::Genesis
| NetworkUpgrade::BeforeOverwinter
| NetworkUpgrade::Overwinter => Err(TransactionError::UnsupportedByNetworkUpgrade(
transaction.version(),
network_upgrade,
)),
}
}
/// Verify a V5 transaction.
///
/// Returns a set of asynchronous checks that must all succeed for the transaction to be
@ -327,10 +361,11 @@ where
) -> Result<AsyncChecks, TransactionError> {
let transaction = request.transaction();
let upgrade = request.upgrade(network);
let shielded_sighash = transaction.sighash(upgrade, HashType::ALL, None);
Self::verify_v5_transaction_network_upgrade(&transaction, upgrade)?;
let shielded_sighash = transaction.sighash(upgrade, HashType::ALL, None);
let _async_checks = Self::verify_transparent_inputs_and_outputs(
&request,
network,
@ -363,6 +398,11 @@ where
) -> Result<(), TransactionError> {
match network_upgrade {
// Supports V5 transactions
//
// Consensus rules:
// > [NU5 onward] The transaction version number MUST be 4 or 5.
//
// https://zips.z.cash/protocol/protocol.pdf#txnconsensus
NetworkUpgrade::Nu5 => Ok(()),
// Does not support V5 transactions
@ -389,7 +429,7 @@ where
) -> Result<AsyncChecks, TransactionError> {
let transaction = request.transaction();
if transaction.is_coinbase() {
if transaction.has_valid_coinbase_transaction_inputs() {
// The script verifier only verifies PrevOut inputs and their corresponding UTXOs.
// Coinbase transactions don't have any PrevOut inputs.
Ok(AsyncChecks::new())

View File

@ -51,7 +51,7 @@ pub fn has_inputs_and_outputs(tx: &Transaction) -> Result<(), TransactionError>
///
/// https://zips.z.cash/protocol/protocol.pdf#txnencodingandconsensus
pub fn coinbase_tx_no_prevout_joinsplit_spend(tx: &Transaction) -> Result<(), TransactionError> {
if tx.is_coinbase() {
if tx.has_valid_coinbase_transaction_inputs() {
if tx.contains_prevout_input() {
return Err(TransactionError::CoinbaseHasPrevOutInput);
} else if tx.joinsplit_count() > 0 {

View File

@ -165,7 +165,7 @@ fn v5_coinbase_transaction_without_enable_spends_flag_passes_validation() {
zebra_test::vectors::MAINNET_BLOCKS.iter(),
)
.rev()
.find(|transaction| transaction.is_coinbase())
.find(|transaction| transaction.has_valid_coinbase_transaction_inputs())
.expect("At least one fake V5 coinbase transaction in the test vectors");
insert_fake_orchard_shielded_data(&mut transaction);
@ -180,7 +180,7 @@ fn v5_coinbase_transaction_with_enable_spends_flag_fails_validation() {
zebra_test::vectors::MAINNET_BLOCKS.iter(),
)
.rev()
.find(|transaction| transaction.is_coinbase())
.find(|transaction| transaction.has_valid_coinbase_transaction_inputs())
.expect("At least one fake V5 coinbase transaction in the test vectors");
let shielded_data = insert_fake_orchard_shielded_data(&mut transaction);
@ -702,7 +702,8 @@ fn v4_with_sapling_spends() {
let (height, transaction) = test_transactions(network)
.rev()
.filter(|(_, transaction)| {
!transaction.is_coinbase() && transaction.inputs().is_empty()
!transaction.has_valid_coinbase_transaction_inputs()
&& transaction.inputs().is_empty()
})
.find(|(_, transaction)| transaction.sapling_spends_per_anchor().next().is_some())
.expect("No transaction found with Sapling spends");
@ -739,7 +740,8 @@ fn v4_with_sapling_outputs_and_no_spends() {
let (height, transaction) = test_transactions(network)
.rev()
.filter(|(_, transaction)| {
!transaction.is_coinbase() && transaction.inputs().is_empty()
!transaction.has_valid_coinbase_transaction_inputs()
&& transaction.inputs().is_empty()
})
.find(|(_, transaction)| {
transaction.sapling_spends_per_anchor().next().is_none()
@ -781,7 +783,10 @@ fn v5_with_sapling_spends() {
let transaction =
fake_v5_transactions_for_network(network, zebra_test::vectors::MAINNET_BLOCKS.iter())
.rev()
.filter(|transaction| !transaction.is_coinbase() && transaction.inputs().is_empty())
.filter(|transaction| {
!transaction.has_valid_coinbase_transaction_inputs()
&& transaction.inputs().is_empty()
})
.find(|transaction| transaction.sapling_spends_per_anchor().next().is_some())
.expect("No transaction found with Sapling spends");

View File

@ -227,7 +227,7 @@ pub fn remaining_transaction_value(
) -> Result<(), ValidateContextError> {
for (tx_index_in_block, transaction) in prepared.block.transactions.iter().enumerate() {
// TODO: check coinbase transaction remaining value (#338, #1162)
if transaction.is_coinbase() {
if transaction.has_valid_coinbase_transaction_inputs() {
continue;
}

View File

@ -91,7 +91,7 @@ async fn test_populated_state_responds_correctly(
Ok(Response::Transaction(Some(transaction.clone()))),
));
let from_coinbase = transaction.is_coinbase();
let from_coinbase = transaction.has_valid_coinbase_transaction_inputs();
for (index, output) in transaction.outputs().iter().cloned().enumerate() {
let outpoint = transparent::OutPoint {
hash: transaction_hash,

View File

@ -57,7 +57,7 @@ impl StartCmd {
info!("initializing verifiers");
// TODO: use the transaction verifier to verify mempool transactions (#2637, #2606)
let (chain_verifier, _tx_verifier) = zebra_consensus::chain::init(
let (chain_verifier, tx_verifier) = zebra_consensus::chain::init(
config.consensus.clone(),
config.network.network,
state.clone(),
@ -76,6 +76,7 @@ impl StartCmd {
setup_rx,
state.clone(),
chain_verifier.clone(),
tx_verifier.clone(),
));
let (peer_set, address_book) =

View File

@ -17,19 +17,28 @@ use zebra_network as zn;
use zebra_state as zs;
use zebra_chain::block::{self, Block};
use zebra_consensus::chain::VerifyChainError;
use zebra_consensus::transaction;
use zebra_consensus::{chain::VerifyChainError, error::TransactionError};
use zebra_network::AddressBook;
use super::mempool::downloads::{
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
};
// Re-use the syncer timeouts for consistency.
use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
mod downloads;
use downloads::Downloads;
use downloads::Downloads as BlockDownloads;
type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
type Verifier = Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>;
type InboundDownloads = Downloads<Timeout<Outbound>, Timeout<Verifier>, State>;
type BlockVerifier = Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>;
type TxVerifier = Buffer<
BoxService<transaction::Request, transaction::Response, TransactionError>,
transaction::Request,
>;
type InboundBlockDownloads = BlockDownloads<Timeout<Outbound>, Timeout<BlockVerifier>, State>;
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
pub type NetworkSetupData = (Outbound, Arc<std::sync::Mutex<AddressBook>>);
@ -44,9 +53,13 @@ pub enum Setup {
/// after the network is set up.
network_setup: oneshot::Receiver<NetworkSetupData>,
/// A service that verifies downloaded blocks. Given to `downloads`
/// A service that verifies downloaded blocks. Given to `block_downloads`
/// after the network is set up.
verifier: Verifier,
block_verifier: BlockVerifier,
/// A service that verifies downloaded transactions. Given to `tx_downloads`
/// after the network is set up.
tx_verifier: TxVerifier,
},
/// Network setup is complete.
@ -57,7 +70,9 @@ pub enum Setup {
address_book: Arc<std::sync::Mutex<zn::AddressBook>>,
/// A `futures::Stream` that downloads and verifies gossiped blocks.
downloads: Pin<Box<InboundDownloads>>,
block_downloads: Pin<Box<InboundBlockDownloads>>,
tx_downloads: Pin<Box<InboundTxDownloads>>,
},
/// Temporary state used in the service's internal network initialization
@ -117,12 +132,14 @@ impl Inbound {
pub fn new(
network_setup: oneshot::Receiver<NetworkSetupData>,
state: State,
verifier: Verifier,
block_verifier: BlockVerifier,
tx_verifier: TxVerifier,
) -> Self {
Self {
network_setup: Setup::AwaitingNetwork {
network_setup,
verifier,
block_verifier,
tx_verifier,
},
state,
}
@ -154,18 +171,25 @@ impl Service<zn::Request> for Inbound {
self.network_setup = match self.take_setup() {
Setup::AwaitingNetwork {
mut network_setup,
verifier,
block_verifier,
tx_verifier,
} => match network_setup.try_recv() {
Ok((outbound, address_book)) => {
let downloads = Box::pin(Downloads::new(
Timeout::new(outbound, BLOCK_DOWNLOAD_TIMEOUT),
Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT),
let block_downloads = Box::pin(BlockDownloads::new(
Timeout::new(outbound.clone(), BLOCK_DOWNLOAD_TIMEOUT),
Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT),
self.state.clone(),
));
let tx_downloads = Box::pin(TxDownloads::new(
Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT),
Timeout::new(tx_verifier, TRANSACTION_VERIFY_TIMEOUT),
self.state.clone(),
));
result = Ok(());
Setup::Initialized {
address_book,
downloads,
block_downloads,
tx_downloads,
}
}
Err(TryRecvError::Empty) => {
@ -173,7 +197,8 @@ impl Service<zn::Request> for Inbound {
result = Ok(());
Setup::AwaitingNetwork {
network_setup,
verifier,
block_verifier,
tx_verifier,
}
}
Err(error @ TryRecvError::Closed) => {
@ -194,14 +219,17 @@ impl Service<zn::Request> for Inbound {
// Clean up completed download tasks, ignoring their results
Setup::Initialized {
address_book,
mut downloads,
mut block_downloads,
mut tx_downloads,
} => {
while let Poll::Ready(Some(_)) = downloads.as_mut().poll_next(cx) {}
while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {}
while let Poll::Ready(Some(_)) = tx_downloads.as_mut().poll_next(cx) {}
result = Ok(());
Setup::Initialized {
address_book,
downloads,
block_downloads,
tx_downloads,
}
}
};
@ -312,15 +340,29 @@ impl Service<zn::Request> for Inbound {
}
zn::Request::PushTransaction(_transaction) => {
debug!("ignoring unimplemented request");
// TODO: send to Tx Download & Verify Stream
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::AdvertiseTransactionIds(_transactions) => {
debug!("ignoring unimplemented request");
zn::Request::AdvertiseTransactionIds(transactions) => {
if let Setup::Initialized { tx_downloads, .. } = &mut self.network_setup {
// TODO: check if we're close to the tip before proceeding?
// what do we do if it's not?
for txid in transactions {
tx_downloads.download_and_verify(txid);
}
} else {
info!(
"ignoring `AdvertiseTransactionIds` request from remote peer during network setup"
);
}
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::AdvertiseBlock(hash) => {
if let Setup::Initialized { downloads, .. } = &mut self.network_setup {
downloads.download_and_verify(hash);
if let Setup::Initialized {
block_downloads, ..
} = &mut self.network_setup
{
block_downloads.download_and_verify(hash);
} else {
info!(
?hash,

View File

@ -32,10 +32,8 @@ type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// attacks.
///
/// The maximum block size is 2 million bytes. A deserialized malicious
/// block with ~225_000 transparent outputs can take up 9MB of RAM. As of
/// February 2021, a growing `Vec` can allocate up to 2x its current length,
/// leading to an overall memory usage of 18MB per malicious block. (See
/// #1880 for more details.)
/// block with ~225_000 transparent outputs can take up 9MB of RAM.
/// (See #1880 for more details.)
///
/// Malicious blocks will eventually timeout or fail contextual validation.
/// Once validation fails, the block is dropped, and its memory is deallocated.
@ -116,8 +114,7 @@ where
// If no download and verify tasks have exited since the last poll, this
// task is scheduled for wakeup when the next task becomes ready.
//
// TODO:
// This would be cleaner with poll_map #63514, but that's nightly only.
// TODO: this would be cleaner with poll_map (#2693)
if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
match join_result.expect("block download and verify tasks must not panic") {
Ok(hash) => {
@ -245,7 +242,6 @@ where
});
self.pending.push(task);
// XXX replace with expect_none when stable
assert!(
self.cancel_handles.insert(hash, cancel_tx).is_none(),
"blocks are only queued once"

View File

@ -18,6 +18,7 @@ use zebra_chain::{
use crate::BoxError;
mod crawler;
pub mod downloads;
mod error;
mod storage;

View File

@ -0,0 +1,301 @@
use std::{
collections::HashMap,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use color_eyre::eyre::eyre;
use futures::{
future::TryFutureExt,
ready,
stream::{FuturesUnordered, Stream},
};
use pin_project::pin_project;
use tokio::{sync::oneshot, task::JoinHandle};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::transaction::UnminedTxId;
use zebra_consensus::transaction as tx;
use zebra_network as zn;
use zebra_state as zs;
use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Controls how long we wait for a transaction download request to complete.
///
/// This is currently equal to [`crate::components::sync::BLOCK_DOWNLOAD_TIMEOUT`] for
/// consistency, even though parts of the rationale used for defining the value
/// don't apply here (e.g. we can drop transactions hashes when the queue is full).
pub(crate) const TRANSACTION_DOWNLOAD_TIMEOUT: Duration = BLOCK_DOWNLOAD_TIMEOUT;
/// Controls how long we wait for a transaction verify request to complete.
///
/// This is currently equal to [`crate::components::sync::BLOCK_VERIFY_TIMEOUT`] for
/// consistency.
///
/// This timeout may lead to denial of service, which will be handled in
/// https://github.com/ZcashFoundation/zebra/issues/2694
pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT;
/// The maximum number of concurrent inbound download and verify tasks.
///
/// We expect the mempool crawler to download and verify most mempool transactions, so this bound
/// can be small.
///
/// ## Security
///
/// We use a small concurrency limit, to prevent memory denial-of-service
/// attacks.
///
/// The maximum transaction size is 2 million bytes. A deserialized malicious
/// transaction with ~225_000 transparent outputs can take up 9MB of RAM.
/// (See #1880 for more details.)
///
/// Malicious transactions will eventually timeout or fail validation.
/// Once validation fails, the transaction is dropped, and its memory is deallocated.
///
/// Since Zebra keeps an `inv` index, inbound downloads for malicious transactions
/// will be directed to the malicious node that originally gossiped the hash.
/// Therefore, this attack can be carried out by a single malicious node.
const MAX_INBOUND_CONCURRENCY: usize = 10;
/// The action taken in response to a peer's gossiped transaction hash.
pub enum DownloadAction {
/// The transaction hash was successfully queued for download and verification.
AddedToQueue,
/// The transaction hash is already queued, so this request was ignored.
///
/// Another peer has already gossiped the same hash to us, or the mempool crawler has fetched it.
AlreadyQueued,
/// The queue is at capacity, so this request was ignored.
///
/// The mempool crawler should discover this transaction later.
/// If it is mined into a block, it will be downloaded by the syncer, or the inbound block downloader.
///
/// The queue's capacity is [`MAX_INBOUND_CONCURRENCY`].
FullQueue,
}
/// Represents a [`Stream`] of download and verification tasks.
#[pin_project]
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZS>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
// Services
/// A service that forwards requests to connected peers, and returns their
/// responses.
network: ZN,
/// A service that verifies downloaded transactions.
verifier: ZV,
/// A service that manages cached blockchain state.
state: ZS,
// Internal downloads state
/// A list of pending transaction download and verify tasks.
#[pin]
pending: FuturesUnordered<JoinHandle<Result<UnminedTxId, (BoxError, UnminedTxId)>>>,
/// A list of channels that can be used to cancel pending transaction download and
/// verify tasks.
cancel_handles: HashMap<UnminedTxId, oneshot::Sender<()>>,
}
impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
type Item = Result<UnminedTxId, BoxError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
// CORRECTNESS
//
// The current task must be scheduled for wakeup every time we return
// `Poll::Pending`.
//
// If no download and verify tasks have exited since the last poll, this
// task is scheduled for wakeup when the next task becomes ready.
//
// TODO: this would be cleaner with poll_map (#2693)
if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
match join_result.expect("transaction download and verify tasks must not panic") {
Ok(hash) => {
this.cancel_handles.remove(&hash);
Poll::Ready(Some(Ok(hash)))
}
Err((e, hash)) => {
this.cancel_handles.remove(&hash);
Poll::Ready(Some(Err(e)))
}
}
} else {
Poll::Ready(None)
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.pending.size_hint()
}
}
impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
/// Initialize a new download stream with the provided `network` and
/// `verifier` services.
///
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
/// timeout limits should be applied to the `network` service passed into
/// this constructor.
pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self {
Self {
network,
verifier,
state,
pending: FuturesUnordered::new(),
cancel_handles: HashMap::new(),
}
}
/// Queue a transaction for download and verification.
///
/// Returns the action taken in response to the queue request.
#[instrument(skip(self, txid), fields(txid = %txid))]
pub fn download_and_verify(&mut self, txid: UnminedTxId) -> DownloadAction {
if self.cancel_handles.contains_key(&txid) {
tracing::debug!(
?txid,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"transaction id already queued for inbound download: ignored transaction"
);
return DownloadAction::AlreadyQueued;
}
if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
tracing::info!(
?txid,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"too many transactions queued for inbound download: ignored transaction"
);
return DownloadAction::FullQueue;
}
// This oneshot is used to signal cancellation to the download task.
let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();
let network = self.network.clone();
let verifier = self.verifier.clone();
let state = self.state.clone();
let fut = async move {
// TODO: adapt this for transaction / mempool
// // Check if the block is already in the state.
// // BUG: check if the hash is in any chain (#862).
// // Depth only checks the main chain.
// match state.oneshot(zs::Request::Depth(hash)).await {
// Ok(zs::Response::Depth(None)) => Ok(()),
// Ok(zs::Response::Depth(Some(_))) => Err("already present".into()),
// Ok(_) => unreachable!("wrong response"),
// Err(e) => Err(e),
// }?;
let height = match state.oneshot(zs::Request::Tip).await {
Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()),
Ok(zs::Response::Tip(Some((height, _hash)))) => Ok(height),
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(e),
}?;
let height = (height + 1).ok_or_else(|| eyre!("no next height"))?;
let tx = if let zn::Response::Transactions(txs) = network
.oneshot(zn::Request::TransactionsById(
std::iter::once(txid).collect(),
))
.await?
{
txs.into_iter()
.next()
.expect("successful response has the transaction in it")
} else {
unreachable!("wrong response to transaction request");
};
metrics::counter!("gossip.downloaded.transaction.count", 1);
let result = verifier
.oneshot(tx::Request::Mempool {
transaction: tx,
height,
})
.await;
tracing::debug!(?txid, ?result, "verified transaction for the mempool");
result
}
.map_ok(|hash| {
metrics::counter!("gossip.verified.transaction.count", 1);
hash
})
// Tack the hash onto the error so we can remove the cancel handle
// on failure as well as on success.
.map_err(move |e| (e, txid))
.in_current_span();
let task = tokio::spawn(async move {
// TODO: if the verifier and cancel are both ready, which should we
// prefer? (Currently, select! chooses one at random.)
tokio::select! {
_ = &mut cancel_rx => {
tracing::trace!("task cancelled prior to completion");
metrics::counter!("gossip.cancelled.count", 1);
Err(("canceled".into(), txid))
}
verification = fut => verification,
}
});
self.pending.push(task);
assert!(
self.cancel_handles.insert(txid, cancel_tx).is_none(),
"transactions are only queued once"
);
tracing::debug!(
?txid,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"queued transaction hash for download"
);
metrics::gauge!("gossip.queued.transaction.count", self.pending.len() as _);
DownloadAction::AddedToQueue
}
}

View File

@ -80,8 +80,7 @@ where
// If no download and verify tasks have exited since the last poll, this
// task is scheduled for wakeup when the next task becomes ready.
//
// TODO:
// This would be cleaner with poll_map #63514, but that's nightly only.
// TODO: this would be cleaner with poll_map (#2693)
if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
match join_result.expect("block download and verify tasks must not panic") {
Ok(hash) => {
@ -203,7 +202,6 @@ where
);
self.pending.push(task);
// XXX replace with expect_none when stable
assert!(
self.cancel_handles.insert(hash, cancel_tx).is_none(),
"blocks are only queued once"