This commit is contained in:
lucadruda 2019-11-30 16:43:00 +01:00
Коммит a02aa636f7
18 изменённых файлов: 2167 добавлений и 0 удалений

Двоичные данные
.DS_Store поставляемый Normal file

Двоичный файл не отображается.

14
.gitignore поставляемый Normal file
Просмотреть файл

@ -0,0 +1,14 @@
# Python
exp.py
.pytest_cache/
**/__pycache__
*.pyc
*.pyo
# VSCODE
.vscode/
# publish
iotc.egg-info
build
dist

62
CHANGELOG Executable file
Просмотреть файл

@ -0,0 +1,62 @@
### 0.1.5
- add baltimore.pem into pypi packaging
- fix README - pypi package name
- Enable: verify server certificate
- remove setProtocol (won't support)
- clean up error codes
- remove redundant loop on paho
- parse command name for user
### 0.1.6
- Support Python 3.4+
- make sure json parse input is string
- add more detail to json.parse failure messages
- use decode encoding instead of str for mqtt events
### 0.1.7
- adjust testing for micropython client
- merge mqtt publish interface
- pass the mqtt channel noise
### 0.2.0
- add x509 authentication support
- Update README
### 0.2.2
- fix documentation typos and add docs for modelData ++ enable it
### 0.2.3
- force device settings to be delivered on start
- enable 'setModelData'
- new 'getDeviceSettings' method to request device settings at any given time
### 0.3.1
- new `sendTelemetry` interface for system property support. see README
- update to 'SettingsUpdated' logic. Now individual setting values include '$version'
- new 'setExitOnError' interface provides an option to enable exiting from app on mqtt layer issues.
### 0.3.2
- change isConnected status to false if mqtt rc == 1
- new `setTokenExpiration` interface for manually defining token expiration time
### 0.3.3
- add `getHostName` API
- extend `connect()` with an optional `hostName` arg. => `connect(hostName)`
### 0.3.4
- adding support for changing the QoS value of MQTT publishing events (firedog1024)
### 0.3.5
- fix spacing issue with httplib (lucadruda)

1
MANIFEST.in Executable file
Просмотреть файл

@ -0,0 +1 @@
include src/iotc/baltimore.pem

386
README.md Executable file
Просмотреть файл

@ -0,0 +1,386 @@
## iotc - Azure IoT Central - Python (light) device SDK Documentation
### Prerequisites
Python 2.7+ or Python 3.4+ or Micropython 1.9+
*Runtime dependencies vary per platform*
### Install
Python 2/3
```
pip install iotc
```
### Common Concepts
- API calls should return `0` on success and `error code` otherwise.
- External API naming convention follows `lowerCamelCase` for `Device` class members
- Asyncronous commands must be acknowledge as any other sync commands but execution updates should be sent through the `sendCommand` function
### Usage
```
import iotc
device = iotc.Device(scopeId, keyORCert, deviceId, credType)
```
- *scopeId* : Azure IoT DPS Scope Id
- *keyORcert* : Group or device symmetric key or x509 Certificate
- *deviceId* : Device Id
- *credType* : `IOTConnectType.IOTC_CONNECT_SYMM_KEY`,`IOTConnectType.IOTC_CONNECT_DEVICE_KEY` or `IOTConnectType.IOTC_CONNECT_X509_CERT`
`keyORcert` for `X509` certificate:
```
credType = IOTConnectType.IOTC_CONNECT_X509_CERT
keyORcert = {
"keyfile": "/src/python/test/device.key.pem",
"certfile": "/src/python/test/device.cert.pem"
}
```
`keyORcert` for `SAS` token:
```
credType = IOTConnectType.IOTC_CONNECT_SYMM_KEY
keyORcert = "xxxxxxxxxxxxxxx........"
```
#### setLogLevel
set logging level
```
device.setLogLevel(logLevel)
```
*logLevel* : (default value is `IOTC_LOGGING_DISABLED`)
```
class IOTLogLevel:
IOTC_LOGGING_DISABLED = 1
IOTC_LOGGING_API_ONLY = 2
IOTC_LOGGING_ALL = 16
```
*i.e.* => `device.setLogLevel(IOTLogLevel.IOTC_LOGGING_API_ONLY)`
#### setExitOnError
enable/disable application termination on mqtt later exceptions. (default false)
```
device.setExitOnError(isEnabled)
```
*i.e.* => `device.setExitOnError(True)`
#### setModelData
set the device model data (if any)
```
device.setModelData(modelJSON)
```
*modelJSON* : Device model definition.
*i.e.* => `device.setModelData({"__iot:interfaces": {"CapabilityModelId": "<PUT_MODEL_ID_HERE>"}})`
#### setInterfaces
set the interfaces exposed by the device. If interfaces are not declared, related properties and commands will not be received by the device and telemetry will have no effect
```
device.setInterfaces(interfaceJSON)
```
*interfacesJSON* : Interfaces object
*i.e.* => `device.setInterfaces({"[PUT_INTERFACE_1_NAME_HERE]": "[PUT_INTERFACE_1_ID_HERE]", ... ,"[PUT_INTERFACE_N_NAME_HERE]": "[PUT_INTERFACE_N_ID_HERE]"})`
#### setTokenExpiration
set the token expiration timeout. default is 21600 seconds (6 hours)
```
device.setTokenExpiration(totalSeconds)
```
*totalSeconds* : timeout in seconds.
*i.e.* => `device.setTokenExpiration(600)`
#### setServiceHost
set the service endpoint URL
```
device.setServiceHost(url)
```
*url* : URL for service endpoint. (default value is `global.azure-devices-provisioning.net`)
*call this before connect*
#### setQosLevel
Set the MQTT Quality of Service (QoS) level desired for all MQTT publish calls
```
device.setQosLevel(qosLevel)
```
*qosLevel* : (default value is `IOTC_QOS_AT_MOST_ONCE`)
```
class IOTQosLevel:
IOTC_QOS_AT_MOST_ONCE = 0
IOTC_QOS_AT_LEAST_ONCE = 1
```
Note: IOTC_QOS_AT_LEAST_ONCE will have slower performance than IOTC_QOS_AT_MOST_ONCE as the MQTT client must store the value for possible replay and also wait for an acknowledgement from the IoT Hub that the MQTT message has been received. Think of IOTC_QOS_AT_MOST_ONCE as "fire and forget" vs. IOTC_QOS_AT_LEAST_ONCE as "guaranteed delivery". As the developer you should consider the importance of 100% data delivery vs. increased connection time and data traffic over the data connection.
*call this before connect*
#### connect
connect device client `# blocking`. Raises `ConnectionStatus` event.
```
device.connect()
```
or
```
device.connect(hostName)
```
*i.e.* => `device.connect()`
#### sendTelemetry
send telemetry
```
device.sendTelemetry(payload, [[optional system properties]])
```
*payload* : A text payload.
*i.e.* => `device.sendTelemetry('{ "temperature": 15 }')`
You may also set system properties for the telemetry message. See also [iothub message format](https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-messages-construct)
*i.e.* => `device.sendTelemetry('{ "temperature":22 }', {"iothub-creation-time-utc": time.time()})`
#### sendState
send device state
```
device.sendState(payload)
```
*payload* : A text payload.
*i.e.* => `device.sendState('{ "status": "WARNING"}')`
#### sendProperty
send reported property
```
device.sendProperty(payload)
```
*payload* : A text payload.
*i.e.* => `device.sendProperty('{"countdown":{"value": %d}}')`
#### sendCommandUpdate
send execution updates for asyncronous messages
```
device.sendCommandUpdate(interface_name, command_name, request_id, status_code, update_message)
```
*interface_name* : Command interface name.
*command_name* : Command name.
*request_id* : Request Id for the command. This must be the same value received for the command in on("Command"). Can be obtain by calling _getPayload()["requestId"]_ of the callback object (see example)
*status_code* : Update status code.
*update_message* : A text message.
*i.e.* => `device.sendCommandUpdate("commands", "reboot", "a4795148-155f-4d75-92bf-3536bc0cbcee", 200, "Progress")`
`
#### doNext
let framework do the partial MQTT work.
```
device.doNext()
```
#### isConnected
returns whether the connection was established or not.
```
device.isConnected()
```
*i.e.* => `device.isConnected()`
#### disconnect
disconnect device client
```
device.disconnect()
```
*i.e.* => `device.disconnect()`
#### getDeviceProperties
pulls latest twin data (device properties). Raises `PropertiesUpdated` event.
```
device.getDeviceProperties()
```
*i.e.* => `device.getDeviceProperties()`
#### getHostName
returns the iothub hostname cached during the initial connection.
```
device.getHostName()
```
*i.e.* => `device.getDeviceHostName()`
#### on
set event callback to listen events
- `ConnectionStatus` : connection status has changed
- `MessageSent` : message was sent
- `Command` : a command received from Azure IoT Central
- `PropertiesUpdated` : device properties were updated
i.e.
```
def onconnect(info):
if info.getStatusCode() == 0:
print("connected!")
device.on("ConnectionStatus", onconnect)
```
```
def onmessagesent(info):
print("message sent -> " + info.getPayload())
device.on("MessageSent", onmessagesent)
```
```
def oncommand(info):
print("command name:", info.getTag())
print("command args: ", info.getPayload())
device.on("Command", oncommand)
```
```
def onpropertiesupdated(info):
print("property name:", info.getTag())
print("property value: ", info.getPayload())
device.on("Updated", onpropertiesupdated)
```
#### callback info class
`iotc` callbacks have a single argument derived from `IOTCallbackInfo`.
Using this interface, you can get the callback details and respond back when it's necessary.
public members of `IOTCallbackInfo` are;
`getResponseCode()` : get response code or `None`
`getResponseMessage()` : get response message or `None`
`setResponse(responseCode, responseMessage)` : set callback response
*i.e.* => `info.setResponse(200, 'completed')`
`getClient()` : get active `device` client
`getEventName()` : get the name of the event
`getPayload()` : get the payload or `None`
`getTag()` : get the tag or `None`
`getStatusCode()` : get callback status code
#### sample app
```
import iotc
from iotc import IOTConnectType, IOTLogLevel, IOTQosLevel
from random import randint
deviceId = "DEVICE_ID"
scopeId = "SCOPE_ID"
key = "SYMM_KEY"
# see iotc.Device documentation above for x509 argument sample
iotc = iotc.Device(scopeId, key, deviceId,
IOTConnectType.IOTC_CONNECT_SYMM_KEY)
iotc.setLogLevel(IOTLogLevel.IOTC_LOGGING_ALL)
iotc.setQosLevel(IOTQosLevel.IOTC_QOS_AT_MOST_ONCE)
gCanSend = False
gCounter = 0
def onconnect(info):
global gCanSend
print("- [onconnect] => status:" + str(info.getStatusCode()))
if info.getStatusCode() == 0:
if iotc.isConnected():
gCanSend = True
def onmessagesent(info):
print("\t- [onmessagesent] => " + str(info.getPayload()))
def oncommand(info):
interface_name = info.getInterface()
command_name = info.getTag()
request_id = None
try:
command_value = info.getPayload()["value"]
request_id = info.getPayload()["requestId"]
except:
command_value = info.getPayload()
resp = "Received"
info.setResponse(200,resp)
def onpropertiesupdated(info):
print("property name:", info.getTag())
print("property value: ", info.getPayload())
iotc.on("ConnectionStatus", onconnect)
iotc.on("MessageSent", onmessagesent)
iotc.on("Command", oncommand)
iotc.on("Properties", onpropertiesupdated)
iotc.setModelData(
{"__iot:interfaces": {"CapabilityModelId": "MODEL_ID"}})
# Put interfaces for the specified model
iotc.setInterfaces({"IFNAME": "IFID"})
iotc.connect()
registered = False
while iotc.isConnected():
iotc.doNext() # do the async work needed to be done for MQTT
if gCanSend == True:
if gCounter % 20 == 0:
gCounter = 0
iotc.sendTelemetry({"temperature": str(randint(20, 45))}, {
"$.ifid": 'IFID',
"$.ifname": 'IFNAME',
"$.schema": 'temperature'})
iotc.sendTelemetry({"pressure": str(randint(20, 45))}, {
"$.ifid": 'IFID',
"$.ifname": 'IFNAME',
"$.schema": 'pressure'})
gCounter += 1
```

92
app.py Normal file
Просмотреть файл

@ -0,0 +1,92 @@
import sys
sys.path.insert(1, 'src')
import iotc
from iotc import IOTConnectType, IOTLogLevel, IOTQosLevel
from random import randint
deviceId = "py4"
scopeId = "0ne0009AC0E"
key = "djD3Cx3HzdyoX/gLSmw33pNwde/LovdJiXbbzR4ybrDwOBbuKyd17efy/DwDtc91f/kaWQbMqdPqS2buJf5zOA=="
# see iotc.Device documentation above for x509 argument sample
iotc = iotc.Device(scopeId, key, deviceId,
IOTConnectType.IOTC_CONNECT_SYMM_KEY)
iotc.setLogLevel(IOTLogLevel.IOTC_LOGGING_ALL)
iotc.setQosLevel(IOTQosLevel.IOTC_QOS_AT_MOST_ONCE)
gCanSend = False
gCounter = 0
def onconnect(info):
global gCanSend
print("- [onconnect] => status:" + str(info.getStatusCode()))
if info.getStatusCode() == 0:
if iotc.isConnected():
gCanSend = True
def onmessagesent(info):
print("\t- [onmessagesent] => " + str(info.getPayload()))
def oncommand(info):
interface_name = info.getInterface()
command_name = info.getTag()
request_id = None
try:
command_value = info.getPayload()["value"]
request_id = info.getPayload()["requestId"]
except:
command_value = info.getPayload()
resp = "Received"
if command_name == "echo":
resp = command_value
info.setResponse(200, resp)
elif command_name == "reboot":
info.setResponse(200, resp)
def cb():
# global gNeedAsync
iotc.sendCommandUpdate(
interface_name, command_name, request_id, 200, "Progress")
info.setCallback(cb)
def onpropertiesupdated(info):
print("property name:", info.getTag())
print("property value: ", info.getPayload())
iotc.on("ConnectionStatus", onconnect)
iotc.on("MessageSent", onmessagesent)
iotc.on("Command", oncommand)
iotc.on("Properties", onpropertiesupdated)
iotc.setModelData(
{"__iot:interfaces": {"CapabilityModelId": "urn:mxchip:mxchip_iot_devkit:2"}})
iotc.setInterfaces({"settings": "urn:mxchip:settings:1", "sensors": "urn:mxchip:built_in_sensors:1",
"leds": "urn:mxchip:built_in_leds:1", "screen": "urn:mxchip:screen:2"})
iotc.connect()
registered = False
while iotc.isConnected():
iotc.doNext() # do the async work needed to be done for MQTT
if gCanSend == True:
if gCounter % 20 == 0:
gCounter = 0
iotc.sendTelemetry({"temperature": str(randint(20, 45))}, {
"$.ifid": 'urn:mxchip:built_in_sensors:1',
"$.ifname": 'sensors',
"$.schema": 'temperature'})
iotc.sendTelemetry({"pressure": str(randint(20, 45))}, {
"$.ifid": 'urn:mxchip:built_in_sensors:1',
"$.ifname": 'sensors',
"$.schema": 'pressure'})
gCounter += 1

4
iotz.json Executable file
Просмотреть файл

@ -0,0 +1,4 @@
{
"name": "iotc-device",
"toolchain": "micro-python"
}

45
notes.txt Normal file
Просмотреть файл

@ -0,0 +1,45 @@
if (message.properties && message.properties.count() > 0) {
for (var i = 0; i < message.properties.count(); i++) {
if (i > 0 || sysPropString)
topic += '&';
topic += encodeURIComponent(message.properties.propertyList[i].key) + '=' + encodeURIComponent(message.properties.propertyList[i].value);
}
}
devices/mxchip2/messages/events/%24.ifid=built_in_sensors&%24.ifname=sensors&%24.schema=temperature
{"desired":{"$iotin:settings":{"fanSpeed":{"value":4}},"$version":2},"reported":{"$version":1}}
{\"$iotin:settings\":{\"fanSpeed\":{\"value\":10,\"sc\":200,\"sd\":\"helpful descriptive text\",\"sv\":19}}}
$iothub/twin/PATCH/properties/reported/?$rid=16298574-2703-40d6-83b2-94b4ec35f9c5
topic
"$iothub/twin/PATCH/properties/reported/?$rid=0590bc31-db1d-428f-8cbd-183028dc2e07"
body.toString()
"{"$iotin:settings":{"fanSpeed":{"value":17,"sc":200,"sd":"helpful descriptive text","sv":11}}}"
devices/py1/messages/events/%24.ct=application%2Fjson&%24.ce=utf-8&%24.ifid=urn%3Aazureiot%3AModelDiscovery%3AModelInformation%3A1&%24.ifname=urn_azureiot_ModelDiscovery_ModelInformation&%24.schema=modelInformation
{"$iotin:urn_azureiot_Client_SDKInformation":{"version":{"value":"azure-iot-digitaltwins-device/1.0.0-preview.3"}}}
{"$iotin:urn_azureiot_Client_SDKInformation":{"vendor":{"value":"Microsoft Corporation"}}}
JSON.stringify(_this.properties.reported)
"{"$iotin:urn_azureiot_Client_SDKInformation":{"language":{"value":"Node.js"},"version":{"value":"azure-iot-digitaltwins-device/1.0.0-preview.3"},"vendor":{"value":"Microsoft Corporation"}},"$iotin:settings":{"fanSpeed":{"value":14,"sc":200,"sd":"helpful descriptive text","sv":22}},"$version":43}"
JSON.stringify(_this.properties.desired)
"{"$iotin:settings":{"fanSpeed":{"value":14}},"$version":22}"
"{"propertyList":[{"key":"iothub-message-schema","value":"asyncResult"},{"key":"iothub-command-name","value":"reboot"},{"key":"iothub-command-request-id","value":"1bc3d109-9e70-42ad-9a9b-d2417f462e94"},{"key":"iothub-command-statuscode","value":"200"},{"key":"$.ifname","value":"screen"}]}"
message.data
""progress1""
JSON.stringify(dest)
"{"$iotin:urn_azureiot_Client_SDKInformation":{"language":{"value":"Node.js"},"version":{"value":"azure-iot-digitaltwins-device/1.0.0-preview.3"},"vendor":{"value":"Microsoft Corporation"}},"$iotin:settings":{"fanSpeed":{"value":16,"sc":200,"sd":"helpful descriptive text","sv":25}},"$version":55}"
"{"modelInformation":{"capabilityModelId":"urn:mxchip:mxchip_iot_devkit:1","interfaces":{"urn_azureiot_ModelDiscovery_ModelInformation":"urn:azureiot:ModelDiscovery:ModelInformation:1","urn_azureiot_Client_SDKInformation":"urn:azureiot:Client:SDKInformation:1","settings":"urn:mxchip:settings:1"}}}"

38
play.py Normal file
Просмотреть файл

@ -0,0 +1,38 @@
import json
obj = {"desired": {"$iotin:settings": {"fanSpeed": {"value": 4}, "ledColor": {
"value": "red"}}, "$version": 2}, "reported": {"$version": 1}}
version = None
reported = []
if 'desired' in obj:
obj = obj['desired']
version = obj['$version']
for attr, value in obj.items():
if attr != '$version':
if not attr.startswith('$iotin:'):
continue
ifname = attr
print("Interface name: `{0}`".format(ifname))
for propName, propValue in value.items():
print("PropName:{0}, PropValue:{1}".format(
propName, propValue))
try:
eventValue = json.loads(json.dumps(propValue))
if version != None:
eventValue['sv'] = version
prop = {"ifname": ifname, "propName": propName,
"eventValue": eventValue}
reported.append(prop)
except:
continue
for item in reported:
ret_code = 200
ret_message = "completed"
item["eventValue"]["sc"] = ret_code
item["eventValue"]["sd"] = ret_message
patch={}
prop={"{0}".format(item["propName"]):item["eventValue"]}
patch[item["ifname"]]=prop
msg = json.dumps(patch)
print(msg)

12
publish.sh Executable file
Просмотреть файл

@ -0,0 +1,12 @@
#!/bin/bash
rm -rf build/ dist/ src/iotc/iotc_device.egg-info src/iotc/_pycache_ src/iotc/_init_.pyc
TEST=""
if [[ $1 == 'test' ]]; then
TEST="-r testpypi"
fi
python2 setup.py sdist bdist_wheel
python3 setup.py sdist bdist_wheel
twine upload dist/* $TEST

36
setup.py Executable file
Просмотреть файл

@ -0,0 +1,36 @@
import setuptools
import sys
sys.path.insert(0, 'src')
from iotc import __version__, __name__
with open("README.md", "r") as fh:
long_description = fh.read()
setuptools.setup(
name=__name__,
version=__version__,
author="Oguz Bastemur",
author_email="ogbastem@microsoft.com",
description="Azure IoT Central device client for Python",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/Azure/iot-central-firmware",
packages=setuptools.find_packages('src'),
package_dir={'': 'src'},
license="MIT",
platform="OS Independent",
keywords="iot,azure,iotcentral",
classifiers=[
'License :: OSI Approved :: MIT License',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy'
],
include_package_data=True,
install_requires=["paho-mqtt", "httplib2"]
)

943
src/iotc/__init__.py Executable file
Просмотреть файл

@ -0,0 +1,943 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license.
__version__ = "0.3.6-beta.1"
__name__ = "iotc"
import sys
from .constants import CONSTANTS
class IOTConnectType:
IOTC_CONNECT_SYMM_KEY = 1
IOTC_CONNECT_X509_CERT = 2
IOTC_CONNECT_DEVICE_KEY = 3
gIsMicroPython = ('implementation' in dir(sys)) and ('name' in dir(
sys.implementation)) and (sys.implementation.name == 'micropython')
try:
import urllib
except ImportError:
print("ERROR: missing dependency `micropython-urllib`")
print("Try micropython -m upip install micropython-urequests ",
"micropython-time micropython-urllib.parse",
"micropython-json micropython-hashlib",
"micropython-hmac micropython-base64",
"micropython-threading micropython-ssl",
"micropython-umqtt.simple micropython-socket micropython-urequests"
)
sys.exit()
http = None
if gIsMicroPython == False:
try:
import httplib as http
except ImportError:
import http.client as http
else:
try:
import urequests
except ImportError:
print("ERROR: missing dependency micropython-urequests")
sys.exit()
try:
import time
except ImportError:
print("ERROR: missing dependency `micropython-time`")
sys.exit()
if gIsMicroPython:
try:
import usocket
except ImportError:
print("ERROR: missing dependency `micropython-usocket`")
sys.exit()
try:
import json
except ImportError:
print("ERROR: missing dependency `micropython-json`")
sys.exit()
try:
import uuid
except ImportError:
print("ERROR: missing dependency `micropython-uuid`")
sys.exit()
try:
import hashlib
except ImportError:
print("ERROR: missing dependency `micropython-hashlib`")
sys.exit()
try:
import hmac
except ImportError:
print("ERROR: missing dependency `micropython-hmac`")
sys.exit()
try:
import base64
except ImportError:
print("ERROR: missing dependency `micropython-base64`")
sys.exit()
try:
import ssl
except ImportError:
print("ERROR: missing dependency `micropython-ssl`")
sys.exit()
try:
from threading import Timer
except ImportError:
try:
from threading import Thread
except ImportError:
print("ERROR: missing dependency `micropython-threading`")
sys.exit()
try:
import binascii
except ImportError:
import ubinascii
mqtt = None
try:
import paho.mqtt.client as mqtt
MQTT_SUCCESS = mqtt.MQTT_ERR_SUCCESS
except ImportError:
try:
from umqtt.simple import MQTTClient
except ImportError:
if gIsMicroPython == True:
print("ERROR: missing dependency `micropython-umqtt.simple`")
else:
print("ERROR: missing dependency `paho-mqtt`")
print("IoTCentral client version %s" % __version__)
def _createMQTTClient(__self, username, passwd):
if mqtt != None:
__self._mqtts = mqtt.Client(
client_id=__self._deviceId, protocol=mqtt.MQTTv311)
__self._mqtts.on_connect = __self._onConnect
__self._mqtts.on_message = __self._onMessage
__self._mqtts.on_log = __self._onLog
__self._mqtts.on_publish = __self._onPublish
__self._mqtts.on_disconnect = __self._onDisconnect
__self._mqtts.username_pw_set(username=username, password=passwd)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_default_certs()
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.check_hostname = True
if __self._credType not in (IOTConnectType.IOTC_CONNECT_SYMM_KEY, IOTConnectType.IOTC_CONNECT_DEVICE_KEY):
ssl_context.load_cert_chain(
certfile=__self._certfile, keyfile=__self._keyfile)
__self._mqtts.tls_set_context(ssl_context)
__self._mqtts.connect_async(__self._hostname, port=8883, keepalive=120)
__self._mqtts.loop_start()
else:
__self._mqtts = MQTTClient(__self._deviceId, __self._hostname, port=8883,
user=username, password=passwd, keepalive=0, ssl=True, ssl_params={})
__self._mqtts.set_callback(__self._mqttcb)
__self._mqtts.connect()
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
class HTTP_PROXY_OPTIONS:
def __init__(self, host_address, port, username, password):
self._host_address = host_address
self._port = port
self._username = username
self._password = password
class IOTCallbackInfo:
def __init__(self, client, eventName, payload, interface, tag, status, msgid):
self._client = client
self._eventName = eventName
self._payload = payload
self._tag = tag
self._interface = interface
self._status = status
self._responseCode = None
self._responseMessage = None
self._msgid = msgid
self._callback = None
def setResponse(self, responseCode, responseMessage):
self._responseCode = responseCode
self._responseMessage = responseMessage
def setCallback(self, callback):
self._callback = callback
def getCallback(self):
return self._callback
def getClient(self):
return self._client
def getEventName(self):
return self._eventName
def getPayload(self):
return self._payload
def getTag(self):
return self._tag
def getInterface(self):
return self._interface
def getStatusCode(self):
return self._status
def getResponseCode(self):
return self._responseCode
def getResponseMessage(self):
return self._responseMessage
def getMessageId(self):
return self._msgid
class IOTProtocol:
IOTC_PROTOCOL_MQTT = 1
IOTC_PROTOCOL_AMQP = 2
IOTC_PROTOCOL_HTTP = 4
class IOTLogLevel:
IOTC_LOGGING_DISABLED = 1
IOTC_LOGGING_API_ONLY = 2
IOTC_LOGGING_ALL = 16
class IOTQosLevel:
IOTC_QOS_AT_MOST_ONCE = 0
IOTC_QOS_AT_LEAST_ONCE = 1
class IOTConnectionState:
IOTC_CONNECTION_EXPIRED_SAS_TOKEN = 1
IOTC_CONNECTION_DEVICE_DISABLED = 2
IOTC_CONNECTION_BAD_CREDENTIAL = 4
IOTC_CONNECTION_RETRY_EXPIRED = 8
IOTC_CONNECTION_NO_NETWORK = 16
IOTC_CONNECTION_COMMUNICATION_ERROR = 32
IOTC_CONNECTION_OK = 64
class IOTMessageStatus:
IOTC_MESSAGE_ACCEPTED = 1
IOTC_MESSAGE_REJECTED = 2
IOTC_MESSAGE_ABANDONED = 4
gLOG_LEVEL = IOTLogLevel.IOTC_LOGGING_DISABLED
# default is set to QoS 0 "At most once" IoT hub also supports QoS 1 "At least once"
gQOS_LEVEL = IOTQosLevel.IOTC_QOS_AT_MOST_ONCE
def LOG_IOTC(msg, level=IOTLogLevel.IOTC_LOGGING_API_ONLY):
global gLOG_LEVEL
if gLOG_LEVEL > IOTLogLevel.IOTC_LOGGING_DISABLED:
if level <= gLOG_LEVEL:
print(time.time(), msg)
return 0
def MAKE_CALLBACK(client, eventName, payload, interface, tag, status, msgid=None):
LOG_IOTC("- iotc :: MAKE_CALLBACK :: " +
eventName, IOTLogLevel.IOTC_LOGGING_ALL)
try:
obj = client["_events"]
except:
obj = client._events
if obj != None and (eventName in obj) and obj[eventName] != None:
cb = IOTCallbackInfo(client, eventName, payload,
interface, tag, status, msgid)
obj[eventName](cb)
return cb
return 0
def _quote(a, b):
# pylint: disable=no-member
global gIsMicroPython
features = dir(urllib)
if gIsMicroPython == False and int(sys.version[0]) < 3:
return urllib.quote(a, safe=b)
else:
return urllib.parse.quote(a)
def _get_cert_path():
file_path = __file__[:len(__file__) - len("__init__.py")]
# check for .py(c) diff
file_path = file_path[:len(
file_path) - 1] if file_path[len(file_path) - 1:] == "_" else file_path
LOG_IOTC("- iotc :: _get_cert_path :: " +
file_path, IOTLogLevel.IOTC_LOGGING_ALL)
return file_path + "baltimore.pem"
def _doRequest(device, target_url, method, body, headers):
conn = http.HTTPSConnection(
device._dpsEndPoint, '443', cert_file=device._certfile, key_file=device._keyfile)
req_headers = {"Content-Type": headers["content-type"],
"User-Agent": headers["user-agent"],
"Accept": headers["Accept"],
"Accept-Encoding": "gzip, deflate"}
if "authorization" in headers:
req_headers["authorization"] = headers["authorization"]
if body != None:
req_headers["Content-Length"] = str(len(body))
conn.request(method, target_url, body, req_headers)
response = conn.getresponse()
return response.read()
def _request(device, target_url, method, body, headers):
content = None
if http != None:
return _doRequest(device, target_url, method, body, headers)
else:
if device._certfile != None:
LOG_IOTC(
"ERROR: micropython client requires the client certificate is embedded.")
sys.exit()
response = urequests.request(
method, target_url, data=body, headers=headers)
return response.text
class Device:
def __init__(self, scopeId, keyORCert, deviceId, credType):
self._mqtts = None
self._loopInterval = 2
self._mqttConnected = False
self._deviceId = deviceId
self._scopeId = scopeId
self._credType = credType
self._hostname = None
self._auth_response_received = None
self._messages = {}
self._loopTry = 0
self._protocol = IOTProtocol.IOTC_PROTOCOL_MQTT
self._dpsEndPoint = "global.azure-devices-provisioning.net"
self._modelData = None
self._sslVerificiationIsEnabled = True
self._dpsAPIVersion = "2019-03-31"
self._keyfile = None
self._certfile = None
self._addMessageTimeStamp = False
self._exitOnError = False
self._tokenExpires = 21600
self._events = {
"MessageSent": None,
"ConnectionStatus": None,
"Command": None,
"Properties": None
}
self._defaultCapabilities = {CONSTANTS.MODEL_INFORMATION_KEY: {CONSTANTS.CAPABILITY_MODEL_KEY: "urn:mxchip:sample_device:1", CONSTANTS.INTERFACES_KEY: {"urn_azureiot_ModelDiscovery_ModelInformation":
"urn:azureiot:ModelDiscovery:ModelInformation:1", "urn_azureiot_Client_SDKInformation": "urn:azureiot:Client:SDKInformation:1", "deviceinfo": "urn:azureiot:DeviceManagement:DeviceInformation:1"}}}
self._interfaces = {}
if self._credType in (IOTConnectType.IOTC_CONNECT_DEVICE_KEY, IOTConnectType.IOTC_CONNECT_SYMM_KEY):
if self._credType == IOTConnectType.IOTC_CONNECT_SYMM_KEY:
keyORCert = self._computeDrivedSymmetricKey(
keyORCert, self._deviceId)
self._keyORCert = keyORCert
else:
self._keyfile = keyORCert["keyfile"]
self._certfile = keyORCert["certfile"]
def setTokenExpiration(self, totalSeconds):
self._tokenExpires = totalSeconds
return 0
def setExitOnError(self, isEnabled):
self._exitOnError = isEnabled
return 0
def setSSLVerification(self, isEnabled):
if self._auth_response_received:
LOG_IOTC("ERROR: setSSLVerification should be called before `connect`")
return 1
self._sslVerificiationIsEnabled = isEnabled
return 0
def setModelData(self, data):
LOG_IOTC("- iotc :: setModelData :: " +
json.dumps(data), IOTLogLevel.IOTC_LOGGING_ALL)
if self._auth_response_received:
LOG_IOTC("ERROR: setModelData should be called before `connect`")
return 1
self._modelData = data
return 0
def setDPSEndpoint(self, endpoint):
LOG_IOTC("- iotc :: setDPSEndpoint :: " +
endpoint, IOTLogLevel.IOTC_LOGGING_ALL)
if self._auth_response_received:
LOG_IOTC("ERROR: setDPSEndpoint should be called before `connect`")
return 1
self._dpsEndPoint = endpoint
return 0
def setLogLevel(self, logLevel):
global gLOG_LEVEL
if logLevel < IOTLogLevel.IOTC_LOGGING_DISABLED or logLevel > IOTLogLevel.IOTC_LOGGING_ALL:
LOG_IOTC("ERROR: (setLogLevel) invalid argument.")
return 1
gLOG_LEVEL = logLevel
return 0
def setInterfaces(self, interfaces):
try:
json.loads(json.dumps(interfaces))
self._interfaces = interfaces
return 0
except:
LOG_IOTC(
"ERROR: Not a valid interface object")
return 1
def setQosLevel(self, qosLevel):
global gQOS_LEVEL
if qosLevel < IOTQosLevel.IOTC_QOS_AT_MOST_ONCE or qosLevel > IOTQosLevel.IOTC_QOS_AT_LEAST_ONCE:
LOG_IOTC(
"ERROR: Only QOS level 0 (at most once) or 1 (at least once) is supported by IoT Hub")
return 1
gQOS_LEVEL = qosLevel
return 0
def _computeDrivedSymmetricKey(self, secret, regId):
# pylint: disable=no-member
global gIsMicroPython
try:
secret = base64.b64decode(secret)
except:
LOG_IOTC("ERROR: broken base64 secret => `" + secret + "`")
sys.exit()
if gIsMicroPython == False:
return base64.b64encode(hmac.new(secret, msg=regId.encode('utf8'), digestmod=hashlib.sha256).digest())
else:
return base64.b64encode(hmac.new(secret, msg=regId.encode('utf8'), digestmod=hashlib._sha256.sha256).digest())
def _loopAssign(self, operationId, headers):
uri = "https://%s/%s/registrations/%s/operations/%s?api-version=%s" % (
self._dpsEndPoint, self._scopeId, self._deviceId, operationId, self._dpsAPIVersion)
LOG_IOTC("- iotc :: _loopAssign :: " +
uri, IOTLogLevel.IOTC_LOGGING_ALL)
target = urlparse(uri)
content = _request(self, target.geturl(), "GET", None, headers)
try:
data = json.loads(content.decode("utf-8"))
except:
try:
data = json.loads(content)
except Exception as e:
err = "ERROR: %s => %s", (str(e), content)
LOG_IOTC(err)
return self._mqttConnect(err, None)
if data != None and 'status' in data:
if data['status'] == 'assigning':
time.sleep(3)
if self._loopTry < 20:
self._loopTry = self._loopTry + 1
return self._loopAssign(operationId, headers)
else:
# todo error code
LOG_IOTC("ERROR: Unable to provision the device.")
data = "Unable to provision the device."
return 1
elif data['status'] == "assigned":
state = data['registrationState']
self._hostName = state['assignedHub']
LOG_IOTC("IoTHub host: `{}`".format(
self._hostName), IOTLogLevel.IOTC_LOGGING_ALL)
return self._mqttConnect(None, self._hostName)
else:
data = str(data)
return self._mqttConnect("DPS L => " + str(data), None)
def _onConnect(self, client, userdata, _, rc):
LOG_IOTC("- iotc :: _onConnect :: rc = " +
str(rc), IOTLogLevel.IOTC_LOGGING_ALL)
if rc == 0:
self._mqttConnected = True
self._auth_response_received = True
# send capability telemetry message
capabilityData = self._defaultCapabilities
for ifname, ifid in self._interfaces.items():
capabilityData[CONSTANTS.MODEL_INFORMATION_KEY][CONSTANTS.INTERFACES_KEY].update({
ifname: ifid})
self._sendCommon(CONSTANTS.DEVICE_CAPABILITIES_MESSAGE.format(
self._deviceId), json.dumps(capabilityData))
self.doNext()
self._sendCommon(CONSTANTS.DEVICETWIN_PATCH_MESSAGE.format(uuid.uuid4()), json.dumps({"$iotin:urn_azureiot_Client_SDKInformation": {"language": {"value": "Python"}, "version": {
"value": "0.3.5"}, "vendor": {"value": "Azure"}}, "$iotin:deviceinfo": {"manufacturer": {"model": "imodeli"}, "swVersion": {"value": "0.0.1"}}}))
self.doNext()
def _echoDesired(self, msg, topic):
LOG_IOTC("- iotc :: _echoDesired :: " +
topic, IOTLogLevel.IOTC_LOGGING_ALL)
obj = None
try:
obj = json.loads(msg)
except Exception as e:
LOG_IOTC(
"ERROR: JSON parse for Properties message object has failed. => " + msg + " => " + str(e))
return
version = None
if 'desired' in obj:
obj = obj['desired']
if not '$version' in obj:
LOG_IOTC("ERROR: Unexpected payload for properties update => " + msg)
return 1
version = obj['$version']
reported = []
for attr, value in obj.items():
if attr != '$version':
if not attr.startswith('$iotin:'):
continue
ifname = attr
for propName, propValue in value.items():
try:
eventValue = json.loads(json.dumps(propValue))
if version != None:
eventValue['sv'] = version
prop = {"ifname": ifname, "propName": propName,
"eventValue": eventValue}
reported.append(prop)
except:
continue
for prop in reported:
ret = MAKE_CALLBACK(
self, "Properties", prop["eventValue"], prop["ifname"], prop["propName"], 0)
if not topic.startswith('$iothub/twin/res/200/?$rid=') and version != None:
self._reportDesired(prop, ret)
def _reportDesired(self, property, ret):
ret_code = 200
ret_message = "completed"
if ret.getResponseCode() != None:
ret_code = ret.getResponseCode()
if ret.getResponseMessage() != None:
ret_message = ret.getResponseMessage()
property["eventValue"]["sc"] = ret_code
property["eventValue"]["sd"] = ret_message
patch = {}
prop = {"{0}".format(property["propName"]): property["eventValue"]}
patch[property["ifname"]] = prop
msg = json.dumps(patch)
print("Sending `"+msg+"`")
topic = '$iothub/twin/PATCH/properties/reported/?$rid={}'.format(
uuid.uuid4())
self._sendCommon(topic, msg, True)
self.doNext()
# report sdk info
# self._reportSdk()
def _reportSdk(self):
self._sendCommon('$iothub/twin/PATCH/properties/reported/?$rid={}'.format(
uuid.uuid4()), json.dumps({"$iotin:urn_azureiot_Client_SDKInformation": {"version": {"value": CONSTANTS.SDK_VERSION}}}))
self.doNext()
self._sendCommon('$iothub/twin/PATCH/properties/reported/?$rid={}'.format(
uuid.uuid4()), json.dumps({"$iotin:urn_azureiot_Client_SDKInformation": {"vendor": {"value": CONSTANTS.SDK_VENDOR}}}))
self.doNext()
def _onMessage(self, client, _, data):
topic = ""
msg = None
if data == None:
LOG_IOTC("WARNING: (_onMessage) data is None.")
return
LOG_IOTC("- iotc :: _onMessage :: topic(" + str(data.topic) +
") payload(" + str(data.payload) + ")", IOTLogLevel.IOTC_LOGGING_ALL)
if data.payload != None:
try:
msg = data.payload.decode("utf-8")
except:
msg = str(data.payload)
if data.topic != None:
try:
topic = data.topic.decode("utf-8")
except:
topic = str(data.topic)
if topic.startswith('$iothub/'): # twin
# DO NOT need to echo twin response since IOTC api takes care of the desired messages internally
# if topic.startswith('$iothub/twin/res/'): # twin response
# self._handleTwin(topic, msg)
#
# twin desired property change
if topic.startswith('$iothub/twin/PATCH/properties/desired/') or topic.startswith('$iothub/twin/res/200/?$rid='):
self._echoDesired(msg, topic)
elif topic.startswith('$iothub/methods'): # Commands
index = topic.find("$rid=")
request_id = 1
command_name = "None"
interface_name = "None"
if index == -1:
LOG_IOTC("ERROR: Command doesn't include topic id")
else:
request_id = topic[index + 5:]
topic_template = "$iothub/methods/POST/"
len_temp = len(topic_template)
command_id = topic[len_temp:topic.find("/", len_temp + 1)]
ifprefix = command_id.find(":")+1
interface_name = command_id[ifprefix:command_id.find("*")]
command_name = command_id[ifprefix + len(
interface_name)+1:len(command_id)]
payload = None
try:
payload = json.loads(msg)["commandRequest"]
except:
None
ret = MAKE_CALLBACK(self, "Command", payload,
interface_name, command_name, 0)
ret_code = 200
ret_message = "{}"
if ret.getResponseCode() != None:
ret_code = ret.getResponseCode()
if ret.getResponseMessage() != None:
ret_message = ret.getResponseMessage()
next_topic = '$iothub/methods/res/{}/?$rid={}'.format(
ret_code, request_id)
LOG_IOTC("Command: => " + command_name + " of interface " +
interface_name + " with data " + json.dumps(payload), IOTLogLevel.IOTC_LOGGING_ALL)
(result, msg_id) = self._mqtts.publish(
next_topic, "\"{}\"".format(ret_message), qos=gQOS_LEVEL)
if result != MQTT_SUCCESS:
LOG_IOTC(
"ERROR: (send method callback) failed to send. MQTT client return value: " + str(result))
if ret.getCallback() != None:
ret.getCallback()()
else:
if not topic.startswith('$iothub/twin/res/'): # not twin response
LOG_IOTC('ERROR: unknown twin! {} - {}'.format(topic, msg))
else:
LOG_IOTC('ERROR: (unknown message) {} - {}'.format(topic, msg))
def _onLog(self, client, userdata, level, buf):
global gLOG_LEVEL
if gLOG_LEVEL > IOTLogLevel.IOTC_LOGGING_API_ONLY:
LOG_IOTC("mqtt-log : " + buf)
elif level <= 8:
LOG_IOTC("mqtt-log : " + buf) # transport layer exception
if self._exitOnError:
sys.exit()
def _onDisconnect(self, client, userdata, rc):
LOG_IOTC("- iotc :: _onDisconnect :: rc = " +
str(rc), IOTLogLevel.IOTC_LOGGING_ALL)
self._auth_response_received = True
if rc == 5:
LOG_IOTC("on(disconnect) : Not authorized")
self.disconnect()
if rc == 1:
self._mqttConnected = False
if rc != 5:
MAKE_CALLBACK(self, "ConnectionStatus", userdata, "", "", rc)
def _onPublish(self, client, data, msgid):
LOG_IOTC("- iotc :: _onPublish :: " + str(data),
IOTLogLevel.IOTC_LOGGING_ALL)
if data == None:
data = ""
if msgid != None and (str(msgid) in self._messages) and self._messages[str(msgid)] != None:
MAKE_CALLBACK(self, "MessageSent",
self._messages[str(msgid)], None, data, 0)
if (str(msgid) in self._messages):
del self._messages[str(msgid)]
def _mqttcb(self, topic, msg):
# NOOP
pass
def _mqttConnect(self, err, hostname):
if err != None:
LOG_IOTC("ERROR : (_mqttConnect) " + str(err))
return 1
LOG_IOTC("- iotc :: _mqttConnect :: " +
hostname, IOTLogLevel.IOTC_LOGGING_ALL)
self._hostname = hostname
passwd = None
username = '{}/{}/api-version={}'.format(
self._hostname, self._deviceId, CONSTANTS.HUB_API_VERSION)
LOG_IOTC("Username: `{}`".format(username),
IOTLogLevel.IOTC_LOGGING_ALL)
if self._credType in (IOTConnectType.IOTC_CONNECT_SYMM_KEY, IOTConnectType.IOTC_CONNECT_DEVICE_KEY):
passwd = self._gen_sas_token(
self._hostname, self._deviceId, self._keyORCert)
LOG_IOTC("Password: `{}`".format(passwd),
IOTLogLevel.IOTC_LOGGING_ALL)
_createMQTTClient(self, username, passwd)
LOG_IOTC(" - iotc :: _mqttconnect :: created mqtt client. connecting..",
IOTLogLevel.IOTC_LOGGING_ALL)
if mqtt != None:
while self._auth_response_received == None:
self.doNext()
LOG_IOTC(" - iotc :: _mqttconnect :: on_connect must be fired. Connected ? " +
str(self.isConnected()), IOTLogLevel.IOTC_LOGGING_ALL)
if not self.isConnected():
return 1
else:
self._mqttConnected = True
self._auth_response_received = True
self._mqtts.subscribe(
'devices/{}/messages/events/#'.format(self._deviceId))
self._mqtts.subscribe(
'devices/{}/messages/deviceBound/#'.format(self._deviceId))
# twin desired property changes
self._mqtts.subscribe('$iothub/twin/PATCH/properties/desired/#')
self._mqtts.subscribe('$iothub/twin/res/#') # twin properties response
self._mqtts.subscribe('$iothub/methods/#')
if self.getDeviceProperties() == 0:
MAKE_CALLBACK(self, "ConnectionStatus", None, None, None, 0)
else:
return 1
return 0
def getDeviceProperties(self):
LOG_IOTC("- iotc :: getDeviceProperties :: ",
IOTLogLevel.IOTC_LOGGING_ALL)
self.doNext()
return self._sendCommon("$iothub/twin/GET/?$rid=0", " ")
def getHostName(self):
return self._hostName
def connect(self, hostName=None):
LOG_IOTC("- iotc :: connect :: ", IOTLogLevel.IOTC_LOGGING_ALL)
if hostName != None:
self._hostName = hostName
return self._mqttConnect(None, self._hostName)
expires = int(time.time() + self._tokenExpires)
authString = None
if self._credType in (IOTConnectType.IOTC_CONNECT_DEVICE_KEY, IOTConnectType.IOTC_CONNECT_SYMM_KEY):
sr = self._scopeId + "%2Fregistrations%2F" + self._deviceId
sigNoEncode = self._computeDrivedSymmetricKey(
self._keyORCert, sr + "\n" + str(expires))
sigEncoded = _quote(sigNoEncode, '~()*!.\'')
authString = "SharedAccessSignature sr=" + sr + "&sig=" + \
sigEncoded + "&se=" + str(expires) + "&skn=registration"
headers = {
"content-type": "application/json; charset=utf-8",
"user-agent": "iot-central-client/1.0",
"Accept": "*/*"
}
if authString != None:
headers["authorization"] = authString
if self._modelData != None:
body = "{\"registrationId\":\"%s\",\"payload\":%s}" % (
self._deviceId, json.dumps(self._modelData))
else:
body = "{\"registrationId\":\"%s\"}" % (self._deviceId)
uri = "https://%s/%s/registrations/%s/register?api-version=%s" % (
self._dpsEndPoint, self._scopeId, self._deviceId, self._dpsAPIVersion)
target = urlparse(uri)
content = _request(self, target.geturl(), "PUT", body, headers)
data = None
try:
data = json.loads(content.decode("utf-8"))
except:
try:
data = json.loads(content)
except Exception as e:
err = "ERROR: non JSON is received from %s => %s .. message : %s", (
self._dpsEndPoint, content, str(e))
LOG_IOTC(err)
return self._mqttConnect(err, None)
if 'errorCode' in data:
err = "DPS => " + str(data)
return self._mqttConnect(err, None)
else:
time.sleep(1)
return self._loopAssign(data['operationId'], headers)
def _gen_sas_token(self, hub_host, device_name, key):
token_expiry = int(time.time() + self._tokenExpires)
uri = hub_host + "%2Fdevices%2F" + device_name
signed_hmac_sha256 = self._computeDrivedSymmetricKey(
key, uri + "\n" + str(token_expiry))
signature = _quote(signed_hmac_sha256, '~()*!.\'')
# somewhere along the crypto chain a newline is inserted
if signature.endswith('\n'):
signature = signature[:-1]
token = 'SharedAccessSignature sr={}&sig={}&se={}'.format(
uri, signature, token_expiry)
return token
def _sendCommon(self, topic, data, noEvent=None):
if mqtt != None:
(result, msg_id) = self._mqtts.publish(topic, data, qos=gQOS_LEVEL)
if result != mqtt.MQTT_ERR_SUCCESS:
LOG_IOTC(
"ERROR: (sendTelemetry) failed to send. MQTT client return value: " + str(result) + "")
return 1
else:
self._mqtts.publish(topic, data, qos=gQOS_LEVEL)
msg_id = 0
self._messages[str(msg_id)] = None
if noEvent == None:
self._messages[str(msg_id)] = data
if mqtt == None:
self._onPublish(None, topic, msg_id)
return 0
def sendTelemetry(self, data, systemProperties=None):
if not isinstance(data, str):
data = json.dumps(data)
LOG_IOTC("- iotc :: sendTelemetry :: " +
data, IOTLogLevel.IOTC_LOGGING_ALL)
topic = 'devices/{}/messages/events/'.format(self._deviceId)
if systemProperties != None:
firstProp = True
for prop in systemProperties:
if not firstProp:
topic += "&"
else:
firstProp = False
topic += _quote(prop, "$#%/\"\'") + '=' + \
str(systemProperties[prop])
return self._sendCommon(topic, data)
def sendCommandUpdate(self, interface_name, command_name, request_id, status_code, data):
if not isinstance(data, str):
data = json.dumps(data)
LOG_IOTC("- iotc :: sendCommandUpdate :: " +
data, IOTLogLevel.IOTC_LOGGING_ALL)
topic = 'devices/{}/messages/events/'.format(self._deviceId)
systemProperties = {CONSTANTS.COMMAND_SCHEMA: "asyncResult", CONSTANTS.COMMAND_NAME: command_name, CONSTANTS.INTERFACE_NAME: interface_name,
CONSTANTS.COMMAND_REQUEST_ID: request_id, CONSTANTS.COMMAND_STATUS_CODE: status_code}
firstProp = True
for prop in systemProperties:
if not firstProp:
topic += "&"
else:
firstProp = False
topic += _quote(prop, "$#%/\"\'") + '=' + \
str(systemProperties[prop])
return self._sendCommon(topic, "\"{}\"".format(data))
def sendState(self, data):
return self.sendTelemetry(data)
def sendEvent(self, data):
return self.sendTelemetry(data)
def sendProperty(self, data):
LOG_IOTC("- iotc :: sendProperty :: " +
data, IOTLogLevel.IOTC_LOGGING_ALL)
topic = '$iothub/twin/PATCH/properties/reported/?$rid={}'.format(
int(time.time()))
return self._sendCommon(topic, data)
def disconnect(self):
if not self.isConnected():
return
LOG_IOTC("- iotc :: disconnect :: ", IOTLogLevel.IOTC_LOGGING_ALL)
self._mqttConnected = False
if mqtt != None:
self._mqtts.disconnect()
self._mqtts.loop_stop()
else:
MAKE_CALLBACK(self, "ConnectionStatus", None, None, None, 0)
return 0
def on(self, eventName, callback):
self._events[eventName] = callback
return 0
def isConnected(self):
return self._mqttConnected
def doNext(self, idleTime=1):
if not self.isConnected():
return
if mqtt == None:
try: # try non-blocking
self._mqtts.check_msg()
time.sleep(idleTime)
except: # non-blocking wasn't implemented
self._mqtts.wait_msg()
else: # paho
time.sleep(idleTime)

21
src/iotc/baltimore.pem Executable file
Просмотреть файл

@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDdzCCAl+gAwIBAgIEAgAAuTANBgkqhkiG9w0BAQUFADBaMQswCQYDVQQGEwJJ
RTESMBAGA1UEChMJQmFsdGltb3JlMRMwEQYDVQQLEwpDeWJlclRydXN0MSIwIAYD
VQQDExlCYWx0aW1vcmUgQ3liZXJUcnVzdCBSb290MB4XDTAwMDUxMjE4NDYwMFoX
DTI1MDUxMjIzNTkwMFowWjELMAkGA1UEBhMCSUUxEjAQBgNVBAoTCUJhbHRpbW9y
ZTETMBEGA1UECxMKQ3liZXJUcnVzdDEiMCAGA1UEAxMZQmFsdGltb3JlIEN5YmVy
VHJ1c3QgUm9vdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKMEuyKr
mD1X6CZymrV51Cni4eiVgLGw41uOKymaZN+hXe2wCQVt2yguzmKiYv60iNoS6zjr
IZ3AQSsBUnuId9Mcj8e6uYi1agnnc+gRQKfRzMpijS3ljwumUNKoUMMo6vWrJYeK
mpYcqWe4PwzV9/lSEy/CG9VwcPCPwBLKBsua4dnKM3p31vjsufFoREJIE9LAwqSu
XmD+tqYF/LTdB1kC1FkYmGP1pWPgkAx9XbIGevOF6uvUA65ehD5f/xXtabz5OTZy
dc93Uk3zyZAsuT3lySNTPx8kmCFcB5kpvcY67Oduhjprl3RjM71oGDHweI12v/ye
jl0qhqdNkNwnGjkCAwEAAaNFMEMwHQYDVR0OBBYEFOWdWTCCR1jMrPoIVDaGezq1
BE3wMBIGA1UdEwEB/wQIMAYBAf8CAQMwDgYDVR0PAQH/BAQDAgEGMA0GCSqGSIb3
DQEBBQUAA4IBAQCFDF2O5G9RaEIFoN27TyclhAO992T9Ldcw46QQF+vaKSm2eT92
9hkTI7gQCvlYpNRhcL0EYWoSihfVCr3FvDB81ukMJY2GQE/szKN+OMY3EU/t3Wgx
jkzSswF07r51XgdIGn9w/xZchMB5hbgF/X++ZRGjD8ACtPhSNzkE1akxehi/oCr0
Epn3o0WC4zxe9Z2etciefC7IpJ5OCBRLbf1wbWsaY71k5h+3zvDyny67G7fyUIhz
ksLi4xaNmjICq44Y3ekQEe5+NauQrz4wlHrQMz2nZQ/1/I6eYs9HRCwBXbsdtTLS
R9I4LtD+gdwyah617jzV/OeBHRnDJELqYzmp
-----END CERTIFICATE-----

25
src/iotc/constants.py Normal file
Просмотреть файл

@ -0,0 +1,25 @@
class CONSTANTS:
DEVICE_CAPABILITIES_MESSAGE = "devices/{}/messages/events/%24.ifid=urn%3aazureiot%3aModelDiscovery%3aModelInformation%3a1&%24.ifname=urn_azureiot_ModelDiscovery_ModelInformation&%24.schema=modelInformation&%24.ct=application%2fjson"
DEVICETWIN_PATCH_MESSAGE="$iothub/twin/PATCH/properties/reported/?$rid={}"
SDK_VERSION="iotc-python-device-client/0.3.5"
SDK_VENDOR="Microsoft Corporation"
MODEL_INFORMATION_KEY="modelInformation"
INTERFACES_KEY="interfaces"
CAPABILITY_MODEL_KEY="capabilityModelId"
DPS_API_VERSION="2019-03-31"
HUB_API_VERSION="2019-07-01-preview"
# Interfaces
INTERFACE_NAME="$.ifname"
INTERFACE_ID="$.ifid"
CONTENT_TYPE="$.ct"
CONTENT_ENCODING="$.ce"
# Telemetry
TELEMETRY_SCHEMA="$.schema"
# Commands
COMMAND_SCHEMA="iothub-message-schema"
COMMAND_NAME="iothub-command-name"
COMMAND_REQUEST_ID="iothub-command-request-id"
COMMAND_STATUS_CODE="iothub-command-statuscode"

141
test/basics.py Executable file
Просмотреть файл

@ -0,0 +1,141 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license.
import sys
import time
import hashlib
import hmac
import base64
file_path = __file__[:len(__file__) - len("basics.py")]
# Update /usr/local/lib/python2.7/site-packages/iotc/__init__.py ?
if 'dont_write_bytecode' in dir(sys):
import os
sys.dont_write_bytecode = True
gIsMicroPython = False
else: # micropython
gIsMicroPython = True
file_path = file_path[:len(file_path) - 1] if file_path[len(file_path) - 1:] == "b" else file_path
sys.path.append(file_path + "../src")
import iotc
from iotc import IOTConnectType, IOTLogLevel
import json
pytest_run = False
def test_LOG_IOTC():
global pytest_run
pytest_run = True
assert iotc.LOG_IOTC("LOGME") == 0
def CALLBACK_(info):
if info.getPayload() == "{\"number\":1}":
if info.getTag() == "TAG":
if info.getStatusCode() == 0:
if info.getEventName() == "TEST":
info.setResponse(200, "DONE")
return 0
return 1
def test_MAKE_CALLBACK():
client = { "_events" : { "TEST" : CALLBACK_ } }
ret = iotc.MAKE_CALLBACK(client, "TEST", "{\"number\":1}", "TAG", 0)
assert ret != 0
assert ret.getResponseCode() == 200
assert ret.getResponseMessage() == "DONE"
def test_quote():
assert iotc._quote("abc+\\0123\"?%456@def", '~()*!.') == "abc%2B%5C0123%22%3F%25456%40def"
with open(file_path + "config.json", "r") as fh:
configText = fh.read()
config = json.loads(configText)
assert config["scopeId"] != None and config["masterKey"] != None and config["hostName"] != None
testCounter = 0
def compute_key(secret, regId):
global gIsMicroPython
try:
secret = base64.b64decode(secret)
except:
print("ERROR: broken base64 secret => `" + secret + "`")
sys.exit()
if gIsMicroPython == False:
return base64.b64encode(hmac.new(secret, msg=regId.encode('utf8'), digestmod=hashlib.sha256).digest())
else:
return base64.b64encode(hmac.new(secret, msg=regId.encode('utf8'), digestmod=hashlib._sha256.sha256).digest())
def test_lifetime():
global testCounter
if config["TEST_ID"] == 2:
keyORcert = config["cert"]
else:
keyORcert = compute_key(config["masterKey"], "dev1")
device = iotc.Device(config["scopeId"], keyORcert, "dev1", config["TEST_ID"]) # 1 / 2 (symm / x509)
if "modelData" in config:
assert device.setModelData(config["modelData"]) == 0
device.setExitOnError(True)
def onconnect(info):
global testCounter
if device.isConnected():
assert info.getStatusCode() == 0
testCounter += 1
assert device.sendTelemetry("{\"temp\":22}", {"iothub-creation-time-utc": time.time()}) == 0
testCounter += 1
assert device.sendProperty("{\"dieNumber\":3}") == 0
else:
print ("- ", "done")
def onmessagesent(info):
global testCounter
print ("onmessagesent", info.getTag(), info.getPayload())
testCounter += 1
def oncommand(info):
global testCounter
print("oncommand", info.getTag(), info.getPayload())
assert info.getTag() == "echo" or info.getTag() == "countdown"
testCounter += 1
allSettings = {}
def onsettingsupdated(info):
global testCounter
testCounter += 1
assert '$version' in json.loads(info.getPayload()) # each setting has a version
allSettings[info.getTag()] = True
print("onsettingsupdated", info.getTag(), info.getPayload())
assert device.on("ConnectionStatus", onconnect) == 0
assert device.on("MessageSent", onmessagesent) == 0
assert device.on("Command", oncommand) == 0
assert device.on("SettingsUpdated", onsettingsupdated) == 0
assert device.setLogLevel(IOTLogLevel.IOTC_LOGGING_ALL) == 0
assert device.setDPSEndpoint(config["hostName"]) == 0
assert device.connect() == 0
showCommandWarning = False
MAX_EXPECTED_TEST_COUNTER = 11
while device.isConnected() and testCounter < MAX_EXPECTED_TEST_COUNTER:
if showCommandWarning == False and testCounter >= MAX_EXPECTED_TEST_COUNTER - 2:
showCommandWarning = True
print("now, send a command from central")
device.doNext()
assert device.disconnect() == 0
if pytest_run == False:
test_LOG_IOTC()
test_MAKE_CALLBACK()
test_quote()
test_lifetime()

127
test/cacheHost.py Executable file
Просмотреть файл

@ -0,0 +1,127 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license.
import sys
import time
import hashlib
import hmac
import base64
file_path = __file__[:len(__file__) - len("cacheHost.py")]
# Update /usr/local/lib/python2.7/site-packages/iotc/__init__.py ?
if 'dont_write_bytecode' in dir(sys):
import os
sys.dont_write_bytecode = True
gIsMicroPython = False
else: # micropython
gIsMicroPython = True
file_path = file_path[:len(file_path) - 1] if file_path[len(file_path) - 1:] == "b" else file_path
sys.path.append(file_path + "../src")
import iotc
from iotc import IOTConnectType, IOTLogLevel
import json
pytest_run = False
def test_LOG_IOTC():
global pytest_run
pytest_run = True
assert iotc.LOG_IOTC("LOGME") == 0
def CALLBACK_(info):
if info.getPayload() == "{\"number\":1}":
if info.getTag() == "TAG":
if info.getStatusCode() == 0:
if info.getEventName() == "TEST":
info.setResponse(200, "DONE")
return 0
return 1
def test_MAKE_CALLBACK():
client = { "_events" : { "TEST" : CALLBACK_ } }
ret = iotc.MAKE_CALLBACK(client, "TEST", "{\"number\":1}", "TAG", 0)
assert ret != 0
assert ret.getResponseCode() == 200
assert ret.getResponseMessage() == "DONE"
def test_quote():
assert iotc._quote("abc+\\0123\"?%456@def", '~()*!.') == "abc%2B%5C0123%22%3F%25456%40def"
with open(file_path + "config.json", "r") as fh:
configText = fh.read()
config = json.loads(configText)
assert config["scopeId"] != None and config["masterKey"] != None and config["hostName"] != None
testCounter = 0
eventRaised = False
device = None
def compute_key(secret, regId):
global gIsMicroPython
try:
secret = base64.b64decode(secret)
except:
print("ERROR: broken base64 secret => `" + secret + "`")
sys.exit()
if gIsMicroPython == False:
return base64.b64encode(hmac.new(secret, msg=regId.encode('utf8'), digestmod=hashlib.sha256).digest())
else:
return base64.b64encode(hmac.new(secret, msg=regId.encode('utf8'), digestmod=hashlib._sha256.sha256).digest())
def test_lifetime():
global testCounter
global device
global eventRaised
if config["TEST_ID"] == 2:
keyORcert = config["cert"]
else:
keyORcert = compute_key(config["masterKey"], "dev1")
device = iotc.Device(config["scopeId"], keyORcert, "dev1", config["TEST_ID"]) # 1 / 2 (symm / x509)
if "modelData" in config:
assert device.setModelData(config["modelData"]) == 0
device.setExitOnError(True)
hostName = None
def onconnect(info):
global testCounter
global eventRaised
print "PASS = " + str(testCounter)
if testCounter < 2:
eventRaised = True
testCounter = testCounter + 1
assert device.on("ConnectionStatus", onconnect) == 0
# assert device.setLogLevel(IOTLogLevel.IOTC_LOGGING_ALL) == 0
assert device.setDPSEndpoint(config["hostName"]) == 0
assert device.connect() == 0
hostName = device.getHostName()
showCommandWarning = False
MAX_EXPECTED_TEST_COUNTER = 3
while testCounter < MAX_EXPECTED_TEST_COUNTER:
if eventRaised == True:
eventRaised = False
if testCounter == 1:
print "DISCONNECTS"
assert device.disconnect() == 0
else:
print "CONNECTS"
device = iotc.Device(config["scopeId"], keyORcert, "dev1", config["TEST_ID"]) # 1 / 2 (symm / x509)
assert device.on("ConnectionStatus", onconnect) == 0
# assert device.setLogLevel(IOTLogLevel.IOTC_LOGGING_ALL) == 0
assert device.setDPSEndpoint(config["hostName"]) == 0
device.connect(hostName)
device.doNext()
assert device.disconnect() == 0
if pytest_run == False:
test_LOG_IOTC()
test_MAKE_CALLBACK()
test_quote()
test_lifetime()

138
test/multiDevice.py Executable file
Просмотреть файл

@ -0,0 +1,138 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license.
import sys
import time
import hashlib
import hmac
import base64
file_path = __file__[:len(__file__) - len("multiDevice.py")]
# Update /usr/local/lib/python2.7/site-packages/iotc/__init__.py ?
if 'dont_write_bytecode' in dir(sys):
import os
sys.dont_write_bytecode = True
gIsMicroPython = False
else: # micropython
gIsMicroPython = True
file_path = file_path[:len(file_path) - 1] if file_path[len(file_path) - 1:] == "b" else file_path
sys.path.append(file_path + "../src")
import iotc
from iotc import IOTConnectType, IOTLogLevel
import json
pytest_run = False
def test_LOG_IOTC():
global pytest_run
pytest_run = True
assert iotc.LOG_IOTC("LOGME") == 0
def CALLBACK_(info):
if info.getPayload() == "{\"number\":1}":
if info.getTag() == "TAG":
if info.getStatusCode() == 0:
if info.getEventName() == "TEST":
info.setResponse(200, "DONE")
return 0
return 1
def test_MAKE_CALLBACK():
client = { "_events" : { "TEST" : CALLBACK_ } }
ret = iotc.MAKE_CALLBACK(client, "TEST", "{\"number\":1}", "TAG", 0)
assert ret != 0
assert ret.getResponseCode() == 200
assert ret.getResponseMessage() == "DONE"
def test_quote():
assert iotc._quote("abc+\\0123\"?%456@def", '~()*!.') == "abc%2B%5C0123%22%3F%25456%40def"
with open(file_path + "config.json", "r") as fh:
configText = fh.read()
config = json.loads(configText)
assert config["scopeId"] != None and config["masterKey"] != None and config["hostName"] != None
testCounter = 0
devices = []
def compute_key(secret, regId):
global gIsMicroPython
try:
secret = base64.b64decode(secret)
except:
print("ERROR: broken base64 secret => `" + secret + "`")
sys.exit()
if gIsMicroPython == False:
return base64.b64encode(hmac.new(secret, msg=regId.encode('utf8'), digestmod=hashlib.sha256).digest())
else:
return base64.b64encode(hmac.new(secret, msg=regId.encode('utf8'), digestmod=hashlib._sha256.sha256).digest())
def test_lifetime(id):
deviceId = "dev" + str(id + 1)
devices.append(iotc.Device(config["scopeId"], compute_key(config["masterKey"], deviceId), deviceId, config["TEST_ID"])) # 1 / 2 (symm / x509)
if "modelData" in config:
assert devices[id].setModelData(config["modelData"]) == 0
devices[id].setExitOnError(True)
def onconnect(info):
global testCounter
if devices[id].isConnected():
assert info.getStatusCode() == 0
testCounter += 1
assert devices[id].sendTelemetry("{\"temp\":22}", {"iothub-creation-time-utc": time.time()}) == 0
testCounter += 1
assert devices[id].sendProperty("{\"dieNumber\":3}") == 0
else:
print ("- ", "done", "device", deviceId)
def onmessagesent(info):
global testCounter
print ("onmessagesent", deviceId, info.getTag(), info.getPayload())
testCounter += 1
def oncommand(info):
global testCounter
print("oncommand", deviceId, info.getTag(), info.getPayload())
assert info.getTag() == "echo"
testCounter += 1
allSettings = {}
def onsettingsupdated(info):
global testCounter
testCounter += 1
assert '$version' in json.loads(info.getPayload()) # each setting has a version
assert not info.getTag() + str(id) in allSettings # no double event
allSettings[info.getTag() + str(id)] = True
print("onsettingsupdated", deviceId, info.getTag(), info.getPayload())
assert devices[id].on("ConnectionStatus", onconnect) == 0
assert devices[id].on("MessageSent", onmessagesent) == 0
assert devices[id].on("Command", oncommand) == 0
assert devices[id].on("SettingsUpdated", onsettingsupdated) == 0
assert devices[id].setLogLevel(IOTLogLevel.IOTC_LOGGING_ALL) == 0
assert devices[id].setDPSEndpoint(config["hostName"]) == 0
assert devices[id].connect() == 0
if pytest_run == False:
test_LOG_IOTC()
test_MAKE_CALLBACK()
test_quote()
for i in range(3):
test_lifetime(i)
while testCounter < 30:
for i in range(3):
assert devices[i].isConnected()
devices[i].doNext()
for i in range(3):
assert devices[i].disconnect() == 0

82
test/stress.py Executable file
Просмотреть файл

@ -0,0 +1,82 @@
import os
import sys
file_path = __file__[:len(__file__) - len("basics.py")]
file_path = file_path[:len(file_path) - 1] if file_path[len(file_path) - 1:] == "b" else file_path
sys.path.append(os.path.join(file_path, "..", "src"))
import iotc
from iotc import IOTConnectType, IOTLogLevel, IOTQosLevel
from datetime import datetime, timezone
from threading import Timer, Thread, Event
class PT():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
deviceId = "<Add device Id here>"
scopeId = "<Add scope Id here>"
deviceKey = "<Add device Key here>"
# see iotc.Device documentation above for x509 argument sample
iotc = iotc.Device(scopeId, deviceKey, deviceId, IOTConnectType.IOTC_CONNECT_SYMM_KEY)
iotc.setLogLevel(IOTLogLevel.IOTC_LOGGING_API_ONLY)
iotc.setTokenExpiration(21600 * 12)
# set QoS level to guarantee delivery at least once
iotc.setQosLevel(IOTQosLevel.IOTC_QOS_AT_LEAST_ONCE)
gCanSend = False
sent = 0
confirmed = 0
enableLogging = False
def log(s):
if enableLogging:
f=open("python-iotc.txt","a+")
f.write(datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S') + " - " + s + "\n")
f.close()
def onconnect(info):
global gCanSend
log("[onconnect] => status:" + str(info.getStatusCode()))
if info.getStatusCode() == 0:
if iotc.isConnected():
gCanSend = True
def onmessagesent(info):
global confirmed
confirmed += 1
log("[onmessagesent] => " + str(info.getPayload()))
def oncommand(info):
log("command name:", info.getTag())
log("command value: ", info.getPayload())
def onsettingsupdated(info):
log("setting name:", info.getTag())
log("setting value: ", info.getPayload())
def send():
global sent
sent += 1
print("Sending telemetry.. sent:{}, confirmed: {}".format(sent, confirmed))
iotc.sendTelemetry("{\"index\": " + str(sent) + "}")
iotc.on("ConnectionStatus", onconnect)
iotc.on("MessageSent", onmessagesent)
iotc.on("Command", oncommand)
iotc.on("SettingsUpdated", onsettingsupdated)
iotc.connect()
t = PT(5, send)
t.start()