Updated unit test and addressed review comments

nidhi0622 2024-03-12 15:55:11 -05:00
Parent 9963f9547f
Commit 1981be2f9a
2 changed files with 76 additions and 56 deletions

View file

@@ -8,7 +8,6 @@ import os
import pprint
import sys
import uuid
import requests
from builtins import str
@@ -656,15 +655,15 @@ class CycleCloudProvider:
except Exception as e:
if "No operation found for request id" in str(e):
nodes_by_request_id[request_id] = {"nodes": []}
elif isinstance(e, requests.exceptions.ConnectionError):
else:
exceptions.append(e)
# Report the request to HF as still running so that it remembers the request.
logger.warning(f"Connection error occurred during create_status call {request_id}; reported as running")
logger.exception("Azure CycleCloud experienced an error but reporting status as running %s. %s", request_id, e)
return json_writer({"status": RequestStates.running,
"requests": [{"requestId": request_id, "status": RequestStates.running} for request_id in request_ids],
"message": "Azure CycleCloud is still requesting nodes"})
else:
exceptions.append(e)
logger.exception("Azure CycleCloud experienced an error and the node creation request failed for request_id %s. %s", request_id, e)
if not nodes_by_request_id:
error_messages = " | ".join(list(set([str(e) for e in exceptions])))
@@ -859,6 +858,7 @@ class CycleCloudProvider:
# executing is a terminal state.
response = {"requests": []}
request_status = RequestStates.complete
# "requests" needs to be a [] when we return
with self.terminate_json as terminate_requests:
@@ -866,51 +866,48 @@ class CycleCloudProvider:
termination_ids = [r["requestId"] for r in input_json["requests"] if r["requestId"]]
machines_to_terminate = []
try:
for termination_id in termination_ids:
if termination_id in terminate_requests:
termination = terminate_requests[termination_id]
if not termination.get("terminated"):
for machine_id, name in termination["machines"].items():
machines_to_terminate.append({"machineId": machine_id, "name": name})
if machines_to_terminate:
logger.warning("Re-attempting termination of nodes %s", machines_to_terminate)
try:
self.cluster.terminate(machines_to_terminate)
except requests.exceptions.ConnectionError:
logger.warning("Could not terminate machines %s due to connection exception, sending status as running", machines_to_terminate)
request_status = RequestStates.running
response["status"] = request_status
response_machines = []
for termination_id in termination_ids:
# if we don't know the termination_id then we report an empty list of machines
request = {"requestId": termination_id,
"machines": response_machines}
# report machines are in deleting state so HF remembers the request
if termination_id in terminate_requests:
termination_request = terminate_requests.get(termination_id)
machines = termination_request.get("machines", {})
if machines:
for machine_id, hostname in machines.items():
response_machines.append({"name": hostname,
"status": MachineStates.deleting,
"result": MachineResults.executing,
"machineId": machine_id})
response["requests"].append(request)
return self.json_writer(response)
for termination_id in termination_ids:
if termination_id in terminate_requests:
termination = terminate_requests[termination_id]
termination["terminated"] = True
except Exception:
request_status = RequestStates.running
logger.exception("Could not terminate nodes with ids %s. Will retry", machines_to_terminate)
for termination_id in termination_ids:
if termination_id in terminate_requests:
termination = terminate_requests[termination_id]
if not termination.get("terminated"):
for machine_id, name in termination["machines"].items():
machines_to_terminate.append({"machineId": machine_id, "name": name})
if machines_to_terminate:
logger.warning("Re-attempting termination of nodes %s", machines_to_terminate)
try:
self.cluster.terminate(machines_to_terminate)
except Exception:
# Send HF request status as running so it remembers the request
logger.exception("Could not terminate machines %s due to an exception, reported status as running", machines_to_terminate)
request_status = RequestStates.running
response["status"] = request_status
response_machines = []
for termination_id in termination_ids:
# if we don't know the termination_id then we report an empty list of machines
request = {"requestId": termination_id,
"machines": response_machines}
# report machines are in deleting state so HF remembers the request
if termination_id in terminate_requests:
termination_request = terminate_requests.get(termination_id)
machines = termination_request.get("machines", {})
if machines:
for machine_id, hostname in machines.items():
response_machines.append({"name": hostname,
"status": MachineStates.deleting,
"result": MachineResults.executing,
"machineId": machine_id})
response["requests"].append(request)
return self.json_writer(response)
for termination_id in termination_ids:
if termination_id in terminate_requests:
termination = terminate_requests[termination_id]
termination["terminated"] = True
for termination_id in termination_ids:
request_status = RequestStates.complete
@@ -1079,8 +1076,9 @@ class CycleCloudProvider:
terminations[request_id] = {"id": request_id, "machines": machines, "requestTime": calendar.timegm(self.clock())}
request_id_persisted = True
except:
# NOTE: Here we will not exit immediately but exit after an attempted shutdown
logger.exception("Could not open terminate.json")
# NOTE: Here we will not exit immediately but exit after an attempted shutdown
request_status = RequestStates.complete
message = "CycleCloud is terminating the VM(s)"
@@ -1362,8 +1360,8 @@ def main(argv=sys.argv, json_writer=simple_json_writer): # pragma: no cover
cmd, ignore, input_json_path = argv[1:]
input_json = util.load_json(input_json_path)
logger.info("Arguments - %s %s %s", cmd, ignore, input_json_path)
operation_id = int(time.time())
logger.info("BEGIN %s %s - %s %s", operation_id, cmd, ignore, input_json_path)
logger.debug("Input: %s", json.dumps(input_json))
if cmd == "templates":
@@ -1401,7 +1399,7 @@ def main(argv=sys.argv, json_writer=simple_json_writer): # pragma: no cover
import traceback
traceback.print_exc()
finally:
logger.info("End Arguments - %s %s %s", cmd, ignore, input_json_path)
logger.info("END %s %s - %s %s", operation_id, cmd, ignore, input_json_path)
if __name__ == "__main__":
main() # pragma: no cover

View file

@@ -45,6 +45,7 @@ class MockCluster:
self._nodes = {}
self.raise_during_termination = False
self.raise_during_add_nodes = False
self.raise_during_nodes = False
# {<MachineType>: <ActualCapacity != MaxCount>}
# {'standard_a8': 1} => max of 1 VM will be returned by add_nodes regardless of requested count
@@ -100,9 +101,12 @@ class MockCluster:
for node in self.inodes(RequestId=request_ids):
node['InstanceId'] = '%s_%s' % (node["RequestId"], instance_count)
node['State'] = 'Started'
node['Status'] = 'Started'
node['PrivateIp'] = '10.0.0.%s' % instance_count
def nodes(self, request_ids=[]):
if self.raise_during_nodes:
raise RuntimeError("raise_during_nodes")
ret = {}
for request_id in request_ids:
@@ -333,9 +337,19 @@ class TestHostFactory(unittest.TestCase):
self.assertEqual(0, len(request_status1["machines"]))
self.assertEqual(RequestStates.running, request_status2["status"])
self.assertEqual(0, len(request_status2["machines"]))
# Test the case where an exception is raised during the status call.
provider.cluster.raise_during_nodes = True
request_status = provider.status({'requests': [request1, request2]})
self.assertEqual(RequestStates.running, request_status["status"])
request_status1 = find_request_status(request_status, request1)
request_status2 = find_request_status(request_status, request2)
self.assertEqual(RequestStates.running, request_status1["status"])
self.assertEqual(RequestStates.running, request_status2["status"])
provider.cluster.complete_node_startup([request1['requestId'], request2['requestId']])
provider.cluster.raise_during_nodes = False
request_status = provider.status({'requests': [request1, request2]})
self.assertEqual(RequestStates.complete, request_status["status"])
request_status1 = find_request_status(request_status, request1)
@@ -412,6 +426,14 @@ class TestHostFactory(unittest.TestCase):
status_response = provider.terminate_status({"machines": [{"machineId": "missing", "name": "missing-123"}]})
self.assertEqual({'requests': [], 'status': 'complete'}, status_response)
# Test that status is reported as running so that HF keeps requesting the status of the request.
provider.cluster.raise_during_termination = True
term_response = provider.terminate_machines({"machines": [{"name": "host-123", "machineId": "id-123"}]})
failed_request_id = term_response["requestId"]
status_response = provider.status({"requests": [{"requestId": failed_request_id}]})
self.assertEqual(status_response["status"], "running")
def test_terminate_error(self):
provider = self._new_provider()
term_response = provider.terminate_machines({"machines": [{"name": "host-123", "machineId": "id-123"}]})