Add monitor_task_completion for recurring jobs

This commit is contained in:
Fred Park 2017-08-08 20:29:46 -07:00
Родитель 5ae9001716
Коммит 44a1f14b31
6 изменённых файлов: 137 добавлений и 20 удалений

Просмотреть файл

@ -31,7 +31,8 @@
},
"job_manager": {
"allow_low_priority_node": true,
"run_exclusive": false
"run_exclusive": false,
"monitor_task_completion": false
}
},
"allow_run_on_missing_image": false,

Просмотреть файл

@ -285,7 +285,7 @@ def _block_for_nodes_ready(
fatal_resize_error = False
errors = []
for err in pool.resize_errors:
errors.append('code={} msg={}'.format(err.code, err.message))
errors.append('{}: {}'.format(err.code, err.message))
if (err.code == 'AccountCoreQuotaReached' or
(err.code == 'AccountLowPriorityCoreQuotaReached' and
pool.target_dedicated_nodes == 0)):
@ -3191,17 +3191,29 @@ def add_jobs(
# create jobschedule
recurrence = settings.job_recurrence(jobspec)
if recurrence is not None:
if not auto_complete:
logger.warning(
('recurrence specified for job schedule {}, but '
'auto_complete is disabled').format(job_id))
on_all_tasks_complete = (
batchmodels.OnAllTasksComplete.no_action
)
if recurrence.job_manager.monitor_task_completion:
kill_job_on_completion = True
else:
kill_job_on_completion = False
if auto_complete:
if kill_job_on_completion:
logger.warning(
('overriding monitor_task_completion with '
'auto_complete for job schedule {}').format(
job_id))
kill_job_on_completion = False
on_all_tasks_complete = (
batchmodels.OnAllTasksComplete.terminate_job
)
else:
if not kill_job_on_completion:
logger.warning(
('recurrence specified for job schedule {}, but '
'auto_complete and monitor_task_completion are '
'both disabled').format(job_id))
on_all_tasks_complete = (
batchmodels.OnAllTasksComplete.no_action
)
jobschedule = batchmodels.JobScheduleAddParameter(
id=job_id,
schedule=batchmodels.Schedule(
@ -3225,10 +3237,12 @@ def add_jobs(
'.shipyard-jmtask.envlist '
'-v $AZ_BATCH_TASK_DIR:$AZ_BATCH_TASK_DIR '
'-w $AZ_BATCH_TASK_WORKING_DIR '
'alfpark/batch-shipyard:rjm-{}').format(
__version__),
'alfpark/batch-shipyard:rjm-{}{}').format(
__version__,
' --monitor' if kill_job_on_completion else ''
)
]),
kill_job_on_completion=False,
kill_job_on_completion=kill_job_on_completion,
user_identity=_RUN_ELEVATED,
run_exclusive=recurrence.job_manager.run_exclusive,
authentication_token_settings=batchmodels.

Просмотреть файл

@ -218,7 +218,7 @@ JobScheduleSettings = collections.namedtuple(
)
JobManagerSettings = collections.namedtuple(
'JobManagerSettings', [
'allow_low_priority_node', 'run_exclusive',
'allow_low_priority_node', 'run_exclusive', 'monitor_task_completion',
]
)
JobRecurrenceSettings = collections.namedtuple(
@ -2213,6 +2213,8 @@ def job_recurrence(conf):
allow_low_priority_node=_kv_read(
jm, 'allow_low_priority_node', True),
run_exclusive=_kv_read(jm, 'run_exclusive', False),
monitor_task_completion=_kv_read(
jm, 'monitor_task_completion', False),
)
)
else:

Просмотреть файл

@ -39,7 +39,8 @@ The jobs schema is as follows:
},
"job_manager": {
"allow_low_priority_node": true,
"run_exclusive": false
"run_exclusive": false,
"monitor_task_completion": false
}
},
"allow_run_on_missing_image": false,
@ -256,9 +257,9 @@ specified `tasks` under the job will be added to the existing job.
* (optional) `auto_complete` enables auto-completion of the job for which
the specified tasks are run under. When run with multi-instance tasks, this
performs automatic cleanup of the Docker container which is run in detached
mode. The default is `false`. It is important to set this value to `true`
if you do not have logic in your tasks to terminate or delete your job when
using a job `recurrence`.
mode. The default is `false`. If creating a job `recurrence`, utilizing
`auto_complete` is one way to ensure that recurrent job instances created
from a schedule complete, allowing the next job recurrence to be created.
* (optional) `environment_variables` under the job are environment variables
which will be applied to all tasks operating under the job. Note that
environment variables are not expanded and are passed as-is. You will need
@ -351,8 +352,34 @@ a set interval.
implication is that even if you set this property to a minimum value
of 1 minute, there may be delays in completing the job and triggering
the next which may artificially increase the time between recurrences.
It is important to set the `auto_complete` setting to `true` if your
It is important to set either the `auto_complete` or the
`job_manager`:`monitor_task_completion` setting to `true` if your
tasks have no logic to terminate or delete the parent job.
* (optional) `job_manager` property controls the job manager execution. The
job manager is the task that is automatically created and run on a compute
node that submits the `tasks` at the given `recurrence_interval`.
* (optional) `allow_low_priority_node` allows the job manager to run
on a low priority node. The default is `true`. Sometimes it is necessary
to guarantee that the job manager is not preempted; if so, set this
value to `false` and ensure that your pool has dedicated nodes
provisioned.
* (optional) `run_exclusive` forces the job manager to run on a compute
node where there are no other tasks running. The default is `false`.
This is only relevant when the pool's `max_tasks_per_node` setting is
greater than 1.
* (optional) `monitor_task_completion` allows the job manager to monitor
the tasks in the job for completion instead of relying on
`auto_complete`. The advantage for doing so is that the job can move
much more quickly into completed state thus allowing the next job
recurrence to be created for very small values of `recurrence_interval`.
In order to properly utilize this feature, you must either set
your pool's `max_tasks_per_node` to greater than 1 or have more than
one compute node in your pool. If neither of these conditions is met,
then the tasks that the job manager creates will be blocked as there
will be no free scheduling slots to accommodate them (since the job
manager task occupies a scheduling slot itself). The default is
`false`. Setting both this value and `auto_complete` to `true` will
result in the `auto_complete` behavior taking precedence.
* (optional) `allow_run_on_missing_image` allows tasks with a Docker image
reference that was not pre-loaded on to the compute node via
`global_resources`:`docker_images` in the global configuration to be able to

Просмотреть файл

@ -4,7 +4,7 @@ FROM alpine:3.6
MAINTAINER Fred Park <https://github.com/Azure/batch-shipyard>
# copy in files
COPY recurrent_jm.py requirements.txt /opt/batch-shipyard/
COPY recurrent_job_manager.py requirements.txt /opt/batch-shipyard/
# add base packages and python dependencies
RUN apk update \
@ -22,4 +22,4 @@ RUN apk update \
RUN python3 -m compileall -f /opt/batch-shipyard
# set entrypoint
ENTRYPOINT ["/opt/batch-shipyard/recurrent_jm.py"]
ENTRYPOINT ["/opt/batch-shipyard/recurrent_job_manager.py"]

Просмотреть файл

@ -25,10 +25,12 @@
# DEALINGS IN THE SOFTWARE.
# stdlib imports
import argparse
import logging
import logging.handlers
import os
import pickle
import time
# non-stdlib imports
import azure.batch.models as batchmodels
import azure.batch.batch_service_client as batch
@ -170,8 +172,60 @@ def _add_task_collection(batch_client, job_id, task_map):
len(task_map), job_id))
def _monitor_tasks(batch_client, job_id, numtasks):
    # type: (batch.BatchServiceClient, str, int) -> None
    """Monitor tasks for completion
    :param batch_client: The batch client to use.
    :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
    :param str job_id: job to monitor
    :param int numtasks: number of tasks that must reach completed state
    """
    # fix: initialize before the loop so the periodic debug log below
    # cannot raise NameError if every get_task_counts call so far failed
    task_counts = None
    i = 0  # iterations since last periodic debug log
    j = 0  # iterations since last manual task-list fallback
    while True:
        try:
            task_counts = batch_client.job.get_task_counts(job_id=job_id)
        except batchmodels.batch_error.BatchErrorException as ex:
            logger.exception(ex)
        else:
            if (task_counts.validation_status ==
                    batchmodels.TaskCountValidationStatus.validated):
                j = 0
                if task_counts.completed == numtasks:
                    logger.info(task_counts)
                    logger.info('all {} tasks completed'.format(numtasks))
                    break
            else:
                # counts are unvalidated: fall back to a manual task list
                # every 10 iterations to check completion state directly
                j += 1
                if j % 10 == 0:
                    j = 0
                    try:
                        tasks = batch_client.task.list(
                            job_id=job_id,
                            task_list_options=batchmodels.TaskListOptions(
                                select='id,state')
                        )
                        states = [task.state for task in tasks]
                    except batchmodels.batch_error.BatchErrorException as ex:
                        logger.exception(ex)
                    else:
                        if (states.count(batchmodels.TaskState.completed) ==
                                numtasks):
                            logger.info('all {} tasks completed'.format(
                                numtasks))
                            break
        i += 1
        if i % 15 == 0:
            i = 0
            logger.debug(task_counts)
        time.sleep(2)
def main():
"""Main function"""
# get command-line args
args = parseargs()
# get job id
job_id = os.environ['AZ_BATCH_JOB_ID']
# create batch client
@ -182,6 +236,25 @@ def main():
task_map = pickle.load(f, fix_imports=True)
# submit tasks to job
_add_task_collection(batch_client, job_id, task_map)
# monitor tasks for completion
if not args.monitor:
logger.info('not monitoring tasks for completion')
else:
logger.info('monitoring tasks for completion')
_monitor_tasks(batch_client, job_id, len(task_map))
def parseargs():
    """Parse command-line arguments for the recurrent job manager.
    :rtype: argparse.Namespace
    :return: parsed arguments
    """
    ap = argparse.ArgumentParser(
        description='rjm: Azure Batch Shipyard recurrent job manager')
    ap.add_argument(
        '--monitor', action='store_true', help='monitor tasks for completion')
    # monitoring is opt-in; default to off when the flag is absent
    ap.set_defaults(monitor=False)
    return ap.parse_args()
if __name__ == '__main__':