From aec505a2267668a9623d9907d8abe7ec4ad3bcb5 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 2 Nov 2016 14:20:39 +0300 Subject: [PATCH] serve request --- sync/src/local_node.rs | 8 +- sync/src/synchronization_chain.rs | 10 +- sync/src/synchronization_client.rs | 30 +--- sync/src/synchronization_executor.rs | 69 ++++++++- sync/src/synchronization_server.rs | 211 +++++++++++++++++++++++---- 5 files changed, 263 insertions(+), 65 deletions(-) diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 1bf9d0b1..ed96cd03 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -107,8 +107,10 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon self.server.lock().serve_getblocks(peer_index, message); } - pub fn on_peer_getheaders(&self, peer_index: usize, _message: types::GetHeaders) { + pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders) { trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index); + + self.server.lock().serve_getheaders(peer_index, message); } pub fn on_peer_transaction(&self, peer_index: usize, message: types::Tx) { @@ -181,7 +183,7 @@ mod tests { use parking_lot::{Mutex, RwLock}; use chain::RepresentH256; use synchronization_executor::Task; - use synchronization_client::tests::DummyTaskExecutor; + use synchronization_executor::tests::DummyTaskExecutor; use synchronization_client::{Config, SynchronizationClient}; use synchronization_chain::Chain; use p2p::{OutboundSyncConnection, OutboundSyncConnectionRef}; @@ -225,7 +227,7 @@ mod tests { fn create_local_node() -> (Arc>, Arc>, LocalNode>) { let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block())))); - let executor = Arc::new(Mutex::new(DummyTaskExecutor::default())); + let executor = DummyTaskExecutor::new(); let server = Arc::new(Mutex::new(DummyServer::new())); let config = Config { skip_verification: true }; let client = SynchronizationClient::new(config, executor.clone(), chain); diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index 48a87887..0df1a10c 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -372,10 +372,11 @@ impl fmt::Debug for Chain { #[cfg(test)] mod tests { use std::sync::Arc; - use chain::{Block, RepresentH256}; + use chain::RepresentH256; use hash_queue::HashPosition; use super::{Chain, BlockState}; use db::{self, Store, BestBlock}; + use test_data; #[test] fn chain_empty() { @@ -455,8 +456,7 @@ mod tests { assert!(chain.information().scheduled == 3 && chain.information().requested == 1 && chain.information().verifying == 1 && chain.information().stored == 1); // insert new best block to the chain - let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into(); - chain.insert_best_block(block1).expect("Db error"); + chain.insert_best_block(test_data::block_h1()).expect("Db error"); assert!(chain.information().scheduled == 3 && chain.information().requested == 1 && chain.information().verifying == 1 && chain.information().stored == 2); assert_eq!(db.best_block().expect("storage with genesis block is required").number, 1); @@ -468,13 +468,13 @@ mod tests { let genesis_hash = chain.best_block().hash; assert_eq!(chain.block_locator_hashes(), vec![genesis_hash.clone()]); - let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into(); + let block1 = test_data::block_h1(); let block1_hash = block1.hash(); chain.insert_best_block(block1).expect("Error inserting new block"); assert_eq!(chain.block_locator_hashes(), vec![block1_hash.clone(), genesis_hash.clone()]); - let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into(); + let block2 = test_data::block_h2(); let block2_hash = block2.hash(); chain.insert_best_block(block2).expect("Error inserting new block"); diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 274accd4..dde07f9a 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -514,43 +514,19 @@ impl SynchronizationClient where T: TaskExecutor { #[cfg(test)] pub mod tests { use std::sync::Arc; - use std::mem::replace; use parking_lot::{Mutex, RwLock}; use chain::{Block, RepresentH256}; use super::{Client, Config, SynchronizationClient}; - use synchronization_executor::{Task, TaskExecutor}; - use local_node::PeersConnections; + use synchronization_executor::Task; use synchronization_chain::{Chain, ChainRef}; + use synchronization_executor::tests::DummyTaskExecutor; use test_data; - use p2p::OutboundSyncConnectionRef; use db; - #[derive(Default)] - pub struct DummyTaskExecutor { - pub tasks: Vec, - } - - impl DummyTaskExecutor { - pub fn take_tasks(&mut self) -> Vec { - replace(&mut self.tasks, Vec::new()) - } - } - - impl PeersConnections for DummyTaskExecutor { - fn add_peer_connection(&mut self, _: usize, _: OutboundSyncConnectionRef) {} - fn remove_peer_connection(&mut self, _: usize) {} - } - - impl TaskExecutor for DummyTaskExecutor { - fn execute(&mut self, task: Task) { - self.tasks.push(task); - } - } - fn create_sync() -> (Arc>, Arc>>) { let storage = Arc::new(db::TestStorage::with_genesis_block()); let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone()))); - let executor = Arc::new(Mutex::new(DummyTaskExecutor::default())); + let executor = DummyTaskExecutor::new(); let config = Config { skip_verification: true }; (executor.clone(), SynchronizationClient::new(config, executor, chain)) } diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index e2afa3e5..5b35cca6 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::collections::HashMap; use parking_lot::Mutex; -use chain::{Block, RepresentH256}; +use chain::{Block, BlockHeader, RepresentH256}; use message::common::{InventoryVector, InventoryType}; use message::types; use primitives::hash::H256; @@ -31,6 +31,8 @@ pub enum Task { SendNotFound(usize, Vec), /// Send inventory SendInventory(usize, Vec), + /// Send headers + SendHeaders(usize, Vec), } /// Synchronization tasks executor @@ -141,6 +143,71 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { connection.send_inventory(&inventory); } }, + Task::SendHeaders(peer_index, headers) => { + let headers = types::Headers { + headers: headers, + }; + + if let Some(connection) = self.peers.get_mut(&peer_index) { + let connection = &mut *connection; + trace!(target: "sync", "Sending headers to peer#{} with {} items", peer_index, headers.headers.len()); + connection.send_headers(&headers); + } + }, } } } + +#[cfg(test)] +pub mod tests { + use super::*; + use std::sync::Arc; + use std::mem::replace; + use std::time; + use parking_lot::{Mutex, Condvar}; + use local_node::PeersConnections; + use p2p::OutboundSyncConnectionRef; + + pub struct DummyTaskExecutor { + tasks: Vec, + waiter: Arc, + } + + impl DummyTaskExecutor { + pub fn new() -> Arc> { + Arc::new(Mutex::new(DummyTaskExecutor { + tasks: Vec::new(), + waiter: Arc::new(Condvar::new()), + })) + } + + pub fn wait_tasks_for(executor: Arc>, timeout_ms: u64) -> Vec { + let mut executor = executor.lock(); + if executor.tasks.is_empty() { + let waiter = executor.waiter.clone(); + waiter.wait_for(&mut executor, time::Duration::from_millis(timeout_ms)).timed_out(); + } + executor.take_tasks() + } + + pub fn wait_tasks(executor: Arc>) -> Vec { + DummyTaskExecutor::wait_tasks_for(executor, 1000) + } + + pub fn take_tasks(&mut self) -> Vec { + replace(&mut self.tasks, Vec::new()) + } + } + + impl PeersConnections for DummyTaskExecutor { + fn add_peer_connection(&mut self, _: usize, _: OutboundSyncConnectionRef) {} + fn remove_peer_connection(&mut self, _: usize) {} + } + + impl TaskExecutor for DummyTaskExecutor { + fn execute(&mut self, task: Task) { + self.tasks.push(task); + self.waiter.notify_one(); + } + } +} \ No newline at end of file diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index 5bfd1217..53320b7a 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -15,6 +15,7 @@ use message::types; pub trait Server : Send + 'static { fn serve_getdata(&mut self, peer_index: usize, message: types::GetData); fn serve_getblocks(&mut self, peer_index: usize, message: types::GetBlocks); + fn serve_getheaders(&mut self, peer_index: usize, message: types::GetHeaders); } /// Synchronization requests server @@ -36,6 +37,7 @@ struct ServerQueue { pub enum ServerTask { ServeGetData(Vec), ServeGetBlocks(db::BestBlock, H256), + ServeGetHeaders(db::BestBlock, H256), ReturnNotFound(Vec), ReturnBlock(H256), } @@ -111,35 +113,27 @@ impl SynchronizationServer { queue.lock().add_tasks(peer_index, new_tasks); } }, - // `inventory` + // `getblocks` => `inventory` (peer_index, ServerTask::ServeGetBlocks(best_block, hash_stop)) => { - let storage_block_hash = chain.read().storage_block_hash(best_block.number); - if let Some(hash) = storage_block_hash { - // check that chain has not reorganized since task was queued - if hash == best_block.hash { - let mut inventory: Vec = Vec::new(); - let first_block_number = best_block.number + 1; - let last_block_number = best_block.number + 500; - // 500 hashes after best_block.number OR hash_stop OR blockchain end - for block_number in first_block_number..last_block_number { - match chain.read().storage_block_hash(block_number) { - Some(ref block_hash) if block_hash == &hash_stop => break, - None => break, - Some(block_hash) => { - inventory.push(InventoryVector { - inv_type: InventoryType::MessageBlock, - hash: block_hash, - }); - }, - } - } - if !inventory.is_empty() { - trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", inventory.len(), peer_index); - executor.lock().execute(Task::SendInventory(peer_index, inventory)); - } - } + let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 500); + if !blocks_hashes.is_empty() { + trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index); + let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector { + inv_type: InventoryType::MessageBlock, + hash: hash, + }).collect(); + executor.lock().execute(Task::SendInventory(peer_index, inventory)); } }, + // `getheaders` => `headers` + (peer_index, ServerTask::ServeGetHeaders(best_block, hash_stop)) => { + let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 2000); + if !blocks_hashes.is_empty() { + trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_hashes.len(), peer_index); + let blocks_headers = blocks_hashes.into_iter().filter_map(|hash| chain.read().storage_block(&hash).map(|block| block.block_header)).collect(); + executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers)); + } + } // `notfound` (peer_index, ServerTask::ReturnNotFound(inventory)) => { executor.lock().execute(Task::SendNotFound(peer_index, inventory)); @@ -157,6 +151,29 @@ impl SynchronizationServer { } } } + + fn blocks_hashes_after(chain: &ChainRef, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec { + let mut hashes: Vec = Vec::new(); + let storage_block_hash = chain.read().storage_block_hash(best_block.number); + if let Some(hash) = storage_block_hash { + // check that chain has not reorganized since task was queued + if hash == best_block.hash { + let first_block_number = best_block.number + 1; + let last_block_number = first_block_number + max_hashes; + // `max_hashes` hashes after best_block.number OR hash_stop OR blockchain end + for block_number in first_block_number..last_block_number { + match chain.read().storage_block_hash(block_number) { + Some(ref block_hash) if block_hash == hash_stop => break, + None => break, + Some(block_hash) => { + hashes.push(block_hash); + }, + } + } + } + } + hashes + } } impl Drop for SynchronizationServer { @@ -183,6 +200,16 @@ impl Server for SynchronizationServer { trace!(target: "sync", "No common blocks with peer#{}", peer_index); } } + + fn serve_getheaders(&mut self, peer_index: usize, message: types::GetHeaders) { + if let Some(best_common_block) = self.locate_known_block(message.block_locator_hashes) { + trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash); + self.queue.lock().add_task(peer_index, ServerTask::ServeGetHeaders(best_common_block, message.hash_stop)); + } + else { + trace!(target: "sync", "No common blocks headers with peer#{}", peer_index); + } + } } impl ServerQueue { @@ -248,10 +275,19 @@ impl ServerQueue { #[cfg(test)] pub mod tests { - use super::{Server, ServerTask}; - use message::types; - use db; + use std::sync::Arc; use std::mem::replace; + use parking_lot::{Mutex, RwLock}; + use db; + use test_data; + use primitives::hash::H256; + use chain::RepresentH256; + use message::types; + use message::common::{InventoryVector, InventoryType}; + use synchronization_executor::Task; + use synchronization_executor::tests::DummyTaskExecutor; + use synchronization_chain::Chain; + use super::{Server, ServerTask, SynchronizationServer}; pub struct DummyServer { tasks: Vec<(usize, ServerTask)>, @@ -280,5 +316,122 @@ pub mod tests { hash: message.block_locator_hashes[0].clone(), }, message.hash_stop))); } + + fn serve_getheaders(&mut self, peer_index: usize, message: types::GetHeaders) { + self.tasks.push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock { + number: 0, + hash: message.block_locator_hashes[0].clone(), + }, message.hash_stop))); + } + } + + fn create_synchronization_server() -> (Arc>, Arc>, SynchronizationServer) { + let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block())))); + let executor = DummyTaskExecutor::new(); + let server = SynchronizationServer::new(chain.clone(), executor.clone()); + (chain, executor, server) + } + + #[test] + fn server_getdata_responds_notfound_when_block_not_found() { + let (_, executor, mut server) = create_synchronization_server(); + // when asking for unknown block + let inventory = vec![ + InventoryVector { + inv_type: InventoryType::MessageBlock, + hash: H256::default(), + } + ]; + server.serve_getdata(0, types::GetData { + inventory: inventory.clone(), + }); + // => respond with notfound + let tasks = DummyTaskExecutor::wait_tasks(executor); + assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]); + } + + #[test] + fn server_getdata_responds_block_when_block_is_found() { + let (_, executor, mut server) = create_synchronization_server(); + // when asking for known block + let inventory = vec![ + InventoryVector { + inv_type: InventoryType::MessageBlock, + hash: test_data::genesis().hash(), + } + ]; + server.serve_getdata(0, types::GetData { + inventory: inventory.clone(), + }); + // => respond with notfound + let tasks = DummyTaskExecutor::wait_tasks(executor); + assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis())]); + } + + #[test] + fn server_getblocks_do_not_responds_inventory_when_synchronized() { + let (_, executor, mut server) = create_synchronization_server(); + // when asking for blocks hashes + let genesis_block_hash = test_data::genesis().hash(); + server.serve_getblocks(0, types::GetBlocks { + version: 0, + block_locator_hashes: vec![genesis_block_hash.clone()], + hash_stop: H256::default(), + }); + // => respond with inventory + let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout + assert_eq!(tasks, vec![]); + } + + #[test] + fn server_getblocks_responds_inventory_when_have_unknown_blocks() { + let (chain, executor, mut server) = create_synchronization_server(); + chain.write().insert_best_block(test_data::block_h1()).expect("Db write error"); + // when asking for blocks hashes + server.serve_getblocks(0, types::GetBlocks { + version: 0, + block_locator_hashes: vec![test_data::genesis().hash()], + hash_stop: H256::default(), + }); + // => respond with inventory + let inventory = vec![InventoryVector { + inv_type: InventoryType::MessageBlock, + hash: test_data::block_h1().hash(), + }]; + let tasks = DummyTaskExecutor::wait_tasks(executor); + assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]); + } + + #[test] + fn server_getheaders_do_not_responds_headers_when_synchronized() { + let (_, executor, mut server) = create_synchronization_server(); + // when asking for blocks hashes + let genesis_block_hash = test_data::genesis().hash(); + server.serve_getheaders(0, types::GetHeaders { + version: 0, + block_locator_hashes: vec![genesis_block_hash.clone()], + hash_stop: H256::default(), + }); + // => respond with inventory + let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout + assert_eq!(tasks, vec![]); + } + + #[test] + fn server_getheaders_responds_headers_when_have_unknown_blocks() { + let (chain, executor, mut server) = create_synchronization_server(); + chain.write().insert_best_block(test_data::block_h1()).expect("Db write error"); + // when asking for blocks hashes + server.serve_getheaders(0, types::GetHeaders { + version: 0, + block_locator_hashes: vec![test_data::genesis().hash()], + hash_stop: H256::default(), + }); + // => respond with inventory + let headers = vec![ + test_data::block_h1().block_header, + ]; + let tasks = DummyTaskExecutor::wait_tasks(executor); + assert_eq!(tasks, vec![Task::SendHeaders(0, headers)]); } }