process sendheaders message

This commit is contained in:
Svyatoslav Nikolsky 2016-11-24 15:18:18 +03:00
parent 59044bf2e0
commit c3672a393d
3 changed files with 91 additions and 16 deletions

View File

@ -183,6 +183,7 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
pub fn on_peer_sendheaders(&self, peer_index: usize, _message: types::SendHeaders) {
trace!(target: "sync", "Got `sendheaders` message from peer#{}", peer_index);
self.client.lock().on_peer_sendheaders(peer_index);
}
pub fn on_peer_feefilter(&self, peer_index: usize, _message: types::FeeFilter) {

View File

@ -194,6 +194,7 @@ pub trait Client : Send + 'static {
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
fn on_peer_filterclear(&mut self, peer_index: usize);
fn on_peer_sendheaders(&mut self, peer_index: usize);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
}
@ -213,6 +214,7 @@ pub trait ClientCore : VerificationSink {
fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
fn on_peer_filterclear(&mut self, peer_index: usize);
fn on_peer_sendheaders(&mut self, peer_index: usize);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>);
@ -395,6 +397,10 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
self.core.lock().on_peer_filterclear(peer_index);
}
fn on_peer_sendheaders(&mut self, peer_index: usize) {
self.core.lock().on_peer_sendheaders(peer_index);
}
fn on_peer_disconnected(&mut self, peer_index: usize) {
self.core.lock().on_peer_disconnected(peer_index);
}
@ -624,6 +630,13 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
}
/// Peer wants to get blocks headers instead of blocks hashes when announcing new blocks
fn on_peer_sendheaders(&mut self, peer_index: usize) {
if self.peers.is_known_peer(peer_index) {
self.peers.on_peer_sendheaders(peer_index);
}
}
/// Peer disconnected.
fn on_peer_disconnected(&mut self, peer_index: usize) {
// when last peer is disconnected, reset, but let verifying blocks be verified
@ -914,22 +927,42 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
/// Relay new blocks
fn relay_new_blocks(&mut self, new_blocks_hashes: Vec<H256>) {
let tasks: Vec<_> = self.peers.all_peers().into_iter()
.filter_map(|peer_index| {
let inventory: Vec<_> = new_blocks_hashes.iter()
.filter(|h| self.peers.filter(peer_index).filter_block(h))
.map(|h| InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: h.clone(),
})
.collect();
if !inventory.is_empty() {
Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None))
} else {
None
}
})
.collect();
let tasks: Vec<_> = {
self.peers.all_peers().into_iter()
.filter_map(|peer_index| {
let send_headers = self.peers.send_headers(peer_index);
if send_headers {
let filtered_blocks_hashes: Vec<_> = new_blocks_hashes.iter()
.filter(|h| self.peers.filter(peer_index).filter_block(h))
.collect();
let chain = self.chain.read();
let headers: Vec<_> = filtered_blocks_hashes.into_iter()
.filter_map(|h| chain.block_header_by_hash(&h))
.collect();
if !headers.is_empty() {
Some(Task::SendHeaders(peer_index, headers, ServerTaskIndex::None))
}
else {
None
}
} else {
let inventory: Vec<_> = new_blocks_hashes.iter()
.filter(|h| self.peers.filter(peer_index).filter_block(h))
.map(|h| InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: h.clone(),
})
.collect();
if !inventory.is_empty() {
Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None))
} else {
None
}
}
})
.collect()
};
let mut executor = self.executor.lock();
for task in tasks {
@ -2083,4 +2116,30 @@ pub mod tests {
Task::SendInventory(4, inventory.clone(), ServerTaskIndex::None),
]);
}
#[test]
fn relay_new_block_after_sendheaders() {
let (_, _, executor, _, sync) = create_sync(None, None);
let genesis = test_data::genesis();
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
let mut sync = sync.lock();
sync.on_peer_connected(1);
sync.on_peer_connected(2);
sync.on_peer_sendheaders(2);
sync.on_peer_connected(3);
// igonore tasks
{ executor.lock().take_tasks(); }
sync.on_peer_block(1, b0.clone());
let tasks = executor.lock().take_tasks();
let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b0.hash() }];
let headers = vec![b0.block_header.clone()];
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1),
Task::SendHeaders(2, headers, ServerTaskIndex::None),
Task::SendInventory(3, inventory, ServerTaskIndex::None),
]);
}
}

View File

@ -27,6 +27,8 @@ pub struct Peers {
inventory_requests_order: LinkedHashMap<usize, f64>,
/// Peer connections filters.
filters: HashMap<usize, ConnectionFilter>,
/// Flags, informing that peer wants `headers` message instead of `inventory` when announcing new blocks
send_headers: HashSet<usize>,
}
/// Information on synchronization peers
@ -52,6 +54,7 @@ impl Peers {
inventory_requests: HashSet::new(),
inventory_requests_order: LinkedHashMap::new(),
filters: HashMap::new(),
send_headers: HashSet::new(),
}
}
@ -157,6 +160,12 @@ impl Peers {
self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default)
}
/// Does peer wants `headers` message instead of `inventory` when announcing new blocks
pub fn send_headers(&self, peer_index: usize) -> bool {
assert!(self.is_known_peer(peer_index));
self.send_headers.contains(&peer_index)
}
/// Mark peer as useful.
pub fn useful_peer(&mut self, peer_index: usize) {
// if peer is unknown => insert to idle queue
@ -184,6 +193,11 @@ impl Peers {
self.inventory_requests_order.remove(&peer_index);
}
/// Peer wants `headers` message instead of `inventory` when announcing new blocks
pub fn on_peer_sendheaders(&mut self, peer_index: usize) {
self.send_headers.insert(peer_index);
}
/// Peer has been disconnected
pub fn on_peer_disconnected(&mut self, peer_index: usize) -> Option<Vec<H256>> {
// forget this peer without any chances to reuse
@ -195,6 +209,7 @@ impl Peers {
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
self.filters.remove(&peer_index);
self.send_headers.remove(&peer_index);
peer_blocks_requests
.map(|hs| hs.into_iter().collect())
}