Merge pull request #170 from ethcore/relay_mempool_transactions

Relay verified transactions (when moving to the memory pool)
This commit is contained in:
Nikolay Volf 2016-11-22 19:54:24 +03:00 committed by GitHub
commit 1c643ad8e8
2 changed files with 104 additions and 25 deletions

View File

@ -490,8 +490,13 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
/// Process new transaction.
fn on_peer_transaction(&mut self, _peer_index: usize, transaction: Transaction) -> Option<VecDeque<(H256, Transaction)>> {
self.process_peer_transaction(transaction)
fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction) -> Option<VecDeque<(H256, Transaction)>> {
let transaction_hash = transaction.hash();
// remember that peer has this transaction
self.peers.on_transaction_received(peer_index, &transaction_hash);
self.process_peer_transaction(Some(peer_index), transaction_hash, transaction)
}
/// Peer disconnected.
@ -636,8 +641,9 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
}
// deal with block transactions
for (_, tx) in insert_result.transactions_to_reverify {
self.process_peer_transaction(tx);
for (hash, tx) in insert_result.transactions_to_reverify {
// TODO: transactions from this block will be relayed. Do we need this?
self.process_peer_transaction(None, hash, tx);
}
},
Err(db::Error::Consistency(e)) => {
@ -673,6 +679,8 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
/// Process successful transaction verification
fn on_transaction_verification_success(&mut self, transaction: Transaction) {
let hash = transaction.hash();
{
// insert transaction to the memory pool
let mut chain = self.chain.write();
@ -686,6 +694,10 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
chain.insert_verified_transaction(transaction);
}
// relay transaction to peers
self.relay_new_transactions(vec![hash]);
}
/// Process failed transaction verification
fn on_transaction_verification_error(&mut self, err: &str, hash: &H256) {
warn!(target: "sync", "Transaction {:?} verification failed with error {:?}", hash.to_reversed_str(), err);
@ -790,6 +802,26 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
}
/// Relay newly verified transactions to connected peers.
///
/// For each peer, builds an `inv` payload from `new_transactions_hashes`,
/// skipping hashes the peer is already known to have (as recorded via
/// `Peers::on_transaction_received`), and sends it only when non-empty.
fn relay_new_transactions(&self, new_transactions_hashes: Vec<H256>) {
let mut executor = self.executor.lock();
// TODO: use all peers here (currently sync only)
for peer_index in self.peers.all_peers() {
// Announce only the hashes this peer has not already sent/seen,
// to avoid echoing a transaction back to its originator.
let inventory: Vec<_> = new_transactions_hashes.iter()
.filter(|h| !self.peers.has_transaction_with_hash(peer_index, h))
.cloned()
.map(|h| InventoryVector {
inv_type: InventoryType::MessageTx,
hash: h,
})
.collect();
// Skip the message entirely when there is nothing new for this peer.
if !inventory.is_empty() {
executor.execute(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None));
}
}
}
/// Process new blocks inventory
fn process_new_blocks_headers(&mut self, peer_index: usize, mut hashes: Vec<H256>, mut headers: Vec<BlockHeader>) {
assert_eq!(hashes.len(), headers.len());
@ -925,14 +957,18 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
/// Process new peer transaction
fn process_peer_transaction(&mut self, transaction: Transaction) -> Option<VecDeque<(H256, Transaction)>> {
fn process_peer_transaction(&mut self, peer_index: Option<usize>, hash: H256, transaction: Transaction) -> Option<VecDeque<(H256, Transaction)>> {
// if we are in synchronization state, we will ignore this message
if self.state.is_synchronizing() {
return None;
}
// mark peer as useful (TODO: remove once self.all_peers() returns all peers, not only sync ones)
if let Some(peer_index) = peer_index {
self.peers.useful_peer(peer_index);
}
// else => verify transaction + it's orphans and then add to the memory pool
let hash = transaction.hash();
let mut chain = self.chain.write();
// if any parent transaction is unknown => we have orphan transaction => remember in orphan pool
@ -953,7 +989,6 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
for &(ref h, ref tx) in &transactons {
chain.verify_transaction(h.clone(), tx.clone());
}
Some(transactons)
}
@ -1835,4 +1870,21 @@ pub mod tests {
assert!(tasks.iter().any(|t| t == &Task::SendInventory(2, inventory.clone(), ServerTaskIndex::None)));
}
}
#[test]
// Verifies that a transaction accepted while the client is saturated (not
// synchronizing) is relayed via `inv`, and that the relay skips the peer the
// transaction came from: tx2 (received from peer 2) is announced to peer 1.
// NOTE(review): only tx2's announcement is asserted here — presumably tx1 is
// not yet verified/relayed at this point; confirm against the verifier flow.
fn relay_new_transaction_when_in_saturated_state() {
let (_, _, executor, _, sync) = create_sync(None, None);
let tx1: Transaction = test_data::TransactionBuilder::with_output(10).into();
let tx2: Transaction = test_data::TransactionBuilder::with_output(20).into();
let tx2_hash = tx2.hash();
let mut sync = sync.lock();
// feed transactions from two different peers
sync.on_peer_transaction(1, tx1);
sync.on_peer_transaction(2, tx2);
let tasks = { executor.lock().take_tasks() };
// expect exactly one inv: tx2 announced to peer 1 (never back to peer 2)
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx2_hash }];
assert_eq!(tasks, vec![Task::SendInventory(1, inventory, ServerTaskIndex::None)]);
}
}

View File

@ -8,6 +8,8 @@ use time::precise_time_s;
const MAX_PEER_FAILURES: usize = 2;
/// Max last blocks to store for given peer
const MAX_LAST_BLOCKS_TO_STORE: usize = 64;
/// Max last transactions to store for given peer
const MAX_LAST_TRANSACTIONS_TO_STORE: usize = 64;
/// Set of peers selected for synchronization.
#[derive(Debug)]
@ -26,8 +28,10 @@ pub struct Peers {
inventory_requests: HashSet<usize>,
/// Last inventory message time from peer.
inventory_requests_order: LinkedHashMap<usize, f64>,
/// Last blocks from peer
last_block_responses: HashMap<usize, LinkedHashMap<H256, ()>>,
/// Last blocks from peer.
last_blocks: HashMap<usize, LinkedHashMap<H256, ()>>,
/// Last transactions from peer.
last_transactions: HashMap<usize, LinkedHashMap<H256, ()>>,
}
/// Information on synchronization peers
@ -52,7 +56,8 @@ impl Peers {
blocks_requests_order: LinkedHashMap::new(),
inventory_requests: HashSet::new(),
inventory_requests_order: LinkedHashMap::new(),
last_block_responses: HashMap::new(),
last_blocks: HashMap::new(),
last_transactions: HashMap::new(),
}
}
@ -140,7 +145,15 @@ impl Peers {
/// True if peer already has block with this hash
pub fn has_block_with_hash(&self, peer_index: usize, hash: &H256) -> bool {
self.last_block_responses
self.last_blocks
.get(&peer_index)
.map(|h| h.contains_key(hash))
.unwrap_or(false)
}
/// True if peer already has transaction with this hash
pub fn has_transaction_with_hash(&self, peer_index: usize, hash: &H256) -> bool {
self.last_transactions
.get(&peer_index)
.map(|h| h.contains_key(hash))
.unwrap_or(false)
@ -183,7 +196,8 @@ impl Peers {
self.blocks_requests_order.remove(&peer_index);
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
self.last_block_responses.remove(&peer_index);
self.last_blocks.remove(&peer_index);
self.last_transactions.remove(&peer_index);
peer_blocks_requests
.map(|hs| hs.into_iter().collect())
}
@ -213,12 +227,25 @@ impl Peers {
// TODO: add test for it
// remember that peer knows about this block
let last_block_responses_entry = self.last_block_responses.entry(peer_index).or_insert_with(LinkedHashMap::default);
if !last_block_responses_entry.contains_key(block_hash) {
if last_block_responses_entry.len() == MAX_LAST_BLOCKS_TO_STORE {
last_block_responses_entry.pop_front();
let last_blocks_entry = self.last_blocks.entry(peer_index).or_insert_with(LinkedHashMap::default);
if !last_blocks_entry.contains_key(block_hash) {
if last_blocks_entry.len() == MAX_LAST_BLOCKS_TO_STORE {
last_blocks_entry.pop_front();
}
last_block_responses_entry.insert(block_hash.clone(), ());
last_blocks_entry.insert(block_hash.clone(), ());
}
}
/// Transaction is received from peer.
///
/// Records `transaction_hash` in the peer's bounded last-transactions set so
/// later relays can skip announcing it back to this peer
/// (see `has_transaction_with_hash`). The set is an LRU-style
/// `LinkedHashMap` capped at `MAX_LAST_TRANSACTIONS_TO_STORE`: when full,
/// the oldest entry is evicted before inserting the new hash.
pub fn on_transaction_received(&mut self, peer_index: usize, transaction_hash: &H256) {
// TODO: add test for it
// remember that peer knows about this transaction
let last_transactions_entry = self.last_transactions.entry(peer_index).or_insert_with(LinkedHashMap::default);
if !last_transactions_entry.contains_key(transaction_hash) {
// evict the oldest remembered hash once the per-peer cap is reached
if last_transactions_entry.len() == MAX_LAST_TRANSACTIONS_TO_STORE {
last_transactions_entry.pop_front();
}
last_transactions_entry.insert(transaction_hash.clone(), ());
}
}