fixed headers removal

This commit is contained in:
Svyatoslav Nikolsky 2016-11-11 12:19:45 +03:00
parent 781a9e1223
commit c526e833e3
1 changed files with 32 additions and 18 deletions

View File

@ -15,7 +15,7 @@ use chain::{Block, BlockHeader, RepresentH256};
use primitives::hash::H256;
use synchronization_peers::Peers;
#[cfg(test)] use synchronization_peers::{Information as PeersInformation};
use synchronization_chain::{ChainRef, BlockState, HeadersIntersection};
use synchronization_chain::{Chain, ChainRef, BlockState, HeadersIntersection};
#[cfg(test)]
use synchronization_chain::{Information as ChainInformation};
use verification::{ChainVerifier, Error as VerificationError, Verify};
@ -571,12 +571,13 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
// check parent block state
match chain.block_state(&block.block_header.previous_header_hash) {
BlockState::Unknown => {
// remove block from current queue
chain.forget(&block_hash);
if self.state.is_synchronizing() {
// when synchronizing, we tend to receive all blocks in-order
trace!(target: "sync", "Ignoring block {} from peer#{}, because its parent is unknown and we are synchronizing", block_hash, peer_index);
// remove block from current queue
chain.forget(&block_hash);
// remove orphaned blocks
SynchronizationClient::<T>::remove_orphaned_blocks_for_parent(&mut self.unknown_blocks, &mut self.orphaned_blocks, &mut chain, &block_hash);
} else {
// remove this block from the queue
chain.forget_leave_header(&block_hash);
@ -591,13 +592,13 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
BlockState::Verifying | BlockState::Stored => {
// remember peer as useful
self.peers.insert(peer_index);
// forget block
chain.forget_leave_header(&block_hash);
// schedule verification
let mut blocks: VecDeque<(H256, Block)> = VecDeque::new();
blocks.push_back((block_hash, block));
blocks.push_back((block_hash.clone(), block));
blocks.extend(SynchronizationClient::<T>::remove_orphaned_blocks_for_parent(&mut self.unknown_blocks, &mut self.orphaned_blocks, &mut chain, &block_hash));
while let Some((block_hash, block)) = blocks.pop_front() {
// remove block from current queue
// header is removed in insert_best_block or in one of on_verification_* methods
chain.forget_leave_header(&block_hash);
match self.verification_work_sender {
Some(ref verification_work_sender) => {
// remember that we are verifying block from this peer
@ -625,15 +626,6 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
.expect("Error inserting to db.");
},
}
// process orphan blocks
if let Entry::Occupied(entry) = self.orphaned_blocks.entry(block_hash) {
let (_, orphaned) = entry.remove_entry();
for orphaned_hash in orphaned.keys() {
self.unknown_blocks.remove(&orphaned_hash);
}
blocks.extend(orphaned.into_iter());
}
}
},
BlockState::Requested | BlockState::Scheduled => {
@ -785,7 +777,28 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
}
/// Remove give orphaned blocks
/// Remove orphaned blocks for given parent
fn remove_orphaned_blocks_for_parent(unknown_blocks: &mut LinkedHashMap<H256, f64>, orphaned_blocks: &mut HashMap<H256, HashMap<H256, Block>>, chain: &mut Chain, parent: &H256) -> VecDeque<(H256, Block)> {
let mut queue: VecDeque<H256> = VecDeque::new();
queue.push_back(parent.clone());
let mut removed: VecDeque<(H256, Block)> = VecDeque::new();
while let Some(parent_hash) = queue.pop_front() {
chain.forget_leave_header(&parent_hash);
if let Entry::Occupied(entry) = orphaned_blocks.entry(parent_hash) {
let (_, orphaned) = entry.remove_entry();
for orphaned_hash in orphaned.keys() {
unknown_blocks.remove(&orphaned_hash);
}
queue.extend(orphaned.keys().cloned());
removed.extend(orphaned.into_iter());
}
}
removed
}
/// Remove given orphaned blocks
fn remove_orphaned_blocks(&mut self, orphans_to_remove: HashSet<H256>) {
let parent_orphan_keys: Vec<_> = self.orphaned_blocks.keys().cloned().collect();
for parent_orphan_key in parent_orphan_keys.into_iter() {
@ -1011,6 +1024,7 @@ pub mod tests {
&& sync.information().orphaned == 1);
// receive block from peer#1
sync.on_peer_block(1, block1);
assert!(sync.information().chain.requested == 0
&& sync.information().orphaned == 0
&& sync.information().chain.stored == 3);