mempool double send is now checked during reorganizations

This commit is contained in:
Svyatoslav Nikolsky 2017-01-09 11:49:04 +03:00
parent 98855f8190
commit afc9c53df0
2 changed files with 51 additions and 67 deletions

View File

@ -5,12 +5,11 @@ use std::thread;
use parking_lot::Mutex;
use time::get_time;
use chain::{IndexedBlock, IndexedTransaction};
use db::{PreviousTransactionOutputProvider, TransactionOutputObserver};
use network::Magic;
use primitives::hash::H256;
use verification::{BackwardsCompatibleChainVerifier as ChainVerifier, Verify as VerificationVerify, Chain};
use types::{BlockHeight, StorageRef, MemoryPoolRef};
use utils::{MemoryPoolTransactionOutputProvider, StorageTransactionOutputProvider};
use utils::MemoryPoolTransactionOutputProvider;
/// Block verification events sink
pub trait BlockVerificationSink : Send + Sync + 'static {
@ -87,22 +86,50 @@ impl AsyncVerifier {
/// Thread procedure for handling verification tasks
fn verification_worker_proc<T: VerificationSink>(sink: Arc<T>, storage: StorageRef, memory_pool: MemoryPoolRef, verifier: Arc<ChainVerifier>, work_receiver: Receiver<VerificationTask>) {
while let Ok(task) = work_receiver.recv() {
match task {
VerificationTask::Stop => break,
_ => {
let prevout_provider = if let Some(ref transaction) = task.transaction() {
// block verification && insertion can lead to reorganization
// => transactions from decanonized blocks should be put back to the MemoryPool
// => they must be verified again
// => here's sub-tasks queue
let mut tasks_queue: VecDeque<VerificationTask> = VecDeque::new();
tasks_queue.push_back(task);
while let Some(task) = tasks_queue.pop_front() {
match task {
VerificationTask::VerifyBlock(block) => {
// verify block
match verifier.verify(&block) {
Ok(Chain::Main) | Ok(Chain::Side) => {
if let Some(tasks) = sink.on_block_verification_success(block) {
tasks_queue.extend(tasks);
}
},
Ok(Chain::Orphan) => {
// this can happen for B1 if B0 verification has failed && we have already scheduled verification of B0
sink.on_block_verification_error(&format!("orphaned block because parent block verification has failed"), block.hash())
},
Err(e) => {
sink.on_block_verification_error(&format!("{:?}", e), block.hash())
}
}
},
VerificationTask::VerifyTransaction(height, transaction) => {
// output provider must check previous outputs in both storage && memory pool
match MemoryPoolTransactionOutputProvider::for_transaction(storage.clone(), &memory_pool, &transaction.raw) {
Err(e) => {
sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash);
return;
continue; // with new verification sub-task
},
Ok(prevout_provider) => prevout_provider,
}
} else {
MemoryPoolTransactionOutputProvider::for_block(storage.clone())
};
execute_verification_task(&sink, &prevout_provider, &verifier, task)
},
Ok(tx_output_provider) => {
let time: u32 = get_time().sec as u32;
match verifier.verify_mempool_transaction(&tx_output_provider, height, time, &transaction.raw) {
Ok(_) => sink.on_transaction_verification_success(transaction.into()),
Err(e) => sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash),
}
},
};
},
VerificationTask::Stop => break,
}
}
}
}
@ -140,8 +167,6 @@ impl Verifier for AsyncVerifier {
/// Synchronous synchronization verifier
pub struct SyncVerifier<T: VerificationSink> {
/// Storage reference
storage: StorageRef,
/// Verifier
verifier: ChainVerifier,
/// Verification sink
@ -153,7 +178,6 @@ impl<T> SyncVerifier<T> where T: VerificationSink {
pub fn new(network: Magic, storage: StorageRef, sink: Arc<T>) -> Self {
let verifier = ChainVerifier::new(storage.clone(), network);
SyncVerifier {
storage: storage,
verifier: verifier,
sink: sink,
}
@ -163,8 +187,16 @@ impl<T> SyncVerifier<T> where T: VerificationSink {
impl<T> Verifier for SyncVerifier<T> where T: VerificationSink {
/// Verify block
fn verify_block(&self, block: IndexedBlock) {
let prevout_provider = StorageTransactionOutputProvider::with_storage(self.storage.clone());
execute_verification_task(&self.sink, &prevout_provider, &self.verifier, VerificationTask::VerifyBlock(block))
match self.verifier.verify(&block) {
Ok(Chain::Main) | Ok(Chain::Side) => {
// SyncVerifier is used for bulk blocks import only
// => there are no memory pool
// => we could ignore decanonized transactions
self.sink.on_block_verification_success(block);
},
Ok(Chain::Orphan) => self.sink.on_block_verification_error(&format!("orphaned block because parent block verification has failed"), block.hash()),
Err(e) => self.sink.on_block_verification_error(&format!("{:?}", e), block.hash()),
}
}
/// Verify transaction
@ -173,45 +205,6 @@ impl<T> Verifier for SyncVerifier<T> where T: VerificationSink {
}
}
/// Execute single verification task
fn execute_verification_task<T: VerificationSink, U: TransactionOutputObserver + PreviousTransactionOutputProvider>(sink: &Arc<T>, tx_output_provider: &U, verifier: &ChainVerifier, task: VerificationTask) {
let mut tasks_queue: VecDeque<VerificationTask> = VecDeque::new();
tasks_queue.push_back(task);
while let Some(task) = tasks_queue.pop_front() {
// TODO: for each task different output provider must be created
// reorg => txes from storage are reverifying + mempool txes are reverifying
// => some mempool can be invalid after reverifying
match task {
VerificationTask::VerifyBlock(block) => {
// verify block
match verifier.verify(&block) {
Ok(Chain::Main) | Ok(Chain::Side) => {
if let Some(tasks) = sink.on_block_verification_success(block) {
tasks_queue.extend(tasks);
}
},
Ok(Chain::Orphan) => {
// this can happen for B1 if B0 verification has failed && we have already scheduled verification of B0
sink.on_block_verification_error(&format!("orphaned block because parent block verification has failed"), &block.hash())
},
Err(e) => {
sink.on_block_verification_error(&format!("{:?}", e), &block.hash())
}
}
},
VerificationTask::VerifyTransaction(height, transaction) => {
let time: u32 = get_time().sec as u32;
match verifier.verify_mempool_transaction(tx_output_provider, height, time, &transaction.raw) {
Ok(_) => sink.on_transaction_verification_success(transaction.into()),
Err(e) => sink.on_transaction_verification_error(&format!("{:?}", e), &transaction.hash),
}
},
_ => unreachable!("must be checked by caller"),
}
}
}
#[cfg(test)]
pub mod tests {
use std::sync::Arc;

View File

@ -49,15 +49,6 @@ impl MemoryPoolTransactionOutputProvider {
}),
}
}
/// Create new provider for verifying given block
pub fn for_block(storage: StorageRef) -> Self {
MemoryPoolTransactionOutputProvider {
storage_provider: StorageTransactionOutputProvider::with_storage(storage),
mempool_inputs: HashMap::new(),
nonfinal_spends: None,
}
}
}
impl TransactionOutputObserver for MemoryPoolTransactionOutputProvider {