зеркало из
1
0
Форкнуть 0
This commit is contained in:
olivakar 2024-01-29 11:45:19 -08:00 коммит произвёл GitHub
Родитель d559dce72f
Коммит 62690bd999
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
25 изменённых файлов: 386 добавлений и 266 удалений

Просмотреть файл

@ -4,7 +4,7 @@ repos:
hooks:
- id: black
language_version: python3
- repo: https://gitlab.com/pycqa/flake8
- repo: https://github.com/pycqa/flake8
rev: 3.9.1 # Use the ref you want to point at
hooks:
- id: flake8

Просмотреть файл

@ -21,7 +21,7 @@ The Azure IoT Device library is available on PyPI:
pip install azure-iot-device
```
Python 3.6 or higher is required in order to use the library
Python 3.7 or higher is required in order to use the library
## Using the library
API documentation for this package is available via [**Microsoft Docs**](https://docs.microsoft.com/python/api/azure-iot-device/azure.iot.device?view=azure-python).

Просмотреть файл

@ -6,7 +6,7 @@
import logging
import ssl
import requests
import requests # type: ignore
from . import transport_exceptions as exceptions
from .pipeline import pipeline_thread

Просмотреть файл

@ -0,0 +1,41 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from typing import Any, Union, Dict, List, Tuple, Callable, Awaitable, TypeVar
from typing_extensions import TypedDict, ParamSpec
_P = ParamSpec("_P")
_R = TypeVar("_R")
FunctionOrCoroutine = Union[Callable[_P, _R], Callable[_P, Awaitable[_R]]]
# typing does not support recursion, so we must use forward references here (PEP484)
JSONSerializable = Union[
Dict[str, "JSONSerializable"],
List["JSONSerializable"],
Tuple["JSONSerializable", ...],
str,
int,
float,
bool,
None,
]
# TODO: verify that the JSON specification requires str as keys in dict. Not sure why that's defined here.
Twin = Dict[str, Dict[str, JSONSerializable]]
TwinPatch = Dict[str, JSONSerializable]
class StorageInfo(TypedDict):
correlationId: str
hostName: str
containerName: str
blobName: str
sasToken: str
ProvisioningPayload = Union[Dict[str, Any], str, int]

Просмотреть файл

@ -5,7 +5,7 @@
# --------------------------------------------------------------------------
"""This module contains abstract classes for the various clients of the Azure IoT Hub Device SDK
"""
from __future__ import annotations # Needed for annotation bug < 3.10
import abc
import logging
import threading
@ -17,14 +17,20 @@ from .pipeline import constant as pipeline_constant
from azure.iot.device.common.auth import connection_string as cs
from azure.iot.device.common.auth import sastoken as st
from azure.iot.device.iothub import client_event
from azure.iot.device.iothub.models import Message, MethodRequest, MethodResponse
from azure.iot.device.common.models import X509
from azure.iot.device import exceptions
from azure.iot.device.common import auth, handle_exceptions
from . import edge_hsm
from .pipeline import MQTTPipeline, HTTPPipeline
from typing_extensions import Self
from azure.iot.device.custom_typing import FunctionOrCoroutine, Twin, TwinPatch
from typing import Any, Dict, List, Optional, Union
logger = logging.getLogger(__name__)
def _validate_kwargs(exclude=[], **kwargs):
def _validate_kwargs(exclude: Optional[List[str]] = [], **kwargs) -> None:
"""Helper function to validate user provided kwargs.
Raises TypeError if an invalid option has been provided"""
valid_kwargs = [
@ -43,11 +49,11 @@ def _validate_kwargs(exclude=[], **kwargs):
]
for kwarg in kwargs:
if (kwarg not in valid_kwargs) or (kwarg in exclude):
if (kwarg not in valid_kwargs) or (exclude is not None and kwarg in exclude):
raise TypeError("Unsupported keyword argument: '{}'".format(kwarg))
def _get_config_kwargs(**kwargs):
def _get_config_kwargs(**kwargs) -> Dict[str, Any]:
"""Get the subset of kwargs which pertain the config object"""
valid_config_kwargs = [
"server_verification_cert",
@ -70,7 +76,7 @@ def _get_config_kwargs(**kwargs):
return config_kwargs
def _form_sas_uri(hostname, device_id, module_id=None):
def _form_sas_uri(hostname: str, device_id: str, module_id: Optional[str] = None) -> str:
if module_id:
return "{hostname}/devices/{device_id}/modules/{module_id}".format(
hostname=hostname, device_id=device_id, module_id=module_id
@ -79,7 +85,7 @@ def _form_sas_uri(hostname, device_id, module_id=None):
return "{hostname}/devices/{device_id}".format(hostname=hostname, device_id=device_id)
def _extract_sas_uri_values(uri):
def _extract_sas_uri_values(uri: str) -> Dict[str, Any]:
d = {}
items = uri.split("/")
if len(items) != 3 and len(items) != 5:
@ -93,7 +99,7 @@ def _extract_sas_uri_values(uri):
try:
d["module_id"] = items[4]
except IndexError:
d["module_id"] = None
d["module_id"] = ""
return d
@ -108,7 +114,7 @@ class AbstractIoTHubClient(abc.ABC):
This class needs to be extended for specific clients.
"""
def __init__(self, mqtt_pipeline, http_pipeline):
def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline) -> None:
"""Initializer for a generic client.
:param mqtt_pipeline: The pipeline used to connect to the IoTHub endpoint.
@ -122,9 +128,10 @@ class AbstractIoTHubClient(abc.ABC):
self._receive_type = RECEIVE_TYPE_NONE_SET
self._client_lock = threading.Lock()
def _on_connected(self):
def _on_connected(self) -> None:
"""Helper handler that is called upon an iothub pipeline connect"""
logger.info("Connection State - Connected")
if self._inbox_manager is not None:
client_event_inbox = self._inbox_manager.get_client_event_inbox()
# Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it
if self._handler_manager.handling_client_events:
@ -133,9 +140,10 @@ class AbstractIoTHubClient(abc.ABC):
# Ensure that all handlers are running now that connection is re-established.
self._handler_manager.ensure_running()
def _on_disconnected(self):
def _on_disconnected(self) -> None:
"""Helper handler that is called upon an iothub pipeline disconnect"""
logger.info("Connection State - Disconnected")
if self._inbox_manager is not None:
client_event_inbox = self._inbox_manager.get_client_event_inbox()
# Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it
if self._handler_manager.handling_client_events:
@ -146,25 +154,27 @@ class AbstractIoTHubClient(abc.ABC):
self._inbox_manager.clear_all_method_requests()
logger.info("Cleared all pending method requests due to disconnect")
def _on_new_sastoken_required(self):
def _on_new_sastoken_required(self) -> None:
"""Helper handler that is called upon the iothub pipeline needing new SAS token"""
logger.info("New SasToken required from user")
if self._inbox_manager is not None:
client_event_inbox = self._inbox_manager.get_client_event_inbox()
# Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it
if self._handler_manager.handling_client_events:
event = client_event.ClientEvent(client_event.NEW_SASTOKEN_REQUIRED)
client_event_inbox.put(event)
def _on_background_exception(self, e):
def _on_background_exception(self, e: Exception) -> None:
"""Helper handler that is called upon an iothub pipeline background exception"""
handle_exceptions.handle_background_exception(e)
if self._inbox_manager is not None:
client_event_inbox = self._inbox_manager.get_client_event_inbox()
# Only add a ClientEvent to the inbox if the Handler Manager is capable of dealing with it
if self._handler_manager.handling_client_events:
event = client_event.ClientEvent(client_event.BACKGROUND_EXCEPTION, e)
client_event_inbox.put(event)
def _check_receive_mode_is_api(self):
def _check_receive_mode_is_api(self) -> None:
"""Call this function first in EVERY receive API"""
with self._client_lock:
if self._receive_type is RECEIVE_TYPE_NONE_SET:
@ -177,13 +187,14 @@ class AbstractIoTHubClient(abc.ABC):
else:
pass
def _check_receive_mode_is_handler(self):
def _check_receive_mode_is_handler(self) -> None:
"""Call this function first in EVERY handler setter"""
with self._client_lock:
if self._receive_type is RECEIVE_TYPE_NONE_SET:
# Lock the client to ONLY use receive handlers (no APIs)
self._receive_type = RECEIVE_TYPE_HANDLER
# Set the inbox manager to use unified msg receives
if self._inbox_manager is not None:
self._inbox_manager.use_unified_msg_mode = True
elif self._receive_type is RECEIVE_TYPE_API:
raise exceptions.ClientError(
@ -192,7 +203,7 @@ class AbstractIoTHubClient(abc.ABC):
else:
pass
def _replace_user_supplied_sastoken(self, sastoken_str):
def _replace_user_supplied_sastoken(self, sastoken_str: str) -> None:
"""
Replaces the pipeline's NonRenewableSasToken with a new one based on a provided
sastoken string. Also does validation.
@ -220,7 +231,7 @@ class AbstractIoTHubClient(abc.ABC):
raise ValueError("Provided SasToken is for a device")
if self._mqtt_pipeline.pipeline_configuration.device_id != vals["device_id"]:
raise ValueError("Provided SasToken does not match existing device id")
if self._mqtt_pipeline.pipeline_configuration.module_id != vals["module_id"]:
if vals["module_id"] != "" and self._mqtt_pipeline.pipeline_configuration.module_id != vals["module_id"]:
raise ValueError("Provided SasToken does not match existing module id")
if self._mqtt_pipeline.pipeline_configuration.hostname != vals["hostname"]:
raise ValueError("Provided SasToken does not match existing hostname")
@ -232,12 +243,17 @@ class AbstractIoTHubClient(abc.ABC):
self._mqtt_pipeline.pipeline_configuration.sastoken = new_token_o
@abc.abstractmethod
def _generic_receive_handler_setter(self, handler_name, feature_name, new_handler):
def _generic_receive_handler_setter(
self,
handler_name: str,
feature_name: str,
new_handler: Optional[FunctionOrCoroutine[[Any], Any]],
) -> None:
# Will be implemented differently in child classes, but define here for static analysis
pass
@classmethod
def create_from_connection_string(cls, connection_string, **kwargs):
def create_from_connection_string(cls, connection_string: str, **kwargs) -> Self:
"""
Instantiate the client from a IoTHub device or module connection string.
@ -281,18 +297,18 @@ class AbstractIoTHubClient(abc.ABC):
_validate_kwargs(exclude=excluded_kwargs, **kwargs)
# Create SasToken
connection_string = cs.ConnectionString(connection_string)
if connection_string.get(cs.X509) is not None:
connection_string_dict = cs.ConnectionString(connection_string)
if connection_string_dict.get(cs.X509) is not None:
raise ValueError(
"Use the .create_from_x509_certificate() method instead when using X509 certificates"
)
uri = _form_sas_uri(
hostname=connection_string[cs.HOST_NAME],
device_id=connection_string[cs.DEVICE_ID],
module_id=connection_string.get(cs.MODULE_ID),
hostname=connection_string_dict[cs.HOST_NAME],
device_id=connection_string_dict[cs.DEVICE_ID],
module_id=connection_string_dict.get(cs.MODULE_ID),
)
signing_mechanism = auth.SymmetricKeySigningMechanism(
key=connection_string[cs.SHARED_ACCESS_KEY]
key=connection_string_dict[cs.SHARED_ACCESS_KEY]
)
token_ttl = kwargs.get("sastoken_ttl", 3600)
try:
@ -304,12 +320,12 @@ class AbstractIoTHubClient(abc.ABC):
# Pipeline Config setup
config_kwargs = _get_config_kwargs(**kwargs)
pipeline_configuration = pipeline.IoTHubPipelineConfig(
device_id=connection_string[cs.DEVICE_ID],
module_id=connection_string.get(cs.MODULE_ID),
hostname=connection_string[cs.HOST_NAME],
gateway_hostname=connection_string.get(cs.GATEWAY_HOST_NAME),
device_id=connection_string_dict[cs.DEVICE_ID],
module_id=connection_string_dict.get(cs.MODULE_ID),
hostname=connection_string_dict[cs.HOST_NAME],
gateway_hostname=connection_string_dict.get(cs.GATEWAY_HOST_NAME),
sastoken=sastoken,
**config_kwargs
**config_kwargs,
)
if cls.__name__ == "IoTHubDeviceClient":
pipeline_configuration.blob_upload = True
@ -321,7 +337,7 @@ class AbstractIoTHubClient(abc.ABC):
return cls(mqtt_pipeline, http_pipeline)
@classmethod
def create_from_sastoken(cls, sastoken, **kwargs):
def create_from_sastoken(cls, sastoken: str, **kwargs: Dict[str, Any]) -> Self:
"""Instantiate the client from a pre-created SAS Token string
:param str sastoken: The SAS Token string
@ -381,7 +397,7 @@ class AbstractIoTHubClient(abc.ABC):
module_id=vals["module_id"],
hostname=vals["hostname"],
sastoken=sastoken_o,
**config_kwargs
**config_kwargs,
)
if cls.__name__ == "IoTHubDeviceClient":
pipeline_configuration.blob_upload = True # Blob Upload is a feature on Device Clients
@ -393,66 +409,70 @@ class AbstractIoTHubClient(abc.ABC):
return cls(mqtt_pipeline, http_pipeline)
@abc.abstractmethod
def shutdown(self):
def shutdown(self) -> None:
pass
@abc.abstractmethod
def connect(self):
def connect(self) -> None:
pass
@abc.abstractmethod
def disconnect(self):
def disconnect(self) -> None:
pass
@abc.abstractmethod
def update_sastoken(self, sastoken):
def update_sastoken(self, sastoken: str) -> None:
pass
@abc.abstractmethod
def send_message(self, message):
def send_message(self, message: Union[Message, str]) -> None:
pass
@abc.abstractmethod
def receive_method_request(self, method_name=None):
def receive_method_request(self, method_name: Optional[str] = None) -> None:
pass
@abc.abstractmethod
def send_method_response(self, method_request, payload, status):
def send_method_response(
self, method_response: MethodResponse
) -> None:
pass
@abc.abstractmethod
def get_twin(self):
def get_twin(self) -> Twin:
pass
@abc.abstractmethod
def patch_twin_reported_properties(self, reported_properties_patch):
def patch_twin_reported_properties(self, reported_properties_patch: TwinPatch) -> None:
pass
@abc.abstractmethod
def receive_twin_desired_properties_patch(self):
def receive_twin_desired_properties_patch(self) -> TwinPatch:
pass
@property
def connected(self):
def connected(self) -> bool:
"""
Read-only property to indicate if the transport is connected or not.
"""
return self._mqtt_pipeline.connected
@property
def on_connection_state_change(self):
def on_connection_state_change(self) -> FunctionOrCoroutine[[None], None]:
"""The handler function or coroutine that will be called when the connection state changes.
The function or coroutine definition should take no positional arguments.
"""
if self._handler_manager is not None:
return self._handler_manager.on_connection_state_change
@on_connection_state_change.setter
def on_connection_state_change(self, value):
def on_connection_state_change(self, value: FunctionOrCoroutine[[None], None]) -> None:
if self._handler_manager is not None:
self._handler_manager.on_connection_state_change = value
@property
def on_new_sastoken_required(self):
def on_new_sastoken_required(self) -> FunctionOrCoroutine[[None], None]:
"""The handler function or coroutine that will be called when the client requires a new
SAS token. This will happen approximately 2 minutes before the SAS Token expires.
On Windows platforms, if the lifespan exceeds approximately 49 days, a new token will
@ -466,31 +486,35 @@ class AbstractIoTHubClient(abc.ABC):
The function or coroutine definition should take no positional arguments.
"""
if self._handler_manager is not None:
return self._handler_manager.on_new_sastoken_required
@on_new_sastoken_required.setter
def on_new_sastoken_required(self, value):
def on_new_sastoken_required(self, value: FunctionOrCoroutine[[None], None]) -> None:
if self._handler_manager is not None:
self._handler_manager.on_new_sastoken_required = value
@property
def on_background_exception(self):
def on_background_exception(self) -> FunctionOrCoroutine[[Exception], None]:
"""The handler function or coroutine will be called when a background exception occurs.
The function or coroutine definition should take one positional argument (the exception
object)"""
if self._handler_manager is not None:
return self._handler_manager.on_background_exception
@on_background_exception.setter
def on_background_exception(self, value):
def on_background_exception(self, value: FunctionOrCoroutine[[Exception], None]) -> None:
if self._handler_manager is not None:
self._handler_manager.on_background_exception = value
@abc.abstractproperty
def on_message_received(self):
def on_message_received(self) -> FunctionOrCoroutine[[Message], None]:
# Defined below on AbstractIoTHubDeviceClient / AbstractIoTHubModuleClient
pass
@property
def on_method_request_received(self):
def on_method_request_received(self) -> FunctionOrCoroutine[[MethodRequest], None]:
"""The handler function or coroutine that will be called when a method request is received.
Remember to acknowledge the method request in your function or coroutine via use of the
@ -498,25 +522,29 @@ class AbstractIoTHubClient(abc.ABC):
The function or coroutine definition should take one positional argument (the
:class:`azure.iot.device.MethodRequest` object)"""
if self._handler_manager is not None:
return self._handler_manager.on_method_request_received
@on_method_request_received.setter
def on_method_request_received(self, value):
def on_method_request_received(self, value: FunctionOrCoroutine[[MethodRequest], None]) -> None:
self._generic_receive_handler_setter(
"on_method_request_received", pipeline_constant.METHODS, value
)
@property
def on_twin_desired_properties_patch_received(self):
def on_twin_desired_properties_patch_received(self) -> FunctionOrCoroutine[[TwinPatch], None]:
"""The handler function or coroutine that will be called when a twin desired properties
patch is received.
The function or coroutine definition should take one positional argument (the twin patch
in the form of a JSON dictionary object)"""
if self._handler_manager is not None:
return self._handler_manager.on_twin_desired_properties_patch_received
@on_twin_desired_properties_patch_received.setter
def on_twin_desired_properties_patch_received(self, value):
def on_twin_desired_properties_patch_received(
self, value: FunctionOrCoroutine[[TwinPatch], None]
):
self._generic_receive_handler_setter(
"on_twin_desired_properties_patch_received", pipeline_constant.TWIN_PATCHES, value
)
@ -524,7 +552,9 @@ class AbstractIoTHubClient(abc.ABC):
class AbstractIoTHubDeviceClient(AbstractIoTHubClient):
@classmethod
def create_from_x509_certificate(cls, x509, hostname, device_id, **kwargs):
def create_from_x509_certificate(
cls, x509: X509, hostname: str, device_id: str, **kwargs
) -> Self:
"""
Instantiate a client using X509 certificate authentication.
@ -586,7 +616,9 @@ class AbstractIoTHubDeviceClient(AbstractIoTHubClient):
return cls(mqtt_pipeline, http_pipeline)
@classmethod
def create_from_symmetric_key(cls, symmetric_key, hostname, device_id, **kwargs):
def create_from_symmetric_key(
cls, symmetric_key: str, hostname: str, device_id: str, **kwargs
) -> Self:
"""
Instantiate a client using symmetric key authentication.
@ -657,29 +689,30 @@ class AbstractIoTHubDeviceClient(AbstractIoTHubClient):
return cls(mqtt_pipeline, http_pipeline)
@abc.abstractmethod
def receive_message(self):
def receive_message(self) -> Message:
pass
@abc.abstractmethod
def get_storage_info_for_blob(self, blob_name):
def get_storage_info_for_blob(self, blob_name: str) -> Dict[str, Any]:
pass
@abc.abstractmethod
def notify_blob_upload_status(
self, correlation_id, is_success, status_code, status_description
):
self, correlation_id: str, is_success: bool, status_code: int, status_description: str
) -> None:
pass
@property
def on_message_received(self):
def on_message_received(self) -> FunctionOrCoroutine[[Message], None]:
"""The handler function or coroutine that will be called when a message is received.
The function or coroutine definition should take one positional argument (the
:class:`azure.iot.device.Message` object)"""
if self._handler_manager is not None:
return self._handler_manager.on_message_received
@on_message_received.setter
def on_message_received(self, value):
def on_message_received(self, value: FunctionOrCoroutine[[Message], None]):
self._generic_receive_handler_setter(
"on_message_received", pipeline_constant.C2D_MSG, value
)
@ -687,7 +720,7 @@ class AbstractIoTHubDeviceClient(AbstractIoTHubClient):
class AbstractIoTHubModuleClient(AbstractIoTHubClient):
@classmethod
def create_from_edge_environment(cls, **kwargs):
def create_from_edge_environment(cls, **kwargs) -> Self:
"""
Instantiate the client from the IoT Edge environment.
@ -794,11 +827,11 @@ class AbstractIoTHubModuleClient(AbstractIoTHubClient):
try:
sastoken = st.RenewableSasToken(uri, signing_mechanism, ttl=token_ttl)
except st.SasTokenError as e:
new_err = ValueError(
new_val_err = ValueError(
"Could not create a SasToken using the values provided, or in the Edge environment"
)
new_err.__cause__ = e
raise new_err
new_val_err.__cause__ = e
raise new_val_err
# Pipeline Config setup
config_kwargs = _get_config_kwargs(**kwargs)
@ -809,7 +842,7 @@ class AbstractIoTHubModuleClient(AbstractIoTHubClient):
gateway_hostname=gateway_hostname,
sastoken=sastoken,
server_verification_cert=server_verification_cert,
**config_kwargs
**config_kwargs,
)
pipeline_configuration.ensure_desired_properties = True
@ -824,7 +857,9 @@ class AbstractIoTHubModuleClient(AbstractIoTHubClient):
return cls(mqtt_pipeline, http_pipeline)
@classmethod
def create_from_x509_certificate(cls, x509, hostname, device_id, module_id, **kwargs):
def create_from_x509_certificate(
cls, x509: X509, hostname: str, device_id: str, module_id: str, **kwargs
) -> Self:
"""
Instantiate a client using X509 certificate authentication.
@ -885,27 +920,30 @@ class AbstractIoTHubModuleClient(AbstractIoTHubClient):
return cls(mqtt_pipeline, http_pipeline)
@abc.abstractmethod
def send_message_to_output(self, message, output_name):
def send_message_to_output(self, message: Union[Message, str], output_name: str) -> None:
pass
@abc.abstractmethod
def receive_message_on_input(self, input_name):
def receive_message_on_input(self, input_name: str) -> Message:
pass
@abc.abstractmethod
def invoke_method(self, method_params, device_id, module_id=None):
def invoke_method(
self, method_params: dict, device_id: str, module_id: Optional[str] = None
) -> None:
pass
@property
def on_message_received(self):
def on_message_received(self) -> FunctionOrCoroutine[[Message], Any]:
"""The handler function or coroutine that will be called when an input message is received.
The function definition or coroutine should take one positional argument (the
:class:`azure.iot.device.Message` object)"""
if self._handler_manager is not None:
return self._handler_manager.on_message_received
@on_message_received.setter
def on_message_received(self, value):
def on_message_received(self, value: FunctionOrCoroutine[[Message], Any]) -> None:
self._generic_receive_handler_setter(
"on_message_received", pipeline_constant.INPUT_MSG, value
)

Просмотреть файл

@ -6,7 +6,7 @@
"""This module contains user-facing asynchronous clients for the
Azure IoTHub Device SDK for Python.
"""
from __future__ import annotations # Needed for annotation bug < 3.10
import logging
import asyncio
import deprecation
@ -16,7 +16,7 @@ from azure.iot.device.iothub.abstract_clients import (
AbstractIoTHubDeviceClient,
AbstractIoTHubModuleClient,
)
from azure.iot.device.iothub.models import Message
from azure.iot.device.iothub.models import Message, MethodRequest, MethodResponse
from azure.iot.device.iothub.pipeline import constant
from azure.iot.device.iothub.pipeline import exceptions as pipeline_exceptions
from azure.iot.device import exceptions
@ -24,11 +24,14 @@ from azure.iot.device.iothub.inbox_manager import InboxManager
from .async_inbox import AsyncClientInbox
from . import async_handler_manager, loop_management
from azure.iot.device import constant as device_constant
from azure.iot.device.iothub.pipeline import MQTTPipeline, HTTPPipeline
from azure.iot.device.custom_typing import FunctionOrCoroutine, StorageInfo, Twin, TwinPatch
from typing import Any, Optional, Union
logger = logging.getLogger(__name__)
async def handle_result(callback):
async def handle_result(callback: FunctionOrCoroutine[[Any], None]):
try:
return await callback.completion()
except pipeline_exceptions.ConnectionDroppedError as e:
@ -91,7 +94,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
self._mqtt_pipeline.on_method_request_received = self._inbox_manager.route_method_request
self._mqtt_pipeline.on_twin_patch_received = self._inbox_manager.route_twin_patch
async def _enable_feature(self, feature_name):
async def _enable_feature(self, feature_name: str) -> None:
"""Enable an Azure IoT Hub feature
:param feature_name: The name of the feature to enable.
@ -111,7 +114,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
# This branch shouldn't be reached, but in case it is, log it
logger.info("Feature ({}) already enabled - skipping".format(feature_name))
async def _disable_feature(self, feature_name):
async def _disable_feature(self, feature_name: str) -> None:
"""Disable an Azure IoT Hub feature
:param feature_name: The name of the feature to enable.
@ -131,7 +134,9 @@ class GenericIoTHubClient(AbstractIoTHubClient):
# This branch shouldn't be reached, but in case it is, log it
logger.info("Feature ({}) already disabled - skipping".format(feature_name))
def _generic_receive_handler_setter(self, handler_name, feature_name, new_handler):
def _generic_receive_handler_setter(
self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[None], None]
) -> None:
"""Set a receive handler on the handler manager and enable the corresponding feature.
This is a synchronous call (yes, even though this is the async client), meaning that this
@ -163,7 +168,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
fut = asyncio.run_coroutine_threadsafe(self._disable_feature(feature_name), loop=loop)
fut.result()
async def shutdown(self):
async def shutdown(self) -> None:
"""Shut down the client for graceful exit.
Once this method is called, any attempts at further client calls will result in a
@ -207,7 +212,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
# capability for HTTP pipeline.
logger.info("Client shutdown complete")
async def connect(self):
async def connect(self) -> None:
"""Connects the client to an Azure IoT Hub or Azure IoT Edge Hub instance.
The destination is chosen based on the credentials passed via the auth_provider parameter
@ -232,7 +237,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
logger.info("Successfully connected to Hub")
async def disconnect(self):
async def disconnect(self) -> None:
"""Disconnect the client from the Azure IoT Hub or Azure IoT Edge Hub instance.
It is recommended that you make sure to call this coroutine when you are completely done
@ -277,7 +282,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
logger.info("Successfully disconnected from Hub")
async def update_sastoken(self, sastoken):
async def update_sastoken(self, sastoken: str) -> None:
"""
Update the client's SAS Token used for authentication, then reauthorizes the connection.
@ -316,7 +321,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
logger.info("Successfully reauthorized connection to Hub")
async def send_message(self, message):
async def send_message(self, message: Union[Message, str]) -> None:
"""Sends a message to the default events endpoint on the Azure IoT Hub or Azure IoT Edge Hub instance.
If the connection to the service has not previously been opened by a call to connect, this
@ -360,7 +365,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
current_version=device_constant.VERSION,
details="We recommend that you use the .on_method_request_received property to set a handler instead",
)
async def receive_method_request(self, method_name=None):
async def receive_method_request(self, method_name: Optional[str] = None) -> MethodRequest:
"""Receive a method request via the Azure IoT Hub or Azure IoT Edge Hub.
If no method request is yet available, will wait until it is available.
@ -384,7 +389,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
logger.info("Received method request")
return method_request
async def send_method_response(self, method_response):
async def send_method_response(self, method_response: MethodResponse) -> None:
"""Send a response to a method request via the Azure IoT Hub or Azure IoT Edge Hub.
If the connection to the service has not previously been opened by a call to connect, this
@ -419,7 +424,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
logger.info("Successfully sent method response to Hub")
async def get_twin(self):
async def get_twin(self) -> Twin:
"""
Gets the device or module twin from the Azure IoT Hub or Azure IoT Edge Hub service.
@ -452,7 +457,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
logger.info("Successfully retrieved twin")
return twin
async def patch_twin_reported_properties(self, reported_properties_patch):
async def patch_twin_reported_properties(self, reported_properties_patch: TwinPatch) -> None:
"""
Update reported properties with the Azure IoT Hub or Azure IoT Edge Hub service.
@ -495,7 +500,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
current_version=device_constant.VERSION,
details="We recommend that you use the .on_twin_desired_properties_patch_received property to set a handler instead",
)
async def receive_twin_desired_properties_patch(self):
async def receive_twin_desired_properties_patch(self) -> TwinPatch:
"""
Receive a desired property patch via the Azure IoT Hub or Azure IoT Edge Hub.
@ -519,7 +524,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient):
"""An asynchronous device client that connects to an Azure IoT Hub instance."""
def __init__(self, mqtt_pipeline, http_pipeline):
def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline):
"""Initializer for a IoTHubDeviceClient.
This initializer should not be called directly.
@ -536,7 +541,7 @@ class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient):
current_version=device_constant.VERSION,
details="We recommend that you use the .on_message_received property to set a handler instead",
)
async def receive_message(self):
async def receive_message(self) -> Message:
"""Receive a message that has been sent from the Azure IoT Hub.
If no message is yet available, will wait until an item is available.
@ -555,7 +560,7 @@ class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient):
logger.info("Message received")
return message
async def get_storage_info_for_blob(self, blob_name):
async def get_storage_info_for_blob(self, blob_name: str) -> StorageInfo:
"""Sends a POST request over HTTP to an IoTHub endpoint that will return information for uploading via the Azure Storage Account linked to the IoTHub your device is connected to.
:param str blob_name: The name in string format of the blob that will be uploaded using the storage API. This name will be used to generate the proper credentials for Storage, and needs to match what will be used with the Azure Storage SDK to perform the blob upload.
@ -573,8 +578,8 @@ class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient):
return storage_info
async def notify_blob_upload_status(
self, correlation_id, is_success, status_code, status_description
):
self, correlation_id: str, is_success: bool, status_code: int, status_description: str
) -> None:
"""When the upload is complete, the device sends a POST request to the IoT Hub endpoint with information on the status of an upload to blob attempt. This is used by IoT Hub to notify listening clients.
:param str correlation_id: Provided by IoT Hub on get_storage_info_for_blob request.
@ -601,7 +606,7 @@ class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient):
class IoTHubModuleClient(GenericIoTHubClient, AbstractIoTHubModuleClient):
"""An asynchronous module client that connects to an Azure IoT Hub or Azure IoT Edge instance."""
def __init__(self, mqtt_pipeline, http_pipeline):
def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline):
"""Initializer for a IoTHubModuleClient.
This initializer should not be called directly.
@ -613,7 +618,7 @@ class IoTHubModuleClient(GenericIoTHubClient, AbstractIoTHubModuleClient):
super().__init__(mqtt_pipeline=mqtt_pipeline, http_pipeline=http_pipeline)
self._mqtt_pipeline.on_input_message_received = self._inbox_manager.route_input_message
async def send_message_to_output(self, message, output_name):
async def send_message_to_output(self, message: Union[Message, str], output_name: str) -> None:
"""Sends an event/message to the given module output.
These are outgoing events and are meant to be "output events"
@ -664,7 +669,7 @@ class IoTHubModuleClient(GenericIoTHubClient, AbstractIoTHubModuleClient):
current_version=device_constant.VERSION,
details="We recommend that you use the .on_message_received property to set a handler instead",
)
async def receive_message_on_input(self, input_name):
async def receive_message_on_input(self, input_name: str) -> Message:
"""Receive an input message that has been sent from another Module to a specific input.
If no message is yet available, will wait until an item is available.
@ -685,7 +690,9 @@ class IoTHubModuleClient(GenericIoTHubClient, AbstractIoTHubModuleClient):
logger.info("Input message received on: " + input_name)
return message
async def invoke_method(self, method_params, device_id, module_id=None):
async def invoke_method(
self, method_params, device_id, module_id: Optional[str] = None
) -> MethodResponse:
"""Invoke a method from your client onto a device or module client, and receive the response to the method call.
:param dict method_params: Should contain a methodName (str), payload (str),

Просмотреть файл

@ -7,7 +7,7 @@
import logging
import json
import base64
import requests
import requests # type: ignore
import requests_unixsocket
import urllib
from azure.iot.device.common.auth.signing_mechanism import SigningMechanism

Просмотреть файл

@ -50,7 +50,7 @@ class Message(object):
self._iothub_interface_id = None
@property
def iothub_interface_id(self):
def iothub_interface_id(self) -> str:
return self._iothub_interface_id
def set_as_security_message(self):
@ -64,7 +64,7 @@ class Message(object):
def __str__(self):
return str(self.data)
def get_size(self):
def get_size(self) -> int:
total = 0
total = total + sum(
sys.getsizeof(v)

Просмотреть файл

@ -5,6 +5,9 @@
# --------------------------------------------------------------------------
"""This module contains classes related to direct method invocations.
"""
from typing import Optional
from typing_extensions import Self
from azure.iot.device.custom_typing import JSONSerializable
class MethodRequest(object):
@ -15,7 +18,7 @@ class MethodRequest(object):
:ivar dict payload: The JSON payload being sent with the request.
"""
def __init__(self, request_id, name, payload):
def __init__(self, request_id: str, name: str, payload: JSONSerializable):
"""Initializer for a MethodRequest.
:param str request_id: The request id.
@ -27,15 +30,15 @@ class MethodRequest(object):
self._payload = payload
@property
def request_id(self):
def request_id(self) -> str:
return self._request_id
@property
def name(self):
def name(self) -> str:
return self._name
@property
def payload(self):
def payload(self) -> JSONSerializable:
return self._payload
@ -48,7 +51,7 @@ class MethodResponse(object):
:type payload: dict, str, int, float, bool, or None (JSON compatible values)
"""
def __init__(self, request_id, status, payload=None):
def __init__(self, request_id: str, status: int, payload: Optional[JSONSerializable] = None):
"""Initializer for MethodResponse.
:param str request_id: The request id of the MethodRequest being responded to.
@ -61,7 +64,7 @@ class MethodResponse(object):
self.payload = payload
@classmethod
def create_from_method_request(cls, method_request, status, payload=None):
def create_from_method_request(cls, method_request: MethodRequest, status: int, payload: Optional[JSONSerializable] = None) -> Self:
"""Factory method for creating a MethodResponse from a MethodRequest.
:param method_request: The MethodRequest object to respond to.

Просмотреть файл

@ -6,15 +6,16 @@
"""This module contains user-facing synchronous clients for the
Azure IoTHub Device SDK for Python.
"""
from __future__ import annotations # Needed for annotation bug < 3.10
import logging
from queue import Queue
import deprecation
from .abstract_clients import (
AbstractIoTHubClient,
AbstractIoTHubDeviceClient,
AbstractIoTHubModuleClient,
)
from .models import Message
from .models import Message, MethodResponse, MethodRequest
from .inbox_manager import InboxManager
from .sync_inbox import SyncClientInbox, InboxEmpty
from . import sync_handler_manager
@ -23,7 +24,9 @@ from .pipeline import exceptions as pipeline_exceptions
from azure.iot.device import exceptions
from azure.iot.device.common.evented_callback import EventedCallback
from azure.iot.device import constant as device_constant
from .pipeline import MQTTPipeline, HTTPPipeline
from azure.iot.device.custom_typing import FunctionOrCoroutine, StorageInfo, Twin, TwinPatch
from typing import Optional, Union
logger = logging.getLogger(__name__)
@ -91,7 +94,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
self._mqtt_pipeline.on_method_request_received = self._inbox_manager.route_method_request
self._mqtt_pipeline.on_twin_patch_received = self._inbox_manager.route_twin_patch
def _enable_feature(self, feature_name):
def _enable_feature(self, feature_name: str) -> None:
"""Enable an Azure IoT Hub feature.
This is a synchronous call, meaning that this function will not return until the feature
@ -111,7 +114,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
# This branch shouldn't be reached, but in case it is, log it
logger.info("Feature ({}) already disabled - skipping".format(feature_name))
def _disable_feature(self, feature_name):
def _disable_feature(self, feature_name: str) -> None:
"""Disable an Azure IoT Hub feature
This is a synchronous call, meaning that this function will not return until the feature
@ -132,7 +135,9 @@ class GenericIoTHubClient(AbstractIoTHubClient):
# This branch shouldn't be reached, but in case it is, log it
logger.info("Feature ({}) already disabled - skipping".format(feature_name))
def _generic_receive_handler_setter(self, handler_name, feature_name, new_handler):
def _generic_receive_handler_setter(
self, handler_name: str, feature_name: str, new_handler: FunctionOrCoroutine[[None], None]
) -> None:
"""Set a receive handler on the handler manager and enable the corresponding feature.
This is a synchronous call, meaning that this function will not return until the feature
@ -154,7 +159,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
elif new_handler is None and self._mqtt_pipeline.feature_enabled[feature_name]:
self._disable_feature(feature_name)
def shutdown(self):
def shutdown(self) -> None:
"""Shut down the client for graceful exit.
Once this method is called, any attempts at further client calls will result in a
@ -180,6 +185,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
logger.debug("Completed pipeline shutdown operation")
# Stop the Client Event handlers now that everything else is completed
if self._handler_manager is not None:
self._handler_manager.stop(receiver_handlers_only=False)
# Yes, that means the pipeline is disconnected twice (well, actually three times if you
@ -197,7 +203,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
# capability for HTTP pipeline.
logger.info("Client shutdown complete")
def connect(self):
def connect(self) -> None:
"""Connects the client to an Azure IoT Hub or Azure IoT Edge Hub instance.
The destination is chosen based on the credentials passed via the auth_provider parameter
@ -224,7 +230,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
logger.info("Successfully connected to Hub")
def disconnect(self):
def disconnect(self) -> None:
"""Disconnect the client from the Azure IoT Hub or Azure IoT Edge Hub instance.
It is recommended that you make sure to call this function when you are completely done
@ -247,6 +253,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
# Note that in the process of stopping the handlers and resolving pending calls
# a user-supplied handler may cause a reconnection to occur
logger.debug("Stopping handlers...")
if self._handler_manager is not None:
self._handler_manager.stop(receiver_handlers_only=True)
logger.debug("Successfully stopped handlers")
@ -270,7 +277,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
logger.info("Successfully disconnected from Hub")
def update_sastoken(self, sastoken):
def update_sastoken(self, sastoken: str) -> None:
"""
Update the client's SAS Token used for authentication, then reauthorizes the connection.
@ -306,7 +313,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
logger.info("Successfully reauthorized connection to Hub")
def send_message(self, message):
def send_message(self, message: Union[Message, str]) -> None:
"""Sends a message to the default events endpoint on the Azure IoT Hub or Azure IoT Edge Hub instance.
This is a synchronous event, meaning that this function will not return until the event
@ -352,7 +359,9 @@ class GenericIoTHubClient(AbstractIoTHubClient):
current_version=device_constant.VERSION,
details="We recommend that you use the .on_method_request_received property to set a handler instead",
)
def receive_method_request(self, method_name=None, block=True, timeout=None):
def receive_method_request(
self, method_name: Optional[str] = None, block: bool = True, timeout: Optional[int] = None
) -> Optional[MethodRequest]:
"""Receive a method request via the Azure IoT Hub or Azure IoT Edge Hub.
:param str method_name: Optionally provide the name of the method to receive requests for.
@ -369,7 +378,8 @@ class GenericIoTHubClient(AbstractIoTHubClient):
if not self._mqtt_pipeline.feature_enabled[pipeline_constant.METHODS]:
self._enable_feature(pipeline_constant.METHODS)
method_inbox = self._inbox_manager.get_method_request_inbox(method_name)
if self._inbox_manager is not None:
method_inbox : Queue[MethodRequest] = self._inbox_manager.get_method_request_inbox(method_name)
logger.info("Waiting for method request...")
try:
@ -380,7 +390,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
logger.info("Did not receive method request")
return method_request
def send_method_response(self, method_response):
def send_method_response(self, method_response: MethodResponse) -> None:
"""Send a response to a method request via the Azure IoT Hub or Azure IoT Edge Hub.
This is a synchronous event, meaning that this function will not return until the event
@ -413,7 +423,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
logger.info("Successfully sent method response to Hub")
def get_twin(self):
def get_twin(self) -> Twin:
"""
Gets the device or module twin from the Azure IoT Hub or Azure IoT Edge Hub service.
@ -446,7 +456,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
logger.info("Successfully retrieved twin")
return twin
def patch_twin_reported_properties(self, reported_properties_patch):
def patch_twin_reported_properties(self, reported_properties_patch: TwinPatch) -> None:
"""
Update reported properties with the Azure IoT Hub or Azure IoT Edge Hub service.
@ -488,7 +498,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
current_version=device_constant.VERSION,
details="We recommend that you use the .on_twin_desired_properties_patch_received property to set a handler instead",
)
def receive_twin_desired_properties_patch(self, block=True, timeout=None):
def receive_twin_desired_properties_patch(self, block=True, timeout=None) -> TwinPatch:
"""
Receive a desired property patch via the Azure IoT Hub or Azure IoT Edge Hub.
@ -513,7 +523,8 @@ class GenericIoTHubClient(AbstractIoTHubClient):
if not self._mqtt_pipeline.feature_enabled[pipeline_constant.TWIN_PATCHES]:
self._enable_feature(pipeline_constant.TWIN_PATCHES)
twin_patch_inbox = self._inbox_manager.get_twin_patch_inbox()
if self._inbox_manager is not None:
twin_patch_inbox : Queue[TwinPatch] = self._inbox_manager.get_twin_patch_inbox()
logger.info("Waiting for twin patches...")
try:
@ -528,7 +539,7 @@ class GenericIoTHubClient(AbstractIoTHubClient):
class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient):
"""A synchronous device client that connects to an Azure IoT Hub instance."""
def __init__(self, mqtt_pipeline, http_pipeline):
def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline):
"""Initializer for a IoTHubDeviceClient.
This initializer should not be called directly.
@ -538,6 +549,7 @@ class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient):
:type mqtt_pipeline: :class:`azure.iot.device.iothub.pipeline.MQTTPipeline`
"""
super().__init__(mqtt_pipeline=mqtt_pipeline, http_pipeline=http_pipeline)
if self._inbox_manager is not None:
self._mqtt_pipeline.on_c2d_message_received = self._inbox_manager.route_c2d_message
@deprecation.deprecated(
@ -545,7 +557,7 @@ class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient):
current_version=device_constant.VERSION,
details="We recommend that you use the .on_message_received property to set a handler instead",
)
def receive_message(self, block=True, timeout=None):
def receive_message(self, block=True, timeout=None) -> Optional[Message]:
"""Receive a message that has been sent from the Azure IoT Hub.
:param bool block: Indicates if the operation should block until a message is received.
@ -559,7 +571,8 @@ class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient):
if not self._mqtt_pipeline.feature_enabled[pipeline_constant.C2D_MSG]:
self._enable_feature(pipeline_constant.C2D_MSG)
c2d_inbox = self._inbox_manager.get_c2d_message_inbox()
if self._inbox_manager is not None:
c2d_inbox : Queue[Message] = self._inbox_manager.get_c2d_message_inbox()
logger.info("Waiting for message from Hub...")
try:
@ -570,7 +583,7 @@ class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient):
logger.info("No message received.")
return message
def get_storage_info_for_blob(self, blob_name):
def get_storage_info_for_blob(self, blob_name: str) -> StorageInfo:
"""Sends a POST request over HTTP to an IoTHub endpoint that will return information for uploading via the Azure Storage Account linked to the IoTHub your device is connected to.
:param str blob_name: The name in string format of the blob that will be uploaded using the storage API. This name will be used to generate the proper credentials for Storage, and needs to match what will be used with the Azure Storage SDK to perform the blob upload.
@ -584,8 +597,8 @@ class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient):
return storage_info
def notify_blob_upload_status(
self, correlation_id, is_success, status_code, status_description
):
self, correlation_id: str, is_success: bool, status_code: int, status_description: str
) -> None:
"""When the upload is complete, the device sends a POST request to the IoT Hub endpoint with information on the status of an upload to blob attempt. This is used by IoT Hub to notify listening clients.
:param str correlation_id: Provided by IoT Hub on get_storage_info_for_blob request.
@ -608,7 +621,7 @@ class IoTHubDeviceClient(GenericIoTHubClient, AbstractIoTHubDeviceClient):
class IoTHubModuleClient(GenericIoTHubClient, AbstractIoTHubModuleClient):
"""A synchronous module client that connects to an Azure IoT Hub or Azure IoT Edge instance."""
def __init__(self, mqtt_pipeline, http_pipeline):
def __init__(self, mqtt_pipeline: MQTTPipeline, http_pipeline: HTTPPipeline):
"""Initializer for a IoTHubModuleClient.
This initializer should not be called directly.
@ -620,9 +633,10 @@ class IoTHubModuleClient(GenericIoTHubClient, AbstractIoTHubModuleClient):
:type http_pipeline: :class:`azure.iot.device.iothub.pipeline.HTTPPipeline`
"""
super().__init__(mqtt_pipeline=mqtt_pipeline, http_pipeline=http_pipeline)
if self._inbox_manager is not None:
self._mqtt_pipeline.on_input_message_received = self._inbox_manager.route_input_message
def send_message_to_output(self, message, output_name):
def send_message_to_output(self, message: Union[Message, str], output_name: str) -> None:
"""Sends an event/message to the given module output.
These are outgoing events and are meant to be "output events".
@ -673,7 +687,9 @@ class IoTHubModuleClient(GenericIoTHubClient, AbstractIoTHubModuleClient):
current_version=device_constant.VERSION,
details="We recommend that you use the .on_message_received property to set a handler instead",
)
def receive_message_on_input(self, input_name, block=True, timeout=None):
def receive_message_on_input(
self, input_name: str, block: bool = True, timeout: Optional[int] = None
) -> Optional[Message]:
"""Receive an input message that has been sent from another Module to a specific input.
:param str input_name: The input name to receive a message on.
@ -687,7 +703,8 @@ class IoTHubModuleClient(GenericIoTHubClient, AbstractIoTHubModuleClient):
if not self._mqtt_pipeline.feature_enabled[pipeline_constant.INPUT_MSG]:
self._enable_feature(pipeline_constant.INPUT_MSG)
input_inbox = self._inbox_manager.get_input_message_inbox(input_name)
if self._inbox_manager is not None:
input_inbox : Queue[Message] = self._inbox_manager.get_input_message_inbox(input_name)
logger.info("Waiting for input message on: " + input_name + "...")
try:
@ -698,7 +715,7 @@ class IoTHubModuleClient(GenericIoTHubClient, AbstractIoTHubModuleClient):
logger.info("No input message received on: " + input_name)
return message
def invoke_method(self, method_params, device_id, module_id=None):
def invoke_method(self, method_params: dict, device_id: str, module_id=None):
"""Invoke a method from your client onto a device or module client, and receive the response to the method call.
:param dict method_params: Should contain a methodName (str), payload (str),

Просмотреть файл

@ -7,12 +7,13 @@
import inspect
import logging
from typing import Dict
logger = logging.getLogger(__name__)
# This dict will be used as a scope for imports and defs in add_shims_for_inherited_methods
# in order to keep them out of the global scope of this module.
shim_scope = {}
shim_scope : Dict[str, str] = {}
def add_shims_for_inherited_methods(target_class):

Просмотреть файл

@ -10,15 +10,21 @@ Device Provisioning Service.
import abc
import logging
from typing_extensions import Self
from typing import Any, Dict, List, Optional
from azure.iot.device.provisioning import pipeline
from azure.iot.device.common.auth import sastoken as st
from azure.iot.device.common import auth, handle_exceptions
from .pipeline import MQTTPipeline
from azure.iot.device.common.models import X509
from azure.iot.device.custom_typing import ProvisioningPayload
from azure.iot.device.provisioning.models import RegistrationResult
logger = logging.getLogger(__name__)
def _validate_kwargs(exclude=[], **kwargs):
def _validate_kwargs(exclude: Optional[List[str]] = [], **kwargs):
"""Helper function to validate user provided kwargs.
Raises TypeError if an invalid option has been provided"""
@ -33,16 +39,16 @@ def _validate_kwargs(exclude=[], **kwargs):
]
for kwarg in kwargs:
if (kwarg not in valid_kwargs) or (kwarg in exclude):
if (kwarg not in valid_kwargs) or (exclude is not None and kwarg in exclude):
raise TypeError("Unsupported keyword argument '{}'".format(kwarg))
def validate_registration_id(reg_id):
def validate_registration_id(reg_id: str) -> None:
if not (reg_id and reg_id.strip()):
raise ValueError("Registration Id can not be none, empty or blank.")
def _get_config_kwargs(**kwargs):
def _get_config_kwargs(**kwargs) -> Dict[str, Any]:
"""Get the subset of kwargs which pertain the config object"""
valid_config_kwargs = [
"server_verification_cert",
@ -60,7 +66,7 @@ def _get_config_kwargs(**kwargs):
return config_kwargs
def _form_sas_uri(id_scope, registration_id):
def _form_sas_uri(id_scope: str, registration_id: str) -> str:
return "{id_scope}/registrations/{registration_id}".format(
id_scope=id_scope, registration_id=registration_id
)
@ -71,7 +77,7 @@ class AbstractProvisioningDeviceClient(abc.ABC):
Super class for any client that can be used to register devices to Device Provisioning Service.
"""
def __init__(self, pipeline):
def __init__(self, pipeline: MQTTPipeline):
"""
Initializes the provisioning client.
@ -89,8 +95,8 @@ class AbstractProvisioningDeviceClient(abc.ABC):
@classmethod
def create_from_symmetric_key(
cls, provisioning_host, registration_id, id_scope, symmetric_key, **kwargs
):
cls, provisioning_host: str, registration_id: str, id_scope: str, symmetric_key: str, **kwargs
) -> Self:
"""
Create a client which can be used to run the registration of a device with provisioning service
using Symmetric Key authentication.
@ -163,8 +169,8 @@ class AbstractProvisioningDeviceClient(abc.ABC):
@classmethod
def create_from_x509_certificate(
cls, provisioning_host, registration_id, id_scope, x509, **kwargs
):
cls, provisioning_host: str, registration_id: str, id_scope: str, x509: X509, **kwargs
) -> Self:
"""
Create a client which can be used to run the registration of a device with
provisioning service using X509 certificate authentication.
@ -224,18 +230,18 @@ class AbstractProvisioningDeviceClient(abc.ABC):
return cls(mqtt_provisioning_pipeline)
@abc.abstractmethod
def register(self):
def register(self) -> RegistrationResult:
"""
Register the device with the Device Provisioning Service.
"""
pass
@property
def provisioning_payload(self):
def provisioning_payload(self) -> ProvisioningPayload:
return self._provisioning_payload
@provisioning_payload.setter
def provisioning_payload(self, provisioning_payload):
def provisioning_payload(self, provisioning_payload: ProvisioningPayload):
"""
Set the payload that will form the request payload in a registration request.
@ -245,7 +251,7 @@ class AbstractProvisioningDeviceClient(abc.ABC):
self._provisioning_payload = provisioning_payload
def log_on_register_complete(result=None):
def log_on_register_complete(result: Optional[RegistrationResult] = None) -> None:
# This could be a failed/successful registration result from DPS
# or a error from polling machine. Response should be given appropriately
if result is not None:

Просмотреть файл

@ -8,9 +8,11 @@ This module contains user-facing asynchronous Provisioning Device Client for Azu
Device SDK. This client uses Symmetric Key and X509 authentication to register devices with an
IoT Hub via the Device Provisioning Service.
"""
from __future__ import annotations # Needed for annotation bug < 3.10
import logging
from typing import Any
from azure.iot.device.common import async_adapter
from azure.iot.device.custom_typing import FunctionOrCoroutine
from azure.iot.device.provisioning.abstract_provisioning_device_client import (
AbstractProvisioningDeviceClient,
)
@ -20,11 +22,12 @@ from azure.iot.device.provisioning.abstract_provisioning_device_client import (
from azure.iot.device.provisioning.pipeline import exceptions as pipeline_exceptions
from azure.iot.device import exceptions
from azure.iot.device.provisioning.pipeline import constant as dps_constant
from azure.iot.device.provisioning.models import RegistrationResult
logger = logging.getLogger(__name__)
async def handle_result(callback):
async def handle_result(callback: FunctionOrCoroutine[[Any], Any]) -> Any:
try:
return await callback.completion()
except pipeline_exceptions.ConnectionDroppedError as e:
@ -49,7 +52,7 @@ class ProvisioningDeviceClient(AbstractProvisioningDeviceClient):
using Symmetric Key or X509 authentication.
"""
async def register(self):
async def register(self) -> RegistrationResult:
"""
Register the device with the provisioning service.
@ -94,7 +97,7 @@ class ProvisioningDeviceClient(AbstractProvisioningDeviceClient):
return result
async def _enable_responses(self):
async def _enable_responses(self) -> None:
"""Enable to receive responses from Device Provisioning Service."""
logger.info("Enabling reception of response from Device Provisioning Service...")
subscribe_async = async_adapter.emulate_async(self._pipeline.enable_responses)

Просмотреть файл

@ -4,45 +4,8 @@
# license information.
# --------------------------------------------------------------------------
import json
class RegistrationResult(object):
"""
The final result of a completed or failed registration attempt
:ivar:request_id: The request id to which the response is being obtained
:ivar:operation_id: The id of the operation as returned by the registration request.
:ivar status: The status of the registration process as returned by the provisioning service.
Values can be "unassigned", "assigning", "assigned", "failed", "disabled"
:ivar registration_state : Details like device id, assigned hub , date times etc returned
from the provisioning service.
"""
def __init__(self, operation_id, status, registration_state=None):
"""
:param operation_id: The id of the operation as returned by the initial registration request.
:param status: The status of the registration process.
Values can be "unassigned", "assigning", "assigned", "failed", "disabled"
:param registration_state : Details like device id, assigned hub , date times etc returned
from the provisioning service.
"""
self._operation_id = operation_id
self._status = status
self._registration_state = registration_state
@property
def operation_id(self):
return self._operation_id
@property
def status(self):
return self._status
@property
def registration_state(self):
return self._registration_state
def __str__(self):
return "\n".join([str(self.registration_state), self.status])
from typing import Optional
from azure.iot.device.custom_typing import JSONSerializable
class RegistrationState(object):
@ -86,34 +49,75 @@ class RegistrationState(object):
self._response_payload = payload
@property
def device_id(self):
def device_id(self) -> str:
return self._device_id
@property
def assigned_hub(self):
def assigned_hub(self) -> str:
return self._assigned_hub
@property
def sub_status(self):
def sub_status(self) -> str:
return self._sub_status
@property
def created_date_time(self):
def created_date_time(self) -> str:
return self._created_date_time
@property
def last_update_date_time(self):
def last_update_date_time(self) -> str:
return self._last_update_date_time
@property
def etag(self):
def etag(self) -> str:
return self._etag
@property
def response_payload(self):
def response_payload(self) -> JSONSerializable:
return json.dumps(self._response_payload, default=lambda o: o.__dict__, sort_keys=True)
def __str__(self):
return "\n".join(
[self.device_id, self.assigned_hub, self.sub_status, self.response_payload]
)
class RegistrationResult(object):
"""
The final result of a completed or failed registration attempt
:ivar:request_id: The request id to which the response is being obtained
:ivar:operation_id: The id of the operation as returned by the registration request.
:ivar status: The status of the registration process as returned by the provisioning service.
Values can be "unassigned", "assigning", "assigned", "failed", "disabled"
:ivar registration_state : Details like device id, assigned hub , date times etc returned
from the provisioning service.
"""
def __init__(
self, operation_id: str, status: str, registration_state: Optional[RegistrationState] = None
):
"""
:param operation_id: The id of the operation as returned by the initial registration request.
:param status: The status of the registration process.
Values can be "unassigned", "assigning", "assigned", "failed", "disabled"
:param registration_state : Details like device id, assigned hub , date times etc returned
from the provisioning service.
"""
self._operation_id = operation_id
self._status = status
self._registration_state = registration_state
@property
def operation_id(self) -> str:
return self._operation_id
@property
def status(self) -> str:
return self._status
@property
def registration_state(self) -> Optional[RegistrationState]:
return self._registration_state
def __str__(self):
return "\n".join([str(self.registration_state), self.status])

Просмотреть файл

@ -8,7 +8,14 @@
# For now, present relevant transport errors as part of the Pipeline API surface
# so that they do not have to be duplicated at this layer.
# OK TODO This mimics the IotHub Case. Both IotHub and Provisioning needs to change
from azure.iot.device.common.pipeline.pipeline_exceptions import * # noqa: F401, F403
from azure.iot.device.common.pipeline.pipeline_exceptions import ( # noqa: F401, F403
PipelineException,
OperationCancelled,
OperationTimeout,
OperationError,
PipelineNotRunning,
PipelineRuntimeError
)
from azure.iot.device.common.transport_exceptions import ( # noqa: F401
ConnectionFailedError,
ConnectionDroppedError,

Просмотреть файл

@ -8,19 +8,23 @@ This module contains user-facing synchronous Provisioning Device Client for Azur
Device SDK. This client uses Symmetric Key and X509 authentication to register devices with an
IoT Hub via the Device Provisioning Service.
"""
from __future__ import annotations # Needed for annotation bug < 3.10
import logging
from typing import Any
from azure.iot.device.common.evented_callback import EventedCallback
from azure.iot.device.custom_typing import FunctionOrCoroutine
from .abstract_provisioning_device_client import AbstractProvisioningDeviceClient
from .abstract_provisioning_device_client import log_on_register_complete
from azure.iot.device.provisioning.pipeline import constant as dps_constant
from .pipeline import exceptions as pipeline_exceptions
from azure.iot.device import exceptions
from azure.iot.device.provisioning.models import RegistrationResult
logger = logging.getLogger(__name__)
def handle_result(callback):
def handle_result(callback: FunctionOrCoroutine[[Any], None]) -> RegistrationResult:
try:
return callback.wait_for_completion()
except pipeline_exceptions.ConnectionDroppedError as e:
@ -47,7 +51,7 @@ class ProvisioningDeviceClient(AbstractProvisioningDeviceClient):
using Symmetric Key or X509 authentication.
"""
def register(self):
def register(self) -> RegistrationResult:
"""
Register the device with the provisioning service
@ -94,7 +98,7 @@ class ProvisioningDeviceClient(AbstractProvisioningDeviceClient):
return result
def _enable_responses(self):
def _enable_responses(self) -> None:
"""Enable to receive responses from Device Provisioning Service.
This is a synchronous call, meaning that this function will not return until the feature

Просмотреть файл

Просмотреть файл

@ -27,7 +27,7 @@ async def main():
# The client object is used to interact with your Azure IoT Edge device.
device_client = IoTHubDeviceClient.create_from_connection_string(
connection_string=conn_str, server_verification_cert=root_ca_cert
connection_string_dict=conn_str, server_verification_cert=root_ca_cert
)
# Connect the client.

Просмотреть файл

@ -6,7 +6,7 @@
script_dir=$(cd "$(dirname "$0")" && pwd)
export RUNTIMES_TO_INSTALL="3.6.6 3.7.1 3.8.10 3.9.9 3.10.2"
export RUNTIMES_TO_INSTALL="3.7.1 3.8.10 3.9.9 3.10.2"
echo "This script will do the following:"
echo "1. Use apt to install pre-requisites for pyenv"

Просмотреть файл

@ -65,7 +65,6 @@ setup(
"License :: OSI Approved :: MIT License",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
@ -85,9 +84,11 @@ setup(
"requests-unixsocket>=0.1.5,<1.0.0",
"janus",
"PySocks",
"typing_extensions",
],
python_requires=">=3.6, <4",
python_requires=">=3.7, <4",
packages=find_namespace_packages(where="azure-iot-device"),
package_data={"azure.iot.device": ["py.typed"]},
package_dir={"": "azure-iot-device"},
zip_safe=False,
)

Просмотреть файл

@ -1032,7 +1032,7 @@ class SharedIoTHubDeviceClientCreateFromSastokenTests(
# Verify the IoTHubPipelineConfig is constructed as expected
config = mock_mqtt_pipeline_init.call_args[0][0]
assert config.device_id == expected_device_id
assert config.module_id is None
assert config.module_id == ""
assert config.hostname == expected_hostname
assert config.gateway_hostname is None
assert config.sastoken is sastoken_mock.return_value

Просмотреть файл

@ -29,8 +29,6 @@ jobs:
vmImage: 'Ubuntu 20.04'
strategy:
matrix:
Python36:
python.version: '3.6'
Python37:
python.version: '3.7'
Python38:

Просмотреть файл

@ -16,11 +16,6 @@ jobs:
transport: 'mqttws'
imageName: 'windows-latest'
consumerGroup: 'cg2'
py36_linux_mqtt:
pv: '3.6'
transport: 'mqtt'
imageName: 'Ubuntu 20.04'
consumerGroup: 'cg3'
py37_linux_mqttws:
pv: '3.7'
transport: 'mqttws'

Просмотреть файл

@ -40,7 +40,7 @@ jobs:
strategy:
matrix:
py310_mqtt: { pv: '3.10', transport: 'mqtt', consumer_group: 'e2e-consumer-group-3' }
py36_mqtt_ws: { pv: '3.6', transport: 'mqttws', consumer_group: 'e2e-consumer-group-4' }
py37_mqtt_ws: { pv: '3.7', transport: 'mqttws', consumer_group: 'e2e-consumer-group-4' }
steps:
- task: UsePythonVersion@0

Просмотреть файл

@ -17,11 +17,6 @@ jobs:
imageName: 'windows-latest'
consumerGroup: 'cg2'
py36_linux_mqtt:
pv: '3.6'
transport: 'mqtt'
imageName: 'Ubuntu 20.04'
consumerGroup: 'cg3'
py37_linux_mqttws:
pv: '3.7'
transport: 'mqttws'