send() can now block
This commit is contained in:
Michal Moskal 2021-11-15 14:54:30 -08:00
Родитель d9e6ff00f9
Коммит 6517996f5a
3 изменённых файлов: 26 добавлений и 12 удалений

Просмотреть файл

@ -1,6 +1,7 @@
import threading
import random
import asyncio
import queue
from typing import Optional, TypeVar, Union, cast
@ -57,11 +58,20 @@ class Bus(EventEmitter):
self.self_device = Device(self, devid, bytearray(4))
self.process_thread = threading.Thread(target=self._process_task)
self.transport = transport
self._sendq: queue.Queue[bytes] = queue.Queue()
self.sender_thread = threading.Thread(target=self._sender)
self.sender_thread.start()
# self.taskq.recurring(2000, self.debug_dump)
self.process_thread.start()
def _sender(self):
while True:
pkt = self._sendq.get()
self.transport.send(pkt)
def _process_task(self):
self.loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
loop = self.loop

Просмотреть файл

@ -1,10 +1,11 @@
import serial
import sys
import threading
import queue
import random
import struct
from ..transport import Transport
HF2_CMD_INFO = 0x0002
HF2_CMD_DMESG = 0x0010
@ -31,8 +32,8 @@ HF2_EV_JDS_PACKET = 0x800020
class HF2Error(Exception):
pass
class HF2Transport:
def _write(self, buf: bytearray):
class HF2Transport(Transport):
def _write(self, buf: bytes):
frame = bytearray(64)
pos = 0
while True:
@ -46,14 +47,16 @@ class HF2Transport:
frame[0] = HF2_FLAG_CMDPKT_LAST
frame[0] |= l
frame[1:1+l] = buf[pos:pos+l]
self.serial.write(frame)
self.serial.write(frame) # type: ignore
pos += l
def _on_serial(self, buf: bytes, is_error: bool):
self.log("serial: %s" % buf.decode("utf-8"))
def _on_jd_pkt(self, buf: bytes):
self.log("jd: " + buf.hex())
# self.log("jd: " + buf.hex())
if self.on_receive:
self.on_receive(buf)
def _on_event(self, buf: bytes):
(evid,) = struct.unpack("<I", buf[0:4])
@ -63,9 +66,9 @@ class HF2Transport:
self.log("unknown event: 0x%x" % evid)
def _read_loop(self):
frames = []
frames: list[bytes] = []
while True:
buf: bytes = self.serial.read(64)
buf: bytes = self.serial.read(64) # type: ignore
tp = buf[0] & HF2_FLAG_MASK
l = buf[0] & 63
frame = buf[1:1+l]
@ -91,7 +94,7 @@ class HF2Transport:
self.log("Error: %s" % msg)
raise HF2Error("HF2: %s" % msg)
def _talk(self, cmd: int, data = b'') -> bytes:
def _talk(self, cmd: int, data: bytes = b'') -> bytes:
with self._talk_lock:
self._cmd_seq = (self._cmd_seq + 1) & 0xffff
seq = self._cmd_seq
@ -116,12 +119,13 @@ class HF2Transport:
self.log("connected to '%s'" % info.decode("utf8"))
self._talk(HF2_CMD_JDS_CONFIG, struct.pack("<I", 1))
def send_jd_pkt(self, pkt: bytes):
def send(self, pkt: bytes):
self._talk(HF2_CMD_JDS_SEND, pkt)
def __init__(self, portname: str) -> None:
self.serial = serial.Serial(portname, 4_000_000)
self._msgs = queue.Queue()
import serial
self.serial: serial.Serial = serial.Serial(portname, 4_000_000)
self._msgs: queue.Queue[bytes] = queue.Queue()
self._cmd_seq = random.randint(0x1000, 0xffff)
self._talk_lock = threading.Lock()
self._reader_thread = threading.Thread(target=self._read_loop)

Просмотреть файл

@ -1,5 +1,5 @@
from websocket import WebSocketApp
from transport import Transport
from ..transport import Transport
class WebSocketTransport(Transport):