Merge pull request #148 from ethcore/sync_separate_verifier

Sync client can now use different verifiers (async && dummy, etc.)
This commit is contained in:
Svyatoslav Nikolsky 2016-11-20 09:48:01 +03:00 committed by GitHub
commit a526f227cf
7 changed files with 766 additions and 528 deletions

View File

@ -14,6 +14,7 @@ pub struct Information {
// TODO: currently it supports first chain only (so whatever headers sequence came first, it is best)
/// Builds the block-header-chain of in-memory blocks, for which only headers are currently known
#[derive(Debug)]
pub struct BestHeadersChain {
/// Best hash in storage
storage_best_hash: H256,

View File

@ -15,13 +15,14 @@ pub enum HashPosition {
}
/// Ordered queue with O(1) contains() && random access operations cost.
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct HashQueue {
queue: VecDeque<H256>,
set: HashSet<H256>,
}
/// Chain of linked queues. First queue has index zero.
#[derive(Debug)]
pub struct HashQueueChain {
chain: Vec<HashQueue>,
}

View File

@ -31,6 +31,7 @@ mod synchronization_executor;
mod synchronization_manager;
mod synchronization_peers;
mod synchronization_server;
mod synchronization_verifier;
use std::sync::Arc;
use parking_lot::RwLock;
@ -60,12 +61,15 @@ pub fn create_sync_connection_factory(handle: &Handle, consensus_params: Consens
use local_node::LocalNode as SyncNode;
use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory;
use synchronization_server::SynchronizationServer;
use synchronization_client::{SynchronizationClient, Config as SynchronizationConfig};
use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig};
use synchronization_verifier::AsyncVerifier;
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
let sync_executor = SyncExecutor::new(sync_chain.clone());
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
let sync_client = SynchronizationClient::new(SynchronizationConfig::with_consensus_params(consensus_params), handle, sync_executor.clone(), sync_chain);
let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone());
let verifier = AsyncVerifier::new(consensus_params, sync_chain, sync_client_core.clone());
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
SyncConnectionFactory::with_local_node(sync_node)
}

View File

@ -9,9 +9,10 @@ use message::types;
use synchronization_client::{Client, SynchronizationClient};
use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor};
use synchronization_server::{Server, SynchronizationServer};
use synchronization_verifier::AsyncVerifier;
use primitives::hash::H256;
pub type LocalNodeRef = Arc<LocalNode<LocalSynchronizationTaskExecutor, SynchronizationServer, SynchronizationClient<LocalSynchronizationTaskExecutor>>>;
pub type LocalNodeRef = Arc<LocalNode<LocalSynchronizationTaskExecutor, SynchronizationServer, SynchronizationClient<LocalSynchronizationTaskExecutor, AsyncVerifier>>>;
/// Local synchronization node
pub struct LocalNode<T: SynchronizationTaskExecutor + PeersConnections,
@ -229,16 +230,17 @@ mod tests {
use chain::RepresentH256;
use synchronization_executor::Task;
use synchronization_executor::tests::DummyTaskExecutor;
use synchronization_client::{Config, SynchronizationClient};
use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore};
use synchronization_chain::Chain;
use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
use message::types;
use message::common::{Magic, ConsensusParams, InventoryVector, InventoryType};
use message::common::{InventoryVector, InventoryType};
use db;
use super::LocalNode;
use test_data;
use synchronization_server::ServerTask;
use synchronization_server::tests::DummyServer;
use synchronization_verifier::tests::DummyVerifier;
use tokio_core::reactor::{Core, Handle};
struct DummyOutboundSyncConnection;
@ -272,14 +274,17 @@ mod tests {
fn ignored(&self, _id: u32) {}
}
fn create_local_node() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<DummyServer>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor>>) {
fn create_local_node() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<DummyServer>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor, DummyVerifier>>) {
let event_loop = event_loop();
let handle = event_loop.handle();
let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
let executor = DummyTaskExecutor::new();
let server = Arc::new(DummyServer::new());
let config = Config { consensus_params: ConsensusParams::with_magic(Magic::Mainnet), threads_num: 1, skip_verification: true };
let client = SynchronizationClient::new(config, &handle, executor.clone(), chain);
let config = Config { threads_num: 1 };
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone());
let mut verifier = DummyVerifier::default();
verifier.set_sink(client_core.clone());
let client = SynchronizationClient::new(client_core, verifier);
let local_node = LocalNode::new(server.clone(), client, executor.clone());
(event_loop, handle, executor, server, local_node)
}

View File

@ -280,6 +280,13 @@ impl Chain {
self.hash_chain.push_back_at(VERIFYING_QUEUE, hash);
}
/// Add blocks to verifying queue
pub fn verify_blocks(&mut self, blocks: Vec<(H256, BlockHeader)>) {
for (hash, header) in blocks {
self.verify_block(hash, header);
}
}
/// Moves n blocks from requested queue to verifying queue
#[cfg(test)]
pub fn verify_blocks_hashes(&mut self, n: u32) -> Vec<H256> {

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,197 @@
use std::thread;
use std::sync::Arc;
use std::sync::mpsc::{channel, Sender, Receiver};
use parking_lot::Mutex;
use chain::{Block, Transaction, RepresentH256};
use message::common::ConsensusParams;
use primitives::hash::H256;
use verification::{ChainVerifier, Verify as VerificationVerify};
use synchronization_chain::ChainRef;
/// Verification events sink
pub trait VerificationSink : Send + 'static {
/// When block verification has completed successfully.
fn on_block_verification_success(&mut self, block: Block);
/// When block verification has failed.
fn on_block_verification_error(&mut self, err: &str, hash: &H256);
/// When transaction verification has completed successfully.
fn on_transaction_verification_success(&mut self, transaction: Transaction);
/// When transaction verification has failed.
fn on_transaction_verification_error(&mut self, err: &str, hash: &H256);
}
/// Verification thread tasks
enum VerificationTask {
/// Verify single block
VerifyBlock(Block),
/// Verify single transaction
VerifyTransaction(Transaction),
/// Stop verification thread
Stop,
}
/// Synchronization verifier
pub trait Verifier : Send + 'static {
/// Verify block
fn verify_block(&self, block: Block);
/// Verify transaction
fn verify_transaction(&self, transaction: Transaction);
}
/// Asynchronous synchronization verifier
pub struct AsyncVerifier {
/// Verification work transmission channel.
verification_work_sender: Sender<VerificationTask>,
/// Verification thread.
verification_worker_thread: Option<thread::JoinHandle<()>>,
}
impl AsyncVerifier {
/// Create new async verifier
pub fn new<T: VerificationSink>(consensus_params: ConsensusParams, chain: ChainRef, sink: Arc<Mutex<T>>) -> Self {
let (verification_work_sender, verification_work_receiver) = channel();
let storage = chain.read().storage();
let verifier = ChainVerifier::new(storage);
AsyncVerifier {
verification_work_sender: verification_work_sender,
verification_worker_thread: Some(thread::Builder::new()
.name("Sync verification thread".to_string())
.spawn(move || {
AsyncVerifier::verification_worker_proc(sink, chain, consensus_params, verifier, verification_work_receiver)
})
.expect("Error creating verification thread"))
}
}
/// Thread procedure for handling verification tasks
fn verification_worker_proc<T: VerificationSink>(sink: Arc<Mutex<T>>, chain: ChainRef, consensus_params: ConsensusParams, mut verifier: ChainVerifier, work_receiver: Receiver<VerificationTask>) {
let bip16_time_border = consensus_params.bip16_time;
let mut is_bip16_active = false;
let mut parameters_change_steps = Some(0);
while let Ok(task) = work_receiver.recv() {
match task {
VerificationTask::VerifyBlock(block) => {
// for changes that are not relying on block#
let is_bip16_active_on_block = block.block_header.time >= bip16_time_border;
let force_parameters_change = is_bip16_active_on_block != is_bip16_active;
if force_parameters_change {
parameters_change_steps = Some(0);
}
// change verifier parameters, if needed
if let Some(steps_left) = parameters_change_steps {
if steps_left == 0 {
let best_storage_block = chain.read().best_storage_block();
is_bip16_active = is_bip16_active_on_block;
verifier = verifier.verify_p2sh(is_bip16_active);
let is_bip65_active = best_storage_block.number >= consensus_params.bip65_height;
verifier = verifier.verify_clocktimeverify(is_bip65_active);
if is_bip65_active {
parameters_change_steps = None;
} else {
parameters_change_steps = Some(consensus_params.bip65_height - best_storage_block.number);
}
} else {
parameters_change_steps = Some(steps_left - 1);
}
}
// verify block
match verifier.verify(&block) {
Ok(_chain) => {
sink.lock().on_block_verification_success(block)
},
Err(e) => {
sink.lock().on_block_verification_error(&format!("{:?}", e), &block.hash())
}
}
},
VerificationTask::VerifyTransaction(transaction) => {
// TODO: add verification here
sink.lock().on_transaction_verification_error("unimplemented", &transaction.hash())
}
VerificationTask::Stop => break,
}
}
}
}
impl Drop for AsyncVerifier {
fn drop(&mut self) {
if let Some(join_handle) = self.verification_worker_thread.take() {
// ignore send error here <= destructing anyway
let _ = self.verification_work_sender.send(VerificationTask::Stop);
join_handle.join().expect("Clean shutdown.");
}
}
}
impl Verifier for AsyncVerifier {
/// Verify block
fn verify_block(&self, block: Block) {
self.verification_work_sender
.send(VerificationTask::VerifyBlock(block))
.expect("Verification thread have the same lifetime as `AsyncVerifier`");
}
/// Verify transaction
fn verify_transaction(&self, transaction: Transaction) {
self.verification_work_sender
.send(VerificationTask::VerifyTransaction(transaction))
.expect("Verification thread have the same lifetime as `AsyncVerifier`");
}
}
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
use std::collections::HashMap;
use parking_lot::Mutex;
use chain::{Block, Transaction, RepresentH256};
use synchronization_client::SynchronizationClientCore;
use synchronization_executor::tests::DummyTaskExecutor;
use primitives::hash::H256;
use super::{Verifier, VerificationSink};
#[derive(Default)]
pub struct DummyVerifier {
sink: Option<Arc<Mutex<SynchronizationClientCore<DummyTaskExecutor>>>>,
errors: HashMap<H256, String>
}
impl DummyVerifier {
pub fn set_sink(&mut self, sink: Arc<Mutex<SynchronizationClientCore<DummyTaskExecutor>>>) {
self.sink = Some(sink);
}
pub fn error_when_verifying(&mut self, hash: H256, err: &str) {
self.errors.insert(hash, err.into());
}
}
impl Verifier for DummyVerifier {
fn verify_block(&self, block: Block) {
match self.sink {
Some(ref sink) => match self.errors.get(&block.hash()) {
Some(err) => sink.lock().on_block_verification_error(&err, &block.hash()),
None => sink.lock().on_block_verification_success(block),
},
None => panic!("call set_sink"),
}
}
fn verify_transaction(&self, transaction: Transaction) {
match self.sink {
Some(ref sink) => match self.errors.get(&transaction.hash()) {
Some(err) => sink.lock().on_transaction_verification_error(&err, &transaction.hash()),
None => sink.lock().on_transaction_verification_success(transaction),
},
None => panic!("call set_sink"),
}
}
}
}