electrum-bitcoinprivate/lib/network.py

489 lines
16 KiB
Python
Raw Normal View History

import threading, time, Queue, os, sys, shutil, random
from util import user_dir, appdata_dir, print_error, print_msg
from bitcoin import *
2013-09-08 08:23:01 -07:00
import interface
from blockchain import Blockchain
2013-09-11 23:41:27 -07:00
# Port used for each protocol letter when a server does not advertise one.
# NOTE(review): presumably t=TCP, s=SSL, h=HTTP, g=HTTPS -- confirm against
# the interface module.
DEFAULT_PORTS = {
    't': '50001',
    's': '50002',
    'h': '8081',
    'g': '8082',
}

# Built-in server list: host -> {protocol letter: port}.
# Hosts mapped to DEFAULT_PORTS share that very dict object.
DEFAULT_SERVERS = {
    'electrum.be': DEFAULT_PORTS,
    'electrum.drollette.com': {'t': '50001', 's': '50002'},
    'erbium1.sytes.net': {'t': '50001', 's': '50002'},
    'ecdsa.net': {'t': '50001', 's': '110'},
    'eco-electrum.ddns.net': {'t': '50001', 's': '50002', 'h': '80', 'g': '443'},
    'electrum0.electricnewyear.net': {'t': '50001', 's': '50002'},
    'kirsche.emzy.de': {'t': '50001', 's': '50002', 'h': '8081'},
    'electrum2.hachre.de': DEFAULT_PORTS,
    'electrum.hsmiths.com': DEFAULT_PORTS,
    'EAST.electrum.jdubya.info': DEFAULT_PORTS,
    'WEST.electrum.jdubya.info': DEFAULT_PORTS,
    'electrum.no-ip.org': {'t': '50001', 's': '50002', 'h': '80', 'g': '443'},
    'electrum.thwg.org': DEFAULT_PORTS,
    'us.electrum.be': DEFAULT_PORTS,
}

# Seconds compared against time.time() deltas before Network.run retries
# connections.
DISCONNECTED_RETRY_INTERVAL = 60
2013-09-11 23:41:27 -07:00
2014-02-11 00:48:02 -08:00
def parse_servers(result):
    """Parse a server list into dict format.

    Each item of `result` is indexed as item[1] (host) and, when present,
    item[2] (an iterable of feature strings).  Feature strings are read as:
      - "[stgh]<digits>": protocol letter plus optional port (an empty port
        falls back to DEFAULT_PORTS for that letter),
      - "v<version>": server protocol version,
      - "p<digits>": pruning level (empty means '0').

    A host is kept only if it offers at least one protocol and its version
    is >= PROTOCOL_VERSION when both are compared as floats.

    Returns a dict host -> {protocol: port, ..., 'pruning': level}.
    An empty or missing `result` (None) yields an empty dict.
    """
    # A response without a server list (e.g. None) must not crash callers.
    if not result:
        return {}

    # Imported locally so the function does not depend on `re` leaking in
    # through a star import elsewhere in the module.
    import re
    from version import PROTOCOL_VERSION

    servers = {}
    for item in result:
        host = item[1]
        out = {}
        version = None
        pruning_level = '-'
        if len(item) > 2:
            for v in item[2]:
                # Anchored so that trailing garbage is not taken as a port.
                if re.match(r"[stgh]\d*$", v):
                    protocol, port = v[0], v[1:]
                    if port == '':
                        port = DEFAULT_PORTS[protocol]
                    out[protocol] = port
                elif v.startswith('v'):
                    version = v[1:]
                elif re.match(r"p\d*$", v):
                    pruning_level = v[1:]
                    if pruning_level == '':
                        pruning_level = '0'
        try:
            is_recent = float(version) >= float(PROTOCOL_VERSION)
        except Exception:
            # Missing or non-numeric version: treat the server as outdated.
            is_recent = False
        if out and is_recent:
            out['pruning'] = pruning_level
            servers[host] = out
    return servers
2013-10-04 01:38:03 -07:00
2013-09-11 23:41:27 -07:00
def filter_protocol(servers, p):
    """Return "host:port:protocol" strings for every server offering `p`.

    `servers` maps host -> {protocol: port}; hosts without protocol `p`
    are skipped.
    """
    return [
        ':'.join([host, ports[p], p])
        for host, ports in servers.items()
        if p in ports
    ]
def pick_random_server(p='s'):
    """Pick a random "host:port:protocol" string among the default servers
    that offer protocol `p`."""
    candidates = filter_protocol(DEFAULT_SERVERS, p)
    return random.choice(candidates)
2013-10-06 03:28:45 -07:00
from simple_config import SimpleConfig
2013-09-08 08:23:01 -07:00
2014-07-24 14:14:47 -07:00
2013-09-08 08:23:01 -07:00
class Network(threading.Thread):
def __init__(self, config=None):
    """Set up network state; no connection is opened here.

    `config` may be a plain dict (wrapped in SimpleConfig), an already
    built config object, or None (treated as an empty dict).
    """
    if config is None:
        # A fresh dict per call: never use a mutable as a default value.
        config = {}

    threading.Thread.__init__(self)
    self.daemon = True

    if type(config) == dict:
        self.config = SimpleConfig(config)
    else:
        self.config = config

    self.lock = threading.Lock()
    # With 'oneserver' set, no extra interfaces are requested.
    self.num_server = 0 if self.config.get('oneserver') else 8
    self.blockchain = Blockchain(self.config, self)
    self.interfaces = {}
    self.queue = Queue.Queue()
    self.protocol = self.config.get('protocol', 's')
    self.running = False

    # Server for addresses and transactions; random default if unset.
    self.default_server = self.config.get('server') or pick_random_server(self.protocol)

    # Server list returned by an interface (list from irc).
    self.irc_servers = {}
    self.disconnected_servers = set()
    self.disconnected_time = time.time()
    # Successful connections, as stored in the config.
    self.recent_servers = self.config.get('recent_servers', [])
    self.pending_servers = set()

    self.banner = ''
    self.interface = None
    self.proxy = self.config.get('proxy')
    self.heights = {}
    self.merkle_roots = {}
    self.utxo_roots = {}

    # Make sure the 'certs' directory exists under the config path.
    certs_dir = os.path.join(self.config.path, 'certs')
    if not os.path.exists(certs_dir):
        os.mkdir(certs_dir)

    # Address subscriptions and cached results.
    self.addresses = {}
    self.connection_status = 'connecting'
    self.requests_queue = Queue.Queue()
2014-07-27 15:13:40 -07:00
def get_server_height(self):
return self.heights.get(self.default_server,0)
def server_is_lagging(self):
    """Return True when the local height exceeds the default server's
    height by more than one; False when no server height is known."""
    if not self.get_server_height():
        print_error('no height for main interface')
        return False
    return self.get_local_height() - self.get_server_height() > 1
2014-07-24 14:14:47 -07:00
def set_status(self, status):
self.connection_status = status
2014-07-30 01:19:15 -07:00
self.notify('status')
2014-07-24 14:14:47 -07:00
2013-10-04 04:51:46 -07:00
def is_connected(self):
return self.interface and self.interface.is_connected
2013-10-02 04:00:02 -07:00
def send_subscriptions(self):
    """Send all subscriptions to the main interface: one address
    subscription per known address, then the banner and peers requests."""
    send = self.interface.send_request
    for addr in self.addresses:
        send({'method':'blockchain.address.subscribe', 'params':[addr]})
    send({'method':'server.banner','params':[]})
    send({'method':'server.peers.subscribe','params':[]})
2013-10-02 04:00:02 -07:00
def get_status_value(self, key):
if key == 'status':
value = self.connection_status
elif key == 'banner':
value = self.banner
elif key == 'updated':
value = (self.get_local_height(), self.get_server_height())
elif key == 'servers':
value = self.get_servers()
elif key == 'interfaces':
value = self.get_interfaces()
return value
2014-07-30 01:19:15 -07:00
def notify(self, key):
value = self.get_status_value(key)
self.response_queue.put({'method':'network.status', 'params':[key, value]})
2013-09-08 08:23:01 -07:00
def random_server(self):
    """Return a random server string for the current protocol that is
    neither pending, marked disconnected, nor already an interface.

    Returns None when no such server is available.
    """
    known = filter_protocol(self.get_servers(), self.protocol)
    choice_list = [
        s for s in known
        if not (s in self.pending_servers
                or s in self.disconnected_servers
                or s in self.interfaces)
    ]
    if not choice_list:
        return None
    return random.choice(choice_list)
2013-09-08 08:23:01 -07:00
2014-07-25 00:11:56 -07:00
def get_parameters(self):
host, port, protocol = self.default_server.split(':')
proxy = self.proxy
auto_connect = self.config.get('auto_cycle', True)
return host, port, protocol, proxy, auto_connect
def get_interfaces(self):
return self.interfaces.keys()
2013-09-08 08:23:01 -07:00
2013-09-11 23:41:27 -07:00
def get_servers(self):
if self.irc_servers:
out = self.irc_servers
else:
out = DEFAULT_SERVERS
for s in self.recent_servers:
host, port, protocol = s.split(':')
if host not in out:
out[host] = { protocol:port }
return out
2013-09-11 23:41:27 -07:00
def start_interface(self, server):
if server in self.interfaces.keys():
return
2013-10-06 03:28:45 -07:00
i = interface.Interface(server, self.config)
self.pending_servers.add(server)
i.start(self.queue)
2014-07-27 15:13:40 -07:00
return i
def start_random_interface(self):
    """Start an interface for a randomly chosen server, if one exists."""
    candidate = self.random_server()
    if not candidate:
        return
    self.start_interface(candidate)
def start_interfaces(self):
    """Start the main interface on the default server, then request
    self.num_server additional random interfaces."""
    self.interface = self.start_interface(self.default_server)
    remaining = self.num_server
    while remaining > 0:
        self.start_random_interface()
        remaining -= 1
def start(self, response_queue):
    """Start the network.

    Marks the network as running, stores `response_queue`, starts the
    interfaces, a daemon thread running process_requests_thread, the
    blockchain, and finally this thread itself.
    """
    self.running = True
    self.response_queue = response_queue
    self.start_interfaces()

    requests_worker = threading.Thread(target=self.process_requests_thread)
    requests_worker.daemon = True
    requests_worker.start()

    self.blockchain.start()
    threading.Thread.start(self)
2013-10-05 01:01:33 -07:00
2013-10-05 02:16:09 -07:00
def set_parameters(self, host, port, protocol, proxy, auto_connect):
    """Save the connection settings to the config and apply them.

    If the proxy or protocol changed, every interface is stopped; with
    auto_connect nothing more is done in that case.  Otherwise, with
    auto_connect the main interface is replaced by a random one when it
    is not connected, or stopped when the server is lagging; without
    auto_connect the requested server is selected via set_server.
    """
    for key, value in (('auto_cycle', auto_connect),
                       ("proxy", proxy),
                       ("protocol", protocol)):
        self.config.set_key(key, value, True)
    server = ':'.join([host, port, protocol])
    self.config.set_key("server", server, True)

    settings_changed = self.proxy != proxy or self.protocol != protocol
    if settings_changed:
        self.proxy = proxy
        self.protocol = protocol
        for iface in self.interfaces.values():
            iface.stop()
        if auto_connect:
            return

    if not auto_connect:
        self.set_server(server)
    elif not self.interface.is_connected:
        self.switch_to_random_interface()
    elif self.server_is_lagging():
        self.stop_interface()
def switch_to_random_interface(self):
2014-07-29 05:26:19 -07:00
while self.interfaces:
2014-07-29 05:19:23 -07:00
i = random.choice(self.interfaces.values())
if i.is_connected:
self.switch_to_interface(i)
2014-07-29 08:53:31 -07:00
break
2014-07-29 05:19:23 -07:00
else:
self.remove_interface(i)
2013-10-05 01:01:33 -07:00
def switch_to_interface(self, interface):
    """Make `interface` the main interface.

    Records its server as the default server (config key 'server', set
    with a False third argument), re-sends subscriptions and sets the
    status to 'connected'.
    """
    new_server = interface.server
    print_error("switching to", new_server)
    self.interface = interface
    self.config.set_key('server', new_server, False)
    self.default_server = new_server
    self.send_subscriptions()
    self.set_status('connected')
2013-10-05 01:01:33 -07:00
2013-10-02 03:13:07 -07:00
def stop_interface(self):
    """Stop the main interface."""
    main = self.interface
    main.stop()
2013-12-17 09:20:54 -08:00
2013-10-02 03:13:07 -07:00
def set_server(self, server):
    """Select `server` ("host:port:protocol") as the default server.

    Does nothing when it is already the connected default server, or when
    its protocol differs from the current one.  Otherwise the current
    interface is stopped if connected, the status becomes 'connecting',
    the choice is saved, and either an existing interface for that server
    is adopted or a new one is started.
    """
    already_active = self.default_server == server and self.interface.is_connected
    if already_active:
        return

    if server.split(':')[2] != self.protocol:
        return

    # Stop the interface in order to terminate subscriptions.
    if self.interface.is_connected:
        self.stop_interface()

    # Notify gui.
    self.set_status('connecting')

    # Start interface.
    self.default_server = server
    self.config.set_key("server", server, True)

    if server not in self.interfaces:
        self.interface = self.start_interface(server)
    else:
        self.switch_to_interface(self.interfaces[server])
2013-10-02 04:00:02 -07:00
def add_recent_server(self, i):
    """Move the server of interface `i` to the front of the recent-servers
    list, keep at most 20 entries, and store the list in the config."""
    server = i.server
    # The list is ordered, most recent first.
    try:
        self.recent_servers.remove(server)
    except ValueError:
        pass
    self.recent_servers.insert(0, server)
    self.recent_servers = self.recent_servers[:20]
    self.config.set_key('recent_servers', self.recent_servers)
2013-10-02 04:00:02 -07:00
2013-09-10 10:59:58 -07:00
def add_interface(self, i):
self.interfaces[i.server] = i
self.notify('interfaces')
def remove_interface(self, i):
self.interfaces.pop(i.server)
self.notify('interfaces')
2013-10-04 15:21:48 -07:00
def new_blockchain_height(self, blockchain_height, i):
    """React to a new local blockchain height reported for interface `i`.

    When connected and the default server is lagging, this is logged and,
    if 'auto_cycle' is set, the server of `i` is selected.  An 'updated'
    notification is published in every case.
    """
    if self.is_connected() and self.server_is_lagging():
        print_error( "Server is lagging", blockchain_height, self.get_server_height())
        if self.config.get('auto_cycle'):
            self.set_server(i.server)
    self.notify('updated')
2013-10-04 15:21:48 -07:00
2013-09-10 10:59:58 -07:00
2014-07-27 15:13:40 -07:00
def process_response(self, i, response):
method = response['method']
if method == 'blockchain.address.subscribe':
self.on_address(i, response)
elif method == 'blockchain.headers.subscribe':
self.on_header(i, response)
elif method == 'server.peers.subscribe':
self.on_peers(i, response)
2014-07-27 21:42:05 -07:00
elif method == 'server.banner':
self.on_banner(i, response)
2014-07-27 22:53:02 -07:00
else:
self.response_queue.put(response)
2014-07-27 15:13:40 -07:00
def process_requests_thread(self):
while self.is_running():
try:
request = self.requests_queue.get(timeout=0.1)
except Queue.Empty:
continue
2014-07-27 15:13:40 -07:00
self.process_request(request)
2013-09-08 08:23:01 -07:00
2014-07-27 15:13:40 -07:00
def process_request(self, request):
method = request['method']
params = request['params']
_id = request['id']
if method.startswith('network.'):
out = {'id':_id}
try:
f = getattr(self, method[8:])
except AttributeError:
out['error'] = "unknown method"
try:
out['result'] = f(*params)
except BaseException as e:
out['error'] = str(e)
print_error("network error", str(e))
self.response_queue.put(out)
return
2014-07-27 15:13:40 -07:00
if method == 'blockchain.address.subscribe':
addr = params[0]
if addr in self.addresses:
self.response_queue.put({'id':_id, 'result':self.addresses[addr]})
return
2013-09-08 08:23:01 -07:00
2014-07-27 15:13:40 -07:00
self.interface.send_request(request)
def run(self):
    """Main loop of the network thread.

    Items on self.queue are (interface, response) pairs.  A non-None
    response is dispatched through process_response; a None response is a
    notification that the interface's connection state changed.  When the
    queue stays empty for 0.1 s, housekeeping runs instead (starting more
    interfaces, retrying connections).  On exit every interface is
    stopped.
    """
    # NOTE(review): nesting was reconstructed from a whitespace-stripped
    # copy of this file -- confirm against the original indentation.
    while self.is_running():
        try:
            i, response = self.queue.get(timeout=0.1)
        except Queue.Empty:
            # Idle: top up the pool of interfaces.
            if len(self.interfaces) + len(self.pending_servers) < self.num_server:
                self.start_random_interface()
            # No interface at all: periodically forget failed servers.
            if not self.interfaces and time.time() - self.disconnected_time > DISCONNECTED_RETRY_INTERVAL:
                print_error('network: retrying connections')
                self.disconnected_servers = set()
                self.disconnected_time = time.time()
            # Main interface down: periodically re-queue its notification.
            if not self.interface.is_connected and time.time() - self.disconnected_time > DISCONNECTED_RETRY_INTERVAL:
                print_error("forcing reconnection")
                self.queue.put((self.interface, None))
                self.disconnected_time = time.time()
            continue

        if response is not None:
            self.process_response(i, response)
            continue

        # If response is None it is a notification about the interface.
        if i.server in self.pending_servers:
            self.pending_servers.remove(i.server)

        if i.is_connected:
            self.add_interface(i)
            self.add_recent_server(i)
            i.send_request({'method':'blockchain.headers.subscribe','params':[]})
            if i == self.interface:
                print_error('sending subscriptions to', self.interface.server)
                self.send_subscriptions()
                self.set_status('connected')
        else:
            self.disconnected_servers.add(i.server)
            if i.server in self.interfaces:
                self.remove_interface(i)
            self.heights.pop(i.server, None)
            if i == self.interface:
                self.set_status('disconnected')

        if not self.interface.is_connected:
            if self.config.get('auto_cycle'):
                self.switch_to_random_interface()
            elif self.default_server not in self.disconnected_servers:
                print_error("restarting main interface")
                if self.default_server in self.interfaces:
                    self.switch_to_interface(self.interfaces[self.default_server])
                else:
                    self.interface = self.start_interface(self.default_server)

    print_error("Network: Stopping interfaces")
    for iface in self.interfaces.values():
        iface.stop()
2013-10-04 01:38:03 -07:00
2013-09-15 00:03:45 -07:00
def on_header(self, i, r):
result = r.get('result')
if not result:
return
height = result.get('block_height')
if not height:
return
self.heights[i.server] = height
2014-01-27 01:06:49 -08:00
self.merkle_roots[i.server] = result.get('merkle_root')
self.utxo_roots[i.server] = result.get('utxo_root')
# notify blockchain about the new height
2013-09-11 23:41:27 -07:00
self.blockchain.queue.put((i,result))
2013-09-08 08:23:01 -07:00
2013-10-05 01:01:33 -07:00
if i == self.interface:
if self.server_is_lagging() and self.config.get('auto_cycle'):
print_error( "Server lagging, stopping interface")
self.stop_interface()
2014-07-30 01:19:15 -07:00
self.notify('updated')
2013-10-05 01:01:33 -07:00
2013-09-11 23:41:27 -07:00
def on_peers(self, i, r):
2013-09-12 05:58:42 -07:00
if not r: return
2014-02-11 00:48:02 -08:00
self.irc_servers = parse_servers(r.get('result'))
2014-07-30 01:19:15 -07:00
self.notify('servers')
2013-09-08 08:23:01 -07:00
2013-09-11 23:41:27 -07:00
def on_banner(self, i, r):
self.banner = r.get('result')
2014-07-30 01:19:15 -07:00
self.notify('banner')
2013-09-08 08:23:01 -07:00
2014-07-27 15:13:40 -07:00
def on_address(self, i, r):
addr = r.get('params')[0]
result = r.get('result')
self.addresses[addr] = result
self.response_queue.put(r)
2013-09-08 08:23:01 -07:00
def stop(self):
    """Clear the running flag (under the lock) so the loops that poll
    is_running() terminate."""
    print_error("stopping network")
    self.lock.acquire()
    try:
        self.running = False
    finally:
        self.lock.release()
2013-09-08 08:23:01 -07:00
def is_running(self):
with self.lock:
return self.running
2013-09-08 08:23:01 -07:00
2014-03-10 12:53:05 -07:00
def get_header(self, tx_height):
    """Return what the blockchain's read_header gives for `tx_height`."""
    header = self.blockchain.read_header(tx_height)
    return header
def get_local_height(self):
    """Return the height reported by the local blockchain object."""
    local_height = self.blockchain.height()
    return local_height
2014-03-10 12:53:05 -07:00