This commit is contained in:
ludruda 2020-07-29 16:20:12 +02:00
Родитель 9eb7f9c4a7
Коммит be1e70cce0
12 изменённых файлов: 492 добавлений и 311 удалений

1
.gitignore поставляемый
Просмотреть файл

@ -2,6 +2,7 @@
__pycache__
dist
*egg-info
MANIFEST
# Micropython
boot.py

21
LICENSE Normal file
Просмотреть файл

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2020 iot-for-all
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

177
README.md
Просмотреть файл

@ -1 +1,176 @@
Work in progress...
# Microsoft Azure IoTCentral SDK for MicroPython
[![Join the chat at https://gitter.im/iotdisc/community](https://badges.gitter.im/iotdisc.svg)](https://gitter.im/iotdisc/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![Licensed under the MIT License](https://img.shields.io/badge/License-MIT-blue.svg)](https://github.com/iot-for-all/iotc-micropython-client/blob/master/LICENSE)
[![PyPI version](https://badge.fury.io/py/micropython-iotc.svg)](https://badge.fury.io/py/micropython-iotc)
### An Azure IoT Central device client library for Micropython.
This repository contains code for the Azure IoT Central SDK for Micropython. This enables micropython developers to easily create device solutions that semealessly connect to Azure IoT Central applications.
It can run on various boards with some tweaks for low-memory devices.
## Prerequisites
+ Micropython 1.12+ (recommended)
## Import ``iotc``
In most of the micropython capable boards, is sufficient to import library or install it if missing through upip.
```py
try:
import iotc
except:
import upip
upip.install('micropython-iotc')
import iotc
```
The same commands apply when running through Micropython REPL.
> **NOTE:** for low-end devices like the **ESP8266**, importing as external module can cause out-of-memory exception during execution because of the limited amount of heap space.
For this kind of boards, putting the library on flash memory as a frozen module might be the only available option.
<br/>
<br/>
Details on how to build a custom firmware for specific boards with frozen modules can be found on official micropython [github repo](https://github.com/micropython/micropython) and [website](http://docs.micropython.org/en/latest/)
## Samples
Check out the [sample repository](samples) for example code showing how the SDK can be used in the various scenarios:
## Connecting
Currently only connection through Shared Access Keys is supported.
You can use both device keys or group keys.
### Init
```py
from iotc import IoTCConnectType
id_scope = 'scopeID'
device_id = 'device_id'
sasKey = 'masterKey' # or use device key directly
conn_type=IoTCConnectType.SYMM_KEY # or use DEVICE_KEY if working with device keys
client = IoTCClient(id_scope, device_id, conn_type, sasKey)
```
You can pass a logger instance to have your custom log implementation. (see [#Logging](#logging))
e.g.
```py
from iotc import ConsoleLogger,IoTCLogLevel
logger = ConsoleLogger(IoTCLogLevel.ALL)
client = IoTCClient(id_scope, device_id, conn_type, sasKey, logger)
```
### Connect
```py
client.connect()
```
After successfull connection, IOTC context is available for further commands.
## Operations
### Send telemetry
```py
client.send_telemetry(payload,properties=None)
```
e.g. Send telemetry every 3 seconds
```py
while client.is_connected():
print('Sending telemetry')
client.send_telemetry({'temperature':randint(0,20),'pressure':randint(0,20),'acceleration':{'x':randint(0,20),'y':randint(0,20)}})
sleep(3)
```
An optional *properties* object can be included in the send methods, to specify additional properties for the message (e.g. timestamp,etc... ).
Properties can be custom or part of the reserved ones (see list [here](https://github.com/Azure/azure-iot-sdk-csharp/blob/master/iothub/device/src/MessageSystemPropertyNames.cs#L36)).
> **NOTE:** Payload content type and encoding are set by default to 'application/json' and 'utf-8'. Alternative values can be set using these functions:<br/>
_iotc.set_content_type(content_type)_ # .e.g 'text/plain'
_iotc.set_content_encoding(content_encoding)_ # .e.g 'ascii'
### Send property update
```py
client.send_property({'fieldName':'fieldValue'})
```
### Listen to properties update
Subscribe to properties update event before calling _connect()_:
```py
client.on(IoTCEvents.PROPERTIES, callback)
```
To provide property sync aknowledgement, the callback must return the
property value if has been successfully applied or nothing.
e.g.
```py
def on_props(prop_name, prop_value):
if prop_value>10:
# process property
return prop_value
client.on(IoTCEvents.PROPERTIES, on_props)
```
### Listen to commands
Subscribe to command events before calling _connect()_:
```py
client.on(IoTCEvents.COMMANDS, callback)
```
To provide feedbacks for the command like execution result or progress, the client can call the **ack** function available in the callback.
The function accepts 2 arguments: the command instance and a custom response message.
```py
def on_commands(command, ack):
print(command.name)
ack(command, 'Command received')
client.on(IoTCEvents.COMMANDS, on_commands)
```
## Logging
The default log prints to serial console operations status and errors.
This is the _API_ONLY_ logging level.
The function __set_log_level()__ can be used to change options or disable logs. It accepts a _IoTCLogLevel_ value among the following:
- IoTCLogLevel.DISABLED (log disabled)
- IoTCLogLevel.API_ONLY (information and errors, default)
- IoTCLogLevel.ALL (all messages, debug and underlying errors)
The device client also accepts an optional Logger instance to redirect logs to other targets than console.
The custom class must implement three methods:
- info(message)
- debug(message)
- set_log_level(message);
## One-touch device provisioning and approval
A device can send custom data during provision process: if a device is aware of its IoT Central template Id, then it can be automatically provisioned.
### How to set IoTC template ID in your device
Template Id can be found in the device explorer page of IoTCentral
![Img](assets/modelid_1.png)
![Img](assets/modelid_2.png)
Then call this method before connect():
```py
client.set_model_id('<modelId>');
```
### Manual approval (default)
By default device auto-approval in IoT Central is disabled, which means that administrator needs to approve the device registration to complete the provisioning process.
This can be done from explorer page after selecting the device
![Img](assets/manual_approval.jpg)
### Automatic approval
To change default behavior, administrator can enable device auto-approval from Device Connection page under the Administration section.
With automatic approval a device can be provisioned without any manual action and can start sending/receiving data after status changes to "Provisioned"
![Img](assets/auto_approval.jpg)
## License
This samples is licensed with the MIT license. For more information, see [LICENSE](./LICENSE)

Двоичные данные
assets/auto_approval.jpg Normal file

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 74 KiB

Двоичные данные
assets/manual_approval.jpg Normal file

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 50 KiB

Двоичные данные
assets/modelid_1.png Normal file

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 121 KiB

Двоичные данные
assets/modelid_2.png Normal file

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 150 KiB

Просмотреть файл

@ -0,0 +1,218 @@
from iotc.constants import *
from iotc.provision import ProvisioningClient
import ure
import json
from utime import time,sleep
import gc
try:
from umqtt.robust import MQTTClient
except:
print('Mqtt library not found. Installing...')
import upip
upip.install('micropython-umqtt.robust')
from umqtt.robust import MQTTClient
gc.collect()
class Command():
def __init__(self, cmd_name, request_id):
self._cmd_name = cmd_name
self._request_id = request_id
@property
def name(self):
return self._cmd_name
@property
def payload(self):
return self._payload
@payload.setter
def payload(self,value):
self._payload=value
@property
def request_id(self):
return self._request_id
class IoTCClient():
def __init__(self, id_scope, device_id, credentials_type: IoTCConnectType, credentials, logger=None):
self._device_id = device_id
self._id_scope = id_scope
self._credentials_type = credentials_type
self._content_type = 'application%2Fjson'
self._content_encoding = 'utf-8'
self._connected = False
self._credentials = credentials
self._events = {}
self._model_id = None
if logger is not None:
self._logger = logger
else:
self._logger = ConsoleLogger(IoTCLogLevel.API_ONLY)
self._twin_request_id = time()
def set_content_type(self, content_type):
self._content_type = encode_uri_component(content_type)
def set_content_encoding(self, content_encoding):
self._content_encoding = content_encoding
def set_log_level(self,log_level:IoTCLogLevel):
self._logger.set_log_level(log_level)
def _on_message(self, topic, message):
topic = topic.decode('utf-8')
if topic == HubTopics.TWIN_RES.format(200, self._twin_request_id):
self._logger.info('Received twin: {}'.format(message))
if topic.startswith(HubTopics.PROPERTIES):
# desired properties
self._logger.info(
'Received desired property message: {}'.format(message))
message = json.loads(message.decode('utf-8'))
self.on_properties_update(message)
elif topic.startswith(HubTopics.COMMANDS):
# commands
self._logger.info(
'Received command {} with message: {}'.format(topic, message))
match = self._commands_regex.match(topic)
if match is not None:
if all(m is not None for m in [match.group(1), match.group(2)]):
command_name = match.group(1)
command_req = match.group(2)
command = Command(command_name, command_req)
if message is not None:
command.payload = message
self._on_commands(command)
elif topic.startswith(HubTopics.ENQUEUED_COMMANDS.format(self._device_id)):
params = topic.split(
"devices/{}/messages/devicebound/".format(self._device_id), 1)[1].split('&')
for param in params:
p = param.split('=')
if p[0] == "method-name":
command_name = p[1].split("Commands%3A")[1]
self._logger.info(
'Received enqueued command {} with message: {}'.format(command_name, message))
command = Command(command_name, None)
if message is not None:
command.payload = message
self._on_enqueued_commands(command)
def connect(self):
prov = ProvisioningClient(
self._id_scope, self._device_id, self._credentials_type,self._credentials,self._logger,self._model_id)
creds = prov.register()
self._mqtt_client = MQTTClient(self._device_id, creds.host, 8883, creds.user.encode(
'utf-8'), creds.password.encode('utf-8'), ssl=True, keepalive=60)
self._commands_regex = ure.compile(
'\$iothub\/methods\/POST\/(.+)\/\?\$rid=(.+)')
self._mqtt_client.connect(False)
self._connected = True
self._logger.info('Device connected!')
self._mqtt_client.set_callback(self._on_message)
self._mqtt_client.subscribe(HubTopics.TWIN)
self._mqtt_client.subscribe('{}/#'.format(HubTopics.PROPERTIES))
self._mqtt_client.subscribe('{}/#'.format(HubTopics.COMMANDS))
self._mqtt_client.subscribe(
'{}/#'.format(HubTopics.ENQUEUED_COMMANDS.format(self._device_id)))
self._logger.debug(self._twin_request_id)
self._mqtt_client.publish(
HubTopics.TWIN_REQ.format(self._twin_request_id).encode('utf-8'), '{{}}')
def is_connected(self):
if self._connected == True:
return True
return False
def set_model_id(self, model):
self._model_id = model
def send_property(self, payload):
self._logger.debug('Sending properties {}'.format(json.dumps(payload)))
self._mqtt_client.publish(
HubTopics.PROP_REPORT.format(time()).encode('utf-8'), json.dumps(payload))
def send_telemetry(self, payload, properties=None):
topic = 'devices/{}/messages/events/?$.ct={}&$.ce={}'.format(
self._device_id, self._content_type, self._content_encoding)
if properties is not None:
for prop in properties:
topic += '{}={}&'.format(encode_uri_component(prop),
encode_uri_component(properties[prop]))
topic = topic[:-1]
self._mqtt_client.publish(topic.encode(
'utf-8'), json.dumps(payload).encode('utf-8'))
def on(self, event, callback):
self._events[event] = callback
def listen(self):
if not self.is_connected():
return
self._mqtt_client.ping()
self._mqtt_client.wait_msg()
sleep(1)
def on_properties_update(self, patch):
try:
prop_cb = self._events[IoTCEvents.PROPERTIES]
except:
return
for prop in patch:
if prop == '$version':
continue
ret = prop_cb(prop, patch[prop]['value'])
if ret:
self._logger.debug('Acknowledging {}'.format(prop))
self.send_property({
'{}'.format(prop): {
"value": patch[prop]["value"],
'status': 'completed',
'desiredVersion': patch['$version'],
'message': 'Property received'}
})
else:
self._logger.debug(
'Property "{}" unsuccessfully processed'.format(prop))
def _cmd_resp(self, command: Command, value):
self._logger.debug(
'Responding to command "{}" request'.format(command.name))
self.send_property({
'{}'.format(command.name): {
'value': value,
'requestId': command.request_id
}
})
def _cmd_ack(self, command: Command):
self._logger.debug('Acknowledging command {}'.format(command.name))
self._mqtt_client.publish(
'$iothub/methods/res/{}/?$rid={}'.format(200, command.request_id).encode('utf-8'), '')
def _on_commands(self, command: Command):
try:
cmd_cb = self._events[IoTCEvents.COMMANDS]
except KeyError:
return
self._logger.debug(
'Received command {}'.format(command.name))
self._cmd_ack(command)
cmd_cb(command, self._cmd_resp)
def _on_enqueued_commands(self, command: Command):
try:
cmd_cb = self._events[IoTCEvents.ENQUEUED_COMMANDS]
except KeyError:
return
self._logger.debug(
'Received enqueued command {}'.format(command.name))
self._cmd_ack(command)
cmd_cb(command)

63
iotc/constants.py Normal file
Просмотреть файл

@ -0,0 +1,63 @@
class IoTCLogLevel:
DISABLED = 1
API_ONLY = 2
ALL = 3
class IoTCConnectType:
SYMM_KEY = 1
DEVICE_KEY = 2
class IoTCEvents:
PROPERTIES = 1
COMMANDS = 2
ENQUEUED_COMMANDS = 3
class HubTopics:
TWIN = '$iothub/twin/res/#'
TWIN_REQ = '$iothub/twin/GET/?$rid={}'
TWIN_RES = '$iothub/twin/res/{}/?$rid={}'
PROPERTIES = '$iothub/twin/PATCH/properties/desired'
PROP_REPORT = '$iothub/twin/PATCH/properties/reported/?$rid={}'
COMMANDS = '$iothub/methods/POST'
ENQUEUED_COMMANDS = 'devices/{}/messages/devicebound'
class ConsoleLogger:
def __init__(self, log_level=IoTCLogLevel.API_ONLY):
self._log_level = log_level
def _log(self, message):
print(message)
def info(self, message):
if self._log_level != IoTCLogLevel.DISABLED:
self._log(message)
def debug(self, message):
if self._log_level == IoTCLogLevel.ALL:
self._log(message)
def set_log_level(self, log_level):
self._log_level = log_level
unsafe = {
'?': '%3F',
' ': '%20',
'$': '%24',
'%': '%25',
'&': '%26',
"\'": '%27',
'/': '%2F',
':': '%3A',
';': '%3B',
'+': '%2B',
'=': '%3D',
'@': '%40'
}
def encode_uri_component(string):
ret = ''
for char in string:
if char in unsafe:
char = unsafe[char]
ret = '{}{}'.format(ret, char)
return ret

Просмотреть файл

@ -1,250 +0,0 @@
import gc
gc.collect()
from utime import time,sleep
gc.collect()
import json
gc.collect()
try:
from umqtt.robust import MQTTClient
gc.collect()
except:
print('Mqtt library not found. Installing...')
import upip
upip.install('micropython-umqtt.robust')
from umqtt.robust import MQTTClient
unsafe = {
'?': '%3F',
' ': '%20',
'$': '%24',
'%': '%25',
'&': '%26',
"\'": '%27',
'/': '%2F',
':': '%3A',
';': '%3B',
'+': '%2B',
'=': '%3D',
'@': '%40'
}
def encode_uri_component(string):
ret = ''
for char in string:
if char in unsafe:
char = unsafe[char]
ret = '{}{}'.format(ret, char)
return ret
class Command():
def __init__(self, cmd_name, request_id):
self._cmd_name = cmd_name
self._request_id = request_id
@property
def name(self):
return self._cmd_name
@property
def payload(self):
return self._payload
@payload.setter
def payload(self,value):
self._payload=value
@property
def request_id(self):
return self._request_id
class IoTCEvents:
PROPERTIES = 1
COMMANDS = 2
ENQUEUED_COMMANDS = 3
class HubTopics:
TWIN = '$iothub/twin/res/#'
TWIN_REQ = '$iothub/twin/GET/?$rid={}'
TWIN_RES = '$iothub/twin/res/{}/?$rid={}'
PROPERTIES = '$iothub/twin/PATCH/properties/desired'
PROP_REPORT = '$iothub/twin/PATCH/properties/reported/?$rid={}'
COMMANDS = '$iothub/methods/POST'
ENQUEUED_COMMANDS = 'devices/{}/messages/devicebound'
class DeviceClient():
def __init__(self,device_id, credentials, logger):
# clean up modules
try:
import sys
del sys.modules['iotc.provision']
del sys.modules['iotc']
except:
pass
self._device_id=device_id
self._content_type='application%2Fjson'
self._content_encoding='utf-8'
self._connected=False
self._events = {}
self._logger = logger
self._twin_request_id = time()
self._mqtt_client = MQTTClient(self._device_id,credentials.host, 8883, credentials.user.encode(
'utf-8'), credentials.password.encode('utf-8'), ssl=True,keepalive=60)
import ure
gc.collect()
self._commands_regex=ure.compile('\$iothub\/methods\/POST\/(.+)\/\?\$rid=(.+)')
def set_content_type(self,content_type):
self._content_type = encode_uri_component(content_type)
def set_content_encoding(self,content_encoding):
self._content_encoding = content_encoding
def _on_message(self, topic, message):
topic = topic.decode('utf-8')
if topic == HubTopics.TWIN_RES.format(200, self._twin_request_id):
self._logger.info('Received twin: {}'.format(message))
if topic.startswith(HubTopics.PROPERTIES):
# desired properties
self._logger.info(
'Received desired property message: {}'.format(message))
message = json.loads(message.decode('utf-8'))
self.on_properties_update(message)
elif topic.startswith(HubTopics.COMMANDS):
# commands
self._logger.info('Received command {} with message: {}'.format(topic,message))
match=self._commands_regex.match(topic)
if match is not None:
if all(m is not None for m in [match.group(1),match.group(2)]):
command_name=match.group(1)
command_req=match.group(2)
command=Command(command_name,command_req)
if message is not None:
command.payload=message
self._on_commands(command)
elif topic.startswith(HubTopics.ENQUEUED_COMMANDS.format(self._device_id)):
params=topic.split("devices/{}/messages/devicebound/".format(self._device_id),1)[1].split('&')
for param in params:
p=param.split('=')
if p[0] == "method-name":
command_name=p[1].split("Commands%3A")[1]
self._logger.info('Received enqueued command {} with message: {}'.format(command_name,message))
command=Command(command_name,None)
if message is not None:
command.payload=message
self._on_enqueued_commands(command)
def connect(self):
self._mqtt_client.connect(False)
self._connected = True
self._logger.info('Device connected!')
self._mqtt_client.set_callback(self._on_message)
self._mqtt_client.subscribe(HubTopics.TWIN)
self._mqtt_client.subscribe('{}/#'.format(HubTopics.PROPERTIES))
self._mqtt_client.subscribe('{}/#'.format(HubTopics.COMMANDS))
self._mqtt_client.subscribe(
'{}/#'.format(HubTopics.ENQUEUED_COMMANDS.format(self._device_id)))
self._logger.debug(self._twin_request_id)
self._mqtt_client.publish(
HubTopics.TWIN_REQ.format(self._twin_request_id).encode('utf-8'), '{{}}')
def is_connected(self):
if self._connected == True:
return True
return False
def set_model_id(self,model):
self._model_id=model
def send_property(self, payload):
self._logger.debug('Sending properties {}'.format(json.dumps(payload)))
self._mqtt_client.publish(
HubTopics.PROP_REPORT.format(time()).encode('utf-8'), json.dumps(payload))
def send_telemetry(self,payload,properties=None):
topic = 'devices/{}/messages/events/?$.ct={}&$.ce={}'.format(self._device_id,self._content_type,self._content_encoding)
if properties is not None:
for prop in properties:
topic+='{}={}&'.format(encode_uri_component(prop),encode_uri_component(properties[prop]))
topic=topic[:-1]
self._mqtt_client.publish(topic.encode('utf-8'),json.dumps(payload).encode('utf-8'))
def on(self, event, callback):
self._events[event] = callback
def listen(self):
if not self.is_connected():
return
self._mqtt_client.ping()
self._mqtt_client.wait_msg()
sleep(1)
def on_properties_update(self, patch):
try:
prop_cb = self._events[IoTCEvents.PROPERTIES]
except:
return
for prop in patch:
if prop == '$version':
continue
ret = prop_cb(prop, patch[prop]['value'])
if ret:
self._logger.debug('Acknowledging {}'.format(prop))
self.send_property({
'{}'.format(prop): {
"value": patch[prop]["value"],
'status': 'completed',
'desiredVersion': patch['$version'],
'message': 'Property received'}
})
else:
self._logger.debug(
'Property "{}" unsuccessfully processed'.format(prop))
def _cmd_resp(self, command:Command,value):
self._logger.debug('Responding to command "{}" request'.format(command.name))
self.send_property({
'{}'.format(command.name): {
'value': value,
'requestId': command.request_id
}
})
def _cmd_ack(self,command:Command):
self._logger.debug('Acknowledging command {}'.format(command.name))
self._mqtt_client.publish('$iothub/methods/res/{}/?$rid={}'.format(200,command.request_id).encode('utf-8'),'')
def _on_commands(self,command:Command):
try:
cmd_cb = self._events[IoTCEvents.COMMANDS]
except KeyError:
return
self._logger.debug(
'Received command {}'.format(command.name))
self._cmd_ack(command)
cmd_cb(command, self._cmd_resp)
def _on_enqueued_commands(self,command:Command):
try:
cmd_cb = self._events[IoTCEvents.ENQUEUED_COMMANDS]
except KeyError:
return
self._logger.debug(
'Received enqueued command {}'.format(command.name))
self._cmd_ack(command)
cmd_cb(command)

Просмотреть файл

@ -1,23 +0,0 @@
class IoTCLogLevel:
DISABLED = 1
API_ONLY = 2
ALL = 3
class ConsoleLogger:
def __init__(self, log_level=IoTCLogLevel.API_ONLY):
self._log_level = log_level
def _log(self, message):
print(message)
def info(self, message):
if self._log_level != IoTCLogLevel.DISABLED:
self._log(message)
def debug(self, message):
if self._log_level == IoTCLogLevel.ALL:
self._log(message)
def set_log_level(self, log_level):
self._log_level = log_level

Просмотреть файл

@ -1,51 +1,27 @@
import sys
import gc
gc.collect()
import json
from iotc.constants import IoTCConnectType,encode_uri_component,ConsoleLogger,IoTCLogLevel
import ubinascii
import hashlib
from iotc.hmac import new as hmac
gc.collect()
try:
from utime import time, sleep
gc.collect()
except:
print('ERROR: missing dependency `utime`')
sys.exit(1)
gc.collect()
try:
import urequests
gc.collect()
except:
import upip
upip.install('micropython-urequests')
import urequests
import json
unsafe = {
'?': '%3F',
' ': '%20',
'$': '%24',
'%': '%25',
'&': '%26',
"\'": '%27',
'/': '%2F',
':': '%3A',
';': '%3B',
'+': '%2B',
'=': '%3D',
'@': '%40'
}
def encode_uri_component(string):
ret = ''
for char in string:
if char in unsafe:
char = unsafe[char]
ret = '{}{}'.format(ret, char)
return ret
gc.collect()
class IoTCConnectType:
SYMM_KEY = 1
DEVICE_KEY = 2
x509_CERT = 3
class Credentials:
@ -78,7 +54,10 @@ class ProvisioningClient():
self._registration_id = registration_id
self._credentials_type = credentials_type
self._api_version = '2019-01-15'
self._logger = logger
if logger is not None:
self._logger=logger
else:
self._logger=ConsoleLogger(IoTCLogLevel.DISABLED)
if model_id is not None:
self._model_id = model_id
@ -176,9 +155,6 @@ class ProvisioningClient():
return None
def _compute_key(self, key, payload):
import ubinascii
import hashlib
from iotc.hmac import new as hmac
try:
secret = ubinascii.a2b_base64(key)
except: