зеркало из
1
0
Форкнуть 0
This commit is contained in:
McCoy Patiño 2023-12-13 09:40:03 -08:00 коммит произвёл GitHub
Родитель 630b103822
Коммит ad5ddf4e71
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
28 изменённых файлов: 66 добавлений и 605 удалений

Просмотреть файл

@ -2,10 +2,10 @@ import asyncio
from unittest.mock import Mock
from azure.core.credentials import AccessToken
from devtools_testutils import AzureMgmtTestCase
from devtools_testutils import AzureMgmtRecordedTestCase
class AzureMgmtAsyncTestCase(AzureMgmtTestCase):
class AzureMgmtAsyncTestCase(AzureMgmtRecordedTestCase):
def setUp(self):
super(AzureMgmtAsyncTestCase, self).setUp()

Просмотреть файл

@ -2,9 +2,9 @@ import asyncio
from unittest.mock import Mock
from azure.core.credentials import AccessToken
from devtools_testutils import AzureMgmtTestCase
from devtools_testutils import AzureMgmtRecordedTestCase
class AzureMgmtAsyncTestCase(AzureMgmtTestCase):
class AzureMgmtAsyncTestCase(AzureMgmtRecordedTestCase):
def setUp(self):
super(AzureMgmtAsyncTestCase, self).setUp()

Просмотреть файл

@ -1,14 +0,0 @@
# coding: utf-8
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from devtools_testutils import AzureTestCase
from azure.developer.devcenter.aio import DevCenterClient
class DevcenterAsyncTest(AzureTestCase):
def create_client(self, endpoint):
credential = self.get_credential(DevCenterClient)
return DevCenterClient(endpoint=endpoint, credential=credential)

Просмотреть файл

@ -5,7 +5,7 @@
# license information.
# --------------------------------------------------------------------------
import functools
from devtools_testutils import AzureTestCase, AzureRecordedTestCase, EnvironmentVariableLoader
from devtools_testutils import AzureRecordedTestCase, EnvironmentVariableLoader
from azure.defender.easm import EasmClient
import datetime

Просмотреть файл

@ -2,9 +2,9 @@ import asyncio
from unittest.mock import Mock
from azure.core.credentials import AccessToken
from devtools_testutils import AzureMgmtTestCase
from devtools_testutils import AzureMgmtRecordedTestCase
class AzureMgmtAsyncTestCase(AzureMgmtTestCase):
class AzureMgmtAsyncTestCase(AzureMgmtRecordedTestCase):
def setUp(self):
super(AzureMgmtAsyncTestCase, self).setUp()

Просмотреть файл

@ -10,12 +10,12 @@ import unittest
import azure.mgmt.hanaonazure.models
from devtools_testutils import AzureMgmtTestCase, ResourceGroupPreparer
from devtools_testutils import AzureMgmtRecordedTestCase, ResourceGroupPreparer
class MgmtHanaOnAzureTest(AzureMgmtTestCase):
class TestMgmtHanaOnAzure(AzureMgmtRecordedTestCase):
def setUp(self):
super(MgmtHanaOnAzureTest, self).setUp()
super(TestMgmtHanaOnAzure, self).setUp()
self.hanaonazure_client = self.create_mgmt_client(
azure.mgmt.hanaonazure.HanaManagementClient

Просмотреть файл

@ -1,11 +1,11 @@
from azure.mgmt.managementpartner import ACEProvisioningManagementPartnerAPI
from azure.mgmt.managementpartner.models import PartnerResponse
from devtools_testutils import AzureMgmtTestCase
from devtools_testutils import AzureMgmtRecordedTestCase
import unittest
@unittest.skip("hard to test")
class MgmtPartnerTest(AzureMgmtTestCase):
class TestMgmtPartner(AzureMgmtRecordedTestCase):
def _validate_partner(self, PartnerResponse):
self.assertIsNotNone(PartnerResponse)
@ -20,7 +20,7 @@ class MgmtPartnerTest(AzureMgmtTestCase):
self.assertIsNotNone(PartnerResponse.version)
def setUp(self):
super(MgmtPartnerTest, self).setUp()
super(TestMgmtPartner, self).setUp()
self.managementpartner_client = self.create_basic_client(ACEProvisioningManagementPartnerAPI)
def test_managementpartner_get(self):

Просмотреть файл

@ -2,28 +2,25 @@ import sys
import pytest
import unittest
from unittest.mock import Mock
from devtools_testutils import AzureTestCase
from azure.core.credentials import AzureKeyCredential
from azure.maps.search.models import StructuredAddress
from azure.maps.search import MapsSearchClient
from azure.maps.search._shared import converter
# cSpell:disable
class AzureMapsSearchClientUnitTest(AzureTestCase):
def test_fuzzy_search_invalid_top():
client = MapsSearchClient(
credential=Mock(AzureKeyCredential)
)
with pytest.raises(TypeError):
client.search_point_of_interest_category(StructuredAddress())
def test_fuzzy_search_invalid_top(self):
client = MapsSearchClient(
credential=Mock(AzureKeyCredential)
)
with pytest.raises(TypeError):
client.search_point_of_interest_category(StructuredAddress())
def test_search_structured_address(self):
client = MapsSearchClient(
credential=Mock(AzureKeyCredential)
)
with pytest.raises(TypeError):
client.search_structured_address(StructuredAddress())
def test_search_structured_address():
client = MapsSearchClient(
credential=Mock(AzureKeyCredential)
)
with pytest.raises(TypeError):
client.search_structured_address(StructuredAddress())
class TestConverter(unittest.TestCase):

Просмотреть файл

@ -11,7 +11,7 @@ import azure.mgmt.media
import azure.mgmt.storage
from devtools_testutils import (
AzureMgmtTestCase, ResourceGroupPreparer,
AzureMgmtRecordedTestCase, ResourceGroupPreparer,
StorageAccountPreparer, FakeStorageAccount,
)
@ -28,7 +28,7 @@ FAKE_STORAGE = FakeStorageAccount(
)
raise unittest.SkipTest("Skipping all tests")
class MgmtMediaTest(AzureMgmtTestCase):
class TestMgmtMedia(AzureMgmtRecordedTestCase):
def setUp(self):
super(MgmtMediaTest, self).setUp()

Просмотреть файл

@ -2,9 +2,9 @@ import asyncio
from unittest.mock import Mock
from azure.core.credentials import AccessToken
from devtools_testutils import AzureMgmtTestCase
from devtools_testutils import AzureMgmtRecordedTestCase
class AzureMgmtAsyncTestCase(AzureMgmtTestCase):
class AzureMgmtAsyncTestCase(AzureMgmtRecordedTestCase):
def setUp(self):
super(AzureMgmtAsyncTestCase, self).setUp()

Просмотреть файл

@ -1,6 +1,8 @@
import pytest
from azure.mgmt.reservations import AzureReservationAPI
from azure.mgmt.reservations.models import *
from devtools_testutils import AzureMgmtTestCase
from devtools_testutils import AzureMgmtRecordedTestCase
from azure.mgmt.reservations.models import (
ReservedResourceType,
InstanceFlexibility,
@ -12,11 +14,10 @@ import unittest
# change the custom endpoint to set the environment
_CUSTOM_ENDPOINT = "https://api-dogfood.resources.windows-int.net/"
@unittest.skip("skip test")
class MgmtReservationsTest(AzureMgmtTestCase):
@pytest.mark.skip("skip test")
class TestMgmtReservations(AzureMgmtRecordedTestCase):
def setUp(self):
super(MgmtReservationsTest, self).setUp()
def setup_method(self, _):
self.reservation_client = self.create_basic_client(AzureReservationAPI, base_url=_CUSTOM_ENDPOINT)
# self.reservation_client = self.create_basic_client(AzureReservationAPI)

Просмотреть файл

@ -2,9 +2,9 @@ import asyncio
from unittest.mock import Mock
from azure.core.credentials import AccessToken
from devtools_testutils import AzureMgmtTestCase
from devtools_testutils import AzureMgmtRecordedTestCase
class AzureMgmtAsyncTestCase(AzureMgmtTestCase):
class AzureMgmtAsyncTestCase(AzureMgmtRecordedTestCase):
def setUp(self):
super(AzureMgmtAsyncTestCase, self).setUp()

Просмотреть файл

@ -10,19 +10,20 @@
# applications: 10/10
# application_definitions: 7/7
import pytest
import unittest
import azure.mgmt.resource
# import azure.mgmt.managementgroups
import azure.mgmt.resource.resources.v2019_10_01
from azure.core.exceptions import HttpResponseError
from devtools_testutils import AzureMgmtTestCase, RandomNameResourceGroupPreparer
from devtools_testutils import AzureMgmtRecordedTestCase, RandomNameResourceGroupPreparer
@unittest.skip("Hard to test, skip them")
class MgmtResourceLinksTest(AzureMgmtTestCase):
@pytest.mark.skip("Hard to test, skip them")
class TestMgmtResourceLinks(AzureMgmtRecordedTestCase):
def setUp(self):
super(MgmtResourceLinksTest, self).setUp()
super(TestMgmtResourceLinks, self).setUp()
self.mgmt_client = self.create_mgmt_client(
azure.mgmt.resource.ApplicationClient
)

Просмотреть файл

@ -5,17 +5,18 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import pytest
import unittest
from azure.mgmt.resourcegraph import ResourceGraphClient
from azure.mgmt.resourcegraph.models import *
from devtools_testutils import AzureMgmtTestCase
from devtools_testutils import AzureMgmtRecordedTestCase
@unittest.skip("The test doesn't work.")
class MgmtResourceGraphTest(AzureMgmtTestCase):
@pytest.mark.skip("The test doesn't work.")
class TestMgmtResourceGraph(AzureMgmtRecordedTestCase):
def setUp(self):
super(MgmtResourceGraphTest, self).setUp()
super(TestMgmtResourceGraph, self).setUp()
self.resourcegraph_client = self.create_basic_client(
ResourceGraphClient
)

Просмотреть файл

@ -30,14 +30,12 @@ import json
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.schemaregistry.encoder.avroencoder import InvalidContentError, InvalidSchemaError
from devtools_testutils import AzureRecordedTestCase, EnvironmentVariableLoader, recorded_by_proxy, AzureTestCase
from devtools_testutils import AzureRecordedTestCase, EnvironmentVariableLoader, recorded_by_proxy
import avro
from avro.errors import AvroTypeException
from azure.schemaregistry.encoder.avroencoder._apache_avro_encoder import ApacheAvroObjectEncoder as AvroObjectEncoder
from devtools_testutils import AzureTestCase
SchemaRegistryEnvironmentVariableLoader = functools.partial(
EnvironmentVariableLoader,
"schemaregistry",

Просмотреть файл

@ -40,11 +40,11 @@ from azure.schemaregistry.serializer.avroserializer._apache_avro_serializer impo
from azure.identity import ClientSecretCredential
from azure.core.exceptions import ClientAuthenticationError, ServiceRequestError, HttpResponseError
from devtools_testutils import AzureTestCase, PowerShellPreparer
from devtools_testutils import AzureRecordedTestCase, PowerShellPreparer
SchemaRegistryPowerShellPreparer = functools.partial(PowerShellPreparer, "schemaregistry", schemaregistry_fully_qualified_namespace="fake_resource.servicebus.windows.net/", schemaregistry_group="fakegroup")
class AvroSerializerTests(AzureTestCase):
class TestAvroSerializer(AzureRecordedTestCase):
def test_raw_avro_serializer(self):
schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}"""

Просмотреть файл

@ -40,11 +40,11 @@ from azure.schemaregistry.serializer.avroserializer.exceptions import SchemaPars
from azure.identity.aio import ClientSecretCredential
from azure.core.exceptions import ClientAuthenticationError, ServiceRequestError, HttpResponseError
from devtools_testutils import AzureTestCase, PowerShellPreparer
from devtools_testutils import AzureRecordedTestCase, PowerShellPreparer
SchemaRegistryPowerShellPreparer = functools.partial(PowerShellPreparer, "schemaregistry", schemaregistry_fully_qualified_namespace="fake_resource.servicebus.windows.net/", schemaregistry_group="fakegroup")
class AvroSerializerAsyncTests(AzureTestCase):
class TestAvroSerializerAsync(AzureRecordedTestCase):
def create_client(self, fully_qualified_namespace):
credential = self.get_credential(SchemaRegistryClient, is_async=True)

Просмотреть файл

@ -60,7 +60,7 @@ from azure.servicebus.exceptions import (
MessageSizeExceededError,
OperationTimeoutError
)
from devtools_testutils import AzureMgmtRecordedTestCase, AzureTestCase
from devtools_testutils import AzureMgmtRecordedTestCase, AzureRecordedTestCase
from servicebus_preparer import (
CachedServiceBusNamespacePreparer,
CachedServiceBusQueuePreparer,
@ -1756,7 +1756,7 @@ class TestServiceBusQueueAsync(AzureMgmtRecordedTestCase):
await receiver.complete_message(messages[0])
@pytest.mark.skip('hard to test')
@AzureTestCase.await_prepared_test
@AzureRecordedTestCase.await_prepared_test
async def test_async_queue_mock_auto_lock_renew_callback(self):
# A warning to future devs: If the renew period override heuristic in registration
# ever changes, it may break this (since it adjusts renew period if it is not short enough)
@ -1839,7 +1839,7 @@ class TestServiceBusQueueAsync(AzureMgmtRecordedTestCase):
assert not results
assert not errors
@AzureTestCase.await_prepared_test
@AzureRecordedTestCase.await_prepared_test
async def test_async_queue_mock_no_reusing_auto_lock_renew(self):
auto_lock_renew = AutoLockRenewer()
auto_lock_renew._renew_period = 1

Просмотреть файл

@ -2,9 +2,9 @@ import asyncio
from unittest.mock import Mock
from azure.core.credentials import AccessToken
from devtools_testutils import AzureMgmtTestCase
from devtools_testutils import AzureMgmtRecordedTestCase
class AzureMgmtAsyncTestCase(AzureMgmtTestCase):
class AzureMgmtAsyncTestCase(AzureMgmtRecordedTestCase):
def setUp(self):
super(AzureMgmtAsyncTestCase, self).setUp()

Просмотреть файл

@ -5,14 +5,11 @@
# license information.
# --------------------------------------------------------------------------
import functools
from devtools_testutils import AzureTestCase, PowerShellPreparer
from devtools_testutils import AzureRecordedTestCase, PowerShellPreparer
from azure.messaging.webpubsubservice import WebPubSubServiceClient
class WebpubsubTest(AzureTestCase):
def __init__(self, method_name, **kwargs):
super(WebpubsubTest, self).__init__(method_name, **kwargs)
class WebpubsubTest(AzureRecordedTestCase):
def create_client(self, endpoint=None, hub=None, reverse_proxy_endpoint=None, **kwargs):
if kwargs.get("connection_string"):
return WebPubSubServiceClient.from_connection_string(kwargs.pop("connection_string"), hub, **kwargs)

Просмотреть файл

@ -4,14 +4,11 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from devtools_testutils import AzureTestCase
from devtools_testutils import AzureRecordedTestCase
from azure.messaging.webpubsubservice.aio import WebPubSubServiceClient
class WebpubsubAsyncTest(AzureTestCase):
def __init__(self, method_name, **kwargs):
super(WebpubsubAsyncTest, self).__init__(method_name, **kwargs)
class WebpubsubAsyncTest(AzureRecordedTestCase):
def create_client(self, endpoint=None, hub=None, reverse_proxy_endpoint=None, **kwargs):
if kwargs.get("connection_string"):
return WebPubSubServiceClient.from_connection_string(kwargs.pop("connection_string"), hub, **kwargs)

Просмотреть файл

@ -3,7 +3,7 @@
This package is intended for usage in direct combination with the azure-sdk-for-python repo. It provides:
- Common test classes and functionality
- `AzureTestCase` used for common record/playback functionality
- `AzureRecordedTestCase` used for common record/playback functionality
- `EnvironmentVariablePreparer` to allow recorded tests access to `New-TestResources.ps1`-created resources.
- Test-Proxy Shim/Startup capabilities
- `Build` entrypoint

Просмотреть файл

@ -1,7 +1,7 @@
from .mgmt_testcase import AzureMgmtTestCase, AzureMgmtPreparer
from .mgmt_testcase import AzureMgmtPreparer
from .mgmt_recorded_testcase import AzureMgmtRecordedTestCase
from .azure_recorded_testcase import AzureRecordedTestCase
from .azure_testcase import AzureTestCase, is_live, get_region_override
from .azure_testcase import is_live, get_region_override
from .resource_testcase import (
FakeResource,
ResourceGroupPreparer,
@ -72,7 +72,6 @@ __all__ = [
"add_uri_regex_sanitizer",
"add_uri_string_sanitizer",
"add_uri_subscription_id_sanitizer",
"AzureMgmtTestCase",
"AzureMgmtPreparer",
"AzureMgmtRecordedTestCase",
"AzureRecordedTestCase",
@ -84,7 +83,6 @@ __all__ = [
"BlobAccountPreparer",
"CachedStorageAccountPreparer",
"FakeStorageAccount",
"AzureTestCase",
"is_live",
"get_region_override",
"RandomNameResourceGroupPreparer",

Просмотреть файл

@ -3,46 +3,13 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import functools
import inspect
import logging
import os.path
import sys
import time
import zlib
try:
from inspect import getfullargspec as get_arg_spec
except ImportError:
from inspect import getargspec as get_arg_spec
try:
from urllib.parse import quote
except ImportError:
from urllib2 import quote # type: ignore
import pytest
from dotenv import load_dotenv, find_dotenv
from azure_devtools.scenario_tests import (
ReplayableTest,
AzureTestError,
GeneralNameReplacer,
RequestUrlNormalizer,
AuthenticationMetadataFilter,
OAuthRequestResponsesFilter,
)
from azure_devtools.scenario_tests.config import TestConfig
from azure_devtools.scenario_tests.utilities import trim_kwargs_from_test_function
from .config import TEST_SETTING_FILENAME
from . import mgmt_settings_fake as fake_settings
try:
# Try to import the AsyncFakeCredential, if we cannot assume it is Python 2
from .fake_credentials_async import AsyncFakeCredential
except SyntaxError:
pass
class HttpStatusCode(object):
@ -98,277 +65,5 @@ def _is_autorest_v3(client_class):
Could be refined later if necessary.
"""
args = get_arg_spec(client_class.__init__).args
args = inspect.getfullargspec(client_class.__init__).args
return "credential" in args
class AzureTestCase(ReplayableTest):
def __init__(
self,
method_name,
config_file=None,
recording_dir=None,
recording_name=None,
recording_processors=None,
replay_processors=None,
recording_patches=None,
replay_patches=None,
**kwargs
):
self.working_folder = os.path.dirname(__file__)
self.qualified_test_name = get_qualified_method_name(self, method_name)
self._fake_settings, self._real_settings = self._load_settings()
self.scrubber = GeneralNameReplacer()
config_file = config_file or os.path.join(self.working_folder, TEST_SETTING_FILENAME)
if not os.path.exists(config_file):
config_file = None
load_dotenv(find_dotenv())
super(AzureTestCase, self).__init__(
method_name,
config_file=config_file,
recording_dir=recording_dir,
recording_name=recording_name or self.qualified_test_name,
recording_processors=recording_processors or self._get_recording_processors(),
replay_processors=replay_processors or self._get_replay_processors(),
recording_patches=recording_patches,
replay_patches=replay_patches,
**kwargs
)
@property
def settings(self):
if self.is_live:
if self._real_settings:
return self._real_settings
else:
raise AzureTestError("Need a mgmt_settings_real.py file to run tests live.")
else:
return self._fake_settings
def _load_settings(self):
try:
from . import mgmt_settings_real as real_settings
return fake_settings, real_settings
except ImportError:
return fake_settings, None
def _get_recording_processors(self):
return [
self.scrubber,
AuthenticationMetadataFilter(),
OAuthRequestResponsesFilter(),
RequestUrlNormalizer(),
]
def _get_replay_processors(self):
return [RequestUrlNormalizer()]
def is_playback(self):
return not self.is_live
def get_settings_value(self, key):
key_value = os.environ.get("AZURE_" + key, None)
if key_value and self._real_settings and getattr(self._real_settings, key) != key_value:
raise ValueError(
"You have both AZURE_{key} env variable and mgmt_settings_real.py for {key} to different values".format(
key=key
)
)
if not key_value:
try:
key_value = getattr(self.settings, key)
except Exception:
print("Could not get {}".format(key))
raise
return key_value
def set_value_to_scrub(self, key, default_value):
if self.is_live:
value = self.get_settings_value(key)
self.scrubber.register_name_pair(value, default_value)
return value
else:
return default_value
def setUp(self):
# Every test uses a different resource group name calculated from its
# qualified test name.
#
# When running all tests serially, this allows us to delete
# the resource group in teardown without waiting for the delete to
# complete. The next test in line will use a different resource group,
# so it won't have any trouble creating its resource group even if the
# previous test resource group hasn't finished deleting.
#
# When running tests individually, if you try to run the same test
# multiple times in a row, it's possible that the delete in the previous
# teardown hasn't completed yet (because we don't wait), and that
# would make resource group creation fail.
# To avoid that, we also delete the resource group in the
# setup, and we wait for that delete to complete.
super(AzureTestCase, self).setUp()
def tearDown(self):
return super(AzureTestCase, self).tearDown()
def get_credential(self, client_class, **kwargs):
tenant_id = os.environ.get("AZURE_TENANT_ID", getattr(self._real_settings, "TENANT_ID", None))
client_id = os.environ.get("AZURE_CLIENT_ID", getattr(self._real_settings, "CLIENT_ID", None))
secret = os.environ.get("AZURE_CLIENT_SECRET", getattr(self._real_settings, "CLIENT_SECRET", None))
is_async = kwargs.pop("is_async", False)
if tenant_id and client_id and secret and self.is_live:
if _is_autorest_v3(client_class):
# Create azure-identity class
from azure.identity import ClientSecretCredential
if is_async:
from azure.identity.aio import ClientSecretCredential
return ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=secret)
else:
# Create msrestazure class
from msrestazure.azure_active_directory import (
ServicePrincipalCredentials,
)
return ServicePrincipalCredentials(tenant=tenant_id, client_id=client_id, secret=secret)
else:
if _is_autorest_v3(client_class):
if is_async:
if self.is_live:
raise ValueError(
"Async live doesn't support mgmt_setting_real, please set AZURE_TENANT_ID, AZURE_CLIENT_ID, AZURE_CLIENT_SECRET"
)
return AsyncFakeCredential()
else:
return self.settings.get_azure_core_credentials()
else:
return self.settings.get_credentials()
def create_client_from_credential(self, client_class, credential, **kwargs):
# Real client creation
# TODO decide what is the final argument for that
# if self.is_playback():
# kwargs.setdefault("polling_interval", 0)
if _is_autorest_v3(client_class):
kwargs.setdefault("logging_enable", True)
client = client_class(credential=credential, **kwargs)
else:
client = client_class(credentials=credential, **kwargs)
if self.is_playback():
try:
client._config.polling_interval = 0 # FIXME in azure-mgmt-core, make this a kwargs
except AttributeError:
pass
if hasattr(client, "config"): # Autorest v2
if self.is_playback():
client.config.long_running_operation_timeout = 0
client.config.enable_http_logger = True
return client
def create_basic_client(self, client_class, **kwargs):
"""DO NOT USE ME ANYMORE."""
logger = logging.getLogger()
logger.warning(
"'create_basic_client' will be deprecated in the future. It is recommended that you use \
'get_credential' and 'create_client_from_credential' to create your client."
)
credentials = self.get_credential(client_class)
return self.create_client_from_credential(client_class, credentials, **kwargs)
def create_random_name(self, name):
return get_resource_name(name, self.qualified_test_name.encode())
def get_resource_name(self, name):
"""Alias to create_random_name for back compatibility."""
return self.create_random_name(name)
def get_replayable_random_resource_name(self, name):
"""In a replay scenario, (is not live) gives the static moniker. In the random scenario, gives generated name."""
if self.is_live:
created_name = self.create_random_name(name)
self.scrubber.register_name_pair(created_name, name)
return name
def get_preparer_resource_name(self, prefix):
"""Random name generation for use by preparers.
If prefix is a blank string, use the fully qualified test name instead.
This is what legacy tests do for resource groups."""
return self.get_resource_name(prefix or self.qualified_test_name.replace(".", "_"))
@staticmethod
def await_prepared_test(test_fn):
"""Synchronous wrapper for async test methods. Used to avoid making changes
upstream to AbstractPreparer, which only awaits async tests that use preparers.
(Add @AzureTestCase.await_prepared_test decorator to async tests without preparers)
# Note: this will only be needed so long as we maintain unittest.TestCase in our
test-class inheritance chain.
"""
if sys.version_info < (3, 5):
raise ImportError("Async wrapper is not needed for Python 2.7 code.")
import asyncio
@functools.wraps(test_fn)
def run(test_class_instance, *args, **kwargs):
trim_kwargs_from_test_function(test_fn, kwargs)
loop = asyncio.get_event_loop()
return loop.run_until_complete(test_fn(test_class_instance, **kwargs))
return run
def sleep(self, seconds):
if self.is_live:
time.sleep(seconds)
def generate_sas(self, *args, **kwargs):
sas_func = args[0]
sas_func_pos_args = args[1:]
fake_value = kwargs.pop("fake_value", "fake_token_value")
token = sas_func(*sas_func_pos_args, **kwargs)
fake_token = self._create_fake_token(token, fake_value)
self._register_encodings(token, fake_token)
if self.is_live:
return token
return fake_token
def _register_encodings(self, token, fake_token):
self.scrubber.register_name_pair(token, fake_token)
url_safe_token = token.replace("/", "%2F")
self.scrubber.register_name_pair(url_safe_token, fake_token)
async_token = token.replace("%3A", ":")
self.scrubber.register_name_pair(async_token, fake_token)
def _create_fake_token(self, token, fake_value):
parts = token.split("&")
for idx, part in enumerate(parts):
if part.startswith("sig"):
key = part.split("=")
key[1] = fake_value
parts[idx] = "=".join(key)
elif part.startswith("st"):
key = part.split("=")
key[1] = "start"
parts[idx] = "=".join(key)
elif part.startswith("se"):
key = part.split("=")
key[1] = "end"
parts[idx] = "=".join(key)
return "&".join(parts)

Просмотреть файл

@ -1,127 +0,0 @@
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import time
from collections import namedtuple
from azure_devtools.scenario_tests import ReplayableTest
from devtools_testutils import AzureTestCase
from azure_devtools.scenario_tests.exceptions import AzureTestError
from . import AzureMgmtPreparer, ResourceGroupPreparer
from .resource_testcase import RESOURCE_GROUP_PARAM
from azure.mgmt.cognitiveservices import CognitiveServicesManagementClient
from msrest.authentication import CognitiveServicesCredentials
FakeCognitiveServicesAccount = namedtuple("FakeResource", ["endpoint"])
class CognitiveServiceTest(AzureTestCase):
"""Can be used for Track 1 tests"""
FILTER_HEADERS = ReplayableTest.FILTER_HEADERS + ["Ocp-Apim-Subscription-Key"]
def __init__(self, method_name):
super(CognitiveServiceTest, self).__init__(method_name)
class CognitiveServicesAccountPreparer(AzureMgmtPreparer):
def __init__(
self,
name_prefix="",
sku="S0",
location="westus2",
kind="cognitiveservices",
parameter_name="cognitiveservices_account",
legacy=False,
resource_group_parameter_name=RESOURCE_GROUP_PARAM,
disable_recording=True,
playback_fake_resource=None,
client_kwargs=None,
random_name_enabled=True,
**kwargs
):
super(CognitiveServicesAccountPreparer, self).__init__(
name_prefix,
24,
disable_recording=disable_recording,
playback_fake_resource=playback_fake_resource,
client_kwargs=client_kwargs,
random_name_enabled=random_name_enabled,
)
self.location = location
self.sku = sku
self.kind = kind
self.resource_group_parameter_name = resource_group_parameter_name
self.parameter_name = parameter_name
self.cogsci_key = ""
self.legacy = legacy
self.custom_subdomain_name = kwargs.pop("custom_subdomain_name", None)
def create_resource(self, name, **kwargs):
if self.is_live:
self.client = self.create_mgmt_client(CognitiveServicesManagementClient)
group = self._get_resource_group(**kwargs)
cogsci_account = self.client.accounts.begin_create(
group.name,
name,
account={
"sku": {"name": self.sku},
"location": self.location,
"kind": self.kind,
"properties": {"custom_sub_domain_name": self.custom_subdomain_name},
},
).result()
time.sleep(10) # it takes a few seconds to create a cognitive services account
self.resource = cogsci_account
self.cogsci_key = self.client.accounts.list_keys(group.name, name).key1
# FIXME: LuisAuthoringClient and LuisRuntimeClient need authoring key from ARM API (coming soon-ish)
else:
if self.custom_subdomain_name:
self.resource = FakeCognitiveServicesAccount(
"https://{}.cognitiveservices.azure.com".format(self.custom_subdomain_name)
)
self.cogsci_key = "ZmFrZV9hY29jdW50X2tleQ=="
else:
self.resource = FakeCognitiveServicesAccount(
"https://{}.api.cognitive.microsoft.com".format(self.location)
)
self.cogsci_key = "ZmFrZV9hY29jdW50X2tleQ=="
if self.legacy:
try:
return {
self.parameter_name: self.resource.properties.endpoint,
"{}_key".format(self.parameter_name): CognitiveServicesCredentials(self.cogsci_key),
}
except AttributeError:
return {
self.parameter_name: self.resource.endpoint,
"{}_key".format(self.parameter_name): CognitiveServicesCredentials(self.cogsci_key),
}
else:
try:
return {
self.parameter_name: self.resource.properties.endpoint,
"{}_key".format(self.parameter_name): self.cogsci_key,
}
except AttributeError:
return {
self.parameter_name: self.resource.endpoint,
"{}_key".format(self.parameter_name): self.cogsci_key,
}
def remove_resource(self, name, **kwargs):
if self.is_live:
group = self._get_resource_group(**kwargs)
self.client.accounts.begin_delete(group.name, name).wait()
def _get_resource_group(self, **kwargs):
try:
return kwargs.get(self.resource_group_parameter_name)
except KeyError:
template = (
"To create a cognitive services account a resource group is required. Please add "
"decorator @{} in front of this cognitive services account preparer."
)
raise AzureTestError(template.format(ResourceGroupPreparer.__name__))

Просмотреть файл

@ -3,26 +3,14 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from collections import namedtuple
import inspect
import re
import six
import os.path
import zlib
from azure_devtools.scenario_tests import (
ReplayableTest,
AzureTestError,
AbstractPreparer,
GeneralNameReplacer,
OAuthRequestResponsesFilter,
DeploymentNameReplacer,
RequestUrlNormalizer,
)
from azure_devtools.scenario_tests import AbstractPreparer, GeneralNameReplacer
from azure_devtools.scenario_tests.utilities import is_text_payload
from .azure_testcase import AzureTestCase
from .config import TEST_SETTING_FILENAME
from . import mgmt_settings_fake as fake_settings
class HttpStatusCode(object):
@ -54,77 +42,6 @@ def get_qualified_method_name(obj, method_name):
return "{0}.{1}".format(module_name, method_name)
class AzureMgmtTestCase(AzureTestCase):
def __init__(
self,
method_name,
config_file=None,
recording_dir=None,
recording_name=None,
recording_processors=None,
replay_processors=None,
recording_patches=None,
replay_patches=None,
**kwargs
):
self.region = "westus"
self.re_replacer = RENameReplacer()
super(AzureMgmtTestCase, self).__init__(
method_name,
config_file=config_file,
recording_dir=recording_dir,
recording_name=recording_name,
recording_processors=recording_processors,
replay_processors=replay_processors,
recording_patches=recording_patches,
replay_patches=replay_patches,
**kwargs
)
self.recording_processors.append(self.re_replacer)
def _setup_scrubber(self):
constants_to_scrub = ["SUBSCRIPTION_ID", "TENANT_ID"]
for key in constants_to_scrub:
key_value = self.get_settings_value(key)
if key_value and hasattr(self._fake_settings, key):
self.scrubber.register_name_pair(key_value, getattr(self._fake_settings, key))
def setUp(self):
# Every test uses a different resource group name calculated from its
# qualified test name.
#
# When running all tests serially, this allows us to delete
# the resource group in teardown without waiting for the delete to
# complete. The next test in line will use a different resource group,
# so it won't have any trouble creating its resource group even if the
# previous test resource group hasn't finished deleting.
#
# When running tests individually, if you try to run the same test
# multiple times in a row, it's possible that the delete in the previous
# teardown hasn't completed yet (because we don't wait), and that
# would make resource group creation fail.
# To avoid that, we also delete the resource group in the
# setup, and we wait for that delete to complete.
self._setup_scrubber()
super(AzureMgmtTestCase, self).setUp()
def tearDown(self):
return super(AzureMgmtTestCase, self).tearDown()
def create_mgmt_client(self, client_class, **kwargs):
# Whatever the client, if subscription_id is None, fail
with self.assertRaises(ValueError):
self.create_basic_client(client_class, subscription_id=None, **kwargs)
subscription_id = None
if self.is_live:
subscription_id = os.environ.get("AZURE_SUBSCRIPTION_ID", None)
if not subscription_id:
subscription_id = self.settings.SUBSCRIPTION_ID
return self.create_basic_client(client_class, subscription_id=subscription_id, **kwargs)
class AzureMgmtPreparer(AbstractPreparer):
def __init__(
self,