Make the synchronizer not a thread.
The synchronizer's work is done from the network proxy's main loop. A minor problem with the old synchronizer was that it considered itself out of date if the network was out of date. This was too generic: the network can have pending requests unrelated to the synchronizer. This resulted in the synchronizer often unnecessarily flipping the wallet between up-to-date and not-up-to-date, and causing unnecessary calls to wallet.save_transactions(). This was observable when opening the network dialog box: frequently just opening it would cause a wallet status change and transaction flush, simply because the network dialog sends a get_parameters() request. This rework of the synchronizer does not have that issue.
This commit is contained in:
parent
f02c2fde64
commit
e8db8983ec
|
@ -61,10 +61,13 @@ class NetworkProxy(util.DaemonThread):
|
||||||
self.blockchain_height = 0
|
self.blockchain_height = 0
|
||||||
self.server_height = 0
|
self.server_height = 0
|
||||||
self.interfaces = []
|
self.interfaces = []
|
||||||
|
self.jobs = []
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while self.is_running():
|
while self.is_running():
|
||||||
|
for job in self.jobs:
|
||||||
|
job()
|
||||||
try:
|
try:
|
||||||
response = self.pipe.get()
|
response = self.pipe.get()
|
||||||
except util.timeout:
|
except util.timeout:
|
||||||
|
|
|
@ -17,172 +17,169 @@
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
|
||||||
import threading
|
from threading import Lock
|
||||||
import time
|
|
||||||
import Queue
|
|
||||||
|
|
||||||
import bitcoin
|
from bitcoin import Hash, hash_encode
|
||||||
import util
|
|
||||||
from transaction import Transaction
|
from transaction import Transaction
|
||||||
|
from util import print_error, print_msg
|
||||||
|
|
||||||
|
|
||||||
class WalletSynchronizer(util.DaemonThread):
|
class WalletSynchronizer():
|
||||||
|
'''The synchronizer keeps the wallet up-to-date with its set of
|
||||||
|
addresses and their transactions. It subscribes over the network
|
||||||
|
to wallet addresses, gets the wallet to generate new addresses
|
||||||
|
when necessary, requests the transaction history of any addresses
|
||||||
|
we don't have the full history of, and requests binary transaction
|
||||||
|
data of any transactions the wallet doesn't have.
|
||||||
|
|
||||||
|
External interface: __init__() and add() member functions.
|
||||||
|
'''
|
||||||
|
|
||||||
def __init__(self, wallet, network):
|
def __init__(self, wallet, network):
|
||||||
util.DaemonThread.__init__(self)
|
|
||||||
self.wallet = wallet
|
self.wallet = wallet
|
||||||
self.network = network
|
self.network = network
|
||||||
self.was_updated = True
|
self.new_addresses = set()
|
||||||
self.queue = Queue.Queue()
|
# Entries are (tx_hash, tx_height) tuples
|
||||||
self.address_queue = Queue.Queue()
|
self.requested_tx = set()
|
||||||
|
self.requested_histories = {}
|
||||||
|
self.requested_addrs = set()
|
||||||
|
self.lock = Lock()
|
||||||
|
self.initialize()
|
||||||
|
|
||||||
|
def print_error(self, *msg):
|
||||||
|
print_error("[Synchronizer]", *msg)
|
||||||
|
|
||||||
|
def print_msg(self, *msg):
|
||||||
|
print_msg("[Synchronizer]", *msg)
|
||||||
|
|
||||||
|
def parse_response(self, response):
|
||||||
|
if response.get('error'):
|
||||||
|
self.print_error("response error:", response)
|
||||||
|
return None, None
|
||||||
|
return response['params'], response['result']
|
||||||
|
|
||||||
|
def is_up_to_date(self):
|
||||||
|
return (not self.requested_tx and not self.requested_histories
|
||||||
|
and not self.requested_addrs)
|
||||||
|
|
||||||
def add(self, address):
|
def add(self, address):
|
||||||
self.address_queue.put(address)
|
'''This can be called from the proxy or GUI threads.'''
|
||||||
|
with self.lock:
|
||||||
|
self.new_addresses.add(address)
|
||||||
|
|
||||||
def subscribe_to_addresses(self, addresses):
|
def subscribe_to_addresses(self, addresses):
|
||||||
messages = []
|
if addresses:
|
||||||
for addr in addresses:
|
self.requested_addrs |= addresses
|
||||||
messages.append(('blockchain.address.subscribe', [addr]))
|
msgs = map(lambda addr: ('blockchain.address.subscribe', [addr]),
|
||||||
self.network.send(messages, self.queue.put)
|
addresses)
|
||||||
|
self.network.send(msgs, self.addr_subscription_response)
|
||||||
|
|
||||||
def run(self):
|
def addr_subscription_response(self, response):
|
||||||
while self.is_running():
|
params, result = self.parse_response(response)
|
||||||
if not self.network.is_connected():
|
if not params:
|
||||||
time.sleep(0.1)
|
return
|
||||||
continue
|
addr = params[0]
|
||||||
self.run_interface()
|
if addr in self.requested_addrs: # Notifications won't be in
|
||||||
self.print_error("stopped")
|
self.requested_addrs.remove(addr)
|
||||||
|
history = self.wallet.get_address_history(addr)
|
||||||
|
if self.wallet.get_status(history) != result:
|
||||||
|
if self.requested_histories.get(addr) is None:
|
||||||
|
self.network.send([('blockchain.address.get_history', [addr])],
|
||||||
|
self.addr_history_response)
|
||||||
|
self.requested_histories[addr] = result
|
||||||
|
|
||||||
def run_interface(self):
|
def addr_history_response(self, response):
|
||||||
#print_error("synchronizer: connected to", self.network.get_parameters())
|
params, result = self.parse_response(response)
|
||||||
|
if not params:
|
||||||
|
return
|
||||||
|
addr = params[0]
|
||||||
|
self.print_error("receiving history", addr, len(result))
|
||||||
|
server_status = self.requested_histories.pop(addr)
|
||||||
|
|
||||||
requested_tx = []
|
# Check that txids are unique
|
||||||
missing_tx = []
|
hashes = set(map(lambda item: item['tx_hash'], result))
|
||||||
requested_histories = {}
|
if len(hashes) != len(result):
|
||||||
|
raise Exception("error: server history has non-unique txids: %s"
|
||||||
|
% addr)
|
||||||
|
|
||||||
# request any missing transactions
|
# Check that the status corresponds to what was announced
|
||||||
|
hist = map(lambda item: (item['tx_hash'], item['height']), result)
|
||||||
|
if self.wallet.get_status(hist) != server_status:
|
||||||
|
raise Exception("error: status mismatch: %s" % addr)
|
||||||
|
|
||||||
|
# Store received history
|
||||||
|
self.wallet.receive_history_callback(addr, hist)
|
||||||
|
|
||||||
|
# Request transactions we don't have
|
||||||
|
self.request_missing_txs(hist)
|
||||||
|
|
||||||
|
def tx_response(self, response):
|
||||||
|
params, result = self.parse_response(response)
|
||||||
|
if not params:
|
||||||
|
return
|
||||||
|
tx_hash, tx_height = params
|
||||||
|
assert tx_hash == hash_encode(Hash(result.decode('hex')))
|
||||||
|
tx = Transaction(result)
|
||||||
|
try:
|
||||||
|
tx.deserialize()
|
||||||
|
except Exception:
|
||||||
|
self.print_msg("cannot deserialize transaction, skipping", tx_hash)
|
||||||
|
return
|
||||||
|
|
||||||
|
self.wallet.receive_tx_callback(tx_hash, tx, tx_height)
|
||||||
|
self.requested_tx.remove((tx_hash, tx_height))
|
||||||
|
self.print_error("received tx:", tx_hash, len(tx.raw))
|
||||||
|
if not self.requested_tx:
|
||||||
|
self.network.trigger_callback('updated')
|
||||||
|
# Updated gets called too many times from other places as
|
||||||
|
# well; if we used that signal we get the notification
|
||||||
|
# three times
|
||||||
|
self.network.trigger_callback("new_transaction")
|
||||||
|
|
||||||
|
def request_missing_txs(self, hist):
|
||||||
|
# "hist" is a list of [tx_hash, tx_height] lists
|
||||||
|
missing = set()
|
||||||
|
for tx_hash, tx_height in hist:
|
||||||
|
if self.wallet.transactions.get(tx_hash) is None:
|
||||||
|
missing.add((tx_hash, tx_height))
|
||||||
|
missing -= self.requested_tx
|
||||||
|
if missing:
|
||||||
|
requests = [('blockchain.transaction.get', tx) for tx in missing]
|
||||||
|
self.network.send(requests, self.tx_response)
|
||||||
|
self.requested_tx |= missing
|
||||||
|
|
||||||
|
def initialize(self):
|
||||||
|
'''Check the initial state of the wallet. Subscribe to all its
|
||||||
|
addresses, and request any transactions in its address history
|
||||||
|
we don't have.
|
||||||
|
'''
|
||||||
for history in self.wallet.history.values():
|
for history in self.wallet.history.values():
|
||||||
if history == ['*']: continue
|
# Old electrum servers returned ['*'] when all history for
|
||||||
for tx_hash, tx_height in history:
|
# the address was pruned. This no longer happens but may
|
||||||
if self.wallet.transactions.get(tx_hash) is None and (tx_hash, tx_height) not in missing_tx:
|
# remain in old wallets.
|
||||||
missing_tx.append( (tx_hash, tx_height) )
|
if history == ['*']:
|
||||||
|
|
||||||
if missing_tx:
|
|
||||||
self.print_error("missing tx", missing_tx)
|
|
||||||
|
|
||||||
# subscriptions
|
|
||||||
self.subscribe_to_addresses(self.wallet.addresses(True))
|
|
||||||
|
|
||||||
while self.is_running():
|
|
||||||
|
|
||||||
# 1. create new addresses
|
|
||||||
self.wallet.synchronize()
|
|
||||||
|
|
||||||
# request missing addresses
|
|
||||||
new_addresses = []
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
addr = self.address_queue.get(block=False)
|
|
||||||
except Queue.Empty:
|
|
||||||
break
|
|
||||||
new_addresses.append(addr)
|
|
||||||
if new_addresses:
|
|
||||||
self.subscribe_to_addresses(new_addresses)
|
|
||||||
|
|
||||||
# request missing transactions
|
|
||||||
for tx_hash, tx_height in missing_tx:
|
|
||||||
if (tx_hash, tx_height) not in requested_tx:
|
|
||||||
self.network.send([ ('blockchain.transaction.get',[tx_hash, tx_height]) ], self.queue.put)
|
|
||||||
requested_tx.append( (tx_hash, tx_height) )
|
|
||||||
missing_tx = []
|
|
||||||
|
|
||||||
# detect if situation has changed
|
|
||||||
if self.network.is_up_to_date() and self.queue.empty():
|
|
||||||
if not self.wallet.is_up_to_date():
|
|
||||||
self.wallet.set_up_to_date(True)
|
|
||||||
self.was_updated = True
|
|
||||||
self.wallet.save_transactions()
|
|
||||||
else:
|
|
||||||
if self.wallet.is_up_to_date():
|
|
||||||
self.wallet.set_up_to_date(False)
|
|
||||||
self.was_updated = True
|
|
||||||
|
|
||||||
if self.was_updated:
|
|
||||||
self.network.trigger_callback('updated')
|
|
||||||
self.was_updated = False
|
|
||||||
|
|
||||||
# 2. get a response
|
|
||||||
try:
|
|
||||||
r = self.queue.get(timeout=0.1)
|
|
||||||
except Queue.Empty:
|
|
||||||
continue
|
continue
|
||||||
|
self.request_missing_txs(history)
|
||||||
|
|
||||||
# 3. process response
|
if self.requested_tx:
|
||||||
method = r['method']
|
self.print_error("missing tx", self.requested_tx)
|
||||||
params = r['params']
|
self.subscribe_to_addresses(set(self.wallet.addresses(True)))
|
||||||
result = r.get('result')
|
|
||||||
error = r.get('error')
|
|
||||||
if error:
|
|
||||||
self.print_error("error", r)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if method == 'blockchain.address.subscribe':
|
def main_loop(self):
|
||||||
addr = params[0]
|
'''Called from the network proxy thread main loop.'''
|
||||||
if self.wallet.get_status(self.wallet.get_address_history(addr)) != result:
|
# 1. Create new addresses
|
||||||
if requested_histories.get(addr) is None:
|
self.wallet.synchronize()
|
||||||
self.network.send([('blockchain.address.get_history', [addr])], self.queue.put)
|
|
||||||
requested_histories[addr] = result
|
|
||||||
|
|
||||||
elif method == 'blockchain.address.get_history':
|
# 2. Subscribe to new addresses
|
||||||
addr = params[0]
|
with self.lock:
|
||||||
self.print_error("receiving history", addr, len(result))
|
addresses = self.new_addresses
|
||||||
hist = []
|
self.new_addresses = set()
|
||||||
# check that txids are unique
|
self.subscribe_to_addresses(addresses)
|
||||||
txids = []
|
|
||||||
for item in result:
|
|
||||||
tx_hash = item['tx_hash']
|
|
||||||
if tx_hash not in txids:
|
|
||||||
txids.append(tx_hash)
|
|
||||||
hist.append( (tx_hash, item['height']) )
|
|
||||||
|
|
||||||
if len(hist) != len(result):
|
# 3. Detect if situation has changed
|
||||||
raise Exception("error: server sent history with non-unique txid", result)
|
up_to_date = self.is_up_to_date()
|
||||||
|
if up_to_date != self.wallet.is_up_to_date():
|
||||||
# check that the status corresponds to what was announced
|
self.wallet.set_up_to_date(up_to_date)
|
||||||
rs = requested_histories.pop(addr)
|
if up_to_date:
|
||||||
if self.wallet.get_status(hist) != rs:
|
self.wallet.save_transactions()
|
||||||
raise Exception("error: status mismatch: %s"%addr)
|
self.network.trigger_callback('updated')
|
||||||
|
|
||||||
# store received history
|
|
||||||
self.wallet.receive_history_callback(addr, hist)
|
|
||||||
|
|
||||||
# request transactions that we don't have
|
|
||||||
for tx_hash, tx_height in hist:
|
|
||||||
if self.wallet.transactions.get(tx_hash) is None:
|
|
||||||
if (tx_hash, tx_height) not in requested_tx and (tx_hash, tx_height) not in missing_tx:
|
|
||||||
missing_tx.append( (tx_hash, tx_height) )
|
|
||||||
|
|
||||||
elif method == 'blockchain.transaction.get':
|
|
||||||
tx_hash = params[0]
|
|
||||||
tx_height = params[1]
|
|
||||||
assert tx_hash == bitcoin.hash_encode(bitcoin.Hash(result.decode('hex')))
|
|
||||||
tx = Transaction(result)
|
|
||||||
try:
|
|
||||||
tx.deserialize()
|
|
||||||
except Exception:
|
|
||||||
self.print_msg("Warning: Cannot deserialize transactions. skipping")
|
|
||||||
continue
|
|
||||||
|
|
||||||
self.wallet.receive_tx_callback(tx_hash, tx, tx_height)
|
|
||||||
self.was_updated = True
|
|
||||||
requested_tx.remove( (tx_hash, tx_height) )
|
|
||||||
self.print_error("received tx:", tx_hash, len(tx.raw))
|
|
||||||
|
|
||||||
else:
|
|
||||||
self.print_error("Error: Unknown message:" + method + ", " + repr(params) + ", " + repr(result) )
|
|
||||||
|
|
||||||
if self.was_updated and not requested_tx:
|
|
||||||
self.network.trigger_callback('updated')
|
|
||||||
# Updated gets called too many times from other places as well; if we use that signal we get the notification three times
|
|
||||||
self.network.trigger_callback("new_transaction")
|
|
||||||
self.was_updated = False
|
|
||||||
|
|
|
@ -1105,15 +1105,16 @@ class Abstract_Wallet(object):
|
||||||
self.verifier.start()
|
self.verifier.start()
|
||||||
self.set_verifier(self.verifier)
|
self.set_verifier(self.verifier)
|
||||||
self.synchronizer = WalletSynchronizer(self, network)
|
self.synchronizer = WalletSynchronizer(self, network)
|
||||||
self.synchronizer.start()
|
network.jobs.append(self.synchronizer.main_loop)
|
||||||
else:
|
else:
|
||||||
self.verifier = None
|
self.verifier = None
|
||||||
self.synchronizer =None
|
self.synchronizer = None
|
||||||
|
|
||||||
def stop_threads(self):
|
def stop_threads(self):
|
||||||
if self.network:
|
if self.network:
|
||||||
self.verifier.stop()
|
self.verifier.stop()
|
||||||
self.synchronizer.stop()
|
self.network.jobs = []
|
||||||
|
self.synchronizer = None
|
||||||
self.storage.put('stored_height', self.get_local_height(), True)
|
self.storage.put('stored_height', self.get_local_height(), True)
|
||||||
|
|
||||||
def restore(self, cb):
|
def restore(self, cb):
|
||||||
|
|
Loading…
Reference in New Issue