Merge pull request #70 from ethcore/verification-p

Continuous verification of block transactions
This commit is contained in:
Svyatoslav Nikolsky 2016-10-31 23:42:43 +03:00 committed by GitHub
commit 7e160715ea
3 changed files with 220 additions and 19 deletions

View File

@ -4,7 +4,7 @@ use std::sync::Arc;
use db::{self, BlockRef};
use chain::{self, RepresentH256};
use super::{Verify, VerificationResult, Chain, Error, TransactionError};
use super::{Verify, VerificationResult, Chain, Error, TransactionError, ContinueVerify};
use utils;
const BLOCK_MAX_FUTURE: i64 = 2 * 60 * 60; // 2 hours
@ -107,6 +107,25 @@ impl Verify for ChainVerifier {
}
}
impl ContinueVerify for ChainVerifier {
type State = usize;
fn continue_verify(&self, block: &chain::Block, state: usize) -> VerificationResult {
// verify transactions (except coinbase)
for (idx, transaction) in block.transactions().iter().skip(state-1).enumerate() {
try!(self.verify_transaction(block, transaction).map_err(|e| Error::Transaction(idx, e)));
}
let _parent = match self.store.block(BlockRef::Hash(block.header().previous_header_hash.clone())) {
Some(b) => b,
None => { return Ok(Chain::Orphan); }
};
Ok(Chain::Main)
}
}
#[cfg(test)]
mod tests {
@ -162,5 +181,4 @@ mod tests {
));
assert_eq!(should_be, verifier.verify(&b170));
}
}

View File

@ -86,3 +86,9 @@ pub type VerificationResult = Result<Chain, Error>;
pub trait Verify : Send + Sync {
fn verify(&self, block: &chain::Block) -> VerificationResult;
}
/// Trait for verifier that can be interrupted and continue from the specific point
pub trait ContinueVerify : Verify + Send + Sync {
type State;
fn continue_verify(&self, block: &chain::Block, state: Self::State) -> VerificationResult;
}

View File

@ -2,10 +2,10 @@
use chain::{Block, RepresentH256};
use primitives::hash::H256;
use super::{Chain, Verify, BlockStatus};
use super::{Chain, ContinueVerify, BlockStatus, Error as VerificationError, TransactionError};
use linked_hash_map::LinkedHashMap;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::collections::{HashSet, VecDeque};
const MAX_PENDING_PRESET: usize = 128;
@ -29,22 +29,85 @@ pub enum Error {
Duplicate,
}
pub enum ScheduleItem {
Block(Block),
Continued(Block, usize),
}
impl ScheduleItem {
fn block(self) -> Block {
match self {
ScheduleItem::Block(block) => block,
ScheduleItem::Continued(block, _) => block,
}
}
}
struct Schedule {
items: VecDeque<(H256, ScheduleItem)>,
set: HashSet<H256>,
}
impl Schedule {
pub fn new() -> Self {
Schedule { items: VecDeque::new(), set: HashSet::new() }
}
pub fn push_back(&mut self, hash: H256, item: ScheduleItem) {
self.items.push_back((hash.clone(), item));
self.set.insert(hash);
}
pub fn push_front(&mut self, hash: H256, item: ScheduleItem) {
self.items.push_front((hash.clone(), item));
self.set.insert(hash);
}
pub fn pop_front(&mut self) -> Option<(H256, ScheduleItem)> {
self.items.pop_front()
.and_then(|(h, b)| { self.set.remove(&h); Some((h, b)) })
}
pub fn contains(&self, hash: &H256) -> bool {
self.set.contains(hash)
}
pub fn len(&self) -> usize {
self.set.len()
}
pub fn front(&self) -> Option<&ScheduleItem> {
self.items.front().and_then(|&(_, ref item)| Some(item))
}
}
/// Verification queue
pub struct Queue {
verifier: Box<Verify>,
items: RwLock<LinkedHashMap<H256, Block>>,
verifier: Box<ContinueVerify<State=usize>>,
items: RwLock<Schedule>,
// todo: write lock on verified should continue until blocks are persisted in the database
// todo: OR journal verified transactions before they are persisted
// todo: OTHERWISE verification thread may behave suboptiomal, trying to verify the same block
// todo: over and over again until in finally gets inserted
verified: RwLock<LinkedHashMap<H256, VerifiedBlock>>,
invalid: RwLock<HashSet<H256>>,
processing: RwLock<HashSet<H256>>,
}
pub enum WorkStatus {
Continue,
Wait,
}
impl Queue {
/// New verification queue
pub fn new(verifier: Box<Verify>) -> Self {
pub fn new(verifier: Box<ContinueVerify<State=usize>>) -> Self {
Queue {
verifier: verifier,
items: RwLock::new(LinkedHashMap::new()),
items: RwLock::new(Schedule::new()),
verified: RwLock::new(LinkedHashMap::new()),
invalid: RwLock::new(HashSet::new()),
processing: RwLock::new(HashSet::new()),
@ -52,26 +115,47 @@ impl Queue {
}
/// Process one block in the queue
pub fn process(&self) {
let (hash, block) = {
pub fn process(&self) -> WorkStatus {
let (hash, item) = {
let mut processing = self.processing.write();
let mut items = self.items.write();
if let Some(&ScheduleItem::Continued(_, _)) = items.front() {
if !self.verified.read().is_empty() || !processing.is_empty() {
// stall here until earlier blocks are processed
return WorkStatus::Wait
}
}
match items.pop_front() {
Some((hash, block)) => {
Some((hash, item)) => {
processing.insert(hash.clone());
(hash, block)
(hash, item)
},
/// nothing to verify
None => { return; },
_ => { return WorkStatus::Wait; },
}
};
match self.verifier.verify(&block) {
match {
match item {
ScheduleItem::Block(ref block) => self.verifier.verify(block),
ScheduleItem::Continued(ref block, state) => self.verifier.continue_verify(block, state),
}
}
{
Ok(chain) => {
let mut verified = self.verified.write();
let mut processing = self.processing.write();
let mut verified = self.verified.write();
processing.remove(&hash);
verified.insert(hash, VerifiedBlock::new(chain, block));
verified.insert(hash, VerifiedBlock::new(chain, item.block()));
},
// todo: more generic incloncusive variant type for this match?
Err(VerificationError::Transaction(num, TransactionError::Inconclusive(_))) => {
let mut processing = self.processing.write();
let mut items = self.items.write();
processing.remove(&hash);
items.push_front(hash, ScheduleItem::Continued(item.block(), num));
},
Err(e) => {
println!("Verification failed: {:?}", e);
@ -82,6 +166,8 @@ impl Queue {
invalid.insert(hash);
}
}
WorkStatus::Continue
}
/// Query block status
@ -89,7 +175,7 @@ impl Queue {
if self.invalid.read().contains(hash) { BlockStatus::Invalid }
else if self.processing.read().contains(hash) { BlockStatus::Verifying }
else if self.verified.read().contains_key(hash) { BlockStatus::Valid }
else if self.items.read().contains_key(hash) { BlockStatus::Pending }
else if self.items.read().contains(hash) { BlockStatus::Pending }
else { BlockStatus::Absent }
}
@ -105,7 +191,7 @@ impl Queue {
let mut items = self.items.write();
if items.len() > self.max_pending() { return Err(Error::Full) }
items.insert(hash, block);
items.push_back(hash, ScheduleItem::Block(block));
Ok(())
}
@ -118,21 +204,52 @@ impl Queue {
#[cfg(test)]
mod tests {
use super::Queue;
use super::super::{BlockStatus, VerificationResult, Verify, Chain, Error as VerificationError};
use super::super::{BlockStatus, VerificationResult, Verify, ContinueVerify, Chain, Error as VerificationError, TransactionError};
use chain::{Block, RepresentH256};
use primitives::hash::H256;
use test_data;
use std::collections::HashMap;
struct FacileVerifier;
impl Verify for FacileVerifier {
fn verify(&self, _block: &Block) -> VerificationResult { Ok(Chain::Main) }
}
impl ContinueVerify for FacileVerifier {
type State = usize;
fn continue_verify(&self, _block: &Block, _state: usize) -> VerificationResult { Ok(Chain::Main) }
}
struct EvilVerifier;
impl Verify for EvilVerifier {
fn verify(&self, _block: &Block) -> VerificationResult { Err(VerificationError::Empty) }
}
impl ContinueVerify for EvilVerifier {
type State = usize;
fn continue_verify(&self, _block: &Block, _state: usize) -> VerificationResult { Ok(Chain::Main) }
}
struct HupVerifier {
hups: HashMap<H256, usize>,
}
impl Verify for HupVerifier {
fn verify(&self, block: &Block) -> VerificationResult {
if let Some(hup) = self.hups.get(&block.hash()) {
Err(VerificationError::Transaction(*hup, TransactionError::Inconclusive(H256::from(0))))
}
else {
Ok(Chain::Main)
}
}
}
impl ContinueVerify for HupVerifier {
type State = usize;
fn continue_verify(&self, _block: &Block, _state: usize) -> VerificationResult { Ok(Chain::Main) }
}
#[test]
fn new() {
let queue = Queue::new(Box::new(FacileVerifier));
@ -227,4 +344,64 @@ mod tests {
assert_eq!(queue.block_status(&hash), BlockStatus::Absent);
assert_eq!(h, hash);
}
#[test]
fn verification_stalls_on_unverifiable() {
let b1 = test_data::block_builder()
.header().build()
.build();
let b2 = test_data::block_builder()
.header().parent(b1.hash()).build()
.build();
let mut hup_verifier = HupVerifier { hups: HashMap::new() };
hup_verifier.hups.insert(b2.hash(), 5);
let queue = Queue::new(Box::new(hup_verifier));
queue.push(b1.clone()).unwrap();
queue.push(b2.clone()).unwrap();
queue.process();
assert_eq!(queue.block_status(&b1.hash()), BlockStatus::Valid);
queue.process();
assert_eq!(queue.block_status(&b2.hash()),
BlockStatus::Pending,
"Block #2 supposed to stay in the pending state, because it requires 'processing' and 'verified' lines to be empty to continue" );
}
#[test]
fn verification_continues_stalled_block() {
let b1 = test_data::block_builder()
.header().build()
.build();
let b2 = test_data::block_builder()
.header().parent(b1.hash()).build()
.build();
let mut hup_verifier = HupVerifier { hups: HashMap::new() };
hup_verifier.hups.insert(b2.hash(), 5);
let queue = Queue::new(Box::new(hup_verifier));
queue.push(b1.clone()).unwrap();
queue.push(b2.clone()).unwrap();
queue.process();
assert_eq!(queue.block_status(&b1.hash()), BlockStatus::Valid);
queue.process();
assert_eq!(queue.block_status(&b2.hash()),
BlockStatus::Pending,
"Block #2 supposed to stay in the pending state, because it requires 'processing' and 'verified' lines to be empty to continue" );
queue.pop_valid();
queue.process();
assert_eq!(queue.block_status(&b2.hash()),
BlockStatus::Valid,
"Block #2 supposed to achieve valid state, because it requires 'processing' and 'verified' lines to be empty, which are indeed empty" );
}
}