daemon notifications (network.status)
This commit is contained in:
parent
9ee0614edb
commit
09e4efc439
|
@ -793,8 +793,7 @@ class MiniDriver(QObject):
|
|||
|
||||
if self.network:
|
||||
self.network.register_callback('updated',self.update_callback)
|
||||
self.network.register_callback('connected', self.update_callback)
|
||||
self.network.register_callback('disconnected', self.update_callback)
|
||||
self.network.register_callback('status', self.update_callback)
|
||||
|
||||
self.state = None
|
||||
|
||||
|
|
|
@ -179,8 +179,7 @@ class ElectrumWindow(QMainWindow):
|
|||
if self.network:
|
||||
self.network.register_callback('updated', lambda: self.need_update.set())
|
||||
self.network.register_callback('banner', lambda: self.emit(QtCore.SIGNAL('banner_signal')))
|
||||
self.network.register_callback('disconnected', lambda: self.emit(QtCore.SIGNAL('update_status')))
|
||||
self.network.register_callback('disconnecting', lambda: self.emit(QtCore.SIGNAL('update_status')))
|
||||
self.network.register_callback('status', lambda: self.emit(QtCore.SIGNAL('update_status')))
|
||||
self.network.register_callback('new_transaction', lambda: self.emit(QtCore.SIGNAL('transaction_signal')))
|
||||
|
||||
# set initial message
|
||||
|
|
|
@ -60,14 +60,16 @@ class ClientThread(threading.Thread):
|
|||
self.server = server
|
||||
self.daemon = True
|
||||
self.s = s
|
||||
self.s.settimeout(0.1)
|
||||
self.network = network
|
||||
self.queue = Queue.Queue()
|
||||
self.unanswered_requests = {}
|
||||
self.debug = False
|
||||
self.server.add_client(self)
|
||||
|
||||
|
||||
def run(self):
|
||||
self.server.add_client(self)
|
||||
|
||||
message = ''
|
||||
while True:
|
||||
self.send_responses()
|
||||
|
@ -140,9 +142,9 @@ class ClientThread(threading.Thread):
|
|||
class NetworkServer:
|
||||
|
||||
def __init__(self, config):
|
||||
network = Network(config)
|
||||
network.start(wait=False)
|
||||
self.network = network
|
||||
self.network = Network(config)
|
||||
self.network.trigger_callback = self.trigger_callback
|
||||
self.network.start()
|
||||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.daemon_port = config.get('daemon_port', DAEMON_PORT)
|
||||
|
@ -150,28 +152,44 @@ class NetworkServer:
|
|||
self.socket.listen(5)
|
||||
self.socket.settimeout(1)
|
||||
self.running = False
|
||||
# daemon terminates after period of inactivity
|
||||
self.timeout = config.get('daemon_timeout', 60)
|
||||
|
||||
#
|
||||
self.lock = threading.RLock()
|
||||
|
||||
# each GUI is a client of the daemon
|
||||
self.clients = []
|
||||
# need to know which client subscribed to which address
|
||||
#
|
||||
# report status
|
||||
self.network.status_callback = self.on_status
|
||||
# daemon needs to know which client subscribed to which address
|
||||
|
||||
|
||||
def add_client(self, client):
|
||||
for key in ['status','banner','updated','servers']:
|
||||
value = self.get_status_value(key)
|
||||
client.queue.put({'method':'network.status', 'params':[key, value]})
|
||||
with self.lock:
|
||||
self.clients.append(client)
|
||||
|
||||
|
||||
def remove_client(self, client):
|
||||
with self.lock:
|
||||
self.clients.remove(client)
|
||||
print_error("client quit:", len(self.clients))
|
||||
|
||||
def on_status(self, status):
|
||||
def get_status_value(self, key):
|
||||
if key == 'status':
|
||||
value = self.network.connection_status
|
||||
elif key == 'banner':
|
||||
value = self.network.banner
|
||||
elif key == 'updated':
|
||||
value = self.network.get_local_height()
|
||||
elif key == 'servers':
|
||||
value = self.network.irc_servers
|
||||
return value
|
||||
|
||||
def trigger_callback(self, key):
|
||||
value = self.get_status_value(key)
|
||||
print_error("daemon trigger callback", key, len(self.clients))
|
||||
for client in self.clients:
|
||||
client.queue.put({'method':'network.subscribe', 'status':status})
|
||||
client.queue.put({'method':'network.status', 'params':[key, value]})
|
||||
|
||||
def main_loop(self):
|
||||
self.running = True
|
||||
|
@ -188,7 +206,8 @@ class NetworkServer:
|
|||
continue
|
||||
client = ClientThread(self, self.network, connection)
|
||||
client.start()
|
||||
print_error("daemon: timed out")
|
||||
|
||||
print_error("Daemon exiting (timeout)")
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -70,6 +70,8 @@ def pick_random_server(p='s'):
|
|||
|
||||
from simple_config import SimpleConfig
|
||||
|
||||
|
||||
|
||||
class Network(threading.Thread):
|
||||
|
||||
def __init__(self, config=None):
|
||||
|
@ -115,6 +117,13 @@ class Network(threading.Thread):
|
|||
self.subscriptions[self.on_peers] = [('server.peers.subscribe',[])]
|
||||
self.pending_transactions_for_notifications = []
|
||||
|
||||
self.connection_status = 'disconnected'
|
||||
|
||||
|
||||
def set_status(self, status):
|
||||
self.connection_status = status
|
||||
self.trigger_callback('status')
|
||||
|
||||
|
||||
def is_connected(self):
|
||||
return self.interface and self.interface.is_connected
|
||||
|
@ -161,6 +170,7 @@ class Network(threading.Thread):
|
|||
|
||||
|
||||
def trigger_callback(self, event):
|
||||
# note: this method is overwritten by daemon
|
||||
with self.lock:
|
||||
callbacks = self.callbacks.get(event,[])[:]
|
||||
if callbacks:
|
||||
|
@ -212,32 +222,14 @@ class Network(threading.Thread):
|
|||
|
||||
def start_interfaces(self):
|
||||
self.interface = self.start_interface(self.default_server)
|
||||
|
||||
for i in range(self.num_server):
|
||||
self.start_random_interface()
|
||||
|
||||
|
||||
def start(self, wait=False):
|
||||
def start(self):
|
||||
self.start_interfaces()
|
||||
threading.Thread.start(self)
|
||||
if wait:
|
||||
raise
|
||||
return self.wait_until_connected()
|
||||
|
||||
def wait_until_connected(self):
|
||||
"wait until connection status is known"
|
||||
if self.config.get('auto_cycle'):
|
||||
# self.random_server() returns None if all servers have been tried
|
||||
while not self.is_connected() and self.random_server():
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
self.interface.connect_event.wait()
|
||||
|
||||
return self.interface.is_connected
|
||||
|
||||
|
||||
def set_parameters(self, host, port, protocol, proxy, auto_connect):
|
||||
|
||||
self.config.set_key('auto_cycle', auto_connect, True)
|
||||
self.config.set_key("proxy", proxy, True)
|
||||
self.config.set_key("protocol", protocol, True)
|
||||
|
@ -277,7 +269,7 @@ class Network(threading.Thread):
|
|||
self.config.set_key('server', server, False)
|
||||
self.default_server = server
|
||||
self.send_subscriptions()
|
||||
self.trigger_callback('connected')
|
||||
self.set_status('connected')
|
||||
|
||||
|
||||
def stop_interface(self):
|
||||
|
@ -296,7 +288,7 @@ class Network(threading.Thread):
|
|||
self.stop_interface()
|
||||
|
||||
# notify gui
|
||||
self.trigger_callback('disconnecting')
|
||||
self.set_status('connecting')
|
||||
# start interface
|
||||
self.default_server = server
|
||||
self.config.set_key("server", server, True)
|
||||
|
@ -357,7 +349,7 @@ class Network(threading.Thread):
|
|||
if i == self.interface:
|
||||
print_error('sending subscriptions to', self.interface.server)
|
||||
self.send_subscriptions()
|
||||
self.trigger_callback('connected')
|
||||
self.set_status('connected')
|
||||
else:
|
||||
self.disconnected_servers.add(i.server)
|
||||
if i.server in self.interfaces:
|
||||
|
@ -366,7 +358,7 @@ class Network(threading.Thread):
|
|||
self.heights.pop(i.server)
|
||||
if i == self.interface:
|
||||
#self.interface = None
|
||||
self.trigger_callback('disconnected')
|
||||
self.set_status('disconnected')
|
||||
|
||||
if not self.interface.is_connected and self.config.get('auto_cycle'):
|
||||
self.switch_to_random_interface()
|
||||
|
@ -397,7 +389,7 @@ class Network(threading.Thread):
|
|||
def on_peers(self, i, r):
|
||||
if not r: return
|
||||
self.irc_servers = parse_servers(r.get('result'))
|
||||
self.trigger_callback('peers')
|
||||
self.trigger_callback('servers')
|
||||
|
||||
def on_banner(self, i, r):
|
||||
self.banner = r.get('result')
|
||||
|
|
|
@ -49,11 +49,16 @@ class NetworkProxy(threading.Thread):
|
|||
self.debug = False
|
||||
self.lock = threading.Lock()
|
||||
self.pending_transactions_for_notifications = []
|
||||
self.banner = ''
|
||||
self.callbacks = {}
|
||||
self.running = True
|
||||
self.daemon = True
|
||||
|
||||
# status variables
|
||||
self.status = 'disconnected'
|
||||
self.servers = []
|
||||
self.banner = ''
|
||||
self.height = 0
|
||||
|
||||
def is_running(self):
|
||||
return self.running
|
||||
|
||||
|
@ -85,9 +90,18 @@ class NetworkProxy(threading.Thread):
|
|||
if self.debug:
|
||||
print_error("<--", response)
|
||||
|
||||
if response.get('method') == 'network.subscribe':
|
||||
status = response.get('status')
|
||||
self.trigger_callback(status)
|
||||
if response.get('method') == 'network.status':
|
||||
#print_error("<--", response)
|
||||
key, value = response.get('params')
|
||||
if key == 'status':
|
||||
self.status = value
|
||||
elif key == 'banner':
|
||||
self.banner = value
|
||||
elif key == 'updated':
|
||||
self.height = value
|
||||
elif key == 'servers':
|
||||
self.servers = value
|
||||
self.trigger_callback(key)
|
||||
return
|
||||
|
||||
msg_id = response.get('id')
|
||||
|
@ -150,16 +164,16 @@ class NetworkProxy(threading.Thread):
|
|||
|
||||
|
||||
def get_servers(self):
|
||||
return self.synchronous_get([('network.get_servers',[])])[0]
|
||||
return self.servers
|
||||
|
||||
def get_header(self, height):
|
||||
return self.synchronous_get([('network.get_header',[height])])[0]
|
||||
|
||||
def get_local_height(self):
|
||||
return self.synchronous_get([('network.get_local_height',[])])[0]
|
||||
return self.height
|
||||
|
||||
def is_connected(self):
|
||||
return self.synchronous_get([('network.is_connected',[])])[0]
|
||||
return self.status == 'connected'
|
||||
|
||||
def is_up_to_date(self):
|
||||
return self.synchronous_get([('network.is_up_to_date',[])])[0]
|
||||
|
@ -170,14 +184,12 @@ class NetworkProxy(threading.Thread):
|
|||
def stop(self):
|
||||
self.running = False
|
||||
|
||||
|
||||
def register_callback(self, event, callback):
|
||||
with self.lock:
|
||||
if not self.callbacks.get(event):
|
||||
self.callbacks[event] = []
|
||||
self.callbacks[event].append(callback)
|
||||
|
||||
|
||||
def trigger_callback(self, event):
|
||||
with self.lock:
|
||||
callbacks = self.callbacks.get(event,[])[:]
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
|
||||
|
||||
import threading
|
||||
import time
|
||||
import Queue
|
||||
|
||||
import bitcoin
|
||||
from util import print_error
|
||||
from transaction import Transaction
|
||||
|
@ -38,10 +40,12 @@ class WalletSynchronizer(threading.Thread):
|
|||
self.address_queue = Queue.Queue()
|
||||
|
||||
def stop(self):
|
||||
with self.lock: self.running = False
|
||||
with self.lock:
|
||||
self.running = False
|
||||
|
||||
def is_running(self):
|
||||
with self.lock: return self.running
|
||||
with self.lock:
|
||||
return self.running
|
||||
|
||||
def add(self, address):
|
||||
self.address_queue.put(address)
|
||||
|
@ -57,9 +61,7 @@ class WalletSynchronizer(threading.Thread):
|
|||
self.running = True
|
||||
while self.is_running():
|
||||
while not self.network.is_connected():
|
||||
import time
|
||||
time.sleep(5)
|
||||
#self.network.wait_until_connected()
|
||||
time.sleep(1)
|
||||
self.run_interface()
|
||||
|
||||
def run_interface(self):
|
||||
|
|
Loading…
Reference in New Issue