From a47881d72b3da170388a009f830e0a9fbacd1a98 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 7 May 2015 14:05:10 +0900 Subject: [PATCH 1/2] Move the verified and unverified txs to the wallet. --- lib/verifier.py | 50 ++++++++++--------------------------- lib/wallet.py | 65 +++++++++++++++++++++++++++++++++---------------- 2 files changed, 57 insertions(+), 58 deletions(-) diff --git a/lib/verifier.py b/lib/verifier.py index 87141e71..94942a9e 100644 --- a/lib/verifier.py +++ b/lib/verifier.py @@ -25,41 +25,23 @@ import util from bitcoin import * - - class SPV(util.DaemonThread): """ Simple Payment Verification """ - def __init__(self, network, storage): + def __init__(self, network, wallet): util.DaemonThread.__init__(self) - self.storage = storage + self.wallet = wallet self.network = network - self.transactions = {} # requested verifications (with height sent by the requestor) - self.verified_tx = storage.get('verified_tx3',{}) # height, timestamp of verified transactions self.merkle_roots = {} # hashed by me - self.lock = threading.Lock() self.queue = Queue.Queue() - def get_height(self, tx_hash): - with self.lock: - v = self.verified_tx.get(tx_hash) - height = v[0] if v else None - return height - - - def add(self, tx_hash, tx_height): - """ add a transaction to the list of monitored transactions. """ - assert tx_height > 0 - with self.lock: - if tx_hash not in self.transactions.keys(): - self.transactions[tx_hash] = tx_height - def run(self): requested_merkle = [] while self.is_running(): + verified_tx, unverified_tx = self.wallet.get_transactions() # request missing tx - for tx_hash, tx_height in self.transactions.items(): - if tx_hash not in self.verified_tx: + for tx_hash, tx_height in unverified_tx.items(): + if tx_hash not in verified_tx: # do not request merkle branch before headers are available if tx_height > self.network.get_local_height(): continue @@ -102,12 +84,8 @@ class SPV(util.DaemonThread): # we passed all the tests self.merkle_roots[tx_hash] = merkle_root - timestamp = header.get('timestamp') - with self.lock: - self.verified_tx[tx_hash] = (tx_height, timestamp, pos) - self.print_error("verified %s"%tx_hash) - self.storage.put('verified_tx3', self.verified_tx, True) - self.network.trigger_callback('updated') + self.print_error("verified %s" % tx_hash) + self.wallet.add_verified_tx(tx_hash, (tx_height, header.get('timestamp'), pos)) def hash_merkle_root(self, merkle_s, target_hash, pos): @@ -118,15 +96,13 @@ class SPV(util.DaemonThread): return hash_encode(h) - def undo_verifications(self, height): - with self.lock: - items = self.verified_tx.items()[:] - for tx_hash, item in items: + verified_tx, unverified_tx = self.wallet.get_transactions() + txs = [] + for tx_hash, item in verified_tx: tx_height, timestamp, pos = item if tx_height >= height: self.print_error("redoing", tx_hash) - with self.lock: - self.verified_tx.pop(tx_hash) - if tx_hash in self.merkle_roots: - self.merkle_roots.pop(tx_hash) + txs.append(tx_hash) + self.merkle_roots.pop(tx_hash, None) + self.wallet.unverify_txs(txs) diff --git a/lib/wallet.py b/lib/wallet.py index b1da8367..a5d5aa69 100644 --- a/lib/wallet.py +++ b/lib/wallet.py @@ -182,6 +182,11 @@ class Abstract_Wallet(object): # spv self.verifier = None + # Transactions pending verification. Each value is the transaction height. Access with self.lock. + self.unverified_tx = {} + # Verified transactions. Each value is a (height, timestamp, block_pos) tuple. Access with self.lock. + self.verified_tx = storage.get('verified_tx3',{}) + # there is a difference between wallet.up_to_date and interface.is_up_to_date() # interface.is_up_to_date() returns true when all requests have been answered and processed # wallet.up_to_date is true when the wallet is synchronized (stronger requirement) @@ -388,32 +393,48 @@ class Abstract_Wallet(object): return decrypted def add_unverified_tx(self, tx_hash, tx_height): - if self.verifier and tx_height > 0: - self.verifier.add(tx_hash, tx_height) + if tx_height > 0: + with self.lock: + self.unverified_tx[tx_hash] = tx_height + + def add_verified_tx(self, tx_hash, info): + with self.lock: + self.verified_tx[tx_hash] = info # (tx_height, timestamp, pos) + self.storage.put('verified_tx3', self.verified_tx, True) + self.network.trigger_callback('updated') + + def unverify_txs(self, txs): + '''Used by the verifier when a reorg has happened''' + with self.lock: + for tx_hash in txs: + self.verified_tx.pop(tx_hash, None) + + def get_transactions(self): + '''Return the verified and unverified tx dicts''' + with self.lock: + return self.verified_tx, self.unverified_tx def get_confirmations(self, tx): """ return the number of confirmations of a monitored transaction. """ - if not self.verifier: - return (None, None) - with self.verifier.lock: - if tx in self.verifier.verified_tx: - height, timestamp, pos = self.verifier.verified_tx[tx] - conf = (self.network.get_local_height() - height + 1) - if conf <= 0: timestamp = None - elif tx in self.verifier.transactions: - conf = -1 - timestamp = None - else: - conf = 0 - timestamp = None + verified_tx, unverified_tx = self.get_transactions() + if tx in verified_tx: + height, timestamp, pos = verified_tx[tx] + conf = (self.network.get_local_height() - height + 1) + if conf <= 0: timestamp = None + elif tx in unverified_tx: + conf = -1 + timestamp = None + else: + conf = 0 + timestamp = None return conf, timestamp def get_txpos(self, tx_hash): "return position, even if the tx is unverified" - with self.verifier.lock: - x = self.verifier.verified_tx.get(tx_hash) - y = self.verifier.transactions.get(tx_hash) + verified_tx, unverified_tx = self.get_transactions() + x = verified_tx.get(tx_hash) + y = unverified_tx.get(tx_hash) if x: height, timestamp, pos = x return height, pos @@ -1000,7 +1021,8 @@ class Abstract_Wallet(object): self.add_unverified_tx (tx_hash, tx_height) # if we are on a pruning server, remove unverified transactions - vr = self.verifier.transactions.keys() + self.verifier.verified_tx.keys() + verified_tx, unverified_tx = self.get_transactions() + vr = verified_tx.keys() + unverified_tx.keys() for tx_hash in self.transactions.keys(): if tx_hash not in vr: print_error("removing transaction", tx_hash) @@ -1016,6 +1038,7 @@ class Abstract_Wallet(object): return False # check that we are not "orphaning" a transaction + verified_tx, unverified_tx = self.get_transactions() old_hist = self.history.get(addr,[]) for tx_hash, height in old_hist: if tx_hash in map(lambda x:x[0], hist): @@ -1035,7 +1058,7 @@ class Abstract_Wallet(object): if not tx: continue # already verified? - if self.verifier.get_height(tx_hash): + if tx_hash in verified_tx: continue # unconfirmed tx print_error("new history is orphaning transaction:", tx_hash) @@ -1065,7 +1088,7 @@ class Abstract_Wallet(object): from verifier import SPV self.network = network if self.network is not None: - self.verifier = SPV(self.network, self.storage) + self.verifier = SPV(self.network, self) self.verifier.start() self.set_verifier(self.verifier) self.synchronizer = WalletSynchronizer(self, network) From 37c3cce329b01506d1cfa9cfc095635794ef81f0 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Fri, 8 May 2015 09:31:45 +0900 Subject: [PATCH 2/2] Remove wallet.get_transactions() Because references are returned, it's not threadsafe as ThomasV pointed out. --- lib/verifier.py | 32 +++++++++---------------- lib/wallet.py | 64 ++++++++++++++++++++++++++++--------------------- 2 files changed, 48 insertions(+), 48 deletions(-) diff --git a/lib/verifier.py b/lib/verifier.py index 94942a9e..b221856a 100644 --- a/lib/verifier.py +++ b/lib/verifier.py @@ -36,19 +36,14 @@ class SPV(util.DaemonThread): self.queue = Queue.Queue() def run(self): - requested_merkle = [] + requested_merkle = set() while self.is_running(): - verified_tx, unverified_tx = self.wallet.get_transactions() - # request missing tx - for tx_hash, tx_height in unverified_tx.items(): - if tx_hash not in verified_tx: - # do not request merkle branch before headers are available - if tx_height > self.network.get_local_height(): - continue - if self.merkle_roots.get(tx_hash) is None and tx_hash not in requested_merkle: - if self.network.send([ ('blockchain.transaction.get_merkle',[tx_hash, tx_height]) ], self.queue.put): - self.print_error('requesting merkle', tx_hash) - requested_merkle.append(tx_hash) + unverified = self.wallet.get_unverified_txs() + for (tx_hash, tx_height) in unverified: + if self.merkle_roots.get(tx_hash) is None and tx_hash not in requested_merkle: + if self.network.send([ ('blockchain.transaction.get_merkle',[tx_hash, tx_height]) ], self.queue.put): + self.print_error('requesting merkle', tx_hash) + requested_merkle.add(tx_hash) try: r = self.queue.get(timeout=0.1) except Queue.Empty: @@ -97,12 +92,7 @@ class SPV(util.DaemonThread): def undo_verifications(self, height): - verified_tx, unverified_tx = self.wallet.get_transactions() - txs = [] - for tx_hash, item in verified_tx: - tx_height, timestamp, pos = item - if tx_height >= height: - self.print_error("redoing", tx_hash) - txs.append(tx_hash) - self.merkle_roots.pop(tx_hash, None) - self.wallet.unverify_txs(txs) + tx_hashes = selt.wallet.undo_verifications(height) + for tx_hash in tx_hashes: + self.print_error("redoing", tx_hash) + self.merkle_roots.pop(tx_hash, None) diff --git a/lib/wallet.py b/lib/wallet.py index a5d5aa69..b5ef0ef1 100644 --- a/lib/wallet.py +++ b/lib/wallet.py @@ -403,38 +403,48 @@ class Abstract_Wallet(object): self.storage.put('verified_tx3', self.verified_tx, True) self.network.trigger_callback('updated') - def unverify_txs(self, txs): + def get_unverified_txs(self): + '''Returns a list of tuples (tx_hash, height) that are unverified and not beyond local height''' + txs = [] + with self.lock: + for tx_hash, tx_height in self.unverified_tx.items(): + # do not request merkle branch before headers are available + if tx_hash not in self.verified_tx and tx_height <= self.network.get_local_height(): + txs.append((tx_hash, tx_height)) + return txs + + def undo_verifications(self, height): '''Used by the verifier when a reorg has happened''' + txs = [] with self.lock: - for tx_hash in txs: - self.verified_tx.pop(tx_hash, None) - - def get_transactions(self): - '''Return the verified and unverified tx dicts''' - with self.lock: - return self.verified_tx, self.unverified_tx + for tx_hash, item in self.verified_tx: + tx_height, timestamp, pos = item + if tx_height >= height: + self.verified_tx.pop(tx_hash, None) + txs.append(tx_hash) + return txs def get_confirmations(self, tx): """ return the number of confirmations of a monitored transaction. """ - verified_tx, unverified_tx = self.get_transactions() - if tx in verified_tx: - height, timestamp, pos = verified_tx[tx] - conf = (self.network.get_local_height() - height + 1) - if conf <= 0: timestamp = None - elif tx in unverified_tx: - conf = -1 - timestamp = None - else: - conf = 0 - timestamp = None + with self.lock: + if tx in self.verified_tx: + height, timestamp, pos = self.verified_tx[tx] + conf = (self.network.get_local_height() - height + 1) + if conf <= 0: timestamp = None + elif tx in self.unverified_tx: + conf = -1 + timestamp = None + else: + conf = 0 + timestamp = None return conf, timestamp def get_txpos(self, tx_hash): "return position, even if the tx is unverified" - verified_tx, unverified_tx = self.get_transactions() - x = verified_tx.get(tx_hash) - y = unverified_tx.get(tx_hash) + with self.lock: + x = self.verified_tx.get(tx_hash) + y = self.unverified_tx.get(tx_hash) if x: height, timestamp, pos = x return height, pos @@ -1021,8 +1031,8 @@ class Abstract_Wallet(object): self.add_unverified_tx (tx_hash, tx_height) # if we are on a pruning server, remove unverified transactions - verified_tx, unverified_tx = self.get_transactions() - vr = verified_tx.keys() + unverified_tx.keys() + with self.lock: + vr = self.verified_tx.keys() + self.unverified_tx.keys() for tx_hash in self.transactions.keys(): if tx_hash not in vr: print_error("removing transaction", tx_hash) @@ -1038,7 +1048,6 @@ class Abstract_Wallet(object): return False # check that we are not "orphaning" a transaction - verified_tx, unverified_tx = self.get_transactions() old_hist = self.history.get(addr,[]) for tx_hash, height in old_hist: if tx_hash in map(lambda x:x[0], hist): @@ -1058,8 +1067,9 @@ class Abstract_Wallet(object): if not tx: continue # already verified? - if tx_hash in verified_tx: - continue + with self.lock: + if tx_hash in self.verified_tx: + continue # unconfirmed tx print_error("new history is orphaning transaction:", tx_hash) # check that all outputs are not mine, request histories