From e32fc4d93efde382f854f8fe4cdf655a8fcc06bb Mon Sep 17 00:00:00 2001 From: Fred Park Date: Fri, 21 Jul 2017 10:32:04 -0700 Subject: [PATCH] Add Autopool support - Resolves #33 - Add --poolid to storage clear and storage del - jobs del and jobs term now cleanup storage data if autopool is detected --- README.md | 2 + cascade/cascade.py | 30 +- config_templates/jobs.json | 6 +- convoy/batch.py | 81 ++++-- convoy/fleet.py | 273 +++++++++++++++++-- convoy/settings.py | 30 +- convoy/storage.py | 45 ++- docs/14-batch-shipyard-configuration-jobs.md | 31 ++- docs/20-batch-shipyard-usage.md | 11 +- docs/96-troubleshooting-guide.md | 10 + shipyard.py | 23 +- 11 files changed, 454 insertions(+), 88 deletions(-) diff --git a/README.md b/README.md index f58858c..8c450d8 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,8 @@ full pass-through of the [Azure Batch API](https://azure.microsoft.com/en-us/documentation/articles/batch-api-basics/) to containers executed on compute nodes * Support for [Low Priority Compute Nodes](https://docs.microsoft.com/en-us/azure/batch/batch-low-pri-vms) +* Support for Azure Batch "auto" concepts, including [autoscale](https://github.com/Azure/batch-shipyard/blob/master/docs/30-batch-shipyard-autoscale.md) and autopool +to dynamically scale and control computing resources on-demand * Support for [Azure Batch task dependencies](https://azure.microsoft.com/en-us/documentation/articles/batch-task-dependencies/) allowing complex processing pipelines and DAGs with Docker containers diff --git a/cascade/cascade.py b/cascade/cascade.py index f7d9be9..3029a73 100755 --- a/cascade/cascade.py +++ b/cascade/cascade.py @@ -63,12 +63,10 @@ _TORRENT_STATE = [ 'seeding', 'allocating', 'checking fastresume' ] _TORRENT_SESSION = None -_BATCHACCOUNT = os.environ['AZ_BATCH_ACCOUNT_NAME'] -_POOLID = os.environ['AZ_BATCH_POOL_ID'] _NODEID = os.environ['AZ_BATCH_NODE_ID'] _SHARED_DIR = os.environ['AZ_BATCH_NODE_SHARED_DIR'] _TORRENT_DIR = pathlib.Path(_SHARED_DIR, '.torrents') -_PARTITION_KEY = '{}${}'.format(_BATCHACCOUNT, _POOLID) +_PARTITION_KEY = None _LR_LOCK_ASYNC = asyncio.Lock() _PT_LOCK = threading.Lock() _DIRECTDL_LOCK = threading.Lock() @@ -140,22 +138,32 @@ def _setup_logger() -> None: logger.info('logger initialized, log file: {}'.format(logloc)) -def _setup_container_names(sep: str) -> None: - """Set up storage container names +def _setup_storage_names(sep: str) -> None: + """Set up storage names :param str sep: storage container prefix """ + global _PARTITION_KEY, _PREFIX + # transform pool id if necessary + poolid = os.environ['AZ_BATCH_POOL_ID'].lower() + autopool = os.environ.get('SHIPYARD_AUTOPOOL', default=None) + # remove guid portion of pool id if autopool + if autopool is not None: + poolid = poolid[:-37] + # set partition key + batchaccount = os.environ['AZ_BATCH_ACCOUNT_NAME'].lower() + _PARTITION_KEY = '{}${}'.format(batchaccount, poolid) + # set container names if sep is None or len(sep) == 0: raise ValueError('storage_entity_prefix is invalid') _STORAGE_CONTAINERS['blob_globalresources'] = '-'.join( - (sep + 'gr', _BATCHACCOUNT.lower(), _POOLID.lower())) + (sep + 'gr', batchaccount, poolid)) _STORAGE_CONTAINERS['blob_torrents'] = '-'.join( - (sep + 'tor', _BATCHACCOUNT.lower(), _POOLID.lower())) + (sep + 'tor', batchaccount, poolid)) _STORAGE_CONTAINERS['table_dht'] = sep + 'dht' _STORAGE_CONTAINERS['table_registry'] = sep + 'registry' _STORAGE_CONTAINERS['table_torrentinfo'] = sep + 'torrentinfo' _STORAGE_CONTAINERS['table_images'] = sep + 'images' 
_STORAGE_CONTAINERS['table_globalresources'] = sep + 'gr' - global _PREFIX _PREFIX = sep @@ -1193,7 +1201,7 @@ async def _get_ipaddress_async(loop: asyncio.BaseEventLoop) -> str: def main(): """Main function""" global _ENABLE_P2P, _CONCURRENT_DOWNLOADS_ALLOWED, \ - _ALLOW_PUBLIC_PULL_WITH_PRIVATE + _ALLOW_PUBLIC_PULL_WITH_PRIVATE, _POOL_ID # get command-line args args = parseargs() p2popts = args.p2popts.split(':') @@ -1235,8 +1243,8 @@ def main(): ipaddress = args.ipaddress logger.debug('ip address: {}'.format(ipaddress)) - # set up container names - _setup_container_names(args.prefix) + # set up storage names + _setup_storage_names(args.prefix) # create storage credentials blob_client, table_client = _create_credentials() diff --git a/config_templates/jobs.json b/config_templates/jobs.json index 6d7ae8a..6fc986d 100644 --- a/config_templates/jobs.json +++ b/config_templates/jobs.json @@ -11,7 +11,6 @@ "max_wall_time": "02:00:00", "retention_time": "1.12:00:00", "priority": 0, - "allow_run_on_missing_image": false, "user_identity": { "default_pool_admin": true, "specific_user": { @@ -19,6 +18,11 @@ "gid": 1001 } }, + "auto_pool": { + "pool_lifetime": "job", + "keep_alive": false + }, + "allow_run_on_missing_image": false, "remove_container_after_exit": true, "shm_size": "256m", "infiniband": false, diff --git a/convoy/batch.py b/convoy/batch.py index f095a89..4557468 100644 --- a/convoy/batch.py +++ b/convoy/batch.py @@ -2302,6 +2302,32 @@ def generate_docker_login_settings(config, for_ssh=False): return env, cmd +def check_jobs_for_auto_pool(config): + # type: (dict) -> bool + """Check jobs for auto pool + :param batch_client: The batch client to use. + :type batch_client: `azure.batch.batch_service_client.BatchServiceClient` + :param dict config: configuration dict + :rtype: bool + :return: if auto pool is enabled + """ + # ensure all jobspecs uniformly have autopool or all off + autopool = [] + for jobspec in settings.job_specifications(config): + if settings.job_auto_pool(jobspec) is None: + autopool.append(False) + else: + autopool.append(True) + if autopool.count(False) == len(autopool): + logger.debug('autopool not detected for jobs') + return False + elif autopool.count(True) == len(autopool): + logger.debug('autopool detected for jobs') + return True + else: + raise ValueError('all jobs must have auto_pool enabled or disabled') + + def _format_generic_task_id(tasknum): # type: (int) -> str """Format a generic task id from a task number @@ -2425,17 +2451,18 @@ def _add_task_collection(batch_client, job_id, task_map): def add_jobs( - batch_client, blob_client, keyvault_client, config, jpfile, bxfile, - recreate=False, tail=None): + batch_client, blob_client, keyvault_client, config, autopool, jpfile, + bxfile, recreate=False, tail=None): # type: (batch.BatchServiceClient, azureblob.BlockBlobService, - # azure.keyvault.KeyVaultClient, dict, tuple, tuple, bool, - # str) -> None + # azure.keyvault.KeyVaultClient, dict, + # batchmodels.PoolSpecification, tuple, tuple, bool, str) -> None """Add jobs :param batch_client: The batch client to use. 
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient` :param azure.storage.blob.BlockBlobService blob_client: blob client :param azure.keyvault.KeyVaultClient keyvault_client: keyvault client :param dict config: configuration dict + :param batchmodels.PoolSpecification autopool: auto pool specification :param tuple jpfile: jobprep file :param tuple bxfile: blobxfer file :param bool recreate: recreate job if completed @@ -2447,16 +2474,17 @@ def add_jobs( try: cloud_pool = batch_client.pool.get(pool.id) except batchmodels.batch_error.BatchErrorException as ex: - if 'The specified pool does not exist.' in ex.message.value: - logger.error('{} pool does not exist'.format(pool.id)) - if util.confirm_action( - config, 'add jobs to nonexistant pool {}'.format(pool.id)): - cloud_pool = None - else: - logger.error( - 'not submitting jobs to nonexistant pool {}'.format( - pool.id)) - return + if 'The specified pool does not exist' in ex.message.value: + cloud_pool = None + if autopool is None: + logger.error('{} pool does not exist'.format(pool.id)) + if not util.confirm_action( + config, + 'add jobs to nonexistant pool {}'.format(pool.id)): + logger.error( + 'not submitting jobs to nonexistant pool {}'.format( + pool.id)) + return else: raise preg = settings.docker_registry_private_settings(config) @@ -2572,10 +2600,32 @@ def add_jobs( user_identity=_RUN_ELEVATED, rerun_on_node_reboot_after_success=False, ) + # construct pool info + if autopool is None: + pool_info = batchmodels.PoolInformation(pool_id=pool.id) + else: + autopool_settings = settings.job_auto_pool(jobspec) + if autopool_settings is None: + raise ValueError( + 'auto_pool settings is invalid for job {}'.format( + settings.job_id(jobspec))) + if autopool_settings.pool_lifetime == 'job_schedule': + autopool_plo = batchmodels.PoolLifetimeOption.job_schedule + else: + autopool_plo = batchmodels.PoolLifetimeOption( + autopool_settings.pool_lifetime) + pool_info = batchmodels.PoolInformation( + auto_pool_specification=batchmodels.AutoPoolSpecification( + auto_pool_id_prefix=pool.id, + pool_lifetime_option=autopool_plo, + keep_alive=autopool_settings.keep_alive, + pool=autopool, + ) + ) # create job job = batchmodels.JobAddParameter( id=settings.job_id(jobspec), - pool_info=batchmodels.PoolInformation(pool_id=pool.id), + pool_info=pool_info, constraints=job_constraints, uses_task_dependencies=uses_task_dependencies, job_preparation_task=jptask, @@ -2820,7 +2870,6 @@ def add_jobs( batch_client.job.patch( job_id=job.id, job_patch_parameter=batchmodels.JobPatchParameter( - pool_info=batchmodels.PoolInformation(pool_id=pool.id), on_all_tasks_complete=batchmodels. 
OnAllTasksComplete.terminate_job)) # tail file if specified diff --git a/convoy/fleet.py b/convoy/fleet.py index 82d72f9..e1b0f1b 100644 --- a/convoy/fleet.py +++ b/convoy/fleet.py @@ -247,11 +247,12 @@ def check_for_invalid_config(config): 'property.') -def populate_global_settings(config, fs_storage): +def populate_global_settings(config, fs_storage, pool_id=None): # type: (dict, bool) -> None """Populate global settings from config :param dict config: configuration dict :param bool fs_storage: adjust for fs context + :param str pool_id: pool id override """ bs = settings.batch_shipyard_settings(config) sc = settings.credentials_storage(config, bs.storage_account_settings) @@ -259,10 +260,13 @@ def populate_global_settings(config, fs_storage): # set postfix to empty for now, it will be populated with the # storage cluster during the actual calls postfix = '' + if util.is_not_empty(pool_id): + raise ValueError('pool id specified for fs_storage') else: bc = settings.credentials_batch(config) - postfix = '-'.join( - (bc.account.lower(), settings.pool_id(config, lower=True))) + if util.is_none_or_empty(pool_id): + pool_id = settings.pool_id(config, lower=True) + postfix = '-'.join((bc.account.lower(), pool_id)) storage.set_storage_configuration( bs.storage_entity_prefix, postfix, @@ -671,7 +675,7 @@ def _create_storage_cluster_mount_args( return (fstab_mount, sc_arg) -def _add_pool( +def _construct_pool_object( resource_client, compute_client, network_client, batch_mgmt_client, batch_client, blob_client, config): # type: (azure.mgmt.resource.resources.ResourceManagementClient, @@ -680,7 +684,8 @@ def _add_pool( # azure.mgmt.batch.BatchManagementClient, # azure.batch.batch_service_client.BatchServiceClient, # azureblob.BlockBlobService, dict) -> None - """Add a Batch pool to account + """Construct a pool add parameter object for create pool along with + uploading resource files :param azure.mgmt.resource.resources.ResourceManagementClient resource_client: resource client :param azure.mgmt.compute.ComputeManagementClient compute_client: @@ -820,12 +825,6 @@ def _add_pool( block_for_gr = ','.join([x for x in images]) else: logger.warning('no docker images specified in global resources') - # ingress data to Azure Blob Storage if specified - storage_threads = [] - if pool_settings.transfer_files_on_pool_creation: - storage_threads = data.ingress_data( - batch_client, compute_client, network_client, config, rls=None, - kind='storage') # shipyard settings bs = settings.batch_shipyard_settings(config) # data replication and peer-to-peer settings @@ -1089,6 +1088,92 @@ def _add_pool( ) pool.start_task.environment_settings.extend( batch.generate_docker_login_settings(config)[0]) + return (pool_settings, gluster_on_compute, pool) + + +def _construct_auto_pool_specification( + resource_client, compute_client, network_client, batch_mgmt_client, + batch_client, blob_client, config): + # type: (azure.mgmt.resource.resources.ResourceManagementClient, + # azure.mgmt.compute.ComputeManagementClient, + # azure.mgmt.network.NetworkManagementClient, + # azure.mgmt.batch.BatchManagementClient, + # azure.batch.batch_service_client.BatchServiceClient, + # azureblob.BlockBlobService, dict) -> None + """Construct an auto pool specification + :param azure.mgmt.resource.resources.ResourceManagementClient + resource_client: resource client + :param azure.mgmt.compute.ComputeManagementClient compute_client: + compute client + :param azure.mgmt.network.NetworkManagementClient network_client: + network client + :param 
azure.mgmt.batch.BatchManagementClient: batch_mgmt_client + :param azure.batch.batch_service_client.BatchServiceClient batch_client: + batch client + :param azure.storage.blob.BlockBlobService blob_client: blob client + :param dict config: configuration dict + """ + # upload resource files and construct pool add parameter object + pool_settings, gluster_on_compute, pool = _construct_pool_object( + resource_client, compute_client, network_client, batch_mgmt_client, + batch_client, blob_client, config) + # convert pool add parameter object to a pool specification object + poolspec = batchmodels.PoolSpecification( + vm_size=pool.vm_size, + virtual_machine_configuration=pool.virtual_machine_configuration, + max_tasks_per_node=pool.max_tasks_per_node, + task_scheduling_policy=pool.task_scheduling_policy, + resize_timeout=pool.resize_timeout, + target_dedicated_nodes=pool.target_dedicated_nodes, + target_low_priority_nodes=pool.target_low_priority_nodes, + enable_auto_scale=pool.enable_auto_scale, + auto_scale_formula=pool.auto_scale_formula, + auto_scale_evaluation_interval=pool.auto_scale_evaluation_interval, + enable_inter_node_communication=pool.enable_inter_node_communication, + network_configuration=pool.network_configuration, + start_task=pool.start_task, + certificate_references=pool.certificate_references, + metadata=pool.metadata, + ) + # add auto pool env var for cascade + poolspec.start_task.environment_settings.append( + batchmodels.EnvironmentSetting('SHIPYARD_AUTOPOOL', 1) + ) + return poolspec + + +def _add_pool( + resource_client, compute_client, network_client, batch_mgmt_client, + batch_client, blob_client, config): + # type: (azure.mgmt.resource.resources.ResourceManagementClient, + # azure.mgmt.compute.ComputeManagementClient, + # azure.mgmt.network.NetworkManagementClient, + # azure.mgmt.batch.BatchManagementClient, + # azure.batch.batch_service_client.BatchServiceClient, + # azureblob.BlockBlobService, dict) -> None + """Add a Batch pool to account + :param azure.mgmt.resource.resources.ResourceManagementClient + resource_client: resource client + :param azure.mgmt.compute.ComputeManagementClient compute_client: + compute client + :param azure.mgmt.network.NetworkManagementClient network_client: + network client + :param azure.mgmt.batch.BatchManagementClient: batch_mgmt_client + :param azure.batch.batch_service_client.BatchServiceClient batch_client: + batch client + :param azure.storage.blob.BlockBlobService blob_client: blob client + :param dict config: configuration dict + """ + # upload resource files and construct pool add parameter object + pool_settings, gluster_on_compute, pool = _construct_pool_object( + resource_client, compute_client, network_client, batch_mgmt_client, + batch_client, blob_client, config) + # ingress data to Azure Blob Storage if specified + storage_threads = [] + if pool_settings.transfer_files_on_pool_creation: + storage_threads = data.ingress_data( + batch_client, compute_client, network_client, config, rls=None, + kind='storage') # create pool nodes = batch.create_pool(batch_client, config, pool) _pool = batch_client.pool.get(pool.id) @@ -1721,6 +1806,31 @@ def _adjust_settings_for_pool_creation(config): logger.warning('cannot add SSH user with zero target nodes') +def _check_settings_for_auto_pool(config): + # type: (dict) -> None + """Check settings for autopool + :param dict config: configuration dict + """ + # check glusterfs on compute + try: + sdv = settings.global_resources_shared_data_volumes(config) + for sdvkey in sdv: + if 
settings.is_shared_data_volume_gluster_on_compute(sdv, sdvkey): + raise ValueError( + 'GlusterFS on compute is not possible with autopool') + break + except KeyError: + pass + # get settings + pool = settings.pool_settings(config) + # check local data movement to pool + if pool.transfer_files_on_pool_creation: + raise ValueError('Cannot ingress data on pool creation with autopool') + # check ssh + if util.is_not_empty(pool.ssh.username): + logger.warning('cannot add SSH user with autopool') + + def action_fs_disks_add(resource_client, compute_client, config): # type: (azure.mgmt.resource.resources.ResourceManagementClient, # azure.mgmt.compute.ComputeManagementClient, dict) -> None @@ -2429,21 +2539,69 @@ def action_pool_autoscale_lastexec(batch_client, config): def action_jobs_add( - batch_client, blob_client, keyvault_client, config, recreate, tail): - # type: (batchsc.BatchServiceClient, azureblob.BlockBlobService, + resource_client, compute_client, network_client, batch_mgmt_client, + batch_client, blob_client, table_client, keyvault_client, config, + recreate, tail): + # type: (azure.mgmt.resource.resources.ResourceManagementClient, + # azure.mgmt.compute.ComputeManagementClient, + # azure.mgmt.network.NetworkManagementClient, + # azure.mgmt.batch.BatchManagementClient, + # azure.batch.batch_service_client.BatchServiceClient, + # azureblob.BlockBlobService, azuretable.TableService, # azure.keyvault.KeyVaultClient, dict, bool, str) -> None """Action: Jobs Add + :param azure.mgmt.resource.resources.ResourceManagementClient + resource_client: resource client + :param azure.mgmt.compute.ComputeManagementClient compute_client: + compute client + :param azure.mgmt.network.NetworkManagementClient network_client: + network client + :param azure.mgmt.batch.BatchManagementClient: batch_mgmt_client :param azure.batch.batch_service_client.BatchServiceClient batch_client: batch client :param azure.storage.blob.BlockBlobService blob_client: blob client + :param azure.storage.table.TableService table_client: table client :param azure.keyvault.KeyVaultClient keyvault_client: keyvault client :param dict config: configuration dict :param bool recreate: recreate jobs if completed :param str tail: file to tail or last job and task added """ + # check for job autopools + autopool = batch.check_jobs_for_auto_pool(config) + if autopool: + # check to ensure pool id is within 20 chars + pool_id = settings.pool_id(config) + if len(pool_id) > 20: + raise ValueError( + 'pool id must be less than 21 characters: {}'.format(pool_id)) + # check if a pool id with existing pool id exists + try: + batch_client.pool.get(pool_id) + except batchmodels.BatchErrorException as ex: + if 'The specified pool does not exist' in ex.message.value: + pass + else: + raise RuntimeError( + 'pool with id of {} already exists'.format(pool_id)) + # create storage containers and clear + storage.create_storage_containers(blob_client, table_client, config) + storage.clear_storage_containers(blob_client, table_client, config) + _adjust_settings_for_pool_creation(config) + storage.populate_global_resource_blobs( + blob_client, table_client, config) + # create autopool specification object + autopool = _construct_auto_pool_specification( + resource_client, compute_client, network_client, batch_mgmt_client, + batch_client, blob_client, config + ) + # check settings and warn + _check_settings_for_auto_pool(config) + else: + autopool = None + # add jobs batch.add_jobs( - batch_client, blob_client, keyvault_client, config, _JOBPREP_FILE, - 
_BLOBXFER_FILE, recreate, tail) + batch_client, blob_client, keyvault_client, config, autopool, + _JOBPREP_FILE, _BLOBXFER_FILE, recreate, tail) def action_jobs_list(batch_client, config): @@ -2513,11 +2671,18 @@ def action_jobs_deltasks(batch_client, config, jobid, taskid, wait): batch_client, config, jobid=jobid, taskid=taskid, wait=wait) -def action_jobs_term(batch_client, config, all, jobid, termtasks, wait): - # type: (batchsc.BatchServiceClient, dict, bool, str, bool, bool) -> None +def action_jobs_term( + batch_client, blob_client, queue_client, table_client, config, all, + jobid, termtasks, wait): + # type: (batchsc.BatchServiceClient, azureblob.BlockBlobService, + # azurequeue.QueueService, azuretable.TableService, dict, bool, + # str, bool, bool) -> None """Action: Jobs Term :param azure.batch.batch_service_client.BatchServiceClient batch_client: batch client + :param azure.storage.blob.BlockBlobService blob_client: blob client + :param azure.storage.queue.QueueService queue_client: queue client + :param azure.storage.table.TableService table_client: table client :param dict config: configuration dict :param bool all: all jobs :param str jobid: job id @@ -2530,15 +2695,42 @@ def action_jobs_term(batch_client, config, all, jobid, termtasks, wait): batch.terminate_all_jobs( batch_client, config, termtasks=termtasks, wait=wait) else: + # check for autopool + if util.is_none_or_empty(jobid): + autopool = batch.check_jobs_for_auto_pool(config) + if autopool: + # check if a pool id with existing pool id exists + try: + batch_client.pool.get(settings.pool_id(config)) + except batchmodels.BatchErrorException as ex: + if 'The specified pool does not exist' in ex.message.value: + pass + else: + autopool = False + else: + autopool = False + # terminate the jobs batch.terminate_jobs( batch_client, config, jobid=jobid, termtasks=termtasks, wait=wait) + # if autopool, delete the storage + if autopool: + # TODO remove queue_client in 3.0 + storage.cleanup_with_del_pool( + blob_client, queue_client, table_client, config) -def action_jobs_del(batch_client, config, all, jobid, termtasks, wait): - # type: (batchsc.BatchServiceClient, dict, bool, str, bool, bool) -> None +def action_jobs_del( + batch_client, blob_client, queue_client, table_client, config, all, + jobid, termtasks, wait): + # type: (batchsc.BatchServiceClient, azureblob.BlockBlobService, + # azurequeue.QueueService, azuretable.TableService, dict, bool, + # str, bool, bool) -> None """Action: Jobs Del :param azure.batch.batch_service_client.BatchServiceClient batch_client: batch client + :param azure.storage.blob.BlockBlobService blob_client: blob client + :param azure.storage.queue.QueueService queue_client: queue client + :param azure.storage.table.TableService table_client: table client :param dict config: configuration dict :param bool all: all jobs :param str jobid: job id @@ -2551,8 +2743,28 @@ def action_jobs_del(batch_client, config, all, jobid, termtasks, wait): batch.del_all_jobs( batch_client, config, termtasks=termtasks, wait=wait) else: + # check for autopool + if util.is_none_or_empty(jobid): + autopool = batch.check_jobs_for_auto_pool(config) + if autopool: + # check if a pool id with existing pool id exists + try: + batch_client.pool.get(settings.pool_id(config)) + except batchmodels.BatchErrorException as ex: + if 'The specified pool does not exist' in ex.message.value: + pass + else: + autopool = False + else: + autopool = False + # delete the jobs batch.del_jobs( batch_client, config, jobid=jobid, 
termtasks=termtasks, wait=wait) + # if autopool, delete the storage + if autopool: + # TODO remove queue_client in 3.0 + storage.cleanup_with_del_pool( + blob_client, queue_client, table_client, config) def action_jobs_cmi(batch_client, config, delete): @@ -2645,32 +2857,43 @@ def action_jobs_enable(batch_client, config, jobid): def action_storage_del( - blob_client, queue_client, table_client, config, clear_tables): + blob_client, queue_client, table_client, config, clear_tables, poolid): # type: (azureblob.BlockBlobService, azurequeue.QueueService, - # azuretable.TableService, dict, bool) -> None + # azuretable.TableService, dict, bool, str) -> None """Action: Storage Del :param azure.storage.blob.BlockBlobService blob_client: blob client :param azure.storage.queue.QueueService queue_client: queue client :param azure.storage.table.TableService table_client: table client :param dict config: configuration dict :param bool clear_tables: clear tables instead of deleting + :param str poolid: pool id to target """ + # reset storage settings to target poolid + if util.is_not_empty(poolid): + populate_global_settings(config, False, pool_id=poolid) if clear_tables: storage.clear_storage_containers( - blob_client, queue_client, table_client, config, tables_only=True) + blob_client, table_client, config, tables_only=True, + pool_id=poolid) storage.delete_storage_containers( blob_client, queue_client, table_client, config, skip_tables=clear_tables) -def action_storage_clear(blob_client, table_client, config): - # type: (azureblob.BlockBlobService, azuretable.TableService, dict) -> None +def action_storage_clear(blob_client, table_client, config, poolid): + # type: (azureblob.BlockBlobService, azuretable.TableService, dict, + # str) -> None """Action: Storage Clear :param azure.storage.blob.BlockBlobService blob_client: blob client :param azure.storage.table.TableService table_client: table client :param dict config: configuration dict + :param str poolid: pool id to target """ - storage.clear_storage_containers(blob_client, table_client, config) + # reset storage settings to target poolid + if util.is_not_empty(poolid): + populate_global_settings(config, False, pool_id=poolid) + storage.clear_storage_containers( + blob_client, table_client, config, pool_id=poolid) def action_data_stream(batch_client, config, filespec, disk): diff --git a/convoy/settings.py b/convoy/settings.py index cf2ae05..b53cb35 100644 --- a/convoy/settings.py +++ b/convoy/settings.py @@ -112,6 +112,12 @@ PoolAutoscaleSettings = collections.namedtuple( 'scenario', ] ) +PoolAutopoolSettings = collections.namedtuple( + 'PoolAutopoolSettings', [ + 'pool_lifetime', + 'keep_alive', + ] +) PoolSettings = collections.namedtuple( 'PoolSettings', [ 'id', 'vm_size', 'vm_count', 'resize_timeout', 'max_tasks_per_node', @@ -2074,6 +2080,24 @@ def job_auto_complete(conf): return ac +def job_auto_pool(conf): + # type: (dict) -> PoolAutopoolSettings + """Get job autopool setting + :param dict conf: job configuration object + :rtype: PoolAutopoolSettings + :return: job autopool settings + """ + ap = _kv_read_checked(conf, 'auto_pool') + if ap is not None: + return PoolAutopoolSettings( + pool_lifetime=_kv_read_checked( + ap, 'pool_lifetime', 'job').lower(), + keep_alive=_kv_read(ap, 'keep_alive', False), + ) + else: + return None + + def job_priority(conf): # type: (dict) -> int """Get job priority setting @@ -2303,9 +2327,9 @@ def task_settings(cloud_pool, config, poolconf, jobspec, conf, missing_images): sku = None node_agent = 
poolconf.vm_configuration.node_agent else: - publisher = poolconf.publisher.lower() - offer = poolconf.offer.lower() - sku = poolconf.sku.lower() + publisher = poolconf.vm_configuration.publisher.lower() + offer = poolconf.vm_configuration.offer.lower() + sku = poolconf.vm_configuration.sku.lower() else: pool_id = cloud_pool.id vm_size = cloud_pool.vm_size.lower() diff --git a/convoy/storage.py b/convoy/storage.py index 3fa46b1..9138b98 100644 --- a/convoy/storage.py +++ b/convoy/storage.py @@ -215,15 +215,18 @@ def create_file_share_saskey( ) -def _construct_partition_key_from_config(config): - # type: (dict) -> str +def _construct_partition_key_from_config(config, pool_id=None): + # type: (dict, str) -> str """Construct partition key from config :param dict config: configuration dict + :param str pool_id: use specified pool id instead :rtype: str :return: partition key """ + if util.is_none_or_empty(pool_id): + pool_id = settings.pool_id(config, lower=True) return '{}${}'.format( - settings.credentials_batch(config).account, settings.pool_id(config)) + settings.credentials_batch(config).account, pool_id) def _add_global_resource( @@ -408,9 +411,13 @@ def _clear_blobs(blob_client, container): :param str container: container to clear blobs from """ logger.info('deleting blobs: {}'.format(container)) - blobs = blob_client.list_blobs(container) - for blob in blobs: - blob_client.delete_blob(container, blob.name) + try: + blobs = blob_client.list_blobs(container) + except azure.common.AzureMissingResourceHttpError: + logger.warning('container not found: {}'.format(container)) + else: + for blob in blobs: + blob_client.delete_blob(container, blob.name) def _clear_blob_task_resourcefiles(blob_client, container, config): @@ -423,19 +430,24 @@ def _clear_blob_task_resourcefiles(blob_client, container, config): bs = settings.batch_shipyard_settings(config) envfileloc = '{}taskrf-'.format(bs.storage_entity_prefix) logger.info('deleting blobs with prefix: {}'.format(envfileloc)) - blobs = blob_client.list_blobs(container, prefix=envfileloc) - for blob in blobs: - blob_client.delete_blob(container, blob.name) + try: + blobs = blob_client.list_blobs(container, prefix=envfileloc) + except azure.common.AzureMissingResourceHttpError: + logger.warning('container not found: {}'.format(container)) + else: + for blob in blobs: + blob_client.delete_blob(container, blob.name) -def _clear_table(table_client, table_name, config): +def _clear_table(table_client, table_name, config, pool_id=None): + # type: (azuretable.TableService, str, dict, str) -> None """Clear table entities :param azure.storage.table.TableService table_client: table client :param str table_name: table name :param dict config: configuration dict + :param str pool_id: use specified pool id instead """ - # type: (azuretable.TableService, str, dict) -> None - pk = _construct_partition_key_from_config(config) + pk = _construct_partition_key_from_config(config, pool_id=pool_id) logger.debug('clearing table (pk={}): {}'.format(pk, table_name)) ents = table_client.query_entities( table_name, filter='PartitionKey eq \'{}\''.format(pk)) @@ -454,14 +466,15 @@ def _clear_table(table_client, table_name, config): def clear_storage_containers( - blob_client, table_client, config, tables_only=False): + blob_client, table_client, config, tables_only=False, pool_id=None): # type: (azureblob.BlockBlobService, azuretable.TableService, dict, - # bool) -> None + # bool, str) -> None """Clear storage containers :param azure.storage.blob.BlockBlobService 
blob_client: blob client :param azure.storage.table.TableService table_client: table client :param dict config: configuration dict :param bool tables_only: clear only tables + :param str pool_id: use specified pool id instead """ bs = settings.batch_shipyard_settings(config) for key in _STORAGE_CONTAINERS: @@ -470,7 +483,9 @@ def clear_storage_containers( _clear_blobs(blob_client, _STORAGE_CONTAINERS[key]) elif key.startswith('table_'): try: - _clear_table(table_client, _STORAGE_CONTAINERS[key], config) + _clear_table( + table_client, _STORAGE_CONTAINERS[key], config, + pool_id=pool_id) except azure.common.AzureMissingResourceHttpError: if key != 'table_perf' or bs.store_timing_metrics: raise diff --git a/docs/14-batch-shipyard-configuration-jobs.md b/docs/14-batch-shipyard-configuration-jobs.md index 22896e4..65ed29d 100644 --- a/docs/14-batch-shipyard-configuration-jobs.md +++ b/docs/14-batch-shipyard-configuration-jobs.md @@ -19,7 +19,6 @@ The jobs schema is as follows: "max_wall_time": "02:00:00", "retention_time": "1.12:00:00", "priority": 0, - "allow_run_on_missing_image": false, "user_identity": { "default_pool_admin": true, "specific_user": { @@ -27,6 +26,11 @@ The jobs schema is as follows: "gid": 1000 } }, + "auto_pool": { + "pool_lifetime": "job", + "keep_alive": false + }, + "allow_run_on_missing_image": false, "remove_container_after_exit": true, "shm_size": "256m", "infiniband": false, @@ -188,12 +192,6 @@ Tasks within jobs with higher priority are run ahead of those with lower priority, however, tasks that are already running with lower priority are not preempted. Valid values are within the range of [-1000, 1000] and the default is `0`. -* (optional) `allow_run_on_missing_image` allows tasks with a Docker image -reference that was not pre-loaded on to the compute node via -`global_resources`:`docker_images` in the global configuration to be able to -run. Note that you should attempt to specify all Docker images that you intend -to run in the `global_resources`:`docker_images` property in the global -configuration to minimize scheduling to task execution latency. * (optional) `user_identity` property is to define which user to run the container as. By default, if this property is not defined, the container will be run as the root user. However, it may be required to run the container @@ -208,6 +206,25 @@ mutually exclusive of one another. user. * (required) `uid` is the user id of the user * (required) `gid` is the group id of the user +* (optional) `auto_pool` will create a compute pool on demand for +the job as specified in the pool configuration. Note that storage resources +required by Batch Shipyard may not be automatically cleaned up when using +autopools. Utilizing `jobs term` or `jobs del` without any jobid scoping +will attempt to clean up storage resources. Otherwise, you will need to use +`storage del` or `storage clear` to clean up storage resources manually. + * (optional) `pool_lifetime` specifies the lifetime of the pool. Valid + values are `job` and `job_schedule`. You may not specify `job_schedule` + for non-recurring jobs. The default is `job`. + * (optional) `keep_alive` specifies if the pool should be kept even after + its lifetime expires. The default is `false`. Note that setting this + value to `false` and setting `auto_complete` to `true` will automatically + delete the compute pool once all tasks under the job complete. 
+* (optional) `allow_run_on_missing_image` allows tasks with a Docker image +reference that was not pre-loaded on to the compute node via +`global_resources`:`docker_images` in the global configuration to be able to +run. Note that you should attempt to specify all Docker images that you intend +to run in the `global_resources`:`docker_images` property in the global +configuration to minimize scheduling to task execution latency. * (optional) `remove_container_after_exit` property specifies if all containers under the job should be automatically removed/cleaned up after the task exits. This defaults to `false`. diff --git a/docs/20-batch-shipyard-usage.md b/docs/20-batch-shipyard-usage.md index ccdf58f..020a452 100644 --- a/docs/20-batch-shipyard-usage.md +++ b/docs/20-batch-shipyard-usage.md @@ -357,7 +357,9 @@ to the Batch pool sub-command is typically not required if `multi_instance_auto_complete` is set to `true` in the job specification for the job. * `--delete` will delete any stale cleanup jobs -* `del` will delete jobs specified in the jobs configuration file +* `del` will delete jobs specified in the jobs configuration file. If an +autopool is specified for all jobs and a jobid option is not specified, +the storage associated with the autopool will be cleaned up. * `--all` will delete all jobs found in the Batch account * `--jobid` force deletion scope to just this job id * `--termtasks` will manually terminate tasks prior to deletion. Termination @@ -386,7 +388,9 @@ file * `--requeue` requeue running tasks * `--terminate` terminate running tasks * `--wait` wait for running tasks to complete -* `term` will terminate jobs found in the jobs configuration file +* `term` will terminate jobs found in the jobs configuration file. If an +autopool is specified for all jobs and a jobid option is not specified, +the storage associated with the autopool will be cleaned up. * `--all` will terminate all jobs found in the Batch account * `--jobid` force termination scope to just this job id * `--termtasks` will manually terminate tasks prior to termination. @@ -523,8 +527,11 @@ The `storage` command has the following sub-commands: ``` * `clear` will clear the Azure Storage containers used by Batch Shipyard for metadata purposes + * `--poolid` will target a specific pool id rather than from configuration * `del` will delete the Azure Storage containers used by Batch Shipyard for metadata purposes + * `--clear-tables` will clear tables instead of deleting them + * `--poolid` will target a specific pool id ## Example Invocations ```shell diff --git a/docs/96-troubleshooting-guide.md b/docs/96-troubleshooting-guide.md index b3d9d57..7a91b79 100644 --- a/docs/96-troubleshooting-guide.md +++ b/docs/96-troubleshooting-guide.md @@ -85,6 +85,16 @@ Additionally, if you have specified an SSH user for your pool and there is a start task failure, you can still issue the command `pool asu` to add the pool SSH user and then `pool ssh` to SSH into the node to debug further. +Please note that the start task requires downloading some files that are +uploaded to your Azure Storage account with the command `pool add`. These +files have SAS tokens which allow the Batch compute node to authenticate +with the Azure Storage service to download the files. These SAS tokens are +bound to the storage account key for which they were generated with. 
If you +change/regenerate your storage account key that these SAS tokens were +originally generated with, then the compute nodes will fail to start as +these files will no longer be able to be downloaded. You will need to +recreate your pool in these situations. + #### Compute Node does not start or is unusable If the compute node is "stuck" in starting state or enters unusable state, this indicates that there was an issue allocating the node from the Azure diff --git a/shipyard.py b/shipyard.py index 67dd013..bedfceb 100755 --- a/shipyard.py +++ b/shipyard.py @@ -907,28 +907,32 @@ def storage(ctx): @storage.command('del') @click.option( '--clear-tables', is_flag=True, help='Clear tables instead of deleting') +@click.option( + '--poolid', help='Delete storage containers for the specified pool') @common_options @batch_options @keyvault_options @pass_cli_context -def storage_del(ctx, clear_tables): +def storage_del(ctx, clear_tables, poolid): """Delete Azure Storage containers used by Batch Shipyard""" ctx.initialize_for_storage() convoy.fleet.action_storage_del( ctx.blob_client, ctx.queue_client, ctx.table_client, ctx.config, - clear_tables) + clear_tables, poolid) @storage.command('clear') +@click.option( + '--poolid', help='Clear storage containers for the specified pool') @common_options @batch_options @keyvault_options @pass_cli_context -def storage_clear(ctx): +def storage_clear(ctx, poolid): """Clear Azure Storage containers used by Batch Shipyard""" ctx.initialize_for_storage() convoy.fleet.action_storage_clear( - ctx.blob_client, ctx.table_client, ctx.config) + ctx.blob_client, ctx.table_client, ctx.config, poolid) @cli.group() @@ -1327,8 +1331,9 @@ def jobs_add(ctx, recreate, tail): """Add jobs""" ctx.initialize_for_batch() convoy.fleet.action_jobs_add( - ctx.batch_client, ctx.blob_client, ctx.keyvault_client, ctx.config, - recreate, tail) + ctx.resource_client, ctx.compute_client, ctx.network_client, + ctx.batch_mgmt_client, ctx.batch_client, ctx.blob_client, + ctx.table_client, ctx.keyvault_client, ctx.config, recreate, tail) @jobs.command('list') @@ -1402,7 +1407,8 @@ def jobs_term(ctx, all, jobid, termtasks, wait): """Terminate jobs""" ctx.initialize_for_batch() convoy.fleet.action_jobs_term( - ctx.batch_client, ctx.config, all, jobid, termtasks, wait) + ctx.batch_client, ctx.blob_client, ctx.queue_client, ctx.table_client, + ctx.config, all, jobid, termtasks, wait) @jobs.command('del') @@ -1423,7 +1429,8 @@ def jobs_del(ctx, all, jobid, termtasks, wait): """Delete jobs""" ctx.initialize_for_batch() convoy.fleet.action_jobs_del( - ctx.batch_client, ctx.config, all, jobid, termtasks, wait) + ctx.batch_client, ctx.blob_client, ctx.queue_client, ctx.table_client, + ctx.config, all, jobid, termtasks, wait) @jobs.command('deltasks')