ask all peers about inventory when entering saturated state
This commit is contained in:
parent
62e1d9c288
commit
9b4fe77e1e
|
@ -362,8 +362,9 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
|
|||
/// Peer disconnected.
|
||||
fn on_peer_disconnected(&mut self, peer_index: usize) {
|
||||
// when last peer is disconnected, reset, but let verifying blocks be verified
|
||||
if self.peers.on_peer_disconnected(peer_index) {
|
||||
self.switch_to_saturated_state(false);
|
||||
self.peers.on_peer_disconnected(peer_index);
|
||||
if !self.peers.any() {
|
||||
self.switch_to_saturated_state();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -482,12 +483,14 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
|
|||
let mut client = client.lock();
|
||||
if client.state.is_synchronizing() || client.state.is_nearly_saturated() {
|
||||
let blocks_to_request = manage_synchronization_peers(&peers_config, &mut client.peers);
|
||||
// if no peers left => we are saturated
|
||||
if !client.peers.any() {
|
||||
client.switch_to_saturated_state(false);
|
||||
if blocks_to_request.is_some() {
|
||||
// if no peers able to serve blocks_to_request => we are saturated
|
||||
if client.peers.idle_peers().is_empty() && client.peers.active_peers().is_empty() {
|
||||
client.switch_to_saturated_state();
|
||||
} else {
|
||||
client.execute_synchronization_tasks(blocks_to_request);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&unknown_config, &mut client.unknown_blocks) {
|
||||
client.remove_orphaned_blocks(orphans_to_remove.into_iter().collect());
|
||||
|
@ -649,7 +652,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
|
|||
};
|
||||
|
||||
if switch_to_saturated {
|
||||
self.switch_to_saturated_state(true);
|
||||
self.switch_to_saturated_state();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -743,7 +746,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
|
|||
}
|
||||
|
||||
/// Switch to saturated state
|
||||
fn switch_to_saturated_state(&mut self, ask_for_inventory: bool) {
|
||||
fn switch_to_saturated_state(&mut self) {
|
||||
if self.state.is_saturated() {
|
||||
return;
|
||||
}
|
||||
|
@ -770,11 +773,13 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
|
|||
chain.information());
|
||||
}
|
||||
|
||||
if ask_for_inventory {
|
||||
// finally - ask all known peers for their best blocks inventory, in case if some peer
|
||||
// has lead us to the fork
|
||||
{
|
||||
let mut executor = self.executor.lock();
|
||||
for idle_peer in self.peers.idle_peers() {
|
||||
self.peers.on_inventory_requested(idle_peer);
|
||||
executor.execute(Task::RequestBlocksHeaders(idle_peer));
|
||||
for peer in self.peers.all_peers() {
|
||||
self.peers.on_inventory_requested(peer);
|
||||
executor.execute(Task::RequestBlocksHeaders(peer));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ pub fn manage_synchronization_peers(config: &ManagePeersConfig, peers: &mut Peer
|
|||
let mut blocks_to_request: Vec<H256> = Vec::new();
|
||||
let now = precise_time_s();
|
||||
// reset tasks for peers, which has not responded during given period
|
||||
for (worst_peer_index, worst_peer_time) in peers.worst_peers() {
|
||||
for (worst_peer_index, worst_peer_time) in peers.active_peers_order() {
|
||||
// check if peer has not responded within given time
|
||||
let time_diff = now - worst_peer_time;
|
||||
if time_diff <= config.failure_interval_ms as f64 / 1000f64 {
|
||||
|
|
|
@ -10,23 +10,27 @@ const MAX_PEER_FAILURES: usize = 8;
|
|||
/// Set of peers selected for synchronization.
|
||||
#[derive(Debug)]
|
||||
pub struct Peers {
|
||||
/// Peers that have no pending requests.
|
||||
/// Peers that are marked as useful for current synchronization session && have no pending requests.
|
||||
idle: HashSet<usize>,
|
||||
/// Pending requests by peer.
|
||||
requests: HashMap<usize, HashSet<H256>>,
|
||||
/// Peers failures.
|
||||
failures: HashMap<usize, usize>,
|
||||
/// Last message time from peer
|
||||
times: LinkedHashMap<usize, f64>,
|
||||
/// Peers that are marked as non-useful for current synchronization session && have no pending requests.
|
||||
unuseful: HashSet<usize>,
|
||||
/// Peers that are marked as useful for current synchronization session && have pending requests.
|
||||
active: HashMap<usize, HashSet<H256>>,
|
||||
/// # of failures for given peer.
|
||||
active_failures: HashMap<usize, usize>,
|
||||
/// Last message time from peer.
|
||||
active_order: LinkedHashMap<usize, f64>,
|
||||
}
|
||||
|
||||
/// Information on synchronization peers
|
||||
#[cfg(test)]
|
||||
#[derive(Debug)]
|
||||
pub struct Information {
|
||||
/// Number of currently idle synchronization peers.
|
||||
/// # of peers that are marked as useful for current synchronization session && have no pending requests.
|
||||
pub idle: usize,
|
||||
/// Number of currently active synchronization peers.
|
||||
/// # of peers that are marked as non-useful for current synchronization session && have no pending requests.
|
||||
pub unuseful: usize,
|
||||
/// # of peers that are marked as useful for current synchronization session && have pending requests.
|
||||
pub active: usize,
|
||||
}
|
||||
|
||||
|
@ -34,9 +38,10 @@ impl Peers {
|
|||
pub fn new() -> Peers {
|
||||
Peers {
|
||||
idle: HashSet::new(),
|
||||
requests: HashMap::new(),
|
||||
failures: HashMap::new(),
|
||||
times: LinkedHashMap::new(),
|
||||
unuseful: HashSet::new(),
|
||||
active: HashMap::new(),
|
||||
active_failures: HashMap::new(),
|
||||
active_order: LinkedHashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -45,24 +50,24 @@ impl Peers {
|
|||
pub fn information(&self) -> Information {
|
||||
Information {
|
||||
idle: self.idle.len(),
|
||||
active: self.requests.len(),
|
||||
unuseful: self.unuseful.len(),
|
||||
active: self.active.len(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Has any peers?
|
||||
pub fn any(&self) -> bool {
|
||||
!self.idle.is_empty() || !self.requests.is_empty()
|
||||
!self.idle.is_empty()
|
||||
|| !self.unuseful.is_empty()
|
||||
|| !self.active.is_empty()
|
||||
}
|
||||
|
||||
/// Get idle peer.
|
||||
#[cfg(test)]
|
||||
pub fn idle_peer(&self) -> Option<usize> {
|
||||
self.idle.iter().cloned().next()
|
||||
}
|
||||
|
||||
/// Get all peers.
|
||||
/// Get all peers
|
||||
pub fn all_peers(&self) -> Vec<usize> {
|
||||
self.idle.iter().cloned().chain(self.requests.keys().cloned()).collect()
|
||||
self.idle_peers().into_iter()
|
||||
.chain(self.unuseful_peers().into_iter())
|
||||
.chain(self.active_peers().into_iter())
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Get idle peers.
|
||||
|
@ -70,52 +75,80 @@ impl Peers {
|
|||
self.idle.iter().cloned().collect()
|
||||
}
|
||||
|
||||
/// Get worst peer.
|
||||
pub fn worst_peers(&self) -> Vec<(usize, f64)> {
|
||||
self.times.iter().map(|(&pi, &t)| (pi, t)).collect()
|
||||
/// Get unuseful peers.
|
||||
pub fn unuseful_peers(&self) -> Vec<usize> {
|
||||
self.unuseful.iter().cloned().collect()
|
||||
}
|
||||
|
||||
/// Get active peers.
|
||||
pub fn active_peers(&self) -> Vec<usize> {
|
||||
self.active.keys().cloned().collect()
|
||||
}
|
||||
|
||||
/// Get active peers, sorted by last response time (oldest first).
|
||||
pub fn active_peers_order(&self) -> Vec<(usize, f64)> {
|
||||
self.active_order.iter()
|
||||
.map(|(&pi, &t)| (pi, t))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Insert new synchronization peer.
|
||||
pub fn insert(&mut self, peer_index: usize) {
|
||||
if !self.idle.contains(&peer_index) && !self.requests.contains_key(&peer_index) {
|
||||
// if peer is unknown => insert to idle queue
|
||||
// if peer is known && not useful => insert to idle queue
|
||||
if !self.idle.contains(&peer_index)
|
||||
&& !self.active.contains_key(&peer_index) {
|
||||
self.idle.insert(peer_index);
|
||||
self.unuseful.remove(&peer_index);
|
||||
}
|
||||
}
|
||||
|
||||
/// Peer has been disconnected
|
||||
pub fn on_peer_disconnected(&mut self, peer_index: usize) -> bool {
|
||||
pub fn on_peer_disconnected(&mut self, peer_index: usize) {
|
||||
// foret this peer without any chances to reuse
|
||||
self.idle.remove(&peer_index);
|
||||
self.requests.remove(&peer_index);
|
||||
self.failures.remove(&peer_index);
|
||||
self.times.remove(&peer_index);
|
||||
(self.idle.len() + self.requests.len()) == 0
|
||||
self.unuseful.remove(&peer_index);
|
||||
self.active.remove(&peer_index);
|
||||
self.active_failures.remove(&peer_index);
|
||||
self.active_order.remove(&peer_index);
|
||||
}
|
||||
|
||||
/// Block is received from peer.
|
||||
pub fn on_block_received(&mut self, peer_index: usize, block_hash: &H256) {
|
||||
if let Entry::Occupied(mut entry) = self.requests.entry(peer_index) {
|
||||
entry.get_mut().remove(block_hash);
|
||||
if entry.get().is_empty() {
|
||||
self.idle.insert(peer_index);
|
||||
entry.remove_entry();
|
||||
}
|
||||
}
|
||||
// got new message => forget all previous failures
|
||||
self.on_peer_message(peer_index);
|
||||
// if this is message from active peer && there are no pendind requests to this peer - mark as idle
|
||||
if let Entry::Occupied(mut active_entry) = self.active.entry(peer_index) {
|
||||
active_entry.get_mut().remove(block_hash);
|
||||
if active_entry.get().is_empty() {
|
||||
self.active_failures.remove(&peer_index);
|
||||
self.active_order.remove(&peer_index);
|
||||
self.idle.insert(peer_index);
|
||||
active_entry.remove_entry();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks have been requested from peer.
|
||||
pub fn on_blocks_requested(&mut self, peer_index: usize, blocks_hashes: &[H256]) {
|
||||
// mark peer as active
|
||||
self.idle.remove(&peer_index);
|
||||
self.requests.entry(peer_index).or_insert_with(HashSet::new).extend(blocks_hashes.iter().cloned());
|
||||
self.times.insert(peer_index, precise_time_s());
|
||||
self.unuseful.remove(&peer_index);
|
||||
self.active.entry(peer_index).or_insert_with(HashSet::new).extend(blocks_hashes.iter().cloned());
|
||||
self.active_order.insert(peer_index, precise_time_s());
|
||||
}
|
||||
|
||||
/// Inventory has been requested from peer.
|
||||
pub fn on_inventory_requested(&mut self, peer_index: usize) {
|
||||
// inventory can only be requested from idle peers
|
||||
assert!(!self.requests.contains_key(&peer_index));
|
||||
assert!(!self.active.contains_key(&peer_index));
|
||||
|
||||
// remove peer from idle && active queues & leave him as unuseful (will becaume useful again once we will receive `good` inventory from him)
|
||||
self.idle.remove(&peer_index);
|
||||
self.active.remove(&peer_index);
|
||||
self.active_failures.remove(&peer_index);
|
||||
self.active_order.remove(&peer_index);
|
||||
self.unuseful.insert(peer_index);
|
||||
// peer is now out-of-synchronization process, because:
|
||||
// 1) if it has new blocks, it will respond with `inventory` message && will be inserted back here
|
||||
// 2) if it has no new blocks => either synchronization is completed, or it is behind us in sync
|
||||
|
@ -123,7 +156,7 @@ impl Peers {
|
|||
|
||||
/// We have failed to get response from peer during given period
|
||||
pub fn on_peer_failure(&mut self, peer_index: usize) -> bool {
|
||||
let peer_failures = match self.failures.entry(peer_index) {
|
||||
let peer_failures = match self.active_failures.entry(peer_index) {
|
||||
Entry::Occupied(mut entry) => {
|
||||
let failures = entry.get() + 1;
|
||||
entry.insert(failures);
|
||||
|
@ -134,34 +167,38 @@ impl Peers {
|
|||
|
||||
let too_much_failures = peer_failures >= MAX_PEER_FAILURES;
|
||||
if too_much_failures {
|
||||
self.failures.remove(&peer_index);
|
||||
self.requests.remove(&peer_index);
|
||||
self.times.remove(&peer_index);
|
||||
self.idle.remove(&peer_index);
|
||||
self.active.remove(&peer_index);
|
||||
self.active_failures.remove(&peer_index);
|
||||
self.active_order.remove(&peer_index);
|
||||
self.unuseful.insert(peer_index);
|
||||
}
|
||||
too_much_failures
|
||||
}
|
||||
|
||||
/// Reset peers state
|
||||
/// Reset all peers state to the unuseful
|
||||
pub fn reset(&mut self) {
|
||||
self.idle.extend(self.requests.drain().map(|(k, _)| k));
|
||||
self.failures.clear();
|
||||
self.times.clear();
|
||||
self.unuseful.extend(self.idle.drain());
|
||||
self.unuseful.extend(self.active.drain().map(|(k, _)| k));
|
||||
self.active_failures.clear();
|
||||
self.active_order.clear();
|
||||
}
|
||||
|
||||
/// Reset peer tasks
|
||||
/// Reset peer tasks && move peer to idle state
|
||||
pub fn reset_tasks(&mut self, peer_index: usize) -> Vec<H256> {
|
||||
let requests = self.requests.remove(&peer_index);
|
||||
self.times.remove(&peer_index);
|
||||
let requests = self.active.remove(&peer_index);
|
||||
self.active_order.remove(&peer_index);
|
||||
self.unuseful.remove(&peer_index);
|
||||
self.idle.insert(peer_index);
|
||||
requests.expect("empty requests queue is not allowed").into_iter().collect()
|
||||
}
|
||||
|
||||
/// When sync message is received from peer
|
||||
fn on_peer_message(&mut self, peer_index: usize) {
|
||||
self.failures.remove(&peer_index);
|
||||
self.times.remove(&peer_index);
|
||||
if self.requests.contains_key(&peer_index) {
|
||||
self.times.insert(peer_index, precise_time_s());
|
||||
if self.active.contains_key(&peer_index) {
|
||||
self.active_failures.remove(&peer_index);
|
||||
self.active_order.remove(&peer_index);
|
||||
self.active_order.insert(peer_index, precise_time_s());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -174,8 +211,7 @@ mod tests {
|
|||
#[test]
|
||||
fn peers_empty_on_start() {
|
||||
let peers = Peers::new();
|
||||
assert_eq!(peers.idle_peer(), None);
|
||||
assert_eq!(peers.idle_peers().len(), 0);
|
||||
assert_eq!(peers.idle_peers(), vec![]);
|
||||
|
||||
let info = peers.information();
|
||||
assert_eq!(info.idle, 0);
|
||||
|
@ -183,26 +219,44 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn peers_all_idle_after_reset() {
|
||||
fn peers_all_unuseful_after_reset() {
|
||||
let mut peers = Peers::new();
|
||||
peers.on_blocks_requested(7, &vec![H256::default()]);
|
||||
peers.on_blocks_requested(8, &vec![H256::default()]);
|
||||
assert_eq!(peers.information().idle, 0);
|
||||
assert_eq!(peers.information().unuseful, 0);
|
||||
assert_eq!(peers.information().active, 2);
|
||||
peers.reset();
|
||||
assert_eq!(peers.information().idle, 2);
|
||||
assert_eq!(peers.information().idle, 0);
|
||||
assert_eq!(peers.information().unuseful, 2);
|
||||
assert_eq!(peers.information().active, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peers_removed_after_inventory_request() {
|
||||
fn peer_idle_after_reset_tasks() {
|
||||
let mut peers = Peers::new();
|
||||
peers.on_blocks_requested(7, &vec![H256::default()]);
|
||||
assert_eq!(peers.information().idle, 0);
|
||||
assert_eq!(peers.information().unuseful, 0);
|
||||
assert_eq!(peers.information().active, 1);
|
||||
assert_eq!(peers.reset_tasks(7), vec![H256::default()]);
|
||||
assert_eq!(peers.information().idle, 1);
|
||||
assert_eq!(peers.information().unuseful, 0);
|
||||
assert_eq!(peers.information().active, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peers_unuseful_after_inventory_request() {
|
||||
let mut peers = Peers::new();
|
||||
peers.insert(5);
|
||||
peers.insert(7);
|
||||
assert_eq!(peers.information().idle, 2);
|
||||
assert_eq!(peers.information().unuseful, 0);
|
||||
assert_eq!(peers.information().active, 0);
|
||||
peers.on_inventory_requested(5);
|
||||
assert_eq!(peers.information().idle, 1);
|
||||
assert_eq!(peers.information().unuseful, 1);
|
||||
assert_eq!(peers.information().active, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -211,28 +265,25 @@ mod tests {
|
|||
|
||||
peers.insert(0);
|
||||
assert_eq!(peers.information().idle, 1);
|
||||
assert_eq!(peers.information().unuseful, 0);
|
||||
assert_eq!(peers.information().active, 0);
|
||||
assert_eq!(peers.idle_peer(), Some(0));
|
||||
assert_eq!(peers.idle_peers(), vec![0]);
|
||||
|
||||
peers.insert(5);
|
||||
assert_eq!(peers.information().idle, 2);
|
||||
assert_eq!(peers.information().active, 0);
|
||||
assert!(peers.idle_peer() == Some(0) || peers.idle_peer() == Some(5));
|
||||
assert!(peers.idle_peers()[0] == 0 || peers.idle_peers()[0] == 5);
|
||||
assert!(peers.idle_peers()[1] == 0 || peers.idle_peers()[1] == 5);
|
||||
|
||||
peers.on_peer_disconnected(7);
|
||||
assert_eq!(peers.information().idle, 2);
|
||||
assert_eq!(peers.information().active, 0);
|
||||
assert!(peers.idle_peer() == Some(0) || peers.idle_peer() == Some(5));
|
||||
assert!(peers.idle_peers()[0] == 0 || peers.idle_peers()[0] == 5);
|
||||
assert!(peers.idle_peers()[1] == 0 || peers.idle_peers()[1] == 5);
|
||||
|
||||
peers.on_peer_disconnected(0);
|
||||
assert_eq!(peers.information().idle, 1);
|
||||
assert_eq!(peers.information().active, 0);
|
||||
assert_eq!(peers.idle_peer(), Some(5));
|
||||
assert_eq!(peers.idle_peers(), vec![5]);
|
||||
}
|
||||
|
||||
|
@ -244,26 +295,32 @@ mod tests {
|
|||
|
||||
peers.on_blocks_requested(7, &vec![H256::default()]);
|
||||
assert_eq!(peers.information().idle, 1);
|
||||
assert_eq!(peers.information().unuseful, 0);
|
||||
assert_eq!(peers.information().active, 1);
|
||||
|
||||
peers.on_blocks_requested(8, &vec![H256::default()]);
|
||||
assert_eq!(peers.information().idle, 1);
|
||||
assert_eq!(peers.information().unuseful, 0);
|
||||
assert_eq!(peers.information().active, 2);
|
||||
|
||||
peers.on_block_received(7, &H256::default());
|
||||
assert_eq!(peers.information().idle, 2);
|
||||
assert_eq!(peers.information().unuseful, 0);
|
||||
assert_eq!(peers.information().active, 1);
|
||||
|
||||
peers.on_block_received(9, &H256::default());
|
||||
assert_eq!(peers.information().idle, 2);
|
||||
assert_eq!(peers.information().unuseful, 0);
|
||||
assert_eq!(peers.information().active, 1);
|
||||
|
||||
peers.on_block_received(8, &"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f".into());
|
||||
assert_eq!(peers.information().idle, 2);
|
||||
assert_eq!(peers.information().unuseful, 0);
|
||||
assert_eq!(peers.information().active, 1);
|
||||
|
||||
peers.on_block_received(8, &H256::default());
|
||||
assert_eq!(peers.information().idle, 3);
|
||||
assert_eq!(peers.information().unuseful, 0);
|
||||
assert_eq!(peers.information().active, 0);
|
||||
}
|
||||
|
||||
|
@ -273,34 +330,42 @@ mod tests {
|
|||
|
||||
peers.insert(1);
|
||||
peers.insert(2);
|
||||
assert_eq!(peers.worst_peers(), vec![]);
|
||||
assert_eq!(peers.active_peers_order(), vec![]);
|
||||
|
||||
peers.on_blocks_requested(1, &vec![H256::default()]);
|
||||
assert_eq!(peers.worst_peers().len(), 1);
|
||||
assert_eq!(peers.worst_peers()[0].0, 1);
|
||||
assert_eq!(peers.active_peers_order().len(), 1);
|
||||
assert_eq!(peers.active_peers_order()[0].0, 1);
|
||||
|
||||
peers.on_blocks_requested(2, &vec![H256::default()]);
|
||||
assert_eq!(peers.worst_peers().len(), 2);
|
||||
assert_eq!(peers.worst_peers()[0].0, 1);
|
||||
assert_eq!(peers.worst_peers()[1].0, 2);
|
||||
assert_eq!(peers.active_peers_order().len(), 2);
|
||||
assert_eq!(peers.active_peers_order()[0].0, 1);
|
||||
assert_eq!(peers.active_peers_order()[1].0, 2);
|
||||
|
||||
assert_eq!(peers.information().idle, 0);
|
||||
assert_eq!(peers.information().unuseful, 0);
|
||||
assert_eq!(peers.information().active, 2);
|
||||
|
||||
peers.reset_tasks(1);
|
||||
|
||||
assert_eq!(peers.information().idle, 1);
|
||||
assert_eq!(peers.information().unuseful, 0);
|
||||
assert_eq!(peers.information().active, 1);
|
||||
|
||||
assert_eq!(peers.worst_peers().len(), 1);
|
||||
assert_eq!(peers.worst_peers()[0].0, 2);
|
||||
assert_eq!(peers.active_peers_order().len(), 1);
|
||||
assert_eq!(peers.active_peers_order()[0].0, 2);
|
||||
|
||||
for _ in 0..MAX_PEER_FAILURES {
|
||||
peers.on_peer_failure(2);
|
||||
}
|
||||
|
||||
assert_eq!(peers.worst_peers().len(), 0);
|
||||
assert_eq!(peers.active_peers_order().len(), 0);
|
||||
assert_eq!(peers.information().idle, 1);
|
||||
assert_eq!(peers.information().unuseful, 1);
|
||||
assert_eq!(peers.information().active, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_not_inserted_when_known() {
|
||||
// TODO
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue