stalled verification until earlier blocks are not processed
This commit is contained in:
parent
5a20b9fe36
commit
10f7fe174c
|
@ -4,7 +4,7 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use db::{self, BlockRef};
|
use db::{self, BlockRef};
|
||||||
use chain::{self, RepresentH256};
|
use chain::{self, RepresentH256};
|
||||||
use super::{Verify, VerificationResult, Chain, Error, TransactionError};
|
use super::{Verify, VerificationResult, Chain, Error, TransactionError, ContinueVerify};
|
||||||
use utils;
|
use utils;
|
||||||
|
|
||||||
const BLOCK_MAX_FUTURE: i64 = 2 * 60 * 60; // 2 hours
|
const BLOCK_MAX_FUTURE: i64 = 2 * 60 * 60; // 2 hours
|
||||||
|
@ -107,6 +107,25 @@ impl Verify for ChainVerifier {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ContinueVerify for ChainVerifier {
|
||||||
|
type State = usize;
|
||||||
|
|
||||||
|
fn continue_verify(&self, block: &chain::Block, state: usize) -> VerificationResult {
|
||||||
|
// verify transactions (except coinbase)
|
||||||
|
for (idx, transaction) in block.transactions().iter().skip(state).enumerate() {
|
||||||
|
try!(self.verify_transaction(block, transaction).map_err(|e| Error::Transaction(idx, e)));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
let _parent = match self.store.block(BlockRef::Hash(block.header().previous_header_hash.clone())) {
|
||||||
|
Some(b) => b,
|
||||||
|
None => { return Ok(Chain::Orphan); }
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Chain::Main)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
|
||||||
|
|
|
@ -86,3 +86,9 @@ pub type VerificationResult = Result<Chain, Error>;
|
||||||
pub trait Verify : Send + Sync {
|
pub trait Verify : Send + Sync {
|
||||||
fn verify(&self, block: &chain::Block) -> VerificationResult;
|
fn verify(&self, block: &chain::Block) -> VerificationResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Trait for verifier that can be interrupted and continue from the specific point
|
||||||
|
pub trait ContinueVerify : Verify + Send + Sync {
|
||||||
|
type State;
|
||||||
|
fn continue_verify(&self, block: &chain::Block, state: Self::State) -> VerificationResult;
|
||||||
|
}
|
||||||
|
|
|
@ -2,10 +2,10 @@
|
||||||
|
|
||||||
use chain::{Block, RepresentH256};
|
use chain::{Block, RepresentH256};
|
||||||
use primitives::hash::H256;
|
use primitives::hash::H256;
|
||||||
use super::{Chain, Verify, BlockStatus};
|
use super::{Chain, ContinueVerify, BlockStatus, Error as VerificationError, TransactionError};
|
||||||
use linked_hash_map::LinkedHashMap;
|
use linked_hash_map::LinkedHashMap;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use std::collections::HashSet;
|
use std::collections::{HashSet, VecDeque};
|
||||||
|
|
||||||
const MAX_PENDING_PRESET: usize = 128;
|
const MAX_PENDING_PRESET: usize = 128;
|
||||||
|
|
||||||
|
@ -29,22 +29,81 @@ pub enum Error {
|
||||||
Duplicate,
|
Duplicate,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub enum ScheduleItem {
|
||||||
|
Block(Block),
|
||||||
|
Continued(Block, usize),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ScheduleItem {
|
||||||
|
fn block(self) -> Block {
|
||||||
|
match self {
|
||||||
|
ScheduleItem::Block(block) => block,
|
||||||
|
ScheduleItem::Continued(block, _) => block,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Schedule {
|
||||||
|
items: VecDeque<(H256, ScheduleItem)>,
|
||||||
|
set: HashSet<H256>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Schedule {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Schedule { items: VecDeque::new(), set: HashSet::new() }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn push_back(&mut self, hash: H256, item: ScheduleItem) {
|
||||||
|
self.items.push_back((hash.clone(), item));
|
||||||
|
self.set.insert(hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn push_front(&mut self, hash: H256, item: ScheduleItem) {
|
||||||
|
self.items.push_front((hash.clone(), item));
|
||||||
|
self.set.insert(hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn pop_front(&mut self) -> Option<(H256, ScheduleItem)> {
|
||||||
|
self.items.pop_front()
|
||||||
|
.and_then(|(h, b)| { self.set.remove(&h); Some((h, b)) })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn contains(&self, hash: &H256) -> bool {
|
||||||
|
self.set.contains(hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.set.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn front(&self) -> Option<&ScheduleItem> {
|
||||||
|
self.items.front().and_then(|&(_, ref item)| Some(item))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Verification queue
|
/// Verification queue
|
||||||
pub struct Queue {
|
pub struct Queue {
|
||||||
verifier: Box<Verify>,
|
verifier: Box<ContinueVerify<State=usize>>,
|
||||||
items: RwLock<LinkedHashMap<H256, Block>>,
|
items: RwLock<Schedule>,
|
||||||
|
// todo: write lock on verified should continue until blocks are persisted in the database
|
||||||
verified: RwLock<LinkedHashMap<H256, VerifiedBlock>>,
|
verified: RwLock<LinkedHashMap<H256, VerifiedBlock>>,
|
||||||
invalid: RwLock<HashSet<H256>>,
|
invalid: RwLock<HashSet<H256>>,
|
||||||
processing: RwLock<HashSet<H256>>,
|
processing: RwLock<HashSet<H256>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum WorkStatus {
|
||||||
|
Continue,
|
||||||
|
Wait,
|
||||||
|
}
|
||||||
|
|
||||||
impl Queue {
|
impl Queue {
|
||||||
|
|
||||||
/// New verification queue
|
/// New verification queue
|
||||||
pub fn new(verifier: Box<Verify>) -> Self {
|
pub fn new(verifier: Box<ContinueVerify<State=usize>>) -> Self {
|
||||||
Queue {
|
Queue {
|
||||||
verifier: verifier,
|
verifier: verifier,
|
||||||
items: RwLock::new(LinkedHashMap::new()),
|
items: RwLock::new(Schedule::new()),
|
||||||
verified: RwLock::new(LinkedHashMap::new()),
|
verified: RwLock::new(LinkedHashMap::new()),
|
||||||
invalid: RwLock::new(HashSet::new()),
|
invalid: RwLock::new(HashSet::new()),
|
||||||
processing: RwLock::new(HashSet::new()),
|
processing: RwLock::new(HashSet::new()),
|
||||||
|
@ -52,26 +111,47 @@ impl Queue {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process one block in the queue
|
/// Process one block in the queue
|
||||||
pub fn process(&self) {
|
pub fn process(&self) -> WorkStatus {
|
||||||
let (hash, block) = {
|
let (hash, item) = {
|
||||||
let mut processing = self.processing.write();
|
let mut processing = self.processing.write();
|
||||||
let mut items = self.items.write();
|
let mut items = self.items.write();
|
||||||
|
|
||||||
|
if let Some(&ScheduleItem::Continued(_, _)) = items.front() {
|
||||||
|
if !self.verified.read().is_empty() || !processing.is_empty() {
|
||||||
|
// stall here until earlier blocks are processed
|
||||||
|
return WorkStatus::Wait
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
match items.pop_front() {
|
match items.pop_front() {
|
||||||
Some((hash, block)) => {
|
Some((hash, item)) => {
|
||||||
processing.insert(hash.clone());
|
processing.insert(hash.clone());
|
||||||
(hash, block)
|
(hash, item)
|
||||||
},
|
},
|
||||||
/// nothing to verify
|
/// nothing to verify
|
||||||
None => { return; },
|
_ => { return WorkStatus::Wait; },
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
match self.verifier.verify(&block) {
|
match {
|
||||||
|
match item {
|
||||||
|
ScheduleItem::Block(ref block) => self.verifier.verify(block),
|
||||||
|
ScheduleItem::Continued(ref block, state) => self.verifier.continue_verify(block, state),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
{
|
||||||
Ok(chain) => {
|
Ok(chain) => {
|
||||||
let mut verified = self.verified.write();
|
let mut verified = self.verified.write();
|
||||||
let mut processing = self.processing.write();
|
let mut processing = self.processing.write();
|
||||||
processing.remove(&hash);
|
processing.remove(&hash);
|
||||||
verified.insert(hash, VerifiedBlock::new(chain, block));
|
verified.insert(hash, VerifiedBlock::new(chain, item.block()));
|
||||||
|
},
|
||||||
|
// todo: more generic incloncusive variant type for this match?
|
||||||
|
Err(VerificationError::Transaction(num, TransactionError::Inconclusive(_))) => {
|
||||||
|
let mut processing = self.processing.write();
|
||||||
|
let mut items = self.items.write();
|
||||||
|
processing.remove(&hash);
|
||||||
|
items.push_front(hash, ScheduleItem::Continued(item.block(), num));
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
println!("Verification failed: {:?}", e);
|
println!("Verification failed: {:?}", e);
|
||||||
|
@ -82,6 +162,8 @@ impl Queue {
|
||||||
invalid.insert(hash);
|
invalid.insert(hash);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
WorkStatus::Continue
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Query block status
|
/// Query block status
|
||||||
|
@ -89,7 +171,7 @@ impl Queue {
|
||||||
if self.invalid.read().contains(hash) { BlockStatus::Invalid }
|
if self.invalid.read().contains(hash) { BlockStatus::Invalid }
|
||||||
else if self.processing.read().contains(hash) { BlockStatus::Verifying }
|
else if self.processing.read().contains(hash) { BlockStatus::Verifying }
|
||||||
else if self.verified.read().contains_key(hash) { BlockStatus::Valid }
|
else if self.verified.read().contains_key(hash) { BlockStatus::Valid }
|
||||||
else if self.items.read().contains_key(hash) { BlockStatus::Pending }
|
else if self.items.read().contains(hash) { BlockStatus::Pending }
|
||||||
else { BlockStatus::Absent }
|
else { BlockStatus::Absent }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,7 +187,7 @@ impl Queue {
|
||||||
|
|
||||||
let mut items = self.items.write();
|
let mut items = self.items.write();
|
||||||
if items.len() > self.max_pending() { return Err(Error::Full) }
|
if items.len() > self.max_pending() { return Err(Error::Full) }
|
||||||
items.insert(hash, block);
|
items.push_back(hash, ScheduleItem::Block(block));
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -118,7 +200,7 @@ impl Queue {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::Queue;
|
use super::Queue;
|
||||||
use super::super::{BlockStatus, VerificationResult, Verify, Chain, Error as VerificationError};
|
use super::super::{BlockStatus, VerificationResult, Verify, ContinueVerify, Chain, Error as VerificationError};
|
||||||
use chain::{Block, RepresentH256};
|
use chain::{Block, RepresentH256};
|
||||||
use primitives::hash::H256;
|
use primitives::hash::H256;
|
||||||
use test_data;
|
use test_data;
|
||||||
|
@ -128,11 +210,21 @@ mod tests {
|
||||||
fn verify(&self, _block: &Block) -> VerificationResult { Ok(Chain::Main) }
|
fn verify(&self, _block: &Block) -> VerificationResult { Ok(Chain::Main) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ContinueVerify for FacileVerifier {
|
||||||
|
type State = usize;
|
||||||
|
fn continue_verify(&self, _block: &Block, _state: usize) -> VerificationResult { Ok(Chain::Main) }
|
||||||
|
}
|
||||||
|
|
||||||
struct EvilVerifier;
|
struct EvilVerifier;
|
||||||
impl Verify for EvilVerifier {
|
impl Verify for EvilVerifier {
|
||||||
fn verify(&self, _block: &Block) -> VerificationResult { Err(VerificationError::Empty) }
|
fn verify(&self, _block: &Block) -> VerificationResult { Err(VerificationError::Empty) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ContinueVerify for EvilVerifier {
|
||||||
|
type State = usize;
|
||||||
|
fn continue_verify(&self, _block: &Block, _state: usize) -> VerificationResult { Ok(Chain::Main) }
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn new() {
|
fn new() {
|
||||||
let queue = Queue::new(Box::new(FacileVerifier));
|
let queue = Queue::new(Box::new(FacileVerifier));
|
||||||
|
|
Loading…
Reference in New Issue