Merge pull request #133 from ethcore/sync_notfound

Process `notfound` messages from sync peers
This commit is contained in:
Marek Kotewicz 2016-11-16 09:22:44 +01:00 committed by GitHub
commit d8c06007ac
3 changed files with 106 additions and 9 deletions

View File

@ -4,11 +4,12 @@ use parking_lot::Mutex;
use db;
use chain::RepresentH256;
use p2p::OutboundSyncConnectionRef;
use message::common::InventoryType;
use message::common::{InventoryType, InventoryVector};
use message::types;
use synchronization_client::{Client, SynchronizationClient};
use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor};
use synchronization_server::{Server, SynchronizationServer};
use primitives::hash::H256;
pub type LocalNodeRef = Arc<LocalNode<LocalSynchronizationTaskExecutor, SynchronizationServer, SynchronizationClient<LocalSynchronizationTaskExecutor>>>;
@ -83,13 +84,8 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
// (2) with 500 entries
// what is (1)?
// process blocks first
let blocks_inventory: Vec<_> = message.inventory.iter()
.filter(|item| item.inv_type == InventoryType::MessageBlock)
.map(|item| item.hash.clone())
.collect();
// if there are unknown blocks => start synchronizing with peer
let blocks_inventory = self.blocks_inventory(&message.inventory);
if !blocks_inventory.is_empty() {
self.client.lock().on_new_blocks_inventory(peer_index, blocks_inventory);
}
@ -199,8 +195,18 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
trace!(target: "sync", "Got `blocktxn` message from peer#{}", peer_index);
}
pub fn on_peer_notfound(&self, peer_index: usize, _message: types::NotFound) {
pub fn on_peer_notfound(&self, peer_index: usize, message: types::NotFound) {
trace!(target: "sync", "Got `notfound` message from peer#{}", peer_index);
let blocks_inventory = self.blocks_inventory(&message.inventory);
self.client.lock().on_peer_blocks_notfound(peer_index, blocks_inventory);
}
fn blocks_inventory(&self, inventory: &Vec<InventoryVector>) -> Vec<H256> {
inventory.iter()
.filter(|item| item.inv_type == InventoryType::MessageBlock)
.map(|item| item.hash.clone())
.collect()
}
}

View File

@ -187,8 +187,9 @@ enum VerificationTask {
pub trait Client : Send + 'static {
fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State;
fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec<H256>);
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>);
fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
fn on_peer_block(&mut self, peer_index: usize, block: Block);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option<Arc<PeersBlocksWaiter>>);
@ -364,6 +365,25 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
self.execute_synchronization_tasks(None);
}
/// When peer has no blocks
fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>) {
if let Some(requested_blocks) = self.peers.get_blocks_tasks(peer_index) {
// check if peer has responded with notfound to requested blocks
let notfound_blocks: HashSet<H256> = blocks_hashes.into_iter().collect();
if requested_blocks.intersection(&notfound_blocks).nth(0).is_none() {
// if notfound some other blocks => just ignore the message
return;
}
// for now, let's exclude peer from synchronization - we are relying on full nodes for synchronization
let removed_tasks = self.peers.reset_blocks_tasks(peer_index);
self.peers.unuseful_peer(peer_index);
// if peer has had some blocks tasks, rerequest these blocks
self.execute_synchronization_tasks(Some(removed_tasks));
}
}
/// Process new block.
fn on_peer_block(&mut self, peer_index: usize, block: Block) {
let block_hash = block.hash();
@ -1444,4 +1464,56 @@ pub mod tests {
fn sync_after_db_insert_nonfatal_fail() {
// TODO: implement me
}
#[test]
fn peer_removed_from_sync_after_responding_with_requested_block_notfound() {
let (_, _, executor, _, sync) = create_sync(None);
let mut sync = sync.lock();
let b1 = test_data::block_h1();
let b2 = test_data::block_h2();
sync.on_new_blocks_headers(1, vec![b1.block_header.clone(), b2.block_header.clone()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::RequestBlocks(1, vec![b1.hash(), b2.hash()])]);
assert_eq!(sync.information().peers.idle, 0);
assert_eq!(sync.information().peers.unuseful, 0);
assert_eq!(sync.information().peers.active, 1);
sync.on_peer_blocks_notfound(1, vec![b1.hash()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1)]);
assert_eq!(sync.information().peers.idle, 0);
assert_eq!(sync.information().peers.unuseful, 1);
assert_eq!(sync.information().peers.active, 0);
}
#[test]
fn peer_not_removed_from_sync_after_responding_with_requested_block_notfound() {
let (_, _, executor, _, sync) = create_sync(None);
let mut sync = sync.lock();
let b1 = test_data::block_h1();
let b2 = test_data::block_h2();
sync.on_new_blocks_headers(1, vec![b1.block_header.clone(), b2.block_header.clone()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::RequestBlocks(1, vec![b1.hash(), b2.hash()])]);
assert_eq!(sync.information().peers.idle, 0);
assert_eq!(sync.information().peers.unuseful, 0);
assert_eq!(sync.information().peers.active, 1);
sync.on_peer_blocks_notfound(1, vec![test_data::block_h170().hash()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![]);
assert_eq!(sync.information().peers.idle, 0);
assert_eq!(sync.information().peers.unuseful, 0);
assert_eq!(sync.information().peers.active, 1);
}
}

View File

@ -120,6 +120,11 @@ impl Peers {
.collect()
}
/// Get peer tasks
pub fn get_blocks_tasks(&self, peer_index: usize) -> Option<HashSet<H256>> {
self.blocks_requests.get(&peer_index).cloned()
}
/// Mark peer as useful.
pub fn useful_peer(&mut self, peer_index: usize) {
// if peer is unknown => insert to idle queue
@ -133,6 +138,20 @@ impl Peers {
}
}
/// Mark peer as unuseful.
pub fn unuseful_peer(&mut self, peer_index: usize) {
// if peer is unknown => insert to idle queue
// if peer is known && not useful => insert to idle queue
assert!(!self.blocks_requests.contains_key(&peer_index));
assert!(!self.blocks_requests_order.contains_key(&peer_index));
self.idle.remove(&peer_index);
self.unuseful.insert(peer_index);
self.failures.remove(&peer_index);
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
}
/// Peer has been disconnected
pub fn on_peer_disconnected(&mut self, peer_index: usize) -> Option<Vec<H256>> {
// forget this peer without any chances to reuse