Benchmarks: micro benchmark - Support cpu-gpu and gpu-cpu in ib-validation (#581)

**Description**
Benchmarks: micro benchmark - Support cpu-gpu and gpu-cpu in
ib-validation

**Major Revision**
- Support cpu-gpu and gpu-cpu in ib-validation


**Minor Revision**
- Support multiple message sizes, multiple directions, and multiple IB commands in
ib-validation
This commit is contained in:
Yuting Jiang 2023-12-04 22:20:46 +08:00 committed by GitHub
Parent 028819b388
Commit 9ae8c67093
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 140 additions and 76 deletions

View file

@ -355,6 +355,8 @@ gpcnet-network-load-test: Select full system network tests run with four congest
Measure the InfiniBand performance under multi nodes' traffic pattern.
The direction between client and server can be 'cpu-to-cpu'/'gpu-to-gpu'/'gpu-to-cpu'/'cpu-to-gpu'.
The traffic pattern is defined in a config file, which is pre-defined for one-to-many, many-to-one and all-to-all patterns.
Each row in the config is one round, and all pairs of nodes in a row run ib command simultaneously.
@ -371,10 +373,10 @@ with topology distance of 2, 4, 6, respectively.
#### Metrics
| Metrics | Unit | Description |
|------------------------------------------------------------------|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| ib-traffic/ib\_write\_bw\_${line}\_${pair}:${server}\_${client} | bandwidth (GB/s) | The max bandwidth of perftest (ib_write_bw, ib_send_bw, ib_read_bw) run between the ${pair}<sup>th</sup> node pair in the ${line}<sup>th</sup> line of the config, ${server} and ${client} are the hostname of server and client. |
| ib-traffic/ib\_write\_lat\_${line}\_${pair}:${server}\_${client} | time (us) | The max latency of perftest (ib_write_lat, ib_send_lat, ib_read_lat) run between the ${pair}<sup>th</sup> node pair in the ${line}<sup>th</sup> line of the config, ${server} and ${client} are the hostname of server and client. |
| Metrics | Unit | Description |
|---------------------------------------------------------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| ib-traffic/ib\_write\_bw\_${msg_size}\_${direction}\_${line}\_${pair}:${server}\_${client} | bandwidth (GB/s) | The max bandwidth of perftest (ib_write_bw, ib_send_bw, ib_read_bw) using ${msg_size} with ${direction}('cpu-to-cpu'/'gpu-to-gpu'/'gpu-to-cpu'/'cpu-to-gpu') run between the ${pair}<sup>th</sup> node pair in the ${line}<sup>th</sup> line of the config, ${server} and ${client} are the hostname of server and client. |
| ib-traffic/ib\_write\_lat\_${msg_size}\_${direction}\_${line}\_${pair}:${server}\_${client} | time (us) | The max latency of perftest (ib_write_lat, ib_send_lat, ib_read_lat) using ${msg_size} with ${direction}('cpu-to-cpu'/'gpu-to-gpu'/'gpu-to-cpu'/'cpu-to-gpu') run between the ${pair}<sup>th</sup> node pair in the ${line}<sup>th</sup> line of the config, ${server} and ${client} are the hostname of server and client. |
## Computation-communication Benchmarks

View file

@ -27,6 +27,7 @@ class IBBenchmark(MicroBenchmarkWithInvoke):
self.__support_ib_commands = [
'ib_write_bw', 'ib_read_bw', 'ib_send_bw', 'ib_write_lat', 'ib_read_lat', 'ib_send_lat'
]
self.__support_directions = ['gpu-to-gpu', 'cpu-to-cpu', 'cpu-to-gpu', 'gpu-to-cpu']
self.__patterns = ['one-to-one', 'one-to-many', 'many-to-one', 'topo-aware']
self.__config_path = os.path.join(os.getcwd(), 'config.txt')
self.__config = []
@ -74,6 +75,7 @@ class IBBenchmark(MicroBenchmarkWithInvoke):
self._parser.add_argument(
'--msg_size',
type=int,
nargs='+',
default=8388608,
required=False,
help='The message size of perftest command, e.g., 8388608.',
@ -84,6 +86,7 @@ class IBBenchmark(MicroBenchmarkWithInvoke):
self._parser.add_argument(
'--command',
type=str,
nargs='+',
default='ib_write_bw',
required=False,
help='The perftest command to use, e.g., {}.'.format(' '.join(self.__support_ib_commands)),
@ -137,6 +140,14 @@ class IBBenchmark(MicroBenchmarkWithInvoke):
required=False,
help='The path of ibnetdiscover output',
)
self._parser.add_argument(
'--direction',
type=str,
nargs='+',
default='gpu-to-gpu',
required=False,
help='The direction of traffic pattern, e.g., gpu-to-gpu, cpu-to-cpu, cpu-to-gpu, gpu-to-cpu'
)
def __one_to_many(self, n):
"""Generate one-to-many pattern config.
@ -249,37 +260,32 @@ class IBBenchmark(MicroBenchmarkWithInvoke):
return False
return True
def __prepare_general_ib_command_params(self):
def __prepare_general_ib_command_params(self, msg_size, device='cpu'):
"""Prepare general params for ib commands.
Returns:
Str of ib command params if arguments are valid, otherwise False.
"""
# Format the ib command type
self._args.command = self._args.command.lower()
# Add message size for ib command
msg_size = f'-s {self._args.msg_size}' if self._args.msg_size > 0 else '-a'
msg_size = f'-s {msg_size}' if msg_size > 0 else '-a'
# Add GPUDirect for ib command
gpu_dev = ''
if self._args.gpu_dev is not None:
if 'bw' in self._args.command:
gpu = GPU()
if gpu.vendor == 'nvidia':
gpu_dev = f'--use_cuda={self._args.gpu_dev}'
elif gpu.vendor == 'amd':
gpu_dev = f'--use_rocm={self._args.gpu_dev}'
else:
self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
logger.error('No GPU found - benchmark: {}'.format(self._name))
return False
elif 'lat' in self._args.command:
logger.warning('Wrong configuration: Perftest supports CUDA/ROCM only in BW tests')
if device == 'gpu' and self._args.gpu_dev is not None:
gpu = GPU()
if gpu.vendor == 'nvidia':
gpu_dev = f'--use_cuda={self._args.gpu_dev}'
elif gpu.vendor == 'amd':
gpu_dev = f'--use_rocm={self._args.gpu_dev}'
else:
self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
logger.error('No GPU found - benchmark: {}'.format(self._name))
return False
# Generate ib command params
command_params = f'-F -n {self._args.iters} -d {self._args.ib_dev} {msg_size} {gpu_dev}'
command_params = f'{command_params.strip()} --report_gbits'
return command_params
def _preprocess(self):
def _preprocess(self): # noqa: C901
"""Preprocess/preparation operations before the benchmarking.
Return:
@ -292,31 +298,66 @@ class IBBenchmark(MicroBenchmarkWithInvoke):
if not self.__prepare_config():
return False
# Prepare general params for ib commands
command_params = self.__prepare_general_ib_command_params()
if not command_params:
return False
# Generate commands
if self._args.command not in self.__support_ib_commands:
self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
logger.error(
'Unsupported ib command - benchmark: {}, command: {}, expected: {}.'.format(
self._name, self._args.command, ' '.join(self.__support_ib_commands)
)
)
return False
else:
ib_command_prefix = f'{os.path.join(self._args.bin_dir, self._args.command)} {command_params}'
if self._args.numa_dev is not None:
ib_command_prefix = f'numactl -N {self._args.numa_dev} {ib_command_prefix}'
if 'bw' in self._args.command and self._args.bidirectional:
ib_command_prefix += ' -b'
command = os.path.join(self._args.bin_dir, self._bin_name)
command += ' --cmd_prefix ' + "'" + ib_command_prefix + "'"
command += f' --timeout {self._args.timeout} ' + \
f'--hostfile {self._args.hostfile} --input_config {self.__config_path}'
self._commands.append(command)
self._commands_ib_commands = []
self._commands_msg_size = []
self._commands_direction = []
if not isinstance(self._args.msg_size, list):
self._args.msg_size = [self._args.msg_size]
for msg_size in self._args.msg_size:
if msg_size < 0:
logger.error('Invalid message size - benchmark: {}, message size: {}.'.format(self._name, msg_size))
return False
# Prepare general params for ib commands
cpu_command_params = self.__prepare_general_ib_command_params(msg_size)
gpu_command_params = self.__prepare_general_ib_command_params(msg_size, 'gpu')
if not cpu_command_params or (self._args.gpu_dev and not gpu_command_params):
return False
# Generate commands
if isinstance(self._args.command, str):
self._args.command = [self._args.command]
for ib_command in self._args.command:
if ib_command not in self.__support_ib_commands:
self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
logger.error(
'Unsupported ib command - benchmark: {}, command: {}, expected: {}.'.format(
self._name, ib_command, ' '.join(self.__support_ib_commands)
)
)
return False
else:
# Format the ib command type
ib_command = ib_command.lower()
cpu_ib_command_prefix = f'{os.path.join(self._args.bin_dir, ib_command)} {cpu_command_params}'
gpu_ib_command_prefix = f'{os.path.join(self._args.bin_dir, ib_command)} {gpu_command_params}'
if self._args.numa_dev is not None:
cpu_ib_command_prefix = f'numactl -N {self._args.numa_dev} {cpu_ib_command_prefix}'
gpu_ib_command_prefix = f'numactl -N {self._args.numa_dev} {gpu_ib_command_prefix}'
if 'bw' in ib_command and self._args.bidirectional:
cpu_ib_command_prefix += ' -b'
gpu_ib_command_prefix += ' -b'
if not isinstance(self._args.direction, list):
self._args.direction = [self._args.direction]
for direction in self._args.direction:
if direction not in self.__support_directions:
self._result.set_return_code(ReturnCode.INVALID_ARGUMENT)
logger.error(
'Unsupported direction - benchmark: {}, direction: {}, expected: {}.'.format(
self._name, direction, ' '.join(self.__support_directions)
)
)
return False
# Generate commands
command = os.path.join(self._args.bin_dir, self._bin_name)
command += ' --send_cmd_prefix ' + "'" + cpu_ib_command_prefix + "'" \
if 'cpu-to' in direction else ' --send_cmd_prefix ' + "'" + gpu_ib_command_prefix + "'"
command += ' --recv_cmd_prefix ' + "'" + cpu_ib_command_prefix + "'" \
if 'to-cpu' in direction else ' --recv_cmd_prefix ' + "'" + gpu_ib_command_prefix + "'"
command += f' --timeout {self._args.timeout} ' + \
f'--hostfile {self._args.hostfile} --input_config {self.__config_path}'
self._commands.append(command)
self._commands_ib_commands.append(ib_command)
self._commands_msg_size.append(msg_size)
self._commands_direction.append(direction)
return True
@ -332,7 +373,10 @@ class IBBenchmark(MicroBenchmarkWithInvoke):
Return:
True if the raw output string is valid and result can be extracted.
"""
self._result.add_raw_data('raw_output_' + self._args.command, raw_output, self._args.log_raw_data)
command = self._commands_ib_commands[cmd_idx]
msg_size = self._commands_msg_size[cmd_idx]
direction = self._commands_direction[cmd_idx]
self._result.add_raw_data(f'raw_output_{command}_{msg_size}_{direction}', raw_output, self._args.log_raw_data)
# If it's invoked by MPI and rank is not 0, no result is expected
if os.getenv('OMPI_COMM_WORLD_RANK'):
@ -343,7 +387,6 @@ class IBBenchmark(MicroBenchmarkWithInvoke):
valid = False
content = raw_output.splitlines()
config_index = 0
command = self._args.command
try:
result_index = -1
for index, line in enumerate(content):
@ -359,7 +402,8 @@ class IBBenchmark(MicroBenchmarkWithInvoke):
for pair_index, pair_result in enumerate(line_result):
rank_results = list(filter(None, pair_result.strip().split(' ')))
for rank_index, rank_result in enumerate(rank_results):
metric = f'{command}_{line_index}_{pair_index}:{self.__config[config_index]}:{rank_index}'
metric = f'{command}_{msg_size}_{direction}_{line_index}_{pair_index}:' \
+ f'{self.__config[config_index]}:{rank_index}'
value = float(rank_result)
# Check if the value is valid before the base conversion
if 'bw' in command and value >= 0.0:

View file

@ -51,7 +51,8 @@ struct Args {
// Timeout for each command
int timeout;
// The prefix of command to run
std::string cmd_prefix;
std::string send_cmd_prefix;
std::string recv_cmd_prefix;
// The path of input config file
std::string input_config;
// The path of output csv file
@ -65,9 +66,13 @@ void load_args(int argc, char *argv[], Args &args) {
// Get and parse command line arguments
boost::program_options::options_description opt("all options");
opt.add_options()("timeout,t", boost::program_options::value<int>(&args.timeout)->default_value(120),
"timeout of each command")(
"cmd_prefix,c",
boost::program_options::value<std::string>(&args.cmd_prefix)->default_value("ib_write_bw -s 33554432 -d ib0"),
"timeout of each command")("send_cmd_prefix,c",
boost::program_options::value<std::string>(&args.send_cmd_prefix)
->default_value("ib_write_bw -s 33554432 -d ib0"),
"ib command prefix")(
"recv_cmd_prefix,c",
boost::program_options::value<std::string>(&args.recv_cmd_prefix)
->default_value("ib_write_bw -s 33554432 -d ib0"),
"ib command prefix")(
"input_config,i", boost::program_options::value<std::string>(&args.input_config)->default_value("config.txt"),
"the path of input config file")(
@ -86,7 +91,7 @@ void load_args(int argc, char *argv[], Args &args) {
}
if (g_world_rank == ROOT_RANK) {
std::cout << "Timeout for each command is: " << args.timeout << std::endl;
std::cout << "The prefix of cmd to run is: " << args.cmd_prefix << std::endl;
std::cout << "The prefix of cmd to run is: " << args.send_cmd_prefix << args.recv_cmd_prefix << std::endl;
std::cout << "Load the config file from: " << args.input_config << std::endl;
std::cout << "Output will be saved to: " << args.output_path << std::endl;
}
@ -318,8 +323,9 @@ float run_cmd(string cmd_prefix, int timeout, int port, bool server, string host
}
// The ranks in vector of (server, client) run commands parallel
vector<float> run_cmd_parallel(string cmd_prefix, int timeout, const vector<std::pair<int, int>> &run_pairs_in_parallel,
const vector<int> &ports, const vector<string> &hostnames) {
vector<float> run_cmd_parallel(string send_cmd_prefix, string recv_cmd_prefix, int timeout,
const vector<std::pair<int, int>> &run_pairs_in_parallel, const vector<int> &ports,
const vector<string> &hostnames) {
// invoke function to run cmd in multi threads mode for each rank in the pairs
unordered_map<int, std::future<float>> threads;
int flag;
@ -331,14 +337,14 @@ vector<float> run_cmd_parallel(string cmd_prefix, int timeout, const vector<std:
if (server_index == g_world_rank) {
flag = index;
MPI_Send(&flag, 1, MPI_INT, client_index, rank_index, MPI_COMM_WORLD);
threads[2 * rank_index] = (std::async(std::launch::async, run_cmd, cmd_prefix, timeout,
threads[2 * rank_index] = (std::async(std::launch::async, run_cmd, recv_cmd_prefix, timeout,
ports[rank_index], true, hostnames[server_index / local_size]));
}
if (client_index == g_world_rank) {
// in case that client starts before server
MPI_Recv(&flag, 1, MPI_INT, server_index, rank_index, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
threads[2 * rank_index + 1] =
(std::async(std::launch::async, run_cmd, cmd_prefix, timeout, ports[rank_index], false,
(std::async(std::launch::async, run_cmd, send_cmd_prefix, timeout, ports[rank_index], false,
hostnames[server_index / local_size]));
}
}
@ -384,7 +390,8 @@ vector<vector<float>> run_benchmark(const Args &args, vector<vector<std::pair<in
// Insert barrier to sync before each run
MPI_Barrier(MPI_COMM_WORLD);
// run commands parallel for single line of config
vector<float> results_single_line = run_cmd_parallel(args.cmd_prefix, args.timeout, line, ports, hostnames);
vector<float> results_single_line =
run_cmd_parallel(args.send_cmd_prefix, args.recv_cmd_prefix, args.timeout, line, ports, hostnames);
// collect results for each run
results.push_back(results_single_line);
}
@ -451,10 +458,12 @@ int main(int argc, char **argv) {
// Handle local size and rank
#if defined(OPEN_MPI)
local_size = atoi(getenv("OMPI_COMM_WORLD_LOCAL_SIZE"));
boost::replace_all(args.cmd_prefix, "LOCAL_RANK", "OMPI_COMM_WORLD_LOCAL_RANK");
boost::replace_all(args.send_cmd_prefix, "LOCAL_RANK", "OMPI_COMM_WORLD_LOCAL_RANK");
boost::replace_all(args.recv_cmd_prefix, "LOCAL_RANK", "OMPI_COMM_WORLD_LOCAL_RANK");
#elif defined(MPICH)
local_size = atoi(getenv("MPI_LOCALNRANKS"));
boost::replace_all(args.cmd_prefix, "LOCAL_RANK", "MPI_LOCALRANKID");
boost::replace_all(args.send_cmd_prefix, "LOCAL_RANK", "MPI_LOCALRANKID");
boost::replace_all(args.recv_cmd_prefix, "LOCAL_RANK", "MPI_LOCALRANKID");
#else
local_size = atoi(getenv("LOCAL_SIZE"));
std::cout << "Warning: unknown mpi used." << std::endl;
@ -473,7 +482,7 @@ int main(int argc, char **argv) {
// rank ROOT_RANK output the results to file
if (g_world_rank == ROOT_RANK) {
if (args.output_path.size() != 0)
output_to_file(args.cmd_prefix, config, results, args.output_path);
output_to_file(args.send_cmd_prefix, config, results, args.output_path);
}
// Finalize the MPI environment. No more MPI calls can be made after this

View file

@ -184,18 +184,23 @@ class IBBenchmarkTest(BenchmarkTestCase, unittest.TestCase):
ret = benchmark._preprocess()
Path('config.txt').unlink()
assert (ret)
expect_command = "ib_validation --cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d $(echo mlx5_0) -s 33554432 --report_gbits' " + \
expect_command = "ib_validation --send_cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d $(echo mlx5_0) -s 33554432 --report_gbits'" + \
f" --recv_cmd_prefix '{benchmark._args.bin_dir}/ib_write_bw -F -n 2000" + \
" -d $(echo mlx5_0) -s 33554432 --report_gbits' " + \
f'--timeout 120 --hostfile hostfile --input_config {os.getcwd()}/config.txt'
command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
assert (command == expect_command)
parameters = '--ib_dev mlx5_0 --msg_size 0 --iters 2000 --pattern one-to-one --hostfile hostfile --gpu_dev 0'
parameters = '--ib_dev mlx5_0 --msg_size 0 --iters 2000 --pattern one-to-one ' \
+ '--hostfile hostfile --gpu_dev 0 --direction gpu-to-gpu'
mock_gpu.return_value = 'nvidia'
benchmark = benchmark_class(benchmark_name, parameters=parameters)
ret = benchmark._preprocess()
expect_command = "ib_validation --cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d mlx5_0 -a --use_cuda=0 --report_gbits' " + \
expect_command = "ib_validation --send_cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d mlx5_0 -a --use_cuda=0 --report_gbits'" + \
f" --recv_cmd_prefix '{benchmark._args.bin_dir}/ib_write_bw -F -n 2000" + \
" -d mlx5_0 -a --use_cuda=0 --report_gbits' " + \
f'--timeout 120 --hostfile hostfile --input_config {os.getcwd()}/config.txt'
command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
assert (command == expect_command)
@ -207,12 +212,14 @@ class IBBenchmarkTest(BenchmarkTestCase, unittest.TestCase):
assert (command == expect_command)
parameters = '--command ib_read_lat --ib_dev mlx5_0 --iters 2000 --msg_size 33554432 ' + \
'--pattern one-to-one --hostfile hostfile --gpu_dev 0'
'--pattern one-to-one --hostfile hostfile --gpu_dev 0 --direction gpu-to-gpu'
mock_gpu.return_value = 'nvidia'
benchmark = benchmark_class(benchmark_name, parameters=parameters)
ret = benchmark._preprocess()
expect_command = "ib_validation --cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_read_lat -F -n 2000 -d mlx5_0 -s 33554432 --report_gbits' " + \
expect_command = "ib_validation --send_cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_read_lat -F -n 2000 -d mlx5_0 -s 33554432 --use_cuda=0 --report_gbits'" + \
f" --recv_cmd_prefix '{benchmark._args.bin_dir}/ib_read_lat -F -n 2000" + \
" -d mlx5_0 -s 33554432 --use_cuda=0 --report_gbits' " + \
f'--timeout 120 --hostfile hostfile --input_config {os.getcwd()}/config.txt'
command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
assert (command == expect_command)
@ -223,14 +230,16 @@ class IBBenchmarkTest(BenchmarkTestCase, unittest.TestCase):
for line in config:
f.write(line + '\n')
parameters = '--ib_dev mlx5_0 --timeout 180 --iters 2000 --msg_size 33554432 ' + \
'--config test_config.txt --hostfile hostfile'
'--config test_config.txt --hostfile hostfile --direction cpu-to-cpu'
benchmark = benchmark_class(benchmark_name, parameters=parameters)
os.environ['OMPI_COMM_WORLD_SIZE'] = '2'
ret = benchmark._preprocess()
Path('test_config.txt').unlink()
assert (ret)
expect_command = "ib_validation --cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d mlx5_0 -s 33554432 --report_gbits' " + \
expect_command = "ib_validation --send_cmd_prefix '" + benchmark._args.bin_dir + \
"/ib_write_bw -F -n 2000 -d mlx5_0 -s 33554432 --report_gbits'" + \
f" --recv_cmd_prefix '{benchmark._args.bin_dir}/ib_write_bw -F -n 2000" + \
" -d mlx5_0 -s 33554432 --report_gbits' " + \
'--timeout 180 --hostfile hostfile --input_config test_config.txt'
command = benchmark._bin_name + benchmark._commands[0].split(benchmark._bin_name)[1]
@ -323,5 +332,5 @@ while attempting to start process rank 0.
# Check parameters specified in BenchmarkContext.
assert (benchmark._args.ib_dev == 'mlx5_0')
assert (benchmark._args.iters == 2000)
assert (benchmark._args.msg_size == 33554432)
assert (benchmark._args.command == 'ib_write_bw')
assert (benchmark._args.msg_size == [33554432])
assert (benchmark._args.command == ['ib_write_bw'])