diff --git a/superbench/benchmarks/micro_benchmarks/ib_loopback_performance.py b/superbench/benchmarks/micro_benchmarks/ib_loopback_performance.py index 703e19a7..9cc3eba5 100644 --- a/superbench/benchmarks/micro_benchmarks/ib_loopback_performance.py +++ b/superbench/benchmarks/micro_benchmarks/ib_loopback_performance.py @@ -4,6 +4,7 @@ """Module of the IB loopback benchmarks.""" import os +import socket from pathlib import Path from superbench.common.utils import logger @@ -47,8 +48,14 @@ class IBLoopbackBenchmark(MicroBenchmarkWithInvoke): super().__init__(name, parameters) self._bin_name = 'run_perftest_loopback' + self.__sock_fds = [] self.__support_ib_commands = {'write': 'ib_write_bw', 'read': 'ib_read_bw', 'send': 'ib_send_bw'} + def __del__(self): + """Destructor.""" + for fd in self.__sock_fds: + fd.close() + def add_parser_arguments(self): """Add the specified arguments.""" super().add_parser_arguments() @@ -121,10 +128,7 @@ class IBLoopbackBenchmark(MicroBenchmarkWithInvoke): Return: True if _preprocess() succeed. """ - if not super()._preprocess(): - return False - - if not self.__get_arguments_from_env(): + if not super()._preprocess() or not self.__get_arguments_from_env(): return False # Format the arguments @@ -146,33 +150,39 @@ class IBLoopbackBenchmark(MicroBenchmarkWithInvoke): ) ) return False - else: - try: - command = os.path.join(self._args.bin_dir, self._bin_name) - numa_cores = get_numa_cores(self._args.numa) - if len(numa_cores) < 2: - self._result.set_return_code(ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE) - logger.error('Getting numa core devices failure - benchmark: {}.'.format(self._name)) - return False - if len(numa_cores) >= 4: - server_core = int(numa_cores[-1]) - client_core = int(numa_cores[-3]) - else: - server_core = int(numa_cores[-1]) - client_core = int(numa_cores[-2]) - command += ' ' + str(server_core) + ' ' + str(client_core) - command += ' ' + os.path.join(self._args.bin_dir, self.__support_ib_commands[ib_command]) - command += command_mode + ' -F' - command += ' --iters=' + str(self._args.iters) - command += ' -d ' + network.get_ib_devices()[self._args.ib_index].split(':')[0] - command += ' -p ' + str(network.get_free_port()) - command += ' -x ' + str(self._args.gid_index) - command += ' --report_gbits' - self._commands.append(command) - except BaseException as e: - self._result.set_return_code(ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE) - logger.error('Getting ib devices failure - benchmark: {}, message: {}.'.format(self._name, str(e))) - return False + + try: + self.__sock_fds.append(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) + # grep SO_REUSE /usr/include/asm-generic/socket.h + self.__sock_fds[-1].setsockopt(socket.SOL_SOCKET, getattr(socket, 'SO_REUSEADDR', 2), 1) + self.__sock_fds[-1].setsockopt(socket.SOL_SOCKET, getattr(socket, 'SO_REUSEPORT', 15), 1) + self.__sock_fds[-1].bind(('127.0.0.1', 0)) + except OSError as e: + self._result.set_return_code(ReturnCode.RUNTIME_EXCEPTION_ERROR) + logger.error('Error when binding port - benchmark: %s, message: %s.', self._name, e) + return False + try: + ib_devices = network.get_ib_devices() + except BaseException as e: + self._result.set_return_code(ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE) + logger.error('Getting ib devices failure - benchmark: {}, message: {}.'.format(self._name, str(e))) + return False + numa_cores = get_numa_cores(self._args.numa) + if not numa_cores or len(numa_cores) < 2: + self._result.set_return_code(ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE) + logger.error('Getting numa core devices failure - benchmark: {}.'.format(self._name)) + return False + command = os.path.join(self._args.bin_dir, self._bin_name) + command += ' ' + str(numa_cores[-1]) + ' ' + str(numa_cores[-3 + int((len(numa_cores) < 4))]) + command += ' ' + os.path.join(self._args.bin_dir, self.__support_ib_commands[ib_command]) + command += command_mode + ' -F' + command += ' --iters=' + str(self._args.iters) + command += ' -d ' + ib_devices[self._args.ib_index].split(':')[0] + command += ' -p ' + str(self.__sock_fds[-1].getsockname()[1]) + command += ' -x ' + str(self._args.gid_index) + command += ' --report_gbits' + self._commands.append(command) + return True def _process_raw_result(self, cmd_idx, raw_output): diff --git a/tests/benchmarks/micro_benchmarks/test_ib_loopback_performance.py b/tests/benchmarks/micro_benchmarks/test_ib_loopback_performance.py index 7c9ec698..3372adbb 100644 --- a/tests/benchmarks/micro_benchmarks/test_ib_loopback_performance.py +++ b/tests/benchmarks/micro_benchmarks/test_ib_loopback_performance.py @@ -37,10 +37,9 @@ class IBLoopbackBenchmarkTest(BenchmarkTestCase, unittest.TestCase): assert (isinstance(numa_cores[i], numbers.Number)) @decorator.load_data('tests/data/ib_loopback_all_sizes.log') - @mock.patch('superbench.common.utils.network.get_free_port') @mock.patch('superbench.benchmarks.micro_benchmarks.ib_loopback_performance.get_numa_cores') @mock.patch('superbench.common.utils.network.get_ib_devices') - def test_ib_loopback_all_sizes(self, raw_output, mock_ib_devices, mock_numa_cores, mock_port): + def test_ib_loopback_all_sizes(self, raw_output, mock_ib_devices, mock_numa_cores): """Test ib-loopback benchmark for all sizes.""" # Test without ib devices # Check registry. @@ -69,15 +68,15 @@ class IBLoopbackBenchmarkTest(BenchmarkTestCase, unittest.TestCase): mock_ib_devices.return_value = ['mlx5_0'] mock_numa_cores.return_value = [0, 1, 2, 3] - mock_port.return_value = 10000 os.environ['PROC_RANK'] = '0' os.environ['IB_DEVICES'] = '0,2,4,6' os.environ['NUMA_NODES'] = '1,0,3,2' ret = benchmark._preprocess() assert (ret) + port = benchmark._IBLoopbackBenchmark__sock_fds[-1].getsockname()[1] expect_command = 'run_perftest_loopback 3 1 ' + benchmark._args.bin_dir + \ - '/ib_write_bw -a -F --iters=2000 -d mlx5_0 -p 10000 -x 0 --report_gbits' + f'/ib_write_bw -a -F --iters=2000 -d mlx5_0 -p {port} -x 0 --report_gbits' command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1] assert (command == expect_command) @@ -110,10 +109,9 @@ class IBLoopbackBenchmarkTest(BenchmarkTestCase, unittest.TestCase): assert (benchmark._args.commands == ['write']) @decorator.load_data('tests/data/ib_loopback_8M_size.log') - @mock.patch('superbench.common.utils.network.get_free_port') @mock.patch('superbench.benchmarks.micro_benchmarks.ib_loopback_performance.get_numa_cores') @mock.patch('superbench.common.utils.network.get_ib_devices') - def test_ib_loopback_8M_size(self, raw_output, mock_ib_devices, mock_numa_cores, mock_port): + def test_ib_loopback_8M_size(self, raw_output, mock_ib_devices, mock_numa_cores): """Test ib-loopback benchmark for 8M size.""" # Test without ib devices # Check registry. @@ -142,12 +140,12 @@ class IBLoopbackBenchmarkTest(BenchmarkTestCase, unittest.TestCase): mock_ib_devices.return_value = ['mlx5_0'] mock_numa_cores.return_value = [0, 1, 2, 3] - mock_port.return_value = 10000 ret = benchmark._preprocess() assert (ret) + port = benchmark._IBLoopbackBenchmark__sock_fds[-1].getsockname()[1] expect_command = 'run_perftest_loopback 3 1 ' + benchmark._args.bin_dir + \ - '/ib_write_bw -s 8388608 -F --iters=2000 -d mlx5_0 -p 10000 -x 0 --report_gbits' + f'/ib_write_bw -s 8388608 -F --iters=2000 -d mlx5_0 -p {port} -x 0 --report_gbits' command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1] assert (command == expect_command)