Use select.

This commit is contained in:
Neil Booth 2015-06-03 00:03:33 +09:00 committed by ThomasV
parent aedfbd3855
commit 49a48d52ac
2 changed files with 344 additions and 289 deletions

View File

@ -17,10 +17,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import copy, re, errno, os import os
import threading, traceback, sys, time, Queue import re
import socket import socket
import ssl import ssl
import sys
import threading
import time
import traceback
import requests import requests
ca_path = requests.certs.where() ca_path = requests.certs.where()
@ -28,49 +32,30 @@ ca_path = requests.certs.where()
import util import util
import x509 import x509
import pem import pem
from version import ELECTRUM_VERSION, PROTOCOL_VERSION
from simple_config import SimpleConfig
def Interface(server, response_queue, config = None): def Connection(server, queue, config_path):
"""Interface factory function. The returned interface class handles the connection """Makes asynchronous connections to a remote remote electrum server.
to a single remote electrum server. The object handles all necessary locking. It's Returns the running thread that is making the connection.
exposed API is:
- Inherits everything from threading.Thread. Once the thread has connected, it finishes, placing a tuple on the
- Member functions send_request(), stop(), is_connected() queue of the form (server, socket), where socket is None if
- Member variable server. connection failed.
"server" is constant for the object's lifetime and hence synchronization is unnecessary.
""" """
host, port, protocol = server.split(':') host, port, protocol = server.split(':')
if protocol in 'st': if not protocol in 'st':
return TcpInterface(server, response_queue, config) raise Exception('Unknown protocol: %s' % protocol)
else: c = TcpConnection(server, queue, config_path)
raise Exception('Unknown protocol: %s'%protocol) c.start()
return c
# Connection status class TcpConnection(threading.Thread):
CS_OPENING, CS_CONNECTED, CS_FAILED = range(3)
class TcpInterface(threading.Thread): def __init__(self, server, queue, config_path):
def __init__(self, server, response_queue, config = None):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.daemon = True self.daemon = True
self.config = config if config is not None else SimpleConfig() self.config_path = config_path
# Set by stop(); no more data is exchanged and the thread exits after gracefully self.queue = queue
# closing the socket
self.disconnect = False
self._status = CS_OPENING
self.debug = False # dump network messages. can be changed at runtime using the console
self.message_id = 0
self.response_queue = response_queue
self.request_queue = Queue.Queue()
self.unanswered_requests = {}
# request timeouts
self.request_time = time.time()
self.ping_time = 0
# parse server
self.server = server self.server = server
self.host, self.port, self.protocol = self.server.split(':') self.host, self.port, self.protocol = self.server.split(':')
self.host = str(self.host) self.host = str(self.host)
@ -78,47 +63,31 @@ class TcpInterface(threading.Thread):
self.use_ssl = (self.protocol == 's') self.use_ssl = (self.protocol == 's')
def print_error(self, *msg): def print_error(self, *msg):
util.print_error("[%s]"%self.host, *msg) util.print_error("[%s]" % self.host, *msg)
def process_response(self, response): def check_host_name(self, peercert, name):
if self.debug: """Simple certificate/host name checker. Returns True if the
self.print_error("<--", response) certificate matches, False otherwise. Does not support
wildcards."""
msg_id = response.get('id') # Check that the peer has supplied a certificate.
error = response.get('error') # None/{} is not acceptable.
result = response.get('result') if not peercert:
return False
if msg_id is not None: if peercert.has_key("subjectAltName"):
method, params, _id, queue = self.unanswered_requests.pop(msg_id) for typ, val in peercert["subjectAltName"]:
if queue is None: if typ == "DNS" and val == name:
queue = self.response_queue return True
else: else:
# notification # Only check the subject DN if there is no subject alternative
method = response.get('method') # name.
params = response.get('params') cn = None
_id = None for attr, val in peercert["subject"]:
queue = self.response_queue # Use most-specific (last) commonName attribute.
# restore parameters if attr == "commonName":
if method == 'blockchain.numblocks.subscribe': cn = val
result = params[0] if cn is not None:
params = [] return cn == name
elif method == 'blockchain.headers.subscribe': return False
result = params[0]
params = []
elif method == 'blockchain.address.subscribe':
addr = params[0]
result = params[1]
params = [addr]
if method == 'server.version':
self.server_version = result
return
if error:
queue.put((self, {'method':method, 'params':params, 'error':error, 'id':_id}))
else:
queue.put((self, {'method':method, 'params':params, 'result':result, 'id':_id}))
def get_simple_socket(self): def get_simple_socket(self):
try: try:
@ -138,10 +107,9 @@ class TcpInterface(threading.Thread):
else: else:
self.print_error("failed to connect", str(e)) self.print_error("failed to connect", str(e))
def get_socket(self): def get_socket(self):
if self.use_ssl: if self.use_ssl:
cert_path = os.path.join( self.config.path, 'certs', self.host) cert_path = os.path.join(self.config_path, 'certs', self.host)
if not os.path.exists(cert_path): if not os.path.exists(cert_path):
is_new = True is_new = True
s = self.get_simple_socket() s = self.get_simple_socket()
@ -152,7 +120,7 @@ class TcpInterface(threading.Thread):
s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1, cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path, do_handshake_on_connect=True) s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1, cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path, do_handshake_on_connect=True)
except ssl.SSLError, e: except ssl.SSLError, e:
s = None s = None
if s and check_host_name(s.getpeercert(), self.host): if s and self.check_host_name(s.getpeercert(), self.host):
self.print_error("SSL certificate signed by CA") self.print_error("SSL certificate signed by CA")
return s return s
@ -229,117 +197,131 @@ class TcpInterface(threading.Thread):
return s return s
def send_request(self, request, response_queue = None): def run(self):
socket = self.get_socket()
if socket:
self.print_error("connected")
self.queue.put((self.server, socket))
class Interface:
"""The Interface class handles a socket connected to a single remote
electrum server. It's exposed API is:
- Member functions close(), fileno(), get_responses(), has_timed_out(),
ping_required(), queue_request(), send_requests()
- Member variable server.
"""
def __init__(self, server, socket):
self.server = server
self.host, _, _ = server.split(':')
self.socket = socket
self.pipe = util.SocketPipe(socket)
self.pipe.set_timeout(0.0) # Don't wait for data
# Dump network messages. Set at runtime from the console.
self.debug = False
self.message_id = 0
self.unsent_requests = []
self.unanswered_requests = {}
# Set last ping to zero to ensure immediate ping
self.last_request = time.time()
self.last_ping = 0
self.closed_remotely = False
def print_error(self, *msg):
util.print_error("[%s]" % self.host, *msg)
def fileno(self):
# Needed for select
return self.socket.fileno()
def close(self):
if not self.closed_remotely:
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
def queue_request(self, request):
'''Queue a request.''' '''Queue a request.'''
self.request_time = time.time() self.request_time = time.time()
self.request_queue.put((copy.deepcopy(request), response_queue)) self.unsent_requests.append(request)
def send_requests(self): def send_requests(self):
'''Sends all queued requests''' '''Sends all queued requests. Returns False on failure.'''
while self.is_connected() and not self.request_queue.empty(): def copy_request(orig):
request, response_queue = self.request_queue.get() # Replace ID after making copy - mustn't change caller's copy
method = request.get('method') request = orig.copy()
params = request.get('params') request['id'] = self.message_id
r = {'id': self.message_id, 'method': method, 'params': params}
try:
self.pipe.send(r)
except socket.error, e:
self.print_error("socket error:", e)
self.stop()
return
if self.debug:
self.print_error("-->", r)
self.unanswered_requests[self.message_id] = method, params, request.get('id'), response_queue
self.message_id += 1 self.message_id += 1
if self.debug:
self.print_error("-->", request, orig.get('id'))
return request
def is_connected(self): requests_as_sent = map(copy_request, self.unsent_requests)
'''True if status is connected''' try:
return self._status == CS_CONNECTED and not self.disconnect self.pipe.send_all(requests_as_sent)
except socket.error, e:
self.print_error("socket error:", e)
return False
# unanswered_requests stores the original unmodified user
# request, keyed by wire ID
for n, request in enumerate(self.unsent_requests):
self.unanswered_requests[requests_as_sent[n]['id']] = request
self.unsent_requests = []
return True
def stop(self): def ping_required(self):
if not self.disconnect: '''Maintains time since last ping. Returns True if a ping should
self.disconnect = True be sent.
self.print_error("disconnecting") '''
now = time.time()
if now - self.last_ping > 60:
self.last_ping = now
return True
return False
def maybe_ping(self): def has_timed_out(self):
# ping the server with server.version '''Returns True if the interface has timed out.'''
if time.time() - self.ping_time > 60: if (self.unanswered_requests and time.time() - self.request_time > 10
self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]}) and self.pipe.idle_time() > 10):
self.ping_time = time.time() self.print_error("timeout", len(self.unanswered_requests))
# stop interface if we have been waiting for more than 10 seconds return True
if self.unanswered_requests and time.time() - self.request_time > 10 and self.pipe.idle_time() > 10:
self.print_error("interface timeout", len(self.unanswered_requests))
self.stop()
def get_and_process_response(self): return False
if self.is_connected():
def get_responses(self):
'''Call if there is data available on the socket. Returns a list of
notifications and a list of responses. The notifications are
singleton unsolicited responses presumably as a result of
prior subscriptions. The responses are (request, response)
pairs. If the connection was closed remotely or the remote
server is misbehaving, the last notification will be None.
'''
notifications, responses = [], []
while True:
try: try:
response = self.pipe.get() response = self.pipe.get()
except util.timeout: except util.timeout:
return break
# If remote side closed the socket, SocketPipe closes our socket and returns None
if response is None: if response is None:
self.disconnect = True notifications.append(None)
self.closed_remotely = True
self.print_error("connection closed remotely") self.print_error("connection closed remotely")
break
if self.debug:
self.print_error("<--", response)
wire_id = response.pop('id', None)
if wire_id is None:
notifications.append(response)
elif wire_id in self.unanswered_requests:
request = self.unanswered_requests.pop(wire_id)
responses.append((request, response))
else: else:
self.process_response(response) notifications.append(None)
self.print_error("unknown wire ID", wire_id)
break
def run(self): return notifications, responses
s = self.get_socket()
if s:
self.pipe = util.SocketPipe(s)
s.settimeout(0.1)
self.print_error("connected")
self._status = CS_CONNECTED
# Indicate to parent that we've connected
self.notify_status()
while self.is_connected():
self.maybe_ping()
self.send_requests()
self.get_and_process_response()
s.shutdown(socket.SHUT_RDWR)
s.close()
# Also for the s is None case
self._status = CS_FAILED
# Indicate to parent that the connection is now down
self.notify_status()
def notify_status(self):
'''Notify owner that we have just connected or just failed the connection.
Owner determines which through e.g. testing is_connected()'''
self.response_queue.put((self, None))
def _match_hostname(name, val):
if val == name:
return True
return val.startswith('*.') and name.endswith(val[1:])
def check_host_name(peercert, name):
"""Simple certificate/host name checker. Returns True if the
certificate matches, False otherwise."""
# Check that the peer has supplied a certificate.
# None/{} is not acceptable.
if not peercert:
return False
if peercert.has_key("subjectAltName"):
for typ, val in peercert["subjectAltName"]:
if typ == "DNS" and _match_hostname(name, val):
return True
else:
# Only check the subject DN if there is no subject alternative
# name.
cn = None
for attr, val in peercert["subject"]:
# Use most-specific (last) commonName attribute.
if attr == "commonName":
cn = val
if cn is not None:
return _match_hostname(name, cn)
return False
def check_cert(host, cert): def check_cert(host, cert):
@ -361,7 +343,15 @@ def check_cert(host, cert):
util.print_msg(m) util.print_msg(m)
# Used by tests
def _match_hostname(name, val):
if val == name:
return True
return val.startswith('*.') and name.endswith(val[1:])
def test_certificates(): def test_certificates():
from simple_config import SimpleConfig
config = SimpleConfig() config = SimpleConfig()
mydir = os.path.join(config.path, "certs") mydir = os.path.join(config.path, "certs")
certs = os.listdir(mydir) certs = os.listdir(mydir)

View File

@ -3,7 +3,9 @@ import Queue
import os import os
import sys import sys
import random import random
import select
import traceback import traceback
from collections import deque
import socks import socks
import socket import socket
@ -11,9 +13,9 @@ import json
import util import util
from bitcoin import * from bitcoin import *
import interface from interface import Connection, Interface
from blockchain import Blockchain from blockchain import Blockchain
from collections import deque from version import ELECTRUM_VERSION, PROTOCOL_VERSION
DEFAULT_PORTS = {'t':'50001', 's':'50002', 'h':'8081', 'g':'8082'} DEFAULT_PORTS = {'t':'50001', 's':'50002', 'h':'8081', 'g':'8082'}
@ -119,11 +121,13 @@ def deserialize_server(server_str):
def serialize_server(host, port, protocol): def serialize_server(host, port, protocol):
return str(':'.join([host, port, protocol])) return str(':'.join([host, port, protocol]))
class Network(util.DaemonThread): class Network(util.DaemonThread):
"""The Network class manages a set of connections to remote """The Network class manages a set of connections to remote electrum
electrum servers, each connection is handled by its own servers, each connected socket is handled by an Interface() object.
thread object returned from Interface(). Its external API: Connections are initiated by a Connection() thread which stops once
the connection succeeds or fails.
Our external API:
- Member functions get_header(), get_parameters(), get_status_value(), - Member functions get_header(), get_parameters(), get_status_value(),
new_blockchain_height(), set_parameters(), start(), new_blockchain_height(), set_parameters(), start(),
@ -137,7 +141,6 @@ class Network(util.DaemonThread):
self.config = SimpleConfig(config) if type(config) == type({}) else config self.config = SimpleConfig(config) if type(config) == type({}) else config
self.num_server = 8 if not self.config.get('oneserver') else 0 self.num_server = 8 if not self.config.get('oneserver') else 0
self.blockchain = Blockchain(self.config, self) self.blockchain = Blockchain(self.config, self)
self.queue = Queue.Queue()
self.requests_queue = pipe.send_queue self.requests_queue = pipe.send_queue
self.response_queue = pipe.get_queue self.response_queue = pipe.get_queue
# A deque of interface header requests, processed left-to-right # A deque of interface header requests, processed left-to-right
@ -169,7 +172,7 @@ class Network(util.DaemonThread):
self.subscribed_addresses = set() self.subscribed_addresses = set()
# cached address status # cached address status
self.addr_responses = {} self.addr_responses = {}
# unanswered requests # Requests from client we've not seen a response to
self.unanswered_requests = {} self.unanswered_requests = {}
# retry times # retry times
self.server_retry_time = time.time() self.server_retry_time = time.time()
@ -180,6 +183,8 @@ class Network(util.DaemonThread):
self.interface = None self.interface = None
self.interfaces = {} self.interfaces = {}
self.auto_connect = self.config.get('auto_connect', False) self.auto_connect = self.config.get('auto_connect', False)
self.connecting = {}
self.socket_queue = Queue.Queue()
self.start_network(deserialize_server(self.default_server)[2], self.start_network(deserialize_server(self.default_server)[2],
deserialize_proxy(self.config.get('proxy'))) deserialize_proxy(self.config.get('proxy')))
@ -224,19 +229,22 @@ class Network(util.DaemonThread):
self.notify('status') self.notify('status')
def is_connected(self): def is_connected(self):
return self.interface and self.interface.is_connected() return self.interface is not None
def queue_request(self, method, params):
self.interface.queue_request({'method': method, 'params': params})
def send_subscriptions(self): def send_subscriptions(self):
# clear cache # clear cache
self.cached_responses = {} self.cached_responses = {}
self.print_error('sending subscriptions to', self.interface.server, len(self.unanswered_requests), len(self.subscribed_addresses)) self.print_error('sending subscriptions to', self.interface.server, len(self.unanswered_requests), len(self.subscribed_addresses))
for r in self.unanswered_requests.values(): for r in self.unanswered_requests.values():
self.interface.send_request(r) self.interface.queue_request(r)
for addr in self.subscribed_addresses: for addr in self.subscribed_addresses:
self.interface.send_request({'method':'blockchain.address.subscribe','params':[addr]}) self.queue_request('blockchain.address.subscribe', [addr])
self.interface.send_request({'method':'server.banner','params':[]}) self.queue_request('server.banner', [])
self.interface.send_request({'method':'server.peers.subscribe','params':[]}) self.queue_request('server.peers.subscribe', [])
self.interface.send_request({'method':'blockchain.estimatefee','params':[2]}) self.queue_request('blockchain.estimatefee', [2])
def get_status_value(self, key): def get_status_value(self, key):
if key == 'status': if key == 'status':
@ -263,7 +271,7 @@ class Network(util.DaemonThread):
def get_interfaces(self): def get_interfaces(self):
'''The interfaces that are in connected state''' '''The interfaces that are in connected state'''
return [s for s, i in self.interfaces.items() if i.is_connected()] return self.interfaces.keys()
def get_servers(self): def get_servers(self):
if self.irc_servers: if self.irc_servers:
@ -280,12 +288,11 @@ class Network(util.DaemonThread):
return out return out
def start_interface(self, server): def start_interface(self, server):
if not server in self.interfaces.keys(): if (not server in self.interfaces and not server in self.connecting):
if server == self.default_server: if server == self.default_server:
self.set_status('connecting') self.set_status('connecting')
i = interface.Interface(server, self.queue, self.config) c = Connection(server, self.socket_queue, self.config.path)
self.interfaces[i.server] = i self.connecting[server] = c
i.start()
def start_random_interface(self): def start_random_interface(self):
exclude_set = self.disconnected_servers.union(set(self.interfaces)) exclude_set = self.disconnected_servers.union(set(self.interfaces))
@ -312,6 +319,7 @@ class Network(util.DaemonThread):
def start_network(self, protocol, proxy): def start_network(self, protocol, proxy):
assert not self.interface and not self.interfaces assert not self.interface and not self.interfaces
assert not self.connecting and self.socket_queue.empty()
self.print_error('starting network') self.print_error('starting network')
self.disconnected_servers = set([]) self.disconnected_servers = set([])
self.protocol = protocol self.protocol = protocol
@ -320,10 +328,13 @@ class Network(util.DaemonThread):
def stop_network(self): def stop_network(self):
self.print_error("stopping network") self.print_error("stopping network")
for i in self.interfaces.values(): for interface in self.interfaces.values():
i.stop() self.close_interface(interface)
self.interface = None assert self.interface is None
self.interfaces = {} assert not self.interfaces
self.connecting = {}
# Get a new queue - no old pending connections thanks!
self.socket_queue = Queue.Queue()
def set_parameters(self, host, port, protocol, proxy, auto_connect): def set_parameters(self, host, port, protocol, proxy, auto_connect):
self.auto_connect = auto_connect self.auto_connect = auto_connect
@ -339,7 +350,10 @@ class Network(util.DaemonThread):
self.switch_lagging_interface() self.switch_lagging_interface()
def switch_to_random_interface(self): def switch_to_random_interface(self):
'''Switch to a random connected server other than the current one'''
servers = self.get_interfaces() # Those in connected state servers = self.get_interfaces() # Those in connected state
if self.default_server in servers:
servers.remove(self.default_server)
if servers: if servers:
self.switch_to_interface(random.choice(servers)) self.switch_to_interface(random.choice(servers))
@ -362,30 +376,28 @@ class Network(util.DaemonThread):
self.start_interface(server) self.start_interface(server)
return return
i = self.interfaces[server] i = self.interfaces[server]
if not i.is_connected():
# do nothing; we will switch once connected
return
if self.interface != i: if self.interface != i:
self.print_error("switching to", server) self.print_error("switching to", server)
# stop any current interface in order to terminate subscriptions # stop any current interface in order to terminate subscriptions
self.stop_interface() self.close_interface(self.interface)
self.interface = i self.interface = i
self.addr_responses = {} self.addr_responses = {}
self.send_subscriptions() self.send_subscriptions()
self.set_status('connected') self.set_status('connected')
self.notify('updated') self.notify('updated')
def stop_interface(self): def close_interface(self, interface):
if self.interface: if interface:
self.interface.stop() self.interfaces.pop(interface.server)
self.interface = None if interface.server == self.default_server:
self.interface = None
interface.close()
def add_recent_server(self, i): def add_recent_server(self, server):
# list is ordered # list is ordered
s = i.server if server in self.recent_servers:
if s in self.recent_servers: self.recent_servers.remove(server)
self.recent_servers.remove(s) self.recent_servers.insert(0, server)
self.recent_servers.insert(0,s)
self.recent_servers = self.recent_servers[0:20] self.recent_servers = self.recent_servers[0:20]
self.save_recent_servers() self.save_recent_servers()
@ -393,70 +405,75 @@ class Network(util.DaemonThread):
self.switch_lagging_interface(i.server) self.switch_lagging_interface(i.server)
self.notify('updated') self.notify('updated')
def process_if_notification(self, i): def process_response(self, interface, response):
'''Handle interface addition and removal through notifications''' error = response.get('error')
if i.is_connected():
self.add_recent_server(i)
i.send_request({'method':'blockchain.headers.subscribe','params':[]})
if i.server == self.default_server:
self.switch_to_interface(i.server)
else:
self.interfaces.pop(i.server, None)
self.heights.pop(i.server, None)
if i == self.interface:
self.interface = None
self.addr_responses = {}
self.set_status('disconnected')
self.disconnected_servers.add(i.server)
# Our set of interfaces changed
self.notify('interfaces')
def process_response(self, i, response):
# the id comes from the daemon or the network proxy
_id = response.get('id')
if _id is not None:
if i != self.interface:
return
self.unanswered_requests.pop(_id)
method = response.get('method')
result = response.get('result') result = response.get('result')
if method == 'blockchain.headers.subscribe': method = response.get('method')
self.on_header(i, response)
# We handle some responses; return the rest to the client.
if method == 'server.version':
interface.server_version = result
elif method == 'blockchain.headers.subscribe':
if error is None:
self.on_header(interface, result)
elif method == 'server.peers.subscribe': elif method == 'server.peers.subscribe':
self.irc_servers = parse_servers(result) if error is None:
self.notify('servers') self.irc_servers = parse_servers(result)
self.notify('servers')
elif method == 'server.banner': elif method == 'server.banner':
self.banner = result if error is None:
self.notify('banner') self.banner = result
self.notify('banner')
elif method == 'blockchain.estimatefee': elif method == 'blockchain.estimatefee':
from bitcoin import COIN if error is None:
self.fee = int(result * COIN) self.fee = int(result * COIN)
self.print_error("recommended fee", self.fee) self.print_error("recommended fee", self.fee)
self.notify('fee') self.notify('fee')
elif method == 'blockchain.address.subscribe':
addr = response.get('params')[0]
self.addr_responses[addr] = result
self.response_queue.put(response)
elif method == 'blockchain.block.get_chunk': elif method == 'blockchain.block.get_chunk':
self.on_get_chunk(i, response) self.on_get_chunk(interface, response)
elif method == 'blockchain.block.get_header': elif method == 'blockchain.block.get_header':
self.on_get_header(i, response) self.on_get_header(interface, response)
else: else:
# Cache address subscription results
if method == 'blockchain.address.subscribe' and error is None:
addr = response['params'][0]
self.addr_responses[addr] = result
self.response_queue.put(response) self.response_queue.put(response)
def handle_requests(self): def process_responses(self, interface):
'''Some requests require connectivity, others we handle locally in notifications, responses = interface.get_responses()
process_request() and must do so in order to e.g. prevent the
daemon seeming unresponsive. for request, response in responses:
''' # Client ID was given by the daemon or proxy
unhandled = [] client_id = request.get('id')
if client_id is not None:
if interface != self.interface:
continue
self.unanswered_requests.pop(client_id)
# Copy the request method and params to the response
response['method'] = request.get('method')
response['params'] = request.get('params')
response['id'] = client_id
self.process_response(interface, response)
for response in notifications:
if not response: # Closed remotely
self.connection_down(interface.server)
break
# Rewrite response shape to match subscription request response
method = response.get('method')
if method == 'blockchain.headers.subscribe':
response['result'] = response['params'][0]
response['params'] = []
elif method == 'blockchain.address.subscribe':
params = response['params']
response['params'] = [params[0]] # addr
response['result'] = params[1]
self.process_response(interface, response)
def handle_incoming_requests(self):
while not self.requests_queue.empty(): while not self.requests_queue.empty():
request = self.requests_queue.get() self.process_request(self.requests_queue.get())
if not self.process_request(request):
unhandled.append(request)
for request in unhandled:
self.requests_queue.put(request)
def process_request(self, request): def process_request(self, request):
'''Returns true if the request was processed.''' '''Returns true if the request was processed.'''
@ -487,22 +504,62 @@ class Network(util.DaemonThread):
# This request needs connectivity. If we don't have an # This request needs connectivity. If we don't have an
# interface, we cannot process it. # interface, we cannot process it.
if not self.is_connected(): if not self.interface:
return False return False
self.unanswered_requests[_id] = request self.unanswered_requests[_id] = request
self.interface.send_request(request) self.interface.queue_request(request)
return True return True
def check_interfaces(self): def connection_down(self, server):
'''A connection to server either went down, or was never made.
We distinguish by whether it is in self.interfaces.'''
self.disconnected_servers.add(server)
if server == self.default_server:
self.set_status('disconnected')
if server in self.interfaces:
self.close_interface(self.interfaces[server])
self.heights.pop(server, None)
self.notify('interfaces')
def new_interface(self, server, socket):
self.add_recent_server(server)
self.interfaces[server] = interface = Interface(server, socket)
interface.queue_request({'method': 'blockchain.headers.subscribe',
'params': []})
if server == self.default_server:
self.switch_to_interface(server)
self.notify('interfaces')
def maintain_sockets(self):
'''Socket maintenance.'''
# Responses to connection attempts?
while not self.socket_queue.empty():
server, socket = self.socket_queue.get()
self.connecting.pop(server)
if socket:
self.new_interface(server, socket)
else:
self.connection_down(server)
# Send pings and shut down stale interfaces
for interface in self.interfaces.values():
if interface.has_timed_out():
self.connection_down(interface.server)
elif interface.ping_required():
version_req = {'method': 'server.version',
'params': [ELECTRUM_VERSION, PROTOCOL_VERSION]}
interface.queue_request(version_req)
now = time.time() now = time.time()
# nodes # nodes
if len(self.interfaces) < self.num_server: if len(self.interfaces) + len(self.connecting) < self.num_server:
self.start_random_interface() self.start_random_interface()
if now - self.nodes_retry_time > NODES_RETRY_INTERVAL: if now - self.nodes_retry_time > NODES_RETRY_INTERVAL:
self.print_error('network: retrying connections') self.print_error('network: retrying connections')
self.disconnected_servers = set([]) self.disconnected_servers = set([])
self.nodes_retry_time = now self.nodes_retry_time = now
# main interface # main interface
if not self.is_connected(): if not self.is_connected():
if self.auto_connect: if self.auto_connect:
@ -517,7 +574,8 @@ class Network(util.DaemonThread):
def request_chunk(self, interface, data, idx): def request_chunk(self, interface, data, idx):
interface.print_error("requesting chunk %d" % idx) interface.print_error("requesting chunk %d" % idx)
interface.send_request({'method':'blockchain.block.get_chunk', 'params':[idx]}) interface.queue_request({'method':'blockchain.block.get_chunk',
'params':[idx]})
data['chunk_idx'] = idx data['chunk_idx'] = idx
data['req_time'] = time.time() data['req_time'] = time.time()
@ -537,7 +595,8 @@ class Network(util.DaemonThread):
def request_header(self, interface, data, height): def request_header(self, interface, data, height):
interface.print_error("requesting header %d" % height) interface.print_error("requesting header %d" % height)
interface.send_request({'method':'blockchain.block.get_header', 'params':[height]}) interface.queue_request({'method':'blockchain.block.get_header',
'params':[height]})
data['header_height'] = height data['header_height'] = height
data['req_time'] = time.time() data['req_time'] = time.time()
if not 'chain' in data: if not 'chain' in data:
@ -563,7 +622,9 @@ class Network(util.DaemonThread):
self.request_header(interface, data, next_height) self.request_header(interface, data, next_height)
def bc_request_headers(self, interface, data): def bc_request_headers(self, interface, data):
'''Send a request for the next header, or a chunk of them, if necessary''' '''Send a request for the next header, or a chunk of them,
if necessary.
'''
local_height, if_height = self.get_local_height(), data['if_height'] local_height, if_height = self.get_local_height(), data['if_height']
if if_height <= local_height: if if_height <= local_height:
return False return False
@ -575,11 +636,12 @@ class Network(util.DaemonThread):
def handle_bc_requests(self): def handle_bc_requests(self):
'''Work through each interface that has notified us of a new header. '''Work through each interface that has notified us of a new header.
Send it requests if it is ahead of our blockchain object''' Send it requests if it is ahead of our blockchain object.
'''
while self.bc_requests: while self.bc_requests:
interface, data = self.bc_requests.popleft() interface, data = self.bc_requests.popleft()
# If the connection was lost move on # If the connection was lost move on
if not interface.is_connected(): if not interface in self.interfaces.values():
continue continue
req_time = data.get('req_time') req_time = data.get('req_time')
@ -590,36 +652,39 @@ class Network(util.DaemonThread):
continue continue
elif time.time() - req_time > 10: elif time.time() - req_time > 10:
interface.print_error("blockchain request timed out") interface.print_error("blockchain request timed out")
interface.stop() self.connection_down(interface.server)
continue continue
# Put updated request state back at head of deque # Put updated request state back at head of deque
self.bc_requests.appendleft((interface, data)) self.bc_requests.appendleft((interface, data))
break break
def wait_on_sockets(self):
# Python docs say Windows doesn't like empty selects.
# Sleep to prevent busy looping
if not self.interfaces:
time.sleep(0.1)
return
rin = [i for i in self.interfaces.values()]
win = [i for i in self.interfaces.values() if i.unsent_requests]
rout, wout, xout = select.select(rin, win, [], 0.1)
assert not xout
for interface in wout:
interface.send_requests()
for interface in rout:
self.process_responses(interface)
def run(self): def run(self):
self.blockchain.init() self.blockchain.init()
while self.is_running(): while self.is_running():
self.check_interfaces() self.maintain_sockets()
self.handle_requests() self.wait_on_sockets()
self.handle_incoming_requests()
self.handle_bc_requests() self.handle_bc_requests()
try:
i, response = self.queue.get(timeout=0.1)
except Queue.Empty:
continue
# if response is None it is a notification about the interface
if response is None:
self.process_if_notification(i)
else:
self.process_response(i, response)
self.stop_network() self.stop_network()
self.print_error("stopped") self.print_error("stopped")
def on_header(self, i, r): def on_header(self, i, header):
header = r.get('result')
if not header:
return
height = header.get('block_height') height = header.get('block_height')
if not height: if not height:
return return