New Capture Stderr (#667)
* alternate format * fix tests, add error handling
This commit is contained in:
Родитель
b4b09e143b
Коммит
b3fb80d1c2
|
@ -16,15 +16,61 @@ logger = get_logger(__name__)
|
|||
|
||||
|
||||
class EmbeddedCLI(object):
|
||||
def __init__(self, cli_ctx=None):
|
||||
"""
|
||||
An embedded CLI wrapper for easily invoking commands.
|
||||
|
||||
...
|
||||
|
||||
Attributes
|
||||
----------
|
||||
output : str
|
||||
The output of the last invoked cli command. If the last command failed or there were no runs,
|
||||
will return ""
|
||||
error_code : int
|
||||
Error code of the last invoked cli command. If no runs, will be 0.
|
||||
az_cli : AzCli
|
||||
The cli that will be used for invoking commands. Should be the default CLI.
|
||||
user_subscription : Optional[str]
|
||||
The invoker's subscription.
|
||||
capture_stderr : bool
|
||||
Flag to determine whether we capture (don't print) output from invoked commands, but raise errors
|
||||
when they occur.
|
||||
"""
|
||||
def __init__(self, cli_ctx=None, capture_stderr: bool = False):
|
||||
super(EmbeddedCLI, self).__init__()
|
||||
self.output = ""
|
||||
self.error_code = 0
|
||||
self.az_cli = get_default_cli()
|
||||
self.user_subscription = cli_ctx.data.get('subscription_id') if cli_ctx else None
|
||||
self.capture_stderr = capture_stderr
|
||||
|
||||
def invoke(self, command: str, subscription: str = None):
|
||||
def invoke(
|
||||
self, command: str, subscription: str = None, capture_stderr: Optional[bool] = None
|
||||
):
|
||||
"""
|
||||
Run a given command.
|
||||
|
||||
Note that if capture_stderr is True, any error during invocation will be raised.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
command : str
|
||||
The command to invoke. Note that the command should omit the `az` from the command.
|
||||
subscription : Optional[str]
|
||||
Subscription for when it needs to be different from the self.user_subscription. Takes
|
||||
precedence over self.user_subscription.
|
||||
capture_stderr : Optional[bool]
|
||||
Flag to determine whether we capture (don't print) output from invoked commands, but raise errors
|
||||
when they occur. Takes precedence over self.capture_stderr.
|
||||
"""
|
||||
output_file = StringIO()
|
||||
old_exception_handler = None
|
||||
|
||||
# if capture_stderr is defined, use that, otherwise default to self.capture_stderr
|
||||
if (capture_stderr is None and self.capture_stderr) or capture_stderr:
|
||||
# Stop exception from being logged
|
||||
old_exception_handler = self.az_cli.exception_handler
|
||||
self.az_cli.exception_handler = lambda _: None
|
||||
|
||||
command = self._ensure_json_output(command=command)
|
||||
# prioritize subscription passed into invoke
|
||||
|
@ -37,7 +83,6 @@ class EmbeddedCLI(object):
|
|||
command=command, subscription=self.user_subscription
|
||||
)
|
||||
|
||||
# TODO: Capture stderr?
|
||||
try:
|
||||
self.error_code = (
|
||||
self.az_cli.invoke(shlex.split(command), out_file=output_file) or 0
|
||||
|
@ -52,11 +97,23 @@ class EmbeddedCLI(object):
|
|||
self.error_code,
|
||||
self.output,
|
||||
)
|
||||
|
||||
if old_exception_handler:
|
||||
self.az_cli.exception_handler = old_exception_handler
|
||||
if self.get_error():
|
||||
raise self.get_error()
|
||||
|
||||
output_file.close()
|
||||
|
||||
return self
|
||||
|
||||
def as_json(self):
|
||||
"""
|
||||
Try to parse the result of the last invoked cli command as a json.
|
||||
|
||||
If the json cannot be parsed, the last invoked cli command must have failed. This will raise a
|
||||
CLIInternalError.
|
||||
"""
|
||||
try:
|
||||
return json.loads(self.output)
|
||||
except Exception:
|
||||
|
@ -67,14 +124,18 @@ class EmbeddedCLI(object):
|
|||
)
|
||||
|
||||
def success(self) -> bool:
|
||||
"""Return if last invoked cli command was a success."""
|
||||
logger.debug("Operation error code: %s", self.error_code)
|
||||
return self.error_code == 0
|
||||
|
||||
def get_error(self) -> Optional[Exception]:
|
||||
"""Return error from last invoked cli command."""
|
||||
return self.az_cli.result.error
|
||||
|
||||
def _ensure_json_output(self, command: str) -> str:
|
||||
"""Force invoked cli command to return a json."""
|
||||
return "{} -o json".format(command)
|
||||
|
||||
def _ensure_subscription(self, command: str, subscription: str) -> str:
|
||||
"""Add subscription to invoked cli command."""
|
||||
return "{} --subscription '{}'".format(command, subscription)
|
||||
|
|
|
@ -13,6 +13,7 @@ from azure.cli.core.azclierror import (
|
|||
InvalidArgumentValueError
|
||||
)
|
||||
from azext_iot.common.embedded_cli import EmbeddedCLI
|
||||
from azext_iot.common.utility import handle_service_exception
|
||||
from azext_iot.iothub.common import (
|
||||
BYTES_PER_MEGABYTE,
|
||||
FORCE_DELETE_WARNING,
|
||||
|
@ -25,6 +26,7 @@ from azext_iot.iothub.common import (
|
|||
from azext_iot.iothub.providers.base import IoTHubProvider
|
||||
from azext_iot.common._azure import parse_cosmos_db_connection_string
|
||||
from azure.mgmt.iothub.models import ManagedIdentity
|
||||
from azure.core.exceptions import HttpResponseError
|
||||
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
@ -212,12 +214,15 @@ class MessageEndpoint(IoTHubProvider):
|
|||
})
|
||||
endpoints.storage_containers.append(new_endpoint)
|
||||
|
||||
try:
|
||||
return self.discovery.client.begin_create_or_update(
|
||||
self.hub_resource.additional_properties["resourcegroup"],
|
||||
self.hub_resource.name,
|
||||
self.hub_resource,
|
||||
if_match=self.hub_resource.etag
|
||||
)
|
||||
except HttpResponseError as e:
|
||||
handle_service_exception(e)
|
||||
|
||||
def show(self, endpoint_name: str):
|
||||
endpoints = self.hub_resource.properties.routing.endpoints
|
||||
|
@ -358,12 +363,15 @@ class MessageEndpoint(IoTHubProvider):
|
|||
endpoints.cosmos_db_sql_collections = []
|
||||
endpoints.storage_containers = []
|
||||
|
||||
try:
|
||||
return self.discovery.client.begin_create_or_update(
|
||||
self.hub_resource.additional_properties["resourcegroup"],
|
||||
self.hub_resource.name,
|
||||
self.hub_resource,
|
||||
if_match=self.hub_resource.etag
|
||||
)
|
||||
except HttpResponseError as e:
|
||||
handle_service_exception(e)
|
||||
|
||||
|
||||
def get_eventhub_cstring(
|
||||
|
|
|
@ -7,9 +7,10 @@
|
|||
from typing import Optional
|
||||
from knack.log import get_logger
|
||||
from azure.cli.core.azclierror import ResourceNotFoundError
|
||||
from azext_iot.common.utility import process_json_arg
|
||||
from azext_iot.common.utility import handle_service_exception, process_json_arg
|
||||
from azext_iot.iothub.common import RouteSourceType
|
||||
from azext_iot.iothub.providers.base import IoTHubProvider
|
||||
from azure.core.exceptions import HttpResponseError
|
||||
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
@ -42,12 +43,15 @@ class MessageRoute(IoTHubProvider):
|
|||
}
|
||||
)
|
||||
|
||||
try:
|
||||
return self.discovery.client.begin_create_or_update(
|
||||
resource_group_name=self.hub_resource.additional_properties['resourcegroup'],
|
||||
resource_name=self.hub_resource.name,
|
||||
iot_hub_description=self.hub_resource,
|
||||
if_match=self.hub_resource.etag
|
||||
)
|
||||
except HttpResponseError as e:
|
||||
handle_service_exception(e)
|
||||
|
||||
def update(
|
||||
self,
|
||||
|
@ -63,12 +67,15 @@ class MessageRoute(IoTHubProvider):
|
|||
route.condition = route.condition if condition is None else condition
|
||||
route.is_enabled = route.is_enabled if enabled is None else enabled
|
||||
|
||||
try:
|
||||
return self.discovery.client.begin_create_or_update(
|
||||
resource_group_name=self.hub_resource.additional_properties['resourcegroup'],
|
||||
resource_name=self.hub_resource.name,
|
||||
iot_hub_description=self.hub_resource,
|
||||
if_match=self.hub_resource.etag
|
||||
)
|
||||
except HttpResponseError as e:
|
||||
handle_service_exception(e)
|
||||
|
||||
def show(self, route_name: str):
|
||||
routes = self.hub_resource.properties.routing.routes
|
||||
|
@ -92,12 +99,15 @@ class MessageRoute(IoTHubProvider):
|
|||
else:
|
||||
routing.routes = [route for route in routing.routes if route.source.lower() != source_type.lower()]
|
||||
|
||||
try:
|
||||
return self.discovery.client.begin_create_or_update(
|
||||
resource_group_name=self.hub_resource.additional_properties['resourcegroup'],
|
||||
resource_name=self.hub_resource.name,
|
||||
iot_hub_description=self.hub_resource,
|
||||
if_match=self.hub_resource.etag
|
||||
)
|
||||
except HttpResponseError as e:
|
||||
handle_service_exception(e)
|
||||
|
||||
def test(
|
||||
self,
|
||||
|
|
|
@ -5,7 +5,15 @@
|
|||
# --------------------------------------------------------------------------------------------
|
||||
|
||||
import os
|
||||
from azure.cli.core.azclierror import CLIInternalError
|
||||
from azure.cli.core.azclierror import (
|
||||
AzureResponseError,
|
||||
RequiredArgumentMissingError,
|
||||
ResourceNotFoundError,
|
||||
CLIInternalError,
|
||||
UnauthorizedError,
|
||||
InvalidArgumentValueError,
|
||||
)
|
||||
import pytest
|
||||
from azext_iot.common.embedded_cli import EmbeddedCLI
|
||||
from azext_iot.common.shared import EntityStatusType
|
||||
from azext_iot.tests.dps import DATAPLANE_AUTH_TYPES, clean_dps_dataplane
|
||||
|
@ -31,26 +39,40 @@ def test_dps_device_registration_symmetrickey_lifecycle(provisioned_iot_dps_modu
|
|||
group_id, device_id1, device_id2 = generate_names(count=3)
|
||||
|
||||
# Enrollment needs to be created
|
||||
registration_result = cli.invoke(
|
||||
with pytest.raises(ResourceNotFoundError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --dps-name {dps_name} -g {dps_rg} --group-id {group_id} "
|
||||
f"--registration-id {device_id1}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
assert registration_result.success() is False
|
||||
|
||||
# Cannot retrieve device credentials
|
||||
registration_result = cli.invoke(
|
||||
if auth_phase == "cstring":
|
||||
with pytest.raises(ResourceNotFoundError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --id-scope {id_scope} --group-id {group_id} "
|
||||
f"--registration-id {device_id1}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
else:
|
||||
with pytest.raises(RequiredArgumentMissingError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --id-scope {id_scope} --group-id {group_id} "
|
||||
f"--registration-id {device_id1}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
assert registration_result.success() is False
|
||||
|
||||
# Regular enrollment group
|
||||
keys = cli.invoke(
|
||||
|
@ -201,15 +223,16 @@ def test_dps_device_registration_symmetrickey_lifecycle(provisioned_iot_dps_modu
|
|||
compare_registrations(device2_registration, service_side)
|
||||
|
||||
# Cannot use group key as device key
|
||||
registration_result = cli.invoke(
|
||||
with pytest.raises(UnauthorizedError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --dps-name {dps_name} -g {dps_rg} --registration-id {device_id1} "
|
||||
f"--key {keys['primaryKey']}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
assert registration_result.success() is False
|
||||
|
||||
# Try with payload
|
||||
payload = {"Thermostat": {"$metadata": {}}}
|
||||
|
@ -253,15 +276,16 @@ def test_dps_device_registration_x509_lifecycle(provisioned_iot_dps_module):
|
|||
group_id = generate_names()
|
||||
|
||||
# Enrollment needs to be created
|
||||
registration_result = cli.invoke(
|
||||
with pytest.raises(UnauthorizedError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --dps-name {dps_name} -g {dps_rg} --registration-id {devices[0][0]} "
|
||||
f"--cp {devices[0][0] + CERT_ENDING} --kp {devices[0][0] + KEY_ENDING}",
|
||||
f"iot device registration create --dps-name {dps_name} -g {dps_rg} --registration-id "
|
||||
f"{devices[0][0]} --group-id {group_id} --cp {devices[0][0] + CERT_ENDING} --kp {devices[0][0] + KEY_ENDING}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
)
|
||||
assert registration_result.success() is False
|
||||
|
||||
# Create enrollment group
|
||||
cli.invoke(
|
||||
|
@ -274,14 +298,16 @@ def test_dps_device_registration_x509_lifecycle(provisioned_iot_dps_module):
|
|||
)
|
||||
|
||||
# Need to specify file - cannot retrieve need info from service
|
||||
registration_result = cli.invoke(
|
||||
with pytest.raises(InvalidArgumentValueError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --dps-name {dps_name} -g {dps_rg} --registration-id {devices[0][0]}",
|
||||
f"iot device registration create --dps-name {dps_name} -g {dps_rg} --registration-id {devices[0][0]} "
|
||||
f"--group-id {group_id}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
assert registration_result.success() is False
|
||||
|
||||
# Normal registration
|
||||
registration_states = []
|
||||
|
@ -390,14 +416,15 @@ def test_dps_device_registration_unlinked_hub(provisioned_iot_dps_no_hub_module)
|
|||
).as_json()
|
||||
|
||||
# registration throws error
|
||||
registration_result = cli.invoke(
|
||||
with pytest.raises(AzureResponseError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --id-scope {id_scope} --registration-id {device_id} --key {device_key}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
)
|
||||
assert registration_result.success() is False
|
||||
|
||||
# Can see registration
|
||||
show_registration_result = cli.invoke(
|
||||
|
@ -438,15 +465,16 @@ def test_dps_device_registration_disabled_enrollment(provisioned_iot_dps_module)
|
|||
raise AssertionError(f"Failed to create enrollment group with attestation-type {auth_phase}")
|
||||
|
||||
# Registration throws error
|
||||
registration_result = cli.invoke(
|
||||
with pytest.raises(AzureResponseError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --group-id {group_id} -g {dps_rg} --dps-name {dps_name} "
|
||||
f"--registration-id {device_id}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
assert registration_result.success() is False
|
||||
|
||||
# Can see registration
|
||||
registration = cli.invoke(
|
||||
|
|
|
@ -4,7 +4,15 @@
|
|||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
||||
from azure.cli.core.azclierror import CLIInternalError
|
||||
from azure.cli.core.azclierror import (
|
||||
AzureResponseError,
|
||||
RequiredArgumentMissingError,
|
||||
ResourceNotFoundError,
|
||||
CLIInternalError,
|
||||
UnauthorizedError,
|
||||
InvalidArgumentValueError
|
||||
)
|
||||
import pytest
|
||||
from azext_iot.common.embedded_cli import EmbeddedCLI
|
||||
from azext_iot.common.shared import EntityStatusType, AttestationType
|
||||
from azext_iot.tests.dps import (
|
||||
|
@ -32,24 +40,37 @@ def test_dps_device_registration_symmetrickey_lifecycle(provisioned_iot_dps_modu
|
|||
enrollment_id, device_id = generate_names(count=2)
|
||||
|
||||
# Enrollment needs to be created
|
||||
enrollment_result = cli.invoke(
|
||||
with pytest.raises(ResourceNotFoundError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --dps-name {dps_name} -g {dps_rg} --registration-id {enrollment_id}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
)
|
||||
assert enrollment_result.success() is False
|
||||
|
||||
# Cannot retrieve device credentials
|
||||
enrollment_result = cli.invoke(
|
||||
if auth_phase == "cstring":
|
||||
with pytest.raises(ResourceNotFoundError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --id-scope {id_scope} --registration-id {enrollment_id}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
else:
|
||||
with pytest.raises(RequiredArgumentMissingError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --id-scope {id_scope} --registration-id {enrollment_id}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
assert enrollment_result.success() is False
|
||||
|
||||
# Enrollment with no device id; deviceId becomes enrollmentId
|
||||
keys = cli.invoke(
|
||||
|
@ -112,14 +133,16 @@ def test_dps_device_registration_symmetrickey_lifecycle(provisioned_iot_dps_modu
|
|||
|
||||
# Unauthorized
|
||||
bad_key = keys["primaryKey"].replace(keys["primaryKey"][0], "")
|
||||
bad_registration = cli.invoke(
|
||||
|
||||
with pytest.raises((ValueError, UnauthorizedError)):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --id-scope {id_scope} --registration-id {enrollment_id} --key {bad_key}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
assert bad_registration.success() is False
|
||||
|
||||
# Try secondary key
|
||||
registration = cli.invoke(
|
||||
|
@ -232,14 +255,15 @@ def test_dps_device_registration_x509_lifecycle(provisioned_iot_dps_module):
|
|||
device_id = generate_names()
|
||||
|
||||
# Enrollment needs to be created
|
||||
enrollment_result = cli.invoke(
|
||||
with pytest.raises(ResourceNotFoundError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --dps-name {dps_name} -g {dps_rg} --registration-id {cert_name}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
)
|
||||
assert enrollment_result.success() is False
|
||||
|
||||
# Enrollment with no device id; deviceId becomes enrollmentId
|
||||
cli.invoke(
|
||||
|
@ -252,14 +276,15 @@ def test_dps_device_registration_x509_lifecycle(provisioned_iot_dps_module):
|
|||
)
|
||||
|
||||
# Need to specify file - cannot retrieve need info from service
|
||||
enrollment_result = cli.invoke(
|
||||
with pytest.raises(InvalidArgumentValueError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --dps-name {dps_name} -g {dps_rg} --registration-id {cert_name}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
)
|
||||
assert enrollment_result.success() is False
|
||||
|
||||
# Normal registration
|
||||
registration = cli.invoke(
|
||||
|
@ -412,16 +437,17 @@ def test_dps_device_registration_unlinked_hub(provisioned_iot_dps_no_hub_module)
|
|||
raise AssertionError(f"Failed to create enrollment with auth-type {auth_phase}")
|
||||
key = result.as_json()["attestation"]["symmetricKey"]["primaryKey"]
|
||||
|
||||
# registration throws error
|
||||
registration_result = cli.invoke(
|
||||
# registration throws
|
||||
with pytest.raises(AzureResponseError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create --id-scope {id_scope} --registration-id {enrollment_id} "
|
||||
f"--key {key}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
)
|
||||
assert registration_result.success() is False
|
||||
|
||||
# Can see registration
|
||||
show_registration_result = cli.invoke(
|
||||
|
@ -464,14 +490,15 @@ def test_dps_device_registration_disabled_enrollment(provisioned_iot_dps_module)
|
|||
raise AssertionError(f"Failed to create enrollment with attestation-type {attestation_type}")
|
||||
|
||||
# registration throws error
|
||||
registration_result = cli.invoke(
|
||||
with pytest.raises(AzureResponseError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot device registration create -g {dps_rg} --dps-name {dps_name} --registration-id {enrollment_id}",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
)
|
||||
assert registration_result.success() is False
|
||||
|
||||
# Can see registration
|
||||
registration = cli.invoke(
|
||||
|
|
|
@ -4,6 +4,8 @@
|
|||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
||||
import pytest
|
||||
from azure.cli.core.azclierror import BadRequestError
|
||||
from azext_iot.common.embedded_cli import EmbeddedCLI
|
||||
from azext_iot.common.shared import EntityStatusType, AttestationType, AllocationType, ReprovisionType
|
||||
from azext_iot.common.utility import generate_key
|
||||
|
@ -84,15 +86,16 @@ def test_dps_enrollment_group_x509_lifecycle(provisioned_iot_dps_module):
|
|||
# assert enrollment_show["attestation"]["x509"]
|
||||
|
||||
# Compute Device Key only works for symmetric key enrollment groups
|
||||
failure_command = cli.invoke(
|
||||
with pytest.raises(BadRequestError):
|
||||
cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
f"iot dps enrollment-group compute-device-key -g {dps_rg} --dps-name {dps_name} "
|
||||
f"--enrollment-id {enrollment_id} --registration-id myarbitrarydeviceId",
|
||||
auth_type=auth_phase,
|
||||
cstring=dps_cstring
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
)
|
||||
assert failure_command.success() is False
|
||||
|
||||
enrollment_update = cli.invoke(
|
||||
set_cmd_auth_type(
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
from typing import Optional
|
||||
import pytest
|
||||
from azure.cli.core.azclierror import BadRequestError
|
||||
from azext_iot.common.utility import ensure_iothub_sdk_min_version
|
||||
from azext_iot.iothub.common import AuthenticationType, RouteSourceType
|
||||
from azext_iot.common.embedded_cli import EmbeddedCLI
|
||||
|
@ -41,6 +42,13 @@ def test_iot_eventhub_endpoint_lifecycle(provisioned_event_hub_with_identity_mod
|
|||
endpoint_uri = "sb:" + event_hub_obj["namespace"]["serviceBusEndpoint"].split(":")[1]
|
||||
eventhub_cs = event_hub_obj["connectionString"]
|
||||
endpoint_names = generate_ep_names(3)
|
||||
# Ensure there are no endpoints
|
||||
cli.invoke(
|
||||
"iot hub message-endpoint delete -n {} -g {} -y -f".format(
|
||||
iot_hub, iot_rg
|
||||
)
|
||||
)
|
||||
|
||||
# use connection string - note how the connection string needs to have entity path and the
|
||||
# endpoint uri and path are left blank
|
||||
cli.invoke(
|
||||
|
@ -176,6 +184,12 @@ def test_iot_servicebus_endpoint_lifecycle(provisioned_service_bus_with_identity
|
|||
iot_rg = iot_hub_obj["resourcegroup"]
|
||||
iot_sub = iot_hub_obj["subscriptionid"]
|
||||
user_id = list(iot_hub_obj["identity"]["userAssignedIdentities"].keys())[0]
|
||||
# Ensure there are no endpoints
|
||||
cli.invoke(
|
||||
"iot hub message-endpoint delete -n {} -g {} -y -f".format(
|
||||
iot_hub, iot_rg
|
||||
)
|
||||
)
|
||||
|
||||
queue_instance = servicebus_obj["queue"]["name"]
|
||||
topic_instance = servicebus_obj["topic"]["name"]
|
||||
|
@ -456,6 +470,12 @@ def test_iot_storage_endpoint_lifecycle(provisioned_storage_with_identity_module
|
|||
iot_rg = iot_hub_obj["resourcegroup"]
|
||||
iot_sub = iot_hub_obj["subscriptionid"]
|
||||
user_id = list(iot_hub_obj["identity"]["userAssignedIdentities"].keys())[0]
|
||||
# Ensure there are no endpoints
|
||||
cli.invoke(
|
||||
"iot hub message-endpoint delete -n {} -g {} -y -f".format(
|
||||
iot_hub, iot_rg
|
||||
)
|
||||
)
|
||||
|
||||
endpoint_names = generate_ep_names(3)
|
||||
storage_cs = storage_obj["connectionString"]
|
||||
|
@ -623,6 +643,12 @@ def test_iot_cosmos_endpoint_lifecycle(provisioned_cosmosdb_with_identity_module
|
|||
iot_rg = iot_hub_obj["resourcegroup"]
|
||||
iot_sub = iot_hub_obj["subscriptionid"]
|
||||
user_id = list(iot_hub_obj["identity"]["userAssignedIdentities"].keys())[0]
|
||||
# Ensure there are no endpoints
|
||||
cli.invoke(
|
||||
"iot hub message-endpoint delete -n {} -g {} -y -f".format(
|
||||
iot_hub, iot_rg
|
||||
)
|
||||
)
|
||||
|
||||
cosmos_cstring = cosmosdb_obj["connectionString"]
|
||||
database = cosmosdb_obj["database"]["name"]
|
||||
|
@ -837,12 +863,13 @@ def test_iot_endpoint_force_delete(provisioned_service_bus_with_identity_module)
|
|||
)
|
||||
|
||||
# try delete with name without force
|
||||
delete_result = cli.invoke(
|
||||
with pytest.raises(BadRequestError):
|
||||
cli.invoke(
|
||||
"iot hub message-endpoint delete -n {} -g {} --en {} -y".format(
|
||||
iot_hub, iot_rg, endpoint_names[0],
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
)
|
||||
assert delete_result.success() is False
|
||||
|
||||
# delete with name force
|
||||
delete_result = cli.invoke(
|
||||
|
@ -910,12 +937,13 @@ def test_iot_endpoint_force_delete(provisioned_service_bus_with_identity_module)
|
|||
)
|
||||
|
||||
# delete by endpoint type without force
|
||||
delete_result = cli.invoke(
|
||||
with pytest.raises(BadRequestError):
|
||||
cli.invoke(
|
||||
"iot hub message-endpoint delete -n {} -g {} -t {} -y".format(
|
||||
iot_hub, iot_rg, "servicebus-topic",
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
)
|
||||
assert delete_result.success() is False
|
||||
|
||||
# delete by endpoint type with force
|
||||
delete_result = cli.invoke(
|
||||
|
@ -987,12 +1015,14 @@ def test_iot_endpoint_force_delete(provisioned_service_bus_with_identity_module)
|
|||
)
|
||||
|
||||
# delete all endpoints without force
|
||||
delete_result = cli.invoke(
|
||||
|
||||
with pytest.raises(BadRequestError):
|
||||
cli.invoke(
|
||||
"iot hub message-endpoint delete -n {} -g {} -y".format(
|
||||
iot_hub, iot_rg,
|
||||
),
|
||||
capture_stderr=True
|
||||
)
|
||||
)
|
||||
assert delete_result.success() is False
|
||||
|
||||
# delete all endpoints with force
|
||||
delete_result = cli.invoke(
|
||||
|
|
|
@ -11,6 +11,7 @@ import sys
|
|||
|
||||
from unittest import mock
|
||||
from knack.util import CLIError
|
||||
from azure.cli.core.azclierror import CLIInternalError
|
||||
from azure.cli.core.extension import get_extension_path
|
||||
from azext_iot.common.utility import (
|
||||
handle_service_exception,
|
||||
|
@ -321,40 +322,36 @@ class TestVersionComparison(object):
|
|||
|
||||
|
||||
class TestEmbeddedCli(object):
|
||||
@pytest.fixture(params=[0, 1])
|
||||
@pytest.fixture(params=[0, 1, 2])
|
||||
def mocked_azclient(self, mocker, request):
|
||||
azclient = mocker.patch("azext_iot.common.embedded_cli.get_default_cli")
|
||||
|
||||
def mock_invoke(args, out_file):
|
||||
azclient.return_value.exception_handler("Generic Issue")
|
||||
azclient.return_value.result.error = None
|
||||
if request.param == 0:
|
||||
out_file.write(json.dumps({"generickey": "genericvalue"}))
|
||||
else:
|
||||
out_file.write("Something not json")
|
||||
if request.param == 1:
|
||||
azclient.return_value.result.error = CLIError("Generic Error")
|
||||
|
||||
return request.param
|
||||
|
||||
azclient = mocker.patch("azext_iot.common.embedded_cli.get_default_cli")
|
||||
azclient.return_value.invoke.side_effect = mock_invoke
|
||||
azclient.test_meta.error_code = request.param
|
||||
return azclient
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"command, user_subscription, subscription",
|
||||
[
|
||||
("iot hub device-identity create -n abcd -d dcba", None, None),
|
||||
(
|
||||
"iot hub device-twin show -n 'abcd' -d 'dcba'",
|
||||
"20a300e5-a444-4130-bb5a-1abd08ad930a",
|
||||
None,
|
||||
),
|
||||
(
|
||||
@pytest.mark.parametrize("command", [
|
||||
"iot hub device-identity create -n abcd -d dcba",
|
||||
None,
|
||||
"20a300e5-a444-4130-bb5a-1abd08ad930a",
|
||||
),
|
||||
(
|
||||
"iot hub device-twin show -n 'abcd' -d 'dcba'",
|
||||
"20a300e5-a444-4130-bb5a-1abd08ad930a",
|
||||
"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
|
||||
),
|
||||
],
|
||||
)
|
||||
"iot hub device-twin show -n 'abcd' -d 'dcba'"
|
||||
])
|
||||
@pytest.mark.parametrize("user_subscription", [None, "20a300e5-a444-4130-bb5a-1abd08ad930a"])
|
||||
@pytest.mark.parametrize("subscription", [None, "40a300e5-4130-a444-bb5a-1abd08ad930a"])
|
||||
@pytest.mark.parametrize("init_capture_stderr", [True, False])
|
||||
@pytest.mark.parametrize("capture_stderr", [None, True, False])
|
||||
def test_embedded_cli(
|
||||
self, mocker, mocked_azclient, command, user_subscription, subscription
|
||||
self, mocker, mocked_azclient, command, user_subscription, subscription, init_capture_stderr, capture_stderr
|
||||
):
|
||||
import shlex
|
||||
|
||||
|
@ -362,8 +359,18 @@ class TestEmbeddedCli(object):
|
|||
cli_ctx.data = {}
|
||||
if user_subscription:
|
||||
cli_ctx.data["subscription_id"] = user_subscription
|
||||
cli = EmbeddedCLI(cli_ctx)
|
||||
cli.invoke(command=command, subscription=subscription)
|
||||
|
||||
expected_count = 0 if (capture_stderr is None and init_capture_stderr) or capture_stderr else 1
|
||||
cli = EmbeddedCLI(cli_ctx, capture_stderr=init_capture_stderr)
|
||||
|
||||
if mocked_azclient.test_meta.error_code != 1 or expected_count == 1:
|
||||
cli.invoke(command=command, subscription=subscription, capture_stderr=capture_stderr)
|
||||
else:
|
||||
with pytest.raises(CLIError) as e:
|
||||
cli.invoke(command=command, subscription=subscription, capture_stderr=capture_stderr)
|
||||
assert "Generic Error" in str(e.value)
|
||||
|
||||
assert cli.az_cli.exception_handler.call_count == expected_count
|
||||
|
||||
# Due to forced json output
|
||||
command += " -o json"
|
||||
|
@ -377,14 +384,16 @@ class TestEmbeddedCli(object):
|
|||
call = mocked_azclient().invoke.call_args_list[0]
|
||||
actual_args, _ = call
|
||||
assert expected_args == actual_args[0]
|
||||
assert cli.output
|
||||
success = cli.success()
|
||||
|
||||
if mocked_azclient.test_meta.error_code == 1:
|
||||
if mocked_azclient.test_meta.error_code > 0:
|
||||
assert not success
|
||||
if mocked_azclient.test_meta.error_code == 2:
|
||||
with pytest.raises(CLIInternalError) as e:
|
||||
cli.as_json()
|
||||
assert "Issue parsing received payload" in str(e.value)
|
||||
elif mocked_azclient.test_meta.error_code == 0:
|
||||
assert success
|
||||
|
||||
assert cli.output
|
||||
assert cli.as_json()
|
||||
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче