Merge pull request #87 from ethcore/sync_manager
Initial sync manager version
commit efa100a253
@@ -621,6 +621,9 @@ version = "0.1.0"
 dependencies = [
  "chain 0.1.0",
  "db 0.1.0",
+ "futures 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
  "message 0.1.0",
  "miner 0.1.0",
@@ -629,6 +632,7 @@ dependencies = [
  "primitives 0.1.0",
  "test-data 0.1.0",
  "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-core 0.1.0 (git+https://github.com/debris/tokio-core)",
  "verification 0.1.0",
 ]
@@ -28,7 +28,8 @@ pub fn start(cfg: config::Config) -> Result<(), String> {
 	let db = open_db(cfg.use_disk_database);
 	init_db(&db);
 
-	let sync_connection_factory = create_sync_connection_factory(db);
+	let sync_handle = el.handle();
+	let sync_connection_factory = create_sync_connection_factory(&sync_handle, db);
 
 	let p2p = try!(p2p::P2P::new(p2p_cfg, sync_connection_factory, el.handle()).map_err(|x| x.to_string()));
 	try!(p2p.run().map_err(|_| "Failed to start p2p module"));
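The two added lines above bind the reactor handle before building the factory, because `create_sync_connection_factory` now borrows a `&Handle`. A minimal standalone sketch of that borrow, assuming only the public tokio-core 0.1 `Core`/`Handle` API (the `db` argument and the factory itself are omitted here):

```rust
// Sketch only, not the project's actual start-up code.
extern crate tokio_core;

use tokio_core::reactor::Core;

fn main() {
	// `el` mirrors the event-loop variable used in the hunk above.
	let el = Core::new().expect("Failed to create event loop");
	let sync_handle = el.handle();
	// create_sync_connection_factory(&sync_handle, db) would be called here;
	// the handle is cheap to obtain but only meaningful while `el` is alive.
	let _ = sync_handle;
}
```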
@@ -43,5 +43,3 @@ fn run() -> Result<(), String> {
 		_ => commands::start(cfg),
 	}
 }
-
-
@@ -7,6 +7,10 @@ authors = ["Ethcore <admin@ethcore.io>"]
 parking_lot = "0.3"
 log = "0.3"
 time = "0.1"
+futures = "0.1"
+futures-cpupool = "0.1"
+tokio-core = { git = "https://github.com/debris/tokio-core" }
+linked-hash-map = "0.3"
 
 chain = { path = "../chain" }
 db = { path = "../db" }
@@ -2,9 +2,13 @@ extern crate chain;
 extern crate db;
 #[macro_use]
 extern crate log;
+extern crate futures;
+extern crate futures_cpupool;
+extern crate tokio_core;
 extern crate message;
 extern crate p2p;
 extern crate parking_lot;
+extern crate linked_hash_map;
 extern crate primitives;
 extern crate test_data;
 extern crate time;
@@ -19,11 +23,13 @@ mod local_node;
 mod synchronization_chain;
 mod synchronization_client;
 mod synchronization_executor;
+mod synchronization_manager;
 mod synchronization_peers;
 mod synchronization_server;
 
 use std::sync::Arc;
 use parking_lot::{Mutex, RwLock};
+use tokio_core::reactor::Handle;
 
 /// Sync errors.
 #[derive(Debug)]
@@ -42,7 +48,7 @@ pub fn create_sync_blocks_writer(db: Arc<db::Store>) -> blocks_writer::BlocksWri
 }
 
 /// Create inbound synchronization connections factory for given `db`.
-pub fn create_sync_connection_factory(db: Arc<db::Store>) -> p2p::LocalSyncNodeRef {
+pub fn create_sync_connection_factory(handle: &Handle, db: Arc<db::Store>) -> p2p::LocalSyncNodeRef {
 	use synchronization_chain::Chain as SyncChain;
 	use synchronization_executor::LocalSynchronizationTaskExecutor as SyncExecutor;
 	use local_node::LocalNode as SyncNode;
@@ -53,7 +59,7 @@ pub fn create_sync_connection_factory(db: Arc<db::Store>) -> p2p::LocalSyncNodeR
 	let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
 	let sync_executor = SyncExecutor::new(sync_chain.clone());
 	let sync_server = Arc::new(Mutex::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone())));
-	let sync_client = SynchronizationClient::new(SynchronizationConfig::default(), sync_executor.clone(), sync_chain);
+	let sync_client = SynchronizationClient::new(SynchronizationConfig::default(), handle, sync_executor.clone(), sync_chain);
 	let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
 	SyncConnectionFactory::with_local_node(sync_node)
 }
@@ -188,7 +188,7 @@ mod tests {
 	use synchronization_executor::tests::DummyTaskExecutor;
 	use synchronization_client::{Config, SynchronizationClient};
 	use synchronization_chain::Chain;
-	use p2p::{OutboundSyncConnection, OutboundSyncConnectionRef};
+	use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
 	use message::types;
 	use message::common::{InventoryVector, InventoryType};
 	use db;
@@ -196,6 +196,7 @@
 	use test_data;
 	use synchronization_server::ServerTask;
 	use synchronization_server::tests::DummyServer;
+	use tokio_core::reactor::{Core, Handle};
 
 	struct DummyOutboundSyncConnection;
 
@@ -227,19 +228,21 @@
 		fn send_notfound(&self, _message: &types::NotFound) {}
 	}
 
-	fn create_local_node() -> (Arc<Mutex<DummyTaskExecutor>>, Arc<Mutex<DummyServer>>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor>>) {
+	fn create_local_node() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<Mutex<DummyServer>>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor>>) {
+		let event_loop = event_loop();
+		let handle = event_loop.handle();
 		let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
 		let executor = DummyTaskExecutor::new();
 		let server = Arc::new(Mutex::new(DummyServer::new()));
-		let config = Config { skip_verification: true };
-		let client = SynchronizationClient::new(config, executor.clone(), chain);
+		let config = Config { threads_num: 1, skip_verification: true };
+		let client = SynchronizationClient::new(config, &handle, executor.clone(), chain);
 		let local_node = LocalNode::new(server.clone(), client, executor.clone());
-		(executor, server, local_node)
+		(event_loop, handle, executor, server, local_node)
 	}
 
 	#[test]
 	fn local_node_request_inventory_on_sync_start() {
-		let (executor, _, local_node) = create_local_node();
+		let (_, _, executor, _, local_node) = create_local_node();
 		let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
 		// start sync session
 		local_node.start_sync_session(peer_index, 0);
@@ -250,7 +253,7 @@
 
 	#[test]
 	fn local_node_serves_block() {
-		let (_, server, local_node) = create_local_node();
+		let (_, _, _, server, local_node) = create_local_node();
 		let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new());
 		// peer requests genesis block
 		let genesis_block_hash = test_data::genesis().hash();
@@ -5,6 +5,10 @@ use std::collections::HashMap;
 use std::collections::hash_map::Entry;
 use std::sync::mpsc::{channel, Sender, Receiver};
 use parking_lot::Mutex;
+use futures::{BoxFuture, Future, finished};
+use futures::stream::Stream;
+use tokio_core::reactor::{Handle, Interval};
+use futures_cpupool::CpuPool;
 use db;
 use chain::{Block, RepresentH256};
 use primitives::hash::H256;
@@ -16,7 +20,9 @@ use synchronization_chain::{ChainRef, BlockState};
 use synchronization_chain::{Information as ChainInformation};
 use verification::{ChainVerifier, Error as VerificationError, Verify};
 use synchronization_executor::{Task, TaskExecutor};
+use synchronization_manager::{manage_synchronization_peers, MANAGEMENT_INTERVAL_MS};
 use time;
+use std::time::Duration;
 
 ///! Blocks synchronization process:
 ///!
@@ -70,13 +76,13 @@ use time;
 /// Approximate maximal number of blocks hashes in scheduled queue.
 const MAX_SCHEDULED_HASHES: u32 = 4 * 1024;
 /// Approximate maximal number of blocks hashes in requested queue.
-const MAX_REQUESTED_BLOCKS: u32 = 512;
+const MAX_REQUESTED_BLOCKS: u32 = 256;
 /// Approximate maximal number of blocks in verifying queue.
-const MAX_VERIFYING_BLOCKS: u32 = 512;
+const MAX_VERIFYING_BLOCKS: u32 = 256;
 /// Minimum number of blocks to request from peer
 const MIN_BLOCKS_IN_REQUEST: u32 = 32;
 /// Maximum number of blocks to request from peer
-const MAX_BLOCKS_IN_REQUEST: u32 = 512;
+const MAX_BLOCKS_IN_REQUEST: u32 = 128;
 
 /// Synchronization state
 #[derive(Debug, Clone, Copy)]
@@ -119,8 +125,9 @@ pub trait Client : Send + 'static {
 }
 
 /// Synchronization client configuration options.
-#[derive(Default)]
 pub struct Config {
+	/// Number of threads to allocate in synchronization CpuPool.
+	pub threads_num: usize,
 	/// Do not verify incoming blocks before inserting to db.
 	pub skip_verification: bool,
 }
@@ -129,6 +136,10 @@ pub struct Config {
 pub struct SynchronizationClient<T: TaskExecutor> {
 	/// Synchronization state.
 	state: State,
+	/// Cpu pool.
+	pool: CpuPool,
+	/// Sync management worker.
+	management_worker: Option<BoxFuture<(), ()>>,
 	/// Synchronization peers.
 	peers: Peers,
 	/// Task executor.
@@ -143,6 +154,15 @@ pub struct SynchronizationClient<T: TaskExecutor> {
 	verification_worker_thread: Option<thread::JoinHandle<()>>,
 }
 
+impl Default for Config {
+	fn default() -> Self {
+		Config {
+			threads_num: 4,
+			skip_verification: false,
+		}
+	}
+}
+
 impl State {
 	pub fn is_synchronizing(&self) -> bool {
 		match self {
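The hunk above replaces the derived `Default` with a manual impl so that `threads_num` defaults to 4 instead of the derived zero. A self-contained sketch of the pattern, with the `Config` type copied locally so it compiles on its own; the assertions are illustrative, not from the repository:

```rust
// Standalone sketch of the Config/Default pattern introduced in this diff.
pub struct Config {
	pub threads_num: usize,
	pub skip_verification: bool,
}

impl Default for Config {
	fn default() -> Self {
		// #[derive(Default)] would have produced threads_num: 0, which is not
		// a usable CpuPool size, hence the manual impl.
		Config { threads_num: 4, skip_verification: false }
	}
}

fn main() {
	let prod = Config::default();
	let test = Config { threads_num: 1, skip_verification: true };
	assert_eq!(prod.threads_num, 4);
	assert!(!prod.skip_verification);
	assert!(test.skip_verification);
}
```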
@@ -246,11 +266,13 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
 
 impl<T> SynchronizationClient<T> where T: TaskExecutor {
 	/// Create new synchronization window
-	pub fn new(config: Config, executor: Arc<Mutex<T>>, chain: ChainRef) -> Arc<Mutex<Self>> {
+	pub fn new(config: Config, handle: &Handle, executor: Arc<Mutex<T>>, chain: ChainRef) -> Arc<Mutex<Self>> {
 		let sync = Arc::new(Mutex::new(
 			SynchronizationClient {
 				state: State::Saturated,
 				peers: Peers::new(),
+				pool: CpuPool::new(config.threads_num),
+				management_worker: None,
 				executor: executor,
 				chain: chain.clone(),
 				orphaned_blocks: HashMap::new(),
@@ -273,6 +295,29 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
 				.expect("Error creating verification thread"));
 		}
 
+		// TODO: start management worker only when synchronization is started
+		// currently impossible because there is no way to call Interval::new with Remote && Handle is not-Send
+		{
+			let csync = Arc::downgrade(&sync);
+			let mut sync = sync.lock();
+			let management_worker = Interval::new(Duration::from_millis(MANAGEMENT_INTERVAL_MS), handle)
+				.expect("Failed to create interval")
+				.and_then(move |_| {
+					let client = match csync.upgrade() {
+						Some(client) => client,
+						None => return Ok(()),
+					};
+					let mut client = client.lock();
+					manage_synchronization_peers(&mut client.peers);
+					client.execute_synchronization_tasks();
+					Ok(())
+				})
+				.for_each(|_| Ok(()))
+				.then(|_| finished::<(), ()>(()))
+				.boxed();
+			sync.management_worker = Some(sync.pool.spawn(management_worker).boxed());
+		}
+
 		sync
 	}
 
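The worker above captures only a `Weak` reference (`Arc::downgrade(&sync)`), so the periodic timer cannot keep the client alive: once the last strong `Arc` is dropped, `upgrade()` fails and the future completes. A reduced, std-only sketch of the same liveness pattern (names are illustrative, not from the crate):

```rust
// Sketch of the Weak-reference liveness pattern used by the management worker.
use std::sync::{Arc, Mutex, Weak};

struct Client { ticks: u32 }

// One timer tick; returns false once the client has been dropped.
fn tick(weak: &Weak<Mutex<Client>>) -> bool {
	match weak.upgrade() {
		Some(client) => {
			// Stands in for manage_synchronization_peers + execute_synchronization_tasks.
			client.lock().unwrap().ticks += 1;
			true
		}
		None => false, // client gone; the real worker future resolves here
	}
}

fn main() {
	let sync = Arc::new(Mutex::new(Client { ticks: 0 }));
	let weak = Arc::downgrade(&sync);
	assert!(tick(&weak));
	drop(sync);
	assert!(!tick(&weak)); // after the last Arc is dropped, ticking stops
}
```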
@@ -303,7 +348,7 @@
 
 		let mut chain = self.chain.write();
 
-		loop {
+		'outer: loop {
 			// when synchronization is idling
 			// => request full inventory
 			if !chain.has_blocks_of_state(BlockState::Scheduled)
@@ -344,7 +389,7 @@
 
 				chain.schedule_blocks_hashes(unknown_peer_hashes);
 				self.peers.insert(peer_index);
-				break;
+				break 'outer;
 			}
 
 			if last_known_peer_hash_index == 0 {
@@ -444,7 +489,7 @@
 			let new_num_of_blocks = chain.best_block().number;
 			let blocks_diff = if new_num_of_blocks > num_of_blocks { new_num_of_blocks - num_of_blocks } else { 0 };
 			if timestamp_diff >= 60.0 || blocks_diff > 1000 {
-				self.state = State::Synchronizing(new_timestamp, new_num_of_blocks);
+				self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
 
 				info!(target: "sync", "Processed {} blocks in {} seconds. Chain information: {:?}"
 					, blocks_diff, timestamp_diff
@@ -515,25 +560,30 @@
 pub mod tests {
 	use std::sync::Arc;
 	use parking_lot::{Mutex, RwLock};
+	use tokio_core::reactor::{Core, Handle};
 	use chain::{Block, RepresentH256};
 	use super::{Client, Config, SynchronizationClient};
 	use synchronization_executor::Task;
 	use synchronization_chain::{Chain, ChainRef};
 	use synchronization_executor::tests::DummyTaskExecutor;
+	use p2p::event_loop;
 	use test_data;
 	use db;
 
-	fn create_sync() -> (Arc<Mutex<DummyTaskExecutor>>, Arc<Mutex<SynchronizationClient<DummyTaskExecutor>>>) {
+	fn create_sync() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<Mutex<SynchronizationClient<DummyTaskExecutor>>>) {
+		let event_loop = event_loop();
+		let handle = event_loop.handle();
 		let storage = Arc::new(db::TestStorage::with_genesis_block());
 		let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone())));
 		let executor = DummyTaskExecutor::new();
-		let config = Config { skip_verification: true };
-		(executor.clone(), SynchronizationClient::new(config, executor, chain))
+		let config = Config { threads_num: 1, skip_verification: true };
+		let client = SynchronizationClient::new(config, &handle, executor.clone(), chain);
+		(event_loop, handle, executor, client)
 	}
 
 	#[test]
 	fn synchronization_saturated_on_start() {
-		let (_, sync) = create_sync();
+		let (_, _, _, sync) = create_sync();
 		let sync = sync.lock();
 		let info = sync.information();
 		assert!(!info.state.is_synchronizing());
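Note that `create_sync` now returns the `Core` alongside the `Handle`: a handle is only a reference into the reactor, so the tests must keep the `Core` itself alive for anything registered on it (such as the management `Interval`) to remain valid. A minimal sketch of the pitfall being avoided, assuming only tokio-core 0.1's `Core::new`/`handle`:

```rust
// Sketch only: why test helpers return the Core and not just a Handle.
extern crate tokio_core;

use tokio_core::reactor::Core;

fn create_reactor() -> Core {
	// Returning only core.handle() from here would let the Core drop at the
	// end of this function, leaving callers with a handle to a dead reactor.
	Core::new().expect("Failed to create event loop")
}

fn main() {
	let core = create_reactor();
	let _handle = core.handle(); // valid for as long as `core` lives
}
```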
@@ -542,7 +592,7 @@
 
 	#[test]
 	fn synchronization_in_order_block_path() {
-		let (executor, sync) = create_sync();
+		let (_, _, executor, sync) = create_sync();
 
 		let mut sync = sync.lock();
 		let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into();
@@ -585,7 +635,7 @@
 
 	#[test]
 	fn synchronization_out_of_order_block_path() {
-		let (_, sync) = create_sync();
+		let (_, _, _, sync) = create_sync();
 		let mut sync = sync.lock();
 
 		let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into();
@@ -607,7 +657,7 @@
 
 	#[test]
 	fn synchronization_parallel_peers() {
-		let (executor, sync) = create_sync();
+		let (_, _, executor, sync) = create_sync();
 
 		let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into();
 		let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into();
@@ -652,7 +702,7 @@
 
 	#[test]
 	fn synchronization_reset_when_peer_is_disconnected() {
-		let (_, sync) = create_sync();
+		let (_, _, _, sync) = create_sync();
 
 		// request new blocks
 		{
@@ -671,7 +721,7 @@
 
 	#[test]
 	fn synchronization_not_starting_when_receiving_known_blocks() {
-		let (executor, sync) = create_sync();
+		let (_, _, executor, sync) = create_sync();
 		let mut sync = sync.lock();
 		// saturated => receive inventory with known blocks only
 		sync.on_new_blocks_inventory(1, vec![test_data::genesis().hash()]);
@@ -0,0 +1,26 @@
+use time::precise_time_s;
+use synchronization_peers::Peers;
+
+/// Management interval (in ms)
+pub const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000;
+/// Response time to decrease peer score
+const FAILURE_INTERVAL_S: f64 = 5f64;
+
+/// Management worker
+pub fn manage_synchronization_peers(peers: &mut Peers) {
+	// reset tasks for peers which have not responded during the given period
+	for (worst_peer_index, worst_peer_time) in peers.worst_peers() {
+		// check if the peer has not responded within the given time
+		let time_diff = precise_time_s() - worst_peer_time;
+		if time_diff <= FAILURE_INTERVAL_S {
+			break;
+		}
+
+		// decrease score && move to the idle queue
+		trace!(target: "sync", "Failed to get response from peer#{} in {} seconds", worst_peer_index, time_diff);
+		peers.reset_tasks(worst_peer_index);
+		if peers.on_peer_failure(worst_peer_index) {
+			trace!(target: "sync", "Too many failures for peer#{}. Excluding from synchronization", worst_peer_index);
+		}
+	}
+}
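Because `Peers::worst_peers()` yields peers in request order (oldest first), the manager can stop scanning at the first peer whose request is still within `FAILURE_INTERVAL_S`. A standalone sketch of that early-exit scan, using a hypothetical `overdue` helper that is not part of the crate:

```rust
// Sketch of the manager's early-exit scan over oldest-first request times.
const FAILURE_INTERVAL_S: f64 = 5.0;

fn overdue(now: f64, request_times: &[(usize, f64)]) -> Vec<usize> {
	let mut result = Vec::new();
	for &(peer, requested_at) in request_times {
		if now - requested_at <= FAILURE_INTERVAL_S {
			break; // every later entry is even more recent, so stop here
		}
		result.push(peer);
	}
	result
}

fn main() {
	// peer 1 asked 12s ago, peer 2 asked 3s ago: only peer 1 is overdue
	let times = vec![(1, 100.0), (2, 109.0)];
	assert_eq!(overdue(112.0, &times), vec![1]);
}
```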
@@ -1,16 +1,23 @@
 use std::collections::{HashMap, HashSet};
 use std::collections::hash_map::Entry;
 use primitives::hash::H256;
+use linked_hash_map::LinkedHashMap;
+use time::precise_time_s;
 
+// TODO: sync score for peers + choose peers based on their score
+/// Max peer failures # before excluding from sync process
+const MAX_PEER_FAILURES: usize = 8;
+
 /// Set of peers selected for synchronization.
 #[derive(Debug)]
 pub struct Peers {
-	/// Peers that have not pending blocks requests.
-	idle_peers: HashSet<usize>,
-	/// Pending block requests by peer.
-	blocks_requests: HashMap<usize, HashSet<H256>>,
+	/// Peers that have no pending requests.
+	idle: HashSet<usize>,
+	/// Pending requests by peer.
+	requests: HashMap<usize, HashSet<H256>>,
+	/// Peers failures.
+	failures: HashMap<usize, usize>,
+	/// Last message time from peer
+	times: LinkedHashMap<usize, f64>,
 }
 
 /// Information on synchronization peers
@@ -26,8 +33,10 @@ pub struct Information {
 impl Peers {
 	pub fn new() -> Peers {
 		Peers {
-			idle_peers: HashSet::new(),
-			blocks_requests: HashMap::new(),
+			idle: HashSet::new(),
+			requests: HashMap::new(),
+			failures: HashMap::new(),
+			times: LinkedHashMap::new(),
 		}
 	}
 
@@ -35,75 +44,122 @@ impl Peers {
 	#[cfg(test)]
 	pub fn information(&self) -> Information {
 		Information {
-			idle: self.idle_peers.len(),
-			active: self.blocks_requests.len(),
+			idle: self.idle.len(),
+			active: self.requests.len(),
 		}
 	}
 
 	/// Get idle peer.
 	#[cfg(test)]
 	pub fn idle_peer(&self) -> Option<usize> {
-		self.idle_peers.iter().cloned().next()
+		self.idle.iter().cloned().next()
 	}
 
 	/// Get idle peers.
 	pub fn idle_peers(&self) -> Vec<usize> {
-		self.idle_peers.iter().cloned().collect()
+		self.idle.iter().cloned().collect()
 	}
 
+	/// Get worst peer.
+	pub fn worst_peers(&self) -> Vec<(usize, f64)> {
+		self.times.iter().map(|(&pi, &t)| (pi, t)).collect()
+	}
+
 	/// Insert new synchronization peer.
 	pub fn insert(&mut self, peer_index: usize) {
-		if !self.idle_peers.contains(&peer_index) && !self.blocks_requests.contains_key(&peer_index) {
-			self.idle_peers.insert(peer_index);
-		}
-	}
-
-	/// Block is received from peer.
-	pub fn on_block_received(&mut self, peer_index: usize, block_hash: &H256) {
-		if let Entry::Occupied(mut entry) = self.blocks_requests.entry(peer_index) {
-			entry.get_mut().remove(block_hash);
-			if entry.get().is_empty() {
-				self.idle_peers.insert(peer_index);
-				entry.remove_entry();
-			}
+		if !self.idle.contains(&peer_index) && !self.requests.contains_key(&peer_index) {
+			self.idle.insert(peer_index);
 		}
 	}
 
 	/// Peer has been disconnected
 	pub fn on_peer_disconnected(&mut self, peer_index: usize) {
-		self.idle_peers.remove(&peer_index);
-		self.blocks_requests.remove(&peer_index);
+		self.idle.remove(&peer_index);
+		self.requests.remove(&peer_index);
+		self.failures.remove(&peer_index);
+		self.times.remove(&peer_index);
+	}
+
+	/// Block is received from peer.
+	pub fn on_block_received(&mut self, peer_index: usize, block_hash: &H256) {
+		if let Entry::Occupied(mut entry) = self.requests.entry(peer_index) {
+			entry.get_mut().remove(block_hash);
+			if entry.get().is_empty() {
+				self.idle.insert(peer_index);
+				entry.remove_entry();
+			}
+		}
+		self.on_peer_message(peer_index);
 	}
 
 	/// Blocks have been requested from peer.
 	pub fn on_blocks_requested(&mut self, peer_index: usize, blocks_hashes: &Vec<H256>) {
 		// inventory can only be requested from idle peers
-		assert!(!self.blocks_requests.contains_key(&peer_index));
+		assert!(!self.requests.contains_key(&peer_index));
 
-		self.idle_peers.remove(&peer_index);
-		self.blocks_requests.entry(peer_index).or_insert(HashSet::new()).extend(blocks_hashes.iter().cloned());
+		self.idle.remove(&peer_index);
+		self.requests.entry(peer_index).or_insert(HashSet::new()).extend(blocks_hashes.iter().cloned());
+		self.times.insert(peer_index, precise_time_s());
 	}
 
 	/// Inventory has been requested from peer.
 	pub fn on_inventory_requested(&mut self, peer_index: usize) {
 		// inventory can only be requested from idle peers
-		assert!(!self.blocks_requests.contains_key(&peer_index));
-		self.idle_peers.remove(&peer_index);
+		assert!(!self.requests.contains_key(&peer_index));
+
+		self.idle.remove(&peer_index);
 		// peer is now out-of-synchronization process, because:
-		// 1) if it has new blocks, it will respond with `inventory` message && will be insrted back here
+		// 1) if it has new blocks, it will respond with `inventory` message && will be inserted back here
 		// 2) if it has no new blocks => either synchronization is completed, or it is behind us in sync
 	}
 
+	/// We have failed to get response from peer during given period
+	pub fn on_peer_failure(&mut self, peer_index: usize) -> bool {
+		let peer_failures = match self.failures.entry(peer_index) {
+			Entry::Occupied(mut entry) => {
+				let failures = entry.get() + 1;
+				entry.insert(failures);
+				failures
+			},
+			Entry::Vacant(entry) => *entry.insert(1),
+		};
+
+		let too_much_failures = peer_failures >= MAX_PEER_FAILURES;
+		if too_much_failures {
+			self.failures.remove(&peer_index);
+			self.requests.remove(&peer_index);
+			self.times.remove(&peer_index);
+		}
+		too_much_failures
+	}
+
 	/// Reset peers state
 	pub fn reset(&mut self) {
-		self.idle_peers.extend(self.blocks_requests.drain().map(|(k, _)| k));
+		self.idle.extend(self.requests.drain().map(|(k, _)| k));
+		self.failures.clear();
+		self.times.clear();
+	}
+
+	/// Reset peer tasks
+	pub fn reset_tasks(&mut self, peer_index: usize) {
+		self.requests.remove(&peer_index);
+		self.times.remove(&peer_index);
+		self.idle.insert(peer_index);
+	}
+
+	/// When sync message is received from peer
+	fn on_peer_message(&mut self, peer_index: usize) {
+		self.failures.remove(&peer_index);
+		self.times.remove(&peer_index);
+		if self.requests.contains_key(&peer_index) {
+			self.times.insert(peer_index, precise_time_s());
+		}
 	}
 }
 
 #[cfg(test)]
 mod tests {
-	use super::Peers;
+	use super::{Peers, MAX_PEER_FAILURES};
 	use primitives::hash::H256;
 
 	#[test]
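The `times` field is a `LinkedHashMap` rather than a `HashMap` precisely so that iteration order matches insertion order: the longest-outstanding request comes first and `worst_peers()` needs no sorting. A small sketch against the linked-hash-map 0.3 API declared in the Cargo.toml hunk above:

```rust
// Sketch of why insertion-ordered iteration makes worst_peers() trivial.
extern crate linked_hash_map;

use linked_hash_map::LinkedHashMap;

fn main() {
	let mut times: LinkedHashMap<usize, f64> = LinkedHashMap::new();
	times.insert(7, 100.0); // oldest request
	times.insert(3, 105.0);
	times.insert(9, 110.0); // newest request

	// Same shape as Peers::worst_peers() in the hunk above.
	let worst: Vec<(usize, f64)> = times.iter().map(|(&pi, &t)| (pi, t)).collect();
	assert_eq!(worst[0].0, 7); // iteration preserves insertion order
}
```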
@@ -201,4 +257,41 @@ mod tests {
 		assert_eq!(peers.information().idle, 3);
 		assert_eq!(peers.information().active, 0);
 	}
+
+	#[test]
+	fn peers_worst() {
+		let mut peers = Peers::new();
+
+		peers.insert(1);
+		peers.insert(2);
+		assert_eq!(peers.worst_peers(), vec![]);
+
+		peers.on_blocks_requested(1, &vec![H256::default()]);
+		assert_eq!(peers.worst_peers().len(), 1);
+		assert_eq!(peers.worst_peers()[0].0, 1);
+
+		peers.on_blocks_requested(2, &vec![H256::default()]);
+		assert_eq!(peers.worst_peers().len(), 2);
+		assert_eq!(peers.worst_peers()[0].0, 1);
+		assert_eq!(peers.worst_peers()[1].0, 2);
+
+		assert_eq!(peers.information().idle, 0);
+		assert_eq!(peers.information().active, 2);
+
+		peers.reset_tasks(1);
+
+		assert_eq!(peers.information().idle, 1);
+		assert_eq!(peers.information().active, 1);
+
+		assert_eq!(peers.worst_peers().len(), 1);
+		assert_eq!(peers.worst_peers()[0].0, 2);
+
+		for _ in 0..MAX_PEER_FAILURES {
+			peers.on_peer_failure(2);
+		}
+
+		assert_eq!(peers.worst_peers().len(), 0);
+		assert_eq!(peers.information().idle, 1);
+		assert_eq!(peers.information().active, 0);
+	}
 }