diff --git a/sync/src/blocks_writer.rs b/sync/src/blocks_writer.rs index 32807ffd..bb618b2a 100644 --- a/sync/src/blocks_writer.rs +++ b/sync/src/blocks_writer.rs @@ -5,7 +5,7 @@ use chain; use db; use network::Magic; use orphan_blocks_pool::OrphanBlocksPool; -use synchronization_verifier::{Verifier, SyncVerifier, VerificationSink}; +use synchronization_verifier::{Verifier, SyncVerifier, VerificationSink, VerificationTask}; use primitives::hash::H256; use super::Error; @@ -75,10 +75,11 @@ impl BlocksWriterSink { } impl VerificationSink for BlocksWriterSink { - fn on_block_verification_success(&mut self, block: db::IndexedBlock) { + fn on_block_verification_success(&mut self, block: db::IndexedBlock) -> Option> { if let Err(err) = self.storage.insert_indexed_block(&block) { self.err = Some(Error::Database(err)); } + None } fn on_block_verification_error(&mut self, err: &str, _hash: &H256) { diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 4721fb06..f730b082 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -24,7 +24,7 @@ use synchronization_server::ServerTaskIndex; use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory, manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS, ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig}; -use synchronization_verifier::{Verifier, VerificationSink}; +use synchronization_verifier::{Verifier, VerificationSink, VerificationTask}; use compact_block_builder::build_compact_block; use hash_queue::HashPosition; use miner::transaction_fee_rate; @@ -776,7 +776,7 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { impl VerificationSink for SynchronizationClientCore where T: TaskExecutor { /// Process successful block verification - fn on_block_verification_success(&mut self, block: IndexedBlock) { + fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option> { let hash = block.hash(); // insert block to the storage match { @@ -810,14 +810,20 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor } // deal with block transactions + let mut verification_tasks: Vec = Vec::with_capacity(insert_result.transactions_to_reverify.len()); for (hash, tx) in insert_result.transactions_to_reverify { // TODO: transactions from this blocks will be relayed. Do we need this? - self.process_peer_transaction(None, hash, tx); + if let Some(tx_orphans) = self.process_peer_transaction(None, hash, tx) { + let tx_tasks = tx_orphans.into_iter().map(|(_, tx)| VerificationTask::VerifyTransaction(tx)); + verification_tasks.extend(tx_tasks); + }; } + Some(verification_tasks) }, Err(db::Error::Consistency(e)) => { // process as verification error self.on_block_verification_error(&format!("{:?}", db::Error::Consistency(e)), &hash); + None }, Err(e) => { // process as irrecoverable failure diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs index 21ead87e..2be61676 100644 --- a/sync/src/synchronization_verifier.rs +++ b/sync/src/synchronization_verifier.rs @@ -1,4 +1,5 @@ use std::thread; +use std::collections::VecDeque; use std::sync::Arc; use std::sync::mpsc::{channel, Sender, Receiver}; use parking_lot::Mutex; @@ -11,7 +12,7 @@ use db::{SharedStore, IndexedBlock}; /// Verification events sink pub trait VerificationSink : Send + 'static { /// When block verification has completed successfully. - fn on_block_verification_success(&mut self, block: IndexedBlock); + fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option>; /// When block verification has failed. fn on_block_verification_error(&mut self, err: &str, hash: &H256); /// When transaction verification has completed successfully. @@ -21,7 +22,8 @@ pub trait VerificationSink : Send + 'static { } /// Verification thread tasks -enum VerificationTask { +#[derive(Debug)] +pub enum VerificationTask { /// Verify single block VerifyBlock(IndexedBlock), /// Verify single transaction @@ -133,26 +135,33 @@ impl Verifier for SyncVerifier where T: VerificationSink { /// Execute single verification task fn execute_verification_task(sink: &Arc>, verifier: &ChainVerifier, task: VerificationTask) { - match task { - VerificationTask::VerifyBlock(block) => { - // verify block - match verifier.verify(&block) { - Ok(Chain::Main) | Ok(Chain::Side) => { - sink.lock().on_block_verification_success(block) - }, - Ok(Chain::Orphan) => { - unreachable!("sync will never put orphaned blocks to verification queue"); - }, - Err(e) => { - sink.lock().on_block_verification_error(&format!("{:?}", e), &block.hash()) + let mut tasks_queue: VecDeque = VecDeque::new(); + tasks_queue.push_back(task); + + while let Some(task) = tasks_queue.pop_front() { + match task { + VerificationTask::VerifyBlock(block) => { + // verify block + match verifier.verify(&block) { + Ok(Chain::Main) | Ok(Chain::Side) => { + if let Some(tasks) = sink.lock().on_block_verification_success(block) { + tasks_queue.extend(tasks); + } + }, + Ok(Chain::Orphan) => { + unreachable!("sync will never put orphaned blocks to verification queue"); + }, + Err(e) => { + sink.lock().on_block_verification_error(&format!("{:?}", e), &block.hash()) + } } - } - }, - VerificationTask::VerifyTransaction(transaction) => { - // TODO: add verification here - sink.lock().on_transaction_verification_error("unimplemented", &transaction.hash()) - }, - _ => unreachable!("must be checked by caller"), + }, + VerificationTask::VerifyTransaction(transaction) => { + // TODO: add verification here + sink.lock().on_transaction_verification_error("unimplemented", &transaction.hash()) + }, + _ => unreachable!("must be checked by caller"), + } } } @@ -189,7 +198,10 @@ pub mod tests { match self.sink { Some(ref sink) => match self.errors.get(&block.hash()) { Some(err) => sink.lock().on_block_verification_error(&err, &block.hash()), - None => sink.lock().on_block_verification_success(block), + None => { + sink.lock().on_block_verification_success(block); + () + }, }, None => panic!("call set_sink"), }