Merge branch 'master' into feature/weighted_template

Commit 3b61643c47
@@ -18,6 +18,7 @@ from request_tracking_db import RequestTrackingDb
from util import JsonStore, failureresponse
import util
import symphony
import version
from cyclecliwrapper import UserError
@@ -321,9 +322,17 @@ class CycleCloudProvider:
        except Exception as e:
            if "No operation found for request id" in str(e):
                nodes_by_request_id[request_id] = {"nodes": []}
            elif "Could not find request id" in str(e):
                nodes_by_request_id[request_id] = []
            else:
                exceptions.append(e)
                logger.exception("Azure CycleCloud experienced an error and the node creation request failed for request_id %s. %s", request_id, e)
                # Report the HF request as still running so that HF remembers it.
                logger.exception("Azure CycleCloud experienced an error but reporting status as running %s. %s", request_id, e)
                return json_writer({"status": RequestStates.running,
                                    "requests": [{"requestId": request_id, "status": RequestStates.running} for request_id in request_ids],
                                    "message": "Azure CycleCloud is still requesting nodes"})

        if not nodes_by_request_id:
            error_messages = " | ".join(list(set([str(e) for e in exceptions])))
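For context on the hunk above, a minimal self-contained sketch of the fallback it introduces; the helper name and return convention are illustrative, not from this repo. Known "not found" errors map to empty node lists, while any unclassified error makes the provider answer HostFactory with a running status so the request keeps being polled instead of being dropped.

import logging
logger = logging.getLogger("cyclecloud_provider")

def classify_status_error(e, request_id, nodes_by_request_id):
    # Illustrative sketch of the error classification above (assumed helper name).
    msg = str(e)
    if "No operation found for request id" in msg:
        # CycleCloud has no operation yet: report an empty node list for this request.
        nodes_by_request_id[request_id] = {"nodes": []}
        return True
    if "Could not find request id" in msg:
        nodes_by_request_id[request_id] = []
        return True
    # Unclassified failure: the caller logs it and reports every request as
    # "running" so HostFactory retries the status call later.
    return False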
@@ -335,9 +344,6 @@ class CycleCloudProvider:
        response = {"requests": []}

        unknown_state_count = 0
        requesting_count = 0

        for request_id, requested_nodes in nodes_by_request_id.items():
            request_status = RequestStates.complete
            unknown_state_count = 0
@@ -503,6 +509,7 @@ class CycleCloudProvider:
        # executing is a terminal state.

        response = {"requests": []}
        request_status = RequestStates.complete
        # needs to be a [] when we return
        with self.terminate_json as terminate_requests:
@@ -510,27 +517,50 @@ class CycleCloudProvider:
            termination_ids = [r["requestId"] for r in input_json["requests"] if r["requestId"]]
            machines_to_terminate = []
            try:
                for termination_id in termination_ids:
                    if termination_id in terminate_requests:
                        termination = terminate_requests[termination_id]
                        if not termination.get("terminated"):
                            for machine_id, name in termination["machines"].items():
                                machines_to_terminate.append({"machineId": machine_id, "name": name})

                if machines_to_terminate:
                    logger.warning("Re-attempting termination of nodes %s", machines_to_terminate)
                    self.cluster.shutdown_nodes(machines_to_terminate)

                for termination_id in termination_ids:
                    if termination_id in terminate_requests:
                        termination = terminate_requests[termination_id]
                        termination["terminated"] = True

            except Exception:
                request_status = RequestStates.running
                logger.exception("Could not terminate nodes with ids %s. Will retry", machines_to_terminate)

            for termination_id in termination_ids:
                if termination_id in terminate_requests:
                    termination = terminate_requests[termination_id]
                    if not termination.get("terminated"):
                        for machine_id, name in termination["machines"].items():
                            machines_to_terminate.append({"machineId": machine_id, "name": name})

            if machines_to_terminate:
                logger.warning("Re-attempting termination of nodes %s", machines_to_terminate)
                try:
                    self.cluster.shutdown(machines_to_terminate)
                except Exception:
                    # Send HF the request status as running so it remembers the request
                    logger.exception("Could not terminate machines %s due to an exception, reported status as running", machines_to_terminate)
                    request_status = RequestStates.running

            response["status"] = request_status
            response_machines = []

            for termination_id in termination_ids:
                response_machines = []
                # if we don't know the termination_id then we report an empty list of machines
                request = {"requestId": termination_id,
                           "machines": response_machines}
                request["status"] = request_status
                # report machines as being in the deleting state so HF remembers the request
                if termination_id in terminate_requests:
                    termination_request = terminate_requests.get(termination_id)
                    machines = termination_request.get("machines", {})
                    if machines:
                        for machine_id, hostname in machines.items():
                            response_machines.append({"name": hostname,
                                                      "status": MachineStates.deleting,
                                                      "result": MachineResults.executing,
                                                      "machineId": machine_id})
                response["requests"].append(request)

            return self.json_writer(response)

            for termination_id in termination_ids:
                if termination_id in terminate_requests:
                    termination = terminate_requests[termination_id]
                    termination["terminated"] = True

            for termination_id in termination_ids:

                request_status = RequestStates.complete
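A condensed sketch of the retry contract the rewritten block above establishes (the store layout and function name are assumed from the surrounding diff): a termination record is flagged terminated only after shutdown succeeds, so a failed CycleCloud call leaves it pending for the next pass, and the response is reported as running in the meantime.

def retry_pending_terminations(terminate_requests, cluster, logger):
    # terminate_requests: {requestId: {"machines": {machineId: hostname}, "terminated": bool}}
    machines_to_terminate = []
    for termination in terminate_requests.values():
        if not termination.get("terminated"):
            for machine_id, name in termination["machines"].items():
                machines_to_terminate.append({"machineId": machine_id, "name": name})
    if not machines_to_terminate:
        return "complete"
    try:
        cluster.shutdown(machines_to_terminate)
    except Exception:
        logger.exception("Could not terminate machines %s, will retry", machines_to_terminate)
        return "running"  # HF keeps polling, so the request is not lost
    for termination in terminate_requests.values():
        termination["terminated"] = True  # flag only after a successful shutdown
    return "complete"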
@@ -699,8 +729,9 @@ class CycleCloudProvider:
                terminations[request_id] = {"id": request_id, "machines": machines, "requestTime": calendar.timegm(self.clock())}
                request_id_persisted = True
        except:
            # NOTE: Here we will not exit immediately but exit after an attempted shutdown
            logger.exception("Could not open terminate.json")
            # NOTE: Here we will not exit immediately but exit after an attempted shutdown

        request_status = RequestStates.complete
        message = "CycleCloud is terminating the VM(s)"
@@ -980,7 +1011,8 @@ def main(argv=sys.argv, json_writer=simple_json_writer):  # pragma: no cover
        input_json = util.load_json(input_json_path)

        logger.info("BEGIN - %s %s %s", cmd, ignore, input_json_path)
        operation_id = int(time.time())
        logger.info("BEGIN %s %s - %s %s", operation_id, cmd, ignore, input_json_path)
        logger.debug("Input: %s", json.dumps(input_json))

        if cmd == "validate_templates" or input_json.get("dry-run"):
@@ -988,7 +1020,10 @@ def main(argv=sys.argv, json_writer=simple_json_writer):  # pragma: no cover
            provider.dry_run = True

        if cmd == "generate_templates":
            provider.generate_sample_template()

        if cmd == "templates":
            logger.info("Using azurecc version %s", version.get_version())
            provider.templates()
        elif cmd == "create_machines":
            provider.create_machines(input_json)
@@ -1012,7 +1047,7 @@ def main(argv=sys.argv, json_writer=simple_json_writer):  # pragma: no cover

        # best effort cleanup.
        provider.periodic_cleanup()
        provider.periodic_cleanup(skip_templates=(cmd == "templates"))

    except ImportError as e:
        logger.exception(str(e))
@@ -1024,7 +1059,7 @@ def main(argv=sys.argv, json_writer=simple_json_writer):  # pragma: no cover
        import traceback
        traceback.print_exc()
    finally:
        logger.info("END - %s %s %s", cmd, ignore, input_json_path)
        logger.info("END %s %s - %s %s", operation_id, cmd, ignore, input_json_path)

if __name__ == "__main__":
    main()  # pragma: no cover
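Taken together, the main() hunks add an operation id that pairs BEGIN/END log lines and make template cleanup conditional. A runnable sketch of that flow, with the dispatch reduced to the two commands shown above; everything beyond the method names visible in the diff is illustrative:

import logging
import time

logger = logging.getLogger("cyclecloud_provider")

def run(cmd, provider, input_json):
    # One id per invocation: interleaved runs can be paired up in the logs.
    operation_id = int(time.time())
    logger.info("BEGIN %s %s", operation_id, cmd)
    try:
        if cmd == "templates":
            provider.templates()
        elif cmd == "create_machines":
            provider.create_machines(input_json)
        # Plausible rationale (not stated in the diff): the templates command has
        # just refreshed the templates store itself, so it skips that part of cleanup.
        provider.periodic_cleanup(skip_templates=(cmd == "templates"))
    finally:
        logger.info("END %s %s", operation_id, cmd)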
@@ -45,6 +45,7 @@ class MockCluster:
        self._nodes = {}
        self.raise_during_termination = False
        self.raise_during_add_nodes = False
        self.raise_during_nodes = False

        # {<MachineType>: <ActualCapacity != MaxCount>}
        # {'standard_a8': 1} => max of 1 VM will be returned by add_nodes regardless of requested count
@@ -100,9 +101,12 @@ class MockCluster:
        for node in self.inodes(RequestId=request_ids):
            node['InstanceId'] = '%s_%s' % (node["RequestId"], instance_count)
            node['State'] = 'Started'
            node['Status'] = 'Started'
            node['PrivateIp'] = '10.0.0.%s' % instance_count

    def nodes(self, request_ids=[]):
        if self.raise_during_nodes:
            raise RuntimeError("raise_during_nodes")
        ret = {}

        for request_id in request_ids:
@@ -150,6 +154,9 @@ class RequestsStoreInMem:
    def read(self):
        return self.requests

    def write(self, data):
        self.requests = deepcopy(data)

    def __enter__(self):
        return self.requests
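RequestsStoreInMem stands in for the JsonStore used in production: read/write plus context-manager access for in-place mutation. A complete minimal version of that contract follows; the __exit__ shown here is assumed, since the diff does not include it, and the class name is illustrative.

from copy import deepcopy

class InMemStore:
    """Minimal JsonStore-style test double (illustrative, not the repo's exact class)."""

    def __init__(self):
        self.requests = {}

    def read(self):
        return self.requests

    def write(self, data):
        self.requests = deepcopy(data)

    def __enter__(self):
        # Hand out the live dict so `with store as reqs:` mutates it in place.
        return self.requests

    def __exit__(self, exc_type, exc_value, traceback):
        # Nothing to persist in memory; mutations already landed on self.requests.
        return False

# Usage mirroring the provider code:
store = InMemStore()
with store as reqs:
    reqs["req-1"] = {"terminated": False}
assert store.read()["req-1"] == {"terminated": False}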
@@ -295,7 +302,11 @@ class TestHostFactory(unittest.TestCase):
                                              "buckets": [a4bucket, a8bucket]}]})
        epoch_clock = MockClock((1970, 1, 1, 0, 0, 0))
        hostnamer = MockHostnamer()
        provider = cyclecloud_provider.CycleCloudProvider(provider_config, cluster, hostnamer, json_writer, RequestsStoreInMem(), RequestsStoreInMem(), epoch_clock)
        provider = cyclecloud_provider.CycleCloudProvider(provider_config, cluster, hostnamer, json_writer,
                                                          terminate_requests=RequestsStoreInMem(),
                                                          creation_requests=RequestsStoreInMem(),
                                                          templates=RequestsStoreInMem(),
                                                          clock=epoch_clock)
        provider.capacity_tracker.reset()
        return provider
@@ -331,9 +342,19 @@ class TestHostFactory(unittest.TestCase):
        self.assertEqual(0, len(request_status1["machines"]))
        self.assertEqual(RequestStates.running, request_status2["status"])
        self.assertEqual(0, len(request_status2["machines"]))

        # Test the case where an exception is raised during the status call.
        provider.cluster.raise_during_nodes = True
        request_status = provider.status({'requests': [request1, request2]})
        self.assertEqual(RequestStates.running, request_status["status"])
        request_status1 = find_request_status(request_status, request1)
        request_status2 = find_request_status(request_status, request2)
        self.assertEqual(RequestStates.running, request_status1["status"])
        self.assertEqual(RequestStates.running, request_status2["status"])

        provider.cluster.complete_node_startup([request1['requestId'], request2['requestId']])

        provider.cluster.raise_during_nodes = False
        request_status = provider.status({'requests': [request1, request2]})
        self.assertEqual(RequestStates.complete, request_status["status"])
        request_status1 = find_request_status(request_status, request1)
@@ -391,7 +412,8 @@ class TestHostFactory(unittest.TestCase):
        self.assertEqual({'status': 'complete', 'requests': [{'status': 'complete', 'message': '', 'requestId': 'missing', 'machines': []}]}, status_response)

        status_response = provider.status({"requests": [{"requestId": "delete-missing"}]})
        self.assertEqual({'status': 'running', 'requests': [{'status': 'running', "message": "Unknown termination request id.", 'requestId': 'delete-missing', 'machines': []}]}, status_response)
        # self.assertEqual({'status': 'running', 'requests': [{'status': 'running', "message": "Unknown termination request id.", 'requestId': 'delete-missing', 'machines': []}]}, status_response)
        self.assertEqual({'status': 'running', 'requests': [{'status': 'complete_with_error', "message": "Warning: Ignoring unknown termination request id.", 'requestId': 'delete-missing', 'machines': []}]}, status_response)

    def test_terminate_status(self):
        provider = self._new_provider()
@@ -409,6 +431,26 @@ class TestHostFactory(unittest.TestCase):
        status_response = provider.terminate_status({"machines": [{"machineId": "missing", "name": "missing-123"}]})
        self.assertEqual({'requests': [], 'status': 'complete'}, status_response)

        # Verify the status is reported as running so that HF keeps polling the request.
        provider.cluster.raise_during_termination = True
        term_response = provider.terminate_machines({"machines": [{"name": "host-123", "machineId": "id-123"}, {"name": "host-231", "machineId": "id-231"}]})
        failed_request_id = term_response["requestId"]
        term_response = provider.terminate_machines({"machines": [{"name": "host-123", "machineId": "id-1234"}]})
        failed_request_id2 = term_response["requestId"]
        status_response = provider.status({"requests": [{"requestId": failed_request_id}, {"requestId": failed_request_id2}]})
        self.assertEqual(status_response["status"], "running")
        self.assertEqual(len(status_response["requests"]), 2)
        self.assertEqual(status_response["requests"][0]["requestId"], failed_request_id)
        self.assertEqual(status_response["requests"][0]["machines"][0]["machineId"], "id-123")
        self.assertEqual(status_response["requests"][0]["machines"][1]["machineId"], "id-231")
        self.assertEqual(len(status_response["requests"][0]["machines"]), 2)
        self.assertEqual(status_response["requests"][0]["status"], "running")
        self.assertEqual(status_response["requests"][1]["requestId"], failed_request_id2)
        self.assertEqual(status_response["requests"][1]["machines"][0]["machineId"], "id-1234")
        self.assertEqual(len(status_response["requests"][1]["machines"]), 1)
        self.assertEqual(status_response["requests"][1]["status"], "running")

    def test_terminate_error(self):
        provider = self._new_provider()
        term_response = provider.terminate_machines({"machines": [{"name": "host-123", "machineId": "id-123"}]})
|
@ -473,6 +515,8 @@ class TestHostFactory(unittest.TestCase):
|
|||
self.assertEqual(100, provider.templates()["templates"][0]["maxNumber"])
|
||||
|
||||
|
||||
|
||||
|
||||
def test_reprioritize_template(self):
|
||||
provider = self._new_provider()
|
||||
|
||||
|