lightning: split out into function, reintroduce ForeverCoroutineJob, use same connection if possible

This commit is contained in:
Janus 2017-12-22 18:28:51 +01:00
parent 193f8e2d3e
commit 2dc33f07ab
4 changed files with 104 additions and 65 deletions

View File

@ -24,6 +24,8 @@ import base64
import asyncio
from concurrent.futures import TimeoutError
WALLET = None
NETWORK = None
CONFIG = None
@ -643,9 +645,9 @@ class LightningRPC(ForeverCoroutineJob):
super(LightningRPC, self).__init__()
self.queue = queue.Queue()
# overridden
async def run(self):
async def run(self, is_running):
print("RPC STARTED")
while True:
while is_running():
try:
qitem = self.queue.get(block=False)
except queue.Empty:
@ -697,13 +699,13 @@ class LightningWorker(ForeverCoroutineJob):
deser = bitcoin.deserialize_xpub(wallet().keystore.xpub)
assert deser[0] == "p2wpkh", deser
async def run(self):
async def run(self, is_running):
global WALLET, NETWORK
global CONFIG
wasAlreadyUpToDate = False
while True:
while is_running():
WALLET = self.wallet()
NETWORK = self.network()
CONFIG = self.config()
@ -724,69 +726,74 @@ class LightningWorker(ForeverCoroutineJob):
writer.write(b"MAGIC")
writer.write(privateKeyHash[:6])
await writer.drain()
data = b""
while True:
try:
json.loads(data)
except ValueError:
data += await reader.read(1)
else:
break
obj = json.loads(data)
while is_running():
obj = await readJson(reader, is_running)
if not obj: continue
await readReqAndReply(obj, writer)
except:
traceback.print_exc()
continue
else:
methods = [FetchRootKey
,ConfirmedBalance
,NewAddress
,ListUnspentWitness
,SetHdSeed
,NewRawKey
,FetchInputInfo
,ComputeInputScript
,SignOutputRaw
,PublishTransaction
,LockOutpoint
,UnlockOutpoint
,ListTransactionDetails
,SendOutputs
,IsSynced
,SignMessage]
result = None
found = False
try:
for method in methods:
if method.__name__ == obj["method"]:
params = obj["params"][0]
print("calling method", obj["method"], "with", params)
if asyncio.iscoroutinefunction(method):
result = await method(params)
else:
result = method(params)
found = True
break
except BaseException as e:
traceback.print_exc()
print("exception while calling method", obj["method"])
writer.write(json.dumps({"id":obj["id"],"error": {"code": -32002, "message": str(e)}}).encode("ascii"))
await writer.drain()
async def readJson(reader, is_running):
data = b""
while is_running():
if data != b"": print("parse failed, data has", data)
try:
return json.loads(data)
except ValueError:
try:
data += await asyncio.wait_for(reader.read(2048), 1)
except TimeoutError:
continue
async def readReqAndReply(obj, writer):
methods = [FetchRootKey
,ConfirmedBalance
,NewAddress
,ListUnspentWitness
,SetHdSeed
,NewRawKey
,FetchInputInfo
,ComputeInputScript
,SignOutputRaw
,PublishTransaction
,LockOutpoint
,UnlockOutpoint
,ListTransactionDetails
,SendOutputs
,IsSynced
,SignMessage]
result = None
found = False
try:
for method in methods:
if method.__name__ == obj["method"]:
params = obj["params"][0]
print("calling method", obj["method"], "with", params)
if asyncio.iscoroutinefunction(method):
result = await method(params)
else:
if not found:
writer.write(json.dumps({"id":obj["id"],"error": {"code": -32601, "message": "invalid method"}}).encode("ascii"))
else:
print("result was", result)
if result is None:
result = "{}"
try:
assert type({}) is type(json.loads(result))
except:
traceback.print_exc()
print("wrong method implementation")
writer.write(json.dumps({"id":obj["id"],"error": {"code": -32000, "message": "wrong return type in electrum-lightning-hub"}}).encode("ascii"))
else:
writer.write(json.dumps({"id":obj["id"],"result": result}).encode("ascii"))
await writer.drain()
finally:
if writer: writer.close()
result = method(params)
found = True
break
except BaseException as e:
traceback.print_exc()
print("exception while calling method", obj["method"])
writer.write(json.dumps({"id":obj["id"],"error": {"code": -32002, "message": str(e)}}).encode("ascii"))
await writer.drain()
else:
if not found:
writer.write(json.dumps({"id":obj["id"],"error": {"code": -32601, "message": "invalid method"}}).encode("ascii"))
else:
print("result was", result)
if result is None:
result = "{}"
try:
assert type({}) is type(json.loads(result))
except:
traceback.print_exc()
print("wrong method implementation")
writer.write(json.dumps({"id":obj["id"],"error": {"code": -32000, "message": "wrong return type in electrum-lightning-hub"}}).encode("ascii"))
else:
writer.write(json.dumps({"id":obj["id"],"result": result}).encode("ascii"))
await writer.drain()

View File

@ -1122,9 +1122,11 @@ class Network(util.DaemonThread):
raise
asyncio.ensure_future(job())
run_future = asyncio.Future()
self.run_forever_coroutines()
asyncio.ensure_future(self.run_async(run_future))
loop.run_until_complete(run_future)
assert self.forever_coroutines_task.done()
run_future.exception()
self.print_error("run future result", run_future.result())
loop.close()

View File

@ -32,6 +32,7 @@ import time
import json
import urllib.request, urllib.parse, urllib.error
import queue
import asyncio
from .i18n import _
@ -76,6 +77,15 @@ class PrintError(object):
def print_msg(self, *msg):
print_msg("[%s]" % self.diagnostic_name(), *msg)
class ForeverCoroutineJob(PrintError):
"""A job that is run from a thread's main loop. run() is
called from that thread's context.
"""
async def run(self, is_running):
"""Called once from the thread"""
pass
class CoroutineJob(PrintError):
"""A job that is run periodically from a thread's main loop. run() is
called from that thread's context.
@ -130,11 +140,28 @@ class DaemonThread(threading.Thread, PrintError):
self.job_lock = threading.Lock()
self.jobs = []
self.coroutines = []
self.forever_coroutines_task = None
def add_coroutines(self, jobs):
for i in jobs: assert isinstance(i, CoroutineJob), i.__class__.__name__ + " does not inherit from CoroutineJob"
self.coroutines.extend(jobs)
def set_forever_coroutines(self, jobs):
for i in jobs: assert isinstance(i, ForeverCoroutineJob), i.__class__.__name__ + " does not inherit from ForeverCoroutineJob"
async def put():
await self.forever_coroutines_queue.put(jobs)
asyncio.run_coroutine_threadsafe(put(), self.loop)
def run_forever_coroutines(self):
self.forever_coroutines_queue = asyncio.Queue() # making queue here because __init__ is called from non-network thread
self.loop = asyncio.get_event_loop()
async def getFromQueueAndStart():
jobs = await self.forever_coroutines_queue.get()
await asyncio.gather(*[i.run(self.is_running) for i in jobs])
print("FOREVER JOBS DONE")
self.forever_coroutines_task = asyncio.ensure_future(getFromQueueAndStart())
return self.forever_coroutines_task
async def run_coroutines(self):
for coroutine in self.coroutines:
assert isinstance(coroutine, CoroutineJob)

View File

@ -974,6 +974,9 @@ class Abstract_Wallet(PrintError):
self.verifier = SPV(self.network, self)
self.synchronizer = Synchronizer(self, network)
network.add_coroutines([self.verifier, self.synchronizer])
self.lightning = LightningRPC()
self.lightningworker = LightningWorker(lambda: self, lambda: network, lambda: network.config)
network.set_forever_coroutines([self.lightning, self.lightningworker])
else:
self.verifier = None
self.synchronizer = None