Mempool component and storage (#2651)

* First pass at a Mempool Service, incl. a storage layer underneath

* Fixed up Mempool service and storage

* allow dead code where needed

* clippy

* typo

* only drain if the mempool is full

* add a basic storage test

* remove space

* fix test for when MEMPOOL_SIZE change

* group some imports

* add a basic mempool service test

* add clippy suggestions

* remove not needed allow dead code

* Apply suggestions from code review

Co-authored-by: teor <teor@riseup.net>

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Deirdre Connolly 2021-08-27 10:36:17 -04:00 committed by GitHub
parent d2e14b22f9
commit 8d3f6dc026
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 317 additions and 2 deletions

View File

@ -1,8 +1,89 @@
//! Zebra mempool.
/// Mempool-related errors.
pub mod error;
use std::{
collections::HashSet,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures::future::FutureExt;
use tower::Service;
use zebra_chain::{
parameters::Network,
transaction::{UnminedTx, UnminedTxId},
};
use crate::BoxError;
mod crawler;
mod error;
mod storage;
#[cfg(test)]
mod tests;
pub use self::crawler::Crawler;
pub use self::error::MempoolError;
#[derive(Debug)]
#[allow(dead_code)]
pub enum Request {
TransactionIds,
TransactionsById(HashSet<UnminedTxId>),
}
#[derive(Debug)]
pub enum Response {
Transactions(Vec<UnminedTx>),
TransactionIds(Vec<UnminedTxId>),
}
/// Mempool async management and query service.
///
/// The mempool is the set of all verified transactions that this node is aware
/// of that have yet to be confirmed by the Zcash network. A transaction is
/// confirmed when it has been included in a block ('mined').
#[derive(Clone)]
pub struct Mempool {
/// The Mempool storage itself.
///
/// ##: Correctness: only components internal to the [`Mempool`] struct are allowed to
/// inject transactions into `storage`, as transactions must be verified beforehand.
storage: storage::Storage,
}
impl Mempool {
#[allow(dead_code)]
pub(crate) fn new(_network: Network) -> Self {
Mempool {
storage: Default::default(),
}
}
}
impl Service<Request> for Mempool {
type Response = Response;
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
#[instrument(name = "mempool", skip(self, req))]
fn call(&mut self, req: Request) -> Self::Future {
match req {
Request::TransactionIds => {
let res = self.storage.clone().tx_ids();
async move { Ok(Response::TransactionIds(res)) }.boxed()
}
Request::TransactionsById(ids) => {
let rsp = Ok(self.storage.clone().transactions(ids)).map(Response::Transactions);
async move { rsp }.boxed()
}
}
}
}

View File

@ -22,4 +22,7 @@ pub enum MempoolError {
#[error("transaction was not found in mempool")]
NotInMempool,
#[error("transaction evicted from the mempool due to size restrictions")]
Excess,
}

View File

@ -0,0 +1,106 @@
use std::collections::{HashMap, HashSet, VecDeque};
use zebra_chain::{
block,
transaction::{UnminedTx, UnminedTxId},
};
use zebra_consensus::error::TransactionError;
use super::MempoolError;
#[cfg(test)]
pub mod tests;
const MEMPOOL_SIZE: usize = 2;
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub enum State {
/// Rejected because verification failed.
Invalid(TransactionError),
/// An otherwise valid mempool transaction was mined into a block, therefore
/// no longer belongs in the mempool.
Confirmed(block::Hash),
/// Stayed in mempool for too long without being mined.
// TODO(2021-08-20): set expiration at 2 weeks? This is what Bitcoin does.
Expired,
/// Transaction fee is too low for the current mempool state.
LowFee,
/// Otherwise valid transaction removed from mempool, say because of FIFO
/// (first in, first out) policy.
Excess,
}
#[derive(Clone, Default)]
pub struct Storage {
/// The set of verified transactions in the mempool. This is a
/// cache of size [`MEMPOOL_SIZE`].
verified: VecDeque<UnminedTx>,
/// The set of rejected transactions by id, and their rejection reasons.
rejected: HashMap<UnminedTxId, State>,
}
impl Storage {
/// Insert a [`UnminedTx`] into the mempool.
///
/// If its insertion results in evicting other transactions, they will be tracked
/// as [`State::Excess`].
#[allow(dead_code)]
pub fn insert(&mut self, tx: UnminedTx) -> Result<UnminedTxId, MempoolError> {
let tx_id = tx.id;
// First, check if we should reject this transaction.
if self.rejected.contains_key(&tx.id) {
return Err(match self.rejected.get(&tx.id).unwrap() {
State::Invalid(e) => MempoolError::Invalid(e.clone()),
State::Expired => MempoolError::Expired,
State::Confirmed(block_hash) => MempoolError::InBlock(*block_hash),
State::Excess => MempoolError::Excess,
State::LowFee => MempoolError::LowFee,
});
}
// If `tx` is already in the mempool, we don't change anything.
//
// Security: transactions must not get refreshed by new queries,
// because that allows malicious peers to keep transactions live forever.
if self.verified.contains(&tx) {
return Err(MempoolError::InMempool);
}
// Then, we insert into the pool.
self.verified.push_front(tx);
// Once inserted, we evict transactions over the pool size limit in FIFO
// order.
if self.verified.len() > MEMPOOL_SIZE {
for evicted_tx in self.verified.drain(MEMPOOL_SIZE..) {
let _ = self.rejected.insert(evicted_tx.id, State::Excess);
}
assert_eq!(self.verified.len(), MEMPOOL_SIZE);
}
Ok(tx_id)
}
/// Returns `true` if a [`UnminedTx`] matching an [`UnminedTxId`] is in
/// the mempool.
#[allow(dead_code)]
pub fn contains(self, txid: &UnminedTxId) -> bool {
self.verified.iter().any(|tx| &tx.id == txid)
}
/// Returns the set of [`UnminedTxId`]s in the mempool.
pub fn tx_ids(self) -> Vec<UnminedTxId> {
self.verified.iter().map(|tx| tx.id).collect()
}
/// Returns the set of [`Transaction`]s matching ids in the mempool.
pub fn transactions(self, tx_ids: HashSet<UnminedTxId>) -> Vec<UnminedTx> {
self.verified
.into_iter()
.filter(|tx| tx_ids.contains(&tx.id))
.collect()
}
}

View File

@ -0,0 +1,79 @@
use super::*;
use zebra_chain::{
block::Block, parameters::Network, serialization::ZcashDeserializeInto, transaction::UnminedTx,
};
use color_eyre::eyre::Result;
#[test]
fn mempool_storage_basic() -> Result<()> {
zebra_test::init();
mempool_storage_basic_for_network(Network::Mainnet)?;
mempool_storage_basic_for_network(Network::Testnet)?;
Ok(())
}
fn mempool_storage_basic_for_network(network: Network) -> Result<()> {
// Create an empty storage
let mut storage: Storage = Default::default();
// Get transactions from the first 10 blocks of the Zcash blockchain
let (total_transactions, unmined_transactions) = unmined_transactions_in_blocks(10, network);
// Insert them all to the storage
for unmined_transaction in unmined_transactions.clone() {
storage.insert(unmined_transaction)?;
}
// Only MEMPOOL_SIZE should land in verified
assert_eq!(storage.verified.len(), MEMPOOL_SIZE);
// The rest of the transactions will be in rejected
assert_eq!(storage.rejected.len(), total_transactions - MEMPOOL_SIZE);
// Make sure the last MEMPOOL_SIZE transactions we sent are in the verified
for tx in unmined_transactions.iter().rev().take(MEMPOOL_SIZE) {
assert!(storage.clone().contains(&tx.id));
}
// Anything greater should not be in the verified
for tx in unmined_transactions
.iter()
.take(unmined_transactions.len() - MEMPOOL_SIZE)
{
assert!(!storage.clone().contains(&tx.id));
}
Ok(())
}
pub fn unmined_transactions_in_blocks(
last_block_height: u32,
network: Network,
) -> (usize, Vec<UnminedTx>) {
let mut transactions = vec![];
let mut total = 0;
let block_iter = match network {
Network::Mainnet => zebra_test::vectors::MAINNET_BLOCKS.iter(),
Network::Testnet => zebra_test::vectors::TESTNET_BLOCKS.iter(),
};
for (&height, block) in block_iter {
if height <= last_block_height {
let block = block
.zcash_deserialize_into::<Block>()
.expect("block is structurally valid");
for transaction in block.transactions.iter() {
transactions.push(UnminedTx::from(transaction));
total += 1;
}
}
}
(total, transactions)
}

View File

@ -0,0 +1,46 @@
use super::*;
use color_eyre::Report;
use std::collections::HashSet;
use storage::tests::unmined_transactions_in_blocks;
use tower::ServiceExt;
#[tokio::test]
async fn mempool_service_basic() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
// get the genesis block transactions from the Zcash blockchain.
let genesis_transactions = unmined_transactions_in_blocks(0, network);
// Start the mempool service
let mut service = Mempool::new(network);
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage.insert(genesis_transactions.1[0].clone())?;
// Test `Request::TransactionIds`
let response = service
.clone()
.oneshot(Request::TransactionIds)
.await
.unwrap();
let transaction_ids = match response {
Response::TransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};
// Test `Request::TransactionsById`
let hash_set = transaction_ids.iter().copied().collect::<HashSet<_>>();
let response = service
.oneshot(Request::TransactionsById(hash_set))
.await
.unwrap();
let transactions = match response {
Response::Transactions(transactions) => transactions,
_ => unreachable!("will never happen in this test"),
};
// Make sure the transaction from the blockchain test vector is the same as the
// response of `Request::TransactionsById`
assert_eq!(genesis_transactions.1[0], transactions[0]);
Ok(())
}