diff --git a/examples/benchmarks/ib_loopback_performance.py b/examples/benchmarks/ib_loopback_performance.py new file mode 100644 index 00000000..0d3b8433 --- /dev/null +++ b/examples/benchmarks/ib_loopback_performance.py @@ -0,0 +1,22 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +"""Micro benchmark example for IB loopback performance. + +Commands to run: + python examples/benchmarks/ib_loopback_performance.py +""" + +from superbench.benchmarks import BenchmarkRegistry +from superbench.common.utils import logger + +if __name__ == '__main__': + context = BenchmarkRegistry.create_benchmark_context('ib-loopback') + + benchmark = BenchmarkRegistry.launch_benchmark(context) + if benchmark: + logger.info( + 'benchmark: {}, return code: {}, result: {}'.format( + benchmark.name, benchmark.return_code, benchmark.result + ) + ) diff --git a/superbench/benchmarks/micro_benchmarks/__init__.py b/superbench/benchmarks/micro_benchmarks/__init__.py index 1f54e265..901a52d2 100644 --- a/superbench/benchmarks/micro_benchmarks/__init__.py +++ b/superbench/benchmarks/micro_benchmarks/__init__.py @@ -12,8 +12,9 @@ from superbench.benchmarks.micro_benchmarks.cudnn_function import CudnnBenchmark from superbench.benchmarks.micro_benchmarks.gemm_flops_performance import GemmFlopsCuda from superbench.benchmarks.micro_benchmarks.cuda_memory_bw_performance import CudaMemBwBenchmark from superbench.benchmarks.micro_benchmarks.disk_performance import DiskBenchmark +from superbench.benchmarks.micro_benchmarks.ib_loopback_performance import IBLoopbackBenchmark __all__ = [ 'MicroBenchmark', 'MicroBenchmarkWithInvoke', 'ShardingMatmul', 'ComputationCommunicationOverlap', 'KernelLaunch', - 'CublasBenchmark', 'CudnnBenchmark', 'GemmFlopsCuda', 'CudaMemBwBenchmark', 'DiskBenchmark' + 'CublasBenchmark', 'CudnnBenchmark', 'GemmFlopsCuda', 'CudaMemBwBenchmark', 'DiskBenchmark', 'IBLoopbackBenchmark' ] diff --git 
a/superbench/benchmarks/micro_benchmarks/ib_loopback_performance.py b/superbench/benchmarks/micro_benchmarks/ib_loopback_performance.py new file mode 100644 index 00000000..994ebd79 --- /dev/null +++ b/superbench/benchmarks/micro_benchmarks/ib_loopback_performance.py @@ -0,0 +1,224 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +"""Module of the IB loopback benchmarks.""" + +import os +from pathlib import Path + +from superbench.common.utils import logger +from superbench.common.utils import network +from superbench.benchmarks import BenchmarkRegistry, ReturnCode +from superbench.benchmarks.micro_benchmarks import MicroBenchmarkWithInvoke + + +def get_numa_cores(numa_index): + """Get the available cores from different physical cpu core of NUMA. + + Args: + numa_index (int): the index of numa node. + + Return: + list: The available cores from different physical cpu core of NUMA. + None if no available cores or numa index. + """ + try: + with Path(f'/sys/devices/system/node/node{numa_index}/cpulist').open('r') as f: + cores = [] + core_ranges = f.read().strip().split(',') + for core_range in core_ranges: + start, end = core_range.split('-') + for core in range(int(start), int(end) + 1): + cores.append(core) + return cores + except IOError: + return None + + +class IBLoopbackBenchmark(MicroBenchmarkWithInvoke): + """The IB loopback performance benchmark class.""" + def __init__(self, name, parameters=''): + """Constructor. + + Args: + name (str): benchmark name. + parameters (str): benchmark parameters. 
+ """ + super().__init__(name, parameters) + + self._bin_name = 'run_perftest_loopback' + self.__support_ib_commands = {'write': 'ib_write_bw', 'read': 'ib_read_bw', 'send': 'ib_send_bw'} + + def add_parser_arguments(self): + """Add the specified arguments.""" + super().add_parser_arguments() + + self._parser.add_argument( + '--ib_index', + type=int, + default=0, + required=False, + help='The index of ib device.', + ) + self._parser.add_argument( + '--iters', + type=int, + default=20000, + required=False, + help='The iterations of running ib command', + ) + self._parser.add_argument( + '--msg_size', + type=int, + default=None, + required=False, + help='The message size of running ib command, e.g., 8388608.', + ) + self._parser.add_argument( + '--commands', + type=str, + nargs='+', + default=['write'], + help='The ib command used to run, e.g., {}.'.format(' '.join(list(self.__support_ib_commands.keys()))), + ) + self._parser.add_argument( + '--numa', + type=int, + default=0, + required=False, + help='The index of numa node.', + ) + self._parser.add_argument( + '--gid_index', + type=int, + default=0, + required=False, + help='Test uses GID with GID index taken from command.', + ) + + def __get_arguments_from_env(self): + """Read environment variables from runner used for parallel and fill in ib_index and numa_node_index. 
+ + Get 'PROC_RANK'(rank of current process) 'IB_DEVICES' 'NUMA_NODES' environment variables + Get ib_index and numa_node_index according to 'NUMA_NODES'['PROC_RANK'] and 'IB_DEVICES'['PROC_RANK'] + Note: The config from env variables will overwrite the configs defined in the command line + """ + try: + if os.getenv('PROC_RANK'): + rank = int(os.getenv('PROC_RANK')) + if os.getenv('IB_DEVICES'): + self._args.ib_index = int(os.getenv('IB_DEVICES').split(',')[rank]) + if os.getenv('NUMA_NODES'): + self._args.numa = int(os.getenv('NUMA_NODES').split(',')[rank]) + return True + except BaseException: + logger.error('The proc_rank is out of index of devices - benchmark: {}.'.format(self._name)) + return False + + def _preprocess(self): + """Preprocess/preparation operations before the benchmarking. + + Return: + True if _preprocess() succeed. + """ + if not super()._preprocess(): + return False + + if not self.__get_arguments_from_env(): + return False + + # Format the arguments + self._args.commands = [command.lower() for command in self._args.commands] + + # Check whether arguments are valid + command_mode = '' + if self._args.msg_size is None: + command_mode = ' -a' + else: + command_mode = ' -s ' + str(self._args.msg_size) + + for ib_command in self._args.commands: + if ib_command not in self.__support_ib_commands: + self._result.set_return_code(ReturnCode.INVALID_ARGUMENT) + logger.error( + 'Unsupported ib command - benchmark: {}, command: {}, expected: {}.'.format( + self._name, ib_command, ' '.join(list(self.__support_ib_commands.keys())) + ) + ) + return False + else: + try: + command = os.path.join(self._args.bin_dir, self._bin_name) + numa_cores = get_numa_cores(self._args.numa) + if len(numa_cores) < 2: + self._result.set_return_code(ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE) + logger.error('Getting numa core devices failure - benchmark: {}.'.format(self._name)) + return False + if len(numa_cores) >= 4: + server_core = int(numa_cores[-1]) + client_core 
= int(numa_cores[-3]) + else: + server_core = int(numa_cores[-1]) + client_core = int(numa_cores[-2]) + command += ' ' + str(server_core) + ' ' + str(client_core) + command += ' ' + self.__support_ib_commands[ib_command] + command += command_mode + ' -F' + command += ' --iters=' + str(self._args.iters) + command += ' -d ' + network.get_ib_devices()[self._args.ib_index] + command += ' -p ' + str(network.get_free_port()) + command += ' -x ' + str(self._args.gid_index) + self._commands.append(command) + except BaseException as e: + self._result.set_return_code(ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE) + logger.error('Getting ib devices failure - benchmark: {}, message: {}.'.format(self._name, str(e))) + return False + return True + + def _process_raw_result(self, cmd_idx, raw_output): + """Function to parse raw results and save the summarized results. + + self._result.add_raw_data() and self._result.add_result() need to be called to save the results. + + Args: + cmd_idx (int): the index of command corresponding with the raw_output. + raw_output (str): raw output string of the micro-benchmark. + + Return: + True if the raw output string is valid and result can be extracted. 
+ """ + self._result.add_raw_data( + 'raw_output_' + self._args.commands[cmd_idx] + '_IB' + str(self._args.ib_index), raw_output + ) + + valid = False + content = raw_output.splitlines() + + metric_set = set() + for line in content: + try: + values = list(filter(None, line.split(' '))) + if len(values) != 5: + continue + # Extract value from the line + size = int(values[0]) + avg_bw = float(values[-2]) + metric = 'IB_{}_{}_Avg_{}'.format(self._args.commands[cmd_idx], size, str(self._args.ib_index)) + # Filter useless value in client output + if metric not in metric_set: + metric_set.add(metric) + self._result.add_result(metric, avg_bw) + valid = True + except BaseException: + pass + if valid is False: + logger.error( + 'The result format is invalid - round: {}, benchmark: {}, raw output: {}.'.format( + self._curr_run_index, self._name, raw_output + ) + ) + return False + + return True + + +BenchmarkRegistry.register_benchmark('ib-loopback', IBLoopbackBenchmark) diff --git a/superbench/benchmarks/return_code.py b/superbench/benchmarks/return_code.py index 0991ddb2..da207d01 100644 --- a/superbench/benchmarks/return_code.py +++ b/superbench/benchmarks/return_code.py @@ -28,3 +28,4 @@ class ReturnCode(Enum): MICROBENCHMARK_EXECUTION_FAILURE = 32 MICROBENCHMARK_RESULT_PARSING_FAILURE = 33 MICROBENCHMARK_UNSUPPORTED_ARCHITECTURE = 34 + MICROBENCHMARK_DEVICE_GETTING_FAILURE = 35 diff --git a/superbench/config/default.yaml b/superbench/config/default.yaml index dc5404e8..a5328b0b 100644 --- a/superbench/config/default.yaml +++ b/superbench/config/default.yaml @@ -28,6 +28,17 @@ superbench: model_action: - train benchmarks: + ib-loopback: + enable: true + modes: + - name: local + proc_num: 4 + prefix: PROC_RANK={proc_rank} IB_DEVICES=0,2,4,6 NUMA_NODES=1,0,3,2 + parallel: yes + - name: local + proc_num: 4 + prefix: PROC_RANK={proc_rank} IB_DEVICES=1,3,5,7 NUMA_NODES=1,0,3,2 + parallel: yes disk-benchmark: enable: false modes: diff --git 
a/tests/benchmarks/micro_benchmarks/test_ib_loopback_performance.py b/tests/benchmarks/micro_benchmarks/test_ib_loopback_performance.py new file mode 100644 index 00000000..9a03f43d --- /dev/null +++ b/tests/benchmarks/micro_benchmarks/test_ib_loopback_performance.py @@ -0,0 +1,293 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for ib-loopback benchmark.""" + +import os +import numbers +import unittest +from pathlib import Path +from unittest import mock + +from superbench.benchmarks import BenchmarkRegistry, Platform, BenchmarkType, ReturnCode +from superbench.common.utils import network +from superbench.benchmarks.micro_benchmarks import ib_loopback_performance + + +class IBLoopbackBenchmarkTest(unittest.TestCase): + """Tests for IBLoopbackBenchmark benchmark.""" + def setUp(self): + """Method called to prepare the test fixture.""" + if (len(network.get_ib_devices()) < 1): + # Create fake binary file just for testing. + os.environ['SB_MICRO_PATH'] = '/tmp/superbench' + binary_path = Path(os.getenv('SB_MICRO_PATH'), 'bin') + binary_path.mkdir(parents=True, exist_ok=True) + self.__binary_file = Path(binary_path, 'run_perftest_loopback') + self.__binary_file.touch(mode=0o755, exist_ok=True) + + def tearDown(self): + """Method called after the test method has been called and the result recorded.""" + if (len(network.get_ib_devices()) < 1): + self.__binary_file.unlink() + + def test_ib_loopback_util(self): + """Test util functions 'get_numa_cores' and 'get_free_port' used in ib-loopback benchmark.""" + port = network.get_free_port() + assert (isinstance(port, numbers.Number)) + numa_cores = ib_loopback_performance.get_numa_cores(0) + assert (len(numa_cores) >= 2) + for i in range(len(numa_cores)): + assert (isinstance(numa_cores[i], numbers.Number)) + + @mock.patch('superbench.common.utils.network.get_free_port') + @mock.patch('superbench.benchmarks.micro_benchmarks.ib_loopback_performance.get_numa_cores') + 
@mock.patch('superbench.common.utils.network.get_ib_devices') + def test_ib_loopback_all_sizes(self, mock_ib_devices, mock_numa_cores, mock_port): + """Test ib-loopback benchmark for all sizes.""" + raw_output = """ +************************************ +* Waiting for client to connect... * +************************************ +--------------------------------------------------------------------------------------- + RDMA_Write BW Test +Dual-port : OFF Device : ibP257p0s0 +Number of qps : 1 Transport type : IB +Connection type : RC Using SRQ : OFF +PCIe relax order: ON +--------------------------------------------------------------------------------------- + RDMA_Write BW Test +Dual-port : OFF Device : ibP257p0s0 +Number of qps : 1 Transport type : IB +Connection type : RC Using SRQ : OFF +PCIe relax order: ON +ibv_wr* API : ON +TX depth : 128 +CQ Moderation : 100 +Mtu : 4096[B] +Link type : IB +Max inline data : 0[B] +rdma_cm QPs : OFF +Data ex. method : Ethernet +--------------------------------------------------------------------------------------- +ibv_wr* API : ON +CQ Moderation : 100 +Mtu : 4096[B] +Link type : IB +Max inline data : 0[B] +rdma_cm QPs : OFF +Data ex. 
method : Ethernet +--------------------------------------------------------------------------------------- +local address: LID 0xd06 QPN 0x092f PSN 0x3ff1bc RKey 0x080329 VAddr 0x007fc97ff50000 +local address: LID 0xd06 QPN 0x092e PSN 0x3eb82d RKey 0x080228 VAddr 0x007f19adcbf000 +remote address: LID 0xd06 QPN 0x092e PSN 0x3eb82d RKey 0x080228 VAddr 0x007f19adcbf000 +remote address: LID 0xd06 QPN 0x092f PSN 0x3ff1bc RKey 0x080329 VAddr 0x007fc97ff50000 +--------------------------------------------------------------------------------------- +--------------------------------------------------------------------------------------- +#bytes #iterations BW peak[MB/sec] BW average[MB/sec] MsgRate[Mpps] +#bytes #iterations BW peak[MB/sec] BW average[MB/sec] MsgRate[Mpps] +2 2000 5.32 5.30 2.778732 +4 2000 10.65 10.64 2.788833 +8 2000 21.30 21.27 2.787609 +16 2000 42.60 42.55 2.788268 +32 2000 84.90 82.82 2.713896 +64 2000 173.55 171.66 2.812504 +128 2000 362.27 353.83 2.898535 +256 2000 687.82 679.37 2.782698 +512 2000 1337.12 1311.59 2.686135 +1024 2000 2674.25 2649.39 2.712980 +2048 2000 5248.56 5118.18 2.620509 +4096 2000 10034.02 9948.41 2.546793 +8192 2000 18620.51 12782.56 1.636168 +16384 2000 23115.27 16782.50 1.074080 +32768 2000 22927.94 18586.03 0.594753 +65536 2000 23330.56 21167.79 0.338685 +131072 2000 22750.35 21443.14 0.171545 +262144 2000 22673.63 22411.35 0.089645 +524288 2000 22679.02 22678.86 0.045358 +1048576 2000 22817.06 22816.86 0.022817 +2097152 2000 22919.37 22919.27 0.011460 +4194304 2000 23277.93 23277.91 0.005819 +8388608 2000 23240.68 23240.68 0.002905 +--------------------------------------------------------------------------------------- +8388608 2000 23240.68 23240.68 0.002905 +--------------------------------------------------------------------------------------- + """ + # Test without ib devices + # Check registry. 
+ benchmark_name = 'ib-loopback' + (benchmark_class, + predefine_params) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CPU) + assert (benchmark_class) + + # Check preprocess + # Negative case + parameters = '--ib_index 0 --numa 0 --iters 2000' + benchmark = benchmark_class(benchmark_name, parameters=parameters) + mock_ib_devices.return_value = None + ret = benchmark._preprocess() + assert (ret is False) + assert (benchmark.return_code == ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE) + parameters = '--ib_index 0 --numa 0 --iters 2000' + benchmark = benchmark_class(benchmark_name, parameters=parameters) + mock_numa_cores.return_value = None + ret = benchmark._preprocess() + assert (ret is False) + assert (benchmark.return_code == ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE) + # Positive case + parameters = '--ib_index 0 --numa 0 --iters 2000' + benchmark = benchmark_class(benchmark_name, parameters=parameters) + + mock_ib_devices.return_value = ['mlx5_0'] + mock_numa_cores.return_value = [0, 1, 2, 3] + mock_port.return_value = 10000 + os.environ['PROC_RANK'] = '0' + os.environ['IB_DEVICES'] = '0,2,4,6' + os.environ['NUMA_NODES'] = '1,0,3,2' + ret = benchmark._preprocess() + assert (ret) + + expect_command = 'run_perftest_loopback 3 1 ib_write_bw -a -F --iters=2000 -d mlx5_0 -p 10000 -x 0' + command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1] + assert (command == expect_command) + + assert (benchmark._process_raw_result(0, raw_output)) + + # Check function process_raw_data. + # Positive case - valid raw output. 
+ metric_list = [] + for ib_command in benchmark._args.commands: + for size in ['8388608', '4194304', '1024', '2']: + metric = 'IB_{}_{}_Avg_{}'.format(ib_command, size, str(benchmark._args.ib_index)) + metric_list.append(metric) + for metric in metric_list: + assert (metric in benchmark.result) + assert (len(benchmark.result[metric]) == 1) + assert (isinstance(benchmark.result[metric][0], numbers.Number)) + + # Negative case - Add invalid raw output. + assert (benchmark._process_raw_result(0, 'Invalid raw output') is False) + + # Check basic information. + assert (benchmark.name == 'ib-loopback') + assert (benchmark.type == BenchmarkType.MICRO) + assert (benchmark._bin_name == 'run_perftest_loopback') + + # Check parameters specified in BenchmarkContext. + assert (benchmark._args.ib_index == 0) + assert (benchmark._args.numa == 1) + assert (benchmark._args.iters == 2000) + assert (benchmark._args.commands == ['write']) + + @mock.patch('superbench.common.utils.network.get_free_port') + @mock.patch('superbench.benchmarks.micro_benchmarks.ib_loopback_performance.get_numa_cores') + @mock.patch('superbench.common.utils.network.get_ib_devices') + def test_ib_loopback_8M_size(self, mock_ib_devices, mock_numa_cores, mock_port): + """Test ib-loopback benchmark for 8M size.""" + raw_output = """ + RDMA_Write BW Test + Dual-port : OFF Device : ibP257p0s0 + Number of qps : 1 Transport type : IB + Connection type : RC Using SRQ : OFF + PCIe relax order: ON + TX depth : 128 + CQ Moderation : 1 + Mtu : 4096[B] + Link type : IB + Max inline data : 0[B] + rdma_cm QPs : OFF + Data ex. 
method : Ethernet +--------------------------------------------------------------------------------------- + local address: LID 0xd06 QPN 0x095f PSN 0x3c9e82 RKey 0x080359 VAddr 0x007f9fc479c000 + remote address: LID 0xd06 QPN 0x095e PSN 0xbd024b RKey 0x080258 VAddr 0x007fe62504b000 +--------------------------------------------------------------------------------------- + #bytes #iterations BW peak[MB/sec] BW average[MB/sec] MsgRate[Mpps] + 8388608 20000 24056.74 24056.72 0.003007 +************************************ +* Waiting for client to connect... * +************************************ +--------------------------------------------------------------------------------------- + RDMA_Write BW Test + Dual-port : OFF Device : ibP257p0s0 + Number of qps : 1 Transport type : IB + Connection type : RC Using SRQ : OFF + PCIe relax order: ON + CQ Moderation : 1 + Mtu : 4096[B] + Link type : IB + Max inline data : 0[B] + rdma_cm QPs : OFF + Data ex. method : Ethernet +--------------------------------------------------------------------------------------- + local address: LID 0xd06 QPN 0x095e PSN 0xbd024b RKey 0x080258 VAddr 0x007fe62504b000 + remote address: LID 0xd06 QPN 0x095f PSN 0x3c9e82 RKey 0x080359 VAddr 0x007f9fc479c000 +--------------------------------------------------------------------------------------- + #bytes #iterations BW peak[MB/sec] BW average[MB/sec] MsgRate[Mpps] + 8388608 20000 24056.74 24056.72 0.003007 +--------------------------------------------------------------------------------------- + +--------------------------------------------------------------------------------------- +--------------------------------------------------------------------------------------- +""" + # Test without ib devices + # Check registry. 
+ benchmark_name = 'ib-loopback' + (benchmark_class, + predefine_params) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CPU) + assert (benchmark_class) + + # Check preprocess + # Negative case + parameters = '--ib_index 0 --numa 0 --iters 2000' + benchmark = benchmark_class(benchmark_name, parameters=parameters) + mock_ib_devices.return_value = None + ret = benchmark._preprocess() + assert (ret is False) + assert (benchmark.return_code == ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE) + parameters = '--ib_index 0 --numa 0 --iters 2000' + benchmark = benchmark_class(benchmark_name, parameters=parameters) + mock_numa_cores.return_value = None + ret = benchmark._preprocess() + assert (ret is False) + assert (benchmark.return_code == ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE) + # Positive case + parameters = '--ib_index 0 --numa 0 --iters 2000 --msg_size 8388608' + benchmark = benchmark_class(benchmark_name, parameters=parameters) + + mock_ib_devices.return_value = ['mlx5_0'] + mock_numa_cores.return_value = [0, 1, 2, 3] + mock_port.return_value = 10000 + ret = benchmark._preprocess() + assert (ret) + + expect_command = 'run_perftest_loopback 3 1 ib_write_bw -s 8388608 -F --iters=2000 -d mlx5_0 -p 10000 -x 0' + command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1] + assert (command == expect_command) + + assert (benchmark._process_raw_result(0, raw_output)) + + # Check function process_raw_data. + # Positive case - valid raw output. + metric_list = [] + for ib_command in benchmark._args.commands: + metric = 'IB_{}_8388608_Avg_{}'.format(ib_command, str(benchmark._args.ib_index)) + metric_list.append(metric) + for metric in metric_list: + assert (metric in benchmark.result) + assert (len(benchmark.result[metric]) == 1) + assert (isinstance(benchmark.result[metric][0], numbers.Number)) + + # Negative case - Add invalid raw output. 
+ assert (benchmark._process_raw_result(0, 'Invalid raw output') is False) + + # Check basic information. + assert (benchmark.name == 'ib-loopback') + assert (benchmark.type == BenchmarkType.MICRO) + assert (benchmark._bin_name == 'run_perftest_loopback') + + # Check parameters specified in BenchmarkContext. + assert (benchmark._args.ib_index == 0) + assert (benchmark._args.numa == 0) + assert (benchmark._args.iters == 2000) + assert (benchmark._args.msg_size == 8388608) + assert (benchmark._args.commands == ['write'])