From 10f7fe174c96518a1dc9003c73e5c42960125e62 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 31 Oct 2016 20:35:36 +0300 Subject: [PATCH] stalled verification until earlier blocks are not processed --- verification/src/chain_verifier.rs | 21 ++++- verification/src/lib.rs | 6 ++ verification/src/queue.rs | 124 +++++++++++++++++++++++++---- 3 files changed, 134 insertions(+), 17 deletions(-) diff --git a/verification/src/chain_verifier.rs b/verification/src/chain_verifier.rs index 8ea15955..b551e0e4 100644 --- a/verification/src/chain_verifier.rs +++ b/verification/src/chain_verifier.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use db::{self, BlockRef}; use chain::{self, RepresentH256}; -use super::{Verify, VerificationResult, Chain, Error, TransactionError}; +use super::{Verify, VerificationResult, Chain, Error, TransactionError, ContinueVerify}; use utils; const BLOCK_MAX_FUTURE: i64 = 2 * 60 * 60; // 2 hours @@ -107,6 +107,25 @@ impl Verify for ChainVerifier { } } +impl ContinueVerify for ChainVerifier { + type State = usize; + + fn continue_verify(&self, block: &chain::Block, state: usize) -> VerificationResult { + // verify transactions (except coinbase) + for (idx, transaction) in block.transactions().iter().skip(state).enumerate() { + try!(self.verify_transaction(block, transaction).map_err(|e| Error::Transaction(idx, e))); + } + + + let _parent = match self.store.block(BlockRef::Hash(block.header().previous_header_hash.clone())) { + Some(b) => b, + None => { return Ok(Chain::Orphan); } + }; + + Ok(Chain::Main) + } +} + #[cfg(test)] mod tests { diff --git a/verification/src/lib.rs b/verification/src/lib.rs index 3e6b708c..a641bc36 100644 --- a/verification/src/lib.rs +++ b/verification/src/lib.rs @@ -86,3 +86,9 @@ pub type VerificationResult = Result; pub trait Verify : Send + Sync { fn verify(&self, block: &chain::Block) -> VerificationResult; } + +/// Trait for verifier that can be interrupted and continue from the specific point +pub trait ContinueVerify : Verify + Send + Sync { + type State; + fn continue_verify(&self, block: &chain::Block, state: Self::State) -> VerificationResult; +} diff --git a/verification/src/queue.rs b/verification/src/queue.rs index 42a29f59..d7e8167f 100644 --- a/verification/src/queue.rs +++ b/verification/src/queue.rs @@ -2,10 +2,10 @@ use chain::{Block, RepresentH256}; use primitives::hash::H256; -use super::{Chain, Verify, BlockStatus}; +use super::{Chain, ContinueVerify, BlockStatus, Error as VerificationError, TransactionError}; use linked_hash_map::LinkedHashMap; use parking_lot::RwLock; -use std::collections::HashSet; +use std::collections::{HashSet, VecDeque}; const MAX_PENDING_PRESET: usize = 128; @@ -29,22 +29,81 @@ pub enum Error { Duplicate, } + +pub enum ScheduleItem { + Block(Block), + Continued(Block, usize), +} + +impl ScheduleItem { + fn block(self) -> Block { + match self { + ScheduleItem::Block(block) => block, + ScheduleItem::Continued(block, _) => block, + } + } +} + +struct Schedule { + items: VecDeque<(H256, ScheduleItem)>, + set: HashSet, +} + +impl Schedule { + pub fn new() -> Self { + Schedule { items: VecDeque::new(), set: HashSet::new() } + } + + pub fn push_back(&mut self, hash: H256, item: ScheduleItem) { + self.items.push_back((hash.clone(), item)); + self.set.insert(hash); + } + + pub fn push_front(&mut self, hash: H256, item: ScheduleItem) { + self.items.push_front((hash.clone(), item)); + self.set.insert(hash); + } + + pub fn pop_front(&mut self) -> Option<(H256, ScheduleItem)> { + self.items.pop_front() + .and_then(|(h, b)| { self.set.remove(&h); Some((h, b)) }) + } + + pub fn contains(&self, hash: &H256) -> bool { + self.set.contains(hash) + } + + pub fn len(&self) -> usize { + self.set.len() + } + + pub fn front(&self) -> Option<&ScheduleItem> { + self.items.front().and_then(|&(_, ref item)| Some(item)) + } +} + /// Verification queue pub struct Queue { - verifier: Box, - items: RwLock>, + verifier: Box>, + items: RwLock, + // todo: write lock on verified should continue until blocks are persisted in the database verified: RwLock>, invalid: RwLock>, processing: RwLock>, } +pub enum WorkStatus { + Continue, + Wait, +} + impl Queue { /// New verification queue - pub fn new(verifier: Box) -> Self { + pub fn new(verifier: Box>) -> Self { Queue { verifier: verifier, - items: RwLock::new(LinkedHashMap::new()), + items: RwLock::new(Schedule::new()), verified: RwLock::new(LinkedHashMap::new()), invalid: RwLock::new(HashSet::new()), processing: RwLock::new(HashSet::new()), @@ -52,26 +111,47 @@ impl Queue { } /// Process one block in the queue - pub fn process(&self) { - let (hash, block) = { + pub fn process(&self) -> WorkStatus { + let (hash, item) = { let mut processing = self.processing.write(); let mut items = self.items.write(); + + if let Some(&ScheduleItem::Continued(_, _)) = items.front() { + if !self.verified.read().is_empty() || !processing.is_empty() { + // stall here until earlier blocks are processed + return WorkStatus::Wait + } + } + match items.pop_front() { - Some((hash, block)) => { + Some((hash, item)) => { processing.insert(hash.clone()); - (hash, block) + (hash, item) }, /// nothing to verify - None => { return; }, + _ => { return WorkStatus::Wait; }, } }; - match self.verifier.verify(&block) { + match { + match item { + ScheduleItem::Block(ref block) => self.verifier.verify(block), + ScheduleItem::Continued(ref block, state) => self.verifier.continue_verify(block, state), + } + } + { Ok(chain) => { let mut verified = self.verified.write(); let mut processing = self.processing.write(); processing.remove(&hash); - verified.insert(hash, VerifiedBlock::new(chain, block)); + verified.insert(hash, VerifiedBlock::new(chain, item.block())); + }, + // todo: more generic incloncusive variant type for this match? + Err(VerificationError::Transaction(num, TransactionError::Inconclusive(_))) => { + let mut processing = self.processing.write(); + let mut items = self.items.write(); + processing.remove(&hash); + items.push_front(hash, ScheduleItem::Continued(item.block(), num)); }, Err(e) => { println!("Verification failed: {:?}", e); @@ -82,6 +162,8 @@ impl Queue { invalid.insert(hash); } } + + WorkStatus::Continue } /// Query block status @@ -89,7 +171,7 @@ impl Queue { if self.invalid.read().contains(hash) { BlockStatus::Invalid } else if self.processing.read().contains(hash) { BlockStatus::Verifying } else if self.verified.read().contains_key(hash) { BlockStatus::Valid } - else if self.items.read().contains_key(hash) { BlockStatus::Pending } + else if self.items.read().contains(hash) { BlockStatus::Pending } else { BlockStatus::Absent } } @@ -105,7 +187,7 @@ impl Queue { let mut items = self.items.write(); if items.len() > self.max_pending() { return Err(Error::Full) } - items.insert(hash, block); + items.push_back(hash, ScheduleItem::Block(block)); Ok(()) } @@ -118,7 +200,7 @@ impl Queue { #[cfg(test)] mod tests { use super::Queue; - use super::super::{BlockStatus, VerificationResult, Verify, Chain, Error as VerificationError}; + use super::super::{BlockStatus, VerificationResult, Verify, ContinueVerify, Chain, Error as VerificationError}; use chain::{Block, RepresentH256}; use primitives::hash::H256; use test_data; @@ -128,11 +210,21 @@ mod tests { fn verify(&self, _block: &Block) -> VerificationResult { Ok(Chain::Main) } } + impl ContinueVerify for FacileVerifier { + type State = usize; + fn continue_verify(&self, _block: &Block, _state: usize) -> VerificationResult { Ok(Chain::Main) } + } + struct EvilVerifier; impl Verify for EvilVerifier { fn verify(&self, _block: &Block) -> VerificationResult { Err(VerificationError::Empty) } } + impl ContinueVerify for EvilVerifier { + type State = usize; + fn continue_verify(&self, _block: &Block, _state: usize) -> VerificationResult { Ok(Chain::Main) } + } + #[test] fn new() { let queue = Queue::new(Box::new(FacileVerifier));