support semi-unordered blocks import
This commit is contained in:
parent
25748096fa
commit
9a6c5c8579
|
@ -13,7 +13,6 @@ pub fn import(cfg: Config, matches: &ArgMatches) -> Result<(), String> {
|
||||||
let blk_path = matches.value_of("PATH").expect("PATH is required in cli.yml; qed");
|
let blk_path = matches.value_of("PATH").expect("PATH is required in cli.yml; qed");
|
||||||
let blk_dir = try!(::import::open_blk_dir(blk_path).map_err(|_| "Import directory does not exist".to_owned()));
|
let blk_dir = try!(::import::open_blk_dir(blk_path).map_err(|_| "Import directory does not exist".to_owned()));
|
||||||
let mut counter = 0;
|
let mut counter = 0;
|
||||||
let mut skipped = 0;
|
|
||||||
for blk in blk_dir {
|
for blk in blk_dir {
|
||||||
// TODO: verify magic!
|
// TODO: verify magic!
|
||||||
let blk = try!(blk.map_err(|_| "Cannot read block".to_owned()));
|
let blk = try!(blk.map_err(|_| "Cannot read block".to_owned()));
|
||||||
|
@ -24,14 +23,12 @@ pub fn import(cfg: Config, matches: &ArgMatches) -> Result<(), String> {
|
||||||
info!("Imported {} blocks", counter);
|
info!("Imported {} blocks", counter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(Error::OutOfOrderBlock) => {
|
Err(Error::TooManyOrphanBlocks) => return Err("Too many orphan (unordered) blocks".into()),
|
||||||
skipped += 1;
|
|
||||||
},
|
|
||||||
Err(_) => return Err("Cannot append block".into()),
|
Err(_) => return Err("Cannot append block".into()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Finished import of {} blocks. Skipped {} blocks.", counter, skipped);
|
info!("Finished import of {} blocks", counter);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,45 +1,109 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use parking_lot::Mutex;
|
||||||
use chain;
|
use chain;
|
||||||
use db;
|
use db;
|
||||||
use network::Magic;
|
use network::Magic;
|
||||||
use verification::{Verify, ChainVerifier};
|
use orphan_blocks_pool::OrphanBlocksPool;
|
||||||
|
use synchronization_verifier::{Verifier, SyncVerifier, VerificationSink};
|
||||||
|
use primitives::hash::H256;
|
||||||
use super::Error;
|
use super::Error;
|
||||||
|
|
||||||
|
pub const MAX_ORPHANED_BLOCKS: usize = 64;
|
||||||
|
|
||||||
pub struct BlocksWriter {
|
pub struct BlocksWriter {
|
||||||
storage: Arc<db::Store>,
|
storage: db::SharedStore,
|
||||||
verifier: ChainVerifier,
|
orphaned_blocks_pool: OrphanBlocksPool,
|
||||||
|
verifier: SyncVerifier<BlocksWriterSink>,
|
||||||
|
sink: Arc<Mutex<BlocksWriterSink>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct BlocksWriterSink {
|
||||||
|
storage: db::SharedStore,
|
||||||
|
err: Option<Error>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlocksWriter {
|
impl BlocksWriter {
|
||||||
pub fn new(storage: db::SharedStore, network: Magic) -> BlocksWriter {
|
pub fn new(storage: db::SharedStore, network: Magic) -> BlocksWriter {
|
||||||
|
let sink = Arc::new(Mutex::new(BlocksWriterSink::new(storage.clone())));
|
||||||
|
let verifier = SyncVerifier::new(network, storage.clone(), sink.clone());
|
||||||
BlocksWriter {
|
BlocksWriter {
|
||||||
storage: storage.clone(),
|
storage: storage,
|
||||||
verifier: ChainVerifier::new(storage, network),
|
orphaned_blocks_pool: OrphanBlocksPool::new(),
|
||||||
|
verifier: verifier,
|
||||||
|
sink: sink,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn append_block(&mut self, block: chain::Block) -> Result<(), Error> {
|
pub fn append_block(&mut self, block: chain::Block) -> Result<(), Error> {
|
||||||
let indexed_block: db::IndexedBlock = block.into();
|
let indexed_block: db::IndexedBlock = block.into();
|
||||||
// TODO: share same verification code with synchronization_client
|
// verify && insert only if parent block is already in the storage
|
||||||
if self.storage.best_block().map_or(false, |bb| bb.hash != indexed_block.header().previous_header_hash) {
|
if !self.storage.contains_block(db::BlockRef::Hash(indexed_block.header().previous_header_hash.clone())) {
|
||||||
return Err(Error::OutOfOrderBlock);
|
self.orphaned_blocks_pool.insert_orphaned_block(indexed_block.hash().clone(), indexed_block);
|
||||||
|
// we can't hold many orphaned blocks in memory during import
|
||||||
|
if self.orphaned_blocks_pool.len() > MAX_ORPHANED_BLOCKS {
|
||||||
|
return Err(Error::TooManyOrphanBlocks);
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.verifier.verify(&indexed_block) {
|
// verify && insert block && all its orphan children
|
||||||
Err(err) => Err(Error::Verification(err)),
|
let mut verification_queue: VecDeque<db::IndexedBlock> = self.orphaned_blocks_pool.remove_blocks_for_parent(indexed_block.hash()).into_iter().map(|(_, b)| b).collect();
|
||||||
Ok(_chain) => { try!(self.storage.insert_indexed_block(&indexed_block).map_err(Error::Database)); Ok(()) }
|
verification_queue.push_front(indexed_block);
|
||||||
|
while let Some(block) = verification_queue.pop_front() {
|
||||||
|
println!("Verifying {:?}", block.hash().to_reversed_str());
|
||||||
|
self.verifier.verify_block(block);
|
||||||
|
if let Some(err) = self.sink.lock().error() {
|
||||||
|
return Err(err);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl BlocksWriterSink {
|
||||||
|
pub fn new(storage: db::SharedStore) -> Self {
|
||||||
|
BlocksWriterSink {
|
||||||
|
storage: storage,
|
||||||
|
err: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn error(&mut self) -> Option<Error> {
|
||||||
|
self.err.take()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl VerificationSink for BlocksWriterSink {
|
||||||
|
fn on_block_verification_success(&mut self, block: db::IndexedBlock) {
|
||||||
|
if let Err(err) = self.storage.insert_indexed_block(&block) {
|
||||||
|
self.err = Some(Error::Database(err));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_block_verification_error(&mut self, err: &str, _hash: &H256) {
|
||||||
|
self.err = Some(Error::Verification(err.into()));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_transaction_verification_success(&mut self, _transaction: chain::Transaction) {
|
||||||
|
unreachable!("not intended to verify transactions")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_transaction_verification_error(&mut self, _err: &str, _hash: &H256) {
|
||||||
|
unreachable!("not intended to verify transactions")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use db::{self, Store};
|
use db::{self, Store};
|
||||||
use network::Magic;
|
use network::Magic;
|
||||||
use {test_data, verification};
|
use test_data;
|
||||||
use super::super::Error;
|
use super::super::Error;
|
||||||
use super::BlocksWriter;
|
use super::{BlocksWriter, MAX_ORPHANED_BLOCKS};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn blocks_writer_appends_blocks() {
|
fn blocks_writer_appends_blocks() {
|
||||||
|
@ -52,11 +116,15 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn blocks_writer_verification_error() {
|
fn blocks_writer_verification_error() {
|
||||||
let db = Arc::new(db::TestStorage::with_genesis_block());
|
let db = Arc::new(db::TestStorage::with_genesis_block());
|
||||||
|
let blocks = test_data::build_n_empty_blocks_from_genesis((MAX_ORPHANED_BLOCKS + 2) as u32, 1);
|
||||||
let mut blocks_target = BlocksWriter::new(db.clone(), Magic::Testnet);
|
let mut blocks_target = BlocksWriter::new(db.clone(), Magic::Testnet);
|
||||||
match blocks_target.append_block(test_data::block_h2()).unwrap_err() {
|
for (index, block) in blocks.into_iter().skip(1).enumerate() {
|
||||||
Error::OutOfOrderBlock => (),
|
match blocks_target.append_block(block) {
|
||||||
_ => panic!("Unexpected error"),
|
Err(Error::TooManyOrphanBlocks) if index == MAX_ORPHANED_BLOCKS => (),
|
||||||
};
|
Ok(_) if index != MAX_ORPHANED_BLOCKS => (),
|
||||||
|
_ => panic!("unexpected"),
|
||||||
|
}
|
||||||
|
}
|
||||||
assert_eq!(db.best_block().expect("Block is inserted").number, 0);
|
assert_eq!(db.best_block().expect("Block is inserted").number, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -69,7 +137,7 @@ mod tests {
|
||||||
.header().parent(test_data::genesis().hash()).build()
|
.header().parent(test_data::genesis().hash()).build()
|
||||||
.build();
|
.build();
|
||||||
match blocks_target.append_block(wrong_block).unwrap_err() {
|
match blocks_target.append_block(wrong_block).unwrap_err() {
|
||||||
Error::Verification(verification::Error::Empty) => (),
|
Error::Verification(_) => (),
|
||||||
_ => panic!("Unexpected error"),
|
_ => panic!("Unexpected error"),
|
||||||
};
|
};
|
||||||
assert_eq!(db.best_block().expect("Block is inserted").number, 0);
|
assert_eq!(db.best_block().expect("Block is inserted").number, 0);
|
||||||
|
|
|
@ -51,12 +51,12 @@ use network::Magic;
|
||||||
/// Sync errors.
|
/// Sync errors.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
/// Out of order block.
|
/// Too many orphan blocks.
|
||||||
OutOfOrderBlock,
|
TooManyOrphanBlocks,
|
||||||
/// Database error.
|
/// Database error.
|
||||||
Database(db::Error),
|
Database(db::Error),
|
||||||
/// Block verification error.
|
/// Block verification error.
|
||||||
Verification(verification::Error),
|
Verification(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create blocks writer.
|
/// Create blocks writer.
|
||||||
|
@ -74,11 +74,11 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S
|
||||||
use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig};
|
use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig};
|
||||||
use synchronization_verifier::AsyncVerifier;
|
use synchronization_verifier::AsyncVerifier;
|
||||||
|
|
||||||
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db)));
|
let sync_chain = Arc::new(RwLock::new(SyncChain::new(db.clone())));
|
||||||
let sync_executor = SyncExecutor::new(sync_chain.clone());
|
let sync_executor = SyncExecutor::new(sync_chain.clone());
|
||||||
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
|
let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone()));
|
||||||
let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone());
|
let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone());
|
||||||
let verifier = AsyncVerifier::new(network, sync_chain, sync_client_core.clone());
|
let verifier = AsyncVerifier::new(network, db, sync_client_core.clone());
|
||||||
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
|
let sync_client = SynchronizationClient::new(sync_client_core, verifier);
|
||||||
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
|
let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor));
|
||||||
SyncConnectionFactory::with_local_node(sync_node)
|
SyncConnectionFactory::with_local_node(sync_node)
|
||||||
|
|
|
@ -24,7 +24,6 @@ impl OrphanBlocksPool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
/// Get total number of blocks in pool
|
/// Get total number of blocks in pool
|
||||||
pub fn len(&self) -> usize {
|
pub fn len(&self) -> usize {
|
||||||
self.orphaned_blocks.len()
|
self.orphaned_blocks.len()
|
||||||
|
|
|
@ -6,8 +6,7 @@ use chain::Transaction;
|
||||||
use network::Magic;
|
use network::Magic;
|
||||||
use primitives::hash::H256;
|
use primitives::hash::H256;
|
||||||
use verification::{ChainVerifier, Verify as VerificationVerify, Chain};
|
use verification::{ChainVerifier, Verify as VerificationVerify, Chain};
|
||||||
use synchronization_chain::ChainRef;
|
use db::{SharedStore, IndexedBlock};
|
||||||
use db::IndexedBlock;
|
|
||||||
|
|
||||||
/// Verification events sink
|
/// Verification events sink
|
||||||
pub trait VerificationSink : Send + 'static {
|
pub trait VerificationSink : Send + 'static {
|
||||||
|
@ -49,9 +48,8 @@ pub struct AsyncVerifier {
|
||||||
|
|
||||||
impl AsyncVerifier {
|
impl AsyncVerifier {
|
||||||
/// Create new async verifier
|
/// Create new async verifier
|
||||||
pub fn new<T: VerificationSink>(network: Magic, chain: ChainRef, sink: Arc<Mutex<T>>) -> Self {
|
pub fn new<T: VerificationSink>(network: Magic, storage: SharedStore, sink: Arc<Mutex<T>>) -> Self {
|
||||||
let (verification_work_sender, verification_work_receiver) = channel();
|
let (verification_work_sender, verification_work_receiver) = channel();
|
||||||
let storage = chain.read().storage();
|
|
||||||
let verifier = ChainVerifier::new(storage, network);
|
let verifier = ChainVerifier::new(storage, network);
|
||||||
AsyncVerifier {
|
AsyncVerifier {
|
||||||
verification_work_sender: verification_work_sender,
|
verification_work_sender: verification_work_sender,
|
||||||
|
@ -68,30 +66,14 @@ impl AsyncVerifier {
|
||||||
fn verification_worker_proc<T: VerificationSink>(sink: Arc<Mutex<T>>, verifier: ChainVerifier, work_receiver: Receiver<VerificationTask>) {
|
fn verification_worker_proc<T: VerificationSink>(sink: Arc<Mutex<T>>, verifier: ChainVerifier, work_receiver: Receiver<VerificationTask>) {
|
||||||
while let Ok(task) = work_receiver.recv() {
|
while let Ok(task) = work_receiver.recv() {
|
||||||
match task {
|
match task {
|
||||||
VerificationTask::VerifyBlock(block) => {
|
|
||||||
// verify block
|
|
||||||
match verifier.verify(&block) {
|
|
||||||
Ok(Chain::Main) | Ok(Chain::Side) => {
|
|
||||||
sink.lock().on_block_verification_success(block)
|
|
||||||
},
|
|
||||||
Ok(Chain::Orphan) => {
|
|
||||||
unreachable!("sync will never put orphaned blocks to verification queue");
|
|
||||||
},
|
|
||||||
Err(e) => {
|
|
||||||
sink.lock().on_block_verification_error(&format!("{:?}", e), &block.hash())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
VerificationTask::VerifyTransaction(transaction) => {
|
|
||||||
// TODO: add verification here
|
|
||||||
sink.lock().on_transaction_verification_error("unimplemented", &transaction.hash())
|
|
||||||
}
|
|
||||||
VerificationTask::Stop => break,
|
VerificationTask::Stop => break,
|
||||||
|
_ => execute_verification_task(&sink, &verifier, task),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
impl Drop for AsyncVerifier {
|
impl Drop for AsyncVerifier {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if let Some(join_handle) = self.verification_worker_thread.take() {
|
if let Some(join_handle) = self.verification_worker_thread.take() {
|
||||||
|
@ -118,6 +100,62 @@ impl Verifier for AsyncVerifier {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Synchronous synchronization verifier
|
||||||
|
pub struct SyncVerifier<T: VerificationSink> {
|
||||||
|
/// Verifier
|
||||||
|
verifier: ChainVerifier,
|
||||||
|
/// Verification sink
|
||||||
|
sink: Arc<Mutex<T>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> SyncVerifier<T> where T: VerificationSink {
|
||||||
|
/// Create new sync verifier
|
||||||
|
pub fn new(network: Magic, storage: SharedStore, sink: Arc<Mutex<T>>) -> Self {
|
||||||
|
let verifier = ChainVerifier::new(storage, network);
|
||||||
|
SyncVerifier {
|
||||||
|
verifier: verifier,
|
||||||
|
sink: sink,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Verifier for SyncVerifier<T> where T: VerificationSink {
|
||||||
|
/// Verify block
|
||||||
|
fn verify_block(&self, block: IndexedBlock) {
|
||||||
|
execute_verification_task(&self.sink, &self.verifier, VerificationTask::VerifyBlock(block))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Verify transaction
|
||||||
|
fn verify_transaction(&self, transaction: Transaction) {
|
||||||
|
execute_verification_task(&self.sink, &self.verifier, VerificationTask::VerifyTransaction(transaction))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Execute single verification task
|
||||||
|
fn execute_verification_task<T: VerificationSink>(sink: &Arc<Mutex<T>>, verifier: &ChainVerifier, task: VerificationTask) {
|
||||||
|
match task {
|
||||||
|
VerificationTask::VerifyBlock(block) => {
|
||||||
|
// verify block
|
||||||
|
match verifier.verify(&block) {
|
||||||
|
Ok(Chain::Main) | Ok(Chain::Side) => {
|
||||||
|
sink.lock().on_block_verification_success(block)
|
||||||
|
},
|
||||||
|
Ok(Chain::Orphan) => {
|
||||||
|
unreachable!("sync will never put orphaned blocks to verification queue");
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
sink.lock().on_block_verification_error(&format!("{:?}", e), &block.hash())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
VerificationTask::VerifyTransaction(transaction) => {
|
||||||
|
// TODO: add verification here
|
||||||
|
sink.lock().on_transaction_verification_error("unimplemented", &transaction.hash())
|
||||||
|
},
|
||||||
|
_ => unreachable!("must be checked by caller"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub mod tests {
|
pub mod tests {
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
Loading…
Reference in New Issue