Serving some sync requests (#73)
* started work on synchronization_server * continue synchronization server * added response to getblocks message * renamed Synchronization to Client * LocalNode parametrized by Server * LocalNode parametrized by Client * fixed test * support connect to given port via command line * fixed couple of sync server issues && added sync server trace * fixing sync issues * fixed grumbles
This commit is contained in:
parent
3074bec9ec
commit
0b212ec5eb
|
@ -582,6 +582,7 @@ dependencies = [
|
|||
"p2p 0.1.0",
|
||||
"parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"primitives 0.1.0",
|
||||
"test-data 0.1.0",
|
||||
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"verification 0.1.0",
|
||||
]
|
||||
|
|
|
@ -36,6 +36,7 @@ pub trait InboundSyncConnection : Send + Sync {
|
|||
fn on_compact_block(&self, message: types::CompactBlock);
|
||||
fn on_get_block_txn(&self, message: types::GetBlockTxn);
|
||||
fn on_block_txn(&self, message: types::BlockTxn);
|
||||
fn on_notfound(&self, message: types::NotFound);
|
||||
}
|
||||
|
||||
pub trait OutboundSyncConnection : Send + Sync {
|
||||
|
@ -57,6 +58,7 @@ pub trait OutboundSyncConnection : Send + Sync {
|
|||
fn send_compact_block(&self, message: &types::CompactBlock);
|
||||
fn send_get_block_txn(&self, message: &types::GetBlockTxn);
|
||||
fn send_block_txn(&self, message: &types::BlockTxn);
|
||||
fn send_notfound(&self, message: &types::NotFound);
|
||||
}
|
||||
|
||||
struct OutboundSync {
|
||||
|
@ -154,6 +156,10 @@ impl OutboundSyncConnection for OutboundSync {
|
|||
fn send_block_txn(&self, message: &types::BlockTxn) {
|
||||
self.send_message(message);
|
||||
}
|
||||
|
||||
fn send_notfound(&self, message: &types::NotFound) {
|
||||
self.send_message(message);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SyncProtocol {
|
||||
|
@ -248,6 +254,10 @@ impl Protocol for SyncProtocol {
|
|||
let message: types::BlockTxn = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_block_txn(message);
|
||||
}
|
||||
else if command == &types::NotFound::command() {
|
||||
let message: types::NotFound = try!(deserialize_payload(payload, version));
|
||||
self.inbound_connection.on_notfound(message);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
@ -13,4 +13,5 @@ db = { path = "../db" }
|
|||
message = { path = "../message" }
|
||||
p2p = { path = "../p2p" }
|
||||
primitives = { path = "../primitives" }
|
||||
test-data = { path = "../test-data" }
|
||||
verification = { path = "../verification" }
|
||||
|
|
|
@ -1,15 +1,14 @@
|
|||
use message::types;
|
||||
use p2p::{InboundSyncConnection, InboundSyncConnectionRef};
|
||||
use local_node::LocalNodeRef;
|
||||
use synchronization_executor::LocalSynchronizationTaskExecutor;
|
||||
|
||||
pub struct InboundConnection {
|
||||
local_node: LocalNodeRef<LocalSynchronizationTaskExecutor>,
|
||||
local_node: LocalNodeRef,
|
||||
peer_index: usize,
|
||||
}
|
||||
|
||||
impl InboundConnection {
|
||||
pub fn new(local_node: LocalNodeRef<LocalSynchronizationTaskExecutor>, peer_index: usize) -> InboundSyncConnectionRef {
|
||||
pub fn new(local_node: LocalNodeRef, peer_index: usize) -> InboundSyncConnectionRef {
|
||||
Box::new(InboundConnection {
|
||||
local_node: local_node,
|
||||
peer_index: peer_index,
|
||||
|
@ -97,4 +96,8 @@ impl InboundSyncConnection for InboundConnection {
|
|||
fn on_block_txn(&self, message: types::BlockTxn) {
|
||||
self.local_node.on_peer_block_txn(self.peer_index, message);
|
||||
}
|
||||
|
||||
fn on_notfound(&self, message: types::NotFound) {
|
||||
self.local_node.on_peer_notfound(self.peer_index, message);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,13 @@
|
|||
use p2p::{LocalSyncNode, LocalSyncNodeRef, OutboundSyncConnectionRef, InboundSyncConnectionRef};
|
||||
use local_node::LocalNodeRef;
|
||||
use inbound_connection::InboundConnection;
|
||||
use synchronization_executor::LocalSynchronizationTaskExecutor;
|
||||
|
||||
pub struct InboundConnectionFactory {
|
||||
local_node: LocalNodeRef<LocalSynchronizationTaskExecutor>,
|
||||
local_node: LocalNodeRef,
|
||||
}
|
||||
|
||||
impl InboundConnectionFactory {
|
||||
pub fn with_local_node(local_node: LocalNodeRef<LocalSynchronizationTaskExecutor>) -> LocalSyncNodeRef {
|
||||
pub fn with_local_node(local_node: LocalNodeRef) -> LocalSyncNodeRef {
|
||||
Box::new(
|
||||
InboundConnectionFactory {
|
||||
local_node: local_node,
|
||||
|
|
|
@ -6,6 +6,7 @@ extern crate message;
|
|||
extern crate p2p;
|
||||
extern crate parking_lot;
|
||||
extern crate primitives;
|
||||
extern crate test_data;
|
||||
extern crate time;
|
||||
extern crate verification;
|
||||
|
||||
|
@ -13,13 +14,14 @@ mod hash_queue;
|
|||
mod inbound_connection;
|
||||
mod inbound_connection_factory;
|
||||
mod local_node;
|
||||
mod synchronization;
|
||||
mod synchronization_chain;
|
||||
mod synchronization_client;
|
||||
mod synchronization_executor;
|
||||
mod synchronization_peers;
|
||||
mod synchronization_server;
|
||||
|
||||
use std::sync::Arc;
|
||||
use parking_lot::RwLock;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
|
||||
/// Create inbound synchronization connections factory for given `db`.
|
||||
pub fn create_sync_connection_factory(db: Arc<db::Store>) -> p2p::LocalSyncNodeRef {
|
||||
|
@ -27,9 +29,13 @@ pub fn create_sync_connection_factory(db: Arc<db::Store>) -> p2p::LocalSyncNodeR
|
|||
use synchronization_executor::LocalSynchronizationTaskExecutor as SyncExecutor;
|
||||
use local_node::LocalNode as SyncNode;
|
||||
use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory;
|
||||
use synchronization_server::SynchronizationServer;
|
||||
use synchronization_client::{SynchronizationClient, Config as SynchronizationConfig};
|
||||
|
||||
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
|
||||
let sync_executor = SyncExecutor::new(sync_chain.clone());
|
||||
let sync_node = SyncNode::new(sync_chain, sync_executor);
|
||||
let sync_server = Arc::new(Mutex::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone())));
|
||||
let sync_client = SynchronizationClient::new(SynchronizationConfig::default(), sync_executor.clone(), sync_chain);
|
||||
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
|
||||
SyncConnectionFactory::with_local_node(sync_node)
|
||||
}
|
||||
|
|
|
@ -6,26 +6,24 @@ use chain::RepresentH256;
|
|||
use p2p::OutboundSyncConnectionRef;
|
||||
use message::common::InventoryType;
|
||||
use message::types;
|
||||
use synchronization::{Synchronization, SynchronizationRef, Config as SynchronizationConfig, Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor};
|
||||
use synchronization_chain::ChainRef;
|
||||
use synchronization_client::{Client, SynchronizationClient};
|
||||
use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor};
|
||||
use synchronization_server::{Server, SynchronizationServer};
|
||||
|
||||
/// Thread-safe reference to the `LocalNode`.
|
||||
/// Locks order:
|
||||
/// 1) sync Mutex
|
||||
/// 2) executor Mutex
|
||||
/// 2) chain RwLock
|
||||
pub type LocalNodeRef<T> = Arc<LocalNode<T>>;
|
||||
pub type LocalNodeRef = Arc<LocalNode<LocalSynchronizationTaskExecutor, SynchronizationServer, SynchronizationClient<LocalSynchronizationTaskExecutor>>>;
|
||||
|
||||
/// Local synchronization node
|
||||
pub struct LocalNode<T: SynchronizationTaskExecutor + PeersConnections + Send + 'static> {
|
||||
pub struct LocalNode<T: SynchronizationTaskExecutor + PeersConnections,
|
||||
U: Server,
|
||||
V: Client> {
|
||||
/// Throughout counter of synchronization peers
|
||||
peer_counter: AtomicUsize,
|
||||
/// Synchronization chain
|
||||
chain: ChainRef,
|
||||
/// Synchronization executor
|
||||
executor: Arc<Mutex<T>>,
|
||||
/// Synchronization process
|
||||
sync: SynchronizationRef<T>,
|
||||
client: Arc<Mutex<V>>,
|
||||
/// Synchronization server
|
||||
server: Arc<Mutex<U>>,
|
||||
}
|
||||
|
||||
/// Peers list
|
||||
|
@ -34,21 +32,22 @@ pub trait PeersConnections {
|
|||
fn remove_peer_connection(&mut self, peer_index: usize);
|
||||
}
|
||||
|
||||
impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + Send + 'static {
|
||||
impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersConnections,
|
||||
U: Server,
|
||||
V: Client {
|
||||
/// New synchronization node with given storage
|
||||
pub fn new(chain: ChainRef, executor: Arc<Mutex<T>>) -> LocalNodeRef<T> {
|
||||
let sync = Synchronization::new(SynchronizationConfig::default(), executor.clone(), chain.clone());
|
||||
Arc::new(LocalNode {
|
||||
pub fn new(server: Arc<Mutex<U>>, client: Arc<Mutex<V>>, executor: Arc<Mutex<T>>) -> Self {
|
||||
LocalNode {
|
||||
peer_counter: AtomicUsize::new(0),
|
||||
chain: chain,
|
||||
executor: executor,
|
||||
sync: sync,
|
||||
})
|
||||
client: client,
|
||||
server: server,
|
||||
}
|
||||
}
|
||||
|
||||
/// Best block hash (including non-verified, requested && non-requested blocks)
|
||||
pub fn best_block(&self) -> db::BestBlock {
|
||||
self.chain.read().best_block()
|
||||
self.client.lock().best_block()
|
||||
}
|
||||
|
||||
pub fn create_sync_session(&self, _best_block_height: i32, outbound_connection: OutboundSyncConnectionRef) -> usize {
|
||||
|
@ -71,7 +70,7 @@ impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + S
|
|||
trace!(target: "sync", "Stopping sync session with peer#{}", peer_index);
|
||||
|
||||
self.executor.lock().remove_peer_connection(peer_index);
|
||||
self.sync.lock().on_peer_disconnected(peer_index);
|
||||
self.client.lock().on_peer_disconnected(peer_index);
|
||||
}
|
||||
|
||||
pub fn on_peer_inventory(&self, peer_index: usize, message: types::Inv) {
|
||||
|
@ -90,18 +89,22 @@ impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + S
|
|||
|
||||
// if there are unknown blocks => start synchronizing with peer
|
||||
if !blocks_inventory.is_empty() {
|
||||
self.sync.lock().on_new_blocks_inventory(peer_index, blocks_inventory);
|
||||
self.client.lock().on_new_blocks_inventory(peer_index, blocks_inventory);
|
||||
}
|
||||
|
||||
// TODO: process unknown transactions, etc...
|
||||
}
|
||||
|
||||
pub fn on_peer_getdata(&self, peer_index: usize, _message: types::GetData) {
|
||||
pub fn on_peer_getdata(&self, peer_index: usize, message: types::GetData) {
|
||||
trace!(target: "sync", "Got `getdata` message from peer#{}", peer_index);
|
||||
|
||||
self.server.lock().serve_getdata(peer_index, message);
|
||||
}
|
||||
|
||||
pub fn on_peer_getblocks(&self, peer_index: usize, _message: types::GetBlocks) {
|
||||
pub fn on_peer_getblocks(&self, peer_index: usize, message: types::GetBlocks) {
|
||||
trace!(target: "sync", "Got `getblocks` message from peer#{}", peer_index);
|
||||
|
||||
self.server.lock().serve_getblocks(peer_index, message);
|
||||
}
|
||||
|
||||
pub fn on_peer_getheaders(&self, peer_index: usize, _message: types::GetHeaders) {
|
||||
|
@ -116,7 +119,7 @@ impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + S
|
|||
trace!(target: "sync", "Got `block` message from peer#{}. Block hash: {}", peer_index, message.block.hash());
|
||||
|
||||
// try to process new block
|
||||
self.sync.lock().on_peer_block(peer_index, message.block);
|
||||
self.client.lock().on_peer_block(peer_index, message.block);
|
||||
}
|
||||
|
||||
pub fn on_peer_headers(&self, peer_index: usize, _message: types::Headers) {
|
||||
|
@ -166,19 +169,29 @@ impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + S
|
|||
pub fn on_peer_block_txn(&self, peer_index: usize, _message: types::BlockTxn) {
|
||||
trace!(target: "sync", "Got `blocktxn` message from peer#{}", peer_index);
|
||||
}
|
||||
|
||||
pub fn on_peer_notfound(&self, peer_index: usize, _message: types::NotFound) {
|
||||
trace!(target: "sync", "Got `notfound` message from peer#{}", peer_index);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use synchronization::Task;
|
||||
use synchronization::tests::DummyTaskExecutor;
|
||||
use chain::RepresentH256;
|
||||
use synchronization_executor::Task;
|
||||
use synchronization_client::tests::DummyTaskExecutor;
|
||||
use synchronization_client::{Config, SynchronizationClient};
|
||||
use synchronization_chain::Chain;
|
||||
use p2p::{OutboundSyncConnection, OutboundSyncConnectionRef};
|
||||
use message::types;
|
||||
use message::common::{InventoryVector, InventoryType};
|
||||
use db;
|
||||
use super::LocalNode;
|
||||
use test_data;
|
||||
use synchronization_server::ServerTask;
|
||||
use synchronization_server::tests::DummyServer;
|
||||
|
||||
struct DummyOutboundSyncConnection;
|
||||
|
||||
|
@ -207,18 +220,47 @@ mod tests {
|
|||
fn send_compact_block(&self, _message: &types::CompactBlock) {}
|
||||
fn send_get_block_txn(&self, _message: &types::GetBlockTxn) {}
|
||||
fn send_block_txn(&self, _message: &types::BlockTxn) {}
|
||||
fn send_notfound(&self, _message: &types::NotFound) {}
|
||||
}
|
||||
|
||||
fn create_local_node() -> (Arc<Mutex<DummyTaskExecutor>>, Arc<Mutex<DummyServer>>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor>>) {
|
||||
let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
|
||||
let executor = Arc::new(Mutex::new(DummyTaskExecutor::default()));
|
||||
let server = Arc::new(Mutex::new(DummyServer::new()));
|
||||
let config = Config { skip_verification: true };
|
||||
let client = SynchronizationClient::new(config, executor.clone(), chain);
|
||||
let local_node = LocalNode::new(server.clone(), client, executor.clone());
|
||||
(executor, server, local_node)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn local_node_request_inventory_on_sync_start() {
|
||||
let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
|
||||
let executor = Arc::new(Mutex::new(DummyTaskExecutor::default()));
|
||||
let local_node = LocalNode::new(chain, executor.clone());
|
||||
let (executor, _, local_node) = create_local_node();
|
||||
let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||
// start sync session
|
||||
local_node.start_sync_session(peer_index, 0);
|
||||
|
||||
// => ask for inventory
|
||||
let tasks = executor.lock().take_tasks();
|
||||
assert_eq!(tasks.len(), 1);
|
||||
assert_eq!(tasks, vec![Task::RequestInventory(peer_index)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn local_node_serves_block() {
|
||||
let (_, server, local_node) = create_local_node();
|
||||
let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
|
||||
// peer requests genesis block
|
||||
let genesis_block_hash = test_data::genesis().hash();
|
||||
let inventory = vec![
|
||||
InventoryVector {
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: genesis_block_hash.clone(),
|
||||
}
|
||||
];
|
||||
local_node.on_peer_getdata(peer_index, types::GetData {
|
||||
inventory: inventory.clone()
|
||||
});
|
||||
// => `getdata` is served
|
||||
let tasks = server.lock().take_tasks();
|
||||
assert_eq!(tasks, vec![(peer_index, ServerTask::ServeGetData(inventory))]);
|
||||
}
|
||||
}
|
|
@ -199,6 +199,21 @@ impl Chain {
|
|||
}
|
||||
}
|
||||
|
||||
/// Get block number from storage
|
||||
pub fn storage_block_number(&self, hash: &H256) -> Option<u32> {
|
||||
self.storage.block_number(hash)
|
||||
}
|
||||
|
||||
/// Get block hash from storage
|
||||
pub fn storage_block_hash(&self, number: u32) -> Option<H256> {
|
||||
self.storage.block_hash(number)
|
||||
}
|
||||
|
||||
/// Get block from the storage
|
||||
pub fn storage_block(&self, hash: &H256) -> Option<Block> {
|
||||
self.storage.block(db::BlockRef::Hash(hash.clone()))
|
||||
}
|
||||
|
||||
/// Prepare best block locator hashes
|
||||
pub fn best_block_locator_hashes(&self) -> Vec<H256> {
|
||||
let mut result: Vec<H256> = Vec::with_capacity(4);
|
||||
|
|
|
@ -15,6 +15,7 @@ use synchronization_chain::{ChainRef, BlockState};
|
|||
#[cfg(test)]
|
||||
use synchronization_chain::{Information as ChainInformation};
|
||||
use verification::{ChainVerifier, Error as VerificationError, Verify};
|
||||
use synchronization_executor::{Task, TaskExecutor};
|
||||
use time;
|
||||
|
||||
///! Blocks synchronization process:
|
||||
|
@ -77,20 +78,7 @@ const MIN_BLOCKS_IN_REQUEST: u32 = 32;
|
|||
/// Maximum number of blocks to request from peer
|
||||
const MAX_BLOCKS_IN_REQUEST: u32 = 512;
|
||||
|
||||
/// Thread-safe reference to the `Synchronization`
|
||||
pub type SynchronizationRef<T> = Arc<Mutex<Synchronization<T>>>;
|
||||
|
||||
/// Synchronization task for the peer.
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
pub enum Task {
|
||||
/// Request given blocks.
|
||||
RequestBlocks(usize, Vec<H256>),
|
||||
/// Request full inventory using block_locator_hashes.
|
||||
RequestInventory(usize),
|
||||
/// Request inventory using best block locator only.
|
||||
RequestBestInventory(usize),
|
||||
}
|
||||
|
||||
/// Synchronization state
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum State {
|
||||
Synchronizing(f64, u32),
|
||||
|
@ -111,11 +99,6 @@ pub struct Information {
|
|||
pub orphaned: usize,
|
||||
}
|
||||
|
||||
/// Synchronization task executor
|
||||
pub trait TaskExecutor {
|
||||
fn execute(&mut self, task: Task);
|
||||
}
|
||||
|
||||
/// Verification thread tasks
|
||||
enum VerificationTask {
|
||||
/// Verify single block
|
||||
|
@ -124,15 +107,26 @@ enum VerificationTask {
|
|||
Stop,
|
||||
}
|
||||
|
||||
/// Synchronization config
|
||||
#[derive(Default)]
|
||||
pub struct Config {
|
||||
/// Skip blocks verification
|
||||
pub skip_block_verification: bool,
|
||||
/// Synchronization client trait
|
||||
pub trait Client : Send + 'static {
|
||||
fn best_block(&self) -> db::BestBlock;
|
||||
fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec<H256>);
|
||||
fn on_peer_block(&mut self, peer_index: usize, block: Block);
|
||||
fn on_peer_disconnected(&mut self, peer_index: usize);
|
||||
fn reset(&mut self, is_hard: bool);
|
||||
fn on_block_verification_success(&mut self, block: Block);
|
||||
fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256);
|
||||
}
|
||||
|
||||
/// New blocks synchronization process.
|
||||
pub struct Synchronization<T: TaskExecutor + Send + 'static> {
|
||||
/// Synchronization client configuration options.
|
||||
#[derive(Default)]
|
||||
pub struct Config {
|
||||
/// Do not verify incoming blocks before inserting to db.
|
||||
pub skip_verification: bool,
|
||||
}
|
||||
|
||||
/// Synchronization client.
|
||||
pub struct SynchronizationClient<T: TaskExecutor> {
|
||||
/// Synchronization state.
|
||||
state: State,
|
||||
/// Synchronization peers.
|
||||
|
@ -158,7 +152,7 @@ impl State {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Synchronization<T> where T: TaskExecutor + Send + 'static {
|
||||
impl<T> Drop for SynchronizationClient<T> where T: TaskExecutor {
|
||||
fn drop(&mut self) {
|
||||
if let Some(join_handle) = self.verification_worker_thread.take() {
|
||||
self.verification_work_sender
|
||||
|
@ -170,11 +164,91 @@ impl<T> Drop for Synchronization<T> where T: TaskExecutor + Send + 'static {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
|
||||
impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
|
||||
/// Get best known block
|
||||
fn best_block(&self) -> db::BestBlock {
|
||||
self.chain.read().best_block()
|
||||
}
|
||||
|
||||
/// Try to queue synchronization of unknown blocks when new inventory is received.
|
||||
fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec<H256>) {
|
||||
self.process_new_blocks_inventory(peer_index, peer_hashes);
|
||||
self.execute_synchronization_tasks();
|
||||
}
|
||||
|
||||
/// Process new block.
|
||||
fn on_peer_block(&mut self, peer_index: usize, block: Block) {
|
||||
let block_hash = block.hash();
|
||||
|
||||
// update peers to select next tasks
|
||||
self.peers.on_block_received(peer_index, &block_hash);
|
||||
|
||||
self.process_peer_block(block_hash, block);
|
||||
self.execute_synchronization_tasks();
|
||||
}
|
||||
|
||||
/// Peer disconnected.
|
||||
fn on_peer_disconnected(&mut self, peer_index: usize) {
|
||||
self.peers.on_peer_disconnected(peer_index);
|
||||
|
||||
// when last peer is disconnected, reset, but let verifying blocks be verified
|
||||
self.reset(false);
|
||||
}
|
||||
|
||||
/// Reset synchronization process
|
||||
fn reset(&mut self, is_hard: bool) {
|
||||
self.peers.reset();
|
||||
self.orphaned_blocks.clear();
|
||||
// TODO: reset verification queue
|
||||
|
||||
let mut chain = self.chain.write();
|
||||
chain.remove_blocks_with_state(BlockState::Requested);
|
||||
chain.remove_blocks_with_state(BlockState::Scheduled);
|
||||
if is_hard {
|
||||
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
|
||||
chain.remove_blocks_with_state(BlockState::Verifying);
|
||||
warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block());
|
||||
}
|
||||
else {
|
||||
self.state = State::Saturated;
|
||||
}
|
||||
}
|
||||
|
||||
/// Process successful block verification
|
||||
fn on_block_verification_success(&mut self, block: Block) {
|
||||
{
|
||||
let hash = block.hash();
|
||||
let mut chain = self.chain.write();
|
||||
|
||||
// remove from verifying queue
|
||||
assert_eq!(chain.remove_block_with_state(&hash, BlockState::Verifying), HashPosition::Front);
|
||||
|
||||
// insert to storage
|
||||
chain.insert_best_block(block)
|
||||
.expect("Error inserting to db.");
|
||||
}
|
||||
|
||||
// continue with synchronization
|
||||
self.execute_synchronization_tasks();
|
||||
}
|
||||
|
||||
/// Process failed block verification
|
||||
fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256) {
|
||||
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err);
|
||||
|
||||
// reset synchronization process
|
||||
self.reset(true);
|
||||
|
||||
// start new tasks
|
||||
self.execute_synchronization_tasks();
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> SynchronizationClient<T> where T: TaskExecutor {
|
||||
/// Create new synchronization window
|
||||
pub fn new(config: Config, executor: Arc<Mutex<T>>, chain: ChainRef) -> SynchronizationRef<T> {
|
||||
let sync = SynchronizationRef::new(Mutex::new(
|
||||
Synchronization {
|
||||
pub fn new(config: Config, executor: Arc<Mutex<T>>, chain: ChainRef) -> Arc<Mutex<Self>> {
|
||||
let sync = Arc::new(Mutex::new(
|
||||
SynchronizationClient {
|
||||
state: State::Saturated,
|
||||
peers: Peers::new(),
|
||||
executor: executor,
|
||||
|
@ -185,7 +259,7 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
|
|||
}
|
||||
));
|
||||
|
||||
if !config.skip_block_verification {
|
||||
if !config.skip_verification {
|
||||
let (verification_work_sender, verification_work_receiver) = channel();
|
||||
let csync = sync.clone();
|
||||
let mut lsync = sync.lock();
|
||||
|
@ -194,7 +268,7 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
|
|||
lsync.verification_worker_thread = Some(thread::Builder::new()
|
||||
.name("Sync verification thread".to_string())
|
||||
.spawn(move || {
|
||||
Synchronization::verification_worker_proc(csync, storage, verification_work_receiver)
|
||||
SynchronizationClient::verification_worker_proc(csync, storage, verification_work_receiver)
|
||||
})
|
||||
.expect("Error creating verification thread"));
|
||||
}
|
||||
|
@ -213,50 +287,6 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
|
|||
}
|
||||
}
|
||||
|
||||
/// Try to queue synchronization of unknown blocks when new inventory is received.
|
||||
pub fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec<H256>) {
|
||||
self.process_new_blocks_inventory(peer_index, peer_hashes);
|
||||
self.execute_synchronization_tasks();
|
||||
}
|
||||
|
||||
/// Process new block.
|
||||
pub fn on_peer_block(&mut self, peer_index: usize, block: Block) {
|
||||
let block_hash = block.hash();
|
||||
|
||||
// update peers to select next tasks
|
||||
self.peers.on_block_received(peer_index, &block_hash);
|
||||
|
||||
self.process_peer_block(block_hash, block);
|
||||
self.execute_synchronization_tasks();
|
||||
}
|
||||
|
||||
/// Peer disconnected.
|
||||
pub fn on_peer_disconnected(&mut self, peer_index: usize) {
|
||||
self.peers.on_peer_disconnected(peer_index);
|
||||
|
||||
// when last peer is disconnected, reset, but let verifying blocks be verified
|
||||
self.reset(false);
|
||||
}
|
||||
|
||||
/// Reset synchronization process
|
||||
pub fn reset(&mut self, is_hard: bool) {
|
||||
self.peers.reset();
|
||||
self.orphaned_blocks.clear();
|
||||
// TODO: reset verification queue
|
||||
|
||||
let mut chain = self.chain.write();
|
||||
chain.remove_blocks_with_state(BlockState::Requested);
|
||||
chain.remove_blocks_with_state(BlockState::Scheduled);
|
||||
if is_hard {
|
||||
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
|
||||
chain.remove_blocks_with_state(BlockState::Verifying);
|
||||
warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block());
|
||||
}
|
||||
else {
|
||||
self.state = State::Saturated;
|
||||
}
|
||||
}
|
||||
|
||||
/// Process new blocks inventory
|
||||
fn process_new_blocks_inventory(&mut self, peer_index: usize, mut peer_hashes: Vec<H256>) {
|
||||
// | requested | QUEUED |
|
||||
|
@ -273,54 +303,62 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
|
|||
|
||||
let mut chain = self.chain.write();
|
||||
|
||||
// new block is scheduled => move to synchronizing state
|
||||
if !self.state.is_synchronizing() {
|
||||
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
|
||||
}
|
||||
|
||||
// when synchronization is idling
|
||||
// => request full inventory
|
||||
if !chain.has_blocks_of_state(BlockState::Scheduled)
|
||||
&& !chain.has_blocks_of_state(BlockState::Requested) {
|
||||
let unknown_blocks = peer_hashes.into_iter()
|
||||
.filter(|hash| chain.block_has_state(&hash, BlockState::Unknown))
|
||||
.collect();
|
||||
chain.schedule_blocks_hashes(unknown_blocks);
|
||||
self.peers.insert(peer_index);
|
||||
return;
|
||||
}
|
||||
|
||||
// cases: [2], [5], [6], [8]
|
||||
// if last block from peer_hashes is in window { requested_hashes + queued_hashes }
|
||||
// => no new blocks for synchronization, but we will use this peer in synchronization
|
||||
let peer_hashes_len = peer_hashes.len();
|
||||
if chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Scheduled)
|
||||
|| chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Requested) {
|
||||
self.peers.insert(peer_index);
|
||||
return;
|
||||
}
|
||||
|
||||
// cases: [1], [3], [4], [7], [9], [10]
|
||||
// try to find new blocks for synchronization from inventory
|
||||
let mut last_known_peer_hash_index = peer_hashes_len - 1;
|
||||
loop {
|
||||
if chain.block_state(&peer_hashes[last_known_peer_hash_index]) != BlockState::Unknown {
|
||||
// we have found first block which is known to us
|
||||
// => blocks in range [(last_known_peer_hash_index + 1)..peer_hashes_len] are unknown
|
||||
// && must be scheduled for request
|
||||
let unknown_peer_hashes = peer_hashes.split_off(last_known_peer_hash_index + 1);
|
||||
// when synchronization is idling
|
||||
// => request full inventory
|
||||
if !chain.has_blocks_of_state(BlockState::Scheduled)
|
||||
&& !chain.has_blocks_of_state(BlockState::Requested) {
|
||||
let unknown_blocks: Vec<_> = peer_hashes.into_iter()
|
||||
.filter(|hash| chain.block_has_state(&hash, BlockState::Unknown))
|
||||
.collect();
|
||||
|
||||
chain.schedule_blocks_hashes(unknown_peer_hashes);
|
||||
// no new blocks => no need to switch to the synchronizing state
|
||||
if unknown_blocks.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
chain.schedule_blocks_hashes(unknown_blocks);
|
||||
self.peers.insert(peer_index);
|
||||
break;
|
||||
}
|
||||
|
||||
// cases: [2], [5], [6], [8]
|
||||
// if last block from peer_hashes is in window { requested_hashes + queued_hashes }
|
||||
// => no new blocks for synchronization, but we will use this peer in synchronization
|
||||
let peer_hashes_len = peer_hashes.len();
|
||||
if chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Scheduled)
|
||||
|| chain.block_has_state(&peer_hashes[peer_hashes_len - 1], BlockState::Requested) {
|
||||
self.peers.insert(peer_index);
|
||||
return;
|
||||
}
|
||||
|
||||
if last_known_peer_hash_index == 0 {
|
||||
// either these are blocks from the future or blocks from the past
|
||||
// => TODO: ignore this peer during synchronization
|
||||
return;
|
||||
// cases: [1], [3], [4], [7], [9], [10]
|
||||
// try to find new blocks for synchronization from inventory
|
||||
let mut last_known_peer_hash_index = peer_hashes_len - 1;
|
||||
loop {
|
||||
if chain.block_state(&peer_hashes[last_known_peer_hash_index]) != BlockState::Unknown {
|
||||
// we have found first block which is known to us
|
||||
// => blocks in range [(last_known_peer_hash_index + 1)..peer_hashes_len] are unknown
|
||||
// && must be scheduled for request
|
||||
let unknown_peer_hashes = peer_hashes.split_off(last_known_peer_hash_index + 1);
|
||||
|
||||
chain.schedule_blocks_hashes(unknown_peer_hashes);
|
||||
self.peers.insert(peer_index);
|
||||
break;
|
||||
}
|
||||
|
||||
if last_known_peer_hash_index == 0 {
|
||||
// either these are blocks from the future or blocks from the past
|
||||
// => TODO: ignore this peer during synchronization
|
||||
return;
|
||||
}
|
||||
last_known_peer_hash_index -= 1;
|
||||
}
|
||||
last_known_peer_hash_index -= 1;
|
||||
}
|
||||
|
||||
// move to synchronizing state
|
||||
if !self.state.is_synchronizing() {
|
||||
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -374,9 +412,9 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
|
|||
|
||||
// proceed to the next orphaned block
|
||||
if let Entry::Occupied(orphaned_block_entry) = self.orphaned_blocks.entry(current_block_hash) {
|
||||
let (orphaned_parent_hash, orphaned_block) = orphaned_block_entry.remove_entry();
|
||||
current_block_hash = orphaned_parent_hash;
|
||||
let (_, orphaned_block) = orphaned_block_entry.remove_entry();
|
||||
current_block = orphaned_block;
|
||||
current_block_hash = current_block.hash();
|
||||
}
|
||||
else {
|
||||
break;
|
||||
|
@ -453,7 +491,7 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
|
|||
}
|
||||
|
||||
/// Thread procedure for handling verification tasks
|
||||
fn verification_worker_proc(sync: SynchronizationRef<T>, storage: Arc<db::Store>, work_receiver: Receiver<VerificationTask>) {
|
||||
fn verification_worker_proc(sync: Arc<Mutex<Self>>, storage: Arc<db::Store>, work_receiver: Receiver<VerificationTask>) {
|
||||
let verifier = ChainVerifier::new(storage);
|
||||
while let Ok(task) = work_receiver.recv() {
|
||||
match task {
|
||||
|
@ -471,35 +509,6 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Process successful block verification
|
||||
fn on_block_verification_success(&mut self, block: Block) {
|
||||
{
|
||||
let hash = block.hash();
|
||||
let mut chain = self.chain.write();
|
||||
|
||||
// remove from verifying queue
|
||||
assert_eq!(chain.remove_block_with_state(&hash, BlockState::Verifying), HashPosition::Front);
|
||||
|
||||
// insert to storage
|
||||
chain.insert_best_block(block)
|
||||
.expect("Error inserting to db.");
|
||||
}
|
||||
|
||||
// continue with synchronization
|
||||
self.execute_synchronization_tasks();
|
||||
}
|
||||
|
||||
/// Process failed block verification
|
||||
fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256) {
|
||||
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err);
|
||||
|
||||
// reset synchronization process
|
||||
self.reset(true);
|
||||
|
||||
// start new tasks
|
||||
self.execute_synchronization_tasks();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -508,9 +517,11 @@ pub mod tests {
|
|||
use std::mem::replace;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use chain::{Block, RepresentH256};
|
||||
use super::{Synchronization, SynchronizationRef, Config, Task, TaskExecutor};
|
||||
use super::{Client, Config, SynchronizationClient};
|
||||
use synchronization_executor::{Task, TaskExecutor};
|
||||
use local_node::PeersConnections;
|
||||
use synchronization_chain::{Chain, ChainRef};
|
||||
use test_data;
|
||||
use p2p::OutboundSyncConnectionRef;
|
||||
use db;
|
||||
|
||||
|
@ -536,13 +547,12 @@ pub mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
fn create_sync() -> (Arc<Mutex<DummyTaskExecutor>>, SynchronizationRef<DummyTaskExecutor>) {
|
||||
fn create_sync() -> (Arc<Mutex<DummyTaskExecutor>>, Arc<Mutex<SynchronizationClient<DummyTaskExecutor>>>) {
|
||||
let storage = Arc::new(db::TestStorage::with_genesis_block());
|
||||
let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone())));
|
||||
let executor = Arc::new(Mutex::new(DummyTaskExecutor::default()));
|
||||
(executor.clone(), Synchronization::new(Config {
|
||||
skip_block_verification: true,
|
||||
}, executor, chain))
|
||||
let config = Config { skip_verification: true };
|
||||
(executor.clone(), SynchronizationClient::new(config, executor, chain))
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -658,7 +668,6 @@ pub mod tests {
|
|||
&& sync.information().orphaned == 1);
|
||||
// receive block from peer#1
|
||||
sync.on_peer_block(1, block1);
|
||||
println!("=== {:?}", sync.information().chain);
|
||||
assert!(sync.information().chain.requested == 0
|
||||
&& sync.information().orphaned == 0
|
||||
&& sync.information().chain.stored == 3);
|
||||
|
@ -683,4 +692,17 @@ pub mod tests {
|
|||
assert!(!sync.information().state.is_synchronizing());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn synchronization_not_starting_when_receiving_known_blocks() {
|
||||
let (executor, sync) = create_sync();
|
||||
let mut sync = sync.lock();
|
||||
// saturated => receive inventory with known blocks only
|
||||
sync.on_new_blocks_inventory(1, vec![test_data::genesis().hash()]);
|
||||
// => no need to start synchronization
|
||||
assert!(!sync.information().state.is_synchronizing());
|
||||
// => no synchronization tasks are scheduled
|
||||
let tasks = executor.lock().take_tasks();
|
||||
assert_eq!(tasks, vec![]);
|
||||
}
|
||||
}
|
|
@ -1,16 +1,38 @@
|
|||
use std::sync::Arc;
|
||||
use std::collections::HashMap;
|
||||
use parking_lot::Mutex;
|
||||
use chain::{Block, RepresentH256};
|
||||
use message::common::{InventoryVector, InventoryType};
|
||||
use message::types;
|
||||
use primitives::hash::H256;
|
||||
use p2p::OutboundSyncConnectionRef;
|
||||
use synchronization_chain::ChainRef;
|
||||
use synchronization::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor};
|
||||
use local_node::PeersConnections;
|
||||
|
||||
pub type LocalSynchronizationTaskExecutorRef = Arc<Mutex<LocalSynchronizationTaskExecutor>>;
|
||||
|
||||
/// Synchronization task executor
|
||||
pub trait TaskExecutor : Send + 'static {
|
||||
fn execute(&mut self, task: Task);
|
||||
}
|
||||
|
||||
/// Synchronization task for the peer.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Task {
|
||||
/// Request given blocks.
|
||||
RequestBlocks(usize, Vec<H256>),
|
||||
/// Request full inventory using block_locator_hashes.
|
||||
RequestInventory(usize),
|
||||
/// Request inventory using best block locator only.
|
||||
RequestBestInventory(usize),
|
||||
/// Send block.
|
||||
SendBlock(usize, Block),
|
||||
/// Send notfound
|
||||
SendNotFound(usize, Vec<InventoryVector>),
|
||||
/// Send inventory
|
||||
SendInventory(usize, Vec<InventoryVector>),
|
||||
}
|
||||
|
||||
/// Synchronization tasks executor
|
||||
pub struct LocalSynchronizationTaskExecutor {
|
||||
/// Active synchronization peers
|
||||
|
@ -38,12 +60,12 @@ impl PeersConnections for LocalSynchronizationTaskExecutor {
|
|||
}
|
||||
}
|
||||
|
||||
impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor {
|
||||
fn execute(&mut self, task: SynchronizationTask) {
|
||||
impl TaskExecutor for LocalSynchronizationTaskExecutor {
|
||||
fn execute(&mut self, task: Task) {
|
||||
// TODO: what is types::GetBlocks::version here? (@ PR#37)
|
||||
|
||||
match task {
|
||||
SynchronizationTask::RequestBlocks(peer_index, blocks_hashes) => {
|
||||
Task::RequestBlocks(peer_index, blocks_hashes) => {
|
||||
let getdata = types::GetData {
|
||||
inventory: blocks_hashes.into_iter()
|
||||
.map(|hash| InventoryVector {
|
||||
|
@ -58,7 +80,7 @@ impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
connection.send_getdata(&getdata);
|
||||
}
|
||||
}
|
||||
SynchronizationTask::RequestInventory(peer_index) => {
|
||||
Task::RequestInventory(peer_index) => {
|
||||
let block_locator_hashes = self.chain.read().block_locator_hashes();
|
||||
let getblocks = types::GetBlocks {
|
||||
version: 0,
|
||||
|
@ -72,7 +94,7 @@ impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
connection.send_getblocks(&getblocks);
|
||||
}
|
||||
},
|
||||
SynchronizationTask::RequestBestInventory(peer_index) => {
|
||||
Task::RequestBestInventory(peer_index) => {
|
||||
let block_locator_hashes = self.chain.read().best_block_locator_hashes();
|
||||
let getblocks = types::GetBlocks {
|
||||
version: 0,
|
||||
|
@ -86,6 +108,39 @@ impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
connection.send_getblocks(&getblocks);
|
||||
}
|
||||
},
|
||||
Task::SendBlock(peer_index, block) => {
|
||||
let block_message = types::Block {
|
||||
block: block,
|
||||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
let connection = &mut *connection;
|
||||
trace!(target: "sync", "Sending block {:?} to peer#{}", block_message.block.hash(), peer_index);
|
||||
connection.send_block(&block_message);
|
||||
}
|
||||
},
|
||||
Task::SendNotFound(peer_index, unknown_inventory) => {
|
||||
let notfound = types::NotFound {
|
||||
inventory: unknown_inventory,
|
||||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
let connection = &mut *connection;
|
||||
trace!(target: "sync", "Sending notfound to peer#{} with {} items", peer_index, notfound.inventory.len());
|
||||
connection.send_notfound(¬found);
|
||||
}
|
||||
},
|
||||
Task::SendInventory(peer_index, inventory) => {
|
||||
let inventory = types::Inv {
|
||||
inventory: inventory,
|
||||
};
|
||||
|
||||
if let Some(connection) = self.peers.get_mut(&peer_index) {
|
||||
let connection = &mut *connection;
|
||||
trace!(target: "sync", "Sending inventory to peer#{} with {} items", peer_index, inventory.inventory.len());
|
||||
connection.send_inventory(&inventory);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,284 @@
|
|||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::thread;
|
||||
use std::collections::{VecDeque, HashMap};
|
||||
use std::collections::hash_map::Entry;
|
||||
use parking_lot::{Mutex, Condvar};
|
||||
use message::common::{InventoryVector, InventoryType};
|
||||
use db;
|
||||
use primitives::hash::H256;
|
||||
use synchronization_chain::ChainRef;
|
||||
use synchronization_executor::{Task, TaskExecutor};
|
||||
use message::types;
|
||||
|
||||
/// Synchronization requests server trait
|
||||
pub trait Server : Send + 'static {
|
||||
fn serve_getdata(&mut self, peer_index: usize, message: types::GetData);
|
||||
fn serve_getblocks(&mut self, peer_index: usize, message: types::GetBlocks);
|
||||
}
|
||||
|
||||
/// Synchronization requests server
|
||||
pub struct SynchronizationServer {
|
||||
chain: ChainRef,
|
||||
queue_ready: Arc<Condvar>,
|
||||
queue: Arc<Mutex<ServerQueue>>,
|
||||
worker_thread: Option<thread::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
struct ServerQueue {
|
||||
is_stopping: AtomicBool,
|
||||
queue_ready: Arc<Condvar>,
|
||||
peers_queue: VecDeque<usize>,
|
||||
tasks_queue: HashMap<usize, VecDeque<ServerTask>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ServerTask {
|
||||
ServeGetData(Vec<InventoryVector>),
|
||||
ServeGetBlocks(db::BestBlock, H256),
|
||||
ReturnNotFound(Vec<InventoryVector>),
|
||||
ReturnBlock(H256),
|
||||
}
|
||||
|
||||
impl SynchronizationServer {
|
||||
pub fn new<T: TaskExecutor>(chain: ChainRef, executor: Arc<Mutex<T>>) -> Self {
|
||||
let queue_ready = Arc::new(Condvar::new());
|
||||
let queue = Arc::new(Mutex::new(ServerQueue::new(queue_ready.clone())));
|
||||
let mut server = SynchronizationServer {
|
||||
chain: chain.clone(),
|
||||
queue_ready: queue_ready.clone(),
|
||||
queue: queue.clone(),
|
||||
worker_thread: None,
|
||||
};
|
||||
server.worker_thread = Some(thread::spawn(move || {
|
||||
SynchronizationServer::server_worker(queue_ready, queue, chain, executor);
|
||||
}));
|
||||
server
|
||||
}
|
||||
|
||||
fn locate_known_block(&self, block_locator_hashes: Vec<H256>) -> Option<db::BestBlock> {
|
||||
let chain = self.chain.read();
|
||||
block_locator_hashes.into_iter()
|
||||
.filter_map(|hash| chain
|
||||
.storage_block_number(&hash)
|
||||
.map(|number| db::BestBlock {
|
||||
number: number,
|
||||
hash: hash,
|
||||
}))
|
||||
.nth(0)
|
||||
}
|
||||
|
||||
fn server_worker<T: TaskExecutor>(queue_ready: Arc<Condvar>, queue: Arc<Mutex<ServerQueue>>, chain: ChainRef, executor: Arc<Mutex<T>>) {
|
||||
loop {
|
||||
let server_task = {
|
||||
let mut queue = queue.lock();
|
||||
if queue.is_stopping.load(Ordering::SeqCst) {
|
||||
break
|
||||
}
|
||||
queue.next_task()
|
||||
.map_or_else(|| {
|
||||
queue_ready.wait(&mut queue);
|
||||
queue.next_task()
|
||||
}, |next_task| Some(next_task))
|
||||
};
|
||||
|
||||
match server_task {
|
||||
// has new task
|
||||
Some(server_task) => match server_task {
|
||||
// `getdata` => `notfound` + `block` + ...
|
||||
(peer_index, ServerTask::ServeGetData(inventory)) => {
|
||||
let mut unknown_items: Vec<InventoryVector> = Vec::new();
|
||||
let mut new_tasks: Vec<ServerTask> = Vec::new();
|
||||
for item in inventory {
|
||||
match item.inv_type {
|
||||
InventoryType::MessageBlock => {
|
||||
match chain.read().storage_block_number(&item.hash) {
|
||||
Some(_) => new_tasks.push(ServerTask::ReturnBlock(item.hash.clone())),
|
||||
None => unknown_items.push(item),
|
||||
}
|
||||
},
|
||||
_ => (), // TODO: process other inventory types
|
||||
}
|
||||
}
|
||||
// respond with `notfound` message for unknown data
|
||||
if !unknown_items.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with notfound with {} items to peer#{}", unknown_items.len(), peer_index);
|
||||
new_tasks.push(ServerTask::ReturnNotFound(unknown_items));
|
||||
}
|
||||
// schedule data responses
|
||||
if !new_tasks.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with data with {} items to peer#{}", new_tasks.len(), peer_index);
|
||||
queue.lock().add_tasks(peer_index, new_tasks);
|
||||
}
|
||||
},
|
||||
// `inventory`
|
||||
(peer_index, ServerTask::ServeGetBlocks(best_block, hash_stop)) => {
|
||||
let storage_block_hash = chain.read().storage_block_hash(best_block.number);
|
||||
if let Some(hash) = storage_block_hash {
|
||||
// check that chain has not reorganized since task was queued
|
||||
if hash == best_block.hash {
|
||||
let mut inventory: Vec<InventoryVector> = Vec::new();
|
||||
let first_block_number = best_block.number + 1;
|
||||
let last_block_number = best_block.number + 500;
|
||||
// 500 hashes after best_block.number OR hash_stop OR blockchain end
|
||||
for block_number in first_block_number..last_block_number {
|
||||
match chain.read().storage_block_hash(block_number) {
|
||||
Some(ref block_hash) if block_hash == &hash_stop => break,
|
||||
None => break,
|
||||
Some(block_hash) => {
|
||||
inventory.push(InventoryVector {
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: block_hash,
|
||||
});
|
||||
},
|
||||
}
|
||||
}
|
||||
if !inventory.is_empty() {
|
||||
trace!(target: "sync", "Going to respond with inventory with {} items to peer#{}", inventory.len(), peer_index);
|
||||
executor.lock().execute(Task::SendInventory(peer_index, inventory));
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
// `notfound`
|
||||
(peer_index, ServerTask::ReturnNotFound(inventory)) => {
|
||||
executor.lock().execute(Task::SendNotFound(peer_index, inventory));
|
||||
},
|
||||
// `block`
|
||||
(peer_index, ServerTask::ReturnBlock(block_hash)) => {
|
||||
let storage_block = chain.read().storage_block(&block_hash);
|
||||
if let Some(storage_block) = storage_block {
|
||||
executor.lock().execute(Task::SendBlock(peer_index, storage_block));
|
||||
}
|
||||
},
|
||||
},
|
||||
// no tasks after wake-up => stopping
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SynchronizationServer {
|
||||
fn drop(&mut self) {
|
||||
if let Some(join_handle) = self.worker_thread.take() {
|
||||
self.queue.lock().is_stopping.store(true, Ordering::SeqCst);
|
||||
self.queue_ready.notify_one();
|
||||
join_handle.join().expect("Clean shutdown.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Server for SynchronizationServer {
|
||||
fn serve_getdata(&mut self, peer_index: usize, message: types::GetData) {
|
||||
self.queue.lock().add_task(peer_index, ServerTask::ServeGetData(message.inventory));
|
||||
}
|
||||
|
||||
fn serve_getblocks(&mut self, peer_index: usize, message: types::GetBlocks) {
|
||||
if let Some(best_common_block) = self.locate_known_block(message.block_locator_hashes) {
|
||||
trace!(target: "sync", "Best common block with peer#{} is block#{}: {:?}", peer_index, best_common_block.number, best_common_block.hash);
|
||||
self.queue.lock().add_task(peer_index, ServerTask::ServeGetBlocks(best_common_block, message.hash_stop));
|
||||
}
|
||||
else {
|
||||
trace!(target: "sync", "No common blocks with peer#{}", peer_index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ServerQueue {
|
||||
pub fn new(queue_ready: Arc<Condvar>) -> Self {
|
||||
ServerQueue {
|
||||
is_stopping: AtomicBool::new(false),
|
||||
queue_ready: queue_ready,
|
||||
peers_queue: VecDeque::new(),
|
||||
tasks_queue: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn next_task(&mut self) -> Option<(usize, ServerTask)> {
|
||||
self.peers_queue.pop_front()
|
||||
.map(|peer| {
|
||||
let (peer_task, no_tasks_left) = {
|
||||
let peer_tasks = self.tasks_queue.get_mut(&peer).expect("for each peer there is non-empty tasks queue");
|
||||
let peer_task = peer_tasks.pop_front().expect("for each peer there is non-empty tasks queue");
|
||||
(peer_task, peer_tasks.is_empty())
|
||||
};
|
||||
|
||||
// remove if no tasks left || schedule otherwise
|
||||
if no_tasks_left {
|
||||
self.tasks_queue.remove(&peer);
|
||||
}
|
||||
else {
|
||||
self.peers_queue.push_back(peer);
|
||||
}
|
||||
(peer, peer_task)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn add_task(&mut self, peer_index: usize, task: ServerTask) {
|
||||
match self.tasks_queue.entry(peer_index) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().push_back(task);
|
||||
},
|
||||
Entry::Vacant(entry) => {
|
||||
let mut new_tasks = VecDeque::new();
|
||||
new_tasks.push_back(task);
|
||||
entry.insert(new_tasks);
|
||||
self.peers_queue.push_back(peer_index);
|
||||
}
|
||||
}
|
||||
self.queue_ready.notify_one();
|
||||
}
|
||||
|
||||
pub fn add_tasks(&mut self, peer_index: usize, tasks: Vec<ServerTask>) {
|
||||
match self.tasks_queue.entry(peer_index) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().extend(tasks);
|
||||
},
|
||||
Entry::Vacant(entry) => {
|
||||
let mut new_tasks = VecDeque::new();
|
||||
new_tasks.extend(tasks);
|
||||
entry.insert(new_tasks);
|
||||
self.peers_queue.push_back(peer_index);
|
||||
}
|
||||
}
|
||||
self.queue_ready.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use super::{Server, ServerTask};
|
||||
use message::types;
|
||||
use db;
|
||||
use std::mem::replace;
|
||||
|
||||
pub struct DummyServer {
|
||||
tasks: Vec<(usize, ServerTask)>,
|
||||
}
|
||||
|
||||
impl DummyServer {
|
||||
pub fn new() -> Self {
|
||||
DummyServer {
|
||||
tasks: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn take_tasks(&mut self) -> Vec<(usize, ServerTask)> {
|
||||
replace(&mut self.tasks, Vec::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl Server for DummyServer {
|
||||
fn serve_getdata(&mut self, peer_index: usize, message: types::GetData) {
|
||||
self.tasks.push((peer_index, ServerTask::ServeGetData(message.inventory)));
|
||||
}
|
||||
|
||||
fn serve_getblocks(&mut self, peer_index: usize, message: types::GetBlocks) {
|
||||
self.tasks.push((peer_index, ServerTask::ServeGetBlocks(db::BestBlock {
|
||||
number: 0,
|
||||
hash: message.block_locator_hashes[0].clone(),
|
||||
}, message.hash_stop)));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue