network: cache subscription responses

This commit is contained in:
ThomasV 2015-11-26 11:26:01 +01:00
parent 43df795b1f
commit 042f8ef832
1 changed files with 25 additions and 13 deletions

View File

@ -168,6 +168,7 @@ class Network(util.DaemonThread):
self.utxo_roots = {}
# callbacks passed with subscriptions
self.subscriptions = defaultdict(list)
self.sub_cache = {}
# callbacks set by the GUI
self.callbacks = defaultdict(list)
@ -275,6 +276,7 @@ class Network(util.DaemonThread):
def send_subscriptions(self):
self.print_error('sending subscriptions to', self.interface.server, len(self.unanswered_requests), len(self.subscribed_addresses))
self.sub_cache.clear()
# Resend unanswered requests
requests = self.unanswered_requests.values()
self.unanswered_requests = {}
@ -462,6 +464,7 @@ class Network(util.DaemonThread):
error = response.get('error')
result = response.get('result')
method = response.get('method')
params = response.get('params')
# We handle some responses; return the rest to the client.
if method == 'server.version':
@ -486,9 +489,11 @@ class Network(util.DaemonThread):
self.on_get_chunk(interface, response)
elif method == 'blockchain.block.get_header':
self.on_get_header(interface, response)
else:
params = response['params']
callbacks = self.subscriptions.get(repr((method, params)), [])
elif method.endswith('.subscribe'):
k = repr((method, params))
self.sub_cache[k] = response
callbacks = self.subscriptions.get(k, [])
for callback in callbacks:
callback(response)
@ -496,7 +501,6 @@ class Network(util.DaemonThread):
responses = interface.get_responses()
for request, response in responses:
callback = None
if request:
method, params, message_id = request
# client requests go through self.send() with a
@ -505,7 +509,6 @@ class Network(util.DaemonThread):
client_req = self.unanswered_requests.pop(message_id, None)
if client_req:
assert interface == self.interface
# Copy the request method and params to the response
response['method'] = method
response['params'] = params
@ -534,14 +537,23 @@ class Network(util.DaemonThread):
'''Messages is a list of (method, params) tuples'''
with self.lock:
subs = filter(lambda (m,v): m.endswith('.subscribe'), messages)
for method, params in subs:
k = repr((method, params))
l = self.subscriptions.get(k, [])
if callback not in l:
l.append(callback)
self.subscriptions[k] = l
self.pending_sends += messages
for message in messages:
method, params = message
if method.endswith('.subscribe'):
k = repr((method, params))
l = self.subscriptions.get(k, [])
if callback not in l:
l.append(callback)
self.subscriptions[k] = l
# check cached response
r = self.sub_cache.get(k)
if r is not None:
util.print_error("cache hit", k)
callback(r)
else:
self.pending_sends.append(message)
else:
self.pending_sends.append(message)
def process_pending_sends(self):