Add job migration support
- Add enable/disable job support too - Resolves #108
This commit is contained in:
Родитель
3b65ba684f
Коммит
7ba85e7496
169
convoy/batch.py
169
convoy/batch.py
|
@ -1000,6 +1000,146 @@ def del_node(batch_client, config, all_start_task_failed, node_id):
|
|||
)
|
||||
|
||||
|
||||
def check_pool_for_job_migration(
        batch_client, config, jobid=None, poolid=None):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict,
    #        str, str) -> None
    """Verify that jobs can be migrated to the target pool
    :param batch_client: The batch client to use.
    :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
    :param dict config: configuration dict
    :param str jobid: job id to check; if None, the job specifications
        from the configuration are checked instead
    :param str poolid: target pool id; if None, the pool id from the
        configuration is used instead
    :raises RuntimeError: if a job is completed, deleting or terminating,
        if a job has an autopool specification, or if a job is already
        assigned to the target pool
    """
    target_pool = settings.pool_id(config) if poolid is None else poolid
    specs = (
        settings.job_specifications(config) if jobid is None
        else [{'id': jobid}]
    )
    # job states from which a migration is refused
    blocked_states = (
        batchmodels.JobState.completed,
        batchmodels.JobState.deleting,
        batchmodels.JobState.terminating,
    )
    for spec in specs:
        job_id = settings.job_id(spec)
        cloud_job = batch_client.job.get(job_id=job_id)
        if any(cloud_job.state == state for state in blocked_states):
            raise RuntimeError(
                'cannot migrate job {} in state {}'.format(
                    job_id, cloud_job.state))
        pool_info = cloud_job.pool_info
        if pool_info.auto_pool_specification is not None:
            raise RuntimeError(
                'cannot migrate job {} with an autopool specification'.format(
                    job_id))
        if pool_info.pool_id == target_pool:
            raise RuntimeError(
                'cannot migrate job {} to the same pool {}'.format(
                    job_id, target_pool))
|
||||
|
||||
|
||||
def update_job_with_pool(batch_client, config, jobid=None, poolid=None):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict,
    #        str, str) -> None
    """Patch jobs so that they target a different pool
    :param batch_client: The batch client to use.
    :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
    :param dict config: configuration dict
    :param str jobid: job id to patch; if None, the job specifications
        from the configuration are patched instead
    :param str poolid: pool id to assign; if None, the pool id from the
        configuration is used instead
    """
    target_pool = settings.pool_id(config) if poolid is None else poolid
    specs = (
        settings.job_specifications(config) if jobid is None
        else [{'id': jobid}]
    )
    for spec in specs:
        job_id = settings.job_id(spec)
        # only the pool information of the job is patched
        patch_param = batchmodels.JobPatchParameter(
            pool_info=batchmodels.PoolInformation(pool_id=target_pool))
        batch_client.job.patch(
            job_id=job_id, job_patch_parameter=patch_param)
        logger.info('updated job {} to target pool {}'.format(
            job_id, target_pool))
|
||||
|
||||
|
||||
def disable_jobs(
        batch_client, config, disable_tasks_action, jobid=None,
        disabling_state_ok=False, terminate_tasks=False):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict,
    #        str, str, bool, bool) -> None
    """Disable jobs
    :param batch_client: The batch client to use.
    :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
    :param dict config: configuration dict
    :param str disable_tasks_action: disable tasks action, converted to a
        `batchmodels.DisableJobOption`
    :param str jobid: job id to disable; if None, the job specifications
        from the configuration are disabled instead
    :param bool disabling_state_ok: disabling state is ok to proceed
    :param bool terminate_tasks: terminate tasks after disable
    :raises batchmodels.batch_error.BatchErrorException: if disabling a job
        fails for any reason other than the job already being completed
    """
    # the `terminate_tasks` parameter shadows the module-level
    # terminate_tasks() function within this scope; keep the flag under
    # an unambiguous local name
    should_terminate = terminate_tasks
    if jobid is None:
        jobs = settings.job_specifications(config)
    else:
        jobs = [{'id': jobid}]
    for job in jobs:
        job_id = settings.job_id(job)
        try:
            batch_client.job.disable(
                job_id=job_id,
                disable_tasks=batchmodels.DisableJobOption(
                    disable_tasks_action),
            )
        except batchmodels.batch_error.BatchErrorException as ex:
            # an already completed job needs no further action; any
            # other failure must not be silently discarded
            if ('The specified job is already in a completed state' not in
                    ex.message.value):
                raise
        else:
            # wait for job to enter disabled/completed/deleting state
            while True:
                _job = batch_client.job.get(
                    job_id=job_id,
                    job_get_options=batchmodels.JobGetOptions(
                        select='id,state')
                )
                if ((disabling_state_ok and
                     _job.state == batchmodels.JobState.disabling) or
                        _job.state == batchmodels.JobState.disabled or
                        _job.state == batchmodels.JobState.completed or
                        _job.state == batchmodels.JobState.deleting):
                    break
                time.sleep(1)
            logger.info('job {} disabled'.format(job_id))
            if should_terminate:
                # resolve the function through the module globals: the
                # bare name refers to the boolean parameter here and
                # calling it would raise a TypeError
                terminate_tasks_func = globals()['terminate_tasks']
                terminate_tasks_func(
                    batch_client, config, jobid=job_id, wait=True)
|
||||
|
||||
|
||||
def enable_jobs(batch_client, config, jobid=None):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict,
    #        str) -> None
    """Enable jobs
    :param batch_client: The batch client to use.
    :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
    :param dict config: configuration dict
    :param str jobid: job id to enable; if None, the job specifications
        from the configuration are enabled instead
    :raises batchmodels.batch_error.BatchErrorException: if enabling a job
        fails for any reason other than the job already being completed
    """
    if jobid is None:
        jobs = settings.job_specifications(config)
    else:
        jobs = [{'id': jobid}]
    for job in jobs:
        job_id = settings.job_id(job)
        try:
            batch_client.job.enable(job_id=job_id)
        except batchmodels.batch_error.BatchErrorException as ex:
            # an already completed job cannot be enabled and is skipped;
            # any other failure must not be silently discarded
            if ('The specified job is already in a completed state' not in
                    ex.message.value):
                raise
        else:
            logger.info('job {} enabled'.format(job_id))
|
||||
|
||||
|
||||
def del_jobs(batch_client, config, jobid=None, termtasks=False, wait=False):
|
||||
# type: (azure.batch.batch_service_client.BatchServiceClient, dict,
|
||||
# str, bool, bool) -> None
|
||||
|
@ -1030,32 +1170,9 @@ def del_jobs(batch_client, config, jobid=None, termtasks=False, wait=False):
|
|||
logger.debug(
|
||||
'disabling job {} first due to task termination'.format(
|
||||
job_id))
|
||||
try:
|
||||
batch_client.job.disable(
|
||||
job_id,
|
||||
disable_tasks=batchmodels.DisableJobOption.wait
|
||||
)
|
||||
except batchmodels.batch_error.BatchErrorException as ex:
|
||||
if ('The specified job is already in a completed state' in
|
||||
ex.message.value):
|
||||
pass
|
||||
else:
|
||||
# wait for job to enter non-active/enabling state
|
||||
while True:
|
||||
_job = batch_client.job.get(
|
||||
job_id,
|
||||
job_get_options=batchmodels.JobGetOptions(
|
||||
select='id,state')
|
||||
)
|
||||
if (_job.state == batchmodels.JobState.disabling or
|
||||
_job.state == batchmodels.JobState.disabled or
|
||||
_job.state == batchmodels.JobState.completed or
|
||||
_job.state == batchmodels.JobState.deleting):
|
||||
break
|
||||
time.sleep(1)
|
||||
# terminate tasks with forced wait
|
||||
terminate_tasks(
|
||||
batch_client, config, jobid=job_id, wait=True)
|
||||
disable_jobs(
|
||||
batch_client, config, 'wait', jobid=job_id,
|
||||
disabling_state_ok=True, terminate_tasks=True)
|
||||
# delete job
|
||||
batch_client.job.delete(job_id)
|
||||
except batchmodels.batch_error.BatchErrorException as ex:
|
||||
|
|
|
@ -2570,6 +2570,80 @@ def action_jobs_cmi(batch_client, config, delete):
|
|||
batch.del_clean_mi_jobs(batch_client, config)
|
||||
|
||||
|
||||
def action_jobs_migrate(
        batch_client, config, jobid, poolid, requeue, terminate, wait):
    # type: (batchsc.BatchServiceClient, dict, str, str, bool, bool,
    #        bool) -> None
    """Action: Jobs Migrate
    :param azure.batch.batch_service_client.BatchServiceClient batch_client:
        batch client
    :param dict config: configuration dict
    :param str jobid: job id to migrate in lieu of config
    :param str poolid: pool id to migrate to in lieu of config
    :param bool requeue: requeue action
    :param bool terminate: terminate action
    :param bool wait: wait action
    :raises ValueError: if not exactly one of requeue, terminate and wait
        is True
    """
    choices = (
        (requeue, 'requeue'),
        (terminate, 'terminate'),
        (wait, 'wait'),
    )
    if [flag for flag, _ in choices].count(True) != 1:
        raise ValueError(
            'must specify only one option of --requeue, --terminate, --wait')
    # exactly one flag is set at this point, take its action name
    action = next(name for flag, name in choices if flag)
    # check jobs to see if targeted pool id is the same
    batch.check_pool_for_job_migration(
        batch_client, config, jobid=jobid, poolid=poolid)
    if util.confirm_action(config, msg='migrate jobs'):
        # disable job and wait for disabled state
        batch.disable_jobs(batch_client, config, action, jobid=jobid)
        # patch job to point at the new pool
        batch.update_job_with_pool(
            batch_client, config, jobid=jobid, poolid=poolid)
        # re-enable job
        batch.enable_jobs(batch_client, config, jobid=jobid)
|
||||
|
||||
|
||||
def action_jobs_disable(
        batch_client, config, jobid, requeue, terminate, wait):
    # type: (batchsc.BatchServiceClient, dict, str, bool, bool,
    #        bool) -> None
    """Action: Jobs Disable
    :param azure.batch.batch_service_client.BatchServiceClient batch_client:
        batch client
    :param dict config: configuration dict
    :param str jobid: job id to disable in lieu of config
    :param bool requeue: requeue action
    :param bool terminate: terminate action
    :param bool wait: wait action
    :raises ValueError: if not exactly one of requeue, terminate and wait
        is True
    """
    choices = (
        (requeue, 'requeue'),
        (terminate, 'terminate'),
        (wait, 'wait'),
    )
    if [flag for flag, _ in choices].count(True) != 1:
        raise ValueError(
            'must specify only one option of --requeue, --terminate, --wait')
    # exactly one flag is set at this point, take its action name
    action = next(name for flag, name in choices if flag)
    batch.disable_jobs(
        batch_client, config, action, jobid=jobid, disabling_state_ok=True)
|
||||
|
||||
|
||||
def action_jobs_enable(batch_client, config, jobid):
    # type: (batchsc.BatchServiceClient, dict, str) -> None
    """Action: Jobs Enable
    :param azure.batch.batch_service_client.BatchServiceClient batch_client:
        batch client
    :param dict config: configuration dict
    :param str jobid: job id to enable in lieu of config
    """
    # thin wrapper: all of the work is delegated to the batch module
    batch.enable_jobs(
        batch_client,
        config,
        jobid=jobid,
    )
|
||||
|
||||
|
||||
def action_storage_del(
|
||||
blob_client, queue_client, table_client, config, clear_tables):
|
||||
# type: (azureblob.BlockBlobService, azurequeue.QueueService,
|
||||
|
|
|
@ -340,8 +340,11 @@ The `jobs` command has the following sub-commands:
|
|||
cmi Cleanup multi-instance jobs
|
||||
del Delete jobs
|
||||
deltasks Delete specified tasks in jobs
|
||||
disable Disable jobs
|
||||
enable Enable jobs
|
||||
list List jobs
|
||||
listtasks List tasks within jobs
|
||||
migrate Migrate jobs to another pool
|
||||
term Terminate jobs
|
||||
termtasks Terminate specified tasks in jobs
|
||||
```
|
||||
|
@ -365,11 +368,24 @@ configuration file. Active or running tasks will be terminated first.
|
|||
* `--jobid` force deletion scope to just this job id
|
||||
* `--taskid` force deletion scope to just this task id
|
||||
* `--wait` will wait for deletion to complete
|
||||
* `disable` will disable jobs. Exactly one of `--requeue`, `--terminate` or `--wait` must be specified.
|
||||
* `--jobid` force disable scope to just this job id
|
||||
* `--requeue` requeue running tasks
|
||||
* `--terminate` terminate running tasks
|
||||
* `--wait` wait for running tasks to complete
|
||||
* `enable` will enable jobs
|
||||
* `--jobid` force enable scope to just this job id
|
||||
* `list` will list all jobs in the Batch account
|
||||
* `listtasks` will list tasks from jobs specified in the jobs configuration
|
||||
file
|
||||
* `--jobid` force scope to just this job id
|
||||
* `--poll-until-tasks-complete` will poll until all tasks have completed
|
||||
* `migrate` will migrate jobs to another pool. Exactly one of `--requeue`, `--terminate` or `--wait` must be specified.
|
||||
* `--jobid` force migration scope to just this job id
|
||||
* `--poolid` force migration to this specified pool id
|
||||
* `--requeue` requeue running tasks
|
||||
* `--terminate` terminate running tasks
|
||||
* `--wait` wait for running tasks to complete
|
||||
* `term` will terminate jobs found in the jobs configuration file
|
||||
* `--all` will terminate all jobs found in the Batch account
|
||||
* `--jobid` force termination scope to just this job id
|
||||
|
|
58
shipyard.py
58
shipyard.py
|
@ -1460,6 +1460,64 @@ def jobs_cmi(ctx, delete):
|
|||
convoy.fleet.action_jobs_cmi(ctx.batch_client, ctx.config, delete)
|
||||
|
||||
|
||||
@jobs.command('migrate')
@click.option('--jobid', help='Migrate only the specified job id')
@click.option(
    '--poolid',
    help='Target specified pool id rather than from configuration')
@click.option(
    '--requeue', is_flag=True,
    help='Requeue running tasks in job')
@click.option(
    '--terminate', is_flag=True,
    help='Terminate running tasks in job')
@click.option(
    '--wait', is_flag=True,
    help='Wait for running tasks to complete in job')
@common_options
@batch_options
@keyvault_options
@aad_options
@pass_cli_context
def jobs_migrate(ctx, jobid, poolid, requeue, terminate, wait):
    """Migrate jobs to another pool"""
    # the context must be initialized before its batch client and
    # config are read
    ctx.initialize_for_batch()
    convoy.fleet.action_jobs_migrate(
        ctx.batch_client,
        ctx.config,
        jobid,
        poolid,
        requeue,
        terminate,
        wait,
    )
|
||||
|
||||
|
||||
@jobs.command('disable')
@click.option('--jobid', help='Disable only the specified job id')
@click.option(
    '--requeue', is_flag=True,
    help='Requeue running tasks in job')
@click.option(
    '--terminate', is_flag=True,
    help='Terminate running tasks in job')
@click.option(
    '--wait', is_flag=True,
    help='Wait for running tasks to complete in job')
@common_options
@batch_options
@keyvault_options
@aad_options
@pass_cli_context
def jobs_disable(ctx, jobid, requeue, terminate, wait):
    """Disable jobs"""
    # the context must be initialized before its batch client and
    # config are read
    ctx.initialize_for_batch()
    convoy.fleet.action_jobs_disable(
        ctx.batch_client,
        ctx.config,
        jobid,
        requeue,
        terminate,
        wait,
    )
|
||||
|
||||
|
||||
@jobs.command('enable')
@click.option('--jobid', help='Enable only the specified job id')
@common_options
@batch_options
@keyvault_options
@aad_options
@pass_cli_context
def jobs_enable(ctx, jobid):
    """Enable jobs"""
    # the context must be initialized before its batch client and
    # config are read
    ctx.initialize_for_batch()
    convoy.fleet.action_jobs_enable(
        ctx.batch_client,
        ctx.config,
        jobid,
    )
|
||||
|
||||
|
||||
@cli.group()
|
||||
@pass_cli_context
|
||||
def data(ctx):
|
||||
|
|
Загрузка…
Ссылка в новой задаче