diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index c1a24af7..244ec19d 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -1124,8 +1124,9 @@ impl SynchronizationClientCore where T: TaskExecutor { let mut client = client.lock(); client.print_synchronization_information(); if client.state.is_synchronizing() || client.state.is_nearly_saturated() { - let blocks_to_request = manage_synchronization_peers_blocks(&peers_config, &mut client.peers); - client.execute_synchronization_tasks(blocks_to_request); + let (blocks_to_request, blocks_to_forget) = manage_synchronization_peers_blocks(&peers_config, &mut client.peers); + client.forget_failed_blocks(blocks_to_forget); // TODO: children of blocks_to_forget can be in blocks_to_request => these have to be removed + client.execute_synchronization_tasks(if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) }); manage_synchronization_peers_inventory(&peers_config, &mut client.peers); manage_orphaned_transactions(&orphan_config, &mut client.orphaned_transactions_pool); @@ -1477,6 +1478,17 @@ impl SynchronizationClientCore where T: TaskExecutor { Some(transactions) } + fn forget_failed_blocks(&mut self, blocks_to_forget: Vec) { + if blocks_to_forget.is_empty() { + return; + } + + let mut chain = self.chain.write(); + for block_to_forget in blocks_to_forget { + chain.forget_block_with_children(&block_to_forget); + } + } + fn prepare_blocks_requests_tasks(&mut self, peers: Vec, mut hashes: Vec) -> Vec { use std::mem::swap; @@ -1687,13 +1699,14 @@ pub mod tests { use test_data; use db::{self, BlockHeaderProvider}; use devtools::RandomTempPath; + use synchronization_client::ClientCore; fn create_disk_storage() -> db::SharedStore { let path = RandomTempPath::create_dir(); Arc::new(db::Storage::new(path.as_path()).unwrap()) } - fn create_sync(storage: Option, verifier: Option) -> (Core, Handle, Arc>, ChainRef, Arc>>) { + fn create_sync(storage: Option, verifier: Option) -> (Core, Handle, Arc>, ChainRef, Arc>>, Arc>>) { let event_loop = event_loop(); let handle = event_loop.handle(); let storage = match storage { @@ -1711,13 +1724,13 @@ pub mod tests { } let mut verifier = verifier.unwrap_or_default(); verifier.set_sink(client_core.clone()); - let client = SynchronizationClient::new(client_core, verifier); - (event_loop, handle, executor, chain, client) + let client = SynchronizationClient::new(client_core.clone(), verifier); + (event_loop, handle, executor, chain, client_core, client) } #[test] fn synchronization_saturated_on_start() { - let (_, _, _, _, sync) = create_sync(None, None); + let (_, _, _, _, _, sync) = create_sync(None, None); let sync = sync.lock(); let info = sync.information(); assert!(!info.state.is_synchronizing()); @@ -1727,7 +1740,7 @@ pub mod tests { #[test] fn synchronization_in_order_block_path_nearly_saturated() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let block1: Block = test_data::block_h1(); @@ -1768,7 +1781,7 @@ pub mod tests { #[test] fn synchronization_out_of_order_block_path() { - let (_, _, _, _, sync) = create_sync(None, None); + let (_, _, _, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_new_blocks_headers(5, vec![test_data::block_h1().block_header.clone(), test_data::block_h2().block_header.clone()]); @@ -1788,7 +1801,7 @@ pub mod tests { #[test] fn synchronization_parallel_peers() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let block1: Block = test_data::block_h1(); let block2: Block = test_data::block_h2(); @@ -1834,7 +1847,7 @@ pub mod tests { #[test] fn synchronization_reset_when_peer_is_disconnected() { - let (_, _, _, _, sync) = create_sync(None, None); + let (_, _, _, _, _, sync) = create_sync(None, None); // request new blocks { @@ -1853,7 +1866,7 @@ pub mod tests { #[test] fn synchronization_not_starting_when_receiving_known_blocks() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); // saturated => receive inventory with known blocks only sync.on_new_blocks_headers(1, vec![test_data::genesis().block_header]); @@ -1866,7 +1879,7 @@ pub mod tests { #[test] fn synchronization_asks_for_inventory_after_saturating() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let block = test_data::block_h1(); sync.on_new_blocks_headers(1, vec![block.block_header.clone()]); @@ -1887,7 +1900,7 @@ pub mod tests { #[test] fn synchronization_remembers_correct_block_headers_in_order() { - let (_, _, executor, chain, sync) = create_sync(None, None); + let (_, _, executor, chain, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let b1 = test_data::block_h1(); @@ -1930,7 +1943,7 @@ pub mod tests { #[test] fn synchronization_remembers_correct_block_headers_out_of_order() { - let (_, _, executor, chain, sync) = create_sync(None, None); + let (_, _, executor, chain, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let b1 = test_data::block_h1(); @@ -1973,7 +1986,7 @@ pub mod tests { #[test] fn synchronization_ignores_unknown_block_headers() { - let (_, _, executor, chain, sync) = create_sync(None, None); + let (_, _, executor, chain, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let b169 = test_data::block_h169(); @@ -1993,7 +2006,7 @@ pub mod tests { let genesis = test_data::genesis(); storage.insert_block(&genesis).expect("no db error"); - let (_, _, executor, chain, sync) = create_sync(Some(storage), None); + let (_, _, executor, chain, _, sync) = create_sync(Some(storage), None); let genesis_header = &genesis.block_header; let fork1 = test_data::build_n_empty_blocks_from(2, 100, &genesis_header); let fork2 = test_data::build_n_empty_blocks_from(3, 200, &genesis_header); @@ -2051,7 +2064,7 @@ pub mod tests { let genesis = test_data::genesis(); storage.insert_block(&genesis).expect("no db error"); - let (_, _, executor, chain, sync) = create_sync(Some(storage), None); + let (_, _, executor, chain, _, sync) = create_sync(Some(storage), None); let common_block = test_data::block_builder().header().parent(genesis.hash()).build().build(); let fork1 = test_data::build_n_empty_blocks_from(2, 100, &common_block.block_header); let fork2 = test_data::build_n_empty_blocks_from(3, 200, &common_block.block_header); @@ -2091,7 +2104,7 @@ pub mod tests { #[test] fn accept_out_of_order_blocks_when_saturated() { - let (_, _, _, chain, sync) = create_sync(None, None); + let (_, _, _, chain, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_peer_block(1, test_data::block_h2().into()); @@ -2113,7 +2126,7 @@ pub mod tests { #[test] fn do_not_rerequest_unknown_block_in_inventory() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_peer_block(1, test_data::block_h2().into()); @@ -2125,7 +2138,7 @@ pub mod tests { #[test] fn blocks_rerequested_on_peer_disconnect() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let block1: Block = test_data::block_h1(); let block2: Block = test_data::block_h2(); @@ -2167,7 +2180,7 @@ pub mod tests { storage.insert_error(block.hash(), db::Error::Consistency(db::ConsistencyError::NoBestBlock)); let best_genesis = storage.best_block().unwrap(); - let (_, _, _, chain, sync) = create_sync(Some(Arc::new(storage)), None); + let (_, _, _, chain, _, sync) = create_sync(Some(Arc::new(storage)), None); let mut sync = sync.lock(); sync.on_peer_block(1, block.into()); @@ -2177,7 +2190,7 @@ pub mod tests { #[test] fn peer_removed_from_sync_after_responding_with_requested_block_notfound() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let b1 = test_data::block_h1(); @@ -2203,7 +2216,7 @@ pub mod tests { #[test] fn peer_not_removed_from_sync_after_responding_with_requested_block_notfound() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let b1 = test_data::block_h1(); @@ -2229,7 +2242,7 @@ pub mod tests { #[test] fn transaction_is_not_requested_when_synchronizing() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let b1 = test_data::block_h1(); @@ -2247,7 +2260,7 @@ pub mod tests { #[test] fn transaction_is_requested_when_not_synchronizing() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_new_transactions_inventory(0, vec![H256::from(0)]); @@ -2271,7 +2284,7 @@ pub mod tests { #[test] fn same_transaction_can_be_requested_twice() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_new_transactions_inventory(0, vec![H256::from(0)]); @@ -2291,7 +2304,7 @@ pub mod tests { #[test] fn known_transaction_is_not_requested() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_new_transactions_inventory(0, vec![test_data::genesis().transactions[0].hash(), H256::from(0)]); @@ -2300,7 +2313,7 @@ pub mod tests { #[test] fn transaction_is_not_accepted_when_synchronizing() { - let (_, _, _, _, sync) = create_sync(None, None); + let (_, _, _, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let b1 = test_data::block_h1(); @@ -2316,7 +2329,7 @@ pub mod tests { #[test] fn transaction_is_accepted_when_not_synchronizing() { - let (_, _, _, _, sync) = create_sync(None, None); + let (_, _, _, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_peer_transaction(1, test_data::TransactionBuilder::with_version(1).into()); @@ -2333,7 +2346,7 @@ pub mod tests { #[test] fn transaction_is_orphaned_when_input_is_unknown() { - let (_, _, _, _, sync) = create_sync(None, None); + let (_, _, _, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_peer_transaction(1, test_data::TransactionBuilder::with_default_input(0).into()); @@ -2347,7 +2360,7 @@ pub mod tests { test_data::TransactionBuilder::with_output(10).store(chain) // t0 .set_input(&chain.at(0), 0).set_output(20).store(chain); // t0 -> t1 - let (_, _, _, _, sync) = create_sync(None, None); + let (_, _, _, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_peer_transaction(1, chain.at(1)); @@ -2391,7 +2404,7 @@ pub mod tests { let mut dummy_verifier = DummyVerifier::default(); dummy_verifier.error_when_verifying(b21.hash(), "simulated"); - let (_, _, _, _, sync) = create_sync(None, Some(dummy_verifier)); + let (_, _, _, _, _, sync) = create_sync(None, Some(dummy_verifier)); let mut sync = sync.lock(); @@ -2411,7 +2424,7 @@ pub mod tests { #[test] fn relay_new_block_when_in_saturated_state() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let genesis = test_data::genesis(); let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); let b1 = test_data::block_builder().header().parent(b0.hash()).build().build(); @@ -2455,7 +2468,7 @@ pub mod tests { #[test] fn relay_new_transaction_when_in_saturated_state() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let tx: Transaction = test_data::TransactionBuilder::with_output(20).into(); let tx_hash = tx.hash(); @@ -2471,7 +2484,7 @@ pub mod tests { #[test] fn relay_new_transaction_with_bloom_filter() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let tx1: Transaction = test_data::TransactionBuilder::with_output(10).into(); let tx2: Transaction = test_data::TransactionBuilder::with_output(20).into(); @@ -2526,7 +2539,7 @@ pub mod tests { #[test] fn relay_new_block_after_sendheaders() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let genesis = test_data::genesis(); let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); @@ -2552,7 +2565,7 @@ pub mod tests { #[test] fn relay_new_transaction_with_feefilter() { - let (_, _, executor, chain, sync) = create_sync(None, None); + let (_, _, executor, chain, _, sync) = create_sync(None, None); let b1 = test_data::block_builder().header().parent(test_data::genesis().hash()).build() .transaction().output().value(1_000_000).build().build() @@ -2602,7 +2615,7 @@ pub mod tests { #[test] fn receive_same_unknown_block_twice() { - let (_, _, _, _, sync) = create_sync(None, None); + let (_, _, _, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); @@ -2613,7 +2626,7 @@ pub mod tests { #[test] fn relay_new_block_after_sendcmpct() { - let (_, _, executor, _, sync) = create_sync(None, None); + let (_, _, executor, _, _, sync) = create_sync(None, None); let genesis = test_data::genesis(); let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); @@ -2670,7 +2683,7 @@ pub mod tests { let mut dummy_verifier = DummyVerifier::default(); dummy_verifier.error_when_verifying(b0.hash(), "simulated"); - let (_, _, executor, _, sync) = create_sync(None, Some(dummy_verifier)); + let (_, _, executor, _, _, sync) = create_sync(None, Some(dummy_verifier)); sync.lock().on_peer_block(0, b0.into()); let tasks = executor.lock().take_tasks(); @@ -2684,7 +2697,7 @@ pub mod tests { let b1 = test_data::block_builder().header().parent(b0.hash()).build().build(); let b2 = test_data::block_builder().header().parent(b1.hash()).build().build(); - let (_, _, executor, chain, sync) = create_sync(None, None); + let (_, _, executor, chain, _, sync) = create_sync(None, None); { chain.write().mark_dead_end_block(&b0.hash()); } @@ -2701,7 +2714,7 @@ pub mod tests { let b1 = test_data::block_builder().header().parent(b0.hash()).build().build(); let b2 = test_data::block_builder().header().parent(b1.hash()).build().build(); - let (_, _, executor, chain, sync) = create_sync(None, None); + let (_, _, executor, chain, _, sync) = create_sync(None, None); { chain.write().mark_dead_end_block(&b1.hash()); } @@ -2716,7 +2729,7 @@ pub mod tests { let genesis = test_data::genesis(); let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); - let (_, _, executor, chain, sync) = create_sync(None, None); + let (_, _, executor, chain, _, sync) = create_sync(None, None); { chain.write().mark_dead_end_block(&b0.hash()); } @@ -2732,7 +2745,7 @@ pub mod tests { let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); let b1 = test_data::block_builder().header().parent(b0.hash()).build().build(); - let (_, _, executor, chain, sync) = create_sync(None, None); + let (_, _, executor, chain, _, sync) = create_sync(None, None); { chain.write().mark_dead_end_block(&b0.hash()); } @@ -2741,4 +2754,39 @@ pub mod tests { let tasks = executor.lock().take_tasks(); assert_eq!(tasks, vec![Task::Close(0)]); } + + #[test] + fn when_peer_does_not_respond_to_block_requests() { + let genesis = test_data::genesis(); + let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); // block we will stuck with + let b1 = test_data::block_builder().header().parent(genesis.hash()).build().build(); // another branch + let b2 = test_data::block_builder().header().parent(b1.hash()).build().build(); + + let (_, _, executor, _, core, sync) = create_sync(None, None); + let mut sync = sync.lock(); + + // when peer1 announces 'false' b0 + sync.on_new_blocks_headers(1, vec![b0.block_header.clone()]); + // and peer2 announces 'true' b1 + sync.on_new_blocks_headers(2, vec![b1.block_header.clone(), b2.block_header.clone()]); + + // check that all blocks are requested + assert_eq!(sync.information().chain.requested, 3); + + // forget tasks + { executor.lock().take_tasks(); } + + // and then peer2 responds with with b1 while b0 is still left in queue + sync.on_peer_block(2, b1.into()); + + // now simulate some time has passed && number of b0 failures is @max level + { + let mut core = core.lock(); + core.forget_failed_blocks(vec![b0.hash()]); + core.execute_synchronization_tasks(None); + } + + // check that only one block (b2) is requested + assert_eq!(sync.information().chain.requested, 1); + } } diff --git a/sync/src/synchronization_manager.rs b/sync/src/synchronization_manager.rs index e876955a..42f909d6 100644 --- a/sync/src/synchronization_manager.rs +++ b/sync/src/synchronization_manager.rs @@ -72,8 +72,9 @@ impl Default for ManageOrphanTransactionsConfig { } /// Manage stalled synchronization peers blocks tasks -pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &mut Peers) -> Option> { +pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &mut Peers) -> (Vec, Vec) { let mut blocks_to_request: Vec = Vec::new(); + let mut blocks_to_forget: Vec = Vec::new(); let now = precise_time_s(); // reset tasks for peers, which has not responded during given period @@ -86,8 +87,12 @@ pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &m // decrease score && move to the idle queue warn!(target: "sync", "Failed to get requested block from peer#{} in {} seconds", worst_peer_index, time_diff); - let peer_tasks = peers.reset_blocks_tasks(worst_peer_index); - blocks_to_request.extend(peer_tasks); + let failed_blocks = peers.reset_blocks_tasks(worst_peer_index); + + // mark blocks as failed + let (normal_blocks, failed_blocks) = peers.on_blocks_failure(failed_blocks); + blocks_to_request.extend(normal_blocks); + blocks_to_forget.extend(failed_blocks); // if peer failed many times => forget it if peers.on_peer_block_failure(worst_peer_index) { @@ -95,7 +100,7 @@ pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &m } } - if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) } + (blocks_to_request, blocks_to_forget) } /// Manage stalled synchronization peers inventory tasks @@ -198,7 +203,7 @@ mod tests { let mut peers = Peers::new(); peers.on_blocks_requested(1, &vec![H256::from(0), H256::from(1)]); peers.on_block_received(1, &H256::from(0)); - assert_eq!(manage_synchronization_peers_blocks(&config, &mut peers), None); + assert_eq!(manage_synchronization_peers_blocks(&config, &mut peers), (vec![], vec![])); assert_eq!(peers.idle_peers_for_blocks(), vec![]); } @@ -212,7 +217,7 @@ mod tests { peers.on_blocks_requested(2, &vec![H256::from(1)]); sleep(Duration::from_millis(1)); - let managed_tasks = manage_synchronization_peers_blocks(&config, &mut peers).expect("managed tasks"); + let managed_tasks = manage_synchronization_peers_blocks(&config, &mut peers).0; assert!(managed_tasks.contains(&H256::from(0))); assert!(managed_tasks.contains(&H256::from(1))); let idle_peers = peers.idle_peers_for_blocks(); diff --git a/sync/src/synchronization_peers.rs b/sync/src/synchronization_peers.rs index 47759385..b3308eb0 100644 --- a/sync/src/synchronization_peers.rs +++ b/sync/src/synchronization_peers.rs @@ -8,6 +8,8 @@ use synchronization_client::BlockAnnouncementType; /// Max peer failures # before excluding from sync process const MAX_PEER_FAILURES: usize = 2; +/// Max blocks failures # before forgetiing this block and restarting sync +const MAX_BLOCKS_FAILURES: usize = 6; /// Set of peers selected for synchronization. #[derive(Debug)] @@ -18,6 +20,8 @@ pub struct Peers { unuseful: HashSet, /// # of failures for given peer. failures: HashMap, + /// # of failures for given block. + blocks_failures: HashMap, /// Peers that are marked as useful for current synchronization session && have pending blocks requests. blocks_requests: HashMap>, /// Last block message time from peer. @@ -50,6 +54,7 @@ impl Peers { idle: HashSet::new(), unuseful: HashSet::new(), failures: HashMap::new(), + blocks_failures: HashMap::new(), blocks_requests: HashMap::new(), blocks_requests_order: LinkedHashMap::new(), inventory_requests: HashSet::new(), @@ -222,6 +227,9 @@ impl Peers { /// Block is received from peer. pub fn on_block_received(&mut self, peer_index: usize, block_hash: &H256) { + // forget block failures + self.blocks_failures.remove(block_hash); + // if this is requested block && it is last requested block => remove from blocks_requests let try_mark_as_idle = match self.blocks_requests.entry(peer_index) { Entry::Occupied(mut requests_entry) => { @@ -287,6 +295,31 @@ impl Peers { // 2) if it has no new blocks => either synchronization is completed, or it is behind us in sync } + /// We have failed to get blocks + pub fn on_blocks_failure(&mut self, hashes: Vec) -> (Vec, Vec) { + let mut failed_blocks: Vec = Vec::new(); + let mut normal_blocks: Vec = Vec::with_capacity(hashes.len()); + for hash in hashes { + match self.blocks_failures.entry(hash.clone()) { + Entry::Vacant(entry) => { + normal_blocks.push(hash); + entry.insert(0); + }, + Entry::Occupied(mut entry) => { + *entry.get_mut() += 1; + if *entry.get() >= MAX_BLOCKS_FAILURES { + entry.remove(); + failed_blocks.push(hash); + } else { + normal_blocks.push(hash); + } + } + } + } + + (normal_blocks, failed_blocks) + } + /// We have failed to get block from peer during given period pub fn on_peer_block_failure(&mut self, peer_index: usize) -> bool { let peer_failures = match self.failures.entry(peer_index) { @@ -353,7 +386,7 @@ impl Peers { #[cfg(test)] mod tests { - use super::{Peers, MAX_PEER_FAILURES}; + use super::{Peers, MAX_PEER_FAILURES, MAX_BLOCKS_FAILURES}; use primitives::hash::H256; #[test] @@ -530,4 +563,22 @@ mod tests { peers.useful_peer(1); assert_eq!(peers.information().active + peers.information().idle + peers.information().unuseful, 1); } + + #[test] + fn peer_block_failures() { + let mut peers = Peers::new(); + peers.useful_peer(1); + peers.on_blocks_requested(1, &vec![H256::from(1)]); + for _ in 0..MAX_BLOCKS_FAILURES { + let requested_blocks = peers.reset_blocks_tasks(1); + let (blocks_to_request, blocks_to_forget) = peers.on_blocks_failure(requested_blocks); + assert_eq!(blocks_to_request, vec![H256::from(1)]); + assert_eq!(blocks_to_forget, vec![]); + peers.on_blocks_requested(1, &vec![H256::from(1)]); + } + let requested_blocks = peers.reset_blocks_tasks(1); + let (blocks_to_request, blocks_to_forget) = peers.on_blocks_failure(requested_blocks); + assert_eq!(blocks_to_request, vec![]); + assert_eq!(blocks_to_forget, vec![H256::from(1)]); + } }