add test cases for env, and testsuite

refine test cases to reuse test case util functions
minor update on comments
This commit is contained in:
Chi Song 2020-09-15 22:00:54 +08:00
Родитель 13a41d20af
Коммит 694de48b0f
4 изменённых файлов: 498 добавлений и 69 удалений

Просмотреть файл

@ -0,0 +1,125 @@
from typing import Any, List
from unittest import TestCase
from lisa import schema, search_space
from lisa.environment import load_environments
from lisa.testsuite import simple_requirement
from lisa.util import constants
def generate_runbook(
is_single_env: bool = False,
local: bool = False,
remote: bool = False,
requirement: bool = False,
) -> schema.EnvironmentRoot:
environments: List[Any] = list()
nodes: List[Any] = list()
if local:
nodes.append(
{
constants.TYPE: constants.ENVIRONMENTS_NODES_LOCAL,
constants.ENVIRONMENTS_NODES_CAPABILITY: {"coreCount": {"min": 4}},
}
)
if remote:
nodes.append(
{
constants.TYPE: constants.ENVIRONMENTS_NODES_REMOTE,
constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS: "internal_address",
constants.ENVIRONMENTS_NODES_REMOTE_PORT: 22,
"publicAddress": "public_address",
"publicPort": 10022,
constants.ENVIRONMENTS_NODES_REMOTE_USERNAME: "name_of_user",
constants.ENVIRONMENTS_NODES_REMOTE_PASSWORD: "dont_use_it",
}
)
if requirement:
nodes.append(
{
constants.TYPE: constants.ENVIRONMENTS_NODES_REQUIREMENT,
"nodeCount": 2,
"coreCount": 8,
"diskCount": {"min": 1},
"nicCount": {"max": 1},
}
)
if is_single_env:
environments = [{"nodes": nodes}]
else:
for node in nodes:
environments.append({"nodes": [node]})
data = {"maxConcurrency": 2, constants.ENVIRONMENTS: environments}
return schema.EnvironmentRoot.schema().load(data) # type: ignore
class EnvironmentTestCase(TestCase):
def test_load_empty_runbook(self) -> None:
envs = load_environments(None)
self.assertEqual(0, len(envs))
self.assertEqual(False, envs.warn_as_error)
self.assertEqual(1, envs.max_concurrency)
self.assertEqual(True, envs.allow_create)
def test_create_from_runbook_splited(self) -> None:
runbook = generate_runbook(local=True, remote=True)
envs = load_environments(runbook)
self.assertEqual(2, len(envs))
for env in envs.values():
for node in env.nodes.list():
# mock initializing
node._is_initialized = True
self.assertEqual(1, len(env.nodes))
def test_create_from_runbook_merged(self) -> None:
runbook = generate_runbook(is_single_env=True, local=True, remote=True)
envs = load_environments(runbook)
self.assertEqual(1, len(envs))
for env in envs.values():
for node in env.nodes.list():
# mock initializing
node._is_initialized = True
self.assertEqual(2, len(env.nodes))
def test_create_from_runbook_cap(self) -> None:
runbook = generate_runbook(local=True, requirement=True)
envs = load_environments(runbook)
self.assertEqual(2, len(envs))
env = envs.get("runbook_0")
assert env
for node in env.nodes.list():
# mock initializing
node._is_initialized = True
self.assertEqual(search_space.IntRange(min=4), node.capability.core_count)
self.assertEqual(search_space.IntRange(min=1), node.capability.disk_count)
# check from env capability
env_cap = env.capability
self.assertEqual(1, len(env_cap.nodes))
self.assertEqual(search_space.IntRange(min=4), env_cap.nodes[0].core_count)
self.assertEqual(search_space.IntRange(min=1), env_cap.nodes[0].disk_count)
# test pure node_requirement
env = envs.get("runbook_1")
assert env
env_cap = env.capability
self.assertEqual(2, len(env_cap.nodes))
self.assertEqual(8, env_cap.nodes[0].core_count)
self.assertEqual(search_space.IntRange(min=1), env_cap.nodes[0].disk_count)
self.assertEqual(search_space.IntRange(max=1), env_cap.nodes[0].nic_count)
def test_create_from_requirement(self) -> None:
requirement = simple_requirement(min_count=2)
env_requirement = requirement.environment
assert env_requirement
envs = load_environments(None)
env = envs.get_or_create(requirement=env_requirement)
assert env
self.assertEqual(1, len(envs))
env = envs.get_or_create(requirement=env_requirement)
self.assertEqual(1, len(envs), "get or create again won't create new")
assert env
self.assertEqual(0, len(env.nodes))
self.assertIsNone(env.runbook.nodes)
assert env.runbook.nodes_requirement
self.assertEqual(2, len(env.runbook.nodes_requirement))

Просмотреть файл

@ -1,56 +1,46 @@
from typing import Any, List
from unittest import TestCase
from lisa.parameter_parser.runbook import validate_data
from lisa.testselector import select_testcases
from lisa.testsuite import (
TestCaseMetadata,
TestCaseRuntimeData,
TestSuiteMetadata,
get_cases_metadata,
get_suites_metadata,
)
from lisa.tests.test_testsuite import cleanup_metadata, select_and_check
from lisa.util import LisaException, constants
class SelectorTestCase(TestCase):
def setUp(self) -> None:
get_cases_metadata().clear()
get_suites_metadata().clear()
cleanup_metadata()
def test_no_case_selected(self) -> None:
runbook = [{constants.TESTCASE_CRITERIA: {"area": "demo"}}]
self._select_and_check(runbook, [])
select_and_check(self, runbook, [])
def test_skip_not_enabled(self) -> None:
runbook = [
{constants.TESTCASE_CRITERIA: {"tags": "t2"}, constants.ENABLE: False},
{constants.TESTCASE_CRITERIA: {"tags": "t3"}},
]
self._select_and_check(runbook, ["ut3"])
select_and_check(self, runbook, ["ut3"])
def test_select_by_priroity(self) -> None:
runbook = [{constants.TESTCASE_CRITERIA: {"priority": 0}}]
self._select_and_check(runbook, ["ut1"])
select_and_check(self, runbook, ["ut1"])
def test_select_by_tag(self) -> None:
runbook = [{constants.TESTCASE_CRITERIA: {"tags": "t1"}}]
self._select_and_check(runbook, ["ut1", "ut2"])
select_and_check(self, runbook, ["ut1", "ut2"])
def test_select_by_one_of_tag(self) -> None:
runbook = [{constants.TESTCASE_CRITERIA: {"tags": ["t1", "t3"]}}]
self._select_and_check(runbook, ["ut1", "ut2", "ut3"])
select_and_check(self, runbook, ["ut1", "ut2", "ut3"])
def test_select_by_two_rules(self) -> None:
runbook = [{constants.TESTCASE_CRITERIA: {"tags": ["t1", "t3"], "area": "a1"}}]
self._select_and_check(runbook, ["ut1", "ut2"])
select_and_check(self, runbook, ["ut1", "ut2"])
def test_select_by_two_criteria(self) -> None:
runbook = [
{constants.TESTCASE_CRITERIA: {"name": "mock_ut1"}},
{constants.TESTCASE_CRITERIA: {"name": "mock_ut2"}},
]
self._select_and_check(runbook, ["ut1", "ut2"])
select_and_check(self, runbook, ["ut1", "ut2"])
def test_select_then_drop(self) -> None:
runbook = [
@ -60,7 +50,7 @@ class SelectorTestCase(TestCase):
constants.TESTCASE_SELECT_ACTION: "exclude",
},
]
self._select_and_check(runbook, ["ut1"])
select_and_check(self, runbook, ["ut1"])
def test_select_drop_select(self) -> None:
runbook = [
@ -71,7 +61,7 @@ class SelectorTestCase(TestCase):
},
{constants.TESTCASE_CRITERIA: {"tags": "t1"}},
]
self._select_and_check(runbook, ["ut1", "ut2"])
select_and_check(self, runbook, ["ut1", "ut2"])
def test_select_force_include(self) -> None:
runbook = [
@ -84,7 +74,7 @@ class SelectorTestCase(TestCase):
constants.TESTCASE_SELECT_ACTION: "exclude",
},
]
self._select_and_check(runbook, ["ut1", "ut2"])
select_and_check(self, runbook, ["ut1", "ut2"])
def test_select_force_conflict(self) -> None:
runbook = [
@ -98,7 +88,7 @@ class SelectorTestCase(TestCase):
},
]
with self.assertRaises(LisaException) as cm:
self._select_and_check(runbook, ["ut1", "ut2"])
select_and_check(self, runbook, ["ut1", "ut2"])
self.assertIsInstance(cm.exception, LisaException)
self.assertIn("force", str(cm.exception))
@ -118,7 +108,7 @@ class SelectorTestCase(TestCase):
},
]
with self.assertRaises(LisaException) as cm:
self._select_and_check(runbook, [])
select_and_check(self, runbook, [])
self.assertIsInstance(cm.exception, LisaException)
self.assertIn("force", str(cm.exception))
@ -126,7 +116,7 @@ class SelectorTestCase(TestCase):
runbook = [
{constants.TESTCASE_CRITERIA: {"tags": "t1"}, "retry": 2},
]
selected = self._select_and_check(runbook, ["ut1", "ut2"])
selected = select_and_check(self, runbook, ["ut1", "ut2"])
self.assertListEqual([2, 2], [case.retry for case in selected])
@ -139,7 +129,7 @@ class SelectorTestCase(TestCase):
constants.TESTCASE_SELECT_ACTION: "none",
},
]
selected = self._select_and_check(runbook, ["ut1", "ut2", "ut2"])
selected = select_and_check(self, runbook, ["ut1", "ut2", "ut2"])
self.assertListEqual([1, 2, 2], [case.times for case in selected])
self.assertListEqual([0, 0, 0], [case.retry for case in selected])
@ -153,7 +143,7 @@ class SelectorTestCase(TestCase):
constants.TESTCASE_SELECT_ACTION: "none",
},
]
selected = self._select_and_check(runbook, ["ut1", "ut2"])
selected = select_and_check(self, runbook, ["ut1", "ut2"])
self.assertListEqual([0, 2], [case.retry for case in selected])
def test_select_with_diff_setting(self) -> None:
@ -161,45 +151,6 @@ class SelectorTestCase(TestCase):
{constants.TESTCASE_CRITERIA: {"tags": "t1"}, "retry": 2},
{constants.TESTCASE_CRITERIA: {"name": "mock_ut2"}, "retry": 3},
]
selected = self._select_and_check(runbook, ["ut1", "ut2"])
selected = select_and_check(self, runbook, ["ut1", "ut2"])
self.assertListEqual([2, 3], [case.retry for case in selected])
def _select_and_check(
self, case_runbook: List[Any], expected_descriptions: List[str]
) -> List[TestCaseRuntimeData]:
runbook = validate_data({constants.TESTCASE: case_runbook})
case_metadatas = self._generate_metadata()
selected = select_testcases(runbook.testcase, case_metadatas)
self.assertListEqual(
expected_descriptions, [case.description for case in selected]
)
return selected
def _generate_metadata(self) -> List[TestCaseMetadata]:
ut_cases = [
TestCaseMetadata("ut1", 0),
TestCaseMetadata("ut2", 1),
TestCaseMetadata("ut3", 2),
]
suite_metadata1 = TestSuiteMetadata("a1", "c1", "des1", ["t1", "t2"])
suite_metadata2 = TestSuiteMetadata("a2", "c2", "des2", ["t2", "t3"])
for metadata in ut_cases[0:2]:
self._init_metadata(metadata, suite_metadata1)
self._init_metadata(ut_cases[2], suite_metadata2)
return ut_cases
def _init_metadata(
self, metadata: TestCaseMetadata, suite_metadata: TestSuiteMetadata
) -> None:
func = _mock_test_function
func.__name__ = f"mock_{metadata.description}"
func.__qualname__ = f"mockclass.mock_{metadata.description}"
metadata(func)
metadata.set_suite(suite_metadata)
suite_metadata.cases.append(metadata)
def _mock_test_function() -> None:
pass

Просмотреть файл

@ -0,0 +1,353 @@
import asyncio
from typing import Any, List
from unittest import TestCase
from lisa import schema
from lisa.environment import load_environments
from lisa.operating_system import Linux, Windows
from lisa.parameter_parser.runbook import validate_data
from lisa.tests.test_environment import generate_runbook
from lisa.testselector import select_testcases
from lisa.testsuite import (
TestCaseMetadata,
TestCaseRuntimeData,
TestResult,
TestStatus,
TestSuite,
TestSuiteMetadata,
get_cases_metadata,
get_suites_metadata,
simple_requirement,
)
from lisa.util import LisaException, constants
class MockTestSuite(TestSuite):
def set_fail_phase(
self,
fail_on_before_suite: bool = False,
fail_on_after_suite: bool = False,
fail_on_before_case: bool = False,
fail_on_after_case: bool = False,
fail_case_count: int = 0,
) -> None:
self.fail_on_before_suite = fail_on_before_suite
self.fail_on_after_suite = fail_on_after_suite
self.fail_on_before_case = fail_on_before_case
self.fail_on_after_case = fail_on_after_case
self.fail_case_count = fail_case_count
def before_suite(self) -> None:
if self.fail_on_before_suite:
raise LisaException("failed")
def after_suite(self) -> None:
if self.fail_on_after_suite:
raise LisaException("failed")
def before_case(self) -> None:
if self.fail_on_before_case:
raise LisaException("failed")
def after_case(self) -> None:
if self.fail_on_after_case:
raise LisaException("failed")
def mock_ut1(self) -> None:
while self.fail_case_count > 0:
self.fail_case_count -= 1
raise LisaException("mock_ut1 failed")
def mock_ut2(self) -> None:
pass
class MockTestSuite2(TestSuite):
def mock_ut3(self) -> None:
pass
def cleanup_metadata() -> None:
get_cases_metadata().clear()
get_suites_metadata().clear()
def generate_cases_metadata() -> List[TestCaseMetadata]:
ut_cases = [
TestCaseMetadata("ut1", 0, requirement=simple_requirement(min_count=2),),
TestCaseMetadata("ut2", 1),
TestCaseMetadata("ut3", 2),
]
suite_metadata1 = TestSuiteMetadata("a1", "c1", "des1", ["t1", "t2"])
suite_metadata1(MockTestSuite)
ut_cases[0](MockTestSuite.mock_ut1)
ut_cases[1](MockTestSuite.mock_ut2)
suite_metadata2 = TestSuiteMetadata(
"a2",
"c2",
"des2",
["t2", "t3"],
requirement=simple_requirement(node=schema.NodeSpace(core_count=8)),
)
suite_metadata2(MockTestSuite2)
ut_cases[2](MockTestSuite2.mock_ut3)
return ut_cases
def generate_cases_result() -> List[TestResult]:
case_metadata = generate_cases_metadata()
case_results = [TestResult(TestCaseRuntimeData(x)) for x in case_metadata]
return case_results
def select_and_check(
ut: TestCase, case_runbook: List[Any], expected_descriptions: List[str]
) -> List[TestCaseRuntimeData]:
runbook = validate_data({constants.TESTCASE: case_runbook})
case_metadatas = generate_cases_metadata()
selected = select_testcases(runbook.testcase, case_metadatas)
ut.assertListEqual(expected_descriptions, [case.description for case in selected])
return selected
class TestSuiteTestCase(TestCase):
def generate_suite_instance(self) -> MockTestSuite:
case_results = generate_cases_result()
case_results = case_results[:2]
suite_metadata = case_results[0].runtime_data.metadata.suite
runbook = generate_runbook(is_single_env=True, local=True, remote=True)
envs = load_environments(runbook)
self.default_env = list(envs.values())[0]
assert self.default_env
test_suite = MockTestSuite(
environment=self.default_env,
case_results=case_results,
metadata=suite_metadata,
)
return test_suite
def setUp(self) -> None:
cleanup_metadata()
def test_expanded_nodespace(self) -> None:
cases = generate_cases_metadata()
for case in cases:
self.assertIsNotNone(case.requirement)
assert case.requirement.environment
for node in case.requirement.environment.nodes:
self.assertEqual(
1, node.node_count, "node count should be expanded to 1"
)
def test_case_override_suite(self) -> None:
cases = generate_cases_metadata()
case1_found = False
case2_found = False
for case in cases:
assert case.requirement.environment
assert case.suite.requirement.environment
if case.name == "mock_ut1":
self.assertEqual(2, len(case.requirement.environment.nodes))
self.assertEqual(1, len(case.suite.requirement.environment.nodes))
case1_found = True
if case.name == "mock_ut2":
self.assertEqual(1, len(case.requirement.environment.nodes))
self.assertEqual(1, len(case.suite.requirement.environment.nodes))
case2_found = True
self.assertEqual(True, case1_found)
self.assertEqual(True, case2_found)
def test_test_result_canrun(self) -> None:
runbook = [{constants.TESTCASE_CRITERIA: {"priority": [0, 1, 2]}}]
cases = select_and_check(self, runbook, ["ut1", "ut2", "ut3"])
case = cases[0]
for status in TestStatus:
result = TestResult(case)
result.set_status(status, f"set_{status}")
self.assertEqual(f"set_{status}", result.message)
self.assertEqual(status, result.status)
if status == TestStatus.NOTRUN:
self.assertEqual(True, result.can_run)
else:
self.assertEqual(False, result.can_run)
def test_skip_before_suite_failed(self) -> None:
test_suite = self.generate_suite_instance()
test_suite.set_fail_phase(fail_on_before_suite=True)
asyncio.run(test_suite.start())
for result in test_suite.case_results:
self.assertEqual(TestStatus.SKIPPED, result.status)
self.assertEqual("before_suite: failed", result.message)
def test_pass_after_suite_failed(self) -> None:
test_suite = self.generate_suite_instance()
test_suite.set_fail_phase(fail_on_after_suite=True)
asyncio.run(test_suite.start())
for result in test_suite.case_results:
self.assertEqual(TestStatus.PASSED, result.status)
self.assertEqual("", result.message)
def test_skip_before_case_failed(self) -> None:
test_suite = self.generate_suite_instance()
test_suite.set_fail_phase(fail_on_before_case=True)
asyncio.run(test_suite.start())
for result in test_suite.case_results:
self.assertEqual(TestStatus.SKIPPED, result.status)
self.assertEqual("before_case: failed", result.message)
def test_pass_after_case_failed(self) -> None:
test_suite = self.generate_suite_instance()
test_suite.set_fail_phase(fail_on_after_case=True)
asyncio.run(test_suite.start())
for result in test_suite.case_results:
self.assertEqual(TestStatus.PASSED, result.status)
self.assertEqual("", result.message)
def test_skip_case_failed(self) -> None:
test_suite = self.generate_suite_instance()
test_suite.set_fail_phase(fail_case_count=1)
asyncio.run(test_suite.start())
result = test_suite.case_results[0]
self.assertEqual(TestStatus.FAILED, result.status)
self.assertEqual("failed: mock_ut1 failed", result.message)
result = test_suite.case_results[1]
self.assertEqual(TestStatus.PASSED, result.status)
self.assertEqual("", result.message)
def test_retry_passed(self) -> None:
test_suite = self.generate_suite_instance()
test_suite.set_fail_phase(fail_case_count=1)
result = test_suite.case_results[0]
result.runtime_data.retry = 1
asyncio.run(test_suite.start())
self.assertEqual(TestStatus.PASSED, result.status)
self.assertEqual("", result.message)
result = test_suite.case_results[1]
self.assertEqual(TestStatus.PASSED, result.status)
self.assertEqual("", result.message)
def test_retry_notenough_failed(self) -> None:
test_suite = self.generate_suite_instance()
test_suite.set_fail_phase(fail_case_count=2)
result = test_suite.case_results[0]
result.runtime_data.retry = 1
asyncio.run(test_suite.start())
self.assertEqual(TestStatus.FAILED, result.status)
self.assertEqual("failed: mock_ut1 failed", result.message)
result = test_suite.case_results[1]
self.assertEqual(TestStatus.PASSED, result.status)
self.assertEqual("", result.message)
def test_attempt_ignore_failure(self) -> None:
test_suite = self.generate_suite_instance()
test_suite.set_fail_phase(fail_case_count=2)
result = test_suite.case_results[0]
result.runtime_data.ignore_failure = True
asyncio.run(test_suite.start())
self.assertEqual(TestStatus.ATTEMPTED, result.status)
self.assertEqual("mock_ut1 failed", result.message)
result = test_suite.case_results[1]
self.assertEqual(TestStatus.PASSED, result.status)
self.assertEqual("", result.message)
def test_result_check_env_not_ready_os_type(self) -> None:
test_suite = self.generate_suite_instance()
assert self.default_env
self.default_env.is_ready = False
for node in self.default_env.nodes.list():
node.os = Linux(node)
for result in test_suite.case_results:
check_result = result.check_environment(self.default_env)
self.assertTrue(check_result)
def test_result_check_env_os_type_not_unsupported(self) -> None:
test_suite = self.generate_suite_instance()
assert self.default_env
self.default_env.is_ready = True
case_metadata = test_suite.case_results[0].runtime_data.metadata
case_metadata.requirement = simple_requirement(
min_count=2, unsupported_os=[Linux]
)
for node in self.default_env.nodes.list():
node.os = Windows(node)
for result in test_suite.case_results:
check_result = result.check_environment(self.default_env)
self.assertTrue(check_result)
def test_result_check_env_os_type_unsupported(self) -> None:
test_suite = self.generate_suite_instance()
assert self.default_env
self.default_env.is_ready = True
case_metadata = test_suite.case_results[0].runtime_data.metadata
case_metadata.requirement = simple_requirement(
min_count=2, unsupported_os=[Linux]
)
for node in self.default_env.nodes.list():
node.os = Linux(node)
check_result = test_suite.case_results[0].check_environment(self.default_env)
self.assertFalse(check_result)
check_result = test_suite.case_results[1].check_environment(self.default_env)
self.assertTrue(check_result)
def test_result_check_env_os_type_supported(self) -> None:
test_suite = self.generate_suite_instance()
assert self.default_env
self.default_env.is_ready = True
case_metadata = test_suite.case_results[0].runtime_data.metadata
case_metadata.requirement = simple_requirement(
min_count=2, supported_os=[Linux]
)
for node in self.default_env.nodes.list():
node.os = Linux(node)
for result in test_suite.case_results:
check_result = result.check_environment(self.default_env)
self.assertTrue(check_result)
def test_result_check_env_os_type_not_supported(self) -> None:
test_suite = self.generate_suite_instance()
assert self.default_env
self.default_env.is_ready = True
case_metadata = test_suite.case_results[0].runtime_data.metadata
case_metadata.requirement = simple_requirement(
min_count=2, supported_os=[Linux]
)
for node in self.default_env.nodes.list():
node.os = Windows(node)
check_result = test_suite.case_results[0].check_environment(self.default_env)
self.assertFalse(check_result)
check_result = test_suite.case_results[1].check_environment(self.default_env)
self.assertTrue(check_result)
def test_skipped_not_meet_req(self) -> None:
test_suite = self.generate_suite_instance()
assert self.default_env
self.default_env.is_ready = True
case_metadata = test_suite.case_results[0].runtime_data.metadata
case_metadata.requirement = simple_requirement(min_count=3)
result = test_suite.case_results[0]
check_result = result.check_environment(self.default_env, save_reason=True)
self.assertFalse(check_result)
# only save reason, but not set final status, so that it can try next env
self.assertEqual(TestStatus.NOTRUN, result.status)
assert result.check_results
self.assertFalse(result.check_results.result)
self.assertListEqual(
["no enough nodes, capability: 2, requirement: 3"],
result.check_results.reasons,
)
result = test_suite.case_results[1]
check_result = result.check_environment(self.default_env, save_reason=True)
self.assertTrue(check_result)
self.assertEqual(TestStatus.NOTRUN, result.status)
assert result.check_results
self.assertTrue(result.check_results.result)
self.assertListEqual(
[], result.check_results.reasons,
)

Просмотреть файл

@ -176,7 +176,7 @@ class TestCaseMetadata:
self.requirement = requirement
def __getattr__(self, key: str) -> Any:
# inherit any attributes of metadata
# inherit all attributes of test suite
assert self.suite, "suite is not set before use metadata"
return getattr(self.suite, key)
@ -210,7 +210,7 @@ class TestCaseRuntimeData:
self.environment_name: str = ""
def __getattr__(self, key: str) -> Any:
# inherit any attributes of metadata
# inherit all attributes of metadata
assert self.metadata
return getattr(self.metadata, key)