serve request

This commit is contained in:
Svyatoslav Nikolsky 2016-11-02 14:20:39 +03:00
parent 0b212ec5eb
commit aec505a226
5 changed files with 263 additions and 65 deletions

View File

@ -107,8 +107,10 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
self.server.lock().serve_getblocks(peer_index, message);
}
pub fn on_peer_getheaders(&self, peer_index: usize, _message: types::GetHeaders) {
pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index);
self.server.lock().serve_getheaders(peer_index, message);
}
pub fn on_peer_transaction(&self, peer_index: usize, message: types::Tx) {
@ -181,7 +183,7 @@ mod tests {
use parking_lot::{Mutex, RwLock};
use chain::RepresentH256;
use synchronization_executor::Task;
use synchronization_client::tests::DummyTaskExecutor;
use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_client::{Config, SynchronizationClient};
use synchronization_chain::Chain;
use p2p::{OutboundSyncConnection, OutboundSyncConnectionRef};
@ -225,7 +227,7 @@ mod tests {
fn create_local_node() -> (Arc<Mutex<DummyTaskExecutor>>, Arc<Mutex<DummyServer>>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor>>) {
let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
let executor = Arc::new(Mutex::new(DummyTaskExecutor::default()));
let executor = DummyTaskExecutor::new();
let server = Arc::new(Mutex::new(DummyServer::new()));
let config = Config { skip_verification: true };
let client = SynchronizationClient::new(config, executor.clone(), chain);

View File

@ -372,10 +372,11 @@ impl fmt::Debug for Chain {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use chain::{Block, RepresentH256};
use chain::RepresentH256;
use hash_queue::HashPosition;
use super::{Chain, BlockState};
use db::{self, Store, BestBlock};
use test_data;
#[test]
fn chain_empty() {
@ -455,8 +456,7 @@ mod tests {
assert!(chain.information().scheduled == 3 && chain.information().requested == 1
&& chain.information().verifying == 1 && chain.information().stored == 1);
// insert new best block to the chain
let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into();
chain.insert_best_block(block1).expect("Db error");
chain.insert_best_block(test_data::block_h1()).expect("Db error");
assert!(chain.information().scheduled == 3 && chain.information().requested == 1
&& chain.information().verifying == 1 && chain.information().stored == 2);
assert_eq!(db.best_block().expect("storage with genesis block is required").number, 1);
@ -468,13 +468,13 @@ mod tests {
let genesis_hash = chain.best_block().hash;
assert_eq!(chain.block_locator_hashes(), vec![genesis_hash.clone()]);
let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into();
let block1 = test_data::block_h1();
let block1_hash = block1.hash();
chain.insert_best_block(block1).expect("Error inserting new block");
assert_eq!(chain.block_locator_hashes(), vec![block1_hash.clone(), genesis_hash.clone()]);
let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into();
let block2 = test_data::block_h2();
let block2_hash = block2.hash();
chain.insert_best_block(block2).expect("Error inserting new block");

View File

@ -514,43 +514,19 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
use std::mem::replace;
use parking_lot::{Mutex, RwLock};
use chain::{Block, RepresentH256};
use super::{Client, Config, SynchronizationClient};
use synchronization_executor::{Task, TaskExecutor};
use local_node::PeersConnections;
use synchronization_executor::Task;
use synchronization_chain::{Chain, ChainRef};
use synchronization_executor::tests::DummyTaskExecutor;
use test_data;
use p2p::OutboundSyncConnectionRef;
use db;
#[derive(Default)]
pub struct DummyTaskExecutor {
pub tasks: Vec<Task>,
}
impl DummyTaskExecutor {
pub fn take_tasks(&mut self) -> Vec<Task> {
replace(&mut self.tasks, Vec::new())
}
}
impl PeersConnections for DummyTaskExecutor {
fn add_peer_connection(&mut self, _: usize, _: OutboundSyncConnectionRef) {}
fn remove_peer_connection(&mut self, _: usize) {}
}
impl TaskExecutor for DummyTaskExecutor {
fn execute(&mut self, task: Task) {
self.tasks.push(task);
}
}
fn create_sync() -> (Arc<Mutex<DummyTaskExecutor>>, Arc<Mutex<SynchronizationClient<DummyTaskExecutor>>>) {
let storage = Arc::new(db::TestStorage::with_genesis_block());
let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone())));
let executor = Arc::new(Mutex::new(DummyTaskExecutor::default()));
let executor = DummyTaskExecutor::new();
let config = Config { skip_verification: true };
(executor.clone(), SynchronizationClient::new(config, executor, chain))
}

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use std::collections::HashMap;
use parking_lot::Mutex;
use chain::{Block, RepresentH256};
use chain::{Block, BlockHeader, RepresentH256};
use message::common::{InventoryVector, InventoryType};
use message::types;
use primitives::hash::H256;
@ -31,6 +31,8 @@ pub enum Task {
SendNotFound(usize, Vec<InventoryVector>),
/// Send inventory
SendInventory(usize, Vec<InventoryVector>),
/// Send headers
SendHeaders(usize, Vec<BlockHeader>),
}
/// Synchronization tasks executor
@ -141,6 +143,71 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
connection.send_inventory(&inventory);
}
},
Task::SendHeaders(peer_index, headers) => {
let headers = types::Headers {
headers: headers,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending headers to peer#{} with {} items", peer_index, headers.headers.len());
connection.send_headers(&headers);
}
},
}
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use std::sync::Arc;
use std::mem::replace;
use std::time;
use parking_lot::{Mutex, Condvar};
use local_node::PeersConnections;
use p2p::OutboundSyncConnectionRef;
pub struct DummyTaskExecutor {
tasks: Vec<Task>,
waiter: Arc<Condvar>,
}
impl DummyTaskExecutor {
pub fn new() -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(DummyTaskExecutor {
tasks: Vec::new(),
waiter: Arc::new(Condvar::new()),
}))
}
pub fn wait_tasks_for(executor: Arc<Mutex<Self>>, timeout_ms: u64) -> Vec<Task> {
let mut executor = executor.lock();
if executor.tasks.is_empty() {
let waiter = executor.waiter.clone();
waiter.wait_for(&mut executor, time::Duration::from_millis(timeout_ms)).timed_out();
}
executor.take_tasks()
}
pub fn wait_tasks(executor: Arc<Mutex<Self>>) -> Vec<Task> {
DummyTaskExecutor::wait_tasks_for(executor, 1000)
}
pub fn take_tasks(&mut self) -> Vec<Task> {
replace(&mut self.tasks, Vec::new())
}
}
impl PeersConnections for DummyTaskExecutor {
fn add_peer_connection(&mut self, _: usize, _: OutboundSyncConnectionRef) {}
fn remove_peer_connection(&mut self, _: usize) {}
}
impl TaskExecutor for DummyTaskExecutor {
fn execute(&mut self, task: Task) {
self.tasks.push(task);
self.waiter.notify_one();
}
}
}

View File

@ -15,6 +15,7 @@ use message::types;
pub trait Server : Send + 'static {
fn serve_getdata(&mut self, peer_index: usize, message: types::GetData);
fn serve_getblocks(&mut self, peer_index: usize, message: types::GetBlocks);
fn serve_getheaders(&mut self, peer_index: usize, message: types::GetHeaders);
}
/// Synchronization requests server
@ -36,6 +37,7 @@ struct ServerQueue {
pub enum ServerTask {
ServeGetData(Vec<InventoryVector>),
ServeGetBlocks(db::BestBlock, H256),
ServeGetHeaders(db::BestBlock, H256),
ReturnNotFound(Vec<InventoryVector>),
ReturnBlock(H256),
}
@ -111,35 +113,27 @@ impl SynchronizationServer {
queue.lock().add_tasks(peer_index, new_tasks);
}
},
// `inventory`
// `getblocks` => `inventory`
(peer_index, ServerTask::ServeGetBlocks(best_block, hash_stop)) => {
let storage_block_hash = chain.read().storage_block_hash(best_block.number);
if let Some(hash) = storage_block_hash {
// check that chain has not reorganized since task was queued
if hash == best_block.hash {
let mut inventory: Vec<InventoryVector> = Vec::new();
let first_block_number = best_block.number + 1;
let last_block_number = best_block.number + 500;
// 500 hashes after best_block.number OR hash_stop OR blockchain end
for block_number in first_block_number..last_block_number {
match chain.read().storage_block_hash(block_number) {
Some(ref block_hash) if block_hash == &hash_stop => break,
None => break,
Some(block_hash) => {
inventory.push(InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: block_hash,
});
},
}
}
if !inventory.is_empty() {
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", inventory.len(), peer_index);
executor.lock().execute(Task::SendInventory(peer_index, inventory));
}
}
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 500);
if !blocks_hashes.is_empty() {
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
let inventory = blocks_hashes.into_iter().map(|hash| InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: hash,
}).collect();
executor.lock().execute(Task::SendInventory(peer_index, inventory));
}
},
// `getheaders` => `headers`
(peer_index, ServerTask::ServeGetHeaders(best_block, hash_stop)) => {
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 2000);
if !blocks_hashes.is_empty() {
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_hashes.len(), peer_index);
let blocks_headers = blocks_hashes.into_iter().filter_map(|hash| chain.read().storage_block(&hash).map(|block| block.block_header)).collect();
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers));
}
}
// `notfound`
(peer_index, ServerTask::ReturnNotFound(inventory)) => {
executor.lock().execute(Task::SendNotFound(peer_index, inventory));
@ -157,6 +151,29 @@ impl SynchronizationServer {
}
}
}
fn blocks_hashes_after(chain: &ChainRef, best_block: &db::BestBlock, hash_stop: &H256, max_hashes: u32) -> Vec<H256> {
let mut hashes: Vec<H256> = Vec::new();
let storage_block_hash = chain.read().storage_block_hash(best_block.number);
if let Some(hash) = storage_block_hash {
// check that chain has not reorganized since task was queued
if hash == best_block.hash {
let first_block_number = best_block.number + 1;
let last_block_number = first_block_number + max_hashes;
// `max_hashes` hashes after best_block.number OR hash_stop OR blockchain end
for block_number in first_block_number..last_block_number {
match chain.read().storage_block_hash(block_number) {
Some(ref block_hash) if block_hash == hash_stop => break,
None => break,
Some(block_hash) => {
hashes.push(block_hash);
},
}
}
}
}
hashes
}
}
impl Drop for SynchronizationServer {
@ -183,6 +200,16 @@ impl Server for SynchronizationServer {
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
}
}
fn serve_getheaders(&mut self, peer_index: usize, message: types::GetHeaders) {
if let Some(best_common_block) = self.locate_known_block(message.block_locator_hashes) {
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
self.queue.lock().add_task(peer_index, ServerTask::ServeGetHeaders(best_common_block, message.hash_stop));
}
else {
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
}
}
}
impl ServerQueue {
@ -248,10 +275,19 @@ impl ServerQueue {
#[cfg(test)]
pub mod tests {
use super::{Server, ServerTask};
use message::types;
use db;
use std::sync::Arc;
use std::mem::replace;
use parking_lot::{Mutex, RwLock};
use db;
use test_data;
use primitives::hash::H256;
use chain::RepresentH256;
use message::types;
use message::common::{InventoryVector, InventoryType};
use synchronization_executor::Task;
use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_chain::Chain;
use super::{Server, ServerTask, SynchronizationServer};
pub struct DummyServer {
tasks: Vec<(usize, ServerTask)>,
@ -280,5 +316,122 @@ pub mod tests {
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
}
fn serve_getheaders(&mut self, peer_index: usize, message: types::GetHeaders) {
self.tasks.push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
}
}
fn create_synchronization_server() -> (Arc<RwLock<Chain>>, Arc<Mutex<DummyTaskExecutor>>, SynchronizationServer) {
let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
let executor = DummyTaskExecutor::new();
let server = SynchronizationServer::new(chain.clone(), executor.clone());
(chain, executor, server)
}
#[test]
fn server_getdata_responds_notfound_when_block_not_found() {
let (_, executor, mut server) = create_synchronization_server();
// when asking for unknown block
let inventory = vec![
InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: H256::default(),
}
];
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
});
// => respond with notfound
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]);
}
#[test]
fn server_getdata_responds_block_when_block_is_found() {
let (_, executor, mut server) = create_synchronization_server();
// when asking for known block
let inventory = vec![
InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: test_data::genesis().hash(),
}
];
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
});
// => respond with notfound
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis())]);
}
#[test]
fn server_getblocks_do_not_responds_inventory_when_synchronized() {
let (_, executor, mut server) = create_synchronization_server();
// when asking for blocks hashes
let genesis_block_hash = test_data::genesis().hash();
server.serve_getblocks(0, types::GetBlocks {
version: 0,
block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(),
});
// => respond with inventory
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
}
#[test]
fn server_getblocks_responds_inventory_when_have_unknown_blocks() {
let (chain, executor, mut server) = create_synchronization_server();
chain.write().insert_best_block(test_data::block_h1()).expect("Db write error");
// when asking for blocks hashes
server.serve_getblocks(0, types::GetBlocks {
version: 0,
block_locator_hashes: vec![test_data::genesis().hash()],
hash_stop: H256::default(),
});
// => respond with inventory
let inventory = vec![InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: test_data::block_h1().hash(),
}];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
}
#[test]
fn server_getheaders_do_not_responds_headers_when_synchronized() {
let (_, executor, mut server) = create_synchronization_server();
// when asking for blocks hashes
let genesis_block_hash = test_data::genesis().hash();
server.serve_getheaders(0, types::GetHeaders {
version: 0,
block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(),
});
// => respond with inventory
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
}
#[test]
fn server_getheaders_responds_headers_when_have_unknown_blocks() {
let (chain, executor, mut server) = create_synchronization_server();
chain.write().insert_best_block(test_data::block_h1()).expect("Db write error");
// when asking for blocks hashes
server.serve_getheaders(0, types::GetHeaders {
version: 0,
block_locator_hashes: vec![test_data::genesis().hash()],
hash_stop: H256::default(),
});
// => respond with inventory
let headers = vec![
test_data::block_h1().block_header,
];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendHeaders(0, headers)]);
}
}