ConnectionFilter connected to Client

Svyatoslav Nikolsky 2016-11-23 13:48:36 +03:00
parent f0e4437eb6
commit 0cc9124fbb
4 changed files with 248 additions and 99 deletions

View File

@@ -1,14 +1,18 @@
#![allow(dead_code)] // TODO: remove after connecting with Client
use linked_hash_map::LinkedHashMap;
use bit_vec::BitVec;
use murmur3::murmur3_32;
use chain::{Transaction, OutPoint};
use ser::serialize;
use message::types;
use primitives::hash::H256;
use script::Script;
/// Constant optimized to create large differences in the seed for different values of `hash_functions_num`.
const SEED_OFFSET: u32 = 0xFBA4C795;
/// Max last blocks to store for given peer
const MAX_LAST_BLOCKS_TO_STORE: usize = 64;
/// Max last transactions to store for given peer
const MAX_LAST_TRANSACTIONS_TO_STORE: usize = 64;
/// Filter, which controls data relayed over connection.
#[derive(Debug)]
@@ -17,6 +21,10 @@ pub struct ConnectionFilter {
bloom: Option<ConnectionBloom>,
/// Filter update type.
filter_flags: types::FilterFlags,
/// Last blocks from peer.
last_blocks: LinkedHashMap<H256, ()>,
/// Last transactions from peer.
last_transactions: LinkedHashMap<H256, ()>,
}
/// Connection bloom filter
@@ -35,6 +43,8 @@ impl Default for ConnectionFilter {
ConnectionFilter {
bloom: None,
filter_flags: types::FilterFlags::None,
last_blocks: LinkedHashMap::new(),
last_transactions: LinkedHashMap::new(),
}
}
}
@@ -46,17 +56,56 @@ impl ConnectionFilter {
ConnectionFilter {
bloom: Some(ConnectionBloom::new(message)),
filter_flags: message.flags,
last_blocks: LinkedHashMap::new(),
last_transactions: LinkedHashMap::new(),
}
}
/// Check if transaction is matched && update filter
pub fn match_update_transaction(&mut self, transaction: &Transaction) -> bool {
/// Record that the block with the given hash is known to this connection
pub fn known_block(&mut self, block_hash: &H256) {
// TODO: add test for it
// remember that peer knows about this block
if !self.last_blocks.contains_key(block_hash) {
if self.last_blocks.len() == MAX_LAST_BLOCKS_TO_STORE {
self.last_blocks.pop_front();
}
self.last_blocks.insert(block_hash.clone(), ());
}
}
/// Record that the transaction with the given hash is known to this connection
pub fn known_transaction(&mut self, transaction_hash: &H256) {
// TODO: add test for it
// remember that peer knows about this transaction
if !self.last_transactions.contains_key(transaction_hash) {
if self.last_transactions.len() == MAX_LAST_TRANSACTIONS_TO_STORE {
self.last_transactions.pop_front();
}
self.last_transactions.insert(transaction_hash.clone(), ());
}
}
/// Check if block should be sent to this connection
pub fn filter_block(&self, block_hash: &H256) -> bool {
// check if block is known
!self.last_blocks.contains_key(block_hash)
}
/// Check if transaction should be sent to this connection && optionally update filter
pub fn filter_transaction(&mut self, transaction_hash: &H256, transaction: &Transaction) -> bool {
// check if transaction is known
if self.last_transactions.contains_key(transaction_hash) {
return false;
}
// check with bloom filter, if set
match self.bloom {
// if no filter is set for the connection => match everything
None => true,
// filter using bloom filter, then update
Some(ref mut bloom) => {
let transaction_hash = transaction.hash();
let mut is_match = false;
// match if filter contains any arbitrary script data element in any scriptPubKey in tx
@@ -84,7 +133,7 @@ impl ConnectionFilter {
}
// match if filter contains transaction itself
if bloom.contains(&*transaction_hash) {
if bloom.contains(&**transaction_hash) {
return true;
}
@@ -168,7 +217,7 @@ impl ConnectionBloom {
}
#[cfg(test)]
mod tests {
pub mod tests {
use std::iter::{Iterator, repeat};
use test_data;
use message::types;
@@ -178,7 +227,7 @@ mod tests {
use ser::serialize;
use super::{ConnectionFilter, ConnectionBloom};
fn default_filterload() -> types::FilterLoad {
pub fn default_filterload() -> types::FilterLoad {
types::FilterLoad {
filter: Bytes::from(repeat(0u8).take(1024).collect::<Vec<_>>()),
hash_functions: 10,
@@ -187,7 +236,7 @@ mod tests {
}
}
fn make_filteradd(data: &[u8]) -> types::FilterAdd {
pub fn make_filteradd(data: &[u8]) -> types::FilterAdd {
types::FilterAdd {
data: data.into(),
}
@@ -210,13 +259,13 @@ mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.match_update_transaction(&tx1));
assert!(!filter.match_update_transaction(&tx2));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
filter.add(&make_filteradd(&*tx1.hash()));
assert!(filter.match_update_transaction(&tx1));
assert!(!filter.match_update_transaction(&tx2));
assert!(filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
}
#[test]
@@ -229,50 +278,49 @@ mod tests {
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.match_update_transaction(&tx1));
assert!(!filter.match_update_transaction(&tx2));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
filter.add(&make_filteradd(&tx1_out_data));
assert!(filter.match_update_transaction(&tx1));
assert!(!filter.match_update_transaction(&tx2));
assert!(filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
}
#[test]
fn connection_filter_matches_transaction_by_previous_output_point() {
// https://webbtc.com/tx/eb3b82c0884e3efa6d8b0be55b4915eb20be124c9766245bcc7f34fdac32bccb
// output script: OP_DUP OP_HASH160 380cb3c594de4e7e9b8e18db182987bebb5a4f70 OP_EQUALVERIFY OP_CHECKSIG
let tx1: Transaction = "01000000024de8b0c4c2582db95fa6b3567a989b664484c7ad6672c85a3da413773e63fdb8000000006b48304502205b282fbc9b064f3bc823a23edcc0048cbb174754e7aa742e3c9f483ebe02911c022100e4b0b3a117d36cab5a67404dddbf43db7bea3c1530e0fe128ebc15621bd69a3b0121035aa98d5f77cd9a2d88710e6fc66212aff820026f0dad8f32d1f7ce87457dde50ffffffff4de8b0c4c2582db95fa6b3567a989b664484c7ad6672c85a3da413773e63fdb8010000006f004730440220276d6dad3defa37b5f81add3992d510d2f44a317fd85e04f93a1e2daea64660202200f862a0da684249322ceb8ed842fb8c859c0cb94c81e1c5308b4868157a428ee01ab51210232abdc893e7f0631364d7fd01cb33d24da45329a00357b3a7886211ab414d55a51aeffffffff02e0fd1c00000000001976a914380cb3c594de4e7e9b8e18db182987bebb5a4f7088acc0c62d000000000017142a9bc5447d664c1d0141392a842d23dba45c4f13b17500000000".into();
let tx1_previous_output: Bytes = serialize(&tx1.inputs[0].previous_output);
let tx2 = Transaction::default();
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.match_update_transaction(&tx1));
assert!(!filter.match_update_transaction(&tx2));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
filter.add(&make_filteradd(&tx1_previous_output));
assert!(filter.match_update_transaction(&tx1));
assert!(!filter.match_update_transaction(&tx2));
assert!(filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
}
#[test]
fn connection_filter_matches_transaction_by_input_script_data_element() {
// https://webbtc.com/tx/eb3b82c0884e3efa6d8b0be55b4915eb20be124c9766245bcc7f34fdac32bccb
// output script: OP_DUP OP_HASH160 380cb3c594de4e7e9b8e18db182987bebb5a4f70 OP_EQUALVERIFY OP_CHECKSIG
// input script: PUSH DATA 304502205b282fbc9b064f3bc823a23edcc0048cbb174754e7aa742e3c9f483ebe02911c022100e4b0b3a117d36cab5a67404dddbf43db7bea3c1530e0fe128ebc15621bd69a3b01
let tx1: Transaction = "01000000024de8b0c4c2582db95fa6b3567a989b664484c7ad6672c85a3da413773e63fdb8000000006b48304502205b282fbc9b064f3bc823a23edcc0048cbb174754e7aa742e3c9f483ebe02911c022100e4b0b3a117d36cab5a67404dddbf43db7bea3c1530e0fe128ebc15621bd69a3b0121035aa98d5f77cd9a2d88710e6fc66212aff820026f0dad8f32d1f7ce87457dde50ffffffff4de8b0c4c2582db95fa6b3567a989b664484c7ad6672c85a3da413773e63fdb8010000006f004730440220276d6dad3defa37b5f81add3992d510d2f44a317fd85e04f93a1e2daea64660202200f862a0da684249322ceb8ed842fb8c859c0cb94c81e1c5308b4868157a428ee01ab51210232abdc893e7f0631364d7fd01cb33d24da45329a00357b3a7886211ab414d55a51aeffffffff02e0fd1c00000000001976a914380cb3c594de4e7e9b8e18db182987bebb5a4f7088acc0c62d000000000017142a9bc5447d664c1d0141392a842d23dba45c4f13b17500000000".into();
let tx1_input_data: Bytes = "304502205b282fbc9b064f3bc823a23edcc0048cbb174754e7aa742e3c9f483ebe02911c022100e4b0b3a117d36cab5a67404dddbf43db7bea3c1530e0fe128ebc15621bd69a3b01".into();
let tx2 = Transaction::default();
let mut filter = ConnectionFilter::with_filterload(&default_filterload());
assert!(!filter.match_update_transaction(&tx1));
assert!(!filter.match_update_transaction(&tx2));
assert!(!filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
filter.add(&make_filteradd(&tx1_input_data));
assert!(filter.match_update_transaction(&tx1));
assert!(!filter.match_update_transaction(&tx2));
assert!(filter.filter_transaction(&tx1.hash(), &tx1));
assert!(!filter.filter_transaction(&tx2.hash(), &tx2));
}
}
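
For reference, the bloom filter above follows the BIP 37 hashing scheme: each of the filter's hash functions runs MurmurHash3 (x86, 32-bit) over the data with seed `hash_function_index * SEED_OFFSET + tweak`, and the result modulo the filter's bit length selects the bit to set or test. The sketch below is a self-contained illustration of that scheme; `murmur3_32` here is an inlined reference implementation standing in for the `murmur3` crate used by the code, and `bloom_bit_index` is an illustrative helper, not part of the codebase.

/// Reference MurmurHash3 (x86, 32-bit); stands in for the `murmur3` crate.
fn murmur3_32(data: &[u8], seed: u32) -> u32 {
    const C1: u32 = 0xcc9e_2d51;
    const C2: u32 = 0x1b87_3593;
    let mut h = seed;
    let chunks = data.chunks_exact(4);
    let tail = chunks.remainder();
    for chunk in chunks {
        let mut k = u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
        k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2);
        h = (h ^ k).rotate_left(13).wrapping_mul(5).wrapping_add(0xe654_6b64);
    }
    if !tail.is_empty() {
        let mut k = 0u32;
        for (i, &b) in tail.iter().enumerate() {
            k |= (b as u32) << (8 * i);
        }
        k = k.wrapping_mul(C1).rotate_left(15).wrapping_mul(C2);
        h ^= k;
    }
    h ^= data.len() as u32;
    h ^= h >> 16;
    h = h.wrapping_mul(0x85eb_ca6b);
    h ^= h >> 13;
    h = h.wrapping_mul(0xc2b2_ae35);
    h ^ (h >> 16)
}

/// Bit index touched by hash function `fn_index` for `data` (BIP 37 seeding).
fn bloom_bit_index(data: &[u8], fn_index: u32, tweak: u32, filter_bits: usize) -> usize {
    const SEED_OFFSET: u32 = 0xFBA4C795;
    let seed = fn_index.wrapping_mul(SEED_OFFSET).wrapping_add(tweak);
    murmur3_32(data, seed) as usize % filter_bits
}

fn main() {
    // A 1024-byte filter, as in default_filterload above, has 8192 bits.
    let tx_hash = [0xabu8; 32];
    for fn_index in 0..3 {
        println!("hash fn {} -> bit {}", fn_index, bloom_bit_index(&tx_hash, fn_index, 5, 1024 * 8));
    }
}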

View File

@@ -58,6 +58,7 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
let peer_index = self.peer_counter.fetch_add(1, Ordering::SeqCst) + 1;
trace!(target: "sync", "Creating new sync session with peer#{}", peer_index);
self.client.lock().on_peer_connected(peer_index);
self.executor.lock().add_peer_connection(peer_index, outbound_connection);
peer_index
}
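
A small aside on the `fetch_add(1, Ordering::SeqCst) + 1` expression above: `fetch_add` returns the counter's previous value, so the `+ 1` makes peer indexes start at 1. A minimal sketch:

use std::sync::atomic::{AtomicUsize, Ordering};

// Sketch of how the 1-based peer indexes above are produced: fetch_add
// returns the previous counter value, so "+ 1" makes the first peer #1.
fn main() {
    let peer_counter = AtomicUsize::new(0);
    for _ in 0..3 {
        let peer_index = peer_counter.fetch_add(1, Ordering::SeqCst) + 1;
        println!("creating new sync session with peer#{}", peer_index);
    }
}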

View File

@@ -9,6 +9,7 @@ use tokio_core::reactor::{Handle, Interval};
use futures_cpupool::CpuPool;
use db;
use chain::{Block, BlockHeader, Transaction};
use message::types;
use message::common::{InventoryVector, InventoryType};
use primitives::hash::H256;
use synchronization_peers::Peers;
@@ -182,12 +183,16 @@ pub struct Information {
pub trait Client : Send + 'static {
fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State;
fn on_peer_connected(&mut self, peer_index: usize);
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>);
fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>);
fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_peer_block(&mut self, peer_index: usize, block: Block);
fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction);
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
fn on_peer_filterclear(&mut self, peer_index: usize);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
}
@@ -196,12 +201,16 @@ pub trait Client : Send + 'static {
pub trait ClientCore : VerificationSink {
fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State;
fn on_peer_connected(&mut self, peer_index: usize);
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>);
fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>);
fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_peer_block(&mut self, peer_index: usize, block: Block) -> Option<VecDeque<(H256, Block)>>;
fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction) -> Option<VecDeque<(H256, Transaction)>>;
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
fn on_peer_filterclear(&mut self, peer_index: usize);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>);
@@ -287,6 +296,10 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
self.core.lock().state()
}
fn on_peer_connected(&mut self, peer_index: usize) {
self.core.lock().on_peer_connected(peer_index);
}
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>) {
self.core.lock().on_new_blocks_inventory(peer_index, blocks_hashes)
}
@@ -332,6 +345,18 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
}
}
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad) {
self.core.lock().on_peer_filterload(peer_index, message);
}
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd) {
self.core.lock().on_peer_filteradd(peer_index, message);
}
fn on_peer_filterclear(&mut self, peer_index: usize) {
self.core.lock().on_peer_filterclear(peer_index);
}
fn on_peer_disconnected(&mut self, peer_index: usize) {
self.core.lock().on_peer_disconnected(peer_index);
}
@@ -370,6 +395,12 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
self.state
}
/// Called when new peer connection is established
fn on_peer_connected(&mut self, peer_index: usize) {
// peer is not useful until it responds with a `headers` message
self.peers.unuseful_peer(peer_index);
}
/// Try to queue synchronization of unknown blocks when new inventory is received.
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>) {
// we use headers-first synchronization
@@ -499,6 +530,27 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
self.process_peer_transaction(Some(peer_index), transaction_hash, transaction)
}
/// Peer wants to set bloom filter for the connection
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad) {
if self.peers.is_known_peer(peer_index) {
self.peers.filter_mut(peer_index).load(message);
}
}
/// Peer wants to update bloom filter for the connection
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd) {
if self.peers.is_known_peer(peer_index) {
self.peers.filter_mut(peer_index).add(message);
}
}
/// Peer wants to remove bloom filter for the connection
fn on_peer_filterclear(&mut self, peer_index: usize) {
if self.peers.is_known_peer(peer_index) {
self.peers.filter_mut(peer_index).clear();
}
}
/// Peer disconnected.
fn on_peer_disconnected(&mut self, peer_index: usize) {
// when last peer is disconnected, reset, but let verifying blocks be verified
@@ -691,11 +743,11 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
}
// transaction was in verification queue => insert to memory pool
chain.insert_verified_transaction(transaction);
chain.insert_verified_transaction(transaction.clone());
}
// relay transaction to peers
self.relay_new_transactions(vec![hash]);
self.relay_new_transactions(vec![(hash, &transaction)]);
}
/// Process failed transaction verification
@@ -783,42 +835,52 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
/// Relay new blocks
fn relay_new_blocks(&self, new_blocks_hashes: Vec<H256>) {
fn relay_new_blocks(&mut self, new_blocks_hashes: Vec<H256>) {
let tasks: Vec<_> = self.peers.all_peers().into_iter()
.filter_map(|peer_index| {
let inventory: Vec<_> = new_blocks_hashes.iter()
.filter(|h| self.peers.filter(peer_index).filter_block(h))
.map(|h| InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: h.clone(),
})
.collect();
if !inventory.is_empty() {
Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None))
} else {
None
}
})
.collect();
let mut executor = self.executor.lock();
// TODO: use all peers here (currently sync only)
// TODO: send `headers` if peer has not send `sendheaders` command
for peer_index in self.peers.all_peers() {
let inventory: Vec<_> = new_blocks_hashes.iter()
.filter(|h| !self.peers.has_block_with_hash(peer_index, h))
.cloned()
.map(|h| InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: h,
})
.collect();
if !inventory.is_empty() {
executor.execute(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None));
}
for task in tasks {
executor.execute(task);
}
}
/// Relay new transactions
fn relay_new_transactions(&self, new_transactions_hashes: Vec<H256>) {
fn relay_new_transactions(&mut self, new_transactions: Vec<(H256, &Transaction)>) {
let tasks: Vec<_> = self.peers.all_peers().into_iter()
.filter_map(|peer_index| {
let inventory: Vec<_> = new_transactions.iter()
.filter(|&&(ref h, tx)| self.peers.filter_mut(peer_index).filter_transaction(&h, tx))
.map(|&(ref h, _)| InventoryVector {
inv_type: InventoryType::MessageTx,
hash: h.clone(),
})
.collect();
if !inventory.is_empty() {
Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None))
} else {
None
}
})
.collect();
let mut executor = self.executor.lock();
// TODO: use all peers here (currently sync only)
for peer_index in self.peers.all_peers() {
let inventory: Vec<_> = new_transactions_hashes.iter()
.filter(|h| !self.peers.has_transaction_with_hash(peer_index, h))
.cloned()
.map(|h| InventoryVector {
inv_type: InventoryType::MessageTx,
hash: h,
})
.collect();
if !inventory.is_empty() {
executor.execute(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None));
}
for task in tasks {
executor.execute(task);
}
}
@@ -1111,6 +1173,7 @@ pub mod tests {
use chain::{Block, Transaction};
use message::common::{InventoryVector, InventoryType};
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore};
use connection_filter::tests::*;
use synchronization_executor::Task;
use synchronization_chain::{Chain, ChainRef};
use synchronization_executor::tests::DummyTaskExecutor;
@@ -1887,4 +1950,59 @@ pub mod tests {
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx2_hash }];
assert_eq!(tasks, vec![Task::SendInventory(1, inventory, ServerTaskIndex::None)]);
}
#[test]
fn relay_new_transaction_with_bloom_filter() {
let (_, _, executor, _, sync) = create_sync(None, None);
let tx1: Transaction = test_data::TransactionBuilder::with_output(10).into();
let tx2: Transaction = test_data::TransactionBuilder::with_output(20).into();
let tx3: Transaction = test_data::TransactionBuilder::with_output(30).into();
let tx1_hash = tx1.hash();
let tx2_hash = tx2.hash();
let tx3_hash = tx3.hash();
let mut sync = sync.lock();
// peer#1 wants tx1
sync.on_peer_connected(1);
sync.on_peer_filterload(1, &default_filterload());
sync.on_peer_filteradd(1, &make_filteradd(&*tx1_hash));
// peer#2 wants tx2
sync.on_peer_connected(2);
sync.on_peer_filterload(2, &default_filterload());
sync.on_peer_filteradd(2, &make_filteradd(&*tx2_hash));
// peer#3 wants tx1 + tx2 transactions
sync.on_peer_connected(3);
sync.on_peer_filterload(3, &default_filterload());
sync.on_peer_filteradd(3, &make_filteradd(&*tx1_hash));
sync.on_peer_filteradd(3, &make_filteradd(&*tx2_hash));
// peer#4 has default behaviour (no filter)
sync.on_peer_connected(4);
// peer#5 wants some other transactions
sync.on_peer_connected(5);
sync.on_peer_filterload(5, &default_filterload());
sync.on_peer_filteradd(5, &make_filteradd(&*tx3_hash));
// tx1 is relayed to peers: 1, 3, 4
sync.on_peer_transaction(6, tx1);
let tasks = { executor.lock().take_tasks() };
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx1_hash }];
assert_eq!(tasks, vec![
Task::SendInventory(1, inventory.clone(), ServerTaskIndex::None),
Task::SendInventory(3, inventory.clone(), ServerTaskIndex::None),
Task::SendInventory(4, inventory.clone(), ServerTaskIndex::None),
]);
// tx2 is relayed to peers: 2, 3, 4
sync.on_peer_transaction(6, tx2);
let tasks = { executor.lock().take_tasks() };
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: tx2_hash }];
assert_eq!(tasks, vec![
Task::SendInventory(2, inventory.clone(), ServerTaskIndex::None),
Task::SendInventory(3, inventory.clone(), ServerTaskIndex::None),
Task::SendInventory(4, inventory.clone(), ServerTaskIndex::None),
]);
}
}
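
The relay logic exercised by this test can be summarized as: for every connected peer, ask its ConnectionFilter whether each new transaction should be sent, build a `SendInventory` task from the hashes that pass, skip peers whose inventory ends up empty, and only then hand the collected tasks to the executor. Below is a simplified, self-contained sketch of that fan-out using stand-in types (`PeerFilter`, `Task`, string hashes), not the sync crate's own:

use std::collections::BTreeMap;

// Simplified sketch of the fan-out in relay_new_transactions: consult each
// peer's filter first, collect the resulting tasks, then execute them in a
// second pass.
#[derive(Debug)]
enum Task {
    SendInventory(usize, Vec<String>),
}

struct PeerFilter {
    wanted: Vec<String>, // stands in for the bloom filter; empty => relay everything
}

impl PeerFilter {
    fn filter_transaction(&self, tx_hash: &str) -> bool {
        self.wanted.is_empty() || self.wanted.iter().any(|w| w == tx_hash)
    }
}

fn relay_tasks(new_transactions: &[&str], filters: &BTreeMap<usize, PeerFilter>) -> Vec<Task> {
    filters
        .iter()
        .filter_map(|(&peer_index, filter)| {
            let inventory: Vec<String> = new_transactions
                .iter()
                .filter(|tx| filter.filter_transaction(tx))
                .map(|tx| tx.to_string())
                .collect();
            if inventory.is_empty() {
                None
            } else {
                Some(Task::SendInventory(peer_index, inventory))
            }
        })
        .collect()
}

fn main() {
    let mut filters = BTreeMap::new();
    filters.insert(1, PeerFilter { wanted: vec!["tx1".into()] });
    filters.insert(4, PeerFilter { wanted: vec![] }); // peer#4: no filter loaded
    // tx1 goes to peers 1 and 4; tx2 only to peer 4, as in the test above.
    println!("{:?}", relay_tasks(&["tx1", "tx2"], &filters));
}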

View File

@@ -3,13 +3,10 @@ use std::collections::hash_map::Entry;
use primitives::hash::H256;
use linked_hash_map::LinkedHashMap;
use time::precise_time_s;
use connection_filter::ConnectionFilter;
/// Max peer failures # before excluding from sync process
const MAX_PEER_FAILURES: usize = 2;
/// Max last blocks to store for given peer
const MAX_LAST_BLOCKS_TO_STORE: usize = 64;
/// Max last transactions to store for given peer
const MAX_LAST_TRANSACTIONS_TO_STORE: usize = 64;
/// Set of peers selected for synchronization.
#[derive(Debug)]
@@ -28,10 +25,8 @@ pub struct Peers {
inventory_requests: HashSet<usize>,
/// Last inventory message time from peer.
inventory_requests_order: LinkedHashMap<usize, f64>,
/// Last blocks from peer.
last_blocks: HashMap<usize, LinkedHashMap<H256, ()>>,
/// Last transactions from peer.
last_transactions: HashMap<usize, LinkedHashMap<H256, ()>>,
/// Peer connections filters.
filters: HashMap<usize, ConnectionFilter>,
}
/// Information on synchronization peers
@@ -56,8 +51,7 @@ impl Peers {
blocks_requests_order: LinkedHashMap::new(),
inventory_requests: HashSet::new(),
inventory_requests_order: LinkedHashMap::new(),
last_blocks: HashMap::new(),
last_transactions: HashMap::new(),
filters: HashMap::new(),
}
}
@@ -74,6 +68,14 @@ impl Peers {
}
}
/// Is known peer
pub fn is_known_peer(&self, peer_index: usize) -> bool {
self.idle.contains(&peer_index)
|| self.unuseful.contains(&peer_index)
|| self.blocks_requests.contains_key(&peer_index)
|| self.inventory_requests.contains(&peer_index)
}
/// Has any useful peers?
pub fn has_any_useful(&self) -> bool {
!self.idle.is_empty()
@@ -143,20 +145,16 @@ impl Peers {
self.blocks_requests.get(&peer_index).cloned()
}
/// True if peer already has block with this hash
pub fn has_block_with_hash(&self, peer_index: usize, hash: &H256) -> bool {
self.last_blocks
.get(&peer_index)
.map(|h| h.contains_key(hash))
.unwrap_or(false)
/// Get filter reference for given peer
pub fn filter(&mut self, peer_index: usize) -> &ConnectionFilter {
assert!(self.is_known_peer(peer_index));
&*self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default)
}
/// True if peer already has transaction with this hash
pub fn has_transaction_with_hash(&self, peer_index: usize, hash: &H256) -> bool {
self.last_transactions
.get(&peer_index)
.map(|h| h.contains_key(hash))
.unwrap_or(false)
/// Get mutable filter reference for given peer
pub fn filter_mut(&mut self, peer_index: usize) -> &mut ConnectionFilter {
assert!(self.is_known_peer(peer_index));
self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default)
}
/// Mark peer as useful.
@@ -196,8 +194,7 @@ impl Peers {
self.blocks_requests_order.remove(&peer_index);
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
self.last_blocks.remove(&peer_index);
self.last_transactions.remove(&peer_index);
self.filters.remove(&peer_index);
peer_blocks_requests
.map(|hs| hs.into_iter().collect())
}
@@ -225,28 +222,13 @@ impl Peers {
self.try_mark_idle(peer_index);
}
// TODO: add test for it
// remember that peer knows about this block
let last_blocks_entry = self.last_blocks.entry(peer_index).or_insert_with(LinkedHashMap::default);
if !last_blocks_entry.contains_key(block_hash) {
if last_blocks_entry.len() == MAX_LAST_BLOCKS_TO_STORE {
last_blocks_entry.pop_front();
}
last_blocks_entry.insert(block_hash.clone(), ());
}
self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default).known_block(block_hash);
}
/// Transaction is received from peer.
pub fn on_transaction_received(&mut self, peer_index: usize, transaction_hash: &H256) {
// TODO: add test for it
// remember that peer knows about this transaction
let last_transactions_entry = self.last_transactions.entry(peer_index).or_insert_with(LinkedHashMap::default);
if !last_transactions_entry.contains_key(transaction_hash) {
if last_transactions_entry.len() == MAX_LAST_TRANSACTIONS_TO_STORE {
last_transactions_entry.pop_front();
}
last_transactions_entry.insert(transaction_hash.clone(), ());
}
self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default).known_transaction(transaction_hash);
}
/// Inventory received from peer.
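
The per-peer filters above rely on `HashMap::entry(...).or_insert_with(...)`: the first time a peer index is touched, a default ConnectionFilter is created in place, so callers such as `on_block_received` never check for presence first. A minimal sketch with a stand-in ConnectionFilter (not the real type):

use std::collections::HashMap;

// Stand-in for the real ConnectionFilter; only the lazy default construction
// matters for this illustration.
#[derive(Default, Debug)]
struct ConnectionFilter {
    known_blocks: Vec<String>,
}

impl ConnectionFilter {
    fn known_block(&mut self, block_hash: &str) {
        if !self.known_blocks.iter().any(|h| h == block_hash) {
            self.known_blocks.push(block_hash.to_string());
        }
    }
}

fn main() {
    let mut filters: HashMap<usize, ConnectionFilter> = HashMap::new();
    // First call for peer#7 creates its filter lazily; later calls reuse it.
    filters.entry(7).or_insert_with(ConnectionFilter::default).known_block("deadbeef");
    filters.entry(7).or_insert_with(ConnectionFilter::default).known_block("deadbeef");
    println!("{:?}", filters.get(&7));
}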