From 0e9b5fd89e16d4c06d7bdae31979dea5c18c8417 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 7 Aug 2017 12:59:38 +0300 Subject: [PATCH 1/5] merge good changes from sync_queue branch --- message/src/types/version.rs | 8 +++ p2p/src/net/connections.rs | 1 + p2p/src/p2p.rs | 6 ++ p2p/src/protocol/sync.rs | 9 ++- p2p/src/util/peer.rs | 1 + sync/src/inbound_connection.rs | 4 +- sync/src/local_node.rs | 10 +-- sync/src/synchronization_chain.rs | 2 +- sync/src/synchronization_client_core.rs | 85 +++++++++++++++++++------ sync/src/synchronization_manager.rs | 76 ++++++++++++++-------- sync/src/synchronization_peers_tasks.rs | 52 ++++++++++++--- 11 files changed, 191 insertions(+), 63 deletions(-) diff --git a/message/src/types/version.rs b/message/src/types/version.rs index e75b6640..d1000c3c 100644 --- a/message/src/types/version.rs +++ b/message/src/types/version.rs @@ -100,6 +100,14 @@ impl Version { Version::V70001(_, _, ref v) => v.relay, } } + + pub fn user_agent(&self) -> Option { + match *self { + Version::V0(_) => None, + Version::V106(_, ref v) | + Version::V70001(_, ref v, _) => Some(v.user_agent.clone()), + } + } } #[derive(Debug, Default, PartialEq, Clone)] diff --git a/p2p/src/net/connections.rs b/p2p/src/net/connections.rs index 94894c82..18fb8224 100644 --- a/p2p/src/net/connections.rs +++ b/p2p/src/net/connections.rs @@ -53,6 +53,7 @@ impl Connections { let peer_info = PeerInfo { id: id, address: connection.address, + user_agent: connection.version_message.user_agent().unwrap_or("unknown".into()), direction: direction, version: connection.version, version_message: connection.version_message, diff --git a/p2p/src/p2p.rs b/p2p/src/p2p.rs index 24f9cdf2..4eb75149 100644 --- a/p2p/src/p2p.rs +++ b/p2p/src/p2p.rs @@ -89,6 +89,12 @@ impl Context { self.node_table.write().insert_many(nodes); } + /// Penalize node. + pub fn penalize_node(&self, addr: &SocketAddr) { + trace!("Penalizing node {}", addr); + self.node_table.write().note_failure(addr); + } + /// Adds node to table. pub fn add_node(&self, addr: SocketAddr) -> Result<(), NodeTableError> { trace!("Adding node {} to node table", &addr); diff --git a/p2p/src/protocol/sync.rs b/p2p/src/protocol/sync.rs index 9b079a71..34386dd7 100644 --- a/p2p/src/protocol/sync.rs +++ b/p2p/src/protocol/sync.rs @@ -13,7 +13,7 @@ pub trait LocalSyncNode : Send + Sync { } pub trait InboundSyncConnection : Send + Sync { - fn start_sync_session(&self, version: types::Version); + fn start_sync_session(&self, peer_name: String, version: types::Version); fn close_session(&self); fn on_inventory(&self, message: types::Inv); fn on_getdata(&self, message: types::GetData); @@ -159,6 +159,7 @@ impl OutboundSyncConnection for OutboundSync { } fn close(&self) { + self.context.global().penalize_node(&self.context.info().address); self.context.close() } } @@ -181,7 +182,11 @@ impl SyncProtocol { impl Protocol for SyncProtocol { fn initialize(&mut self) { - self.inbound_connection.start_sync_session(self.context.info().version_message.clone()); + let info = self.context.info(); + self.inbound_connection.start_sync_session( + format!("{}/{}", info.address, info.user_agent), + info.version_message.clone() + ); } fn on_message(&mut self, command: &Command, payload: &Bytes) -> Result<(), Error> { diff --git a/p2p/src/util/peer.rs b/p2p/src/util/peer.rs index 45f3e127..cbed687a 100644 --- a/p2p/src/util/peer.rs +++ b/p2p/src/util/peer.rs @@ -14,6 +14,7 @@ pub enum Direction { pub struct PeerInfo { pub id: PeerId, pub address: SocketAddr, + pub user_agent: String, pub direction: Direction, pub version: u32, pub version_message: types::Version, diff --git a/sync/src/inbound_connection.rs b/sync/src/inbound_connection.rs index 5c989e5b..e3f17d90 100644 --- a/sync/src/inbound_connection.rs +++ b/sync/src/inbound_connection.rs @@ -31,8 +31,8 @@ impl InboundConnection { } impl InboundSyncConnection for InboundConnection { - fn start_sync_session(&self, version: types::Version) { - self.node.on_connect(self.peer_index, version); + fn start_sync_session(&self, peer_name: String, version: types::Version) { + self.node.on_connect(self.peer_index, peer_name, version); } fn close_session(&self) { diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index af5987da..914ab245 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -65,8 +65,8 @@ impl LocalNode where T: TaskExecutor, U: Server, V: Client { } /// When new peer connects to the node - pub fn on_connect(&self, peer_index: PeerIndex, version: types::Version) { - trace!(target: "sync", "Starting new sync session with peer#{}", peer_index); + pub fn on_connect(&self, peer_index: PeerIndex, peer_name: String, version: types::Version) { + trace!(target: "sync", "Starting new sync session with peer#{}: {}", peer_index, peer_name); // light clients may not want transactions broadcasting until filter for connection is set if !version.relay_transactions() { @@ -389,7 +389,7 @@ pub mod tests { #[test] fn local_node_serves_block() { let (_, server, local_node) = create_local_node(None); - let peer_index = 0; local_node.on_connect(peer_index, types::Version::default()); + let peer_index = 0; local_node.on_connect(peer_index, "test".into(), types::Version::default()); // peer requests genesis block let genesis_block_hash = test_data::genesis().hash(); let inventory = vec![ @@ -411,7 +411,7 @@ pub mod tests { let (executor, _, local_node) = create_local_node(None); // transaction will be relayed to this peer - let peer_index1 = 0; local_node.on_connect(peer_index1, types::Version::default()); + let peer_index1 = 0; local_node.on_connect(peer_index1, "test".into(), types::Version::default()); executor.take_tasks(); let genesis = test_data::genesis(); @@ -436,7 +436,7 @@ pub mod tests { let (executor, _, local_node) = create_local_node(Some(verifier)); - let peer_index1 = 0; local_node.on_connect(peer_index1, types::Version::default()); + let peer_index1 = 0; local_node.on_connect(peer_index1, "test".into(), types::Version::default()); executor.take_tasks(); let result = local_node.accept_transaction(transaction); diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index b26cc41c..e5c30711 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -681,7 +681,7 @@ impl db::BlockHeaderProvider for Chain { impl fmt::Debug for Information { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "[sch:{} / bh:{} -> req:{} -> vfy:{} -> stored: {}]", self.scheduled, self.headers.best, self.requested, self.verifying, self.stored) + write!(f, "[sch:{} -> req:{} -> vfy:{} -> stored: {}]", self.scheduled, self.requested, self.verifying, self.stored) } } diff --git a/sync/src/synchronization_client_core.rs b/sync/src/synchronization_client_core.rs index 58392b31..bf273a01 100644 --- a/sync/src/synchronization_client_core.rs +++ b/sync/src/synchronization_client_core.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use futures::Future; use parking_lot::Mutex; use time; +use time::precise_time_s; use chain::{IndexedBlockHeader, IndexedTransaction, Transaction, IndexedBlock}; use message::types; use message::common::{InventoryType, InventoryVector}; @@ -19,7 +20,6 @@ use synchronization_peers_tasks::PeersTasks; use synchronization_verifier::{VerificationSink, BlockVerificationSink, TransactionVerificationSink, VerificationTask}; use types::{BlockHeight, ClientCoreRef, PeersRef, PeerIndex, SynchronizationStateRef, EmptyBoxFuture, SyncListenerRef}; use utils::{AverageSpeedMeter, MessageBlockHeadersProvider, OrphanBlocksPool, OrphanTransactionsPool, HashPosition}; -#[cfg(test)] use synchronization_peers::Peers; #[cfg(test)] use synchronization_peers_tasks::{Information as PeersTasksInformation}; #[cfg(test)] use synchronization_chain::{Information as ChainInformation}; @@ -41,6 +41,12 @@ const NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S: f64 = 20_f64; const SYNC_SPEED_BLOCKS_TO_INSPECT: usize = 512; /// Number of blocks to inspect when calculating average blocks speed const BLOCKS_SPEED_BLOCKS_TO_INSPECT: usize = 512; +/// Minimal time between duplicated blocks requests. +const MIN_BLOCK_DUPLICATION_INTERVAL_S: f64 = 10_f64; +/// Maximal number of blocks in duplicate requests. +const MAX_BLOCKS_IN_DUPLICATE_REQUEST: BlockHeight = 4; +/// Minimal number of blocks in duplicate requests. +const MIN_BLOCKS_IN_DUPLICATE_REQUEST: BlockHeight = 8; /// Information on current synchronization state. #[cfg(test)] @@ -123,6 +129,8 @@ pub struct SynchronizationClientCore { config: Config, /// Synchronization events listener listener: Option, + /// Time of last duplicated blocks request. + last_dup_time: f64, } /// Verification sink for synchronization client core @@ -142,6 +150,20 @@ pub enum State { Saturated, } +/// Blocks request limits. +pub struct BlocksRequestLimits { + /// Approximate maximal number of blocks hashes in scheduled queue. + pub max_scheduled_hashes: BlockHeight, + /// Approximate maximal number of blocks hashes in requested queue. + pub max_requested_blocks: BlockHeight, + /// Approximate maximal number of blocks in verifying queue. + pub max_verifying_blocks: BlockHeight, + /// Minimum number of blocks to request from peer + pub min_blocks_in_request: BlockHeight, + /// Maximum number of blocks to request from peer + pub max_blocks_in_request: BlockHeight, +} + /// Transaction append error enum AppendTransactionError { Synchronizing, @@ -189,6 +211,7 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { self.executor.execute(Task::GetHeaders(peer_index, types::GetHeaders::with_block_locator_hashes(block_locator_hashes))); // unuseful until respond with headers message self.peers_tasks.unuseful_peer(peer_index); + self.peers_tasks.on_headers_requested(peer_index); } fn on_disconnect(&mut self, peer_index: PeerIndex) { @@ -467,6 +490,7 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { // for now, let's exclude peer from synchronization - we are relying on full nodes for synchronization let removed_tasks = self.peers_tasks.reset_blocks_tasks(peer_index); self.peers_tasks.unuseful_peer(peer_index); + self.peers.misbehaving(peer_index, &format!("Responded with NotFound(unrequested_block)")); // if peer has had some blocks tasks, rerequest these blocks self.execute_synchronization_tasks(Some(removed_tasks), None); @@ -516,6 +540,10 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { // display information if processed many blocks || enough time has passed since sync start self.print_synchronization_information(); + // prepare limits + let verifying_hashes_len = self.chain.length_of_blocks_state(BlockState::Verifying); + let limits = BlocksRequestLimits::default(); // TODO: must be updated using retrieval && verification speed + // if some blocks requests are forced => we should ask peers even if there are no idle peers if let Some(forced_blocks_requests) = forced_blocks_requests { let useful_peers = self.peers_tasks.useful_peers(); @@ -526,7 +554,7 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { return; } - let forced_tasks = self.prepare_blocks_requests_tasks(useful_peers, forced_blocks_requests); + let forced_tasks = self.prepare_blocks_requests_tasks(&limits, useful_peers, forced_blocks_requests); tasks.extend(forced_tasks); } @@ -534,7 +562,7 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { if let Some(final_blocks_requests) = final_blocks_requests { let useful_peers = self.peers_tasks.useful_peers(); if !useful_peers.is_empty() { // if empty => not a problem, just forget these blocks - let forced_tasks = self.prepare_blocks_requests_tasks(useful_peers, final_blocks_requests); + let forced_tasks = self.prepare_blocks_requests_tasks(&limits, useful_peers, final_blocks_requests); tasks.extend(forced_tasks); } } @@ -575,7 +603,6 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { // => we could ask idle peer2 about [B1, B2, B3, B4] // these requests has priority over new blocks requests below let requested_hashes_len = self.chain.length_of_blocks_state(BlockState::Requested); - let verifying_hashes_len = self.chain.length_of_blocks_state(BlockState::Verifying); if requested_hashes_len != 0 { let verification_speed: f64 = self.block_speed_meter.speed(); let synchronization_speed: f64 = self.sync_speed_meter.speed(); @@ -609,15 +636,19 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { // if verification queue will be empty before all synchronization requests will be completed // + do not spam with duplicated blocks requests if blocks are too big && there are still blocks left for NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S // => duplicate blocks requests + let now = precise_time_s(); if synchronization_queue_will_be_full_in > verification_queue_will_be_empty_in && - verification_queue_will_be_empty_in < NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S { + verification_queue_will_be_empty_in < NEAR_EMPTY_VERIFICATION_QUEUE_THRESHOLD_S && + now - self.last_dup_time > MIN_BLOCK_DUPLICATION_INTERVAL_S { + // do not duplicate too often + self.last_dup_time = now; // blocks / second * second -> blocks - let hashes_requests_to_duplicate_len = synchronization_speed * (synchronization_queue_will_be_full_in - verification_queue_will_be_empty_in); + let hashes_requests_to_duplicate_len = (synchronization_speed * (synchronization_queue_will_be_full_in - verification_queue_will_be_empty_in)) as BlockHeight; // do not ask for too many blocks - let hashes_requests_to_duplicate_len = min(MAX_BLOCKS_IN_REQUEST, hashes_requests_to_duplicate_len as BlockHeight); + let hashes_requests_to_duplicate_len = min(MAX_BLOCKS_IN_DUPLICATE_REQUEST, hashes_requests_to_duplicate_len); // ask for at least 1 block - let hashes_requests_to_duplicate_len = max(1, min(requested_hashes_len, hashes_requests_to_duplicate_len)); - blocks_requests = Some(self.chain.best_n_of_blocks_state(BlockState::Requested, hashes_requests_to_duplicate_len)); + let hashes_requests_to_duplicate_len = max(MIN_BLOCKS_IN_DUPLICATE_REQUEST, min(requested_hashes_len, hashes_requests_to_duplicate_len)); + blocks_requests = Some(self.chain.best_n_of_blocks_state(BlockState::Requested, hashes_requests_to_duplicate_len as BlockHeight)); trace!(target: "sync", "Duplicating {} blocks requests. Sync speed: {} * {}, blocks speed: {} * {}.", hashes_requests_to_duplicate_len, synchronization_speed, requested_hashes_len, verification_speed, verifying_hashes_len); } @@ -625,9 +656,10 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { // check if we can move some blocks from scheduled to requested queue { + // TODO: only request minimal number of blocks, if other urgent blocks are requested let scheduled_hashes_len = self.chain.length_of_blocks_state(BlockState::Scheduled); if requested_hashes_len + verifying_hashes_len < MAX_REQUESTED_BLOCKS + MAX_VERIFYING_BLOCKS && scheduled_hashes_len != 0 { - let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(scheduled_hashes_len / blocks_idle_peers_len, MIN_BLOCKS_IN_REQUEST)); + let chunk_size = min(limits.max_blocks_in_request, max(scheduled_hashes_len / blocks_idle_peers_len, limits.min_blocks_in_request)); let hashes_to_request_len = chunk_size * blocks_idle_peers_len; let hashes_to_request = self.chain.request_blocks_hashes(hashes_to_request_len); match blocks_requests { @@ -641,7 +673,7 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { // append blocks requests tasks if let Some(blocks_requests) = blocks_requests { - tasks.extend(self.prepare_blocks_requests_tasks(blocks_idle_peers, blocks_requests)); + tasks.extend(self.prepare_blocks_requests_tasks(&limits, blocks_idle_peers, blocks_requests)); } // execute synchronization tasks @@ -724,6 +756,7 @@ impl SynchronizationClientCore where T: TaskExecutor { sync_speed_meter: AverageSpeedMeter::with_inspect_items(BLOCKS_SPEED_BLOCKS_TO_INSPECT), config: config, listener: None, + last_dup_time: 0f64, } )); @@ -758,6 +791,11 @@ impl SynchronizationClientCore where T: TaskExecutor { &mut self.chain } + /// Return peers reference + pub fn peers(&self) -> PeersRef { + self.peers.clone() + } + /// Return peers tasks reference pub fn peers_tasks(&mut self) -> &mut PeersTasks { &mut self.peers_tasks @@ -779,12 +817,6 @@ impl SynchronizationClientCore where T: TaskExecutor { self.verify_headers = verify; } - /// Return peers reference - #[cfg(test)] - pub fn peers(&mut self) -> &Peers { - &*self.peers - } - /// Print synchronization information pub fn print_synchronization_information(&mut self) { if let State::Synchronizing(timestamp, num_of_blocks) = self.state { @@ -796,9 +828,10 @@ impl SynchronizationClientCore where T: TaskExecutor { self.state = State::Synchronizing(time::precise_time_s(), new_num_of_blocks); use time; - info!(target: "sync", "{:?} @ Processed {} blocks in {} seconds. Chain information: {:?}" + info!(target: "sync", "{:?} Processed {} blocks in {:.2} seconds. Peers: {:?}. Chain: {:?}" , time::strftime("%H:%M:%S", &time::now()).unwrap() , blocks_diff, timestamp_diff + , self.peers_tasks.information() , self.chain.information()); } } @@ -904,13 +937,13 @@ impl SynchronizationClientCore where T: TaskExecutor { Ok(transactions) } - fn prepare_blocks_requests_tasks(&mut self, mut peers: Vec, mut hashes: Vec) -> Vec { + fn prepare_blocks_requests_tasks(&mut self, limits: &BlocksRequestLimits, mut peers: Vec, mut hashes: Vec) -> Vec { use std::mem::swap; // ask fastest peers for hashes at the beginning of `hashes` self.peers_tasks.sort_peers_for_blocks(&mut peers); - let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(hashes.len() as BlockHeight, MIN_BLOCKS_IN_REQUEST)); + let chunk_size = min(limits.max_blocks_in_request, max(hashes.len() as BlockHeight, limits.min_blocks_in_request)); let last_peer_index = peers.len() - 1; let mut tasks: Vec = Vec::new(); for (peer_index, peer) in peers.into_iter().enumerate() { @@ -1172,6 +1205,18 @@ impl SynchronizationClientCore where T: TaskExecutor { } } +impl Default for BlocksRequestLimits { + fn default() -> Self { + BlocksRequestLimits { + max_scheduled_hashes: MAX_SCHEDULED_HASHES, + max_requested_blocks: MAX_REQUESTED_BLOCKS, + max_verifying_blocks: MAX_VERIFYING_BLOCKS, + min_blocks_in_request: MIN_BLOCKS_IN_REQUEST, + max_blocks_in_request: MAX_BLOCKS_IN_REQUEST, + } + } +} + #[cfg(test)] pub mod tests { extern crate test_data; diff --git a/sync/src/synchronization_manager.rs b/sync/src/synchronization_manager.rs index 475c97e6..5e6ba2d8 100644 --- a/sync/src/synchronization_manager.rs +++ b/sync/src/synchronization_manager.rs @@ -7,15 +7,20 @@ use time::precise_time_s; use primitives::hash::H256; use synchronization_client_core::{ClientCore, SynchronizationClientCore}; use synchronization_executor::TaskExecutor; -use synchronization_peers_tasks::PeersTasks; +use synchronization_peers_tasks::{PeersTasks, TrustLevel}; use utils::{OrphanBlocksPool, OrphanTransactionsPool}; +use types::PeersRef; /// Management interval (in ms) const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000; /// Response time before getting block to decrease peer score -const DEFAULT_PEER_BLOCK_FAILURE_INTERVAL_MS: u32 = 60 * 1000; +const DEFAULT_NEW_PEER_BLOCK_FAILURE_INTERVAL_MS: u32 = 5 * 1000; /// Response time before getting headers to decrease peer score -const DEFAULT_PEER_HEADERS_FAILURE_INTERVAL_MS: u32 = 60 * 1000; +const DEFAULT_NEW_PEER_HEADERS_FAILURE_INTERVAL_MS: u32 = 5 * 1000; +/// Response time before getting block to decrease peer score +const DEFAULT_TRUSTED_PEER_BLOCK_FAILURE_INTERVAL_MS: u32 = 20 * 1000; +/// Response time before getting headers to decrease peer score +const DEFAULT_TRUSTED_PEER_HEADERS_FAILURE_INTERVAL_MS: u32 = 20 * 1000; /// Unknown orphan block removal time const DEFAULT_UNKNOWN_BLOCK_REMOVAL_TIME_MS: u32 = 20 * 60 * 1000; /// Maximal number of orphaned blocks @@ -81,14 +86,14 @@ impl ManagementWorker { core.print_synchronization_information(); // execute management tasks if not saturated if core.state().is_synchronizing() || core.state().is_nearly_saturated() { - let (blocks_to_request, blocks_to_forget) = manage_synchronization_peers_blocks(&peers_config, core.peers_tasks()); + let (blocks_to_request, blocks_to_forget) = manage_synchronization_peers_blocks(&peers_config, core.peers(), core.peers_tasks()); core.forget_failed_blocks(&blocks_to_forget); core.execute_synchronization_tasks( if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) }, if blocks_to_forget.is_empty() { None } else { Some(blocks_to_forget) }, ); - manage_synchronization_peers_headers(&peers_config, core.peers_tasks()); + manage_synchronization_peers_headers(&peers_config, core.peers(), core.peers_tasks()); manage_orphaned_transactions(&orphan_config, core.orphaned_transactions_pool()); if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&unknown_config, core.orphaned_blocks_pool()) { for orphan_to_remove in orphans_to_remove { @@ -115,17 +120,22 @@ impl Drop for ManagementWorker { /// Peers management configuration pub struct ManagePeersConfig { - /// Time interval (in milliseconds) to wait block from the peer before penalizing && reexecuting tasks - pub block_failure_interval_ms: u32, + pub new_block_failure_interval_ms: u32, /// Time interval (in milliseconds) to wait headers from the peer before penalizing && reexecuting tasks - pub headers_failure_interval_ms: u32, + pub new_headers_failure_interval_ms: u32, + /// Time interval (in milliseconds) to wait block from the peer before penalizing && reexecuting tasks + pub trusted_block_failure_interval_ms: u32, + /// Time interval (in milliseconds) to wait headers from the peer before penalizing && reexecuting tasks + pub trusted_headers_failure_interval_ms: u32, } impl Default for ManagePeersConfig { fn default() -> Self { ManagePeersConfig { - block_failure_interval_ms: DEFAULT_PEER_BLOCK_FAILURE_INTERVAL_MS, - headers_failure_interval_ms: DEFAULT_PEER_HEADERS_FAILURE_INTERVAL_MS, + new_block_failure_interval_ms: DEFAULT_NEW_PEER_BLOCK_FAILURE_INTERVAL_MS, + new_headers_failure_interval_ms: DEFAULT_NEW_PEER_HEADERS_FAILURE_INTERVAL_MS, + trusted_block_failure_interval_ms: DEFAULT_TRUSTED_PEER_BLOCK_FAILURE_INTERVAL_MS, + trusted_headers_failure_interval_ms: DEFAULT_TRUSTED_PEER_HEADERS_FAILURE_INTERVAL_MS, } } } @@ -165,33 +175,36 @@ impl Default for ManageOrphanTransactionsConfig { } /// Manage stalled synchronization peers blocks tasks -pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &mut PeersTasks) -> (Vec, Vec) { +pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: PeersRef, peers_tasks: &mut PeersTasks) -> (Vec, Vec) { let mut blocks_to_request: Vec = Vec::new(); let mut blocks_to_forget: Vec = Vec::new(); let now = precise_time_s(); // reset tasks for peers, which has not responded during given period - let ordered_blocks_requests: Vec<_> = peers.ordered_blocks_requests().clone().into_iter().collect(); + let ordered_blocks_requests: Vec<_> = peers_tasks.ordered_blocks_requests().clone().into_iter().collect(); for (worst_peer_index, blocks_request) in ordered_blocks_requests { // check if peer has not responded within given time + let is_trusted = peers_tasks.get_peer_stats(worst_peer_index).map(|s| s.trust() == TrustLevel::Trusted).unwrap_or(false); + let block_failure_interval = if is_trusted { config.trusted_block_failure_interval_ms } else { config.new_block_failure_interval_ms }; let time_diff = now - blocks_request.timestamp; - if time_diff <= config.block_failure_interval_ms as f64 / 1000f64 { + if time_diff <= block_failure_interval as f64 / 1000f64 { break; } // decrease score && move to the idle queue warn!(target: "sync", "Failed to get requested block from peer#{} in {} seconds", worst_peer_index, time_diff); - let failed_blocks = peers.reset_blocks_tasks(worst_peer_index); + let failed_blocks = peers_tasks.reset_blocks_tasks(worst_peer_index); // mark blocks as failed - let (normal_blocks, failed_blocks) = peers.on_blocks_failure(failed_blocks); + let (normal_blocks, failed_blocks) = peers_tasks.on_blocks_failure(failed_blocks); blocks_to_request.extend(normal_blocks); blocks_to_forget.extend(failed_blocks); // if peer failed many times => forget it - if peers.on_peer_block_failure(worst_peer_index) { + if peers_tasks.on_peer_block_failure(worst_peer_index) { warn!(target: "sync", "Too many failures for peer#{}. Excluding from synchronization", worst_peer_index); - peers.unuseful_peer(worst_peer_index); + peers_tasks.unuseful_peer(worst_peer_index); + peers.misbehaving(worst_peer_index, &format!("Too many failures")); } } @@ -199,18 +212,29 @@ pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &m } /// Manage stalled synchronization peers headers tasks -pub fn manage_synchronization_peers_headers(config: &ManagePeersConfig, peers: &mut PeersTasks) { +pub fn manage_synchronization_peers_headers(config: &ManagePeersConfig, peers: PeersRef, peers_tasks: &mut PeersTasks) { let now = precise_time_s(); // reset tasks for peers, which has not responded during given period - let ordered_headers_requests: Vec<_> = peers.ordered_headers_requests().clone().into_iter().collect(); + let ordered_headers_requests: Vec<_> = peers_tasks.ordered_headers_requests().clone().into_iter().collect(); for (worst_peer_index, headers_request) in ordered_headers_requests { // check if peer has not responded within given time + let is_trusted = peers_tasks.get_peer_stats(worst_peer_index).map(|s| s.trust() == TrustLevel::Trusted).unwrap_or(false); + let headers_failure_interval = if is_trusted { config.trusted_headers_failure_interval_ms } else { config.new_headers_failure_interval_ms }; let time_diff = now - headers_request.timestamp; - if time_diff <= config.headers_failure_interval_ms as f64 / 1000f64 { + if time_diff <= headers_failure_interval as f64 / 1000f64 { break; } - peers.on_peer_headers_failure(worst_peer_index); + // do not penalize peer if it has pending blocks tasks + if peers_tasks.get_blocks_tasks(worst_peer_index).map(|t| !t.is_empty()).unwrap_or(false) { + continue; + } + + // if peer failed many times => forget it + if peers_tasks.on_peer_headers_failure(worst_peer_index) { + warn!(target: "sync", "Too many headers failures for peer#{}. Excluding from synchronization", worst_peer_index); + peers.misbehaving(worst_peer_index, &format!("Too many headers failures")); + } } } @@ -286,8 +310,10 @@ pub fn manage_orphaned_transactions(config: &ManageOrphanTransactionsConfig, orp mod tests { extern crate test_data; + use std::sync::Arc; use std::collections::HashSet; use primitives::hash::H256; + use synchronization_peers::PeersImpl; use synchronization_peers_tasks::PeersTasks; use super::{ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig, manage_synchronization_peers_blocks, manage_unknown_orphaned_blocks, manage_orphaned_transactions}; @@ -295,11 +321,11 @@ mod tests { #[test] fn manage_good_peer() { - let config = ManagePeersConfig { block_failure_interval_ms: 1000, ..Default::default() }; + let config = ManagePeersConfig { new_block_failure_interval_ms: 1000, ..Default::default() }; let mut peers = PeersTasks::default(); peers.on_blocks_requested(1, &vec![H256::from(0), H256::from(1)]); peers.on_block_received(1, &H256::from(0)); - assert_eq!(manage_synchronization_peers_blocks(&config, &mut peers), (vec![], vec![])); + assert_eq!(manage_synchronization_peers_blocks(&config, Arc::new(PeersImpl::default()), &mut peers), (vec![], vec![])); assert_eq!(peers.idle_peers_for_blocks().len(), 0); } @@ -307,13 +333,13 @@ mod tests { fn manage_bad_peers() { use std::thread::sleep; use std::time::Duration; - let config = ManagePeersConfig { block_failure_interval_ms: 0, ..Default::default() }; + let config = ManagePeersConfig { new_block_failure_interval_ms: 0, ..Default::default() }; let mut peers = PeersTasks::default(); peers.on_blocks_requested(1, &vec![H256::from(0)]); peers.on_blocks_requested(2, &vec![H256::from(1)]); sleep(Duration::from_millis(1)); - let managed_tasks = manage_synchronization_peers_blocks(&config, &mut peers).0; + let managed_tasks = manage_synchronization_peers_blocks(&config, Arc::new(PeersImpl::default()), &mut peers).0; assert!(managed_tasks.contains(&H256::from(0))); assert!(managed_tasks.contains(&H256::from(1))); let idle_peers = peers.idle_peers_for_blocks(); diff --git a/sync/src/synchronization_peers_tasks.rs b/sync/src/synchronization_peers_tasks.rs index f3cb4fcd..89a4690c 100644 --- a/sync/src/synchronization_peers_tasks.rs +++ b/sync/src/synchronization_peers_tasks.rs @@ -1,3 +1,4 @@ +use std::fmt; use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use linked_hash_map::LinkedHashMap; @@ -7,15 +8,13 @@ use types::PeerIndex; use utils::AverageSpeedMeter; /// Max peer failures # before excluding from sync process -const MAX_PEER_FAILURES: usize = 2; +const MAX_PEER_FAILURES: usize = 4; /// Max blocks failures # before forgetiing this block and restarting sync const MAX_BLOCKS_FAILURES: usize = 6; /// Number of blocks to inspect while calculating average response time const BLOCKS_TO_INSPECT: usize = 32; /// Information on synchronization peers -#[cfg(test)] -#[derive(Debug)] pub struct Information { /// # of peers that are marked as useful for current synchronization session && have no pending requests. pub idle: usize, @@ -62,13 +61,24 @@ pub struct BlocksRequest { pub blocks: HashSet, } +/// Peer trust level. +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum TrustLevel { + /// Suspicios peer (either it is fresh peer, or it has failed to respond to last requests). + Suspicious, + /// This peer is responding to requests. + Trusted, +} + /// Peer statistics -#[derive(Debug, Default)] -struct PeerStats { +#[derive(Debug)] +pub struct PeerStats { /// Number of blocks requests failures failures: usize, /// Average block response time meter speed: AverageSpeedMeter, + /// Peer trust level. + trust: TrustLevel, } /// Block statistics @@ -80,7 +90,6 @@ struct BlockStats { impl PeersTasks { /// Get information on synchronization peers - #[cfg(test)] pub fn information(&self) -> Information { let active_for_headers: HashSet<_> = self.headers_requests.keys().cloned().collect(); Information { @@ -137,6 +146,11 @@ impl PeersTasks { .map(|br| &br.blocks) } + /// Get peer statistics + pub fn get_peer_stats(&self, peer_index: PeerIndex) -> Option<&PeerStats> { + self.stats.get(&peer_index) + } + /// Mark peer as useful. pub fn useful_peer(&mut self, peer_index: PeerIndex) { // if peer is unknown => insert to idle queue @@ -194,7 +208,11 @@ impl PeersTasks { }; // it was requested block => update block response time - self.stats.get_mut(&peer_index).map(|br| br.speed.checkpoint()); + self.stats.get_mut(&peer_index) + .map(|br| { + br.trust = TrustLevel::Trusted; + br.speed.checkpoint() + }); // if it hasn't been last requested block => just return if !is_last_requested_block_received { @@ -286,10 +304,17 @@ impl PeersTasks { } /// We have failed to get headers from peer during given period - pub fn on_peer_headers_failure(&mut self, peer_index: PeerIndex) { + pub fn on_peer_headers_failure(&mut self, peer_index: PeerIndex) -> bool { // we never penalize peers for header requests failures self.headers_requests.remove(&peer_index); self.idle_for_headers.insert(peer_index); + + self.stats.get_mut(&peer_index) + .map(|s| { + s.failures += 1; + s.failures > MAX_PEER_FAILURES + }) + .unwrap_or_default() } /// Reset all peers state to the unuseful @@ -333,8 +358,19 @@ impl PeerStats { PeerStats { failures: 0, speed: AverageSpeedMeter::with_inspect_items(BLOCKS_TO_INSPECT), + trust: TrustLevel::Suspicious, } } + + pub fn trust(&self) -> TrustLevel { + self.trust + } +} + +impl fmt::Debug for Information { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "[active:{}, idle:{}, bad:{}]", self.active, self.idle, self.unuseful) + } } #[cfg(test)] From 72c3408b8f9e59726b06e1e41247a8caf34136d3 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 11 Aug 2017 11:05:34 +0300 Subject: [PATCH 2/5] fixed node penalizing --- sync/src/synchronization_peers_tasks.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sync/src/synchronization_peers_tasks.rs b/sync/src/synchronization_peers_tasks.rs index 89a4690c..2cb9cfaa 100644 --- a/sync/src/synchronization_peers_tasks.rs +++ b/sync/src/synchronization_peers_tasks.rs @@ -210,12 +210,18 @@ impl PeersTasks { // it was requested block => update block response time self.stats.get_mut(&peer_index) .map(|br| { + if br.failures > 0 { + br.failures -= 1; + } br.trust = TrustLevel::Trusted; br.speed.checkpoint() }); // if it hasn't been last requested block => just return if !is_last_requested_block_received { + let mut peer_blocks_requests = self.blocks_requests.remove(&peer_index).expect("checked above; qed"); + peer_blocks_requests.timestamp = precise_time_s(); + self.blocks_requests.insert(peer_index, peer_blocks_requests); return; } @@ -223,7 +229,6 @@ impl PeersTasks { self.stats.get_mut(&peer_index).map(|br| br.speed.stop()); // mark this peer as idle for blocks request - self.blocks_requests.remove(&peer_index); self.idle_for_blocks.insert(peer_index); // also mark as available for headers request if not yet if !self.headers_requests.contains_key(&peer_index) { From 28f4cda92d0baa048ade142ef6494bc8a91644db Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 11 Aug 2017 11:12:47 +0300 Subject: [PATCH 3/5] added hardocded (min, max) requested blocks switch --- sync/src/synchronization_client_core.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sync/src/synchronization_client_core.rs b/sync/src/synchronization_client_core.rs index bf273a01..e1ca1c95 100644 --- a/sync/src/synchronization_client_core.rs +++ b/sync/src/synchronization_client_core.rs @@ -540,11 +540,15 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { // display information if processed many blocks || enough time has passed since sync start self.print_synchronization_information(); - // prepare limits - let verifying_hashes_len = self.chain.length_of_blocks_state(BlockState::Verifying); - let limits = BlocksRequestLimits::default(); // TODO: must be updated using retrieval && verification speed + // prepare limits. TODO: must be updated using current retrieval && verification speed && blocks size + let mut limits = BlocksRequestLimits::default(); + if self.chain.length_of_blocks_state(BlockState::Stored) > 150_000 { + limits.min_blocks_in_request = 8; + limits.max_blocks_in_request = 16; + } // if some blocks requests are forced => we should ask peers even if there are no idle peers + let verifying_hashes_len = self.chain.length_of_blocks_state(BlockState::Verifying); if let Some(forced_blocks_requests) = forced_blocks_requests { let useful_peers = self.peers_tasks.useful_peers(); // if we have to request blocks && there are no useful peers at all => switch to saturated state From b33e327d324dac1e7f7afc41947c17b2691f9ab7 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 11 Aug 2017 11:27:31 +0300 Subject: [PATCH 4/5] do not close connection on notfound if not synchronizing --- sync/src/synchronization_client_core.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sync/src/synchronization_client_core.rs b/sync/src/synchronization_client_core.rs index e1ca1c95..43b5bc0c 100644 --- a/sync/src/synchronization_client_core.rs +++ b/sync/src/synchronization_client_core.rs @@ -490,7 +490,9 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { // for now, let's exclude peer from synchronization - we are relying on full nodes for synchronization let removed_tasks = self.peers_tasks.reset_blocks_tasks(peer_index); self.peers_tasks.unuseful_peer(peer_index); - self.peers.misbehaving(peer_index, &format!("Responded with NotFound(unrequested_block)")); + if self.state.is_synchronizing() { + self.peers.misbehaving(peer_index, &format!("Responded with NotFound(unrequested_block)")); + } // if peer has had some blocks tasks, rerequest these blocks self.execute_synchronization_tasks(Some(removed_tasks), None); From 5d5a6d715ca52952cf5c57aed585cb60a789c348 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 11 Aug 2017 11:48:26 +0300 Subject: [PATCH 5/5] lost line --- sync/src/synchronization_peers_tasks.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/sync/src/synchronization_peers_tasks.rs b/sync/src/synchronization_peers_tasks.rs index 2cb9cfaa..a9323f7b 100644 --- a/sync/src/synchronization_peers_tasks.rs +++ b/sync/src/synchronization_peers_tasks.rs @@ -229,6 +229,7 @@ impl PeersTasks { self.stats.get_mut(&peer_index).map(|br| br.speed.stop()); // mark this peer as idle for blocks request + self.blocks_requests.remove(&peer_index); self.idle_for_blocks.insert(peer_index); // also mark as available for headers request if not yet if !self.headers_requests.contains_key(&peer_index) {