Merge pull request #115 from ethcore/management_tests

Synchronization management tests && fixes
This commit is contained in:
Marek Kotewicz 2016-11-14 08:17:58 +01:00 committed by GitHub
commit 62e1d9c288
2 changed files with 118 additions and 15 deletions

View File

@ -20,7 +20,7 @@ use synchronization_chain::{Chain, ChainRef, BlockState, HeadersIntersection};
use synchronization_chain::{Information as ChainInformation};
use verification::{ChainVerifier, Error as VerificationError, Verify};
use synchronization_executor::{Task, TaskExecutor};
use synchronization_manager::{manage_synchronization_peers, manage_unknown_orphaned_blocks, MANAGEMENT_INTERVAL_MS};
use synchronization_manager::{manage_synchronization_peers, manage_unknown_orphaned_blocks, MANAGEMENT_INTERVAL_MS, ManagePeersConfig, ManageUnknownBlocksConfig};
use hash_queue::HashPosition;
use time;
use std::time::Duration;
@ -468,6 +468,8 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
// TODO: start management worker only when synchronization is started
// currently impossible because there is no way to call Interval::new with Remote && Handle is not-Send
{
let peers_config = ManagePeersConfig::default();
let unknown_config = ManageUnknownBlocksConfig::default();
let csync = Arc::downgrade(&sync);
let mut sync = sync.lock();
let management_worker = Interval::new(Duration::from_millis(MANAGEMENT_INTERVAL_MS), handle)
@ -479,7 +481,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
};
let mut client = client.lock();
if client.state.is_synchronizing() || client.state.is_nearly_saturated() {
let blocks_to_request = manage_synchronization_peers(&mut client.peers);
let blocks_to_request = manage_synchronization_peers(&peers_config, &mut client.peers);
// if no peers left => we are saturated
if !client.peers.any() {
client.switch_to_saturated_state(false);
@ -487,7 +489,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
client.execute_synchronization_tasks(blocks_to_request);
}
if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&mut client.unknown_blocks) {
if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&unknown_config, &mut client.unknown_blocks) {
client.remove_orphaned_blocks(orphans_to_remove.into_iter().collect());
}
}

View File

@ -6,32 +6,63 @@ use primitives::hash::H256;
/// Management interval (in ms)
pub const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000;
/// Response time to decrease peer score
const PEER_FAILURE_INTERVAL_S: f64 = 5f64;
const DEFAULT_PEER_FAILURE_INTERVAL_MS: u32 = 5 * 1000;
/// Unknown orphan block removal time
const UNKNOWN_BLOCK_REMOVAL_TIME_S: f64 = 20f64 * 60f64;
const DEFAULT_UNKNOWN_BLOCK_REMOVAL_TIME_MS: u32 = 20 * 60 * 1000;
/// Maximal number of orphaned blocks
const UNKNOWN_BLOCKS_MAX_LEN: usize = 16;
const DEFAULT_UNKNOWN_BLOCKS_MAX_LEN: usize = 16;
/// Peers management configuration
pub struct ManagePeersConfig {
/// Time interval (in milliseconds) to wait answer from the peer before penalizing && reexecuting tasks
pub failure_interval_ms: u32,
}
impl Default for ManagePeersConfig {
fn default() -> Self {
ManagePeersConfig {
failure_interval_ms: DEFAULT_PEER_FAILURE_INTERVAL_MS,
}
}
}
/// Unknown blocks management configuration
pub struct ManageUnknownBlocksConfig {
/// Time interval (in milliseconds) to wait before removing unknown blocks from in-memory pool
pub removal_time_ms: u32,
/// Maximal # of unknown blocks in the in-memory pool
pub max_number: usize,
}
impl Default for ManageUnknownBlocksConfig {
fn default() -> Self {
ManageUnknownBlocksConfig {
removal_time_ms: DEFAULT_UNKNOWN_BLOCK_REMOVAL_TIME_MS,
max_number: DEFAULT_UNKNOWN_BLOCKS_MAX_LEN,
}
}
}
/// Manage stalled synchronization peers tasks
pub fn manage_synchronization_peers(peers: &mut Peers) -> Option<Vec<H256>> {
pub fn manage_synchronization_peers(config: &ManagePeersConfig, peers: &mut Peers) -> Option<Vec<H256>> {
let mut blocks_to_request: Vec<H256> = Vec::new();
let now = precise_time_s();
// reset tasks for peers, which has not responded during given period
for (worst_peer_index, worst_peer_time) in peers.worst_peers() {
// check if peer has not responded within given time
let time_diff = worst_peer_time - now;
if time_diff <= PEER_FAILURE_INTERVAL_S {
let time_diff = now - worst_peer_time;
if time_diff <= config.failure_interval_ms as f64 / 1000f64 {
break;
}
// decrease score && move to the idle queue
trace!(target: "sync", "Failed to get response from peer#{} in {} seconds", worst_peer_index, time_diff);
warn!(target: "sync", "Failed to get response from peer#{} in {} seconds", worst_peer_index, time_diff);
let peer_tasks = peers.reset_tasks(worst_peer_index);
blocks_to_request.extend(peer_tasks);
// if peer failed many times => forget it
if peers.on_peer_failure(worst_peer_index) {
trace!(target: "sync", "Too many failures for peer#{}. Excluding from synchronization", worst_peer_index);
warn!(target: "sync", "Too many failures for peer#{}. Excluding from synchronization", worst_peer_index);
}
}
@ -39,9 +70,9 @@ pub fn manage_synchronization_peers(peers: &mut Peers) -> Option<Vec<H256>> {
}
/// Manage unknown orphaned blocks
pub fn manage_unknown_orphaned_blocks(unknown_blocks: &mut LinkedHashMap<H256, f64>) -> Option<Vec<H256>> {
pub fn manage_unknown_orphaned_blocks(config: &ManageUnknownBlocksConfig, unknown_blocks: &mut LinkedHashMap<H256, f64>) -> Option<Vec<H256>> {
let mut unknown_to_remove: Vec<H256> = Vec::new();
let mut remove_num = if unknown_blocks.len() > UNKNOWN_BLOCKS_MAX_LEN { UNKNOWN_BLOCKS_MAX_LEN - unknown_blocks.len() } else { 0 };
let mut remove_num = if unknown_blocks.len() > config.max_number { unknown_blocks.len() - config.max_number } else { 0 };
let now = precise_time_s();
for (hash, time) in unknown_blocks.iter() {
// remove oldest blocks if there are more unknown blocks that we can hold in memory
@ -52,8 +83,8 @@ pub fn manage_unknown_orphaned_blocks(unknown_blocks: &mut LinkedHashMap<H256, f
}
// check if block is unknown for too long
let time_diff = time - now;
if time_diff <= UNKNOWN_BLOCK_REMOVAL_TIME_S {
let time_diff = now - time;
if time_diff <= config.removal_time_ms as f64 / 1000f64 {
break;
}
unknown_to_remove.push(hash.clone());
@ -66,3 +97,73 @@ pub fn manage_unknown_orphaned_blocks(unknown_blocks: &mut LinkedHashMap<H256, f
if unknown_to_remove.is_empty() { None } else { Some(unknown_to_remove) }
}
#[cfg(test)]
mod tests {
use time::precise_time_s;
use linked_hash_map::LinkedHashMap;
use super::{ManagePeersConfig, ManageUnknownBlocksConfig, manage_synchronization_peers, manage_unknown_orphaned_blocks};
use synchronization_peers::Peers;
use primitives::hash::H256;
#[test]
fn manage_good_peer() {
let config = ManagePeersConfig { failure_interval_ms: 1000, };
let mut peers = Peers::new();
peers.on_blocks_requested(1, &vec![H256::from(0), H256::from(1)]);
peers.on_block_received(1, &H256::from(0));
assert_eq!(manage_synchronization_peers(&config, &mut peers), None);
assert_eq!(peers.idle_peers(), vec![]);
}
#[test]
fn manage_bad_peers() {
use std::thread::sleep;
use std::time::Duration;
let config = ManagePeersConfig { failure_interval_ms: 0, };
let mut peers = Peers::new();
peers.on_blocks_requested(1, &vec![H256::from(0)]);
peers.on_blocks_requested(2, &vec![H256::from(1)]);
sleep(Duration::from_millis(1));
let managed_tasks = manage_synchronization_peers(&config, &mut peers).expect("managed tasks");
assert!(managed_tasks.contains(&H256::from(0)));
assert!(managed_tasks.contains(&H256::from(1)));
let idle_peers = peers.idle_peers();
assert_eq!(2, idle_peers.len());
assert!(idle_peers.contains(&1));
assert!(idle_peers.contains(&2));
}
#[test]
fn manage_unknown_blocks_good() {
let config = ManageUnknownBlocksConfig { removal_time_ms: 1000, max_number: 100 };
let mut unknown_blocks: LinkedHashMap<H256, f64> = LinkedHashMap::new();
unknown_blocks.insert(H256::from(0), precise_time_s());
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut unknown_blocks), None);
assert_eq!(unknown_blocks.len(), 1);
}
#[test]
fn manage_unknown_blocks_by_time() {
use std::thread::sleep;
use std::time::Duration;
let config = ManageUnknownBlocksConfig { removal_time_ms: 0, max_number: 100 };
let mut unknown_blocks: LinkedHashMap<H256, f64> = LinkedHashMap::new();
unknown_blocks.insert(H256::from(0), precise_time_s());
sleep(Duration::from_millis(1));
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut unknown_blocks), Some(vec![H256::from(0)]));
assert_eq!(unknown_blocks.len(), 0);
}
#[test]
fn manage_unknown_blocks_by_max_number() {
let config = ManageUnknownBlocksConfig { removal_time_ms: 100, max_number: 1 };
let mut unknown_blocks: LinkedHashMap<H256, f64> = LinkedHashMap::new();
unknown_blocks.insert(H256::from(0), precise_time_s());
unknown_blocks.insert(H256::from(1), precise_time_s());
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut unknown_blocks), Some(vec![H256::from(0)]));
assert_eq!(unknown_blocks.len(), 1);
}
}