sort peers by response time before sending blocks requests

This commit is contained in:
Svyatoslav Nikolsky 2016-12-26 17:21:04 +03:00
parent 40ace5b0d4
commit c23b0f55a4
4 changed files with 333 additions and 272 deletions

View File

@ -19,7 +19,7 @@ use primitives::hash::H256;
use verification::BackwardsCompatibleChainVerifier as ChainVerifier;
use synchronization_chain::{Chain, BlockState, TransactionState, BlockInsertionResult};
use synchronization_executor::{Task, TaskExecutor};
use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory,
use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_headers,
manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS,
ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig};
use synchronization_peers_tasks::PeersTasks;
@ -261,7 +261,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
let mut headers: Vec<_> = message.headers.into_iter().map(IndexedBlockHeader::from).collect();
// update peers to select next tasks
self.peers_tasks.on_inventory_received(peer_index);
self.peers_tasks.on_headers_received(peer_index);
// headers are ordered
// => if we know nothing about headers[0].parent
@ -459,13 +459,15 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
// we are only interested in blocks which we were asking for before
if let Some(requested_blocks) = self.peers_tasks.get_blocks_tasks(peer_index) {
let is_requested_block = if let Some(requested_blocks) = self.peers_tasks.get_blocks_tasks(peer_index) {
// check if peer has responded with notfound to requested blocks
if requested_blocks.intersection(&notfound_blocks).nth(0).is_none() {
// if notfound some other blocks => just ignore the message
return;
}
// if notfound some other blocks => just ignore the message
requested_blocks.intersection(&notfound_blocks).nth(0).is_some()
} else {
false
};
if is_requested_block {
// for now, let's exclude peer from synchronization - we are relying on full nodes for synchronization
let removed_tasks = self.peers_tasks.reset_blocks_tasks(peer_index);
self.peers_tasks.unuseful_peer(peer_index);
@ -536,22 +538,22 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
}
let mut blocks_requests: Option<Vec<H256>> = None;
let blocks_idle_peers = self.peers_tasks.idle_peers_for_blocks();
let blocks_idle_peers: Vec<_> = self.peers_tasks.idle_peers_for_blocks().iter().cloned().collect();
{
// check if we can query some blocks hashes
let inventory_idle_peers = self.peers_tasks.idle_peers_for_inventory();
if !inventory_idle_peers.is_empty() {
// check if we can query some blocks headers
let headers_idle_peers: Vec<_> = self.peers_tasks.idle_peers_for_headers().iter().cloned().collect();
if !headers_idle_peers.is_empty() {
let scheduled_hashes_len = self.chain.length_of_blocks_state(BlockState::Scheduled);
if scheduled_hashes_len < MAX_SCHEDULED_HASHES {
for inventory_peer in &inventory_idle_peers {
self.peers_tasks.on_inventory_requested(*inventory_peer);
for header_peer in &headers_idle_peers {
self.peers_tasks.on_headers_requested(*header_peer);
}
let block_locator_hashes = self.chain.block_locator_hashes();
let inventory_tasks = inventory_idle_peers
.into_iter()
.map(move |peer_index| Task::GetHeaders(peer_index, types::GetHeaders::with_block_locator_hashes(block_locator_hashes.clone())));
tasks.extend(inventory_tasks);
let headers_tasks = headers_idle_peers
.iter()
.map(move |peer_index| Task::GetHeaders(*peer_index, types::GetHeaders::with_block_locator_hashes(block_locator_hashes.clone())));
tasks.extend(headers_tasks);
}
}
@ -706,7 +708,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
shared_state: shared_state,
state: State::Saturated,
peers: peers,
peers_tasks: PeersTasks::new(),
peers_tasks: PeersTasks::default(),
pool: CpuPool::new(config.threads_num),
management_worker: None,
executor: executor,
@ -748,7 +750,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
if blocks_to_forget.is_empty() { None } else { Some(blocks_to_forget) },
);
manage_synchronization_peers_inventory(&peers_config, &mut client.peers_tasks);
manage_synchronization_peers_headers(&peers_config, &mut client.peers_tasks);
manage_orphaned_transactions(&orphan_config, &mut client.orphaned_transactions_pool);
if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&unknown_config, &mut client.orphaned_blocks_pool) {
for orphan_to_remove in orphans_to_remove {
@ -896,10 +898,12 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
}
fn prepare_blocks_requests_tasks(&mut self, peers: Vec<PeerIndex>, mut hashes: Vec<H256>) -> Vec<Task> {
fn prepare_blocks_requests_tasks(&mut self, mut peers: Vec<PeerIndex>, mut hashes: Vec<H256>) -> Vec<Task> {
use std::mem::swap;
// TODO: ask most fast peers for hashes at the beginning of `hashes`
// ask fastest peers for hashes at the beginning of `hashes`
self.peers_tasks.sort_peers_for_blocks(&mut peers);
let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(hashes.len() as BlockHeight, MIN_BLOCKS_IN_REQUEST));
let last_peer_index = peers.len() - 1;
let mut tasks: Vec<Task> = Vec::new();
@ -978,8 +982,8 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
{
let block_locator_hashes: Vec<H256> = self.chain.block_locator_hashes();
for peer in self.peers_tasks.all_peers() {
self.executor.execute(Task::GetHeaders(peer, types::GetHeaders::with_block_locator_hashes(block_locator_hashes.clone())));
self.executor.execute(Task::MemoryPool(peer));
self.executor.execute(Task::GetHeaders(*peer, types::GetHeaders::with_block_locator_hashes(block_locator_hashes.clone())));
self.executor.execute(Task::MemoryPool(*peer));
}
}
}

View File

@ -8,8 +8,8 @@ use utils::{OrphanBlocksPool, OrphanTransactionsPool};
pub const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000;
/// Response time before getting block to decrease peer score
const DEFAULT_PEER_BLOCK_FAILURE_INTERVAL_MS: u32 = 60 * 1000;
/// Response time before getting inventory to decrease peer score
const DEFAULT_PEER_INVENTORY_FAILURE_INTERVAL_MS: u32 = 60 * 1000;
/// Response time before getting headers to decrease peer score
const DEFAULT_PEER_HEADERS_FAILURE_INTERVAL_MS: u32 = 60 * 1000;
/// Unknown orphan block removal time
const DEFAULT_UNKNOWN_BLOCK_REMOVAL_TIME_MS: u32 = 20 * 60 * 1000;
/// Maximal number of orphaned blocks
@ -23,15 +23,15 @@ const DEFAULT_ORPHAN_TRANSACTIONS_MAX_LEN: usize = 10000;
pub struct ManagePeersConfig {
/// Time interval (in milliseconds) to wait block from the peer before penalizing && reexecuting tasks
pub block_failure_interval_ms: u32,
/// Time interval (in milliseconds) to wait inventory from the peer before penalizing && reexecuting tasks
pub inventory_failure_interval_ms: u32,
/// Time interval (in milliseconds) to wait headers from the peer before penalizing && reexecuting tasks
pub headers_failure_interval_ms: u32,
}
impl Default for ManagePeersConfig {
fn default() -> Self {
ManagePeersConfig {
block_failure_interval_ms: DEFAULT_PEER_BLOCK_FAILURE_INTERVAL_MS,
inventory_failure_interval_ms: DEFAULT_PEER_INVENTORY_FAILURE_INTERVAL_MS,
headers_failure_interval_ms: DEFAULT_PEER_HEADERS_FAILURE_INTERVAL_MS,
}
}
}
@ -77,9 +77,10 @@ pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &m
let now = precise_time_s();
// reset tasks for peers which have not responded during the given period
for (worst_peer_index, worst_peer_time) in peers.ordered_blocks_requests() {
let ordered_blocks_requests: Vec<_> = peers.ordered_blocks_requests().clone().into_iter().collect();
for (worst_peer_index, blocks_request) in ordered_blocks_requests {
// check if peer has not responded within given time
let time_diff = now - worst_peer_time;
let time_diff = now - blocks_request.timestamp;
if time_diff <= config.block_failure_interval_ms as f64 / 1000f64 {
break;
}
@ -96,24 +97,26 @@ pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &m
// if peer failed many times => forget it
if peers.on_peer_block_failure(worst_peer_index) {
warn!(target: "sync", "Too many failures for peer#{}. Excluding from synchronization", worst_peer_index);
peers.unuseful_peer(worst_peer_index);
}
}
(blocks_to_request, blocks_to_forget)
}
/// Manage stalled synchronization peers inventory tasks
pub fn manage_synchronization_peers_inventory(config: &ManagePeersConfig, peers: &mut PeersTasks) {
/// Manage stalled synchronization peers headers tasks
pub fn manage_synchronization_peers_headers(config: &ManagePeersConfig, peers: &mut PeersTasks) {
let now = precise_time_s();
// reset tasks for peers which have not responded during the given period
for (worst_peer_index, worst_peer_time) in peers.ordered_inventory_requests() {
let ordered_headers_requests: Vec<_> = peers.ordered_headers_requests().clone().into_iter().collect();
for (worst_peer_index, headers_request) in ordered_headers_requests {
// check if peer has not responded within given time
let time_diff = now - worst_peer_time;
if time_diff <= config.inventory_failure_interval_ms as f64 / 1000f64 {
let time_diff = now - headers_request.timestamp;
if time_diff <= config.headers_failure_interval_ms as f64 / 1000f64 {
break;
}
peers.on_peer_inventory_failure(worst_peer_index);
peers.on_peer_headers_failure(worst_peer_index);
}
}
@ -198,11 +201,11 @@ mod tests {
#[test]
fn manage_good_peer() {
let config = ManagePeersConfig { block_failure_interval_ms: 1000, ..Default::default() };
let mut peers = PeersTasks::new();
let mut peers = PeersTasks::default();
peers.on_blocks_requested(1, &vec![H256::from(0), H256::from(1)]);
peers.on_block_received(1, &H256::from(0));
assert_eq!(manage_synchronization_peers_blocks(&config, &mut peers), (vec![], vec![]));
assert_eq!(peers.idle_peers_for_blocks(), vec![]);
assert_eq!(peers.idle_peers_for_blocks().len(), 0);
}
#[test]
@ -210,7 +213,7 @@ mod tests {
use std::thread::sleep;
use std::time::Duration;
let config = ManagePeersConfig { block_failure_interval_ms: 0, ..Default::default() };
let mut peers = PeersTasks::new();
let mut peers = PeersTasks::default();
peers.on_blocks_requested(1, &vec![H256::from(0)]);
peers.on_blocks_requested(2, &vec![H256::from(1)]);
sleep(Duration::from_millis(1));

View File

@ -1,35 +1,17 @@
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
use linked_hash_map::LinkedHashMap;
use time::precise_time_s;
use primitives::hash::H256;
use types::PeerIndex;
use utils::AverageSpeedMeter;
/// Max peer failures # before excluding from sync process
const MAX_PEER_FAILURES: usize = 2;
/// Max blocks failures # before forgetting this block and restarting sync
const MAX_BLOCKS_FAILURES: usize = 6;
/// Set of peers selected for synchronization.
#[derive(Debug)]
pub struct PeersTasks {
/// Peers that are marked as useful for current synchronization session && have no pending requests.
idle: HashSet<PeerIndex>,
/// Peers that are marked as non-useful for current synchronization session && have no pending requests.
unuseful: HashSet<PeerIndex>,
/// # of failures for given peer.
failures: HashMap<PeerIndex, usize>,
/// # of failures for given block.
blocks_failures: HashMap<H256, usize>,
/// Peers that are marked as useful for current synchronization session && have pending blocks requests.
blocks_requests: HashMap<PeerIndex, HashSet<H256>>,
/// Last block message time from peer.
blocks_requests_order: LinkedHashMap<PeerIndex, f64>,
/// Peers that are marked as useful for current synchronization session && have pending requests.
inventory_requests: HashSet<PeerIndex>,
/// Last inventory message time from peer.
inventory_requests_order: LinkedHashMap<PeerIndex, f64>,
}
/// Number of blocks to inspect while calculating average response time
const BLOCKS_TO_INSPECT: usize = 32;
/// Information on synchronization peers
#[cfg(test)]
@ -43,194 +25,233 @@ pub struct Information {
pub active: usize,
}
impl PeersTasks {
pub fn new() -> Self {
PeersTasks {
idle: HashSet::new(),
unuseful: HashSet::new(),
failures: HashMap::new(),
blocks_failures: HashMap::new(),
blocks_requests: HashMap::new(),
blocks_requests_order: LinkedHashMap::new(),
inventory_requests: HashSet::new(),
inventory_requests_order: LinkedHashMap::new(),
}
}
/// Set of peers selected for synchronization.
#[derive(Debug, Default)]
pub struct PeersTasks {
/// All known peers ids
all: HashSet<PeerIndex>,
/// All unuseful peers
unuseful: HashSet<PeerIndex>,
/// All peers without pending headers requests
idle_for_headers: HashSet<PeerIndex>,
/// All peers without pending blocks requests
idle_for_blocks: HashSet<PeerIndex>,
/// Pending headers requests sent to peers
headers_requests: LinkedHashMap<PeerIndex, HeadersRequest>,
/// Pending blocks requests sent to peers
blocks_requests: LinkedHashMap<PeerIndex, BlocksRequest>,
/// Peers statistics
stats: HashMap<PeerIndex, PeerStats>,
/// Blocks statistics
blocks_stats: HashMap<H256, BlockStats>,
}
/// Pending headers request
#[derive(Debug, Clone)]
pub struct HeadersRequest {
/// Time when request has been sent
/// (seconds, as returned by `time::precise_time_s` in `HeadersRequest::new`)
pub timestamp: f64,
}
/// Pending blocks request
#[derive(Debug, Clone)]
pub struct BlocksRequest {
/// Time when request has been sent
/// (seconds, as returned by `time::precise_time_s` in `BlocksRequest::new`)
pub timestamp: f64,
/// Hashes of blocks that have been requested
/// (entries are removed one-by-one as the peer responds with blocks)
pub blocks: HashSet<H256>,
}
/// Peer statistics
#[derive(Debug, Default)]
struct PeerStats {
/// Number of blocks requests failures
/// (compared against `MAX_PEER_FAILURES` when deciding to exclude the peer)
failures: usize,
/// Average block response time meter
/// (used by `sort_peers_for_blocks` — faster peers are asked first)
speed: AverageSpeedMeter,
}
/// Block statistics
#[derive(Debug, Default)]
struct BlockStats {
/// Number of block request failures
/// (once it exceeds `MAX_BLOCKS_FAILURES`, the block is forgotten)
failures: usize,
}
impl PeersTasks {
/// Get information on synchronization peers
#[cfg(test)]
pub fn information(&self) -> Information {
let blocks_requests_peers: HashSet<_> = self.blocks_requests.keys().cloned().collect();
let total_unuseful_peers = self.unuseful.difference(&self.inventory_requests).count();
let total_active_peers = blocks_requests_peers.union(&self.inventory_requests).count();
let active_for_headers: HashSet<_> = self.headers_requests.keys().cloned().collect();
Information {
idle: self.idle.len(),
unuseful: total_unuseful_peers,
active: total_active_peers,
idle: self.idle_for_blocks.difference(&active_for_headers).count(),
unuseful: self.unuseful.len(),
active: active_for_headers.union(&self.blocks_requests.keys().cloned().collect()).count(),
}
}
/// Get all peers
pub fn all_peers(&self) -> Vec<PeerIndex> {
let mut unique: Vec<_> = self.idle.iter().cloned()
.chain(self.unuseful.iter().cloned())
.chain(self.blocks_requests.keys().cloned())
.chain(self.inventory_requests.iter().cloned())
.collect();
// need stable (for tests) && unique peers here, as blocks_requests can intersect with inventory_requests
unique.sort();
unique.dedup();
unique
pub fn all_peers(&self) -> &HashSet<PeerIndex> {
&self.all
}
/// Get useful peers
pub fn useful_peers(&self) -> Vec<PeerIndex> {
let mut unique: Vec<_> = self.idle.iter().cloned()
.chain(self.blocks_requests.keys().cloned())
.chain(self.inventory_requests.iter().cloned())
.collect();
// need stable (for tests) && unique peers here, as blocks_requests can intersect with inventory_requests
unique.sort();
unique.dedup();
unique
self.all.difference(&self.unuseful).cloned().collect()
}
/// Get idle peers for inventory request.
pub fn idle_peers_for_inventory(&self) -> Vec<PeerIndex> {
let peers: HashSet<_> = self.idle.iter().cloned()
.chain(self.blocks_requests.keys().cloned())
.collect();
let except: HashSet<_> = self.inventory_requests.iter().cloned().collect();
peers.difference(&except).cloned().collect()
/// Get idle peers for headers request.
pub fn idle_peers_for_headers(&self) -> &HashSet<PeerIndex> {
&self.idle_for_headers
}
/// Get idle peers for blocks request.
pub fn idle_peers_for_blocks(&self) -> Vec<PeerIndex> {
let peers: HashSet<_> = self.idle.iter().cloned()
.chain(self.inventory_requests.iter().cloned())
.collect();
let except: HashSet<_> = self.blocks_requests.keys().cloned().collect();
peers.difference(&except).cloned().collect()
pub fn idle_peers_for_blocks(&self) -> &HashSet<PeerIndex> {
&self.idle_for_blocks
}
/// Sort peers for blocks request
/// Sorts `peers` in place, fastest (largest measured average speed) first.
/// Peers with no recorded stats default to speed 0 and therefore sort last.
pub fn sort_peers_for_blocks(&self, peers: &mut Vec<PeerIndex>) {
peers.sort_by(|left, right| {
let left_speed = self.stats.get(&left).map(|s| s.speed.speed()).unwrap_or(0f64);
let right_speed = self.stats.get(&right).map(|s| s.speed.speed()).unwrap_or(0f64);
// larger speed => better
// (reversed operand order yields a descending sort; NaN comparisons fall back to Equal)
right_speed.partial_cmp(&left_speed).unwrap_or(Ordering::Equal)
})
}
/// Get active headers requests, sorted by last response time (oldest first).
/// The `LinkedHashMap` iteration order is insertion order; presumably entries are
/// re-inserted on each new request, which keeps the oldest request first — TODO confirm.
pub fn ordered_headers_requests(&self) -> &LinkedHashMap<PeerIndex, HeadersRequest> {
&self.headers_requests
}
/// Get active blocks requests, sorted by last response time (oldest first).
pub fn ordered_blocks_requests(&self) -> Vec<(PeerIndex, f64)> {
self.blocks_requests_order.iter()
.map(|(&pi, &t)| (pi, t))
.collect()
}
/// Get active inventory requests, sorted by last response time (oldest first).
pub fn ordered_inventory_requests(&self) -> Vec<(PeerIndex, f64)> {
self.inventory_requests_order.iter()
.map(|(&pi, &t)| (pi, t))
.collect()
pub fn ordered_blocks_requests(&self) -> &LinkedHashMap<PeerIndex, BlocksRequest> {
&self.blocks_requests
}
/// Get peer tasks
pub fn get_blocks_tasks(&self, peer_index: PeerIndex) -> Option<HashSet<H256>> {
self.blocks_requests.get(&peer_index).cloned()
pub fn get_blocks_tasks(&self, peer_index: PeerIndex) -> Option<&HashSet<H256>> {
self.blocks_requests
.get(&peer_index)
.map(|br| &br.blocks)
}
/// Mark peer as useful.
pub fn useful_peer(&mut self, peer_index: PeerIndex) {
// if peer is unknown => insert to idle queue
// if peer is known && not useful => insert to idle queue
if !self.idle.contains(&peer_index)
&& !self.blocks_requests.contains_key(&peer_index)
&& !self.inventory_requests.contains(&peer_index) {
self.idle.insert(peer_index);
self.unuseful.remove(&peer_index);
self.failures.remove(&peer_index);
if self.all.insert(peer_index)
|| self.unuseful.remove(&peer_index) {
self.idle_for_headers.insert(peer_index);
self.idle_for_blocks.insert(peer_index);
self.stats.insert(peer_index, PeerStats::new());
}
}
/// Mark peer as unuseful.
pub fn unuseful_peer(&mut self, peer_index: PeerIndex) {
// if peer is unknown => insert to idle queue
// if peer is known && not useful => insert to idle queue
// blocks should be rerequested from another peers
assert!(!self.blocks_requests.contains_key(&peer_index));
assert!(!self.blocks_requests_order.contains_key(&peer_index));
self.idle.remove(&peer_index);
if self.all.insert(peer_index) {
self.stats.insert(peer_index, PeerStats::new());
}
self.unuseful.insert(peer_index);
self.failures.remove(&peer_index);
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
self.idle_for_headers.remove(&peer_index);
self.idle_for_blocks.remove(&peer_index);
}
/// Peer has been disconnected
pub fn disconnect(&mut self, peer_index: PeerIndex) {
// forget this peer without any chances to reuse
self.idle.remove(&peer_index);
// blocks should be rerequested from another peers
assert!(!self.blocks_requests.contains_key(&peer_index));
self.all.remove(&peer_index);
self.unuseful.remove(&peer_index);
self.failures.remove(&peer_index);
self.idle_for_headers.remove(&peer_index);
self.idle_for_blocks.remove(&peer_index);
self.headers_requests.remove(&peer_index);
self.blocks_requests.remove(&peer_index);
self.blocks_requests_order.remove(&peer_index);
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
self.stats.remove(&peer_index);
}
/// Block is received from peer.
pub fn on_block_received(&mut self, peer_index: PeerIndex, block_hash: &H256) {
// forget block failures
self.blocks_failures.remove(block_hash);
// block received => reset failures
self.blocks_stats.remove(&block_hash);
// if this is requested block && it is last requested block => remove from blocks_requests
let try_mark_as_idle = match self.blocks_requests.entry(peer_index) {
Entry::Occupied(mut requests_entry) => {
requests_entry.get_mut().remove(block_hash);
self.blocks_requests_order.remove(&peer_index);
if requests_entry.get().is_empty() {
requests_entry.remove_entry();
true
} else {
self.blocks_requests_order.insert(peer_index, precise_time_s());
false
}
},
_ => false,
let is_last_requested_block_received = if let Some(blocks_request) = self.blocks_requests.get_mut(&peer_index) {
// if block hasn't been requested => do nothing
if !blocks_request.blocks.remove(&block_hash) {
return;
}
blocks_request.blocks.is_empty()
} else {
// this peer hasn't been requested for blocks at all
return;
};
// try to mark as idle
if try_mark_as_idle {
self.try_mark_idle(peer_index);
// it was requested block => update block response time
self.stats.get_mut(&peer_index).map(|br| br.speed.checkpoint());
// if it hasn't been last requested block => just return
if !is_last_requested_block_received {
return;
}
// no more requested blocks => pause requests speed meter
self.stats.get_mut(&peer_index).map(|br| br.speed.stop());
// mark this peer as idle for blocks request
self.blocks_requests.remove(&peer_index);
self.idle_for_blocks.insert(peer_index);
// also mark as available for headers request if not yet
if !self.headers_requests.contains_key(&peer_index) {
self.idle_for_headers.insert(peer_index);
}
}
/// Inventory received from peer.
pub fn on_inventory_received(&mut self, peer_index: PeerIndex) {
// if we have requested inventory => remove from inventory_requests
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
// try to mark as idle
self.try_mark_idle(peer_index);
/// Headers received from peer.
pub fn on_headers_received(&mut self, peer_index: PeerIndex) {
self.headers_requests.remove(&peer_index);
// we only ask for new headers when peer is also not asked for blocks
// => only insert to idle queue if no active blocks requests
if !self.blocks_requests.contains_key(&peer_index) {
self.idle_for_headers.insert(peer_index);
}
}
/// Blocks have been requested from peer.
pub fn on_blocks_requested(&mut self, peer_index: PeerIndex, blocks_hashes: &[H256]) {
// mark peer as active
self.idle.remove(&peer_index);
if !self.all.contains(&peer_index) {
self.unuseful_peer(peer_index);
}
self.unuseful.remove(&peer_index);
self.blocks_requests.entry(peer_index).or_insert_with(HashSet::new).extend(blocks_hashes.iter().cloned());
self.blocks_requests_order.remove(&peer_index);
self.blocks_requests_order.insert(peer_index, precise_time_s());
self.idle_for_blocks.remove(&peer_index);
if !self.blocks_requests.contains_key(&peer_index) {
self.blocks_requests.insert(peer_index, BlocksRequest::new());
}
self.blocks_requests.get_mut(&peer_index)
.expect("inserted one line above")
.blocks.extend(blocks_hashes.iter().cloned());
// no more requested blocks => pause requests speed meter
self.stats.get_mut(&peer_index).map(|br| br.speed.start());
}
/// Inventory has been requested from peer.
pub fn on_inventory_requested(&mut self, peer_index: PeerIndex) {
self.inventory_requests.insert(peer_index);
self.inventory_requests_order.remove(&peer_index);
self.inventory_requests_order.insert(peer_index, precise_time_s());
// mark peer as active
if self.idle.remove(&peer_index) {
self.unuseful.insert(peer_index);
/// Headers have been requested from peer.
pub fn on_headers_requested(&mut self, peer_index: PeerIndex) {
if !self.all.contains(&peer_index) {
self.unuseful_peer(peer_index);
}
// peer is now out-of-synchronization process (will not request blocks from him), because:
// 1) if it has new blocks, it will respond with `inventory` message && will be inserted back here
// 2) if it has no new blocks => either synchronization is completed, or it is behind us in sync
self.idle_for_headers.remove(&peer_index);
self.headers_requests.remove(&peer_index);
self.headers_requests.insert(peer_index, HeadersRequest::new());
}
/// We have failed to get blocks
@ -238,20 +259,16 @@ impl PeersTasks {
let mut failed_blocks: Vec<H256> = Vec::new();
let mut normal_blocks: Vec<H256> = Vec::with_capacity(hashes.len());
for hash in hashes {
match self.blocks_failures.entry(hash.clone()) {
Entry::Vacant(entry) => {
normal_blocks.push(hash);
entry.insert(0);
},
Entry::Occupied(mut entry) => {
*entry.get_mut() += 1;
if *entry.get() >= MAX_BLOCKS_FAILURES {
entry.remove();
failed_blocks.push(hash);
} else {
normal_blocks.push(hash);
}
}
let is_failed_block = {
let block_stats = self.blocks_stats.entry(hash.clone()).or_insert(BlockStats::default());
block_stats.failures += 1;
block_stats.failures > MAX_BLOCKS_FAILURES
};
if is_failed_block {
self.blocks_stats.remove(&hash);
failed_blocks.push(hash);
} else {
normal_blocks.push(hash);
}
}
@ -260,78 +277,77 @@ impl PeersTasks {
/// We have failed to get block from peer during given period
pub fn on_peer_block_failure(&mut self, peer_index: PeerIndex) -> bool {
let peer_failures = match self.failures.entry(peer_index) {
Entry::Occupied(mut entry) => {
let failures = entry.get() + 1;
entry.insert(failures);
failures
},
Entry::Vacant(entry) => *entry.insert(1),
};
let too_much_failures = peer_failures >= MAX_PEER_FAILURES;
if too_much_failures {
self.idle.remove(&peer_index);
self.unuseful.insert(peer_index);
self.failures.remove(&peer_index);
self.blocks_requests.remove(&peer_index);
self.blocks_requests_order.remove(&peer_index);
}
too_much_failures
self.stats.get_mut(&peer_index)
.map(|s| {
s.failures += 1;
s.failures > MAX_PEER_FAILURES
})
.unwrap_or_default()
}
/// We have failed to get inventory from peer during given period
pub fn on_peer_inventory_failure(&mut self, peer_index: PeerIndex) {
// ignore inventory failures
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
if !self.blocks_requests.contains_key(&peer_index) {
self.idle.insert(peer_index);
self.unuseful.remove(&peer_index);
}
/// We have failed to get headers from peer during given period
/// Drops the pending headers request and returns the peer to the
/// idle-for-headers pool; no failure counter is incremented.
pub fn on_peer_headers_failure(&mut self, peer_index: PeerIndex) {
// we never penalize peers for header requests failures
self.headers_requests.remove(&peer_index);
self.idle_for_headers.insert(peer_index);
}
/// Reset all peers state to the unuseful
pub fn reset(&mut self) {
self.unuseful.extend(self.idle.drain());
self.unuseful.extend(self.blocks_requests.drain().map(|(k, _)| k));
self.unuseful.extend(self.inventory_requests.drain());
self.failures.clear();
self.inventory_requests_order.clear();
self.blocks_requests_order.clear();
self.unuseful.clear();
self.unuseful.extend(self.all.iter().cloned());
self.idle_for_headers.clear();
self.idle_for_blocks.clear();
self.headers_requests.clear();
self.blocks_requests.clear();
}
/// Reset peer tasks && move peer to idle state
pub fn reset_blocks_tasks(&mut self, peer_index: PeerIndex) -> Vec<H256> {
let requests = self.blocks_requests.remove(&peer_index);
self.blocks_requests_order.remove(&peer_index);
self.try_mark_idle(peer_index);
requests.unwrap_or_default().into_iter().collect()
self.idle_for_blocks.insert(peer_index);
self.blocks_requests.remove(&peer_index)
.map(|mut br| br.blocks.drain().collect())
.unwrap_or_default()
}
}
/// Try to mark peer as idle
fn try_mark_idle(&mut self, peer_index: PeerIndex) {
if self.blocks_requests.contains_key(&peer_index)
|| self.inventory_requests.contains(&peer_index) {
return;
impl HeadersRequest {
/// Create a pending headers request stamped with the current time.
pub fn new() -> Self {
let timestamp = precise_time_s();
HeadersRequest {
timestamp: timestamp,
}
}
}
self.idle.insert(peer_index);
self.unuseful.remove(&peer_index);
impl BlocksRequest {
/// Create a pending blocks request with the current timestamp and an
/// empty set of requested block hashes.
pub fn new() -> Self {
let now = precise_time_s();
BlocksRequest {
timestamp: now,
blocks: HashSet::new(),
}
}
}
impl PeerStats {
/// Fresh statistics: zero failures and a speed meter averaging
/// over the last `BLOCKS_TO_INSPECT` block responses.
pub fn new() -> Self {
let speed_meter = AverageSpeedMeter::with_inspect_items(BLOCKS_TO_INSPECT);
PeerStats {
failures: 0,
speed: speed_meter,
}
}
}
#[cfg(test)]
mod tests {
use super::{PeersTasks, MAX_PEER_FAILURES, MAX_BLOCKS_FAILURES};
use primitives::hash::H256;
use super::{PeersTasks, MAX_PEER_FAILURES, MAX_BLOCKS_FAILURES};
use types::PeerIndex;
#[test]
fn peers_empty_on_start() {
let peers = PeersTasks::new();
assert_eq!(peers.idle_peers_for_blocks(), vec![]);
assert_eq!(peers.idle_peers_for_inventory(), vec![]);
let peers = PeersTasks::default();
assert_eq!(peers.idle_peers_for_blocks().len(), 0);
assert_eq!(peers.idle_peers_for_headers().len(), 0);
let info = peers.information();
assert_eq!(info.idle, 0);
@ -340,7 +356,7 @@ mod tests {
#[test]
fn peers_all_unuseful_after_reset() {
let mut peers = PeersTasks::new();
let mut peers = PeersTasks::default();
peers.on_blocks_requested(7, &vec![H256::default()]);
peers.on_blocks_requested(8, &vec![H256::default()]);
assert_eq!(peers.information().idle, 0);
@ -354,7 +370,7 @@ mod tests {
#[test]
fn peer_idle_after_reset_tasks() {
let mut peers = PeersTasks::new();
let mut peers = PeersTasks::default();
peers.on_blocks_requested(7, &vec![H256::default()]);
assert_eq!(peers.information().idle, 0);
assert_eq!(peers.information().unuseful, 0);
@ -366,14 +382,14 @@ mod tests {
}
#[test]
fn peers_active_after_inventory_request() {
let mut peers = PeersTasks::new();
fn peers_active_after_headers_request() {
let mut peers = PeersTasks::default();
peers.useful_peer(5);
peers.useful_peer(7);
assert_eq!(peers.information().idle, 2);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 0);
peers.on_inventory_requested(5);
peers.on_headers_requested(5);
assert_eq!(peers.information().idle, 1);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 1);
@ -381,37 +397,39 @@ mod tests {
#[test]
fn peers_insert_remove_idle() {
let mut peers = PeersTasks::new();
let mut peers = PeersTasks::default();
peers.useful_peer(0);
assert_eq!(peers.information().idle, 1);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 0);
assert_eq!(peers.idle_peers_for_blocks(), vec![0]);
assert_eq!(peers.idle_peers_for_blocks().len(), 1);
assert!(peers.idle_peers_for_blocks().contains(&0));
peers.useful_peer(5);
assert_eq!(peers.information().idle, 2);
assert_eq!(peers.information().active, 0);
let idle_peers = peers.idle_peers_for_blocks();
let idle_peers: Vec<_> = peers.idle_peers_for_blocks().iter().cloned().collect();
assert!(idle_peers[0] == 0 || idle_peers[0] == 5);
assert!(idle_peers[1] == 0 || idle_peers[1] == 5);
peers.disconnect(7);
assert_eq!(peers.information().idle, 2);
assert_eq!(peers.information().active, 0);
let idle_peers = peers.idle_peers_for_blocks();
let idle_peers: Vec<_> = peers.idle_peers_for_blocks().iter().cloned().collect();
assert!(idle_peers[0] == 0 || idle_peers[0] == 5);
assert!(idle_peers[1] == 0 || idle_peers[1] == 5);
peers.disconnect(0);
assert_eq!(peers.information().idle, 1);
assert_eq!(peers.information().active, 0);
assert_eq!(peers.idle_peers_for_blocks(), vec![5]);
assert_eq!(peers.idle_peers_for_blocks().len(), 1);
assert!(peers.idle_peers_for_blocks().contains(&5));
}
#[test]
fn peers_request_blocks() {
let mut peers = PeersTasks::new();
let mut peers = PeersTasks::default();
peers.useful_peer(5);
@ -448,20 +466,20 @@ mod tests {
#[test]
fn peers_worst() {
let mut peers = PeersTasks::new();
let mut peers = PeersTasks::default();
peers.useful_peer(1);
peers.useful_peer(2);
assert_eq!(peers.ordered_blocks_requests(), vec![]);
assert_eq!(peers.ordered_blocks_requests().len(), 0);
peers.on_blocks_requested(1, &vec![H256::default()]);
assert_eq!(peers.ordered_blocks_requests().len(), 1);
assert_eq!(peers.ordered_blocks_requests()[0].0, 1);
assert_eq!(*peers.ordered_blocks_requests().keys().nth(0).unwrap(), 1);
peers.on_blocks_requested(2, &vec![H256::default()]);
assert_eq!(peers.ordered_blocks_requests().len(), 2);
assert_eq!(peers.ordered_blocks_requests()[0].0, 1);
assert_eq!(peers.ordered_blocks_requests()[1].0, 2);
assert_eq!(*peers.ordered_blocks_requests().keys().nth(0).unwrap(), 1);
assert_eq!(*peers.ordered_blocks_requests().keys().nth(1).unwrap(), 2);
assert_eq!(peers.information().idle, 0);
assert_eq!(peers.information().unuseful, 0);
@ -474,10 +492,13 @@ mod tests {
assert_eq!(peers.information().active, 1);
assert_eq!(peers.ordered_blocks_requests().len(), 1);
assert_eq!(peers.ordered_blocks_requests()[0].0, 2);
assert_eq!(*peers.ordered_blocks_requests().keys().nth(0).unwrap(), 2);
for _ in 0..MAX_PEER_FAILURES {
peers.on_peer_block_failure(2);
for _ in 0..(MAX_PEER_FAILURES + 1) {
if peers.on_peer_block_failure(2) {
peers.reset_blocks_tasks(2);
peers.unuseful_peer(2);
}
}
assert_eq!(peers.ordered_blocks_requests().len(), 0);
@ -488,7 +509,7 @@ mod tests {
#[test]
fn peer_not_inserted_when_known() {
let mut peers = PeersTasks::new();
let mut peers = PeersTasks::default();
peers.useful_peer(1);
peers.useful_peer(1);
assert_eq!(peers.information().active + peers.information().idle + peers.information().unuseful, 1);
@ -496,7 +517,10 @@ mod tests {
peers.useful_peer(1);
assert_eq!(peers.information().active + peers.information().idle + peers.information().unuseful, 1);
for _ in 0..MAX_PEER_FAILURES {
peers.on_peer_block_failure(1);
if peers.on_peer_block_failure(1) {
peers.reset_blocks_tasks(1);
peers.unuseful_peer(1);
}
}
peers.useful_peer(1);
assert_eq!(peers.information().active + peers.information().idle + peers.information().unuseful, 1);
@ -504,7 +528,7 @@ mod tests {
#[test]
fn peer_block_failures() {
let mut peers = PeersTasks::new();
let mut peers = PeersTasks::default();
peers.useful_peer(1);
peers.on_blocks_requested(1, &vec![H256::from(1)]);
for _ in 0..MAX_BLOCKS_FAILURES {
@ -519,4 +543,25 @@ mod tests {
assert_eq!(blocks_to_request, vec![]);
assert_eq!(blocks_to_forget, vec![H256::from(1)]);
}
#[test]
fn peer_sort_peers_for_blocks() {
let mut peers = PeersTasks::default();
peers.on_blocks_requested(1, &vec![H256::from(1), H256::from(2)]);
peers.on_blocks_requested(2, &vec![H256::from(3), H256::from(4)]);
// peer 2 responds immediately => fast peer
peers.on_block_received(2, &H256::from(3));
peers.on_block_received(2, &H256::from(4));
use std::thread;
use std::time::Duration;
// NOTE(review): park_timeout may return early on a spurious wakeup;
// thread::sleep would guarantee the delay — confirm this is acceptable here
thread::park_timeout(Duration::from_millis(50));
// peer 1 responds only after the delay => slow peer
peers.on_block_received(1, &H256::from(1));
peers.on_block_received(1, &H256::from(2));
let mut peers_for_blocks: Vec<PeerIndex> = vec![1, 2];
peers.sort_peers_for_blocks(&mut peers_for_blocks);
// fastest peer (2) must come first
assert_eq!(peers_for_blocks[0], 2);
assert_eq!(peers_for_blocks[1], 1);
}
}

View File

@ -2,6 +2,7 @@ use std::collections::VecDeque;
use time;
/// Speed meter with given items number
#[derive(Debug, Default)]
pub struct AverageSpeedMeter {
/// Number of items to inspect
inspect_items: usize,
@ -49,4 +50,12 @@ impl AverageSpeedMeter {
}
self.last_timestamp = Some(now);
}
/// Start (or restart) the meter by recording the current time as the
/// last checkpoint, so the next sample measures from now.
pub fn start(&mut self) {
self.last_timestamp = Some(time::precise_time_s());
}
/// Pause the meter: clearing the last checkpoint prevents idle time
/// from being counted into the average when measurement resumes.
pub fn stop(&mut self) {
self.last_timestamp = None;
}
}