diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index 0e17bfa5..4ddb05db 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -287,11 +287,8 @@ impl Chain { /// Forget in-memory block pub fn forget(&mut self, hash: &H256) -> HashPosition { - let position = self.forget_leave_header(hash); - if position != HashPosition::Missing { - self.headers_chain.remove(hash); - } - position + self.headers_chain.remove(hash); + self.forget_leave_header(hash) } /// Forget in-memory block, but leave its header in the headers_chain (orphan queue) @@ -308,11 +305,8 @@ impl Chain { /// Forget in-memory block by hash if it is currently in given state #[cfg(test)] pub fn forget_with_state(&mut self, hash: &H256, state: BlockState) -> HashPosition { - let position = self.forget_with_state_leave_header(hash, state); - if position != HashPosition::Missing { - self.headers_chain.remove(hash); - } - position + self.headers_chain.remove(hash); + self.forget_with_state_leave_header(hash, state) } /// Forget in-memory block by hash if it is currently in given state diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 92cc09d9..4b844cd7 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -9,6 +9,7 @@ use futures::{BoxFuture, Future, finished}; use futures::stream::Stream; use tokio_core::reactor::{Handle, Interval}; use futures_cpupool::CpuPool; +use linked_hash_map::LinkedHashMap; use db; use chain::{Block, BlockHeader, RepresentH256}; use primitives::hash::H256; @@ -19,7 +20,7 @@ use synchronization_chain::{ChainRef, BlockState, HeadersIntersection}; use synchronization_chain::{Information as ChainInformation}; use verification::{ChainVerifier, Error as VerificationError, Verify}; use synchronization_executor::{Task, TaskExecutor}; -use synchronization_manager::{manage_synchronization_peers, MANAGEMENT_INTERVAL_MS}; +use synchronization_manager::{manage_synchronization_peers, manage_unknown_orphaned_blocks, MANAGEMENT_INTERVAL_MS}; use hash_queue::HashPosition; use time; use std::time::Duration; @@ -224,7 +225,9 @@ pub struct SynchronizationClient { /// Chain reference. chain: ChainRef, /// Blocks from requested_hashes, but received out-of-order. - orphaned_blocks: HashMap>, + orphaned_blocks: HashMap>, + /// Blocks that we have received without requesting with receiving time. + unknown_blocks: LinkedHashMap, /// Verification work transmission channel. verification_work_sender: Option>, /// Verification thread. @@ -302,6 +305,7 @@ impl Client for SynchronizationClient where T: TaskExecutor { let chain = self.chain.read(); blocks_hashes.into_iter() .filter(|h| chain.block_state(&h) == BlockState::Unknown) + .filter(|h| !self.unknown_blocks.contains_key(h)) .collect() }; @@ -317,7 +321,7 @@ impl Client for SynchronizationClient where T: TaskExecutor { if { self.chain.read().block_state(&header0.previous_header_hash) == BlockState::Unknown } { - warn!(target: "sync", "Previous header of the first header from `headers` message is unknown. First: {:?}. Previous: {:?}", header0.hash(), header0.previous_header_hash); + warn!(target: "sync", "Previous header of the first header from peer#{} `headers` message is unknown. First: {:?}. Previous: {:?}", peer_index, header0.hash(), header0.previous_header_hash); return; } @@ -328,7 +332,7 @@ impl Client for SynchronizationClient where T: TaskExecutor { for block_header in blocks_headers.iter() { let block_header_hash = block_header.hash(); if block_header.previous_header_hash != prev_block_hash { - warn!(target: "sync", "Neighbour headers in `headers` message are unlinked: Prev: {:?}, PrevLink: {:?}, Curr: {:?}", prev_block_hash, block_header.previous_header_hash, block_header_hash); + warn!(target: "sync", "Neighbour headers in peer#{} `headers` message are unlinked: Prev: {:?}, PrevLink: {:?}, Curr: {:?}", peer_index, prev_block_hash, block_header.previous_header_hash, block_header_hash); return; } @@ -386,7 +390,6 @@ impl Client for SynchronizationClient where T: TaskExecutor { /// Process successful block verification fn on_block_verification_success(&mut self, block: Block) { let hash = block.hash(); - // insert block to the storage { let mut chain = self.chain.write(); @@ -440,6 +443,7 @@ impl SynchronizationClient where T: TaskExecutor { executor: executor, chain: chain.clone(), orphaned_blocks: HashMap::new(), + unknown_blocks: LinkedHashMap::new(), verification_work_sender: None, verification_worker_thread: None, verifying_blocks_by_peer: HashMap::new(), @@ -482,6 +486,10 @@ impl SynchronizationClient where T: TaskExecutor { } else { client.execute_synchronization_tasks(blocks_to_request); } + + if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&mut client.unknown_blocks) { + client.remove_orphaned_blocks(orphans_to_remove.into_iter().collect()); + } } Ok(()) }) @@ -511,14 +519,18 @@ impl SynchronizationClient where T: TaskExecutor { let mut chain = self.chain.write(); match chain.intersect_with_headers(&hashes, &headers) { - HeadersIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => (), + HeadersIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => { + warn!(target: "sync", "Ignoring {} headers from peer#{}. Unknown and we are synchronizing.", headers.len(), peer_index); + }, HeadersIntersection::DbAllBlocksKnown => { + trace!(target: "sync", "Ignoring {} headers from peer#{}. All blocks are known and in database.", headers.len(), peer_index); if self.state.is_synchronizing() { // remember peer as useful self.peers.insert(peer_index); } }, HeadersIntersection::InMemoryNoNewBlocks => { + trace!(target: "sync", "Ignoring {} headers from peer#{}. All blocks are known and in memory.", headers.len(), peer_index); // remember peer as useful self.peers.insert(peer_index); }, @@ -561,6 +573,20 @@ impl SynchronizationClient where T: TaskExecutor { BlockState::Unknown => { // remove block from current queue chain.forget(&block_hash); + + if self.state.is_synchronizing() { + // when synchronizing, we tend to receive all blocks in-order + trace!(target: "sync", "Ignoring block {} from peer#{}, because its parent is unknown and we are synchronizing", block_hash, peer_index); + } else { + // remove this block from the queue + chain.forget_leave_header(&block_hash); + // remember this block as unknown + self.unknown_blocks.insert(block_hash.clone(), time::precise_time_s()); + self.orphaned_blocks + .entry(block.block_header.previous_header_hash.clone()) + .or_insert_with(HashMap::new) + .insert(block_hash, block); + } }, BlockState::Verifying | BlockState::Stored => { // remember peer as useful @@ -603,7 +629,10 @@ impl SynchronizationClient where T: TaskExecutor { // process orphan blocks if let Entry::Occupied(entry) = self.orphaned_blocks.entry(block_hash) { let (_, orphaned) = entry.remove_entry(); - blocks.extend(orphaned); + for orphaned_hash in orphaned.keys() { + self.unknown_blocks.remove(&orphaned_hash); + } + blocks.extend(orphaned.into_iter()); } } }, @@ -613,8 +642,8 @@ impl SynchronizationClient where T: TaskExecutor { // remember as orphan block self.orphaned_blocks .entry(block.block_header.previous_header_hash.clone()) - .or_insert_with(Vec::new) - .push((block_hash, block)) + .or_insert_with(HashMap::new) + .insert(block_hash, block); } } }, @@ -726,9 +755,15 @@ impl SynchronizationClient where T: TaskExecutor { } self.state = State::Saturated; - self.orphaned_blocks.clear(); self.peers.reset(); + // remove sync orphans, but leave unknown orphans until they'll be removed by management thread + let orphans_to_remove: HashSet<_> = self.orphaned_blocks.values() + .flat_map(|v| v.iter().map(|e| e.0.clone())) + .filter(|h| !self.unknown_blocks.contains_key(h)) + .collect(); + self.remove_orphaned_blocks(orphans_to_remove); + // leave currently verifying blocks { let mut chain = self.chain.write(); @@ -750,6 +785,30 @@ impl SynchronizationClient where T: TaskExecutor { } } + /// Remove give orphaned blocks + fn remove_orphaned_blocks(&mut self, orphans_to_remove: HashSet) { + let parent_orphan_keys: Vec<_> = self.orphaned_blocks.keys().cloned().collect(); + for parent_orphan_key in parent_orphan_keys.into_iter() { + if let Entry::Occupied(mut orphan_entry) = self.orphaned_blocks.entry(parent_orphan_key.clone()) { + if { + let mut orphans = orphan_entry.get_mut(); + let orphans_keys: HashSet = orphans.keys().cloned().collect(); + for orphan_to_remove in orphans_keys.intersection(&orphans_to_remove) { + orphans.remove(orphan_to_remove); + } + orphans.is_empty() + } { + orphan_entry.remove_entry(); + } + } + } + + let mut chain = self.chain.write(); + for orphan_to_remove in orphans_to_remove { + chain.forget(&orphan_to_remove); + } + } + /// Awake threads, waiting for this block fn awake_waiting_threads(&mut self, hash: &H256) { // find a peer, which has supplied us with this block @@ -898,20 +957,18 @@ pub mod tests { let (_, _, _, _, sync) = create_sync(None); let mut sync = sync.lock(); - let block2: Block = test_data::block_h169(); - - sync.on_new_blocks_headers(5, vec![block2.block_header.clone()]); - sync.on_peer_block(5, block2); + sync.on_new_blocks_headers(5, vec![test_data::block_h1().block_header.clone(), test_data::block_h2().block_header.clone()]); + sync.on_peer_block(5, test_data::block_h169()); // out-of-order block was presented by the peer - assert!(!sync.information().state.is_synchronizing()); + assert!(sync.information().state.is_synchronizing()); assert_eq!(sync.information().orphaned, 0); assert_eq!(sync.information().chain.scheduled, 0); - assert_eq!(sync.information().chain.requested, 0); + assert_eq!(sync.information().chain.requested, 2); assert_eq!(sync.information().chain.stored, 1); // we have just requested new `inventory` from the peer => peer is forgotten assert_eq!(sync.information().peers.idle, 0); - assert_eq!(sync.information().peers.active, 0); + assert_eq!(sync.information().peers.active, 1); // TODO: check that peer is penalized } @@ -1211,4 +1268,38 @@ pub mod tests { assert_eq!(chain.best_storage_block().number, 4); } } + + #[test] + fn accept_out_of_order_blocks_when_saturated() { + let (_, _, _, chain, sync) = create_sync(None); + let mut sync = sync.lock(); + + sync.on_peer_block(1, test_data::block_h2()); + assert_eq!(sync.information().orphaned, 1); + + { + let chain = chain.read(); + assert_eq!(chain.best_storage_block().number, 0); + } + + sync.on_peer_block(1, test_data::block_h1()); + assert_eq!(sync.information().orphaned, 0); + + { + let chain = chain.read(); + assert_eq!(chain.best_storage_block().number, 2); + } + } + + #[test] + fn do_not_rerequest_unknown_block_in_inventory() { + let (_, _, executor, _, sync) = create_sync(None); + let mut sync = sync.lock(); + + sync.on_peer_block(1, test_data::block_h2()); + sync.on_new_blocks_inventory(1, vec![test_data::block_h1().hash(), test_data::block_h2().hash()]); + + let tasks = executor.lock().take_tasks(); + assert_eq!(tasks, vec![Task::RequestBlocks(1, vec![test_data::block_h1().hash()])]); + } } diff --git a/sync/src/synchronization_manager.rs b/sync/src/synchronization_manager.rs index 85bb6a61..11240129 100644 --- a/sync/src/synchronization_manager.rs +++ b/sync/src/synchronization_manager.rs @@ -1,20 +1,26 @@ use time::precise_time_s; +use linked_hash_map::LinkedHashMap; use synchronization_peers::Peers; use primitives::hash::H256; /// Management interval (in ms) pub const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000; /// Response time to decrease peer score -const FAILURE_INTERVAL_S: f64 = 5f64; +const PEER_FAILURE_INTERVAL_S: f64 = 5f64; +/// Unknown orphan block removal time +const UNKNOWN_BLOCK_REMOVAL_TIME_S: f64 = 20f64 * 60f64; +/// Maximal number of orphaned blocks +const UNKNOWN_BLOCKS_MAX_LEN: usize = 16; -/// Management worker +/// Manage stalled synchronization peers tasks pub fn manage_synchronization_peers(peers: &mut Peers) -> Option> { let mut blocks_to_request: Vec = Vec::new(); + let now = precise_time_s(); // reset tasks for peers, which has not responded during given period for (worst_peer_index, worst_peer_time) in peers.worst_peers() { // check if peer has not responded within given time - let time_diff = worst_peer_time - precise_time_s(); - if time_diff <= FAILURE_INTERVAL_S { + let time_diff = worst_peer_time - now; + if time_diff <= PEER_FAILURE_INTERVAL_S { break; } @@ -31,3 +37,32 @@ pub fn manage_synchronization_peers(peers: &mut Peers) -> Option> { if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) } } + +/// Manage unknown orphaned blocks +pub fn manage_unknown_orphaned_blocks(unknown_blocks: &mut LinkedHashMap) -> Option> { + let mut unknown_to_remove: Vec = Vec::new(); + let mut remove_num = if unknown_blocks.len() > UNKNOWN_BLOCKS_MAX_LEN { UNKNOWN_BLOCKS_MAX_LEN - unknown_blocks.len() } else { 0 }; + let now = precise_time_s(); + for (hash, time) in unknown_blocks.iter() { + // remove oldest blocks if there are more unknown blocks that we can hold in memory + if remove_num > 0 { + unknown_to_remove.push(hash.clone()); + remove_num -= 1; + continue; + } + + // check if block is unknown for too long + let time_diff = time - now; + if time_diff <= UNKNOWN_BLOCK_REMOVAL_TIME_S { + break; + } + unknown_to_remove.push(hash.clone()); + } + + // remove unknown blocks + for unknown_block in unknown_to_remove.iter() { + unknown_blocks.remove(unknown_block); + } + + if unknown_to_remove.is_empty() { None } else { Some(unknown_to_remove) } +}