electrum-bitcoinprivate/lib/network.py

545 lines
18 KiB
Python
Raw Normal View History

import threading
import time
import Queue
import os
import sys
import random
import traceback
import socks
import socket
import util
from bitcoin import *
2013-09-08 08:23:01 -07:00
import interface
from blockchain import Blockchain
2013-09-11 23:41:27 -07:00
# Port used for each protocol letter when a server does not name one
# explicitly (presumably t=TCP, s=SSL, h=HTTP, g=HTTPS -- confirm against
# the interface module).
DEFAULT_PORTS = {
    't': '50001',
    's': '50002',
    'h': '8081',
    'g': '8082',
}

# Built-in server list, keyed by host name.  Each value maps a protocol
# letter to the port (as a string) on which the host serves it.  Entries
# that point at DEFAULT_PORTS share that single dict object.
DEFAULT_SERVERS = {
    'electrum.be': DEFAULT_PORTS,
    'electrum.drollette.com': {'t': '50001', 's': '50002'},
    'erbium1.sytes.net': {'t': '50001', 's': '50002'},
    'ecdsa.net': {'t': '50001', 's': '110'},
    'eco-electrum.ddns.net': {'t': '50001', 's': '50002', 'h': '80', 'g': '443'},
    'electrum0.electricnewyear.net': {'t': '50001', 's': '50002'},
    'kirsche.emzy.de': {'t': '50001', 's': '50002', 'h': '8081'},
    'electrum2.hachre.de': DEFAULT_PORTS,
    'electrum.hsmiths.com': DEFAULT_PORTS,
    'EAST.electrum.jdubya.info': DEFAULT_PORTS,
    'WEST.electrum.jdubya.info': DEFAULT_PORTS,
    'electrum.no-ip.org': {'t': '50001', 's': '50002', 'h': '80', 'g': '443'},
    'electrum.thwg.org': DEFAULT_PORTS,
    'us.electrum.be': DEFAULT_PORTS,
}

# Seconds to wait, while no interface is connected, before the list of
# disconnected servers is cleared so that they can be tried again.
DISCONNECTED_RETRY_INTERVAL = 60
2013-09-11 23:41:27 -07:00
2014-02-11 00:48:02 -08:00
def parse_servers(result):
    """Parse a server list into dict format.

    ``result`` is an iterable of items.  ``item[1]`` is taken as the host
    name and, when present, ``item[2]`` is an iterable of feature strings:

    * ``<t|s|h|g>[port]`` -- a protocol letter with an optional port; a
      missing port falls back to ``DEFAULT_PORTS``;
    * ``v<version>``      -- the protocol version announced by the server;
    * ``p[level]``        -- the pruning level (``'0'`` when left empty).

    A host is kept only if it offers at least one protocol and its version
    is not older than ``PROTOCOL_VERSION``.

    Returns a dict ``{host: {protocol: port, ..., 'pruning': level}}``.
    A ``None`` or empty ``result`` yields an empty dict.
    """
    # A failed peers request may carry no result at all; iterating None
    # would raise TypeError.
    if not result:
        return {}

    # Imported here so the function does not rely on ``re`` leaking in
    # through a star-import elsewhere in the module.
    import re
    from version import PROTOCOL_VERSION

    servers = {}
    for item in result:
        host = item[1]
        out = {}
        version = None
        pruning_level = '-'
        if len(item) > 2:
            for v in item[2]:
                # Anchored so that only "<letter><digits>" is taken as a
                # protocol entry; an unanchored match would treat e.g.
                # "stuff" as protocol 's' with port "tuff".
                if re.match(r"[stgh]\d*$", v):
                    protocol, port = v[0], v[1:]
                    if port == '':
                        port = DEFAULT_PORTS[protocol]
                    out[protocol] = port
                elif v.startswith('v'):
                    version = v[1:]
                elif v.startswith('p'):
                    pruning_level = v[1:] or '0'
        try:
            is_recent = float(version) >= float(PROTOCOL_VERSION)
        except (TypeError, ValueError):
            # No version announced (None) or not a number: treat as too old.
            is_recent = False
        if out and is_recent:
            out['pruning'] = pruning_level
            servers[host] = out
    return servers
2013-10-04 01:38:03 -07:00
2013-09-11 23:41:27 -07:00
def filter_protocol(servers, p):
    """Return serialized server strings for every host offering protocol ``p``.

    ``servers`` maps host -> {protocol: port}.  Hosts whose mapping has no
    entry for ``p`` are left out.
    """
    return [
        serialize_server(host, ports[p], p)
        for host, ports in servers.items()
        if p in ports
    ]
2013-09-11 23:41:27 -07:00
def pick_random_server(p='s'):
    """Pick one of the built-in default servers that offers protocol ``p``.

    Returns the serialized server string chosen at random.
    """
    candidates = filter_protocol(DEFAULT_SERVERS, p)
    return random.choice(candidates)
2013-10-06 03:28:45 -07:00
from simple_config import SimpleConfig
2013-09-08 08:23:01 -07:00
# Proxy modes recognised in a proxy setting.  The order matters:
# Network.set_proxy passes the 1-based position of the mode in this list
# to socks.setdefaultproxy() as the proxy type.
proxy_modes = ['socks4', 'socks5', 'http']
def serialize_proxy(p):
    """Turn a proxy dict into its ``mode:host:port`` string form.

    Returns None when ``p`` is not a plain dict.
    """
    if type(p) is dict:
        fields = [p.get(key) for key in ('mode', 'host', 'port')]
        return ':'.join(fields)
    return None
def deserialize_proxy(s):
    """Parse a ``[mode:]host[:port]`` proxy string into a proxy dict.

    Returns None when ``s`` is not a text string or equals ``'none'``
    (case-insensitive).  Otherwise returns a dict with the keys ``mode``,
    ``host`` and ``port`` (all strings):

    * ``mode`` defaults to ``'socks5'``; it is taken from the first field
      only when that field is one of ``proxy_modes``;
    * ``host`` defaults to ``'localhost'``;
    * ``port`` defaults to ``'8080'`` for the http mode, ``'1080'`` otherwise.
    """
    # Accept unicode as well as str on Python 2: a setting that arrives as
    # a unicode object would otherwise be silently discarded.
    try:
        text_types = basestring
    except NameError:  # Python 3: every text string is ``str``
        text_types = str
    if not isinstance(s, text_types):
        return None
    if s.lower() == 'none':
        return None

    proxy = {"mode": "socks5", "host": "localhost"}
    args = s.split(':')
    n = 0
    # The leading field is optional; it is consumed only if it names a mode.
    if proxy_modes.count(args[n]) == 1:
        proxy["mode"] = args[n]
        n += 1
    if len(args) > n:
        proxy["host"] = args[n]
        n += 1
    if len(args) > n:
        proxy["port"] = args[n]
    else:
        proxy["port"] = "8080" if proxy["mode"] == "http" else "1080"
    return proxy
2014-07-24 14:14:47 -07:00
def deserialize_server(server_str):
    """Split a ``host:port:protocol`` string into its three parts.

    Returns the tuple ``(host, port, protocol)``; ``port`` stays a string.

    Raises:
        ValueError: if the string does not have exactly three
            colon-separated fields, or the port is not an integer.
        AssertionError: if the protocol is not ``'s'`` or ``'t'``.
    """
    host, port, protocol = str(server_str).split(':')
    # Compare against the two allowed letters explicitly: a substring test
    # against 'st' would also accept '' and 'st'.  The exception is raised
    # by hand (rather than with ``assert``) so that the check survives
    # ``python -O`` while callers still see the same exception type.
    if protocol not in ('s', 't'):
        raise AssertionError(
            "unsupported protocol %r in server %r" % (protocol, server_str))
    int(port)  # validation only: raises ValueError for a non-numeric port
    return host, port, protocol
def serialize_server(host, port, protocol):
    """Build the ``host:port:protocol`` string for a server.

    All three arguments are expected to be strings already.
    """
    joined = ':'.join((host, port, protocol))
    return str(joined)
2014-07-24 14:14:47 -07:00
2015-03-13 15:04:29 -07:00
class Network(util.DaemonThread):
    """Daemon thread that manages the connections to the servers.

    It keeps a pool of ``interface.Interface`` objects (``self.interfaces``),
    one of which is the main interface (``self.interface``) used for address
    and transaction requests.  Notifications and responses coming from the
    interfaces arrive on ``self.queue``; requests from the client arrive on
    ``self.requests_queue``; answers are put on the ``response_queue`` that
    is handed to ``start()``.
    """

    def __init__(self, config=None):
        """Set up the network state from ``config``.

        ``config`` may be a plain dict (wrapped in a SimpleConfig), an
        already built config object, or None for an empty configuration.
        """
        if config is None:
            config = {}  # Do not use mutables as default values!
        util.DaemonThread.__init__(self)
        self.config = SimpleConfig(config) if type(config) == type({}) else config
        self.lock = threading.Lock()
        # With 'oneserver' set, no additional random interfaces are started.
        self.num_server = 8 if not self.config.get('oneserver') else 0
        self.blockchain = Blockchain(self.config, self)
        self.interfaces = {}
        self.queue = Queue.Queue()

        # Server for addresses and transactions
        self.default_server = self.config.get('server')
        # Sanitize default server: an unset or malformed value is replaced
        # by a random built-in server.
        try:
            deserialize_server(self.default_server)
        except Exception:
            self.default_server = None
        if not self.default_server:
            self.default_server = pick_random_server('s')
        self.protocol = deserialize_server(self.default_server)[2]

        self.irc_servers = {}  # returned by interface (list from irc)
        self.disconnected_servers = set([])
        self.recent_servers = self.config.get('recent_servers', [])  # successful connections
        self.pending_servers = set()

        self.banner = ''
        self.interface = None
        self.heights = {}
        self.merkle_roots = {}
        self.utxo_roots = {}

        # Make sure the 'certs' directory exists under the config path.
        dir_path = os.path.join(self.config.path, 'certs')
        if not os.path.exists(dir_path):
            os.mkdir(dir_path)

        # address subscriptions and cached results
        self.addresses = {}
        self.connection_status = 'connecting'
        self.requests_queue = Queue.Queue()
        self.set_proxy(deserialize_proxy(self.config.get('proxy')))

    def print_error(self, *msg):
        """Forward ``msg`` to util.print_error with a "[network]" prefix."""
        util.print_error("[network]", *msg)

    def get_server_height(self):
        """Return the height reported by the default server, 0 if unknown."""
        return self.heights.get(self.default_server, 0)

    def server_is_lagging(self):
        """Tell whether the default server is more than one block behind.

        Returns False (and logs a message) when no height is known for it.
        """
        h = self.get_server_height()
        if not h:
            self.print_error('no height for main interface')
            return False
        lag = self.get_local_height() - h
        return lag > 1

    def set_status(self, status):
        """Record the connection status and notify the client about it."""
        self.connection_status = status
        self.notify('status')

    def is_connected(self):
        """Return a true value when a main interface exists and is connected."""
        return self.interface and self.interface.is_connected

    def send_subscriptions(self):
        """Send address, banner and peers subscriptions to the main interface."""
        for addr in self.addresses:
            self.interface.send_request({'method':'blockchain.address.subscribe', 'params':[addr]})
        self.interface.send_request({'method':'server.banner','params':[]})
        self.interface.send_request({'method':'server.peers.subscribe','params':[]})

    def get_status_value(self, key):
        """Return the current value of one status item.

        ``key`` is one of 'status', 'banner', 'updated', 'servers' or
        'interfaces'.  NOTE: any other key leaves ``value`` unbound and
        raises UnboundLocalError.
        """
        if key == 'status':
            value = self.connection_status
        elif key == 'banner':
            value = self.banner
        elif key == 'updated':
            value = (self.get_local_height(), self.get_server_height())
        elif key == 'servers':
            value = self.get_servers()
        elif key == 'interfaces':
            value = self.get_interfaces()
        return value

    def notify(self, key):
        """Put a 'network.status' message for ``key`` on the response queue."""
        value = self.get_status_value(key)
        self.response_queue.put({'method':'network.status', 'params':[key, value]})

    def random_server(self):
        """Pick a random known server that is not already in use.

        Servers that are pending, marked disconnected or already have an
        interface are excluded.  Returns None when no candidate is left.
        """
        candidates = filter_protocol(self.get_servers(), self.protocol)
        choice_list = [
            s for s in candidates
            if not (s in self.pending_servers
                    or s in self.disconnected_servers
                    or s in self.interfaces.keys())
        ]
        if not choice_list:
            return
        return random.choice(choice_list)

    def get_parameters(self):
        """Return ``(host, port, protocol, proxy, auto_connect)``."""
        host, port, protocol = deserialize_server(self.default_server)
        auto_connect = self.config.get('auto_cycle', True)
        return host, port, protocol, self.proxy, auto_connect

    def get_interfaces(self):
        """Return the server strings of the interfaces currently held."""
        return self.interfaces.keys()

    def get_servers(self):
        """Return the known servers as ``{host: {protocol: port}}``.

        The base list is the one received from a server if there is one,
        otherwise the built-in defaults; recently used servers whose host
        is missing from it are added.
        """
        # Work on a copy: adding the recent servers must not modify the
        # module-level DEFAULT_SERVERS (or the list received from a server).
        out = dict(self.irc_servers if self.irc_servers else DEFAULT_SERVERS)
        for s in self.recent_servers:
            host, port, protocol = deserialize_server(s)
            if host not in out:
                out[host] = {protocol: port}
        return out

    def start_interface(self, server):
        """Create and start an interface for ``server``.

        Returns the new interface, or None when an interface for that
        server already exists.
        """
        if server in self.interfaces.keys():
            return
        i = interface.Interface(server, self.config)
        self.pending_servers.add(server)
        i.start(self.queue)
        return i

    def start_random_interface(self):
        """Start an interface for a random unused server, if there is one."""
        server = self.random_server()
        if server:
            self.start_interface(server)

    def start_interfaces(self):
        """Start the main interface plus ``num_server`` random ones."""
        self.interface = self.start_interface(self.default_server)
        for _ in range(self.num_server):
            self.start_random_interface()

    def start(self, response_queue):
        """Start the interfaces, the request thread, the blockchain and this thread.

        ``response_queue`` receives every answer and status notification.
        """
        self.running = True
        self.response_queue = response_queue
        self.start_interfaces()
        t = threading.Thread(target=self.process_requests_thread)
        t.start()
        self.blockchain.start()
        util.DaemonThread.start(self)

    def set_proxy(self, proxy):
        """Install ``proxy`` process-wide, or restore direct connections.

        This replaces ``socket.socket`` and ``socket.getaddrinfo`` globally.
        """
        self.proxy = proxy
        if proxy:
            # The socks proxy type is the 1-based position in proxy_modes.
            proxy_mode = proxy_modes.index(proxy["mode"]) + 1
            socks.setdefaultproxy(proxy_mode, proxy["host"], int(proxy["port"]))
            socket.socket = socks.socksocket
            # prevent dns leaks, see http://stackoverflow.com/questions/13184205/dns-over-proxy
            socket.getaddrinfo = lambda *args: [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (args[0], args[1]))]
        else:
            socket.socket = socket._socketobject
            socket.getaddrinfo = socket._socket.getaddrinfo

    def set_parameters(self, host, port, protocol, proxy, auto_connect):
        """Store new connection settings and apply them to the network."""
        proxy_str = serialize_proxy(proxy)
        server_str = serialize_server(host, port, protocol)
        self.config.set_key('auto_cycle', auto_connect, True)
        self.config.set_key("proxy", proxy_str, True)
        self.config.set_key("server", server_str, True)
        # abort if changes were not allowed by config
        if self.config.get('server') != server_str or self.config.get('proxy') != proxy_str:
            return

        if self.proxy != proxy or self.protocol != protocol:
            self.print_error('restarting network')
            # Iterate over a snapshot: entries are removed inside the loop.
            for i in list(self.interfaces.values()):
                i.stop()
                self.interfaces.pop(i.server)
            self.set_proxy(proxy)
            self.protocol = protocol
            self.disconnected_servers = set([])
            if auto_connect:
                #self.interface = None
                return

        if auto_connect:
            if not self.interface.is_connected:
                self.switch_to_random_interface()
            else:
                if self.server_is_lagging():
                    self.stop_interface()
        else:
            self.set_server(server_str)

    def switch_to_random_interface(self):
        """Make a random connected interface the main one.

        Interfaces found to be disconnected are removed along the way; the
        loop ends once a connected one is found or none are left.
        """
        while self.interfaces:
            i = random.choice(list(self.interfaces.values()))
            if i.is_connected:
                self.switch_to_interface(i)
                break
            else:
                self.remove_interface(i)

    def switch_to_interface(self, interface):
        """Make ``interface`` the main interface and resend subscriptions."""
        server = interface.server
        self.print_error("switching to", server)
        self.interface = interface
        self.config.set_key('server', server, False)
        self.default_server = server
        self.send_subscriptions()
        self.set_status('connected')
        self.notify('updated')

    def stop_interface(self):
        """Stop the main interface."""
        self.interface.stop()

    def set_server(self, server):
        """Switch the main interface to ``server``.

        Nothing happens when ``server`` is already the connected default
        server, or when its protocol differs from the current one.
        """
        if self.default_server == server and self.interface.is_connected:
            return
        if self.protocol != deserialize_server(server)[2]:
            return

        # stop the interface in order to terminate subscriptions
        if self.interface.is_connected:
            self.stop_interface()

        # notify gui
        self.set_status('connecting')

        # start interface
        self.default_server = server
        self.config.set_key("server", server, True)
        if server in self.interfaces.keys():
            self.switch_to_interface(self.interfaces[server])
        else:
            self.interface = self.start_interface(server)

    def add_recent_server(self, i):
        """Move the server of ``i`` to the front of the recent list (max 20)."""
        # list is ordered
        s = i.server
        if s in self.recent_servers:
            self.recent_servers.remove(s)
        self.recent_servers.insert(0, s)
        self.recent_servers = self.recent_servers[0:20]
        self.config.set_key('recent_servers', self.recent_servers)

    def add_interface(self, i):
        """Register ``i`` under its server string and notify the client."""
        self.interfaces[i.server] = i
        self.notify('interfaces')

    def remove_interface(self, i):
        """Unregister ``i`` and notify the client."""
        self.interfaces.pop(i.server)
        self.notify('interfaces')

    def new_blockchain_height(self, blockchain_height, i):
        """React to a new local blockchain height.

        When the main server is lagging and 'auto_cycle' is set, switch to
        the server of ``i``.  An 'updated' notification is always sent.
        """
        if self.is_connected():
            if self.server_is_lagging():
                self.print_error("Server is lagging", blockchain_height, self.get_server_height())
                if self.config.get('auto_cycle'):
                    self.set_server(i.server)
        self.notify('updated')

    def process_response(self, i, response):
        """Dispatch a response from interface ``i`` by its method name.

        Responses for methods not handled here go to the response queue.
        """
        method = response['method']
        if method == 'blockchain.address.subscribe':
            self.on_address(i, response)
        elif method == 'blockchain.headers.subscribe':
            self.on_header(i, response)
        elif method == 'server.peers.subscribe':
            self.on_peers(i, response)
        elif method == 'server.banner':
            self.on_banner(i, response)
        else:
            self.response_queue.put(response)

    def process_requests_thread(self):
        """Handle queued client requests until the thread stops running."""
        while self.is_running():
            try:
                request = self.requests_queue.get(timeout=0.1)
            except Queue.Empty:
                continue
            self.process_request(request)

    def process_request(self, request):
        """Handle one client request.

        * ``network.<name>`` calls the method ``<name>`` of this object and
          answers with its result, or with an error message;
        * an address subscription whose result is cached is answered from
          the cache;
        * anything else is sent to the main interface; if that fails, the
          request is queued again after a short pause.
        """
        method = request['method']
        params = request['params']
        _id = request['id']

        if method.startswith('network.'):
            out = {'id': _id}
            try:
                f = getattr(self, method[8:])
            except AttributeError:
                out['error'] = "unknown method"
            else:
                # Only call the method when the lookup succeeded; otherwise
                # the unbound name would replace the error set above.
                try:
                    out['result'] = f(*params)
                except BaseException as e:
                    out['error'] = str(e)
                    traceback.print_exc(file=sys.stdout)
                    self.print_error("network error", str(e))
            self.response_queue.put(out)
            return

        if method == 'blockchain.address.subscribe':
            addr = params[0]
            if addr in self.addresses:
                self.response_queue.put({'id':_id, 'result':self.addresses[addr]})
                return

        try:
            self.interface.send_request(request)
        except Exception:
            # put it back in the queue
            self.print_error("warning: interface not ready for", request)
            self.requests_queue.put(request)
            time.sleep(0.1)

    def run(self):
        """Main loop: maintain the interfaces and process their messages."""
        disconnected_time = time.time()
        while self.is_running():
            try:
                i, response = self.queue.get(timeout=0.1)
            except Queue.Empty:
                # Idle: top up the pool and make sure there is a main interface.
                if len(self.interfaces) + len(self.pending_servers) < self.num_server:
                    self.start_random_interface()
                if not self.interfaces:
                    if time.time() - disconnected_time > DISCONNECTED_RETRY_INTERVAL:
                        self.print_error('network: retrying connections')
                        self.disconnected_servers = set([])
                        disconnected_time = time.time()
                if not self.interface.is_connected:
                    if self.config.get('auto_cycle'):
                        if self.interfaces:
                            self.switch_to_random_interface()
                    else:
                        if self.default_server in self.interfaces.keys():
                            self.switch_to_interface(self.interfaces[self.default_server])
                        else:
                            if self.default_server not in self.disconnected_servers and self.default_server not in self.pending_servers:
                                self.print_error("forcing reconnection")
                                self.interface = self.start_interface(self.default_server)
                continue

            if response is not None:
                self.process_response(i, response)
                continue

            # if response is None it is a notification about the interface
            if i.server in self.pending_servers:
                self.pending_servers.remove(i.server)

            if i.is_connected:
                self.add_interface(i)
                self.add_recent_server(i)
                i.send_request({'method':'blockchain.headers.subscribe','params':[]})
                if i == self.interface:
                    self.print_error('sending subscriptions to', self.interface.server)
                    self.send_subscriptions()
                    self.set_status('connected')
            else:
                if i.server in self.interfaces:
                    self.remove_interface(i)
                if i.server in self.heights:
                    self.heights.pop(i.server)
                if i == self.interface:
                    self.set_status('disconnected')
                self.disconnected_servers.add(i.server)

        self.print_error("stopping interfaces")
        for i in self.interfaces.values():
            i.stop()

    def on_header(self, i, r):
        """Record the header announced by interface ``i``.

        Responses without a result or without a block height are ignored.
        """
        result = r.get('result')
        if not result:
            return
        height = result.get('block_height')
        if not height:
            return
        self.heights[i.server] = height
        self.merkle_roots[i.server] = result.get('merkle_root')
        self.utxo_roots[i.server] = result.get('utxo_root')
        # notify blockchain about the new height
        self.blockchain.queue.put((i, result))

        if i == self.interface:
            if self.server_is_lagging() and self.config.get('auto_cycle'):
                self.print_error("Server lagging, stopping interface")
                self.stop_interface()
            self.notify('updated')

    def on_peers(self, i, r):
        """Store the server list received from a server and notify the client."""
        if not r:
            return
        self.irc_servers = parse_servers(r.get('result'))
        self.notify('servers')

    def on_banner(self, i, r):
        """Store the server banner and notify the client."""
        self.banner = r.get('result')
        self.notify('banner')

    def on_address(self, i, r):
        """Cache the result for an address and forward the response."""
        addr = r.get('params')[0]
        result = r.get('result')
        self.addresses[addr] = result
        self.response_queue.put(r)

    def get_header(self, tx_height):
        """Return the header the blockchain has stored for ``tx_height``."""
        return self.blockchain.read_header(tx_height)

    def get_local_height(self):
        """Return the height of the local blockchain."""
        return self.blockchain.height()