From bd67160689f48cfb0baf25e652df3a1b897f4246 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 18 Nov 2016 11:52:45 +0300 Subject: [PATCH 1/4] separate sync verifier --- sync/src/lib.rs | 10 +- sync/src/local_node.rs | 17 +- sync/src/synchronization_chain.rs | 7 + sync/src/synchronization_client.rs | 976 +++++++++++++-------------- sync/src/synchronization_verifier.rs | 179 +++++ 5 files changed, 691 insertions(+), 498 deletions(-) create mode 100644 sync/src/synchronization_verifier.rs diff --git a/sync/src/lib.rs b/sync/src/lib.rs index c03f214f..8b9d490e 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -31,6 +31,7 @@ mod synchronization_executor; mod synchronization_manager; mod synchronization_peers; mod synchronization_server; +mod synchronization_verifier; use std::sync::Arc; use parking_lot::RwLock; @@ -61,11 +62,18 @@ pub fn create_sync_connection_factory(handle: &Handle, consensus_params: Consens use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory; use synchronization_server::SynchronizationServer; use synchronization_client::{SynchronizationClient, Config as SynchronizationConfig}; + use synchronization_verifier::AsyncVerifier; let sync_chain = Arc::new(RwLock::new(SyncChain::new(db))); let sync_executor = SyncExecutor::new(sync_chain.clone()); let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone())); - let sync_client = SynchronizationClient::new(SynchronizationConfig::with_consensus_params(consensus_params), handle, sync_executor.clone(), sync_chain); + let sync_client = SynchronizationClient::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone()); + { + let verifier_sink = sync_client.lock().core(); + let verifier = AsyncVerifier::new(consensus_params, sync_chain, verifier_sink); + sync_client.lock().set_verifier(verifier); + } + let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor)); SyncConnectionFactory::with_local_node(sync_node) } diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index a51d9836..73c30ac1 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -9,9 +9,10 @@ use message::types; use synchronization_client::{Client, SynchronizationClient}; use synchronization_executor::{Task as SynchronizationTask, TaskExecutor as SynchronizationTaskExecutor, LocalSynchronizationTaskExecutor}; use synchronization_server::{Server, SynchronizationServer}; +use synchronization_verifier::AsyncVerifier; use primitives::hash::H256; -pub type LocalNodeRef = Arc>>; +pub type LocalNodeRef = Arc>>; /// Local synchronization node pub struct LocalNode (Core, Handle, Arc>, Arc, LocalNode>) { + fn create_local_node() -> (Core, Handle, Arc>, Arc, LocalNode>) { let event_loop = event_loop(); let handle = event_loop.handle(); let chain = Arc::new(RwLock::new(Chain::new(Arc::new(db::TestStorage::with_genesis_block())))); let executor = DummyTaskExecutor::new(); let server = Arc::new(DummyServer::new()); - let config = Config { consensus_params: ConsensusParams::with_magic(Magic::Mainnet), threads_num: 1, skip_verification: true }; - let client = SynchronizationClient::new(config, &handle, executor.clone(), chain); + let config = Config { threads_num: 1 }; + let client = SynchronizationClient::new(config, &handle, executor.clone(), chain.clone()); + { + let verifier_sink = client.lock().core(); + let verifier = DummyVerifier::new(verifier_sink); + client.lock().set_verifier(verifier); + } let local_node = LocalNode::new(server.clone(), client, executor.clone()); (event_loop, handle, executor, server, local_node) } diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index 26c1bd78..f3e17d94 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -280,6 +280,13 @@ impl Chain { self.hash_chain.push_back_at(VERIFYING_QUEUE, hash); } + /// Add blocks to verifying queue + pub fn verify_blocks(&mut self, blocks: Vec<(H256, BlockHeader)>) { + for (hash, header) in blocks { + self.verify_block(hash, header); + } + } + /// Moves n blocks from requested queue to verifying queue #[cfg(test)] pub fn verify_blocks_hashes(&mut self, n: u32) -> Vec { diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index be4d8e4b..96563c8a 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -1,9 +1,7 @@ -use std::thread; use std::sync::Arc; use std::cmp::{min, max}; use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::hash_map::Entry; -use std::sync::mpsc::{channel, Sender, Receiver}; use parking_lot::{Mutex, Condvar}; use futures::{BoxFuture, Future, finished}; use futures::stream::Stream; @@ -11,20 +9,19 @@ use tokio_core::reactor::{Handle, Interval}; use futures_cpupool::CpuPool; use db; use chain::{Block, BlockHeader, Transaction, RepresentH256}; -use message::common::ConsensusParams; use primitives::hash::H256; use synchronization_peers::Peers; #[cfg(test)] use synchronization_peers::{Information as PeersInformation}; use synchronization_chain::{ChainRef, BlockState, TransactionState, HeadersIntersection, BlockInsertionResult}; #[cfg(test)] use synchronization_chain::{Information as ChainInformation}; -use verification::{ChainVerifier, Verify}; use synchronization_executor::{Task, TaskExecutor}; use orphan_blocks_pool::OrphanBlocksPool; use orphan_transactions_pool::OrphanTransactionsPool; use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory, manage_unknown_orphaned_blocks, manage_orphaned_transactions, MANAGEMENT_INTERVAL_MS, ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig}; +use synchronization_verifier::{Verifier, VerificationSink}; use hash_queue::HashPosition; use time; use std::time::Duration; @@ -179,16 +176,6 @@ pub struct Information { pub orphaned_transactions: usize, } -/// Verification thread tasks -enum VerificationTask { - /// Verify single block - VerifyBlock(Block), - /// Verify single transaction - VerifyTransaction(Transaction), - /// Stop verification thread - Stop, -} - /// Synchronization client trait pub trait Client : Send + 'static { fn best_block(&self) -> db::BestBlock; @@ -201,10 +188,22 @@ pub trait Client : Send + 'static { fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction); fn on_peer_disconnected(&mut self, peer_index: usize); fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option>); - fn on_block_verification_success(&mut self, block: Block); - fn on_block_verification_error(&mut self, err: &str, hash: &H256); - fn on_transaction_verification_success(&mut self, transaction: Transaction); - fn on_transaction_verification_error(&mut self, err: &str, hash: &H256); +} + +/// Synchronization client trait +pub trait ClientCore : VerificationSink { + fn best_block(&self) -> db::BestBlock; + fn state(&self) -> State; + fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec); + fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec); + fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec); + fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec); + fn on_peer_block(&mut self, peer_index: usize, block: Block) -> Option>; + fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction) -> Option>; + fn on_peer_disconnected(&mut self, peer_index: usize); + fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option>); + fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option>); + fn try_switch_to_saturated_state(&mut self) -> bool; } /// Synchronization peer blocks waiter @@ -218,18 +217,20 @@ pub struct PeersBlocksWaiter { /// Synchronization client configuration options. pub struct Config { - /// Consensus-related parameters. - pub consensus_params: ConsensusParams, /// Number of threads to allocate in synchronization CpuPool. pub threads_num: usize, -/// Do not verify incoming blocks before inserting to db. - pub skip_verification: bool, +} + +/// Synchronization client facade +pub struct SynchronizationClient { + /// Client + client: Arc>>, + /// Verifier + verifier: Option>, } /// Synchronization client. -pub struct SynchronizationClient { - /// Synchronization configuration. - config: Config, +pub struct SynchronizationClientCore { /// Synchronization state. state: State, /// Cpu pool. @@ -246,10 +247,6 @@ pub struct SynchronizationClient { orphaned_blocks_pool: OrphanBlocksPool, /// Orphaned transactions pool. orphaned_transactions_pool: OrphanTransactionsPool, - /// Verification work transmission channel. - verification_work_sender: Option>, - /// Verification thread. - verification_worker_thread: Option>, /// Verifying blocks by peer verifying_blocks_by_peer: HashMap, /// Verifying blocks waiters @@ -257,11 +254,9 @@ pub struct SynchronizationClient { } impl Config { - pub fn with_consensus_params(consensus_params: ConsensusParams) -> Self { + pub fn new() -> Self { Config { - consensus_params: consensus_params, threads_num: 4, - skip_verification: false, } } } @@ -289,20 +284,106 @@ impl State { } } -impl Drop for SynchronizationClient where T: TaskExecutor { - fn drop(&mut self) { - if let Some(join_handle) = self.verification_worker_thread.take() { - // ignore send error here <= destructing anyway - let _ = self.verification_work_sender - .take() - .expect("Some(join_handle) => Some(verification_work_sender)") - .send(VerificationTask::Stop); - join_handle.join().expect("Clean shutdown."); +impl Client for SynchronizationClient where T: TaskExecutor, U: Verifier { + fn best_block(&self) -> db::BestBlock { + self.client.lock().best_block() + } + + fn state(&self) -> State { + self.client.lock().state() + } + + fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec) { + self.client.lock().on_new_blocks_inventory(peer_index, blocks_hashes) + } + + fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec) { + self.client.lock().on_new_transactions_inventory(peer_index, transactions_hashes) + } + + fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec) { + self.client.lock().on_new_blocks_headers(peer_index, blocks_headers); + } + + fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec) { + self.client.lock().on_peer_blocks_notfound(peer_index, blocks_hashes); + } + + fn on_peer_block(&mut self, peer_index: usize, block: Block) { + let blocks_to_verify = { self.client.lock().on_peer_block(peer_index, block) }; + + // verify selected blocks + if let Some(mut blocks_to_verify) = blocks_to_verify { + while let Some((_, block)) = blocks_to_verify.pop_front() { + // schedule verification + match self.verifier { + Some(ref verifier) => verifier.verify_block(block), + None => panic!("call set_verifier after construction"), + } + } } + + // try to switch to saturated state OR execute sync tasks + { + let mut client = self.client.lock(); + if !client.try_switch_to_saturated_state() { + client.execute_synchronization_tasks(None); + } + } + } + + fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction) { + let transactions_to_verify = { self.client.lock().on_peer_transaction(peer_index, transaction) }; + + if let Some(mut transactions_to_verify) = transactions_to_verify { + while let Some((_, tx)) = transactions_to_verify.pop_front() { + // schedule verification + match self.verifier { + Some(ref verifier) => verifier.verify_transaction(tx), + None => panic!("call set_verifier after construction"), + } + } + } + } + + fn on_peer_disconnected(&mut self, peer_index: usize) { + self.client.lock().on_peer_disconnected(peer_index); + } + + fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option>) { + self.client.lock().get_peers_nearly_blocks_waiter(peer_index) } } -impl Client for SynchronizationClient where T: TaskExecutor { +impl SynchronizationClient where T: TaskExecutor, U: Verifier { + /// Create new synchronization client + pub fn new(config: Config, handle: &Handle, executor: Arc>, chain: ChainRef) -> Arc> { + Arc::new(Mutex::new( + SynchronizationClient { + client: SynchronizationClientCore::new(config, handle, executor, chain), + verifier: None, + } + )) + } + + /// Get client core + pub fn core(&self) -> Arc>> { + self.client.clone() + } + + /// Set verifier (TODO: use builder && check in build instead) + pub fn set_verifier(&mut self, verifier: U) { + self.verifier = Some(Box::new(verifier)); + } + + /// Get information on current synchronization state. + #[cfg(test)] + pub fn information(&self) -> Information { + self.client.lock().information() + } +} + +impl ClientCore for SynchronizationClientCore where T: TaskExecutor { /// Get best known block fn best_block(&self) -> db::BestBlock { self.chain.read().best_block() @@ -423,19 +504,18 @@ impl Client for SynchronizationClient where T: TaskExecutor { } /// Process new block. - fn on_peer_block(&mut self, peer_index: usize, block: Block) { + fn on_peer_block(&mut self, peer_index: usize, block: Block) -> Option> { let block_hash = block.hash(); // update peers to select next tasks self.peers.on_block_received(peer_index, &block_hash); - self.process_peer_block(peer_index, block_hash, block); - self.execute_synchronization_tasks(None); + self.process_peer_block(peer_index, block_hash, block) } /// Process new transaction. - fn on_peer_transaction(&mut self, _peer_index: usize, transaction: Transaction) { - self.process_peer_transaction(transaction); + fn on_peer_transaction(&mut self, _peer_index: usize, transaction: Transaction) -> Option> { + self.process_peer_transaction(transaction) } /// Peer disconnected. @@ -449,7 +529,7 @@ impl Client for SynchronizationClient where T: TaskExecutor { } } - /// Get waiter to wait until peer blocks are processed in nearly synchronized state + /// Get waiter for verifying blocks fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option>) { // if we are currently synchronizing => no need to wait if self.state.is_synchronizing() { @@ -469,383 +549,6 @@ impl Client for SynchronizationClient where T: TaskExecutor { } } - /// Process successful block verification - fn on_block_verification_success(&mut self, block: Block) { - let hash = block.hash(); - // insert block to the storage - match { - let mut chain = self.chain.write(); - - // remove block from verification queue - // header is removed in `insert_best_block` call - // or it is removed earlier, when block was removed from the verifying queue - if chain.forget_block_with_state_leave_header(&hash, BlockState::Verifying) != HashPosition::Missing { - // block was in verification queue => insert to storage - chain.insert_best_block(hash.clone(), &block) - } else { - Ok(BlockInsertionResult::default()) - } - } { - Ok(insert_result) => { - // awake threads, waiting for this block insertion - self.awake_waiting_threads(&hash); - - // continue with synchronization - self.execute_synchronization_tasks(None); - - // relay block to our peers - if self.state.is_saturated() { - // TODO: Task::BroadcastBlock - } - - // deal with block transactions - for (_, tx) in insert_result.transactions_to_reverify { - self.process_peer_transaction(tx) - } - }, - Err(db::Error::Consistency(e)) => { - // process as verification error - self.on_block_verification_error(&format!("{:?}", db::Error::Consistency(e)), &hash); - }, - Err(e) => { - // process as irrecoverable failure - panic!("Block {:?} insertion failed with error {:?}", hash, e); - } - } - } - - /// Process failed block verification - fn on_block_verification_error(&mut self, err: &str, hash: &H256) { - warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash.to_reversed_str(), err); - - { - let mut chain = self.chain.write(); - - // forget for this block and all its children - // headers are also removed as they all are invalid - chain.forget_block_with_children(hash); - } - - // awake threads, waiting for this block insertion - self.awake_waiting_threads(hash); - - // start new tasks - self.execute_synchronization_tasks(None); - } - - /// Process successful transaction verification - fn on_transaction_verification_success(&mut self, transaction: Transaction) { - let hash = transaction.hash(); - // insert transaction to the memory pool - let mut chain = self.chain.write(); - - // remove transaction from verification queue - // if it is not in the queue => it was removed due to error or reorganization - if !chain.forget_verifying_transaction(&hash) { - return; - } - - // transaction was in verification queue => insert to memory pool - chain.insert_verified_transaction(transaction); - } - - /// Process failed transaction verification - fn on_transaction_verification_error(&mut self, err: &str, hash: &H256) { - warn!(target: "sync", "Transaction {:?} verification failed with error {:?}", hash.to_reversed_str(), err); - - { - let mut chain = self.chain.write(); - - // forget for this transaction and all its children - chain.forget_verifying_transaction_with_children(hash); - } - } -} - -impl SynchronizationClient where T: TaskExecutor { - /// Create new synchronization window - pub fn new(config: Config, handle: &Handle, executor: Arc>, chain: ChainRef) -> Arc> { - let skip_verification = config.skip_verification; - let sync = Arc::new(Mutex::new( - SynchronizationClient { - state: State::Saturated, - peers: Peers::new(), - pool: CpuPool::new(config.threads_num), - management_worker: None, - executor: executor, - chain: chain.clone(), - orphaned_blocks_pool: OrphanBlocksPool::new(), - orphaned_transactions_pool: OrphanTransactionsPool::new(), - verification_work_sender: None, - verification_worker_thread: None, - verifying_blocks_by_peer: HashMap::new(), - verifying_blocks_waiters: HashMap::new(), - config: config, - } - )); - - if !skip_verification { - let (verification_work_sender, verification_work_receiver) = channel(); - let csync = sync.clone(); - let mut lsync = sync.lock(); - let storage = chain.read().storage(); - let verifier = ChainVerifier::new(storage); - lsync.verification_work_sender = Some(verification_work_sender); - lsync.verification_worker_thread = Some(thread::Builder::new() - .name("Sync verification thread".to_string()) - .spawn(move || { - SynchronizationClient::verification_worker_proc(csync, verifier, verification_work_receiver) - }) - .expect("Error creating verification thread")); - } - - // TODO: start management worker only when synchronization is started - // currently impossible because there is no way to call Interval::new with Remote && Handle is not-Send - { - let peers_config = ManagePeersConfig::default(); - let unknown_config = ManageUnknownBlocksConfig::default(); - let orphan_config = ManageOrphanTransactionsConfig::default(); - let csync = Arc::downgrade(&sync); - let mut sync = sync.lock(); - let management_worker = Interval::new(Duration::from_millis(MANAGEMENT_INTERVAL_MS), handle) - .expect("Failed to create interval") - .and_then(move |_| { - let client = match csync.upgrade() { - Some(client) => client, - None => return Ok(()), - }; - let mut client = client.lock(); - client.print_synchronization_information(); - if client.state.is_synchronizing() || client.state.is_nearly_saturated() { - let blocks_to_request = manage_synchronization_peers_blocks(&peers_config, &mut client.peers); - client.execute_synchronization_tasks(blocks_to_request); - - manage_synchronization_peers_inventory(&peers_config, &mut client.peers); - manage_orphaned_transactions(&orphan_config, &mut client.orphaned_transactions_pool); - if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&unknown_config, &mut client.orphaned_blocks_pool) { - let mut chain = client.chain.write(); - for orphan_to_remove in orphans_to_remove { - chain.forget_block(&orphan_to_remove); - } - } - } - Ok(()) - }) - .for_each(|_| Ok(())) - .then(|_| finished::<(), ()>(())) - .boxed(); - sync.management_worker = Some(sync.pool.spawn(management_worker).boxed()); - } - - sync - } - - /// Get information on current synchronization state. - #[cfg(test)] - pub fn information(&self) -> Information { - Information { - state: self.state, - peers: self.peers.information(), - chain: self.chain.read().information(), - orphaned_blocks: self.orphaned_blocks_pool.len(), - orphaned_transactions: self.orphaned_transactions_pool.len(), - } - } - - /// Get configuration parameters. - pub fn config(&self) -> &Config { - &self.config - } - - /// Process new blocks inventory - fn process_new_blocks_headers(&mut self, peer_index: usize, mut hashes: Vec, mut headers: Vec) { - assert_eq!(hashes.len(), headers.len()); - - let mut chain = self.chain.write(); - match chain.intersect_with_blocks_headers(&hashes, &headers) { - HeadersIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => { - warn!(target: "sync", "Ignoring {} headers from peer#{}. Unknown and we are synchronizing.", headers.len(), peer_index); - }, - HeadersIntersection::DbAllBlocksKnown => { - trace!(target: "sync", "Ignoring {} headers from peer#{}. All blocks are known and in database.", headers.len(), peer_index); - if self.state.is_synchronizing() { - // remember peer as useful - self.peers.useful_peer(peer_index); - } - }, - HeadersIntersection::InMemoryNoNewBlocks => { - trace!(target: "sync", "Ignoring {} headers from peer#{}. All blocks are known and in memory.", headers.len(), peer_index); - // remember peer as useful - self.peers.useful_peer(peer_index); - }, - HeadersIntersection::InMemoryMainNewBlocks(new_block_index) - | HeadersIntersection::InMemoryForkNewBlocks(new_block_index) - | HeadersIntersection::DbForkNewBlocks(new_block_index) - | HeadersIntersection::NoKnownBlocks(new_block_index) => { - // schedule new blocks - let new_blocks_hashes = hashes.split_off(new_block_index); - let new_blocks_headers = headers.split_off(new_block_index); - let new_blocks_hashes_len = new_blocks_hashes.len(); - trace!( - target: "sync", "New {} headers from peer#{}. First {:?}, last: {:?}", - new_blocks_hashes_len, - peer_index, - new_blocks_hashes[0].to_reversed_str(), - new_blocks_hashes[new_blocks_hashes_len - 1].to_reversed_str() - ); - chain.schedule_blocks_headers(new_blocks_hashes, new_blocks_headers); - // remember peer as useful - self.peers.useful_peer(peer_index); - // switch to synchronization state - if !self.state.is_synchronizing() { - // TODO: NearlySaturated should start when we are in Saturated state && count(new_blocks_headers) is < LIMIT (LIMIT > 1) - if new_blocks_hashes_len == 1 && !self.state.is_nearly_saturated() { - self.state = State::NearlySaturated; - } - else { - self.state = State::Synchronizing(time::precise_time_s(), chain.best_storage_block().number); - } - } - } - } - } - - /// Process new peer block - fn process_peer_block(&mut self, peer_index: usize, block_hash: H256, block: Block) { - let switch_to_saturated = { - let mut chain = self.chain.write(); - match chain.block_state(&block_hash) { - BlockState::Verifying | BlockState::Stored => { - // remember peer as useful - self.peers.useful_peer(peer_index); - }, - BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => { - // check parent block state - match chain.block_state(&block.block_header.previous_header_hash) { - BlockState::Unknown => { - if self.state.is_synchronizing() { - // when synchronizing, we tend to receive all blocks in-order - trace!( - target: "sync", - "Ignoring block {} from peer#{}, because its parent is unknown and we are synchronizing", - block_hash.to_reversed_str(), - peer_index - ); - // remove block from current queue - chain.forget_block(&block_hash); - // remove orphaned blocks - let removed_blocks_hashes: Vec<_> = self.orphaned_blocks_pool.remove_blocks_for_parent(&block_hash).into_iter().map(|t| t.0).collect(); - chain.forget_blocks_leave_header(&removed_blocks_hashes); - } else { - // remove this block from the queue - chain.forget_block_leave_header(&block_hash); - // remember this block as unknown - self.orphaned_blocks_pool.insert_unknown_block(block_hash, block); - } - }, - BlockState::Verifying | BlockState::Stored => { - // remember peer as useful - self.peers.useful_peer(peer_index); - // schedule verification - let mut blocks: VecDeque<(H256, Block)> = VecDeque::new(); - blocks.push_back((block_hash.clone(), block)); - blocks.extend(self.orphaned_blocks_pool.remove_blocks_for_parent(&block_hash)); - // forget blocks we are going to process - let blocks_hashes_to_forget: Vec<_> = blocks.iter().map(|t| t.0.clone()).collect(); - chain.forget_blocks_leave_header(&blocks_hashes_to_forget); - while let Some((block_hash, block)) = blocks.pop_front() { - match self.verification_work_sender { - Some(ref verification_work_sender) => { - // remember that we are verifying block from this peer - self.verifying_blocks_by_peer.insert(block_hash.clone(), peer_index); - match self.verifying_blocks_waiters.entry(peer_index) { - Entry::Occupied(mut entry) => { - entry.get_mut().0.insert(block_hash.clone()); - }, - Entry::Vacant(entry) => { - let mut block_hashes = HashSet::new(); - block_hashes.insert(block_hash.clone()); - entry.insert((block_hashes, None)); - } - } - // append to verifying queue - chain.verify_block(block_hash.clone(), block.block_header.clone()); - // schedule verification - verification_work_sender - .send(VerificationTask::VerifyBlock(block)) - .expect("Verification thread have the same lifetime as `Synchronization`"); - }, - None => { - // insert to the storage + forget block header - chain.insert_best_block(block_hash.clone(), &block) - .expect("Error inserting to db."); - }, - } - } - }, - BlockState::Requested | BlockState::Scheduled => { - // remember peer as useful - self.peers.useful_peer(peer_index); - // remember as orphan block - self.orphaned_blocks_pool.insert_orphaned_block(block_hash, block); - } - } - }, - } - - // requested block is received => move to saturated state if there are no more blocks - chain.length_of_blocks_state(BlockState::Scheduled) == 0 - && chain.length_of_blocks_state(BlockState::Requested) == 0 - }; - - if switch_to_saturated { - self.switch_to_saturated_state(); - } - } - - /// Process new peer transaction - fn process_peer_transaction(&mut self, transaction: Transaction) { - // if we are in synchronization state, we will ignore this message - if self.state.is_synchronizing() { - return; - } - - // else => verify transaction + it's orphans and then add to the memory pool - let hash = transaction.hash(); - let mut chain = self.chain.write(); - - // if any parent transaction is unknown => we have orphan transaction => remember in orphan pool - let unknown_parents: HashSet = transaction.inputs.iter() - .filter(|input| chain.transaction_state(&input.previous_output.hash) == TransactionState::Unknown) - .map(|input| input.previous_output.hash.clone()) - .collect(); - if !unknown_parents.is_empty() { - self.orphaned_transactions_pool.insert(hash, transaction, unknown_parents); - return; - } - - // else verify && insert this transaction && all dependent orphans - let mut transactons: VecDeque<(H256, Transaction)> = VecDeque::new(); - transactons.push_back((hash.clone(), transaction)); - transactons.extend(self.orphaned_transactions_pool.remove_transactions_for_parent(&hash)); - while let Some((tx_hash, tx)) = transactons.pop_front() { - match self.verification_work_sender { - Some(ref verification_work_sender) => { - // append to verifying queue - chain.verify_transaction(tx_hash.clone(), tx.clone()); - // schedule verification - verification_work_sender - .send(VerificationTask::VerifyTransaction(tx)) - .expect("Verification thread have the same lifetime as `Synchronization`"); - }, - None => { - // insert to the memory pool - chain.insert_verified_transaction(tx); - }, - } - } - } - /// Schedule new synchronization tasks, if any. fn execute_synchronization_tasks(&mut self, forced_blocks_requests: Option>) { let mut tasks: Vec = Vec::new(); @@ -911,6 +614,348 @@ impl SynchronizationClient where T: TaskExecutor { } } + fn try_switch_to_saturated_state(&mut self) -> bool { + let switch_to_saturated = { + let chain = self.chain.read(); + + // requested block is received => move to saturated state if there are no more blocks + chain.length_of_blocks_state(BlockState::Scheduled) == 0 + && chain.length_of_blocks_state(BlockState::Requested) == 0 + }; + + if switch_to_saturated { + self.switch_to_saturated_state(); + } + + switch_to_saturated + } +} + +impl VerificationSink for SynchronizationClientCore where T: TaskExecutor { + /// Process successful block verification + fn on_block_verification_success(&mut self, block: Block) { + let hash = block.hash(); + // insert block to the storage + match { + let mut chain = self.chain.write(); + + // remove block from verification queue + // header is removed in `insert_best_block` call + // or it is removed earlier, when block was removed from the verifying queue + if chain.forget_block_with_state_leave_header(&hash, BlockState::Verifying) != HashPosition::Missing { + // block was in verification queue => insert to storage + chain.insert_best_block(hash.clone(), &block) + } else { + Ok(BlockInsertionResult::default()) + } + } { + Ok(insert_result) => { + // awake threads, waiting for this block insertion + self.awake_waiting_threads(&hash); + + // continue with synchronization + self.execute_synchronization_tasks(None); + + // relay block to our peers + if self.state.is_saturated() { + // TODO: Task::BroadcastBlock + } + + // deal with block transactions + for (_, tx) in insert_result.transactions_to_reverify { + self.process_peer_transaction(tx); + } + }, + Err(db::Error::Consistency(e)) => { + // process as verification error + self.on_block_verification_error(&format!("{:?}", db::Error::Consistency(e)), &hash); + }, + Err(e) => { + // process as irrecoverable failure + panic!("Block {:?} insertion failed with error {:?}", hash, e); + } + } + } + + /// Process failed block verification + fn on_block_verification_error(&mut self, err: &str, hash: &H256) { + warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash.to_reversed_str(), err); + + { + let mut chain = self.chain.write(); + + // forget for this block and all its children + // headers are also removed as they all are invalid + chain.forget_block_with_children(hash); + } + + // awake threads, waiting for this block insertion + self.awake_waiting_threads(hash); + + // start new tasks + self.execute_synchronization_tasks(None); + } + + /// Process successful transaction verification + fn on_transaction_verification_success(&mut self, transaction: Transaction) { + let hash = transaction.hash(); + // insert transaction to the memory pool + let mut chain = self.chain.write(); + + // remove transaction from verification queue + // if it is not in the queue => it was removed due to error or reorganization + if !chain.forget_verifying_transaction(&hash) { + return; + } + + // transaction was in verification queue => insert to memory pool + chain.insert_verified_transaction(transaction); + } + + /// Process failed transaction verification + fn on_transaction_verification_error(&mut self, err: &str, hash: &H256) { + warn!(target: "sync", "Transaction {:?} verification failed with error {:?}", hash.to_reversed_str(), err); + + { + let mut chain = self.chain.write(); + + // forget for this transaction and all its children + chain.forget_verifying_transaction_with_children(hash); + } + } +} + +impl SynchronizationClientCore where T: TaskExecutor { + /// Create new synchronization client core + pub fn new(config: Config, handle: &Handle, executor: Arc>, chain: ChainRef) -> Arc> { + let sync = Arc::new(Mutex::new( + SynchronizationClientCore { + state: State::Saturated, + peers: Peers::new(), + pool: CpuPool::new(config.threads_num), + management_worker: None, + executor: executor, + chain: chain.clone(), + orphaned_blocks_pool: OrphanBlocksPool::new(), + orphaned_transactions_pool: OrphanTransactionsPool::new(), + verifying_blocks_by_peer: HashMap::new(), + verifying_blocks_waiters: HashMap::new(), + } + )); + + // TODO: start management worker only when synchronization is started + // currently impossible because there is no way to call Interval::new with Remote && Handle is not-Send + { + let peers_config = ManagePeersConfig::default(); + let unknown_config = ManageUnknownBlocksConfig::default(); + let orphan_config = ManageOrphanTransactionsConfig::default(); + let csync = Arc::downgrade(&sync); + let mut sync = sync.lock(); + let management_worker = Interval::new(Duration::from_millis(MANAGEMENT_INTERVAL_MS), handle) + .expect("Failed to create interval") + .and_then(move |_| { + let client = match csync.upgrade() { + Some(client) => client, + None => return Ok(()), + }; + let mut client = client.lock(); + client.print_synchronization_information(); + if client.state.is_synchronizing() || client.state.is_nearly_saturated() { + let blocks_to_request = manage_synchronization_peers_blocks(&peers_config, &mut client.peers); + client.execute_synchronization_tasks(blocks_to_request); + + manage_synchronization_peers_inventory(&peers_config, &mut client.peers); + manage_orphaned_transactions(&orphan_config, &mut client.orphaned_transactions_pool); + if let Some(orphans_to_remove) = manage_unknown_orphaned_blocks(&unknown_config, &mut client.orphaned_blocks_pool) { + let mut chain = client.chain.write(); + for orphan_to_remove in orphans_to_remove { + chain.forget_block(&orphan_to_remove); + } + } + } + Ok(()) + }) + .for_each(|_| Ok(())) + .then(|_| finished::<(), ()>(())) + .boxed(); + sync.management_worker = Some(sync.pool.spawn(management_worker).boxed()); + } + + sync + } + + /// Get information on current synchronization state. + #[cfg(test)] + pub fn information(&self) -> Information { + Information { + state: self.state, + peers: self.peers.information(), + chain: self.chain.read().information(), + orphaned_blocks: self.orphaned_blocks_pool.len(), + orphaned_transactions: self.orphaned_transactions_pool.len(), + } + } + + /// Process new blocks inventory + fn process_new_blocks_headers(&mut self, peer_index: usize, mut hashes: Vec, mut headers: Vec) { + assert_eq!(hashes.len(), headers.len()); + + let mut chain = self.chain.write(); + match chain.intersect_with_blocks_headers(&hashes, &headers) { + HeadersIntersection::NoKnownBlocks(_) if self.state.is_synchronizing() => { + warn!(target: "sync", "Ignoring {} headers from peer#{}. Unknown and we are synchronizing.", headers.len(), peer_index); + }, + HeadersIntersection::DbAllBlocksKnown => { + trace!(target: "sync", "Ignoring {} headers from peer#{}. All blocks are known and in database.", headers.len(), peer_index); + if self.state.is_synchronizing() { + // remember peer as useful + self.peers.useful_peer(peer_index); + } + }, + HeadersIntersection::InMemoryNoNewBlocks => { + trace!(target: "sync", "Ignoring {} headers from peer#{}. All blocks are known and in memory.", headers.len(), peer_index); + // remember peer as useful + self.peers.useful_peer(peer_index); + }, + HeadersIntersection::InMemoryMainNewBlocks(new_block_index) + | HeadersIntersection::InMemoryForkNewBlocks(new_block_index) + | HeadersIntersection::DbForkNewBlocks(new_block_index) + | HeadersIntersection::NoKnownBlocks(new_block_index) => { + // schedule new blocks + let new_blocks_hashes = hashes.split_off(new_block_index); + let new_blocks_headers = headers.split_off(new_block_index); + let new_blocks_hashes_len = new_blocks_hashes.len(); + trace!( + target: "sync", "New {} headers from peer#{}. First {:?}, last: {:?}", + new_blocks_hashes_len, + peer_index, + new_blocks_hashes[0].to_reversed_str(), + new_blocks_hashes[new_blocks_hashes_len - 1].to_reversed_str() + ); + chain.schedule_blocks_headers(new_blocks_hashes, new_blocks_headers); + // remember peer as useful + self.peers.useful_peer(peer_index); + // switch to synchronization state + if !self.state.is_synchronizing() { + // TODO: NearlySaturated should start when we are in Saturated state && count(new_blocks_headers) is < LIMIT (LIMIT > 1) + if new_blocks_hashes_len == 1 && !self.state.is_nearly_saturated() { + self.state = State::NearlySaturated; + } + else { + self.state = State::Synchronizing(time::precise_time_s(), chain.best_storage_block().number); + } + } + } + } + } + + /// Process new peer block + fn process_peer_block(&mut self, peer_index: usize, block_hash: H256, block: Block) -> Option> { + // prepare list of blocks to verify + make all required changes to the chain + let mut result: Option> = None; + let mut chain = self.chain.write(); + match chain.block_state(&block_hash) { + BlockState::Verifying | BlockState::Stored => { + // remember peer as useful + self.peers.useful_peer(peer_index); + }, + BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => { + // check parent block state + match chain.block_state(&block.block_header.previous_header_hash) { + BlockState::Unknown => { + if self.state.is_synchronizing() { + // when synchronizing, we tend to receive all blocks in-order + trace!( + target: "sync", + "Ignoring block {} from peer#{}, because its parent is unknown and we are synchronizing", + block_hash.to_reversed_str(), + peer_index + ); + // remove block from current queue + chain.forget_block(&block_hash); + // remove orphaned blocks + let removed_blocks_hashes: Vec<_> = self.orphaned_blocks_pool.remove_blocks_for_parent(&block_hash).into_iter().map(|t| t.0).collect(); + chain.forget_blocks_leave_header(&removed_blocks_hashes); + } else { + // remove this block from the queue + chain.forget_block_leave_header(&block_hash); + // remember this block as unknown + self.orphaned_blocks_pool.insert_unknown_block(block_hash, block); + } + }, + BlockState::Verifying | BlockState::Stored => { + // remember peer as useful + self.peers.useful_peer(peer_index); + // schedule verification + let mut blocks_to_verify: VecDeque<(H256, Block)> = VecDeque::new(); + blocks_to_verify.push_back((block_hash.clone(), block)); + blocks_to_verify.extend(self.orphaned_blocks_pool.remove_blocks_for_parent(&block_hash)); + // forget blocks we are going to process + let blocks_hashes_to_forget: Vec<_> = blocks_to_verify.iter().map(|t| t.0.clone()).collect(); + chain.forget_blocks_leave_header(&blocks_hashes_to_forget); + // remember that we are verifying these blocks + let blocks_headers_to_verify: Vec<_> = blocks_to_verify.iter().map(|&(ref h, ref b)| (h.clone(), b.block_header.clone())).collect(); + chain.verify_blocks(blocks_headers_to_verify); + // remember that we are verifying block from this peer + self.verifying_blocks_by_peer.insert(block_hash.clone(), peer_index); + match self.verifying_blocks_waiters.entry(peer_index) { + Entry::Occupied(mut entry) => { + entry.get_mut().0.insert(block_hash.clone()); + }, + Entry::Vacant(entry) => { + let mut block_hashes = HashSet::new(); + block_hashes.insert(block_hash.clone()); + entry.insert((block_hashes, None)); + } + } + result = Some(blocks_to_verify); + }, + BlockState::Requested | BlockState::Scheduled => { + // remember peer as useful + self.peers.useful_peer(peer_index); + // remember as orphan block + self.orphaned_blocks_pool.insert_orphaned_block(block_hash, block); + } + } + }, + } + + result + } + + /// Process new peer transaction + fn process_peer_transaction(&mut self, transaction: Transaction) -> Option> { + // if we are in synchronization state, we will ignore this message + if self.state.is_synchronizing() { + return None; + } + + // else => verify transaction + it's orphans and then add to the memory pool + let hash = transaction.hash(); + let mut chain = self.chain.write(); + + // if any parent transaction is unknown => we have orphan transaction => remember in orphan pool + let unknown_parents: HashSet = transaction.inputs.iter() + .filter(|input| chain.transaction_state(&input.previous_output.hash) == TransactionState::Unknown) + .map(|input| input.previous_output.hash.clone()) + .collect(); + if !unknown_parents.is_empty() { + self.orphaned_transactions_pool.insert(hash, transaction, unknown_parents); + return None; + } + + // else verify && insert this transaction && all dependent orphans + let mut transactons: VecDeque<(H256, Transaction)> = VecDeque::new(); + transactons.push_back((hash.clone(), transaction)); + transactons.extend(self.orphaned_transactions_pool.remove_transactions_for_parent(&hash)); + // remember that we are verifying these transactions + for &(ref h, ref tx) in &transactons { + chain.verify_transaction(h.clone(), tx.clone()); + } + + Some(transactons) + } + fn prepare_blocks_requests_tasks(&mut self, peers: Vec, mut hashes: Vec) -> Vec { use std::mem::swap; @@ -1020,64 +1065,6 @@ impl SynchronizationClient where T: TaskExecutor { } } } - - /// Thread procedure for handling verification tasks - fn verification_worker_proc(sync: Arc>, mut verifier: ChainVerifier, work_receiver: Receiver) { - let bip16_time_border = { sync.lock().config().consensus_params.bip16_time }; - let mut is_bip16_active = false; - let mut parameters_change_steps = Some(0); - - while let Ok(task) = work_receiver.recv() { - match task { - VerificationTask::VerifyBlock(block) => { - // for changes that are not relying on block# - let is_bip16_active_on_block = block.block_header.time >= bip16_time_border; - let force_parameters_change = is_bip16_active_on_block != is_bip16_active; - if force_parameters_change { - parameters_change_steps = Some(0); - } - - // change verifier parameters, if needed - if let Some(steps_left) = parameters_change_steps { - if steps_left == 0 { - let sync = sync.lock(); - let config = sync.config(); - let best_storage_block = sync.chain.read().best_storage_block(); - - is_bip16_active = is_bip16_active_on_block; - verifier = verifier.verify_p2sh(is_bip16_active); - - let is_bip65_active = best_storage_block.number >= config.consensus_params.bip65_height; - verifier = verifier.verify_clocktimeverify(is_bip65_active); - - if is_bip65_active { - parameters_change_steps = None; - } else { - parameters_change_steps = Some(config.consensus_params.bip65_height - best_storage_block.number); - } - } else { - parameters_change_steps = Some(steps_left - 1); - } - } - - // verify block - match verifier.verify(&block) { - Ok(_chain) => { - sync.lock().on_block_verification_success(block) - }, - Err(e) => { - sync.lock().on_block_verification_error(&format!("{:?}", e), &block.hash()) - } - } - }, - VerificationTask::VerifyTransaction(transaction) => { - // TODO: add verification here - sync.lock().on_transaction_verification_error("unimplemented", &transaction.hash()) - } - VerificationTask::Stop => break, - } - } - } } impl PeersBlocksWaiter { @@ -1103,11 +1090,11 @@ pub mod tests { use parking_lot::{Mutex, RwLock}; use tokio_core::reactor::{Core, Handle}; use chain::{Block, Transaction, RepresentH256}; - use message::common::{Magic, ConsensusParams}; use super::{Client, Config, SynchronizationClient}; use synchronization_executor::Task; use synchronization_chain::{Chain, ChainRef}; use synchronization_executor::tests::DummyTaskExecutor; + use synchronization_verifier::tests::DummyVerifier; use primitives::hash::H256; use p2p::event_loop; use test_data; @@ -1119,7 +1106,7 @@ pub mod tests { Arc::new(db::Storage::new(path.as_path()).unwrap()) } - fn create_sync(storage: Option) -> (Core, Handle, Arc>, ChainRef, Arc>>) { + fn create_sync(storage: Option) -> (Core, Handle, Arc>, ChainRef, Arc>>) { let event_loop = event_loop(); let handle = event_loop.handle(); let storage = match storage { @@ -1128,9 +1115,14 @@ pub mod tests { }; let chain = ChainRef::new(RwLock::new(Chain::new(storage.clone()))); let executor = DummyTaskExecutor::new(); - let config = Config { consensus_params: ConsensusParams::with_magic(Magic::Mainnet), threads_num: 1, skip_verification: true }; + let config = Config { threads_num: 1 }; let client = SynchronizationClient::new(config, &handle, executor.clone(), chain.clone()); + { + let verifier_sink = client.lock().core(); + let verifier = DummyVerifier::new(verifier_sink); + client.lock().set_verifier(verifier); + } (event_loop, handle, executor, chain, client) } @@ -1713,7 +1705,7 @@ pub mod tests { assert!(sync.information().state.is_synchronizing()); - sync.process_peer_transaction(Transaction::default()); + sync.on_peer_transaction(1, Transaction::default()); assert_eq!(sync.information().chain.transactions.transactions_count, 0); } @@ -1723,7 +1715,7 @@ pub mod tests { let (_, _, _, _, sync) = create_sync(None); let mut sync = sync.lock(); - sync.process_peer_transaction(test_data::TransactionBuilder::with_version(1).into()); + sync.on_peer_transaction(1, test_data::TransactionBuilder::with_version(1).into()); assert_eq!(sync.information().chain.transactions.transactions_count, 1); let b1 = test_data::block_h1(); @@ -1731,7 +1723,7 @@ pub mod tests { assert!(sync.information().state.is_nearly_saturated()); - sync.process_peer_transaction(test_data::TransactionBuilder::with_version(2).into()); + sync.on_peer_transaction(1, test_data::TransactionBuilder::with_version(2).into()); assert_eq!(sync.information().chain.transactions.transactions_count, 2); } @@ -1740,7 +1732,7 @@ pub mod tests { let (_, _, _, _, sync) = create_sync(None); let mut sync = sync.lock(); - sync.process_peer_transaction(test_data::TransactionBuilder::with_default_input(0).into()); + sync.on_peer_transaction(1, test_data::TransactionBuilder::with_default_input(0).into()); assert_eq!(sync.information().chain.transactions.transactions_count, 0); assert_eq!(sync.information().orphaned_transactions, 1); } @@ -1754,11 +1746,11 @@ pub mod tests { let (_, _, _, _, sync) = create_sync(None); let mut sync = sync.lock(); - sync.process_peer_transaction(chain.at(1)); + sync.on_peer_transaction(1, chain.at(1)); assert_eq!(sync.information().chain.transactions.transactions_count, 0); assert_eq!(sync.information().orphaned_transactions, 1); - sync.process_peer_transaction(chain.at(0)); + sync.on_peer_transaction(1, chain.at(0)); assert_eq!(sync.information().chain.transactions.transactions_count, 2); assert_eq!(sync.information().orphaned_transactions, 0); } diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs new file mode 100644 index 00000000..f722a7c3 --- /dev/null +++ b/sync/src/synchronization_verifier.rs @@ -0,0 +1,179 @@ +use std::thread; +use std::sync::Arc; +use std::sync::mpsc::{channel, Sender, Receiver}; +use parking_lot::Mutex; +use chain::{Block, Transaction, RepresentH256}; +use message::common::ConsensusParams; +use primitives::hash::H256; +use verification::{ChainVerifier, Verify as VerificationVerify}; +use synchronization_chain::ChainRef; + +/// Verification events sink +pub trait VerificationSink : Send + 'static { + /// When block verification has completed successfully. + fn on_block_verification_success(&mut self, block: Block); + /// When block verification has failed. + fn on_block_verification_error(&mut self, err: &str, hash: &H256); + /// When transaction verification has completed successfully. + fn on_transaction_verification_success(&mut self, transaction: Transaction); + /// When transaction verification has failed. + fn on_transaction_verification_error(&mut self, err: &str, hash: &H256); +} + +/// Verification thread tasks +enum VerificationTask { + /// Verify single block + VerifyBlock(Block), + /// Verify single transaction + VerifyTransaction(Transaction), + /// Stop verification thread + Stop, +} + +/// Synchronization verifier +pub trait Verifier : Send + 'static { + /// Verify block + fn verify_block(&self, block: Block); + /// Verify transaction + fn verify_transaction(&self, transaction: Transaction); +} + +/// Asynchronous synchronization verifier +pub struct AsyncVerifier { + /// Verification work transmission channel. + verification_work_sender: Sender, + /// Verification thread. + verification_worker_thread: Option>, +} + +impl AsyncVerifier { + /// Create new async verifier + pub fn new(consensus_params: ConsensusParams, chain: ChainRef, sink: Arc>) -> Self { + let (verification_work_sender, verification_work_receiver) = channel(); + let storage = chain.read().storage(); + let verifier = ChainVerifier::new(storage); + AsyncVerifier { + verification_work_sender: verification_work_sender, + verification_worker_thread: Some(thread::Builder::new() + .name("Sync verification thread".to_string()) + .spawn(move || { + AsyncVerifier::verification_worker_proc(sink, chain, consensus_params, verifier, verification_work_receiver) + }) + .expect("Error creating verification thread")) + } + } + + /// Thread procedure for handling verification tasks + fn verification_worker_proc(sink: Arc>, chain: ChainRef, consensus_params: ConsensusParams, mut verifier: ChainVerifier, work_receiver: Receiver) { + let bip16_time_border = consensus_params.bip16_time; + let mut is_bip16_active = false; + let mut parameters_change_steps = Some(0); + + while let Ok(task) = work_receiver.recv() { + match task { + VerificationTask::VerifyBlock(block) => { + // for changes that are not relying on block# + let is_bip16_active_on_block = block.block_header.time >= bip16_time_border; + let force_parameters_change = is_bip16_active_on_block != is_bip16_active; + if force_parameters_change { + parameters_change_steps = Some(0); + } + + // change verifier parameters, if needed + if let Some(steps_left) = parameters_change_steps { + if steps_left == 0 { + let best_storage_block = chain.read().best_storage_block(); + + is_bip16_active = is_bip16_active_on_block; + verifier = verifier.verify_p2sh(is_bip16_active); + + let is_bip65_active = best_storage_block.number >= consensus_params.bip65_height; + verifier = verifier.verify_clocktimeverify(is_bip65_active); + + if is_bip65_active { + parameters_change_steps = None; + } else { + parameters_change_steps = Some(consensus_params.bip65_height - best_storage_block.number); + } + } else { + parameters_change_steps = Some(steps_left - 1); + } + } + + // verify block + match verifier.verify(&block) { + Ok(_chain) => { + sink.lock().on_block_verification_success(block) + }, + Err(e) => { + sink.lock().on_block_verification_error(&format!("{:?}", e), &block.hash()) + } + } + }, + VerificationTask::VerifyTransaction(transaction) => { + // TODO: add verification here + sink.lock().on_transaction_verification_error("unimplemented", &transaction.hash()) + } + VerificationTask::Stop => break, + } + } + } +} + +impl Drop for AsyncVerifier { + fn drop(&mut self) { + if let Some(join_handle) = self.verification_worker_thread.take() { + // ignore send error here <= destructing anyway + let _ = self.verification_work_sender.send(VerificationTask::Stop); + join_handle.join().expect("Clean shutdown."); + } + } +} + +impl Verifier for AsyncVerifier { + /// Verify block + fn verify_block(&self, block: Block) { + self.verification_work_sender + .send(VerificationTask::VerifyBlock(block)) + .expect("Verification thread have the same lifetime as `AsyncVerifier`"); + } + + /// Verify transaction + fn verify_transaction(&self, transaction: Transaction) { + self.verification_work_sender + .send(VerificationTask::VerifyTransaction(transaction)) + .expect("Verification thread have the same lifetime as `AsyncVerifier`"); + } +} + +#[cfg(test)] +pub mod tests { + use std::sync::Arc; + use parking_lot::Mutex; + use chain::{Block, Transaction}; + use synchronization_client::SynchronizationClientCore; + use synchronization_executor::tests::DummyTaskExecutor; + use super::{Verifier, VerificationSink}; + + pub struct DummyVerifier { + sink: Arc>>, + } + + impl DummyVerifier { + pub fn new(sink: Arc>>) -> Self { + DummyVerifier { + sink: sink, + } + } + } + + impl Verifier for DummyVerifier { + fn verify_block(&self, block: Block) { + self.sink.lock().on_block_verification_success(block); + } + + fn verify_transaction(&self, transaction: Transaction) { + self.sink.lock().on_transaction_verification_success(transaction); + } + } +} From 70bb4ef5ccc098ac8fd8a8feb9b99da8b81094d9 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 18 Nov 2016 14:00:14 +0300 Subject: [PATCH 2/4] added test for issue #121 --- sync/src/best_headers_chain.rs | 1 + sync/src/hash_queue.rs | 3 +- sync/src/local_node.rs | 3 +- sync/src/synchronization_client.rs | 109 ++++++++++++++++++++------- sync/src/synchronization_verifier.rs | 36 +++++++-- 5 files changed, 117 insertions(+), 35 deletions(-) diff --git a/sync/src/best_headers_chain.rs b/sync/src/best_headers_chain.rs index 9a8a7ef2..d8c1aefc 100644 --- a/sync/src/best_headers_chain.rs +++ b/sync/src/best_headers_chain.rs @@ -14,6 +14,7 @@ pub struct Information { // TODO: currently it supports first chain only (so whatever headers sequence came first, it is best) /// Builds the block-header-chain of in-memory blocks, for which only headers are currently known +#[derive(Debug)] pub struct BestHeadersChain { /// Best hash in storage storage_best_hash: H256, diff --git a/sync/src/hash_queue.rs b/sync/src/hash_queue.rs index 960b03c9..14b7acc4 100644 --- a/sync/src/hash_queue.rs +++ b/sync/src/hash_queue.rs @@ -15,13 +15,14 @@ pub enum HashPosition { } /// Ordered queue with O(1) contains() && random access operations cost. -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct HashQueue { queue: VecDeque, set: HashSet, } /// Chain of linked queues. First queue has index zero. +#[derive(Debug)] pub struct HashQueueChain { chain: Vec, } diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 73c30ac1..390dd925 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -284,7 +284,8 @@ mod tests { let client = SynchronizationClient::new(config, &handle, executor.clone(), chain.clone()); { let verifier_sink = client.lock().core(); - let verifier = DummyVerifier::new(verifier_sink); + let mut verifier = DummyVerifier::new(); + verifier.set_sink(verifier_sink); client.lock().set_verifier(verifier); } let local_node = LocalNode::new(server.clone(), client, executor.clone()); diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 96563c8a..94a807ae 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -1106,7 +1106,7 @@ pub mod tests { Arc::new(db::Storage::new(path.as_path()).unwrap()) } - fn create_sync(storage: Option) -> (Core, Handle, Arc>, ChainRef, Arc>>) { + fn create_sync(storage: Option, verifier: Option) -> (Core, Handle, Arc>, ChainRef, Arc>>) { let event_loop = event_loop(); let handle = event_loop.handle(); let storage = match storage { @@ -1120,7 +1120,11 @@ pub mod tests { let client = SynchronizationClient::new(config, &handle, executor.clone(), chain.clone()); { let verifier_sink = client.lock().core(); - let verifier = DummyVerifier::new(verifier_sink); + let mut verifier = match verifier { + Some(verifier) => verifier, + None => DummyVerifier::new(), + }; + verifier.set_sink(verifier_sink); client.lock().set_verifier(verifier); } (event_loop, handle, executor, chain, client) @@ -1128,7 +1132,7 @@ pub mod tests { #[test] fn synchronization_saturated_on_start() { - let (_, _, _, _, sync) = create_sync(None); + let (_, _, _, _, sync) = create_sync(None, None); let sync = sync.lock(); let info = sync.information(); assert!(!info.state.is_synchronizing()); @@ -1138,7 +1142,7 @@ pub mod tests { #[test] fn synchronization_in_order_block_path_nearly_saturated() { - let (_, _, executor, _, sync) = create_sync(None); + let (_, _, executor, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let block1: Block = test_data::block_h1(); @@ -1179,7 +1183,7 @@ pub mod tests { #[test] fn synchronization_out_of_order_block_path() { - let (_, _, _, _, sync) = create_sync(None); + let (_, _, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_new_blocks_headers(5, vec![test_data::block_h1().block_header.clone(), test_data::block_h2().block_header.clone()]); @@ -1199,7 +1203,7 @@ pub mod tests { #[test] fn synchronization_parallel_peers() { - let (_, _, executor, _, sync) = create_sync(None); + let (_, _, executor, _, sync) = create_sync(None, None); let block1: Block = test_data::block_h1(); let block2: Block = test_data::block_h2(); @@ -1245,7 +1249,7 @@ pub mod tests { #[test] fn synchronization_reset_when_peer_is_disconnected() { - let (_, _, _, _, sync) = create_sync(None); + let (_, _, _, _, sync) = create_sync(None, None); // request new blocks { @@ -1264,7 +1268,7 @@ pub mod tests { #[test] fn synchronization_not_starting_when_receiving_known_blocks() { - let (_, _, executor, _, sync) = create_sync(None); + let (_, _, executor, _, sync) = create_sync(None, None); let mut sync = sync.lock(); // saturated => receive inventory with known blocks only sync.on_new_blocks_headers(1, vec![test_data::genesis().block_header]); @@ -1277,7 +1281,7 @@ pub mod tests { #[test] fn synchronization_asks_for_inventory_after_saturating() { - let (_, _, executor, _, sync) = create_sync(None); + let (_, _, executor, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let block = test_data::block_h1(); sync.on_new_blocks_headers(1, vec![block.block_header.clone()]); @@ -1295,7 +1299,7 @@ pub mod tests { #[test] fn synchronization_remembers_correct_block_headers_in_order() { - let (_, _, executor, chain, sync) = create_sync(None); + let (_, _, executor, chain, sync) = create_sync(None, None); let mut sync = sync.lock(); let b1 = test_data::block_h1(); @@ -1338,7 +1342,7 @@ pub mod tests { #[test] fn synchronization_remembers_correct_block_headers_out_of_order() { - let (_, _, executor, chain, sync) = create_sync(None); + let (_, _, executor, chain, sync) = create_sync(None, None); let mut sync = sync.lock(); let b1 = test_data::block_h1(); @@ -1381,7 +1385,7 @@ pub mod tests { #[test] fn synchronization_ignores_unknown_block_headers() { - let (_, _, executor, chain, sync) = create_sync(None); + let (_, _, executor, chain, sync) = create_sync(None, None); let mut sync = sync.lock(); let b169 = test_data::block_h169(); @@ -1401,7 +1405,7 @@ pub mod tests { let genesis = test_data::genesis(); storage.insert_block(&genesis).expect("no db error"); - let (_, _, executor, chain, sync) = create_sync(Some(storage)); + let (_, _, executor, chain, sync) = create_sync(Some(storage), None); let genesis_header = &genesis.block_header; let fork1 = test_data::build_n_empty_blocks_from(2, 100, &genesis_header); let fork2 = test_data::build_n_empty_blocks_from(3, 200, &genesis_header); @@ -1459,7 +1463,7 @@ pub mod tests { let genesis = test_data::genesis(); storage.insert_block(&genesis).expect("no db error"); - let (_, _, executor, chain, sync) = create_sync(Some(storage)); + let (_, _, executor, chain, sync) = create_sync(Some(storage), None); let common_block = test_data::block_builder().header().parent(genesis.hash()).build().build(); let fork1 = test_data::build_n_empty_blocks_from(2, 100, &common_block.block_header); let fork2 = test_data::build_n_empty_blocks_from(3, 200, &common_block.block_header); @@ -1499,7 +1503,7 @@ pub mod tests { #[test] fn accept_out_of_order_blocks_when_saturated() { - let (_, _, _, chain, sync) = create_sync(None); + let (_, _, _, chain, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_peer_block(1, test_data::block_h2()); @@ -1521,7 +1525,7 @@ pub mod tests { #[test] fn do_not_rerequest_unknown_block_in_inventory() { - let (_, _, executor, _, sync) = create_sync(None); + let (_, _, executor, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_peer_block(1, test_data::block_h2()); @@ -1533,7 +1537,7 @@ pub mod tests { #[test] fn blocks_rerequested_on_peer_disconnect() { - let (_, _, executor, _, sync) = create_sync(None); + let (_, _, executor, _, sync) = create_sync(None, None); let block1: Block = test_data::block_h1(); let block2: Block = test_data::block_h2(); @@ -1573,7 +1577,7 @@ pub mod tests { #[test] fn peer_removed_from_sync_after_responding_with_requested_block_notfound() { - let (_, _, executor, _, sync) = create_sync(None); + let (_, _, executor, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let b1 = test_data::block_h1(); @@ -1599,7 +1603,7 @@ pub mod tests { #[test] fn peer_not_removed_from_sync_after_responding_with_requested_block_notfound() { - let (_, _, executor, _, sync) = create_sync(None); + let (_, _, executor, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let b1 = test_data::block_h1(); @@ -1625,7 +1629,7 @@ pub mod tests { #[test] fn transaction_is_not_requested_when_synchronizing() { - let (_, _, executor, _, sync) = create_sync(None); + let (_, _, executor, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let b1 = test_data::block_h1(); @@ -1643,7 +1647,7 @@ pub mod tests { #[test] fn transaction_is_requested_when_not_synchronizing() { - let (_, _, executor, _, sync) = create_sync(None); + let (_, _, executor, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_new_transactions_inventory(0, vec![H256::from(0)]); @@ -1667,7 +1671,7 @@ pub mod tests { #[test] fn same_transaction_can_be_requested_twice() { - let (_, _, executor, _, sync) = create_sync(None); + let (_, _, executor, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_new_transactions_inventory(0, vec![H256::from(0)]); @@ -1687,7 +1691,7 @@ pub mod tests { #[test] fn known_transaction_is_not_requested() { - let (_, _, executor, _, sync) = create_sync(None); + let (_, _, executor, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_new_transactions_inventory(0, vec![test_data::genesis().transactions[0].hash(), H256::from(0)]); @@ -1696,7 +1700,7 @@ pub mod tests { #[test] fn transaction_is_not_accepted_when_synchronizing() { - let (_, _, _, _, sync) = create_sync(None); + let (_, _, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); let b1 = test_data::block_h1(); @@ -1712,7 +1716,7 @@ pub mod tests { #[test] fn transaction_is_accepted_when_not_synchronizing() { - let (_, _, _, _, sync) = create_sync(None); + let (_, _, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_peer_transaction(1, test_data::TransactionBuilder::with_version(1).into()); @@ -1729,7 +1733,7 @@ pub mod tests { #[test] fn transaction_is_orphaned_when_input_is_unknown() { - let (_, _, _, _, sync) = create_sync(None); + let (_, _, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_peer_transaction(1, test_data::TransactionBuilder::with_default_input(0).into()); @@ -1743,7 +1747,7 @@ pub mod tests { test_data::TransactionBuilder::with_output(10).store(chain) // t0 .set_input(&chain.at(0), 0).set_output(20).store(chain); // t0 -> t1 - let (_, _, _, _, sync) = create_sync(None); + let (_, _, _, _, sync) = create_sync(None, None); let mut sync = sync.lock(); sync.on_peer_transaction(1, chain.at(1)); @@ -1754,4 +1758,55 @@ pub mod tests { assert_eq!(sync.information().chain.transactions.transactions_count, 2); assert_eq!(sync.information().orphaned_transactions, 0); } + + #[test] + #[ignore] // TODO: causes panic currently + // https://github.com/ethcore/parity-bitcoin/issues/121 + fn when_previous_block_verification_failed_fork_is_not_requested() { + // got headers [b10, b11, b12] - some fork + // got headers [b10, b21, b22] - main branch + // got b10, b11, b12, b21. b22 is requested + // + // verifying: [b10, b11, b12, b21] + // headers_chain: [b10, b11, b12] + // + // b21 verification failed => b22 is not removed (since it is not in headers_chain) + // got new headers [b10, b21, b22, b23] => intersection point is b10 => scheduling [b21, b22, b23] + // + // block queue is empty => new tasks => requesting [b21, b22] => panic in hash_queue + // + // TODO: do not trust first intersection point - check each hash when scheduling hashes. + // If at least one hash is known => previous verification failed => drop all headers. + + let genesis = test_data::genesis(); + let b10 = test_data::block_builder().header().parent(genesis.hash()).build().build(); + + let b11 = test_data::block_builder().header().nonce(1).parent(b10.hash()).build().build(); + let b12 = test_data::block_builder().header().parent(b11.hash()).build().build(); + + let b21 = test_data::block_builder().header().nonce(2).parent(b10.hash()).build().build(); + let b22 = test_data::block_builder().header().parent(b21.hash()).build().build(); + let b23 = test_data::block_builder().header().parent(b22.hash()).build().build(); + + // TODO: simulate verification during b21 verification + let mut dummy_verifier = DummyVerifier::new(); + dummy_verifier.error_when_verifying(b21.hash(), "simulated"); + + let (_, _, _, _, sync) = create_sync(None, Some(dummy_verifier)); + + let mut sync = sync.lock(); + + sync.on_new_blocks_headers(1, vec![b10.block_header.clone(), b11.block_header.clone(), b12.block_header.clone()]); + sync.on_new_blocks_headers(2, vec![b10.block_header.clone(), b21.block_header.clone(), b22.block_header.clone()]); + + sync.on_peer_block(1, b10.clone()); + sync.on_peer_block(1, b11); + sync.on_peer_block(1, b12); + + sync.on_peer_block(2, b21.clone()); + + // should not panic here + sync.on_new_blocks_headers(2, vec![b10.block_header.clone(), b21.block_header.clone(), + b22.block_header.clone(), b23.block_header.clone()]); + } } diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs index f722a7c3..e8d45565 100644 --- a/sync/src/synchronization_verifier.rs +++ b/sync/src/synchronization_verifier.rs @@ -149,31 +149,55 @@ impl Verifier for AsyncVerifier { #[cfg(test)] pub mod tests { use std::sync::Arc; + use std::collections::HashMap; use parking_lot::Mutex; - use chain::{Block, Transaction}; + use chain::{Block, Transaction, RepresentH256}; use synchronization_client::SynchronizationClientCore; use synchronization_executor::tests::DummyTaskExecutor; + use primitives::hash::H256; use super::{Verifier, VerificationSink}; pub struct DummyVerifier { - sink: Arc>>, + sink: Option>>>, + errors: HashMap } impl DummyVerifier { - pub fn new(sink: Arc>>) -> Self { + pub fn new() -> Self { DummyVerifier { - sink: sink, + sink: None, + errors: HashMap::new(), } } + + pub fn set_sink(&mut self, sink: Arc>>) { + self.sink = Some(sink); + } + + pub fn error_when_verifying(&mut self, hash: H256, err: &str) { + self.errors.insert(hash, err.into()); + } } impl Verifier for DummyVerifier { fn verify_block(&self, block: Block) { - self.sink.lock().on_block_verification_success(block); + match self.sink { + Some(ref sink) => match self.errors.get(&block.hash()) { + Some(err) => sink.lock().on_block_verification_error(&err, &block.hash()), + None => sink.lock().on_block_verification_success(block), + }, + None => panic!("call set_sink"), + } } fn verify_transaction(&self, transaction: Transaction) { - self.sink.lock().on_transaction_verification_success(transaction); + match self.sink { + Some(ref sink) => match self.errors.get(&transaction.hash()) { + Some(err) => sink.lock().on_transaction_verification_error(&err, &transaction.hash()), + None => sink.lock().on_transaction_verification_success(transaction), + }, + None => panic!("call set_sink"), + } } } } From b66fd70c6f993421ed9b88839fc1af3d0a823dc6 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 18 Nov 2016 14:39:34 +0300 Subject: [PATCH 3/4] fixed client construction --- sync/src/lib.rs | 12 ++--- sync/src/local_node.rs | 13 ++--- sync/src/synchronization_client.rs | 77 +++++++++++------------------- 3 files changed, 37 insertions(+), 65 deletions(-) diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 8b9d490e..bcf96928 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -61,19 +61,15 @@ pub fn create_sync_connection_factory(handle: &Handle, consensus_params: Consens use local_node::LocalNode as SyncNode; use inbound_connection_factory::InboundConnectionFactory as SyncConnectionFactory; use synchronization_server::SynchronizationServer; - use synchronization_client::{SynchronizationClient, Config as SynchronizationConfig}; + use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig}; use synchronization_verifier::AsyncVerifier; let sync_chain = Arc::new(RwLock::new(SyncChain::new(db))); let sync_executor = SyncExecutor::new(sync_chain.clone()); let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone())); - let sync_client = SynchronizationClient::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone()); - { - let verifier_sink = sync_client.lock().core(); - let verifier = AsyncVerifier::new(consensus_params, sync_chain, verifier_sink); - sync_client.lock().set_verifier(verifier); - } - + let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone()); + let verifier = AsyncVerifier::new(consensus_params, sync_chain, sync_client_core.clone()); + let sync_client = SynchronizationClient::new(sync_client_core, verifier); let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor)); SyncConnectionFactory::with_local_node(sync_node) } diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 390dd925..3d3f4f7d 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -230,7 +230,7 @@ mod tests { use chain::RepresentH256; use synchronization_executor::Task; use synchronization_executor::tests::DummyTaskExecutor; - use synchronization_client::{Config, SynchronizationClient}; + use synchronization_client::{Config, SynchronizationClient, SynchronizationClientCore}; use synchronization_chain::Chain; use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef}; use message::types; @@ -281,13 +281,10 @@ mod tests { let executor = DummyTaskExecutor::new(); let server = Arc::new(DummyServer::new()); let config = Config { threads_num: 1 }; - let client = SynchronizationClient::new(config, &handle, executor.clone(), chain.clone()); - { - let verifier_sink = client.lock().core(); - let mut verifier = DummyVerifier::new(); - verifier.set_sink(verifier_sink); - client.lock().set_verifier(verifier); - } + let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone()); + let mut verifier = DummyVerifier::new(); + verifier.set_sink(client_core.clone()); + let client = SynchronizationClient::new(client_core, verifier); let local_node = LocalNode::new(server.clone(), client, executor.clone()); (event_loop, handle, executor, server, local_node) } diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 94a807ae..285410e4 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -223,10 +223,10 @@ pub struct Config { /// Synchronization client facade pub struct SynchronizationClient { - /// Client - client: Arc>>, + /// Client core + core: Arc>>, /// Verifier - verifier: Option>, + verifier: U, } /// Synchronization client. @@ -286,46 +286,42 @@ impl State { impl Client for SynchronizationClient where T: TaskExecutor, U: Verifier { fn best_block(&self) -> db::BestBlock { - self.client.lock().best_block() + self.core.lock().best_block() } fn state(&self) -> State { - self.client.lock().state() + self.core.lock().state() } fn on_new_blocks_inventory(&mut self, peer_index: usize, blocks_hashes: Vec) { - self.client.lock().on_new_blocks_inventory(peer_index, blocks_hashes) + self.core.lock().on_new_blocks_inventory(peer_index, blocks_hashes) } fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec) { - self.client.lock().on_new_transactions_inventory(peer_index, transactions_hashes) + self.core.lock().on_new_transactions_inventory(peer_index, transactions_hashes) } fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec) { - self.client.lock().on_new_blocks_headers(peer_index, blocks_headers); + self.core.lock().on_new_blocks_headers(peer_index, blocks_headers); } fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec) { - self.client.lock().on_peer_blocks_notfound(peer_index, blocks_hashes); + self.core.lock().on_peer_blocks_notfound(peer_index, blocks_hashes); } fn on_peer_block(&mut self, peer_index: usize, block: Block) { - let blocks_to_verify = { self.client.lock().on_peer_block(peer_index, block) }; + let blocks_to_verify = { self.core.lock().on_peer_block(peer_index, block) }; // verify selected blocks if let Some(mut blocks_to_verify) = blocks_to_verify { while let Some((_, block)) = blocks_to_verify.pop_front() { - // schedule verification - match self.verifier { - Some(ref verifier) => verifier.verify_block(block), - None => panic!("call set_verifier after construction"), - } + self.verifier.verify_block(block); } } // try to switch to saturated state OR execute sync tasks { - let mut client = self.client.lock(); + let mut client = self.core.lock(); if !client.try_switch_to_saturated_state() { client.execute_synchronization_tasks(None); } @@ -333,53 +329,39 @@ impl Client for SynchronizationClient where T: TaskExecutor, U: Veri } fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction) { - let transactions_to_verify = { self.client.lock().on_peer_transaction(peer_index, transaction) }; + let transactions_to_verify = { self.core.lock().on_peer_transaction(peer_index, transaction) }; if let Some(mut transactions_to_verify) = transactions_to_verify { while let Some((_, tx)) = transactions_to_verify.pop_front() { - // schedule verification - match self.verifier { - Some(ref verifier) => verifier.verify_transaction(tx), - None => panic!("call set_verifier after construction"), - } + self.verifier.verify_transaction(tx); } } } fn on_peer_disconnected(&mut self, peer_index: usize) { - self.client.lock().on_peer_disconnected(peer_index); + self.core.lock().on_peer_disconnected(peer_index); } fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option>) { - self.client.lock().get_peers_nearly_blocks_waiter(peer_index) + self.core.lock().get_peers_nearly_blocks_waiter(peer_index) } } impl SynchronizationClient where T: TaskExecutor, U: Verifier { /// Create new synchronization client - pub fn new(config: Config, handle: &Handle, executor: Arc>, chain: ChainRef) -> Arc> { + pub fn new(core: Arc>>, verifier: U) -> Arc> { Arc::new(Mutex::new( SynchronizationClient { - client: SynchronizationClientCore::new(config, handle, executor, chain), - verifier: None, + core: core, + verifier: verifier, } )) } - /// Get client core - pub fn core(&self) -> Arc>> { - self.client.clone() - } - - /// Set verifier (TODO: use builder && check in build instead) - pub fn set_verifier(&mut self, verifier: U) { - self.verifier = Some(Box::new(verifier)); - } - /// Get information on current synchronization state. #[cfg(test)] pub fn information(&self) -> Information { - self.client.lock().information() + self.core.lock().information() } } @@ -1090,7 +1072,7 @@ pub mod tests { use parking_lot::{Mutex, RwLock}; use tokio_core::reactor::{Core, Handle}; use chain::{Block, Transaction, RepresentH256}; - use super::{Client, Config, SynchronizationClient}; + use super::{Client, Config, SynchronizationClient, SynchronizationClientCore}; use synchronization_executor::Task; use synchronization_chain::{Chain, ChainRef}; use synchronization_executor::tests::DummyTaskExecutor; @@ -1117,16 +1099,13 @@ pub mod tests { let executor = DummyTaskExecutor::new(); let config = Config { threads_num: 1 }; - let client = SynchronizationClient::new(config, &handle, executor.clone(), chain.clone()); - { - let verifier_sink = client.lock().core(); - let mut verifier = match verifier { - Some(verifier) => verifier, - None => DummyVerifier::new(), - }; - verifier.set_sink(verifier_sink); - client.lock().set_verifier(verifier); - } + let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone()); + let mut verifier = match verifier { + Some(verifier) => verifier, + None => DummyVerifier::new(), + }; + verifier.set_sink(client_core.clone()); + let client = SynchronizationClient::new(client_core, verifier); (event_loop, handle, executor, chain, client) } From 6c1a08b5e80a1a4db62bdb3536b44aa59400ce3d Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Fri, 18 Nov 2016 17:10:57 +0300 Subject: [PATCH 4/4] fixed style --- sync/src/local_node.rs | 2 +- sync/src/synchronization_client.rs | 7 ++----- sync/src/synchronization_verifier.rs | 8 +------- 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 3d3f4f7d..5337e091 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -282,7 +282,7 @@ mod tests { let server = Arc::new(DummyServer::new()); let config = Config { threads_num: 1 }; let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone()); - let mut verifier = DummyVerifier::new(); + let mut verifier = DummyVerifier::default(); verifier.set_sink(client_core.clone()); let client = SynchronizationClient::new(client_core, verifier); let local_node = LocalNode::new(server.clone(), client, executor.clone()); diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 285410e4..5a37dd15 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -1100,10 +1100,7 @@ pub mod tests { let config = Config { threads_num: 1 }; let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone()); - let mut verifier = match verifier { - Some(verifier) => verifier, - None => DummyVerifier::new(), - }; + let mut verifier = verifier.unwrap_or_default(); verifier.set_sink(client_core.clone()); let client = SynchronizationClient::new(client_core, verifier); (event_loop, handle, executor, chain, client) @@ -1768,7 +1765,7 @@ pub mod tests { let b23 = test_data::block_builder().header().parent(b22.hash()).build().build(); // TODO: simulate verification during b21 verification - let mut dummy_verifier = DummyVerifier::new(); + let mut dummy_verifier = DummyVerifier::default(); dummy_verifier.error_when_verifying(b21.hash(), "simulated"); let (_, _, _, _, sync) = create_sync(None, Some(dummy_verifier)); diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs index e8d45565..d65b6670 100644 --- a/sync/src/synchronization_verifier.rs +++ b/sync/src/synchronization_verifier.rs @@ -157,19 +157,13 @@ pub mod tests { use primitives::hash::H256; use super::{Verifier, VerificationSink}; + #[derive(Default)] pub struct DummyVerifier { sink: Option>>>, errors: HashMap } impl DummyVerifier { - pub fn new() -> Self { - DummyVerifier { - sink: None, - errors: HashMap::new(), - } - } - pub fn set_sink(&mut self, sink: Arc>>) { self.sink = Some(sink); }