This commit is contained in:
pelikhan 2021-11-18 08:22:11 -08:00
Родитель efa07d8861
Коммит 91151b18f4
51 изменённых файлов: 607 добавлений и 198 удалений

Просмотреть файл

@ -0,0 +1,134 @@
from jacdac.bus import Bus, Client
from .constants import *
from typing import Union
from jacdac.events import HandlerFn
class AccelerometerClient(Client):
"""
A 3-axis accelerometer.
"""
def __init__(self, bus: Bus, role: str) -> None:
super().__init__(bus, JD_SERVICE_CLASS_ACCELEROMETER, JD_ACCELEROMETER_PACK_FORMATS, role)
@property
def x(self) -> Union[float, None]:
"""
Indicates the current forces acting on accelerometer., g
"""
reg = self.register(JD_ACCELEROMETER_REG_FORCES)
return reg.value(0)
@property
def y(self) -> Union[float, None]:
"""
Indicates the current forces acting on accelerometer., g
"""
reg = self.register(JD_ACCELEROMETER_REG_FORCES)
return reg.value(1)
@property
def z(self) -> Union[float, None]:
"""
Indicates the current forces acting on accelerometer., g
"""
reg = self.register(JD_ACCELEROMETER_REG_FORCES)
return reg.value(2)
@property
def forces_error(self) -> Union[float, None]:
"""
(Optional) Error on the reading value., g
"""
reg = self.register(JD_ACCELEROMETER_REG_FORCES_ERROR)
return reg.value(0)
@property
def max_force(self) -> Union[float, None]:
"""
(Optional) Configures the range forces detected.
The value will be "rounded up" to one of `max_forces_supported`., g
"""
reg = self.register(JD_ACCELEROMETER_REG_MAX_FORCE)
return reg.value(0)
@max_force.setter
def max_force(self, value: float) -> None:
reg = self.register(JD_ACCELEROMETER_REG_MAX_FORCE)
reg.set_value(0, value)
def on_tilt_up(self, handler: HandlerFn) -> None:
"""
Emitted when accelerometer is tilted in the given direction.
"""
# TODO
def on_tilt_down(self, handler: HandlerFn) -> None:
"""
Emitted when accelerometer is tilted in the given direction.
"""
# TODO
def on_tilt_left(self, handler: HandlerFn) -> None:
"""
Emitted when accelerometer is tilted in the given direction.
"""
# TODO
def on_tilt_right(self, handler: HandlerFn) -> None:
"""
Emitted when accelerometer is tilted in the given direction.
"""
# TODO
def on_face_up(self, handler: HandlerFn) -> None:
"""
Emitted when accelerometer is laying flat in the given direction.
"""
# TODO
def on_face_down(self, handler: HandlerFn) -> None:
"""
Emitted when accelerometer is laying flat in the given direction.
"""
# TODO
def on_freefall(self, handler: HandlerFn) -> None:
"""
Emitted when total force acting on accelerometer is much less than 1g.
"""
# TODO
def on_shake(self, handler: HandlerFn) -> None:
"""
Emitted when forces change violently a few times.
"""
# TODO
def on_force_2g(self, handler: HandlerFn) -> None:
"""
Emitted when force in any direction exceeds given threshold.
"""
# TODO
def on_force_3g(self, handler: HandlerFn) -> None:
"""
Emitted when force in any direction exceeds given threshold.
"""
# TODO
def on_force_6g(self, handler: HandlerFn) -> None:
"""
Emitted when force in any direction exceeds given threshold.
"""
# TODO
def on_force_8g(self, handler: HandlerFn) -> None:
"""
Emitted when force in any direction exceeds given threshold.
"""
# TODO

Просмотреть файл

@ -30,7 +30,7 @@ class ArcadeSoundClient(Client):
@property
def buffer_size(self) -> Union[float, None]:
def buffer_size(self) -> Union[int, None]:
"""
The size of the internal audio buffer., B
"""
@ -38,7 +38,7 @@ class ArcadeSoundClient(Client):
return reg.value(0)
@property
def buffer_pending(self) -> Union[float, None]:
def buffer_pending(self) -> Union[int, None]:
"""
How much data is still left in the buffer to play.
Clients should not send more data than `buffer_size - buffer_pending`,
@ -53,5 +53,5 @@ class ArcadeSoundClient(Client):
"""
Play samples, which are single channel, signed 16-bit little endian values.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_arcade_Sound_CMD_play, "b", [samples]))
self.send_cmd_packed(JD_ARCADE_SOUND_CMD_PLAY, [samples])

Просмотреть файл

@ -53,11 +53,11 @@ class AzureIotHubHealthClient(Client):
"""
Starts a connection to the IoT hub service
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_azure_Iot_Hub_Health_CMD_connect))
self.send_cmd_packed(JD_AZURE_IOT_HUB_HEALTH_CMD_CONNECT, [])
def disconnect(self, ) -> None:
"""
Starts disconnecting from the IoT hub service
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_azure_Iot_Hub_Health_CMD_disconnect))
self.send_cmd_packed(JD_AZURE_IOT_HUB_HEALTH_CMD_DISCONNECT, [])

Просмотреть файл

@ -0,0 +1,31 @@
from jacdac.bus import Bus, Client
from .constants import *
from typing import Union
class BarometerClient(Client):
"""
A sensor measuring air pressure of outside environment.
"""
def __init__(self, bus: Bus, role: str) -> None:
super().__init__(bus, JD_SERVICE_CLASS_BAROMETER, JD_BAROMETER_PACK_FORMATS, role)
@property
def pressure(self) -> Union[float, None]:
"""
The air pressure., hPa
"""
reg = self.register(JD_BAROMETER_REG_PRESSURE)
return reg.value(0)
@property
def pressure_error(self) -> Union[float, None]:
"""
The real pressure is between `pressure - pressure_error` and `pressure + pressure_error`., hPa
"""
reg = self.register(JD_BAROMETER_REG_PRESSURE_ERROR)
return reg.value(0)

Просмотреть файл

@ -27,7 +27,7 @@ class BitRadioClient(Client):
@property
def group(self) -> Union[float, None]:
def group(self) -> Union[int, None]:
"""
Group used to filter packets
"""
@ -35,13 +35,13 @@ class BitRadioClient(Client):
return reg.value(0)
@group.setter
def group(self, value: float) -> None:
def group(self, value: int) -> None:
reg = self.register(JD_BIT_RADIO_REG_GROUP)
reg.set_value(0, value)
@property
def transmission_power(self) -> Union[float, None]:
def transmission_power(self) -> Union[int, None]:
"""
Antenna power to increase or decrease range.
"""
@ -49,13 +49,13 @@ class BitRadioClient(Client):
return reg.value(0)
@transmission_power.setter
def transmission_power(self, value: float) -> None:
def transmission_power(self, value: int) -> None:
reg = self.register(JD_BIT_RADIO_REG_TRANSMISSION_POWER)
reg.set_value(0, value)
@property
def frequency_band(self) -> Union[float, None]:
def frequency_band(self) -> Union[int, None]:
"""
Change the transmission and reception band of the radio to the given channel.
"""
@ -63,7 +63,7 @@ class BitRadioClient(Client):
return reg.value(0)
@frequency_band.setter
def frequency_band(self, value: float) -> None:
def frequency_band(self, value: int) -> None:
reg = self.register(JD_BIT_RADIO_REG_FREQUENCY_BAND)
reg.set_value(0, value)
@ -73,23 +73,23 @@ class BitRadioClient(Client):
"""
Sends a string payload as a radio message, maximum 18 characters.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_bit_Radio_CMD_send_string, "s", [message]))
self.send_cmd_packed(JD_BIT_RADIO_CMD_SEND_STRING, [message])
def send_number(self, value: float) -> None:
"""
Sends a double precision number payload as a radio message
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_bit_Radio_CMD_send_number, "f64", [value]))
self.send_cmd_packed(JD_BIT_RADIO_CMD_SEND_NUMBER, [value])
def send_value(self, value: float, name: str) -> None:
"""
Sends a double precision number and a name payload as a radio message
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_bit_Radio_CMD_send_value, "f64 s", [value, name]))
self.send_cmd_packed(JD_BIT_RADIO_CMD_SEND_VALUE, [value, name])
def send_buffer(self, data: bytes) -> None:
"""
Sends a payload of bytes as a radio message
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_bit_Radio_CMD_send_buffer, "b", [data]))
self.send_cmd_packed(JD_BIT_RADIO_CMD_SEND_BUFFER, [data])

Просмотреть файл

@ -41,7 +41,7 @@ class BrailleDisplayClient(Client):
@property
def length(self) -> Union[float, None]:
def length(self) -> Union[int, None]:
"""
Gets the number of patterns that can be displayed., #
"""

Просмотреть файл

@ -15,6 +15,7 @@ from .transport import Transport
import jacdac.util as util
from .util import now, log, logv, unpack
from .control.constants import *
from pack import PackType, jdpack
EV_CHANGE = "change"
@ -564,8 +565,16 @@ class Client(EventEmitter):
pkt._header[3] |= JD_FRAME_FLAG_COMMAND
self.bus._send_core(pkt)
def on_attach(self):
pass
def send_cmd_packed(self, cmd: int, args: PackType = None):
if args is None:
pkt = JDPacket(cmd=cmd)
else:
fmt = self.pack_formats[cmd]
if fmt is None:
raise RuntimeError("unknown data format")
data = jdpack(fmt, args)
pkt = JDPacket(cmd=cmd, data=data)
self.send_cmd(pkt)
def _attach(self, dev: 'Device', service_idx: int):
assert self.device is None

Просмотреть файл

@ -27,16 +27,16 @@ class BuzzerClient(Client):
def play_tone(self, period: float, duty: float, duration: float) -> None:
def play_tone(self, period: int, duty: int, duration: int) -> None:
"""
Play a PWM tone with given period and duty for given duration.
The duty is scaled down with `volume` register.
To play tone at frequency `F` Hz and volume `V` (in `0..1`) you will want
to send `P = 1000000 / F` and `D = P * V / 2`.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_buzzer_CMD_play_tone, "u16 u16 u16", [period, duty, duration]))
self.send_cmd_packed(JD_BUZZER_CMD_PLAY_TONE, [period, duty, duration])
def play_note(self, frequency: float, volume: float, duration: float) -> None:
def play_note(self, frequency: int, volume: float, duration: int) -> None:
"""
Play a note at the given frequency and volume.
"""

Просмотреть файл

@ -32,5 +32,5 @@ class CapacitiveButtonClient(Client):
Request to calibrate the capactive. When calibration is requested, the device expects that no object is touching the button.
The report indicates the calibration is done.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_capacitive_Button_CMD_calibrate))
self.send_cmd_packed(JD_CAPACITIVE_BUTTON_CMD_CALIBRATE, [])

Просмотреть файл

@ -63,7 +63,7 @@ class CharacterScreenClient(Client):
@property
def rows(self) -> Union[float, None]:
def rows(self) -> Union[int, None]:
"""
Gets the number of rows., #
"""
@ -71,7 +71,7 @@ class CharacterScreenClient(Client):
return reg.value(0)
@property
def columns(self) -> Union[float, None]:
def columns(self) -> Union[int, None]:
"""
Gets the number of columns., #
"""
@ -79,15 +79,15 @@ class CharacterScreenClient(Client):
return reg.value(0)
def set_line(self, index: float, message: str) -> None:
def set_line(self, index: int, message: str) -> None:
"""
Overrides the content of a single line at a 0-based index.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_character_Screen_CMD_set_line, "u16 s", [index, message]))
self.send_cmd_packed(JD_CHARACTER_SCREEN_CMD_SET_LINE, [index, message])
def clear(self, ) -> None:
"""
Clears all text from the display.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_character_Screen_CMD_clear))
self.send_cmd_packed(JD_CHARACTER_SCREEN_CMD_CLEAR, [])

Просмотреть файл

@ -22,9 +22,9 @@ class CodalMessageBusClient(Client):
# TODO
def send(self, source: float, value: float) -> None:
def send(self, source: int, value: int) -> None:
"""
Send a message on the CODAL bus. If `source` is `0`, it is treated as wildcard.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_codal_Message_Bus_CMD_send, "u16 u16", [source, value]))
self.send_cmd_packed(JD_CODAL_MESSAGE_BUS_CMD_SEND, [source, value])

Просмотреть файл

@ -47,5 +47,5 @@ class CompassClient(Client):
"""
Starts a calibration sequence for the compass.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_compass_CMD_calibrate))
self.send_cmd_packed(JD_COMPASS_CMD_CALIBRATE, [])

Просмотреть файл

@ -31,5 +31,5 @@ class DmxClient(Client):
"""
Send a DMX packet, up to 236bytes long, including the start code.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_dmx_CMD_send, "b", [channels]))
self.send_cmd_packed(JD_DMX_CMD_SEND, [channels])

Просмотреть файл

@ -42,7 +42,7 @@ class DotMatrixClient(Client):
@property
def rows(self) -> Union[float, None]:
def rows(self) -> Union[int, None]:
"""
Number of rows on the screen, #
"""
@ -50,7 +50,7 @@ class DotMatrixClient(Client):
return reg.value(0)
@property
def columns(self) -> Union[float, None]:
def columns(self) -> Union[int, None]:
"""
Number of columns on the screen, #
"""

Просмотреть файл

@ -45,7 +45,7 @@ class ECO2Client(Client):
return reg.value(0)
@property
def conditioning_period(self) -> Union[float, None]:
def conditioning_period(self) -> Union[int, None]:
"""
(Optional) Time required to achieve good sensor stability before measuring after long idle period., s
"""

Просмотреть файл

@ -13,7 +13,7 @@ class HidAdapterClient(Client):
@property
def num_configurations(self) -> Union[float, None]:
def num_configurations(self) -> Union[int, None]:
"""
The number of configurations stored on the server.
"""
@ -21,13 +21,13 @@ class HidAdapterClient(Client):
return reg.value(0)
@num_configurations.setter
def num_configurations(self, value: float) -> None:
def num_configurations(self, value: int) -> None:
reg = self.register(JD_HID_ADAPTER_REG_NUM_CONFIGURATIONS)
reg.set_value(0, value)
@property
def current_configuration(self) -> Union[float, None]:
def current_configuration(self) -> Union[int, None]:
"""
The current configuration the server is using.
"""
@ -35,7 +35,7 @@ class HidAdapterClient(Client):
return reg.value(0)
@current_configuration.setter
def current_configuration(self, value: float) -> None:
def current_configuration(self, value: int) -> None:
reg = self.register(JD_HID_ADAPTER_REG_CURRENT_CONFIGURATION)
reg.set_value(0, value)
@ -47,27 +47,27 @@ class HidAdapterClient(Client):
# TODO
def set_binding(self, configuration_number: float, binding_index: float, padding: float, device_id: float, service_class: float, trigger_value: float, trigger_context: float, service_index: float, selector: float, modifiers: undefined) -> None:
def set_binding(self, configuration_number: int, binding_index: int, padding: int, device_id: int, service_class: int, trigger_value: int, trigger_context: int, service_index: int, selector: int, modifiers: undefined) -> None:
"""
Stores the given binding on the server. If a binding exists at this index, the new binding will replace it.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_hid_Adapter_CMD_set_binding, "u8 u8 b[2] u64 u32 u32 u8 u8 u16 u16", [configuration_number, binding_index, padding, device_id, service_class, trigger_value, trigger_context, service_index, selector, modifiers]))
self.send_cmd_packed(JD_HID_ADAPTER_CMD_SET_BINDING, [configuration_number, binding_index, padding, device_id, service_class, trigger_value, trigger_context, service_index, selector, modifiers])
def clear_binding(self, configuration_number: float, binding_index: float) -> None:
def clear_binding(self, configuration_number: int, binding_index: int) -> None:
"""
Clears a specific binding stored on the device.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_hid_Adapter_CMD_clear_binding, "u8 u8", [configuration_number, binding_index]))
self.send_cmd_packed(JD_HID_ADAPTER_CMD_CLEAR_BINDING, [configuration_number, binding_index])
def clear_configuration(self, configuration_number: float) -> None:
def clear_configuration(self, configuration_number: int) -> None:
"""
Clears a specific configuration stored on the device.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_hid_Adapter_CMD_clear_configuration, "u8", [configuration_number]))
self.send_cmd_packed(JD_HID_ADAPTER_CMD_CLEAR_CONFIGURATION, [configuration_number])
def clear(self, ) -> None:
"""
Clears all configurations and bindings stored on the device.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_hid_Adapter_CMD_clear))
self.send_cmd_packed(JD_HID_ADAPTER_CMD_CLEAR, [])

Просмотреть файл

@ -23,5 +23,5 @@ class HidKeyboardClient(Client):
"""
Clears all pressed keys.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_hid_Keyboard_CMD_clear))
self.send_cmd_packed(JD_HID_KEYBOARD_CMD_CLEAR, [])

Просмотреть файл

@ -19,19 +19,19 @@ class HidMouseClient(Client):
A ``Click`` is the same as ``Down`` followed by ``Up`` after 100ms.
A ``DoubleClick`` is two clicks with ``150ms`` gap between them (that is, ``100ms`` first click, ``150ms`` gap, ``100ms`` second click).
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_hid_Mouse_CMD_set_button, "u16 u8", [buttons, event]))
self.send_cmd_packed(JD_HID_MOUSE_CMD_SET_BUTTON, [buttons, event])
def move(self, dx: float, dy: float, time: float) -> None:
def move(self, dx: int, dy: int, time: int) -> None:
"""
Moves the mouse by the distance specified.
If the time is positive, it specifies how long to make the move.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_hid_Mouse_CMD_move, "i16 i16 u16", [dx, dy, time]))
self.send_cmd_packed(JD_HID_MOUSE_CMD_MOVE, [dx, dy, time])
def wheel(self, dy: float, time: float) -> None:
def wheel(self, dy: int, time: int) -> None:
"""
Turns the wheel up or down. Positive if scrolling up.
If the time is positive, it specifies how long to make the move.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_hid_Mouse_CMD_wheel, "i16 u16", [dy, time]))
self.send_cmd_packed(JD_HID_MOUSE_CMD_WHEEL, [dy, time])

Просмотреть файл

@ -30,7 +30,7 @@ class IndexedScreenClient(Client):
@property
def bits_per_pixel(self) -> Union[float, None]:
def bits_per_pixel(self) -> Union[int, None]:
"""
Determines the number of palette entries.
Typical values are 1, 2, 4, or 8., bit
@ -39,7 +39,7 @@ class IndexedScreenClient(Client):
return reg.value(0)
@property
def width(self) -> Union[float, None]:
def width(self) -> Union[int, None]:
"""
Screen width in "natural" orientation., px
"""
@ -47,7 +47,7 @@ class IndexedScreenClient(Client):
return reg.value(0)
@property
def height(self) -> Union[float, None]:
def height(self) -> Union[int, None]:
"""
Screen height in "natural" orientation., px
"""
@ -74,7 +74,7 @@ class IndexedScreenClient(Client):
@property
def up_sampling(self) -> Union[float, None]:
def up_sampling(self) -> Union[int, None]:
"""
Every pixel sent over wire is represented by `up_sampling x up_sampling` square of physical pixels.
Some displays may allow changing this (which will also result in changes to `width` and `height`).
@ -84,13 +84,13 @@ class IndexedScreenClient(Client):
return reg.value(0)
@up_sampling.setter
def up_sampling(self, value: float) -> None:
def up_sampling(self, value: int) -> None:
reg = self.register(JD_INDEXED_SCREEN_REG_UP_SAMPLING)
reg.set_value(0, value)
@property
def rotation(self) -> Union[float, None]:
def rotation(self) -> Union[int, None]:
"""
Possible values are 0, 90, 180 and 270 only.
Write to this register do not affect `width` and `height` registers,
@ -100,22 +100,22 @@ class IndexedScreenClient(Client):
return reg.value(0)
@rotation.setter
def rotation(self, value: float) -> None:
def rotation(self, value: int) -> None:
reg = self.register(JD_INDEXED_SCREEN_REG_ROTATION)
reg.set_value(0, value)
def start_update(self, x: float, y: float, width: float, height: float) -> None:
def start_update(self, x: int, y: int, width: int, height: int) -> None:
"""
Sets the update window for subsequent `set_pixels` commands.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_indexed_Screen_CMD_start_update, "u16 u16 u16 u16", [x, y, width, height]))
self.send_cmd_packed(JD_INDEXED_SCREEN_CMD_START_UPDATE, [x, y, width, height])
def set_pixels(self, pixels: bytes) -> None:
"""
Set pixels in current window, according to current palette.
Each "line" of data is aligned to a byte.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_indexed_Screen_CMD_set_pixels, "b", [pixels]))
self.send_cmd_packed(JD_INDEXED_SCREEN_CMD_SET_PIXELS, [pixels])

Просмотреть файл

@ -13,7 +13,7 @@ class LedClient(Client):
@property
def color_Red(self) -> Union[float, None]:
def color_Red(self) -> Union[int, None]:
"""
The current color of the LED.
"""
@ -21,7 +21,7 @@ class LedClient(Client):
return reg.value(0)
@property
def color_Green(self) -> Union[float, None]:
def color_Green(self) -> Union[int, None]:
"""
The current color of the LED.
"""
@ -29,7 +29,7 @@ class LedClient(Client):
return reg.value(1)
@property
def color_Blue(self) -> Union[float, None]:
def color_Blue(self) -> Union[int, None]:
"""
The current color of the LED.
"""
@ -37,7 +37,7 @@ class LedClient(Client):
return reg.value(2)
@property
def max_power(self) -> Union[float, None]:
def max_power(self) -> Union[int, None]:
"""
(Optional) Limit the power drawn by the light-strip (and controller)., mA
"""
@ -45,13 +45,13 @@ class LedClient(Client):
return reg.value(0)
@max_power.setter
def max_power(self, value: float) -> None:
def max_power(self, value: int) -> None:
reg = self.register(JD_LED_REG_MAX_POWER)
reg.set_value(0, value)
@property
def led_count(self) -> Union[float, None]:
def led_count(self) -> Union[int, None]:
"""
(Optional) If known, specifies the number of LEDs in parallel on this device.
"""
@ -59,7 +59,7 @@ class LedClient(Client):
return reg.value(0)
@property
def wave_length(self) -> Union[float, None]:
def wave_length(self) -> Union[int, None]:
"""
(Optional) If monochrome LED, specifies the wave length of the LED., nm
"""
@ -67,7 +67,7 @@ class LedClient(Client):
return reg.value(0)
@property
def luminous_intensity(self) -> Union[float, None]:
def luminous_intensity(self) -> Union[int, None]:
"""
(Optional) The luminous intensity of the LED, at full value, in micro candella., mcd
"""
@ -83,9 +83,9 @@ class LedClient(Client):
return reg.value(0)
def animate(self, to_red: float, to_green: float, to_blue: float, speed: float) -> None:
def animate(self, to_red: int, to_green: int, to_blue: int, speed: int) -> None:
"""
This has the same semantics as `set_status_light` in the control service.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_led_CMD_animate, "u8 u8 u8 u8", [to_red, to_green, to_blue, speed]))
self.send_cmd_packed(JD_LED_CMD_ANIMATE, [to_red, to_green, to_blue, speed])

Просмотреть файл

@ -54,7 +54,7 @@ class LedPixelClient(Client):
@property
def num_pixels(self) -> Union[float, None]:
def num_pixels(self) -> Union[int, None]:
"""
Specifies the number of pixels in the strip.
Controllers which are sold with lights should default to the correct length
@ -64,13 +64,13 @@ class LedPixelClient(Client):
return reg.value(0)
@num_pixels.setter
def num_pixels(self, value: float) -> None:
def num_pixels(self, value: int) -> None:
reg = self.register(JD_LED_PIXEL_REG_NUM_PIXELS)
reg.set_value(0, value)
@property
def num_columns(self) -> Union[float, None]:
def num_columns(self) -> Union[int, None]:
"""
(Optional) If the LED pixel strip is a matrix, specifies the number of columns. Otherwise, a square shape is assumed. Controllers which are sold with lights should default to the correct length
and could not allow change. Increasing length at runtime leads to ineffective use of memory and may lead to controller reboot., #
@ -79,13 +79,13 @@ class LedPixelClient(Client):
return reg.value(0)
@num_columns.setter
def num_columns(self, value: float) -> None:
def num_columns(self, value: int) -> None:
reg = self.register(JD_LED_PIXEL_REG_NUM_COLUMNS)
reg.set_value(0, value)
@property
def max_power(self) -> Union[float, None]:
def max_power(self) -> Union[int, None]:
"""
Limit the power drawn by the light-strip (and controller)., mA
"""
@ -93,13 +93,13 @@ class LedPixelClient(Client):
return reg.value(0)
@max_power.setter
def max_power(self, value: float) -> None:
def max_power(self, value: int) -> None:
reg = self.register(JD_LED_PIXEL_REG_MAX_POWER)
reg.set_value(0, value)
@property
def max_pixels(self) -> Union[float, None]:
def max_pixels(self) -> Union[int, None]:
"""
The maximum supported number of pixels.
All writes to `num_pixels` are clamped to `max_pixels`., #
@ -108,7 +108,7 @@ class LedPixelClient(Client):
return reg.value(0)
@property
def num_repeats(self) -> Union[float, None]:
def num_repeats(self) -> Union[int, None]:
"""
How many times to repeat the program passed in `run` command.
Should be set before the `run` command.
@ -118,7 +118,7 @@ class LedPixelClient(Client):
return reg.value(0)
@num_repeats.setter
def num_repeats(self, value: float) -> None:
def num_repeats(self, value: int) -> None:
reg = self.register(JD_LED_PIXEL_REG_NUM_REPEATS)
reg.set_value(0, value)
@ -136,5 +136,5 @@ class LedPixelClient(Client):
"""
Run the given light "program". See service description for details.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_led_Pixel_CMD_run, "b", [program]))
self.send_cmd_packed(JD_LED_PIXEL_CMD_RUN, [program])

Просмотреть файл

@ -13,7 +13,7 @@ class MagnetometerClient(Client):
@property
def x(self) -> Union[float, None]:
def x(self) -> Union[int, None]:
"""
Indicates the current magnetic field on magnetometer.
For reference: `1 mgauss` is `100 nT` (and `1 gauss` is `100 000 nT`)., nT
@ -22,7 +22,7 @@ class MagnetometerClient(Client):
return reg.value(0)
@property
def y(self) -> Union[float, None]:
def y(self) -> Union[int, None]:
"""
Indicates the current magnetic field on magnetometer.
For reference: `1 mgauss` is `100 nT` (and `1 gauss` is `100 000 nT`)., nT
@ -31,7 +31,7 @@ class MagnetometerClient(Client):
return reg.value(1)
@property
def z(self) -> Union[float, None]:
def z(self) -> Union[int, None]:
"""
Indicates the current magnetic field on magnetometer.
For reference: `1 mgauss` is `100 nT` (and `1 gauss` is `100 000 nT`)., nT
@ -40,7 +40,7 @@ class MagnetometerClient(Client):
return reg.value(2)
@property
def forces_error(self) -> Union[float, None]:
def forces_error(self) -> Union[int, None]:
"""
(Optional) Error on the readings., nT
"""
@ -53,5 +53,5 @@ class MagnetometerClient(Client):
Forces a calibration sequence where the user/device
might have to rotate to be calibrated.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_magnetometer_CMD_calibrate))
self.send_cmd_packed(JD_MAGNETOMETER_CMD_CALIBRATE, [])

Просмотреть файл

@ -13,7 +13,7 @@ class MatrixKeypadClient(Client):
@property
def rows(self) -> Union[float, None]:
def rows(self) -> Union[int, None]:
"""
Number of rows in the matrix, #
"""
@ -21,7 +21,7 @@ class MatrixKeypadClient(Client):
return reg.value(0)
@property
def columns(self) -> Union[float, None]:
def columns(self) -> Union[int, None]:
"""
Number of columns in the matrix, #
"""

Просмотреть файл

@ -13,7 +13,7 @@ class MicrophoneClient(Client):
@property
def sampling_period(self) -> Union[float, None]:
def sampling_period(self) -> Union[int, None]:
"""
Get or set microphone sampling period.
Sampling rate is `1_000_000 / sampling_period Hz`., us
@ -22,7 +22,7 @@ class MicrophoneClient(Client):
return reg.value(0)
@sampling_period.setter
def sampling_period(self, value: float) -> None:
def sampling_period(self, value: int) -> None:
reg = self.register(JD_MICROPHONE_REG_SAMPLING_PERIOD)
reg.set_value(0, value)

Просмотреть файл

@ -31,11 +31,11 @@ class MidiOutputClient(Client):
"""
Clears any pending send data that has not yet been sent from the MIDIOutput's queue.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_midi_Output_CMD_clear))
self.send_cmd_packed(JD_MIDI_OUTPUT_CMD_CLEAR, [])
def send(self, data: bytes) -> None:
"""
Enqueues the message to be sent to the corresponding MIDI port
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_midi_Output_CMD_send, "b", [data]))
self.send_cmd_packed(JD_MIDI_OUTPUT_CMD_SEND, [data])

Просмотреть файл

@ -17,7 +17,7 @@ class ModelRunnerClient(Client):
@property
def auto_invoke_every(self) -> Union[float, None]:
def auto_invoke_every(self) -> Union[int, None]:
"""
When register contains `N > 0`, run the model automatically every time new `N` samples are collected.
Model may be run less often if it takes longer to run than `N * sampling_interval`.
@ -28,13 +28,13 @@ class ModelRunnerClient(Client):
return reg.value(0)
@auto_invoke_every.setter
def auto_invoke_every(self, value: float) -> None:
def auto_invoke_every(self, value: int) -> None:
reg = self.register(JD_MODEL_RUNNER_REG_AUTO_INVOKE_EVERY)
reg.set_value(0, value)
@property
def last_run_time(self) -> Union[float, None]:
def last_run_time(self) -> Union[int, None]:
"""
The time consumed in last model execution., us
"""
@ -42,7 +42,7 @@ class ModelRunnerClient(Client):
return reg.value(0)
@property
def allocated_arena_size(self) -> Union[float, None]:
def allocated_arena_size(self) -> Union[int, None]:
"""
Number of RAM bytes allocated for model execution., B
"""
@ -50,7 +50,7 @@ class ModelRunnerClient(Client):
return reg.value(0)
@property
def model_size(self) -> Union[float, None]:
def model_size(self) -> Union[int, None]:
"""
The size of the model in bytes., B
"""
@ -77,7 +77,7 @@ class ModelRunnerClient(Client):
return reg.value(0)
@property
def format_version(self) -> Union[float, None]:
def format_version(self) -> Union[int, None]:
"""
A version number for the format.
"""
@ -94,12 +94,12 @@ class ModelRunnerClient(Client):
return reg.value(0)
def set_model(self, model_size: float) -> None:
def set_model(self, model_size: int) -> None:
"""
Open pipe for streaming in the model. The size of the model has to be declared upfront.
The model is streamed over regular pipe data packets.
The format supported by this instance of the service is specified in `format` register.
When the pipe is closed, the model is written all into flash, and the device running the service may reset.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_model_Runner_CMD_set_model, "u32", [model_size]))
self.send_cmd_packed(JD_MODEL_RUNNER_CMD_SET_MODEL, [model_size])

Просмотреть файл

@ -29,7 +29,7 @@ class MotionClient(Client):
return reg.value(0)
@property
def angle(self) -> Union[float, None]:
def angle(self) -> Union[int, None]:
"""
(Optional) Opening of the field of view, °
"""

Просмотреть файл

@ -245,6 +245,7 @@ def jdpack(fmt: str, *args: PackType) -> bytes:
return res
# TODO: move out to some test file?
def _jdpack_test():
import json

Просмотреть файл

@ -0,0 +1,31 @@
from jacdac.bus import Bus, Client
from .constants import *
from typing import Union
class PotentiometerClient(Client):
"""
A slider or rotary potentiometer.
"""
def __init__(self, bus: Bus, role: str) -> None:
super().__init__(bus, JD_SERVICE_CLASS_POTENTIOMETER, JD_POTENTIOMETER_PACK_FORMATS, role)
@property
def position(self) -> Union[float, None]:
"""
The relative position of the slider., /
"""
reg = self.register(JD_POTENTIOMETER_REG_POSITION)
return reg.value(0)
@property
def variant(self) -> Union[PotentiometerVariant, None]:
"""
(Optional) Specifies the physical layout of the potentiometer.
"""
reg = self.register(JD_POTENTIOMETER_REG_VARIANT)
return reg.value(0)

Просмотреть файл

@ -29,7 +29,7 @@ class PowerClient(Client):
@property
def max_power(self) -> Union[float, None]:
def max_power(self) -> Union[int, None]:
"""
(Optional) Limit the power provided by the service. The actual maximum limit will depend on hardware.
This field may be read-only in some implementations - you should read it back after setting., mA
@ -38,7 +38,7 @@ class PowerClient(Client):
return reg.value(0)
@max_power.setter
def max_power(self, value: float) -> None:
def max_power(self, value: int) -> None:
reg = self.register(JD_POWER_REG_MAX_POWER)
reg.set_value(0, value)
@ -53,7 +53,7 @@ class PowerClient(Client):
return reg.value(0)
@property
def current_draw(self) -> Union[float, None]:
def current_draw(self) -> Union[int, None]:
"""
(Optional) Present current draw from the bus., mA
"""
@ -61,7 +61,7 @@ class PowerClient(Client):
return reg.value(0)
@property
def battery_voltage(self) -> Union[float, None]:
def battery_voltage(self) -> Union[int, None]:
"""
(Optional) Voltage on input., mV
"""
@ -77,7 +77,7 @@ class PowerClient(Client):
return reg.value(0)
@property
def battery_capacity(self) -> Union[float, None]:
def battery_capacity(self) -> Union[int, None]:
"""
(Optional) Energy that can be delivered to the bus when battery is fully charged.
This excludes conversion overheads if any., mWh
@ -86,7 +86,7 @@ class PowerClient(Client):
return reg.value(0)
@property
def keep_on_pulse_duration(self) -> Union[float, None]:
def keep_on_pulse_duration(self) -> Union[int, None]:
"""
(Optional) Many USB power packs need current to be drawn from time to time to prevent shutdown.
This regulates how often and for how long such current is drawn.
@ -96,13 +96,13 @@ class PowerClient(Client):
return reg.value(0)
@keep_on_pulse_duration.setter
def keep_on_pulse_duration(self, value: float) -> None:
def keep_on_pulse_duration(self, value: int) -> None:
reg = self.register(JD_POWER_REG_KEEP_ON_PULSE_DURATION)
reg.set_value(0, value)
@property
def keep_on_pulse_period(self) -> Union[float, None]:
def keep_on_pulse_period(self) -> Union[int, None]:
"""
(Optional) Many USB power packs need current to be drawn from time to time to prevent shutdown.
This regulates how often and for how long such current is drawn.
@ -112,7 +112,7 @@ class PowerClient(Client):
return reg.value(0)
@keep_on_pulse_period.setter
def keep_on_pulse_period(self, value: float) -> None:
def keep_on_pulse_period(self, value: int) -> None:
reg = self.register(JD_POWER_REG_KEEP_ON_PULSE_PERIOD)
reg.set_value(0, value)
@ -128,5 +128,5 @@ class PowerClient(Client):
"""
Sent by the power service periodically, as broadcast.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_power_CMD_shutdown))
self.send_cmd_packed(JD_POWER_CMD_SHUTDOWN, [])

Просмотреть файл

@ -36,7 +36,7 @@ class ProtoTestClient(Client):
return reg.value(0)
@property
def rw_u32(self) -> Union[float, None]:
def rw_u32(self) -> Union[int, None]:
"""
A read write u32 register.
"""
@ -44,13 +44,13 @@ class ProtoTestClient(Client):
return reg.value(0)
@rw_u32.setter
def rw_u32(self, value: float) -> None:
def rw_u32(self, value: int) -> None:
reg = self.register(JD_PROTO_TEST_REG_RW_U32)
reg.set_value(0, value)
@property
def ro_u32(self) -> Union[float, None]:
def ro_u32(self) -> Union[int, None]:
"""
A read only u32 register.. Mirrors rw_u32.
"""
@ -58,7 +58,7 @@ class ProtoTestClient(Client):
return reg.value(0)
@property
def rw_i32(self) -> Union[float, None]:
def rw_i32(self) -> Union[int, None]:
"""
A read write i32 register.
"""
@ -66,13 +66,13 @@ class ProtoTestClient(Client):
return reg.value(0)
@rw_i32.setter
def rw_i32(self, value: float) -> None:
def rw_i32(self, value: int) -> None:
reg = self.register(JD_PROTO_TEST_REG_RW_I32)
reg.set_value(0, value)
@property
def ro_i32(self) -> Union[float, None]:
def ro_i32(self) -> Union[int, None]:
"""
A read only i32 register.. Mirrors rw_i32.
"""
@ -124,7 +124,7 @@ class ProtoTestClient(Client):
return reg.value(0)
@property
def rw_i8_u8_u16_i32I8(self) -> Union[float, None]:
def rw_i8_u8_u16_i32I8(self) -> Union[int, None]:
"""
A read write i8, u8, u16, i32 register.
"""
@ -132,13 +132,13 @@ class ProtoTestClient(Client):
return reg.value(0)
@rw_i8_u8_u16_i32I8.setter
def rw_i8_u8_u16_i32I8(self, value: float) -> None:
def rw_i8_u8_u16_i32I8(self, value: int) -> None:
reg = self.register(JD_PROTO_TEST_REG_RW_I8_U8_U16_I32)
reg.set_value(0, value)
@property
def rw_i8_u8_u16_i32U8(self) -> Union[float, None]:
def rw_i8_u8_u16_i32U8(self) -> Union[int, None]:
"""
A read write i8, u8, u16, i32 register.
"""
@ -146,13 +146,13 @@ class ProtoTestClient(Client):
return reg.value(1)
@rw_i8_u8_u16_i32U8.setter
def rw_i8_u8_u16_i32U8(self, value: float) -> None:
def rw_i8_u8_u16_i32U8(self, value: int) -> None:
reg = self.register(JD_PROTO_TEST_REG_RW_I8_U8_U16_I32)
reg.set_value(1, value)
@property
def rw_i8_u8_u16_i32U16(self) -> Union[float, None]:
def rw_i8_u8_u16_i32U16(self) -> Union[int, None]:
"""
A read write i8, u8, u16, i32 register.
"""
@ -160,13 +160,13 @@ class ProtoTestClient(Client):
return reg.value(2)
@rw_i8_u8_u16_i32U16.setter
def rw_i8_u8_u16_i32U16(self, value: float) -> None:
def rw_i8_u8_u16_i32U16(self, value: int) -> None:
reg = self.register(JD_PROTO_TEST_REG_RW_I8_U8_U16_I32)
reg.set_value(2, value)
@property
def rw_i8_u8_u16_i32I32(self) -> Union[float, None]:
def rw_i8_u8_u16_i32I32(self) -> Union[int, None]:
"""
A read write i8, u8, u16, i32 register.
"""
@ -174,13 +174,13 @@ class ProtoTestClient(Client):
return reg.value(3)
@rw_i8_u8_u16_i32I32.setter
def rw_i8_u8_u16_i32I32(self, value: float) -> None:
def rw_i8_u8_u16_i32I32(self, value: int) -> None:
reg = self.register(JD_PROTO_TEST_REG_RW_I8_U8_U16_I32)
reg.set_value(3, value)
@property
def ro_i8_u8_u16_i32I8(self) -> Union[float, None]:
def ro_i8_u8_u16_i32I8(self) -> Union[int, None]:
"""
A read only i8, u8, u16, i32 register.. Mirrors rw_i8_u8_u16_i32.
"""
@ -188,7 +188,7 @@ class ProtoTestClient(Client):
return reg.value(0)
@property
def ro_i8_u8_u16_i32U8(self) -> Union[float, None]:
def ro_i8_u8_u16_i32U8(self) -> Union[int, None]:
"""
A read only i8, u8, u16, i32 register.. Mirrors rw_i8_u8_u16_i32.
"""
@ -196,7 +196,7 @@ class ProtoTestClient(Client):
return reg.value(1)
@property
def ro_i8_u8_u16_i32U16(self) -> Union[float, None]:
def ro_i8_u8_u16_i32U16(self) -> Union[int, None]:
"""
A read only i8, u8, u16, i32 register.. Mirrors rw_i8_u8_u16_i32.
"""
@ -204,7 +204,7 @@ class ProtoTestClient(Client):
return reg.value(2)
@property
def ro_i8_u8_u16_i32I32(self) -> Union[float, None]:
def ro_i8_u8_u16_i32I32(self) -> Union[int, None]:
"""
A read only i8, u8, u16, i32 register.. Mirrors rw_i8_u8_u16_i32.
"""
@ -212,7 +212,7 @@ class ProtoTestClient(Client):
return reg.value(3)
@property
def rw_u8_string_U8(self) -> Union[float, None]:
def rw_u8_string_U8(self) -> Union[int, None]:
"""
A read write u8, string register.
"""
@ -220,7 +220,7 @@ class ProtoTestClient(Client):
return reg.value(0)
@rw_u8_string_U8.setter
def rw_u8_string_U8(self, value: float) -> None:
def rw_u8_string_U8(self, value: int) -> None:
reg = self.register(JD_PROTO_TEST_REG_RW_U8_STRING)
reg.set_value(0, value)
@ -240,7 +240,7 @@ class ProtoTestClient(Client):
@property
def ro_u8_string_U8(self) -> Union[float, None]:
def ro_u8_string_U8(self) -> Union[int, None]:
"""
A read only u8, string register.. Mirrors rw_u8_string.
"""
@ -302,41 +302,41 @@ class ProtoTestClient(Client):
"""
A command to set rw_bool.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_proto_Test_CMD_c_bool, "u8", [bool]))
self.send_cmd_packed(JD_PROTO_TEST_CMD_C_BOOL, [bool])
def c_u32(self, u32: float) -> None:
def c_u32(self, u32: int) -> None:
"""
A command to set rw_u32.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_proto_Test_CMD_c_u32, "u32", [u32]))
self.send_cmd_packed(JD_PROTO_TEST_CMD_C_U32, [u32])
def c_i32(self, i32: float) -> None:
def c_i32(self, i32: int) -> None:
"""
A command to set rw_i32.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_proto_Test_CMD_c_i32, "i32", [i32]))
self.send_cmd_packed(JD_PROTO_TEST_CMD_C_I32, [i32])
def c_string(self, string: str) -> None:
"""
A command to set rw_string.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_proto_Test_CMD_c_string, "s", [string]))
self.send_cmd_packed(JD_PROTO_TEST_CMD_C_STRING, [string])
def c_bytes(self, bytes: bytes) -> None:
"""
A command to set rw_string.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_proto_Test_CMD_c_bytes, "b", [bytes]))
self.send_cmd_packed(JD_PROTO_TEST_CMD_C_BYTES, [bytes])
def c_i8_u8_u16_i32(self, i8: float, u8: float, u16: float, i32: float) -> None:
def c_i8_u8_u16_i32(self, i8: int, u8: int, u16: int, i32: int) -> None:
"""
A command to set rw_bytes.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_proto_Test_CMD_c_i8_u8_u16_i32, "i8 u8 u16 i32", [i8, u8, u16, i32]))
self.send_cmd_packed(JD_PROTO_TEST_CMD_C_I8_U8_U16_I32, [i8, u8, u16, i32])
def c_u8_string(self, u8: float, string: str) -> None:
def c_u8_string(self, u8: int, string: str) -> None:
"""
A command to set rw_u8_string.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_proto_Test_CMD_c_u8_string, "u8 s", [u8, string]))
self.send_cmd_packed(JD_PROTO_TEST_CMD_C_U8_STRING, [u8, string])

Просмотреть файл

@ -13,7 +13,7 @@ class RealTimeClockClient(Client):
@property
def year(self) -> Union[float, None]:
def year(self) -> Union[int, None]:
"""
Current time in 24h representation.
* ``day_of_month`` is day of the month, starting at ``1``
@ -24,7 +24,7 @@ class RealTimeClockClient(Client):
return reg.value(0)
@property
def month(self) -> Union[float, None]:
def month(self) -> Union[int, None]:
"""
Current time in 24h representation.
* ``day_of_month`` is day of the month, starting at ``1``
@ -35,7 +35,7 @@ class RealTimeClockClient(Client):
return reg.value(1)
@property
def day_of_month(self) -> Union[float, None]:
def day_of_month(self) -> Union[int, None]:
"""
Current time in 24h representation.
* ``day_of_month`` is day of the month, starting at ``1``
@ -46,7 +46,7 @@ class RealTimeClockClient(Client):
return reg.value(2)
@property
def day_of_week(self) -> Union[float, None]:
def day_of_week(self) -> Union[int, None]:
"""
Current time in 24h representation.
* ``day_of_month`` is day of the month, starting at ``1``
@ -57,7 +57,7 @@ class RealTimeClockClient(Client):
return reg.value(3)
@property
def hour(self) -> Union[float, None]:
def hour(self) -> Union[int, None]:
"""
Current time in 24h representation.
* ``day_of_month`` is day of the month, starting at ``1``
@ -68,7 +68,7 @@ class RealTimeClockClient(Client):
return reg.value(4)
@property
def min(self) -> Union[float, None]:
def min(self) -> Union[int, None]:
"""
Current time in 24h representation.
* ``day_of_month`` is day of the month, starting at ``1``
@ -79,7 +79,7 @@ class RealTimeClockClient(Client):
return reg.value(5)
@property
def sec(self) -> Union[float, None]:
def sec(self) -> Union[int, None]:
"""
Current time in 24h representation.
* ``day_of_month`` is day of the month, starting at ``1``
@ -114,9 +114,9 @@ class RealTimeClockClient(Client):
return reg.value(0)
def set_time(self, year: float, month: float, day_of_month: float, day_of_week: float, hour: float, min: float, sec: float) -> None:
def set_time(self, year: int, month: int, day_of_month: int, day_of_week: int, hour: int, min: int, sec: int) -> None:
"""
Sets the current time and resets the error.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_real_Time_Clock_CMD_set_time, "u16 u8 u8 u8 u8 u8 u8", [year, month, day_of_month, day_of_week, hour, min, sec]))
self.send_cmd_packed(JD_REAL_TIME_CLOCK_CMD_SET_TIME, [year, month, day_of_month, day_of_week, hour, min, sec])

59
jacdac/relay/client.gpy Normal file
Просмотреть файл

@ -0,0 +1,59 @@
from jacdac.bus import Bus, Client
from .constants import *
from typing import Union
from jacdac.events import HandlerFn
class RelayClient(Client):
"""
A switching relay.
"""
def __init__(self, bus: Bus, role: str) -> None:
super().__init__(bus, JD_SERVICE_CLASS_RELAY, JD_RELAY_PACK_FORMATS, role)
@property
def closed(self) -> Union[bool, None]:
"""
Indicates whether the relay circuit is currently energized (closed) or not.
"""
reg = self.register(JD_RELAY_REG_CLOSED)
return reg.value(0)
@closed.setter
def closed(self, value: bool) -> None:
reg = self.register(JD_RELAY_REG_CLOSED)
reg.set_value(0, value)
@property
def variant(self) -> Union[RelayVariant, None]:
"""
(Optional) Describes the type of relay used.
"""
reg = self.register(JD_RELAY_REG_VARIANT)
return reg.value(0)
@property
def max_switching_current(self) -> Union[int, None]:
"""
(Optional) Maximum switching current for a resistive load., mA
"""
reg = self.register(JD_RELAY_REG_MAX_SWITCHING_CURRENT)
return reg.value(0)
def on_active(self, handler: HandlerFn) -> None:
"""
Emitted when relay goes from `inactive` to `active` state.
Normally open (NO) relays close the circuit when activated.
"""
# TODO
def on_inactive(self, handler: HandlerFn) -> None:
"""
Emitted when relay goes from `active` to `inactive` state.
Normally closed (NC) relays open the circuit when activated.
"""
# TODO

Просмотреть файл

@ -57,21 +57,21 @@ class RoleManagerClient(Client):
# TODO
def get_role(self, device_id: bytes, service_idx: float) -> None:
def get_role(self, device_id: bytes, service_idx: int) -> None:
"""
Get the role corresponding to given device identifer. Returns empty string if unset.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_role_Manager_CMD_get_role, "b[8] u8", [device_id, service_idx]))
self.send_cmd_packed(JD_ROLE_MANAGER_CMD_GET_ROLE, [device_id, service_idx])
def set_role(self, device_id: bytes, service_idx: float, role: str) -> None:
def set_role(self, device_id: bytes, service_idx: int, role: str) -> None:
"""
Set role. Can set to empty to remove role binding.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_role_Manager_CMD_set_role, "b[8] u8 s", [device_id, service_idx, role]))
self.send_cmd_packed(JD_ROLE_MANAGER_CMD_SET_ROLE, [device_id, service_idx, role])
def clear_all_roles(self, ) -> None:
"""
Remove all role bindings.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_role_Manager_CMD_clear_all_roles))
self.send_cmd_packed(JD_ROLE_MANAGER_CMD_CLEAR_ALL_ROLES, [])

Просмотреть файл

@ -0,0 +1,32 @@
from jacdac.bus import Bus, Client
from .constants import *
from typing import Union
class RotaryEncoderClient(Client):
"""
An incremental rotary encoder - converts angular motion of a shaft to digital signal.
"""
def __init__(self, bus: Bus, role: str) -> None:
super().__init__(bus, JD_SERVICE_CLASS_ROTARY_ENCODER, JD_ROTARY_ENCODER_PACK_FORMATS, role)
@property
def position(self) -> Union[int, None]:
"""
Upon device reset starts at `0` (regardless of the shaft position).
Increases by `1` for a clockwise "click", by `-1` for counter-clockwise., #
"""
reg = self.register(JD_ROTARY_ENCODER_REG_POSITION)
return reg.value(0)
@property
def clicks_per_turn(self) -> Union[int, None]:
"""
(Optional) This specifies by how much `position` changes when the crank does 360 degree turn. Typically 12 or 24., #
"""
reg = self.register(JD_ROTARY_ENCODER_REG_CLICKS_PER_TURN)
return reg.value(0)

Просмотреть файл

@ -14,7 +14,7 @@ class SensorAggregatorClient(Client):
@property
def num_samples(self) -> Union[float, None]:
def num_samples(self) -> Union[int, None]:
"""
Number of input samples collected so far.
"""
@ -22,7 +22,7 @@ class SensorAggregatorClient(Client):
return reg.value(0)
@property
def sample_size(self) -> Union[float, None]:
def sample_size(self) -> Union[int, None]:
"""
Size of a single sample., B
"""
@ -30,7 +30,7 @@ class SensorAggregatorClient(Client):
return reg.value(0)
@property
def streaming_samples(self) -> Union[float, None]:
def streaming_samples(self) -> Union[int, None]:
"""
When set to `N`, will stream `N` samples as `current_sample` reading., #
"""
@ -38,7 +38,7 @@ class SensorAggregatorClient(Client):
return reg.value(0)
@streaming_samples.setter
def streaming_samples(self, value: float) -> None:
def streaming_samples(self, value: int) -> None:
reg = self.register(JD_SENSOR_AGGREGATOR_REG_STREAMING_SAMPLES)
reg.set_value(0, value)

Просмотреть файл

@ -65,7 +65,7 @@ class ServoClient(Client):
return reg.value(0)
@property
def min_pulse(self) -> Union[float, None]:
def min_pulse(self) -> Union[int, None]:
"""
The length of pulse corresponding to lowest angle., us
"""
@ -73,7 +73,7 @@ class ServoClient(Client):
return reg.value(0)
@min_pulse.setter
def min_pulse(self, value: float) -> None:
def min_pulse(self, value: int) -> None:
reg = self.register(JD_SERVO_REG_MIN_PULSE)
reg.set_value(0, value)
@ -87,7 +87,7 @@ class ServoClient(Client):
return reg.value(0)
@property
def max_pulse(self) -> Union[float, None]:
def max_pulse(self) -> Union[int, None]:
"""
The length of pulse corresponding to highest angle., us
"""
@ -95,7 +95,7 @@ class ServoClient(Client):
return reg.value(0)
@max_pulse.setter
def max_pulse(self, value: float) -> None:
def max_pulse(self, value: int) -> None:
reg = self.register(JD_SERVO_REG_MAX_PULSE)
reg.set_value(0, value)

Просмотреть файл

@ -23,5 +23,5 @@ class SettingsClient(Client):
"""
Get the value of given setting. If no such entry exists, the value returned is empty.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_settings_CMD_get, "s", [key]))
self.send_cmd_packed(JD_SETTINGS_CMD_GET, [key])

Просмотреть файл

@ -69,7 +69,7 @@ class SevenSegmentDisplayClient(Client):
@property
def digit_count(self) -> Union[float, None]:
def digit_count(self) -> Union[int, None]:
"""
The number of digits available on the display.
"""

Просмотреть файл

@ -35,7 +35,7 @@ class SoundLevelClient(Client):
@property
def min_decibels(self) -> Union[float, None]:
def min_decibels(self) -> Union[int, None]:
"""
(Optional) The minimum power value considered by the sensor.
If both ``min_decibels`` and ``max_decibels`` are supported,
@ -46,13 +46,13 @@ class SoundLevelClient(Client):
return reg.value(0)
@min_decibels.setter
def min_decibels(self, value: float) -> None:
def min_decibels(self, value: int) -> None:
reg = self.register(JD_SOUND_LEVEL_REG_MIN_DECIBELS)
reg.set_value(0, value)
@property
def max_decibels(self) -> Union[float, None]:
def max_decibels(self) -> Union[int, None]:
"""
(Optional) The maximum power value considered by the sensor.
If both ``min_decibels`` and ``max_decibels`` are supported,
@ -63,7 +63,7 @@ class SoundLevelClient(Client):
return reg.value(0)
@max_decibels.setter
def max_decibels(self, value: float) -> None:
def max_decibels(self, value: int) -> None:
reg = self.register(JD_SOUND_LEVEL_REG_MAX_DECIBELS)
reg.set_value(0, value)

Просмотреть файл

@ -31,5 +31,5 @@ class SoundPlayerClient(Client):
"""
Starts playing a sound.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_sound_Player_CMD_play, "s", [name]))
self.send_cmd_packed(JD_SOUND_PLAYER_CMD_PLAY, [name])

Просмотреть файл

@ -35,7 +35,7 @@ class SoundSpectrumClient(Client):
@property
def fft_pow2_size(self) -> Union[float, None]:
def fft_pow2_size(self) -> Union[int, None]:
"""
The power of 2 used as the size of the FFT to be used to determine the frequency domain.
"""
@ -43,13 +43,13 @@ class SoundSpectrumClient(Client):
return reg.value(0)
@fft_pow2_size.setter
def fft_pow2_size(self, value: float) -> None:
def fft_pow2_size(self, value: int) -> None:
reg = self.register(JD_SOUND_SPECTRUM_REG_FFT_POW2_SIZE)
reg.set_value(0, value)
@property
def min_decibels(self) -> Union[float, None]:
def min_decibels(self) -> Union[int, None]:
"""
The minimum power value in the scaling range for the FFT analysis data, dB
"""
@ -57,13 +57,13 @@ class SoundSpectrumClient(Client):
return reg.value(0)
@min_decibels.setter
def min_decibels(self, value: float) -> None:
def min_decibels(self, value: int) -> None:
reg = self.register(JD_SOUND_SPECTRUM_REG_MIN_DECIBELS)
reg.set_value(0, value)
@property
def max_decibels(self) -> Union[float, None]:
def max_decibels(self) -> Union[int, None]:
"""
The maximum power value in the scaling range for the FFT analysis data, dB
"""
@ -71,7 +71,7 @@ class SoundSpectrumClient(Client):
return reg.value(0)
@max_decibels.setter
def max_decibels(self, value: float) -> None:
def max_decibels(self, value: int) -> None:
reg = self.register(JD_SOUND_SPECTRUM_REG_MAX_DECIBELS)
reg.set_value(0, value)

Просмотреть файл

@ -87,11 +87,11 @@ class SpeechSynthesisClient(Client):
"""
Adds an utterance to the utterance queue; it will be spoken when any other utterances queued before it have been spoken.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_speech_Synthesis_CMD_speak, "s", [text]))
self.send_cmd_packed(JD_SPEECH_SYNTHESIS_CMD_SPEAK, [text])
def cancel(self, ) -> None:
"""
Cancels current utterance and all utterances from the utterance queue.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_speech_Synthesis_CMD_cancel))
self.send_cmd_packed(JD_SPEECH_SYNTHESIS_CMD_CANCEL, [])

Просмотреть файл

@ -0,0 +1,55 @@
from jacdac.bus import Bus, Client
from .constants import *
from typing import Union
class ThermocoupleClient(Client):
"""
A thermocouple using a heat probe to gather temperatures.
"""
def __init__(self, bus: Bus, role: str) -> None:
super().__init__(bus, JD_SERVICE_CLASS_THERMOCOUPLE, JD_THERMOCOUPLE_PACK_FORMATS, role)
@property
def temperature(self) -> Union[float, None]:
"""
The temperature., °C
"""
reg = self.register(JD_THERMOCOUPLE_REG_TEMPERATURE)
return reg.value(0)
@property
def min_temperature(self) -> Union[float, None]:
"""
Lowest temperature that can be reported., °C
"""
reg = self.register(JD_THERMOCOUPLE_REG_MIN_TEMPERATURE)
return reg.value(0)
@property
def max_temperature(self) -> Union[float, None]:
"""
Highest temperature that can be reported., °C
"""
reg = self.register(JD_THERMOCOUPLE_REG_MAX_TEMPERATURE)
return reg.value(0)
@property
def temperature_error(self) -> Union[float, None]:
"""
The real temperature is between `temperature - temperature_error` and `temperature + temperature_error`., °C
"""
reg = self.register(JD_THERMOCOUPLE_REG_TEMPERATURE_ERROR)
return reg.value(0)
@property
def variant(self) -> Union[ThermocoupleVariant, None]:
"""
(Optional) Specifies the type of thermometer.
"""
reg = self.register(JD_THERMOCOUPLE_REG_VARIANT)
return reg.value(0)

Просмотреть файл

@ -0,0 +1,55 @@
from jacdac.bus import Bus, Client
from .constants import *
from typing import Union
class ThermometerClient(Client):
"""
A thermometer measuring outside or inside environment.
"""
def __init__(self, bus: Bus, role: str) -> None:
super().__init__(bus, JD_SERVICE_CLASS_THERMOMETER, JD_THERMOMETER_PACK_FORMATS, role)
@property
def temperature(self) -> Union[float, None]:
"""
The temperature., °C
"""
reg = self.register(JD_THERMOMETER_REG_TEMPERATURE)
return reg.value(0)
@property
def min_temperature(self) -> Union[float, None]:
"""
Lowest temperature that can be reported., °C
"""
reg = self.register(JD_THERMOMETER_REG_MIN_TEMPERATURE)
return reg.value(0)
@property
def max_temperature(self) -> Union[float, None]:
"""
Highest temperature that can be reported., °C
"""
reg = self.register(JD_THERMOMETER_REG_MAX_TEMPERATURE)
return reg.value(0)
@property
def temperature_error(self) -> Union[float, None]:
"""
The real temperature is between `temperature - temperature_error` and `temperature + temperature_error`., °C
"""
reg = self.register(JD_THERMOMETER_REG_TEMPERATURE_ERROR)
return reg.value(0)
@property
def variant(self) -> Union[ThermometerVariant, None]:
"""
(Optional) Specifies the type of thermometer.
"""
reg = self.register(JD_THERMOMETER_REG_VARIANT)
return reg.value(0)

Просмотреть файл

@ -45,7 +45,7 @@ class TvocClient(Client):
return reg.value(0)
@property
def conditioning_period(self) -> Union[float, None]:
def conditioning_period(self) -> Union[int, None]:
"""
(Optional) Time required to achieve good sensor stability before measuring after long idle period., s
"""

Просмотреть файл

@ -64,6 +64,7 @@ def u32(buf: bytes, off: int):
return buf[off] | (buf[off+1] << 8) | (buf[off+2] << 16) | (buf[off+3] << 24)
# TODO: do we need this?
# TODO would we want the "u32 u16" kind of format strings?
def unpack(buf: bytes, fmt: str):
if fmt is None:
@ -71,6 +72,7 @@ def unpack(buf: bytes, fmt: str):
return struct.unpack("<" + fmt, buf)
# TODO: do we need this?
def pack(fmt: str, *args: object):
if len(args) == 1 and isinstance(args[0], (tuple, list)):
args = args[0] # type: ignore

Просмотреть файл

@ -21,7 +21,7 @@ class VerifiedTelemetryClient(Client):
return reg.value(0)
@property
def telemetry_status_interval(self) -> Union[float, None]:
def telemetry_status_interval(self) -> Union[int, None]:
"""
(Optional) Specifies the interval between computing the fingerprint information., ms
"""
@ -29,7 +29,7 @@ class VerifiedTelemetryClient(Client):
return reg.value(0)
@telemetry_status_interval.setter
def telemetry_status_interval(self, value: float) -> None:
def telemetry_status_interval(self, value: int) -> None:
reg = self.register(JD_VERIFIED_TELEMETRY_REG_TELEMETRY_STATUS_INTERVAL)
reg.set_value(0, value)
@ -43,7 +43,7 @@ class VerifiedTelemetryClient(Client):
return reg.value(0)
@property
def fingerprint_template_Confidence(self) -> Union[float, None]:
def fingerprint_template_Confidence(self) -> Union[int, None]:
"""
Template Fingerprint information of a working sensor., %
"""
@ -75,11 +75,11 @@ class VerifiedTelemetryClient(Client):
"""
This command will clear the template fingerprint of a sensor and collect a new template fingerprint of the attached sensor.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_verified_Telemetry_CMD_reset_fingerprint_template))
self.send_cmd_packed(JD_VERIFIED_TELEMETRY_CMD_RESET_FINGERPRINT_TEMPLATE, [])
def retrain_fingerprint_template(self, ) -> None:
"""
This command will append a new template fingerprint to the `fingerprintTemplate`. Appending more fingerprints will increase the accuracy in detecting the telemetry status.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_verified_Telemetry_CMD_retrain_fingerprint_template))
self.send_cmd_packed(JD_VERIFIED_TELEMETRY_CMD_RETRAIN_FINGERPRINT_TEMPLATE, [])

Просмотреть файл

@ -94,11 +94,11 @@ class WeightScaleClient(Client):
"""
Call this command when there is nothing on the scale. If supported, the module should save the calibration data.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_weight_Scale_CMD_calibrate_zero_offset))
self.send_cmd_packed(JD_WEIGHT_SCALE_CMD_CALIBRATE_ZERO_OFFSET, [])
def calibrate_gain(self, weight: float) -> None:
"""
Call this command with the weight of the thing on the scale.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_weight_Scale_CMD_calibrate_gain, "u22.10", [weight]))
self.send_cmd_packed(JD_WEIGHT_SCALE_CMD_CALIBRATE_GAIN, [weight])

Просмотреть файл

@ -62,7 +62,7 @@ class WifiClient(Client):
return reg.value(0)
@property
def rssi(self) -> Union[float, None]:
def rssi(self) -> Union[int, None]:
"""
Current signal strength. Returns -128 when not connected., dB
"""
@ -108,38 +108,38 @@ class WifiClient(Client):
"""
Automatically connect to named network if available. Also set password if network is not open.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_wifi_CMD_add_network, "z z", [ssid, password]))
self.send_cmd_packed(JD_WIFI_CMD_ADD_NETWORK, [ssid, password])
def reconnect(self, ) -> None:
"""
Enable the WiFi (if disabled), initiate a scan, wait for results, disconnect from current WiFi network if any,
and then reconnect (using regular algorithm, see `set_network_priority`).
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_wifi_CMD_reconnect))
self.send_cmd_packed(JD_WIFI_CMD_RECONNECT, [])
def forget_network(self, ssid: str) -> None:
"""
Prevent from automatically connecting to named network in future.
Forgetting a network resets its priority to `0`.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_wifi_CMD_forget_network, "s", [ssid]))
self.send_cmd_packed(JD_WIFI_CMD_FORGET_NETWORK, [ssid])
def forget_all_networks(self, ) -> None:
"""
Clear the list of known networks.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_wifi_CMD_forget_all_networks))
self.send_cmd_packed(JD_WIFI_CMD_FORGET_ALL_NETWORKS, [])
def set_network_priority(self, priority: float, ssid: str) -> None:
def set_network_priority(self, priority: int, ssid: str) -> None:
"""
Set connection priority for a network.
By default, all known networks have priority of `0`.
"""
# TODO: self.sendCommand(jacdac.JDPacket.jdpacked(JD_wifi_CMD_set_network_priority, "i16 s", [priority, ssid]))
self.send_cmd_packed(JD_WIFI_CMD_SET_NETWORK_PRIORITY, [priority, ssid])
def scan(self, ) -> None:
"""
Initiate search for WiFi networks. Generates `scan_complete` event.
"""
# TODO: self.sendCommand(jacdac.JDPacket.onlyHeader(JD_wifi_CMD_scan))
self.send_cmd_packed(JD_WIFI_CMD_SCAN, [])

Просмотреть файл

@ -13,7 +13,7 @@ class WindDirectionClient(Client):
@property
def wind_direction(self) -> Union[float, None]:
def wind_direction(self) -> Union[int, None]:
"""
The direction of the wind., °
"""
@ -21,7 +21,7 @@ class WindDirectionClient(Client):
return reg.value(0)
@property
def wind_direction_error(self) -> Union[float, None]:
def wind_direction_error(self) -> Union[int, None]:
"""
(Optional) Error on the wind direction reading, °
"""