forget unanswered blocks
This commit is contained in:
parent
9288aed976
commit
f3be8450e6
|
@ -1124,8 +1124,9 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
let mut client = client.lock();
|
||||
client.print_synchronization_information();
|
||||
if client.state.is_synchronizing() || client.state.is_nearly_saturated() {
|
||||
let blocks_to_request = manage_synchronization_peers_blocks(&peers_config, &mut client.peers);
|
||||
client.execute_synchronization_tasks(blocks_to_request);
|
||||
let (blocks_to_request, blocks_to_forget) = manage_synchronization_peers_blocks(&peers_config, &mut client.peers);
|
||||
client.forget_failed_blocks(blocks_to_forget); // TODO: children of blocks_to_forget can be in blocks_to_request => these have to be removed
|
||||
client.execute_synchronization_tasks(if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) });
|
||||
|
||||
manage_synchronization_peers_inventory(&peers_config, &mut client.peers);
|
||||
manage_orphaned_transactions(&orphan_config, &mut client.orphaned_transactions_pool);
|
||||
|
@ -1477,6 +1478,17 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
|||
Some(transactions)
|
||||
}
|
||||
|
||||
fn forget_failed_blocks(&mut self, blocks_to_forget: Vec<H256>) {
|
||||
if blocks_to_forget.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut chain = self.chain.write();
|
||||
for block_to_forget in blocks_to_forget {
|
||||
chain.forget_block_with_children(&block_to_forget);
|
||||
}
|
||||
}
|
||||
|
||||
fn prepare_blocks_requests_tasks(&mut self, peers: Vec<usize>, mut hashes: Vec<H256>) -> Vec<Task> {
|
||||
use std::mem::swap;
|
||||
|
||||
|
@ -1687,13 +1699,14 @@ pub mod tests {
|
|||
use test_data;
|
||||
use db::{self, BlockHeaderProvider};
|
||||
use devtools::RandomTempPath;
|
||||
use synchronization_client::ClientCore;
|
||||
|
||||
fn create_disk_storage() -> db::SharedStore {
|
||||
let path = RandomTempPath::create_dir();
|
||||
Arc::new(db::Storage::new(path.as_path()).unwrap())
|
||||
}
|
||||
|
||||
fn create_sync(storage: Option<db::SharedStore>, verifier: Option<DummyVerifier>) -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, ChainRef, Arc<Mutex<SynchronizationClient<DummyTaskExecutor, DummyVerifier>>>) {
|
||||
fn create_sync(storage: Option<db::SharedStore>, verifier: Option<DummyVerifier>) -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, ChainRef, Arc<Mutex<SynchronizationClientCore<DummyTaskExecutor>>>, Arc<Mutex<SynchronizationClient<DummyTaskExecutor, DummyVerifier>>>) {
|
||||
let event_loop = event_loop();
|
||||
let handle = event_loop.handle();
|
||||
let storage = match storage {
|
||||
|
@ -1711,13 +1724,13 @@ pub mod tests {
|
|||
}
|
||||
let mut verifier = verifier.unwrap_or_default();
|
||||
verifier.set_sink(client_core.clone());
|
||||
let client = SynchronizationClient::new(client_core, verifier);
|
||||
(event_loop, handle, executor, chain, client)
|
||||
let client = SynchronizationClient::new(client_core.clone(), verifier);
|
||||
(event_loop, handle, executor, chain, client_core, client)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn synchronization_saturated_on_start() {
|
||||
let (_, _, _, _, sync) = create_sync(None, None);
|
||||
let (_, _, _, _, _, sync) = create_sync(None, None);
|
||||
let sync = sync.lock();
|
||||
let info = sync.information();
|
||||
assert!(!info.state.is_synchronizing());
|
||||
|
@ -1727,7 +1740,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn synchronization_in_order_block_path_nearly_saturated() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
|
||||
let mut sync = sync.lock();
|
||||
let block1: Block = test_data::block_h1();
|
||||
|
@ -1768,7 +1781,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn synchronization_out_of_order_block_path() {
|
||||
let (_, _, _, _, sync) = create_sync(None, None);
|
||||
let (_, _, _, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
sync.on_new_blocks_headers(5, vec![test_data::block_h1().block_header.clone(), test_data::block_h2().block_header.clone()]);
|
||||
|
@ -1788,7 +1801,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn synchronization_parallel_peers() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
|
||||
let block1: Block = test_data::block_h1();
|
||||
let block2: Block = test_data::block_h2();
|
||||
|
@ -1834,7 +1847,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn synchronization_reset_when_peer_is_disconnected() {
|
||||
let (_, _, _, _, sync) = create_sync(None, None);
|
||||
let (_, _, _, _, _, sync) = create_sync(None, None);
|
||||
|
||||
// request new blocks
|
||||
{
|
||||
|
@ -1853,7 +1866,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn synchronization_not_starting_when_receiving_known_blocks() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
// saturated => receive inventory with known blocks only
|
||||
sync.on_new_blocks_headers(1, vec![test_data::genesis().block_header]);
|
||||
|
@ -1866,7 +1879,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn synchronization_asks_for_inventory_after_saturating() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
let block = test_data::block_h1();
|
||||
sync.on_new_blocks_headers(1, vec![block.block_header.clone()]);
|
||||
|
@ -1887,7 +1900,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn synchronization_remembers_correct_block_headers_in_order() {
|
||||
let (_, _, executor, chain, sync) = create_sync(None, None);
|
||||
let (_, _, executor, chain, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
let b1 = test_data::block_h1();
|
||||
|
@ -1930,7 +1943,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn synchronization_remembers_correct_block_headers_out_of_order() {
|
||||
let (_, _, executor, chain, sync) = create_sync(None, None);
|
||||
let (_, _, executor, chain, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
let b1 = test_data::block_h1();
|
||||
|
@ -1973,7 +1986,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn synchronization_ignores_unknown_block_headers() {
|
||||
let (_, _, executor, chain, sync) = create_sync(None, None);
|
||||
let (_, _, executor, chain, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
let b169 = test_data::block_h169();
|
||||
|
@ -1993,7 +2006,7 @@ pub mod tests {
|
|||
let genesis = test_data::genesis();
|
||||
storage.insert_block(&genesis).expect("no db error");
|
||||
|
||||
let (_, _, executor, chain, sync) = create_sync(Some(storage), None);
|
||||
let (_, _, executor, chain, _, sync) = create_sync(Some(storage), None);
|
||||
let genesis_header = &genesis.block_header;
|
||||
let fork1 = test_data::build_n_empty_blocks_from(2, 100, &genesis_header);
|
||||
let fork2 = test_data::build_n_empty_blocks_from(3, 200, &genesis_header);
|
||||
|
@ -2051,7 +2064,7 @@ pub mod tests {
|
|||
let genesis = test_data::genesis();
|
||||
storage.insert_block(&genesis).expect("no db error");
|
||||
|
||||
let (_, _, executor, chain, sync) = create_sync(Some(storage), None);
|
||||
let (_, _, executor, chain, _, sync) = create_sync(Some(storage), None);
|
||||
let common_block = test_data::block_builder().header().parent(genesis.hash()).build().build();
|
||||
let fork1 = test_data::build_n_empty_blocks_from(2, 100, &common_block.block_header);
|
||||
let fork2 = test_data::build_n_empty_blocks_from(3, 200, &common_block.block_header);
|
||||
|
@ -2091,7 +2104,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn accept_out_of_order_blocks_when_saturated() {
|
||||
let (_, _, _, chain, sync) = create_sync(None, None);
|
||||
let (_, _, _, chain, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
sync.on_peer_block(1, test_data::block_h2().into());
|
||||
|
@ -2113,7 +2126,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn do_not_rerequest_unknown_block_in_inventory() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
sync.on_peer_block(1, test_data::block_h2().into());
|
||||
|
@ -2125,7 +2138,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn blocks_rerequested_on_peer_disconnect() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
|
||||
let block1: Block = test_data::block_h1();
|
||||
let block2: Block = test_data::block_h2();
|
||||
|
@ -2167,7 +2180,7 @@ pub mod tests {
|
|||
storage.insert_error(block.hash(), db::Error::Consistency(db::ConsistencyError::NoBestBlock));
|
||||
let best_genesis = storage.best_block().unwrap();
|
||||
|
||||
let (_, _, _, chain, sync) = create_sync(Some(Arc::new(storage)), None);
|
||||
let (_, _, _, chain, _, sync) = create_sync(Some(Arc::new(storage)), None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
sync.on_peer_block(1, block.into());
|
||||
|
@ -2177,7 +2190,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn peer_removed_from_sync_after_responding_with_requested_block_notfound() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
let b1 = test_data::block_h1();
|
||||
|
@ -2203,7 +2216,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn peer_not_removed_from_sync_after_responding_with_requested_block_notfound() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
let b1 = test_data::block_h1();
|
||||
|
@ -2229,7 +2242,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn transaction_is_not_requested_when_synchronizing() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
let b1 = test_data::block_h1();
|
||||
|
@ -2247,7 +2260,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn transaction_is_requested_when_not_synchronizing() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
sync.on_new_transactions_inventory(0, vec![H256::from(0)]);
|
||||
|
@ -2271,7 +2284,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn same_transaction_can_be_requested_twice() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
sync.on_new_transactions_inventory(0, vec![H256::from(0)]);
|
||||
|
@ -2291,7 +2304,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn known_transaction_is_not_requested() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
sync.on_new_transactions_inventory(0, vec![test_data::genesis().transactions[0].hash(), H256::from(0)]);
|
||||
|
@ -2300,7 +2313,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn transaction_is_not_accepted_when_synchronizing() {
|
||||
let (_, _, _, _, sync) = create_sync(None, None);
|
||||
let (_, _, _, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
let b1 = test_data::block_h1();
|
||||
|
@ -2316,7 +2329,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn transaction_is_accepted_when_not_synchronizing() {
|
||||
let (_, _, _, _, sync) = create_sync(None, None);
|
||||
let (_, _, _, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
sync.on_peer_transaction(1, test_data::TransactionBuilder::with_version(1).into());
|
||||
|
@ -2333,7 +2346,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn transaction_is_orphaned_when_input_is_unknown() {
|
||||
let (_, _, _, _, sync) = create_sync(None, None);
|
||||
let (_, _, _, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
sync.on_peer_transaction(1, test_data::TransactionBuilder::with_default_input(0).into());
|
||||
|
@ -2347,7 +2360,7 @@ pub mod tests {
|
|||
test_data::TransactionBuilder::with_output(10).store(chain) // t0
|
||||
.set_input(&chain.at(0), 0).set_output(20).store(chain); // t0 -> t1
|
||||
|
||||
let (_, _, _, _, sync) = create_sync(None, None);
|
||||
let (_, _, _, _, _, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
sync.on_peer_transaction(1, chain.at(1));
|
||||
|
@ -2391,7 +2404,7 @@ pub mod tests {
|
|||
let mut dummy_verifier = DummyVerifier::default();
|
||||
dummy_verifier.error_when_verifying(b21.hash(), "simulated");
|
||||
|
||||
let (_, _, _, _, sync) = create_sync(None, Some(dummy_verifier));
|
||||
let (_, _, _, _, _, sync) = create_sync(None, Some(dummy_verifier));
|
||||
|
||||
let mut sync = sync.lock();
|
||||
|
||||
|
@ -2411,7 +2424,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn relay_new_block_when_in_saturated_state() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
let genesis = test_data::genesis();
|
||||
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
|
||||
let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
|
||||
|
@ -2455,7 +2468,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn relay_new_transaction_when_in_saturated_state() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
|
||||
let tx: Transaction = test_data::TransactionBuilder::with_output(20).into();
|
||||
let tx_hash = tx.hash();
|
||||
|
@ -2471,7 +2484,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn relay_new_transaction_with_bloom_filter() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
|
||||
let tx1: Transaction = test_data::TransactionBuilder::with_output(10).into();
|
||||
let tx2: Transaction = test_data::TransactionBuilder::with_output(20).into();
|
||||
|
@ -2526,7 +2539,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn relay_new_block_after_sendheaders() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
let genesis = test_data::genesis();
|
||||
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
|
||||
|
||||
|
@ -2552,7 +2565,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn relay_new_transaction_with_feefilter() {
|
||||
let (_, _, executor, chain, sync) = create_sync(None, None);
|
||||
let (_, _, executor, chain, _, sync) = create_sync(None, None);
|
||||
|
||||
let b1 = test_data::block_builder().header().parent(test_data::genesis().hash()).build()
|
||||
.transaction().output().value(1_000_000).build().build()
|
||||
|
@ -2602,7 +2615,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn receive_same_unknown_block_twice() {
|
||||
let (_, _, _, _, sync) = create_sync(None, None);
|
||||
let (_, _, _, _, _, sync) = create_sync(None, None);
|
||||
|
||||
let mut sync = sync.lock();
|
||||
|
||||
|
@ -2613,7 +2626,7 @@ pub mod tests {
|
|||
|
||||
#[test]
|
||||
fn relay_new_block_after_sendcmpct() {
|
||||
let (_, _, executor, _, sync) = create_sync(None, None);
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, None);
|
||||
let genesis = test_data::genesis();
|
||||
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
|
||||
|
||||
|
@ -2670,7 +2683,7 @@ pub mod tests {
|
|||
let mut dummy_verifier = DummyVerifier::default();
|
||||
dummy_verifier.error_when_verifying(b0.hash(), "simulated");
|
||||
|
||||
let (_, _, executor, _, sync) = create_sync(None, Some(dummy_verifier));
|
||||
let (_, _, executor, _, _, sync) = create_sync(None, Some(dummy_verifier));
|
||||
sync.lock().on_peer_block(0, b0.into());
|
||||
|
||||
let tasks = executor.lock().take_tasks();
|
||||
|
@ -2684,7 +2697,7 @@ pub mod tests {
|
|||
let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
|
||||
let b2 = test_data::block_builder().header().parent(b1.hash()).build().build();
|
||||
|
||||
let (_, _, executor, chain, sync) = create_sync(None, None);
|
||||
let (_, _, executor, chain, _, sync) = create_sync(None, None);
|
||||
{
|
||||
chain.write().mark_dead_end_block(&b0.hash());
|
||||
}
|
||||
|
@ -2701,7 +2714,7 @@ pub mod tests {
|
|||
let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
|
||||
let b2 = test_data::block_builder().header().parent(b1.hash()).build().build();
|
||||
|
||||
let (_, _, executor, chain, sync) = create_sync(None, None);
|
||||
let (_, _, executor, chain, _, sync) = create_sync(None, None);
|
||||
{
|
||||
chain.write().mark_dead_end_block(&b1.hash());
|
||||
}
|
||||
|
@ -2716,7 +2729,7 @@ pub mod tests {
|
|||
let genesis = test_data::genesis();
|
||||
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
|
||||
|
||||
let (_, _, executor, chain, sync) = create_sync(None, None);
|
||||
let (_, _, executor, chain, _, sync) = create_sync(None, None);
|
||||
{
|
||||
chain.write().mark_dead_end_block(&b0.hash());
|
||||
}
|
||||
|
@ -2732,7 +2745,7 @@ pub mod tests {
|
|||
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build();
|
||||
let b1 = test_data::block_builder().header().parent(b0.hash()).build().build();
|
||||
|
||||
let (_, _, executor, chain, sync) = create_sync(None, None);
|
||||
let (_, _, executor, chain, _, sync) = create_sync(None, None);
|
||||
{
|
||||
chain.write().mark_dead_end_block(&b0.hash());
|
||||
}
|
||||
|
@ -2741,4 +2754,39 @@ pub mod tests {
|
|||
let tasks = executor.lock().take_tasks();
|
||||
assert_eq!(tasks, vec![Task::Close(0)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn when_peer_does_not_respond_to_block_requests() {
|
||||
let genesis = test_data::genesis();
|
||||
let b0 = test_data::block_builder().header().parent(genesis.hash()).build().build(); // block we will stuck with
|
||||
let b1 = test_data::block_builder().header().parent(genesis.hash()).build().build(); // another branch
|
||||
let b2 = test_data::block_builder().header().parent(b1.hash()).build().build();
|
||||
|
||||
let (_, _, executor, _, core, sync) = create_sync(None, None);
|
||||
let mut sync = sync.lock();
|
||||
|
||||
// when peer1 announces 'false' b0
|
||||
sync.on_new_blocks_headers(1, vec![b0.block_header.clone()]);
|
||||
// and peer2 announces 'true' b1
|
||||
sync.on_new_blocks_headers(2, vec![b1.block_header.clone(), b2.block_header.clone()]);
|
||||
|
||||
// check that all blocks are requested
|
||||
assert_eq!(sync.information().chain.requested, 3);
|
||||
|
||||
// forget tasks
|
||||
{ executor.lock().take_tasks(); }
|
||||
|
||||
// and then peer2 responds with with b1 while b0 is still left in queue
|
||||
sync.on_peer_block(2, b1.into());
|
||||
|
||||
// now simulate some time has passed && number of b0 failures is @max level
|
||||
{
|
||||
let mut core = core.lock();
|
||||
core.forget_failed_blocks(vec![b0.hash()]);
|
||||
core.execute_synchronization_tasks(None);
|
||||
}
|
||||
|
||||
// check that only one block (b2) is requested
|
||||
assert_eq!(sync.information().chain.requested, 1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,8 +72,9 @@ impl Default for ManageOrphanTransactionsConfig {
|
|||
}
|
||||
|
||||
/// Manage stalled synchronization peers blocks tasks
|
||||
pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &mut Peers) -> Option<Vec<H256>> {
|
||||
pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &mut Peers) -> (Vec<H256>, Vec<H256>) {
|
||||
let mut blocks_to_request: Vec<H256> = Vec::new();
|
||||
let mut blocks_to_forget: Vec<H256> = Vec::new();
|
||||
let now = precise_time_s();
|
||||
|
||||
// reset tasks for peers, which has not responded during given period
|
||||
|
@ -86,8 +87,12 @@ pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &m
|
|||
|
||||
// decrease score && move to the idle queue
|
||||
warn!(target: "sync", "Failed to get requested block from peer#{} in {} seconds", worst_peer_index, time_diff);
|
||||
let peer_tasks = peers.reset_blocks_tasks(worst_peer_index);
|
||||
blocks_to_request.extend(peer_tasks);
|
||||
let failed_blocks = peers.reset_blocks_tasks(worst_peer_index);
|
||||
|
||||
// mark blocks as failed
|
||||
let (normal_blocks, failed_blocks) = peers.on_blocks_failure(failed_blocks);
|
||||
blocks_to_request.extend(normal_blocks);
|
||||
blocks_to_forget.extend(failed_blocks);
|
||||
|
||||
// if peer failed many times => forget it
|
||||
if peers.on_peer_block_failure(worst_peer_index) {
|
||||
|
@ -95,7 +100,7 @@ pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &m
|
|||
}
|
||||
}
|
||||
|
||||
if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) }
|
||||
(blocks_to_request, blocks_to_forget)
|
||||
}
|
||||
|
||||
/// Manage stalled synchronization peers inventory tasks
|
||||
|
@ -198,7 +203,7 @@ mod tests {
|
|||
let mut peers = Peers::new();
|
||||
peers.on_blocks_requested(1, &vec![H256::from(0), H256::from(1)]);
|
||||
peers.on_block_received(1, &H256::from(0));
|
||||
assert_eq!(manage_synchronization_peers_blocks(&config, &mut peers), None);
|
||||
assert_eq!(manage_synchronization_peers_blocks(&config, &mut peers), (vec![], vec![]));
|
||||
assert_eq!(peers.idle_peers_for_blocks(), vec![]);
|
||||
}
|
||||
|
||||
|
@ -212,7 +217,7 @@ mod tests {
|
|||
peers.on_blocks_requested(2, &vec![H256::from(1)]);
|
||||
sleep(Duration::from_millis(1));
|
||||
|
||||
let managed_tasks = manage_synchronization_peers_blocks(&config, &mut peers).expect("managed tasks");
|
||||
let managed_tasks = manage_synchronization_peers_blocks(&config, &mut peers).0;
|
||||
assert!(managed_tasks.contains(&H256::from(0)));
|
||||
assert!(managed_tasks.contains(&H256::from(1)));
|
||||
let idle_peers = peers.idle_peers_for_blocks();
|
||||
|
|
|
@ -8,6 +8,8 @@ use synchronization_client::BlockAnnouncementType;
|
|||
|
||||
/// Max peer failures # before excluding from sync process
|
||||
const MAX_PEER_FAILURES: usize = 2;
|
||||
/// Max blocks failures # before forgetiing this block and restarting sync
|
||||
const MAX_BLOCKS_FAILURES: usize = 6;
|
||||
|
||||
/// Set of peers selected for synchronization.
|
||||
#[derive(Debug)]
|
||||
|
@ -18,6 +20,8 @@ pub struct Peers {
|
|||
unuseful: HashSet<usize>,
|
||||
/// # of failures for given peer.
|
||||
failures: HashMap<usize, usize>,
|
||||
/// # of failures for given block.
|
||||
blocks_failures: HashMap<H256, usize>,
|
||||
/// Peers that are marked as useful for current synchronization session && have pending blocks requests.
|
||||
blocks_requests: HashMap<usize, HashSet<H256>>,
|
||||
/// Last block message time from peer.
|
||||
|
@ -50,6 +54,7 @@ impl Peers {
|
|||
idle: HashSet::new(),
|
||||
unuseful: HashSet::new(),
|
||||
failures: HashMap::new(),
|
||||
blocks_failures: HashMap::new(),
|
||||
blocks_requests: HashMap::new(),
|
||||
blocks_requests_order: LinkedHashMap::new(),
|
||||
inventory_requests: HashSet::new(),
|
||||
|
@ -222,6 +227,9 @@ impl Peers {
|
|||
|
||||
/// Block is received from peer.
|
||||
pub fn on_block_received(&mut self, peer_index: usize, block_hash: &H256) {
|
||||
// forget block failures
|
||||
self.blocks_failures.remove(block_hash);
|
||||
|
||||
// if this is requested block && it is last requested block => remove from blocks_requests
|
||||
let try_mark_as_idle = match self.blocks_requests.entry(peer_index) {
|
||||
Entry::Occupied(mut requests_entry) => {
|
||||
|
@ -287,6 +295,31 @@ impl Peers {
|
|||
// 2) if it has no new blocks => either synchronization is completed, or it is behind us in sync
|
||||
}
|
||||
|
||||
/// We have failed to get blocks
|
||||
pub fn on_blocks_failure(&mut self, hashes: Vec<H256>) -> (Vec<H256>, Vec<H256>) {
|
||||
let mut failed_blocks: Vec<H256> = Vec::new();
|
||||
let mut normal_blocks: Vec<H256> = Vec::with_capacity(hashes.len());
|
||||
for hash in hashes {
|
||||
match self.blocks_failures.entry(hash.clone()) {
|
||||
Entry::Vacant(entry) => {
|
||||
normal_blocks.push(hash);
|
||||
entry.insert(0);
|
||||
},
|
||||
Entry::Occupied(mut entry) => {
|
||||
*entry.get_mut() += 1;
|
||||
if *entry.get() >= MAX_BLOCKS_FAILURES {
|
||||
entry.remove();
|
||||
failed_blocks.push(hash);
|
||||
} else {
|
||||
normal_blocks.push(hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(normal_blocks, failed_blocks)
|
||||
}
|
||||
|
||||
/// We have failed to get block from peer during given period
|
||||
pub fn on_peer_block_failure(&mut self, peer_index: usize) -> bool {
|
||||
let peer_failures = match self.failures.entry(peer_index) {
|
||||
|
@ -353,7 +386,7 @@ impl Peers {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{Peers, MAX_PEER_FAILURES};
|
||||
use super::{Peers, MAX_PEER_FAILURES, MAX_BLOCKS_FAILURES};
|
||||
use primitives::hash::H256;
|
||||
|
||||
#[test]
|
||||
|
@ -530,4 +563,22 @@ mod tests {
|
|||
peers.useful_peer(1);
|
||||
assert_eq!(peers.information().active + peers.information().idle + peers.information().unuseful, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_block_failures() {
|
||||
let mut peers = Peers::new();
|
||||
peers.useful_peer(1);
|
||||
peers.on_blocks_requested(1, &vec![H256::from(1)]);
|
||||
for _ in 0..MAX_BLOCKS_FAILURES {
|
||||
let requested_blocks = peers.reset_blocks_tasks(1);
|
||||
let (blocks_to_request, blocks_to_forget) = peers.on_blocks_failure(requested_blocks);
|
||||
assert_eq!(blocks_to_request, vec![H256::from(1)]);
|
||||
assert_eq!(blocks_to_forget, vec![]);
|
||||
peers.on_blocks_requested(1, &vec![H256::from(1)]);
|
||||
}
|
||||
let requested_blocks = peers.reset_blocks_tasks(1);
|
||||
let (blocks_to_request, blocks_to_forget) = peers.on_blocks_failure(requested_blocks);
|
||||
assert_eq!(blocks_to_request, vec![]);
|
||||
assert_eq!(blocks_to_forget, vec![H256::from(1)]);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue