Pull over small fixes from stress branch (#247)
This commit is contained in:
Родитель
bacf3bb51d
Коммит
9befefb000
|
@ -19,6 +19,7 @@ compat eol=lf
|
|||
copyright eol=lf
|
||||
format eol=lf
|
||||
rules eol=lf
|
||||
bin/* eol=lf
|
||||
|
||||
# Denote all files that should not have line endings normalized, should not be
|
||||
# merged, and should not show in a textual diff.
|
||||
|
|
|
@ -200,7 +200,7 @@ def handle_command_args(args):
|
|||
print(Fore.GREEN + "Done. Deploy with the following command:")
|
||||
print(
|
||||
Fore.GREEN
|
||||
+ "horton deploy iothub --image {}:{}".format(
|
||||
+ "horton deploy iothub image {}:{}".format(
|
||||
tags.docker_full_image_name, tags.image_tags[0]
|
||||
)
|
||||
)
|
||||
|
|
|
@ -32,6 +32,7 @@ def add_edge_modules(testMod_image):
|
|||
settings.friend_module.module_id = "friendMod"
|
||||
settings.friend_module.connection_type = "environment"
|
||||
settings.friend_module.object_type = "iotedge_module"
|
||||
settings.friend_module.container_name = "friendMod"
|
||||
utilities.set_args_from_image(settings.friend_module, friendMod_image)
|
||||
|
||||
settings.test_module.host_port = testMod_host_port
|
||||
|
@ -39,6 +40,7 @@ def add_edge_modules(testMod_image):
|
|||
settings.test_module.module_id = "testMod"
|
||||
settings.test_module.connection_type = "environment"
|
||||
settings.test_module.object_type = "iotedge_module"
|
||||
settings.test_module.container_name = "testMod"
|
||||
utilities.set_args_from_image(settings.test_module, testMod_image)
|
||||
|
||||
|
||||
|
|
|
@ -41,6 +41,7 @@ def deploy_for_iotedge(testMod_image):
|
|||
settings.leaf_device.language = settings.test_module.language
|
||||
settings.leaf_device.host_port = settings.test_module.host_port
|
||||
settings.leaf_device.container_port = settings.test_module.container_port
|
||||
settings.leaf_device.container_name = settings.test_module.container_name
|
||||
settings.leaf_device.object_type = "leaf_device"
|
||||
|
||||
settings.net_control.test_destination = host
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
# Copyright (c) Microsoft. All rights reserved.
|
||||
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
|
||||
script_dir=$(cd "$(dirname "$0")" && pwd)
|
||||
root_dir=$(cd "${script_dir}/.." && pwd)
|
||||
|
||||
module_list="testMod friendMod edgeHub edgeAgent"
|
||||
|
||||
for mod in ${module_list}; do
|
||||
echo "getting log for $mod"
|
||||
sudo docker logs -t ${mod} &> ${mod}.log
|
||||
[ $? -eq 0 ] || { echo "No log for ${mod}"; }
|
||||
done
|
||||
|
||||
echo "merging logs"
|
||||
|
||||
args="-filterfile ${root_dir}/pyscripts/docker_log_processor_filters.json"
|
||||
for mod in ${module_list}; do
|
||||
if [ -f ${mod}.log ]; then
|
||||
args="${args} -staticfile ${mod}.log"
|
||||
fi
|
||||
done
|
||||
|
||||
python ${root_dir}/pyscripts/docker_log_processor.py $args > merged.log
|
||||
[ $? -eq 0 ] || { echo "error merging logs"; exit 1; }
|
||||
|
||||
echo "Done. Output is in ./merged.log"
|
||||
|
|
@ -5,10 +5,22 @@ import logging
|
|||
import heap_check
|
||||
from azure.iot.device import IoTHubModuleClient
|
||||
from azure.iot.device.common.pipeline import pipeline_stages_base
|
||||
from azure.iot.device.iothub.auth import base_renewable_token_authentication_provider
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
do_async = False
|
||||
sas_renewal_interval = None
|
||||
|
||||
# Length of time, in seconds, that a SAS token is valid for.
|
||||
ORIGINAL_DEFAULT_TOKEN_VALIDITY_PERIOD = (
|
||||
base_renewable_token_authentication_provider.DEFAULT_TOKEN_VALIDITY_PERIOD
|
||||
)
|
||||
|
||||
# Length of time, in seconds, before a token expires that we want to begin renewing it.
|
||||
ORIGINAL_DEFAULT_TOKEN_RENEWAL_MARGIN = (
|
||||
base_renewable_token_authentication_provider.DEFAULT_TOKEN_RENEWAL_MARGIN
|
||||
)
|
||||
|
||||
|
||||
def log_message(msg):
|
||||
|
@ -20,9 +32,15 @@ def log_message(msg):
|
|||
|
||||
def set_flags(flags):
|
||||
global do_async
|
||||
global sas_renewal_interval
|
||||
|
||||
logger.info("setting flags to {}".format(flags))
|
||||
if "test_async" in flags and flags["test_async"]:
|
||||
do_async = True
|
||||
# Resist the tempation to use getattr. We don't want to change flags that aren't populated.
|
||||
if "test_async" in flags:
|
||||
do_async = flags["test_async"]
|
||||
if "sas_renewal_interval" in flags:
|
||||
sas_renewal_interval = flags["sas_renewal_interval"]
|
||||
print("Setting sas_renewal_interval to {}".format(sas_renewal_interval))
|
||||
|
||||
|
||||
def get_capabilities():
|
||||
|
@ -48,3 +66,20 @@ def send_command(cmd):
|
|||
heap_check.assert_all_iothub_objects_have_been_collected()
|
||||
else:
|
||||
raise Exception("Unsupported Command")
|
||||
|
||||
|
||||
def set_sas_interval():
|
||||
global sas_renewal_interval
|
||||
print("Using sas_renewal_interval of {}".format(sas_renewal_interval))
|
||||
if sas_renewal_interval:
|
||||
base_renewable_token_authentication_provider.DEFAULT_TOKEN_VALIDITY_PERIOD = (
|
||||
sas_renewal_interval
|
||||
)
|
||||
base_renewable_token_authentication_provider.DEFAULT_TOKEN_RENEWAL_MARGIN = 10
|
||||
else:
|
||||
base_renewable_token_authentication_provider.DEFAULT_TOKEN_VALIDITY_PERIOD = (
|
||||
ORIGINAL_DEFAULT_TOKEN_VALIDITY_PERIOD
|
||||
)
|
||||
base_renewable_token_authentication_provider.DEFAULT_TOKEN_RENEWAL_MARGIN = (
|
||||
ORIGINAL_DEFAULT_TOKEN_RENEWAL_MARGIN
|
||||
)
|
||||
|
|
|
@ -36,6 +36,8 @@ class Connect(ConnectionStatus):
|
|||
|
||||
def create_from_connection_string(self, transport_type, connection_string, cert):
|
||||
|
||||
internal_control_glue.set_sas_interval()
|
||||
|
||||
kwargs = {}
|
||||
if transport_type == "mqttws":
|
||||
kwargs["websockets"] = True
|
||||
|
@ -88,6 +90,8 @@ class ConnectFromEnvironment(object):
|
|||
|
||||
def create_from_environment(self, transport_type):
|
||||
|
||||
internal_control_glue.set_sas_interval()
|
||||
|
||||
kwargs = {}
|
||||
if transport_type == "mqttws":
|
||||
kwargs["websockets"] = True
|
||||
|
|
|
@ -4,11 +4,11 @@
|
|||
import logging
|
||||
import convert
|
||||
import async_helper
|
||||
import internal_control_glue
|
||||
from connection_status import ConnectionStatus
|
||||
from azure.iot.device import MethodResponse
|
||||
from azure.iot.device.aio import IoTHubDeviceClient, IoTHubModuleClient
|
||||
from azure.iot.device.common import mqtt_transport
|
||||
from azure.iot.device.iothub.auth import base_renewable_token_authentication_provider
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -28,6 +28,8 @@ class Connect(ConnectionStatus):
|
|||
|
||||
def create_from_connection_string(self, transport_type, connection_string, cert):
|
||||
|
||||
internal_control_glue.set_sas_interval()
|
||||
|
||||
kwargs = {}
|
||||
if transport_type == "mqttws":
|
||||
kwargs["websockets"] = True
|
||||
|
@ -80,6 +82,8 @@ class ConnectFromEnvironment(object):
|
|||
|
||||
def create_from_environment(self, transport_type):
|
||||
|
||||
internal_control_glue.set_sas_interval()
|
||||
|
||||
kwargs = {}
|
||||
if transport_type == "mqttws":
|
||||
kwargs["websockets"] = True
|
||||
|
|
|
@ -31,12 +31,9 @@ mkdir -p $resultsdir
|
|||
pushd $resultsdir
|
||||
[ $? -eq 0 ] || { echo "pushd ${resultsdir} failed"; exit 1; }
|
||||
|
||||
echo "fetching docker logs for ${module_list}"
|
||||
for mod in ${module_list}; do
|
||||
echo "getting log for $mod"
|
||||
sudo docker logs -t ${mod} &> ${mod}.log
|
||||
[ $? -eq 0 ] || { echo "error fetching logs for ${mod}"; exit 1; }
|
||||
done
|
||||
echo "fetching docker logs"
|
||||
fetch-docker-logs
|
||||
[ $? -eq 0 ] || { echo "error fetching logs"; exit 1; }
|
||||
|
||||
if [ "${deployment_type}" -eq "iotedge" ]; then
|
||||
echo getting iotedged log
|
||||
|
@ -44,14 +41,6 @@ if [ "${deployment_type}" -eq "iotedge" ]; then
|
|||
[ $? -eq 0 ] || { echo "error fetching iotedged journal"; exit 1; }
|
||||
fi
|
||||
|
||||
echo "merging logs"
|
||||
args="-filterfile ${root_dir}/pyscripts/docker_log_processor_filters.json"
|
||||
for mod in ${module_list}; do
|
||||
args="${args} -staticfile ${mod}.log"
|
||||
done
|
||||
python ${root_dir}/pyscripts/docker_log_processor.py $args > merged.log
|
||||
[ $? -eq 0 ] || { echo "error merging logs"; exit 1; }
|
||||
|
||||
echo "saving original junit"
|
||||
cp "../TEST-${job_name}.xml" "./orig-TEST-${job_name}.xml"
|
||||
[ $? -eq 0 ] || { echo "error saving junit"; exit 1; }
|
||||
|
|
|
@ -12,7 +12,10 @@ if [ -f /etc/apt/sources.list.d/microsoft-prod.list ]; then
|
|||
fi
|
||||
|
||||
# Download the Microsoft repository GPG keys
|
||||
if [ $(lsb_release -is) == "Ubuntu" ]; then
|
||||
if [ $(lsb_release -is) == "LinuxMint" ] && [ $(lsb_release -rs) == "19.3" ]; then
|
||||
curl https://packages.microsoft.com/config/ubuntu/18.04/multiarch/prod.list > ./microsoft-prod.list
|
||||
[ $? -eq 0 ] || { colorecho $_red "curl failed"; exit 1; }
|
||||
elif [ $(lsb_release -is) == "Ubuntu" ]; then
|
||||
curl https://packages.microsoft.com/config/ubuntu/$(lsb_release -rs)/multiarch/prod.list > ./microsoft-prod.list
|
||||
[ $? -eq 0 ] || { colorecho $_red "curl failed"; exit 1; }
|
||||
elif [ $(lsb_release -is) == "Raspbian" ]; then
|
||||
|
|
|
@ -5,6 +5,10 @@
|
|||
# default timeout for all rest API calls, not otherwise specified
|
||||
default_api_timeout = 120
|
||||
|
||||
# timeout for control APIs. Separated from default_api_timeout because these API
|
||||
# calls aren't subject to network disconnection
|
||||
control_api_timeout = 60
|
||||
|
||||
# timeout for print_message calls
|
||||
print_message_timeout = 2
|
||||
|
||||
|
|
|
@ -11,6 +11,10 @@ emulate_async_executor = concurrent.futures.ThreadPoolExecutor(
|
|||
max_workers=32, thread_name_prefix="emulate_async"
|
||||
)
|
||||
|
||||
control_api_emulate_async_executor = concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=8, thread_name_prefix="control_api_emulate_async"
|
||||
)
|
||||
|
||||
|
||||
def get_running_loop():
|
||||
"""
|
||||
|
@ -29,7 +33,8 @@ def get_running_loop():
|
|||
|
||||
|
||||
def emulate_async(fn):
|
||||
"""Returns a coroutine function that calls a given function with emulated asynchronous
|
||||
"""
|
||||
Returns a coroutine function that calls a given function with emulated asynchronous
|
||||
behavior via use of mulithreading.
|
||||
|
||||
Can be applied as a decorator.
|
||||
|
@ -47,3 +52,29 @@ def emulate_async(fn):
|
|||
)
|
||||
|
||||
return async_fn_wrapper
|
||||
|
||||
|
||||
def control_api_emulate_async(fn):
|
||||
"""
|
||||
Returns a coroutine function that calls a given function with emulated asynchronous
|
||||
behavior via use of mulithreading.
|
||||
|
||||
Control APIs have their own threadpool. This is necessary because the emualte_async
|
||||
threadpool can become full, especially if the network is disconencted. We need
|
||||
control APIs to run so we can re-connect the network in this scenario.
|
||||
|
||||
Can be applied as a decorator.
|
||||
|
||||
:param fn: The sync function to be run in async.
|
||||
:returns: A coroutine function that will call the given sync function.
|
||||
"""
|
||||
|
||||
@functools.wraps(fn)
|
||||
async def async_fn_wrapper(*args, **kwargs):
|
||||
loop = get_running_loop()
|
||||
|
||||
return await loop.run_in_executor(
|
||||
control_api_emulate_async_executor, functools.partial(fn, *args, **kwargs)
|
||||
)
|
||||
|
||||
return async_fn_wrapper
|
||||
|
|
|
@ -7,7 +7,6 @@ from .generated.e2erestapi.azure_iot_end_to_end_test_wrapper_rest_api import (
|
|||
import msrest
|
||||
from .. import adapter_config
|
||||
from ..abstract_control_api import AbstractControlApi
|
||||
from ..decorators import emulate_async
|
||||
from .rest_decorators import log_entry_and_exit
|
||||
|
||||
rest_endpoints = None
|
||||
|
|
|
@ -8,7 +8,7 @@ import msrest
|
|||
from .. import adapter_config
|
||||
from ..abstract_net_api import AbstractNetApi
|
||||
from .rest_decorators import log_entry_and_exit
|
||||
from ..decorators import emulate_async
|
||||
from ..decorators import control_api_emulate_async
|
||||
|
||||
|
||||
class NetApi(AbstractNetApi):
|
||||
|
@ -19,35 +19,35 @@ class NetApi(AbstractNetApi):
|
|||
@log_entry_and_exit
|
||||
def set_destination_sync(self, ip, transport):
|
||||
self.rest_endpoint.set_destination(
|
||||
ip, transport, timeout=adapter_config.default_api_timeout
|
||||
ip, transport, timeout=adapter_config.control_api_timeout
|
||||
)
|
||||
|
||||
@emulate_async
|
||||
@control_api_emulate_async
|
||||
@log_entry_and_exit
|
||||
def disconnect(self, disconnect_type):
|
||||
self.rest_endpoint.disconnect(
|
||||
disconnect_type, timeout=adapter_config.default_api_timeout
|
||||
disconnect_type, timeout=adapter_config.control_api_timeout
|
||||
)
|
||||
|
||||
@emulate_async
|
||||
@control_api_emulate_async
|
||||
@log_entry_and_exit
|
||||
def reconnect(self):
|
||||
self.rest_endpoint.reconnect(timeout=adapter_config.default_api_timeout)
|
||||
self.rest_endpoint.reconnect(timeout=adapter_config.control_api_timeout)
|
||||
|
||||
@log_entry_and_exit
|
||||
def reconnect_sync(self):
|
||||
self.rest_endpoint.reconnect(timeout=adapter_config.default_api_timeout)
|
||||
self.rest_endpoint.reconnect(timeout=adapter_config.control_api_timeout)
|
||||
|
||||
@emulate_async
|
||||
@control_api_emulate_async
|
||||
@log_entry_and_exit
|
||||
def disconnect_after_c2d(self, disconnect_type):
|
||||
self.rest_endpoint.disconnect_after_c2d(
|
||||
disconnect_type, timeout=adapter_config.default_api_timeout
|
||||
disconnect_type, timeout=adapter_config.control_api_timeout
|
||||
)
|
||||
|
||||
@emulate_async
|
||||
@control_api_emulate_async
|
||||
@log_entry_and_exit
|
||||
def disconnect_after_d2c(self, disconnect_type):
|
||||
self.rest_endpoint.disconnect_after_d2c(
|
||||
disconnect_type, timeout=adapter_config.default_api_timeout
|
||||
disconnect_type, timeout=adapter_config.control_api_timeout
|
||||
)
|
||||
|
|
|
@ -40,6 +40,7 @@ def registry():
|
|||
|
||||
@pytest.fixture
|
||||
def friend():
|
||||
if settings.friend_module.adapter_address:
|
||||
friend_module = connections.get_module_client(settings.friend_module)
|
||||
yield friend_module
|
||||
logger(separator.format("friend module"))
|
||||
|
@ -47,6 +48,8 @@ def friend():
|
|||
friend_module.disconnect_sync()
|
||||
except Exception as e:
|
||||
logger("exception disconnecting friend module: {}".format(e))
|
||||
else:
|
||||
yield None
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
@ -62,6 +65,7 @@ def test_module():
|
|||
|
||||
@pytest.fixture
|
||||
def leaf_device():
|
||||
if settings.leaf_device.adapter_address:
|
||||
leaf_device = connections.get_device_client(settings.leaf_device)
|
||||
yield leaf_device
|
||||
logger(separator.format("leaf device"))
|
||||
|
@ -69,6 +73,8 @@ def leaf_device():
|
|||
leaf_device.disconnect_sync()
|
||||
except Exception as e:
|
||||
logger("exception disconnecting leaf device: {}".format(e))
|
||||
else:
|
||||
yield None
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
|
Загрузка…
Ссылка в новой задаче