using task-splitter in block
This commit is contained in:
parent
1e9428a3a5
commit
a17c2fe82a
|
@ -11,6 +11,7 @@ dependencies = [
|
||||||
"network 0.1.0",
|
"network 0.1.0",
|
||||||
"parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
"parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"primitives 0.1.0",
|
"primitives 0.1.0",
|
||||||
|
"scoped-pool 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"script 0.1.0",
|
"script 0.1.0",
|
||||||
"serialization 0.1.0",
|
"serialization 0.1.0",
|
||||||
"test-data 0.1.0",
|
"test-data 0.1.0",
|
||||||
|
@ -617,11 +618,26 @@ dependencies = [
|
||||||
"semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)",
|
"semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "scoped-pool"
|
||||||
|
version = "1.0.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
dependencies = [
|
||||||
|
"crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"scopeguard 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"variance 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "scoped-tls"
|
name = "scoped-tls"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "scopeguard"
|
||||||
|
version = "0.1.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "script"
|
name = "script"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
@ -780,6 +796,11 @@ name = "utf8-ranges"
|
||||||
version = "0.1.3"
|
version = "0.1.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "variance"
|
||||||
|
version = "0.1.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "vec_map"
|
name = "vec_map"
|
||||||
version = "0.6.0"
|
version = "0.6.0"
|
||||||
|
@ -873,7 +894,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
"checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a"
|
"checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a"
|
||||||
"checksum rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)" = "bff9fc1c79f2dec76b253273d07682e94a978bd8f132ded071188122b2af9818"
|
"checksum rustc-serialize 0.3.21 (registry+https://github.com/rust-lang/crates.io-index)" = "bff9fc1c79f2dec76b253273d07682e94a978bd8f132ded071188122b2af9818"
|
||||||
"checksum rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "c5f5376ea5e30ce23c03eb77cbe4962b988deead10910c372b226388b594c084"
|
"checksum rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "c5f5376ea5e30ce23c03eb77cbe4962b988deead10910c372b226388b594c084"
|
||||||
|
"checksum scoped-pool 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "817a3a15e704545ce59ed2b5c60a5d32bda4d7869befb8b36667b658a6c00b43"
|
||||||
"checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d"
|
"checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d"
|
||||||
|
"checksum scopeguard 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "59a076157c1e2dc561d8de585151ee6965d910dd4dcb5dabb7ae3e83981a6c57"
|
||||||
"checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac"
|
"checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac"
|
||||||
"checksum shell32-sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "72f20b8f3c060374edb8046591ba28f62448c369ccbdc7b02075103fb3a9e38d"
|
"checksum shell32-sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "72f20b8f3c060374edb8046591ba28f62448c369ccbdc7b02075103fb3a9e38d"
|
||||||
"checksum siphasher 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b1c3c58c9ac43c530919fe6bd8ef11ae2612f64c2bf8eab9346f5b71ce0617f2"
|
"checksum siphasher 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b1c3c58c9ac43c530919fe6bd8ef11ae2612f64c2bf8eab9346f5b71ce0617f2"
|
||||||
|
@ -888,6 +911,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
"checksum unicode-segmentation 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b905d0fc2a1f0befd86b0e72e31d1787944efef9d38b9358a9e92a69757f7e3b"
|
"checksum unicode-segmentation 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b905d0fc2a1f0befd86b0e72e31d1787944efef9d38b9358a9e92a69757f7e3b"
|
||||||
"checksum unicode-width 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2d6722facc10989f63ee0e20a83cd4e1714a9ae11529403ac7e0afd069abc39e"
|
"checksum unicode-width 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2d6722facc10989f63ee0e20a83cd4e1714a9ae11529403ac7e0afd069abc39e"
|
||||||
"checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f"
|
"checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f"
|
||||||
|
"checksum variance 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3abfc2be1fb59663871379ea884fd81de80c496f2274e021c01d6fe56cd77b05"
|
||||||
"checksum vec_map 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cac5efe5cb0fa14ec2f84f83c701c562ee63f6dcc680861b21d65c682adfb05f"
|
"checksum vec_map 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cac5efe5cb0fa14ec2f84f83c701c562ee63f6dcc680861b21d65c682adfb05f"
|
||||||
"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
|
"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
|
||||||
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
|
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
|
||||||
|
|
|
@ -88,6 +88,10 @@ impl IndexedBlock {
|
||||||
pub fn is_final(&self, height: u32) -> bool {
|
pub fn is_final(&self, height: u32) -> bool {
|
||||||
self.transactions.iter().all(|t| t.is_final(height, self.header.time))
|
self.transactions.iter().all(|t| t.is_final(height, self.header.time))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn transaction_at(&self, index: usize) -> (&H256, &chain::Transaction) {
|
||||||
|
(&self.transaction_hashes[index], &self.transactions[index])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct IndexedTransactions<'a> {
|
pub struct IndexedTransactions<'a> {
|
||||||
|
|
|
@ -9,6 +9,7 @@ parking_lot = "0.3"
|
||||||
linked-hash-map = "0.3"
|
linked-hash-map = "0.3"
|
||||||
time = "0.1"
|
time = "0.1"
|
||||||
log = "0.3"
|
log = "0.3"
|
||||||
|
scoped-pool = "1.0"
|
||||||
|
|
||||||
ethcore-devtools = { path = "../devtools" }
|
ethcore-devtools = { path = "../devtools" }
|
||||||
primitives = { path = "../primitives" }
|
primitives = { path = "../primitives" }
|
||||||
|
|
|
@ -1,17 +1,20 @@
|
||||||
//! Bitcoin chain verifier
|
//! Bitcoin chain verifier
|
||||||
|
|
||||||
use std::collections::BTreeSet;
|
use std::collections::BTreeSet;
|
||||||
use db::{self, BlockRef, BlockLocation};
|
use db::{self, BlockLocation};
|
||||||
use network::Magic;
|
use network::Magic;
|
||||||
use script::Script;
|
use script::Script;
|
||||||
use super::{Verify, VerificationResult, Chain, Error, TransactionError, ContinueVerify};
|
use super::{Verify, VerificationResult, Chain, Error, TransactionError};
|
||||||
use {chain, utils};
|
use {chain, utils};
|
||||||
|
use scoped_pool::Pool;
|
||||||
|
|
||||||
const BLOCK_MAX_FUTURE: i64 = 2 * 60 * 60; // 2 hours
|
const BLOCK_MAX_FUTURE: i64 = 2 * 60 * 60; // 2 hours
|
||||||
const COINBASE_MATURITY: u32 = 100; // 2 hours
|
const COINBASE_MATURITY: u32 = 100; // 2 hours
|
||||||
const MAX_BLOCK_SIGOPS: usize = 20000;
|
const MAX_BLOCK_SIGOPS: usize = 20000;
|
||||||
const MAX_BLOCK_SIZE: usize = 1000000;
|
const MAX_BLOCK_SIZE: usize = 1000000;
|
||||||
|
|
||||||
|
const TRANSACTIONS_VERIFY_THREADS: usize = 4;
|
||||||
|
|
||||||
pub struct ChainVerifier {
|
pub struct ChainVerifier {
|
||||||
store: db::SharedStore,
|
store: db::SharedStore,
|
||||||
verify_p2sh: bool,
|
verify_p2sh: bool,
|
||||||
|
@ -19,6 +22,7 @@ pub struct ChainVerifier {
|
||||||
skip_pow: bool,
|
skip_pow: bool,
|
||||||
skip_sig: bool,
|
skip_sig: bool,
|
||||||
network: Magic,
|
network: Magic,
|
||||||
|
pool: Pool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ChainVerifier {
|
impl ChainVerifier {
|
||||||
|
@ -30,6 +34,7 @@ impl ChainVerifier {
|
||||||
skip_pow: false,
|
skip_pow: false,
|
||||||
skip_sig: false,
|
skip_sig: false,
|
||||||
network: network,
|
network: network,
|
||||||
|
pool: Pool::new(TRANSACTIONS_VERIFY_THREADS),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -175,7 +180,7 @@ impl ChainVerifier {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_transaction(&self,
|
pub fn verify_transaction(&self,
|
||||||
block: &db::IndexedBlock,
|
block: &db::IndexedBlock,
|
||||||
transaction: &chain::Transaction,
|
transaction: &chain::Transaction,
|
||||||
sequence: usize,
|
sequence: usize,
|
||||||
|
@ -200,7 +205,7 @@ impl ChainVerifier {
|
||||||
let signer: TransactionInputSigner = transaction.clone().into();
|
let signer: TransactionInputSigner = transaction.clone().into();
|
||||||
let paired_output = match self.previous_transaction_output(block, &input.previous_output) {
|
let paired_output = match self.previous_transaction_output(block, &input.previous_output) {
|
||||||
Some(output) => output,
|
Some(output) => output,
|
||||||
_ => return Err(TransactionError::Inconclusive(input.previous_output.hash.clone()))
|
_ => return Err(TransactionError::UnknownReference(input.previous_output.hash.clone()))
|
||||||
};
|
};
|
||||||
|
|
||||||
let checker = TransactionSignatureChecker {
|
let checker = TransactionSignatureChecker {
|
||||||
|
@ -230,6 +235,8 @@ impl ChainVerifier {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_block(&self, block: &db::IndexedBlock) -> VerificationResult {
|
fn verify_block(&self, block: &db::IndexedBlock) -> VerificationResult {
|
||||||
|
use task::Task;
|
||||||
|
|
||||||
let hash = block.hash();
|
let hash = block.hash();
|
||||||
|
|
||||||
// There should be at least 1 transaction
|
// There should be at least 1 transaction
|
||||||
|
@ -281,8 +288,26 @@ impl ChainVerifier {
|
||||||
return Err(Error::CoinbaseSignatureLength(coinbase_script_len));
|
return Err(Error::CoinbaseSignatureLength(coinbase_script_len));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (idx, (_, transaction)) in block.transactions().enumerate() {
|
// todo: might use on-stack vector (smallvec/elastic array)
|
||||||
try!(self.verify_transaction(block, transaction, idx).map_err(|e| Error::Transaction(idx, e)))
|
let mut transaction_tasks: Vec<Task> = Vec::with_capacity(TRANSACTIONS_VERIFY_THREADS);
|
||||||
|
let mut last = 0;
|
||||||
|
for num_task in 0..TRANSACTIONS_VERIFY_THREADS {
|
||||||
|
let from = last;
|
||||||
|
last = ::std::cmp::max(1, block.transaction_count() / TRANSACTIONS_VERIFY_THREADS);
|
||||||
|
if num_task == TRANSACTIONS_VERIFY_THREADS - 1 { last = block.transaction_count(); };
|
||||||
|
transaction_tasks.push(Task::new(block, from, last));
|
||||||
|
}
|
||||||
|
|
||||||
|
self.pool.scoped(|scope| {
|
||||||
|
for task in transaction_tasks.iter_mut() {
|
||||||
|
scope.execute(move || task.progress(self))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
for task in transaction_tasks.into_iter() {
|
||||||
|
if let Err((index, tx_err)) = task.result() {
|
||||||
|
return Err(Error::Transaction(index, tx_err));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: pre-process projected block number once verification is parallel!
|
// todo: pre-process projected block number once verification is parallel!
|
||||||
|
@ -361,24 +386,6 @@ impl Verify for ChainVerifier {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ContinueVerify for ChainVerifier {
|
|
||||||
type State = usize;
|
|
||||||
|
|
||||||
fn continue_verify(&self, block: &db::IndexedBlock, state: usize) -> VerificationResult {
|
|
||||||
// verify transactions (except coinbase)
|
|
||||||
for (idx, (_, transaction)) in block.transactions().enumerate().skip(state - 1) {
|
|
||||||
try!(self.verify_transaction(block, transaction, idx).map_err(|e| Error::Transaction(idx, e)));
|
|
||||||
}
|
|
||||||
|
|
||||||
let _parent = match self.store.block(BlockRef::Hash(block.header().previous_header_hash.clone())) {
|
|
||||||
Some(b) => b,
|
|
||||||
None => { return Ok(Chain::Orphan); }
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Chain::Main)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -419,23 +426,6 @@ mod tests {
|
||||||
assert_eq!(Chain::Main, verifier.verify(&b1.into()).unwrap());
|
assert_eq!(Chain::Main, verifier.verify(&b1.into()).unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn unknown_transaction_returns_inconclusive() {
|
|
||||||
let storage = TestStorage::with_blocks(
|
|
||||||
&vec![
|
|
||||||
test_data::block_h169(),
|
|
||||||
]
|
|
||||||
);
|
|
||||||
let b170 = test_data::block_h170();
|
|
||||||
let verifier = ChainVerifier::new(Arc::new(storage), Magic::Testnet);
|
|
||||||
|
|
||||||
let should_be = Err(Error::Transaction(
|
|
||||||
1,
|
|
||||||
TransactionError::Inconclusive("c997a5e56e104102fa209c6a852dd90660a20b2d9c352423edce25857fcd3704".into())
|
|
||||||
));
|
|
||||||
assert_eq!(should_be, verifier.verify(&b170.into()));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn coinbase_maturity() {
|
fn coinbase_maturity() {
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ extern crate network;
|
||||||
extern crate primitives;
|
extern crate primitives;
|
||||||
extern crate serialization;
|
extern crate serialization;
|
||||||
extern crate script;
|
extern crate script;
|
||||||
|
extern crate scoped_pool;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
extern crate ethcore_devtools as devtools;
|
extern crate ethcore_devtools as devtools;
|
||||||
|
@ -22,6 +23,7 @@ extern crate test_data;
|
||||||
mod chain_verifier;
|
mod chain_verifier;
|
||||||
mod compact;
|
mod compact;
|
||||||
mod utils;
|
mod utils;
|
||||||
|
mod task;
|
||||||
|
|
||||||
pub use primitives::{uint, hash};
|
pub use primitives::{uint, hash};
|
||||||
|
|
||||||
|
@ -70,8 +72,6 @@ pub enum TransactionError {
|
||||||
Maturity,
|
Maturity,
|
||||||
/// Signature invalid for given input
|
/// Signature invalid for given input
|
||||||
Signature(usize),
|
Signature(usize),
|
||||||
/// Inconclusive (unknown parent transaction)
|
|
||||||
Inconclusive(H256),
|
|
||||||
/// Unknown previous transaction referenced
|
/// Unknown previous transaction referenced
|
||||||
UnknownReference(H256),
|
UnknownReference(H256),
|
||||||
/// Spends more than claims
|
/// Spends more than claims
|
||||||
|
@ -116,9 +116,3 @@ pub type VerificationResult = Result<Chain, Error>;
|
||||||
pub trait Verify : Send + Sync {
|
pub trait Verify : Send + Sync {
|
||||||
fn verify(&self, block: &db::IndexedBlock) -> VerificationResult;
|
fn verify(&self, block: &db::IndexedBlock) -> VerificationResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Trait for verifier that can be interrupted and continue from the specific point
|
|
||||||
pub trait ContinueVerify : Verify + Send + Sync {
|
|
||||||
type State;
|
|
||||||
fn continue_verify(&self, block: &db::IndexedBlock, state: Self::State) -> VerificationResult;
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,406 +0,0 @@
|
||||||
//! Blocks verification queue
|
|
||||||
|
|
||||||
use chain::Block;
|
|
||||||
use primitives::hash::H256;
|
|
||||||
use super::{Chain, ContinueVerify, BlockStatus, Error as VerificationError, TransactionError};
|
|
||||||
use linked_hash_map::LinkedHashMap;
|
|
||||||
use parking_lot::RwLock;
|
|
||||||
use std::collections::{HashSet, VecDeque};
|
|
||||||
|
|
||||||
const MAX_PENDING_PRESET: usize = 128;
|
|
||||||
|
|
||||||
pub struct VerifiedBlock {
|
|
||||||
pub chain: Chain,
|
|
||||||
pub block: Block,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl VerifiedBlock {
|
|
||||||
fn new(chain: Chain, block: Block) -> Self {
|
|
||||||
VerifiedBlock { chain: chain, block: block }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
/// Queue errors
|
|
||||||
pub enum Error {
|
|
||||||
/// Queue is currently full
|
|
||||||
Full,
|
|
||||||
/// There is already block in the queue
|
|
||||||
Duplicate,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
pub enum ScheduleItem {
|
|
||||||
Block(Block),
|
|
||||||
Continued(Block, usize),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ScheduleItem {
|
|
||||||
fn block(self) -> Block {
|
|
||||||
match self {
|
|
||||||
ScheduleItem::Block(block) | ScheduleItem::Continued(block, _) => block,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Schedule {
|
|
||||||
items: VecDeque<(H256, ScheduleItem)>,
|
|
||||||
set: HashSet<H256>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Schedule {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Schedule { items: VecDeque::new(), set: HashSet::new() }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn push_back(&mut self, hash: H256, item: ScheduleItem) {
|
|
||||||
self.items.push_back((hash.clone(), item));
|
|
||||||
self.set.insert(hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn push_front(&mut self, hash: H256, item: ScheduleItem) {
|
|
||||||
self.items.push_front((hash.clone(), item));
|
|
||||||
self.set.insert(hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn pop_front(&mut self) -> Option<(H256, ScheduleItem)> {
|
|
||||||
self.items.pop_front()
|
|
||||||
.and_then(|(h, b)| { self.set.remove(&h); Some((h, b)) })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn contains(&self, hash: &H256) -> bool {
|
|
||||||
self.set.contains(hash)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn len(&self) -> usize {
|
|
||||||
self.set.len()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn front(&self) -> Option<&ScheduleItem> {
|
|
||||||
self.items.front().and_then(|&(_, ref item)| Some(item))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Verification queue
|
|
||||||
pub struct Queue {
|
|
||||||
verifier: Box<ContinueVerify<State=usize>>,
|
|
||||||
items: RwLock<Schedule>,
|
|
||||||
|
|
||||||
// todo: write lock on verified should continue until blocks are persisted in the database
|
|
||||||
// todo: OR journal verified transactions before they are persisted
|
|
||||||
// todo: OTHERWISE verification thread may behave suboptiomal, trying to verify the same block
|
|
||||||
// todo: over and over again until in finally gets inserted
|
|
||||||
verified: RwLock<LinkedHashMap<H256, VerifiedBlock>>,
|
|
||||||
invalid: RwLock<HashSet<H256>>,
|
|
||||||
processing: RwLock<HashSet<H256>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub enum WorkStatus {
|
|
||||||
Continue,
|
|
||||||
Wait,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Queue {
|
|
||||||
|
|
||||||
/// New verification queue
|
|
||||||
pub fn new(verifier: Box<ContinueVerify<State=usize>>) -> Self {
|
|
||||||
Queue {
|
|
||||||
verifier: verifier,
|
|
||||||
items: RwLock::new(Schedule::new()),
|
|
||||||
verified: RwLock::new(LinkedHashMap::new()),
|
|
||||||
invalid: RwLock::new(HashSet::new()),
|
|
||||||
processing: RwLock::new(HashSet::new()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Process one block in the queue
|
|
||||||
pub fn process(&self) -> WorkStatus {
|
|
||||||
let (hash, item) = {
|
|
||||||
let mut processing = self.processing.write();
|
|
||||||
let mut items = self.items.write();
|
|
||||||
|
|
||||||
if let Some(&ScheduleItem::Continued(_, _)) = items.front() {
|
|
||||||
if !self.verified.read().is_empty() || !processing.is_empty() {
|
|
||||||
// stall here until earlier blocks are processed
|
|
||||||
return WorkStatus::Wait
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match items.pop_front() {
|
|
||||||
Some((hash, item)) => {
|
|
||||||
processing.insert(hash.clone());
|
|
||||||
(hash, item)
|
|
||||||
},
|
|
||||||
/// nothing to verify
|
|
||||||
_ => { return WorkStatus::Wait; },
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match {
|
|
||||||
match item {
|
|
||||||
ScheduleItem::Block(ref block) => self.verifier.verify(block),
|
|
||||||
ScheduleItem::Continued(ref block, state) => self.verifier.continue_verify(block, state),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
{
|
|
||||||
Ok(chain) => {
|
|
||||||
let mut processing = self.processing.write();
|
|
||||||
let mut verified = self.verified.write();
|
|
||||||
processing.remove(&hash);
|
|
||||||
verified.insert(hash, VerifiedBlock::new(chain, item.block()));
|
|
||||||
},
|
|
||||||
// todo: more generic incloncusive variant type for this match?
|
|
||||||
Err(VerificationError::Transaction(num, TransactionError::Inconclusive(_))) => {
|
|
||||||
let mut processing = self.processing.write();
|
|
||||||
let mut items = self.items.write();
|
|
||||||
processing.remove(&hash);
|
|
||||||
items.push_front(hash, ScheduleItem::Continued(item.block(), num));
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
trace!(target: "verification", "Verification of block {} failed: {:?}", hash.to_reversed_str(), e);
|
|
||||||
let mut invalid = self.invalid.write();
|
|
||||||
let mut processing = self.processing.write();
|
|
||||||
|
|
||||||
processing.remove(&hash);
|
|
||||||
invalid.insert(hash);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
WorkStatus::Continue
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Query block status
|
|
||||||
pub fn block_status(&self, hash: &H256) -> BlockStatus {
|
|
||||||
if self.invalid.read().contains(hash) { BlockStatus::Invalid }
|
|
||||||
else if self.processing.read().contains(hash) { BlockStatus::Verifying }
|
|
||||||
else if self.verified.read().contains_key(hash) { BlockStatus::Valid }
|
|
||||||
else if self.items.read().contains(hash) { BlockStatus::Pending }
|
|
||||||
else { BlockStatus::Absent }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn max_pending(&self) -> usize {
|
|
||||||
// todo: later might be calculated with lazy-static here based on memory usage
|
|
||||||
MAX_PENDING_PRESET
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn push(&self, block: Block) -> Result<(), Error> {
|
|
||||||
let hash = block.hash();
|
|
||||||
|
|
||||||
if self.block_status(&hash) != BlockStatus::Absent { return Err(Error::Duplicate) }
|
|
||||||
|
|
||||||
let mut items = self.items.write();
|
|
||||||
if items.len() > self.max_pending() { return Err(Error::Full) }
|
|
||||||
items.push_back(hash, ScheduleItem::Block(block));
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn pop_valid(&self) -> Option<(H256, VerifiedBlock)> {
|
|
||||||
self.verified.write().pop_front()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::Queue;
|
|
||||||
use super::super::{BlockStatus, VerificationResult, Verify, ContinueVerify, Chain, Error as VerificationError, TransactionError};
|
|
||||||
use chain::Block;
|
|
||||||
use primitives::hash::H256;
|
|
||||||
use test_data;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
struct FacileVerifier;
|
|
||||||
impl Verify for FacileVerifier {
|
|
||||||
fn verify(&self, _block: &Block) -> VerificationResult { Ok(Chain::Main) }
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ContinueVerify for FacileVerifier {
|
|
||||||
type State = usize;
|
|
||||||
fn continue_verify(&self, _block: &Block, _state: usize) -> VerificationResult { Ok(Chain::Main) }
|
|
||||||
}
|
|
||||||
|
|
||||||
struct EvilVerifier;
|
|
||||||
impl Verify for EvilVerifier {
|
|
||||||
fn verify(&self, _block: &Block) -> VerificationResult { Err(VerificationError::Empty) }
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ContinueVerify for EvilVerifier {
|
|
||||||
type State = usize;
|
|
||||||
fn continue_verify(&self, _block: &Block, _state: usize) -> VerificationResult { Ok(Chain::Main) }
|
|
||||||
}
|
|
||||||
|
|
||||||
struct HupVerifier {
|
|
||||||
hups: HashMap<H256, usize>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Verify for HupVerifier {
|
|
||||||
fn verify(&self, block: &Block) -> VerificationResult {
|
|
||||||
if let Some(hup) = self.hups.get(&block.hash()) {
|
|
||||||
Err(VerificationError::Transaction(*hup, TransactionError::Inconclusive(H256::from(0))))
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
Ok(Chain::Main)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ContinueVerify for HupVerifier {
|
|
||||||
type State = usize;
|
|
||||||
fn continue_verify(&self, _block: &Block, _state: usize) -> VerificationResult { Ok(Chain::Main) }
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn new() {
|
|
||||||
let queue = Queue::new(Box::new(FacileVerifier));
|
|
||||||
assert_eq!(queue.block_status(&H256::from(0u8)), BlockStatus::Absent);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn push() {
|
|
||||||
let queue = Queue::new(Box::new(FacileVerifier));
|
|
||||||
let block = test_data::block1();
|
|
||||||
let hash = block.hash();
|
|
||||||
|
|
||||||
queue.push(block).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(queue.block_status(&hash), BlockStatus::Pending);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn push_duplicate() {
|
|
||||||
let queue = Queue::new(Box::new(FacileVerifier));
|
|
||||||
let block = test_data::block1();
|
|
||||||
let dup_block = test_data::block1();
|
|
||||||
|
|
||||||
queue.push(block).unwrap();
|
|
||||||
let second_push = queue.push(dup_block);
|
|
||||||
|
|
||||||
assert!(second_push.is_err());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_happy() {
|
|
||||||
let queue = Queue::new(Box::new(FacileVerifier));
|
|
||||||
let block = test_data::block1();
|
|
||||||
let hash = block.hash();
|
|
||||||
|
|
||||||
queue.push(block).unwrap();
|
|
||||||
queue.process();
|
|
||||||
|
|
||||||
assert_eq!(queue.block_status(&hash), BlockStatus::Valid);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_unhappy() {
|
|
||||||
let queue = Queue::new(Box::new(EvilVerifier));
|
|
||||||
let block = test_data::block1();
|
|
||||||
let hash = block.hash();
|
|
||||||
|
|
||||||
queue.push(block).unwrap();
|
|
||||||
queue.process();
|
|
||||||
|
|
||||||
assert_eq!(queue.block_status(&hash), BlockStatus::Invalid);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_async() {
|
|
||||||
use std::thread;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
let queue = Arc::new(Queue::new(Box::new(FacileVerifier)));
|
|
||||||
|
|
||||||
let t1_queue = queue.clone();
|
|
||||||
let t1_handle = thread::spawn(move || {
|
|
||||||
let block_h1 = test_data::block_h1();
|
|
||||||
t1_queue.push(block_h1).unwrap();
|
|
||||||
t1_queue.process();
|
|
||||||
});
|
|
||||||
|
|
||||||
let t2_queue = queue.clone();
|
|
||||||
let t2_handle = thread::spawn(move || {
|
|
||||||
let block_h2 = test_data::block_h2();
|
|
||||||
t2_queue.push(block_h2).unwrap();
|
|
||||||
t2_queue.process();
|
|
||||||
});
|
|
||||||
|
|
||||||
t1_handle.join().unwrap();
|
|
||||||
t2_handle.join().unwrap();
|
|
||||||
|
|
||||||
assert_eq!(queue.block_status(&test_data::block_h1().hash()), BlockStatus::Valid);
|
|
||||||
assert_eq!(queue.block_status(&test_data::block_h2().hash()), BlockStatus::Valid);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn pop() {
|
|
||||||
let queue = Queue::new(Box::new(FacileVerifier));
|
|
||||||
let block = test_data::block1();
|
|
||||||
let hash = block.hash();
|
|
||||||
|
|
||||||
queue.push(block).unwrap();
|
|
||||||
queue.process();
|
|
||||||
let (h, _b) = queue.pop_valid().unwrap();
|
|
||||||
|
|
||||||
assert_eq!(queue.block_status(&hash), BlockStatus::Absent);
|
|
||||||
assert_eq!(h, hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn verification_stalls_on_unverifiable() {
|
|
||||||
let b1 = test_data::block_builder()
|
|
||||||
.header().build()
|
|
||||||
.build();
|
|
||||||
let b2 = test_data::block_builder()
|
|
||||||
.header().parent(b1.hash()).build()
|
|
||||||
.build();
|
|
||||||
|
|
||||||
let mut hup_verifier = HupVerifier { hups: HashMap::new() };
|
|
||||||
hup_verifier.hups.insert(b2.hash(), 5);
|
|
||||||
|
|
||||||
let queue = Queue::new(Box::new(hup_verifier));
|
|
||||||
queue.push(b1.clone()).unwrap();
|
|
||||||
queue.push(b2.clone()).unwrap();
|
|
||||||
|
|
||||||
queue.process();
|
|
||||||
assert_eq!(queue.block_status(&b1.hash()), BlockStatus::Valid);
|
|
||||||
|
|
||||||
queue.process();
|
|
||||||
assert_eq!(queue.block_status(&b2.hash()),
|
|
||||||
BlockStatus::Pending,
|
|
||||||
"Block #2 supposed to stay in the pending state, because it requires 'processing' and 'verified' lines to be empty to continue" );
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn verification_continues_stalled_block() {
|
|
||||||
let b1 = test_data::block_builder()
|
|
||||||
.header().build()
|
|
||||||
.build();
|
|
||||||
let b2 = test_data::block_builder()
|
|
||||||
.header().parent(b1.hash()).build()
|
|
||||||
.build();
|
|
||||||
|
|
||||||
let mut hup_verifier = HupVerifier { hups: HashMap::new() };
|
|
||||||
hup_verifier.hups.insert(b2.hash(), 5);
|
|
||||||
|
|
||||||
let queue = Queue::new(Box::new(hup_verifier));
|
|
||||||
queue.push(b1.clone()).unwrap();
|
|
||||||
queue.push(b2.clone()).unwrap();
|
|
||||||
|
|
||||||
queue.process();
|
|
||||||
assert_eq!(queue.block_status(&b1.hash()), BlockStatus::Valid);
|
|
||||||
|
|
||||||
queue.process();
|
|
||||||
assert_eq!(queue.block_status(&b2.hash()),
|
|
||||||
BlockStatus::Pending,
|
|
||||||
"Block #2 supposed to stay in the pending state, because it requires 'processing' and 'verified' lines to be empty to continue" );
|
|
||||||
|
|
||||||
queue.pop_valid();
|
|
||||||
queue.process();
|
|
||||||
|
|
||||||
assert_eq!(queue.block_status(&b2.hash()),
|
|
||||||
BlockStatus::Valid,
|
|
||||||
"Block #2 supposed to achieve valid state, because it requires 'processing' and 'verified' lines to be empty, which are indeed empty" );
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
use chain_verifier::ChainVerifier;
|
||||||
|
use super::TransactionError;
|
||||||
|
use db::IndexedBlock;
|
||||||
|
|
||||||
|
pub struct Task<'a> {
|
||||||
|
block: &'a IndexedBlock,
|
||||||
|
from: usize,
|
||||||
|
to: usize,
|
||||||
|
result: Result<(), TransactionCheckError>,
|
||||||
|
}
|
||||||
|
|
||||||
|
type TransactionCheckError = (usize, TransactionError);
|
||||||
|
|
||||||
|
impl<'a> Task<'a> {
|
||||||
|
pub fn new(block: &'a IndexedBlock, from: usize, to: usize) -> Self {
|
||||||
|
Task {
|
||||||
|
block: block,
|
||||||
|
from: from,
|
||||||
|
to: to,
|
||||||
|
result: Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn progress(&mut self, verifier: &ChainVerifier) {
|
||||||
|
for index in self.from..self.to {
|
||||||
|
if let Err(e) = verifier.verify_transaction(self.block, self.block.transaction_at(index).1, index) {
|
||||||
|
self.result = Err((index, e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.result = Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn result(self) -> Result<(), TransactionCheckError> {
|
||||||
|
self.result
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue