network: separate callbacks from unanswered_requests

This commit is contained in:
ThomasV 2015-11-26 10:57:43 +01:00
parent d8ca881457
commit 43df795b1f
1 changed files with 25 additions and 26 deletions

View File

@ -166,7 +166,9 @@ class Network(util.DaemonThread):
self.heights = {} self.heights = {}
self.merkle_roots = {} self.merkle_roots = {}
self.utxo_roots = {} self.utxo_roots = {}
# callbacks passed with subscriptions
self.subscriptions = defaultdict(list) self.subscriptions = defaultdict(list)
# callbacks set by the GUI
self.callbacks = defaultdict(list) self.callbacks = defaultdict(list)
dir_path = os.path.join( self.config.path, 'certs') dir_path = os.path.join( self.config.path, 'certs')
@ -454,7 +456,7 @@ class Network(util.DaemonThread):
self.switch_lagging_interface(i.server) self.switch_lagging_interface(i.server)
self.notify('updated') self.notify('updated')
def process_response(self, interface, response, callback): def process_response(self, interface, response):
if self.debug: if self.debug:
self.print_error("<--", response) self.print_error("<--", response)
error = response.get('error') error = response.get('error')
@ -485,17 +487,9 @@ class Network(util.DaemonThread):
elif method == 'blockchain.block.get_header': elif method == 'blockchain.block.get_header':
self.on_get_header(interface, response) self.on_get_header(interface, response)
else: else:
if callback is None: params = response['params']
params = response['params'] callbacks = self.subscriptions.get(repr((method, params)), [])
with self.lock: for callback in callbacks:
for k,v in self.subscriptions.items():
if (method, params) in v:
callback = k
break
if callback is None:
self.print_error("received unexpected notification",
method, params)
else:
callback(response) callback(response)
def process_responses(self, interface): def process_responses(self, interface):
@ -511,7 +505,7 @@ class Network(util.DaemonThread):
client_req = self.unanswered_requests.pop(message_id, None) client_req = self.unanswered_requests.pop(message_id, None)
if client_req: if client_req:
assert interface == self.interface assert interface == self.interface
callback = client_req[2]
# Copy the request method and params to the response # Copy the request method and params to the response
response['method'] = method response['method'] = method
response['params'] = params response['params'] = params
@ -534,12 +528,21 @@ class Network(util.DaemonThread):
response['result'] = params[1] response['result'] = params[1]
# Response is now in canonical form # Response is now in canonical form
self.process_response(interface, response, callback) self.process_response(interface, response)
def send(self, messages, callback): def send(self, messages, callback):
'''Messages is a list of (method, params) tuples''' '''Messages is a list of (method, params) tuples'''
with self.lock: with self.lock:
self.pending_sends.append((messages, callback)) subs = filter(lambda (m,v): m.endswith('.subscribe'), messages)
for method, params in subs:
k = repr((method, params))
l = self.subscriptions.get(k, [])
if callback not in l:
l.append(callback)
self.subscriptions[k] = l
self.pending_sends += messages
def process_pending_sends(self): def process_pending_sends(self):
# Requests needs connectivity. If we don't have an interface, # Requests needs connectivity. If we don't have an interface,
@ -551,23 +554,19 @@ class Network(util.DaemonThread):
sends = self.pending_sends sends = self.pending_sends
self.pending_sends = [] self.pending_sends = []
for messages, callback in sends: for method, params in sends:
subs = filter(lambda (m,v): m.endswith('.subscribe'), messages) message_id = self.queue_request(method, params)
with self.lock: self.unanswered_requests[message_id] = method, params
for sub in subs:
if sub not in self.subscriptions[callback]:
self.subscriptions[callback].append(sub)
for method, params in messages:
message_id = self.queue_request(method, params)
self.unanswered_requests[message_id] = method, params, callback
def unsubscribe(self, callback): def unsubscribe(self, callback):
'''Unsubscribe a callback to free object references to enable GC.''' '''Unsubscribe a callback to free object references to enable GC.'''
# Note: we can't unsubscribe from the server, so if we receive # Note: we can't unsubscribe from the server, so if we receive
# subsequent notifications process_response() will emit a harmless # subsequent notifications process_response() will emit a harmless
# "received unexpected notification" warning # "received unexpected notification" warning
self.subscriptions.pop(callback, None) with self.lock:
for v in self.subscriptions.values():
if callback in v:
v.remove(callback)
def connection_down(self, server): def connection_down(self, server):
'''A connection to server either went down, or was never made. '''A connection to server either went down, or was never made.