Merge pull request #36 from paritytech/dp/chore/add-transaction-pool

Add transaction-pool
This commit is contained in:
David 2018-08-23 14:56:38 +02:00 committed by GitHub
commit ec68e03c50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 2420 additions and 0 deletions

View File

@ -13,6 +13,8 @@ members = [
"patricia_trie",
"plain_hasher",
"rlp",
"transaction-pool",
"trace-time",
"trie-standardmap",
"triehash",
"uint"

9
trace-time/Cargo.toml Normal file
View File

@ -0,0 +1,9 @@
[package]
name = "trace-time"
description = "Easily trace time to execute a scope."
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "GPL-3.0"
[dependencies]
log = "0.4"

55
trace-time/src/lib.rs Normal file
View File

@ -0,0 +1,55 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Performance timer with logging
#[macro_use]
extern crate log;
use std::time::Instant;
#[macro_export]
macro_rules! trace_time {
($name: expr) => {
let _timer = $crate::PerfTimer::new($name);
}
}
/// Performance timer with logging. Starts measuring time in the constructor, prints
/// elapsed time in the destructor or when `stop` is called.
pub struct PerfTimer {
name: &'static str,
start: Instant,
}
impl PerfTimer {
/// Create an instance with given name.
pub fn new(name: &'static str) -> PerfTimer {
PerfTimer {
name,
start: Instant::now(),
}
}
}
impl Drop for PerfTimer {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
let ms = elapsed.subsec_nanos() as f32 / 1_000_000.0 +
elapsed.as_secs() as f32 * 1_000.0;
trace!(target: "perf", "{}: {:.2}ms", self.name, ms);
}
}

View File

@ -0,0 +1,15 @@
[package]
description = "Generic transaction pool."
name = "transaction-pool"
version = "1.13.2"
license = "GPL-3.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
error-chain = "0.12"
log = "0.4"
smallvec = "0.4"
trace-time = { path = "../trace-time", version = "0.1" }
[dev-dependencies]
ethereum-types = "0.4"

View File

@ -0,0 +1,53 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
/// Error chain doesn't let us have generic types.
/// So the hashes are converted to debug strings for easy display.
type Hash = String;
error_chain! {
errors {
/// Transaction is already imported
AlreadyImported(hash: Hash) {
description("transaction is already in the pool"),
display("[{}] already imported", hash)
}
/// Transaction is too cheap to enter the queue
TooCheapToEnter(hash: Hash, min_score: String) {
description("the pool is full and transaction is too cheap to replace any transaction"),
display("[{}] too cheap to enter the pool. Min score: {}", hash, min_score)
}
/// Transaction is too cheap to replace existing transaction that occupies the same slot.
TooCheapToReplace(old_hash: Hash, hash: Hash) {
description("transaction is too cheap to replace existing transaction in the pool"),
display("[{}] too cheap to replace: {}", hash, old_hash)
}
}
}
#[cfg(test)]
impl PartialEq for ErrorKind {
fn eq(&self, other: &Self) -> bool {
use self::ErrorKind::*;
match (self, other) {
(&AlreadyImported(ref h1), &AlreadyImported(ref h2)) => h1 == h2,
(&TooCheapToEnter(ref h1, ref s1), &TooCheapToEnter(ref h2, ref s2)) => h1 == h2 && s1 == s2,
(&TooCheapToReplace(ref old1, ref new1), &TooCheapToReplace(ref old2, ref new2)) => old1 == old2 && new1 == new2,
_ => false,
}
}
}

124
transaction-pool/src/lib.rs Normal file
View File

@ -0,0 +1,124 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Generic Transaction Pool
//!
//! An extensible and performant implementation of Ethereum Transaction Pool.
//! The pool stores ordered, verified transactions according to some pluggable
//! `Scoring` implementation.
//! The pool also allows you to construct a set of `pending` transactions according
//! to some notion of `Readiness` (pluggable).
//!
//! The pool is generic over transactions and should make no assumptions about them.
//! The only thing we can rely on is the `Scoring` that defines:
//! - the ordering of transactions from a single sender
//! - the priority of the transaction compared to other transactions from different senders
//!
//! NOTE: the transactions from a single sender are not ordered by priority,
//! but still when constructing pending set we always need to maintain the ordering
//! (i.e. `txs[1]` always needs to be included after `txs[0]` even if it has higher priority)
//!
//! ### Design Details
//!
//! Performance assumptions:
//! - Possibility to handle tens of thousands of transactions
//! - Fast insertions and replacements `O(per-sender + log(senders))`
//! - Reasonably fast removal of stalled transactions `O(per-sender)`
//! - Reasonably fast construction of pending set `O(txs * (log(senders) + log(per-sender))`
//!
//! The removal performance could be improved by trading some memory. Currently `SmallVec` is used
//! to store senders transactions, instead we could use `VecDeque` and efficiently `pop_front`
//! the best transactions.
//!
//! The pending set construction and insertion complexity could be reduced by introducing
//! a notion of `nonce` - an absolute, numeric ordering of transactions.
//! We don't do that because of possible implications of EIP208 where nonce might not be
//! explicitly available.
//!
//! 1. The pool groups transactions from particular sender together
//! and stores them ordered by `Scoring` within that group
//! i.e. `HashMap<Sender, Vec<Transaction>>`.
//! 2. Additionaly we maintain the best and the worst transaction from each sender
//! (by `Scoring` not `priority`) ordered by `priority`.
//! It means that we can easily identify the best transaction inside the entire pool
//! and the worst transaction.
//! 3. Whenever new transaction is inserted to the queue:
//! - first check all the limits (overall, memory, per-sender)
//! - retrieve all transactions from a sender
//! - binary search for position to insert the transaction
//! - decide if we are replacing existing transaction (3 outcomes: drop, replace, insert)
//! - update best and worst transaction from that sender if affected
//! 4. Pending List construction:
//! - Take the best transaction (by priority) from all senders to the List
//! - Replace the transaction with next transaction (by ordering) from that sender (if any)
//! - Repeat
#![warn(missing_docs)]
extern crate smallvec;
extern crate trace_time;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;
#[cfg(test)]
extern crate ethereum_types;
#[cfg(test)]
mod tests;
mod error;
mod listener;
mod options;
mod pool;
mod ready;
mod status;
mod transactions;
mod verifier;
pub mod scoring;
pub use self::error::{Error, ErrorKind};
pub use self::listener::{Listener, NoopListener};
pub use self::options::Options;
pub use self::pool::{Pool, PendingIterator, UnorderedIterator, Transaction};
pub use self::ready::{Ready, Readiness};
pub use self::scoring::Scoring;
pub use self::status::{LightStatus, Status};
pub use self::verifier::Verifier;
use std::fmt;
use std::hash::Hash;
/// Already verified transaction that can be safely queued.
pub trait VerifiedTransaction: fmt::Debug {
/// Transaction hash type.
type Hash: fmt::Debug + fmt::LowerHex + Eq + Clone + Hash;
/// Transaction sender type.
type Sender: fmt::Debug + Eq + Clone + Hash + Send;
/// Transaction hash
fn hash(&self) -> &Self::Hash;
/// Memory usage
fn mem_usage(&self) -> usize;
/// Transaction sender
fn sender(&self) -> &Self::Sender;
}

View File

@ -0,0 +1,85 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use error::ErrorKind;
/// Transaction pool listener.
///
/// Listener is being notified about status of every transaction in the pool.
pub trait Listener<T> {
/// The transaction has been successfuly added to the pool.
/// If second argument is `Some` the transaction has took place of some other transaction
/// which was already in pool.
/// NOTE: You won't be notified about drop of `old` transaction separately.
fn added(&mut self, _tx: &Arc<T>, _old: Option<&Arc<T>>) {}
/// The transaction was rejected from the pool.
/// It means that it was too cheap to replace any transaction already in the pool.
fn rejected(&mut self, _tx: &Arc<T>, _reason: &ErrorKind) {}
/// The transaction was pushed out from the pool because of the limit.
fn dropped(&mut self, _tx: &Arc<T>, _by: Option<&T>) {}
/// The transaction was marked as invalid by executor.
fn invalid(&mut self, _tx: &Arc<T>) {}
/// The transaction has been canceled.
fn canceled(&mut self, _tx: &Arc<T>) {}
/// The transaction has been culled from the pool.
fn culled(&mut self, _tx: &Arc<T>) {}
}
/// A no-op implementation of `Listener`.
#[derive(Debug)]
pub struct NoopListener;
impl<T> Listener<T> for NoopListener {}
impl<T, A, B> Listener<T> for (A, B) where
A: Listener<T>,
B: Listener<T>,
{
fn added(&mut self, tx: &Arc<T>, old: Option<&Arc<T>>) {
self.0.added(tx, old);
self.1.added(tx, old);
}
fn rejected(&mut self, tx: &Arc<T>, reason: &ErrorKind) {
self.0.rejected(tx, reason);
self.1.rejected(tx, reason);
}
fn dropped(&mut self, tx: &Arc<T>, by: Option<&T>) {
self.0.dropped(tx, by);
self.1.dropped(tx, by);
}
fn invalid(&mut self, tx: &Arc<T>) {
self.0.invalid(tx);
self.1.invalid(tx);
}
fn canceled(&mut self, tx: &Arc<T>) {
self.0.canceled(tx);
self.1.canceled(tx);
}
fn culled(&mut self, tx: &Arc<T>) {
self.0.culled(tx);
self.1.culled(tx);
}
}

View File

@ -0,0 +1,36 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
/// Transaction Pool options.
#[derive(Clone, Debug, PartialEq)]
pub struct Options {
/// Maximal number of transactions in the pool.
pub max_count: usize,
/// Maximal number of transactions from single sender.
pub max_per_sender: usize,
/// Maximal memory usage.
pub max_mem_usage: usize,
}
impl Default for Options {
fn default() -> Self {
Options {
max_count: 1024,
max_per_sender: 16,
max_mem_usage: 8 * 1024 * 1024,
}
}
}

View File

@ -0,0 +1,616 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::slice;
use std::collections::{hash_map, HashMap, BTreeSet};
use error;
use listener::{Listener, NoopListener};
use options::Options;
use ready::{Ready, Readiness};
use scoring::{self, Scoring, ScoreWithRef};
use status::{LightStatus, Status};
use transactions::{AddResult, Transactions};
use {VerifiedTransaction};
/// Internal representation of transaction.
///
/// Includes unique insertion id that can be used for scoring explictly,
/// but internally is used to resolve conflicts in case of equal scoring
/// (newer transactionsa are preferred).
#[derive(Debug)]
pub struct Transaction<T> {
/// Sequential id of the transaction
pub insertion_id: u64,
/// Shared transaction
pub transaction: Arc<T>,
}
impl<T> Clone for Transaction<T> {
fn clone(&self) -> Self {
Transaction {
insertion_id: self.insertion_id,
transaction: self.transaction.clone(),
}
}
}
impl<T> ::std::ops::Deref for Transaction<T> {
type Target = Arc<T>;
fn deref(&self) -> &Self::Target {
&self.transaction
}
}
/// A transaction pool.
#[derive(Debug)]
pub struct Pool<T: VerifiedTransaction, S: Scoring<T>, L = NoopListener> {
listener: L,
scoring: S,
options: Options,
mem_usage: usize,
transactions: HashMap<T::Sender, Transactions<T, S>>,
by_hash: HashMap<T::Hash, Transaction<T>>,
best_transactions: BTreeSet<ScoreWithRef<T, S::Score>>,
worst_transactions: BTreeSet<ScoreWithRef<T, S::Score>>,
insertion_id: u64,
}
impl<T: VerifiedTransaction, S: Scoring<T> + Default> Default for Pool<T, S> {
fn default() -> Self {
Self::with_scoring(S::default(), Options::default())
}
}
impl<T: VerifiedTransaction, S: Scoring<T> + Default> Pool<T, S> {
/// Creates a new `Pool` with given options
/// and default `Scoring` and `Listener`.
pub fn with_options(options: Options) -> Self {
Self::with_scoring(S::default(), options)
}
}
impl<T: VerifiedTransaction, S: Scoring<T>> Pool<T, S> {
/// Creates a new `Pool` with given `Scoring` and options.
pub fn with_scoring(scoring: S, options: Options) -> Self {
Self::new(NoopListener, scoring, options)
}
}
const INITIAL_NUMBER_OF_SENDERS: usize = 16;
impl<T, S, L> Pool<T, S, L> where
T: VerifiedTransaction,
S: Scoring<T>,
L: Listener<T>,
{
/// Creates new `Pool` with given `Scoring`, `Listener` and options.
pub fn new(listener: L, scoring: S, options: Options) -> Self {
let transactions = HashMap::with_capacity(INITIAL_NUMBER_OF_SENDERS);
let by_hash = HashMap::with_capacity(options.max_count / 16);
Pool {
listener,
scoring,
options,
mem_usage: 0,
transactions,
by_hash,
best_transactions: Default::default(),
worst_transactions: Default::default(),
insertion_id: 0,
}
}
/// Attempts to import new transaction to the pool, returns a `Arc<T>` or an `Error`.
///
/// NOTE: Since `Ready`ness is separate from the pool it's possible to import stalled transactions.
/// It's the caller responsibility to make sure that's not the case.
///
/// NOTE: The transaction may push out some other transactions from the pool
/// either because of limits (see `Options`) or because `Scoring` decides that the transaction
/// replaces an existing transaction from that sender.
/// If any limit is reached the transaction with the lowest `Score` is evicted to make room.
///
/// The `Listener` will be informed on any drops or rejections.
pub fn import(&mut self, transaction: T) -> error::Result<Arc<T>> {
let mem_usage = transaction.mem_usage();
ensure!(!self.by_hash.contains_key(transaction.hash()), error::ErrorKind::AlreadyImported(format!("{:?}", transaction.hash())));
self.insertion_id += 1;
let transaction = Transaction {
insertion_id: self.insertion_id,
transaction: Arc::new(transaction),
};
// TODO [ToDr] Most likely move this after the transaction is inserted.
// Avoid using should_replace, but rather use scoring for that.
{
let remove_worst = |s: &mut Self, transaction| {
match s.remove_worst(transaction) {
Err(err) => {
s.listener.rejected(transaction, err.kind());
Err(err)
},
Ok(None) => Ok(false),
Ok(Some(removed)) => {
s.listener.dropped(&removed, Some(transaction));
s.finalize_remove(removed.hash());
Ok(true)
},
}
};
while self.by_hash.len() + 1 > self.options.max_count {
trace!("Count limit reached: {} > {}", self.by_hash.len() + 1, self.options.max_count);
if !remove_worst(self, &transaction)? {
break;
}
}
while self.mem_usage + mem_usage > self.options.max_mem_usage {
trace!("Mem limit reached: {} > {}", self.mem_usage + mem_usage, self.options.max_mem_usage);
if !remove_worst(self, &transaction)? {
break;
}
}
}
let (result, prev_state, current_state) = {
let transactions = self.transactions.entry(transaction.sender().clone()).or_insert_with(Transactions::default);
// get worst and best transactions for comparison
let prev = transactions.worst_and_best();
let result = transactions.add(transaction, &self.scoring, self.options.max_per_sender);
let current = transactions.worst_and_best();
(result, prev, current)
};
// update best and worst transactions from this sender (if required)
self.update_senders_worst_and_best(prev_state, current_state);
match result {
AddResult::Ok(tx) => {
self.listener.added(&tx, None);
self.finalize_insert(&tx, None);
Ok(tx.transaction)
},
AddResult::PushedOut { new, old } |
AddResult::Replaced { new, old } => {
self.listener.added(&new, Some(&old));
self.finalize_insert(&new, Some(&old));
Ok(new.transaction)
},
AddResult::TooCheap { new, old } => {
let error = error::ErrorKind::TooCheapToReplace(format!("{:x}", old.hash()), format!("{:x}", new.hash()));
self.listener.rejected(&new, &error);
bail!(error)
},
AddResult::TooCheapToEnter(new, score) => {
let error = error::ErrorKind::TooCheapToEnter(format!("{:x}", new.hash()), format!("{:#x}", score));
self.listener.rejected(&new, &error);
bail!(error)
}
}
}
/// Updates state of the pool statistics if the transaction was added to a set.
fn finalize_insert(&mut self, new: &Transaction<T>, old: Option<&Transaction<T>>) {
self.mem_usage += new.mem_usage();
self.by_hash.insert(new.hash().clone(), new.clone());
if let Some(old) = old {
self.finalize_remove(old.hash());
}
}
/// Updates the pool statistics if transaction was removed.
fn finalize_remove(&mut self, hash: &T::Hash) -> Option<Arc<T>> {
self.by_hash.remove(hash).map(|old| {
self.mem_usage -= old.transaction.mem_usage();
old.transaction
})
}
/// Updates best and worst transactions from a sender.
fn update_senders_worst_and_best(
&mut self,
previous: Option<((S::Score, Transaction<T>), (S::Score, Transaction<T>))>,
current: Option<((S::Score, Transaction<T>), (S::Score, Transaction<T>))>,
) {
let worst_collection = &mut self.worst_transactions;
let best_collection = &mut self.best_transactions;
let is_same = |a: &(S::Score, Transaction<T>), b: &(S::Score, Transaction<T>)| {
a.0 == b.0 && a.1.hash() == b.1.hash()
};
let update = |collection: &mut BTreeSet<_>, (score, tx), remove| if remove {
collection.remove(&ScoreWithRef::new(score, tx));
} else {
collection.insert(ScoreWithRef::new(score, tx));
};
match (previous, current) {
(None, Some((worst, best))) => {
update(worst_collection, worst, false);
update(best_collection, best, false);
},
(Some((worst, best)), None) => {
// all transactions from that sender has been removed.
// We can clear a hashmap entry.
self.transactions.remove(worst.1.sender());
update(worst_collection, worst, true);
update(best_collection, best, true);
},
(Some((w1, b1)), Some((w2, b2))) => {
if !is_same(&w1, &w2) {
update(worst_collection, w1, true);
update(worst_collection, w2, false);
}
if !is_same(&b1, &b2) {
update(best_collection, b1, true);
update(best_collection, b2, false);
}
},
(None, None) => {},
}
}
/// Attempts to remove the worst transaction from the pool if it's worse than the given one.
///
/// Returns `None` in case we couldn't decide if the transaction should replace the worst transaction or not.
/// In such case we will accept the transaction even though it is going to exceed the limit.
fn remove_worst(&mut self, transaction: &Transaction<T>) -> error::Result<Option<Transaction<T>>> {
let to_remove = match self.worst_transactions.iter().next_back() {
// No elements to remove? and the pool is still full?
None => {
warn!("The pool is full but there are no transactions to remove.");
return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), "unknown".into()).into());
},
Some(old) => match self.scoring.should_replace(&old.transaction, transaction) {
// We can't decide which of them should be removed, so accept both.
scoring::Choice::InsertNew => None,
// New transaction is better than the worst one so we can replace it.
scoring::Choice::ReplaceOld => Some(old.clone()),
// otherwise fail
scoring::Choice::RejectNew => {
return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), format!("{:#x}", old.score)).into())
},
},
};
if let Some(to_remove) = to_remove {
// Remove from transaction set
self.remove_from_set(to_remove.transaction.sender(), |set, scoring| {
set.remove(&to_remove.transaction, scoring)
});
Ok(Some(to_remove.transaction))
} else {
Ok(None)
}
}
/// Removes transaction from sender's transaction `HashMap`.
fn remove_from_set<R, F: FnOnce(&mut Transactions<T, S>, &S) -> R>(&mut self, sender: &T::Sender, f: F) -> Option<R> {
let (prev, next, result) = if let Some(set) = self.transactions.get_mut(sender) {
let prev = set.worst_and_best();
let result = f(set, &self.scoring);
(prev, set.worst_and_best(), result)
} else {
return None;
};
self.update_senders_worst_and_best(prev, next);
Some(result)
}
/// Clears pool from all transactions.
/// This causes a listener notification that all transactions were dropped.
/// NOTE: the drop-notification order will be arbitrary.
pub fn clear(&mut self) {
self.mem_usage = 0;
self.transactions.clear();
self.best_transactions.clear();
self.worst_transactions.clear();
for (_hash, tx) in self.by_hash.drain() {
self.listener.dropped(&tx.transaction, None)
}
}
/// Removes single transaction from the pool.
/// Depending on the `is_invalid` flag the listener
/// will either get a `cancelled` or `invalid` notification.
pub fn remove(&mut self, hash: &T::Hash, is_invalid: bool) -> Option<Arc<T>> {
if let Some(tx) = self.finalize_remove(hash) {
self.remove_from_set(tx.sender(), |set, scoring| {
set.remove(&tx, scoring)
});
if is_invalid {
self.listener.invalid(&tx);
} else {
self.listener.canceled(&tx);
}
Some(tx)
} else {
None
}
}
/// Removes all stalled transactions from given sender.
fn remove_stalled<R: Ready<T>>(&mut self, sender: &T::Sender, ready: &mut R) -> usize {
let removed_from_set = self.remove_from_set(sender, |transactions, scoring| {
transactions.cull(ready, scoring)
});
match removed_from_set {
Some(removed) => {
let len = removed.len();
for tx in removed {
self.finalize_remove(tx.hash());
self.listener.culled(&tx);
}
len
},
None => 0,
}
}
/// Removes all stalled transactions from given sender list (or from all senders).
pub fn cull<R: Ready<T>>(&mut self, senders: Option<&[T::Sender]>, mut ready: R) -> usize {
let mut removed = 0;
match senders {
Some(senders) => {
for sender in senders {
removed += self.remove_stalled(sender, &mut ready);
}
},
None => {
let senders = self.transactions.keys().cloned().collect::<Vec<_>>();
for sender in senders {
removed += self.remove_stalled(&sender, &mut ready);
}
},
}
removed
}
/// Returns a transaction if it's part of the pool or `None` otherwise.
pub fn find(&self, hash: &T::Hash) -> Option<Arc<T>> {
self.by_hash.get(hash).map(|t| t.transaction.clone())
}
/// Returns worst transaction in the queue (if any).
pub fn worst_transaction(&self) -> Option<Arc<T>> {
self.worst_transactions.iter().next_back().map(|x| x.transaction.transaction.clone())
}
/// Returns true if the pool is at it's capacity.
pub fn is_full(&self) -> bool {
self.by_hash.len() >= self.options.max_count
|| self.mem_usage >= self.options.max_mem_usage
}
/// Returns senders ordered by priority of their transactions.
pub fn senders(&self) -> impl Iterator<Item=&T::Sender> {
self.best_transactions.iter().map(|tx| tx.transaction.sender())
}
/// Returns an iterator of pending (ready) transactions.
pub fn pending<R: Ready<T>>(&self, ready: R) -> PendingIterator<T, R, S, L> {
PendingIterator {
ready,
best_transactions: self.best_transactions.clone(),
pool: self,
}
}
/// Returns pending (ready) transactions from given sender.
pub fn pending_from_sender<R: Ready<T>>(&self, ready: R, sender: &T::Sender) -> PendingIterator<T, R, S, L> {
let best_transactions = self.transactions.get(sender)
.and_then(|transactions| transactions.worst_and_best())
.map(|(_, best)| ScoreWithRef::new(best.0, best.1))
.map(|s| {
let mut set = BTreeSet::new();
set.insert(s);
set
})
.unwrap_or_default();
PendingIterator {
ready,
best_transactions,
pool: self,
}
}
/// Returns unprioritized list of ready transactions.
pub fn unordered_pending<R: Ready<T>>(&self, ready: R) -> UnorderedIterator<T, R, S> {
UnorderedIterator {
ready,
senders: self.transactions.iter(),
transactions: None,
}
}
/// Update score of transactions of a particular sender.
pub fn update_scores(&mut self, sender: &T::Sender, event: S::Event) {
let res = if let Some(set) = self.transactions.get_mut(sender) {
let prev = set.worst_and_best();
set.update_scores(&self.scoring, event);
let current = set.worst_and_best();
Some((prev, current))
} else {
None
};
if let Some((prev, current)) = res {
self.update_senders_worst_and_best(prev, current);
}
}
/// Computes the full status of the pool (including readiness).
pub fn status<R: Ready<T>>(&self, mut ready: R) -> Status {
let mut status = Status::default();
for (_sender, transactions) in &self.transactions {
let len = transactions.len();
for (idx, tx) in transactions.iter().enumerate() {
match ready.is_ready(tx) {
Readiness::Stale => status.stalled += 1,
Readiness::Ready => status.pending += 1,
Readiness::Future => {
status.future += len - idx;
break;
}
}
}
}
status
}
/// Returns light status of the pool.
pub fn light_status(&self) -> LightStatus {
LightStatus {
mem_usage: self.mem_usage,
transaction_count: self.by_hash.len(),
senders: self.transactions.len(),
}
}
/// Returns current pool options.
pub fn options(&self) -> Options {
self.options.clone()
}
/// Borrows listener instance.
pub fn listener(&self) -> &L {
&self.listener
}
/// Borrows scoring instance.
pub fn scoring(&self) -> &S {
&self.scoring
}
/// Borrows listener mutably.
pub fn listener_mut(&mut self) -> &mut L {
&mut self.listener
}
}
/// An iterator over all pending (ready) transactions in unoredered fashion.
///
/// NOTE: Current implementation will iterate over all transactions from particular sender
/// ordered by nonce, but that might change in the future.
///
/// NOTE: the transactions are not removed from the queue.
/// You might remove them later by calling `cull`.
pub struct UnorderedIterator<'a, T, R, S> where
T: VerifiedTransaction + 'a,
S: Scoring<T> + 'a,
{
ready: R,
senders: hash_map::Iter<'a, T::Sender, Transactions<T, S>>,
transactions: Option<slice::Iter<'a, Transaction<T>>>,
}
impl<'a, T, R, S> Iterator for UnorderedIterator<'a, T, R, S> where
T: VerifiedTransaction,
R: Ready<T>,
S: Scoring<T>,
{
type Item = Arc<T>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(transactions) = self.transactions.as_mut() {
if let Some(tx) = transactions.next() {
match self.ready.is_ready(&tx) {
Readiness::Ready => {
return Some(tx.transaction.clone());
},
state => trace!("[{:?}] Ignoring {:?} transaction.", tx.hash(), state),
}
}
}
// otherwise fallback and try next sender
let next_sender = self.senders.next()?;
self.transactions = Some(next_sender.1.iter());
}
}
}
/// An iterator over all pending (ready) transactions.
/// NOTE: the transactions are not removed from the queue.
/// You might remove them later by calling `cull`.
pub struct PendingIterator<'a, T, R, S, L> where
T: VerifiedTransaction + 'a,
S: Scoring<T> + 'a,
L: 'a,
{
ready: R,
best_transactions: BTreeSet<ScoreWithRef<T, S::Score>>,
pool: &'a Pool<T, S, L>,
}
impl<'a, T, R, S, L> Iterator for PendingIterator<'a, T, R, S, L> where
T: VerifiedTransaction,
R: Ready<T>,
S: Scoring<T>,
{
type Item = Arc<T>;
fn next(&mut self) -> Option<Self::Item> {
while !self.best_transactions.is_empty() {
let best = {
let best = self.best_transactions.iter().next().expect("current_best is not empty; qed").clone();
self.best_transactions.take(&best).expect("Just taken from iterator; qed")
};
match self.ready.is_ready(&best.transaction) {
Readiness::Ready => {
// retrieve next one from that sender.
let next = self.pool.transactions
.get(best.transaction.sender())
.and_then(|s| s.find_next(&best.transaction, &self.pool.scoring));
if let Some((score, tx)) = next {
self.best_transactions.insert(ScoreWithRef::new(score, tx));
}
return Some(best.transaction.transaction)
},
state => trace!("[{:?}] Ignoring {:?} transaction.", best.transaction.hash(), state),
}
}
None
}
}

View File

@ -0,0 +1,54 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
/// Transaction readiness.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Readiness {
/// The transaction is stale (and should/will be removed from the pool).
Stale,
/// The transaction is ready to be included in pending set.
Ready,
/// The transaction is not yet ready.
Future,
}
/// A readiness indicator.
pub trait Ready<T> {
/// Returns true if transaction is ready to be included in pending block,
/// given all previous transactions that were ready are already included.
///
/// NOTE: readiness of transactions will be checked according to `Score` ordering,
/// the implementation should maintain a state of already checked transactions.
fn is_ready(&mut self, tx: &T) -> Readiness;
}
impl<T, F> Ready<T> for F where F: FnMut(&T) -> Readiness {
fn is_ready(&mut self, tx: &T) -> Readiness {
(*self)(tx)
}
}
impl<T, A, B> Ready<T> for (A, B) where
A: Ready<T>,
B: Ready<T>,
{
fn is_ready(&mut self, tx: &T) -> Readiness {
match self.0.is_ready(tx) {
Readiness::Ready => self.1.is_ready(tx),
r => r,
}
}
}

View File

@ -0,0 +1,157 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! A transactions ordering abstraction.
use std::{cmp, fmt};
use pool::Transaction;
/// Represents a decision what to do with
/// a new transaction that tries to enter the pool.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Choice {
/// New transaction should be rejected
/// (i.e. the old transaction that occupies the same spot
/// is better).
RejectNew,
/// The old transaction should be dropped
/// in favour of the new one.
ReplaceOld,
/// The new transaction should be inserted
/// and both (old and new) should stay in the pool.
InsertNew,
}
/// Describes a reason why the `Score` of transactions
/// should be updated.
/// The `Scoring` implementations can use this information
/// to update the `Score` table more efficiently.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Change<T = ()> {
/// New transaction has been inserted at given index.
/// The Score at that index is initialized with default value
/// and needs to be filled in.
InsertedAt(usize),
/// The transaction has been removed at given index and other transactions
/// shifted to it's place.
/// The scores were removed and shifted as well.
/// For simple scoring algorithms no action is required here.
RemovedAt(usize),
/// The transaction at given index has replaced a previous transaction.
/// The score at that index needs to be update (it contains value from previous transaction).
ReplacedAt(usize),
/// Given number of stalled transactions has been culled from the beginning.
/// The scores has been removed from the beginning as well.
/// For simple scoring algorithms no action is required here.
Culled(usize),
/// Custom event to update the score triggered outside of the pool.
/// Handling this event is up to scoring implementation.
Event(T),
}
/// A transaction ordering.
///
/// The implementation should decide on order of transactions in the pool.
/// Each transaction should also get assigned a `Score` which is used to later
/// prioritize transactions in the pending set.
///
/// Implementation notes:
/// - Returned `Score`s should match ordering of `compare` method.
/// - `compare` will be called only within a context of transactions from the same sender.
/// - `choose` may be called even if `compare` returns `Ordering::Equal`
/// - `should_replace` is used to decide if new transaction should push out an old transaction already in the queue.
/// - `Score`s and `compare` should align with `Ready` implementation.
///
/// Example: Natural ordering of Ethereum transactions.
/// - `compare`: compares transaction `nonce` ()
/// - `choose`: compares transactions `gasPrice` (decides if old transaction should be replaced)
/// - `update_scores`: score defined as `gasPrice` if `n==0` and `max(scores[n-1], gasPrice)` if `n>0`
/// - `should_replace`: compares `gasPrice` (decides if transaction from a different sender is more valuable)
///
pub trait Scoring<T>: fmt::Debug {
/// A score of a transaction.
type Score: cmp::Ord + Clone + Default + fmt::Debug + Send + fmt::LowerHex;
/// Custom scoring update event type.
type Event: fmt::Debug;
/// Decides on ordering of `T`s from a particular sender.
fn compare(&self, old: &T, other: &T) -> cmp::Ordering;
/// Decides how to deal with two transactions from a sender that seem to occupy the same slot in the queue.
fn choose(&self, old: &T, new: &T) -> Choice;
/// Updates the transaction scores given a list of transactions and a change to previous scoring.
/// NOTE: you can safely assume that both slices have the same length.
/// (i.e. score at index `i` represents transaction at the same index)
fn update_scores(&self, txs: &[Transaction<T>], scores: &mut [Self::Score], change: Change<Self::Event>);
/// Decides if `new` should push out `old` transaction from the pool.
///
/// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits.
fn should_replace(&self, old: &T, new: &T) -> Choice;
/// Decides if the transaction should ignore per-sender limit in the pool.
///
/// If you return `true` for given transaction it's going to be accepted even though
/// the per-sender limit is exceeded.
fn should_ignore_sender_limit(&self, _new: &T) -> bool { false }
}
/// A score with a reference to the transaction.
#[derive(Debug)]
pub struct ScoreWithRef<T, S> {
/// Score
pub score: S,
/// Shared transaction
pub transaction: Transaction<T>,
}
impl<T, S> ScoreWithRef<T, S> {
/// Creates a new `ScoreWithRef`
pub fn new(score: S, transaction: Transaction<T>) -> Self {
ScoreWithRef { score, transaction }
}
}
impl<T, S: Clone> Clone for ScoreWithRef<T, S> {
fn clone(&self) -> Self {
ScoreWithRef {
score: self.score.clone(),
transaction: self.transaction.clone(),
}
}
}
impl<S: cmp::Ord, T> Ord for ScoreWithRef<T, S> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
other.score.cmp(&self.score)
.then(other.transaction.insertion_id.cmp(&self.transaction.insertion_id))
}
}
impl<S: cmp::Ord, T> PartialOrd for ScoreWithRef<T, S> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<S: cmp::Ord, T> PartialEq for ScoreWithRef<T, S> {
fn eq(&self, other: &Self) -> bool {
self.score == other.score && self.transaction.insertion_id == other.transaction.insertion_id
}
}
impl<S: cmp::Ord, T> Eq for ScoreWithRef<T, S> {}

View File

@ -0,0 +1,40 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
/// Light pool status.
/// This status is cheap to compute and can be called frequently.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct LightStatus {
/// Memory usage in bytes.
pub mem_usage: usize,
/// Total number of transactions in the pool.
pub transaction_count: usize,
/// Number of unique senders in the pool.
pub senders: usize,
}
/// A full queue status.
/// To compute this status it is required to provide `Ready`.
/// NOTE: To compute the status we need to visit each transaction in the pool.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct Status {
/// Number of stalled transactions.
pub stalled: usize,
/// Number of pending (ready) transactions.
pub pending: usize,
/// Number of future (not ready) transactions.
pub future: usize,
}

View File

@ -0,0 +1,110 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::cmp;
use std::collections::HashMap;
use ethereum_types::{H160 as Sender, U256};
use {pool, scoring, Scoring, Ready, Readiness};
use super::Transaction;
#[derive(Debug, Default)]
pub struct DummyScoring {
always_insert: bool,
}
impl DummyScoring {
pub fn always_insert() -> Self {
DummyScoring {
always_insert: true,
}
}
}
impl Scoring<Transaction> for DummyScoring {
type Score = U256;
type Event = ();
fn compare(&self, old: &Transaction, new: &Transaction) -> cmp::Ordering {
old.nonce.cmp(&new.nonce)
}
fn choose(&self, old: &Transaction, new: &Transaction) -> scoring::Choice {
if old.nonce == new.nonce {
if new.gas_price > old.gas_price {
scoring::Choice::ReplaceOld
} else {
scoring::Choice::RejectNew
}
} else {
scoring::Choice::InsertNew
}
}
fn update_scores(&self, txs: &[pool::Transaction<Transaction>], scores: &mut [Self::Score], change: scoring::Change) {
if let scoring::Change::Event(_) = change {
// In case of event reset all scores to 0
for i in 0..txs.len() {
scores[i] = 0.into();
}
} else {
// Set to a gas price otherwise
for i in 0..txs.len() {
scores[i] = txs[i].gas_price;
}
}
}
fn should_replace(&self, old: &Transaction, new: &Transaction) -> scoring::Choice {
if self.always_insert {
scoring::Choice::InsertNew
} else if new.gas_price > old.gas_price {
scoring::Choice::ReplaceOld
} else {
scoring::Choice::RejectNew
}
}
fn should_ignore_sender_limit(&self, _new: &Transaction) -> bool {
self.always_insert
}
}
#[derive(Default)]
pub struct NonceReady(HashMap<Sender, U256>, U256);
impl NonceReady {
pub fn new<T: Into<U256>>(min: T) -> Self {
let mut n = NonceReady::default();
n.1 = min.into();
n
}
}
impl Ready<Transaction> for NonceReady {
fn is_ready(&mut self, tx: &Transaction) -> Readiness {
let min = self.1;
let nonce = self.0.entry(tx.sender).or_insert_with(|| min);
match tx.nonce.cmp(nonce) {
cmp::Ordering::Greater => Readiness::Future,
cmp::Ordering::Equal => {
*nonce += 1.into();
Readiness::Ready
},
cmp::Ordering::Less => Readiness::Stale,
}
}
}

View File

@ -0,0 +1,748 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
mod helpers;
mod tx_builder;
use self::helpers::{DummyScoring, NonceReady};
use self::tx_builder::TransactionBuilder;
use std::sync::Arc;
use ethereum_types::{H256, U256, Address};
use super::*;
#[derive(Debug, PartialEq)]
pub struct Transaction {
pub hash: H256,
pub nonce: U256,
pub gas_price: U256,
pub gas: U256,
pub sender: Address,
pub mem_usage: usize,
}
impl VerifiedTransaction for Transaction {
type Hash = H256;
type Sender = Address;
fn hash(&self) -> &H256 { &self.hash }
fn mem_usage(&self) -> usize { self.mem_usage }
fn sender(&self) -> &Address { &self.sender }
}
pub type SharedTransaction = Arc<Transaction>;
type TestPool = Pool<Transaction, DummyScoring>;
impl TestPool {
pub fn with_limit(max_count: usize) -> Self {
Self::with_options(Options {
max_count,
..Default::default()
})
}
}
#[test]
fn should_clear_queue() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::default();
assert_eq!(txq.light_status(), LightStatus {
mem_usage: 0,
transaction_count: 0,
senders: 0,
});
let tx1 = b.tx().nonce(0).new();
let tx2 = b.tx().nonce(1).mem_usage(1).new();
// add
txq.import(tx1).unwrap();
txq.import(tx2).unwrap();
assert_eq!(txq.light_status(), LightStatus {
mem_usage: 1,
transaction_count: 2,
senders: 1,
});
// when
txq.clear();
// then
assert_eq!(txq.light_status(), LightStatus {
mem_usage: 0,
transaction_count: 0,
senders: 0,
});
}
#[test]
fn should_not_allow_same_transaction_twice() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::default();
let tx1 = b.tx().nonce(0).new();
let tx2 = b.tx().nonce(0).new();
// when
txq.import(tx1).unwrap();
txq.import(tx2).unwrap_err();
// then
assert_eq!(txq.light_status().transaction_count, 1);
}
#[test]
fn should_replace_transaction() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::default();
let tx1 = b.tx().nonce(0).gas_price(1).new();
let tx2 = b.tx().nonce(0).gas_price(2).new();
// when
txq.import(tx1).unwrap();
txq.import(tx2).unwrap();
// then
assert_eq!(txq.light_status().transaction_count, 1);
}
#[test]
fn should_reject_if_above_count() {
let b = TransactionBuilder::default();
let mut txq = TestPool::with_options(Options {
max_count: 1,
..Default::default()
});
// Reject second
let tx1 = b.tx().nonce(0).new();
let tx2 = b.tx().nonce(1).new();
let hash = format!("{:?}", tx2.hash());
txq.import(tx1).unwrap();
assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash, "0x0".into()));
assert_eq!(txq.light_status().transaction_count, 1);
txq.clear();
// Replace first
let tx1 = b.tx().nonce(0).new();
let tx2 = b.tx().nonce(0).sender(1).gas_price(2).new();
txq.import(tx1).unwrap();
txq.import(tx2).unwrap();
assert_eq!(txq.light_status().transaction_count, 1);
}
#[test]
fn should_reject_if_above_mem_usage() {
let b = TransactionBuilder::default();
let mut txq = TestPool::with_options(Options {
max_mem_usage: 1,
..Default::default()
});
// Reject second
let tx1 = b.tx().nonce(1).mem_usage(1).new();
let tx2 = b.tx().nonce(2).mem_usage(2).new();
let hash = format!("{:?}", tx2.hash());
txq.import(tx1).unwrap();
assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash, "0x0".into()));
assert_eq!(txq.light_status().transaction_count, 1);
txq.clear();
// Replace first
let tx1 = b.tx().nonce(1).mem_usage(1).new();
let tx2 = b.tx().nonce(1).sender(1).gas_price(2).mem_usage(1).new();
txq.import(tx1).unwrap();
txq.import(tx2).unwrap();
assert_eq!(txq.light_status().transaction_count, 1);
}
#[test]
fn should_reject_if_above_sender_count() {
let b = TransactionBuilder::default();
let mut txq = TestPool::with_options(Options {
max_per_sender: 1,
..Default::default()
});
// Reject second
let tx1 = b.tx().nonce(1).new();
let tx2 = b.tx().nonce(2).new();
let hash = format!("{:x}", tx2.hash());
txq.import(tx1).unwrap();
assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash, "0x0".into()));
assert_eq!(txq.light_status().transaction_count, 1);
txq.clear();
// Replace first
let tx1 = b.tx().nonce(1).new();
let tx2 = b.tx().nonce(2).gas_price(2).new();
let hash = format!("{:x}", tx2.hash());
txq.import(tx1).unwrap();
// This results in error because we also compare nonces
assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash, "0x0".into()));
assert_eq!(txq.light_status().transaction_count, 1);
}
#[test]
fn should_construct_pending() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::default();
let tx0 = txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
let tx1 = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap();
let tx2 = txq.import(b.tx().nonce(2).new()).unwrap();
// this transaction doesn't get to the block despite high gas price
// because of block gas limit and simplistic ordering algorithm.
txq.import(b.tx().nonce(3).gas_price(4).new()).unwrap();
//gap
txq.import(b.tx().nonce(5).new()).unwrap();
let tx5 = txq.import(b.tx().sender(1).nonce(0).new()).unwrap();
let tx6 = txq.import(b.tx().sender(1).nonce(1).new()).unwrap();
let tx7 = txq.import(b.tx().sender(1).nonce(2).new()).unwrap();
let tx8 = txq.import(b.tx().sender(1).nonce(3).gas_price(4).new()).unwrap();
// gap
txq.import(b.tx().sender(1).nonce(5).new()).unwrap();
let tx9 = txq.import(b.tx().sender(2).nonce(0).new()).unwrap();
assert_eq!(txq.light_status().transaction_count, 11);
assert_eq!(txq.status(NonceReady::default()), Status {
stalled: 0,
pending: 9,
future: 2,
});
assert_eq!(txq.status(NonceReady::new(1)), Status {
stalled: 3,
pending: 6,
future: 2,
});
// when
let mut current_gas = U256::zero();
let limit = (21_000 * 8).into();
let mut pending = txq.pending(NonceReady::default()).take_while(|tx| {
let should_take = tx.gas + current_gas <= limit;
if should_take {
current_gas = current_gas + tx.gas
}
should_take
});
assert_eq!(pending.next(), Some(tx0));
assert_eq!(pending.next(), Some(tx1));
assert_eq!(pending.next(), Some(tx9));
assert_eq!(pending.next(), Some(tx5));
assert_eq!(pending.next(), Some(tx6));
assert_eq!(pending.next(), Some(tx7));
assert_eq!(pending.next(), Some(tx8));
assert_eq!(pending.next(), Some(tx2));
assert_eq!(pending.next(), None);
}
#[test]
fn should_return_unordered_iterator() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::default();
let tx0 = txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
let tx1 = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap();
let tx2 = txq.import(b.tx().nonce(2).new()).unwrap();
let tx3 = txq.import(b.tx().nonce(3).gas_price(4).new()).unwrap();
//gap
txq.import(b.tx().nonce(5).new()).unwrap();
let tx5 = txq.import(b.tx().sender(1).nonce(0).new()).unwrap();
let tx6 = txq.import(b.tx().sender(1).nonce(1).new()).unwrap();
let tx7 = txq.import(b.tx().sender(1).nonce(2).new()).unwrap();
let tx8 = txq.import(b.tx().sender(1).nonce(3).gas_price(4).new()).unwrap();
// gap
txq.import(b.tx().sender(1).nonce(5).new()).unwrap();
let tx9 = txq.import(b.tx().sender(2).nonce(0).new()).unwrap();
assert_eq!(txq.light_status().transaction_count, 11);
assert_eq!(txq.status(NonceReady::default()), Status {
stalled: 0,
pending: 9,
future: 2,
});
assert_eq!(txq.status(NonceReady::new(1)), Status {
stalled: 3,
pending: 6,
future: 2,
});
// when
let all: Vec<_> = txq.unordered_pending(NonceReady::default()).collect();
let chain1 = vec![tx0, tx1, tx2, tx3];
let chain2 = vec![tx5, tx6, tx7, tx8];
let chain3 = vec![tx9];
assert_eq!(all.len(), chain1.len() + chain2.len() + chain3.len());
let mut options = vec![
vec![chain1.clone(), chain2.clone(), chain3.clone()],
vec![chain2.clone(), chain1.clone(), chain3.clone()],
vec![chain2.clone(), chain3.clone(), chain1.clone()],
vec![chain3.clone(), chain2.clone(), chain1.clone()],
vec![chain3.clone(), chain1.clone(), chain2.clone()],
vec![chain1.clone(), chain3.clone(), chain2.clone()],
].into_iter().map(|mut v| {
let mut first = v.pop().unwrap();
for mut x in v {
first.append(&mut x);
}
first
});
assert!(options.any(|opt| all == opt));
}
#[test]
fn should_update_scoring_correctly() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::default();
let tx0 = txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
let tx1 = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap();
let tx2 = txq.import(b.tx().nonce(2).new()).unwrap();
// this transaction doesn't get to the block despite high gas price
// because of block gas limit and simplistic ordering algorithm.
txq.import(b.tx().nonce(3).gas_price(4).new()).unwrap();
//gap
txq.import(b.tx().nonce(5).new()).unwrap();
let tx5 = txq.import(b.tx().sender(1).nonce(0).new()).unwrap();
let tx6 = txq.import(b.tx().sender(1).nonce(1).new()).unwrap();
let tx7 = txq.import(b.tx().sender(1).nonce(2).new()).unwrap();
let tx8 = txq.import(b.tx().sender(1).nonce(3).gas_price(4).new()).unwrap();
// gap
txq.import(b.tx().sender(1).nonce(5).new()).unwrap();
let tx9 = txq.import(b.tx().sender(2).nonce(0).new()).unwrap();
assert_eq!(txq.light_status().transaction_count, 11);
assert_eq!(txq.status(NonceReady::default()), Status {
stalled: 0,
pending: 9,
future: 2,
});
assert_eq!(txq.status(NonceReady::new(1)), Status {
stalled: 3,
pending: 6,
future: 2,
});
txq.update_scores(&0.into(), ());
// when
let mut current_gas = U256::zero();
let limit = (21_000 * 8).into();
let mut pending = txq.pending(NonceReady::default()).take_while(|tx| {
let should_take = tx.gas + current_gas <= limit;
if should_take {
current_gas = current_gas + tx.gas
}
should_take
});
assert_eq!(pending.next(), Some(tx9));
assert_eq!(pending.next(), Some(tx5));
assert_eq!(pending.next(), Some(tx6));
assert_eq!(pending.next(), Some(tx7));
assert_eq!(pending.next(), Some(tx8));
// penalized transactions
assert_eq!(pending.next(), Some(tx0));
assert_eq!(pending.next(), Some(tx1));
assert_eq!(pending.next(), Some(tx2));
assert_eq!(pending.next(), None);
}
#[test]
fn should_remove_transaction() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::default();
let tx1 = txq.import(b.tx().nonce(0).new()).unwrap();
let tx2 = txq.import(b.tx().nonce(1).new()).unwrap();
txq.import(b.tx().nonce(2).new()).unwrap();
assert_eq!(txq.light_status().transaction_count, 3);
// when
assert!(txq.remove(&tx2.hash(), false).is_some());
// then
assert_eq!(txq.light_status().transaction_count, 2);
let mut pending = txq.pending(NonceReady::default());
assert_eq!(pending.next(), Some(tx1));
assert_eq!(pending.next(), None);
}
#[test]
fn should_cull_stalled_transactions() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::default();
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
txq.import(b.tx().nonce(1).new()).unwrap();
txq.import(b.tx().nonce(3).new()).unwrap();
txq.import(b.tx().sender(1).nonce(0).new()).unwrap();
txq.import(b.tx().sender(1).nonce(1).new()).unwrap();
txq.import(b.tx().sender(1).nonce(5).new()).unwrap();
assert_eq!(txq.status(NonceReady::new(1)), Status {
stalled: 2,
pending: 2,
future: 2,
});
// when
assert_eq!(txq.cull(None, NonceReady::new(1)), 2);
// then
assert_eq!(txq.status(NonceReady::new(1)), Status {
stalled: 0,
pending: 2,
future: 2,
});
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 4,
senders: 2,
mem_usage: 0,
});
}
#[test]
fn should_cull_stalled_transactions_from_a_sender() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::default();
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
txq.import(b.tx().nonce(1).new()).unwrap();
txq.import(b.tx().sender(1).nonce(0).new()).unwrap();
txq.import(b.tx().sender(1).nonce(1).new()).unwrap();
txq.import(b.tx().sender(1).nonce(2).new()).unwrap();
assert_eq!(txq.status(NonceReady::new(2)), Status {
stalled: 4,
pending: 1,
future: 0,
});
// when
let sender = 0.into();
assert_eq!(txq.cull(Some(&[sender]), NonceReady::new(2)), 2);
// then
assert_eq!(txq.status(NonceReady::new(2)), Status {
stalled: 2,
pending: 1,
future: 0,
});
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 3,
senders: 1,
mem_usage: 0,
});
}
#[test]
fn should_re_insert_after_cull() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::default();
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
txq.import(b.tx().nonce(1).new()).unwrap();
txq.import(b.tx().sender(1).nonce(0).new()).unwrap();
txq.import(b.tx().sender(1).nonce(1).new()).unwrap();
assert_eq!(txq.status(NonceReady::new(1)), Status {
stalled: 2,
pending: 2,
future: 0,
});
// when
assert_eq!(txq.cull(None, NonceReady::new(1)), 2);
assert_eq!(txq.status(NonceReady::new(1)), Status {
stalled: 0,
pending: 2,
future: 0,
});
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
txq.import(b.tx().sender(1).nonce(0).new()).unwrap();
assert_eq!(txq.status(NonceReady::new(1)), Status {
stalled: 2,
pending: 2,
future: 0,
});
}
#[test]
fn should_return_worst_transaction() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::default();
assert!(txq.worst_transaction().is_none());
// when
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
txq.import(b.tx().sender(1).nonce(0).gas_price(4).new()).unwrap();
// then
assert_eq!(txq.worst_transaction().unwrap().gas_price, 4.into());
}
#[test]
fn should_return_is_full() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::with_limit(2);
assert!(!txq.is_full());
// when
txq.import(b.tx().nonce(0).gas_price(110).new()).unwrap();
assert!(!txq.is_full());
txq.import(b.tx().sender(1).nonce(0).gas_price(100).new()).unwrap();
// then
assert!(txq.is_full());
}
#[test]
fn should_import_even_if_limit_is_reached_and_should_replace_returns_insert_new() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::with_scoring(DummyScoring::always_insert(), Options {
max_count: 1,
..Default::default()
});
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 1,
senders: 1,
mem_usage: 0,
});
// when
txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap();
// then
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 2,
senders: 1,
mem_usage: 0,
});
}
#[test]
fn should_not_import_even_if_limit_is_reached_and_should_replace_returns_false() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::with_scoring(DummyScoring::default(), Options {
max_count: 1,
..Default::default()
});
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 1,
senders: 1,
mem_usage: 0,
});
// when
let err = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap_err();
// then
assert_eq!(err.kind(),
&error::ErrorKind::TooCheapToEnter("0x00000000000000000000000000000000000000000000000000000000000001f5".into(), "0x5".into()));
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 1,
senders: 1,
mem_usage: 0,
});
}
#[test]
fn should_import_even_if_sender_limit_is_reached() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::with_scoring(DummyScoring::always_insert(), Options {
max_count: 1,
max_per_sender: 1,
..Default::default()
});
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 1,
senders: 1,
mem_usage: 0,
});
// when
txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap();
// then
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 2,
senders: 1,
mem_usage: 0,
});
}
mod listener {
use std::cell::RefCell;
use std::rc::Rc;
use super::*;
#[derive(Default)]
struct MyListener(pub Rc<RefCell<Vec<&'static str>>>);
impl Listener<Transaction> for MyListener {
fn added(&mut self, _tx: &SharedTransaction, old: Option<&SharedTransaction>) {
self.0.borrow_mut().push(if old.is_some() { "replaced" } else { "added" });
}
fn rejected(&mut self, _tx: &SharedTransaction, _reason: &error::ErrorKind) {
self.0.borrow_mut().push("rejected".into());
}
fn dropped(&mut self, _tx: &SharedTransaction, _new: Option<&Transaction>) {
self.0.borrow_mut().push("dropped".into());
}
fn invalid(&mut self, _tx: &SharedTransaction) {
self.0.borrow_mut().push("invalid".into());
}
fn canceled(&mut self, _tx: &SharedTransaction) {
self.0.borrow_mut().push("canceled".into());
}
fn culled(&mut self, _tx: &SharedTransaction) {
self.0.borrow_mut().push("culled".into());
}
}
#[test]
fn insert_transaction() {
let b = TransactionBuilder::default();
let listener = MyListener::default();
let results = listener.0.clone();
let mut txq = Pool::new(listener, DummyScoring::default(), Options {
max_per_sender: 1,
max_count: 2,
..Default::default()
});
assert!(results.borrow().is_empty());
// Regular import
txq.import(b.tx().nonce(1).new()).unwrap();
assert_eq!(*results.borrow(), &["added"]);
// Already present (no notification)
txq.import(b.tx().nonce(1).new()).unwrap_err();
assert_eq!(*results.borrow(), &["added"]);
// Push out the first one
txq.import(b.tx().nonce(1).gas_price(1).new()).unwrap();
assert_eq!(*results.borrow(), &["added", "replaced"]);
// Reject
txq.import(b.tx().nonce(1).new()).unwrap_err();
assert_eq!(*results.borrow(), &["added", "replaced", "rejected"]);
results.borrow_mut().clear();
// Different sender (accept)
txq.import(b.tx().sender(1).nonce(1).gas_price(2).new()).unwrap();
assert_eq!(*results.borrow(), &["added"]);
// Third sender push out low gas price
txq.import(b.tx().sender(2).nonce(1).gas_price(4).new()).unwrap();
assert_eq!(*results.borrow(), &["added", "dropped", "added"]);
// Reject (too cheap)
txq.import(b.tx().sender(2).nonce(1).gas_price(2).new()).unwrap_err();
assert_eq!(*results.borrow(), &["added", "dropped", "added", "rejected"]);
assert_eq!(txq.light_status().transaction_count, 2);
}
#[test]
fn remove_transaction() {
let b = TransactionBuilder::default();
let listener = MyListener::default();
let results = listener.0.clone();
let mut txq = Pool::new(listener, DummyScoring::default(), Options::default());
// insert
let tx1 = txq.import(b.tx().nonce(1).new()).unwrap();
let tx2 = txq.import(b.tx().nonce(2).new()).unwrap();
// then
txq.remove(&tx1.hash(), false);
assert_eq!(*results.borrow(), &["added", "added", "canceled"]);
txq.remove(&tx2.hash(), true);
assert_eq!(*results.borrow(), &["added", "added", "canceled", "invalid"]);
assert_eq!(txq.light_status().transaction_count, 0);
}
#[test]
fn clear_queue() {
let b = TransactionBuilder::default();
let listener = MyListener::default();
let results = listener.0.clone();
let mut txq = Pool::new(listener, DummyScoring::default(), Options::default());
// insert
txq.import(b.tx().nonce(1).new()).unwrap();
txq.import(b.tx().nonce(2).new()).unwrap();
// when
txq.clear();
// then
assert_eq!(*results.borrow(), &["added", "added", "dropped", "dropped"]);
}
#[test]
fn cull_stalled() {
let b = TransactionBuilder::default();
let listener = MyListener::default();
let results = listener.0.clone();
let mut txq = Pool::new(listener, DummyScoring::default(), Options::default());
// insert
txq.import(b.tx().nonce(1).new()).unwrap();
txq.import(b.tx().nonce(2).new()).unwrap();
// when
txq.cull(None, NonceReady::new(3));
// then
assert_eq!(*results.borrow(), &["added", "added", "culled", "culled"]);
}
}

View File

@ -0,0 +1,64 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use super::{Transaction, U256, Address};
#[derive(Debug, Default, Clone)]
pub struct TransactionBuilder {
nonce: U256,
gas_price: U256,
gas: U256,
sender: Address,
mem_usage: usize,
}
impl TransactionBuilder {
pub fn tx(&self) -> Self {
self.clone()
}
pub fn nonce<T: Into<U256>>(mut self, nonce: T) -> Self {
self.nonce = nonce.into();
self
}
pub fn gas_price<T: Into<U256>>(mut self, gas_price: T) -> Self {
self.gas_price = gas_price.into();
self
}
pub fn sender<T: Into<Address>>(mut self, sender: T) -> Self {
self.sender = sender.into();
self
}
pub fn mem_usage(mut self, mem_usage: usize) -> Self {
self.mem_usage = mem_usage;
self
}
pub fn new(self) -> Transaction {
let hash = self.nonce ^ (U256::from(100) * self.gas_price) ^ (U256::from(100_000) * U256::from(self.sender.low_u64()));
Transaction {
hash: hash.into(),
nonce: self.nonce,
gas_price: self.gas_price,
gas: 21_000.into(),
sender: self.sender,
mem_usage: self.mem_usage,
}
}
}

View File

@ -0,0 +1,221 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::{fmt, mem};
use smallvec::SmallVec;
use ready::{Ready, Readiness};
use scoring::{self, Scoring};
use pool::Transaction;
#[derive(Debug)]
pub enum AddResult<T, S> {
Ok(T),
TooCheapToEnter(T, S),
TooCheap {
old: T,
new: T,
},
Replaced {
old: T,
new: T,
},
PushedOut {
old: T,
new: T,
},
}
/// Represents all transactions from a particular sender ordered by nonce.
const PER_SENDER: usize = 8;
#[derive(Debug)]
pub struct Transactions<T, S: Scoring<T>> {
// TODO [ToDr] Consider using something that doesn't require shifting all records.
transactions: SmallVec<[Transaction<T>; PER_SENDER]>,
scores: SmallVec<[S::Score; PER_SENDER]>,
}
impl<T, S: Scoring<T>> Default for Transactions<T, S> {
fn default() -> Self {
Transactions {
transactions: Default::default(),
scores: Default::default(),
}
}
}
impl<T: fmt::Debug, S: Scoring<T>> Transactions<T, S> {
pub fn is_empty(&self) -> bool {
self.transactions.is_empty()
}
pub fn len(&self) -> usize {
self.transactions.len()
}
pub fn iter(&self) -> ::std::slice::Iter<Transaction<T>> {
self.transactions.iter()
}
pub fn worst_and_best(&self) -> Option<((S::Score, Transaction<T>), (S::Score, Transaction<T>))> {
let len = self.scores.len();
self.scores.get(0).cloned().map(|best| {
let worst = self.scores[len - 1].clone();
let best_tx = self.transactions[0].clone();
let worst_tx = self.transactions[len - 1].clone();
((worst, worst_tx), (best, best_tx))
})
}
pub fn find_next(&self, tx: &T, scoring: &S) -> Option<(S::Score, Transaction<T>)> {
self.transactions.binary_search_by(|old| scoring.compare(old, &tx)).ok().and_then(|index| {
let index = index + 1;
if index < self.scores.len() {
Some((self.scores[index].clone(), self.transactions[index].clone()))
} else {
None
}
})
}
fn push_cheapest_transaction(&mut self, tx: Transaction<T>, scoring: &S, max_count: usize) -> AddResult<Transaction<T>, S::Score> {
let index = self.transactions.len();
if index == max_count && !scoring.should_ignore_sender_limit(&tx) {
let min_score = self.scores[index - 1].clone();
AddResult::TooCheapToEnter(tx, min_score)
} else {
self.transactions.push(tx.clone());
self.scores.push(Default::default());
scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::InsertedAt(index));
AddResult::Ok(tx)
}
}
pub fn update_scores(&mut self, scoring: &S, event: S::Event) {
scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::Event(event));
}
pub fn add(&mut self, new: Transaction<T>, scoring: &S, max_count: usize) -> AddResult<Transaction<T>, S::Score> {
let index = match self.transactions.binary_search_by(|old| scoring.compare(old, &new)) {
Ok(index) => index,
Err(index) => index,
};
// Insert at the end.
if index == self.transactions.len() {
return self.push_cheapest_transaction(new, scoring, max_count)
}
// Decide if the transaction should replace some other.
match scoring.choose(&self.transactions[index], &new) {
// New transaction should be rejected
scoring::Choice::RejectNew => AddResult::TooCheap {
old: self.transactions[index].clone(),
new,
},
// New transaction should be kept along with old ones.
scoring::Choice::InsertNew => {
self.transactions.insert(index, new.clone());
self.scores.insert(index, Default::default());
scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::InsertedAt(index));
if self.transactions.len() > max_count {
let old = self.transactions.pop().expect("len is non-zero");
self.scores.pop();
scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::RemovedAt(self.transactions.len()));
AddResult::PushedOut {
old,
new,
}
} else {
AddResult::Ok(new)
}
},
// New transaction is replacing some other transaction already in the queue.
scoring::Choice::ReplaceOld => {
let old = mem::replace(&mut self.transactions[index], new.clone());
scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::ReplacedAt(index));
AddResult::Replaced {
old,
new,
}
},
}
}
pub fn remove(&mut self, tx: &T, scoring: &S) -> bool {
let index = match self.transactions.binary_search_by(|old| scoring.compare(old, tx)) {
Ok(index) => index,
Err(_) => {
warn!("Attempting to remove non-existent transaction {:?}", tx);
return false;
},
};
self.transactions.remove(index);
self.scores.remove(index);
// Update scoring
scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::RemovedAt(index));
return true;
}
pub fn cull<R: Ready<T>>(&mut self, ready: &mut R, scoring: &S) -> SmallVec<[Transaction<T>; PER_SENDER]> {
let mut result = SmallVec::new();
if self.is_empty() {
return result;
}
let mut first_non_stalled = 0;
for tx in &self.transactions {
match ready.is_ready(tx) {
Readiness::Stale => {
first_non_stalled += 1;
},
Readiness::Ready | Readiness::Future => break,
}
}
if first_non_stalled == 0 {
return result;
}
// reverse the vectors to easily remove first elements.
self.transactions.reverse();
self.scores.reverse();
for _ in 0..first_non_stalled {
self.scores.pop();
result.push(
self.transactions.pop().expect("first_non_stalled is never greater than transactions.len(); qed")
);
}
self.transactions.reverse();
self.scores.reverse();
// update scoring
scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::Culled(result.len()));
// reverse the result to maintain correct order.
result.reverse();
result
}
}

View File

@ -0,0 +1,31 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use {VerifiedTransaction};
/// Transaction verification.
///
/// Verifier is responsible to decide if the transaction should even be considered for pool inclusion.
pub trait Verifier<U> {
/// Verification error.
type Error;
/// Verified transaction.
type VerifiedTransaction: VerifiedTransaction;
/// Verifies a `UnverifiedTransaction` and produces `VerifiedTransaction` instance.
fn verify_transaction(&self, tx: U) -> Result<Self::VerifiedTransaction, Self::Error>;
}