diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 86850d23..fbf87f22 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -19,7 +19,7 @@ use synchronization_peers::Peers; use synchronization_chain::{Chain, ChainRef, BlockState, HeadersIntersection}; #[cfg(test)] use synchronization_chain::{Information as ChainInformation}; -use verification::{ChainVerifier, Error as VerificationError, Verify}; +use verification::{ChainVerifier, Verify}; use synchronization_executor::{Task, TaskExecutor}; use synchronization_manager::{manage_synchronization_peers_blocks, manage_synchronization_peers_inventory, manage_unknown_orphaned_blocks, MANAGEMENT_INTERVAL_MS, ManagePeersConfig, ManageUnknownBlocksConfig}; @@ -193,7 +193,7 @@ pub trait Client : Send + 'static { fn on_peer_disconnected(&mut self, peer_index: usize); fn get_peers_nearly_blocks_waiter(&mut self, peer_index: usize) -> (bool, Option>); fn on_block_verification_success(&mut self, block: Block); - fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256); + fn on_block_verification_error(&mut self, err: &str, hash: &H256); } /// Synchronization peer blocks waiter @@ -211,7 +211,7 @@ pub struct Config { pub consensus_params: ConsensusParams, /// Number of threads to allocate in synchronization CpuPool. pub threads_num: usize, - /// Do not verify incoming blocks before inserting to db. +/// Do not verify incoming blocks before inserting to db. pub skip_verification: bool, } @@ -410,7 +410,7 @@ impl Client for SynchronizationClient where T: TaskExecutor { fn on_block_verification_success(&mut self, block: Block) { let hash = block.hash(); // insert block to the storage - { + match { let mut chain = self.chain.write(); // remove block from verification queue @@ -419,19 +419,30 @@ impl Client for SynchronizationClient where T: TaskExecutor { if chain.forget_with_state_leave_header(&hash, BlockState::Verifying) != HashPosition::Missing { // block was in verification queue => insert to storage chain.insert_best_block(hash.clone(), block) - .expect("Error inserting to db."); + } else { + Ok(()) + } + } { + Ok(_) => { + // awake threads, waiting for this block insertion + self.awake_waiting_threads(&hash); + + // continue with synchronization + self.execute_synchronization_tasks(None); + }, + Err(db::Error::Consistency(e)) => { + // process as verification error + self.on_block_verification_error(&format!("{:?}", db::Error::Consistency(e)), &hash); + }, + Err(e) => { + // process as irrecoverable failure + panic!("Block {:?} insertion failed with error {:?}", hash, e); } } - - // awake threads, waiting for this block insertion - self.awake_waiting_threads(&hash); - - // continue with synchronization - self.execute_synchronization_tasks(None); } /// Process failed block verification - fn on_block_verification_error(&mut self, err: &VerificationError, hash: &H256) { + fn on_block_verification_error(&mut self, err: &str, hash: &H256) { warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash, err); { @@ -928,8 +939,8 @@ impl SynchronizationClient where T: TaskExecutor { Ok(_chain) => { sync.lock().on_block_verification_success(block) }, - Err(err) => { - sync.lock().on_block_verification_error(&err, &block.hash()) + Err(e) => { + sync.lock().on_block_verification_error(&format!("{:?}", e), &block.hash()) } } }, @@ -1428,4 +1439,9 @@ pub mod tests { assert_eq!(tasks, vec![Task::RequestBlocks(2, vec![block1.hash()])]); } } + + #[test] + fn sync_after_db_insert_nonfatal_fail() { + // TODO: implement me + } }