Fixes terminating nodes in case of expired create requests (#52)

* Fix cleanup of expired creation requests

* Update completed nodes only when HF asks

* updating template
This commit is contained in:
nidhi0622 2024-05-24 14:07:25 -05:00 коммит произвёл GitHub
Родитель c8cb1d9a29
Коммит 2a560499d0
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
4 изменённых файлов: 130 добавлений и 20 удалений

Просмотреть файл

@ -38,6 +38,11 @@ class InvalidCycleCloudVersionError(RuntimeError):
pass
def quiet_output():
    """Build a fresh, silent JsonOutputHandler.

    The handler is created with ``quiet=True`` so it does not print to
    stdout. A new instance is returned on every call because a handler can
    still only be invoked once.
    """
    silent_handler = JsonOutputHandler(quiet=True)
    return silent_handler
class CycleCloudProvider:
def __init__(self, config, cluster, hostnamer, stdout_handler, terminate_requests, creation_requests, templates, clock):
@ -599,8 +604,7 @@ class CycleCloudProvider:
try:
if to_shutdown:
logger.debug("Terminating returned machines: %s", to_shutdown)
quiet_output = JsonOutputHandler(quiet=True)
self.terminate_machines({"machines": to_shutdown}, quiet_output)
self.terminate_machines({"machines": to_shutdown}, quiet_output())
except:
logger.exception()
missing_from_cc = sym_existing_hostnames - cc_existing_hostnames
@ -621,7 +625,7 @@ class CycleCloudProvider:
return self.stdout_handler.handle(response)
@failureresponse({"requests": [], "status": RequestStates.running})
def _create_status(self, input_json, output_handler=None):
def _create_status(self, input_json, output_handler=None, update_completed_nodes=True):
"""
input:
{'requests': [{'requestId': 'req-123'}, {'requestId': 'req-234'}]}
@ -644,7 +648,6 @@ class CycleCloudProvider:
"""
output_handler = output_handler or self.stdout_handler
request_ids = [r["requestId"] for r in input_json["requests"]]
nodes_by_request_id = {}
@ -829,11 +832,13 @@ class CycleCloudProvider:
logger.warning("Out-of-capacity condition detected for machine_type %s in nodearray %s", machine_type, nodearray_name)
self.capacity_tracker.pause_capacity(nodearray_name=nodearray_name, machine_type=machine_type)
requests_store[request_id]["lastNumNodes"] = actual_machine_cnt
requests_store[request_id]["completedNodes"] = completed_nodes
# Bugfix: the periodic cleanup also calls this function, but nodes that reach the ready state after
# Symphony has stopped making status calls must not update completedNodes.
if update_completed_nodes:
requests_store[request_id]["completedNodes"] = completed_nodes
if requests_store[request_id].get("allNodes") is None:
requests_store[request_id]["allNodes"] = all_nodes
requests_store[request_id]["completed"] = len(nodes_by_request_id) == len(completed_nodes)
requests_store[request_id]["completed"] = len(requested_nodes) == len(completed_nodes)
active = len([x for x in machines if x["status"] == MachineStates.active])
building = len([x for x in machines if x["status"] == MachineStates.building])
@ -1099,9 +1104,8 @@ class CycleCloudProvider:
deletes = [x for x in input_json["requests"] if x["requestId"].startswith("delete-")]
create_response = {}
delete_response = {}
quiet_output = JsonOutputHandler(quiet=True)
if creates:
create_response = self._create_status({"requests": creates}, quiet_output)
create_response = self._create_status({"requests": creates}, quiet_output())
assert "status" in create_response
if deletes:
@ -1144,11 +1148,12 @@ class CycleCloudProvider:
for request_id, request in self.creation_json.read().items():
if request["allNodes"] is None:
never_queried_requests.append(request_id)
quiet_output = JsonOutputHandler(quiet=True)
if never_queried_requests:
try:
unrecoverable_request_ids = []
response = self._create_status({"requests": [{"requestId": r} for r in never_queried_requests]}, quiet_output)
response = self._create_status({"requests": [{"requestId": r} for r in never_queried_requests]},
quiet_output(),
update_completed_nodes=False)
for request in response["requests"]:
if request["status"] == RequestStates.complete_with_error and not request.get("_recoverable_", True):
@ -1178,7 +1183,10 @@ class CycleCloudProvider:
return
self._create_status({"requests": [{"requestId": r} for r in to_update_status]},
quiet_output)
quiet_output(),
# Nodes that were not ready by the time the request expired must be terminated, so do not
# record nodes that converge after the timeout as completed; they will be terminated as well.
update_completed_nodes=False)
with self.creation_json as requests_store:
to_shutdown = []
@ -1205,8 +1213,7 @@ class CycleCloudProvider:
return
if to_shutdown:
quiet_output = JsonOutputHandler(quiet=True)
self.terminate_machines({"machines": [{"machineId": x, "name": x} for x in to_shutdown]}, quiet_output)
self.terminate_machines({"machines": [{"machineId": x, "name": x} for x in to_shutdown]}, quiet_output())
for request in to_mark_complete:
request["lastUpdateTime"] = calendar.timegm(self.clock())
@ -1303,6 +1310,11 @@ class JsonOutputHandler:
if not self.quiet:
print(data_str)
return data
def try_handle(self, data, debug_output=True):
    """Forward ``data`` to ``handle`` unless this handler was already written.

    Returns whatever ``handle`` returns, or ``None`` when ``self.written`` is
    already set and the call is skipped.
    """
    if not self.written:
        return self.handle(data, debug_output)
    return None
def true_gmt_clock(): # pragma: no cover

Просмотреть файл

@ -224,13 +224,13 @@ def failureresponse(response):
with_message["message"] = message
# args[0] is self
return args[0].output_handler.handle(with_message)
return args[0].output_handler.try_handle(with_message)
except Exception as e:
logger.exception(str(e))
logger.debug(traceback.format_exc())
with_message = deepcopy(response)
with_message["message"] = str(e)
return args[0].output_handler.handle(with_message)
return args[0].output_handler.try_handle(with_message)
except SystemExit as se:
# NOTE: see terminate_machines for more info
logger.exception("System Exit occured intentionally write 0 json so symphony recovers")
@ -240,7 +240,7 @@ def failureresponse(response):
logger.debug(traceback.format_exc())
with_message = deepcopy(response)
with_message["message"] = traceback.format_exc()
return args[0].output_handler.handle(with_message)
return args[0].output_handler.try_handle(with_message)
return _wrap
return decorator

Просмотреть файл

@ -1,4 +1,4 @@
__version__ = "1.0.11"
__version__ = "1.0.12"
def get_version():

Просмотреть файл

@ -14,11 +14,15 @@ Autoscale = $Autoscale
SubnetId = $SubnetId
Region = $Region
KeyPairLocation = ~/.ssh/cyclecloud.pem
ShutdownPolicy = $ShutdownPolicy
MachineType = $ExecuteMachineType
[[[configuration]]]
run_list = recipe[cuser]
# Disable ip-XXXXXXXX hostname generation
cyclecloud.hosts.standalone_dns.enabled = ${NodeNameIsHostname==false}
cyclecloud.hosts.simple_vpc_dns.enabled = ${NodeNameIsHostname==false}
[[[network-interface eth0]]]
AssociatePublicIpAddress = false
@ -46,12 +50,29 @@ Autoscale = $Autoscale
[[[cluster-init symphony:master:2.0.0]]]
[[[volume boot]]]
Size = $MasterBootDiskSize
Ssd = $MasterBootDiskSSD
[[[volume vartmp]]]
Size = 2048
SSD = true
Mount = vartmp
Persistent = false
[[nodearray management]]
MachineType = $ManagementMachineType
InitialCoreCount = 0
MaxCoreCount = 64
[[[volume boot]]]
Size = $ExecuteBootDiskSize
# Bind to the Boolean ExecuteBootDiskSSD parameter (mirrors MasterBootDiskSSD on the master), not the size in GB
Ssd = $ExecuteBootDiskSSD
[[[configuration]]]
symphony.node_prefix = ${ifThenElse(NodeNamePrefix=="Cluster Prefix", StrJoin("-", toLower(ClusterName), ""), NodeNamePrefix)}
symphony.node_domain_suffix = $ZCSDomain
symphony.use_nodename_as_hostname = $NodeNameIsHostname
cyclecloud.discoverable = true
[[[configuration symphony]]]
@ -63,8 +84,16 @@ Autoscale = $Autoscale
MaxCoreCount = $MaxExecuteCoreCount
Interruptible = $UseLowPrio
Priority = 10
AwaitInstallationTimeout=10
[[[volume boot]]]
Size = $ExecuteBootDiskSize
# Bind to the Boolean ExecuteBootDiskSSD parameter (mirrors MasterBootDiskSSD on the master), not the size in GB
Ssd = $ExecuteBootDiskSSD
[[[configuration]]]
symphony.node_prefix = ${ifThenElse(NodeNamePrefix=="Cluster Prefix", StrJoin("-", toLower(ClusterName), ""), NodeNamePrefix)}
symphony.node_domain_suffix = $ZCSDomain
symphony.use_nodename_as_hostname = $NodeNameIsHostname
autoscaling.enabled = true
[[[cluster-init symphony:execute:2.0.0]]]
@ -75,12 +104,23 @@ Autoscale = $Autoscale
MaxCount = 400
Priority = 100
[[[volume boot]]]
Size = $ExecuteBootDiskSize
# Bind to the Boolean ExecuteBootDiskSSD parameter (mirrors MasterBootDiskSSD on the master), not the size in GB
Ssd = $ExecuteBootDiskSSD
[[nodearray persistent-execute]]
InitialCoreCount= 0
MaxCoreCount = 128
[[[volume boot]]]
Size = $ExecuteBootDiskSize
# Bind to the Boolean ExecuteBootDiskSSD parameter (mirrors MasterBootDiskSSD on the master), not the size in GB
Ssd = $ExecuteBootDiskSSD
[[[configuration]]]
symphony.node_prefix = ${ifThenElse(NodeNamePrefix=="Cluster Prefix", StrJoin("-", toLower(ClusterName), ""), NodeNamePrefix)}
symphony.node_domain_suffix = $ZCSDomain
symphony.use_nodename_as_hostname = $NodeNameIsHostname
cyclecloud.cluster.autoscale.stop_enabled = false
[[[cluster-init symphony:execute:2.0.0]]]
@ -133,7 +173,7 @@ Order = 10
Label = Mgmt Machine Type
Description = The machine type for the Symphony Management nodes
ParameterType = Cloud.MachineType
DefaultValue = Standard_E16_v3
DefaultValue = Standard_F2s_v2
[[[parameter ExecuteMachineType]]]
Label = Execute VM Type
@ -193,11 +233,58 @@ Order = 20
Description = "Specify the scheduling software, and base OS installed on all nodes, and optionally the cluster-init and chef versions from your Locker."
Order = 10
[[[parameter NodeNameIsHostname]]]
Label = Name As Hostname
Description = Should the hostname match the nodename for execute nodes?
ParameterType = Boolean
DefaultValue = false
[[[parameter ZCSDomain]]]
Label = Name of Domain
Description = Domain name of the node
ParameterType = String
# NOTE(review): "true" is an odd default for a String domain name (it is substituted into
# symphony.node_domain_suffix) — looks copied from a Boolean parameter; confirm the intended value
DefaultValue = true
[[[parameter NodeNamePrefix]]]
Label = Node Prefix
Description = Prefix for generated node names, i.e. "prefix-" generates prefix-nodearray-1. Use 'Cluster Prefix' to get $ClusterName-nodearray-1
ParameterType = StringList
Config.Plugin = pico.form.Dropdown
Config.FreeForm = true
DefaultValue = "Cluster Prefix"
Config.Entries := {[Value=""], [Value="Cluster Prefix"]}
Conditions.Hidden := NodeNameIsHostname != true
[[[parameter ImageName]]]
Label = Base OS
ParameterType = Cloud.Image
Config.OS = linux
DefaultValue = cycle.image.centos7
DefaultValue = cycle.image.ubuntu20
[[[parameter MasterBootDiskSize]]]
Label = Master Boot Disk Size
Description = Size of the master boot disk in GB
ParameterType = Integer
DefaultValue = 128
[[[parameter MasterBootDiskSSD]]]
Label = Master Boot Disk SSD
Description = Use SSD for master boot disk
ParameterType = Boolean
DefaultValue = true
[[[parameter ExecuteBootDiskSize]]]
Label = Exec Boot Disk Size
Description = Size of the execute boot disk in GB
ParameterType = Integer
DefaultValue = 128
[[[parameter ExecuteBootDiskSSD]]]
Label = Exec Boot Disk SSD
Description = Use SSD for execute boot disk
ParameterType = Boolean
DefaultValue = true
[[[parameter DefaultClusterInitSpecs]]]
Label = Default Cluster-Init Specs
@ -231,4 +318,15 @@ Order = 20
ParameterType = Boolean
Config.Label = Access master node from the Internet
[[[parameter ShutdownPolicy]]]
description = By default, autostop will Delete stopped VMS for lowest cost. Optionally, Stop/Deallocate the VMs for faster restart instead.
DefaultValue = Terminate
config.plugin = pico.control.AutoCompleteDropdown
[[[[list Config.Entries]]]]
Name = Terminate
Label = Terminate
[[[[list Config.Entries]]]]
Name = Deallocate
Label = Deallocate