From 6b90109cc82fb0cd7691c44e7bd71324b3507cde Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sat, 17 Mar 2012 12:03:57 +0100 Subject: [PATCH 01/12] virtual class PollingInterface --- client/interface.py | 68 +++++++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/client/interface.py b/client/interface.py index 203211fe..fc0320e3 100644 --- a/client/interface.py +++ b/client/interface.py @@ -53,8 +53,8 @@ class Interface: pass -class NativeInterface(Interface): - """This is the original Electrum protocol. It uses polling, and a non-persistent tcp connection""" +class PollingInterface(Interface): + """ non-persistent connection """ def __init__(self, host, port): Interface.__init__(self, host, port) @@ -70,36 +70,6 @@ class NativeInterface(Interface): out = self.handler('session.update', [ self.session_id, addresses ] ) return out - def handler(self, method, params = ''): - import time - cmds = {'session.new':'new_session', - 'peers':'peers', - 'session.poll':'poll', - 'session.update':'update_session', - 'transaction.broadcast':'tx', - 'address.get_history':'h', - 'address.subscribe':'address.subscribe' - } - cmd = cmds[method] - if type(params) != type(''): params = repr( params ) - t1 = time.time() - request = repr ( (cmd, params) ) + "#" - s = socket.socket( socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(DEFAULT_TIMEOUT) - s.connect(( self.host if cmd!='peers' else self.peers_server, self.port) ) - s.send( request ) - out = '' - while 1: - msg = s.recv(1024) - if msg: out += msg - else: break - s.close() - self.rtime = time.time() - t1 - self.is_connected = True - if cmd in[ 'peers','h']: - out = ast.literal_eval( out ) - return out - def poll_interval(self): return 5 @@ -185,8 +155,40 @@ class NativeInterface(Interface): time.sleep(5*60) +class NativeInterface(PollingInterface): -class HttpInterface(NativeInterface): + def handler(self, method, params = ''): + import time + cmds = {'session.new':'new_session', + 'peers':'peers', + 'session.poll':'poll', + 'session.update':'update_session', + 'transaction.broadcast':'tx', + 'address.get_history':'h', + 'address.subscribe':'address.subscribe' + } + cmd = cmds[method] + if type(params) != type(''): params = repr( params ) + t1 = time.time() + request = repr ( (cmd, params) ) + "#" + s = socket.socket( socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(DEFAULT_TIMEOUT) + s.connect(( self.host if cmd!='peers' else self.peers_server, self.port) ) + s.send( request ) + out = '' + while 1: + msg = s.recv(1024) + if msg: out += msg + else: break + s.close() + self.rtime = time.time() - t1 + self.is_connected = True + if cmd in[ 'peers','h']: + out = ast.literal_eval( out ) + return out + + +class HttpInterface(PollingInterface): def handler(self, method, params = []): import urllib2, json, time From 75344be7daea0a6289e2d97eee11d9a8fa891c3e Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sat, 17 Mar 2012 12:07:36 +0100 Subject: [PATCH 02/12] minor --- client/interface.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/client/interface.py b/client/interface.py index fc0320e3..16afc710 100644 --- a/client/interface.py +++ b/client/interface.py @@ -36,7 +36,7 @@ class Interface: self.blocks = 0 self.message = '' self.was_updated = True # fixme: use a semaphore - self.is_up_to_date = False # True after the first poll + self.is_up_to_date = False self.is_connected = False self.disconnected_event = threading.Event() @@ -54,7 +54,7 @@ class Interface: class PollingInterface(Interface): - """ non-persistent connection """ + """ non-persistent connection. synchronous calls""" def __init__(self, host, port): Interface.__init__(self, host, port) @@ -213,7 +213,7 @@ class HttpInterface(PollingInterface): import threading class TCPInterface(Interface): - """json-rpc over persistent TCP connection""" + """json-rpc over persistent TCP connection, asynchronous""" def __init__(self, host, port): Interface.__init__(self, host, port) @@ -224,7 +224,6 @@ class TCPInterface(Interface): self.addresses_waiting_for_status = [] self.addresses_waiting_for_history = [] # up to date - self.is_up_to_date = False self.up_to_date_event = threading.Event() self.up_to_date_event.clear() From 4b6e163a3912705202cf2775438b04185f71caf2 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sat, 17 Mar 2012 12:36:27 +0100 Subject: [PATCH 03/12] better encapsulation --- client/electrum | 6 +++- client/interface.py | 67 +++++++++++++++++++++++---------------------- client/wallet.py | 4 +-- 3 files changed, 41 insertions(+), 36 deletions(-) diff --git a/client/electrum b/client/electrum index b966920e..c5468219 100755 --- a/client/electrum +++ b/client/electrum @@ -164,7 +164,11 @@ if __name__ == '__main__': # open session if cmd not in ['password', 'mktx', 'history', 'label', 'contacts', 'help', 'validateaddress', 'signmessage', 'verifymessage', 'eval', 'create', 'addresses', 'import']: - interface.start_session(wallet) + + addresses = wallet.all_addresses() + version = wallet.electrum_version + address_callback = wallet.retrieve_status_callback + interface.start_session(addresses, version, address_callback) interface.update_wallet(wallet) wallet.save() diff --git a/client/interface.py b/client/interface.py index 16afc710..037d43be 100644 --- a/client/interface.py +++ b/client/interface.py @@ -27,9 +27,11 @@ DEFAULT_SERVERS = ['ecdsa.org','electrum.novit.ro'] # list of default servers class Interface: - def __init__(self, host, port): + def __init__(self, host, port, address_callback, history_callback): self.host = host self.port = port + self.address_callback = address_callback + self.history_callback = history_callback self.servers = DEFAULT_SERVERS # actual list from IRC self.rtime = 0 @@ -56,15 +58,10 @@ class Interface: class PollingInterface(Interface): """ non-persistent connection. synchronous calls""" - def __init__(self, host, port): - Interface.__init__(self, host, port) - - def start_session(self, wallet): - addresses = wallet.all_addresses() - version = wallet.electrum_version + def start_session(self, addresses, version): out = self.handler('session.new', [ version, addresses ] ) self.session_id, self.message = ast.literal_eval( out ) - thread.start_new_thread(self.poll_thread, (wallet,)) + thread.start_new_thread(self.poll_thread, ()) def update_session(self, addresses): out = self.handler('session.update', [ self.session_id, addresses ] ) @@ -77,16 +74,16 @@ class PollingInterface(Interface): out = self.handler('address.get_history', address ) return out - def get_history(self, addr, history_callback): + def get_history(self, addr): data = self.retrieve_history(addr) - apply(history_callback, (addr, data) ) + apply(self.history_callback, (addr, data) ) self.was_updated = True - def subscribe(self, addr, status_callback): + def subscribe(self, addr): status = self.handler('address.subscribe', [ self.session_id, addr ] ) - apply(status_callback, (addr, status) ) + apply(self.address_callback, (addr, status) ) - def update_wallet(self, wallet): + def update_wallet(self): while True: changed_addresses = self.poll() if changed_addresses: @@ -96,7 +93,7 @@ class PollingInterface(Interface): break for addr, status in changed_addresses.items(): - wallet.receive_status_callback(addr, status) + apply(self.address_callback, (addr, status)) #if is_new or wallet.remote_url: # self.was_updated = True @@ -114,10 +111,10 @@ class PollingInterface(Interface): self.blocks = int(blocks) return changed_addr - def poll_thread(self, wallet): + def poll_thread(self): while self.is_connected: try: - self.update_wallet(wallet) + self.update_wallet() time.sleep(self.poll_interval()) except socket.gaierror: break @@ -215,8 +212,8 @@ import threading class TCPInterface(Interface): """json-rpc over persistent TCP connection, asynchronous""" - def __init__(self, host, port): - Interface.__init__(self, host, port) + def __init__(self, host, port, acb, hcb): + Interface.__init__(self, host, port, acb, hcb) self.message_id = 0 self.messages = {} @@ -233,7 +230,7 @@ class TCPInterface(Interface): self.s.send( request + '\n' ) self.message_id += 1 - def listen_thread(self, wallet): + def listen_thread(self): try: self.is_connected = True out = '' @@ -274,13 +271,13 @@ class TCPInterface(Interface): addr = params[0] if addr in self.addresses_waiting_for_status: self.addresses_waiting_for_status.remove(addr) - wallet.receive_status_callback(addr, result) + apply(self.address_callback,(addr, result)) elif method == 'address.get_history': addr = params[0] if addr in self.addresses_waiting_for_history: self.addresses_waiting_for_history.remove(addr) - wallet.receive_history_callback(addr, result) + apply(self.history_callback, (addr, result)) self.was_updated = True elif method == 'transaction.broadcast': @@ -304,7 +301,7 @@ class TCPInterface(Interface): self.is_connected = False self.disconnected_event.set() - def update_wallet(self,wallet): + def update_wallet(self,cb): self.up_to_date_event.wait() def send_tx(self, data): @@ -313,27 +310,27 @@ class TCPInterface(Interface): self.tx_event.wait() return self.tx_result - def subscribe(self, address, callback): + def subscribe(self, address): self.send('address.subscribe', [address]) self.addresses_waiting_for_status.append(address) def get_servers(self): self.send('server.peers') - def get_history(self, addr, callback): + def get_history(self, addr): self.send('address.get_history', [addr]) self.addresses_waiting_for_history.append(addr) - def start_session(self, wallet): + def start_session(self, addresses, version): self.s = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) self.s.settimeout(1) self.s.connect(( self.host, self.port)) - thread.start_new_thread(self.listen_thread, (wallet,)) - self.send('client.version', [wallet.electrum_version]) + thread.start_new_thread(self.listen_thread, ()) + self.send('client.version', [version]) self.send('server.banner') self.send('numblocks.subscribe') - for address in wallet.all_addresses(): - self.subscribe(address, wallet.receive_status_callback) + for addr in addresses: + self.subscribe(addr) @@ -346,13 +343,15 @@ def new_interface(wallet): else: host = random.choice( DEFAULT_SERVERS ) # random choice when the wallet is created port = wallet.port + address_cb = wallet.receive_status_callback + history_cb = wallet.receive_history_callback if port == 50000: - interface = NativeInterface(host,port) + interface = NativeInterface(host, port, address_cb, history_cb) elif port == 50001: - interface = TCPInterface(host,port) + interface = TCPInterface(host, port, address_cb, history_cb) elif port in [80, 81, 8080, 8081]: - interface = HttpInterface(host,port) + interface = HttpInterface(host, port, address_cb, history_cb) else: print "unknown port number: %d. using native protocol."%port interface = NativeInterface(host,port) @@ -363,7 +362,9 @@ def new_interface(wallet): def loop_interfaces_thread(wallet): while True: try: - wallet.interface.start_session(wallet) + addresses = wallet.all_addresses() + version = wallet.electrum_version + wallet.interface.start_session(addresses, version) wallet.interface.get_servers() wallet.interface.disconnected_event.wait() diff --git a/client/wallet.py b/client/wallet.py index bd815f49..e8954eeb 100644 --- a/client/wallet.py +++ b/client/wallet.py @@ -456,7 +456,7 @@ class Wallet: def create_new_address(self, bool): address = self.create_new_address_without_history(bool) - self.interface.subscribe(address, self.receive_status_callback) + self.interface.subscribe(address) return address @@ -702,7 +702,7 @@ class Wallet: def receive_status_callback(self, addr, status): if self.status.get(addr) != status: self.status[addr] = status - self.interface.get_history(addr, self.receive_history_callback) + self.interface.get_history(addr) def receive_history_callback(self, addr, data): #print "updating history for", addr From 9039cf959ab39fd7f357d830968419ad742cd4ef Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sat, 17 Mar 2012 12:51:16 +0100 Subject: [PATCH 04/12] callback for blocks --- client/blocks | 12 ++++++++++++ client/interface.py | 13 +++++++------ 2 files changed, 19 insertions(+), 6 deletions(-) create mode 100755 client/blocks diff --git a/client/blocks b/client/blocks new file mode 100755 index 00000000..f1a83fe3 --- /dev/null +++ b/client/blocks @@ -0,0 +1,12 @@ +#!/usr/bin/env python + +import socket, time, interface + +def cb(block_number): + print block_number + +i = interface.TCPInterface('ecdsa.org', 50001, newblock_callback=cb) +i.start_session([],"zob") + +while True: + time.sleep(1) diff --git a/client/interface.py b/client/interface.py index 037d43be..8a2d461d 100644 --- a/client/interface.py +++ b/client/interface.py @@ -27,11 +27,12 @@ DEFAULT_SERVERS = ['ecdsa.org','electrum.novit.ro'] # list of default servers class Interface: - def __init__(self, host, port, address_callback, history_callback): + def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None): self.host = host self.port = port self.address_callback = address_callback self.history_callback = history_callback + self.newblock_callback = newblock_callback self.servers = DEFAULT_SERVERS # actual list from IRC self.rtime = 0 @@ -212,8 +213,8 @@ import threading class TCPInterface(Interface): """json-rpc over persistent TCP connection, asynchronous""" - def __init__(self, host, port, acb, hcb): - Interface.__init__(self, host, port, acb, hcb) + def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None): + Interface.__init__(self, host, port, address_callback, history_callback, newblock_callback) self.message_id = 0 self.messages = {} @@ -286,7 +287,7 @@ class TCPInterface(Interface): elif method == 'numblocks.subscribe': self.blocks = result - + apply(self.newblock_callback,(result,)) else: print "received message:", c @@ -351,10 +352,10 @@ def new_interface(wallet): elif port == 50001: interface = TCPInterface(host, port, address_cb, history_cb) elif port in [80, 81, 8080, 8081]: - interface = HttpInterface(host, port, address_cb, history_cb) + interface = HttpInterface(host, port, address_cb, history_cb) else: print "unknown port number: %d. using native protocol."%port - interface = NativeInterface(host,port) + interface = NativeInterface(host, port, address_cb, history_cb) return interface From 65f208e733027981cdf42bf0d1cc9163c88a6cd9 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sat, 17 Mar 2012 12:52:16 +0100 Subject: [PATCH 05/12] minor --- client/blocks | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/blocks b/client/blocks index f1a83fe3..92153b2b 100755 --- a/client/blocks +++ b/client/blocks @@ -6,7 +6,7 @@ def cb(block_number): print block_number i = interface.TCPInterface('ecdsa.org', 50001, newblock_callback=cb) -i.start_session([],"zob") +i.start_session([],"blocks") while True: time.sleep(1) From e21c463923a057d5cab60c167822efc0566949f2 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sat, 17 Mar 2012 13:12:16 +0100 Subject: [PATCH 06/12] rename class --- client/blocks | 2 +- client/interface.py | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/client/blocks b/client/blocks index 92153b2b..7971b1c7 100755 --- a/client/blocks +++ b/client/blocks @@ -5,7 +5,7 @@ import socket, time, interface def cb(block_number): print block_number -i = interface.TCPInterface('ecdsa.org', 50001, newblock_callback=cb) +i = interface.AsynchronousInterface('ecdsa.org', 50001, newblock_callback=cb) i.start_session([],"blocks") while True: diff --git a/client/interface.py b/client/interface.py index 8a2d461d..118f6bac 100644 --- a/client/interface.py +++ b/client/interface.py @@ -210,7 +210,7 @@ class HttpInterface(PollingInterface): import threading -class TCPInterface(Interface): +class AsynchronousInterface(Interface): """json-rpc over persistent TCP connection, asynchronous""" def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None): @@ -287,7 +287,7 @@ class TCPInterface(Interface): elif method == 'numblocks.subscribe': self.blocks = result - apply(self.newblock_callback,(result,)) + if self.newblock_callback: apply(self.newblock_callback,(result,)) else: print "received message:", c @@ -348,14 +348,16 @@ def new_interface(wallet): history_cb = wallet.receive_history_callback if port == 50000: - interface = NativeInterface(host, port, address_cb, history_cb) + InterfaceClass = NativeInterface elif port == 50001: - interface = TCPInterface(host, port, address_cb, history_cb) + InterfaceClass = AsynchronousInterface elif port in [80, 81, 8080, 8081]: - interface = HttpInterface(host, port, address_cb, history_cb) + InterfaceClass = HttpInterface else: print "unknown port number: %d. using native protocol."%port - interface = NativeInterface(host, port, address_cb, history_cb) + InterfaceClass = NativeInterface + + interface = InterfaceClass(host, port, address_cb, history_cb) return interface From afd11ea9c101a23b722d788dfcea6c3104e69968 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sat, 17 Mar 2012 15:46:24 +0100 Subject: [PATCH 07/12] minor --- client/interface.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/interface.py b/client/interface.py index 118f6bac..ade4d3f5 100644 --- a/client/interface.py +++ b/client/interface.py @@ -52,7 +52,7 @@ class Interface: def get_servers(self): pass - def start_session(self, wallet): + def start_session(self, addresses, version): pass @@ -320,7 +320,7 @@ class AsynchronousInterface(Interface): def get_history(self, addr): self.send('address.get_history', [addr]) - self.addresses_waiting_for_history.append(addr) + self.addresses_waiting_for_history.append(addr) def start_session(self, addresses, version): self.s = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) From 10daacf353ede60ca4dec4c8b458e0536c01bb60 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sat, 17 Mar 2012 16:02:28 +0100 Subject: [PATCH 08/12] send many --- client/interface.py | 42 ++++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/client/interface.py b/client/interface.py index ade4d3f5..ff4662d5 100644 --- a/client/interface.py +++ b/client/interface.py @@ -225,12 +225,6 @@ class AsynchronousInterface(Interface): self.up_to_date_event = threading.Event() self.up_to_date_event.clear() - def send(self, method, params = []): - request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } ) - self.messages[self.message_id] = (method, params) - self.s.send( request + '\n' ) - self.message_id += 1 - def listen_thread(self): try: self.is_connected = True @@ -305,21 +299,34 @@ class AsynchronousInterface(Interface): def update_wallet(self,cb): self.up_to_date_event.wait() + def send(self, messages): + out = '' + for m in messages: + method, params = m + request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } ) + self.messages[self.message_id] = (method, params) + self.message_id += 1 + out += request + '\n' + self.s.send( out ) + def send_tx(self, data): self.tx_event.clear() - self.send('transaction.broadcast', [data] ) + self.send([('transaction.broadcast', [data])]) self.tx_event.wait() return self.tx_result - def subscribe(self, address): - self.send('address.subscribe', [address]) - self.addresses_waiting_for_status.append(address) - + def subscribe(self, addresses): + messages = [] + for addr in addresses: + messages.append(('address.subscribe', [addr])) + self.addresses_waiting_for_status.append(addr) + self.send(messages) + def get_servers(self): - self.send('server.peers') + self.send([('server.peers',[])]) def get_history(self, addr): - self.send('address.get_history', [addr]) + self.send([('address.get_history', [addr])]) self.addresses_waiting_for_history.append(addr) def start_session(self, addresses, version): @@ -327,13 +334,8 @@ class AsynchronousInterface(Interface): self.s.settimeout(1) self.s.connect(( self.host, self.port)) thread.start_new_thread(self.listen_thread, ()) - self.send('client.version', [version]) - self.send('server.banner') - self.send('numblocks.subscribe') - for addr in addresses: - self.subscribe(addr) - - + self.send([('client.version', [version]), ('server.banner',[]), ('numblocks.subscribe',[])]) + self.subscribe(addresses) From 6145c21c958367938bd8fd9ef75fa552739ae5e1 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sat, 17 Mar 2012 16:17:11 +0100 Subject: [PATCH 09/12] fix --- client/interface.py | 7 ++++--- client/wallet.py | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/client/interface.py b/client/interface.py index ff4662d5..0e78050e 100644 --- a/client/interface.py +++ b/client/interface.py @@ -80,9 +80,10 @@ class PollingInterface(Interface): apply(self.history_callback, (addr, data) ) self.was_updated = True - def subscribe(self, addr): - status = self.handler('address.subscribe', [ self.session_id, addr ] ) - apply(self.address_callback, (addr, status) ) + def subscribe(self, addresses): + for addr in addresses: + status = self.handler('address.subscribe', [ self.session_id, addr ] ) + apply(self.address_callback, (addr, status) ) def update_wallet(self): while True: diff --git a/client/wallet.py b/client/wallet.py index e8954eeb..e9fc579c 100644 --- a/client/wallet.py +++ b/client/wallet.py @@ -456,7 +456,7 @@ class Wallet: def create_new_address(self, bool): address = self.create_new_address_without_history(bool) - self.interface.subscribe(address) + self.interface.subscribe([address]) return address @@ -701,6 +701,7 @@ class Wallet: def receive_status_callback(self, addr, status): if self.status.get(addr) != status: + #print "updating status for", addr self.status[addr] = status self.interface.get_history(addr) From a9cbe479dcd356686e23242edc81395a850a6b78 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sat, 17 Mar 2012 16:24:43 +0100 Subject: [PATCH 10/12] deprecate session.update --- client/interface.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/client/interface.py b/client/interface.py index 0e78050e..f8623e81 100644 --- a/client/interface.py +++ b/client/interface.py @@ -64,10 +64,6 @@ class PollingInterface(Interface): self.session_id, self.message = ast.literal_eval( out ) thread.start_new_thread(self.poll_thread, ()) - def update_session(self, addresses): - out = self.handler('session.update', [ self.session_id, addresses ] ) - return out - def poll_interval(self): return 5 @@ -161,7 +157,6 @@ class NativeInterface(PollingInterface): cmds = {'session.new':'new_session', 'peers':'peers', 'session.poll':'poll', - 'session.update':'update_session', 'transaction.broadcast':'tx', 'address.get_history':'h', 'address.subscribe':'address.subscribe' From 1603dc50d0e5fc22fdd11009a0b0727b8672c4c1 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sat, 17 Mar 2012 16:50:24 +0100 Subject: [PATCH 11/12] rename --- client/interface.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/interface.py b/client/interface.py index f8623e81..9c602e34 100644 --- a/client/interface.py +++ b/client/interface.py @@ -136,7 +136,7 @@ class PollingInterface(Interface): for server in DEFAULT_SERVERS: try: self.peers_server = server - out = self.handler('peers') + out = self.handler('server.peers') self.servers = map( lambda x:x[1], out ) # print "Received server list from %s" % self.peers_server, out break @@ -155,7 +155,7 @@ class NativeInterface(PollingInterface): def handler(self, method, params = ''): import time cmds = {'session.new':'new_session', - 'peers':'peers', + 'server.peers':'peers', 'session.poll':'poll', 'transaction.broadcast':'tx', 'address.get_history':'h', @@ -190,7 +190,7 @@ class HttpInterface(PollingInterface): t1 = time.time() data = { 'method':method, 'id':'jsonrpc', 'params':params } data_json = json.dumps(data) - host = 'http://%s:%d'%( self.host if method!='peers' else self.peers_server, self.port ) + host = 'http://%s:%d'%( self.host if method!='server.peers' else self.peers_server, self.port ) req = urllib2.Request(host, data_json, {'content-type': 'application/json'}) response_stream = urllib2.urlopen(req) response = json.loads( response_stream.read() ) From f3128b95f9886f1d505c1685c79564ecc6983379 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sat, 17 Mar 2012 23:32:36 +0100 Subject: [PATCH 12/12] refactoring interfaces --- client/interface.py | 250 +++++++++++++++++++++++++------------------- client/wallet.py | 4 +- 2 files changed, 145 insertions(+), 109 deletions(-) diff --git a/client/interface.py b/client/interface.py index 9c602e34..c86d0199 100644 --- a/client/interface.py +++ b/client/interface.py @@ -45,6 +45,14 @@ class Interface: self.disconnected_event = threading.Event() self.disconnected_event.clear() + #only asynchrnous + self.addresses_waiting_for_status = [] + self.addresses_waiting_for_history = [] + self.tx_event = threading.Event() + self.up_to_date_event = threading.Event() + self.up_to_date_event.clear() + + def send_tx(self, data): out = self.handler('transaction.broadcast', data ) return out @@ -56,42 +64,94 @@ class Interface: pass + def handle_json_response(self, c): + #print c + msg_id = c.get('id') + result = c.get('result') + error = c.get('error') + if msg_id is None: + print "error: message without ID" + return + + method, params = self.messages[msg_id] + if error: + print "received error:", c, method, params + else: + self.handle_response(method, params, result) + + + + def handle_response(self, method, params, result): + + if method == 'session.new': + self.session_id, self.message = ast.literal_eval( result ) + self.was_updated = True + + elif method == 'server.banner': + self.message = result + self.was_updated = True + + elif method == 'session.poll': + blocks, changed_addresses = ast.literal_eval( result ) + if blocks == -1: raise BaseException("session not found") + self.blocks = int(blocks) + if changed_addresses: + self.is_up_to_date = False + self.was_updated = True + for addr, status in changed_addresses.items(): + apply(self.address_callback, (addr, status)) + else: + self.is_up_to_date = True + + elif method == 'server.peers': + self.servers = map( lambda x:x[1], result ) + + elif method == 'address.subscribe': + addr = params[-1] + if addr in self.addresses_waiting_for_status: + self.addresses_waiting_for_status.remove(addr) + apply(self.address_callback,(addr, result)) + + elif method == 'address.get_history': + addr = params[0] + if addr in self.addresses_waiting_for_history: + self.addresses_waiting_for_history.remove(addr) + apply(self.history_callback, (addr, result)) + self.was_updated = True + + elif method == 'transaction.broadcast': + self.tx_result = result + self.tx_event.set() + + elif method == 'numblocks.subscribe': + self.blocks = result + if self.newblock_callback: apply(self.newblock_callback,(result,)) + else: + print "received message:", c, method, params + + + class PollingInterface(Interface): """ non-persistent connection. synchronous calls""" def start_session(self, addresses, version): - out = self.handler('session.new', [ version, addresses ] ) - self.session_id, self.message = ast.literal_eval( out ) + self.handler([('session.new', [ version, addresses ])] ) thread.start_new_thread(self.poll_thread, ()) def poll_interval(self): return 5 - def retrieve_history(self, address): - out = self.handler('address.get_history', address ) - return out - - def get_history(self, addr): - data = self.retrieve_history(addr) - apply(self.history_callback, (addr, data) ) - self.was_updated = True + def get_history(self, address): + self.handler([('address.get_history', [address] )]) def subscribe(self, addresses): for addr in addresses: - status = self.handler('address.subscribe', [ self.session_id, addr ] ) - apply(self.address_callback, (addr, status) ) + self.handler([('address.subscribe', [ self.session_id, addr ] )]) def update_wallet(self): while True: - changed_addresses = self.poll() - if changed_addresses: - self.is_up_to_date = False - else: - self.is_up_to_date = True - break - - for addr, status in changed_addresses.items(): - apply(self.address_callback, (addr, status)) + self.handler([('session.poll', self.session_id )]) + if self.is_up_to_date: break #if is_new or wallet.remote_url: # self.was_updated = True @@ -102,13 +162,6 @@ class PollingInterface(Interface): #else: # return False - def poll(self): - out = self.handler('session.poll', self.session_id ) - blocks, changed_addr = ast.literal_eval( out ) - if blocks == -1: raise BaseException("session not found") - self.blocks = int(blocks) - return changed_addr - def poll_thread(self): while self.is_connected: try: @@ -136,8 +189,7 @@ class PollingInterface(Interface): for server in DEFAULT_SERVERS: try: self.peers_server = server - out = self.handler('server.peers') - self.servers = map( lambda x:x[1], out ) + self.handler([('server.peers',[])]) # print "Received server list from %s" % self.peers_server, out break except socket.timeout: @@ -150,9 +202,12 @@ class PollingInterface(Interface): time.sleep(5*60) + + + class NativeInterface(PollingInterface): - def handler(self, method, params = ''): + def handler(self, messages): import time cmds = {'session.new':'new_session', 'server.peers':'peers', @@ -161,45 +216,72 @@ class NativeInterface(PollingInterface): 'address.get_history':'h', 'address.subscribe':'address.subscribe' } - cmd = cmds[method] - if type(params) != type(''): params = repr( params ) - t1 = time.time() - request = repr ( (cmd, params) ) + "#" - s = socket.socket( socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(DEFAULT_TIMEOUT) - s.connect(( self.host if cmd!='peers' else self.peers_server, self.port) ) - s.send( request ) - out = '' - while 1: - msg = s.recv(1024) - if msg: out += msg - else: break - s.close() - self.rtime = time.time() - t1 - self.is_connected = True - if cmd in[ 'peers','h']: - out = ast.literal_eval( out ) - return out + + for m in messages: + method, params = m + cmd = cmds[method] + + if cmd=='h': + str_params = params[0] + elif type(params) != type(''): + str_params = repr( params ) + else: + str_params = params + t1 = time.time() + request = repr ( (cmd, str_params) ) + "#" + s = socket.socket( socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(DEFAULT_TIMEOUT) + s.connect(( self.host if cmd!='peers' else self.peers_server, self.port) ) + s.send( request ) + out = '' + while 1: + msg = s.recv(1024) + if msg: out += msg + else: break + s.close() + self.rtime = time.time() - t1 + self.is_connected = True + if cmd in[ 'peers','h']: + out = ast.literal_eval( out ) + + if out=='': out=None #fixme + + self.handle_response(method, params, out) + + + class HttpInterface(PollingInterface): - def handler(self, method, params = []): + def __init__(self, host, port, address_callback=None, history_callback=None, newblock_callback=None): + Interface.__init__(self, host, port, address_callback, history_callback, newblock_callback) + self.message_id = 0 + self.messages = {} + + def handler(self, messages): import urllib2, json, time - if type(params) != type([]): params = [ params ] - t1 = time.time() - data = { 'method':method, 'id':'jsonrpc', 'params':params } + + data = [] + for m in messages: + method, params = m + if type(params) != type([]): params = [params] + t1 = time.time() + data.append( { 'method':method, 'id':self.message_id, 'params':params } ) + self.messages[self.message_id] = (method, params) + self.message_id += 1 + data_json = json.dumps(data) host = 'http://%s:%d'%( self.host if method!='server.peers' else self.peers_server, self.port ) req = urllib2.Request(host, data_json, {'content-type': 'application/json'}) response_stream = urllib2.urlopen(req) response = json.loads( response_stream.read() ) - out = response.get('result') - if not out: - print response + self.rtime = time.time() - t1 self.is_connected = True - return out + + for item in response: + self.handle_json_response(item) @@ -214,13 +296,6 @@ class AsynchronousInterface(Interface): self.message_id = 0 self.messages = {} - self.tx_event = threading.Event() - self.addresses_waiting_for_status = [] - self.addresses_waiting_for_history = [] - # up to date - self.up_to_date_event = threading.Event() - self.up_to_date_event.clear() - def listen_thread(self): try: self.is_connected = True @@ -239,53 +314,13 @@ class AsynchronousInterface(Interface): c = out[0:s] out = out[s+1:] c = json.loads(c) - - #print c - msg_id = c.get('id') - result = c.get('result') - error = c.get('error') - - if msg_id is None: - print "error: message without ID" - continue - - method, params = self.messages[msg_id] - - if method == 'server.banner': - self.message = result - self.was_updated = True - - elif method == 'server.peers': - self.servers = map( lambda x:x[1], result ) - - elif method == 'address.subscribe': - addr = params[0] - if addr in self.addresses_waiting_for_status: - self.addresses_waiting_for_status.remove(addr) - apply(self.address_callback,(addr, result)) - - elif method == 'address.get_history': - addr = params[0] - if addr in self.addresses_waiting_for_history: - self.addresses_waiting_for_history.remove(addr) - apply(self.history_callback, (addr, result)) - self.was_updated = True - - elif method == 'transaction.broadcast': - self.tx_result = result - self.tx_event.set() - - elif method == 'numblocks.subscribe': - self.blocks = result - if self.newblock_callback: apply(self.newblock_callback,(result,)) - else: - print "received message:", c - + self.handle_json_response(c) if self.addresses_waiting_for_status or self.addresses_waiting_for_history: self.is_up_to_date = False else: self.is_up_to_date = True self.up_to_date_event.set() + except: traceback.print_exc(file=sys.stdout) @@ -336,6 +371,7 @@ class AsynchronousInterface(Interface): + def new_interface(wallet): if wallet.host: host = wallet.host diff --git a/client/wallet.py b/client/wallet.py index e9fc579c..f33c9cf1 100644 --- a/client/wallet.py +++ b/client/wallet.py @@ -701,12 +701,12 @@ class Wallet: def receive_status_callback(self, addr, status): if self.status.get(addr) != status: - #print "updating status for", addr + #print "updating status for", addr, repr(self.status.get(addr)), repr(status) self.status[addr] = status self.interface.get_history(addr) def receive_history_callback(self, addr, data): - #print "updating history for", addr + #print "updating history for", addr, repr(data) self.history[addr] = data self.synchronize() self.update_tx_history()