diff --git a/lib/interface.py b/lib/interface.py index 9f844ff6..e9102fc9 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -17,16 +17,16 @@ # along with this program. If not, see . -import re, errno, os +import copy, re, errno, os import threading, traceback, sys, time, Queue import socket import ssl -import x509 -import util import requests ca_path = requests.certs.where() +import util +import x509 from version import ELECTRUM_VERSION, PROTOCOL_VERSION from simple_config import SimpleConfig @@ -40,8 +40,7 @@ def Interface(server, response_queue, config = None): - Member functions send_request(), stop(), is_connected() - Member variable server. - "is_connected()" is currently racy. "server" is constant for the object's lifetime and hence - synchronization is unnecessary. + "server" is constant for the object's lifetime and hence synchronization is unnecessary. """ host, port, protocol = server.split(':') if protocol in 'st': @@ -55,7 +54,12 @@ class TcpInterface(threading.Thread): threading.Thread.__init__(self) self.daemon = True self.config = config if config is not None else SimpleConfig() - self.connected = False + # Set by stop(); no more data is exchanged and the thread exits after gracefully + # closing the socket + self.disconnect = False + # Initially True to avoid a race; set to False on failure to create a socket or when + # it is closed + self.connected = True self.debug = False # dump network messages. can be changed at runtime using the console self.message_id = 0 self.response_queue = response_queue @@ -250,7 +254,7 @@ class TcpInterface(threading.Thread): def send_request(self, request, response_queue = None): '''Queue a request. Blocking only if called from other threads.''' self.request_time = time.time() - self.request_queue.put((request, response_queue), threading.current_thread() != self) + self.request_queue.put((copy.deepcopy(request), response_queue), threading.current_thread() != self) def send_requests(self): '''Sends all queued requests''' @@ -271,14 +275,11 @@ class TcpInterface(threading.Thread): self.message_id += 1 def is_connected(self): - return self.connected + return self.connected and not self.disconnect def stop(self): - if self.connected and self.protocol in 'st' and self.s: - self.s.shutdown(socket.SHUT_RDWR) - self.s.close() - self.connected = False - self.print_error("stopped") + self.disconnect = True + self.print_error("disconnecting") def maybe_ping(self): # ping the server with server.version @@ -290,7 +291,7 @@ class TcpInterface(threading.Thread): self.print_error("interface timeout", len(self.unanswered_requests)) self.stop() - def get_response(self): + def get_and_process_response(self): if self.is_connected(): try: response = self.pipe.get() @@ -298,28 +299,30 @@ class TcpInterface(threading.Thread): return # If remote side closed the socket, SocketPipe closes our socket and returns None if response is None: - self.connected = False - return response - - def run(self): - self.s = self.get_socket() - if self.s: - self.pipe = util.SocketPipe(self.s) - self.s.settimeout(0.1) - self.connected = True - self.print_error("connected") - - self.change_status() - if not self.connected: - return - - while self.connected: - self.maybe_ping() - self.send_requests() - response = self.get_response() - if response: + self.connected = False # Don't re-close the socket + self.print_error("connection closed remotely") + else: self.process_response(response) + def run(self): + s = self.get_socket() + if s: + self.pipe = util.SocketPipe(s) + s.settimeout(0.1) + self.print_error("connected") + # Indicate to parent that we've connected + self.change_status() + while self.is_connected(): + self.maybe_ping() + self.send_requests() + self.get_and_process_response() + if self.connected: # Don't shutdown() a closed socket + s.shutdown(socket.SHUT_RDWR) + s.close() + + # Also for the s is None case + self.connected = False + # Indicate to parent that the connection is now down self.change_status() def change_status(self):