Merge pull request #48 from Azure/fix/cleanup_code
Cleanup terminate_status code, remove default output handler
This commit is contained in:
Коммит
fbc88adba3
|
@ -40,11 +40,11 @@ class InvalidCycleCloudVersionError(RuntimeError):
|
|||
|
||||
class CycleCloudProvider:
|
||||
|
||||
def __init__(self, config, cluster, hostnamer, output_handler, terminate_requests, creation_requests, templates, clock):
|
||||
def __init__(self, config, cluster, hostnamer, stdout_handler, terminate_requests, creation_requests, templates, clock):
|
||||
self.config = config
|
||||
self.cluster = cluster
|
||||
self.hostnamer = hostnamer
|
||||
self.output_handler = output_handler
|
||||
self.stdout_handler = stdout_handler
|
||||
self.terminate_json = terminate_requests
|
||||
self.templates_json = templates
|
||||
self.creation_json = creation_requests
|
||||
|
@ -335,7 +335,7 @@ class CycleCloudProvider:
|
|||
def templates(self):
|
||||
try:
|
||||
symphony_templates = self._update_templates()
|
||||
return self.output_handler.handle({"templates": symphony_templates, "message": "Get available templates success."}, debug_output=False)
|
||||
return self.stdout_handler.handle({"templates": symphony_templates, "message": "Get available templates success."}, debug_output=False)
|
||||
except:
|
||||
logger.warning("Exiting Non-zero so that symphony will retry")
|
||||
logger.exception("Could not get template_json")
|
||||
|
@ -446,7 +446,7 @@ class CycleCloudProvider:
|
|||
|
||||
if not template:
|
||||
available_templates = template_store.keys()
|
||||
return self.output_handler.handle({"requestId": request_id, "status": RequestStates.complete_with_error,
|
||||
return self.stdout_handler.handle({"requestId": request_id, "status": RequestStates.complete_with_error,
|
||||
"message": "Unknown templateId %s. Available %s" % (template_id, available_templates)})
|
||||
|
||||
machine_count = input_json["template"]["machineCount"]
|
||||
|
@ -500,23 +500,23 @@ class CycleCloudProvider:
|
|||
else:
|
||||
logger.info("Requested %s instances of machine type %s in nodearray %s.", machine_count, machinetype_name, _get("nodearray"))
|
||||
|
||||
return self.output_handler.handle({"requestId": request_id, "status": RequestStates.running,
|
||||
return self.stdout_handler.handle({"requestId": request_id, "status": RequestStates.running,
|
||||
"message": "Request instances success from Azure CycleCloud."})
|
||||
except UserError as e:
|
||||
logger.exception("Azure CycleCloud experienced an error and the node creation request failed. %s", e)
|
||||
return self.output_handler.handle({"requestId": request_id, "status": RequestStates.complete_with_error,
|
||||
return self.stdout_handler.handle({"requestId": request_id, "status": RequestStates.complete_with_error,
|
||||
"message": "Azure CycleCloud experienced an error: %s" % str(e)})
|
||||
except ValueError as e:
|
||||
logger.exception("Azure CycleCloud experienced an error and the node creation request failed. %s", e)
|
||||
return self.output_handler.handle({"requestId": request_id, "status": RequestStates.complete_with_error,
|
||||
return self.stdout_handler.handle({"requestId": request_id, "status": RequestStates.complete_with_error,
|
||||
"message": "Azure CycleCloud experienced an error: %s" % str(e)})
|
||||
except OutOfCapacityError as e:
|
||||
logger.warning("Request Id %s failed with out of capacity %s", request_id, e)
|
||||
return self.output_handler.handle({"requestId": request_id, "status": RequestStates.running,
|
||||
return self.stdout_handler.handle({"requestId": request_id, "status": RequestStates.running,
|
||||
"message": "Azure CycleCloud does not currently have capacity %s" % str(e)})
|
||||
except Exception as e:
|
||||
logger.exception("Azure CycleCloud experienced an error, though it may have succeeded: %s", e)
|
||||
return self.output_handler.handle({"requestId": request_id, "status": RequestStates.running,
|
||||
return self.stdout_handler.handle({"requestId": request_id, "status": RequestStates.running,
|
||||
"message": "Azure CycleCloud experienced an error, though it may have succeeded: %s" % str(e)})
|
||||
|
||||
@failureresponse({"requests": [], "status": RequestStates.complete_with_error})
|
||||
|
@ -556,12 +556,12 @@ class CycleCloudProvider:
|
|||
all_nodes = self.cluster.all_nodes()
|
||||
except UserError as e:
|
||||
logger.exception("Azure CycleCloud experienced an error and the get return request failed. %s", e)
|
||||
return self.output_handler.handle({"status": RequestStates.complete_with_error,
|
||||
return self.stdout_handler.handle({"status": RequestStates.complete_with_error,
|
||||
"requests": [],
|
||||
"message": "Azure CycleCloud experienced an error: %s" % str(e)})
|
||||
except ValueError as e:
|
||||
logger.exception("Azure CycleCloud experienced an error and the get return request failed. %s", e)
|
||||
return self.output_handler.handle({"status": RequestStates.complete_with_error,
|
||||
return self.stdout_handler.handle({"status": RequestStates.complete_with_error,
|
||||
"requests": [],
|
||||
"message": "Azure CycleCloud experienced an error: %s" % str(e)})
|
||||
|
||||
|
@ -618,7 +618,7 @@ class CycleCloudProvider:
|
|||
|
||||
response["message"] = message
|
||||
response["status"] = request_status
|
||||
return self.output_handler.handle(response)
|
||||
return self.stdout_handler.handle(response)
|
||||
|
||||
@failureresponse({"requests": [], "status": RequestStates.running})
|
||||
def _create_status(self, input_json, output_handler=None):
|
||||
|
@ -643,7 +643,7 @@ class CycleCloudProvider:
|
|||
'status': 'complete'}
|
||||
|
||||
"""
|
||||
output_handler = output_handler or self.output_handler
|
||||
output_handler = output_handler or self.stdout_handler
|
||||
|
||||
request_ids = [r["requestId"] for r in input_json["requests"]]
|
||||
|
||||
|
@ -854,7 +854,7 @@ class CycleCloudProvider:
|
|||
return output_handler.handle(response)
|
||||
|
||||
@failureresponse({"requests": [], "status": RequestStates.running})
|
||||
def _deperecated_terminate_status(self, input_json):
|
||||
def _terminate_status(self, input_json):
|
||||
# can transition from complete -> executing or complete -> complete_with_error -> executing
|
||||
# executing is a terminal state.
|
||||
|
||||
|
@ -903,7 +903,7 @@ class CycleCloudProvider:
|
|||
"machineId": machine_id})
|
||||
response["requests"].append(request)
|
||||
|
||||
return self.output_handler.handle(response)
|
||||
return response
|
||||
|
||||
|
||||
for termination_id in termination_ids:
|
||||
|
@ -963,39 +963,7 @@ class CycleCloudProvider:
|
|||
response["status"] = request_status
|
||||
|
||||
return response
|
||||
|
||||
@failureresponse({"status": RequestStates.running})
|
||||
def terminate_status(self, input_json):
|
||||
ids_to_hostname = {}
|
||||
|
||||
for machine in input_json["machines"]:
|
||||
ids_to_hostname[machine["machineId"]] = machine["name"]
|
||||
|
||||
with self.terminate_json as term_requests:
|
||||
requests = {}
|
||||
for node_id, hostname in ids_to_hostname.items():
|
||||
machine_record = {"machineId": node_id, "name": hostname}
|
||||
found_a_request = False
|
||||
for request_id, request in term_requests.items():
|
||||
if node_id in request["machines"]:
|
||||
|
||||
found_a_request = True
|
||||
|
||||
if request_id not in requests:
|
||||
requests[request_id] = {"machines": []}
|
||||
|
||||
requests[request_id]["machines"].append(machine_record)
|
||||
|
||||
if not found_a_request:
|
||||
logger.warning("No termination request found for machine %s", machine_record)
|
||||
# logger.warning("Forcing termination request for machine %s", machine_record)
|
||||
# import traceback
|
||||
# logger.warning("Traceback:\n%s" % '\n'.join([line for line in traceback.format_stack()]))
|
||||
# terminate_request = { "machines": [ machine_record ]}
|
||||
# self.terminate_machines( terminate_request, lambda x: x )
|
||||
|
||||
deprecated_json = {"requests": [{"requestId": request_id, "machines": requests[request_id]["machines"]} for request_id in requests]}
|
||||
return self._deperecated_terminate_status(deprecated_json)
|
||||
|
||||
|
||||
def _retry_termination_requests(self):
|
||||
with self.terminate_json as terminate_requests:
|
||||
|
@ -1062,7 +1030,7 @@ class CycleCloudProvider:
|
|||
"status": "complete"
|
||||
}
|
||||
"""
|
||||
output_handler = output_handler or self.output_handler
|
||||
output_handler = output_handler or self.stdout_handler
|
||||
logger.info("Terminate_machines request for : %s", input_json)
|
||||
request_id = "delete-%s" % str(uuid.uuid4())
|
||||
request_id_persisted = False
|
||||
|
@ -1127,7 +1095,6 @@ class CycleCloudProvider:
|
|||
'''
|
||||
Kludge: can't seem to get provider.json to reliably call the correct request action.
|
||||
'''
|
||||
output_handler = self.output_handler
|
||||
creates = [x for x in input_json["requests"] if not x["requestId"].startswith("delete-")]
|
||||
deletes = [x for x in input_json["requests"] if x["requestId"].startswith("delete-")]
|
||||
create_response = {}
|
||||
|
@ -1138,7 +1105,7 @@ class CycleCloudProvider:
|
|||
assert "status" in create_response
|
||||
|
||||
if deletes:
|
||||
delete_response = self._deperecated_terminate_status({"requests": deletes})
|
||||
delete_response = self._terminate_status({"requests": deletes})
|
||||
assert "status" in delete_response
|
||||
|
||||
# Update capacity tracking
|
||||
|
@ -1166,7 +1133,7 @@ class CycleCloudProvider:
|
|||
response = {"status": combined_status,
|
||||
"requests": create_response.get("requests", []) + delete_response.get("requests", [])
|
||||
}
|
||||
return output_handler.handle(response)
|
||||
return self.stdout_handler.handle(response)
|
||||
|
||||
def _terminate_expired_requests(self):
|
||||
|
||||
|
@ -1352,12 +1319,11 @@ def main(argv=sys.argv): # pragma: no cover
|
|||
data_dir = os.getenv('PRO_DATA_DIR', os.getcwd())
|
||||
hostnamer = util.Hostnamer(provider_config.get("cyclecloud.hostnames.use_fqdn", True))
|
||||
cluster_name = provider_config.get("cyclecloud.cluster.name")
|
||||
output_handler = JsonOutputHandler()
|
||||
|
||||
provider = CycleCloudProvider(config=provider_config,
|
||||
cluster=cluster.Cluster(cluster_name, provider_config, logger),
|
||||
hostnamer=hostnamer,
|
||||
output_handler=output_handler,
|
||||
stdout_handler=JsonOutputHandler(quiet=False),
|
||||
terminate_requests=JsonStore("terminate_requests.json", data_dir),
|
||||
creation_requests=JsonStore("create_requests.json", data_dir),
|
||||
templates=JsonStore("templates.json", data_dir, formatted=True),
|
||||
|
@ -1377,13 +1343,10 @@ def main(argv=sys.argv): # pragma: no cover
|
|||
provider.templates()
|
||||
elif cmd == "create_machines":
|
||||
provider.create_machines(input_json)
|
||||
elif cmd in ["status", "create_status", "terminate_status"]:
|
||||
elif cmd in ["create_status"]:
|
||||
if "requests" in input_json:
|
||||
# provider.status handles both create_status and deprecated terminate_status calls.
|
||||
# provider.status handles both create_status and terminate_status calls.
|
||||
provider.status(input_json)
|
||||
elif cmd == "terminate_status":
|
||||
# doesn't pass in a requestId but just a list of machines.
|
||||
provider.terminate_status(input_json)
|
||||
else:
|
||||
# should be impossible
|
||||
raise RuntimeError("Unexpected input json for cmd %s" % (input_json, cmd))
|
||||
|
|
Загрузка…
Ссылка в новой задаче