зеркало из
1
0
Форкнуть 0

Test: Tests for gate and for fuzzing (#1025)

* Tests for gate and for fuzzing

* Regression test for GitHub issue #990

* update 990 regression app to include more scenarios

* move files and add documentation

* install test utilities for e2e tests

* add test_utils for test-only config

* move iptables to test_utils

* roll back change in iptables logging

* code review feedback

* fix spelling mistakes
This commit is contained in:
Bert Kleewein 2022-08-16 09:15:27 -07:00 коммит произвёл GitHub
Родитель 60535b5554
Коммит c91cb47f83
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
34 изменённых файлов: 1411 добавлений и 229 удалений

Просмотреть файл

@ -0,0 +1,7 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
from .random_content import get_random_dict, get_random_message, get_random_string # noqa: F401
from .service_helper import ServiceHelper # noqa: F401
from .service_helper_sync import ServiceHelperSync # noqa: F401

Просмотреть файл

@ -4,7 +4,7 @@
import logging
import subprocess
import socket
from utils import is_windows
import sys
logger = logging.getLogger("e2e.{}".format(__name__))
@ -84,7 +84,7 @@ def reconnect_all(transport, host):
Reconnect all disconnects for this host and transport. Effectively, clean up
anything that this module may have done.
"""
if not is_windows():
if not sys.platform.startswith("win"):
ip = get_ip(host)
port = transport_to_port(transport)
for disconnect_type in all_disconnect_types:

Просмотреть файл

@ -0,0 +1,205 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import functools
import inspect
import threading
# List of Paho functions to add logging to. The boolean value controls whether
# call arguments are included in the log line (False redacts them — see
# username_pw_set, which receives credentials).
PAHO_FUNCTIONS_TO_HOOK = {
    "connect": True,
    "disconnect": True,
    "enable_logger": True,
    "loop_start": True,
    "loop_stop": True,
    "on_connect": True,
    "on_disconnect": True,
    "on_message": True,
    "on_publish": True,
    "on_subscribe": True,
    "on_unsubscribe": True,
    "proxy_set": True,
    "publish": True,
    "reconnect_delay_set": True,
    "subscribe": True,
    "tls_set_context": True,
    "unsubscribe": True,
    "username_pw_set": False,
    "ws_set_options": True,
}

# List of device/module client functions to add logging to (all with arguments)
DEVICE_CLIENT_FUNCTIONS_TO_HOOK = {
    "shutdown": True,
    "connect": True,
    "disconnect": True,
    "update_sastoken": True,
    "send_message": True,
    "receive_method_request": True,
    "send_method_response": True,
    "get_twin": True,
    "patch_twin_reported_properties": True,
}

# lock for synchronizing multithreaded access to call_index and indent_count
global_lock = threading.Lock()

# running count of calls that are being logged. Included with the log output
# so readers can match calls and returns
call_index = 0

# count of indent levels for calls. Used to indent logs so calls and returns
# can be visually matched.
indent_count = 0
def _get_next_call_index():
    """
    Return the next call index. Every logged call gets a unique, increasing
    index so a call can be correlated with its return in the log output.
    """
    global call_index
    with global_lock:
        call_index += 1
        return call_index
def _indent():
    """
    Increase the indent level by one and return the whitespace prefix for
    logging at the new level.
    """
    global indent_count
    with global_lock:
        indent_count += 1
        return indent_count * " "
def _unindent():
    """
    Return the whitespace prefix for the current indent level, then decrease
    the level by one.
    """
    global indent_count
    with global_lock:
        prefix = " " * indent_count
        indent_count -= 1
        return prefix
def add_logging_hook(obj, func_name, log_func, module_name, log_args=True):
    """
    Replace `obj.<func_name>` with a wrapper that logs the call, its return
    value, and any raised exception, then delegates to the original.

    :param obj: Object whose attribute gets wrapped in place.
    :param func_name: Name of the function/coroutine attribute to wrap.
    :param log_func: Callable that accepts a single log string (e.g. print).
    :param module_name: Label included in every log line.
    :param log_args: If False, call arguments are logged as <REDACTED>
        (used for functions that receive secrets).
    """

    def log_call(index, args, kwargs):
        # Log the start of a call. _indent() bumps the indent level, which
        # the matching log_return/log_exception will undo.
        if log_args:
            log_func(
                "{indent}{module_name}-{index}: calling {func_name} with {args}, {kwargs}".format(
                    indent=_indent(),
                    module_name=module_name,
                    index=index,
                    func_name=func_name,
                    args=args,
                    kwargs=kwargs,
                )
            )
        else:
            log_func(
                "{indent}{module_name}-{index}: calling {func_name} with <REDACTED>".format(
                    indent=_indent(), module_name=module_name, index=index, func_name=func_name
                )
            )

    def log_return(index, ret):
        # Log a normal return; _unindent() pops the level pushed by log_call.
        log_func(
            "{indent}{module_name}-{index}: {func_name} returned {ret}".format(
                indent=_unindent(),
                module_name=module_name,
                index=index,
                func_name=func_name,
                ret=ret,
            )
        )

    def log_exception(index, e):
        # Log an exceptional exit; also pops the indent level.
        log_func(
            "{indent}{module_name}-{index}: {func_name} RAISED {exc}".format(
                indent=_unindent(),
                module_name=module_name,
                index=index,
                func_name=func_name,
                # str(e) can be empty (e.g. Exception()), fall back to the type
                exc=str(e) or type(e),
            )
        )

    func_or_coro = getattr(obj, func_name)

    # Choose an async or sync wrapper to match the wrapped attribute, so
    # awaiting semantics are preserved.
    if (
        inspect.isawaitable(func_or_coro)
        or inspect.iscoroutine(func_or_coro)
        or inspect.iscoroutinefunction(func_or_coro)
    ):

        @functools.wraps(func_or_coro)
        async def coro_wrapper(*args, **kwargs):
            index = _get_next_call_index()
            log_call(index, args, kwargs)
            try:
                ret = await func_or_coro(*args, **kwargs)
            except Exception as e:
                log_exception(index, e)
                raise
            else:
                log_return(index, ret)
                return ret

        setattr(obj, func_name, coro_wrapper)
    else:

        @functools.wraps(func_or_coro)
        def func_wrapper(*args, **kwargs):
            index = _get_next_call_index()
            log_call(index, args, kwargs)
            try:
                ret = func_or_coro(*args, **kwargs)
            except Exception as e:
                log_exception(index, e)
                raise
            else:
                log_return(index, ret)
                return ret

        setattr(obj, func_name, func_wrapper)
def get_paho_from_device_client(device_client):
    """
    Dig the inner paho mqtt client out of a device/module client by walking
    the client's internal pipeline down to its last stage, which owns the
    transport. Relies on private attributes — test code only.
    """
    leaf = device_client._mqtt_pipeline._pipeline
    while leaf.next:
        leaf = leaf.next
    return leaf.transport._mqtt_client
def hook_device_client(device_client, log_func=print):
    """
    Add logging to the given device client object: hooks both the inner paho
    client and the device client's own public operations.
    """
    paho = get_paho_from_device_client(device_client)
    for func_name, log_args in PAHO_FUNCTIONS_TO_HOOK.items():
        add_logging_hook(
            obj=paho,
            func_name=func_name,
            log_func=log_func,
            module_name="Paho",
            log_args=log_args,
        )
    for func_name, log_args in DEVICE_CLIENT_FUNCTIONS_TO_HOOK.items():
        add_logging_hook(
            obj=device_client,
            func_name=func_name,
            log_func=log_func,
            module_name="device_client",
            log_args=log_args,
        )

Просмотреть файл

@ -0,0 +1,47 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import random
import string
import json
import uuid
from azure.iot.device.iothub import Message
# Content type/encoding stamped on every message built by get_random_message
JSON_CONTENT_TYPE = "application/json"
JSON_CONTENT_ENCODING = "utf-8"
def get_random_string(length, random_length=False):
    """
    Return a random string of uppercase ASCII letters and digits.

    :param length: Length of the returned string; if random_length is True,
        this is the maximum length instead.
    :param random_length: If True, pick a random length between 0 and length.
    """
    if random_length:
        length = random.randint(0, length)
    alphabet = string.ascii_uppercase + string.digits
    chars = []
    for _ in range(length):
        chars.append(random.choice(alphabet))
    return "".join(chars)
def get_random_dict(total_payload_length=0):
    """
    Return a dict of random test content.

    If total_payload_length is nonzero, an "extra" field is added and sized so
    that the JSON serialization of the dict is exactly that many characters
    (asserts if the requested length can't be hit exactly).
    """
    obj = {
        "random_guid": str(uuid.uuid4()),
        "sub_object": {
            "string_value": get_random_string(10),
            "bool_value": random.random() > 0.5,
            "int_value": random.randint(-65535, 65535),
        },
    }
    if total_payload_length:
        # Account for the JSON overhead of the padding field itself.
        overhead = len(', "extra": ""')
        padding = total_payload_length - len(json.dumps(obj)) - overhead
        if padding > 0:
            obj["extra"] = get_random_string(padding)
        assert len(json.dumps(obj)) == total_payload_length
    return obj
def get_random_message(total_payload_length=0):
    """
    Return a Message with random JSON content, a random message_id, and
    content type/encoding set for JSON.
    """
    payload = json.dumps(get_random_dict(total_payload_length))
    message = Message(payload)
    message.message_id = str(uuid.uuid4())
    message.content_type = JSON_CONTENT_TYPE
    message.content_encoding = JSON_CONTENT_ENCODING
    return message

Просмотреть файл

@ -0,0 +1,76 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
from .service_helper_sync import ServiceHelperSync
import asyncio
import concurrent.futures
class ServiceHelper:
    """
    Async facade over ServiceHelperSync. Every blocking service operation is
    pushed onto an executor so callers can await it without blocking the
    event loop.
    """

    def __init__(
        self,
        iothub_connection_string,
        eventhub_connection_string,
        eventhub_consumer_group,
        event_loop=None,
        executor=None,
    ):
        self._event_loop = event_loop or asyncio.get_event_loop()
        self._executor = executor or concurrent.futures.ThreadPoolExecutor()
        self._inner_object = ServiceHelperSync(
            iothub_connection_string, eventhub_connection_string, eventhub_consumer_group
        )

    def _run(self, func, *args):
        # Run one blocking ServiceHelperSync call on the executor; returns an
        # awaitable future.
        return self._event_loop.run_in_executor(self._executor, func, *args)

    def set_identity(self, device_id, module_id):
        # Purely in-memory bookkeeping on the sync helper; no executor needed.
        return self._inner_object.set_identity(device_id, module_id)

    async def set_desired_properties(self, desired_props):
        return await self._run(self._inner_object.set_desired_properties, desired_props)

    async def invoke_method(
        self,
        method_name,
        payload,
        connect_timeout_in_seconds=None,
        response_timeout_in_seconds=None,
    ):
        return await self._run(
            self._inner_object.invoke_method,
            method_name,
            payload,
            connect_timeout_in_seconds,
            response_timeout_in_seconds,
        )

    async def send_c2d(
        self,
        payload,
        properties,
    ):
        return await self._run(self._inner_object.send_c2d, payload, properties)

    async def wait_for_eventhub_arrival(self, message_id, timeout=60):
        return await self._run(self._inner_object.wait_for_eventhub_arrival, message_id, timeout)

    async def get_next_reported_patch_arrival(self, block=True, timeout=20):
        return await self._run(
            self._inner_object.get_next_reported_patch_arrival, block, timeout
        )

    async def shutdown(self):
        return await self._run(self._inner_object.shutdown)

Просмотреть файл

@ -11,20 +11,9 @@ from concurrent.futures import ThreadPoolExecutor
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.protocol.models import Twin, TwinProperties, CloudToDeviceMethod
from azure.eventhub import EventHubConsumerClient
import e2e_settings
logger = logging.getLogger("e2e.{}".format(__name__))
iothub_connection_string = e2e_settings.IOTHUB_CONNECTION_STRING
iothub_name = e2e_settings.IOTHUB_NAME
eventhub_connection_string = e2e_settings.EVENTHUB_CONNECTION_STRING
eventhub_consumer_group = e2e_settings.EVENTHUB_CONSUMER_GROUP
assert iothub_connection_string
assert iothub_name
assert eventhub_connection_string
assert eventhub_consumer_group
def convert_binary_dict_to_string_dict(src):
def binary_to_string(x):
@ -75,9 +64,20 @@ class EventhubEvent(object):
self.system_properties = None
self.properties = None
    @property
    def message_id(self):
        # If message-id is missing from the event's system properties, make one
        # up with a random guid. Do this because incoming_eventhub_events is a
        # dict indexed on message_id, so every event needs some unique id.
        return self.system_properties.get("message-id", "no-message-id-{}".format(uuid.uuid4()))
class ServiceHelperSync(object):
def __init__(self):
def __init__(
self,
iothub_connection_string,
eventhub_connection_string,
eventhub_consumer_group,
):
self._executor = ThreadPoolExecutor()
self._registry_manager = IoTHubRegistryManager(iothub_connection_string)
@ -224,9 +224,7 @@ class ServiceHelperSync(object):
return message
def _store_eventhub_arrival(self, converted_event):
message_id = converted_event.system_properties.get(
"message-id", "no-message-id-{}".format(uuid.uuid4())
)
message_id = converted_event.message_id
if message_id:
with self.cv:
self.incoming_eventhub_events[message_id] = converted_event

Просмотреть файл

@ -0,0 +1,34 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import os
# Test settings, resolved from environment variables at import time so test
# code can simply read e.g. test_env.IOTHUB_CONNECTION_STRING.
IOTHUB_CONNECTION_STRING = None
EVENTHUB_CONNECTION_STRING = None
IOTHUB_HOSTNAME = None
IOTHUB_NAME = None
EVENTHUB_CONSUMER_GROUP = None
DEVICE_CONNECTION_STRING = None

# Prefer the IOTHUB_E2E_* variable names; fall back to the older names.
if "IOTHUB_E2E_IOTHUB_CONNECTION_STRING" in os.environ:
    IOTHUB_CONNECTION_STRING = os.environ["IOTHUB_E2E_IOTHUB_CONNECTION_STRING"]
    EVENTHUB_CONNECTION_STRING = os.environ["IOTHUB_E2E_EVENTHUB_CONNECTION_STRING"]
    EVENTHUB_CONSUMER_GROUP = os.getenv("IOTHUB_E2E_EVENTHUB_CONSUMER_GROUP", None)
else:
    IOTHUB_CONNECTION_STRING = os.environ["IOTHUB_CONNECTION_STRING"]
    EVENTHUB_CONNECTION_STRING = os.environ.get("EVENTHUB_CONNECTION_STRING")

# NOTE(review): original indentation was lost in extraction; this line is
# assumed to run in both branches — confirm against the repository.
DEVICE_CONNECTION_STRING = os.environ.get("IOTHUB_DEVICE_CONNECTION_STRING")

# Derive the hub host/name from the HostName field of the connection string.
parts = {}
for key_and_value in IOTHUB_CONNECTION_STRING.split(";"):
    key, value = key_and_value.split("=", 1)
    parts[key] = value
IOTHUB_HOSTNAME = parts["HostName"]
IOTHUB_NAME = IOTHUB_HOSTNAME.split(".")[0]

if not EVENTHUB_CONSUMER_GROUP:
    EVENTHUB_CONSUMER_GROUP = "$default"

13
dev_utils/setup.py Normal file
Просмотреть файл

@ -0,0 +1,13 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from setuptools import setup
# Minimal package definition: dev_utils is installed (editable) for test runs
# only and is never published.
setup(
    name="dev_utils",
    description="Internal development utilities for Azure IoT. NOT FOR DISTRIBUTION.",
    version="0.0.0a1",  # Alpha Release
    license="MIT License",
)

Просмотреть файл

@ -45,6 +45,7 @@ if __name__ == "__main__":
# Install testing environment dependencies
pip_command("install -U -r requirements_test.txt")
pip_command("install -e dev_utils")
if args.dev_mode:
# Install local development environment dependencies.

48
sdklab/README.md Normal file
Просмотреть файл

@ -0,0 +1,48 @@
<div align=center>
<img src="../azure-iot-device/doc/images/azure_iot_sdk_python_banner.png"></img>
</div>
# Azure IoT SDK for Python SDK Lab tools
This directory contains a set of command-line tools that exercise the Azure IoT SDK for Python in different and sometimes interesting ways.
The tools are like end-to-end tests, or stress tests, or long-haul tests.
They show examples of library usage, but they should not be considered as samples. In some cases, they do "bad things".
The name "sdklab" is inspired by the idea that this is like a laboratory full of tools and experiments that can be used to exercise the SDK.
* Some of them are perfectly innocent apps that do everything "correctly," but perhaps a bit more aggressively than traditional samples or tests.
* Some of them use undocumented knowledge to alter or distort the behavior of the library.
* Some of them simulate very specific conditions which have been known to break functionality in the past.
* Some of them use common but strange programming practices that are technically correct and legal, but don't make much sense.
* Some of them expose (hopefully theoretical) bugs that haven't been fixed yet or weaknesses in the library that will eventually need to be architected away.
These are all designed to return a success value so they can be used in shell scripts or ci/cd pipelines.
The goal is to have them all succeed and return `0` 100% of the time.
When the apps do this, they can be added to `run_gate_tests.py` so they get run regularly to ensure library quality and prevent bit-rot.
Some of the apps in here won't succeed.
Returning failure is reserved for apps that expose issues that have been discovered but not yet fixed.
# Test App Description
## `meantimerecovery` directory
This tool predates the re-organization of this set of tools.
It has a README that describes how it can be used, and it does not follow the "should always return success" rules discussed above.
## `./fuzzing/fuzz_send_message.py`
This tool exercises the `send_message` method with different faults injected into Paho at different times. Documentation on what faults are injected can be found in `../dev_utils/dev_utils/paho_fuzz_hook.py`.
## `./regressions/regression_pr1023_infinite_get_twin.py`
This tool exercises a shutdown scenario that was fixed in pull request #1023.
## `./regressions/regression_github_990.py`
This tool verifies a number of different behaviors around reconnect failures that were originally reported in GitHub issue #990.
## `./simple_stress/simple_send_message_bulk.py`
This tool verifies the simple `send_message` operation in bulk.

Просмотреть файл

@ -0,0 +1,153 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import asyncio
import logging
import argparse
import paho_fuzz_hook
from azure.iot.device.aio import IoTHubDeviceClient
from dev_utils import test_env, random_content
from dev_utils.service_helper import ServiceHelper
logging.basicConfig(level=logging.WARNING)
logging.getLogger("e2e").setLevel(level=logging.INFO)
"""
This tool does limited "fuzzing" of the client library by injecting various
failures into the sockets that Paho uses. The set of failures is limited and are
described in the `fuzz_type_help` string below. Some of these "failures" are
problems that might occur at the network level, such as lost packets and dropped
connections, and other "failures" are exceptions that are raised from lower components.
Calling this "fuzzing" is a misnomer. While this code "injects random failures" into
the transport, that set of random failures is based on scenarios that we expect might happen.
The fact that we're guessing the types of failures that might happen limits the scope
of testing, but it still provides value.
"""
async def queue_send_messages(device_client, messages_to_send=60):
    """
    Start sending `messages_to_send` random messages without waiting for them
    to complete. Returns the in-flight send tasks along with the ids of the
    queued messages.
    """
    messages = []
    for _ in range(messages_to_send):
        messages.append(random_content.get_random_message())
    send_tasks = [asyncio.create_task(device_client.send_message(m)) for m in messages]
    message_ids = [m.message_id for m in messages]
    return (send_tasks, message_ids)
async def wait_for_messages(service_helper, message_ids):
    """
    Wait until every id in `message_ids` has arrived at eventhub. The list is
    mutated: ids are removed as their messages arrive, and the function
    returns once the list is empty.
    """
    count_received = 0
    while len(message_ids):
        # message_id=None means "return the next arrival, whatever it is";
        # arrivals that aren't ours are counted as progress output only.
        message = await service_helper.wait_for_eventhub_arrival(message_id=None)
        if message.message_id in message_ids:
            message_ids.remove(message.message_id)
            count_received += 1
        print("received={}, remaining={}".format(count_received, len(message_ids)))
# Fuzz type constants. These values match the integers accepted on the command
# line; each one is described in fuzz_type_help below.
drop_outgoing_packets_until_reconnect = 1
drop_individual_outgoing_packets = 2
drop_incoming_packets_until_reconnect = 3
drop_individual_incoming_packets = 4
raise_send_exception = 5
raise_receive_exception = 6

# Help text shown by argparse for the fuzz_type argument. Keep in sync with
# the constants above.
fuzz_type_help = """
1: drop_outgoing_packets_until_reconnect
Simulates failures where the transport connection drops all outgoing packets
until the network socket is closed and re-opened. This simulates a
"broken output pipe".
2: drop_individual_outgoing_packets
Simulates loss of individual outgoing packets. This simulates scenarios
where individual outgoing messages are lost, but the connection isn't
necessarily "broken".
3: drop_incoming_packets_until_reconnect
Simulates failures where the transport connection drops all incoming packets
until the network socket is closed and re-opened. This simulates a
"broken input pipe".
4: drop_individual_incoming_packets
Simulates the loss of individual incoming packets. This simulates scenarios
where individual incoming messages are lost, but the connection isn't necessarily
"broken"
5: raise_send_exception
Simulates a failure where the call into the transport socket `send` function raises
an exception. This simulates low-level socket failures on the outgoing socket.
6: raise_receive_exception
Simulates a failure where the call into the transport socket `recv` function raises
an exception. This simulates low-level socket failures on the incoming socket.
"""
async def main(fuzz_type):
    """
    Run one fuzzing pass: connect a device client, install the fault-injection
    hook selected by `fuzz_type` (see fuzz_type_help), send a batch of
    messages, and verify they all arrive at eventhub.
    """
    # keep_alive=15 keeps the MQTT keepalive short so broken-socket scenarios
    # are detected quickly.
    device_client = IoTHubDeviceClient.create_from_connection_string(
        test_env.DEVICE_CONNECTION_STRING, keep_alive=15
    )
    service_helper = ServiceHelper(
        iothub_connection_string=test_env.IOTHUB_CONNECTION_STRING,
        eventhub_connection_string=test_env.EVENTHUB_CONNECTION_STRING,
        eventhub_consumer_group=test_env.EVENTHUB_CONSUMER_GROUP,
    )
    paho_fuzz_hook.add_paho_logging_hook(device_client)

    try:
        # Connect the device client.
        print("connecting")
        await device_client.connect()

        # Start fuzzing after the client is connected
        if fuzz_type == drop_outgoing_packets_until_reconnect:
            paho_fuzz_hook.add_hook_drop_outgoing_until_reconnect(device_client, 0.05)
        elif fuzz_type == drop_individual_outgoing_packets:
            paho_fuzz_hook.add_hook_drop_individual_outgoing(device_client, 0.05)
        elif fuzz_type == drop_incoming_packets_until_reconnect:
            # probability for incoming is calculated on every byte, so it needs to be much lower
            paho_fuzz_hook.add_hook_drop_incoming_until_reconnect(device_client, 0.001)
        elif fuzz_type == drop_individual_incoming_packets:
            paho_fuzz_hook.add_hook_drop_individual_incoming(device_client, 0.05)
        elif fuzz_type == raise_send_exception:
            paho_fuzz_hook.add_hook_raise_send_exception(device_client, 0.05)
        elif fuzz_type == raise_receive_exception:
            paho_fuzz_hook.add_hook_raise_receive_exception(device_client, 0.05)
        else:
            # argparse restricts choices to 1-6, so this is unreachable
            assert False

        # TODO: can we add device_id and module_id attributes on the client?
        service_helper.set_identity(
            device_id=device_client._mqtt_pipeline.pipeline_configuration.device_id,
            module_id=device_client._mqtt_pipeline.pipeline_configuration.module_id,
        )

        print("sleeping to let eventhub consumers spin up correctly")
        await asyncio.sleep(5)

        print("sending")
        (send_tasks, message_ids) = await queue_send_messages(device_client)
        await asyncio.gather(*send_tasks)
        print("done sending")

        print("waiting")
        await wait_for_messages(service_helper, message_ids)
    finally:
        print("Shutting down service helper")
        await service_helper.shutdown()
        print("Shutting down device client")
        await device_client.shutdown()
if __name__ == "__main__":
    parser = argparse.ArgumentParser(prog="fuzz_send_message")
    # choices=range(1, 7) covers fuzz types 1-6 described in fuzz_type_help
    parser.add_argument("fuzz_type", type=int, choices=range(1, 7), help=fuzz_type_help)
    args = parser.parse_args()
    asyncio.run(main(args.fuzz_type))

Просмотреть файл

@ -0,0 +1,253 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from dev_utils import logging_hook
import random
import functools
"""
def _sock_send(self, buf):
def _sock_recv(self, buffsize):
def _reset_sockets(self, sockpair_only=False):
def _sock_close(self):
"""
# List of Paho functions to add logging to
PAHO_FUNCTIONS_TO_HOOK = {
# "_sock_send": False,
# "_sock_recv": False,
# "_sock_close": False,
# "_reset_sockets": False,
"connect": False,
"disconnect": False,
"reconnect": False,
"publish": False,
"on_connect": True,
"on_disconnect": True,
}
class MyFakeException(Exception):
    """Distinct exception type raised by the fault-injection hooks so induced
    failures are easy to tell apart from real errors."""
def add_paho_logging_hook(device_client, log_func=print):
    """
    Add logging hooks to all the Paho functions listed in
    `PAHO_FUNCTIONS_TO_HOOK`.
    """
    paho = logging_hook.get_paho_from_device_client(device_client)
    for func_name, log_args in PAHO_FUNCTIONS_TO_HOOK.items():
        logging_hook.add_logging_hook(
            obj=paho,
            func_name=func_name,
            log_func=log_func,
            module_name="Paho",
            log_args=log_args,
        )
def add_hook_drop_outgoing_until_reconnect(device_client, failure_probability, log_func=print):
    """
    Add a hook to randomly drop all outgoing messages until reconnect based on some failure
    probability. This is used to simulate a "connection drop" scenario where packets just stop
    being sent and the only way to recover is to close the socket and open a new one.
    """
    paho = logging_hook.get_paho_from_device_client(device_client)
    old_sock_send = paho._sock_send
    old_sock_close = paho._sock_close
    # closure state shared between the two wrappers below
    failed = False

    @functools.wraps(old_sock_send)
    def new_sock_send(buf):
        """
        Inside `sock_send`, we randomly decide to stop sending packets. Once we stop sending,
        we drop all outgoing packets until `failed` gets reset to `False` (which happens in
        `new_sock_close` when the socket is closed).
        """
        nonlocal failed
        if not failed and random.random() < failure_probability:
            log_func("-----------SOCKET FAILURE. All outgoing packets will be dropped")
            failed = True
        if failed:
            log_func("-----------DROPPING {} bytes".format(len(buf)))
            # Claim the whole buffer was sent so Paho doesn't see an error.
            return len(buf)
        else:
            count = old_sock_send(buf)
            log_func("---------- SENT {} bytes".format(count))
            return count

    @functools.wraps(old_sock_close)
    def new_sock_close():
        """
        Inside `sock_close`, we reset the `failed` variable to `False` to simulate the connection
        being "fixed" after the socket is re-opened
        """
        nonlocal failed
        if failed:
            log_func("-----------RESTORING SOCKET behavior")
            failed = False
        return old_sock_close()

    paho._sock_send = new_sock_send
    paho._sock_close = new_sock_close
def add_hook_drop_individual_outgoing(device_client, failure_probability, log_func=print):
    """
    Add a hook that randomly drops individual outgoing packets based on some
    failure probability. This simulates an "unreliable network" scenario where
    single outgoing packets get dropped, but other packets continue to flow.
    """
    paho = logging_hook.get_paho_from_device_client(device_client)
    old_sock_send = paho._sock_send

    @functools.wraps(old_sock_send)
    def new_sock_send(buf):
        # Randomly swallow this packet, telling Paho the whole buffer was sent.
        if random.random() < failure_probability:
            log_func("-----------DROPPING {} bytes".format(len(buf)))
            return len(buf)
        count = old_sock_send(buf)
        log_func("---------- SENT {} bytes".format(count))
        return count

    paho._sock_send = new_sock_send
def add_hook_drop_incoming_until_reconnect(device_client, failure_probability, log_func=print):
    """
    Add a hook to randomly drop all incoming messages until reconnect based on some failure
    probability. This is used to simulate a "connection drop" scenario where packets just stop
    being received and the only way to recover is to close the socket and open a new one.
    """
    paho = logging_hook.get_paho_from_device_client(device_client)
    old_sock_recv = paho._sock_recv
    old_sock_close = paho._sock_close
    # closure state shared between the two wrappers below
    failed = False

    @functools.wraps(old_sock_recv)
    def new_sock_recv(buffsize):
        """
        Inside `sock_recv`, we randomly decide to stop receiving packets. Once we stop receiving,
        we drop all incoming bytes until `failed` gets reset to `False` (which happens in
        `new_sock_close` when the socket is closed).

        Note: since `sock_send` gets called on a packet-by-packet basis, sending hundreds of bytes
        per call, and `sock_recv` gets called on a byte-by-byte basis, receiving a single byte
        at a time, the `failure_probability` value passed to this function needs to be much
        smaller than the `failure_probability` that might be used when fuzzing `sock_send`.
        """
        nonlocal failed
        if not failed and random.random() < failure_probability:
            log_func("-----------SOCKET FAILURE. All incoming packets will be dropped")
            failed = True
        buf = old_sock_recv(buffsize)
        if failed:
            log_func("-----------DROPPING {} bytes".format(len(buf)))
            # BlockingIOError makes Paho treat this as "no data available"
            raise BlockingIOError
        else:
            log_func("---------- RECEIVED {} bytes".format(len(buf)))
            return buf

    @functools.wraps(old_sock_close)
    def new_sock_close():
        """
        Inside `sock_close`, we reset the `failed` variable to `False` to simulate the connection
        being "fixed" after the socket is re-opened
        """
        nonlocal failed
        if failed:
            log_func("-----------RESTORING SOCKET behavior")
            failed = False
        return old_sock_close()

    paho._sock_recv = new_sock_recv
    paho._sock_close = new_sock_close
def add_hook_drop_individual_incoming(device_client, failure_probability, log_func=print):
    """
    Add a hook that randomly drops incoming packets based on some failure
    probability, simulating an "unreliable network" where traffic is lost but
    the connection keeps flowing. Since we can't tell what an "individual
    message" is on the receive side, we flush the incoming byte queue and
    assume that is good enough.
    """
    paho = logging_hook.get_paho_from_device_client(device_client)
    old_sock_recv = paho._sock_recv

    @functools.wraps(old_sock_recv)
    def new_sock_recv(buffsize):
        # Either drain-and-discard the incoming queue, or pass straight through.
        if random.random() < failure_probability:
            buf = old_sock_recv(2048)
            log_func("---------- DROPPED {} bytes".format(len(buf)))
            raise BlockingIOError
        buf = old_sock_recv(buffsize)
        log_func("---------- RECEIVED {} bytes".format(len(buf)))
        return buf

    paho._sock_recv = new_sock_recv
def add_hook_raise_send_exception(device_client, failure_probability, log_func=print):
    """
    Add a hook that randomly raises an exception from the socket `send` call
    based on some failure probability, simulating a low-level failure inside
    Paho when sending.
    """
    paho = logging_hook.get_paho_from_device_client(device_client)
    old_sock_send = paho._sock_send

    @functools.wraps(old_sock_send)
    def new_sock_send(buf):
        if random.random() < failure_probability:
            log_func("---------- RAISING EXCEPTION")
            raise MyFakeException("Forced Send Failure")
        count = old_sock_send(buf)
        log_func("---------- SENT {} bytes".format(count))
        return count

    paho._sock_send = new_sock_send
def add_hook_raise_receive_exception(device_client, failure_probability, log_func=print):
    """
    Add a hook that randomly raises an exception from the socket `recv` call
    based on some failure probability, simulating a low-level failure inside
    Paho when receiving.
    """
    paho = logging_hook.get_paho_from_device_client(device_client)
    old_sock_recv = paho._sock_recv

    @functools.wraps(old_sock_recv)
    def new_sock_recv(buffsize):
        if random.random() < failure_probability:
            log_func("---------- RAISING EXCEPTION")
            raise MyFakeException("Forced Receive Failure")
        buf = old_sock_recv(buffsize)
        log_func("---------- RECEIVED {} bytes".format(len(buf)))
        return buf

    paho._sock_recv = new_sock_recv

Просмотреть файл

@ -0,0 +1,288 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import asyncio
import logging
import functools
import ssl
from dev_utils import test_env, logging_hook
from azure.iot.device.aio import IoTHubDeviceClient
import azure.iot.device.common
logging.basicConfig(level=logging.WARNING)
logging.getLogger("azure.iot").setLevel(level=logging.DEBUG)
"""
This app simulates a bug that was reported where a websockets connection was raising a
`TlsExchangeAuthError` on a reconnect, between `PUBLISH` and `PUBACK`.
Order of events for this bug repro:
1. Customer calls `send_message`
2. After `PUBLISH` and before `PUBACK`, transport disconnects with `rc=1`
3. On reconnect, `transport.connect` raised `TlsExchangeAuthError(None,)` caused by SSLError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:852)')`
4. `TlsExchangeAuthError` is not transient, so `reconnect` fails. Since the reconnect is not user-initiated, no error is returned to the caller.
5. `PUBACK` never received, so `send_message` never completes.
The fix is to make `TlsExchangeAuthError` into a retryable error, so a reconnect gets initiated after step 3 above.
In the interest of completeness, this test was expanded to 8 different scenarios based on 3 flags
* Half of them simulate a retryable error (`TlsExchangeAuthError`), and half of them simulate a non-retryable error (bare `Exception`).
* Half of them run with `connection_retry` set to `True` and half run with `connection_retry` set to `False`.
* Half of them run with `auto_connect` set to `True` and half run with `auto_connect` set to `False`.
`send_message` is not expected to succeed in all of these scenarios. The `send_should_succeed` flag is used to make sure it succeeds or fails appropriately.
"""
class MyFakeException(Exception):
    """Distinct exception type injected into the transport so simulated
    failures can't be confused with real errors."""
def workaround_github_990():
    # This is a customer workaround for github issue 990. It should not be necessary
    # but I'm leaving this here unused for reference.
    #
    # It registers TlsExchangeAuthError as a "transient" connect error so the
    # pipeline's reconnect logic retries instead of giving up.
    try:
        # Newer library versions renamed ReconnectStage to ConnectionStateStage;
        # fall back for older versions.
        stage = azure.iot.device.common.pipeline.pipeline_stages_base.ConnectionStateStage
    except AttributeError:
        stage = azure.iot.device.common.pipeline.pipeline_stages_base.ReconnectStage
    err = azure.iot.device.common.transport_exceptions.TlsExchangeAuthError
    if err not in stage.transient_connect_errors:
        stage.transient_connect_errors.append(err)
def hack_paho_to_disconnect_after_publish(
    device_client, exception_to_raise_on_reconnect, log_func=print
):
    """
    Add hooks to the client's underlying Paho MQTT object to simulate the failure
    condition outlined in the comments above: the socket breaks between PUBLISH and
    PUBACK, and the following reconnect attempt raises
    `exception_to_raise_on_reconnect`.

    :param device_client: The device client whose Paho transport gets hooked.
    :param exception_to_raise_on_reconnect: Exception instance raised by the next
        `reconnect` call after the first publish.
    :param log_func: Callable used for progress output (defaults to `print`).
    """
    paho = logging_hook.get_paho_from_device_client(device_client)

    # Keep references to the original Paho internals so the hooks below can
    # delegate to them.
    old_sock_send = paho._sock_send
    old_sock_recv = paho._sock_recv
    old_sock_close = paho._sock_close
    old_publish = paho.publish
    old_reconnect = paho.reconnect

    # Shared state for the hook closures below.
    fail_sock_send_until_reconnect = False  # while True, drop all outgoing bytes
    first_publish = True  # only the first publish triggers the simulated failure
    raise_on_next_reconnect = False  # make the next reconnect() raise
    block_next_puback = False  # drop the next incoming packet (the PUBACK)

    @functools.wraps(old_publish)
    def new_publish(*args, **kwargs):
        """
        If this is the first call to `publish`, we set 3 flags:
        1. `block_next_puback` tells `sock_recv` to drop the next incoming packet, thus simulating a blocked `PUBACK`.
        2. `fail_sock_send_until_reconnect` tells `sock_send` to stop sending outgoing packets, thus simulating a broken socket.
        3. `raise_on_next_reconnect` tells `reconnect` to fail by raising `exception_to_raise_on_reconnect`
        """
        nonlocal fail_sock_send_until_reconnect, first_publish, raise_on_next_reconnect, block_next_puback
        if first_publish:
            block_next_puback = True
        result = old_publish(*args, **kwargs)
        if first_publish:
            log_func("-----------FIRST PUBLISH COMPLETE. Breaking socket")
            fail_sock_send_until_reconnect = True
            raise_on_next_reconnect = True
            first_publish = False
        return result

    @functools.wraps(old_sock_recv)
    def new_sock_recv(buflen):
        """
        `sock_recv` only needs to block a single `PUBACK` packet, then resume normal operation.
        """
        nonlocal block_next_puback
        if block_next_puback:
            result = old_sock_recv(1024)  # flush the buffer
            log_func("---------- BLOCKING PUBACK = {} bytes dropped".format(len(result)))
            block_next_puback = False
            # Make Paho treat this read as "no data available".
            raise BlockingIOError()
        else:
            return old_sock_recv(buflen)

    @functools.wraps(old_sock_send)
    def new_sock_send(buf):
        """
        `sock_send` needs to drop all outgoing packets to simulate a broken socket connection.
        """
        nonlocal fail_sock_send_until_reconnect
        if fail_sock_send_until_reconnect:
            log_func("-----------DROPPING {} bytes".format(len(buf)))
            # Report the full length so Paho believes the bytes were sent.
            return len(buf)
        else:
            count = old_sock_send(buf)
            log_func("---------- SENT {} bytes".format(count))
            return count

    @functools.wraps(old_sock_close)
    def new_sock_close():
        """
        `sock_close` needs to restore normal operation, thus removing the failure conditions when
        a new socket is opened.
        """
        nonlocal fail_sock_send_until_reconnect
        if fail_sock_send_until_reconnect:
            log_func("-----------RESTORING SOCKET behavior")
            fail_sock_send_until_reconnect = False
        return old_sock_close()

    @functools.wraps(old_reconnect)
    def new_reconnect(*args, **kwargs):
        """
        `reconnect` needs to raise an exception when we need it to in order to simulate the exception
        that led to the discovery of this bug.
        """
        nonlocal raise_on_next_reconnect
        if raise_on_next_reconnect:
            log_func(
                "----------- RECONNECT CALLED. raising {}".format(
                    type(exception_to_raise_on_reconnect)
                )
            )
            raise_on_next_reconnect = False
            raise exception_to_raise_on_reconnect
        return old_reconnect(*args, **kwargs)

    # Install the hooks.
    paho._sock_send = new_sock_send
    paho._sock_recv = new_sock_recv
    paho._sock_close = new_sock_close
    paho.publish = new_publish
    paho.reconnect = new_reconnect
async def run_test(
    connection_retry, auto_connect, send_should_succeed, exception_to_raise_on_reconnect
):
    """
    Run a single issue-#990 scenario.

    Breaks the socket after the first PUBLISH and makes the next Paho reconnect
    raise `exception_to_raise_on_reconnect`, then verifies that `send_message`
    succeeds or fails according to `send_should_succeed`.

    :param connection_retry: Value passed to the client's `connection_retry` option.
    :param auto_connect: Value passed to the client's `auto_connect` option.
    :param send_should_succeed: True if `send_message` is expected to complete.
    :param exception_to_raise_on_reconnect: Exception instance injected into Paho's
        `reconnect` by `hack_paho_to_disconnect_after_publish`.
    :raises: Re-raises any unexpected failure (or unexpected success) so the
        calling script fails loudly.
    """
    try:
        print("*" * 80)
        print()
        print(
            "Running test with connection_retry={}, auto_connect={}, and exception_to_raise_on_reconnect={}".format(
                connection_retry, auto_connect, type(exception_to_raise_on_reconnect)
            )
        )
        print()
        print("*" * 80)

        # Create instance of the device client using the connection string
        device_client = IoTHubDeviceClient.create_from_connection_string(
            test_env.DEVICE_CONNECTION_STRING,
            keep_alive=10,
            connection_retry=connection_retry,
            auto_connect=auto_connect,
        )
        logging_hook.hook_device_client(device_client)
        hack_paho_to_disconnect_after_publish(device_client, exception_to_raise_on_reconnect)

        # Connect the device client.
        await device_client.connect()

        send_succeeded = False
        try:
            print("Sending message...")
            await device_client.send_message("This is a message that is being sent")
            print("Message successfully sent!")
            send_succeeded = True
        except Exception as e:
            if send_should_succeed:
                raise
            print("send_message failed as expected.")
            print("raised: {}".format(str(e) or type(e)))

        # BUGFIX: this check used to be `assert send_should_succeed` inside the
        # try block above. When the send unexpectedly succeeded, the resulting
        # AssertionError was caught by the except clause and (because
        # send_should_succeed is False on that path) silently reported as an
        # expected failure. Checking outside the try makes an unexpected success
        # fail the scenario properly.
        if send_succeeded:
            assert send_should_succeed, "send_message succeeded but was expected to fail"

        print("Shutting down")
        await device_client.shutdown()
    except Exception:
        print("FAILED " * 10)
        print(
            "FAILED with connection_retry={}, auto_connect={}, and exception_to_raise_on_reconnect={}".format(
                connection_retry, auto_connect, type(exception_to_raise_on_reconnect)
            )
        )
        raise
async def main():
    """Run the GitHub issue #990 regression scenarios back to back."""
    workaround_github_990()

    # test with retryable errors.
    # An SSLError with this strerror reproduces the error from issue #990
    # (presumably surfaced as TlsExchangeAuthError by the transport layer —
    # see the module docstring above; confirm against transport mapping).
    tls_auth_error = ssl.SSLError()
    tls_auth_error.strerror = "CERTIFICATE_VERIFY_FAILED"

    # If `connection_retry` is `True`, the client should re-connect and
    # `send_message` should succeed
    await run_test(
        connection_retry=True,
        auto_connect=True,
        send_should_succeed=True,
        exception_to_raise_on_reconnect=tls_auth_error,
    )
    await run_test(
        connection_retry=True,
        auto_connect=False,
        send_should_succeed=True,
        exception_to_raise_on_reconnect=tls_auth_error,
    )

    # If `connection_retry` is `False`, the client should _not_ re-connect and
    # `send_message` should fail
    await run_test(
        connection_retry=False,
        auto_connect=True,
        send_should_succeed=False,
        exception_to_raise_on_reconnect=tls_auth_error,
    )
    await run_test(
        connection_retry=False,
        auto_connect=False,
        send_should_succeed=False,
        exception_to_raise_on_reconnect=tls_auth_error,
    )

    # Test with non-retryable (fatal) error. In all cases, send_message should fail.
    # These scenarios are currently broken.
    """
    fatal_error = MyFakeException("Fatal exception")
    await run_test(
        connection_retry=True,
        auto_connect=True,
        send_should_succeed=False,
        exception_to_raise_on_reconnect=fatal_error,
    )
    await run_test(
        connection_retry=True,
        auto_connect=False,
        send_should_succeed=False,
        exception_to_raise_on_reconnect=fatal_error,
    )
    await run_test(
        connection_retry=True,
        auto_connect=True,
        send_should_succeed=False,
        exception_to_raise_on_reconnect=fatal_error,
    )
    await run_test(
        connection_retry=False,
        auto_connect=False,
        send_should_succeed=False,
        exception_to_raise_on_reconnect=fatal_error,
    )
    """


if __name__ == "__main__":
    asyncio.run(main())

Просмотреть файл

@ -0,0 +1,79 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import asyncio
import logging
from dev_utils import test_env
from azure.iot.device.aio import IoTHubDeviceClient
logging.basicConfig(level=logging.WARNING)
"""
This code verifies that the bug addressed by GitHub PR #1023 is actually fixed.
Order of events for the bug repro:
1. Customer code creates a client object and registers for twin change events.
2. When reconnecting, the device client code would issue a twin "GET" packet to the service.
This is done to ensure the client has a current version of the twin, which may have been
updated while the client was disconnected.
3. If `shutdown` is called _immediately_ after `connect`, the "GET" operation fails. This
is expected because there was no response.
4. The EnsureDesiredPropertyStage code responds to the GET operation failure by submitting a
new GET operation.
5. Because the client is shutting down, this second GET operation also fails. Go to step 4.
Final result (before this fix):
Multiple GET calls and an access violation.
"""
async def main():
    """Reproduce the PR #1023 scenario: shutdown immediately after a reconnect."""
    # Create instance of the device client using the connection string
    device_client = IoTHubDeviceClient.create_from_connection_string(
        test_env.DEVICE_CONNECTION_STRING
    )

    # Connect the device client.
    await device_client.connect()

    async def on_patch(p):
        print("Got patch")

    # Even though we're not expecting a patch, registering for the patch is an important
    # precondition for this particular bug.
    device_client.on_twin_desired_properties_patch_received = on_patch

    # Send a single message
    print("Sending message...")
    await device_client.send_message("This is a message that is being sent")
    print("Message successfully sent!")

    print("Getting twin...")
    await device_client.get_twin()
    print("got twin...")

    print("Disconnecting")
    await device_client.disconnect()
    print("Disconnected")

    print("Connecting")
    await device_client.connect()
    print("Connected")

    # Finally, shut down the client
    # If this is done _immediately_ after the `connect` call, this used to trigger a race condition
    # which would cause a stack overflow and core dump. Using `disconnect` instead of `shutdown`
    # or putting a sleep before this would not repro the same bug.
    print("Shutting down")
    await device_client.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

65
sdklab/run_gate_tests.py Normal file
Просмотреть файл

@ -0,0 +1,65 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import sys
import os
from subprocess import check_call
this_file_path = os.path.dirname(os.path.realpath(__file__))

# Accumulates one human-readable description per failed test app.
failures = []


def run_test_app(relative_app_name, timeout):
    """Run a single test app as a child python process and record any failure.

    :param relative_app_name: Path of the app relative to this file. May include
        command-line arguments separated by spaces (e.g. "./fuzzing/fuzz_send_message.py 1").
    :param timeout: Maximum number of seconds to let the app run before it is
        killed and counted as a failure.
    """
    absolute_app = os.path.join(this_file_path, relative_app_name)
    try:
        print("+-" * 66)
        print()
        print("Running test app: {}".format(absolute_app))
        # NOTE: .split() separates the script path from its arguments; this
        # assumes the path itself contains no spaces.
        check_call([sys.executable] + absolute_app.split(), timeout=timeout)
        print("Test app {} SUCCEEDED".format(absolute_app))
    except Exception as err:
        print(err)
        print("Test app {} FAILED".format(absolute_app))
        # BUGFIX: was `failures.push(...)` — Python lists have no push() method,
        # so recording a failure raised AttributeError and crashed the gate run.
        failures.append("{} failed with {}".format(relative_app_name, str(err) or type(err)))
    print()
    print("+-" * 66)
    print()
    print()


if __name__ == "__main__":
    run_test_app("./simple_stress/simple_send_message_bulk.py", timeout=600)
    run_test_app("./regressions/regression_pr_1023_infinite_get_twin.py", timeout=600)
    run_test_app("./regressions/regression_issue_990_exception_after_publish.py", timeout=600)
    run_test_app("./fuzzing/fuzz_send_message.py 1", timeout=600)
    run_test_app("./fuzzing/fuzz_send_message.py 3", timeout=600)
    # individual dropped PUBLISH messages don't get retried until after reconnect, so these 2 are
    # currently commented out.
    # run_test_app("./fuzzing/fuzz_send_message.py 2", timeout=600)
    # run_test_app("./fuzzing/fuzz_send_message.py 4", timeout=600)
    # exceptions in the Paho thread aren't caught and handled, so these 2 are currently commented out.
    # run_test_app("./fuzzing/fuzz_send_message.py 5", timeout=600)
    # run_test_app("./fuzzing/fuzz_send_message.py 6", timeout=600)

    print("+-" * 66)
    print()
    print("FINAL RESULT: {}".format("FAILED" if len(failures) else "SUCCEEDED"))
    if len(failures):
        print()
        for failure in failures:
            print(failure)
    print()
    print("+-" * 66)

    # Exit code = number of failures, so CI treats any failure as nonzero.
    sys.exit(len(failures))

Просмотреть файл

@ -0,0 +1,75 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import asyncio
import logging
from azure.iot.device.aio import IoTHubDeviceClient
from dev_utils import test_env, random_content
from dev_utils.service_helper import ServiceHelper
logging.basicConfig(level=logging.WARNING)
logging.getLogger("e2e").setLevel(level=logging.INFO)
async def queue_send_messages(device_client, messages_to_send):
    """Create ``messages_to_send`` random messages and start sending them all.

    Returns a tuple ``(send_tasks, message_ids)`` where each task sends the
    message whose id is at the same position in ``message_ids``.
    """
    send_tasks = []
    message_ids = []
    for _ in range(messages_to_send):
        message = random_content.get_random_message()
        message_ids.append(message.message_id)
        send_tasks.append(asyncio.create_task(device_client.send_message(message)))
    return (send_tasks, message_ids)
async def wait_for_messages(service_helper, message_ids):
    """Wait until every id in `message_ids` has arrived at EventHub.

    `message_ids` is mutated in place: ids are removed as their messages arrive.

    NOTE(review): if a message arrives whose id is not in `message_ids` (e.g. a
    duplicate delivery), `list.remove` raises ValueError — confirm whether that
    can happen in practice.
    """
    count_received = 0
    while len(message_ids):
        # message_id=None: wait for the next arrival, whichever message it is.
        message = await service_helper.wait_for_eventhub_arrival(message_id=None)
        message_ids.remove(message.message_id)
        count_received += 1
        print("received={}, remaining={}".format(count_received, len(message_ids)))
async def main():
    """Send 1000 telemetry messages in parallel and verify they all reach EventHub."""
    device_client = IoTHubDeviceClient.create_from_connection_string(
        test_env.DEVICE_CONNECTION_STRING
    )
    service_helper = ServiceHelper(
        iothub_connection_string=test_env.IOTHUB_CONNECTION_STRING,
        eventhub_connection_string=test_env.EVENTHUB_CONNECTION_STRING,
        eventhub_consumer_group=test_env.EVENTHUB_CONSUMER_GROUP,
    )
    # logging_hook.hook_device_client(device_client)

    # Connect the device client.
    print("connecting")
    await device_client.connect()

    # TODO: can we add device_id and module_id attributes on the client?
    service_helper.set_identity(
        device_id=device_client._mqtt_pipeline.pipeline_configuration.device_id,
        module_id=device_client._mqtt_pipeline.pipeline_configuration.module_id,
    )

    print("sleeping to let eventhub consumers spin up correctly")
    await asyncio.sleep(5)

    print("sending 1000 messages")
    (send_tasks, message_ids) = await queue_send_messages(device_client, 1000)
    await asyncio.gather(*send_tasks)
    print("done sending")

    print("waiting")
    await wait_for_messages(service_helper, message_ids)

    print("Shutting down device client")
    await device_client.shutdown()

    print("Shutting down service helper")
    await service_helper.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Просмотреть файл

@ -3,13 +3,12 @@
# license information.
import pytest
import asyncio
import e2e_settings
from dev_utils import test_env, ServiceHelper
import logging
import datetime
import json
import retry_async
from utils import create_client_object
from service_helper import ServiceHelper
from azure.iot.device.iothub.aio import IoTHubDeviceClient, IoTHubModuleClient
logger = logging.getLogger(__name__)
@ -71,7 +70,7 @@ async def brand_new_client(device_identity, client_kwargs, service_helper, devic
# Keep this here. It is useful to see this info inside the inside devops pipeline test failures.
logger.info(
"Connecting device_id={}, module_id={}, to hub={} at {} (UTC)".format(
device_id, module_id, e2e_settings.IOTHUB_HOSTNAME, datetime.datetime.utcnow()
device_id, module_id, test_env.IOTHUB_HOSTNAME, datetime.datetime.utcnow()
)
)
@ -103,7 +102,13 @@ async def client(brand_new_client):
@pytest.fixture(scope="module")
async def service_helper(event_loop, executor):
service_helper = ServiceHelper(event_loop, executor)
service_helper = ServiceHelper(
iothub_connection_string=test_env.IOTHUB_CONNECTION_STRING,
eventhub_connection_string=test_env.EVENTHUB_CONNECTION_STRING,
eventhub_consumer_group=test_env.EVENTHUB_CONSUMER_GROUP,
event_loop=event_loop,
executor=executor,
)
await asyncio.sleep(3)
yield service_helper

Просмотреть файл

@ -5,7 +5,7 @@ import asyncio
import pytest
import logging
import json
from utils import get_random_dict
from dev_utils import get_random_dict
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)

Просмотреть файл

@ -5,7 +5,7 @@ import pytest
import logging
import asyncio
import parametrize
from utils import get_random_dict
from dev_utils import get_random_dict
from azure.iot.device.iothub import MethodResponse
logger = logging.getLogger(__name__)

Просмотреть файл

@ -5,7 +5,7 @@ import asyncio
import pytest
import logging
import json
import utils
import dev_utils
from azure.iot.device.exceptions import OperationCancelled, ClientError
logger = logging.getLogger(__name__)
@ -66,7 +66,7 @@ class TestSendMessage(object):
async def test_sends_json_string(self, client, service_helper, leak_tracker):
leak_tracker.set_initial_object_list()
message = json.dumps(utils.get_random_dict())
message = json.dumps(dev_utils.get_random_dict())
await client.send_message(message)
@ -79,7 +79,7 @@ class TestSendMessage(object):
async def test_sends_random_string(self, client, service_helper, leak_tracker):
leak_tracker.set_initial_object_list()
message = utils.get_random_string(16)
message = dev_utils.get_random_string(16)
await client.send_message(message)

Просмотреть файл

@ -8,8 +8,8 @@ import json
import time
import parametrize
import task_cleanup
from iptables import all_disconnect_types
from utils import get_random_message
from dev_utils import get_random_message
from dev_utils.iptables import all_disconnect_types
from retry_async import retry_exponential_backoff_with_jitter
logger = logging.getLogger(__name__)

Просмотреть файл

@ -5,7 +5,7 @@ import asyncio
import pytest
import logging
import const
from utils import get_random_dict
from dev_utils import get_random_dict
from azure.iot.device.exceptions import ClientError
logger = logging.getLogger(__name__)

Просмотреть файл

@ -7,10 +7,9 @@ import concurrent.futures
import test_config
import device_identity_helper
import const
import leak_tracker as leak_tracker_module
import iptables
import e2e_settings
from utils import get_random_message, get_random_dict, is_windows
import dev_utils.leak_tracker as leak_tracker_module
from dev_utils import test_env, get_random_message, get_random_dict, iptables
from utils import is_windows
from drop_fixtures import dropper # noqa: F401
from client_fixtures import ( # noqa: F401
@ -186,7 +185,7 @@ def pytest_runtest_setup(item):
"""
# reconnect in case a previously interrupted test run left our network disconnected
iptables.reconnect_all(test_config.config.transport, e2e_settings.IOTHUB_HOSTNAME)
iptables.reconnect_all(test_config.config.transport, test_env.IOTHUB_HOSTNAME)
# tests that use iptables need to be skipped on Windows
if is_windows():

Просмотреть файл

@ -2,7 +2,7 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import uuid
import e2e_settings
from dev_utils import test_env
import time
from azure.iot.hub import IoTHubRegistryManager
from base64 import b64encode, b64decode
@ -54,14 +54,14 @@ def create_device_with_symmetric_key():
desc.device_id = "00e2etest-delete-me-python-key-" + str(uuid.uuid4())
registry_manager = IoTHubRegistryManager.from_connection_string(
e2e_settings.IOTHUB_CONNECTION_STRING
test_env.IOTHUB_CONNECTION_STRING
)
dev = registry_manager.create_device_with_sas(desc.device_id, None, None, "enabled")
desc.primary_key = dev.authentication.symmetric_key.primary_key
desc.connection_string = (
"HostName="
+ e2e_settings.IOTHUB_HOSTNAME
+ test_env.IOTHUB_HOSTNAME
+ ";DeviceId="
+ desc.device_id
+ ";SharedAccessKey="
@ -76,13 +76,13 @@ def create_device_with_sas():
desc.device_id = "00e2etest-delete-me-python-sas-" + str(uuid.uuid4())
registry_manager = IoTHubRegistryManager.from_connection_string(
e2e_settings.IOTHUB_CONNECTION_STRING
test_env.IOTHUB_CONNECTION_STRING
)
dev = registry_manager.create_device_with_sas(desc.device_id, None, None, "enabled")
desc.primary_key = dev.authentication.symmetric_key.primary_key
uri = "{}/devices/{}".format(e2e_settings.IOTHUB_HOSTNAME, desc.device_id)
uri = "{}/devices/{}".format(test_env.IOTHUB_HOSTNAME, desc.device_id)
expiry = time.time() + 3600
desc.sas_token = generate_sas_token(uri, desc.primary_key, None, expiry)
@ -100,6 +100,6 @@ def create_device_with_x509_ca_signed_cert():
def delete_device(device_id):
registry_manager = IoTHubRegistryManager.from_connection_string(
e2e_settings.IOTHUB_CONNECTION_STRING
test_env.IOTHUB_CONNECTION_STRING
)
registry_manager.delete_device(device_id)

Просмотреть файл

@ -2,14 +2,16 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import pytest
import iptables
import logging
import e2e_settings
from dev_utils import iptables, test_env
logger = logging.getLogger(__name__)
iptables.reconnect_all("mqtt", e2e_settings.IOTHUB_HOSTNAME)
iptables.reconnect_all("mqttws", e2e_settings.IOTHUB_HOSTNAME)
print("reconnecting mqtt")
iptables.reconnect_all("mqtt", test_env.IOTHUB_HOSTNAME)
print("reconnecting mqttws")
iptables.reconnect_all("mqttws", test_env.IOTHUB_HOSTNAME)
print("Done")
class Dropper(object):
@ -17,18 +19,16 @@ class Dropper(object):
self.transport = transport
def disconnect_outgoing(self, disconnect_type):
iptables.disconnect_output_port(
disconnect_type, self.transport, e2e_settings.IOTHUB_HOSTNAME
)
iptables.disconnect_output_port(disconnect_type, self.transport, test_env.IOTHUB_HOSTNAME)
def drop_outgoing(self):
iptables.disconnect_output_port("DROP", self.transport, e2e_settings.IOTHUB_HOSTNAME)
iptables.disconnect_output_port("DROP", self.transport, test_env.IOTHUB_HOSTNAME)
def reject_outgoing(self):
iptables.disconnect_output_port("REJECT", self.transport, e2e_settings.IOTHUB_HOSTNAME)
iptables.disconnect_output_port("REJECT", self.transport, test_env.IOTHUB_HOSTNAME)
def restore_all(self):
iptables.reconnect_all(self.transport, e2e_settings.IOTHUB_HOSTNAME)
iptables.reconnect_all(self.transport, test_env.IOTHUB_HOSTNAME)
@pytest.fixture(scope="function")

Просмотреть файл

@ -1,61 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import six
import os
import json
if six.PY2:
FileNotFoundError = IOError
IOTHUB_CONNECTION_STRING = None
EVENTHUB_CONNECTION_STRING = None
IOTHUB_HOSTNAME = None
IOTHUB_NAME = None
EVENTHUB_CONSUMER_GROUP = None
def get_secrets():
    """Populate this module's connection-string globals from test settings.

    Search order:
    1. A `_e2e_settings.json` file in this file's directory or any ancestor directory.
    2. `IOTHUB_E2E_*` environment variables.
    3. Plain `IOTHUB_CONNECTION_STRING` / `EVENTHUB_CONNECTION_STRING` env vars.

    Also derives IOTHUB_HOSTNAME and IOTHUB_NAME from the connection string.
    """
    global IOTHUB_CONNECTION_STRING, EVENTHUB_CONNECTION_STRING, IOTHUB_HOSTNAME, IOTHUB_NAME, EVENTHUB_CONSUMER_GROUP

    secrets = None
    this_file_path = os.path.dirname(os.path.realpath(__file__))
    test_path = this_file_path

    # Walk up the directory tree until a settings file is found or the root is hit.
    while secrets is None:
        filename = os.path.join(test_path, "_e2e_settings.json")
        try:
            with open(filename, "r") as f:
                secrets = json.load(f)
            print("settings loaded from {}".format(filename))
        except FileNotFoundError:
            new_test_path = os.path.dirname(test_path)
            if new_test_path == test_path:
                # os.path.dirname is a no-op at the filesystem root, so stop here.
                break
            test_path = new_test_path

    if secrets:
        IOTHUB_CONNECTION_STRING = secrets.get("iothubConnectionString", None)
        EVENTHUB_CONNECTION_STRING = secrets.get("eventhubConnectionString", None)
        EVENTHUB_CONSUMER_GROUP = secrets.get("eventhubConsumerGroup", None)
    elif "IOTHUB_E2E_IOTHUB_CONNECTION_STRING" in os.environ:
        IOTHUB_CONNECTION_STRING = os.environ["IOTHUB_E2E_IOTHUB_CONNECTION_STRING"]
        EVENTHUB_CONNECTION_STRING = os.environ["IOTHUB_E2E_EVENTHUB_CONNECTION_STRING"]
        EVENTHUB_CONSUMER_GROUP = os.getenv("IOTHUB_E2E_EVENTHUB_CONSUMER_GROUP", None)
    else:
        IOTHUB_CONNECTION_STRING = os.environ["IOTHUB_CONNECTION_STRING"]
        EVENTHUB_CONNECTION_STRING = os.environ.get("EVENTHUB_CONNECTION_STRING")

    # Derive the hub hostname and short name from the "HostName=..." pair.
    parts = {}
    for key_and_value in IOTHUB_CONNECTION_STRING.split(";"):
        key, value = key_and_value.split("=", 1)
        parts[key] = value
    IOTHUB_HOSTNAME = parts["HostName"]
    IOTHUB_NAME = IOTHUB_HOSTNAME.split(".")[0]


get_secrets()

# Fall back to EventHub's default consumer group when none was configured.
if not EVENTHUB_CONSUMER_GROUP:
    EVENTHUB_CONSUMER_GROUP = "$default"

Просмотреть файл

@ -1,65 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
from service_helper_sync import ServiceHelperSync
class ServiceHelper:
    """Async facade over ServiceHelperSync.

    Each coroutine method delegates the same-named synchronous call to
    `self.executor` via `event_loop.run_in_executor`, so blocking service-side
    calls do not stall the event loop.
    """

    def __init__(self, event_loop, executor):
        # Event loop used to schedule executor work.
        self.event_loop = event_loop
        # Executor (e.g. a thread pool) that runs the blocking sync calls.
        self.executor = executor
        # The wrapped synchronous helper that does the actual work.
        self.inner_object = ServiceHelperSync()

    def set_identity(self, device_id, module_id):
        # Plain pass-through; not run in the executor (no await needed).
        return self.inner_object.set_identity(device_id, module_id)

    async def set_desired_properties(self, desired_props):
        """Delegate `set_desired_properties` to the sync helper in the executor."""
        return await self.event_loop.run_in_executor(
            self.executor,
            self.inner_object.set_desired_properties,
            desired_props,
        )

    async def invoke_method(
        self,
        method_name,
        payload,
        connect_timeout_in_seconds=None,
        response_timeout_in_seconds=None,
    ):
        """Delegate `invoke_method` to the sync helper in the executor."""
        return await self.event_loop.run_in_executor(
            self.executor,
            self.inner_object.invoke_method,
            method_name,
            payload,
            connect_timeout_in_seconds,
            response_timeout_in_seconds,
        )

    async def send_c2d(
        self,
        payload,
        properties,
    ):
        """Delegate `send_c2d` to the sync helper in the executor."""
        return await self.event_loop.run_in_executor(
            self.executor, self.inner_object.send_c2d, payload, properties
        )

    async def wait_for_eventhub_arrival(self, message_id, timeout=60):
        """Delegate `wait_for_eventhub_arrival` to the sync helper in the executor."""
        return await self.event_loop.run_in_executor(
            self.executor,
            self.inner_object.wait_for_eventhub_arrival,
            message_id,
            timeout,
        )

    async def get_next_reported_patch_arrival(self, block=True, timeout=20):
        """Delegate `get_next_reported_patch_arrival` to the sync helper in the executor."""
        return await self.event_loop.run_in_executor(
            self.executor,
            self.inner_object.get_next_reported_patch_arrival,
            block,
            timeout,
        )

    async def shutdown(self):
        """Delegate `shutdown` to the sync helper in the executor."""
        return await self.event_loop.run_in_executor(self.executor, self.inner_object.shutdown)

Просмотреть файл

@ -3,11 +3,10 @@
# license information.
import pytest
import time
import e2e_settings
from dev_utils import test_env, ServiceHelperSync
import logging
import datetime
from utils import create_client_object
from service_helper_sync import ServiceHelperSync
from azure.iot.device.iothub import IoTHubDeviceClient, IoTHubModuleClient
logger = logging.getLogger(__name__)
@ -21,7 +20,7 @@ def brand_new_client(device_identity, client_kwargs, service_helper, device_id,
# Keep this here. It is useful to see this info inside the inside devops pipeline test failures.
logger.info(
"Connecting device_id={}, module_id={}, to hub={} at {} (UTC)".format(
device_id, module_id, e2e_settings.IOTHUB_HOSTNAME, datetime.datetime.utcnow()
device_id, module_id, test_env.IOTHUB_HOSTNAME, datetime.datetime.utcnow()
)
)
@ -53,7 +52,11 @@ def client(brand_new_client):
@pytest.fixture(scope="module")
def service_helper():
service_helper = ServiceHelperSync()
service_helper = ServiceHelperSync(
iothub_connection_string=test_env.IOTHUB_CONNECTION_STRING,
eventhub_connection_string=test_env.EVENTHUB_CONNECTION_STRING,
eventhub_consumer_group=test_env.EVENTHUB_CONSUMER_GROUP,
)
time.sleep(3)
yield service_helper

Просмотреть файл

@ -5,7 +5,7 @@ import pytest
import logging
import json
import threading
from utils import get_random_dict
from dev_utils import get_random_dict
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)

Просмотреть файл

@ -4,7 +4,7 @@
import pytest
import logging
import time
from utils import get_random_dict
from dev_utils import get_random_dict
import parametrize
from azure.iot.device.iothub import MethodResponse

Просмотреть файл

@ -5,7 +5,7 @@ import pytest
import logging
import json
import time
import utils
import dev_utils
from azure.iot.device.exceptions import OperationCancelled, ClientError
logger = logging.getLogger(__name__)
@ -61,7 +61,7 @@ class TestSendMessage(object):
def test_sync_sends_json_string(self, client, service_helper, leak_tracker):
leak_tracker.set_initial_object_list()
message = json.dumps(utils.get_random_dict())
message = json.dumps(dev_utils.get_random_dict())
client.send_message(message)
@ -74,7 +74,7 @@ class TestSendMessage(object):
def test_sync_sends_random_string(self, client, service_helper, leak_tracker):
leak_tracker.set_initial_object_list()
message = utils.get_random_string(16)
message = dev_utils.get_random_string(16)
client.send_message(message)

Просмотреть файл

@ -6,7 +6,7 @@ import logging
import time
import const
import queue
from utils import get_random_dict
from dev_utils import get_random_dict
from azure.iot.device.exceptions import ClientError
logger = logging.getLogger(__name__)

Просмотреть файл

@ -1,13 +1,8 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
import random
import string
import json
import uuid
import const
import test_config
import e2e_settings
from dev_utils import test_env
import logging
import sys
from azure.iot.device.iothub import Message
@ -16,42 +11,6 @@ logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
def get_random_string(length, random_length=False):
    """Return a random string of uppercase ASCII letters and digits.

    If `random_length` is True, the result's length is chosen uniformly between
    0 and `length`; otherwise it is exactly `length`.
    """
    charset = string.ascii_uppercase + string.digits
    if random_length:
        length = random.randint(0, length)
    chars = [random.choice(charset) for _ in range(length)]
    return "".join(chars)


def get_random_dict(total_payload_length=0):
    """Return a small dict of random values.

    If `total_payload_length` is nonzero, pad the dict with an "extra" string so
    that `json.dumps` of the result is exactly that many characters long.
    """
    obj = {
        "random_guid": str(uuid.uuid4()),
        "sub_object": {
            "string_value": get_random_string(10),
            "bool_value": random.random() > 0.5,
            "int_value": random.randint(-65535, 65535),
        },
    }

    if total_payload_length:
        current_length = len(json.dumps(obj))
        # Account for the serialized overhead of the "extra" key itself.
        padding = total_payload_length - current_length - len(', "extra": ""')
        if padding > 0:
            obj["extra"] = get_random_string(padding)
        assert len(json.dumps(obj)) == total_payload_length

    return obj
def get_random_message(total_payload_length=0):
    """Return a telemetry `Message` carrying random JSON content.

    The message gets a fresh UUID `message_id` plus JSON content-type and
    content-encoding properties. `total_payload_length` is forwarded to
    `get_random_dict` to control the serialized payload size.
    """
    payload = json.dumps(get_random_dict(total_payload_length))
    message = Message(payload)
    message.message_id = str(uuid.uuid4())
    message.content_type = const.JSON_CONTENT_TYPE
    message.content_encoding = const.JSON_CONTENT_ENCODING
    return message
fault_injection_types = {
"KillTcp": " severs the TCP connection ",
"ShutDownMqtt": " cleanly shutdowns the MQTT connection ",
@ -102,7 +61,7 @@ def create_client_object(device_identity, client_kwargs, DeviceClass, ModuleClas
client = ClientClass.create_from_symmetric_key(
device_identity.primary_key,
e2e_settings.IOTHUB_HOSTNAME,
test_env.IOTHUB_HOSTNAME,
device_identity.device_id,
**client_kwargs
)