Benchmarks: Add Benchmark - Add ib traffic validation distributed benchmark (#215)
**Description**

Add IB traffic validation distributed benchmark.

**Major Revision**

- Add IB traffic validation distributed benchmark, example, and test.
Parent: f15fdf7295
Commit: 54919424c3
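For context, a minimal sketch of launching the new benchmark with explicit parameters. The flag names come from the argument parser added in this commit; the `parameters=` keyword of `create_benchmark_context` is assumed to behave the same way as in the bundled example and the tests below.

```python
# Hypothetical illustration; run under mpirun as shown in the example's docstring.
from superbench.benchmarks import BenchmarkRegistry
from superbench.common.utils import logger

if __name__ == '__main__':
    context = BenchmarkRegistry.create_benchmark_context(
        'ib-traffic',
        parameters='--ib_index 0 --iters 2000 --msg_size 8388608 --pattern one-to-many --commands ib_write_bw'
    )
    benchmark = BenchmarkRegistry.launch_benchmark(context)
    if benchmark:
        logger.info(
            'benchmark: {}, return code: {}, result: {}'.format(
                benchmark.name, benchmark.return_code, benchmark.result
            )
        )
```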
examples/benchmarks/ib_traffic_performance.py
@@ -0,0 +1,24 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

"""Micro benchmark example for IB validation performance between nodes.

Commands to run:
  mpirun -np 2 -H node0:1,node1:1 -mca pml ob1 --mca btl ^openib \
    -mca btl_tcp_if_exclude lo,docker0 -mca coll_hcoll_enable 0 \
    -x LD_LIBRARY_PATH -x PATH python examples/benchmarks/ib_traffic_performance.py
"""

from superbench.benchmarks import BenchmarkRegistry
from superbench.common.utils import logger

if __name__ == '__main__':
    context = BenchmarkRegistry.create_benchmark_context('ib-traffic')

    benchmark = BenchmarkRegistry.launch_benchmark(context)
    if benchmark:
        logger.info(
            'benchmark: {}, return code: {}, result: {}'.format(
                benchmark.name, benchmark.return_code, benchmark.result
            )
        )
superbench/benchmarks/micro_benchmarks/__init__.py
@@ -18,6 +18,7 @@ from superbench.benchmarks.micro_benchmarks.ib_loopback_performance import IBLoopbackBenchmark
from superbench.benchmarks.micro_benchmarks.cuda_nccl_bw_performance import CudaNcclBwBenchmark
from superbench.benchmarks.micro_benchmarks.rocm_memory_bw_performance import RocmMemBwBenchmark
from superbench.benchmarks.micro_benchmarks.rocm_gemm_flops_performance import RocmGemmFlopsBenchmark
+from superbench.benchmarks.micro_benchmarks.ib_validation_performance import IBBenchmark
from superbench.benchmarks.micro_benchmarks.gpu_copy_bw_performance import GpuCopyBwBenchmark
from superbench.benchmarks.micro_benchmarks.tcp_connectivity import TCPConnectivityBenchmark
from superbench.benchmarks.micro_benchmarks.gpcnet_performance import GPCNetBenchmark

@@ -26,5 +27,5 @@ __all__ = [
    'MicroBenchmark', 'MicroBenchmarkWithInvoke', 'ShardingMatmul', 'ComputationCommunicationOverlap', 'KernelLaunch',
    'CublasBenchmark', 'CudnnBenchmark', 'GemmFlopsBenchmark', 'CudaGemmFlopsBenchmark', 'MemBwBenchmark',
    'CudaMemBwBenchmark', 'DiskBenchmark', 'IBLoopbackBenchmark', 'CudaNcclBwBenchmark', 'RocmMemBwBenchmark',
-    'RocmGemmFlopsBenchmark', 'GpuCopyBwBenchmark', 'TCPConnectivityBenchmark', 'GPCNetBenchmark'
+    'RocmGemmFlopsBenchmark', 'IBBenchmark', 'GpuCopyBwBenchmark', 'TCPConnectivityBenchmark', 'GPCNetBenchmark'
]
superbench/benchmarks/micro_benchmarks/ib_validation_performance.py
@@ -0,0 +1,379 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

"""Module of the IB performance benchmarks."""

import os

from superbench.common.utils import logger
from superbench.common.utils import network
from superbench.benchmarks import BenchmarkRegistry, ReturnCode
from superbench.common.devices import GPU
from superbench.benchmarks.micro_benchmarks import MicroBenchmarkWithInvoke


class IBBenchmark(MicroBenchmarkWithInvoke):
    """The IB validation performance benchmark class."""
    def __init__(self, name, parameters=''):
        """Constructor.

        Args:
            name (str): benchmark name.
            parameters (str): benchmark parameters.
        """
        super().__init__(name, parameters)

        self._bin_name = 'ib_validation'
        self.__support_ib_commands = [
            'ib_write_bw', 'ib_read_bw', 'ib_send_bw', 'ib_write_lat', 'ib_read_lat', 'ib_send_lat'
        ]
        self.__patterns = ['one-to-one', 'one-to-many', 'many-to-one']
        self.__config_path = os.getcwd() + '/config.txt'
        self.__config = []

    def add_parser_arguments(self):
        """Add the specified arguments."""
        super().add_parser_arguments()

        self._parser.add_argument(
            '--ib_index',
            type=int,
            default=0,
            required=False,
            help='The index of ib device.',
        )
        self._parser.add_argument(
            '--iters',
            type=int,
            default=5000,
            required=False,
            help='The iterations of running ib command.',
        )
        self._parser.add_argument(
            '--msg_size',
            type=int,
            default=None,
            required=False,
            help='The message size of running ib command, e.g., 8388608.',
        )
        self._parser.add_argument(
            '--commands',
            type=str,
            nargs='+',
            default=['ib_write_bw'],
            help='The ib command used to run, e.g., {}.'.format(' '.join(self.__support_ib_commands)),
        )
        self._parser.add_argument(
            '--pattern',
            type=str,
            default='one-to-one',
            required=False,
            help='Test IB traffic pattern type, e.g., {}.'.format(' '.join(self.__patterns)),
        )
        self._parser.add_argument(
            '--config',
            type=str,
            default=None,
            required=False,
            help='The path of config file on the target machines.',
        )
        self._parser.add_argument(
            '--bidirectional', action='store_true', default=False, help='Measure bidirectional bandwidth.'
        )
        self._parser.add_argument(
            '--gpu_index', type=int, default=None, required=False, help='Use GPUDirect with the specified gpu index.'
        )
        self._parser.add_argument(
            '--hostfile',
            type=str,
            default='/root/hostfile',
            required=False,
            help='The path of hostfile on the target machines.',
        )

    def __one_to_many(self, n):
        """Generate one-to-many pattern config.

        There are a total of n rounds.
        In each round, the i-th participant is paired as a client with the remaining n-1 servers.

        Args:
            n (int): the number of participants.

        Returns:
            list: the generated config list, each item in the list is a str like "0,1;2,3".
        """
        config = []
        for client in range(n):
            row = []
            for server in range(n):
                if server != client:
                    pair = '{},{}'.format(server, client)
                    row.append(pair)
            row = ';'.join(row)
            config.append(row)
        return config

    def __many_to_one(self, n):
        """Generate many-to-one pattern config.

        There are a total of n rounds.
        In each round, the i-th participant is paired as a server with the remaining n-1 clients.

        Args:
            n (int): the number of participants.

        Returns:
            list: the generated config list, each item in the list is a str like "0,1;2,3".
        """
        config = []
        for server in range(n):
            row = []
            for client in range(n):
                if server != client:
                    pair = '{},{}'.format(server, client)
                    row.append(pair)
            row = ';'.join(row)
            config.append(row)
        return config

    def __fully_one_to_one(self, n):
        """Generate one-to-one pattern config.

        One-to-one means that each participant plays every other participant once.
        The algorithm refers to the circle method of the Round-robin tournament in
        https://en.wikipedia.org/wiki/Round-robin_tournament.
        If n is even, there are a total of n-1 rounds, with n/2 pairs of 2 unique participants in each round.
        If n is odd, there will be n rounds, each with (n-1)/2 pairs, and one participant sitting out in that round.
        In each round, pair up two by two from the beginning to the middle as (begin, end), (begin+1, end-1), ...
        Then, all the participants except the beginning shift left one position, and repeat the previous step.

        Args:
            n (int): the number of participants.

        Returns:
            list: the generated config list, each item in the list is a str like "0,1;2,3".
        """
        config = []
        candidates = list(range(n))
        # Add a fake participant if n is odd
        if n % 2 == 1:
            candidates.append(-1)
        count = len(candidates)
        non_moving = [candidates[0]]
        for _ in range(count - 1):
            pairs = [
                '{},{}'.format(candidates[i], candidates[count - i - 1]) for i in range(0, count // 2)
                if candidates[i] != -1 and candidates[count - i - 1] != -1
            ]
            row = ';'.join(pairs)
            config.append(row)
            robin = candidates[2:] + candidates[1:2]
            candidates = non_moving + robin
        return config

    def gen_traffic_pattern(self, n, mode, config_file_path):
        """Generate traffic pattern into config file.

        Args:
            n (int): the number of nodes.
            mode (str): the traffic mode, including 'one-to-one', 'one-to-many', 'many-to-one'.
            config_file_path (str): the path of config file to generate.
        """
        config = []
        if mode == 'one-to-many':
            config = self.__one_to_many(n)
        elif mode == 'many-to-one':
            config = self.__many_to_one(n)
        elif mode == 'one-to-one':
            config = self.__fully_one_to_one(n)
        with open(config_file_path, 'w') as f:
            for line in config:
                f.write(line + '\n')

    def __prepare_config(self, node_num):
        """Prepare and read config file.

        Args:
            node_num (int): the number of nodes.

        Returns:
            True if the config is not empty and valid.
        """
        try:
            # Generate the config file if not defined
            if self._args.config is None:
                self.gen_traffic_pattern(node_num, self._args.pattern, self.__config_path)
            # Use the config file defined in args
            else:
                self.__config_path = self._args.config
            # Read the config file and check if it's empty and valid
            with open(self.__config_path, 'r') as f:
                lines = f.readlines()
                for line in lines:
                    pairs = line.strip().strip(';').split(';')
                    # Check format of config
                    for pair in pairs:
                        pair = pair.split(',')
                        if len(pair) != 2:
                            return False
                        pair[0] = int(pair[0])
                        pair[1] = int(pair[1])
                    self.__config.extend(pairs)
        except BaseException as e:
            self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
            logger.error('Failed to generate and check config - benchmark: {}, message: {}.'.format(self._name, str(e)))
            return False
        if len(self.__config) == 0:
            self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
            logger.error('No valid config - benchmark: {}.'.format(self._name))
            return False
        return True

    def __prepare_general_ib_command_params(self):
        """Prepare general params for ib commands.

        Returns:
            Str of ib command params if arguments are valid, otherwise False.
        """
        # Format the ib command type
        self._args.commands = [command.lower() for command in self._args.commands]
        # Add message size for ib command
        msg_size = ''
        if self._args.msg_size is None:
            msg_size = '-a'
        else:
            msg_size = '-s ' + str(self._args.msg_size)
        # Add GPUDirect for ib command
        gpu_enable = ''
        if self._args.gpu_index:
            gpu = GPU()
            if gpu.vendor == 'nvidia':
                gpu_enable = ' --use_cuda={gpu_index}'.format(gpu_index=str(self._args.gpu_index))
            elif gpu.vendor == 'amd':
                gpu_enable = ' --use_rocm={gpu_index}'.format(gpu_index=str(self._args.gpu_index))
            else:
                self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
                logger.error('No GPU found - benchmark: {}'.format(self._name))
                return False
        # Generate ib command params
        try:
            command_params = '-F --iters={iter} -d {device} {size}{gpu}'.format(
                iter=str(self._args.iters),
                device=network.get_ib_devices()[self._args.ib_index].split(':')[0],
                size=msg_size,
                gpu=gpu_enable
            )
        except BaseException as e:
            self._result.set_return_code(ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE)
            logger.error('Getting ib devices failure - benchmark: {}, message: {}.'.format(self._name, str(e)))
            return False
        return command_params

    def _preprocess(self):
        """Preprocess/preparation operations before the benchmarking.

        Return:
            True if _preprocess() succeed.
        """
        if not super()._preprocess():
            return False

        # Check MPI environment
        self._args.pattern = self._args.pattern.lower()
        if os.getenv('OMPI_COMM_WORLD_SIZE'):
            node_num = int(os.getenv('OMPI_COMM_WORLD_SIZE'))
        else:
            self._result.set_return_code(ReturnCode.MICROBENCHMARK_MPI_INIT_FAILURE)
            logger.error('No MPI environment - benchmark: {}.'.format(self._name))
            return False

        # Generate and check config
        if not self.__prepare_config(node_num):
            return False

        # Prepare general params for ib commands
        command_params = self.__prepare_general_ib_command_params()
        if not command_params:
            return False
        # Generate commands
        for ib_command in self._args.commands:
            if ib_command not in self.__support_ib_commands:
                self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
                logger.error(
                    'Unsupported ib command - benchmark: {}, command: {}, expected: {}.'.format(
                        self._name, ib_command, ' '.join(self.__support_ib_commands)
                    )
                )
                return False
            else:
                ib_command_prefix = '{command} {command_params}'.format(
                    command=ib_command, command_params=command_params
                )
                if 'bw' in ib_command and self._args.bidirectional:
                    ib_command_prefix += ' -b'

                command = os.path.join(self._args.bin_dir, self._bin_name)
                command += ' --hostfile ' + self._args.hostfile
                command += ' --cmd_prefix ' + '\"' + ib_command_prefix + '\"'
                command += ' --input_config ' + self.__config_path
                self._commands.append(command)

        return True

    def _process_raw_result(self, cmd_idx, raw_output):    # noqa: C901
        """Function to parse raw results and save the summarized results.

        self._result.add_raw_data() and self._result.add_result() need to be called to save the results.

        Args:
            cmd_idx (int): the index of command corresponding with the raw_output.
            raw_output (str): raw output string of the micro-benchmark.

        Return:
            True if the raw output string is valid and result can be extracted.
        """
        self._result.add_raw_data('raw_output_' + self._args.commands[cmd_idx], raw_output)

        # If it's invoked by MPI and rank is not 0, no result is expected
        if os.getenv('OMPI_COMM_WORLD_RANK'):
            rank = int(os.getenv('OMPI_COMM_WORLD_RANK'))
            if rank > 0:
                return True

        valid = False
        content = raw_output.splitlines()
        line_index = 0
        config_index = 0
        try:
            result_index = -1
            for index, line in enumerate(content):
                if 'results' in line:
                    result_index = index + 1
                    break
            if result_index == -1:
                valid = False
            else:
                content = content[result_index:]
                for line in content:
                    line = list(filter(None, line.strip().split(',')))
                    for item in line:
                        metric = '{line}-{pair}'.format(line=str(line_index), pair=self.__config[config_index])
                        self._result.add_result(metric, float(item))
                        valid = True
                        config_index += 1
                    line_index += 1
        except Exception:
            valid = False
        if valid is False or config_index != len(self.__config):
            logger.error(
                'The result format is invalid - round: {}, benchmark: {}, raw output: {}.'.format(
                    self._curr_run_index, self._name, raw_output
                )
            )
            return False

        return True


BenchmarkRegistry.register_benchmark('ib-traffic', IBBenchmark)
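For readers skimming the diff, here is a small standalone sketch (an illustration, not part of the commit) of the same round-robin "circle method" schedule used above for the one-to-one pattern. Each returned string is one line of the generated config file, with `server,client` pairs separated by `;`; for 4 nodes it yields the three rounds that the unit test below also expects.

```python
# Standalone illustration of the one-to-one schedule; mirrors IBBenchmark.__fully_one_to_one.
def one_to_one_rounds(n):
    """Return one config line per round, e.g. '0,3;1,2' for round 0 of n=4."""
    candidates = list(range(n))
    if n % 2 == 1:
        candidates.append(-1)    # fake participant that sits out in its round
    count = len(candidates)
    rounds = []
    for _ in range(count - 1):
        pairs = [
            '{},{}'.format(candidates[i], candidates[count - i - 1])
            for i in range(count // 2)
            if candidates[i] != -1 and candidates[count - i - 1] != -1
        ]
        rounds.append(';'.join(pairs))
        # Rotate everyone except the first participant by one position.
        candidates = [candidates[0]] + candidates[2:] + candidates[1:2]
    return rounds


print(one_to_one_rounds(4))    # ['0,3;1,2', '0,1;2,3', '0,2;3,1']
```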
@@ -2,7 +2,7 @@
# Licensed under the MIT License.

cmake_minimum_required(VERSION 3.18)
-project(ib_mpi)
+project(ib_validation)

set(CMAKE_CXX_STANDARD 14)

@@ -10,7 +10,7 @@ set(CMAKE_CXX_STANDARD 14)
set(GCC_COVERAGE_COMPILE_FLAGS "-Wall -pedantic -lm -O3 -funroll-loops -fopenmp")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${GCC_COVERAGE_COMPILE_FLAGS}")

-add_executable(ib_mpi ib_validation_performance.cc)
+add_executable(ib_validation ib_validation_performance.cc)

# MPI
find_package(MPI REQUIRED)

@@ -18,10 +18,11 @@ find_package(MPI REQUIRED)
# Boost
execute_process(COMMAND wget -O boost_1_71_0.tar.gz https://boostorg.jfrog.io/artifactory/main/release/1.71.0/source/boost_1_71_0.tar.gz WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/)
execute_process(COMMAND tar xzvf boost_1_71_0.tar.gz WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/)
-execute_process(COMMAND ./bootstrap.sh --prefix=build WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/boost_1_71_0/)
-execute_process(COMMAND ./b2 install WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/boost_1_71_0/)
-list(APPEND CMAKE_PREFIX_PATH ${CMAKE_CURRENT_SOURCE_DIR}/boost_1_71_0/build/)
+execute_process(COMMAND ./bootstrap.sh WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/boost_1_71_0/)
+execute_process(COMMAND ./b2 --with-program_options install WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/boost_1_71_0/)
+list(APPEND CMAKE_PREFIX_PATH ${CMAKE_CURRENT_SOURCE_DIR}/boost_1_71_0/build/)

-find_package(Boost REQUIRED COMPONENTS system thread program_options)
+find_package(Boost REQUIRED COMPONENTS program_options)
include_directories(${Boost_INCLUDE_DIRS})
-target_link_libraries(ib_mpi PUBLIC MPI::MPI_CXX ${Boost_LIBRARIES})
+target_link_libraries(ib_validation PUBLIC MPI::MPI_CXX ${Boost_LIBRARIES})
+install(TARGETS ib_validation RUNTIME DESTINATION bin)
@@ -29,6 +29,7 @@ class ReturnCode(Enum):
    MICROBENCHMARK_RESULT_PARSING_FAILURE = 33
    MICROBENCHMARK_UNSUPPORTED_ARCHITECTURE = 34
    MICROBENCHMARK_DEVICE_GETTING_FAILURE = 35
+    MICROBENCHMARK_MPI_INIT_FAILURE = 36
    # Return codes related to docker benchmarks.
    DOCKERBENCHMARK_IMAGE_NOT_SET = 50
    DOCKERBENCHMARK_CONTAINER_NOT_SET = 51
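As a usage note, a sketch (assuming the same launch pattern as the example above) of checking the two MICROBENCHMARK_* codes that this benchmark sets during preprocessing:

```python
# Hypothetical post-run check; the ReturnCode members come from the enum above.
from superbench.benchmarks import BenchmarkRegistry, ReturnCode

context = BenchmarkRegistry.create_benchmark_context('ib-traffic')
benchmark = BenchmarkRegistry.launch_benchmark(context)
if benchmark and benchmark.return_code == ReturnCode.MICROBENCHMARK_MPI_INIT_FAILURE:
    print('not launched under mpirun, so OMPI_COMM_WORLD_SIZE is missing')
elif benchmark and benchmark.return_code == ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE:
    print('no IB device found at the requested --ib_index')
```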
@@ -0,0 +1,246 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Tests for ib-traffic benchmark."""

import os
import numbers
import unittest
from pathlib import Path
from unittest import mock
from collections import defaultdict

from superbench.benchmarks import BenchmarkRegistry, Platform, BenchmarkType, ReturnCode


class IBBenchmarkTest(unittest.TestCase):
    """Tests for IBBenchmark benchmark."""
    def setUp(self):
        """Method called to prepare the test fixture."""
        # Create fake binary file just for testing.
        os.environ['SB_MICRO_PATH'] = '/tmp/superbench'
        binary_path = Path(os.getenv('SB_MICRO_PATH'), 'bin')
        binary_path.mkdir(parents=True, exist_ok=True)
        self.__binary_file = Path(binary_path, 'ib_validation')
        self.__binary_file.touch(mode=0o755, exist_ok=True)

    def tearDown(self):
        """Method called after the test method has been called and the result recorded."""
        self.__binary_file.unlink()

    def test_generate_config(self):    # noqa: C901
        """Test util functions."""
        test_config_file = 'test_gen_config.txt'

        def read_config(filename):
            config = []
            with open(filename, 'r') as f:
                lines = f.readlines()
                for line in lines:
                    pairs = line.strip().split(';')
                    config.append(pairs)
            return config

        expected_config = {}
        expected_config['one-to-one'] = [['0,3', '1,2'], ['0,1', '2,3'], ['0,2', '3,1']]
        expected_config['many-to-one'] = [
            ['0,1', '0,2', '0,3'], ['1,0', '1,2', '1,3'], ['2,0', '2,1', '2,3'], ['3,0', '3,1', '3,2']
        ]
        expected_config['one-to-many'] = [
            ['1,0', '2,0', '3,0'], ['0,1', '2,1', '3,1'], ['0,2', '1,2', '3,2'], ['0,3', '1,3', '2,3']
        ]
        benchmark_name = 'ib-traffic'
        (benchmark_class,
         predefine_params) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CPU)
        assert (benchmark_class)
        benchmark = benchmark_class(benchmark_name)
        # Small scale test
        node_num = 4
        for m in ['one-to-one', 'one-to-many', 'many-to-one']:
            benchmark.gen_traffic_pattern(node_num, m, test_config_file)
            config = read_config(test_config_file)
            assert (config == expected_config[m])
        # Large scale test
        node_num = 1000
        # Check for 'one-to-many' and 'many-to-one':
        # in the Nth round, the count of N is (N-1); others are all 1.
        for m in ['one-to-many', 'many-to-one']:
            benchmark.gen_traffic_pattern(node_num, m, test_config_file)
            config = read_config(test_config_file)
            assert (len(config) == node_num)
            assert (len(config[0]) == node_num - 1)
            for step in range(node_num):
                server = defaultdict(int)
                client = defaultdict(int)
                for pair in config[step]:
                    pair = pair.split(',')
                    server[int(pair[0])] += 1
                    client[int(pair[1])] += 1
                for i in range(node_num):
                    if m == 'many-to-one':
                        if i == step:
                            assert (server[i] == node_num - 1)
                        else:
                            assert (client[i] == 1)
                    elif m == 'one-to-many':
                        if i == step:
                            assert (client[i] == node_num - 1)
                        else:
                            assert (server[i] == 1)
        # Check for 'one-to-one':
        # each index appears exactly once in each round,
        # and each index is paired once with every other index.
        benchmark.gen_traffic_pattern(node_num, 'one-to-one', test_config_file)
        config = read_config(test_config_file)
        if node_num % 2 == 1:
            assert (len(config) == node_num)
            assert (len(config[0]) == node_num // 2)
        else:
            assert (len(config) == node_num - 1)
            assert (len(config[0]) == node_num // 2)
        test_pairs = defaultdict(list)
        for step in range(len(config)):
            node = defaultdict(int)
            for pair in config[step]:
                pair = pair.split(',')
                node[int(pair[0])] += 1
                node[int(pair[1])] += 1
                test_pairs[int(pair[0])].append(int(pair[1]))
                test_pairs[int(pair[1])].append(int(pair[0]))
            for index in node:
                assert (node[index] == 1)
        for node in range(node_num):
            assert (sorted(test_pairs[node]) == [(i) for i in range(node_num) if i != node])

        Path(test_config_file).unlink()

    @mock.patch('superbench.common.utils.network.get_ib_devices')
    def test_ib_traffic_performance(self, mock_ib_devices):
        """Test ib-traffic benchmark."""
        # Test without ib devices
        # Check registry.
        benchmark_name = 'ib-traffic'
        (benchmark_class,
         predefine_params) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CPU)
        assert (benchmark_class)

        # Check preprocess
        # Negative cases
        parameters = '--ib_index 0 --iters 2000 --pattern one-to-one'
        benchmark = benchmark_class(benchmark_name, parameters=parameters)
        mock_ib_devices.return_value = None
        ret = benchmark._preprocess()
        assert (ret is False)
        assert (benchmark.return_code == ReturnCode.MICROBENCHMARK_MPI_INIT_FAILURE)

        os.environ['OMPI_COMM_WORLD_SIZE'] = '4'
        parameters = '--ib_index 0 --iters 2000 --pattern one-to-one'
        benchmark = benchmark_class(benchmark_name, parameters=parameters)
        mock_ib_devices.return_value = None
        ret = benchmark._preprocess()
        assert (ret is False)
        assert (benchmark.return_code == ReturnCode.MICROBENCHMARK_DEVICE_GETTING_FAILURE)

        # Positive cases
        os.environ['OMPI_COMM_WORLD_SIZE'] = '3'
        parameters = '--ib_index 0 --iters 2000 --pattern one-to-one'
        benchmark = benchmark_class(benchmark_name, parameters=parameters)
        mock_ib_devices.return_value = ['mlx5_0']
        ret = benchmark._preprocess()
        assert (ret is True)

        # Generate config
        parameters = '--ib_index 0 --iters 2000 --msg_size 33554432'
        benchmark = benchmark_class(benchmark_name, parameters=parameters)
        os.environ['OMPI_COMM_WORLD_SIZE'] = '4'
        mock_ib_devices.return_value = ['mlx5_0']
        ret = benchmark._preprocess()
        Path('config.txt').unlink()
        assert (ret)
        expect_command = 'ib_validation --hostfile /root/hostfile --cmd_prefix "ib_write_bw -F ' + \
            '--iters=2000 -d mlx5_0 -s 33554432" --input_config ' + os.getcwd() + '/config.txt'
        command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
        assert (command == expect_command)

        # Custom config
        config = ['0,1', '1,0;0,1', '0,1;1,0', '1,0;0,1']
        with open('test_config.txt', 'w') as f:
            for line in config:
                f.write(line + '\n')
        parameters = '--ib_index 0 --iters 2000 --msg_size 33554432 --config test_config.txt'
        benchmark = benchmark_class(benchmark_name, parameters=parameters)
        os.environ['OMPI_COMM_WORLD_SIZE'] = '2'
        mock_ib_devices.return_value = ['mlx5_0']
        ret = benchmark._preprocess()
        Path('test_config.txt').unlink()
        assert (ret)
        expect_command = 'ib_validation --hostfile /root/hostfile --cmd_prefix "ib_write_bw -F ' + \
            '--iters=2000 -d mlx5_0 -s 33554432" --input_config test_config.txt'

        command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
        assert (command == expect_command)
        raw_output_0 = """
The predix of cmd to run is: ib_write_bw -a -d ibP257p0s0
Load the config file from: config.txt
Output will be saved to:
config:
0,1
1,0;0,1
0,1;1,0
1,0;0,1
config end
results from rank ROOT_RANK:
23452.6,
22212.6,22433
22798.8,23436.3
23435.3,22766.5
"""
        raw_output_1 = """
The predix of cmd to run is: ib_write_bw -F --iters=2000 -d mlx5_0 -s 33554432
Load the config file from: config.txt
Output will be saved to:
config:
0,1
1,0;0,1
0,1;1,0
1,0;0,1
config end
results from rank ROOT_RANK:
23452.6,
22212.6,22433,
22798.8,23436.3,
"""
        raw_output_2 = """
--------------------------------------------------------------------------
mpirun was unable to launch the specified application as it could not access
or execute an executable:

while attempting to start process rank 0.
--------------------------------------------------------------------------
2 total processes failed to start
"""

        # Check function process_raw_data.
        # Positive case - valid raw output.
        os.environ['OMPI_COMM_WORLD_RANK'] = '0'
        assert (benchmark._process_raw_result(0, raw_output_0))

        for metric in benchmark.result:
            assert (metric in benchmark.result)
            assert (len(benchmark.result[metric]) == 1)
            assert (isinstance(benchmark.result[metric][0], numbers.Number))
        # Negative cases - invalid raw output.
        assert (benchmark._process_raw_result(0, raw_output_1) is False)
        assert (benchmark._process_raw_result(0, raw_output_2) is False)
        os.environ.pop('OMPI_COMM_WORLD_RANK')

        # Check basic information.
        assert (benchmark.name == 'ib-traffic')
        assert (benchmark.type == BenchmarkType.MICRO)
        assert (benchmark._bin_name == 'ib_validation')

        # Check parameters specified in BenchmarkContext.
        assert (benchmark._args.ib_index == 0)
        assert (benchmark._args.iters == 2000)
        assert (benchmark._args.msg_size == 33554432)
        assert (benchmark._args.commands == ['ib_write_bw'])