when_transaction_double_spends_during_reorg

This commit is contained in:
Svyatoslav Nikolsky 2017-01-09 17:35:15 +03:00
parent afc9c53df0
commit fe97161092
3 changed files with 212 additions and 55 deletions

View File

@ -297,6 +297,10 @@ impl<T> ClientCore for SynchronizationClientCore<T> where T: TaskExecutor {
// validate blocks headers before scheduling
let last_known_hash = if first_unknown_index > 0 { headers[first_unknown_index - 1].hash.clone() } else { header0.raw.previous_header_hash.clone() };
if self.config.close_connection_on_bad_block && self.chain.block_state(&last_known_hash) == BlockState::DeadEnd {
self.peers.misbehaving(peer_index, &format!("Provided after dead-end block {}", last_known_hash.to_reversed_str()));
return;
}
match self.verify_headers(peer_index, last_known_hash, &headers[first_unknown_index..num_headers]) {
BlocksHeadersVerificationResult::Error(error_index) => self.chain.mark_dead_end_block(&headers[first_unknown_index + error_index].hash),
BlocksHeadersVerificationResult::Skip => (),
@ -814,6 +818,7 @@ impl<T> SynchronizationClientCore<T> where T: TaskExecutor {
// check that we do not know all blocks in range [first_unknown_index..]
// if we know some block => there has been verification error => all headers should be ignored
// see when_previous_block_verification_failed_fork_is_not_requested for details
println!("=== {:?}", self.chain.block_state(&header.hash));
match self.chain.block_state(&header.hash) {
BlockState::Unknown => (),
BlockState::DeadEnd if self.config.close_connection_on_bad_block => {
@ -1213,17 +1218,22 @@ pub mod tests {
None => Arc::new(db::TestStorage::with_genesis_block()),
};
let sync_state = SynchronizationStateRef::new(SynchronizationState::with_storage(storage.clone()));
let chain = Chain::new(storage.clone(), Arc::new(RwLock::new(MemoryPool::new())));
let memory_pool = Arc::new(RwLock::new(MemoryPool::new()));
let chain = Chain::new(storage.clone(), memory_pool.clone());
let executor = DummyTaskExecutor::new();
let config = Config { network: Magic::Mainnet, threads_num: 1, close_connection_on_bad_block: true };
let chain_verifier = Arc::new(ChainVerifier::new(storage.clone(), Magic::Testnet));
let client_core = SynchronizationClientCore::new(config, &handle, sync_state.clone(), sync_peers.clone(), executor.clone(), chain, chain_verifier);
let chain_verifier = Arc::new(ChainVerifier::new(storage.clone(), Magic::Unitest));
let client_core = SynchronizationClientCore::new(config, &handle, sync_state.clone(), sync_peers.clone(), executor.clone(), chain, chain_verifier.clone());
{
client_core.lock().set_verify_headers(false);
}
let mut verifier = verifier.unwrap_or_default();
verifier.set_sink(Arc::new(CoreVerificationSink::new(client_core.clone())));
verifier.set_storage(storage);
verifier.set_memory_pool(memory_pool);
verifier.set_verifier(chain_verifier);
let client = SynchronizationClient::new(sync_state, client_core.clone(), verifier);
(event_loop, handle, executor, client_core, client)
}
@ -2035,7 +2045,6 @@ pub mod tests {
chain.mark_dead_end_block(&b0.hash());
}
core.lock().set_verify_headers(true);
core.lock().peers.insert(0, DummyOutboundSyncConnection::new());
assert!(core.lock().peers.enumerate().contains(&0));
@ -2155,4 +2164,107 @@ pub mod tests {
fn when_transaction_replaces_locked_transaction() {
// TODO
}
#[test]
fn when_transaction_double_spends_during_reorg() {
let b0 = test_data::block_builder().header().build()
.transaction().coinbase()
.output().value(10).build()
.build()
.transaction()
.output().value(20).build()
.build()
.transaction()
.output().value(30).build()
.build()
.transaction()
.output().value(40).build()
.build()
.transaction()
.output().value(50).build()
.build()
.build();
// in-storage spends b0[1] && b0[2]
let b1 = test_data::block_builder()
.transaction().coinbase()
.output().value(50).build()
.build()
.transaction().version(10)
.input().hash(b0.transactions[1].hash()).index(0).build()
.output().value(10).build()
.build()
.transaction().version(40)
.input().hash(b0.transactions[2].hash()).index(0).build()
.output().value(10).build()
.build()
.merkled_header().parent(b0.hash()).build()
.build();
// in-memory spends b0[3]
// in-memory spends b0[4]
let future_block = test_data::block_builder().header().parent(b1.hash()).build()
.transaction().version(40)
.input().hash(b0.transactions[3].hash()).index(0).build()
.output().value(10).build()
.build()
.transaction().version(50)
.input().hash(b0.transactions[4].hash()).index(0).build()
.output().value(10).build()
.build()
.build();
let tx2: Transaction = future_block.transactions[0].clone();
let tx3: Transaction = future_block.transactions[1].clone();
// in-storage [side] spends b0[3]
let b2 = test_data::block_builder().header().parent(b0.hash()).build()
.transaction().coinbase()
.output().value(5555).build()
.build()
.transaction().version(20)
.input().hash(b0.transactions[3].hash()).index(0).build()
.build()
.merkled_header().parent(b0.hash()).build()
.build();
// in-storage [causes reorg to b2 + b3] spends b0[1]
let b3 = test_data::block_builder()
.transaction().coinbase().version(40)
.output().value(50).build()
.build()
.transaction().version(30)
.input().hash(b0.transactions[1].hash()).index(0).build()
.output().value(10).build()
.build()
.merkled_header().parent(b2.hash()).build()
.build();
let mut dummy_verifier = DummyVerifier::default();
dummy_verifier.actual_check_when_verifying(b3.hash());
let storage = create_disk_storage();
storage.insert_block(&b0).expect("no db error");
let (_, _, _, core, sync) = create_sync(Some(storage), Some(dummy_verifier));
sync.on_block(0, b1.clone().into());
sync.on_transaction(0, tx2.clone().into());
sync.on_transaction(0, tx3.clone().into());
assert_eq!(core.lock().information().chain.stored, 2); // b0 + b1
assert_eq!(core.lock().information().chain.transactions.transactions_count, 2); // tx2 + tx3
// insert b2, which will make tx2 invalid, but not yet
sync.on_block(0, b2.clone().into());
assert_eq!(core.lock().information().chain.stored, 2); // b0 + b1
assert_eq!(core.lock().information().chain.transactions.transactions_count, 2); // tx2 + tx3
// insert b3 => best chain is b0 + b2 + b3
// + transaction from b0 is reverified => ok
// + tx2 will be reverified => fail
// + tx3 will be reverified => ok
sync.on_block(0, b3.clone().into());
assert_eq!(core.lock().information().chain.stored, 3); // b0 + b2 + b3
assert_eq!(core.lock().information().chain.transactions.transactions_count, 2); // b1[0] + tx3
let mempool = core.lock().chain().memory_pool();
assert!(mempool.write().remove_by_hash(&b1.transactions[2].hash()).is_some());
assert!(mempool.write().remove_by_hash(&tx3.hash()).is_some());
}
}

View File

@ -86,53 +86,62 @@ impl AsyncVerifier {
/// Thread procedure for handling verification tasks
fn verification_worker_proc<T: VerificationSink>(sink: Arc<T>, storage: StorageRef, memory_pool: MemoryPoolRef, verifier: Arc<ChainVerifier>, work_receiver: Receiver<VerificationTask>) {
while let Ok(task) = work_receiver.recv() {
// block verification && insertion can lead to reorganization
// => transactions from decanonized blocks should be put back to the MemoryPool
// => they must be verified again
// => here's sub-tasks queue
let mut tasks_queue: VecDeque<VerificationTask> = VecDeque::new();
tasks_queue.push_back(task);
while let Some(task) = tasks_queue.pop_front() {
match task {
VerificationTask::VerifyBlock(block) => {
// verify block
match verifier.verify(&block) {
Ok(Chain::Main) | Ok(Chain::Side) => {
if let Some(tasks) = sink.on_block_verification_success(block) {
tasks_queue.extend(tasks);
}
},
Ok(Chain::Orphan) => {
// this can happen for B1 if B0 verification has failed && we have already scheduled verification of B0
sink.on_block_verification_error(&format!("orphaned block because parent block verification has failed"), block.hash())
},
Err(e) => {
sink.on_block_verification_error(&format!("{:?}", e), block.hash())
}
}
},
VerificationTask::VerifyTransaction(height, transaction) => {
// output provider must check previous outputs in both storage && memory pool
match MemoryPoolTransactionOutputProvider::for_transaction(storage.clone(), &memory_pool, &transaction.raw) {
Err(e) => {
sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash);
continue; // with new verification sub-task
},
Ok(tx_output_provider) => {
let time: u32 = get_time().sec as u32;
match verifier.verify_mempool_transaction(&tx_output_provider, height, time, &transaction.raw) {
Ok(_) => sink.on_transaction_verification_success(transaction.into()),
Err(e) => sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash),
}
},
};
},
VerificationTask::Stop => break,
}
if !AsyncVerifier::execute_single_task(&sink, &storage, &memory_pool, &verifier, task) {
break;
}
}
}
/// Execute single verification task
pub fn execute_single_task<T: VerificationSink>(sink: &Arc<T>, storage: &StorageRef, memory_pool: &MemoryPoolRef, verifier: &Arc<ChainVerifier>, task: VerificationTask) -> bool {
// block verification && insertion can lead to reorganization
// => transactions from decanonized blocks should be put back to the MemoryPool
// => they must be verified again
// => here's sub-tasks queue
let mut tasks_queue: VecDeque<VerificationTask> = VecDeque::new();
tasks_queue.push_back(task);
while let Some(task) = tasks_queue.pop_front() {
match task {
VerificationTask::VerifyBlock(block) => {
// verify block
match verifier.verify(&block) {
Ok(Chain::Main) | Ok(Chain::Side) => {
if let Some(tasks) = sink.on_block_verification_success(block) {
tasks_queue.extend(tasks);
}
},
Ok(Chain::Orphan) => {
// this can happen for B1 if B0 verification has failed && we have already scheduled verification of B0
sink.on_block_verification_error(&format!("orphaned block because parent block verification has failed"), block.hash())
},
Err(e) => {
sink.on_block_verification_error(&format!("{:?}", e), block.hash())
}
}
},
VerificationTask::VerifyTransaction(height, transaction) => {
// output provider must check previous outputs in both storage && memory pool
match MemoryPoolTransactionOutputProvider::for_transaction(storage.clone(), &memory_pool, &transaction.raw) {
Err(e) => {
sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash);
continue; // with new verification sub-task
},
Ok(tx_output_provider) => {
let time: u32 = get_time().sec as u32;
match verifier.verify_mempool_transaction(&tx_output_provider, height, time, &transaction.raw) {
Ok(_) => sink.on_transaction_verification_success(transaction.into()),
Err(e) => sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash),
}
},
};
},
VerificationTask::Stop => return false,
}
}
true
}
}
@ -208,18 +217,23 @@ impl<T> Verifier for SyncVerifier<T> where T: VerificationSink {
#[cfg(test)]
pub mod tests {
use std::sync::Arc;
use std::collections::HashMap;
use std::collections::{HashSet, HashMap};
use verification::BackwardsCompatibleChainVerifier as ChainVerifier;
use synchronization_client_core::CoreVerificationSink;
use synchronization_executor::tests::DummyTaskExecutor;
use primitives::hash::H256;
use chain::{IndexedBlock, IndexedTransaction};
use super::{Verifier, BlockVerificationSink, TransactionVerificationSink};
use types::BlockHeight;
use super::{Verifier, BlockVerificationSink, TransactionVerificationSink, AsyncVerifier, VerificationTask};
use types::{BlockHeight, StorageRef, MemoryPoolRef};
#[derive(Default)]
pub struct DummyVerifier {
sink: Option<Arc<CoreVerificationSink<DummyTaskExecutor>>>,
errors: HashMap<H256, String>
errors: HashMap<H256, String>,
actual_checks: HashSet<H256>,
storage: Option<StorageRef>,
memory_pool: Option<MemoryPoolRef>,
verifier: Option<Arc<ChainVerifier>>,
}
impl DummyVerifier {
@ -227,9 +241,25 @@ pub mod tests {
self.sink = Some(sink);
}
pub fn set_storage(&mut self, storage: StorageRef) {
self.storage = Some(storage);
}
pub fn set_memory_pool(&mut self, memory_pool: MemoryPoolRef) {
self.memory_pool = Some(memory_pool);
}
pub fn set_verifier(&mut self, verifier: Arc<ChainVerifier>) {
self.verifier = Some(verifier);
}
pub fn error_when_verifying(&mut self, hash: H256, err: &str) {
self.errors.insert(hash, err.into());
}
pub fn actual_check_when_verifying(&mut self, hash: H256) {
self.actual_checks.insert(hash);
}
}
impl Verifier for DummyVerifier {
@ -238,8 +268,11 @@ pub mod tests {
Some(ref sink) => match self.errors.get(&block.hash()) {
Some(err) => sink.on_block_verification_error(&err, &block.hash()),
None => {
sink.on_block_verification_success(block);
()
if self.actual_checks.contains(block.hash()) {
AsyncVerifier::execute_single_task(sink, self.storage.as_ref().unwrap(), self.memory_pool.as_ref().unwrap(), self.verifier.as_ref().unwrap(), VerificationTask::VerifyBlock(block));
} else {
sink.on_block_verification_success(block);
}
},
},
None => panic!("call set_sink"),
@ -250,7 +283,14 @@ pub mod tests {
match self.sink {
Some(ref sink) => match self.errors.get(&transaction.hash) {
Some(err) => sink.on_transaction_verification_error(&err, &transaction.hash),
None => sink.on_transaction_verification_success(transaction.into()),
None => {
if self.actual_checks.contains(&transaction.hash) {
let next_block_height = self.storage.as_ref().unwrap().best_block().unwrap().number + 1;
AsyncVerifier::execute_single_task(sink, self.storage.as_ref().unwrap(), self.memory_pool.as_ref().unwrap(), self.verifier.as_ref().unwrap(), VerificationTask::VerifyTransaction(next_block_height, transaction));
} else {
sink.on_transaction_verification_success(transaction.into());
}
},
},
None => panic!("call set_sink"),
}

View File

@ -293,6 +293,11 @@ impl<F> TransactionBuilder<F> where F: Invoke<chain::Transaction> {
self
}
pub fn version(mut self, version: i32) -> Self {
self.version = version;
self
}
pub fn input(self) -> TransactionInputBuilder<Self> {
TransactionInputBuilder::with_callback(self)
}