Merge branch 'master' of github.com:ethcore/parity-bitcoin into depsup
This commit is contained in:
commit
5857f0a2da
|
@ -214,8 +214,10 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
|
|||
trace!(target: "sync", "Got `cmpctblock` message from peer#{}", peer_index);
|
||||
}
|
||||
|
||||
pub fn on_peer_get_block_txn(&self, peer_index: usize, _message: types::GetBlockTxn) {
|
||||
pub fn on_peer_get_block_txn(&self, peer_index: usize, message: types::GetBlockTxn) {
|
||||
trace!(target: "sync", "Got `getblocktxn` message from peer#{}", peer_index);
|
||||
|
||||
self.server.serve_get_block_txn(peer_index, message.request.blockhash, message.request.indexes).map(|t| self.server.add_task(peer_index, t));
|
||||
}
|
||||
|
||||
pub fn on_peer_block_txn(&self, peer_index: usize, _message: types::BlockTxn) {
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::sync::Arc;
|
|||
use std::collections::HashMap;
|
||||
use parking_lot::Mutex;
|
||||
use chain::{Block, BlockHeader, Transaction};
|
||||
use message::common::{InventoryVector, InventoryType, BlockHeaderAndIDs};
|
||||
use message::common::{InventoryVector, InventoryType, BlockHeaderAndIDs, BlockTransactions};
|
||||
use message::types;
|
||||
use primitives::hash::H256;
|
||||
use p2p::OutboundSyncConnectionRef;
|
||||
|
@ -35,6 +35,8 @@ pub enum Task {
|
|||
SendMerkleBlock(usize, types::MerkleBlock),
|
||||
/// Send transaction
|
||||
SendTransaction(usize, Transaction),
|
||||
/// Send block transactions
|
||||
SendBlockTxn(usize, H256, Vec<Transaction>),
|
||||
/// Send notfound
|
||||
SendNotFound(usize, Vec<InventoryVector>, ServerTaskIndex),
|
||||
/// Send inventory
|
||||
|
@ -155,6 +157,19 @@ impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
connection.send_transaction(&transaction_message);
|
||||
}
|
||||
},
|
||||
Task::SendBlockTxn(peer_index, block_hash, transactions) => {
|
||||
let transactions_message = types::BlockTxn {
|
||||
request: BlockTransactions {
|
||||
blockhash: block_hash,
|
||||
transactions: transactions,
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
trace!(target: "sync", "Sending blocktxn with {} transactions to peer#{}", transactions_message.request.transactions.len(), peer_index);
|
||||
connection.send_block_txn(&transactions_message);
|
||||
}
|
||||
},
|
||||
Task::SendNotFound(peer_index, unknown_inventory, id) => {
|
||||
let notfound = types::NotFound {
|
||||
inventory: unknown_inventory,
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::sync::{Arc, Weak};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::thread;
|
||||
use std::collections::{VecDeque, HashMap};
|
||||
use std::collections::{VecDeque, HashMap, HashSet};
|
||||
use std::collections::hash_map::Entry;
|
||||
use futures::{Future, BoxFuture, lazy, finished};
|
||||
use parking_lot::{Mutex, Condvar};
|
||||
|
@ -19,6 +19,7 @@ pub trait Server : Send + Sync + 'static {
|
|||
fn serve_getdata(&self, peer_index: usize, inventory: FilteredInventory) -> Option<IndexedServerTask>;
|
||||
fn serve_getblocks(&self, peer_index: usize, message: types::GetBlocks) -> Option<IndexedServerTask>;
|
||||
fn serve_getheaders(&self, peer_index: usize, message: types::GetHeaders, id: Option<u32>) -> Option<IndexedServerTask>;
|
||||
fn serve_get_block_txn(&self, peer_index: usize, block_hash: H256, indexes: Vec<usize>) -> Option<IndexedServerTask>;
|
||||
fn serve_mempool(&self, peer_index: usize) -> Option<IndexedServerTask>;
|
||||
fn add_task(&self, peer_index: usize, task: IndexedServerTask);
|
||||
}
|
||||
|
@ -103,6 +104,7 @@ pub enum ServerTask {
|
|||
ServeGetData(FilteredInventory),
|
||||
ServeGetBlocks(db::BestBlock, H256),
|
||||
ServeGetHeaders(db::BestBlock, H256),
|
||||
ServeGetBlockTxn(H256, Vec<usize>),
|
||||
ServeMempool,
|
||||
ReturnNotFound(Vec<InventoryVector>),
|
||||
ReturnBlock(H256),
|
||||
|
@ -235,6 +237,45 @@ impl SynchronizationServer {
|
|||
// inform that we have processed task for peer
|
||||
queue.lock().task_processed(peer_index);
|
||||
},
|
||||
// `getblocktxn` => `blocktxn`
|
||||
ServerTask::ServeGetBlockTxn(block_hash, indexes) => {
|
||||
let transactions = {
|
||||
let chain = chain.read();
|
||||
let storage = chain.storage();
|
||||
if let Some(block) = storage.block(db::BlockRef::Hash(block_hash.clone())) {
|
||||
|
||||
let requested_len = indexes.len();
|
||||
let transactions_len = block.transactions.len();
|
||||
let mut read_indexes = HashSet::new();
|
||||
let transactions: Vec<_> = indexes.into_iter()
|
||||
.map(|index| {
|
||||
if index >= transactions_len {
|
||||
None
|
||||
} else if !read_indexes.insert(index) {
|
||||
None
|
||||
} else {
|
||||
Some(block.transactions[index].clone())
|
||||
}
|
||||
})
|
||||
.take_while(Option::is_some)
|
||||
.map(Option::unwrap) // take_while above
|
||||
.collect();
|
||||
if transactions.len() == requested_len {
|
||||
Some(transactions)
|
||||
} else {
|
||||
// TODO: malformed
|
||||
None
|
||||
}
|
||||
} else {
|
||||
// TODO: else malformed
|
||||
None
|
||||
}
|
||||
};
|
||||
if let Some(transactions) = transactions {
|
||||
trace!(target: "sync", "Going to respond with {} blocktxn transactions to peer#{}", transactions.len(), peer_index);
|
||||
executor.lock().execute(Task::SendBlockTxn(peer_index, block_hash, transactions));
|
||||
}
|
||||
},
|
||||
// `mempool` => `inventory`
|
||||
ServerTask::ServeMempool => {
|
||||
let inventory: Vec<_> = chain.read()
|
||||
|
@ -399,6 +440,13 @@ impl Server for SynchronizationServer {
|
|||
}
|
||||
}
|
||||
|
||||
fn serve_get_block_txn(&self, _peer_index: usize, block_hash: H256, indexes: Vec<usize>) -> Option<IndexedServerTask> {
|
||||
// TODO: Upon receipt of a properly-formatted getblocktxn message, nodes which *recently provided the sender
|
||||
// of such a message a cmpctblock for the block hash identified in this message* MUST respond ...
|
||||
let task = IndexedServerTask::new(ServerTask::ServeGetBlockTxn(block_hash, indexes), ServerTaskIndex::None);
|
||||
Some(task)
|
||||
}
|
||||
|
||||
fn serve_mempool(&self, _peer_index: usize) -> Option<IndexedServerTask> {
|
||||
let task = IndexedServerTask::new(ServerTask::ServeMempool, ServerTaskIndex::None);
|
||||
Some(task)
|
||||
|
@ -539,6 +587,11 @@ pub mod tests {
|
|||
None
|
||||
}
|
||||
|
||||
fn serve_get_block_txn(&self, peer_index: usize, block_hash: H256, indexes: Vec<usize>) -> Option<IndexedServerTask> {
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeGetBlockTxn(block_hash, indexes)));
|
||||
None
|
||||
}
|
||||
|
||||
fn serve_mempool(&self, peer_index: usize) -> Option<IndexedServerTask> {
|
||||
self.tasks.lock().push((peer_index, ServerTask::ServeMempool));
|
||||
None
|
||||
|
@ -683,4 +736,26 @@ pub mod tests {
|
|||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![Task::SendInventory(0, inventory, ServerTaskIndex::None)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_get_block_txn_responds_when_good_request() {
|
||||
let (_, executor, server) = create_synchronization_server();
|
||||
// when asking for block_txns
|
||||
server.serve_get_block_txn(0, test_data::genesis().hash(), vec![0]).map(|t| server.add_task(0, t));
|
||||
// server responds with transactions
|
||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![Task::SendBlockTxn(0, test_data::genesis().hash(), vec![
|
||||
test_data::genesis().transactions[0].clone()
|
||||
])]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_get_block_txn_do_not_responds_when_bad_request() {
|
||||
let (_, executor, server) = create_synchronization_server();
|
||||
// when asking for block_txns
|
||||
server.serve_get_block_txn(0, test_data::genesis().hash(), vec![1]).map(|t| server.add_task(0, t));
|
||||
// server responds with transactions
|
||||
let tasks = DummyTaskExecutor::wait_tasks(executor);
|
||||
assert_eq!(tasks, vec![]);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue