2153 строки
86 KiB
Python
Executable File
2153 строки
86 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# Copyright (c) Microsoft Corporation
|
|
#
|
|
# All rights reserved.
|
|
#
|
|
# MIT License
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a
|
|
# copy of this software and associated documentation files (the "Software"),
|
|
# to deal in the Software without restriction, including without limitation
|
|
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
# and/or sell copies of the Software, and to permit persons to whom the
|
|
# Software is furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in
|
|
# all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
# DEALINGS IN THE SOFTWARE.
|
|
|
|
# stdlib imports
|
|
from __future__ import print_function, unicode_literals
|
|
import argparse
|
|
import base64
|
|
import copy
|
|
import datetime
|
|
import json
|
|
import hashlib
|
|
import logging
|
|
import logging.handlers
|
|
import os
|
|
try:
|
|
import pathlib
|
|
except ImportError:
|
|
import pathlib2 as pathlib
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
try:
|
|
import urllib.request as urllibreq
|
|
except ImportError:
|
|
import urllib as urllibreq
|
|
# non-stdlib imports
|
|
import azure.batch.batch_auth as batchauth
|
|
import azure.batch.batch_service_client as batch
|
|
import azure.batch.models as batchmodels
|
|
import azure.common
|
|
import azure.storage.blob as azureblob
|
|
import azure.storage.queue as azurequeue
|
|
import azure.storage.table as azuretable
|
|
# function remaps
try:
    raw_input
except NameError:
    # raw_input does not exist on this interpreter; fall back to input
    raw_input = input

# create logger
logger = logging.getLogger('shipyard')

# global defines
_PY2 = sys.version_info[0] == 2
# storage account settings, assigned by _populate_global_settings
_STORAGEACCOUNT = None
_STORAGEACCOUNTKEY = None
_STORAGEACCOUNTEP = None
# storage entity names keyed by role, assigned by _populate_global_settings
_STORAGE_CONTAINERS = dict.fromkeys((
    'blob_resourcefiles',
    'blob_torrents',
    'table_dht',
    'table_registry',
    'table_torrentinfo',
    'table_images',
    'table_globalresources',
    'table_perf',
    'queue_globalresources',
))
# azure file docker volume driver binary: version, download url and md5
_AZUREFILE_DVD_BIN_VERSION = '0.4.1'
_AZUREFILE_DVD_BIN_URL = (
    'https://github.com/Azure/azurefile-dockervolumedriver/releases'
    '/download/{}/azurefile-dockervolumedriver'
).format(_AZUREFILE_DVD_BIN_VERSION)
_AZUREFILE_DVD_BIN_MD5 = 'f3c1750583c4842dfbf95bbd56f65ede'
# nvidia docker package download url and md5, keyed by lowercase offer
_NVIDIA_DOCKER = {
    'ubuntuserver': {
        'url': (
            'https://github.com/NVIDIA/nvidia-docker/releases'
            '/download/v1.0.0-rc.3/nvidia-docker_1.0.0.rc.3-1_amd64.deb'
        ),
        'md5': '49990712ebf3778013fae81ee67f6c79',
    },
}
_NVIDIA_DRIVER = 'nvidia-driver.run'
_MAX_REBOOT_RETRIES = 5
# resource files as (blob/file name, local path) tuples
_NODEPREP_FILE = ('shipyard_nodeprep.sh', 'scripts/shipyard_nodeprep.sh')
_JOBPREP_FILE = ('docker_jp_block.sh', 'scripts/docker_jp_block.sh')
_CASCADE_FILE = ('cascade.py', 'cascade/cascade.py')
_SETUP_PR_FILE = (
    'setup_private_registry.py',
    'cascade/setup_private_registry.py',
)
_PERF_FILE = ('perf.py', 'cascade/perf.py')
# (file name, local path, image id) tuple set by _populate_global_settings
_REGISTRY_FILE = None
# vm sizes for which add_pool omits the ' -n' node prep flag
_VM_TCP_NO_TUNE = (
    'basic_a0', 'basic_a1', 'basic_a2', 'basic_a3', 'basic_a4',
    'standard_a0', 'standard_a1', 'standard_d1', 'standard_d2',
    'standard_d1_v2', 'standard_f1',
)
_SSH_KEY_PREFIX = 'id_rsa_shipyard'
_SSH_TUNNEL_SCRIPT = 'ssh_docker_tunnel_shipyard.sh'
_GENERIC_DOCKER_TASK_PREFIX = 'dockertask-'
_TEMP_DIR = pathlib.Path('/tmp')
|
|
|
|
|
|
def _setup_logger():
    # type: () -> None
    """Set up logger

    Sets the module logger to DEBUG and attaches a stream handler to it.
    Record timestamps are rendered in UTC so that the literal ``Z`` suffix
    in the format string is accurate.
    """
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        '%(asctime)sZ %(levelname)s %(funcName)s:%(lineno)d %(message)s')
    # asctime is local time by default, but the format appends 'Z' (UTC)
    formatter.converter = time.gmtime
    handler.setFormatter(formatter)
    logger.addHandler(handler)
|
|
|
|
|
|
def _populate_global_settings(config, action):
    # type: (dict, str) -> None
    """Populate global settings from config

    Assigns the module-level storage account globals and fills in the
    storage entity names in _STORAGE_CONTAINERS. For the 'addpool' action
    only, also resolves the docker private registry tarball information
    into _REGISTRY_FILE as a (file name, file path, image id) tuple,
    attempting to generate the tarball via docker if the file is missing,
    empty or no image id is configured.
    :param dict config: configuration dict
    :param str action: action
    """
    global _STORAGEACCOUNT, _STORAGEACCOUNTKEY, _STORAGEACCOUNTEP, \
        _REGISTRY_FILE
    ssel = config['batch_shipyard']['storage_account_settings']
    _STORAGEACCOUNT = config['credentials']['storage'][ssel]['account']
    _STORAGEACCOUNTKEY = config['credentials']['storage'][ssel]['account_key']
    try:
        _STORAGEACCOUNTEP = config['credentials']['storage'][ssel]['endpoint']
    except KeyError:
        _STORAGEACCOUNTEP = 'core.windows.net'
    try:
        sep = config['batch_shipyard']['storage_entity_prefix']
    except KeyError:
        sep = None
    if sep is None:
        sep = ''
    postfix = '-'.join(
        (config['credentials']['batch']['account'].lower(),
         config['pool_specification']['id'].lower()))
    _STORAGE_CONTAINERS['blob_resourcefiles'] = '-'.join(
        (sep + 'resourcefiles', postfix))
    _STORAGE_CONTAINERS['blob_torrents'] = '-'.join(
        (sep + 'torrents', postfix))
    _STORAGE_CONTAINERS['table_dht'] = sep + 'dht'
    _STORAGE_CONTAINERS['table_registry'] = sep + 'registry'
    _STORAGE_CONTAINERS['table_torrentinfo'] = sep + 'torrentinfo'
    _STORAGE_CONTAINERS['table_images'] = sep + 'images'
    _STORAGE_CONTAINERS['table_globalresources'] = sep + 'globalresources'
    _STORAGE_CONTAINERS['table_perf'] = sep + 'perf'
    _STORAGE_CONTAINERS['queue_globalresources'] = '-'.join(
        (sep + 'globalresources', postfix))
    # registry file information is only needed when adding a pool
    if action != 'addpool':
        return
    try:
        dpre = config['docker_registry']['private']['enabled']
    except KeyError:
        dpre = False
    # set docker private registry file info
    if dpre:
        private = config['docker_registry']['private']
        try:
            rf = private['docker_save_registry_file']
        except KeyError:
            rf = None
        try:
            imgid = private['docker_save_registry_image_id']
        except KeyError:
            imgid = None
        # normalize empty values up front so that an empty file name can
        # never be used as a path, regardless of which key is missing
        if not rf:
            rf = 'resources/docker-registry-v2.tar.gz'
            imgid = None
        elif not imgid:
            imgid = None
        prf = pathlib.Path(rf)
        # attempt to package if registry file doesn't exist
        if not prf.exists() or prf.stat().st_size == 0 or imgid is None:
            logger.debug(
                'attempting to generate docker private registry tarball')
            try:
                imgid = subprocess.check_output(
                    'sudo docker images -q registry:2', shell=True).decode(
                        'utf-8').strip()
            except subprocess.CalledProcessError:
                rf = None
                imgid = None
            else:
                # only save the image if docker reported a 12 char image id
                if len(imgid) == 12:
                    if rf is None:
                        rf = 'resources/docker-registry-v2.tar.gz'
                    prf = pathlib.Path(rf)
                    subprocess.check_call(
                        'sudo docker save registry:2 '
                        '| gzip -c > {}'.format(rf), shell=True)
        _REGISTRY_FILE = (prf.name if rf is not None else None, rf, imgid)
    else:
        _REGISTRY_FILE = (None, None, None)
    logger.info('private registry settings: {}'.format(_REGISTRY_FILE))
|
|
|
|
|
|
def _wrap_commands_in_shell(commands, wait=True):
    # type: (List[str], bool) -> str
    """Join commands into a single bash invocation

    The commands are separated by '; ' and run under
    ``set -e; set -o pipefail``.
    :param list commands: list of commands to wrap
    :param bool wait: append a trailing ``wait`` command
    :rtype: str
    :return: wrapped commands
    """
    script = '; '.join(commands)
    if wait:
        script += '; wait'
    return "/bin/bash -c 'set -e; set -o pipefail; " + script + "'"
|
|
|
|
|
|
def _create_credentials(config):
    # type: (dict) -> tuple
    """Create authenticated clients

    The batch client is built from the batch credentials in config; the
    three storage clients are built from the module-level storage account
    globals.
    :param dict config: configuration dict
    :rtype: tuple
    :return: (batch client, blob client, queue client, table client)
    """
    batch_conf = config['credentials']['batch']
    shared_key = batchauth.SharedKeyCredentials(
        batch_conf['account'], batch_conf['account_key'])
    batch_client = batch.BatchServiceClient(
        shared_key, base_url=batch_conf['account_service_url'])
    # all storage clients share the same account, key and endpoint suffix
    account = _STORAGEACCOUNT
    key = _STORAGEACCOUNTKEY
    suffix = _STORAGEACCOUNTEP
    blob_client = azureblob.BlockBlobService(
        account_name=account, account_key=key, endpoint_suffix=suffix)
    queue_client = azurequeue.QueueService(
        account_name=account, account_key=key, endpoint_suffix=suffix)
    table_client = azuretable.TableService(
        account_name=account, account_key=key, endpoint_suffix=suffix)
    return batch_client, blob_client, queue_client, table_client
|
|
|
|
|
|
def compute_md5_for_file(file, as_base64, blocksize=65536):
    # type: (pathlib.Path, bool, int) -> str
    """Compute the MD5 digest of a file, reading it in blocks

    :param pathlib.Path file: file to compute md5 for
    :param bool as_base64: return the digest base64 encoded instead of
        as a hex string
    :param int blocksize: block size in bytes
    :rtype: str
    :return: md5 for file
    """
    digest = hashlib.md5()
    with file.open('rb') as handle:
        # read() returns empty bytes at end of file, which stops iteration
        for chunk in iter(lambda: handle.read(blocksize), b''):
            digest.update(chunk)
    if not as_base64:
        return digest.hexdigest()
    encoded = base64.b64encode(digest.digest())
    if _PY2:
        return encoded
    return str(encoded, 'ascii')
|
|
|
|
|
|
def upload_resource_files(blob_client, config, files):
    # type: (azure.storage.blob.BlockBlobService, dict, List[tuple]) -> dict
    """Upload resource files to blob storage

    A file is only uploaded if the blob does not exist yet or its content
    md5 differs from the local file. A read-only SAS url valid for two
    hours is generated for every processed file.
    :param azure.storage.blob.BlockBlobService blob_client: blob client
    :param dict config: configuration dict
    :param list files: tuples whose first element is the blob name and
        whose second element is the local file path
    :rtype: dict
    :return: sas url dict
    """
    container = _STORAGE_CONTAINERS['blob_resourcefiles']
    sas_urls = {}
    for entry in files:
        blob_name = entry[0]
        # skip if no file is specified
        if blob_name is None:
            continue
        local_path = entry[1]
        fp = pathlib.Path(local_path)
        registry_image_missing = (
            _REGISTRY_FILE is not None and
            fp.name == _REGISTRY_FILE[0] and
            not fp.exists()
        )
        if registry_image_missing:
            logger.debug('skipping optional docker registry image: {}'.format(
                _REGISTRY_FILE[0]))
            continue
        # upload unless the existing blob has the same md5 as the local file
        needs_upload = True
        try:
            prop = blob_client.get_blob_properties(container, blob_name)
            remote_md5 = prop.properties.content_settings.content_md5
            if remote_md5 == compute_md5_for_file(fp, True):
                logger.debug(
                    'remote file is the same for {}, skipping'.format(
                        blob_name))
                needs_upload = False
        except azure.common.AzureMissingResourceHttpError:
            # blob does not exist yet
            pass
        if needs_upload:
            logger.info('uploading file: {}'.format(local_path))
            blob_client.create_blob_from_path(
                container, blob_name, local_path)
        sas_token = blob_client.generate_blob_shared_access_signature(
            container, blob_name,
            permission=azureblob.BlobPermissions.READ,
            expiry=datetime.datetime.utcnow() +
            datetime.timedelta(hours=2)
        )
        sas_urls[blob_name] = 'https://{}.blob.{}/{}/{}?{}'.format(
            _STORAGEACCOUNT,
            _STORAGEACCOUNTEP,
            container,
            blob_name,
            sas_token,
        )
    return sas_urls
|
|
|
|
|
|
def setup_nvidia_docker_package(blob_client, config):
    # type: (azure.storage.blob.BlockBlobService, dict) -> pathlib.Path
    """Set up the nvidia docker package

    Downloads the package for the pool offer if it is not present locally
    or its md5 does not match, then verifies the md5 of the download.
    :param azure.storage.blob.BlockBlobService blob_client: blob client
    :param dict config: configuration dict
    :rtype: pathlib.Path
    :return: package path
    :raises ValueError: if the pool offer is unsupported
    :raises RuntimeError: if the downloaded package fails the md5 check
    """
    offer = config['pool_specification']['offer'].lower()
    if offer == 'ubuntuserver':
        pkg = pathlib.Path('resources/nvidia-docker.deb')
    else:
        raise ValueError('Offer {} is unsupported with nvidia docker'.format(
            offer))
    expected_md5 = _NVIDIA_DOCKER[offer]['md5']
    # check to see if package is downloaded
    if (not pkg.exists() or
            compute_md5_for_file(pkg, False) != expected_md5):
        response = urllibreq.urlopen(_NVIDIA_DOCKER[offer]['url'])
        # read fully and close the connection before touching the local
        # file so the response is not leaked and a failed download does
        # not truncate an existing package
        try:
            data = response.read()
        finally:
            response.close()
        with pkg.open('wb') as f:
            f.write(data)
        # check md5
        if compute_md5_for_file(pkg, False) != expected_md5:
            raise RuntimeError('md5 mismatch for {}'.format(pkg))
    return pkg
|
|
|
|
|
|
def setup_azurefile_volume_driver(blob_client, config):
    # type: (azure.storage.blob.BlockBlobService, dict) -> tuple
    """Set up the Azure File docker volume driver

    Downloads the driver binary if needed, selects the service definition
    file for the pool image, and writes the service environment file and
    the docker volume creation script under resources/.
    :param azure.storage.blob.BlockBlobService blob_client: blob client
    :param dict config: configuration dict
    :rtype: tuple
    :return: (bin path, service file path, service env file path,
        volume creation script path)
    :raises RuntimeError: on md5 mismatch of the downloaded binary or if no
        storage account/key could be determined
    :raises ValueError: if shared data volumes use different storage
        accounts
    :raises NotImplementedError: if a shared data volume uses a volume
        driver other than azurefile
    """
    publisher = config['pool_specification']['publisher'].lower()
    offer = config['pool_specification']['offer'].lower()
    sku = config['pool_specification']['sku'].lower()
    # check to see if binary is downloaded
    binpath = pathlib.Path('resources/azurefile-dockervolumedriver')
    if (not binpath.exists() or
            compute_md5_for_file(binpath, False) != _AZUREFILE_DVD_BIN_MD5):
        response = urllibreq.urlopen(_AZUREFILE_DVD_BIN_URL)
        # read fully and close the connection before writing so the
        # response is not leaked
        try:
            data = response.read()
        finally:
            response.close()
        with binpath.open('wb') as f:
            f.write(data)
        # check md5
        if compute_md5_for_file(binpath, False) != _AZUREFILE_DVD_BIN_MD5:
            raise RuntimeError('md5 mismatch for {}'.format(binpath))
    # Canonical UbuntuServer 14.04 images get the .conf file, all other
    # images get the .service file
    if (publisher == 'canonical' and offer == 'ubuntuserver' and
            sku.startswith('14.04')):
        srv = pathlib.Path('resources/azurefile-dockervolumedriver.conf')
    else:
        srv = pathlib.Path('resources/azurefile-dockervolumedriver.service')
    shared_data_volumes = config[
        'global_resources']['docker_volumes']['shared_data_volumes']
    # construct systemd env file
    sa = None
    sakey = None
    saep = None
    for svkey in shared_data_volumes:
        conf = shared_data_volumes[svkey]
        if conf['volume_driver'] == 'azurefile':
            # check every entry to ensure the same storage account
            ssel = conf['storage_account_settings']
            storage = config['credentials']['storage'][ssel]
            _sa = storage['account']
            if sa is not None and sa != _sa:
                raise ValueError(
                    'multiple storage accounts are not supported for '
                    'azurefile docker volume driver')
            sa = _sa
            sakey = storage['account_key']
            # endpoint is optional; use the same default that
            # _populate_global_settings applies
            try:
                saep = storage['endpoint']
            except KeyError:
                saep = 'core.windows.net'
        else:
            raise NotImplementedError(
                'Unsupported volume driver: {}'.format(conf['volume_driver']))
    if sa is None or sakey is None:
        raise RuntimeError(
            'storage account or storage account key not specified for '
            'azurefile docker volume driver')
    # NOTE: this file contains the storage account key in plain text
    srvenv = pathlib.Path('resources/azurefile-dockervolumedriver.env')
    with srvenv.open('w', encoding='utf8') as f:
        f.write('AZURE_STORAGE_ACCOUNT={}\n'.format(sa))
        f.write('AZURE_STORAGE_ACCOUNT_KEY={}\n'.format(sakey))
        f.write('AZURE_STORAGE_BASE={}\n'.format(saep))
    # create docker volume mount command script
    volcreate = pathlib.Path('resources/azurefile-dockervolume-create.sh')
    with volcreate.open('w', encoding='utf8') as f:
        f.write('#!/usr/bin/env bash\n\n')
        for svkey in shared_data_volumes:
            conf = shared_data_volumes[svkey]
            opts = [
                '-o share={}'.format(conf['azure_file_share_name'])
            ]
            # tolerate a missing or null mount_options entry
            try:
                mount_options = conf['mount_options']
            except KeyError:
                mount_options = None
            for opt in mount_options or []:
                opts.append('-o {}'.format(opt))
            f.write('docker volume create -d azurefile --name {} {}\n'.format(
                svkey, ' '.join(opts)))
    return binpath, srv, srvenv, volcreate
|
|
|
|
|
|
def add_pool(batch_client, blob_client, config):
    # type: (batch.BatchServiceClient, azureblob.BlockBlobService,dict) -> None
    """Add a Batch pool to account

    Reads the pool, replication, registry and docker settings from config,
    uploads the required resource files, builds the node prep start task
    and submits the pool. Afterwards waits for the pool nodes to become
    ready, adds the ssh tunnel user and logs remote login settings.
    :param azure.batch.batch_service_client.BatchServiceClient: batch client
    :param azure.storage.blob.BlockBlobService blob_client: blob client
    :param dict config: configuration dict
    :raises ValueError: if no node agent sku matches the configured
        publisher, offer and sku
    """
    publisher = config['pool_specification']['publisher']
    offer = config['pool_specification']['offer']
    sku = config['pool_specification']['sku']
    vm_count = config['pool_specification']['vm_count']
    vm_size = config['pool_specification']['vm_size']
    try:
        maxtasks = config['pool_specification']['max_tasks_per_node']
    except KeyError:
        maxtasks = 1
    # cascade settings
    try:
        perf = config['batch_shipyard']['store_timing_metrics']
    except KeyError:
        perf = False
    # peer-to-peer settings
    try:
        p2p = config['data_replication']['peer_to_peer']['enabled']
    except KeyError:
        p2p = False
    if p2p:
        nonp2pcd = False
        try:
            p2psbias = config['data_replication'][
                'peer_to_peer']['direct_download_seed_bias']
            if p2psbias is None or p2psbias < 1:
                raise KeyError()
        except KeyError:
            # default to a tenth of the pool size, but at least 1
            p2psbias = vm_count // 10
            if p2psbias < 1:
                p2psbias = 1
        try:
            p2pcomp = config[
                'data_replication']['peer_to_peer']['compression']
        except KeyError:
            p2pcomp = True
    else:
        p2psbias = 0
        p2pcomp = False
        try:
            nonp2pcd = config[
                'data_replication']['non_peer_to_peer_concurrent_downloading']
        except KeyError:
            nonp2pcd = True
    # private registry settings
    # NOTE: a missing key among any of the three below disables the
    # private registry entirely
    try:
        pcont = config['docker_registry']['private']['container']
        pregpubpull = config['docker_registry']['private'][
            'allow_public_docker_hub_pull_on_missing']
        preg = config['docker_registry']['private']['enabled']
    except KeyError:
        preg = False
        pregpubpull = False
    # create private registry flags
    if preg:
        preg = ' -r {}:{}:{}'.format(
            pcont, _REGISTRY_FILE[0], _REGISTRY_FILE[2])
    else:
        preg = ''
    # create torrent flags
    torrentflags = ' -t {}:{}:{}:{}:{}'.format(
        p2p, nonp2pcd, p2psbias, p2pcomp, pregpubpull)
    # docker settings
    try:
        dockeruser = config['docker_registry']['login']['username']
        dockerpw = config['docker_registry']['login']['password']
    except KeyError:
        dockeruser = None
        dockerpw = None
    try:
        use_shipyard_docker_image = config[
            'batch_shipyard']['use_shipyard_docker_image']
    except KeyError:
        use_shipyard_docker_image = True
    try:
        block_for_gr = config[
            'pool_specification']['block_until_all_global_resources_loaded']
    except KeyError:
        block_for_gr = True
    if block_for_gr:
        # replace the flag with the comma separated list of docker images
        block_for_gr = ','.join(
            [r for r in config['global_resources']['docker_images']])
    # check volume mounts for azurefile
    azurefile_vd = False
    try:
        shared_data_volumes = config[
            'global_resources']['docker_volumes']['shared_data_volumes']
        for key in shared_data_volumes:
            if shared_data_volumes[key]['volume_driver'] == 'azurefile':
                azurefile_vd = True
                break
    except KeyError:
        pass
    # prefix settings
    try:
        prefix = config['batch_shipyard']['storage_entity_prefix']
    except KeyError:
        prefix = None
    # a null prefix is accepted by _populate_global_settings, so treat both
    # None and the empty string as "no prefix" rather than calling len()
    if not prefix:
        prefix = None
    # create resource files list
    _rflist = [_NODEPREP_FILE, _JOBPREP_FILE, _REGISTRY_FILE]
    if not use_shipyard_docker_image:
        _rflist.append(_CASCADE_FILE)
        _rflist.append(_SETUP_PR_FILE)
        # NOTE(review): perf file is assumed to be uploaded only together
        # with the other cascade scripts -- confirm nesting
        if perf:
            _rflist.append(_PERF_FILE)
    # handle azurefile docker volume driver
    if azurefile_vd:
        afbin, afsrv, afenv, afvc = setup_azurefile_volume_driver(
            blob_client, config)
        _rflist.append((str(afbin.name), str(afbin)))
        _rflist.append((str(afsrv.name), str(afsrv)))
        _rflist.append((str(afenv.name), str(afenv)))
        _rflist.append((str(afvc.name), str(afvc)))
    # gpu settings
    if (vm_size.lower().startswith('standard_nc') or
            vm_size.lower().startswith('standard_nv')):
        gpupkg = setup_nvidia_docker_package(blob_client, config)
        _rflist.append((str(gpupkg.name), str(gpupkg)))
        gpu_env = '{}:{}:{}:{}'.format(
            vm_size.lower().startswith('standard_nv'),
            config['pool_specification']['gpu']['nvidia_driver']['version'],
            _NVIDIA_DRIVER,
            gpupkg.name)
    else:
        gpu_env = None
    # pick latest sku
    node_agent_skus = batch_client.account.list_node_agent_skus()
    skus_to_use = [
        (nas, image_ref) for nas in node_agent_skus for image_ref in sorted(
            nas.verified_image_references, key=lambda item: item.sku)
        if image_ref.publisher.lower() == publisher.lower() and
        image_ref.offer.lower() == offer.lower() and
        image_ref.sku.lower() == sku.lower()
    ]
    if not skus_to_use:
        # fail with a descriptive error instead of an IndexError below
        raise ValueError(
            'no node agent sku found for publisher={} offer={} '
            'sku={}'.format(publisher, offer, sku))
    sku_to_use, image_ref_to_use = skus_to_use[-1]
    # upload resource files
    sas_urls = upload_resource_files(blob_client, config, _rflist)
    del _rflist
    # create start task commandline
    start_task = [
        '{} -o {} -s {}{}{}{}{}{}{}{}{}'.format(
            _NODEPREP_FILE[0],
            offer,
            sku,
            preg,
            torrentflags,
            ' -a' if azurefile_vd else '',
            ' -b {}'.format(block_for_gr) if block_for_gr else '',
            ' -d' if use_shipyard_docker_image else '',
            ' -g {}'.format(gpu_env) if gpu_env is not None else '',
            ' -n' if vm_size.lower() not in _VM_TCP_NO_TUNE else '',
            ' -p {}'.format(prefix) if prefix else '',
        ),
    ]
    try:
        start_task.extend(
            config['pool_specification']['additional_node_prep_commands'])
    except KeyError:
        pass
    # create pool param
    pool = batchmodels.PoolAddParameter(
        id=config['pool_specification']['id'],
        virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
            image_reference=image_ref_to_use,
            node_agent_sku_id=sku_to_use.id),
        vm_size=vm_size,
        target_dedicated=vm_count,
        max_tasks_per_node=maxtasks,
        enable_inter_node_communication=config[
            'pool_specification']['inter_node_communication_enabled'],
        start_task=batchmodels.StartTask(
            command_line=_wrap_commands_in_shell(start_task, wait=False),
            run_elevated=True,
            wait_for_success=True,
            environment_settings=[
                batchmodels.EnvironmentSetting('LC_ALL', 'en_US.UTF-8'),
                batchmodels.EnvironmentSetting(
                    'CASCADE_STORAGE_ENV',
                    '{}:{}:{}'.format(
                        _STORAGEACCOUNT,
                        _STORAGEACCOUNTEP,
                        _STORAGEACCOUNTKEY)
                )
            ],
            resource_files=[],
        ),
    )
    for rf in sas_urls:
        pool.start_task.resource_files.append(
            batchmodels.ResourceFile(
                file_path=rf,
                blob_source=sas_urls[rf])
        )
    if gpu_env:
        pool.start_task.resource_files.append(
            batchmodels.ResourceFile(
                file_path=_NVIDIA_DRIVER,
                blob_source=config[
                    'pool_specification']['gpu']['nvidia_driver']['source'],
                file_mode='0755')
        )
    if preg:
        ssel = config['docker_registry']['private']['storage_account_settings']
        pr_storage = config['credentials']['storage'][ssel]
        # endpoint is optional; use the same default that
        # _populate_global_settings applies
        try:
            pr_endpoint = pr_storage['endpoint']
        except KeyError:
            pr_endpoint = 'core.windows.net'
        pool.start_task.environment_settings.append(
            batchmodels.EnvironmentSetting(
                'CASCADE_PRIVATE_REGISTRY_STORAGE_ENV',
                '{}:{}:{}'.format(
                    pr_storage['account'],
                    pr_endpoint,
                    pr_storage['account_key'])
            )
        )
        del ssel
    if (dockeruser is not None and len(dockeruser) > 0 and
            dockerpw is not None and len(dockerpw) > 0):
        pool.start_task.environment_settings.append(
            batchmodels.EnvironmentSetting('DOCKER_LOGIN_USERNAME', dockeruser)
        )
        pool.start_task.environment_settings.append(
            batchmodels.EnvironmentSetting('DOCKER_LOGIN_PASSWORD', dockerpw)
        )
    if perf:
        pool.start_task.environment_settings.append(
            batchmodels.EnvironmentSetting('CASCADE_TIMING', '1')
        )
    # create pool if not exists
    try:
        logger.info('Attempting to create pool: {}'.format(pool.id))
        logger.debug('node prep commandline: {}'.format(
            pool.start_task.command_line))
        batch_client.pool.add(pool)
        logger.info('Created pool: {}'.format(pool.id))
    except batchmodels.BatchErrorException as e:
        if e.error.code != 'PoolExists':
            raise
        else:
            logger.error('Pool {!r} already exists'.format(pool.id))
    # wait for pool idle
    node_state = frozenset(
        (batchmodels.ComputeNodeState.starttaskfailed,
         batchmodels.ComputeNodeState.unusable,
         batchmodels.ComputeNodeState.idle)
    )
    try:
        reboot_on_failed = config[
            'pool_specification']['reboot_on_start_task_failed']
    except KeyError:
        reboot_on_failed = False
    nodes = _wait_for_pool_ready(
        batch_client, node_state, pool.id, reboot_on_failed)
    # create admin user on each node if requested
    add_ssh_tunnel_user(batch_client, config, nodes)
    # log remote login settings
    get_remote_login_settings(batch_client, config, nodes)
|
|
|
|
|
|
def add_ssh_tunnel_user(batch_client, config, nodes=None):
    # type: (batch.BatchServiceClient, dict,
    #        List[batchmodels.ComputeNode]) -> None
    """Add an SSH user to node and optionally generate an SSH tunneling script

    Does nothing except log a message if no ssh docker tunnel username is
    configured. A key pair is generated if no public key is configured.
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param dict config: configuration dict
    :param list nodes: list of nodes; listed from the pool if None
    """
    pool_id = config['pool_specification']['id']
    try:
        docker_user = config[
            'pool_specification']['ssh_docker_tunnel']['username']
    except KeyError:
        docker_user = None
    if docker_user is None:
        logger.info('not creating ssh tunnel user on pool {}'.format(pool_id))
        return
    tunnel_conf = config['pool_specification']['ssh_docker_tunnel']
    ssh_pub_key = tunnel_conf.get('ssh_public_key')
    gen_tunnel_script = tunnel_conf.get('generate_tunnel_script', False)
    # generate ssh key pair if not specified
    ssh_priv_key = None
    if ssh_pub_key is None:
        ssh_priv_key, ssh_pub_key = generate_ssh_keypair()
    # get node list if not provided
    if nodes is None:
        nodes = batch_client.compute_node.list(pool_id)
    for node in nodes:
        add_admin_user_to_compute_node(
            batch_client, pool_id, node, docker_user, ssh_pub_key)
    # generate tunnel script if requested
    if not gen_tunnel_script:
        return
    ssh_args = ['ssh']
    # the private key is only known if the key pair was generated here
    if ssh_priv_key is not None:
        ssh_args += ['-i', ssh_priv_key]
    ssh_args += [
        '-o', 'StrictHostKeyChecking=no',
        '-o', 'UserKnownHostsFile=/dev/null',
        '-p', '$2', '-N', '-L', '2375:localhost:2375',
        '{}@$1'.format(docker_user),
    ]
    script_lines = ['#!/usr/bin/env bash', 'set -e', ' '.join(ssh_args)]
    with open(_SSH_TUNNEL_SCRIPT, 'w') as fd:
        for line in script_lines:
            fd.write(line)
            fd.write('\n')
    os.chmod(_SSH_TUNNEL_SCRIPT, 0o755)
    logger.info('ssh tunnel script generated: {}'.format(
        _SSH_TUNNEL_SCRIPT))
|
|
|
|
|
|
def _wait_for_pool_ready(batch_client, node_state, pool_id, reboot_on_failed):
    # type: (batch.BatchServiceClient, List[batchmodels.ComputeNodeState],
    #        str, bool) -> List[batchmodels.ComputeNode]
    """Wait for pool to enter "ready": steady state and all nodes idle

    Polls the pool every 10 seconds until at least target_dedicated nodes
    exist and every node is in one of the given states.
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param node_state: node states that end the wait
    :param str pool_id: pool id
    :param bool reboot_on_failed: reboot node on failed start state
    :rtype: list
    :return: list of nodes
    :raises RuntimeError: on a pool resize error, if a node runs out of
        reboot retries, or if the wait ends with a node that is not idle
    """
    logger.info(
        'waiting for all nodes in pool {} to reach one of: {!r}'.format(
            pool_id, node_state))
    failed_state = batchmodels.ComputeNodeState.starttaskfailed
    idle_state = batchmodels.ComputeNodeState.idle
    polls = 0
    reboot_map = {}
    while True:
        # refresh pool to ensure that there is no resize error
        pool = batch_client.pool.get(pool_id)
        if pool.resize_error is not None:
            raise RuntimeError(
                'resize error encountered for pool {}: code={} msg={}'.format(
                    pool.id, pool.resize_error.code,
                    pool.resize_error.message))
        nodes = list(batch_client.compute_node.list(pool.id))
        failed_nodes = [node for node in nodes if node.state == failed_state]
        if reboot_on_failed and failed_nodes:
            for node in failed_nodes:
                attempts = reboot_map.setdefault(node.id, 0)
                if attempts > _MAX_REBOOT_RETRIES:
                    raise RuntimeError(
                        ('ran out of reboot retries recovering node {} '
                         'in pool {}').format(node.id, pool.id))
                _reboot_node(batch_client, pool.id, node.id, True)
                reboot_map[node.id] = attempts + 1
            # refresh node list
            nodes = list(batch_client.compute_node.list(pool.id))
        enough_nodes = len(nodes) >= pool.target_dedicated
        if enough_nodes and all(node.state in node_state for node in nodes):
            if all(node.state == idle_state for node in nodes):
                return nodes
            raise RuntimeError(
                'node(s) of pool {} not in idle state'.format(pool.id))
        # log node states on every third poll
        polls = (polls + 1) % 3
        if polls == 0:
            logger.debug('waiting for {} nodes to reach desired state'.format(
                pool.target_dedicated))
            for node in nodes:
                logger.debug('{}: {}'.format(node.id, node.state))
        time.sleep(10)
|
|
|
|
|
|
def generate_ssh_keypair():
    # type: () -> tuple
    """Generate an ssh keypair for use with user logins

    Existing key files named after _SSH_KEY_PREFIX are renamed with an
    '.old' suffix (replacing any previous '.old' file) before a new RSA
    key pair with an empty passphrase is created with ssh-keygen.
    :rtype: tuple
    :return: (private key filename, public key filename)
    """
    pubkey = _SSH_KEY_PREFIX + '.pub'
    for keyfile in (_SSH_KEY_PREFIX, pubkey):
        backup = keyfile + '.old'
        # best effort rotation: failures to move old keys are ignored
        try:
            if os.path.exists(keyfile):
                if os.path.exists(backup):
                    os.remove(backup)
                os.rename(keyfile, backup)
        except OSError:
            pass
    logger.info('generating ssh key pair')
    keygen_cmd = ['ssh-keygen', '-f', _SSH_KEY_PREFIX, '-t', 'rsa', '-N', '']
    subprocess.check_call(keygen_cmd)
    return (_SSH_KEY_PREFIX, pubkey)
|
|
|
|
|
|
def add_admin_user_to_compute_node(
        batch_client, pool_id, node, username, ssh_public_key):
    # type: (batch.BatchServiceClient, str, batchmodels.ComputeNode, str,
    #        str) -> None
    """Adds an administrative user to the Batch Compute Node

    An error stating that the node user already exists is ignored; any
    other batch error is re-raised.
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param str pool_id: The pool id containing the node.
    :param node: The compute node.
    :type node: `batchserviceclient.models.ComputeNode`
    :param str username: user name
    :param str ssh_public_key: path to the ssh rsa public key file
    """
    logger.info('adding user {} to node {} in pool {}'.format(
        username, node.id, pool_id))
    # read the key with a context manager so the file handle is closed
    with open(ssh_public_key, 'rb') as fd:
        pubkey_data = fd.read().decode('utf8')
    try:
        batch_client.compute_node.add_user(
            pool_id,
            node.id,
            batchmodels.ComputeNodeUser(
                username,
                is_admin=True,
                password=None,
                ssh_public_key=pubkey_data
            )
        )
    except batchmodels.batch_error.BatchErrorException as ex:
        if 'The node user already exists' not in ex.message.value:
            raise
|
|
|
|
|
|
def _add_global_resource(
        queue_client, table_client, config, pk, p2pcsd, grtype):
    # type: (azurequeue.QueueService, azuretable.TableService, dict, str,
    #        int, str) -> None
    """Add global resources

    For every configured resource of the given type, an entity is written
    to the global resources table and p2pcsd messages are put on the
    global resources queue. Nothing is done if the type is not configured.
    :param azure.storage.queue.QueueService queue_client: queue client
    :param azure.storage.table.TableService table_client: table client
    :param dict config: configuration dict
    :param str pk: partition key
    :param int p2pcsd: peer-to-peer concurrent source downloads
    :param str grtype: global resources type
    :raises NotImplementedError: if resources of a type other than
        'docker_images' are configured
    """
    # only a missing config entry means "nothing to add"; a KeyError raised
    # while talking to storage must not be swallowed
    try:
        resources = config['global_resources'][grtype]
    except KeyError:
        return
    for gr in resources:
        if grtype == 'docker_images':
            prefix = 'docker'
        else:
            raise NotImplementedError()
        resource = '{}:{}'.format(prefix, gr)
        logger.info('adding global resource: {}'.format(resource))
        table_client.insert_or_replace_entity(
            _STORAGE_CONTAINERS['table_globalresources'],
            {
                'PartitionKey': pk,
                # row key is the sha1 hex digest of the resource string
                'RowKey': hashlib.sha1(
                    resource.encode('utf8')).hexdigest(),
                'Resource': resource,
            }
        )
        for _ in range(0, p2pcsd):
            queue_client.put_message(
                _STORAGE_CONTAINERS['queue_globalresources'], resource)
|
|
|
|
|
|
def populate_queues(queue_client, table_client, config):
    # type: (azurequeue.QueueService, azuretable.TableService, dict) -> None
    """Populate the registry table and the global resources queue

    :param azure.storage.queue.QueueService queue_client: queue client
    :param azure.storage.table.TableService table_client: table client
    :param dict config: configuration dict
    """
    try:
        private_registry = config['docker_registry']['private']['enabled']
    except KeyError:
        private_registry = False
    # partition key is "<batch account>$<pool id>"
    partition_key = '{}${}'.format(
        config['credentials']['batch']['account'],
        config['pool_specification']['id'])
    # without a private registry the public docker hub is registered
    if not private_registry:
        hub_entity = {
            'PartitionKey': partition_key,
            'RowKey': 'registry.hub.docker.com',
            'Port': 80,
        }
        table_client.insert_or_replace_entity(
            _STORAGE_CONTAINERS['table_registry'], hub_entity)
    # determine the number of concurrent source downloads
    try:
        p2p_enabled = config['data_replication']['peer_to_peer']['enabled']
    except KeyError:
        p2p_enabled = False
    concurrent_downloads = 1
    if p2p_enabled:
        try:
            concurrent_downloads = config['data_replication'][
                'peer_to_peer']['concurrent_source_downloads']
        except KeyError:
            concurrent_downloads = None
        # missing or invalid setting: derive from the pool size, at least 1
        if concurrent_downloads is None or concurrent_downloads < 1:
            concurrent_downloads = max(
                config['pool_specification']['vm_count'] // 6, 1)
    # add global resources
    _add_global_resource(
        queue_client, table_client, config, partition_key,
        concurrent_downloads, 'docker_images')
|
|
|
|
|
|
def _adjust_settings_for_pool_creation(config):
|
|
# type: (dict) -> None
|
|
"""Adjust settings for pool creation
|
|
:param dict config: configuration dict
|
|
"""
|
|
publisher = config['pool_specification']['publisher'].lower()
|
|
offer = config['pool_specification']['offer'].lower()
|
|
sku = config['pool_specification']['sku'].lower()
|
|
vm_size = config['pool_specification']['vm_size']
|
|
# enforce publisher/offer/sku restrictions
|
|
allowed = False
|
|
shipyard_container_required = True
|
|
if publisher == 'canonical':
|
|
if offer == 'ubuntuserver':
|
|
if sku >= '14.04.0-lts':
|
|
allowed = True
|
|
if sku >= '16.04.0-lts':
|
|
shipyard_container_required = False
|
|
elif publisher == 'credativ':
|
|
if offer == 'debian':
|
|
if sku >= '8':
|
|
allowed = True
|
|
elif publisher == 'openlogic':
|
|
if offer.startswith('centos'):
|
|
if sku >= '7':
|
|
allowed = True
|
|
elif publisher == 'redhat':
|
|
if offer == 'rhel':
|
|
if sku >= '7':
|
|
allowed = True
|
|
elif publisher == 'suse':
|
|
if offer.startswith('sles'):
|
|
if sku >= '12':
|
|
allowed = True
|
|
elif offer == 'opensuse-leap':
|
|
if sku >= '42':
|
|
allowed = True
|
|
elif offer == 'opensuse':
|
|
if sku == '13.2':
|
|
allowed = True
|
|
# check for valid image if gpu, currently only ubuntu 16.04 is supported
|
|
if ((vm_size.lower().startswith('standard_nc') or
|
|
vm_size.lower().startswith('standard_nv')) and
|
|
(publisher != 'canonical' and offer != 'ubuntuserver' and
|
|
sku < '16.04.0-lts')):
|
|
allowed = False
|
|
# oracle linux is not supported due to UEKR4 requirement
|
|
if not allowed:
|
|
raise ValueError(
|
|
('Unsupported Docker Host VM Config, publisher={} offer={} '
|
|
'sku={} vm_size={}').format(publisher, offer, sku, vm_size))
|
|
# adjust for shipyard container requirement
|
|
if shipyard_container_required:
|
|
config['batch_shipyard']['use_shipyard_docker_image'] = True
|
|
logger.warning(
|
|
('forcing shipyard docker image to be used due to '
|
|
'VM config, publisher={} offer={} sku={}').format(
|
|
publisher, offer, sku))
|
|
# adjust inter node comm setting
|
|
vm_count = int(config['pool_specification']['vm_count'])
|
|
try:
|
|
p2p = config['data_replication']['peer_to_peer']['enabled']
|
|
except KeyError:
|
|
p2p = False
|
|
try:
|
|
internode = config[
|
|
'pool_specification']['inter_node_communication_enabled']
|
|
except KeyError:
|
|
internode = True
|
|
max_vms = 20 if publisher == 'microsoftwindowsserver' else 40
|
|
if vm_count > max_vms:
|
|
if p2p:
|
|
logger.warning(
|
|
('disabling peer-to-peer transfer as pool size of {} exceeds '
|
|
'max limit of {} vms for inter-node communication').format(
|
|
vm_count, max_vms))
|
|
if 'data_replication' not in config:
|
|
config['data_replication'] = {}
|
|
if 'peer_to_peer' not in config['data_replication']:
|
|
config['data_replication']['peer_to_peer'] = {}
|
|
config['data_replication']['peer_to_peer']['enabled'] = False
|
|
p2p = False
|
|
if internode:
|
|
logger.warning(
|
|
('disabling inter-node communication as pool size of {} '
|
|
'exceeds max limit of {} vms for setting').format(
|
|
vm_count, max_vms))
|
|
config['pool_specification'][
|
|
'inter_node_communication_enabled'] = False
|
|
internode = False
|
|
# ensure settings p2p/internode settings are compatible
|
|
if p2p and not internode:
|
|
config['pool_specification']['inter_node_communication_enabled'] = True
|
|
logger.warning(
|
|
'force enabling inter-node communication due to peer-to-peer '
|
|
'transfer')
|
|
|
|
|
|
def resize_pool(batch_client, config):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
    """Resize the configured pool to the configured vm count

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param dict config: configuration dict
    """
    pool_spec = config['pool_specification']
    pool_id = pool_spec['id']
    vm_count = int(pool_spec['vm_count'])
    logger.info('Resizing pool {} to {}'.format(pool_id, vm_count))
    # the resize request is given 20 minutes to complete
    resize_param = batchmodels.PoolResizeParameter(
        target_dedicated=vm_count,
        resize_timeout=datetime.timedelta(minutes=20),
    )
    batch_client.pool.resize(
        pool_id=pool_id, pool_resize_parameter=resize_param)
|
|
|
|
|
|
def _confirm_action(config, msg=None):
|
|
# type: (dict) -> bool
|
|
"""Confirm action with user before proceeding
|
|
:param dict config: configuration dict
|
|
:rtype: bool
|
|
:return: if user confirmed or not
|
|
"""
|
|
if config['_auto_confirm']:
|
|
return True
|
|
if msg is None:
|
|
msg = 'action'
|
|
user = raw_input('Confirm {} [y/n]: '.format(msg))
|
|
if user.lower() in ['y', 'yes']:
|
|
return True
|
|
return False
|
|
|
|
|
|
def del_pool(batch_client, config):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
    """Delete the configured pool after user confirmation

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param dict config: configuration dict
    """
    pool_id = config['pool_specification']['id']
    confirmed = _confirm_action(config, 'delete {} pool'.format(pool_id))
    if confirmed:
        logger.info('Deleting pool: {}'.format(pool_id))
        batch_client.pool.delete(pool_id)
|
|
|
|
|
|
def del_node(batch_client, config, node_id):
    # type: (batch.BatchServiceClient, dict, str) -> None
    """Remove a single node from the configured pool after confirmation

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param dict config: configuration dict
    :param str node_id: node id to delete
    :raises ValueError: if node_id is None or empty
    """
    if node_id is None or len(node_id) == 0:
        raise ValueError('node id is invalid')
    pool_id = config['pool_specification']['id']
    prompt = 'delete node {} from {} pool'.format(node_id, pool_id)
    if _confirm_action(config, prompt):
        logger.info('Deleting node {} from pool {}'.format(node_id, pool_id))
        removal = batchmodels.NodeRemoveParameter(node_list=[node_id])
        batch_client.pool.remove_nodes(
            pool_id=pool_id, node_remove_parameter=removal)
|
|
|
|
|
|
def _reboot_node(batch_client, pool_id, node_id, wait):
    # type: (batch.BatchServiceClient, str, str, bool) -> None
    """Reboot a node in a pool, optionally waiting for the reboot to start

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param str pool_id: pool id of node
    :param str node_id: node id to reboot
    :param bool wait: wait for node to enter rebooting state
    """
    logger.info('Rebooting node {} from pool {}'.format(node_id, pool_id))
    batch_client.compute_node.reboot(pool_id=pool_id, node_id=node_id)
    if not wait:
        return
    logger.debug('waiting for node {} to enter rebooting state'.format(
        node_id))
    # poll once per second; there is no upper bound on the wait
    while True:
        current = batch_client.compute_node.get(pool_id, node_id)
        if current.state == batchmodels.ComputeNodeState.rebooting:
            return
        time.sleep(1)
|
|
|
|
|
|
def add_jobs(batch_client, blob_client, config):
    # type: (batch.BatchServiceClient, azureblob.BlockBlobService,dict) -> None
    """Add jobs and their tasks to the configured pool

    For every entry of ``config['job_specifications']`` a job with a job
    preparation task is added, followed by one task per task specification.
    Task command lines wrap ``docker run`` (``docker exec`` for the
    application command of multi-instance tasks); GPU tasks use the
    ``nvidia-docker`` wrapper instead.

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param azure.storage.blob.BlockBlobService blob_client: blob client
    :param dict config: configuration dict
    :raises ValueError: on an invalid task specification or an unsupported
        image for infiniband/gpu tasks
    :raises RuntimeError: if the pool cannot run the requested kind of task
        or the env file upload returns an unexpected result
    """
    # get the pool inter-node comm setting
    pool_id = config['pool_specification']['id']
    _pool = batch_client.pool.get(pool_id)
    global_resources = list(config['global_resources']['docker_images'])
    # TODO add global resources for non-docker resources
    jpcmdline = _wrap_commands_in_shell([
        '$AZ_BATCH_NODE_SHARED_DIR/{} {}'.format(
            _JOBPREP_FILE[0], ' '.join(global_resources))])
    for jobspec in config['job_specifications']:
        job = batchmodels.JobAddParameter(
            id=jobspec['id'],
            pool_info=batchmodels.PoolInformation(pool_id=pool_id),
            job_preparation_task=batchmodels.JobPreparationTask(
                command_line=jpcmdline,
                wait_for_success=True,
                run_elevated=True,
                rerun_on_node_reboot_after_success=False,
            )
        )
        # perform checks:
        # 1. if tasks have dependencies, set it if so
        # 2. if there are multi-instance tasks
        try:
            mi_ac = jobspec['multi_instance_auto_complete']
        except KeyError:
            mi_ac = True
        job.uses_task_dependencies = False
        multi_instance = False
        docker_container_name = None
        for task in jobspec['tasks']:
            # do not break, check to ensure ids are set on each task if
            # task dependencies are set
            if 'depends_on' in task and len(task['depends_on']) > 0:
                if ('id' not in task or task['id'] is None or
                        len(task['id']) == 0):
                    raise ValueError(
                        'task id is not specified, but depends_on is set')
                job.uses_task_dependencies = True
            if 'multi_instance' in task:
                if multi_instance and mi_ac:
                    raise ValueError(
                        'cannot specify more than one multi-instance task '
                        'per job with auto completion enabled')
                multi_instance = True
                # a missing name is reported below as a ValueError rather
                # than surfacing here as a bare KeyError
                docker_container_name = task.get('name')
        # add multi-instance settings
        set_terminate_on_all_tasks_complete = False
        if multi_instance and mi_ac:
            if (docker_container_name is None or
                    len(docker_container_name) == 0):
                raise ValueError(
                    'multi-instance task must be invoked with a named '
                    'container')
            set_terminate_on_all_tasks_complete = True
            job.job_release_task = batchmodels.JobReleaseTask(
                command_line=_wrap_commands_in_shell(
                    ['docker stop {}'.format(docker_container_name),
                     'docker rm -v {}'.format(docker_container_name)]),
                run_elevated=True,
            )
        logger.info('Adding job: {}'.format(job.id))
        try:
            batch_client.job.add(job)
        except batchmodels.batch_error.BatchErrorException as ex:
            if 'The specified job already exists' in ex.message.value:
                # cannot re-use an existing job if multi-instance due to
                # job release requirement
                if multi_instance and mi_ac:
                    raise
            else:
                raise
        del mi_ac
        del multi_instance
        del docker_container_name
        # add all tasks under job
        for task in jobspec['tasks']:
            # get image name
            image = task['image']
            # get or generate task id
            try:
                task_id = task['id']
                if task_id is None or len(task_id) == 0:
                    raise KeyError()
            except KeyError:
                # get filtered, sorted list of generic docker task ids
                try:
                    tasklist = sorted(
                        filter(lambda x: x.id.startswith(
                            _GENERIC_DOCKER_TASK_PREFIX), list(
                                batch_client.task.list(job.id))),
                        key=lambda y: y.id)
                    tasknum = int(tasklist[-1].id.split('-')[-1]) + 1
                except (batchmodels.batch_error.BatchErrorException,
                        IndexError):
                    tasknum = 0
                task_id = '{0}{1:03d}'.format(
                    _GENERIC_DOCKER_TASK_PREFIX, tasknum)
            # set run and exec commands
            docker_run_cmd = 'docker run'
            docker_exec_cmd = 'docker exec'
            # get generic run opts; work on a copy so that the options
            # appended below do not leak back into the configuration dict
            try:
                run_opts = task['additional_docker_run_options']
            except KeyError:
                run_opts = None
            run_opts = [] if run_opts is None else list(run_opts)
            # parse remove container option
            try:
                rm_container = task['remove_container_after_exit']
            except KeyError:
                rm_container = False
            else:
                if rm_container and '--rm' not in run_opts:
                    run_opts.append('--rm')
            # parse name option
            try:
                name = task['name']
                if name is not None:
                    run_opts.append('--name {}'.format(name))
            except KeyError:
                name = None
            # parse labels option
            try:
                labels = task['labels']
                if labels is not None and len(labels) > 0:
                    for label in labels:
                        run_opts.append('-l {}'.format(label))
                del labels
            except KeyError:
                pass
            # parse ports option
            try:
                ports = task['ports']
                if ports is not None and len(ports) > 0:
                    for port in ports:
                        run_opts.append('-p {}'.format(port))
                del ports
            except KeyError:
                pass
            # parse entrypoint
            try:
                entrypoint = task['entrypoint']
                if entrypoint is not None:
                    run_opts.append('--entrypoint {}'.format(entrypoint))
                del entrypoint
            except KeyError:
                pass
            # parse data volumes
            try:
                data_volumes = task['data_volumes']
            except KeyError:
                pass
            else:
                if data_volumes is not None and len(data_volumes) > 0:
                    for key in data_volumes:
                        dvspec = config[
                            'global_resources']['docker_volumes'][
                                'data_volumes'][key]
                        try:
                            hostpath = dvspec['host_path']
                        except KeyError:
                            hostpath = None
                        if hostpath is not None and len(hostpath) > 0:
                            run_opts.append('-v {}:{}'.format(
                                hostpath, dvspec['container_path']))
                        else:
                            run_opts.append('-v {}'.format(
                                dvspec['container_path']))
            # parse shared data volumes
            try:
                shared_data_volumes = task['shared_data_volumes']
            except KeyError:
                pass
            else:
                if (shared_data_volumes is not None and
                        len(shared_data_volumes) > 0):
                    for key in shared_data_volumes:
                        dvspec = config[
                            'global_resources']['docker_volumes'][
                                'shared_data_volumes'][key]
                        run_opts.append('-v {}:{}'.format(
                            key, dvspec['container_path']))
            # get command
            try:
                command = task['command']
                if command is not None and len(command) == 0:
                    raise KeyError()
            except KeyError:
                command = None
            # get and create env var file
            envfile = '.shipyard.envlist'
            sas_urls = None
            try:
                env_vars = jobspec['environment_variables']
            except KeyError:
                env_vars = None
            try:
                infiniband = task['infiniband']
            except KeyError:
                infiniband = False
            # ensure we're on HPC VMs with inter node comm enabled
            if infiniband:
                if not _pool.enable_inter_node_communication:
                    raise RuntimeError(
                        ('cannot initialize an infiniband task on a '
                         'non-internode communication enabled '
                         'pool: {}').format(pool_id))
                if (_pool.vm_size.lower() != 'standard_a8' and
                        _pool.vm_size.lower() != 'standard_a9'):
                    raise RuntimeError(
                        ('cannot initialize an infiniband task on nodes '
                         'without RDMA, pool: {} vm_size: {}').format(
                             pool_id, _pool.vm_size))
                publisher = _pool.virtual_machine_configuration.\
                    image_reference.publisher.lower()
                offer = _pool.virtual_machine_configuration.\
                    image_reference.offer.lower()
                # TODO support SLES-HPC, for now only support CentOS-HPC
                # (unsupported if either publisher or offer does not match)
                if publisher != 'openlogic' or offer != 'centos-hpc':
                    raise ValueError(
                        ('Unsupported infiniband VM config, publisher={} '
                         'offer={}').format(publisher, offer))
            # ensure we're on n-series for gpu
            try:
                gpu = task['gpu']
            except KeyError:
                gpu = False
            if gpu:
                if not (_pool.vm_size.lower().startswith('standard_nc') or
                        _pool.vm_size.lower().startswith('standard_nv')):
                    raise RuntimeError(
                        ('cannot initialize a gpu task on nodes without '
                         'gpus, pool: {} vm_size: {}').format(
                             pool_id, _pool.vm_size))
                publisher = _pool.virtual_machine_configuration.\
                    image_reference.publisher.lower()
                offer = _pool.virtual_machine_configuration.\
                    image_reference.offer.lower()
                sku = _pool.virtual_machine_configuration.\
                    image_reference.sku.lower()
                # TODO other images as they become available with gpu support
                # (unsupported if any of publisher, offer, sku mismatches)
                if (publisher != 'canonical' or offer != 'ubuntuserver' or
                        sku < '16.04.0-lts'):
                    raise ValueError(
                        ('Unsupported gpu VM config, publisher={} offer={} '
                         'sku={}').format(publisher, offer, sku))
                # override docker commands with nvidia docker wrapper
                docker_run_cmd = 'nvidia-docker run'
                docker_exec_cmd = 'nvidia-docker exec'
            try:
                task_ev = task['environment_variables']
                if env_vars is None:
                    env_vars = task_ev
                else:
                    env_vars = merge_dict(env_vars, task_ev)
            except KeyError:
                pass
            # an env file is needed for user defined variables and also for
            # the MPI settings of infiniband tasks, even if the user did not
            # define any variable
            has_env_vars = env_vars is not None and len(env_vars) > 0
            if has_env_vars or infiniband:
                envfiletmp = _TEMP_DIR / '{}{}'.format(task_id, envfile)
                envfileloc = '{}taskrf-{}/{}{}'.format(
                    config['batch_shipyard']['storage_entity_prefix'],
                    job.id, task_id, envfile)
                with envfiletmp.open('w', encoding='utf8') as f:
                    if has_env_vars:
                        for key in env_vars:
                            f.write('{}={}{}'.format(
                                key, env_vars[key], os.linesep))
                    if infiniband:
                        f.write('I_MPI_FABRICS=shm:dapl{}'.format(os.linesep))
                        f.write('I_MPI_DAPL_PROVIDER=ofa-v2-ib0{}'.format(
                            os.linesep))
                        f.write('I_MPI_DYNAMIC_CONNECTION=0{}'.format(
                            os.linesep))
                        # create a manpath entry for potentially buggy
                        # intel mpivars.sh
                        f.write(
                            'MANPATH=/usr/share/man:/usr/local/man{}'.format(
                                os.linesep))
                # upload env var file if exists
                sas_urls = upload_resource_files(
                    blob_client, config, [(envfileloc, str(envfiletmp))])
                envfiletmp.unlink()
                if len(sas_urls) != 1:
                    raise RuntimeError('unexpected number of sas urls')
            # always add option for envfile
            run_opts.append('--env-file {}'.format(envfile))
            # add infiniband run opts
            if infiniband:
                run_opts.append('--net=host')
                run_opts.append('--ulimit memlock=9223372036854775807')
                run_opts.append('--device=/dev/hvnd_rdma')
                run_opts.append('--device=/dev/infiniband/rdma_cm')
                run_opts.append('--device=/dev/infiniband/uverbs0')
                run_opts.append('-v /etc/rdma:/etc/rdma:ro')
                run_opts.append('-v /opt/intel:/opt/intel:ro')
            # mount batch root dir
            run_opts.append(
                '-v $AZ_BATCH_NODE_ROOT_DIR:$AZ_BATCH_NODE_ROOT_DIR')
            # set working directory
            run_opts.append('-w $AZ_BATCH_TASK_WORKING_DIR')
            # check if there are multi-instance tasks
            mis = None
            if 'multi_instance' in task:
                if not _pool.enable_inter_node_communication:
                    raise RuntimeError(
                        ('cannot run a multi-instance task on a '
                         'non-internode communication enabled '
                         'pool: {}').format(pool_id))
                # container must be named
                if name is None or len(name) == 0:
                    raise ValueError(
                        'multi-instance task must be invoked with a named '
                        'container')
                # docker exec command cannot be empty/None
                if command is None or len(command) == 0:
                    raise ValueError(
                        'multi-instance task must have an application command')
                # set docker run as coordination command
                try:
                    run_opts.remove('--rm')
                except ValueError:
                    pass
                # run in detached mode
                run_opts.append('-d')
                # ensure host networking stack is used
                if '--net=host' not in run_opts:
                    run_opts.append('--net=host')
                # get coordination command
                try:
                    coordination_command = task[
                        'multi_instance']['coordination_command']
                    if (coordination_command is not None and
                            len(coordination_command) == 0):
                        raise KeyError()
                except KeyError:
                    coordination_command = None
                cc_args = [
                    'env | grep AZ_BATCH_ >> {}'.format(envfile),
                    '{} {} {}{}'.format(
                        docker_run_cmd,
                        ' '.join(run_opts),
                        image,
                        '{}'.format(' ' + coordination_command)
                        if coordination_command else '')
                ]
                # create multi-instance settings
                num_instances = task['multi_instance']['num_instances']
                if not isinstance(num_instances, int):
                    if num_instances == 'pool_specification_vm_count':
                        num_instances = config[
                            'pool_specification']['vm_count']
                    elif num_instances == 'pool_current_dedicated':
                        num_instances = _pool.current_dedicated
                    else:
                        raise ValueError(
                            ('multi instance num instances setting '
                             'invalid: {}').format(num_instances))
                mis = batchmodels.MultiInstanceSettings(
                    number_of_instances=num_instances,
                    coordination_command_line=_wrap_commands_in_shell(
                        cc_args, wait=False),
                    common_resource_files=[],
                )
                # add common resource files for multi-instance
                try:
                    rfs = task['multi_instance']['resource_files']
                except KeyError:
                    pass
                else:
                    for rf in rfs:
                        try:
                            fm = rf['file_mode']
                        except KeyError:
                            fm = None
                        mis.common_resource_files.append(
                            batchmodels.ResourceFile(
                                file_path=rf['file_path'],
                                blob_source=rf['blob_source'],
                                file_mode=fm,
                            )
                        )
                # set application command
                task_commands = [
                    '{} {} {}'.format(docker_exec_cmd, name, command)
                ]
            else:
                task_commands = [
                    'env | grep AZ_BATCH_ >> {}'.format(envfile),
                    '{} {} {}{}'.format(
                        docker_run_cmd,
                        ' '.join(run_opts),
                        image,
                        '{}'.format(' ' + command) if command else '')
                ]
            # create task
            batchtask = batchmodels.TaskAddParameter(
                id=task_id,
                command_line=_wrap_commands_in_shell(task_commands),
                run_elevated=True,
                resource_files=[],
            )
            if mis is not None:
                batchtask.multi_instance_settings = mis
            # the env file only exists in storage if it was uploaded above;
            # otherwise it is created on the node by the task command line
            if sas_urls is not None:
                batchtask.resource_files.append(
                    batchmodels.ResourceFile(
                        file_path=str(envfile),
                        blob_source=next(iter(sas_urls.values())),
                        file_mode='0640',
                    )
                )
            # add additional resource files
            try:
                rfs = task['resource_files']
            except KeyError:
                pass
            else:
                for rf in rfs:
                    try:
                        fm = rf['file_mode']
                    except KeyError:
                        fm = None
                    batchtask.resource_files.append(
                        batchmodels.ResourceFile(
                            file_path=rf['file_path'],
                            blob_source=rf['blob_source'],
                            file_mode=fm,
                        )
                    )
            # add task dependencies
            if 'depends_on' in task and len(task['depends_on']) > 0:
                batchtask.depends_on = batchmodels.TaskDependencies(
                    task_ids=task['depends_on']
                )
            # create task
            logger.info('Adding task {}: {}'.format(
                task_id, batchtask.command_line))
            if mis is not None:
                logger.info(
                    'multi-instance task coordination command: {}'.format(
                        mis.coordination_command_line))
            batch_client.task.add(job_id=job.id, task=batchtask)
        # update job if job autocompletion is needed
        if set_terminate_on_all_tasks_complete:
            batch_client.job.update(
                job_id=job.id,
                job_update_parameter=batchmodels.JobUpdateParameter(
                    pool_info=batchmodels.PoolInformation(pool_id=pool_id),
                    on_all_tasks_complete=batchmodels.
                    OnAllTasksComplete.terminate_job))
|
|
|
|
|
|
def del_jobs(batch_client, config):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
    """Delete the jobs listed in the configuration, confirming each one

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param dict config: configuration dict
    """
    for jobspec in config['job_specifications']:
        job_id = jobspec['id']
        if _confirm_action(config, 'delete {} job'.format(job_id)):
            logger.info('Deleting job: {}'.format(job_id))
            batch_client.job.delete(job_id)
|
|
|
|
|
|
def clean_mi_jobs(batch_client, config):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
    """Clean up containers left behind by completed multi-instance tasks

    For each configured job a companion 'shipyardcleanup-<job id>' job is
    created (or re-used) and, for every completed multi-instance task that
    has no cleanup task yet, a multi-instance cleanup task is added that
    stops and removes the named docker container. Each cleanup task is
    awaited before the next one is added.

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param dict config: configuration dict
    """
    for jobspec in config['job_specifications']:
        job_id = jobspec['id']
        cleanup_job_id = 'shipyardcleanup-' + job_id
        pool_info = batchmodels.PoolInformation(
            pool_id=config['pool_specification']['id'])
        cleanup_job = batchmodels.JobAddParameter(
            id=cleanup_job_id,
            pool_info=pool_info,
        )
        try:
            batch_client.job.add(cleanup_job)
            logger.info('Added cleanup job: {}'.format(cleanup_job.id))
        except batchmodels.batch_error.BatchErrorException as ex:
            # an already existing cleanup job is simply re-used
            if 'The specified job already exists' not in ex.message.value:
                raise
        # ids of cleanup tasks that were already submitted
        already_cleaned = [
            t.id for t in batch_client.task.list(cleanup_job_id)]
        for task in batch_client.task.list(job_id):
            if (task.id in already_cleaned or
                    task.multi_instance_settings is None):
                continue
            # only completed tasks are cleaned up
            if task.state != batchmodels.TaskState.completed:
                continue
            mi_settings = task.multi_instance_settings
            # container name is the token following the last '--name'
            after_name_opt = mi_settings.coordination_command_line.split(
                '--name')[-1]
            name = after_name_opt.split()[0]
            coordination_cmd = _wrap_commands_in_shell([
                'docker stop {}'.format(name),
                'docker rm -v {}'.format(name),
                'exit 0',
            ], wait=False)
            batchtask = batchmodels.TaskAddParameter(
                id=task.id,
                multi_instance_settings=batchmodels.MultiInstanceSettings(
                    number_of_instances=mi_settings.number_of_instances,
                    coordination_command_line=coordination_cmd,
                ),
                command_line='/bin/sh -c "exit 0"',
                run_elevated=True,
            )
            batch_client.task.add(job_id=cleanup_job_id, task=batchtask)
            logger.debug(
                ('Waiting for docker multi-instance clean up task {} '
                 'for job {} to complete').format(batchtask.id, job_id))
            # wait for cleanup task to complete before adding another
            while True:
                batchtask = batch_client.task.get(cleanup_job_id, task.id)
                if batchtask.state == batchmodels.TaskState.completed:
                    break
                time.sleep(1)
            logger.info(
                ('Docker multi-instance clean up task {} for job {} '
                 'completed').format(batchtask.id, job_id))
|
|
|
|
|
|
def del_clean_mi_jobs(batch_client, config):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
    """Delete the multi-instance clean up jobs of the configured jobs

    Batch errors raised by the delete call are ignored.

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param dict config: configuration dict
    """
    cleanup_ids = (
        'shipyardcleanup-' + jobspec['id']
        for jobspec in config['job_specifications'])
    for cleanup_job_id in cleanup_ids:
        logger.info('deleting job: {}'.format(cleanup_job_id))
        try:
            batch_client.job.delete(cleanup_job_id)
        except batchmodels.batch_error.BatchErrorException:
            # best effort
            pass
|
|
|
|
|
|
def terminate_jobs(batch_client, config):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
    """Terminate the jobs listed in the configuration, confirming each one

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param dict config: configuration dict
    """
    for jobspec in config['job_specifications']:
        job_id = jobspec['id']
        if _confirm_action(config, 'terminate {} job'.format(job_id)):
            logger.info('Terminating job: {}'.format(job_id))
            batch_client.job.terminate(job_id)
|
|
|
|
|
|
def del_all_jobs(batch_client, config):
    # type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
    """Delete every job returned by the batch service, confirming each one

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param dict config: configuration dict
    """
    logger.debug('Getting list of all jobs...')
    for existing in batch_client.job.list():
        if _confirm_action(config, 'delete {} job'.format(existing.id)):
            logger.info('Deleting job: {}'.format(existing.id))
            batch_client.job.delete(existing.id)
|
|
|
|
|
|
def get_remote_login_settings(batch_client, config, nodes=None):
    # type: (batch.BatchServiceClient, dict, list) -> None
    """Log the remote login settings of nodes in the configured pool

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param dict config: configuration dict
    :param list nodes: nodes to query (objects with an ``id`` attribute);
        all nodes of the pool are listed if None
    """
    pool_id = config['pool_specification']['id']
    targets = batch_client.compute_node.list(pool_id) if nodes is None \
        else nodes
    for target in targets:
        settings = batch_client.compute_node.get_remote_login_settings(
            pool_id, target.id)
        logger.info('node {}: {}'.format(target.id, settings))
|
|
|
|
|
|
def stream_file_and_wait_for_task(batch_client):
    # type: (batch.BatchServiceClient) -> None
    """Stream a file and wait for task to complete

    Prompts for a job id, a task id and a task-relative file path
    (default ``stdout.txt``), then polls the file once per second and
    prints newly appended content until the task has completed and no new
    data is available, or until 5 minutes have elapsed.

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    """
    import codecs
    job_id = raw_input('Enter job id: ')
    task_id = raw_input('Enter task id: ')
    filepath = raw_input(
        'Enter task-relative file path to stream [stdout.txt]: ')
    if filepath == '' or filepath is None:
        filepath = 'stdout.txt'
    logger.debug('attempting to stream file {} from job={} task={}'.format(
        filepath, job_id, task_id))
    # an incremental decoder keeps incomplete multi-byte sequences buffered
    # across chunk and fetch boundaries; undecodable bytes are replaced
    # instead of aborting the stream
    decoder = codecs.getincrementaldecoder('utf8')(errors='replace')
    curr = 0
    end = 0
    completed = False
    timeout = datetime.timedelta(minutes=5)
    time_to_timeout_at = datetime.datetime.now() + timeout
    while datetime.datetime.now() < time_to_timeout_at:
        # get task file properties
        try:
            tfp = batch_client.file.get_node_file_properties_from_task(
                job_id, task_id, filepath, raw=True)
        except batchmodels.BatchErrorException as ex:
            # file is not available yet in the current task state, retry
            if ('The specified operation is not valid for the current '
                    'state of the resource.' in ex.message.value):
                time.sleep(1)
                continue
            else:
                raise
        size = int(tfp.response.headers['Content-Length'])
        if size != end and curr != size:
            end = size
            # get the new fragment; byte ranges are inclusive, thus the
            # last byte to request is end - 1 (requesting up to end could
            # fetch a byte twice if the file grew in the meantime)
            frag = batch_client.file.get_from_task(
                job_id, task_id, filepath,
                batchmodels.FileGetFromTaskOptions(
                    ocp_range='bytes={}-{}'.format(curr, end - 1))
            )
            for chunk in frag:
                print(decoder.decode(chunk), end='')
            curr = end
        elif completed:
            # flush whatever is left in the decoder and end the line
            print(decoder.decode(b'', final=True))
            break
        if not completed:
            task = batch_client.task.get(job_id, task_id)
            if task.state == batchmodels.TaskState.completed:
                completed = True
        time.sleep(1)
    else:
        # loop ended without break: the overall timeout was reached
        logger.warning(
            'timed out streaming file {} from job={} task={}'.format(
                filepath, job_id, task_id))
|
|
|
|
|
|
def get_file_via_task(batch_client, config):
    # type: (batch.BatchServiceClient, dict) -> None
    """Download a task file into the current directory

    Prompts for a job id, a task id and a task-relative file path
    (default ``stdout.txt``); the file is saved under its base name.

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param dict config: configuration dict
    :raises RuntimeError: if the local file exists and overwriting it is
        not confirmed
    """
    job_id = raw_input('Enter job id: ')
    task_id = raw_input('Enter task id: ')
    remote_path = raw_input(
        'Enter task-relative file path to retrieve [stdout.txt]: ')
    if remote_path == '' or remote_path is None:
        remote_path = 'stdout.txt'
    # check if file exists on disk; a possible race condition here is
    # understood
    local_path = pathlib.Path(pathlib.Path(remote_path).name)
    if local_path.exists():
        overwrite = _confirm_action(
            config, 'file overwrite of {}'.format(remote_path))
        if not overwrite:
            raise RuntimeError('file already exists: {}'.format(remote_path))
    logger.debug('attempting to retrieve file {} from job={} task={}'.format(
        remote_path, job_id, task_id))
    stream = batch_client.file.get_from_task(job_id, task_id, remote_path)
    with local_path.open('wb') as out:
        for data in stream:
            out.write(data)
    logger.debug('file {} retrieved from job={} task={} bytes={}'.format(
        remote_path, job_id, task_id, local_path.stat().st_size))
|
|
|
|
|
|
def get_file_via_node(batch_client, config, node_id):
    # type: (batch.BatchServiceClient, dict, str) -> None
    """Interactively download one file that lives on a compute node

    The pool id is taken from the pool specification in the config and the
    node-relative path is read from the console. The data is written to the
    current working directory under the base name of the requested path.

    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param dict config: configuration dict
    :param str node_id: node id
    :raises ValueError: if node_id is None or empty
    :raises RuntimeError: if no path is entered, or if the local file
        already exists and `_confirm_action` does not approve overwriting it
    """
    node_id_missing = node_id is None or len(node_id) == 0
    if node_id_missing:
        raise ValueError('node id is invalid')
    pool_id = config['pool_specification']['id']
    remote_path = raw_input('Enter node-relative file path to retrieve: ')
    if not remote_path:
        raise RuntimeError('specified invalid file to retrieve')
    # only the final path component is used locally; the existence check
    # and the later open are not atomic, which is an accepted race
    local_path = pathlib.Path(pathlib.Path(remote_path).name)
    if local_path.exists():
        approved = _confirm_action(
            config, 'file overwrite of {}'.format(remote_path))
        if not approved:
            raise RuntimeError(
                'file already exists: {}'.format(remote_path))
    logger.debug('attempting to retrieve file {} from pool={} node={}'.format(
        remote_path, pool_id, node_id))
    chunks = batch_client.file.get_from_compute_node(
        pool_id, node_id, remote_path)
    with local_path.open('wb') as out:
        for chunk in chunks:
            out.write(chunk)
    logger.debug('file {} retrieved from pool={} node={} bytes={}'.format(
        remote_path, pool_id, node_id, local_path.stat().st_size))
|
|
|
|
|
|
def delete_storage_containers(blob_client, queue_client, table_client, config):
    # type: (azureblob.BlockBlobService, azurequeue.QueueService,
    #        azuretable.TableService, dict) -> None
    """Delete every storage entity listed in _STORAGE_CONTAINERS

    The key prefix (blob_, table_ or queue_) selects which client performs
    the deletion; keys with any other prefix are ignored.

    :param azure.storage.blob.BlockBlobService blob_client: blob client
    :param azure.storage.queue.QueueService queue_client: queue client
    :param azure.storage.table.TableService table_client: table client
    :param dict config: configuration dict (not read by this function)
    """
    for key, entity_name in _STORAGE_CONTAINERS.items():
        if key.startswith('blob_'):
            blob_client.delete_container(entity_name)
        elif key.startswith('table_'):
            table_client.delete_table(entity_name)
        elif key.startswith('queue_'):
            queue_client.delete_queue(entity_name)
|
|
|
|
|
|
def _clear_blobs(blob_client, container):
    # type: (azureblob.BlockBlobService, str) -> None
    """Delete every blob listed in a container, leaving the container itself

    :param azure.storage.blob.BlockBlobService blob_client: blob client
    :param str container: container to clear blobs from
    """
    logger.info('deleting blobs: {}'.format(container))
    for entry in blob_client.list_blobs(container):
        blob_client.delete_blob(container, entry.name)
|
|
|
|
|
|
def _clear_blob_task_resourcefiles(blob_client, container, config):
    # type: (azureblob.BlockBlobService, str, dict) -> None
    """Delete only the task resource file blobs in a container

    Blobs are selected by name prefix: the configured storage entity prefix
    followed by "taskrf-". All other blobs in the container are kept.

    :param azure.storage.blob.BlockBlobService blob_client: blob client
    :param str container: container to clear blobs from
    :param dict config: configuration dict
    """
    entity_prefix = config['batch_shipyard']['storage_entity_prefix']
    blob_prefix = '{}taskrf-'.format(entity_prefix)
    logger.info('deleting blobs with prefix: {}'.format(blob_prefix))
    for entry in blob_client.list_blobs(container, prefix=blob_prefix):
        blob_client.delete_blob(container, entry.name)
|
|
|
|
|
|
def _clear_table(table_client, table_name, config):
    # type: (azuretable.TableService, str, dict) -> None
    """Clear table entities

    Only entities whose PartitionKey equals "<batch account>$<pool id>"
    (both taken from the config) are deleted; they are removed through
    table batches committed in groups of at most 100 operations.

    :param azure.storage.table.TableService table_client: table client
    :param str table_name: table name
    :param dict config: configuration dict
    """
    logger.info('clearing table: {}'.format(table_name))
    ents = table_client.query_entities(
        table_name, filter='PartitionKey eq \'{}${}\''.format(
            config['credentials']['batch']['account'],
            config['pool_specification']['id'])
    )
    # batch delete entities
    i = 0
    bet = azuretable.TableBatch()
    for ent in ents:
        bet.delete_entity(ent['PartitionKey'], ent['RowKey'])
        i += 1
        if i == 100:
            # flush a full batch and start collecting a new one
            table_client.commit_batch(table_name, bet)
            bet = azuretable.TableBatch()
            i = 0
    if i > 0:
        # flush the final, partially filled batch
        table_client.commit_batch(table_name, bet)
|
|
|
|
|
|
def clear_storage_containers(blob_client, queue_client, table_client, config):
    # type: (azureblob.BlockBlobService, azurequeue.QueueService,
    #        azuretable.TableService, dict) -> None
    """Empty the storage entities listed in _STORAGE_CONTAINERS

    Blob containers are emptied, table entities are removed via
    `_clear_table` and queue messages are cleared; the containers, tables
    and queues themselves are not deleted by this function.

    :param azure.storage.blob.BlockBlobService blob_client: blob client
    :param azure.storage.queue.QueueService queue_client: queue client
    :param azure.storage.table.TableService table_client: table client
    :param dict config: configuration dict
    """
    try:
        perf = config['batch_shipyard']['store_timing_metrics']
    except KeyError:
        perf = False
    for key, entity_name in _STORAGE_CONTAINERS.items():
        if key.startswith('blob_'):
            # TODO this is temp to preserve registry upload
            if key == 'blob_resourcefiles':
                _clear_blob_task_resourcefiles(
                    blob_client, entity_name, config)
            else:
                _clear_blobs(blob_client, entity_name)
        elif key.startswith('table_'):
            try:
                _clear_table(table_client, entity_name, config)
            except azure.common.AzureMissingResourceHttpError:
                # a missing perf table is tolerated only while timing
                # metrics are disabled; anything else is re-raised
                tolerated = key == 'table_perf' and not perf
                if not tolerated:
                    raise
        elif key.startswith('queue_'):
            logger.info('clearing queue: {}'.format(entity_name))
            queue_client.clear_messages(entity_name)
|
|
|
|
|
|
def create_storage_containers(blob_client, queue_client, table_client, config):
    # type: (azureblob.BlockBlobService, azurequeue.QueueService,
    #        azuretable.TableService, dict) -> None
    """Create the storage entities listed in _STORAGE_CONTAINERS

    The key prefix (blob_, table_ or queue_) selects the kind of entity to
    create. The table_perf table is skipped unless the config enables
    store_timing_metrics.

    :param azure.storage.blob.BlockBlobService blob_client: blob client
    :param azure.storage.queue.QueueService queue_client: queue client
    :param azure.storage.table.TableService table_client: table client
    :param dict config: configuration dict
    """
    try:
        perf = config['batch_shipyard']['store_timing_metrics']
    except KeyError:
        perf = False
    for key, entity_name in _STORAGE_CONTAINERS.items():
        if key.startswith('blob_'):
            logger.info('creating container: {}'.format(entity_name))
            blob_client.create_container(entity_name)
        elif key.startswith('table_'):
            skip_perf_table = key == 'table_perf' and not perf
            if skip_perf_table:
                continue
            logger.info('creating table: {}'.format(entity_name))
            table_client.create_table(entity_name)
        elif key.startswith('queue_'):
            logger.info('creating queue: {}'.format(entity_name))
            queue_client.create_queue(entity_name)
|
|
|
|
|
|
def merge_dict(dict1, dict2):
    # type: (dict, dict) -> dict
    """Return a new dictionary holding dict1 overlaid with dict2.

    Unlike dict.update(), a key whose current value is a dict is merged
    recursively with the incoming value instead of being replaced. Other
    value types (lists, scalars, ...) are replaced wholesale. Neither input
    is modified; every value in the result is a deep copy.

    :param dict dict1: dictionary to merge to
    :param dict dict2: dictionary to merge with
    :rtype: dict
    :return: merged dictionary
    :raises ValueError: if either argument is not a dict, which includes a
        non-dict value in dict2 targeting a dict value in dict1
    """
    for operand in (dict1, dict2):
        if not isinstance(operand, dict):
            raise ValueError('dict1 or dict2 is not a dictionary')
    merged = copy.deepcopy(dict1)
    for key, incoming in dict2.items():
        # a missing key yields None here, which takes the replace branch
        existing = merged.get(key)
        if isinstance(existing, dict):
            merged[key] = merge_dict(existing, incoming)
        else:
            merged[key] = copy.deepcopy(incoming)
    return merged
|
|
|
|
|
|
def main():
    """Main function

    Parses the command line, builds the configuration by merging the
    credentials, global config, pool and (for job actions) jobs json files,
    creates the service clients and dispatches the requested action.

    :raises ValueError: if the credentials or config json is not specified,
        if a json file cannot be parsed or merged, or if the action is
        unknown
    :raises RuntimeError: if addpool is requested for a pool that exists
    """
    # get command-line args
    args = parseargs()
    args.action = args.action.lower()

    if args.credentials is None:
        raise ValueError('credentials json not specified')
    if args.config is None:
        raise ValueError('config json not specified')

    with open(args.credentials, 'r') as f:
        config = json.load(f)
    with open(args.config, 'r') as f:
        config = merge_dict(config, json.load(f))
    try:
        with open(args.pool, 'r') as f:
            config = merge_dict(config, json.load(f))
    except ValueError:
        # malformed json or a failed merge must not be masked by the
        # fallback below
        raise
    except Exception:
        # pool json missing or unreadable: fall back to a bare pool id.
        # getattr is used because the parser may not define a poolid
        # option, in which case the id is None rather than a crash.
        config['pool_specification'] = {
            'id': getattr(args, 'poolid', None)
        }
    if args.action in ('addjobs', 'cleanmijobs', 'deljobs', 'termjobs'):
        try:
            with open(args.jobs, 'r') as f:
                config = merge_dict(config, json.load(f))
        except ValueError:
            raise
        except Exception:
            # jobs json missing or unreadable: fall back to a bare job id
            # (None when the parser does not define a jobid option)
            config['job_specifications'] = [{
                'id': getattr(args, 'jobid', None)
            }]
    # NOTE(review): config was seeded from the credentials json, so this
    # debug dump presumably contains account keys -- confirm that debug
    # logs are treated as sensitive
    logger.debug('config:\n' + json.dumps(config, indent=4))
    _populate_global_settings(config, args.action)
    config['_auto_confirm'] = args.yes

    batch_client, blob_client, queue_client, table_client = \
        _create_credentials(config)

    if args.action == 'addpool':
        # first check if pool exists to prevent accidental metadata clear
        if batch_client.pool.exists(config['pool_specification']['id']):
            raise RuntimeError(
                'attempting to create a pool that already exists: {}'.format(
                    config['pool_specification']['id']))
        create_storage_containers(
            blob_client, queue_client, table_client, config)
        clear_storage_containers(
            blob_client, queue_client, table_client, config)
        _adjust_settings_for_pool_creation(config)
        populate_queues(queue_client, table_client, config)
        add_pool(batch_client, blob_client, config)
    elif args.action == 'resizepool':
        resize_pool(batch_client, config)
    elif args.action == 'delpool':
        del_pool(batch_client, config)
    elif args.action == 'addsshuser':
        add_ssh_tunnel_user(batch_client, config)
        get_remote_login_settings(batch_client, config)
    elif args.action == 'delnode':
        del_node(batch_client, config, args.nodeid)
    elif args.action == 'addjobs':
        add_jobs(batch_client, blob_client, config)
    elif args.action == 'cleanmijobs':
        clean_mi_jobs(batch_client, config)
    elif args.action == 'termjobs':
        terminate_jobs(batch_client, config)
    elif args.action == 'deljobs':
        del_jobs(batch_client, config)
    elif args.action == 'delcleanmijobs':
        del_clean_mi_jobs(batch_client, config)
    elif args.action == 'delalljobs':
        del_all_jobs(batch_client, config)
    elif args.action == 'grls':
        get_remote_login_settings(batch_client, config)
    elif args.action == 'streamfile':
        stream_file_and_wait_for_task(batch_client)
    elif args.action == 'gettaskfile':
        get_file_via_task(batch_client, config)
    elif args.action == 'getnodefile':
        get_file_via_node(batch_client, config, args.nodeid)
    elif args.action == 'delstorage':
        delete_storage_containers(
            blob_client, queue_client, table_client, config)
    elif args.action == 'clearstorage':
        clear_storage_containers(
            blob_client, queue_client, table_client, config)
    else:
        raise ValueError('Unknown action: {}'.format(args.action))
|
|
|
|
|
|
def parseargs():
    """Parse program arguments

    Besides the positional action, the parser accepts the json config
    paths (--credentials, --config, --pool, --jobs), the --nodeid option
    and the --poolid/--jobid options that main() falls back to when the
    pool or jobs json cannot be loaded. Every option defaults to None
    except --yes, which defaults to False.

    :rtype: argparse.Namespace
    :return: parsed arguments
    """
    parser = argparse.ArgumentParser(
        description='Batch Shipyard: Provision and Execute Docker Workloads '
        'on Azure Batch')
    parser.set_defaults(yes=False)
    parser.add_argument(
        'action', help='addpool, resizepool, addjobs, addsshuser, '
        'cleanmijobs, termjobs, deljobs, delcleanmijobs, delalljobs, '
        'delpool, delnode, grls, streamfile, gettaskfile, getnodefile, '
        'clearstorage, delstorage')
    parser.add_argument(
        '-y', '--yes', dest='yes', action='store_true',
        help='assume yes for all yes/no confirmations')
    parser.add_argument(
        '--credentials',
        help='credentials json config. required for all actions')
    parser.add_argument(
        '--config',
        help='global json config for option. required for all actions')
    parser.add_argument(
        '--pool',
        help='pool json config. required for most actions')
    parser.add_argument(
        '--jobs',
        help='jobs json config. required for job-related actions')
    parser.add_argument(
        '--nodeid',
        help='node id for delnode or getnodefile action')
    # main() reads args.poolid / args.jobid as fallbacks, so both
    # attributes must always exist on the returned namespace
    parser.add_argument(
        '--poolid',
        help='pool id to use when the pool json config is not specified')
    parser.add_argument(
        '--jobid',
        help='job id to use when the jobs json config is not specified')
    return parser.parse_args()
|
|
|
|
# script entry point: configure logging first so that main() can log
if __name__ == '__main__':
    _setup_logger()
    main()
|