diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 4b844cd7..d94fa9e0 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -15,7 +15,7 @@ use chain::{Block, BlockHeader, RepresentH256}; use primitives::hash::H256; use synchronization_peers::Peers; #[cfg(test)] use synchronization_peers::{Information as PeersInformation}; -use synchronization_chain::{ChainRef, BlockState, HeadersIntersection}; +use synchronization_chain::{Chain, ChainRef, BlockState, HeadersIntersection}; #[cfg(test)] use synchronization_chain::{Information as ChainInformation}; use verification::{ChainVerifier, Error as VerificationError, Verify}; @@ -571,12 +571,13 @@ impl SynchronizationClient where T: TaskExecutor { // check parent block state match chain.block_state(&block.block_header.previous_header_hash) { BlockState::Unknown => { - // remove block from current queue - chain.forget(&block_hash); - if self.state.is_synchronizing() { // when synchronizing, we tend to receive all blocks in-order trace!(target: "sync", "Ignoring block {} from peer#{}, because its parent is unknown and we are synchronizing", block_hash, peer_index); + // remove block from current queue + chain.forget(&block_hash); + // remove orphaned blocks + SynchronizationClient::::remove_orphaned_blocks_for_parent(&mut self.unknown_blocks, &mut self.orphaned_blocks, &mut chain, &block_hash); } else { // remove this block from the queue chain.forget_leave_header(&block_hash); @@ -591,13 +592,13 @@ impl SynchronizationClient where T: TaskExecutor { BlockState::Verifying | BlockState::Stored => { // remember peer as useful self.peers.insert(peer_index); + // forget block + chain.forget_leave_header(&block_hash); // schedule verification let mut blocks: VecDeque<(H256, Block)> = VecDeque::new(); - blocks.push_back((block_hash, block)); + blocks.push_back((block_hash.clone(), block)); + blocks.extend(SynchronizationClient::::remove_orphaned_blocks_for_parent(&mut self.unknown_blocks, &mut self.orphaned_blocks, &mut chain, &block_hash)); while let Some((block_hash, block)) = blocks.pop_front() { - // remove block from current queue - // header is removed in insert_best_block or in one of on_verification_* methods - chain.forget_leave_header(&block_hash); match self.verification_work_sender { Some(ref verification_work_sender) => { // remember that we are verifying block from this peer @@ -625,15 +626,6 @@ impl SynchronizationClient where T: TaskExecutor { .expect("Error inserting to db."); }, } - - // process orphan blocks - if let Entry::Occupied(entry) = self.orphaned_blocks.entry(block_hash) { - let (_, orphaned) = entry.remove_entry(); - for orphaned_hash in orphaned.keys() { - self.unknown_blocks.remove(&orphaned_hash); - } - blocks.extend(orphaned.into_iter()); - } } }, BlockState::Requested | BlockState::Scheduled => { @@ -785,7 +777,28 @@ impl SynchronizationClient where T: TaskExecutor { } } - /// Remove give orphaned blocks + /// Remove orphaned blocks for given parent + fn remove_orphaned_blocks_for_parent(unknown_blocks: &mut LinkedHashMap, orphaned_blocks: &mut HashMap>, chain: &mut Chain, parent: &H256) -> VecDeque<(H256, Block)> { + let mut queue: VecDeque = VecDeque::new(); + queue.push_back(parent.clone()); + + let mut removed: VecDeque<(H256, Block)> = VecDeque::new(); + while let Some(parent_hash) = queue.pop_front() { + chain.forget_leave_header(&parent_hash); + + if let Entry::Occupied(entry) = orphaned_blocks.entry(parent_hash) { + let (_, orphaned) = entry.remove_entry(); + for orphaned_hash in orphaned.keys() { + unknown_blocks.remove(&orphaned_hash); + } + queue.extend(orphaned.keys().cloned()); + removed.extend(orphaned.into_iter()); + } + } + removed + } + + /// Remove given orphaned blocks fn remove_orphaned_blocks(&mut self, orphans_to_remove: HashSet) { let parent_orphan_keys: Vec<_> = self.orphaned_blocks.keys().cloned().collect(); for parent_orphan_key in parent_orphan_keys.into_iter() { @@ -1011,6 +1024,7 @@ pub mod tests { && sync.information().orphaned == 1); // receive block from peer#1 sync.on_peer_block(1, block1); + assert!(sync.information().chain.requested == 0 && sync.information().orphaned == 0 && sync.information().chain.stored == 3);