Merge pull request #69 from ethcore/close_sync_session
Close sync session implementation
This commit is contained in:
commit
96216a4068
|
@ -23,7 +23,7 @@ impl InboundSyncConnection for InboundConnection {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn close_session(&self) {
|
fn close_session(&self) {
|
||||||
// TODO: implement
|
self.local_node.stop_sync_session(self.peer_index)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_inventory(&self, message: types::Inv) {
|
fn on_inventory(&self, message: types::Inv) {
|
||||||
|
|
|
@ -31,6 +31,7 @@ pub struct LocalNode<T: SynchronizationTaskExecutor + PeersConnections + Send +
|
||||||
/// Peers list
|
/// Peers list
|
||||||
pub trait PeersConnections {
|
pub trait PeersConnections {
|
||||||
fn add_peer_connection(&mut self, peer_index: usize, outbound_connection: OutboundSyncConnectionRef);
|
fn add_peer_connection(&mut self, peer_index: usize, outbound_connection: OutboundSyncConnectionRef);
|
||||||
|
fn remove_peer_connection(&mut self, peer_index: usize);
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + Send + 'static {
|
impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + Send + 'static {
|
||||||
|
@ -66,6 +67,13 @@ impl<T> LocalNode<T> where T: SynchronizationTaskExecutor + PeersConnections + S
|
||||||
self.executor.lock().execute(SynchronizationTask::RequestInventory(peer_index));
|
self.executor.lock().execute(SynchronizationTask::RequestInventory(peer_index));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn stop_sync_session(&self, peer_index: usize) {
|
||||||
|
trace!(target: "sync", "Stopping sync session with peer#{}", peer_index);
|
||||||
|
|
||||||
|
self.executor.lock().remove_peer_connection(peer_index);
|
||||||
|
self.sync.lock().on_peer_disconnected(peer_index);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn on_peer_inventory(&self, peer_index: usize, message: types::Inv) {
|
pub fn on_peer_inventory(&self, peer_index: usize, message: types::Inv) {
|
||||||
trace!(target: "sync", "Got `inventory` message from peer#{}. Inventory len: {}", peer_index, message.inventory.len());
|
trace!(target: "sync", "Got `inventory` message from peer#{}. Inventory len: {}", peer_index, message.inventory.len());
|
||||||
|
|
||||||
|
|
|
@ -230,19 +230,31 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
|
||||||
self.execute_synchronization_tasks();
|
self.execute_synchronization_tasks();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Peer disconnected.
|
||||||
|
pub fn on_peer_disconnected(&mut self, peer_index: usize) {
|
||||||
|
self.peers.on_peer_disconnected(peer_index);
|
||||||
|
|
||||||
|
// when last peer is disconnected, reset, but let verifying blocks be verified
|
||||||
|
self.reset(false);
|
||||||
|
}
|
||||||
|
|
||||||
/// Reset synchronization process
|
/// Reset synchronization process
|
||||||
pub fn reset(&mut self) {
|
pub fn reset(&mut self, is_hard: bool) {
|
||||||
self.peers.reset();
|
self.peers.reset();
|
||||||
self.orphaned_blocks.clear();
|
self.orphaned_blocks.clear();
|
||||||
// TODO: reset verification queue
|
// TODO: reset verification queue
|
||||||
|
|
||||||
let mut chain = self.chain.write();
|
let mut chain = self.chain.write();
|
||||||
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
|
|
||||||
chain.remove_blocks_with_state(BlockState::Requested);
|
chain.remove_blocks_with_state(BlockState::Requested);
|
||||||
chain.remove_blocks_with_state(BlockState::Scheduled);
|
chain.remove_blocks_with_state(BlockState::Scheduled);
|
||||||
chain.remove_blocks_with_state(BlockState::Verifying);
|
if is_hard {
|
||||||
|
self.state = State::Synchronizing(time::precise_time_s(), chain.best_block().number);
|
||||||
warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block());
|
chain.remove_blocks_with_state(BlockState::Verifying);
|
||||||
|
warn!(target: "sync", "Synchronization process restarting from block {:?}", chain.best_block());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
self.state = State::Saturated;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process new blocks inventory
|
/// Process new blocks inventory
|
||||||
|
@ -483,7 +495,7 @@ impl<T> Synchronization<T> where T: TaskExecutor + Send + 'static {
|
||||||
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err);
|
warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err);
|
||||||
|
|
||||||
// reset synchronization process
|
// reset synchronization process
|
||||||
self.reset();
|
self.reset(true);
|
||||||
|
|
||||||
// start new tasks
|
// start new tasks
|
||||||
self.execute_synchronization_tasks();
|
self.execute_synchronization_tasks();
|
||||||
|
@ -515,6 +527,7 @@ pub mod tests {
|
||||||
|
|
||||||
impl PeersConnections for DummyTaskExecutor {
|
impl PeersConnections for DummyTaskExecutor {
|
||||||
fn add_peer_connection(&mut self, _: usize, _: OutboundSyncConnectionRef) {}
|
fn add_peer_connection(&mut self, _: usize, _: OutboundSyncConnectionRef) {}
|
||||||
|
fn remove_peer_connection(&mut self, _: usize) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TaskExecutor for DummyTaskExecutor {
|
impl TaskExecutor for DummyTaskExecutor {
|
||||||
|
@ -651,4 +664,23 @@ pub mod tests {
|
||||||
&& sync.information().chain.stored == 3);
|
&& sync.information().chain.stored == 3);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn synchronization_reset_when_peer_is_disconnected() {
|
||||||
|
let (_, sync) = create_sync();
|
||||||
|
|
||||||
|
// request new blocks
|
||||||
|
{
|
||||||
|
let mut sync = sync.lock();
|
||||||
|
sync.on_new_blocks_inventory(1, vec!["0000000000000000000000000000000000000000000000000000000000000000".into()]);
|
||||||
|
assert!(sync.information().state.is_synchronizing());
|
||||||
|
}
|
||||||
|
|
||||||
|
// lost connection to peer => synchronization state lost
|
||||||
|
{
|
||||||
|
let mut sync = sync.lock();
|
||||||
|
sync.on_peer_disconnected(1);
|
||||||
|
assert!(!sync.information().state.is_synchronizing());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,10 @@ impl PeersConnections for LocalSynchronizationTaskExecutor {
|
||||||
fn add_peer_connection(&mut self, index: usize, connection: OutboundSyncConnectionRef) {
|
fn add_peer_connection(&mut self, index: usize, connection: OutboundSyncConnectionRef) {
|
||||||
self.peers.insert(index, connection);
|
self.peers.insert(index, connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn remove_peer_connection(&mut self, index: usize) {
|
||||||
|
self.peers.remove(&index);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor {
|
impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor {
|
||||||
|
|
|
@ -58,13 +58,6 @@ impl Peers {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove synchronization peer.
|
|
||||||
#[cfg(test)]
|
|
||||||
pub fn remove(&mut self, peer_index: usize) {
|
|
||||||
self.idle_peers.remove(&peer_index);
|
|
||||||
self.blocks_requests.remove(&peer_index);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Block is received from peer.
|
/// Block is received from peer.
|
||||||
pub fn on_block_received(&mut self, peer_index: usize, block_hash: &H256) {
|
pub fn on_block_received(&mut self, peer_index: usize, block_hash: &H256) {
|
||||||
if let Entry::Occupied(mut entry) = self.blocks_requests.entry(peer_index) {
|
if let Entry::Occupied(mut entry) = self.blocks_requests.entry(peer_index) {
|
||||||
|
@ -76,6 +69,12 @@ impl Peers {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Peer has been disconnected
|
||||||
|
pub fn on_peer_disconnected(&mut self, peer_index: usize) {
|
||||||
|
self.idle_peers.remove(&peer_index);
|
||||||
|
self.blocks_requests.remove(&peer_index);
|
||||||
|
}
|
||||||
|
|
||||||
/// Blocks have been requested from peer.
|
/// Blocks have been requested from peer.
|
||||||
pub fn on_blocks_requested(&mut self, peer_index: usize, blocks_hashes: &Vec<H256>) {
|
pub fn on_blocks_requested(&mut self, peer_index: usize, blocks_hashes: &Vec<H256>) {
|
||||||
// inventory can only be requested from idle peers
|
// inventory can only be requested from idle peers
|
||||||
|
@ -158,14 +157,14 @@ mod tests {
|
||||||
assert!(peers.idle_peers()[0] == 0 || peers.idle_peers()[0] == 5);
|
assert!(peers.idle_peers()[0] == 0 || peers.idle_peers()[0] == 5);
|
||||||
assert!(peers.idle_peers()[1] == 0 || peers.idle_peers()[1] == 5);
|
assert!(peers.idle_peers()[1] == 0 || peers.idle_peers()[1] == 5);
|
||||||
|
|
||||||
peers.remove(7);
|
peers.on_peer_disconnected(7);
|
||||||
assert_eq!(peers.information().idle, 2);
|
assert_eq!(peers.information().idle, 2);
|
||||||
assert_eq!(peers.information().active, 0);
|
assert_eq!(peers.information().active, 0);
|
||||||
assert!(peers.idle_peer() == Some(0) || peers.idle_peer() == Some(5));
|
assert!(peers.idle_peer() == Some(0) || peers.idle_peer() == Some(5));
|
||||||
assert!(peers.idle_peers()[0] == 0 || peers.idle_peers()[0] == 5);
|
assert!(peers.idle_peers()[0] == 0 || peers.idle_peers()[0] == 5);
|
||||||
assert!(peers.idle_peers()[1] == 0 || peers.idle_peers()[1] == 5);
|
assert!(peers.idle_peers()[1] == 0 || peers.idle_peers()[1] == 5);
|
||||||
|
|
||||||
peers.remove(0);
|
peers.on_peer_disconnected(0);
|
||||||
assert_eq!(peers.information().idle, 1);
|
assert_eq!(peers.information().idle, 1);
|
||||||
assert_eq!(peers.information().active, 0);
|
assert_eq!(peers.information().active, 0);
|
||||||
assert_eq!(peers.idle_peer(), Some(5));
|
assert_eq!(peers.idle_peer(), Some(5));
|
||||||
|
|
Loading…
Reference in New Issue