diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs
index fd2c07e4..39bd4797 100644
--- a/sync/src/local_node.rs
+++ b/sync/src/local_node.rs
@@ -183,6 +183,7 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon
 
 	pub fn on_peer_sendheaders(&self, peer_index: usize, _message: types::SendHeaders) {
 		trace!(target: "sync", "Got `sendheaders` message from peer#{}", peer_index);
+		self.client.lock().on_peer_sendheaders(peer_index);
 	}
 
 	pub fn on_peer_feefilter(&self, peer_index: usize, _message: types::FeeFilter) {
diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs
index dfc7d5ad..ae29cce2 100644
--- a/sync/src/synchronization_client.rs
+++ b/sync/src/synchronization_client.rs
@@ -194,6 +194,7 @@ pub trait Client : Send + 'static {
 	fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
 	fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
 	fn on_peer_filterclear(&mut self, peer_index: usize);
+	fn on_peer_sendheaders(&mut self, peer_index: usize);
 	fn on_peer_disconnected(&mut self, peer_index: usize);
 	fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
 }
@@ -213,6 +214,7 @@ pub trait ClientCore : VerificationSink {
 	fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
 	fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
 	fn on_peer_filterclear(&mut self, peer_index: usize);
+	fn on_peer_sendheaders(&mut self, peer_index: usize);
 	fn on_peer_disconnected(&mut self, peer_index: usize);
 	fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
 	fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>);
@@ -395,6 +397,10 @@ impl Client for SynchronizationClient where T: TaskExecutor, U: Veri
 		self.core.lock().on_peer_filterclear(peer_index);
 	}
 
+	fn on_peer_sendheaders(&mut self, peer_index: usize) {
+		self.core.lock().on_peer_sendheaders(peer_index);
+	}
+
 	fn on_peer_disconnected(&mut self, peer_index: usize) {
 		self.core.lock().on_peer_disconnected(peer_index);
 	}
@@ -624,6 +630,13 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor {
 		}
 	}
 
+	/// Peer wants to get block headers instead of block hashes when announcing new blocks
+	fn on_peer_sendheaders(&mut self, peer_index: usize) {
+		if self.peers.is_known_peer(peer_index) {
+			self.peers.on_peer_sendheaders(peer_index);
+		}
+	}
+
 	/// Peer disconnected.
 	fn on_peer_disconnected(&mut self, peer_index: usize) {
 		// when last peer is disconnected, reset, but let verifying blocks be verified
@@ -914,22 +927,42 @@ impl SynchronizationClientCore where T: TaskExecutor {
 
 	/// Relay new blocks
 	fn relay_new_blocks(&mut self, new_blocks_hashes: Vec<H256>) {
-		let tasks: Vec<_> = self.peers.all_peers().into_iter()
-			.filter_map(|peer_index| {
-				let inventory: Vec<_> = new_blocks_hashes.iter()
-					.filter(|h| self.peers.filter(peer_index).filter_block(h))
-					.map(|h| InventoryVector {
-						inv_type: InventoryType::MessageBlock,
-						hash: h.clone(),
-					})
-					.collect();
-				if !inventory.is_empty() {
-					Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None))
-				} else {
-					None
-				}
-			})
-			.collect();
+		let tasks: Vec<_> = {
+			self.peers.all_peers().into_iter()
+				.filter_map(|peer_index| {
+					let send_headers = self.peers.send_headers(peer_index);
+
+					if send_headers {
+						let filtered_blocks_hashes: Vec<_> = new_blocks_hashes.iter()
+							.filter(|h| self.peers.filter(peer_index).filter_block(h))
+							.collect();
+						let chain = self.chain.read();
+						let headers: Vec<_> = filtered_blocks_hashes.into_iter()
+							.filter_map(|h| chain.block_header_by_hash(&h))
+							.collect();
+						if !headers.is_empty() {
+							Some(Task::SendHeaders(peer_index, headers, ServerTaskIndex::None))
+						}
+						else {
+							None
+						}
+					} else {
+						let inventory: Vec<_> = new_blocks_hashes.iter()
+							.filter(|h| self.peers.filter(peer_index).filter_block(h))
+							.map(|h| InventoryVector {
+								inv_type: InventoryType::MessageBlock,
+								hash: h.clone(),
+							})
+							.collect();
+						if !inventory.is_empty() {
+							Some(Task::SendInventory(peer_index, inventory, ServerTaskIndex::None))
+						} else {
+							None
+						}
+					}
+				})
+				.collect()
+		};
 
 		let mut executor = self.executor.lock();
 		for task in tasks {
@@ -2083,4 +2116,30 @@ pub mod tests {
 			Task::SendInventory(4, inventory.clone(), ServerTaskIndex::None),
 		]);
 	}
+
+	#[test]
+	fn relay_new_block_after_sendheaders() {
+		let (_, _, executor, _, sync) = create_sync(None, None);
+		let genesis = test_data::genesis();
+		let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
+
+		let mut sync = sync.lock();
+		sync.on_peer_connected(1);
+		sync.on_peer_connected(2);
+		sync.on_peer_sendheaders(2);
+		sync.on_peer_connected(3);
+
+		// ignore tasks
+		{ executor.lock().take_tasks(); }
+
+		sync.on_peer_block(1, b0.clone());
+
+		let tasks = executor.lock().take_tasks();
+		let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: b0.hash() }];
+		let headers = vec![b0.block_header.clone()];
+		assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1),
+			Task::SendHeaders(2, headers, ServerTaskIndex::None),
+			Task::SendInventory(3, inventory, ServerTaskIndex::None),
+		]);
+	}
 }
diff --git a/sync/src/synchronization_peers.rs b/sync/src/synchronization_peers.rs
index 8df51dd7..9bc64742 100644
--- a/sync/src/synchronization_peers.rs
+++ b/sync/src/synchronization_peers.rs
@@ -27,6 +27,8 @@ pub struct Peers {
 	inventory_requests_order: LinkedHashMap,
 	/// Peer connections filters.
 	filters: HashMap<usize, ConnectionFilter>,
+	/// Flags indicating that peer wants `headers` message instead of `inventory` when announcing new blocks
+	send_headers: HashSet<usize>,
 }
 
 /// Information on synchronization peers
@@ -52,6 +54,7 @@ impl Peers {
 			inventory_requests: HashSet::new(),
 			inventory_requests_order: LinkedHashMap::new(),
 			filters: HashMap::new(),
+			send_headers: HashSet::new(),
 		}
 	}
 
@@ -157,6 +160,12 @@ impl Peers {
 		self.filters.entry(peer_index).or_insert_with(ConnectionFilter::default)
 	}
 
+	/// Does peer want `headers` message instead of `inventory` when announcing new blocks
+	pub fn send_headers(&self, peer_index: usize) -> bool {
+		assert!(self.is_known_peer(peer_index));
+		self.send_headers.contains(&peer_index)
+	}
+
 	/// Mark peer as useful.
 	pub fn useful_peer(&mut self, peer_index: usize) {
 		// if peer is unknown => insert to idle queue
@@ -184,6 +193,11 @@ impl Peers {
 		self.inventory_requests_order.remove(&peer_index);
 	}
 
+	/// Peer wants `headers` message instead of `inventory` when announcing new blocks
+	pub fn on_peer_sendheaders(&mut self, peer_index: usize) {
+		self.send_headers.insert(peer_index);
+	}
+
 	/// Peer has been disconnected
 	pub fn on_peer_disconnected(&mut self, peer_index: usize) -> Option<Vec<H256>> {
 		// forget this peer without any chances to reuse
@@ -195,6 +209,7 @@ impl Peers {
 		self.inventory_requests.remove(&peer_index);
 		self.inventory_requests_order.remove(&peer_index);
 		self.filters.remove(&peer_index);
+		self.send_headers.remove(&peer_index);
 		peer_blocks_requests
 			.map(|hs| hs.into_iter().collect())
 	}
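
For readers skimming the patch, the decision it adds to `relay_new_blocks` boils down to: if a peer has sent `sendheaders` (BIP 130), announce new blocks to it with a `headers` message; otherwise fall back to the usual `inv` message. The sketch below is a minimal, self-contained illustration of that flow, not code from the repository: `Announcement`, `relay_plan`, and `wants_headers` are hypothetical names, and the per-peer connection filter plus the `block_header_by_hash` lookup are omitted for brevity.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the two relay tasks used in the patch.
#[derive(Debug, PartialEq)]
enum Announcement {
	/// `headers` announcement (peer sent `sendheaders`, BIP 130).
	Headers(Vec<String>),
	/// Classic `inv` announcement with block hashes.
	Inventory(Vec<String>),
}

/// Minimal peer registry mirroring the `send_headers` flag added to `Peers`.
#[derive(Default)]
struct Peers {
	known: HashSet<usize>,
	send_headers: HashSet<usize>,
}

impl Peers {
	fn on_peer_connected(&mut self, peer_index: usize) {
		self.known.insert(peer_index);
	}

	/// Mirrors `Peers::on_peer_sendheaders`: remember the peer's preference.
	fn on_peer_sendheaders(&mut self, peer_index: usize) {
		assert!(self.known.contains(&peer_index));
		self.send_headers.insert(peer_index);
	}

	/// Mirrors `Peers::send_headers`: has this peer opted into header announcements?
	fn wants_headers(&self, peer_index: usize) -> bool {
		self.send_headers.contains(&peer_index)
	}
}

/// Build one announcement per peer, choosing `headers` or `inv` as
/// `relay_new_blocks` does (filters and chain lookups omitted).
fn relay_plan(peers: &Peers, new_blocks_hashes: &[String]) -> Vec<(usize, Announcement)> {
	peers.known.iter()
		.map(|&peer_index| {
			let hashes = new_blocks_hashes.to_vec();
			let announcement = if peers.wants_headers(peer_index) {
				Announcement::Headers(hashes)
			} else {
				Announcement::Inventory(hashes)
			};
			(peer_index, announcement)
		})
		.collect()
}

fn main() {
	let mut peers = Peers::default();
	peers.on_peer_connected(1);
	peers.on_peer_connected(2);
	peers.on_peer_sendheaders(2); // peer#2 asked for `headers` announcements

	for (peer_index, announcement) in relay_plan(&peers, &["b0".to_string()]) {
		println!("peer#{} -> {:?}", peer_index, announcement);
	}
}
```

Keeping the `send_headers` flag inside `Peers`, as the patch does, also means it is discarded together with the rest of the per-peer state in `on_peer_disconnected`.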