Merge pull request #205 from ethcore/sync_getblocktxn

Process `getblocktxn` message in sync
This commit is contained in:
Marek Kotewicz 2016-11-28 10:23:10 +01:00 committed by GitHub
commit 252eec711a
3 changed files with 95 additions and 3 deletions

View File

@ -214,8 +214,10 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
trace!(target: "sync", "Got `cmpctblock` message from peer#{}", peer_index);
}
pub fn on_peer_get_block_txn(&self, peer_index: usize, _message: types::GetBlockTxn) {
pub fn on_peer_get_block_txn(&self, peer_index: usize, message: types::GetBlockTxn) {
trace!(target: "sync", "Got `getblocktxn` message from peer#{}", peer_index);
self.server.serve_get_block_txn(peer_index, message.request.blockhash, message.request.indexes).map(|t| self.server.add_task(peer_index, t));
}
pub fn on_peer_block_txn(&self, peer_index: usize, _message: types::BlockTxn) {

View File

@ -2,7 +2,7 @@ use std::sync::Arc;
use std::collections::HashMap;
use parking_lot::Mutex;
use chain::{Block, BlockHeader, Transaction};
use message::common::{InventoryVector, InventoryType, BlockHeaderAndIDs};
use message::common::{InventoryVector, InventoryType, BlockHeaderAndIDs, BlockTransactions};
use message::types;
use primitives::hash::H256;
use p2p::OutboundSyncConnectionRef;
@ -35,6 +35,8 @@ pub enum Task {
SendMerkleBlock(usize, types::MerkleBlock),
/// Send transaction
SendTransaction(usize, Transaction),
/// Send block transactions
SendBlockTxn(usize, H256, Vec<Transaction>),
/// Send notfound
SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex),
/// Send inventory
@ -155,6 +157,19 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
connection.send_transaction(&transaction_message);
}
},
Task::SendBlockTxn(peer_index, block_hash, transactions) => {
let transactions_message = types::BlockTxn {
request: BlockTransactions {
blockhash: block_hash,
transactions: transactions,
}
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
trace!(target: "sync", "Sending blocktxn with {} transactions to peer#{}", transactions_message.request.transactions.len(), peer_index);
connection.send_block_txn(&transactions_message);
}
},
Task::SendNotFound(peer_index, unknown_inventory, id) => {
let notfound = types::NotFound {
inventory: unknown_inventory,

View File

@ -1,7 +1,7 @@
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::collections::{VecDeque, HashMap};
use std::collections::{VecDeque, HashMap, HashSet};
use std::collections::hash_map::Entry;
use futures::{Future, BoxFuture, lazy, finished};
use parking_lot::{Mutex, Condvar};
@ -19,6 +19,7 @@ pub trait Server : Send + Sync + 'static {
fn serve_getdata(&self, peer_index: usize, inventory: FilteredInventory) -> Option<IndexedServerTask>;
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask>;
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>) -> Option<IndexedServerTask>;
fn serve_get_block_txn(&self, peer_index: usize, block_hash: H256, indexes: Vec<usize>) -> Option<IndexedServerTask>;
fn serve_mempool(&self, peer_index: usize) -> Option<IndexedServerTask>;
fn add_task(&self, peer_index: usize, task: IndexedServerTask);
}
@ -103,6 +104,7 @@ pub enum ServerTask {
ServeGetData(FilteredInventory),
ServeGetBlocks(db::BestBlock, H256),
ServeGetHeaders(db::BestBlock, H256),
ServeGetBlockTxn(H256, Vec<usize>),
ServeMempool,
ReturnNotFound(Vec<InventoryVector>),
ReturnBlock(H256),
@ -235,6 +237,45 @@ impl SynchronizationServer {
// inform that we have processed task for peer
queue.lock().task_processed(peer_index);
},
// `getblocktxn` => `blocktxn`
ServerTask::ServeGetBlockTxn(block_hash, indexes) => {
let transactions = {
let chain = chain.read();
let storage = chain.storage();
if let Some(block) = storage.block(db::BlockRef::Hash(block_hash.clone())) {
let requested_len = indexes.len();
let transactions_len = block.transactions.len();
let mut read_indexes = HashSet::new();
let transactions: Vec<_> = indexes.into_iter()
.map(|index| {
if index >= transactions_len {
None
} else if !read_indexes.insert(index) {
None
} else {
Some(block.transactions[index].clone())
}
})
.take_while(Option::is_some)
.map(Option::unwrap) // take_while above
.collect();
if transactions.len() == requested_len {
Some(transactions)
} else {
// TODO: malformed
None
}
} else {
// TODO: else malformed
None
}
};
if let Some(transactions) = transactions {
trace!(target: "sync", "Going to respond with {} blocktxn transactions to peer#{}", transactions.len(), peer_index);
executor.lock().execute(Task::SendBlockTxn(peer_index, block_hash, transactions));
}
},
// `mempool` => `inventory`
ServerTask::ServeMempool => {
let inventory: Vec<_> = chain.read()
@ -399,6 +440,13 @@ impl Server for SynchronizationServer {
}
}
fn serve_get_block_txn(&self, _peer_index: usize, block_hash: H256, indexes: Vec<usize>) -> Option<IndexedServerTask> {
// TODO: Upon receipt of a properly-formatted getblocktxn message, nodes which *recently provided the sender
// of such a message a cmpctblock for the block hash identified in this message* MUST respond ...
let task = IndexedServerTask::new(ServerTask::ServeGetBlockTxn(block_hash, indexes), ServerTaskIndex::None);
Some(task)
}
fn serve_mempool(&self, _peer_index: usize) -> Option<IndexedServerTask> {
let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::None);
Some(task)
@ -539,6 +587,11 @@ pub mod tests {
None
}
fn serve_get_block_txn(&self, peer_index: usize, block_hash: H256, indexes: Vec<usize>) -> Option<IndexedServerTask> {
self.tasks.lock().push((peer_index, ServerTask::ServeGetBlockTxn(block_hash, indexes)));
None
}
fn serve_mempool(&self, peer_index: usize) -> Option<IndexedServerTask> {
self.tasks.lock().push((peer_index, ServerTask::ServeMempool));
None
@ -683,4 +736,26 @@ pub mod tests {
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]);
}
#[test]
fn server_get_block_txn_responds_when_good_request() {
let (_, executor, server) = create_synchronization_server();
// when asking for block_txns
server.serve_get_block_txn(0, test_data::genesis().hash(), vec![0]).map(|t| server.add_task(0, t));
// server responds with transactions
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![Task::SendBlockTxn(0, test_data::genesis().hash(), vec![
test_data::genesis().transactions[0].clone()
])]);
}
#[test]
fn server_get_block_txn_do_not_responds_when_bad_request() {
let (_, executor, server) = create_synchronization_server();
// when asking for block_txns
server.serve_get_block_txn(0, test_data::genesis().hash(), vec![1]).map(|t| server.add_task(0, t));
// server responds with transactions
let tasks = DummyTaskExecutor::wait_tasks(executor);
assert_eq!(tasks, vec![]);
}
}