actually schedule verification tasks for decanonized transactions

This commit is contained in:
Svyatoslav Nikolsky 2016-11-30 14:29:19 +03:00
parent 3daad00dc3
commit 4c85a3c3b4
3 changed files with 46 additions and 27 deletions

View File

@ -5,7 +5,7 @@ use chain;
use db;
use network::Magic;
use orphan_blocks_pool::OrphanBlocksPool;
use synchronization_verifier::{Verifier, SyncVerifier, VerificationSink};
use synchronization_verifier::{Verifier, SyncVerifier, VerificationSink, VerificationTask};
use primitives::hash::H256;
use super::Error;
@ -75,10 +75,11 @@ impl BlocksWriterSink {
}
impl VerificationSink for BlocksWriterSink {
fn on_block_verification_success(&mut self, block: db::IndexedBlock) {
fn on_block_verification_success(&mut self, block: db::IndexedBlock) -> Option<Vec<VerificationTask>> {
if let Err(err) = self.storage.insert_indexed_block(&block) {
self.err = Some(Error::Database(err));
}
None
}
fn on_block_verification_error(&mut self, err: &str, _hash: &H256) {

View File

@ -24,7 +24,7 @@ use synchronization_server::ServerTaskIndex;
use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory,
manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS,
ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig};
use synchronization_verifier::{Verifier, VerificationSink};
use synchronization_verifier::{Verifier, VerificationSink, VerificationTask};
use compact_block_builder::build_compact_block;
use hash_queue::HashPosition;
use miner::transaction_fee_rate;
@ -776,7 +776,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor {
/// Process successful block verification
fn on_block_verification_success(&mut self, block: IndexedBlock) {
fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option<Vec<VerificationTask>> {
let hash = block.hash();
// insert block to the storage
match {
@ -810,14 +810,20 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
}
// deal with block transactions
let mut verification_tasks: Vec<VerificationTask> = Vec::with_capacity(insert_result.transactions_to_reverify.len());
for (hash, tx) in insert_result.transactions_to_reverify {
// TODO: transactions from this blocks will be relayed. Do we need this?
self.process_peer_transaction(None, hash, tx);
if let Some(tx_orphans) = self.process_peer_transaction(None, hash, tx) {
let tx_tasks = tx_orphans.into_iter().map(|(_, tx)| VerificationTask::VerifyTransaction(tx));
verification_tasks.extend(tx_tasks);
};
}
Some(verification_tasks)
},
Err(db::Error::Consistency(e)) => {
// process as verification error
self.on_block_verification_error(&format!("{:?}", db::Error::Consistency(e)), &hash);
None
},
Err(e) => {
// process as irrecoverable failure

View File

@ -1,4 +1,5 @@
use std::thread;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::mpsc::{channel, Sender, Receiver};
use parking_lot::Mutex;
@ -11,7 +12,7 @@ use db::{SharedStore, IndexedBlock};
/// Verification events sink
pub trait VerificationSink : Send + 'static {
/// When block verification has completed successfully.
fn on_block_verification_success(&mut self, block: IndexedBlock);
fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option<Vec<VerificationTask>>;
/// When block verification has failed.
fn on_block_verification_error(&mut self, err: &str, hash: &H256);
/// When transaction verification has completed successfully.
@ -21,7 +22,8 @@ pub trait VerificationSink : Send + 'static {
}
/// Verification thread tasks
enum VerificationTask {
#[derive(Debug)]
pub enum VerificationTask {
/// Verify single block
VerifyBlock(IndexedBlock),
/// Verify single transaction
@ -133,26 +135,33 @@ impl<T> Verifier for SyncVerifier<T> where T: VerificationSink {
/// Execute single verification task
fn execute_verification_task<T: VerificationSink>(sink: &Arc<Mutex<T>>, verifier: &ChainVerifier, task: VerificationTask) {
match task {
VerificationTask::VerifyBlock(block) => {
// verify block
match verifier.verify(&block) {
Ok(Chain::Main) | Ok(Chain::Side) => {
sink.lock().on_block_verification_success(block)
},
Ok(Chain::Orphan) => {
unreachable!("sync will never put orphaned blocks to verification queue");
},
Err(e) => {
sink.lock().on_block_verification_error(&format!("{:?}", e), &block.hash())
let mut tasks_queue: VecDeque<VerificationTask> = VecDeque::new();
tasks_queue.push_back(task);
while let Some(task) = tasks_queue.pop_front() {
match task {
VerificationTask::VerifyBlock(block) => {
// verify block
match verifier.verify(&block) {
Ok(Chain::Main) | Ok(Chain::Side) => {
if let Some(tasks) = sink.lock().on_block_verification_success(block) {
tasks_queue.extend(tasks);
}
},
Ok(Chain::Orphan) => {
unreachable!("sync will never put orphaned blocks to verification queue");
},
Err(e) => {
sink.lock().on_block_verification_error(&format!("{:?}", e), &block.hash())
}
}
}
},
VerificationTask::VerifyTransaction(transaction) => {
// TODO: add verification here
sink.lock().on_transaction_verification_error("unimplemented", &transaction.hash())
},
_ => unreachable!("must be checked by caller"),
},
VerificationTask::VerifyTransaction(transaction) => {
// TODO: add verification here
sink.lock().on_transaction_verification_error("unimplemented", &transaction.hash())
},
_ => unreachable!("must be checked by caller"),
}
}
}
@ -189,7 +198,10 @@ pub mod tests {
match self.sink {
Some(ref sink) => match self.errors.get(&block.hash()) {
Some(err) => sink.lock().on_block_verification_error(&err, &block.hash()),
None => sink.lock().on_block_verification_success(block),
None => {
sink.lock().on_block_verification_success(block);
()
},
},
None => panic!("call set_sink"),
}