sync synchronizer part done

This commit is contained in:
debris 2016-11-14 09:56:58 +01:00
parent 62e1d9c288
commit 86cff63c30
6 changed files with 180 additions and 106 deletions

View File

@ -5,6 +5,8 @@ use protocol::Protocol;
use util::{PeerInfo, PeerId};
use p2p::Context;
const UNIMPLEMENTED_TASK_ID: u32 = 0;
pub type InboundSyncConnectionRef = Box<InboundSyncConnection>;
pub type OutboundSyncConnectionRef = Box<OutboundSyncConnection>;
pub type LocalSyncNodeRef = Box<LocalSyncNode>;
@ -19,13 +21,13 @@ pub trait InboundSyncConnection : Send + Sync {
fn start_sync_session(&self, version: u32);
fn close_session(&self);
fn on_inventory(&self, message: types::Inv);
fn on_getdata(&self, message: types::GetData);
fn on_getblocks(&self, message: types::GetBlocks);
fn on_getheaders(&self, message: types::GetHeaders);
fn on_getdata(&self, message: types::GetData, id: u32);
fn on_getblocks(&self, message: types::GetBlocks, id: u32);
fn on_getheaders(&self, message: types::GetHeaders, id: u32);
fn on_transaction(&self, message: types::Tx);
fn on_block(&self, message: types::Block);
fn on_headers(&self, message: types::Headers);
fn on_mempool(&self, message: types::MemPool);
fn on_mempool(&self, message: types::MemPool, id: u32);
fn on_filterload(&self, message: types::FilterLoad);
fn on_filteradd(&self, message: types::FilterAdd);
fn on_filterclear(&self, message: types::FilterClear);
@ -40,14 +42,14 @@ pub trait InboundSyncConnection : Send + Sync {
}
pub trait OutboundSyncConnection : Send + Sync {
fn send_inventory(&self, message: &types::Inv);
fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool);
fn send_getdata(&self, message: &types::GetData);
fn send_getblocks(&self, message: &types::GetBlocks);
fn send_getheaders(&self, message: &types::GetHeaders);
fn send_transaction(&self, message: &types::Tx);
fn send_block(&self, message: &types::Block);
fn send_headers(&self, message: &types::Headers);
fn send_mempool(&self, message: &types::MemPool);
fn send_block(&self, message: &types::Block, id: u32, is_final: bool);
fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool);
fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool);
fn send_filterload(&self, message: &types::FilterLoad);
fn send_filteradd(&self, message: &types::FilterAdd);
fn send_filterclear(&self, message: &types::FilterClear);
@ -58,7 +60,7 @@ pub trait OutboundSyncConnection : Send + Sync {
fn send_compact_block(&self, message: &types::CompactBlock);
fn send_get_block_txn(&self, message: &types::GetBlockTxn);
fn send_block_txn(&self, message: &types::BlockTxn);
fn send_notfound(&self, message: &types::NotFound);
fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool);
}
struct OutboundSync {
@ -85,7 +87,7 @@ impl OutboundSync {
}
impl OutboundSyncConnection for OutboundSync {
fn send_inventory(&self, message: &types::Inv) {
fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool) {
self.send_message(message);
}
@ -105,15 +107,15 @@ impl OutboundSyncConnection for OutboundSync {
self.send_message(message);
}
fn send_block(&self, message: &types::Block) {
fn send_block(&self, message: &types::Block, id: u32, is_final: bool) {
self.send_message(message);
}
fn send_headers(&self, message: &types::Headers) {
fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool) {
self.send_message(message);
}
fn send_mempool(&self, message: &types::MemPool) {
fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool) {
self.send_message(message);
}
@ -157,7 +159,7 @@ impl OutboundSyncConnection for OutboundSync {
self.send_message(message);
}
fn send_notfound(&self, message: &types::NotFound) {
fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool) {
self.send_message(message);
}
}
@ -190,15 +192,15 @@ impl Protocol for SyncProtocol {
}
else if command == &types::GetData::command() {
let message: types::GetData = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_getdata(message);
self.inbound_connection.on_getdata(message, UNIMPLEMENTED_TASK_ID);
}
else if command == &types::GetBlocks::command() {
let message: types::GetBlocks = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_getblocks(message);
self.inbound_connection.on_getblocks(message, UNIMPLEMENTED_TASK_ID);
}
else if command == &types::GetHeaders::command() {
let message: types::GetHeaders = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_getheaders(message);
self.inbound_connection.on_getheaders(message, UNIMPLEMENTED_TASK_ID);
}
else if command == &types::Tx::command() {
let message: types::Tx = try!(deserialize_payload(payload, self.info.version));
@ -210,7 +212,7 @@ impl Protocol for SyncProtocol {
}
else if command == &types::MemPool::command() {
let message: types::MemPool = try!(deserialize_payload(payload, self.info.version));
self.inbound_connection.on_mempool(message);
self.inbound_connection.on_mempool(message, UNIMPLEMENTED_TASK_ID);
}
else if command == &types::Headers::command() {
let message: types::Headers = try!(deserialize_payload(payload, self.info.version));

View File

@ -4,7 +4,7 @@ use bytes::Bytes;
use message::{Command, Error};
use p2p::Context;
use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, SeednodeProtocol};
use util::{PeerInfo};
use util::{ConfigurableSynchronizer, PeerInfo};
pub trait SessionFactory {
fn new_session(context: Arc<Context>, info: PeerInfo) -> Session;
@ -34,12 +34,14 @@ impl SessionFactory for NormalSessionFactory {
pub struct Session {
protocols: Mutex<Vec<Box<Protocol>>>,
synchronizer: Mutex<ConfigurableSynchronizer>,
}
impl Session {
pub fn new(protocols: Vec<Box<Protocol>>) -> Self {
Session {
protocols: Mutex::new(protocols),
synchronizer: Mutex::new(ConfigurableSynchronizer::new(false)),
}
}

View File

@ -29,16 +29,16 @@ impl InboundSyncConnection for InboundConnection {
self.local_node.on_peer_inventory(self.peer_index, message);
}
fn on_getdata(&self, message: types::GetData) {
self.local_node.on_peer_getdata(self.peer_index, message);
fn on_getdata(&self, message: types::GetData, id: u32) {
self.local_node.on_peer_getdata(self.peer_index, message, id);
}
fn on_getblocks(&self, message: types::GetBlocks) {
self.local_node.on_peer_getblocks(self.peer_index, message);
fn on_getblocks(&self, message: types::GetBlocks, id: u32) {
self.local_node.on_peer_getblocks(self.peer_index, message, id);
}
fn on_getheaders(&self, message: types::GetHeaders) {
self.local_node.on_peer_getheaders(self.peer_index, message);
fn on_getheaders(&self, message: types::GetHeaders, id: u32) {
self.local_node.on_peer_getheaders(self.peer_index, message, id);
}
fn on_transaction(&self, message: types::Tx) {
@ -53,8 +53,8 @@ impl InboundSyncConnection for InboundConnection {
self.local_node.on_peer_headers(self.peer_index, message);
}
fn on_mempool(&self, message: types::MemPool) {
self.local_node.on_peer_mempool(self.peer_index, message);
fn on_mempool(&self, message: types::MemPool, id: u32) {
self.local_node.on_peer_mempool(self.peer_index, message, id);
}
fn on_filterload(&self, message: types::FilterLoad) {

View File

@ -97,19 +97,19 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
// TODO: process unknown transactions, etc...
}
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData, id: u32) {
trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
self.server.serve_getdata(peer_index, message);
self.server.serve_getdata(peer_index, message, id);
}
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) {
trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index);
self.server.serve_getblocks(peer_index, message);
self.server.serve_getblocks(peer_index, message, id);
}
pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index);
// simulating bitcoind for passing tests: if we are in nearly-saturated state
@ -123,7 +123,7 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
need_wait
};
self.server.serve_getheaders(peer_index, message);
self.server.serve_getheaders(peer_index, message, id);
if need_wait {
self.server.wait_peer_requests_completed(peer_index);
}
@ -148,10 +148,10 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
}
}
pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool) {
pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool, id: u32) {
trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index);
self.server.serve_mempool(peer_index);
self.server.serve_mempool(peer_index, id);
}
pub fn on_peer_filterload(&self, peer_index: usize, _message: types::FilterLoad) {
@ -227,14 +227,14 @@ mod tests {
}
impl OutboundSyncConnection for DummyOutboundSyncConnection {
fn send_inventory(&self, _message: &types::Inv) {}
fn send_inventory(&self, _message: &types::Inv, _id: u32, _is_final: bool) {}
fn send_getdata(&self, _message: &types::GetData) {}
fn send_getblocks(&self, _message: &types::GetBlocks) {}
fn send_getheaders(&self, _message: &types::GetHeaders) {}
fn send_transaction(&self, _message: &types::Tx) {}
fn send_block(&self, _message: &types::Block) {}
fn send_headers(&self, _message: &types::Headers) {}
fn send_mempool(&self, _message: &types::MemPool) {}
fn send_block(&self, _message: &types::Block, _id: u32, _is_final: bool) {}
fn send_headers(&self, _message: &types::Headers, _id: u32, _is_final: bool) {}
fn send_mempool(&self, _message: &types::MemPool, _id: u32, _is_final: bool) {}
fn send_filterload(&self, _message: &types::FilterLoad) {}
fn send_filteradd(&self, _message: &types::FilterAdd) {}
fn send_filterclear(&self, _message: &types::FilterClear) {}
@ -245,7 +245,7 @@ mod tests {
fn send_compact_block(&self, _message: &types::CompactBlock) {}
fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {}
fn send_block_txn(&self, _message: &types::BlockTxn) {}
fn send_notfound(&self, _message: &types::NotFound) {}
fn send_notfound(&self, _message: &types::NotFound, _id: u32, _is_final: bool) {}
}
fn create_local_node() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<DummyServer>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor>>) {
@ -283,9 +283,10 @@ mod tests {
hash: genesis_block_hash.clone(),
}
];
let dummy_id = 0;
local_node.on_peer_getdata(peer_index, types::GetData {
inventory: inventory.clone()
});
}, dummy_id);
// => `getdata` is served
let tasks = server.take_tasks();
assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]);

View File

@ -7,6 +7,7 @@ use message::types;
use primitives::hash::H256;
use p2p::OutboundSyncConnectionRef;
use synchronization_chain::ChainRef;
use synchronization_server::ServerTaskIndex;
use local_node::PeersConnections;
pub type LocalSynchronizationTaskExecutorRef = Arc<Mutex<LocalSynchronizationTaskExecutor>>;
@ -24,13 +25,13 @@ pub enum Task {
/// Request blocks headers using full getheaders.block_locator_hashes.
RequestBlocksHeaders(usize),
/// Send block.
SendBlock(usize, Block),
SendBlock(usize, Block, ServerTaskIndex),
/// Send notfound
SendNotFound(usize, Vec<InventoryVector>),
SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex),
/// Send inventory
SendInventory(usize, Vec<InventoryVector>),
SendInventory(usize, Vec<InventoryVector>, ServerTaskIndex),
/// Send headers
SendHeaders(usize, Vec<BlockHeader>),
SendHeaders(usize, Vec<BlockHeader>, ServerTaskIndex),
}
/// Synchronization tasks executor
@ -94,7 +95,7 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
connection.send_getheaders(&getheaders);
}
},
Task::SendBlock(peer_index, block) => {
Task::SendBlock(peer_index, block, id) => {
let block_message = types::Block {
block: block,
};
@ -102,10 +103,10 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index);
connection.send_block(&block_message);
connection.send_block(&block_message, id.raw(), id.is_final());
}
},
Task::SendNotFound(peer_index, unknown_inventory) => {
Task::SendNotFound(peer_index, unknown_inventory, id) => {
let notfound = types::NotFound {
inventory: unknown_inventory,
};
@ -113,10 +114,10 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len());
connection.send_notfound(&notfound);
connection.send_notfound(&notfound, id.raw(), id.is_final());
}
},
Task::SendInventory(peer_index, inventory) => {
Task::SendInventory(peer_index, inventory, id) => {
let inventory = types::Inv {
inventory: inventory,
};
@ -124,10 +125,10 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len());
connection.send_inventory(&inventory);
connection.send_inventory(&inventory, id.raw(), id.is_final());
}
},
Task::SendHeaders(peer_index, headers) => {
Task::SendHeaders(peer_index, headers, id) => {
let headers = types::Headers {
headers: headers,
};
@ -135,7 +136,7 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending headers to peer#{} with {} items", peer_index, headers.headers.len());
connection.send_headers(&headers);
connection.send_headers(&headers, id.raw(), id.is_final());
}
},
}
@ -194,4 +195,4 @@ pub mod tests {
self.waiter.notify_one();
}
}
}
}

View File

@ -14,10 +14,10 @@ use message::types;
/// Synchronization requests server trait
pub trait Server : Send + 'static {
fn serve_getdata(&self, peer_index: usize, message: types::GetData);
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks);
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders);
fn serve_mempool(&self, peer_index: usize);
fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32);
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32);
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32);
fn serve_mempool(&self, peer_index: usize, id: u32);
fn wait_peer_requests_completed(&self, peer_index: usize);
}
@ -42,10 +42,53 @@ struct ServerQueue {
is_stopping: AtomicBool,
queue_ready: Arc<Condvar>,
peers_queue: VecDeque<usize>,
tasks_queue: HashMap<usize, VecDeque<ServerTask>>,
tasks_queue: HashMap<usize, VecDeque<IndexedServerTask>>,
peer_waiters: HashMap<usize, Arc<PeerRequestsWaiter>>,
}
/// `ServerTask` index.
#[derive(Debug, PartialEq)]
pub enum ServerTaskIndex {
/// `Partial` is used when server needs to send more than one response for request.
Partial(u32),
/// `Final` task task can be preceded by many `Partial` tasks with the same id.
Final(u32),
}
impl ServerTaskIndex {
pub fn raw(&self) -> u32 {
match *self {
ServerTaskIndex::Partial(id) => id,
ServerTaskIndex::Final(id) => id,
}
}
pub fn is_final(&self) -> bool {
match *self {
ServerTaskIndex::Partial(_) => false,
ServerTaskIndex::Final(_) => true,
}
}
}
/// Server tests together with unique id assigned to it
#[derive(Debug, PartialEq)]
pub struct IndexedServerTask {
/// Task itself.
task: ServerTask,
/// Task id.
id: ServerTaskIndex,
}
impl IndexedServerTask {
fn new(task: ServerTask, id: ServerTaskIndex) -> Self {
IndexedServerTask {
task: task,
id: id,
}
}
}
#[derive(Debug, PartialEq)]
pub enum ServerTask {
ServeGetData(Vec<InventoryVector>),
@ -97,11 +140,18 @@ impl SynchronizationServer {
})
};
match server_task {
// `getdata` => `notfound` + `block` + ...
Some((peer_index, ServerTask::ServeGetData(inventory))) => {
let (peer_index, indexed_task) = match server_task {
Some((peer_index, indexed_task)) => (peer_index, indexed_task),
// no tasks after wake-up => stopping or pausing
_ => continue,
};
match indexed_task.task {
// `getdata` => `notfound` + `block` + ...
ServerTask::ServeGetData(inventory) => {
let mut unknown_items: Vec<InventoryVector> = Vec::new();
let mut new_tasks: Vec<ServerTask> = Vec::new();
let mut new_tasks: Vec<IndexedServerTask> = Vec::new();
let task_id = indexed_task.id.raw();
{
let chain = chain.read();
let storage = chain.storage();
@ -109,7 +159,10 @@ impl SynchronizationServer {
match item.inv_type {
InventoryType::MessageBlock => {
match storage.block_number(&item.hash) {
Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())),
Some(_) => {
let task = IndexedServerTask::new(ServerTask::ReturnBlock(item.hash.clone()), ServerTaskIndex::Partial(task_id));
new_tasks.push(task);
},
None => unknown_items.push(item),
}
},
@ -120,18 +173,23 @@ impl SynchronizationServer {
// respond with `notfound` message for unknown data
if !unknown_items.is_empty() {
trace!(target: "sync", "Going to respond with notfound with {} items to peer#{}", unknown_items.len(), peer_index);
new_tasks.push(ServerTask::ReturnNotFound(unknown_items));
let task = IndexedServerTask::new(ServerTask::ReturnNotFound(unknown_items), ServerTaskIndex::Partial(task_id));
new_tasks.push(task);
}
// schedule data responses
if !new_tasks.is_empty() {
trace!(target: "sync", "Going to respond with data with {} items to peer#{}", new_tasks.len(), peer_index);
// mark last task as the final one
if let Some(task) = new_tasks.last_mut() {
task.id = ServerTaskIndex::Final(task_id);
}
queue.lock().add_tasks(peer_index, new_tasks);
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `getblocks` => `inventory`
Some((peer_index, ServerTask::ServeGetBlocks(best_block, hash_stop))) => {
ServerTask::ServeGetBlocks(best_block, hash_stop) => {
let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 500);
if !blocks_hashes.is_empty() {
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index);
@ -139,25 +197,25 @@ impl SynchronizationServer {
inv_type: InventoryType::MessageBlock,
hash: hash,
}).collect();
executor.lock().execute(Task::SendInventory(peer_index, inventory));
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `getheaders` => `headers`
Some((peer_index, ServerTask::ServeGetHeaders(best_block, hash_stop))) => {
ServerTask::ServeGetHeaders(best_block, hash_stop) => {
// What if we have no common blocks with peer at all? Maybe drop connection or penalize peer?
// https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568
let blocks_headers = SynchronizationServer::blocks_headers_after(&chain, &best_block, &hash_stop, 2000);
if !blocks_headers.is_empty() {
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index);
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers));
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id));
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `mempool` => `inventory`
Some((peer_index, ServerTask::ServeMempool)) => {
ServerTask::ServeMempool => {
let inventory: Vec<_> = chain.read()
.memory_pool()
.get_transactions_ids()
@ -169,27 +227,25 @@ impl SynchronizationServer {
.collect();
if !inventory.is_empty() {
trace!(target: "sync", "Going to respond with {} memory-pool transactions ids to peer#{}", inventory.len(), peer_index);
executor.lock().execute(Task::SendInventory(peer_index, inventory));
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `notfound`
Some((peer_index, ServerTask::ReturnNotFound(inventory))) => {
executor.lock().execute(Task::SendNotFound(peer_index, inventory));
ServerTask::ReturnNotFound(inventory) => {
executor.lock().execute(Task::SendNotFound(peer_index, inventory, indexed_task.id));
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `block`
Some((peer_index, ServerTask::ReturnBlock(block_hash))) => {
ServerTask::ReturnBlock(block_hash) => {
let block = chain.read().storage().block(db::BlockRef::Hash(block_hash))
.expect("we have checked that block exists in ServeGetData; db is append-only; qed");
executor.lock().execute(Task::SendBlock(peer_index, block));
executor.lock().execute(Task::SendBlock(peer_index, block, indexed_task.id));
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// no tasks after wake-up => stopping or pausing
None => (),
}
}
}
@ -273,32 +329,36 @@ impl Drop for SynchronizationServer {
}
impl Server for SynchronizationServer {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
self.queue.lock().add_task(peer_index, ServerTask::ServeGetData(message.inventory));
fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32) {
let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::Final(id));
self.queue.lock().add_task(peer_index, task);
}
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) {
if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) {
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
self.queue.lock().add_task(peer_index, ServerTask::ServeGetBlocks(best_common_block, message.hash_stop));
let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::Final(id));
self.queue.lock().add_task(peer_index, task);
}
else {
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
}
}
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) {
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
self.queue.lock().add_task(peer_index, ServerTask::ServeGetHeaders(best_common_block, message.hash_stop));
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), ServerTaskIndex::Final(id));
self.queue.lock().add_task(peer_index, task);
}
else {
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
}
}
fn serve_mempool(&self, peer_index: usize) {
self.queue.lock().add_task(peer_index, ServerTask::ServeMempool);
fn serve_mempool(&self, peer_index: usize, id: u32) {
let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::Final(id));
self.queue.lock().add_task(peer_index, task);
}
fn wait_peer_requests_completed(&self, peer_index: usize) {
@ -322,7 +382,7 @@ impl ServerQueue {
}
}
pub fn next_task(&mut self) -> Option<(usize, ServerTask)> {
pub fn next_task(&mut self) -> Option<(usize, IndexedServerTask)> {
self.peers_queue.pop_front()
.map(|peer| {
let (peer_task, no_tasks_left) = {
@ -353,7 +413,7 @@ impl ServerQueue {
}
}
pub fn add_task(&mut self, peer_index: usize, task: ServerTask) {
pub fn add_task(&mut self, peer_index: usize, task: IndexedServerTask) {
match self.tasks_queue.entry(peer_index) {
Entry::Occupied(mut entry) => {
let add_to_peers_queue = entry.get().is_empty();
@ -372,7 +432,7 @@ impl ServerQueue {
self.queue_ready.notify_one();
}
pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec<ServerTask>) {
pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec<IndexedServerTask>) {
match self.tasks_queue.entry(peer_index) {
Entry::Occupied(mut entry) => {
let add_to_peers_queue = entry.get().is_empty();
@ -442,7 +502,7 @@ pub mod tests {
use synchronization_executor::Task;
use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_chain::Chain;
use super::{Server, ServerTask, SynchronizationServer};
use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex};
pub struct DummyServer {
tasks: Mutex<Vec<(usize, ServerTask)>>,
@ -461,25 +521,25 @@ pub mod tests {
}
impl Server for DummyServer {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
fn serve_getdata(&self, peer_index: usize, message: types::GetData, _id: u32) {
self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory)));
}
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, _id: u32) {
self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
}
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: u32) {
self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
}
fn serve_mempool(&self, peer_index: usize) {
fn serve_mempool(&self, peer_index: usize, _id: u32) {
self.tasks.lock().push((peer_index, ServerTask::ServeMempool));
}
@ -504,12 +564,13 @@ pub mod tests {
hash: H256::default(),
}
];
let dummy_id = 0;
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
});
}, dummy_id);
// => respond with notfound
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::Final(dummy_id))]);
}
#[test]
@ -522,12 +583,13 @@ pub mod tests {
hash: test_data::genesis().hash(),
}
];
let dummy_id = 0;
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
});
}, dummy_id);
// => respond with block
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis())]);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::Final(dummy_id))]);
}
#[test]
@ -535,11 +597,12 @@ pub mod tests {
let (_, executor, server) = create_synchronization_server();
// when asking for blocks hashes
let genesis_block_hash = test_data::genesis().hash();
let dummy_id = 0;
server.serve_getblocks(0, types::GetBlocks {
version: 0,
block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(),
});
}, dummy_id);
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
@ -550,18 +613,19 @@ pub mod tests {
let (chain, executor, server) = create_synchronization_server();
chain.write().insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Db write error");
// when asking for blocks hashes
let dummy_id = 0;
server.serve_getblocks(0, types::GetBlocks {
version: 0,
block_locator_hashes: vec![test_data::genesis().hash()],
hash_stop: H256::default(),
});
}, dummy_id);
// => responds with inventory
let inventory = vec![InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: test_data::block_h1().hash(),
}];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]);
}
#[test]
@ -569,11 +633,12 @@ pub mod tests {
let (_, executor, server) = create_synchronization_server();
// when asking for blocks hashes
let genesis_block_hash = test_data::genesis().hash();
let dummy_id = 0;
server.serve_getheaders(0, types::GetHeaders {
version: 0,
block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(),
});
}, dummy_id);
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
@ -584,24 +649,26 @@ pub mod tests {
let (chain, executor, server) = create_synchronization_server();
chain.write().insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Db write error");
// when asking for blocks hashes
let dummy_id = 0;
server.serve_getheaders(0, types::GetHeaders {
version: 0,
block_locator_hashes: vec![test_data::genesis().hash()],
hash_stop: H256::default(),
});
}, dummy_id);
// => responds with headers
let headers = vec![
test_data::block_h1().block_header,
];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendHeaders(0, headers)]);
assert_eq!(tasks, vec![Task::SendHeaders(0, headers, ServerTaskIndex::Final(dummy_id))]);
}
#[test]
fn server_mempool_do_not_responds_inventory_when_empty_memory_pool() {
let (_, executor, server) = create_synchronization_server();
// when asking for memory pool transactions ids
server.serve_mempool(0);
let dummy_id = 0;
server.serve_mempool(0, dummy_id);
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![]);
@ -615,13 +682,14 @@ pub mod tests {
let transaction_hash = transaction.hash();
chain.write().memory_pool_mut().insert_verified(transaction);
// when asking for memory pool transactions ids
server.serve_mempool(0);
let dummy_id = 0;
server.serve_mempool(0, dummy_id);
// => respond with inventory
let inventory = vec![InventoryVector {
inv_type: InventoryType::MessageTx,
hash: transaction_hash,
}];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]);
}
}