- Resolves #86
Fred Park 2017-05-17 18:13:24 -07:00
Parent 644e86ddb6
Commit d2b066bf6d
2 changed files with 125 additions and 41 deletions

View file

@@ -1905,25 +1905,43 @@ def generate_docker_login_settings(config, for_ssh=False):
    return env, cmd
def _generate_next_generic_task_id(batch_client, job_id, reserved=None):
def _format_generic_task_id(tasknum):
    # type: (int) -> str
    """Format a generic task id from a task number
    :param int tasknum: task number
    :rtype: str
    :return: generic task id
    """
    if tasknum > 99999:
        return '{}{}'.format(_GENERIC_DOCKER_TASK_PREFIX, tasknum)
    else:
        return '{0}{1:05d}'.format(_GENERIC_DOCKER_TASK_PREFIX, tasknum)
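
A quick illustration of the padding behavior; 'dockertask-' is an assumed
value for _GENERIC_DOCKER_TASK_PREFIX, which is not shown in this diff:

    # sketch only: the prefix value is an assumption for illustration
    >>> prefix = 'dockertask-'
    >>> '{0}{1:05d}'.format(prefix, 3)
    'dockertask-00003'
    >>> '{}{}'.format(prefix, 123456)  # past 99999 the padding is dropped
    'dockertask-123456'

Zero-padding keeps ids fixed-width up to 99999, so lexicographic and numeric
ordering agree in task listings.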
def _generate_next_generic_task_id(
        batch_client, job_id, tasklist=None, reserved=None, task_map=None):
    # type: (azure.batch.batch_service_client.BatchServiceClient, str,
    #        str) -> str
    #        list, str, dict) -> Tuple[list, str]
    """Generate the next generic task id
    :param batch_client: The batch client to use.
    :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
    :param str job_id: job id
    :param list tasklist: list of current (committed) tasks in job
    :param str reserved: reserved task id
    :rtype: str
    :return: returns a generic docker task id
    :param dict task_map: map of pending tasks to add to the job
    :rtype: tuple
    :return: (list of committed task ids for job, next generic docker task id)
    """
    # get filtered, sorted list of generic docker task ids
    try:
        tasklist = batch_client.task.list(
            job_id,
            task_list_options=batchmodels.TaskListOptions(
                filter='startswith(id, \'{}\')'.format(
                    _GENERIC_DOCKER_TASK_PREFIX),
                select='id'))
        if util.is_none_or_empty(tasklist):
            tasklist = batch_client.task.list(
                job_id,
                task_list_options=batchmodels.TaskListOptions(
                    filter='startswith(id, \'{}\')'.format(
                        _GENERIC_DOCKER_TASK_PREFIX),
                    select='id'))
            tasklist = list(tasklist)
        tasknum = sorted([int(x.id.split('-')[-1]) for x in tasklist])[-1] + 1
    except (batchmodels.batch_error.BatchErrorException, IndexError):
        tasknum = 0
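
For reference, the filter string above uses the Batch OData query syntax; a
standalone sketch of the same call pattern (the helper name and prefix here
are hypothetical):

    # hypothetical helper isolating the filtered task listing; only the
    # 'id' property is selected to keep the response payload small
    import azure.batch.models as batchmodels

    def _list_generic_task_ids(batch_client, job_id, prefix='dockertask-'):
        tasks = batch_client.task.list(
            job_id,
            task_list_options=batchmodels.TaskListOptions(
                filter="startswith(id, '{}')".format(prefix),
                select='id'))
        return [t.id for t in tasks]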
@@ -1931,10 +1949,73 @@ def _generate_next_generic_task_id(batch_client, job_id, reserved=None):
        tasknum_reserved = int(reserved.split('-')[-1])
        while tasknum == tasknum_reserved:
            tasknum += 1
    if tasknum > 99999:
        return '{}{}'.format(_GENERIC_DOCKER_TASK_PREFIX, tasknum)
    else:
        return '{0}{1:05d}'.format(_GENERIC_DOCKER_TASK_PREFIX, tasknum)
    id = _format_generic_task_id(tasknum)
    if task_map is not None:
        while id in task_map:
            tasknum += 1
            id = _format_generic_task_id(tasknum)
    return tasklist, id
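
A hypothetical call sequence showing why the function now returns the
tasklist it fetched: the caller threads it back in so the service is queried
at most once per job, and ids pending in task_map are stepped over
(pending_task is a placeholder):

    # sketch: generate two ids for one job with a single service round trip
    existing_tasklist = None
    task_map = {}
    existing_tasklist, tid = _generate_next_generic_task_id(
        batch_client, job_id, tasklist=existing_tasklist, task_map=task_map)
    task_map[tid] = pending_task  # queued locally, not yet committed
    # reuses the cached tasklist and skips the pending id above
    existing_tasklist, tid2 = _generate_next_generic_task_id(
        batch_client, job_id, tasklist=existing_tasklist, task_map=task_map)
    assert tid2 != tid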
def _add_task_collection(batch_client, job_id, task_map):
    # type: (batch.BatchServiceClient, str, dict) -> None
    """Add a collection of tasks to a job
    :param batch_client: The batch client to use.
    :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
    :param str job_id: job to add to
    :param dict task_map: task collection map to add
    """
    all_tasks = list(task_map.values())
    start = 0
    slice = 100  # can only submit up to 100 tasks at a time
    while True:
        end = start + slice
        if end > len(all_tasks):
            end = len(all_tasks)
        chunk = all_tasks[start:end]
        logger.debug('submitting {} tasks ({} -> {}) to job {}'.format(
            len(chunk), start, end - 1, job_id))
        try:
            results = batch_client.task.add_collection(job_id, chunk)
        except batchmodels.BatchErrorException as e:
            if e.error.code == 'RequestBodyTooLarge':
                # collection contents are too large, reduce and retry
                if slice == 1:
                    raise
                slice = slice >> 1
                if slice < 1:
                    slice = 1
                logger.error(
                    ('task collection slice was too big, retrying with '
                     'slice={}').format(slice))
                continue
        else:
            # go through result and retry just failed tasks
            while True:
                retry = []
                for result in results.value:
                    if result.status == batchmodels.TaskAddStatus.client_error:
                        logger.error(
                            ('skipping retry of adding task {} as it '
                             'returned a client error (code={} message={}) '
                             'for job {}').format(
                                 result.task_id, result.error.code,
                                 result.error.message, job_id))
                    elif (result.status ==
                            batchmodels.TaskAddStatus.server_error):
                        retry.append(task_map[result.task_id])
                if len(retry) > 0:
                    logger.debug('retrying adding {} tasks to job {}'.format(
                        len(retry), job_id))
                    results = batch_client.task.add_collection(job_id, retry)
                else:
                    break
        if end == len(all_tasks):
            break
        start += slice
        slice = 100
    logger.info('submitted all {} tasks to job {}'.format(
        len(task_map), job_id))
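
A hedged usage sketch for the helper above: callers accumulate a dict keyed
by task id (which is what lets the retry path look tasks up via
result.task_id) and submit once; the 100-task chunking and
RequestBodyTooLarge halving are internal. The TaskAddParameter fields are
illustrative:

    # sketch: queue 250 trivial tasks, then submit them as one collection
    task_map = {}
    for n in range(250):
        tid = _format_generic_task_id(n)
        task_map[tid] = batchmodels.TaskAddParameter(
            id=tid, command_line='/bin/sh -c "echo {}"'.format(tid))
    _add_task_collection(batch_client, job_id, task_map)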
def add_jobs(
@@ -1989,6 +2070,7 @@ def add_jobs(
        uses_task_dependencies = False
        missing_images = []
        allow_run_on_missing = settings.job_allow_run_on_missing(jobspec)
        existing_tasklist = None
        # check for public pull on missing setting
        if (allow_run_on_missing and
                preg.allow_public_docker_hub_pull_on_missing):
@@ -2023,19 +2105,15 @@ def add_jobs(
                if util.is_none_or_empty(mi_docker_container_name):
                    _id = settings.task_id(task)
                    if util.is_none_or_empty(_id):
                        reserved_task_id = _generate_next_generic_task_id(
                            batch_client, job_id)
                        existing_tasklist, reserved_task_id = \
                            _generate_next_generic_task_id(
                                batch_client, job_id,
                                tasklist=existing_tasklist)
                        settings.set_task_id(task, reserved_task_id)
                        _id = '{}-{}'.format(job_id, reserved_task_id)
                    settings.set_task_name(task, _id)
                    mi_docker_container_name = settings.task_name(task)
                    del _id
        # set autocomplete settings
        job_updated_for_auto_terminate = False
        if auto_complete:
            set_terminate_on_all_tasks_complete = True
        else:
            set_terminate_on_all_tasks_complete = False
        # define max task retry count constraint for this task if set
        job_constraints = None
        max_task_retries = settings.job_max_task_retries(jobspec)
@@ -2122,7 +2200,6 @@ def add_jobs(
                    raise
            else:
                raise
        del auto_complete
        del multi_instance
        del mi_docker_container_name
        del uses_task_dependencies
@@ -2137,11 +2214,13 @@ def add_jobs(
        del jevs
        del _job_env_vars_secid
        # add all tasks under job
        task_map = {}
        for _task in settings.job_tasks(jobspec):
            _task_id = settings.task_id(_task)
            if util.is_none_or_empty(_task_id):
                _task_id = _generate_next_generic_task_id(
                    batch_client, job.id, reserved_task_id)
                existing_tasklist, _task_id = _generate_next_generic_task_id(
                    batch_client, job.id, tasklist=existing_tasklist,
                    reserved=reserved_task_id, task_map=task_map)
                settings.set_task_id(_task, _task_id)
            if util.is_none_or_empty(settings.task_name(_task)):
                settings.set_task_name(_task, '{}-{}'.format(job.id, _task_id))
@@ -2302,25 +2381,27 @@ def add_jobs(
            # create task
            if settings.verbose(config):
                if mis is not None:
                    logger.info(
                        'Multi-instance task coordination command: {}'.format(
                    logger.debug(
                        'multi-instance task coordination command: {}'.format(
                            mis.coordination_command_line))
                logger.info('Adding task: {} command: {}'.format(
                logger.debug('task: {} command: {}'.format(
                    task.id, batchtask.command_line))
            else:
                logger.info('Adding task: {}'.format(task.id))
            batch_client.task.add(job_id=job.id, task=batchtask)
            # update job if job autocompletion is needed
            if (set_terminate_on_all_tasks_complete and
                    not job_updated_for_auto_terminate):
                batch_client.job.update(
                    job_id=job.id,
                    job_update_parameter=batchmodels.JobUpdateParameter(
                        pool_info=batchmodels.PoolInformation(pool_id=pool.id),
                        on_all_tasks_complete=batchmodels.
                        OnAllTasksComplete.terminate_job))
                job_updated_for_auto_terminate = True
            if task.id in task_map:
                raise RuntimeError(
                    'duplicate task id detected: {} for job {}'.format(
                        task.id, job.id))
            task_map[task.id] = batchtask
            lasttask = task.id
        # add task collection to job
        _add_task_collection(batch_client, job.id, task_map)
        # update job if job autocompletion is needed
        if auto_complete:
            batch_client.job.update(
                job_id=job.id,
                job_update_parameter=batchmodels.JobUpdateParameter(
                    pool_info=batchmodels.PoolInformation(pool_id=pool.id),
                    on_all_tasks_complete=batchmodels.
                    OnAllTasksComplete.terminate_job))
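
Note the sequencing this hunk establishes, sketched below; the rationale is
an inference from the code movement in this diff rather than a statement
from its author:

    # 1. commit the entire pending collection first
    _add_task_collection(batch_client, job.id, task_map)
    # 2. only then arm auto-termination; arming it before all chunks were
    #    committed could let the job terminate while later chunks of a
    #    large collection were still being submitted
    if auto_complete:
        batch_client.job.update(
            job_id=job.id,
            job_update_parameter=batchmodels.JobUpdateParameter(
                pool_info=batchmodels.PoolInformation(pool_id=pool.id),
                on_all_tasks_complete=(
                    batchmodels.OnAllTasksComplete.terminate_job)))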
        # tail file if specified
        if tail:
            stream_file_and_wait_for_task(

View file

@@ -46,6 +46,9 @@ Batch Shipyard at this time.
  current limitation of the underlying Azure Batch service.
* Only Intel MPI can be used in conjunction with Infiniband/RDMA on Azure
  Linux VMs. This is a current limitation of the underlying VM and host
  drivers.
* Adding tasks to the same job across multiple, concurrent Batch Shipyard
  invocations may result in failure if task ids for these jobs are
  auto-generated (see the sketch below).
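
A sketch of the race behind this limitation, reusing the internal helper
from the diff above with a hypothetical job id; neither invocation can see
the other's pending, uncommitted tasks:

    # both concurrent invocations compute the same next id, e.g. when the
    # highest committed task is dockertask-00004 both pick dockertask-00005
    _, id_a = _generate_next_generic_task_id(batch_client, 'samejob')
    _, id_b = _generate_next_generic_task_id(batch_client, 'samejob')
    assert id_a == id_b  # the second submission of this id is rejected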
### Special Considerations for Low-Priority Compute Nodes
* Pool and compute node allocation may take up to the full resize timeout