From 2526bcefdb3117f5caec137b4202739dc53ed640 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 29 Nov 2016 14:52:28 +0300 Subject: [PATCH] serve getheaders && getdata completely in server thread --- sync/src/local_node.rs | 2 - sync/src/synchronization_server.rs | 137 ++++++++++++----------------- 2 files changed, 58 insertions(+), 81 deletions(-) diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index c487a819..e3c23e6d 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -67,8 +67,6 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon trace!(target: "sync", "Starting new sync session with peer#{}", peer_index); // request inventory from peer - // TODO: bitcoind doesn't respond to the `getheaders` request while it is synchronizing - // but it answers to the `inventory` request self.executor.lock().execute(SynchronizationTask::RequestBlocksHeaders(peer_index)); } diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index c56205e6..3c24e981 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -9,7 +9,7 @@ use message::common::{InventoryVector, InventoryType}; use db; use chain::{BlockHeader, Transaction}; use primitives::hash::H256; -use synchronization_chain::{ChainRef, TransactionState}; +use synchronization_chain::{Chain, ChainRef, TransactionState}; use synchronization_executor::{Task, TaskExecutor}; use synchronization_client::FilteredInventory; use message::types; @@ -26,7 +26,6 @@ pub trait Server : Send + Sync + 'static { /// Synchronization requests server pub struct SynchronizationServer { - chain: ChainRef, queue_ready: Arc, queue: Arc>, worker_thread: Option>, @@ -87,10 +86,6 @@ impl IndexedServerTask { } impl IndexedServerTask { - fn ignore(id: u32) -> Self { - IndexedServerTask::new(ServerTask::Ignore, ServerTaskIndex::Final(id)) - } - pub fn future(self, peer_index: usize, server: Weak) -> BoxFuture<(), ()> { lazy(move || { server.upgrade().map(|s| s.add_task(peer_index, self)); @@ -102,15 +97,14 @@ impl IndexedServerTask { #[derive(Debug, PartialEq)] pub enum ServerTask { ServeGetData(FilteredInventory), - ServeGetBlocks(db::BestBlock, H256), - ServeGetHeaders(db::BestBlock, H256), + ServeGetBlocks(Vec, H256), + ServeGetHeaders(Vec, H256), ServeGetBlockTxn(H256, Vec), ServeMempool, ReturnNotFound(Vec), ReturnBlock(H256), ReturnMerkleBlock(types::MerkleBlock), ReturnTransaction(Transaction), - Ignore, } impl SynchronizationServer { @@ -118,7 +112,6 @@ impl SynchronizationServer { let queue_ready = Arc::new(Condvar::new()); let queue = Arc::new(Mutex::new(ServerQueue::new(queue_ready.clone()))); let mut server = SynchronizationServer { - chain: chain.clone(), queue_ready: queue_ready.clone(), queue: queue.clone(), worker_thread: None, @@ -129,14 +122,14 @@ impl SynchronizationServer { server } - fn locate_known_block_hash(&self, block_locator_hashes: Vec) -> Option { + fn locate_known_block_hash(chain: &Chain, block_locator_hashes: &Vec) -> Option { block_locator_hashes.into_iter() - .filter_map(|hash| SynchronizationServer::locate_best_known_block_hash(&self.chain, &hash)) + .filter_map(|hash| SynchronizationServer::locate_best_known_block_hash(&chain, hash)) .nth(0) } - fn locate_known_block_header(&self, block_locator_hashes: Vec) -> Option { - self.locate_known_block_hash(block_locator_hashes) + fn locate_known_block_header(chain: &Chain, block_locator_hashes: &Vec) -> Option { + SynchronizationServer::locate_known_block_hash(chain, block_locator_hashes) } fn server_worker(queue_ready: Arc, queue: Arc>, chain: ChainRef, executor: Arc>) { @@ -217,31 +210,50 @@ impl SynchronizationServer { queue.lock().task_processed(peer_index); }, // `getblocks` => `inventory` - ServerTask::ServeGetBlocks(best_block, hash_stop) => { - let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 500); - if !blocks_hashes.is_empty() { - trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index); - let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector { - inv_type: InventoryType::MessageBlock, - hash: hash, - }).collect(); - executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id)); - } else { - assert_eq!(indexed_task.id, ServerTaskIndex::None); + ServerTask::ServeGetBlocks(block_locator_hashes, hash_stop) => { + assert_eq!(indexed_task.id, ServerTaskIndex::None); + + let chain = chain.read(); + if let Some(best_common_block) = SynchronizationServer::locate_known_block_hash(&chain, &block_locator_hashes) { + trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash); + + let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_common_block, &hash_stop, 500); + if !blocks_hashes.is_empty() { + trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index); + let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector { + inv_type: InventoryType::MessageBlock, + hash: hash, + }).collect(); + executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id)); + } + } + else { + trace!(target: "sync", "No common blocks with peer#{}", peer_index); } // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, // `getheaders` => `headers` - ServerTask::ServeGetHeaders(best_block, hash_stop) => { - // What if we have no common blocks with peer at all? Maybe drop connection or penalize peer? - // https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568 - let blocks_headers = SynchronizationServer::blocks_headers_after(&chain, &best_block, &hash_stop, 2000); - if !blocks_headers.is_empty() { - trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index); - executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id)); - } else if let Some(response_id) = indexed_task.id.raw() { - executor.lock().execute(Task::Ignore(peer_index, response_id)); + ServerTask::ServeGetHeaders(block_locator_hashes, hash_stop) => { + let chain = chain.read(); + if let Some(best_common_block) = SynchronizationServer::locate_known_block_header(&chain, &block_locator_hashes) { + trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str()); + + // What if we have no common blocks with peer at all? Maybe drop connection or penalize peer? + // https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568 + let blocks_headers = SynchronizationServer::blocks_headers_after(&chain, &best_common_block, &hash_stop, 2000); + if !blocks_headers.is_empty() { + trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index); + executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id)); + } else if let Some(response_id) = indexed_task.id.raw() { + executor.lock().execute(Task::Ignore(peer_index, response_id)); + } + } + else { + trace!(target: "sync", "No common blocks headers with peer#{}", peer_index); + if let Some(response_id) = indexed_task.id.raw() { + executor.lock().execute(Task::Ignore(peer_index, response_id)); + } } // inform that we have processed task for peer queue.lock().task_processed(peer_index); @@ -326,18 +338,11 @@ impl SynchronizationServer { ServerTask::ReturnTransaction(transaction) => { executor.lock().execute(Task::SendTransaction(peer_index, transaction)); } - // ignore - ServerTask::Ignore => { - let response_id = indexed_task.id.raw().expect("do not schedule redundant ignore task"); - executor.lock().execute(Task::Ignore(peer_index, response_id)); - queue.lock().task_processed(peer_index); - }, } } } - fn blocks_hashes_after(chain: &ChainRef, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec { - let chain = chain.read(); + fn blocks_hashes_after(chain: &Chain, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec { // check that chain has not reorganized since task was queued if chain.block_hash(best_block.number).map(|h| h != best_block.hash).unwrap_or(true) { return Vec::new(); @@ -354,8 +359,7 @@ impl SynchronizationServer { .collect() } - fn blocks_headers_after(chain: &ChainRef, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec { - let chain = chain.read(); + fn blocks_headers_after(chain: &Chain, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec { // check that chain has not reorganized since task was queued if chain.block_hash(best_block.number).map(|h| h != best_block.hash).unwrap_or(true) { return Vec::new(); @@ -373,8 +377,7 @@ impl SynchronizationServer { } - fn locate_best_known_block_hash(chain: &ChainRef, hash: &H256) -> Option { - let chain = chain.read(); + fn locate_best_known_block_hash(chain: &Chain, hash: &H256) -> Option { match chain.block_number(hash) { Some(number) => Some(db::BestBlock { number: number, @@ -420,33 +423,15 @@ impl Server for SynchronizationServer { Some(task) } - fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option { - if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) { - trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash); - let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::None); - Some(task) - } - else { - trace!(target: "sync", "No common blocks with peer#{}", peer_index); - None - } + fn serve_getblocks(&self, _peer_index: usize, message: types::GetBlocks) -> Option { + let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(message.block_locator_hashes, message.hash_stop), ServerTaskIndex::None); + Some(task) } - fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option) -> Option { - if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) { - trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str()); - let server_task_index = id.map_or_else(|| ServerTaskIndex::None, ServerTaskIndex::Final); - let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), server_task_index); - Some(task) - } - else { - trace!(target: "sync", "No common blocks headers with peer#{}", peer_index); - if let Some(id) = id { - Some(IndexedServerTask::ignore(id)) - } else { - None - } - } + fn serve_getheaders(&self, _peer_index: usize, message: types::GetHeaders, id: Option) -> Option { + let server_task_index = id.map_or_else(|| ServerTaskIndex::None, ServerTaskIndex::Final); + let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(message.block_locator_hashes, message.hash_stop), server_task_index); + Some(task) } fn serve_get_block_txn(&self, _peer_index: usize, block_hash: H256, indexes: Vec) -> Option { @@ -581,18 +566,12 @@ pub mod tests { } fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option { - self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock { - number: 0, - hash: message.block_locator_hashes[0].clone(), - }, message.hash_stop))); + self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(message.block_locator_hashes, message.hash_stop))); None } fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: Option) -> Option { - self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock { - number: 0, - hash: message.block_locator_hashes[0].clone(), - }, message.hash_stop))); + self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(message.block_locator_hashes, message.hash_stop))); None }