From 8d9601dd2567430d58029eabfaac55dd08aa50aa Mon Sep 17 00:00:00 2001 From: Bert Kleewein Date: Wed, 6 Oct 2021 16:17:30 -0700 Subject: [PATCH] DeviceClient E2E tests (#861) * e2e tests inistial checkin * fix to run on py27 * Rename service helper objects * rename fixtures * spelling typo * client createion based on command line args * sas renewal tests improved and expanded * parametrize connect_disconnect tests * fix client_kwargs to use extra_client_args to override parametrized args * add logging * move send_message tests together --- .gitignore | 5 +- device_e2e/aio/conftest.py | 86 +++++ device_e2e/aio/test_c2d.py | 40 +++ device_e2e/aio/test_connect_disconnect.py | 59 ++++ device_e2e/aio/test_methods.py | 94 ++++++ device_e2e/aio/test_pnp_commands.py | 113 +++++++ device_e2e/aio/test_pnp_connect.py | 54 +++ device_e2e/aio/test_pnp_properties.py | 270 +++++++++++++++ device_e2e/aio/test_pnp_telemetry.py | 42 +++ device_e2e/aio/test_sas_renewal.py | 80 +++++ device_e2e/aio/test_send_message.py | 181 ++++++++++ device_e2e/aio/test_twin.py | 173 ++++++++++ device_e2e/client_fixtures.py | 81 +++++ device_e2e/conftest.py | 78 +++++ device_e2e/const.py | 15 + device_e2e/drop.py | 97 ++++++ device_e2e/drop_fixtures.py | 32 ++ device_e2e/e2e_settings.py | 51 +++ device_e2e/pnp_fixtures.py | 44 +++ device_e2e/pytest.ini | 11 + device_e2e/service_helper.py | 124 +++++++ device_e2e/service_helper_sync.py | 319 ++++++++++++++++++ device_e2e/sync/conftest.py | 73 ++++ device_e2e/sync/test_sync_c2d.py | 39 +++ .../sync/test_sync_connect_disconnect.py | 56 +++ device_e2e/sync/test_sync_methods.py | 94 ++++++ device_e2e/sync/test_sync_sas_renewal.py | 76 +++++ device_e2e/sync/test_sync_send_message.py | 167 +++++++++ device_e2e/sync/test_sync_twin.py | 171 ++++++++++ device_e2e/test_config.py | 42 +++ device_e2e/utils.py | 42 +++ 31 files changed, 2808 insertions(+), 1 deletion(-) create mode 100644 device_e2e/aio/conftest.py create mode 100644 device_e2e/aio/test_c2d.py create mode 100644 device_e2e/aio/test_connect_disconnect.py create mode 100644 device_e2e/aio/test_methods.py create mode 100644 device_e2e/aio/test_pnp_commands.py create mode 100644 device_e2e/aio/test_pnp_connect.py create mode 100644 device_e2e/aio/test_pnp_properties.py create mode 100644 device_e2e/aio/test_pnp_telemetry.py create mode 100644 device_e2e/aio/test_sas_renewal.py create mode 100644 device_e2e/aio/test_send_message.py create mode 100644 device_e2e/aio/test_twin.py create mode 100644 device_e2e/client_fixtures.py create mode 100644 device_e2e/conftest.py create mode 100644 device_e2e/const.py create mode 100644 device_e2e/drop.py create mode 100644 device_e2e/drop_fixtures.py create mode 100644 device_e2e/e2e_settings.py create mode 100644 device_e2e/pnp_fixtures.py create mode 100644 device_e2e/pytest.ini create mode 100644 device_e2e/service_helper.py create mode 100644 device_e2e/service_helper_sync.py create mode 100644 device_e2e/sync/conftest.py create mode 100644 device_e2e/sync/test_sync_c2d.py create mode 100644 device_e2e/sync/test_sync_connect_disconnect.py create mode 100644 device_e2e/sync/test_sync_methods.py create mode 100644 device_e2e/sync/test_sync_sas_renewal.py create mode 100644 device_e2e/sync/test_sync_send_message.py create mode 100644 device_e2e/sync/test_sync_twin.py create mode 100644 device_e2e/test_config.py create mode 100644 device_e2e/utils.py diff --git a/.gitignore b/.gitignore index 4b9d6fb05..64b1e9743 100644 --- a/.gitignore +++ b/.gitignore @@ -121,4 +121,7 @@ coverage/ # Certificates *.pem demoCA/ -*.rnd \ No newline at end of file +*.rnd + +# e2e +**/_e2e_settings.json diff --git a/device_e2e/aio/conftest.py b/device_e2e/aio/conftest.py new file mode 100644 index 000000000..952e142e6 --- /dev/null +++ b/device_e2e/aio/conftest.py @@ -0,0 +1,86 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import asyncio +import functools +import time +import e2e_settings +import test_config +import logging +from service_helper import ServiceHelper +from azure.iot.device.iothub.aio import IoTHubDeviceClient, IoTHubModuleClient + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + + +@pytest.fixture(scope="module") +def event_loop(): + loop = asyncio.get_event_loop() + yield loop + loop.close() + + +@pytest.fixture(scope="function") +async def brand_new_client(client_kwargs): + client = None + + if test_config.config.identity == test_config.IDENTITY_DEVICE_CLIENT: + ClientClass = IoTHubDeviceClient + elif test_config.config.identity == test_config.IDENTITY_MODULE_CLIENT: + ClientClass = IoTHubModuleClient + else: + raise Exception("config.identity invalid") + + if test_config.config.auth == test_config.AUTH_CONNECTION_STRING: + # TODO: This is currently using a connection string stored in _e2e_settings.xml. This will move to be a dynamically created identity similar to the way node's device_identity_helper.js works. + logger.info( + "Creating {} using create_from_connection_string with kwargs={}".format( + ClientClass, client_kwargs + ) + ) + client = ClientClass.create_from_connection_string( + e2e_settings.DEVICE_CONNECTION_STRING, **client_kwargs + ) + elif test_config.config.auth == test_config.X509: + # need to implement + raise Exception("X509 Auth not yet implemented") + else: + raise Exception("config.auth invalid") + + yield client + + await client.shutdown() + + +@pytest.fixture(scope="function") +async def client(brand_new_client): + client = brand_new_client + + await client.connect() + + yield client + + +@pytest.fixture(scope="module") +async def service_helper(event_loop, executor): + service_helper = ServiceHelper(event_loop, executor) + time.sleep(1) + yield service_helper + print("shutting down") + await service_helper.shutdown() + + +@pytest.fixture(scope="function") +def get_next_eventhub_arrival( + event_loop, executor, service_helper, device_id, module_id, watches_events # noqa: F811 +): + yield functools.partial(service_helper.get_next_eventhub_arrival, device_id, module_id) + + +@pytest.fixture(scope="function") +def get_next_reported_patch_arrival( + event_loop, executor, service_helper, device_id, module_id, watches_events # noqa: F811 +): + yield functools.partial(service_helper.get_next_reported_patch_arrival, device_id, module_id) diff --git a/device_e2e/aio/test_c2d.py b/device_e2e/aio/test_c2d.py new file mode 100644 index 000000000..f77c6afe5 --- /dev/null +++ b/device_e2e/aio/test_c2d.py @@ -0,0 +1,40 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import asyncio +import pytest +import logging +import json +from utils import get_random_dict + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + +pytestmark = pytest.mark.asyncio + +# TODO: add tests for various application properties +# TODO: is there a way to call send_c2d so it arrives as an object rather than a JSON string? + + +@pytest.mark.describe("Device Client C2d") +class TestSendMessage(object): + @pytest.mark.it("Can receive C2D") + async def test_send_message(self, client, service_helper, device_id, module_id, event_loop): + message = json.dumps(get_random_dict()) + + received_message = None + received = asyncio.Event() + + async def handle_on_message_received(message): + nonlocal received_message, received + logger.info("received {}".format(message)) + received_message = message + event_loop.call_soon_threadsafe(received.set) + + client.on_message_received = handle_on_message_received + + await service_helper.send_c2d(device_id, module_id, message, {}) + + await asyncio.wait_for(received.wait(), 10) + + assert received_message.data.decode("utf-8") == message diff --git a/device_e2e/aio/test_connect_disconnect.py b/device_e2e/aio/test_connect_disconnect.py new file mode 100644 index 000000000..3d7d9d7af --- /dev/null +++ b/device_e2e/aio/test_connect_disconnect.py @@ -0,0 +1,59 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import asyncio +import pytest +import logging +import test_config + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + +pytestmark = pytest.mark.asyncio + + +@pytest.mark.describe("Device Client") +class TestConnectDisconnect(object): + @pytest.mark.it("Can disconnect and reconnect") + @pytest.mark.parametrize(*test_config.connection_retry_disabled_and_enabled) + @pytest.mark.parametrize(*test_config.auto_connect_off_and_on) + async def test_connect_disconnect(self, brand_new_client): + client = brand_new_client + + assert client + await client.connect() + assert client.connected + + await client.disconnect() + assert not client.connected + + await client.connect() + assert client.connected + + +@pytest.mark.dropped_connection +@pytest.mark.describe("Device Client with dropped connection") +class TestConnectDisconnectDroppedConnection(object): + @pytest.fixture(scope="class") + def extra_client_kwargs(self): + return {"keep_alive": 5} + + @pytest.mark.it("disconnects when network drops all outgoing packets") + async def test_disconnect_on_drop_outgoing(self, client, dropper): + + await client.connect() + assert client.connected + dropper.drop_outgoing() + + while client.connected: + await asyncio.sleep(1) + + @pytest.mark.it("disconnects when network rejects all outgoing packets") + async def test_disconnect_on_reject_outgoing(self, client, dropper): + + await client.connect() + assert client.connected + dropper.reject_outgoing() + + while client.connected: + await asyncio.sleep(1) diff --git a/device_e2e/aio/test_methods.py b/device_e2e/aio/test_methods.py new file mode 100644 index 000000000..d82abfb95 --- /dev/null +++ b/device_e2e/aio/test_methods.py @@ -0,0 +1,94 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import logging +import asyncio +from utils import get_random_dict +from azure.iot.device.iothub import MethodResponse + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + +pytestmark = pytest.mark.asyncio + + +@pytest.fixture +def method_name(): + return "this_is_my_method_name" + + +@pytest.fixture +def method_response_status(): + return 299 + + +@pytest.mark.describe("Device Client methods") +class TestMethods(object): + @pytest.mark.it("Can handle a simple direct method call") + @pytest.mark.parametrize( + "include_request_payload", + [ + pytest.param(True, id="with request payload"), + pytest.param(False, id="without request payload"), + ], + ) + @pytest.mark.parametrize( + "include_response_payload", + [ + pytest.param(True, id="with response payload"), + pytest.param(False, id="without response payload"), + ], + ) + async def test_handle_method_call( + self, + client, + method_name, + device_id, + module_id, + method_response_status, + include_request_payload, + include_response_payload, + service_helper, + ): + actual_request = None + + if include_request_payload: + request_payload = get_random_dict() + else: + request_payload = None + + if include_response_payload: + response_payload = get_random_dict() + else: + response_payload = None + + async def handle_on_method_request_received(request): + nonlocal actual_request + logger.info("Method request for {} received".format(request.name)) + actual_request = request + logger.info("Sending response") + await client.send_method_response( + MethodResponse.create_from_method_request( + request, method_response_status, response_payload + ) + ) + + client.on_method_request_received = handle_on_method_request_received + await asyncio.sleep(1) # wait for subscribe, etc, to complete + + # invoke the method call + method_response = await service_helper.invoke_method( + device_id, module_id, method_name, request_payload + ) + + # verify that the method request arrived correctly + assert actual_request.name == method_name + if request_payload: + assert actual_request.payload == request_payload + else: + assert not actual_request.payload + + # and make sure the response came back successfully + assert method_response.status == method_response_status + assert method_response.payload == response_payload diff --git a/device_e2e/aio/test_pnp_commands.py b/device_e2e/aio/test_pnp_commands.py new file mode 100644 index 000000000..f60fa260d --- /dev/null +++ b/device_e2e/aio/test_pnp_commands.py @@ -0,0 +1,113 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import logging +import asyncio +from utils import get_random_dict +import azure.iot.device.iothub + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + +pytestmark = pytest.mark.asyncio +try: + CommandResponse = azure.iot.device.iothub.CommandResponse +except AttributeError: + # only run if PNP enabled + pytestmark = pytest.mark.skip + + +@pytest.fixture(scope="class") +def extra_client_kwargs(pnp_model_id): + return {"model_id": pnp_model_id} + + +@pytest.mark.pnp +@pytest.mark.describe("Pnp Commands") +class TestPnpCommands(object): + @pytest.mark.it("Can handle a simple command") + @pytest.mark.parametrize( + "include_request_payload", + [ + pytest.param(True, id="with request payload"), + pytest.param(False, id="without request payload"), + ], + ) + @pytest.mark.parametrize( + "include_response_payload", + [ + pytest.param(True, id="with response payload"), + pytest.param(False, id="without response payload"), + ], + ) + @pytest.mark.parametrize( + "include_component_name", + [ + pytest.param(True, id="with component name"), + pytest.param(False, id="without component name"), + ], + ) + async def test_handle_pnp_commmand( + self, + client, + event_loop, + pnp_command_name, + pnp_component_name, + pnp_command_response_status, + include_component_name, + include_request_payload, + include_response_payload, + service_helper, + device_id, + module_id, + ): + actual_request = None + + if include_request_payload: + request_payload = get_random_dict() + else: + request_payload = "" + + if include_response_payload: + response_payload = get_random_dict() + else: + response_payload = None + + async def handle_on_command_request_received(request): + nonlocal actual_request + logger.info( + "command request for component {}, command {} received".format( + request.component_name, request.command_name + ) + ) + actual_request = request + logger.info("Sending response") + await client.send_command_response( + CommandResponse.create_from_command_request( + request, pnp_command_response_status, response_payload + ) + ) + + client.on_command_request_received = handle_on_command_request_received + await asyncio.sleep(1) # wait for subscribe, etc, to complete + + # invoke the command + command_response = await service_helper.invoke_pnp_command( + device_id, module_id, pnp_component_name, pnp_command_name, request_payload + ) + + # verify that the method request arrived correctly + assert actual_request.command_name == pnp_command_name + assert actual_request.component_name == pnp_component_name + + if request_payload: + assert actual_request.payload == request_payload + else: + assert not actual_request.payload + + # and make sure the response came back successfully + # Currently no way to check the command response status code because the DigitalTwinClient A + # object in the service SDK does not return this to the caller. + # assert command_response[Fields.COMMAND_RESPONSE_STATUS_CODE] == command_response_status + assert command_response == response_payload diff --git a/device_e2e/aio/test_pnp_connect.py b/device_e2e/aio/test_pnp_connect.py new file mode 100644 index 000000000..01b8917c2 --- /dev/null +++ b/device_e2e/aio/test_pnp_connect.py @@ -0,0 +1,54 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import logging +import azure.iot.device.iothub + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + +pytestmark = pytest.mark.asyncio +if not getattr(azure.iot.device.iothub, "CommandRequest", None): + # only run if PNP enabled + pytestmark = pytest.mark.skip + + +@pytest.fixture(scope="class") +def pnp_model_id(): + return "dtmi:com:example:TemperatureController;2" + + +@pytest.fixture(scope="class") +def extra_client_kwargs(pnp_model_id): + return {"model_id": pnp_model_id} + + +@pytest.mark.pnp +@pytest.mark.describe("Device Client PNP Connection") +class TestPnpConnect(object): + @pytest.mark.it("Can connect and disconnect with model_id set") + async def test_connect(self, client, pnp_model_id): + assert client._mqtt_pipeline.pipeline_configuration.model_id == pnp_model_id + + assert client + await client.connect() + assert client.connected + + await client.disconnect() + assert not client.connected + + await client.connect() + assert client.connected + + @pytest.mark.it("Shows up as a PNP device in the service client") + async def test_model_id_in_service_helper( + self, client, pnp_model_id, device_id, module_id, service_helper + ): + + assert client._mqtt_pipeline.pipeline_configuration.model_id == pnp_model_id + assert client.connected + + props = await service_helper.get_pnp_properties(device_id, module_id) + + assert props["$metadata"]["$model"] == pnp_model_id diff --git a/device_e2e/aio/test_pnp_properties.py b/device_e2e/aio/test_pnp_properties.py new file mode 100644 index 000000000..bf7f23b53 --- /dev/null +++ b/device_e2e/aio/test_pnp_properties.py @@ -0,0 +1,270 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import logging +import asyncio +import pprint +import azure.iot.device.iothub +from utils import get_random_dict, make_pnp_desired_property_patch + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + +pytestmark = pytest.mark.asyncio +try: + ClientPropertyCollection = azure.iot.device.iothub.ClientPropertyCollection + generate_writable_property_response = ( + azure.iot.device.iothub.generate_writable_property_response + ) +except AttributeError: + # only run if PNP enabled + pytestmark = pytest.mark.skip + + +@pytest.fixture(scope="class") +def extra_client_kwargs(pnp_model_id): + return {"model_id": pnp_model_id} + + +@pytest.mark.pnp +@pytest.mark.parametrize( + "is_component_property", + [pytest.param(True, id="component property"), pytest.param(False, id="root property")], +) +@pytest.mark.describe("Device Client PNP properties") +class TestPnpSetProperties(object): + @pytest.mark.it( + "Can set a reported property value and retrieve it via the service get_digital_twin function" + ) + async def test_set_reported_property( + self, + client, + pnp_read_only_property_name, + pnp_component_name, + is_component_property, + service_helper, + device_id, + module_id, + ): + random_property_value = get_random_dict() + assert client.connected + + patch = ClientPropertyCollection() + if is_component_property: + patch.set_component_property( + pnp_component_name, pnp_read_only_property_name, random_property_value + ) + else: + patch.set_property(pnp_read_only_property_name, random_property_value) + + logger.info("Setting {} to {}".format(pnp_read_only_property_name, random_property_value)) + await client.update_client_properties(patch) + + while True: + properties = await service_helper.get_pnp_properties(device_id, module_id) + + if is_component_property: + actual_value = properties.get(pnp_component_name, {}).get( + pnp_read_only_property_name, None + ) + else: + actual_value = properties.get(pnp_read_only_property_name, None) + + if actual_value == random_property_value: + return + + else: + logger.warning( + "property not matched yet. Expected = {}, actual = {}".format( + random_property_value, actual_value + ) + ) + + logger.warning( + "digital_twin_client.get_digital_twin returned {}".format( + pprint.pformat(properties) + ) + ) + + await asyncio.sleep(5) + + @pytest.mark.it("Can retrieve a reported property via the get_client_properties function") + async def test_get_reported_property( + self, + client, + pnp_read_only_property_name, + pnp_component_name, + is_component_property, + ): + random_property_value = get_random_dict() + assert client.connected + + patch = ClientPropertyCollection() + if is_component_property: + patch.set_component_property( + pnp_component_name, pnp_read_only_property_name, random_property_value + ) + else: + patch.set_property(pnp_read_only_property_name, random_property_value) + + logger.info("Setting {} to {}".format(pnp_read_only_property_name, random_property_value)) + await client.update_client_properties(patch) + + properties = await client.get_client_properties() + + if is_component_property: + assert ( + properties.reported_from_device.get_component_property( + pnp_component_name, pnp_read_only_property_name + ) + == random_property_value + ) + + assert properties.reported_from_device.backing_object[pnp_component_name]["__t"] == "c" + else: + assert ( + properties.reported_from_device.get_property(pnp_read_only_property_name) + == random_property_value + ) + + @pytest.mark.it("Can retrieve a desired property via the get_client_properties function") + async def test_desired_properties_via_get_client_properties( + self, + event_loop, + client, + pnp_component_name, + pnp_writable_property_name, + is_component_property, + service_helper, + device_id, + module_id, + ): + random_property_value = get_random_dict() + received = asyncio.Event() + + async def handle_on_patch_received(patch): + nonlocal received + logger.info("received {}".format(patch)) + event_loop.call_soon_threadsafe(received.set) + + client.on_writable_property_update_request_received = handle_on_patch_received + await asyncio.sleep(1) + + props = make_pnp_desired_property_patch( + pnp_component_name if is_component_property else None, + pnp_writable_property_name, + random_property_value, + ) + await service_helper.update_pnp_properties(device_id, module_id, props) + + # wait for the desired property patch to arrive at the client + # We don't actually check the contents of the patch, but the + # fact that it arrived means the device registry should have + # finished ingesting the patch + await asyncio.wait_for(received.wait(), 10) + logger.info("got it") + + properties = await client.get_client_properties() + if is_component_property: + assert ( + properties.writable_properties_requests.get_component_property( + pnp_component_name, pnp_writable_property_name + ) + == random_property_value + ) + assert ( + properties.writable_properties_requests.backing_object[pnp_component_name]["__t"] + == "c" + ) + else: + assert ( + properties.writable_properties_requests.get_property(pnp_writable_property_name) + == random_property_value + ) + + @pytest.mark.it( + "can receive a desired property patch and corectly respond with a writable_property_response" + ) + async def test_receive_desired_property_patch( + self, + event_loop, + client, + pnp_component_name, + pnp_writable_property_name, + is_component_property, + pnp_ack_code, + pnp_ack_description, + service_helper, + device_id, + module_id, + ): + random_property_value = get_random_dict() + received_patch = None + received = asyncio.Event() + + async def handle_on_patch_received(patch): + nonlocal received_patch, received + logger.info("received {}".format(patch)) + received_patch = patch + event_loop.call_soon_threadsafe(received.set) + + client.on_writable_property_update_request_received = handle_on_patch_received + await asyncio.sleep(1) + + # patch desired properites + props = make_pnp_desired_property_patch( + pnp_component_name if is_component_property else None, + pnp_writable_property_name, + random_property_value, + ) + await service_helper.update_pnp_properties(device_id, module_id, props) + logger.info("patch sent. Waiting for desired proprety") + + # wait for the desired property patch to arrive at the client + await asyncio.wait_for(received.wait(), 10) + logger.info("got it") + + # validate the patch + if is_component_property: + assert ( + received_patch.get_component_property( + pnp_component_name, pnp_writable_property_name + ) + == random_property_value + ) + assert received_patch.backing_object[pnp_component_name]["__t"] == "c" + else: + assert received_patch.get_property(pnp_writable_property_name) == random_property_value + + # make a reported property patch to respond + update_patch = ClientPropertyCollection() + property_response = generate_writable_property_response( + random_property_value, pnp_ack_code, pnp_ack_description, received_patch.version + ) + + if is_component_property: + update_patch.set_component_property( + pnp_component_name, pnp_writable_property_name, property_response + ) + else: + update_patch.set_property(pnp_writable_property_name, property_response) + + # send the reported property patch + await client.update_client_properties(update_patch) + + # verify that the reported value via digital_twin_client.get_digital_twin() + props = await service_helper.get_pnp_properties(device_id, module_id) + if is_component_property: + props = props[pnp_component_name] + + assert props[pnp_writable_property_name] == random_property_value + metadata = props["$metadata"][pnp_writable_property_name] + assert metadata["ackCode"] == pnp_ack_code + assert metadata["ackDescription"] == pnp_ack_description + assert metadata["ackVersion"] == received_patch.version + assert metadata["desiredVersion"] == received_patch.version + assert metadata["desiredValue"] == random_property_value + + +# TODO: etag tests, version tests diff --git a/device_e2e/aio/test_pnp_telemetry.py b/device_e2e/aio/test_pnp_telemetry.py new file mode 100644 index 000000000..7af221500 --- /dev/null +++ b/device_e2e/aio/test_pnp_telemetry.py @@ -0,0 +1,42 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import logging +import pprint +import const +from utils import get_random_dict +import azure.iot.device.iothub + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + +pytestmark = pytest.mark.asyncio +if not getattr(azure.iot.device.iothub, "CommandRequest", None): + # only run if PNP enabled + pytestmark = pytest.mark.skip + + +@pytest.fixture(scope="class") +def extra_client_kwargs(pnp_model_id): + return {"model_id": pnp_model_id} + + +@pytest.mark.pnp +@pytest.mark.describe("Pnp Telemetry") +class TestPnpTelemetry(object): + @pytest.mark.it("Can send a telemetry message") + async def test_send_pnp_telemetry(self, client, pnp_model_id, get_next_eventhub_arrival): + telemetry = get_random_dict() + + await client.send_telemetry(telemetry) + event = await get_next_eventhub_arrival() + + logger.info(pprint.pformat(event)) + + assert event.message_body == telemetry + + system_props = event.system_properties + assert system_props[const.EVENTHUB_SYSPROP_DT_DATASCHEMA] == pnp_model_id + assert system_props[const.EVENTHUB_SYSPROP_CONTENT_TYPE] == const.JSON_CONTENT_TYPE + assert system_props[const.EVENTHUB_SYSPROP_CONTENT_ENCODING] == const.JSON_CONTENT_ENCODING diff --git a/device_e2e/aio/test_sas_renewal.py b/device_e2e/aio/test_sas_renewal.py new file mode 100644 index 000000000..b84d455da --- /dev/null +++ b/device_e2e/aio/test_sas_renewal.py @@ -0,0 +1,80 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import asyncio +import pytest +import json +import logging +import test_config + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + +pytestmark = pytest.mark.asyncio + + +@pytest.mark.describe("Client sas renewal code") +class TestSasRenewalReconnectEnabled(object): + @pytest.fixture(scope="class") + def extra_client_kwargs(self): + # should renew after 10 seconds + return {"sastoken_ttl": 130} + + @pytest.mark.it("Renews and reconnects before expiry") + @pytest.mark.parametrize(*test_config.connection_retry_disabled_and_enabled) + @pytest.mark.parametrize(*test_config.auto_connect_off_and_on) + async def test_sas_renews(self, client, event_loop, get_next_eventhub_arrival, random_message): + + connected_event = asyncio.Event() + disconnected_event = asyncio.Event() + token_at_connect_time = None + + logger.info("connected and ready") + + token_object = client._mqtt_pipeline._pipeline.pipeline_configuration.sastoken + + async def handle_on_connection_state_change(): + nonlocal token_at_connect_time + logger.info("handle_on_connection_state_change: {}".format(client.connected)) + if client.connected: + token_at_connect_time = str(token_object) + logger.info("saving token: {}".format(token_at_connect_time)) + + event_loop.call_soon_threadsafe(connected_event.set) + else: + event_loop.call_soon_threadsafe(disconnected_event.set) + + client.on_connection_state_change = handle_on_connection_state_change + + # setting on_connection_state_change seems to have the side effect of + # calling handle_on_connection_state_change once with the initial value. + # Wait for one disconnect/reconnect cycle so we can get past it. + await connected_event.wait() + + # OK, we're ready to test. wait for the renewal + token_before_connect = str(token_object) + + disconnected_event.clear() + connected_event.clear() + + logger.info("Waiting for client to disconnect") + await disconnected_event.wait() + logger.info("Waiting for client to reconnect") + await connected_event.wait() + logger.info("Client reconnected") + + # Finally verify that our token changed. + logger.info("token now = {}".format(str(token_object))) + logger.info("token at_connect = {}".format(str(token_at_connect_time))) + logger.info("token before_connect = {}".format(str(token_before_connect))) + + assert str(token_object) == token_at_connect_time + assert not token_before_connect == token_at_connect_time + + # and verify that we can send + await client.send_message(random_message) + + # and verify that the message arrived at the service + # TODO incoming_event_queue.get should check thread future + event = await get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data diff --git a/device_e2e/aio/test_send_message.py b/device_e2e/aio/test_send_message.py new file mode 100644 index 000000000..ab945cfcf --- /dev/null +++ b/device_e2e/aio/test_send_message.py @@ -0,0 +1,181 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import asyncio +import pytest +import logging +import json +from azure.iot.device.exceptions import OperationCancelled + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + +pytestmark = pytest.mark.asyncio + + +@pytest.mark.describe("Device Client send_message method") +class TestSendMessage(object): + @pytest.mark.it("Can send a simple message") + async def test_send_message(self, client, random_message, get_next_eventhub_arrival): + + await client.send_message(random_message) + + event = await get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data + + @pytest.mark.it("Connects the transport if necessary") + async def test_connect_if_necessary(self, client, random_message, get_next_eventhub_arrival): + + await client.disconnect() + assert not client.connected + + await client.send_message(random_message) + assert client.connected + + event = await get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data + + +@pytest.mark.dropped_connection +@pytest.mark.describe("Device Client send_message method with dropped connections") +class TestSendMessageDroppedConnection(object): + @pytest.fixture(scope="class") + def extra_client_kwargs(self): + return {"keep_alive": 5} + + @pytest.mark.it("Sends if connection drops before sending") + async def test_sends_if_drop_before_sending( + self, client, random_message, dropper, get_next_eventhub_arrival + ): + + assert client.connected + + dropper.drop_outgoing() + send_task = asyncio.create_task(client.send_message(random_message)) + + while client.connected: + await asyncio.sleep(1) + + assert not send_task.done() + + dropper.restore_all() + while not client.connected: + await asyncio.sleep(1) + + await send_task + + event = await get_next_eventhub_arrival() + + logger.info("sent from device= {}".format(random_message.data)) + logger.info("received at eventhub = {}".format(event.message_body)) + + assert json.dumps(event.message_body) == random_message.data + + logger.info("Success") + + @pytest.mark.it("Sends if connection rejects send") + async def test_sends_if_reject_before_sending( + self, client, random_message, dropper, get_next_eventhub_arrival + ): + + assert client.connected + + dropper.reject_outgoing() + send_task = asyncio.create_task(client.send_message(random_message)) + + while client.connected: + await asyncio.sleep(1) + + assert not send_task.done() + + dropper.restore_all() + while not client.connected: + await asyncio.sleep(1) + + await send_task + + event = await get_next_eventhub_arrival() + + logger.info("sent from device= {}".format(random_message.data)) + logger.info("received at eventhub = {}".format(event.message_body)) + + assert json.dumps(event.message_body) == random_message.data + + logger.info("Success") + + +@pytest.mark.describe("Device Client send_message with reconnect disabled") +class TestSendMessageRetryDisabled(object): + @pytest.fixture(scope="class") + def extra_client_kwargs(self): + return {"keep_alive": 5, "connection_retry": False} + + @pytest.fixture(scope="function", autouse=True) + async def reconnect_after_test(self, dropper, client): + yield + dropper.restore_all() + await client.connect() + assert client.connected + + @pytest.mark.it("Can send a simple message") + async def test_send_message(self, client, random_message, get_next_eventhub_arrival): + await client.send_message(random_message) + + event = await get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data + + @pytest.mark.it("Automatically connects if transport manually disconnected before sending") + async def test_connect_if_necessary(self, client, random_message, get_next_eventhub_arrival): + + await client.disconnect() + assert not client.connected + + await client.send_message(random_message) + assert client.connected + + event = await get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data + + @pytest.mark.it("Automatically connects if transport automatically disconnected before sending") + async def test_connects_after_automatic_disconnect( + self, client, random_message, dropper, get_next_eventhub_arrival + ): + + assert client.connected + + dropper.drop_outgoing() + while client.connected: + await asyncio.sleep(1) + + assert not client.connected + dropper.restore_all() + await client.send_message(random_message) + assert client.connected + + event = await get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data + + @pytest.mark.it("Fails if connection disconnects before sending") + async def test_fails_if_disconnect_before_sending(self, client, random_message, dropper): + + assert client.connected + + dropper.drop_outgoing() + send_task = asyncio.create_task(client.send_message(random_message)) + + while client.connected: + await asyncio.sleep(1) + + with pytest.raises(OperationCancelled): + await send_task + + @pytest.mark.it("Fails if connection drops before sending") + async def test_fails_if_drop_before_sending(self, client, random_message, dropper): + + assert client.connected + + dropper.drop_outgoing() + with pytest.raises(OperationCancelled): + await client.send_message(random_message) + + assert not client.connected diff --git a/device_e2e/aio/test_twin.py b/device_e2e/aio/test_twin.py new file mode 100644 index 000000000..cbcb0672c --- /dev/null +++ b/device_e2e/aio/test_twin.py @@ -0,0 +1,173 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import asyncio +import pytest +import logging +import const +from utils import get_random_dict + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + +pytestmark = pytest.mark.asyncio + + +# TODO: tests with drop_incoming and reject_incoming + +reset_reported_props = {const.TEST_CONTENT: None} + + +@pytest.mark.describe("Device Client Reported Properties") +class TestReportedProperties(object): + @pytest.mark.it("Can set a simple reported property") + async def test_simple_patch(self, client, reported_props, get_next_reported_patch_arrival): + + # patch properties + await client.patch_twin_reported_properties(reported_props) + + # wait for patch to arrive at service and verify + received_patch = await get_next_reported_patch_arrival() + assert ( + received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + ) + + # get twin from the service and verify content + twin = await client.get_twin() + assert twin[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + + @pytest.mark.it("Can clear a reported property") + async def test_clear_property(self, client, reported_props, get_next_reported_patch_arrival): + + # patch properties and verify that the service received the patch + await client.patch_twin_reported_properties(reported_props) + received_patch = await get_next_reported_patch_arrival() + assert ( + received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + ) + + # send a patch clearing properties and verify that the service received that patch + await client.patch_twin_reported_properties(reset_reported_props) + received_patch = await get_next_reported_patch_arrival() + assert ( + received_patch[const.REPORTED][const.TEST_CONTENT] + == reset_reported_props[const.TEST_CONTENT] + ) + + # get the twin and verify that the properties are no longer part of the twin + twin = await client.get_twin() + assert const.TEST_CONTENT not in twin[const.REPORTED] + + @pytest.mark.it("Connects the transport if necessary") + async def test_connect_if_necessary( + self, client, reported_props, get_next_reported_patch_arrival + ): + + await client.disconnect() + + assert not client.connected + await client.patch_twin_reported_properties(reported_props) + assert client.connected + + received_patch = await get_next_reported_patch_arrival() + assert ( + received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + ) + + twin = await client.get_twin() + assert twin[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + + +@pytest.mark.dropped_connection +@pytest.mark.describe("Device Client Reported Properties with dropped connection") +class TestReportedPropertiesDroppedConnection(object): + @pytest.fixture(scope="class") + def extra_client_kwargs(self): + return {"keep_alive": 5} + + # TODO: split drop tests between first and second patches + + @pytest.mark.it("Sends if connection drops before sending") + async def test_sends_if_drop_before_sending( + self, client, reported_props, dropper, get_next_reported_patch_arrival + ): + + assert client.connected + dropper.drop_outgoing() + + send_task = asyncio.create_task(client.patch_twin_reported_properties(reported_props)) + while client.connected: + await asyncio.sleep(1) + + assert not send_task.done() + + dropper.restore_all() + while not client.connected: + await asyncio.sleep(1) + + await send_task + + received_patch = await get_next_reported_patch_arrival() + assert ( + received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + ) + + @pytest.mark.it("Sends if connection rejects send") + async def test_sends_if_reject_before_sending( + self, client, reported_props, dropper, get_next_reported_patch_arrival + ): + + assert client.connected + dropper.reject_outgoing() + + send_task = asyncio.create_task(client.patch_twin_reported_properties(reported_props)) + while client.connected: + await asyncio.sleep(1) + + assert not send_task.done() + + dropper.restore_all() + while not client.connected: + await asyncio.sleep(1) + + await send_task + + received_patch = await get_next_reported_patch_arrival() + assert ( + received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + ) + + +@pytest.mark.describe("Device Client Desired Properties") +class TestDesiredProperties(object): + @pytest.mark.it("Receives a patch for a simple desired property") + async def test_simple_patch(self, client, event_loop, service_helper, device_id, module_id): + + received_patch = None + received = asyncio.Event() + + async def handle_on_patch_received(patch): + nonlocal received_patch, received + print("received {}".format(patch)) + received_patch = patch + event_loop.call_soon_threadsafe(received.set) + + client.on_twin_desired_properties_patch_received = handle_on_patch_received + + random_dict = get_random_dict() + await service_helper.set_desired_properties( + device_id, + module_id, + {const.TEST_CONTENT: random_dict}, + ) + + await asyncio.wait_for(received.wait(), 10) + logger.info("got it") + + assert received_patch[const.TEST_CONTENT] == random_dict + + twin = await client.get_twin() + assert twin[const.DESIRED][const.TEST_CONTENT] == random_dict + + +# TODO: etag tests, version tests diff --git a/device_e2e/client_fixtures.py b/device_e2e/client_fixtures.py new file mode 100644 index 000000000..86ca32eb7 --- /dev/null +++ b/device_e2e/client_fixtures.py @@ -0,0 +1,81 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import copy +import collections +import json +import functools +import time +import sys +import const +import test_config +from azure.iot.device.iothub import Message +from utils import get_random_message, get_random_dict +from azure.iot.device.iothub import IoTHubDeviceClient + + +@pytest.fixture(scope="function") +def device_id(brand_new_client): + # TODO: suggest adding device_id and module_id to client object + return brand_new_client._mqtt_pipeline._pipeline.pipeline_configuration.device_id + + +@pytest.fixture(scope="function") +def module_id(brand_new_client): + return brand_new_client._mqtt_pipeline._pipeline.pipeline_configuration.module_id + + +@pytest.fixture(scope="function") +def reported_props(): + return {const.TEST_CONTENT: get_random_dict()} + + +@pytest.fixture(scope="function") +def watches_events(service_helper, device_id, module_id): + service_helper.start_watching(device_id, module_id) + yield + service_helper.stop_watching(device_id, module_id) + + +@pytest.fixture(scope="function") +def random_message(): + return get_random_message() + + +@pytest.fixture(scope="function") +def connection_retry(): + return True + + +@pytest.fixture(scope="function") +def auto_connect(): + return True + + +@pytest.fixture(scope="function") +def websockets(): + return test_config.config.transport == test_config.TRANSPORT_MQTT_WS + + +@pytest.fixture(scope="function") +def extra_client_kwargs(): + return {} + + +@pytest.fixture(scope="function") +def client_kwargs(extra_client_kwargs, auto_connect, connection_retry, websockets): + kwargs = {} + kwargs["auto_connect"] = auto_connect + kwargs["connection_retry"] = connection_retry + kwargs["websockets"] = websockets + for key, value in extra_client_kwargs.items(): + kwargs[key] = value + return kwargs + + +collect_ignore = [] + +# Ignore Async tests if below Python 3.5 +if sys.version_info < (3, 5): + collect_ignore.append("aio") diff --git a/device_e2e/conftest.py b/device_e2e/conftest.py new file mode 100644 index 000000000..74f185b2b --- /dev/null +++ b/device_e2e/conftest.py @@ -0,0 +1,78 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import logging +import concurrent.futures +import test_config + +# noqa: F401 defined in .flake8 file in root of repo + +from drop_fixtures import dropper +from pnp_fixtures import ( + pnp_model_id, + pnp_command_name, + pnp_component_name, + pnp_command_response_status, + pnp_writable_property_name, + pnp_read_only_property_name, + pnp_ack_code, + pnp_ack_description, +) +from client_fixtures import ( + client_kwargs, + extra_client_kwargs, + auto_connect, + connection_retry, + websockets, + device_id, + module_id, + reported_props, + watches_events, + random_message, +) + +logging.basicConfig(level=logging.WARNING) +logging.getLogger("e2e").setLevel(level=logging.DEBUG) +logging.getLogger("paho").setLevel(level=logging.DEBUG) +logging.getLogger("azure.iot").setLevel(level=logging.DEBUG) + + +@pytest.fixture(scope="module") +def transport(): + return test_config.config.transport + + +@pytest.fixture(scope="module") +def executor(): + return concurrent.futures.ThreadPoolExecutor() + + +def pytest_addoption(parser): + parser.addoption( + "--transport", + help="Transport to use for tests", + type=str, + choices=test_config.TRANSPORT_CHOICES, + default=test_config.TRANSPORT_MQTT, + ) + parser.addoption( + "--auth", + help="Auth to use for tests", + type=str, + choices=test_config.AUTH_CHOICES, + default=test_config.AUTH_CONNECTION_STRING, + ) + parser.addoption( + "--identity", + help="Identity (client type) to use for tests", + type=str, + choices=test_config.IDENTITY_CHOICES, + default=test_config.IDENTITY_DEVICE_CLIENT, + ) + + +def pytest_configure(config): + test_config.config.transport = config.getoption("transport") + test_config.config.auth = config.getoption("auth") + test_config.config.identity = config.getoption("identity") diff --git a/device_e2e/const.py b/device_e2e/const.py new file mode 100644 index 000000000..cd89355be --- /dev/null +++ b/device_e2e/const.py @@ -0,0 +1,15 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. + +JSON_CONTENT_TYPE = "application/json" +JSON_CONTENT_ENCODING = "utf-8" + +TEST_CONTENT = "testContent" +REPORTED = "reported" +DESIRED = "desired" + +# properties of the eventhub message +EVENTHUB_SYSPROP_CONTENT_TYPE = "content-type" +EVENTHUB_SYSPROP_CONTENT_ENCODING = "content-encoding" +EVENTHUB_SYSPROP_DT_DATASCHEMA = "dt-dataschema" diff --git a/device_e2e/drop.py b/device_e2e/drop.py new file mode 100644 index 000000000..ba1f173c6 --- /dev/null +++ b/device_e2e/drop.py @@ -0,0 +1,97 @@ +# Copyright (c) Microsoft. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for +# full license information. +import logging +import subprocess + +logger = logging.getLogger("e2e.{}".format(__name__)) + +mqtt_port = 8883 +mqttws_port = 443 +uninitialized = "uninitialized" +sudo_prefix = uninitialized +all_disconnect_types = ["DROP", "REJECT"] +all_transports = ["mqtt", "mqttws"] + + +def get_sudo_prefix(): + """ + Get the prefix for running sudo commands. If the sudo binary doens't exist, then + we assume that we're running in a container or somewhere else where we don't + need to use sudo to elevate our process. + """ + global sudo_prefix + + # use "uninitialized" to mean uninitialized, because None and [] are both falsy and we want to set it to [], so we can't use None + if sudo_prefix == uninitialized: + try: + run_shell_command("which sudo") + except subprocess.CalledProcessError: + sudo_prefix = "" + else: + sudo_prefix = "sudo -n " + + return sudo_prefix + + +def run_shell_command(cmd): + """ + Run a shell command and raise an exception on error + """ + logger.info("running [{}]".format(cmd)) + try: + return subprocess.check_output(cmd.split(" ")).decode("utf-8").splitlines() + except subprocess.CalledProcessError as e: + logger.error("Error spawning {}".format(e.cmd)) + logger.error("Process returned {}".format(e.returncode)) + logger.error("process output: {}".format(e.output)) + raise + + +def transport_to_port(transport): + """ + Given a transport, return the port that the transport uses. + """ + if transport == "mqtt": + return mqtt_port + elif transport == "mqttws": + return mqttws_port + else: + raise ValueError( + "transport_type {} invalid. Only mqtt and mqttws are accepted".format(transport) + ) + + +def disconnect_port(disconnect_type, transport): + """ + Disconnect the port for a given transport. disconnect_type can either be "DROP" to drop + packets sent to that port, or it can be "REJECT" to reject packets sent to that port. + """ + # sudo -n iptables -A OUTPUT -p tcp --dport 8883 -j DROP + port = transport_to_port(transport) + run_shell_command( + "{}iptables -A OUTPUT -p tcp --dport {} -j {}".format( + get_sudo_prefix(), port, disconnect_type + ) + ) + + +def reconnect_all(transport): + """ + Reconnect all disconnects for all ports used by all transports. Effectively, clean up + anyting that this module may have done. + """ + port = transport_to_port(transport) + for disconnect_type in all_disconnect_types: + # sudo -n iptables -L OUTPUT -n -v --line-numbers + lines = run_shell_command( + "{}iptables -L OUTPUT -n -v --line-numbers".format(get_sudo_prefix()) + ) + # do the lines in reverse because deleting an entry changes the line numbers of all entries after that. + lines.reverse() + for line in lines: + if disconnect_type in line and str(port) in line: + line_number = line.split(" ")[0] + logger.info("Removing {} from [{}]".format(line_number, line)) + # sudo -n iptables -D OUTPUT 1 + run_shell_command("{}iptables -D OUTPUT {}".format(get_sudo_prefix(), line_number)) diff --git a/device_e2e/drop_fixtures.py b/device_e2e/drop_fixtures.py new file mode 100644 index 000000000..2a37397cc --- /dev/null +++ b/device_e2e/drop_fixtures.py @@ -0,0 +1,32 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import drop +import logging + +logger = logging.getLogger(__name__) + +drop.reconnect_all("mqtt") + + +class Dropper(object): + def __init__(self, transport): + self.transport = transport + + def drop_outgoing(self): + drop.disconnect_port("DROP", self.transport) + + def reject_outgoing(self): + drop.disconnect_port("REJECT", self.transport) + + def restore_all(self): + drop.reconnect_all(self.transport) + + +@pytest.fixture(scope="function") +def dropper(transport): + dropper = Dropper(transport) + yield dropper + logger.info("restoring all") + dropper.restore_all() diff --git a/device_e2e/e2e_settings.py b/device_e2e/e2e_settings.py new file mode 100644 index 000000000..419a20ca1 --- /dev/null +++ b/device_e2e/e2e_settings.py @@ -0,0 +1,51 @@ +# Copyright (c) Microsoft. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for +# full license information. +import six +import os +import json + +if six.PY2: + FileNotFoundError = IOError + +secrets = None + +this_file_path = os.path.dirname(os.path.realpath(__file__)) +test_path = this_file_path +while secrets is None: + filename = os.path.join(test_path, "_e2e_settings.json") + try: + with open(filename, "r") as f: + secrets = json.load(f) + print("settings loaded from {}".format(filename)) + except FileNotFoundError: + new_test_path = os.path.dirname(test_path) + if new_test_path == test_path: + raise Exception("_e2e_settings.json not found in {} or parent".format(this_file_path)) + test_path = new_test_path + +# Device ID used when running tests +DEVICE_ID = secrets.get("deviceId", None) + +# Connection string for the iothub instance +IOTHUB_CONNECTION_STRING = secrets.get("iothubConnectionString", None) + +# Connection string for the eventhub instance +EVENTHUB_CONNECTION_STRING = secrets.get("eventhubConnectionString", None) + +# Consumer group used when monitoring eventhub events +EVENTHUB_CONSUMER_GROUP = secrets.get("eventhubConsumerGroup", None) + +# Name of iothub. Probably DNS name for the hub without the azure-devices.net suffix +IOTHUB_NAME = secrets.get("iothubName", None) + +# Connection string for device under test +DEVICE_CONNECTION_STRING = secrets.get("deviceConnectionString", None) + +# Set default values +if not EVENTHUB_CONSUMER_GROUP: + EVENTHUB_CONSUMER_GROUP = "$default" + +del secrets +del this_file_path +del test_path diff --git a/device_e2e/pnp_fixtures.py b/device_e2e/pnp_fixtures.py new file mode 100644 index 000000000..539bd6dcc --- /dev/null +++ b/device_e2e/pnp_fixtures.py @@ -0,0 +1,44 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest + + +@pytest.fixture(scope="session") +def pnp_model_id(): + return "dtmi:com:example:TemperatureController;2" + + +@pytest.fixture +def pnp_command_name(): + return "this_is_my_command_name" + + +@pytest.fixture +def pnp_component_name(): + return "this_is_my_component_name" + + +@pytest.fixture +def pnp_command_response_status(): + return 299 + + +@pytest.fixture +def pnp_writable_property_name(): + return "writable_property_2" + + +@pytest.fixture +def pnp_read_only_property_name(): + return "read_only_property" + + +@pytest.fixture +def pnp_ack_code(): + return 266 + + +@pytest.fixture +def pnp_ack_description(): + return "this is an ack description" diff --git a/device_e2e/pytest.ini b/device_e2e/pytest.ini new file mode 100644 index 000000000..46179ffd8 --- /dev/null +++ b/device_e2e/pytest.ini @@ -0,0 +1,11 @@ +[pytest] +timeout=180 +testdox_format=plaintext +addopts= + --testdox + --strict-markers + -m "not dropped_connection and not pnp" +norecursedirs=__pycache__, *.egg-info +markers= + pnp: includes tests for PNP functions + dropped_connection: includes tests that simplate dropped network connections (much slower) diff --git a/device_e2e/service_helper.py b/device_e2e/service_helper.py new file mode 100644 index 000000000..97ee421e9 --- /dev/null +++ b/device_e2e/service_helper.py @@ -0,0 +1,124 @@ +# Copyright (c) Microsoft. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for +# full license information. +from service_helper_sync import ServiceHelperSync + + +class ServiceHelper: + def __init__(self, event_loop, executor): + self.event_loop = event_loop + self.executor = executor + self.inner_object = ServiceHelperSync() + + @property + def device_id(self): + return self.inner_object.device_id + + @property + def module_id(self): + return self.inner_object.module_id + + def start_watching(self, device_id, module_id): + return self.inner_object.start_watching(device_id, module_id) + + def stop_watching(self, device_id, module_id): + return self.inner_object.stop_watching(device_id, module_id) + + async def get_next_incoming_event(self, device_id, module_id, block=True, timeout=None): + return await self.event_loop.run_in_executor( + self.executor, + self.inner_object.get_next_incoming_event, + device_id, + module_id, + block, + timeout, + ) + + async def set_desired_properties(self, device_id, module_id, desired_props): + return await self.event_loop.run_in_executor( + self.executor, + self.inner_object.set_desired_properties, + device_id, + module_id, + desired_props, + ) + + async def invoke_method( + self, + device_id, + module_id, + method_name, + payload, + connect_timeout_in_seconds=None, + response_timeout_in_seconds=None, + ): + return await self.event_loop.run_in_executor( + self.executor, + self.inner_object.invoke_method, + device_id, + module_id, + method_name, + payload, + connect_timeout_in_seconds, + response_timeout_in_seconds, + ) + + async def invoke_pnp_command( + self, + device_id, + module_id, + component_name, + command_name, + payload, + connect_timeout_in_seconds=None, + response_timeout_in_seconds=None, + ): + return await self.event_loop.run_in_executor( + self.executor, + self.inner_object.invoke_pnp_command, + device_id, + module_id, + component_name, + command_name, + payload, + connect_timeout_in_seconds, + response_timeout_in_seconds, + ) + + async def get_pnp_properties(self, device_id, module_id): + return await self.event_loop.run_in_executor( + self.executor, self.inner_object.get_pnp_properties, device_id, module_id + ) + + async def update_pnp_properties(self, device_id, module_id, properties): + return await self.event_loop.run_in_executor( + self.executor, self.inner_object.update_pnp_properties, device_id, module_id, properties + ) + + async def send_c2d(self, device_id, module_id, payload, properties): + return await self.event_loop.run_in_executor( + self.executor, self.inner_object.send_c2d, device_id, module_id, payload, properties + ) + + async def get_next_eventhub_arrival(self, device_id, module_id, block=True, timeout=None): + return await self.event_loop.run_in_executor( + self.executor, + self.inner_object.get_next_eventhub_arrival, + device_id, + module_id, + block, + timeout, + ) + + async def get_next_reported_patch_arrival(self, device_id, module_id, block=True, timeout=None): + return await self.event_loop.run_in_executor( + self.executor, + self.inner_object.get_next_reported_patch_arrival, + device_id, + module_id, + block, + timeout, + ) + + async def shutdown(self): + return await self.event_loop.run_in_executor(self.executor, self.inner_object.shutdown) diff --git a/device_e2e/service_helper_sync.py b/device_e2e/service_helper_sync.py new file mode 100644 index 000000000..164c355b6 --- /dev/null +++ b/device_e2e/service_helper_sync.py @@ -0,0 +1,319 @@ +# Copyright (c) Microsoft. All rights reserved. +# Licensed under the MIT license. See LICENSE file in the project root for +# full license information. +import logging +import threading +from six.moves import queue +import copy +from concurrent.futures import ThreadPoolExecutor +from azure.iot.hub import IoTHubRegistryManager, DigitalTwinClient +from azure.iot.hub.protocol.models import Twin, TwinProperties, CloudToDeviceMethod +from azure.eventhub import EventHubConsumerClient +import e2e_settings + +logger = logging.getLogger("e2e.{}".format(__name__)) + +iothub_connection_string = e2e_settings.IOTHUB_CONNECTION_STRING +iothub_name = e2e_settings.IOTHUB_NAME +eventhub_connection_string = e2e_settings.EVENTHUB_CONNECTION_STRING +eventhub_consumer_group = e2e_settings.EVENTHUB_CONSUMER_GROUP + +assert iothub_connection_string +assert iothub_name +assert eventhub_connection_string +assert eventhub_consumer_group + + +def convert_binary_dict_to_string_dict(src): + def binary_to_string(x): + if isinstance(x, bytes): + return x.decode("utf-8") + else: + return x + + if src: + dest = {} + for key, value in src.items(): + dest[binary_to_string(key)] = binary_to_string(value) + return dest + else: + return src + + +def get_device_id_from_event(event): + """ + Helper function to get the device_id from an EventHub message + """ + return event.message.annotations["iothub-connection-device-id".encode()].decode() + + +def get_message_source_from_event(event): + """ + Helper function to get the message source from an EventHub message + """ + return event.message.annotations["iothub-message-source".encode()].decode() + + +class C2dMessage(object): + def __init__(self): + self.device_id = None + self.module_id = None + self.message_body = None + self.content_type = None + self.system_properties = None + self.properties = None + + +class PerClientData(object): + """ + Object that holds data that needs to be stored in a device-by-device basis + """ + + def __init__(self, device_id, module_id): + self.device_id = device_id + self.module_id = module_id + self.incoming_event_queue = queue.Queue() + self.incoming_patch_queue = queue.Queue() + + +class ClientList(object): + """ + Thread-safe object for holding a dictionary of PerDeviceData objects. + """ + + def __init__(self): + self.lock = threading.Lock() + self.values = {} + + def add(self, device_id, module_id, value): + """ + Add a new object to the dict + """ + key = self.get_key(device_id, module_id) + with self.lock: + self.values[key] = value + + def remove(self, device_id, module_id): + """ + remove an object from the dict + """ + key = self.get_key(device_id, module_id) + with self.lock: + if key in self.values: + del self.values[key] + + def try_get(self, device_id, module_id): + """ + Try to get an object from the dict, returning None if the object doesn't exist + """ + key = self.get_key(device_id, module_id) + with self.lock: + return self.values.get(key, None) + + def get_or_create(self, device_id, module_id): + key = self.get_key(device_id, module_id) + with self.lock: + if key in self.values: + return self.values.get(key) + else: + value = PerClientData(device_id, module_id) + self.values[key] = value + return value + + def get_key(self, device_id, module_id): + return "{}%{}".format(device_id, module_id) + + def get_keys(self): + """ + Get a list of keys for the objects in the dict + """ + with self.lock: + return list(self.values.keys()) + + def get_incoming_event_queue(self, device_id, module_id): + client_data = self.try_get(device_id, module_id) + if client_data: + return client_data.incoming_event_queue + + def get_incoming_patch_queue(self, device_id, module_id): + client_data = self.try_get(device_id, module_id) + if client_data: + return client_data.incoming_patch_queue + + +class ServiceHelperSync(object): + def __init__(self): + self._client_list = ClientList() + self._executor = ThreadPoolExecutor() + + self._registry_manager = IoTHubRegistryManager(iothub_connection_string) + + self._digital_twin_client = DigitalTwinClient.from_connection_string( + iothub_connection_string + ) + + self._eventhub_consumer_client = EventHubConsumerClient.from_connection_string( + eventhub_connection_string, consumer_group=eventhub_consumer_group + ) + + self._eventhub_future = self._executor.submit(self._eventhub_thread) + + def start_watching(self, device_id, module_id): + self._client_list.get_or_create(device_id, module_id) + + def stop_watching(self, device_id, module_id): + self._client_list.remove(device_id, module_id) + + def get_next_incoming_event(self, device_id, module_id, block=True, timeout=None): + return self._client_list.get_incoming_event_queue.get(block=block, timeout=timeout) + + def set_desired_properties(self, device_id, module_id, desired_props): + if module_id: + self._registry_manager.update_module_twin( + device_id, module_id, Twin(properties=TwinProperties(desired=desired_props)), "*" + ) + else: + self._registry_manager.update_twin( + device_id, Twin(properties=TwinProperties(desired=desired_props)), "*" + ) + + def invoke_method( + self, + device_id, + module_id, + method_name, + payload, + connect_timeout_in_seconds=None, + response_timeout_in_seconds=None, + ): + request = CloudToDeviceMethod( + method_name=method_name, + payload=payload, + response_timeout_in_seconds=response_timeout_in_seconds, + connect_timeout_in_seconds=connect_timeout_in_seconds, + ) + + if module_id: + response = self._registry_manager.invoke_device_module_method( + device_id, module_id, request + ) + + else: + response = self._registry_manager.invoke_device_method(device_id, request) + + return response + + def invoke_pnp_command( + self, + device_id, + module_id, + component_name, + command_name, + payload, + connect_timeout_in_seconds=None, + response_timeout_in_seconds=None, + ): + assert not module_id # TODO + if component_name: + return self._digital_twin_client.invoke_component_command( + device_id, + component_name, + command_name, + payload, + connect_timeout_in_seconds, + response_timeout_in_seconds, + ) + else: + return self._digital_twin_client.invoke_command( + device_id, + command_name, + payload, + connect_timeout_in_seconds, + response_timeout_in_seconds, + ) + + def get_pnp_properties(self, device_id, module_id): + assert not module_id # TODO + return self._digital_twin_client.get_digital_twin(device_id) + + def update_pnp_properties(self, device_id, module_id, properties): + assert not module_id # TODO + return self._digital_twin_client.update_digital_twin(device_id, properties) + + def send_c2d(self, device_id, module_id, payload, properties): + assert not module_id # TODO + self._registry_manager.send_c2d_message(device_id, payload, properties) + + def get_next_eventhub_arrival(self, device_id, module_id, block=True, timeout=None): + return self._client_list.get_incoming_event_queue(device_id, module_id).get( + block=block, timeout=timeout + ) + + def get_next_reported_patch_arrival(self, device_id, module_id, block=True, timeout=None): + return self._client_list.get_incoming_patch_queue(device_id, module_id).get( + block=block, timeout=timeout + ) + + def shutdown(self): + if self._eventhub_consumer_client: + self._eventhub_consumer_client.close() + + def _convert_incoming_event(self, event): + event_body = event.body_as_json() + device_id = get_device_id_from_event(event) + module_id = None # TODO: extract module_id + + if get_message_source_from_event(event) == "twinChangeEvents": + return copy.deepcopy(event_body.get("properties", {})) + + else: + message = C2dMessage() + message.device_id = device_id + message.module_id = module_id + message.message_body = event_body + message.content_type = event.message.properties.content_type.decode("utf-8") + message.system_properties = convert_binary_dict_to_string_dict(event.system_properties) + message.properties = convert_binary_dict_to_string_dict(event.properties) + return message + + def _eventhub_thread(self): + def on_error(partition_context, error): + logger.error("EventHub on_error: {}".format(str(error) or type(error))) + + def on_partition_initialize(partition_context): + logger.warning("EventHub on_partition_initialize") + + def on_partition_close(partition_context, reason): + # commented out because it causes ugly warning spew on shutdown + # logger.warning("EventHub on_partition_close: {}".format(reason)) + pass + + def on_event(partition_context, event): + if event: + device_id = get_device_id_from_event(event) + module_id = None # TODO: extract module_id + if get_message_source_from_event(event) == "twinChangeEvents": + queue = self._client_list.get_incoming_patch_queue(device_id, module_id) + else: + queue = self._client_list.get_incoming_event_queue(device_id, module_id) + if queue: + logger.info( + "Received {} for device {}, module {}".format( + get_message_source_from_event(event), device_id, module_id + ) + ) + queue.put(self._convert_incoming_event(event)) + + try: + with self._eventhub_consumer_client: + logger.info("Starting EventHub receive") + self._eventhub_consumer_client.receive( + on_event, + on_error=on_error, + on_partition_initialize=on_partition_initialize, + on_partition_close=on_partition_close, + max_wait_time=3600, + ) + except Exception: + logger.error("_eventhub_thread exception", exc_info=True) + raise diff --git a/device_e2e/sync/conftest.py b/device_e2e/sync/conftest.py new file mode 100644 index 000000000..080df2be0 --- /dev/null +++ b/device_e2e/sync/conftest.py @@ -0,0 +1,73 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import functools +import time +import e2e_settings +import test_config +import logging +from service_helper_sync import ServiceHelperSync +from azure.iot.device.iothub import IoTHubDeviceClient, IoTHubModuleClient + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + + +@pytest.fixture(scope="function") +def brand_new_client(client_kwargs): + client = None + + if test_config.config.identity == test_config.IDENTITY_DEVICE_CLIENT: + ClientClass = IoTHubDeviceClient + elif test_config.config.identity == test_config.IDENTITY_MODULE_CLIENT: + ClientClass = IoTHubModuleClient + else: + raise Exception("config.identity invalid") + + if test_config.config.auth == test_config.AUTH_CONNECTION_STRING: + # TODO: This is currently using a connection string stored in _e2e_settings.xml. This will move to be a dynamically created identity similar to the way node's device_identity_helper.js works. + logger.info( + "Creating {} using create_from_connection_string with kwargs={}".format( + ClientClass, client_kwargs + ) + ) + client = ClientClass.create_from_connection_string( + e2e_settings.DEVICE_CONNECTION_STRING, **client_kwargs + ) + elif test_config.config.auth == test_config.X509: + # need to implement + raise Exception("X509 Auth not yet implemented") + else: + raise Exception("config.auth invalid") + + yield client + + client.shutdown() + + +@pytest.fixture(scope="function") +def client(brand_new_client): + client = brand_new_client + + client.connect() + + yield client + + +@pytest.fixture(scope="module") +def service_helper(): + service_helper = ServiceHelperSync() + time.sleep(1) + yield service_helper + service_helper.shutdown() + + +@pytest.fixture(scope="function") +def get_next_eventhub_arrival(service_helper, device_id, module_id, watches_events): + yield functools.partial(service_helper.get_next_eventhub_arrival, device_id, module_id) + + +@pytest.fixture(scope="function") +def get_next_reported_patch_arrival(executor, service_helper, device_id, module_id, watches_events): + yield functools.partial(service_helper.get_next_reported_patch_arrival, device_id, module_id) diff --git a/device_e2e/sync/test_sync_c2d.py b/device_e2e/sync/test_sync_c2d.py new file mode 100644 index 000000000..3634abcec --- /dev/null +++ b/device_e2e/sync/test_sync_c2d.py @@ -0,0 +1,39 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import logging +import json +import threading +from utils import get_random_dict + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + +# TODO: add tests for various application properties +# TODO: is there a way to call send_c2d so it arrives as an object rather than a JSON string? + + +@pytest.mark.describe("Device Client C2d") +class TestSendMessage(object): + @pytest.mark.it("Can receive C2D") + def test_send_message(self, client, service_helper, device_id, module_id): + message = json.dumps(get_random_dict()) + + received = threading.Event() + + # hack needed because there is no `nonlocal` keyword in py27. + nonlocal_py27_hack = {"received_msg": None, "received": received} + + def handle_on_message_received(message): + logger.info("received {}".format(message)) + nonlocal_py27_hack["received_message"] = message + nonlocal_py27_hack["received"].set() + + client.on_message_received = handle_on_message_received + + service_helper.send_c2d(device_id, module_id, message, {}) + + received.wait(timeout=10) + + assert nonlocal_py27_hack["received_message"].data.decode("utf-8") == message diff --git a/device_e2e/sync/test_sync_connect_disconnect.py b/device_e2e/sync/test_sync_connect_disconnect.py new file mode 100644 index 000000000..b4859ec4f --- /dev/null +++ b/device_e2e/sync/test_sync_connect_disconnect.py @@ -0,0 +1,56 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import logging +import time +import test_config + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + + +@pytest.mark.describe("Device Client") +class TestConnectDisconnect(object): + @pytest.mark.it("Can disconnect and reconnect") + @pytest.mark.parametrize(*test_config.connection_retry_disabled_and_enabled) + @pytest.mark.parametrize(*test_config.auto_connect_off_and_on) + def test_connect_disconnect(self, brand_new_client): + client = brand_new_client + + client.connect() + assert client.connected + + client.disconnect() + assert not client.connected + + client.connect() + assert client.connected + + +@pytest.mark.dropped_connection +@pytest.mark.describe("Device Client with dropped connection") +class TestConnectDisconnectDroppedConnection(object): + @pytest.fixture(scope="class") + def extra_client_kwargs(self): + return {"keep_alive": 5} + + @pytest.mark.it("disconnects when network drops all outgoing packets") + def test_disconnect_on_drop_outgoing(self, client, dropper): + + client.connect() + assert client.connected + dropper.drop_outgoing() + + while client.connected: + time.sleep(1) + + @pytest.mark.it("disconnects when network rejects all outgoing packets") + def test_disconnect_on_reject_outgoing(self, client, dropper): + + client.connect() + assert client.connected + dropper.reject_outgoing() + + while client.connected: + time.sleep(1) diff --git a/device_e2e/sync/test_sync_methods.py b/device_e2e/sync/test_sync_methods.py new file mode 100644 index 000000000..f87a22196 --- /dev/null +++ b/device_e2e/sync/test_sync_methods.py @@ -0,0 +1,94 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import logging +import time +from utils import get_random_dict +from azure.iot.device.iothub import MethodResponse + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + + +@pytest.fixture +def method_name(): + return "this_is_my_method_name" + + +@pytest.fixture +def method_response_status(): + return 299 + + +@pytest.mark.describe("Device Client methods") +class TestMethods(object): + @pytest.mark.it("Can handle a simple direct method call") + @pytest.mark.parametrize( + "include_request_payload", + [ + pytest.param(True, id="with request payload"), + pytest.param(False, id="without request payload"), + ], + ) + @pytest.mark.parametrize( + "include_response_payload", + [ + pytest.param(True, id="with response payload"), + pytest.param(False, id="without response payload"), + ], + ) + def test_handle_method_call( + self, + client, + method_name, + device_id, + module_id, + method_response_status, + include_request_payload, + include_response_payload, + service_helper, + ): + + if include_request_payload: + request_payload = get_random_dict() + else: + request_payload = None + + if include_response_payload: + response_payload = get_random_dict() + else: + response_payload = None + + # hack needed because there is no `nonlocal` keyword in py27. + nonlocal_py27_hack = {"actual_request": None} + + def handle_on_method_request_received(request): + logger.info("Method request for {} received".format(request.name)) + nonlocal_py27_hack["actual_request"] = request + logger.info("Sending response") + client.send_method_response( + MethodResponse.create_from_method_request( + request, method_response_status, response_payload + ) + ) + + client.on_method_request_received = handle_on_method_request_received + time.sleep(1) # wait for subscribe, etc, to complete + + # invoke the method call + method_response = service_helper.invoke_method( + device_id, module_id, method_name, request_payload + ) + + # verify that the method request arrived correctly + actual_request = nonlocal_py27_hack["actual_request"] + assert actual_request.name == method_name + if request_payload: + assert actual_request.payload == request_payload + else: + assert not actual_request.payload + + # and make sure the response came back successfully + assert method_response.status == method_response_status + assert method_response.payload == response_payload diff --git a/device_e2e/sync/test_sync_sas_renewal.py b/device_e2e/sync/test_sync_sas_renewal.py new file mode 100644 index 000000000..f31bc5c19 --- /dev/null +++ b/device_e2e/sync/test_sync_sas_renewal.py @@ -0,0 +1,76 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import json +import logging +import threading +import test_config + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + + +@pytest.mark.describe("Client sas renewal code") +class TestSasRenewalReconnectEnabled(object): + @pytest.fixture(scope="class") + def extra_client_kwargs(self): + # should renew after 10 seconds + return {"sastoken_ttl": 130} + + @pytest.mark.it("Renews and reconnects before expiry") + @pytest.mark.parametrize(*test_config.connection_retry_disabled_and_enabled) + @pytest.mark.parametrize(*test_config.auto_connect_off_and_on) + def test_sas_renews(self, client, get_next_eventhub_arrival, random_message): + + connected_event = threading.Event() + disconnected_event = threading.Event() + + token_object = client._mqtt_pipeline._pipeline.pipeline_configuration.sastoken + + # hack needed because there is no `nonlocal` keyword in py27. + nonlocal_py27_hack = {"token_at_connect_time": None} + + def handle_on_connection_state_change(): + logger.info("handle_on_connection_state_change: {}".format(client.connected)) + if client.connected: + nonlocal_py27_hack["token_at_connect_time"] = str(token_object) + logger.info("saving token: {}".format(nonlocal_py27_hack["token_at_connect_time"])) + + connected_event.set() + else: + disconnected_event.set() + + client.on_connection_state_change = handle_on_connection_state_change + + # setting on_connection_state_change seems to have the side effect of + # calling handle_on_connection_state_change once with the initial value. + # Wait for one disconnect/reconnect cycle so we can get past it. + connected_event.wait() + + # OK, we're ready to test. wait for the renewal + token_before_connect = str(token_object) + + disconnected_event.clear() + connected_event.clear() + + logger.info("Waiting for client to disconnect") + disconnected_event.wait() + logger.info("Waiting for client to reconnect") + connected_event.wait() + logger.info("Client reconnected") + + # Finally verify that our token changed. + token_at_connect_time = nonlocal_py27_hack["token_at_connect_time"] + logger.info("token now = {}".format(str(token_object))) + logger.info("token at_connect = {}".format(str(token_at_connect_time))) + logger.info("token before_connect = {}".format(str(token_before_connect))) + assert str(token_object) == token_at_connect_time + assert not token_before_connect == token_at_connect_time + + # and verify that we can send + client.send_message(random_message) + + # and verify that the message arrived at the service + event = get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data diff --git a/device_e2e/sync/test_sync_send_message.py b/device_e2e/sync/test_sync_send_message.py new file mode 100644 index 000000000..25a96d0e2 --- /dev/null +++ b/device_e2e/sync/test_sync_send_message.py @@ -0,0 +1,167 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import logging +import json +import time +from azure.iot.device.exceptions import OperationCancelled + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + + +@pytest.mark.describe("Device Client send_message method") +class TestSendMessage(object): + @pytest.mark.it("Can send a simple message") + def test_send_message(self, client, random_message, get_next_eventhub_arrival): + + client.send_message(random_message) + + event = get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data + + @pytest.mark.it("Connects the transport if necessary") + def test_connect_if_necessary(self, client, random_message, get_next_eventhub_arrival): + + client.disconnect() + assert not client.connected + + client.send_message(random_message) + assert client.connected + + event = get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data + + +@pytest.mark.dropped_connection +@pytest.mark.describe("Device Client send_message method with dropped connections") +class TestSendMessageDroppedConnection(object): + @pytest.fixture(scope="class") + def extra_client_kwargs(self): + return {"keep_alive": 5} + + @pytest.mark.it("Sends if connection drops before sending") + def test_sends_if_drop_before_sending( + self, client, random_message, dropper, get_next_eventhub_arrival, executor + ): + + assert client.connected + + dropper.drop_outgoing() + send_task = executor.submit(client.send_message, random_message) + + while client.connected: + time.sleep(1) + + assert not send_task.done() + + dropper.restore_all() + while not client.connected: + time.sleep(1) + + send_task.result() + + event = get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data + + @pytest.mark.it("Sends if connection rejects send") + def test_sends_if_reject_before_sending( + self, client, random_message, dropper, get_next_eventhub_arrival, executor + ): + + assert client.connected + + dropper.reject_outgoing() + send_task = executor.submit(client.send_message, random_message) + + while client.connected: + time.sleep(1) + + assert not send_task.done() + + dropper.restore_all() + while not client.connected: + time.sleep(1) + + send_task.result() + + event = get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data + + +@pytest.mark.describe("Device Client send_message with reconnect disabled") +class TestSendMessageRetryDisabled(object): + @pytest.fixture(scope="class") + def extra_client_kwargs(self): + return {"keep_alive": 5, "connection_retry": False} + + @pytest.fixture(scope="function", autouse=True) + def reconnect_after_test(self, dropper, client): + yield + dropper.restore_all() + client.connect() + assert client.connected + + @pytest.mark.it("Can send a simple message") + def test_send_message(self, client, random_message, get_next_eventhub_arrival): + client.send_message(random_message) + + event = get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data + + @pytest.mark.it("Automatically connects if transport manually disconnected before sending") + def test_connect_if_necessary(self, client, random_message, get_next_eventhub_arrival): + + client.disconnect() + assert not client.connected + + client.send_message(random_message) + assert client.connected + + event = get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data + + @pytest.mark.it("Automatically connects if transport automatically disconnected before sending") + def test_connects_after_automatic_disconnect( + self, client, random_message, dropper, get_next_eventhub_arrival + ): + + assert client.connected + + dropper.drop_outgoing() + while client.connected: + time.sleep(1) + + assert not client.connected + dropper.restore_all() + client.send_message(random_message) + assert client.connected + + event = get_next_eventhub_arrival() + assert json.dumps(event.message_body) == random_message.data + + @pytest.mark.it("Fails if connection disconnects before sending") + def test_fails_if_disconnect_before_sending(self, client, random_message, dropper, executor): + + assert client.connected + + dropper.drop_outgoing() + send_task = executor.submit(client.send_message, random_message) + + while client.connected: + time.sleep(1) + + with pytest.raises(OperationCancelled): + send_task.result() + + @pytest.mark.it("Fails if connection drops before sending") + def test_fails_if_drop_before_sending(self, client, random_message, dropper): + + assert client.connected + + dropper.drop_outgoing() + with pytest.raises(OperationCancelled): + client.send_message(random_message) + + assert not client.connected diff --git a/device_e2e/sync/test_sync_twin.py b/device_e2e/sync/test_sync_twin.py new file mode 100644 index 000000000..a38c58b03 --- /dev/null +++ b/device_e2e/sync/test_sync_twin.py @@ -0,0 +1,171 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest +import logging +import time +import threading +import const +from utils import get_random_dict + +logger = logging.getLogger(__name__) +logger.setLevel(level=logging.INFO) + + +# TODO: tests with drop_incoming and reject_incoming + +reset_reported_props = {const.TEST_CONTENT: None} + + +@pytest.mark.describe("Device Client Reported Properties") +class TestReportedProperties(object): + @pytest.mark.it("Can set a simple reported property") + def test_simple_patch(self, client, reported_props, get_next_reported_patch_arrival): + + # patch properties + client.patch_twin_reported_properties(reported_props) + + # wait for patch to arrive at service and verify + received_patch = get_next_reported_patch_arrival() + assert ( + received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + ) + + # get twin from the service and verify content + twin = client.get_twin() + assert twin[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + + @pytest.mark.it("Can clear a reported property") + def test_clear_property(self, client, reported_props, get_next_reported_patch_arrival): + + # patch properties and verify that the service received the patch + client.patch_twin_reported_properties(reported_props) + received_patch = get_next_reported_patch_arrival() + assert ( + received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + ) + + # send a patch clearing properties and verify that the service received that patch + client.patch_twin_reported_properties(reset_reported_props) + received_patch = get_next_reported_patch_arrival() + assert ( + received_patch[const.REPORTED][const.TEST_CONTENT] + == reset_reported_props[const.TEST_CONTENT] + ) + + # get the twin and verify that the properties are no longer part of the twin + twin = client.get_twin() + assert const.TEST_CONTENT not in twin[const.REPORTED] + + @pytest.mark.it("Connects the transport if necessary") + def test_connect_if_necessary(self, client, reported_props, get_next_reported_patch_arrival): + + client.disconnect() + + assert not client.connected + client.patch_twin_reported_properties(reported_props) + assert client.connected + + received_patch = get_next_reported_patch_arrival() + assert ( + received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + ) + + twin = client.get_twin() + assert twin[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + + +@pytest.mark.dropped_connection +@pytest.mark.describe("Device Client Reported Properties with dropped connection") +class TestReportedPropertiesDroppedConnection(object): + @pytest.fixture(scope="class") + def extra_client_kwargs(self): + return {"keep_alive": 5} + + # TODO: split drop tests between first and second patches + + @pytest.mark.it("Sends if connection drops before sending") + def test_sends_if_drop_before_sending( + self, client, reported_props, dropper, get_next_reported_patch_arrival, executor + ): + + assert client.connected + dropper.drop_outgoing() + + send_task = executor.submit(client.patch_twin_reported_properties, reported_props) + while client.connected: + time.sleep(1) + + assert not send_task.done() + + dropper.restore_all() + while not client.connected: + time.sleep(1) + + send_task.result() + + received_patch = get_next_reported_patch_arrival() + assert ( + received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + ) + + @pytest.mark.it("Sends if connection rejects send") + def test_sends_if_reject_before_sending( + self, client, reported_props, dropper, get_next_reported_patch_arrival, executor + ): + + assert client.connected + dropper.reject_outgoing() + + send_task = executor.submit(client.patch_twin_reported_properties, reported_props) + while client.connected: + time.sleep(1) + + assert not send_task.done() + + dropper.restore_all() + while not client.connected: + time.sleep(1) + + send_task.result() + + received_patch = get_next_reported_patch_arrival() + assert ( + received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT] + ) + + +@pytest.mark.describe("Device Client Desired Properties") +class TestDesiredProperties(object): + @pytest.mark.it("Receives a patch for a simple desired property") + def test_simple_patch(self, client, service_helper, device_id, module_id): + + received = threading.Event() + + # hack needed because there is no `nonlocal` keyword in py27. + nonlocal_py27_hack = {"received_patch": None, "received": received} + + def handle_on_patch_received(patch): + print("received {}".format(patch)) + nonlocal_py27_hack["received_patch"] = patch + nonlocal_py27_hack["received"].set() + + client.on_twin_desired_properties_patch_received = handle_on_patch_received + + random_dict = get_random_dict() + service_helper.set_desired_properties( + device_id, + module_id, + {const.TEST_CONTENT: random_dict}, + ) + + received.wait(timeout=10) + logger.info("got it") + + assert nonlocal_py27_hack["received_patch"][const.TEST_CONTENT] == random_dict + + twin = client.get_twin() + assert twin[const.DESIRED][const.TEST_CONTENT] == random_dict + + +# TODO: etag tests, version tests diff --git a/device_e2e/test_config.py b/device_e2e/test_config.py new file mode 100644 index 000000000..439f2ea4b --- /dev/null +++ b/device_e2e/test_config.py @@ -0,0 +1,42 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import pytest + +TRANSPORT_MQTT = "mqtt" +TRANSPORT_MQTT_WS = "mqttws" +TRANSPORT_CHOICES = [TRANSPORT_MQTT, TRANSPORT_MQTT_WS] + +IDENTITY_DEVICE_CLIENT = "deviceclient" +IDENTITY_MODULE_CLIENT = "module_client" +IDENTITY_CHOICES = [IDENTITY_DEVICE_CLIENT, IDENTITY_MODULE_CLIENT] + +AUTH_CONNECTION_STRING = "connection_string" +AUTH_X509 = "x509" +AUTH_CHOICES = [AUTH_CONNECTION_STRING, AUTH_X509] + + +class Config(object): + def __init__(self): + self.transport = TRANSPORT_MQTT + self.identity = IDENTITY_DEVICE_CLIENT + self.auth = AUTH_CONNECTION_STRING + + +config = Config() + +connection_retry_disabled_and_enabled = [ + "connection_retry", + [ + pytest.param(True, id="connection_retry enabled"), + pytest.param(False, id="connection_retry disabled"), + ], +] + +auto_connect_off_and_on = [ + "auto_connect", + [ + pytest.param(True, id="auto_connect enabled"), + pytest.param(False, id="auto_connect disabled"), + ], +] diff --git a/device_e2e/utils.py b/device_e2e/utils.py new file mode 100644 index 000000000..e8d388286 --- /dev/null +++ b/device_e2e/utils.py @@ -0,0 +1,42 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +import random +import string +import json +import uuid +import const +from azure.iot.device.iothub import Message + + +def get_random_dict(): + return { + "random_guid": str(uuid.uuid4()), + "sub_object": { + "string_value": "".join( + random.choice(string.ascii_uppercase + string.digits) for _ in range(10) + ), + "bool_value": random.random() > 0.5, + "int_value": random.randint(-65535, 65535), + }, + } + + +def get_random_message(): + message = Message(json.dumps(get_random_dict())) + message.content_type = const.JSON_CONTENT_TYPE + message.content_encoding = const.JSON_CONTENT_ENCODING + return message + + +def make_pnp_desired_property_patch(component_name, property_name, property_value): + if component_name: + return [ + { + "op": "add", + "path": "/{}".format(component_name), + "value": {property_name: property_value, "$metadata": {}}, + } + ] + else: + return [{"op": "add", "path": "/{}".format(property_name), "value": property_value}]