From a05d618d8d7580c785195cef89c553851e63f87b Mon Sep 17 00:00:00 2001 From: Arya Date: Tue, 3 Sep 2024 19:50:28 -0400 Subject: [PATCH] draft --- zebrad/src/components/mempool.rs | 8 +- zebrad/src/components/mempool/storage.rs | 106 ++++++++++++------ .../mempool/storage/verified_set.rs | 74 ++++++------ 3 files changed, 112 insertions(+), 76 deletions(-) diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 2d9b2b3e0..b7937a2c8 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -132,7 +132,10 @@ impl ActiveState { } => { let mut transactions = Vec::new(); - let storage = storage.transactions().map(|tx| tx.clone().into()); + let storage = storage + .transactions() + .values() + .map(|tx| tx.transaction.clone().into()); transactions.extend(storage); let pending = tx_downloads.transaction_requests().cloned(); @@ -715,6 +718,7 @@ impl Service for Mempool { async move { Ok(Response::Transactions(res)) }.boxed() } + Request::TransactionsByMinedId(ref ids) => { trace!(?req, "got mempool request"); @@ -732,7 +736,7 @@ impl Service for Mempool { Request::FullTransactions => { trace!(?req, "got mempool request"); - let transactions: Vec<_> = storage.full_transactions().cloned().collect(); + let transactions: Vec<_> = storage.transactions().values().cloned().collect(); trace!(?req, transactions_count = ?transactions.len(), "answered mempool request"); diff --git a/zebrad/src/components/mempool/storage.rs b/zebrad/src/components/mempool/storage.rs index cecd4b3f3..8be24220b 100644 --- a/zebrad/src/components/mempool/storage.rs +++ b/zebrad/src/components/mempool/storage.rs @@ -16,8 +16,9 @@ use std::{ use thiserror::Error; -use zebra_chain::transaction::{ - self, Hash, Transaction, UnminedTx, UnminedTxId, VerifiedUnminedTx, +use zebra_chain::{ + transaction::{self, Hash, Transaction, UnminedTx, UnminedTxId, VerifiedUnminedTx}, + transparent::OutPoint, }; use zebra_state::PendingUtxos; @@ -117,10 +118,14 @@ pub struct Storage { /// The set of verified transactions in the mempool. verified: VerifiedSet, + /// The set of partially verified transactions in the mempool that can be added to the set of verified transactions + /// once a set of UTXOs are available in the mempool if there are no circular dependencies. + partially_verified: Vec<(VerifiedUnminedTx, Vec)>, + // Pending UTXO Request Tracking // /// The set of outpoints with pending requests for their associated transparent::Output. - _pending_utxos: PendingUtxos, + pending_utxos: PendingUtxos, /// The set of transactions rejected due to bad authorizations, or for other /// reasons, and their rejection reasons. These rejections only apply to the @@ -169,7 +174,8 @@ impl Storage { pub(crate) fn new(config: &config::Config) -> Self { Self { tx_cost_limit: config.tx_cost_limit, - _pending_utxos: PendingUtxos::default(), + partially_verified: Default::default(), + pending_utxos: PendingUtxos::default(), eviction_memory_time: config.eviction_memory_time, verified: Default::default(), tip_rejected_exact: Default::default(), @@ -221,18 +227,46 @@ impl Storage { // Then, we try to insert into the pool. If this fails the transaction is rejected. let mut result = Ok(tx_id); - if let Err(rejection_error) = self.verified.insert(tx) { - tracing::debug!( - ?tx_id, - ?rejection_error, - stored_transaction_count = ?self.verified.transaction_count(), - "insertion error for transaction", - ); - // We could return here, but we still want to check the mempool size - self.reject(tx_id, rejection_error.clone().into()); - result = Err(rejection_error.into()); - } + let _min_tx_index_in_block = match self.verified.insert(tx) { + Ok(min_tx_index_in_block) => min_tx_index_in_block, + Err(rejection_error) => { + tracing::debug!( + ?tx_id, + ?rejection_error, + stored_transaction_count = ?self.verified.transaction_count(), + "insertion error for transaction", + ); + + // We could return here, but we still want to check the mempool size + self.reject(tx_id, rejection_error.clone().into()); + result = Err(rejection_error.into()); + 0 + } + }; + + // What happens if tx1 depends on tx2, which depends on tx3, up to 10 + // where they're all submitted at once, all queued to be downloaded and verified, + // and all trigger the transaction verifier to call the mempool with an AwaitUTXO, then + // when the mempool is polled, tx10 is verified because no dependencies, triggering a response + // for tx9's AwaitUtxo channel, but that won't trigger another response for tx8 until + // the mempool's poll_ready() method is called again .. so .. eagerly make the UTXOs available in a `SentHashes` type struct? Like .. + // an `partially_verified` set that still needs its UTXOs to be ready? + // + // TODO: Assign verified unmined transactions with a tentative/minimum tx-in-block index + // based on what unmined UTXOs it spends and any upstream unmined UTXO spends. + // + // # Consensus + // + // > Every non-null prevout MUST point to a unique UTXO in either a preceding block, + // > or a previous transaction in the same block. + // + // + + // let new_outputs = tx.transaction.transaction.outputs(); + // self.pending_utxos.check_against_ordered(new_outputs); + + // TODO: Evict any transactions that depend on the randomly evicted transaction as well. // Once inserted, we evict transactions over the pool size limit per [ZIP-401]; // @@ -334,6 +368,8 @@ impl Storage { let duplicate_spend_ids: HashSet<_> = self .verified .transactions() + .values() + .map(|tx| &tx.transaction) .filter_map(|tx| { (tx.transaction .spent_outpoints() @@ -414,26 +450,20 @@ impl Storage { /// Returns the set of [`UnminedTxId`]s in the mempool. pub fn tx_ids(&self) -> impl Iterator + '_ { - self.verified.transactions().map(|tx| tx.id) + self.transactions() + .values() + .map(|tx| &tx.transaction) + .map(|tx| tx.id) } /// Returns an iterator over the [`UnminedTx`]s in the mempool. // // TODO: make the transactions() method return VerifiedUnminedTx, // and remove the full_transactions() method - pub fn transactions(&self) -> impl Iterator { + pub fn transactions(&self) -> &HashMap { self.verified.transactions() } - /// Returns an iterator over the [`VerifiedUnminedTx`] in the set. - /// - /// Each [`VerifiedUnminedTx`] contains an [`UnminedTx`], - /// and adds extra fields from the transaction verifier result. - #[allow(dead_code)] - pub fn full_transactions(&self) -> impl Iterator + '_ { - self.verified.full_transactions() - } - /// Returns the number of transactions in the mempool. #[allow(dead_code)] pub fn transaction_count(&self) -> usize { @@ -462,9 +492,9 @@ impl Storage { &self, tx_ids: HashSet, ) -> impl Iterator { - self.verified - .transactions() - .filter(move |tx| tx_ids.contains(&tx.id)) + tx_ids + .into_iter() + .filter_map(|tx_id| self.transactions().get(&tx_id).map(|tx| &tx.transaction)) } /// Returns the set of [`UnminedTx`]es with matching [`transaction::Hash`]es @@ -478,7 +508,9 @@ impl Storage { ) -> impl Iterator { self.verified .transactions() - .filter(move |tx| tx_ids.contains(&tx.id.mined_id())) + .iter() + .filter(move |(tx_id, _)| tx_ids.contains(&tx_id.mined_id())) + .map(|(_, tx)| &tx.transaction) } /// Returns `true` if a transaction exactly matching an [`UnminedTxId`] is in @@ -487,7 +519,7 @@ impl Storage { /// This matches the exact transaction, with identical blockchain effects, /// signatures, and proofs. pub fn contains_transaction_exact(&self, txid: &UnminedTxId) -> bool { - self.verified.transactions().any(|tx| &tx.id == txid) + self.verified.contains(txid) } /// Returns the number of rejected [`UnminedTxId`]s or [`transaction::Hash`]es. @@ -618,11 +650,11 @@ impl Storage { // then extracts the mined ID out of it let mut unmined_id_set = HashSet::new(); - for t in self.transactions() { - if let Some(expiry_height) = t.transaction.expiry_height() { + for (&tx_id, tx) in self.transactions() { + if let Some(expiry_height) = tx.transaction.transaction.expiry_height() { if tip_height >= expiry_height { - txid_set.insert(t.id.mined_id()); - unmined_id_set.insert(t.id); + txid_set.insert(tx_id.mined_id()); + unmined_id_set.insert(tx_id); } } } @@ -632,8 +664,8 @@ impl Storage { .remove_all_that(|tx| txid_set.contains(&tx.transaction.id.mined_id())); // also reject it - for id in unmined_id_set.iter() { - self.reject(*id, SameEffectsChainRejectionError::Expired.into()); + for &id in &unmined_id_set { + self.reject(id, SameEffectsChainRejectionError::Expired.into()); } unmined_id_set diff --git a/zebrad/src/components/mempool/storage/verified_set.rs b/zebrad/src/components/mempool/storage/verified_set.rs index a9c850b4e..237b9cc67 100644 --- a/zebrad/src/components/mempool/storage/verified_set.rs +++ b/zebrad/src/components/mempool/storage/verified_set.rs @@ -2,7 +2,7 @@ use std::{ borrow::Cow, - collections::{HashSet, VecDeque}, + collections::{HashMap, HashSet}, hash::Hash, }; @@ -30,7 +30,13 @@ use zebra_chain::transaction::MEMPOOL_TRANSACTION_COST_THRESHOLD; #[derive(Default)] pub struct VerifiedSet { /// The set of verified transactions in the mempool. - transactions: VecDeque, + transactions: HashMap, + + /// A map of transaction dependencies, any transaction that spends outputs + /// of another transaction should be here. + /// + /// TODO: Improve docs. + _transaction_dependencies: HashMap>, /// The total size of the transactions in the mempool if they were /// serialized. @@ -60,20 +66,9 @@ impl Drop for VerifiedSet { } impl VerifiedSet { - /// Returns an iterator over the [`UnminedTx`] in the set. - // - // TODO: make the transactions() method return VerifiedUnminedTx, - // and remove the full_transactions() method - pub fn transactions(&self) -> impl Iterator + '_ { - self.transactions.iter().map(|tx| &tx.transaction) - } - - /// Returns an iterator over the [`VerifiedUnminedTx`] in the set. - /// - /// Each [`VerifiedUnminedTx`] contains an [`UnminedTx`], - /// and adds extra fields from the transaction verifier result. - pub fn full_transactions(&self) -> impl Iterator + '_ { - self.transactions.iter() + /// Returns a reference to the [`HashMap`] of [`VerifiedUnminedTx`]s in the set. + pub fn transactions(&self) -> &HashMap { + &self.transactions } /// Returns the number of verified transactions in the set. @@ -99,7 +94,7 @@ impl VerifiedSet { /// Returns `true` if the set of verified transactions contains the transaction with the /// specified [`UnminedTxId`]. pub fn contains(&self, id: &UnminedTxId) -> bool { - self.transactions.iter().any(|tx| &tx.transaction.id == id) + self.transactions.contains_key(id) } /// Clear the set of verified transactions. @@ -118,6 +113,10 @@ impl VerifiedSet { /// Insert a `transaction` into the set. /// + /// Returns the number of mempool transactions that must be included in a block before this one + /// so that all of this transactions spent outputs are available in a preceding block, + /// or a previous transaction in the same block if successful. + /// /// Returns an error if the `transaction` has spend conflicts with any other transaction /// already in the set. /// @@ -126,7 +125,7 @@ impl VerifiedSet { pub fn insert( &mut self, transaction: VerifiedUnminedTx, - ) -> Result<(), SameEffectsTipRejectionError> { + ) -> Result { if self.has_spend_conflicts(&transaction.transaction) { return Err(SameEffectsTipRejectionError::SpendConflict); } @@ -134,11 +133,12 @@ impl VerifiedSet { self.cache_outputs_from(&transaction.transaction.transaction); self.transactions_serialized_size += transaction.transaction.size; self.total_cost += transaction.cost(); - self.transactions.push_front(transaction); + self.transactions + .insert(transaction.transaction.id, transaction); self.update_metrics(); - Ok(()) + Ok(0) } /// Evict one transaction from the set, returns the victim transaction. @@ -168,16 +168,20 @@ impl VerifiedSet { use rand::distributions::{Distribution, WeightedIndex}; use rand::prelude::thread_rng; - let weights: Vec = self + let (keys, weights): (Vec, Vec) = self .transactions .iter() - .map(|tx| tx.clone().eviction_weight()) - .collect(); + .map(|(&tx_id, tx)| (tx_id, tx.eviction_weight())) + .unzip(); let dist = WeightedIndex::new(weights) .expect("there is at least one weight, all weights are non-negative, and the total is positive"); - Some(self.remove(dist.sample(&mut thread_rng()))) + let key_to_remove = keys + .get(dist.sample(&mut thread_rng())) + .expect("should have a key at every index in the distribution"); + + Some(self.remove(key_to_remove)) } } @@ -190,20 +194,16 @@ impl VerifiedSet { // iterator borrows `self.transactions` immutably, but it also need to be borrowed mutably // in order to remove the transactions while traversing the iterator. #[allow(clippy::needless_collect)] - let indices_to_remove: Vec<_> = self + let keys_to_remove: Vec<_> = self .transactions .iter() - .enumerate() - .filter(|(_, tx)| predicate(tx)) - .map(|(index, _)| index) + .filter_map(|(&tx_id, tx)| predicate(tx).then_some(tx_id)) .collect(); - let removed_count = indices_to_remove.len(); + let removed_count = keys_to_remove.len(); - // Correctness: remove indexes in reverse order, - // so earlier indexes still correspond to the same transactions - for index_to_remove in indices_to_remove.into_iter().rev() { - self.remove(index_to_remove); + for key_to_remove in keys_to_remove { + self.remove(&key_to_remove); } removed_count @@ -212,11 +212,11 @@ impl VerifiedSet { /// Removes a transaction from the set. /// /// Also removes its outputs from the internal caches. - fn remove(&mut self, transaction_index: usize) -> VerifiedUnminedTx { + fn remove(&mut self, key_to_remove: &UnminedTxId) -> VerifiedUnminedTx { let removed_tx = self .transactions - .remove(transaction_index) - .expect("invalid transaction index"); + .remove(key_to_remove) + .expect("invalid transaction key"); self.transactions_serialized_size -= removed_tx.transaction.size; self.total_cost -= removed_tx.cost(); @@ -308,7 +308,7 @@ impl VerifiedSet { let mut size_with_weight_gt2 = 0; let mut size_with_weight_gt3 = 0; - for entry in self.full_transactions() { + for entry in self.transactions().values() { paid_actions += entry.conventional_actions - entry.unpaid_actions; if entry.fee_weight_ratio > 3.0 {