final call for blocks

This commit is contained in:
Svyatoslav Nikolsky 2016-12-06 08:57:50 +03:00
parent f3be8450e6
commit f6ca4ca61c
1 changed files with 26 additions and 14 deletions

View File

@ -233,7 +233,7 @@ pub trait ClientCore : VerificationSink {
fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter); fn on_peer_feefilter(&mut self, peer_index: usize, message: &types::FeeFilter);
fn on_peer_disconnected(&mut self, peer_index: usize); fn on_peer_disconnected(&mut self, peer_index: usize);
fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>); fn after_peer_nearly_blocks_verified(&mut self, peer_index: usize, future: BoxFuture<(), ()>);
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>); fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>, final_blocks_requests: Option<Vec<H256>>);
fn try_switch_to_saturated_state(&mut self) -> bool; fn try_switch_to_saturated_state(&mut self) -> bool;
} }
@ -435,7 +435,7 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
{ {
let mut client = self.core.lock(); let mut client = self.core.lock();
if !client.try_switch_to_saturated_state() { if !client.try_switch_to_saturated_state() {
client.execute_synchronization_tasks(None); client.execute_synchronization_tasks(None, None);
} }
} }
} }
@ -686,7 +686,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
// now insert unknown blocks to the queue // now insert unknown blocks to the queue
self.process_new_blocks_headers(peer_index, blocks_hashes, blocks_headers); self.process_new_blocks_headers(peer_index, blocks_hashes, blocks_headers);
self.execute_synchronization_tasks(None); self.execute_synchronization_tasks(None, None);
} }
/// When peer has no blocks /// When peer has no blocks
@ -704,7 +704,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
self.peers.unuseful_peer(peer_index); self.peers.unuseful_peer(peer_index);
// if peer has had some blocks tasks, rerequest these blocks // if peer has had some blocks tasks, rerequest these blocks
self.execute_synchronization_tasks(Some(removed_tasks)); self.execute_synchronization_tasks(Some(removed_tasks), None);
} }
} }
@ -772,7 +772,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
if !self.peers.has_any_useful() { if !self.peers.has_any_useful() {
self.switch_to_saturated_state(); self.switch_to_saturated_state();
} else if peer_tasks.is_some() { } else if peer_tasks.is_some() {
self.execute_synchronization_tasks(peer_tasks); self.execute_synchronization_tasks(peer_tasks, None);
} }
} }
@ -795,7 +795,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
} }
/// Schedule new synchronization tasks, if any. /// Schedule new synchronization tasks, if any.
fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>) { fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option<Vec<H256>>, final_blocks_requests: Option<Vec<H256>>) {
let mut tasks: Vec<Task> = Vec::new(); let mut tasks: Vec<Task> = Vec::new();
// display information if processed many blocks || enough time has passed since sync start // display information if processed many blocks || enough time has passed since sync start
@ -815,6 +815,15 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
tasks.extend(forced_tasks); tasks.extend(forced_tasks);
} }
// if some blocks requests are marked as last [i.e. blocks are potentialy wrong] => ask peers anyway
if let Some(final_blocks_requests) = final_blocks_requests {
let useful_peers = self.peers.useful_peers();
if !useful_peers.is_empty() { // if empty => not a problem, just forget these blocks
let forced_tasks = self.prepare_blocks_requests_tasks(useful_peers, final_blocks_requests);
tasks.extend(forced_tasks);
}
}
let mut blocks_requests: Option<Vec<H256>> = None; let mut blocks_requests: Option<Vec<H256>> = None;
let blocks_idle_peers = self.peers.idle_peers_for_blocks(); let blocks_idle_peers = self.peers.idle_peers_for_blocks();
{ {
@ -971,7 +980,7 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
self.awake_waiting_threads(&hash); self.awake_waiting_threads(&hash);
// continue with synchronization // continue with synchronization
self.execute_synchronization_tasks(None); self.execute_synchronization_tasks(None, None);
// relay block to our peers // relay block to our peers
// TODO: // TODO:
@ -1038,7 +1047,7 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
self.awake_waiting_threads(hash); self.awake_waiting_threads(hash);
// start new tasks // start new tasks
self.execute_synchronization_tasks(None); self.execute_synchronization_tasks(None, None);
} }
/// Process successful transaction verification /// Process successful transaction verification
@ -1125,8 +1134,11 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
client.print_synchronization_information(); client.print_synchronization_information();
if client.state.is_synchronizing() || client.state.is_nearly_saturated() { if client.state.is_synchronizing() || client.state.is_nearly_saturated() {
let (blocks_to_request, blocks_to_forget) = manage_synchronization_peers_blocks(&peers_config, &mut client.peers); let (blocks_to_request, blocks_to_forget) = manage_synchronization_peers_blocks(&peers_config, &mut client.peers);
client.forget_failed_blocks(blocks_to_forget); // TODO: children of blocks_to_forget can be in blocks_to_request => these have to be removed client.forget_failed_blocks(&blocks_to_forget); // TODO: children of blocks_to_forget can be in blocks_to_request => these have to be removed
client.execute_synchronization_tasks(if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) }); client.execute_synchronization_tasks(
if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) },
if blocks_to_forget.is_empty() { None } else { Some(blocks_to_forget) },
);
manage_synchronization_peers_inventory(&peers_config, &mut client.peers); manage_synchronization_peers_inventory(&peers_config, &mut client.peers);
manage_orphaned_transactions(&orphan_config, &mut client.orphaned_transactions_pool); manage_orphaned_transactions(&orphan_config, &mut client.orphaned_transactions_pool);
@ -1478,14 +1490,14 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
Some(transactions) Some(transactions)
} }
fn forget_failed_blocks(&mut self, blocks_to_forget: Vec<H256>) { fn forget_failed_blocks(&mut self, blocks_to_forget: &[H256]) {
if blocks_to_forget.is_empty() { if blocks_to_forget.is_empty() {
return; return;
} }
let mut chain = self.chain.write(); let mut chain = self.chain.write();
for block_to_forget in blocks_to_forget { for block_to_forget in blocks_to_forget {
chain.forget_block_with_children(&block_to_forget); chain.forget_block_with_children(block_to_forget);
} }
} }
@ -2782,8 +2794,8 @@ pub mod tests {
// now simulate some time has passed && number of b0 failures is @max level // now simulate some time has passed && number of b0 failures is @max level
{ {
let mut core = core.lock(); let mut core = core.lock();
core.forget_failed_blocks(vec![b0.hash()]); core.forget_failed_blocks(&vec![b0.hash()]);
core.execute_synchronization_tasks(None); core.execute_synchronization_tasks(None, Some(vec![b0.hash()]));
} }
// check that only one block (b2) is requested // check that only one block (b2) is requested