Merge pull request #1205 from kyuupichan/if-cleanup-final
Fix two races in interface.py.
This commit is contained in:
commit
60be02dace
|
@ -17,16 +17,16 @@
|
|||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
import re, errno, os
|
||||
import copy, re, errno, os
|
||||
import threading, traceback, sys, time, Queue
|
||||
import socket
|
||||
import ssl
|
||||
|
||||
import x509
|
||||
import util
|
||||
import requests
|
||||
ca_path = requests.certs.where()
|
||||
|
||||
import util
|
||||
import x509
|
||||
from version import ELECTRUM_VERSION, PROTOCOL_VERSION
|
||||
from simple_config import SimpleConfig
|
||||
|
||||
|
@ -40,8 +40,7 @@ def Interface(server, response_queue, config = None):
|
|||
- Member functions send_request(), stop(), is_connected()
|
||||
- Member variable server.
|
||||
|
||||
"is_connected()" is currently racy. "server" is constant for the object's lifetime and hence
|
||||
synchronization is unnecessary.
|
||||
"server" is constant for the object's lifetime and hence synchronization is unnecessary.
|
||||
"""
|
||||
host, port, protocol = server.split(':')
|
||||
if protocol in 'st':
|
||||
|
@ -55,7 +54,12 @@ class TcpInterface(threading.Thread):
|
|||
threading.Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.config = config if config is not None else SimpleConfig()
|
||||
self.connected = False
|
||||
# Set by stop(); no more data is exchanged and the thread exits after gracefully
|
||||
# closing the socket
|
||||
self.disconnect = False
|
||||
# Initially True to avoid a race; set to False on failure to create a socket or when
|
||||
# it is closed
|
||||
self.connected = True
|
||||
self.debug = False # dump network messages. can be changed at runtime using the console
|
||||
self.message_id = 0
|
||||
self.response_queue = response_queue
|
||||
|
@ -250,7 +254,7 @@ class TcpInterface(threading.Thread):
|
|||
def send_request(self, request, response_queue = None):
|
||||
'''Queue a request. Blocking only if called from other threads.'''
|
||||
self.request_time = time.time()
|
||||
self.request_queue.put((request, response_queue), threading.current_thread() != self)
|
||||
self.request_queue.put((copy.deepcopy(request), response_queue), threading.current_thread() != self)
|
||||
|
||||
def send_requests(self):
|
||||
'''Sends all queued requests'''
|
||||
|
@ -271,14 +275,11 @@ class TcpInterface(threading.Thread):
|
|||
self.message_id += 1
|
||||
|
||||
def is_connected(self):
|
||||
return self.connected
|
||||
return self.connected and not self.disconnect
|
||||
|
||||
def stop(self):
|
||||
if self.connected and self.protocol in 'st' and self.s:
|
||||
self.s.shutdown(socket.SHUT_RDWR)
|
||||
self.s.close()
|
||||
self.connected = False
|
||||
self.print_error("stopped")
|
||||
self.disconnect = True
|
||||
self.print_error("disconnecting")
|
||||
|
||||
def maybe_ping(self):
|
||||
# ping the server with server.version
|
||||
|
@ -290,7 +291,7 @@ class TcpInterface(threading.Thread):
|
|||
self.print_error("interface timeout", len(self.unanswered_requests))
|
||||
self.stop()
|
||||
|
||||
def get_response(self):
|
||||
def get_and_process_response(self):
|
||||
if self.is_connected():
|
||||
try:
|
||||
response = self.pipe.get()
|
||||
|
@ -298,28 +299,30 @@ class TcpInterface(threading.Thread):
|
|||
return
|
||||
# If remote side closed the socket, SocketPipe closes our socket and returns None
|
||||
if response is None:
|
||||
self.connected = False
|
||||
return response
|
||||
|
||||
def run(self):
|
||||
self.s = self.get_socket()
|
||||
if self.s:
|
||||
self.pipe = util.SocketPipe(self.s)
|
||||
self.s.settimeout(0.1)
|
||||
self.connected = True
|
||||
self.print_error("connected")
|
||||
|
||||
self.change_status()
|
||||
if not self.connected:
|
||||
return
|
||||
|
||||
while self.connected:
|
||||
self.maybe_ping()
|
||||
self.send_requests()
|
||||
response = self.get_response()
|
||||
if response:
|
||||
self.connected = False # Don't re-close the socket
|
||||
self.print_error("connection closed remotely")
|
||||
else:
|
||||
self.process_response(response)
|
||||
|
||||
def run(self):
|
||||
s = self.get_socket()
|
||||
if s:
|
||||
self.pipe = util.SocketPipe(s)
|
||||
s.settimeout(0.1)
|
||||
self.print_error("connected")
|
||||
# Indicate to parent that we've connected
|
||||
self.change_status()
|
||||
while self.is_connected():
|
||||
self.maybe_ping()
|
||||
self.send_requests()
|
||||
self.get_and_process_response()
|
||||
if self.connected: # Don't shutdown() a closed socket
|
||||
s.shutdown(socket.SHUT_RDWR)
|
||||
s.close()
|
||||
|
||||
# Also for the s is None case
|
||||
self.connected = False
|
||||
# Indicate to parent that the connection is now down
|
||||
self.change_status()
|
||||
|
||||
def change_status(self):
|
||||
|
|
Loading…
Reference in New Issue