Merge pull request #1219 from kyuupichan/network_queues
Have queues passed to the network constructor.
This commit is contained in:
commit
5841f943e8
|
@ -117,9 +117,10 @@ class NetworkServer(util.DaemonThread):
|
||||||
util.DaemonThread.__init__(self)
|
util.DaemonThread.__init__(self)
|
||||||
self.debug = False
|
self.debug = False
|
||||||
self.config = config
|
self.config = config
|
||||||
self.network = Network(config)
|
|
||||||
# network sends responses on that queue
|
# network sends responses on that queue
|
||||||
self.network_queue = Queue.Queue()
|
self.network_queue = Queue.Queue()
|
||||||
|
self.requests_queue = Queue.Queue()
|
||||||
|
self.network = Network(self.requests_queue, self.network_queue, config)
|
||||||
|
|
||||||
self.running = False
|
self.running = False
|
||||||
self.lock = threading.RLock()
|
self.lock = threading.RLock()
|
||||||
|
@ -150,11 +151,11 @@ class NetworkServer(util.DaemonThread):
|
||||||
|
|
||||||
if self.debug:
|
if self.debug:
|
||||||
print_error("-->", request)
|
print_error("-->", request)
|
||||||
self.network.requests_queue.put(request)
|
self.requests_queue.put(request)
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.network.start(self.network_queue)
|
self.network.start()
|
||||||
while self.is_running():
|
while self.is_running():
|
||||||
try:
|
try:
|
||||||
response = self.network_queue.get(timeout=0.1)
|
response = self.network_queue.get(timeout=0.1)
|
||||||
|
|
|
@ -123,7 +123,7 @@ def serialize_server(host, port, protocol):
|
||||||
|
|
||||||
class Network(util.DaemonThread):
|
class Network(util.DaemonThread):
|
||||||
|
|
||||||
def __init__(self, config=None):
|
def __init__(self, requests_queue, response_queue, config=None):
|
||||||
if config is None:
|
if config is None:
|
||||||
config = {} # Do not use mutables as default values!
|
config = {} # Do not use mutables as default values!
|
||||||
util.DaemonThread.__init__(self)
|
util.DaemonThread.__init__(self)
|
||||||
|
@ -132,6 +132,8 @@ class Network(util.DaemonThread):
|
||||||
self.blockchain = Blockchain(self.config, self)
|
self.blockchain = Blockchain(self.config, self)
|
||||||
self.interfaces = {}
|
self.interfaces = {}
|
||||||
self.queue = Queue.Queue()
|
self.queue = Queue.Queue()
|
||||||
|
self.requests_queue = requests_queue
|
||||||
|
self.response_queue = response_queue
|
||||||
# Server for addresses and transactions
|
# Server for addresses and transactions
|
||||||
self.default_server = self.config.get('server')
|
self.default_server = self.config.get('server')
|
||||||
# Sanitize default server
|
# Sanitize default server
|
||||||
|
@ -168,7 +170,6 @@ class Network(util.DaemonThread):
|
||||||
self.unanswered_requests = {}
|
self.unanswered_requests = {}
|
||||||
|
|
||||||
self.connection_status = 'connecting'
|
self.connection_status = 'connecting'
|
||||||
self.requests_queue = Queue.Queue()
|
|
||||||
self.set_proxy(deserialize_proxy(self.config.get('proxy')))
|
self.set_proxy(deserialize_proxy(self.config.get('proxy')))
|
||||||
# retry times
|
# retry times
|
||||||
self.server_retry_time = time.time()
|
self.server_retry_time = time.time()
|
||||||
|
@ -297,9 +298,8 @@ class Network(util.DaemonThread):
|
||||||
for i in range(self.num_server):
|
for i in range(self.num_server):
|
||||||
self.start_random_interface()
|
self.start_random_interface()
|
||||||
|
|
||||||
def start(self, response_queue):
|
def start(self):
|
||||||
self.running = True
|
self.running = True
|
||||||
self.response_queue = response_queue
|
|
||||||
self.start_interfaces()
|
self.start_interfaces()
|
||||||
self.blockchain.start()
|
self.blockchain.start()
|
||||||
util.DaemonThread.start(self)
|
util.DaemonThread.start(self)
|
||||||
|
|
|
@ -54,9 +54,9 @@ class NetworkProxy(util.DaemonThread):
|
||||||
self.pipe = util.SocketPipe(socket)
|
self.pipe = util.SocketPipe(socket)
|
||||||
self.network = None
|
self.network = None
|
||||||
else:
|
else:
|
||||||
self.network = Network(config)
|
self.pipe = util.QueuePipe()
|
||||||
self.pipe = util.QueuePipe(send_queue=self.network.requests_queue)
|
self.network = Network(self.pipe.send_queue, self.pipe.get_queue, config)
|
||||||
self.network.start(self.pipe.get_queue)
|
self.network.start()
|
||||||
for key in ['status','banner','updated','servers','interfaces']:
|
for key in ['status','banner','updated','servers','interfaces']:
|
||||||
value = self.network.get_status_value(key)
|
value = self.network.get_status_value(key)
|
||||||
self.pipe.get_queue.put({'method':'network.status', 'params':[key, value]})
|
self.pipe.get_queue.put({'method':'network.status', 'params':[key, value]})
|
||||||
|
|
Loading…
Reference in New Issue