- Allow pools to be added with zero target nodes
- Add pool autoscale commands
Fred Park 2017-07-19 15:34:46 -07:00
Parent 5291ff1130
Commit 82a46a615a
9 changed files with 752 additions and 70 deletions

View file

@ -21,6 +21,22 @@
},
"resize_timeout": "00:20:00",
"max_tasks_per_node": 1,
"autoscale": {
"evaluation_interval": "00:05:00",
"scenario": {
"name": "active_tasks",
"maximum_vm_count": {
"dedicated": 16,
"low_priority": 8
},
"node_deallocation_option": "taskcompletion",
"sample_lookback_interval": "00:10:00",
"required_sample_percentage": 70,
"bias_last_sample": true,
"bias_node_type": "low_priority"
},
"formula": ""
},
"inter_node_communication_enabled": true,
"reboot_on_start_task_failed": true,
"block_until_all_global_resources_loaded": true,

convoy/autoscale.py (new file, 274 lines)
View file

@ -0,0 +1,274 @@
# Copyright (c) Microsoft Corporation
#
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
# compat imports
from __future__ import (
absolute_import, division, print_function, unicode_literals
)
from builtins import ( # noqa
bytes, dict, int, list, object, range, str, ascii, chr, hex, input,
next, oct, open, pow, round, super, filter, map, zip)
# stdlib imports
import collections
# non-stdlib imports
# local imports
from . import util
# global defines
_UNBOUND_MAX_NODES = 16777216
AutoscaleMinMax = collections.namedtuple(
'AutoscaleMinMax', [
'max_tasks_per_node',
'min_target_dedicated',
'min_target_low_priority',
'max_target_dedicated',
'max_target_low_priority',
]
)
def _formula_tasks(pool):
# type: (settings.PoolSettings) -> str
"""Generate an autoscale formula for tasks scenario
:param settings.PoolSettings pool: pool settings
:rtype: str
:return: autoscale formula
"""
minmax = _get_minmax(pool)
if pool.autoscale.scenario.name == 'active_tasks':
task_type = 'Active'
elif pool.autoscale.scenario.name == 'pending_tasks':
task_type = 'Pending'
else:
raise ValueError('autoscale scenario name invalid: {}'.format(
pool.autoscale.scenario.name))
if pool.autoscale.scenario.bias_last_sample:
req_vms = [
'sli = TimeInterval_Second * {}'.format(
pool.autoscale.scenario.sample_lookback_interval.
total_seconds()),
'samplepercent = ${}Tasks.GetSamplePercent(sli)'.format(task_type),
'lastsample = val(${}Tasks.GetSample(1), 0)'.format(task_type),
'samplevecavg = avg(${}Tasks.GetSample(sli))'.format(task_type),
('{}TaskAvg = samplepercent < {} ? max(0, lastsample) : '
'(lastsample < samplevecavg ? avg(lastsample, samplevecavg) : '
'max(lastsample, samplevecavg))').format(
task_type,
pool.autoscale.scenario.required_sample_percentage,
),
'reqVMs = {}TaskAvg / maxTasksPerNode'.format(task_type),
]
else:
req_vms = [
'sli = TimeInterval_Second * {}'.format(
pool.autoscale.scenario.sample_lookback_interval.
total_seconds()),
'{}TaskAvg = avg(${}Tasks.GetSample(sli, {}))'.format(
task_type, task_type,
pool.autoscale.scenario.required_sample_percentage),
'reqVMs = {}TaskAvg / maxTasksPerNode'.format(task_type),
'reqVMs = ({}TaskAvg > 0 && reqVMs < 1) ? 1 : reqVMs'.format(
task_type),
]
req_vms = ';\n'.join(req_vms)
if pool.autoscale.scenario.bias_node_type is None:
target_vms = [
'divisor = (maxTargetDedicated == 0 || '
'maxTargetLowPriority == 0) ? 1 : 2',
'dedicatedVMs = max(minTargetDedicated, reqVMs / divisor)',
'dedicatedVMs = min(maxTargetDedicated, '
'(dedicatedVMs > 0 && dedicatedVMs < 1) ? 1 : dedicatedVMs)',
'$TargetDedicatedNodes = dedicatedVMs',
'$TargetLowPriorityNodes = max(minTargetLowPriority, '
'min(maxTargetLowPriority, reqVMs - dedicatedVMs))',
]
elif pool.autoscale.scenario.bias_node_type == 'dedicated':
target_vms = [
'dedicatedVMs = min(maxTargetDedicated, '
'max(minTargetDedicated, reqVMs))',
'$TargetDedicatedNodes = dedicatedVMs',
'$TargetLowPriorityNodes = max(minTargetLowPriority, '
'min(maxTargetLowPriority, reqVMs - dedicatedVMs))',
]
elif pool.autoscale.scenario.bias_node_type == 'low_priority':
target_vms = [
'lowPriVms = min(maxTargetLowPriority, '
'max(minTargetLowPriority, reqVMs))',
'$TargetLowPriorityNodes = lowPriVms',
'$TargetDedicatedNodes = max(minTargetDedicated, '
'min(maxTargetDedicated, reqVMs - lowPriVms))',
]
else:
raise ValueError(
'autoscale scenario bias node type invalid: {}'.format(
pool.autoscale.scenario.bias_node_type))
target_vms = ';\n'.join(target_vms)
formula = [
'maxTasksPerNode = {}'.format(minmax.max_tasks_per_node),
'minTargetDedicated = {}'.format(minmax.min_target_dedicated),
'minTargetLowPriority = {}'.format(minmax.min_target_low_priority),
'maxTargetDedicated = {}'.format(minmax.max_target_dedicated),
'maxTargetLowPriority = {}'.format(minmax.max_target_low_priority),
req_vms,
target_vms,
'$NodeDeallocationOption = {}'.format(
pool.autoscale.scenario.node_deallocation_option),
]
return ';\n'.join(formula) + ';'
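# Illustrative example (not part of this commit): for the sample pool
# configuration shown above (active_tasks scenario, max_tasks_per_node=1,
# maximum_vm_count of 16 dedicated / 8 low priority, bias_last_sample=true,
# bias_node_type=low_priority) and an assumed vm_count of 0 dedicated /
# 0 low priority, _formula_tasks would emit roughly:
#
#   maxTasksPerNode = 1;
#   minTargetDedicated = 0;
#   minTargetLowPriority = 0;
#   maxTargetDedicated = 16;
#   maxTargetLowPriority = 8;
#   sli = TimeInterval_Second * 600.0;
#   samplepercent = $ActiveTasks.GetSamplePercent(sli);
#   lastsample = val($ActiveTasks.GetSample(1), 0);
#   samplevecavg = avg($ActiveTasks.GetSample(sli));
#   ActiveTaskAvg = samplepercent < 70 ? max(0, lastsample) :
#       (lastsample < samplevecavg ? avg(lastsample, samplevecavg) :
#       max(lastsample, samplevecavg));
#   reqVMs = ActiveTaskAvg / maxTasksPerNode;
#   lowPriVms = min(maxTargetLowPriority, max(minTargetLowPriority, reqVMs));
#   $TargetLowPriorityNodes = lowPriVms;
#   $TargetDedicatedNodes = max(minTargetDedicated,
#       min(maxTargetDedicated, reqVMs - lowPriVms));
#   $NodeDeallocationOption = taskcompletion;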
def _formula_day_of_week(pool):
# type: (settings.PoolSettings) -> str
"""Generate an autoscale formula for a day of the week scenario
:param settings.PoolSettings pool: pool settings
:rtype: str
:return: autoscale formula
"""
minmax = _get_minmax(pool)
if pool.autoscale.scenario.name == 'workday':
target_vms = [
'now = time()',
'isWorkHours = now.hour >= 8 && now.hour < 18',
'isWeekday = now.weekday >= 1 && now.weekday <= 5',
'isPeakTime = isWeekday && isWorkHours',
]
elif (pool.autoscale.scenario.name ==
'workday_with_offpeak_max_low_priority'):
target_vms = [
'now = time()',
'isWorkHours = now.hour >= 8 && now.hour < 18',
'isWeekday = now.weekday >= 1 && now.weekday <= 5',
'isPeakTime = isWeekday && isWorkHours',
'$TargetLowPriorityNodes = maxTargetLowPriority',
]
if pool.autoscale.scenario.bias_node_type == 'low_priority':
target_vms.append('$TargetDedicatedNodes = minTargetDedicated')
else:
target_vms.append(
'$TargetDedicatedNodes = isPeakTime ? '
'maxTargetDedicated : minTargetDedicated')
elif pool.autoscale.scenario.name == 'weekday':
target_vms = [
'now = time()',
'isPeakTime = now.weekday >= 1 && now.weekday <= 5',
]
elif pool.autoscale.scenario.name == 'weekend':
target_vms = [
'now = time()',
'isPeakTime = now.weekday >= 6 && now.weekday <= 7',
]
else:
raise ValueError('autoscale scenario name invalid: {}'.format(
pool.autoscale.scenario.name))
if pool.autoscale.scenario.name != 'workday_with_offpeak_max_low_priority':
if pool.autoscale.scenario.bias_node_type is None:
target_vms.append(
'$TargetDedicatedNodes = isPeakTime ? '
'maxTargetDedicated : minTargetDedicated')
target_vms.append(
'$TargetLowPriorityNodes = isPeakTime ? '
'maxTargetLowPriority : minTargetLowPriority')
elif pool.autoscale.scenario.bias_node_type == 'dedicated':
target_vms.append(
'$TargetDedicatedNodes = isPeakTime ? '
'maxTargetDedicated : minTargetDedicated')
target_vms.append('$TargetLowPriorityNodes = minTargetLowPriority')
elif pool.autoscale.scenario.bias_node_type == 'low_priority':
target_vms.append('$TargetDedicatedNodes = minTargetDedicated')
target_vms.append(
'$TargetLowPriorityNodes = isPeakTime ? '
'maxTargetLowPriority : minTargetLowPriority')
else:
raise ValueError(
'autoscale scenario bias node type invalid: {}'.format(
pool.autoscale.scenario.bias_node_type))
target_vms = ';\n'.join(target_vms)
formula = [
'maxTasksPerNode = {}'.format(minmax.max_tasks_per_node),
'minTargetDedicated = {}'.format(minmax.min_target_dedicated),
'minTargetLowPriority = {}'.format(minmax.min_target_low_priority),
'maxTargetDedicated = {}'.format(minmax.max_target_dedicated),
'maxTargetLowPriority = {}'.format(minmax.max_target_low_priority),
target_vms,
'$NodeDeallocationOption = {}'.format(
pool.autoscale.scenario.node_deallocation_option),
]
return ';\n'.join(formula) + ';'
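# Illustrative example (not part of this commit): for a hypothetical workday
# scenario with no bias_node_type, max_tasks_per_node=1, maximum_vm_count of
# 16 dedicated / 8 low priority, and vm_count of 0 / 0, _formula_day_of_week
# would emit roughly:
#
#   maxTasksPerNode = 1;
#   minTargetDedicated = 0;
#   minTargetLowPriority = 0;
#   maxTargetDedicated = 16;
#   maxTargetLowPriority = 8;
#   now = time();
#   isWorkHours = now.hour >= 8 && now.hour < 18;
#   isWeekday = now.weekday >= 1 && now.weekday <= 5;
#   isPeakTime = isWeekday && isWorkHours;
#   $TargetDedicatedNodes = isPeakTime ? maxTargetDedicated : minTargetDedicated;
#   $TargetLowPriorityNodes = isPeakTime ? maxTargetLowPriority : minTargetLowPriority;
#   $NodeDeallocationOption = taskcompletion;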
def _get_minmax(pool):
# type: (settings.PoolSettings) -> AutoscaleMinMax
"""Get the min/max settings for autoscale spec
:param settings.PoolSettings pool: pool settings
:rtype: AutoscaleMinMax
:return: autoscale min max object
"""
min_target_dedicated = pool.vm_count.dedicated
min_target_low_priority = pool.vm_count.low_priority
max_target_dedicated = pool.autoscale.scenario.maximum_vm_count.dedicated
if max_target_dedicated < 0:
max_target_dedicated = _UNBOUND_MAX_NODES
max_target_low_priority = (
pool.autoscale.scenario.maximum_vm_count.low_priority
)
if max_target_low_priority < 0:
max_target_low_priority = _UNBOUND_MAX_NODES
if min_target_dedicated > max_target_dedicated:
raise ValueError(
'min target dedicated {} > max target dedicated {}'.format(
min_target_dedicated, max_target_dedicated))
if min_target_low_priority > max_target_low_priority:
raise ValueError(
'min target low priority {} > max target low priority {}'.format(
min_target_low_priority, max_target_low_priority))
return AutoscaleMinMax(
max_tasks_per_node=pool.max_tasks_per_node,
min_target_dedicated=min_target_dedicated,
min_target_low_priority=min_target_low_priority,
max_target_dedicated=max_target_dedicated,
max_target_low_priority=max_target_low_priority,
)
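# Illustrative example (assumed values, not from this commit): a pool with
# vm_count of 2 dedicated / 0 low priority and maximum_vm_count of
# 16 dedicated / -1 low priority yields min targets of 2 and 0 and max
# targets of 16 and 16777216 (_UNBOUND_MAX_NODES, effectively unbounded).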
_AUTOSCALE_SCENARIOS = {
'active_tasks': _formula_tasks,
'pending_tasks': _formula_tasks,
'workday': _formula_day_of_week,
'workday_with_offpeak_max_low_priority': _formula_day_of_week,
'weekday': _formula_day_of_week,
'weekend': _formula_day_of_week,
}
def get_formula(pool):
# type: (settings.PoolSettings) -> str
"""Get or generate an autoscale formula according to settings
:param settings.PoolSettings pool: pool settings
:rtype: str
:return: autoscale formula
"""
if util.is_not_empty(pool.autoscale.formula):
return pool.autoscale.formula
else:
return _AUTOSCALE_SCENARIOS[pool.autoscale.scenario.name](pool)

View file

@ -46,6 +46,7 @@ import time
# non-stdlib imports
import azure.batch.models as batchmodels
# local imports
from . import autoscale
from . import crypto
from . import data
from . import keyvault
@ -710,6 +711,41 @@ def list_pools(batch_client):
logger.error('no pools found')
def _check_metadata_mismatch(mdtype, metadata, req_ge=None):
# type: (str, List[batchmodels.MetadataItem], str) -> None
"""Check for metadata mismatch
:param str mdtype: metadata type (e.g., pool, job)
:param list metadata: list of metadata items
:param str req_ge: minimum required version (greater than or equal to)
"""
if util.is_none_or_empty(metadata):
if req_ge is not None:
raise RuntimeError(
('{} version metadata not present but version {} is '
'required').format(mdtype, req_ge))
else:
logger.warning('{} version metadata not present'.format(mdtype))
else:
for md in metadata:
if md.name == settings.get_metadata_version_name():
if md.value != __version__:
logger.warning(
'{} version metadata mismatch: {}={} cli={}'.format(
mdtype, mdtype, md.value, __version__))
if req_ge is not None:
# split version into tuple
mdt = md.value.split('.')
mdt = tuple((int(mdt[0]), int(mdt[1]), mdt[2]))
rv = req_ge.split('.')
rv = tuple((int(rv[0]), int(rv[1]), rv[2]))
if mdt < rv:
raise RuntimeError(
('{} version of {} does not meet the version '
'requirement of at least {}').format(
mdtype, md.value, req_ge))
break
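# Illustrative note (not part of this commit): the req_ge comparison above
# converts only the major and minor version components to ints and leaves
# the patch component as a string. For example, with md.value of '2.7.1'
# and req_ge of '2.8.0', mdt is (2, 7, '1') and rv is (2, 8, '0'), so
# mdt < rv is True and a RuntimeError is raised.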
def resize_pool(batch_client, config, wait=False):
# type: (azure.batch.batch_service_client.BatchServiceClient, dict,
# bool) -> list
@ -724,16 +760,7 @@ def resize_pool(batch_client, config, wait=False):
pool = settings.pool_settings(config)
_pool = batch_client.pool.get(pool.id)
# check pool metadata version
if util.is_none_or_empty(_pool.metadata):
logger.warning('pool version metadata not present')
else:
for md in _pool.metadata:
if (md.name == settings.get_metadata_version_name() and
md.value != __version__):
logger.warning(
'pool version metadata mismatch: pool={} cli={}'.format(
md.value, __version__))
break
_check_metadata_mismatch('pool', _pool.metadata)
logger.info(
('Resizing pool {} to {} compute nodes [current_dedicated_nodes={} '
'current_low_priority_nodes={}]').format(
@ -794,6 +821,114 @@ def del_pool(batch_client, config, pool_id=None):
return True
def pool_autoscale_disable(batch_client, config):
# type: (batch.BatchServiceClient, dict) -> None
"""Enable autoscale formula
:param batch_client: The batch client to use.
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
"""
pool_id = settings.pool_id(config)
batch_client.pool.disable_auto_scale(pool_id=pool_id)
logger.info('autoscale disabled for pool {}'.format(pool_id))
def pool_autoscale_enable(batch_client, config):
# type: (batch.BatchServiceClient, dict) -> None
"""Enable autoscale formula
:param batch_client: The batch client to use.
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
"""
pool = settings.pool_settings(config)
_pool = batch_client.pool.get(pool.id)
# check pool metadata
# TODO fix req version to current release version until 2.9.0
_check_metadata_mismatch('pool', _pool.metadata, req_ge='2.8.0')
asformula = None
asei = None
if not _pool.enable_auto_scale:
# check if an autoscale formula exists in config
if not settings.is_pool_autoscale_enabled(config, pas=pool.autoscale):
if not util.confirm_action(
config,
('enable dummy formula for pool {} as no autoscale '
'formula exists').format(pool.id)):
logger.error('not enabling autoscale for pool {}'.format(
pool.id))
return
# set dummy formula
asformula = (
'$TargetDedicatedNodes = {}; '
'$TargetLowPriorityNodes = {};'
).format(
_pool.target_dedicated_nodes, _pool.target_low_priority_nodes)
if asformula is None:
asformula = autoscale.get_formula(pool)
asei = pool.autoscale.evaluation_interval
# enable autoscale
batch_client.pool.enable_auto_scale(
pool_id=pool.id,
auto_scale_formula=asformula,
auto_scale_evaluation_interval=asei,
)
logger.info('autoscale enabled/updated for pool {}'.format(pool.id))
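# Illustrative note (assumed values): enabling autoscale on a fixed-size
# pool that currently targets 4 dedicated and 0 low priority nodes, with no
# autoscale settings in config, would set the dummy formula:
#   $TargetDedicatedNodes = 4; $TargetLowPriorityNodes = 0;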
def _output_autoscale_result(result):
# type: (batchmodels.AutoScaleRun) -> None
"""Output autoscale evalute or last exec results
:param batchmodels.AutoScaleRun result: result
"""
if result is None:
logger.error(
'autoscale result is invalid, ensure autoscale is enabled')
return
if result.error is not None:
logger.error('autoscale evaluate error: code={} message={}'.format(
result.error.code, result.error.message))
else:
logger.info('autoscale result: {}'.format(result.results))
logger.info('last autoscale evaluation: {}'.format(result.timestamp))
def pool_autoscale_evaluate(batch_client, config):
# type: (batch.BatchServiceClient, dict) -> None
"""Evaluate autoscale formula
:param batch_client: The batch client to use.
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
"""
pool = settings.pool_settings(config)
if not settings.is_pool_autoscale_enabled(config, pas=pool.autoscale):
logger.error(
('cannot evaluate autoscale for pool {}, not enabled or '
'no formula').format(pool.id))
return
result = batch_client.pool.evaluate_auto_scale(
pool_id=pool.id,
auto_scale_formula=autoscale.get_formula(pool),
)
_output_autoscale_result(result)
def pool_autoscale_lastexec(batch_client, config):
# type: (batch.BatchServiceClient, dict) -> None
"""Get last execution of the autoscale formula
:param batch_client: The batch client to use.
:type batch_client: `azure.batch.batch_service_client.BatchServiceClient`
:param dict config: configuration dict
"""
pool_id = settings.pool_id(config)
pool = batch_client.pool.get(pool_id)
if not pool.enable_auto_scale:
logger.error(
('last execution information not available for autoscale '
'disabled pool {}').format(pool_id))
return
_output_autoscale_result(pool.auto_scale_run)
def reboot_nodes(batch_client, config, all_start_task_failed, node_id):
# type: (batch.BatchServiceClient, dict, bool, str) -> None
"""Reboot nodes in a pool
@ -2363,17 +2498,7 @@ def add_jobs(
else:
# retrieve job and check for version consistency
_job = batch_client.job.get(job.id)
if util.is_none_or_empty(_job.metadata):
logger.warning('job version metadata not present')
else:
for md in _job.metadata:
if (md.name == settings.get_metadata_version_name()
and md.value != __version__):
logger.warning(
('job version metadata mismatch: '
'job={} cli={}').format(
md.value, __version__))
break
_check_metadata_mismatch('job', _job.metadata)
else:
raise
del multi_instance

View file

@ -45,6 +45,7 @@ import azure.batch.models as batchmodels
import azure.mgmt.batch.models as batchmgmtmodels
import azure.mgmt.compute.models as computemodels
# local imports
from . import autoscale
from . import batch
from . import crypto
from . import data
@ -720,6 +721,16 @@ def _add_pool(
pass
# retrieve settings
pool_settings = settings.pool_settings(config)
# get autoscale settings
if settings.is_pool_autoscale_enabled(config, pas=pool_settings.autoscale):
asenable = True
asformula = autoscale.get_formula(pool_settings)
asei = pool_settings.autoscale.evaluation_interval
else:
asenable = False
asformula = None
asei = None
logger.debug('autoscale enabled: {}'.format(asenable))
custom_image_na = settings.pool_custom_image_node_agent(config)
bc = settings.credentials_batch(config)
vnet = None
@ -956,8 +967,12 @@ def _add_pool(
id=pool_settings.id,
virtual_machine_configuration=vmconfig,
vm_size=pool_settings.vm_size,
target_dedicated_nodes=pool_settings.vm_count.dedicated,
target_low_priority_nodes=pool_settings.vm_count.low_priority,
target_dedicated_nodes=(
pool_settings.vm_count.dedicated if not asenable else None
),
target_low_priority_nodes=(
pool_settings.vm_count.low_priority if not asenable else None
),
resize_timeout=pool_settings.resize_timeout,
max_tasks_per_node=pool_settings.max_tasks_per_node,
enable_inter_node_communication=pool_settings.
@ -981,6 +996,9 @@ def _add_pool(
],
resource_files=[],
),
enable_auto_scale=asenable,
auto_scale_formula=asformula,
auto_scale_evaluation_interval=asei,
metadata=[
batchmodels.MetadataItem(
name=settings.get_metadata_version_name(),
@ -1063,7 +1081,14 @@ def _add_pool(
batch.generate_docker_login_settings(config)[0])
# create pool
nodes = batch.create_pool(batch_client, config, pool)
if util.is_none_or_empty(nodes):
_pool = batch_client.pool.get(pool.id)
pool_current_vm_count = (
_pool.current_dedicated_nodes + _pool.current_low_priority_nodes
)
pool_target_vm_count = (
_pool.target_dedicated_nodes + _pool.target_low_priority_nodes
)
if util.is_none_or_empty(nodes) and pool_target_vm_count > 0:
raise RuntimeError(
('No nodes could be allocated for pool: {}. If the pool is '
'comprised entirely of low priority nodes, then there may not '
@ -1071,33 +1096,33 @@ def _add_pool(
'your request. Please inspect the pool for resize errors and '
'issue pool resize to try again.').format(pool.id))
# set up gluster on compute if specified
if gluster_on_compute:
if gluster_on_compute and pool_current_vm_count > 0:
_setup_glusterfs(
batch_client, blob_client, config, nodes, _GLUSTERPREP_FILE,
cmdline=None)
# create admin user on each node if requested
try:
batch.add_ssh_user(batch_client, config, nodes)
except Exception as e:
logger.exception(e)
logger.error(
'Could not add SSH users to nodes. Please ensure ssh-keygen is '
'available in your PATH or cwd. Skipping data ingress if '
'specified.')
else:
# log remote login settings
rls = batch.get_remote_login_settings(batch_client, config, nodes)
# ingress data to shared fs if specified
if pool_settings.transfer_files_on_pool_creation:
_pool = batch_client.pool.get(pool.id)
total_vm_count = (
_pool.current_dedicated_nodes +
_pool.current_low_priority_nodes
)
data.ingress_data(
batch_client, compute_client, network_client, config, rls=rls,
kind='shared', total_vm_count=total_vm_count)
del _pool
if pool_current_vm_count > 0:
try:
batch.add_ssh_user(batch_client, config, nodes)
except Exception as e:
logger.exception(e)
logger.error(
'Could not add SSH users to nodes. Please ensure ssh-keygen '
'is available in your PATH or cwd. Skipping data ingress if '
'specified.')
else:
# log remote login settings
rls = batch.get_remote_login_settings(batch_client, config, nodes)
# ingress data to shared fs if specified
if pool_settings.transfer_files_on_pool_creation:
total_vm_count = (
_pool.current_dedicated_nodes +
_pool.current_low_priority_nodes
)
data.ingress_data(
batch_client, compute_client, network_client, config,
rls=rls, kind='shared', total_vm_count=total_vm_count)
del _pool
# wait for storage ingress processes
data.wait_for_storage_threads(storage_threads)
@ -1600,19 +1625,18 @@ def _adjust_settings_for_pool_creation(config):
('forcing shipyard docker image to be used due to '
'VM config, publisher={} offer={} sku={}').format(
publisher, offer, sku))
# adjust inter node comm setting
if pool_total_vm_count < 1:
raise ValueError('invalid total vm_count: {}'.format(
pool_total_vm_count))
# re-read pool and data replication settings
pool = settings.pool_settings(config)
dr = settings.data_replication_settings(config)
# ensure settings p2p/internode settings are compatible
if dr.peer_to_peer.enabled and not pool.inter_node_communication_enabled:
logger.warning(
'force enabling inter-node communication due to peer-to-peer '
'transfer')
settings.set_inter_node_communication_enabled(config, True)
# ensure settings p2p/as/internode settings are compatible
if dr.peer_to_peer.enabled:
if settings.is_pool_autoscale_enabled(config, pas=pool.autoscale):
raise ValueError('cannot enable peer-to-peer and autoscale')
if pool.inter_node_communication_enabled:
logger.warning(
'force enabling inter-node communication due to peer-to-peer '
'transfer')
settings.set_inter_node_communication_enabled(config, True)
# hpn-ssh can only be used for Ubuntu currently
try:
if (pool.ssh.hpn_server_swap and
@ -1644,6 +1668,11 @@ def _adjust_settings_for_pool_creation(config):
sdv = settings.global_resources_shared_data_volumes(config)
for sdvkey in sdv:
if settings.is_shared_data_volume_gluster_on_compute(sdv, sdvkey):
if settings.is_pool_autoscale_enabled(
config, pas=pool.autoscale):
raise ValueError(
'glusterfs on compute cannot be installed on an '
'autoscale-enabled pool')
if not pool.inter_node_communication_enabled:
# do not modify value and proceed since this interplays
# with p2p settings, simply raise exception and force
@ -1677,6 +1706,9 @@ def _adjust_settings_for_pool_creation(config):
'per pool')
except KeyError:
pass
# check pool count of 0 and ssh
if pool_total_vm_count == 0 and util.is_not_empty(pool.ssh.username):
logger.warning('cannot add SSH user with zero target nodes')
def action_fs_disks_add(resource_client, compute_client, config):
@ -2346,6 +2378,46 @@ def action_pool_listimages(batch_client, config):
_list_docker_images(batch_client, config)
def action_pool_autoscale_disable(batch_client, config):
# type: (batchsc.BatchServiceClient, dict) -> None
"""Action: Pool Autoscale Disable
:param azure.batch.batch_service_client.BatchServiceClient batch_client:
batch client
:param dict config: configuration dict
"""
batch.pool_autoscale_disable(batch_client, config)
def action_pool_autoscale_enable(batch_client, config):
# type: (batchsc.BatchServiceClient, dict) -> None
"""Action: Pool Autoscale Enable
:param azure.batch.batch_service_client.BatchServiceClient batch_client:
batch client
:param dict config: configuration dict
"""
batch.pool_autoscale_enable(batch_client, config)
def action_pool_autoscale_evaluate(batch_client, config):
# type: (batchsc.BatchServiceClient, dict) -> None
"""Action: Pool Autoscale Evaluate
:param azure.batch.batch_service_client.BatchServiceClient batch_client:
batch client
:param dict config: configuration dict
"""
batch.pool_autoscale_evaluate(batch_client, config)
def action_pool_autoscale_lastexec(batch_client, config):
# type: (batchsc.BatchServiceClient, dict) -> None
"""Action: Pool Autoscale Lastexec
:param azure.batch.batch_service_client.BatchServiceClient batch_client:
batch client
:param dict config: configuration dict
"""
batch.pool_autoscale_lastexec(batch_client, config)
def action_jobs_add(
batch_client, blob_client, keyvault_client, config, recreate, tail):
# type: (batchsc.BatchServiceClient, azureblob.BlockBlobService,

View file

@ -31,6 +31,7 @@ from builtins import ( # noqa
next, oct, open, pow, round, super, filter, map, zip)
# stdlib imports
import collections
import datetime
try:
import pathlib2 as pathlib
except ImportError:
@ -93,6 +94,24 @@ PoolVmCustomImageSettings = collections.namedtuple(
'node_agent',
]
)
PoolAutoscaleScenarioSettings = collections.namedtuple(
'PoolAutoscaleScenarioSettings', [
'name',
'maximum_vm_count',
'node_deallocation_option',
'sample_lookback_interval',
'required_sample_percentage',
'bias_last_sample',
'bias_node_type',
]
)
PoolAutoscaleSettings = collections.namedtuple(
'PoolAutoscaleSettings', [
'evaluation_interval',
'formula',
'scenario',
]
)
PoolSettings = collections.namedtuple(
'PoolSettings', [
'id', 'vm_size', 'vm_count', 'resize_timeout', 'max_tasks_per_node',
@ -101,7 +120,7 @@ PoolSettings = collections.namedtuple(
'block_until_all_global_resources_loaded',
'transfer_files_on_pool_creation', 'input_data', 'resource_files',
'gpu_driver', 'ssh', 'additional_node_prep_commands',
'virtual_network',
'virtual_network', 'autoscale',
]
)
SSHSettings = collections.namedtuple(
@ -516,14 +535,16 @@ def pool_specification(config):
return config['pool_specification']
def _pool_vm_count(config):
# type: (dict) -> PoolVmCountSettings
def _pool_vm_count(config, conf=None):
# type: (dict, dict) -> PoolVmCountSettings
"""Get Pool vm count settings
:param dict config: configuration object
:param dict conf: vm_count object
:rtype: PoolVmCountSettings
:return: pool vm count settings
"""
conf = pool_specification(config)['vm_count']
if conf is None:
conf = pool_specification(config)['vm_count']
if isinstance(conf, int):
conf = {'dedicated': conf}
return PoolVmCountSettings(
@ -573,6 +594,69 @@ def _populate_pool_vm_configuration(config):
)
def pool_autoscale_settings(config):
# type: (dict) -> PoolAutoscaleSettings
"""Get Pool autoscale settings
:param dict config: configuration object
:rtype: PoolAutoscaleSettings
:return: pool autoscale settings from specification
"""
conf = pool_specification(config)
conf = _kv_read_checked(conf, 'autoscale', {})
ei = _kv_read_checked(conf, 'evaluation_interval')
if util.is_not_empty(ei):
ei = util.convert_string_to_timedelta(ei)
else:
ei = datetime.timedelta(minutes=15)
scenconf = _kv_read_checked(conf, 'scenario')
if scenconf is not None:
mvc = _kv_read_checked(scenconf, 'maximum_vm_count', {})
ndo = _kv_read_checked(
scenconf, 'node_deallocation_option', 'taskcompletion')
if (ndo is not None and
ndo not in (
'requeue', 'terminate', 'taskcompletion', 'retaineddata')):
raise ValueError(
'invalid node_deallocation_option: {}'.format(ndo))
sli = _kv_read_checked(scenconf, 'sample_lookback_interval')
if util.is_not_empty(sli):
sli = util.convert_string_to_timedelta(sli)
else:
sli = datetime.timedelta(minutes=10)
scenario = PoolAutoscaleScenarioSettings(
name=_kv_read_checked(scenconf, 'name').lower(),
maximum_vm_count=_pool_vm_count(config, conf=mvc),
node_deallocation_option=ndo,
sample_lookback_interval=sli,
required_sample_percentage=_kv_read(
scenconf, 'required_sample_percentage', 70),
bias_last_sample=_kv_read(
scenconf, 'bias_last_sample', True),
bias_node_type=_kv_read_checked(
scenconf, 'bias_node_type'),
)
else:
scenario = None
return PoolAutoscaleSettings(
evaluation_interval=ei,
formula=_kv_read_checked(conf, 'formula'),
scenario=scenario,
)
def is_pool_autoscale_enabled(config, pas=None):
# type: (dict, PoolAutoscaleSettings) -> bool
"""Check if pool autoscale is enabled
:param dict config: configuration object
:param PoolAutoscaleSettings pas: pool autoscale settings
:rtype: bool
:return: if pool autoscale is enabled
"""
if pas is None:
pas = pool_autoscale_settings(config)
return util.is_not_empty(pas.formula) or pas.scenario is not None
def pool_settings(config):
# type: (dict) -> PoolSettings
"""Get Pool settings
@ -719,6 +803,7 @@ def pool_settings(config):
default_existing_ok=True,
default_create_nonexistant=False,
),
autoscale=pool_autoscale_settings(config),
)

View file

@ -249,7 +249,7 @@ def _add_global_resource(
resource = '{}:{}'.format(prefix, gr)
resource_sha1 = hashlib.sha1(
resource.encode('utf8')).hexdigest()
logger.info('adding global resource: {} {}'.format(
logger.info('adding global resource: {} hash={}'.format(
resource, resource_sha1))
table_client.insert_or_replace_entity(
_STORAGE_CONTAINERS['table_globalresources'],

View file

@ -29,6 +29,22 @@ The pool schema is as follows:
},
"resize_timeout": "00:20:00",
"max_tasks_per_node": 1,
"autoscale": {
"evaluation_interval": "00:05:00",
"scenario": {
"name": "active_tasks",
"maximum_vm_count": {
"dedicated": 16,
"low_priority": 8
},
"node_deallocation_option": "taskcompletion",
"sample_lookback_interval": "00:10:00",
"required_sample_percentage": 70,
"bias_last_sample": true,
"bias_node_type": "low_priority"
},
"formula": ""
},
"inter_node_communication_enabled": true,
"reboot_on_start_task_failed": true,
"block_until_all_global_resources_loaded": true,
@ -136,6 +152,53 @@ value of 1 if not specified. The maximum value for the property that Azure
Batch will accept is `4 x <# cores per compute node>`. For instance, for a
`STANDARD_F2` instance, because the virtual machine has 2 cores, the maximum
allowable value for this property would be `8`.
* (optional) `autoscale` designates the autoscale settings for the pool. If
specified, the `vm_count` becomes the minimum number of virtual machines for
each node type.
* (optional) `evaluation_interval` is the time interval between autoscale
evaluations performed by the service. The format for this property is a
timedelta with a string representation of "d.HH:mm:ss", where "HH:mm:ss"
is required and "d" is optional. If not specified, the default is
15 minutes. The smallest value that can be specified is 5 minutes.
* (optional) `scenario` is a pre-set autoscale scenario where a formula
will be generated with the parameters specified within this property.
* (required) `name` is the autoscale scenario name to apply. Please see
the [autoscale guide](30-batch-shipyard-autoscale.md) for valid values.
* (optional) `maximum_vm_count` is the maximum number of compute nodes
that can be allocated from an autoscale evaluation. It is useful to
have these limits in place to control the top-end scale of the
autoscale scenario. Specifying a negative value for either of the
following properties results in effectively no maximum limit.
* (optional) `dedicated` is the maximum number of dedicated compute
nodes that can be allocated.
* (optional) `low_priority` is the maximum number of low priority
compute nodes that can be allocated.
* (optional) `node_deallocation_option` is the node deallocation option
to apply. This option specifies what action is taken for a task still
running on a node when the pool is scaled down and that node is selected
for removal. The valid values are: `requeue`, `terminate`,
`taskcompletion`, and `retaineddata`. Please see [this doc](https://docs.microsoft.com/en-us/azure/batch/batch-automatic-scaling#variables) for more information.
* (optional) `sample_lookback_interval` is the time interval of past
history to look back over for certain scenarios, such as autoscale based
on active and pending tasks. The format for this property is a timedelta
with a string representation of "d.HH:mm:ss", where "HH:mm:ss" is
required and "d" is optional. If not specified, the default is
10 minutes.
* (optional) `required_sample_percentage` is the required percentage of
samples that must be present during the `sample_lookback_interval`.
If not specified, the default is 70.
* (optional) `bias_last_sample` will bias the autoscale scenario, if
applicable, to use the last sample during history computation. This can
be enabled to respond more quickly to recent changes than a plain
average over the lookback interval would. The default is `true`.
* (optional) `bias_node_type` will bias the autoscale scenario, if
applicable, to favor one type of node over the other when deciding
how many nodes of each type to allocate. The default is `null`, which
gives equal weight to both `dedicated` and `low_priority` nodes. Valid
values are `null` (or omitting the property), `dedicated`, or
`low_priority`.
* (optional) `formula` is a custom autoscale formula to apply to the pool.
If both `formula` and `scenario` are specified, then `formula` is used.
* (optional) `inter_node_communication_enabled` designates if this pool is set
up for inter-node communication. This must be set to `true` for any containers
that must communicate with each other such as MPI applications. This

View file

@ -73,14 +73,6 @@ Alternatively, you can issue the command
`pool rebootnode --all-start-task-failed` which will attempt to reboot the
nodes that have entered this state.
Please note that the start task requires downloading some files that are
uploaded to your Azure Storage account with the command `pool add`. These
files have SAS tokens which allow the Batch compute node to authenticate
with the Azure Storage service to download the files. These SAS tokens have
a finite expiration with a default of 30 days. You can adjust this expiration
by modifying the setting `batch_shipyard`:`generated_sas_expiry_days` to
your desired value if you expect your pool to last longer than the default.
If the compute node fails to start properly, Batch Shipyard will automatically
download the compute node's stdout and stderr files for the start task into
the directory where you ran `shipyard`. The files will be placed in

View file

@ -1249,6 +1249,61 @@ def pool_listimages(ctx):
convoy.fleet.action_pool_listimages(ctx.batch_client, ctx.config)
@pool.group()
@pass_cli_context
def autoscale(ctx):
"""Pool autoscale actions"""
pass
@autoscale.command('disable')
@common_options
@batch_options
@keyvault_options
@aad_options
@pass_cli_context
def autoscale_disable(ctx):
"""Disable autoscale on a pool"""
ctx.initialize_for_batch()
convoy.fleet.action_pool_autoscale_disable(ctx.batch_client, ctx.config)
@autoscale.command('enable')
@common_options
@batch_options
@keyvault_options
@aad_options
@pass_cli_context
def autoscale_enable(ctx):
"""Enable autoscale on a pool"""
ctx.initialize_for_batch()
convoy.fleet.action_pool_autoscale_enable(ctx.batch_client, ctx.config)
@autoscale.command('evaluate')
@common_options
@batch_options
@keyvault_options
@aad_options
@pass_cli_context
def autoscale_evaluate(ctx):
"""Evaluate autoscale formula"""
ctx.initialize_for_batch()
convoy.fleet.action_pool_autoscale_evaluate(ctx.batch_client, ctx.config)
@autoscale.command('lastexec')
@common_options
@batch_options
@keyvault_options
@aad_options
@pass_cli_context
def autoscale_lastexec(ctx):
"""Get the result of the last execution of the autoscale formula"""
ctx.initialize_for_batch()
convoy.fleet.action_pool_autoscale_lastexec(ctx.batch_client, ctx.config)
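# Example invocations of the new subcommands (illustrative; credentials,
# config, and pool options are assumed to be provided via the usual
# environment variables or command-line options):
#   shipyard pool autoscale enable
#   shipyard pool autoscale evaluate
#   shipyard pool autoscale lastexec
#   shipyard pool autoscale disable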
@cli.group()
@pass_cli_context
def jobs(ctx):