diff --git a/lib/network_proxy.py b/lib/network_proxy.py index 6bf3ea98..eb666b7d 100644 --- a/lib/network_proxy.py +++ b/lib/network_proxy.py @@ -66,7 +66,7 @@ class NetworkProxy(util.DaemonThread): def run(self): while self.is_running(): - self.run_jobs() # Synchronizer, for now + self.run_jobs() # Synchronizer and Verifier try: response = self.pipe.get() except util.timeout: @@ -185,9 +185,6 @@ class NetworkProxy(util.DaemonThread): def get_interfaces(self): return self.interfaces - def get_header(self, height): - return self.synchronous_get([('network.get_header', [height])])[0] - def get_local_height(self): return self.blockchain_height diff --git a/lib/verifier.py b/lib/verifier.py index 28532c5f..c19a02b7 100644 --- a/lib/verifier.py +++ b/lib/verifier.py @@ -17,63 +17,52 @@ # along with this program. If not, see . -import threading -import Queue - - -import util +from util import ThreadJob +from functools import partial from bitcoin import * -class SPV(util.DaemonThread): +class SPV(ThreadJob): """ Simple Payment Verification """ def __init__(self, network, wallet): - util.DaemonThread.__init__(self) self.wallet = wallet self.network = network self.merkle_roots = {} # hashed by me - self.queue = Queue.Queue() + self.requested_merkle = set() def run(self): - requested_merkle = set() - while self.is_running(): - unverified = self.wallet.get_unverified_txs() - for (tx_hash, tx_height) in unverified: - if tx_hash not in self.merkle_roots and tx_hash not in requested_merkle: - if self.network.send([ ('blockchain.transaction.get_merkle',[tx_hash, tx_height]) ], self.queue.put): - self.print_error('requested merkle', tx_hash) - requested_merkle.add(tx_hash) - try: - r = self.queue.get(timeout=0.1) - except Queue.Empty: - continue - if not r: - continue + unverified = self.wallet.get_unverified_txs() + for (tx_hash, tx_height) in unverified: + if tx_hash not in self.merkle_roots and tx_hash not in self.requested_merkle: + request = ('blockchain.transaction.get_merkle', + [tx_hash, tx_height]) + if self.network.send([request], self.merkle_response): + self.print_error('requested merkle', tx_hash) + self.requested_merkle.add(tx_hash) - if r.get('error'): - self.print_error('Verifier received an error:', r) - continue + def merkle_response(self, r): + if r.get('error'): + self.print_error('received an error:', r) + return - # 3. handle response - method = r['method'] - params = r['params'] - result = r['result'] + params = r['params'] + result = r['result'] - if method == 'blockchain.transaction.get_merkle': - tx_hash = params[0] - self.verify_merkle(tx_hash, result) + # Get the header asynchronously - as a thread job we cannot block + tx_hash = params[0] + request = ('network.get_header',[result.get('block_height')]) + self.network.send([request], partial(self.verify, tx_hash, result)) - self.print_error("stopped") - - - def verify_merkle(self, tx_hash, result): - tx_height = result.get('block_height') - pos = result.get('pos') - merkle_root = self.hash_merkle_root(result['merkle'], tx_hash, pos) - header = self.network.get_header(tx_height) - if not header: return - if header.get('merkle_root') != merkle_root: + def verify(self, tx_hash, merkle, header): + '''Verify the hash of the server-provided merkle branch to a + transaction matches the merkle root of its block + ''' + tx_height = merkle.get('block_height') + pos = merkle.get('pos') + merkle_root = self.hash_merkle_root(merkle['merkle'], tx_hash, pos) + header = header.get('result') + if not header or header.get('merkle_root') != merkle_root: self.print_error("merkle verification failed for", tx_hash) return diff --git a/lib/wallet.py b/lib/wallet.py index 9e3f3c2e..a927d0e0 100644 --- a/lib/wallet.py +++ b/lib/wallet.py @@ -1086,9 +1086,7 @@ class Abstract_Wallet(object): return True return False - def set_verifier(self, verifier): - self.verifier = verifier - + def prepare_for_verifier(self): # review transactions that are in the history for addr, hist in self.history.items(): for tx_hash, tx_height in hist: @@ -1107,9 +1105,9 @@ class Abstract_Wallet(object): from verifier import SPV self.network = network if self.network is not None: + self.prepare_for_verifier() self.verifier = SPV(self.network, self) - self.verifier.start() - self.set_verifier(self.verifier) + network.add_job(self.verifier) self.synchronizer = Synchronizer(self, network) network.add_job(self.synchronizer) else: @@ -1118,9 +1116,10 @@ class Abstract_Wallet(object): def stop_threads(self): if self.network: - self.verifier.stop() self.network.remove_job(self.synchronizer) + self.network.remove_job(self.verifier) self.synchronizer = None + self.verifier = None self.storage.put('stored_height', self.get_local_height(), True) def restore(self, cb):