diff --git a/sync/src/inbound_connection.rs b/sync/src/inbound_connection.rs index ca2d8b4f..f77956ff 100644 --- a/sync/src/inbound_connection.rs +++ b/sync/src/inbound_connection.rs @@ -23,7 +23,7 @@ impl InboundSyncConnection for InboundConnection { } fn close_session(&self) { - // TODO: implement + self.local_node.stop_sync_session(self.peer_index) } fn on_inventory(&self, message: types::Inv) { diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index d61c24d5..8fa7258a 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -31,6 +31,7 @@ pub struct LocalNode LocalNode where T: SynchronizationTaskExecutor + PeersConnections + Send + 'static { @@ -66,6 +67,13 @@ impl LocalNode where T: SynchronizationTaskExecutor + PeersConnections + S self.executor.lock().execute(SynchronizationTask::RequestInventory(peer_index)); } + pub fn stop_sync_session(&self, peer_index: usize) { + trace!(target: "sync", "Stopping sync session with peer#{}", peer_index); + + self.executor.lock().remove_peer_connection(peer_index); + self.sync.lock().on_peer_disconnected(peer_index); + } + pub fn on_peer_inventory(&self, peer_index: usize, message: types::Inv) { trace!(target: "sync", "Got `inventory` message from peer#{}. Inventory len: {}", peer_index, message.inventory.len()); diff --git a/sync/src/synchronization.rs b/sync/src/synchronization.rs index 3d1525d1..aa1bee3b 100644 --- a/sync/src/synchronization.rs +++ b/sync/src/synchronization.rs @@ -230,19 +230,31 @@ impl Synchronization where T: TaskExecutor + Send + 'static { self.execute_synchronization_tasks(); } + /// Peer disconnected. + pub fn on_peer_disconnected(&mut self, peer_index: usize) { + self.peers.on_peer_disconnected(peer_index); + + // when last peer is disconnected, reset, but let verifying blocks be verified + self.reset(false); + } + /// Reset synchronization process - pub fn reset(&mut self) { + pub fn reset(&mut self, is_hard: bool) { self.peers.reset(); self.orphaned_blocks.clear(); // TODO: reset verification queue let mut chain = self.chain.write(); - self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number); chain.remove_blocks_with_state(BlockState::Requested); chain.remove_blocks_with_state(BlockState::Scheduled); - chain.remove_blocks_with_state(BlockState::Verifying); - - warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block()); + if is_hard { + self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number); + chain.remove_blocks_with_state(BlockState::Verifying); + warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block()); + } + else { + self.state = State::Saturated; + } } /// Process new blocks inventory @@ -483,7 +495,7 @@ impl Synchronization where T: TaskExecutor + Send + 'static { warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err); // reset synchronization process - self.reset(); + self.reset(true); // start new tasks self.execute_synchronization_tasks(); @@ -515,6 +527,7 @@ pub mod tests { impl PeersConnections for DummyTaskExecutor { fn add_peer_connection(&mut self, _: usize, _: OutboundSyncConnectionRef) {} + fn remove_peer_connection(&mut self, _: usize) {} } impl TaskExecutor for DummyTaskExecutor { @@ -651,4 +664,23 @@ pub mod tests { && sync.information().chain.stored == 3); } } + + #[test] + fn synchronization_reset_when_peer_is_disconnected() { + let (_, sync) = create_sync(); + + // request new blocks + { + let mut sync = sync.lock(); + sync.on_new_blocks_inventory(1, vec!["0000000000000000000000000000000000000000000000000000000000000000".into()]); + assert!(sync.information().state.is_synchronizing()); + } + + // lost connection to peer => synchronization state lost + { + let mut sync = sync.lock(); + sync.on_peer_disconnected(1); + assert!(!sync.information().state.is_synchronizing()); + } + } } diff --git a/sync/src/synchronization_executor.rs b/sync/src/synchronization_executor.rs index ec258e25..b91a5622 100644 --- a/sync/src/synchronization_executor.rs +++ b/sync/src/synchronization_executor.rs @@ -32,6 +32,10 @@ impl PeersConnections for LocalSynchronizationTaskExecutor { fn add_peer_connection(&mut self, index: usize, connection: OutboundSyncConnectionRef) { self.peers.insert(index, connection); } + + fn remove_peer_connection(&mut self, index: usize) { + self.peers.remove(&index); + } } impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor { diff --git a/sync/src/synchronization_peers.rs b/sync/src/synchronization_peers.rs index 2986414c..e0f6057f 100644 --- a/sync/src/synchronization_peers.rs +++ b/sync/src/synchronization_peers.rs @@ -58,13 +58,6 @@ impl Peers { } } - /// Remove synchronization peer. - #[cfg(test)] - pub fn remove(&mut self, peer_index: usize) { - self.idle_peers.remove(&peer_index); - self.blocks_requests.remove(&peer_index); - } - /// Block is received from peer. pub fn on_block_received(&mut self, peer_index: usize, block_hash: &H256) { if let Entry::Occupied(mut entry) = self.blocks_requests.entry(peer_index) { @@ -76,6 +69,12 @@ impl Peers { } } + /// Peer has been disconnected + pub fn on_peer_disconnected(&mut self, peer_index: usize) { + self.idle_peers.remove(&peer_index); + self.blocks_requests.remove(&peer_index); + } + /// Blocks have been requested from peer. pub fn on_blocks_requested(&mut self, peer_index: usize, blocks_hashes: &Vec) { // inventory can only be requested from idle peers @@ -158,14 +157,14 @@ mod tests { assert!(peers.idle_peers()[0] == 0 || peers.idle_peers()[0] == 5); assert!(peers.idle_peers()[1] == 0 || peers.idle_peers()[1] == 5); - peers.remove(7); + peers.on_peer_disconnected(7); assert_eq!(peers.information().idle, 2); assert_eq!(peers.information().active, 0); assert!(peers.idle_peer() == Some(0) || peers.idle_peer() == Some(5)); assert!(peers.idle_peers()[0] == 0 || peers.idle_peers()[0] == 5); assert!(peers.idle_peers()[1] == 0 || peers.idle_peers()[1] == 5); - peers.remove(0); + peers.on_peer_disconnected(0); assert_eq!(peers.information().idle, 1); assert_eq!(peers.information().active, 0); assert_eq!(peers.idle_peer(), Some(5));