fix(rpc): Return verification errors from `sendrawtransaction` RPC method (#8788)

* Adds a mempool request to wait for a transaction verification result and uses it in `sendrawtransaction` RPC method

* removes unnecessary clone

* fix clippy warnings

* returns verification errors for all `mempool::Queue` requests, removes `QueueRpc` request variant

* returns oneshot channel in mempool::Response::Queue

* updates a test vector to check for download or verification error in mempool::response::Queued result receiver

* Always require tokio as a dependency in zebra-node-services

* checks for closed channel errors in sendrawtransaction and updates a prop test to check that verification errors are propagated correctly
This commit is contained in:
Arya 2024-08-30 16:09:10 -04:00 committed by GitHub
parent 0ef9987e9e
commit 6b95d271d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 132 additions and 45 deletions

View File

@ -34,7 +34,7 @@ rpc-client = [
"serde_json", "serde_json",
] ]
shielded-scan = ["tokio"] shielded-scan = []
[dependencies] [dependencies]
zebra-chain = { path = "../zebra-chain" , version = "1.0.0-beta.39" } zebra-chain = { path = "../zebra-chain" , version = "1.0.0-beta.39" }
@ -48,7 +48,7 @@ jsonrpc-core = { version = "18.0.0", optional = true }
reqwest = { version = "0.11.26", default-features = false, features = ["rustls-tls"], optional = true } reqwest = { version = "0.11.26", default-features = false, features = ["rustls-tls"], optional = true }
serde = { version = "1.0.204", optional = true } serde = { version = "1.0.204", optional = true }
serde_json = { version = "1.0.122", optional = true } serde_json = { version = "1.0.122", optional = true }
tokio = { version = "1.39.2", features = ["time"], optional = true } tokio = { version = "1.39.2", features = ["time", "sync"] }
[dev-dependencies] [dev-dependencies]

View File

@ -4,6 +4,7 @@
use std::collections::HashSet; use std::collections::HashSet;
use tokio::sync::oneshot;
use zebra_chain::transaction::{self, UnminedTx, UnminedTxId}; use zebra_chain::transaction::{self, UnminedTx, UnminedTxId};
#[cfg(feature = "getblocktemplate-rpcs")] #[cfg(feature = "getblocktemplate-rpcs")]
@ -114,13 +115,11 @@ pub enum Response {
/// Returns matching cached rejected [`UnminedTxId`]s from the mempool, /// Returns matching cached rejected [`UnminedTxId`]s from the mempool,
RejectedTransactionIds(HashSet<UnminedTxId>), RejectedTransactionIds(HashSet<UnminedTxId>),
/// Returns a list of queue results. /// Returns a list of initial queue checks results and a oneshot receiver
/// /// for awaiting download and/or verification results.
/// These are the results of the initial queue checks.
/// The transaction may also fail download or verification later.
/// ///
/// Each result matches the request at the corresponding vector index. /// Each result matches the request at the corresponding vector index.
Queued(Vec<Result<(), BoxError>>), Queued(Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>>),
/// Confirms that the mempool has checked for recently verified transactions. /// Confirms that the mempool has checked for recently verified transactions.
CheckedForVerifiedTransactions, CheckedForVerifiedTransactions,

View File

@ -664,7 +664,7 @@ where
let response = mempool.oneshot(request).await.map_server_error()?; let response = mempool.oneshot(request).await.map_server_error()?;
let queue_results = match response { let mut queue_results = match response {
mempool::Response::Queued(results) => results, mempool::Response::Queued(results) => results,
_ => unreachable!("incorrect response variant from mempool service"), _ => unreachable!("incorrect response variant from mempool service"),
}; };
@ -675,10 +675,17 @@ where
"mempool service returned more results than expected" "mempool service returned more results than expected"
); );
tracing::debug!("sent transaction to mempool: {:?}", &queue_results[0]); let queue_result = queue_results
.pop()
.expect("there should be exactly one item in Vec")
.inspect_err(|err| tracing::debug!("sent transaction to mempool: {:?}", &err))
.map_server_error()?
.await;
queue_results[0] tracing::debug!("sent transaction to mempool: {:?}", &queue_result);
.as_ref()
queue_result
.map_server_error()?
.map(|_| SentTransactionHash(transaction_hash)) .map(|_| SentTransactionHash(transaction_hash))
.map_server_error() .map_server_error()
} }

View File

@ -7,6 +7,7 @@ use hex::ToHex;
use jsonrpc_core::{Error, ErrorCode}; use jsonrpc_core::{Error, ErrorCode};
use proptest::{collection::vec, prelude::*}; use proptest::{collection::vec, prelude::*};
use thiserror::Error; use thiserror::Error;
use tokio::sync::oneshot;
use tower::buffer::Buffer; use tower::buffer::Buffer;
use zebra_chain::{ use zebra_chain::{
@ -61,7 +62,9 @@ proptest! {
let unmined_transaction = UnminedTx::from(transaction); let unmined_transaction = UnminedTx::from(transaction);
let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]); let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]);
let response = mempool::Response::Queued(vec![Ok(())]); let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = mempool::Response::Queued(vec![Ok(rsp_rx)]);
mempool mempool
.expect_request(expected_request) .expect_request(expected_request)
@ -111,10 +114,10 @@ proptest! {
.expect("Transaction serializes successfully"); .expect("Transaction serializes successfully");
let transaction_hex = hex::encode(&transaction_bytes); let transaction_hex = hex::encode(&transaction_bytes);
let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex.clone()));
let unmined_transaction = UnminedTx::from(transaction); let unmined_transaction = UnminedTx::from(transaction);
let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]); let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]);
mempool mempool
.expect_request(expected_request) .expect_request(expected_request)
@ -138,6 +141,32 @@ proptest! {
"Result is not a server error: {result:?}" "Result is not a server error: {result:?}"
); );
let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex));
let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]);
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Err("any verification error".into()));
mempool
.expect_request(expected_request)
.await?
.respond(Ok::<_, BoxError>(mempool::Response::Queued(vec![Ok(rsp_rx)])));
let result = send_task
.await
.expect("Sending raw transactions should not panic");
prop_assert!(
matches!(
result,
Err(Error {
code: ErrorCode::ServerError(_),
..
})
),
"Result is not a server error: {result:?}"
);
// The queue task should continue without errors or panics // The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(rpc_tx_queue_task_result.is_none()); prop_assert!(rpc_tx_queue_task_result.is_none());
@ -897,7 +926,9 @@ proptest! {
// now a retry will be sent to the mempool // now a retry will be sent to the mempool
let expected_request = let expected_request =
mempool::Request::Queue(vec![mempool::Gossip::Tx(tx_unmined.clone())]); mempool::Request::Queue(vec![mempool::Gossip::Tx(tx_unmined.clone())]);
let response = mempool::Response::Queued(vec![Ok(())]); let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = mempool::Response::Queued(vec![Ok(rsp_rx)]);
mempool mempool
.expect_request(expected_request) .expect_request(expected_request)
@ -997,7 +1028,9 @@ proptest! {
for tx in txs.clone() { for tx in txs.clone() {
let expected_request = let expected_request =
mempool::Request::Queue(vec![mempool::Gossip::Tx(UnminedTx::from(tx))]); mempool::Request::Queue(vec![mempool::Gossip::Tx(UnminedTx::from(tx))]);
let response = mempool::Response::Queued(vec![Ok(())]); let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = mempool::Response::Queued(vec![Ok(rsp_rx)]);
mempool mempool
.expect_request(expected_request) .expect_request(expected_request)

View File

@ -5,7 +5,7 @@ use std::{collections::HashSet, env, sync::Arc};
use proptest::prelude::*; use proptest::prelude::*;
use chrono::Duration; use chrono::Duration;
use tokio::time; use tokio::{sync::oneshot, time};
use tower::ServiceExt; use tower::ServiceExt;
use zebra_chain::{ use zebra_chain::{
@ -196,7 +196,9 @@ proptest! {
let request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]); let request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]);
let expected_request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]); let expected_request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]);
let send_task = tokio::spawn(mempool.clone().oneshot(request)); let send_task = tokio::spawn(mempool.clone().oneshot(request));
let response = Response::Queued(vec![Ok(())]); let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = Response::Queued(vec![Ok(rsp_rx)]);
mempool mempool
.expect_request(expected_request) .expect_request(expected_request)
@ -337,7 +339,9 @@ proptest! {
// retry will queue the transaction to mempool // retry will queue the transaction to mempool
let gossip = Gossip::Tx(UnminedTx::from(transaction.clone())); let gossip = Gossip::Tx(UnminedTx::from(transaction.clone()));
let expected_request = Request::Queue(vec![gossip]); let expected_request = Request::Queue(vec![gossip]);
let response = Response::Queued(vec![Ok(())]); let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
let response = Response::Queued(vec![Ok(rsp_rx)]);
mempool mempool
.expect_request(expected_request) .expect_request(expected_request)

View File

@ -27,7 +27,7 @@ use std::{
}; };
use futures::{future::FutureExt, stream::Stream}; use futures::{future::FutureExt, stream::Stream};
use tokio::sync::broadcast; use tokio::sync::{broadcast, oneshot};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service}; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
@ -560,7 +560,7 @@ impl Service<Request> for Mempool {
for tx in tx_retries { for tx in tx_retries {
// This is just an efficiency optimisation, so we don't care if queueing // This is just an efficiency optimisation, so we don't care if queueing
// transaction requests fails. // transaction requests fails.
let _result = tx_downloads.download_if_needed_and_verify(tx); let _result = tx_downloads.download_if_needed_and_verify(tx, None);
} }
} }
@ -608,8 +608,8 @@ impl Service<Request> for Mempool {
tracing::trace!("chain grew during tx verification, retrying ..",); tracing::trace!("chain grew during tx verification, retrying ..",);
// We don't care if re-queueing the transaction request fails. // We don't care if re-queueing the transaction request fails.
let _result = let _result = tx_downloads
tx_downloads.download_if_needed_and_verify(tx.transaction.into()); .download_if_needed_and_verify(tx.transaction.into(), None);
} }
} }
Ok(Err((txid, error))) => { Ok(Err((txid, error))) => {
@ -758,16 +758,24 @@ impl Service<Request> for Mempool {
Request::Queue(gossiped_txs) => { Request::Queue(gossiped_txs) => {
trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request"); trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request");
let rsp: Vec<Result<(), BoxError>> = gossiped_txs let rsp: Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>> =
.into_iter() gossiped_txs
.map(|gossiped_tx| -> Result<(), MempoolError> { .into_iter()
storage.should_download_or_verify(gossiped_tx.id())?; .map(
tx_downloads.download_if_needed_and_verify(gossiped_tx)?; |gossiped_tx| -> Result<
oneshot::Receiver<Result<(), BoxError>>,
MempoolError,
> {
let (rsp_tx, rsp_rx) = oneshot::channel();
storage.should_download_or_verify(gossiped_tx.id())?;
tx_downloads
.download_if_needed_and_verify(gossiped_tx, Some(rsp_tx))?;
Ok(()) Ok(rsp_rx)
}) },
.map(|result| result.map_err(BoxError::from)) )
.collect(); .map(|result| result.map_err(BoxError::from))
.collect();
// We've added transactions to the queue // We've added transactions to the queue
self.update_metrics(); self.update_metrics();

View File

@ -6,7 +6,7 @@ use proptest::{
collection::{hash_set, vec}, collection::{hash_set, vec},
prelude::*, prelude::*,
}; };
use tokio::time; use tokio::{sync::oneshot, time};
use zebra_chain::{ use zebra_chain::{
chain_sync_status::ChainSyncStatus, parameters::Network, transaction::UnminedTxId, chain_sync_status::ChainSyncStatus, parameters::Network, transaction::UnminedTxId,
@ -317,9 +317,17 @@ async fn respond_to_queue_request(
expected_transaction_ids: HashSet<UnminedTxId>, expected_transaction_ids: HashSet<UnminedTxId>,
response: impl IntoIterator<Item = Result<(), MempoolError>>, response: impl IntoIterator<Item = Result<(), MempoolError>>,
) -> Result<(), TestCaseError> { ) -> Result<(), TestCaseError> {
let response = response let response: Vec<Result<oneshot::Receiver<Result<(), BoxError>>, BoxError>> = response
.into_iter() .into_iter()
.map(|result| result.map_err(BoxError::from)) .map(|result| {
result
.map(|_| {
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Ok(()));
rsp_rx
})
.map_err(BoxError::from)
})
.collect(); .collect();
mempool mempool

View File

@ -51,7 +51,7 @@ use zebra_chain::{
use zebra_consensus::transaction as tx; use zebra_consensus::transaction as tx;
use zebra_network as zn; use zebra_network as zn;
use zebra_node_services::mempool::Gossip; use zebra_node_services::mempool::Gossip;
use zebra_state as zs; use zebra_state::{self as zs, CloneError};
use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
@ -105,17 +105,17 @@ pub const MAX_INBOUND_CONCURRENCY: usize = 25;
struct CancelDownloadAndVerify; struct CancelDownloadAndVerify;
/// Errors that can occur while downloading and verifying a transaction. /// Errors that can occur while downloading and verifying a transaction.
#[derive(Error, Debug)] #[derive(Error, Debug, Clone)]
#[allow(dead_code)] #[allow(dead_code)]
pub enum TransactionDownloadVerifyError { pub enum TransactionDownloadVerifyError {
#[error("transaction is already in state")] #[error("transaction is already in state")]
InState, InState,
#[error("error in state service")] #[error("error in state service")]
StateError(#[source] BoxError), StateError(#[source] CloneError),
#[error("error downloading transaction")] #[error("error downloading transaction")]
DownloadFailed(#[source] BoxError), DownloadFailed(#[source] CloneError),
#[error("transaction download / verification was cancelled")] #[error("transaction download / verification was cancelled")]
Cancelled, Cancelled,
@ -243,6 +243,7 @@ where
pub fn download_if_needed_and_verify( pub fn download_if_needed_and_verify(
&mut self, &mut self,
gossiped_tx: Gossip, gossiped_tx: Gossip,
rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
) -> Result<(), MempoolError> { ) -> Result<(), MempoolError> {
let txid = gossiped_tx.id(); let txid = gossiped_tx.id();
@ -295,7 +296,7 @@ where
Ok((Some(height), next_height)) Ok((Some(height), next_height))
} }
Ok(_) => unreachable!("wrong response"), Ok(_) => unreachable!("wrong response"),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e)), Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
}?; }?;
trace!(?txid, ?next_height, "got next height"); trace!(?txid, ?next_height, "got next height");
@ -307,11 +308,12 @@ where
let tx = match network let tx = match network
.oneshot(req) .oneshot(req)
.await .await
.map_err(CloneError::from)
.map_err(TransactionDownloadVerifyError::DownloadFailed)? .map_err(TransactionDownloadVerifyError::DownloadFailed)?
{ {
zn::Response::Transactions(mut txs) => txs.pop().ok_or_else(|| { zn::Response::Transactions(mut txs) => txs.pop().ok_or_else(|| {
TransactionDownloadVerifyError::DownloadFailed( TransactionDownloadVerifyError::DownloadFailed(
"no transactions returned".into(), BoxError::from("no transactions returned").into(),
) )
})?, })?,
_ => unreachable!("wrong response to transaction request"), _ => unreachable!("wrong response to transaction request"),
@ -373,7 +375,7 @@ where
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
// Prefer the cancel handle if both are ready. // Prefer the cancel handle if both are ready.
tokio::select! { let result = tokio::select! {
biased; biased;
_ = &mut cancel_rx => { _ = &mut cancel_rx => {
trace!("task cancelled prior to completion"); trace!("task cancelled prior to completion");
@ -381,7 +383,19 @@ where
Err((TransactionDownloadVerifyError::Cancelled, txid)) Err((TransactionDownloadVerifyError::Cancelled, txid))
} }
verification = fut => verification, verification = fut => verification,
};
// Send the result to responder channel if one was provided.
if let Some(rsp_tx) = rsp_tx {
let _ = rsp_tx.send(
result
.as_ref()
.map(|_| ())
.map_err(|(err, _)| err.clone().into()),
);
} }
result
}); });
self.pending.push(task); self.pending.push(task);
@ -458,6 +472,7 @@ where
match state match state
.ready() .ready()
.await .await
.map_err(CloneError::from)
.map_err(TransactionDownloadVerifyError::StateError)? .map_err(TransactionDownloadVerifyError::StateError)?
.call(zs::Request::Transaction(txid.mined_id())) .call(zs::Request::Transaction(txid.mined_id()))
.await .await
@ -465,7 +480,7 @@ where
Ok(zs::Response::Transaction(None)) => Ok(()), Ok(zs::Response::Transaction(None)) => Ok(()),
Ok(zs::Response::Transaction(Some(_))) => Err(TransactionDownloadVerifyError::InState), Ok(zs::Response::Transaction(Some(_))) => Err(TransactionDownloadVerifyError::InState),
Ok(_) => unreachable!("wrong response"), Ok(_) => unreachable!("wrong response"),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e)), Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
}?; }?;
Ok(()) Ok(())

View File

@ -445,12 +445,17 @@ async fn mempool_cancel_mined() -> Result<(), Report> {
.call(Request::Queue(vec![txid.into()])) .call(Request::Queue(vec![txid.into()]))
.await .await
.unwrap(); .unwrap();
let queued_responses = match response { let mut queued_responses = match response {
Response::Queued(queue_responses) => queue_responses, Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"), _ => unreachable!("will never happen in this test"),
}; };
assert_eq!(queued_responses.len(), 1); assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
let queued_response = queued_responses
.pop()
.expect("already checked that there is exactly 1 item in Vec")
.expect("initial queue checks result should be Ok");
assert_eq!(mempool.tx_downloads().in_flight(), 1); assert_eq!(mempool.tx_downloads().in_flight(), 1);
// Push block 2 to the state // Push block 2 to the state
@ -489,6 +494,14 @@ async fn mempool_cancel_mined() -> Result<(), Report> {
// Check if download was cancelled. // Check if download was cancelled.
assert_eq!(mempool.tx_downloads().in_flight(), 0); assert_eq!(mempool.tx_downloads().in_flight(), 0);
assert!(
queued_response
.await
.expect("channel should not be closed")
.is_err(),
"queued tx should fail to download and verify due to chain tip change"
);
Ok(()) Ok(())
} }