From 97b5f6d6a3630748d46d42de1b4033cd7b27303c Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 7 May 2015 08:16:42 +0900 Subject: [PATCH] Fix two races in interface.py. First, close the socket from the thread itself rather than from the stop() function. This prevents another thread closing the socket that the interface thread is simultaneously using. Second, it occasionally would happen that the parent thread such as network.py start() an interface, do a send_request() and timeout waiting for a response (timeouts are 0.1s). It would check is_connected(), get False, and assume the connection has failed. In fact the thread hadn't even been scheduled or gotten around to completing the socket connection. Fix by having self.connected start out True. If the connection fails or times out, we set connected to False soon enough. Finally for correctness we need to deepcopy a send_request() rather than take a reference to it. --- lib/interface.py | 71 +++++++++++++++++++++++++----------------------- 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/lib/interface.py b/lib/interface.py index 9f844ff6..e9102fc9 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -17,16 +17,16 @@ # along with this program. If not, see . -import re, errno, os +import copy, re, errno, os import threading, traceback, sys, time, Queue import socket import ssl -import x509 -import util import requests ca_path = requests.certs.where() +import util +import x509 from version import ELECTRUM_VERSION, PROTOCOL_VERSION from simple_config import SimpleConfig @@ -40,8 +40,7 @@ def Interface(server, response_queue, config = None): - Member functions send_request(), stop(), is_connected() - Member variable server. - "is_connected()" is currently racy. "server" is constant for the object's lifetime and hence - synchronization is unnecessary. + "server" is constant for the object's lifetime and hence synchronization is unnecessary. """ host, port, protocol = server.split(':') if protocol in 'st': @@ -55,7 +54,12 @@ class TcpInterface(threading.Thread): threading.Thread.__init__(self) self.daemon = True self.config = config if config is not None else SimpleConfig() - self.connected = False + # Set by stop(); no more data is exchanged and the thread exits after gracefully + # closing the socket + self.disconnect = False + # Initially True to avoid a race; set to False on failure to create a socket or when + # it is closed + self.connected = True self.debug = False # dump network messages. can be changed at runtime using the console self.message_id = 0 self.response_queue = response_queue @@ -250,7 +254,7 @@ class TcpInterface(threading.Thread): def send_request(self, request, response_queue = None): '''Queue a request. Blocking only if called from other threads.''' self.request_time = time.time() - self.request_queue.put((request, response_queue), threading.current_thread() != self) + self.request_queue.put((copy.deepcopy(request), response_queue), threading.current_thread() != self) def send_requests(self): '''Sends all queued requests''' @@ -271,14 +275,11 @@ class TcpInterface(threading.Thread): self.message_id += 1 def is_connected(self): - return self.connected + return self.connected and not self.disconnect def stop(self): - if self.connected and self.protocol in 'st' and self.s: - self.s.shutdown(socket.SHUT_RDWR) - self.s.close() - self.connected = False - self.print_error("stopped") + self.disconnect = True + self.print_error("disconnecting") def maybe_ping(self): # ping the server with server.version @@ -290,7 +291,7 @@ class TcpInterface(threading.Thread): self.print_error("interface timeout", len(self.unanswered_requests)) self.stop() - def get_response(self): + def get_and_process_response(self): if self.is_connected(): try: response = self.pipe.get() @@ -298,28 +299,30 @@ class TcpInterface(threading.Thread): return # If remote side closed the socket, SocketPipe closes our socket and returns None if response is None: - self.connected = False - return response - - def run(self): - self.s = self.get_socket() - if self.s: - self.pipe = util.SocketPipe(self.s) - self.s.settimeout(0.1) - self.connected = True - self.print_error("connected") - - self.change_status() - if not self.connected: - return - - while self.connected: - self.maybe_ping() - self.send_requests() - response = self.get_response() - if response: + self.connected = False # Don't re-close the socket + self.print_error("connection closed remotely") + else: self.process_response(response) + def run(self): + s = self.get_socket() + if s: + self.pipe = util.SocketPipe(s) + s.settimeout(0.1) + self.print_error("connected") + # Indicate to parent that we've connected + self.change_status() + while self.is_connected(): + self.maybe_ping() + self.send_requests() + self.get_and_process_response() + if self.connected: # Don't shutdown() a closed socket + s.shutdown(socket.SHUT_RDWR) + s.close() + + # Also for the s is None case + self.connected = False + # Indicate to parent that the connection is now down self.change_status() def change_status(self):