interface: send from same thread and simplify timeouts
This commit is contained in:
parent
78f5dbb72e
commit
45fd3ef343
|
@ -59,13 +59,11 @@ class TcpInterface(threading.Thread):
|
||||||
self.debug = False # dump network messages. can be changed at runtime using the console
|
self.debug = False # dump network messages. can be changed at runtime using the console
|
||||||
self.message_id = 0
|
self.message_id = 0
|
||||||
self.response_queue = response_queue
|
self.response_queue = response_queue
|
||||||
self.lock = threading.Lock()
|
self.request_queue = Queue.Queue()
|
||||||
self.unanswered_requests = {}
|
self.unanswered_requests = {}
|
||||||
# request timeouts
|
# request timeouts
|
||||||
self.request_time = False
|
self.response_time = time.time()
|
||||||
# are we waiting for a pong?
|
self.ping_time = time.time()
|
||||||
self.is_ping = False
|
|
||||||
self.ping_time = 0
|
|
||||||
# parse server
|
# parse server
|
||||||
self.server = server
|
self.server = server
|
||||||
self.host, self.port, self.protocol = self.server.split(':')
|
self.host, self.port, self.protocol = self.server.split(':')
|
||||||
|
@ -84,7 +82,6 @@ class TcpInterface(threading.Thread):
|
||||||
result = response.get('result')
|
result = response.get('result')
|
||||||
|
|
||||||
if msg_id is not None:
|
if msg_id is not None:
|
||||||
with self.lock:
|
|
||||||
method, params, _id, queue = self.unanswered_requests.pop(msg_id)
|
method, params, _id, queue = self.unanswered_requests.pop(msg_id)
|
||||||
if queue is None:
|
if queue is None:
|
||||||
queue = self.response_queue
|
queue = self.response_queue
|
||||||
|
@ -108,7 +105,6 @@ class TcpInterface(threading.Thread):
|
||||||
|
|
||||||
if method == 'server.version':
|
if method == 'server.version':
|
||||||
self.server_version = result
|
self.server_version = result
|
||||||
self.is_ping = False
|
|
||||||
return
|
return
|
||||||
|
|
||||||
if error:
|
if error:
|
||||||
|
@ -252,7 +248,13 @@ class TcpInterface(threading.Thread):
|
||||||
return s
|
return s
|
||||||
|
|
||||||
def send_request(self, request, response_queue = None):
|
def send_request(self, request, response_queue = None):
|
||||||
with self.lock:
|
'''Queue a request. Blocking only if called from other threads.'''
|
||||||
|
self.request_queue.put((request, response_queue), threading.current_thread() != self)
|
||||||
|
|
||||||
|
def send_requests(self):
|
||||||
|
'''Sends all queued requests'''
|
||||||
|
while self.is_connected() and not self.request_queue.empty():
|
||||||
|
request, response_queue = self.request_queue.get()
|
||||||
method = request.get('method')
|
method = request.get('method')
|
||||||
params = request.get('params')
|
params = request.get('params')
|
||||||
r = {'id': self.message_id, 'method': method, 'params': params}
|
r = {'id': self.message_id, 'method': method, 'params': params}
|
||||||
|
@ -278,46 +280,32 @@ class TcpInterface(threading.Thread):
|
||||||
self.print_error("stopped")
|
self.print_error("stopped")
|
||||||
|
|
||||||
def maybe_ping(self):
|
def maybe_ping(self):
|
||||||
# ping the server with server.version?
|
# ping the server with server.version
|
||||||
if time.time() - self.ping_time > 60:
|
if time.time() - self.ping_time > 60:
|
||||||
if self.is_ping:
|
|
||||||
self.print_error("ping timeout")
|
|
||||||
self.stop()
|
|
||||||
else:
|
|
||||||
self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]})
|
self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]})
|
||||||
self.is_ping = True
|
|
||||||
self.ping_time = time.time()
|
self.ping_time = time.time()
|
||||||
|
# stop interface if we have been waiting for more than 10 seconds
|
||||||
|
if self.unanswered_requests and time.time() - self.response_time > 10:
|
||||||
|
self.print_error("interface timeout", len(self.unanswered_requests))
|
||||||
|
self.stop()
|
||||||
|
|
||||||
def get_and_process_one_response(self):
|
def get_response(self):
|
||||||
if self.is_connected():
|
if self.is_connected():
|
||||||
try:
|
try:
|
||||||
response = self.pipe.get()
|
response = self.pipe.get()
|
||||||
|
self.response_time = time.time()
|
||||||
except util.timeout:
|
except util.timeout:
|
||||||
if self.unanswered_requests:
|
|
||||||
if self.request_time is False:
|
|
||||||
self.request_time = time.time()
|
|
||||||
self.print_error("setting timer")
|
|
||||||
else:
|
|
||||||
if time.time() - self.request_time > 10:
|
|
||||||
self.print_error("request timeout", len(self.unanswered_requests))
|
|
||||||
self.stop()
|
|
||||||
return
|
return
|
||||||
|
|
||||||
if self.request_time is not False:
|
|
||||||
self.print_error("stopping timer")
|
|
||||||
self.request_time = False
|
|
||||||
|
|
||||||
# If remote side closed the socket, SocketPipe closes our socket and returns None
|
# If remote side closed the socket, SocketPipe closes our socket and returns None
|
||||||
if response is None:
|
if response is None:
|
||||||
self.connected = False
|
self.connected = False
|
||||||
else:
|
return response
|
||||||
self.process_response(response)
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.s = self.get_socket()
|
self.s = self.get_socket()
|
||||||
if self.s:
|
if self.s:
|
||||||
self.pipe = util.SocketPipe(self.s)
|
self.pipe = util.SocketPipe(self.s)
|
||||||
self.s.settimeout(2)
|
self.s.settimeout(0.1)
|
||||||
self.connected = True
|
self.connected = True
|
||||||
self.print_error("connected")
|
self.print_error("connected")
|
||||||
|
|
||||||
|
@ -327,7 +315,10 @@ class TcpInterface(threading.Thread):
|
||||||
|
|
||||||
while self.connected:
|
while self.connected:
|
||||||
self.maybe_ping()
|
self.maybe_ping()
|
||||||
self.get_and_process_one_response()
|
self.send_requests()
|
||||||
|
response = self.get_response()
|
||||||
|
if response:
|
||||||
|
self.process_response(response)
|
||||||
|
|
||||||
self.change_status()
|
self.change_status()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue