store unknown orphan blocks

Svyatoslav Nikolsky 2016-11-11 11:38:41 +03:00
parent 17e47bad4a
commit 781a9e1223
3 changed files with 151 additions and 31 deletions
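In outline, the commit teaches the synchronization client to keep blocks it never asked for: each such block is parked in a parent-keyed orphan pool, and its hash is recorded, together with the time it arrived, in an insertion-ordered map that a periodic management task later prunes by age and size. A minimal sketch of that bookkeeping, using simplified stand-ins for the project's H256 and Block types (not the actual client code):

use std::collections::HashMap;
use linked_hash_map::LinkedHashMap;

// Simplified stand-ins for primitives::hash::H256 and chain::Block.
type H256 = [u8; 32];
struct Block { hash: H256, previous_header_hash: H256 }

struct OrphanPool {
    // orphaned blocks, keyed by the hash of their missing parent
    orphaned_blocks: HashMap<H256, HashMap<H256, Block>>,
    // hashes received without being requested, with receive time in seconds
    unknown_blocks: LinkedHashMap<H256, f64>,
}

impl OrphanPool {
    // store a block whose parent is unknown and which we never requested
    fn insert_unknown(&mut self, block: Block, now: f64) {
        self.unknown_blocks.insert(block.hash, now);
        self.orphaned_blocks
            .entry(block.previous_header_hash)
            .or_insert_with(HashMap::new)
            .insert(block.hash, block);
    }

    // when `parent` arrives, release its orphaned children for processing
    // and drop their "unknown" bookkeeping entries
    fn take_children(&mut self, parent: &H256) -> Vec<Block> {
        match self.orphaned_blocks.remove(parent) {
            Some(children) => {
                for hash in children.keys() {
                    self.unknown_blocks.remove(hash);
                }
                children.into_iter().map(|(_, block)| block).collect()
            },
            None => Vec::new(),
        }
    }
}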

View File

@ -287,11 +287,8 @@ impl Chain {
/// Forget in-memory block
pub fn forget(&mut self, hash: &H256) -> HashPosition {
let position = self.forget_leave_header(hash);
if position != HashPosition::Missing {
self.headers_chain.remove(hash);
}
position
self.headers_chain.remove(hash);
self.forget_leave_header(hash)
}
/// Forget in-memory block, but leave its header in the headers_chain (orphan queue)
@ -308,11 +305,8 @@ impl Chain {
/// Forget in-memory block by hash if it is currently in given state
#[cfg(test)]
pub fn forget_with_state(&mut self, hash: &H256, state: BlockState) -> HashPosition {
let position = self.forget_with_state_leave_header(hash, state);
if position != HashPosition::Missing {
self.headers_chain.remove(hash);
}
position
self.headers_chain.remove(hash);
self.forget_with_state_leave_header(hash, state)
}
/// Forget in-memory block by hash if it is currently in given state
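The net effect of both changes in this file: forgetting a block now always drops its header from headers_chain, even when the block itself was never queued, which is exactly the case for unknown orphan blocks whose headers are the only trace left behind. A toy illustration of the new behaviour, with simplified types (not the project's Chain or HashPosition):

use std::collections::{HashMap, HashSet};

type H256 = [u8; 32];

#[derive(Debug, PartialEq)]
enum HashPosition { Missing, Inside }

struct Chain {
    queued_blocks: HashSet<H256>,     // stands in for the block hash queues
    headers_chain: HashMap<H256, ()>, // stands in for the headers chain
}

impl Chain {
    fn forget_leave_header(&mut self, hash: &H256) -> HashPosition {
        if self.queued_blocks.remove(hash) { HashPosition::Inside } else { HashPosition::Missing }
    }

    // the header is removed unconditionally, so headers of blocks that were
    // never queued (unknown orphans) are cleaned up as well
    fn forget(&mut self, hash: &H256) -> HashPosition {
        self.headers_chain.remove(hash);
        self.forget_leave_header(hash)
    }
}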

View File

@ -9,6 +9,7 @@ use futures::{BoxFuture, Future, finished};
use futures::stream::Stream;
use tokio_core::reactor::{Handle, Interval};
use futures_cpupool::CpuPool;
use linked_hash_map::LinkedHashMap;
use db;
use chain::{Block, BlockHeader, RepresentH256};
use primitives::hash::H256;
@ -19,7 +20,7 @@ use synchronization_chain::{ChainRef, BlockState, HeadersIntersection};
use synchronization_chain::{Information as ChainInformation};
use verification::{ChainVerifier, Error as VerificationError, Verify};
use synchronization_executor::{Task, TaskExecutor};
use synchronization_manager::{manage_synchronization_peers, MANAGEMENT_INTERVAL_MS};
use synchronization_manager::{manage_synchronization_peers, manage_unknown_orphaned_blocks, MANAGEMENT_INTERVAL_MS};
use hash_queue::HashPosition;
use time;
use std::time::Duration;
@ -224,7 +225,9 @@ pub struct SynchronizationClient<T: TaskExecutor> {
/// Chain reference.
chain: ChainRef,
/// Blocks from requested_hashes, but received out-of-order.
orphaned_blocks: HashMap<H256, Vec<(H256, Block)>>,
orphaned_blocks: HashMap<H256, HashMap<H256, Block>>,
/// Blocks that we have received without requesting, together with the time they were received.
unknown_blocks: LinkedHashMap<H256, f64>,
/// Verification work transmission channel.
verification_work_sender: Option<Sender<VerificationTask>>,
/// Verification thread.
@ -302,6 +305,7 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
let chain = self.chain.read();
blocks_hashes.into_iter()
.filter(|h| chain.block_state(&h) == BlockState::Unknown)
.filter(|h| !self.unknown_blocks.contains_key(h))
.collect()
};
@ -317,7 +321,7 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
if {
self.chain.read().block_state(&header0.previous_header_hash) == BlockState::Unknown
} {
warn!(target: "sync", "Previous header of the first header from `headers` message is unknown. First: {:?}. Previous: {:?}", header0.hash(), header0.previous_header_hash);
warn!(target: "sync", "Previous header of the first header from peer#{} `headers` message is unknown. First: {:?}. Previous: {:?}", peer_index, header0.hash(), header0.previous_header_hash);
return;
}
@ -328,7 +332,7 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
for block_header in blocks_headers.iter() {
let block_header_hash = block_header.hash();
if block_header.previous_header_hash != prev_block_hash {
warn!(target: "sync", "Neighbour headers in `headers` message are unlinked: Prev: {:?}, PrevLink: {:?}, Curr: {:?}", prev_block_hash, block_header.previous_header_hash, block_header_hash);
warn!(target: "sync", "Neighbour headers in peer#{} `headers` message are unlinked: Prev: {:?}, PrevLink: {:?}, Curr: {:?}", peer_index, prev_block_hash, block_header.previous_header_hash, block_header_hash);
return;
}
@ -386,7 +390,6 @@ impl<T> Client for SynchronizationClient<T> where T: TaskExecutor {
/// Process successful block verification
fn on_block_verification_success(&mut self, block: Block) {
let hash = block.hash();
// insert block to the storage
{
let mut chain = self.chain.write();
@ -440,6 +443,7 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
executor: executor,
chain: chain.clone(),
orphaned_blocks: HashMap::new(),
unknown_blocks: LinkedHashMap::new(),
verification_work_sender: None,
verification_worker_thread: None,
verifying_blocks_by_peer: HashMap::new(),
@ -482,6 +486,10 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
} else {
client.execute_synchronization_tasks(blocks_to_request);
}
if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&mut client.unknown_blocks) {
client.remove_orphaned_blocks(orphans_to_remove.into_iter().collect());
}
}
Ok(())
})
@ -511,14 +519,18 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
let mut chain = self.chain.write();
match chain.intersect_with_headers(&hashes, &headers) {
HeadersIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => (),
HeadersIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => {
warn!(target: "sync", "Ignoring {} headers from peer#{}. Unknown and we are synchronizing.", headers.len(), peer_index);
},
HeadersIntersection::DbAllBlocksKnown => {
trace!(target: "sync", "Ignoring {} headers from peer#{}. All blocks are known and in database.", headers.len(), peer_index);
if self.state.is_synchronizing() {
// remember peer as useful
self.peers.insert(peer_index);
}
},
HeadersIntersection::InMemoryNoNewBlocks => {
trace!(target: "sync", "Ignoring {} headers from peer#{}. All blocks are known and in memory.", headers.len(), peer_index);
// remember peer as useful
self.peers.insert(peer_index);
},
@ -561,6 +573,20 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
BlockState::Unknown => {
// remove block from current queue
chain.forget(&block_hash);
if self.state.is_synchronizing() {
// when synchronizing, we tend to receive all blocks in-order
trace!(target: "sync", "Ignoring block {} from peer#{}, because its parent is unknown and we are synchronizing", block_hash, peer_index);
} else {
// remove this block from the queue
chain.forget_leave_header(&block_hash);
// remember this block as unknown
self.unknown_blocks.insert(block_hash.clone(), time::precise_time_s());
self.orphaned_blocks
.entry(block.block_header.previous_header_hash.clone())
.or_insert_with(HashMap::new)
.insert(block_hash, block);
}
},
BlockState::Verifying | BlockState::Stored => {
// remember peer as useful
@ -603,7 +629,10 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
// process orphan blocks
if let Entry::Occupied(entry) = self.orphaned_blocks.entry(block_hash) {
let (_, orphaned) = entry.remove_entry();
blocks.extend(orphaned);
for orphaned_hash in orphaned.keys() {
self.unknown_blocks.remove(orphaned_hash);
}
blocks.extend(orphaned.into_iter());
}
}
},
@ -613,8 +642,8 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
// remember as orphan block
self.orphaned_blocks
.entry(block.block_header.previous_header_hash.clone())
.or_insert_with(Vec::new)
.push((block_hash, block))
.or_insert_with(HashMap::new)
.insert(block_hash, block);
}
}
},
@ -726,9 +755,15 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
self.state = State::Saturated;
self.orphaned_blocks.clear();
self.peers.reset();
// remove sync orphans, but leave unknown orphans until they are removed by the management thread
let orphans_to_remove: HashSet<_> = self.orphaned_blocks.values()
.flat_map(|v| v.iter().map(|e| e.0.clone()))
.filter(|h| !self.unknown_blocks.contains_key(h))
.collect();
self.remove_orphaned_blocks(orphans_to_remove);
// leave currently verifying blocks
{
let mut chain = self.chain.write();
@ -750,6 +785,30 @@ impl<T> SynchronizationClient<T> where T: TaskExecutor {
}
}
/// Remove the given orphaned blocks
fn remove_orphaned_blocks(&mut self, orphans_to_remove: HashSet<H256>) {
let parent_orphan_keys: Vec<_> = self.orphaned_blocks.keys().cloned().collect();
for parent_orphan_key in parent_orphan_keys.into_iter() {
if let Entry::Occupied(mut orphan_entry) = self.orphaned_blocks.entry(parent_orphan_key.clone()) {
if {
let mut orphans = orphan_entry.get_mut();
let orphans_keys: HashSet<H256> = orphans.keys().cloned().collect();
for orphan_to_remove in orphans_keys.intersection(&orphans_to_remove) {
orphans.remove(orphan_to_remove);
}
orphans.is_empty()
} {
orphan_entry.remove_entry();
}
}
}
let mut chain = self.chain.write();
for orphan_to_remove in orphans_to_remove {
chain.forget(&orphan_to_remove);
}
}
/// Awake threads, waiting for this block
fn awake_waiting_threads(&mut self, hash: &H256) {
// find a peer, which has supplied us with this block
@ -898,20 +957,18 @@ pub mod tests {
let (_, _, _, _, sync) = create_sync(None);
let mut sync = sync.lock();
let block2: Block = test_data::block_h169();
sync.on_new_blocks_headers(5, vec![block2.block_header.clone()]);
sync.on_peer_block(5, block2);
sync.on_new_blocks_headers(5, vec![test_data::block_h1().block_header.clone(), test_data::block_h2().block_header.clone()]);
sync.on_peer_block(5, test_data::block_h169());
// out-of-order block was presented by the peer
assert!(!sync.information().state.is_synchronizing());
assert!(sync.information().state.is_synchronizing());
assert_eq!(sync.information().orphaned, 0);
assert_eq!(sync.information().chain.scheduled, 0);
assert_eq!(sync.information().chain.requested, 0);
assert_eq!(sync.information().chain.requested, 2);
assert_eq!(sync.information().chain.stored, 1);
// we have just requested new `inventory` from the peer => peer is forgotten
assert_eq!(sync.information().peers.idle, 0);
assert_eq!(sync.information().peers.active, 0);
assert_eq!(sync.information().peers.active, 1);
// TODO: check that peer is penalized
}
@ -1211,4 +1268,38 @@ pub mod tests {
assert_eq!(chain.best_storage_block().number, 4);
}
}
#[test]
fn accept_out_of_order_blocks_when_saturated() {
let (_, _, _, chain, sync) = create_sync(None);
let mut sync = sync.lock();
sync.on_peer_block(1, test_data::block_h2());
assert_eq!(sync.information().orphaned, 1);
{
let chain = chain.read();
assert_eq!(chain.best_storage_block().number, 0);
}
sync.on_peer_block(1, test_data::block_h1());
assert_eq!(sync.information().orphaned, 0);
{
let chain = chain.read();
assert_eq!(chain.best_storage_block().number, 2);
}
}
#[test]
fn do_not_rerequest_unknown_block_in_inventory() {
let (_, _, executor, _, sync) = create_sync(None);
let mut sync = sync.lock();
sync.on_peer_block(1, test_data::block_h2());
sync.on_new_blocks_inventory(1, vec![test_data::block_h1().hash(), test_data::block_h2().hash()]);
let tasks = executor.lock().take_tasks();
assert_eq!(tasks, vec![Task::RequestBlocks(1, vec![test_data::block_h1().hash()])]);
}
}
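As an aside on remove_orphaned_blocks above: the same two-level cleanup (drop the listed children, then drop parent entries that become empty) can be written more compactly with HashMap::retain in later Rust releases. A sketch over the same pool shape, with placeholder types standing in for the project's H256 and Block:

use std::collections::{HashMap, HashSet};

type H256 = [u8; 32];
struct Block;

// drop the given orphan hashes and discard parent entries that become empty
fn remove_orphans(pool: &mut HashMap<H256, HashMap<H256, Block>>, to_remove: &HashSet<H256>) {
    pool.retain(|_parent, children| {
        children.retain(|hash, _| !to_remove.contains(hash));
        !children.is_empty()
    });
}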

View File

@ -1,20 +1,26 @@
use time::precise_time_s;
use linked_hash_map::LinkedHashMap;
use synchronization_peers::Peers;
use primitives::hash::H256;
/// Management interval (in ms)
pub const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000;
/// Response time to decrease peer score
const FAILURE_INTERVAL_S: f64 = 5f64;
const PEER_FAILURE_INTERVAL_S: f64 = 5f64;
/// Unknown orphan block removal time (in seconds)
const UNKNOWN_BLOCK_REMOVAL_TIME_S: f64 = 20f64 * 60f64;
/// Maximal number of unknown orphaned blocks to keep in memory
const UNKNOWN_BLOCKS_MAX_LEN: usize = 16;
/// Management worker
/// Manage stalled synchronization peers tasks
pub fn manage_synchronization_peers(peers: &mut Peers) -> Option<Vec<H256>> {
let mut blocks_to_request: Vec<H256> = Vec::new();
let now = precise_time_s();
// reset tasks for peers which have not responded during the given period
for (worst_peer_index, worst_peer_time) in peers.worst_peers() {
// check if peer has not responded within given time
let time_diff = worst_peer_time - precise_time_s();
if time_diff <= FAILURE_INTERVAL_S {
let time_diff = worst_peer_time - now;
if time_diff <= PEER_FAILURE_INTERVAL_S {
break;
}
@ -31,3 +37,32 @@ pub fn manage_synchronization_peers(peers: &mut Peers) -> Option<Vec<H256>> {
if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) }
}
/// Manage unknown orphaned blocks
pub fn manage_unknown_orphaned_blocks(unknown_blocks: &mut LinkedHashMap<H256, f64>) -> Option<Vec<H256>> {
let mut unknown_to_remove: Vec<H256> = Vec::new();
let mut remove_num = if unknown_blocks.len() > UNKNOWN_BLOCKS_MAX_LEN { unknown_blocks.len() - UNKNOWN_BLOCKS_MAX_LEN } else { 0 };
let now = precise_time_s();
for (hash, time) in unknown_blocks.iter() {
// remove oldest blocks if there are more unknown blocks than we can hold in memory
if remove_num > 0 {
unknown_to_remove.push(hash.clone());
remove_num -= 1;
continue;
}
// check if block is unknown for too long
let time_diff = now - time;
if time_diff <= UNKNOWN_BLOCK_REMOVAL_TIME_S {
break;
}
unknown_to_remove.push(hash.clone());
}
// remove unknown blocks
for unknown_block in unknown_to_remove.iter() {
unknown_blocks.remove(unknown_block);
}
if unknown_to_remove.is_empty() { None } else { Some(unknown_to_remove) }
}
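To make the two pruning rules concrete, a small illustration of the age-based rule (hash type simplified to a byte array, timestamps made up): because LinkedHashMap preserves insertion order, the oldest unknown blocks come first, so the scan can stop at the first entry that has not yet exceeded the removal time.

use linked_hash_map::LinkedHashMap;

type H256 = [u8; 32]; // stand-in for primitives::hash::H256

fn main() {
    let now = 10_000.0f64;            // pretend current time, in seconds
    let removal_time = 20.0 * 60.0;   // mirrors UNKNOWN_BLOCK_REMOVAL_TIME_S

    let mut unknown: LinkedHashMap<H256, f64> = LinkedHashMap::new();
    unknown.insert([1u8; 32], now - 30.0 * 60.0); // unknown for 30 minutes -> expired
    unknown.insert([2u8; 32], now - 60.0);        // unknown for 1 minute   -> kept

    // walk in insertion order (oldest first) and stop at the first entry
    // that is still young enough
    let expired: Vec<H256> = unknown.iter()
        .take_while(|&(_, time)| now - time > removal_time)
        .map(|(hash, _)| *hash)
        .collect();
    for hash in &expired {
        unknown.remove(hash);
    }

    assert_eq!(expired.len(), 1);
    assert_eq!(unknown.len(), 1);
}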