#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation
#
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
# stdlib imports
from __future__ import print_function, unicode_literals
import argparse
import base64
import copy
import datetime
import json
import hashlib
import logging
import logging.handlers
import os
import pathlib
import subprocess
import sys
import time
try:
import urllib.request as urllibreq
except ImportError:
import urllib as urllibreq
# non-stdlib imports
import azure.batch.batch_auth as batchauth
import azure.batch.batch_service_client as batch
import azure.batch.models as batchmodels
import azure.common
import azure.storage.blob as azureblob
import azure.storage.queue as azurequeue
import azure.storage.table as azuretable
# create logger
logger = logging.getLogger('shipyard')
# global defines
_PY2 = sys.version_info.major == 2
_STORAGEACCOUNT = None
_STORAGEACCOUNTKEY = None
_STORAGEACCOUNTEP = None
_STORAGE_CONTAINERS = {
'blob_resourcefiles': None,
'blob_torrents': None,
'table_dht': None,
'table_registry': None,
'table_torrentinfo': None,
'table_images': None,
'table_globalresources': None,
'table_perf': None,
'queue_globalresources': None,
}
_AZUREFILE_DVD_VERSION = '0.4.1'
_AZUREFILE_DVD_URL = (
'https://github.com/Azure/azurefile-dockervolumedriver/releases'
'/download/' + _AZUREFILE_DVD_VERSION + '/azurefile-dockervolumedriver'
)
_AZUREFILE_DVD_MD5 = 'f3c1750583c4842dfbf95bbd56f65ede'
_AZUREFILE_SYSTEMD_SERVICE_URL = (
'https://raw.githubusercontent.com/Azure/azurefile-dockervolumedriver'
'/master/contrib/init/systemd/azurefile-dockervolumedriver.service'
)
_AZUREFILE_SYSTEMD_SERVICE_MD5 = 'd58f2f5e9f9f78216651ac28419878f1'
_MAX_REBOOT_RETRIES = 5
_NODEPREP_FILE = ('nodeprep.sh', 'scripts/nodeprep.sh')
_JOBPREP_FILE = ('jpdockerblock.sh', 'scripts/jpdockerblock.sh')
_CASCADE_FILE = ('cascade.py', 'cascade/cascade.py')
_SETUP_PR_FILE = (
'setup_private_registry.py', 'cascade/setup_private_registry.py'
)
_PERF_FILE = ('perf.py', 'cascade/perf.py')
_REGISTRY_FILE = None
_SSH_KEY_PREFIX = 'id_rsa_shipyard'
_SSH_TUNNEL_SCRIPT = 'ssh_docker_tunnel_shipyard.sh'
_GENERIC_DOCKER_TASK_PREFIX = 'dockertask-'
_TEMP_DIR = pathlib.Path('/tmp')
def _setup_logger():
# type: () -> None
"""Set up logger"""
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)sZ %(levelname)s %(funcName)s:%(lineno)d %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.info('logger initialized')
def _populate_global_settings(config, action):
# type: (dict, str) -> None
"""Populate global settings from config
:param dict config: configuration dict
:param str action: action
"""
global _STORAGEACCOUNT, _STORAGEACCOUNTKEY, _STORAGEACCOUNTEP, \
_REGISTRY_FILE
ssel = config['credentials']['shipyard_storage']
_STORAGEACCOUNT = config['credentials']['storage'][ssel]['account']
_STORAGEACCOUNTKEY = config['credentials']['storage'][ssel]['account_key']
try:
_STORAGEACCOUNTEP = config['credentials']['storage'][ssel]['endpoint']
except KeyError:
_STORAGEACCOUNTEP = 'core.windows.net'
try:
sep = config['storage_entity_prefix']
except KeyError:
sep = None
if sep is None:
sep = ''
postfix = '-'.join(
(config['credentials']['batch']['account'].lower(),
config['pool_specification']['id'].lower()))
_STORAGE_CONTAINERS['blob_resourcefiles'] = '-'.join(
(sep + 'resourcefiles', postfix))
_STORAGE_CONTAINERS['blob_torrents'] = '-'.join(
(sep + 'torrents', postfix))
_STORAGE_CONTAINERS['table_dht'] = sep + 'dht'
_STORAGE_CONTAINERS['table_registry'] = sep + 'registry'
_STORAGE_CONTAINERS['table_torrentinfo'] = sep + 'torrentinfo'
_STORAGE_CONTAINERS['table_images'] = sep + 'images'
_STORAGE_CONTAINERS['table_globalresources'] = sep + 'globalresources'
_STORAGE_CONTAINERS['table_perf'] = sep + 'perf'
_STORAGE_CONTAINERS['queue_globalresources'] = '-'.join(
(sep + 'globalresources', postfix))
if action != 'addpool':
return
try:
dpre = config['docker_registry']['private']['enabled']
except KeyError:
dpre = False
if dpre:
try:
rf = config['docker_registry']['private'][
'docker_save_registry_file']
except KeyError:
rf = 'resources/docker-registry-v2.tar.gz'
prf = pathlib.Path(rf)
# attempt to package if registry file doesn't exist
if not prf.exists() or prf.stat().st_size == 0:
logger.debug(
'attempting to generate docker private registry tarball')
try:
prf.parent.mkdir(mode=0o750, parents=True, exist_ok=True)
output = subprocess.check_output(
'sudo docker images -q registry:2', shell=True)
output = output.decode('utf-8').strip()
except subprocess.CalledProcessError:
rf = None
config['docker_registry']['private'][
'docker_save_registry_image_id'] = None
else:
if len(output) == 12:
rf = 'resources/docker-registry-v2.tar.gz'
prf = pathlib.Path(rf)
config['docker_registry']['private'][
'docker_save_registry_image_id'] = output
subprocess.check_call(
'sudo docker save registry:2 '
'| gzip -c > {}'.format(rf), shell=True)
_REGISTRY_FILE = (
prf.name if rf is not None else None,
rf,
config['docker_registry']['private'][
'docker_save_registry_image_id']
)
else:
_REGISTRY_FILE = (None, None, None)
logger.info('private registry settings: {}'.format(_REGISTRY_FILE))
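
# Illustrative naming example (values assumed): with storage_entity_prefix
# 'shipyard', batch account 'mybatch' and pool id 'mypool', the blob container
# becomes 'shipyardresourcefiles-mybatch-mypool', the dht table 'shipyarddht'
# and the global resources queue 'shipyardglobalresources-mybatch-mypool'.
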
def _wrap_commands_in_shell(commands, wait=True):
# type: (List[str], bool) -> str
"""Wrap commands in a shell
:param list commands: list of commands to wrap
:param bool wait: add wait for background processes
:rtype: str
:return: wrapped commands
"""
return '/bin/bash -c \'set -e; set -o pipefail; {}{}\''.format(
';'.join(commands), '; wait' if wait else '')
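
# Illustrative example: _wrap_commands_in_shell(['echo start', 'docker info'])
# returns:
#   /bin/bash -c 'set -e; set -o pipefail; echo start;docker info; wait'
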
def _create_credentials(config):
# type: (dict) -> tuple
"""Create authenticated clients
:param dict config: configuration dict
:rtype: tuple
:return: (batch client, blob client, queue client, table client)
"""
credentials = batchauth.SharedKeyCredentials(
config['credentials']['batch']['account'],
config['credentials']['batch']['account_key'])
batch_client = batch.BatchServiceClient(
credentials,
base_url=config['credentials']['batch']['account_service_url'])
blob_client = azureblob.BlockBlobService(
account_name=_STORAGEACCOUNT,
account_key=_STORAGEACCOUNTKEY,
endpoint_suffix=_STORAGEACCOUNTEP)
queue_client = azurequeue.QueueService(
account_name=_STORAGEACCOUNT,
account_key=_STORAGEACCOUNTKEY,
endpoint_suffix=_STORAGEACCOUNTEP)
table_client = azuretable.TableService(
account_name=_STORAGEACCOUNT,
account_key=_STORAGEACCOUNTKEY,
endpoint_suffix=_STORAGEACCOUNTEP)
return batch_client, blob_client, queue_client, table_client
def compute_md5_for_file(file, as_base64, blocksize=65536):
# type: (pathlib.Path, bool, int) -> str
"""Compute MD5 hash for file
:param pathlib.Path file: file to compute md5 for
:param bool as_base64: return as base64 encoded string
:param int blocksize: block size in bytes
:rtype: str
:return: md5 for file
"""
hasher = hashlib.md5()
with file.open('rb') as filedesc:
while True:
buf = filedesc.read(blocksize)
if not buf:
break
hasher.update(buf)
if as_base64:
if _PY2:
return base64.b64encode(hasher.digest())
else:
return str(base64.b64encode(hasher.digest()), 'ascii')
else:
return hasher.hexdigest()
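
# Note: the base64 digest returned with as_base64=True is directly comparable
# to the Content-MD5 property Azure blob storage reports for uploaded blobs;
# upload_resource_files below relies on this to skip unchanged files.
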
def upload_resource_files(blob_client, config, files):
# type: (azure.storage.blob.BlockBlobService, dict, List[tuple]) -> dict
"""Upload resource files to blob storage
:param azure.storage.blob.BlockBlobService blob_client: blob client
    :param dict config: configuration dict
    :param list files: list of (blob name, local file path) tuples to upload
:rtype: dict
:return: sas url dict
"""
sas_urls = {}
for file in files:
# skip if no file is specified
if file[0] is None:
continue
upload = True
fp = pathlib.Path(file[1])
if (_REGISTRY_FILE is not None and fp.name == _REGISTRY_FILE[0] and
not fp.exists()):
logger.debug('skipping optional docker registry image: {}'.format(
_REGISTRY_FILE[0]))
continue
else:
# check if blob exists
try:
prop = blob_client.get_blob_properties(
_STORAGE_CONTAINERS['blob_resourcefiles'], file[0])
if (prop.properties.content_settings.content_md5 ==
compute_md5_for_file(fp, True)):
logger.debug(
'remote file is the same for {}, skipping'.format(
file[0]))
upload = False
except azure.common.AzureMissingResourceHttpError:
pass
if upload:
logger.info('uploading file: {}'.format(file[1]))
blob_client.create_blob_from_path(
_STORAGE_CONTAINERS['blob_resourcefiles'], file[0], file[1])
sas_urls[file[0]] = 'https://{}.blob.{}/{}/{}?{}'.format(
_STORAGEACCOUNT,
_STORAGEACCOUNTEP,
_STORAGE_CONTAINERS['blob_resourcefiles'], file[0],
blob_client.generate_blob_shared_access_signature(
_STORAGE_CONTAINERS['blob_resourcefiles'], file[0],
permission=azureblob.BlobPermissions.READ,
expiry=datetime.datetime.utcnow() +
datetime.timedelta(hours=2)
)
)
return sas_urls
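
# The SAS URLs generated above take the form
#   https://<account>.blob.<endpoint>/<container>/<blob>?<sas-token>
# and grant read-only access that expires two hours after generation.
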
def setup_azurefile_volume_driver(blob_client, config):
# type: (azure.storage.blob.BlockBlobService, dict) -> tuple
"""Set up the Azure File docker volume driver
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param dict config: configuration dict
:rtype: tuple
:return: (bin path, service file path, service env file path,
volume creation script path)
"""
# check to see if binary is downloaded
bin = pathlib.Path('resources/azurefile-dockervolumedriver')
bin.parent.mkdir(mode=0o750, parents=True, exist_ok=True)
if (not bin.exists() or
compute_md5_for_file(bin, False) != _AZUREFILE_DVD_MD5):
response = urllibreq.urlopen(_AZUREFILE_DVD_URL)
with bin.open('wb') as f:
f.write(response.read())
srv = pathlib.Path('resources/azurefile-dockervolumedriver.service')
if (not srv.exists() or
compute_md5_for_file(srv, False) !=
_AZUREFILE_SYSTEMD_SERVICE_MD5):
response = urllibreq.urlopen(_AZUREFILE_SYSTEMD_SERVICE_URL)
with srv.open('wb') as f:
f.write(response.read())
# construct systemd env file
sa = None
sakey = None
saep = None
for svkey in config[
'global_resources']['docker_volumes']['shared_data_volumes']:
conf = config[
'global_resources']['docker_volumes']['shared_data_volumes'][svkey]
if conf['volume_driver'] == 'azurefile':
# check every entry to ensure the same storage account
ssel = conf['storage_account_settings']
_sa = config['credentials']['storage'][ssel]['account']
if sa is not None and sa != _sa:
raise ValueError(
'multiple storage accounts are not supported for '
'azurefile docker volume driver')
sa = _sa
sakey = config['credentials']['storage'][ssel]['account_key']
saep = config['credentials']['storage'][ssel]['endpoint']
if sa is None or sakey is None:
raise RuntimeError(
'storage account or storage account key not specified for '
'azurefile docker volume driver')
srvenv = pathlib.Path('resources/azurefile-dockervolumedriver.env')
with srvenv.open('w', encoding='utf8') as f:
f.write('AZURE_STORAGE_ACCOUNT={}\n'.format(sa))
f.write('AZURE_STORAGE_ACCOUNT_KEY={}\n'.format(sakey))
f.write('AZURE_STORAGE_BASE={}\n'.format(saep))
# create docker volume mount command script
volcreate = pathlib.Path('resources/azurefile-dockervolume-create.sh')
with volcreate.open('w', encoding='utf8') as f:
f.write('#!/usr/bin/env bash\n\n')
for svkey in config[
'global_resources']['docker_volumes']['shared_data_volumes']:
conf = config[
'global_resources']['docker_volumes'][
'shared_data_volumes'][svkey]
opts = [
'-o share={}'.format(conf['azure_file_share_name'])
]
for opt in conf['mount_options']:
opts.append('-o {}'.format(opt))
f.write('docker volume create -d azurefile --name {} {}\n'.format(
svkey, ' '.join(opts)))
return bin, srv, srvenv, volcreate
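
# Each line emitted into the volume creation script has the form below; the
# share name and mount option shown are hypothetical:
#   docker volume create -d azurefile --name myvol -o share=myshare -o uid=1000
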
def add_pool(batch_client, blob_client, config):
# type: (batch.BatchServiceClient, azureblob.BlockBlobService,dict) -> None
"""Add a Batch pool to account
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param dict config: configuration dict
"""
publisher = config['pool_specification']['publisher']
offer = config['pool_specification']['offer']
sku = config['pool_specification']['sku']
vm_count = config['pool_specification']['vm_count']
try:
perf = config['store_timing_metrics']
except KeyError:
perf = False
try:
use_shipyard_docker_image = config['use_shipyard_docker_image']
except KeyError:
use_shipyard_docker_image = True
# peer-to-peer settings
try:
p2p = config['data_replication']['peer_to_peer']['enabled']
except KeyError:
p2p = True
if p2p:
nonp2pcd = False
try:
p2psbias = config['data_replication'][
'peer_to_peer']['direct_download_seed_bias']
if p2psbias is None or p2psbias < 1:
raise KeyError()
except KeyError:
p2psbias = vm_count // 10
if p2psbias < 1:
p2psbias = 1
try:
p2pcomp = config[
'data_replication']['peer_to_peer']['compression']
except KeyError:
p2pcomp = True
else:
p2psbias = 0
p2pcomp = False
try:
nonp2pcd = config[
'data_replication']['non_peer_to_peer_concurrent_downloading']
except KeyError:
nonp2pcd = True
# private registry settings
try:
pcont = config['docker_registry']['private']['container']
pregpubpull = config['docker_registry']['private'][
'allow_public_docker_hub_pull_on_missing']
preg = config['docker_registry']['private']['enabled']
except KeyError:
preg = False
pregpubpull = False
# create private registry flags
if preg:
preg = ' -r {}:{}:{}'.format(
pcont, _REGISTRY_FILE[0], _REGISTRY_FILE[2])
else:
preg = ''
# create torrent flags
torrentflags = ' -t {}:{}:{}:{}:{}'.format(
p2p, nonp2pcd, p2psbias, p2pcomp, pregpubpull)
# docker settings
try:
dockeruser = config['docker_registry']['login']['username']
dockerpw = config['docker_registry']['login']['password']
except KeyError:
dockeruser = None
dockerpw = None
# check volume mounts for azurefile
azurefile_vd = False
try:
shared_data_volumes = config[
'global_resources']['docker_volumes']['shared_data_volumes']
for key in shared_data_volumes:
if shared_data_volumes[key]['volume_driver'] == 'azurefile':
azurefile_vd = True
break
except KeyError:
pass
# prefix settings
try:
prefix = config['storage_entity_prefix']
if len(prefix) == 0:
prefix = None
except KeyError:
prefix = None
# pick latest sku
node_agent_skus = batch_client.account.list_node_agent_skus()
skus_to_use = [
(nas, image_ref) for nas in node_agent_skus for image_ref in sorted(
nas.verified_image_references, key=lambda item: item.sku)
if image_ref.publisher.lower() == publisher.lower() and
image_ref.offer.lower() == offer.lower() and
image_ref.sku.lower() == sku.lower()
]
sku_to_use, image_ref_to_use = skus_to_use[-1]
# create resource files list
_rflist = [_NODEPREP_FILE, _JOBPREP_FILE, _REGISTRY_FILE]
if not use_shipyard_docker_image:
_rflist.append(_CASCADE_FILE)
_rflist.append(_SETUP_PR_FILE)
if perf:
_rflist.append(_PERF_FILE)
# handle azurefile docker volume driver
if azurefile_vd:
# only ubuntu 16.04 is supported for azurefile dvd
if (publisher.lower() != 'canonical' or
offer.lower() != 'ubuntuserver' or
sku.lower() < '16.04.0-lts'):
raise ValueError(
'Unsupported Docker Host VM Config with Azurefile '
'Docker Volume Driver')
afbin, afsrv, afenv, afvc = setup_azurefile_volume_driver(
blob_client, config)
_rflist.append((str(afbin.name), str(afbin)))
_rflist.append((str(afsrv.name), str(afsrv)))
_rflist.append((str(afenv.name), str(afenv)))
_rflist.append((str(afvc.name), str(afvc)))
# upload resource files
sas_urls = upload_resource_files(blob_client, config, _rflist)
del _rflist
# create start task commandline
start_task = [
'{} -o {} -s {}{}{}{}{}{}'.format(
_NODEPREP_FILE[0],
offer,
sku,
preg,
torrentflags,
' -p {}'.format(prefix) if prefix else '',
' -a' if azurefile_vd else '',
' -d' if use_shipyard_docker_image else '',
),
]
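    # Illustrative node prep command line produced above, assuming a public
    # Docker Hub Ubuntu 16.04 pool with p2p enabled and prefix 'shipyard':
    #   nodeprep.sh -o UbuntuServer -s 16.04.0-LTS \
    #       -t True:False:1:True:False -p shipyard -d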
try:
start_task.extend(
config['pool_specification']['additional_node_prep_commands'])
except KeyError:
pass
# create pool param
pool = batchmodels.PoolAddParameter(
id=config['pool_specification']['id'],
virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
image_reference=image_ref_to_use,
node_agent_sku_id=sku_to_use.id),
vm_size=config['pool_specification']['vm_size'],
target_dedicated=vm_count,
enable_inter_node_communication=p2p, # enable only for p2p mode
start_task=batchmodels.StartTask(
command_line=_wrap_commands_in_shell(start_task, wait=False),
run_elevated=True,
wait_for_success=True,
environment_settings=[
batchmodels.EnvironmentSetting('LC_ALL', 'en_US.UTF-8'),
batchmodels.EnvironmentSetting(
'CASCADE_STORAGE_ENV',
'{}:{}:{}'.format(
_STORAGEACCOUNT,
_STORAGEACCOUNTEP,
_STORAGEACCOUNTKEY)
)
],
resource_files=[],
),
)
for rf in sas_urls:
pool.start_task.resource_files.append(
batchmodels.ResourceFile(
file_path=rf,
blob_source=sas_urls[rf])
)
if preg:
ssel = config['docker_registry']['private']['storage_account_settings']
pool.start_task.environment_settings.append(
batchmodels.EnvironmentSetting(
'CASCADE_PRIVATE_REGISTRY_STORAGE_ENV',
'{}:{}:{}'.format(
config['credentials']['storage'][ssel]['account'],
config['credentials']['storage'][ssel]['endpoint'],
config['credentials']['storage'][ssel]['account_key'])
)
)
del ssel
if (dockeruser is not None and len(dockeruser) > 0 and
dockerpw is not None and len(dockerpw) > 0):
pool.start_task.environment_settings.append(
batchmodels.EnvironmentSetting('DOCKER_LOGIN_USERNAME', dockeruser)
)
pool.start_task.environment_settings.append(
batchmodels.EnvironmentSetting('DOCKER_LOGIN_PASSWORD', dockerpw)
)
if perf:
pool.start_task.environment_settings.append(
batchmodels.EnvironmentSetting('CASCADE_TIMING', '1')
)
# create pool if not exists
try:
logger.info('Attempting to create pool: {}'.format(pool.id))
logger.debug('node prep commandline: {}'.format(
pool.start_task.command_line))
batch_client.pool.add(pool)
logger.info('Created pool: {}'.format(pool.id))
except batchmodels.BatchErrorException as e:
if e.error.code != 'PoolExists':
raise
else:
logger.error('Pool {!r} already exists'.format(pool.id))
# wait for pool idle
node_state = frozenset(
(batchmodels.ComputeNodeState.starttaskfailed,
batchmodels.ComputeNodeState.unusable,
batchmodels.ComputeNodeState.idle)
)
try:
reboot_on_failed = config[
'pool_specification']['reboot_on_start_task_failed']
except KeyError:
reboot_on_failed = False
nodes = _wait_for_pool_ready(
batch_client, node_state, pool.id, reboot_on_failed)
# create admin user on each node if requested
add_ssh_tunnel_user(batch_client, config, nodes)
# log remote login settings
get_remote_login_settings(batch_client, config, nodes)
def add_ssh_tunnel_user(batch_client, config, nodes=None):
# type: (batch.BatchServiceClient, dict,
# List[batchmodels.ComputeNode]) -> None
"""Add an SSH user to node and optionally generate an SSH tunneling script
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:param dict config: configuration dict
:param list nodes: list of nodes
"""
pool_id = config['pool_specification']['id']
try:
docker_user = config[
'pool_specification']['ssh_docker_tunnel']['username']
if docker_user is None:
raise KeyError()
except KeyError:
logger.info('not creating ssh tunnel user on pool {}'.format(pool_id))
else:
ssh_priv_key = None
try:
ssh_pub_key = config[
'pool_specification']['ssh_docker_tunnel']['ssh_public_key']
except KeyError:
ssh_pub_key = None
try:
gen_tunnel_script = config[
'pool_specification']['ssh_docker_tunnel'][
'generate_tunnel_script']
except KeyError:
gen_tunnel_script = False
# generate ssh key pair if not specified
if ssh_pub_key is None:
ssh_priv_key, ssh_pub_key = generate_ssh_keypair()
# get node list if not provided
if nodes is None:
nodes = batch_client.compute_node.list(pool_id)
for node in nodes:
add_admin_user_to_compute_node(
batch_client, pool_id, node, docker_user, ssh_pub_key)
# generate tunnel script if requested
if gen_tunnel_script:
ssh_args = ['ssh']
if ssh_priv_key is not None:
ssh_args.append('-i')
ssh_args.append(ssh_priv_key)
ssh_args.extend([
'-o', 'StrictHostKeyChecking=no',
'-o', 'UserKnownHostsFile=/dev/null',
'-p', '$2', '-N', '-L', '2375:localhost:2375',
'{}@$1'.format(docker_user)])
with open(_SSH_TUNNEL_SCRIPT, 'w') as fd:
fd.write('#!/usr/bin/env bash\n')
fd.write('set -e\n')
fd.write(' '.join(ssh_args))
fd.write('\n')
os.chmod(_SSH_TUNNEL_SCRIPT, 0o755)
logger.info('ssh tunnel script generated: {}'.format(
_SSH_TUNNEL_SCRIPT))
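
# The generated tunnel script forwards local port 2375 to the Docker daemon on
# a compute node. Illustrative usage, with the host and port taken from the
# remote login settings logged for each node:
#   ./ssh_docker_tunnel_shipyard.sh <node public ip> <node ssh port>
#   docker -H localhost:2375 ps
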
def _wait_for_pool_ready(batch_client, node_state, pool_id, reboot_on_failed):
# type: (batch.BatchServiceClient, List[batchmodels.ComputeNodeState],
# str, bool) -> List[batchmodels.ComputeNode]
"""Wait for pool to enter "ready": steady state and all nodes idle
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
    :param node_state: node states to wait for
:param str pool_id: pool id
:param bool reboot_on_failed: reboot node on failed start state
:rtype: list
:return: list of nodes
"""
logger.info(
'waiting for all nodes in pool {} to reach one of: {!r}'.format(
pool_id, node_state))
i = 0
reboot_map = {}
while True:
# refresh pool to ensure that there is no resize error
pool = batch_client.pool.get(pool_id)
if pool.resize_error is not None:
raise RuntimeError(
'resize error encountered for pool {}: code={} msg={}'.format(
pool.id, pool.resize_error.code,
pool.resize_error.message))
nodes = list(batch_client.compute_node.list(pool.id))
if (reboot_on_failed and
any(node.state == batchmodels.ComputeNodeState.starttaskfailed
for node in nodes)):
for node in nodes:
if (node.state ==
batchmodels.ComputeNodeState.starttaskfailed):
if node.id not in reboot_map:
reboot_map[node.id] = 0
if reboot_map[node.id] > _MAX_REBOOT_RETRIES:
raise RuntimeError(
('ran out of reboot retries recovering node {} '
'in pool {}').format(node.id, pool.id))
_reboot_node(batch_client, pool.id, node.id, True)
reboot_map[node.id] += 1
# refresh node list
nodes = list(batch_client.compute_node.list(pool.id))
if (len(nodes) >= pool.target_dedicated and
all(node.state in node_state for node in nodes)):
if any(node.state != batchmodels.ComputeNodeState.idle
for node in nodes):
raise RuntimeError(
'node(s) of pool {} not in idle state'.format(pool.id))
else:
return nodes
i += 1
if i % 3 == 0:
i = 0
logger.debug('waiting for {} nodes to reach desired state'.format(
pool.target_dedicated))
for node in nodes:
logger.debug('{}: {}'.format(node.id, node.state))
time.sleep(10)
def generate_ssh_keypair():
    # type: () -> tuple
    """Generate an ssh keypair for use with user logins
:rtype: tuple
:return: (private key filename, public key filename)
"""
pubkey = _SSH_KEY_PREFIX + '.pub'
try:
if os.path.exists(_SSH_KEY_PREFIX):
old = _SSH_KEY_PREFIX + '.old'
if os.path.exists(old):
os.remove(old)
os.rename(_SSH_KEY_PREFIX, old)
except OSError:
pass
try:
if os.path.exists(pubkey):
old = pubkey + '.old'
if os.path.exists(old):
os.remove(old)
os.rename(pubkey, old)
except OSError:
pass
logger.info('generating ssh key pair')
subprocess.check_call(
        ['ssh-keygen', '-f', _SSH_KEY_PREFIX, '-t', 'rsa', '-N', ''])
return (_SSH_KEY_PREFIX, pubkey)
def add_admin_user_to_compute_node(
batch_client, pool_id, node, username, ssh_public_key):
# type: (batch.BatchServiceClient, str, batchmodels.ComputeNode, str,
# str) -> None
"""Adds an administrative user to the Batch Compute Node
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:param str pool_id: The pool id containing the node.
:param node: The compute node.
:type node: `batchserviceclient.models.ComputeNode`
:param str username: user name
:param str ssh_public_key: ssh rsa public key
"""
logger.info('adding user {} to node {} in pool {}'.format(
username, node.id, pool_id))
try:
batch_client.compute_node.add_user(
pool_id,
node.id,
batchmodels.ComputeNodeUser(
username,
is_admin=True,
password=None,
ssh_public_key=open(ssh_public_key, 'rb').read().decode('utf8')
)
)
except batchmodels.batch_error.BatchErrorException as ex:
if 'The node user already exists' not in ex.message.value:
raise
def _add_global_resource(
queue_client, table_client, config, pk, p2pcsd, grtype):
# type: (azurequeue.QueueService, azuretable.TableService, dict, str,
    # int, str) -> None
"""Add global resources
:param azure.storage.queue.QueueService queue_client: queue client
:param azure.storage.table.TableService table_client: table client
:param dict config: configuration dict
:param str pk: partition key
:param int p2pcsd: peer-to-peer concurrent source downloads
:param str grtype: global resources type
"""
try:
for gr in config['global_resources'][grtype]:
if grtype == 'docker_images':
prefix = 'docker'
else:
raise NotImplementedError()
resource = '{}:{}'.format(prefix, gr)
logger.info('adding global resource: {}'.format(resource))
table_client.insert_or_replace_entity(
_STORAGE_CONTAINERS['table_globalresources'],
{
'PartitionKey': pk,
'RowKey': hashlib.sha1(
resource.encode('utf8')).hexdigest(),
'Resource': resource,
}
)
for _ in range(0, p2pcsd):
queue_client.put_message(
_STORAGE_CONTAINERS['queue_globalresources'], resource)
except KeyError:
pass
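
# Illustrative example: a docker image 'busybox' listed under
# global_resources:docker_images is stored as the resource string
# 'docker:busybox', its RowKey is the sha1 hex digest of that string, and the
# same message is enqueued p2pcsd times to allow concurrent downloads.
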
def populate_queues(queue_client, table_client, config):
# type: (azurequeue.QueueService, azuretable.TableService, dict) -> None
"""Populate queues
:param azure.storage.queue.QueueService queue_client: queue client
:param azure.storage.table.TableService table_client: table client
:param dict config: configuration dict
"""
try:
preg = config['docker_registry']['private']['enabled']
except KeyError:
preg = False
pk = '{}${}'.format(
config['credentials']['batch']['account'],
config['pool_specification']['id'])
# if using docker public hub, then populate registry table with hub
if not preg:
table_client.insert_or_replace_entity(
_STORAGE_CONTAINERS['table_registry'],
{
'PartitionKey': pk,
'RowKey': 'registry.hub.docker.com',
'Port': 80,
}
)
# get p2pcsd setting
try:
p2p = config['data_replication']['peer_to_peer']['enabled']
except KeyError:
p2p = True
if p2p:
try:
p2pcsd = config['data_replication']['peer_to_peer'][
'concurrent_source_downloads']
if p2pcsd is None or p2pcsd < 1:
raise KeyError()
except KeyError:
p2pcsd = config['pool_specification']['vm_count'] // 6
if p2pcsd < 1:
p2pcsd = 1
else:
p2pcsd = 1
# add global resources
_add_global_resource(
queue_client, table_client, config, pk, p2pcsd, 'docker_images')
def _adjust_settings_for_pool_creation(config):
# type: (dict) -> None
"""Adjust settings for pool creation
:param dict config: configuration dict
"""
publisher = config['pool_specification']['publisher'].lower()
offer = config['pool_specification']['offer'].lower()
sku = config['pool_specification']['sku'].lower()
# enforce publisher/offer/sku restrictions
allowed = False
shipyard_container_required = True
if publisher == 'canonical':
if offer == 'ubuntuserver':
if sku >= '14.04.0-lts':
allowed = True
if sku >= '16.04.0-lts':
shipyard_container_required = False
elif publisher == 'credativ':
if offer == 'debian':
if sku >= '8':
allowed = True
elif publisher == 'openlogic':
if offer.startswith('centos'):
if sku >= '7':
allowed = True
elif publisher == 'redhat':
if offer == 'rhel':
if sku >= '7':
allowed = True
elif publisher == 'suse':
if offer.startswith('sles'):
if sku >= '12':
allowed = True
elif offer == 'opensuse-leap':
if sku >= '42':
allowed = True
elif offer == 'opensuse':
if sku == '13.2':
allowed = True
# oracle linux is not supported due to UEKR4 requirement
if not allowed:
raise ValueError(
('Unsupported Docker Host VM Config, publisher={} offer={} '
'sku={}').format(publisher, offer, sku))
# adjust for shipyard container requirement
if shipyard_container_required:
config['use_shipyard_docker_image'] = True
logger.warning(
('forcing shipyard docker image to be used due to '
'VM config, publisher={} offer={} sku={}').format(
publisher, offer, sku))
# adjust inter node comm setting
vm_count = int(config['pool_specification']['vm_count'])
try:
p2p = config['data_replication']['peer_to_peer']['enabled']
except KeyError:
p2p = True
max_vms = 20 if publisher == 'microsoftwindowsserver' else 40
if p2p and vm_count > max_vms:
logger.warning(
('disabling peer-to-peer transfer as pool size of {} exceeds '
'max limit of {} vms for inter-node communication').format(
vm_count, max_vms))
if 'data_replication' not in config:
config['data_replication'] = {}
if 'peer_to_peer' not in config['data_replication']:
config['data_replication']['peer_to_peer'] = {}
config['data_replication']['peer_to_peer']['enabled'] = False
def resize_pool(batch_client, config):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
"""Resize a pool
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:param dict config: configuration dict
"""
pool_id = config['pool_specification']['id']
vm_count = int(config['pool_specification']['vm_count'])
logger.info('Resizing pool {} to {}'.format(pool_id, vm_count))
batch_client.pool.resize(
pool_id=pool_id,
pool_resize_parameter=batchmodels.PoolResizeParameter(
target_dedicated=vm_count,
resize_timeout=datetime.timedelta(minutes=20),
)
)
def del_pool(batch_client, config):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
"""Delete a pool
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:param dict config: configuration dict
"""
pool_id = config['pool_specification']['id']
logger.info('Deleting pool: {}'.format(pool_id))
batch_client.pool.delete(pool_id)
def del_node(batch_client, config, node_id):
# type: (batch.BatchServiceClient, dict, str) -> None
"""Delete a node in a pool
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:param dict config: configuration dict
:param str node_id: node id to delete
"""
pool_id = config['pool_specification']['id']
logger.info('Deleting node {} from pool {}'.format(node_id, pool_id))
batch_client.pool.remove_nodes(
pool_id=pool_id,
node_remove_parameter=batchmodels.NodeRemoveParameter(
node_list=[node_id],
)
)
def _reboot_node(batch_client, pool_id, node_id, wait):
# type: (batch.BatchServiceClient, str, str, bool) -> None
"""Reboot a node in a pool
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:param str pool_id: pool id of node
    :param str node_id: node id to reboot
:param bool wait: wait for node to enter rebooting state
"""
logger.info('Rebooting node {} from pool {}'.format(node_id, pool_id))
batch_client.compute_node.reboot(
pool_id=pool_id,
node_id=node_id,
)
if wait:
logger.debug('waiting for node {} to enter rebooting state'.format(
node_id))
while True:
node = batch_client.compute_node.get(pool_id, node_id)
if node.state == batchmodels.ComputeNodeState.rebooting:
break
else:
time.sleep(1)
def add_jobs(batch_client, blob_client, config):
# type: (batch.BatchServiceClient, azureblob.BlockBlobService,dict) -> None
"""Add jobs
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param dict config: configuration dict
"""
pool_id = config['pool_specification']['id']
global_resources = []
for gr in config['global_resources']['docker_images']:
global_resources.append(gr)
# TODO add global resources for non-docker resources
jpcmdline = _wrap_commands_in_shell([
'$AZ_BATCH_NODE_SHARED_DIR/{} {}'.format(
_JOBPREP_FILE[0], ' '.join(global_resources))])
for jobspec in config['job_specifications']:
job = batchmodels.JobAddParameter(
id=jobspec['id'],
pool_info=batchmodels.PoolInformation(pool_id=pool_id),
job_preparation_task=batchmodels.JobPreparationTask(
command_line=jpcmdline,
wait_for_success=True,
run_elevated=True,
rerun_on_node_reboot_after_success=False,
)
)
# check if any tasks in job have dependencies
dependencies = False
for task in jobspec['tasks']:
# do not break, check to ensure ids are set on each task if
# task dependencies are set
if 'depends_on' in task and len(task['depends_on']) > 0:
if ('id' not in task or task['id'] is None or
len(task['id']) == 0):
raise ValueError(
'task id is not specified, but depends_on is set')
dependencies = True
job.uses_task_dependencies = dependencies
logger.info('Adding job: {}'.format(job.id))
try:
batch_client.job.add(job)
except batchmodels.batch_error.BatchErrorException as ex:
if 'The specified job already exists' not in ex.message.value:
raise
# add all tasks under job
for task in jobspec['tasks']:
image = task['image']
# get or generate task id
try:
task_id = task['id']
if task_id is None or len(task_id) == 0:
raise KeyError()
except KeyError:
# get filtered, sorted list of generic docker task ids
try:
tasklist = sorted(
filter(lambda x: x.id.startswith(
_GENERIC_DOCKER_TASK_PREFIX), list(
batch_client.task.list(job.id))),
key=lambda y: y.id)
tasknum = int(tasklist[-1].id.split('-')[-1]) + 1
except (batchmodels.batch_error.BatchErrorException,
IndexError):
tasknum = 0
task_id = '{0}{1:03d}'.format(
_GENERIC_DOCKER_TASK_PREFIX, tasknum)
# get generic run opts
try:
run_opts = task['additional_docker_run_options']
except KeyError:
run_opts = []
# parse remove container option
try:
rm_container = task['remove_container_after_exit']
except KeyError:
pass
else:
if rm_container and '--rm' not in run_opts:
run_opts.append('--rm')
del rm_container
# parse name option
try:
name = task['name']
if name is not None:
                    run_opts.append('--name {}'.format(name))
del name
except KeyError:
pass
# parse labels option
try:
labels = task['labels']
if labels is not None and len(labels) > 0:
for label in labels:
run_opts.append('-l {}'.format(label))
del labels
except KeyError:
pass
# parse ports option
try:
ports = task['ports']
if ports is not None and len(ports) > 0:
for port in ports:
run_opts.append('-p {}'.format(port))
del ports
except KeyError:
pass
# parse entrypoint
try:
entrypoint = task['entrypoint']
if entrypoint is not None:
run_opts.append('--entrypoint {}'.format(entrypoint))
del entrypoint
except KeyError:
pass
# parse data volumes
try:
data_volumes = task['data_volumes']
except KeyError:
pass
else:
if data_volumes is not None and len(data_volumes) > 0:
for key in data_volumes:
dvspec = config[
'global_resources']['docker_volumes'][
'data_volumes'][key]
try:
hostpath = dvspec['host_path']
except KeyError:
hostpath = None
if hostpath is not None and len(hostpath) > 0:
run_opts.append('-v {}:{}'.format(
hostpath, dvspec['container_path']))
else:
run_opts.append('-v {}'.format(
dvspec['container_path']))
# parse shared data volumes
try:
shared_data_volumes = task['shared_data_volumes']
except KeyError:
pass
else:
if (shared_data_volumes is not None and
len(shared_data_volumes) > 0):
for key in shared_data_volumes:
dvspec = config[
'global_resources']['docker_volumes'][
'shared_data_volumes'][key]
run_opts.append('-v {}:{}'.format(
key, dvspec['container_path']))
# get command
try:
command = task['command']
except KeyError:
command = None
# get and create env var file
            envfile = '.env.list'
sas_urls = None
try:
env_vars = jobspec['environment_variables']
except KeyError:
env_vars = None
try:
task_ev = task['environment_variables']
if env_vars is None:
env_vars = task_ev
else:
env_vars = merge_dict(env_vars, task_ev)
except KeyError:
pass
else:
if env_vars is not None and len(env_vars) > 0:
envfiletmp = _TEMP_DIR / '{}{}'.format(task_id, envfile)
envfileloc = '{}taskrf-{}/{}{}'.format(
config['storage_entity_prefix'], job.id, task_id,
envfile)
with envfiletmp.open('w', encoding='utf8') as f:
for key in env_vars:
f.write('{}={}{}'.format(
key, env_vars[key], os.linesep))
# upload env var file if exists
sas_urls = upload_resource_files(
blob_client, config, [(envfileloc, str(envfiletmp))])
envfiletmp.unlink()
if len(sas_urls) != 1:
raise RuntimeError('unexpected number of sas urls')
# always add option for envfile
run_opts.append('--env-file {}'.format(envfile))
# mount batch root dir
run_opts.append(
'-v $AZ_BATCH_NODE_ROOT_DIR:$AZ_BATCH_NODE_ROOT_DIR')
# set working directory
run_opts.append('-w $AZ_BATCH_TASK_WORKING_DIR')
# add task
task_commands = [
'env | grep AZ_BATCH_ >> {}'.format(envfile),
'docker run {} {}{}'.format(
' '.join(run_opts),
image,
' {}'.format(command) if command else '')
]
batchtask = batchmodels.TaskAddParameter(
id=task_id,
command_line=_wrap_commands_in_shell(task_commands),
run_elevated=True,
resource_files=[],
)
batchtask.resource_files.append(
batchmodels.ResourceFile(
file_path=str(envfile),
blob_source=next(iter(sas_urls.values())),
file_mode='0640',
)
)
# add additional resource files
try:
rfs = task['resource_files']
except KeyError:
pass
else:
for rf in rfs:
try:
fm = rf['file_mode']
except KeyError:
fm = None
batchtask.resource_files.append(
batchmodels.ResourceFile(
file_path=rf['file_path'],
blob_source=rf['blob_source'],
file_mode=fm,
)
)
# add task dependencies
if 'depends_on' in task and len(task['depends_on']) > 0:
batchtask.depends_on = batchmodels.TaskDependencies(
task_ids=task['depends_on']
)
# create task
logger.info('Adding task {}: {}'.format(
task_id, batchtask.command_line))
batch_client.task.add(job_id=job.id, task=batchtask)
def del_jobs(batch_client, config):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
"""Delete jobs
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:param dict config: configuration dict
"""
for job in config['job_specifications']:
job_id = job['id']
logger.info('Deleting job: {}'.format(job_id))
batch_client.job.delete(job_id)
def terminate_jobs(batch_client, config):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict) -> None
"""Terminate jobs
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:param dict config: configuration dict
"""
for job in config['job_specifications']:
job_id = job['id']
logger.info('Terminating job: {}'.format(job_id))
batch_client.job.terminate(job_id)
def del_all_jobs(batch_client):
# type: (azure.batch.batch_service_client.BatchServiceClient) -> None
"""Delete all jobs
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
"""
logger.debug('Getting list of all jobs...')
jobs = batch_client.job.list()
for job in jobs:
logger.info('Deleting job: {}'.format(job.id))
batch_client.job.delete(job.id)
def get_remote_login_settings(batch_client, config, nodes=None):
    # type: (batch.BatchServiceClient, dict,
    # List[batchmodels.ComputeNode]) -> None
"""Get remote login settings
:param batch_client: The batch client to use.
:type batch_client: `batchserviceclient.BatchServiceClient`
:param dict config: configuration dict
:param list nodes: list of nodes
"""
pool_id = config['pool_specification']['id']
if nodes is None:
nodes = batch_client.compute_node.list(pool_id)
for node in nodes:
rls = batch_client.compute_node.get_remote_login_settings(
pool_id, node.id)
logger.info('node {}: {}'.format(node.id, rls))
def delete_storage_containers(blob_client, queue_client, table_client, config):
# type: (azureblob.BlockBlobService, azurequeue.QueueService,
# azuretable.TableService, dict) -> None
"""Delete storage containers
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param azure.storage.queue.QueueService queue_client: queue client
:param azure.storage.table.TableService table_client: table client
:param dict config: configuration dict
"""
for key in _STORAGE_CONTAINERS:
if key.startswith('blob_'):
blob_client.delete_container(_STORAGE_CONTAINERS[key])
elif key.startswith('table_'):
table_client.delete_table(_STORAGE_CONTAINERS[key])
elif key.startswith('queue_'):
queue_client.delete_queue(_STORAGE_CONTAINERS[key])
def _clear_blobs(blob_client, container):
# type: (azureblob.BlockBlobService, str) -> None
"""Clear blobs in container
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param str container: container to clear blobs from
"""
logger.info('deleting blobs: {}'.format(container))
blobs = blob_client.list_blobs(container)
for blob in blobs:
blob_client.delete_blob(container, blob.name)
def _clear_blob_task_resourcefiles(blob_client, container, config):
# type: (azureblob.BlockBlobService, str, dict) -> None
"""Clear task resource file blobs in container
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param str container: container to clear blobs from
:param dict config: configuration dict
"""
envfileloc = '{}taskrf-'.format(config['storage_entity_prefix'])
logger.info('deleting blobs with prefix: {}'.format(envfileloc))
blobs = blob_client.list_blobs(container, prefix=envfileloc)
for blob in blobs:
blob_client.delete_blob(container, blob.name)
def _clear_table(table_client, table_name, config):
"""Clear table entities
:param azure.storage.table.TableService table_client: table client
:param str table_name: table name
:param dict config: configuration dict
"""
# type: (azuretable.TableService, str, dict) -> None
logger.info('clearing table: {}'.format(table_name))
ents = table_client.query_entities(
table_name, filter='PartitionKey eq \'{}${}\''.format(
config['credentials']['batch']['account'],
config['pool_specification']['id'])
)
# batch delete entities
i = 0
bet = azuretable.TableBatch()
for ent in ents:
bet.delete_entity(ent['PartitionKey'], ent['RowKey'])
i += 1
if i == 100:
table_client.commit_batch(table_name, bet)
bet = azuretable.TableBatch()
i = 0
if i > 0:
table_client.commit_batch(table_name, bet)
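
# Note: entities are committed in batches of 100 above because Azure table
# entity group transactions accept at most 100 operations per batch.
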
def clear_storage_containers(blob_client, queue_client, table_client, config):
# type: (azureblob.BlockBlobService, azurequeue.QueueService,
# azuretable.TableService, dict) -> None
"""Clear storage containers
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param azure.storage.queue.QueueService queue_client: queue client
:param azure.storage.table.TableService table_client: table client
:param dict config: configuration dict
"""
try:
perf = config['store_timing_metrics']
except KeyError:
perf = False
for key in _STORAGE_CONTAINERS:
if key.startswith('blob_'):
# TODO this is temp to preserve registry upload
if key != 'blob_resourcefiles':
_clear_blobs(blob_client, _STORAGE_CONTAINERS[key])
else:
_clear_blob_task_resourcefiles(
blob_client, _STORAGE_CONTAINERS[key], config)
elif key.startswith('table_'):
try:
_clear_table(table_client, _STORAGE_CONTAINERS[key], config)
except azure.common.AzureMissingResourceHttpError:
if key != 'table_perf' or perf:
raise
elif key.startswith('queue_'):
logger.info('clearing queue: {}'.format(_STORAGE_CONTAINERS[key]))
queue_client.clear_messages(_STORAGE_CONTAINERS[key])
def create_storage_containers(blob_client, queue_client, table_client, config):
# type: (azureblob.BlockBlobService, azurequeue.QueueService,
# azuretable.TableService, dict) -> None
"""Create storage containers
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param azure.storage.queue.QueueService queue_client: queue client
:param azure.storage.table.TableService table_client: table client
:param dict config: configuration dict
"""
try:
perf = config['store_timing_metrics']
except KeyError:
perf = False
for key in _STORAGE_CONTAINERS:
if key.startswith('blob_'):
logger.info(
'creating container: {}'.format(_STORAGE_CONTAINERS[key]))
blob_client.create_container(_STORAGE_CONTAINERS[key])
elif key.startswith('table_'):
if key == 'table_perf' and not perf:
continue
logger.info('creating table: {}'.format(_STORAGE_CONTAINERS[key]))
table_client.create_table(_STORAGE_CONTAINERS[key])
elif key.startswith('queue_'):
logger.info('creating queue: {}'.format(_STORAGE_CONTAINERS[key]))
queue_client.create_queue(_STORAGE_CONTAINERS[key])
def merge_dict(dict1, dict2):
# type: (dict, dict) -> dict
"""Recursively merge dictionaries: dict2 on to dict1. This differs
from dict.update() in that values that are dicts are recursively merged.
Note that only dict value types are merged, not lists, etc.
:param dict dict1: dictionary to merge to
:param dict dict2: dictionary to merge with
:rtype: dict
:return: merged dictionary
"""
if not isinstance(dict1, dict) or not isinstance(dict2, dict):
raise ValueError('dict1 or dict2 is not a dictionary')
result = copy.deepcopy(dict1)
for k, v in dict2.items():
if k in result and isinstance(result[k], dict):
result[k] = merge_dict(result[k], v)
else:
result[k] = copy.deepcopy(v)
return result
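
# Illustrative example:
#   merge_dict({'a': {'x': 1}, 'b': 2}, {'a': {'y': 3}})
#   returns {'a': {'x': 1, 'y': 3}, 'b': 2}
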
def main():
"""Main function"""
# get command-line args
args = parseargs()
args.action = args.action.lower()
if args.credentials is None:
raise ValueError('credentials json not specified')
if args.config is None:
raise ValueError('config json not specified')
with open(args.credentials, 'r') as f:
config = json.load(f)
with open(args.config, 'r') as f:
config = merge_dict(config, json.load(f))
try:
with open(args.pool, 'r') as f:
config = merge_dict(config, json.load(f))
except Exception:
config['pool_specification'] = {
'id': args.poolid
}
    if args.action in ('addjobs', 'termjob', 'deljobs'):
try:
with open(args.jobs, 'r') as f:
config = merge_dict(config, json.load(f))
except Exception:
config['job_specifications'] = [{
'id': args.jobid
}]
logger.debug('config:\n' + json.dumps(config, indent=4))
_populate_global_settings(config, args.action)
batch_client, blob_client, queue_client, table_client = \
_create_credentials(config)
if args.action == 'addpool':
        # first check if pool exists to prevent accidental metadata clear
if batch_client.pool.exists(config['pool_specification']['id']):
raise RuntimeError(
'attempting to create a pool that already exists: {}'.format(
config['pool_specification']['id']))
create_storage_containers(
blob_client, queue_client, table_client, config)
clear_storage_containers(
blob_client, queue_client, table_client, config)
_adjust_settings_for_pool_creation(config)
populate_queues(queue_client, table_client, config)
add_pool(batch_client, blob_client, config)
elif args.action == 'resizepool':
resize_pool(batch_client, config)
elif args.action == 'delpool':
del_pool(batch_client, config)
elif args.action == 'addsshuser':
add_ssh_tunnel_user(batch_client, config)
elif args.action == 'delnode':
del_node(batch_client, config, args.nodeid)
elif args.action == 'addjobs':
add_jobs(batch_client, blob_client, config)
elif args.action == 'termjob':
terminate_jobs(batch_client, config)
elif args.action == 'deljobs':
del_jobs(batch_client, config)
elif args.action == 'delalljobs':
del_all_jobs(batch_client)
elif args.action == 'grl':
get_remote_login_settings(batch_client, config)
elif args.action == 'delstorage':
delete_storage_containers(
blob_client, queue_client, table_client, config)
elif args.action == 'clearstorage':
clear_storage_containers(
blob_client, queue_client, table_client, config)
else:
raise ValueError('Unknown action: {}'.format(args.action))
def parseargs():
"""Parse program arguments
:rtype: argparse.Namespace
:return: parsed arguments
"""
parser = argparse.ArgumentParser(
description='Shipyard: Azure Batch to Docker Bridge')
parser.add_argument(
        'action', help='action: addpool, resizepool, delpool, addsshuser, '
        'delnode, addjobs, termjob, deljobs, delalljobs, grl, delstorage, '
        'clearstorage')
parser.add_argument(
'--credentials',
help='credentials json config. required for all actions')
parser.add_argument(
'--config',
        help='general json config. required for all actions')
parser.add_argument(
'--pool',
help='pool json config. required for most actions')
parser.add_argument(
'--jobs',
help='jobs json config. required for job actions')
    parser.add_argument(
        '--poolid', help='pool id, used if --pool is not specified')
    parser.add_argument(
        '--jobid', help='job id, used if --jobs is not specified')
    parser.add_argument('--nodeid', help='node id for delnode action')
return parser.parse_args()
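
# Illustrative invocation (file names are assumptions):
#   ./shipyard.py addpool --credentials credentials.json --config config.json \
#       --pool pool.json
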
if __name__ == '__main__':
_setup_logger()
main()