Fix two races in interface.py.

First, close the socket from the thread itself rather than from
the stop() function.  This prevents another thread closing the
socket that the interface thread is simultaneously using.

Second, it occasionally would happen that the parent thread such as
network.py start() an interface, do a send_request() and timeout
waiting for a response (timeouts are 0.1s).  It would check
is_connected(), get False, and assume the connection has failed.
In fact the thread hadn't even been scheduled or gotten around to
completing the socket connection.  Fix by having self.connected
start out True.  If the connection fails or times out, we set
connected to False soon enough.

Finally for correctness we need to deepcopy a send_request() rather
than take a reference to it.
This commit is contained in:
Neil Booth 2015-05-07 08:16:42 +09:00
parent 656560be72
commit 97b5f6d6a3
1 changed files with 37 additions and 34 deletions

View File

@ -17,16 +17,16 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import re, errno, os import copy, re, errno, os
import threading, traceback, sys, time, Queue import threading, traceback, sys, time, Queue
import socket import socket
import ssl import ssl
import x509
import util
import requests import requests
ca_path = requests.certs.where() ca_path = requests.certs.where()
import util
import x509
from version import ELECTRUM_VERSION, PROTOCOL_VERSION from version import ELECTRUM_VERSION, PROTOCOL_VERSION
from simple_config import SimpleConfig from simple_config import SimpleConfig
@ -40,8 +40,7 @@ def Interface(server, response_queue, config = None):
- Member functions send_request(), stop(), is_connected() - Member functions send_request(), stop(), is_connected()
- Member variable server. - Member variable server.
"is_connected()" is currently racy. "server" is constant for the object's lifetime and hence "server" is constant for the object's lifetime and hence synchronization is unnecessary.
synchronization is unnecessary.
""" """
host, port, protocol = server.split(':') host, port, protocol = server.split(':')
if protocol in 'st': if protocol in 'st':
@ -55,7 +54,12 @@ class TcpInterface(threading.Thread):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.daemon = True self.daemon = True
self.config = config if config is not None else SimpleConfig() self.config = config if config is not None else SimpleConfig()
self.connected = False # Set by stop(); no more data is exchanged and the thread exits after gracefully
# closing the socket
self.disconnect = False
# Initially True to avoid a race; set to False on failure to create a socket or when
# it is closed
self.connected = True
self.debug = False # dump network messages. can be changed at runtime using the console self.debug = False # dump network messages. can be changed at runtime using the console
self.message_id = 0 self.message_id = 0
self.response_queue = response_queue self.response_queue = response_queue
@ -250,7 +254,7 @@ class TcpInterface(threading.Thread):
def send_request(self, request, response_queue = None): def send_request(self, request, response_queue = None):
'''Queue a request. Blocking only if called from other threads.''' '''Queue a request. Blocking only if called from other threads.'''
self.request_time = time.time() self.request_time = time.time()
self.request_queue.put((request, response_queue), threading.current_thread() != self) self.request_queue.put((copy.deepcopy(request), response_queue), threading.current_thread() != self)
def send_requests(self): def send_requests(self):
'''Sends all queued requests''' '''Sends all queued requests'''
@ -271,14 +275,11 @@ class TcpInterface(threading.Thread):
self.message_id += 1 self.message_id += 1
def is_connected(self): def is_connected(self):
return self.connected return self.connected and not self.disconnect
def stop(self): def stop(self):
if self.connected and self.protocol in 'st' and self.s: self.disconnect = True
self.s.shutdown(socket.SHUT_RDWR) self.print_error("disconnecting")
self.s.close()
self.connected = False
self.print_error("stopped")
def maybe_ping(self): def maybe_ping(self):
# ping the server with server.version # ping the server with server.version
@ -290,7 +291,7 @@ class TcpInterface(threading.Thread):
self.print_error("interface timeout", len(self.unanswered_requests)) self.print_error("interface timeout", len(self.unanswered_requests))
self.stop() self.stop()
def get_response(self): def get_and_process_response(self):
if self.is_connected(): if self.is_connected():
try: try:
response = self.pipe.get() response = self.pipe.get()
@ -298,28 +299,30 @@ class TcpInterface(threading.Thread):
return return
# If remote side closed the socket, SocketPipe closes our socket and returns None # If remote side closed the socket, SocketPipe closes our socket and returns None
if response is None: if response is None:
self.connected = False self.connected = False # Don't re-close the socket
return response self.print_error("connection closed remotely")
else:
def run(self):
self.s = self.get_socket()
if self.s:
self.pipe = util.SocketPipe(self.s)
self.s.settimeout(0.1)
self.connected = True
self.print_error("connected")
self.change_status()
if not self.connected:
return
while self.connected:
self.maybe_ping()
self.send_requests()
response = self.get_response()
if response:
self.process_response(response) self.process_response(response)
def run(self):
s = self.get_socket()
if s:
self.pipe = util.SocketPipe(s)
s.settimeout(0.1)
self.print_error("connected")
# Indicate to parent that we've connected
self.change_status()
while self.is_connected():
self.maybe_ping()
self.send_requests()
self.get_and_process_response()
if self.connected: # Don't shutdown() a closed socket
s.shutdown(socket.SHUT_RDWR)
s.close()
# Also for the s is None case
self.connected = False
# Indicate to parent that the connection is now down
self.change_status() self.change_status()
def change_status(self): def change_status(self):