From 33300c551ccf1093c8600d847909a0b8a8dc7b85 Mon Sep 17 00:00:00 2001 From: Fred Park Date: Fri, 14 Oct 2016 13:59:19 -0700 Subject: [PATCH] Add pool/job/task-level data ingress support --- CHANGELOG.md | 15 +++- config_templates/jobs.json | 22 +++++ config_templates/pool.json | 11 +++ convoy/batch.py | 41 ++++++--- convoy/data.py | 91 ++++++++++++++++++++ convoy/storage.py | 20 +++++ docs/10-batch-shipyard-configuration.md | 105 ++++++++++++++++++++++++ docs/20-batch-shipyard-usage.md | 3 +- scripts/shipyard_blobingress.sh | 24 ++++++ scripts/shipyard_nodeprep.sh | 5 +- shipyard.py | 17 +++- 11 files changed, 336 insertions(+), 18 deletions(-) create mode 100755 scripts/shipyard_blobingress.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index c015a63..82d4acd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,19 @@ ## [Unreleased] ### Added -- Data ingress support to GlusterFS and Azure Blob Storage. Please see the -configuration doc for more information. +- Comprehensive data movement support. Please see the configuration doc for +more information. + - Ingress from local machine with `files` in global configuration + - To GlusterFS shared volume + - To Azure Blob Storage + - Ingress from Azure Blob Storage with `input_data` in pool and jobs + configuration + - Pool-level: to compute nodes + - Job-level: to compute nodes running the specified job + - Task-level: to compute nodes running a task of a job + - Egress to local machine as actions + - Single file from compute node + - Whole task-level directories from compute node - Experimental support for OpenSSH HPN on Ubuntu - Additional actions: `ingressdata`, `gettaskallfiles`, `listjobs`, `listtasks`. Please see the usage doc for more information. diff --git a/config_templates/jobs.json b/config_templates/jobs.json index f4c980b..0f0e007 100644 --- a/config_templates/jobs.json +++ b/config_templates/jobs.json @@ -6,6 +6,17 @@ "environment_variables": { "abc": "xyz" }, + "input_data": { + "azure_blob": [ + { + "storage_account_settings": "mystorageaccount", + "container": "jobcontainer", + "include": ["jobdata*.bin"], + "destination": "$AZ_BATCH_NODE_SHARED_DIR/jobdata", + "blobxfer_extra_options": null + } + ] + }, "tasks": [ { "id": null, @@ -33,6 +44,17 @@ "file_mode": "" } ], + "input_data": { + "azure_blob": [ + { + "storage_account_settings": "mystorageaccount", + "container": "taskcontainer", + "include": ["taskdata*.bin"], + "destination": "$AZ_BATCH_NODE_SHARED_DIR/taskdata", + "blobxfer_extra_options": null + } + ] + }, "entrypoint": null, "command": "", "infiniband": false, diff --git a/config_templates/pool.json b/config_templates/pool.json index d63e710..e010d91 100644 --- a/config_templates/pool.json +++ b/config_templates/pool.json @@ -11,6 +11,17 @@ "reboot_on_start_task_failed": true, "block_until_all_global_resources_loaded": true, "transfer_files_on_pool_creation": false, + "input_data": { + "azure_blob": [ + { + "storage_account_settings": "mystorageaccount", + "container": "poolcontainer", + "include": ["pooldata*.bin"], + "destination": "$AZ_BATCH_NODE_SHARED_DIR/pooldata", + "blobxfer_extra_options": null + } + ] + }, "ssh": { "username": "docker", "expiry_days": 7, diff --git a/convoy/batch.py b/convoy/batch.py index 8979b10..e831f4d 100644 --- a/convoy/batch.py +++ b/convoy/batch.py @@ -36,6 +36,7 @@ import time # non-stdlib imports import azure.batch.models as batchmodels # local imports +import convoy.data import convoy.storage import convoy.util @@ -819,15 +820,16 @@ def list_task_files(batch_client, config): 
logger.error('no tasks found for job {}'.format(job['id'])) -def add_jobs(batch_client, blob_client, config, jpfile): +def add_jobs(batch_client, blob_client, config, jpfile, bifile): # type: (batch.BatchServiceClient, azureblob.BlockBlobService, - # tuple, dict) -> None + # dict, tuple, tuple) -> None """Add jobs :param batch_client: The batch client to use. :type batch_client: `azure.batch.batch_service_client.BatchServiceClient` :param azure.storage.blob.BlockBlobService blob_client: blob client :param dict config: configuration dict :param tuple jpfile: jobprep file + :param tuple bifile: blob ingress file """ # get the pool inter-node comm setting pool_id = config['pool_specification']['id'] @@ -835,11 +837,17 @@ def add_jobs(batch_client, blob_client, config, jpfile): global_resources = [] for gr in config['global_resources']['docker_images']: global_resources.append(gr) - # TODO add global resources for non-docker resources - jpcmdline = convoy.util.wrap_commands_in_shell([ - '$AZ_BATCH_NODE_SHARED_DIR/{} {}'.format( - jpfile[0], ' '.join(global_resources))]) + jpcmd = ['$AZ_BATCH_NODE_SHARED_DIR/{} {}'.format( + jpfile[0], ' '.join(global_resources))] for jobspec in config['job_specifications']: + # digest any input_data + addlcmds = convoy.data.process_input_data( + blob_client, config, bifile, jobspec) + if addlcmds is not None: + jpcmd.append(addlcmds) + del addlcmds + jpcmdline = convoy.util.wrap_commands_in_shell(jpcmd) + del jpcmd job = batchmodels.JobAddParameter( id=jobspec['id'], pool_info=batchmodels.PoolInformation(pool_id=pool_id), @@ -1244,6 +1252,12 @@ def add_jobs(batch_client, blob_client, config, jpfile): image, '{}'.format(' ' + command) if command else '') ] + # digest any input_data + addlcmds = convoy.data.process_input_data( + blob_client, config, bifile, task, on_task=True) + if addlcmds is not None: + task_commands.insert(0, addlcmds) + del addlcmds # create task batchtask = batchmodels.TaskAddParameter( id=task_id, @@ -1285,12 +1299,15 @@ def add_jobs(batch_client, blob_client, config, jpfile): task_ids=task['depends_on'] ) # create task - logger.info('Adding task {}: {}'.format( - task_id, batchtask.command_line)) - if mis is not None: - logger.info( - 'multi-instance task coordination command: {}'.format( - mis.coordination_command_line)) + if config['_verbose']: + if mis is not None: + logger.info( + 'Multi-instance task coordination command: {}'.format( + mis.coordination_command_line)) + logger.info('Adding task: {} command: {}'.format( + task_id, batchtask.command_line)) + else: + logger.info('Adding task: {}'.format(task_id)) batch_client.task.add(job_id=job.id, task=batchtask) # update job if job autocompletion is needed if set_terminate_on_all_tasks_complete: diff --git a/convoy/data.py b/convoy/data.py index b03f558..5a581d5 100644 --- a/convoy/data.py +++ b/convoy/data.py @@ -41,6 +41,7 @@ import threading import time # local imports import convoy.batch +import convoy.storage import convoy.util # create logger @@ -52,6 +53,94 @@ _MAX_READ_BLOCKSIZE_BYTES = 4194304 _FILE_SPLIT_PREFIX = '_shipyard-' +def _process_blob_input_data(blob_client, config, input_data, on_task): + # type: (azure.storage.blob.BlockBlobService, dict, dict, bool) -> str + """Process blob input data to ingress + :param azure.storage.blob.BlockBlobService blob_client: blob client + :param dict config: configuration dict + :param dict spec: config spec with input_data + :param bool on_task: if this is originating from a task spec + :rtype: list + :return: args to pass to blob 
ingress script
+    """
+    args = []
+    for xfer in input_data:
+        storage_settings = config['credentials']['storage'][
+            xfer['storage_account_settings']]
+        container = xfer['container']
+        try:
+            include = xfer['include']
+            if include is not None:
+                if len(include) == 0:
+                    include = ''
+                elif len(include) > 1:
+                    raise ValueError(
+                        'include for input_data from {}:{} cannot exceed '
+                        '1 filter'.format(
+                            xfer['storage_account_settings'], container))
+            else:
+                include = ''
+        except KeyError:
+            include = ''
+        try:
+            dst = xfer['destination']
+        except KeyError:
+            if on_task:
+                dst = None
+            else:
+                raise
+        if on_task and (dst is None or len(dst) == 0):
+            dst = '$AZ_BATCH_TASK_WORKING_DIR'
+        try:
+            eo = xfer['blobxfer_extra_options']
+            if eo is None:
+                eo = ''
+        except KeyError:
+            eo = ''
+        # create saskey for container with 7day expiry with rl perm
+        saskey = convoy.storage.create_blob_container_rl_saskey(
+            storage_settings, container)
+        # construct argument
+        # sa:ep:saskey:container:include:eo:dst
+        args.append('"{}:{}:{}:{}:{}:{}:{}"'.format(
+            storage_settings['account'], storage_settings['endpoint'],
+            saskey, container, include, eo, dst))
+    return args
+
+
+def process_input_data(blob_client, config, bifile, spec, on_task=False):
+    # type: (azure.storage.blob.BlockBlobService, dict, tuple, dict,
+    #        bool) -> str
+    """Process input data to ingress
+    :param azure.storage.blob.BlockBlobService blob_client: blob client
+    :param dict config: configuration dict
+    :param tuple bifile: blob ingress script
+    :param dict spec: config spec with input_data
+    :param bool on_task: if this is originating from a task spec
+    :rtype: str
+    :return: additional command
+    """
+    ret = None
+    try:
+        input_data = spec['input_data']
+        if input_data is not None and len(input_data) > 0:
+            for key in input_data:
+                if key == 'azure_batch':
+                    # TODO implement compute node ingress
+                    raise NotImplementedError()
+                elif key == 'azure_blob':
+                    blobargs = _process_blob_input_data(
+                        blob_client, config, input_data[key], on_task)
+                    ret = '$AZ_BATCH_NODE_SHARED_DIR/{} {}'.format(
+                        bifile[0], ' '.join(blobargs))
+                else:
+                    raise ValueError(
+                        'unknown input_data method: {}'.format(key))
+    except KeyError:
+        pass
+    return ret
+
+
 def _singlenode_transfer(
         method, src, dst, username, ssh_private_key, rls, eo, reo):
     # type: (str, str, str, pathlib.Path, dict, str, str) -> None
@@ -592,6 +681,8 @@ def ingress_data(batch_client, config, rls=None, kind=None):
                 raise RuntimeError(
                     'ssh private key does not exist at: {}'.format(
                         ssh_private_key))
+            logger.debug('using ssh_private_key from: {}'.format(
+                ssh_private_key))
             # convert shared to actual path
             shared_data_volumes = config['global_resources'][
                 'docker_volumes']['shared_data_volumes']
diff --git a/convoy/storage.py b/convoy/storage.py
index 294f614..cdddcb2 100644
--- a/convoy/storage.py
+++ b/convoy/storage.py
@@ -165,6 +165,26 @@ def create_clients():
     return blob_client, queue_client, table_client
 
 
+def create_blob_container_rl_saskey(storage_settings, container):
+    # type: (dict, str) -> str
+    """Create a saskey for a blob container with a 7day expiry time and rl perm
+    :param dict storage_settings: storage settings
+    :param str container: container
+    :rtype: str
+    :return: saskey
+    """
+    blob_client = azureblob.BlockBlobService(
+        account_name=storage_settings['account'],
+        account_key=storage_settings['account_key'],
+        endpoint_suffix=storage_settings['endpoint'])
+    return blob_client.generate_container_shared_access_signature(
+        container,
+        azureblob.ContainerPermissions.READ |
+ azureblob.ContainerPermissions.LIST, + expiry=datetime.datetime.utcnow() + datetime.timedelta(days=7) + ) + + def _add_global_resource( queue_client, table_client, config, pk, p2pcsd, grtype): # type: (azurequeue.QueueService, azuretable.TableService, dict, str, diff --git a/docs/10-batch-shipyard-configuration.md b/docs/10-batch-shipyard-configuration.md index d476e2c..e7f7569 100644 --- a/docs/10-batch-shipyard-configuration.md +++ b/docs/10-batch-shipyard-configuration.md @@ -369,6 +369,17 @@ The pool schema is as follows: "reboot_on_start_task_failed": true, "block_until_all_global_resources_loaded": true, "transfer_files_on_pool_creation": false, + "input_data": { + "azure_blob": [ + { + "storage_account_settings": "mystorageaccount", + "container": "poolcontainer", + "include": ["pooldata*.bin"], + "destination": "$AZ_BATCH_NODE_SHARED_DIR/pooldata", + "blobxfer_extra_options": null + } + ] + }, "ssh": { "username": "docker", "expiry_days": 7, @@ -420,6 +431,32 @@ system is ready. Files can be ingressed to both Azure Blob Storage and a shared file system during the same pool creation invocation. If this property is set to `true` then `block_until_all_global_resources_loaded` will be force disabled. If omitted, this property defaults to `false`. +* (optional) `input_data` is an object containing data that should be +ingressed to all compute nodes as part of node preparation. It is +important to note that if you are combining this action with `files` and +are ingressing data to Blob storage as part of pool creation, that the blob +containers defined here will be downloaded as soon as the compute node is +ready to do so. This may result in the blob container/blobs not being ready +in time for the `input_data` transfer. It is up to you to ensure that these +two operations do not overlap. If there is a possibility of overlap, then you +should ingress data defined in `files` prior to pool creation and disable +the option above `transfer_files_on_pool_creation`. This object currently only +supports `azure_blob` as a member. + * `azure_blob` contains the following members: + * (required) `storage_account_settings` contains a storage account link + as defined in the credentials json. + * (required) `container` the container to transfer. + * (optional) `include` property defines an optional include filter. + Although this property is an array, it is only allowed to have 1 + maximum filter. + * (required) `destination` property defines where to place the + downloaded files on the host file system. Please note that you should + not specify a destination that is on a shared file system. If you + require ingressing to a shared file system location like a GlusterFS + volume, then use the global configuration `files` property and the + `ingressdata` action. + * (optional) `blobxfer_extra_options` are any extra options to pass to + `blobxfer`. * (optional) `ssh` is the property for creating a user to accomodate SSH sessions to compute nodes. If this property is absent, then an SSH user is not created with pool creation. 
@@ -460,6 +497,17 @@ The jobs schema is as follows: "environment_variables": { "abc": "xyz" }, + "input_data": { + "azure_blob": [ + { + "storage_account_settings": "mystorageaccount", + "container": "jobcontainer", + "include": ["jobdata*.bin"], + "destination": "$AZ_BATCH_NODE_SHARED_DIR/jobdata", + "blobxfer_extra_options": null + } + ] + }, "tasks": [ { "id": null, @@ -486,6 +534,17 @@ The jobs schema is as follows: "file_mode": "" } ], + "input_data": { + "azure_blob": [ + { + "storage_account_settings": "mystorageaccount", + "container": "taskcontainer", + "include": ["taskdata*.bin"], + "destination": "$AZ_BATCH_NODE_SHARED_DIR/taskdata", + "blobxfer_extra_options": null + } + ] + }, "remove_container_after_exit": true, "additional_docker_run_options": [ ], @@ -520,6 +579,31 @@ Docker container in multi-instance tasks. This is defaulted to `true` when multi-instance tasks are specified. * (optional) `environment_variables` under the job are environment variables which will be applied to all tasks operating under the job. +* (optional) `input_data` is an object containing data that should be +ingressed for the job. Any `input_data` defined at this level will be +downloaded for this job which can be run on any number of compute nodes +depending upon the number of constituent tasks and repeat invocations. However, +`input_data` is only downloaded once per job invocation on a compute node. +For example, if `job-1`:`task-1` is run on compute node A and then +`job-1`:`task-2` is run on compute node B, then this `input_data` is ingressed +to both compute node A and B. However, if `job-1`:`task-3` is run on +compute node A, then the `input_data` is not transferred again. This object +currently only supports `azure_blob` as a member. + * `azure_blob` contains the following members: + * (required) `storage_account_settings` contains a storage account link + as defined in the credentials json. + * (required) `container` the container to transfer. + * (optional) `include` property defines an optional include filter. + Although this property is an array, it is only allowed to have 1 + maximum filter. + * (required) `destination` property defines where to place the + downloaded files on the host file system. Please note that you should + not specify a destination that is on a shared file system. If you + require ingressing to a shared file system location like a GlusterFS + volume, then use the global configuration `files` property and the + `ingressdata` action. + * (optional) `blobxfer_extra_options` are any extra options to pass to + `blobxfer`. * (required) `tasks` is an array of tasks to add to the job. * (optional) `id` is the task id. Note that if the task `id` is null or empty then a generic task id will be assigned. The generic task id is @@ -551,6 +635,27 @@ which will be applied to all tasks operating under the job. Blob Storage URL. * `file_mode` if the file mode to set for the file on the compute node. This is optional. + * (optional) `input_data` is an object containing data that should be + ingressed for this specific task. This object currently only supports + `azure_blob` as a member. + * `azure_blob` contains the following members: + * (required) `storage_account_settings` contains a storage account link + as defined in the credentials json. + * (required) `container` the container to transfer. + * (optional) `include` property defines an optional include filter. + Although this property is an array, it is only allowed to have 1 + maximum filter. 
+ * (optional) `destination` property defines where to place the + downloaded files on the host file system. Unlike the job-level + version of `input_data`, this `destination` property can be ommitted. + If `destination` is not specified at this level, then files are + defaulted to download into `$AZ_BATCH_TASK_WORKING_DIR`. Please note + that you should not specify a destination that is on a shared file + system. If you require ingressing to a shared file system location + like a GlusterFS volume, then use the global configuration `files` + property and the `ingressdata` action. + * (optional) `blobxfer_extra_options` are any extra options to pass to + `blobxfer`. * (optional) `remove_container_after_exit` property specifies if the container should be automatically removed/cleaned up after it exits. This defaults to `false`. diff --git a/docs/20-batch-shipyard-usage.md b/docs/20-batch-shipyard-usage.md index 09c646e..dffe34d 100644 --- a/docs/20-batch-shipyard-usage.md +++ b/docs/20-batch-shipyard-usage.md @@ -71,7 +71,8 @@ file. compute node. `--filespec` can be used here but without the filename, e.g., `--filespec myjob:mytask`. * `getnodefile`: retrieve a file with pool id/node id from a live compute node. -* `ingressdata`: ingress data as specified in the global configuration file. +* `ingressdata`: ingress data as specified in the `files` property of the +global configuration file. * `listjobs`: list all jobs under the Batch account. * `listtasks`: list tasks under jobs specified in the jobs configuraiton file. * `clearstorage`: clear storage containers as specified in the configuration diff --git a/scripts/shipyard_blobingress.sh b/scripts/shipyard_blobingress.sh new file mode 100755 index 0000000..816d6b0 --- /dev/null +++ b/scripts/shipyard_blobingress.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +set -e +set -o pipefail + +for spec in "$@"; do + # sa:ep:saskey:container:include:eo:dst + IFS=':' read -ra parts <<< "$spec" + sa=${parts[0]} + ep=${parts[1]} + saskey=${parts[2]} + container=${parts[3]} + incl=${parts[4]} + eo=${parts[5]} + dst=${parts[6]} + include= + if [ ! -z $incl ]; then + include="--include $incl" + fi + # create destination directory + mkdir -p $dst + # ingress data from blobs + docker run --rm -t -v $dst:/blobxfer -w /blobxfer alfpark/blobxfer $sa $container . --saskey $saskey --remoteresource . --download --no-progressbar $include $eo +done diff --git a/scripts/shipyard_nodeprep.sh b/scripts/shipyard_nodeprep.sh index 3a92b3f..8b4dca1 100755 --- a/scripts/shipyard_nodeprep.sh +++ b/scripts/shipyard_nodeprep.sh @@ -171,7 +171,7 @@ EOF fi # copy required shell scripts to shared -cp docker_jp_block.sh $AZ_BATCH_NODE_SHARED_DIR +cp docker_jp_block.sh shipyard_blobingress.sh $AZ_BATCH_NODE_SHARED_DIR # install docker host engine if [ $offer == "ubuntuserver" ] || [ $offer == "debian" ]; then @@ -564,6 +564,9 @@ if [ $p2penabled -eq 0 ]; then wait fi +# retrieve blobxfer docker image to assist with data ingress from blob +docker pull alfpark/blobxfer + # block until images ready if specified if [ ! 
-z $block ]; then echo "blocking until images ready: $block" diff --git a/shipyard.py b/shipyard.py index 7cafb12..f862b20 100755 --- a/shipyard.py +++ b/shipyard.py @@ -76,6 +76,9 @@ _NODEPREP_FILE = ('shipyard_nodeprep.sh', 'scripts/shipyard_nodeprep.sh') _GLUSTERPREP_FILE = ('shipyard_glusterfs.sh', 'scripts/shipyard_glusterfs.sh') _HPNSSH_FILE = ('shipyard_hpnssh.sh', 'scripts/shipyard_hpnssh.sh') _JOBPREP_FILE = ('docker_jp_block.sh', 'scripts/docker_jp_block.sh') +_BLOBINGRESS_FILE = ( + 'shipyard_blobingress.sh', 'scripts/shipyard_blobingress.sh' +) _CASCADE_FILE = ('cascade.py', 'cascade/cascade.py') _SETUP_PR_FILE = ( 'setup_private_registry.py', 'cascade/setup_private_registry.py' @@ -419,7 +422,7 @@ def add_pool(batch_client, blob_client, config): except KeyError: prefix = None # create resource files list - _rflist = [_NODEPREP_FILE, _JOBPREP_FILE, regfile] + _rflist = [_NODEPREP_FILE, _JOBPREP_FILE, _BLOBINGRESS_FILE, regfile] if not use_shipyard_docker_image: _rflist.append(_CASCADE_FILE) _rflist.append(_SETUP_PR_FILE) @@ -478,11 +481,18 @@ def add_pool(batch_client, blob_client, config): ' -w' if hpnssh else '', ), ] + # add additional start task commands try: start_task.extend( config['pool_specification']['additional_node_prep_commands']) except KeyError: pass + # digest any input_data + addlcmds = convoy.data.process_input_data( + blob_client, config, _BLOBINGRESS_FILE, config['pool_specification']) + if addlcmds is not None: + start_task.append(addlcmds) + del addlcmds # create pool param pool = batchmodels.PoolAddParameter( id=config['pool_specification']['id'], @@ -842,6 +852,7 @@ def main(): }] if args.verbose: logger.debug('config:\n' + json.dumps(config, indent=4)) + config['_verbose'] = args.verbose _populate_global_settings(config, args.action) config['_auto_confirm'] = args.yes @@ -873,7 +884,9 @@ def main(): elif args.action == 'delnode': convoy.batch.del_node(batch_client, config, args.nodeid) elif args.action == 'addjobs': - convoy.batch.add_jobs(batch_client, blob_client, config, _JOBPREP_FILE) + convoy.batch.add_jobs( + batch_client, blob_client, config, _JOBPREP_FILE, + _BLOBINGRESS_FILE) elif args.action == 'cleanmijobs': convoy.batch.clean_mi_jobs(batch_client, config) elif args.action == 'termjobs':
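
To summarize how the pieces above connect, here is a rough usage sketch of `convoy.data.process_input_data` at the three supported levels. The wrapper `collect_ingress_commands` is hypothetical, and `blob_client`/`config` are assumed to be constructed as in `shipyard.py`; only the `process_input_data` calls mirror the patch:

```python
# Hypothetical walkthrough of the three ingress levels wired up above;
# it mirrors the calls made in add_pool and add_jobs.
import convoy.data

_BLOBINGRESS_FILE = (
    'shipyard_blobingress.sh', 'scripts/shipyard_blobingress.sh'
)


def collect_ingress_commands(blob_client, config):
    """Return the generated blob ingress command strings, if any."""
    cmds = []
    # pool level: appended to the pool start task in add_pool
    cmds.append(convoy.data.process_input_data(
        blob_client, config, _BLOBINGRESS_FILE,
        config['pool_specification']))
    for jobspec in config['job_specifications']:
        # job level: appended to the job preparation command in add_jobs
        cmds.append(convoy.data.process_input_data(
            blob_client, config, _BLOBINGRESS_FILE, jobspec))
        for task in jobspec['tasks']:
            # task level: prepended to the task command line; on_task=True
            # defaults destination to $AZ_BATCH_TASK_WORKING_DIR if omitted
            cmds.append(convoy.data.process_input_data(
                blob_client, config, _BLOBINGRESS_FILE, task, on_task=True))
    # each entry is either None (no input_data in the spec) or a string of
    # the form '$AZ_BATCH_NODE_SHARED_DIR/shipyard_blobingress.sh "sa:ep:..."'
    return [c for c in cmds if c is not None]
```

Each returned command string is either appended to the pool start task, appended to the job preparation command, or prepended to the task command line, exactly as wired up in `add_pool` and `add_jobs` above.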