diff --git a/sync/src/blocks_writer.rs b/sync/src/blocks_writer.rs
index 9e3f84c9..a04c4c94 100644
--- a/sync/src/blocks_writer.rs
+++ b/sync/src/blocks_writer.rs
@@ -6,9 +6,9 @@
 use storage;
 use network::ConsensusParams;
 use primitives::hash::H256;
 use super::Error;
-use synchronization_verifier::{Verifier, SyncVerifier, VerificationTask,
+use synchronization_verifier::{Verifier, SyncVerifier, VerificationTask, HeadersVerificationSink,
     VerificationSink, BlockVerificationSink, TransactionVerificationSink};
-use types::StorageRef;
+use types::{PeerIndex, StorageRef};
 use utils::OrphanBlocksPool;
 use VerificationParameters;
@@ -141,6 +141,16 @@ impl TransactionVerificationSink for BlocksWriterSink {
     }
 }
 
+impl HeadersVerificationSink for BlocksWriterSink {
+    fn on_headers_verification_success(&self, _headers: Vec<IndexedBlockHeader>) {
+        unreachable!("not intended to verify headers")
+    }
+
+    fn on_headers_verification_error(&self, _peer: PeerIndex, _err: String, _hash: H256) {
+        unreachable!("not intended to verify headers")
+    }
+}
+
 #[cfg(test)]
 mod tests {
     extern crate test_data;
diff --git a/sync/src/lib.rs b/sync/src/lib.rs
index e382d49c..9abc8f7e 100644
--- a/sync/src/lib.rs
+++ b/sync/src/lib.rs
@@ -58,7 +58,7 @@ pub enum Error {
     Verification(String),
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 /// Verification parameters.
 pub struct VerificationParameters {
     /// Blocks verification level.
@@ -111,13 +111,15 @@ pub fn create_local_sync_node(consensus: ConsensusParams, db: storage::SharedStore,
     let sync_state = SynchronizationStateRef::new(SynchronizationState::with_storage(db.clone()));
     let sync_chain = SyncChain::new(db.clone(), memory_pool.clone());
 
-    let chain_verifier = Arc::new(ChainVerifier::new(db.clone(), consensus.clone()));
+    let light_chain_verifier = Arc::new(ChainVerifier::new(db.clone(), consensus.clone()));
+    let heavy_chain_verifier = Arc::new(ChainVerifier::new(db.clone(), consensus.clone()));
     let sync_executor = SyncExecutor::new(peers.clone());
     let sync_server = Arc::new(ServerImpl::new(peers.clone(), db.clone(), memory_pool.clone(), sync_executor.clone()));
-    let sync_client_core = SynchronizationClientCore::new(sync_client_config, sync_state.clone(), peers.clone(), sync_executor.clone(), sync_chain, chain_verifier.clone());
+    let sync_client_core = SynchronizationClientCore::new(sync_client_config, sync_state.clone(), peers.clone(), sync_executor.clone(), sync_chain);
     let verifier_sink = Arc::new(CoreVerificationSink::new(sync_client_core.clone()));
-    let verifier = AsyncVerifier::new(chain_verifier, db.clone(), memory_pool.clone(), verifier_sink, verification_params);
-    let sync_client = SynchronizationClient::new(sync_state.clone(), sync_client_core, verifier);
+    let light_verifier = AsyncVerifier::new(light_chain_verifier, db.clone(), memory_pool.clone(), verifier_sink.clone(), verification_params.clone());
+    let heavy_verifier = AsyncVerifier::new(heavy_chain_verifier, db.clone(), memory_pool.clone(), verifier_sink, verification_params);
+    let sync_client = SynchronizationClient::new(sync_state.clone(), sync_client_core, light_verifier, heavy_verifier);
 
     Arc::new(SyncNode::new(consensus, db, memory_pool, peers, sync_state, sync_client, sync_server))
 }
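The new `Clone` derive is what lets the wiring above hand the same `VerificationParameters` and the same `CoreVerificationSink` to two separate `AsyncVerifier` instances. A minimal sketch of that pattern, using made-up stand-in types (`Params`, `Sink`, `Worker`), not the crate's real API:

```rust
use std::sync::Arc;

#[derive(Debug, Clone)]
struct Params { verification_level: u8 }

struct Sink;
struct Worker { _sink: Arc<Sink>, _params: Params }

impl Worker {
    // Each worker takes the shared sink by `Arc` and the parameters by value,
    // which is why the parameters type must be `Clone`.
    fn new(sink: Arc<Sink>, params: Params) -> Self {
        Worker { _sink: sink, _params: params }
    }
}

fn main() {
    let sink = Arc::new(Sink);
    let params = Params { verification_level: 0 };
    // the light worker gets clones, the heavy worker consumes the originals
    let _light = Worker::new(sink.clone(), params.clone());
    let _heavy = Worker::new(sink, params);
}
```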
diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs
index 405b34dc..51abbf1b 100644
--- a/sync/src/local_node.rs
+++ b/sync/src/local_node.rs
@@ -282,7 +282,6 @@ pub mod tests {
     use synchronization_server::tests::DummyServer;
     use synchronization_verifier::tests::DummyVerifier;
     use primitives::bytes::Bytes;
-    use verification::BackwardsCompatibleChainVerifier as ChainVerifier;
     use std::iter::repeat;
     use synchronization_peers::PeersImpl;
     use utils::SynchronizationState;
@@ -312,14 +311,15 @@ pub mod tests {
         let executor = DummyTaskExecutor::new();
         let server = Arc::new(DummyServer::new());
         let config = Config { close_connection_on_bad_block: true };
-        let chain_verifier = Arc::new(ChainVerifier::new(storage.clone(), ConsensusParams::new(Network::Mainnet)));
-        let client_core = SynchronizationClientCore::new(config, sync_state.clone(), sync_peers.clone(), executor.clone(), chain, chain_verifier);
-        let mut verifier = match verifier {
+        let client_core = SynchronizationClientCore::new(config, sync_state.clone(), sync_peers.clone(), executor.clone(), chain);
+        let mut light_verifier = DummyVerifier::default();
+        light_verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
+        let mut heavy_verifier = match verifier {
             Some(verifier) => verifier,
             None => DummyVerifier::default(),
         };
-        verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
-        let client = SynchronizationClient::new(sync_state.clone(), client_core, verifier);
+        heavy_verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
+        let client = SynchronizationClient::new(sync_state.clone(), client_core, light_verifier, heavy_verifier);
         let local_node = LocalNode::new(ConsensusParams::new(Network::Mainnet), storage, memory_pool, sync_peers, sync_state, client, server.clone());
         (executor, server, local_node)
     }
diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs
index 5b26f2a4..6ad1184b 100644
--- a/sync/src/synchronization_client.rs
+++ b/sync/src/synchronization_client.rs
@@ -135,14 +135,16 @@ pub trait Client : Send + Sync + 'static {
 
 /// Synchronization client facade
 pub struct SynchronizationClient<T: TaskExecutor, U: Verifier> {
-    /// Verification mutex
-    verification_lock: Mutex<()>,
     /// Shared client state
     shared_state: SynchronizationStateRef,
     /// Client core
    core: ClientCoreRef<SynchronizationClientCore<T>>,
-    /// Verifier
-    verifier: U,
+    /// Verification mutex
+    heavy_verification_lock: Mutex<()>,
+    /// Verifier that performs heavy verifications (blocks during sync + transactions).
+    heavy_verifier: U,
+    /// Verifier that performs lightweight verifications (headers during sync).
+    light_verifier: U,
 }
 
 impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Verifier {
@@ -159,7 +161,10 @@
     }
 
     fn on_headers(&self, peer_index: PeerIndex, message: types::Headers) {
-        self.core.lock().on_headers(peer_index, message);
+        let headers_to_verify = self.core.lock().on_headers(peer_index, message);
+        if let Some(headers_to_verify) = headers_to_verify {
+            self.light_verifier.verify_headers(peer_index, headers_to_verify);
+        }
     }
 
     fn on_block(&self, peer_index: PeerIndex, block: IndexedBlock) {
@@ -169,13 +174,13 @@
         {
             // verification tasks must be scheduled in the same order as they were built in on_block
             // => here we use verification_lock for this
-            let _verification_lock = self.verification_lock.lock();
+            let _verification_lock = self.heavy_verification_lock.lock();
             let blocks_to_verify = self.core.lock().on_block(peer_index, block);
 
             // verify blocks
             if let Some(mut blocks_to_verify) = blocks_to_verify {
                 while let Some(block) = blocks_to_verify.pop_front() {
-                    self.verifier.verify_block(block);
+                    self.heavy_verifier.verify_block(block);
                 }
             }
         }
@@ -201,7 +206,7 @@
             // => we should verify blocks we mine
             let next_block_height = self.shared_state.best_storage_block_height() + 1;
             while let Some(tx) = transactions_to_verify.pop_front() {
-                self.verifier.verify_transaction(next_block_height, tx);
+                self.heavy_verifier.verify_transaction(next_block_height, tx);
             }
         }
     }
@@ -219,7 +224,7 @@
 
         let next_block_height = self.shared_state.best_storage_block_height() + 1;
         while let Some(tx) = transactions_to_verify.pop_front() {
-            self.verifier.verify_transaction(next_block_height, tx);
+            self.heavy_verifier.verify_transaction(next_block_height, tx);
         }
         Ok(())
     }
@@ -231,12 +236,18 @@
 
 impl<T, U> SynchronizationClient<T, U> where T: TaskExecutor, U: Verifier {
     /// Create new synchronization client
-    pub fn new(shared_state: SynchronizationStateRef, core: ClientCoreRef<SynchronizationClientCore<T>>, verifier: U) -> Arc<Self> {
+    pub fn new(
+        shared_state: SynchronizationStateRef,
+        core: ClientCoreRef<SynchronizationClientCore<T>>,
+        light_verifier: U,
+        heavy_verifier: U,
+    ) -> Arc<Self> {
         Arc::new(SynchronizationClient {
-            verification_lock: Mutex::new(()),
             shared_state: shared_state,
             core: core,
-            verifier: verifier,
+            light_verifier: light_verifier,
+            heavy_verification_lock: Mutex::new(()),
+            heavy_verifier: heavy_verifier,
         })
     }
 }
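A condensed sketch of the dispatch the facade now performs: header batches returned by the core go to the light verifier, while blocks are fed to the heavy verifier under a dedicated lock so they keep the order the core produced them in. The types here (`Core`, `Client`, `Header`, `Block`) are simplified placeholders, not the crate's real signatures:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

type Header = u64;
type Block = u64;

trait Verifier {
    fn verify_headers(&self, peer: usize, headers: Vec<Header>);
    fn verify_block(&self, block: Block);
}

// Stand-in for the synchronization core: it only decides *what* needs verification.
struct Core;

impl Core {
    fn on_headers(&self, _peer: usize, headers: Vec<Header>) -> Option<Vec<Header>> {
        if headers.is_empty() { None } else { Some(headers) }
    }

    fn on_block(&self, _peer: usize, block: Block) -> Option<VecDeque<Block>> {
        Some(VecDeque::from(vec![block]))
    }
}

struct Client<U: Verifier> {
    core: Mutex<Core>,
    heavy_verification_lock: Mutex<()>,
    light_verifier: U,
    heavy_verifier: U,
}

impl<U: Verifier> Client<U> {
    fn on_headers(&self, peer: usize, headers: Vec<Header>) {
        // cheap selection happens under the core lock, verification outside of it
        if let Some(headers) = self.core.lock().unwrap().on_headers(peer, headers) {
            self.light_verifier.verify_headers(peer, headers);
        }
    }

    fn on_block(&self, peer: usize, block: Block) {
        // the outer lock keeps heavy tasks in the order the core produced them
        let _guard = self.heavy_verification_lock.lock().unwrap();
        if let Some(mut blocks) = self.core.lock().unwrap().on_block(peer, block) {
            while let Some(block) = blocks.pop_front() {
                self.heavy_verifier.verify_block(block);
            }
        }
    }
}

struct NoopVerifier;

impl Verifier for NoopVerifier {
    fn verify_headers(&self, peer: usize, headers: Vec<Header>) {
        println!("light: {} headers from peer {}", headers.len(), peer);
    }
    fn verify_block(&self, block: Block) {
        println!("heavy: block {}", block);
    }
}

fn main() {
    let client = Client {
        core: Mutex::new(Core),
        heavy_verification_lock: Mutex::new(()),
        light_verifier: NoopVerifier,
        heavy_verifier: NoopVerifier,
    };
    client.on_headers(0, vec![1, 2, 3]);
    client.on_block(0, 42);
}
```

Splitting the work across two verifier instances plausibly keeps cheap header batches from queuing behind long-running block verification.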
diff --git a/sync/src/synchronization_client_core.rs b/sync/src/synchronization_client_core.rs
index 462d59ef..cf63382b 100644
--- a/sync/src/synchronization_client_core.rs
+++ b/sync/src/synchronization_client_core.rs
@@ -10,14 +10,14 @@
 use message::types;
 use message::common::{InventoryType, InventoryVector};
 use miner::transaction_fee_rate;
 use primitives::hash::H256;
-use verification::BackwardsCompatibleChainVerifier as ChainVerifier;
 use synchronization_chain::{Chain, BlockState, TransactionState, BlockInsertionResult};
 use synchronization_executor::{Task, TaskExecutor};
 use synchronization_manager::ManagementWorker;
 use synchronization_peers_tasks::PeersTasks;
-use synchronization_verifier::{VerificationSink, BlockVerificationSink, TransactionVerificationSink, VerificationTask};
+use synchronization_verifier::{VerificationSink, HeadersVerificationSink, BlockVerificationSink,
+    TransactionVerificationSink, VerificationTask};
 use types::{BlockHeight, ClientCoreRef, PeersRef, PeerIndex, SynchronizationStateRef, EmptyBoxFuture, SyncListenerRef};
-use utils::{AverageSpeedMeter, MessageBlockHeadersProvider, OrphanBlocksPool, OrphanTransactionsPool, HashPosition};
+use utils::{AverageSpeedMeter, OrphanBlocksPool, OrphanTransactionsPool, HashPosition};
 #[cfg(test)] use synchronization_peers_tasks::{Information as PeersTasksInformation};
 #[cfg(test)] use synchronization_chain::{Information as ChainInformation};
@@ -67,7 +67,7 @@ pub trait ClientCore {
     fn on_connect(&mut self, peer_index: PeerIndex);
     fn on_disconnect(&mut self, peer_index: PeerIndex);
     fn on_inventory(&self, peer_index: PeerIndex, message: types::Inv);
-    fn on_headers(&mut self, peer_index: PeerIndex, message: types::Headers);
+    fn on_headers(&mut self, peer_index: PeerIndex, message: types::Headers) -> Option<Vec<IndexedBlockHeader>>;
     fn on_block(&mut self, peer_index: PeerIndex, block: IndexedBlock) -> Option<VecDeque<IndexedBlock>>;
     fn on_transaction(&mut self, peer_index: PeerIndex, transaction: IndexedTransaction) -> Option<VecDeque<IndexedTransaction>>;
     fn on_notfound(&mut self, peer_index: PeerIndex, message: types::NotFound);
@@ -105,10 +105,6 @@ pub struct SynchronizationClientCore<T: TaskExecutor> {
     orphaned_blocks_pool: OrphanBlocksPool,
     /// Orphaned transactions pool.
     orphaned_transactions_pool: OrphanTransactionsPool,
-    /// Chain verifier
-    chain_verifier: Arc<ChainVerifier>,
-    /// Verify block headers?
-    verify_headers: bool,
     /// Verifying blocks by peer
     verifying_blocks_by_peer: HashMap<H256, PeerIndex>,
     /// Verifying blocks futures
@@ -166,16 +162,6 @@ enum AppendTransactionError {
     Orphan(HashSet<H256>),
 }
 
-/// Blocks headers verification result
-enum BlocksHeadersVerificationResult {
-    /// Skip these blocks headers
-    Skip,
-    /// Error during verification of header with given index
-    Error(usize),
-    /// Successful verification
-    Success,
-}
-
 impl State {
     pub fn is_saturated(&self) -> bool {
         match *self {
@@ -259,11 +245,11 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
     }
 
     /// Try to queue synchronization of unknown blocks when blocks headers are received.
-    fn on_headers(&mut self, peer_index: PeerIndex, message: types::Headers) {
+    fn on_headers(&mut self, peer_index: PeerIndex, message: types::Headers) -> Option<Vec<IndexedBlockHeader>> {
         assert!(!message.headers.is_empty(), "This must be checked in incoming connection");
 
         // transform to indexed headers
-        let mut headers: Vec<_> = message.headers.into_iter().map(IndexedBlockHeader::from_raw).collect();
+        let headers: Vec<_> = message.headers.into_iter().map(IndexedBlockHeader::from_raw).collect();
 
         // update peers to select next tasks
         self.peers_tasks.on_headers_received(peer_index);
@@ -287,71 +273,85 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
                 self.peers.misbehaving(peer_index, "Too many failures.");
             }
-            return;
+            return None;
         }
 
         // find first unknown header position
         // optimization: normally, the first header will be unknown
-        let num_headers = headers.len();
-        let first_unknown_index = match self.chain.block_state(&header0.hash) {
-            BlockState::Unknown => 0,
-            _ => {
-                // optimization: if last header is known, then all headers are also known
-                let header_last = &headers[num_headers - 1];
-                match self.chain.block_state(&header_last.hash) {
-                    BlockState::Unknown => 1 + headers.iter().skip(1)
-                        .position(|header| self.chain.block_state(&header.hash) == BlockState::Unknown)
-                        .expect("last header has UnknownState; we are searching for first unknown header; qed"),
-                    // else all headers are known
-                    _ => {
-                        trace!(target: "sync", "Ignoring {} known headers from peer#{}", headers.len(), peer_index);
-                        // but this peer is still useful for synchronization
-                        self.peers_tasks.useful_peer(peer_index);
-                        return;
-                    },
-                }
-            }
-        };
+        let headers_in_message = headers.len();
+        let headers = self.find_unknown_headers(headers);
+        if headers.is_empty() {
+            trace!(target: "sync", "Ignoring {} known headers from peer#{}", headers_in_message, peer_index);
+            // but this peer is still useful for synchronization
+            self.peers_tasks.useful_peer(peer_index);
+            return None;
+        }
 
         // validate blocks headers before scheduling
-        let last_known_hash = if first_unknown_index > 0 { headers[first_unknown_index - 1].hash.clone() } else { header0.raw.previous_header_hash.clone() };
+        let mut last_known_hash = headers[0].raw.previous_header_hash;
         if self.config.close_connection_on_bad_block && self.chain.block_state(&last_known_hash) == BlockState::DeadEnd {
             self.peers.misbehaving(peer_index, &format!("Provided after dead-end block {}", last_known_hash.to_reversed_str()));
-            return;
+            return None;
         }
-        match self.verify_headers(peer_index, last_known_hash, &headers[first_unknown_index..num_headers]) {
-            BlocksHeadersVerificationResult::Error(error_index) => self.chain.mark_dead_end_block(&headers[first_unknown_index + error_index].hash),
-            BlocksHeadersVerificationResult::Skip => (),
-            BlocksHeadersVerificationResult::Success => {
-                // report progress
-                let num_new_headers = num_headers - first_unknown_index;
-                trace!(target: "sync", "New {} headers from peer#{}. First {:?}, last: {:?}",
-                    num_new_headers,
+
+        for (header_index, header) in headers.iter().enumerate() {
+            // check that this header is direct child of previous header
+            if header.raw.previous_header_hash != last_known_hash {
+                self.peers.misbehaving(
                     peer_index,
-                    headers[first_unknown_index].hash.to_reversed_str(),
-                    headers[num_headers - 1].hash.to_reversed_str()
+                    &format!(
+                        "Neighbour headers in `headers` message are unlinked: Prev: {}, PrevLink: {}, Curr: {}",
+                        last_known_hash.to_reversed_str(),
+                        header.raw.previous_header_hash.to_reversed_str(),
+                        header.hash.to_reversed_str(),
+                    ),
                 );
+                return None;
+            }
 
-                // prepare new headers array
-                let new_headers = headers.split_off(first_unknown_index);
-                self.chain.schedule_blocks_headers(new_headers);
+            // check that we do not know all blocks in range [first_unknown_index..]
+            // if we know some block => there has been verification error => all headers should be ignored
+            // see when_previous_block_verification_failed_fork_is_not_requested for details
+            match self.chain.block_state(&header.hash) {
+                BlockState::Unknown => (),
+                BlockState::DeadEnd if self.config.close_connection_on_bad_block => {
+                    self.peers.misbehaving(
+                        peer_index,
+                        &format!(
+                            "Provided dead-end block {:?}",
+                            header.hash.to_reversed_str(),
+                        ),
+                    );
+                    return None;
+                },
+                block_state => {
+                    trace!(
+                        target: "sync",
+                        "Ignoring {} headers from peer#{} - known ({:?}) header {} at the {}/{} ({}...{})",
+                        headers.len(), peer_index, block_state, header.hash.to_reversed_str(), header_index,
+                        headers.len(), headers[0].hash.to_reversed_str(),
+                        headers[headers.len() - 1].hash.to_reversed_str());
+                    self.peers_tasks.useful_peer(peer_index);
+                    return None;
+                },
+            }
-            // switch to synchronization state
-            if !self.state.is_synchronizing() {
-                if self.chain.length_of_blocks_state(BlockState::Scheduled) +
-                    self.chain.length_of_blocks_state(BlockState::Requested) == 1 {
-                    self.switch_to_nearly_saturated_state();
-                } else {
-                    self.switch_to_synchronization_state();
-                }
-            }
-
-            // these peers have supplied us with new headers => useful indeed
-            self.peers_tasks.useful_peer(peer_index);
-            // and execute tasks
-            self.execute_synchronization_tasks(None, None);
-        },
+            last_known_hash = header.hash;
         }
+
+        // report progress
+        let num_new_headers = headers_in_message - headers.len();
+        trace!(target: "sync", "New {} headers from peer#{}. First {:?}, last: {:?}",
+            num_new_headers,
+            peer_index,
+            headers[0].hash.to_reversed_str(),
+            headers[headers.len() - 1].hash.to_reversed_str()
+        );
+
+        // peer has supplied us with new headers => useful indeed
+        self.peers_tasks.useful_peer(peer_index);
+
+        Some(headers)
     }
 
     fn on_block(&mut self, peer_index: PeerIndex, block: IndexedBlock) -> Option<VecDeque<IndexedBlock>> {
@@ -427,8 +427,8 @@
                 let blocks_hashes_to_forget: Vec<_> = blocks_to_verify.iter().map(|b| b.hash().clone()).collect();
                 self.chain.forget_blocks_leave_header(&blocks_hashes_to_forget);
                 // remember that we are verifying these blocks
-                let blocks_headers_to_verify: Vec<_> = blocks_to_verify.iter().map(|b| b.header.clone()).collect();
-                self.chain.verify_blocks(blocks_headers_to_verify);
+                let blocks_headers: Vec<_> = blocks_to_verify.iter().map(|b| b.header.clone()).collect();
+                self.chain.verify_blocks(blocks_headers);
                 // remember that we are verifying block from this peer
                 for verifying_block_hash in blocks_to_verify.iter().map(|b| b.hash().clone()) {
                     self.verifying_blocks_by_peer.insert(verifying_block_hash, peer_index);
@@ -717,6 +717,16 @@ impl<T> CoreVerificationSink<T> where T: TaskExecutor {
 }
 
 impl<T> VerificationSink for CoreVerificationSink<T> where T: TaskExecutor {
 }
 
+impl<T> HeadersVerificationSink for CoreVerificationSink<T> where T: TaskExecutor {
+    fn on_headers_verification_success(&self, headers: Vec<IndexedBlockHeader>) {
+        self.core.lock().on_headers_verification_success(headers)
+    }
+
+    fn on_headers_verification_error(&self, peer: PeerIndex, error: String, hash: H256) {
+        self.core.lock().on_headers_verification_error(peer, error, hash)
+    }
+}
+
 impl<T> BlockVerificationSink for CoreVerificationSink<T> where T: TaskExecutor {
     /// Process successful block verification
     fn on_block_verification_success(&self, block: IndexedBlock) -> Option<Vec<VerificationTask>> {
@@ -743,7 +753,7 @@ impl<T> TransactionVerificationSink for CoreVerificationSink<T> where T: TaskExecutor {
 
 impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
     /// Create new synchronization client core
-    pub fn new(config: Config, shared_state: SynchronizationStateRef, peers: PeersRef, executor: Arc<T>, chain: Chain, chain_verifier: Arc<ChainVerifier>) -> ClientCoreRef<Self> {
+    pub fn new(config: Config, shared_state: SynchronizationStateRef, peers: PeersRef, executor: Arc<T>, chain: Chain) -> ClientCoreRef<Self> {
         let sync = Arc::new(Mutex::new(
             SynchronizationClientCore {
                 shared_state: shared_state,
@@ -755,8 +765,6 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
                 chain: chain,
                 orphaned_blocks_pool: OrphanBlocksPool::new(),
                 orphaned_transactions_pool: OrphanTransactionsPool::new(),
-                chain_verifier: chain_verifier,
-                verify_headers: true,
                 verifying_blocks_by_peer: HashMap::new(),
                 verifying_blocks_futures: HashMap::new(),
                 verifying_transactions_sinks: HashMap::new(),
@@ -820,12 +828,6 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
         &mut self.orphaned_transactions_pool
     }
 
-    /// Verify block headers or not?
-    #[cfg(test)]
-    pub fn set_verify_headers(&mut self, verify: bool) {
-        self.verify_headers = verify;
-    }
-
     /// Print synchronization information
     pub fn print_synchronization_information(&mut self) {
         if let State::Synchronizing(timestamp, num_of_blocks) = self.state {
@@ -857,56 +859,6 @@
         }
     }
 
-    /// Verify and select unknown headers for scheduling
-    fn verify_headers(&mut self, peer_index: PeerIndex, last_known_hash: H256, headers: &[IndexedBlockHeader]) -> BlocksHeadersVerificationResult {
-        // validate blocks headers before scheduling
-        let mut last_known_hash = &last_known_hash;
-        let mut headers_provider = MessageBlockHeadersProvider::new(&self.chain, self.chain.best_block_header().number);
-        for (header_index, header) in headers.iter().enumerate() {
-            // check that this header is direct child of previous header
-            if &header.raw.previous_header_hash != last_known_hash {
-                self.peers.misbehaving(peer_index, &format!("Neighbour headers in `headers` message are unlinked: Prev: {}, PrevLink: {}, Curr: {}",
-                    last_known_hash.to_reversed_str(), header.raw.previous_header_hash.to_reversed_str(), header.hash.to_reversed_str()));
-                return BlocksHeadersVerificationResult::Skip;
-            }
-
-            // check that we do not know all blocks in range [first_unknown_index..]
-            // if we know some block => there has been verification error => all headers should be ignored
-            // see when_previous_block_verification_failed_fork_is_not_requested for details
-            match self.chain.block_state(&header.hash) {
-                BlockState::Unknown => (),
-                BlockState::DeadEnd if self.config.close_connection_on_bad_block => {
-                    self.peers.misbehaving(peer_index, &format!("Provided dead-end block {:?}", header.hash.to_reversed_str()));
-                    return BlocksHeadersVerificationResult::Skip;
-                },
-                block_state => {
-                    trace!(target: "sync", "Ignoring {} headers from peer#{} - known ({:?}) header {} at the {}/{} ({}...{})",
-                        headers.len(), peer_index, block_state, header.hash.to_reversed_str(), header_index, headers.len(),
-                        headers[0].hash.to_reversed_str(), headers[headers.len() - 1].hash.to_reversed_str());
-                    self.peers_tasks.useful_peer(peer_index);
-                    return BlocksHeadersVerificationResult::Skip;
-                },
-            }
-
-            // verify header
-            if self.verify_headers {
-                if let Err(error) = self.chain_verifier.verify_block_header(&headers_provider, &header.hash, &header.raw) {
-                    if self.config.close_connection_on_bad_block {
-                        self.peers.misbehaving(peer_index, &format!("Error verifying header {} from `headers`: {:?}", header.hash.to_reversed_str(), error));
-                    } else {
-                        warn!(target: "sync", "Error verifying header {} from `headers` message: {:?}", header.hash.to_reversed_str(), error);
-                    }
-                    return BlocksHeadersVerificationResult::Error(header_index);
-                }
-            }
-
-            last_known_hash = &header.hash;
-            headers_provider.append_header(header.hash.clone(), header.clone());
-        }
-
-        BlocksHeadersVerificationResult::Success
-    }
-
     /// Process new peer transaction
     fn process_peer_transaction(&mut self, _peer_index: Option<PeerIndex>, transaction: IndexedTransaction, relay: bool) -> Option<VecDeque<IndexedTransaction>> {
         match self.try_append_transaction(transaction.clone(), relay) {
@@ -1052,6 +1004,71 @@
         }
     }
 
+    fn find_unknown_headers(&self, mut headers: Vec<IndexedBlockHeader>) -> Vec<IndexedBlockHeader> {
+        // find first unknown header position
+        // optimization: normally, the first header will be unknown
+        let num_headers = headers.len();
+        let first_unknown_index = match self.chain.block_state(&headers[0].hash) {
+            BlockState::Unknown => 0,
+            _ => {
+                // optimization: if last header is known, then all headers are also known
+                let header_last = &headers[num_headers - 1];
+                match self.chain.block_state(&header_last.hash) {
+                    BlockState::Unknown => 1 + headers.iter().skip(1)
+                        .position(|header| self.chain.block_state(&header.hash) == BlockState::Unknown)
+                        .expect("last header has UnknownState; we are searching for first unknown header; qed"),
+                    // else all headers are known
+                    _ => headers.len(),
+                }
+            }
+        };
+
+        if first_unknown_index == 0 { headers } else { headers.split_off(first_unknown_index) }
+    }
+
+    fn on_headers_verification_success(&mut self, headers: Vec<IndexedBlockHeader>) {
+        let headers = self.find_unknown_headers(headers);
+        if headers.is_empty() {
+            return;
+        }
+
+        self.chain.schedule_blocks_headers(headers);
+
+        // switch to synchronization state
+        if !self.state.is_synchronizing() {
+            if self.chain.length_of_blocks_state(BlockState::Scheduled) +
+                self.chain.length_of_blocks_state(BlockState::Requested) == 1 {
+                self.switch_to_nearly_saturated_state();
+            } else {
+                self.switch_to_synchronization_state();
+            }
+        }
+        self.execute_synchronization_tasks(None, None);
+    }
+
+    fn on_headers_verification_error(&mut self, peer: PeerIndex, error: String, hash: H256) {
+        if self.config.close_connection_on_bad_block {
+            self.peers.misbehaving(
+                peer,
+                &format!(
+                    "Error verifying header {} from `headers`: {:?}",
+                    hash.to_reversed_str(),
+                    error,
+                ),
+            );
+        } else {
+            warn!(
+                target: "sync",
+                "Error verifying header {} from `headers` message: {:?}",
+                hash.to_reversed_str(),
+                error,
+            );
+        }
+
+        self.chain.mark_dead_end_block(&hash);
+        self.execute_synchronization_tasks(None, None);
+    }
+
     fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option<Vec<VerificationTask>> {
         // update block processing speed
         self.block_speed_meter.checkpoint();
@@ -1301,17 +1318,16 @@ pub mod tests {
         let config = Config { close_connection_on_bad_block: true };
         let chain_verifier = Arc::new(ChainVerifier::new(storage.clone(), ConsensusParams::new(Network::Unitest)));
-        let client_core = SynchronizationClientCore::new(config, sync_state.clone(), sync_peers.clone(), executor.clone(), chain, chain_verifier.clone());
-        {
-            client_core.lock().set_verify_headers(false);
-        }
-        let mut verifier = verifier.unwrap_or_default();
-        verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
-        verifier.set_storage(storage);
-        verifier.set_memory_pool(memory_pool);
-        verifier.set_verifier(chain_verifier);
+        let client_core = SynchronizationClientCore::new(config, sync_state.clone(), sync_peers.clone(), executor.clone(), chain);
+        let mut light_verifier = DummyVerifier::default();
+        light_verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
+        let mut heavy_verifier = verifier.unwrap_or_default();
+        heavy_verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
+        heavy_verifier.set_storage(storage);
+        heavy_verifier.set_memory_pool(memory_pool);
+        heavy_verifier.set_verifier(chain_verifier);
 
-        let client = SynchronizationClient::new(sync_state, client_core.clone(), verifier);
+        let client = SynchronizationClient::new(sync_state, client_core.clone(), light_verifier, heavy_verifier);
         (executor, client_core, client)
     }
@@ -2125,7 +2141,7 @@
             chain.mark_dead_end_block(&b1.hash());
         }
 
-        core.lock().set_verify_headers(true);
+//        core.lock().set_verify_headers(true);
 
         core.lock().peers.insert(0, Services::default(), DummyOutboundSyncConnection::new());
         assert!(core.lock().peers.enumerate().contains(&0));
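The `find_unknown_headers` helper added above reduces to: locate the first header the chain does not know and keep only the tail from that point. A standalone sketch of that selection, with a closure standing in for `chain.block_state` and plain integers standing in for headers (all names here are illustrative):

```rust
#[derive(PartialEq)]
enum BlockState { Unknown, Known }

// Keep only the suffix of `headers` that starts at the first unknown header.
// `headers` is assumed non-empty, as it is at the call sites in the diff.
fn find_unknown_headers<F>(mut headers: Vec<u64>, block_state: F) -> Vec<u64>
where
    F: Fn(&u64) -> BlockState,
{
    let num_headers = headers.len();
    let first_unknown_index = match block_state(&headers[0]) {
        BlockState::Unknown => 0,
        _ => {
            // optimization: if the last header is known, all headers are known
            match block_state(&headers[num_headers - 1]) {
                BlockState::Unknown => 1 + headers.iter().skip(1)
                    .position(|header| block_state(header) == BlockState::Unknown)
                    .expect("last header is unknown, so a first unknown header exists"),
                _ => num_headers,
            }
        },
    };

    if first_unknown_index == 0 { headers } else { headers.split_off(first_unknown_index) }
}

fn main() {
    // headers 1 and 2 are already known, 3 and 4 are new
    let known = [1u64, 2];
    let unknown = find_unknown_headers(vec![1, 2, 3, 4], |h| {
        if known.contains(h) { BlockState::Known } else { BlockState::Unknown }
    });
    assert_eq!(unknown, vec![3, 4]);
}
```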
diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs
index 51eef646..84d06022 100644
--- a/sync/src/synchronization_verifier.rs
+++ b/sync/src/synchronization_verifier.rs
@@ -5,15 +5,23 @@
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::thread;
 use parking_lot::Mutex;
 use time::get_time;
-use chain::{IndexedBlock, IndexedTransaction};
+use chain::{IndexedBlockHeader, IndexedBlock, IndexedTransaction};
 use network::ConsensusParams;
 use primitives::hash::H256;
 use verification::{BackwardsCompatibleChainVerifier as ChainVerifier, Verify as VerificationVerify, Error as VerificationError, VerificationLevel};
-use types::{BlockHeight, StorageRef, MemoryPoolRef};
+use types::{PeerIndex, BlockHeight, StorageRef, MemoryPoolRef};
 use utils::MemoryPoolTransactionOutputProvider;
 use VerificationParameters;
 
+/// Headers verification events sink
+pub trait HeadersVerificationSink : Send + Sync + 'static {
+    /// When headers verification has completed successfully.
+    fn on_headers_verification_success(&self, headers: Vec<IndexedBlockHeader>);
+    /// When headers verification has failed.
+    fn on_headers_verification_error(&self, peer: PeerIndex, error: String, hash: H256);
+}
+
 /// Block verification events sink
 pub trait BlockVerificationSink : Send + Sync + 'static {
     /// When block verification has completed successfully.
@@ -31,12 +39,14 @@ pub trait TransactionVerificationSink : Send + Sync + 'static {
 }
 
 /// Verification events sink
-pub trait VerificationSink : BlockVerificationSink + TransactionVerificationSink {
+pub trait VerificationSink : HeadersVerificationSink + BlockVerificationSink + TransactionVerificationSink {
 }
 
 /// Verification thread tasks
 #[derive(Debug)]
 pub enum VerificationTask {
+    /// Verify headers
+    VerifyHeaders(PeerIndex, Vec<IndexedBlockHeader>),
     /// Verify single block
     VerifyBlock(IndexedBlock),
     /// Verify single transaction
@@ -47,6 +57,8 @@ pub enum VerificationTask {
 
 /// Synchronization verifier
 pub trait Verifier : Send + Sync + 'static {
+    /// Verify headers
+    fn verify_headers(&self, peer: PeerIndex, headers: Vec<IndexedBlockHeader>);
     /// Verify block
     fn verify_block(&self, block: IndexedBlock);
     /// Verify transaction
@@ -67,8 +79,10 @@ pub struct ChainVerifierWrapper {
     pub verifier: Arc<ChainVerifier>,
     /// Verification parameters.
     verification_params: VerificationParameters,
-    /// Is verification edge passed.
+    /// True if we have passed verification edge && full verification is required.
     pub enforce_full_verification: AtomicBool,
+    /// True if we need to actually verify headers.
+    verify_headers: bool,
 }
 
 impl ChainVerifierWrapper {
@@ -79,6 +93,15 @@ impl ChainVerifierWrapper {
             verifier: verifier,
             verification_params: verification_params,
             enforce_full_verification: enforce_full_verification,
+            verify_headers: false,
+        }
+    }
+
+    /// Verify header.
+    pub fn verify_block_header(&self, header: &IndexedBlockHeader) -> Result<(), VerificationError> {
+        match self.verify_headers {
+            true => self.verifier.verify_block_header(header),
+            false => Ok(()),
         }
     }
@@ -138,7 +161,13 @@ impl AsyncVerifier {
     }
 
     /// Execute single verification task
-    pub fn execute_single_task<T: VerificationSink>(sink: &Arc<T>, storage: &StorageRef, memory_pool: &MemoryPoolRef, verifier: &ChainVerifierWrapper, task: VerificationTask) -> bool {
+    pub fn execute_single_task<T: VerificationSink>(
+        sink: &Arc<T>,
+        storage: &StorageRef,
+        memory_pool: &MemoryPoolRef,
+        verifier: &ChainVerifierWrapper,
+        task: VerificationTask,
+    ) -> bool {
         // block verification && insertion can lead to reorganization
         // => transactions from decanonized blocks should be put back to the MemoryPool
         // => they must be verified again
@@ -148,6 +177,15 @@ impl AsyncVerifier {
         while let Some(task) = tasks_queue.pop_front() {
             match task {
+                VerificationTask::VerifyHeaders(peer, headers) => {
+                    let result = headers.iter()
+                        .try_for_each(|header| verifier.verify_block_header(header)
+                            .map_err(|error| (error, header.hash)));
+                    match result {
+                        Ok(_) => sink.on_headers_verification_success(headers),
+                        Err((error, hash)) => sink.on_headers_verification_error(peer, format!("{:?}", error), hash),
+                    }
+                },
                 VerificationTask::VerifyBlock(block) => {
                     // verify block
                     match verifier.verify_block(&block) {
@@ -185,7 +223,6 @@ impl AsyncVerifier {
     }
 }
-
 impl Drop for AsyncVerifier {
     fn drop(&mut self) {
         if let Some(join_handle) = self.verification_worker_thread.take() {
@@ -200,6 +237,13 @@ impl Drop for AsyncVerifier {
 }
 
 impl Verifier for AsyncVerifier {
+    /// Verify headers
+    fn verify_headers(&self, peer: PeerIndex, headers: Vec<IndexedBlockHeader>) {
+        self.verification_work_sender.lock()
+            .send(VerificationTask::VerifyHeaders(peer, headers))
+            .expect("Verification thread have the same lifetime as `AsyncVerifier`");
+    }
+
     /// Verify block
     fn verify_block(&self, block: IndexedBlock) {
         self.verification_work_sender.lock()
@@ -236,6 +280,11 @@ impl<T> SyncVerifier<T> where T: VerificationSink {
 }
 
 impl<T> Verifier for SyncVerifier<T> where T: VerificationSink {
+    /// Verify headers
+    fn verify_headers(&self, _peer: PeerIndex, _headers: Vec<IndexedBlockHeader>) {
+        unreachable!("SyncVerifier is used only for blocks verification")
+    }
+
     /// Verify block
     fn verify_block(&self, block: IndexedBlock) {
         match self.verifier.verify_block(&block) {
@@ -251,7 +300,7 @@ impl<T> Verifier for SyncVerifier<T> where T: VerificationSink {
 
     /// Verify transaction
     fn verify_transaction(&self, _height: BlockHeight, _transaction: IndexedTransaction) {
-        unimplemented!() // sync verifier is currently only used for blocks verification
+        unreachable!("SyncVerifier is used only for blocks verification")
     }
 }
@@ -269,9 +318,10 @@ pub mod tests {
     use synchronization_client_core::CoreVerificationSink;
     use synchronization_executor::tests::DummyTaskExecutor;
     use primitives::hash::H256;
-    use chain::{IndexedBlock, IndexedTransaction};
-    use super::{Verifier, BlockVerificationSink, TransactionVerificationSink, AsyncVerifier, VerificationTask, ChainVerifierWrapper};
-    use types::{BlockHeight, StorageRef, MemoryPoolRef};
+    use chain::{IndexedBlockHeader, IndexedBlock, IndexedTransaction};
+    use super::{Verifier, HeadersVerificationSink, BlockVerificationSink, TransactionVerificationSink,
+        AsyncVerifier, VerificationTask, ChainVerifierWrapper};
+    use types::{PeerIndex, BlockHeight, StorageRef, MemoryPoolRef};
     use VerificationParameters;
 
     #[derive(Default)]
@@ -314,6 +364,13 @@ pub mod tests {
     }
 
     impl Verifier for DummyVerifier {
+        fn verify_headers(&self, _peer: PeerIndex, headers: Vec<IndexedBlockHeader>) {
+            match self.sink {
+                Some(ref sink) => sink.on_headers_verification_success(headers),
+                _ => (),
+            }
+        }
+
         fn verify_block(&self, block: IndexedBlock) {
             match self.sink {
                 Some(ref sink) => match self.errors.get(&block.hash()) {
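The `VerifyHeaders` branch stops at the first failing header, reporting its hash and error, and otherwise returns the whole batch to the sink. A simplified, self-contained version of that control flow; `Header`, `Sink`, and `verify_header` below are stand-ins for the crate's real types:

```rust
struct Header { hash: u64, valid: bool }

trait Sink {
    fn on_headers_verification_success(&self, headers: Vec<Header>);
    fn on_headers_verification_error(&self, peer: usize, error: String, hash: u64);
}

// Stand-in for the wrapper's per-header check.
fn verify_header(header: &Header) -> Result<(), String> {
    if header.valid { Ok(()) } else { Err("bad header".into()) }
}

// Verify headers in order; the first failure aborts the batch and reports the offending hash.
fn execute_verify_headers<S: Sink>(sink: &S, peer: usize, headers: Vec<Header>) {
    let result = headers.iter()
        .try_for_each(|header| verify_header(header)
            .map_err(|error| (error, header.hash)));
    match result {
        Ok(_) => sink.on_headers_verification_success(headers),
        Err((error, hash)) => sink.on_headers_verification_error(peer, error, hash),
    }
}

struct PrintSink;

impl Sink for PrintSink {
    fn on_headers_verification_success(&self, headers: Vec<Header>) {
        println!("verified {} headers", headers.len());
    }
    fn on_headers_verification_error(&self, peer: usize, error: String, hash: u64) {
        println!("peer {} sent bad header {}: {}", peer, hash, error);
    }
}

fn main() {
    let headers = vec![Header { hash: 1, valid: true }, Header { hash: 2, valid: false }];
    execute_verify_headers(&PrintSink, 0, headers);
}
```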
diff --git a/sync/src/utils/message_block_headers_provider.rs b/sync/src/utils/message_block_headers_provider.rs
deleted file mode 100644
index 4b117ab4..00000000
--- a/sync/src/utils/message_block_headers_provider.rs
+++ /dev/null
@@ -1,84 +0,0 @@
-use std::collections::HashMap;
-use chain::IndexedBlockHeader;
-use storage::{BlockRef, BlockHeaderProvider};
-use primitives::bytes::Bytes;
-use primitives::hash::H256;
-
-/// Block headers provider from `headers` message
-pub struct MessageBlockHeadersProvider<'a> {
-    /// Synchronization chain headers provider
-    chain_provider: &'a BlockHeaderProvider,
-    /// headers offset
-    first_header_number: u32,
-    /// headers by hash
-    headers: HashMap<H256, IndexedBlockHeader>,
-    /// headers by order
-    headers_order: Vec<H256>,
-}
-
-impl<'a> MessageBlockHeadersProvider<'a> {
-    pub fn new(chain_provider: &'a BlockHeaderProvider, best_block_header_height: u32) -> Self {
-        MessageBlockHeadersProvider {
-            chain_provider: chain_provider,
-            first_header_number: best_block_header_height + 1,
-            headers: HashMap::new(),
-            headers_order: Vec::new(),
-        }
-    }
-
-    pub fn append_header(&mut self, hash: H256, header: IndexedBlockHeader) {
-        self.headers.insert(hash.clone(), header);
-        self.headers_order.push(hash);
-    }
-}
-
-impl<'a> BlockHeaderProvider for MessageBlockHeadersProvider<'a> {
-    fn block_header_bytes(&self, block_ref: BlockRef) -> Option<Bytes> {
-        use ser::serialize;
-        self.block_header(block_ref).map(|h| serialize(&h.raw))
-    }
-
-    fn block_header(&self, block_ref: BlockRef) -> Option<IndexedBlockHeader> {
-        self.chain_provider.block_header(block_ref.clone())
-            .or_else(move || match block_ref {
-                BlockRef::Hash(h) => self.headers.get(&h).cloned(),
-                BlockRef::Number(n) => if n >= self.first_header_number && n - self.first_header_number < self.headers_order.len() as u32 {
-                    let header_hash = &self.headers_order[(n - self.first_header_number) as usize];
-                    Some(self.headers[header_hash].clone())
-                } else {
-                    None
-                },
-            })
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    extern crate test_data;
-
-    use storage::{AsSubstore, BlockHeaderProvider, BlockRef};
-    use db::BlockChainDatabase;
-    use primitives::hash::H256;
-    use super::MessageBlockHeadersProvider;
-
-    #[test]
-    fn test_message_block_headers_provider() {
-        let storage = BlockChainDatabase::init_test_chain(vec![test_data::genesis().into()]);
-        let storage_provider = storage.as_block_header_provider();
-        let mut headers_provider = MessageBlockHeadersProvider::new(storage_provider, 0);
-
-        assert_eq!(headers_provider.block_header(BlockRef::Hash(test_data::genesis().hash())), Some(test_data::genesis().block_header.into()));
-        assert_eq!(headers_provider.block_header(BlockRef::Number(0)), Some(test_data::genesis().block_header.into()));
-        assert_eq!(headers_provider.block_header(BlockRef::Hash(H256::from(1))), None);
-        assert_eq!(headers_provider.block_header(BlockRef::Number(1)), None);
-
-        headers_provider.append_header(test_data::block_h1().hash(), test_data::block_h1().block_header.into());
-
-        assert_eq!(headers_provider.block_header(BlockRef::Hash(test_data::genesis().hash())), Some(test_data::genesis().block_header.into()));
-        assert_eq!(headers_provider.block_header(BlockRef::Number(0)), Some(test_data::genesis().block_header.into()));
-        assert_eq!(headers_provider.block_header(BlockRef::Hash(test_data::block_h1().hash())), Some(test_data::block_h1().block_header.into()));
-        assert_eq!(headers_provider.block_header(BlockRef::Number(1)), Some(test_data::block_h1().block_header.into()));
-        assert_eq!(headers_provider.block_header(BlockRef::Hash(H256::from(1))), None);
-        assert_eq!(headers_provider.block_header(BlockRef::Number(2)), None);
-    }
-}
diff --git a/sync/src/utils/mod.rs b/sync/src/utils/mod.rs
index a08f8883..27fecee1 100644
--- a/sync/src/utils/mod.rs
+++ b/sync/src/utils/mod.rs
@@ -6,7 +6,6 @@ mod fee_rate_filter;
 mod hash_queue;
 mod known_hash_filter;
 mod memory_pool_transaction_provider;
-mod message_block_headers_provider;
 mod orphan_blocks_pool;
 mod orphan_transactions_pool;
 mod partial_merkle_tree;
@@ -20,7 +19,6 @@ pub use self::fee_rate_filter::FeeRateFilter;
 pub use self::hash_queue::{HashQueue, HashQueueChain, HashPosition};
 pub use self::known_hash_filter::{KnownHashType, KnownHashFilter};
 pub use self::memory_pool_transaction_provider::MemoryPoolTransactionOutputProvider;
-pub use self::message_block_headers_provider::MessageBlockHeadersProvider;
 pub use self::orphan_blocks_pool::OrphanBlocksPool;
 pub use self::orphan_transactions_pool::{OrphanTransactionsPool, OrphanTransaction};
 pub use self::partial_merkle_tree::{PartialMerkleTree, build_partial_merkle_tree};
diff --git a/verification/src/chain_verifier.rs b/verification/src/chain_verifier.rs
index 33954109..5b2c2b88 100644
--- a/verification/src/chain_verifier.rs
+++ b/verification/src/chain_verifier.rs
@@ -1,7 +1,6 @@
 //! Bitcoin chain verifier
 
-use hash::H256;
-use chain::{IndexedBlock, IndexedBlockHeader, BlockHeader, IndexedTransaction};
+use chain::{IndexedBlock, IndexedBlockHeader, IndexedTransaction};
 use storage::{SharedStore, TransactionOutputProvider, BlockHeaderProvider, BlockOrigin, DuplexTransactionOutputProvider, NoopStore};
 use network::ConsensusParams;
@@ -85,15 +84,10 @@ impl BackwardsCompatibleChainVerifier {
 
     pub fn verify_block_header(
         &self,
-        _block_header_provider: &BlockHeaderProvider,
-        hash: &H256,
-        header: &BlockHeader
+        header: &IndexedBlockHeader,
     ) -> Result<(), Error> {
-        // let's do only preverifcation
-        // TODO: full verification
         let current_time = ::time::get_time().sec as u32;
-        let header = IndexedBlockHeader::new(hash.clone(), header.clone());
-        let header_verifier = HeaderVerifier::new(&header, &self.consensus, current_time);
+        let header_verifier = HeaderVerifier::new(header, &self.consensus, current_time);
         header_verifier.check()
     }
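The `verify_headers: bool` gate introduced on the wrapper means header checks are skipped entirely unless the flag is set. A toy illustration of that gating, under the assumption of a single cheap timestamp check; `Wrapper` and `Header` here are not the real `ChainVerifierWrapper` types:

```rust
struct Header { timestamp: u32 }

#[derive(Debug)]
enum Error { Timestamp }

struct Wrapper {
    // mirrors the new `verify_headers: bool` field: headers are only checked when enabled
    verify_headers: bool,
}

impl Wrapper {
    fn verify_block_header(&self, header: &Header, current_time: u32) -> Result<(), Error> {
        match self.verify_headers {
            // only a cheap contextual check is sketched here
            true => if header.timestamp <= current_time + 2 * 60 * 60 { Ok(()) } else { Err(Error::Timestamp) },
            false => Ok(()),
        }
    }
}

fn main() {
    let disabled = Wrapper { verify_headers: false };
    let enabled = Wrapper { verify_headers: true };
    let header = Header { timestamp: 10_000 };
    assert!(disabled.verify_block_header(&header, 0).is_ok());
    assert!(enabled.verify_block_header(&header, 0).is_err());
}
```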