Merge pull request #116 from ethcore/sync_utilize_all_peers

Ask all known peers about inventory when entering saturated state
This commit is contained in:
Svyatoslav Nikolsky 2016-11-15 12:07:04 +03:00 committed by GitHub
commit 711b32c6b8
4 changed files with 425 additions and 186 deletions

View File

@ -112,6 +112,11 @@ impl<T, U, V> LocalNode<T, U, V> where T: SynchronizationTaskExecutor + PeersCon
pub fn on_peer_getheaders(&self, peer_index: usize, message: types::GetHeaders) {
trace!(target: "sync", "Got `getheaders` message from peer#{}", peer_index);
// do not serve getheaders requests until we are synchronized
if self.client.lock().state().is_synchronizing() {
return;
}
// simulating bitcoind for passing tests: if we are in nearly-saturated state
// and the peer which has just provided new blocks to us is asking for headers
// => do not serve getheaders until we have fully processed its blocks + wait until headers are served before returning

View File

@ -20,7 +20,8 @@ use synchronization_chain::{Chain, ChainRef, BlockState, HeadersIntersection};
use synchronization_chain::{Information as ChainInformation};
use verification::{ChainVerifier, Error as VerificationError, Verify};
use synchronization_executor::{Task, TaskExecutor};
use synchronization_manager::{manage_synchronization_peers, manage_unknown_orphaned_blocks, MANAGEMENT_INTERVAL_MS, ManagePeersConfig, ManageUnknownBlocksConfig};
use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory,
manage_unknown_orphaned_blocks, MANAGEMENT_INTERVAL_MS, ManagePeersConfig, ManageUnknownBlocksConfig};
use hash_queue::HashPosition;
use time;
use std::time::Duration;
@ -184,6 +185,7 @@ enum VerificationTask {
/// Synchronization client trait
pub trait Client : Send + 'static {
fn best_block(&self) -> db::BestBlock;
fn state(&self) -> State;
fn on_new_blocks_inventory(&mut self, peer_index: usize, peer_hashes: Vec<H256>);
fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>);
fn on_peer_block(&mut self, peer_index: usize, block: Block);
@ -289,6 +291,11 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
self.chain.read().best_block()
}
/// Get synchronization state
fn state(&self) -> State {
self.state.clone()
}
/// Try to queue synchronization of unknown blocks when new inventory is received.
fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec<H256>) {
// we use headers-first synchronization
@ -343,6 +350,9 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
blocks_hashes
};
// update peers to select next tasks
self.peers.on_inventory_received(peer_index);
// now insert unknown blocks to the queue
self.process_new_blocks_headers(peer_index, blocks_hashes, blocks_headers);
self.execute_synchronization_tasks(None);
@ -362,8 +372,11 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
/// Peer disconnected.
fn on_peer_disconnected(&mut self, peer_index: usize) {
// when last peer is disconnected, reset, but let verifying blocks be verified
if self.peers.on_peer_disconnected(peer_index) {
self.switch_to_saturated_state(false);
let peer_tasks = self.peers.on_peer_disconnected(peer_index);
if !self.peers.has_any_useful() {
self.switch_to_saturated_state();
} else if peer_tasks.is_some() {
self.execute_synchronization_tasks(peer_tasks);
}
}
@ -480,15 +493,12 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
None => return Ok(()),
};
let mut client = client.lock();
client.print_synchronization_information();
if client.state.is_synchronizing() || client.state.is_nearly_saturated() {
let blocks_to_request = manage_synchronization_peers(&peers_config, &mut client.peers);
// if no peers left => we are saturated
if !client.peers.any() {
client.switch_to_saturated_state(false);
} else {
client.execute_synchronization_tasks(blocks_to_request);
}
let blocks_to_request = manage_synchronization_peers_blocks(&peers_config, &mut client.peers);
client.execute_synchronization_tasks(blocks_to_request);
manage_synchronization_peers_inventory(&peers_config, &mut client.peers);
if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&unknown_config, &mut client.unknown_blocks) {
client.remove_orphaned_blocks(orphans_to_remove.into_iter().collect());
}
@ -528,13 +538,13 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
trace!(target: "sync", "Ignoring {} headers from peer#{}. All blocks are known and in database.", headers.len(), peer_index);
if self.state.is_synchronizing() {
// remember peer as useful
self.peers.insert(peer_index);
self.peers.useful_peer(peer_index);
}
},
HeadersIntersection::InMemoryNoNewBlocks => {
trace!(target: "sync", "Ignoring {} headers from peer#{}. All blocks are known and in memory.", headers.len(), peer_index);
// remember peer as useful
self.peers.insert(peer_index);
self.peers.useful_peer(peer_index);
},
HeadersIntersection::InMemoryMainNewBlocks(new_block_index)
| HeadersIntersection::InMemoryForkNewBlocks(new_block_index)
@ -544,9 +554,10 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
let new_blocks_hashes = hashes.split_off(new_block_index);
let new_blocks_headers = headers.split_off(new_block_index);
let new_blocks_hashes_len = new_blocks_hashes.len();
trace!(target: "sync", "Sch. {} headers from peer#{}. First {:?}, last: {:?}", new_blocks_hashes_len, peer_index, new_blocks_hashes[0], new_blocks_hashes[new_blocks_hashes_len - 1]);
chain.schedule_blocks_headers(new_blocks_hashes, new_blocks_headers);
// remember peer as useful
self.peers.insert(peer_index);
self.peers.useful_peer(peer_index);
// switch to synchronization state
if !self.state.is_synchronizing() {
if new_blocks_hashes_len == 1 && !self.state.is_nearly_saturated() {
@ -567,7 +578,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
match chain.block_state(&block_hash) {
BlockState::Verifying | BlockState::Stored => {
// remember peer as useful
self.peers.insert(peer_index);
self.peers.useful_peer(peer_index);
},
BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => {
// check parent block state
@ -593,7 +604,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
},
BlockState::Verifying | BlockState::Stored => {
// remember peer as useful
self.peers.insert(peer_index);
self.peers.useful_peer(peer_index);
// forget block
chain.forget_leave_header(&block_hash);
// schedule verification
@ -632,7 +643,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
},
BlockState::Requested | BlockState::Scheduled => {
// remember peer as useful
self.peers.insert(peer_index);
self.peers.useful_peer(peer_index);
// remember as orphan block
self.orphaned_blocks
.entry(block.block_header.previous_header_hash.clone())
@ -649,69 +660,69 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
};
if switch_to_saturated {
self.switch_to_saturated_state(true);
self.switch_to_saturated_state();
}
}
/// Schedule new synchronization tasks, if any.
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>) {
let mut tasks: Vec<Task> = Vec::new();
let idle_peers = self.peers.idle_peers();
let idle_peers_len = idle_peers.len() as u32;
// display information if processed many blocks || enough time has passed since sync start
self.print_synchronization_information();
// if some blocks requests are forced => we should ask peers even if there are no idle peers
if let Some(forced_blocks_requests) = forced_blocks_requests {
let all_peers = self.peers.all_peers();
tasks.extend(self.prepare_blocks_requests_tasks(all_peers, forced_blocks_requests));
let useful_peers = self.peers.useful_peers();
// if we have to request blocks && there are no useful peers at all => switch to saturated state
if useful_peers.is_empty() {
warn!(target: "sync", "Last peer was marked as non-useful. Moving to saturated state.");
self.switch_to_saturated_state();
return;
}
let forced_tasks = self.prepare_blocks_requests_tasks(useful_peers, forced_blocks_requests);
tasks.extend(forced_tasks);
}
// prepare 'normal' synchronization tasks
if idle_peers_len != 0 {
let blocks_to_request = {
// display information if processed many blocks || enough time has passed since sync start
let mut blocks_requests: Option<Vec<H256>> = None;
let blocks_idle_peers = self.peers.idle_peers_for_blocks();
{
// check if we can query some blocks hashes
let inventory_idle_peers = self.peers.idle_peers_for_inventory();
if !inventory_idle_peers.is_empty() {
let scheduled_hashes_len = { self.chain.read().length_of_state(BlockState::Scheduled) };
if scheduled_hashes_len < MAX_SCHEDULED_HASHES {
for inventory_peer in inventory_idle_peers.iter() {
self.peers.on_inventory_requested(*inventory_peer);
}
let inventory_tasks = inventory_idle_peers.into_iter().map(|p| Task::RequestBlocksHeaders(p));
tasks.extend(inventory_tasks);
}
}
// check if we can move some blocks from scheduled to requested queue
let blocks_idle_peers_len = blocks_idle_peers.len() as u32;
if blocks_idle_peers_len != 0 {
let mut chain = self.chain.write();
if let State::Synchronizing(timestamp, num_of_blocks) = self.state {
let new_timestamp = time::precise_time_s();
let timestamp_diff = new_timestamp - timestamp;
let new_num_of_blocks = chain.best_storage_block().number;
let blocks_diff = if new_num_of_blocks > num_of_blocks { new_num_of_blocks - num_of_blocks } else { 0 };
if timestamp_diff >= 60.0 || blocks_diff > 1000 {
self.state = State::Synchronizing(time::precise_time_s(), new_num_of_blocks);
use time;
info!(target: "sync", "{:?} @ Processed {} blocks in {} seconds. Chain information: {:?}"
, time::strftime("%H:%M:%S", &time::now()).unwrap()
, blocks_diff, timestamp_diff
, chain.information());
}
}
// check if we can query some blocks hashes
let scheduled_hashes_len = chain.length_of_state(BlockState::Scheduled);
if self.state.is_synchronizing() {
if scheduled_hashes_len < MAX_SCHEDULED_HASHES {
tasks.push(Task::RequestBlocksHeaders(idle_peers[0]));
self.peers.on_inventory_requested(idle_peers[0]);
}
}
// check if we can move some blocks from scheduled to requested queue
let requested_hashes_len = chain.length_of_state(BlockState::Requested);
let verifying_hashes_len = chain.length_of_state(BlockState::Verifying);
if requested_hashes_len + verifying_hashes_len < MAX_REQUESTED_BLOCKS + MAX_VERIFYING_BLOCKS && scheduled_hashes_len != 0 {
let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(scheduled_hashes_len / idle_peers_len, MIN_BLOCKS_IN_REQUEST));
let hashes_to_request_len = chunk_size * idle_peers_len;
Some(chain.request_blocks_hashes(hashes_to_request_len))
} else {
None
let chunk_size = min(MAX_BLOCKS_IN_REQUEST, max(scheduled_hashes_len / blocks_idle_peers_len, MIN_BLOCKS_IN_REQUEST));
let hashes_to_request_len = chunk_size * blocks_idle_peers_len;
let hashes_to_request = chain.request_blocks_hashes(hashes_to_request_len);
blocks_requests = Some(hashes_to_request);
}
};
if let Some(blocks_to_request) = blocks_to_request {
tasks.extend(self.prepare_blocks_requests_tasks(idle_peers, blocks_to_request));
}
}
// append blocks requests tasks
if let Some(blocks_requests) = blocks_requests {
tasks.extend(self.prepare_blocks_requests_tasks(blocks_idle_peers, blocks_requests));
}
// execute synchronization tasks
for task in tasks {
self.executor.lock().execute(task);
@ -743,7 +754,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
/// Switch to saturated state
fn switch_to_saturated_state(&mut self, ask_for_inventory: bool) {
fn switch_to_saturated_state(&mut self) {
if self.state.is_saturated() {
return;
}
@ -770,11 +781,12 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
chain.information());
}
if ask_for_inventory {
// finally - ask all known peers for their best blocks inventory, in case some peer
// has led us to a fork
{
let mut executor = self.executor.lock();
for idle_peer in self.peers.idle_peers() {
self.peers.on_inventory_requested(idle_peer);
executor.execute(Task::RequestBlocksHeaders(idle_peer));
for peer in self.peers.all_peers() {
executor.execute(Task::RequestBlocksHeaders(peer));
}
}
}
@ -850,6 +862,27 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
}
/// Print synchronization information
fn print_synchronization_information(&mut self) {
if let State::Synchronizing(timestamp, num_of_blocks) = self.state {
let chain = self.chain.read();
let new_timestamp = time::precise_time_s();
let timestamp_diff = new_timestamp - timestamp;
let new_num_of_blocks = chain.best_storage_block().number;
let blocks_diff = if new_num_of_blocks > num_of_blocks { new_num_of_blocks - num_of_blocks } else { 0 };
if timestamp_diff >= 60.0 || blocks_diff > 1000 {
self.state = State::Synchronizing(time::precise_time_s(), new_num_of_blocks);
use time;
info!(target: "sync", "{:?} @ Processed {} blocks in {} seconds. Chain information: {:?}"
, time::strftime("%H:%M:%S", &time::now()).unwrap()
, blocks_diff, timestamp_diff
, chain.information());
}
}
}
/// Thread procedure for handling verification tasks
fn verification_worker_proc(sync: Arc<Mutex<Self>>, storage: Arc<db::Store>, work_receiver: Receiver<VerificationTask>) {
let verifier = ChainVerifier::new(storage);
@ -942,7 +975,7 @@ pub mod tests {
sync.on_new_blocks_headers(5, vec![block1.block_header.clone()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocks(5, vec![block1.hash()])]);
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(5), Task::RequestBlocks(5, vec![block1.hash()])]);
assert!(sync.information().state.is_nearly_saturated());
assert_eq!(sync.information().orphaned, 0);
assert_eq!(sync.information().chain.scheduled, 0);
@ -1010,7 +1043,7 @@ pub mod tests {
// synchronization has started && new blocks have been requested
let tasks = executor.lock().take_tasks();
assert!(sync.information().state.is_nearly_saturated());
assert_eq!(tasks, vec![Task::RequestBlocks(1, vec![block1.hash()])]);
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::RequestBlocks(1, vec![block1.hash()])]);
}
{
@ -1324,4 +1357,39 @@ pub mod tests {
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocks(1, vec![test_data::block_h1().hash()])]);
}
#[test]
fn blocks_rerequested_on_peer_disconnect() {
let (_, _, executor, _, sync) = create_sync(None);
let block1: Block = test_data::block_h1();
let block2: Block = test_data::block_h2();
{
let mut sync = sync.lock();
// receive inventory from new peer#1
sync.on_new_blocks_headers(1, vec![block1.block_header.clone()]);
// synchronization has started && new blocks have been requested
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(1), Task::RequestBlocks(1, vec![block1.hash()])]);
}
{
let mut sync = sync.lock();
// receive inventory from new peer#2
sync.on_new_blocks_headers(2, vec![block1.block_header.clone(), block2.block_header.clone()]);
// synchronization has started && new blocks have been requested
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocksHeaders(2), Task::RequestBlocks(2, vec![block2.hash()])]);
}
{
let mut sync = sync.lock();
// peer#1 is disconnected && it has pending blocks requests => ask peer#2
sync.on_peer_disconnected(1);
// blocks have been requested
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocks(2, vec![block1.hash()])]);
}
}
}

View File

@ -5,8 +5,10 @@ use primitives::hash::H256;
/// Management interval (in ms)
pub const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000;
/// Response time to decrease peer score
const DEFAULT_PEER_FAILURE_INTERVAL_MS: u32 = 5 * 1000;
/// Response time before getting block to decrease peer score
const DEFAULT_PEER_BLOCK_FAILURE_INTERVAL_MS: u32 = 5 * 1000;
/// Response time before getting inventory to decrease peer score
const DEFAULT_PEER_INVENTORY_FAILURE_INTERVAL_MS: u32 = 5 * 1000;
/// Unknown orphan block removal time
const DEFAULT_UNKNOWN_BLOCK_REMOVAL_TIME_MS: u32 = 20 * 60 * 1000;
/// Maximal number of orphaned blocks
@ -14,14 +16,17 @@ const DEFAULT_UNKNOWN_BLOCKS_MAX_LEN: usize = 16;
/// Peers management configuration
pub struct ManagePeersConfig {
/// Time interval (in milliseconds) to wait answer from the peer before penalizing && reexecuting tasks
pub failure_interval_ms: u32,
/// Time interval (in milliseconds) to wait for a block from the peer before penalizing && reexecuting tasks
pub block_failure_interval_ms: u32,
/// Time interval (in milliseconds) to wait for inventory from the peer before penalizing && reexecuting tasks
pub inventory_failure_interval_ms: u32,
}
impl Default for ManagePeersConfig {
fn default() -> Self {
ManagePeersConfig {
failure_interval_ms: DEFAULT_PEER_FAILURE_INTERVAL_MS,
block_failure_interval_ms: DEFAULT_PEER_BLOCK_FAILURE_INTERVAL_MS,
inventory_failure_interval_ms: DEFAULT_PEER_INVENTORY_FAILURE_INTERVAL_MS,
}
}
}
@ -43,25 +48,25 @@ impl Default for ManageUnknownBlocksConfig {
}
}
/// Manage stalled synchronization peers tasks
pub fn manage_synchronization_peers(config: &ManagePeersConfig, peers: &mut Peers) -> Option<Vec<H256>> {
/// Manage stalled synchronization peers blocks tasks
pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &mut Peers) -> Option<Vec<H256>> {
let mut blocks_to_request: Vec<H256> = Vec::new();
let now = precise_time_s();
// reset tasks for peers which have not responded during the given period
for (worst_peer_index, worst_peer_time) in peers.worst_peers() {
for (worst_peer_index, worst_peer_time) in peers.ordered_blocks_requests() {
// check if peer has not responded within given time
let time_diff = now - worst_peer_time;
if time_diff <= config.failure_interval_ms as f64 / 1000f64 {
if time_diff <= config.block_failure_interval_ms as f64 / 1000f64 {
break;
}
// decrease score && move to the idle queue
warn!(target: "sync", "Failed to get response from peer#{} in {} seconds", worst_peer_index, time_diff);
let peer_tasks = peers.reset_tasks(worst_peer_index);
warn!(target: "sync", "Failed to get requested block from peer#{} in {} seconds", worst_peer_index, time_diff);
let peer_tasks = peers.reset_blocks_tasks(worst_peer_index);
blocks_to_request.extend(peer_tasks);
// if peer failed many times => forget it
if peers.on_peer_failure(worst_peer_index) {
if peers.on_peer_block_failure(worst_peer_index) {
warn!(target: "sync", "Too many failures for peer#{}. Excluding from synchronization", worst_peer_index);
}
}
@ -69,6 +74,21 @@ pub fn manage_synchronization_peers(config: &ManagePeersConfig, peers: &mut Peer
if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) }
}
/// Manage stalled synchronization peers inventory tasks
pub fn manage_synchronization_peers_inventory(config: &ManagePeersConfig, peers: &mut Peers) {
let now = precise_time_s();
// reset tasks for peers which have not responded during the given period
for (worst_peer_index, worst_peer_time) in peers.ordered_inventory_requests() {
// check if peer has not responded within given time
let time_diff = now - worst_peer_time;
if time_diff <= config.inventory_failure_interval_ms as f64 / 1000f64 {
break;
}
peers.on_peer_inventory_failure(worst_peer_index);
}
}
/// Manage unknown orphaned blocks
pub fn manage_unknown_orphaned_blocks(config: &ManageUnknownBlocksConfig, unknown_blocks: &mut LinkedHashMap<H256, f64>) -> Option<Vec<H256>> {
let mut unknown_to_remove: Vec<H256> = Vec::new();
@ -102,34 +122,34 @@ pub fn manage_unknown_orphaned_blocks(config: &ManageUnknownBlocksConfig, unknow
mod tests {
use time::precise_time_s;
use linked_hash_map::LinkedHashMap;
use super::{ManagePeersConfig, ManageUnknownBlocksConfig, manage_synchronization_peers, manage_unknown_orphaned_blocks};
use super::{ManagePeersConfig, ManageUnknownBlocksConfig, manage_synchronization_peers_blocks, manage_unknown_orphaned_blocks};
use synchronization_peers::Peers;
use primitives::hash::H256;
#[test]
fn manage_good_peer() {
let config = ManagePeersConfig { failure_interval_ms: 1000, };
let config = ManagePeersConfig { block_failure_interval_ms: 1000, ..Default::default() };
let mut peers = Peers::new();
peers.on_blocks_requested(1, &vec![H256::from(0), H256::from(1)]);
peers.on_block_received(1, &H256::from(0));
assert_eq!(manage_synchronization_peers(&config, &mut peers), None);
assert_eq!(peers.idle_peers(), vec![]);
assert_eq!(manage_synchronization_peers_blocks(&config, &mut peers), None);
assert_eq!(peers.idle_peers_for_blocks(), vec![]);
}
#[test]
fn manage_bad_peers() {
use std::thread::sleep;
use std::time::Duration;
let config = ManagePeersConfig { failure_interval_ms: 0, };
let config = ManagePeersConfig { block_failure_interval_ms: 0, ..Default::default() };
let mut peers = Peers::new();
peers.on_blocks_requested(1, &vec![H256::from(0)]);
peers.on_blocks_requested(2, &vec![H256::from(1)]);
sleep(Duration::from_millis(1));
let managed_tasks = manage_synchronization_peers(&config, &mut peers).expect("managed tasks");
let managed_tasks = manage_synchronization_peers_blocks(&config, &mut peers).expect("managed tasks");
assert!(managed_tasks.contains(&H256::from(0)));
assert!(managed_tasks.contains(&H256::from(1)));
let idle_peers = peers.idle_peers();
let idle_peers = peers.idle_peers_for_blocks();
assert_eq!(2, idle_peers.len());
assert!(idle_peers.contains(&1));
assert!(idle_peers.contains(&2));

View File

@ -5,28 +5,36 @@ use linked_hash_map::LinkedHashMap;
use time::precise_time_s;
/// Max peer failures # before excluding from sync process
const MAX_PEER_FAILURES: usize = 8;
const MAX_PEER_FAILURES: usize = 2;
/// Set of peers selected for synchronization.
#[derive(Debug)]
pub struct Peers {
/// Peers that have no pending requests.
/// Peers that are marked as useful for current synchronization session && have no pending requests.
idle: HashSet<usize>,
/// Pending requests by peer.
requests: HashMap<usize, HashSet<H256>>,
/// Peers failures.
/// Peers that are marked as non-useful for current synchronization session && have no pending requests.
unuseful: HashSet<usize>,
/// # of failures for given peer.
failures: HashMap<usize, usize>,
/// Last message time from peer
times: LinkedHashMap<usize, f64>,
/// Peers that are marked as useful for current synchronization session && have pending blocks requests.
blocks_requests: HashMap<usize, HashSet<H256>>,
/// Last block message time from peer.
blocks_requests_order: LinkedHashMap<usize, f64>,
/// Peers that are marked as useful for current synchronization session && have pending requests.
inventory_requests: HashSet<usize>,
/// Last inventory message time from peer.
inventory_requests_order: LinkedHashMap<usize, f64>,
}
/// Information on synchronization peers
#[cfg(test)]
#[derive(Debug)]
pub struct Information {
/// Number of currently idle synchronization peers.
/// # of peers that are marked as useful for current synchronization session && have no pending requests.
pub idle: usize,
/// Number of currently active synchronization peers.
/// # of peers that are marked as non-useful for current synchronization session && have no pending requests.
pub unuseful: usize,
/// # of peers that are marked as useful for current synchronization session && have pending requests.
pub active: usize,
}
@ -34,95 +42,172 @@ impl Peers {
pub fn new() -> Peers {
Peers {
idle: HashSet::new(),
requests: HashMap::new(),
unuseful: HashSet::new(),
failures: HashMap::new(),
times: LinkedHashMap::new(),
blocks_requests: HashMap::new(),
blocks_requests_order: LinkedHashMap::new(),
inventory_requests: HashSet::new(),
inventory_requests_order: LinkedHashMap::new(),
}
}
/// Get information on synchronization peers
#[cfg(test)]
pub fn information(&self) -> Information {
let blocks_requests_peers: HashSet<_> = self.blocks_requests.keys().cloned().collect();
let total_unuseful_peers = self.unuseful.difference(&self.inventory_requests).count();
let total_active_peers = blocks_requests_peers.union(&self.inventory_requests).count();
Information {
idle: self.idle.len(),
active: self.requests.len(),
unuseful: total_unuseful_peers,
active: total_active_peers,
}
}
/// Has any peers?
pub fn any(&self) -> bool {
!self.idle.is_empty() || !self.requests.is_empty()
/// Has any useful peers?
pub fn has_any_useful(&self) -> bool {
!self.idle.is_empty()
|| !self.blocks_requests.is_empty()
|| !self.inventory_requests.is_empty()
}
/// Get idle peer.
#[cfg(test)]
pub fn idle_peer(&self) -> Option<usize> {
self.idle.iter().cloned().next()
}
/// Get all peers.
/// Get all peers
pub fn all_peers(&self) -> Vec<usize> {
self.idle.iter().cloned().chain(self.requests.keys().cloned()).collect()
self.idle.iter().cloned()
.chain(self.unuseful.iter().cloned())
.chain(self.blocks_requests.keys().cloned())
.chain(self.inventory_requests.iter().cloned())
.collect()
}
/// Get useful peers
pub fn useful_peers(&self) -> Vec<usize> {
self.idle.iter().cloned()
.chain(self.blocks_requests.keys().cloned())
.chain(self.inventory_requests.iter().cloned())
.collect()
}
/// Get idle peers for inventory request.
pub fn idle_peers_for_inventory(&self) -> Vec<usize> {
let peers: HashSet<_> = self.idle.iter().cloned()
.chain(self.blocks_requests.keys().cloned())
.collect();
let except: HashSet<_> = self.inventory_requests.iter().cloned().collect();
peers.difference(&except).cloned().collect()
}
/// Get idle peers.
pub fn idle_peers(&self) -> Vec<usize> {
self.idle.iter().cloned().collect()
pub fn idle_peers_for_blocks(&self) -> Vec<usize> {
let peers: HashSet<_> = self.idle.iter().cloned()
.chain(self.inventory_requests.iter().cloned())
.collect();
let except: HashSet<_> = self.blocks_requests.keys().cloned().collect();
peers.difference(&except).cloned().collect()
}
/// Get worst peer.
pub fn worst_peers(&self) -> Vec<(usize, f64)> {
self.times.iter().map(|(&pi, &t)| (pi, t)).collect()
/// Get active blocks requests, sorted by last response time (oldest first).
pub fn ordered_blocks_requests(&self) -> Vec<(usize, f64)> {
self.blocks_requests_order.iter()
.map(|(&pi, &t)| (pi, t))
.collect()
}
/// Insert new synchronization peer.
pub fn insert(&mut self, peer_index: usize) {
if !self.idle.contains(&peer_index) && !self.requests.contains_key(&peer_index) {
/// Get active inventory requests, sorted by last response time (oldest first).
pub fn ordered_inventory_requests(&self) -> Vec<(usize, f64)> {
self.inventory_requests_order.iter()
.map(|(&pi, &t)| (pi, t))
.collect()
}
/// Mark peer as useful.
pub fn useful_peer(&mut self, peer_index: usize) {
// if peer is unknown => insert to idle queue
// if peer is known && not useful => insert to idle queue
if !self.idle.contains(&peer_index)
&& !self.blocks_requests.contains_key(&peer_index)
&& !self.inventory_requests.contains(&peer_index) {
self.idle.insert(peer_index);
self.unuseful.remove(&peer_index);
self.failures.remove(&peer_index);
}
}
/// Peer has been disconnected
pub fn on_peer_disconnected(&mut self, peer_index: usize) -> bool {
pub fn on_peer_disconnected(&mut self, peer_index: usize) -> Option<Vec<H256>> {
// forget this peer without any chances to reuse
self.idle.remove(&peer_index);
self.requests.remove(&peer_index);
self.unuseful.remove(&peer_index);
self.failures.remove(&peer_index);
self.times.remove(&peer_index);
(self.idle.len() + self.requests.len()) == 0
let peer_blocks_requests = self.blocks_requests.remove(&peer_index);
self.blocks_requests_order.remove(&peer_index);
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
peer_blocks_requests
.map(|hs| hs.into_iter().collect())
}
/// Block is received from peer.
pub fn on_block_received(&mut self, peer_index: usize, block_hash: &H256) {
if let Entry::Occupied(mut entry) = self.requests.entry(peer_index) {
entry.get_mut().remove(block_hash);
if entry.get().is_empty() {
self.idle.insert(peer_index);
entry.remove_entry();
}
// if this is a requested block && it is the last requested block => remove from blocks_requests
let try_mark_as_idle = match self.blocks_requests.entry(peer_index) {
Entry::Occupied(mut requests_entry) => {
requests_entry.get_mut().remove(block_hash);
self.blocks_requests_order.remove(&peer_index);
if requests_entry.get().is_empty() {
requests_entry.remove_entry();
true
} else {
self.blocks_requests_order.insert(peer_index, precise_time_s());
false
}
},
_ => false,
};
// try to mark as idle
if try_mark_as_idle {
self.try_mark_idle(peer_index);
}
self.on_peer_message(peer_index);
}
/// Inventory received from peer.
pub fn on_inventory_received(&mut self, peer_index: usize) {
// if we have requested inventory => remove from inventory_requests
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
// try to mark as idle
self.try_mark_idle(peer_index);
}
/// Blocks have been requested from peer.
pub fn on_blocks_requested(&mut self, peer_index: usize, blocks_hashes: &[H256]) {
// mark peer as active
self.idle.remove(&peer_index);
self.requests.entry(peer_index).or_insert_with(HashSet::new).extend(blocks_hashes.iter().cloned());
self.times.insert(peer_index, precise_time_s());
self.unuseful.remove(&peer_index);
self.blocks_requests.entry(peer_index).or_insert_with(HashSet::new).extend(blocks_hashes.iter().cloned());
self.blocks_requests_order.remove(&peer_index);
self.blocks_requests_order.insert(peer_index, precise_time_s());
}
/// Inventory has been requested from peer.
pub fn on_inventory_requested(&mut self, peer_index: usize) {
// inventory can only be requested from idle peers
assert!(!self.requests.contains_key(&peer_index));
self.inventory_requests.insert(peer_index);
self.inventory_requests_order.remove(&peer_index);
self.inventory_requests_order.insert(peer_index, precise_time_s());
self.idle.remove(&peer_index);
// peer is now out-of-synchronization process, because:
// mark peer as active
if self.idle.remove(&peer_index) {
self.unuseful.insert(peer_index);
}
// peer is now out of the synchronization process (we will not request blocks from it), because:
// 1) if it has new blocks, it will respond with `inventory` message && will be inserted back here
// 2) if it has no new blocks => either synchronization is completed, or it is behind us in sync
}
/// We have failed to get response from peer during given period
pub fn on_peer_failure(&mut self, peer_index: usize) -> bool {
/// We have failed to get block from peer during given period
pub fn on_peer_block_failure(&mut self, peer_index: usize) -> bool {
let peer_failures = match self.failures.entry(peer_index) {
Entry::Occupied(mut entry) => {
let failures = entry.get() + 1;
@ -134,35 +219,54 @@ impl Peers {
let too_much_failures = peer_failures >= MAX_PEER_FAILURES;
if too_much_failures {
self.idle.remove(&peer_index);
self.unuseful.insert(peer_index);
self.failures.remove(&peer_index);
self.requests.remove(&peer_index);
self.times.remove(&peer_index);
self.blocks_requests.remove(&peer_index);
self.blocks_requests_order.remove(&peer_index);
}
too_much_failures
}
/// Reset peers state
pub fn reset(&mut self) {
self.idle.extend(self.requests.drain().map(|(k, _)| k));
self.failures.clear();
self.times.clear();
/// We have failed to get inventory from peer during given period
pub fn on_peer_inventory_failure(&mut self, peer_index: usize) {
// ignore inventory failures
self.inventory_requests.remove(&peer_index);
self.inventory_requests_order.remove(&peer_index);
if !self.blocks_requests.contains_key(&peer_index) {
self.idle.insert(peer_index);
self.unuseful.remove(&peer_index);
}
}
/// Reset peer tasks
pub fn reset_tasks(&mut self, peer_index: usize) -> Vec<H256> {
let requests = self.requests.remove(&peer_index);
self.times.remove(&peer_index);
self.idle.insert(peer_index);
/// Reset all peers state to the unuseful
pub fn reset(&mut self) {
self.unuseful.extend(self.idle.drain());
self.unuseful.extend(self.blocks_requests.drain().map(|(k, _)| k));
self.unuseful.extend(self.inventory_requests.drain());
self.failures.clear();
self.inventory_requests_order.clear();
self.blocks_requests_order.clear();
}
/// Reset peer tasks && move peer to idle state.
///
/// Removes all in-flight blocks requests of the given peer and returns their
/// hashes so they can be rescheduled elsewhere. The peer returns to the idle
/// pool unless it still has an outstanding inventory request.
///
/// Panics if the peer had no blocks requests — callers must only invoke this
/// for peers with a non-empty requests queue.
pub fn reset_blocks_tasks(&mut self, peer_index: usize) -> Vec<H256> {
	let pending = self.blocks_requests.remove(&peer_index);
	self.blocks_requests_order.remove(&peer_index);
	self.try_mark_idle(peer_index);
	pending
		.expect("empty requests queue is not allowed")
		.into_iter()
		.collect()
}
/// When sync message is received from peer
fn on_peer_message(&mut self, peer_index: usize) {
self.failures.remove(&peer_index);
self.times.remove(&peer_index);
if self.requests.contains_key(&peer_index) {
self.times.insert(peer_index, precise_time_s());
/// Try to mark peer as idle.
///
/// A peer becomes idle (and leaves the unuseful set) only when it has neither
/// an outstanding blocks request nor an outstanding inventory request;
/// otherwise this is a no-op.
fn try_mark_idle(&mut self, peer_index: usize) {
	let has_active_requests = self.blocks_requests.contains_key(&peer_index)
		|| self.inventory_requests.contains(&peer_index);
	if !has_active_requests {
		self.idle.insert(peer_index);
		self.unuseful.remove(&peer_index);
	}
}
}
@ -174,8 +278,8 @@ mod tests {
#[test]
fn peers_empty_on_start() {
let peers = Peers::new();
assert_eq!(peers.idle_peer(), None);
assert_eq!(peers.idle_peers().len(), 0);
assert_eq!(peers.idle_peers_for_blocks(), vec![]);
assert_eq!(peers.idle_peers_for_inventory(), vec![]);
let info = peers.information();
assert_eq!(info.idle, 0);
@ -183,87 +287,110 @@ mod tests {
}
#[test]
fn peers_all_idle_after_reset() {
fn peers_all_unuseful_after_reset() {
let mut peers = Peers::new();
peers.on_blocks_requested(7, &vec![H256::default()]);
peers.on_blocks_requested(8, &vec![H256::default()]);
assert_eq!(peers.information().idle, 0);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 2);
peers.reset();
assert_eq!(peers.information().idle, 2);
assert_eq!(peers.information().idle, 0);
assert_eq!(peers.information().unuseful, 2);
assert_eq!(peers.information().active, 0);
}
#[test]
fn peers_removed_after_inventory_request() {
fn peer_idle_after_reset_tasks() {
let mut peers = Peers::new();
peers.insert(5);
peers.insert(7);
peers.on_blocks_requested(7, &vec![H256::default()]);
assert_eq!(peers.information().idle, 0);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 1);
assert_eq!(peers.reset_blocks_tasks(7), vec![H256::default()]);
assert_eq!(peers.information().idle, 1);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 0);
}
#[test]
fn peers_active_after_inventory_request() {
	let mut peers = Peers::new();
	// two freshly announced useful peers both start out idle
	peers.useful_peer(5);
	peers.useful_peer(7);
	let info = peers.information();
	assert_eq!(info.idle, 2);
	assert_eq!(info.unuseful, 0);
	assert_eq!(info.active, 0);
	// asking one of them for inventory moves it from idle to active
	peers.on_inventory_requested(5);
	let info = peers.information();
	assert_eq!(info.idle, 1);
	assert_eq!(info.unuseful, 0);
	assert_eq!(info.active, 1);
}
#[test]
fn peers_insert_remove_idle() {
let mut peers = Peers::new();
peers.insert(0);
peers.useful_peer(0);
assert_eq!(peers.information().idle, 1);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 0);
assert_eq!(peers.idle_peer(), Some(0));
assert_eq!(peers.idle_peers(), vec![0]);
assert_eq!(peers.idle_peers_for_blocks(), vec![0]);
peers.insert(5);
peers.useful_peer(5);
assert_eq!(peers.information().idle, 2);
assert_eq!(peers.information().active, 0);
assert!(peers.idle_peer() == Some(0) || peers.idle_peer() == Some(5));
assert!(peers.idle_peers()[0] == 0 || peers.idle_peers()[0] == 5);
assert!(peers.idle_peers()[1] == 0 || peers.idle_peers()[1] == 5);
let idle_peers = peers.idle_peers_for_blocks();
assert!(idle_peers[0] == 0 || idle_peers[0] == 5);
assert!(idle_peers[1] == 0 || idle_peers[1] == 5);
peers.on_peer_disconnected(7);
assert_eq!(peers.information().idle, 2);
assert_eq!(peers.information().active, 0);
assert!(peers.idle_peer() == Some(0) || peers.idle_peer() == Some(5));
assert!(peers.idle_peers()[0] == 0 || peers.idle_peers()[0] == 5);
assert!(peers.idle_peers()[1] == 0 || peers.idle_peers()[1] == 5);
let idle_peers = peers.idle_peers_for_blocks();
assert!(idle_peers[0] == 0 || idle_peers[0] == 5);
assert!(idle_peers[1] == 0 || idle_peers[1] == 5);
peers.on_peer_disconnected(0);
assert_eq!(peers.information().idle, 1);
assert_eq!(peers.information().active, 0);
assert_eq!(peers.idle_peer(), Some(5));
assert_eq!(peers.idle_peers(), vec![5]);
assert_eq!(peers.idle_peers_for_blocks(), vec![5]);
}
#[test]
fn peers_request_blocks() {
let mut peers = Peers::new();
peers.insert(5);
peers.useful_peer(5);
peers.on_blocks_requested(7, &vec![H256::default()]);
assert_eq!(peers.information().idle, 1);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 1);
peers.on_blocks_requested(8, &vec![H256::default()]);
assert_eq!(peers.information().idle, 1);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 2);
peers.on_block_received(7, &H256::default());
assert_eq!(peers.information().idle, 2);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 1);
peers.on_block_received(9, &H256::default());
assert_eq!(peers.information().idle, 2);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 1);
peers.on_block_received(8, &"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into());
assert_eq!(peers.information().idle, 2);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 1);
peers.on_block_received(8, &H256::default());
assert_eq!(peers.information().idle, 3);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 0);
}
@ -271,36 +398,55 @@ mod tests {
fn peers_worst() {
let mut peers = Peers::new();
peers.insert(1);
peers.insert(2);
assert_eq!(peers.worst_peers(), vec![]);
peers.useful_peer(1);
peers.useful_peer(2);
assert_eq!(peers.ordered_blocks_requests(), vec![]);
peers.on_blocks_requested(1, &vec![H256::default()]);
assert_eq!(peers.worst_peers().len(), 1);
assert_eq!(peers.worst_peers()[0].0, 1);
assert_eq!(peers.ordered_blocks_requests().len(), 1);
assert_eq!(peers.ordered_blocks_requests()[0].0, 1);
peers.on_blocks_requested(2, &vec![H256::default()]);
assert_eq!(peers.worst_peers().len(), 2);
assert_eq!(peers.worst_peers()[0].0, 1);
assert_eq!(peers.worst_peers()[1].0, 2);
assert_eq!(peers.ordered_blocks_requests().len(), 2);
assert_eq!(peers.ordered_blocks_requests()[0].0, 1);
assert_eq!(peers.ordered_blocks_requests()[1].0, 2);
assert_eq!(peers.information().idle, 0);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 2);
peers.reset_tasks(1);
peers.reset_blocks_tasks(1);
assert_eq!(peers.information().idle, 1);
assert_eq!(peers.information().unuseful, 0);
assert_eq!(peers.information().active, 1);
assert_eq!(peers.worst_peers().len(), 1);
assert_eq!(peers.worst_peers()[0].0, 2);
assert_eq!(peers.ordered_blocks_requests().len(), 1);
assert_eq!(peers.ordered_blocks_requests()[0].0, 2);
for _ in 0..MAX_PEER_FAILURES {
peers.on_peer_failure(2);
peers.on_peer_block_failure(2);
}
assert_eq!(peers.worst_peers().len(), 0);
assert_eq!(peers.ordered_blocks_requests().len(), 0);
assert_eq!(peers.information().idle, 1);
assert_eq!(peers.information().unuseful, 1);
assert_eq!(peers.information().active, 0);
}
#[test]
fn peer_not_inserted_when_known() {
	// re-announcing an already-known peer must never create a second entry,
	// whatever state (idle / active / unuseful) the peer currently is in
	let total = |peers: &Peers| {
		let info = peers.information();
		info.active + info.idle + info.unuseful
	};
	let mut peers = Peers::new();
	// known + idle
	peers.useful_peer(1);
	peers.useful_peer(1);
	assert_eq!(total(&peers), 1);
	// known + active
	peers.on_blocks_requested(1, &vec![H256::default()]);
	peers.useful_peer(1);
	assert_eq!(total(&peers), 1);
	// known + unuseful (after enough block failures)
	for _ in 0..MAX_PEER_FAILURES {
		peers.on_peer_block_failure(1);
	}
	peers.useful_peer(1);
	assert_eq!(total(&peers), 1);
}
}