Merge branch 'master' of github.com:ethcore/parity-bitcoin into p2p_multiple_connections

debris 2016-11-03 00:22:32 +01:00
commit 67aae38673
14 changed files with 744 additions and 212 deletions

Cargo.lock generated
View File

@ -584,6 +584,7 @@ dependencies = [
"p2p 0.1.0",
"parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0",
"test-data 0.1.0",
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
"verification 0.1.0",
]

View File

@ -20,7 +20,7 @@ const COL_BLOCK_HEADERS: u32 = 2;
const COL_BLOCK_TRANSACTIONS: u32 = 3;
const COL_TRANSACTIONS: u32 = 4;
const COL_TRANSACTIONS_META: u32 = 5;
const _COL_RESERVED2: u32 = 6;
const COL_BLOCK_NUMBERS: u32 = 6;
const _COL_RESERVED3: u32 = 7;
const _COL_RESERVED4: u32 = 8;
const _COL_RESERVED5: u32 = 9;
@ -266,8 +266,9 @@ impl Store for Storage {
self.best_block.read().clone()
}
fn block_number(&self, _hash: &H256) -> Option<u32> {
unimplemented!()
fn block_number(&self, hash: &H256) -> Option<u32> {
self.get(COL_BLOCK_NUMBERS, &**hash)
.map(|val| LittleEndian::read_u32(&val))
}
fn block_hash(&self, number: u32) -> Option<H256> {
@ -356,7 +357,8 @@ impl Store for Storage {
transaction.write_u32(Some(COL_META), KEY_BEST_BLOCK_NUMBER, new_best_number);
// updating main chain height reference
transaction.put(Some(COL_BLOCK_HASHES), &u32_key(new_best_number), std::ops::Deref::deref(&block_hash))
transaction.put(Some(COL_BLOCK_HASHES), &u32_key(new_best_number), std::ops::Deref::deref(&block_hash));
transaction.write_u32(Some(COL_BLOCK_NUMBERS), std::ops::Deref::deref(&block_hash), new_best_number);
}
transaction.put(Some(COL_META), KEY_BEST_BLOCK_HASH, std::ops::Deref::deref(&new_best_hash));
@ -470,6 +472,18 @@ mod tests {
assert_eq!(loaded_transaction.hash(), block.transactions()[0].hash());
}
#[test]
fn stores_block_number() {
let path = RandomTempPath::create_dir();
let store = Storage::new(path.as_path()).unwrap();
let block: Block = test_data::block_h9();
store.insert_block(&block).unwrap();
let number = store.block_number(&block.hash()).unwrap();
assert_eq!(0, number);
}
#[test]
fn transaction_meta_update() {
let path = RandomTempPath::create_dir();
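
For reference, the new COL_BLOCK_NUMBERS column maps a block hash to its height, read back as a little-endian u32 via LittleEndian::read_u32. A minimal standalone round-trip of that encoding (byteorder crate, as on the read side; the write side is assumed to store the same layout):

use byteorder::{ByteOrder, LittleEndian};

fn main() {
    // encode a block height the way the column is assumed to store it
    let mut value = [0u8; 4];
    LittleEndian::write_u32(&mut value, 170);
    // decode it the way the new block_number() reads it back
    assert_eq!(LittleEndian::read_u32(&value), 170);
}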

View File

@ -37,6 +37,7 @@ pub trait InboundSyncConnection : Send + Sync {
fn on_compact_block(&self, message: types::CompactBlock);
fn on_get_block_txn(&self, message: types::GetBlockTxn);
fn on_block_txn(&self, message: types::BlockTxn);
fn on_notfound(&self, message: types::NotFound);
}
pub trait OutboundSyncConnection : Send + Sync {
@ -58,6 +59,7 @@ pub trait OutboundSyncConnection : Send + Sync {
fn send_compact_block(&self, message: &types::CompactBlock);
fn send_get_block_txn(&self, message: &types::GetBlockTxn);
fn send_block_txn(&self, message: &types::BlockTxn);
fn send_notfound(&self, message: &types::NotFound);
}
struct OutboundSync {
@ -155,6 +157,10 @@ impl OutboundSyncConnection for OutboundSync {
fn send_block_txn(&self, message: &types::BlockTxn) {
self.send_message(message);
}
fn send_notfound(&self, message: &types::NotFound) {
self.send_message(message);
}
}
pub struct SyncProtocol {
@ -249,6 +255,10 @@ impl Protocol for SyncProtocol {
let message: types::BlockTxn = try!(deserialize_payload(payload, version));
self.inbound_connection.on_block_txn(message);
}
else if command == &types::NotFound::command() {
let message: types::NotFound = try!(deserialize_payload(payload, version));
self.inbound_connection.on_notfound(message);
}
Ok(())
}
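
The new notfound plumbing is the negative counterpart of getdata: inventory entries that cannot be served are echoed back to the peer in a single notfound message (the actual split happens in synchronization_server below). A toy standalone sketch of that split, with hypothetical string hashes standing in for inventory vectors:

use std::collections::HashSet;

fn main() {
    // inventory a peer asked for via `getdata`
    let requested = vec!["blk_a", "blk_b"];
    // hashes actually present in storage
    let known: HashSet<&str> = ["blk_a"].iter().cloned().collect();

    // items we can serve become `block` responses, the rest go back
    // to the peer in one `notfound` message with the same entries
    let (serve, not_found): (Vec<_>, Vec<_>) =
        requested.into_iter().partition(|hash| known.contains(hash));

    assert_eq!(serve, vec!["blk_a"]);
    assert_eq!(not_found, vec!["blk_b"]);
}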

View File

@ -60,7 +60,12 @@ impl From<Node> for NodeByScore {
impl PartialOrd for NodeByScore {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
if self.0.failures == other.0.failures {
other.0.time.partial_cmp(&self.0.time)
if other.0.time == self.0.time {
other.0.partial_cmp(&self.0)
}
else {
other.0.time.partial_cmp(&self.0.time)
}
} else {
self.0.failures.partial_cmp(&other.0.failures)
}
@ -70,7 +75,12 @@ impl PartialOrd for NodeByScore {
impl Ord for NodeByScore {
fn cmp(&self, other: &Self) -> Ordering {
if self.0.failures == other.0.failures {
other.0.time.cmp(&self.0.time)
if other.0.time == self.0.time {
other.0.cmp(&self.0)
}
else {
other.0.time.cmp(&self.0.time)
}
} else {
self.0.failures.cmp(&other.0.failures)
}
@ -88,13 +98,63 @@ impl From<Node> for NodeByTime {
impl PartialOrd for NodeByTime {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
other.0.time.partial_cmp(&self.0.time)
if other.0.time == self.0.time {
other.0.partial_cmp(&self.0)
}
else {
other.0.time.partial_cmp(&self.0.time)
}
}
}
impl Ord for NodeByTime {
fn cmp(&self, other: &Self) -> Ordering {
other.0.time.cmp(&self.0.time)
if other.0.time == self.0.time {
other.0.cmp(&self.0)
}
else {
other.0.time.cmp(&self.0.time)
}
}
}
impl Ord for Node {
fn cmp(&self, other: &Self) -> Ordering {
// arbitrary but total ordering, using the address as a unique key
match self.addr {
SocketAddr::V4(self_addr) => match other.addr {
SocketAddr::V4(other_addr) => {
let self_port = self_addr.port();
let other_port = other_addr.port();
if self_port == other_port {
self_addr.ip().cmp(&other_addr.ip())
}
else {
self_port.cmp(&other_port)
}
},
SocketAddr::V6(_) => Ordering::Less,
},
SocketAddr::V6(self_addr) => match other.addr {
SocketAddr::V4(_) => Ordering::Greater,
SocketAddr::V6(other_addr) => {
let self_port = self_addr.port();
let other_port = other_addr.port();
if self_port == other_port {
self_addr.ip().cmp(&other_addr.ip())
}
else {
self_port.cmp(&other_port)
}
},
},
}
}
}
impl PartialOrd for Node {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
@ -224,7 +284,7 @@ impl<T> NodeTable<T> where T: Time {
mod tests {
use std::net::SocketAddr;
use message::common::Services;
use util::time::IncrementalTime;
use util::time::{IncrementalTime, ZeroTime};
use super::NodeTable;
#[test]
@ -311,4 +371,14 @@ mod tests {
assert_eq!(nodes[4].failures, 0);
}
#[test]
fn test_node_table_duplicates() {
let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap();
let s1: SocketAddr = "127.0.0.1:8001".parse().unwrap();
let mut table = NodeTable::<ZeroTime>::default();
table.insert(s0, Services::default());
table.insert(s1, Services::default());
table.note_failure(&s0);
table.note_failure(&s1);
}
}
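
The extra tie-break matters because NodeByScore and NodeByTime are presumably kept in ordered collections: with ZeroTime every node shares the same timestamp, so two distinct addresses that compared as Equal could be collapsed into one entry, which is what test_node_table_duplicates guards against. A standalone illustration with a hypothetical stand-in type (the derived ordering plays the role of the manual Ord for Node above):

use std::collections::BTreeSet;

// ByTime is a toy stand-in: time first, then a unique per-node key
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct ByTime {
    time: i64,
    port: u16, // stands in for Node's SocketAddr as the unique tie-break
}

fn main() {
    let mut set = BTreeSet::new();
    set.insert(ByTime { time: 0, port: 8000 });
    set.insert(ByTime { time: 0, port: 8001 });
    // identical timestamps no longer collapse two distinct nodes into one
    assert_eq!(set.len(), 2);
}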

View File

@ -27,3 +27,13 @@ impl Time for IncrementalTime {
result
}
}
#[derive(Default)]
pub struct ZeroTime {
}
impl Time for ZeroTime {
fn get(&self) -> time::Timespec {
time::Timespec::new(0, 0)
}
}

View File

@ -13,4 +13,5 @@ db = { path = "../db" }
message = { path = "../message" }
p2p = { path = "../p2p" }
primitives = { path = "../primitives" }
test-data = { path = "../test-data" }
verification = { path = "../verification" }

View File

@ -1,15 +1,14 @@
use message::types;
use p2p::{InboundSyncConnection, InboundSyncConnectionRef};
use local_node::LocalNodeRef;
use synchronization_executor::LocalSynchronizationTaskExecutor;
pub struct InboundConnection {
local_node: LocalNodeRef<LocalSynchronizationTaskExecutor>,
local_node: LocalNodeRef,
peer_index: usize,
}
impl InboundConnection {
pub fn new(local_node: LocalNodeRef<LocalSynchronizationTaskExecutor>, peer_index: usize) -> InboundSyncConnectionRef {
pub fn new(local_node: LocalNodeRef, peer_index: usize) -> InboundSyncConnectionRef {
Box::new(InboundConnection {
local_node: local_node,
peer_index: peer_index,
@ -97,4 +96,8 @@ impl InboundSyncConnection for InboundConnection {
fn on_block_txn(&self, message: types::BlockTxn) {
self.local_node.on_peer_block_txn(self.peer_index, message);
}
fn on_notfound(&self, message: types::NotFound) {
self.local_node.on_peer_notfound(self.peer_index, message);
}
}

View File

@ -1,14 +1,13 @@
use p2p::{LocalSyncNode, LocalSyncNodeRef, OutboundSyncConnectionRef, InboundSyncConnectionRef};
use local_node::LocalNodeRef;
use inbound_connection::InboundConnection;
use synchronization_executor::LocalSynchronizationTaskExecutor;
pub struct InboundConnectionFactory {
local_node: LocalNodeRef<LocalSynchronizationTaskExecutor>,
local_node: LocalNodeRef,
}
impl InboundConnectionFactory {
pub fn with_local_node(local_node: LocalNodeRef<LocalSynchronizationTaskExecutor>) -> LocalSyncNodeRef {
pub fn with_local_node(local_node: LocalNodeRef) -> LocalSyncNodeRef {
Box::new(
InboundConnectionFactory {
local_node: local_node,

View File

@ -6,6 +6,7 @@ extern crate message;
extern crate p2p;
extern crate parking_lot;
extern crate primitives;
extern crate test_data;
extern crate time;
extern crate verification;
@ -13,13 +14,14 @@ mod hash_queue;
mod inbound_connection;
mod inbound_connection_factory;
mod local_node;
mod synchronization;
mod synchronization_chain;
mod synchronization_client;
mod synchronization_executor;
mod synchronization_peers;
mod synchronization_server;
use std::sync::Arc;
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};
/// Create inbound synchronization connections factory for given `db`.
pub fn create_sync_connection_factory(db: Arc<db::Store>) -> p2p::LocalSyncNodeRef {
@ -27,9 +29,13 @@ pub fn create_sync_connection_factory(db: Arc<db::Store>) -> p2p::LocalSyncNodeR
use synchronization_executor::LocalSynchronizationTaskExecutor as SyncExecutor;
use local_node::LocalNode as SyncNode;
use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory;
use synchronization_server::SynchronizationServer;
use synchronization_client::{SynchronizationClient, Config as SynchronizationConfig};
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
let sync_executor = SyncExecutor::new(sync_chain.clone());
let sync_node = SyncNode::new(sync_chain, sync_executor);
let sync_server = Arc::new(Mutex::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone())));
let sync_client = SynchronizationClient::new(SynchronizationConfig::default(), sync_executor.clone(), sync_chain);
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
SyncConnectionFactory::with_local_node(sync_node)
}
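
The rewritten factory shares a single chain behind Arc<RwLock<_>> while the executor, server and client each sit behind Arc<Mutex<_>>. A minimal standalone sketch of that sharing pattern (hypothetical types; parking_lot as already used by this crate):

use std::sync::Arc;
use parking_lot::{Mutex, RwLock};

struct Chain { best: u32 }
struct Executor { chain: Arc<RwLock<Chain>> }
struct Client { executor: Arc<Mutex<Executor>>, chain: Arc<RwLock<Chain>> }

fn main() {
    let chain = Arc::new(RwLock::new(Chain { best: 0 }));
    let executor = Arc::new(Mutex::new(Executor { chain: chain.clone() }));
    let client = Client { executor: executor.clone(), chain: chain.clone() };

    // every component observes the same shared chain state
    client.chain.write().best = 1;
    assert_eq!(client.executor.lock().chain.read().best, 1);
}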

View File

@ -6,26 +6,24 @@ use chain::RepresentH256;
use p2p::OutboundSyncConnectionRef;
use message::common::InventoryType;
use message::types;
use synchronization::{Synchronization, SynchronizationRef, Config as SynchronizationConfig, Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor};
use synchronization_chain::ChainRef;
use synchronization_client::{Client, SynchronizationClient};
use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor};
use synchronization_server::{Server, SynchronizationServer};
/// Thread-safe reference to the `LocalNode`.
/// Locks order:
/// 1) sync Mutex
/// 2) executor Mutex
/// 2) chain RwLock
pub type LocalNodeRef<T> = Arc<LocalNode<T>>;
pub type LocalNodeRef = Arc<LocalNode<LocalSynchronizationTaskExecutor, SynchronizationServer, SynchronizationClient<LocalSynchronizationTaskExecutor>>>;
/// Local synchronization node
pub struct LocalNode<T: SynchronizationTaskExecutor + PeersConnections + Send + 'static> {
pub struct LocalNode<T: SynchronizationTaskExecutor + PeersConnections,
U: Server,
V: Client> {
/// Running counter used to assign indexes to synchronization peers
peer_counter: AtomicUsize,
/// Synchronization chain
chain: ChainRef,
/// Synchronization executor
executor: Arc<Mutex<T>>,
/// Synchronization process
sync: SynchronizationRef<T>,
client: Arc<Mutex<V>>,
/// Synchronization server
server: Arc<Mutex<U>>,
}
/// Peers list
@ -34,21 +32,22 @@ pub trait PeersConnections {
fn remove_peer_connection(&mut self, peer_index: usize);
}
impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + Send + 'static {
impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersConnections,
U: Server,
V: Client {
/// New synchronization node with given storage
pub fn new(chain: ChainRef, executor: Arc<Mutex<T>>) -> LocalNodeRef<T> {
let sync = Synchronization::new(SynchronizationConfig::default(), executor.clone(), chain.clone());
Arc::new(LocalNode {
pub fn new(server: Arc<Mutex<U>>, client: Arc<Mutex<V>>, executor: Arc<Mutex<T>>) -> Self {
LocalNode {
peer_counter: AtomicUsize::new(0),
chain: chain,
executor: executor,
sync: sync,
})
client: client,
server: server,
}
}
/// Best block (including non-verified, requested and non-requested blocks)
pub fn best_block(&self) -> db::BestBlock {
self.chain.read().best_block()
self.client.lock().best_block()
}
pub fn create_sync_session(&self, _best_block_height: i32, outbound_connection: OutboundSyncConnectionRef) -> usize {
@ -71,7 +70,7 @@ impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + S
trace!(target: "sync", "Stopping sync session with peer#{}", peer_index);
self.executor.lock().remove_peer_connection(peer_index);
self.sync.lock().on_peer_disconnected(peer_index);
self.client.lock().on_peer_disconnected(peer_index);
}
pub fn on_peer_inventory(&self, peer_index: usize, message: types::Inv) {
@ -90,18 +89,22 @@ impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + S
// if there are unknown blocks => start synchronizing with peer
if !blocks_inventory.is_empty() {
self.sync.lock().on_new_blocks_inventory(peer_index, blocks_inventory);
self.client.lock().on_new_blocks_inventory(peer_index, blocks_inventory);
}
// TODO: process unknown transactions, etc...
}
pub fn on_peer_getdata(&self, peer_index: usize, _message: types::GetData) {
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
self.server.lock().serve_getdata(peer_index, message);
}
pub fn on_peer_getblocks(&self, peer_index: usize, _message: types::GetBlocks) {
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index);
self.server.lock().serve_getblocks(peer_index, message);
}
pub fn on_peer_getheaders(&self, peer_index: usize, _message: types::GetHeaders) {
@ -116,7 +119,7 @@ impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + S
trace!(target: "sync", "Got `block` message from peer#{}. Block hash: {}", peer_index, message.block.hash());
// try to process new block
self.sync.lock().on_peer_block(peer_index, message.block);
self.client.lock().on_peer_block(peer_index, message.block);
}
pub fn on_peer_headers(&self, peer_index: usize, _message: types::Headers) {
@ -166,19 +169,29 @@ impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + S
pub fn on_peer_block_txn(&self, peer_index: usize, _message: types::BlockTxn) {
trace!(target: "sync", "Got `blocktxn` message from peer#{}", peer_index);
}
pub fn on_peer_notfound(&self, peer_index: usize, _message: types::NotFound) {
trace!(target: "sync", "Got `notfound` message from peer#{}", peer_index);
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use parking_lot::{Mutex, RwLock};
use synchronization::Task;
use synchronization::tests::DummyTaskExecutor;
use chain::RepresentH256;
use synchronization_executor::Task;
use synchronization_client::tests::DummyTaskExecutor;
use synchronization_client::{Config, SynchronizationClient};
use synchronization_chain::Chain;
use p2p::{OutboundSyncConnection, OutboundSyncConnectionRef};
use message::types;
use message::common::{InventoryVector, InventoryType};
use db;
use super::LocalNode;
use test_data;
use synchronization_server::ServerTask;
use synchronization_server::tests::DummyServer;
struct DummyOutboundSyncConnection;
@ -207,18 +220,47 @@ mod tests {
fn send_compact_block(&self, _message: &types::CompactBlock) {}
fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {}
fn send_block_txn(&self, _message: &types::BlockTxn) {}
fn send_notfound(&self, _message: &types::NotFound) {}
}
fn create_local_node() -> (Arc<Mutex<DummyTaskExecutor>>, Arc<Mutex<DummyServer>>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor>>) {
let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
let executor = Arc::new(Mutex::new(DummyTaskExecutor::default()));
let server = Arc::new(Mutex::new(DummyServer::new()));
let config = Config { skip_verification: true };
let client = SynchronizationClient::new(config, executor.clone(), chain);
let local_node = LocalNode::new(server.clone(), client, executor.clone());
(executor, server, local_node)
}
#[test]
fn local_node_request_inventory_on_sync_start() {
let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
let executor = Arc::new(Mutex::new(DummyTaskExecutor::default()));
let local_node = LocalNode::new(chain, executor.clone());
let (executor, _, local_node) = create_local_node();
let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
// start sync session
local_node.start_sync_session(peer_index, 0);
// => ask for inventory
let tasks = executor.lock().take_tasks();
assert_eq!(tasks.len(), 1);
assert_eq!(tasks, vec![Task::RequestInventory(peer_index)]);
}
}
#[test]
fn local_node_serves_block() {
let (_, server, local_node) = create_local_node();
let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
// peer requests genesis block
let genesis_block_hash = test_data::genesis().hash();
let inventory = vec![
InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: genesis_block_hash.clone(),
}
];
local_node.on_peer_getdata(peer_index, types::GetData {
inventory: inventory.clone()
});
// => `getdata` is served
let tasks = server.lock().take_tasks();
assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]);
}
}

View File

@ -199,6 +199,21 @@ impl Chain {
}
}
/// Get block number from storage
pub fn storage_block_number(&self, hash: &H256) -> Option<u32> {
self.storage.block_number(hash)
}
/// Get block hash from storage
pub fn storage_block_hash(&self, number: u32) -> Option<H256> {
self.storage.block_hash(number)
}
/// Get block from the storage
pub fn storage_block(&self, hash: &H256) -> Option<Block> {
self.storage.block(db::BlockRef::Hash(hash.clone()))
}
/// Prepare best block locator hashes
pub fn best_block_locator_hashes(&self) -> Vec<H256> {
let mut result: Vec<H256> = Vec::with_capacity(4);

View File

@ -15,6 +15,7 @@ use synchronization_chain::{ChainRef, BlockState};
#[cfg(test)]
use synchronization_chain::{Information as ChainInformation};
use verification::{ChainVerifier, Error as VerificationError, Verify};
use synchronization_executor::{Task, TaskExecutor};
use time;
//! Blocks synchronization process:
@ -77,20 +78,7 @@ const MIN_BLOCKS_IN_REQUEST: u32 = 32;
/// Maximum number of blocks to request from peer
const MAX_BLOCKS_IN_REQUEST: u32 = 512;
/// Thread-safe reference to the `Synchronization`
pub type SynchronizationRef<T> = Arc<Mutex<Synchronization<T>>>;
/// Synchronization task for the peer.
#[derive(Debug, Eq, PartialEq)]
pub enum Task {
/// Request given blocks.
RequestBlocks(usize, Vec<H256>),
/// Request full inventory using block_locator_hashes.
RequestInventory(usize),
/// Request inventory using best block locator only.
RequestBestInventory(usize),
}
/// Synchronization state
#[derive(Debug, Clone, Copy)]
pub enum State {
Synchronizing(f64, u32),
@ -111,11 +99,6 @@ pub struct Information {
pub orphaned: usize,
}
/// Synchronization task executor
pub trait TaskExecutor {
fn execute(&mut self, task: Task);
}
/// Verification thread tasks
enum VerificationTask {
/// Verify single block
@ -124,15 +107,26 @@ enum VerificationTask {
Stop,
}
/// Synchronization config
/// Synchronization client trait
pub trait Client : Send + 'static {
fn best_block(&self) -> db::BestBlock;
fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec<H256>);
fn on_peer_block(&mut self, peer_index: usize, block: Block);
fn on_peer_disconnected(&mut self, peer_index: usize);
fn reset(&mut self, is_hard: bool);
fn on_block_verification_success(&mut self, block: Block);
fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256);
}
/// Synchronization client configuration options.
#[derive(Default)]
pub struct Config {
/// Skip blocks verification
pub skip_block_verification: bool,
}
/// Do not verify incoming blocks before inserting to db.
pub skip_verification: bool,
}
/// New blocks synchronization process.
pub struct Synchronization<T: TaskExecutor + Send + 'static> {
/// Synchronization client.
pub struct SynchronizationClient<T: TaskExecutor> {
/// Synchronization state.
state: State,
/// Synchronization peers.
@ -158,7 +152,7 @@ impl State {
}
}
impl<T> Drop for Synchronization<T> where T: TaskExecutor + Send + 'static {
impl<T> Drop for SynchronizationClient<T> where T: TaskExecutor {
fn drop(&mut self) {
if let Some(join_handle) = self.verification_worker_thread.take() {
self.verification_work_sender
@ -170,11 +164,91 @@ impl<T> Drop for Synchronization<T> where T: TaskExecutor + Send + 'static {
}
}
impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
/// Get best known block
fn best_block(&self) -> db::BestBlock {
self.chain.read().best_block()
}
/// Try to queue synchronization of unknown blocks when new inventory is received.
fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec<H256>) {
self.process_new_blocks_inventory(peer_index, peer_hashes);
self.execute_synchronization_tasks();
}
/// Process new block.
fn on_peer_block(&mut self, peer_index: usize, block: Block) {
let block_hash = block.hash();
// update peers to select next tasks
self.peers.on_block_received(peer_index, &block_hash);
self.process_peer_block(block_hash, block);
self.execute_synchronization_tasks();
}
/// Peer disconnected.
fn on_peer_disconnected(&mut self, peer_index: usize) {
self.peers.on_peer_disconnected(peer_index);
// when last peer is disconnected, reset, but let verifying blocks be verified
self.reset(false);
}
/// Reset synchronization process
fn reset(&mut self, is_hard: bool) {
self.peers.reset();
self.orphaned_blocks.clear();
// TODO: reset verification queue
let mut chain = self.chain.write();
chain.remove_blocks_with_state(BlockState::Requested);
chain.remove_blocks_with_state(BlockState::Scheduled);
if is_hard {
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
chain.remove_blocks_with_state(BlockState::Verifying);
warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block());
}
else {
self.state = State::Saturated;
}
}
/// Process successful block verification
fn on_block_verification_success(&mut self, block: Block) {
{
let hash = block.hash();
let mut chain = self.chain.write();
// remove from verifying queue
assert_eq!(chain.remove_block_with_state(&hash, BlockState::Verifying), HashPosition::Front);
// insert to storage
chain.insert_best_block(block)
.expect("Error inserting to db.");
}
// continue with synchronization
self.execute_synchronization_tasks();
}
/// Process failed block verification
fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256) {
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err);
// reset synchronization process
self.reset(true);
// start new tasks
self.execute_synchronization_tasks();
}
}
impl<T> SynchronizationClient<T> where T: TaskExecutor {
/// Create a new synchronization client
pub fn new(config: Config, executor: Arc<Mutex<T>>, chain: ChainRef) -> SynchronizationRef<T> {
let sync = SynchronizationRef::new(Mutex::new(
Synchronization {
pub fn new(config: Config, executor: Arc<Mutex<T>>, chain: ChainRef) -> Arc<Mutex<Self>> {
let sync = Arc::new(Mutex::new(
SynchronizationClient {
state: State::Saturated,
peers: Peers::new(),
executor: executor,
@ -185,7 +259,7 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
}
));
if !config.skip_block_verification {
if !config.skip_verification {
let (verification_work_sender, verification_work_receiver) = channel();
let csync = sync.clone();
let mut lsync = sync.lock();
@ -194,7 +268,7 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
lsync.verification_worker_thread = Some(thread::Builder::new()
.name("Sync verification thread".to_string())
.spawn(move || {
Synchronization::verification_worker_proc(csync, storage, verification_work_receiver)
SynchronizationClient::verification_worker_proc(csync, storage, verification_work_receiver)
})
.expect("Error creating verification thread"));
}
@ -213,50 +287,6 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
}
}
/// Try to queue synchronization of unknown blocks when new inventory is received.
pub fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec<H256>) {
self.process_new_blocks_inventory(peer_index, peer_hashes);
self.execute_synchronization_tasks();
}
/// Process new block.
pub fn on_peer_block(&mut self, peer_index: usize, block: Block) {
let block_hash = block.hash();
// update peers to select next tasks
self.peers.on_block_received(peer_index, &block_hash);
self.process_peer_block(block_hash, block);
self.execute_synchronization_tasks();
}
/// Peer disconnected.
pub fn on_peer_disconnected(&mut self, peer_index: usize) {
self.peers.on_peer_disconnected(peer_index);
// when last peer is disconnected, reset, but let verifying blocks be verified
self.reset(false);
}
/// Reset synchronization process
pub fn reset(&mut self, is_hard: bool) {
self.peers.reset();
self.orphaned_blocks.clear();
// TODO: reset verification queue
let mut chain = self.chain.write();
chain.remove_blocks_with_state(BlockState::Requested);
chain.remove_blocks_with_state(BlockState::Scheduled);
if is_hard {
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
chain.remove_blocks_with_state(BlockState::Verifying);
warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block());
}
else {
self.state = State::Saturated;
}
}
/// Process new blocks inventory
fn process_new_blocks_inventory(&mut self, peer_index: usize, mut peer_hashes: Vec<H256>) {
// | requested | QUEUED |
@ -273,54 +303,62 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
let mut chain = self.chain.write();
// new block is scheduled => move to synchronizing state
if !self.state.is_synchronizing() {
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
}
// when synchronization is idling
// => request full inventory
if !chain.has_blocks_of_state(BlockState::Scheduled)
&& !chain.has_blocks_of_state(BlockState::Requested) {
let unknown_blocks = peer_hashes.into_iter()
.filter(|hash| chain.block_has_state(&hash, BlockState::Unknown))
.collect();
chain.schedule_blocks_hashes(unknown_blocks);
self.peers.insert(peer_index);
return;
}
// cases: [2], [5], [6], [8]
// if last block from peer_hashes is in window { requested_hashes + queued_hashes }
// => no new blocks for synchronization, but we will use this peer in synchronization
let peer_hashes_len = peer_hashes.len();
if chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Scheduled)
|| chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Requested) {
self.peers.insert(peer_index);
return;
}
// cases: [1], [3], [4], [7], [9], [10]
// try to find new blocks for synchronization from inventory
let mut last_known_peer_hash_index = peer_hashes_len - 1;
loop {
if chain.block_state(&peer_hashes[last_known_peer_hash_index]) != BlockState::Unknown {
// we have found first block which is known to us
// => blocks in range [(last_known_peer_hash_index + 1)..peer_hashes_len] are unknown
// && must be scheduled for request
let unknown_peer_hashes = peer_hashes.split_off(last_known_peer_hash_index + 1);
// when synchronization is idling
// => request full inventory
if !chain.has_blocks_of_state(BlockState::Scheduled)
&& !chain.has_blocks_of_state(BlockState::Requested) {
let unknown_blocks: Vec<_> = peer_hashes.into_iter()
.filter(|hash| chain.block_has_state(&hash, BlockState::Unknown))
.collect();
chain.schedule_blocks_hashes(unknown_peer_hashes);
// no new blocks => no need to switch to the synchronizing state
if unknown_blocks.is_empty() {
return;
}
chain.schedule_blocks_hashes(unknown_blocks);
self.peers.insert(peer_index);
break;
}
// cases: [2], [5], [6], [8]
// if last block from peer_hashes is in window { requested_hashes + queued_hashes }
// => no new blocks for synchronization, but we will use this peer in synchronization
let peer_hashes_len = peer_hashes.len();
if chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Scheduled)
|| chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Requested) {
self.peers.insert(peer_index);
return;
}
if last_known_peer_hash_index == 0 {
// either these are blocks from the future or blocks from the past
// => TODO: ignore this peer during synchronization
return;
// cases: [1], [3], [4], [7], [9], [10]
// try to find new blocks for synchronization from inventory
let mut last_known_peer_hash_index = peer_hashes_len - 1;
loop {
if chain.block_state(&peer_hashes[last_known_peer_hash_index]) != BlockState::Unknown {
// we have found first block which is known to us
// => blocks in range [(last_known_peer_hash_index + 1)..peer_hashes_len] are unknown
// && must be scheduled for request
let unknown_peer_hashes = peer_hashes.split_off(last_known_peer_hash_index + 1);
chain.schedule_blocks_hashes(unknown_peer_hashes);
self.peers.insert(peer_index);
break;
}
if last_known_peer_hash_index == 0 {
// either these are blocks from the future or blocks from the past
// => TODO: ignore this peer during synchronization
return;
}
last_known_peer_hash_index -= 1;
}
last_known_peer_hash_index -= 1;
}
// move to synchronizing state
if !self.state.is_synchronizing() {
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
}
}
@ -374,9 +412,9 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
// proceed to the next orphaned block
if let Entry::Occupied(orphaned_block_entry) = self.orphaned_blocks.entry(current_block_hash) {
let (orphaned_parent_hash, orphaned_block) = orphaned_block_entry.remove_entry();
current_block_hash = orphaned_parent_hash;
let (_, orphaned_block) = orphaned_block_entry.remove_entry();
current_block = orphaned_block;
current_block_hash = current_block.hash();
}
else {
break;
@ -453,7 +491,7 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
}
/// Thread procedure for handling verification tasks
fn verification_worker_proc(sync: SynchronizationRef<T>, storage: Arc<db::Store>, work_receiver: Receiver<VerificationTask>) {
fn verification_worker_proc(sync: Arc<Mutex<Self>>, storage: Arc<db::Store>, work_receiver: Receiver<VerificationTask>) {
let verifier = ChainVerifier::new(storage);
while let Ok(task) = work_receiver.recv() {
match task {
@ -471,35 +509,6 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
}
}
}
/// Process successful block verification
fn on_block_verification_success(&mut self, block: Block) {
{
let hash = block.hash();
let mut chain = self.chain.write();
// remove from verifying queue
assert_eq!(chain.remove_block_with_state(&hash, BlockState::Verifying), HashPosition::Front);
// insert to storage
chain.insert_best_block(block)
.expect("Error inserting to db.");
}
// continue with synchronization
self.execute_synchronization_tasks();
}
/// Process failed block verification
fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256) {
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err);
// reset synchronization process
self.reset(true);
// start new tasks
self.execute_synchronization_tasks();
}
}
#[cfg(test)]
@ -508,9 +517,11 @@ pub mod tests {
use std::mem::replace;
use parking_lot::{Mutex, RwLock};
use chain::{Block, RepresentH256};
use super::{Synchronization, SynchronizationRef, Config, Task, TaskExecutor};
use super::{Client, Config, SynchronizationClient};
use synchronization_executor::{Task, TaskExecutor};
use local_node::PeersConnections;
use synchronization_chain::{Chain, ChainRef};
use test_data;
use p2p::OutboundSyncConnectionRef;
use db;
@ -536,13 +547,12 @@ pub mod tests {
}
}
fn create_sync() -> (Arc<Mutex<DummyTaskExecutor>>, SynchronizationRef<DummyTaskExecutor>) {
fn create_sync() -> (Arc<Mutex<DummyTaskExecutor>>, Arc<Mutex<SynchronizationClient<DummyTaskExecutor>>>) {
let storage = Arc::new(db::TestStorage::with_genesis_block());
let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone())));
let executor = Arc::new(Mutex::new(DummyTaskExecutor::default()));
(executor.clone(), Synchronization::new(Config {
skip_block_verification: true,
}, executor, chain))
let config = Config { skip_verification: true };
(executor.clone(), SynchronizationClient::new(config, executor, chain))
}
#[test]
@ -658,7 +668,6 @@ pub mod tests {
&& sync.information().orphaned == 1);
// receive block from peer#1
sync.on_peer_block(1, block1);
println!("=== {:?}", sync.information().chain);
assert!(sync.information().chain.requested == 0
&& sync.information().orphaned == 0
&& sync.information().chain.stored == 3);
@ -683,4 +692,17 @@ pub mod tests {
assert!(!sync.information().state.is_synchronizing());
}
}
#[test]
fn synchronization_not_starting_when_receiving_known_blocks() {
let (executor, sync) = create_sync();
let mut sync = sync.lock();
// saturated => receive inventory with known blocks only
sync.on_new_blocks_inventory(1, vec![test_data::genesis().hash()]);
// => no need to start synchronization
assert!(!sync.information().state.is_synchronizing());
// => no synchronization tasks are scheduled
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![]);
}
}
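
One behavioural fix worth calling out is the orphan-pool walk: the pool appears keyed by parent hash, so after removing an entry the next lookup must use the removed block's own hash rather than the parent hash again. A toy standalone illustration with string hashes:

use std::collections::HashMap;

fn main() {
    // toy orphan pool: parent hash -> (own hash, parent hash)
    let mut orphans: HashMap<&str, (&str, &str)> = HashMap::new();
    orphans.insert("b1", ("b2", "b1")); // b2 waits for its parent b1
    orphans.insert("b2", ("b3", "b2")); // b3 waits for its parent b2

    let mut current = "b1"; // hash of the block that was just accepted
    let mut chain = vec![current];
    while let Some((own_hash, _parent)) = orphans.remove(current) {
        chain.push(own_hash);
        current = own_hash; // advance by the removed block's own hash
    }
    assert_eq!(chain, vec!["b1", "b2", "b3"]);
}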

View File

@ -1,16 +1,38 @@
use std::sync::Arc;
use std::collections::HashMap;
use parking_lot::Mutex;
use chain::{Block, RepresentH256};
use message::common::{InventoryVector, InventoryType};
use message::types;
use primitives::hash::H256;
use p2p::OutboundSyncConnectionRef;
use synchronization_chain::ChainRef;
use synchronization::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor};
use local_node::PeersConnections;
pub type LocalSynchronizationTaskExecutorRef = Arc<Mutex<LocalSynchronizationTaskExecutor>>;
/// Synchronization task executor
pub trait TaskExecutor : Send + 'static {
fn execute(&mut self, task: Task);
}
/// Synchronization task for the peer.
#[derive(Debug, PartialEq)]
pub enum Task {
/// Request given blocks.
RequestBlocks(usize, Vec<H256>),
/// Request full inventory using block_locator_hashes.
RequestInventory(usize),
/// Request inventory using best block locator only.
RequestBestInventory(usize),
/// Send block.
SendBlock(usize, Block),
/// Send notfound
SendNotFound(usize, Vec<InventoryVector>),
/// Send inventory
SendInventory(usize, Vec<InventoryVector>),
}
/// Synchronization tasks executor
pub struct LocalSynchronizationTaskExecutor {
/// Active synchronization peers
@ -38,12 +60,12 @@ impl PeersConnections for LocalSynchronizationTaskExecutor {
}
}
impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor {
fn execute(&mut self, task: SynchronizationTask) {
impl TaskExecutor for LocalSynchronizationTaskExecutor {
fn execute(&mut self, task: Task) {
// TODO: what is types::GetBlocks::version here? (@ PR#37)
match task {
SynchronizationTask::RequestBlocks(peer_index, blocks_hashes) => {
Task::RequestBlocks(peer_index, blocks_hashes) => {
let getdata = types::GetData {
inventory: blocks_hashes.into_iter()
.map(|hash| InventoryVector {
@ -58,7 +80,7 @@ impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor {
connection.send_getdata(&getdata);
}
}
SynchronizationTask::RequestInventory(peer_index) => {
Task::RequestInventory(peer_index) => {
let block_locator_hashes = self.chain.read().block_locator_hashes();
let getblocks = types::GetBlocks {
version: 0,
@ -72,7 +94,7 @@ impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor {
connection.send_getblocks(&getblocks);
}
},
SynchronizationTask::RequestBestInventory(peer_index) => {
Task::RequestBestInventory(peer_index) => {
let block_locator_hashes = self.chain.read().best_block_locator_hashes();
let getblocks = types::GetBlocks {
version: 0,
@ -86,6 +108,39 @@ impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor {
connection.send_getblocks(&getblocks);
}
},
Task::SendBlock(peer_index, block) => {
let block_message = types::Block {
block: block,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index);
connection.send_block(&block_message);
}
},
Task::SendNotFound(peer_index, unknown_inventory) => {
let notfound = types::NotFound {
inventory: unknown_inventory,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len());
connection.send_notfound(&notfound);
}
},
Task::SendInventory(peer_index, inventory) => {
let inventory = types::Inv {
inventory: inventory,
};
if let Some(connection) = self.peers.get_mut(&peer_index) {
let connection = &mut *connection;
trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len());
connection.send_inventory(&inventory);
}
},
}
}
}
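
Task and TaskExecutor now live in synchronization_executor, and the tests rely on a recording executor (DummyTaskExecutor) to assert on the tasks that were scheduled. A self-contained sketch of that pattern with illustrative stand-in types:

// a pared-down stand-in for the Task enum above
#[derive(Debug, PartialEq)]
enum MiniTask {
    RequestInventory(usize),
    SendNotFound(usize, usize), // (peer index, item count)
}

trait MiniExecutor {
    fn execute(&mut self, task: MiniTask);
}

// records tasks instead of sending messages, so tests can inspect them
#[derive(Default)]
struct RecordingExecutor {
    tasks: Vec<MiniTask>,
}

impl MiniExecutor for RecordingExecutor {
    fn execute(&mut self, task: MiniTask) {
        self.tasks.push(task);
    }
}

fn main() {
    let mut executor = RecordingExecutor::default();
    executor.execute(MiniTask::RequestInventory(1));
    executor.execute(MiniTask::SendNotFound(1, 2));
    // drain and inspect, much like take_tasks() in the tests
    let tasks = std::mem::replace(&mut executor.tasks, Vec::new());
    assert_eq!(tasks, vec![MiniTask::RequestInventory(1), MiniTask::SendNotFound(1, 2)]);
}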

View File

@ -0,0 +1,284 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::collections::{VecDeque, HashMap};
use std::collections::hash_map::Entry;
use parking_lot::{Mutex, Condvar};
use message::common::{InventoryVector, InventoryType};
use db;
use primitives::hash::H256;
use synchronization_chain::ChainRef;
use synchronization_executor::{Task, TaskExecutor};
use message::types;
/// Synchronization requests server trait
pub trait Server : Send + 'static {
fn serve_getdata(&mut self, peer_index: usize, message: types::GetData);
fn serve_getblocks(&mut self, peer_index: usize, message: types::GetBlocks);
}
/// Synchronization requests server
pub struct SynchronizationServer {
chain: ChainRef,
queue_ready: Arc<Condvar>,
queue: Arc<Mutex<ServerQueue>>,
worker_thread: Option<thread::JoinHandle<()>>,
}
struct ServerQueue {
is_stopping: AtomicBool,
queue_ready: Arc<Condvar>,
peers_queue: VecDeque<usize>,
tasks_queue: HashMap<usize, VecDeque<ServerTask>>,
}
#[derive(Debug, PartialEq)]
pub enum ServerTask {
ServeGetData(Vec<InventoryVector>),
ServeGetBlocks(db::BestBlock, H256),
ReturnNotFound(Vec<InventoryVector>),
ReturnBlock(H256),
}
impl SynchronizationServer {
pub fn new<T: TaskExecutor>(chain: ChainRef, executor: Arc<Mutex<T>>) -> Self {
let queue_ready = Arc::new(Condvar::new());
let queue = Arc::new(Mutex::new(ServerQueue::new(queue_ready.clone())));
let mut server = SynchronizationServer {
chain: chain.clone(),
queue_ready: queue_ready.clone(),
queue: queue.clone(),
worker_thread: None,
};
server.worker_thread = Some(thread::spawn(move || {
SynchronizationServer::server_worker(queue_ready, queue, chain, executor);
}));
server
}
fn locate_known_block(&self, block_locator_hashes: Vec<H256>) -> Option<db::BestBlock> {
let chain = self.chain.read();
block_locator_hashes.into_iter()
.filter_map(|hash| chain
.storage_block_number(&hash)
.map(|number| db::BestBlock {
number: number,
hash: hash,
}))
.nth(0)
}
fn server_worker<T: TaskExecutor>(queue_ready: Arc<Condvar>, queue: Arc<Mutex<ServerQueue>>, chain: ChainRef, executor: Arc<Mutex<T>>) {
loop {
let server_task = {
let mut queue = queue.lock();
if queue.is_stopping.load(Ordering::SeqCst) {
break
}
queue.next_task()
.map_or_else(|| {
queue_ready.wait(&mut queue);
queue.next_task()
}, |next_task| Some(next_task))
};
match server_task {
// has new task
Some(server_task) => match server_task {
// `getdata` => `notfound` + `block` + ...
(peer_index, ServerTask::ServeGetData(inventory)) => {
let mut unknown_items: Vec<InventoryVector> = Vec::new();
let mut new_tasks: Vec<ServerTask> = Vec::new();
for item in inventory {
match item.inv_type {
InventoryType::MessageBlock => {
match chain.read().storage_block_number(&item.hash) {
Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())),
None => unknown_items.push(item),
}
},
_ => (), // TODO: process other inventory types
}
}
// respond with `notfound` message for unknown data
if !unknown_items.is_empty() {
trace!(target: "sync", "Going to respond with notfound with {} items to peer#{}", unknown_items.len(), peer_index);
new_tasks.push(ServerTask::ReturnNotFound(unknown_items));
}
// schedule data responses
if !new_tasks.is_empty() {
trace!(target: "sync", "Going to respond with data with {} items to peer#{}", new_tasks.len(), peer_index);
queue.lock().add_tasks(peer_index, new_tasks);
}
},
// `inventory`
(peer_index, ServerTask::ServeGetBlocks(best_block, hash_stop)) => {
let storage_block_hash = chain.read().storage_block_hash(best_block.number);
if let Some(hash) = storage_block_hash {
// check that chain has not reorganized since task was queued
if hash == best_block.hash {
let mut inventory: Vec<InventoryVector> = Vec::new();
let first_block_number = best_block.number + 1;
let last_block_number = best_block.number + 500;
// 500 hashes after best_block.number OR hash_stop OR blockchain end
for block_number in first_block_number..last_block_number {
match chain.read().storage_block_hash(block_number) {
Some(ref block_hash) if block_hash == &hash_stop => break,
None => break,
Some(block_hash) => {
inventory.push(InventoryVector {
inv_type: InventoryType::MessageBlock,
hash: block_hash,
});
},
}
}
if !inventory.is_empty() {
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", inventory.len(), peer_index);
executor.lock().execute(Task::SendInventory(peer_index, inventory));
}
}
}
},
// `notfound`
(peer_index, ServerTask::ReturnNotFound(inventory)) => {
executor.lock().execute(Task::SendNotFound(peer_index, inventory));
},
// `block`
(peer_index, ServerTask::ReturnBlock(block_hash)) => {
let storage_block = chain.read().storage_block(&block_hash);
if let Some(storage_block) = storage_block {
executor.lock().execute(Task::SendBlock(peer_index, storage_block));
}
},
},
// no tasks after wake-up => stopping
None => break,
}
}
}
}
impl Drop for SynchronizationServer {
fn drop(&mut self) {
if let Some(join_handle) = self.worker_thread.take() {
self.queue.lock().is_stopping.store(true, Ordering::SeqCst);
self.queue_ready.notify_one();
join_handle.join().expect("Clean shutdown.");
}
}
}
impl Server for SynchronizationServer {
fn serve_getdata(&mut self, peer_index: usize, message: types::GetData) {
self.queue.lock().add_task(peer_index, ServerTask::ServeGetData(message.inventory));
}
fn serve_getblocks(&mut self, peer_index: usize, message: types::GetBlocks) {
if let Some(best_common_block) = self.locate_known_block(message.block_locator_hashes) {
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
self.queue.lock().add_task(peer_index, ServerTask::ServeGetBlocks(best_common_block, message.hash_stop));
}
else {
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
}
}
}
impl ServerQueue {
pub fn new(queue_ready: Arc<Condvar>) -> Self {
ServerQueue {
is_stopping: AtomicBool::new(false),
queue_ready: queue_ready,
peers_queue: VecDeque::new(),
tasks_queue: HashMap::new(),
}
}
pub fn next_task(&mut self) -> Option<(usize, ServerTask)> {
self.peers_queue.pop_front()
.map(|peer| {
let (peer_task, no_tasks_left) = {
let peer_tasks = self.tasks_queue.get_mut(&peer).expect("for each peer there is non-empty tasks queue");
let peer_task = peer_tasks.pop_front().expect("for each peer there is non-empty tasks queue");
(peer_task, peer_tasks.is_empty())
};
// remove if no tasks left || schedule otherwise
if no_tasks_left {
self.tasks_queue.remove(&peer);
}
else {
self.peers_queue.push_back(peer);
}
(peer, peer_task)
})
}
pub fn add_task(&mut self, peer_index: usize, task: ServerTask) {
match self.tasks_queue.entry(peer_index) {
Entry::Occupied(mut entry) => {
entry.get_mut().push_back(task);
},
Entry::Vacant(entry) => {
let mut new_tasks = VecDeque::new();
new_tasks.push_back(task);
entry.insert(new_tasks);
self.peers_queue.push_back(peer_index);
}
}
self.queue_ready.notify_one();
}
pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec<ServerTask>) {
match self.tasks_queue.entry(peer_index) {
Entry::Occupied(mut entry) => {
entry.get_mut().extend(tasks);
},
Entry::Vacant(entry) => {
let mut new_tasks = VecDeque::new();
new_tasks.extend(tasks);
entry.insert(new_tasks);
self.peers_queue.push_back(peer_index);
}
}
self.queue_ready.notify_one();
}
}
#[cfg(test)]
pub mod tests {
use super::{Server, ServerTask};
use message::types;
use db;
use std::mem::replace;
pub struct DummyServer {
tasks: Vec<(usize, ServerTask)>,
}
impl DummyServer {
pub fn new() -> Self {
DummyServer {
tasks: Vec::new(),
}
}
pub fn take_tasks(&mut self) -> Vec<(usize, ServerTask)> {
replace(&mut self.tasks, Vec::new())
}
}
impl Server for DummyServer {
fn serve_getdata(&mut self, peer_index: usize, message: types::GetData) {
self.tasks.push((peer_index, ServerTask::ServeGetData(message.inventory)));
}
fn serve_getblocks(&mut self, peer_index: usize, message: types::GetBlocks) {
self.tasks.push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock {
number: 0,
hash: message.block_locator_hashes[0].clone(),
}, message.hash_stop)));
}
}
}
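
ServerQueue schedules responses round-robin across peers: add_task registers a peer in peers_queue when its first task arrives, and next_task pops one task per peer, re-queueing the peer only while its backlog is non-empty. A standalone sketch of that scheduling with toy string tasks:

use std::collections::{HashMap, VecDeque};

fn main() {
    let mut peers_queue: VecDeque<usize> = VecDeque::new();
    let mut tasks_queue: HashMap<usize, VecDeque<&'static str>> = HashMap::new();

    // add_task(): queue the task, register the peer on its first pending task
    let incoming: Vec<(usize, &'static str)> =
        vec![(1, "getdata"), (2, "getblocks"), (1, "getdata")];
    for (peer, task) in incoming {
        let tasks = tasks_queue.entry(peer).or_insert_with(VecDeque::new);
        tasks.push_back(task);
        if tasks.len() == 1 {
            peers_queue.push_back(peer);
        }
    }

    // next_task(): pop a peer, serve one task, re-queue the peer if more remain
    while let Some(peer) = peers_queue.pop_front() {
        let (task, no_tasks_left) = {
            let tasks = tasks_queue.get_mut(&peer).expect("peer has queued tasks");
            (tasks.pop_front().expect("peer has queued tasks"), tasks.is_empty())
        };
        if no_tasks_left {
            tasks_queue.remove(&peer);
        } else {
            peers_queue.push_back(peer);
        }
        // served in order: peer#1, peer#2, peer#1 - no peer starves behind another's backlog
        println!("peer#{} served {}", peer, task);
    }
}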