Merge pull request #261 from ethcore/one_chain_verifier
share one chain verifier
This commit is contained in:
commit
9288aed976
|
@ -47,6 +47,7 @@ use std::sync::Arc;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use tokio_core::reactor::Handle;
|
use tokio_core::reactor::Handle;
|
||||||
use network::Magic;
|
use network::Magic;
|
||||||
|
use verification::ChainVerifier;
|
||||||
|
|
||||||
/// Sync errors.
|
/// Sync errors.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -81,11 +82,12 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S
|
||||||
threads_num: 4,
|
threads_num: 4,
|
||||||
};
|
};
|
||||||
|
|
||||||
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
|
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db.clone())));
|
||||||
|
let chain_verifier = Arc::new(ChainVerifier::new(db, network));
|
||||||
let sync_executor = SyncExecutor::new(sync_chain.clone());
|
let sync_executor = SyncExecutor::new(sync_chain.clone());
|
||||||
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
|
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
|
||||||
let sync_client_core = SynchronizationClientCore::new(sync_client_config, handle, sync_executor.clone(), sync_chain.clone(), network);
|
let sync_client_core = SynchronizationClientCore::new(sync_client_config, handle, sync_executor.clone(), sync_chain.clone(), chain_verifier.clone());
|
||||||
let verifier = AsyncVerifier::new(network, sync_chain, sync_client_core.clone());
|
let verifier = AsyncVerifier::new(chain_verifier, sync_chain, sync_client_core.clone());
|
||||||
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
|
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
|
||||||
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
|
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
|
||||||
SyncConnectionFactory::with_local_node(sync_node)
|
SyncConnectionFactory::with_local_node(sync_node)
|
||||||
|
|
|
@ -274,6 +274,7 @@ mod tests {
|
||||||
use synchronization_verifier::tests::DummyVerifier;
|
use synchronization_verifier::tests::DummyVerifier;
|
||||||
use tokio_core::reactor::{Core, Handle};
|
use tokio_core::reactor::{Core, Handle};
|
||||||
use primitives::bytes::Bytes;
|
use primitives::bytes::Bytes;
|
||||||
|
use verification::ChainVerifier;
|
||||||
|
|
||||||
struct DummyOutboundSyncConnection;
|
struct DummyOutboundSyncConnection;
|
||||||
|
|
||||||
|
@ -315,7 +316,8 @@ mod tests {
|
||||||
let executor = DummyTaskExecutor::new();
|
let executor = DummyTaskExecutor::new();
|
||||||
let server = Arc::new(DummyServer::new());
|
let server = Arc::new(DummyServer::new());
|
||||||
let config = Config { threads_num: 1, close_connection_on_bad_block: true };
|
let config = Config { threads_num: 1, close_connection_on_bad_block: true };
|
||||||
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), Magic::Mainnet);
|
let chain_verifier = Arc::new(ChainVerifier::new(chain.read().storage(), Magic::Mainnet));
|
||||||
|
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), chain_verifier);
|
||||||
let mut verifier = DummyVerifier::default();
|
let mut verifier = DummyVerifier::default();
|
||||||
verifier.set_sink(client_core.clone());
|
verifier.set_sink(client_core.clone());
|
||||||
let client = SynchronizationClient::new(client_core, verifier);
|
let client = SynchronizationClient::new(client_core, verifier);
|
||||||
|
|
|
@ -30,7 +30,6 @@ use compact_block_builder::build_compact_block;
|
||||||
use hash_queue::HashPosition;
|
use hash_queue::HashPosition;
|
||||||
use miner::transaction_fee_rate;
|
use miner::transaction_fee_rate;
|
||||||
use verification::ChainVerifier;
|
use verification::ChainVerifier;
|
||||||
use network::Magic;
|
|
||||||
use time;
|
use time;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
@ -298,8 +297,8 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
|
||||||
orphaned_blocks_pool: OrphanBlocksPool,
|
orphaned_blocks_pool: OrphanBlocksPool,
|
||||||
/// Orphaned transactions pool.
|
/// Orphaned transactions pool.
|
||||||
orphaned_transactions_pool: OrphanTransactionsPool,
|
orphaned_transactions_pool: OrphanTransactionsPool,
|
||||||
/// Network config
|
/// Chain verifier
|
||||||
network: Magic,
|
chain_verifier: Arc<ChainVerifier>,
|
||||||
/// Verify block headers?
|
/// Verify block headers?
|
||||||
verify_headers: bool,
|
verify_headers: bool,
|
||||||
/// Verifying blocks by peer
|
/// Verifying blocks by peer
|
||||||
|
@ -655,7 +654,6 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
|
|
||||||
// validate blocks headers before scheduling
|
// validate blocks headers before scheduling
|
||||||
let chain = self.chain.read();
|
let chain = self.chain.read();
|
||||||
let verifier = ChainVerifier::new(chain.storage(), self.network);
|
|
||||||
let mut block_header_provider = MessageBlockHeadersProvider::new(&*chain);
|
let mut block_header_provider = MessageBlockHeadersProvider::new(&*chain);
|
||||||
let mut blocks_hashes: Vec<H256> = Vec::with_capacity(blocks_headers.len());
|
let mut blocks_hashes: Vec<H256> = Vec::with_capacity(blocks_headers.len());
|
||||||
let mut prev_block_hash = header0.previous_header_hash.clone();
|
let mut prev_block_hash = header0.previous_header_hash.clone();
|
||||||
|
@ -669,7 +667,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
|
|
||||||
// verify header
|
// verify header
|
||||||
if self.verify_headers {
|
if self.verify_headers {
|
||||||
if let Err(error) = verifier.verify_block_header(&block_header_provider, &block_header_hash, &block_header) {
|
if let Err(error) = self.chain_verifier.verify_block_header(&block_header_provider, &block_header_hash, &block_header) {
|
||||||
warn!(target: "sync", "Error verifying header {:?} from peer#{} `headers` message: {:?}", block_header_hash.to_reversed_str(), peer_index, error);
|
warn!(target: "sync", "Error verifying header {:?} from peer#{} `headers` message: {:?}", block_header_hash.to_reversed_str(), peer_index, error);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1088,7 +1086,7 @@ impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor
|
||||||
|
|
||||||
impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
/// Create new synchronization client core
|
/// Create new synchronization client core
|
||||||
pub fn new(config: Config, handle: &Handle, executor: Arc<Mutex<T>>, chain: ChainRef, network: Magic) -> Arc<Mutex<Self>> {
|
pub fn new(config: Config, handle: &Handle, executor: Arc<Mutex<T>>, chain: ChainRef, chain_verifier: Arc<ChainVerifier>) -> Arc<Mutex<Self>> {
|
||||||
let sync = Arc::new(Mutex::new(
|
let sync = Arc::new(Mutex::new(
|
||||||
SynchronizationClientCore {
|
SynchronizationClientCore {
|
||||||
state: State::Saturated,
|
state: State::Saturated,
|
||||||
|
@ -1099,7 +1097,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
|
||||||
chain: chain.clone(),
|
chain: chain.clone(),
|
||||||
orphaned_blocks_pool: OrphanBlocksPool::new(),
|
orphaned_blocks_pool: OrphanBlocksPool::new(),
|
||||||
orphaned_transactions_pool: OrphanTransactionsPool::new(),
|
orphaned_transactions_pool: OrphanTransactionsPool::new(),
|
||||||
network: network,
|
chain_verifier: chain_verifier,
|
||||||
verify_headers: true,
|
verify_headers: true,
|
||||||
verifying_blocks_by_peer: HashMap::new(),
|
verifying_blocks_by_peer: HashMap::new(),
|
||||||
verifying_blocks_futures: HashMap::new(),
|
verifying_blocks_futures: HashMap::new(),
|
||||||
|
@ -1673,6 +1671,7 @@ pub mod tests {
|
||||||
use parking_lot::{Mutex, RwLock};
|
use parking_lot::{Mutex, RwLock};
|
||||||
use tokio_core::reactor::{Core, Handle};
|
use tokio_core::reactor::{Core, Handle};
|
||||||
use chain::{Block, Transaction};
|
use chain::{Block, Transaction};
|
||||||
|
use network::Magic;
|
||||||
use message::common::{InventoryVector, InventoryType};
|
use message::common::{InventoryVector, InventoryType};
|
||||||
use message::types;
|
use message::types;
|
||||||
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore, BlockAnnouncementType, MessageBlockHeadersProvider};
|
use super::{Client, Config, SynchronizationClient, SynchronizationClientCore, BlockAnnouncementType, MessageBlockHeadersProvider};
|
||||||
|
@ -1683,7 +1682,7 @@ pub mod tests {
|
||||||
use synchronization_verifier::tests::DummyVerifier;
|
use synchronization_verifier::tests::DummyVerifier;
|
||||||
use synchronization_server::ServerTaskIndex;
|
use synchronization_server::ServerTaskIndex;
|
||||||
use primitives::hash::H256;
|
use primitives::hash::H256;
|
||||||
use network::Magic;
|
use verification::ChainVerifier;
|
||||||
use p2p::event_loop;
|
use p2p::event_loop;
|
||||||
use test_data;
|
use test_data;
|
||||||
use db::{self, BlockHeaderProvider};
|
use db::{self, BlockHeaderProvider};
|
||||||
|
@ -1705,7 +1704,8 @@ pub mod tests {
|
||||||
let executor = DummyTaskExecutor::new();
|
let executor = DummyTaskExecutor::new();
|
||||||
let config = Config { threads_num: 1, close_connection_on_bad_block: true };
|
let config = Config { threads_num: 1, close_connection_on_bad_block: true };
|
||||||
|
|
||||||
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), Magic::Testnet);
|
let chain_verifier = Arc::new(ChainVerifier::new(storage.clone(), Magic::Testnet));
|
||||||
|
let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), chain_verifier);
|
||||||
{
|
{
|
||||||
client_core.lock().verify_headers(false);
|
client_core.lock().verify_headers(false);
|
||||||
}
|
}
|
||||||
|
@ -2740,5 +2740,5 @@ pub mod tests {
|
||||||
|
|
||||||
let tasks = executor.lock().take_tasks();
|
let tasks = executor.lock().take_tasks();
|
||||||
assert_eq!(tasks, vec![Task::Close(0)]);
|
assert_eq!(tasks, vec![Task::Close(0)]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,9 +60,8 @@ struct EmptyTransactionOutputProvider {
|
||||||
|
|
||||||
impl AsyncVerifier {
|
impl AsyncVerifier {
|
||||||
/// Create new async verifier
|
/// Create new async verifier
|
||||||
pub fn new<T: VerificationSink>(network: Magic, chain: ChainRef, sink: Arc<Mutex<T>>) -> Self {
|
pub fn new<T: VerificationSink>(verifier: Arc<ChainVerifier>, chain: ChainRef, sink: Arc<Mutex<T>>) -> Self {
|
||||||
let (verification_work_sender, verification_work_receiver) = channel();
|
let (verification_work_sender, verification_work_receiver) = channel();
|
||||||
let verifier = ChainVerifier::new(chain.read().storage(), network);
|
|
||||||
AsyncVerifier {
|
AsyncVerifier {
|
||||||
verification_work_sender: verification_work_sender,
|
verification_work_sender: verification_work_sender,
|
||||||
verification_worker_thread: Some(thread::Builder::new()
|
verification_worker_thread: Some(thread::Builder::new()
|
||||||
|
@ -75,7 +74,7 @@ impl AsyncVerifier {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Thread procedure for handling verification tasks
|
/// Thread procedure for handling verification tasks
|
||||||
fn verification_worker_proc<T: VerificationSink>(sink: Arc<Mutex<T>>, chain: ChainRef, verifier: ChainVerifier, work_receiver: Receiver<VerificationTask>) {
|
fn verification_worker_proc<T: VerificationSink>(sink: Arc<Mutex<T>>, chain: ChainRef, verifier: Arc<ChainVerifier>, work_receiver: Receiver<VerificationTask>) {
|
||||||
while let Ok(task) = work_receiver.recv() {
|
while let Ok(task) = work_receiver.recv() {
|
||||||
match task {
|
match task {
|
||||||
VerificationTask::Stop => break,
|
VerificationTask::Stop => break,
|
||||||
|
|
Loading…
Reference in New Issue