Add more features to pool/tasks

- Add infiniband support
- Add max tasks per node
- Properly handle multi-instance tasks with docker run <-> exec (see the sketch below)
- Add docker multi-instance cleanup helper
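
The run <-> exec split means the coordination command now starts a detached, named container on every instance, while the application command execs the real workload inside that container on the primary instance; the cleanup helper later stops and removes the container. A minimal sketch of the pattern, with a hypothetical container name, image, and workload:

    # coordination command, run on every instance: start the named
    # container detached so the Batch node is not blocked (a
    # coordination command may follow the image name)
    docker run -d --name mytask myimage
    # application command, run on the primary instance only: exec the
    # workload inside the already-running container
    docker exec mytask mpirun -np 2 ./app
    # cleanup, which clean_mi_jobs automates: stop and remove the container
    docker stop mytask
    docker rm -v mytask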
Fred Park 2016-08-31 02:21:50 -07:00
Parent 351800344d
Commit c5368207cd
6 changed files: 240 additions and 77 deletions

View file

@@ -47,8 +47,8 @@
"azure_file_share_name": "",
"container_path": "",
"mount_options": [
"filemode=0640",
"dirmode=0750",
"filemode=0777",
"dirmode=0755",
"nolock=true"
]
}

View file

@@ -34,12 +34,18 @@
],
"entrypoint": null,
"command": "",
"infiniband": false,
"depends_on": [
],
"multi_instance": {
"num_instances": 3,
"num_instances": "pool_specification_vm_count",
"coordination_command": null,
"resource_files": [
{
"file_path": "",
"blob_source": "",
"file_mode": ""
}
]
}
}
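
Two of the new job settings above deserve a note. When infiniband is true, Intel MPI environment variables are written to the task's env file and the RDMA devices and mounts are passed to docker run; assembled from the options added in the Python changes below, the container launch looks roughly like this (image and MPI command are illustrative):

    docker run --net=host \
        --ulimit memlock=9223372036854775807 \
        --device=/dev/hvnd_rdma \
        --device=/dev/infiniband/rdma_cm \
        --device=/dev/infiniband/uverbs0 \
        -v /etc/rdma:/etc/rdma:ro \
        -v /opt/intel:/opt/intel:ro \
        -w $AZ_BATCH_TASK_WORKING_DIR \
        myimage mpirun -np 16 ./app

And num_instances set to the string pool_specification_vm_count is resolved to the pool's vm_count at job submission time.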

View file

@@ -3,6 +3,7 @@
"id": "dockerpool",
"vm_size": "STANDARD_D2_V2",
"vm_count": 5,
"max_tasks_per_node": 1,
"publisher": "Canonical",
"offer": "UbuntuServer",
"sku": "16.04.0-LTS",

View file

View file

@@ -114,8 +114,8 @@ if [ ! -z $p2p ]; then
iptables -t raw -I OUTPUT -p udp --sport 6881 -j CT --notrack
fi
# copy job prep docker block file to shared
cp jpdockerblock.sh $AZ_BATCH_NODE_SHARED_DIR
# copy required shell scripts to shared
cp docker_jp_block.sh $AZ_BATCH_NODE_SHARED_DIR
# install docker host engine
if [ $offer == "ubuntuserver" ] || [ $offer == "debian" ]; then

View file

@@ -77,8 +77,8 @@ _AZUREFILE_DVD_BIN_URL = (
)
_AZUREFILE_DVD_BIN_MD5 = 'f3c1750583c4842dfbf95bbd56f65ede'
_MAX_REBOOT_RETRIES = 5
_NODEPREP_FILE = ('nodeprep.sh', 'scripts/nodeprep.sh')
_JOBPREP_FILE = ('jpdockerblock.sh', 'scripts/jpdockerblock.sh')
_NODEPREP_FILE = ('shipyard_nodeprep.sh', 'scripts/shipyard_nodeprep.sh')
_JOBPREP_FILE = ('docker_jp_block.sh', 'scripts/docker_jp_block.sh')
_CASCADE_FILE = ('cascade.py', 'cascade/cascade.py')
_SETUP_PR_FILE = (
'setup_private_registry.py', 'cascade/setup_private_registry.py'
@@ -157,7 +157,6 @@ def _populate_global_settings(config, action):
logger.debug(
'attempting to generate docker private registry tarball')
try:
prf.parent.mkdir(mode=0o750, parents=True, exist_ok=True)
output = subprocess.check_output(
'sudo docker images -q registry:2', shell=True)
output = output.decode('utf-8').strip()
@@ -194,7 +193,7 @@ def _wrap_commands_in_shell(commands, wait=True):
:return: wrapped commands
"""
return '/bin/bash -c \'set -e; set -o pipefail; {}{}\''.format(
';'.join(commands), '; wait' if wait else '')
'; '.join(commands), '; wait' if wait else '')
def _create_credentials(config):
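
With the new '; ' separator, the wrapper output for, say, two commands with wait=True is:

    /bin/bash -c 'set -e; set -o pipefail; cmd1; cmd2; wait'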
@@ -315,7 +314,6 @@ def setup_azurefile_volume_driver(blob_client, config):
sku = config['pool_specification']['sku'].lower()
# check to see if binary is downloaded
bin = pathlib.Path('resources/azurefile-dockervolumedriver')
bin.parent.mkdir(mode=0o750, parents=True, exist_ok=True)
if (not bin.exists() or
compute_md5_for_file(bin, False) != _AZUREFILE_DVD_BIN_MD5):
response = urllibreq.urlopen(_AZUREFILE_DVD_BIN_URL)
@@ -390,6 +388,10 @@ def add_pool(batch_client, blob_client, config):
offer = config['pool_specification']['offer']
sku = config['pool_specification']['sku']
vm_count = config['pool_specification']['vm_count']
try:
maxtasks = config['pool_specification']['max_tasks_per_node']
except KeyError:
maxtasks = 1
# cascade settings
try:
perf = config['store_timing_metrics']
@@ -533,6 +535,7 @@
node_agent_sku_id=sku_to_use.id),
vm_size=config['pool_specification']['vm_size'],
target_dedicated=vm_count,
max_tasks_per_node=maxtasks,
enable_inter_node_communication=p2p, # enable only for p2p mode
start_task=batchmodels.StartTask(
command_line=_wrap_commands_in_shell(start_task, wait=False),
@@ -1091,26 +1094,24 @@ def add_jobs(batch_client, blob_client, config):
_GENERIC_DOCKER_TASK_PREFIX, tasknum)
# get generic run opts
try:
run_opts = task['additional_docker_run_options'].split()
run_opts = task['additional_docker_run_options']
except KeyError:
run_opts = []
# parse remove container option
try:
rm_container = task['remove_container_after_exit']
except KeyError:
pass
rm_container = False
else:
if rm_container and '--rm' not in run_opts:
run_opts.append('--rm')
del rm_container
# parse name option
try:
name = task['name']
if name is not None:
run_opts.append('-n {}'.format(name))
del name
run_opts.append('--name {}'.format(name))
except KeyError:
pass
name = None
# parse labels option
try:
labels = task['labels']
@@ -1184,6 +1185,24 @@ def add_jobs(batch_client, blob_client, config):
env_vars = jobspec['environment_variables']
except KeyError:
env_vars = None
try:
infiniband = task['infiniband']
except KeyError:
infiniband = False
# ensure we're on HPC skus with inter node comm enabled
if infiniband:
if not _pool.enable_inter_node_communication:
raise RuntimeError(
('cannot initialize an infiniband task on a '
'non-internode communication enabled '
'pool: {}').format(pool_id))
publisher = config['pool_specification']['publisher'].lower()
offer = config['pool_specification']['offer'].lower()
# TODO support SLES-HPC, for now only support CentOS-HPC
if publisher != 'openlogic' and offer != 'centos-hpc':
raise ValueError(
('Unsupported infiniband VM config, publisher={} '
'offer={}').format(publisher, offer))
try:
task_ev = task['environment_variables']
if env_vars is None:
@@ -1192,43 +1211,139 @@ def add_jobs(batch_client, blob_client, config):
env_vars = merge_dict(env_vars, task_ev)
except KeyError:
pass
else:
if env_vars is not None and len(env_vars) > 0:
envfiletmp = _TEMP_DIR / '{}{}'.format(task_id, envfile)
envfileloc = '{}taskrf-{}/{}{}'.format(
config['storage_entity_prefix'], job.id, task_id,
envfile)
with envfiletmp.open('w', encoding='utf8') as f:
for key in env_vars:
f.write('{}={}{}'.format(
key, env_vars[key], os.linesep))
# upload env var file if exists
sas_urls = upload_resource_files(
blob_client, config, [(envfileloc, str(envfiletmp))])
envfiletmp.unlink()
if len(sas_urls) != 1:
raise RuntimeError('unexpected number of sas urls')
if env_vars is not None and len(env_vars) > 0:
envfiletmp = _TEMP_DIR / '{}{}'.format(task_id, envfile)
envfileloc = '{}taskrf-{}/{}{}'.format(
config['storage_entity_prefix'], job.id, task_id,
envfile)
with envfiletmp.open('w', encoding='utf8') as f:
for key in env_vars:
f.write('{}={}{}'.format(
key, env_vars[key], os.linesep))
if infiniband:
f.write('I_MPI_FABRICS=shm:dapl{}'.format(os.linesep))
f.write('I_MPI_DAPL_PROVIDER=ofa-v2-ib0{}'.format(
os.linesep))
f.write('I_MPI_DYNAMIC_CONNECTION=0{}'.format(
os.linesep))
# create a manpath entry for potentially buggy
# intel mpivars.sh
f.write(
'MANPATH=/usr/share/man:/usr/local/man{}'.format(
os.linesep))
# upload env var file if exists
sas_urls = upload_resource_files(
blob_client, config, [(envfileloc, str(envfiletmp))])
envfiletmp.unlink()
if len(sas_urls) != 1:
raise RuntimeError('unexpected number of sas urls')
# always add option for envfile
run_opts.append('--env-file {}'.format(envfile))
# add infiniband run opts
if infiniband:
run_opts.append('--net=host')
run_opts.append('--ulimit memlock=9223372036854775807')
run_opts.append('--device=/dev/hvnd_rdma')
run_opts.append('--device=/dev/infiniband/rdma_cm')
run_opts.append('--device=/dev/infiniband/uverbs0')
run_opts.append('-v /etc/rdma:/etc/rdma:ro')
run_opts.append('-v /opt/intel:/opt/intel:ro')
# mount batch root dir
run_opts.append(
'-v $AZ_BATCH_NODE_ROOT_DIR:$AZ_BATCH_NODE_ROOT_DIR')
# set working directory
run_opts.append('-w $AZ_BATCH_TASK_WORKING_DIR')
# add task
task_commands = [
'env | grep AZ_BATCH_ >> {}'.format(envfile),
'docker run {} {}{}'.format(
' '.join(run_opts),
image,
' {}'.format(command) if command else '')
]
# check if there are multi-instance tasks
mis = None
if 'multi_instance' in task:
if not _pool.enable_inter_node_communication:
raise RuntimeError(
('cannot run a multi-instance task on a '
'non-internode communication enabled '
'pool: {}').format(pool_id))
# container must be named
if name is None or len(name) == 0:
raise ValueError(
'multi-instance task must be invoked with a named '
'container')
# docker exec command cannot be empty/None
if command is None or len(command) == 0:
raise ValueError(
'multi-instance task must have an application command')
# set docker run as coordination command
try:
run_opts.remove('--rm')
except ValueError:
pass
# run in detached mode
run_opts.append('-d')
# get coordination command
try:
coordination_command = task[
'multi_instance']['coordination_command']
except KeyError:
coordination_command = None
cc_args = [
'env | grep AZ_BATCH_ >> {}'.format(envfile),
'docker run {} {}{}'.format(
' '.join(run_opts),
image,
'{}'.format(' ' + coordination_command)
if coordination_command else '')
]
# create multi-instance settings
num_instances = task['multi_instance']['num_instances']
if not isinstance(num_instances, int):
if num_instances == 'pool_specification_vm_count':
num_instances = config[
'pool_specification']['vm_count']
else:
raise ValueError(
('multi instance num instances setting '
'invalid: {}').format(num_instances))
mis = batchmodels.MultiInstanceSettings(
number_of_instances=num_instances,
coordination_command_line=_wrap_commands_in_shell(
cc_args, wait=False),
common_resource_files=[],
)
# add common resource files for multi-instance
try:
rfs = task['multi_instance']['resource_files']
except KeyError:
pass
else:
for rf in rfs:
try:
fm = rf['file_mode']
except KeyError:
fm = None
mis.common_resource_files.append(
batchmodels.ResourceFile(
file_path=rf['file_path'],
blob_source=rf['blob_source'],
file_mode=fm,
)
)
# set application command
task_commands = ['docker exec {} {}'.format(name, command)]
else:
task_commands = [
'env | grep AZ_BATCH_ >> {}'.format(envfile),
'docker run {} {}{}'.format(
' '.join(run_opts),
image,
'{}'.format(' ' + command) if command else '')
]
# create task
batchtask = batchmodels.TaskAddParameter(
id=task_id,
command_line=_wrap_commands_in_shell(task_commands),
run_elevated=True,
resource_files=[],
)
if mis is not None:
batchtask.multi_instance_settings = mis
batchtask.resource_files.append(
batchmodels.ResourceFile(
file_path=str(envfile),
@@ -1259,42 +1374,13 @@ def add_jobs(batch_client, blob_client, config):
batchtask.depends_on = batchmodels.TaskDependencies(
task_ids=task['depends_on']
)
# check if there are multi-instance tasks
if 'multi_instance' in task:
if not _pool.enable_inter_node_communication:
raise RuntimeError(
('cannot run a multi-instance task on a '
'non-internode communication enabled '
'pool: {}').format(pool_id))
mis = batchmodels.MultiInstanceSettings(
number_of_instances=task[
'multi_instance']['num_instances'],
coordination_command_line=task[
'multi_instance']['coordination_command'],
common_resouorce_files=[]
)
# add common resource files for multi-instance
try:
rfs = task['multi_instance']['resource_files']
except KeyError:
pass
else:
for rf in rfs:
try:
fm = rf['file_mode']
except KeyError:
fm = None
mis.common_resource_files.append(
batchmodels.ResourceFile(
file_path=rf['file_path'],
blob_source=rf['blob_source'],
file_mode=fm,
)
)
batchtask.multi_instance_settings = mis
# create task
logger.info('Adding task {}: {}'.format(
task_id, batchtask.command_line))
if mis is not None:
logger.info(
'multi-instance task coordination command: {}'.format(
mis.coordination_command_line))
batch_client.task.add(job_id=job.id, task=batchtask)
@@ -1311,6 +1397,69 @@ def del_jobs(batch_client, config):
batch_client.job.delete(job_id)
def clean_mi_jobs(batch_client, config):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
"""Clean up multi-instance jobs
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:param dict config: configuration dict
"""
for job in config['job_specifications']:
job_id = job['id']
cleanup_job_id = 'shipyardcleanup-' + job_id
cleanup_job = batchmodels.JobAddParameter(
id=cleanup_job_id,
pool_info=batchmodels.PoolInformation(
pool_id=config['pool_specification']['id']),
)
try:
batch_client.job.add(cleanup_job)
logger.info('Added cleanup job: {}'.format(cleanup_job.id))
except batchmodels.batch_error.BatchErrorException as ex:
if 'The specified job already exists' not in ex.message.value:
raise
# get all cleanup tasks
cleanup_tasks = [x.id for x in batch_client.task.list(cleanup_job_id)]
# list all tasks in job
tasks = batch_client.task.list(job_id)
for task in tasks:
if (task.id in cleanup_tasks or
task.multi_instance_settings is None):
continue
# check if task is complete
if task.state == batchmodels.TaskState.completed:
name = task.multi_instance_settings.coordination_command_line.\
split('--name')[-1].split()[0]
# create cleanup task
batchtask = batchmodels.TaskAddParameter(
id=task.id,
multi_instance_settings=batchmodels.MultiInstanceSettings(
number_of_instances=task.
multi_instance_settings.number_of_instances,
coordination_command_line=_wrap_commands_in_shell([
'docker stop {}'.format(name),
'docker rm -v {}'.format(name),
'exit 0',
], wait=False),
),
command_line='/bin/sh -c "exit 0"',
run_elevated=True,
)
batch_client.task.add(job_id=cleanup_job_id, task=batchtask)
logger.debug(
('Waiting for docker multi-instance clean up task {} '
'for job {} to complete').format(batchtask.id, job_id))
# wait for cleanup task to complete before adding another
while True:
batchtask = batch_client.task.get(cleanup_job_id, task.id)
if batchtask.state == batchmodels.TaskState.completed:
break
time.sleep(1)
logger.info(
('Docker multi-instance clean up task {} for job {} '
'completed').format(batchtask.id, job_id))
def terminate_jobs(batch_client, config):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
"""Terminate jobs
@@ -1525,14 +1674,18 @@ def main():
try:
with open(args.pool, 'r') as f:
config = merge_dict(config, json.load(f))
except ValueError:
raise
except Exception:
config['pool_specification'] = {
'id': args.poolid
}
if args.action in ('addjobs', 'deljobs'):
if args.action in ('addjobs', 'cleanmijobs', 'deljobs', 'termjobs'):
try:
with open(args.jobs, 'r') as f:
config = merge_dict(config, json.load(f))
except ValueError:
raise
except Exception:
config['job_specifications'] = [{
'id': args.jobid
@@ -1567,7 +1720,9 @@ def main():
del_node(batch_client, config, args.nodeid)
elif args.action == 'addjobs':
add_jobs(batch_client, blob_client, config)
elif args.action == 'termjob':
elif args.action == 'cleanmijobs':
clean_mi_jobs(batch_client, config)
elif args.action == 'termjobs':
terminate_jobs(batch_client, config)
elif args.action == 'deljobs':
del_jobs(batch_client, config)
@@ -1593,8 +1748,9 @@ def parseargs():
parser = argparse.ArgumentParser(
description='Shipyard: Azure Batch to Docker Bridge')
parser.add_argument(
'action', help='action: addpool, addjob, addsshuser, termjob, '
'delpool, delnode, deljob, delalljobs, grl, delstorage, clearstorage')
'action', help='action: addpool, addjob, addsshuser, cleanmijobs, '
'termjobs, delpool, delnode, deljob, delalljobs, grl, delstorage, '
'clearstorage')
parser.add_argument(
'--credentials',
help='credentials json config. required for all actions')
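
A usage sketch for the new actions; the script name and the --pool/--jobs flag names are assumed from the args.pool/args.jobs usage above, only --credentials appears verbatim in this diff:

    # stop and remove detached multi-instance containers of completed tasks
    python shipyard.py cleanmijobs --credentials credentials.json \
        --pool pool.json --jobs jobs.json
    # terminate all jobs listed in the jobs configuration
    python shipyard.py termjobs --credentials credentials.json \
        --pool pool.json --jobs jobs.json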