queue responses

This commit is contained in:
ThomasV 2012-03-23 14:37:02 +01:00
parent 5b15fa0539
commit 39895f41cc
1 changed files with 35 additions and 23 deletions

View File

@ -18,7 +18,7 @@
import random, socket, ast
import thread, threading, traceback, sys, time, json
import thread, threading, traceback, sys, time, json, Queue
DEFAULT_TIMEOUT = 5
DEFAULT_SERVERS = ['electrum.bitcoins.sk','ecdsa.org','electrum.novit.ro'] # list of default servers
@ -40,8 +40,6 @@ class Interface:
self.is_up_to_date = False
self.is_connected = False
self.disconnected_event = threading.Event()
self.disconnected_event.clear()
#only asynchrnous
self.addresses_waiting_for_status = []
@ -54,6 +52,8 @@ class Interface:
self.message_id = 0
self.messages = {}
self.responses = Queue.Queue()
def send_tx(self, data):
self.tx_event.clear()
self.send([('transaction.broadcast', [data])])
@ -65,7 +65,7 @@ class Interface:
pass
def handle_json_response(self, c):
def queue_json_response(self, c):
#print repr(c)
msg_id = c.get('id')
result = c.get('result')
@ -79,13 +79,21 @@ class Interface:
if error:
print "received error:", c, method, params
else:
self.handle_response(method, params, result)
self.is_up_to_date = True
#self.handle_response(method, params, result)
self.responses.put({'method':method, 'params':params, 'result':result})
#self.is_up_to_date = True
def handle_response(self, method, params, result):
def handle_response(self, r):
if r is None:
print "empty item"
return
method = r['method']
params = r['params']
result = r['result']
if method == 'server.banner':
self.message = result
@ -135,6 +143,12 @@ class Interface:
else:
print "unknown message:", method, params, result
if self.addresses_waiting_for_status or self.addresses_waiting_for_history:
self.is_up_to_date = False
else:
self.is_up_to_date = True
self.up_to_date_event.set()
def subscribe(self, addresses):
messages = []
@ -210,7 +224,7 @@ class PollingInterface(Interface):
break
self.is_connected = False
self.disconnected_event.set()
self.responses.put(None)
@ -269,7 +283,7 @@ class NativeInterface(PollingInterface):
self.session_id, self.message = ast.literal_eval( out )
self.was_updated = True
else:
self.handle_response(method, params, out)
self.responses.put({'method':method, 'params':params, 'result':out})
@ -323,14 +337,12 @@ class HttpInterface(PollingInterface):
response = response_stream.read()
if response:
#print "response",response
response = json.loads( response )
if type(response) is not type([]):
self.handle_json_response(response)
self.queue_json_response(response)
else:
for item in response:
self.handle_json_response(item)
self.queue_json_response(item)
self.rtime = time.time() - t1
self.is_connected = True
@ -359,18 +371,13 @@ class AsynchronousInterface(Interface):
c = out[0:s]
out = out[s+1:]
c = json.loads(c)
self.handle_json_response(c)
if self.addresses_waiting_for_status or self.addresses_waiting_for_history:
self.is_up_to_date = False
else:
self.is_up_to_date = True
self.up_to_date_event.set()
self.queue_json_response(c)
except:
traceback.print_exc(file=sys.stdout)
self.is_connected = False
self.disconnected_event.set()
self.responses.put(None)
def update_wallet(self):
self.up_to_date_event.wait()
@ -391,7 +398,9 @@ class AsynchronousInterface(Interface):
def start_session(self, addresses, version):
self.s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
self.s.settimeout(1)
self.s.settimeout(5)
self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.s.connect(( self.host, self.port))
thread.start_new_thread(self.listen_thread, ())
self.send([('client.version', [version]), ('server.banner',[]), ('numblocks.subscribe',[]), ('server.peers',[])])
@ -432,7 +441,10 @@ def loop_interfaces_thread(wallet):
version = wallet.electrum_version
wallet.interface.start_session(addresses, version)
wallet.interface.disconnected_event.wait()
while wallet.interface.is_connected:
response = wallet.interface.responses.get()
wallet.interface.handle_response(response)
print "Disconnected"
except socket.error:
print "socket error"