Merge pull request #439 from paritytech/sync_blocks_policy

Synchronization fixes
This commit is contained in:
Svyatoslav Nikolsky 2017-08-14 00:06:43 +03:00 committed by GitHub
commit a2ae59c0e3
11 changed files with 203 additions and 63 deletions

View File

@ -100,6 +100,14 @@ impl Version {
Version::V70001(_, _, ref v) => v.relay,
}
}
/// Returns the `user_agent` string carried by the version handshake message,
/// or `None` for protocol version 0, which has no user-agent field.
pub fn user_agent(&self) -> Option<String> {
	match *self {
		Version::V0(_) => None,
		Version::V106(_, ref v) => Some(v.user_agent.clone()),
		Version::V70001(_, ref v, _) => Some(v.user_agent.clone()),
	}
}
}
#[derive(Debug, Default, PartialEq, Clone)]

View File

@ -53,6 +53,7 @@ impl Connections {
let peer_info = PeerInfo {
id: id,
address: connection.address,
user_agent: connection.version_message.user_agent().unwrap_or("unknown".into()),
direction: direction,
version: connection.version,
version_message: connection.version_message,

View File

@ -89,6 +89,12 @@ impl Context {
self.node_table.write().insert_many(nodes);
}
/// Penalize the node at `addr` by recording a failure against it in the
/// shared node table.
pub fn penalize_node(&self, addr: &SocketAddr) {
	trace!("Penalizing node {}", addr);
	let mut table = self.node_table.write();
	table.note_failure(addr);
}
/// Adds node to table.
pub fn add_node(&self, addr: SocketAddr) -> Result<(), NodeTableError> {
trace!("Adding node {} to node table", &addr);

View File

@ -13,7 +13,7 @@ pub trait LocalSyncNode : Send + Sync {
}
pub trait InboundSyncConnection : Send + Sync {
fn start_sync_session(&self, version: types::Version);
fn start_sync_session(&self, peer_name: String, version: types::Version);
fn close_session(&self);
fn on_inventory(&self, message: types::Inv);
fn on_getdata(&self, message: types::GetData);
@ -159,6 +159,7 @@ impl OutboundSyncConnection for OutboundSync {
}
fn close(&self) {
self.context.global().penalize_node(&self.context.info().address);
self.context.close()
}
}
@ -181,7 +182,11 @@ impl SyncProtocol {
impl Protocol for SyncProtocol {
fn initialize(&mut self) {
self.inbound_connection.start_sync_session(self.context.info().version_message.clone());
let info = self.context.info();
self.inbound_connection.start_sync_session(
format!("{}/{}", info.address, info.user_agent),
info.version_message.clone()
);
}
fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> {

View File

@ -14,6 +14,7 @@ pub enum Direction {
pub struct PeerInfo {
pub id: PeerId,
pub address: SocketAddr,
pub user_agent: String,
pub direction: Direction,
pub version: u32,
pub version_message: types::Version,

View File

@ -31,8 +31,8 @@ impl InboundConnection {
}
impl InboundSyncConnection for InboundConnection {
fn start_sync_session(&self, version: types::Version) {
self.node.on_connect(self.peer_index, version);
fn start_sync_session(&self, peer_name: String, version: types::Version) {
self.node.on_connect(self.peer_index, peer_name, version);
}
fn close_session(&self) {

View File

@ -65,8 +65,8 @@ impl<T, U, V> LocalNode<T, U, V> where T: TaskExecutor, U: Server, V: Client {
}
/// When new peer connects to the node
pub fn on_connect(&self, peer_index: PeerIndex, version: types::Version) {
trace!(target: "sync", "Starting new sync session with peer#{}", peer_index);
pub fn on_connect(&self, peer_index: PeerIndex, peer_name: String, version: types::Version) {
trace!(target: "sync", "Starting new sync session with peer#{}: {}", peer_index, peer_name);
// light clients may not want transactions broadcasting until filter for connection is set
if !version.relay_transactions() {
@ -389,7 +389,7 @@ pub mod tests {
#[test]
fn local_node_serves_block() {
let (_, server, local_node) = create_local_node(None);
let peer_index = 0; local_node.on_connect(peer_index, types::Version::default());
let peer_index = 0; local_node.on_connect(peer_index, "test".into(), types::Version::default());
// peer requests genesis block
let genesis_block_hash = test_data::genesis().hash();
let inventory = vec![
@ -411,7 +411,7 @@ pub mod tests {
let (executor, _, local_node) = create_local_node(None);
// transaction will be relayed to this peer
let peer_index1 = 0; local_node.on_connect(peer_index1, types::Version::default());
let peer_index1 = 0; local_node.on_connect(peer_index1, "test".into(), types::Version::default());
executor.take_tasks();
let genesis = test_data::genesis();
@ -436,7 +436,7 @@ pub mod tests {
let (executor, _, local_node) = create_local_node(Some(verifier));
let peer_index1 = 0; local_node.on_connect(peer_index1, types::Version::default());
let peer_index1 = 0; local_node.on_connect(peer_index1, "test".into(), types::Version::default());
executor.take_tasks();
let result = local_node.accept_transaction(transaction);

View File

@ -681,7 +681,7 @@ impl db::BlockHeaderProvider for Chain {
impl fmt::Debug for Information {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "[sch:{} / bh:{} -> req:{} -> vfy:{} -> stored: {}]", self.scheduled, self.headers.best, self.requested, self.verifying, self.stored)
write!(f, "[sch:{} -> req:{} -> vfy:{} -> stored: {}]", self.scheduled, self.requested, self.verifying, self.stored)
}
}

View File

@ -5,6 +5,7 @@ use std::sync::Arc;
use futures::Future;
use parking_lot::Mutex;
use time;
use time::precise_time_s;
use chain::{IndexedBlockHeader, IndexedTransaction, Transaction, IndexedBlock};
use message::types;
use message::common::{InventoryType, InventoryVector};
@ -19,7 +20,6 @@ use synchronization_peers_tasks::PeersTasks;
use synchronization_verifier::{VerificationSink, BlockVerificationSink, TransactionVerificationSink, VerificationTask};
use types::{BlockHeight, ClientCoreRef, PeersRef, PeerIndex, SynchronizationStateRef, EmptyBoxFuture, SyncListenerRef};
use utils::{AverageSpeedMeter, MessageBlockHeadersProvider, OrphanBlocksPool, OrphanTransactionsPool, HashPosition};
#[cfg(test)] use synchronization_peers::Peers;
#[cfg(test)] use synchronization_peers_tasks::{Information as PeersTasksInformation};
#[cfg(test)] use synchronization_chain::{Information as ChainInformation};
@ -41,6 +41,12 @@ const NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S: f64 = 20_f64;
const SYNC_SPEED_BLOCKS_TO_INSPECT: usize = 512;
/// Number of blocks to inspect when calculating average blocks speed
const BLOCKS_SPEED_BLOCKS_TO_INSPECT: usize = 512;
/// Minimal time between duplicated blocks requests.
const MIN_BLOCK_DUPLICATION_INTERVAL_S: f64 = 10_f64;
/// Maximal number of blocks in duplicate requests.
const MAX_BLOCKS_IN_DUPLICATE_REQUEST: BlockHeight = 4;
/// Minimal number of blocks in duplicate requests.
const MIN_BLOCKS_IN_DUPLICATE_REQUEST: BlockHeight = 8;
/// Information on current synchronization state.
#[cfg(test)]
@ -123,6 +129,8 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
config: Config,
/// Synchronization events listener
listener: Option<SyncListenerRef>,
/// Time of last duplicated blocks request.
last_dup_time: f64,
}
/// Verification sink for synchronization client core
@ -142,6 +150,20 @@ pub enum State {
Saturated,
}
/// Blocks request limits.
///
/// Tunable bounds used when partitioning block-download work between peers.
/// Baseline values come from the `Default` impl; the sync core may override
/// `min_blocks_in_request`/`max_blocks_in_request` for long chains.
pub struct BlocksRequestLimits {
	/// Approximate maximal number of blocks hashes in scheduled queue.
	pub max_scheduled_hashes: BlockHeight,
	/// Approximate maximal number of blocks hashes in requested queue.
	pub max_requested_blocks: BlockHeight,
	/// Approximate maximal number of blocks in verifying queue.
	pub max_verifying_blocks: BlockHeight,
	/// Minimum number of blocks to request from a peer.
	pub min_blocks_in_request: BlockHeight,
	/// Maximum number of blocks to request from a peer.
	pub max_blocks_in_request: BlockHeight,
}
/// Transaction append error
enum AppendTransactionError {
Synchronizing,
@ -189,6 +211,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
self.executor.execute(Task::GetHeaders(peer_index, types::GetHeaders::with_block_locator_hashes(block_locator_hashes)));
// unuseful until respond with headers message
self.peers_tasks.unuseful_peer(peer_index);
self.peers_tasks.on_headers_requested(peer_index);
}
fn on_disconnect(&mut self, peer_index: PeerIndex) {
@ -467,6 +490,9 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
// for now, let's exclude peer from synchronization - we are relying on full nodes for synchronization
let removed_tasks = self.peers_tasks.reset_blocks_tasks(peer_index);
self.peers_tasks.unuseful_peer(peer_index);
if self.state.is_synchronizing() {
self.peers.misbehaving(peer_index, &format!("Responded with NotFound(unrequested_block)"));
}
// if peer has had some blocks tasks, rerequest these blocks
self.execute_synchronization_tasks(Some(removed_tasks), None);
@ -516,7 +542,15 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
// display information if processed many blocks || enough time has passed since sync start
self.print_synchronization_information();
// prepare limits. TODO: must be updated using current retrieval && verification speed && blocks size
let mut limits = BlocksRequestLimits::default();
if self.chain.length_of_blocks_state(BlockState::Stored) > 150_000 {
limits.min_blocks_in_request = 8;
limits.max_blocks_in_request = 16;
}
// if some blocks requests are forced => we should ask peers even if there are no idle peers
let verifying_hashes_len = self.chain.length_of_blocks_state(BlockState::Verifying);
if let Some(forced_blocks_requests) = forced_blocks_requests {
let useful_peers = self.peers_tasks.useful_peers();
// if we have to request blocks && there are no useful peers at all => switch to saturated state
@ -526,7 +560,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
return;
}
let forced_tasks = self.prepare_blocks_requests_tasks(useful_peers, forced_blocks_requests);
let forced_tasks = self.prepare_blocks_requests_tasks(&limits, useful_peers, forced_blocks_requests);
tasks.extend(forced_tasks);
}
@ -534,7 +568,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
if let Some(final_blocks_requests) = final_blocks_requests {
let useful_peers = self.peers_tasks.useful_peers();
if !useful_peers.is_empty() { // if empty => not a problem, just forget these blocks
let forced_tasks = self.prepare_blocks_requests_tasks(useful_peers, final_blocks_requests);
let forced_tasks = self.prepare_blocks_requests_tasks(&limits, useful_peers, final_blocks_requests);
tasks.extend(forced_tasks);
}
}
@ -575,7 +609,6 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
// => we could ask idle peer2 about [B1, B2, B3, B4]
// these requests has priority over new blocks requests below
let requested_hashes_len = self.chain.length_of_blocks_state(BlockState::Requested);
let verifying_hashes_len = self.chain.length_of_blocks_state(BlockState::Verifying);
if requested_hashes_len != 0 {
let verification_speed: f64 = self.block_speed_meter.speed();
let synchronization_speed: f64 = self.sync_speed_meter.speed();
@ -609,15 +642,19 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
// if verification queue will be empty before all synchronization requests will be completed
// + do not spam with duplicated blocks requests if blocks are too big && there are still blocks left for NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S
// => duplicate blocks requests
let now = precise_time_s();
if synchronization_queue_will_be_full_in > verification_queue_will_be_empty_in &&
verification_queue_will_be_empty_in < NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S {
verification_queue_will_be_empty_in < NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S &&
now - self.last_dup_time > MIN_BLOCK_DUPLICATION_INTERVAL_S {
// do not duplicate too often
self.last_dup_time = now;
// blocks / second * second -> blocks
let hashes_requests_to_duplicate_len = synchronization_speed * (synchronization_queue_will_be_full_in - verification_queue_will_be_empty_in);
let hashes_requests_to_duplicate_len = (synchronization_speed * (synchronization_queue_will_be_full_in - verification_queue_will_be_empty_in)) as BlockHeight;
// do not ask for too many blocks
let hashes_requests_to_duplicate_len = min(MAX_BLOCKS_IN_REQUEST, hashes_requests_to_duplicate_len as BlockHeight);
let hashes_requests_to_duplicate_len = min(MAX_BLOCKS_IN_DUPLICATE_REQUEST, hashes_requests_to_duplicate_len);
// ask for at least 1 block
let hashes_requests_to_duplicate_len = max(1, min(requested_hashes_len, hashes_requests_to_duplicate_len));
blocks_requests = Some(self.chain.best_n_of_blocks_state(BlockState::Requested, hashes_requests_to_duplicate_len));
let hashes_requests_to_duplicate_len = max(MIN_BLOCKS_IN_DUPLICATE_REQUEST, min(requested_hashes_len, hashes_requests_to_duplicate_len));
blocks_requests = Some(self.chain.best_n_of_blocks_state(BlockState::Requested, hashes_requests_to_duplicate_len as BlockHeight));
trace!(target: "sync", "Duplicating {} blocks requests. Sync speed: {} * {}, blocks speed: {} * {}.", hashes_requests_to_duplicate_len, synchronization_speed, requested_hashes_len, verification_speed, verifying_hashes_len);
}
@ -625,9 +662,10 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
// check if we can move some blocks from scheduled to requested queue
{
// TODO: only request minimal number of blocks, if other urgent blocks are requested
let scheduled_hashes_len = self.chain.length_of_blocks_state(BlockState::Scheduled);
if requested_hashes_len + verifying_hashes_len < MAX_REQUESTED_BLOCKS + MAX_VERIFYING_BLOCKS && scheduled_hashes_len != 0 {
let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(scheduled_hashes_len / blocks_idle_peers_len, MIN_BLOCKS_IN_REQUEST));
let chunk_size = min(limits.max_blocks_in_request, max(scheduled_hashes_len / blocks_idle_peers_len, limits.min_blocks_in_request));
let hashes_to_request_len = chunk_size * blocks_idle_peers_len;
let hashes_to_request = self.chain.request_blocks_hashes(hashes_to_request_len);
match blocks_requests {
@ -641,7 +679,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
// append blocks requests tasks
if let Some(blocks_requests) = blocks_requests {
tasks.extend(self.prepare_blocks_requests_tasks(blocks_idle_peers, blocks_requests));
tasks.extend(self.prepare_blocks_requests_tasks(&limits, blocks_idle_peers, blocks_requests));
}
// execute synchronization tasks
@ -724,6 +762,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
sync_speed_meter: AverageSpeedMeter::with_inspect_items(BLOCKS_SPEED_BLOCKS_TO_INSPECT),
config: config,
listener: None,
last_dup_time: 0f64,
}
));
@ -758,6 +797,11 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
&mut self.chain
}
/// Return peers reference
pub fn peers(&self) -> PeersRef {
self.peers.clone()
}
/// Return peers tasks reference
pub fn peers_tasks(&mut self) -> &mut PeersTasks {
&mut self.peers_tasks
@ -779,12 +823,6 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
self.verify_headers = verify;
}
/// Return peers reference
#[cfg(test)]
pub fn peers(&mut self) -> &Peers {
&*self.peers
}
/// Print synchronization information
pub fn print_synchronization_information(&mut self) {
if let State::Synchronizing(timestamp, num_of_blocks) = self.state {
@ -796,9 +834,10 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
self.state = State::Synchronizing(time::precise_time_s(), new_num_of_blocks);
use time;
info!(target: "sync", "{:?} @ Processed {} blocks in {} seconds. Chain information: {:?}"
info!(target: "sync", "{:?} Processed {} blocks in {:.2} seconds. Peers: {:?}. Chain: {:?}"
, time::strftime("%H:%M:%S", &time::now()).unwrap()
, blocks_diff, timestamp_diff
, self.peers_tasks.information()
, self.chain.information());
}
}
@ -904,13 +943,13 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
Ok(transactions)
}
fn prepare_blocks_requests_tasks(&mut self, mut peers: Vec<PeerIndex>, mut hashes: Vec<H256>) -> Vec<Task> {
fn prepare_blocks_requests_tasks(&mut self, limits: &BlocksRequestLimits, mut peers: Vec<PeerIndex>, mut hashes: Vec<H256>) -> Vec<Task> {
use std::mem::swap;
// ask fastest peers for hashes at the beginning of `hashes`
self.peers_tasks.sort_peers_for_blocks(&mut peers);
let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(hashes.len() as BlockHeight, MIN_BLOCKS_IN_REQUEST));
let chunk_size = min(limits.max_blocks_in_request, max(hashes.len() as BlockHeight, limits.min_blocks_in_request));
let last_peer_index = peers.len() - 1;
let mut tasks: Vec<Task> = Vec::new();
for (peer_index, peer) in peers.into_iter().enumerate() {
@ -1172,6 +1211,18 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
}
}
impl Default for BlocksRequestLimits {
	/// Baseline limits, taken from the module-level `MAX_*`/`MIN_*` constants.
	/// Callers may adjust the per-request bounds afterwards (e.g. once the
	/// stored chain grows large).
	fn default() -> Self {
		BlocksRequestLimits {
			max_scheduled_hashes: MAX_SCHEDULED_HASHES,
			max_requested_blocks: MAX_REQUESTED_BLOCKS,
			max_verifying_blocks: MAX_VERIFYING_BLOCKS,
			min_blocks_in_request: MIN_BLOCKS_IN_REQUEST,
			max_blocks_in_request: MAX_BLOCKS_IN_REQUEST,
		}
	}
}
#[cfg(test)]
pub mod tests {
extern crate test_data;

View File

@ -7,15 +7,20 @@ use time::precise_time_s;
use primitives::hash::H256;
use synchronization_client_core::{ClientCore, SynchronizationClientCore};
use synchronization_executor::TaskExecutor;
use synchronization_peers_tasks::PeersTasks;
use synchronization_peers_tasks::{PeersTasks, TrustLevel};
use utils::{OrphanBlocksPool, OrphanTransactionsPool};
use types::PeersRef;
/// Management interval (in ms)
const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000;
/// Response time before getting block to decrease peer score
const DEFAULT_PEER_BLOCK_FAILURE_INTERVAL_MS: u32 = 60 * 1000;
const DEFAULT_NEW_PEER_BLOCK_FAILURE_INTERVAL_MS: u32 = 5 * 1000;
/// Response time before getting headers to decrease peer score
const DEFAULT_PEER_HEADERS_FAILURE_INTERVAL_MS: u32 = 60 * 1000;
const DEFAULT_NEW_PEER_HEADERS_FAILURE_INTERVAL_MS: u32 = 5 * 1000;
/// Response time before getting block to decrease peer score
const DEFAULT_TRUSTED_PEER_BLOCK_FAILURE_INTERVAL_MS: u32 = 20 * 1000;
/// Response time before getting headers to decrease peer score
const DEFAULT_TRUSTED_PEER_HEADERS_FAILURE_INTERVAL_MS: u32 = 20 * 1000;
/// Unknown orphan block removal time
const DEFAULT_UNKNOWN_BLOCK_REMOVAL_TIME_MS: u32 = 20 * 60 * 1000;
/// Maximal number of orphaned blocks
@ -81,14 +86,14 @@ impl ManagementWorker {
core.print_synchronization_information();
// execute management tasks if not saturated
if core.state().is_synchronizing() || core.state().is_nearly_saturated() {
let (blocks_to_request, blocks_to_forget) = manage_synchronization_peers_blocks(&peers_config, core.peers_tasks());
let (blocks_to_request, blocks_to_forget) = manage_synchronization_peers_blocks(&peers_config, core.peers(), core.peers_tasks());
core.forget_failed_blocks(&blocks_to_forget);
core.execute_synchronization_tasks(
if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) },
if blocks_to_forget.is_empty() { None } else { Some(blocks_to_forget) },
);
manage_synchronization_peers_headers(&peers_config, core.peers_tasks());
manage_synchronization_peers_headers(&peers_config, core.peers(), core.peers_tasks());
manage_orphaned_transactions(&orphan_config, core.orphaned_transactions_pool());
if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&unknown_config, core.orphaned_blocks_pool()) {
for orphan_to_remove in orphans_to_remove {
@ -115,17 +120,22 @@ impl Drop for ManagementWorker {
/// Peers management configuration
pub struct ManagePeersConfig {
/// Time interval (in milliseconds) to wait block from the peer before penalizing && reexecuting tasks
pub block_failure_interval_ms: u32,
pub new_block_failure_interval_ms: u32,
/// Time interval (in milliseconds) to wait headers from the peer before penalizing && reexecuting tasks
pub headers_failure_interval_ms: u32,
pub new_headers_failure_interval_ms: u32,
/// Time interval (in milliseconds) to wait block from the peer before penalizing && reexecuting tasks
pub trusted_block_failure_interval_ms: u32,
/// Time interval (in milliseconds) to wait headers from the peer before penalizing && reexecuting tasks
pub trusted_headers_failure_interval_ms: u32,
}
impl Default for ManagePeersConfig {
fn default() -> Self {
ManagePeersConfig {
block_failure_interval_ms: DEFAULT_PEER_BLOCK_FAILURE_INTERVAL_MS,
headers_failure_interval_ms: DEFAULT_PEER_HEADERS_FAILURE_INTERVAL_MS,
new_block_failure_interval_ms: DEFAULT_NEW_PEER_BLOCK_FAILURE_INTERVAL_MS,
new_headers_failure_interval_ms: DEFAULT_NEW_PEER_HEADERS_FAILURE_INTERVAL_MS,
trusted_block_failure_interval_ms: DEFAULT_TRUSTED_PEER_BLOCK_FAILURE_INTERVAL_MS,
trusted_headers_failure_interval_ms: DEFAULT_TRUSTED_PEER_HEADERS_FAILURE_INTERVAL_MS,
}
}
}
@ -165,33 +175,36 @@ impl Default for ManageOrphanTransactionsConfig {
}
/// Manage stalled synchronization peers blocks tasks
pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &mut PeersTasks) -> (Vec<H256>, Vec<H256>) {
pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: PeersRef, peers_tasks: &mut PeersTasks) -> (Vec<H256>, Vec<H256>) {
let mut blocks_to_request: Vec<H256> = Vec::new();
let mut blocks_to_forget: Vec<H256> = Vec::new();
let now = precise_time_s();
// reset tasks for peers, which has not responded during given period
let ordered_blocks_requests: Vec<_> = peers.ordered_blocks_requests().clone().into_iter().collect();
let ordered_blocks_requests: Vec<_> = peers_tasks.ordered_blocks_requests().clone().into_iter().collect();
for (worst_peer_index, blocks_request) in ordered_blocks_requests {
// check if peer has not responded within given time
let is_trusted = peers_tasks.get_peer_stats(worst_peer_index).map(|s| s.trust() == TrustLevel::Trusted).unwrap_or(false);
let block_failure_interval = if is_trusted { config.trusted_block_failure_interval_ms } else { config.new_block_failure_interval_ms };
let time_diff = now - blocks_request.timestamp;
if time_diff <= config.block_failure_interval_ms as f64 / 1000f64 {
if time_diff <= block_failure_interval as f64 / 1000f64 {
break;
}
// decrease score && move to the idle queue
warn!(target: "sync", "Failed to get requested block from peer#{} in {} seconds", worst_peer_index, time_diff);
let failed_blocks = peers.reset_blocks_tasks(worst_peer_index);
let failed_blocks = peers_tasks.reset_blocks_tasks(worst_peer_index);
// mark blocks as failed
let (normal_blocks, failed_blocks) = peers.on_blocks_failure(failed_blocks);
let (normal_blocks, failed_blocks) = peers_tasks.on_blocks_failure(failed_blocks);
blocks_to_request.extend(normal_blocks);
blocks_to_forget.extend(failed_blocks);
// if peer failed many times => forget it
if peers.on_peer_block_failure(worst_peer_index) {
if peers_tasks.on_peer_block_failure(worst_peer_index) {
warn!(target: "sync", "Too many failures for peer#{}. Excluding from synchronization", worst_peer_index);
peers.unuseful_peer(worst_peer_index);
peers_tasks.unuseful_peer(worst_peer_index);
peers.misbehaving(worst_peer_index, &format!("Too many failures"));
}
}
@ -199,18 +212,29 @@ pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &m
}
/// Manage stalled synchronization peers headers tasks
pub fn manage_synchronization_peers_headers(config: &ManagePeersConfig, peers: &mut PeersTasks) {
pub fn manage_synchronization_peers_headers(config: &ManagePeersConfig, peers: PeersRef, peers_tasks: &mut PeersTasks) {
let now = precise_time_s();
// reset tasks for peers, which has not responded during given period
let ordered_headers_requests: Vec<_> = peers.ordered_headers_requests().clone().into_iter().collect();
let ordered_headers_requests: Vec<_> = peers_tasks.ordered_headers_requests().clone().into_iter().collect();
for (worst_peer_index, headers_request) in ordered_headers_requests {
// check if peer has not responded within given time
let is_trusted = peers_tasks.get_peer_stats(worst_peer_index).map(|s| s.trust() == TrustLevel::Trusted).unwrap_or(false);
let headers_failure_interval = if is_trusted { config.trusted_headers_failure_interval_ms } else { config.new_headers_failure_interval_ms };
let time_diff = now - headers_request.timestamp;
if time_diff <= config.headers_failure_interval_ms as f64 / 1000f64 {
if time_diff <= headers_failure_interval as f64 / 1000f64 {
break;
}
peers.on_peer_headers_failure(worst_peer_index);
// do not penalize peer if it has pending blocks tasks
if peers_tasks.get_blocks_tasks(worst_peer_index).map(|t| !t.is_empty()).unwrap_or(false) {
continue;
}
// if peer failed many times => forget it
if peers_tasks.on_peer_headers_failure(worst_peer_index) {
warn!(target: "sync", "Too many headers failures for peer#{}. Excluding from synchronization", worst_peer_index);
peers.misbehaving(worst_peer_index, &format!("Too many headers failures"));
}
}
}
@ -286,8 +310,10 @@ pub fn manage_orphaned_transactions(config: &ManageOrphanTransactionsConfig, orp
mod tests {
extern crate test_data;
use std::sync::Arc;
use std::collections::HashSet;
use primitives::hash::H256;
use synchronization_peers::PeersImpl;
use synchronization_peers_tasks::PeersTasks;
use super::{ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig, manage_synchronization_peers_blocks,
manage_unknown_orphaned_blocks, manage_orphaned_transactions};
@ -295,11 +321,11 @@ mod tests {
#[test]
fn manage_good_peer() {
let config = ManagePeersConfig { block_failure_interval_ms: 1000, ..Default::default() };
let config = ManagePeersConfig { new_block_failure_interval_ms: 1000, ..Default::default() };
let mut peers = PeersTasks::default();
peers.on_blocks_requested(1, &vec![H256::from(0), H256::from(1)]);
peers.on_block_received(1, &H256::from(0));
assert_eq!(manage_synchronization_peers_blocks(&config, &mut peers), (vec![], vec![]));
assert_eq!(manage_synchronization_peers_blocks(&config, Arc::new(PeersImpl::default()), &mut peers), (vec![], vec![]));
assert_eq!(peers.idle_peers_for_blocks().len(), 0);
}
@ -307,13 +333,13 @@ mod tests {
fn manage_bad_peers() {
use std::thread::sleep;
use std::time::Duration;
let config = ManagePeersConfig { block_failure_interval_ms: 0, ..Default::default() };
let config = ManagePeersConfig { new_block_failure_interval_ms: 0, ..Default::default() };
let mut peers = PeersTasks::default();
peers.on_blocks_requested(1, &vec![H256::from(0)]);
peers.on_blocks_requested(2, &vec![H256::from(1)]);
sleep(Duration::from_millis(1));
let managed_tasks = manage_synchronization_peers_blocks(&config, &mut peers).0;
let managed_tasks = manage_synchronization_peers_blocks(&config, Arc::new(PeersImpl::default()), &mut peers).0;
assert!(managed_tasks.contains(&H256::from(0)));
assert!(managed_tasks.contains(&H256::from(1)));
let idle_peers = peers.idle_peers_for_blocks();

View File

@ -1,3 +1,4 @@
use std::fmt;
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use linked_hash_map::LinkedHashMap;
@ -7,15 +8,13 @@ use types::PeerIndex;
use utils::AverageSpeedMeter;
/// Max peer failures # before excluding from sync process
const MAX_PEER_FAILURES: usize = 2;
const MAX_PEER_FAILURES: usize = 4;
/// Max blocks failures # before forgetting this block and restarting sync
const MAX_BLOCKS_FAILURES: usize = 6;
/// Number of blocks to inspect while calculating average response time
const BLOCKS_TO_INSPECT: usize = 32;
/// Information on synchronization peers
#[cfg(test)]
#[derive(Debug)]
pub struct Information {
/// # of peers that are marked as useful for current synchronization session && have no pending requests.
pub idle: usize,
@ -62,13 +61,24 @@ pub struct BlocksRequest {
pub blocks: HashSet<H256>,
}
/// Peer trust level.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum TrustLevel {
	/// Suspicious peer (either it is a fresh peer, or it has failed to respond to recent requests).
	Suspicious,
	/// This peer is responding to requests.
	Trusted,
}
/// Peer statistics
#[derive(Debug, Default)]
struct PeerStats {
#[derive(Debug)]
pub struct PeerStats {
/// Number of blocks requests failures
failures: usize,
/// Average block response time meter
speed: AverageSpeedMeter,
/// Peer trust level.
trust: TrustLevel,
}
/// Block statistics
@ -80,7 +90,6 @@ struct BlockStats {
impl PeersTasks {
/// Get information on synchronization peers
#[cfg(test)]
pub fn information(&self) -> Information {
let active_for_headers: HashSet<_> = self.headers_requests.keys().cloned().collect();
Information {
@ -137,6 +146,11 @@ impl PeersTasks {
.map(|br| &br.blocks)
}
/// Returns the statistics entry for `peer_index`, or `None` if the peer
/// is unknown to this tasks tracker.
pub fn get_peer_stats(&self, peer_index: PeerIndex) -> Option<&PeerStats> {
	self.stats.get(&peer_index)
}
/// Mark peer as useful.
pub fn useful_peer(&mut self, peer_index: PeerIndex) {
// if peer is unknown => insert to idle queue
@ -194,10 +208,20 @@ impl PeersTasks {
};
// it was requested block => update block response time
self.stats.get_mut(&peer_index).map(|br| br.speed.checkpoint());
self.stats.get_mut(&peer_index)
.map(|br| {
if br.failures > 0 {
br.failures -= 1;
}
br.trust = TrustLevel::Trusted;
br.speed.checkpoint()
});
// if it hasn't been last requested block => just return
if !is_last_requested_block_received {
let mut peer_blocks_requests = self.blocks_requests.remove(&peer_index).expect("checked above; qed");
peer_blocks_requests.timestamp = precise_time_s();
self.blocks_requests.insert(peer_index, peer_blocks_requests);
return;
}
@ -286,10 +310,17 @@ impl PeersTasks {
}
/// We have failed to get headers from peer during given period
pub fn on_peer_headers_failure(&mut self, peer_index: PeerIndex) {
/// Records a headers-request timeout for `peer_index`: the pending headers
/// request is dropped and the peer is returned to the headers-idle queue.
/// Each call increments the peer's failure counter; returns `true` once the
/// counter exceeds `MAX_PEER_FAILURES`, signalling the caller to exclude the
/// peer from synchronization.
pub fn on_peer_headers_failure(&mut self, peer_index: PeerIndex) -> bool {
	self.headers_requests.remove(&peer_index);
	self.idle_for_headers.insert(peer_index);
	// Peers with no stats entry yield `false` via unwrap_or_default().
	self.stats.get_mut(&peer_index)
		.map(|s| {
			s.failures += 1;
			s.failures > MAX_PEER_FAILURES
		})
		.unwrap_or_default()
}
/// Reset all peers state to the unuseful
@ -333,8 +364,19 @@ impl PeerStats {
PeerStats {
failures: 0,
speed: AverageSpeedMeter::with_inspect_items(BLOCKS_TO_INSPECT),
trust: TrustLevel::Suspicious,
}
}
/// Current trust level assigned to this peer.
pub fn trust(&self) -> TrustLevel {
	self.trust
}
}
impl fmt::Debug for Information {
	// Compact one-line form (active/idle/bad peer counts) used in the sync
	// progress log messages.
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		write!(f, "[active:{}, idle:{}, bad:{}]", self.active, self.idle, self.unuseful)
	}
}
#[cfg(test)]