initial close_session implementation && test

This commit is contained in:
Svyatoslav Nikolsky 2016-10-31 20:03:08 +03:00
parent f4f4b0f3b9
commit 1113f245c5
5 changed files with 59 additions and 16 deletions

View File

@ -23,7 +23,7 @@ impl InboundSyncConnection for InboundConnection {
}
fn close_session(&self) {
// TODO: implement
self.local_node.stop_sync_session(self.peer_index)
}
fn on_inventory(&self, message: types::Inv) {

View File

@ -31,6 +31,7 @@ pub struct LocalNode<T: SynchronizationTaskExecutor + PeersConnections + Send +
/// Peers list
pub trait PeersConnections {
fn add_peer_connection(&mut self, peer_index: usize, outbound_connection: OutboundSyncConnectionRef);
fn remove_peer_connection(&mut self, peer_index: usize);
}
impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + Send + 'static {
@ -66,6 +67,13 @@ impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + S
self.executor.lock().execute(SynchronizationTask::RequestInventory(peer_index));
}
pub fn stop_sync_session(&self, peer_index: usize) {
trace!(target: "sync", "Stopping sync session with peer#{}", peer_index);
self.executor.lock().remove_peer_connection(peer_index);
self.sync.lock().on_peer_disconnected(peer_index);
}
pub fn on_peer_inventory(&self, peer_index: usize, message: types::Inv) {
trace!(target: "sync", "Got `inventory` message from peer#{}. Inventory len: {}", peer_index, message.inventory.len());

View File

@ -230,19 +230,31 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
self.execute_synchronization_tasks();
}
/// Peer disconnected.
pub fn on_peer_disconnected(&mut self, peer_index: usize) {
self.peers.on_peer_disconnected(peer_index);
// when last peer is disconnected, reset, but let verifying blocks be verified
self.reset(false);
}
/// Reset synchronization process
pub fn reset(&mut self) {
pub fn reset(&mut self, is_hard: bool) {
self.peers.reset();
self.orphaned_blocks.clear();
// TODO: reset verification queue
let mut chain = self.chain.write();
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
chain.remove_blocks_with_state(BlockState::Requested);
chain.remove_blocks_with_state(BlockState::Scheduled);
chain.remove_blocks_with_state(BlockState::Verifying);
warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block());
if is_hard {
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
chain.remove_blocks_with_state(BlockState::Verifying);
warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block());
}
else {
self.state = State::Saturated;
}
}
/// Process new blocks inventory
@ -483,7 +495,7 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err);
// reset synchronization process
self.reset();
self.reset(true);
// start new tasks
self.execute_synchronization_tasks();
@ -515,6 +527,7 @@ pub mod tests {
impl PeersConnections for DummyTaskExecutor {
fn add_peer_connection(&mut self, _: usize, _: OutboundSyncConnectionRef) {}
fn remove_peer_connection(&mut self, _: usize) {}
}
impl TaskExecutor for DummyTaskExecutor {
@ -651,4 +664,23 @@ pub mod tests {
&& sync.information().chain.stored == 3);
}
}
#[test]
fn synchronization_reset_when_peer_is_disconnected() {
let (_, sync) = create_sync();
// request new blocks
{
let mut sync = sync.lock();
sync.on_new_blocks_inventory(1, vec!["0000000000000000000000000000000000000000000000000000000000000000".into()]);
assert!(sync.information().state.is_synchronizing());
}
// lost connection to peer => synchronization state lost
{
let mut sync = sync.lock();
sync.on_peer_disconnected(1);
assert!(!sync.information().state.is_synchronizing());
}
}
}

View File

@ -32,6 +32,10 @@ impl PeersConnections for LocalSynchronizationTaskExecutor {
fn add_peer_connection(&mut self, index: usize, connection: OutboundSyncConnectionRef) {
self.peers.insert(index, connection);
}
fn remove_peer_connection(&mut self, index: usize) {
self.peers.remove(&index);
}
}
impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor {

View File

@ -58,13 +58,6 @@ impl Peers {
}
}
/// Remove synchronization peer.
#[cfg(test)]
pub fn remove(&mut self, peer_index: usize) {
self.idle_peers.remove(&peer_index);
self.blocks_requests.remove(&peer_index);
}
/// Block is received from peer.
pub fn on_block_received(&mut self, peer_index: usize, block_hash: &H256) {
if let Entry::Occupied(mut entry) = self.blocks_requests.entry(peer_index) {
@ -76,6 +69,12 @@ impl Peers {
}
}
/// Peer has been disconnected
pub fn on_peer_disconnected(&mut self, peer_index: usize) {
self.idle_peers.remove(&peer_index);
self.blocks_requests.remove(&peer_index);
}
/// Blocks have been requested from peer.
pub fn on_blocks_requested(&mut self, peer_index: usize, blocks_hashes: &Vec<H256>) {
// inventory can only be requested from idle peers
@ -158,14 +157,14 @@ mod tests {
assert!(peers.idle_peers()[0] == 0 || peers.idle_peers()[0] == 5);
assert!(peers.idle_peers()[1] == 0 || peers.idle_peers()[1] == 5);
peers.remove(7);
peers.on_peer_disconnected(7);
assert_eq!(peers.information().idle, 2);
assert_eq!(peers.information().active, 0);
assert!(peers.idle_peer() == Some(0) || peers.idle_peer() == Some(5));
assert!(peers.idle_peers()[0] == 0 || peers.idle_peers()[0] == 5);
assert!(peers.idle_peers()[1] == 0 || peers.idle_peers()[1] == 5);
peers.remove(0);
peers.on_peer_disconnected(0);
assert_eq!(peers.information().idle, 1);
assert_eq!(peers.information().active, 0);
assert_eq!(peers.idle_peer(), Some(5));