feat(rpc): Implement an RPC transaction queue (#4015)

* Add a rpc queue

* Implement the rpc queue

* Add rpc queue tests

* Remove mutex, use broadcast channel

* Have order and limit in the queue

* fix multiple transactions channel

* Use a network argument

* Use chain tip to calculate block spacing

* Add extra time

* Finalize the state check test

* Add a retry test

* Fix description

* fix some docs

* add additional empty check to `Runner::run`

* remove non used method

* ignore some errors

* fix some docs

* add a panic checker to the queue

* add missing file changes for panic checker

* skip checks and retries if height has not changed

* change constants

* reduce the number of queue test cases

* remove suggestion

* change best tip check

* fix(rpc): Check for panics in the transaction queue (#4046)

* Check for panics in the RPC transaction queue

* Add missing pin! and abort in the start task

* Check for transaction queue panics in tests

* Fixup a new RPC test from the main branch

Co-authored-by: teor <teor@riseup.net>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
Alfredo Garcia 2022-04-12 02:06:29 -03:00 committed by GitHub
parent caac71a9d8
commit d09769714f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1022 additions and 38 deletions

View File

@ -6,6 +6,7 @@
pub mod config;
pub mod methods;
pub mod queue;
pub mod server;
#[cfg(test)]
mod tests;

View File

@ -14,18 +14,22 @@ use hex::{FromHex, ToHex};
use indexmap::IndexMap;
use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use tokio::{sync::broadcast::Sender, task::JoinHandle};
use tower::{buffer::Buffer, Service, ServiceExt};
use tracing::Instrument;
use zebra_chain::{
block::{self, Height, SerializedBlock},
chain_tip::ChainTip,
parameters::{ConsensusBranchId, Network, NetworkUpgrade},
serialization::{SerializationError, ZcashDeserialize},
transaction::{self, SerializedTransaction, Transaction},
transaction::{self, SerializedTransaction, Transaction, UnminedTx},
};
use zebra_network::constants::USER_AGENT;
use zebra_node_services::{mempool, BoxError};
use crate::queue::Queue;
#[cfg(test)]
mod tests;
@ -168,17 +172,23 @@ where
/// The configured network for this RPC service.
#[allow(dead_code)]
network: Network,
/// A sender component of a channel used to send transactions to the queue.
queue_sender: Sender<Option<UnminedTx>>,
}
impl<Mempool, State, Tip> RpcImpl<Mempool, State, Tip>
where
Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError>,
Mempool: Service<mempool::Request, Response = mempool::Response, Error = BoxError> + 'static,
State: Service<
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
>,
Tip: ChainTip + Send + Sync,
zebra_state::ReadRequest,
Response = zebra_state::ReadResponse,
Error = zebra_state::BoxError,
> + Clone
+ Send
+ Sync
+ 'static,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
/// Create a new instance of the RPC handler.
pub fn new<Version>(
@ -187,17 +197,31 @@ where
state: State,
latest_chain_tip: Tip,
network: Network,
) -> Self
) -> (Self, JoinHandle<()>)
where
Version: ToString,
<Mempool as Service<mempool::Request>>::Future: Send,
<State as Service<zebra_state::ReadRequest>>::Future: Send,
{
RpcImpl {
let runner = Queue::start();
let rpc_impl = RpcImpl {
app_version: app_version.to_string(),
mempool,
state,
latest_chain_tip,
mempool: mempool.clone(),
state: state.clone(),
latest_chain_tip: latest_chain_tip.clone(),
network,
}
queue_sender: runner.sender(),
};
// run the process queue
let rpc_tx_queue_task_handle = tokio::spawn(
runner
.run(mempool, state, latest_chain_tip, network)
.in_current_span(),
);
(rpc_impl, rpc_tx_queue_task_handle)
}
}
@ -327,6 +351,7 @@ where
raw_transaction_hex: String,
) -> BoxFuture<Result<SentTransactionHash>> {
let mempool = self.mempool.clone();
let queue_sender = self.queue_sender.clone();
async move {
let raw_transaction_bytes = Vec::from_hex(raw_transaction_hex).map_err(|_| {
@ -337,6 +362,10 @@ where
let transaction_hash = raw_transaction.hash();
// send transaction to the rpc queue, ignore any error.
let unmined_transaction = UnminedTx::from(raw_transaction.clone());
let _ = queue_sender.send(Some(unmined_transaction));
let transaction_parameter = mempool::Gossip::Tx(raw_transaction.into());
let request = mempool::Request::Queue(vec![transaction_parameter]);

View File

@ -2,6 +2,7 @@
use std::collections::HashSet;
use futures::FutureExt;
use hex::ToHex;
use jsonrpc_core::{Error, ErrorCode};
use proptest::prelude::*;
@ -34,7 +35,7 @@ proptest! {
runtime.block_on(async move {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
@ -67,6 +68,10 @@ proptest! {
prop_assert_eq!(result, Ok(hash));
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));
Ok::<_, TestCaseError>(())
})?;
}
@ -82,7 +87,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
@ -122,6 +127,10 @@ proptest! {
"Result is not a server error: {result:?}"
);
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));
Ok::<_, TestCaseError>(())
})?;
}
@ -135,7 +144,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
@ -176,6 +185,10 @@ proptest! {
"Result is not a server error: {result:?}"
);
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));
Ok::<_, TestCaseError>(())
})?;
}
@ -196,7 +209,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
@ -224,6 +237,10 @@ proptest! {
"Result is not an invalid parameters error: {result:?}"
);
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));
Ok::<_, TestCaseError>(())
})?;
}
@ -246,7 +263,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
@ -274,6 +291,10 @@ proptest! {
"Result is not an invalid parameters error: {result:?}"
);
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));
Ok::<_, TestCaseError>(())
})?;
}
@ -295,7 +316,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
@ -323,6 +344,10 @@ proptest! {
prop_assert_eq!(result, Ok(expected_response));
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));
Ok::<_, TestCaseError>(())
})?;
}
@ -343,7 +368,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
@ -371,6 +396,10 @@ proptest! {
"Result is not an invalid parameters error: {result:?}"
);
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));
Ok::<_, TestCaseError>(())
})?;
}
@ -393,7 +422,7 @@ proptest! {
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
@ -420,6 +449,11 @@ proptest! {
),
"Result is not an invalid parameters error: {result:?}"
);
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));
Ok::<_, TestCaseError>(())
})?;
}
@ -431,16 +465,23 @@ proptest! {
let _guard = runtime.enter();
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
// look for an error with a `NoChainTip`
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
NoChainTip,
network,
);
let response = rpc.get_blockchain_info();
prop_assert_eq!(&response.err().unwrap().message, "No Chain tip available yet");
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));
runtime.block_on(async move {
mempool.expect_no_requests().await?;
state.expect_no_requests().await?;
@ -471,7 +512,7 @@ proptest! {
mock_chain_tip_sender.send_best_tip_block_time(block_time);
// Start RPC with the mocked `ChainTip`
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
@ -504,6 +545,10 @@ proptest! {
},
};
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));
// check no requests were made during this test
runtime.block_on(async move {
mempool.expect_no_requests().await?;
@ -512,6 +557,191 @@ proptest! {
Ok::<_, TestCaseError>(())
})?;
}
/// Test the queue functionality using `send_raw_transaction`
#[test]
fn rpc_queue_main_loop(tx in any::<Transaction>())
{
let runtime = zebra_test::init_async();
let _guard = runtime.enter();
let transaction_hash = tx.hash();
runtime.block_on(async move {
tokio::time::pause();
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
NoChainTip,
Mainnet,
);
// send a transaction
let tx_bytes = tx
.zcash_serialize_to_vec()
.expect("Transaction serializes successfully");
let tx_hex = hex::encode(&tx_bytes);
let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex));
let tx_unmined = UnminedTx::from(tx);
let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]);
// fail the mempool insertion
mempool
.expect_request(expected_request)
.await
.unwrap()
.respond(Err(DummyError));
let _ = send_task
.await
.expect("Sending raw transactions should not panic");
// advance enough time to have a new runner iteration
let spacing = chrono::Duration::seconds(150);
tokio::time::advance(spacing.to_std().unwrap()).await;
// the runner will made a new call to TransactionsById
let mut transactions_hash_set = HashSet::new();
transactions_hash_set.insert(tx_unmined.id);
let expected_request = mempool::Request::TransactionsById(transactions_hash_set);
let response = mempool::Response::Transactions(vec![]);
mempool
.expect_request(expected_request)
.await?
.respond(response);
// the runner will also query the state again for the transaction
let expected_request = zebra_state::ReadRequest::Transaction(transaction_hash);
let response = zebra_state::ReadResponse::Transaction(None);
state
.expect_request(expected_request)
.await?
.respond(response);
// now a retry will be sent to the mempool
let expected_request = mempool::Request::Queue(vec![mempool::Gossip::Tx(tx_unmined.clone())]);
let response = mempool::Response::Queued(vec![Ok(())]);
mempool
.expect_request(expected_request)
.await?
.respond(response);
// no more requets are done
mempool.expect_no_requests().await?;
state.expect_no_requests().await?;
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));
Ok::<_, TestCaseError>(())
})?;
}
/// Test we receive all transactions that are sent in a channel
#[test]
fn rpc_queue_receives_all_transactions_from_channel(txs in any::<[Transaction; 2]>())
{
let runtime = zebra_test::init_async();
let _guard = runtime.enter();
runtime.block_on(async move {
tokio::time::pause();
let mut mempool = MockService::build().for_prop_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
NoChainTip,
Mainnet,
);
let mut transactions_hash_set = HashSet::new();
for tx in txs.clone() {
// send a transaction
let tx_bytes = tx
.zcash_serialize_to_vec()
.expect("Transaction serializes successfully");
let tx_hex = hex::encode(&tx_bytes);
let send_task = tokio::spawn(rpc.send_raw_transaction(tx_hex));
let tx_unmined = UnminedTx::from(tx.clone());
let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]);
// inser to hs we will use later
transactions_hash_set.insert(tx_unmined.id);
// fail the mempool insertion
mempool
.clone()
.expect_request(expected_request)
.await
.unwrap()
.respond(Err(DummyError));
let _ = send_task
.await
.expect("Sending raw transactions should not panic");
}
// advance enough time to have a new runner iteration
let spacing = chrono::Duration::seconds(150);
tokio::time::advance(spacing.to_std().unwrap()).await;
// the runner will made a new call to TransactionsById quering with both transactions
let expected_request = mempool::Request::TransactionsById(transactions_hash_set);
let response = mempool::Response::Transactions(vec![]);
mempool
.expect_request(expected_request)
.await?
.respond(response);
// the runner will also query the state again for each transaction
for _tx in txs.clone() {
let response = zebra_state::ReadResponse::Transaction(None);
// we use `expect_request_that` because we can't guarantee the state request order
state
.expect_request_that(|request| matches!(request, zebra_state::ReadRequest::Transaction(_)))
.await?
.respond(response);
}
// each transaction will be retried
for tx in txs.clone() {
let expected_request = mempool::Request::Queue(vec![mempool::Gossip::Tx(UnminedTx::from(tx))]);
let response = mempool::Response::Queued(vec![Ok(())]);
mempool
.expect_request(expected_request)
.await?
.respond(response);
}
// no more requets are done
mempool.expect_no_requests().await?;
state.expect_no_requests().await?;
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
prop_assert!(matches!(rpc_tx_queue_task_result, None));
Ok::<_, TestCaseError>(())
})?;
}
}
#[derive(Clone, Copy, Debug, Error)]

View File

@ -26,7 +26,7 @@ async fn rpc_getinfo() {
let mut mempool: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
@ -46,6 +46,10 @@ async fn rpc_getinfo() {
mempool.expect_no_requests().await;
state.expect_no_requests().await;
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
}
#[tokio::test]
@ -64,7 +68,7 @@ async fn rpc_getblock() {
zebra_state::populated_state(blocks.clone(), Mainnet).await;
// Init RPC
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
read_state,
@ -83,6 +87,10 @@ async fn rpc_getblock() {
}
mempool.expect_no_requests().await;
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
}
#[tokio::test]
@ -93,7 +101,7 @@ async fn rpc_getblock_parse_error() {
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
// Init RPC
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
@ -109,6 +117,10 @@ async fn rpc_getblock_parse_error() {
mempool.expect_no_requests().await;
state.expect_no_requests().await;
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
}
#[tokio::test]
@ -119,7 +131,7 @@ async fn rpc_getblock_missing_error() {
let mut state: MockService<_, _, _, BoxError> = MockService::build().for_unit_tests();
// Init RPC
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
Buffer::new(state.clone(), 1),
@ -157,6 +169,10 @@ async fn rpc_getblock_missing_error() {
mempool.expect_no_requests().await;
state.expect_no_requests().await;
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
}
#[tokio::test]
@ -181,7 +197,7 @@ async fn rpc_getbestblockhash() {
zebra_state::populated_state(blocks.clone(), Mainnet).await;
// Init RPC
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
read_state,
@ -199,6 +215,10 @@ async fn rpc_getbestblockhash() {
assert_eq!(response_hash, tip_block_hash);
mempool.expect_no_requests().await;
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
}
#[tokio::test]
@ -217,7 +237,7 @@ async fn rpc_getrawtransaction() {
zebra_state::populated_state(blocks.clone(), Mainnet).await;
// Init RPC
let rpc = RpcImpl::new(
let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new(
"RPC test",
Buffer::new(mempool.clone(), 1),
read_state,
@ -280,4 +300,8 @@ async fn rpc_getrawtransaction() {
}
}
}
// The queue task should continue without errors or panics
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(matches!(rpc_tx_queue_task_result, None));
}

323
zebra-rpc/src/queue.rs Normal file
View File

@ -0,0 +1,323 @@
//! Transaction Queue.
//!
//! All transactions that are sent from RPC methods should be added to this queue for retries.
//! Transactions can fail to be inserted to the mempool inmediatly by different reasons,
//! like having not mined utxos.
//!
//! The [`Queue`] is just an `IndexMap` of transactions with insertion date.
//! We use this data type because we want the transactions in the queue to be in order.
//! The [`Runner`] component will do the processing in it's [`Runner::run()`] method.
use std::{collections::HashSet, sync::Arc};
use chrono::Duration;
use indexmap::IndexMap;
use tokio::{
sync::broadcast::{channel, Receiver, Sender},
time::Instant,
};
use tower::{Service, ServiceExt};
use zebra_chain::{
block::Height,
chain_tip::ChainTip,
parameters::{Network, NetworkUpgrade},
transaction::{Transaction, UnminedTx, UnminedTxId},
};
use zebra_node_services::{
mempool::{Gossip, Request, Response},
BoxError,
};
use zebra_state::{ReadRequest, ReadResponse};
#[cfg(test)]
mod tests;
/// The approximate target number of blocks a transaction can be in the queue.
const NUMBER_OF_BLOCKS_TO_EXPIRE: i64 = 5;
/// Size of the queue and channel.
const CHANNEL_AND_QUEUE_CAPACITY: usize = 20;
/// The height to use in spacing calculation if we don't have a chain tip.
const NO_CHAIN_TIP_HEIGHT: Height = Height(1);
#[derive(Clone, Debug)]
/// The queue is a container of transactions that are going to be
/// sent to the mempool again.
pub struct Queue {
transactions: IndexMap<UnminedTxId, (Arc<Transaction>, Instant)>,
}
#[derive(Debug)]
/// The runner will make the processing of the transactions in the queue.
pub struct Runner {
queue: Queue,
sender: Sender<Option<UnminedTx>>,
tip_height: Height,
}
impl Queue {
/// Start a new queue
pub fn start() -> Runner {
let (sender, _receiver) = channel(CHANNEL_AND_QUEUE_CAPACITY);
let queue = Queue {
transactions: IndexMap::new(),
};
Runner {
queue,
sender,
tip_height: Height(0),
}
}
/// Get the transactions in the queue.
pub fn transactions(&self) -> IndexMap<UnminedTxId, (Arc<Transaction>, Instant)> {
self.transactions.clone()
}
/// Insert a transaction to the queue.
pub fn insert(&mut self, unmined_tx: UnminedTx) {
self.transactions
.insert(unmined_tx.id, (unmined_tx.transaction, Instant::now()));
// remove if queue is over capacity
if self.transactions.len() > CHANNEL_AND_QUEUE_CAPACITY {
self.remove_first();
}
}
/// Remove a transaction from the queue.
pub fn remove(&mut self, unmined_id: UnminedTxId) {
self.transactions.remove(&unmined_id);
}
/// Remove the oldest transaction from the queue.
pub fn remove_first(&mut self) {
self.transactions.shift_remove_index(0);
}
}
impl Runner {
/// Create a new sender for this runner.
pub fn sender(&self) -> Sender<Option<UnminedTx>> {
self.sender.clone()
}
/// Create a new receiver.
pub fn receiver(&self) -> Receiver<Option<UnminedTx>> {
self.sender.subscribe()
}
/// Get the queue transactions as a `HashSet` of unmined ids.
fn transactions_as_hash_set(&self) -> HashSet<UnminedTxId> {
let transactions = self.queue.transactions();
transactions.iter().map(|t| *t.0).collect()
}
/// Get the queue transactions as a `Vec` of transactions.
fn transactions_as_vec(&self) -> Vec<Arc<Transaction>> {
let transactions = self.queue.transactions();
transactions.iter().map(|t| t.1 .0.clone()).collect()
}
/// Update the `tip_height` field with a new height.
pub fn update_tip_height(&mut self, height: Height) {
self.tip_height = height;
}
/// Retry sending to memempool if needed.
///
/// Creates a loop that will run each time a new block is mined.
/// In this loop, get the transactions that are in the queue and:
/// - Check if they are now in the mempool and if so, delete the transaction from the queue.
/// - Check if the transaction is now part of a block in the state and if so,
/// delete the transaction from the queue.
/// - With the transactions left in the queue, retry sending them to the mempool ignoring
/// the result of this operation.
///
/// Addtionally, each iteration of the above loop, will receive and insert to the queue
/// transactions that are pending in the channel.
pub async fn run<Mempool, State, Tip>(
mut self,
mempool: Mempool,
state: State,
tip: Tip,
network: Network,
) where
Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
State: Service<ReadRequest, Response = ReadResponse, Error = zebra_state::BoxError>
+ Clone
+ Send
+ Sync
+ 'static,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
let mut receiver = self.sender.subscribe();
loop {
// if we don't have a chain use `NO_CHAIN_TIP_HEIGHT` to get block spacing
let tip_height = match tip.best_tip_height() {
Some(height) => height,
_ => NO_CHAIN_TIP_HEIGHT,
};
// get spacing between blocks
let spacing = NetworkUpgrade::target_spacing_for_height(network, tip_height);
// sleep until the next block
tokio::time::sleep(spacing.to_std().expect("should never be less than zero")).await;
// get transactions from the channel
while let Ok(Some(tx)) = receiver.try_recv() {
let _ = &self.queue.insert(tx.clone());
}
// skip some work if stored tip height is the same as the one arriving
// TODO: check tip block hashes instead, so we always retry when there is a chain fork (these are rare)
if tip_height != self.tip_height {
// update the chain tip
self.update_tip_height(tip_height);
if !self.queue.transactions().is_empty() {
// remove what is expired
self.remove_expired(spacing);
// remove if any of the queued transactions is now in the mempool
let in_mempool =
Self::check_mempool(mempool.clone(), self.transactions_as_hash_set()).await;
self.remove_committed(in_mempool);
// remove if any of the queued transactions is now in the state
let in_state =
Self::check_state(state.clone(), self.transactions_as_hash_set()).await;
self.remove_committed(in_state);
// retry what is left in the queue
let _retried = Self::retry(mempool.clone(), self.transactions_as_vec()).await;
}
}
}
}
/// Remove transactions that are expired according to number of blocks and current spacing between blocks.
fn remove_expired(&mut self, spacing: Duration) {
// Have some extra time to to make sure we re-submit each transaction `NUMBER_OF_BLOCKS_TO_EXPIRE`
// times, as the main loop also takes some time to run.
let extra_time = Duration::seconds(5);
let duration_to_expire =
Duration::seconds(NUMBER_OF_BLOCKS_TO_EXPIRE * spacing.num_seconds()) + extra_time;
let transactions = self.queue.transactions();
let now = Instant::now();
for tx in transactions.iter() {
let tx_time =
tx.1 .1
.checked_add(
duration_to_expire
.to_std()
.expect("should never be less than zero"),
)
.expect("this is low numbers, should always be inside bounds");
if now > tx_time {
self.queue.remove(*tx.0);
}
}
}
/// Remove transactions from the queue that had been inserted to the state or the mempool.
fn remove_committed(&mut self, to_remove: HashSet<UnminedTxId>) {
for r in to_remove {
self.queue.remove(r);
}
}
/// Check the mempool for given transactions.
///
/// Returns transactions that are in the mempool.
async fn check_mempool<Mempool>(
mempool: Mempool,
transactions: HashSet<UnminedTxId>,
) -> HashSet<UnminedTxId>
where
Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
{
let mut response = HashSet::new();
if !transactions.is_empty() {
let request = Request::TransactionsById(transactions);
// ignore any error coming from the mempool
let mempool_response = mempool.oneshot(request).await;
if let Ok(Response::Transactions(txs)) = mempool_response {
for tx in txs {
response.insert(tx.id);
}
}
}
response
}
/// Check the state for given transactions.
///
/// Returns transactions that are in the state.
async fn check_state<State>(
state: State,
transactions: HashSet<UnminedTxId>,
) -> HashSet<UnminedTxId>
where
State: Service<ReadRequest, Response = ReadResponse, Error = zebra_state::BoxError>
+ Clone
+ Send
+ Sync
+ 'static,
{
let mut response = HashSet::new();
for t in transactions {
let request = ReadRequest::Transaction(t.mined_id());
// ignore any error coming from the state
let state_response = state.clone().oneshot(request).await;
if let Ok(ReadResponse::Transaction(Some(tx))) = state_response {
response.insert(tx.0.unmined_id());
}
}
response
}
/// Retry sending given transactions to mempool.
///
/// Returns the transaction ids that were retried.
async fn retry<Mempool>(
mempool: Mempool,
transactions: Vec<Arc<Transaction>>,
) -> HashSet<UnminedTxId>
where
Mempool: Service<Request, Response = Response, Error = BoxError> + Clone + 'static,
{
let mut retried = HashSet::new();
for tx in transactions {
let unmined = UnminedTx::from(tx);
let gossip = Gossip::Tx(unmined.clone());
let request = Request::Queue(vec![gossip]);
// Send to memmpool and ignore any error
let _ = mempool.clone().oneshot(request).await;
// retrurn what we retried but don't delete from the queue,
// we might retry again in a next call.
retried.insert(unmined.id);
}
retried
}
}

View File

@ -0,0 +1,3 @@
//! Test code for the RPC queue
mod prop;

View File

@ -0,0 +1,355 @@
//! Randomised property tests for the RPC Queue.
use std::{collections::HashSet, env, sync::Arc};
use proptest::prelude::*;
use chrono::Duration;
use tokio::time;
use tower::ServiceExt;
use zebra_chain::{
block::{Block, Height},
serialization::ZcashDeserializeInto,
transaction::{Transaction, UnminedTx},
};
use zebra_node_services::mempool::{Gossip, Request, Response};
use zebra_state::{BoxError, ReadRequest, ReadResponse};
use zebra_test::mock_service::MockService;
use crate::queue::{Queue, Runner, CHANNEL_AND_QUEUE_CAPACITY};
/// The default number of proptest cases for these tests.
const DEFAULT_BLOCK_VEC_PROPTEST_CASES: u32 = 2;
proptest! {
#![proptest_config(
proptest::test_runner::Config::with_cases(env::var("PROPTEST_CASES")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_BLOCK_VEC_PROPTEST_CASES))
)]
/// Test insert to the queue and remove from it.
#[test]
fn insert_remove_to_from_queue(transaction in any::<UnminedTx>()) {
// create a queue
let mut runner = Queue::start();
// insert transaction
runner.queue.insert(transaction.clone());
// transaction was inserted to queue
let queue_transactions = runner.queue.transactions();
prop_assert_eq!(1, queue_transactions.len());
// remove transaction from the queue
runner.queue.remove(transaction.id);
// transaction was removed from queue
prop_assert_eq!(runner.queue.transactions().len(), 0);
}
/// Test queue never grows above limit.
#[test]
fn queue_size_limit(transactions in any::<[UnminedTx; CHANNEL_AND_QUEUE_CAPACITY + 1]>()) {
// create a queue
let mut runner = Queue::start();
// insert all transactions we have
transactions.iter().for_each(|t| runner.queue.insert(t.clone()));
// transaction queue is never above limit
let queue_transactions = runner.queue.transactions();
prop_assert_eq!(CHANNEL_AND_QUEUE_CAPACITY, queue_transactions.len());
}
/// Test queue order.
#[test]
fn queue_order(transactions in any::<[UnminedTx; 32]>()) {
// create a queue
let mut runner = Queue::start();
// fill the queue and check insertion order
for i in 0..CHANNEL_AND_QUEUE_CAPACITY {
let transaction = transactions[i].clone();
runner.queue.insert(transaction.clone());
let queue_transactions = runner.queue.transactions();
prop_assert_eq!(i + 1, queue_transactions.len());
prop_assert_eq!(UnminedTx::from(queue_transactions[i].0.clone()), transaction);
}
// queue is full
let queue_transactions = runner.queue.transactions();
prop_assert_eq!(CHANNEL_AND_QUEUE_CAPACITY, queue_transactions.len());
// keep adding transaction, new transactions will always be on top of the queue
for transaction in transactions.iter().skip(CHANNEL_AND_QUEUE_CAPACITY) {
runner.queue.insert(transaction.clone());
let queue_transactions = runner.queue.transactions();
prop_assert_eq!(CHANNEL_AND_QUEUE_CAPACITY, queue_transactions.len());
prop_assert_eq!(UnminedTx::from(queue_transactions.last().unwrap().1.0.clone()), transaction.clone());
}
// check the order of the final queue
let queue_transactions = runner.queue.transactions();
for i in 0..CHANNEL_AND_QUEUE_CAPACITY {
let transaction = transactions[(CHANNEL_AND_QUEUE_CAPACITY - 8) + i].clone();
prop_assert_eq!(UnminedTx::from(queue_transactions[i].0.clone()), transaction);
}
}
/// Test transactions are removed from the queue after time elapses.
#[test]
fn remove_expired_transactions_from_queue(transaction in any::<UnminedTx>()) {
let runtime = zebra_test::init_async();
runtime.block_on(async move {
// pause the clock
time::pause();
// create a queue
let mut runner = Queue::start();
// insert a transaction to the queue
runner.queue.insert(transaction);
prop_assert_eq!(runner.queue.transactions().len(), 1);
// have a block interval value equal to the one at Height(1)
let spacing = Duration::seconds(150);
// apply expiration inmediatly, transaction will not be removed from queue
runner.remove_expired(spacing);
prop_assert_eq!(runner.queue.transactions().len(), 1);
// apply expiration after 1 block elapsed, transaction will not be removed from queue
time::advance(spacing.to_std().unwrap()).await;
runner.remove_expired(spacing);
prop_assert_eq!(runner.queue.transactions().len(), 1);
// apply expiration after 2 blocks elapsed, transaction will not be removed from queue
time::advance(spacing.to_std().unwrap()).await;
runner.remove_expired(spacing);
prop_assert_eq!(runner.queue.transactions().len(), 1);
// apply expiration after 3 blocks elapsed, transaction will not be removed from queue
time::advance(spacing.to_std().unwrap()).await;
runner.remove_expired(spacing);
prop_assert_eq!(runner.queue.transactions().len(), 1);
// apply expiration after 4 blocks elapsed, transaction will not be removed from queue
time::advance(spacing.to_std().unwrap()).await;
runner.remove_expired(spacing);
prop_assert_eq!(runner.queue.transactions().len(), 1);
// apply expiration after 5 block elapsed, transaction will not be removed from queue
// as it needs the extra time of 5 seconds
time::advance(spacing.to_std().unwrap()).await;
runner.remove_expired(spacing);
prop_assert_eq!(runner.queue.transactions().len(), 1);
// apply 6 seconds more, transaction will be removed from the queue
time::advance(chrono::Duration::seconds(6).to_std().unwrap()).await;
runner.remove_expired(spacing);
prop_assert_eq!(runner.queue.transactions().len(), 0);
Ok::<_, TestCaseError>(())
})?;
}
/// Test transactions are removed from queue after they get in the mempool
#[test]
fn queue_runner_mempool(transaction in any::<Transaction>()) {
let runtime = zebra_test::init_async();
runtime.block_on(async move {
let mut mempool = MockService::build().for_prop_tests();
// create a queue
let mut runner = Queue::start();
// insert a transaction to the queue
let unmined_transaction = UnminedTx::from(transaction);
runner.queue.insert(unmined_transaction.clone());
let transactions = runner.queue.transactions();
prop_assert_eq!(transactions.len(), 1);
// get a `HashSet` of transactions to call mempool with
let transactions_hash_set = runner.transactions_as_hash_set();
// run the mempool checker
let send_task = tokio::spawn(Runner::check_mempool(mempool.clone(), transactions_hash_set.clone()));
// mempool checker will call the mempool looking for the transaction
let expected_request = Request::TransactionsById(transactions_hash_set.clone());
let response = Response::Transactions(vec![]);
mempool
.expect_request(expected_request)
.await?
.respond(response);
let result = send_task.await.expect("Requesting transactions should not panic");
// empty results, transaction is not in the mempool
prop_assert_eq!(result, HashSet::new());
// insert transaction to the mempool
let request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]);
let expected_request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]);
let send_task = tokio::spawn(mempool.clone().oneshot(request));
let response = Response::Queued(vec![Ok(())]);
mempool
.expect_request(expected_request)
.await?
.respond(response);
let _ = send_task.await.expect("Inserting to mempool should not panic");
// check the mempool again
let send_task = tokio::spawn(Runner::check_mempool(mempool.clone(), transactions_hash_set.clone()));
// mempool checker will call the mempool looking for the transaction
let expected_request = Request::TransactionsById(transactions_hash_set);
let response = Response::Transactions(vec![unmined_transaction]);
mempool
.expect_request(expected_request)
.await?
.respond(response);
let result = send_task.await.expect("Requesting transactions should not panic");
// transaction is in the mempool
prop_assert_eq!(result.len(), 1);
// but it is not deleted from the queue yet
prop_assert_eq!(runner.queue.transactions().len(), 1);
// delete by calling remove_committed
runner.remove_committed(result);
prop_assert_eq!(runner.queue.transactions().len(), 0);
// no more requets expected
mempool.expect_no_requests().await?;
Ok::<_, TestCaseError>(())
})?;
}
/// Test transactions are removed from queue after they get in the state
#[test]
fn queue_runner_state(transaction in any::<Transaction>()) {
let runtime = zebra_test::init_async();
runtime.block_on(async move {
let mut read_state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
let mut write_state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests();
// create a queue
let mut runner = Queue::start();
// insert a transaction to the queue
let unmined_transaction = UnminedTx::from(&transaction);
runner.queue.insert(unmined_transaction.clone());
prop_assert_eq!(runner.queue.transactions().len(), 1);
// get a `HashSet` of transactions to call state with
let transactions_hash_set = runner.transactions_as_hash_set();
let send_task = tokio::spawn(Runner::check_state(read_state.clone(), transactions_hash_set.clone()));
let expected_request = ReadRequest::Transaction(transaction.hash());
let response = ReadResponse::Transaction(None);
read_state
.expect_request(expected_request)
.await?
.respond(response);
let result = send_task.await.expect("Requesting transaction should not panic");
// transaction is not in the state
prop_assert_eq!(HashSet::new(), result);
// get a block and push our transaction to it
let block =
zebra_test::vectors::BLOCK_MAINNET_1_BYTES.zcash_deserialize_into::<Arc<Block>>()?;
let mut block = Arc::try_unwrap(block).expect("block should unwrap");
block.transactions.push(Arc::new(transaction.clone()));
// commit the created block
let request = zebra_state::Request::CommitFinalizedBlock(zebra_state::FinalizedBlock::from(Arc::new(block.clone())));
let send_task = tokio::spawn(write_state.clone().oneshot(request.clone()));
let response = zebra_state::Response::Committed(block.hash());
write_state
.expect_request(request)
.await?
.respond(response);
let _ = send_task.await.expect("Inserting block to state should not panic");
// check the state again
let send_task = tokio::spawn(Runner::check_state(read_state.clone(), transactions_hash_set));
let expected_request = ReadRequest::Transaction(transaction.hash());
let response = ReadResponse::Transaction(Some((Arc::new(transaction), Height(1))));
read_state
.expect_request(expected_request)
.await?
.respond(response);
let result = send_task.await.expect("Requesting transaction should not panic");
// transaction was found in the state
prop_assert_eq!(result.len(), 1);
read_state.expect_no_requests().await?;
write_state.expect_no_requests().await?;
Ok::<_, TestCaseError>(())
})?;
}
// Test any given transaction can be mempool retried.
#[test]
fn queue_mempool_retry(transaction in any::<Transaction>()) {
let runtime = zebra_test::init_async();
runtime.block_on(async move {
let mut mempool = MockService::build().for_prop_tests();
// create a queue
let mut runner = Queue::start();
// insert a transaction to the queue
let unmined_transaction = UnminedTx::from(transaction.clone());
runner.queue.insert(unmined_transaction.clone());
let transactions = runner.queue.transactions();
prop_assert_eq!(transactions.len(), 1);
// get a `Vec` of transactions to do retries
let transactions_vec = runner.transactions_as_vec();
// run retry
let send_task = tokio::spawn(Runner::retry(mempool.clone(), transactions_vec.clone()));
// retry will queue the transaction to mempool
let gossip = Gossip::Tx(UnminedTx::from(transaction.clone()));
let expected_request = Request::Queue(vec![gossip]);
let response = Response::Queued(vec![Ok(())]);
mempool
.expect_request(expected_request)
.await?
.respond(response);
let result = send_task.await.expect("Requesting transactions should not panic");
// retry was done
prop_assert_eq!(result.len(), 1);
Ok::<_, TestCaseError>(())
})?;
}
}

View File

@ -9,6 +9,7 @@
use jsonrpc_core;
use jsonrpc_http_server::ServerBuilder;
use tokio::task::JoinHandle;
use tower::{buffer::Buffer, Service};
use tracing::*;
use tracing_futures::Instrument;
@ -37,7 +38,7 @@ impl RpcServer {
state: State,
latest_chain_tip: Tip,
network: Network,
) -> tokio::task::JoinHandle<()>
) -> (JoinHandle<()>, JoinHandle<()>)
where
Version: ToString,
Mempool: tower::Service<mempool::Request, Response = mempool::Response, Error = BoxError>
@ -52,13 +53,14 @@ impl RpcServer {
+ Sync
+ 'static,
State::Future: Send,
Tip: ChainTip + Send + Sync + 'static,
Tip: ChainTip + Clone + Send + Sync + 'static,
{
if let Some(listen_addr) = config.listen_addr {
info!("Trying to open RPC endpoint at {}...", listen_addr,);
// Initialize the rpc methods with the zebra version
let rpc_impl = RpcImpl::new(app_version, mempool, state, latest_chain_tip, network);
let (rpc_impl, rpc_tx_queue_task_handle) =
RpcImpl::new(app_version, mempool, state, latest_chain_tip, network);
// Create handler compatible with V1 and V2 RPC protocols
let mut io =
@ -87,10 +89,16 @@ impl RpcServer {
})
};
tokio::task::spawn_blocking(server)
(
tokio::task::spawn_blocking(server),
rpc_tx_queue_task_handle,
)
} else {
// There is no RPC port, so the RPC task does nothing.
tokio::task::spawn(futures::future::pending().in_current_span())
// There is no RPC port, so the RPC tasks do nothing.
(
tokio::task::spawn(futures::future::pending().in_current_span()),
tokio::task::spawn(futures::future::pending().in_current_span()),
)
}
}
}

View File

@ -160,7 +160,7 @@ impl StartCmd {
.service(mempool);
// Launch RPC server
let rpc_task_handle = RpcServer::spawn(
let (rpc_task_handle, rpc_tx_queue_task_handle) = RpcServer::spawn(
config.rpc,
app_version(),
mempool.clone(),
@ -183,7 +183,7 @@ impl StartCmd {
let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span());
let mut block_gossip_task_handle = tokio::spawn(
let block_gossip_task_handle = tokio::spawn(
sync::gossip_best_tip_block_hashes(
sync_status.clone(),
chain_tip_change.clone(),
@ -218,7 +218,9 @@ impl StartCmd {
// ongoing tasks
pin!(rpc_task_handle);
pin!(rpc_tx_queue_task_handle);
pin!(syncer_task_handle);
pin!(block_gossip_task_handle);
pin!(mempool_crawler_task_handle);
pin!(mempool_queue_checker_task_handle);
pin!(tx_gossip_task_handle);
@ -240,6 +242,13 @@ impl StartCmd {
Ok(())
}
rpc_tx_queue_result = &mut rpc_tx_queue_task_handle => {
rpc_tx_queue_result
.expect("unexpected panic in the rpc transaction queue task");
info!("rpc transaction queue task exited");
Ok(())
}
sync_result = &mut syncer_task_handle => sync_result
.expect("unexpected panic in the syncer task")
.map(|_| info!("syncer task exited")),
@ -298,12 +307,14 @@ impl StartCmd {
info!("exiting Zebra because an ongoing task exited: stopping other tasks");
// ongoing tasks
rpc_task_handle.abort();
rpc_tx_queue_task_handle.abort();
syncer_task_handle.abort();
block_gossip_task_handle.abort();
mempool_crawler_task_handle.abort();
mempool_queue_checker_task_handle.abort();
tx_gossip_task_handle.abort();
rpc_task_handle.abort();
progress_task_handle.abort();
// startup tasks
groth16_download_handle.abort();