diff --git a/client/interface.py b/client/interface.py index 9c602e34..c86d0199 100644 --- a/client/interface.py +++ b/client/interface.py @@ -45,6 +45,14 @@ class Interface: self.disconnected_event = threading.Event() self.disconnected_event.clear() + #only asynchrnous + self.addresses_waiting_for_status = [] + self.addresses_waiting_for_history = [] + self.tx_event = threading.Event() + self.up_to_date_event = threading.Event() + self.up_to_date_event.clear() + + def send_tx(self, data): out = self.handler('transaction.broadcast', data ) return out @@ -56,42 +64,94 @@ class Interface: pass + def handle_json_response(self, c): + #print c + msg_id = c.get('id') + result = c.get('result') + error = c.get('error') + if msg_id is None: + print "error: message without ID" + return + + method, params = self.messages[msg_id] + if error: + print "received error:", c, method, params + else: + self.handle_response(method, params, result) + + + + def handle_response(self, method, params, result): + + if method == 'session.new': + self.session_id, self.message = ast.literal_eval( result ) + self.was_updated = True + + elif method == 'server.banner': + self.message = result + self.was_updated = True + + elif method == 'session.poll': + blocks, changed_addresses = ast.literal_eval( result ) + if blocks == -1: raise BaseException("session not found") + self.blocks = int(blocks) + if changed_addresses: + self.is_up_to_date = False + self.was_updated = True + for addr, status in changed_addresses.items(): + apply(self.address_callback, (addr, status)) + else: + self.is_up_to_date = True + + elif method == 'server.peers': + self.servers = map( lambda x:x[1], result ) + + elif method == 'address.subscribe': + addr = params[-1] + if addr in self.addresses_waiting_for_status: + self.addresses_waiting_for_status.remove(addr) + apply(self.address_callback,(addr, result)) + + elif method == 'address.get_history': + addr = params[0] + if addr in self.addresses_waiting_for_history: + self.addresses_waiting_for_history.remove(addr) + apply(self.history_callback, (addr, result)) + self.was_updated = True + + elif method == 'transaction.broadcast': + self.tx_result = result + self.tx_event.set() + + elif method == 'numblocks.subscribe': + self.blocks = result + if self.newblock_callback: apply(self.newblock_callback,(result,)) + else: + print "received message:", c, method, params + + + class PollingInterface(Interface): """ non-persistent connection. synchronous calls""" def start_session(self, addresses, version): - out = self.handler('session.new', [ version, addresses ] ) - self.session_id, self.message = ast.literal_eval( out ) + self.handler([('session.new', [ version, addresses ])] ) thread.start_new_thread(self.poll_thread, ()) def poll_interval(self): return 5 - def retrieve_history(self, address): - out = self.handler('address.get_history', address ) - return out - - def get_history(self, addr): - data = self.retrieve_history(addr) - apply(self.history_callback, (addr, data) ) - self.was_updated = True + def get_history(self, address): + self.handler([('address.get_history', [address] )]) def subscribe(self, addresses): for addr in addresses: - status = self.handler('address.subscribe', [ self.session_id, addr ] ) - apply(self.address_callback, (addr, status) ) + self.handler([('address.subscribe', [ self.session_id, addr ] )]) def update_wallet(self): while True: - changed_addresses = self.poll() - if changed_addresses: - self.is_up_to_date = False - else: - self.is_up_to_date = True - break - - for addr, status in changed_addresses.items(): - apply(self.address_callback, (addr, status)) + self.handler([('session.poll', self.session_id )]) + if self.is_up_to_date: break #if is_new or wallet.remote_url: # self.was_updated = True @@ -102,13 +162,6 @@ class PollingInterface(Interface): #else: # return False - def poll(self): - out = self.handler('session.poll', self.session_id ) - blocks, changed_addr = ast.literal_eval( out ) - if blocks == -1: raise BaseException("session not found") - self.blocks = int(blocks) - return changed_addr - def poll_thread(self): while self.is_connected: try: @@ -136,8 +189,7 @@ class PollingInterface(Interface): for server in DEFAULT_SERVERS: try: self.peers_server = server - out = self.handler('server.peers') - self.servers = map( lambda x:x[1], out ) + self.handler([('server.peers',[])]) # print "Received server list from %s" % self.peers_server, out break except socket.timeout: @@ -150,9 +202,12 @@ class PollingInterface(Interface): time.sleep(5*60) + + + class NativeInterface(PollingInterface): - def handler(self, method, params = ''): + def handler(self, messages): import time cmds = {'session.new':'new_session', 'server.peers':'peers', @@ -161,45 +216,72 @@ class NativeInterface(PollingInterface): 'address.get_history':'h', 'address.subscribe':'address.subscribe' } - cmd = cmds[method] - if type(params) != type(''): params = repr( params ) - t1 = time.time() - request = repr ( (cmd, params) ) + "#" - s = socket.socket( socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(DEFAULT_TIMEOUT) - s.connect(( self.host if cmd!='peers' else self.peers_server, self.port) ) - s.send( request ) - out = '' - while 1: - msg = s.recv(1024) - if msg: out += msg - else: break - s.close() - self.rtime = time.time() - t1 - self.is_connected = True - if cmd in[ 'peers','h']: - out = ast.literal_eval( out ) - return out + + for m in messages: + method, params = m + cmd = cmds[method] + + if cmd=='h': + str_params = params[0] + elif type(params) != type(''): + str_params = repr( params ) + else: + str_params = params + t1 = time.time() + request = repr ( (cmd, str_params) ) + "#" + s = socket.socket( socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(DEFAULT_TIMEOUT) + s.connect(( self.host if cmd!='peers' else self.peers_server, self.port) ) + s.send( request ) + out = '' + while 1: + msg = s.recv(1024) + if msg: out += msg + else: break + s.close() + self.rtime = time.time() - t1 + self.is_connected = True + if cmd in[ 'peers','h']: + out = ast.literal_eval( out ) + + if out=='': out=None #fixme + + self.handle_response(method, params, out) + + + class HttpInterface(PollingInterface): - def handler(self, method, params = []): + def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None): + Interface.__init__(self, host, port, address_callback, history_callback, newblock_callback) + self.message_id = 0 + self.messages = {} + + def handler(self, messages): import urllib2, json, time - if type(params) != type([]): params = [ params ] - t1 = time.time() - data = { 'method':method, 'id':'jsonrpc', 'params':params } + + data = [] + for m in messages: + method, params = m + if type(params) != type([]): params = [params] + t1 = time.time() + data.append( { 'method':method, 'id':self.message_id, 'params':params } ) + self.messages[self.message_id] = (method, params) + self.message_id += 1 + data_json = json.dumps(data) host = 'http://%s:%d'%( self.host if method!='server.peers' else self.peers_server, self.port ) req = urllib2.Request(host, data_json, {'content-type': 'application/json'}) response_stream = urllib2.urlopen(req) response = json.loads( response_stream.read() ) - out = response.get('result') - if not out: - print response + self.rtime = time.time() - t1 self.is_connected = True - return out + + for item in response: + self.handle_json_response(item) @@ -214,13 +296,6 @@ class AsynchronousInterface(Interface): self.message_id = 0 self.messages = {} - self.tx_event = threading.Event() - self.addresses_waiting_for_status = [] - self.addresses_waiting_for_history = [] - # up to date - self.up_to_date_event = threading.Event() - self.up_to_date_event.clear() - def listen_thread(self): try: self.is_connected = True @@ -239,53 +314,13 @@ class AsynchronousInterface(Interface): c = out[0:s] out = out[s+1:] c = json.loads(c) - - #print c - msg_id = c.get('id') - result = c.get('result') - error = c.get('error') - - if msg_id is None: - print "error: message without ID" - continue - - method, params = self.messages[msg_id] - - if method == 'server.banner': - self.message = result - self.was_updated = True - - elif method == 'server.peers': - self.servers = map( lambda x:x[1], result ) - - elif method == 'address.subscribe': - addr = params[0] - if addr in self.addresses_waiting_for_status: - self.addresses_waiting_for_status.remove(addr) - apply(self.address_callback,(addr, result)) - - elif method == 'address.get_history': - addr = params[0] - if addr in self.addresses_waiting_for_history: - self.addresses_waiting_for_history.remove(addr) - apply(self.history_callback, (addr, result)) - self.was_updated = True - - elif method == 'transaction.broadcast': - self.tx_result = result - self.tx_event.set() - - elif method == 'numblocks.subscribe': - self.blocks = result - if self.newblock_callback: apply(self.newblock_callback,(result,)) - else: - print "received message:", c - + self.handle_json_response(c) if self.addresses_waiting_for_status or self.addresses_waiting_for_history: self.is_up_to_date = False else: self.is_up_to_date = True self.up_to_date_event.set() + except: traceback.print_exc(file=sys.stdout) @@ -336,6 +371,7 @@ class AsynchronousInterface(Interface): + def new_interface(wallet): if wallet.host: host = wallet.host diff --git a/client/wallet.py b/client/wallet.py index e9fc579c..f33c9cf1 100644 --- a/client/wallet.py +++ b/client/wallet.py @@ -701,12 +701,12 @@ class Wallet: def receive_status_callback(self, addr, status): if self.status.get(addr) != status: - #print "updating status for", addr + #print "updating status for", addr, repr(self.status.get(addr)), repr(status) self.status[addr] = status self.interface.get_history(addr) def receive_history_callback(self, addr, data): - #print "updating history for", addr + #print "updating history for", addr, repr(data) self.history[addr] = data self.synchronize() self.update_tx_history()