From 9a6c5c8579091b5745be3315276770e1c36cef49 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 30 Nov 2016 12:16:29 +0300 Subject: [PATCH] support semi-unordered blocks import --- pbtc/commands/import.rs | 7 +- sync/src/blocks_writer.rs | 104 ++++++++++++++++++++++----- sync/src/lib.rs | 10 +-- sync/src/orphan_blocks_pool.rs | 1 - sync/src/synchronization_verifier.rs | 82 +++++++++++++++------ 5 files changed, 153 insertions(+), 51 deletions(-) diff --git a/pbtc/commands/import.rs b/pbtc/commands/import.rs index d65245f3..ce5c10ef 100644 --- a/pbtc/commands/import.rs +++ b/pbtc/commands/import.rs @@ -13,7 +13,6 @@ pub fn import(cfg: Config, matches: &ArgMatches) -> Result<(), String> { let blk_path = matches.value_of("PATH").expect("PATH is required in cli.yml; qed"); let blk_dir = try!(::import::open_blk_dir(blk_path).map_err(|_| "Import directory does not exist".to_owned())); let mut counter = 0; - let mut skipped = 0; for blk in blk_dir { // TODO: verify magic! let blk = try!(blk.map_err(|_| "Cannot read block".to_owned())); @@ -24,14 +23,12 @@ pub fn import(cfg: Config, matches: &ArgMatches) -> Result<(), String> { info!("Imported {} blocks", counter); } } - Err(Error::OutOfOrderBlock) => { - skipped += 1; - }, + Err(Error::TooManyOrphanBlocks) => return Err("Too many orphan (unordered) blocks".into()), Err(_) => return Err("Cannot append block".into()), } } - info!("Finished import of {} blocks. Skipped {} blocks.", counter, skipped); + info!("Finished import of {} blocks", counter); Ok(()) } diff --git a/sync/src/blocks_writer.rs b/sync/src/blocks_writer.rs index 8de6e695..1c4ee11e 100644 --- a/sync/src/blocks_writer.rs +++ b/sync/src/blocks_writer.rs @@ -1,45 +1,109 @@ use std::sync::Arc; +use std::collections::VecDeque; +use parking_lot::Mutex; use chain; use db; use network::Magic; -use verification::{Verify, ChainVerifier}; +use orphan_blocks_pool::OrphanBlocksPool; +use synchronization_verifier::{Verifier, SyncVerifier, VerificationSink}; +use primitives::hash::H256; use super::Error; +pub const MAX_ORPHANED_BLOCKS: usize = 64; + pub struct BlocksWriter { - storage: Arc, - verifier: ChainVerifier, + storage: db::SharedStore, + orphaned_blocks_pool: OrphanBlocksPool, + verifier: SyncVerifier, + sink: Arc>, +} + +struct BlocksWriterSink { + storage: db::SharedStore, + err: Option, } impl BlocksWriter { pub fn new(storage: db::SharedStore, network: Magic) -> BlocksWriter { + let sink = Arc::new(Mutex::new(BlocksWriterSink::new(storage.clone()))); + let verifier = SyncVerifier::new(network, storage.clone(), sink.clone()); BlocksWriter { - storage: storage.clone(), - verifier: ChainVerifier::new(storage, network), + storage: storage, + orphaned_blocks_pool: OrphanBlocksPool::new(), + verifier: verifier, + sink: sink, } } pub fn append_block(&mut self, block: chain::Block) -> Result<(), Error> { let indexed_block: db::IndexedBlock = block.into(); - // TODO: share same verification code with synchronization_client - if self.storage.best_block().map_or(false, |bb| bb.hash != indexed_block.header().previous_header_hash) { - return Err(Error::OutOfOrderBlock); + // verify && insert only if parent block is already in the storage + if !self.storage.contains_block(db::BlockRef::Hash(indexed_block.header().previous_header_hash.clone())) { + self.orphaned_blocks_pool.insert_orphaned_block(indexed_block.hash().clone(), indexed_block); + // we can't hold many orphaned blocks in memory during import + if self.orphaned_blocks_pool.len() > MAX_ORPHANED_BLOCKS { + return Err(Error::TooManyOrphanBlocks); + } + return Ok(()); } - match self.verifier.verify(&indexed_block) { - Err(err) => Err(Error::Verification(err)), - Ok(_chain) => { try!(self.storage.insert_indexed_block(&indexed_block).map_err(Error::Database)); Ok(()) } + // verify && insert block && all its orphan children + let mut verification_queue: VecDeque = self.orphaned_blocks_pool.remove_blocks_for_parent(indexed_block.hash()).into_iter().map(|(_, b)| b).collect(); + verification_queue.push_front(indexed_block); + while let Some(block) = verification_queue.pop_front() { + println!("Verifying {:?}", block.hash().to_reversed_str()); + self.verifier.verify_block(block); + if let Some(err) = self.sink.lock().error() { + return Err(err); + } } + + Ok(()) } } +impl BlocksWriterSink { + pub fn new(storage: db::SharedStore) -> Self { + BlocksWriterSink { + storage: storage, + err: None, + } + } + + pub fn error(&mut self) -> Option { + self.err.take() + } +} + +impl VerificationSink for BlocksWriterSink { + fn on_block_verification_success(&mut self, block: db::IndexedBlock) { + if let Err(err) = self.storage.insert_indexed_block(&block) { + self.err = Some(Error::Database(err)); + } + } + + fn on_block_verification_error(&mut self, err: &str, _hash: &H256) { + self.err = Some(Error::Verification(err.into())); + } + + fn on_transaction_verification_success(&mut self, _transaction: chain::Transaction) { + unreachable!("not intended to verify transactions") + } + + fn on_transaction_verification_error(&mut self, _err: &str, _hash: &H256) { + unreachable!("not intended to verify transactions") + } +} + + #[cfg(test)] mod tests { use std::sync::Arc; use db::{self, Store}; use network::Magic; - use {test_data, verification}; + use test_data; use super::super::Error; - use super::BlocksWriter; + use super::{BlocksWriter, MAX_ORPHANED_BLOCKS}; #[test] fn blocks_writer_appends_blocks() { @@ -52,11 +116,15 @@ mod tests { #[test] fn blocks_writer_verification_error() { let db = Arc::new(db::TestStorage::with_genesis_block()); + let blocks = test_data::build_n_empty_blocks_from_genesis((MAX_ORPHANED_BLOCKS + 2) as u32, 1); let mut blocks_target = BlocksWriter::new(db.clone(), Magic::Testnet); - match blocks_target.append_block(test_data::block_h2()).unwrap_err() { - Error::OutOfOrderBlock => (), - _ => panic!("Unexpected error"), - }; + for (index, block) in blocks.into_iter().skip(1).enumerate() { + match blocks_target.append_block(block) { + Err(Error::TooManyOrphanBlocks) if index == MAX_ORPHANED_BLOCKS => (), + Ok(_) if index != MAX_ORPHANED_BLOCKS => (), + _ => panic!("unexpected"), + } + } assert_eq!(db.best_block().expect("Block is inserted").number, 0); } @@ -69,7 +137,7 @@ mod tests { .header().parent(test_data::genesis().hash()).build() .build(); match blocks_target.append_block(wrong_block).unwrap_err() { - Error::Verification(verification::Error::Empty) => (), + Error::Verification(_) => (), _ => panic!("Unexpected error"), }; assert_eq!(db.best_block().expect("Block is inserted").number, 0); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index af2bb6df..38eee5a7 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -51,12 +51,12 @@ use network::Magic; /// Sync errors. #[derive(Debug)] pub enum Error { - /// Out of order block. - OutOfOrderBlock, + /// Too many orphan blocks. + TooManyOrphanBlocks, /// Database error. Database(db::Error), /// Block verification error. - Verification(verification::Error), + Verification(String), } /// Create blocks writer. @@ -74,11 +74,11 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig}; use synchronization_verifier::AsyncVerifier; - let sync_chain = Arc::new(RwLock::new(SyncChain::new(db))); + let sync_chain = Arc::new(RwLock::new(SyncChain::new(db.clone()))); let sync_executor = SyncExecutor::new(sync_chain.clone()); let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone())); let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone()); - let verifier = AsyncVerifier::new(network, sync_chain, sync_client_core.clone()); + let verifier = AsyncVerifier::new(network, db, sync_client_core.clone()); let sync_client = SynchronizationClient::new(sync_client_core, verifier); let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor)); SyncConnectionFactory::with_local_node(sync_node) diff --git a/sync/src/orphan_blocks_pool.rs b/sync/src/orphan_blocks_pool.rs index 6e7ff21f..702a539b 100644 --- a/sync/src/orphan_blocks_pool.rs +++ b/sync/src/orphan_blocks_pool.rs @@ -24,7 +24,6 @@ impl OrphanBlocksPool { } } - #[cfg(test)] /// Get total number of blocks in pool pub fn len(&self) -> usize { self.orphaned_blocks.len() diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs index 192739c4..21ead87e 100644 --- a/sync/src/synchronization_verifier.rs +++ b/sync/src/synchronization_verifier.rs @@ -6,8 +6,7 @@ use chain::Transaction; use network::Magic; use primitives::hash::H256; use verification::{ChainVerifier, Verify as VerificationVerify, Chain}; -use synchronization_chain::ChainRef; -use db::IndexedBlock; +use db::{SharedStore, IndexedBlock}; /// Verification events sink pub trait VerificationSink : Send + 'static { @@ -49,9 +48,8 @@ pub struct AsyncVerifier { impl AsyncVerifier { /// Create new async verifier - pub fn new(network: Magic, chain: ChainRef, sink: Arc>) -> Self { + pub fn new(network: Magic, storage: SharedStore, sink: Arc>) -> Self { let (verification_work_sender, verification_work_receiver) = channel(); - let storage = chain.read().storage(); let verifier = ChainVerifier::new(storage, network); AsyncVerifier { verification_work_sender: verification_work_sender, @@ -68,30 +66,14 @@ impl AsyncVerifier { fn verification_worker_proc(sink: Arc>, verifier: ChainVerifier, work_receiver: Receiver) { while let Ok(task) = work_receiver.recv() { match task { - VerificationTask::VerifyBlock(block) => { - // verify block - match verifier.verify(&block) { - Ok(Chain::Main) | Ok(Chain::Side) => { - sink.lock().on_block_verification_success(block) - }, - Ok(Chain::Orphan) => { - unreachable!("sync will never put orphaned blocks to verification queue"); - }, - Err(e) => { - sink.lock().on_block_verification_error(&format!("{:?}", e), &block.hash()) - } - } - }, - VerificationTask::VerifyTransaction(transaction) => { - // TODO: add verification here - sink.lock().on_transaction_verification_error("unimplemented", &transaction.hash()) - } VerificationTask::Stop => break, + _ => execute_verification_task(&sink, &verifier, task), } } } } + impl Drop for AsyncVerifier { fn drop(&mut self) { if let Some(join_handle) = self.verification_worker_thread.take() { @@ -118,6 +100,62 @@ impl Verifier for AsyncVerifier { } } +/// Synchronous synchronization verifier +pub struct SyncVerifier { + /// Verifier + verifier: ChainVerifier, + /// Verification sink + sink: Arc>, +} + +impl SyncVerifier where T: VerificationSink { + /// Create new sync verifier + pub fn new(network: Magic, storage: SharedStore, sink: Arc>) -> Self { + let verifier = ChainVerifier::new(storage, network); + SyncVerifier { + verifier: verifier, + sink: sink, + } + } +} + +impl Verifier for SyncVerifier where T: VerificationSink { + /// Verify block + fn verify_block(&self, block: IndexedBlock) { + execute_verification_task(&self.sink, &self.verifier, VerificationTask::VerifyBlock(block)) + } + + /// Verify transaction + fn verify_transaction(&self, transaction: Transaction) { + execute_verification_task(&self.sink, &self.verifier, VerificationTask::VerifyTransaction(transaction)) + } +} + +/// Execute single verification task +fn execute_verification_task(sink: &Arc>, verifier: &ChainVerifier, task: VerificationTask) { + match task { + VerificationTask::VerifyBlock(block) => { + // verify block + match verifier.verify(&block) { + Ok(Chain::Main) | Ok(Chain::Side) => { + sink.lock().on_block_verification_success(block) + }, + Ok(Chain::Orphan) => { + unreachable!("sync will never put orphaned blocks to verification queue"); + }, + Err(e) => { + sink.lock().on_block_verification_error(&format!("{:?}", e), &block.hash()) + } + } + }, + VerificationTask::VerifyTransaction(transaction) => { + // TODO: add verification here + sink.lock().on_transaction_verification_error("unimplemented", &transaction.hash()) + }, + _ => unreachable!("must be checked by caller"), + } +} + #[cfg(test)] pub mod tests { use std::sync::Arc;