diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index c64636b8..f91e0a40 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -214,8 +214,10 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersCon trace!(target: "sync", "Got `cmpctblock` message from peer#{}", peer_index); } - pub fn on_peer_get_block_txn(&self, peer_index: usize, _message: types::GetBlockTxn) { + pub fn on_peer_get_block_txn(&self, peer_index: usize, message: types::GetBlockTxn) { trace!(target: "sync", "Got `getblocktxn` message from peer#{}", peer_index); + + self.server.serve_get_block_txn(peer_index, message.request.blockhash, message.request.indexes).map(|t| self.server.add_task(peer_index, t)); } pub fn on_peer_block_txn(&self, peer_index: usize, _message: types::BlockTxn) { diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index 19896e8a..54a2b3c7 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use std::collections::HashMap; use parking_lot::Mutex; use chain::{Block, BlockHeader, Transaction}; -use message::common::{InventoryVector, InventoryType, BlockHeaderAndIDs}; +use message::common::{InventoryVector, InventoryType, BlockHeaderAndIDs, BlockTransactions}; use message::types; use primitives::hash::H256; use p2p::OutboundSyncConnectionRef; @@ -35,6 +35,8 @@ pub enum Task { SendMerkleBlock(usize, types::MerkleBlock), /// Send transaction SendTransaction(usize, Transaction), + /// Send block transactions + SendBlockTxn(usize, H256, Vec), /// Send notfound SendNotFound(usize, Vec, ServerTaskIndex), /// Send inventory @@ -155,6 +157,19 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor { connection.send_transaction(&transaction_message); } }, + Task::SendBlockTxn(peer_index, block_hash, transactions) => { + let transactions_message = types::BlockTxn { + request: BlockTransactions { + blockhash: block_hash, + transactions: transactions, + } + }; + + if let Some(connection) = self.peers.get_mut(&peer_index) { + trace!(target: "sync", "Sending blocktxn with {} transactions to peer#{}", transactions_message.request.transactions.len(), peer_index); + connection.send_block_txn(&transactions_message); + } + }, Task::SendNotFound(peer_index, unknown_inventory, id) => { let notfound = types::NotFound { inventory: unknown_inventory, diff --git a/sync/src/synchronization_server.rs b/sync/src/synchronization_server.rs index a6b71762..6262cc9c 100644 --- a/sync/src/synchronization_server.rs +++ b/sync/src/synchronization_server.rs @@ -1,7 +1,7 @@ use std::sync::{Arc, Weak}; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; -use std::collections::{VecDeque, HashMap}; +use std::collections::{VecDeque, HashMap, HashSet}; use std::collections::hash_map::Entry; use futures::{Future, BoxFuture, lazy, finished}; use parking_lot::{Mutex, Condvar}; @@ -19,6 +19,7 @@ pub trait Server : Send + Sync + 'static { fn serve_getdata(&self, peer_index: usize, inventory: FilteredInventory) -> Option; fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option; fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option) -> Option; + fn serve_get_block_txn(&self, peer_index: usize, block_hash: H256, indexes: Vec) -> Option; fn serve_mempool(&self, peer_index: usize) -> Option; fn add_task(&self, peer_index: usize, task: IndexedServerTask); } @@ -103,6 +104,7 @@ pub enum ServerTask { ServeGetData(FilteredInventory), ServeGetBlocks(db::BestBlock, H256), ServeGetHeaders(db::BestBlock, H256), + ServeGetBlockTxn(H256, Vec), ServeMempool, ReturnNotFound(Vec), ReturnBlock(H256), @@ -235,6 +237,45 @@ impl SynchronizationServer { // inform that we have processed task for peer queue.lock().task_processed(peer_index); }, + // `getblocktxn` => `blocktxn` + ServerTask::ServeGetBlockTxn(block_hash, indexes) => { + let transactions = { + let chain = chain.read(); + let storage = chain.storage(); + if let Some(block) = storage.block(db::BlockRef::Hash(block_hash.clone())) { + + let requested_len = indexes.len(); + let transactions_len = block.transactions.len(); + let mut read_indexes = HashSet::new(); + let transactions: Vec<_> = indexes.into_iter() + .map(|index| { + if index >= transactions_len { + None + } else if !read_indexes.insert(index) { + None + } else { + Some(block.transactions[index].clone()) + } + }) + .take_while(Option::is_some) + .map(Option::unwrap) // take_while above + .collect(); + if transactions.len() == requested_len { + Some(transactions) + } else { + // TODO: malformed + None + } + } else { + // TODO: else malformed + None + } + }; + if let Some(transactions) = transactions { + trace!(target: "sync", "Going to respond with {} blocktxn transactions to peer#{}", transactions.len(), peer_index); + executor.lock().execute(Task::SendBlockTxn(peer_index, block_hash, transactions)); + } + }, // `mempool` => `inventory` ServerTask::ServeMempool => { let inventory: Vec<_> = chain.read() @@ -399,6 +440,13 @@ impl Server for SynchronizationServer { } } + fn serve_get_block_txn(&self, _peer_index: usize, block_hash: H256, indexes: Vec) -> Option { + // TODO: Upon receipt of a properly-formatted getblocktxn message, nodes which *recently provided the sender + // of such a message a cmpctblock for the block hash identified in this message* MUST respond ... + let task = IndexedServerTask::new(ServerTask::ServeGetBlockTxn(block_hash, indexes), ServerTaskIndex::None); + Some(task) + } + fn serve_mempool(&self, _peer_index: usize) -> Option { let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::None); Some(task) @@ -539,6 +587,11 @@ pub mod tests { None } + fn serve_get_block_txn(&self, peer_index: usize, block_hash: H256, indexes: Vec) -> Option { + self.tasks.lock().push((peer_index, ServerTask::ServeGetBlockTxn(block_hash, indexes))); + None + } + fn serve_mempool(&self, peer_index: usize) -> Option { self.tasks.lock().push((peer_index, ServerTask::ServeMempool)); None @@ -683,4 +736,26 @@ pub mod tests { let tasks = DummyTaskExecutor::wait_tasks(executor); assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]); } + + #[test] + fn server_get_block_txn_responds_when_good_request() { + let (_, executor, server) = create_synchronization_server(); + // when asking for block_txns + server.serve_get_block_txn(0, test_data::genesis().hash(), vec![0]).map(|t| server.add_task(0, t)); + // server responds with transactions + let tasks = DummyTaskExecutor::wait_tasks(executor); + assert_eq!(tasks, vec![Task::SendBlockTxn(0, test_data::genesis().hash(), vec![ + test_data::genesis().transactions[0].clone() + ])]); + } + + #[test] + fn server_get_block_txn_do_not_responds_when_bad_request() { + let (_, executor, server) = create_synchronization_server(); + // when asking for block_txns + server.serve_get_block_txn(0, test_data::genesis().hash(), vec![1]).map(|t| server.add_task(0, t)); + // server responds with transactions + let tasks = DummyTaskExecutor::wait_tasks(executor); + assert_eq!(tasks, vec![]); + } }