Part2 - Migrate NVMe test cases. (#1369)

* Part2 - Migrate NVMe test cases.

* Implement assertion for cmd result.

This pattern is used by many test cases, implement the shared logic.

Add message for assert exit code method.

Co-authored-by: Chi Song <chisong@example.com>
This commit is contained in:
LiliDeng 2021-07-07 14:11:34 +08:00 коммит произвёл GitHub
Родитель abdc1fadf2
Коммит 10758c05f8
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
11 изменённых файлов: 570 добавлений и 19 удалений

Просмотреть файл

@ -7,6 +7,7 @@ from .cat import Cat
from .date import Date from .date import Date
from .dmesg import Dmesg from .dmesg import Dmesg
from .echo import Echo from .echo import Echo
from .fdisk import Fdisk
from .find import Find from .find import Find
from .gcc import Gcc from .gcc import Gcc
from .git import Git from .git import Git
@ -15,7 +16,9 @@ from .lsmod import Lsmod
from .lspci import Lspci from .lspci import Lspci
from .lsvmbus import Lsvmbus from .lsvmbus import Lsvmbus
from .make import Make from .make import Make
from .mkfs import Mkfsext, Mkfsxfs
from .modinfo import Modinfo from .modinfo import Modinfo
from .mount import Mount
from .ntttcp import Ntttcp from .ntttcp import Ntttcp
from .nvmecli import Nvmecli from .nvmecli import Nvmecli
from .reboot import Reboot from .reboot import Reboot
@ -27,6 +30,7 @@ __all__ = [
"Date", "Date",
"Dmesg", "Dmesg",
"Echo", "Echo",
"Fdisk",
"Find", "Find",
"Gcc", "Gcc",
"Git", "Git",
@ -35,7 +39,10 @@ __all__ = [
"Lspci", "Lspci",
"Lsvmbus", "Lsvmbus",
"Make", "Make",
"Mkfsext",
"Mkfsxfs",
"Modinfo", "Modinfo",
"Mount",
"Ntttcp", "Ntttcp",
"Nvmecli", "Nvmecli",
"Reboot", "Reboot",

68
lisa/tools/fdisk.py Normal file
Просмотреть файл

@ -0,0 +1,68 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
from enum import Enum
from typing import cast
from lisa.executable import Tool
from lisa.operating_system import Posix
from lisa.util import LisaException
from .mkfs import Mkfsext, Mkfsxfs
FileSystem = Enum(
"mkfs",
["xfs", "ext2", "ext3", "ext4"],
)
class Fdisk(Tool):
@property
def command(self) -> str:
return "fdisk"
@property
def can_install(self) -> bool:
return True
def make_partition(self, disk_name: str, file_system: FileSystem) -> None:
"""
disk_name: make a partition against the disk.
file_system: making the file system type against the partition
Make a partition and a filesystem against the disk.
"""
# n => new a partition
# p => primary partition
# 1 => Partition number
# "" => Use default 2048 for 'First sector'
# "" => Use default 2147483647 as 'Last sector'
# w => write table to disk and exit
self.node.execute(
f"(echo n; echo p; echo 1; echo ; echo; echo ; echo w) | "
f"{self.command} {disk_name}",
shell=True,
sudo=True,
)
if file_system == FileSystem.xfs:
mkfs_xfs = self.node.tools[Mkfsxfs]
mkfs_xfs.mkfs(f"{disk_name}p1", str(file_system))
elif file_system in [FileSystem.ext2, FileSystem.ext3, FileSystem.ext4]:
mkfs_ext = self.node.tools[Mkfsext]
mkfs_ext.mkfs(f"{disk_name}p1", str(file_system))
else:
raise LisaException(f"Unrecognized file system {file_system}.")
def delete_partition(self, disk_name: str) -> None:
"""
disk: delete one partition against the disk.
Delete the only partition of this disk.
"""
# d => delete a partition
# w => write table to disk and exit
self.node.execute(
f"(echo d; echo w) | {self.command} {disk_name}", shell=True, sudo=True
)
def _install(self) -> bool:
posix_os: Posix = cast(Posix, self.node.os)
posix_os.install_packages("util-linux")
return self._check_exists()

Просмотреть файл

@ -18,6 +18,10 @@ class Lsmod(Tool):
r"^(?P<name>[^\s]+)\s+(?P<size>[^\s]+)\s+(?P<usedby>.*)?$", re.MULTILINE r"^(?P<name>[^\s]+)\s+(?P<size>[^\s]+)\s+(?P<usedby>.*)?$", re.MULTILINE
) )
@property
def command(self) -> str:
return self._command
def _check_exists(self) -> bool: def _check_exists(self) -> bool:
return True return True

Просмотреть файл

@ -2,7 +2,7 @@
# Licensed under the MIT license. # Licensed under the MIT license.
import re import re
from typing import Any, List from typing import Any, Dict, List
from lisa.executable import Tool from lisa.executable import Tool
from lisa.operating_system import Posix from lisa.operating_system import Posix
@ -30,6 +30,12 @@ PATTERN_PCI_DEVICE = re.compile(
re.MULTILINE, re.MULTILINE,
) )
DEVICE_TYPE_DICT: Dict[str, str] = {
"SRIOV": "Ethernet controller",
"NVME": "Non-Volatile memory controller",
"GPU": "3D controller",
}
class PciDevice: class PciDevice:
def __init__(self, pci_device_raw: str) -> None: def __init__(self, pci_device_raw: str) -> None:
@ -64,8 +70,16 @@ class Lspci(Tool):
self.node.os.install_packages("pciutils") self.node.os.install_packages("pciutils")
return self._check_exists() return self._check_exists()
def _get_devices_slots_by_class_name(
self, class_name: str, force_run: bool = False
) -> List[str]:
devices_list = self.get_device_list(force_run)
devices_slots = [x.slot for x in devices_list if class_name == x.device_class]
return devices_slots
def get_device_list(self, force_run: bool = False) -> List[PciDevice]: def get_device_list(self, force_run: bool = False) -> List[PciDevice]:
if (not self._pci_devices) or force_run: if (not self._pci_devices) or force_run:
self._pci_devices = []
result = self.run("-m", force_run=force_run, shell=True) result = self.run("-m", force_run=force_run, shell=True)
if result.exit_code != 0: if result.exit_code != 0:
result = self.run("-m", force_run=force_run, shell=True, sudo=True) result = self.run("-m", force_run=force_run, shell=True, sudo=True)
@ -79,3 +93,27 @@ class Lspci(Tool):
self._pci_devices.append(pci_device) self._pci_devices.append(pci_device)
return self._pci_devices return self._pci_devices
def disable_devices(self, device_type: str) -> None:
if device_type.upper() not in DEVICE_TYPE_DICT.keys():
raise LisaException(f"pci_type {device_type} is not supported to disable.")
device_type_name = DEVICE_TYPE_DICT[device_type.upper()]
devices_slot = self._get_devices_slots_by_class_name(device_type_name)
if 0 == len(devices_slot):
self._log.debug("No matched devices found.")
return
for device_slot in devices_slot:
cmd_result = self.node.execute(
f"echo 1 > /sys/bus/pci/devices/{device_slot}/remove",
shell=True,
sudo=True,
)
cmd_result.assert_exit_code()
if len(self._get_devices_slots_by_class_name(device_type_name, True)) > 0:
raise LisaException(f"Fail to disable {device_type_name} devices.")
def enable_devices(self) -> None:
cmd_result = self.node.execute(
"echo 1 > /sys/bus/pci/rescan", shell=True, sudo=True
)
cmd_result.assert_exit_code()

46
lisa/tools/mkfs.py Normal file
Просмотреть файл

@ -0,0 +1,46 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import re
from typing import cast
from lisa.executable import Tool
from lisa.operating_system import Posix
class Mkfs(Tool):
__EXIST_FILE_SYSTEM_PATTERN = re.compile(
r".*appears to contain an existing filesystem", re.MULTILINE
)
@property
def command(self) -> str:
return "mkfs.xfs"
@property
def can_install(self) -> bool:
return True
# command - mkfs.xfs, mkfs.ext2, mkfs.ext3, mkfs.ext4
def mkfs(self, disk: str, command: str) -> None:
cmd_result = self.node.execute(
f"echo y | {command} {disk}", shell=True, sudo=True
)
if self.__EXIST_FILE_SYSTEM_PATTERN.match(cmd_result.stdout):
cmd_result = self.node.execute(
f"echo y | {command} -f {disk}", shell=True, sudo=True
)
cmd_result.assert_exit_code()
class Mkfsxfs(Mkfs):
def _install(self) -> bool:
posix_os: Posix = cast(Posix, self.node.os)
posix_os.install_packages("xfsprogs")
return self._check_exists()
class Mkfsext(Mkfs):
def _install(self) -> bool:
posix_os: Posix = cast(Posix, self.node.os)
posix_os.install_packages("e2fsprogs")
return self._check_exists()

46
lisa/tools/mount.py Normal file
Просмотреть файл

@ -0,0 +1,46 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import re
from pathlib import PurePosixPath
from typing import cast
from lisa.executable import Tool
from lisa.operating_system import Posix
from lisa.tools import Fdisk
from lisa.util import LisaException
class Mount(Tool):
__UMOUNT_ERROR_PATTERN = re.compile(r".*mountpoint not found", re.MULTILINE)
@property
def command(self) -> str:
return "mount"
@property
def can_install(self) -> bool:
return True
def mount(self, disk_name: str, point: str) -> None:
self.node.shell.mkdir(PurePosixPath(point), exist_ok=True)
cmd_result = self.node.execute(
f"mount {disk_name} {point}", shell=True, sudo=True
)
cmd_result.assert_exit_code()
def umount(self, disk_name: str, point: str, erase: bool = True) -> None:
cmd_result = self.node.execute(f"umount {point}", shell=True, sudo=True)
if erase:
fdisk = self.node.tools[Fdisk]
fdisk.delete_partition(disk_name)
self.node.execute(f"rm -r {point}", shell=True, sudo=True)
if (
not self.__UMOUNT_ERROR_PATTERN.match(cmd_result.stdout)
and 0 != cmd_result.exit_code
):
raise LisaException(f"Fail to run umount {point}.")
def _install(self) -> bool:
posix_os: Posix = cast(Posix, self.node.os)
posix_os.install_packages("util-linux")
return self._check_exists()

Просмотреть файл

@ -1,15 +1,23 @@
# Copyright (c) Microsoft Corporation. # Copyright (c) Microsoft Corporation.
# Licensed under the MIT license. # Licensed under the MIT license.
import re
from typing import cast from typing import cast
from lisa.executable import Tool from lisa.executable import Tool
from lisa.operating_system import Posix from lisa.operating_system import Posix
from lisa.tools import Git, Make from lisa.tools import Git, Make
from lisa.util import find_patterns_in_lines
from lisa.util.process import ExecutableResult
class Nvmecli(Tool): class Nvmecli(Tool):
repo = "https://github.com/linux-nvme/nvme-cli" repo = "https://github.com/linux-nvme/nvme-cli"
# error_count\t: 0
__error_count_pattern = re.compile(r"^error_count.*:[ ]+([\d]+)\r?$", re.M)
# [3:3] : 0 NS Management and Attachment Supported
__ns_management_attachement_support = "NS Management and Attachment Supported"
# [1:1] : 0x1 Format NVM Supported
__format_device_support = "Format NVM Supported"
@property @property
def command(self) -> str: def command(self) -> str:
@ -30,6 +38,18 @@ class Nvmecli(Tool):
code_path = tool_path.joinpath("nvme-cli") code_path = tool_path.joinpath("nvme-cli")
make.make_and_install(cwd=code_path) make.make_and_install(cwd=code_path)
def create_namespace(self, namespace: str) -> ExecutableResult:
return self.run(f"create-ns {namespace}", shell=True, sudo=True)
def delete_namespace(self, namespace: str, id: int) -> ExecutableResult:
return self.run(f"delete-ns -n {id} {namespace}", shell=True, sudo=True)
def detach_namespace(self, namespace: str, id: int) -> ExecutableResult:
return self.run(f"detach-ns -n {id} {namespace}", shell=True, sudo=True)
def format_namespace(self, namespace: str) -> ExecutableResult:
return self.run(f"format {namespace}", shell=True, sudo=True)
def install(self) -> bool: def install(self) -> bool:
if not self._check_exists(): if not self._check_exists():
posix_os: Posix = cast(Posix, self.node.os) posix_os: Posix = cast(Posix, self.node.os)
@ -38,3 +58,22 @@ class Nvmecli(Tool):
if not self._check_exists(): if not self._check_exists():
self._install_from_src() self._install_from_src()
return self._check_exists() return self._check_exists()
def get_error_count(self, namespace: str) -> int:
error_log = self.run(f"error-log {namespace}", shell=True, sudo=True)
error_count = 0
# for row in error_log.stdout.splitlines():
errors = find_patterns_in_lines(error_log.stdout, [self.__error_count_pattern])
if errors[0]:
error_count = sum([int(element) for element in errors[0]])
return error_count
def support_ns_manage_attach(self, device_name: str) -> bool:
cmd_result = self.run(f"id-ctrl -H {device_name}", shell=True, sudo=True)
cmd_result.assert_exit_code()
return self.__ns_management_attachement_support in cmd_result.stdout
def support_device_format(self, device_name: str) -> bool:
cmd_result = self.run(f"id-ctrl -H {device_name}", shell=True, sudo=True)
cmd_result.assert_exit_code()
return self.__format_device_support in cmd_result.stdout

Просмотреть файл

@ -154,10 +154,9 @@ def set_filtered_fields(src: Any, dest: Any, fields: List[str]) -> None:
def find_patterns_in_lines(lines: str, patterns: List[Pattern[str]]) -> List[List[str]]: def find_patterns_in_lines(lines: str, patterns: List[Pattern[str]]) -> List[List[str]]:
results: List[List[str]] = [[]] * len(patterns) results: List[List[str]] = [[]] * len(patterns)
for line in lines.splitlines(keepends=False): for index, pattern in enumerate(patterns):
for index, pattern in enumerate(patterns): if not results[index]:
if not results[index]: results[index] = pattern.findall(lines)
results[index] = pattern.findall(line)
return results return results

Просмотреть файл

@ -8,9 +8,10 @@ import signal
import subprocess import subprocess
import time import time
from dataclasses import dataclass from dataclasses import dataclass
from typing import Dict, Optional from typing import Dict, List, Optional, Union
import spur # type: ignore import spur # type: ignore
from assertpy.assertpy import AssertionBuilder, assert_that
from spur.errors import NoSuchCommandError # type: ignore from spur.errors import NoSuchCommandError # type: ignore
from lisa.util.logger import Logger, LogWriter, get_logger from lisa.util.logger import Logger, LogWriter, get_logger
@ -23,11 +24,24 @@ class ExecutableResult:
stdout: str stdout: str
stderr: str stderr: str
exit_code: Optional[int] exit_code: Optional[int]
cmd: Union[str, List[str]]
elapsed: float elapsed: float
def __str__(self) -> str: def __str__(self) -> str:
return self.stdout return self.stdout
def assert_exit_code(
self, expected_exit_code: int = 0, message: str = ""
) -> AssertionBuilder:
message = ". ".join([message, f"get unexpected exit code on cmd {self.cmd}"])
return assert_that(self.exit_code, message).is_equal_to(expected_exit_code)
def assert_stderr(
self, expected_stderr: str = "", message: str = ""
) -> AssertionBuilder:
message = ". ".join([message, f"get unexpected stderr on cmd {self.cmd}"])
return assert_that(self.stderr, message).is_equal_to(expected_stderr)
# TODO: So much cleanup here. It was using duck typing. # TODO: So much cleanup here. It was using duck typing.
class Process: class Process:
@ -116,12 +130,14 @@ class Process:
store_pid=self._is_posix, store_pid=self._is_posix,
encoding="utf-8", encoding="utf-8",
) )
# save for logging.
self._cmd = split_command
self._running = True self._running = True
except (FileNotFoundError, NoSuchCommandError) as identifier: except (FileNotFoundError, NoSuchCommandError) as identifier:
# FileNotFoundError: not found command on Windows # FileNotFoundError: not found command on Windows
# NoSuchCommandError: not found command on remote Posix # NoSuchCommandError: not found command on remote Posix
self._result = ExecutableResult( self._result = ExecutableResult(
"", identifier.strerror, 1, self._timer.elapsed() "", identifier.strerror, 1, split_command, self._timer.elapsed()
) )
self._log.log(stderr_level, f"not found command: {identifier}") self._log.log(stderr_level, f"not found command: {identifier}")
@ -137,7 +153,6 @@ class Process:
self.kill() self.kill()
if self._result is None: if self._result is None:
# if not isinstance(self._process, ExecutableResult):
assert self._process assert self._process
process_result = self._process.wait_for_result() process_result = self._process.wait_for_result()
self._stdout_writer.close() self._stdout_writer.close()
@ -147,6 +162,7 @@ class Process:
process_result.output.strip(), process_result.output.strip(),
process_result.stderr_output.strip(), process_result.stderr_output.strip(),
process_result.return_code, process_result.return_code,
self._cmd,
self._timer.elapsed(), self._timer.elapsed(),
) )
# TODO: The spur library is not very good and leaves open # TODO: The spur library is not very good and leaves open

Просмотреть файл

@ -160,10 +160,7 @@ class TimeSync(TestSuite):
sudo=True, sudo=True,
shell=True, shell=True,
) )
assert_that(cmd_result.exit_code).described_as( cmd_result.assert_exit_code()
f"Fail to execute command "
f"[echo {clock_source_result.stdout} > {self.unbind_clocksource}]."
).is_equal_to(0)
clock_source_result_expected = _wait_file_changed( clock_source_result_expected = _wait_file_changed(
node, self.current_clocksource, available_clocksources_array node, self.current_clocksource, available_clocksources_array
@ -225,10 +222,7 @@ class TimeSync(TestSuite):
sudo=True, sudo=True,
shell=True, shell=True,
) )
assert_that(cmd_result.exit_code).described_as( cmd_result.assert_exit_code()
f"Fail to execute command "
f"[echo {clock_event_name} > {self.unbind_clockevent}]."
).is_equal_to(0)
clock_event_result_expected = _wait_file_changed( clock_event_result_expected = _wait_file_changed(
node, self.current_clockevent, "lapic" node, self.current_clockevent, "lapic"

Просмотреть файл

@ -8,7 +8,22 @@ from lisa import Environment, Node, TestCaseMetadata, TestSuite, TestSuiteMetada
from lisa.features import Nvme from lisa.features import Nvme
from lisa.sut_orchestrator.azure.platform_ import AzurePlatform from lisa.sut_orchestrator.azure.platform_ import AzurePlatform
from lisa.testsuite import simple_requirement from lisa.testsuite import simple_requirement
from lisa.tools import Lscpu from lisa.tools import Cat, Fdisk, Lscpu, Lspci, Mount, Nvmecli
from lisa.tools.fdisk import FileSystem
from lisa.util import SkippedException
def _format_mount_disk(
node: Node,
namespace: str,
file_system: FileSystem,
) -> None:
mount_point = namespace.rpartition("/")[-1]
fdisk = node.tools[Fdisk]
mount = node.tools[Mount]
mount.umount(namespace, mount_point)
fdisk.make_partition(namespace, file_system)
mount.mount(f"{namespace}p1", mount_point)
@TestSuiteMetadata( @TestSuiteMetadata(
@ -36,7 +51,7 @@ class nvme(TestSuite):
4. Azure platform only, nvme devices count should equal to 4. Azure platform only, nvme devices count should equal to
actual vCPU count / 8. actual vCPU count / 8.
""", """,
priority=0, priority=1,
requirement=simple_requirement( requirement=simple_requirement(
supported_features=[Nvme], supported_features=[Nvme],
), ),
@ -77,3 +92,282 @@ class nvme(TestSuite):
assert_that(nvme_namespace).described_as( assert_that(nvme_namespace).described_as(
"nvme devices count should be equal to [vCPU/8]." "nvme devices count should be equal to [vCPU/8]."
).is_length(expected_count) ).is_length(expected_count)
@TestCaseMetadata(
description="""
This test case will do following things for each NVMe device.
1. Get the number of errors from nvme-cli before operations.
2. Create a partition, filesystem and mount it.
3. Create a txt file on the partition, content is 'TestContent'.
4. Create a file 'data' on the partition, get the md5sum value.
5. Umount and remount the partition.
6. Get the txt file content, compare the value.
7. Compare the number of errors from nvme-cli after operations.
""",
priority=2,
requirement=simple_requirement(
supported_features=[Nvme],
),
)
def nvme_function_validation(self, node: Node) -> None:
nvme = node.features[Nvme]
nvme_namespaces = nvme.get_namespaces()
nvme_cli = node.tools[Nvmecli]
cat = node.tools[Cat]
mount = node.tools[Mount]
for namespace in nvme_namespaces:
# 1. Get the number of errors from nvme-cli before operations.
error_count_before_operations = nvme_cli.get_error_count(namespace)
# 2. Create a partition, filesystem and mount it.
_format_mount_disk(node, namespace, FileSystem.ext4)
# 3. Create a txt file on the partition, content is 'TestContent'.
mount_point = namespace.rpartition("/")[-1]
cmd_result = node.execute(
f"echo TestContent > {mount_point}/testfile.txt", shell=True, sudo=True
)
cmd_result.assert_exit_code(
message=f"{mount_point}/testfile.txt may not exist."
)
# 4. Create a file 'data' on the partition, get the md5sum value.
cmd_result = node.execute(
f"dd if=/dev/zero of={mount_point}/data bs=10M count=100",
shell=True,
sudo=True,
)
cmd_result.assert_exit_code(
message=f"{mount_point}/data is not created successfully, "
"please check the disk space."
)
initial_md5 = node.execute(
f"md5sum {mount_point}/data", shell=True, sudo=True
)
initial_md5.assert_exit_code(
message=f"{mount_point}/data not exist or md5sum command enounter"
" unexpected error."
)
# 5. Umount and remount the partition.
mount.umount(namespace, mount_point, erase=False)
mount.mount(f"{namespace}p1", mount_point)
# 6. Get the txt file content, compare the value.
file_content = cat.run(f"{mount_point}/testfile.txt", shell=True, sudo=True)
assert_that(
file_content.stdout,
f"content of {mount_point}/testfile.txt should keep consistent "
"after umount and re-mount.",
).is_equal_to("TestContent")
# 6. Get md5sum value of file 'data', compare with initial value.
final_md5 = node.execute(
f"md5sum {mount_point}/data", shell=True, sudo=True
)
assert_that(
initial_md5.stdout,
f"md5sum of {mount_point}/data should keep consistent "
"after umount and re-mount.",
).is_equal_to(final_md5.stdout)
# 7. Compare the number of errors from nvme-cli after operations.
error_count_after_operations = nvme_cli.get_error_count(namespace)
assert_that(
error_count_before_operations,
"error-log should not increase after operations.",
).is_equal_to(error_count_after_operations)
mount.umount(disk_name=namespace, point=mount_point)
@TestCaseMetadata(
description="""
This test case will
1. Create a partition, xfs filesystem and mount it.
2. Check how much the mountpoint is trimmed before operations.
3. Create a 300 gb file 'data' using dd command in the partition.
4. Check how much the mountpoint is trimmed after creating the file.
5. Delete the file 'data'.
6. Check how much the mountpoint is trimmed after deleting the file,
and compare the final fstrim status with initial fstrim status.
""",
priority=3,
requirement=simple_requirement(
supported_features=[Nvme],
),
)
def nvme_fstrim_validation(self, node: Node) -> None:
nvme = node.features[Nvme]
nvme_namespaces = nvme.get_namespaces()
mount = node.tools[Mount]
for namespace in nvme_namespaces:
mount_point = namespace.rpartition("/")[-1]
mount.umount(disk_name=namespace, point=mount_point)
# 1. Create a partition, xfs filesystem and mount it.
_format_mount_disk(node, namespace, FileSystem.xfs)
# 2. Check how much the mountpoint is trimmed before operations.
initial_fstrim = node.execute(
f"fstrim {mount_point} -v", shell=True, sudo=True
)
initial_fstrim.assert_exit_code(
message=f"{mount_point} not exist or fstrim command enounter "
"unexpected error."
)
# 3. Create a 300 gb file 'data' using dd command in the partition.
cmd_result = node.execute(
f"dd if=/dev/zero of={mount_point}/data bs=1G count=300",
shell=True,
sudo=True,
)
cmd_result.assert_exit_code(
message=f"{mount_point}/data is not created successfully, "
"please check the disk space."
)
# 4. Check how much the mountpoint is trimmed after creating the file.
intermediate_fstrim = node.execute(
f"fstrim {mount_point} -v", shell=True, sudo=True
)
intermediate_fstrim.assert_exit_code(
message=f"{mount_point} not exist or fstrim command enounter "
"unexpected error."
)
# 5. Delete the file 'data'.
node.execute(f"rm {mount_point}/data", shell=True, sudo=True)
# 6. Check how much the mountpoint is trimmed after deleting the file,
# and compare the final fstrim status with initial fstrim status.
final_fstrim = node.execute(
f"fstrim {mount_point} -v", shell=True, sudo=True
)
mount.umount(disk_name=namespace, point=mount_point)
assert_that(
final_fstrim.stdout,
"initial_fstrim should equal to final_fstrim after operations "
"after umount and re-mount.",
).is_equal_to(initial_fstrim.stdout)
@TestCaseMetadata(
description="""
This test case will
1. Create a partition, xfs filesystem and mount it.
2. Umount the mountpoint.
3. Run blkdiscard command on the partition.
4. Remount command should fail after run blkdiscard command.
""",
priority=3,
requirement=simple_requirement(
supported_features=[Nvme],
),
)
def nvme_blkdiscard_validation(self, node: Node) -> None:
os_version = node.os._get_os_version()
if "Ubuntu" == os_version.vendor and "14.04" == os_version.release:
raise SkippedException(
f"blkdiscard is not supported with distro {os_version.vendor} and "
f"version {os_version.release}"
)
nvme = node.features[Nvme]
nvme_namespaces = nvme.get_namespaces()
mount = node.tools[Mount]
for namespace in nvme_namespaces:
mount_point = namespace.rpartition("/")[-1]
mount.umount(disk_name=namespace, point=mount_point)
# 1. Create a partition, xfs filesystem and mount it.
_format_mount_disk(node, namespace, FileSystem.xfs)
# 2. Umount the mountpoint.
mount.umount(disk_name=namespace, point=mount_point, erase=False)
# 3. Run blkdiscard command on the partition.
blkdiscard = node.execute(
f"blkdiscard -v {namespace}p1", shell=True, sudo=True
)
if 0 != blkdiscard.exit_code:
blkdiscard = node.execute(
f"blkdiscard -f -v {namespace}p1", shell=True, sudo=True
)
blkdiscard.assert_exit_code(
message=f"{namespace}p1 not exist or blkdiscard command enounter "
"unexpected error."
)
# 4. Remount command should fail after run blkdiscard command.
mount_result = node.execute(
f"mount {namespace}p1 {mount_point}", shell=True, sudo=True
)
mount_result.assert_exit_code(expected_exit_code=32)
@TestCaseMetadata(
description="""
This test case will run commands 2-5, the commands are expected fail or not
based on the capabilities of the device.
1. Use `nvme id-ctrl device` command list the capabilities of the device.
1.1 When 'Format NVM Supported' shown up in output of 'nvme id-ctrl device',
then nvme disk can be format, otherwise, it can't be format.
1.2 When 'NS Management and Attachment Supported' shown up in output of
'nvme id-ctrl device', nvme namespace can be created, deleted and detached,
otherwise it can't be managed.
2. `nvme format namespace` - format a namespace.
3. `nvme create-ns namespace` - create a namespace.
4. `nvme delete-ns -n 1 namespace` - delete a namespace.
5. `nvme detach-ns -n 1 namespace` - detach a namespace.
""",
priority=3,
requirement=simple_requirement(
supported_features=[Nvme],
),
)
def nvme_manage_ns_validation(self, node: Node) -> None:
nvme = node.features[Nvme]
nvme_namespaces = nvme.get_namespaces()
nvme_devices = nvme.get_devices()
nvme_cli = node.tools[Nvmecli]
device_format_exit_code = 0
ns_management_exit_code = 0
# 1. Use `nvme id-ctrl device` command list the capabilities of the device.
# 1.1 When 'Format NVM Supported' shown up in output of 'nvme id-ctrl device',
# then nvme disk can be format, otherwise, it can't be format.
if not nvme_cli.support_device_format(nvme_devices[0]):
device_format_exit_code = 1
# 1.2 When 'NS Management and Attachment Supported' shown up in output of
# 'nvme id-ctrl device', nvme namespace can be created, deleted and detached,
# otherwise it can't be managed.
if not nvme_cli.support_ns_manage_attach(nvme_devices[0]):
# NVMe Status:INVALID_OPCODE(1)
ns_management_exit_code = 1
for namespace in nvme_namespaces:
# 2. `nvme format namespace` - format a namespace.
format_namespace = nvme_cli.format_namespace(namespace)
format_namespace.assert_exit_code(device_format_exit_code)
# 3. `nvme create-ns namespace` - create a namespace.
create_namespace = nvme_cli.create_namespace(namespace)
create_namespace.assert_exit_code(ns_management_exit_code)
# 4. `nvme delete-ns -n 1 namespace` - delete a namespace.
delete_namespace = nvme_cli.delete_namespace(namespace, 1)
delete_namespace.assert_exit_code(ns_management_exit_code)
# 5. `nvme detach-ns -n 1 namespace` - detach a namespace.
detach_namespace = nvme_cli.detach_namespace(namespace, 1)
detach_namespace.assert_exit_code(ns_management_exit_code)
@TestCaseMetadata(
description="""
This test case will
1. Disable NVME devices.
2. Enable NVME device.
""",
priority=2,
requirement=simple_requirement(
supported_features=[Nvme],
),
)
def nvme_rescind_validation(self, node: Node) -> None:
lspci = node.tools[Lspci]
# 1. Disable NVME devices.
lspci.disable_devices(device_type="NVME")
# 2. Enable NVME device.
lspci.enable_devices()