From 0b0e6186c6eab1d43f5e82611eeb3b317adae235 Mon Sep 17 00:00:00 2001 From: slush0 Date: Thu, 31 Mar 2016 20:13:06 +0200 Subject: [PATCH] Added uasyncio Added playground module --- emu.sh | 3 +- src/main.py | 3 +- src/playground/__init__.py | 93 +++++++++++++++++ src/playground/utils.py | 2 + src/uasyncio/__init__.py | 206 +++++++++++++++++++++++++++++++++++++ src/uasyncio/core.py | 187 +++++++++++++++++++++++++++++++++ src/uasyncio/queues.py | 94 +++++++++++++++++ 7 files changed, 586 insertions(+), 2 deletions(-) create mode 100644 src/playground/__init__.py create mode 100644 src/playground/utils.py create mode 100644 src/uasyncio/__init__.py create mode 100644 src/uasyncio/core.py create mode 100644 src/uasyncio/queues.py diff --git a/emu.sh b/emu.sh index 6f933ab0..04389332 100755 --- a/emu.sh +++ b/emu.sh @@ -1,3 +1,4 @@ #!/bin/bash cd `dirname $0`/src -../vendor/micropython/unix/micropython main.py + +../vendor/micropython/unix/micropython -O0 -X heapsize=100000 main.py diff --git a/src/main.py b/src/main.py index d3bdc064..90fdb11d 100644 --- a/src/main.py +++ b/src/main.py @@ -2,4 +2,5 @@ import layout layout.show_send('1BitkeyP2nDd5oa64x7AjvBbbwST54W5Zmx2', 110.126967) -while True: pass +import playground +playground.run() diff --git a/src/playground/__init__.py b/src/playground/__init__.py new file mode 100644 index 00000000..87c15582 --- /dev/null +++ b/src/playground/__init__.py @@ -0,0 +1,93 @@ +# import time +import gc +import utime +import logging +import uasyncio +from uasyncio import core + +from TrezorUi import Display + +from . import utils + +d = Display() +logging.basicConfig(level=logging.INFO) +loop = uasyncio.get_event_loop() + +if __debug__: + def meminfo(): + mem_free = gc.mem_free() + gc.collect() + print("free_mem: %s/%s, collect: %s" % (mem_free, gc.mem_free(), gc.collect())) + loop.call_later(1, meminfo) + + # meminfo() + +def animate(col): + + col %= 0xff + col += 0x0f + + f = open('../assets/lock.toi', 'r') + d.icon(10, 170, f.read(), utils.rgb2color(0, col, 0), 0xffff) + + loop.call_later(0.5, animate, col) + +def animate2(col): + + col %= 0xff + col += 0x0f + + # yield True + f = open('../assets/lock.toi', 'r') + d.icon(170, 170, f.read(), utils.rgb2color(col, 0, 0), 0xffff) + + loop.call_later(0.1, animate2, col) + +sec = 0 +event = None +def sekunda(x): + global sec + print('Sekunda %d' % sec) + + + if sec == x: + loop.call_soon(loop.button_cb, 'levy') + loop.button_cb = None + + sec += 1 + loop.call_later(1, sekunda, x) + + + # global event + # event = wait_for() + # event.__next__() + + +def wait_for(): + print("Jsem tady") + + ktery = yield core.IOButton() + print(ktery) + + print("Po cekani na event") + +def run(): + ''' + d = Display() + d.bar(0, 0, 240, 240, 0) + + f = open('../assets/trezor.toi', 'r') + d.image(0, 0, f.read()) + ''' + + # logging.basicConfig(level=logging.INFO) + + sekunda(3) + + loop.call_soon(animate, 0x0000) + loop.call_soon(animate2, 0x00ff) + + loop.call_soon(wait_for()) + + loop.run_forever() + loop.close() diff --git a/src/playground/utils.py b/src/playground/utils.py new file mode 100644 index 00000000..3345c0f8 --- /dev/null +++ b/src/playground/utils.py @@ -0,0 +1,2 @@ +def rgb2color(r, g, b): + return ((r & 0xF8) << 8) | ((g & 0xFC) << 3) | ((b & 0xF8) >> 3) \ No newline at end of file diff --git a/src/uasyncio/__init__.py b/src/uasyncio/__init__.py new file mode 100644 index 00000000..504f1e92 --- /dev/null +++ b/src/uasyncio/__init__.py @@ -0,0 +1,206 @@ +import errno +import uselect as select +import usocket as _socket +from uasyncio.core import * + + +class EpollEventLoop(EventLoop): + + def __init__(self): + EventLoop.__init__(self) + self.poller = select.poll() + self.objmap = {} + + def add_reader(self, fd, cb, *args): + if __debug__: + log.debug("add_reader%s", (fd, cb, args)) + if args: + self.poller.register(fd, select.POLLIN) + self.objmap[fd] = (cb, args) + else: + self.poller.register(fd, select.POLLIN) + self.objmap[fd] = cb + + def remove_reader(self, fd): + if __debug__: + log.debug("remove_reader(%s)", fd) + self.poller.unregister(fd) + del self.objmap[fd] + + def add_writer(self, fd, cb, *args): + if __debug__: + log.debug("add_writer%s", (fd, cb, args)) + if args: + self.poller.register(fd, select.POLLOUT) + self.objmap[fd] = (cb, args) + else: + self.poller.register(fd, select.POLLOUT) + self.objmap[fd] = cb + + def remove_writer(self, fd): + if __debug__: + log.debug("remove_writer(%s)", fd) + try: + self.poller.unregister(fd) + self.objmap.pop(fd, None) + except OSError as e: + # StreamWriter.awrite() first tries to write to an fd, + # and if that succeeds, yield IOWrite may never be called + # for that fd, and it will never be added to poller. So, + # ignore such error. + if e.args[0] != errno.ENOENT: + raise + + def wait(self, delay): + if __debug__: + log.debug("epoll.wait(%d)", delay) + # We need one-shot behavior (second arg of 1 to .poll()) + if delay == -1: + res = self.poller.poll(-1, 1) + else: + res = self.poller.poll(int(delay * 1000), 1) + #log.debug("epoll result: %s", res) + for fd, ev in res: + cb = self.objmap[fd] + if __debug__: + log.debug("Calling IO callback: %r", cb) + if isinstance(cb, tuple): + cb[0](*cb[1]) + else: + self.call_soon(cb) + + +class StreamReader: + + def __init__(self, s): + self.s = s + + def read(self, n=-1): + yield IORead(self.s) + while True: + res = self.s.read(n) + if res is not None: + break + log.warn("Empty read") + if not res: + yield IOReadDone(self.s) + return res + + def readline(self): + if __debug__: + log.debug("StreamReader.readline()") + yield IORead(self.s) +# if __debug__: +# log.debug("StreamReader.readline(): after IORead: %s", s) + while True: + res = self.s.readline() + if res is not None: + break + log.warn("Empty read") + if not res: + yield IOReadDone(self.s) + if __debug__: + log.debug("StreamReader.readline(): res: %s", res) + return res + + def aclose(self): + yield IOReadDone(self.s) + self.s.close() + + def __repr__(self): + return "" % self.s + + +class StreamWriter: + + def __init__(self, s, extra): + self.s = s + self.extra = extra + + def awrite(self, buf): + # This method is called awrite (async write) to not proliferate + # incompatibility with original asyncio. Unlike original asyncio + # whose .write() method is both not a coroutine and guaranteed + # to return immediately (which means it has to buffer all the + # data), this method is a coroutine. + sz = len(buf) + if __debug__: + log.debug("StreamWriter.awrite(): spooling %d bytes", sz) + while True: + res = self.s.write(buf) + # If we spooled everything, return immediately + if res == sz: + if __debug__: + log.debug("StreamWriter.awrite(): completed spooling %d bytes", res) + return + if res is None: + res = 0 + if __debug__: + log.debug("StreamWriter.awrite(): spooled partial %d bytes", res) + assert res < sz + buf = buf[res:] + sz -= res + yield IOWrite(self.s) + #assert s2.fileno() == self.s.fileno() + if __debug__: + log.debug("StreamWriter.awrite(): can write more") + + def aclose(self): + yield IOWriteDone(self.s) + self.s.close() + + def get_extra_info(self, name, default=None): + return self.extra.get(name, default) + + def __repr__(self): + return "" % self.s + + +def open_connection(host, port): + if __debug__: + log.debug("open_connection(%s, %s)", host, port) + s = _socket.socket() + s.setblocking(False) + ai = _socket.getaddrinfo(host, port) + addr = ai[0][4] + try: + s.connect(addr) + except OSError as e: + if e.args[0] != errno.EINPROGRESS: + raise + if __debug__: + log.debug("open_connection: After connect") + yield IOWrite(s) +# if __debug__: +# assert s2.fileno() == s.fileno() + if __debug__: + log.debug("open_connection: After iowait: %s", s) + return StreamReader(s), StreamWriter(s, {}) + + +def start_server(client_coro, host, port, backlog=10): + log.debug("start_server(%s, %s)", host, port) + s = _socket.socket() + s.setblocking(False) + + ai = _socket.getaddrinfo(host, port) + addr = ai[0][4] + s.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1) + s.bind(addr) + s.listen(backlog) + while True: + if __debug__: + log.debug("start_server: Before accept") + yield IORead(s) + if __debug__: + log.debug("start_server: After iowait") + s2, client_addr = s.accept() + s2.setblocking(False) + if __debug__: + log.debug("start_server: After accept: %s", s2) + extra = {"peername": client_addr} + yield client_coro(StreamReader(s2), StreamWriter(s2, extra)) + + +import uasyncio.core +uasyncio.core._event_loop_class = EpollEventLoop diff --git a/src/uasyncio/core.py b/src/uasyncio/core.py new file mode 100644 index 00000000..62b5b68f --- /dev/null +++ b/src/uasyncio/core.py @@ -0,0 +1,187 @@ +try: + import utime as time +except ImportError: + import time +import uheapq as heapq +import logging + + +log = logging.getLogger("asyncio") + +type_gen = type((lambda: (yield))()) + +class EventLoop: + + def __init__(self): + self.q = [] + self.cnt = 0 + self.button_cb = None + + def time(self): + return time.time() + + def create_task(self, coro): + # CPython 3.4.2 + self.call_at(0, coro) + # CPython asyncio incompatibility: we don't return Task object + + def call_soon(self, callback, *args): + self.call_at(0, callback, *args) + + def call_later(self, delay, callback, *args): + self.call_at(self.time() + delay, callback, *args) + + def call_at(self, time, callback, *args): + # Including self.cnt is a workaround per heapq docs + if __debug__: + log.debug("Scheduling %s", (time, self.cnt, callback, args)) + heapq.heappush(self.q, (time, self.cnt, callback, args)) +# print(self.q) + self.cnt += 1 + + def wait(self, delay): + # Default wait implementation, to be overriden in subclasses + # with IO scheduling + log.debug("Sleeping for: %s", delay) + time.sleep(delay) + + def run_forever(self): + while True: + if self.q: + t, cnt, cb, args = heapq.heappop(self.q) + if __debug__: + log.debug("Next coroutine to run: %s", (t, cnt, cb, args)) +# __main__.mem_info() + tnow = self.time() + delay = t - tnow + if delay > 0: + self.wait(delay) + else: + self.wait(-1) + # Assuming IO completion scheduled some tasks + continue + if callable(cb): + cb(*args) + else: + delay = 0 + try: + if args == (): + args = (None,) + if __debug__: + log.debug("Coroutine %s send args: %s", cb, args) + ret = cb.send(*args) + if __debug__: + log.debug("Coroutine %s yield result: %s", cb, ret) + if isinstance(ret, SysCall1): + arg = ret.arg + if isinstance(ret, Sleep): + delay = arg + elif isinstance(ret, IORead): +# self.add_reader(ret.obj.fileno(), lambda self, c, f: self.call_soon(c, f), self, cb, ret.obj) +# self.add_reader(ret.obj.fileno(), lambda c, f: self.call_soon(c, f), cb, ret.obj) +# self.add_reader(arg.fileno(), lambda cb: self.call_soon(cb), cb) + self.add_reader(arg.fileno(), cb) + continue + elif isinstance(ret, IOWrite): +# self.add_writer(arg.fileno(), lambda cb: self.call_soon(cb), cb) + self.add_writer(arg.fileno(), cb) + continue + elif isinstance(ret, IOReadDone): + self.remove_reader(arg.fileno()) + elif isinstance(ret, IOWriteDone): + self.remove_writer(arg.fileno()) + elif isinstance(ret, StopLoop): + return arg + elif isinstance(ret, IOButton): + print("TADY") + self.button_cb = cb + continue + elif isinstance(ret, type_gen): + self.call_soon(ret) + elif ret is None: + # Just reschedule + pass + else: + assert False, "Unsupported coroutine yield value: %r (of type %r)" % (ret, type(ret)) + except StopIteration as e: + if __debug__: + log.debug("Coroutine finished: %s", cb) + continue + self.call_later(delay, cb, *args) + + def run_until_complete(self, coro): + def _run_and_stop(): + yield from coro + yield StopLoop(0) + self.call_soon(_run_and_stop()) + self.run_forever() + + def close(self): + pass + + +class SysCall: + + def __init__(self, *args): + self.args = args + + def handle(self): + raise NotImplementedError + +# Optimized syscall with 1 arg +class SysCall1(SysCall): + + def __init__(self, arg): + self.arg = arg + +class IOButton(SysCall): + pass + +class Sleep(SysCall1): + pass + +class StopLoop(SysCall1): + pass + +class IORead(SysCall1): + pass + +class IOWrite(SysCall1): + pass + +class IOReadDone(SysCall1): + pass + +class IOWriteDone(SysCall1): + pass + + +_event_loop = None +_event_loop_class = EventLoop +def get_event_loop(): + global _event_loop + if _event_loop is None: + _event_loop = _event_loop_class() + return _event_loop + +def sleep(secs): + yield Sleep(secs) + +def coroutine(f): + return f + +# +# The functions below are deprecated in uasyncio, and provided only +# for compatibility with CPython asyncio +# + +def async(coro, loop=_event_loop): + _event_loop.call_soon(coro) + # CPython asyncio incompatibility: we don't return Task object + return coro + + +# CPython asyncio incompatibility: Task is a function, not a class (for efficiency) +def Task(coro, loop=_event_loop): + # Same as async() + _event_loop.call_soon(coro) diff --git a/src/uasyncio/queues.py b/src/uasyncio/queues.py new file mode 100644 index 00000000..4a8ae5fe --- /dev/null +++ b/src/uasyncio/queues.py @@ -0,0 +1,94 @@ +from collections.deque import deque +from uasyncio.core import sleep + + +class QueueEmpty(Exception): + """Exception raised by get_nowait().""" + + +class QueueFull(Exception): + """Exception raised by put_nowait().""" + + +class Queue: + """A queue, useful for coordinating producer and consumer coroutines. + + If maxsize is less than or equal to zero, the queue size is infinite. If it + is an integer greater than 0, then "yield from put()" will block when the + queue reaches maxsize, until an item is removed by get(). + + Unlike the standard library Queue, you can reliably know this Queue's size + with qsize(), since your single-threaded uasyncio application won't be + interrupted between calling qsize() and doing an operation on the Queue. + """ + _attempt_delay = 0.1 + + def __init__(self, maxsize=0): + self.maxsize = maxsize + self._queue = deque() + + def _get(self): + return self._queue.popleft() + + def get(self): + """Returns generator, which can be used for getting (and removing) + an item from a queue. + + Usage:: + + item = yield from queue.get() + """ + while not self._queue: + yield from sleep(self._attempt_delay) + return self._get() + + def get_nowait(self): + """Remove and return an item from the queue. + + Return an item if one is immediately available, else raise QueueEmpty. + """ + if not self._queue: + raise QueueEmpty() + return self._get() + + def _put(self, val): + self._queue.append(val) + + def put(self, val): + """Returns generator which can be used for putting item in a queue. + + Usage:: + + yield from queue.put(item) + """ + while self.qsize() > self.maxsize and self.maxsize: + yield from sleep(self._attempt_delay) + self._put(val) + + def put_nowait(self, val): + """Put an item into the queue without blocking. + + If no free slot is immediately available, raise QueueFull. + """ + if self.qsize() >= self.maxsize and self.maxsize: + raise QueueFull() + self._put(val) + + def qsize(self): + """Number of items in the queue.""" + return len(self._queue) + + def empty(self): + """Return True if the queue is empty, False otherwise.""" + return not self._queue + + def full(self): + """Return True if there are maxsize items in the queue. + + Note: if the Queue was initialized with maxsize=0 (the default), + then full() is never True. + """ + if self.maxsize <= 0: + return False + else: + return self.qsize() >= self.maxsize