diff --git a/Cargo.lock b/Cargo.lock index b8dceba5..ee8797c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -611,6 +611,9 @@ version = "0.1.0" dependencies = [ "chain 0.1.0", "db 0.1.0", + "futures 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "message 0.1.0", "miner 0.1.0", @@ -619,6 +622,7 @@ dependencies = [ "primitives 0.1.0", "test-data 0.1.0", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.0 (git+https://github.com/debris/tokio-core)", "verification 0.1.0", ] diff --git a/pbtc/commands/start.rs b/pbtc/commands/start.rs index 5d5ae234..fd36d258 100644 --- a/pbtc/commands/start.rs +++ b/pbtc/commands/start.rs @@ -27,7 +27,8 @@ pub fn start(cfg: config::Config) -> Result<(), String> { let db = open_db(cfg.use_disk_database); init_db(&db); - let sync_connection_factory = create_sync_connection_factory(db); + let sync_handle = el.handle(); + let sync_connection_factory = create_sync_connection_factory(&sync_handle, db); let p2p = p2p::P2P::new(p2p_cfg, sync_connection_factory, el.handle()); try!(p2p.run().map_err(|_| "Failed to start p2p module")); diff --git a/pbtc/main.rs b/pbtc/main.rs index 0f4a6c41..b08a2951 100644 --- a/pbtc/main.rs +++ b/pbtc/main.rs @@ -43,5 +43,3 @@ fn run() -> Result<(), String> { _ => commands::start(cfg), } } - - diff --git a/sync/Cargo.toml b/sync/Cargo.toml index ed1f1a54..d2b626e6 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -7,6 +7,10 @@ authors = ["Ethcore "] parking_lot = "0.3" log = "0.3" time = "0.1" +futures = "0.1" +futures-cpupool = "0.1" +tokio-core = { git = "https://github.com/debris/tokio-core" } +linked-hash-map = "0.3" chain = { path = "../chain" } db = { path = "../db" } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 1a3ee849..20c60229 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -2,9 +2,13 @@ extern crate chain; extern crate db; #[macro_use] extern crate log; +extern crate futures; +extern crate futures_cpupool; +extern crate tokio_core; extern crate message; extern crate p2p; extern crate parking_lot; +extern crate linked_hash_map; extern crate primitives; extern crate test_data; extern crate time; @@ -19,11 +23,13 @@ mod local_node; mod synchronization_chain; mod synchronization_client; mod synchronization_executor; +mod synchronization_manager; mod synchronization_peers; mod synchronization_server; use std::sync::Arc; use parking_lot::{Mutex, RwLock}; +use tokio_core::reactor::Handle; /// Sync errors. #[derive(Debug)] @@ -42,7 +48,7 @@ pub fn create_sync_blocks_writer(db: Arc) -> blocks_writer::BlocksWri } /// Create inbound synchronization connections factory for given `db`. -pub fn create_sync_connection_factory(db: Arc) -> p2p::LocalSyncNodeRef { +pub fn create_sync_connection_factory(handle: &Handle, db: Arc) -> p2p::LocalSyncNodeRef { use synchronization_chain::Chain as SyncChain; use synchronization_executor::LocalSynchronizationTaskExecutor as SyncExecutor; use local_node::LocalNode as SyncNode; @@ -53,7 +59,7 @@ pub fn create_sync_connection_factory(db: Arc) -> p2p::LocalSyncNodeR let sync_chain = Arc::new(RwLock::new(SyncChain::new(db))); let sync_executor = SyncExecutor::new(sync_chain.clone()); let sync_server = Arc::new(Mutex::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()))); - let sync_client = SynchronizationClient::new(SynchronizationConfig::default(), sync_executor.clone(), sync_chain); + let sync_client = SynchronizationClient::new(SynchronizationConfig::default(), handle, sync_executor.clone(), sync_chain); let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor)); SyncConnectionFactory::with_local_node(sync_node) } diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 6a4ffb24..99ed22dc 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -188,7 +188,7 @@ mod tests { use synchronization_executor::tests::DummyTaskExecutor; use synchronization_client::{Config, SynchronizationClient}; use synchronization_chain::Chain; - use p2p::{OutboundSyncConnection, OutboundSyncConnectionRef}; + use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef}; use message::types; use message::common::{InventoryVector, InventoryType}; use db; @@ -196,6 +196,7 @@ mod tests { use test_data; use synchronization_server::ServerTask; use synchronization_server::tests::DummyServer; + use tokio_core::reactor::{Core, Handle}; struct DummyOutboundSyncConnection; @@ -227,19 +228,21 @@ mod tests { fn send_notfound(&self, _message: &types::NotFound) {} } - fn create_local_node() -> (Arc>, Arc>, LocalNode>) { + fn create_local_node() -> (Core, Handle, Arc>, Arc>, LocalNode>) { + let event_loop = event_loop(); + let handle = event_loop.handle(); let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block())))); let executor = DummyTaskExecutor::new(); let server = Arc::new(Mutex::new(DummyServer::new())); - let config = Config { skip_verification: true }; - let client = SynchronizationClient::new(config, executor.clone(), chain); + let config = Config { threads_num: 1, skip_verification: true }; + let client = SynchronizationClient::new(config, &handle, executor.clone(), chain); let local_node = LocalNode::new(server.clone(), client, executor.clone()); - (executor, server, local_node) + (event_loop, handle, executor, server, local_node) } #[test] fn local_node_request_inventory_on_sync_start() { - let (executor, _, local_node) = create_local_node(); + let (_, _, executor, _, local_node) = create_local_node(); let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); // start sync session local_node.start_sync_session(peer_index, 0); @@ -250,7 +253,7 @@ mod tests { #[test] fn local_node_serves_block() { - let (_, server, local_node) = create_local_node(); + let (_, _, _, server, local_node) = create_local_node(); let peer_index = local_node.create_sync_session(0, DummyOutboundSyncConnection::new()); // peer requests genesis block let genesis_block_hash = test_data::genesis().hash(); diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index dde07f9a..1ab9d1cb 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -5,6 +5,10 @@ use std::collections::HashMap; use std::collections::hash_map::Entry; use std::sync::mpsc::{channel, Sender, Receiver}; use parking_lot::Mutex; +use futures::{BoxFuture, Future, finished}; +use futures::stream::Stream; +use tokio_core::reactor::{Handle, Interval}; +use futures_cpupool::CpuPool; use db; use chain::{Block, RepresentH256}; use primitives::hash::H256; @@ -16,7 +20,9 @@ use synchronization_chain::{ChainRef, BlockState}; use synchronization_chain::{Information as ChainInformation}; use verification::{ChainVerifier, Error as VerificationError, Verify}; use synchronization_executor::{Task, TaskExecutor}; +use synchronization_manager::{manage_synchronization_peers, MANAGEMENT_INTERVAL_MS}; use time; +use std::time::Duration; ///! Blocks synchronization process: ///! @@ -70,13 +76,13 @@ use time; /// Approximate maximal number of blocks hashes in scheduled queue. const MAX_SCHEDULED_HASHES: u32 = 4 * 1024; /// Approximate maximal number of blocks hashes in requested queue. -const MAX_REQUESTED_BLOCKS: u32 = 512; +const MAX_REQUESTED_BLOCKS: u32 = 256; /// Approximate maximal number of blocks in verifying queue. -const MAX_VERIFYING_BLOCKS: u32 = 512; +const MAX_VERIFYING_BLOCKS: u32 = 256; /// Minimum number of blocks to request from peer const MIN_BLOCKS_IN_REQUEST: u32 = 32; /// Maximum number of blocks to request from peer -const MAX_BLOCKS_IN_REQUEST: u32 = 512; +const MAX_BLOCKS_IN_REQUEST: u32 = 128; /// Synchronization state #[derive(Debug, Clone, Copy)] @@ -119,8 +125,9 @@ pub trait Client : Send + 'static { } /// Synchronization client configuration options. -#[derive(Default)] pub struct Config { + /// Number of threads to allocate in synchronization CpuPool. + pub threads_num: usize, /// Do not verify incoming blocks before inserting to db. pub skip_verification: bool, } @@ -129,6 +136,10 @@ pub struct Config { pub struct SynchronizationClient { /// Synchronization state. state: State, + /// Cpu pool. + pool: CpuPool, + /// Sync management worker. + management_worker: Option>, /// Synchronization peers. peers: Peers, /// Task executor. @@ -143,6 +154,15 @@ pub struct SynchronizationClient { verification_worker_thread: Option>, } +impl Default for Config { + fn default() -> Self { + Config { + threads_num: 4, + skip_verification: false, + } + } +} + impl State { pub fn is_synchronizing(&self) -> bool { match self { @@ -246,11 +266,13 @@ impl Client for SynchronizationClient where T: TaskExecutor { impl SynchronizationClient where T: TaskExecutor { /// Create new synchronization window - pub fn new(config: Config, executor: Arc>, chain: ChainRef) -> Arc> { + pub fn new(config: Config, handle: &Handle, executor: Arc>, chain: ChainRef) -> Arc> { let sync = Arc::new(Mutex::new( SynchronizationClient { state: State::Saturated, peers: Peers::new(), + pool: CpuPool::new(config.threads_num), + management_worker: None, executor: executor, chain: chain.clone(), orphaned_blocks: HashMap::new(), @@ -273,6 +295,29 @@ impl SynchronizationClient where T: TaskExecutor { .expect("Error creating verification thread")); } + // TODO: start management worker only when synchronization is started + // currently impossible because there is no way to call Interval::new with Remote && Handle is not-Send + { + let csync = Arc::downgrade(&sync); + let mut sync = sync.lock(); + let management_worker = Interval::new(Duration::from_millis(MANAGEMENT_INTERVAL_MS), handle) + .expect("Failed to create interval") + .and_then(move |_| { + let client = match csync.upgrade() { + Some(client) => client, + None => return Ok(()), + }; + let mut client = client.lock(); + manage_synchronization_peers(&mut client.peers); + client.execute_synchronization_tasks(); + Ok(()) + }) + .for_each(|_| Ok(())) + .then(|_| finished::<(), ()>(())) + .boxed(); + sync.management_worker = Some(sync.pool.spawn(management_worker).boxed()); + } + sync } @@ -303,7 +348,7 @@ impl SynchronizationClient where T: TaskExecutor { let mut chain = self.chain.write(); - loop { + 'outer: loop { // when synchronization is idling // => request full inventory if !chain.has_blocks_of_state(BlockState::Scheduled) @@ -344,7 +389,7 @@ impl SynchronizationClient where T: TaskExecutor { chain.schedule_blocks_hashes(unknown_peer_hashes); self.peers.insert(peer_index); - break; + break 'outer; } if last_known_peer_hash_index == 0 { @@ -444,7 +489,7 @@ impl SynchronizationClient where T: TaskExecutor { let new_num_of_blocks = chain.best_block().number; let blocks_diff = if new_num_of_blocks > num_of_blocks { new_num_of_blocks - num_of_blocks} else { 0 }; if timestamp_diff >= 60.0 || blocks_diff > 1000 { - self.state = State::Synchronizing(new_timestamp, new_num_of_blocks); + self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number); info!(target: "sync", "Processed {} blocks in {} seconds. Chain information: {:?}" , blocks_diff, timestamp_diff @@ -515,25 +560,30 @@ impl SynchronizationClient where T: TaskExecutor { pub mod tests { use std::sync::Arc; use parking_lot::{Mutex, RwLock}; + use tokio_core::reactor::{Core, Handle}; use chain::{Block, RepresentH256}; use super::{Client, Config, SynchronizationClient}; use synchronization_executor::Task; use synchronization_chain::{Chain, ChainRef}; use synchronization_executor::tests::DummyTaskExecutor; + use p2p::event_loop; use test_data; use db; - fn create_sync() -> (Arc>, Arc>>) { + fn create_sync() -> (Core, Handle, Arc>, Arc>>) { + let event_loop = event_loop(); + let handle = event_loop.handle(); let storage = Arc::new(db::TestStorage::with_genesis_block()); let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone()))); let executor = DummyTaskExecutor::new(); - let config = Config { skip_verification: true }; - (executor.clone(), SynchronizationClient::new(config, executor, chain)) + let config = Config { threads_num: 1, skip_verification: true }; + let client = SynchronizationClient::new(config, &handle, executor.clone(), chain); + (event_loop, handle, executor, client) } #[test] fn synchronization_saturated_on_start() { - let (_, sync) = create_sync(); + let (_, _, _, sync) = create_sync(); let sync = sync.lock(); let info = sync.information(); assert!(!info.state.is_synchronizing()); @@ -542,7 +592,7 @@ pub mod tests { #[test] fn synchronization_in_order_block_path() { - let (executor, sync) = create_sync(); + let (_, _, executor, sync) = create_sync(); let mut sync = sync.lock(); let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into(); @@ -585,7 +635,7 @@ pub mod tests { #[test] fn synchronization_out_of_order_block_path() { - let (_, sync) = create_sync(); + let (_, _, _, sync) = create_sync(); let mut sync = sync.lock(); let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into(); @@ -607,7 +657,7 @@ pub mod tests { #[test] fn synchronization_parallel_peers() { - let (executor, sync) = create_sync(); + let (_, _, executor, sync) = create_sync(); let block1: Block = "010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e362990101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000".into(); let block2: Block = "010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd610101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d010bffffffff0100f2052a010000004341047211a824f55b505228e4c3d5194c1fcfaa15a456abdf37f9b9d97a4040afc073dee6c89064984f03385237d92167c13e236446b417ab79a0fcae412ae3316b77ac00000000".into(); @@ -652,7 +702,7 @@ pub mod tests { #[test] fn synchronization_reset_when_peer_is_disconnected() { - let (_, sync) = create_sync(); + let (_, _, _, sync) = create_sync(); // request new blocks { @@ -671,7 +721,7 @@ pub mod tests { #[test] fn synchronization_not_starting_when_receiving_known_blocks() { - let (executor, sync) = create_sync(); + let (_, _, executor, sync) = create_sync(); let mut sync = sync.lock(); // saturated => receive inventory with known blocks only sync.on_new_blocks_inventory(1, vec![test_data::genesis().hash()]); diff --git a/sync/src/synchronization_manager.rs b/sync/src/synchronization_manager.rs new file mode 100644 index 00000000..83118bdf --- /dev/null +++ b/sync/src/synchronization_manager.rs @@ -0,0 +1,26 @@ +use time::precise_time_s; +use synchronization_peers::Peers; + +/// Management interval (in ms) +pub const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000; +/// Response time to decrease peer score +const FAILURE_INTERVAL_S: f64 = 5f64; + +/// Management worker +pub fn manage_synchronization_peers(peers: &mut Peers) { + // reset tasks for peers, which has not responded during given period + for (worst_peer_index, worst_peer_time) in peers.worst_peers() { + // check if peer has not responded within given time + let time_diff = worst_peer_time - precise_time_s(); + if time_diff <= FAILURE_INTERVAL_S { + break; + } + + // decrease score && move to the idle queue + trace!(target: "sync", "Failed to get response from peer#{} in {} seconds", worst_peer_index, time_diff); + peers.reset_tasks(worst_peer_index); + if peers.on_peer_failure(worst_peer_index) { + trace!(target: "sync", "Too many failures for peer#{}. Excluding from synchronization", worst_peer_index); + } + } +} diff --git a/sync/src/synchronization_peers.rs b/sync/src/synchronization_peers.rs index e0f6057f..cb296916 100644 --- a/sync/src/synchronization_peers.rs +++ b/sync/src/synchronization_peers.rs @@ -1,16 +1,23 @@ use std::collections::{HashMap, HashSet}; use std::collections::hash_map::Entry; use primitives::hash::H256; +use linked_hash_map::LinkedHashMap; +use time::precise_time_s; -// TODO: sync score for peers + choose peers based on their score +/// Max peer failures # before excluding from sync process +const MAX_PEER_FAILURES: usize = 8; /// Set of peers selected for synchronization. #[derive(Debug)] pub struct Peers { - /// Peers that have not pending blocks requests. - idle_peers: HashSet, - /// Pending block requests by peer. - blocks_requests: HashMap>, + /// Peers that have no pending requests. + idle: HashSet, + /// Pending requests by peer. + requests: HashMap>, + /// Peers failures. + failures: HashMap, + /// Last message time from peer + times: LinkedHashMap, } /// Information on synchronization peers @@ -26,8 +33,10 @@ pub struct Information { impl Peers { pub fn new() -> Peers { Peers { - idle_peers: HashSet::new(), - blocks_requests: HashMap::new(), + idle: HashSet::new(), + requests: HashMap::new(), + failures: HashMap::new(), + times: LinkedHashMap::new(), } } @@ -35,75 +44,122 @@ impl Peers { #[cfg(test)] pub fn information(&self) -> Information { Information { - idle: self.idle_peers.len(), - active: self.blocks_requests.len(), + idle: self.idle.len(), + active: self.requests.len(), } } /// Get idle peer. #[cfg(test)] pub fn idle_peer(&self) -> Option { - self.idle_peers.iter().cloned().next() + self.idle.iter().cloned().next() } /// Get idle peers. pub fn idle_peers(&self) -> Vec { - self.idle_peers.iter().cloned().collect() + self.idle.iter().cloned().collect() + } + + /// Get worst peer. + pub fn worst_peers(&self) -> Vec<(usize, f64)> { + self.times.iter().map(|(&pi, &t)| (pi, t)).collect() } /// Insert new synchronization peer. pub fn insert(&mut self, peer_index: usize) { - if !self.idle_peers.contains(&peer_index) && !self.blocks_requests.contains_key(&peer_index) { - self.idle_peers.insert(peer_index); - } - } - - /// Block is received from peer. - pub fn on_block_received(&mut self, peer_index: usize, block_hash: &H256) { - if let Entry::Occupied(mut entry) = self.blocks_requests.entry(peer_index) { - entry.get_mut().remove(block_hash); - if entry.get().is_empty() { - self.idle_peers.insert(peer_index); - entry.remove_entry(); - } + if !self.idle.contains(&peer_index) && !self.requests.contains_key(&peer_index) { + self.idle.insert(peer_index); } } /// Peer has been disconnected pub fn on_peer_disconnected(&mut self, peer_index: usize) { - self.idle_peers.remove(&peer_index); - self.blocks_requests.remove(&peer_index); + self.idle.remove(&peer_index); + self.requests.remove(&peer_index); + self.failures.remove(&peer_index); + self.times.remove(&peer_index); + } + + /// Block is received from peer. + pub fn on_block_received(&mut self, peer_index: usize, block_hash: &H256) { + if let Entry::Occupied(mut entry) = self.requests.entry(peer_index) { + entry.get_mut().remove(block_hash); + if entry.get().is_empty() { + self.idle.insert(peer_index); + entry.remove_entry(); + } + } + self.on_peer_message(peer_index); } /// Blocks have been requested from peer. pub fn on_blocks_requested(&mut self, peer_index: usize, blocks_hashes: &Vec) { // inventory can only be requested from idle peers - assert!(!self.blocks_requests.contains_key(&peer_index)); + assert!(!self.requests.contains_key(&peer_index)); - self.idle_peers.remove(&peer_index); - self.blocks_requests.entry(peer_index).or_insert(HashSet::new()).extend(blocks_hashes.iter().cloned()); + self.idle.remove(&peer_index); + self.requests.entry(peer_index).or_insert(HashSet::new()).extend(blocks_hashes.iter().cloned()); + self.times.insert(peer_index, precise_time_s()); } /// Inventory has been requested from peer. pub fn on_inventory_requested(&mut self, peer_index: usize) { // inventory can only be requested from idle peers - assert!(!self.blocks_requests.contains_key(&peer_index)); - self.idle_peers.remove(&peer_index); + assert!(!self.requests.contains_key(&peer_index)); + self.idle.remove(&peer_index); // peer is now out-of-synchronization process, because: - // 1) if it has new blocks, it will respond with `inventory` message && will be insrted back here + // 1) if it has new blocks, it will respond with `inventory` message && will be inserted back here // 2) if it has no new blocks => either synchronization is completed, or it is behind us in sync } + /// We have failed to get response from peer during given period + pub fn on_peer_failure(&mut self, peer_index: usize) -> bool { + let peer_failures = match self.failures.entry(peer_index) { + Entry::Occupied(mut entry) => { + let failures = entry.get() + 1; + entry.insert(failures) + 1; + failures + }, + Entry::Vacant(entry) => *entry.insert(1), + }; + + let too_much_failures = peer_failures >= MAX_PEER_FAILURES; + if too_much_failures { + self.failures.remove(&peer_index); + self.requests.remove(&peer_index); + self.times.remove(&peer_index); + } + too_much_failures + } + /// Reset peers state pub fn reset(&mut self) { - self.idle_peers.extend(self.blocks_requests.drain().map(|(k, _)| k)); + self.idle.extend(self.requests.drain().map(|(k, _)| k)); + self.failures.clear(); + self.times.clear(); + } + + /// Reset peer tasks + pub fn reset_tasks(&mut self, peer_index: usize) { + self.requests.remove(&peer_index); + self.times.remove(&peer_index); + self.idle.insert(peer_index); + } + + /// When sync message is received from peer + fn on_peer_message(&mut self, peer_index: usize) { + self.failures.remove(&peer_index); + self.times.remove(&peer_index); + if self.requests.contains_key(&peer_index) { + self.times.insert(peer_index, precise_time_s()); + } } } #[cfg(test)] mod tests { - use super::Peers; + use super::{Peers, MAX_PEER_FAILURES}; use primitives::hash::H256; #[test] @@ -201,4 +257,41 @@ mod tests { assert_eq!(peers.information().idle, 3); assert_eq!(peers.information().active, 0); } -} \ No newline at end of file + + #[test] + fn peers_worst() { + let mut peers = Peers::new(); + + peers.insert(1); + peers.insert(2); + assert_eq!(peers.worst_peers(), vec![]); + + peers.on_blocks_requested(1, &vec![H256::default()]); + assert_eq!(peers.worst_peers().len(), 1); + assert_eq!(peers.worst_peers()[0].0, 1); + + peers.on_blocks_requested(2, &vec![H256::default()]); + assert_eq!(peers.worst_peers().len(), 2); + assert_eq!(peers.worst_peers()[0].0, 1); + assert_eq!(peers.worst_peers()[1].0, 2); + + assert_eq!(peers.information().idle, 0); + assert_eq!(peers.information().active, 2); + + peers.reset_tasks(1); + + assert_eq!(peers.information().idle, 1); + assert_eq!(peers.information().active, 1); + + assert_eq!(peers.worst_peers().len(), 1); + assert_eq!(peers.worst_peers()[0].0, 2); + + for _ in 0..MAX_PEER_FAILURES { + peers.on_peer_failure(2); + } + + assert_eq!(peers.worst_peers().len(), 0); + assert_eq!(peers.information().idle, 1); + assert_eq!(peers.information().active, 0); + } +}