diff --git a/sync/src/synchronization_client_core.rs b/sync/src/synchronization_client_core.rs
index f982128b..908e4fe7 100644
--- a/sync/src/synchronization_client_core.rs
+++ b/sync/src/synchronization_client_core.rs
@@ -297,6 +297,10 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
 		// validate blocks headers before scheduling
 		let last_known_hash = if first_unknown_index > 0 { headers[first_unknown_index - 1].hash.clone() } else { header0.raw.previous_header_hash.clone() };
+		if self.config.close_connection_on_bad_block && self.chain.block_state(&last_known_hash) == BlockState::DeadEnd {
+			self.peers.misbehaving(peer_index, &format!("Provided after dead-end block {}", last_known_hash.to_reversed_str()));
+			return;
+		}
 		match self.verify_headers(peer_index, last_known_hash, &headers[first_unknown_index..num_headers]) {
 			BlocksHeadersVerificationResult::Error(error_index) => self.chain.mark_dead_end_block(&headers[first_unknown_index + error_index].hash),
 			BlocksHeadersVerificationResult::Skip => (),
@@ -1213,17 +1217,22 @@ pub mod tests {
 			None => Arc::new(db::TestStorage::with_genesis_block()),
 		};
 		let sync_state = SynchronizationStateRef::new(SynchronizationState::with_storage(storage.clone()));
-		let chain = Chain::new(storage.clone(), Arc::new(RwLock::new(MemoryPool::new())));
+		let memory_pool = Arc::new(RwLock::new(MemoryPool::new()));
+		let chain = Chain::new(storage.clone(), memory_pool.clone());
 		let executor = DummyTaskExecutor::new();
 		let config = Config { network: Magic::Mainnet, threads_num: 1, close_connection_on_bad_block: true };
-		let chain_verifier = Arc::new(ChainVerifier::new(storage.clone(), Magic::Testnet));
-		let client_core = SynchronizationClientCore::new(config, &handle, sync_state.clone(), sync_peers.clone(), executor.clone(), chain, chain_verifier);
+		let chain_verifier = Arc::new(ChainVerifier::new(storage.clone(), Magic::Unitest));
+		let client_core = SynchronizationClientCore::new(config, &handle, sync_state.clone(), sync_peers.clone(), executor.clone(), chain, chain_verifier.clone());
 		{ client_core.lock().set_verify_headers(false); }
 		let mut verifier = verifier.unwrap_or_default();
 		verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
+		verifier.set_storage(storage);
+		verifier.set_memory_pool(memory_pool);
+		verifier.set_verifier(chain_verifier);
+
 		let client = SynchronizationClient::new(sync_state, client_core.clone(), verifier);

 		(event_loop, handle, executor, client_core, client)
 	}
@@ -2035,7 +2044,6 @@ pub mod tests {
 			chain.mark_dead_end_block(&b0.hash());
 		}

-		core.lock().set_verify_headers(true);
 		core.lock().peers.insert(0, DummyOutboundSyncConnection::new());
 		assert!(core.lock().peers.enumerate().contains(&0));

@@ -2155,4 +2163,107 @@ pub mod tests {
 	fn when_transaction_replaces_locked_transaction() {
 		// TODO
 	}
+
+	#[test]
+	fn when_transaction_double_spends_during_reorg() {
+		let b0 = test_data::block_builder().header().build()
+			.transaction().coinbase()
+				.output().value(10).build()
+				.build()
+			.transaction()
+				.output().value(20).build()
+				.build()
+			.transaction()
+				.output().value(30).build()
+				.build()
+			.transaction()
+				.output().value(40).build()
+				.build()
+			.transaction()
+				.output().value(50).build()
+				.build()
+			.build();
+
+		// in-storage spends b0[1] && b0[2]
+		let b1 = test_data::block_builder()
+			.transaction().coinbase()
+				.output().value(50).build()
+				.build()
+			.transaction().version(10)
+				.input().hash(b0.transactions[1].hash()).index(0).build()
+				.output().value(10).build()
+				.build()
+			.transaction().version(40)
+				.input().hash(b0.transactions[2].hash()).index(0).build()
+				.output().value(10).build()
+				.build()
+			.merkled_header().parent(b0.hash()).build()
+			.build();
+		// in-memory spends b0[3]
+		// in-memory spends b0[4]
+		let future_block = test_data::block_builder().header().parent(b1.hash()).build()
+			.transaction().version(40)
+				.input().hash(b0.transactions[3].hash()).index(0).build()
+				.output().value(10).build()
+				.build()
+			.transaction().version(50)
+				.input().hash(b0.transactions[4].hash()).index(0).build()
+				.output().value(10).build()
+				.build()
+			.build();
+		let tx2: Transaction = future_block.transactions[0].clone();
+		let tx3: Transaction = future_block.transactions[1].clone();
+
+		// in-storage [side] spends b0[3]
+		let b2 = test_data::block_builder().header().parent(b0.hash()).build()
+			.transaction().coinbase()
+				.output().value(5555).build()
+				.build()
+			.transaction().version(20)
+				.input().hash(b0.transactions[3].hash()).index(0).build()
+				.build()
+			.merkled_header().parent(b0.hash()).build()
+			.build();
+		// in-storage [causes reorg to b2 + b3] spends b0[1]
+		let b3 = test_data::block_builder()
+			.transaction().coinbase().version(40)
+				.output().value(50).build()
+				.build()
+			.transaction().version(30)
+				.input().hash(b0.transactions[1].hash()).index(0).build()
+				.output().value(10).build()
+				.build()
+			.merkled_header().parent(b2.hash()).build()
+			.build();
+
+		let mut dummy_verifier = DummyVerifier::default();
+		dummy_verifier.actual_check_when_verifying(b3.hash());
+
+		let storage = create_disk_storage();
+		storage.insert_block(&b0).expect("no db error");
+
+		let (_, _, _, core, sync) = create_sync(Some(storage), Some(dummy_verifier));
+		sync.on_block(0, b1.clone().into());
+		sync.on_transaction(0, tx2.clone().into());
+		sync.on_transaction(0, tx3.clone().into());
+		assert_eq!(core.lock().information().chain.stored, 2); // b0 + b1
+		assert_eq!(core.lock().information().chain.transactions.transactions_count, 2); // tx2 + tx3
+
+		// insert b2, which will make tx2 invalid, but not yet
+		sync.on_block(0, b2.clone().into());
+		assert_eq!(core.lock().information().chain.stored, 2); // b0 + b1
+		assert_eq!(core.lock().information().chain.transactions.transactions_count, 2); // tx2 + tx3
+
+		// insert b3 => best chain is b0 + b2 + b3
+		// + transactions from decanonized b1 are reverified => b1[1] now double-spends b0[1] => fail, b1[2] => ok
+		// + tx2 will be reverified => fail (b2[1] already spends b0[3])
+		// + tx3 will be reverified => ok
+		sync.on_block(0, b3.clone().into());
+		assert_eq!(core.lock().information().chain.stored, 3); // b0 + b2 + b3
+		assert_eq!(core.lock().information().chain.transactions.transactions_count, 2); // b1[2] + tx3
+
+		let mempool = core.lock().chain().memory_pool();
+		assert!(mempool.write().remove_by_hash(&b1.transactions[2].hash()).is_some());
+		assert!(mempool.write().remove_by_hash(&tx3.hash()).is_some());
+	}
 }
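Note on the test above: the final assertions follow from plain outpoint bookkeeping once the best chain becomes b0 + b2 + b3 (b3[1] spends b0[1], b2[1] spends b0[3]). The sketch below is illustrative only and is not part of the patch; it models outpoints as hypothetical (source, index) string pairs instead of the real chain::OutPoint / MemoryPool types.

// Sketch only: why b1[1] and tx2 are dropped while b1[2] and tx3 stay in the memory pool.
use std::collections::HashSet;

fn main() {
	// outpoints spent by the new best chain: b3[1] spends b0[1], b2[1] spends b0[3]
	let spent_by_new_chain: HashSet<(&str, u32)> = vec![("b0[1]", 0), ("b0[3]", 0)].into_iter().collect();
	// memory pool candidates after the reorg: decanonized b1 transactions plus the pending tx2/tx3,
	// each listed with the single outpoint it spends
	let candidates = vec![
		("b1[1]", ("b0[1]", 0)), // double spend of b0[1] => dropped
		("b1[2]", ("b0[2]", 0)), // still unspent => kept
		("tx2",   ("b0[3]", 0)), // double spend of b0[3] => dropped
		("tx3",   ("b0[4]", 0)), // still unspent => kept
	];
	let kept: Vec<&str> = candidates.into_iter()
		.filter(|&(_, outpoint)| !spent_by_new_chain.contains(&outpoint))
		.map(|(name, _)| name)
		.collect();
	assert_eq!(kept, vec!["b1[2]", "tx3"]); // matches transactions_count == 2 in the test
}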
diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs
index d488ab95..1195838c 100644
--- a/sync/src/synchronization_verifier.rs
+++ b/sync/src/synchronization_verifier.rs
@@ -86,53 +86,62 @@ impl AsyncVerifier {
 	/// Thread procedure for handling verification tasks
 	fn verification_worker_proc<T: VerificationSink>(sink: Arc<T>, storage: StorageRef, memory_pool: MemoryPoolRef, verifier: Arc<ChainVerifier>, work_receiver: Receiver<VerificationTask>) {
 		while let Ok(task) = work_receiver.recv() {
-			// block verification && insertion can lead to reorganization
-			// => transactions from decanonized blocks should be put back to the MemoryPool
-			// => they must be verified again
-			// => here's sub-tasks queue
-			let mut tasks_queue: VecDeque<VerificationTask> = VecDeque::new();
-			tasks_queue.push_back(task);
-
-			while let Some(task) = tasks_queue.pop_front() {
-				match task {
-					VerificationTask::VerifyBlock(block) => {
-						// verify block
-						match verifier.verify(&block) {
-							Ok(Chain::Main) | Ok(Chain::Side) => {
-								if let Some(tasks) = sink.on_block_verification_success(block) {
-									tasks_queue.extend(tasks);
-								}
-							},
-							Ok(Chain::Orphan) => {
-								// this can happen for B1 if B0 verification has failed && we have already scheduled verification of B0
-								sink.on_block_verification_error(&format!("orphaned block because parent block verification has failed"), block.hash())
-							},
-							Err(e) => {
-								sink.on_block_verification_error(&format!("{:?}", e), block.hash())
-							}
-						}
-					},
-					VerificationTask::VerifyTransaction(height, transaction) => {
-						// output provider must check previous outputs in both storage && memory pool
-						match MemoryPoolTransactionOutputProvider::for_transaction(storage.clone(), &memory_pool, &transaction.raw) {
-							Err(e) => {
-								sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash);
-								continue; // with new verification sub-task
-							},
-							Ok(tx_output_provider) => {
-								let time: u32 = get_time().sec as u32;
-								match verifier.verify_mempool_transaction(&tx_output_provider, height, time, &transaction.raw) {
-									Ok(_) => sink.on_transaction_verification_success(transaction.into()),
-									Err(e) => sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash),
-								}
-							},
-						};
-					},
-					VerificationTask::Stop => break,
-				}
+			if !AsyncVerifier::execute_single_task(&sink, &storage, &memory_pool, &verifier, task) {
+				break;
 			}
 		}
 	}
+
+	/// Execute single verification task
+	pub fn execute_single_task<T: VerificationSink>(sink: &Arc<T>, storage: &StorageRef, memory_pool: &MemoryPoolRef, verifier: &Arc<ChainVerifier>, task: VerificationTask) -> bool {
+		// block verification && insertion can lead to reorganization
+		// => transactions from decanonized blocks should be put back to the MemoryPool
+		// => they must be verified again
+		// => here's sub-tasks queue
+		let mut tasks_queue: VecDeque<VerificationTask> = VecDeque::new();
+		tasks_queue.push_back(task);
+
+		while let Some(task) = tasks_queue.pop_front() {
+			match task {
+				VerificationTask::VerifyBlock(block) => {
+					// verify block
+					match verifier.verify(&block) {
+						Ok(Chain::Main) | Ok(Chain::Side) => {
+							if let Some(tasks) = sink.on_block_verification_success(block) {
+								tasks_queue.extend(tasks);
+							}
+						},
+						Ok(Chain::Orphan) => {
+							// this can happen for B1 if B0 verification has failed && we have already scheduled verification of B0
+							sink.on_block_verification_error(&format!("orphaned block because parent block verification has failed"), block.hash())
+						},
+						Err(e) => {
+							sink.on_block_verification_error(&format!("{:?}", e), block.hash())
+						}
+					}
+				},
+				VerificationTask::VerifyTransaction(height, transaction) => {
+					// output provider must check previous outputs in both storage && memory pool
+					match MemoryPoolTransactionOutputProvider::for_transaction(storage.clone(), &memory_pool, &transaction.raw) {
+						Err(e) => {
+							sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash);
+							continue; // with new verification sub-task
+						},
+						Ok(tx_output_provider) => {
+							let time: u32 = get_time().sec as u32;
+							match verifier.verify_mempool_transaction(&tx_output_provider, height, time, &transaction.raw) {
+								Ok(_) => sink.on_transaction_verification_success(transaction.into()),
+								Err(e) => sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash),
+							}
+						},
+					};
+				},
+				VerificationTask::Stop => return false,
+			}
+		}
+
+		true
+	}
 }
@@ -208,18 +217,23 @@ impl<T> Verifier for SyncVerifier<T> where T: VerificationSink {
 #[cfg(test)]
 pub mod tests {
 	use std::sync::Arc;
-	use std::collections::HashMap;
+	use std::collections::{HashSet, HashMap};
+	use verification::BackwardsCompatibleChainVerifier as ChainVerifier;
 	use synchronization_client_core::CoreVerificationSink;
 	use synchronization_executor::tests::DummyTaskExecutor;
 	use primitives::hash::H256;
 	use chain::{IndexedBlock, IndexedTransaction};
-	use super::{Verifier, BlockVerificationSink, TransactionVerificationSink};
-	use types::BlockHeight;
+	use super::{Verifier, BlockVerificationSink, TransactionVerificationSink, AsyncVerifier, VerificationTask};
+	use types::{BlockHeight, StorageRef, MemoryPoolRef};

 	#[derive(Default)]
 	pub struct DummyVerifier {
 		sink: Option<Arc<CoreVerificationSink<DummyTaskExecutor>>>,
-		errors: HashMap<H256, String>
+		errors: HashMap<H256, String>,
+		actual_checks: HashSet<H256>,
+		storage: Option<StorageRef>,
+		memory_pool: Option<MemoryPoolRef>,
+		verifier: Option<Arc<ChainVerifier>>,
 	}

 	impl DummyVerifier {
@@ -227,9 +241,25 @@ pub mod tests {
 			self.sink = Some(sink);
 		}

+		pub fn set_storage(&mut self, storage: StorageRef) {
+			self.storage = Some(storage);
+		}
+
+		pub fn set_memory_pool(&mut self, memory_pool: MemoryPoolRef) {
+			self.memory_pool = Some(memory_pool);
+		}
+
+		pub fn set_verifier(&mut self, verifier: Arc<ChainVerifier>) {
+			self.verifier = Some(verifier);
+		}
+
 		pub fn error_when_verifying(&mut self, hash: H256, err: &str) {
 			self.errors.insert(hash, err.into());
 		}
+
+		pub fn actual_check_when_verifying(&mut self, hash: H256) {
+			self.actual_checks.insert(hash);
+		}
 	}

 	impl Verifier for DummyVerifier {
@@ -238,8 +268,11 @@
 				Some(ref sink) => match self.errors.get(&block.hash()) {
 					Some(err) => sink.on_block_verification_error(&err, &block.hash()),
 					None => {
-						sink.on_block_verification_success(block);
-						()
+						if self.actual_checks.contains(block.hash()) {
+							AsyncVerifier::execute_single_task(sink, self.storage.as_ref().unwrap(), self.memory_pool.as_ref().unwrap(), self.verifier.as_ref().unwrap(), VerificationTask::VerifyBlock(block));
+						} else {
+							sink.on_block_verification_success(block);
+						}
 					},
 				},
 				None => panic!("call set_sink"),
@@ -250,7 +283,14 @@
 			match self.sink {
 				Some(ref sink) => match self.errors.get(&transaction.hash) {
 					Some(err) => sink.on_transaction_verification_error(&err, &transaction.hash),
-					None => sink.on_transaction_verification_success(transaction.into()),
+					None => {
+						if self.actual_checks.contains(&transaction.hash) {
+							let next_block_height = self.storage.as_ref().unwrap().best_block().unwrap().number + 1;
+							AsyncVerifier::execute_single_task(sink, self.storage.as_ref().unwrap(), self.memory_pool.as_ref().unwrap(), self.verifier.as_ref().unwrap(), VerificationTask::VerifyTransaction(next_block_height, transaction));
+						} else {
+							sink.on_transaction_verification_success(transaction.into());
+						}
+					},
 				},
 				None => panic!("call set_sink"),
 			}
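Usage note for the new DummyVerifier hooks: a test that wants selected blocks or transactions to go through the real verification path wires the dummy as sketched below. This mirrors the wiring done in create_sync in the first file; it is not part of the patch, and storage, memory_pool, chain_verifier, sink and b3 stand in for values the test already has in scope.

// Sketch only: DummyVerifier hooks added by this patch, everything else is assumed context.
let mut dummy_verifier = DummyVerifier::default();
dummy_verifier.set_sink(sink);                          // unchanged from before
dummy_verifier.set_storage(storage.clone());            // new: previous outputs are looked up in storage
dummy_verifier.set_memory_pool(memory_pool.clone());    // new: ...and in the memory pool
dummy_verifier.set_verifier(chain_verifier.clone());    // new: real verifier used by execute_single_task
dummy_verifier.actual_check_when_verifying(b3.hash());  // only b3 is actually verified; other hashes
                                                        // are still auto-accepted by the dummy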
diff --git a/test-data/src/block.rs b/test-data/src/block.rs
index 51f3c90c..c354819d 100644
--- a/test-data/src/block.rs
+++ b/test-data/src/block.rs
@@ -293,6 +293,11 @@ impl<F> TransactionBuilder<F> where F: Invoke<chain::Transaction> {
 		self
 	}

+	pub fn version(mut self, version: i32) -> Self {
+		self.version = version;
+		self
+	}
+
 	pub fn input(self) -> TransactionInputBuilder<Self> {
 		TransactionInputBuilder::with_callback(self)
 	}
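Usage note for TransactionBuilder::version: setting an explicit version is a cheap way to keep otherwise identical test transactions from hashing to the same id, which appears to be how the reorg test above uses it (for example, the two value-50 coinbases of b1 and b3). A minimal sketch, not part of the patch, following the builder chains already used in that test:

// Sketch only: two coinbase transactions that differ solely in version and therefore
// serialize, and hash, differently.
let block_a = test_data::block_builder().header().build()
	.transaction().coinbase().output().value(50).build().build()
	.build();
let block_b = test_data::block_builder().header().build()
	.transaction().coinbase().version(40).output().value(50).build().build()
	.build();
assert_ne!(block_a.transactions[0].hash(), block_b.transactions[0].hash());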