separate sync verifier
This commit is contained in:
parent
bd2d5c6bbc
commit
bd67160689
|
@ -31,6 +31,7 @@ mod synchronization_executor;
|
|||
mod synchronization_manager;
|
||||
mod synchronization_peers;
|
||||
mod synchronization_server;
|
||||
mod synchronization_verifier;
|
||||
|
||||
use std::sync::Arc;
|
||||
use parking_lot::RwLock;
|
||||
|
@ -61,11 +62,18 @@ pub fn create_sync_connection_factory(handle: &Handle, consensus_params: Consens
|
|||
use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory;
|
||||
use synchronization_server::SynchronizationServer;
|
||||
use synchronization_client::{SynchronizationClient, Config as SynchronizationConfig};
|
||||
use synchronization_verifier::AsyncVerifier;
|
||||
|
||||
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
|
||||
let sync_executor = SyncExecutor::new(sync_chain.clone());
|
||||
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
|
||||
let sync_client = SynchronizationClient::new(SynchronizationConfig::with_consensus_params(consensus_params), handle, sync_executor.clone(), sync_chain);
|
||||
let sync_client = SynchronizationClient::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone());
|
||||
{
|
||||
let verifier_sink = sync_client.lock().core();
|
||||
let verifier = AsyncVerifier::new(consensus_params, sync_chain, verifier_sink);
|
||||
sync_client.lock().set_verifier(verifier);
|
||||
}
|
||||
|
||||
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
|
||||
SyncConnectionFactory::with_local_node(sync_node)
|
||||
}
|
||||
|
|
|
@ -9,9 +9,10 @@ use message::types;
|
|||
use synchronization_client::{Client, SynchronizationClient};
|
||||
use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor};
|
||||
use synchronization_server::{Server, SynchronizationServer};
|
||||
use synchronization_verifier::AsyncVerifier;
|
||||
use primitives::hash::H256;
|
||||
|
||||
pub type LocalNodeRef = Arc<LocalNode<LocalSynchronizationTaskExecutor, SynchronizationServer, SynchronizationClient<LocalSynchronizationTaskExecutor>>>;
|
||||
pub type LocalNodeRef = Arc<LocalNode<LocalSynchronizationTaskExecutor, SynchronizationServer, SynchronizationClient<LocalSynchronizationTaskExecutor, AsyncVerifier>>>;
|
||||
|
||||
/// Local synchronization node
|
||||
pub struct LocalNode<T: SynchronizationTaskExecutor + PeersConnections,
|
||||
|
@ -233,12 +234,13 @@ mod tests {
|
|||
use synchronization_chain::Chain;
|
||||
use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef};
|
||||
use message::types;
|
||||
use message::common::{Magic, ConsensusParams, InventoryVector, InventoryType};
|
||||
use message::common::{InventoryVector, InventoryType};
|
||||
use db;
|
||||
use super::LocalNode;
|
||||
use test_data;
|
||||
use synchronization_server::ServerTask;
|
||||
use synchronization_server::tests::DummyServer;
|
||||
use synchronization_verifier::tests::DummyVerifier;
|
||||
use tokio_core::reactor::{Core, Handle};
|
||||
|
||||
struct DummyOutboundSyncConnection;
|
||||
|
@ -272,14 +274,19 @@ mod tests {
|
|||
fn ignored(&self, _id: u32) {}
|
||||
}
|
||||
|
||||
fn create_local_node() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<DummyServer>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor>>) {
|
||||
fn create_local_node() -> (Core, Handle, Arc<Mutex<DummyTaskExecutor>>, Arc<DummyServer>, LocalNode<DummyTaskExecutor, DummyServer, SynchronizationClient<DummyTaskExecutor, DummyVerifier>>) {
|
||||
let event_loop = event_loop();
|
||||
let handle = event_loop.handle();
|
||||
let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block()))));
|
||||
let executor = DummyTaskExecutor::new();
|
||||
let server = Arc::new(DummyServer::new());
|
||||
let config = Config { consensus_params: ConsensusParams::with_magic(Magic::Mainnet), threads_num: 1, skip_verification: true };
|
||||
let client = SynchronizationClient::new(config, &handle, executor.clone(), chain);
|
||||
let config = Config { threads_num: 1 };
|
||||
let client = SynchronizationClient::new(config, &handle, executor.clone(), chain.clone());
|
||||
{
|
||||
let verifier_sink = client.lock().core();
|
||||
let verifier = DummyVerifier::new(verifier_sink);
|
||||
client.lock().set_verifier(verifier);
|
||||
}
|
||||
let local_node = LocalNode::new(server.clone(), client, executor.clone());
|
||||
(event_loop, handle, executor, server, local_node)
|
||||
}
|
||||
|
|
|
@ -280,6 +280,13 @@ impl Chain {
|
|||
self.hash_chain.push_back_at(VERIFYING_QUEUE, hash);
|
||||
}
|
||||
|
||||
/// Add blocks to verifying queue
|
||||
pub fn verify_blocks(&mut self, blocks: Vec<(H256, BlockHeader)>) {
|
||||
for (hash, header) in blocks {
|
||||
self.verify_block(hash, header);
|
||||
}
|
||||
}
|
||||
|
||||
/// Moves n blocks from requested queue to verifying queue
|
||||
#[cfg(test)]
|
||||
pub fn verify_blocks_hashes(&mut self, n: u32) -> Vec<H256> {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,179 @@
|
|||
use std::thread;
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc::{channel, Sender, Receiver};
|
||||
use parking_lot::Mutex;
|
||||
use chain::{Block, Transaction, RepresentH256};
|
||||
use message::common::ConsensusParams;
|
||||
use primitives::hash::H256;
|
||||
use verification::{ChainVerifier, Verify as VerificationVerify};
|
||||
use synchronization_chain::ChainRef;
|
||||
|
||||
/// Verification events sink
|
||||
pub trait VerificationSink : Send + 'static {
|
||||
/// When block verification has completed successfully.
|
||||
fn on_block_verification_success(&mut self, block: Block);
|
||||
/// When block verification has failed.
|
||||
fn on_block_verification_error(&mut self, err: &str, hash: &H256);
|
||||
/// When transaction verification has completed successfully.
|
||||
fn on_transaction_verification_success(&mut self, transaction: Transaction);
|
||||
/// When transaction verification has failed.
|
||||
fn on_transaction_verification_error(&mut self, err: &str, hash: &H256);
|
||||
}
|
||||
|
||||
/// Verification thread tasks
|
||||
enum VerificationTask {
|
||||
/// Verify single block
|
||||
VerifyBlock(Block),
|
||||
/// Verify single transaction
|
||||
VerifyTransaction(Transaction),
|
||||
/// Stop verification thread
|
||||
Stop,
|
||||
}
|
||||
|
||||
/// Synchronization verifier
|
||||
pub trait Verifier : Send + 'static {
|
||||
/// Verify block
|
||||
fn verify_block(&self, block: Block);
|
||||
/// Verify transaction
|
||||
fn verify_transaction(&self, transaction: Transaction);
|
||||
}
|
||||
|
||||
/// Asynchronous synchronization verifier
|
||||
pub struct AsyncVerifier {
|
||||
/// Verification work transmission channel.
|
||||
verification_work_sender: Sender<VerificationTask>,
|
||||
/// Verification thread.
|
||||
verification_worker_thread: Option<thread::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl AsyncVerifier {
|
||||
/// Create new async verifier
|
||||
pub fn new<T: VerificationSink>(consensus_params: ConsensusParams, chain: ChainRef, sink: Arc<Mutex<T>>) -> Self {
|
||||
let (verification_work_sender, verification_work_receiver) = channel();
|
||||
let storage = chain.read().storage();
|
||||
let verifier = ChainVerifier::new(storage);
|
||||
AsyncVerifier {
|
||||
verification_work_sender: verification_work_sender,
|
||||
verification_worker_thread: Some(thread::Builder::new()
|
||||
.name("Sync verification thread".to_string())
|
||||
.spawn(move || {
|
||||
AsyncVerifier::verification_worker_proc(sink, chain, consensus_params, verifier, verification_work_receiver)
|
||||
})
|
||||
.expect("Error creating verification thread"))
|
||||
}
|
||||
}
|
||||
|
||||
/// Thread procedure for handling verification tasks
|
||||
fn verification_worker_proc<T: VerificationSink>(sink: Arc<Mutex<T>>, chain: ChainRef, consensus_params: ConsensusParams, mut verifier: ChainVerifier, work_receiver: Receiver<VerificationTask>) {
|
||||
let bip16_time_border = consensus_params.bip16_time;
|
||||
let mut is_bip16_active = false;
|
||||
let mut parameters_change_steps = Some(0);
|
||||
|
||||
while let Ok(task) = work_receiver.recv() {
|
||||
match task {
|
||||
VerificationTask::VerifyBlock(block) => {
|
||||
// for changes that are not relying on block#
|
||||
let is_bip16_active_on_block = block.block_header.time >= bip16_time_border;
|
||||
let force_parameters_change = is_bip16_active_on_block != is_bip16_active;
|
||||
if force_parameters_change {
|
||||
parameters_change_steps = Some(0);
|
||||
}
|
||||
|
||||
// change verifier parameters, if needed
|
||||
if let Some(steps_left) = parameters_change_steps {
|
||||
if steps_left == 0 {
|
||||
let best_storage_block = chain.read().best_storage_block();
|
||||
|
||||
is_bip16_active = is_bip16_active_on_block;
|
||||
verifier = verifier.verify_p2sh(is_bip16_active);
|
||||
|
||||
let is_bip65_active = best_storage_block.number >= consensus_params.bip65_height;
|
||||
verifier = verifier.verify_clocktimeverify(is_bip65_active);
|
||||
|
||||
if is_bip65_active {
|
||||
parameters_change_steps = None;
|
||||
} else {
|
||||
parameters_change_steps = Some(consensus_params.bip65_height - best_storage_block.number);
|
||||
}
|
||||
} else {
|
||||
parameters_change_steps = Some(steps_left - 1);
|
||||
}
|
||||
}
|
||||
|
||||
// verify block
|
||||
match verifier.verify(&block) {
|
||||
Ok(_chain) => {
|
||||
sink.lock().on_block_verification_success(block)
|
||||
},
|
||||
Err(e) => {
|
||||
sink.lock().on_block_verification_error(&format!("{:?}", e), &block.hash())
|
||||
}
|
||||
}
|
||||
},
|
||||
VerificationTask::VerifyTransaction(transaction) => {
|
||||
// TODO: add verification here
|
||||
sink.lock().on_transaction_verification_error("unimplemented", &transaction.hash())
|
||||
}
|
||||
VerificationTask::Stop => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for AsyncVerifier {
|
||||
fn drop(&mut self) {
|
||||
if let Some(join_handle) = self.verification_worker_thread.take() {
|
||||
// ignore send error here <= destructing anyway
|
||||
let _ = self.verification_work_sender.send(VerificationTask::Stop);
|
||||
join_handle.join().expect("Clean shutdown.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Verifier for AsyncVerifier {
|
||||
/// Verify block
|
||||
fn verify_block(&self, block: Block) {
|
||||
self.verification_work_sender
|
||||
.send(VerificationTask::VerifyBlock(block))
|
||||
.expect("Verification thread have the same lifetime as `AsyncVerifier`");
|
||||
}
|
||||
|
||||
/// Verify transaction
|
||||
fn verify_transaction(&self, transaction: Transaction) {
|
||||
self.verification_work_sender
|
||||
.send(VerificationTask::VerifyTransaction(transaction))
|
||||
.expect("Verification thread have the same lifetime as `AsyncVerifier`");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use std::sync::Arc;
|
||||
use parking_lot::Mutex;
|
||||
use chain::{Block, Transaction};
|
||||
use synchronization_client::SynchronizationClientCore;
|
||||
use synchronization_executor::tests::DummyTaskExecutor;
|
||||
use super::{Verifier, VerificationSink};
|
||||
|
||||
pub struct DummyVerifier {
|
||||
sink: Arc<Mutex<SynchronizationClientCore<DummyTaskExecutor>>>,
|
||||
}
|
||||
|
||||
impl DummyVerifier {
|
||||
pub fn new(sink: Arc<Mutex<SynchronizationClientCore<DummyTaskExecutor>>>) -> Self {
|
||||
DummyVerifier {
|
||||
sink: sink,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Verifier for DummyVerifier {
|
||||
fn verify_block(&self, block: Block) {
|
||||
self.sink.lock().on_block_verification_success(block);
|
||||
}
|
||||
|
||||
fn verify_transaction(&self, transaction: Transaction) {
|
||||
self.sink.lock().on_transaction_verification_success(transaction);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue