Merge pull request #47 from Azure/bugfix/fix_leaked_nodes

Bugfix: Guarantee JSON output is written to standard output only once
This commit is contained in:
nidhi0622 2024-04-16 12:15:16 -05:00 коммит произвёл GitHub
Родитель 08419a29b9 d5f2e51304
Коммит 8eaf0d434c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
4 изменённых файлов: 59 добавлений и 55 удалений

Просмотреть файл

@ -40,11 +40,11 @@ class InvalidCycleCloudVersionError(RuntimeError):
class CycleCloudProvider:
def __init__(self, config, cluster, hostnamer, json_writer, terminate_requests, creation_requests, templates, clock):
def __init__(self, config, cluster, hostnamer, output_handler, terminate_requests, creation_requests, templates, clock):
self.config = config
self.cluster = cluster
self.hostnamer = hostnamer
self.json_writer = json_writer
self.output_handler = output_handler
self.terminate_json = terminate_requests
self.templates_json = templates
self.creation_json = creation_requests
@ -335,7 +335,7 @@ class CycleCloudProvider:
def templates(self):
try:
symphony_templates = self._update_templates()
return self.json_writer({"templates": symphony_templates, "message": "Get available templates success."}, debug_output=False)
return self.output_handler.handle({"templates": symphony_templates, "message": "Get available templates success."}, debug_output=False)
except:
logger.warning("Exiting Non-zero so that symphony will retry")
logger.exception("Could not get template_json")
@ -446,7 +446,7 @@ class CycleCloudProvider:
if not template:
available_templates = template_store.keys()
return self.json_writer({"requestId": request_id, "status": RequestStates.complete_with_error,
return self.output_handler.handle({"requestId": request_id, "status": RequestStates.complete_with_error,
"message": "Unknown templateId %s. Available %s" % (template_id, available_templates)})
machine_count = input_json["template"]["machineCount"]
@ -500,23 +500,23 @@ class CycleCloudProvider:
else:
logger.info("Requested %s instances of machine type %s in nodearray %s.", machine_count, machinetype_name, _get("nodearray"))
return self.json_writer({"requestId": request_id, "status": RequestStates.running,
return self.output_handler.handle({"requestId": request_id, "status": RequestStates.running,
"message": "Request instances success from Azure CycleCloud."})
except UserError as e:
logger.exception("Azure CycleCloud experienced an error and the node creation request failed. %s", e)
return self.json_writer({"requestId": request_id, "status": RequestStates.complete_with_error,
return self.output_handler.handle({"requestId": request_id, "status": RequestStates.complete_with_error,
"message": "Azure CycleCloud experienced an error: %s" % str(e)})
except ValueError as e:
logger.exception("Azure CycleCloud experienced an error and the node creation request failed. %s", e)
return self.json_writer({"requestId": request_id, "status": RequestStates.complete_with_error,
return self.output_handler.handle({"requestId": request_id, "status": RequestStates.complete_with_error,
"message": "Azure CycleCloud experienced an error: %s" % str(e)})
except OutOfCapacityError as e:
logger.warning("Request Id %s failed with out of capacity %s", request_id, e)
return self.json_writer({"requestId": request_id, "status": RequestStates.running,
return self.output_handler.handle({"requestId": request_id, "status": RequestStates.running,
"message": "Azure CycleCloud does not currently have capacity %s" % str(e)})
except Exception as e:
logger.exception("Azure CycleCloud experienced an error, though it may have succeeded: %s", e)
return self.json_writer({"requestId": request_id, "status": RequestStates.running,
return self.output_handler.handle({"requestId": request_id, "status": RequestStates.running,
"message": "Azure CycleCloud experienced an error, though it may have succeeded: %s" % str(e)})
@failureresponse({"requests": [], "status": RequestStates.complete_with_error})
@ -556,12 +556,12 @@ class CycleCloudProvider:
all_nodes = self.cluster.all_nodes()
except UserError as e:
logger.exception("Azure CycleCloud experienced an error and the get return request failed. %s", e)
return self.json_writer({"status": RequestStates.complete_with_error,
return self.output_handler.handle({"status": RequestStates.complete_with_error,
"requests": [],
"message": "Azure CycleCloud experienced an error: %s" % str(e)})
except ValueError as e:
logger.exception("Azure CycleCloud experienced an error and the get return request failed. %s", e)
return self.json_writer({"status": RequestStates.complete_with_error,
return self.output_handler.handle({"status": RequestStates.complete_with_error,
"requests": [],
"message": "Azure CycleCloud experienced an error: %s" % str(e)})
@ -599,7 +599,8 @@ class CycleCloudProvider:
try:
if to_shutdown:
logger.debug("Terminating returned machines: %s", to_shutdown)
self.terminate_machines({"machines": to_shutdown})
quiet_output = JsonOutputHandler(quiet=True)
self.terminate_machines({"machines": to_shutdown}, quiet_output)
except:
logger.exception()
missing_from_cc = sym_existing_hostnames - cc_existing_hostnames
@ -617,10 +618,10 @@ class CycleCloudProvider:
response["message"] = message
response["status"] = request_status
return self.json_writer(response)
return self.output_handler.handle(response)
@failureresponse({"requests": [], "status": RequestStates.running})
def _create_status(self, input_json, json_writer=None):
def _create_status(self, input_json, output_handler=None):
"""
input:
{'requests': [{'requestId': 'req-123'}, {'requestId': 'req-234'}]}
@ -642,7 +643,7 @@ class CycleCloudProvider:
'status': 'complete'}
"""
json_writer = json_writer or self.json_writer
output_handler = output_handler or self.output_handler
request_ids = [r["requestId"] for r in input_json["requests"]]
@ -659,7 +660,7 @@ class CycleCloudProvider:
exceptions.append(e)
# send HF request is still running so that it remembers request.
logger.exception("Azure CycleCloud experienced an error but reporting status as running %s. %s", request_id, e)
return json_writer({"status": RequestStates.running,
return output_handler.handle({"status": RequestStates.running,
"requests": [{"requestId": request_id, "status": RequestStates.running} for request_id in request_ids],
"message": "Azure CycleCloud is still requesting nodes"})
@ -667,7 +668,7 @@ class CycleCloudProvider:
if not nodes_by_request_id:
error_messages = " | ".join(list(set([str(e) for e in exceptions])))
return json_writer({"status": RequestStates.complete_with_error,
return output_handler.handle({"status": RequestStates.complete_with_error,
"requests": [{"requestId": request_id, "status": RequestStates.complete_with_error} for request_id in request_ids],
"message": "Azure CycleCloud experienced an error: %s" % error_messages})
@ -850,7 +851,7 @@ class CycleCloudProvider:
response["status"] = symphony.RequestStates.complete
return json_writer(response)
return output_handler.handle(response)
@failureresponse({"requests": [], "status": RequestStates.running})
def _deperecated_terminate_status(self, input_json):
@ -902,7 +903,7 @@ class CycleCloudProvider:
"machineId": machine_id})
response["requests"].append(request)
return self.json_writer(response)
return self.output_handler.handle(response)
for termination_id in termination_ids:
@ -961,7 +962,7 @@ class CycleCloudProvider:
response["status"] = request_status
return self.json_writer(response)
return response
@failureresponse({"status": RequestStates.running})
def terminate_status(self, input_json):
@ -1047,7 +1048,7 @@ class CycleCloudProvider:
@failureresponse({"status": RequestStates.complete_with_error})
def terminate_machines(self, input_json, json_writer=None):
def terminate_machines(self, input_json, output_handler=None):
"""
input:
{
@ -1061,7 +1062,7 @@ class CycleCloudProvider:
"status": "complete"
}
"""
json_writer = json_writer or self.json_writer
output_handler = output_handler or self.output_handler
logger.info("Terminate_machines request for : %s", input_json)
request_id = "delete-%s" % str(uuid.uuid4())
request_id_persisted = False
@ -1100,7 +1101,7 @@ class CycleCloudProvider:
if not request_id_persisted:
return sys.exit(1)
return json_writer({"message": message,
return output_handler.handle({"message": message,
"requestId": request_id,
"status": request_status,
"machines": [ {
@ -1119,22 +1120,21 @@ class CycleCloudProvider:
except Exception as e:
logger.exception(str(e))
if request_id_persisted:
return json_writer({"status": RequestStates.running, "requestId": request_id})
return json_writer({"status": RequestStates.complete_with_error, "requestId": request_id, "message": str(e)})
return output_handler.handle({"status": RequestStates.running, "requestId": request_id})
return output_handler.handle({"status": RequestStates.complete_with_error, "requestId": request_id, "message": str(e)})
def status(self, input_json):
'''
Kludge: can't seem to get provider.json to reliably call the correct request action.
'''
json_writer = self.json_writer
self.json_writer = lambda x: x
output_handler = self.output_handler
creates = [x for x in input_json["requests"] if not x["requestId"].startswith("delete-")]
deletes = [x for x in input_json["requests"] if x["requestId"].startswith("delete-")]
create_response = {}
delete_response = {}
quiet_output = JsonOutputHandler(quiet=True)
if creates:
create_response = self._create_status({"requests": creates})
create_response = self._create_status({"requests": creates}, quiet_output)
assert "status" in create_response
if deletes:
@ -1166,7 +1166,7 @@ class CycleCloudProvider:
response = {"status": combined_status,
"requests": create_response.get("requests", []) + delete_response.get("requests", [])
}
return json_writer(response)
return output_handler.handle(response)
def _terminate_expired_requests(self):
@ -1177,12 +1177,11 @@ class CycleCloudProvider:
for request_id, request in self.creation_json.read().items():
if request["allNodes"] is None:
never_queried_requests.append(request_id)
quiet_output = JsonOutputHandler(quiet=True)
if never_queried_requests:
try:
unrecoverable_request_ids = []
response = self._create_status({"requests": [{"requestId": r} for r in never_queried_requests]}, lambda input_json, **ignore: input_json)
response = self._create_status({"requests": [{"requestId": r} for r in never_queried_requests]}, quiet_output)
for request in response["requests"]:
if request["status"] == RequestStates.complete_with_error and not request.get("_recoverable_", True):
@ -1212,7 +1211,7 @@ class CycleCloudProvider:
return
self._create_status({"requests": [{"requestId": r} for r in to_update_status]},
lambda input_json, **ignore: input_json)
quiet_output)
with self.creation_json as requests_store:
to_shutdown = []
@ -1239,10 +1238,8 @@ class CycleCloudProvider:
return
if to_shutdown:
original_writer = self.json_writer
self.json_writer = lambda data, debug_output=True: 0
self.terminate_machines({"machines": [{"machineId": x, "name": x} for x in to_shutdown]})
self.json_writer = original_writer
quiet_output = JsonOutputHandler(quiet=True)
self.terminate_machines({"machines": [{"machineId": x, "name": x} for x in to_shutdown]}, quiet_output)
for request in to_mark_complete:
request["lastUpdateTime"] = calendar.timegm(self.clock())
@ -1325,12 +1322,20 @@ def _placement_groups(config):
return ["pg%s" % x for x in range(num_placement_groups)]
def simple_json_writer(data, debug_output=True):  # pragma: no cover
    """Print *data* as JSON on stdout and return *data* unchanged.

    When ``debug_output`` is true, the serialized text is also logged at
    debug level before it is printed.
    """
    serialized = json.dumps(data)
    if debug_output:
        logger.debug("Response: %s", serialized)
    print(serialized)
    return data
class JsonOutputHandler:
    """Serialize a response to JSON and print it to stdout at most once.

    A non-quiet handler refuses a second write, so a single invocation can
    never emit two JSON documents on stdout. A quiet handler never prints, so
    it may be reused for any number of internal calls.
    """

    def __init__(self, quiet=False) -> None:
        # True once handle() has successfully serialized a response.
        self.written = False
        # When True, handle() serializes and logs but prints nothing.
        self.quiet = quiet

    def handle(self, data, debug_output=True):  # pragma: no cover
        """Serialize *data*, optionally log it, print it unless quiet, and return it.

        Args:
            data: JSON-serializable response object.
            debug_output: when true, log the serialized response at debug level.

        Returns:
            ``data`` itself, unchanged.

        Raises:
            AssertionError: if this non-quiet handler has already written a
                response to stdout.
            TypeError / ValueError: propagated from ``json.dumps`` when *data*
                cannot be serialized; the handler stays usable afterwards.
        """
        # Explicit raise instead of an `assert` statement: asserts are removed
        # under `python -O`, which would silently drop the write-once guard.
        # AssertionError is kept so the exception type seen by callers is the same.
        # Quiet handlers are exempt because they never touch stdout.
        if self.written and not self.quiet:
            raise AssertionError("JSON response was already written to stdout")

        # Serialize before marking the handler as used, so that a serialization
        # failure does not consume the single allowed write and an error
        # response can still be emitted through the same handler.
        data_str = json.dumps(data)
        self.written = True

        if debug_output:
            logger.debug("Response: %s", data_str)
        if not self.quiet:
            print(data_str)
        return data
def true_gmt_clock(): # pragma: no cover
@ -1338,7 +1343,7 @@ def true_gmt_clock(): # pragma: no cover
return time.gmtime()
def main(argv=sys.argv, json_writer=simple_json_writer): # pragma: no cover
def main(argv=sys.argv): # pragma: no cover
try:
global logger
@ -1347,11 +1352,12 @@ def main(argv=sys.argv, json_writer=simple_json_writer): # pragma: no cover
data_dir = os.getenv('PRO_DATA_DIR', os.getcwd())
hostnamer = util.Hostnamer(provider_config.get("cyclecloud.hostnames.use_fqdn", True))
cluster_name = provider_config.get("cyclecloud.cluster.name")
output_handler = JsonOutputHandler()
provider = CycleCloudProvider(config=provider_config,
cluster=cluster.Cluster(cluster_name, provider_config, logger),
hostnamer=hostnamer,
json_writer=json_writer,
output_handler=output_handler,
terminate_requests=JsonStore("terminate_requests.json", data_dir),
creation_requests=JsonStore("create_requests.json", data_dir),
templates=JsonStore("templates.json", data_dir, formatted=True),

Просмотреть файл

@ -223,13 +223,14 @@ def failureresponse(response):
pass
with_message["message"] = message
return args[0].json_writer(with_message)
# args[0] is self
return args[0].output_handler.handle(with_message)
except Exception as e:
logger.exception(str(e))
logger.debug(traceback.format_exc())
with_message = deepcopy(response)
with_message["message"] = str(e)
return args[0].json_writer(with_message)
return args[0].output_handler.handle(with_message)
except SystemExit as se:
# NOTE: see terminate_machines for more info
logger.exception("System Exit occured intentionally write 0 json so symphony recovers")
@ -239,7 +240,7 @@ def failureresponse(response):
logger.debug(traceback.format_exc())
with_message = deepcopy(response)
with_message["message"] = traceback.format_exc()
return args[0].json_writer(with_message)
return args[0].output_handler.handle(with_message)
return _wrap
return decorator

Просмотреть файл

@ -1,4 +1,4 @@
__version__ = "1.0.10"
__version__ = "1.0.11"
def get_version():

Просмотреть файл

@ -162,10 +162,6 @@ class RequestsStoreInMem:
def __exit__(self, *args):
pass
def json_writer(data, debug_output=False):
    """Test stand-in for the provider's JSON writer: return *data* as-is, printing nothing."""
    return data
class TestHostFactory(unittest.TestCase):
@ -297,7 +293,8 @@ class TestHostFactory(unittest.TestCase):
"buckets": [a4bucket, a8bucket]}]})
epoch_clock = MockClock((1970, 1, 1, 0, 0, 0))
hostnamer = MockHostnamer()
provider = cyclecloud_provider.CycleCloudProvider(provider_config, cluster, hostnamer, json_writer,
output_handler = cyclecloud_provider.JsonOutputHandler(quiet=True)
provider = cyclecloud_provider.CycleCloudProvider(provider_config, cluster, hostnamer, output_handler,
terminate_requests=RequestsStoreInMem(),
creation_requests=RequestsStoreInMem(),
templates=RequestsStoreInMem(),