diff --git a/cascade.py b/cascade.py index 0c320a3..b8ad781 100755 --- a/cascade.py +++ b/cascade.py @@ -48,6 +48,7 @@ _ENABLE_P2P = True _NON_P2P_CONCURRENT_DOWNLOADING = True _COMPRESSION = True _SEED_BIAS = 3 +_ALLOW_PUBLIC_PULL_WITH_PRIVATE = False _SAVELOAD_FILE_EXTENSION = 'tar.gz' _REGISTRY = None # mutable global state @@ -227,6 +228,32 @@ class DockerSaveThread(threading.Thread): _DIRECTDL_DOWNLOADING.append(self.resource) def run(self): + success = False + try: + self._pull_and_load() + success = True + except Exception as ex: + print(ex, file=sys.stderr) + finally: + # cancel callback + if _ENABLE_P2P or not _NON_P2P_CONCURRENT_DOWNLOADING: + _CBHANDLES[self.resource].cancel() + _CBHANDLES.pop(self.resource) + # release queue message + self.queue_client.update_message( + _STORAGE_CONTAINERS['queue_globalresources'], + message_id=self.msg_id, + pop_receipt=_QUEUE_MESSAGES[self.msg_id], + visibility_timeout=0) + _QUEUE_MESSAGES.pop(self.msg_id) + print('queue message released for {}'.format(self.resource)) + # remove from downloading list + if success: + with _DIRECTDL_LOCK: + _DIRECTDL_DOWNLOADING.remove(self.resource) + _DIRECTDL.remove(self.resource) + + def _pull_and_load(self): if _REGISTRY is None: raise RuntimeError( ('{} image specified for global resource, but there are ' @@ -242,13 +269,24 @@ class DockerSaveThread(threading.Thread): subprocess.check_output( 'docker pull {}'.format(image), shell=True) else: - subprocess.check_output( - 'docker pull {}/{}'.format(_REGISTRY, image), - shell=True) + _pub = False + try: + subprocess.check_output( + 'docker pull {}/{}'.format(_REGISTRY, image), + shell=True) + except subprocess.CalledProcessError: + if _ALLOW_PUBLIC_PULL_WITH_PRIVATE: + subprocess.check_output( + 'docker pull {}'.format(image), shell=True) + _pub = True + else: + raise # tag image to remove registry ip - subprocess.check_call( - 'docker tag {}/{} {}'.format(_REGISTRY, image, image), - shell=True) + if not _pub: + subprocess.check_call( + 'docker tag {}/{} {}'.format(_REGISTRY, image, image), + shell=True) + del _pub diff = (datetime.datetime.now() - start).total_seconds() print('took {} sec to pull docker image {} from {}'.format( diff, image, _REGISTRY)) @@ -365,22 +403,6 @@ class DockerSaveThread(threading.Thread): 'gr-done', 'nglobalresources={}'.format(self.nglobalresources)) _GR_DONE = True - # cancel callback - if _ENABLE_P2P or not _NON_P2P_CONCURRENT_DOWNLOADING: - _CBHANDLES[self.resource].cancel() - _CBHANDLES.pop(self.resource) - # release queue message - self.queue_client.update_message( - _STORAGE_CONTAINERS['queue_globalresources'], - message_id=self.msg_id, - pop_receipt=_QUEUE_MESSAGES[self.msg_id], - visibility_timeout=0) - _QUEUE_MESSAGES.pop(self.msg_id) - print('queue message released for {}'.format(self.resource)) - # remove from downloading list - with _DIRECTDL_LOCK: - _DIRECTDL_DOWNLOADING.remove(self.resource) - _DIRECTDL.remove(self.resource) async def _direct_download_resources_async( @@ -851,17 +873,18 @@ def main(): global _ENABLE_P2P, _NON_P2P_CONCURRENT_DOWNLOADING # get command-line args args = parseargs() - _NON_P2P_CONCURRENT_DOWNLOADING = args.nonp2pcd - _ENABLE_P2P = args.torrent + p2popts = args.p2popts.split(':') + _ENABLE_P2P = p2popts[0] == 'true' + _NON_P2P_CONCURRENT_DOWNLOADING = p2popts[1] # set p2p options if _ENABLE_P2P: if not _LIBTORRENT_IMPORTED: raise ImportError('No module named \'libtorrent\'') - global _COMPRESSION, _SEED_BIAS, _SAVELOAD_FILE_EXTENSION - p2popts = args.p2popts.split(':') - _COMPRESSION = p2popts[0] == 'true' - _SEED_BIAS = int(p2popts[1]) - del p2popts + global _COMPRESSION, _SEED_BIAS, _ALLOW_PUBLIC_PULL_WITH_PRIVATE, \ + _SAVELOAD_FILE_EXTENSION + _COMPRESSION = p2popts[3] == 'true' + _SEED_BIAS = int(p2popts[2]) + _ALLOW_PUBLIC_PULL_WITH_PRIVATE = p2popts[4] == 'true' if not _COMPRESSION: _SAVELOAD_FILE_EXTENSION = 'tar' print('peer-to-peer options: compression={} seedbias={}'.format( @@ -872,6 +895,7 @@ def main(): else: print('non-p2p concurrent downloading: {}'.format( _NON_P2P_CONCURRENT_DOWNLOADING)) + del p2popts # get event loop if _ON_WINDOWS: @@ -917,10 +941,11 @@ def parseargs(): """ parser = argparse.ArgumentParser( description='Cascade: Azure Batch P2P File/Image Replicator') - parser.set_defaults(ipaddress=None, nonp2pcd=False, torrent=True) + parser.set_defaults(ipaddress=None) parser.add_argument( - 'p2popts', nargs='?', - help='peer to peer options [compression:seed bias]') + 'p2popts', + help='peer to peer options [enabled:non-p2p concurrent ' + 'downloading:seed bias:compression:public pull passthrough]') parser.add_argument( '--ipaddress', help='ip address') parser.add_argument( diff --git a/graph.py b/graph.py index 9ea1650..1172517 100755 --- a/graph.py +++ b/graph.py @@ -26,16 +26,17 @@ def _create_credentials(config: dict): """ global _STORAGEACCOUNT, _STORAGEACCOUNTKEY, _BATCHACCOUNT, _POOLID, \ _PARTITION_KEY, _TABLE_NAME - _STORAGEACCOUNT = config['credentials']['storage_account'] - _STORAGEACCOUNTKEY = config['credentials']['storage_account_key'] - _BATCHACCOUNT = config['credentials']['batch_account'] - _POOLID = config['poolspec']['id'] + ssel = config['credentials']['shipyard_storage'] + _STORAGEACCOUNT = config['credentials']['storage'][ssel]['account'] + _STORAGEACCOUNTKEY = config['credentials']['storage'][ssel]['account_key'] + _BATCHACCOUNT = config['credentials']['batch']['account'] + _POOLID = config['pool_specification']['id'] _PARTITION_KEY = '{}${}'.format(_BATCHACCOUNT, _POOLID) _TABLE_NAME = config['storage_entity_prefix'] + 'perf' table_client = azuretable.TableService( account_name=_STORAGEACCOUNT, account_key=_STORAGEACCOUNTKEY, - endpoint_suffix=config['credentials']['storage_endpoint']) + endpoint_suffix=config['credentials']['storage'][ssel]['endpoint']) return table_client @@ -228,15 +229,17 @@ def main(): # get command-line args args = parseargs() - if args.settings is None: - raise ValueError('global settings not specified') + if args.credentials is None: + raise ValueError('credentials json not specified') if args.config is None: - raise ValueError('config settings for action not specified') + raise ValueError('config json not specified') - with open(args.settings, 'r') as f: + with open(args.credentials, 'r') as f: config = json.load(f) with open(args.config, 'r') as f: config = merge_dict(config, json.load(f)) + with open(args.pool, 'r') as f: + config = merge_dict(config, json.load(f)) # create storage credentials table_client = _create_credentials(config) @@ -253,11 +256,11 @@ def parseargs(): parser = argparse.ArgumentParser( description='Shipyard perf graph generator') parser.add_argument( - '--settings', - help='global settings json file config. required for all actions') + '--credentials', help='credentials json config') parser.add_argument( - '--config', - help='json file config for option. required for all actions') + '--config', help='general json config for option') + parser.add_argument( + '--pool', help='pool json config') return parser.parse_args() if __name__ == '__main__': diff --git a/scripts/nodeprep.sh b/scripts/nodeprep.sh index 2524950..d1c9305 100755 --- a/scripts/nodeprep.sh +++ b/scripts/nodeprep.sh @@ -4,34 +4,29 @@ set -e set -o pipefail azurefile=0 -nonp2pcd= offer= p2p= prefix= privatereg= sku= -while getopts "h?aco:p:r:s:t:" opt; do +while getopts "h?ao:p:r:s:t:" opt; do case "$opt" in h|\?) echo "nodeprep.sh parameters" echo "" echo "-a install azurefile docker volume driver" - echo "-c concurrent downloading in non-p2p mode" echo "-o [offer] VM offer" echo "-p [prefix] storage container prefix" echo "-r [container:archive:image id] private registry" echo "-s [sku] VM sku" - echo "-t [compression:seed bias] enable p2p sharing" + echo "-t [enabled:non-p2p concurrent download:seed bias:compression:pub pull passthrough] p2p sharing" echo "" exit 1 ;; a) azurefile=1 ;; - c) - nonp2pcd="--nonp2pcd" - ;; o) offer=${OPTARG,,} ;; @@ -76,14 +71,11 @@ PYTHONASYNCIODEBUG=1 # get ip address of eth0 ipaddress=`ip addr list eth0 | grep "inet " | cut -d' ' -f6 | cut -d/ -f1` -# set torrent flag and iptables rules -torrentflag= +# set iptables rules if [ ! -z "$p2p" ]; then # disable DHT connection tracking iptables -t raw -I PREROUTING -p udp --dport 6881 -j CT --notrack iptables -t raw -I OUTPUT -p udp --sport 6881 -j CT --notrack -else - torrentflag="--no-torrent" fi # copy job prep docker block file to shared @@ -196,7 +188,7 @@ fi # start cascade ./perf.py cascade start $prefix -./cascade.py $p2p --ipaddress $ipaddress $prefix $torrentflag $nonp2pcd > cascade.log & +./cascade.py $p2p --ipaddress $ipaddress $prefix > cascade.log & # if not in p2p mode, then wait for cascade exit if [ -z "$p2p" ]; then wait diff --git a/shipyard.py b/shipyard.py index 372f7d4..75a6b6c 100755 --- a/shipyard.py +++ b/shipyard.py @@ -65,23 +65,23 @@ def _populate_global_settings(config: dict, action: str): """ global _STORAGEACCOUNT, _STORAGEACCOUNTKEY, _BATCHACCOUNTKEY, \ _REGISTRY_FILE - _STORAGEACCOUNT = config['credentials']['storage_account'] - _STORAGEACCOUNTKEY = config['credentials']['storage_account_key'] - _BATCHACCOUNTKEY = config['credentials']['batch_account_key'] + ssel = config['credentials']['shipyard_storage'] + _STORAGEACCOUNT = config['credentials']['storage'][ssel]['account'] + _STORAGEACCOUNTKEY = config['credentials']['storage'][ssel]['account_key'] + _BATCHACCOUNTKEY = config['credentials']['batch']['account_key'] try: sep = config['storage_entity_prefix'] except KeyError: sep = None if sep is None: sep = '' + postfix = '-'.join( + (config['credentials']['batch']['account'].lower(), + config['pool_specification']['id'].lower())) _STORAGE_CONTAINERS['blob_resourcefiles'] = '-'.join( - (sep + 'resourcefiles', - config['credentials']['batch_account'].lower(), - config['pool_specification']['id'].lower())) + (sep + 'resourcefiles', postfix)) _STORAGE_CONTAINERS['blob_torrents'] = '-'.join( - (sep + 'torrents', - config['credentials']['batch_account'].lower(), - config['pool_specification']['id'].lower())) + (sep + 'torrents', postfix)) _STORAGE_CONTAINERS['table_dht'] = sep + 'dht' _STORAGE_CONTAINERS['table_registry'] = sep + 'registry' _STORAGE_CONTAINERS['table_torrentinfo'] = sep + 'torrentinfo' @@ -89,43 +89,46 @@ def _populate_global_settings(config: dict, action: str): _STORAGE_CONTAINERS['table_globalresources'] = sep + 'globalresources' _STORAGE_CONTAINERS['table_perf'] = sep + 'perf' _STORAGE_CONTAINERS['queue_globalresources'] = '-'.join( - (sep + 'globalresources', - config['credentials']['batch_account'].lower(), - config['pool_specification']['id'].lower())) + (sep + 'globalresources', postfix)) if action != 'addpool': return try: - if config['docker_registry']['private']['enabled']: + dpre = config['docker_registry']['private']['enabled'] + except KeyError: + dpre = False + if dpre: + try: rf = config['docker_registry']['private'][ 'docker_save_registry_file'] - prf = pathlib.Path(rf) - # attempt to package if registry file doesn't exist - if not prf.exists(): - print('attempting to generate docker private registry tarball') - try: - output = subprocess.check_output( - 'sudo docker images -q registry:2', shell=True) - output = output.decode('utf-8').strip() - except subprocess.CalledProcessError as ex: - pass - else: - if len(output) == 12: - rf = 'resources/docker_registry_v2.tar.gz' - prf = pathlib.Path(rf) - config['docker_registry']['private'][ - 'docker_save_registry_image_id'] = output - subprocess.check_call( - 'sudo docker save registry:2 ' - '| gzip -c > {}'.format(rf), shell=True) - _REGISTRY_FILE = ( - prf.name, - rf, - config['docker_registry']['private'][ - 'docker_save_registry_image_id'] - ) - else: - _REGISTRY_FILE = (None, None, None) - except Exception: + except KeyError: + rf = 'resources/docker-registry-v2.tar.gz' + prf = pathlib.Path(rf) + # attempt to package if registry file doesn't exist + if not prf.exists() or prf.stat().st_size == 0: + print('attempting to generate docker private registry tarball') + try: + prf.parent.mkdir(mode=0o750, parents=True, exist_ok=True) + output = subprocess.check_output( + 'sudo docker images -q registry:2', shell=True) + output = output.decode('utf-8').strip() + except subprocess.CalledProcessError as ex: + pass + else: + if len(output) == 12: + rf = 'resources/docker-registry-v2.tar.gz' + prf = pathlib.Path(rf) + config['docker_registry']['private'][ + 'docker_save_registry_image_id'] = output + subprocess.check_call( + 'sudo docker save registry:2 ' + '| gzip -c > {}'.format(rf), shell=True) + _REGISTRY_FILE = ( + prf.name, + rf, + config['docker_registry']['private'][ + 'docker_save_registry_image_id'] + ) + else: _REGISTRY_FILE = (None, None, None) print('private registry settings: {}'.format(_REGISTRY_FILE)) @@ -147,24 +150,25 @@ def _create_credentials(config: dict) -> tuple: :rtype: tuple :return: (batch client, blob client, queue client, table client) """ + ssel = config['credentials']['shipyard_storage'] credentials = batchauth.SharedKeyCredentials( - config['credentials']['batch_account'], + config['credentials']['batch']['account'], _BATCHACCOUNTKEY) batch_client = batch.BatchServiceClient( credentials, - base_url=config['credentials']['batch_account_service_url']) + base_url=config['credentials']['batch']['account_service_url']) blob_client = azureblob.BlockBlobService( account_name=_STORAGEACCOUNT, account_key=_STORAGEACCOUNTKEY, - endpoint_suffix=config['credentials']['storage_endpoint']) + endpoint_suffix=config['credentials']['storage'][ssel]['endpoint']) queue_client = azurequeue.QueueService( account_name=_STORAGEACCOUNT, account_key=_STORAGEACCOUNTKEY, - endpoint_suffix=config['credentials']['storage_endpoint']) + endpoint_suffix=config['credentials']['storage'][ssel]['endpoint']) table_client = azuretable.TableService( account_name=_STORAGEACCOUNT, account_key=_STORAGEACCOUNTKEY, - endpoint_suffix=config['credentials']['storage_endpoint']) + endpoint_suffix=config['credentials']['storage'][ssel]['endpoint']) return batch_client, blob_client, queue_client, table_client @@ -199,6 +203,7 @@ def upload_resource_files( :rtype: dict :return: sas url dict """ + ssel = config['credentials']['shipyard_storage'] sas_urls = {} for file in files: # skip if no file is specified @@ -228,7 +233,7 @@ def upload_resource_files( _STORAGE_CONTAINERS['blob_resourcefiles'], file[0], file[1]) sas_urls[file[0]] = 'https://{}.blob.{}/{}/{}?{}'.format( _STORAGEACCOUNT, - config['credentials']['storage_endpoint'], + config['credentials']['storage'][ssel]['endpoint'], _STORAGE_CONTAINERS['blob_resourcefiles'], file[0], blob_client.generate_blob_shared_access_signature( _STORAGE_CONTAINERS['blob_resourcefiles'], file[0], @@ -266,8 +271,9 @@ def setup_azurefile_volume_driver( 'global_resources']['docker_volumes']['shared_data_volumes'][svkey] if conf['volume_driver'] == 'azurefile': # check every entry to ensure the same storage account - _sa = conf['storage_account'] - _sakey = conf['storage_account_key'] + ssel = conf['storage_account_settings'] + _sa = config['credentials']['storage'][ssel]['account'] + _sakey = config['credentials']['storage'][ssel]['account_key'] if sa is not None and sa != _sa: raise ValueError( 'multiple storage accounts are not supported for ' @@ -330,6 +336,8 @@ def add_pool( except KeyError: p2pcomp = True else: + p2psbias = 0 + p2pcomp = False try: nonp2pcd = config[ 'data_replication']['non_peer_to_peer_concurrent_downloading'] @@ -339,13 +347,20 @@ def add_pool( try: preg = config['docker_registry']['private']['enabled'] pcont = config['docker_registry']['private']['container'] + pregpubpull = config['docker_registry']['private'][ + 'allow_public_docker_hub_pull_on_missing'] except KeyError: preg = False + pregpubpull = False + # create private registry flags if preg: preg = ' -r {}:{}:{}'.format( pcont, _REGISTRY_FILE[0], _REGISTRY_FILE[2]) else: preg = '' + # create torrent flags + torrentflags = ' -t {}:{}:{}:{}:{}'.format( + p2p, nonp2pcd, p2psbias, p2pcomp, pregpubpull) # docker settings try: dockeruser = config['docker_registry']['login']['username'] @@ -405,6 +420,24 @@ def add_pool( # upload resource files sas_urls = upload_resource_files(blob_client, config, _rflist) del _rflist + # create start task commandline + start_task = [ + '{} -o {} -s {}{}{}{}{}'.format( + _NODEPREP_FILE[0], + offer, + sku, + preg, + torrentflags, + ' -p {}'.format(prefix) if prefix else '', + ' -a' if azurefile_vd else '', + ), + ] + try: + start_task.extend( + config['pool_specification']['additional_node_prep_commands']) + except KeyError: + pass + ssel = config['docker_registry']['private']['storage_account_settings'] # create pool param pool = batchmodels.PoolAddParameter( id=config['pool_specification']['id'], @@ -415,16 +448,7 @@ def add_pool( target_dedicated=config['pool_specification']['vm_count'], enable_inter_node_communication=True, start_task=batchmodels.StartTask( - command_line='{} -o {} -s {}{}{}{}{}{}'.format( - _NODEPREP_FILE[0], - offer, - sku, - preg, - ' -p {}'.format(prefix) if prefix else '', - ' -t {}:{}'.format(p2pcomp, p2psbias) if p2p else '', - ' -c' if nonp2pcd else '', - ' -a' if azurefile_vd else '', - ), + command_line=_wrap_commands_in_shell(start_task, wait=False), run_elevated=True, wait_for_success=True, environment_settings=[ @@ -432,6 +456,9 @@ def add_pool( batchmodels.EnvironmentSetting('CASCADE_SA', _STORAGEACCOUNT), batchmodels.EnvironmentSetting( 'CASCADE_SAKEY', _STORAGEACCOUNTKEY), + batchmodels.EnvironmentSetting( + 'CASCADE_EP', + config['credentials']['storage'][ssel]['endpoint']), ], resource_files=[], ), @@ -446,12 +473,12 @@ def add_pool( pool.start_task.environment_settings.append( batchmodels.EnvironmentSetting( 'PRIVATE_REGISTRY_SA', - config['docker_registry']['private']['storage_account']) + config['credentials']['storage'][ssel]['account']) ) pool.start_task.environment_settings.append( batchmodels.EnvironmentSetting( 'PRIVATE_REGISTRY_SAKEY', - config['docker_registry']['private']['storage_account_key']) + config['credentials']['storage'][ssel]['account_key']) ) if (dockeruser is not None and len(dockeruser) > 0 and dockerpw is not None and len(dockerpw) > 0): @@ -464,6 +491,8 @@ def add_pool( # create pool if not exists try: print('Attempting to create pool:', pool.id) + print(' >> node prep commandline: {}'.format( + pool.start_task.command_line)) batch_client.pool.add(pool) print('Created pool:', pool.id) except batchmodels.BatchErrorException as e: @@ -499,13 +528,15 @@ def add_pool( for node in nodes: print(' > {}: {}'.format(node.id, node.state)) time.sleep(10) - get_remote_login_settings(batch_client, pool.id, nodes) + get_remote_login_settings(batch_client, config, nodes) if any(node.state != batchmodels.ComputeNodeState.idle for node in nodes): raise RuntimeError('node(s) of pool {} not in idle state'.format( pool.id)) -def resize_pool(batch_client, pool_id, vm_count): +def resize_pool(batch_client, config): + pool_id = config['pool_specification']['id'] + vm_count = int(config['pool_specification']['vm_count']) print('Resizing pool {} to {}'.format(pool_id, vm_count)) batch_client.pool.resize( pool_id=pool_id, @@ -516,12 +547,14 @@ def resize_pool(batch_client, pool_id, vm_count): ) -def del_pool(batch_client, pool_id): +def del_pool(batch_client, config): + pool_id = config['pool_specification']['id'] print('Deleting pool: {}'.format(pool_id)) batch_client.pool.delete(pool_id) -def del_node(batch_client, pool_id, node_id): +def del_node(batch_client, config, node_id): + pool_id = config['pool_specification']['id'] print('Deleting node {} from pool {}'.format(node_id, pool_id)) batch_client.pool.remove_nodes( pool_id=pool_id, @@ -531,7 +564,7 @@ def del_node(batch_client, pool_id, node_id): ) -def add_job(batch_client, blob_client, config): +def add_jobs(batch_client, blob_client, config): pool_id = config['pool_specification']['id'] global_resources = [] for gr in config['global_resources']['docker_images']: @@ -731,24 +764,30 @@ def add_job(batch_client, blob_client, config): batch_client.task.add(job_id=job.id, task=task) -def del_job(batch_client, job_id): - print('Deleting job: {}'.format(job_id)) - batch_client.job.delete(job_id) +def del_jobs(batch_client, config): + for job in config['job_specifications']: + job_id = job['id'] + print('Deleting job: {}'.format(job_id)) + batch_client.job.delete(job_id) -def terminate_job(batch_client, job_id): - print('Terminating job: {}'.format(job_id)) - batch_client.job.terminate(job_id) +def terminate_jobs(batch_client, config): + for job in config['job_specifications']: + job_id = job['id'] + print('Terminating job: {}'.format(job_id)) + batch_client.job.terminate(job_id) def del_all_jobs(batch_client): print('Listing jobs...') jobs = batch_client.job.list() for job in jobs: - del_job(batch_client, job.id) + print('Deleting job: {}'.format(job.id)) + batch_client.job.delete(job.id) -def get_remote_login_settings(batch_client, pool_id, nodes=None): +def get_remote_login_settings(batch_client, config, nodes=None): + pool_id = config['pool_specification']['id'] if nodes is None: nodes = batch_client.compute_node.list(pool_id) for node in nodes: @@ -786,7 +825,7 @@ def _clear_table(table_client, table_name, config): print('clearing table: {}'.format(table_name)) ents = table_client.query_entities( table_name, filter='PartitionKey eq \'{}${}\''.format( - config['credentials']['batch_account'], + config['credentials']['batch']['account'], config['pool_specification']['id']) ) # batch delete entities @@ -841,6 +880,7 @@ def _add_global_resource( else: raise NotImplementedError() resource = '{}:{}'.format(prefix, gr) + print('adding global resource: {}'.format(resource)) table_client.insert_or_replace_entity( _STORAGE_CONTAINERS['table_globalresources'], { @@ -863,7 +903,7 @@ def populate_queues(queue_client, table_client, config): except KeyError: preg = False pk = '{}${}'.format( - config['credentials']['batch_account'], + config['credentials']['batch']['account'], config['pool_specification']['id']) # if using docker public hub, then populate registry table with hub if not preg: @@ -923,15 +963,30 @@ def main(): args = parseargs() args.action = args.action.lower() - if args.settings is None: - raise ValueError('global settings not specified') + if args.credentials is None: + raise ValueError('credentials json not specified') if args.config is None: - raise ValueError('config settings for action not specified') + raise ValueError('config json not specified') - with open(args.settings, 'r') as f: + with open(args.credentials, 'r') as f: config = json.load(f) with open(args.config, 'r') as f: config = merge_dict(config, json.load(f)) + try: + with open(args.pool, 'r') as f: + config = merge_dict(config, json.load(f)) + except Exception: + config['pool_specification'] = { + 'id': args.poolid + } + if args.action in ('addjobs', 'deljobs'): + try: + with open(args.jobs, 'r') as f: + config = merge_dict(config, json.load(f)) + except Exception: + config['job_specifications'] = [{ + 'id': args.jobid + }] print('config:') print(json.dumps(config, indent=4)) _populate_global_settings(config, args.action) @@ -947,21 +1002,21 @@ def main(): populate_queues(queue_client, table_client, config) add_pool(batch_client, blob_client, config) elif args.action == 'resizepool': - resize_pool(batch_client, args.poolid, args.numvms) + resize_pool(batch_client, config) elif args.action == 'delpool': - del_pool(batch_client, args.poolid) + del_pool(batch_client, config) elif args.action == 'delnode': - del_node(batch_client, args.poolid, args.nodeid) - elif args.action == 'addjob': - add_job(batch_client, blob_client, config) + del_node(batch_client, config, args.nodeid) + elif args.action == 'addjobs': + add_jobs(batch_client, blob_client, config) elif args.action == 'termjob': - terminate_job(batch_client, blob_client, args.jobid) - elif args.action == 'deljob': - del_job(batch_client, args.jobid) + terminate_jobs(batch_client, config) + elif args.action == 'deljobs': + del_jobs(batch_client, config) elif args.action == 'delalljobs': del_all_jobs(batch_client) elif args.action == 'grl': - get_remote_login_settings(batch_client, args.poolid) + get_remote_login_settings(batch_client, config) elif args.action == 'delstorage': delete_storage_containers( blob_client, queue_client, table_client, config) @@ -983,14 +1038,18 @@ def parseargs(): 'action', help='action: addpool, addjob, termjob, delpool, ' 'delnode, deljob, delalljobs, grl, delstorage, clearstorage') parser.add_argument( - '--settings', - help='global settings json file config. required for all actions') + '--credentials', + help='credentials json config. required for all actions') parser.add_argument( '--config', - help='json file config for option. required for all actions') - parser.add_argument('--poolid', help='pool id') - parser.add_argument('--jobid', help='job id') - parser.add_argument('--nodeid', help='node id') + help='general json config for option. required for all actions') + parser.add_argument( + '--pool', + help='pool json config. required for most actions') + parser.add_argument( + '--jobs', + help='jobs json config. required for job actions') + parser.add_argument('--nodeid', help='node id for delnode action') return parser.parse_args() if __name__ == '__main__':