Add listjobs, listtasks, gettaskallfiles actions

This commit is contained in:
Fred Park 2016-10-13 14:11:52 -07:00
Родитель 4ce2f1d6c2
Коммит d88475baa7
4 изменённых файлов: 183 добавлений и 22 удалений

Просмотреть файл

@ -5,6 +5,8 @@
- Data ingress support to GlusterFS. Please see the configuration doc for
more information.
- Experimental support for OpenSSH HPN on Ubuntu
- Additional actions: `ingressdata`, `gettaskallfiles`, `listjobs`,
`listtasks`. Please see the usage doc for more information.
### Changed
- **Breaking Change:** `ssh_docker_tunnel` in the `pool_specification` has

Просмотреть файл

@ -62,7 +62,7 @@ def _reboot_node(batch_client, pool_id, node_id, wait):
# type: (batch.BatchServiceClient, str, str, bool) -> None
"""Reboot a node in a pool
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param str pool_id: pool id of node
:param str node_id: node id to delete
:param bool wait: wait for node to enter rebooting state
@ -88,7 +88,7 @@ def _wait_for_pool_ready(batch_client, node_state, pool_id, reboot_on_failed):
# str, bool) -> List[batchmodels.ComputeNode]
"""Wait for pool to enter "ready": steady state and all nodes idle
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
:param str pool_id: pool id
:param bool reboot_on_failed: reboot node on failed start state
@ -148,7 +148,7 @@ def create_pool(batch_client, config, pool):
# List[batchmodels.ComputeNode]
"""Create pool if not exists
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
:param batchmodels.PoolAddParameter pool: pool addparameter object
:rtype: list
@ -189,10 +189,10 @@ def _add_admin_user_to_compute_node(
"""Adds an administrative user to the Batch Compute Node with a default
expiry time of 7 days if not specified.
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
:param node: The compute node.
:type node: `batchserviceclient.models.ComputeNode`
:type node: `azure.batch.batch_service_client.models.ComputeNode`
:param str username: user name
:param str ssh_public_key: ssh rsa public key
"""
@ -227,7 +227,7 @@ def add_ssh_user(batch_client, config, nodes=None):
# List[batchmodels.ComputeNode]) -> None
"""Add an SSH user to node and optionally generate an SSH tunneling script
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
:param list nodes: list of nodes
"""
@ -283,7 +283,7 @@ def resize_pool(batch_client, config):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
"""Resize a pool
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
"""
pool_id = config['pool_specification']['id']
@ -302,7 +302,7 @@ def del_pool(batch_client, config):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
"""Delete a pool
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
"""
pool_id = config['pool_specification']['id']
@ -317,7 +317,7 @@ def del_node(batch_client, config, node_id):
# type: (batch.BatchServiceClient, dict, str) -> None
"""Delete a node in a pool
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
:param str node_id: node id to delete
"""
@ -340,7 +340,7 @@ def del_jobs(batch_client, config):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
"""Delete jobs
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
"""
for job in config['job_specifications']:
@ -356,7 +356,7 @@ def clean_mi_jobs(batch_client, config):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
"""Clean up multi-instance jobs
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
"""
for job in config['job_specifications']:
@ -421,7 +421,7 @@ def del_clean_mi_jobs(batch_client, config):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
"""Delete clean up multi-instance jobs
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
"""
for job in config['job_specifications']:
@ -438,7 +438,7 @@ def terminate_jobs(batch_client, config):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
"""Terminate jobs
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
"""
for job in config['job_specifications']:
@ -454,7 +454,7 @@ def del_all_jobs(batch_client, config):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
"""Delete all jobs
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
"""
logger.debug('Getting list of all jobs...')
@ -471,7 +471,7 @@ def get_remote_login_settings(batch_client, config, nodes=None):
# type: (batch.BatchServiceClient, dict, List[str], bool) -> dict
"""Get remote login settings
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
:param list nodes: list of nodes
:rtype: dict
@ -494,7 +494,7 @@ def stream_file_and_wait_for_task(batch_client, filespec=None):
# type: (batch.BatchServiceClient, str) -> None
"""Stream a file and wait for task to complete
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param str filespec: filespec (jobid:taskid:filename)
"""
if filespec is None:
@ -572,7 +572,7 @@ def get_file_via_task(batch_client, config, filespec=None):
# type: (batch.BatchServiceClient, dict, str) -> None
"""Get a file task style
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
:param str filespec: filespec (jobid:taskid:filename)
"""
@ -626,11 +626,71 @@ def get_file_via_task(batch_client, config, filespec=None):
file, job_id, task_id, fp.stat().st_size))
def get_all_files_via_task(batch_client, config, filespec=None):
    # type: (batch.BatchServiceClient, dict, str) -> None
    """Download every file produced by a task into ./<jobid>/<taskid>/
    :param batch_client: The batch client to use.
    :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
    :param dict config: configuration dict
    :param str filespec: filespec (jobid:taskid:filename)
    """
    # parse the filespec if supplied; otherwise prompt for missing parts
    job_id = task_id = None
    if filespec is not None:
        job_id, task_id = filespec.split(':')
    if job_id is None:
        job_id = convoy.util.get_input('Enter job id: ')
    if task_id is None:
        task_id = convoy.util.get_input('Enter task id: ')
    # resolve the @FIRSTRUNNING sentinel: poll until some task in the job
    # enters the running state, then take the first one listed
    if task_id == '@FIRSTRUNNING':
        logger.debug('attempting to get first running task in job {}'.format(
            job_id))
        while task_id == '@FIRSTRUNNING':
            running = batch_client.task.list(
                job_id,
                task_list_options=batchmodels.TaskListOptions(
                    filter='state eq \'running\'',
                ),
            )
            for running_task in running:
                task_id = running_task.id
                break
            if task_id == '@FIRSTRUNNING':
                time.sleep(1)
    # iterate through all files in task and download them
    logger.debug('downloading files to {}/{}'.format(job_id, task_id))
    task_files = batch_client.file.list_from_task(
        job_id, task_id, recursive=True)
    dirs_created = set()
    nfiles = 0
    for task_file in task_files:
        if task_file.is_directory:
            continue
        stream = batch_client.file.get_from_task(
            job_id, task_id, task_file.name)
        fp = pathlib.Path(job_id, task_id, task_file.name)
        # only mkdir once per distinct parent directory
        parent = str(fp.parent)
        if parent not in dirs_created:
            fp.parent.mkdir(mode=0o750, parents=True, exist_ok=True)
            dirs_created.add(parent)
        with fp.open('wb') as f:
            for data in stream:
                f.write(data)
        nfiles += 1
    if nfiles == 0:
        logger.error('no files found for task {} job {}'.format(
            task_id, job_id))
    else:
        logger.info('all task files retrieved from job={} task={}'.format(
            job_id, task_id))
def get_file_via_node(batch_client, config, node_id):
# type: (batch.BatchServiceClient, dict, str) -> None
"""Get a file node style
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
:param str nodeid: node id
"""
@ -657,12 +717,92 @@ def get_file_via_node(batch_client, config, node_id):
file, pool_id, node_id, fp.stat().st_size))
def list_jobs(batch_client, config):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
    """List all jobs under the Batch account
    :param batch_client: The batch client to use.
    :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
    :param dict config: configuration dict
    """
    njobs = 0
    for job in batch_client.job.list():
        logger.info('job_id={} [state={} pool_id={}]'.format(
            job.id, job.state, job.pool_info.pool_id))
        njobs += 1
    # emit an explicit error so an empty account is not mistaken for silence
    if njobs == 0:
        logger.error('no jobs found')
def list_tasks(batch_client, config):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
    """List tasks for specified jobs
    :param batch_client: The batch client to use.
    :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
    :param dict config: configuration dict
    """
    for job in config['job_specifications']:
        job_id = job['id']
        i = 0
        try:
            tasks = batch_client.task.list(job_id)
            for task in tasks:
                # node_info is None until the task has been scheduled to a
                # compute node; guard to avoid AttributeError on pending tasks
                ni = task.node_info
                logger.info(
                    'job_id={} task_id={} [display_name={} state={} '
                    'pool_id={} node_id={}]'.format(
                        job_id, task.id, task.display_name, task.state,
                        ni.pool_id if ni is not None else None,
                        ni.node_id if ni is not None else None))
                i += 1
        except batchmodels.batch_error.BatchErrorException as ex:
            if 'The specified job does not exist' in ex.message.value:
                logger.error('{} job does not exist'.format(job_id))
                continue
            # bugfix: re-raise unexpected batch errors instead of silently
            # swallowing them (previously fell through without any handling)
            raise
        if i == 0:
            logger.error('no tasks found for job {}'.format(job_id))
def list_task_files(batch_client, config):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
    """List task files for specified jobs
    :param batch_client: The batch client to use.
    :type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
    :param dict config: configuration dict
    """
    for job in config['job_specifications']:
        job_id = job['id']
        i = 0
        try:
            tasks = batch_client.task.list(job_id)
            for task in tasks:
                j = 0
                files = batch_client.file.list_from_task(
                    job_id, task.id, recursive=True)
                # directories are listed too; only report regular files
                for task_file in files:
                    if task_file.is_directory:
                        continue
                    logger.info(
                        'task_id={} file={} [job_id={} lmt={} '
                        'bytes={}]'.format(
                            task.id, task_file.name, job_id,
                            task_file.properties.last_modified,
                            task_file.properties.content_length))
                    j += 1
                if j == 0:
                    logger.error('no files found for task {} job {}'.format(
                        task.id, job_id))
                i += 1
        except batchmodels.batch_error.BatchErrorException as ex:
            if 'The specified job does not exist' in ex.message.value:
                logger.error('{} job does not exist'.format(job_id))
                continue
            # bugfix: re-raise unexpected batch errors instead of silently
            # swallowing them (previously fell through without any handling)
            raise
        if i == 0:
            logger.error('no tasks found for job {}'.format(job_id))
def add_jobs(batch_client, blob_client, config, jpfile):
# type: (batch.BatchServiceClient, azureblob.BlockBlobService,
# tuple, dict) -> None
"""Add jobs
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param dict config: configuration dict
:param tuple jpfile: jobprep file

Просмотреть файл

@ -67,7 +67,13 @@ cleaning up these types of jobs.
file.
* `streamfile`: stream a file from a live compute node.
* `gettaskfile`: retrieve a file with job id/task id from a live compute node.
* `gettaskallfiles`: retrieve all files with job id/task id from a live
compute node. `--filespec` can be used here but without the filename, e.g.,
`--filespec myjob:mytask`.
* `getnodefile`: retrieve a file with pool id/node id from a live compute node.
* `ingressdata`: ingress data as specified in the global configuration file.
* `listjobs`: list all jobs under the Batch account.
* `listtasks`: list tasks under jobs specified in the jobs configuration file.
* `clearstorage`: clear storage containers as specified in the configuration
files.
* `delstorage`: delete storage containers as specified in the configuration

Просмотреть файл

@ -564,7 +564,7 @@ def _setup_glusterfs(batch_client, blob_client, config, nodes):
# List[batchmodels.ComputeNode]) -> None
"""Setup glusterfs via multi-instance task
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param dict config: configuration dict
:param list nodes: list of nodes
@ -820,7 +820,9 @@ def main():
config['pool_specification'] = {
'id': args.poolid
}
if args.action in ('addjobs', 'cleanmijobs', 'deljobs', 'termjobs'):
if args.action in (
'addjobs', 'cleanmijobs', 'delcleanmijobs', 'deljobs',
'termjobs', 'listtasks', 'listtaskfiles'):
if args.configdir is not None and args.jobs is None:
args.jobs = str(pathlib.Path(args.configdir, 'jobs.json'))
try:
@ -882,10 +884,19 @@ def main():
convoy.batch.stream_file_and_wait_for_task(batch_client, args.filespec)
elif args.action == 'gettaskfile':
convoy.batch.get_file_via_task(batch_client, config, args.filespec)
elif args.action == 'gettaskallfiles':
convoy.batch.get_all_files_via_task(
batch_client, config, args.filespec)
elif args.action == 'getnodefile':
convoy.batch.get_file_via_node(batch_client, config, args.nodeid)
elif args.action == 'ingressdata':
convoy.data.ingress_data(batch_client, config)
elif args.action == 'listjobs':
convoy.batch.list_jobs(batch_client, config)
elif args.action == 'listtasks':
convoy.batch.list_tasks(batch_client, config)
elif args.action == 'listtaskfiles':
convoy.batch.list_task_files(batch_client, config)
elif args.action == 'delstorage':
convoy.storage.delete_storage_containers(
blob_client, queue_client, table_client, config)
@ -908,7 +919,9 @@ def parseargs():
parser.add_argument(
'action', help='addpool, addjobs, addsshuser, cleanmijobs, '
'termjobs, deljobs, delcleanmijobs, delalljobs, delpool, delnode, '
'grls, streamfile, gettaskfile, getnodefile, clearstorage, delstorage')
'grls, streamfile, gettaskfile, gettaskallfiles, getnodefile, '
'ingressdata, listjobs, listtasks, listtaskfiles, clearstorage, '
'delstorage')
parser.add_argument(
'-v', '--verbose', dest='verbose', action='store_true',
help='verbose output')