зеркало из
1
0
Форкнуть 0
* e2e tests inistial checkin

* fix to run on py27

* Rename service helper objects

* rename fixtures

* spelling typo

* client createion based on command line args

* sas renewal tests improved and expanded

* parametrize connect_disconnect tests

* fix client_kwargs to use extra_client_args to override parametrized args

* add logging

* move send_message tests together
This commit is contained in:
Bert Kleewein 2021-10-06 16:17:30 -07:00 коммит произвёл GitHub
Родитель c61cd24ba5
Коммит 8d9601dd25
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
31 изменённых файлов: 2808 добавлений и 1 удалений

5
.gitignore поставляемый
Просмотреть файл

@ -121,4 +121,7 @@ coverage/
# Certificates
*.pem
demoCA/
*.rnd
*.rnd
# e2e
**/_e2e_settings.json

Просмотреть файл

@ -0,0 +1,86 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import asyncio
import functools
import time
import e2e_settings
import test_config
import logging
from service_helper import ServiceHelper
from azure.iot.device.iothub.aio import IoTHubDeviceClient, IoTHubModuleClient
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
@pytest.fixture(scope="module")
def event_loop():
loop = asyncio.get_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="function")
async def brand_new_client(client_kwargs):
client = None
if test_config.config.identity == test_config.IDENTITY_DEVICE_CLIENT:
ClientClass = IoTHubDeviceClient
elif test_config.config.identity == test_config.IDENTITY_MODULE_CLIENT:
ClientClass = IoTHubModuleClient
else:
raise Exception("config.identity invalid")
if test_config.config.auth == test_config.AUTH_CONNECTION_STRING:
# TODO: This is currently using a connection string stored in _e2e_settings.xml. This will move to be a dynamically created identity similar to the way node's device_identity_helper.js works.
logger.info(
"Creating {} using create_from_connection_string with kwargs={}".format(
ClientClass, client_kwargs
)
)
client = ClientClass.create_from_connection_string(
e2e_settings.DEVICE_CONNECTION_STRING, **client_kwargs
)
elif test_config.config.auth == test_config.X509:
# need to implement
raise Exception("X509 Auth not yet implemented")
else:
raise Exception("config.auth invalid")
yield client
await client.shutdown()
@pytest.fixture(scope="function")
async def client(brand_new_client):
client = brand_new_client
await client.connect()
yield client
@pytest.fixture(scope="module")
async def service_helper(event_loop, executor):
service_helper = ServiceHelper(event_loop, executor)
time.sleep(1)
yield service_helper
print("shutting down")
await service_helper.shutdown()
@pytest.fixture(scope="function")
def get_next_eventhub_arrival(
event_loop, executor, service_helper, device_id, module_id, watches_events # noqa: F811
):
yield functools.partial(service_helper.get_next_eventhub_arrival, device_id, module_id)
@pytest.fixture(scope="function")
def get_next_reported_patch_arrival(
event_loop, executor, service_helper, device_id, module_id, watches_events # noqa: F811
):
yield functools.partial(service_helper.get_next_reported_patch_arrival, device_id, module_id)

Просмотреть файл

@ -0,0 +1,40 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import asyncio
import pytest
import logging
import json
from utils import get_random_dict
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
pytestmark = pytest.mark.asyncio
# TODO: add tests for various application properties
# TODO: is there a way to call send_c2d so it arrives as an object rather than a JSON string?
@pytest.mark.describe("Device Client C2d")
class TestSendMessage(object):
@pytest.mark.it("Can receive C2D")
async def test_send_message(self, client, service_helper, device_id, module_id, event_loop):
message = json.dumps(get_random_dict())
received_message = None
received = asyncio.Event()
async def handle_on_message_received(message):
nonlocal received_message, received
logger.info("received {}".format(message))
received_message = message
event_loop.call_soon_threadsafe(received.set)
client.on_message_received = handle_on_message_received
await service_helper.send_c2d(device_id, module_id, message, {})
await asyncio.wait_for(received.wait(), 10)
assert received_message.data.decode("utf-8") == message

Просмотреть файл

@ -0,0 +1,59 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import asyncio
import pytest
import logging
import test_config
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
pytestmark = pytest.mark.asyncio
@pytest.mark.describe("Device Client")
class TestConnectDisconnect(object):
@pytest.mark.it("Can disconnect and reconnect")
@pytest.mark.parametrize(*test_config.connection_retry_disabled_and_enabled)
@pytest.mark.parametrize(*test_config.auto_connect_off_and_on)
async def test_connect_disconnect(self, brand_new_client):
client = brand_new_client
assert client
await client.connect()
assert client.connected
await client.disconnect()
assert not client.connected
await client.connect()
assert client.connected
@pytest.mark.dropped_connection
@pytest.mark.describe("Device Client with dropped connection")
class TestConnectDisconnectDroppedConnection(object):
@pytest.fixture(scope="class")
def extra_client_kwargs(self):
return {"keep_alive": 5}
@pytest.mark.it("disconnects when network drops all outgoing packets")
async def test_disconnect_on_drop_outgoing(self, client, dropper):
await client.connect()
assert client.connected
dropper.drop_outgoing()
while client.connected:
await asyncio.sleep(1)
@pytest.mark.it("disconnects when network rejects all outgoing packets")
async def test_disconnect_on_reject_outgoing(self, client, dropper):
await client.connect()
assert client.connected
dropper.reject_outgoing()
while client.connected:
await asyncio.sleep(1)

Просмотреть файл

@ -0,0 +1,94 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import logging
import asyncio
from utils import get_random_dict
from azure.iot.device.iothub import MethodResponse
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
pytestmark = pytest.mark.asyncio
@pytest.fixture
def method_name():
return "this_is_my_method_name"
@pytest.fixture
def method_response_status():
return 299
@pytest.mark.describe("Device Client methods")
class TestMethods(object):
@pytest.mark.it("Can handle a simple direct method call")
@pytest.mark.parametrize(
"include_request_payload",
[
pytest.param(True, id="with request payload"),
pytest.param(False, id="without request payload"),
],
)
@pytest.mark.parametrize(
"include_response_payload",
[
pytest.param(True, id="with response payload"),
pytest.param(False, id="without response payload"),
],
)
async def test_handle_method_call(
self,
client,
method_name,
device_id,
module_id,
method_response_status,
include_request_payload,
include_response_payload,
service_helper,
):
actual_request = None
if include_request_payload:
request_payload = get_random_dict()
else:
request_payload = None
if include_response_payload:
response_payload = get_random_dict()
else:
response_payload = None
async def handle_on_method_request_received(request):
nonlocal actual_request
logger.info("Method request for {} received".format(request.name))
actual_request = request
logger.info("Sending response")
await client.send_method_response(
MethodResponse.create_from_method_request(
request, method_response_status, response_payload
)
)
client.on_method_request_received = handle_on_method_request_received
await asyncio.sleep(1) # wait for subscribe, etc, to complete
# invoke the method call
method_response = await service_helper.invoke_method(
device_id, module_id, method_name, request_payload
)
# verify that the method request arrived correctly
assert actual_request.name == method_name
if request_payload:
assert actual_request.payload == request_payload
else:
assert not actual_request.payload
# and make sure the response came back successfully
assert method_response.status == method_response_status
assert method_response.payload == response_payload

Просмотреть файл

@ -0,0 +1,113 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import logging
import asyncio
from utils import get_random_dict
import azure.iot.device.iothub
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
pytestmark = pytest.mark.asyncio
try:
CommandResponse = azure.iot.device.iothub.CommandResponse
except AttributeError:
# only run if PNP enabled
pytestmark = pytest.mark.skip
@pytest.fixture(scope="class")
def extra_client_kwargs(pnp_model_id):
return {"model_id": pnp_model_id}
@pytest.mark.pnp
@pytest.mark.describe("Pnp Commands")
class TestPnpCommands(object):
@pytest.mark.it("Can handle a simple command")
@pytest.mark.parametrize(
"include_request_payload",
[
pytest.param(True, id="with request payload"),
pytest.param(False, id="without request payload"),
],
)
@pytest.mark.parametrize(
"include_response_payload",
[
pytest.param(True, id="with response payload"),
pytest.param(False, id="without response payload"),
],
)
@pytest.mark.parametrize(
"include_component_name",
[
pytest.param(True, id="with component name"),
pytest.param(False, id="without component name"),
],
)
async def test_handle_pnp_commmand(
self,
client,
event_loop,
pnp_command_name,
pnp_component_name,
pnp_command_response_status,
include_component_name,
include_request_payload,
include_response_payload,
service_helper,
device_id,
module_id,
):
actual_request = None
if include_request_payload:
request_payload = get_random_dict()
else:
request_payload = ""
if include_response_payload:
response_payload = get_random_dict()
else:
response_payload = None
async def handle_on_command_request_received(request):
nonlocal actual_request
logger.info(
"command request for component {}, command {} received".format(
request.component_name, request.command_name
)
)
actual_request = request
logger.info("Sending response")
await client.send_command_response(
CommandResponse.create_from_command_request(
request, pnp_command_response_status, response_payload
)
)
client.on_command_request_received = handle_on_command_request_received
await asyncio.sleep(1) # wait for subscribe, etc, to complete
# invoke the command
command_response = await service_helper.invoke_pnp_command(
device_id, module_id, pnp_component_name, pnp_command_name, request_payload
)
# verify that the method request arrived correctly
assert actual_request.command_name == pnp_command_name
assert actual_request.component_name == pnp_component_name
if request_payload:
assert actual_request.payload == request_payload
else:
assert not actual_request.payload
# and make sure the response came back successfully
# Currently no way to check the command response status code because the DigitalTwinClient A
# object in the service SDK does not return this to the caller.
# assert command_response[Fields.COMMAND_RESPONSE_STATUS_CODE] == command_response_status
assert command_response == response_payload

Просмотреть файл

@ -0,0 +1,54 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import logging
import azure.iot.device.iothub
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
pytestmark = pytest.mark.asyncio
if not getattr(azure.iot.device.iothub, "CommandRequest", None):
# only run if PNP enabled
pytestmark = pytest.mark.skip
@pytest.fixture(scope="class")
def pnp_model_id():
return "dtmi:com:example:TemperatureController;2"
@pytest.fixture(scope="class")
def extra_client_kwargs(pnp_model_id):
return {"model_id": pnp_model_id}
@pytest.mark.pnp
@pytest.mark.describe("Device Client PNP Connection")
class TestPnpConnect(object):
@pytest.mark.it("Can connect and disconnect with model_id set")
async def test_connect(self, client, pnp_model_id):
assert client._mqtt_pipeline.pipeline_configuration.model_id == pnp_model_id
assert client
await client.connect()
assert client.connected
await client.disconnect()
assert not client.connected
await client.connect()
assert client.connected
@pytest.mark.it("Shows up as a PNP device in the service client")
async def test_model_id_in_service_helper(
self, client, pnp_model_id, device_id, module_id, service_helper
):
assert client._mqtt_pipeline.pipeline_configuration.model_id == pnp_model_id
assert client.connected
props = await service_helper.get_pnp_properties(device_id, module_id)
assert props["$metadata"]["$model"] == pnp_model_id

Просмотреть файл

@ -0,0 +1,270 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import logging
import asyncio
import pprint
import azure.iot.device.iothub
from utils import get_random_dict, make_pnp_desired_property_patch
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
pytestmark = pytest.mark.asyncio
try:
ClientPropertyCollection = azure.iot.device.iothub.ClientPropertyCollection
generate_writable_property_response = (
azure.iot.device.iothub.generate_writable_property_response
)
except AttributeError:
# only run if PNP enabled
pytestmark = pytest.mark.skip
@pytest.fixture(scope="class")
def extra_client_kwargs(pnp_model_id):
return {"model_id": pnp_model_id}
@pytest.mark.pnp
@pytest.mark.parametrize(
"is_component_property",
[pytest.param(True, id="component property"), pytest.param(False, id="root property")],
)
@pytest.mark.describe("Device Client PNP properties")
class TestPnpSetProperties(object):
@pytest.mark.it(
"Can set a reported property value and retrieve it via the service get_digital_twin function"
)
async def test_set_reported_property(
self,
client,
pnp_read_only_property_name,
pnp_component_name,
is_component_property,
service_helper,
device_id,
module_id,
):
random_property_value = get_random_dict()
assert client.connected
patch = ClientPropertyCollection()
if is_component_property:
patch.set_component_property(
pnp_component_name, pnp_read_only_property_name, random_property_value
)
else:
patch.set_property(pnp_read_only_property_name, random_property_value)
logger.info("Setting {} to {}".format(pnp_read_only_property_name, random_property_value))
await client.update_client_properties(patch)
while True:
properties = await service_helper.get_pnp_properties(device_id, module_id)
if is_component_property:
actual_value = properties.get(pnp_component_name, {}).get(
pnp_read_only_property_name, None
)
else:
actual_value = properties.get(pnp_read_only_property_name, None)
if actual_value == random_property_value:
return
else:
logger.warning(
"property not matched yet. Expected = {}, actual = {}".format(
random_property_value, actual_value
)
)
logger.warning(
"digital_twin_client.get_digital_twin returned {}".format(
pprint.pformat(properties)
)
)
await asyncio.sleep(5)
@pytest.mark.it("Can retrieve a reported property via the get_client_properties function")
async def test_get_reported_property(
self,
client,
pnp_read_only_property_name,
pnp_component_name,
is_component_property,
):
random_property_value = get_random_dict()
assert client.connected
patch = ClientPropertyCollection()
if is_component_property:
patch.set_component_property(
pnp_component_name, pnp_read_only_property_name, random_property_value
)
else:
patch.set_property(pnp_read_only_property_name, random_property_value)
logger.info("Setting {} to {}".format(pnp_read_only_property_name, random_property_value))
await client.update_client_properties(patch)
properties = await client.get_client_properties()
if is_component_property:
assert (
properties.reported_from_device.get_component_property(
pnp_component_name, pnp_read_only_property_name
)
== random_property_value
)
assert properties.reported_from_device.backing_object[pnp_component_name]["__t"] == "c"
else:
assert (
properties.reported_from_device.get_property(pnp_read_only_property_name)
== random_property_value
)
@pytest.mark.it("Can retrieve a desired property via the get_client_properties function")
async def test_desired_properties_via_get_client_properties(
self,
event_loop,
client,
pnp_component_name,
pnp_writable_property_name,
is_component_property,
service_helper,
device_id,
module_id,
):
random_property_value = get_random_dict()
received = asyncio.Event()
async def handle_on_patch_received(patch):
nonlocal received
logger.info("received {}".format(patch))
event_loop.call_soon_threadsafe(received.set)
client.on_writable_property_update_request_received = handle_on_patch_received
await asyncio.sleep(1)
props = make_pnp_desired_property_patch(
pnp_component_name if is_component_property else None,
pnp_writable_property_name,
random_property_value,
)
await service_helper.update_pnp_properties(device_id, module_id, props)
# wait for the desired property patch to arrive at the client
# We don't actually check the contents of the patch, but the
# fact that it arrived means the device registry should have
# finished ingesting the patch
await asyncio.wait_for(received.wait(), 10)
logger.info("got it")
properties = await client.get_client_properties()
if is_component_property:
assert (
properties.writable_properties_requests.get_component_property(
pnp_component_name, pnp_writable_property_name
)
== random_property_value
)
assert (
properties.writable_properties_requests.backing_object[pnp_component_name]["__t"]
== "c"
)
else:
assert (
properties.writable_properties_requests.get_property(pnp_writable_property_name)
== random_property_value
)
@pytest.mark.it(
"can receive a desired property patch and corectly respond with a writable_property_response"
)
async def test_receive_desired_property_patch(
self,
event_loop,
client,
pnp_component_name,
pnp_writable_property_name,
is_component_property,
pnp_ack_code,
pnp_ack_description,
service_helper,
device_id,
module_id,
):
random_property_value = get_random_dict()
received_patch = None
received = asyncio.Event()
async def handle_on_patch_received(patch):
nonlocal received_patch, received
logger.info("received {}".format(patch))
received_patch = patch
event_loop.call_soon_threadsafe(received.set)
client.on_writable_property_update_request_received = handle_on_patch_received
await asyncio.sleep(1)
# patch desired properites
props = make_pnp_desired_property_patch(
pnp_component_name if is_component_property else None,
pnp_writable_property_name,
random_property_value,
)
await service_helper.update_pnp_properties(device_id, module_id, props)
logger.info("patch sent. Waiting for desired proprety")
# wait for the desired property patch to arrive at the client
await asyncio.wait_for(received.wait(), 10)
logger.info("got it")
# validate the patch
if is_component_property:
assert (
received_patch.get_component_property(
pnp_component_name, pnp_writable_property_name
)
== random_property_value
)
assert received_patch.backing_object[pnp_component_name]["__t"] == "c"
else:
assert received_patch.get_property(pnp_writable_property_name) == random_property_value
# make a reported property patch to respond
update_patch = ClientPropertyCollection()
property_response = generate_writable_property_response(
random_property_value, pnp_ack_code, pnp_ack_description, received_patch.version
)
if is_component_property:
update_patch.set_component_property(
pnp_component_name, pnp_writable_property_name, property_response
)
else:
update_patch.set_property(pnp_writable_property_name, property_response)
# send the reported property patch
await client.update_client_properties(update_patch)
# verify that the reported value via digital_twin_client.get_digital_twin()
props = await service_helper.get_pnp_properties(device_id, module_id)
if is_component_property:
props = props[pnp_component_name]
assert props[pnp_writable_property_name] == random_property_value
metadata = props["$metadata"][pnp_writable_property_name]
assert metadata["ackCode"] == pnp_ack_code
assert metadata["ackDescription"] == pnp_ack_description
assert metadata["ackVersion"] == received_patch.version
assert metadata["desiredVersion"] == received_patch.version
assert metadata["desiredValue"] == random_property_value
# TODO: etag tests, version tests

Просмотреть файл

@ -0,0 +1,42 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import logging
import pprint
import const
from utils import get_random_dict
import azure.iot.device.iothub
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
pytestmark = pytest.mark.asyncio
if not getattr(azure.iot.device.iothub, "CommandRequest", None):
# only run if PNP enabled
pytestmark = pytest.mark.skip
@pytest.fixture(scope="class")
def extra_client_kwargs(pnp_model_id):
return {"model_id": pnp_model_id}
@pytest.mark.pnp
@pytest.mark.describe("Pnp Telemetry")
class TestPnpTelemetry(object):
@pytest.mark.it("Can send a telemetry message")
async def test_send_pnp_telemetry(self, client, pnp_model_id, get_next_eventhub_arrival):
telemetry = get_random_dict()
await client.send_telemetry(telemetry)
event = await get_next_eventhub_arrival()
logger.info(pprint.pformat(event))
assert event.message_body == telemetry
system_props = event.system_properties
assert system_props[const.EVENTHUB_SYSPROP_DT_DATASCHEMA] == pnp_model_id
assert system_props[const.EVENTHUB_SYSPROP_CONTENT_TYPE] == const.JSON_CONTENT_TYPE
assert system_props[const.EVENTHUB_SYSPROP_CONTENT_ENCODING] == const.JSON_CONTENT_ENCODING

Просмотреть файл

@ -0,0 +1,80 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import asyncio
import pytest
import json
import logging
import test_config
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
pytestmark = pytest.mark.asyncio
@pytest.mark.describe("Client sas renewal code")
class TestSasRenewalReconnectEnabled(object):
@pytest.fixture(scope="class")
def extra_client_kwargs(self):
# should renew after 10 seconds
return {"sastoken_ttl": 130}
@pytest.mark.it("Renews and reconnects before expiry")
@pytest.mark.parametrize(*test_config.connection_retry_disabled_and_enabled)
@pytest.mark.parametrize(*test_config.auto_connect_off_and_on)
async def test_sas_renews(self, client, event_loop, get_next_eventhub_arrival, random_message):
connected_event = asyncio.Event()
disconnected_event = asyncio.Event()
token_at_connect_time = None
logger.info("connected and ready")
token_object = client._mqtt_pipeline._pipeline.pipeline_configuration.sastoken
async def handle_on_connection_state_change():
nonlocal token_at_connect_time
logger.info("handle_on_connection_state_change: {}".format(client.connected))
if client.connected:
token_at_connect_time = str(token_object)
logger.info("saving token: {}".format(token_at_connect_time))
event_loop.call_soon_threadsafe(connected_event.set)
else:
event_loop.call_soon_threadsafe(disconnected_event.set)
client.on_connection_state_change = handle_on_connection_state_change
# setting on_connection_state_change seems to have the side effect of
# calling handle_on_connection_state_change once with the initial value.
# Wait for one disconnect/reconnect cycle so we can get past it.
await connected_event.wait()
# OK, we're ready to test. wait for the renewal
token_before_connect = str(token_object)
disconnected_event.clear()
connected_event.clear()
logger.info("Waiting for client to disconnect")
await disconnected_event.wait()
logger.info("Waiting for client to reconnect")
await connected_event.wait()
logger.info("Client reconnected")
# Finally verify that our token changed.
logger.info("token now = {}".format(str(token_object)))
logger.info("token at_connect = {}".format(str(token_at_connect_time)))
logger.info("token before_connect = {}".format(str(token_before_connect)))
assert str(token_object) == token_at_connect_time
assert not token_before_connect == token_at_connect_time
# and verify that we can send
await client.send_message(random_message)
# and verify that the message arrived at the service
# TODO incoming_event_queue.get should check thread future
event = await get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data

Просмотреть файл

@ -0,0 +1,181 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import asyncio
import pytest
import logging
import json
from azure.iot.device.exceptions import OperationCancelled
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
pytestmark = pytest.mark.asyncio
@pytest.mark.describe("Device Client send_message method")
class TestSendMessage(object):
@pytest.mark.it("Can send a simple message")
async def test_send_message(self, client, random_message, get_next_eventhub_arrival):
await client.send_message(random_message)
event = await get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data
@pytest.mark.it("Connects the transport if necessary")
async def test_connect_if_necessary(self, client, random_message, get_next_eventhub_arrival):
await client.disconnect()
assert not client.connected
await client.send_message(random_message)
assert client.connected
event = await get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data
@pytest.mark.dropped_connection
@pytest.mark.describe("Device Client send_message method with dropped connections")
class TestSendMessageDroppedConnection(object):
@pytest.fixture(scope="class")
def extra_client_kwargs(self):
return {"keep_alive": 5}
@pytest.mark.it("Sends if connection drops before sending")
async def test_sends_if_drop_before_sending(
self, client, random_message, dropper, get_next_eventhub_arrival
):
assert client.connected
dropper.drop_outgoing()
send_task = asyncio.create_task(client.send_message(random_message))
while client.connected:
await asyncio.sleep(1)
assert not send_task.done()
dropper.restore_all()
while not client.connected:
await asyncio.sleep(1)
await send_task
event = await get_next_eventhub_arrival()
logger.info("sent from device= {}".format(random_message.data))
logger.info("received at eventhub = {}".format(event.message_body))
assert json.dumps(event.message_body) == random_message.data
logger.info("Success")
@pytest.mark.it("Sends if connection rejects send")
async def test_sends_if_reject_before_sending(
self, client, random_message, dropper, get_next_eventhub_arrival
):
assert client.connected
dropper.reject_outgoing()
send_task = asyncio.create_task(client.send_message(random_message))
while client.connected:
await asyncio.sleep(1)
assert not send_task.done()
dropper.restore_all()
while not client.connected:
await asyncio.sleep(1)
await send_task
event = await get_next_eventhub_arrival()
logger.info("sent from device= {}".format(random_message.data))
logger.info("received at eventhub = {}".format(event.message_body))
assert json.dumps(event.message_body) == random_message.data
logger.info("Success")
@pytest.mark.describe("Device Client send_message with reconnect disabled")
class TestSendMessageRetryDisabled(object):
@pytest.fixture(scope="class")
def extra_client_kwargs(self):
return {"keep_alive": 5, "connection_retry": False}
@pytest.fixture(scope="function", autouse=True)
async def reconnect_after_test(self, dropper, client):
yield
dropper.restore_all()
await client.connect()
assert client.connected
@pytest.mark.it("Can send a simple message")
async def test_send_message(self, client, random_message, get_next_eventhub_arrival):
await client.send_message(random_message)
event = await get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data
@pytest.mark.it("Automatically connects if transport manually disconnected before sending")
async def test_connect_if_necessary(self, client, random_message, get_next_eventhub_arrival):
await client.disconnect()
assert not client.connected
await client.send_message(random_message)
assert client.connected
event = await get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data
@pytest.mark.it("Automatically connects if transport automatically disconnected before sending")
async def test_connects_after_automatic_disconnect(
self, client, random_message, dropper, get_next_eventhub_arrival
):
assert client.connected
dropper.drop_outgoing()
while client.connected:
await asyncio.sleep(1)
assert not client.connected
dropper.restore_all()
await client.send_message(random_message)
assert client.connected
event = await get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data
@pytest.mark.it("Fails if connection disconnects before sending")
async def test_fails_if_disconnect_before_sending(self, client, random_message, dropper):
assert client.connected
dropper.drop_outgoing()
send_task = asyncio.create_task(client.send_message(random_message))
while client.connected:
await asyncio.sleep(1)
with pytest.raises(OperationCancelled):
await send_task
@pytest.mark.it("Fails if connection drops before sending")
async def test_fails_if_drop_before_sending(self, client, random_message, dropper):
assert client.connected
dropper.drop_outgoing()
with pytest.raises(OperationCancelled):
await client.send_message(random_message)
assert not client.connected

173
device_e2e/aio/test_twin.py Normal file
Просмотреть файл

@ -0,0 +1,173 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import asyncio
import pytest
import logging
import const
from utils import get_random_dict
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
pytestmark = pytest.mark.asyncio
# TODO: tests with drop_incoming and reject_incoming
reset_reported_props = {const.TEST_CONTENT: None}
@pytest.mark.describe("Device Client Reported Properties")
class TestReportedProperties(object):
@pytest.mark.it("Can set a simple reported property")
async def test_simple_patch(self, client, reported_props, get_next_reported_patch_arrival):
# patch properties
await client.patch_twin_reported_properties(reported_props)
# wait for patch to arrive at service and verify
received_patch = await get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
)
# get twin from the service and verify content
twin = await client.get_twin()
assert twin[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
@pytest.mark.it("Can clear a reported property")
async def test_clear_property(self, client, reported_props, get_next_reported_patch_arrival):
# patch properties and verify that the service received the patch
await client.patch_twin_reported_properties(reported_props)
received_patch = await get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
)
# send a patch clearing properties and verify that the service received that patch
await client.patch_twin_reported_properties(reset_reported_props)
received_patch = await get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT]
== reset_reported_props[const.TEST_CONTENT]
)
# get the twin and verify that the properties are no longer part of the twin
twin = await client.get_twin()
assert const.TEST_CONTENT not in twin[const.REPORTED]
@pytest.mark.it("Connects the transport if necessary")
async def test_connect_if_necessary(
self, client, reported_props, get_next_reported_patch_arrival
):
await client.disconnect()
assert not client.connected
await client.patch_twin_reported_properties(reported_props)
assert client.connected
received_patch = await get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
)
twin = await client.get_twin()
assert twin[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
@pytest.mark.dropped_connection
@pytest.mark.describe("Device Client Reported Properties with dropped connection")
class TestReportedPropertiesDroppedConnection(object):
@pytest.fixture(scope="class")
def extra_client_kwargs(self):
return {"keep_alive": 5}
# TODO: split drop tests between first and second patches
@pytest.mark.it("Sends if connection drops before sending")
async def test_sends_if_drop_before_sending(
self, client, reported_props, dropper, get_next_reported_patch_arrival
):
assert client.connected
dropper.drop_outgoing()
send_task = asyncio.create_task(client.patch_twin_reported_properties(reported_props))
while client.connected:
await asyncio.sleep(1)
assert not send_task.done()
dropper.restore_all()
while not client.connected:
await asyncio.sleep(1)
await send_task
received_patch = await get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
)
@pytest.mark.it("Sends if connection rejects send")
async def test_sends_if_reject_before_sending(
self, client, reported_props, dropper, get_next_reported_patch_arrival
):
assert client.connected
dropper.reject_outgoing()
send_task = asyncio.create_task(client.patch_twin_reported_properties(reported_props))
while client.connected:
await asyncio.sleep(1)
assert not send_task.done()
dropper.restore_all()
while not client.connected:
await asyncio.sleep(1)
await send_task
received_patch = await get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
)
@pytest.mark.describe("Device Client Desired Properties")
class TestDesiredProperties(object):
@pytest.mark.it("Receives a patch for a simple desired property")
async def test_simple_patch(self, client, event_loop, service_helper, device_id, module_id):
received_patch = None
received = asyncio.Event()
async def handle_on_patch_received(patch):
nonlocal received_patch, received
print("received {}".format(patch))
received_patch = patch
event_loop.call_soon_threadsafe(received.set)
client.on_twin_desired_properties_patch_received = handle_on_patch_received
random_dict = get_random_dict()
await service_helper.set_desired_properties(
device_id,
module_id,
{const.TEST_CONTENT: random_dict},
)
await asyncio.wait_for(received.wait(), 10)
logger.info("got it")
assert received_patch[const.TEST_CONTENT] == random_dict
twin = await client.get_twin()
assert twin[const.DESIRED][const.TEST_CONTENT] == random_dict
# TODO: etag tests, version tests

Просмотреть файл

@ -0,0 +1,81 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import copy
import collections
import json
import functools
import time
import sys
import const
import test_config
from azure.iot.device.iothub import Message
from utils import get_random_message, get_random_dict
from azure.iot.device.iothub import IoTHubDeviceClient
@pytest.fixture(scope="function")
def device_id(brand_new_client):
# TODO: suggest adding device_id and module_id to client object
return brand_new_client._mqtt_pipeline._pipeline.pipeline_configuration.device_id
@pytest.fixture(scope="function")
def module_id(brand_new_client):
return brand_new_client._mqtt_pipeline._pipeline.pipeline_configuration.module_id
@pytest.fixture(scope="function")
def reported_props():
return {const.TEST_CONTENT: get_random_dict()}
@pytest.fixture(scope="function")
def watches_events(service_helper, device_id, module_id):
service_helper.start_watching(device_id, module_id)
yield
service_helper.stop_watching(device_id, module_id)
@pytest.fixture(scope="function")
def random_message():
return get_random_message()
@pytest.fixture(scope="function")
def connection_retry():
return True
@pytest.fixture(scope="function")
def auto_connect():
return True
@pytest.fixture(scope="function")
def websockets():
return test_config.config.transport == test_config.TRANSPORT_MQTT_WS
@pytest.fixture(scope="function")
def extra_client_kwargs():
return {}
@pytest.fixture(scope="function")
def client_kwargs(extra_client_kwargs, auto_connect, connection_retry, websockets):
kwargs = {}
kwargs["auto_connect"] = auto_connect
kwargs["connection_retry"] = connection_retry
kwargs["websockets"] = websockets
for key, value in extra_client_kwargs.items():
kwargs[key] = value
return kwargs
collect_ignore = []
# Ignore Async tests if below Python 3.5
if sys.version_info < (3, 5):
collect_ignore.append("aio")

78
device_e2e/conftest.py Normal file
Просмотреть файл

@ -0,0 +1,78 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import logging
import concurrent.futures
import test_config
# noqa: F401 defined in .flake8 file in root of repo
from drop_fixtures import dropper
from pnp_fixtures import (
pnp_model_id,
pnp_command_name,
pnp_component_name,
pnp_command_response_status,
pnp_writable_property_name,
pnp_read_only_property_name,
pnp_ack_code,
pnp_ack_description,
)
from client_fixtures import (
client_kwargs,
extra_client_kwargs,
auto_connect,
connection_retry,
websockets,
device_id,
module_id,
reported_props,
watches_events,
random_message,
)
logging.basicConfig(level=logging.WARNING)
logging.getLogger("e2e").setLevel(level=logging.DEBUG)
logging.getLogger("paho").setLevel(level=logging.DEBUG)
logging.getLogger("azure.iot").setLevel(level=logging.DEBUG)
@pytest.fixture(scope="module")
def transport():
return test_config.config.transport
@pytest.fixture(scope="module")
def executor():
return concurrent.futures.ThreadPoolExecutor()
def pytest_addoption(parser):
parser.addoption(
"--transport",
help="Transport to use for tests",
type=str,
choices=test_config.TRANSPORT_CHOICES,
default=test_config.TRANSPORT_MQTT,
)
parser.addoption(
"--auth",
help="Auth to use for tests",
type=str,
choices=test_config.AUTH_CHOICES,
default=test_config.AUTH_CONNECTION_STRING,
)
parser.addoption(
"--identity",
help="Identity (client type) to use for tests",
type=str,
choices=test_config.IDENTITY_CHOICES,
default=test_config.IDENTITY_DEVICE_CLIENT,
)
def pytest_configure(config):
test_config.config.transport = config.getoption("transport")
test_config.config.auth = config.getoption("auth")
test_config.config.identity = config.getoption("identity")

15
device_e2e/const.py Normal file
Просмотреть файл

@ -0,0 +1,15 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
JSON_CONTENT_TYPE = "application/json"
JSON_CONTENT_ENCODING = "utf-8"
TEST_CONTENT = "testContent"
REPORTED = "reported"
DESIRED = "desired"
# properties of the eventhub message
EVENTHUB_SYSPROP_CONTENT_TYPE = "content-type"
EVENTHUB_SYSPROP_CONTENT_ENCODING = "content-encoding"
EVENTHUB_SYSPROP_DT_DATASCHEMA = "dt-dataschema"

97
device_e2e/drop.py Normal file
Просмотреть файл

@ -0,0 +1,97 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import logging
import subprocess
logger = logging.getLogger("e2e.{}".format(__name__))
mqtt_port = 8883
mqttws_port = 443
uninitialized = "uninitialized"
sudo_prefix = uninitialized
all_disconnect_types = ["DROP", "REJECT"]
all_transports = ["mqtt", "mqttws"]
def get_sudo_prefix():
"""
Get the prefix for running sudo commands. If the sudo binary doens't exist, then
we assume that we're running in a container or somewhere else where we don't
need to use sudo to elevate our process.
"""
global sudo_prefix
# use "uninitialized" to mean uninitialized, because None and [] are both falsy and we want to set it to [], so we can't use None
if sudo_prefix == uninitialized:
try:
run_shell_command("which sudo")
except subprocess.CalledProcessError:
sudo_prefix = ""
else:
sudo_prefix = "sudo -n "
return sudo_prefix
def run_shell_command(cmd):
"""
Run a shell command and raise an exception on error
"""
logger.info("running [{}]".format(cmd))
try:
return subprocess.check_output(cmd.split(" ")).decode("utf-8").splitlines()
except subprocess.CalledProcessError as e:
logger.error("Error spawning {}".format(e.cmd))
logger.error("Process returned {}".format(e.returncode))
logger.error("process output: {}".format(e.output))
raise
def transport_to_port(transport):
"""
Given a transport, return the port that the transport uses.
"""
if transport == "mqtt":
return mqtt_port
elif transport == "mqttws":
return mqttws_port
else:
raise ValueError(
"transport_type {} invalid. Only mqtt and mqttws are accepted".format(transport)
)
def disconnect_port(disconnect_type, transport):
"""
Disconnect the port for a given transport. disconnect_type can either be "DROP" to drop
packets sent to that port, or it can be "REJECT" to reject packets sent to that port.
"""
# sudo -n iptables -A OUTPUT -p tcp --dport 8883 -j DROP
port = transport_to_port(transport)
run_shell_command(
"{}iptables -A OUTPUT -p tcp --dport {} -j {}".format(
get_sudo_prefix(), port, disconnect_type
)
)
def reconnect_all(transport):
"""
Reconnect all disconnects for all ports used by all transports. Effectively, clean up
anyting that this module may have done.
"""
port = transport_to_port(transport)
for disconnect_type in all_disconnect_types:
# sudo -n iptables -L OUTPUT -n -v --line-numbers
lines = run_shell_command(
"{}iptables -L OUTPUT -n -v --line-numbers".format(get_sudo_prefix())
)
# do the lines in reverse because deleting an entry changes the line numbers of all entries after that.
lines.reverse()
for line in lines:
if disconnect_type in line and str(port) in line:
line_number = line.split(" ")[0]
logger.info("Removing {} from [{}]".format(line_number, line))
# sudo -n iptables -D OUTPUT 1
run_shell_command("{}iptables -D OUTPUT {}".format(get_sudo_prefix(), line_number))

Просмотреть файл

@ -0,0 +1,32 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import drop
import logging
logger = logging.getLogger(__name__)
drop.reconnect_all("mqtt")
class Dropper(object):
def __init__(self, transport):
self.transport = transport
def drop_outgoing(self):
drop.disconnect_port("DROP", self.transport)
def reject_outgoing(self):
drop.disconnect_port("REJECT", self.transport)
def restore_all(self):
drop.reconnect_all(self.transport)
@pytest.fixture(scope="function")
def dropper(transport):
dropper = Dropper(transport)
yield dropper
logger.info("restoring all")
dropper.restore_all()

Просмотреть файл

@ -0,0 +1,51 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import six
import os
import json
if six.PY2:
FileNotFoundError = IOError
secrets = None
this_file_path = os.path.dirname(os.path.realpath(__file__))
test_path = this_file_path
while secrets is None:
filename = os.path.join(test_path, "_e2e_settings.json")
try:
with open(filename, "r") as f:
secrets = json.load(f)
print("settings loaded from {}".format(filename))
except FileNotFoundError:
new_test_path = os.path.dirname(test_path)
if new_test_path == test_path:
raise Exception("_e2e_settings.json not found in {} or parent".format(this_file_path))
test_path = new_test_path
# Device ID used when running tests
DEVICE_ID = secrets.get("deviceId", None)
# Connection string for the iothub instance
IOTHUB_CONNECTION_STRING = secrets.get("iothubConnectionString", None)
# Connection string for the eventhub instance
EVENTHUB_CONNECTION_STRING = secrets.get("eventhubConnectionString", None)
# Consumer group used when monitoring eventhub events
EVENTHUB_CONSUMER_GROUP = secrets.get("eventhubConsumerGroup", None)
# Name of iothub. Probably DNS name for the hub without the azure-devices.net suffix
IOTHUB_NAME = secrets.get("iothubName", None)
# Connection string for device under test
DEVICE_CONNECTION_STRING = secrets.get("deviceConnectionString", None)
# Set default values
if not EVENTHUB_CONSUMER_GROUP:
EVENTHUB_CONSUMER_GROUP = "$default"
del secrets
del this_file_path
del test_path

Просмотреть файл

@ -0,0 +1,44 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
@pytest.fixture(scope="session")
def pnp_model_id():
return "dtmi:com:example:TemperatureController;2"
@pytest.fixture
def pnp_command_name():
return "this_is_my_command_name"
@pytest.fixture
def pnp_component_name():
return "this_is_my_component_name"
@pytest.fixture
def pnp_command_response_status():
return 299
@pytest.fixture
def pnp_writable_property_name():
return "writable_property_2"
@pytest.fixture
def pnp_read_only_property_name():
return "read_only_property"
@pytest.fixture
def pnp_ack_code():
return 266
@pytest.fixture
def pnp_ack_description():
return "this is an ack description"

11
device_e2e/pytest.ini Normal file
Просмотреть файл

@ -0,0 +1,11 @@
[pytest]
timeout=180
testdox_format=plaintext
addopts=
--testdox
--strict-markers
-m "not dropped_connection and not pnp"
norecursedirs=__pycache__, *.egg-info
markers=
pnp: includes tests for PNP functions
dropped_connection: includes tests that simplate dropped network connections (much slower)

Просмотреть файл

@ -0,0 +1,124 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
from service_helper_sync import ServiceHelperSync
class ServiceHelper:
def __init__(self, event_loop, executor):
self.event_loop = event_loop
self.executor = executor
self.inner_object = ServiceHelperSync()
@property
def device_id(self):
return self.inner_object.device_id
@property
def module_id(self):
return self.inner_object.module_id
def start_watching(self, device_id, module_id):
return self.inner_object.start_watching(device_id, module_id)
def stop_watching(self, device_id, module_id):
return self.inner_object.stop_watching(device_id, module_id)
async def get_next_incoming_event(self, device_id, module_id, block=True, timeout=None):
return await self.event_loop.run_in_executor(
self.executor,
self.inner_object.get_next_incoming_event,
device_id,
module_id,
block,
timeout,
)
async def set_desired_properties(self, device_id, module_id, desired_props):
return await self.event_loop.run_in_executor(
self.executor,
self.inner_object.set_desired_properties,
device_id,
module_id,
desired_props,
)
async def invoke_method(
self,
device_id,
module_id,
method_name,
payload,
connect_timeout_in_seconds=None,
response_timeout_in_seconds=None,
):
return await self.event_loop.run_in_executor(
self.executor,
self.inner_object.invoke_method,
device_id,
module_id,
method_name,
payload,
connect_timeout_in_seconds,
response_timeout_in_seconds,
)
async def invoke_pnp_command(
self,
device_id,
module_id,
component_name,
command_name,
payload,
connect_timeout_in_seconds=None,
response_timeout_in_seconds=None,
):
return await self.event_loop.run_in_executor(
self.executor,
self.inner_object.invoke_pnp_command,
device_id,
module_id,
component_name,
command_name,
payload,
connect_timeout_in_seconds,
response_timeout_in_seconds,
)
async def get_pnp_properties(self, device_id, module_id):
return await self.event_loop.run_in_executor(
self.executor, self.inner_object.get_pnp_properties, device_id, module_id
)
async def update_pnp_properties(self, device_id, module_id, properties):
return await self.event_loop.run_in_executor(
self.executor, self.inner_object.update_pnp_properties, device_id, module_id, properties
)
async def send_c2d(self, device_id, module_id, payload, properties):
return await self.event_loop.run_in_executor(
self.executor, self.inner_object.send_c2d, device_id, module_id, payload, properties
)
async def get_next_eventhub_arrival(self, device_id, module_id, block=True, timeout=None):
return await self.event_loop.run_in_executor(
self.executor,
self.inner_object.get_next_eventhub_arrival,
device_id,
module_id,
block,
timeout,
)
async def get_next_reported_patch_arrival(self, device_id, module_id, block=True, timeout=None):
return await self.event_loop.run_in_executor(
self.executor,
self.inner_object.get_next_reported_patch_arrival,
device_id,
module_id,
block,
timeout,
)
async def shutdown(self):
return await self.event_loop.run_in_executor(self.executor, self.inner_object.shutdown)

Просмотреть файл

@ -0,0 +1,319 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import logging
import threading
from six.moves import queue
import copy
from concurrent.futures import ThreadPoolExecutor
from azure.iot.hub import IoTHubRegistryManager, DigitalTwinClient
from azure.iot.hub.protocol.models import Twin, TwinProperties, CloudToDeviceMethod
from azure.eventhub import EventHubConsumerClient
import e2e_settings
logger = logging.getLogger("e2e.{}".format(__name__))
iothub_connection_string = e2e_settings.IOTHUB_CONNECTION_STRING
iothub_name = e2e_settings.IOTHUB_NAME
eventhub_connection_string = e2e_settings.EVENTHUB_CONNECTION_STRING
eventhub_consumer_group = e2e_settings.EVENTHUB_CONSUMER_GROUP
assert iothub_connection_string
assert iothub_name
assert eventhub_connection_string
assert eventhub_consumer_group
def convert_binary_dict_to_string_dict(src):
def binary_to_string(x):
if isinstance(x, bytes):
return x.decode("utf-8")
else:
return x
if src:
dest = {}
for key, value in src.items():
dest[binary_to_string(key)] = binary_to_string(value)
return dest
else:
return src
def get_device_id_from_event(event):
"""
Helper function to get the device_id from an EventHub message
"""
return event.message.annotations["iothub-connection-device-id".encode()].decode()
def get_message_source_from_event(event):
"""
Helper function to get the message source from an EventHub message
"""
return event.message.annotations["iothub-message-source".encode()].decode()
class C2dMessage(object):
def __init__(self):
self.device_id = None
self.module_id = None
self.message_body = None
self.content_type = None
self.system_properties = None
self.properties = None
class PerClientData(object):
"""
Object that holds data that needs to be stored in a device-by-device basis
"""
def __init__(self, device_id, module_id):
self.device_id = device_id
self.module_id = module_id
self.incoming_event_queue = queue.Queue()
self.incoming_patch_queue = queue.Queue()
class ClientList(object):
"""
Thread-safe object for holding a dictionary of PerDeviceData objects.
"""
def __init__(self):
self.lock = threading.Lock()
self.values = {}
def add(self, device_id, module_id, value):
"""
Add a new object to the dict
"""
key = self.get_key(device_id, module_id)
with self.lock:
self.values[key] = value
def remove(self, device_id, module_id):
"""
remove an object from the dict
"""
key = self.get_key(device_id, module_id)
with self.lock:
if key in self.values:
del self.values[key]
def try_get(self, device_id, module_id):
"""
Try to get an object from the dict, returning None if the object doesn't exist
"""
key = self.get_key(device_id, module_id)
with self.lock:
return self.values.get(key, None)
def get_or_create(self, device_id, module_id):
key = self.get_key(device_id, module_id)
with self.lock:
if key in self.values:
return self.values.get(key)
else:
value = PerClientData(device_id, module_id)
self.values[key] = value
return value
def get_key(self, device_id, module_id):
return "{}%{}".format(device_id, module_id)
def get_keys(self):
"""
Get a list of keys for the objects in the dict
"""
with self.lock:
return list(self.values.keys())
def get_incoming_event_queue(self, device_id, module_id):
client_data = self.try_get(device_id, module_id)
if client_data:
return client_data.incoming_event_queue
def get_incoming_patch_queue(self, device_id, module_id):
client_data = self.try_get(device_id, module_id)
if client_data:
return client_data.incoming_patch_queue
class ServiceHelperSync(object):
def __init__(self):
self._client_list = ClientList()
self._executor = ThreadPoolExecutor()
self._registry_manager = IoTHubRegistryManager(iothub_connection_string)
self._digital_twin_client = DigitalTwinClient.from_connection_string(
iothub_connection_string
)
self._eventhub_consumer_client = EventHubConsumerClient.from_connection_string(
eventhub_connection_string, consumer_group=eventhub_consumer_group
)
self._eventhub_future = self._executor.submit(self._eventhub_thread)
def start_watching(self, device_id, module_id):
self._client_list.get_or_create(device_id, module_id)
def stop_watching(self, device_id, module_id):
self._client_list.remove(device_id, module_id)
def get_next_incoming_event(self, device_id, module_id, block=True, timeout=None):
return self._client_list.get_incoming_event_queue.get(block=block, timeout=timeout)
def set_desired_properties(self, device_id, module_id, desired_props):
if module_id:
self._registry_manager.update_module_twin(
device_id, module_id, Twin(properties=TwinProperties(desired=desired_props)), "*"
)
else:
self._registry_manager.update_twin(
device_id, Twin(properties=TwinProperties(desired=desired_props)), "*"
)
def invoke_method(
self,
device_id,
module_id,
method_name,
payload,
connect_timeout_in_seconds=None,
response_timeout_in_seconds=None,
):
request = CloudToDeviceMethod(
method_name=method_name,
payload=payload,
response_timeout_in_seconds=response_timeout_in_seconds,
connect_timeout_in_seconds=connect_timeout_in_seconds,
)
if module_id:
response = self._registry_manager.invoke_device_module_method(
device_id, module_id, request
)
else:
response = self._registry_manager.invoke_device_method(device_id, request)
return response
def invoke_pnp_command(
self,
device_id,
module_id,
component_name,
command_name,
payload,
connect_timeout_in_seconds=None,
response_timeout_in_seconds=None,
):
assert not module_id # TODO
if component_name:
return self._digital_twin_client.invoke_component_command(
device_id,
component_name,
command_name,
payload,
connect_timeout_in_seconds,
response_timeout_in_seconds,
)
else:
return self._digital_twin_client.invoke_command(
device_id,
command_name,
payload,
connect_timeout_in_seconds,
response_timeout_in_seconds,
)
def get_pnp_properties(self, device_id, module_id):
assert not module_id # TODO
return self._digital_twin_client.get_digital_twin(device_id)
def update_pnp_properties(self, device_id, module_id, properties):
assert not module_id # TODO
return self._digital_twin_client.update_digital_twin(device_id, properties)
def send_c2d(self, device_id, module_id, payload, properties):
assert not module_id # TODO
self._registry_manager.send_c2d_message(device_id, payload, properties)
def get_next_eventhub_arrival(self, device_id, module_id, block=True, timeout=None):
return self._client_list.get_incoming_event_queue(device_id, module_id).get(
block=block, timeout=timeout
)
def get_next_reported_patch_arrival(self, device_id, module_id, block=True, timeout=None):
return self._client_list.get_incoming_patch_queue(device_id, module_id).get(
block=block, timeout=timeout
)
def shutdown(self):
if self._eventhub_consumer_client:
self._eventhub_consumer_client.close()
def _convert_incoming_event(self, event):
event_body = event.body_as_json()
device_id = get_device_id_from_event(event)
module_id = None # TODO: extract module_id
if get_message_source_from_event(event) == "twinChangeEvents":
return copy.deepcopy(event_body.get("properties", {}))
else:
message = C2dMessage()
message.device_id = device_id
message.module_id = module_id
message.message_body = event_body
message.content_type = event.message.properties.content_type.decode("utf-8")
message.system_properties = convert_binary_dict_to_string_dict(event.system_properties)
message.properties = convert_binary_dict_to_string_dict(event.properties)
return message
def _eventhub_thread(self):
def on_error(partition_context, error):
logger.error("EventHub on_error: {}".format(str(error) or type(error)))
def on_partition_initialize(partition_context):
logger.warning("EventHub on_partition_initialize")
def on_partition_close(partition_context, reason):
# commented out because it causes ugly warning spew on shutdown
# logger.warning("EventHub on_partition_close: {}".format(reason))
pass
def on_event(partition_context, event):
if event:
device_id = get_device_id_from_event(event)
module_id = None # TODO: extract module_id
if get_message_source_from_event(event) == "twinChangeEvents":
queue = self._client_list.get_incoming_patch_queue(device_id, module_id)
else:
queue = self._client_list.get_incoming_event_queue(device_id, module_id)
if queue:
logger.info(
"Received {} for device {}, module {}".format(
get_message_source_from_event(event), device_id, module_id
)
)
queue.put(self._convert_incoming_event(event))
try:
with self._eventhub_consumer_client:
logger.info("Starting EventHub receive")
self._eventhub_consumer_client.receive(
on_event,
on_error=on_error,
on_partition_initialize=on_partition_initialize,
on_partition_close=on_partition_close,
max_wait_time=3600,
)
except Exception:
logger.error("_eventhub_thread exception", exc_info=True)
raise

Просмотреть файл

@ -0,0 +1,73 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import functools
import time
import e2e_settings
import test_config
import logging
from service_helper_sync import ServiceHelperSync
from azure.iot.device.iothub import IoTHubDeviceClient, IoTHubModuleClient
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
@pytest.fixture(scope="function")
def brand_new_client(client_kwargs):
client = None
if test_config.config.identity == test_config.IDENTITY_DEVICE_CLIENT:
ClientClass = IoTHubDeviceClient
elif test_config.config.identity == test_config.IDENTITY_MODULE_CLIENT:
ClientClass = IoTHubModuleClient
else:
raise Exception("config.identity invalid")
if test_config.config.auth == test_config.AUTH_CONNECTION_STRING:
# TODO: This is currently using a connection string stored in _e2e_settings.xml. This will move to be a dynamically created identity similar to the way node's device_identity_helper.js works.
logger.info(
"Creating {} using create_from_connection_string with kwargs={}".format(
ClientClass, client_kwargs
)
)
client = ClientClass.create_from_connection_string(
e2e_settings.DEVICE_CONNECTION_STRING, **client_kwargs
)
elif test_config.config.auth == test_config.X509:
# need to implement
raise Exception("X509 Auth not yet implemented")
else:
raise Exception("config.auth invalid")
yield client
client.shutdown()
@pytest.fixture(scope="function")
def client(brand_new_client):
client = brand_new_client
client.connect()
yield client
@pytest.fixture(scope="module")
def service_helper():
service_helper = ServiceHelperSync()
time.sleep(1)
yield service_helper
service_helper.shutdown()
@pytest.fixture(scope="function")
def get_next_eventhub_arrival(service_helper, device_id, module_id, watches_events):
yield functools.partial(service_helper.get_next_eventhub_arrival, device_id, module_id)
@pytest.fixture(scope="function")
def get_next_reported_patch_arrival(executor, service_helper, device_id, module_id, watches_events):
yield functools.partial(service_helper.get_next_reported_patch_arrival, device_id, module_id)

Просмотреть файл

@ -0,0 +1,39 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import logging
import json
import threading
from utils import get_random_dict
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
# TODO: add tests for various application properties
# TODO: is there a way to call send_c2d so it arrives as an object rather than a JSON string?
@pytest.mark.describe("Device Client C2d")
class TestSendMessage(object):
@pytest.mark.it("Can receive C2D")
def test_send_message(self, client, service_helper, device_id, module_id):
message = json.dumps(get_random_dict())
received = threading.Event()
# hack needed because there is no `nonlocal` keyword in py27.
nonlocal_py27_hack = {"received_msg": None, "received": received}
def handle_on_message_received(message):
logger.info("received {}".format(message))
nonlocal_py27_hack["received_message"] = message
nonlocal_py27_hack["received"].set()
client.on_message_received = handle_on_message_received
service_helper.send_c2d(device_id, module_id, message, {})
received.wait(timeout=10)
assert nonlocal_py27_hack["received_message"].data.decode("utf-8") == message

Просмотреть файл

@ -0,0 +1,56 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import logging
import time
import test_config
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
@pytest.mark.describe("Device Client")
class TestConnectDisconnect(object):
@pytest.mark.it("Can disconnect and reconnect")
@pytest.mark.parametrize(*test_config.connection_retry_disabled_and_enabled)
@pytest.mark.parametrize(*test_config.auto_connect_off_and_on)
def test_connect_disconnect(self, brand_new_client):
client = brand_new_client
client.connect()
assert client.connected
client.disconnect()
assert not client.connected
client.connect()
assert client.connected
@pytest.mark.dropped_connection
@pytest.mark.describe("Device Client with dropped connection")
class TestConnectDisconnectDroppedConnection(object):
@pytest.fixture(scope="class")
def extra_client_kwargs(self):
return {"keep_alive": 5}
@pytest.mark.it("disconnects when network drops all outgoing packets")
def test_disconnect_on_drop_outgoing(self, client, dropper):
client.connect()
assert client.connected
dropper.drop_outgoing()
while client.connected:
time.sleep(1)
@pytest.mark.it("disconnects when network rejects all outgoing packets")
def test_disconnect_on_reject_outgoing(self, client, dropper):
client.connect()
assert client.connected
dropper.reject_outgoing()
while client.connected:
time.sleep(1)

Просмотреть файл

@ -0,0 +1,94 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import logging
import time
from utils import get_random_dict
from azure.iot.device.iothub import MethodResponse
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
@pytest.fixture
def method_name():
return "this_is_my_method_name"
@pytest.fixture
def method_response_status():
return 299
@pytest.mark.describe("Device Client methods")
class TestMethods(object):
@pytest.mark.it("Can handle a simple direct method call")
@pytest.mark.parametrize(
"include_request_payload",
[
pytest.param(True, id="with request payload"),
pytest.param(False, id="without request payload"),
],
)
@pytest.mark.parametrize(
"include_response_payload",
[
pytest.param(True, id="with response payload"),
pytest.param(False, id="without response payload"),
],
)
def test_handle_method_call(
self,
client,
method_name,
device_id,
module_id,
method_response_status,
include_request_payload,
include_response_payload,
service_helper,
):
if include_request_payload:
request_payload = get_random_dict()
else:
request_payload = None
if include_response_payload:
response_payload = get_random_dict()
else:
response_payload = None
# hack needed because there is no `nonlocal` keyword in py27.
nonlocal_py27_hack = {"actual_request": None}
def handle_on_method_request_received(request):
logger.info("Method request for {} received".format(request.name))
nonlocal_py27_hack["actual_request"] = request
logger.info("Sending response")
client.send_method_response(
MethodResponse.create_from_method_request(
request, method_response_status, response_payload
)
)
client.on_method_request_received = handle_on_method_request_received
time.sleep(1) # wait for subscribe, etc, to complete
# invoke the method call
method_response = service_helper.invoke_method(
device_id, module_id, method_name, request_payload
)
# verify that the method request arrived correctly
actual_request = nonlocal_py27_hack["actual_request"]
assert actual_request.name == method_name
if request_payload:
assert actual_request.payload == request_payload
else:
assert not actual_request.payload
# and make sure the response came back successfully
assert method_response.status == method_response_status
assert method_response.payload == response_payload

Просмотреть файл

@ -0,0 +1,76 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import json
import logging
import threading
import test_config
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
@pytest.mark.describe("Client sas renewal code")
class TestSasRenewalReconnectEnabled(object):
@pytest.fixture(scope="class")
def extra_client_kwargs(self):
# should renew after 10 seconds
return {"sastoken_ttl": 130}
@pytest.mark.it("Renews and reconnects before expiry")
@pytest.mark.parametrize(*test_config.connection_retry_disabled_and_enabled)
@pytest.mark.parametrize(*test_config.auto_connect_off_and_on)
def test_sas_renews(self, client, get_next_eventhub_arrival, random_message):
connected_event = threading.Event()
disconnected_event = threading.Event()
token_object = client._mqtt_pipeline._pipeline.pipeline_configuration.sastoken
# hack needed because there is no `nonlocal` keyword in py27.
nonlocal_py27_hack = {"token_at_connect_time": None}
def handle_on_connection_state_change():
logger.info("handle_on_connection_state_change: {}".format(client.connected))
if client.connected:
nonlocal_py27_hack["token_at_connect_time"] = str(token_object)
logger.info("saving token: {}".format(nonlocal_py27_hack["token_at_connect_time"]))
connected_event.set()
else:
disconnected_event.set()
client.on_connection_state_change = handle_on_connection_state_change
# setting on_connection_state_change seems to have the side effect of
# calling handle_on_connection_state_change once with the initial value.
# Wait for one disconnect/reconnect cycle so we can get past it.
connected_event.wait()
# OK, we're ready to test. wait for the renewal
token_before_connect = str(token_object)
disconnected_event.clear()
connected_event.clear()
logger.info("Waiting for client to disconnect")
disconnected_event.wait()
logger.info("Waiting for client to reconnect")
connected_event.wait()
logger.info("Client reconnected")
# Finally verify that our token changed.
token_at_connect_time = nonlocal_py27_hack["token_at_connect_time"]
logger.info("token now = {}".format(str(token_object)))
logger.info("token at_connect = {}".format(str(token_at_connect_time)))
logger.info("token before_connect = {}".format(str(token_before_connect)))
assert str(token_object) == token_at_connect_time
assert not token_before_connect == token_at_connect_time
# and verify that we can send
client.send_message(random_message)
# and verify that the message arrived at the service
event = get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data

Просмотреть файл

@ -0,0 +1,167 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import logging
import json
import time
from azure.iot.device.exceptions import OperationCancelled
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
@pytest.mark.describe("Device Client send_message method")
class TestSendMessage(object):
@pytest.mark.it("Can send a simple message")
def test_send_message(self, client, random_message, get_next_eventhub_arrival):
client.send_message(random_message)
event = get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data
@pytest.mark.it("Connects the transport if necessary")
def test_connect_if_necessary(self, client, random_message, get_next_eventhub_arrival):
client.disconnect()
assert not client.connected
client.send_message(random_message)
assert client.connected
event = get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data
@pytest.mark.dropped_connection
@pytest.mark.describe("Device Client send_message method with dropped connections")
class TestSendMessageDroppedConnection(object):
@pytest.fixture(scope="class")
def extra_client_kwargs(self):
return {"keep_alive": 5}
@pytest.mark.it("Sends if connection drops before sending")
def test_sends_if_drop_before_sending(
self, client, random_message, dropper, get_next_eventhub_arrival, executor
):
assert client.connected
dropper.drop_outgoing()
send_task = executor.submit(client.send_message, random_message)
while client.connected:
time.sleep(1)
assert not send_task.done()
dropper.restore_all()
while not client.connected:
time.sleep(1)
send_task.result()
event = get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data
@pytest.mark.it("Sends if connection rejects send")
def test_sends_if_reject_before_sending(
self, client, random_message, dropper, get_next_eventhub_arrival, executor
):
assert client.connected
dropper.reject_outgoing()
send_task = executor.submit(client.send_message, random_message)
while client.connected:
time.sleep(1)
assert not send_task.done()
dropper.restore_all()
while not client.connected:
time.sleep(1)
send_task.result()
event = get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data
@pytest.mark.describe("Device Client send_message with reconnect disabled")
class TestSendMessageRetryDisabled(object):
@pytest.fixture(scope="class")
def extra_client_kwargs(self):
return {"keep_alive": 5, "connection_retry": False}
@pytest.fixture(scope="function", autouse=True)
def reconnect_after_test(self, dropper, client):
yield
dropper.restore_all()
client.connect()
assert client.connected
@pytest.mark.it("Can send a simple message")
def test_send_message(self, client, random_message, get_next_eventhub_arrival):
client.send_message(random_message)
event = get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data
@pytest.mark.it("Automatically connects if transport manually disconnected before sending")
def test_connect_if_necessary(self, client, random_message, get_next_eventhub_arrival):
client.disconnect()
assert not client.connected
client.send_message(random_message)
assert client.connected
event = get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data
@pytest.mark.it("Automatically connects if transport automatically disconnected before sending")
def test_connects_after_automatic_disconnect(
self, client, random_message, dropper, get_next_eventhub_arrival
):
assert client.connected
dropper.drop_outgoing()
while client.connected:
time.sleep(1)
assert not client.connected
dropper.restore_all()
client.send_message(random_message)
assert client.connected
event = get_next_eventhub_arrival()
assert json.dumps(event.message_body) == random_message.data
@pytest.mark.it("Fails if connection disconnects before sending")
def test_fails_if_disconnect_before_sending(self, client, random_message, dropper, executor):
assert client.connected
dropper.drop_outgoing()
send_task = executor.submit(client.send_message, random_message)
while client.connected:
time.sleep(1)
with pytest.raises(OperationCancelled):
send_task.result()
@pytest.mark.it("Fails if connection drops before sending")
def test_fails_if_drop_before_sending(self, client, random_message, dropper):
assert client.connected
dropper.drop_outgoing()
with pytest.raises(OperationCancelled):
client.send_message(random_message)
assert not client.connected

Просмотреть файл

@ -0,0 +1,171 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import logging
import time
import threading
import const
from utils import get_random_dict
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
# TODO: tests with drop_incoming and reject_incoming
reset_reported_props = {const.TEST_CONTENT: None}
@pytest.mark.describe("Device Client Reported Properties")
class TestReportedProperties(object):
@pytest.mark.it("Can set a simple reported property")
def test_simple_patch(self, client, reported_props, get_next_reported_patch_arrival):
# patch properties
client.patch_twin_reported_properties(reported_props)
# wait for patch to arrive at service and verify
received_patch = get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
)
# get twin from the service and verify content
twin = client.get_twin()
assert twin[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
@pytest.mark.it("Can clear a reported property")
def test_clear_property(self, client, reported_props, get_next_reported_patch_arrival):
# patch properties and verify that the service received the patch
client.patch_twin_reported_properties(reported_props)
received_patch = get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
)
# send a patch clearing properties and verify that the service received that patch
client.patch_twin_reported_properties(reset_reported_props)
received_patch = get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT]
== reset_reported_props[const.TEST_CONTENT]
)
# get the twin and verify that the properties are no longer part of the twin
twin = client.get_twin()
assert const.TEST_CONTENT not in twin[const.REPORTED]
@pytest.mark.it("Connects the transport if necessary")
def test_connect_if_necessary(self, client, reported_props, get_next_reported_patch_arrival):
client.disconnect()
assert not client.connected
client.patch_twin_reported_properties(reported_props)
assert client.connected
received_patch = get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
)
twin = client.get_twin()
assert twin[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
@pytest.mark.dropped_connection
@pytest.mark.describe("Device Client Reported Properties with dropped connection")
class TestReportedPropertiesDroppedConnection(object):
@pytest.fixture(scope="class")
def extra_client_kwargs(self):
return {"keep_alive": 5}
# TODO: split drop tests between first and second patches
@pytest.mark.it("Sends if connection drops before sending")
def test_sends_if_drop_before_sending(
self, client, reported_props, dropper, get_next_reported_patch_arrival, executor
):
assert client.connected
dropper.drop_outgoing()
send_task = executor.submit(client.patch_twin_reported_properties, reported_props)
while client.connected:
time.sleep(1)
assert not send_task.done()
dropper.restore_all()
while not client.connected:
time.sleep(1)
send_task.result()
received_patch = get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
)
@pytest.mark.it("Sends if connection rejects send")
def test_sends_if_reject_before_sending(
self, client, reported_props, dropper, get_next_reported_patch_arrival, executor
):
assert client.connected
dropper.reject_outgoing()
send_task = executor.submit(client.patch_twin_reported_properties, reported_props)
while client.connected:
time.sleep(1)
assert not send_task.done()
dropper.restore_all()
while not client.connected:
time.sleep(1)
send_task.result()
received_patch = get_next_reported_patch_arrival()
assert (
received_patch[const.REPORTED][const.TEST_CONTENT] == reported_props[const.TEST_CONTENT]
)
@pytest.mark.describe("Device Client Desired Properties")
class TestDesiredProperties(object):
@pytest.mark.it("Receives a patch for a simple desired property")
def test_simple_patch(self, client, service_helper, device_id, module_id):
received = threading.Event()
# hack needed because there is no `nonlocal` keyword in py27.
nonlocal_py27_hack = {"received_patch": None, "received": received}
def handle_on_patch_received(patch):
print("received {}".format(patch))
nonlocal_py27_hack["received_patch"] = patch
nonlocal_py27_hack["received"].set()
client.on_twin_desired_properties_patch_received = handle_on_patch_received
random_dict = get_random_dict()
service_helper.set_desired_properties(
device_id,
module_id,
{const.TEST_CONTENT: random_dict},
)
received.wait(timeout=10)
logger.info("got it")
assert nonlocal_py27_hack["received_patch"][const.TEST_CONTENT] == random_dict
twin = client.get_twin()
assert twin[const.DESIRED][const.TEST_CONTENT] == random_dict
# TODO: etag tests, version tests

42
device_e2e/test_config.py Normal file
Просмотреть файл

@ -0,0 +1,42 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
TRANSPORT_MQTT = "mqtt"
TRANSPORT_MQTT_WS = "mqttws"
TRANSPORT_CHOICES = [TRANSPORT_MQTT, TRANSPORT_MQTT_WS]
IDENTITY_DEVICE_CLIENT = "deviceclient"
IDENTITY_MODULE_CLIENT = "module_client"
IDENTITY_CHOICES = [IDENTITY_DEVICE_CLIENT, IDENTITY_MODULE_CLIENT]
AUTH_CONNECTION_STRING = "connection_string"
AUTH_X509 = "x509"
AUTH_CHOICES = [AUTH_CONNECTION_STRING, AUTH_X509]
class Config(object):
def __init__(self):
self.transport = TRANSPORT_MQTT
self.identity = IDENTITY_DEVICE_CLIENT
self.auth = AUTH_CONNECTION_STRING
config = Config()
connection_retry_disabled_and_enabled = [
"connection_retry",
[
pytest.param(True, id="connection_retry enabled"),
pytest.param(False, id="connection_retry disabled"),
],
]
auto_connect_off_and_on = [
"auto_connect",
[
pytest.param(True, id="auto_connect enabled"),
pytest.param(False, id="auto_connect disabled"),
],
]

42
device_e2e/utils.py Normal file
Просмотреть файл

@ -0,0 +1,42 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import random
import string
import json
import uuid
import const
from azure.iot.device.iothub import Message
def get_random_dict():
return {
"random_guid": str(uuid.uuid4()),
"sub_object": {
"string_value": "".join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(10)
),
"bool_value": random.random() > 0.5,
"int_value": random.randint(-65535, 65535),
},
}
def get_random_message():
message = Message(json.dumps(get_random_dict()))
message.content_type = const.JSON_CONTENT_TYPE
message.content_encoding = const.JSON_CONTENT_ENCODING
return message
def make_pnp_desired_property_patch(component_name, property_name, property_value):
if component_name:
return [
{
"op": "add",
"path": "/{}".format(component_name),
"value": {property_name: property_value, "$metadata": {}},
}
]
else:
return [{"op": "add", "path": "/{}".format(property_name), "value": property_value}]