Add tests for no outbound connectivity (#2804)

* Add tests for no outbound connectivity

---------

Co-authored-by: narrieta <narrieta>
This commit is contained in:
Norberto Arrieta 2023-04-18 13:17:36 -07:00 коммит произвёл GitHub
Родитель 7de613305a
Коммит cb566561a7
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
17 изменённых файлов: 432 добавлений и 74 удалений

Просмотреть файл

@ -25,6 +25,23 @@ import tests_e2e
from tests_e2e.tests.lib.agent_test import AgentTest
class TestInfo(object):
"""
Description of a test
"""
# The class that implements the test
test_class: Type[AgentTest]
# If True, an error in the test blocks the execution of the test suite (defaults to False)
blocks_suite: bool
@property
def name(self) -> str:
return self.test_class.__name__
def __str__(self):
return self.name
class TestSuiteInfo(object):
"""
Description of a test suite
@ -32,13 +49,15 @@ class TestSuiteInfo(object):
# The name of the test suite
name: str
# The tests that comprise the suite
tests: List[Type[AgentTest]]
tests: List[TestInfo]
# Images or image sets (as defined in images.yml) on which the suite must run.
images: List[str]
# The location (region) on which the suite must run; if empty, the suite can run on any location
location: str
# Whether this suite must run on its own test VM
owns_vm: bool
# Customization for the ARM template used when creating the test VM
template: str
def __str__(self):
return self.name
@ -139,7 +158,7 @@ class AgentTestLoader(object):
"""
Loads the description of a TestSuite from its YAML file.
A test suite has 5 properties: name, tests, images, location, and owns-vm. For example:
A test suite has 5 properties: name, tests, images, location, and owns_vm. For example:
name: "AgentBvt"
tests:
@ -148,18 +167,22 @@ class AgentTestLoader(object):
- "bvts/vm_access.py"
images: "endorsed"
location: "eastuseaup"
owns-vm: true
owns_vm: true
* name - A string used to identify the test suite
* tests - A list of the tests in the suite. Each test is specified by the path for its source code relative to
WALinuxAgent/tests_e2e/tests.
* tests - A list of the tests in the suite. Each test can be specified by a string (the path for its source code relative to
WALinuxAgent/tests_e2e/tests), or a dictionary with two items:
* source: the path for its source code relative to WALinuxAgent/tests_e2e/tests
* blocks_suite: [Optional; boolean] If True, a failure on the test will stop execution of the test suite (i.e. the
rest of the tests in the suite will not be executed). By default, a failure on a test does not stop execution of
the test suite.
* images - A string, or a list of strings, specifying the images on which the test suite must be executed. Each value
can be the name of a single image (e.g."ubuntu_2004"), or the name of an image set (e.g. "endorsed"). The
names for images and image sets are defined in WALinuxAgent/tests_e2e/tests_suites/images.yml.
* location - [Optional; string] If given, the test suite must be executed on that location. If not specified,
or set to an empty string, the test suite will be executed in the default location. This is useful
for test suites that exercise a feature that is enabled only in certain regions.
* owns-vm - [Optional; boolean] By default all suites in a test run are executed on the same test VMs; if this
* owns_vm - [Optional; boolean] By default all suites in a test run are executed on the same test VMs; if this
value is set to True, new test VMs will be created and will be used exclusively for this test suite.
This is useful for suites that modify the test VMs in such a way that the setup may cause problems
in other test suites (for example, some tests targeted to the HGAP block internet access in order to
@ -176,9 +199,15 @@ class AgentTestLoader(object):
test_suite_info.name = test_suite["name"]
test_suite_info.tests = []
source_files = [AgentTestLoader._SOURCE_CODE_ROOT/"tests"/t for t in test_suite["tests"]]
for f in source_files:
test_suite_info.tests.extend(AgentTestLoader._load_test_classes(f))
for test in test_suite["tests"]:
test_info = TestInfo()
if isinstance(test, str):
test_info.test_class = AgentTestLoader._load_test_class(test)
test_info.blocks_suite = False
else:
test_info.test_class = AgentTestLoader._load_test_class(test["source"])
test_info.blocks_suite = test.get("blocks_suite", False)
test_suite_info.tests.append(test_info)
images = test_suite["images"]
if isinstance(images, str):
@ -190,20 +219,26 @@ class AgentTestLoader(object):
if test_suite_info.location is None:
test_suite_info.location = ""
test_suite_info.owns_vm = "owns-vm" in test_suite and test_suite["owns-vm"]
test_suite_info.owns_vm = "owns_vm" in test_suite and test_suite["owns_vm"]
test_suite_info.template = test_suite.get("template", "")
return test_suite_info
@staticmethod
def _load_test_classes(source_file: Path) -> List[Type[AgentTest]]:
def _load_test_class(relative_path: str) -> Type[AgentTest]:
"""
Takes a 'source_file', which must be a Python module, and returns a list of all the classes derived from AgentTest.
Loads an AgentTest from its source code file, which is given as a path relative to WALinuxAgent/tests_e2e/tests.
"""
spec = importlib.util.spec_from_file_location(f"tests_e2e.tests.{source_file.name}", str(source_file))
full_path: Path = AgentTestLoader._SOURCE_CODE_ROOT/"tests"/relative_path
spec = importlib.util.spec_from_file_location(f"tests_e2e.tests.{relative_path.replace('/', '.').replace('.py', '')}", str(full_path))
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# return all the classes in the module that are subclasses of AgentTest but are not AgentTest itself.
return [v for v in module.__dict__.values() if isinstance(v, type) and issubclass(v, AgentTest) and v != AgentTest]
matches = [v for v in module.__dict__.values() if isinstance(v, type) and issubclass(v, AgentTest) and v != AgentTest]
if len(matches) != 1:
raise Exception(f"Error in {full_path} (each test file must contain exactly one class derived from AgentTest)")
return matches[0]
@staticmethod
def _load_images() -> Dict[str, List[VmImageInfo]]:

Просмотреть файл

@ -459,6 +459,8 @@ class AgentTestSuite(LisaTestSuite):
with _set_thread_name(suite_full_name): # The thread name is added to the LISA log
log_path: Path = self.context.log_path/f"{suite_full_name}.log"
with set_current_thread_log(log_path):
suite_success: bool = True
try:
log.info("")
log.info("**************************************** %s ****************************************", suite_name)
@ -467,54 +469,54 @@ class AgentTestSuite(LisaTestSuite):
summary: List[str] = []
for test in suite.tests:
test_name = test.__name__
test_full_name = f"{suite_name}-{test_name}"
test_full_name = f"{suite_name}-{test.name}"
test_start_time: datetime.datetime = datetime.datetime.now()
log.info("******** Executing %s", test_name)
log.info("******** Executing %s", test.name)
self.context.lisa_log.info("Executing test %s", test_full_name)
test_success: bool = True
try:
test.test_class(self.context).run()
test(self.context).run()
summary.append(f"[Passed] {test_name}")
log.info("******** [Passed] %s", test_name)
summary.append(f"[Passed] {test.name}")
log.info("******** [Passed] %s", test.name)
self.context.lisa_log.info("[Passed] %s", test_full_name)
self._report_test_result(
suite_full_name,
test_name,
test.name,
TestStatus.PASSED,
test_start_time)
except TestSkipped as e:
summary.append(f"[Skipped] {test_name}")
log.info("******** [Skipped] %s: %s", test_name, e)
summary.append(f"[Skipped] {test.name}")
log.info("******** [Skipped] %s: %s", test.name, e)
self.context.lisa_log.info("******** [Skipped] %s", test_full_name)
self._report_test_result(
suite_full_name,
test_name,
test.name,
TestStatus.SKIPPED,
test_start_time,
message=str(e))
except AssertionError as e:
success = False
summary.append(f"[Failed] {test_name}")
log.error("******** [Failed] %s: %s", test_name, e)
test_success = False
summary.append(f"[Failed] {test.name}")
log.error("******** [Failed] %s: %s", test.name, e)
self.context.lisa_log.error("******** [Failed] %s", test_full_name)
self._report_test_result(
suite_full_name,
test_name,
test.name,
TestStatus.FAILED,
test_start_time,
message=str(e))
except: # pylint: disable=bare-except
success = False
summary.append(f"[Error] {test_name}")
log.exception("UNHANDLED EXCEPTION IN %s", test_name)
test_success = False
summary.append(f"[Error] {test.name}")
log.exception("UNHANDLED EXCEPTION IN %s", test.name)
self.context.lisa_log.exception("UNHANDLED EXCEPTION IN %s", test_full_name)
self._report_test_result(
suite_full_name,
test_name,
test.name,
TestStatus.FAILED,
test_start_time,
message="Unhandled exception.",
@ -522,14 +524,21 @@ class AgentTestSuite(LisaTestSuite):
log.info("")
log.info("********* [Test Results]")
suite_success = suite_success and test_success
if not test_success and test.blocks_suite:
log.warning("%s failed and blocks the suite. Stopping suite execution.", test.name)
break
log.info("")
log.info("******** [Test Results]")
log.info("")
for r in summary:
log.info("\t%s", r)
log.info("")
except: # pylint: disable=bare-except
success = False
suite_success = False
self._report_test_result(
suite_full_name,
suite_name,
@ -538,7 +547,7 @@ class AgentTestSuite(LisaTestSuite):
message=f"Unhandled exception while executing test suite {suite_name}.",
add_exception_stack_trace=True)
finally:
if not success:
if not suite_success:
self._mark_log_as_failed()
return success
@ -562,7 +571,7 @@ class AgentTestSuite(LisaTestSuite):
# E1133: Non-iterable value self.context.test_suites is used in an iterating context (not-an-iterable)
for suite in self.context.test_suites: # pylint: disable=E1133
for test in suite.tests:
ignore_error_rules.extend(test(self.context).get_ignore_error_rules())
ignore_error_rules.extend(test.test_class(self.context).get_ignore_error_rules())
if len(ignore_error_rules) > 0:
new = []

Просмотреть файл

@ -218,36 +218,44 @@ class AgentTestSuitesCombinator(Combinator):
else:
vm_size = ""
if suite_info.owns_vm:
# create an environment for exclusive use by this suite
environment_list.append({
# Note: Disabling "W0640: Cell variable 'foo' defined in loop (cell-var-from-loop)". This is a false positive, the closure is OK
# to use, since create_environment() is called within the same iteration of the loop.
# pylint: disable=W0640
def create_environment(env_name: str) -> Dict[str, Any]:
tags = {}
if suite_info.template != '':
tags["templates"] = suite_info.template
return {
"c_marketplace_image": marketplace_image,
"c_cloud": self.runbook.cloud,
"c_location": location,
"c_vm_size": vm_size,
"c_vhd": vhd,
"c_test_suites": [suite_info],
"c_env_name": f"{name}-{suite_info.name}",
"c_env_name": env_name,
"c_marketplace_image_information_location": self._MARKETPLACE_IMAGE_INFORMATION_LOCATIONS[self.runbook.cloud],
"c_shared_resource_group_location": self._SHARED_RESOURCE_GROUP_LOCATIONS[self.runbook.cloud]
})
"c_shared_resource_group_location": self._SHARED_RESOURCE_GROUP_LOCATIONS[self.runbook.cloud],
"c_vm_tags": tags
}
# pylint: enable=W0640
if suite_info.owns_vm:
# create an environment for exclusive use by this suite
environment_list.append(create_environment(f"{name}-{suite_info.name}"))
else:
# add this suite to the shared environments
key: str = f"{name}-{location}"
if key in shared_environments:
shared_environments[key]["c_test_suites"].append(suite_info)
environment = shared_environments.get(key)
if environment is not None:
environment["c_test_suites"].append(suite_info)
if suite_info.template != '':
vm_tags = environment["c_vm_tags"]
if "templates" in vm_tags:
vm_tags["templates"] += ", " + suite_info.template
else:
vm_tags["templates"] = suite_info.template
else:
shared_environments[key] = {
"c_marketplace_image": marketplace_image,
"c_cloud": self.runbook.cloud,
"c_location": location,
"c_vm_size": vm_size,
"c_vhd": vhd,
"c_test_suites": [suite_info],
"c_env_name": key,
"c_marketplace_image_information_location": self._MARKETPLACE_IMAGE_INFORMATION_LOCATIONS[self.runbook.cloud],
"c_shared_resource_group_location": self._SHARED_RESOURCE_GROUP_LOCATIONS[self.runbook.cloud]
}
shared_environments[key] = create_environment(key)
environment_list.extend(shared_environments.values())
@ -256,18 +264,17 @@ class AgentTestSuitesCombinator(Combinator):
log: logging.Logger = logging.getLogger("lisa")
log.info("")
log.info("******** Agent Test Environments *****")
log.info("******** Waagent: Test Environments *****")
log.info("")
for environment in environment_list:
test_suites = [s.name for s in environment['c_test_suites']]
log.info("Settings for %s:\n%s\n", environment['c_env_name'], '\n'.join([f"\t{name}: {value if name != 'c_test_suites' else test_suites}" for name, value in environment.items()]))
log.info("***************************")
log.info("")
return environment_list
_URN = re.compile(r"(?P<publisher>[^\s:]+)[\s:](?P<offer>[^\s:]+)[\s:](?P<sku>[^\s:]+)[\s:](?P<version>[^\s:]+)")
@staticmethod
def _is_urn(urn: str) -> bool:
# URNs can be given as '<Publisher> <Offer> <Sku> <Version>' or '<Publisher>:<Offer>:<Sku>:<Version>'

Просмотреть файл

@ -0,0 +1,67 @@
# Microsoft Azure Linux Agent
#
# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import importlib
import logging
from pathlib import Path
from typing import Any, Callable
# Disable those warnings, since 'lisa' is an external, non-standard, dependency
# E0401: Unable to import 'lisa.*' (import-error)
# pylint: disable=E0401
from lisa.environment import Environment
from lisa.util import hookimpl, plugin_manager
from lisa.sut_orchestrator.azure.platform_ import AzurePlatformSchema
# pylint: enable=E0401
import tests_e2e
class UpdateArmTemplateHook:
"""
This hook allows to customize the ARM template used to create the test VMs (see wiki for details).
"""
@hookimpl
def azure_update_arm_template(self, template: Any, environment: Environment) -> None:
azure_runbook: AzurePlatformSchema = environment.platform.runbook.get_extended_runbook(AzurePlatformSchema)
vm_tags = azure_runbook.vm_tags
templates = vm_tags.get("templates")
if templates is not None:
log: logging.Logger = logging.getLogger("lisa")
log.info("******** Waagent: Applying custom templates '%s' to environment '%s'", templates, environment.name)
for t in templates.split(","):
update_arm_template = self._get_update_arm_template(t)
update_arm_template(template)
_SOURCE_CODE_ROOT: Path = Path(tests_e2e.__path__[0])
@staticmethod
def _get_update_arm_template(template_path: str) -> Callable:
source_file: Path = UpdateArmTemplateHook._SOURCE_CODE_ROOT/"tests"/template_path
spec = importlib.util.spec_from_file_location(f"tests_e2e.tests.templates.{source_file.name}", str(source_file))
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
matches = [v for v in module.__dict__.values() if callable(v) and v.__name__ == "update_arm_template"]
if len(matches) != 1:
raise Exception(f"Could not find update_arm_template in {source_file}")
return matches[0]
plugin_manager.register(UpdateArmTemplateHook())

Просмотреть файл

@ -67,6 +67,10 @@ variable:
# Most of these variables are handled by LISA and are used to define the set of test VMs that need to be
# created. The variables marked with 'is_case_visible' are also referenced by the AgentTestSuite.
#
# 'c_vm_tags' is a special case: it is used by the azure_update_arm_template hook. This hook does not
# have access to the runbook variables, so instead we use a dummy VM tag named "template" to pass the
# name of the custom ARM template that the hook needs to use (see wiki for more details).
#
- name: c_env_name
value: ""
is_case_visible: true
@ -90,6 +94,8 @@ variable:
- name: c_test_suites
value: []
is_case_visible: true
- name: c_vm_tags
value: {}
#
# Set these variables to use an SSH proxy when executing the runbook
@ -120,6 +126,7 @@ platform:
# shared_resource_group_location: $(c_shared_resource_group_location)
subscription_id: $(subscription_id)
wait_delete: false
vm_tags: $(c_vm_tags)
requirement:
core_count:
min: 2

Просмотреть файл

@ -76,12 +76,8 @@ variable:
is_case_visible: true
#
# The values for these variables are generated by the AgentTestSuitesCombinator combinator. They are
# prefixed with "c_" to distinguish them from the rest of the variables, whose value can be set from
# the command line.
#
# Most of these variables are handled by LISA and are used to define the set of test VMs that need to be
# created. The variables marked with 'is_case_visible' are also referenced by the AgentTestSuite.
# The values for these variables are generated by the AgentTestSuitesCombinator. See
# tests_e2e/orchestrator/runbook.yml for details.
#
- name: c_env_name
value: ""
@ -101,6 +97,8 @@ variable:
- name: c_test_suites
value: []
is_case_visible: true
- name: c_vm_tags
value: {}
#
# Set these variables to use an SSH proxy when executing the runbook
@ -129,6 +127,7 @@ platform:
resource_group_name: $(resource_group_name)
deploy: false
subscription_id: $(subscription_id)
vm_tags: $(c_vm_tags)
requirement:
azure:
name: $(c_vm_name)

Просмотреть файл

@ -9,7 +9,7 @@ parameters:
- name: test_suites
displayName: Test Suites
type: string
default: agent_bvt
default: agent_bvt, no_outbound_connections
# NOTES:
# * 'image', 'location' and 'vm_size' override any values in the test suites/images definition

Просмотреть файл

@ -0,0 +1,21 @@
#
# This suite is used to test the scenario where outbound connections are blocked on the VM. In this case,
# the agent should fallback to the HostGAPlugin to request any downloads.
#
# The suite uses a custom ARM template to create a VM with a Network Security Group that blocks all outbound
# connections. The first test in the suite verifies that the setup of the NSG was successful, then the rest
# of the tests exercise different extension operations. The last test in the suite checks the agent log
# to verify it did fallback to the HostGAPlugin to execute the extensions.
#
name: "NoOutboundConnections"
tests:
- source: "no_outbound_connections/check_no_outbound_connections.py"
blocks_suite: true # If the NSG is not setup correctly, there is no point in executing the rest of the tests.
- "bvts/extension_operations.py"
- "bvts/run_command.py"
- "bvts/vm_access.py"
- "no_outbound_connections/check_fallback_to_hgap.py"
images:
- "ubuntu_2004"
template: "no_outbound_connections/nsg_template.py"
owns_vm: true

Просмотреть файл

@ -45,10 +45,7 @@ class RunCommandBvt(AgentTest):
self.get_settings = get_settings
def run(self):
ssh_client = SshClient(
ip_address=self._context.vm_ip_address,
username=self._context.username,
private_key_file=self._context.private_key_file)
ssh_client: SshClient = self._context.create_ssh_client()
test_cases = [
RunCommandBvt.TestCase(

Просмотреть файл

@ -38,7 +38,7 @@ from tests_e2e.tests.lib.vm_extension import VmExtension
class VmAccessBvt(AgentTest):
def run(self):
ssh: SshClient = SshClient(ip_address=self._context.vm_ip_address, username=self._context.username, private_key_file=self._context.private_key_file)
ssh: SshClient = self._context.create_ssh_client()
if "-flatcar" in ssh.run_command("uname -a"):
raise TestSkipped("Currently VMAccess is not supported on Flatcar")

Просмотреть файл

@ -21,6 +21,7 @@ from pathlib import Path
import tests_e2e
from tests_e2e.tests.lib.identifiers import VmIdentifier
from tests_e2e.tests.lib.ssh_client import SshClient
class AgentTestContext:
@ -121,6 +122,16 @@ class AgentTestContext:
"""
return self._connection._ssh_port
def create_ssh_client(self) -> SshClient:
"""
Convenience method to create an SSH client using the connection info from the context.
"""
return SshClient(
ip_address=self.vm_ip_address,
username=self.username,
private_key_file=self.private_key_file,
port=self.ssh_port)
@staticmethod
def from_args():
"""

Просмотреть файл

@ -55,5 +55,5 @@ def retry_ssh_run(operation: Callable[[], Any]) -> Any:
# Instance of 'Exception' has no 'exit_code' member (no-member) - Disabled: e is actually an CommandError
if e.exit_code != 255 or attempts == 0: # pylint: disable=no-member
raise
log.warning("The operation failed with %s, retrying in 30 secs.", e)
log.warning("The operation failed, retrying in 30 secs.\n%s", e)
time.sleep(30)

Просмотреть файл

@ -38,7 +38,7 @@ class CommandError(Exception):
def run_command(command: Any, shell=False) -> str:
"""
This function is a thin wrapper around Popen/communicate in the subprocess module. It executes the given command
and returns its stdout. If the command returns a non-zero exit code, the function raises a RunCommandException.
and returns its stdout. If the command returns a non-zero exit code, the function raises a CommandError.
Similarly to Popen, the 'command' can be a string or a list of strings, and 'shell' indicates whether to execute
the command through the shell.

Просмотреть файл

@ -34,7 +34,7 @@ class SshClient(object):
def run_command(self, command: str, use_sudo: bool = False) -> str:
"""
Executes the given command over SSH and returns its stdout. If the command returns a non-zero exit code,
the function raises a RunCommandException.
the function raises a CommandError.
"""
if re.match(r"^\s*sudo\s*", command):
raise Exception("Do not include 'sudo' in the 'command' argument, use the 'use_sudo' parameter instead")

Просмотреть файл

@ -0,0 +1,51 @@
#!/usr/bin/env python3
# Microsoft Azure Linux Agent
#
# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from assertpy import assert_that
from tests_e2e.tests.lib.agent_test import AgentTest
from tests_e2e.tests.lib.logging import log
from tests_e2e.tests.lib.ssh_client import SshClient
class NoOutboundConnections(AgentTest):
"""
Check the agent log to verify that the default channel was changed to HostGAPlugin before executing any extensions.
"""
def run(self):
# 2023-04-14T14:49:43.005530Z INFO ExtHandler ExtHandler Default channel changed to HostGAPlugin channel.
# 2023-04-14T14:49:44.625061Z INFO ExtHandler [Microsoft.Azure.Monitor.AzureMonitorLinuxAgent-1.25.2] Target handler state: enabled [incarnation_2]
ssh_client: SshClient = self._context.create_ssh_client()
log.info("Parsing agent log on the test VM")
output = ssh_client.run_command("grep -E 'INFO ExtHandler.*(Default channel changed to HostGAPlugin)|(Target handler state:)' /var/log/waagent.log | head").split('\n')
log.info("Output (first 10 lines) from the agent log:\n\t\t%s", '\n\t\t'.join(output))
assert_that(len(output) > 1).is_true().described_as(
"The agent log should contain multiple matching records"
)
assert_that(output[0]).contains("Default channel changed to HostGAPlugin").described_as(
"The agent log should contain a record indicating that the default channel was changed to HostGAPlugin before executing any extensions"
)
log.info("The agent log indicates that the default channel was changed to HostGAPlugin before executing any extensions")
if __name__ == "__main__":
NoOutboundConnections.run_from_command_line()

Просмотреть файл

@ -0,0 +1,59 @@
#!/usr/bin/env python3
# Microsoft Azure Linux Agent
#
# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from assertpy import fail
from tests_e2e.tests.lib.agent_test import AgentTest
from tests_e2e.tests.lib.logging import log
from tests_e2e.tests.lib.shell import CommandError
from tests_e2e.tests.lib.ssh_client import SshClient
class CheckNoOutboundConnections(AgentTest):
"""
Verifies that there is no outbound connectivity on the test VM.
"""
def run(self):
# This script is executed on the test VM. It tries to connect to a well-known DNS server (DNS is on port 53).
script: str = """
import socket, sys
try:
socket.setdefaulttimeout(5)
socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect(("8.8.8.8", 53))
except socket.timeout:
print("No outbound connectivity [expected]")
exit(0)
print("There is outbound connectivity [unexpected: the custom ARM template should not allow it]", file=sys.stderr)
exit(1)
"""
ssh_client: SshClient = self._context.create_ssh_client()
try:
log.info("Verifying that there is no outbound connectivity on the test VM")
ssh_client.run_command("pypy3 -c '{0}'".format(script.replace('"', '\"')))
log.info("There is no outbound connectivity, as expected.")
except CommandError as e:
if e.exit_code == 1 and "There is outbound connectivity" in e.stderr:
fail("There is outbound connectivity on the test VM, the custom ARM template should not allow it")
else:
raise Exception(f"Unexpected error while checking outbound connectivity on the test VM: {e}")
if __name__ == "__main__":
CheckNoOutboundConnections.run_from_command_line()

Просмотреть файл

@ -0,0 +1,95 @@
#!/usr/bin/env python3
# Microsoft Azure Linux Agent
#
# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
from typing import Any
def update_arm_template(template: Any) -> None:
"""
Updates the ARM template to add a network security group that denies all outbound connections.
"""
resources = template["resources"]
# Append the NSG to the list of resources
resources.append(json.loads("""
{
"type": "Microsoft.Network/networkSecurityGroups",
"name": "no-outbound-connections",
"location": "[parameters('location')]",
"apiVersion": "2020-05-01",
"properties": {
"securityRules": [
{
"name": "ssh_rule",
"properties": {
"description": "Allows inbound SSH connections.",
"protocol": "Tcp",
"sourcePortRange": "*",
"destinationPortRange": "22",
"sourceAddressPrefix": "*",
"destinationAddressPrefix": "*",
"access": "Allow",
"priority": 110,
"direction": "Inbound"
}
},
{
"name": "outbound_rule",
"properties": {
"description": "Denies all outbound connections.",
"protocol": "*",
"sourcePortRange": "*",
"destinationPortRange": "*",
"sourceAddressPrefix": "*",
"destinationAddressPrefix": "Internet",
"access": "Deny",
"priority": 200,
"direction": "Outbound"
}
}
]
}
}
"""))
# Add a dependency of the deployment on the NSG
deployment_resource = _get_resource(resources, "Microsoft.Resources/deployments")
deployment_resource["dependsOn"].append("[resourceId('Microsoft.Network/networkSecurityGroups', 'no-outbound-connections')]")
# Add reference to the NSG to the properties of the network interface
template_resources = deployment_resource["properties"]["template"]["resources"]
network_interface_resource = _get_resource(template_resources, "Microsoft.Network/networkInterfaces")
network_interface_resource["properties"].update(json.loads(
"""
{
"networkSecurityGroup": {
"id": "[resourceId('Microsoft.Network/networkSecurityGroups', 'no-outbound-connections')]"
}
}
"""))
def _get_resource(resources: Any, type_name: str) -> Any:
for item in resources:
if item["type"] == type_name:
return item
raise Exception(f"Cannot find a resource of type {type_name} in the ARM template")