use rayon for parallel verification

This commit is contained in:
debris 2016-12-11 17:42:26 +01:00
parent f8c71c6c37
commit 0df90a85bd
9 changed files with 64 additions and 90 deletions

13
Cargo.lock generated
View File

@ -10,6 +10,7 @@ dependencies = [
"network 0.1.0", "network 0.1.0",
"parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0", "primitives 0.1.0",
"rayon 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"scoped-pool 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "scoped-pool 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"script 0.1.0", "script 0.1.0",
"serialization 0.1.0", "serialization 0.1.0",
@ -747,6 +748,17 @@ dependencies = [
"rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "rayon"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "0.1.80" version = "0.1.80"
@ -1283,6 +1295,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum quick-error 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0aad603e8d7fb67da22dbdf1f4b826ce8829e406124109e73cf1b2454b93a71c" "checksum quick-error 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0aad603e8d7fb67da22dbdf1f4b826ce8829e406124109e73cf1b2454b93a71c"
"checksum rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "022e0636ec2519ddae48154b028864bdce4eaf7d35226ab8e65c611be97b189d" "checksum rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "022e0636ec2519ddae48154b028864bdce4eaf7d35226ab8e65c611be97b189d"
"checksum rayon 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0f0783f5880c56f5a308e219ac9309dbe781e064741dd5def4c617c440890305" "checksum rayon 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0f0783f5880c56f5a308e219ac9309dbe781e064741dd5def4c617c440890305"
"checksum rayon 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3b6a6e05e0e6b703e9f2ad266eb63f3712e693a17a2702b95a23de14ce8defa9"
"checksum regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)" = "4fd4ace6a8cf7860714a2c2280d6c1f7e6a413486c13298bbc86fd3da019402f" "checksum regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)" = "4fd4ace6a8cf7860714a2c2280d6c1f7e6a413486c13298bbc86fd3da019402f"
"checksum regex-syntax 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "f9ec002c35e86791825ed294b50008eea9ddfc8def4420124fbc6b08db834957" "checksum regex-syntax 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "f9ec002c35e86791825ed294b50008eea9ddfc8def4420124fbc6b08db834957"
"checksum rocksdb 0.4.5 (git+https://github.com/ethcore/rust-rocksdb)" = "<none>" "checksum rocksdb 0.4.5 (git+https://github.com/ethcore/rust-rocksdb)" = "<none>"

View File

@ -135,7 +135,7 @@ impl Default for BlockAssembler {
/// Iterator iterating over mempool transactions and yielding only those which fit the block /// Iterator iterating over mempool transactions and yielding only those which fit the block
struct FittingTransactionsIterator<'a, T> { struct FittingTransactionsIterator<'a, T> {
/// Shared store is used to query previous transaction outputs from database /// Shared store is used to query previous transaction outputs from database
store: &'a SharedStore, store: &'a PreviousTransactionOutputProvider,
/// Memory pool transactions iterator /// Memory pool transactions iterator
iter: T, iter: T,
/// Size policy decides if transactions size fits the block /// Size policy decides if transactions size fits the block
@ -149,7 +149,7 @@ struct FittingTransactionsIterator<'a, T> {
} }
impl<'a, T> FittingTransactionsIterator<'a, T> where T: Iterator<Item = &'a Entry> { impl<'a, T> FittingTransactionsIterator<'a, T> where T: Iterator<Item = &'a Entry> {
fn new(store: &'a SharedStore, iter: T, max_block_size: u32, max_block_sigops: u32) -> Self { fn new(store: &'a PreviousTransactionOutputProvider, iter: T, max_block_size: u32, max_block_sigops: u32) -> Self {
FittingTransactionsIterator { FittingTransactionsIterator {
store: store, store: store,
iter: iter, iter: iter,
@ -164,11 +164,13 @@ impl<'a, T> FittingTransactionsIterator<'a, T> where T: Iterator<Item = &'a Entr
impl<'a, T> PreviousTransactionOutputProvider for FittingTransactionsIterator<'a, T> { impl<'a, T> PreviousTransactionOutputProvider for FittingTransactionsIterator<'a, T> {
fn previous_transaction_output(&self, prevout: &OutPoint) -> Option<TransactionOutput> { fn previous_transaction_output(&self, prevout: &OutPoint) -> Option<TransactionOutput> {
self.store.transaction(&prevout.hash) self.store.previous_transaction_output(prevout)
.as_ref() .or_else(|| {
.or_else(|| self.previous_entries.iter().find(|e| e.hash == prevout.hash).map(|e| &e.transaction)) self.previous_entries.iter()
.and_then(|tx| tx.outputs.iter().nth(prevout.index as usize)) .find(|e| e.hash == prevout.hash)
.cloned() .and_then(|e| e.transaction.outputs.iter().nth(prevout.index as usize))
.cloned()
})
} }
} }
@ -235,7 +237,7 @@ impl BlockAssembler {
let mut transactions = Vec::new(); let mut transactions = Vec::new();
let mempool_iter = mempool.iter(OrderingStrategy::ByTransactionScore); let mempool_iter = mempool.iter(OrderingStrategy::ByTransactionScore);
let tx_iter = FittingTransactionsIterator::new(store, mempool_iter, self.max_block_size, self.max_block_sigops); let tx_iter = FittingTransactionsIterator::new(store.as_previous_transaction_output_provider(), mempool_iter, self.max_block_size, self.max_block_sigops);
for entry in tx_iter { for entry in tx_iter {
// miner_fee is i64, but we can safely cast it to u64 // miner_fee is i64, but we can safely cast it to u64
// memory pool should restrict miner fee to be positive // memory pool should restrict miner fee to be positive
@ -260,7 +262,10 @@ impl BlockAssembler {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{SizePolicy, NextStep}; use db::IndexedTransaction;
use verification::{MAX_BLOCK_SIZE, MAX_BLOCK_SIGOPS};
use memory_pool::Entry;
use super::{SizePolicy, NextStep, FittingTransactionsIterator};
#[test] #[test]
fn test_size_policy() { fn test_size_policy() {
@ -292,4 +297,18 @@ mod tests {
assert_eq!(NextStep::FinishAndAppend.and(NextStep::Ignore), NextStep::FinishAndIgnore); assert_eq!(NextStep::FinishAndAppend.and(NextStep::Ignore), NextStep::FinishAndIgnore);
assert_eq!(NextStep::FinishAndAppend.and(NextStep::Append), NextStep::FinishAndAppend); assert_eq!(NextStep::FinishAndAppend.and(NextStep::Append), NextStep::FinishAndAppend);
} }
#[test]
fn test_fitting_transactions_iterator_no_transactions() {
let store: Vec<IndexedTransaction> = Vec::new();
let entries: Vec<Entry> = Vec::new();
let store_ref: &[_] = &store;
let iter = FittingTransactionsIterator::new(&store_ref, entries.iter(), MAX_BLOCK_SIZE as u32, MAX_BLOCK_SIGOPS as u32);
assert!(iter.collect::<Vec<_>>().is_empty());
}
#[test]
fn test_fitting_transactions_iterator_max_block_size_reached() {
}
} }

View File

@ -9,6 +9,7 @@ parking_lot = "0.3"
time = "0.1" time = "0.1"
log = "0.3" log = "0.3"
scoped-pool = "1.0" scoped-pool = "1.0"
rayon = "0.5"
ethcore-devtools = { path = "../devtools" } ethcore-devtools = { path = "../devtools" }
primitives = { path = "../primitives" } primitives = { path = "../primitives" }

View File

@ -1,10 +1,8 @@
use std::ops;
use network::{Magic, ConsensusParams}; use network::{Magic, ConsensusParams};
use db::{SharedStore, IndexedBlock, PreviousTransactionOutputProvider, BlockHeaderProvider}; use db::{SharedStore, PreviousTransactionOutputProvider, BlockHeaderProvider};
use sigops::{StoreWithUnretainedOutputs, transaction_sigops}; use sigops::{StoreWithUnretainedOutputs, transaction_sigops};
use utils::{work_required, block_reward_satoshi}; use utils::{work_required, block_reward_satoshi};
use accept_header::CanonHeader; use canon::CanonBlock;
use accept_transaction::CanonTransaction;
use constants::MAX_BLOCK_SIGOPS; use constants::MAX_BLOCK_SIGOPS;
use error::Error; use error::Error;
@ -38,36 +36,6 @@ impl<'a> BlockAcceptor<'a> {
} }
} }
/// Blocks whose parents are known to be in the chain
#[derive(Clone, Copy)]
pub struct CanonBlock<'a> {
block: &'a IndexedBlock,
}
impl<'a> CanonBlock<'a> {
pub fn new(block: &'a IndexedBlock) -> Self {
CanonBlock {
block: block,
}
}
pub fn header<'b>(&'b self) -> CanonHeader<'a> where 'a: 'b {
CanonHeader::new(&self.block.header)
}
pub fn transactions<'b>(&'b self) -> Vec<CanonTransaction<'a>> where 'a: 'b {
self.block.transactions.iter().map(CanonTransaction::new).collect()
}
}
impl<'a> ops::Deref for CanonBlock<'a> {
type Target = IndexedBlock;
fn deref(&self) -> &Self::Target {
self.block
}
}
trait BlockRule { trait BlockRule {
/// If verification fails returns an error /// If verification fails returns an error
fn check(&self) -> Result<(), Error>; fn check(&self) -> Result<(), Error>;

View File

@ -1,8 +1,9 @@
use scoped_pool::Pool; use rayon::prelude::{IntoParallelRefIterator, IndexedParallelIterator, ParallelIterator};
use db::SharedStore; use db::SharedStore;
use network::Magic; use network::Magic;
use error::Error; use error::Error;
use accept_block::{CanonBlock, BlockAcceptor}; use canon::CanonBlock;
use accept_block::BlockAcceptor;
use accept_header::HeaderAcceptor; use accept_header::HeaderAcceptor;
use accept_transaction::TransactionAcceptor; use accept_transaction::TransactionAcceptor;
@ -24,14 +25,10 @@ impl<'a> ChainAcceptor<'a> {
pub fn check(&self) -> Result<(), Error> { pub fn check(&self) -> Result<(), Error> {
try!(self.block.check()); try!(self.block.check());
try!(self.header.check()); try!(self.header.check());
self.transactions.iter() self.transactions.par_iter()
.enumerate() .enumerate()
.map(|(index, tx)| tx.check().map_err(|err| Error::Transaction(index, err))) .fold(|| Ok(()), |result, (index, tx)| result.and_then(|_| tx.check().map_err(|err| Error::Transaction(index, err))))
.collect::<Result<Vec<_>, _>>()?; .reduce(|| Ok(()), |acc, check| acc.and(check))?;
Ok(()) Ok(())
} }
pub fn parallel_check(&self, _pool: &Pool) -> Result<(), Error> {
unimplemented!();
}
} }

View File

@ -1,4 +1,4 @@
use db::IndexedBlockHeader; use canon::CanonHeader;
use error::Error; use error::Error;
pub struct HeaderAcceptor<'a> { pub struct HeaderAcceptor<'a> {
@ -16,16 +16,3 @@ impl<'a> HeaderAcceptor<'a> {
Ok(()) Ok(())
} }
} }
#[derive(Clone, Copy)]
pub struct CanonHeader<'a> {
header: &'a IndexedBlockHeader,
}
impl<'a> CanonHeader<'a> {
pub fn new(header: &'a IndexedBlockHeader) -> Self {
CanonHeader {
header: header,
}
}
}

View File

@ -1,4 +1,4 @@
use db::IndexedTransaction; use canon::CanonTransaction;
use error::TransactionError; use error::TransactionError;
pub struct TransactionAcceptor<'a> { pub struct TransactionAcceptor<'a> {
@ -17,15 +17,3 @@ impl<'a> TransactionAcceptor<'a> {
} }
} }
#[derive(Clone, Copy)]
pub struct CanonTransaction<'a> {
transaction: &'a IndexedTransaction,
}
impl<'a> CanonTransaction<'a> {
pub fn new(transaction: &'a IndexedTransaction) -> Self {
CanonTransaction {
transaction: transaction,
}
}
}

View File

@ -40,6 +40,7 @@ extern crate time;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate scoped_pool; extern crate scoped_pool;
extern crate rayon;
extern crate db; extern crate db;
extern crate chain; extern crate chain;
@ -60,6 +61,7 @@ mod task;
mod utils; mod utils;
pub mod constants; pub mod constants;
mod canon;
mod accept_block; mod accept_block;
mod accept_chain; mod accept_chain;
mod accept_header; mod accept_header;
@ -71,10 +73,13 @@ mod verify_transaction;
pub use primitives::{uint, hash, compact}; pub use primitives::{uint, hash, compact};
pub use accept_block::{BlockAcceptor, CanonBlock}; pub use canon::{CanonBlock, CanonHeader, CanonTransaction};
pub use accept_block::BlockAcceptor;
pub use accept_chain::ChainAcceptor; pub use accept_chain::ChainAcceptor;
pub use accept_header::{HeaderAcceptor, CanonHeader}; pub use accept_header::HeaderAcceptor;
pub use accept_transaction::{TransactionAcceptor, CanonTransaction}; pub use accept_transaction::TransactionAcceptor;
pub use verify_block::BlockVerifier; pub use verify_block::BlockVerifier;
pub use verify_chain::ChainVerifier as XXXChainVerifier; pub use verify_chain::ChainVerifier as XXXChainVerifier;
pub use verify_header::HeaderVerifier; pub use verify_header::HeaderVerifier;

View File

@ -1,4 +1,4 @@
use scoped_pool::Pool; use rayon::prelude::{IntoParallelRefIterator, IndexedParallelIterator, ParallelIterator};
use db::IndexedBlock; use db::IndexedBlock;
use network::Magic; use network::Magic;
use error::Error; use error::Error;
@ -24,14 +24,10 @@ impl<'a> ChainVerifier<'a> {
pub fn check(&self) -> Result<(), Error> { pub fn check(&self) -> Result<(), Error> {
try!(self.block.check()); try!(self.block.check());
try!(self.header.check()); try!(self.header.check());
self.transactions.iter() self.transactions.par_iter()
.enumerate() .enumerate()
.map(|(index, tx)| tx.check().map_err(|err| Error::Transaction(index, err))) .fold(|| Ok(()), |result, (index, tx)| result.and_then(|_| tx.check().map_err(|err| Error::Transaction(index, err))))
.collect::<Result<Vec<_>, _>>()?; .reduce(|| Ok(()), |acc, check| acc.and(check))?;
Ok(()) Ok(())
} }
pub fn parallel_check(&self, _pool: &Pool) -> Result<(), Error> {
unimplemented!();
}
} }