From 945c19a8eb89ec5e38a04533ead14ce76f2e5f64 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Tue, 13 Dec 2016 23:26:08 +0300 Subject: [PATCH 1/4] replace non-final transactions in mempool --- chain/src/block.rs | 2 +- chain/src/transaction.rs | 12 ++- db/src/indexed_block.rs | 2 +- miner/src/block_assembler.rs | 62 +++++++++++---- miner/src/lib.rs | 3 +- miner/src/memory_pool.rs | 92 +++++++++++++++++++++- sync/src/local_node.rs | 4 +- sync/src/synchronization_chain.rs | 23 ++++++ sync/src/synchronization_verifier.rs | 112 ++++++++++++++++----------- test-data/src/chain_builder.rs | 10 ++- 10 files changed, 253 insertions(+), 69 deletions(-) diff --git a/chain/src/block.rs b/chain/src/block.rs index ff4278d8..03cedd87 100644 --- a/chain/src/block.rs +++ b/chain/src/block.rs @@ -68,7 +68,7 @@ impl Block { } pub fn is_final(&self, height: u32) -> bool { - self.transactions.iter().all(|t| t.is_final(height, self.block_header.time)) + self.transactions.iter().all(|t| t.is_final_in_block(height, self.block_header.time)) } } diff --git a/chain/src/transaction.rs b/chain/src/transaction.rs index c2dd5657..8e6c984f 100644 --- a/chain/src/transaction.rs +++ b/chain/src/transaction.rs @@ -250,7 +250,17 @@ impl Transaction { self.inputs.len() == 1 && self.inputs[0].previous_output.is_null() } - pub fn is_final(&self, block_height: u32, block_time: u32) -> bool { + pub fn is_final(&self) -> bool { + // if lock_time is 0, transaction is final + if self.lock_time == 0 { + return true; + } + // setting all sequence numbers to 0xffffffff disables the time lock, so if you want to use locktime, + // at least one input must have a sequence number below the maximum. + self.inputs.iter().all(TransactionInput::is_final) + } + + pub fn is_final_in_block(&self, block_height: u32, block_time: u32) -> bool { if self.lock_time == 0 { return true; } diff --git a/db/src/indexed_block.rs b/db/src/indexed_block.rs index dee6936b..e788a580 100644 --- a/db/src/indexed_block.rs +++ b/db/src/indexed_block.rs @@ -84,6 +84,6 @@ impl IndexedBlock { } pub fn is_final(&self, height: u32) -> bool { - self.transactions.iter().all(|tx| tx.raw.is_final(height, self.header.raw.time)) + self.transactions.iter().all(|tx| tx.raw.is_final_in_block(height, self.header.raw.time)) } } diff --git a/miner/src/block_assembler.rs b/miner/src/block_assembler.rs index abf4a268..eb119947 100644 --- a/miner/src/block_assembler.rs +++ b/miner/src/block_assembler.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use primitives::hash::H256; use chain::{OutPoint, TransactionOutput}; use db::{SharedStore, IndexedTransaction, PreviousTransactionOutputProvider}; @@ -102,10 +103,6 @@ impl SizePolicy { self.finish_counter += 1; } - if fits { - self.current_size += size; - } - match (fits, finish) { (true, true) => NextStep::FinishAndAppend, (true, false) => NextStep::Append, @@ -113,6 +110,10 @@ impl SizePolicy { (false, false) => NextStep::Ignore, } } + + fn apply(&mut self, size: u32) { + self.current_size += size; + } } /// Block assembler @@ -136,25 +137,34 @@ struct FittingTransactionsIterator<'a, T> { store: &'a PreviousTransactionOutputProvider, /// Memory pool transactions iterator iter: T, + /// New block height + block_height: u32, + /// New block time + block_time: u32, /// Size policy decides if transactions size fits the block block_size: SizePolicy, /// Sigops policy decides if transactions sigops fits the block sigops: SizePolicy, /// Previous entries are needed to get previous transaction outputs previous_entries: Vec<&'a Entry>, 
+ /// Hashes of ignored entries + ignored: HashSet, /// True if block is already full finished: bool, } impl<'a, T> FittingTransactionsIterator<'a, T> where T: Iterator { - fn new(store: &'a PreviousTransactionOutputProvider, iter: T, max_block_size: u32, max_block_sigops: u32) -> Self { + fn new(store: &'a PreviousTransactionOutputProvider, iter: T, max_block_size: u32, max_block_sigops: u32, block_height: u32, block_time: u32) -> Self { FittingTransactionsIterator { store: store, iter: iter, + block_height: block_height, + block_time: block_time, // reserve some space for header and transations len field block_size: SizePolicy::new(BLOCK_HEADER_SIZE + 4, max_block_size, 1_000, 50), sigops: SizePolicy::new(0, max_block_sigops, 8, 50), previous_entries: Vec::new(), + ignored: HashSet::new(), finished: false, } } @@ -192,18 +202,34 @@ impl<'a, T> Iterator for FittingTransactionsIterator<'a, T> where T: Iterator { + self.block_size.apply(transaction_size); + self.sigops.apply(transaction_size); self.previous_entries.push(entry); return Some(entry); }, NextStep::FinishAndAppend => { self.finished = true; + self.block_size.apply(transaction_size); + self.sigops.apply(transaction_size); self.previous_entries.push(entry); return Some(entry); }, NextStep::Ignore => (), NextStep::FinishAndIgnore => { + self.ignored.insert(entry.hash.clone()); self.finished = true; }, } @@ -227,7 +253,7 @@ impl BlockAssembler { let mut transactions = Vec::new(); let mempool_iter = mempool.iter(OrderingStrategy::ByTransactionScore); - let tx_iter = FittingTransactionsIterator::new(store.as_previous_transaction_output_provider(), mempool_iter, self.max_block_size, self.max_block_sigops); + let tx_iter = FittingTransactionsIterator::new(store.as_previous_transaction_output_provider(), mempool_iter, self.max_block_size, self.max_block_sigops, height, time); for entry in tx_iter { // miner_fee is i64, but we can safely cast it to u64 // memory pool should restrict miner fee to be positive @@ -260,18 +286,18 @@ mod tests { #[test] fn test_size_policy() { let mut size_policy = SizePolicy::new(0, 1000, 200, 3); - assert_eq!(size_policy.decide(100), NextStep::Append); - assert_eq!(size_policy.decide(500), NextStep::Append); + assert_eq!(size_policy.decide(100), NextStep::Append); size_policy.apply(100); + assert_eq!(size_policy.decide(500), NextStep::Append); size_policy.apply(500); assert_eq!(size_policy.decide(600), NextStep::Ignore); - assert_eq!(size_policy.decide(200), NextStep::Append); + assert_eq!(size_policy.decide(200), NextStep::Append); size_policy.apply(200); assert_eq!(size_policy.decide(300), NextStep::Ignore); assert_eq!(size_policy.decide(300), NextStep::Ignore); // this transaction will make counter + buffer > max size - assert_eq!(size_policy.decide(1), NextStep::Append); + assert_eq!(size_policy.decide(1), NextStep::Append); size_policy.apply(1); // so now only 3 more transactions may accepted / ignored - assert_eq!(size_policy.decide(1), NextStep::Append); + assert_eq!(size_policy.decide(1), NextStep::Append); size_policy.apply(1); assert_eq!(size_policy.decide(1000), NextStep::Ignore); - assert_eq!(size_policy.decide(1), NextStep::FinishAndAppend); + assert_eq!(size_policy.decide(1), NextStep::FinishAndAppend); size_policy.apply(1); // we should not call decide again after it returned finish... 
// but we can, let's check if result is ok assert_eq!(size_policy.decide(1000), NextStep::FinishAndIgnore); @@ -294,11 +320,21 @@ mod tests { let entries: Vec = Vec::new(); let store_ref: &[_] = &store; - let iter = FittingTransactionsIterator::new(&store_ref, entries.iter(), MAX_BLOCK_SIZE as u32, MAX_BLOCK_SIGOPS as u32); + let iter = FittingTransactionsIterator::new(&store_ref, entries.iter(), MAX_BLOCK_SIZE as u32, MAX_BLOCK_SIGOPS as u32, 0, 0); assert!(iter.collect::>().is_empty()); } #[test] fn test_fitting_transactions_iterator_max_block_size_reached() { } + + #[test] + fn test_fitting_transactions_iterator_ignored_parent() { + // TODO + } + + #[test] + fn test_fitting_transactions_iterator_locked_transaction() { + // TODO + } } diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 27cd1bde..9226a690 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -17,5 +17,6 @@ mod memory_pool; pub use block_assembler::{BlockAssembler, BlockTemplate}; pub use cpu_miner::find_solution; -pub use memory_pool::{MemoryPool, Information as MemoryPoolInformation, OrderingStrategy as MemoryPoolOrderingStrategy}; +pub use memory_pool::{MemoryPool, HashedOutPoint, DoubleSpendCheckResult, Information as MemoryPoolInformation, + OrderingStrategy as MemoryPoolOrderingStrategy}; pub use fee::{transaction_fee, transaction_fee_rate}; diff --git a/miner/src/memory_pool.rs b/miner/src/memory_pool.rs index 72a20d0f..faccaa49 100644 --- a/miner/src/memory_pool.rs +++ b/miner/src/memory_pool.rs @@ -141,11 +141,21 @@ struct ByPackageScoreOrderedEntry { } #[derive(Debug, PartialEq, Eq, Clone)] -struct HashedOutPoint { - /// Transasction output point +pub struct HashedOutPoint { + /// Transaction output point out_point: OutPoint, } +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum DoubleSpendCheckResult { + /// No double spend + NoDoubleSpend, + /// Input {self.1, self.2} of new transaction is already spent in previous final memory-pool transaction {self.0} + DoubleSpend(H256, H256, u32), + /// Some inputs of new transaction are already spent by locked memory-pool transactions + LockedDoubleSpend(HashSet), +} + impl From for HashedOutPoint { fn from(out_point: OutPoint) -> Self { HashedOutPoint { @@ -400,6 +410,41 @@ impl Storage { }) } + pub fn check_double_spend(&self, transaction: &Transaction) -> DoubleSpendCheckResult { + let mut locked_spends: HashSet = HashSet::new(); + for input in &transaction.inputs { + let mut queue: VecDeque = VecDeque::new(); + queue.push_back(input.previous_output.clone()); + + while let Some(prevout) = queue.pop_front() { + // if the same output is already spent with another transaction + if let Some(entry_hash) = self.by_previous_output.get(&prevout.clone().into()).cloned() { + let entry = self.by_hash.get(&entry_hash).expect("checked that it exists line above; qed"); + // check if this is final transaction. 
If so, that's an double-spend error + if entry.transaction.is_final() { + return DoubleSpendCheckResult::DoubleSpend(entry_hash, prevout.hash, prevout.index); + } + // this prevout is holded by locked transaction + locked_spends.insert(prevout.into()); + // transaction is not final => new transaction possibly replace it in memory pool + // we should also 'virtually exclude' all descendant transactions from emory pool + let locked_outputs: Vec<_> = entry.transaction.outputs.iter().enumerate().map(|(idx, _)| OutPoint { + hash: entry_hash.clone(), + index: idx as u32, + }).collect(); + locked_spends.extend(locked_outputs.iter().cloned().map(Into::into)); + queue.extend(locked_outputs); + } + } + } + + if locked_spends.is_empty() { + DoubleSpendCheckResult::NoDoubleSpend + } else { + DoubleSpendCheckResult::LockedDoubleSpend(locked_spends) + } + } + pub fn remove_by_prevout(&mut self, prevout: &OutPoint) -> Option> { let mut queue: VecDeque = VecDeque::new(); let mut removed: Vec = Vec::new(); @@ -407,7 +452,7 @@ impl Storage { while let Some(prevout) = queue.pop_front() { if let Some(entry_hash) = self.by_previous_output.get(&prevout.clone().into()).cloned() { - let entry = self.remove_by_hash(&entry_hash).expect("checket that it exists line above; qed"); + let entry = self.remove_by_hash(&entry_hash).expect("checked that it exists line above; qed"); queue.extend(entry.transaction.outputs.iter().enumerate().map(|(idx, _)| OutPoint { hash: entry_hash.clone(), index: idx as u32, @@ -604,6 +649,11 @@ impl MemoryPool { self.storage.remove_by_hash(h).map(|entry| entry.transaction) } + /// Checks double spend result + pub fn check_double_spend(&self, transaction: &Transaction) -> DoubleSpendCheckResult { + self.storage.check_double_spend(transaction) + } + /// Removes transaction (and all its descendants) which has spent given output pub fn remove_by_prevout(&mut self, prevout: &OutPoint) -> Option> { self.storage.remove_by_prevout(prevout) @@ -795,7 +845,7 @@ impl<'a> Iterator for MemoryPoolIterator<'a> { mod tests { use chain::{Transaction, OutPoint}; use heapsize::HeapSizeOf; - use super::{MemoryPool, OrderingStrategy}; + use super::{MemoryPool, OrderingStrategy, DoubleSpendCheckResult}; use test_data::{ChainBuilder, TransactionBuilder}; fn to_memory_pool(chain: &mut ChainBuilder) -> MemoryPool { @@ -1222,4 +1272,38 @@ mod tests { assert_eq!(pool.remove_by_prevout(&OutPoint { hash: chain.hash(0), index: 0 }), Some(vec![chain.at(1), chain.at(2)])); assert_eq!(pool.information().transactions_count, 2); } + + #[test] + fn test_memory_pool_check_double_spend() { + let chain = &mut ChainBuilder::new(); + + TransactionBuilder::with_output(10).add_output(10).add_output(10).store(chain) // transaction0 + .reset().set_input(&chain.at(0), 0).add_output(20).lock().store(chain) // locked: transaction0 -> transaction1 + .reset().set_input(&chain.at(0), 0).add_output(30).store(chain) // good replacement: transaction0 -> transaction2 + .reset().set_input(&chain.at(0), 1).add_output(40).store(chain) // not-locked: transaction0 -> transaction3 + .reset().set_input(&chain.at(0), 1).add_output(50).store(chain) // bad replacement: transaction0 -> transaction4 + .reset().set_input(&chain.at(0), 2).add_output(60).store(chain); // no double spending: transaction0 -> transaction5 + let mut pool = MemoryPool::new(); + pool.insert_verified(chain.at(1)); + pool.insert_verified(chain.at(3)); + // check locked double spends + match pool.check_double_spend(&chain.at(2)) { + DoubleSpendCheckResult::LockedDoubleSpend(hs) => 
assert!(hs.contains(&chain.at(1).inputs[0].previous_output.clone().into())), + _ => panic!("unexpected"), + } + // check unlocked double spends + match pool.check_double_spend(&chain.at(4)) { + DoubleSpendCheckResult::DoubleSpend(hash1, hash2, index) => { + assert_eq!(hash1, chain.at(3).hash()); + assert_eq!(hash2, chain.at(0).hash()); + assert_eq!(index, 1); + }, + _ => panic!("unexpected"), + } + // check no-double spends + match pool.check_double_spend(&chain.at(5)) { + DoubleSpendCheckResult::NoDoubleSpend => (), + _ => panic!("unexpected"), + } + } } diff --git a/sync/src/local_node.rs b/sync/src/local_node.rs index ac7f81fb..5ec801a5 100644 --- a/sync/src/local_node.rs +++ b/sync/src/local_node.rs @@ -634,12 +634,12 @@ mod tests { let transaction_hash = transaction.hash(); let result = local_node.accept_transaction(transaction); - assert_eq!(result, Ok(transaction_hash)); + assert_eq!(result, Ok(transaction_hash.clone())); assert_eq!(executor.lock().take_tasks(), vec![Task::SendInventory(peer_index1, vec![InventoryVector { inv_type: InventoryType::MessageTx, - hash: "0791efccd035c5fe501023ff888106eba5eff533965de4a6e06400f623bcac34".into(), + hash: transaction_hash, }] )] ); diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index 1a1299f6..28a1ea4d 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -677,6 +677,13 @@ impl Chain { /// Insert transaction to memory pool pub fn insert_verified_transaction(&mut self, transaction: Transaction) { + // we have verified transaction, but possibly this transaction replaces + // existing transaction from memory pool + // => remove previous transactions before + for input in &transaction.inputs { + self.memory_pool.remove_by_prevout(&input.previous_output); + } + // now insert transaction itself self.memory_pool.insert_verified(transaction); } @@ -1269,4 +1276,20 @@ mod tests { headers[4].clone(), ]), HeadersIntersection::DeadEnd(0)); } + + #[test] + fn update_memory_pool_transaction() { + use test_data::{ChainBuilder, TransactionBuilder}; + + let data_chain = &mut ChainBuilder::new(); + TransactionBuilder::with_output(10).add_output(10).add_output(10).store(data_chain) // transaction0 + .reset().set_input(&data_chain.at(0), 0).add_output(20).lock().store(data_chain) // transaction0 -> transaction1 + .reset().set_input(&data_chain.at(0), 0).add_output(30).store(data_chain); // transaction0 -> transaction2 + + let mut chain = Chain::new(Arc::new(db::TestStorage::with_genesis_block())); + chain.insert_verified_transaction(data_chain.at(1)); + assert_eq!(chain.information().transactions.transactions_count, 1); + chain.insert_verified_transaction(data_chain.at(2)); + assert_eq!(chain.information().transactions.transactions_count, 1); // tx was replaces + } } diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs index b6c7204b..c1c9492a 100644 --- a/sync/src/synchronization_verifier.rs +++ b/sync/src/synchronization_verifier.rs @@ -1,12 +1,13 @@ use std::thread; -use std::collections::VecDeque; +use std::collections::{VecDeque, HashSet}; use std::sync::Arc; use std::sync::mpsc::{channel, Sender, Receiver}; use chain::{Transaction, OutPoint, TransactionOutput}; use network::Magic; +use miner::{HashedOutPoint, DoubleSpendCheckResult}; use primitives::hash::H256; use synchronization_chain::ChainRef; -use verification::{BackwardsCompatibleChainVerifier as ChainVerifier, Verify as VerificationVerify, Chain}; +use verification::{self, 
BackwardsCompatibleChainVerifier as ChainVerifier, Verify as VerificationVerify, Chain}; use db::{SharedStore, IndexedBlock, PreviousTransactionOutputProvider, TransactionOutputObserver}; use time::get_time; @@ -57,12 +58,23 @@ pub struct AsyncVerifier { verification_worker_thread: Option>, } +/// Transaction output observer, which looks into storage && into memory pool struct ChainMemoryPoolTransactionOutputProvider { + /// Chain reference chain: ChainRef, + /// Previous outputs, for which we should return 'Not spent' value. + /// These are used when new version of transaction is received. + locked_outputs: Option>, } -#[derive(Default)] -struct EmptyTransactionOutputProvider { +impl VerificationTask { + /// Returns transaction reference if it is transaction verification task + pub fn transaction(&self) -> Option<&Transaction> { + match self { + &VerificationTask::VerifyTransaction(_, ref transaction) => Some(&transaction), + _ => None, + } + } } impl AsyncVerifier { @@ -86,8 +98,22 @@ impl AsyncVerifier { match task { VerificationTask::Stop => break, _ => { - let prevout_provider = ChainMemoryPoolTransactionOutputProvider::with_chain(chain.clone()); - execute_verification_task(&sink, &prevout_provider, &verifier, task) + let prevout_provider = if let Some(ref transaction) = task.transaction() { + match ChainMemoryPoolTransactionOutputProvider::for_transaction(chain.clone(), transaction) { + Err(e) => { + sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash()); + return; + }, + Ok(prevout_provider) => Some(prevout_provider), + } + } else { + None + }; + let prevout_provider = match prevout_provider { + Some(ref prevout_provider) => Some(prevout_provider), + None => None, + }; + execute_verification_task(&sink, prevout_provider, &verifier, task) }, } } @@ -137,23 +163,23 @@ impl SyncVerifier where T: VerificationSink { verifier: verifier, sink: sink, } - } } + } impl Verifier for SyncVerifier where T: VerificationSink { /// Verify block fn verify_block(&self, block: IndexedBlock) { - execute_verification_task(&self.sink, &EmptyTransactionOutputProvider::default(), &self.verifier, VerificationTask::VerifyBlock(block)) + execute_verification_task::(&self.sink, None, &self.verifier, VerificationTask::VerifyBlock(block)) } /// Verify transaction - fn verify_transaction(&self, height: u32, transaction: Transaction) { - execute_verification_task(&self.sink, &EmptyTransactionOutputProvider::default(), &self.verifier, VerificationTask::VerifyTransaction(height, transaction)) + fn verify_transaction(&self, _height: u32, _transaction: Transaction) { + unimplemented!() // sync verifier is currently only used for blocks verification } } /// Execute single verification task -fn execute_verification_task(sink: &Arc, tx_output_provider: &U, verifier: &ChainVerifier, task: VerificationTask) { +fn execute_verification_task(sink: &Arc, tx_output_provider: Option<&U>, verifier: &ChainVerifier, task: VerificationTask) { let mut tasks_queue: VecDeque = VecDeque::new(); tasks_queue.push_back(task); @@ -178,6 +204,7 @@ fn execute_verification_task { let time: u32 = get_time().sec as u32; + let tx_output_provider = tx_output_provider.expect("must be provided for transaction checks"); match verifier.verify_mempool_transaction(tx_output_provider, height, time, &transaction) { Ok(_) => sink.on_transaction_verification_success(transaction), Err(e) => sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash()), @@ -189,13 +216,36 @@ fn execute_verification_task Self { - 
ChainMemoryPoolTransactionOutputProvider { - chain: chain, + pub fn for_transaction(chain: ChainRef, transaction: &Transaction) -> Result { + // we have to check if there are another in-mempool transactions which spent same outputs here + match chain.read().memory_pool().check_double_spend(transaction) { + DoubleSpendCheckResult::DoubleSpend(_, hash, index) => Err(verification::TransactionError::UsingSpentOutput(hash, index)), + DoubleSpendCheckResult::NoDoubleSpend => Ok(ChainMemoryPoolTransactionOutputProvider { + chain: chain.clone(), + locked_outputs: None, + }), + DoubleSpendCheckResult::LockedDoubleSpend(locked_outputs) => Ok(ChainMemoryPoolTransactionOutputProvider { + chain: chain.clone(), + locked_outputs: Some(locked_outputs), + }), } } } +impl TransactionOutputObserver for ChainMemoryPoolTransactionOutputProvider { + fn is_spent(&self, prevout: &OutPoint) -> Option { + // check if this output is 'locked' by mempool transaction + if let Some(ref locked_outputs) = self.locked_outputs { + if locked_outputs.contains(&prevout.clone().into()) { + return Some(false); + } + } + + // check spending in storage + self.chain.read().storage().transaction_meta(&prevout.hash).and_then(|tm| tm.is_spent(prevout.index as usize)) + } +} + impl PreviousTransactionOutputProvider for ChainMemoryPoolTransactionOutputProvider { fn previous_transaction_output(&self, prevout: &OutPoint) -> Option { let chain = self.chain.read(); @@ -204,39 +254,16 @@ impl PreviousTransactionOutputProvider for ChainMemoryPoolTransactionOutputProvi } } -impl TransactionOutputObserver for ChainMemoryPoolTransactionOutputProvider { - fn is_spent(&self, prevout: &OutPoint) -> Option { - let chain = self.chain.read(); - if chain.memory_pool().is_spent(prevout) { - return Some(true); - } - chain.storage().transaction_meta(&prevout.hash).and_then(|tm| tm.is_spent(prevout.index as usize)) - } -} - -impl PreviousTransactionOutputProvider for EmptyTransactionOutputProvider { - fn previous_transaction_output(&self, _prevout: &OutPoint) -> Option { - None - } -} - -impl TransactionOutputObserver for EmptyTransactionOutputProvider { - fn is_spent(&self, _prevout: &OutPoint) -> Option { - None - } -} - #[cfg(test)] pub mod tests { use std::sync::Arc; use std::collections::HashMap; - use parking_lot::RwLock; use chain::Transaction; - use synchronization_chain::{Chain, ChainRef}; + use synchronization_chain::Chain; use synchronization_client::CoreVerificationSink; use synchronization_executor::tests::DummyTaskExecutor; use primitives::hash::H256; - use super::{Verifier, BlockVerificationSink, TransactionVerificationSink, ChainMemoryPoolTransactionOutputProvider}; + use super::{Verifier, BlockVerificationSink, TransactionVerificationSink}; use db::{self, IndexedBlock}; use test_data; @@ -283,16 +310,13 @@ pub mod tests { #[test] fn when_transaction_spends_output_twice() { - use db::TransactionOutputObserver; let tx1: Transaction = test_data::TransactionBuilder::with_default_input(0).into(); let tx2: Transaction = test_data::TransactionBuilder::with_default_input(1).into(); let out1 = tx1.inputs[0].previous_output.clone(); let out2 = tx2.inputs[0].previous_output.clone(); let mut chain = Chain::new(Arc::new(db::TestStorage::with_genesis_block())); chain.memory_pool_mut().insert_verified(tx1); - let chain = ChainRef::new(RwLock::new(chain)); - let provider = ChainMemoryPoolTransactionOutputProvider::with_chain(chain); - assert!(provider.is_spent(&out1).unwrap_or_default()); - assert!(!provider.is_spent(&out2).unwrap_or_default()); + 
assert!(chain.memory_pool().is_spent(&out1)); + assert!(!chain.memory_pool().is_spent(&out2)); } } diff --git a/test-data/src/chain_builder.rs b/test-data/src/chain_builder.rs index 5a37f9b1..62d23f83 100644 --- a/test-data/src/chain_builder.rs +++ b/test-data/src/chain_builder.rs @@ -97,7 +97,7 @@ impl TransactionBuilder { index: output_index, }, script_sig: Bytes::new_with_len(0), - sequence: 0, + sequence: 0xffffffff, }); self } @@ -113,11 +113,17 @@ impl TransactionBuilder { index: output_index, }, script_sig: Bytes::new_with_len(0), - sequence: 0, + sequence: 0xffffffff, }]; self } + pub fn lock(mut self) -> Self { + self.transaction.inputs[0].sequence = 0; + self.transaction.lock_time = 500000; + self + } + pub fn store(self, chain: &mut ChainBuilder) -> Self { chain.transactions.push(self.transaction.clone()); self From 768912c759680bc201d2e9a14b1f8cc17d1fc057 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 14 Dec 2016 00:23:23 +0300 Subject: [PATCH 2/4] fix after merge --- sync/src/synchronization_verifier.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs index 5c71eae9..e35aba2a 100644 --- a/sync/src/synchronization_verifier.rs +++ b/sync/src/synchronization_verifier.rs @@ -8,7 +8,7 @@ use miner::{HashedOutPoint, DoubleSpendCheckResult}; use primitives::hash::H256; use synchronization_chain::ChainRef; use verification::{self, BackwardsCompatibleChainVerifier as ChainVerifier, Verify as VerificationVerify, Chain}; -use db::{SharedStore, IndexedBlock, PreviousTransactionOutputProvider, TransactionOutputObserver}; +use db::{SharedStore, PreviousTransactionOutputProvider, TransactionOutputObserver}; use time::get_time; /// Block verification events sink @@ -263,8 +263,9 @@ pub mod tests { use std::sync::Arc; use std::collections::HashMap; use synchronization_client::CoreVerificationSink; use synchronization_executor::tests::DummyTaskExecutor; use primitives::hash::H256; + use chain::IndexedBlock; use super::{Verifier, BlockVerificationSink, TransactionVerificationSink}; - use db::{self, IndexedBlock}; + use db; use test_data; #[derive(Default)] From a9dcc0d6f7085b074b4df23d45a0fc30d2254dda Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 14 Dec 2016 02:08:07 +0300 Subject: [PATCH 3/4] fix bad pattern --- sync/src/synchronization_verifier.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs index e35aba2a..26e53d5f 100644 --- a/sync/src/synchronization_verifier.rs +++ b/sync/src/synchronization_verifier.rs @@ -109,11 +109,7 @@ impl AsyncVerifier { } else { None }; - let prevout_provider = match prevout_provider { - Some(ref prevout_provider) => Some(prevout_provider), - None => None, - }; - execute_verification_task(&sink, prevout_provider, &verifier, task) + execute_verification_task(&sink, prevout_provider.as_ref(), &verifier, task) }, } } From be53bbdc3e14582718b1b1a446a973a7ab3a872a Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 14 Dec 2016 14:22:02 +0300 Subject: [PATCH 4/4] added some more tests --- miner/src/lib.rs | 4 +- miner/src/memory_pool.rs | 162 +++++++++++++++++++-------- sync/src/synchronization_client.rs | 5 + sync/src/synchronization_verifier.rs | 82 +++++++++++--- 4 files changed, 190 insertions(+), 63 deletions(-) diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 9226a690..c2f38adc 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -17,6 +17,6 @@ mod memory_pool; pub use
block_assembler::{BlockAssembler, BlockTemplate}; pub use cpu_miner::find_solution; -pub use memory_pool::{MemoryPool, HashedOutPoint, DoubleSpendCheckResult, Information as MemoryPoolInformation, - OrderingStrategy as MemoryPoolOrderingStrategy}; +pub use memory_pool::{MemoryPool, HashedOutPoint, Information as MemoryPoolInformation, + OrderingStrategy as MemoryPoolOrderingStrategy, DoubleSpendCheckResult, NonFinalDoubleSpendSet}; pub use fee::{transaction_fee, transaction_fee_rate}; diff --git a/miner/src/memory_pool.rs b/miner/src/memory_pool.rs index faccaa49..fda41156 100644 --- a/miner/src/memory_pool.rs +++ b/miner/src/memory_pool.rs @@ -146,14 +146,26 @@ pub struct HashedOutPoint { out_point: OutPoint, } -#[derive(Debug, PartialEq, Eq, Clone)] +/// Result of checking double spend with +#[derive(Debug, PartialEq)] pub enum DoubleSpendCheckResult { /// No double spend NoDoubleSpend, /// Input {self.1, self.2} of new transaction is already spent in previous final memory-pool transaction {self.0} DoubleSpend(H256, H256, u32), - /// Some inputs of new transaction are already spent by locked memory-pool transactions - LockedDoubleSpend(HashSet), + /// Some inputs of new transaction are already spent by non-final memory-pool transactions + NonFinalDoubleSpend(NonFinalDoubleSpendSet), +} + +/// Set of transaction outputs, which can be replaced if newer transaction +/// replaces non-final transaction in memory pool +#[derive(Debug, PartialEq)] +pub struct NonFinalDoubleSpendSet { + /// Double-spend outputs (outputs of newer transaction, which are also spent by nonfinal transactions of mempool) + pub double_spends: HashSet, + /// Outputs which also will be removed from memory pool in case of newer transaction insertion + /// (i.e. outputs of nonfinal transactions && their descendants) + pub dependent_spends: HashSet, } impl From for HashedOutPoint { @@ -411,37 +423,45 @@ impl Storage { } pub fn check_double_spend(&self, transaction: &Transaction) -> DoubleSpendCheckResult { - let mut locked_spends: HashSet = HashSet::new(); - for input in &transaction.inputs { - let mut queue: VecDeque = VecDeque::new(); - queue.push_back(input.previous_output.clone()); + let mut double_spends: HashSet = HashSet::new(); + let mut dependent_spends: HashSet = HashSet::new(); - while let Some(prevout) = queue.pop_front() { - // if the same output is already spent with another transaction - if let Some(entry_hash) = self.by_previous_output.get(&prevout.clone().into()).cloned() { - let entry = self.by_hash.get(&entry_hash).expect("checked that it exists line above; qed"); - // check if this is final transaction. If so, that's an double-spend error - if entry.transaction.is_final() { - return DoubleSpendCheckResult::DoubleSpend(entry_hash, prevout.hash, prevout.index); + for input in &transaction.inputs { + // find transaction that spends the same output + let prevout: HashedOutPoint = input.previous_output.clone().into(); + if let Some(entry_hash) = self.by_previous_output.get(&prevout).cloned() { + // check if this is final transaction. 
If so, that's a potential double-spend error + let entry = self.by_hash.get(&entry_hash).expect("checked that it exists line above; qed"); + if entry.transaction.is_final() { + return DoubleSpendCheckResult::DoubleSpend(entry_hash, prevout.out_point.hash, prevout.out_point.index); + } + // else remember this double spend + double_spends.insert(prevout.clone()); + // and 'virtually' remove entry && all descendants from mempool + let mut queue: VecDeque<HashedOutPoint> = VecDeque::new(); + queue.push_back(prevout); + while let Some(dependent_prevout) = queue.pop_front() { + // if the same output is already spent with another in-pool transaction + if let Some(dependent_entry_hash) = self.by_previous_output.get(&dependent_prevout).cloned() { + let dependent_entry = self.by_hash.get(&dependent_entry_hash).expect("checked that it exists line above; qed"); + let dependent_outputs: Vec<_> = dependent_entry.transaction.outputs.iter().enumerate().map(|(idx, _)| OutPoint { + hash: dependent_entry_hash.clone(), + index: idx as u32, + }.into()).collect(); + dependent_spends.extend(dependent_outputs.clone()); + queue.extend(dependent_outputs); + } + } + } + } + if double_spends.is_empty() { DoubleSpendCheckResult::NoDoubleSpend } else { DoubleSpendCheckResult::NonFinalDoubleSpend(NonFinalDoubleSpendSet { + double_spends: double_spends, + dependent_spends: dependent_spends, + }) } } @@ -649,7 +669,7 @@ impl MemoryPool { self.storage.remove_by_hash(h).map(|entry| entry.transaction) } - /// Checks double spend result + /// Checks if `transaction` spends some outputs, already spent by in-pool transactions.
pub fn check_double_spend(&self, transaction: &Transaction) -> DoubleSpendCheckResult { self.storage.check_double_spend(transaction) } @@ -1277,33 +1297,83 @@ mod tests { fn test_memory_pool_check_double_spend() { let chain = &mut ChainBuilder::new(); - TransactionBuilder::with_output(10).add_output(10).add_output(10).store(chain) // transaction0 - .reset().set_input(&chain.at(0), 0).add_output(20).lock().store(chain) // locked: transaction0 -> transaction1 - .reset().set_input(&chain.at(0), 0).add_output(30).store(chain) // good replacement: transaction0 -> transaction2 - .reset().set_input(&chain.at(0), 1).add_output(40).store(chain) // not-locked: transaction0 -> transaction3 - .reset().set_input(&chain.at(0), 1).add_output(50).store(chain) // bad replacement: transaction0 -> transaction4 - .reset().set_input(&chain.at(0), 2).add_output(60).store(chain); // no double spending: transaction0 -> transaction5 + TransactionBuilder::with_output(10).add_output(10).add_output(10).store(chain) // t0 + .reset().set_input(&chain.at(0), 0).add_output(20).lock().store(chain) // nonfinal: t0[0] -> t1 + .reset().set_input(&chain.at(1), 0).add_output(30).store(chain) // dependent: t0[0] -> t1[0] -> t2 + .reset().set_input(&chain.at(0), 0).add_output(40).store(chain) // good replacement: t0[0] -> t3 + .reset().set_input(&chain.at(0), 1).add_output(50).store(chain) // final: t0[1] -> t4 + .reset().set_input(&chain.at(0), 1).add_output(60).store(chain) // bad replacement: t0[1] -> t5 + .reset().set_input(&chain.at(0), 2).add_output(70).store(chain); // no double spend: t0[2] -> t6 + let mut pool = MemoryPool::new(); pool.insert_verified(chain.at(1)); - pool.insert_verified(chain.at(3)); - // check locked double spends - match pool.check_double_spend(&chain.at(2)) { - DoubleSpendCheckResult::LockedDoubleSpend(hs) => assert!(hs.contains(&chain.at(1).inputs[0].previous_output.clone().into())), - _ => panic!("unexpected"), - } - // check unlocked double spends - match pool.check_double_spend(&chain.at(4)) { - DoubleSpendCheckResult::DoubleSpend(hash1, hash2, index) => { - assert_eq!(hash1, chain.at(3).hash()); - assert_eq!(hash2, chain.at(0).hash()); - assert_eq!(index, 1); + pool.insert_verified(chain.at(2)); + pool.insert_verified(chain.at(4)); + // when output is spent by nonfinal transaction + match pool.check_double_spend(&chain.at(3)) { + DoubleSpendCheckResult::NonFinalDoubleSpend(set) => { + assert_eq!(set.double_spends.len(), 1); + assert!(set.double_spends.contains(&chain.at(1).inputs[0].previous_output.clone().into())); + assert_eq!(set.dependent_spends.len(), 2); + assert!(set.dependent_spends.contains(&OutPoint { + hash: chain.at(1).hash(), + index: 0, + }.into())); + assert!(set.dependent_spends.contains(&OutPoint { + hash: chain.at(2).hash(), + index: 0, + }.into())); }, _ => panic!("unexpected"), } - // check no-double spends + // when output is spent by final transaction match pool.check_double_spend(&chain.at(5)) { + DoubleSpendCheckResult::DoubleSpend(inpool_hash, prev_hash, prev_index) => { + assert_eq!(inpool_hash, chain.at(4).hash()); + assert_eq!(prev_hash, chain.at(0).hash()); + assert_eq!(prev_index, 1); + }, + _ => panic!("unexpected"), + } + // when output is not spent at all + match pool.check_double_spend(&chain.at(6)) { DoubleSpendCheckResult::NoDoubleSpend => (), _ => panic!("unexpected"), } } + + #[test] + fn test_memory_pool_check_double_spend_multiple_dependent_outputs() { + let chain = &mut ChainBuilder::new(); + + TransactionBuilder::with_output(100).store(chain) // t0 + 
.reset().set_input(&chain.at(0), 0).add_output(20).add_output(30).add_output(50).lock().store(chain) // nonfinal: t0[0] -> t1 + .reset().set_input(&chain.at(0), 0).add_output(40).store(chain); // good replacement: t0[0] -> t2 + + let mut pool = MemoryPool::new(); + pool.insert_verified(chain.at(1)); + + // when output is spent by nonfinal transaction + match pool.check_double_spend(&chain.at(2)) { + DoubleSpendCheckResult::NonFinalDoubleSpend(set) => { + assert_eq!(set.double_spends.len(), 1); + assert!(set.double_spends.contains(&chain.at(1).inputs[0].previous_output.clone().into())); + assert_eq!(set.dependent_spends.len(), 3); + assert!(set.dependent_spends.contains(&OutPoint { + hash: chain.at(1).hash(), + index: 0, + }.into())); + assert!(set.dependent_spends.contains(&OutPoint { + hash: chain.at(1).hash(), + index: 1, + }.into())); + assert!(set.dependent_spends.contains(&OutPoint { + hash: chain.at(1).hash(), + index: 2, + }.into())); + }, + _ => panic!("unexpected"), + } + + } } diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index 2dfd8db8..a2e03b16 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -2935,4 +2935,9 @@ pub mod tests { // should not panic sync.on_peer_transaction(1, test_data::TransactionBuilder::with_default_input(0).into()); } + + #[test] + fn when_transaction_replaces_locked_transaction() { + // TODO + } } diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs index 26e53d5f..553f83d5 100644 --- a/sync/src/synchronization_verifier.rs +++ b/sync/src/synchronization_verifier.rs @@ -1,10 +1,10 @@ use std::thread; -use std::collections::{VecDeque, HashSet}; +use std::collections::VecDeque; use std::sync::Arc; use std::sync::mpsc::{channel, Sender, Receiver}; use chain::{Transaction, OutPoint, TransactionOutput, IndexedBlock}; use network::Magic; -use miner::{HashedOutPoint, DoubleSpendCheckResult}; +use miner::{DoubleSpendCheckResult, NonFinalDoubleSpendSet}; use primitives::hash::H256; use synchronization_chain::ChainRef; use verification::{self, BackwardsCompatibleChainVerifier as ChainVerifier, Verify as VerificationVerify, Chain}; @@ -64,7 +64,7 @@ struct ChainMemoryPoolTransactionOutputProvider { chain: ChainRef, /// Previous outputs, for which we should return 'Not spent' value. /// These are used when new version of transaction is received. 
- locked_outputs: Option<HashSet<HashedOutPoint>>, + nonfinal_spends: Option<NonFinalDoubleSpendSet>, } impl VerificationTask { @@ -214,15 +214,20 @@ fn execute_verification_task Result { // we have to check if there are another in-mempool transactions which spent same outputs here - match chain.read().memory_pool().check_double_spend(transaction) { + let check_result = chain.read().memory_pool().check_double_spend(transaction); + ChainMemoryPoolTransactionOutputProvider::for_double_spend_check_result(chain, check_result) + } + + pub fn for_double_spend_check_result(chain: ChainRef, check_result: DoubleSpendCheckResult) -> Result<Self, verification::TransactionError> { + match check_result { DoubleSpendCheckResult::DoubleSpend(_, hash, index) => Err(verification::TransactionError::UsingSpentOutput(hash, index)), DoubleSpendCheckResult::NoDoubleSpend => Ok(ChainMemoryPoolTransactionOutputProvider { chain: chain.clone(), - locked_outputs: None, + nonfinal_spends: None, }), - DoubleSpendCheckResult::LockedDoubleSpend(locked_outputs) => Ok(ChainMemoryPoolTransactionOutputProvider { + DoubleSpendCheckResult::NonFinalDoubleSpend(nonfinal_spends) => Ok(ChainMemoryPoolTransactionOutputProvider { chain: chain.clone(), - locked_outputs: Some(locked_outputs), + nonfinal_spends: Some(nonfinal_spends), }), } } @@ -231,19 +236,32 @@ impl ChainMemoryPoolTransactionOutputProvider { impl TransactionOutputObserver for ChainMemoryPoolTransactionOutputProvider { fn is_spent(&self, prevout: &OutPoint) -> Option<bool> { // check if this output is 'locked' by mempool transaction - if let Some(ref locked_outputs) = self.locked_outputs { - if locked_outputs.contains(&prevout.clone().into()) { + if let Some(ref nonfinal_spends) = self.nonfinal_spends { + if nonfinal_spends.double_spends.contains(&prevout.clone().into()) { return Some(false); } } - // check spending in storage - self.chain.read().storage().transaction_meta(&prevout.hash).and_then(|tm| tm.is_spent(prevout.index as usize)) + // we can omit memory_pool check here, because it has been completed in `for_transaction` method + // => just check spending in storage + self.chain.read().storage() + .transaction_meta(&prevout.hash) + .and_then(|tm| tm.is_spent(prevout.index as usize)) } } impl PreviousTransactionOutputProvider for ChainMemoryPoolTransactionOutputProvider { fn previous_transaction_output(&self, prevout: &OutPoint) -> Option<TransactionOutput> { + // check if that is an output of some transaction which is virtually removed from memory pool + if let Some(ref nonfinal_spends) = self.nonfinal_spends { + if nonfinal_spends.dependent_spends.contains(&prevout.clone().into()) { + // transaction is trying to replace some nonfinal transaction + // + it also depends on this transaction + // => this is definitely an error + return None; + } + } + let chain = self.chain.read(); chain.memory_pool().previous_transaction_output(prevout) .or_else(|| chain.storage().as_previous_transaction_output_provider().previous_transaction_output(prevout)) @@ -254,15 +272,16 @@ impl PreviousTransactionOutputProvider for ChainMemoryPoolTransactionOutputProvi pub mod tests { use std::sync::Arc; use std::collections::HashMap; - use chain::Transaction; - use synchronization_chain::Chain; + use chain::{Transaction, OutPoint}; + use synchronization_chain::{Chain, ChainRef}; use synchronization_client::CoreVerificationSink; use synchronization_executor::tests::DummyTaskExecutor; use primitives::hash::H256; use chain::IndexedBlock; use super::{Verifier, BlockVerificationSink, TransactionVerificationSink,
ChainMemoryPoolTransactionOutputProvider}; + use db::{self, TransactionOutputObserver, PreviousTransactionOutputProvider}; use test_data; + use parking_lot::RwLock; #[derive(Default)] pub struct DummyVerifier { @@ -316,4 +335,37 @@ pub mod tests { assert!(chain.memory_pool().is_spent(&out1)); assert!(!chain.memory_pool().is_spent(&out2)); } + + #[test] + fn when_transaction_depends_on_removed_nonfinal_transaction() { + let dchain = &mut test_data::ChainBuilder::new(); + + test_data::TransactionBuilder::with_output(10).store(dchain) // t0 + .reset().set_input(&dchain.at(0), 0).add_output(20).lock().store(dchain) // nonfinal: t0[0] -> t1 + .reset().set_input(&dchain.at(1), 0).add_output(30).store(dchain) // dependent: t0[0] -> t1[0] -> t2 + .reset().set_input(&dchain.at(0), 0).add_output(40).store(dchain); // good replacement: t0[0] -> t3 + + let mut chain = Chain::new(Arc::new(db::TestStorage::with_genesis_block())); + chain.memory_pool_mut().insert_verified(dchain.at(0)); + chain.memory_pool_mut().insert_verified(dchain.at(1)); + chain.memory_pool_mut().insert_verified(dchain.at(2)); + + // when inserting t3: + // check that is_spent(t0[0]) == Some(false) (as it is spent by nonfinal t1) + // check that is_spent(t1[0]) == None (as t1 is virtually removed) + // check that is_spent(t2[0]) == None (as t2 is virtually removed) + // check that previous_transaction_output(t0[0]) = Some(_) + // check that previous_transaction_output(t1[0]) = None (as t1 is virtually removed) + // check that previous_transaction_output(t2[0]) = None (as t2 is virtually removed) + // => + // if t3 is also depending on t1[0] || t2[0], it will be rejected by verification as missing inputs + let chain = ChainRef::new(RwLock::new(chain)); + let provider = ChainMemoryPoolTransactionOutputProvider::for_transaction(chain, &dchain.at(3)).unwrap(); + assert_eq!(provider.is_spent(&OutPoint { hash: dchain.at(0).hash(), index: 0, }), Some(false)); + assert_eq!(provider.is_spent(&OutPoint { hash: dchain.at(1).hash(), index: 0, }), None); + assert_eq!(provider.is_spent(&OutPoint { hash: dchain.at(2).hash(), index: 0, }), None); + assert_eq!(provider.previous_transaction_output(&OutPoint { hash: dchain.at(0).hash(), index: 0, }), Some(dchain.at(0).outputs[0].clone())); + assert_eq!(provider.previous_transaction_output(&OutPoint { hash: dchain.at(1).hash(), index: 0, }), None); + assert_eq!(provider.previous_transaction_output(&OutPoint { hash: dchain.at(2).hash(), index: 0, }), None); + } }
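
Taken together, the four patches let the memory pool replace non-final (lock_time'd) transactions: Storage::check_double_spend classifies a new transaction as a clean insert, a hard double spend of a final in-pool transaction, or a replacement of non-final transactions, and Chain::insert_verified_transaction then evicts the conflicting entries via remove_by_prevout before inserting the replacement. Below is a minimal sketch of that flow against the miner::MemoryPool API introduced here; the helper name try_replace_transaction is illustrative and not part of the series.

    use chain::Transaction;
    use miner::{MemoryPool, DoubleSpendCheckResult};

    // Hypothetical helper: decide whether an already-verified `tx` may enter the pool,
    // mirroring the check done by ChainMemoryPoolTransactionOutputProvider::for_transaction
    // and the eviction done by Chain::insert_verified_transaction in these patches.
    fn try_replace_transaction(pool: &mut MemoryPool, tx: Transaction) -> Result<(), String> {
        match pool.check_double_spend(&tx) {
            // a final in-pool transaction already spends one of our inputs => reject
            DoubleSpendCheckResult::DoubleSpend(in_pool_hash, prev_hash, prev_index) =>
                Err(format!("output {:?}:{} is already spent by in-pool transaction {:?}", prev_hash, prev_index, in_pool_hash)),
            // only non-final transactions conflict (or nothing conflicts at all):
            // evict them and their descendants, then insert the replacement
            DoubleSpendCheckResult::NonFinalDoubleSpend(_) | DoubleSpendCheckResult::NoDoubleSpend => {
                for input in &tx.inputs {
                    pool.remove_by_prevout(&input.previous_output);
                }
                pool.insert_verified(tx);
                Ok(())
            },
        }
    }

Whether a conflicting transaction counts as replaceable hinges on the Transaction::is_final helper added in the first patch: with a non-zero lock_time, a transaction is final only when every input already uses the maximum sequence number. A small illustrative test using the test-data builder and the lock() helper from this series follows (the test name is hypothetical; it assumes the builder's default sequence is 0xffffffff, as changed in chain_builder.rs).

    #[test]
    fn locked_transactions_are_not_final() {
        use chain::Transaction;
        use test_data::TransactionBuilder;

        // default input: sequence = 0xffffffff, lock_time = 0 => final
        let final_tx: Transaction = TransactionBuilder::with_default_input(0).into();
        assert!(final_tx.is_final());

        // lock() sets inputs[0].sequence = 0 and lock_time = 500000 => non-final,
        // so a later transaction spending the same outputs may replace it in the pool
        let locked_tx: Transaction = TransactionBuilder::with_default_input(0).lock().into();
        assert!(!locked_tx.is_final());
    }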