This commit is contained in:
ludruda 2021-02-01 16:00:02 +01:00
Родитель e6d8b8c87d
Коммит 78909f2945
8 изменённых файлов: 426 добавлений и 231 удалений

8
.gitignore поставляемый
Просмотреть файл

@ -13,4 +13,10 @@ exp.py
# publish # publish
*.egg-info *.egg-info
build build
dist dist
# virtualenv
Include/
Lib/
Scripts/
pyvenv.cfg

1
requirements.txt Normal file
Просмотреть файл

@ -0,0 +1 @@
azure-iot-device

Просмотреть файл

@ -6,41 +6,42 @@ import sys
from random import randint from random import randint
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__),'samples.ini')) config.read(os.path.join(os.path.dirname(__file__), "samples.ini"))
if config['DEFAULT'].getboolean('Local'): if config["DEFAULT"].getboolean("Local"):
sys.path.insert(0, 'src') sys.path.insert(0, "src")
from iotc import IOTCConnectType, IOTCLogLevel, IOTCEvents from iotc import IOTCConnectType, IOTCLogLevel, IOTCEvents
from iotc.aio import IoTCClient from iotc.aio import IoTCClient
device_id = config['DEVICE_M3']['DeviceId'] device_id = config["DEVICE_M3"]["DeviceId"]
scope_id = config['DEVICE_M3']['ScopeId'] scope_id = config["DEVICE_M3"]["ScopeId"]
key = config['DEVICE_M3']['DeviceKey'] key = config["DEVICE_M3"]["DeviceKey"]
# optional model Id for auto-provisioning # optional model Id for auto-provisioning
try: try:
model_id = config['DEVICE_M3']['ModelId'] model_id = config["DEVICE_M3"]["ModelId"]
except: except:
model_id = None model_id = None
async def on_props(propName, propValue): async def on_props(property_name, property_value, component_name):
print(propValue) print("Received {}:{}".format(property_name, property_value))
return True return True
async def on_commands(command, ack): async def on_commands(command, ack):
print(command.name) print(command.name)
await ack(command.name, 'Command received', command.request_id) await ack(command.name, "Command received", command.request_id)
async def on_enqueued_commands(command_name,command_data):
async def on_enqueued_commands(command_name, command_data):
print(command_name) print(command_name)
print(command_data) print(command_data)
# change connect type to reflect the used key (device or group) # change connect type to reflect the used key (device or group)
client = IoTCClient(device_id, scope_id, client = IoTCClient(device_id, scope_id, IOTCConnectType.IOTC_CONNECT_DEVICE_KEY, key)
IOTCConnectType.IOTC_CONNECT_SYMM_KEY, key)
if model_id != None: if model_id != None:
client.set_model_id(model_id) client.set_model_id(model_id)
@ -54,13 +55,18 @@ client.on(IOTCEvents.IOTC_ENQUEUED_COMMAND, on_enqueued_commands)
async def main(): async def main():
await client.connect() await client.connect()
await client.send_property({"writeableProp": 50})
while client.is_connected(): while client.is_connected():
await client.send_telemetry({ await client.send_telemetry(
'accelerometerX': str(randint(20, 45)), {
'accelerometerY': str(randint(20, 45)), "acceleration": {
"accelerometerZ": str(randint(20, 45)) "x": str(randint(20, 45)),
}) "y": str(randint(20, 45)),
"z": str(randint(20, 45)),
}
}
)
await asyncio.sleep(3) await asyncio.sleep(3)
asyncio.run(main()) asyncio.run(main())

Просмотреть файл

@ -5,7 +5,7 @@ import sys
with open("README.md", "r") as fh: with open("README.md", "r") as fh:
long_description = fh.read() long_description = fh.read()
version = "1.0.4" version = "1.0.5"
setuptools.setup( setuptools.setup(
name='iotc', name='iotc',

Просмотреть файл

@ -1,4 +1,3 @@
import sys import sys
import threading import threading
import time import time
@ -12,9 +11,11 @@ from datetime import datetime
__version__ = pkg_resources.get_distribution("iotc").version __version__ = pkg_resources.get_distribution("iotc").version
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
import urllib import urllib
quote = urllib.quote_plus quote = urllib.quote_plus
else: else:
import urllib.parse import urllib.parse
quote = urllib.parse.quote_plus quote = urllib.parse.quote_plus
try: try:
@ -48,7 +49,6 @@ except ImportError:
sys.exit() sys.exit()
class IOTCConnectType: class IOTCConnectType:
IOTC_CONNECT_SYMM_KEY = 1 IOTC_CONNECT_SYMM_KEY = 1
IOTC_CONNECT_X509_CERT = 2 IOTC_CONNECT_X509_CERT = 2
@ -78,8 +78,8 @@ class IOTCMessageStatus:
class IOTCEvents: class IOTCEvents:
IOTC_COMMAND = 2, IOTC_COMMAND = (2,)
IOTC_PROPERTIES = 4, IOTC_PROPERTIES = (4,)
IOTC_ENQUEUED_COMMAND = 8 IOTC_ENQUEUED_COMMAND = 8
@ -113,8 +113,8 @@ class AbstractClient:
self._prop_thread = None self._prop_thread = None
self._cmd_thread = None self._cmd_thread = None
self._enqueued_cmd_thread = None self._enqueued_cmd_thread = None
self._content_type='application%2Fjson' self._content_type = "application%2Fjson"
self._content_encoding='utf-8' self._content_encoding = "utf-8"
self._global_endpoint = "global.azure-devices-provisioning.net" self._global_endpoint = "global.azure-devices-provisioning.net"
def is_connected(self): def is_connected(self):
@ -124,7 +124,9 @@ class AbstractClient:
:rtype: bool :rtype: bool
""" """
if not self._device_client: if not self._device_client:
print("ERROR: A connection was never attempted. You need to first call connect() before querying the connection state") print(
"ERROR: A connection was never attempted. You need to first call connect() before querying the connection state"
)
else: else:
return self._device_client.connected return self._device_client.connected
@ -149,14 +151,14 @@ class AbstractClient:
""" """
self._logger.set_log_level(log_level) self._logger.set_log_level(log_level)
def set_content_type(self,content_type): def set_content_type(self, content_type):
self._content_type = quote(content_type) self._content_type = quote(content_type)
def set_content_encoding(self,content_encoding): def set_content_encoding(self, content_encoding):
self._content_encoding = content_encoding self._content_encoding = content_encoding
def _prepare_message(self,payload,properties): def _prepare_message(self, payload, properties):
msg = Message(payload,uuid.uuid4(),self._content_encoding,self._content_type) msg = Message(payload, uuid.uuid4(), self._content_encoding, self._content_type)
if bool(properties): if bool(properties):
for prop in properties: for prop in properties:
msg.custom_properties[prop] = properties[prop] msg.custom_properties[prop] = properties[prop]
@ -171,99 +173,152 @@ class AbstractClient:
self._events[eventname] = callback self._events[eventname] = callback
return 0 return 0
class IoTCClient(AbstractClient):
class IoTCClient(AbstractClient):
def __init__(self, device_id, scope_id, cred_type, key_or_cert, logger=None): def __init__(self, device_id, scope_id, cred_type, key_or_cert, logger=None):
AbstractClient.__init__(self,device_id, scope_id, cred_type, key_or_cert) AbstractClient.__init__(self, device_id, scope_id, cred_type, key_or_cert)
if logger is None: if logger is None:
self._logger = ConsoleLogger(IOTCLogLevel.IOTC_LOGGING_API_ONLY) self._logger = ConsoleLogger(IOTCLogLevel.IOTC_LOGGING_API_ONLY)
else: else:
if hasattr(logger, "info") and hasattr(logger, "debug") and hasattr(logger, "set_log_level"): if (
hasattr(logger, "info")
and hasattr(logger, "debug")
and hasattr(logger, "set_log_level")
):
self._logger = logger self._logger = logger
else: else:
print("ERROR: Logger object has unsupported format. It must implement the following functions\n\ print(
info(message);\ndebug(message);\nset_log_level(message);") "ERROR: Logger object has unsupported format. It must implement the following functions\n\
info(message);\ndebug(message);\nset_log_level(message);"
)
sys.exit() sys.exit()
def _handle_property_ack(
self,
callback,
property_name,
property_value,
property_version,
component_name=None,
):
ret = callback(property_name, property_value, component_name)
if ret:
if component_name is not None:
self._logger.debug("Acknowledging {}".format(property_name))
self.send_property(
{
"{}".format(component_name): {
"{}".format(property_name): {
"ac": 200,
"ad": "Property received",
"av": property_version,
"value": property_value,
}
}
}
)
else:
self._logger.debug("Acknowledging {}".format(property_name))
self.send_property(
{
"{}".format(property_name): {
"ac": 200,
"ad": "Property received",
"av": property_version,
"value": property_value,
}
}
)
else:
self._logger.debug('Property "{}" unsuccessfully processed'.format(property_name))
def _on_properties(self): def _on_properties(self):
self._logger.debug('Setup properties listener') self._logger.debug("Setup properties listener")
while True: while True:
try: try:
prop_cb = self._events[IOTCEvents.IOTC_PROPERTIES] prop_cb = self._events[IOTCEvents.IOTC_PROPERTIES]
except KeyError: except KeyError:
self._logger.debug('Properties callback not found') self._logger.debug("Properties callback not found")
time.sleep(10) time.sleep(10)
continue continue
patch = self._device_client.receive_twin_desired_properties_patch() patch = self._device_client.receive_twin_desired_properties_patch()
self._logger.debug( self._logger.debug("\nReceived desired properties. {}\n".format(patch))
'\nReceived desired properties. {}\n'.format(patch))
for prop in patch: for prop in patch:
if prop == '$version': is_component = False
if prop == "$version":
continue continue
ret = prop_cb(prop, patch[prop]['value']) # check if component
if ret: try:
self._logger.debug('Acknowledging {}'.format(prop)) is_component = patch[prop]["__t"]
self.send_property({ del patch[prop]["__t"]
'{}'.format(prop): { except KeyError:
"ac": 200, pass
"ad": 'Property received',
"av": patch['$version'], if is_component:
"value": patch[prop]["value"] for component_prop in patch[prop]:
} self._logger.debug(
}) 'In component "{}" for property "{}"'.format(
prop, component_prop
)
)
self._handle_property_ack(
prop_cb,
component_prop,
patch[prop][component_prop]["value"],
patch["$version"],
prop,
)
else: else:
self._logger.debug( self._handle_property_ack(
'Property "{}" unsuccessfully processed'.format(prop)) prop_cb, prop, patch[prop]["value"], patch["$version"]
)
def _cmd_ack(self, name, value, request_id): def _cmd_ack(self, name, value, request_id):
self.send_property({ self.send_property(
'{}'.format(name): { {"{}".format(name): {"value": value, "requestId": request_id}}
'value': value, )
'requestId': request_id
}
})
def _on_commands(self): def _on_commands(self):
self._logger.debug('Setup commands listener') self._logger.debug("Setup commands listener")
while True: while True:
try: try:
cmd_cb = self._events[IOTCEvents.IOTC_COMMAND] cmd_cb = self._events[IOTCEvents.IOTC_COMMAND]
except KeyError: except KeyError:
self._logger.debug('Commands callback not found') self._logger.debug("Commands callback not found")
time.sleep(10) time.sleep(10)
continue continue
# Wait for unknown method calls # Wait for unknown method calls
method_request = self._device_client.receive_method_request() method_request = self._device_client.receive_method_request()
self._logger.debug( self._logger.debug("Received command {}".format(method_request.name))
'Received command {}'.format(method_request.name)) self._device_client.send_method_response(
self._device_client.send_method_response(MethodResponse.create_from_method_request( MethodResponse.create_from_method_request(
method_request, 200, { method_request, 200, {"result": True, "data": "Command received"}
'result': True, 'data': 'Command received'} )
)) )
cmd_cb(method_request, self._cmd_ack) cmd_cb(method_request, self._cmd_ack)
def _on_enqueued_commands(self): def _on_enqueued_commands(self):
self._logger.debug('Setup enqueued commands listener') self._logger.debug("Setup enqueued commands listener")
while True: while True:
try: try:
enqueued_cmd_cb = self._events[IOTCEvents.IOTC_ENQUEUED_COMMAND] enqueued_cmd_cb = self._events[IOTCEvents.IOTC_ENQUEUED_COMMAND]
except KeyError: except KeyError:
self._logger.debug('Enqueued commands callback not found') self._logger.debug("Enqueued commands callback not found")
time.sleep(10) time.sleep(10)
continue continue
# Wait for unknown method calls # Wait for unknown method calls
c2d = self._device_client.receive_message() c2d = self._device_client.receive_message()
c2d_name=c2d.custom_properties['method-name'].split(':')[1] c2d_name = c2d.custom_properties["method-name"].split(":")[1]
self._logger.debug( self._logger.debug("Received enqueued command {}".format(c2d_name))
'Received enqueued command {}'.format(c2d_name)) enqueued_cmd_cb(c2d_name, c2d.data)
enqueued_cmd_cb(c2d_name,c2d.data)
def _send_message(self, payload, properties): def _send_message(self, payload, properties):
msg = self._prepare_message(payload,properties) msg = self._prepare_message(payload, properties)
self._device_client.send_message(msg) self._device_client.send_message(msg)
def send_property(self, payload): def send_property(self, payload):
@ -271,7 +326,7 @@ class IoTCClient(AbstractClient):
Send a property message Send a property message
:param dict payload: The properties payload. Can contain multiple properties in the form {'<propName>':{'value':'<propValue>'}} :param dict payload: The properties payload. Can contain multiple properties in the form {'<propName>':{'value':'<propValue>'}}
""" """
self._logger.debug('Sending property {}'.format(json.dumps(payload))) self._logger.debug("Sending property {}".format(json.dumps(payload)))
self._device_client.patch_twin_reported_properties(payload) self._device_client.patch_twin_reported_properties(payload)
def send_telemetry(self, payload, properties=None): def send_telemetry(self, payload, properties=None):
@ -280,7 +335,7 @@ class IoTCClient(AbstractClient):
:param dict payload: The telemetry payload. Can contain multiple telemetry fields in the form {'<fieldName1>':<fieldValue1>,...,'<fieldNameN>':<fieldValueN>} :param dict payload: The telemetry payload. Can contain multiple telemetry fields in the form {'<fieldName1>':<fieldValue1>,...,'<fieldNameN>':<fieldValueN>}
:param dict optional properties: An object with custom properties to add to the message. :param dict optional properties: An object with custom properties to add to the message.
""" """
self._logger.info('Sending telemetry message: {}'.format(payload)) self._logger.info("Sending telemetry message: {}".format(payload))
self._send_message(json.dumps(payload), properties) self._send_message(json.dumps(payload), properties)
def connect(self): def connect(self):
@ -288,58 +343,84 @@ class IoTCClient(AbstractClient):
Connects the device. Connects the device.
:raises exception: If connection fails :raises exception: If connection fails
""" """
if self._cred_type in (IOTCConnectType.IOTC_CONNECT_DEVICE_KEY, IOTCConnectType.IOTC_CONNECT_SYMM_KEY): if self._cred_type in (
IOTCConnectType.IOTC_CONNECT_DEVICE_KEY,
IOTCConnectType.IOTC_CONNECT_SYMM_KEY,
):
if self._cred_type == IOTCConnectType.IOTC_CONNECT_SYMM_KEY: if self._cred_type == IOTCConnectType.IOTC_CONNECT_SYMM_KEY:
self._key_or_cert = self._compute_derived_symmetric_key( self._key_or_cert = self._compute_derived_symmetric_key(
self._key_or_cert, self._device_id) self._key_or_cert, self._device_id
self._logger.debug('Device key: {}'.format(self._key_or_cert)) )
self._logger.debug("Device key: {}".format(self._key_or_cert))
self._provisioning_client = ProvisioningDeviceClient.create_from_symmetric_key( self._provisioning_client = (
self._global_endpoint, self._device_id, self._scope_id, self._key_or_cert) ProvisioningDeviceClient.create_from_symmetric_key(
self._global_endpoint,
self._device_id,
self._scope_id,
self._key_or_cert,
)
)
else: else:
self._key_file = self._key_or_cert['key_file'] self._key_file = self._key_or_cert["key_file"]
self._cert_file = self._key_or_cert['cert_file'] self._cert_file = self._key_or_cert["cert_file"]
try: try:
self._cert_phrase = self._key_or_cert['cert_phrase'] self._cert_phrase = self._key_or_cert["cert_phrase"]
x509 = X509(self._cert_file, self._key_file, self._cert_phrase) x509 = X509(self._cert_file, self._key_file, self._cert_phrase)
except: except:
self._logger.debug( self._logger.debug(
'No passphrase available for certificate. Trying without it') "No passphrase available for certificate. Trying without it"
)
x509 = X509(self._cert_file, self._key_file) x509 = X509(self._cert_file, self._key_file)
# Certificate provisioning # Certificate provisioning
self._provisioning_client = ProvisioningDeviceClient.create_from_x509_certificate( self._provisioning_client = (
provisioning_host=self._global_endpoint, registration_id=self._device_id, id_scope=self._scope_id, x509=x509) ProvisioningDeviceClient.create_from_x509_certificate(
provisioning_host=self._global_endpoint,
registration_id=self._device_id,
id_scope=self._scope_id,
x509=x509,
)
)
if self._model_id: if self._model_id:
self._provisioning_client.provisioning_payload = { self._provisioning_client.provisioning_payload = {
'iotcModelId': self._model_id} "iotcModelId": self._model_id
}
try: try:
registration_result = self._provisioning_client.register() registration_result = self._provisioning_client.register()
assigned_hub = registration_result.registration_state.assigned_hub assigned_hub = registration_result.registration_state.assigned_hub
self._logger.debug(assigned_hub) self._logger.debug(assigned_hub)
self._hub_conn_string = 'HostName={};DeviceId={};SharedAccessKey={}'.format( self._hub_conn_string = "HostName={};DeviceId={};SharedAccessKey={}".format(
assigned_hub, self._device_id, self._key_or_cert) assigned_hub, self._device_id, self._key_or_cert
)
self._logger.debug( self._logger.debug(
'IoTHub Connection string: {}'.format(self._hub_conn_string)) "IoTHub Connection string: {}".format(self._hub_conn_string)
)
if self._cred_type in (IOTCConnectType.IOTC_CONNECT_DEVICE_KEY, IOTCConnectType.IOTC_CONNECT_SYMM_KEY): if self._cred_type in (
IOTCConnectType.IOTC_CONNECT_DEVICE_KEY,
IOTCConnectType.IOTC_CONNECT_SYMM_KEY,
):
self._device_client = IoTHubDeviceClient.create_from_connection_string( self._device_client = IoTHubDeviceClient.create_from_connection_string(
self._hub_conn_string) self._hub_conn_string
)
else: else:
self._device_client = IoTHubDeviceClient.create_from_x509_certificate( self._device_client = IoTHubDeviceClient.create_from_x509_certificate(
x509=x509, hostname=assigned_hub, device_id=registration_result.registration_state.device_id) x509=x509,
hostname=assigned_hub,
device_id=registration_result.registration_state.device_id,
)
except: except:
t, v, tb = sys.exc_info() t, v, tb = sys.exc_info()
self._logger.info( self._logger.info("ERROR: Failed to get device provisioning information")
'ERROR: Failed to get device provisioning information')
raise t(v) raise t(v)
# Connect to iothub # Connect to iothub
try: try:
self._device_client.connect() self._device_client.connect()
self._logger.debug('Device connected') self._logger.debug("Device connected")
except: except:
t, v, tb = sys.exc_info() t, v, tb = sys.exc_info()
self._logger.info('ERROR: Failed to connect to Hub') self._logger.info("ERROR: Failed to connect to Hub")
raise t(v) raise t(v)
# setup listeners # setup listeners
@ -361,8 +442,11 @@ class IoTCClient(AbstractClient):
try: try:
secret = base64.b64decode(secret) secret = base64.b64decode(secret)
except: except:
self._logger.debug( self._logger.debug("ERROR: broken base64 secret => `" + secret + "`")
"ERROR: broken base64 secret => `" + secret + "`")
sys.exit() sys.exit()
return base64.b64encode(hmac.new(secret, msg=reg_id.encode('utf8'), digestmod=hashlib.sha256).digest()).decode('utf-8') return base64.b64encode(
hmac.new(
secret, msg=reg_id.encode("utf8"), digestmod=hashlib.sha256
).digest()
).decode("utf-8")

Просмотреть файл

@ -1,7 +1,7 @@
import sys import sys
import asyncio import asyncio
import pkg_resources import pkg_resources
from .. import AbstractClient,IOTCLogLevel, IOTCEvents, IOTCConnectType from .. import AbstractClient, IOTCLogLevel, IOTCEvents, IOTCConnectType
from azure.iot.device import X509, MethodResponse, Message from azure.iot.device import X509, MethodResponse, Message
from azure.iot.device.aio import IoTHubDeviceClient, ProvisioningDeviceClient from azure.iot.device.aio import IoTHubDeviceClient, ProvisioningDeviceClient
@ -58,98 +58,151 @@ class ConsoleLogger:
class IoTCClient(AbstractClient): class IoTCClient(AbstractClient):
def __init__(self, device_id, scope_id, cred_type, key_or_cert, logger=None): def __init__(self, device_id, scope_id, cred_type, key_or_cert, logger=None):
AbstractClient.__init__(self,device_id, scope_id, cred_type, key_or_cert) AbstractClient.__init__(self, device_id, scope_id, cred_type, key_or_cert)
if logger is None: if logger is None:
self._logger = ConsoleLogger(IOTCLogLevel.IOTC_LOGGING_API_ONLY) self._logger = ConsoleLogger(IOTCLogLevel.IOTC_LOGGING_API_ONLY)
else: else:
if hasattr(logger, "info") and hasattr(logger, "debug") and hasattr(logger, "set_log_level"): if (
hasattr(logger, "info")
and hasattr(logger, "debug")
and hasattr(logger, "set_log_level")
):
self._logger = logger self._logger = logger
else: else:
print("ERROR: Logger object has unsupported format. It must implement the following functions\n\ print(
info(message);\ndebug(message);\nset_log_level(message);") "ERROR: Logger object has unsupported format. It must implement the following functions\n\
info(message);\ndebug(message);\nset_log_level(message);"
)
sys.exit() sys.exit()
async def _handle_property_ack(
self,
callback,
property_name,
property_value,
property_version,
component_name=None,
):
ret = await callback(property_name, property_value, component_name)
if ret:
if component_name is not None:
await self._logger.debug("Acknowledging {}".format(property_name))
await self.send_property(
{
"{}".format(component_name): {
"{}".format(property_name): {
"ac": 200,
"ad": "Property received",
"av": property_version,
"value": property_value,
}
}
}
)
else:
await self._logger.debug("Acknowledging {}".format(property_name))
await self.send_property(
{
"{}".format(property_name): {
"ac": 200,
"ad": "Property received",
"av": property_version,
"value": property_value,
}
}
)
else:
await self._logger.debug(
'Property "{}" unsuccessfully processed'.format(property_name)
)
async def _on_properties(self): async def _on_properties(self):
await self._logger.debug('Setup properties listener') await self._logger.debug("Setup properties listener")
while True: while True:
try: try:
prop_cb = self._events[IOTCEvents.IOTC_PROPERTIES] prop_cb = self._events[IOTCEvents.IOTC_PROPERTIES]
except KeyError: except KeyError:
await self._logger.debug('Properties callback not found') await self._logger.debug("Properties callback not found")
await asyncio.sleep(10) await asyncio.sleep(10)
continue continue
patch = await self._device_client.receive_twin_desired_properties_patch() patch = await self._device_client.receive_twin_desired_properties_patch()
await self._logger.debug('Received desired properties. {}'.format(patch)) await self._logger.debug("Received desired properties. {}".format(patch))
for prop in patch: for prop in patch:
if prop == '$version': is_component = False
if prop == "$version":
continue continue
ret = await prop_cb(prop, patch[prop]['value']) # check if component
if ret: try:
await self._logger.debug('Acknowledging {}'.format(prop)) is_component = patch[prop]["__t"]
await self.send_property({ del patch[prop]["__t"]
'{}'.format(prop): { except KeyError:
"ac": 200, pass
"ad": 'Property received',
"av": patch['$version'], if is_component:
"value": patch[prop]["value"]} for component_prop in patch[prop]:
}) await self._logger.debug(
'In component "{}" for property "{}"'.format(
prop, component_prop
)
)
await self._handle_property_ack(
prop_cb,
component_prop,
patch[prop][component_prop]["value"],
patch["$version"],
prop,
)
else: else:
await self._logger.debug( await self._handle_property_ack(
'Property "{}" unsuccessfully processed'.format(prop)) prop_cb, prop, patch[prop]["value"], patch["$version"]
)
async def _cmd_ack(self, name, value, requestId): async def _cmd_ack(self, name, value, requestId):
await self.send_property({ await self.send_property(
'{}'.format(name): { {"{}".format(name): {"value": value, "requestId": requestId}}
'value': value, )
'requestId': requestId
}
})
async def _on_commands(self): async def _on_commands(self):
await self._logger.debug('Setup commands listener') await self._logger.debug("Setup commands listener")
while True: while True:
try: try:
cmd_cb = self._events[IOTCEvents.IOTC_COMMAND] cmd_cb = self._events[IOTCEvents.IOTC_COMMAND]
except KeyError: except KeyError:
await self._logger.debug('Commands callback not found') await self._logger.debug("Commands callback not found")
await asyncio.sleep(10) await asyncio.sleep(10)
continue continue
# Wait for unknown method calls # Wait for unknown method calls
method_request = ( method_request = await self._device_client.receive_method_request()
await self._device_client.receive_method_request() await self._logger.debug("Received command {}".format(method_request.name))
await self._device_client.send_method_response(
MethodResponse.create_from_method_request(
method_request, 200, {"result": True, "data": "Command received"}
)
) )
await self._logger.debug(
'Received command {}'.format(method_request.name))
await self._device_client.send_method_response(MethodResponse.create_from_method_request(
method_request, 200, {
'result': True, 'data': 'Command received'}
))
await cmd_cb(method_request, self._cmd_ack) await cmd_cb(method_request, self._cmd_ack)
async def _on_enqueued_commands(self): async def _on_enqueued_commands(self):
await self._logger.debug('Setup enqueued commands listener') await self._logger.debug("Setup enqueued commands listener")
while True: while True:
try: try:
enqueued_cmd_cb = self._events[IOTCEvents.IOTC_ENQUEUED_COMMAND] enqueued_cmd_cb = self._events[IOTCEvents.IOTC_ENQUEUED_COMMAND]
except KeyError: except KeyError:
await self._logger.debug('Enqueued commands callback not found') await self._logger.debug("Enqueued commands callback not found")
await asyncio.sleep(10) await asyncio.sleep(10)
continue continue
# Wait for unknown method calls # Wait for unknown method calls
c2d = await self._device_client.receive_message() c2d = await self._device_client.receive_message()
c2d_name=c2d.custom_properties['method-name'].split(':')[1] c2d_name = c2d.custom_properties["method-name"].split(":")[1]
await self._logger.debug( await self._logger.debug("Received enqueued command {}".format(c2d_name))
'Received enqueued command {}'.format(c2d_name)) await enqueued_cmd_cb(c2d_name, c2d.data)
await enqueued_cmd_cb(c2d_name,c2d.data)
async def _send_message(self, payload, properties): async def _send_message(self, payload, properties):
msg = self._prepare_message(payload,properties) msg = self._prepare_message(payload, properties)
await self._device_client.send_message(msg) await self._device_client.send_message(msg)
async def send_property(self, payload): async def send_property(self, payload):
@ -157,7 +210,7 @@ class IoTCClient(AbstractClient):
Send a property message Send a property message
:param dict payload: The properties payload. Can contain multiple properties in the form {'<propName>':{'value':'<propValue>'}} :param dict payload: The properties payload. Can contain multiple properties in the form {'<propName>':{'value':'<propValue>'}}
""" """
await self._logger.debug('Sending property {}'.format(json.dumps(payload))) await self._logger.debug("Sending property {}".format(json.dumps(payload)))
await self._device_client.patch_twin_reported_properties(payload) await self._device_client.patch_twin_reported_properties(payload)
async def send_telemetry(self, payload, properties=None): async def send_telemetry(self, payload, properties=None):
@ -166,7 +219,7 @@ class IoTCClient(AbstractClient):
:param dict payload: The telemetry payload. Can contain multiple telemetry fields in the form {'<fieldName1>':<fieldValue1>,...,'<fieldNameN>':<fieldValueN>} :param dict payload: The telemetry payload. Can contain multiple telemetry fields in the form {'<fieldName1>':<fieldValue1>,...,'<fieldNameN>':<fieldValueN>}
:param dict optional properties: An object with custom properties to add to the message. :param dict optional properties: An object with custom properties to add to the message.
""" """
await self._logger.info('Sending telemetry message: {}'.format(payload)) await self._logger.info("Sending telemetry message: {}".format(payload))
await self._send_message(json.dumps(payload), properties) await self._send_message(json.dumps(payload), properties)
async def connect(self): async def connect(self):
@ -174,58 +227,88 @@ class IoTCClient(AbstractClient):
Connects the device. Connects the device.
:raises exception: If connection fails :raises exception: If connection fails
""" """
if self._cred_type in (IOTCConnectType.IOTC_CONNECT_DEVICE_KEY, IOTCConnectType.IOTC_CONNECT_SYMM_KEY): if self._cred_type in (
IOTCConnectType.IOTC_CONNECT_DEVICE_KEY,
IOTCConnectType.IOTC_CONNECT_SYMM_KEY,
):
if self._cred_type == IOTCConnectType.IOTC_CONNECT_SYMM_KEY: if self._cred_type == IOTCConnectType.IOTC_CONNECT_SYMM_KEY:
self._key_or_cert = await self._compute_derived_symmetric_key( self._key_or_cert = await self._compute_derived_symmetric_key(
self._key_or_cert, self._device_id) self._key_or_cert, self._device_id
)
await self._logger.debug('Device key: {}'.format(self._key_or_cert)) await self._logger.debug("Device key: {}".format(self._key_or_cert))
self._provisioning_client = ProvisioningDeviceClient.create_from_symmetric_key( self._provisioning_client = (
self._global_endpoint, self._device_id, self._scope_id, self._key_or_cert) ProvisioningDeviceClient.create_from_symmetric_key(
self._global_endpoint,
self._device_id,
self._scope_id,
self._key_or_cert,
)
)
else: else:
self._key_file = self._key_or_cert['key_file'] self._key_file = self._key_or_cert["key_file"]
self._cert_file = self._key_or_cert['cert_file'] self._cert_file = self._key_or_cert["cert_file"]
try: try:
self._cert_phrase = self._key_or_cert['cert_phrase'] self._cert_phrase = self._key_or_cert["cert_phrase"]
x509 = X509(self._cert_file, self._key_file, self._cert_phrase) x509 = X509(self._cert_file, self._key_file, self._cert_phrase)
except: except:
await self._logger.debug( await self._logger.debug(
'No passphrase available for certificate. Trying without it') "No passphrase available for certificate. Trying without it"
)
x509 = X509(self._cert_file, self._key_file) x509 = X509(self._cert_file, self._key_file)
# Certificate provisioning # Certificate provisioning
self._provisioning_client = ProvisioningDeviceClient.create_from_x509_certificate( self._provisioning_client = (
provisioning_host=self._global_endpoint, registration_id=self._device_id, id_scope=self._scope_id, x509=x509) ProvisioningDeviceClient.create_from_x509_certificate(
provisioning_host=self._global_endpoint,
registration_id=self._device_id,
id_scope=self._scope_id,
x509=x509,
)
)
if self._model_id: if self._model_id:
print("Provision model Id") print("Provision model Id")
self._provisioning_client.provisioning_payload = { self._provisioning_client.provisioning_payload = {
'iotcModelId': self._model_id} "iotcModelId": self._model_id
}
try: try:
registration_result = await self._provisioning_client.register() registration_result = await self._provisioning_client.register()
assigned_hub = registration_result.registration_state.assigned_hub assigned_hub = registration_result.registration_state.assigned_hub
await self._logger.debug(assigned_hub) await self._logger.debug(assigned_hub)
self._hub_conn_string = 'HostName={};DeviceId={};SharedAccessKey={}'.format( self._hub_conn_string = "HostName={};DeviceId={};SharedAccessKey={}".format(
assigned_hub, self._device_id, self._key_or_cert) assigned_hub, self._device_id, self._key_or_cert
)
await self._logger.debug( await self._logger.debug(
'IoTHub Connection string: {}'.format(self._hub_conn_string)) "IoTHub Connection string: {}".format(self._hub_conn_string)
)
if self._cred_type in (IOTCConnectType.IOTC_CONNECT_DEVICE_KEY, IOTCConnectType.IOTC_CONNECT_SYMM_KEY): if self._cred_type in (
IOTCConnectType.IOTC_CONNECT_DEVICE_KEY,
IOTCConnectType.IOTC_CONNECT_SYMM_KEY,
):
self._device_client = IoTHubDeviceClient.create_from_connection_string( self._device_client = IoTHubDeviceClient.create_from_connection_string(
self._hub_conn_string) self._hub_conn_string
)
else: else:
self._device_client = IoTHubDeviceClient.create_from_x509_certificate( self._device_client = IoTHubDeviceClient.create_from_x509_certificate(
x509=x509, hostname=assigned_hub, device_id=registration_result.registration_state.device_id) x509=x509,
hostname=assigned_hub,
device_id=registration_result.registration_state.device_id,
)
except: except:
await self._logger.info( await self._logger.info(
'ERROR: Failed to get device provisioning information') "ERROR: Failed to get device provisioning information"
)
sys.exit() sys.exit()
# Connect to iothub # Connect to iothub
try: try:
await self._device_client.connect() await self._device_client.connect()
await self._logger.debug('Device connected') await self._logger.debug("Device connected")
self._twin = await self._device_client.get_twin()
await self._logger.debug("Current twin: {}".format(self._twin))
except: except:
await self._logger.info('ERROR: Failed to connect to Hub') await self._logger.info("ERROR: Failed to connect to Hub")
sys.exit() sys.exit()
# setup listeners # setup listeners
@ -243,8 +326,11 @@ class IoTCClient(AbstractClient):
try: try:
secret = base64.b64decode(secret) secret = base64.b64decode(secret)
except: except:
await self._logger.debug( await self._logger.debug("ERROR: broken base64 secret => `" + secret + "`")
"ERROR: broken base64 secret => `" + secret + "`")
sys.exit() sys.exit()
return base64.b64encode(hmac.new(secret, msg=reg_id.encode('utf8'), digestmod=hashlib.sha256).digest()).decode('utf-8') return base64.b64encode(
hmac.new(
secret, msg=reg_id.encode("utf8"), digestmod=hashlib.sha256
).digest()
).decode("utf-8")

Просмотреть файл

@ -103,6 +103,9 @@ class NewDeviceClient():
def patch_twin_reported_properties(self, payload): def patch_twin_reported_properties(self, payload):
return True return True
def get_twin(self):
return 'Twin'
def init(mocker): def init(mocker):
client = IoTCClient(device_id, scopeId, client = IoTCClient(device_id, scopeId,
@ -139,7 +142,7 @@ def test_onproperties_before(mocker):
mocker.patch.object(client, 'send_property', mock.Mock()) mocker.patch.object(client, 'send_property', mock.Mock())
client.on(IOTCEvents.IOTC_PROPERTIES, on_props) client.on(IOTCEvents.IOTC_PROPERTIES, on_props)
client.connect() client.connect()
on_props.assert_called_with('prop1', 40) on_props.assert_called_with('prop1', 40,None)
def test_onproperties_after(mocker): def test_onproperties_after(mocker):
@ -151,7 +154,7 @@ def test_onproperties_after(mocker):
client.on(IOTCEvents.IOTC_PROPERTIES, on_props) client.on(IOTCEvents.IOTC_PROPERTIES, on_props)
# give at least 10 seconds for the new listener to be recognized. assign the listener after connection is discouraged # give at least 10 seconds for the new listener to be recognized. assign the listener after connection is discouraged
time.sleep(11) time.sleep(11)
on_props.assert_called_with('prop1', 40) on_props.assert_called_with('prop1', 40,None)
def test_onCommands_before(mocker): def test_onCommands_before(mocker):

Просмотреть файл

@ -9,60 +9,61 @@ import sys
from contextlib import suppress from contextlib import suppress
from azure.iot.device.provisioning.models import RegistrationResult from azure.iot.device.provisioning.models import RegistrationResult
from azure.iot.device.iothub.models import MethodRequest,Message from azure.iot.device.iothub.models import MethodRequest, Message
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), '../tests.ini')) config.read(os.path.join(os.path.dirname(__file__), "../tests.ini"))
if config['TESTS'].getboolean('Local'): if config["TESTS"].getboolean("Local"):
sys.path.insert(0, 'src') sys.path.insert(0, "src")
from iotc import IOTCConnectType, IOTCLogLevel, IOTCEvents from iotc import IOTCConnectType, IOTCLogLevel, IOTCEvents
from iotc.aio import IoTCClient from iotc.aio import IoTCClient
try: try:
groupKey = config['TESTS']['GroupKey'] groupKey = config["TESTS"]["GroupKey"]
except: except:
groupKey='groupKey' groupKey = "groupKey"
try: try:
deviceKey = config['TESTS']['DeviceKey'] deviceKey = config["TESTS"]["DeviceKey"]
except: except:
deviceKey='kPufjjN/EMoyKcNiAXvlTz8H61mlhSnmvoF6dxhnysA=' deviceKey = "kPufjjN/EMoyKcNiAXvlTz8H61mlhSnmvoF6dxhnysA="
try: try:
device_id = config['TESTS']['DeviceId'] device_id = config["TESTS"]["DeviceId"]
except: except:
device_id='device_id' device_id = "device_id"
try: try:
scopeId = config['TESTS']['ScopeId'] scopeId = config["TESTS"]["ScopeId"]
except: except:
scopeId='scopeId' scopeId = "scopeId"
try: try:
assignedHub = config['TESTS']['AssignedHub'] assignedHub = config["TESTS"]["AssignedHub"]
except: except:
assignedHub='assignedHub' assignedHub = "assignedHub"
try: try:
expectedHub = config['TESTS']['ExpectedHub'] expectedHub = config["TESTS"]["ExpectedHub"]
except: except:
expectedHub='HostName=assignedHub;DeviceId=device_id;SharedAccessKey=kPufjjN/EMoyKcNiAXvlTz8H61mlhSnmvoF6dxhnysA=' expectedHub = "HostName=assignedHub;DeviceId=device_id;SharedAccessKey=kPufjjN/EMoyKcNiAXvlTz8H61mlhSnmvoF6dxhnysA="
propPayload = {'prop1': {'value': 40}, '$version': 5} propPayload = {"prop1": {"value": 40}, "$version": 5}
cmdRequestId = 'abcdef' cmdRequestId = "abcdef"
cmdName = 'command1' cmdName = "command1"
cmdPayload = 'payload' cmdPayload = "payload"
methodRequest = MethodRequest(cmdRequestId, cmdName, cmdPayload) methodRequest = MethodRequest(cmdRequestId, cmdName, cmdPayload)
enqueued_method_name='test:enqueued' enqueued_method_name = "test:enqueued"
enqueued_message = Message('test_enqueued') enqueued_message = Message("test_enqueued")
enqueued_message.custom_properties['method-name']=enqueued_method_name enqueued_message.custom_properties["method-name"] = enqueued_method_name
class NewRegState():
class NewRegState:
def __init__(self): def __init__(self):
self.assigned_hub = assignedHub self.assigned_hub = assignedHub
@ -70,13 +71,13 @@ class NewRegState():
return self.assigned_hub return self.assigned_hub
class NewProvClient(): class NewProvClient:
async def register(self): async def register(self):
reg = RegistrationResult('3o375i827i852', 'assigned', NewRegState()) reg = RegistrationResult("3o375i827i852", "assigned", NewRegState())
return reg return reg
class NewDeviceClient(): class NewDeviceClient:
async def connect(self): async def connect(self):
return True return True
@ -98,6 +99,9 @@ class NewDeviceClient():
async def patch_twin_reported_properties(self, payload): async def patch_twin_reported_properties(self, payload):
return True return True
async def get_twin(self):
return "Twin"
def async_return(result): def async_return(result):
f = asyncio.Future() f = asyncio.Future()
@ -113,20 +117,24 @@ async def stop_threads(client):
@pytest.mark.asyncio @pytest.mark.asyncio
def init(mocker): def init(mocker):
client = IoTCClient(device_id, scopeId, client = IoTCClient(
IOTCConnectType.IOTC_CONNECT_SYMM_KEY, groupKey) device_id, scopeId, IOTCConnectType.IOTC_CONNECT_SYMM_KEY, groupKey
mocker.patch('iotc.aio.ProvisioningDeviceClient.create_from_symmetric_key', )
return_value=NewProvClient()) mocker.patch(
mocker.patch('iotc.aio.IoTHubDeviceClient.create_from_connection_string', "iotc.aio.ProvisioningDeviceClient.create_from_symmetric_key",
return_value=NewDeviceClient()) return_value=NewProvClient(),
)
mocker.patch(
"iotc.aio.IoTHubDeviceClient.create_from_connection_string",
return_value=NewDeviceClient(),
)
return client return client
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_computeKey(mocker): async def test_computeKey(mocker):
client = init(mocker) client = init(mocker)
key = await client._compute_derived_symmetric_key( key = await client._compute_derived_symmetric_key(groupKey, device_id)
groupKey, device_id)
assert key == deviceKey assert key == deviceKey
@ -150,13 +158,14 @@ async def test_hubConnectionString(mocker):
async def test_onproperties_before(mocker): async def test_onproperties_before(mocker):
client = init(mocker) client = init(mocker)
async def onProps(propname, propvalue): async def on_props(propname, propvalue,component_name):
assert propname == 'prop1' assert propname == "prop1"
assert propvalue == 40 assert propvalue == 40
assert component_name == None
await stop_threads(client) await stop_threads(client)
mocker.patch.object(client, 'send_property', return_value=True) mocker.patch.object(client, "send_property", return_value=True)
client.on(IOTCEvents.IOTC_PROPERTIES, onProps) client.on(IOTCEvents.IOTC_PROPERTIES, on_props)
await client.connect() await client.connect()
try: try:
await client._prop_thread await client._prop_thread
@ -169,15 +178,16 @@ async def test_onproperties_before(mocker):
async def test_onproperties_after(mocker): async def test_onproperties_after(mocker):
client = init(mocker) client = init(mocker)
async def onProps(propname, propvalue): async def on_props(propname, propvalue, component_name):
assert propname == 'prop1' assert propname == "prop1"
assert propvalue == 40 assert propvalue == 40
assert component_name == None
await stop_threads(client) await stop_threads(client)
return True return True
mocker.patch.object(client, 'send_property', return_value=True) mocker.patch.object(client, "send_property", return_value=True)
await client.connect() await client.connect()
client.on(IOTCEvents.IOTC_PROPERTIES, onProps) client.on(IOTCEvents.IOTC_PROPERTIES, on_props)
try: try:
await client._prop_thread await client._prop_thread
@ -192,14 +202,14 @@ async def test_on_commands_before(mocker):
async def onCmds(command, ack): async def onCmds(command, ack):
ret = ack() ret = ack()
assert ret == 'mocked' assert ret == "mocked"
await stop_threads(client) await stop_threads(client)
return True return True
def mockedAck(): def mockedAck():
return 'mocked' return "mocked"
mocker.patch.object(client, '_cmd_ack', mockedAck) mocker.patch.object(client, "_cmd_ack", mockedAck)
client.on(IOTCEvents.IOTC_COMMAND, onCmds) client.on(IOTCEvents.IOTC_COMMAND, onCmds)
await client.connect() await client.connect()
@ -216,14 +226,14 @@ async def test_on_commands_after(mocker):
async def onCmds(command, ack): async def onCmds(command, ack):
ret = ack() ret = ack()
assert ret == 'mocked' assert ret == "mocked"
await stop_threads(client) await stop_threads(client)
return True return True
def mockedAck(): def mockedAck():
return 'mocked' return "mocked"
mocker.patch.object(client, '_cmd_ack', mockedAck) mocker.patch.object(client, "_cmd_ack", mockedAck)
await client.connect() await client.connect()
client.on(IOTCEvents.IOTC_COMMAND, onCmds) client.on(IOTCEvents.IOTC_COMMAND, onCmds)
@ -233,17 +243,17 @@ async def test_on_commands_after(mocker):
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_on_enqueued_commands_before(mocker): async def test_on_enqueued_commands_before(mocker):
client = init(mocker) client = init(mocker)
async def on_enqs(command_name,command_data): async def on_enqs(command_name, command_data):
assert command_name == enqueued_method_name.split(':')[1] assert command_name == enqueued_method_name.split(":")[1]
await stop_threads(client) await stop_threads(client)
return True return True
client.on(IOTCEvents.IOTC_ENQUEUED_COMMAND, on_enqs) client.on(IOTCEvents.IOTC_ENQUEUED_COMMAND, on_enqs)
await client.connect() await client.connect()
try: try:
@ -257,12 +267,11 @@ async def test_on_enqueued_commands_after(mocker):
client = init(mocker) client = init(mocker)
async def on_enqs(command_name,command_data): async def on_enqs(command_name, command_data):
assert command_name == enqueued_method_name.split(':')[1] assert command_name == enqueued_method_name.split(":")[1]
await stop_threads(client) await stop_threads(client)
return True return True
await client.connect() await client.connect()
client.on(IOTCEvents.IOTC_ENQUEUED_COMMAND, on_enqs) client.on(IOTCEvents.IOTC_ENQUEUED_COMMAND, on_enqs)
try: try: