move callbacks to the network class

This commit is contained in:
thomasv 2013-09-09 13:33:25 +02:00
parent b3e880b587
commit 047e4b3478
7 changed files with 51 additions and 46 deletions

View File

@ -130,13 +130,11 @@ if __name__ == '__main__':
# network interface # network interface
network = Network(config) network = Network(config)
network.start() network.start()
#interface.send([('server.peers.subscribe',[])])
gui = gui.ElectrumGui(config, network) gui = gui.ElectrumGui(config, network)
gui.main(url) gui.main(url)
network.stop() network.stop()
# we use daemon threads, their termination is enforced. # we use daemon threads, their termination is enforced.
# this sleep command gives them time to terminate cleanly. # this sleep command gives them time to terminate cleanly.
time.sleep(0.1) time.sleep(0.1)
@ -319,15 +317,13 @@ if __name__ == '__main__':
# open session # open session
if cmd not in offline_commands and not options.offline: if cmd not in offline_commands and not options.offline:
interface = Interface(config) network = Network(config)
interface.register_callback('connected', lambda: sys.stderr.write("Connected to " + interface.connection_msg + "\n")) network.register_callback('connected', lambda: sys.stderr.write("Connected to " + network.interface.connection_msg + "\n"))
if not network.start(wait=True):
if not interface.start(wait=True):
print_msg("Not connected, aborting.") print_msg("Not connected, aborting.")
sys.exit(1) sys.exit(1)
blockchain = BlockchainVerifier(interface, config)
blockchain.start() wallet.start_threads(network)
wallet.start_threads(interface, blockchain)
wallet.update() wallet.update()
@ -388,7 +384,6 @@ if __name__ == '__main__':
if cmd not in offline_commands and not options.offline: if cmd not in offline_commands and not options.offline:
wallet.stop_threads() wallet.stop_threads()
interface.stop() network.stop()
blockchain.stop()
time.sleep(0.1) time.sleep(0.1)
sys.exit(0) sys.exit(0)

View File

@ -227,10 +227,11 @@ class ElectrumWindow(QMainWindow):
self.showNormal() self.showNormal()
def __init__(self, config): def __init__(self, config, network):
QMainWindow.__init__(self) QMainWindow.__init__(self)
self.config = config self.config = config
self.network = network
self.init_plugins() self.init_plugins()
self._close_electrum = False self._close_electrum = False
@ -303,11 +304,11 @@ class ElectrumWindow(QMainWindow):
import electrum import electrum
self.wallet = wallet self.wallet = wallet
self.wallet.interface.register_callback('updated', lambda: self.need_update.set()) self.network.register_callback('updated', lambda: self.need_update.set())
self.wallet.interface.register_callback('banner', lambda: self.emit(QtCore.SIGNAL('banner_signal'))) self.network.register_callback('banner', lambda: self.emit(QtCore.SIGNAL('banner_signal')))
self.wallet.interface.register_callback('disconnected', lambda: self.emit(QtCore.SIGNAL('update_status'))) self.network.register_callback('disconnected', lambda: self.emit(QtCore.SIGNAL('update_status')))
self.wallet.interface.register_callback('disconnecting', lambda: self.emit(QtCore.SIGNAL('update_status'))) self.network.register_callback('disconnecting', lambda: self.emit(QtCore.SIGNAL('update_status')))
self.wallet.interface.register_callback('new_transaction', lambda: self.emit(QtCore.SIGNAL('transaction_signal'))) self.network.register_callback('new_transaction', lambda: self.emit(QtCore.SIGNAL('transaction_signal')))
title = 'Electrum ' + self.wallet.electrum_version + ' - ' + self.wallet.storage.path title = 'Electrum ' + self.wallet.electrum_version + ' - ' + self.wallet.storage.path
if not self.wallet.seed: title += ' [%s]' % (_('seedless')) if not self.wallet.seed: title += ' [%s]' % (_('seedless'))
self.setWindowTitle( title ) self.setWindowTitle( title )
@ -2283,7 +2284,7 @@ class ElectrumGui:
s = Timer() s = Timer()
s.start() s.start()
w = ElectrumWindow(self.config) w = ElectrumWindow(self.config, self.network)
w.load_wallet(wallet) w.load_wallet(wallet)
self.windows.append(w) self.windows.append(w)

View File

@ -56,17 +56,19 @@ class Blockchain(threading.Thread):
while self.is_running(): while self.is_running():
try: try:
i, result = self.queue.get() result = self.queue.get()
except Queue.Empty: except Queue.Empty:
continue continue
if not result: continue
i, result = result
header= result.get('result') header= result.get('result')
#print_error( i.server, header )
height = header.get('block_height') height = header.get('block_height')
if height > self.local_height + 50: if height > self.local_height + 50:
self.get_chunks(i, header, height) self.get_chunks(i, header, height)
i.trigger_callback('updated') i.network.trigger_callback('updated')
if height > self.local_height: if height > self.local_height:
# get missing parts from interface (until it connects to my chain) # get missing parts from interface (until it connects to my chain)
@ -85,7 +87,7 @@ class Blockchain(threading.Thread):
print_error("error", i.server) print_error("error", i.server)
# todo: dismiss that server # todo: dismiss that server
i.trigger_callback('updated') i.network.trigger_callback('updated')

View File

@ -67,19 +67,6 @@ def pick_random_server():
class Interface(threading.Thread): class Interface(threading.Thread):
def register_callback(self, event, callback):
with self.lock:
if not self.callbacks.get(event):
self.callbacks[event] = []
self.callbacks[event].append(callback)
def trigger_callback(self, event):
with self.lock:
callbacks = self.callbacks.get(event,[])[:]
if callbacks:
[callback() for callback in callbacks]
def init_server(self, host, port, proxy=None, use_ssl=True): def init_server(self, host, port, proxy=None, use_ssl=True):
self.host = host self.host = host
@ -156,11 +143,11 @@ class Interface(threading.Thread):
elif method == 'server.banner': elif method == 'server.banner':
self.banner = result self.banner = result
self.trigger_callback('banner') self.network.trigger_callback('banner')
elif method == 'server.peers.subscribe': elif method == 'server.peers.subscribe':
self.servers = self.parse_servers(result) self.servers = self.parse_servers(result)
self.trigger_callback('peers') self.network.trigger_callback('peers')
else: else:
# notification: find the channel(s) # notification: find the channel(s)
@ -196,8 +183,9 @@ class Interface(threading.Thread):
def get_response(self, channel='default', block=True, timeout=10000000000): def get_response(self, channel='default', block=True, timeout=10000000000):
i, r = self.responses[channel].get(block, timeout) ir = self.responses[channel].get(block, timeout)
return r if ir:
return ir[1]
def register_channel(self, channel, queue=None): def register_channel(self, channel, queue=None):
if queue is None: if queue is None:
@ -441,7 +429,6 @@ class Interface(threading.Thread):
self.responses = {} self.responses = {}
self.responses['default'] = Queue.Queue() self.responses['default'] = Queue.Queue()
self.callbacks = {}
self.lock = threading.Lock() self.lock = threading.Lock()
self.servers = {} # actual list from IRC self.servers = {} # actual list from IRC

View File

@ -17,8 +17,22 @@ class Network(threading.Thread):
self.queue = Queue.Queue() self.queue = Queue.Queue()
self.default_server = self.config.get('server') self.default_server = self.config.get('server')
self.servers_list = interface.filter_protocol(interface.DEFAULT_SERVERS,'s') self.servers_list = interface.filter_protocol(interface.DEFAULT_SERVERS,'s')
self.callbacks = {}
def register_callback(self, event, callback):
with self.lock:
if not self.callbacks.get(event):
self.callbacks[event] = []
self.callbacks[event].append(callback)
def trigger_callback(self, event):
with self.lock:
callbacks = self.callbacks.get(event,[])[:]
if callbacks:
[callback() for callback in callbacks]
def start_interfaces(self): def start_interfaces(self):
@ -26,22 +40,29 @@ class Network(threading.Thread):
self.interfaces[server] = interface.Interface({'server':server}) self.interfaces[server] = interface.Interface({'server':server})
for i in self.interfaces.values(): for i in self.interfaces.values():
i.network = self # fixme
i.start(self.queue) i.start(self.queue)
if self.default_server: if self.default_server:
self.interface = interface.Interface({'server':self.default_server}) self.interface = interface.Interface({'server':self.default_server})
self.interface.network = self # fixme
self.interface.start(self.queue) self.interface.start(self.queue)
else: else:
self.interface = self.interfaces[0] self.interface = self.interfaces[0]
def start(self, wait=False):
self.start_interfaces()
threading.Thread.__init__(self)
if wait:
self.interface.connect_event.wait()
return self.interface.is_connected
def run(self): def run(self):
self.blockchain.start() self.blockchain.start()
self.start_interfaces()
with self.lock: with self.lock:
self.running = True self.running = True

View File

@ -90,7 +90,7 @@ class TxVerifier(threading.Thread):
def stop(self): def stop(self):
with self.lock: self.running = False with self.lock: self.running = False
self.interface.poke('verifier') #self.interface.poke('verifier')
def is_running(self): def is_running(self):
with self.lock: return self.running with self.lock: return self.running

View File

@ -1447,7 +1447,7 @@ class WalletSynchronizer(threading.Thread):
self.was_updated = True self.was_updated = True
if self.was_updated: if self.was_updated:
self.interface.trigger_callback('updated') self.interface.network.trigger_callback('updated')
self.was_updated = False self.was_updated = False
# 2. get a response # 2. get a response
@ -1523,8 +1523,7 @@ class WalletSynchronizer(threading.Thread):
print_error("Error: Unknown message:" + method + ", " + repr(params) + ", " + repr(result) ) print_error("Error: Unknown message:" + method + ", " + repr(params) + ", " + repr(result) )
if self.was_updated and not requested_tx: if self.was_updated and not requested_tx:
self.interface.trigger_callback('updated') self.interface.network.trigger_callback('updated')
self.interface.trigger_callback("new_transaction") # Updated gets called too many times from other places as well; if we use that signal we get the notification three times self.interface.network.trigger_callback("new_transaction") # Updated gets called too many times from other places as well; if we use that signal we get the notification three times
self.was_updated = False self.was_updated = False