diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index d09967a0..8bc2a201 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -20,7 +20,7 @@ use synchronization_chain::{Chain, ChainRef, BlockState, HeadersIntersection}; use synchronization_chain::{Information as ChainInformation}; use verification::{ChainVerifier, Error as VerificationError, Verify}; use synchronization_executor::{Task, TaskExecutor}; -use synchronization_manager::{manage_synchronization_peers, manage_unknown_orphaned_blocks, MANAGEMENT_INTERVAL_MS}; +use synchronization_manager::{manage_synchronization_peers, manage_unknown_orphaned_blocks, MANAGEMENT_INTERVAL_MS, ManagePeersConfig, ManageUnknownBlocksConfig}; use hash_queue::HashPosition; use time; use std::time::Duration; @@ -468,6 +468,8 @@ impl SynchronizationClient where T: TaskExecutor { // TODO: start management worker only when synchronization is started // currently impossible because there is no way to call Interval::new with Remote && Handle is not-Send { + let peers_config = ManagePeersConfig::default(); + let unknown_config = ManageUnknownBlocksConfig::default(); let csync = Arc::downgrade(&sync); let mut sync = sync.lock(); let management_worker = Interval::new(Duration::from_millis(MANAGEMENT_INTERVAL_MS), handle) @@ -479,7 +481,7 @@ impl SynchronizationClient where T: TaskExecutor { }; let mut client = client.lock(); if client.state.is_synchronizing() || client.state.is_nearly_saturated() { - let blocks_to_request = manage_synchronization_peers(&mut client.peers); + let blocks_to_request = manage_synchronization_peers(&peers_config, &mut client.peers); // if no peers left => we are saturated if !client.peers.any() { client.switch_to_saturated_state(false); @@ -487,7 +489,7 @@ impl SynchronizationClient where T: TaskExecutor { client.execute_synchronization_tasks(blocks_to_request); } - if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&mut client.unknown_blocks) { + if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&unknown_config, &mut client.unknown_blocks) { client.remove_orphaned_blocks(orphans_to_remove.into_iter().collect()); } } diff --git a/sync/src/synchronization_manager.rs b/sync/src/synchronization_manager.rs index 11240129..6ad954eb 100644 --- a/sync/src/synchronization_manager.rs +++ b/sync/src/synchronization_manager.rs @@ -6,32 +6,63 @@ use primitives::hash::H256; /// Management interval (in ms) pub const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000; /// Response time to decrease peer score -const PEER_FAILURE_INTERVAL_S: f64 = 5f64; +const DEFAULT_PEER_FAILURE_INTERVAL_MS: u32 = 5 * 1000; /// Unknown orphan block removal time -const UNKNOWN_BLOCK_REMOVAL_TIME_S: f64 = 20f64 * 60f64; +const DEFAULT_UNKNOWN_BLOCK_REMOVAL_TIME_MS: u32 = 20 * 60 * 1000; /// Maximal number of orphaned blocks -const UNKNOWN_BLOCKS_MAX_LEN: usize = 16; +const DEFAULT_UNKNOWN_BLOCKS_MAX_LEN: usize = 16; + +/// Peers management configuration +pub struct ManagePeersConfig { + /// Time interval (in milliseconds) to wait answer from the peer before penalizing && reexecuting tasks + pub failure_interval_ms: u32, +} + +impl Default for ManagePeersConfig { + fn default() -> Self { + ManagePeersConfig { + failure_interval_ms: DEFAULT_PEER_FAILURE_INTERVAL_MS, + } + } +} + +/// Unknown blocks management configuration +pub struct ManageUnknownBlocksConfig { + /// Time interval (in milliseconds) to wait before removing unknown blocks from in-memory pool + pub removal_time_ms: u32, + /// Maximal # of unknown blocks in the in-memory pool + pub max_number: usize, +} + +impl Default for ManageUnknownBlocksConfig { + fn default() -> Self { + ManageUnknownBlocksConfig { + removal_time_ms: DEFAULT_UNKNOWN_BLOCK_REMOVAL_TIME_MS, + max_number: DEFAULT_UNKNOWN_BLOCKS_MAX_LEN, + } + } +} /// Manage stalled synchronization peers tasks -pub fn manage_synchronization_peers(peers: &mut Peers) -> Option> { +pub fn manage_synchronization_peers(config: &ManagePeersConfig, peers: &mut Peers) -> Option> { let mut blocks_to_request: Vec = Vec::new(); let now = precise_time_s(); // reset tasks for peers, which has not responded during given period for (worst_peer_index, worst_peer_time) in peers.worst_peers() { // check if peer has not responded within given time - let time_diff = worst_peer_time - now; - if time_diff <= PEER_FAILURE_INTERVAL_S { + let time_diff = now - worst_peer_time; + if time_diff <= config.failure_interval_ms as f64 / 1000f64 { break; } // decrease score && move to the idle queue - trace!(target: "sync", "Failed to get response from peer#{} in {} seconds", worst_peer_index, time_diff); + warn!(target: "sync", "Failed to get response from peer#{} in {} seconds", worst_peer_index, time_diff); let peer_tasks = peers.reset_tasks(worst_peer_index); blocks_to_request.extend(peer_tasks); // if peer failed many times => forget it if peers.on_peer_failure(worst_peer_index) { - trace!(target: "sync", "Too many failures for peer#{}. Excluding from synchronization", worst_peer_index); + warn!(target: "sync", "Too many failures for peer#{}. Excluding from synchronization", worst_peer_index); } } @@ -39,9 +70,9 @@ pub fn manage_synchronization_peers(peers: &mut Peers) -> Option> { } /// Manage unknown orphaned blocks -pub fn manage_unknown_orphaned_blocks(unknown_blocks: &mut LinkedHashMap) -> Option> { +pub fn manage_unknown_orphaned_blocks(config: &ManageUnknownBlocksConfig, unknown_blocks: &mut LinkedHashMap) -> Option> { let mut unknown_to_remove: Vec = Vec::new(); - let mut remove_num = if unknown_blocks.len() > UNKNOWN_BLOCKS_MAX_LEN { UNKNOWN_BLOCKS_MAX_LEN - unknown_blocks.len() } else { 0 }; + let mut remove_num = if unknown_blocks.len() > config.max_number { unknown_blocks.len() - config.max_number } else { 0 }; let now = precise_time_s(); for (hash, time) in unknown_blocks.iter() { // remove oldest blocks if there are more unknown blocks that we can hold in memory @@ -52,8 +83,8 @@ pub fn manage_unknown_orphaned_blocks(unknown_blocks: &mut LinkedHashMap = LinkedHashMap::new(); + unknown_blocks.insert(H256::from(0), precise_time_s()); + assert_eq!(manage_unknown_orphaned_blocks(&config, &mut unknown_blocks), None); + assert_eq!(unknown_blocks.len(), 1); + } + + #[test] + fn manage_unknown_blocks_by_time() { + use std::thread::sleep; + use std::time::Duration; + let config = ManageUnknownBlocksConfig { removal_time_ms: 0, max_number: 100 }; + let mut unknown_blocks: LinkedHashMap = LinkedHashMap::new(); + unknown_blocks.insert(H256::from(0), precise_time_s()); + sleep(Duration::from_millis(1)); + + assert_eq!(manage_unknown_orphaned_blocks(&config, &mut unknown_blocks), Some(vec![H256::from(0)])); + assert_eq!(unknown_blocks.len(), 0); + } + + #[test] + fn manage_unknown_blocks_by_max_number() { + let config = ManageUnknownBlocksConfig { removal_time_ms: 100, max_number: 1 }; + let mut unknown_blocks: LinkedHashMap = LinkedHashMap::new(); + unknown_blocks.insert(H256::from(0), precise_time_s()); + unknown_blocks.insert(H256::from(1), precise_time_s()); + assert_eq!(manage_unknown_orphaned_blocks(&config, &mut unknown_blocks), Some(vec![H256::from(0)])); + assert_eq!(unknown_blocks.len(), 1); + } +}