From 32e21d6e37c21fa3d36a75781d984da6d75048b1 Mon Sep 17 00:00:00 2001
From: NikVolf
Date: Mon, 28 Nov 2016 15:55:00 +0300
Subject: [PATCH] ongoing sync refactoring

---
 db/src/indexed_block.rs              |  1 +
 sync/src/blocks_writer.rs            |  5 +++--
 sync/src/orphan_blocks_pool.rs       | 11 ++++++-----
 sync/src/synchronization_chain.rs    |  6 +++---
 sync/src/synchronization_client.rs   | 26 +++++++++++++-------------
 sync/src/synchronization_verifier.rs | 11 ++++++-----
 6 files changed, 32 insertions(+), 28 deletions(-)

diff --git a/db/src/indexed_block.rs b/db/src/indexed_block.rs
index eda7c915..139e0c4f 100644
--- a/db/src/indexed_block.rs
+++ b/db/src/indexed_block.rs
@@ -2,6 +2,7 @@ use chain;
 use primitives::hash::H256;
 use serialization::Serializable;
 
+#[derive(Debug)]
 pub struct IndexedBlock {
 	header: chain::BlockHeader,
 	header_hash: H256,
diff --git a/sync/src/blocks_writer.rs b/sync/src/blocks_writer.rs
index 8bd15f54..a103f353 100644
--- a/sync/src/blocks_writer.rs
+++ b/sync/src/blocks_writer.rs
@@ -19,12 +19,13 @@ impl BlocksWriter {
 	}
 
 	pub fn append_block(&mut self, block: chain::Block) -> Result<(), Error> {
+		let indexed_block: db::IndexedBlock = block.into();
 		// TODO: share same verification code with synchronization_client
-		if self.storage.best_block().map_or(false, |bb| bb.hash != block.block_header.previous_header_hash) {
+		if self.storage.best_block().map_or(false, |bb| bb.hash != indexed_block.header().previous_header_hash) {
 			return Err(Error::OutOfOrderBlock);
 		}
 
-		match self.verifier.verify(&block) {
+		match self.verifier.verify(&indexed_block) {
 			Err(err) => Err(Error::Verification(err)),
 			Ok(_chain) => { try!(self.storage.insert_block(&block).map_err(Error::Database)); Ok(()) }
 		}
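Note on the append_block change above: the writer now converts the incoming chain::Block with `block.into()` and reads the parent hash through `indexed_block.header()`, so the header hash can be computed once and reused. The conversion itself is not part of the hunks shown here; the sketch below is only one plausible shape for it, assuming IndexedBlock also stores the block's transactions. Any name that does not appear in this patch (the transactions field, a hash() on BlockHeader) is an assumption.

// Sketch only, not the actual db implementation: a From<chain::Block> conversion
// that hashes the header once at construction and then hands out cheap accessors.
use chain;
use primitives::hash::H256;

pub struct IndexedBlock {
	header: chain::BlockHeader,
	header_hash: H256,
	transactions: Vec<chain::Transaction>, // assumed field, not visible in this patch
}

impl From<chain::Block> for IndexedBlock {
	fn from(block: chain::Block) -> Self {
		// assumes BlockHeader exposes a hash(); computed once, before the header is moved
		let header_hash = block.block_header.hash();
		IndexedBlock {
			header: block.block_header,
			header_hash: header_hash,
			transactions: block.transactions,
		}
	}
}

impl IndexedBlock {
	/// Borrow the header without re-hashing it.
	pub fn header(&self) -> &chain::BlockHeader {
		&self.header
	}

	/// Cached header hash.
	pub fn hash(&self) -> &H256 {
		&self.header_hash
	}
}

The later client hunks are consistent with hash() returning a borrowed &H256: on_peer_block now passes block_hash.clone() into process_peer_block, which is what you would expect if the hash borrowed from the block cannot be moved while the block itself is still needed.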
diff --git a/sync/src/orphan_blocks_pool.rs b/sync/src/orphan_blocks_pool.rs
index ef82f2d2..ad7a6874 100644
--- a/sync/src/orphan_blocks_pool.rs
+++ b/sync/src/orphan_blocks_pool.rs
@@ -4,13 +4,14 @@ use linked_hash_map::LinkedHashMap;
 use time;
 use chain::Block;
 use primitives::hash::H256;
+use db::IndexedBlock;
 
 #[derive(Debug)]
 /// Storage for blocks, for which we have no parent yet.
 /// Blocks from this storage are either moved to verification queue, or removed at all.
 pub struct OrphanBlocksPool {
 	/// Blocks from requested_hashes, but received out-of-order.
-	orphaned_blocks: HashMap<H256, HashMap<H256, Block>>,
+	orphaned_blocks: HashMap<H256, HashMap<H256, IndexedBlock>>,
 	/// Blocks that we have received without requesting with receiving time.
 	unknown_blocks: LinkedHashMap<H256, f64>,
 }
@@ -41,7 +42,7 @@ impl OrphanBlocksPool {
 	}
 
 	/// Insert orphaned block, for which we have already requested its parent block
-	pub fn insert_orphaned_block(&mut self, hash: H256, block: Block) {
+	pub fn insert_orphaned_block(&mut self, hash: H256, block: IndexedBlock) {
 		self.orphaned_blocks
 			.entry(block.block_header.previous_header_hash.clone())
 			.or_insert_with(HashMap::new)
@@ -49,7 +50,7 @@ impl OrphanBlocksPool {
 	}
 
 	/// Insert unknown block, for which we know nothing about its parent block
-	pub fn insert_unknown_block(&mut self, hash: H256, block: Block) {
+	pub fn insert_unknown_block(&mut self, hash: H256, block: IndexedBlock) {
 		let previous_value = self.unknown_blocks.insert(hash.clone(), time::precise_time_s());
 		assert_eq!(previous_value, None);
 
@@ -67,7 +68,7 @@ impl OrphanBlocksPool {
 	}
 
 	/// Remove all blocks, depending on this parent
-	pub fn remove_blocks_for_parent(&mut self, hash: &H256) -> Vec<(H256, Block)> {
+	pub fn remove_blocks_for_parent(&mut self, hash: &H256) -> Vec<(H256, IndexedBlock)> {
 		let mut queue: VecDeque<H256> = VecDeque::new();
 		queue.push_back(hash.clone());
 
@@ -86,7 +87,7 @@ impl OrphanBlocksPool {
 	}
 
 	/// Remove blocks with given hashes + all dependent blocks
-	pub fn remove_blocks(&mut self, hashes: &HashSet<H256>) -> Vec<(H256, Block)> {
+	pub fn remove_blocks(&mut self, hashes: &HashSet<H256>) -> Vec<(H256, IndexedBlock)> {
 		// TODO: excess clone
 		let mut removed: Vec<(H256, Block)> = Vec::new();
 		let parent_orphan_keys: Vec<_> = self.orphaned_blocks.keys().cloned().collect();
diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs
index 2e4cd336..fe639e5b 100644
--- a/sync/src/synchronization_chain.rs
+++ b/sync/src/synchronization_chain.rs
@@ -4,7 +4,7 @@ use std::collections::VecDeque;
 use linked_hash_map::LinkedHashMap;
 use parking_lot::RwLock;
 use chain::{Block, BlockHeader, Transaction};
-use db;
+use db::{self, IndexedBlock};
 use best_headers_chain::{BestHeadersChain, Information as BestHeadersInformation};
 use primitives::bytes::Bytes;
 use primitives::hash::H256;
@@ -309,7 +309,7 @@ impl Chain {
 	}
 
 	/// Insert new best block to storage
-	pub fn insert_best_block(&mut self, hash: H256, block: &Block) -> Result<BlockInsertionResult, db::Error> {
+	pub fn insert_best_block(&mut self, hash: H256, block: &IndexedBlock) -> Result<BlockInsertionResult, db::Error> {
 		let is_appending_to_main_branch = self.best_storage_block.hash == block.block_header.previous_header_hash;
 
 		// insert to storage
@@ -353,7 +353,7 @@ impl Chain {
 			// all transactions from this block were accepted
 			// + all transactions from previous blocks of this fork were accepted
 			// => delete accepted transactions from verification queue and from the memory pool
-			let this_block_transactions_hashes = block.transactions.iter().map(|tx| tx.hash());
+			let this_block_transactions_hashes = block.transaction_hashes();
 			let mut canonized_blocks_hashes: Vec<H256> = Vec::new();
 			let mut new_main_blocks_transactions_hashes: Vec<H256> = Vec::new();
 			while let Some(canonized_block_hash) = reorganization.pop_canonized() {
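The last hunk above swaps a hand-rolled `block.transactions.iter().map(|tx| tx.hash())` for a `transaction_hashes()` call on the indexed block. That helper is not shown in this patch; below is a minimal sketch of what it might look like, assuming the transactions are stored alongside the header as in the earlier sketch. Whether the real helper caches the hashes at construction or computes them on demand is not visible here.

impl IndexedBlock {
	/// Hashes of all transactions in the block, in block order.
	/// This version computes them lazily; a cached variant would
	/// return a borrowed slice instead of an owned Vec.
	pub fn transaction_hashes(&self) -> Vec<H256> {
		self.transactions.iter().map(|tx| tx.hash()).collect()
	}
}

If the hashes are cached inside IndexedBlock, the call becomes cheap to repeat, which is presumably part of the point of funnelling both the block writer and the synchronization client through the same indexed type.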
diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs
index 244e6e32..cfcd4174 100644
--- a/sync/src/synchronization_client.rs
+++ b/sync/src/synchronization_client.rs
@@ -7,7 +7,7 @@ use futures::{BoxFuture, Future, finished};
 use futures::stream::Stream;
 use tokio_core::reactor::{Handle, Interval};
 use futures_cpupool::CpuPool;
-use db;
+use db::{self, IndexedBlock};
 use chain::{Block, BlockHeader, Transaction};
 use message::types;
 use message::common::{InventoryVector, InventoryType};
@@ -191,7 +191,7 @@ pub trait Client : Send + 'static {
 	fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>);
 	fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>);
 	fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
-	fn on_peer_block(&mut self, peer_index: usize, block: Block);
+	fn on_peer_block(&mut self, peer_index: usize, block: IndexedBlock);
 	fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction);
 	fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
 	fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
@@ -212,7 +212,7 @@ pub trait ClientCore : VerificationSink {
 	fn on_new_transactions_inventory(&mut self, peer_index: usize, transactions_hashes: Vec<H256>);
 	fn on_new_blocks_headers(&mut self, peer_index: usize, blocks_headers: Vec<BlockHeader>);
 	fn on_peer_blocks_notfound(&mut self, peer_index: usize, blocks_hashes: Vec<H256>);
-	fn on_peer_block(&mut self, peer_index: usize, block: Block) -> Option<VecDeque<(H256, Block)>>;
+	fn on_peer_block(&mut self, peer_index: usize, block: IndexedBlock) -> Option<VecDeque<(H256, IndexedBlock)>>;
 	fn on_peer_transaction(&mut self, peer_index: usize, transaction: Transaction) -> Option<VecDeque<(H256, Transaction)>>;
 	fn on_peer_filterload(&mut self, peer_index: usize, message: &types::FilterLoad);
 	fn on_peer_filteradd(&mut self, peer_index: usize, message: &types::FilterAdd);
@@ -371,8 +371,8 @@ impl<T, U> Client for SynchronizationClient<T, U> where T: TaskExecutor, U: Veri
 		self.core.lock().on_peer_blocks_notfound(peer_index, blocks_hashes);
 	}
 
-	fn on_peer_block(&mut self, peer_index: usize, block: Block) {
-		let blocks_to_verify = { self.core.lock().on_peer_block(peer_index, block) };
+	fn on_peer_block(&mut self, peer_index: usize, block: IndexedBlock) {
+		let blocks_to_verify = self.core.lock().on_peer_block(peer_index, block);
 
 		// verify selected blocks
 		if let Some(mut blocks_to_verify) = blocks_to_verify {
@@ -609,13 +609,13 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
 	}
 
 	/// Process new block.
-	fn on_peer_block(&mut self, peer_index: usize, block: Block) -> Option<VecDeque<(H256, Block)>> {
+	fn on_peer_block(&mut self, peer_index: usize, block: IndexedBlock) -> Option<VecDeque<(H256, IndexedBlock)>> {
 		let block_hash = block.hash();
 
 		// update peers to select next tasks
 		self.peers.on_block_received(peer_index, &block_hash);
 
-		self.process_peer_block(peer_index, block_hash, block)
+		self.process_peer_block(peer_index, block_hash.clone(), block)
 	}
 
 	/// Process new transaction.
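A small point on the on_peer_block hunk above: the braces around `self.core.lock().on_peer_block(...)` were removed. In Rust, a temporary created in the tail expression of a block is dropped at the end of the enclosing statement, so the mutex guard lived until the end of the `let` in both spellings; the explicit block never narrowed the lock scope, and verification still runs with the core lock released. A stand-alone illustration of that guard scope, with stand-in types rather than the real client:

use parking_lot::Mutex;

// Sketch only: shows the lifetime of the guard returned by lock().
fn snapshot_len(core: &Mutex<Vec<u32>>) -> usize {
	// the guard is created, len() is called through it, and it is
	// dropped - all within this single statement
	let len = core.lock().len();
	// the mutex is already unlocked here, so long-running work
	// (like block verification) can proceed without holding it
	len
}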
@@ -776,7 +776,7 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
 
 impl<T> VerificationSink for SynchronizationClientCore<T> where T: TaskExecutor {
 	/// Process successful block verification
-	fn on_block_verification_success(&mut self, block: Block) {
+	fn on_block_verification_success(&mut self, block: IndexedBlock) {
 		let hash = block.hash();
 		// insert block to the storage
 		match {
@@ -1114,9 +1114,9 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
 	}
 
 	/// Process new peer block
-	fn process_peer_block(&mut self, peer_index: usize, block_hash: H256, block: Block) -> Option<VecDeque<(H256, Block)>> {
+	fn process_peer_block(&mut self, peer_index: usize, block_hash: H256, block: IndexedBlock) -> Option<VecDeque<(H256, IndexedBlock)>> {
 		// prepare list of blocks to verify + make all required changes to the chain
-		let mut result: Option<VecDeque<(H256, Block)>> = None;
+		let mut result: Option<VecDeque<(H256, IndexedBlock)>> = None;
 		let mut chain = self.chain.write();
 		match chain.block_state(&block_hash) {
 			BlockState::Verifying | BlockState::Stored => {
@@ -1125,7 +1125,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
 			},
 			BlockState::Unknown | BlockState::Scheduled | BlockState::Requested => {
 				// check parent block state
-				match chain.block_state(&block.block_header.previous_header_hash) {
+				match chain.block_state(&block.header().previous_header_hash) {
 					BlockState::Unknown => {
 						if self.state.is_synchronizing() {
 							// when synchronizing, we tend to receive all blocks in-order
@@ -1153,14 +1153,14 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
 						// remember peer as useful
 						self.peers.useful_peer(peer_index);
 						// schedule verification
-						let mut blocks_to_verify: VecDeque<(H256, Block)> = VecDeque::new();
+						let mut blocks_to_verify: VecDeque<(H256, IndexedBlock)> = VecDeque::new();
 						blocks_to_verify.push_back((block_hash.clone(), block));
 						blocks_to_verify.extend(self.orphaned_blocks_pool.remove_blocks_for_parent(&block_hash));
 						// forget blocks we are going to process
 						let blocks_hashes_to_forget: Vec<_> = blocks_to_verify.iter().map(|t| t.0.clone()).collect();
 						chain.forget_blocks_leave_header(&blocks_hashes_to_forget);
 						// remember that we are verifying these blocks
-						let blocks_headers_to_verify: Vec<_> = blocks_to_verify.iter().map(|&(ref h, ref b)| (h.clone(), b.block_header.clone())).collect();
+						let blocks_headers_to_verify: Vec<_> = blocks_to_verify.iter().map(|&(ref h, ref b)| (h.clone(), b.header().clone())).collect();
 						chain.verify_blocks(blocks_headers_to_verify);
 						// remember that we are verifying block from this peer
 						self.verifying_blocks_by_peer.insert(block_hash.clone(), peer_index);
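For the verification-scheduling hunk above: when a block finally connects, the client verifies not just that block but every orphan that now has a parent, and the queue-based walk in OrphanBlocksPool::remove_blocks_for_parent is what keeps parents ahead of their descendants in blocks_to_verify. A self-contained model of that drain step, using plain strings in place of H256 and IndexedBlock:

use std::collections::{HashMap, VecDeque};

// parent hash -> (block hash -> block payload), mirroring the orphaned_blocks field
type Pool = HashMap<String, HashMap<String, String>>;

// Sketch only: breadth-first removal of everything that descends from `parent`.
fn remove_blocks_for_parent(pool: &mut Pool, parent: &str) -> Vec<(String, String)> {
	let mut queue: VecDeque<String> = VecDeque::new();
	let mut removed: Vec<(String, String)> = Vec::new();
	queue.push_back(parent.to_string());

	while let Some(parent_hash) = queue.pop_front() {
		if let Some(children) = pool.remove(&parent_hash) {
			for (hash, block) in children {
				// a child may itself be the parent of further orphans
				queue.push_back(hash.clone());
				removed.push((hash, block));
			}
		}
	}
	removed
}

The removed list then plays the role of blocks_to_verify: the freshly received block goes in first, the drained orphans follow, and all of their hashes are forgotten by the chain and marked as verifying in one pass.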
diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs
index dd1dca29..986045e4 100644
--- a/sync/src/synchronization_verifier.rs
+++ b/sync/src/synchronization_verifier.rs
@@ -7,11 +7,12 @@ use network::{Magic, ConsensusParams};
 use primitives::hash::H256;
 use verification::{ChainVerifier, Verify as VerificationVerify};
 use synchronization_chain::ChainRef;
+use db::IndexedBlock;
 
 /// Verification events sink
 pub trait VerificationSink : Send + 'static {
 	/// When block verification has completed successfully.
-	fn on_block_verification_success(&mut self, block: Block);
+	fn on_block_verification_success(&mut self, block: IndexedBlock);
 	/// When block verification has failed.
 	fn on_block_verification_error(&mut self, err: &str, hash: &H256);
 	/// When transaction verification has completed successfully.
@@ -23,7 +24,7 @@ pub trait VerificationSink : Send + 'static {
 /// Verification thread tasks
 enum VerificationTask {
 	/// Verify single block
-	VerifyBlock(Block),
+	VerifyBlock(IndexedBlock),
 	/// Verify single transaction
 	VerifyTransaction(Transaction),
 	/// Stop verification thread
@@ -33,7 +34,7 @@ enum VerificationTask {
 /// Synchronization verifier
 pub trait Verifier : Send + 'static {
 	/// Verify block
-	fn verify_block(&self, block: Block);
+	fn verify_block(&self, block: IndexedBlock);
 	/// Verify transaction
 	fn verify_transaction(&self, transaction: Transaction);
 }
@@ -73,7 +74,7 @@ impl AsyncVerifier {
 			match task {
 				VerificationTask::VerifyBlock(block) => {
 					// for changes that are not relying on block#
-					let is_bip16_active_on_block = block.block_header.time >= bip16_time_border;
+					let is_bip16_active_on_block = block.header().time >= bip16_time_border;
 					let force_parameters_change = is_bip16_active_on_block != is_bip16_active;
 					if force_parameters_change {
 						parameters_change_steps = Some(0);
@@ -132,7 +133,7 @@ impl Drop for AsyncVerifier {
 
 impl Verifier for AsyncVerifier {
 	/// Verify block
-	fn verify_block(&self, block: Block) {
+	fn verify_block(&self, block: IndexedBlock) {
 		self.verification_work_sender
 			.send(VerificationTask::VerifyBlock(block))
 			.expect("Verification thread have the same lifetime as `AsyncVerifier`");
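The verifier changes above keep the same channel-based design: verify_block only enqueues a VerificationTask for the dedicated verification thread, which is why the trait can stay non-blocking while the payload type switches to IndexedBlock. A stripped-down sketch of that pattern with standard-library channels and placeholder types (the real AsyncVerifier also carries a chain reference, consensus parameters and a transaction task; every name here is illustrative):

use std::sync::mpsc::{channel, Sender};
use std::thread;

enum SketchTask {
	VerifyBlock(u32), // stand-in payload for an IndexedBlock
	Stop,
}

struct AsyncVerifierSketch {
	work_sender: Sender<SketchTask>,
	worker: Option<thread::JoinHandle<()>>,
}

impl AsyncVerifierSketch {
	fn new() -> Self {
		let (work_sender, work_receiver) = channel();
		// the worker thread owns the heavy verification loop
		let worker = thread::spawn(move || {
			while let Ok(task) = work_receiver.recv() {
				match task {
					// the real code would run ChainVerifier here and report to the sink
					SketchTask::VerifyBlock(_block) => {},
					SketchTask::Stop => break,
				}
			}
		});
		AsyncVerifierSketch { work_sender: work_sender, worker: Some(worker) }
	}

	fn verify_block(&self, block: u32) {
		self.work_sender
			.send(SketchTask::VerifyBlock(block))
			.expect("verification thread outlives the verifier");
	}
}

impl Drop for AsyncVerifierSketch {
	fn drop(&mut self) {
		// queue a Stop task, then wait for the worker to finish
		let _ = self.work_sender.send(SketchTask::Stop);
		if let Some(worker) = self.worker.take() {
			let _ = worker.join();
		}
	}
}

This mirrors the Stop task and the Drop handling already present in synchronization_verifier.rs, and the intent behind the expect() on the real send call: the worker is only torn down from Drop, after a Stop has been queued.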