Add pool/job/task-level data ingress support

This commit is contained in:
Fred Park 2016-10-14 13:59:19 -07:00
Parent 9d0c6d3ca6
Commit 33300c551c
11 changed files with 336 additions and 18 deletions

View file

@ -2,8 +2,19 @@
## [Unreleased]
### Added
- Data ingress support to GlusterFS and Azure Blob Storage. Please see the
  configuration doc for more information.
- Comprehensive data movement support. Please see the configuration doc for
  more information.
  - Ingress from local machine with `files` in global configuration
    - To GlusterFS shared volume
    - To Azure Blob Storage
  - Ingress from Azure Blob Storage with `input_data` in pool and jobs
    configuration
    - Pool-level: to compute nodes
    - Job-level: to compute nodes running the specified job
    - Task-level: to compute nodes running a task of a job
  - Egress to local machine as actions
    - Single file from compute node
    - Whole task-level directories from compute node
- Experimental support for OpenSSH HPN on Ubuntu
- Additional actions: `ingressdata`, `gettaskallfiles`, `listjobs`,
  `listtasks`. Please see the usage doc for more information.

View file

@ -6,6 +6,17 @@
"environment_variables": {
"abc": "xyz"
},
"input_data": {
"azure_blob": [
{
"storage_account_settings": "mystorageaccount",
"container": "jobcontainer",
"include": ["jobdata*.bin"],
"destination": "$AZ_BATCH_NODE_SHARED_DIR/jobdata",
"blobxfer_extra_options": null
}
]
},
"tasks": [
{
"id": null,
@ -33,6 +44,17 @@
"file_mode": ""
}
],
"input_data": {
"azure_blob": [
{
"storage_account_settings": "mystorageaccount",
"container": "taskcontainer",
"include": ["taskdata*.bin"],
"destination": "$AZ_BATCH_NODE_SHARED_DIR/taskdata",
"blobxfer_extra_options": null
}
]
},
"entrypoint": null,
"command": "",
"infiniband": false,

View file

@ -11,6 +11,17 @@
"reboot_on_start_task_failed": true,
"block_until_all_global_resources_loaded": true,
"transfer_files_on_pool_creation": false,
"input_data": {
"azure_blob": [
{
"storage_account_settings": "mystorageaccount",
"container": "poolcontainer",
"include": ["pooldata*.bin"],
"destination": "$AZ_BATCH_NODE_SHARED_DIR/pooldata",
"blobxfer_extra_options": null
}
]
},
"ssh": {
"username": "docker",
"expiry_days": 7,

View file

@ -36,6 +36,7 @@ import time
# non-stdlib imports
import azure.batch.models as batchmodels
# local imports
import convoy.data
import convoy.storage
import convoy.util
@ -819,15 +820,16 @@ def list_task_files(batch_client, config):
logger.error('no tasks found for job {}'.format(job['id']))
def add_jobs(batch_client, blob_client, config, jpfile):
def add_jobs(batch_client, blob_client, config, jpfile, bifile):
# type: (batch.BatchServiceClient, azureblob.BlockBlobService,
# tuple, dict) -> None
# dict, tuple, tuple) -> None
"""Add jobs
:param batch_client: The batch client to use.
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param dict config: configuration dict
:param tuple jpfile: jobprep file
:param tuple bifile: blob ingress file
"""
# get the pool inter-node comm setting
pool_id = config['pool_specification']['id']
@ -835,11 +837,17 @@ def add_jobs(batch_client, blob_client, config, jpfile):
global_resources = []
for gr in config['global_resources']['docker_images']:
global_resources.append(gr)
# TODO add global resources for non-docker resources
jpcmdline = convoy.util.wrap_commands_in_shell([
'$AZ_BATCH_NODE_SHARED_DIR/{} {}'.format(
jpfile[0], ' '.join(global_resources))])
jpcmd = ['$AZ_BATCH_NODE_SHARED_DIR/{} {}'.format(
jpfile[0], ' '.join(global_resources))]
for jobspec in config['job_specifications']:
# digest any input_data
addlcmds = convoy.data.process_input_data(
blob_client, config, bifile, jobspec)
if addlcmds is not None:
jpcmd.append(addlcmds)
del addlcmds
jpcmdline = convoy.util.wrap_commands_in_shell(jpcmd)
del jpcmd
job = batchmodels.JobAddParameter(
id=jobspec['id'],
pool_info=batchmodels.PoolInformation(pool_id=pool_id),
@ -1244,6 +1252,12 @@ def add_jobs(batch_client, blob_client, config, jpfile):
image,
'{}'.format(' ' + command) if command else '')
]
# digest any input_data
addlcmds = convoy.data.process_input_data(
blob_client, config, bifile, task, on_task=True)
if addlcmds is not None:
task_commands.insert(0, addlcmds)
del addlcmds
# create task
batchtask = batchmodels.TaskAddParameter(
id=task_id,
@ -1285,12 +1299,15 @@ def add_jobs(batch_client, blob_client, config, jpfile):
task_ids=task['depends_on']
)
# create task
logger.info('Adding task {}: {}'.format(
task_id, batchtask.command_line))
if mis is not None:
logger.info(
'multi-instance task coordination command: {}'.format(
mis.coordination_command_line))
if config['_verbose']:
if mis is not None:
logger.info(
'Multi-instance task coordination command: {}'.format(
mis.coordination_command_line))
logger.info('Adding task: {} command: {}'.format(
task_id, batchtask.command_line))
else:
logger.info('Adding task: {}'.format(task_id))
batch_client.task.add(job_id=job.id, task=batchtask)
# update job if job autocompletion is needed
if set_terminate_on_all_tasks_complete:

View file

@ -41,6 +41,7 @@ import threading
import time
# local imports
import convoy.batch
import convoy.storage
import convoy.util
# create logger
@ -52,6 +53,94 @@ _MAX_READ_BLOCKSIZE_BYTES = 4194304
_FILE_SPLIT_PREFIX = '_shipyard-'
def _process_blob_input_data(blob_client, config, input_data, on_task):
# type: (azure.storage.blob.BlockBlobService, dict, list, bool) -> list
"""Process blob input data to ingress
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param dict config: configuration dict
:param list input_data: list of azure_blob input data specs
:param bool on_task: if this is originating from a task spec
:rtype: list
:return: args to pass to blob ingress script
"""
args = []
for xfer in input_data:
storage_settings = config['credentials']['storage'][
xfer['storage_account_settings']]
container = xfer['container']
try:
include = xfer['include']
if include is None or len(include) == 0:
include = ''
elif len(include) > 1:
raise ValueError(
'include for input_data from {}:{} cannot exceed '
'1 filter'.format(
xfer['storage_account_settings'], container))
else:
# pass the single include filter to the ingress script as a plain string
include = include[0]
except KeyError:
include = ''
try:
dst = xfer['destination']
except KeyError:
if on_task:
dst = None
else:
raise
if on_task and (dst is None or len(dst) == 0):
dst = '$AZ_BATCH_TASK_WORKING_DIR'
try:
eo = xfer['blobxfer_extra_options']
if eo is None:
eo = ''
except KeyError:
eo = ''
# create saskey for container with 7day expiry with rl perm
saskey = convoy.storage.create_blob_container_rl_saskey(
storage_settings, container)
# construct argument
# sa:ep:saskey:container:include:eo:dst
args.append('"{}:{}:{}:{}:{}:{}:{}"'.format(
storage_settings['account'], storage_settings['endpoint'],
saskey, container, include, eo, dst))
return args
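# Illustrative note (not part of this commit): with hypothetical values, a
# single constructed argument resembles
# "mysa:core.windows.net:<saskey>:poolcontainer:pooldata*.bin::$AZ_BATCH_NODE_SHARED_DIR/pooldata"
# where the empty field between the last two colons is an omitted
# blobxfer_extra_options value; shipyard_blobingress.sh splits each argument
# on ':' in the same sa:ep:saskey:container:include:eo:dst order.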
def process_input_data(blob_client, config, bifile, spec, on_task=False):
# type: (azure.storage.blob.BlockBlobService, dict, tuple, dict,
# bool) -> str
"""Process input data to ingress
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param dict config: configuration dict
:param tuple bifile: blob ingress script
:param dict spec: config spec with input_data
:param bool on_task: if this is originating from a task spec
:rtype: str
:return: additional command
"""
ret = None
try:
input_data = spec['input_data']
if input_data is not None and len(input_data) > 0:
for key in input_data:
if key == 'azure_batch':
# TODO implement compute node ingress
raise NotImplementedError()
elif key == 'azure_blob':
blobargs = _process_blob_input_data(
blob_client, config, input_data[key], on_task)
ret = '$AZ_BATCH_NODE_SHARED_DIR/{} {}'.format(
bifile[0], ' '.join(blobargs))
else:
raise ValueError(
'unknown input_data method: {}'.format(key))
except KeyError:
pass
return ret
def _singlenode_transfer(
method, src, dst, username, ssh_private_key, rls, eo, reo):
# type: (str, str, str, pathlib.Path, dict, str, str) -> None
@ -592,6 +681,8 @@ def ingress_data(batch_client, config, rls=None, kind=None):
raise RuntimeError(
'ssh private key does not exist at: {}'.format(
ssh_private_key))
logger.debug('using ssh_private_key from: {}'.format(
ssh_private_key))
# convert shared to actual path
shared_data_volumes = config['global_resources'][
'docker_volumes']['shared_data_volumes']

View file

@ -165,6 +165,26 @@ def create_clients():
return blob_client, queue_client, table_client
def create_blob_container_rl_saskey(storage_settings, container):
# type: (dict, str) -> str
"""Create a saskey for a blob container with a 7-day expiry and read/list permissions
:param dict storage_settings: storage settings
:param str container: container
:rtype: str
:return: saskey
"""
blob_client = azureblob.BlockBlobService(
account_name=storage_settings['account'],
account_key=storage_settings['account_key'],
endpoint_suffix=storage_settings['endpoint'])
return blob_client.generate_container_shared_access_signature(
container,
azureblob.ContainerPermissions.READ |
azureblob.ContainerPermissions.LIST,
expiry=datetime.datetime.utcnow() + datetime.timedelta(days=7)
)
def _add_global_resource(
queue_client, table_client, config, pk, p2pcsd, grtype):
# type: (azurequeue.QueueService, azuretable.TableService, dict, str,

View file

@ -369,6 +369,17 @@ The pool schema is as follows:
"reboot_on_start_task_failed": true,
"block_until_all_global_resources_loaded": true,
"transfer_files_on_pool_creation": false,
"input_data": {
"azure_blob": [
{
"storage_account_settings": "mystorageaccount",
"container": "poolcontainer",
"include": ["pooldata*.bin"],
"destination": "$AZ_BATCH_NODE_SHARED_DIR/pooldata",
"blobxfer_extra_options": null
}
]
},
"ssh": {
"username": "docker",
"expiry_days": 7,
@ -420,6 +431,32 @@ system is ready. Files can be ingressed to both Azure Blob Storage and a
shared file system during the same pool creation invocation. If this property
is set to `true` then `block_until_all_global_resources_loaded` will be force
disabled. If omitted, this property defaults to `false`.
* (optional) `input_data` is an object containing data that should be
ingressed to all compute nodes as part of node preparation. Note that if you
are combining this with `files` and are ingressing data to Azure Blob Storage
as part of pool creation, the blob containers defined here will be downloaded
as soon as the compute node is ready to do so, which may be before the
containers/blobs uploaded via `files` are complete. It is up to you to ensure
that these two operations do not overlap. If there is a possibility of
overlap, ingress the data defined in `files` prior to pool creation and
disable the `transfer_files_on_pool_creation` option above. This object
currently only supports `azure_blob` as a member; a sketch of the
corresponding credentials entry follows this excerpt.
* `azure_blob` contains the following members:
* (required) `storage_account_settings` contains a storage account link
as defined in the credentials json.
* (required) `container` the container to transfer.
* (optional) `include` property defines an optional include filter.
Although this property is an array, it may contain at most one filter.
* (required) `destination` property defines where to place the
downloaded files on the host file system. Please note that you should
not specify a destination that is on a shared file system. If you
require ingressing to a shared file system location like a GlusterFS
volume, then use the global configuration `files` property and the
`ingressdata` action.
* (optional) `blobxfer_extra_options` are any extra options to pass to
`blobxfer`.
* (optional) `ssh` is the property for creating a user to accommodate SSH
sessions to compute nodes. If this property is absent, then an SSH user is not
created with pool creation.
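
For reference, the `storage_account_settings` value (e.g., `mystorageaccount`
above) names an entry under the `storage` section of the credentials json. A
minimal sketch of such an entry, inferred only from the fields read by
`_process_blob_input_data` and `create_blob_container_rl_saskey` in this
commit (the full credentials schema may contain additional members):

"credentials": {
    "storage": {
        "mystorageaccount": {
            "account": "<storage account name>",
            "account_key": "<storage account key>",
            "endpoint": "core.windows.net"
        }
    }
}
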
@ -460,6 +497,17 @@ The jobs schema is as follows:
"environment_variables": {
"abc": "xyz"
},
"input_data": {
"azure_blob": [
{
"storage_account_settings": "mystorageaccount",
"container": "jobcontainer",
"include": ["jobdata*.bin"],
"destination": "$AZ_BATCH_NODE_SHARED_DIR/jobdata",
"blobxfer_extra_options": null
}
]
},
"tasks": [
{
"id": null,
@ -486,6 +534,17 @@ The jobs schema is as follows:
"file_mode": ""
}
],
"input_data": {
"azure_blob": [
{
"storage_account_settings": "mystorageaccount",
"container": "taskcontainer",
"include": ["taskdata*.bin"],
"destination": "$AZ_BATCH_NODE_SHARED_DIR/taskdata",
"blobxfer_extra_options": null
}
]
},
"remove_container_after_exit": true,
"additional_docker_run_options": [
],
@ -520,6 +579,31 @@ Docker container in multi-instance tasks. This is defaulted to `true` when
multi-instance tasks are specified.
* (optional) `environment_variables` under the job are environment variables
which will be applied to all tasks operating under the job.
* (optional) `input_data` is an object containing data that should be
ingressed for the job. Any `input_data` defined at this level will be
downloaded for this job, which can be run on any number of compute nodes
depending upon the number of constituent tasks and repeat invocations. However,
`input_data` is only downloaded once per job invocation on a compute node.
For example, if `job-1`:`task-1` is run on compute node A and then
`job-1`:`task-2` is run on compute node B, then this `input_data` is ingressed
to both compute nodes A and B. However, if `job-1`:`task-3` is then run on
compute node A, the `input_data` is not transferred again. This object
currently only supports `azure_blob` as a member; a sketch showing ingress
from multiple containers follows this excerpt.
* `azure_blob` contains the following members:
* (required) `storage_account_settings` contains a storage account link
as defined in the credentials json.
* (required) `container` the container to transfer.
* (optional) `include` property defines an optional include filter.
Although this property is an array, it may contain at most one filter.
* (required) `destination` property defines where to place the
downloaded files on the host file system. Please note that you should
not specify a destination that is on a shared file system. If you
require ingressing to a shared file system location like a GlusterFS
volume, then use the global configuration `files` property and the
`ingressdata` action.
* (optional) `blobxfer_extra_options` are any extra options to pass to
`blobxfer`.
* (required) `tasks` is an array of tasks to add to the job.
* (optional) `id` is the task id. Note that if the task `id` is null or
empty then a generic task id will be assigned. The generic task id is
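
As a hypothetical illustration of the job-level `input_data` property above,
multiple containers can be ingressed for a single job by listing multiple
objects in the `azure_blob` array (the container names, filters, and
destinations here are illustrative only); each object becomes a separate
blobxfer download on the compute node:

"input_data": {
    "azure_blob": [
        {
            "storage_account_settings": "mystorageaccount",
            "container": "jobmodels",
            "include": ["model*.bin"],
            "destination": "$AZ_BATCH_NODE_SHARED_DIR/models",
            "blobxfer_extra_options": null
        },
        {
            "storage_account_settings": "mystorageaccount",
            "container": "jobrefdata",
            "include": ["ref*.dat"],
            "destination": "$AZ_BATCH_NODE_SHARED_DIR/refdata",
            "blobxfer_extra_options": null
        }
    ]
}
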
@ -551,6 +635,27 @@ which will be applied to all tasks operating under the job.
Blob Storage URL.
* `file_mode` is the file mode to set for the file on the compute node.
This is optional.
* (optional) `input_data` is an object containing data that should be
ingressed for this specific task. This object currently only supports
`azure_blob` as a member; a sketch with `destination` omitted follows this
excerpt.
* `azure_blob` contains the following members:
* (required) `storage_account_settings` contains a storage account link
as defined in the credentials json.
* (required) `container` the container to transfer.
* (optional) `include` property defines an optional include filter.
Although this property is an array, it may contain at most one filter.
* (optional) `destination` property defines where to place the
downloaded files on the host file system. Unlike the job-level
version of `input_data`, this `destination` property can be omitted.
If `destination` is not specified at this level, then files are
defaulted to download into `$AZ_BATCH_TASK_WORKING_DIR`. Please note
that you should not specify a destination that is on a shared file
system. If you require ingressing to a shared file system location
like a GlusterFS volume, then use the global configuration `files`
property and the `ingressdata` action.
* (optional) `blobxfer_extra_options` are any extra options to pass to
`blobxfer`.
* (optional) `remove_container_after_exit` property specifies if the
container should be automatically removed/cleaned up after it exits. This
defaults to `false`.
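
For illustration, a task-level `input_data` entry that omits `destination`
(and therefore downloads into `$AZ_BATCH_TASK_WORKING_DIR`) could look like
the following; the container name and filter are hypothetical:

"input_data": {
    "azure_blob": [
        {
            "storage_account_settings": "mystorageaccount",
            "container": "taskinputs",
            "include": ["input*.csv"],
            "blobxfer_extra_options": null
        }
    ]
}
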

View file

@ -71,7 +71,8 @@ file.
compute node. `--filespec` can be used here but without the filename, e.g.,
`--filespec myjob:mytask`.
* `getnodefile`: retrieve a file with pool id/node id from a live compute node.
* `ingressdata`: ingress data as specified in the global configuration file.
* `ingressdata`: ingress data as specified in the `files` property of the
global configuration file.
* `listjobs`: list all jobs under the Batch account.
* `listtasks`: list tasks under jobs specified in the jobs configuration file.
* `clearstorage`: clear storage containers as specified in the configuration

scripts/shipyard_blobingress.sh (new executable file, 24 lines)
View file

@ -0,0 +1,24 @@
#!/usr/bin/env bash
set -e
set -o pipefail
for spec in "$@"; do
# sa:ep:saskey:container:include:eo:dst
IFS=':' read -ra parts <<< "$spec"
sa=${parts[0]}
ep=${parts[1]}
saskey=${parts[2]}
container=${parts[3]}
incl=${parts[4]}
eo=${parts[5]}
dst=${parts[6]}
include=
if [ ! -z "$incl" ]; then
include="--include $incl"
fi
# create destination directory
mkdir -p $dst
# ingress data from blobs
docker run --rm -t -v $dst:/blobxfer -w /blobxfer alfpark/blobxfer $sa $container . --saskey $saskey --remoteresource . --download --no-progressbar $include $eo
done

View file

@ -171,7 +171,7 @@ EOF
fi
# copy required shell scripts to shared
cp docker_jp_block.sh $AZ_BATCH_NODE_SHARED_DIR
cp docker_jp_block.sh shipyard_blobingress.sh $AZ_BATCH_NODE_SHARED_DIR
# install docker host engine
if [ $offer == "ubuntuserver" ] || [ $offer == "debian" ]; then
@ -564,6 +564,9 @@ if [ $p2penabled -eq 0 ]; then
wait
fi
# retrieve blobxfer docker image to assist with data ingress from blob
docker pull alfpark/blobxfer
# block until images ready if specified
if [ ! -z $block ]; then
echo "blocking until images ready: $block"

View file

@ -76,6 +76,9 @@ _NODEPREP_FILE = ('shipyard_nodeprep.sh', 'scripts/shipyard_nodeprep.sh')
_GLUSTERPREP_FILE = ('shipyard_glusterfs.sh', 'scripts/shipyard_glusterfs.sh')
_HPNSSH_FILE = ('shipyard_hpnssh.sh', 'scripts/shipyard_hpnssh.sh')
_JOBPREP_FILE = ('docker_jp_block.sh', 'scripts/docker_jp_block.sh')
_BLOBINGRESS_FILE = (
'shipyard_blobingress.sh', 'scripts/shipyard_blobingress.sh'
)
_CASCADE_FILE = ('cascade.py', 'cascade/cascade.py')
_SETUP_PR_FILE = (
'setup_private_registry.py', 'cascade/setup_private_registry.py'
@ -419,7 +422,7 @@ def add_pool(batch_client, blob_client, config):
except KeyError:
prefix = None
# create resource files list
_rflist = [_NODEPREP_FILE, _JOBPREP_FILE, regfile]
_rflist = [_NODEPREP_FILE, _JOBPREP_FILE, _BLOBINGRESS_FILE, regfile]
if not use_shipyard_docker_image:
_rflist.append(_CASCADE_FILE)
_rflist.append(_SETUP_PR_FILE)
@ -478,11 +481,18 @@ def add_pool(batch_client, blob_client, config):
' -w' if hpnssh else '',
),
]
# add additional start task commands
try:
start_task.extend(
config['pool_specification']['additional_node_prep_commands'])
except KeyError:
pass
# digest any input_data
addlcmds = convoy.data.process_input_data(
blob_client, config, _BLOBINGRESS_FILE, config['pool_specification'])
if addlcmds is not None:
start_task.append(addlcmds)
del addlcmds
# create pool param
pool = batchmodels.PoolAddParameter(
id=config['pool_specification']['id'],
@ -842,6 +852,7 @@ def main():
}]
if args.verbose:
logger.debug('config:\n' + json.dumps(config, indent=4))
config['_verbose'] = args.verbose
_populate_global_settings(config, args.action)
config['_auto_confirm'] = args.yes
@ -873,7 +884,9 @@ def main():
elif args.action == 'delnode':
convoy.batch.del_node(batch_client, config, args.nodeid)
elif args.action == 'addjobs':
convoy.batch.add_jobs(batch_client, blob_client, config, _JOBPREP_FILE)
convoy.batch.add_jobs(
batch_client, blob_client, config, _JOBPREP_FILE,
_BLOBINGRESS_FILE)
elif args.action == 'cleanmijobs':
convoy.batch.clean_mi_jobs(batch_client, config)
elif args.action == 'termjobs':