diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 38eee5a7..9f9ee979 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -74,11 +74,11 @@ pub fn create_sync_connection_factory(handle: &Handle, network: Magic, db: db::S use synchronization_client::{SynchronizationClient, SynchronizationClientCore, Config as SynchronizationConfig}; use synchronization_verifier::AsyncVerifier; - let sync_chain = Arc::new(RwLock::new(SyncChain::new(db.clone()))); + let sync_chain = Arc::new(RwLock::new(SyncChain::new(db))); let sync_executor = SyncExecutor::new(sync_chain.clone()); let sync_server = Arc::new(SynchronizationServer::new(sync_chain.clone(), sync_executor.clone())); let sync_client_core = SynchronizationClientCore::new(SynchronizationConfig::new(), handle, sync_executor.clone(), sync_chain.clone()); - let verifier = AsyncVerifier::new(network, db, sync_client_core.clone()); + let verifier = AsyncVerifier::new(network, sync_chain, sync_client_core.clone()); let sync_client = SynchronizationClient::new(sync_client_core, verifier); let sync_node = Arc::new(SyncNode::new(sync_server, sync_client, sync_executor)); SyncConnectionFactory::with_local_node(sync_node) diff --git a/sync/src/synchronization_chain.rs b/sync/src/synchronization_chain.rs index 648a5551..75941f04 100644 --- a/sync/src/synchronization_chain.rs +++ b/sync/src/synchronization_chain.rs @@ -183,6 +183,11 @@ impl Chain { self.storage.clone() } + /// Get memory pool + pub fn memory_pool(&self) -> &MemoryPool { + &self.memory_pool + } + /// Get number of blocks in given state pub fn length_of_blocks_state(&self, state: BlockState) -> u32 { match state { diff --git a/sync/src/synchronization_client.rs b/sync/src/synchronization_client.rs index f730b082..f962f6b4 100644 --- a/sync/src/synchronization_client.rs +++ b/sync/src/synchronization_client.rs @@ -285,6 +285,8 @@ pub struct SynchronizationClientCore { verifying_blocks_by_peer: HashMap, /// Verifying blocks futures verifying_blocks_futures: HashMap, Vec>)>, + /// Hashes of items we do not want to relay after verification is completed + do_not_relay: HashSet, } impl Config { @@ -394,8 +396,13 @@ impl Client for SynchronizationClient where T: TaskExecutor, U: Veri let transactions_to_verify = { self.core.lock().on_peer_transaction(peer_index, transaction) }; if let Some(mut transactions_to_verify) = transactions_to_verify { + // it is not actual height of block this transaction will be included to + // => it possibly will be invalid if included in later blocks + // => mined block can be rejected + // => we should verify blocks we mine + let next_block_height = self.best_block().number + 1; while let Some((_, tx)) = transactions_to_verify.pop_front() { - self.verifier.verify_transaction(tx); + self.verifier.verify_transaction(next_block_height, tx); } } } @@ -625,7 +632,7 @@ impl ClientCore for SynchronizationClientCore where T: TaskExecutor { // remember that peer has this transaction self.peers.on_transaction_received(peer_index, &transaction_hash); - self.process_peer_transaction(Some(peer_index), transaction_hash, transaction) + self.process_peer_transaction(Some(peer_index), transaction_hash, transaction, true) } /// Peer wants to set bloom filter for the connection @@ -778,6 +785,7 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor /// Process successful block verification fn on_block_verification_success(&mut self, block: IndexedBlock) -> Option> { let hash = block.hash(); + let needs_relay = !self.do_not_relay.remove(hash); // insert block to the storage match { let mut chain = self.chain.write(); @@ -806,15 +814,18 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor // Being able to opt-out of inv messages until the filter is set prevents a client being flooded with traffic in // the brief window of time between finishing version handshaking and setting the filter. if self.state.is_saturated() || self.state.is_nearly_saturated() { - self.relay_new_blocks(insert_result.canonized_blocks_hashes); + if needs_relay { + self.relay_new_blocks(insert_result.canonized_blocks_hashes); + } } // deal with block transactions let mut verification_tasks: Vec = Vec::with_capacity(insert_result.transactions_to_reverify.len()); + let next_block_height = self.best_block().number + 1; for (hash, tx) in insert_result.transactions_to_reverify { - // TODO: transactions from this blocks will be relayed. Do we need this? - if let Some(tx_orphans) = self.process_peer_transaction(None, hash, tx) { - let tx_tasks = tx_orphans.into_iter().map(|(_, tx)| VerificationTask::VerifyTransaction(tx)); + // do not relay resurrected transactions again + if let Some(tx_orphans) = self.process_peer_transaction(None, hash, tx, false) { + let tx_tasks = tx_orphans.into_iter().map(|(_, tx)| VerificationTask::VerifyTransaction(next_block_height, tx)); verification_tasks.extend(tx_tasks); }; } @@ -836,6 +847,8 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor fn on_block_verification_error(&mut self, err: &str, hash: &H256) { warn!(target: "sync", "Block {:?} verification failed with error {:?}", hash.to_reversed_str(), err); + self.do_not_relay.remove(hash); + { let mut chain = self.chain.write(); @@ -854,6 +867,7 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor /// Process successful transaction verification fn on_transaction_verification_success(&mut self, transaction: Transaction) { let hash = transaction.hash(); + let needs_relay = !self.do_not_relay.remove(&hash); let transaction_fee_rate = { // insert transaction to the memory pool @@ -873,13 +887,17 @@ impl VerificationSink for SynchronizationClientCore where T: TaskExecutor }; // relay transaction to peers - self.relay_new_transactions(vec![(hash, &transaction, transaction_fee_rate)]); + if needs_relay { + self.relay_new_transactions(vec![(hash, &transaction, transaction_fee_rate)]); + } } /// Process failed transaction verification fn on_transaction_verification_error(&mut self, err: &str, hash: &H256) { warn!(target: "sync", "Transaction {:?} verification failed with error {:?}", hash.to_reversed_str(), err); + self.do_not_relay.remove(hash); + { let mut chain = self.chain.write(); @@ -904,6 +922,7 @@ impl SynchronizationClientCore where T: TaskExecutor { orphaned_transactions_pool: OrphanTransactionsPool::new(), verifying_blocks_by_peer: HashMap::new(), verifying_blocks_futures: HashMap::new(), + do_not_relay: HashSet::new(), } )); @@ -1197,7 +1216,7 @@ impl SynchronizationClientCore where T: TaskExecutor { } /// Process new peer transaction - fn process_peer_transaction(&mut self, peer_index: Option, hash: H256, transaction: Transaction) -> Option> { + fn process_peer_transaction(&mut self, peer_index: Option, hash: H256, transaction: Transaction, relay: bool) -> Option> { // if we are in synchronization state, we will ignore this message if self.state.is_synchronizing() { return None; @@ -1222,14 +1241,17 @@ impl SynchronizationClientCore where T: TaskExecutor { } // else verify && insert this transaction && all dependent orphans - let mut transactons: VecDeque<(H256, Transaction)> = VecDeque::new(); - transactons.push_back((hash.clone(), transaction)); - transactons.extend(self.orphaned_transactions_pool.remove_transactions_for_parent(&hash)); + let mut transactions: VecDeque<(H256, Transaction)> = VecDeque::new(); + transactions.push_back((hash.clone(), transaction)); + transactions.extend(self.orphaned_transactions_pool.remove_transactions_for_parent(&hash)); // remember that we are verifying these transactions - for &(ref h, ref tx) in &transactons { + for &(ref h, ref tx) in &transactions { chain.verify_transaction(h.clone(), tx.clone()); + if !relay { + self.do_not_relay.insert(h.clone()); + } } - Some(transactons) + Some(transactions) } fn prepare_blocks_requests_tasks(&mut self, peers: Vec, mut hashes: Vec) -> Vec { diff --git a/sync/src/synchronization_verifier.rs b/sync/src/synchronization_verifier.rs index 2be61676..559fb3dd 100644 --- a/sync/src/synchronization_verifier.rs +++ b/sync/src/synchronization_verifier.rs @@ -3,11 +3,12 @@ use std::collections::VecDeque; use std::sync::Arc; use std::sync::mpsc::{channel, Sender, Receiver}; use parking_lot::Mutex; -use chain::Transaction; +use chain::{Transaction, OutPoint, TransactionOutput}; use network::Magic; use primitives::hash::H256; +use synchronization_chain::ChainRef; use verification::{ChainVerifier, Verify as VerificationVerify, Chain}; -use db::{SharedStore, IndexedBlock}; +use db::{SharedStore, IndexedBlock, PreviousTransactionOutputProvider}; /// Verification events sink pub trait VerificationSink : Send + 'static { @@ -27,7 +28,7 @@ pub enum VerificationTask { /// Verify single block VerifyBlock(IndexedBlock), /// Verify single transaction - VerifyTransaction(Transaction), + VerifyTransaction(u32, Transaction), /// Stop verification thread Stop, } @@ -37,7 +38,7 @@ pub trait Verifier : Send + 'static { /// Verify block fn verify_block(&self, block: IndexedBlock); /// Verify transaction - fn verify_transaction(&self, transaction: Transaction); + fn verify_transaction(&self, height: u32, transaction: Transaction); } /// Asynchronous synchronization verifier @@ -48,28 +49,39 @@ pub struct AsyncVerifier { verification_worker_thread: Option>, } +struct ChainMemoryPoolTransactionOutputProvider { + chain: ChainRef, +} + +#[derive(Default)] +struct EmptyTransactionOutputProvider { +} + impl AsyncVerifier { /// Create new async verifier - pub fn new(network: Magic, storage: SharedStore, sink: Arc>) -> Self { + pub fn new(network: Magic, chain: ChainRef, sink: Arc>) -> Self { let (verification_work_sender, verification_work_receiver) = channel(); - let verifier = ChainVerifier::new(storage, network); + let verifier = ChainVerifier::new(chain.read().storage(), network); AsyncVerifier { verification_work_sender: verification_work_sender, verification_worker_thread: Some(thread::Builder::new() .name("Sync verification thread".to_string()) .spawn(move || { - AsyncVerifier::verification_worker_proc(sink, verifier, verification_work_receiver) + AsyncVerifier::verification_worker_proc(sink, chain, verifier, verification_work_receiver) }) .expect("Error creating verification thread")) } } /// Thread procedure for handling verification tasks - fn verification_worker_proc(sink: Arc>, verifier: ChainVerifier, work_receiver: Receiver) { + fn verification_worker_proc(sink: Arc>, chain: ChainRef, verifier: ChainVerifier, work_receiver: Receiver) { while let Ok(task) = work_receiver.recv() { match task { VerificationTask::Stop => break, - _ => execute_verification_task(&sink, &verifier, task), + _ => { + let prevout_provider = ChainMemoryPoolTransactionOutputProvider::with_chain(chain.clone()); + execute_verification_task(&sink, &prevout_provider, &verifier, task) + }, } } } @@ -95,9 +107,9 @@ impl Verifier for AsyncVerifier { } /// Verify transaction - fn verify_transaction(&self, transaction: Transaction) { + fn verify_transaction(&self, height: u32, transaction: Transaction) { self.verification_work_sender - .send(VerificationTask::VerifyTransaction(transaction)) + .send(VerificationTask::VerifyTransaction(height, transaction)) .expect("Verification thread have the same lifetime as `AsyncVerifier`"); } } @@ -124,17 +136,17 @@ impl SyncVerifier where T: VerificationSink { impl Verifier for SyncVerifier where T: VerificationSink { /// Verify block fn verify_block(&self, block: IndexedBlock) { - execute_verification_task(&self.sink, &self.verifier, VerificationTask::VerifyBlock(block)) + execute_verification_task(&self.sink, &EmptyTransactionOutputProvider::default(), &self.verifier, VerificationTask::VerifyBlock(block)) } /// Verify transaction - fn verify_transaction(&self, transaction: Transaction) { - execute_verification_task(&self.sink, &self.verifier, VerificationTask::VerifyTransaction(transaction)) + fn verify_transaction(&self, height: u32, transaction: Transaction) { + execute_verification_task(&self.sink, &EmptyTransactionOutputProvider::default(), &self.verifier, VerificationTask::VerifyTransaction(height, transaction)) } } /// Execute single verification task -fn execute_verification_task(sink: &Arc>, verifier: &ChainVerifier, task: VerificationTask) { +fn execute_verification_task(sink: &Arc>, tx_output_provider: &U, verifier: &ChainVerifier, task: VerificationTask) { let mut tasks_queue: VecDeque = VecDeque::new(); tasks_queue.push_back(task); @@ -156,15 +168,41 @@ fn execute_verification_task(sink: &Arc>, verifier } } }, - VerificationTask::VerifyTransaction(transaction) => { - // TODO: add verification here - sink.lock().on_transaction_verification_error("unimplemented", &transaction.hash()) + VerificationTask::VerifyTransaction(height, transaction) => { + // bitcoin: AcceptToMemoryPoolWorker + + let time: u32 = 0; // TODO + let sequence: usize = 1; // TODO: change to bool + match verifier.verify_transaction(tx_output_provider, height, time, &transaction, sequence) { + Ok(_) => sink.lock().on_transaction_verification_success(transaction), + Err(e) => sink.lock().on_transaction_verification_error(&format!("{:?}", e), &transaction.hash()), + } }, _ => unreachable!("must be checked by caller"), } } } +impl ChainMemoryPoolTransactionOutputProvider { + pub fn with_chain(chain: ChainRef) -> Self { + ChainMemoryPoolTransactionOutputProvider { + chain: chain, + } + } +} + +impl PreviousTransactionOutputProvider for ChainMemoryPoolTransactionOutputProvider { + fn previous_transaction_output(&self, prevout: &OutPoint) -> Option { + self.chain.read().memory_pool().previous_transaction_output(prevout) + } +} + +impl PreviousTransactionOutputProvider for EmptyTransactionOutputProvider { + fn previous_transaction_output(&self, _prevout: &OutPoint) -> Option { + None + } +} + #[cfg(test)] pub mod tests { use std::sync::Arc; @@ -207,7 +245,7 @@ pub mod tests { } } - fn verify_transaction(&self, transaction: Transaction) { + fn verify_transaction(&self, _height: u32, transaction: Transaction) { match self.sink { Some(ref sink) => match self.errors.get(&transaction.hash()) { Some(err) => sink.lock().on_transaction_verification_error(&err, &transaction.hash()),