wallet synchronizer thread

This commit is contained in:
ThomasV 2012-03-31 11:47:16 +02:00
parent 96eaf00af8
commit c3bbc35fa8
3 changed files with 188 additions and 166 deletions

View File

@ -20,12 +20,9 @@ import re, sys, getpass
from optparse import OptionParser
from wallet import Wallet, SecretToASecret
from interface import WalletSynchronizer
from decimal import Decimal
import thread
from wallet import format_satoshis
from interface import loop_interfaces_thread
known_commands = ['help', 'validateaddress', 'balance', 'contacts', 'create', 'restore', 'payto', 'sendtx', 'password', 'addresses', 'history', 'label', 'mktx','seed','import','signmessage','verifymessage','eval']
offline_commands = ['password', 'mktx', 'history', 'label', 'contacts', 'help', 'validateaddress', 'signmessage', 'verifymessage', 'eval', 'create', 'addresses', 'import', 'seed']
@ -71,7 +68,7 @@ if __name__ == '__main__':
exit(1)
gui = gui.ElectrumGui(wallet)
thread.start_new_thread(loop_interfaces_thread, (wallet,))
WalletSynchronizer(wallet,True).start()
try:
found = wallet.file_exists
@ -138,11 +135,9 @@ if __name__ == '__main__':
sys.exit(1)
wallet.seed = str(seed)
wallet.start_interface()
WalletSynchronizer(wallet).start()
print "recovering wallet..."
wallet.init_mpk( wallet.seed )
wallet.start_interface()
thread.start_new_thread(wallet.run, ())
wallet.update()
if wallet.is_found():
wallet.fill_addressbook()
@ -175,8 +170,7 @@ if __name__ == '__main__':
# open session
if cmd not in offline_commands:
wallet.start_interface()
thread.start_new_thread(wallet.run, ())
WalletSynchronizer(wallet).start()
wallet.update()
wallet.save()

View File

@ -17,8 +17,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random, socket, ast
import thread, threading, traceback, sys, time, json, Queue
import random, socket, ast, re
import threading, traceback, sys, time, json, Queue
DEFAULT_TIMEOUT = 5
DEFAULT_SERVERS = ['ecdsa.org:50001:t'] # ['electrum.bitcoins.sk','ecdsa.org','electrum.novit.ro'] # list of default servers
@ -33,8 +33,10 @@ def old_to_new(s):
return s
class Interface:
class Interface(threading.Thread):
def __init__(self, host, port):
threading.Thread.__init__(self)
self.daemon = True
self.host = host
self.port = port
@ -98,7 +100,6 @@ class Interface:
def start_session(self, addresses, version):
#print "Starting new session: %s:%d"%(self.host,self.port)
self.start()
self.send([('server.version', [version]), ('server.banner',[]), ('blockchain.numblocks.subscribe',[]), ('server.peers.subscribe',[])])
self.subscribe(addresses)
@ -106,13 +107,15 @@ class Interface:
class PollingInterface(Interface):
""" non-persistent connection. synchronous calls"""
def __init__(self, host, port):
Interface.__init__(self, host, port)
self.session_id = None
def get_history(self, address):
self.send([('blockchain.address.get_history', [address] )])
def poll(self):
self.send([('session.poll', [])])
pass
#if is_new or wallet.remote_url:
# self.was_updated = True
# is_new = wallet.synchronize()
@ -122,10 +125,12 @@ class PollingInterface(Interface):
#else:
# return False
def poll_thread(self):
def run(self):
self.is_connected = True
while self.is_connected:
try:
self.poll()
if self.session_id:
self.poll()
time.sleep(self.poll_interval)
except socket.gaierror:
break
@ -136,7 +141,7 @@ class PollingInterface(Interface):
break
self.is_connected = False
self.responses.put(None)
self.poke()
@ -148,7 +153,9 @@ class NativeInterface(PollingInterface):
def start_session(self, addresses, version):
self.send([('session.new', [ version, addresses ])] )
self.send([('server.peers.subscribe',[])])
thread.start_new_thread(self.poll_thread, ())
def poll(self):
self.send([('session.poll', [])])
def send(self, messages):
import time
@ -211,13 +218,8 @@ class NativeInterface(PollingInterface):
class HttpInterface(PollingInterface):
def start(self):
self.session_id = None
thread.start_new_thread(self.poll_thread, ())
def poll(self):
if self.session_id:
self.send( [] )
self.send([])
def send(self, messages):
import urllib2, json, time, cookielib
@ -278,13 +280,25 @@ class HttpInterface(PollingInterface):
class AsynchronousInterface(Interface):
"""json-rpc over persistent TCP connection, asynchronous"""
def listen_thread(self):
def __init__(self, host, port):
Interface.__init__(self, host, port)
self.s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
self.s.settimeout(5)
self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
try:
self.s.connect(( self.host, self.port))
self.is_connected = True
except:
self.is_connected = False
print "not connected"
def run(self):
try:
out = ''
while self.is_connected:
try: msg = self.s.recv(1024)
except socket.timeout: continue
except socket.timeout:
continue
out += msg
if msg == '':
self.is_connected = False
@ -316,31 +330,149 @@ class AsynchronousInterface(Interface):
def get_history(self, addr):
self.send([('blockchain.address.get_history', [addr])])
def start(self):
self.s = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
self.s.settimeout(5)
self.s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.s.connect(( self.host, self.port))
thread.start_new_thread(self.listen_thread, ())
class WalletSynchronizer(threading.Thread):
def __init__(self, wallet, loop=False):
threading.Thread.__init__(self)
self.daemon = True
self.wallet = wallet
self.loop = loop
self.start_interface()
def loop_interfaces_thread(wallet):
while True:
def handle_response(self, r):
if r is None:
return
method = r['method']
params = r['params']
result = r['result']
if method == 'server.banner':
self.wallet.banner = result
self.wallet.was_updated = True
elif method == 'session.poll':
# native poll
blocks, changed_addresses = result
if blocks == -1: raise BaseException("session not found")
self.wallet.blocks = int(blocks)
if changed_addresses:
self.wallet.was_updated = True
for addr, status in changed_addresses.items():
self.wallet.receive_status_callback(addr, status)
elif method == 'server.peers.subscribe':
servers = []
for item in result:
s = []
host = item[1]
if len(item)>2:
for v in item[2]:
if re.match("[thn]\d+",v):
s.append(host+":"+v[1:]+":"+v[0])
#if not s:
# s.append(host+":50000:n")
#else:
# s.append(host+":50000:n")
servers = servers + s
self.interface.servers = servers
elif method == 'blockchain.address.subscribe':
addr = params[-1]
self.wallet.receive_status_callback(addr, result)
elif method == 'blockchain.address.get_history':
addr = params[0]
self.wallet.receive_history_callback(addr, result)
self.wallet.was_updated = True
elif method == 'blockchain.transaction.broadcast':
self.wallet.tx_result = result
self.wallet.tx_event.set()
elif method == 'blockchain.numblocks.subscribe':
self.wallet.blocks = result
elif method == 'server.version':
pass
else:
print "unknown message:", method, params, result
def start_interface(self):
try:
wallet.start_interface()
wallet.run()
except socket.error:
print "socket error"
wallet.interface.is_connected = False
time.sleep(5)
host, port, protocol = self.wallet.server.split(':')
port = int(port)
except:
traceback.print_exc(file=sys.stdout)
wallet.interface.is_connected = False
time.sleep(5)
continue
self.wallet.pick_random_server()
host, port, protocol = self.wallet.server.split(':')
port = int(port)
#print protocol, host, port
if protocol == 'n':
InterfaceClass = NativeInterface
elif protocol == 't':
InterfaceClass = AsynchronousInterface
elif protocol == 'h':
InterfaceClass = HttpInterface
else:
print "unknown protocol"
InterfaceClass = NativeInterface
self.interface = InterfaceClass(host, port)
self.wallet.interface = self.interface
with self.wallet.lock:
self.wallet.addresses_waiting_for_status = []
self.wallet.addresses_waiting_for_history = []
addresses = self.wallet.all_addresses()
version = self.wallet.electrum_version
for addr in addresses:
self.wallet.addresses_waiting_for_status.append(addr)
try:
self.interface.start()
self.interface.start_session(addresses,version)
except:
self.interface.is_connected = False
def run(self):
import socket, time
while True:
try:
while self.interface.is_connected:
new_addresses = self.wallet.synchronize()
if new_addresses:
self.interface.subscribe(new_addresses)
for addr in new_addresses:
with self.wallet.lock:
self.wallet.addresses_waiting_for_status.append(addr)
if self.wallet.is_up_to_date():
self.wallet.up_to_date = True
self.wallet.up_to_date_event.set()
else:
self.wallet.up_to_date = False
response = self.interface.responses.get(True,100000000000) # workaround so that it can be keyboard interrupted
self.handle_response(response)
except socket.error:
print "socket error"
wallet.interface.is_connected = False
if self.loop:
time.sleep(5)
self.start_interface()
continue
else:
break

View File

@ -271,7 +271,7 @@ class Wallet:
self.up_to_date_event = threading.Event()
self.up_to_date_event.clear()
self.up_to_date = False
self.interface_lock = threading.Lock()
self.lock = threading.Lock()
self.tx_event = threading.Event()
#
@ -571,7 +571,7 @@ class Wallet:
self.fee = int( d.get('fee') )
self.seed = d.get('seed')
self.server = d.get('server')
blocks = d.get('blocks')
#blocks = d.get('blocks')
self.addresses = d.get('addresses')
self.change_addresses = d.get('change_addresses')
self.history = d.get('history')
@ -703,18 +703,21 @@ class Wallet:
return status
def receive_status_callback(self, addr, status):
if self.get_status(addr) != status:
#print "updating status for", addr, status
self.addresses_waiting_for_history.append(addr)
self.interface.get_history(addr)
if addr in self.addresses_waiting_for_status: self.addresses_waiting_for_status.remove(addr)
with self.lock:
if self.get_status(addr) != status:
#print "updating status for", addr, status
self.addresses_waiting_for_history.append(addr)
self.interface.get_history(addr)
if addr in self.addresses_waiting_for_status:
self.addresses_waiting_for_status.remove(addr)
def receive_history_callback(self, addr, data):
def receive_history_callback(self, addr, data):
#print "updating history for", addr
self.history[addr] = data
self.update_tx_history()
self.save()
if addr in self.addresses_waiting_for_history: self.addresses_waiting_for_history.remove(addr)
with self.lock:
self.history[addr] = data
self.update_tx_history()
self.save()
if addr in self.addresses_waiting_for_history: self.addresses_waiting_for_history.remove(addr)
def get_tx_history(self):
lines = self.tx_history.values()
@ -929,116 +932,9 @@ class Wallet:
return address, amount, label, message, signature, identity, url
def handle_response(self, r):
if r is None:
return
method = r['method']
params = r['params']
result = r['result']
if method == 'server.banner':
self.banner = result
self.was_updated = True
elif method == 'session.poll':
# native poll
blocks, changed_addresses = result
if blocks == -1: raise BaseException("session not found")
self.blocks = int(blocks)
if changed_addresses:
self.was_updated = True
for addr, status in changed_addresses.items():
self.receive_status_callback(addr, status)
elif method == 'server.peers.subscribe':
servers = []
for item in result:
s = []
host = item[1]
if len(item)>2:
for v in item[2]:
if re.match("[thn]\d+",v):
s.append(host+":"+v[1:]+":"+v[0])
#if not s:
# s.append(host+":50000:n")
#else:
# s.append(host+":50000:n")
servers = servers + s
self.interface.servers = servers
elif method == 'blockchain.address.subscribe':
addr = params[-1]
self.receive_status_callback(addr, result)
elif method == 'blockchain.address.get_history':
addr = params[0]
self.receive_history_callback(addr, result)
self.was_updated = True
elif method == 'blockchain.transaction.broadcast':
self.tx_result = result
self.tx_event.set()
elif method == 'blockchain.numblocks.subscribe':
self.blocks = result
elif method == 'server.version':
pass
else:
print "unknown message:", method, params, result
def update(self):
self.interface.poke()
self.up_to_date_event.wait()
def run(self):
while self.interface.is_connected:
new_addresses = self.synchronize()
if new_addresses:
self.interface.subscribe(new_addresses)
for addr in new_addresses:
self.addresses_waiting_for_status.append(addr)
if self.is_up_to_date():
self.up_to_date = True
self.up_to_date_event.set()
else:
self.up_to_date = False
response = self.interface.responses.get(True,100000000000) # workaround so that it can be keyboard interrupted
self.handle_response(response)
def start_interface(self):
try:
host, port, protocol = self.server.split(':')
port = int(port)
except:
self.pick_random_server()
host, port, protocol = self.server.split(':')
port = int(port)
if protocol == 'n':
InterfaceClass = NativeInterface
elif protocol == 't':
InterfaceClass = AsynchronousInterface
elif protocol == 'h':
InterfaceClass = HttpInterface
else:
print "unknown protocol"
InterfaceClass = NativeInterface
self.interface = InterfaceClass(host, port)
addresses = self.all_addresses()
version = self.electrum_version
for addr in addresses:
self.addresses_waiting_for_status.append(addr)
self.interface.start_session(addresses,version)