Fix Task Runner regressions

- Input/output data phases not correctly triggered for multi-instance and MPI jobs
- Output data was not triggered at all
- Pre-exec triggering on native
- Resolves #301
Parent: 07e86a3928
Commit: 8b7b17f465
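
The fix centers on a three-phase task runner contract: a system prologue (registry login plus the input_data phase), the user command, and a system epilogue (the output_data phase), each handed to the runner through SHIPYARD_* environment variables. A condensed sketch of that phase order, simplified from the runner scripts touched by this commit (illustrative only, not the shipped script):

    #!/usr/bin/env bash
    # Condensed sketch of the Shipyard task runner phase order.
    [ -n "$SHIPYARD_SYSTEM_PROLOGUE_CMD" ] && eval "$SHIPYARD_SYSTEM_PROLOGUE_CMD"  # login + input_data
    [ -n "$SHIPYARD_USER_PROLOGUE_CMD" ] && eval "$SHIPYARD_USER_PROLOGUE_CMD"      # multi-instance pre-exec
    eval "$SHIPYARD_USER_CMD"               # the task (or generated MPI) command
    SHIPYARD_TASK_EC=$?
    if [ -n "$SHIPYARD_SYSTEM_EPILOGUE_CMD" ]; then
        # the epilogue is told whether the task succeeded
        if [ "$SHIPYARD_TASK_EC" -eq 0 ]; then
            export SHIPYARD_TASK_RESULT=success
        else
            export SHIPYARD_TASK_RESULT=fail
        fi
        eval "$SHIPYARD_SYSTEM_EPILOGUE_CMD"                                        # output_data
    fi
    exit $SHIPYARD_TASK_EC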
convoy/batch.py (207 changed lines)

@@ -4553,10 +4553,17 @@ def _construct_task(
         env_vars = util.merge_dict(env_vars, gpu_env)
     del gpu_env
     taskenv = []
+    commands = {
+        'mpi': None,
+        'docker_exec': False,
+        'preexec': None,
+        'task': None,
+        'login': None,
+        'input': None,
+        'output': None,
+    }
     # check if this is a multi-instance task
     mis = None
-    mpi_command = None
-    mpi_docker_exec_command = None
     if settings.is_multi_instance_task(_task):
         if util.is_not_empty(task.multi_instance.coordination_command):
             if native:

@@ -4598,18 +4605,27 @@ def _construct_task(
                             file_mode=rf.file_mode,
                         )
                     )
+        # set pre-exec command
+        if util.is_not_empty(task.multi_instance.pre_execution_command):
+            commands['preexec'] = [
+                task.multi_instance.pre_execution_command]
         # set application command
         if native:
             if task.multi_instance.mpi is None:
-                task_commands = [task.command]
+                commands['task'] = [task.command]
             else:
-                mpi_command, ib_env = _construct_mpi_command(pool, task)
+                commands['mpi'], ib_env = _construct_mpi_command(pool, task)
                 if util.is_not_empty(ib_env):
                     env_vars = util.merge_dict(env_vars, ib_env)
                 del ib_env
-                task_commands = [mpi_command]
+                commands['task'] = [commands['mpi']]
+            # insert preexec prior to task command for native
+            if util.is_not_empty(commands['preexec']):
+                commands['task'].insert(0, commands['preexec'][0])
         else:
-            task_commands = []
+            commands['task'] = []
+            # for non-native do not set the RUNTIME so the user command is
+            # executed as-is
             taskenv.append(
                 batchmodels.EnvironmentSetting(
                     name='SHIPYARD_ENV_EXCLUDE',

@@ -4622,62 +4638,47 @@ def _construct_task(
                     value=task.envfile,
                 )
             )
+            taskenv.append(
+                batchmodels.EnvironmentSetting(
+                    name='SHIPYARD_RUNTIME_CMD_OPTS',
+                    value=(
+                        ' '.join(task.run_options) if is_singularity
+                        else ' '.join(task.docker_exec_options)
+                    ),
+                )
+            )
+            taskenv.append(
+                batchmodels.EnvironmentSetting(
+                    name='SHIPYARD_RUNTIME_CMD',
+                    value=(
+                        task.singularity_cmd if is_singularity else
+                        'exec'
+                    ),
+                )
+            )
+            taskenv.append(
+                batchmodels.EnvironmentSetting(
+                    name='SHIPYARD_CONTAINER_IMAGE_NAME',
+                    value=(
+                        task.singularity_image if is_singularity else
+                        task.name  # docker exec requires task name
+                    ),
+                )
+            )
+            if not is_singularity:
+                commands['docker_exec'] = True
             if task.multi_instance.mpi is not None:
-                mpi_command, ib_env = _construct_mpi_command(pool, task)
+                commands['mpi'], ib_env = _construct_mpi_command(pool, task)
                 if util.is_not_empty(ib_env):
                     env_vars = util.merge_dict(env_vars, ib_env)
                 del ib_env
-                if not is_singularity:
-                    mpi_docker_exec_command = (
-                        'docker exec {} {} $AZ_BATCH_NODE_STARTUP_DIR/wd/'
-                        'shipyard_task_runner.sh'.format(
-                            ' '.join(task.docker_exec_options),
-                            task.name
-                        )
-                    )
-            else:
-                # if it's a multi-instance task, but not an mpi task,
-                # populate environment settings
-                taskenv.append(
-                    batchmodels.EnvironmentSetting(
-                        name='SHIPYARD_RUNTIME_CMD_OPTS',
-                        value=(
-                            ' '.join(task.run_options) if is_singularity
-                            else ' '.join(task.docker_exec_options)
-                        ),
-                    )
-                )
-                taskenv.append(
-                    batchmodels.EnvironmentSetting(
-                        name='SHIPYARD_RUNTIME',
-                        value='singularity' if is_singularity else 'docker',
-                    )
-                )
-                taskenv.append(
-                    batchmodels.EnvironmentSetting(
-                        name='SHIPYARD_RUNTIME_CMD',
-                        value=(
-                            task.singularity_cmd if is_singularity else
-                            'exec'
-                        ),
-                    )
-                )
-                taskenv.append(
-                    batchmodels.EnvironmentSetting(
-                        name='SHIPYARD_CONTAINER_IMAGE_NAME',
-                        value=(
-                            task.singularity_image if is_singularity else
-                            task.name  # docker exec requires task name
-                        ),
-                    )
-                )
     else:
         if native:
-            task_commands = [
+            commands['task'] = [
                 '{}'.format(' ' + task.command) if task.command else ''
             ]
         else:
-            task_commands = []
+            commands['task'] = []
             taskenv.append(
                 batchmodels.EnvironmentSetting(
                     name='SHIPYARD_ENV_EXCLUDE',

@@ -4722,70 +4723,75 @@ def _construct_task(
     if (not native and allow_run_on_missing and
             (len(docker_missing_images) > 0 or
              len(singularity_missing_images) > 0)):
-        loginenv, logincmd = generate_docker_login_settings(config)
-        logincmd.extend(task_commands)
+        loginenv, commands['login'] = generate_docker_login_settings(config)
         taskenv.extend(loginenv)
-        task_commands = logincmd
     # digest any input_data
-    addlcmds = data.process_input_data(config, bxfile, _task, on_task=True)
-    if addlcmds is not None:
+    commands['input'] = data.process_input_data(
+        config, bxfile, _task, on_task=True)
+    if native and commands['input'] is not None:
+        raise RuntimeError(
+            'input_data at task-level is not supported on '
+            'native container pools')
+    # digest any output data
+    commands['output'] = data.process_output_data(config, bxfile, _task)
+    if commands['output'] is not None:
         if native:
-            raise RuntimeError(
-                'input_data at task-level is not supported on '
-                'native container pools')
-        task_commands.insert(0, addlcmds)
+            output_files = commands['output']
+            commands['output'] = None
+        else:
+            commands['output'] = [commands['output']]
+    # populate task runner vars for non-native mode
     if not native:
-        if util.is_not_empty(task_commands):
+        # set the correct runner script
+        if commands['docker_exec']:
+            commands['task'] = [
+                '$AZ_BATCH_NODE_STARTUP_DIR/wd/'
+                'shipyard_docker_exec_task_runner.sh'
+            ]
+        else:
+            commands['task'] = [
+                '$AZ_BATCH_NODE_STARTUP_DIR/wd/shipyard_task_runner.sh'
+            ]
+        # set system prologue command
+        sys_prologue_cmd = []
+        if util.is_not_empty(commands['login']):
+            sys_prologue_cmd.extend(commands['login'])
+        if util.is_not_empty(commands['input']):
+            sys_prologue_cmd.append(commands['input'])
+        if util.is_not_empty(sys_prologue_cmd):
             taskenv.append(
                 batchmodels.EnvironmentSetting(
                     name='SHIPYARD_SYSTEM_PROLOGUE_CMD',
                     value=util.wrap_commands_in_shell(
-                        task_commands, windows=is_windows),
+                        sys_prologue_cmd, windows=is_windows),
                 )
             )
-            task_commands = []
-    # execute multi instance pre-exec cmd
-    if (util.is_not_empty(task.multi_instance.pre_execution_command)):
-        task_commands.insert(0, task.multi_instance.pre_execution_command)
-    if not native:
-        if util.is_not_empty(task_commands):
+        del sys_prologue_cmd
+        # set user prologue command
+        if util.is_not_empty(commands['preexec']):
             taskenv.append(
                 batchmodels.EnvironmentSetting(
                     name='SHIPYARD_USER_PROLOGUE_CMD',
-                    value=util.wrap_commands(
-                        task_commands, windows=is_windows),
+                    value=util.wrap_commands_in_shell(
+                        commands['preexec'], windows=is_windows),
                 )
             )
-            task_commands = []
-    # digest any output data
-    addlcmds = data.process_output_data(config, bxfile, _task)
-    if addlcmds is not None:
-        if native:
-            output_files = addlcmds
-        else:
-            task_commands.append(addlcmds)
-    del addlcmds
-    if not native:
-        if util.is_not_empty(task_commands):
+        # set user command (task)
+        taskenv.append(
+            batchmodels.EnvironmentSetting(
+                name='SHIPYARD_USER_CMD',
+                value=commands['mpi'] or task.command,
+            )
+        )
+        # set epilogue command
+        if util.is_not_empty(commands['output']):
             taskenv.append(
                 batchmodels.EnvironmentSetting(
                     name='SHIPYARD_SYSTEM_EPILOGUE_CMD',
                     value=util.wrap_commands_in_shell(
-                        task_commands, windows=is_windows),
+                        commands['output'], windows=is_windows),
                 )
             )
-        taskenv.append(
-            batchmodels.EnvironmentSetting(
-                name='SHIPYARD_USER_CMD',
-                value=mpi_command or task.command,
-            )
-        )
-        if mpi_docker_exec_command is not None:
-            task_commands = [mpi_docker_exec_command]
-        else:
-            task_commands = [
-                '$AZ_BATCH_NODE_STARTUP_DIR/wd/shipyard_task_runner.sh'
-            ]
     # always add env vars in (host) task to be dumped into container
     # task (if non-native)
     if util.is_not_empty(env_vars):

@@ -4809,15 +4815,16 @@ def _construct_task(
         )
     )
     # create task
-    if util.is_not_empty(task_commands):
+    if util.is_not_empty(commands['task']):
         if native:
             if is_windows:
-                tc = ' && '.join(task_commands)
+                tc = ' && '.join(commands['task'])
             else:
-                tc = '; '.join(task_commands)
+                tc = '; '.join(commands['task'])
             tc = tc.strip()
         else:
-            tc = util.wrap_commands_in_shell(task_commands, windows=is_windows)
+            tc = util.wrap_commands_in_shell(
+                commands['task'], windows=is_windows)
     else:
         tc = ''
     batchtask = batchmodels.TaskAddParameter(
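
On non-native pools the Batch task's command line is now always one of the two runner scripts, and everything else rides in environment variables assembled from the commands dict above. Conceptually, a generated non-native task reduces to the following (a hedged sketch; the exported values are placeholders for what _construct_task wires up, not emitted output):

    # placeholder values standing in for generated login/input/output command lines
    export SHIPYARD_SYSTEM_PROLOGUE_CMD='echo "registry login; input_data ingress"'
    export SHIPYARD_USER_PROLOGUE_CMD='echo "multi-instance pre_execution_command"'
    export SHIPYARD_USER_CMD='echo "task.command or generated mpirun invocation"'
    export SHIPYARD_SYSTEM_EPILOGUE_CMD='echo "output_data egress"'
    # commands['task'] becomes the actual Batch command line:
    "$AZ_BATCH_NODE_STARTUP_DIR/wd/shipyard_task_runner.sh"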

@@ -214,6 +214,10 @@ _TASK_RUNNER_FILE = (
     'shipyard_task_runner.sh',
     pathlib.Path(_ROOT_PATH, 'scripts/shipyard_task_runner.sh')
 )
+_DOCKER_EXEC_TASK_RUNNER_FILE = (
+    'shipyard_docker_exec_task_runner.sh',
+    pathlib.Path(_ROOT_PATH, 'scripts/shipyard_docker_exec_task_runner.sh')
+)
 _GLUSTERPREP_FILE = (
     'shipyard_glusterfs_on_compute.sh',
     pathlib.Path(_ROOT_PATH, 'scripts/shipyard_glusterfs_on_compute.sh')

@@ -1381,6 +1385,7 @@ def _construct_pool_object(
     _rflist.append(_NODEPREP_FILE)
     if not native:
         _rflist.append(_TASK_RUNNER_FILE)
+        _rflist.append(_DOCKER_EXEC_TASK_RUNNER_FILE)
     # create start task commandline
     start_task.append(
         ('{npf}{a}{b}{c}{d}{e}{f}{g}{i}{j}{k}{lis}{m}{n}{o}{p}{q}{r}{s}'

@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+
+set -e
+set -o pipefail
+
+# environment variables used
+# SHIPYARD_SYSTEM_PROLOGUE_CMD: pre-exec system cmd
+# SHIPYARD_USER_PROLOGUE_CMD: pre-exec user cmd
+# SHIPYARD_SYSTEM_EPILOGUE_CMD: post-exec system cmd
+# SHIPYARD_ENV_EXCLUDE: environment vars to exclude
+# SHIPYARD_ENV_FILE: env file
+# SHIPYARD_RUNTIME: docker or singularity
+# SHIPYARD_RUNTIME_CMD: run or exec
+# SHIPYARD_RUNTIME_CMD_OPTS: options
+# SHIPYARD_CONTAINER_IMAGE_NAME: container name
+# SHIPYARD_USER_CMD: user command
+
+## Load environment modules, if available
+if [ -f /etc/profile.d/modules.sh ]; then
+    # shellcheck disable=SC1091
+    source /etc/profile.d/modules.sh
+fi
+
+## PRE-EXEC
+if [ -n "$SHIPYARD_SYSTEM_PROLOGUE_CMD" ]; then
+    eval "$SHIPYARD_SYSTEM_PROLOGUE_CMD"
+fi
+
+## TASK EXEC
+if [ -n "$SHIPYARD_ENV_EXCLUDE" ]; then
+    env | grep -vE "$SHIPYARD_ENV_EXCLUDE" > "$SHIPYARD_ENV_FILE"
+else
+    env > "$SHIPYARD_ENV_FILE"
+fi
+
+SHIPYARD_RUNTIME_CMD_OPTS=$(eval echo "${SHIPYARD_RUNTIME_CMD_OPTS}")
+
+set +e
+
+# shellcheck disable=SC2086
+docker exec -e SHIPYARD_SYSTEM_PROLOGUE_CMD= -e SHIPYARD_SYSTEM_EPILOGUE_CMD= \
+    $SHIPYARD_RUNTIME_CMD_OPTS $SHIPYARD_CONTAINER_IMAGE_NAME \
+    $AZ_BATCH_NODE_STARTUP_DIR/wd/shipyard_task_runner.sh
+SHIPYARD_TASK_EC=$?
+
+## POST EXEC
+if [ -n "$SHIPYARD_SYSTEM_EPILOGUE_CMD" ]; then
+    if [ "$SHIPYARD_TASK_EC" -eq 0 ]; then
+        export SHIPYARD_TASK_RESULT=success
+    else
+        export SHIPYARD_TASK_RESULT=fail
+    fi
+    eval "$SHIPYARD_SYSTEM_EPILOGUE_CMD"
+fi
+
+exit $SHIPYARD_TASK_EC
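
This new 56-line script is evidently the scripts/shipyard_docker_exec_task_runner.sh registered in the resource-file list above. It runs on the host: it executes the system prologue there, then docker execs into the already-running task container with the prologue/epilogue variables blanked so the inner shipyard_task_runner.sh performs only the user phases, and finally runs the system epilogue on the host with SHIPYARD_TASK_RESULT set. A hedged manual illustration of that host/container split (names and command values are made up):

    export SHIPYARD_CONTAINER_IMAGE_NAME=mytask01           # for docker exec this is the *task name*
    export SHIPYARD_ENV_FILE=/tmp/taskenv.list
    export SHIPYARD_SYSTEM_PROLOGUE_CMD='echo "fetch input_data on the host"'
    export SHIPYARD_SYSTEM_EPILOGUE_CMD='echo "egress output_data from the host"'
    ./shipyard_docker_exec_task_runner.sh                   # prologue -> docker exec -> epilogue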

@@ -329,6 +329,17 @@ get_vm_size_from_imds() {
     log INFO "VmSize=$vm_size RDMA=$vm_rdma_type"
 }
 
+set_default_pool_user_privileges() {
+    if [ -n "$docker_group" ]; then
+        # Modify the default pool user to have sudo privileges, which in turn
+        # will give access to the Docker socket. This is required to allow
+        # input/output phases via Docker containers as the default pool user.
+        usermod -aG _azbatchsudogrp _azbatch
+    else
+        usermod -aG docker _azbatch
+    fi
+}
+
 download_file_as() {
     log INFO "Downloading: $1 as $2"
     local retries=10

@@ -1780,6 +1791,11 @@ if [ $custom_image -eq 0 ] && [ $native_mode -eq 0 ]; then
     install_docker_host_engine
 fi
 
+# set default pool user privileges
+if [ $native_mode -eq 0 ]; then
+    set_default_pool_user_privileges
+fi
+
 # login to registry servers (do not specify -e as creds have been decrypted)
 ./registry_login.sh
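
Because the input/output phases now run docker commands as the Batch default pool user (_azbatch), node preparation grants that account access to the Docker socket, apparently via the sudo group on images that gate Docker behind it and via the docker group otherwise. A quick sanity check on a prepared node might look like this (hypothetical session, not part of the commit):

    # run as _azbatch on a prepared compute node
    id -nG        # expect 'docker' or '_azbatchsudogrp' in the list
    docker ps     # succeeds only when the socket is reachable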

@@ -51,13 +51,13 @@ else
 fi
 
 ## POST EXEC
-if [ -n "$SHIPYARD_USER_EPILOGUE_CMD" ]; then
+if [ -n "$SHIPYARD_SYSTEM_EPILOGUE_CMD" ]; then
     if [ "$SHIPYARD_TASK_EC" -eq 0 ]; then
         export SHIPYARD_TASK_RESULT=success
     else
         export SHIPYARD_TASK_RESULT=fail
     fi
-    eval "$SHIPYARD_USER_EPILOGUE_CMD"
+    eval "$SHIPYARD_SYSTEM_EPILOGUE_CMD"
 fi
 
 exit $SHIPYARD_TASK_EC
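
This last hunk, presumably in scripts/shipyard_task_runner.sh, is the likely root of "output data was not triggered at all": the POST EXEC guard tested SHIPYARD_USER_EPILOGUE_CMD, a variable the Python side never sets, so the epilogue carrying the output_data phase was silently skipped; it now keys off SHIPYARD_SYSTEM_EPILOGUE_CMD, which _construct_task populates. Since SHIPYARD_TASK_RESULT is exported before the epilogue is evaluated, an epilogue can gate egress on the task outcome; a hypothetical epilogue command, not part of the commit:

    # hypothetical epilogue; SHIPYARD_TASK_RESULT is exported by the runner
    if [ "$SHIPYARD_TASK_RESULT" = "success" ]; then
        echo "uploading output_data"    # the real egress command would run here
    else
        echo "task failed; skipping output egress" >&2
    fi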