Verify inbound PushTransactions (#2727)

* Verify inbound PushTransactions

* Add GossipedTx and refactor downloader to use it

* remove grafana changes

* remove TODOs

* Tidy the transaction fetching in mempool downloader

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
Co-authored-by: Deirdre Connolly <durumcrustulum@gmail.com>
This commit is contained in:
Conrado Gouvea 2021-09-09 10:04:44 -03:00 committed by GitHub
parent a2993e8df0
commit f3ee76f202
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 27 deletions

View File

@ -352,18 +352,20 @@ impl Service<zn::Request> for Inbound {
})
.boxed()
}
zn::Request::PushTransaction(_transaction) => {
debug!("ignoring unimplemented request");
// TODO: send to Tx Download & Verify Stream
// https://github.com/ZcashFoundation/zebra/issues/2692
zn::Request::PushTransaction(transaction) => {
if let Setup::Initialized { tx_downloads, .. } = &mut self.network_setup {
tx_downloads.download_if_needed_and_verify(transaction.into());
} else {
info!(
"ignoring `AdvertiseTransactionIds` request from remote peer during network setup"
);
}
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::AdvertiseTransactionIds(transactions) => {
if let Setup::Initialized { tx_downloads, .. } = &mut self.network_setup {
// TODO: check if we're close to the tip before proceeding?
// what do we do if it's not?
for txid in transactions {
tx_downloads.download_and_verify(txid);
tx_downloads.download_if_needed_and_verify(txid.into());
}
} else {
info!(

View File

@ -16,7 +16,7 @@ use tokio::{sync::oneshot, task::JoinHandle};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::transaction::UnminedTxId;
use zebra_chain::transaction::{UnminedTx, UnminedTxId};
use zebra_consensus::transaction as tx;
use zebra_network as zn;
use zebra_state as zs;
@ -83,6 +83,34 @@ pub enum DownloadAction {
FullQueue,
}
/// A gossiped transaction, which can be the transaction itself or just its ID.
pub enum GossipedTx {
Id(UnminedTxId),
Tx(UnminedTx),
}
impl GossipedTx {
/// Return the [`UnminedTxId`] of a gossiped transaction.
fn id(&self) -> UnminedTxId {
match self {
GossipedTx::Id(txid) => *txid,
GossipedTx::Tx(tx) => tx.id,
}
}
}
impl From<UnminedTxId> for GossipedTx {
fn from(txid: UnminedTxId) -> Self {
GossipedTx::Id(txid)
}
}
impl From<UnminedTx> for GossipedTx {
fn from(tx: UnminedTx) -> Self {
GossipedTx::Tx(tx)
}
}
/// Represents a [`Stream`] of download and verification tasks.
#[pin_project]
#[derive(Debug)]
@ -194,11 +222,13 @@ where
}
}
/// Queue a transaction for download and verification.
/// Queue a transaction for download (if needed) and verification.
///
/// Returns the action taken in response to the queue request.
#[instrument(skip(self, txid), fields(txid = %txid))]
pub fn download_and_verify(&mut self, txid: UnminedTxId) -> DownloadAction {
#[instrument(skip(self, gossiped_tx), fields(txid = %gossiped_tx.id()))]
pub fn download_if_needed_and_verify(&mut self, gossiped_tx: GossipedTx) -> DownloadAction {
let txid = gossiped_tx.id();
if self.cancel_handles.contains_key(&txid) {
tracing::debug!(
?txid,
@ -228,7 +258,7 @@ where
let mut mempool = self.mempool.clone();
let fut = async move {
Self::should_download(&mut state, &mut mempool, txid).await?;
Self::should_download_or_verify(&mut state, &mut mempool, txid).await?;
let height = match state.oneshot(zs::Request::Tip).await {
Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()),
@ -238,19 +268,25 @@ where
}?;
let height = (height + 1).ok_or_else(|| eyre!("no next height"))?;
let tx = if let zn::Response::Transactions(txs) = network
.oneshot(zn::Request::TransactionsById(
std::iter::once(txid).collect(),
))
.await?
{
txs.into_iter()
.next()
.expect("successful response has the transaction in it")
} else {
unreachable!("wrong response to transaction request");
let tx = match gossiped_tx {
GossipedTx::Id(txid) => {
let req = zn::Request::TransactionsById(std::iter::once(txid).collect());
let tx = match network.oneshot(req).await? {
zn::Response::Transactions(mut txs) => txs
.pop()
.expect("successful response has the transaction in it"),
_ => unreachable!("wrong response to transaction request"),
};
metrics::counter!("gossip.downloaded.transaction.count", 1);
tx
}
GossipedTx::Tx(tx) => {
metrics::counter!("gossip.pushed.transaction.count", 1);
tx
}
};
metrics::counter!("gossip.downloaded.transaction.count", 1);
let result = verifier
.oneshot(tx::Request::Mempool {
@ -302,11 +338,12 @@ where
DownloadAction::AddedToQueue
}
/// Check if transaction should be downloaded and verified.
/// Check if transaction should be downloaded and/or verified.
///
/// If it is already in the mempool (or in its rejected list)
/// or in state, then it shouldn't be downloaded (and an error is returned).
async fn should_download(
/// or in state, then it shouldn't be downloaded/verified
/// (and an error is returned).
async fn should_download_or_verify(
state: &mut ZS,
mempool: &mut ZM,
txid: UnminedTxId,