This commit is contained in:
Arya 2024-09-03 19:50:28 -04:00
parent ec2ab5cca9
commit a05d618d8d
3 changed files with 112 additions and 76 deletions

View File

@ -132,7 +132,10 @@ impl ActiveState {
} => {
let mut transactions = Vec::new();
let storage = storage.transactions().map(|tx| tx.clone().into());
let storage = storage
.transactions()
.values()
.map(|tx| tx.transaction.clone().into());
transactions.extend(storage);
let pending = tx_downloads.transaction_requests().cloned();
@ -715,6 +718,7 @@ impl Service<Request> for Mempool {
async move { Ok(Response::Transactions(res)) }.boxed()
}
Request::TransactionsByMinedId(ref ids) => {
trace!(?req, "got mempool request");
@ -732,7 +736,7 @@ impl Service<Request> for Mempool {
Request::FullTransactions => {
trace!(?req, "got mempool request");
let transactions: Vec<_> = storage.full_transactions().cloned().collect();
let transactions: Vec<_> = storage.transactions().values().cloned().collect();
trace!(?req, transactions_count = ?transactions.len(), "answered mempool request");

View File

@ -16,8 +16,9 @@ use std::{
use thiserror::Error;
use zebra_chain::transaction::{
self, Hash, Transaction, UnminedTx, UnminedTxId, VerifiedUnminedTx,
use zebra_chain::{
transaction::{self, Hash, Transaction, UnminedTx, UnminedTxId, VerifiedUnminedTx},
transparent::OutPoint,
};
use zebra_state::PendingUtxos;
@ -117,10 +118,14 @@ pub struct Storage {
/// The set of verified transactions in the mempool.
verified: VerifiedSet,
/// The set of partially verified transactions in the mempool that can be added to the set of verified transactions
/// once a set of UTXOs are available in the mempool if there are no circular dependencies.
partially_verified: Vec<(VerifiedUnminedTx, Vec<OutPoint>)>,
// Pending UTXO Request Tracking
//
/// The set of outpoints with pending requests for their associated transparent::Output.
_pending_utxos: PendingUtxos,
pending_utxos: PendingUtxos,
/// The set of transactions rejected due to bad authorizations, or for other
/// reasons, and their rejection reasons. These rejections only apply to the
@ -169,7 +174,8 @@ impl Storage {
pub(crate) fn new(config: &config::Config) -> Self {
Self {
tx_cost_limit: config.tx_cost_limit,
_pending_utxos: PendingUtxos::default(),
partially_verified: Default::default(),
pending_utxos: PendingUtxos::default(),
eviction_memory_time: config.eviction_memory_time,
verified: Default::default(),
tip_rejected_exact: Default::default(),
@ -221,18 +227,46 @@ impl Storage {
// Then, we try to insert into the pool. If this fails the transaction is rejected.
let mut result = Ok(tx_id);
if let Err(rejection_error) = self.verified.insert(tx) {
tracing::debug!(
?tx_id,
?rejection_error,
stored_transaction_count = ?self.verified.transaction_count(),
"insertion error for transaction",
);
// We could return here, but we still want to check the mempool size
self.reject(tx_id, rejection_error.clone().into());
result = Err(rejection_error.into());
}
let _min_tx_index_in_block = match self.verified.insert(tx) {
Ok(min_tx_index_in_block) => min_tx_index_in_block,
Err(rejection_error) => {
tracing::debug!(
?tx_id,
?rejection_error,
stored_transaction_count = ?self.verified.transaction_count(),
"insertion error for transaction",
);
// We could return here, but we still want to check the mempool size
self.reject(tx_id, rejection_error.clone().into());
result = Err(rejection_error.into());
0
}
};
// What happens if tx1 depends on tx2, which depends on tx3, up to 10
// where they're all submitted at once, all queued to be downloaded and verified,
// and all trigger the transaction verifier to call the mempool with an AwaitUTXO, then
// when the mempool is polled, tx10 is verified because no dependencies, triggering a response
// for tx9's AwaitUtxo channel, but that won't trigger another response for tx8 until
// the mempool's poll_ready() method is called again .. so .. eagerly make the UTXOs available in a `SentHashes` type struct? Like ..
// an `partially_verified` set that still needs its UTXOs to be ready?
//
// TODO: Assign verified unmined transactions with a tentative/minimum tx-in-block index
// based on what unmined UTXOs it spends and any upstream unmined UTXO spends.
//
// # Consensus
//
// > Every non-null prevout MUST point to a unique UTXO in either a preceding block,
// > or a previous transaction in the same block.
//
// <https://zips.z.cash/protocol/protocol.pdf#txnconsensus>
// let new_outputs = tx.transaction.transaction.outputs();
// self.pending_utxos.check_against_ordered(new_outputs);
// TODO: Evict any transactions that depend on the randomly evicted transaction as well.
// Once inserted, we evict transactions over the pool size limit per [ZIP-401];
//
@ -334,6 +368,8 @@ impl Storage {
let duplicate_spend_ids: HashSet<_> = self
.verified
.transactions()
.values()
.map(|tx| &tx.transaction)
.filter_map(|tx| {
(tx.transaction
.spent_outpoints()
@ -414,26 +450,20 @@ impl Storage {
/// Returns the set of [`UnminedTxId`]s in the mempool.
pub fn tx_ids(&self) -> impl Iterator<Item = UnminedTxId> + '_ {
self.verified.transactions().map(|tx| tx.id)
self.transactions()
.values()
.map(|tx| &tx.transaction)
.map(|tx| tx.id)
}
/// Returns an iterator over the [`UnminedTx`]s in the mempool.
//
// TODO: make the transactions() method return VerifiedUnminedTx,
// and remove the full_transactions() method
pub fn transactions(&self) -> impl Iterator<Item = &UnminedTx> {
pub fn transactions(&self) -> &HashMap<UnminedTxId, VerifiedUnminedTx> {
self.verified.transactions()
}
/// Returns an iterator over the [`VerifiedUnminedTx`] in the set.
///
/// Each [`VerifiedUnminedTx`] contains an [`UnminedTx`],
/// and adds extra fields from the transaction verifier result.
#[allow(dead_code)]
pub fn full_transactions(&self) -> impl Iterator<Item = &VerifiedUnminedTx> + '_ {
self.verified.full_transactions()
}
/// Returns the number of transactions in the mempool.
#[allow(dead_code)]
pub fn transaction_count(&self) -> usize {
@ -462,9 +492,9 @@ impl Storage {
&self,
tx_ids: HashSet<UnminedTxId>,
) -> impl Iterator<Item = &UnminedTx> {
self.verified
.transactions()
.filter(move |tx| tx_ids.contains(&tx.id))
tx_ids
.into_iter()
.filter_map(|tx_id| self.transactions().get(&tx_id).map(|tx| &tx.transaction))
}
/// Returns the set of [`UnminedTx`]es with matching [`transaction::Hash`]es
@ -478,7 +508,9 @@ impl Storage {
) -> impl Iterator<Item = &UnminedTx> {
self.verified
.transactions()
.filter(move |tx| tx_ids.contains(&tx.id.mined_id()))
.iter()
.filter(move |(tx_id, _)| tx_ids.contains(&tx_id.mined_id()))
.map(|(_, tx)| &tx.transaction)
}
/// Returns `true` if a transaction exactly matching an [`UnminedTxId`] is in
@ -487,7 +519,7 @@ impl Storage {
/// This matches the exact transaction, with identical blockchain effects,
/// signatures, and proofs.
pub fn contains_transaction_exact(&self, txid: &UnminedTxId) -> bool {
self.verified.transactions().any(|tx| &tx.id == txid)
self.verified.contains(txid)
}
/// Returns the number of rejected [`UnminedTxId`]s or [`transaction::Hash`]es.
@ -618,11 +650,11 @@ impl Storage {
// then extracts the mined ID out of it
let mut unmined_id_set = HashSet::new();
for t in self.transactions() {
if let Some(expiry_height) = t.transaction.expiry_height() {
for (&tx_id, tx) in self.transactions() {
if let Some(expiry_height) = tx.transaction.transaction.expiry_height() {
if tip_height >= expiry_height {
txid_set.insert(t.id.mined_id());
unmined_id_set.insert(t.id);
txid_set.insert(tx_id.mined_id());
unmined_id_set.insert(tx_id);
}
}
}
@ -632,8 +664,8 @@ impl Storage {
.remove_all_that(|tx| txid_set.contains(&tx.transaction.id.mined_id()));
// also reject it
for id in unmined_id_set.iter() {
self.reject(*id, SameEffectsChainRejectionError::Expired.into());
for &id in &unmined_id_set {
self.reject(id, SameEffectsChainRejectionError::Expired.into());
}
unmined_id_set

View File

@ -2,7 +2,7 @@
use std::{
borrow::Cow,
collections::{HashSet, VecDeque},
collections::{HashMap, HashSet},
hash::Hash,
};
@ -30,7 +30,13 @@ use zebra_chain::transaction::MEMPOOL_TRANSACTION_COST_THRESHOLD;
#[derive(Default)]
pub struct VerifiedSet {
/// The set of verified transactions in the mempool.
transactions: VecDeque<VerifiedUnminedTx>,
transactions: HashMap<UnminedTxId, VerifiedUnminedTx>,
/// A map of transaction dependencies, any transaction that spends outputs
/// of another transaction should be here.
///
/// TODO: Improve docs.
_transaction_dependencies: HashMap<UnminedTxId, Vec<UnminedTxId>>,
/// The total size of the transactions in the mempool if they were
/// serialized.
@ -60,20 +66,9 @@ impl Drop for VerifiedSet {
}
impl VerifiedSet {
/// Returns an iterator over the [`UnminedTx`] in the set.
//
// TODO: make the transactions() method return VerifiedUnminedTx,
// and remove the full_transactions() method
pub fn transactions(&self) -> impl Iterator<Item = &UnminedTx> + '_ {
self.transactions.iter().map(|tx| &tx.transaction)
}
/// Returns an iterator over the [`VerifiedUnminedTx`] in the set.
///
/// Each [`VerifiedUnminedTx`] contains an [`UnminedTx`],
/// and adds extra fields from the transaction verifier result.
pub fn full_transactions(&self) -> impl Iterator<Item = &VerifiedUnminedTx> + '_ {
self.transactions.iter()
/// Returns a reference to the [`HashMap`] of [`VerifiedUnminedTx`]s in the set.
pub fn transactions(&self) -> &HashMap<UnminedTxId, VerifiedUnminedTx> {
&self.transactions
}
/// Returns the number of verified transactions in the set.
@ -99,7 +94,7 @@ impl VerifiedSet {
/// Returns `true` if the set of verified transactions contains the transaction with the
/// specified [`UnminedTxId`].
pub fn contains(&self, id: &UnminedTxId) -> bool {
self.transactions.iter().any(|tx| &tx.transaction.id == id)
self.transactions.contains_key(id)
}
/// Clear the set of verified transactions.
@ -118,6 +113,10 @@ impl VerifiedSet {
/// Insert a `transaction` into the set.
///
/// Returns the number of mempool transactions that must be included in a block before this one
/// so that all of this transactions spent outputs are available in a preceding block,
/// or a previous transaction in the same block if successful.
///
/// Returns an error if the `transaction` has spend conflicts with any other transaction
/// already in the set.
///
@ -126,7 +125,7 @@ impl VerifiedSet {
pub fn insert(
&mut self,
transaction: VerifiedUnminedTx,
) -> Result<(), SameEffectsTipRejectionError> {
) -> Result<usize, SameEffectsTipRejectionError> {
if self.has_spend_conflicts(&transaction.transaction) {
return Err(SameEffectsTipRejectionError::SpendConflict);
}
@ -134,11 +133,12 @@ impl VerifiedSet {
self.cache_outputs_from(&transaction.transaction.transaction);
self.transactions_serialized_size += transaction.transaction.size;
self.total_cost += transaction.cost();
self.transactions.push_front(transaction);
self.transactions
.insert(transaction.transaction.id, transaction);
self.update_metrics();
Ok(())
Ok(0)
}
/// Evict one transaction from the set, returns the victim transaction.
@ -168,16 +168,20 @@ impl VerifiedSet {
use rand::distributions::{Distribution, WeightedIndex};
use rand::prelude::thread_rng;
let weights: Vec<u64> = self
let (keys, weights): (Vec<UnminedTxId>, Vec<u64>) = self
.transactions
.iter()
.map(|tx| tx.clone().eviction_weight())
.collect();
.map(|(&tx_id, tx)| (tx_id, tx.eviction_weight()))
.unzip();
let dist = WeightedIndex::new(weights)
.expect("there is at least one weight, all weights are non-negative, and the total is positive");
Some(self.remove(dist.sample(&mut thread_rng())))
let key_to_remove = keys
.get(dist.sample(&mut thread_rng()))
.expect("should have a key at every index in the distribution");
Some(self.remove(key_to_remove))
}
}
@ -190,20 +194,16 @@ impl VerifiedSet {
// iterator borrows `self.transactions` immutably, but it also need to be borrowed mutably
// in order to remove the transactions while traversing the iterator.
#[allow(clippy::needless_collect)]
let indices_to_remove: Vec<_> = self
let keys_to_remove: Vec<_> = self
.transactions
.iter()
.enumerate()
.filter(|(_, tx)| predicate(tx))
.map(|(index, _)| index)
.filter_map(|(&tx_id, tx)| predicate(tx).then_some(tx_id))
.collect();
let removed_count = indices_to_remove.len();
let removed_count = keys_to_remove.len();
// Correctness: remove indexes in reverse order,
// so earlier indexes still correspond to the same transactions
for index_to_remove in indices_to_remove.into_iter().rev() {
self.remove(index_to_remove);
for key_to_remove in keys_to_remove {
self.remove(&key_to_remove);
}
removed_count
@ -212,11 +212,11 @@ impl VerifiedSet {
/// Removes a transaction from the set.
///
/// Also removes its outputs from the internal caches.
fn remove(&mut self, transaction_index: usize) -> VerifiedUnminedTx {
fn remove(&mut self, key_to_remove: &UnminedTxId) -> VerifiedUnminedTx {
let removed_tx = self
.transactions
.remove(transaction_index)
.expect("invalid transaction index");
.remove(key_to_remove)
.expect("invalid transaction key");
self.transactions_serialized_size -= removed_tx.transaction.size;
self.total_cost -= removed_tx.cost();
@ -308,7 +308,7 @@ impl VerifiedSet {
let mut size_with_weight_gt2 = 0;
let mut size_with_weight_gt3 = 0;
for entry in self.full_transactions() {
for entry in self.transactions().values() {
paid_actions += entry.conventional_actions - entry.unpaid_actions;
if entry.fee_weight_ratio > 3.0 {