Never drop local transactions from different senders. (#9002)

This commit is contained in:
Tomasz Drwięga 2018-07-03 11:36:59 +02:00 committed by Afri Schoedon
parent 758a7bea9d
commit e22c3470d6
4 changed files with 114 additions and 27 deletions

View File

@ -22,7 +22,7 @@ use error;
use listener::{Listener, NoopListener};
use options::Options;
use ready::{Ready, Readiness};
use scoring::{Scoring, ScoreWithRef};
use scoring::{self, Scoring, ScoreWithRef};
use status::{LightStatus, Status};
use transactions::{AddResult, Transactions};
@ -139,7 +139,7 @@ impl<T, S, L> Pool<T, S, L> where
ensure!(!self.by_hash.contains_key(transaction.hash()), error::ErrorKind::AlreadyImported(format!("{:?}", transaction.hash())));
self.insertion_id += 1;
let mut transaction = Transaction {
let transaction = Transaction {
insertion_id: self.insertion_id,
transaction: Arc::new(transaction),
};
@ -148,27 +148,32 @@ impl<T, S, L> Pool<T, S, L> where
// Avoid using should_replace, but rather use scoring for that.
{
let remove_worst = |s: &mut Self, transaction| {
match s.remove_worst(&transaction) {
match s.remove_worst(transaction) {
Err(err) => {
s.listener.rejected(&transaction, err.kind());
s.listener.rejected(transaction, err.kind());
Err(err)
},
Ok(removed) => {
s.listener.dropped(&removed, Some(&transaction));
Ok(None) => Ok(false),
Ok(Some(removed)) => {
s.listener.dropped(&removed, Some(transaction));
s.finalize_remove(removed.hash());
Ok(transaction)
Ok(true)
},
}
};
while self.by_hash.len() + 1 > self.options.max_count {
trace!("Count limit reached: {} > {}", self.by_hash.len() + 1, self.options.max_count);
transaction = remove_worst(self, transaction)?;
if !remove_worst(self, &transaction)? {
break;
}
}
while self.mem_usage + mem_usage > self.options.max_mem_usage {
trace!("Mem limit reached: {} > {}", self.mem_usage + mem_usage, self.options.max_mem_usage);
transaction = remove_worst(self, transaction)?;
if !remove_worst(self, &transaction)? {
break;
}
}
}
@ -273,28 +278,38 @@ impl<T, S, L> Pool<T, S, L> where
}
/// Attempts to remove the worst transaction from the pool if it's worse than the given one.
fn remove_worst(&mut self, transaction: &Transaction<T>) -> error::Result<Transaction<T>> {
///
/// Returns `None` in case we couldn't decide if the transaction should replace the worst transaction or not.
/// In such case we will accept the transaction even though it is going to exceed the limit.
fn remove_worst(&mut self, transaction: &Transaction<T>) -> error::Result<Option<Transaction<T>>> {
let to_remove = match self.worst_transactions.iter().next_back() {
// No elements to remove? and the pool is still full?
None => {
warn!("The pool is full but there are no transactions to remove.");
return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), "unknown".into()).into());
},
Some(old) => if self.scoring.should_replace(&old.transaction, transaction) {
Some(old) => match self.scoring.should_replace(&old.transaction, transaction) {
// We can't decide which of them should be removed, so accept both.
scoring::Choice::InsertNew => None,
// New transaction is better than the worst one so we can replace it.
old.clone()
} else {
scoring::Choice::ReplaceOld => Some(old.clone()),
// otherwise fail
return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), format!("{:?}", old.score)).into())
scoring::Choice::RejectNew => {
return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), format!("{:?}", old.score)).into())
},
},
};
// Remove from transaction set
self.remove_from_set(to_remove.transaction.sender(), |set, scoring| {
set.remove(&to_remove.transaction, scoring)
});
if let Some(to_remove) = to_remove {
// Remove from transaction set
self.remove_from_set(to_remove.transaction.sender(), |set, scoring| {
set.remove(&to_remove.transaction, scoring)
});
Ok(to_remove.transaction)
Ok(Some(to_remove.transaction))
} else {
Ok(None)
}
}
/// Removes transaction from sender's transaction `HashMap`.

View File

@ -99,7 +99,9 @@ pub trait Scoring<T>: fmt::Debug {
fn update_scores(&self, txs: &[Transaction<T>], scores: &mut [Self::Score], change: Change<Self::Event>);
/// Decides if `new` should push out `old` transaction from the pool.
fn should_replace(&self, old: &T, new: &T) -> bool;
///
/// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits.
fn should_replace(&self, old: &T, new: &T) -> Choice;
}
/// A score with a reference to the transaction.

View File

@ -22,7 +22,17 @@ use {pool, scoring, Scoring, Ready, Readiness};
use super::Transaction;
#[derive(Debug, Default)]
pub struct DummyScoring;
pub struct DummyScoring {
always_insert: bool,
}
impl DummyScoring {
pub fn always_insert() -> Self {
DummyScoring {
always_insert: true,
}
}
}
impl Scoring<Transaction> for DummyScoring {
type Score = U256;
@ -58,8 +68,14 @@ impl Scoring<Transaction> for DummyScoring {
}
}
fn should_replace(&self, old: &Transaction, new: &Transaction) -> bool {
new.gas_price > old.gas_price
fn should_replace(&self, old: &Transaction, new: &Transaction) -> scoring::Choice {
if self.always_insert {
scoring::Choice::InsertNew
} else if new.gas_price > old.gas_price {
scoring::Choice::ReplaceOld
} else {
scoring::Choice::RejectNew
}
}
}

View File

@ -537,6 +537,60 @@ fn should_return_is_full() {
assert!(txq.is_full());
}
#[test]
fn should_import_even_if_limit_is_reached_and_should_replace_returns_insert_new() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::with_scoring(DummyScoring::always_insert(), Options {
max_count: 1,
..Default::default()
});
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 1,
senders: 1,
mem_usage: 0,
});
// when
txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap();
// then
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 2,
senders: 1,
mem_usage: 0,
});
}
#[test]
fn should_not_import_even_if_limit_is_reached_and_should_replace_returns_false() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::with_scoring(DummyScoring::default(), Options {
max_count: 1,
..Default::default()
});
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 1,
senders: 1,
mem_usage: 0,
});
// when
let err = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap_err();
// then
assert_eq!(err.kind(),
&error::ErrorKind::TooCheapToEnter("0x00000000000000000000000000000000000000000000000000000000000001f5".into(), "0x5".into()));
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 1,
senders: 1,
mem_usage: 0,
});
}
mod listener {
use std::cell::RefCell;
use std::rc::Rc;
@ -577,7 +631,7 @@ mod listener {
let b = TransactionBuilder::default();
let listener = MyListener::default();
let results = listener.0.clone();
let mut txq = Pool::new(listener, DummyScoring, Options {
let mut txq = Pool::new(listener, DummyScoring::default(), Options {
max_per_sender: 1,
max_count: 2,
..Default::default()
@ -615,7 +669,7 @@ mod listener {
let b = TransactionBuilder::default();
let listener = MyListener::default();
let results = listener.0.clone();
let mut txq = Pool::new(listener, DummyScoring, Options::default());
let mut txq = Pool::new(listener, DummyScoring::default(), Options::default());
// insert
let tx1 = txq.import(b.tx().nonce(1).new()).unwrap();
@ -634,7 +688,7 @@ mod listener {
let b = TransactionBuilder::default();
let listener = MyListener::default();
let results = listener.0.clone();
let mut txq = Pool::new(listener, DummyScoring, Options::default());
let mut txq = Pool::new(listener, DummyScoring::default(), Options::default());
// insert
txq.import(b.tx().nonce(1).new()).unwrap();
@ -652,7 +706,7 @@ mod listener {
let b = TransactionBuilder::default();
let listener = MyListener::default();
let results = listener.0.clone();
let mut txq = Pool::new(listener, DummyScoring, Options::default());
let mut txq = Pool::new(listener, DummyScoring::default(), Options::default());
// insert
txq.import(b.tx().nonce(1).new()).unwrap();