electrum-bitcoinprivate/lib/network.py

585 lines
19 KiB
Python
Raw Normal View History

import threading
import time
import Queue
import os
import sys
import random
import traceback
import socks
import socket
2015-04-02 01:12:51 -07:00
import json
import util
from bitcoin import *
2013-09-08 08:23:01 -07:00
import interface
from blockchain import Blockchain
2013-09-11 23:41:27 -07:00
# Default port per protocol letter, used when a server does not list its own.
DEFAULT_PORTS = {
    't': '50001',
    's': '50002',
    'h': '8081',
    'g': '8082',
}

# Built-in server list: host -> {protocol letter: port}.
# Entries that use DEFAULT_PORTS share that same dict object.
DEFAULT_SERVERS = {
    'electrum.be': DEFAULT_PORTS,
    'electrum.drollette.com': {'t': '50001', 's': '50002'},
    'erbium1.sytes.net': {'t': '50001', 's': '50002'},
    'ecdsa.net': {'t': '50001', 's': '110'},
    'eco-electrum.ddns.net': {'t': '50001', 's': '50002', 'h': '80', 'g': '443'},
    'electrum0.electricnewyear.net': {'t': '50001', 's': '50002'},
    'kirsche.emzy.de': {'t': '50001', 's': '50002', 'h': '8081'},
    'electrum2.hachre.de': DEFAULT_PORTS,
    'electrum.hsmiths.com': DEFAULT_PORTS,
    'EAST.electrum.jdubya.info': DEFAULT_PORTS,
    'WEST.electrum.jdubya.info': DEFAULT_PORTS,
    'electrum.no-ip.org': {'t': '50001', 's': '50002', 'h': '80', 'g': '443'},
    'electrum.thwg.org': DEFAULT_PORTS,
    'us.electrum.be': DEFAULT_PORTS,
}

# Retry delays in seconds (compared against time.time() differences).
NODES_RETRY_INTERVAL = 60
SERVER_RETRY_INTERVAL = 10
2014-07-30 01:43:15 -07:00
2013-09-11 23:41:27 -07:00
2014-02-11 00:48:02 -08:00
def parse_servers(result):
    """ parse servers list into dict format

    result is an iterable of items where item[1] is the host name and the
    optional item[2] is a list of feature strings:
      - 's', 't', 'g' or 'h' followed by optional digits: protocol and port
        (DEFAULT_PORTS is used when no port is given)
      - 'v...': protocol version of the server
      - 'p' followed by optional digits: pruning level ('0' when empty)

    Returns {host: {protocol: port, ..., 'pruning': level}}, keeping only
    hosts that advertise at least one protocol and whose version is at
    least PROTOCOL_VERSION.  A None/empty result gives an empty dict.
    """
    # Imported locally so that this function does not depend on another
    # module's star-import happening to expose ``re``.
    import re
    from version import PROTOCOL_VERSION

    servers = {}
    for item in result or []:
        host = item[1]
        out = {}
        version = None
        pruning_level = '-'
        if len(item) > 2:
            for v in item[2]:
                # Patterns are anchored at the end (\Z) so that a feature
                # such as 's50002junk' is ignored instead of being accepted
                # with a garbage port; the list comes from remote servers.
                if re.match(r"[stgh]\d*\Z", v):
                    protocol, port = v[0], v[1:]
                    if port == '':
                        port = DEFAULT_PORTS[protocol]
                    out[protocol] = port
                elif v.startswith('v'):
                    version = v[1:]
                elif re.match(r"p\d*\Z", v):
                    pruning_level = v[1:]
                    if pruning_level == '':
                        pruning_level = '0'
        try:
            is_recent = float(version) >= float(PROTOCOL_VERSION)
        except Exception:
            # Missing or non-numeric version: treat the server as too old.
            is_recent = False
        if out and is_recent:
            out['pruning'] = pruning_level
            servers[host] = out
    return servers
2013-10-04 01:38:03 -07:00
2013-09-11 23:41:27 -07:00
def filter_protocol(servers, p):
    """Return the serialized 'host:port:protocol' strings of all servers
    in the given {host: {protocol: port}} dict that support protocol p."""
    return [
        serialize_server(host, ports[p], p)
        for host, ports in servers.items()
        if p in ports
    ]
2013-09-11 23:41:27 -07:00
def pick_random_server(p='s'):
    """Pick one of the built-in default servers that supports protocol p,
    as a serialized 'host:port:protocol' string."""
    candidates = filter_protocol(DEFAULT_SERVERS, p)
    return random.choice(candidates)
2013-10-06 03:28:45 -07:00
from simple_config import SimpleConfig
2013-09-08 08:23:01 -07:00
# Supported proxy modes.  The list position matters: set_proxy() uses
# index + 1 as the proxy type passed to socks.setdefaultproxy().
proxy_modes = ['socks4', 'socks5', 'http']


def serialize_proxy(p):
    """Turn a proxy dict into a 'mode:host:port' string.

    Returns None when p is not a dict.  The port may be given as a string
    or as a number.  A dict lacking 'mode', 'host' or 'port' raises
    TypeError, as before.
    """
    if not isinstance(p, dict):
        return None
    port = p.get('port')
    if port is not None:
        # ':'.join() only accepts strings; tolerate a numeric port.
        port = str(port)
    return ':'.join([p.get('mode'), p.get('host'), port])


def deserialize_proxy(s):
    """Parse '[mode:]host[:port]' into {'mode', 'host', 'port'}.

    Returns None for None, for a blank string and for 'none' (any case).
    Defaults: mode 'socks5', host 'localhost', port '8080' for http and
    '1080' otherwise.  Empty host/port fields fall back to the defaults
    instead of producing '' values.  The port is returned as a string.
    """
    if s is None:
        return None
    if not s.strip() or s.lower() == 'none':
        return None
    proxy = {"mode": "socks5", "host": "localhost"}
    args = s.split(':')
    n = 0
    if args[n] in proxy_modes:
        proxy["mode"] = args[n]
        n += 1
    if len(args) > n:
        if args[n]:
            proxy["host"] = args[n]
        n += 1
    if len(args) > n and args[n]:
        proxy["port"] = args[n]
    else:
        proxy["port"] = "8080" if proxy["mode"] == "http" else "1080"
    return proxy
2014-07-24 14:14:47 -07:00
def deserialize_server(server_str):
    """Split a 'host:port:protocol' string into (host, port, protocol).

    All three values are returned as strings.  Raises ValueError when the
    string does not have exactly three ':'-separated fields, when the
    protocol is not 's' or 't', or when the port is not an integer.
    """
    host, port, protocol = str(server_str).split(':')
    # Explicit check instead of ``assert protocol in 'st'``: asserts are
    # stripped under -O, and the substring test also accepted '' and 'st'.
    if protocol not in ('s', 't'):
        raise ValueError("unsupported protocol: %r" % (protocol,))
    int(port)  # validation only; raises ValueError for a non-numeric port
    return host, port, protocol
def serialize_server(host, port, protocol):
    """Join host, port and protocol into a 'host:port:protocol' string."""
    fields = (host, port, protocol)
    return str(':'.join(fields))
2014-07-24 14:14:47 -07:00
2015-03-13 15:04:29 -07:00
class Network(util.DaemonThread):
    """Manage a set of server interfaces and route requests and responses.

    The interface for ``default_server`` is the main one (``self.interface``):
    queued requests and address subscriptions are sent through it.  Every
    connected interface is subscribed to block headers.  Results and status
    notifications are put on the ``response_queue`` passed to start().
    """

    def __init__(self, config=None):
        """Initialise state; nothing is connected until start() is called.

        config: a plain dict (wrapped in a SimpleConfig), an existing config
        object, or None for an empty configuration.
        """
        if config is None:
            config = {}  # Do not use mutables as default values!
        util.DaemonThread.__init__(self)
        # Only an exact dict is wrapped; any other object is used as given.
        self.config = SimpleConfig(config) if type(config) is dict else config
        self.lock = threading.Lock()
        self.num_server = 8 if not self.config.get('oneserver') else 0
        self.blockchain = Blockchain(self.config, self)
        self.interfaces = {}
        self.queue = Queue.Queue()
        # Server for addresses and transactions
        self.default_server = self.config.get('server')
        # Sanitize default server
        try:
            deserialize_server(self.default_server)
        except Exception:
            self.default_server = None
        if not self.default_server:
            self.default_server = pick_random_server('s')
        self.protocol = deserialize_server(self.default_server)[2]
        self.irc_servers = {}  # returned by interface (list from irc)
        self.disconnected_servers = set()
        self.recent_servers = self.read_recent_servers()
        self.pending_servers = set()
        self.banner = ''
        self.interface = None
        self.heights = {}
        self.merkle_roots = {}
        self.utxo_roots = {}
        # Same guard as read_recent_servers(): the config may have no path.
        if self.config.path:
            dir_path = os.path.join(self.config.path, 'certs')
            if not os.path.exists(dir_path):
                os.mkdir(dir_path)
        # subscriptions and requests
        self.subscribed_addresses = set()
        # cached address status
        self.addr_responses = {}
        # unanswered requests
        self.unanswered_requests = {}
        self.connection_status = 'connecting'
        self.requests_queue = Queue.Queue()
        self.set_proxy(deserialize_proxy(self.config.get('proxy')))
        # retry times
        self.server_retry_time = time.time()
        self.nodes_retry_time = time.time()

    def read_recent_servers(self):
        """Load the list stored in <config.path>/recent_servers.

        Returns [] when there is no config path, when the file cannot be
        read or parsed, or when it does not contain a JSON list.
        """
        if not self.config.path:
            return []
        path = os.path.join(self.config.path, "recent_servers")
        try:
            with open(path, "r") as f:
                data = f.read()
            servers = json.loads(data)
        except (IOError, OSError, ValueError):
            return []
        return servers if isinstance(servers, list) else []

    def save_recent_servers(self):
        """Write self.recent_servers as JSON to <config.path>/recent_servers.

        Best effort: a write failure is logged and otherwise ignored.
        """
        if not self.config.path:
            return
        path = os.path.join(self.config.path, "recent_servers")
        s = json.dumps(self.recent_servers, indent=4, sort_keys=True)
        try:
            with open(path, "w") as f:
                f.write(s)
        except (IOError, OSError) as e:
            self.print_error("cannot save recent servers:", str(e))

    def get_server_height(self):
        """Return the last height recorded for the default server, or 0."""
        return self.heights.get(self.default_server, 0)

    def server_is_lagging(self):
        """Return True if the default server is more than one block behind
        the local chain; False when its height is not known."""
        h = self.get_server_height()
        if not h:
            self.print_error('no height for main interface')
            return False
        lag = self.get_local_height() - h
        return lag > 1

    def set_status(self, status):
        """Record the connection status and notify listeners."""
        self.connection_status = status
        self.notify('status')

    def is_connected(self):
        """Return a true value if there is a main interface and it reports
        being connected (falsy, possibly None, otherwise)."""
        return self.interface and self.interface.is_connected()

    def send_subscriptions(self):
        """(Re)send pending requests and all subscriptions to the main
        interface, after dropping the cached address statuses."""
        # clear cache
        # The cache consulted by process_request() is addr_responses; the
        # original only reset cached_responses, which nothing here reads.
        self.addr_responses = {}
        # NOTE(review): kept in case code outside this class reads it.
        self.cached_responses = {}
        self.print_error('sending subscriptions to', self.interface.server, len(self.unanswered_requests), len(self.subscribed_addresses))
        for r in self.unanswered_requests.values():
            self.interface.send_request(r)
        for addr in self.subscribed_addresses:
            self.interface.send_request({'method':'blockchain.address.subscribe','params':[addr]})
        self.interface.send_request({'method':'server.banner','params':[]})
        self.interface.send_request({'method':'server.peers.subscribe','params':[]})

    def get_status_value(self, key):
        """Return the value reported for a status key.

        Known keys: 'status', 'banner', 'updated' (local height, server
        height), 'servers' and 'interfaces'.  Raises ValueError for any
        other key (previously an accidental UnboundLocalError).
        """
        if key == 'status':
            return self.connection_status
        if key == 'banner':
            return self.banner
        if key == 'updated':
            return (self.get_local_height(), self.get_server_height())
        if key == 'servers':
            return self.get_servers()
        if key == 'interfaces':
            return self.get_interfaces()
        raise ValueError("unknown status key: %r" % (key,))

    def notify(self, key):
        """Put a 'network.status' message for key on the response queue."""
        value = self.get_status_value(key)
        self.response_queue.put({'method':'network.status', 'params':[key, value]})

    def random_server(self):
        """Return a random server for the current protocol that is neither
        pending, disconnected nor already connected; None if there is none."""
        candidates = filter_protocol(self.get_servers(), self.protocol)
        choice_list = [
            s for s in candidates
            if s not in self.pending_servers
            and s not in self.disconnected_servers
            and s not in self.interfaces
        ]
        if not choice_list:
            return None
        return random.choice(choice_list)

    def get_parameters(self):
        """Return (host, port, protocol, proxy, auto_connect)."""
        host, port, protocol = deserialize_server(self.default_server)
        auto_connect = self.config.get('auto_cycle', True)
        return host, port, protocol, self.proxy, auto_connect

    def get_interfaces(self):
        """Return the server strings of the connected interfaces."""
        return self.interfaces.keys()

    def get_servers(self):
        """Return the known servers as {host: {protocol: port}}.

        The list received from a server is used when available; otherwise
        the built-in defaults extended with the recently used servers.
        """
        if self.irc_servers:
            out = self.irc_servers
        else:
            # Work on a copy: adding recent servers must not modify the
            # module-level DEFAULT_SERVERS.
            out = dict(DEFAULT_SERVERS)
            for s in self.recent_servers:
                try:
                    host, port, protocol = deserialize_server(s)
                except Exception:
                    continue
                if host not in out:
                    out[host] = {protocol: port}
        return out

    def start_interface(self, server):
        """Start an Interface for server and mark it as pending.

        Returns the new interface, or None when one is already connected
        for that server.
        """
        if server in self.interfaces:
            return None
        i = interface.Interface(server, self.queue, self.config)
        self.pending_servers.add(server)
        i.start()
        return i

    def start_random_interface(self):
        """Start an interface on a random eligible server, if any."""
        server = self.random_server()
        if server:
            self.start_interface(server)

    def start_interfaces(self):
        """Start the main interface plus num_server random ones."""
        self.interface = self.start_interface(self.default_server)
        for _ in range(self.num_server):
            self.start_random_interface()

    def start(self, response_queue):
        """Start the interfaces, the request-processing thread, the
        blockchain and finally this thread.

        response_queue: queue that receives responses and notifications.
        """
        self.running = True
        self.response_queue = response_queue
        self.start_interfaces()
        t = threading.Thread(target=self.process_requests_thread)
        t.start()
        self.blockchain.start()
        util.DaemonThread.start(self)

    def set_proxy(self, proxy):
        """Install (or remove) a process-wide SOCKS/HTTP proxy.

        This replaces socket.socket and socket.getaddrinfo globally.
        proxy: dict with 'mode', 'host' and 'port', or a falsy value to
        restore direct connections.
        """
        self.proxy = proxy
        if proxy:
            proxy_mode = proxy_modes.index(proxy["mode"]) + 1
            socks.setdefaultproxy(proxy_mode, proxy["host"], int(proxy["port"]))
            socket.socket = socks.socksocket
            # prevent dns leaks, see http://stackoverflow.com/questions/13184205/dns-over-proxy
            socket.getaddrinfo = lambda *args: [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (args[0], args[1]))]
        else:
            socket.socket = socket._socketobject
            socket.getaddrinfo = socket._socket.getaddrinfo

    def set_parameters(self, host, port, protocol, proxy, auto_connect):
        """Apply new connection settings.

        A change of proxy or protocol stops every interface.  With
        auto_connect the main interface is replaced when it is disconnected
        or stopped when it lags; otherwise the given server is selected.
        """
        if self.proxy != proxy or self.protocol != protocol:
            self.print_error('restarting network')
            # Iterate over a copy because entries are removed in the loop.
            for i in list(self.interfaces.values()):
                i.stop()
                self.interfaces.pop(i.server)
            self.set_proxy(proxy)
            self.protocol = protocol
            self.disconnected_servers = set()
            if auto_connect:
                #self.interface = None
                return

        if auto_connect:
            if not self.is_connected():
                self.switch_to_random_interface()
            elif self.server_is_lagging():
                self.stop_interface()
        else:
            server_str = serialize_server(host, port, protocol)
            self.set_server(server_str)

    def switch_to_random_interface(self):
        """Make a random connected interface the main one, discarding
        interfaces found to be disconnected along the way."""
        while self.interfaces:
            i = random.choice(list(self.interfaces.values()))
            if i.is_connected():
                self.switch_to_interface(i)
                break
            self.remove_interface(i)

    def switch_to_interface(self, interface):
        """Make the given interface the main one and resubscribe on it."""
        server = interface.server
        self.print_error("switching to", server)
        self.interface = interface
        self.default_server = server
        self.send_subscriptions()
        self.set_status('connected')
        self.notify('updated')

    def stop_interface(self):
        """Stop the main interface."""
        self.interface.stop()

    def set_server(self, server):
        """Select server as the main server.

        Does nothing when it is already the connected main server or when
        its protocol differs from the current one.
        """
        if self.default_server == server and self.is_connected():
            return
        if self.protocol != deserialize_server(server)[2]:
            return
        # stop the interface in order to terminate subscriptions
        if self.is_connected():
            self.stop_interface()
        # notify gui
        self.set_status('connecting')
        # start interface
        self.default_server = server
        if server in self.interfaces:
            self.switch_to_interface(self.interfaces[server])
        else:
            self.interface = self.start_interface(server)

    def add_recent_server(self, i):
        """Move the interface's server to the front of the recent list
        (capped at 20 entries) and save the list."""
        # list is ordered
        s = i.server
        if s in self.recent_servers:
            self.recent_servers.remove(s)
        self.recent_servers.insert(0, s)
        self.recent_servers = self.recent_servers[0:20]
        self.save_recent_servers()

    def add_interface(self, i):
        """Register a connected interface and notify listeners."""
        self.interfaces[i.server] = i
        self.notify('interfaces')

    def remove_interface(self, i):
        """Unregister an interface and notify listeners."""
        self.interfaces.pop(i.server)
        self.notify('interfaces')

    def new_blockchain_height(self, blockchain_height, i):
        """React to a new local height obtained through interface i: when
        the main server lags and auto_cycle is set, switch to i's server."""
        if self.is_connected():
            if self.server_is_lagging():
                self.print_error("Server is lagging", blockchain_height, self.get_server_height())
                if self.config.get('auto_cycle'):
                    self.set_server(i.server)
        self.notify('updated')

    def process_response(self, i, response):
        """Dispatch a response received from interface i.

        Known subscription methods are handled here; anything else is
        forwarded to the response queue.
        """
        # the id comes from the daemon or the network proxy
        _id = response.get('id')
        if _id is not None:
            # Tolerate an id that is no longer pending (e.g. a duplicate
            # answer) instead of raising KeyError inside run().
            self.unanswered_requests.pop(_id, None)
        method = response['method']
        if method == 'blockchain.address.subscribe':
            self.on_address(i, response)
        elif method == 'blockchain.headers.subscribe':
            self.on_header(i, response)
        elif method == 'server.peers.subscribe':
            self.on_peers(i, response)
        elif method == 'server.banner':
            self.on_banner(i, response)
        else:
            self.response_queue.put(response)

    def process_requests_thread(self):
        """Thread body: take requests from requests_queue and process them
        for as long as the network is running."""
        while self.is_running():
            try:
                request = self.requests_queue.get(timeout=0.1)
            except Queue.Empty:
                continue
            self.process_request(request)

    def process_request(self, request):
        """Handle one request dict with 'method', 'params' and 'id'.

        'network.*' methods are executed locally and answered directly.
        Other requests are sent to the main interface; an address
        subscription whose status is cached is answered from the cache.
        """
        method = request['method']
        params = request['params']
        _id = request['id']

        if method.startswith('network.'):
            out = {'id': _id}
            # NOTE(review): any attribute of this object can be named by the
            # request; confirm that requests only come from trusted clients.
            try:
                f = getattr(self, method[8:])
            except AttributeError:
                out['error'] = "unknown method"
            else:
                # Only call the method when the lookup succeeded, so the
                # "unknown method" error is not overwritten.
                try:
                    out['result'] = f(*params)
                except BaseException as e:
                    out['error'] = str(e)
                    traceback.print_exc(file=sys.stdout)
                    self.print_error("network error", str(e))
            self.response_queue.put(out)
            return

        if method == 'blockchain.address.subscribe':
            addr = params[0]
            self.subscribed_addresses.add(addr)
            if addr in self.addr_responses:
                # Answered from the cache: the request is not pending, so it
                # must not be stored in unanswered_requests.
                self.response_queue.put({'id':_id, 'result':self.addr_responses[addr]})
                return

        # store request
        self.unanswered_requests[_id] = request

        try:
            self.interface.send_request(request)
        except Exception:
            # put it back in the queue
            self.print_error("warning: interface not ready for", request)
            self.requests_queue.put(request)
            time.sleep(0.1)

    def check_interfaces(self):
        """Keep enough interfaces running and make sure there is a main
        interface; called on every iteration of run()."""
        now = time.time()
        if len(self.interfaces) + len(self.pending_servers) < self.num_server:
            self.start_random_interface()
        if not self.interfaces:
            if now - self.nodes_retry_time > NODES_RETRY_INTERVAL:
                self.print_error('network: retrying connections')
                self.disconnected_servers = set()
                self.nodes_retry_time = now

        # main interface
        if self.is_connected():
            return
        if self.config.get('auto_cycle'):
            if self.interfaces:
                self.switch_to_random_interface()
        elif self.default_server in self.interfaces:
            self.switch_to_interface(self.interfaces[self.default_server])
        elif self.default_server in self.disconnected_servers:
            # Allow a new attempt once the retry interval has elapsed.
            if now - self.server_retry_time > SERVER_RETRY_INTERVAL:
                self.disconnected_servers.remove(self.default_server)
                self.server_retry_time = now
        elif self.default_server not in self.pending_servers:
            self.print_error("forcing reconnection")
            self.interface = self.start_interface(self.default_server)

    def run(self):
        """Main loop: maintain interfaces and process the (interface,
        response) pairs they put on self.queue; stop all interfaces when
        the network stops running."""
        while self.is_running():
            self.check_interfaces()
            try:
                i, response = self.queue.get(timeout=0.1)
            except Queue.Empty:
                continue

            if response is not None:
                self.process_response(i, response)
                continue

            # if response is None it is a notification about the interface
            if i.server in self.pending_servers:
                self.pending_servers.remove(i.server)

            if i.is_connected():
                self.add_interface(i)
                self.add_recent_server(i)
                i.send_request({'method':'blockchain.headers.subscribe','params':[]})
                if i == self.interface:
                    self.send_subscriptions()
                    self.set_status('connected')
            else:
                if i.server in self.interfaces:
                    self.remove_interface(i)
                if i.server in self.heights:
                    self.heights.pop(i.server)
                if i == self.interface:
                    self.set_status('disconnected')
                self.disconnected_servers.add(i.server)

        self.print_error("stopping interfaces")
        for i in self.interfaces.values():
            i.stop()
        self.print_error("stopped")

    def on_header(self, i, r):
        """Handle a headers notification from interface i: record the
        server's height and roots and pass the header to the blockchain."""
        result = r.get('result')
        if not result:
            return
        height = result.get('block_height')
        if not height:
            return
        self.heights[i.server] = height
        self.merkle_roots[i.server] = result.get('merkle_root')
        self.utxo_roots[i.server] = result.get('utxo_root')
        # notify blockchain about the new height
        self.blockchain.queue.put((i, result))

        if i == self.interface:
            if self.server_is_lagging() and self.config.get('auto_cycle'):
                self.print_error("Server lagging, stopping interface")
                self.stop_interface()
            self.notify('updated')

    def on_peers(self, i, r):
        """Handle a peers response: replace the server list and notify."""
        if not r:
            return
        result = r.get('result')
        if result is None:
            # Response without a result: keep the current list.
            return
        self.irc_servers = parse_servers(result)
        self.notify('servers')

    def on_banner(self, i, r):
        """Handle a banner response: store it and notify."""
        self.banner = r.get('result')
        self.notify('banner')

    def on_address(self, i, r):
        """Handle an address status response: cache the status under the
        address (first element of 'params') and forward the response."""
        addr = r.get('params')[0]
        result = r.get('result')
        self.addr_responses[addr] = result
        self.response_queue.put(r)

    def get_header(self, tx_height):
        """Return the header at tx_height as read by the blockchain."""
        return self.blockchain.read_header(tx_height)

    def get_local_height(self):
        """Return the height reported by the local blockchain."""
        return self.blockchain.height()