Split configuration files further
- Allow docker public hub passthrough in private context - Pass config through in more places in shipyard
This commit is contained in:
Родитель
6fb2d7e221
Коммит
ee242c21d6
77
cascade.py
77
cascade.py
|
@ -48,6 +48,7 @@ _ENABLE_P2P = True
|
|||
_NON_P2P_CONCURRENT_DOWNLOADING = True
|
||||
_COMPRESSION = True
|
||||
_SEED_BIAS = 3
|
||||
_ALLOW_PUBLIC_PULL_WITH_PRIVATE = False
|
||||
_SAVELOAD_FILE_EXTENSION = 'tar.gz'
|
||||
_REGISTRY = None
|
||||
# mutable global state
|
||||
|
@ -227,6 +228,32 @@ class DockerSaveThread(threading.Thread):
|
|||
_DIRECTDL_DOWNLOADING.append(self.resource)
|
||||
|
||||
def run(self):
|
||||
success = False
|
||||
try:
|
||||
self._pull_and_load()
|
||||
success = True
|
||||
except Exception as ex:
|
||||
print(ex, file=sys.stderr)
|
||||
finally:
|
||||
# cancel callback
|
||||
if _ENABLE_P2P or not _NON_P2P_CONCURRENT_DOWNLOADING:
|
||||
_CBHANDLES[self.resource].cancel()
|
||||
_CBHANDLES.pop(self.resource)
|
||||
# release queue message
|
||||
self.queue_client.update_message(
|
||||
_STORAGE_CONTAINERS['queue_globalresources'],
|
||||
message_id=self.msg_id,
|
||||
pop_receipt=_QUEUE_MESSAGES[self.msg_id],
|
||||
visibility_timeout=0)
|
||||
_QUEUE_MESSAGES.pop(self.msg_id)
|
||||
print('queue message released for {}'.format(self.resource))
|
||||
# remove from downloading list
|
||||
if success:
|
||||
with _DIRECTDL_LOCK:
|
||||
_DIRECTDL_DOWNLOADING.remove(self.resource)
|
||||
_DIRECTDL.remove(self.resource)
|
||||
|
||||
def _pull_and_load(self):
|
||||
if _REGISTRY is None:
|
||||
raise RuntimeError(
|
||||
('{} image specified for global resource, but there are '
|
||||
|
@ -242,13 +269,24 @@ class DockerSaveThread(threading.Thread):
|
|||
subprocess.check_output(
|
||||
'docker pull {}'.format(image), shell=True)
|
||||
else:
|
||||
_pub = False
|
||||
try:
|
||||
subprocess.check_output(
|
||||
'docker pull {}/{}'.format(_REGISTRY, image),
|
||||
shell=True)
|
||||
except subprocess.CalledProcessError:
|
||||
if _ALLOW_PUBLIC_PULL_WITH_PRIVATE:
|
||||
subprocess.check_output(
|
||||
'docker pull {}'.format(image), shell=True)
|
||||
_pub = True
|
||||
else:
|
||||
raise
|
||||
# tag image to remove registry ip
|
||||
if not _pub:
|
||||
subprocess.check_call(
|
||||
'docker tag {}/{} {}'.format(_REGISTRY, image, image),
|
||||
shell=True)
|
||||
del _pub
|
||||
diff = (datetime.datetime.now() - start).total_seconds()
|
||||
print('took {} sec to pull docker image {} from {}'.format(
|
||||
diff, image, _REGISTRY))
|
||||
|
@ -365,22 +403,6 @@ class DockerSaveThread(threading.Thread):
|
|||
'gr-done',
|
||||
'nglobalresources={}'.format(self.nglobalresources))
|
||||
_GR_DONE = True
|
||||
# cancel callback
|
||||
if _ENABLE_P2P or not _NON_P2P_CONCURRENT_DOWNLOADING:
|
||||
_CBHANDLES[self.resource].cancel()
|
||||
_CBHANDLES.pop(self.resource)
|
||||
# release queue message
|
||||
self.queue_client.update_message(
|
||||
_STORAGE_CONTAINERS['queue_globalresources'],
|
||||
message_id=self.msg_id,
|
||||
pop_receipt=_QUEUE_MESSAGES[self.msg_id],
|
||||
visibility_timeout=0)
|
||||
_QUEUE_MESSAGES.pop(self.msg_id)
|
||||
print('queue message released for {}'.format(self.resource))
|
||||
# remove from downloading list
|
||||
with _DIRECTDL_LOCK:
|
||||
_DIRECTDL_DOWNLOADING.remove(self.resource)
|
||||
_DIRECTDL.remove(self.resource)
|
||||
|
||||
|
||||
async def _direct_download_resources_async(
|
||||
|
@ -851,17 +873,18 @@ def main():
|
|||
global _ENABLE_P2P, _NON_P2P_CONCURRENT_DOWNLOADING
|
||||
# get command-line args
|
||||
args = parseargs()
|
||||
_NON_P2P_CONCURRENT_DOWNLOADING = args.nonp2pcd
|
||||
_ENABLE_P2P = args.torrent
|
||||
p2popts = args.p2popts.split(':')
|
||||
_ENABLE_P2P = p2popts[0] == 'true'
|
||||
_NON_P2P_CONCURRENT_DOWNLOADING = p2popts[1]
|
||||
# set p2p options
|
||||
if _ENABLE_P2P:
|
||||
if not _LIBTORRENT_IMPORTED:
|
||||
raise ImportError('No module named \'libtorrent\'')
|
||||
global _COMPRESSION, _SEED_BIAS, _SAVELOAD_FILE_EXTENSION
|
||||
p2popts = args.p2popts.split(':')
|
||||
_COMPRESSION = p2popts[0] == 'true'
|
||||
_SEED_BIAS = int(p2popts[1])
|
||||
del p2popts
|
||||
global _COMPRESSION, _SEED_BIAS, _ALLOW_PUBLIC_PULL_WITH_PRIVATE, \
|
||||
_SAVELOAD_FILE_EXTENSION
|
||||
_COMPRESSION = p2popts[3] == 'true'
|
||||
_SEED_BIAS = int(p2popts[2])
|
||||
_ALLOW_PUBLIC_PULL_WITH_PRIVATE = p2popts[4] == 'true'
|
||||
if not _COMPRESSION:
|
||||
_SAVELOAD_FILE_EXTENSION = 'tar'
|
||||
print('peer-to-peer options: compression={} seedbias={}'.format(
|
||||
|
@ -872,6 +895,7 @@ def main():
|
|||
else:
|
||||
print('non-p2p concurrent downloading: {}'.format(
|
||||
_NON_P2P_CONCURRENT_DOWNLOADING))
|
||||
del p2popts
|
||||
|
||||
# get event loop
|
||||
if _ON_WINDOWS:
|
||||
|
@ -917,10 +941,11 @@ def parseargs():
|
|||
"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Cascade: Azure Batch P2P File/Image Replicator')
|
||||
parser.set_defaults(ipaddress=None, nonp2pcd=False, torrent=True)
|
||||
parser.set_defaults(ipaddress=None)
|
||||
parser.add_argument(
|
||||
'p2popts', nargs='?',
|
||||
help='peer to peer options [compression:seed bias]')
|
||||
'p2popts',
|
||||
help='peer to peer options [enabled:non-p2p concurrent '
|
||||
'downloading:seed bias:compression:public pull passthrough]')
|
||||
parser.add_argument(
|
||||
'--ipaddress', help='ip address')
|
||||
parser.add_argument(
|
||||
|
|
29
graph.py
29
graph.py
|
@ -26,16 +26,17 @@ def _create_credentials(config: dict):
|
|||
"""
|
||||
global _STORAGEACCOUNT, _STORAGEACCOUNTKEY, _BATCHACCOUNT, _POOLID, \
|
||||
_PARTITION_KEY, _TABLE_NAME
|
||||
_STORAGEACCOUNT = config['credentials']['storage_account']
|
||||
_STORAGEACCOUNTKEY = config['credentials']['storage_account_key']
|
||||
_BATCHACCOUNT = config['credentials']['batch_account']
|
||||
_POOLID = config['poolspec']['id']
|
||||
ssel = config['credentials']['shipyard_storage']
|
||||
_STORAGEACCOUNT = config['credentials']['storage'][ssel]['account']
|
||||
_STORAGEACCOUNTKEY = config['credentials']['storage'][ssel]['account_key']
|
||||
_BATCHACCOUNT = config['credentials']['batch']['account']
|
||||
_POOLID = config['pool_specification']['id']
|
||||
_PARTITION_KEY = '{}${}'.format(_BATCHACCOUNT, _POOLID)
|
||||
_TABLE_NAME = config['storage_entity_prefix'] + 'perf'
|
||||
table_client = azuretable.TableService(
|
||||
account_name=_STORAGEACCOUNT,
|
||||
account_key=_STORAGEACCOUNTKEY,
|
||||
endpoint_suffix=config['credentials']['storage_endpoint'])
|
||||
endpoint_suffix=config['credentials']['storage'][ssel]['endpoint'])
|
||||
return table_client
|
||||
|
||||
|
||||
|
@ -228,15 +229,17 @@ def main():
|
|||
# get command-line args
|
||||
args = parseargs()
|
||||
|
||||
if args.settings is None:
|
||||
raise ValueError('global settings not specified')
|
||||
if args.credentials is None:
|
||||
raise ValueError('credentials json not specified')
|
||||
if args.config is None:
|
||||
raise ValueError('config settings for action not specified')
|
||||
raise ValueError('config json not specified')
|
||||
|
||||
with open(args.settings, 'r') as f:
|
||||
with open(args.credentials, 'r') as f:
|
||||
config = json.load(f)
|
||||
with open(args.config, 'r') as f:
|
||||
config = merge_dict(config, json.load(f))
|
||||
with open(args.pool, 'r') as f:
|
||||
config = merge_dict(config, json.load(f))
|
||||
|
||||
# create storage credentials
|
||||
table_client = _create_credentials(config)
|
||||
|
@ -253,11 +256,11 @@ def parseargs():
|
|||
parser = argparse.ArgumentParser(
|
||||
description='Shipyard perf graph generator')
|
||||
parser.add_argument(
|
||||
'--settings',
|
||||
help='global settings json file config. required for all actions')
|
||||
'--credentials', help='credentials json config')
|
||||
parser.add_argument(
|
||||
'--config',
|
||||
help='json file config for option. required for all actions')
|
||||
'--config', help='general json config for option')
|
||||
parser.add_argument(
|
||||
'--pool', help='pool json config')
|
||||
return parser.parse_args()
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -4,34 +4,29 @@ set -e
|
|||
set -o pipefail
|
||||
|
||||
azurefile=0
|
||||
nonp2pcd=
|
||||
offer=
|
||||
p2p=
|
||||
prefix=
|
||||
privatereg=
|
||||
sku=
|
||||
|
||||
while getopts "h?aco:p:r:s:t:" opt; do
|
||||
while getopts "h?ao:p:r:s:t:" opt; do
|
||||
case "$opt" in
|
||||
h|\?)
|
||||
echo "nodeprep.sh parameters"
|
||||
echo ""
|
||||
echo "-a install azurefile docker volume driver"
|
||||
echo "-c concurrent downloading in non-p2p mode"
|
||||
echo "-o [offer] VM offer"
|
||||
echo "-p [prefix] storage container prefix"
|
||||
echo "-r [container:archive:image id] private registry"
|
||||
echo "-s [sku] VM sku"
|
||||
echo "-t [compression:seed bias] enable p2p sharing"
|
||||
echo "-t [enabled:non-p2p concurrent download:seed bias:compression:pub pull passthrough] p2p sharing"
|
||||
echo ""
|
||||
exit 1
|
||||
;;
|
||||
a)
|
||||
azurefile=1
|
||||
;;
|
||||
c)
|
||||
nonp2pcd="--nonp2pcd"
|
||||
;;
|
||||
o)
|
||||
offer=${OPTARG,,}
|
||||
;;
|
||||
|
@ -76,14 +71,11 @@ PYTHONASYNCIODEBUG=1
|
|||
# get ip address of eth0
|
||||
ipaddress=`ip addr list eth0 | grep "inet " | cut -d' ' -f6 | cut -d/ -f1`
|
||||
|
||||
# set torrent flag and iptables rules
|
||||
torrentflag=
|
||||
# set iptables rules
|
||||
if [ ! -z "$p2p" ]; then
|
||||
# disable DHT connection tracking
|
||||
iptables -t raw -I PREROUTING -p udp --dport 6881 -j CT --notrack
|
||||
iptables -t raw -I OUTPUT -p udp --sport 6881 -j CT --notrack
|
||||
else
|
||||
torrentflag="--no-torrent"
|
||||
fi
|
||||
|
||||
# copy job prep docker block file to shared
|
||||
|
@ -196,7 +188,7 @@ fi
|
|||
|
||||
# start cascade
|
||||
./perf.py cascade start $prefix
|
||||
./cascade.py $p2p --ipaddress $ipaddress $prefix $torrentflag $nonp2pcd > cascade.log &
|
||||
./cascade.py $p2p --ipaddress $ipaddress $prefix > cascade.log &
|
||||
# if not in p2p mode, then wait for cascade exit
|
||||
if [ -z "$p2p" ]; then
|
||||
wait
|
||||
|
|
193
shipyard.py
193
shipyard.py
|
@ -65,23 +65,23 @@ def _populate_global_settings(config: dict, action: str):
|
|||
"""
|
||||
global _STORAGEACCOUNT, _STORAGEACCOUNTKEY, _BATCHACCOUNTKEY, \
|
||||
_REGISTRY_FILE
|
||||
_STORAGEACCOUNT = config['credentials']['storage_account']
|
||||
_STORAGEACCOUNTKEY = config['credentials']['storage_account_key']
|
||||
_BATCHACCOUNTKEY = config['credentials']['batch_account_key']
|
||||
ssel = config['credentials']['shipyard_storage']
|
||||
_STORAGEACCOUNT = config['credentials']['storage'][ssel]['account']
|
||||
_STORAGEACCOUNTKEY = config['credentials']['storage'][ssel]['account_key']
|
||||
_BATCHACCOUNTKEY = config['credentials']['batch']['account_key']
|
||||
try:
|
||||
sep = config['storage_entity_prefix']
|
||||
except KeyError:
|
||||
sep = None
|
||||
if sep is None:
|
||||
sep = ''
|
||||
postfix = '-'.join(
|
||||
(config['credentials']['batch']['account'].lower(),
|
||||
config['pool_specification']['id'].lower()))
|
||||
_STORAGE_CONTAINERS['blob_resourcefiles'] = '-'.join(
|
||||
(sep + 'resourcefiles',
|
||||
config['credentials']['batch_account'].lower(),
|
||||
config['pool_specification']['id'].lower()))
|
||||
(sep + 'resourcefiles', postfix))
|
||||
_STORAGE_CONTAINERS['blob_torrents'] = '-'.join(
|
||||
(sep + 'torrents',
|
||||
config['credentials']['batch_account'].lower(),
|
||||
config['pool_specification']['id'].lower()))
|
||||
(sep + 'torrents', postfix))
|
||||
_STORAGE_CONTAINERS['table_dht'] = sep + 'dht'
|
||||
_STORAGE_CONTAINERS['table_registry'] = sep + 'registry'
|
||||
_STORAGE_CONTAINERS['table_torrentinfo'] = sep + 'torrentinfo'
|
||||
|
@ -89,20 +89,25 @@ def _populate_global_settings(config: dict, action: str):
|
|||
_STORAGE_CONTAINERS['table_globalresources'] = sep + 'globalresources'
|
||||
_STORAGE_CONTAINERS['table_perf'] = sep + 'perf'
|
||||
_STORAGE_CONTAINERS['queue_globalresources'] = '-'.join(
|
||||
(sep + 'globalresources',
|
||||
config['credentials']['batch_account'].lower(),
|
||||
config['pool_specification']['id'].lower()))
|
||||
(sep + 'globalresources', postfix))
|
||||
if action != 'addpool':
|
||||
return
|
||||
try:
|
||||
if config['docker_registry']['private']['enabled']:
|
||||
dpre = config['docker_registry']['private']['enabled']
|
||||
except KeyError:
|
||||
dpre = False
|
||||
if dpre:
|
||||
try:
|
||||
rf = config['docker_registry']['private'][
|
||||
'docker_save_registry_file']
|
||||
except KeyError:
|
||||
rf = 'resources/docker-registry-v2.tar.gz'
|
||||
prf = pathlib.Path(rf)
|
||||
# attempt to package if registry file doesn't exist
|
||||
if not prf.exists():
|
||||
if not prf.exists() or prf.stat().st_size == 0:
|
||||
print('attempting to generate docker private registry tarball')
|
||||
try:
|
||||
prf.parent.mkdir(mode=0o750, parents=True, exist_ok=True)
|
||||
output = subprocess.check_output(
|
||||
'sudo docker images -q registry:2', shell=True)
|
||||
output = output.decode('utf-8').strip()
|
||||
|
@ -110,7 +115,7 @@ def _populate_global_settings(config: dict, action: str):
|
|||
pass
|
||||
else:
|
||||
if len(output) == 12:
|
||||
rf = 'resources/docker_registry_v2.tar.gz'
|
||||
rf = 'resources/docker-registry-v2.tar.gz'
|
||||
prf = pathlib.Path(rf)
|
||||
config['docker_registry']['private'][
|
||||
'docker_save_registry_image_id'] = output
|
||||
|
@ -125,8 +130,6 @@ def _populate_global_settings(config: dict, action: str):
|
|||
)
|
||||
else:
|
||||
_REGISTRY_FILE = (None, None, None)
|
||||
except Exception:
|
||||
_REGISTRY_FILE = (None, None, None)
|
||||
print('private registry settings: {}'.format(_REGISTRY_FILE))
|
||||
|
||||
|
||||
|
@ -147,24 +150,25 @@ def _create_credentials(config: dict) -> tuple:
|
|||
:rtype: tuple
|
||||
:return: (batch client, blob client, queue client, table client)
|
||||
"""
|
||||
ssel = config['credentials']['shipyard_storage']
|
||||
credentials = batchauth.SharedKeyCredentials(
|
||||
config['credentials']['batch_account'],
|
||||
config['credentials']['batch']['account'],
|
||||
_BATCHACCOUNTKEY)
|
||||
batch_client = batch.BatchServiceClient(
|
||||
credentials,
|
||||
base_url=config['credentials']['batch_account_service_url'])
|
||||
base_url=config['credentials']['batch']['account_service_url'])
|
||||
blob_client = azureblob.BlockBlobService(
|
||||
account_name=_STORAGEACCOUNT,
|
||||
account_key=_STORAGEACCOUNTKEY,
|
||||
endpoint_suffix=config['credentials']['storage_endpoint'])
|
||||
endpoint_suffix=config['credentials']['storage'][ssel]['endpoint'])
|
||||
queue_client = azurequeue.QueueService(
|
||||
account_name=_STORAGEACCOUNT,
|
||||
account_key=_STORAGEACCOUNTKEY,
|
||||
endpoint_suffix=config['credentials']['storage_endpoint'])
|
||||
endpoint_suffix=config['credentials']['storage'][ssel]['endpoint'])
|
||||
table_client = azuretable.TableService(
|
||||
account_name=_STORAGEACCOUNT,
|
||||
account_key=_STORAGEACCOUNTKEY,
|
||||
endpoint_suffix=config['credentials']['storage_endpoint'])
|
||||
endpoint_suffix=config['credentials']['storage'][ssel]['endpoint'])
|
||||
return batch_client, blob_client, queue_client, table_client
|
||||
|
||||
|
||||
|
@ -199,6 +203,7 @@ def upload_resource_files(
|
|||
:rtype: dict
|
||||
:return: sas url dict
|
||||
"""
|
||||
ssel = config['credentials']['shipyard_storage']
|
||||
sas_urls = {}
|
||||
for file in files:
|
||||
# skip if no file is specified
|
||||
|
@ -228,7 +233,7 @@ def upload_resource_files(
|
|||
_STORAGE_CONTAINERS['blob_resourcefiles'], file[0], file[1])
|
||||
sas_urls[file[0]] = 'https://{}.blob.{}/{}/{}?{}'.format(
|
||||
_STORAGEACCOUNT,
|
||||
config['credentials']['storage_endpoint'],
|
||||
config['credentials']['storage'][ssel]['endpoint'],
|
||||
_STORAGE_CONTAINERS['blob_resourcefiles'], file[0],
|
||||
blob_client.generate_blob_shared_access_signature(
|
||||
_STORAGE_CONTAINERS['blob_resourcefiles'], file[0],
|
||||
|
@ -266,8 +271,9 @@ def setup_azurefile_volume_driver(
|
|||
'global_resources']['docker_volumes']['shared_data_volumes'][svkey]
|
||||
if conf['volume_driver'] == 'azurefile':
|
||||
# check every entry to ensure the same storage account
|
||||
_sa = conf['storage_account']
|
||||
_sakey = conf['storage_account_key']
|
||||
ssel = conf['storage_account_settings']
|
||||
_sa = config['credentials']['storage'][ssel]['account']
|
||||
_sakey = config['credentials']['storage'][ssel]['account_key']
|
||||
if sa is not None and sa != _sa:
|
||||
raise ValueError(
|
||||
'multiple storage accounts are not supported for '
|
||||
|
@ -330,6 +336,8 @@ def add_pool(
|
|||
except KeyError:
|
||||
p2pcomp = True
|
||||
else:
|
||||
p2psbias = 0
|
||||
p2pcomp = False
|
||||
try:
|
||||
nonp2pcd = config[
|
||||
'data_replication']['non_peer_to_peer_concurrent_downloading']
|
||||
|
@ -339,13 +347,20 @@ def add_pool(
|
|||
try:
|
||||
preg = config['docker_registry']['private']['enabled']
|
||||
pcont = config['docker_registry']['private']['container']
|
||||
pregpubpull = config['docker_registry']['private'][
|
||||
'allow_public_docker_hub_pull_on_missing']
|
||||
except KeyError:
|
||||
preg = False
|
||||
pregpubpull = False
|
||||
# create private registry flags
|
||||
if preg:
|
||||
preg = ' -r {}:{}:{}'.format(
|
||||
pcont, _REGISTRY_FILE[0], _REGISTRY_FILE[2])
|
||||
else:
|
||||
preg = ''
|
||||
# create torrent flags
|
||||
torrentflags = ' -t {}:{}:{}:{}:{}'.format(
|
||||
p2p, nonp2pcd, p2psbias, p2pcomp, pregpubpull)
|
||||
# docker settings
|
||||
try:
|
||||
dockeruser = config['docker_registry']['login']['username']
|
||||
|
@ -405,6 +420,24 @@ def add_pool(
|
|||
# upload resource files
|
||||
sas_urls = upload_resource_files(blob_client, config, _rflist)
|
||||
del _rflist
|
||||
# create start task commandline
|
||||
start_task = [
|
||||
'{} -o {} -s {}{}{}{}{}'.format(
|
||||
_NODEPREP_FILE[0],
|
||||
offer,
|
||||
sku,
|
||||
preg,
|
||||
torrentflags,
|
||||
' -p {}'.format(prefix) if prefix else '',
|
||||
' -a' if azurefile_vd else '',
|
||||
),
|
||||
]
|
||||
try:
|
||||
start_task.extend(
|
||||
config['pool_specification']['additional_node_prep_commands'])
|
||||
except KeyError:
|
||||
pass
|
||||
ssel = config['docker_registry']['private']['storage_account_settings']
|
||||
# create pool param
|
||||
pool = batchmodels.PoolAddParameter(
|
||||
id=config['pool_specification']['id'],
|
||||
|
@ -415,16 +448,7 @@ def add_pool(
|
|||
target_dedicated=config['pool_specification']['vm_count'],
|
||||
enable_inter_node_communication=True,
|
||||
start_task=batchmodels.StartTask(
|
||||
command_line='{} -o {} -s {}{}{}{}{}{}'.format(
|
||||
_NODEPREP_FILE[0],
|
||||
offer,
|
||||
sku,
|
||||
preg,
|
||||
' -p {}'.format(prefix) if prefix else '',
|
||||
' -t {}:{}'.format(p2pcomp, p2psbias) if p2p else '',
|
||||
' -c' if nonp2pcd else '',
|
||||
' -a' if azurefile_vd else '',
|
||||
),
|
||||
command_line=_wrap_commands_in_shell(start_task, wait=False),
|
||||
run_elevated=True,
|
||||
wait_for_success=True,
|
||||
environment_settings=[
|
||||
|
@ -432,6 +456,9 @@ def add_pool(
|
|||
batchmodels.EnvironmentSetting('CASCADE_SA', _STORAGEACCOUNT),
|
||||
batchmodels.EnvironmentSetting(
|
||||
'CASCADE_SAKEY', _STORAGEACCOUNTKEY),
|
||||
batchmodels.EnvironmentSetting(
|
||||
'CASCADE_EP',
|
||||
config['credentials']['storage'][ssel]['endpoint']),
|
||||
],
|
||||
resource_files=[],
|
||||
),
|
||||
|
@ -446,12 +473,12 @@ def add_pool(
|
|||
pool.start_task.environment_settings.append(
|
||||
batchmodels.EnvironmentSetting(
|
||||
'PRIVATE_REGISTRY_SA',
|
||||
config['docker_registry']['private']['storage_account'])
|
||||
config['credentials']['storage'][ssel]['account'])
|
||||
)
|
||||
pool.start_task.environment_settings.append(
|
||||
batchmodels.EnvironmentSetting(
|
||||
'PRIVATE_REGISTRY_SAKEY',
|
||||
config['docker_registry']['private']['storage_account_key'])
|
||||
config['credentials']['storage'][ssel]['account_key'])
|
||||
)
|
||||
if (dockeruser is not None and len(dockeruser) > 0 and
|
||||
dockerpw is not None and len(dockerpw) > 0):
|
||||
|
@ -464,6 +491,8 @@ def add_pool(
|
|||
# create pool if not exists
|
||||
try:
|
||||
print('Attempting to create pool:', pool.id)
|
||||
print(' >> node prep commandline: {}'.format(
|
||||
pool.start_task.command_line))
|
||||
batch_client.pool.add(pool)
|
||||
print('Created pool:', pool.id)
|
||||
except batchmodels.BatchErrorException as e:
|
||||
|
@ -499,13 +528,15 @@ def add_pool(
|
|||
for node in nodes:
|
||||
print(' > {}: {}'.format(node.id, node.state))
|
||||
time.sleep(10)
|
||||
get_remote_login_settings(batch_client, pool.id, nodes)
|
||||
get_remote_login_settings(batch_client, config, nodes)
|
||||
if any(node.state != batchmodels.ComputeNodeState.idle for node in nodes):
|
||||
raise RuntimeError('node(s) of pool {} not in idle state'.format(
|
||||
pool.id))
|
||||
|
||||
|
||||
def resize_pool(batch_client, pool_id, vm_count):
|
||||
def resize_pool(batch_client, config):
|
||||
pool_id = config['pool_specification']['id']
|
||||
vm_count = int(config['pool_specification']['vm_count'])
|
||||
print('Resizing pool {} to {}'.format(pool_id, vm_count))
|
||||
batch_client.pool.resize(
|
||||
pool_id=pool_id,
|
||||
|
@ -516,12 +547,14 @@ def resize_pool(batch_client, pool_id, vm_count):
|
|||
)
|
||||
|
||||
|
||||
def del_pool(batch_client, pool_id):
|
||||
def del_pool(batch_client, config):
|
||||
pool_id = config['pool_specification']['id']
|
||||
print('Deleting pool: {}'.format(pool_id))
|
||||
batch_client.pool.delete(pool_id)
|
||||
|
||||
|
||||
def del_node(batch_client, pool_id, node_id):
|
||||
def del_node(batch_client, config, node_id):
|
||||
pool_id = config['pool_specification']['id']
|
||||
print('Deleting node {} from pool {}'.format(node_id, pool_id))
|
||||
batch_client.pool.remove_nodes(
|
||||
pool_id=pool_id,
|
||||
|
@ -531,7 +564,7 @@ def del_node(batch_client, pool_id, node_id):
|
|||
)
|
||||
|
||||
|
||||
def add_job(batch_client, blob_client, config):
|
||||
def add_jobs(batch_client, blob_client, config):
|
||||
pool_id = config['pool_specification']['id']
|
||||
global_resources = []
|
||||
for gr in config['global_resources']['docker_images']:
|
||||
|
@ -731,12 +764,16 @@ def add_job(batch_client, blob_client, config):
|
|||
batch_client.task.add(job_id=job.id, task=task)
|
||||
|
||||
|
||||
def del_job(batch_client, job_id):
|
||||
def del_jobs(batch_client, config):
|
||||
for job in config['job_specifications']:
|
||||
job_id = job['id']
|
||||
print('Deleting job: {}'.format(job_id))
|
||||
batch_client.job.delete(job_id)
|
||||
|
||||
|
||||
def terminate_job(batch_client, job_id):
|
||||
def terminate_jobs(batch_client, config):
|
||||
for job in config['job_specifications']:
|
||||
job_id = job['id']
|
||||
print('Terminating job: {}'.format(job_id))
|
||||
batch_client.job.terminate(job_id)
|
||||
|
||||
|
@ -745,10 +782,12 @@ def del_all_jobs(batch_client):
|
|||
print('Listing jobs...')
|
||||
jobs = batch_client.job.list()
|
||||
for job in jobs:
|
||||
del_job(batch_client, job.id)
|
||||
print('Deleting job: {}'.format(job.id))
|
||||
batch_client.job.delete(job.id)
|
||||
|
||||
|
||||
def get_remote_login_settings(batch_client, pool_id, nodes=None):
|
||||
def get_remote_login_settings(batch_client, config, nodes=None):
|
||||
pool_id = config['pool_specification']['id']
|
||||
if nodes is None:
|
||||
nodes = batch_client.compute_node.list(pool_id)
|
||||
for node in nodes:
|
||||
|
@ -786,7 +825,7 @@ def _clear_table(table_client, table_name, config):
|
|||
print('clearing table: {}'.format(table_name))
|
||||
ents = table_client.query_entities(
|
||||
table_name, filter='PartitionKey eq \'{}${}\''.format(
|
||||
config['credentials']['batch_account'],
|
||||
config['credentials']['batch']['account'],
|
||||
config['pool_specification']['id'])
|
||||
)
|
||||
# batch delete entities
|
||||
|
@ -841,6 +880,7 @@ def _add_global_resource(
|
|||
else:
|
||||
raise NotImplementedError()
|
||||
resource = '{}:{}'.format(prefix, gr)
|
||||
print('adding global resource: {}'.format(resource))
|
||||
table_client.insert_or_replace_entity(
|
||||
_STORAGE_CONTAINERS['table_globalresources'],
|
||||
{
|
||||
|
@ -863,7 +903,7 @@ def populate_queues(queue_client, table_client, config):
|
|||
except KeyError:
|
||||
preg = False
|
||||
pk = '{}${}'.format(
|
||||
config['credentials']['batch_account'],
|
||||
config['credentials']['batch']['account'],
|
||||
config['pool_specification']['id'])
|
||||
# if using docker public hub, then populate registry table with hub
|
||||
if not preg:
|
||||
|
@ -923,15 +963,30 @@ def main():
|
|||
args = parseargs()
|
||||
args.action = args.action.lower()
|
||||
|
||||
if args.settings is None:
|
||||
raise ValueError('global settings not specified')
|
||||
if args.credentials is None:
|
||||
raise ValueError('credentials json not specified')
|
||||
if args.config is None:
|
||||
raise ValueError('config settings for action not specified')
|
||||
raise ValueError('config json not specified')
|
||||
|
||||
with open(args.settings, 'r') as f:
|
||||
with open(args.credentials, 'r') as f:
|
||||
config = json.load(f)
|
||||
with open(args.config, 'r') as f:
|
||||
config = merge_dict(config, json.load(f))
|
||||
try:
|
||||
with open(args.pool, 'r') as f:
|
||||
config = merge_dict(config, json.load(f))
|
||||
except Exception:
|
||||
config['pool_specification'] = {
|
||||
'id': args.poolid
|
||||
}
|
||||
if args.action in ('addjobs', 'deljobs'):
|
||||
try:
|
||||
with open(args.jobs, 'r') as f:
|
||||
config = merge_dict(config, json.load(f))
|
||||
except Exception:
|
||||
config['job_specifications'] = [{
|
||||
'id': args.jobid
|
||||
}]
|
||||
print('config:')
|
||||
print(json.dumps(config, indent=4))
|
||||
_populate_global_settings(config, args.action)
|
||||
|
@ -947,21 +1002,21 @@ def main():
|
|||
populate_queues(queue_client, table_client, config)
|
||||
add_pool(batch_client, blob_client, config)
|
||||
elif args.action == 'resizepool':
|
||||
resize_pool(batch_client, args.poolid, args.numvms)
|
||||
resize_pool(batch_client, config)
|
||||
elif args.action == 'delpool':
|
||||
del_pool(batch_client, args.poolid)
|
||||
del_pool(batch_client, config)
|
||||
elif args.action == 'delnode':
|
||||
del_node(batch_client, args.poolid, args.nodeid)
|
||||
elif args.action == 'addjob':
|
||||
add_job(batch_client, blob_client, config)
|
||||
del_node(batch_client, config, args.nodeid)
|
||||
elif args.action == 'addjobs':
|
||||
add_jobs(batch_client, blob_client, config)
|
||||
elif args.action == 'termjob':
|
||||
terminate_job(batch_client, blob_client, args.jobid)
|
||||
elif args.action == 'deljob':
|
||||
del_job(batch_client, args.jobid)
|
||||
terminate_jobs(batch_client, config)
|
||||
elif args.action == 'deljobs':
|
||||
del_jobs(batch_client, config)
|
||||
elif args.action == 'delalljobs':
|
||||
del_all_jobs(batch_client)
|
||||
elif args.action == 'grl':
|
||||
get_remote_login_settings(batch_client, args.poolid)
|
||||
get_remote_login_settings(batch_client, config)
|
||||
elif args.action == 'delstorage':
|
||||
delete_storage_containers(
|
||||
blob_client, queue_client, table_client, config)
|
||||
|
@ -983,14 +1038,18 @@ def parseargs():
|
|||
'action', help='action: addpool, addjob, termjob, delpool, '
|
||||
'delnode, deljob, delalljobs, grl, delstorage, clearstorage')
|
||||
parser.add_argument(
|
||||
'--settings',
|
||||
help='global settings json file config. required for all actions')
|
||||
'--credentials',
|
||||
help='credentials json config. required for all actions')
|
||||
parser.add_argument(
|
||||
'--config',
|
||||
help='json file config for option. required for all actions')
|
||||
parser.add_argument('--poolid', help='pool id')
|
||||
parser.add_argument('--jobid', help='job id')
|
||||
parser.add_argument('--nodeid', help='node id')
|
||||
help='general json config for option. required for all actions')
|
||||
parser.add_argument(
|
||||
'--pool',
|
||||
help='pool json config. required for most actions')
|
||||
parser.add_argument(
|
||||
'--jobs',
|
||||
help='jobs json config. required for job actions')
|
||||
parser.add_argument('--nodeid', help='node id for delnode action')
|
||||
return parser.parse_args()
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
Загрузка…
Ссылка в новой задаче