- Resolves #33
- Add --poolid to storage clear and storage del
- jobs del and jobs term now clean up storage data if autopool is
  detected
This commit is contained in:
Fred Park 2017-07-21 10:32:04 -07:00
Parent 30ea8c280f
Commit e32fc4d93e
11 changed files with 454 additions and 88 deletions

View file

@ -51,6 +51,8 @@ full pass-through of the
[Azure Batch API](https://azure.microsoft.com/en-us/documentation/articles/batch-api-basics/)
to containers executed on compute nodes
* Support for [Low Priority Compute Nodes](https://docs.microsoft.com/en-us/azure/batch/batch-low-pri-vms)
* Support for Azure Batch "auto" concepts, including [autoscale](https://github.com/Azure/batch-shipyard/blob/master/docs/30-batch-shipyard-autoscale.md) and autopool
to dynamically scale and control computing resources on-demand
* Support for
[Azure Batch task dependencies](https://azure.microsoft.com/en-us/documentation/articles/batch-task-dependencies/)
allowing complex processing pipelines and DAGs with Docker containers

View file

@ -63,12 +63,10 @@ _TORRENT_STATE = [
'seeding', 'allocating', 'checking fastresume'
]
_TORRENT_SESSION = None
_BATCHACCOUNT = os.environ['AZ_BATCH_ACCOUNT_NAME']
_POOLID = os.environ['AZ_BATCH_POOL_ID']
_NODEID = os.environ['AZ_BATCH_NODE_ID']
_SHARED_DIR = os.environ['AZ_BATCH_NODE_SHARED_DIR']
_TORRENT_DIR = pathlib.Path(_SHARED_DIR, '.torrents')
_PARTITION_KEY = '{}${}'.format(_BATCHACCOUNT, _POOLID)
_PARTITION_KEY = None
_LR_LOCK_ASYNC = asyncio.Lock()
_PT_LOCK = threading.Lock()
_DIRECTDL_LOCK = threading.Lock()
@ -140,22 +138,32 @@ def _setup_logger() -> None:
logger.info('logger initialized, log file: {}'.format(logloc))
def _setup_container_names(sep: str) -> None:
"""Set up storage container names
def _setup_storage_names(sep: str) -> None:
"""Set up storage names
:param str sep: storage container prefix
"""
global _PARTITION_KEY, _PREFIX
# transform pool id if necessary
poolid = os.environ['AZ_BATCH_POOL_ID'].lower()
autopool = os.environ.get('SHIPYARD_AUTOPOOL', default=None)
# remove guid portion of pool id if autopool
if autopool is not None:
poolid = poolid[:-37]
# set partition key
batchaccount = os.environ['AZ_BATCH_ACCOUNT_NAME'].lower()
_PARTITION_KEY = '{}${}'.format(batchaccount, poolid)
# set container names
if sep is None or len(sep) == 0:
raise ValueError('storage_entity_prefix is invalid')
_STORAGE_CONTAINERS['blob_globalresources'] = '-'.join(
(sep + 'gr', _BATCHACCOUNT.lower(), _POOLID.lower()))
(sep + 'gr', batchaccount, poolid))
_STORAGE_CONTAINERS['blob_torrents'] = '-'.join(
(sep + 'tor', _BATCHACCOUNT.lower(), _POOLID.lower()))
(sep + 'tor', batchaccount, poolid))
_STORAGE_CONTAINERS['table_dht'] = sep + 'dht'
_STORAGE_CONTAINERS['table_registry'] = sep + 'registry'
_STORAGE_CONTAINERS['table_torrentinfo'] = sep + 'torrentinfo'
_STORAGE_CONTAINERS['table_images'] = sep + 'images'
_STORAGE_CONTAINERS['table_globalresources'] = sep + 'gr'
global _PREFIX
_PREFIX = sep
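For illustration, a minimal self-contained sketch of the derivation above with hypothetical values; it assumes the Batch service names an auto pool as the configured prefix plus a separator and a 36-character GUID (hence the `[:-37]` slice), that `SHIPYARD_AUTOPOOL` is present in the node environment only for autopools, and the default `shipyard` storage entity prefix:

```python
# hypothetical environment as seen on an autopool compute node
env = {
    'AZ_BATCH_ACCOUNT_NAME': 'MyBatchAccount',
    'AZ_BATCH_POOL_ID': 'mypool_01234567-89ab-cdef-0123-456789abcdef',
    'SHIPYARD_AUTOPOOL': '1',
}

poolid = env['AZ_BATCH_POOL_ID'].lower()
if env.get('SHIPYARD_AUTOPOOL') is not None:
    # drop the assumed separator + 36-character GUID suffix
    poolid = poolid[:-37]
batchaccount = env['AZ_BATCH_ACCOUNT_NAME'].lower()

partition_key = '{}${}'.format(batchaccount, poolid)
blob_globalresources = '-'.join(('shipyard' + 'gr', batchaccount, poolid))

print(partition_key)           # mybatchaccount$mypool
print(blob_globalresources)    # shipyardgr-mybatchaccount-mypool
```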
@ -1193,7 +1201,7 @@ async def _get_ipaddress_async(loop: asyncio.BaseEventLoop) -> str:
def main():
"""Main function"""
global _ENABLE_P2P, _CONCURRENT_DOWNLOADS_ALLOWED, \
_ALLOW_PUBLIC_PULL_WITH_PRIVATE
_ALLOW_PUBLIC_PULL_WITH_PRIVATE, _POOL_ID
# get command-line args
args = parseargs()
p2popts = args.p2popts.split(':')
@ -1235,8 +1243,8 @@ def main():
ipaddress = args.ipaddress
logger.debug('ip address: {}'.format(ipaddress))
# set up container names
_setup_container_names(args.prefix)
# set up storage names
_setup_storage_names(args.prefix)
# create storage credentials
blob_client, table_client = _create_credentials()

View file

@ -11,7 +11,6 @@
"max_wall_time": "02:00:00",
"retention_time": "1.12:00:00",
"priority": 0,
"allow_run_on_missing_image": false,
"user_identity": {
"default_pool_admin": true,
"specific_user": {
@ -19,6 +18,11 @@
"gid": 1001
}
},
"auto_pool": {
"pool_lifetime": "job",
"keep_alive": false
},
"allow_run_on_missing_image": false,
"remove_container_after_exit": true,
"shm_size": "256m",
"infiniband": false,

View file

@ -2302,6 +2302,32 @@ def generate_docker_login_settings(config, for_ssh=False):
return env, cmd
def check_jobs_for_auto_pool(config):
# type: (dict) -> bool
"""Check jobs for auto pool
:param batch_client: The batch client to use.
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
:rtype: bool
:return: if auto pool is enabled
"""
# ensure all jobspecs uniformly have autopool or all off
autopool = []
for jobspec in settings.job_specifications(config):
if settings.job_auto_pool(jobspec) is None:
autopool.append(False)
else:
autopool.append(True)
if autopool.count(False) == len(autopool):
logger.debug('autopool not detected for jobs')
return False
elif autopool.count(True) == len(autopool):
logger.debug('autopool detected for jobs')
return True
else:
raise ValueError('all jobs must have auto_pool enabled or disabled')
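A hedged, standalone approximation of the uniformity rule enforced above, with hypothetical job specifications standing in for `settings.job_specifications(config)`:

```python
def check_jobs_for_auto_pool_sketch(jobspecs):
    # True only if every job specifies auto_pool, False only if none do;
    # a mix is treated as a configuration error
    autopool = ['auto_pool' in js for js in jobspecs]
    if not any(autopool):
        return False
    elif all(autopool):
        return True
    raise ValueError('all jobs must have auto_pool enabled or disabled')

print(check_jobs_for_auto_pool_sketch(
    [{'id': 'job1', 'auto_pool': {'pool_lifetime': 'job'}},
     {'id': 'job2', 'auto_pool': {'pool_lifetime': 'job'}}]))  # True
print(check_jobs_for_auto_pool_sketch(
    [{'id': 'job1'}, {'id': 'job2'}]))                         # False
```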
def _format_generic_task_id(tasknum):
# type: (int) -> str
"""Format a generic task id from a task number
@ -2425,17 +2451,18 @@ def _add_task_collection(batch_client, job_id, task_map):
def add_jobs(
batch_client, blob_client, keyvault_client, config, jpfile, bxfile,
recreate=False, tail=None):
batch_client, blob_client, keyvault_client, config, autopool, jpfile,
bxfile, recreate=False, tail=None):
# type: (batch.BatchServiceClient, azureblob.BlockBlobService,
# azure.keyvault.KeyVaultClient, dict, tuple, tuple, bool,
# str) -> None
# azure.keyvault.KeyVaultClient, dict,
# batchmodels.PoolSpecification, tuple, tuple, bool, str) -> None
"""Add jobs
:param batch_client: The batch client to use.
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param azure.keyvault.KeyVaultClient keyvault_client: keyvault client
:param dict config: configuration dict
:param batchmodels.PoolSpecification autopool: auto pool specification
:param tuple jpfile: jobprep file
:param tuple bxfile: blobxfer file
:param bool recreate: recreate job if completed
@ -2447,16 +2474,17 @@ def add_jobs(
try:
cloud_pool = batch_client.pool.get(pool.id)
except batchmodels.batch_error.BatchErrorException as ex:
if 'The specified pool does not exist.' in ex.message.value:
logger.error('{} pool does not exist'.format(pool.id))
if util.confirm_action(
config, 'add jobs to nonexistant pool {}'.format(pool.id)):
cloud_pool = None
else:
logger.error(
'not submitting jobs to nonexistant pool {}'.format(
pool.id))
return
if 'The specified pool does not exist' in ex.message.value:
cloud_pool = None
if autopool is None:
logger.error('{} pool does not exist'.format(pool.id))
if not util.confirm_action(
config,
'add jobs to nonexistent pool {}'.format(pool.id)):
logger.error(
'not submitting jobs to nonexistent pool {}'.format(
pool.id))
return
else:
raise
preg = settings.docker_registry_private_settings(config)
@ -2572,10 +2600,32 @@ def add_jobs(
user_identity=_RUN_ELEVATED,
rerun_on_node_reboot_after_success=False,
)
# construct pool info
if autopool is None:
pool_info = batchmodels.PoolInformation(pool_id=pool.id)
else:
autopool_settings = settings.job_auto_pool(jobspec)
if autopool_settings is None:
raise ValueError(
'auto_pool settings is invalid for job {}'.format(
settings.job_id(jobspec)))
if autopool_settings.pool_lifetime == 'job_schedule':
autopool_plo = batchmodels.PoolLifetimeOption.job_schedule
else:
autopool_plo = batchmodels.PoolLifetimeOption(
autopool_settings.pool_lifetime)
pool_info = batchmodels.PoolInformation(
auto_pool_specification=batchmodels.AutoPoolSpecification(
auto_pool_id_prefix=pool.id,
pool_lifetime_option=autopool_plo,
keep_alive=autopool_settings.keep_alive,
pool=autopool,
)
)
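A side note on the `job_schedule` special case above: the jobs configuration uses `job_schedule` (with an underscore), while the corresponding Batch REST enum value string is assumed to be `jobschedule`, so a lookup by value would fail and the mapping is done by attribute name instead. A standalone illustration with a stand-in enum:

```python
from enum import Enum

# stand-in for azure.batch.models.PoolLifetimeOption; the value strings
# 'jobschedule' and 'job' are an assumption for illustration only
class PoolLifetimeOptionSketch(Enum):
    job_schedule = 'jobschedule'
    job = 'job'

def to_lifetime_option(pool_lifetime):
    if pool_lifetime == 'job_schedule':
        # config string does not equal the enum value string, map by name
        return PoolLifetimeOptionSketch.job_schedule
    return PoolLifetimeOptionSketch(pool_lifetime)

print(to_lifetime_option('job'))           # PoolLifetimeOptionSketch.job
print(to_lifetime_option('job_schedule'))  # PoolLifetimeOptionSketch.job_schedule
```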
# create job
job = batchmodels.JobAddParameter(
id=settings.job_id(jobspec),
pool_info=batchmodels.PoolInformation(pool_id=pool.id),
pool_info=pool_info,
constraints=job_constraints,
uses_task_dependencies=uses_task_dependencies,
job_preparation_task=jptask,
@ -2820,7 +2870,6 @@ def add_jobs(
batch_client.job.patch(
job_id=job.id,
job_patch_parameter=batchmodels.JobPatchParameter(
pool_info=batchmodels.PoolInformation(pool_id=pool.id),
on_all_tasks_complete=batchmodels.
OnAllTasksComplete.terminate_job))
# tail file if specified

View file

@ -247,11 +247,12 @@ def check_for_invalid_config(config):
'property.')
def populate_global_settings(config, fs_storage):
def populate_global_settings(config, fs_storage, pool_id=None):
# type: (dict, bool, str) -> None
"""Populate global settings from config
:param dict config: configuration dict
:param bool fs_storage: adjust for fs context
:param str pool_id: pool id override
"""
bs = settings.batch_shipyard_settings(config)
sc = settings.credentials_storage(config, bs.storage_account_settings)
@ -259,10 +260,13 @@ def populate_global_settings(config, fs_storage):
# set postfix to empty for now, it will be populated with the
# storage cluster during the actual calls
postfix = ''
if util.is_not_empty(pool_id):
raise ValueError('pool id specified for fs_storage')
else:
bc = settings.credentials_batch(config)
postfix = '-'.join(
(bc.account.lower(), settings.pool_id(config, lower=True)))
if util.is_none_or_empty(pool_id):
pool_id = settings.pool_id(config, lower=True)
postfix = '-'.join((bc.account.lower(), pool_id))
storage.set_storage_configuration(
bs.storage_entity_prefix,
postfix,
@ -671,7 +675,7 @@ def _create_storage_cluster_mount_args(
return (fstab_mount, sc_arg)
def _add_pool(
def _construct_pool_object(
resource_client, compute_client, network_client, batch_mgmt_client,
batch_client, blob_client, config):
# type: (azure.mgmt.resource.resources.ResourceManagementClient,
@ -680,7 +684,8 @@ def _add_pool(
# azure.mgmt.batch.BatchManagementClient,
# azure.batch.batch_service_client.BatchServiceClient,
# azureblob.BlockBlobService, dict) -> tuple
"""Add a Batch pool to account
"""Construct a pool add parameter object for create pool along with
uploading resource files
:param azure.mgmt.resource.resources.ResourceManagementClient
resource_client: resource client
:param azure.mgmt.compute.ComputeManagementClient compute_client:
@ -820,12 +825,6 @@ def _add_pool(
block_for_gr = ','.join([x for x in images])
else:
logger.warning('no docker images specified in global resources')
# ingress data to Azure Blob Storage if specified
storage_threads = []
if pool_settings.transfer_files_on_pool_creation:
storage_threads = data.ingress_data(
batch_client, compute_client, network_client, config, rls=None,
kind='storage')
# shipyard settings
bs = settings.batch_shipyard_settings(config)
# data replication and peer-to-peer settings
@ -1089,6 +1088,92 @@ def _add_pool(
)
pool.start_task.environment_settings.extend(
batch.generate_docker_login_settings(config)[0])
return (pool_settings, gluster_on_compute, pool)
def _construct_auto_pool_specification(
resource_client, compute_client, network_client, batch_mgmt_client,
batch_client, blob_client, config):
# type: (azure.mgmt.resource.resources.ResourceManagementClient,
# azure.mgmt.compute.ComputeManagementClient,
# azure.mgmt.network.NetworkManagementClient,
# azure.mgmt.batch.BatchManagementClient,
# azure.batch.batch_service_client.BatchServiceClient,
# azureblob.BlockBlobService, dict) -> batchmodels.PoolSpecification
"""Construct an auto pool specification
:param azure.mgmt.resource.resources.ResourceManagementClient
resource_client: resource client
:param azure.mgmt.compute.ComputeManagementClient compute_client:
compute client
:param azure.mgmt.network.NetworkManagementClient network_client:
network client
:param azure.mgmt.batch.BatchManagementClient batch_mgmt_client:
batch management client
:param azure.batch.batch_service_client.BatchServiceClient batch_client:
batch client
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param dict config: configuration dict
:rtype: batchmodels.PoolSpecification
:return: auto pool specification
"""
# upload resource files and construct pool add parameter object
pool_settings, gluster_on_compute, pool = _construct_pool_object(
resource_client, compute_client, network_client, batch_mgmt_client,
batch_client, blob_client, config)
# convert pool add parameter object to a pool specification object
poolspec = batchmodels.PoolSpecification(
vm_size=pool.vm_size,
virtual_machine_configuration=pool.virtual_machine_configuration,
max_tasks_per_node=pool.max_tasks_per_node,
task_scheduling_policy=pool.task_scheduling_policy,
resize_timeout=pool.resize_timeout,
target_dedicated_nodes=pool.target_dedicated_nodes,
target_low_priority_nodes=pool.target_low_priority_nodes,
enable_auto_scale=pool.enable_auto_scale,
auto_scale_formula=pool.auto_scale_formula,
auto_scale_evaluation_interval=pool.auto_scale_evaluation_interval,
enable_inter_node_communication=pool.enable_inter_node_communication,
network_configuration=pool.network_configuration,
start_task=pool.start_task,
certificate_references=pool.certificate_references,
metadata=pool.metadata,
)
# add auto pool env var for cascade
poolspec.start_task.environment_settings.append(
batchmodels.EnvironmentSetting('SHIPYARD_AUTOPOOL', 1)
)
return poolspec
def _add_pool(
resource_client, compute_client, network_client, batch_mgmt_client,
batch_client, blob_client, config):
# type: (azure.mgmt.resource.resources.ResourceManagementClient,
# azure.mgmt.compute.ComputeManagementClient,
# azure.mgmt.network.NetworkManagementClient,
# azure.mgmt.batch.BatchManagementClient,
# azure.batch.batch_service_client.BatchServiceClient,
# azureblob.BlockBlobService, dict) -> None
"""Add a Batch pool to account
:param azure.mgmt.resource.resources.ResourceManagementClient
resource_client: resource client
:param azure.mgmt.compute.ComputeManagementClient compute_client:
compute client
:param azure.mgmt.network.NetworkManagementClient network_client:
network client
:param azure.mgmt.batch.BatchManagementClient batch_mgmt_client:
batch management client
:param azure.batch.batch_service_client.BatchServiceClient batch_client:
batch client
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param dict config: configuration dict
"""
# upload resource files and construct pool add parameter object
pool_settings, gluster_on_compute, pool = _construct_pool_object(
resource_client, compute_client, network_client, batch_mgmt_client,
batch_client, blob_client, config)
# ingress data to Azure Blob Storage if specified
storage_threads = []
if pool_settings.transfer_files_on_pool_creation:
storage_threads = data.ingress_data(
batch_client, compute_client, network_client, config, rls=None,
kind='storage')
# create pool
nodes = batch.create_pool(batch_client, config, pool)
_pool = batch_client.pool.get(pool.id)
@ -1721,6 +1806,31 @@ def _adjust_settings_for_pool_creation(config):
logger.warning('cannot add SSH user with zero target nodes')
def _check_settings_for_auto_pool(config):
# type: (dict) -> None
"""Check settings for autopool
:param dict config: configuration dict
"""
# check glusterfs on compute
try:
sdv = settings.global_resources_shared_data_volumes(config)
for sdvkey in sdv:
if settings.is_shared_data_volume_gluster_on_compute(sdv, sdvkey):
raise ValueError(
'GlusterFS on compute is not possible with autopool')
break
except KeyError:
pass
# get settings
pool = settings.pool_settings(config)
# check local data movement to pool
if pool.transfer_files_on_pool_creation:
raise ValueError('Cannot ingress data on pool creation with autopool')
# check ssh
if util.is_not_empty(pool.ssh.username):
logger.warning('cannot add SSH user with autopool')
def action_fs_disks_add(resource_client, compute_client, config):
# type: (azure.mgmt.resource.resources.ResourceManagementClient,
# azure.mgmt.compute.ComputeManagementClient, dict) -> None
@ -2429,21 +2539,69 @@ def action_pool_autoscale_lastexec(batch_client, config):
def action_jobs_add(
batch_client, blob_client, keyvault_client, config, recreate, tail):
# type: (batchsc.BatchServiceClient, azureblob.BlockBlobService,
resource_client, compute_client, network_client, batch_mgmt_client,
batch_client, blob_client, table_client, keyvault_client, config,
recreate, tail):
# type: (azure.mgmt.resource.resources.ResourceManagementClient,
# azure.mgmt.compute.ComputeManagementClient,
# azure.mgmt.network.NetworkManagementClient,
# azure.mgmt.batch.BatchManagementClient,
# azure.batch.batch_service_client.BatchServiceClient,
# azureblob.BlockBlobService, azuretable.TableService,
# azure.keyvault.KeyVaultClient, dict, bool, str) -> None
"""Action: Jobs Add
:param azure.mgmt.resource.resources.ResourceManagementClient
resource_client: resource client
:param azure.mgmt.compute.ComputeManagementClient compute_client:
compute client
:param azure.mgmt.network.NetworkManagementClient network_client:
network client
:param azure.mgmt.batch.BatchManagementClient batch_mgmt_client:
batch management client
:param azure.batch.batch_service_client.BatchServiceClient batch_client:
batch client
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param azure.storage.table.TableService table_client: table client
:param azure.keyvault.KeyVaultClient keyvault_client: keyvault client
:param dict config: configuration dict
:param bool recreate: recreate jobs if completed
:param str tail: file to tail or last job and task added
"""
# check for job autopools
autopool = batch.check_jobs_for_auto_pool(config)
if autopool:
# check to ensure pool id is within 20 chars
pool_id = settings.pool_id(config)
if len(pool_id) > 20:
raise ValueError(
'pool id must be less than 21 characters: {}'.format(pool_id))
# check if a pool with the same id already exists
try:
batch_client.pool.get(pool_id)
except batchmodels.BatchErrorException as ex:
if 'The specified pool does not exist' in ex.message.value:
pass
else:
raise RuntimeError(
'pool with id of {} already exists'.format(pool_id))
# create and clear storage containers
storage.create_storage_containers(blob_client, table_client, config)
storage.clear_storage_containers(blob_client, table_client, config)
_adjust_settings_for_pool_creation(config)
storage.populate_global_resource_blobs(
blob_client, table_client, config)
# create autopool specification object
autopool = _construct_auto_pool_specification(
resource_client, compute_client, network_client, batch_mgmt_client,
batch_client, blob_client, config
)
# check settings and warn
_check_settings_for_auto_pool(config)
else:
autopool = None
# add jobs
batch.add_jobs(
batch_client, blob_client, keyvault_client, config, _JOBPREP_FILE,
_BLOBXFER_FILE, recreate, tail)
batch_client, blob_client, keyvault_client, config, autopool,
_JOBPREP_FILE, _BLOBXFER_FILE, recreate, tail)
def action_jobs_list(batch_client, config):
@ -2513,11 +2671,18 @@ def action_jobs_deltasks(batch_client, config, jobid, taskid, wait):
batch_client, config, jobid=jobid, taskid=taskid, wait=wait)
def action_jobs_term(batch_client, config, all, jobid, termtasks, wait):
# type: (batchsc.BatchServiceClient, dict, bool, str, bool, bool) -> None
def action_jobs_term(
batch_client, blob_client, queue_client, table_client, config, all,
jobid, termtasks, wait):
# type: (batchsc.BatchServiceClient, azureblob.BlockBlobService,
# azurequeue.QueueService, azuretable.TableService, dict, bool,
# str, bool, bool) -> None
"""Action: Jobs Term
:param azure.batch.batch_service_client.BatchServiceClient batch_client:
batch client
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param azure.storage.queue.QueueService queue_client: queue client
:param azure.storage.table.TableService table_client: table client
:param dict config: configuration dict
:param bool all: all jobs
:param str jobid: job id
@ -2530,15 +2695,42 @@ def action_jobs_term(batch_client, config, all, jobid, termtasks, wait):
batch.terminate_all_jobs(
batch_client, config, termtasks=termtasks, wait=wait)
else:
# check for autopool
if util.is_none_or_empty(jobid):
autopool = batch.check_jobs_for_auto_pool(config)
if autopool:
# check if a pool with the configured id still exists
try:
batch_client.pool.get(settings.pool_id(config))
except batchmodels.BatchErrorException as ex:
if 'The specified pool does not exist' in ex.message.value:
pass
else:
autopool = False
else:
autopool = False
# terminate the jobs
batch.terminate_jobs(
batch_client, config, jobid=jobid, termtasks=termtasks, wait=wait)
# if autopool, delete the storage
if autopool:
# TODO remove queue_client in 3.0
storage.cleanup_with_del_pool(
blob_client, queue_client, table_client, config)
def action_jobs_del(batch_client, config, all, jobid, termtasks, wait):
# type: (batchsc.BatchServiceClient, dict, bool, str, bool, bool) -> None
def action_jobs_del(
batch_client, blob_client, queue_client, table_client, config, all,
jobid, termtasks, wait):
# type: (batchsc.BatchServiceClient, azureblob.BlockBlobService,
# azurequeue.QueueService, azuretable.TableService, dict, bool,
# str, bool, bool) -> None
"""Action: Jobs Del
:param azure.batch.batch_service_client.BatchServiceClient batch_client:
batch client
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param azure.storage.queue.QueueService queue_client: queue client
:param azure.storage.table.TableService table_client: table client
:param dict config: configuration dict
:param bool all: all jobs
:param str jobid: job id
@ -2551,8 +2743,28 @@ def action_jobs_del(batch_client, config, all, jobid, termtasks, wait):
batch.del_all_jobs(
batch_client, config, termtasks=termtasks, wait=wait)
else:
# check for autopool
if util.is_none_or_empty(jobid):
autopool = batch.check_jobs_for_auto_pool(config)
if autopool:
# check if a pool with the configured id still exists
try:
batch_client.pool.get(settings.pool_id(config))
except batchmodels.BatchErrorException as ex:
if 'The specified pool does not exist' in ex.message.value:
pass
else:
autopool = False
else:
autopool = False
# delete the jobs
batch.del_jobs(
batch_client, config, jobid=jobid, termtasks=termtasks, wait=wait)
# if autopool, delete the storage
if autopool:
# TODO remove queue_client in 3.0
storage.cleanup_with_del_pool(
blob_client, queue_client, table_client, config)
def action_jobs_cmi(batch_client, config, delete):
@ -2645,32 +2857,43 @@ def action_jobs_enable(batch_client, config, jobid):
def action_storage_del(
blob_client, queue_client, table_client, config, clear_tables):
blob_client, queue_client, table_client, config, clear_tables, poolid):
# type: (azureblob.BlockBlobService, azurequeue.QueueService,
# azuretable.TableService, dict, bool) -> None
# azuretable.TableService, dict, bool, str) -> None
"""Action: Storage Del
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param azure.storage.queue.QueueService queue_client: queue client
:param azure.storage.table.TableService table_client: table client
:param dict config: configuration dict
:param bool clear_tables: clear tables instead of deleting
:param str poolid: pool id to target
"""
# reset storage settings to target poolid
if util.is_not_empty(poolid):
populate_global_settings(config, False, pool_id=poolid)
if clear_tables:
storage.clear_storage_containers(
blob_client, queue_client, table_client, config, tables_only=True)
blob_client, table_client, config, tables_only=True,
pool_id=poolid)
storage.delete_storage_containers(
blob_client, queue_client, table_client, config,
skip_tables=clear_tables)
def action_storage_clear(blob_client, table_client, config):
# type: (azureblob.BlockBlobService, azuretable.TableService, dict) -> None
def action_storage_clear(blob_client, table_client, config, poolid):
# type: (azureblob.BlockBlobService, azuretable.TableService, dict,
# str) -> None
"""Action: Storage Clear
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param azure.storage.table.TableService table_client: table client
:param dict config: configuration dict
:param str poolid: pool id to target
"""
storage.clear_storage_containers(blob_client, table_client, config)
# reset storage settings to target poolid
if util.is_not_empty(poolid):
populate_global_settings(config, False, pool_id=poolid)
storage.clear_storage_containers(
blob_client, table_client, config, pool_id=poolid)
def action_data_stream(batch_client, config, filespec, disk):

View file

@ -112,6 +112,12 @@ PoolAutoscaleSettings = collections.namedtuple(
'scenario',
]
)
PoolAutopoolSettings = collections.namedtuple(
'PoolAutopoolSettings', [
'pool_lifetime',
'keep_alive',
]
)
PoolSettings = collections.namedtuple(
'PoolSettings', [
'id', 'vm_size', 'vm_count', 'resize_timeout', 'max_tasks_per_node',
@ -2074,6 +2080,24 @@ def job_auto_complete(conf):
return ac
def job_auto_pool(conf):
# type: (dict) -> PoolAutopoolSettings
"""Get job autopool setting
:param dict conf: job configuration object
:rtype: PoolAutopoolSettings
:return: job autopool settings
"""
ap = _kv_read_checked(conf, 'auto_pool')
if ap is not None:
return PoolAutopoolSettings(
pool_lifetime=_kv_read_checked(
ap, 'pool_lifetime', 'job').lower(),
keep_alive=_kv_read(ap, 'keep_alive', False),
)
else:
return None
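A self-contained approximation of the reader above (inlining the `_kv_read*` helpers), showing the defaults a hypothetical job specification resolves to:

```python
import collections

PoolAutopoolSettings = collections.namedtuple(
    'PoolAutopoolSettings', ['pool_lifetime', 'keep_alive'])

def job_auto_pool_sketch(conf):
    # read the optional auto_pool block with its documented defaults
    ap = conf.get('auto_pool')
    if ap is None:
        return None
    return PoolAutopoolSettings(
        pool_lifetime=ap.get('pool_lifetime', 'job').lower(),
        keep_alive=ap.get('keep_alive', False),
    )

print(job_auto_pool_sketch({'id': 'job1', 'auto_pool': {}}))
# PoolAutopoolSettings(pool_lifetime='job', keep_alive=False)
print(job_auto_pool_sketch({'id': 'job2'}))  # None
```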
def job_priority(conf):
# type: (dict) -> int
"""Get job priority setting
@ -2303,9 +2327,9 @@ def task_settings(cloud_pool, config, poolconf, jobspec, conf, missing_images):
sku = None
node_agent = poolconf.vm_configuration.node_agent
else:
publisher = poolconf.publisher.lower()
offer = poolconf.offer.lower()
sku = poolconf.sku.lower()
publisher = poolconf.vm_configuration.publisher.lower()
offer = poolconf.vm_configuration.offer.lower()
sku = poolconf.vm_configuration.sku.lower()
else:
pool_id = cloud_pool.id
vm_size = cloud_pool.vm_size.lower()

View file

@ -215,15 +215,18 @@ def create_file_share_saskey(
)
def _construct_partition_key_from_config(config):
# type: (dict) -> str
def _construct_partition_key_from_config(config, pool_id=None):
# type: (dict, str) -> str
"""Construct partition key from config
:param dict config: configuration dict
:param str pool_id: use specified pool id instead
:rtype: str
:return: partition key
"""
if util.is_none_or_empty(pool_id):
pool_id = settings.pool_id(config, lower=True)
return '{}${}'.format(
settings.credentials_batch(config).account, settings.pool_id(config))
settings.credentials_batch(config).account, pool_id)
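As a quick illustration of how the `--poolid` override flows into this helper, a sketch with hypothetical account and pool id values:

```python
def partition_key_sketch(batch_account, configured_pool_id, pool_id=None):
    # mirror _construct_partition_key_from_config: fall back to the
    # configured pool id (lowercased) when no override is supplied
    if pool_id is None or len(pool_id) == 0:
        pool_id = configured_pool_id.lower()
    return '{}${}'.format(batch_account, pool_id)

print(partition_key_sketch('mybatchaccount', 'MyPool'))
# mybatchaccount$mypool
print(partition_key_sketch('mybatchaccount', 'MyPool', pool_id='oldpool'))
# mybatchaccount$oldpool
```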
def _add_global_resource(
@ -408,9 +411,13 @@ def _clear_blobs(blob_client, container):
:param str container: container to clear blobs from
"""
logger.info('deleting blobs: {}'.format(container))
blobs = blob_client.list_blobs(container)
for blob in blobs:
blob_client.delete_blob(container, blob.name)
try:
blobs = blob_client.list_blobs(container)
except azure.common.AzureMissingResourceHttpError:
logger.warning('container not found: {}'.format(container))
else:
for blob in blobs:
blob_client.delete_blob(container, blob.name)
def _clear_blob_task_resourcefiles(blob_client, container, config):
@ -423,19 +430,24 @@ def _clear_blob_task_resourcefiles(blob_client, container, config):
bs = settings.batch_shipyard_settings(config)
envfileloc = '{}taskrf-'.format(bs.storage_entity_prefix)
logger.info('deleting blobs with prefix: {}'.format(envfileloc))
blobs = blob_client.list_blobs(container, prefix=envfileloc)
for blob in blobs:
blob_client.delete_blob(container, blob.name)
try:
blobs = blob_client.list_blobs(container, prefix=envfileloc)
except azure.common.AzureMissingResourceHttpError:
logger.warning('container not found: {}'.format(container))
else:
for blob in blobs:
blob_client.delete_blob(container, blob.name)
def _clear_table(table_client, table_name, config):
def _clear_table(table_client, table_name, config, pool_id=None):
# type: (azuretable.TableService, str, dict, str) -> None
"""Clear table entities
:param azure.storage.table.TableService table_client: table client
:param str table_name: table name
:param dict config: configuration dict
:param str pool_id: use specified pool id instead
"""
pk = _construct_partition_key_from_config(config)
pk = _construct_partition_key_from_config(config, pool_id=pool_id)
logger.debug('clearing table (pk={}): {}'.format(pk, table_name))
ents = table_client.query_entities(
table_name, filter='PartitionKey eq \'{}\''.format(pk))
@ -454,14 +466,15 @@ def _clear_table(table_client, table_name, config):
def clear_storage_containers(
blob_client, table_client, config, tables_only=False):
blob_client, table_client, config, tables_only=False, pool_id=None):
# type: (azureblob.BlockBlobService, azuretable.TableService, dict,
# bool) -> None
# bool, str) -> None
"""Clear storage containers
:param azure.storage.blob.BlockBlobService blob_client: blob client
:param azure.storage.table.TableService table_client: table client
:param dict config: configuration dict
:param bool tables_only: clear only tables
:param str pool_id: use specified pool id instead
"""
bs = settings.batch_shipyard_settings(config)
for key in _STORAGE_CONTAINERS:
@ -470,7 +483,9 @@ def clear_storage_containers(
_clear_blobs(blob_client, _STORAGE_CONTAINERS[key])
elif key.startswith('table_'):
try:
_clear_table(table_client, _STORAGE_CONTAINERS[key], config)
_clear_table(
table_client, _STORAGE_CONTAINERS[key], config,
pool_id=pool_id)
except azure.common.AzureMissingResourceHttpError:
if key != 'table_perf' or bs.store_timing_metrics:
raise

View file

@ -19,7 +19,6 @@ The jobs schema is as follows:
"max_wall_time": "02:00:00",
"retention_time": "1.12:00:00",
"priority": 0,
"allow_run_on_missing_image": false,
"user_identity": {
"default_pool_admin": true,
"specific_user": {
@ -27,6 +26,11 @@ The jobs schema is as follows:
"gid": 1000
}
},
"auto_pool": {
"pool_lifetime": "job",
"keep_alive": false
},
"allow_run_on_missing_image": false,
"remove_container_after_exit": true,
"shm_size": "256m",
"infiniband": false,
@ -188,12 +192,6 @@ Tasks within jobs with higher priority are run ahead of those with lower
priority; however, tasks that are already running with lower priority are
not preempted. Valid values are within the range of [-1000, 1000] and the
default is `0`.
* (optional) `allow_run_on_missing_image` allows tasks with a Docker image
reference that was not pre-loaded on to the compute node via
`global_resources`:`docker_images` in the global configuration to be able to
run. Note that you should attempt to specify all Docker images that you intend
to run in the `global_resources`:`docker_images` property in the global
configuration to minimize scheduling to task execution latency.
* (optional) `user_identity` property is to define which user to run the
container as. By default, if this property is not defined, the container will
be run as the root user. However, it may be required to run the container
@ -208,6 +206,25 @@ mutually exclusive of one another.
user.
* (required) `uid` is the user id of the user
* (required) `gid` is the group id of the user
* (optional) `auto_pool` will create a compute pool on demand for
the job as specified in the pool configuration. Note that storage resources
required by Batch Shipyard may not be automatically cleaned up when using
autopools. Running `jobs term` or `jobs del` without `--jobid` scoping
will attempt to clean up storage resources. Otherwise, you will need to use
`storage del` or `storage clear` to clean up storage resources manually.
* (optional) `pool_lifetime` specifies the lifetime of the pool. Valid
values are `job` and `job_schedule`. You may not specify `job_schedule`
for non-recurring jobs. The default is `job`.
* (optional) `keep_alive` specifies if the pool should be kept even after
its lifetime expires. The default is `false`. Note that setting this
value to `false` and setting `auto_complete` to `true` will automatically
delete the compute pool once all tasks under the job complete.
* (optional) `allow_run_on_missing_image` allows tasks with a Docker image
reference that was not pre-loaded on to the compute node via
`global_resources`:`docker_images` in the global configuration to be able to
run. Note that you should attempt to specify all Docker images that you intend
to run in the `global_resources`:`docker_images` property in the global
configuration to minimize scheduling to task execution latency.
* (optional) `remove_container_after_exit` property specifies if all
containers under the job should be automatically removed/cleaned up after
the task exits. This defaults to `false`.

View file

@ -357,7 +357,9 @@ to the Batch pool
sub-command is typically not required if `multi_instance_auto_complete` is
set to `true` in the job specification for the job.
* `--delete` will delete any stale cleanup jobs
* `del` will delete jobs specified in the jobs configuration file
* `del` will delete jobs specified in the jobs configuration file. If an
autopool is specified for all jobs and the `--jobid` option is not specified,
the storage associated with the autopool will be cleaned up.
* `--all` will delete all jobs found in the Batch account
* `--jobid` force deletion scope to just this job id
* `--termtasks` will manually terminate tasks prior to deletion. Termination
@ -386,7 +388,9 @@ file
* `--requeue` requeue running tasks
* `--terminate` terminate running tasks
* `--wait` wait for running tasks to complete
* `term` will terminate jobs found in the jobs configuration file
* `term` will terminate jobs found in the jobs configuration file. If an
autopool is specified for all jobs and the `--jobid` option is not specified,
the storage associated with the autopool will be cleaned up.
* `--all` will terminate all jobs found in the Batch account
* `--jobid` force termination scope to just this job id
* `--termtasks` will manually terminate tasks prior to termination.
@ -523,8 +527,11 @@ The `storage` command has the following sub-commands:
```
* `clear` will clear the Azure Storage containers used by Batch Shipyard
for metadata purposes
* `--poolid` will target the specified pool id instead of the configured one
* `del` will delete the Azure Storage containers used by Batch Shipyard
for metadata purposes
* `--clear-tables` will clear tables instead of deleting them
* `--poolid` will target the specified pool id instead of the configured one
## Example Invocations
```shell

View file

@ -85,6 +85,16 @@ Additionally, if you have specified an SSH user for your pool and there
is a start task failure, you can still issue the command `pool asu` to add the
pool SSH user and then `pool ssh` to SSH into the node to debug further.
Please note that the start task must download several files that are
uploaded to your Azure Storage account by the `pool add` command. These
files have SAS tokens that allow the Batch compute node to authenticate
with the Azure Storage service and download them. The SAS tokens are bound
to the storage account key with which they were generated. If you change or
regenerate the storage account key that was used to generate these SAS
tokens, the compute nodes will fail to start because the files can no
longer be downloaded. You will need to recreate your pool in this
situation.
#### Compute Node does not start or is unusable
If the compute node is "stuck" in starting state or enters unusable state,
this indicates that there was an issue allocating the node from the Azure

View file

@ -907,28 +907,32 @@ def storage(ctx):
@storage.command('del')
@click.option(
'--clear-tables', is_flag=True, help='Clear tables instead of deleting')
@click.option(
'--poolid', help='Delete storage containers for the specified pool')
@common_options
@batch_options
@keyvault_options
@pass_cli_context
def storage_del(ctx, clear_tables):
def storage_del(ctx, clear_tables, poolid):
"""Delete Azure Storage containers used by Batch Shipyard"""
ctx.initialize_for_storage()
convoy.fleet.action_storage_del(
ctx.blob_client, ctx.queue_client, ctx.table_client, ctx.config,
clear_tables)
clear_tables, poolid)
@storage.command('clear')
@click.option(
'--poolid', help='Clear storage containers for the specified pool')
@common_options
@batch_options
@keyvault_options
@pass_cli_context
def storage_clear(ctx):
def storage_clear(ctx, poolid):
"""Clear Azure Storage containers used by Batch Shipyard"""
ctx.initialize_for_storage()
convoy.fleet.action_storage_clear(
ctx.blob_client, ctx.table_client, ctx.config)
ctx.blob_client, ctx.table_client, ctx.config, poolid)
@cli.group()
@ -1327,8 +1331,9 @@ def jobs_add(ctx, recreate, tail):
"""Add jobs"""
ctx.initialize_for_batch()
convoy.fleet.action_jobs_add(
ctx.batch_client, ctx.blob_client, ctx.keyvault_client, ctx.config,
recreate, tail)
ctx.resource_client, ctx.compute_client, ctx.network_client,
ctx.batch_mgmt_client, ctx.batch_client, ctx.blob_client,
ctx.table_client, ctx.keyvault_client, ctx.config, recreate, tail)
@jobs.command('list')
@ -1402,7 +1407,8 @@ def jobs_term(ctx, all, jobid, termtasks, wait):
"""Terminate jobs"""
ctx.initialize_for_batch()
convoy.fleet.action_jobs_term(
ctx.batch_client, ctx.config, all, jobid, termtasks, wait)
ctx.batch_client, ctx.blob_client, ctx.queue_client, ctx.table_client,
ctx.config, all, jobid, termtasks, wait)
@jobs.command('del')
@ -1423,7 +1429,8 @@ def jobs_del(ctx, all, jobid, termtasks, wait):
"""Delete jobs"""
ctx.initialize_for_batch()
convoy.fleet.action_jobs_del(
ctx.batch_client, ctx.config, all, jobid, termtasks, wait)
ctx.batch_client, ctx.blob_client, ctx.queue_client, ctx.table_client,
ctx.config, all, jobid, termtasks, wait)
@jobs.command('deltasks')