Co-authored-by: Spencer Amann <samann@redhat.com>
This commit is contained in:
Ellis Johnson 2023-03-23 15:02:22 +11:00 коммит произвёл GitHub
Родитель 9129880a9c
Коммит 54883f70b9
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 902 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,346 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the Apache License 2.0.
import ipaddress
import re
from itertools import tee
from azure.cli.core.commands.client_factory import get_mgmt_service_client
from azure.cli.core.commands.validators import get_default_location_from_resource_group
from azure.cli.core.profiles import ResourceType
from azure.cli.core.azclierror import CLIInternalError, InvalidArgumentValueError, \
RequiredArgumentMissingError
from azure.core.exceptions import ResourceNotFoundError
from knack.log import get_logger
from msrestazure.tools import is_valid_resource_id
from msrestazure.tools import parse_resource_id
from msrestazure.azure_exceptions import CloudError
from azext_aro._validators import validate_vnet, validate_cidr
from azext_aro._rbac import has_role_assignment_on_resource
import azext_aro.custom
logger = get_logger(__name__)
def can_do_action(perms, action):
for perm in perms:
for not_action in perm.not_actions:
match = re.escape(not_action)
match = re.match("(?i)^" + match.replace(r"\*", ".*") + "$", action)
if match:
return f"{action} permission is disabled"
for perm_action in perm.actions:
match = re.escape(perm_action)
match = re.match("(?i)^" + match.replace(r"\*", ".*") + "$", action)
if match:
return None
return f"{action} permission is missing"
def validate_resource(client, key, resource, actions):
perms = client.permissions.list_for_resource(resource['resource_group'],
resource['namespace'],
"",
resource['type'],
resource['name'])
errors = []
for action in actions:
perms, perms_copy = tee(perms)
perms_list = list(perms_copy)
error = can_do_action(perms_list, action)
if error is not None:
row = [key, resource['name'], error]
errors.append(row)
return errors
def get_subnet(client, subnet, subnet_parts):
try:
subnet_obj = client.subnets.get(subnet_parts['resource_group'],
subnet_parts['name'],
subnet_parts['child_name_1'])
except ResourceNotFoundError as err:
raise InvalidArgumentValueError(
f"Invalid -- subnet, error when getting '{subnet}': {str(err)}") from err
except Exception as err:
raise CLIInternalError(
f"Unexpected error when getting subnet '{subnet}': {str(err)}") from err
return subnet_obj
def get_clients(key, cmd):
parts = parse_resource_id(key)
network_client = get_mgmt_service_client(
cmd.cli_ctx, ResourceType.MGMT_NETWORK)
auth_client = get_mgmt_service_client(
cmd.cli_ctx, ResourceType.MGMT_AUTHORIZATION, api_version="2015-07-01")
return parts, network_client, auth_client
# Function to create a progress tracker decorator for the dynamic validation functions
def get_progress_tracker(msg):
def progress_tracking(func):
def inner(cmd, namespace):
hook = cmd.cli_ctx.get_progress_controller()
hook.add(message=msg)
errors = func(cmd, namespace)
hook.end()
return errors
return inner
return progress_tracking
# Validating that the virtual network has the correct permissions
def dyn_validate_vnet(key):
prog = get_progress_tracker("Validating Virtual Network Permissions")
@prog
def _validate_vnet(cmd, namespace):
errors = []
vnet = getattr(namespace, key)
if not is_valid_resource_id(vnet):
raise RequiredArgumentMissingError(
f"Must specify --vnet if --{key.replace('_', '-')} is not an id.")
validate_vnet(cmd, namespace)
parts, network_client, auth_client = get_clients(vnet, cmd)
try:
network_client.virtual_networks.get(parts['resource_group'], parts['name'])
except ResourceNotFoundError as err:
raise InvalidArgumentValueError(
f"Invalid --{key.replace('_', '-')}, error when getting '{vnet}': {str(err)}") from err
except Exception as err:
raise CLIInternalError(
f"Unexpected error when getting vnet '{vnet}': {str(err)}") from err
errors = validate_resource(auth_client, key, parts, [
"Microsoft.Network/virtualNetworks/join/action",
"Microsoft.Network/virtualNetworks/read",
"Microsoft.Network/virtualNetworks/write",
"Microsoft.Network/virtualNetworks/subnets/join/action",
"Microsoft.Network/virtualNetworks/subnets/read",
"Microsoft.Network/virtualNetworks/subnets/write", ])
return errors
return _validate_vnet
# Validating that the route tables attached to the subnet have the
# correct permissions and that the subnet is not assigned to an NSG
def dyn_validate_subnet_and_route_tables(key):
prog = get_progress_tracker(f"Validating {key} permissions")
@prog
def _validate_subnet(cmd, namespace):
errors = []
subnet = getattr(namespace, key)
if not is_valid_resource_id(subnet):
if not namespace.vnet:
raise RequiredArgumentMissingError(
f"Must specify --vnet if --{key.replace('_', '-')} is not an id.")
validate_vnet(cmd, namespace)
subnet = namespace.vnet + '/subnets/' + subnet
setattr(namespace, key, subnet)
parts, network_client, auth_client = get_clients(subnet, cmd)
try:
subnet_obj = network_client.subnets.get(parts['resource_group'],
parts['name'],
parts['child_name_1'])
route_table_obj = subnet_obj.route_table
if route_table_obj is not None:
route_parts = parse_resource_id(route_table_obj.id)
errors = validate_resource(auth_client, f"{key}_route_table", route_parts, [
"Microsoft.Network/routeTables/join/action",
"Microsoft.Network/routeTables/read",
"Microsoft.Network/routeTables/write"])
except ResourceNotFoundError as err:
raise InvalidArgumentValueError(
f"Invalid -- subnet, error when getting '{subnet}': {str(err)}") from err
except Exception as err:
raise CLIInternalError(
f"Unexpected error when getting subnet '{subnet}': {str(err)}") from err
if subnet_obj.network_security_group is not None:
message = f"A Network Security Group \"{subnet_obj.network_security_group.id}\" "\
"is already assigned to this subnet. Ensure there are no Network "\
"Security Groups assigned to cluster subnets before cluster creation"
error = [key, parts['child_name_1'], message]
errors.append(error)
return errors
return _validate_subnet
# Validating that the cidr ranges between the master_subnet, worker_subnet,
# service_cidr and pod_cidr do not overlap at all
def dyn_validate_cidr_ranges():
prog = get_progress_tracker("Validating no overlapping CIDR Ranges on subnets")
@prog
def _validate_cidr_ranges(cmd, namespace):
MIN_CIDR_PREFIX = 23
addresses = []
ERROR_KEY = "CIDR Range"
master_subnet = namespace.master_subnet
worker_subnet = namespace.worker_subnet
pod_cidr = namespace.pod_cidr
service_cidr = namespace.service_cidr
worker_parts = parse_resource_id(worker_subnet)
master_parts = parse_resource_id(master_subnet)
fn = validate_cidr("pod_cidr")
fn(namespace)
fn = validate_cidr("service_cidr")
fn(namespace)
cidr_array = {}
if pod_cidr is not None:
node_mask = MIN_CIDR_PREFIX - int(pod_cidr.split("/")[1])
if node_mask < 2:
addresses.append(["Pod CIDR",
"Pod CIDR Capacity",
f"{pod_cidr} does not contain enough addresses for 3 master nodes " +
"(Requires cidr prefix of 21 or lower)"])
cidr_array["Pod CIDR"] = ipaddress.IPv4Network(pod_cidr)
if service_cidr is not None:
cidr_array["Service CIDR"] = ipaddress.IPv4Network(service_cidr)
network_client = get_mgmt_service_client(
cmd.cli_ctx, ResourceType.MGMT_NETWORK)
worker_subnet_obj = get_subnet(network_client, worker_subnet, worker_parts)
if worker_subnet_obj.address_prefix is None:
for address in worker_subnet_obj.address_prefixes:
cidr_array["Worker Subnet CIDR -- " + address] = ipaddress.IPv4Network(address)
else:
cidr_array["Worker Subnet CIDR"] = ipaddress.IPv4Network(worker_subnet_obj.address_prefix)
master_subnet_obj = get_subnet(network_client, master_subnet, master_parts)
if master_subnet_obj.address_prefix is None:
for address in master_subnet_obj.address_prefixes:
cidr_array["Master Subnet CIDR -- " + address] = ipaddress.IPv4Network(address)
else:
cidr_array["Master Subnet CIDR"] = ipaddress.IPv4Network(master_subnet_obj.address_prefix)
ipv4_zero = ipaddress.IPv4Network("0.0.0.0/0")
for item in cidr_array.items():
key = item[0]
cidr = item[1]
if not cidr.overlaps(ipv4_zero):
addresses.append([ERROR_KEY, key, f"{cidr} is not valid as it does not overlap with {ipv4_zero}"])
for item2 in cidr_array.items():
compare = item2[1]
if cidr is not compare:
if cidr.overlaps(compare):
addresses.append([ERROR_KEY, key, f"{cidr} is not valid as it overlaps with {compare}"])
return addresses
return _validate_cidr_ranges
def dyn_validate_resource_permissions(service_principle_ids, resources):
prog = get_progress_tracker("Validating resource permissions")
@prog
def _validate_resource_permissions(cmd,
_namespace):
errors = []
for sp_id in service_principle_ids:
for role in sorted(resources):
for resource in resources[role]:
try:
resource_contributor_exists = has_role_assignment_on_resource(cmd.cli_ctx,
resource,
sp_id,
role)
if not resource_contributor_exists:
parts = parse_resource_id(resource)
errors.append(["Resource Permissions",
parts['type'],
f"Resource {parts['name']} is missing role assignment " +
f"{role} for service principal {sp_id} " +
"(These roles will be automatically added during cluster creation)"])
except CloudError as e:
logger.error(e.message)
raise
return errors
return _validate_resource_permissions
def dyn_validate_version():
prog = get_progress_tracker("Validating OpenShift Version")
@prog
def _validate_version(cmd,
namespace):
errors = []
if namespace.location is None:
get_default_location_from_resource_group(cmd, namespace)
versions = azext_aro.custom.aro_get_versions(namespace.client, namespace.location)
found = False
for version in versions:
if version == namespace.version:
found = True
break
if not found:
errors.append(["OpenShift Version",
namespace.version,
f"{namespace.version} is not a valid version, valid versions are {versions}"])
return errors
return _validate_version
def validate_cluster_create(version,
resources,
service_principle_ids):
error_object = []
error_object.append(dyn_validate_vnet("vnet"))
error_object.append(dyn_validate_subnet_and_route_tables("master_subnet"))
error_object.append(dyn_validate_subnet_and_route_tables("worker_subnet"))
error_object.append(dyn_validate_cidr_ranges())
error_object.append(dyn_validate_resource_permissions(service_principle_ids, resources))
if version is not None:
error_object.append(dyn_validate_version())
return error_object

Просмотреть файл

@ -23,6 +23,16 @@ helps['aro create'] = """
text: az aro create --resource-group MyResourceGroup --name MyCluster --vnet MyVnet --master-subnet MyMasterSubnet --worker-subnet MyWorkerSubnet --apiserver-visibility Private --ingress-visibility Private
"""
helps['aro validate'] = """
type: command
short-summary: Validate permissions required to create a cluster.
examples:
- name: Validate permissions.
text: az aro validate --resource-group MyGroup --name MyName --vnet MyVnet --master-subnet MyMasterSubnet --worker-subnet MyWorkerSubnet
- name: Validate permissions and OpenShift version
text: az aro validate --resource-group MyGroup --name MyName --vnet MyVnet --master-subnet MyMasterSubnet --worker-subnet MyWorkerSubnet --version X.Y.Z
"""
helps['aro list'] = """
type: command
short-summary: List clusters.

Просмотреть файл

@ -26,3 +26,5 @@ def load_command_table(self, _):
g.custom_command('get-admin-kubeconfig', 'aro_list_admin_credentials')
g.custom_command('get-versions', 'aro_get_versions', table_transformer=aro_version_table_format)
g.custom_command('validate', 'aro_validate')

Просмотреть файл

@ -1,9 +1,11 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the Apache License 2.0.
import collections
import random
import os
from base64 import b64decode
import textwrap
import azext_aro.vendored_sdks.azure.mgmt.redhatopenshift.v2022_09_04.models as openshiftcluster
@ -18,6 +20,7 @@ from azext_aro._rbac import assign_role_to_resource, \
has_role_assignment_on_resource
from azext_aro._rbac import ROLE_NETWORK_CONTRIBUTOR, ROLE_READER
from azext_aro._validators import validate_subnets
from azext_aro._dynamic_validators import validate_cluster_create
from knack.log import get_logger
@ -25,6 +28,8 @@ from msrestazure.azure_exceptions import CloudError
from msrestazure.tools import resource_id, parse_resource_id
from msrest.exceptions import HttpOperationError
from tabulate import tabulate
logger = get_logger(__name__)
FP_CLIENT_ID = 'f1dd0a37-89c6-4e07-bcd1-ffd3d43d8875'
@ -153,6 +158,89 @@ def aro_create(cmd, # pylint: disable=too-many-locals
parameters=oc)
def aro_validate(cmd, # pylint: disable=too-many-locals,too-many-statements
client, # pylint: disable=unused-argument
resource_group_name, # pylint: disable=unused-argument
resource_name, # pylint: disable=unused-argument
master_subnet,
worker_subnet,
vnet=None,
cluster_resource_group=None, # pylint: disable=unused-argument
client_id=None,
client_secret=None, # pylint: disable=unused-argument
vnet_resource_group_name=None, # pylint: disable=unused-argument
disk_encryption_set=None,
location=None, # pylint: disable=unused-argument
version=None,
pod_cidr=None, # pylint: disable=unused-argument
service_cidr=None # pylint: disable=unused-argument
):
class mockoc: # pylint: disable=too-few-public-methods
def __init__(self, disk_encryption_id, master_subnet_id, worker_subnet_id):
self.master_profile = openshiftcluster.MasterProfile(
subnet_id=master_subnet_id,
disk_encryption_set_id=disk_encryption_id
)
self.worker_profiles = [openshiftcluster.WorkerProfile(
subnet_id=worker_subnet_id
)]
aad = AADManager(cmd.cli_ctx)
rp_client_sp_id = aad.get_service_principal_id(resolve_rp_client_id())
if not rp_client_sp_id:
raise ResourceNotFoundError("RP service principal not found.")
sp_obj_ids = [rp_client_sp_id]
if client_id is not None:
sp_obj_ids.append(aad.get_service_principal_id(client_id))
cluster = mockoc(disk_encryption_set, master_subnet, worker_subnet)
try:
# Get cluster resources we need to assign permissions on, sort to ensure the same order of operations
resources = {ROLE_NETWORK_CONTRIBUTOR: sorted(get_cluster_network_resources(cmd.cli_ctx, cluster, True)),
ROLE_READER: sorted(get_disk_encryption_resources(cluster))}
except (CloudError, HttpOperationError) as e:
logger.error(e.message)
raise
if vnet is None:
master_parts = parse_resource_id(master_subnet)
vnet = resource_id(
subscription=master_parts['subscription'],
resource_group=master_parts['resource_group'],
namespace='Microsoft.Network',
type='virtualNetworks',
name=master_parts['name'],
)
error_objects = validate_cluster_create(version,
resources,
sp_obj_ids)
errors = []
for error_func in error_objects:
namespace = collections.namedtuple("Namespace", locals().keys())(*locals().values())
error_obj = error_func(cmd, namespace)
if error_obj != []:
for err in error_obj:
# Wrap text so tabulate returns a pretty table
new_err = []
for txt in err:
new_err.append(textwrap.fill(txt, width=160))
errors.append(new_err)
if len(errors) > 0:
logger.error("Issues found blocking cluster creation.\n")
headers = ["Type", "Name", "Error"]
table = tabulate(errors, headers=headers, tablefmt="grid")
print(f"\n{table}")
else:
print("\nNo Issues on subscription blocking cluster creation\n")
def aro_delete(cmd, client, resource_group_name, resource_name, no_wait=False):
# TODO: clean up rbac
rp_client_sp_id = None

Просмотреть файл

@ -0,0 +1,456 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the Apache License 2.0.
from unittest.mock import Mock, patch
from azext_aro._dynamic_validators import (
dyn_validate_cidr_ranges, dyn_validate_subnet_and_route_tables, dyn_validate_vnet, dyn_validate_resource_permissions, dyn_validate_version
)
from azure.mgmt.authorization.models import Permission
import pytest
test_validate_cidr_data = [
(
"should return no error on address_prefix set on subnets, no additional cidrs, no overlap",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(vnet='', key="192.168.0.1/32", master_subnet='', worker_subnet='', pod_cidr=None, service_cidr=None),
Mock(**{"subnets.get.side_effect": [Mock(address_prefix="172.143.5.0/24"), Mock(address_prefix="172.143.4.0/25")]}),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": None,
"child_name_1": None
},
None
),
(
"should return no error on address_prefix set on subnets, additional cidrs, no overlap",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(vnet='', key="192.168.0.1/32", master_subnet='', worker_subnet='', pod_cidr="172.142.0.0/21", service_cidr="172.143.6.0/25"),
Mock(**{"subnets.get.side_effect": [Mock(address_prefix="172.143.4.0/24"), Mock(address_prefix="172.143.5.0/25")]}),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": None,
"child_name_1": None
},
None
),
(
"should return no error on multiple address_prefixes set on subnets, additional cidrs, no overlap",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(vnet='', key="192.168.0.1/32", master_subnet='', worker_subnet='', pod_cidr="172.142.0.0/21", service_cidr="172.143.6.0/25"),
Mock(**{"subnets.get.side_effect": [Mock(address_prefix=None,
address_prefixes=["172.143.4.0/24", "172.143.8.0/25"]),
Mock(address_prefix=None,
address_prefixes=["172.143.5.0/25", "172.143.9.0/24"])]}),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": None,
"child_name_1": None
},
None
),
(
"should error on address_prefix set on subnets, no additional cidrs, overlap",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(vnet='', key="192.168.0.1/32", master_subnet='', worker_subnet='', pod_cidr=None, service_cidr=None),
Mock(**{"subnets.get.side_effect": [Mock(address_prefix="172.143.4.0/24"), Mock(address_prefix="172.143.4.0/25")]}),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": None,
"child_name_1": None
},
"172.143.4.0/24 is not valid as it overlaps with 172.143.4.0/25"
),
(
"should return error on pod cidr not having enough addresses to create cluster",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(vnet='', key="192.168.0.1/32", master_subnet='', worker_subnet='', pod_cidr="172.143.4.0/24", service_cidr=None),
Mock(**{"subnets.get.side_effect": [Mock(address_prefix="172.143.5.0/24"), Mock(address_prefix="172.143.4.0/25")]}),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": None,
"child_name_1": None
},
"172.143.4.0/24 does not contain enough addresses for 3 master nodes (Requires cidr prefix of 21 or lower)"
),
]
@pytest.mark.parametrize(
"test_description, cmd_mock, namespace_mock, client_mock, parse_resource_id_mock_return_value, expected_addresses",
test_validate_cidr_data,
ids=[i[0] for i in test_validate_cidr_data]
)
@ patch('azext_aro._dynamic_validators.get_mgmt_service_client')
@ patch('azext_aro._dynamic_validators.parse_resource_id')
def test_validate_cidr(
# Mocked functions:
parse_resource_id_mock, get_mgmt_service_client_mock,
# Test cases parameters:
test_description, cmd_mock, namespace_mock, client_mock, parse_resource_id_mock_return_value, expected_addresses
):
parse_resource_id_mock.return_value = parse_resource_id_mock_return_value
get_mgmt_service_client_mock.return_value = client_mock
validate_cidr_fn = dyn_validate_cidr_ranges()
if expected_addresses is None:
addresses = validate_cidr_fn(cmd_mock, namespace_mock)
if (len(addresses) > 0):
raise Exception(f"Unexpected Error: {addresses[0]}")
else:
addresses = validate_cidr_fn(cmd_mock, namespace_mock)
if (addresses[0][2] != expected_addresses):
raise Exception(f"Error returned was not expected\n Expected : {expected_addresses}\n Actual : {addresses[0][2]}")
test_validate_subnets_data = [
(
"should not return missing permission when actions are permitted",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(vnet='', key="192.168.0.1/32", master_subnet='', worker_subnet='', pod_cidr=None, service_cidr=None),
Mock(**{"subnets.get.return_value": Mock(network_security_group=None, route_table=Mock(id="test"))}),
Mock(**{"permissions.list_for_resource.return_value": [Permission(actions=["Microsoft.Network/routeTables/*"], not_actions=[])]}),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": None,
"child_name_1": None
},
Mock(),
None
),
(
"should return missing permission when actions are not permitted",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(vnet='', key="192.168.0.1/32", master_subnet='', worker_subnet='', pod_cidr=None, service_cidr=None),
Mock(**{"subnets.get.return_value": Mock(network_security_group=None, route_table=Mock(id="test"))}),
Mock(**{"permissions.list_for_resource.return_value": [Permission(actions=[], not_actions=["Microsoft.Network/routeTables/*"])]}),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": None,
"child_name_1": None
},
Mock(),
"Microsoft.Network/routeTables/join/action permission is disabled"
),
(
"should return missing permission when actions are not present",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(vnet='', key="192.168.0.1/32", master_subnet='', worker_subnet='', pod_cidr=None, service_cidr=None),
Mock(**{"subnets.get.return_value": Mock(network_security_group=None, route_table=Mock(id="test"))}),
Mock(**{"permissions.list_for_resource.return_value": [Permission(actions=[], not_actions=[])]}),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": None,
"child_name_1": None
},
Mock(),
"Microsoft.Network/routeTables/join/action permission is missing"
),
(
"should return message when network security group is already attached to subnet",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(vnet='', key="192.168.0.1/32", master_subnet='', worker_subnet='', pod_cidr=None, service_cidr=None),
Mock(**{"subnets.get.return_value": Mock(network_security_group=Mock(id="test"), route_table=Mock(id="test"))}),
Mock(**{"permissions.list_for_resource.return_value": [Permission(actions=["Microsoft.Network/routeTables/*"], not_actions=[])]}),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": None,
"child_name_1": None
},
Mock(),
"A Network Security Group \"test\" is already assigned to this subnet. "
"Ensure there are no Network Security Groups assigned to cluster "
"subnets before cluster creation"
)
]
@pytest.mark.parametrize(
"test_description, cmd_mock, namespace_mock, network_client_mock, auth_client_mock, parse_resource_id_mock_return_value, is_valid_resource_id_mock_return_value, expected_missing_perms",
test_validate_subnets_data,
ids=[i[0] for i in test_validate_subnets_data]
)
@ patch('azext_aro._dynamic_validators.get_mgmt_service_client')
@ patch('azext_aro._dynamic_validators.parse_resource_id')
@ patch('azext_aro._dynamic_validators.is_valid_resource_id')
def test_validate_subnets(
# Mocked functions:
is_valid_resource_id_mock, parse_resource_id_mock, get_mgmt_service_client_mock,
# Test cases parameters:
test_description, cmd_mock, namespace_mock, network_client_mock, auth_client_mock, parse_resource_id_mock_return_value, is_valid_resource_id_mock_return_value, expected_missing_perms
):
is_valid_resource_id_mock.return_value = is_valid_resource_id_mock_return_value
parse_resource_id_mock.return_value = parse_resource_id_mock_return_value
get_mgmt_service_client_mock.side_effect = [network_client_mock, auth_client_mock]
validate_subnet_fn = dyn_validate_subnet_and_route_tables('')
if expected_missing_perms is None:
missing_perms = validate_subnet_fn(cmd_mock, namespace_mock)
if (len(missing_perms) > 0):
raise Exception(f"Unexpected Permission Missing: {missing_perms[0]}")
else:
missing_perms = validate_subnet_fn(cmd_mock, namespace_mock)
if (missing_perms[0][2] != expected_missing_perms):
raise Exception(f"Error returned was not expected\n Expected : {expected_missing_perms}\n Actual : {missing_perms[0][2]}")
test_validate_vnets_data = [
(
"should not return missing permission when actions are permitted",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(vnet='', key="192.168.0.1/32", master_subnet='', worker_subnet='', pod_cidr=None, service_cidr=None),
Mock(**{"subnets.get.return_value": Mock(route_table=Mock(id="test"))}),
Mock(**{"permissions.list_for_resource.return_value": [Permission(actions=["Microsoft.Network/virtualNetworks/*"], not_actions=[])]}),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": None,
"child_name_1": None
},
Mock(),
None
),
(
"should return disabled permission when actions are not permitted",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(vnet='', key="192.168.0.1/32", master_subnet='', worker_subnet='', pod_cidr=None, service_cidr=None),
Mock(**{"subnets.get.return_value": Mock(route_table=Mock(id="test"))}),
Mock(**{"permissions.list_for_resource.return_value": [Permission(actions=[], not_actions=["Microsoft.Network/virtualNetworks/*"])]}),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": None,
"child_name_1": None
},
Mock(),
"Microsoft.Network/virtualNetworks/join/action permission is disabled"
),
(
"should return missing permission when actions are not present",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(vnet='', key="192.168.0.1/32", master_subnet='', worker_subnet='', pod_cidr=None, service_cidr=None),
Mock(**{"subnets.get.return_value": Mock(route_table=Mock(id="test"))}),
Mock(**{"permissions.list_for_resource.return_value": [Permission(actions=[], not_actions=[])]}),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": None,
"child_name_1": None
},
Mock(),
"Microsoft.Network/virtualNetworks/join/action permission is missing"
)
]
@pytest.mark.parametrize(
"test_description, cmd_mock, namespace_mock, network_client_mock, auth_client_mock, parse_resource_id_mock_return_value, is_valid_resource_id_mock_return_value, expected_missing_perms",
test_validate_vnets_data,
ids=[i[0] for i in test_validate_vnets_data]
)
@ patch('azext_aro._dynamic_validators.get_mgmt_service_client')
@ patch('azext_aro._dynamic_validators.parse_resource_id')
@ patch('azext_aro._dynamic_validators.is_valid_resource_id')
def test_validate_vnets(
# Mocked functions:
is_valid_resource_id_mock, parse_resource_id_mock, get_mgmt_service_client_mock,
# Test cases parameters:
test_description, cmd_mock, namespace_mock, network_client_mock, auth_client_mock, parse_resource_id_mock_return_value, is_valid_resource_id_mock_return_value, expected_missing_perms
):
is_valid_resource_id_mock.return_value = is_valid_resource_id_mock_return_value
parse_resource_id_mock.return_value = parse_resource_id_mock_return_value
get_mgmt_service_client_mock.side_effect = [network_client_mock, auth_client_mock]
validate_vnet_fn = dyn_validate_vnet("vnet")
if expected_missing_perms is None:
missing_perms = validate_vnet_fn(cmd_mock, namespace_mock)
if (len(missing_perms) > 0):
raise Exception(f"Unexpected Permission Missing: {missing_perms[0]}")
else:
missing_perms = validate_vnet_fn(cmd_mock, namespace_mock)
if (missing_perms[0][2] != expected_missing_perms):
raise Exception(f"Error returned was not expected\n Expected : {expected_missing_perms}\n Actual : {missing_perms[0][2]}")
test_validate_resource_data = [
(
"should not return missing permission when role assignments are assigned",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": None,
"child_name_1": None
},
[True, True, True, True, True, True, True, True],
None
),
(
"should return missing permission when role assignments are not assigned",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(),
{
"subscription": "subscription",
"namespace": "MICROSOFT.NETWORK",
"type": "virtualnetworks",
"last_child_num": 1,
"child_type_1": "subnets",
"resource_group": None,
"name": "Test_Subnet",
"child_name_1": None
},
[True, True, True, True, True, True, True, False],
"Resource Test_Subnet is missing role assignment test for service principal test " +
"(These roles will be automatically added during cluster creation)"
)
]
@pytest.mark.parametrize(
"test_description, cmd_mock, namespace_mock, parse_resource_id_mock_return_value, has_role_assignment_on_resource_mock_return_value, expected_missing_perms",
test_validate_resource_data,
ids=[i[0] for i in test_validate_resource_data]
)
@ patch('azext_aro._dynamic_validators.has_role_assignment_on_resource')
@ patch('azext_aro._dynamic_validators.parse_resource_id')
def test_validate_resources(
# Mocked functions:
parse_resource_id_mock, has_role_assignment_on_resource_mock,
# Test cases parameters:
test_description, cmd_mock, namespace_mock, parse_resource_id_mock_return_value, has_role_assignment_on_resource_mock_return_value, expected_missing_perms
):
parse_resource_id_mock.return_value = parse_resource_id_mock_return_value
has_role_assignment_on_resource_mock.side_effect = has_role_assignment_on_resource_mock_return_value
sp_ids = ["test", "test"]
resources = {"test": "test", "test": "test"}
validate_res_perms_fn = dyn_validate_resource_permissions(sp_ids, resources)
if expected_missing_perms is None:
missing_perms = validate_res_perms_fn(cmd_mock, namespace_mock)
if (len(missing_perms) > 0):
raise Exception(f"Unexpected Permission Missing: {missing_perms[0]}")
else:
missing_perms = validate_res_perms_fn(cmd_mock, namespace_mock)
if (missing_perms[0][2] != expected_missing_perms):
raise Exception(f"Error returned was not expected\n Expected : {expected_missing_perms}\n Actual : {missing_perms[0][2]}")
test_validate_version_data = [
(
"should not return error when visibility is Public",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(version="4.9.10"),
["4.9.10"],
None
),
(
"should return error when visibility is random string",
Mock(cli_ctx=Mock(get_progress_controller=Mock(add=Mock(), end=Mock()))),
Mock(version="72.2343.21212"),
["4.9.10"],
"72.2343.21212 is not a valid version, valid versions are ['4.9.10']"
)
]
@pytest.mark.parametrize(
"test_description, cmd_mock, namespace_mock, get_versions_mock_return_value, expected_errors",
test_validate_version_data,
ids=[i[0] for i in test_validate_version_data]
)
@ patch('azext_aro.custom.aro_get_versions')
def test_validate_version(
# Mocked Functions
aro_get_versions_mock,
# Test cases parameters:
test_description, cmd_mock, namespace_mock, get_versions_mock_return_value, expected_errors
):
aro_get_versions_mock.return_value = get_versions_mock_return_value
validate_version_fn = dyn_validate_version()
if expected_errors is None:
errors = validate_version_fn(cmd_mock, namespace_mock)
if (len(errors) > 0):
raise Exception(f"Unexpected Error: {errors[0][2]}")
else:
errors = validate_version_fn(cmd_mock, namespace_mock)
if (errors[0][2] != expected_errors):
raise Exception(f"Error returned was not expected\n Expected : {expected_errors}\n Actual : {errors[0][2]}")