From 964ac87135dcff04838ccfd25fa6261829a3c178 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 18 Nov 2016 16:06:14 +0300 Subject: [PATCH] getting rid of sync waits --- p2p/src/protocol/sync.rs | 47 ++++---- sync/src/inbound_connection.rs | 12 +- sync/src/local_node.rs | 35 +++--- sync/src/synchronization_executor.rs | 14 ++- sync/src/synchronization_server.rs | 174 ++++++++------------------- 5 files changed, 111 insertions(+), 171 deletions(-) diff --git a/p2p/src/protocol/sync.rs b/p2p/src/protocol/sync.rs index fc92f9ab..f3b5f701 100644 --- a/p2p/src/protocol/sync.rs +++ b/p2p/src/protocol/sync.rs @@ -18,13 +18,13 @@ pub trait InboundSyncConnection : Send + Sync { fn start_sync_session(&self, version: u32); fn close_session(&self); fn on_inventory(&self, message: types::Inv); - fn on_getdata(&self, message: types::GetData, id: u32); - fn on_getblocks(&self, message: types::GetBlocks, id: u32); + fn on_getdata(&self, message: types::GetData); + fn on_getblocks(&self, message: types::GetBlocks); fn on_getheaders(&self, message: types::GetHeaders, id: u32); fn on_transaction(&self, message: types::Tx); fn on_block(&self, message: types::Block); fn on_headers(&self, message: types::Headers); - fn on_mempool(&self, message: types::MemPool, id: u32); + fn on_mempool(&self, message: types::MemPool); fn on_filterload(&self, message: types::FilterLoad); fn on_filteradd(&self, message: types::FilterAdd); fn on_filterclear(&self, message: types::FilterClear); @@ -39,13 +39,14 @@ pub trait InboundSyncConnection : Send + Sync { } pub trait OutboundSyncConnection : Send + Sync { - fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool); + fn send_inventory(&self, message: &types::Inv); fn send_getdata(&self, message: &types::GetData); fn send_getblocks(&self, message: &types::GetBlocks); fn send_getheaders(&self, message: &types::GetHeaders); fn send_transaction(&self, message: &types::Tx); - fn send_block(&self, message: &types::Block, id: u32, is_final: bool); - fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool); + fn send_block(&self, message: &types::Block); + fn send_headers(&self, message: &types::Headers); + fn respond_headers(&self, message: &types::Headers, id: u32); fn send_mempool(&self, message: &types::MemPool); fn send_filterload(&self, message: &types::FilterLoad); fn send_filteradd(&self, message: &types::FilterAdd); @@ -57,7 +58,7 @@ pub trait OutboundSyncConnection : Send + Sync { fn send_compact_block(&self, message: &types::CompactBlock); fn send_get_block_txn(&self, message: &types::GetBlockTxn); fn send_block_txn(&self, message: &types::BlockTxn); - fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool); + fn send_notfound(&self, message: &types::NotFound); fn ignored(&self, id: u32); } @@ -78,8 +79,8 @@ impl OutboundSync { } impl OutboundSyncConnection for OutboundSync { - fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool) { - self.context.send_response(message, id, is_final); + fn send_inventory(&self, message: &types::Inv) { + self.context.send_request(message); } fn send_getdata(&self, message: &types::GetData) { @@ -98,12 +99,16 @@ impl OutboundSyncConnection for OutboundSync { self.context.send_request(message); } - fn send_block(&self, message: &types::Block, id: u32, is_final: bool) { - self.context.send_response(message, id, is_final); + fn send_block(&self, message: &types::Block) { + self.context.send_request(message); } - fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool) { - self.context.send_response(message, id, is_final); + fn send_headers(&self, message: &types::Headers) { + self.context.send_request(message); + } + + fn respond_headers(&self, message: &types::Headers, id: u32) { + self.context.send_response(message, id, true); } fn send_mempool(&self, message: &types::MemPool) { @@ -150,8 +155,8 @@ impl OutboundSyncConnection for OutboundSync { self.context.send_request(message); } - fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool) { - self.context.send_response(message, id, is_final); + fn send_notfound(&self, message: &types::NotFound) { + self.context.send_request(message); } fn ignored(&self, id: u32) { @@ -188,15 +193,11 @@ impl Protocol for SyncProtocol { } else if command == &types::GetData::command() { let message: types::GetData = try!(deserialize_payload(payload, version)); - let id = self.context.declare_response(); - trace!("declared response {} for request: {}", id, types::GetData::command()); - self.inbound_connection.on_getdata(message, id); + self.inbound_connection.on_getdata(message); } else if command == &types::GetBlocks::command() { let message: types::GetBlocks = try!(deserialize_payload(payload, version)); - let id = self.context.declare_response(); - trace!("declared response {} for request: {}", id, types::GetBlocks::command()); - self.inbound_connection.on_getblocks(message, id); + self.inbound_connection.on_getblocks(message); } else if command == &types::GetHeaders::command() { let message: types::GetHeaders = try!(deserialize_payload(payload, version)); @@ -214,9 +215,7 @@ impl Protocol for SyncProtocol { } else if command == &types::MemPool::command() { let message: types::MemPool = try!(deserialize_payload(payload, version)); - let id = self.context.declare_response(); - trace!("declared response {} for request: {}", id, types::MemPool::command()); - self.inbound_connection.on_mempool(message, id); + self.inbound_connection.on_mempool(message); } else if command == &types::Headers::command() { let message: types::Headers = try!(deserialize_payload(payload, version)); diff --git a/sync/src/inbound_connection.rs b/sync/src/inbound_connection.rs index 4f79ecc6..228cb0ab 100644 --- a/sync/src/inbound_connection.rs +++ b/sync/src/inbound_connection.rs @@ -29,12 +29,12 @@ impl InboundSyncConnection for InboundConnection { self.local_node.on_peer_inventory(self.peer_index, message); } - fn on_getdata(&self, message: types::GetData, id: u32) { - self.local_node.on_peer_getdata(self.peer_index, message, id); + fn on_getdata(&self, message: types::GetData) { + self.local_node.on_peer_getdata(self.peer_index, message); } - fn on_getblocks(&self, message: types::GetBlocks, id: u32) { - self.local_node.on_peer_getblocks(self.peer_index, message, id); + fn on_getblocks(&self, message: types::GetBlocks) { + self.local_node.on_peer_getblocks(self.peer_index, message); } fn on_getheaders(&self, message: types::GetHeaders, id: u32) { @@ -53,8 +53,8 @@ impl InboundSyncConnection for InboundConnection { self.local_node.on_peer_headers(self.peer_index, message); } - fn on_mempool(&self, message: types::MemPool, id: u32) { - self.local_node.on_peer_mempool(self.peer_index, message, id); + fn on_mempool(&self, message: types::MemPool) { + self.local_node.on_peer_mempool(self.peer_index, message); } fn on_filterload(&self, message: types::FilterLoad) { diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 3d3f4f7d..b1114006 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -95,16 +95,16 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon // TODO: process other inventory types } - pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData, id: u32) { + pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) { trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index); - self.server.serve_getdata(peer_index, message, id); + self.server.serve_getdata(peer_index, message); } - pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) { + pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) { trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index); - self.server.serve_getblocks(peer_index, message, id); + self.server.serve_getblocks(peer_index, message); } pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) { @@ -127,10 +127,15 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon need_wait }; + // if we do not need synchronized responses => inform p2p that we have processed this request + let id = if !need_wait { + self.executor.lock().execute(SynchronizationTask::Ignore(peer_index, id)); + None + } else { + Some(id) + }; + self.server.serve_getheaders(peer_index, message, id); - if need_wait { - self.server.wait_peer_requests_completed(peer_index); - } } pub fn on_peer_transaction(&self, peer_index: usize, message: types::Tx) { @@ -155,10 +160,10 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon } } - pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool, id: u32) { + pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool) { trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index); - self.server.serve_mempool(peer_index, id); + self.server.serve_mempool(peer_index); } pub fn on_peer_filterload(&self, peer_index: usize, _message: types::FilterLoad) { @@ -252,13 +257,14 @@ mod tests { } impl OutboundSyncConnection for DummyOutboundSyncConnection { - fn send_inventory(&self, _message: &types::Inv, _id: u32, _is_final: bool) {} + fn send_inventory(&self, _message: &types::Inv) {} fn send_getdata(&self, _message: &types::GetData) {} fn send_getblocks(&self, _message: &types::GetBlocks) {} fn send_getheaders(&self, _message: &types::GetHeaders) {} fn send_transaction(&self, _message: &types::Tx) {} - fn send_block(&self, _message: &types::Block, _id: u32, _is_final: bool) {} - fn send_headers(&self, _message: &types::Headers, _id: u32, _is_final: bool) {} + fn send_block(&self, _message: &types::Block) {} + fn send_headers(&self, _message: &types::Headers) {} + fn respond_headers(&self, _message: &types::Headers, _id: u32) {} fn send_mempool(&self, _message: &types::MemPool) {} fn send_filterload(&self, _message: &types::FilterLoad) {} fn send_filteradd(&self, _message: &types::FilterAdd) {} @@ -270,7 +276,7 @@ mod tests { fn send_compact_block(&self, _message: &types::CompactBlock) {} fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {} fn send_block_txn(&self, _message: &types::BlockTxn) {} - fn send_notfound(&self, _message: &types::NotFound, _id: u32, _is_final: bool) {} + fn send_notfound(&self, _message: &types::NotFound) {} fn ignored(&self, _id: u32) {} } @@ -312,10 +318,9 @@ mod tests { hash: genesis_block_hash.clone(), } ]; - let dummy_id = 0; local_node.on_peer_getdata(peer_index, types::GetData { inventory: inventory.clone() - }, dummy_id); + }); // => `getdata` is served let tasks = server.take_tasks(); assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]); diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index fd248517..8a8f57ca 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -127,8 +127,9 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { }; if let Some(connection) = self.peers.get_mut(&peer_index) { + assert_eq!(id.raw(), None); trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index); - connection.send_block(&block_message, id.raw(), id.is_final()); + connection.send_block(&block_message); } }, Task::SendNotFound(peer_index, unknown_inventory, id) => { @@ -137,8 +138,9 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { }; if let Some(connection) = self.peers.get_mut(&peer_index) { + assert_eq!(id.raw(), None); trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len()); - connection.send_notfound(¬found, id.raw(), id.is_final()); + connection.send_notfound(¬found); } }, Task::SendInventory(peer_index, inventory, id) => { @@ -147,8 +149,9 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { }; if let Some(connection) = self.peers.get_mut(&peer_index) { + assert_eq!(id.raw(), None); trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len()); - connection.send_inventory(&inventory, id.raw(), id.is_final()); + connection.send_inventory(&inventory); } }, Task::SendHeaders(peer_index, headers, id) => { @@ -158,7 +161,10 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { if let Some(connection) = self.peers.get_mut(&peer_index) { trace!(target: "sync", "Sending headers to peer#{} with {} items", peer_index, headers.headers.len()); - connection.send_headers(&headers, id.raw(), id.is_final()); + match id.raw() { + Some(id) => connection.respond_headers(&headers, id), + None => connection.send_headers(&headers), + } } }, Task::Ignore(peer_index, id) => { diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index 4118354d..b935a595 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -14,20 +14,10 @@ use message::types; /// Synchronization requests server trait pub trait Server : Send + 'static { - fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32); - fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32); - fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32); - fn serve_mempool(&self, peer_index: usize, id: u32); - fn wait_peer_requests_completed(&self, peer_index: usize); -} - -/// Peer requests waiter -#[derive(Default)] -pub struct PeerRequestsWaiter { - /// Awake mutex - peer_requests_lock: Mutex, - /// Awake event - peer_requests_done: Condvar, + fn serve_getdata(&self, peer_index: usize, message: types::GetData); + fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks); + fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option); + fn serve_mempool(&self, peer_index: usize); } /// Synchronization requests server @@ -43,29 +33,32 @@ struct ServerQueue { queue_ready: Arc, peers_queue: VecDeque, tasks_queue: HashMap>, - peer_waiters: HashMap>, } /// `ServerTask` index. #[derive(Debug, PartialEq)] pub enum ServerTaskIndex { + /// `None` is used when response is sent out-of-order + None, /// `Partial` is used when server needs to send more than one response for request. - Partial(u32), + _Partial(u32), /// `Final` task task can be preceded by many `Partial` tasks with the same id. Final(u32), } impl ServerTaskIndex { - pub fn raw(&self) -> u32 { + pub fn raw(&self) -> Option { match *self { - ServerTaskIndex::Partial(id) | ServerTaskIndex::Final(id) => id, + ServerTaskIndex::None => None, + ServerTaskIndex::_Partial(id) | ServerTaskIndex::Final(id) => Some(id), } } - pub fn is_final(&self) -> bool { + pub fn _is_final(&self) -> bool { match *self { - ServerTaskIndex::Partial(_) => false, + ServerTaskIndex::_Partial(_) => false, ServerTaskIndex::Final(_) => true, + ServerTaskIndex::None => panic!("check with raw() before"), } } } @@ -157,7 +150,7 @@ impl SynchronizationServer { ServerTask::ServeGetData(inventory) => { let mut unknown_items: Vec = Vec::new(); let mut new_tasks: Vec = Vec::new(); - let task_id = indexed_task.id.raw(); + assert_eq!(indexed_task.id.raw(), None); { let chain = chain.read(); let storage = chain.storage(); @@ -166,7 +159,7 @@ impl SynchronizationServer { InventoryType::MessageBlock => { match storage.block_number(&item.hash) { Some(_) => { - let task = IndexedServerTask::new(ServerTask::ReturnBlock(item.hash.clone()), ServerTaskIndex::Partial(task_id)); + let task = IndexedServerTask::new(ServerTask::ReturnBlock(item.hash.clone()), ServerTaskIndex::None); new_tasks.push(task); }, None => unknown_items.push(item), @@ -179,19 +172,13 @@ impl SynchronizationServer { // respond with `notfound` message for unknown data if !unknown_items.is_empty() { trace!(target: "sync", "Going to respond with notfound with {} items to peer#{}", unknown_items.len(), peer_index); - let task = IndexedServerTask::new(ServerTask::ReturnNotFound(unknown_items), ServerTaskIndex::Partial(task_id)); + let task = IndexedServerTask::new(ServerTask::ReturnNotFound(unknown_items), ServerTaskIndex::None); new_tasks.push(task); } // schedule data responses if !new_tasks.is_empty() { trace!(target: "sync", "Going to respond with data with {} items to peer#{}", new_tasks.len(), peer_index); - // mark last task as the final one - if let Some(task) = new_tasks.last_mut() { - task.id = ServerTaskIndex::Final(task_id); - } queue.lock().add_tasks(peer_index, new_tasks); - } else { - executor.lock().execute(Task::Ignore(peer_index, task_id)); } // inform that we have processed task for peer queue.lock().task_processed(peer_index); @@ -207,7 +194,7 @@ impl SynchronizationServer { }).collect(); executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id)); } else { - executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw())); + assert_eq!(indexed_task.id, ServerTaskIndex::None); } // inform that we have processed task for peer queue.lock().task_processed(peer_index); @@ -220,8 +207,8 @@ impl SynchronizationServer { if !blocks_headers.is_empty() { trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index); executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id)); - } else { - executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw())); + } else if let Some(response_id) = indexed_task.id.raw() { + executor.lock().execute(Task::Ignore(peer_index, response_id)); } // inform that we have processed task for peer queue.lock().task_processed(peer_index); @@ -240,7 +227,7 @@ impl SynchronizationServer { trace!(target: "sync", "Going to respond with {} memory-pool transactions ids to peer#{}", inventory.len(), peer_index); executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id)); } else { - executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw())); + assert_eq!(indexed_task.id, ServerTaskIndex::None); } // inform that we have processed task for peer queue.lock().task_processed(peer_index); @@ -261,7 +248,8 @@ impl SynchronizationServer { }, // ignore ServerTask::Ignore => { - executor.lock().execute(Task::Ignore(peer_index, indexed_task.id.raw())); + let response_id = indexed_task.id.raw().expect("do not schedule redundant ignore task"); + executor.lock().execute(Task::Ignore(peer_index, response_id)); queue.lock().task_processed(peer_index); }, } @@ -347,48 +335,41 @@ impl Drop for SynchronizationServer { } impl Server for SynchronizationServer { - fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32) { - let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::Final(id)); + fn serve_getdata(&self, peer_index: usize, message: types::GetData) { + let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::None); self.queue.lock().add_task(peer_index, task); } - fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) { + fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) { if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) { trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash); - let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::Final(id)); + let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::None); self.queue.lock().add_task(peer_index, task); } else { trace!(target: "sync", "No common blocks with peer#{}", peer_index); - self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id)); } } - fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) { + fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option) { if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) { trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash.to_reversed_str()); - let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), ServerTaskIndex::Final(id)); + let server_task_index = id.map_or_else(|| ServerTaskIndex::None, |id| ServerTaskIndex::Final(id)); + let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), server_task_index); self.queue.lock().add_task(peer_index, task); } else { trace!(target: "sync", "No common blocks headers with peer#{}", peer_index); - self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id)); + if let Some(id) = id { + self.queue.lock().add_task(peer_index, IndexedServerTask::ignore(id)); + } } } - fn serve_mempool(&self, peer_index: usize, id: u32) { - let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::Final(id)); + fn serve_mempool(&self, peer_index: usize) { + let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::None); self.queue.lock().add_task(peer_index, task); } - - fn wait_peer_requests_completed(&self, peer_index: usize) { - if let Some(waiter) = { - let mut queue = self.queue.lock(); - queue.get_peer_requests_waiter(peer_index) - } { - waiter.wait(); - } - } } impl ServerQueue { @@ -398,7 +379,6 @@ impl ServerQueue { queue_ready: queue_ready, peers_queue: VecDeque::new(), tasks_queue: HashMap::new(), - peer_waiters: HashMap::new(), } } @@ -425,11 +405,6 @@ impl ServerQueue { return; } tasks_entry.remove_entry(); - - if let Entry::Occupied(entry) = self.peer_waiters.entry(peer_index) { - entry.get().awake(); - entry.remove_entry(); - } } } @@ -470,42 +445,6 @@ impl ServerQueue { } self.queue_ready.notify_one(); } - - pub fn get_peer_requests_waiter(&mut self, peer_index: usize) -> Option> { - match self.peer_waiters.entry(peer_index) { - Entry::Vacant(entry) => { - // there are no pending tasks for this peer - if !self.tasks_queue.contains_key(&peer_index) { - return None; - } - - // there are tasks => wait for completion - let waiter = Arc::new(PeerRequestsWaiter::default()); - entry.insert(waiter.clone()); - Some(waiter) - }, - Entry::Occupied(entry) => { - Some(entry.get().clone()) - }, - } - } -} - -impl PeerRequestsWaiter { - pub fn wait(&self) { - let mut locker = self.peer_requests_lock.lock(); - if *locker { - return; - } - - self.peer_requests_done.wait(&mut locker); - } - - pub fn awake(&self) { - let mut locker = self.peer_requests_lock.lock(); - *locker = true; - self.peer_requests_done.notify_all(); - } } #[cfg(test)] @@ -541,30 +480,27 @@ pub mod tests { } impl Server for DummyServer { - fn serve_getdata(&self, peer_index: usize, message: types::GetData, _id: u32) { + fn serve_getdata(&self, peer_index: usize, message: types::GetData) { self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory))); } - fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, _id: u32) { + fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) { self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock { number: 0, hash: message.block_locator_hashes[0].clone(), }, message.hash_stop))); } - fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: u32) { + fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: Option) { self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock { number: 0, hash: message.block_locator_hashes[0].clone(), }, message.hash_stop))); } - fn serve_mempool(&self, peer_index: usize, _id: u32) { + fn serve_mempool(&self, peer_index: usize) { self.tasks.lock().push((peer_index, ServerTask::ServeMempool)); } - - fn wait_peer_requests_completed(&self, _peer_index: usize) { - } } fn create_synchronization_server() -> (Arc>, Arc>, SynchronizationServer) { @@ -584,13 +520,12 @@ pub mod tests { hash: H256::default(), } ]; - let dummy_id = 0; server.serve_getdata(0, types::GetData { inventory: inventory.clone(), - }, dummy_id); + }); // => respond with notfound let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::Final(dummy_id))]); + assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::None)]); } #[test] @@ -603,13 +538,12 @@ pub mod tests { hash: test_data::genesis().hash(), } ]; - let dummy_id = 0; server.serve_getdata(0, types::GetData { inventory: inventory.clone(), - }, dummy_id); + }); // => respond with block let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::Final(dummy_id))]); + assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::None)]); } #[test] @@ -617,15 +551,14 @@ pub mod tests { let (_, executor, server) = create_synchronization_server(); // when asking for blocks hashes let genesis_block_hash = test_data::genesis().hash(); - let dummy_id = 5; server.serve_getblocks(0, types::GetBlocks { version: 0, block_locator_hashes: vec![genesis_block_hash.clone()], hash_stop: H256::default(), - }, dummy_id); + }); // => no response let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout - assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]); + assert_eq!(tasks, vec![]); } #[test] @@ -633,19 +566,18 @@ pub mod tests { let (chain, executor, server) = create_synchronization_server(); chain.write().insert_best_block(test_data::block_h1().hash(), &test_data::block_h1()).expect("Db write error"); // when asking for blocks hashes - let dummy_id = 0; server.serve_getblocks(0, types::GetBlocks { version: 0, block_locator_hashes: vec![test_data::genesis().hash()], hash_stop: H256::default(), - }, dummy_id); + }); // => responds with inventory let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: test_data::block_h1().hash(), }]; let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]); + assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]); } #[test] @@ -658,7 +590,7 @@ pub mod tests { version: 0, block_locator_hashes: vec![genesis_block_hash.clone()], hash_stop: H256::default(), - }, dummy_id); + }, Some(dummy_id)); // => no response let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]); @@ -674,7 +606,7 @@ pub mod tests { version: 0, block_locator_hashes: vec![test_data::genesis().hash()], hash_stop: H256::default(), - }, dummy_id); + }, Some(dummy_id)); // => responds with headers let headers = vec![ test_data::block_h1().block_header, @@ -687,11 +619,10 @@ pub mod tests { fn server_mempool_do_not_responds_inventory_when_empty_memory_pool() { let (_, executor, server) = create_synchronization_server(); // when asking for memory pool transactions ids - let dummy_id = 9; - server.serve_mempool(0, dummy_id); + server.serve_mempool(0); // => no response let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout - assert_eq!(tasks, vec![Task::Ignore(0, dummy_id)]); + assert_eq!(tasks, vec![]); } #[test] @@ -702,14 +633,13 @@ pub mod tests { let transaction_hash = transaction.hash(); chain.write().insert_verified_transaction(transaction); // when asking for memory pool transactions ids - let dummy_id = 0; - server.serve_mempool(0, dummy_id); + server.serve_mempool(0); // => respond with inventory let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: transaction_hash, }]; let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]); + assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]); } }