From fac3864f453d5f4c050dd5c91a41594f6a3e77fd Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Thu, 1 Dec 2016 14:55:21 +0300 Subject: [PATCH] connected sync to verify_block_header --- sync/src/lib.rs | 2 +- sync/src/local_node.rs | 3 +- sync/src/synchronization_chain.rs | 14 +++++ sync/src/synchronization_client.rs | 91 ++++++++++++++++++++++++++++-- verification/src/utils.rs | 5 +- 5 files changed, 107 insertions(+), 8 deletions(-) diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 9f9ee979..2a0ae7c4 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -77,7 +77,7 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S let sync_chain = Arc::new(RwLock::new(SyncChain::new(db))); let sync_executor = SyncExecutor::new(sync_chain.clone()); let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone())); - let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone()); + let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone(), network); let verifier = AsyncVerifier::new(network, sync_chain, sync_client_core.clone()); let sync_client = SynchronizationClient::new(sync_client_core, verifier); let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor)); diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index 6222c3cf..02a049c8 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -265,6 +265,7 @@ mod tests { use p2p::{event_loop, OutboundSyncConnection, OutboundSyncConnectionRef}; use message::types; use message::common::{InventoryVector, InventoryType, BlockTransactionsRequest}; + use network::Magic; use db; use super::LocalNode; use test_data; @@ -313,7 +314,7 @@ mod tests { let executor = DummyTaskExecutor::new(); let server = Arc::new(DummyServer::new()); let config = Config { threads_num: 1 }; - let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone()); + let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), Magic::Mainnet); let mut verifier = DummyVerifier::default(); verifier.set_sink(client_core.clone()); let client = SynchronizationClient::new(client_core, verifier); diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index 75941f04..cb6c6422 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -212,6 +212,20 @@ impl Chain { self.best_storage_block.clone() } + /// Get best block header + pub fn best_block_header(&self) -> db::BestBlock { + let headers_chain_information = self.headers_chain.information(); + if headers_chain_information.best == 0 { + return self.best_storage_block() + } + db::BestBlock { + number: self.best_storage_block.number + headers_chain_information.best, + hash: self.headers_chain.at(headers_chain_information.best - 1) + .expect("got this index above; qed") + .hash(), + } + } + /// Get block header by hash pub fn block_hash(&self, number: u32) -> Option { if number <= self.best_storage_block.number { diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 9b54f738..5dd2ca4c 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -7,14 +7,15 @@ use futures::{BoxFuture, Future, finished}; use futures::stream::Stream; use tokio_core::reactor::{Handle, Interval}; use futures_cpupool::CpuPool; -use db::{self, IndexedBlock}; +use db::{self, IndexedBlock, BlockHeaderProvider, BlockRef}; use chain::{BlockHeader, Transaction}; use message::types; use message::common::{InventoryVector, InventoryType}; use primitives::hash::H256; +use primitives::bytes::Bytes; use synchronization_peers::Peers; #[cfg(test)] use synchronization_peers::{Information as PeersInformation}; -use synchronization_chain::{ChainRef, BlockState, TransactionState, HeadersIntersection, BlockInsertionResult}; +use synchronization_chain::{ChainRef, Chain, BlockState, TransactionState, HeadersIntersection, BlockInsertionResult}; #[cfg(test)] use synchronization_chain::{Information as ChainInformation}; use synchronization_executor::{Task, TaskExecutor}; @@ -28,6 +29,8 @@ use synchronization_verifier::{Verifier, VerificationSink, VerificationTask}; use compact_block_builder::build_compact_block; use hash_queue::HashPosition; use miner::transaction_fee_rate; +use verification::ChainVerifier; +use network::Magic; use time; use std::time::Duration; @@ -285,6 +288,10 @@ pub struct SynchronizationClientCore { orphaned_blocks_pool: OrphanBlocksPool, /// Orphaned transactions pool. orphaned_transactions_pool: OrphanTransactionsPool, + /// Network config + network: Magic, + /// Verify block headers? + verify_headers: bool, /// Verifying blocks by peer verifying_blocks_by_peer: HashMap, /// Verifying blocks futures @@ -293,6 +300,18 @@ pub struct SynchronizationClientCore { do_not_relay: HashSet, } +/// Block headers provider from `headers` message +struct MessageBlockHeadersProvider<'a> { + /// sync chain + chain: &'a Chain, + /// headers offset + first_header_number: u32, + /// headers by hash + headers: HashMap, + /// headers by order + headers_order: Vec, +} + impl Config { pub fn new() -> Self { Config { @@ -609,17 +628,29 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { return; } - // TODO: add full blocks headers validation here // validate blocks headers before scheduling + let chain = self.chain.read(); + let verifier = ChainVerifier::new(chain.storage(), self.network); + let mut block_header_provider = MessageBlockHeadersProvider::new(&*chain); let mut blocks_hashes: Vec = Vec::with_capacity(blocks_headers.len()); let mut prev_block_hash = header0.previous_header_hash.clone(); for block_header in &blocks_headers { let block_header_hash = block_header.hash(); + // check that this header is direct child of previous header if block_header.previous_header_hash != prev_block_hash { warn!(target: "sync", "Neighbour headers in peer#{} `headers` message are unlinked: Prev: {:?}, PrevLink: {:?}, Curr: {:?}", peer_index, prev_block_hash, block_header.previous_header_hash, block_header_hash); return; } + // verify header + if self.verify_headers { + if let Err(error) = verifier.verify_block_header(&block_header_provider, &block_header_hash, &block_header) { + warn!(target: "sync", "Error verifying header {:?} from peer#{} `headers` message: {:?}", block_header_hash.to_reversed_str(), peer_index, error); + return; + } + } + + block_header_provider.append_header(block_header_hash.clone(), block_header.clone()); blocks_hashes.push(block_header_hash.clone()); prev_block_hash = block_header_hash; } @@ -948,7 +979,7 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor impl SynchronizationClientCore where T: TaskExecutor { /// Create new synchronization client core - pub fn new(config: Config, handle: &Handle, executor: Arc>, chain: ChainRef) -> Arc> { + pub fn new(config: Config, handle: &Handle, executor: Arc>, chain: ChainRef, network: Magic) -> Arc> { let sync = Arc::new(Mutex::new( SynchronizationClientCore { state: State::Saturated, @@ -959,6 +990,8 @@ impl SynchronizationClientCore where T: TaskExecutor { chain: chain.clone(), orphaned_blocks_pool: OrphanBlocksPool::new(), orphaned_transactions_pool: OrphanTransactionsPool::new(), + network: network, + verify_headers: true, verifying_blocks_by_peer: HashMap::new(), verifying_blocks_futures: HashMap::new(), do_not_relay: HashSet::new(), @@ -1016,6 +1049,12 @@ impl SynchronizationClientCore where T: TaskExecutor { } } + /// Verify block headers or not? + #[cfg(test)] + pub fn verify_headers(&mut self, verify: bool) { + self.verify_headers = verify; + } + /// Relay new blocks fn relay_new_blocks(&mut self, new_blocks_hashes: Vec) { let tasks: Vec<_> = { @@ -1397,6 +1436,44 @@ impl SynchronizationClientCore where T: TaskExecutor { } } +impl<'a> MessageBlockHeadersProvider<'a> { + pub fn new(chain: &'a Chain) -> Self { + let first_header_number = chain.best_block_header().number + 1; + MessageBlockHeadersProvider { + chain: chain, + first_header_number: first_header_number, + headers: HashMap::new(), + headers_order: Vec::new(), + } + } + + pub fn append_header(&mut self, hash: H256, header: BlockHeader) { + self.headers.insert(hash.clone(), header); + self.headers_order.push(hash); + } +} + +impl<'a> BlockHeaderProvider for MessageBlockHeadersProvider<'a> { + fn block_header_bytes(&self, block_ref: BlockRef) -> Option { + use ser::serialize; + self.block_header(block_ref).map(|h| serialize(&h)) + } + + fn block_header(&self, block_ref: BlockRef) -> Option { + match block_ref { + BlockRef::Hash(h) => self.chain.block_header_by_hash(&h) + .or_else(|| self.headers.get(&h).cloned()), + BlockRef::Number(n) => self.chain.block_header_by_number(n) + .or_else(|| if n >= self.first_header_number && n - self.first_header_number < self.headers_order.len() as u32 { + let ref header_hash = self.headers_order[(n - self.first_header_number) as usize]; + Some(self.headers[header_hash].clone()) + } else { + None + }), + } + } +} + #[cfg(test)] pub mod tests { use std::sync::Arc; @@ -1413,6 +1490,7 @@ pub mod tests { use synchronization_verifier::tests::DummyVerifier; use synchronization_server::ServerTaskIndex; use primitives::hash::H256; + use network::Magic; use p2p::event_loop; use test_data; use db; @@ -1434,7 +1512,10 @@ pub mod tests { let executor = DummyTaskExecutor::new(); let config = Config { threads_num: 1 }; - let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone()); + let client_core = SynchronizationClientCore::new(config, &handle, executor.clone(), chain.clone(), Magic::Testnet); + { + client_core.lock().verify_headers(false); + } let mut verifier = verifier.unwrap_or_default(); verifier.set_sink(client_core.clone()); let client = SynchronizationClient::new(client_core, verifier); diff --git a/verification/src/utils.rs b/verification/src/utils.rs index e7cd094c..282c1d2a 100644 --- a/verification/src/utils.rs +++ b/verification/src/utils.rs @@ -62,7 +62,10 @@ pub fn check_nbits(max_nbits: u32, hash: &H256, n_bits: u32) -> bool { let mut nb = [0u8; 4]; BigEndian::write_u32(&mut nb, n_bits); - let shift = (nb[0] - 3) as usize; // total shift for mantissa + let shift = match nb[0].checked_sub(3) { + Some(v) => v, + None => return false, + } as usize; // total shift for mantissa if shift >= 30 { return false; } // invalid shift