diff --git a/p2p/src/protocol/sync.rs b/p2p/src/protocol/sync.rs index 64687d99..86311303 100644 --- a/p2p/src/protocol/sync.rs +++ b/p2p/src/protocol/sync.rs @@ -5,6 +5,8 @@ use protocol::Protocol; use util::{PeerInfo, PeerId}; use p2p::Context; +const UNIMPLEMENTED_TASK_ID: u32 = 0; + pub type InboundSyncConnectionRef = Box; pub type OutboundSyncConnectionRef = Box; pub type LocalSyncNodeRef = Box; @@ -19,13 +21,13 @@ pub trait InboundSyncConnection : Send + Sync { fn start_sync_session(&self, version: u32); fn close_session(&self); fn on_inventory(&self, message: types::Inv); - fn on_getdata(&self, message: types::GetData); - fn on_getblocks(&self, message: types::GetBlocks); - fn on_getheaders(&self, message: types::GetHeaders); + fn on_getdata(&self, message: types::GetData, id: u32); + fn on_getblocks(&self, message: types::GetBlocks, id: u32); + fn on_getheaders(&self, message: types::GetHeaders, id: u32); fn on_transaction(&self, message: types::Tx); fn on_block(&self, message: types::Block); fn on_headers(&self, message: types::Headers); - fn on_mempool(&self, message: types::MemPool); + fn on_mempool(&self, message: types::MemPool, id: u32); fn on_filterload(&self, message: types::FilterLoad); fn on_filteradd(&self, message: types::FilterAdd); fn on_filterclear(&self, message: types::FilterClear); @@ -40,14 +42,14 @@ pub trait InboundSyncConnection : Send + Sync { } pub trait OutboundSyncConnection : Send + Sync { - fn send_inventory(&self, message: &types::Inv); + fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool); fn send_getdata(&self, message: &types::GetData); fn send_getblocks(&self, message: &types::GetBlocks); fn send_getheaders(&self, message: &types::GetHeaders); fn send_transaction(&self, message: &types::Tx); - fn send_block(&self, message: &types::Block); - fn send_headers(&self, message: &types::Headers); - fn send_mempool(&self, message: &types::MemPool); + fn send_block(&self, message: &types::Block, id: u32, is_final: bool); + fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool); + fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool); fn send_filterload(&self, message: &types::FilterLoad); fn send_filteradd(&self, message: &types::FilterAdd); fn send_filterclear(&self, message: &types::FilterClear); @@ -58,7 +60,7 @@ pub trait OutboundSyncConnection : Send + Sync { fn send_compact_block(&self, message: &types::CompactBlock); fn send_get_block_txn(&self, message: &types::GetBlockTxn); fn send_block_txn(&self, message: &types::BlockTxn); - fn send_notfound(&self, message: &types::NotFound); + fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool); } struct OutboundSync { @@ -85,7 +87,7 @@ impl OutboundSync { } impl OutboundSyncConnection for OutboundSync { - fn send_inventory(&self, message: &types::Inv) { + fn send_inventory(&self, message: &types::Inv, id: u32, is_final: bool) { self.send_message(message); } @@ -105,15 +107,15 @@ impl OutboundSyncConnection for OutboundSync { self.send_message(message); } - fn send_block(&self, message: &types::Block) { + fn send_block(&self, message: &types::Block, id: u32, is_final: bool) { self.send_message(message); } - fn send_headers(&self, message: &types::Headers) { + fn send_headers(&self, message: &types::Headers, id: u32, is_final: bool) { self.send_message(message); } - fn send_mempool(&self, message: &types::MemPool) { + fn send_mempool(&self, message: &types::MemPool, id: u32, is_final: bool) { self.send_message(message); } @@ -157,7 +159,7 @@ impl OutboundSyncConnection for OutboundSync { self.send_message(message); } - fn send_notfound(&self, message: &types::NotFound) { + fn send_notfound(&self, message: &types::NotFound, id: u32, is_final: bool) { self.send_message(message); } } @@ -190,15 +192,15 @@ impl Protocol for SyncProtocol { } else if command == &types::GetData::command() { let message: types::GetData = try!(deserialize_payload(payload, self.info.version)); - self.inbound_connection.on_getdata(message); + self.inbound_connection.on_getdata(message, UNIMPLEMENTED_TASK_ID); } else if command == &types::GetBlocks::command() { let message: types::GetBlocks = try!(deserialize_payload(payload, self.info.version)); - self.inbound_connection.on_getblocks(message); + self.inbound_connection.on_getblocks(message, UNIMPLEMENTED_TASK_ID); } else if command == &types::GetHeaders::command() { let message: types::GetHeaders = try!(deserialize_payload(payload, self.info.version)); - self.inbound_connection.on_getheaders(message); + self.inbound_connection.on_getheaders(message, UNIMPLEMENTED_TASK_ID); } else if command == &types::Tx::command() { let message: types::Tx = try!(deserialize_payload(payload, self.info.version)); @@ -210,7 +212,7 @@ impl Protocol for SyncProtocol { } else if command == &types::MemPool::command() { let message: types::MemPool = try!(deserialize_payload(payload, self.info.version)); - self.inbound_connection.on_mempool(message); + self.inbound_connection.on_mempool(message, UNIMPLEMENTED_TASK_ID); } else if command == &types::Headers::command() { let message: types::Headers = try!(deserialize_payload(payload, self.info.version)); diff --git a/p2p/src/session.rs b/p2p/src/session.rs index dc960d1a..0b730151 100644 --- a/p2p/src/session.rs +++ b/p2p/src/session.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use message::{Command, Error}; use p2p::Context; use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, SeednodeProtocol}; -use util::{PeerInfo}; +use util::{ConfigurableSynchronizer, PeerInfo}; pub trait SessionFactory { fn new_session(context: Arc, info: PeerInfo) -> Session; @@ -34,12 +34,14 @@ impl SessionFactory for NormalSessionFactory { pub struct Session { protocols: Mutex>>, + synchronizer: Mutex, } impl Session { pub fn new(protocols: Vec>) -> Self { Session { protocols: Mutex::new(protocols), + synchronizer: Mutex::new(ConfigurableSynchronizer::new(false)), } } diff --git a/sync/src/inbound_connection.rs b/sync/src/inbound_connection.rs index 02b89e4d..4f79ecc6 100644 --- a/sync/src/inbound_connection.rs +++ b/sync/src/inbound_connection.rs @@ -29,16 +29,16 @@ impl InboundSyncConnection for InboundConnection { self.local_node.on_peer_inventory(self.peer_index, message); } - fn on_getdata(&self, message: types::GetData) { - self.local_node.on_peer_getdata(self.peer_index, message); + fn on_getdata(&self, message: types::GetData, id: u32) { + self.local_node.on_peer_getdata(self.peer_index, message, id); } - fn on_getblocks(&self, message: types::GetBlocks) { - self.local_node.on_peer_getblocks(self.peer_index, message); + fn on_getblocks(&self, message: types::GetBlocks, id: u32) { + self.local_node.on_peer_getblocks(self.peer_index, message, id); } - fn on_getheaders(&self, message: types::GetHeaders) { - self.local_node.on_peer_getheaders(self.peer_index, message); + fn on_getheaders(&self, message: types::GetHeaders, id: u32) { + self.local_node.on_peer_getheaders(self.peer_index, message, id); } fn on_transaction(&self, message: types::Tx) { @@ -53,8 +53,8 @@ impl InboundSyncConnection for InboundConnection { self.local_node.on_peer_headers(self.peer_index, message); } - fn on_mempool(&self, message: types::MemPool) { - self.local_node.on_peer_mempool(self.peer_index, message); + fn on_mempool(&self, message: types::MemPool, id: u32) { + self.local_node.on_peer_mempool(self.peer_index, message, id); } fn on_filterload(&self, message: types::FilterLoad) { diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 5ecf5d68..ca079b39 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -97,19 +97,19 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon // TODO: process unknown transactions, etc... } - pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) { + pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData, id: u32) { trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index); - self.server.serve_getdata(peer_index, message); + self.server.serve_getdata(peer_index, message, id); } - pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) { + pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) { trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index); - self.server.serve_getblocks(peer_index, message); + self.server.serve_getblocks(peer_index, message, id); } - pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders) { + pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) { trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index); // simulating bitcoind for passing tests: if we are in nearly-saturated state @@ -123,7 +123,7 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon need_wait }; - self.server.serve_getheaders(peer_index, message); + self.server.serve_getheaders(peer_index, message, id); if need_wait { self.server.wait_peer_requests_completed(peer_index); } @@ -148,10 +148,10 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon } } - pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool) { + pub fn on_peer_mempool(&self, peer_index: usize, _message: types::MemPool, id: u32) { trace!(target: "sync", "Got `mempool` message from peer#{}", peer_index); - self.server.serve_mempool(peer_index); + self.server.serve_mempool(peer_index, id); } pub fn on_peer_filterload(&self, peer_index: usize, _message: types::FilterLoad) { @@ -227,14 +227,14 @@ mod tests { } impl OutboundSyncConnection for DummyOutboundSyncConnection { - fn send_inventory(&self, _message: &types::Inv) {} + fn send_inventory(&self, _message: &types::Inv, _id: u32, _is_final: bool) {} fn send_getdata(&self, _message: &types::GetData) {} fn send_getblocks(&self, _message: &types::GetBlocks) {} fn send_getheaders(&self, _message: &types::GetHeaders) {} fn send_transaction(&self, _message: &types::Tx) {} - fn send_block(&self, _message: &types::Block) {} - fn send_headers(&self, _message: &types::Headers) {} - fn send_mempool(&self, _message: &types::MemPool) {} + fn send_block(&self, _message: &types::Block, _id: u32, _is_final: bool) {} + fn send_headers(&self, _message: &types::Headers, _id: u32, _is_final: bool) {} + fn send_mempool(&self, _message: &types::MemPool, _id: u32, _is_final: bool) {} fn send_filterload(&self, _message: &types::FilterLoad) {} fn send_filteradd(&self, _message: &types::FilterAdd) {} fn send_filterclear(&self, _message: &types::FilterClear) {} @@ -245,7 +245,7 @@ mod tests { fn send_compact_block(&self, _message: &types::CompactBlock) {} fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {} fn send_block_txn(&self, _message: &types::BlockTxn) {} - fn send_notfound(&self, _message: &types::NotFound) {} + fn send_notfound(&self, _message: &types::NotFound, _id: u32, _is_final: bool) {} } fn create_local_node() -> (Core, Handle, Arc>, Arc, LocalNode>) { @@ -283,9 +283,10 @@ mod tests { hash: genesis_block_hash.clone(), } ]; + let dummy_id = 0; local_node.on_peer_getdata(peer_index, types::GetData { inventory: inventory.clone() - }); + }, dummy_id); // => `getdata` is served let tasks = server.take_tasks(); assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]); diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index e1cc9a6f..b248d5ee 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -7,6 +7,7 @@ use message::types; use primitives::hash::H256; use p2p::OutboundSyncConnectionRef; use synchronization_chain::ChainRef; +use synchronization_server::ServerTaskIndex; use local_node::PeersConnections; pub type LocalSynchronizationTaskExecutorRef = Arc>; @@ -24,13 +25,13 @@ pub enum Task { /// Request blocks headers using full getheaders.block_locator_hashes. RequestBlocksHeaders(usize), /// Send block. - SendBlock(usize, Block), + SendBlock(usize, Block, ServerTaskIndex), /// Send notfound - SendNotFound(usize, Vec), + SendNotFound(usize, Vec, ServerTaskIndex), /// Send inventory - SendInventory(usize, Vec), + SendInventory(usize, Vec, ServerTaskIndex), /// Send headers - SendHeaders(usize, Vec), + SendHeaders(usize, Vec, ServerTaskIndex), } /// Synchronization tasks executor @@ -94,7 +95,7 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { connection.send_getheaders(&getheaders); } }, - Task::SendBlock(peer_index, block) => { + Task::SendBlock(peer_index, block, id) => { let block_message = types::Block { block: block, }; @@ -102,10 +103,10 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { if let Some(connection) = self.peers.get_mut(&peer_index) { let connection = &mut *connection; trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index); - connection.send_block(&block_message); + connection.send_block(&block_message, id.raw(), id.is_final()); } }, - Task::SendNotFound(peer_index, unknown_inventory) => { + Task::SendNotFound(peer_index, unknown_inventory, id) => { let notfound = types::NotFound { inventory: unknown_inventory, }; @@ -113,10 +114,10 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { if let Some(connection) = self.peers.get_mut(&peer_index) { let connection = &mut *connection; trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len()); - connection.send_notfound(¬found); + connection.send_notfound(¬found, id.raw(), id.is_final()); } }, - Task::SendInventory(peer_index, inventory) => { + Task::SendInventory(peer_index, inventory, id) => { let inventory = types::Inv { inventory: inventory, }; @@ -124,10 +125,10 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { if let Some(connection) = self.peers.get_mut(&peer_index) { let connection = &mut *connection; trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len()); - connection.send_inventory(&inventory); + connection.send_inventory(&inventory, id.raw(), id.is_final()); } }, - Task::SendHeaders(peer_index, headers) => { + Task::SendHeaders(peer_index, headers, id) => { let headers = types::Headers { headers: headers, }; @@ -135,7 +136,7 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { if let Some(connection) = self.peers.get_mut(&peer_index) { let connection = &mut *connection; trace!(target: "sync", "Sending headers to peer#{} with {} items", peer_index, headers.headers.len()); - connection.send_headers(&headers); + connection.send_headers(&headers, id.raw(), id.is_final()); } }, } @@ -194,4 +195,4 @@ pub mod tests { self.waiter.notify_one(); } } -} \ No newline at end of file +} diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index aec9d303..602fa484 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -14,10 +14,10 @@ use message::types; /// Synchronization requests server trait pub trait Server : Send + 'static { - fn serve_getdata(&self, peer_index: usize, message: types::GetData); - fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks); - fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders); - fn serve_mempool(&self, peer_index: usize); + fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32); + fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32); + fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32); + fn serve_mempool(&self, peer_index: usize, id: u32); fn wait_peer_requests_completed(&self, peer_index: usize); } @@ -42,10 +42,53 @@ struct ServerQueue { is_stopping: AtomicBool, queue_ready: Arc, peers_queue: VecDeque, - tasks_queue: HashMap>, + tasks_queue: HashMap>, peer_waiters: HashMap>, } +/// `ServerTask` index. +#[derive(Debug, PartialEq)] +pub enum ServerTaskIndex { + /// `Partial` is used when server needs to send more than one response for request. + Partial(u32), + /// `Final` task task can be preceded by many `Partial` tasks with the same id. + Final(u32), +} + +impl ServerTaskIndex { + pub fn raw(&self) -> u32 { + match *self { + ServerTaskIndex::Partial(id) => id, + ServerTaskIndex::Final(id) => id, + } + } + + pub fn is_final(&self) -> bool { + match *self { + ServerTaskIndex::Partial(_) => false, + ServerTaskIndex::Final(_) => true, + } + } +} + +/// Server tests together with unique id assigned to it +#[derive(Debug, PartialEq)] +pub struct IndexedServerTask { + /// Task itself. + task: ServerTask, + /// Task id. + id: ServerTaskIndex, +} + +impl IndexedServerTask { + fn new(task: ServerTask, id: ServerTaskIndex) -> Self { + IndexedServerTask { + task: task, + id: id, + } + } +} + #[derive(Debug, PartialEq)] pub enum ServerTask { ServeGetData(Vec), @@ -97,11 +140,18 @@ impl SynchronizationServer { }) }; - match server_task { - // `getdata` => `notfound` + `block` + ... - Some((peer_index, ServerTask::ServeGetData(inventory))) => { + let (peer_index, indexed_task) = match server_task { + Some((peer_index, indexed_task)) => (peer_index, indexed_task), + // no tasks after wake-up => stopping or pausing + _ => continue, + }; + + match indexed_task.task { + // `getdata` => `notfound` + `block` + ... + ServerTask::ServeGetData(inventory) => { let mut unknown_items: Vec = Vec::new(); - let mut new_tasks: Vec = Vec::new(); + let mut new_tasks: Vec = Vec::new(); + let task_id = indexed_task.id.raw(); { let chain = chain.read(); let storage = chain.storage(); @@ -109,7 +159,10 @@ impl SynchronizationServer { match item.inv_type { InventoryType::MessageBlock => { match storage.block_number(&item.hash) { - Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())), + Some(_) => { + let task = IndexedServerTask::new(ServerTask::ReturnBlock(item.hash.clone()), ServerTaskIndex::Partial(task_id)); + new_tasks.push(task); + }, None => unknown_items.push(item), } }, @@ -120,18 +173,23 @@ impl SynchronizationServer { // respond with `notfound` message for unknown data if !unknown_items.is_empty() { trace!(target: "sync", "Going to respond with notfound with {} items to peer#{}", unknown_items.len(), peer_index); - new_tasks.push(ServerTask::ReturnNotFound(unknown_items)); + let task = IndexedServerTask::new(ServerTask::ReturnNotFound(unknown_items), ServerTaskIndex::Partial(task_id)); + new_tasks.push(task); } // schedule data responses if !new_tasks.is_empty() { trace!(target: "sync", "Going to respond with data with {} items to peer#{}", new_tasks.len(), peer_index); + // mark last task as the final one + if let Some(task) = new_tasks.last_mut() { + task.id = ServerTaskIndex::Final(task_id); + } queue.lock().add_tasks(peer_index, new_tasks); } // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, // `getblocks` => `inventory` - Some((peer_index, ServerTask::ServeGetBlocks(best_block, hash_stop))) => { + ServerTask::ServeGetBlocks(best_block, hash_stop) => { let blocks_hashes = SynchronizationServer::blocks_hashes_after(&chain, &best_block, &hash_stop, 500); if !blocks_hashes.is_empty() { trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", blocks_hashes.len(), peer_index); @@ -139,25 +197,25 @@ impl SynchronizationServer { inv_type: InventoryType::MessageBlock, hash: hash, }).collect(); - executor.lock().execute(Task::SendInventory(peer_index, inventory)); + executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id)); } // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, // `getheaders` => `headers` - Some((peer_index, ServerTask::ServeGetHeaders(best_block, hash_stop))) => { + ServerTask::ServeGetHeaders(best_block, hash_stop) => { // What if we have no common blocks with peer at all? Maybe drop connection or penalize peer? // https://github.com/ethcore/parity-bitcoin/pull/91#discussion_r86734568 let blocks_headers = SynchronizationServer::blocks_headers_after(&chain, &best_block, &hash_stop, 2000); if !blocks_headers.is_empty() { trace!(target: "sync", "Going to respond with blocks headers with {} items to peer#{}", blocks_headers.len(), peer_index); - executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers)); + executor.lock().execute(Task::SendHeaders(peer_index, blocks_headers, indexed_task.id)); } // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, // `mempool` => `inventory` - Some((peer_index, ServerTask::ServeMempool)) => { + ServerTask::ServeMempool => { let inventory: Vec<_> = chain.read() .memory_pool() .get_transactions_ids() @@ -169,27 +227,25 @@ impl SynchronizationServer { .collect(); if !inventory.is_empty() { trace!(target: "sync", "Going to respond with {} memory-pool transactions ids to peer#{}", inventory.len(), peer_index); - executor.lock().execute(Task::SendInventory(peer_index, inventory)); + executor.lock().execute(Task::SendInventory(peer_index, inventory, indexed_task.id)); } // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, // `notfound` - Some((peer_index, ServerTask::ReturnNotFound(inventory))) => { - executor.lock().execute(Task::SendNotFound(peer_index, inventory)); + ServerTask::ReturnNotFound(inventory) => { + executor.lock().execute(Task::SendNotFound(peer_index, inventory, indexed_task.id)); // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, // `block` - Some((peer_index, ServerTask::ReturnBlock(block_hash))) => { + ServerTask::ReturnBlock(block_hash) => { let block = chain.read().storage().block(db::BlockRef::Hash(block_hash)) .expect("we have checked that block exists in ServeGetData; db is append-only; qed"); - executor.lock().execute(Task::SendBlock(peer_index, block)); + executor.lock().execute(Task::SendBlock(peer_index, block, indexed_task.id)); // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, - // no tasks after wake-up => stopping or pausing - None => (), } } } @@ -273,32 +329,36 @@ impl Drop for SynchronizationServer { } impl Server for SynchronizationServer { - fn serve_getdata(&self, peer_index: usize, message: types::GetData) { - self.queue.lock().add_task(peer_index, ServerTask::ServeGetData(message.inventory)); + fn serve_getdata(&self, peer_index: usize, message: types::GetData, id: u32) { + let task = IndexedServerTask::new(ServerTask::ServeGetData(message.inventory), ServerTaskIndex::Final(id)); + self.queue.lock().add_task(peer_index, task); } - fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) { + fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, id: u32) { if let Some(best_common_block) = self.locate_known_block_hash(message.block_locator_hashes) { trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash); - self.queue.lock().add_task(peer_index, ServerTask::ServeGetBlocks(best_common_block, message.hash_stop)); + let task = IndexedServerTask::new(ServerTask::ServeGetBlocks(best_common_block, message.hash_stop), ServerTaskIndex::Final(id)); + self.queue.lock().add_task(peer_index, task); } else { trace!(target: "sync", "No common blocks with peer#{}", peer_index); } } - fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders) { + fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: u32) { if let Some(best_common_block) = self.locate_known_block_header(message.block_locator_hashes) { trace!(target: "sync", "Best common block header with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash); - self.queue.lock().add_task(peer_index, ServerTask::ServeGetHeaders(best_common_block, message.hash_stop)); + let task = IndexedServerTask::new(ServerTask::ServeGetHeaders(best_common_block, message.hash_stop), ServerTaskIndex::Final(id)); + self.queue.lock().add_task(peer_index, task); } else { trace!(target: "sync", "No common blocks headers with peer#{}", peer_index); } } - fn serve_mempool(&self, peer_index: usize) { - self.queue.lock().add_task(peer_index, ServerTask::ServeMempool); + fn serve_mempool(&self, peer_index: usize, id: u32) { + let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::Final(id)); + self.queue.lock().add_task(peer_index, task); } fn wait_peer_requests_completed(&self, peer_index: usize) { @@ -322,7 +382,7 @@ impl ServerQueue { } } - pub fn next_task(&mut self) -> Option<(usize, ServerTask)> { + pub fn next_task(&mut self) -> Option<(usize, IndexedServerTask)> { self.peers_queue.pop_front() .map(|peer| { let (peer_task, no_tasks_left) = { @@ -353,7 +413,7 @@ impl ServerQueue { } } - pub fn add_task(&mut self, peer_index: usize, task: ServerTask) { + pub fn add_task(&mut self, peer_index: usize, task: IndexedServerTask) { match self.tasks_queue.entry(peer_index) { Entry::Occupied(mut entry) => { let add_to_peers_queue = entry.get().is_empty(); @@ -372,7 +432,7 @@ impl ServerQueue { self.queue_ready.notify_one(); } - pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec) { + pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec) { match self.tasks_queue.entry(peer_index) { Entry::Occupied(mut entry) => { let add_to_peers_queue = entry.get().is_empty(); @@ -442,7 +502,7 @@ pub mod tests { use synchronization_executor::Task; use synchronization_executor::tests::DummyTaskExecutor; use synchronization_chain::Chain; - use super::{Server, ServerTask, SynchronizationServer}; + use super::{Server, ServerTask, SynchronizationServer, ServerTaskIndex}; pub struct DummyServer { tasks: Mutex>, @@ -461,25 +521,25 @@ pub mod tests { } impl Server for DummyServer { - fn serve_getdata(&self, peer_index: usize, message: types::GetData) { + fn serve_getdata(&self, peer_index: usize, message: types::GetData, _id: u32) { self.tasks.lock().push((peer_index, ServerTask::ServeGetData(message.inventory))); } - fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) { + fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks, _id: u32) { self.tasks.lock().push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock { number: 0, hash: message.block_locator_hashes[0].clone(), }, message.hash_stop))); } - fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders) { + fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, _id: u32) { self.tasks.lock().push((peer_index, ServerTask::ServeGetHeaders(db::BestBlock { number: 0, hash: message.block_locator_hashes[0].clone(), }, message.hash_stop))); } - fn serve_mempool(&self, peer_index: usize) { + fn serve_mempool(&self, peer_index: usize, _id: u32) { self.tasks.lock().push((peer_index, ServerTask::ServeMempool)); } @@ -504,12 +564,13 @@ pub mod tests { hash: H256::default(), } ]; + let dummy_id = 0; server.serve_getdata(0, types::GetData { inventory: inventory.clone(), - }); + }, dummy_id); // => respond with notfound let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendNotFound(0, inventory)]); + assert_eq!(tasks, vec![Task::SendNotFound(0, inventory, ServerTaskIndex::Final(dummy_id))]); } #[test] @@ -522,12 +583,13 @@ pub mod tests { hash: test_data::genesis().hash(), } ]; + let dummy_id = 0; server.serve_getdata(0, types::GetData { inventory: inventory.clone(), - }); + }, dummy_id); // => respond with block let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis())]); + assert_eq!(tasks, vec![Task::SendBlock(0, test_data::genesis(), ServerTaskIndex::Final(dummy_id))]); } #[test] @@ -535,11 +597,12 @@ pub mod tests { let (_, executor, server) = create_synchronization_server(); // when asking for blocks hashes let genesis_block_hash = test_data::genesis().hash(); + let dummy_id = 0; server.serve_getblocks(0, types::GetBlocks { version: 0, block_locator_hashes: vec![genesis_block_hash.clone()], hash_stop: H256::default(), - }); + }, dummy_id); // => no response let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout assert_eq!(tasks, vec![]); @@ -550,18 +613,19 @@ pub mod tests { let (chain, executor, server) = create_synchronization_server(); chain.write().insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Db write error"); // when asking for blocks hashes + let dummy_id = 0; server.serve_getblocks(0, types::GetBlocks { version: 0, block_locator_hashes: vec![test_data::genesis().hash()], hash_stop: H256::default(), - }); + }, dummy_id); // => responds with inventory let inventory = vec![InventoryVector { inv_type: InventoryType::MessageBlock, hash: test_data::block_h1().hash(), }]; let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]); + assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]); } #[test] @@ -569,11 +633,12 @@ pub mod tests { let (_, executor, server) = create_synchronization_server(); // when asking for blocks hashes let genesis_block_hash = test_data::genesis().hash(); + let dummy_id = 0; server.serve_getheaders(0, types::GetHeaders { version: 0, block_locator_hashes: vec![genesis_block_hash.clone()], hash_stop: H256::default(), - }); + }, dummy_id); // => no response let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout assert_eq!(tasks, vec![]); @@ -584,24 +649,26 @@ pub mod tests { let (chain, executor, server) = create_synchronization_server(); chain.write().insert_best_block(test_data::block_h1().hash(), test_data::block_h1()).expect("Db write error"); // when asking for blocks hashes + let dummy_id = 0; server.serve_getheaders(0, types::GetHeaders { version: 0, block_locator_hashes: vec![test_data::genesis().hash()], hash_stop: H256::default(), - }); + }, dummy_id); // => responds with headers let headers = vec![ test_data::block_h1().block_header, ]; let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendHeaders(0, headers)]); + assert_eq!(tasks, vec![Task::SendHeaders(0, headers, ServerTaskIndex::Final(dummy_id))]); } #[test] fn server_mempool_do_not_responds_inventory_when_empty_memory_pool() { let (_, executor, server) = create_synchronization_server(); // when asking for memory pool transactions ids - server.serve_mempool(0); + let dummy_id = 0; + server.serve_mempool(0, dummy_id); // => no response let tasks = DummyTaskExecutor::wait_tasks_for(executor, 100); // TODO: get rid of explicit timeout assert_eq!(tasks, vec![]); @@ -615,13 +682,14 @@ pub mod tests { let transaction_hash = transaction.hash(); chain.write().memory_pool_mut().insert_verified(transaction); // when asking for memory pool transactions ids - server.serve_mempool(0); + let dummy_id = 0; + server.serve_mempool(0, dummy_id); // => respond with inventory let inventory = vec![InventoryVector { inv_type: InventoryType::MessageTx, hash: transaction_hash, }]; let tasks = DummyTaskExecutor::wait_tasks(executor); - assert_eq!(tasks, vec![Task::SendInventory(0, inventory)]); + assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::Final(dummy_id))]); } }