Merge pull request #150 from ethcore/rely_on_p2p_sync

Rely on p2p sync
This commit is contained in:
Svyatoslav Nikolsky 2016-11-21 09:23:01 +03:00 committed by GitHub
commit 6fa7cfb77e
6 changed files with 112 additions and 172 deletions

View File

@ -18,13 +18,13 @@ pub trait InboundSyncConnection : Send + Sync {
fn start_sync_session(&self, version: u32);
fn close_session(&self);
fn on_inventory(&self, message: types::Inv);
fn on_getdata(&self, message: types::GetData, id: u32);
fn on_getblocks(&self, message: types::GetBlocks, id: u32);
fn on_getdata(&self, message: types::GetData);
fn on_getblocks(&self, message: types::GetBlocks);
fn on_getheaders(&self, message: types::GetHeaders, id: u32);
fn on_transaction(&self, message: types::Tx);
fn on_block(&self, message: types::Block);
fn on_headers(&self, message: types::Headers);
fn on_mempool(&self, message: types::MemPool, id: u32);
fn on_mempool(&self, message: types::MemPool);
fn on_filterload(&self, message: types::FilterLoad);
fn on_filteradd(&self, message: types::FilterAdd);
fn on_filterclear(&self, message: types::FilterClear);
@ -39,13 +39,14 @@ pub trait InboundSyncConnection : Send + Sync {
}
pub trait OutboundSyncConnection : Send + Sync {
fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool);
fn send_inventory(&self, message: &types::Inv);
fn send_getdata(&self, message: &types::GetData);
fn send_getblocks(&self, message: &types::GetBlocks);
fn send_getheaders(&self, message: &types::GetHeaders);
fn send_transaction(&self, message: &types::Tx);
fn send_block(&self, message: &types::Block, id: u32, is_final: bool);
fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool);
fn send_block(&self, message: &types::Block);
fn send_headers(&self, message: &types::Headers);
fn respond_headers(&self, message: &types::Headers, id: u32);
fn send_mempool(&self, message: &types::MemPool);
fn send_filterload(&self, message: &types::FilterLoad);
fn send_filteradd(&self, message: &types::FilterAdd);
@ -57,7 +58,7 @@ pub trait OutboundSyncConnection : Send + Sync {
fn send_compact_block(&self, message: &types::CompactBlock);
fn send_get_block_txn(&self, message: &types::GetBlockTxn);
fn send_block_txn(&self, message: &types::BlockTxn);
fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool);
fn send_notfound(&self, message: &types::NotFound);
fn ignored(&self, id: u32);
}
@ -78,8 +79,8 @@ impl OutboundSync {
}
impl OutboundSyncConnection for OutboundSync {
fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
fn send_inventory(&self, message: &types::Inv) {
self.context.send_request(message);
}
fn send_getdata(&self, message: &types::GetData) {
@ -98,12 +99,16 @@ impl OutboundSyncConnection for OutboundSync {
self.context.send_request(message);
}
fn send_block(&self, message: &types::Block, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
fn send_block(&self, message: &types::Block) {
self.context.send_request(message);
}
fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
fn send_headers(&self, message: &types::Headers) {
self.context.send_request(message);
}
fn respond_headers(&self, message: &types::Headers, id: u32) {
self.context.send_response(message, id, true);
}
fn send_mempool(&self, message: &types::MemPool) {
@ -150,8 +155,8 @@ impl OutboundSyncConnection for OutboundSync {
self.context.send_request(message);
}
fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool) {
self.context.send_response(message, id, is_final);
fn send_notfound(&self, message: &types::NotFound) {
self.context.send_request(message);
}
fn ignored(&self, id: u32) {
@ -188,15 +193,11 @@ impl Protocol for SyncProtocol {
}
else if command == &types::GetData::command() {
let message: types::GetData = try!(deserialize_payload(payload, version));
let id = self.context.declare_response();
trace!("declared response {} for request: {}", id, types::GetData::command());
self.inbound_connection.on_getdata(message, id);
self.inbound_connection.on_getdata(message);
}
else if command == &types::GetBlocks::command() {
let message: types::GetBlocks = try!(deserialize_payload(payload, version));
let id = self.context.declare_response();
trace!("declared response {} for request: {}", id, types::GetBlocks::command());
self.inbound_connection.on_getblocks(message, id);
self.inbound_connection.on_getblocks(message);
}
else if command == &types::GetHeaders::command() {
let message: types::GetHeaders = try!(deserialize_payload(payload, version));
@ -214,9 +215,7 @@ impl Protocol for SyncProtocol {
}
else if command == &types::MemPool::command() {
let message: types::MemPool = try!(deserialize_payload(payload, version));
let id = self.context.declare_response();
trace!("declared response {} for request: {}", id, types::MemPool::command());
self.inbound_connection.on_mempool(message, id);
self.inbound_connection.on_mempool(message);
}
else if command == &types::Headers::command() {
let message: types::Headers = try!(deserialize_payload(payload, version));

View File

@ -29,12 +29,12 @@ impl InboundSyncConnection for InboundConnection {
self.local_node.on_peer_inventory(self.peer_index, message);
}
fn on_getdata(&self, message: types::GetData, id: u32) {
self.local_node.on_peer_getdata(self.peer_index, message, id);
fn on_getdata(&self, message: types::GetData) {
self.local_node.on_peer_getdata(self.peer_index, message);
}
fn on_getblocks(&self, message: types::GetBlocks, id: u32) {
self.local_node.on_peer_getblocks(self.peer_index, message, id);
fn on_getblocks(&self, message: types::GetBlocks) {
self.local_node.on_peer_getblocks(self.peer_index, message);
}
fn on_getheaders(&self, message: types::GetHeaders, id: u32) {
@ -53,8 +53,8 @@ impl InboundSyncConnection for InboundConnection {
self.local_node.on_peer_headers(self.peer_index, message);
}
fn on_mempool(&self, message: types::MemPool, id: u32) {
self.local_node.on_peer_mempool(self.peer_index, message, id);
fn on_mempool(&self, message: types::MemPool) {
self.local_node.on_peer_mempool(self.peer_index, message);
}
fn on_filterload(&self, message: types::FilterLoad) {

View File

@ -95,16 +95,16 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
// TODO: process other inventory types
}
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData, id: u32) {
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
self.server.serve_getdata(peer_index, message, id);
self.server.serve_getdata(peer_index, message);
}
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) {
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index);
self.server.serve_getblocks(peer_index, message, id);
self.server.serve_getblocks(peer_index, message);
}
pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
@ -127,10 +127,15 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
need_wait
};
// if we do not need synchronized responses => inform p2p that we have processed this request
let id = if !need_wait {
self.executor.lock().execute(SynchronizationTask::Ignore(peer_index, id));
None
} else {
Some(id)
};
self.server.serve_getheaders(peer_index, message, id);
if need_wait {
self.server.wait_peer_requests_completed(peer_index);
}
}
pub fn on_peer_transaction(&self, peer_index: usize, message: types::Tx) {
@ -155,10 +160,10 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
}
}
pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool, id: u32) {
pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool) {
trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index);
self.server.serve_mempool(peer_index, id);
self.server.serve_mempool(peer_index);
}
pub fn on_peer_filterload(&self, peer_index: usize, _message: types::FilterLoad) {
@ -252,13 +257,14 @@ mod tests {
}
impl OutboundSyncConnection for DummyOutboundSyncConnection {
fn send_inventory(&self, _message: &types::Inv, _id: u32, _is_final: bool) {}
fn send_inventory(&self, _message: &types::Inv) {}
fn send_getdata(&self, _message: &types::GetData) {}
fn send_getblocks(&self, _message: &types::GetBlocks) {}
fn send_getheaders(&self, _message: &types::GetHeaders) {}
fn send_transaction(&self, _message: &types::Tx) {}
fn send_block(&self, _message: &types::Block, _id: u32, _is_final: bool) {}
fn send_headers(&self, _message: &types::Headers, _id: u32, _is_final: bool) {}
fn send_block(&self, _message: &types::Block) {}
fn send_headers(&self, _message: &types::Headers) {}
fn respond_headers(&self, _message: &types::Headers, _id: u32) {}
fn send_mempool(&self, _message: &types::MemPool) {}
fn send_filterload(&self, _message: &types::FilterLoad) {}
fn send_filteradd(&self, _message: &types::FilterAdd) {}
@ -270,7 +276,7 @@ mod tests {
fn send_compact_block(&self, _message: &types::CompactBlock) {}
fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {}
fn send_block_txn(&self, _message: &types::BlockTxn) {}
fn send_notfound(&self, _message: &types::NotFound, _id: u32, _is_final: bool) {}
fn send_notfound(&self, _message: &types::NotFound) {}
fn ignored(&self, _id: u32) {}
}
@ -312,10 +318,9 @@ mod tests {
hash: genesis_block_hash.clone(),
}
];
let dummy_id = 0;
local_node.on_peer_getdata(peer_index, types::GetData {
inventory: inventory.clone()
}, dummy_id);
});
// => `getdata` is served
let tasks = server.take_tasks();
assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]);

View File

@ -1770,7 +1770,7 @@ pub mod tests {
let b22 = test_data::block_builder().header().parent(b21.hash()).build().build();
let b23 = test_data::block_builder().header().parent(b22.hash()).build().build();
// TODO: simulate verification during b21 verification
// simulate verification during b21 verification
let mut dummy_verifier = DummyVerifier::default();
dummy_verifier.error_when_verifying(b21.hash(), "simulated");

View File

@ -127,8 +127,9 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
assert_eq!(id.raw(), None);
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index);
connection.send_block(&block_message, id.raw(), id.is_final());
connection.send_block(&block_message);
}
},
Task::SendNotFound(peer_index, unknown_inventory, id) => {
@ -137,8 +138,9 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
assert_eq!(id.raw(), None);
trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len());
connection.send_notfound(&notfound, id.raw(), id.is_final());
connection.send_notfound(&notfound);
}
},
Task::SendInventory(peer_index, inventory, id) => {
@ -147,8 +149,9 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
assert_eq!(id.raw(), None);
trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len());
connection.send_inventory(&inventory, id.raw(), id.is_final());
connection.send_inventory(&inventory);
}
},
Task::SendHeaders(peer_index, headers, id) => {
@ -158,7 +161,10 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
if let Some(connection) = self.peers.get_mut(&peer_index) {
trace!(target: "sync", "Sending headers to peer#{} with {} items", peer_index, headers.headers.len());
connection.send_headers(&headers, id.raw(), id.is_final());
match id.raw() {
Some(id) => connection.respond_headers(&headers, id),
None => connection.send_headers(&headers),
}
}
},
Task::Ignore(peer_index, id) => {

View File

@ -14,20 +14,10 @@ use message::types;
/// Synchronization requests server trait
pub trait Server : Send + 'static {
fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32);
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32);
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32);
fn serve_mempool(&self, peer_index: usize, id: u32);
fn wait_peer_requests_completed(&self, peer_index: usize);
}
/// Peer requests waiter
#[derive(Default)]
pub struct PeerRequestsWaiter {
/// Awake mutex
peer_requests_lock: Mutex<bool>,
/// Awake event
peer_requests_done: Condvar,
fn serve_getdata(&self, peer_index: usize, message: types::GetData);
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks);
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>);
fn serve_mempool(&self, peer_index: usize);
}
/// Synchronization requests server
@ -43,29 +33,32 @@ struct ServerQueue {
queue_ready: Arc<Condvar>,
peers_queue: VecDeque<usize>,
tasks_queue: HashMap<usize, VecDeque<IndexedServerTask>>,
peer_waiters: HashMap<usize, Arc<PeerRequestsWaiter>>,
}
/// `ServerTask` index.
#[derive(Debug, PartialEq)]
pub enum ServerTaskIndex {
/// `None` is used when response is sent out-of-order
None,
/// `Partial` is used when server needs to send more than one response for request.
Partial(u32),
_Partial(u32),
/// `Final` task task can be preceded by many `Partial` tasks with the same id.
Final(u32),
}
impl ServerTaskIndex {
pub fn raw(&self) -> u32 {
pub fn raw(&self) -> Option<u32> {
match *self {
ServerTaskIndex::Partial(id) | ServerTaskIndex::Final(id) => id,
ServerTaskIndex::None => None,
ServerTaskIndex::_Partial(id) | ServerTaskIndex::Final(id) => Some(id),
}
}
pub fn is_final(&self) -> bool {
pub fn _is_final(&self) -> bool {
match *self {
ServerTaskIndex::Partial(_) => false,
ServerTaskIndex::_Partial(_) => false,
ServerTaskIndex::Final(_) => true,
ServerTaskIndex::None => panic!("check with raw() before"),
}
}
}
@ -157,7 +150,7 @@ impl SynchronizationServer {
ServerTask::ServeGetData(inventory) => {
let mut unknown_items: Vec<InventoryVector> = Vec::new();
let mut new_tasks: Vec<IndexedServerTask> = Vec::new();
let task_id = indexed_task.id.raw();
assert_eq!(indexed_task.id.raw(), None);
{
let chain = chain.read();
let storage = chain.storage();
@ -166,7 +159,7 @@ impl SynchronizationServer {
InventoryType::MessageBlock => {
match storage.block_number(&item.hash) {
Some(_) => {
let task = IndexedServerTask::new(ServerTask::ReturnBlock(item.hash.clone()), ServerTaskIndex::Partial(task_id));
let task = IndexedServerTask::new(ServerTask::ReturnBlock(item.hash.clone()), ServerTaskIndex::None);
new_tasks.push(task);
},
None => unknown_items.push(item),
@ -179,19 +172,13 @@ impl SynchronizationServer {
// respond with `notfound` message for unknown data
if !unknown_items.is_empty() {
trace!(target: "sync", "Going to respond with notfound with {} items to peer#{}", unknown_items.len(), peer_index);
let task = IndexedServerTask::new(ServerTask::ReturnNotFound(unknown_items), ServerTaskIndex::Partial(task_id));
let task = IndexedServerTask::new(ServerTask::ReturnNotFound(unknown_items), ServerTaskIndex::None);
new_tasks.push(task);
}
// schedule data responses
if !new_tasks.is_empty() {
trace!(target: "sync", "Going to respond with data with {} items to peer#{}", new_tasks.len(), peer_index);
// mark last task as the final one
if let Some(task) = new_tasks.last_mut() {
task.id = ServerTaskIndex::Final(task_id);
}
queue.lock().add_tasks(peer_index, new_tasks);
} else {
executor.lock().execute(Task::Ignore(peer_index, task_id));
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
@ -207,7 +194,7 @@ impl SynchronizationServer {
}).collect();
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
} else {
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
assert_eq!(indexed_task.id, ServerTaskIndex::None);
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
@ -220,8 +207,8 @@ impl SynchronizationServer {
if !blocks_headers.is_empty() {
trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index);
executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id));
} else {
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
} else if let Some(response_id) = indexed_task.id.raw() {
executor.lock().execute(Task::Ignore(peer_index, response_id));
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
@ -240,7 +227,7 @@ impl SynchronizationServer {
trace!(target: "sync", "Going to respond with {} memory-pool transactions ids to peer#{}", inventory.len(), peer_index);
executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id));
} else {
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
assert_eq!(indexed_task.id, ServerTaskIndex::None);
}
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
@ -261,7 +248,8 @@ impl SynchronizationServer {
},
// ignore
ServerTask::Ignore => {
executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw()));
let response_id = indexed_task.id.raw().expect("do not schedule redundant ignore task");
executor.lock().execute(Task::Ignore(peer_index, response_id));
queue.lock().task_processed(peer_index);
},
}
@ -347,48 +335,41 @@ impl Drop for SynchronizationServer {
}
impl Server for SynchronizationServer {
fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32) {
let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::Final(id));
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::None);
self.queue.lock().add_task(peer_index, task);
}
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) {
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) {
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::Final(id));
let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::None);
self.queue.lock().add_task(peer_index, task);
}
else {
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id));
}
}
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) {
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>) {
if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) {
trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str());
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), ServerTaskIndex::Final(id));
let server_task_index = id.map_or_else(|| ServerTaskIndex::None, |id| ServerTaskIndex::Final(id));
let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), server_task_index);
self.queue.lock().add_task(peer_index, task);
}
else {
trace!(target: "sync", "No common blocks headers with peer#{}", peer_index);
self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id));
if let Some(id) = id {
self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id));
}
}
}
fn serve_mempool(&self, peer_index: usize, id: u32) {
let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::Final(id));
fn serve_mempool(&self, peer_index: usize) {
let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::None);
self.queue.lock().add_task(peer_index, task);
}
fn wait_peer_requests_completed(&self, peer_index: usize) {
if let Some(waiter) = {
let mut queue = self.queue.lock();
queue.get_peer_requests_waiter(peer_index)
} {
waiter.wait();
}
}
}
impl ServerQueue {
@ -398,7 +379,6 @@ impl ServerQueue {
queue_ready: queue_ready,
peers_queue: VecDeque::new(),
tasks_queue: HashMap::new(),
peer_waiters: HashMap::new(),
}
}
@ -425,11 +405,6 @@ impl ServerQueue {
return;
}
tasks_entry.remove_entry();
if let Entry::Occupied(entry) = self.peer_waiters.entry(peer_index) {
entry.get().awake();
entry.remove_entry();
}
}
}
@ -470,42 +445,6 @@ impl ServerQueue {
}
self.queue_ready.notify_one();
}
pub fn get_peer_requests_waiter(&mut self, peer_index: usize) -> Option<Arc<PeerRequestsWaiter>> {
match self.peer_waiters.entry(peer_index) {
Entry::Vacant(entry) => {
// there are no pending tasks for this peer
if !self.tasks_queue.contains_key(&peer_index) {
return None;
}
// there are tasks => wait for completion
let waiter = Arc::new(PeerRequestsWaiter::default());
entry.insert(waiter.clone());
Some(waiter)
},
Entry::Occupied(entry) => {
Some(entry.get().clone())
},
}
}
}
impl PeerRequestsWaiter {
pub fn wait(&self) {
let mut locker = self.peer_requests_lock.lock();
if *locker {
return;
}
self.peer_requests_done.wait(&mut locker);
}
pub fn awake(&self) {
let mut locker = self.peer_requests_lock.lock();
*locker = true;
self.peer_requests_done.notify_all();
}
}
#[cfg(test)]
@ -541,30 +480,27 @@ pub mod tests {
}
impl Server for DummyServer {
fn serve_getdata(&self, peer_index: usize, message: types::GetData, _id: u32) {
fn serve_getdata(&self, peer_index: usize, message: types::GetData) {
self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory)));
}
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, _id: u32) {
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
}
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: u32) {
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: Option<u32>) {
self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
}
fn serve_mempool(&self, peer_index: usize, _id: u32) {
fn serve_mempool(&self, peer_index: usize) {
self.tasks.lock().push((peer_index, ServerTask::ServeMempool));
}
fn wait_peer_requests_completed(&self, _peer_index: usize) {
}
}
fn create_synchronization_server() -> (Arc<RwLock<Chain>>, Arc<Mutex<DummyTaskExecutor>>, SynchronizationServer) {
@ -584,13 +520,12 @@ pub mod tests {
hash: H256::default(),
}
];
let dummy_id = 0;
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
}, dummy_id);
});
// => respond with notfound
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::Final(dummy_id))]);
assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]);
}
#[test]
@ -603,13 +538,12 @@ pub mod tests {
hash: test_data::genesis().hash(),
}
];
let dummy_id = 0;
server.serve_getdata(0, types::GetData {
inventory: inventory.clone(),
}, dummy_id);
});
// => respond with block
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::Final(dummy_id))]);
assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::None)]);
}
#[test]
@ -617,15 +551,14 @@ pub mod tests {
let (_, executor, server) = create_synchronization_server();
// when asking for blocks hashes
let genesis_block_hash = test_data::genesis().hash();
let dummy_id = 5;
server.serve_getblocks(0, types::GetBlocks {
version: 0,
block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(),
}, dummy_id);
});
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
assert_eq!(tasks, vec![]);
}
#[test]
@ -633,19 +566,18 @@ pub mod tests {
let (chain, executor, server) = create_synchronization_server();
chain.write().insert_best_block(test_data::block_h1().hash(), &test_data::block_h1()).expect("Db write error");
// when asking for blocks hashes
let dummy_id = 0;
server.serve_getblocks(0, types::GetBlocks {
version: 0,
block_locator_hashes: vec![test_data::genesis().hash()],
hash_stop: H256::default(),
}, dummy_id);
});
// => responds with inventory
let inventory = vec![InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: test_data::block_h1().hash(),
}];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]);
}
#[test]
@ -658,7 +590,7 @@ pub mod tests {
version: 0,
block_locator_hashes: vec![genesis_block_hash.clone()],
hash_stop: H256::default(),
}, dummy_id);
}, Some(dummy_id));
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
@ -674,7 +606,7 @@ pub mod tests {
version: 0,
block_locator_hashes: vec![test_data::genesis().hash()],
hash_stop: H256::default(),
}, dummy_id);
}, Some(dummy_id));
// => responds with headers
let headers = vec![
test_data::block_h1().block_header,
@ -687,11 +619,10 @@ pub mod tests {
fn server_mempool_do_not_responds_inventory_when_empty_memory_pool() {
let (_, executor, server) = create_synchronization_server();
// when asking for memory pool transactions ids
let dummy_id = 9;
server.serve_mempool(0, dummy_id);
server.serve_mempool(0);
// => no response
let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout
assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]);
assert_eq!(tasks, vec![]);
}
#[test]
@ -702,14 +633,13 @@ pub mod tests {
let transaction_hash = transaction.hash();
chain.write().insert_verified_transaction(transaction);
// when asking for memory pool transactions ids
let dummy_id = 0;
server.serve_mempool(0, dummy_id);
server.serve_mempool(0);
// => respond with inventory
let inventory = vec![InventoryVector {
inv_type: InventoryType::MessageTx,
hash: transaction_hash,
}];
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]);
}
}