[V3] IoTHubHTTPClient and Direct Method refactor (#1104)
* Implemented initial IoTHubHTTPClient * No proxy support (yet) * All API references to "methods" now refer more accurately to "direct methods"
This commit is contained in:
Родитель
a9c20135dc
Коммит
d7384ff8b8
|
@ -1,40 +1,44 @@
|
|||
# TODO: REMOVE THIS WHEN NO LONGER TESTING AT IOTHUB-MQTT LEVEL
|
||||
|
||||
from v3_async_wip.config import IoTHubClientConfig
|
||||
from v3_async_wip import sastoken as st
|
||||
from v3_async_wip import signing_mechanism as sm
|
||||
from azure.iot.device.common.auth import connection_string as cs
|
||||
from azure.iot.device.common.auth import sastoken as st
|
||||
from azure.iot.device.common.auth import signing_mechanism as sm
|
||||
import ssl
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_client_config(cs_str):
|
||||
async def create_client_config(cs_str):
|
||||
connection_string = cs.ConnectionString(cs_str)
|
||||
hostname = connection_string[cs.HOST_NAME]
|
||||
device_id = connection_string[cs.DEVICE_ID]
|
||||
module_id = connection_string.get(cs.MODULE_ID)
|
||||
sastoken = _create_sastoken(connection_string)
|
||||
|
||||
generator = _create_sastoken_generator(connection_string)
|
||||
sastoken_provider = await st.SasTokenProvider.create_from_generator(generator)
|
||||
|
||||
ssl_context = _create_ssl_context()
|
||||
|
||||
return IoTHubClientConfig(
|
||||
device_id=device_id,
|
||||
module_id=module_id,
|
||||
hostname=hostname,
|
||||
sastoken=sastoken,
|
||||
sastoken_provider=sastoken_provider,
|
||||
ssl_context=ssl_context,
|
||||
)
|
||||
|
||||
|
||||
def _create_sastoken(connection_string, ttl=3600):
|
||||
def _create_sastoken_generator(connection_string, ttl=3600):
|
||||
uri = _form_sas_uri(
|
||||
hostname=connection_string[cs.HOST_NAME],
|
||||
device_id=connection_string[cs.DEVICE_ID],
|
||||
module_id=connection_string.get(cs.MODULE_ID),
|
||||
)
|
||||
signing_mechanism = sm.SymmetricKeySigningMechanism(key=connection_string[cs.SHARED_ACCESS_KEY])
|
||||
sastoken = st.RenewableSasToken(uri=uri, signing_mechanism=signing_mechanism, ttl=ttl)
|
||||
return sastoken
|
||||
sastoken_generator = st.InternalSasTokenGenerator(signing_mechanism, uri, ttl)
|
||||
return sastoken_generator
|
||||
|
||||
|
||||
def _form_sas_uri(hostname, device_id, module_id=None):
|
||||
|
|
|
@ -10,4 +10,9 @@ setup(
|
|||
description="Internal development utilities for Azure IoT. NOT FOR DISTRIBUTION.",
|
||||
version="0.0.0a1", # Alpha Release
|
||||
license="MIT License",
|
||||
install_requires=[
|
||||
# NOTE: there is an optional aiohttp[speedups] install that improves performance
|
||||
# but I don't know if we want to bring in extra, optional packages
|
||||
"aiohttp",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
# -------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for
|
||||
# license information.
|
||||
# --------------------------------------------------------------------------
|
||||
import pytest
|
||||
import logging
|
||||
from v3_async_wip import http_path_iothub
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
# NOTE: All tests are parametrized with multiple values for URL encoding. This is to show that the
|
||||
# URL encoding is done correctly - not all URL encoding encodes the '+' character. Thus we must
|
||||
# make sure any URL encoded value can encode a '+' specifically, in addition to regular encoding.
|
||||
|
||||
|
||||
@pytest.mark.describe(".get_direct_method_invoke_path()")
|
||||
class TestGetMethodInvokePath(object):
|
||||
@pytest.mark.it("Returns the relative method invoke HTTP path")
|
||||
@pytest.mark.parametrize(
|
||||
"device_id, module_id, expected_path",
|
||||
[
|
||||
pytest.param(
|
||||
"my_device",
|
||||
None,
|
||||
"/twins/my_device/methods",
|
||||
id="'my_device' ==> '/twins/my_device/methods'",
|
||||
),
|
||||
pytest.param(
|
||||
"my/device",
|
||||
None,
|
||||
"/twins/my%2Fdevice/methods",
|
||||
id="'my/device' ==> '/twins/my%2Fdevice/methods'",
|
||||
),
|
||||
pytest.param(
|
||||
"my+device",
|
||||
None,
|
||||
"/twins/my%2Bdevice/methods",
|
||||
id="'my+device' ==> '/twins/my%2Bdevice/methods'",
|
||||
),
|
||||
pytest.param(
|
||||
"my_device",
|
||||
"my_module",
|
||||
"/twins/my_device/modules/my_module/methods",
|
||||
id="('my_device', 'my_module') ==> '/twins/my_device/modules/my_module/methods'",
|
||||
),
|
||||
pytest.param(
|
||||
"my/device",
|
||||
"my?module",
|
||||
"/twins/my%2Fdevice/modules/my%3Fmodule/methods",
|
||||
id="('my/device', 'my?module') ==> '/twins/my%2Fdevice/modules/my%3Fmodule/methods'",
|
||||
),
|
||||
pytest.param(
|
||||
"my+device",
|
||||
"my+module",
|
||||
"/twins/my%2Bdevice/modules/my%2Bmodule/methods",
|
||||
id="('my+device', 'my+module') ==> '/twins/my%2Bdevice/modules/my%2Bmodule/methods'",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_path(self, device_id, module_id, expected_path):
|
||||
path = http_path_iothub.get_direct_method_invoke_path(
|
||||
device_id=device_id, module_id=module_id
|
||||
)
|
||||
assert path == expected_path
|
||||
|
||||
|
||||
@pytest.mark.describe(".get_storage_info_for_blob_path()")
|
||||
class TestGetStorageInfoPath(object):
|
||||
@pytest.mark.it("Returns the relative storage info HTTP path")
|
||||
@pytest.mark.parametrize(
|
||||
"device_id, expected_path",
|
||||
[
|
||||
pytest.param(
|
||||
"my_device",
|
||||
"/devices/my_device/files",
|
||||
id="'my_device' ==> '/devices/my_device/files'",
|
||||
),
|
||||
pytest.param(
|
||||
"my/device",
|
||||
"/devices/my%2Fdevice/files",
|
||||
id="'my/device' ==> '/devices/my%2Fdevice/files'",
|
||||
),
|
||||
pytest.param(
|
||||
"my+device",
|
||||
"/devices/my%2Bdevice/files",
|
||||
id="'my+device' ==> '/devices/my%2Bdevice/files'",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_path(self, device_id, expected_path):
|
||||
path = http_path_iothub.get_storage_info_for_blob_path(device_id)
|
||||
assert path == expected_path
|
||||
|
||||
|
||||
@pytest.mark.describe(".get_notify_blob_upload_status_path()")
|
||||
class TestGetNotifyBlobUploadStatusPath(object):
|
||||
@pytest.mark.it("Returns the relative notify blob upload status HTTP path")
|
||||
@pytest.mark.parametrize(
|
||||
"device_id, expected_path",
|
||||
[
|
||||
pytest.param(
|
||||
"my_device",
|
||||
"/devices/my_device/files/notifications",
|
||||
id="'my_device' ==> '/devices/my_device/files/notifications'",
|
||||
),
|
||||
pytest.param(
|
||||
"my/device",
|
||||
"/devices/my%2Fdevice/files/notifications",
|
||||
id="'my/device' ==> '/devices/my%2Fdevice/files/notifications'",
|
||||
),
|
||||
pytest.param(
|
||||
"my+device",
|
||||
"/devices/my%2Bdevice/files/notifications",
|
||||
id="'my+device' ==> '/devices/my%2Bdevice/files/notifications'",
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_path(self, device_id, expected_path):
|
||||
path = http_path_iothub.get_notify_blob_upload_status_path(device_id)
|
||||
assert path == expected_path
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
|
@ -121,7 +121,7 @@ async def client(mocker, client_config):
|
|||
await client.shutdown()
|
||||
|
||||
|
||||
@pytest.mark.describe("IoTHubMQTTClient - Instantiation")
|
||||
@pytest.mark.describe("IoTHubMQTTClient -- Instantiation")
|
||||
class TestIoTHubMQTTClientInstantiation:
|
||||
# NOTE: As the instantiation is the unit under test here, we shouldn't use the client fixture.
|
||||
# This means that you must do graceful exit by shutting down the client at the end of all tests
|
||||
|
@ -425,7 +425,7 @@ class TestIoTHubMQTTClientInstantiation:
|
|||
|
||||
await client.shutdown()
|
||||
|
||||
@pytest.mark.it("Adds incoming message filter on the MQTTClient for method requests")
|
||||
@pytest.mark.it("Adds incoming message filter on the MQTTClient for direct method requests")
|
||||
@pytest.mark.parametrize(
|
||||
"device_id, module_id",
|
||||
[
|
||||
|
@ -433,10 +433,10 @@ class TestIoTHubMQTTClientInstantiation:
|
|||
pytest.param(FAKE_DEVICE_ID, FAKE_MODULE_ID, id="Module Configuration"),
|
||||
],
|
||||
)
|
||||
async def test_method_request_filter(self, mocker, client_config, device_id, module_id):
|
||||
async def test_direct_method_request_filter(self, mocker, client_config, device_id, module_id):
|
||||
client_config.device_id = device_id
|
||||
client_config.module_id = module_id
|
||||
expected_topic = mqtt_topic.get_method_topic_for_subscribe()
|
||||
expected_topic = mqtt_topic.get_direct_method_request_topic_for_subscribe()
|
||||
|
||||
mocker.patch.object(mqtt, "MQTTClient", spec=mqtt.MQTTClient)
|
||||
client = IoTHubMQTTClient(client_config)
|
||||
|
@ -552,8 +552,10 @@ class TestIoTHubMQTTClientInstantiation:
|
|||
|
||||
await client.shutdown()
|
||||
|
||||
# NOTE: For testing the functionality of this generator, see the corresponding test suite (TestIoTHubMQTTClientIncomingMethodRequests)
|
||||
@pytest.mark.it("Creates and stores an incoming method request generator as an attribute")
|
||||
# NOTE: For testing the functionality of this generator, see the corresponding test suite (TestIoTHubMQTTClientIncomingDirectDirectMethodRequests)
|
||||
@pytest.mark.it(
|
||||
"Creates and stores an incoming direct method request generator as an attribute"
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"device_id, module_id",
|
||||
[
|
||||
|
@ -561,11 +563,11 @@ class TestIoTHubMQTTClientInstantiation:
|
|||
pytest.param(FAKE_DEVICE_ID, FAKE_MODULE_ID, id="Module Configuration"),
|
||||
],
|
||||
)
|
||||
async def test_method_request_generator(self, client_config, device_id, module_id):
|
||||
async def test_direct_method_request_generator(self, client_config, device_id, module_id):
|
||||
client_config.device_id = device_id
|
||||
client_config.module_id = module_id
|
||||
client = IoTHubMQTTClient(client_config)
|
||||
assert isinstance(client.incoming_method_requests, typing.AsyncGenerator)
|
||||
assert isinstance(client.incoming_direct_method_requests, typing.AsyncGenerator)
|
||||
|
||||
await client.shutdown()
|
||||
|
||||
|
@ -718,7 +720,6 @@ class TestIoTHubMQTTClientInstantiation:
|
|||
await client.shutdown()
|
||||
|
||||
|
||||
# TODO: exceptions
|
||||
@pytest.mark.describe("IoTHubMQTTClient - .shutdown()")
|
||||
class TestIoTHubMQTTClientShutdown:
|
||||
@pytest.fixture(autouse=True)
|
||||
|
@ -1185,26 +1186,28 @@ class TestIoTHubMQTTClientSendMessage:
|
|||
await t
|
||||
|
||||
|
||||
@pytest.mark.describe("IoTHubMQTTClient - .send_method_response()")
|
||||
class TestIoTHubMQTTClientSendMethodResponse:
|
||||
@pytest.mark.describe("IoTHubMQTTClient - .send_direct_method_response()")
|
||||
class TestIoTHubMQTTClientSendDirectMethodResponse:
|
||||
@pytest.fixture
|
||||
def method_response(self):
|
||||
json_response = {"some": {"json": "payload"}}
|
||||
method_response = models.MethodResponse(request_id="123", status=200, payload=json_response)
|
||||
method_response = models.DirectMethodResponse(
|
||||
request_id="123", status=200, payload=json_response
|
||||
)
|
||||
return method_response
|
||||
|
||||
@pytest.mark.it(
|
||||
"Awaits a publish to the method response topic using the MQTTClient, sending the given MethodResponse's JSON payload converted to string"
|
||||
"Awaits a publish to the direct method response topic using the MQTTClient, sending the given DirectMethodResponse's JSON payload converted to string"
|
||||
)
|
||||
async def test_mqtt_publish(self, mocker, client, method_response):
|
||||
assert client._mqtt_client.publish.await_count == 0
|
||||
|
||||
expected_topic = mqtt_topic.get_method_topic_for_publish(
|
||||
expected_topic = mqtt_topic.get_direct_method_response_topic_for_publish(
|
||||
method_response.request_id, method_response.status
|
||||
)
|
||||
expected_payload = json.dumps(method_response.payload)
|
||||
|
||||
await client.send_method_response(method_response)
|
||||
await client.send_direct_method_response(method_response)
|
||||
|
||||
assert client._mqtt_client.publish.await_count == 1
|
||||
assert client._mqtt_client.publish.await_args == mocker.call(
|
||||
|
@ -1217,14 +1220,14 @@ class TestIoTHubMQTTClientSendMethodResponse:
|
|||
client._mqtt_client.publish.side_effect = exception
|
||||
|
||||
with pytest.raises(type(exception)) as e_info:
|
||||
await client.send_method_response(method_response)
|
||||
await client.send_direct_method_response(method_response)
|
||||
assert e_info.value is exception
|
||||
|
||||
@pytest.mark.it("Can be cancelled while waiting for the MQTTClient publish to finish")
|
||||
async def test_cancel(self, client, method_response):
|
||||
client._mqtt_client.publish = custom_mock.HangingAsyncMock()
|
||||
|
||||
t = asyncio.create_task(client.send_method_response(method_response))
|
||||
t = asyncio.create_task(client.send_direct_method_response(method_response))
|
||||
|
||||
# Hanging, waiting for MQTT publish to finish
|
||||
await client._mqtt_client.publish.wait_for_hang()
|
||||
|
@ -2037,26 +2040,26 @@ class TestIoTHubMQTTClientDisableInputMessageReceive(IoTHubMQTTClientDisableRece
|
|||
await client.disable_input_message_receive()
|
||||
|
||||
|
||||
@pytest.mark.describe("IoTHubMQTTClient - .enable_method_request_receive()")
|
||||
class TestIoTHubMQTTClientEnableMethodRequestReceive(IoTHubMQTTClientEnableReceiveTest):
|
||||
@pytest.mark.describe("IoTHubMQTTClient - .enable_direct_method_request_receive()")
|
||||
class TestIoTHubMQTTClientEnableDirectMethodRequestReceive(IoTHubMQTTClientEnableReceiveTest):
|
||||
@pytest.fixture
|
||||
def method_name(self):
|
||||
return "enable_method_request_receive"
|
||||
return "enable_direct_method_request_receive"
|
||||
|
||||
@pytest.fixture
|
||||
def expected_topic(self):
|
||||
return mqtt_topic.get_method_topic_for_subscribe()
|
||||
return mqtt_topic.get_direct_method_request_topic_for_subscribe()
|
||||
|
||||
|
||||
@pytest.mark.describe("IoTHubMQTTClient - .disable_method_request_receive()")
|
||||
class TestIoTHubMQTTClientDisableMethodRequestReceive(IoTHubMQTTClientDisableReceiveTest):
|
||||
@pytest.mark.describe("IoTHubMQTTClient - .disable_direct_method_request_receive()")
|
||||
class TestIoTHubMQTTClientDisableDirectMethodRequestReceive(IoTHubMQTTClientDisableReceiveTest):
|
||||
@pytest.fixture
|
||||
def method_name(self):
|
||||
return "disable_method_request_receive"
|
||||
return "disable_direct_method_request_receive"
|
||||
|
||||
@pytest.fixture
|
||||
def expected_topic(self):
|
||||
return mqtt_topic.get_method_topic_for_subscribe()
|
||||
return mqtt_topic.get_direct_method_request_topic_for_subscribe()
|
||||
|
||||
|
||||
@pytest.mark.describe("IoTHubMQTTClient - .enable_twin_patch_receive()")
|
||||
|
@ -2502,13 +2505,13 @@ class TestIoTHubMQTTClientIncomingInputMessages:
|
|||
assert msg.content_encoding == "utf-8"
|
||||
|
||||
|
||||
@pytest.mark.describe("IoTHubMQTTClient - .incoming_method_requests")
|
||||
class TestIoTHubMQTTClientIncomingMethodRequests:
|
||||
@pytest.mark.describe("IoTHubMQTTClient - .incoming_direct_s")
|
||||
class TestIoTHubMQTTClientIncomingDirectMethodRequests:
|
||||
@pytest.mark.it(
|
||||
"Yields a MethodRequest whenever the MQTTClient receives an MQTTMessage on the incoming method request topic"
|
||||
"Yields a DirectMethodRequest whenever the MQTTClient receives an MQTTMessage on the incoming direct method request topic"
|
||||
)
|
||||
async def test_yields_method_request(self, client):
|
||||
generic_topic = mqtt_topic.get_method_topic_for_subscribe()
|
||||
async def test_yields_direct_(self, client):
|
||||
generic_topic = mqtt_topic.get_direct_method_request_topic_for_subscribe()
|
||||
|
||||
# Create MQTTMessages
|
||||
mreq1_name = "some_method"
|
||||
|
@ -2529,17 +2532,17 @@ class TestIoTHubMQTTClientIncomingMethodRequests:
|
|||
await client._mqtt_client._incoming_filtered_messages[generic_topic].put(mqtt_msg2)
|
||||
|
||||
# Get items from generator
|
||||
mreq1 = await client.incoming_method_requests.__anext__()
|
||||
assert isinstance(mreq1, models.MethodRequest)
|
||||
mreq2 = await client.incoming_method_requests.__anext__()
|
||||
assert isinstance(mreq2, models.MethodRequest)
|
||||
mreq1 = await client.incoming_direct_method_requests.__anext__()
|
||||
assert isinstance(mreq1, models.DirectMethodRequest)
|
||||
mreq2 = await client.incoming_direct_method_requests.__anext__()
|
||||
assert isinstance(mreq2, models.DirectMethodRequest)
|
||||
assert mreq1 != mreq2
|
||||
|
||||
@pytest.mark.it(
|
||||
"Extracts the method name and request id from the MQTTMessage's topic and sets them on the resulting MethodRequest"
|
||||
"Extracts the method name and request id from the MQTTMessage's topic and sets them on the resulting DirectMethodRequest"
|
||||
)
|
||||
async def test_method_request_attributes(self, client):
|
||||
generic_topic = mqtt_topic.get_method_topic_for_subscribe()
|
||||
async def test_direct_method_request_attributes(self, client):
|
||||
generic_topic = mqtt_topic.get_direct_method_request_topic_for_subscribe()
|
||||
|
||||
mreq_name = "some_method"
|
||||
mreq_id = "12"
|
||||
|
@ -2548,16 +2551,16 @@ class TestIoTHubMQTTClientIncomingMethodRequests:
|
|||
mqtt_msg1.payload = '{"json": "in", "a": {"string": "format"}}'.encode("utf-8")
|
||||
|
||||
await client._mqtt_client._incoming_filtered_messages[generic_topic].put(mqtt_msg1)
|
||||
mreq = await client.incoming_method_requests.__anext__()
|
||||
mreq = await client.incoming_direct_method_requests.__anext__()
|
||||
|
||||
assert mreq.name == mreq_name
|
||||
assert mreq.request_id == mreq_id
|
||||
|
||||
@pytest.mark.it(
|
||||
"Derives the yielded MethodRequest JSON payload from the MQTTMessage's byte payload using the utf-8 codec"
|
||||
"Derives the yielded DirectMethodRequest JSON payload from the MQTTMessage's byte payload using the utf-8 codec"
|
||||
)
|
||||
async def test_payload(self, client):
|
||||
generic_topic = mqtt_topic.get_method_topic_for_subscribe()
|
||||
generic_topic = mqtt_topic.get_direct_method_request_topic_for_subscribe()
|
||||
expected_payload = {"json": "derived", "from": {"byte": "payload"}}
|
||||
|
||||
mreq_name = "some_method"
|
||||
|
@ -2567,7 +2570,7 @@ class TestIoTHubMQTTClientIncomingMethodRequests:
|
|||
mqtt_msg1.payload = json.dumps(expected_payload).encode("utf-8")
|
||||
|
||||
await client._mqtt_client._incoming_filtered_messages[generic_topic].put(mqtt_msg1)
|
||||
mreq = await client.incoming_method_requests.__anext__()
|
||||
mreq = await client.incoming_direct_method_requests.__anext__()
|
||||
|
||||
assert mreq.payload == expected_payload
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
import pytest
|
||||
import logging
|
||||
from v3_async_wip.models import Message, MethodRequest, MethodResponse
|
||||
from v3_async_wip.models import Message, DirectMethodRequest, DirectMethodResponse
|
||||
from v3_async_wip import constant
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
@ -231,51 +231,51 @@ class TestMessage:
|
|||
assert message.content_type == "text/plain"
|
||||
|
||||
|
||||
@pytest.mark.describe("MethodRequest")
|
||||
class TestMethodRequest:
|
||||
@pytest.mark.describe("DirectMethodRequest")
|
||||
class TestDirectMethodRequest:
|
||||
@pytest.mark.it("Instantiates with the provided 'request_id' set as an attribute")
|
||||
def test_request_id(self):
|
||||
m_req = MethodRequest(request_id=FAKE_RID, name=FAKE_METHOD_NAME, payload={})
|
||||
m_req = DirectMethodRequest(request_id=FAKE_RID, name=FAKE_METHOD_NAME, payload={})
|
||||
assert m_req.request_id == FAKE_RID
|
||||
|
||||
@pytest.mark.it("Instantiates with the provided 'name' set as an attribute")
|
||||
def test_name(self):
|
||||
m_req = MethodRequest(request_id=FAKE_RID, name=FAKE_METHOD_NAME, payload={})
|
||||
m_req = DirectMethodRequest(request_id=FAKE_RID, name=FAKE_METHOD_NAME, payload={})
|
||||
assert m_req.name == FAKE_METHOD_NAME
|
||||
|
||||
@pytest.mark.it("Instantiates with the provided 'payload' set as an attribute")
|
||||
@pytest.mark.parametrize("payload", json_serializable_payload_params)
|
||||
def test_payload(self, payload):
|
||||
m_req = MethodRequest(request_id=FAKE_RID, name=FAKE_METHOD_NAME, payload=payload)
|
||||
m_req = DirectMethodRequest(request_id=FAKE_RID, name=FAKE_METHOD_NAME, payload=payload)
|
||||
assert m_req.payload == payload
|
||||
|
||||
|
||||
@pytest.mark.describe("MethodResponse")
|
||||
class TestMethodResponse:
|
||||
@pytest.mark.describe("DirectMethodResponse")
|
||||
class TestDirectMethodResponse:
|
||||
@pytest.mark.it("Instantiates with the provided 'request_id' set as an attribute")
|
||||
def test_request_id(self):
|
||||
m_resp = MethodResponse(request_id=FAKE_RID, status=FAKE_STATUS, payload={})
|
||||
m_resp = DirectMethodResponse(request_id=FAKE_RID, status=FAKE_STATUS, payload={})
|
||||
assert m_resp.request_id == FAKE_RID
|
||||
|
||||
@pytest.mark.it("Instantiates with the provided 'status' set as an attribute")
|
||||
def test_status(self):
|
||||
m_resp = MethodResponse(request_id=FAKE_RID, status=FAKE_STATUS, payload={})
|
||||
m_resp = DirectMethodResponse(request_id=FAKE_RID, status=FAKE_STATUS, payload={})
|
||||
assert m_resp.status == FAKE_STATUS
|
||||
|
||||
@pytest.mark.it("Instantiates with the optional provided 'payload' set as an attribute")
|
||||
@pytest.mark.parametrize("payload", json_serializable_payload_params)
|
||||
def test_payload(self, payload):
|
||||
m_resp = MethodResponse(request_id=FAKE_RID, status=FAKE_STATUS, payload=payload)
|
||||
m_resp = DirectMethodResponse(request_id=FAKE_RID, status=FAKE_STATUS, payload=payload)
|
||||
assert m_resp.payload == payload
|
||||
|
||||
@pytest.mark.it("Can be instantiated from a MethodResponse via factory API")
|
||||
@pytest.mark.it("Can be instantiated from a DirectMethodResponse via factory API")
|
||||
@pytest.mark.parametrize("payload", json_serializable_payload_params)
|
||||
def test_factory(self, payload):
|
||||
m_req = MethodRequest(request_id=FAKE_RID, name=FAKE_METHOD_NAME, payload={})
|
||||
m_resp = MethodResponse.create_from_method_request(
|
||||
m_req = DirectMethodRequest(request_id=FAKE_RID, name=FAKE_METHOD_NAME, payload={})
|
||||
m_resp = DirectMethodResponse.create_from_method_request(
|
||||
method_request=m_req, status=FAKE_STATUS, payload=payload
|
||||
)
|
||||
assert isinstance(m_resp, MethodResponse)
|
||||
assert isinstance(m_resp, DirectMethodResponse)
|
||||
assert m_resp.request_id == m_req.request_id
|
||||
assert m_resp.status == FAKE_STATUS
|
||||
assert m_resp.payload == payload
|
||||
|
|
|
@ -121,11 +121,11 @@ class TestGetInputTopicForSubscribe:
|
|||
assert topic == expected_topic
|
||||
|
||||
|
||||
@pytest.mark.describe(".get_method_topic_for_subscribe()")
|
||||
@pytest.mark.describe(".get_direct_method_request_topic_for_subscribe()")
|
||||
class TestGetMethodTopicForSubscribe:
|
||||
@pytest.mark.it("Returns the topic for subscribing to methods from IoTHub")
|
||||
def test_returns_topic(self):
|
||||
topic = mqtt_topic_iothub.get_method_topic_for_subscribe()
|
||||
topic = mqtt_topic_iothub.get_direct_method_request_topic_for_subscribe()
|
||||
assert topic == "$iothub/methods/POST/#"
|
||||
|
||||
|
||||
|
@ -229,9 +229,9 @@ class TestGetTelemetryTopicForPublish:
|
|||
assert topic == expected_topic
|
||||
|
||||
|
||||
@pytest.mark.describe(".get_method_topic_for_publish()")
|
||||
@pytest.mark.describe(".get_direct_method_response_topic_for_publish()")
|
||||
class TestGetMethodTopicForPublish:
|
||||
@pytest.mark.it("Returns the topic for sending a method response to IoTHub")
|
||||
@pytest.mark.it("Returns the topic for sending a direct method response to IoTHub")
|
||||
@pytest.mark.parametrize(
|
||||
"request_id, status, expected_topic",
|
||||
[
|
||||
|
@ -242,7 +242,7 @@ class TestGetMethodTopicForPublish:
|
|||
],
|
||||
)
|
||||
def test_returns_topic(self, request_id, status, expected_topic):
|
||||
topic = mqtt_topic_iothub.get_method_topic_for_publish(request_id, status)
|
||||
topic = mqtt_topic_iothub.get_direct_method_response_topic_for_publish(request_id, status)
|
||||
assert topic == expected_topic
|
||||
|
||||
@pytest.mark.it("URL encodes provided values when generating the topic")
|
||||
|
@ -270,7 +270,7 @@ class TestGetMethodTopicForPublish:
|
|||
],
|
||||
)
|
||||
def test_url_encoding(self, request_id, status, expected_topic):
|
||||
topic = mqtt_topic_iothub.get_method_topic_for_publish(request_id, status)
|
||||
topic = mqtt_topic_iothub.get_direct_method_response_topic_for_publish(request_id, status)
|
||||
assert topic == expected_topic
|
||||
|
||||
@pytest.mark.it("Converts the provided values to strings when generating the topic")
|
||||
|
@ -278,7 +278,7 @@ class TestGetMethodTopicForPublish:
|
|||
request_id = 1
|
||||
status = 200
|
||||
expected_topic = "$iothub/methods/res/200/?$rid=1"
|
||||
topic = mqtt_topic_iothub.get_method_topic_for_publish(request_id, status)
|
||||
topic = mqtt_topic_iothub.get_direct_method_response_topic_for_publish(request_id, status)
|
||||
assert topic == expected_topic
|
||||
|
||||
|
||||
|
@ -588,15 +588,16 @@ class TestExtractPropertiesFromMessageTopic:
|
|||
mqtt_topic_iothub.extract_properties_from_message_topic(topic)
|
||||
|
||||
|
||||
@pytest.mark.describe(".extract_name_from_method_request_topic()")
|
||||
@pytest.mark.describe(".extract_name_from_direct_method_request_topic()")
|
||||
class TestExtractNameFromMethodRequestTopic:
|
||||
@pytest.mark.it("Returns the method name from a method topic")
|
||||
def test_valid_method_topic(self):
|
||||
def test_valid_direct_method_topic(self):
|
||||
topic = "$iothub/methods/POST/fake_method/?$rid=1"
|
||||
expected_method_name = "fake_method"
|
||||
|
||||
assert (
|
||||
mqtt_topic_iothub.extract_name_from_method_request_topic(topic) == expected_method_name
|
||||
mqtt_topic_iothub.extract_name_from_direct_method_request_topic(topic)
|
||||
== expected_method_name
|
||||
)
|
||||
|
||||
@pytest.mark.it("URL decodes the returned method name")
|
||||
|
@ -617,7 +618,8 @@ class TestExtractNameFromMethodRequestTopic:
|
|||
)
|
||||
def test_url_decodes_value(self, topic, expected_method_name):
|
||||
assert (
|
||||
mqtt_topic_iothub.extract_name_from_method_request_topic(topic) == expected_method_name
|
||||
mqtt_topic_iothub.extract_name_from_direct_method_request_topic(topic)
|
||||
== expected_method_name
|
||||
)
|
||||
|
||||
@pytest.mark.it("Raises a ValueError if the provided topic is not a method topic")
|
||||
|
@ -632,20 +634,20 @@ class TestExtractNameFromMethodRequestTopic:
|
|||
pytest.param("$iothub/methdos/POST/fake_method/?$rid=1", id="Malformed topic"),
|
||||
],
|
||||
)
|
||||
def test_invalid_method_topic(self, topic):
|
||||
def test_invalid_direct_method_topic(self, topic):
|
||||
with pytest.raises(ValueError):
|
||||
mqtt_topic_iothub.extract_name_from_method_request_topic(topic)
|
||||
mqtt_topic_iothub.extract_name_from_direct_method_request_topic(topic)
|
||||
|
||||
|
||||
@pytest.mark.describe(".extract_request_id_from_method_request_topic()")
|
||||
@pytest.mark.describe(".extract_request_id_from_direct_method_request_topic()")
|
||||
class TestExtractRequestIdFromMethodRequestTopic:
|
||||
@pytest.mark.it("Returns the request id from a method topic")
|
||||
def test_valid_method_topic(self):
|
||||
def test_valid_direct_method_topic(self):
|
||||
topic = "$iothub/methods/POST/fake_method/?$rid=1"
|
||||
expected_request_id = "1"
|
||||
|
||||
assert (
|
||||
mqtt_topic_iothub.extract_request_id_from_method_request_topic(topic)
|
||||
mqtt_topic_iothub.extract_request_id_from_direct_method_request_topic(topic)
|
||||
== expected_request_id
|
||||
)
|
||||
|
||||
|
@ -667,7 +669,7 @@ class TestExtractRequestIdFromMethodRequestTopic:
|
|||
)
|
||||
def test_url_decodes_value(self, topic, expected_request_id):
|
||||
assert (
|
||||
mqtt_topic_iothub.extract_request_id_from_method_request_topic(topic)
|
||||
mqtt_topic_iothub.extract_request_id_from_direct_method_request_topic(topic)
|
||||
== expected_request_id
|
||||
)
|
||||
|
||||
|
@ -683,9 +685,9 @@ class TestExtractRequestIdFromMethodRequestTopic:
|
|||
pytest.param("$iothub/methdos/POST/fake_method/?$rid=1", id="Malformed topic"),
|
||||
],
|
||||
)
|
||||
def test_invalid_method_topic(self, topic):
|
||||
def test_invalid_direct_method_topic(self, topic):
|
||||
with pytest.raises(ValueError):
|
||||
mqtt_topic_iothub.extract_request_id_from_method_request_topic(topic)
|
||||
mqtt_topic_iothub.extract_request_id_from_direct_method_request_topic(topic)
|
||||
|
||||
@pytest.mark.it("Raises a ValueError if the provided topic does not contain a request id")
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -698,7 +700,7 @@ class TestExtractRequestIdFromMethodRequestTopic:
|
|||
)
|
||||
def test_no_request_id(self, topic):
|
||||
with pytest.raises(ValueError):
|
||||
mqtt_topic_iothub.extract_request_id_from_method_request_topic(topic)
|
||||
mqtt_topic_iothub.extract_request_id_from_direct_method_request_topic(topic)
|
||||
|
||||
|
||||
@pytest.mark.describe(".extract_status_code_from_twin_response_topic()")
|
||||
|
|
|
@ -35,7 +35,7 @@ class ProxyOptions:
|
|||
self,
|
||||
proxy_type: str,
|
||||
proxy_address: str,
|
||||
proxy_port: int,
|
||||
proxy_port: Optional[int] = None,
|
||||
proxy_username: Optional[str] = None,
|
||||
proxy_password: Optional[str] = None,
|
||||
):
|
||||
|
@ -48,9 +48,14 @@ class ProxyOptions:
|
|||
If it is not provided, authentication will not be used (servers may accept unauthenticated requests).
|
||||
:param str proxy_password: (optional) This parameter is valid only for SOCKS5 servers and specifies the respective password for the username provided.
|
||||
"""
|
||||
# TODO: port default
|
||||
# TODO: is that documentation about auth only being used on SOCKS accurate? Seems inaccurate.
|
||||
(self.proxy_type, self.proxy_type_socks) = _format_proxy_type(proxy_type)
|
||||
self.proxy_address = proxy_address
|
||||
self.proxy_port = int(proxy_port)
|
||||
if proxy_port is None:
|
||||
self.proxy_port = _derive_default_proxy_port(self.proxy_type)
|
||||
else:
|
||||
self.proxy_port = int(proxy_port)
|
||||
self.proxy_username = proxy_username
|
||||
self.proxy_password = proxy_password
|
||||
|
||||
|
@ -111,6 +116,7 @@ class IoTHubClientConfig(ClientConfig):
|
|||
*,
|
||||
device_id: str,
|
||||
module_id: Optional[str] = None,
|
||||
is_edge_module: bool = False,
|
||||
product_info: str = "",
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
|
@ -119,12 +125,14 @@ class IoTHubClientConfig(ClientConfig):
|
|||
|
||||
:param str device_id: The device identity being used with the IoTHub
|
||||
:param str module_id: The module identity being used with the IoTHub
|
||||
:param bool is_edge_module: Boolean indicating whether or not using an Edge Module
|
||||
:param str product_info: A custom identification string.
|
||||
|
||||
Additional parameters found in the docstring of the parent class
|
||||
"""
|
||||
self.device_id = device_id
|
||||
self.module_id = module_id
|
||||
self.is_edge_module = is_edge_module
|
||||
self.product_info = product_info
|
||||
super().__init__(**kwargs)
|
||||
|
||||
|
@ -157,6 +165,13 @@ def _format_proxy_type(proxy_type):
|
|||
raise ValueError("Invalid Proxy Type")
|
||||
|
||||
|
||||
def _derive_default_proxy_port(proxy_type):
|
||||
if proxy_type == "HTTP":
|
||||
return 8080
|
||||
else:
|
||||
return 1080
|
||||
|
||||
|
||||
def _sanitize_keep_alive(keep_alive):
|
||||
try:
|
||||
keep_alive = int(keep_alive)
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
# --------------------------------------------------------------------------
|
||||
|
||||
from typing import Union, Dict, List, Tuple
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
# typing does not support recursion, so we must use forward references here (PEP484)
|
||||
JSONSerializable = Union[
|
||||
|
@ -19,5 +20,22 @@ JSONSerializable = Union[
|
|||
]
|
||||
# TODO: verify that the JSON specification requires str as keys in dict. Not sure why that's defined here.
|
||||
|
||||
|
||||
Twin = Dict[str, Dict[str, JSONSerializable]]
|
||||
TwinPatch = Dict[str, JSONSerializable]
|
||||
|
||||
|
||||
# TODO: should this be "direct method?"
|
||||
class MethodParameters(TypedDict):
|
||||
methodName: str
|
||||
payload: str
|
||||
connectTimeoutInSeconds: int
|
||||
responseTimeoutInSeconds: int
|
||||
|
||||
|
||||
class StorageInfo(TypedDict):
|
||||
correlationId: str
|
||||
hostName: str
|
||||
containerName: str
|
||||
blobName: str
|
||||
sasToken: str
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
# --------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for
|
||||
# license information.
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
import logging
|
||||
import urllib
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_direct_method_invoke_path(device_id: str, module_id: Optional[str] = None) -> str:
|
||||
"""
|
||||
:return: The relative path for invoking methods from one module to a device or module. It is of the format
|
||||
/twins/uri_encode($device_id)/modules/uri_encode($module_id)/methods
|
||||
"""
|
||||
if module_id:
|
||||
return "/twins/{device_id}/modules/{module_id}/methods".format(
|
||||
device_id=urllib.parse.quote_plus(device_id),
|
||||
module_id=urllib.parse.quote_plus(module_id),
|
||||
)
|
||||
else:
|
||||
return "/twins/{device_id}/methods".format(device_id=urllib.parse.quote_plus(device_id))
|
||||
|
||||
|
||||
def get_storage_info_for_blob_path(device_id: str):
|
||||
"""
|
||||
This does not take a module_id since get_storage_info_for_blob_path should only ever be invoked on device clients.
|
||||
|
||||
:return: The relative path for getting the storage sdk credential information from IoT Hub. It is of the format
|
||||
/devices/uri_encode($device_id)/files
|
||||
"""
|
||||
return "/devices/{}/files".format(urllib.parse.quote_plus(device_id))
|
||||
|
||||
|
||||
def get_notify_blob_upload_status_path(device_id: str):
|
||||
"""
|
||||
This does not take a module_id since get_notify_blob_upload_status_path should only ever be invoked on device clients.
|
||||
|
||||
:return: The relative path for getting the storage sdk credential information from IoT Hub. It is of the format
|
||||
/devices/uri_encode($device_id)/files/notifications
|
||||
"""
|
||||
return "/devices/{}/files/notifications".format(urllib.parse.quote_plus(device_id))
|
|
@ -0,0 +1,284 @@
|
|||
# --------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for
|
||||
# license information.
|
||||
# --------------------------------------------------------------------------
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import logging
|
||||
import urllib.parse
|
||||
from typing import Optional, cast
|
||||
from .custom_typing import MethodParameters, StorageInfo
|
||||
from .iot_exceptions import IoTHubClientError, IoTHubError, IoTEdgeError
|
||||
from . import config, constant, user_agent
|
||||
from . import http_path_iothub as http_path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Header Definitions
|
||||
HEADER_AUTHORIZATION = "Authorization"
|
||||
HEADER_EDGE_MODULE_ID = "x-ms-edge-moduleId"
|
||||
HEADER_USER_AGENT = "User-Agent"
|
||||
|
||||
# Query parameter definitions
|
||||
PARAM_API_VERISON = "api-version"
|
||||
|
||||
# Other definitions
|
||||
HTTP_TIMEOUT = 10
|
||||
|
||||
# NOTE: Outstanding items in this module:
|
||||
# TODO: document aiohttp exceptions that can be raised
|
||||
# TODO: URL Encoding logic
|
||||
# TODO: Proxy support
|
||||
# TODO: Hostname/Gateway Hostname split (E2E test to see what works)
|
||||
# TODO: Should direct method responses be a DirectMethodResponse object? If so, what is the rid?
|
||||
# See specific inline commentary for more details on what is required
|
||||
|
||||
|
||||
# NOTE: aiohttp 3.x is bugged on Windows on Python 3.8.x - 3.10.6
|
||||
# If running the application using asyncio.run(), there will be an issue with the Event Loop
|
||||
# raising a spurious RuntimeError on application exit.
|
||||
#
|
||||
# Windows Event Loops are notoriously tricky to deal with. This issue stems from the use of the
|
||||
# default ProactorEventLoop, and can be mitigated by switching to a SelectorEventLoop, but
|
||||
# we as SDK developers really ought not be modifying the end user's event loop, or monkeypatching
|
||||
# error suppression into it. Furthermore, switching to a SelectorEvenLoop has some degradation of
|
||||
# functionality.
|
||||
#
|
||||
# The best course of action is for the end user to use loop.run_until_complete() instead of
|
||||
# asyncio.run() in their application, as this will allow for better cleanup.
|
||||
#
|
||||
# Eventually when there is an aiohttp 4.x released, this bug will be eliminated from all versions
|
||||
# of Python, but until then, there's not much to be done about it.
|
||||
#
|
||||
# See: https://github.com/aio-libs/aiohttp/issues/4324, as well as many, many other similar issues
|
||||
# for more details.
|
||||
|
||||
|
||||
class IoTHubHTTPClient:
|
||||
def __init__(self, client_config: config.IoTHubClientConfig) -> None:
|
||||
"""Instantiate the client
|
||||
|
||||
:param client_config: The config object for the client
|
||||
:type client_config: :class:`IoTHubClientConfig`
|
||||
"""
|
||||
self._device_id = client_config.device_id
|
||||
self._module_id = client_config.module_id
|
||||
self._edge_module_id = _format_edge_module_id(
|
||||
self._device_id, self._module_id, client_config.is_edge_module
|
||||
)
|
||||
self._user_agent_string = user_agent.get_iothub_user_agent() + client_config.product_info
|
||||
if client_config.gateway_hostname:
|
||||
self._hostname = client_config.gateway_hostname
|
||||
else:
|
||||
self._hostname = client_config.hostname
|
||||
|
||||
# TODO: add proxy support
|
||||
# Doing so will require building a custom "Connector" that can be injected into the
|
||||
# Session object. There are many examples around online.
|
||||
# The built in per-request proxy of aiohttp is only partly functional, so I decided to
|
||||
# not even bother implementing it, if it only does half the job.
|
||||
if client_config.proxy_options:
|
||||
# TODO: these warnings should probably be at API level
|
||||
logger.warning("Proxy use with .invoke_direct_method() not supported")
|
||||
logger.warning("Proxy use with .get_storage_info_for_blob() not supported")
|
||||
logger.warning("Proxy use with .notify_blob_upload_status() not supported")
|
||||
|
||||
self._session = _create_client_session(self._hostname)
|
||||
self._ssl_context = client_config.ssl_context
|
||||
self._sastoken_provider = client_config.sastoken_provider
|
||||
|
||||
async def shutdown(self):
|
||||
"""Shut down the client
|
||||
|
||||
Invoke only when complete finished with the client for graceful exit.
|
||||
"""
|
||||
await self._session.close()
|
||||
# Wait 250ms for the underlying SSL connections to close
|
||||
# See: https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
# TODO: Should this return type be a MethodResponse? Or should we get rid of those objects entirely?
|
||||
# TODO: Either way, need a better rtype than "dict"
|
||||
async def invoke_direct_method(
|
||||
self, *, device_id: str, module_id: Optional[str] = None, method_params: MethodParameters
|
||||
) -> dict:
|
||||
"""Send a request to invoke a direct method on a target device or module
|
||||
|
||||
:param str device_id: The target device ID
|
||||
:param str module_id: The target module ID
|
||||
:param dict method_params: The parameters for the direct method invocation
|
||||
|
||||
:raises: :class:`IoTHubClientError` if not using an IoT Edge Module
|
||||
:raises: :class:`IoTHubClientError` if the direct method response cannot be parsed
|
||||
:raises: :class:`IoTEdgeError` if IoT Edge responds with failure
|
||||
"""
|
||||
if not self._edge_module_id:
|
||||
raise IoTHubClientError(".invoke_direct_method() only available for Edge Modules")
|
||||
|
||||
path = http_path.get_direct_method_invoke_path(device_id, module_id)
|
||||
query_params = {PARAM_API_VERISON: constant.IOTHUB_API_VERSION}
|
||||
# NOTE: Other headers are auto-generated by aiohttp
|
||||
# TODO: we may need to explicitly add the Host header depending on how host/gateway host works out
|
||||
headers = {
|
||||
HEADER_USER_AGENT: urllib.parse.quote_plus(self._user_agent_string),
|
||||
HEADER_EDGE_MODULE_ID: self._edge_module_id, # TODO: I assume this isn't supposed to be URI encoded just like in MQTT?
|
||||
}
|
||||
# If using SAS auth, pass the auth header
|
||||
if self._sastoken_provider:
|
||||
headers[HEADER_AUTHORIZATION] = str(self._sastoken_provider.get_current_sastoken())
|
||||
|
||||
logger.debug(
|
||||
"Sending direct method invocation request to {device_id}/{module_id}".format(
|
||||
device_id=device_id, module_id=module_id
|
||||
)
|
||||
)
|
||||
async with self._session.post(
|
||||
url=path,
|
||||
json=method_params,
|
||||
params=query_params,
|
||||
headers=headers,
|
||||
ssl=self._ssl_context,
|
||||
) as response:
|
||||
|
||||
if response.status >= 300:
|
||||
logger.error("Received failure response from IoT Edge for direct method invocation")
|
||||
raise IoTEdgeError(
|
||||
"IoT Edge responded to direct method invocation with a failed status ({status}) - {reason}".format(
|
||||
status=response.status, reason=response.reason
|
||||
)
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
"Successfully received response from IoT Edge for direct method invocation"
|
||||
)
|
||||
dm_response_json = await response.json()
|
||||
|
||||
return dm_response_json
|
||||
|
||||
async def get_storage_info_for_blob(self, *, blob_name: str) -> StorageInfo:
|
||||
"""Request information for uploading blob file via the Azure Storage SDK
|
||||
|
||||
:param str blob_name: The name of the blob that will be uploaded to the Azure Storage SDK
|
||||
|
||||
:returns: The Azure Storage information returned by IoTHub
|
||||
:rtype: dict
|
||||
|
||||
:raises: :class:`IoTHubClientError` if not using a Device
|
||||
:raises: :class:`IoTHubError` if IoTHub responds with failure
|
||||
"""
|
||||
if self._module_id:
|
||||
raise IoTHubClientError(".get_storage_info_for_blob() only available for Devices")
|
||||
|
||||
path = http_path.get_storage_info_for_blob_path(
|
||||
self._device_id
|
||||
) # TODO: is this bad that this is encoding? aiohttp encodes automatically
|
||||
query_params = {PARAM_API_VERISON: constant.IOTHUB_API_VERSION}
|
||||
data = {"blobName": blob_name}
|
||||
# NOTE: Other headers are auto-generated by aiohttp
|
||||
headers = {HEADER_USER_AGENT: urllib.parse.quote_plus(self._user_agent_string)}
|
||||
# If using SAS auth, pass the auth header
|
||||
if self._sastoken_provider:
|
||||
headers[HEADER_AUTHORIZATION] = str(self._sastoken_provider.get_current_sastoken())
|
||||
|
||||
logger.debug("Sending storage info request to IoTHub...")
|
||||
async with self._session.post(
|
||||
url=path,
|
||||
json=data,
|
||||
params=query_params,
|
||||
headers=headers,
|
||||
ssl=self._ssl_context,
|
||||
) as response:
|
||||
|
||||
if response.status >= 300:
|
||||
logger.error("Received failure response from IoTHub for storage info request")
|
||||
raise IoTHubError(
|
||||
"IoTHub responded to storage info request with a failed status ({status}) - {reason}".format(
|
||||
status=response.status, reason=response.reason
|
||||
)
|
||||
)
|
||||
else:
|
||||
logger.debug("Successfully received response from IoTHub for storage info request")
|
||||
storage_info = cast(StorageInfo, await response.json())
|
||||
|
||||
return storage_info
|
||||
|
||||
async def notify_blob_upload_status(
|
||||
self, *, correlation_id: str, is_success: bool, status_code: int, status_description: str
|
||||
) -> None:
|
||||
"""Notify IoTHub of the result of a Azure Storage SDK blob upload
|
||||
|
||||
:param str correlation_id: ID for the blob upload
|
||||
:param bool is_success: Indicates whether the file was uploaded successfully
|
||||
:param int status_code: A numeric status code for the file upload
|
||||
:param str status_description: A description that corresponds to the status_code
|
||||
|
||||
:raises: :class:`IoTHubClientError` if not using a Device
|
||||
:raises: :class:`IoTHubError` if IoTHub responds with failure
|
||||
"""
|
||||
if self._module_id:
|
||||
raise IoTHubClientError(".notify_blob_upload_status() only available for Devices")
|
||||
|
||||
path = http_path.get_notify_blob_upload_status_path(self._device_id)
|
||||
query_params = {PARAM_API_VERISON: constant.IOTHUB_API_VERSION}
|
||||
data = {
|
||||
"correlationId": correlation_id,
|
||||
"isSuccess": is_success,
|
||||
"statusCode": status_code,
|
||||
"statusDescription": status_description,
|
||||
}
|
||||
# NOTE: Other headers are auto-generated by aiohttp
|
||||
headers = {HEADER_USER_AGENT: urllib.parse.quote_plus(self._user_agent_string)}
|
||||
# If using SAS auth, pass the auth header
|
||||
if self._sastoken_provider:
|
||||
headers[HEADER_AUTHORIZATION] = str(self._sastoken_provider.get_current_sastoken())
|
||||
|
||||
logger.debug("Sending blob upload notification to IoTHub...")
|
||||
async with self._session.post(
|
||||
url=path,
|
||||
json=data,
|
||||
params=query_params,
|
||||
headers=headers,
|
||||
ssl=self._ssl_context,
|
||||
) as response:
|
||||
|
||||
if response.status >= 300:
|
||||
logger.error("Received failure response from IoTHub for blob upload notification")
|
||||
raise IoTHubError(
|
||||
"IoTHub responded to blob upload notification with a failed status ({status}) - {reason}".format(
|
||||
status=response.status, reason=response.reason
|
||||
)
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
"Successfully received from response from IoTHub for blob upload notification"
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _format_edge_module_id(
|
||||
device_id: str, module_id: Optional[str], is_edge_module
|
||||
) -> Optional[str]:
|
||||
"""Returns the edge module identifier"""
|
||||
if is_edge_module:
|
||||
if module_id:
|
||||
return "{device_id}/{module_id}".format(device_id=device_id, module_id=module_id)
|
||||
else:
|
||||
# This shouldn't ever happen
|
||||
raise ValueError("Invalid configuration - Edge Module with no Module ID")
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def _create_client_session(hostname: str) -> aiohttp.ClientSession:
|
||||
"""Create and return a aiohttp ClientSession object"""
|
||||
base_url = "https://{hostname}".format(hostname=hostname)
|
||||
timeout = aiohttp.ClientTimeout(total=HTTP_TIMEOUT)
|
||||
session = aiohttp.ClientSession(base_url=base_url, timeout=timeout)
|
||||
logger.debug(
|
||||
"Creating HTTP Session for {url} with timeout of {timeout}".format(
|
||||
url=base_url, timeout=timeout.total
|
||||
)
|
||||
)
|
||||
return session
|
|
@ -11,7 +11,7 @@ import urllib.parse
|
|||
from typing import Optional, Union, AsyncGenerator
|
||||
from .custom_typing import TwinPatch, Twin
|
||||
from .iot_exceptions import IoTHubError, IoTHubClientError
|
||||
from .models import Message, MethodResponse, MethodRequest
|
||||
from .models import Message, DirectMethodResponse, DirectMethodRequest
|
||||
from . import config, constant, user_agent
|
||||
from . import request_response as rr
|
||||
from . import mqtt_client as mqtt
|
||||
|
@ -34,8 +34,7 @@ class IoTHubMQTTClient:
|
|||
self,
|
||||
client_config: config.IoTHubClientConfig,
|
||||
) -> None:
|
||||
"""
|
||||
Instantiate the client
|
||||
"""Instantiate the client
|
||||
|
||||
:param client_config: The config object for the client
|
||||
:type client_config: :class:`IoTHubClientConfig`
|
||||
|
@ -72,9 +71,9 @@ class IoTHubMQTTClient:
|
|||
self.incoming_input_messages: Optional[
|
||||
AsyncGenerator[Message, None]
|
||||
] = _create_input_message_generator(self._device_id, self._module_id, self._mqtt_client)
|
||||
self.incoming_method_requests: AsyncGenerator[
|
||||
MethodRequest, None
|
||||
] = _create_method_request_generator(self._mqtt_client)
|
||||
self.incoming_direct_method_requests: AsyncGenerator[
|
||||
DirectMethodRequest, None
|
||||
] = _create_direct_method_request_generator(self._mqtt_client)
|
||||
self.incoming_twin_patches: AsyncGenerator[TwinPatch, None] = _create_twin_patch_generator(
|
||||
self._mqtt_client
|
||||
)
|
||||
|
@ -145,8 +144,7 @@ class IoTHubMQTTClient:
|
|||
break
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
"""
|
||||
Shut down the client.
|
||||
"""Shut down the client.
|
||||
|
||||
Invoke only when completely finished with the client for graceful exit.
|
||||
Cannot be cancelled - if you try, the client will still fully shut down as much as
|
||||
|
@ -229,25 +227,27 @@ class IoTHubMQTTClient:
|
|||
await self._mqtt_client.publish(topic, byte_payload)
|
||||
logger.debug("Sending telemetry message succeeded")
|
||||
|
||||
async def send_method_response(self, method_response: MethodResponse):
|
||||
"""Send a method response to IoTHub.
|
||||
async def send_direct_method_response(self, method_response: DirectMethodResponse) -> None:
|
||||
"""Send a direct method response to IoTHub.
|
||||
|
||||
:param method_response: The MethodResponse to be sent
|
||||
:type method_response: :class:`models.MethodResponse`
|
||||
:param method_response: The DirectMethodResponse to be sent
|
||||
:type method_response: :class:`models.DirectMethodResponse`
|
||||
|
||||
:raises: MQTTError if there is an error sending the MethodResponse
|
||||
:raises: ValueError if the size of the MethodResponse payload is too large
|
||||
:raises: MQTTError if there is an error sending the DirectMethodResponse
|
||||
:raises: ValueError if the size of the DirectMethodResponse payload is too large
|
||||
"""
|
||||
topic = mqtt_topic.get_method_topic_for_publish(
|
||||
topic = mqtt_topic.get_direct_method_response_topic_for_publish(
|
||||
method_response.request_id, method_response.status
|
||||
)
|
||||
payload = json.dumps(method_response.payload)
|
||||
logger.debug(
|
||||
"Sending method response to IoTHub... (rid: {})".format(method_response.request_id)
|
||||
"Sending direct method response to IoTHub... (rid: {})".format(
|
||||
method_response.request_id
|
||||
)
|
||||
)
|
||||
await self._mqtt_client.publish(topic, payload)
|
||||
logger.debug(
|
||||
"Sending method response succeeded (rid: {})".format(method_response.request_id)
|
||||
"Sending direct method response succeeded (rid: {})".format(method_response.request_id)
|
||||
)
|
||||
|
||||
async def send_twin_patch(self, patch: TwinPatch) -> None:
|
||||
|
@ -441,27 +441,29 @@ class IoTHubMQTTClient:
|
|||
await self._mqtt_client.unsubscribe(topic)
|
||||
logger.debug("Input message receive disabled")
|
||||
|
||||
async def enable_method_request_receive(self) -> None:
|
||||
"""Enable the ability to receive method requests
|
||||
async def enable_direct_method_request_receive(self) -> None:
|
||||
"""Enable the ability to receive direct method requests
|
||||
|
||||
:raises: MQTTError if there is an error enabling method request receive
|
||||
:raises: CancelledError if enabling method request receive is cancelled by network failure
|
||||
:raises: MQTTError if there is an error enabling direct method request receive
|
||||
:raises: CancelledError if enabling direct method request receive is cancelled by
|
||||
network failure
|
||||
"""
|
||||
logger.debug("Enabling receive for method requests...")
|
||||
topic = mqtt_topic.get_method_topic_for_subscribe()
|
||||
logger.debug("Enabling receive for direct method requests...")
|
||||
topic = mqtt_topic.get_direct_method_request_topic_for_subscribe()
|
||||
await self._mqtt_client.subscribe(topic)
|
||||
logger.debug("Method request receive enabled")
|
||||
logger.debug("Direct method request receive enabled")
|
||||
|
||||
async def disable_method_request_receive(self) -> None:
|
||||
"""Disable the ability to receive method requests
|
||||
async def disable_direct_method_request_receive(self) -> None:
|
||||
"""Disable the ability to receive direct method requests
|
||||
|
||||
:raises: MQTTError if there is an error disabling method request receive
|
||||
:raises: CancelledError if disabling method request receive is cancelled by network failure
|
||||
:raises: MQTTError if there is an error disabling direct method request receive
|
||||
:raises: CancelledError if disabling direct method request receive is cancelled by
|
||||
network failure
|
||||
"""
|
||||
logger.debug("Disabling receive for method requests...")
|
||||
topic = mqtt_topic.get_method_topic_for_subscribe()
|
||||
logger.debug("Disabling receive for direct method requests...")
|
||||
topic = mqtt_topic.get_direct_method_request_topic_for_subscribe()
|
||||
await self._mqtt_client.unsubscribe(topic)
|
||||
logger.debug("Method request receive disabled")
|
||||
logger.debug("Direct method request receive disabled")
|
||||
|
||||
async def enable_twin_patch_receive(self) -> None:
|
||||
"""Enable the ability to receive twin patches
|
||||
|
@ -544,7 +546,7 @@ def _create_mqtt_client(
|
|||
client_config.device_id, client_config.module_id
|
||||
)
|
||||
client.add_incoming_message_filter(input_msg_topic)
|
||||
method_request_topic = mqtt_topic.get_method_topic_for_subscribe()
|
||||
method_request_topic = mqtt_topic.get_direct_method_request_topic_for_subscribe()
|
||||
client.add_incoming_message_filter(method_request_topic)
|
||||
twin_patch_topic = mqtt_topic.get_twin_patch_topic_for_subscribe()
|
||||
client.add_incoming_message_filter(twin_patch_topic)
|
||||
|
@ -622,24 +624,30 @@ def _create_input_message_generator(
|
|||
return input_message_generator(mqtt_msg_generator)
|
||||
|
||||
|
||||
def _create_method_request_generator(
|
||||
def _create_direct_method_request_generator(
|
||||
mqtt_client: mqtt.MQTTClient,
|
||||
) -> AsyncGenerator[MethodRequest, None]:
|
||||
method_request_topic = mqtt_topic.get_method_topic_for_subscribe()
|
||||
) -> AsyncGenerator[DirectMethodRequest, None]:
|
||||
method_request_topic = mqtt_topic.get_direct_method_request_topic_for_subscribe()
|
||||
mqtt_msg_generator = mqtt_client.get_incoming_message_generator(method_request_topic)
|
||||
|
||||
async def method_request_generator(
|
||||
async def direct_method_request_generator(
|
||||
incoming_mqtt_messages: AsyncGenerator[mqtt.MQTTMessage, None]
|
||||
) -> AsyncGenerator[MethodRequest, None]:
|
||||
) -> AsyncGenerator[DirectMethodRequest, None]:
|
||||
async for mqtt_message in incoming_mqtt_messages:
|
||||
# TODO: should request_id be an int in this context?
|
||||
request_id = mqtt_topic.extract_request_id_from_method_request_topic(mqtt_message.topic)
|
||||
method_name = mqtt_topic.extract_name_from_method_request_topic(mqtt_message.topic)
|
||||
request_id = mqtt_topic.extract_request_id_from_direct_method_request_topic(
|
||||
mqtt_message.topic
|
||||
)
|
||||
method_name = mqtt_topic.extract_name_from_direct_method_request_topic(
|
||||
mqtt_message.topic
|
||||
)
|
||||
payload = json.loads(mqtt_message.payload.decode("utf-8"))
|
||||
method_request = MethodRequest(request_id=request_id, name=method_name, payload=payload)
|
||||
method_request = DirectMethodRequest(
|
||||
request_id=request_id, name=method_name, payload=payload
|
||||
)
|
||||
yield method_request
|
||||
|
||||
return method_request_generator(mqtt_msg_generator)
|
||||
return direct_method_request_generator(mqtt_msg_generator)
|
||||
|
||||
|
||||
def _create_twin_patch_generator(mqtt_client: mqtt.MQTTClient) -> AsyncGenerator[TwinPatch, None]:
|
||||
|
|
|
@ -149,7 +149,7 @@ class Message:
|
|||
return message
|
||||
|
||||
|
||||
class MethodRequest:
|
||||
class DirectMethodRequest:
|
||||
"""Represents a request to invoke a direct method.
|
||||
|
||||
:ivar str request_id: The request id.
|
||||
|
@ -159,7 +159,7 @@ class MethodRequest:
|
|||
"""
|
||||
|
||||
def __init__(self, request_id: str, name: str, payload: JSONSerializable) -> None:
|
||||
"""Initializer for a MethodRequest.
|
||||
"""Initializer for a DirectMethodRequest.
|
||||
|
||||
:param str request_id: The request id.
|
||||
:param str name: The name of the method to be invoked
|
||||
|
@ -171,20 +171,20 @@ class MethodRequest:
|
|||
self.payload = payload
|
||||
|
||||
|
||||
class MethodResponse:
|
||||
class DirectMethodResponse:
|
||||
"""Represents a response to a direct method.
|
||||
|
||||
:ivar str request_id: The request id of the MethodRequest being responded to.
|
||||
:ivar int status: The status of the execution of the MethodRequest.
|
||||
:ivar str request_id: The request id of the DirectMethodRequest being responded to.
|
||||
:ivar int status: The status of the execution of the DirectMethodRequest.
|
||||
:ivar payload: The JSON payload to be sent with the response.
|
||||
:type payload: dict, str, int, float, bool, or None (JSON compatible values)
|
||||
"""
|
||||
|
||||
def __init__(self, request_id: str, status: int, payload: JSONSerializable = None) -> None:
|
||||
"""Initializer for MethodResponse.
|
||||
"""Initializer for DirectMethodResponse.
|
||||
|
||||
:param str request_id: The request id of the MethodRequest being responded to.
|
||||
:param int status: The status of the execution of the MethodRequest.
|
||||
:param str request_id: The request id of the DirectMethodRequest being responded to.
|
||||
:param int status: The status of the execution of the DirectMethodRequest.
|
||||
:param payload: The JSON payload to be sent with the response. (OPTIONAL)
|
||||
:type payload: dict, str, int, float, bool, or None (JSON compatible values)
|
||||
"""
|
||||
|
@ -194,13 +194,13 @@ class MethodResponse:
|
|||
|
||||
@classmethod
|
||||
def create_from_method_request(
|
||||
cls, method_request: MethodRequest, status: int, payload: JSONSerializable = None
|
||||
cls, method_request: DirectMethodRequest, status: int, payload: JSONSerializable = None
|
||||
):
|
||||
"""Factory method for creating a MethodResponse from a MethodRequest.
|
||||
"""Factory method for creating a DirectMethodResponse from a DirectMethodRequest.
|
||||
|
||||
:param method_request: The MethodRequest object to respond to.
|
||||
:type method_request: MethodRequest.
|
||||
:param int status: The status of the execution of the MethodRequest.
|
||||
:param method_request: The DirectMethodRequest object to respond to.
|
||||
:type method_request: DirectMethodRequest.
|
||||
:param int status: The status of the execution of the DirectMethodRequest.
|
||||
:type payload: dict, str, int, float, bool, or None (JSON compatible values)
|
||||
"""
|
||||
return cls(request_id=method_request.request_id, status=status, payload=payload)
|
||||
|
|
|
@ -66,9 +66,9 @@ def get_input_topic_for_subscribe(device_id: str, module_id: str) -> str:
|
|||
return _get_topic_base(device_id, module_id) + "/inputs/#"
|
||||
|
||||
|
||||
def get_method_topic_for_subscribe() -> str:
|
||||
def get_direct_method_request_topic_for_subscribe() -> str:
|
||||
"""
|
||||
:return: The topic for ALL incoming methods. It is of the format
|
||||
:return: The topic for ALL incoming direct methods. It is of the format
|
||||
"$iothub/methods/POST/#"
|
||||
"""
|
||||
return "$iothub/methods/POST/#"
|
||||
|
@ -97,9 +97,9 @@ def get_telemetry_topic_for_publish(device_id: str, module_id: Optional[str] = N
|
|||
return _get_topic_base(device_id, module_id) + "/messages/events/"
|
||||
|
||||
|
||||
def get_method_topic_for_publish(request_id: str, status: Union[str, int]) -> str:
|
||||
def get_direct_method_response_topic_for_publish(request_id: str, status: Union[str, int]) -> str:
|
||||
"""
|
||||
:return: The topic for publishing method responses. It is of the format
|
||||
:return: The topic for publishing direct method responses. It is of the format
|
||||
"$iothub/methods/res/<status>/?$rid=<requestId>"
|
||||
"""
|
||||
return "$iothub/methods/res/{status}/?$rid={request_id}".format(
|
||||
|
@ -182,10 +182,10 @@ def extract_properties_from_message_topic(topic: str) -> Dict[str, str]:
|
|||
return _extract_properties(properties_string)
|
||||
|
||||
|
||||
def extract_name_from_method_request_topic(topic: str) -> str:
|
||||
def extract_name_from_direct_method_request_topic(topic: str) -> str:
|
||||
"""
|
||||
Extract the method name from the method topic.
|
||||
Topics for methods are of the following format:
|
||||
Extract the direct method name from the direct method topic.
|
||||
Topics for direct methods are of the following format:
|
||||
"$iothub/methods/POST/{method name}/?$rid={request id}"
|
||||
|
||||
:param str topic: The topic string
|
||||
|
@ -198,10 +198,10 @@ def extract_name_from_method_request_topic(topic: str) -> str:
|
|||
raise ValueError("topic has incorrect format")
|
||||
|
||||
|
||||
def extract_request_id_from_method_request_topic(topic: str) -> str:
|
||||
def extract_request_id_from_direct_method_request_topic(topic: str) -> str:
|
||||
"""
|
||||
Extract the Request ID (RID) from the method topic.
|
||||
Topics for methods are of the following format:
|
||||
Extract the Request ID (RID) from the direct method topic.
|
||||
Topics for direct methods are of the following format:
|
||||
"$iothub/methods/POST/{method name}/?$rid={request id}"
|
||||
|
||||
:param str topic: the topic string
|
||||
|
|
Загрузка…
Ссылка в новой задаче