Merge pull request #37 from Azure/bugfix/never_clamp_to_0

WIP: Never automatically reduce maxNumber
This commit is contained in:
bwatrous 2023-11-29 17:24:10 -08:00 коммит произвёл GitHub
Родитель 7fb05c228a 4342a81411
Коммит ec5dbd74cb
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 33 добавлений и 28 удалений

Просмотреть файл

@ -95,11 +95,8 @@ class CycleCloudProvider:
virtual_machine = bucket["virtualMachine"] virtual_machine = bucket["virtualMachine"]
# Symphony hates special characters # Symphony hates special characters
nodearray_name = nodearray_root["name"] nodearray_name = nodearray_root["name"]
max_count = self._max_count(nodearray_name, nodearray, virtual_machine.get("vcpuCount"), bucket)
is_paused = self.capacity_tracker.is_paused(nodearray_name, machine_type) is_paused = self.capacity_tracker.is_paused(nodearray_name, machine_type)
if is_paused: at_least_one_available_bucket = at_least_one_available_bucket or not is_paused
max_count = 0
at_least_one_available_bucket = at_least_one_available_bucket or max_count > 0
return not at_least_one_available_bucket return not at_least_one_available_bucket
# Regenerate templates (and potentially reconfig HostFactory) without output, so # Regenerate templates (and potentially reconfig HostFactory) without output, so
@ -174,6 +171,7 @@ class CycleCloudProvider:
machine_type_name = bucket["definition"]["machineType"] machine_type_name = bucket["definition"]["machineType"]
machine_type_short = machine_type_name.lower().replace("standard_", "").replace("basic_", "").replace("_", "") machine_type_short = machine_type_name.lower().replace("standard_", "").replace("basic_", "").replace("_", "")
machine_type = bucket["virtualMachine"] machine_type = bucket["virtualMachine"]
at_least_one_available_bucket = True
# Symphony hates special characters # Symphony hates special characters
nodearray_name = nodearray_root["name"] nodearray_name = nodearray_root["name"]
@ -181,10 +179,9 @@ class CycleCloudProvider:
template_id = self._escape_id(template_id) template_id = self._escape_id(template_id)
if bucket.get("valid", True): if bucket.get("valid", True):
currently_available_templates.add(template_id) currently_available_templates.add(template_id)
max_count = self._max_count(nodearray_name, nodearray, machine_type.get("vcpuCount"), bucket)
at_least_one_available_bucket = at_least_one_available_bucket or max_count > 0 max_count = self._max_count(nodearray_name, nodearray, machine_type.get("vcpuCount"), bucket)
memory = machine_type.get("memory") * 1024 memory = machine_type.get("memory") * 1024
is_low_prio = nodearray.get("Interruptible", False) is_low_prio = nodearray.get("Interruptible", False)
ngpus = 0 ngpus = 0
@ -192,6 +189,14 @@ class CycleCloudProvider:
ngpus = int(nodearray.get("Configuration", {}).get("symphony", {}).get("ngpus", 0)) ngpus = int(nodearray.get("Configuration", {}).get("symphony", {}).get("ngpus", 0))
except ValueError: except ValueError:
logger.exception("Ignoring symphony.ngpus for nodearray %s" % nodearray_name) logger.exception("Ignoring symphony.ngpus for nodearray %s" % nodearray_name)
# Check previous maxNumber setting - we NEVER lower maxNumber, only raise it.
previous_max_count = 0
if template_id in templates_store:
previous_max_count = templates_store[template_id].get("maxNumber", max_count)
if max_count < previous_max_count:
logger.info("Rejecting attempt to lower maxNumber from %s to %s for %s", previous_max_count, max_count, template_id)
max_count = previous_max_count
base_priority = bucket_priority(nodearrays, nodearray_root, b_index) base_priority = bucket_priority(nodearrays, nodearray_root, b_index)
# Symphony # Symphony
@ -201,7 +206,8 @@ class CycleCloudProvider:
if ncpus_use_vcpus: if ncpus_use_vcpus:
ncpus = machine_type.get("vcpuCount") ncpus = machine_type.get("vcpuCount")
else: else:
ncpus = machine_type.get("pcpuCount") ncpus = machine_type.get("pcpuCount")
record = { record = {
"maxNumber": max_count, "maxNumber": max_count,
"templateId": template_id, "templateId": template_id,
@ -285,12 +291,12 @@ class CycleCloudProvider:
templates_store[record_mpi["templateId"]] = record_mpi templates_store[record_mpi["templateId"]] = record_mpi
currently_available_templates.add(record_mpi["templateId"]) currently_available_templates.add(record_mpi["templateId"])
# for templates that are no longer available, advertise them but set maxNumber = 0 # for templates that are no longer available, advertise them but set priority = 0
# NOTE: do not modify "maxNumber" - Symphony HF does not respond well to lowering maxNumber while jobs may be running
for symphony_template in templates_store.values(): for symphony_template in templates_store.values():
if symphony_template["templateId"] not in currently_available_templates: if symphony_template["templateId"] not in currently_available_templates:
if self.fine: if self.fine:
logger.debug("Ignoring old template %s vs %s", symphony_template["templateId"], currently_available_templates) logger.debug("Ignoring old template %s vs %s", symphony_template["templateId"], currently_available_templates)
symphony_template["maxNumber"] = 0
symphony_template["priority"] = 0 symphony_template["priority"] = 0
new_templates_str = json.dumps(templates_store, indent=2, sort_keys=True) new_templates_str = json.dumps(templates_store, indent=2, sort_keys=True)
@ -330,7 +336,8 @@ class CycleCloudProvider:
except: except:
logger.warning("Exiting Non-zero so that symphony will retry") logger.warning("Exiting Non-zero so that symphony will retry")
logger.exception("Could not get template_json") logger.exception("Could not get template_json")
sys.exit(1) sys.exit(1)
def generate_userdata(self, template): def generate_userdata(self, template):
ret = {} ret = {}
@ -376,13 +383,13 @@ class CycleCloudProvider:
def _max_count(self, nodearray_name, nodearray, machine_cores, bucket): def _max_count(self, nodearray_name, nodearray, machine_cores, bucket):
if machine_cores < 0: if machine_cores < 0:
logger.error("Invalid number of machine cores - %s", machine_cores) logger.error("Invalid number of machine cores - %s.", machine_cores)
return -1 return -1
max_count = bucket.get("maxCount") max_count = bucket.get("maxCount")
if max_count is not None: if max_count is not None:
logger.debug("Using maxCount %s for %s", max_count, bucket) logger.debug("Using maxCount %s for nodearray %s and bucket %s", max_count, nodearray_name, bucket)
max_count = max(-1, max_count) max_count = max(-1, max_count)
else: else:
max_core_count = bucket.get("maxCoreCount") max_core_count = bucket.get("maxCoreCount")
@ -390,8 +397,8 @@ class CycleCloudProvider:
if nodearray.get("maxCoreCount") is None: if nodearray.get("maxCoreCount") is None:
logger.error("Need to define either maxCount or maxCoreCount! %s", pprint.pformat(bucket)) logger.error("Need to define either maxCount or maxCoreCount! %s", pprint.pformat(bucket))
return -1 return -1
logger.debug("Using maxCoreCount")
max_core_count = nodearray.get("maxCoreCount") max_core_count = nodearray.get("maxCoreCount")
logger.debug("Using maxCoreCount %s for nodearray %s and bucket %s", max_core_count, nodearray_name, bucket)
max_core_count = max(-1, max_core_count) max_core_count = max(-1, max_core_count)
@ -400,17 +407,6 @@ class CycleCloudProvider:
# We handle unexpected Capacity failures (Spot) by zeroing out capacity for a timed duration # We handle unexpected Capacity failures (Spot) by zeroing out capacity for a timed duration
machine_type_name = bucket["definition"]["machineType"] machine_type_name = bucket["definition"]["machineType"]
# Below code is commented out as a customer faced an issue where autoscaling was being limited
# by spot_increment_per_sku.
# For Spot instances, quota and limits are not great indicators of capacity, so artificially limit
# requests to single machine types to spread the load and find available skus for large workloads
# is_low_prio = nodearray.get("Interruptible", False)
# if is_low_prio:
# # Allow up to N _additional_ VMs (need to keep increasing this or symphony will stop considering the sku)
# active_count = bucket["activeCount"]
# max_count = min(max_count, active_count+self.spot_maxnumber_increment_per_sku)
return int(max_count) return int(max_count)
@failureresponse({"requests": [], "status": RequestStates.running}) @failureresponse({"requests": [], "status": RequestStates.running})

Просмотреть файл

@ -1,4 +1,4 @@
__version__ = "1.0.7" __version__ = "1.0.8"
def get_version(): def get_version():

Просмотреть файл

@ -150,6 +150,9 @@ class RequestsStoreInMem:
def read(self): def read(self):
return self.requests return self.requests
def write(self, data):
self.requests = deepcopy(data)
def __enter__(self): def __enter__(self):
return self.requests return self.requests
@ -290,7 +293,11 @@ class TestHostFactory(unittest.TestCase):
"buckets": [a4bucket, a8bucket]}]}) "buckets": [a4bucket, a8bucket]}]})
epoch_clock = MockClock((1970, 1, 1, 0, 0, 0)) epoch_clock = MockClock((1970, 1, 1, 0, 0, 0))
hostnamer = MockHostnamer() hostnamer = MockHostnamer()
provider = cyclecloud_provider.CycleCloudProvider(provider_config, cluster, hostnamer, json_writer, RequestsStoreInMem(), RequestsStoreInMem(), epoch_clock) provider = cyclecloud_provider.CycleCloudProvider(provider_config, cluster, hostnamer, json_writer,
terminate_requests=RequestsStoreInMem(),
creation_requests=RequestsStoreInMem(),
templates=RequestsStoreInMem(),
clock=epoch_clock)
provider.capacity_tracker.reset() provider.capacity_tracker.reset()
return provider return provider
@ -387,7 +394,7 @@ class TestHostFactory(unittest.TestCase):
status_response = provider.status({"requests": [{"requestId": "delete-missing"}]}) status_response = provider.status({"requests": [{"requestId": "delete-missing"}]})
# self.assertEqual({'status': 'running', 'requests': [{'status': 'running', "message": "Unknown termination request id.", 'requestId': 'delete-missing', 'machines': []}]}, status_response) # self.assertEqual({'status': 'running', 'requests': [{'status': 'running', "message": "Unknown termination request id.", 'requestId': 'delete-missing', 'machines': []}]}, status_response)
self.assertEqual({'status': 'running', 'requests': [{'status': 'complete_with_error', "message": "Unknown termination request id.", 'requestId': 'delete-missing', 'machines': []}]}, status_response) self.assertEqual({'status': 'running', 'requests': [{'status': 'complete_with_error', "message": "Warning: Ignoring unknown termination request id.", 'requestId': 'delete-missing', 'machines': []}]}, status_response)
def test_terminate_status(self): def test_terminate_status(self):
provider = self._new_provider() provider = self._new_provider()
@ -469,6 +476,8 @@ class TestHostFactory(unittest.TestCase):
self.assertEqual(100, provider.templates()["templates"][0]["maxNumber"]) self.assertEqual(100, provider.templates()["templates"][0]["maxNumber"])
def test_reprioritize_template(self): def test_reprioritize_template(self):
provider = self._new_provider() provider = self._new_provider()