Integration with scalelib 1.0.3

nidhi0622 2024-01-19 14:19:15 -06:00
Parent cc64ab2b12
Commit 9744ca3f61
6 changed files: 214 additions and 715 deletions

View file

@@ -1,109 +0,0 @@
import os
import calendar
from util import JsonStore, init_logging
class CapacityTrackingDb:
def __init__(self, config, cluster_name, clock, limits_timeout=300):
self.config = config
self.cluster_name = cluster_name
self.clock = clock
self.limits_timeout = limits_timeout
# initialize in constructor so that cyclecloud_provider can initialize this
# with the proper log_level. In tests, this will use the default.
self.logger = init_logging()
default_dir = os.getenv('HF_WORKDIR', '/var/tmp')
self.db_dir = config.get('symphony.hostfactory.db_path', default_dir)
self.requests_db = JsonStore('azurecc_requests.json', self.db_dir)
self.capacity_db = JsonStore('azurecc_capacity.json', self.db_dir)
def reset(self):
self.requests_db.clear()
self.capacity_db.clear()
def add_request(self, request_set):
with self.requests_db as db:
request_id = request_set['requestId']
db[request_id] = { 'requestId': request_id,
'timestamp': calendar.timegm(self.clock()),
'sets': [request_set] }
def get_requests(self):
pending_requests = self.requests_db.read()
return pending_requests
def get_request(self, request_id):
pending_requests = self.get_requests()
if request_id in pending_requests:
return pending_requests[request_id]
return None
def remove_request(self, request_id):
with self.requests_db as pending_requests:
pending_requests.pop(request_id)
def remove_requests(self, request_ids):
with self.requests_db as pending_requests:
for request_id in request_ids:
pending_requests.pop(request_id)
def remove_limits(self, capacity_keys):
with self.capacity_db as db:
for k in capacity_keys:
db.pop(k)
def _capacity_key(self, nodearray_name, machine_type):
return "%s_%s" % (nodearray_name, machine_type)
def pause_capacity(self, nodearray_name, machine_type):
with self.capacity_db as db:
now = calendar.timegm(self.clock())
key = self._capacity_key(nodearray_name, machine_type)
db[key] = { 'nodearray': nodearray_name,
'machine_type': machine_type,
'max_count': 0, # max_count was historically set to 0 here; kept for backwards compatibility
'start_time': now }
def _release_expired_limits(self):
# Return True if any limits changed
def _limit_expired(now, capacity_limit):
expiry_time = self.limits_timeout + capacity_limit['start_time']
return now >= expiry_time
now = calendar.timegm(self.clock())
expired = []
for k, v in self.capacity_db.read().items():
if _limit_expired(now, v):
expired.append(k)
if expired:
self.remove_limits(expired)
return len(expired) > 0
def request_completed(self, request_status):
# Return True if any limits changed
limits_changed = False
request_id = request_status["requestId"]
num_created = len(request_status["machines"])
request_envelope = self.get_request(request_id)
if request_envelope:
self.remove_request(request_id)
self._release_expired_limits()
def is_paused(self, nodearray_name, machine_type):
key = self._capacity_key(nodearray_name, machine_type)
ret = False
limited_buckets = self.capacity_db.read()
if key in limited_buckets:
ret = True
self.logger.info("Limiting reported priority for machine_type %s in nodearray %s to 0", machine_type, nodearray_name)
self._release_expired_limits()
return ret
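A minimal usage sketch of the pause/expire cycle above (hypothetical values; the clock must return a struct_time, e.g. time.gmtime):

import time

tracker = CapacityTrackingDb(config={}, cluster_name="demo", clock=time.gmtime,
                             limits_timeout=300)
tracker.pause_capacity("execute", "Standard_D2_v2")
assert tracker.is_paused("execute", "Standard_D2_v2")  # reported priority drops to 0
# After limits_timeout (300 s), the next is_paused() call releases the expired
# entry via _release_expired_limits(), so calls after that return False.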

View file

@@ -5,15 +5,13 @@ from urllib.parse import urlencode
from builtins import str
from copy import deepcopy
from hpc.autoscale.node.nodemanager import new_node_manager
from hpc.autoscale.util import partition
try:
import cyclecli
except ImportError:
import cyclecliwrapper as cyclecli
class OutOfCapacityError(RuntimeError):
pass
class Cluster:
@@ -43,78 +41,47 @@ class Cluster:
def get_buckets(self):
buckets = self.node_mgr.get_buckets()
status_json = self.status()
nodearrays = status_json["nodearrays"]
for nodearray_root in nodearrays:
for bucket in buckets:
if nodearray_root.get("name") == bucket.nodearray:
bucket.priority = nodearray_root.get("Priority")
self.logger.debug("Buckets %s", buckets)
return buckets
def limit_request_by_available_count(self, status, request, logger):
disable_active_count_fix = bool(self.provider_config.get("symphony.disable_active_count_fix", False))
if disable_active_count_fix:
return request
request_copy = deepcopy(request)
request_set = request_copy['sets'][0]
machine_type = request_set["definition"]["machineType"]
nodearray_name = request_set['nodearray']
filtered = [x for x in status["nodearrays"] if x["name"] == nodearray_name]
if len(filtered) < 1:
raise RuntimeError(f"Nodearray {nodearray_name} does not exist or has been removed")
nodearray = filtered[0]
def add_nodes_scalelib(self, request, template_id, use_weighted_templates=False, vmTypes={}, dry_run=False):
# if true, do new slot based allocation with weighting
# if false, use vm_size/node based allocation
if use_weighted_templates:
self.logger.debug("Using weighted templates")
self.node_mgr.add_default_resource(selection={}, resource_name="template_id", default_value="node.nodearray")
self.logger.debug("vmTypes %s", vmTypes.items())
for vm_size, weight in vmTypes.items():
self.node_mgr.add_default_resource(selection={"node.vm_size": vm_size},
resource_name="weight",
default_value=weight)
else:
self.node_mgr.add_default_resource(selection={}, resource_name="template_id",
default_value=lambda node: "{node.nodearray + node.vm_size.replace('_', '')}".lower())
self.node_mgr.add_default_resource(selection={}, resource_name="weight", default_value=1)
filtered_buckets = [x for x in nodearray["buckets"] if x["definition"]["machineType"] == machine_type]
if len(filtered_buckets) < 1:
raise RuntimeError(f"VM Size {machine_type} does not exist or has been removed from nodearray {nodearray_name}")
bucket = filtered_buckets[0]
if bucket["availableCount"] == 0:
raise OutOfCapacityError(f"No availability for {nodearray_name}/{machine_type}")
result = self.node_mgr.allocate({"weight": 1, "template_id": template_id, "capacity-failure-backoff": 300},
slot_count=request['sets'][0]['count'],
allow_existing=False)
self.logger.debug("Result of allocation %s", result)
if bucket["availableCount"] < request_set["count"]:
logger.warning(f"Requesting available count {bucket['availableCount']} vs requested. {request_set['count']}")
logger.warning(f"This could trigger a pause capacity for nodearray {nodearray_name} VM Size {machine_type}")
request_set["count"] = bucket["availableCount"]
return request_copy
def get_avail_count(self, machine_type, nodearray_name):
status_resp = self.status()
filtered = [x for x in status_resp["nodearrays"] if x["name"] == nodearray_name]
if len(filtered) < 1:
raise RuntimeError(f"Nodearray {nodearray_name} does not exist or has been removed")
nodearray = filtered[0]
filtered_buckets = [x for x in nodearray["buckets"] if x["definition"]["machineType"] == machine_type]
if len(filtered_buckets) < 1:
raise RuntimeError(f"VM Size {machine_type} does not exist or has been removed from nodearray {nodearray_name}")
bucket = filtered_buckets[0]
return bucket["availableCount"]
def add_nodes(self, request):
# TODO: Remove request_copy once Max count is correctly enforced in CC.
status_resp = self.status()
request_copy = self.limit_request_by_available_count(status=status_resp, request=request, logger=self.logger)
response = self.add_nodes_scalelib(request_copy, max_count)
# TODO: Get rid of extra status call in CC 8.4.0
import time
request_copy_set = request_copy['sets'][0]
origin_avail_count = self.get_avail_count(request_copy_set["definition"]["machineType"], request_copy_set['nodearray'])
max_mitigation_attempts = int(self.provider_config.get("symphony.max_status_mitigation_attempts", 10))
i = 0
avail_has_decreased = False
self.logger.info("BEGIN Overallocation Mitigation request id %s", request["requestId"])
while i < max_mitigation_attempts and not avail_has_decreased:
i = i + 1
new_avail_count = self.get_avail_count(request_copy_set["definition"]["machineType"], request_copy_set['nodearray'])
if new_avail_count < origin_avail_count:
avail_has_decreased = True
break
time.sleep(1)
if avail_has_decreased:
self.logger.info("END Availibility updated after %d attempts for requestId %s", i, request["requestId"])
else:
self.logger.warning("END For request %s availability has not properly updated after %d attempts", request["requestId"], max_mitigation_attempts)
if dry_run:
by_vm_size = partition(result.nodes, lambda node: node.vm_size)
for key,value in by_vm_size.items():
self.logger.debug("VM Size %s count %s", key, len(value))
print("Allocation result:")
print(key, len(value))
return True
if not dry_run and result:
request_id_start = f"{request['requestId']}-start"
request_id_create = f"{request['requestId']}-create"
return self.node_mgr.bootup(request_id_start=request_id_start, request_id_create=request_id_create)
return False
def add_nodes(self, request, use_weighted_templates=False, vmTypes={}, dry_run=False):
response = self.add_nodes_scalelib(request, template_id=request['sets'][0]['definition']['templateId'],
use_weighted_templates=use_weighted_templates, vmTypes=vmTypes, dry_run=dry_run)
return response
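The weighted path above is the heart of the scalelib integration: each VM size advertises a weight resource, and a single allocate() call satisfies a slot count with any mix of sizes. A sketch with hypothetical weights (node_mgr as returned by new_node_manager):

vm_types = {"Standard_D2_v2": 2, "Standard_D4_v2": 4}  # hypothetical size -> weight
for vm_size, weight in vm_types.items():
    node_mgr.add_default_resource(selection={"node.vm_size": vm_size},
                                  resource_name="weight",
                                  default_value=weight)
result = node_mgr.allocate({"weight": 1, "template_id": "execute"},
                           slot_count=8, allow_existing=False)
# e.g. two Standard_D4_v2 nodes (weight 4 each) can cover slot_count=8
if result:
    node_mgr.bootup(request_id_start="req-1-start", request_id_create="req-1-create")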
def all_nodes(self):

View file

@@ -9,12 +9,12 @@ import pprint
import sys
import uuid
from builtins import str
import weighted_template_parse
from symphony import RequestStates, MachineStates, MachineResults, SymphonyRestClient
import cluster
from cluster import OutOfCapacityError
from capacity_tracking_db import CapacityTrackingDb
from request_tracking_db import RequestTrackingDb
from util import JsonStore, failureresponse
import util
import symphony
@@ -40,13 +40,12 @@ class InvalidCycleCloudVersionError(RuntimeError):
class CycleCloudProvider:
def __init__(self, config, cluster, hostnamer, json_writer, terminate_requests, creation_requests, templates, clock):
def __init__(self, config, cluster, hostnamer, json_writer, terminate_requests, creation_requests, clock):
self.config = config
self.cluster = cluster
self.hostnamer = hostnamer
self.json_writer = json_writer
self.terminate_json = terminate_requests
self.templates_json = templates
self.creation_json = creation_requests
self.exit_code = 0
self.clock = clock
@@ -54,369 +53,57 @@ class CycleCloudProvider:
self.creation_request_ttl = int(self.config.get("symphony.creation_request_ttl", 40 * 60))
self.node_request_timeouts = float(self.config.get("cyclecloud.machine_request_retirement", 120) * 60)
self.fine = False
self.capacity_tracker = CapacityTrackingDb(self.config, self.cluster.cluster_name, self.clock)
self.request_tracker = RequestTrackingDb(self.config, self.cluster.cluster_name, self.clock)
self.weighted_template = weighted_template_parse.WeightedTemplates(logger)
self.dry_run = False
def _escape_id(self, name):
return name.lower().replace("_", "")
def example_templates(self):
self._example_templates(self.templates()["templates"], [sys.stdout])
def _example_templates(self, templates, writers):
example = OrderedDict()
nodearrays = []
for template in templates["templates"]:
nodearray = template["attributes"]["nodearray"][1]
if nodearray not in nodearrays:
nodearrays.append(nodearray)
for nodearray in nodearrays:
example[nodearray] = {"templateId": nodearray,
"attributes": {"custom": ["String", "custom_value"]}}
for writer in writers:
json.dump(example, writer, indent=2, separators=(',', ': '))
def _check_for_zero_capacity(self, buckets):
at_least_one_available_bucket = False
for bucket in buckets:
autoscale_enabled = bucket.software_configuration.get("autoscaling", {}).get("enabled", False)
if not autoscale_enabled:
continue
max_count = bucket.max_count
is_paused = self.capacity_tracker.is_paused(bucket.nodearray, bucket.vm_size)
if is_paused:
max_count = 0
at_least_one_available_bucket = at_least_one_available_bucket or max_count > 0
return not at_least_one_available_bucket
def _generate_templates(self):
def _generate_templates(self):
"""
input (ignored):
[]
output:
{'templates': [{'attributes': {'azurecchost': ['Boolean', '1'],
'mem': ['Numeric', '2048'],
'ncores': ['Numeric', '4'],
'ncpus': ['Numeric', '1'],
'type': ['String', 'X86_64'],
'zone': ['String', 'southeastus']},
'instanceTags': 'group=project1',
'maxNumber': 10,
'pgrpName': None,
'priority': 0,
'templateId': 'execute0'},
{'attributes': {'azurecchost': ['Boolean', '1'],
'mem': ['Numeric', '4096'],
'ncores': ['Numeric', '8'],
'ncpus': ['Numeric', '1'],
'rank': ['Numeric', '0'],
'priceInfo': ['String', 'price:0.1,billingTimeUnitType:prorated_hour,billingTimeUnitNumber:1,billingRoundoffType:unit'],
'type': ['String', 'X86_64'],
'zone': ['String', 'southeastus']},
'instanceTags': 'group=project1',
'maxNumber': 10,
'pgrpName': None,
'priority': 0,
'templateId': 'execute1'}]}
"""
at_least_one_available_bucket = False
templates_store = self.templates_json.read()
# returns Cloud.Node records joined on MachineType - the array node only
response = self.cluster.status()
nodearrays = response["nodearrays"]
if "nodeArrays" in nodearrays:
logger.error("Invalid CycleCloud version. Please upgrade your CycleCloud instance.")
raise InvalidCycleCloudVersionError("Invalid CycleCloud version. Please upgrade your CycleCloud instance.")
if self.fine:
logger.debug("nodearrays response\n%s", json.dumps(nodearrays, indent=2))
# We cannot report to Symphony that we have 0 capacity on all VMs. If we detect this we
# simply reset any temporary capacity constraints.
if self._check_for_zero_capacity(buckets):
logger.warning("All buckets have 0 capacity. Resetting capacity tracker.")
self.capacity_tracker.reset()
currently_available_templates = set()
for bucket in buckets:
autoscale_enabled = bucket.software_configuration.get("autoscaling", {}).get("enabled", False)
if not autoscale_enabled:
continue
# Symphony hates special characters
template_id = "%s%s" % (bucket.nodearray, bucket.vm_size)
template_id = self._escape_id(template_id)
currently_available_templates.add(template_id)
max_count = bucket.max_count
is_low_prio = bucket.spot
at_least_one_available_bucket = at_least_one_available_bucket or max_count > 0
memory = bucket.memory.convert_to("m").value
ngpus = 0
try:
ngpus = int(bucket.software_configuration.get("symphony", {}).get("ngpus", bucket.gpu_count))
except ValueError:
logger.exception("Ignoring symphony.ngpus for nodearray %s" % bucket.nodearray)
base_priority = bucket_priority(buckets, bucket)
# Symphony
# - uses nram rather than mem
# - uses strings for numerics
record = {
"maxNumber": max_count,
"templateId": template_id,
"priority": base_priority,
"attributes": {
"zone": ["String", bucket.location],
"mem": ["Numeric", "%d" % memory],
"nram": ["Numeric", "%d" % memory],
# NOTE:
# ncpus == num_sockets == ncores / cores_per_socket
# Since we don't generally know the num_sockets,
# just set ncpus = 1 for all skus (1 giant CPU with N cores)
#"ncpus": ["Numeric", "%d" % machine_type.get("???physical_socket_count???")],
"ncpus": ["Numeric", "%d" % bucket.resources.get("ncpus", 1)],
"ncores": ["Numeric", "%d" % bucket.resources.get("ncores", bucket.vcpu_count)],
"ngpus": ["Numeric", ngpus],
"azurecchost": ["Boolean", "1"],
'rank': ['Numeric', '0'],
'priceInfo': ['String', 'price:0.1,billingTimeUnitType:prorated_hour,billingTimeUnitNumber:1,billingRoundoffType:unit'], 'type': ['String', 'X86_64'],
"type": ["String", "X86_64"],
"machinetypefull": ["String", bucket.vm_size],
"machinetype": ["String", bucket.vm_size],
"nodearray": ["String", bucket.nodearray],
"azureccmpi": ["Boolean", "0"],
"azurecclowprio": ["Boolean", "1" if is_low_prio else "0"]
}
}
# deepcopy so we can pop attributes
for override_sub_key in ["default", bucket.nodearray]:
overrides = deepcopy(self.config.get("templates.%s" % override_sub_key, {}))
attribute_overrides = overrides.pop("attributes", {})
record.update(overrides)
record["attributes"].update(attribute_overrides)
attributes = self.generate_userdata(record)
custom_env = self._parse_UserData(record.pop("UserData", "") or "")
record["UserData"] = {"symphony": {}}
if custom_env:
record["UserData"]["symphony"] = {"custom_env": custom_env,
"custom_env_names": " ".join(sorted(custom_env))}
record["UserData"]["symphony"]["attributes"] = attributes
record["UserData"]["symphony"]["attribute_names"] = " ".join(sorted(attributes))
record["pgrpName"] = None
is_paused_capacity = self.capacity_tracker.is_paused(bucket.nodearray, bucket.vm_size)
if is_paused_capacity:
record["priority"] = 0
templates_store[template_id] = record
# for templates that are no longer available, advertise them but set maxNumber = 0
for symphony_template in templates_store.values():
if symphony_template["templateId"] not in currently_available_templates:
if self.fine:
logger.debug("Ignoring old template %s vs %s", symphony_template["templateId"], currently_available_templates)
symphony_template["maxNumber"] = 0
symphony_template["priority"] = 0
symphony_templates = list(templates_store.values())
symphony_templates = sorted(symphony_templates, key=lambda x: -x["priority"])
# Note: we aren't going to store this, so it will naturally appear as an error during allocation.
if not at_least_one_available_bucket:
symphony_templates.insert(0, PLACEHOLDER_TEMPLATE)
self.templates_json.write(templates_store)
return symphony_templates
# Regenerate templates (and potentially reconfig HostFactory) without output, so
# this method is safe to call from other API calls
def _update_templates(self):
"""
input (ignored):
[]
output:
{'templates': [{'attributes': {'azurecchost': ['Boolean', '1'],
'mem': ['Numeric', '2048'],
'ncores': ['Numeric', '4'],
'ncpus': ['Numeric', '1'],
'type': ['String', 'X86_64'],
'zone': ['String', 'southeastus']},
'instanceTags': 'group=project1',
'maxNumber': 10,
'pgrpName': None,
'priority': 0,
'templateId': 'execute0'},
{'attributes': {'azurecchost': ['Boolean', '1'],
'mem': ['Numeric', '4096'],
'ncores': ['Numeric', '8'],
'ncpus': ['Numeric', '1'],
'rank': ['Numeric', '0'],
'priceInfo': ['String', 'price:0.1,billingTimeUnitType:prorated_hour,billingTimeUnitNumber:1,billingRoundoffType:unit'],
'type': ['String', 'X86_64'],
'zone': ['String', 'southeastus']},
'instanceTags': 'group=project1',
'maxNumber': 10,
'pgrpName': None,
'priority': 0,
'templateId': 'execute1'}]}
"""
templates_store = self.templates_json.read()
prior_templates_str = json.dumps(templates_store, indent=2, sort_keys=True)
logger.info("Prior templates:\n%s", prior_templates_str)
symphony_templates = self._generate_templates()
templates_store = self.templates_json.read()
new_templates_str = json.dumps(templates_store, indent=2, sort_keys=True)
logger.info("new templates:\n%s", new_templates_str)
# if new_templates_str != prior_templates_str and len(prior_templates) > 0: the length check was probably here to avoid a deadlock; we now avoid that via do_REST
if new_templates_str != prior_templates_str:
generator = difflib.context_diff(prior_templates_str.splitlines(), new_templates_str.splitlines())
difference = "\n".join([str(x) for x in generator])
new_template_order = ", ".join(["%s:%s" % (x.get("templateId", "?"), x.get("maxNumber", "?")) for x in symphony_templates])
logger.warning("Templates have changed - new template priority order: %s", new_template_order)
logger.warning("Diff:\n%s", str(difference))
persist_templates = False
try:
rest_client = SymphonyRestClient(self.config, logger)
rest_client.update_hostfactory_templates({"templates": symphony_templates, "message": "Get available templates success."})
persist_templates = True
except:
logger.exception("Ignoring failure to update cluster templates via Symphony REST API. (Is REST service running?)")
if persist_templates:
self.templates_json.write(templates_store)
return symphony_templates
# TODO: method to generate templates in memory and methods for returning it for getAvailableTemplates or REST call.
# If we return an empty list or templates with 0 hosts, it removes us forever and ever more, so _always_
# return at least one machine.
# BUGFIX: exiting non-zero code will make symphony retry.
def templates(self):
def templates(self):
try:
if not self.config.get("symphony.enable_template_creation", False):
pro_conf_dir = os.getenv('PRO_CONF_DIR', os.getcwd())
conf_path = os.path.join(pro_conf_dir, "conf", "azureccprov_templates.json")
with open(conf_path, 'r') as json_file:
template_json = json.load(json_file)
symphony_templates = template_json["templates"]
templates_store = {}
for template in symphony_templates:
key = template["templateId"]
templates_store[key] = template
self.templates_json.write(templates_store)
logger.info("Symphony tempolates")
logger.info(symphony_templates)
else:
symphony_templates = self._generate_templates()
return self.json_writer({"templates": symphony_templates, "message": "Get available templates success."}, debug_output=False)
pro_conf_dir = os.getenv('PRO_CONF_DIR', os.getcwd())
conf_path = os.path.join(pro_conf_dir, "conf", "azureccprov_templates.json")
with open(conf_path, 'r') as json_file:
templates_json = json.load(json_file)
templates_json["message"] = "Get available templates success."
return self.json_writer(templates_json, debug_output=False)
except:
logger.warning("Exiting Non-zero so that symphony will retry")
logger.exception("Could not get template_json")
logger.exception("Could not get templates_json")
sys.exit(1)
def generate_userdata(self, template):
ret = {}
for key, value_array in template.get("attributes", {}).items():
if len(value_array) != 2:
logger.error("Invalid attribute %s %s", key, value_array)
continue
if value_array[0].lower() == "boolean":
ret[key] = str(value_array[1] != "0").lower()
def generate_sample_template(self):
buckets = self.cluster.get_buckets()
template_dict = {}
for bucket in buckets:
autoscale_enabled = bucket.software_configuration.get("autoscaling", {}).get("enabled", False)
if not autoscale_enabled:
#print("Autoscaling is disabled in CC for nodearray %s" % bucket.nodearray)
continue
if template_dict.get(bucket.nodearray) is None:
template_dict[bucket.nodearray] = {}
template_dict[bucket.nodearray]["templateId"] = bucket.nodearray
template_dict[bucket.nodearray]["maxNumber"] = bucket.max_count
template_dict[bucket.nodearray]["attributes"] = {}
template_dict[bucket.nodearray]["attributes"]["type"] = ["String", "X86_64"]
template_dict[bucket.nodearray]["attributes"]["nram"] = ["Numeric", "4096"]
template_dict[bucket.nodearray]["attributes"]["ncpus"] = ["Numeric", "1"]
template_dict[bucket.nodearray]["attributes"]["ncores"] = ["Numeric", "1"]
template_dict[bucket.nodearray]["vmTypes"] = {}
weight = bucket.resources.get("ncores", bucket.vcpu_count)
template_dict[bucket.nodearray]["vmTypes"].update({bucket.vm_size: weight})
else:
ret[key] = value_array[1]
if template.get("customScriptUri"):
ret["custom_script_uri"] = template.get("customScriptUri")
weight = bucket.resources.get("ncores", bucket.vcpu_count)
template_dict[bucket.nodearray]["vmTypes"].update({bucket.vm_size: weight})
templates = {"templates": list(template_dict.values())}
print(json.dumps(templates, indent=4))
return ret
def _parse_UserData(self, user_data):
ret = {}
user_data = (user_data or "").strip()
if not user_data:
return ret
key_values = user_data.split(";")
# kludge: this can be overridden either at the template level
# or during a creation request. We always want it defined in userdata
# though.
for kv in key_values:
try:
key, value = kv.split("=", 1)
ret[key] = value
except ValueError:
logger.error("Invalid UserData entry! '%s'", kv)
return ret
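For reference, the semicolon-delimited format _parse_UserData accepts (provider here is a hypothetical CycleCloudProvider instance):

provider._parse_UserData("a=1;b=x=y")  # -> {"a": "1", "b": "x=y"} (split on first "=")
provider._parse_UserData("")           # -> {}
provider._parse_UserData("bad-entry")  # logs "Invalid UserData entry!" and skips it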
def is_capacity_limited(self, bucket):
return False
def _max_count(self, nodearray_name, nodearray, machine_cores, bucket):
if machine_cores < 0:
logger.error("Invalid number of machine cores - %s", machine_cores)
return -1
max_count = bucket.get("maxCount")
if max_count is not None:
max_count = max(-1, max_count)
logger.debug("Using maxCount %s for %s", max_count, bucket)
else:
max_core_count = bucket.get("maxCoreCount")
if max_core_count is None:
if nodearray.get("maxCoreCount") is None:
logger.error("Need to define either maxCount or maxCoreCount! %s", pprint.pformat(bucket))
return -1
max_core_count = nodearray.get("maxCoreCount")
logger.debug("Using maxCoreCount %s",max_core_count)
max_core_count = max(-1, max_core_count)
max_count = max_core_count / machine_cores
# We handle unexpected Capacity failures (Spot) by zeroing out capacity for a timed duration
# machine_type_name = bucket["definition"]["machineType"]
# Below code is commented out as a customer faced an issue where autoscaling was being limited
# by spot_increment_per_sku.
# For Spot instances, quota and limits are not great indicators of capacity, so artificially limit
# requests to single machine types to spread the load and find available skus for large workloads
# is_low_prio = nodearray.get("Interruptible", False)
# if is_low_prio:
# # Allow up to N _additional_ VMs (need to keep increasing this or symphony will stop considering the sku)
# active_count = bucket["activeCount"]
# max_count = min(max_count, active_count+self.spot_maxnumber_increment_per_sku)
return int(max_count)
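Worked examples of _max_count's fallback order (hypothetical nodearray/bucket dicts): bucket maxCount wins, then bucket maxCoreCount, then nodearray maxCoreCount divided by machine cores, else -1.

provider._max_count("execute", {}, 4, {"maxCount": 10})      # -> 10
provider._max_count("execute", {"maxCoreCount": 16}, 4, {})  # -> int(16 / 4) = 4
provider._max_count("execute", {}, 4, {})                    # -> -1 (no limit defined)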
@failureresponse({"requests": [], "status": RequestStates.running})
def create_machines(self, input_json):
"""
@@ -428,73 +115,49 @@ class CycleCloudProvider:
output:
{'message': 'Request VM from Azure CycleCloud successful.',
'requestId': 'req-123'}
"""
"""
request_id = str(uuid.uuid4())
logger.info("Creating requestId %s", request_id)
try:
# save the request so we can time it out
with self.creation_json as requests_store:
requests_store[request_id] = {"requestTime": calendar.timegm(self.clock()),
"completedNodes": [],
"allNodes": None,
"completed": False,
"lastNumNodes": input_json["template"]["machineCount"]}
except:
logger.exception("Could not open creation_json")
sys.exit(1)
try:
template_store = self.templates_json.read()
# same as nodearrays - Cloud.Node joined with MachineType
template_id = input_json["template"]["templateId"]
template = template_store.get(template_id)
if not template:
available_templates = template_store.keys()
return self.json_writer({"requestId": request_id, "status": RequestStates.complete_with_error,
"message": "Unknown templateId %s. Available %s" % (template_id, available_templates)})
machine_count = input_json["template"]["machineCount"]
def _get(name):
return template["attributes"].get(name, [None, None])[1]
# rc_account = input_json.get("rc_account", "default")
# user_data = template.get("UserData")
# if rc_account != "default":
# if "symphony" not in user_data:
# user_data["symphony"] = {}
# if "custom_env" not in user_data["symphony"]:
# user_data["symphony"]["custom_env"] = {}
# user_data["symphony"]["custom_env"]["rc_account"] = rc_account
# user_data["symphony"]["custom_env_names"] = " ".join(sorted(user_data["symphony"]["custom_env"].keys()))
nodearray = _get("nodearray")
machinetype_name = _get("machinetypefull")
request_set = { 'count': machine_count,
'definition': {'machineType': machinetype_name},
# 'nodeAttributes': {'Tags': {"rc_account": rc_account},
# 'Configuration': user_data},
'nodearray': nodearray }
# if template["attributes"].get("placementgroup"):
# request_set["placementGroupId"] = template["attributes"].get("placementgroup")[1]
if not self.dry_run:
try:
# save the request so we can time it out
with self.creation_json as requests_store:
requests_store[request_id] = {"requestTime": calendar.timegm(self.clock()),
"completedNodes": [],
"allNodes": None,
"completed": False,
"lastNumNodes": input_json["template"]["machineCount"]}
except:
logger.exception("Could not open creation_json")
sys.exit(1)
try:
use_weighted_templates = False
vmTypes = {}
if self.config.get("symphony.enable_weighted_templates", True):
pro_conf_dir = os.getenv('PRO_CONF_DIR', os.getcwd())
conf_path = os.path.join(pro_conf_dir, "conf", "azureccprov_templates.json")
with open(conf_path, 'r') as json_file:
templates_json = json.load(json_file)
vmTypes = self.weighted_template.parse_weighted_template(input_json, templates_json["templates"])
logger.debug("vmTypes %s", vmTypes)
use_weighted_templates = True
request_set = { 'count': input_json["template"]["machineCount"],
'definition':{'templateId':input_json["template"]["templateId"]}}
if self.dry_run:
add_nodes_response = self.cluster.add_nodes({'requestId': request_id,
'sets': [request_set]},use_weighted_templates, vmTypes, self.dry_run)
if add_nodes_response:
print("Dry run succeeded")
sys.exit(0)
# We are grabbing the lock to serialize this call.
try:
with self.creation_json as requests_store:
add_nodes_response = self.cluster.add_nodes({'requestId': request_id,
'sets': [request_set]}, template.get("maxNumber"))
'sets': [request_set]},use_weighted_templates, vmTypes)
finally:
request_set['requestId'] = request_id
self.capacity_tracker.add_request(request_set)
self.request_tracker.add_request(request_set)
if not add_nodes_response:
raise ValueError("No nodes were created")
@@ -503,14 +166,10 @@ class CycleCloudProvider:
with self.creation_json as requests_store:
requests_store[request_id]["allNodes"] = [self.cluster.get_node_id(x) for x in add_nodes_response.nodes]
if template["attributes"].get("placementgroup"):
logger.info("Requested %s instances of machine type %s in placement group %s for nodearray %s.", machine_count, machinetype_name, _get("placementgroup"), _get("nodearray"))
else:
logger.info("Requested %s instances of machine type %s in nodearray %s.", machine_count, machinetype_name, _get("nodearray"))
return self.json_writer({"requestId": request_id, "status": RequestStates.running,
"message": "Request instances success from Azure CycleCloud."})
except UserError as e:
logger.exception("Azure CycleCloud experienced an error and the node creation request failed. %s", e)
return self.json_writer({"requestId": request_id, "status": RequestStates.complete_with_error,
@@ -519,10 +178,6 @@ class CycleCloudProvider:
logger.exception("Azure CycleCloud experienced an error and the node creation request failed. %s", e)
return self.json_writer({"requestId": request_id, "status": RequestStates.complete_with_error,
"message": "Azure CycleCloud experienced an error: %s" % str(e)})
except OutOfCapacityError as e:
logger.warning("Request Id %s failed with out of capacity %s", request_id, e)
return self.json_writer({"requestId": request_id, "status": RequestStates.running,
"message": "Azure CycleCloud does not currently have capacity %s" % str(e)})
except Exception as e:
logger.exception("Azure CycleCloud experienced an error, though it may have succeeded: %s", e)
return self.json_writer({"requestId": request_id, "status": RequestStates.running,
@@ -812,25 +467,6 @@ class CycleCloudProvider:
requests_store[request_id] = {"requestTime": calendar.timegm(self.clock())}
#set default
requests_store[request_id]["lastUpdateTime"] = calendar.timegm(self.clock())
actual_machine_cnt = len(valid_nodes)
if not requests_store[request_id].get("lastNumNodes") :
requests_store[request_id]["lastNumNodes"] = actual_machine_cnt
if actual_machine_cnt < requests_store[request_id]["lastNumNodes"]:
request_envelope = self.capacity_tracker.get_request(request_id)
if not request_envelope:
logger.warning("No request envelope maybe all buckets have hit capacity issue")
logger.debug("No request envelope found for request id %s", request_id)
else:
reqs = request_envelope.get('sets', [])
if not reqs:
continue
assert len(reqs) == 1
req = reqs[0]
nodearray_name = req['nodearray']
machine_type = req['definition']['machineType']
logger.warning("Out-of-capacity condition detected for machine_type %s in nodearray %s", machine_type, nodearray_name)
self.capacity_tracker.pause_capacity(nodearray_name=nodearray_name, machine_type=machine_type)
requests_store[request_id]["lastNumNodes"] = actual_machine_cnt
requests_store[request_id]["completedNodes"] = completed_nodes
if requests_store[request_id].get("allNodes") is None:
@@ -1119,11 +755,11 @@ class CycleCloudProvider:
delete_response = self._deperecated_terminate_status({"requests": deletes})
assert "status" in delete_response
# Update capacity tracking
# Update request tracking
if 'requests' in create_response:
for cr in create_response['requests']:
if cr['status'] in [ RequestStates.complete ]:
self.capacity_tracker.request_completed(cr)
self.request_tracker.request_completed(cr)
create_status = create_response.get("status", RequestStates.complete)
delete_status = delete_response.get("status", RequestStates.complete)
@@ -1137,9 +773,6 @@ class CycleCloudProvider:
combined_status = RequestStates.complete_with_error
else:
combined_status = RequestStates.complete
# if the Capacity limits have changed, templates will be updated at the end of this call.
# -> HostFactory rarely (if ever) calls getAvailableTemplates after startup
response = {"status": combined_status,
"requests": create_response.get("requests", []) + delete_response.get("requests", [])
@@ -1227,12 +860,6 @@ class CycleCloudProvider:
request["completed"] = True
def periodic_cleanup(self):
try:
if self.config.get("symphony.enable_template_creation", False):
self._update_templates()
except Exception:
logger.exception("Could not update templates")
try:
self._retry_termination_requests()
except Exception:
@@ -1265,28 +892,32 @@ class CycleCloudProvider:
internal_completed_nodes.update(set(request.get("completedNodes")))
incomplete_nodes = actual_completed_nodes - internal_completed_nodes
print(incomplete_nodes)
def bucket_priority(nodearrays, bucket_nodearray, b_index):
nodearray = bucket_nodearray.get("nodearray")
prio = nodearray.get("Priority")
if isinstance(prio, str):
try:
prio = int(float(prio))
except Exception:
prio = None
if isinstance(prio, float):
prio = int(prio)
if isinstance(prio, int):
if prio < 0:
prio = None
if prio is not None and not isinstance(prio, int):
prio = None
if prio is None:
prio = (nodearrays.index(bucket_nodearray) + 1) * 10
if prio > 0:
return prio * 1000 - b_index
assert prio == 0, f'Unexpected prio {prio} - should ALWAYS be >= 0.'
return prio
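Concrete cases for bucket_priority (hypothetical records; assumes the list-index fallback on the enclosing nodearray record, as fixed above):

nodearrays = [{"name": "a", "nodearray": {"Priority": 5}},
              {"name": "b", "nodearray": {}}]
bucket_priority(nodearrays, nodearrays[0], 0)  # explicit: 5 * 1000 - 0 = 5000
bucket_priority(nodearrays, nodearrays[1], 1)  # default: (1 + 1) * 10 = 20 -> 20 * 1000 - 1 = 19999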
def validate_template(self):
cluster_status = self.cluster.status()
nodearrays = cluster_status["nodearrays"]
pro_conf_dir = os.getenv('PRO_CONF_DIR', os.getcwd())
conf_path = os.path.join(pro_conf_dir, "conf", "azureccprov_templates.json")
with open(conf_path, 'r') as json_file:
templates_json = json.load(json_file)
templates_json = templates_json["templates"]
for template in templates_json:
for nodearray_root in nodearrays:
if template["templateId"] == nodearray_root.get("name"):
for key,value in template["vmTypes"].items():
vmTypeExist = False
for bucket in nodearray_root.get("buckets"):
if key == bucket.get("definition")["machineType"]:
vmTypeExist = True
break
if not vmTypeExist:
print("Template validation failed")
print("vmType %s does not exist in nodearray %s" % (key, template["templateId"]))
return False
print("Template validation passed")
return True
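validate_template fails when a templateId matches a nodearray but lists a vmType absent from that nodearray's buckets; a minimal conf/azureccprov_templates.json that passes (hypothetical sizes, nodearray named "execute"):

{
    "templates": [
        {"templateId": "execute",
         "maxNumber": 100,
         "vmTypes": {"Standard_D2_v2": 2, "Standard_D1_v2": 1}}
    ]
}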
#generate_template as a stdout
def simple_json_writer(data, debug_output=True): # pragma: no cover
data_str = json.dumps(data)
@@ -1308,6 +939,7 @@ def main(argv=sys.argv, json_writer=simple_json_writer): # pragma: no cover
provider_config, logger, fine = util.provider_config_from_environment()
data_dir = os.getenv('PRO_DATA_DIR', os.getcwd())
conf_dir = os.getenv('PRO_CONF_DIR', os.getcwd())
hostnamer = util.Hostnamer(provider_config.get("cyclecloud.hostnames.use_fqdn", True))
cluster_name = provider_config.get("cyclecloud.cluster.name")
@@ -1317,17 +949,22 @@ def main(argv=sys.argv, json_writer=simple_json_writer): # pragma: no cover
json_writer=json_writer,
terminate_requests=JsonStore("terminate_requests.json", data_dir),
creation_requests=JsonStore("create_requests.json", data_dir),
templates=JsonStore("templates.json", data_dir, formatted=True),
clock=true_gmt_clock)
provider.fine = fine
# every command has the format cmd -f input.json
cmd, ignore, input_json_path = argv[1:]
input_json = util.load_json(input_json_path)
logger.info("Arguments - %s %s %s", cmd, ignore, json.dumps(input_json))
if input_json.get("dry-run"):
provider.validate_template()
provider.dry_run = True
if cmd == "generate_template":
provider.generate_sample_template()
if cmd == "templates":
provider.templates()
elif cmd == "create_machines":
@@ -1349,6 +986,8 @@ def main(argv=sys.argv, json_writer=simple_json_writer): # pragma: no cover
elif cmd == "debug_completed_nodes":
provider.debug_completed_nodes()
# best effort cleanup.
provider.periodic_cleanup()
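Each entry point is invoked as 'cmd -f input.json' (per the comment above); a hypothetical input.json for create_machines (adding "dry-run": true exercises validate_template and the dry-run allocation path):

{
    "template": {"templateId": "execute", "machineCount": 4}
}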

View file

@@ -0,0 +1,54 @@
import os
import calendar
from util import JsonStore, init_logging
class RequestTrackingDb:
def __init__(self, config, cluster_name, clock):
self.config = config
self.cluster_name = cluster_name
self.clock = clock
# initialize in constructor so that cyclecloud_provider can initialize this
# with the proper log_level. In tests, this will use the default.
self.logger = init_logging()
default_dir = os.getenv('HF_WORKDIR', '/var/tmp')
self.db_dir = config.get('symphony.hostfactory.db_path', default_dir)
self.requests_db = JsonStore('azurecc_requests.json', self.db_dir)
def reset(self):
self.requests_db.clear()
def add_request(self, request_set):
with self.requests_db as db:
request_id = request_set['requestId']
db[request_id] = { 'requestId': request_id,
'timestamp': calendar.timegm(self.clock()),
'sets': [request_set] }
def get_requests(self):
pending_requests = self.requests_db.read()
return pending_requests
def get_request(self, request_id):
pending_requests = self.get_requests()
if request_id in pending_requests:
return pending_requests[request_id]
return None
def remove_request(self, request_id):
with self.requests_db as pending_requests:
pending_requests.pop(request_id)
def remove_requests(self, request_ids):
with self.requests_db as pending_requests:
for request_id in request_ids:
pending_requests.pop(request_id)
def request_completed(self, request_status):
request_id = request_status["requestId"]
request_envelope = self.get_request(request_id)
if request_envelope:
self.remove_request(request_id)
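RequestTrackingDb is the slimmed-down successor to the deleted CapacityTrackingDb: the same request-envelope bookkeeping, minus the capacity pause/limit machinery. A round-trip sketch (hypothetical values):

import time

tracker = RequestTrackingDb(config={}, cluster_name="demo", clock=time.gmtime)
tracker.add_request({"requestId": "req-123", "count": 4,
                     "definition": {"templateId": "execute"}})
assert tracker.get_request("req-123") is not None
tracker.request_completed({"requestId": "req-123"})  # drops the stored envelope
assert tracker.get_request("req-123") is None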

View file

@@ -255,7 +255,7 @@ class ProviderConfig:
jetpack_config = json.load(json_file)
# import jetpack
# jetpack_config = jetpack.config
except (ModuleNotFoundError,ImportError) as ex:
except (ModuleNotFoundError,ImportError,FileNotFoundError) as ex:
jetpack_config = {}
self.jetpack_config = jetpack_config
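Adding FileNotFoundError covers hosts where the jetpack config file simply does not exist, so the load degrades to an empty config. A sketch of the guarded pattern (the path is hypothetical; json as imported in this module):

try:
    with open("/opt/cycle/jetpack/config/node.json") as json_file:
        jetpack_config = json.load(json_file)
except (ModuleNotFoundError, ImportError, FileNotFoundError):
    jetpack_config = {}  # off-node / no jetpack: fall back to empty config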

View file

@@ -1,70 +1,18 @@
import os
import json
from collections import OrderedDict
import math
import unittest
import cluster
import logging
from unittest.mock import MagicMock, patch
import string
class WeightedTemplates():
def __init__(self, cluster_name, provider_config, logger=None):
cluster_name = provider_config.get("cyclecloud.cluster.name")
self.cluster=cluster.Cluster(cluster_name, provider_config, logger)
def __init__(self, logger=None):
self.logger = logger or logging.getLogger()
def create_machines(self, input_json, azurecc_template):
def parse_weighted_template(self, input_json, azurecc_template):
symphony_templates = azurecc_template
result = []
self.logger.debug("symphony_templates: %s", symphony_templates)
vm_types = {}
for template in symphony_templates:
self.logger.debug("template: %s", template)
self.logger.debug("templateId: %s", input_json["template"]["templateId"])
if template["templateId"] == input_json["template"]["templateId"]:
nodearray_name = template["attributes"]["nodearray"][1]
vm_types_weight = template["vmTypes"]
req_count = input_json["template"]["machineCount"]
vm_priority_dict = template["vmTypePriority"]
maxNumber = template["maxNumber"]
if vm_priority_dict is None or vm_priority_dict == {}:
vm_types = template["vmTypes"]
else:
vm_types = dict(sorted(vm_priority_dict.items(), key=lambda item: item[1], reverse=True))
print(vm_types)
for vm_type in vm_types:
avail_count = self.cluster.get_avail_count(vm_type, nodearray_name)
print("avail_count: " + str(avail_count))
vm_type_weight = vm_types_weight[vm_type]
print("vm type weight: " + str(vm_type_weight))
maxNumberWeight = math.floor(maxNumber/vm_type_weight)
print ("maxNumber: " + str(maxNumberWeight))
req_count_weight = math.ceil(req_count/vm_type_weight)
req_machines = min(req_count_weight, avail_count, maxNumberWeight)
if req_machines <= 0:
print("Reached maxNumber or no available machines")
continue
result.append((vm_type, req_machines))
print("Create vmType: " + vm_type + " with count: " + str(req_machines))
req_count = req_count - avail_count * vm_type_weight
maxNumber = maxNumber - avail_count * vm_type_weight
return result
def azurecc_template_generate(vmTypes, vmTypePriority, maxNumber):
azurecc_template = [{
"templateId": "execute",
"attributes" : {
"type": ["String", "X86_64"],
"nram": ["Numeric", "4096"],
"ncpus": ["Numeric", 1],
"nodearray": ["String", "execute"]
},
"vmTypes": {" Standard_D2_v2 ":2, " Standard_D1_v2 ":1},
"vmTypePriority": {" Standard_D2_v2 ":1000, " Standard_D1_v2 ":100},
"priceInfo": ["String", "price:0.1,billingTimeUnitType:prorated_hour,billingTimeUnitNumber:1,billingRoundoffType:unit"],
"rank": ["Numeric", "0"],
"maxNumber": 100
}]
azurecc_template[0]["vmTypes"] = vmTypes
azurecc_template[0]["vmTypePriority"] = vmTypePriority
azurecc_template[0]["maxNumber"] = maxNumber
return azurecc_template
return vm_types
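Putting the helpers together: azurecc_template_generate (the fixture above) builds a template list, and parse_weighted_template pulls the vmTypes map for the requested templateId. A sketch assuming no vmTypePriority is set (hypothetical counts):

templates = azurecc_template_generate(
    vmTypes={"Standard_D2_v2": 2, "Standard_D1_v2": 1},
    vmTypePriority=None, maxNumber=100)
wt = WeightedTemplates(logger=None)
request = {"template": {"templateId": "execute", "machineCount": 8}}
wt.parse_weighted_template(request, templates)
# -> {"Standard_D2_v2": 2, "Standard_D1_v2": 1}  (the template's own vmTypes)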