Runner - Support mpi mode (#146)
Support mpi mode in runner: * concatenate mpirun command * support mca and env config * prepare hostfile and update Ansible host pattern Co-authored-by: Peng Cheng <chengpeng5555@outlook.com>
This commit is contained in:
Родитель
96fc4d09dd
Коммит
98b6c0e3ca
|
@ -78,6 +78,18 @@ class AnsibleClient():
|
|||
logger.info(r.stats)
|
||||
return r.rc
|
||||
|
||||
def update_mpi_config(self, ansible_config):
    """Update ansible config for mpi, run on the first host of inventory group.

    Appends the '[0]' subscript to the host pattern so the command is only
    run on the first host matched by the original pattern.

    Args:
        ansible_config (dict): Ansible config dict, must contain 'host_pattern'.

    Returns:
        dict: Updated Ansible config dict. The input dict is left unmodified.

    Raises:
        KeyError: If 'host_pattern' is missing from ansible_config.
    """
    # Build a new dict instead of applying `+=` to the argument: callers may
    # pass a shared config (e.g. the client's own config), and mutating it
    # in place would stack another '[0]' onto the pattern on every call.
    return {
        **ansible_config,
        'host_pattern': ansible_config['host_pattern'] + '[0]',
    }
|
||||
|
||||
def get_shell_config(self, cmd):
|
||||
"""Get ansible config for shell module.
|
||||
|
||||
|
|
|
@ -43,3 +43,9 @@
|
|||
dest: '{{ workspace }}/sb.env'
|
||||
mode: 0644
|
||||
become: yes
|
||||
- name: Updating Hostfile
|
||||
copy:
|
||||
content: "{{ sb_nodes | join('\n') }}\n"
|
||||
dest: '{{ workspace }}/hostfile'
|
||||
mode: 0644
|
||||
become: yes
|
||||
|
|
|
@ -48,7 +48,7 @@ class SuperBenchRunner():
|
|||
"""
|
||||
SuperBenchLogger.add_handler(logger.logger, filename=str(self._output_path / filename))
|
||||
|
||||
def __validate_sb_config(self):
|
||||
def __validate_sb_config(self): # noqa: C901
|
||||
"""Validate SuperBench config object.
|
||||
|
||||
Raise:
|
||||
|
@ -69,6 +69,18 @@ class SuperBenchRunner():
|
|||
elif mode.name == 'torch.distributed':
|
||||
if not mode.proc_num:
|
||||
self._sb_benchmarks[name].modes[idx].proc_num = 8
|
||||
elif mode.name == 'mpi':
|
||||
if not mode.mca:
|
||||
self._sb_benchmarks[name].modes[idx].mca = {
|
||||
'pml': 'ob1',
|
||||
'btl': '^openib',
|
||||
'btl_tcp_if_exclude': 'lo,docker0',
|
||||
'coll_hcoll_enable': 0,
|
||||
}
|
||||
if not mode.env:
|
||||
self._sb_benchmarks[name].modes[idx].env = {}
|
||||
for key in ['PATH', 'LD_LIBRARY_PATH', 'SB_MICRO_PATH']:
|
||||
self._sb_benchmarks[name].modes[idx].env.setdefault(key, None)
|
||||
|
||||
def __get_enabled_benchmarks(self):
|
||||
"""Get enabled benchmarks list.
|
||||
|
@ -122,6 +134,23 @@ class SuperBenchRunner():
|
|||
'superbench.benchmarks.{name}.parameters.distributed_backend=nccl'
|
||||
).format(name=benchmark_name),
|
||||
)
|
||||
elif mode.name == 'mpi':
|
||||
mode_command = (
|
||||
'mpirun ' # use default OpenMPI in image
|
||||
'-tag-output ' # tag mpi output with [jobid,rank]<stdout/stderr> prefix
|
||||
'-allow-run-as-root ' # allow mpirun to run when executed by root user
|
||||
'-hostfile hostfile ' # use prepared hostfile
|
||||
'-map-by ppr:{proc_num}:node ' # launch {proc_num} processes on each node
|
||||
'-bind-to numa ' # bind processes to numa
|
||||
'{mca_list} {env_list} {command}'
|
||||
).format(
|
||||
proc_num=mode.proc_num,
|
||||
mca_list=' '.join(f'-mca {k} {v}' for k, v in mode.mca.items()),
|
||||
env_list=' '.join(f'-x {k}={v}' if v else f'-x {k}' for k, v in mode.env.items()),
|
||||
command=exec_command,
|
||||
)
|
||||
else:
|
||||
logger.warning('Unknown mode %s.', mode.name)
|
||||
return mode_command.strip()
|
||||
|
||||
def deploy(self): # pragma: no cover
|
||||
|
@ -186,15 +215,15 @@ class SuperBenchRunner():
|
|||
"""
|
||||
mode.update(vars)
|
||||
logger.info('Runner is going to run %s in %s mode, proc rank %d.', benchmark_name, mode.name, mode.proc_rank)
|
||||
rc = self._ansible_client.run(
|
||||
self._ansible_client.get_shell_config(
|
||||
(
|
||||
'docker exec sb-workspace bash -c '
|
||||
"'set -o allexport && source sb.env && set +o allexport && {command}'"
|
||||
).format(command=self.__get_mode_command(benchmark_name, mode), )
|
||||
),
|
||||
sudo=True
|
||||
ansible_runner_config = self._ansible_client.get_shell_config(
|
||||
(
|
||||
'docker exec sb-workspace bash -c '
|
||||
"'set -o allexport && source sb.env && set +o allexport && {command}'"
|
||||
).format(command=self.__get_mode_command(benchmark_name, mode))
|
||||
)
|
||||
if mode.name == 'mpi':
|
||||
ansible_runner_config = self._ansible_client.update_mpi_config(ansible_runner_config)
|
||||
rc = self._ansible_client.run(ansible_runner_config, sudo=True)
|
||||
return rc
|
||||
|
||||
def run(self):
|
||||
|
@ -211,6 +240,8 @@ class SuperBenchRunner():
|
|||
'proc_rank': proc_rank
|
||||
}) for proc_rank in range(mode.proc_num)
|
||||
)
|
||||
elif mode.name == 'torch.distributed':
|
||||
elif mode.name == 'torch.distributed' or mode.name == 'mpi':
|
||||
self._run_proc(benchmark_name, mode, {'proc_rank': 0})
|
||||
else:
|
||||
logger.warning('Unknown mode %s.', mode.name)
|
||||
self.fetch_results()
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
{% for host in hostvars.values() | map(attribute='ansible_hostname') | sort %}
|
||||
Host node{{ loop.index0 }}
|
||||
HostName {{ host }}
|
||||
Host {{ host }}
|
||||
Port {{ ssh_port }}
|
||||
IdentityFile /root/.ssh/key
|
||||
StrictHostKeyChecking no
|
||||
|
|
|
@ -58,6 +58,15 @@ class AnsibleClientTestCase(unittest.TestCase):
|
|||
}
|
||||
)
|
||||
|
||||
def test_update_mpi_config(self):
    """Test update_mpi_config of client.

    The returned config must equal the client's config with the host
    pattern set to 'all[0]'.
    """
    client = self.ansible_client
    # Call the method first, then build the expectation, in that order.
    updated_config = client.update_mpi_config(client._config)
    expected_config = dict(client._config)
    expected_config['host_pattern'] = 'all[0]'
    self.assertDictEqual(updated_config, expected_config)
|
||||
|
||||
def test_get_shell_config(self):
|
||||
"""Test get_shell_config of client."""
|
||||
cmd = 'ls -la'
|
||||
|
|
|
@ -122,6 +122,46 @@ class RunnerTestCase(unittest.TestCase):
|
|||
'superbench.benchmarks.foo.parameters.distributed_backend=nccl'
|
||||
),
|
||||
},
|
||||
{
|
||||
'benchmark_name':
|
||||
'foo',
|
||||
'mode': {
|
||||
'name': 'mpi',
|
||||
'proc_num': 8,
|
||||
'proc_rank': 1,
|
||||
'mca': {},
|
||||
'env': {
|
||||
'PATH': None,
|
||||
'LD_LIBRARY_PATH': None,
|
||||
},
|
||||
},
|
||||
'expected_command': (
|
||||
'mpirun -tag-output -allow-run-as-root -hostfile hostfile -map-by ppr:8:node -bind-to numa '
|
||||
' -x PATH -x LD_LIBRARY_PATH '
|
||||
f'sb exec --output-dir {self.sb_output_dir} -c sb.config.yaml -C superbench.enable=foo'
|
||||
),
|
||||
},
|
||||
{
|
||||
'benchmark_name':
|
||||
'foo',
|
||||
'mode': {
|
||||
'name': 'mpi',
|
||||
'proc_num': 8,
|
||||
'proc_rank': 2,
|
||||
'mca': {
|
||||
'coll_hcoll_enable': 0,
|
||||
},
|
||||
'env': {
|
||||
'SB_MICRO_PATH': '/sb',
|
||||
'FOO': 'BAR',
|
||||
},
|
||||
},
|
||||
'expected_command': (
|
||||
'mpirun -tag-output -allow-run-as-root -hostfile hostfile -map-by ppr:8:node -bind-to numa '
|
||||
'-mca coll_hcoll_enable 0 -x SB_MICRO_PATH=/sb -x FOO=BAR '
|
||||
f'sb exec --output-dir {self.sb_output_dir} -c sb.config.yaml -C superbench.enable=foo'
|
||||
),
|
||||
},
|
||||
]
|
||||
for test_case in test_cases:
|
||||
with self.subTest(msg='Testing with case', test_case=test_case):
|
||||
|
|
Загрузка…
Ссылка в новой задаче