Enhanced support for autogen task ids

- Add support to override the global default at the per-job level and at the
per-task-factory level
- Resolves #324
This commit is contained in:
Fred Park 2019-11-14 02:59:02 +00:00
Родитель b0a3b9ef1a
Коммит bc4a47d88d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 3C4D545F457737EB
7 изменённых файлов: 105 добавлений и 30 удалений

Просмотреть файл

@ -67,6 +67,9 @@ job_specifications:
default_working_dir: batch
restrict_default_bind_mounts: false
force_enable_task_dependencies: false
autogenerated_task_id:
prefix: task-
zfill_width: 5
federation_constraints:
pool:
autoscale:
@ -180,6 +183,9 @@ job_specifications:
module: mypkg.mymodule
package: null
repeat: 3
autogenerated_task_id:
prefix: task-
zfill_width: 5
singularity_execution:
cmd: exec
elevated: false

Просмотреть файл

@ -4172,16 +4172,17 @@ def _format_generic_task_id(prefix, padding, tasknum):
def _generate_next_generic_task_id(
batch_client, config, job_id, tasklist=None, reserved=None,
batch_client, config, job_id, task, tasklist=None, reserved=None,
task_map=None, last_task_id=None, is_merge_task=False,
federation_id=None):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict, str,
# list, str, dict, str, bool, str) -> Tuple[list, str]
# dict, list, str, dict, str, bool, str) -> Tuple[list, str]
"""Generate the next generic task id
:param batch_client: The batch client to use.
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
:param str job_id: job id
:param dict task: task config spec
:param list tasklist: list of current (committed) tasks in job
:param str reserved: reserved task id
:param dict task_map: map of pending tasks to add to the job
@ -4192,13 +4193,17 @@ def _generate_next_generic_task_id(
:return: (list of committed task ids for job, next generic docker task id)
"""
# get prefix and padding settings
prefix = settings.autogenerated_task_id_prefix(config)
padding = settings.autogenerated_task_id_zfill(config)
prefix = task['##task_id_prefix']
padding = task['##task_id_padding']
delimiter = prefix if util.is_not_empty(prefix) else ' '
if is_merge_task:
prefix = 'merge-{}'.format(prefix)
# get filtered, sorted list of generic docker task ids
# reset tasklist if cached tasklist doesn't start with prefix
if (util.is_not_empty(tasklist) and
not tasklist[0].id.startswith(prefix)):
tasklist = None
try:
# get filtered, sorted list of generic task ids
if tasklist is None and util.is_none_or_empty(federation_id):
tasklist = batch_client.task.list(
job_id,
@ -4207,6 +4212,7 @@ def _generate_next_generic_task_id(
if util.is_not_empty(prefix) else None,
select='id'))
tasklist = list(tasklist)
# get last task and increment
tasknum = sorted(
[int(x.id.split(delimiter)[-1]) for x in tasklist])[-1] + 1
except (batchmodels.BatchErrorException, IndexError, TypeError):
@ -4524,7 +4530,7 @@ def _construct_task(
_task_id = settings.task_id(_task)
if util.is_none_or_empty(_task_id):
existing_tasklist, _task_id = _generate_next_generic_task_id(
batch_client, config, job_id, tasklist=existing_tasklist,
batch_client, config, job_id, _task, tasklist=existing_tasklist,
reserved=reserved_task_id, task_map=task_map,
last_task_id=lasttaskid, is_merge_task=is_merge_task,
federation_id=federation_id)
@ -5198,7 +5204,7 @@ def add_jobs(
existing_tasklist, reserved_task_id = \
_generate_next_generic_task_id(
batch_client, config, job_id,
tasklist=existing_tasklist,
task, tasklist=existing_tasklist,
federation_id=federation_id)
settings.set_task_id(task, reserved_task_id)
_id = '{}-{}'.format(job_id, reserved_task_id)
@ -5846,6 +5852,8 @@ def generate_info_metadata_for_federation_message(
multi_instance, uses_task_dependencies, has_gpu_task, has_ib_task,
max_instance_count_in_job, instances_required_in_job, has_merge_task,
merge_task_id, task_map):
prefix, padding = settings.autogenerated_task_id_settings(
config, level='global')
info = {
'version': '1',
'action': {
@ -5910,8 +5918,8 @@ def generate_info_metadata_for_federation_message(
},
},
'task_naming': {
'prefix': settings.autogenerated_task_id_prefix(config),
'padding': settings.autogenerated_task_id_zfill(config),
'prefix': prefix,
'padding': padding,
},
},
}

Просмотреть файл

@ -3089,16 +3089,24 @@ def job_specifications(config):
'jobs configuration file?')
def autogenerated_task_id_prefix(config):
# type: (dict) -> str
def autogenerated_task_id_settings(config, level=None):
# type: (dict, str) -> Tuple[str, str]
"""Get the autogenerated task id prefix to use
:param dict config: configuration object
:rtype: str
:return: auto-gen task id prefix
:param str level: 'global', 'job', or 'task_factory'
:rtype: tuple
:return: (auto-gen task id prefix, auto-gen task id zfill)
"""
conf = _kv_read_checked(
config['batch_shipyard'], 'autogenerated_task_id', {}
)
if level == 'global':
conf = config['batch_shipyard']
elif level == 'job':
conf = config
elif level == 'task_factory':
conf = config['task_factory']
else:
raise RuntimeError(
'invalid level={} for autogenerated task id setting'.format(level))
conf = _kv_read_checked(conf, 'autogenerated_task_id', {})
# do not use _kv_read_checked for prefix we want to allow empty string
try:
prefix = conf['prefix']
@ -3106,20 +3114,8 @@ def autogenerated_task_id_prefix(config):
raise KeyError()
except KeyError:
prefix = 'task-'
return prefix
def autogenerated_task_id_zfill(config):
# type: (dict) -> int
"""Get the autogenerated task zfill setting to use
:param dict config: configuration object
:rtype: int
:return: auto-gen task number zfill
"""
conf = _kv_read_checked(
config['batch_shipyard'], 'autogenerated_task_id', {}
)
return _kv_read(conf, 'zfill_width', 5)
padding = _kv_read(conf, 'zfill_width', 5)
return (prefix, padding)
def job_tasks(config, conf):
@ -3130,6 +3126,11 @@ def job_tasks(config, conf):
:rtype: list
:return: list of tasks
"""
if 'autogenerated_task_id' in conf:
prefix, padding = autogenerated_task_id_settings(conf, level='job')
else:
prefix, padding = autogenerated_task_id_settings(
config, level='global')
for _task in conf['tasks']:
if 'task_factory' in _task:
# get storage settings if applicable
@ -3148,10 +3149,21 @@ def job_tasks(config, conf):
)
else:
tfstorage = None
# get autogenerated task id settings
if 'autogenerated_task_id' in _task['task_factory']:
tfprefix, tfpadding = autogenerated_task_id_settings(
_task, level='task_factory')
else:
tfprefix = prefix
tfpadding = padding
for task in task_factory.generate_task(_task, tfstorage):
task['##tfgen'] = True
task['##task_id_prefix'] = tfprefix
task['##task_id_padding'] = tfpadding
yield task
else:
_task['##task_id_prefix'] = prefix
_task['##task_id_padding'] = padding
yield _task

Просмотреть файл

@ -458,5 +458,7 @@ def generate_task(task, storage_settings):
yield taskcopy
else:
raise ValueError('unknown parametric sweep type: {}'.format(sweep))
elif 'autogenerated_task_id' in task_factory:
pass
else:
raise ValueError('unknown task factory type: {}'.format(task_factory))

Просмотреть файл

@ -93,6 +93,9 @@ job_specifications:
default_working_dir: batch
restrict_default_bind_mounts: false
force_enable_task_dependencies: false
autogenerated_task_id:
prefix: task-
zfill_width: 5
federation_constraints:
pool:
autoscale:
@ -204,6 +207,9 @@ job_specifications:
module: mypkg.mymodule
package: null
repeat: 3
autogenerated_task_id:
prefix: task-
zfill_width: 5
singularity_execution:
cmd: exec
elevated: false
@ -572,6 +578,18 @@ task dependencies explicitly even if no `tasks` have dependencies specified.
This is useful for scenarios where the same job is used for tasks at a
later time that do have dependencies and/or are dependent on tasks
previously added to the same job. The default is `false`.
* (optional) `autogenerated_task_id` controls how autogenerated task ids
are named. Note that the total length of an autogenerated task id must not
exceed 64 characters. The property specified at this level will override
the "global" setting in the Global configuration file.
* (optional) `prefix` is the task prefix to use with the task id. This can
be any combination of alphanumeric characters including hyphens and
underscores. Empty string is permitted for the `prefix`. The default
is `task-`.
* (optional) `zfill_width` is the minimum width to which the integral task
number is left-padded with zeros. This can be set to zero, which may be
useful for task dependency range scenarios in combination with an empty
string `prefix` above. The default is `5`.
* (optional) `federation_constraints` defines properties to apply to the job
and all tasks (i.e., the task group) when submitting the job to a federation.
Please see the [federation guide](68-batch-shipyard-federation.md) for more
@ -835,6 +853,19 @@ task executions.
`generate` generator function. This should be a dictionary where
all keys are strings.
* (optional) `repeat` will create N number of identical tasks.
* (optional) `autogenerated_task_id` controls how autogenerated task ids
are named for tasks of this task factory only. Note that the total length
of an autogenerated task id must not exceed 64 characters. The property
specified at this level will override both the "global" setting in the
Global configuration file and the setting at the job level.
* (optional) `prefix` is the task prefix to use with the task id.
This can be any combination of alphanumeric characters including
hyphens and underscores. Empty string is permitted for the `prefix`.
The default is `task-`.
* (optional) `zfill_width` is the minimum width to which the integral
task number is left-padded with zeros. This can be set to zero, which
may be useful for task dependency range scenarios in combination with
an empty string `prefix` above. The default is `5`.
* (optional) `depends_on` is an array of task ids for which this container
invocation (task) depends on and must run to successful completion prior
to this task executing. Note that when a `task_factory` is specified, all

Просмотреть файл

@ -313,6 +313,8 @@ formula will scale up/down both low priority and dedicated nodes.
federation. Adding a pool to multiple federations simultaneously will result
in undefined behavior.
* Singularity containers are not fully supported in federations.
* Auto-generated task id configuration at the job and task factory levels
is not supported.
### Quotas
Ensure that you have sufficient active job/job schedule quota for each

Просмотреть файл

@ -173,6 +173,13 @@ mapping:
type: bool
force_enable_task_dependencies:
type: bool
autogenerated_task_id:
type: map
mapping:
prefix:
type: str
zfill_width:
type: int
federation_constraints:
type: map
mapping:
@ -439,6 +446,13 @@ mapping:
type: str
repeat:
type: int
autogenerated_task_id:
type: map
mapping:
prefix:
type: str
zfill_width:
type: int
id:
type: str
docker_image: