moving to asyncio
This commit is contained in:
Родитель
320f2f0e2b
Коммит
d9e6ff00f9
|
@ -1,5 +1,4 @@
|
|||
{
|
||||
"python.analysis.completeFunctionParens": true,
|
||||
"python.analysis.diagnosticMode": "workspace",
|
||||
"python.analysis.typeCheckingMode": "basic"
|
||||
}
|
|
@ -1,15 +1,16 @@
|
|||
import threading
|
||||
import random
|
||||
import asyncio
|
||||
|
||||
from typing import Optional, TypeVar, Union, cast
|
||||
|
||||
from .jdconstants import *
|
||||
from .events import *
|
||||
from .packet import *
|
||||
from .transport import Transport
|
||||
from .taskq import TaskQ
|
||||
|
||||
import jacdac.util as util
|
||||
from .util import now
|
||||
from .util import now, log
|
||||
|
||||
|
||||
EV_CHANGE = "change"
|
||||
|
@ -51,13 +52,20 @@ class Bus(EventEmitter):
|
|||
self.unattached_clients: list['Client'] = []
|
||||
self.all_clients: list['Client'] = []
|
||||
self.servers: list['Server'] = []
|
||||
self.taskq = TaskQ()
|
||||
if devid is None:
|
||||
devid = random.randbytes(8).hex()
|
||||
self.self_device = Device(self, devid, bytearray(4))
|
||||
self.process_thread = threading.Thread(target=self._process_task)
|
||||
self.transport = transport
|
||||
|
||||
# self.taskq.recurring(2000, self.debug_dump)
|
||||
|
||||
self.process_thread.start()
|
||||
|
||||
def _process_task(self):
|
||||
self.loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
|
||||
loop = self.loop
|
||||
|
||||
from . import ctrl
|
||||
ctrls = ctrl.CtrlServer(self) # attach control server
|
||||
|
||||
|
@ -65,21 +73,29 @@ class Bus(EventEmitter):
|
|||
self.emit(EV_SELF_ANNOUNCE)
|
||||
self._gc_devices()
|
||||
ctrls.queue_announce()
|
||||
self.taskq.recurring(500, announce)
|
||||
|
||||
# self.taskq.recurring(2000, self.debug_dump)
|
||||
|
||||
self.process_thread.start()
|
||||
loop.call_later(0.500, announce)
|
||||
loop.call_later(0.500, announce)
|
||||
|
||||
from . import sample
|
||||
sample.acc_sample(self)
|
||||
|
||||
def _process_task(self):
|
||||
while True:
|
||||
self.taskq.execute()
|
||||
pkt = self.transport.receive(timeout_ms=self.taskq.sleeptime())
|
||||
if pkt:
|
||||
self.process_packet(JDPacket(frombytes=pkt))
|
||||
def process_bytes(pkt: bytes):
|
||||
self.process_packet(JDPacket(frombytes=pkt))
|
||||
def process_later(pkt: bytes):
|
||||
loop.call_soon_threadsafe(process_bytes, pkt)
|
||||
self.transport.on_receive = process_later
|
||||
|
||||
try:
|
||||
loop.run_forever()
|
||||
finally:
|
||||
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||
loop.close()
|
||||
|
||||
def force_jd_thread(self):
|
||||
assert threading.current_thread() is self.process_thread
|
||||
|
||||
def force_non_jd_thread(self):
|
||||
assert threading.current_thread() is not self.process_thread
|
||||
|
||||
def debug_dump(self):
|
||||
print("Devices:")
|
||||
|
@ -271,15 +287,15 @@ class RawRegisterClient(EventEmitter):
|
|||
def second_refresh():
|
||||
if prev_data is self._data:
|
||||
self._query()
|
||||
self.bus.taskq.delay(100, final_check)
|
||||
self.bus.loop.call_later(0.100, final_check)
|
||||
|
||||
def first_refresh():
|
||||
if prev_data is self._data:
|
||||
self._query()
|
||||
self.bus.taskq.delay(50, second_refresh)
|
||||
self.bus.loop.call_later(0.050, second_refresh)
|
||||
|
||||
self._query()
|
||||
self.bus.taskq.delay(20, first_refresh)
|
||||
self.bus.loop.call_later(0.020, first_refresh)
|
||||
|
||||
# can't be called from event handlers!
|
||||
def query(self, refresh_ms: int = 500):
|
||||
|
@ -348,8 +364,8 @@ class Server(EventEmitter):
|
|||
pkt = JDPacket(cmd=self.bus.mk_event_cmd(event_code), data=data)
|
||||
def resend(): self.send_report(pkt)
|
||||
resend()
|
||||
self.bus.taskq.delay(20, resend)
|
||||
self.bus.taskq.delay(100, resend)
|
||||
self.bus.loop.call_later(0.020, resend)
|
||||
self.bus.loop.call_later(0.100, resend)
|
||||
|
||||
def send_change_event(self):
|
||||
self.send_event(_JD_EV_CHANGE)
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import threading
|
||||
from typing import Callable, TYPE_CHECKING
|
||||
from .util import now, log
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .bus import Bus
|
||||
|
@ -13,24 +12,18 @@ class EventEmitter:
|
|||
self.bus = bus
|
||||
|
||||
def emit(self, id: str, *args: object):
|
||||
self.bus.force_jd_thread()
|
||||
if not hasattr(self, "_listeners"):
|
||||
return
|
||||
fns: list[HandlerFn] = []
|
||||
idx = 0
|
||||
while idx < len(self._listeners):
|
||||
lid, fn, once = self._listeners[idx]
|
||||
if lid == id:
|
||||
fns.append(fn)
|
||||
self.bus.loop.call_soon(fn, *args)
|
||||
if once:
|
||||
del self._listeners[idx]
|
||||
idx -= 1
|
||||
idx += 1
|
||||
for fn in fns:
|
||||
t0 = now()
|
||||
fn(*args)
|
||||
d = now() - t0
|
||||
if d > 100:
|
||||
log("long running handler for '{}'; {}ms", id, d)
|
||||
|
||||
def _init_emitter(self):
|
||||
if not hasattr(self, "_listeners"):
|
||||
|
@ -53,8 +46,14 @@ class EventEmitter:
|
|||
return
|
||||
raise ValueError("no matching on() for off()")
|
||||
|
||||
# usage: await x.event("...")
|
||||
async def event(self, id: str):
|
||||
f = self.bus.loop.create_future()
|
||||
self.once(id, lambda: f.set_result(None))
|
||||
await f
|
||||
|
||||
def wait_for(self, id: str):
|
||||
assert threading.current_thread() is not self.bus.process_thread
|
||||
self.bus.force_non_jd_thread()
|
||||
cv = threading.Condition()
|
||||
happened = False
|
||||
|
||||
|
|
|
@ -1,54 +0,0 @@
|
|||
import functools
|
||||
import heapq
|
||||
from typing import Callable
|
||||
from .util import now
|
||||
|
||||
class TaskQ:
|
||||
def __init__(self) -> None:
|
||||
self.tasks: list['ProcessTask'] = []
|
||||
|
||||
def _toptask(self):
|
||||
if len(self.tasks):
|
||||
return self.tasks[0]
|
||||
return None
|
||||
|
||||
def execute(self):
|
||||
while True:
|
||||
n = now()
|
||||
t = self._toptask()
|
||||
if not t or t.when > n:
|
||||
break
|
||||
t2 = heapq.heappop(self.tasks)
|
||||
assert t is t2
|
||||
if t.repeat:
|
||||
t.when += t.repeat
|
||||
heapq.heappush(self.tasks, t)
|
||||
t.fn()
|
||||
|
||||
def sleeptime(self):
|
||||
d = 100
|
||||
t = self._toptask()
|
||||
if t:
|
||||
d = t.when - now()
|
||||
if d < 0:
|
||||
d = 0
|
||||
return d
|
||||
|
||||
def delay(self, delta_ms: int, fn: Callable[[], None]):
|
||||
heapq.heappush(self.tasks, ProcessTask(now() + delta_ms, fn))
|
||||
|
||||
def recurring(self, delta_ms: int, fn: Callable[[], None]):
|
||||
t = ProcessTask(now() + delta_ms, fn)
|
||||
t.repeat = delta_ms
|
||||
heapq.heappush(self.tasks, t)
|
||||
|
||||
|
||||
@functools.total_ordering
|
||||
class ProcessTask:
|
||||
def __init__(self, when: int, fn: Callable[[], None]) -> None:
|
||||
self.when = when
|
||||
self.fn = fn
|
||||
self.repeat = 0
|
||||
|
||||
def __lt__(self, other: 'ProcessTask'):
|
||||
return self.when < other.when
|
|
@ -1,9 +1,10 @@
|
|||
from typing import Callable
|
||||
|
||||
|
||||
class Transport:
|
||||
# A base class for packet transports
|
||||
|
||||
def receive(self, timeout_ms: int) -> bytes:
|
||||
# returns the next packet from the packet queue, None is empty
|
||||
raise NotImplementedError
|
||||
on_receive: Callable[[bytes], None]
|
||||
|
||||
def send(self, pkt: bytes) -> None:
|
||||
# send a packet payload over the transport layer
|
||||
|
|
|
@ -1,38 +1,32 @@
|
|||
from typing import List
|
||||
from websocket import WebSocketApp
|
||||
from transport import Transport
|
||||
|
||||
|
||||
class WebSocketTransport(Transport):
|
||||
# A websocket-based transport
|
||||
#
|
||||
|
||||
url: str
|
||||
ws: WebSocketApp
|
||||
pkts: List[bytes]
|
||||
|
||||
def __init__(self, url: str):
|
||||
self.url = url
|
||||
self.open()
|
||||
self.pkts = []
|
||||
|
||||
def open(self) -> None:
|
||||
ws = WebSocketApp(self.url,
|
||||
self.ws = WebSocketApp(self.url,
|
||||
on_open=self.on_open,
|
||||
on_message=self.on_message,
|
||||
on_error=self.on_error,
|
||||
on_close=self.on_close)
|
||||
ws.run_forever()
|
||||
|
||||
def receive(self, timeout_ms: int) -> bytes:
|
||||
# TODO: concurrency
|
||||
return self.pkts.pop(0)
|
||||
self.ws.run_forever() # type: ignore
|
||||
|
||||
def send(self, pkt: bytes) -> None:
|
||||
self.ws.send(pkt)
|
||||
self.ws.send(pkt) # type: ignore
|
||||
|
||||
def on_message(self, ws: WebSocketApp, message: bytes):
|
||||
# TODO: concurrency
|
||||
self.pkts.append(message)
|
||||
if self.on_receive:
|
||||
self.on_receive(message)
|
||||
|
||||
def on_error(self, ws: WebSocketApp, error: str):
|
||||
print(error)
|
||||
|
|
|
@ -4,9 +4,11 @@ build-backend = "setuptools.build_meta"
|
|||
|
||||
[tool.pyright]
|
||||
include = ["jacdac"]
|
||||
pythonVersion = 3.9
|
||||
pythonVersion = 3.7
|
||||
typeCheckingMode = "basic"
|
||||
|
||||
useLibraryCodeForTypes = true
|
||||
|
||||
strictListInference = true
|
||||
strictDictionaryInference = true
|
||||
strictSetInference = true
|
||||
|
|
Загрузка…
Ссылка в новой задаче