Upgrade to Python 3 for CycleCloud 8.x support.
This commit is contained in:
Parent: a1f6384ad0
Commit: 47deb4f27c
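The diff below is a mechanical Python 2 to Python 3 port: print statements become print() calls, dict.iteritems()/iterkeys()/itervalues() become items()/keys()/values(), unicode() becomes str(), xrange becomes range, urllib's urlencode moves to urllib.parse, the deprecated logger.warn() alias becomes logger.warning(), and the deprecated unittest assertEquals alias becomes assertEqual. As a quick reference, here is a minimal runnable sketch of the before/after patterns; the names in it (d, e, logger) are illustrative only and are not taken from this codebase.

    # Python 3 forms used throughout this commit; the Python 2 forms are shown in comments.
    import logging
    from urllib.parse import urlencode       # Py2: from urllib import urlencode

    logger = logging.getLogger(__name__)
    d = {"a": 1, "b": 2}

    print("total = %s" % sum(d.values()))    # Py2: print "total = %s" % ...

    for k, v in d.items():                   # Py2: d.iteritems()
        logger.warning("%s=%s", k, v)        # Py2: logger.warn(...) (deprecated alias)

    try:
        raise ValueError("boom")
    except ValueError as e:
        print("caught: %s" % str(e))         # Py2: unicode(e)

    print(["pg%s" % x for x in range(3)])    # Py2: xrange(3)
    print(urlencode({"q": "host-1"}))        # querystring encoding now lives in urllib.parse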
@@ -187,27 +187,27 @@ def run(args):
     apps = list_apps()
     for app in apps:
         all_tasks, running_tasks, pending_tasks = count_tasks(app)
-        print "Tasks for %s :\t%s\t%s\t%s" % (app, all_tasks, running_tasks, pending_tasks)
+        print("Tasks for %s :\t%s\t%s\t%s" % (app, all_tasks, running_tasks, pending_tasks))
         estimated_runtime = estimate_runtime_per_task(app)
-        print "\nAvg runtime for %s = %s" % (app, estimated_runtime)
+        print("\nAvg runtime for %s = %s" % (app, estimated_runtime))

         # Max is 1 CPU per Task, but fit to expected Tasks per hour
         hours_per_task = float(estimated_runtime)/3600
         demand_by_app[app] = min(all_tasks, int(math.ceil(all_tasks * hours_per_task)))
-        print "Demand for %s = %s with %s tasks" % (app, demand_by_app[app], all_tasks)
+        print("Demand for %s = %s with %s tasks" % (app, demand_by_app[app], all_tasks))

     total_slots, free_slots = count_resources()
-    print "Slots: %s free of %s" % (free_slots, total_slots)
+    print("Slots: %s free of %s" % (free_slots, total_slots))

     total_demand = 0
     if demand_by_app:
-        total_demand = sum(d for app, d in demand_by_app.iteritems())
+        total_demand = sum(d for app, d in demand_by_app.items())

-    print "\nUnmet Demand = %s" % (max(0, total_demand - total_slots))
+    print("\nUnmet Demand = %s" % (max(0, total_demand - total_slots)))

     # TODO: We need to take Dynamic Slot Requests into account (multi-slot or partial-slot tasks)
     cores_per_slot=1
-    print "Requesting %i ideal %i-core slots from the cloud." % (total_demand, cores_per_slot)
+    print("Requesting %i ideal %i-core slots from the cloud." % (total_demand, cores_per_slot))


     # TODO: We should allow each app (or maybe resource group?) to specify a slot_type
@@ -220,7 +220,7 @@ def run(args):
         'request_cpus': slot_demand
     }]

-    print "Requesting %d slots of type: %s" % (slot_demand, slot_type)
+    print("Requesting %d slots of type: %s" % (slot_demand, slot_type))
     jetpack.autoscale.scale_by_jobs(autoscale_requests)
@@ -134,46 +134,46 @@ def get_resource_status():

 def close_host(hostname, reclaim=True, dry_run=False):
     if dry_run:
-        print "Would Close host %s with reclaim %s" % (hostname, reclaim)
+        print("Would Close host %s with reclaim %s" % (hostname, reclaim))
     else:
-        print "Closing host %s with reclaim %s" % (hostname, reclaim)
+        print("Closing host %s with reclaim %s" % (hostname, reclaim))
         if reclaim:
             retcode, out, err = egosh(['resource', 'close', '-reclaim', hostname])
         else:
             retcode, out, err = egosh(['resource', 'close', hostname])
-        print "Status: %s \n Out: %s \n Err: %s \n" % (retcode, out, err)
+        print("Status: %s \n Out: %s \n Err: %s \n" % (retcode, out, err))


 def close_unavail_hosts(host_status, dry_run=False):
-    for h, status in host_status.iteritems():
+    for h, status in host_status.items():
         if status.lower() == "unavail":
             if dry_run:
-                print "Would Close unavail host %s with status %s" % (h, status)
+                print("Would Close unavail host %s with status %s" % (h, status))
             else:
                 close_host(h, reclaim=False, dry_run=dry_run)


 def remove_hosts(host_status, dry_run=False):
-    for h, status in host_status.iteritems():
+    for h, status in host_status.items():
         if dry_run:
-            print "Would Remove host %s with status %s" % (h, status)
+            print("Would Remove host %s with status %s" % (h, status))
         else:
-            print "Removing host %s with status %s" % (h, status)
+            print("Removing host %s with status %s" % (h, status))
             close_host(h, reclaim=True, dry_run=dry_run)
             retcode, out, err = egosh(['resource', 'remove', h])
-            print "Status: %s \n Out: %s \n Err: %s \n" % (retcode, out, err)
+            print("Status: %s \n Out: %s \n Err: %s \n" % (retcode, out, err))


 def remove_unavail_hosts(host_status, dry_run=False):
     close_unavail_hosts(host_status, dry_run=dry_run)
-    for h, status in host_status.iteritems():
+    for h, status in host_status.items():
         if status.lower() == "unavail":
             if dry_run:
-                print "Would unavail Remove host %s with status %s" % (h, status)
+                print("Would unavail Remove host %s with status %s" % (h, status))
             else:
-                print "Removing unavail host %s with status %s" % (h, status)
+                print("Removing unavail host %s with status %s" % (h, status))
                 retcode, out, err = egosh(['resource', 'remove', h])
-                print "Status: %s \n Out: %s \n Err: %s \n" % (retcode, out, err)
+                print("Status: %s \n Out: %s \n Err: %s \n" % (retcode, out, err))


 def count_tasks(app_name):
@@ -245,8 +245,8 @@ def estimate_runtime_per_task(app_name):

 def run(args):
     r_status = get_resource_status()
-    print "Current Resource states: "
-    print r_status
+    print("Current Resource states: ")
+    print(r_status)
     remove_unavail_hosts(r_status)
@@ -134,45 +134,45 @@ def get_resource_status():

 def close_host(hostname, reclaim=True, dry_run=False):
     if dry_run:
-        print "Would Close host %s with reclaim %s" % (hostname, reclaim)
+        print("Would Close host %s with reclaim %s" % (hostname, reclaim))
     else:
-        print "Closing host %s with reclaim %s" % (hostname, reclaim)
+        print("Closing host %s with reclaim %s" % (hostname, reclaim))
         if reclaim:
             retcode, out, err = egosh(['resource', 'close', '-reclaim', hostname])
         else:
             retcode, out, err = egosh(['resource', 'close', hostname])
-        print "Status: %s \n Out: %s \n Err: %s \n" % (retcode, out, err)
+        print("Status: %s \n Out: %s \n Err: %s \n" % (retcode, out, err))


 def close_unavail_hosts(host_status, dry_run=False):
-    for h, status in host_status.iteritems():
+    for h, status in host_status.items():
         if status.lower() == "unavail":
             if dry_run:
-                print "Would Close unavail host %s with status %s" % (h, status)
+                print("Would Close unavail host %s with status %s" % (h, status))
             else:
                 close_host(h, reclaim=False, dry_run=dry_run)

 def remove_hosts(host_status, dry_run=False):
-    for h, status in host_status.iteritems():
+    for h, status in host_status.items():
         if dry_run:
-            print "Would Remove host %s with status %s" % (h, status)
+            print("Would Remove host %s with status %s" % (h, status))
         else:
-            print "Removing host %s with status %s" % (h, status)
+            print("Removing host %s with status %s" % (h, status))
             close_host(h, reclaim=True, dry_run=dry_run)
             retcode, out, err = egosh(['resource', 'remove', h])
-            print "Status: %s \n Out: %s \n Err: %s \n" % (retcode, out, err)
+            print("Status: %s \n Out: %s \n Err: %s \n" % (retcode, out, err))


 def remove_unavail_hosts(host_status, dry_run=False):
     close_unavail_hosts(host_status, dry_run=dry_run)
-    for h, status in host_status.iteritems():
+    for h, status in host_status.items():
         if status.lower() == "unavail":
             if dry_run:
-                print "Would unavail Remove host %s with status %s" % (h, status)
+                print("Would unavail Remove host %s with status %s" % (h, status))
             else:
-                print "Removing unavail host %s with status %s" % (h, status)
+                print("Removing unavail host %s with status %s" % (h, status))
                 retcode, out, err = egosh(['resource', 'remove', h])
-                print "Status: %s \n Out: %s \n Err: %s \n" % (retcode, out, err)
+                print("Status: %s \n Out: %s \n Err: %s \n" % (retcode, out, err))

 def count_tasks(app_name):
     all_tasks = 0
@@ -247,27 +247,27 @@ def run(args):
     apps = list_apps()
     for app in apps:
         all_tasks, running_tasks, pending_tasks = count_tasks(app)
-        print "Tasks for %s :\t%s\t%s\t%s" % (app, all_tasks, running_tasks, pending_tasks)
+        print("Tasks for %s :\t%s\t%s\t%s" % (app, all_tasks, running_tasks, pending_tasks))
         estimated_runtime = estimate_runtime_per_task(app)
-        print "\nAvg runtime for %s = %s" % (app, estimated_runtime)
+        print("\nAvg runtime for %s = %s" % (app, estimated_runtime))

         # Max is 1 CPU per Task, but fit to expected Tasks per hour
         hours_per_task = float(estimated_runtime)/3600
         demand_by_app[app] = min(all_tasks, int(math.ceil(all_tasks * hours_per_task)))
-        print "Demand for %s = %s with %s tasks" % (app, demand_by_app[app], all_tasks)
+        print("Demand for %s = %s with %s tasks" % (app, demand_by_app[app], all_tasks))

     total_slots, free_slots = count_resources()
-    print "Slots: %s free of %s" % (free_slots, total_slots)
+    print("Slots: %s free of %s" % (free_slots, total_slots))

     total_demand = 0
     if demand_by_app:
-        total_demand = sum(d for app, d in demand_by_app.iteritems())
+        total_demand = sum(d for app, d in demand_by_app.items())

-    print "\nUnmet Demand = %s" % (max(0, total_demand - total_slots))
+    print("\nUnmet Demand = %s" % (max(0, total_demand - total_slots)))

     # TODO: We need to take Dynamic Slot Requests into account (multi-slot or partial-slot tasks)
     cores_per_slot=1
-    print "Requesting %i ideal %i-core slots from the cloud." % (total_demand, cores_per_slot)
+    print("Requesting %i ideal %i-core slots from the cloud." % (total_demand, cores_per_slot))


     # TODO: We should allow each app (or maybe resource group?) to specify a slot_type
@@ -280,11 +280,11 @@ def run(args):
         'request_cpus': slot_demand
     }]

-    print "Requesting %d slots of type: %s" % (slot_demand, slot_type)
+    print("Requesting %d slots of type: %s" % (slot_demand, slot_type))

     r_status = get_resource_status()
-    print "Current Resource states: "
-    print r_status
+    print("Current Resource states: ")
+    print(r_status)
     remove_unavail_hosts(r_status, dry_run=True)
@@ -76,7 +76,7 @@ class CapacityTrackingDb:

         now = calendar.timegm(self.clock())
         expired = []
-        for k, v in self.capacity_db.read().iteritems():
+        for k, v in self.capacity_db.read().items():
             if _limit_expired(now, v):
                 expired.append(k)
         self.remove_limits(expired)
@@ -1,7 +1,8 @@
 import json

 import logging
-from urllib import urlencode
+from urllib.parse import urlencode
+from builtins import str


 try:
@@ -42,13 +43,13 @@ class Cluster:
         # if Symphony may have a stale machineId -> hostname mapping, so find the existing instance with that hostname and kill it
         machine_names = [ machine["name"].split(".")[0] for machine in machines if machine.get("name") ]
         if machine_names:
-            self.logger.warn("Terminating the following nodes by machine_names: %s", machine_names)
+            self.logger.warning("Terminating the following nodes by machine_names: %s", machine_names)

             f = urlencode({"instance-filter": 'HostName in {%s}' % ",".join('"%s"' % x for x in machine_names)})
             try:
                 self.post("/cloud/actions/terminate_node/%s?%s" % (self.cluster_name, f))
             except cyclecli.UserError as e:
-                if "No instances were found matching your query" in unicode(e):
+                if "No instances were found matching your query" in str(e):
                     return
                 raise
@@ -8,6 +8,8 @@ import os
 import pprint
 import sys
 import uuid
+from builtins import str
+

 from symphony import RequestStates, MachineStates, MachineResults, SymphonyRestClient
 import cluster
@@ -206,10 +208,10 @@ class CycleCloudProvider:

             if custom_env:
                 record["UserData"]["symphony"] = {"custom_env": custom_env,
-                                                  "custom_env_names": " ".join(sorted(custom_env.iterkeys()))}
+                                                  "custom_env_names": " ".join(sorted(custom_env))}

             record["UserData"]["symphony"]["attributes"] = attributes
-            record["UserData"]["symphony"]["attribute_names"] = " ".join(sorted(attributes.iterkeys()))
+            record["UserData"]["symphony"]["attribute_names"] = " ".join(sorted(attributes))

             record["pgrpName"] = None
@@ -234,7 +236,7 @@ class CycleCloudProvider:
                 record_mpi["attributes"]["azureccmpi"] = ["Boolean", "1"]
                 record_mpi["UserData"]["symphony"]["attributes"]["azureccmpi"] = True
                 # regenerate names, as we have added placementgroup
-                record_mpi["UserData"]["symphony"]["attribute_names"] = " ".join(sorted(record_mpi["attributes"].iterkeys()))
+                record_mpi["UserData"]["symphony"]["attribute_names"] = " ".join(sorted(record_mpi["attributes"]))
                 record_mpi["priority"] = record_mpi["priority"] - n - 1
                 record_mpi["templateId"] = template_id
                 record_mpi["maxNumber"] = min(record["maxNumber"], nodearray.get("Azure", {}).get("MaxScalesetSize", 40))
@@ -259,8 +261,8 @@ class CycleCloudProvider:
             generator = difflib.context_diff(prior_templates_str.splitlines(), new_templates_str.splitlines())
             difference = "\n".join([str(x) for x in generator])
             new_template_order = ", ".join(["%s:%s" % (x.get("templateId", "?"), x.get("maxNumber", "?")) for x in symphony_templates])
-            logger.warn("Templates have changed - new template priority order: %s", new_template_order)
-            logger.warn("Diff:\n%s", str(difference))
+            logger.warning("Templates have changed - new template priority order: %s", new_template_order)
+            logger.warning("Diff:\n%s", str(difference))
             try:
                 rest_client = SymphonyRestClient(self.config, logger)
                 rest_client.update_hostfactory_templates({"templates": symphony_templates, "message": "Get available templates success."})
@@ -283,7 +285,7 @@ class CycleCloudProvider:
     def generate_userdata(self, template):
        ret = {}

-        for key, value_array in template.get("attributes", {}).iteritems():
+        for key, value_array in template.get("attributes", {}).items():
             if len(value_array) != 2:
                 logger.error("Invalid attribute %s %s", key, value_array)
                 continue
@@ -358,7 +360,7 @@ class CycleCloudProvider:
             active_count = bucket["activeCount"]
             max_count = min(max_count, active_count+self.spot_maxnumber_increment_per_sku)

-        return max_count
+        return int(max_count)

     @failureresponse({"requests": [], "status": RequestStates.running})
     def create_machines(self, input_json):
@@ -431,15 +433,15 @@ class CycleCloudProvider:
         except UserError as e:
             logger.exception("Azure CycleCloud experienced an error and the node creation request failed. %s", e)
             return self.json_writer({"requestId": request_id, "status": RequestStates.complete_with_error,
-                                     "message": "Azure CycleCloud experienced an error: %s" % unicode(e)})
+                                     "message": "Azure CycleCloud experienced an error: %s" % str(e)})
         except ValueError as e:
             logger.exception("Azure CycleCloud experienced an error and the node creation request failed. %s", e)
             return self.json_writer({"requestId": request_id, "status": RequestStates.complete_with_error,
-                                     "message": "Azure CycleCloud experienced an error: %s" % unicode(e)})
+                                     "message": "Azure CycleCloud experienced an error: %s" % str(e)})
         except Exception as e:
             logger.exception("Azure CycleCloud experienced an error, though it may have succeeded: %s", e)
             return self.json_writer({"requestId": request_id, "status": RequestStates.running,
-                                     "message": "Azure CycleCloud experienced an error, though it may have succeeded: %s" % unicode(e)})
+                                     "message": "Azure CycleCloud experienced an error, though it may have succeeded: %s" % str(e)})

     @failureresponse({"requests": [], "status": RequestStates.complete_with_error})
     def get_return_requests(self, input_json):
@@ -479,12 +481,12 @@ class CycleCloudProvider:
             logger.exception("Azure CycleCloud experienced an error and the get return request failed. %s", e)
             return self.json_writer({"status": RequestStates.complete_with_error,
                                      "requests": [],
-                                     "message": "Azure CycleCloud experienced an error: %s" % unicode(e)})
+                                     "message": "Azure CycleCloud experienced an error: %s" % str(e)})
         except ValueError as e:
             logger.exception("Azure CycleCloud experienced an error and the get return request failed. %s", e)
             return self.json_writer({"status": RequestStates.complete_with_error,
                                      "requests": [],
-                                     "message": "Azure CycleCloud experienced an error: %s" % unicode(e)})
+                                     "message": "Azure CycleCloud experienced an error: %s" % str(e)})

         message = ""
         report_failure_states = ["Unavailable", "Failed"]
@@ -500,7 +502,7 @@ class CycleCloudProvider:
                 try:
                     hostname = self.hostnamer.hostname(node.get("PrivateIp"))
                 except Exception:
-                    logger.warn("get_return_requests: No hostname set and could not convert ip %s to hostname for \"%s\" VM.", node.get("PrivateIp"), node)
+                    logger.warning("get_return_requests: No hostname set and could not convert ip %s to hostname for \"%s\" VM.", node.get("PrivateIp"), node)

             machine = {"gracePeriod": 0,
                        "machine": hostname or ""}
@@ -549,12 +551,13 @@ class CycleCloudProvider:
         except UserError as e:
             logger.exception("Azure CycleCloud experienced an error and the node creation request failed. %s", e)
             return self.json_writer({"status": RequestStates.complete_with_error,
-                                     "requests": [{"requestId": request_id, "status": RequestStates.complete_with_error} for request_id in request_ids] ,
-                                     "message": "Azure CycleCloud experienced an error: %s" % unicode(e)})
+                                     "requests": [{"requestId": request_id, "status": RequestStates.complete_with_error} for request_id in request_ids],
+                                     "message": "Azure CycleCloud experienced an error: %s" % str(e)})
         except ValueError as e:
             logger.exception("Azure CycleCloud experienced an error and the node creation request failed. %s", e)
-            return self.json_writer({"requestId": request_id, "status": RequestStates.complete_with_error,
-                                     "message": "Azure CycleCloud experienced an error: %s" % unicode(e)})
+            return self.json_writer({"status": RequestStates.complete_with_error,
+                                     "requests": [{"requestId": request_id, "status": RequestStates.complete_with_error} for request_id in request_ids],
+                                     "message": "Azure CycleCloud experienced an error: %s" % str(e)})

         message = ""
@@ -563,10 +566,10 @@ class CycleCloudProvider:
         unknown_state_count = 0
         requesting_count = 0

-        for request_id, requested_nodes in nodes_by_request_id.iteritems():
+        for request_id, requested_nodes in nodes_by_request_id.items():
             if not requested_nodes:
                 # nothing to do.
-                logger.warn("No nodes found for request id %s.", request_id)
+                logger.warning("No nodes found for request id %s.", request_id)

             machines = []
             request = {"requestId": request_id,
@@ -616,12 +619,12 @@ class CycleCloudProvider:
                 try:
                     hostname = self.hostnamer.hostname(node.get("PrivateIp"))
                 except Exception:
-                    logger.warn("_create_status: No hostname set and could not convert ip %s to hostname for \"%s\" VM.", node.get("PrivateIp"), node_status)
+                    logger.warning("_create_status: No hostname set and could not convert ip %s to hostname for \"%s\" VM.", node.get("PrivateIp"), node_status)

                 try:
-                    logger.warn("Warning: Cluster status check terminating failed node %s", node)
+                    logger.warning("Warning: Cluster status check terminating failed node %s", node)
                     # import traceback
-                    #logger.warn("Traceback:\n%s", '\n'.join([line for line in traceback.format_stack()]))
+                    #logger.warning("Traceback:\n%s", '\n'.join([line for line in traceback.format_stack()]))
                     self.cluster.terminate([{"machineId": node.get("NodeId"), "name": hostname}])
                 except Exception:
                     logger.exception("Could not terminate node with id %s" % node.get("NodeId"))
@@ -637,7 +640,7 @@ class CycleCloudProvider:
                 machine_status = MachineStates.active
                 private_ip_address = node.get("PrivateIp")
                 if not private_ip_address:
-                    logger.warn("No ip address found for ready node %s", node.get("Name"))
+                    logger.warning("No ip address found for ready node %s", node.get("Name"))
                     machine_result = MachineResults.executing
                     machine_status = MachineStates.building
                     request_status = RequestStates.running
@@ -647,7 +650,7 @@ class CycleCloudProvider:
                     try:
                         hostname = self.hostnamer.hostname(node.get("PrivateIp"))
                     except Exception:
-                        logger.warn("_create_status: No hostname set and could not convert ip %s to hostname for \"%s\" VM.", node.get("PrivateIp"), node)
+                        logger.warning("_create_status: No hostname set and could not convert ip %s to hostname for \"%s\" VM.", node.get("PrivateIp"), node)
                 else:
                     machine_result = MachineResults.executing
                     machine_status = MachineStates.building
@@ -680,7 +683,7 @@ class CycleCloudProvider:
             if request_status == RequestStates.complete:
                 logger.info("Request %s is complete.", request_id)
             elif request_status == RequestStates.complete_with_error:
-                logger.warn("Request %s completed with error: %s.", request_id, message)
+                logger.warning("Request %s completed with error: %s.", request_id, message)
             request["message"] = message

         response["status"] = symphony.RequestStates.complete
@@ -713,11 +716,11 @@ class CycleCloudProvider:
                 if termination_id in terminate_requests:
                     termination = terminate_requests[termination_id]
                     if not termination.get("terminated"):
-                        for machine_id, name in termination["machines"].iteritems():
+                        for machine_id, name in termination["machines"].items():
                             machines_to_terminate.append({"machineId": machine_id, "name": name})

             if machines_to_terminate:
-                logger.warn("Re-attempting termination of nodes %s", machines_to_terminate)
+                logger.warning("Re-attempting termination of nodes %s", machines_to_terminate)
                 self.cluster.terminate(machines_to_terminate)

             for termination_id in termination_ids:
@@ -741,19 +744,19 @@ class CycleCloudProvider:
                     machines = termination_request.get("machines", {})

                     if machines:
-                        logger.info("Terminating machines: %s", [hostname for hostname in machines.itervalues()])
+                        logger.info("Terminating machines: %s", [hostname for hostname in machines.values()])
                     else:
-                        logger.warn("No machines found for termination request %s. Will retry.", termination_id)
+                        logger.warning("No machines found for termination request %s. Will retry.", termination_id)
                         request_status = RequestStates.running

-                    for machine_id, hostname in machines.iteritems():
+                    for machine_id, hostname in machines.items():
                         response_machines.append({"name": hostname,
                                                   "status": MachineStates.deleted,
                                                   "result": MachineResults.succeed,
                                                   "machineId": machine_id})
                 else:
                     # we don't recognize this termination request!
-                    logger.warn("Unknown termination request %s. You may intervene manually by updating terminate_nodes.json" +
+                    logger.warning("Unknown termination request %s. You may intervene manually by updating terminate_nodes.json" +
                                 " to contain the relevant NodeIds. %s ", termination_id, terminate_requests)
                     # set to running so symphony will keep retrying, hopefully, until someone intervenes.
                     request_status = RequestStates.running
@@ -774,10 +777,10 @@ class CycleCloudProvider:

         with self.terminate_json as term_requests:
             requests = {}
-            for node_id, hostname in ids_to_hostname.iteritems():
+            for node_id, hostname in ids_to_hostname.items():
                 machine_record = {"machineId": node_id, "name": hostname}
                 found_a_request = False
-                for request_id, request in term_requests.iteritems():
+                for request_id, request in term_requests.items():
                     if node_id in request["machines"]:

                         found_a_request = True
@@ -788,10 +791,10 @@ class CycleCloudProvider:
                         requests[request_id]["machines"].append(machine_record)

                 if not found_a_request:
-                    logger.warn("No termination request found for machine %s", machine_record)
-                    # logger.warn("Forcing termination request for machine %s", machine_record)
+                    logger.warning("No termination request found for machine %s", machine_record)
+                    # logger.warning("Forcing termination request for machine %s", machine_record)
                     # import traceback
-                    # logger.warn("Traceback:\n%s" % '\n'.join([line for line in traceback.format_stack()]))
+                    # logger.warning("Traceback:\n%s" % '\n'.join([line for line in traceback.format_stack()]))
                     # terminate_request = { "machines": [ machine_record ]}
                     # self.terminate_machines( terminate_request, lambda x: x )
@@ -888,10 +891,10 @@ class CycleCloudProvider:
                 })

         except Exception as e:
-            logger.exception(unicode(e))
+            logger.exception(str(e))
             if request_id_persisted:
                 return json_writer({"status": RequestStates.running, "requestId": request_id})
-            return json_writer({"status": RequestStates.complete_with_error, "requestId": request_id, "message": unicode(e)})
+            return json_writer({"status": RequestStates.complete_with_error, "requestId": request_id, "message": str(e)})

     def status(self, input_json):
         '''
@@ -952,7 +955,7 @@ def _placement_groups(config):
     if num_placement_groups <= 0:
         return []
     else:
-        return ["pg%s" % x for x in xrange(num_placement_groups)]
+        return ["pg%s" % x for x in range(num_placement_groups)]


 def simple_json_writer(data, debug_output=True): # pragma: no cover
@@ -1015,11 +1018,11 @@ def main(argv=sys.argv, json_writer=simple_json_writer): # pragma: no cover
             provider.terminate_machines(input_json)

     except ImportError as e:
-        logger.exception(unicode(e))
+        logger.exception(str(e))

     except Exception as e:
         if logger:
-            logger.exception(unicode(e))
+            logger.exception(str(e))
         else:
             import traceback
             traceback.print_exc()
@@ -2,7 +2,7 @@ import json

 import logging
 import requests
-from urllib import urlencode
+from urllib.parse import urlencode

 class MachineStates:
     building = "building"
@@ -9,6 +9,7 @@ import shutil
 import subprocess
 import sys
 import traceback
+from builtins import str


 try:
@@ -152,7 +153,7 @@ def failureresponse(response):
             return func(*args, **kwargs)
         except cyclecli.UserError as ue:
             with_message = deepcopy(response)
-            message = unicode(ue)
+            message = str(ue)
             logger.debug(traceback.format_exc())

             try:
@@ -164,13 +165,13 @@ def failureresponse(response):
             with_message["message"] = message
             return args[0].json_writer(with_message)
         except Exception as e:
-            logger.exception(unicode(e))
+            logger.exception(str(e))
             logger.debug(traceback.format_exc())
             with_message = deepcopy(response)
-            with_message["message"] = unicode(e)
+            with_message["message"] = str(e)
             return args[0].json_writer(with_message)
         except: # nopep8 ignore the bare except
-            logger.exception(unicode(e))
+            logger.exception("Caught unknown exception...")
             logger.debug(traceback.format_exc())
             with_message = deepcopy(response)
             with_message["message"] = traceback.format_exc()
@@ -203,7 +204,7 @@ class ProviderConfig:
                 break

             if not hasattr(top_value, "keys"):
-                self.logger.warn("Invalid format, as a child key was specified for %s when its type is %s ", key, type(top_value))
+                self.logger.warning("Invalid format, as a child key was specified for %s when its type is %s ", key, type(top_value))
                 return {}

             value = top_value.get(keys[n])
@@ -217,7 +218,7 @@ class ProviderConfig:
         try:
             return self.jetpack_config.get(key, default_value)
         except cyclecli.ConfigError as e:
-            if key in unicode(e):
+            if key in str(e):
                 return default_value
             raise
@@ -293,20 +294,20 @@ def provider_config_from_environment(pro_conf_dir=os.getenv('PRO_CONF_DIR', os.g

     # don't let the user define these in two places
     if config.pop("templates", {}):
-        logger.warn("Please define template overrides in %s, not the azureccprov_config.json" % templates_file)
+        logger.warning("Please define template overrides in %s, not the azureccprov_config.json" % templates_file)

     # and merge them so it is transparent to the code
     flattened_templates = {}
     for template in customer_templates.get("templates", []):

         if "templateId" not in template:
-            logger.warn("Skipping template because templateId is not defined: %s", template)
+            logger.warning("Skipping template because templateId is not defined: %s", template)
             continue

         nodearray = template.pop("templateId") # definitely don't want to rename them as machineId

         if nodearray in flattened_templates:
-            logger.warn("Ignoring redefinition of templateId %s", nodearray)
+            logger.warning("Ignoring redefinition of templateId %s", nodearray)
             continue

         flattened_templates[nodearray] = template
@@ -79,7 +79,7 @@ class MockCluster:
         if machine_type in self.limit_capacity:
             available_capacity = self.limit_capacity[machine_type]
             if count > available_capacity:
-                print "Capacity Limited! <%s> Requested: <%d> Available Capacity: <%d>" % (machine_type, count, available_capacity)
+                print("Capacity Limited! <%s> Requested: <%d> Available Capacity: <%d>" % (machine_type, count, available_capacity))
                 count = available_capacity

         for i in range(count):
@@ -119,10 +119,10 @@ class MockCluster:
         '''

         def _yield_nodes(**attrs):
-            for nodes_for_template in self._nodes.itervalues():
+            for nodes_for_template in self._nodes.values():
                 for node in nodes_for_template:
                     all_match = True
-                    for key, value in attrs.iteritems():
+                    for key, value in attrs.items():
                         if isinstance(value, list) or isinstance(value, set):
                             all_match = all_match and node[key] in value
                         else:
@@ -169,37 +169,37 @@ class TestHostFactory(unittest.TestCase):

         templates = provider.templates()["templates"]

-        self.assertEquals(3, len(templates))
-        self.assertEquals("executea4", templates[0]["templateId"])
+        self.assertEqual(3, len(templates))
+        self.assertEqual("executea4", templates[0]["templateId"])
         # WARNING: LSF does not quote Numerics and Symphony does (Symphony will likely upgrade to match LSF eventually)
-        self.assertEquals(["Numeric", '4'], templates[0]["attributes"]["ncores"])
-        self.assertEquals(["Numeric", '1'], templates[0]["attributes"]["ncpus"])
+        self.assertEqual(["Numeric", '4'], templates[0]["attributes"]["ncores"])
+        self.assertEqual(["Numeric", '1'], templates[0]["attributes"]["ncpus"])

         provider.cluster._nodearrays["nodearrays"][0]["buckets"].append({"maxCount": 2, "definition": {"machineType": "A8"}, "virtualMachine": MACHINE_TYPES["A8"]})

         templates = provider.templates()["templates"]

-        self.assertEquals(4, len(templates))
+        self.assertEqual(4, len(templates))
         a4 = [t for t in templates if t["templateId"] == "executea4"][0]
         a8 = [t for t in templates if t["templateId"] == "executea8"][0]
         lpa4 = [t for t in templates if t["templateId"] == "lpexecutea4"][0]
         lpa8 = [t for t in templates if t["templateId"] == "lpexecutea8"][0]

-        self.assertEquals(["Numeric", '4'], a4["attributes"]["ncores"])
-        self.assertEquals(["Numeric", '1'], a4["attributes"]["ncpus"])
-        self.assertEquals(["Numeric", '1024'], a4["attributes"]["mem"])
-        self.assertEquals(["String", "X86_64"], a4["attributes"]["type"])
+        self.assertEqual(["Numeric", '4'], a4["attributes"]["ncores"])
+        self.assertEqual(["Numeric", '1'], a4["attributes"]["ncpus"])
+        self.assertEqual(["Numeric", '1024'], a4["attributes"]["mem"])
+        self.assertEqual(["String", "X86_64"], a4["attributes"]["type"])

-        self.assertEquals(["Numeric", '8'], a8["attributes"]["ncores"])
-        self.assertEquals(["Numeric", '1'], a8["attributes"]["ncpus"])
-        self.assertEquals(["Numeric", '2048'], a8["attributes"]["mem"])
-        self.assertEquals(["String", "X86_64"], a8["attributes"]["type"])
+        self.assertEqual(["Numeric", '8'], a8["attributes"]["ncores"])
+        self.assertEqual(["Numeric", '1'], a8["attributes"]["ncpus"])
+        self.assertEqual(["Numeric", '2048'], a8["attributes"]["mem"])
+        self.assertEqual(["String", "X86_64"], a8["attributes"]["type"])


-        self.assertEquals(["Boolean", "0"], a4["attributes"]["azurecclowprio"])
-        self.assertEquals(["Boolean", "0"], a8["attributes"]["azurecclowprio"])
-        self.assertEquals(["Boolean", "1"], lpa4["attributes"]["azurecclowprio"])
-        self.assertEquals(["Boolean", "1"], lpa8["attributes"]["azurecclowprio"])
+        self.assertEqual(["Boolean", "0"], a4["attributes"]["azurecclowprio"])
+        self.assertEqual(["Boolean", "0"], a8["attributes"]["azurecclowprio"])
+        self.assertEqual(["Boolean", "1"], lpa4["attributes"]["azurecclowprio"])
+        self.assertEqual(["Boolean", "1"], lpa8["attributes"]["azurecclowprio"])


         request = provider.create_machines(self._make_request("executea4", 1))
@@ -225,27 +225,27 @@ class TestHostFactory(unittest.TestCase):
             statuses = provider.status({"requests": [{"requestId": request["requestId"], 'sets': [request]}]})

             request_status_obj = statuses["requests"][0]
-            self.assertEquals(expected_request_status, request_status_obj["status"])
+            self.assertEqual(expected_request_status, request_status_obj["status"])
             machines = request_status_obj["machines"]
-            self.assertEquals(expected_machines, len(machines))
-            self.assertEquals(expected_node_status, mutable_node[0]["State"])
+            self.assertEqual(expected_machines, len(machines))
+            self.assertEqual(expected_node_status, mutable_node[0]["State"])

             if expected_machines == 0:
                 return

             for n, m in enumerate(machines):
                 if m["privateIpAddress"]:
-                    self.assertEquals(MockHostnamer().hostname(m["privateIpAddress"]), m["name"])
-                self.assertEquals("execute-%d_id" % (n + 1), m["machineId"])
-                self.assertEquals(expected_machine_status, m["status"])
-                self.assertEquals(expected_machine_result, m["result"])
+                    self.assertEqual(MockHostnamer().hostname(m["privateIpAddress"]), m["name"])
+                self.assertEqual("execute-%d_id" % (n + 1), m["machineId"])
+                self.assertEqual(expected_machine_status, m["status"])
+                self.assertEqual(expected_machine_result, m["result"])

             if node_status == "Failed" and provider.config.get("symphony.terminate_failed_nodes", False):
                 mutable_node = provider.cluster.inodes(Name="execute-1")
-                self.assertEquals(mutable_node[0].get("TargetState"), "Terminated")
+                self.assertEqual(mutable_node[0].get("TargetState"), "Terminated")
             else:
                 mutable_node = provider.cluster.inodes(Name="execute-1")
-                self.assertEquals(mutable_node[0].get("TargetState"), node_target_state)
+                self.assertEqual(mutable_node[0].get("TargetState"), node_target_state)

         # no instanceid == no machines
         run_test(instance=None, expected_machines=0)
@@ -306,10 +306,10 @@ class TestHostFactory(unittest.TestCase):
         provider = self._new_provider()
         provider.templates()
         request1 = provider.create_machines(self._make_request("executea4", 1))
-        self.assertEquals(RequestStates.running, request1["status"])
+        self.assertEqual(RequestStates.running, request1["status"])

         request2 = provider.create_machines(self._make_request("executea4", 4))
-        self.assertEquals(RequestStates.running, request2["status"])
+        self.assertEqual(RequestStates.running, request2["status"])

         # Order of statuses is undefined
         def find_request_status(request_status, request):
@@ -319,24 +319,24 @@ class TestHostFactory(unittest.TestCase):
             return None

         request_status = provider.status({'requests': [request1, request2]})
-        self.assertEquals(RequestStates.complete, request_status["status"])
+        self.assertEqual(RequestStates.complete, request_status["status"])
         request_status1 = find_request_status(request_status, request1)
         request_status2 = find_request_status(request_status, request2)
-        self.assertEquals(RequestStates.running, request_status1["status"])
-        self.assertEquals(0, len(request_status1["machines"]))
-        self.assertEquals(RequestStates.running, request_status2["status"])
-        self.assertEquals(0, len(request_status2["machines"]))
+        self.assertEqual(RequestStates.running, request_status1["status"])
+        self.assertEqual(0, len(request_status1["machines"]))
+        self.assertEqual(RequestStates.running, request_status2["status"])
+        self.assertEqual(0, len(request_status2["machines"]))

         provider.cluster.complete_node_startup([request1['requestId'], request2['requestId']])

         request_status = provider.status({'requests': [request1, request2]})
-        self.assertEquals(RequestStates.complete, request_status["status"])
+        self.assertEqual(RequestStates.complete, request_status["status"])
         request_status1 = find_request_status(request_status, request1)
         request_status2 = find_request_status(request_status, request2)
-        self.assertEquals(RequestStates.complete, request_status1["status"])
-        self.assertEquals(1, len(request_status1["machines"]))
-        self.assertEquals(RequestStates.complete, request_status2["status"])
-        self.assertEquals(4, len(request_status2["machines"]))
+        self.assertEqual(RequestStates.complete, request_status1["status"])
+        self.assertEqual(1, len(request_status1["machines"]))
+        self.assertEqual(RequestStates.complete, request_status2["status"])
+        self.assertEqual(4, len(request_status2["machines"]))

     def test_capacity_limited_create(self):
         provider = self._new_provider()
@@ -344,24 +344,24 @@ class TestHostFactory(unittest.TestCase):

         # we can _never_ return an empty list, so in the case of no remaining capacity, return placeholder
         # a8bucket["maxCoreCount"] = 0
-        # self.assertEquals(cyclecloud_provider.PLACEHOLDER_TEMPLATE, provider.templates()["templates"][0])
+        # self.assertEqual(cyclecloud_provider.PLACEHOLDER_TEMPLATE, provider.templates()["templates"][0])

         # CC thinks there are up to 50 VMs available
         a4bucket["maxCount"] = 50
         templates = provider.templates()
-        self.assertEquals(50, templates["templates"][0]["maxNumber"])
+        self.assertEqual(50, templates["templates"][0]["maxNumber"])

         # Request 10 VMs, but get 1 due to out-of-capacity
         provider.cluster.limit_capacity[a4bucket['definition']['machineType']] = 1
         request = provider.create_machines(self._make_request("executea4", 10))
-        self.assertEquals(RequestStates.running, request["status"])
+        self.assertEqual(RequestStates.running, request["status"])

         provider.cluster.complete_node_startup([request['requestId']])

         request_status = provider.status({'requests': [request]})
-        self.assertEquals(RequestStates.complete, request_status["status"])
-        self.assertEquals(RequestStates.complete, request_status["requests"][0]["status"])
-        self.assertEquals(1, len(request_status["requests"][0]["machines"]))
+        self.assertEqual(RequestStates.complete, request_status["status"])
+        self.assertEqual(RequestStates.complete, request_status["requests"][0]["status"])
+        self.assertEqual(1, len(request_status["requests"][0]["machines"]))

         # IMPORTANT:
         # Since numRequested < MaxCount and numCreated < numRequested, we're going to assume
@@ -374,98 +374,98 @@ class TestHostFactory(unittest.TestCase):
         term_requests = provider.terminate_json
         term_response = provider.terminate_machines({"machines": [{"name": "host-123", "machineId": "id-123"}]})

-        self.assertEquals(term_response["status"], "complete")
+        self.assertEqual(term_response["status"], "complete")
         self.assertTrue(term_response["requestId"] in term_requests.requests)
-        self.assertEquals({"id-123": "host-123"}, term_requests.requests[term_response["requestId"]]["machines"])
+        self.assertEqual({"id-123": "host-123"}, term_requests.requests[term_response["requestId"]]["machines"])

         status_response = provider.status({"requests": [{"requestId": term_response["requestId"]}]})
-        self.assertEquals(1, len(status_response["requests"]))
-        self.assertEquals(1, len(status_response["requests"][0]["machines"]))
+        self.assertEqual(1, len(status_response["requests"]))
+        self.assertEqual(1, len(status_response["requests"][0]["machines"]))

         status_response = provider.status({"requests": [{"requestId": "missing"}]})
-        self.assertEquals({'status': 'complete', 'requests': [{'status': 'complete', 'message': '', 'requestId': 'missing', 'machines': []}]}, status_response)
+        self.assertEqual({'status': 'complete', 'requests': [{'status': 'complete', 'message': '', 'requestId': 'missing', 'machines': []}]}, status_response)

         status_response = provider.status({"requests": [{"requestId": "delete-missing"}]})
-        self.assertEquals({'status': 'running', 'requests': [{'status': 'running', "message": "Unknown termination request id.", 'requestId': 'delete-missing', 'machines': []}]}, status_response)
+        self.assertEqual({'status': 'running', 'requests': [{'status': 'running', "message": "Unknown termination request id.", 'requestId': 'delete-missing', 'machines': []}]}, status_response)

     def test_terminate_status(self):
         provider = self._new_provider()
         term_requests = provider.terminate_json
         term_response = provider.terminate_machines({"machines": [{"name": "host-123", "machineId": "id-123"}]})

-        self.assertEquals(term_response["status"], "complete")
+        self.assertEqual(term_response["status"], "complete")
         self.assertTrue(term_response["requestId"] in term_requests.requests)
-        self.assertEquals({"id-123": "host-123"}, term_requests.requests[term_response["requestId"]]["machines"])
+        self.assertEqual({"id-123": "host-123"}, term_requests.requests[term_response["requestId"]]["machines"])

         status_response = provider.terminate_status({"machines": [{"machineId": "id-123", "name": "host-123"}]})
-        self.assertEquals(1, len(status_response["requests"]))
-        self.assertEquals(1, len(status_response["requests"][0]["machines"]))
+        self.assertEqual(1, len(status_response["requests"]))
+        self.assertEqual(1, len(status_response["requests"][0]["machines"]))

         status_response = provider.terminate_status({"machines": [{"machineId": "missing", "name": "missing-123"}]})
-        self.assertEquals({'requests': [], 'status': 'complete'}, status_response)
+        self.assertEqual({'requests': [], 'status': 'complete'}, status_response)

     def test_terminate_error(self):
         provider = self._new_provider()
         term_response = provider.terminate_machines({"machines": [{"name": "host-123", "machineId": "id-123"}]})
-        self.assertEquals(term_response["status"], RequestStates.complete)
+        self.assertEqual(term_response["status"], RequestStates.complete)

         # if it raises an exception, don't mark the request id as successful.
         provider.cluster.raise_during_termination = True
         term_response = provider.terminate_machines({"machines": [{"name": "host-123", "machineId": "id-123"}]})
-        self.assertEquals(RequestStates.running, term_response["status"])
+        self.assertEqual(RequestStates.running, term_response["status"])
         failed_request_id = term_response["requestId"]
         self.assertNotEquals(True, provider.terminate_json.read()[term_response["requestId"]].get("terminated"))

         # if it raises an exception, don't mark the request id as successful.
         provider.cluster.raise_during_termination = False
         term_response = provider.terminate_machines({"machines": [{"name": "host-123", "machineId": "id-123"}]})
-        self.assertEquals(RequestStates.complete, term_response["status"])
-        self.assertEquals(True, provider.terminate_json.read()[term_response["requestId"]].get("terminated"))
+        self.assertEqual(RequestStates.complete, term_response["status"])
+        self.assertEqual(True, provider.terminate_json.read()[term_response["requestId"]].get("terminated"))

         provider.status({"requests": [{"requestId": failed_request_id}]})
-        self.assertEquals(True, provider.terminate_json.read()[failed_request_id].get("terminated"))
+        self.assertEqual(True, provider.terminate_json.read()[failed_request_id].get("terminated"))

     # def test_json_store_lock(self):
     #     json_store = JsonStore("test.json", "/tmp")

     #     json_store._lock()
-    #     self.assertEquals(101, subprocess.call([sys.executable, test_json_source_helper.__file__, "test.json", "/tmp"]))
+    #     self.assertEqual(101, subprocess.call([sys.executable, test_json_source_helper.__file__, "test.json", "/tmp"]))

     #     json_store._unlock()
-    #     self.assertEquals(0, subprocess.call([sys.executable, test_json_source_helper.__file__, "test.json", "/tmp"]))
+    #     self.assertEqual(0, subprocess.call([sys.executable, test_json_source_helper.__file__, "test.json", "/tmp"]))

     def test_templates(self):
         provider = self._new_provider()
         a4bucket, a8bucket = provider.cluster._nodearrays["nodearrays"][0]["buckets"]
         nodearray = {"MaxCoreCount": 100}
-        self.assertEquals(2, provider._max_count('execute', nodearray, 4, {"maxCount": 2, "definition": {"machineType": "A$"}}))
-        self.assertEquals(3, provider._max_count('execute', nodearray, 8, {"maxCoreCount": 24, "definition": {"machineType": "A$"}}))
-        self.assertEquals(3, provider._max_count('execute', nodearray, 8, {"maxCoreCount": 25, "definition": {"machineType": "A$"}}))
-        self.assertEquals(3, provider._max_count('execute', nodearray, 8, {"maxCoreCount": 31, "definition": {"machineType": "A$"}}))
-        self.assertEquals(4, provider._max_count('execute', nodearray, 8, {"maxCoreCount": 32, "definition": {"machineType": "A$"}}))
+        self.assertEqual(2, provider._max_count('execute', nodearray, 4, {"maxCount": 2, "definition": {"machineType": "A$"}}))
+        self.assertEqual(3, provider._max_count('execute', nodearray, 8, {"maxCoreCount": 24, "definition": {"machineType": "A$"}}))
+        self.assertEqual(3, provider._max_count('execute', nodearray, 8, {"maxCoreCount": 25, "definition": {"machineType": "A$"}}))
+        self.assertEqual(3, provider._max_count('execute', nodearray, 8, {"maxCoreCount": 31, "definition": {"machineType": "A$"}}))
+        self.assertEqual(4, provider._max_count('execute', nodearray, 8, {"maxCoreCount": 32, "definition": {"machineType": "A$"}}))

         # simple zero conditions
-        self.assertEquals(0, provider._max_count('execute', nodearray, 8, {"maxCoreCount": 0, "definition": {"machineType": "A$"}}))
-        self.assertEquals(0, provider._max_count('execute', nodearray, 8, {"maxCount": 0, "definition": {"machineType": "A$"}}))
+        self.assertEqual(0, provider._max_count('execute', nodearray, 8, {"maxCoreCount": 0, "definition": {"machineType": "A$"}}))
+        self.assertEqual(0, provider._max_count('execute', nodearray, 8, {"maxCount": 0, "definition": {"machineType": "A$"}}))

         # error conditions return -1
         nodearray = {}
-        self.assertEquals(-1, provider._max_count('execute', nodearray, -100, {"maxCoreCount": 32, "definition": {"machineType": "A$"}}))
-        self.assertEquals(-1, provider._max_count('execute', nodearray, -100, {"maxCount": 32, "definition": {"machineType": "A$"}}))
-        self.assertEquals(-1, provider._max_count('execute', nodearray, 4, {"definition": {"machineType": "A$"}}))
-        self.assertEquals(-1, provider._max_count('execute', nodearray, 4, {"maxCount": -100, "definition": {"machineType": "A$"}}))
-        self.assertEquals(-1, provider._max_count('execute', nodearray, 4, {"maxCoreCount": -100, "definition": {"machineType": "A$"}}))
+        self.assertEqual(-1, provider._max_count('execute', nodearray, -100, {"maxCoreCount": 32, "definition": {"machineType": "A$"}}))
+        self.assertEqual(-1, provider._max_count('execute', nodearray, -100, {"maxCount": 32, "definition": {"machineType": "A$"}}))
+        self.assertEqual(-1, provider._max_count('execute', nodearray, 4, {"definition": {"machineType": "A$"}}))
+        self.assertEqual(-1, provider._max_count('execute', nodearray, 4, {"maxCount": -100, "definition": {"machineType": "A$"}}))
+        self.assertEqual(-1, provider._max_count('execute', nodearray, 4, {"maxCoreCount": -100, "definition": {"machineType": "A$"}}))

         a4bucket["maxCount"] = 0
         a8bucket["maxCoreCount"] = 0 # we can _never_ return an empty list
-        self.assertEquals(cyclecloud_provider.PLACEHOLDER_TEMPLATE, provider.templates()["templates"][0])
+        self.assertEqual(cyclecloud_provider.PLACEHOLDER_TEMPLATE, provider.templates()["templates"][0])

         a8bucket["maxCoreCount"] = 24
-        self.assertEquals(3, provider.templates()["templates"][-1]["maxNumber"])
+        self.assertEqual(3, provider.templates()["templates"][-1]["maxNumber"])
         a8bucket["maxCoreCount"] = 0

         a4bucket["maxCount"] = 100
-        self.assertEquals(100, provider.templates()["templates"][0]["maxNumber"])
+        self.assertEqual(100, provider.templates()["templates"][0]["maxNumber"])


     def test_reprioritize_template(self):
@@ -484,36 +484,36 @@ class TestHostFactory(unittest.TestCase):

         # a4 overrides the default and has custom2 defined as well
         attributes = any_template("execute")["attributes"]
-        self.assertEquals(["String", "custom_override_value"], attributes["custom"])
-        self.assertEquals(["String", "custom_value2"], attributes["custom2"])
-        self.assertEquals(["Numeric", '1024'], attributes["mem"])
+        self.assertEqual(["String", "custom_override_value"], attributes["custom"])
+        self.assertEqual(["String", "custom_value2"], attributes["custom2"])
+        self.assertEqual(["Numeric", '1024'], attributes["mem"])

     def test_errors(self):
         provider = self._new_provider()
         provider.cluster.raise_during_add_nodes = True
         provider.templates()
         response = provider.create_machines(self._make_request("executea4", 1))
-        self.assertEquals('Azure CycleCloud experienced an error, though it may have succeeded: raise_during_add_nodes', response["message"])
-        self.assertEquals(RequestStates.running, response["status"])
+        self.assertEqual('Azure CycleCloud experienced an error, though it may have succeeded: raise_during_add_nodes', response["message"])
+        self.assertEqual(RequestStates.running, response["status"])
         self.assertNotEquals(None, response.get("requestId"))

         provider.cluster.raise_during_termination = True
         term_response = provider.terminate_machines({"machines": [{"machineId": "mach123", "name": "n-1-123"}]})
-        self.assertEquals(RequestStates.running, term_response["status"])
+        self.assertEqual(RequestStates.running, term_response["status"])

     def test_missing_template_in_request(self):
         provider = self._new_provider()
         provider.templates_json.requests.clear()
         request = provider.create_machines(self._make_request("executea4", 1))
-        self.assertEquals(RequestStates.complete_with_error, request["status"])
+        self.assertEqual(RequestStates.complete_with_error, request["status"])

     def test_expired_terminations(self):
         provider = self._new_provider()
         term_response = provider.terminate_machines({"machines": [{"machineId": "id-123", "name": "e-1-123"},
                                                                   {"machineId": "id-124", "name": "e-2-234"}]})
-        self.assertEquals(RequestStates.complete, term_response["status"])
+        self.assertEqual(RequestStates.complete, term_response["status"])
         stat_response = provider.status({"requests": [{"requestId": term_response["requestId"]}]})
-        self.assertEquals(RequestStates.complete, stat_response["requests"][0]["status"])
+        self.assertEqual(RequestStates.complete, stat_response["requests"][0]["status"])
         self.assertIn(term_response["requestId"], provider.terminate_json.read())

         # expires after 2 hours, so this is just shy of 2 hours
@@ -523,19 +523,19 @@ class TestHostFactory(unittest.TestCase):

         term_response = provider.terminate_machines({"machines": [{"machineId": "id-234", "name": "n-1-123"}]})
         stat_response = provider.status({"requests": [{"requestId": term_response["requestId"]}]})
-        self.assertEquals(RequestStates.complete, stat_response["requests"][0]["status"])
+        self.assertEqual(RequestStates.complete, stat_response["requests"][0]["status"])
         self.assertIn(expired_request, provider.terminate_json.read())

         # just over 2 hours, it will be gone.
         provider.clock.now = (1970, 1, 1, 2.01, 0, 0)
         with provider.terminate_json as requests:
-            for _, request in requests.iteritems():
+            for _, request in requests.items():
                 request["terminated"] = False
         stat_response = provider.status({"requests": [{"requestId": term_response["requestId"]}]})
         self.assertIn(expired_request, provider.terminate_json.read())

         with provider.terminate_json as requests:
-            for _, request in requests.iteritems():
+            for _, request in requests.items():
                 request["terminated"] = True
         stat_response = provider.status({"requests": [{"requestId": term_response["requestId"]}]})
         self.assertNotIn(expired_request, provider.terminate_json.read())
@@ -546,7 +546,7 @@ class TestHostFactory(unittest.TestCase):

         def _maxNumber(name):
             ret = [t for t in templates if t["templateId"] == name]
-            self.assertEquals(1, len(ret))
+            self.assertEqual(1, len(ret))
             return ret[0]["maxNumber"]

         self.assertTrue(_maxNumber("executea4") > 0)
@@ -574,20 +574,20 @@ class TestHostFactory(unittest.TestCase):

         # a4 overrides the default and has custom2 defined as well
         attributes = any_template("execute")["attributes"]
-        self.assertEquals(["String", "custom_override_value"], attributes["custom"])
-        self.assertEquals(["String", "custom_value2"], attributes["custom2"])
-        self.assertEquals(["Numeric", '1024'], attributes["mem"])
+        self.assertEqual(["String", "custom_override_value"], attributes["custom"])
+        self.assertEqual(["String", "custom_value2"], attributes["custom2"])
+        self.assertEqual(["Numeric", '1024'], attributes["mem"])

         # a8 only has the default
         attributes = any_template("other")["attributes"]
-        self.assertEquals(["String", "custom_default_value"], attributes["custom"])
+        self.assertEqual(["String", "custom_default_value"], attributes["custom"])
         self.assertNotIn("custom2", attributes)
-        self.assertEquals(0, any_template("other")["maxNumber"])
+        self.assertEqual(0, any_template("other")["maxNumber"])

     def test_invalid_template(self):
         provider = self._new_provider()
         response = provider.create_machines(self._make_request("nonsense", 1))
-        self.assertEquals(RequestStates.complete_with_error, response["status"])
+        self.assertEqual(RequestStates.complete_with_error, response["status"])

     def test_provider_config_from_env(self):
         tempdir = tempfile.mkdtemp()
@@ -606,7 +606,7 @@ class TestHostFactory(unittest.TestCase):
             for template in provider.templates()["templates"]:
                 self.assertIn(template["templateId"], ["executea4", "executea8", "lpexecutea4", "lpexecutea8"])
                 assert "custom" in template["attributes"]
-                self.assertEquals(["String", "VALUE"], template["attributes"]["custom"])
+                self.assertEqual(["String", "VALUE"], template["attributes"]["custom"])

         except Exception:
             shutil.rmtree(tempdir, ignore_errors=True)
@@ -618,21 +618,21 @@ class TestHostFactory(unittest.TestCase):

         config.set("templates.default.UserData", "abc=123;def=1==1")
         provider_templates = provider.templates()
-        self.assertEquals({"abc": "123", "def": "1==1"}, provider_templates["templates"][0]["UserData"]["symphony"]["custom_env"])
-        self.assertEquals("abc def", provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env_names"])
+        self.assertEqual({"abc": "123", "def": "1==1"}, provider_templates["templates"][0]["UserData"]["symphony"]["custom_env"])
+        self.assertEqual("abc def", provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env_names"])

         config.set("templates.default.UserData", "abc=123;def=1==1;")
-        self.assertEquals({"abc": "123", "def": "1==1"}, provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env"])
-        self.assertEquals("abc def", provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env_names"])
+        self.assertEqual({"abc": "123", "def": "1==1"}, provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env"])
+        self.assertEqual("abc def", provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env_names"])

         config.set("templates.default.UserData", "abc=123;def=1==1;bad_form")

-        self.assertEquals({"abc": "123", "def": "1==1"}, provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env"])
-        self.assertEquals("abc def", provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env_names"])
+        self.assertEqual({"abc": "123", "def": "1==1"}, provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env"])
+        self.assertEqual("abc def", provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env_names"])

         config.set("templates.default.UserData", "abc=123;def=1==1;good_form=234;bad_form_123")
-        self.assertEquals({"abc": "123", "def": "1==1", "good_form": "234"}, provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env"])
-        self.assertEquals("abc def good_form", provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env_names"])
+        self.assertEqual({"abc": "123", "def": "1==1", "good_form": "234"}, provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env"])
+        self.assertEqual("abc def good_form", provider.templates()["templates"][0]["UserData"]["symphony"]["custom_env_names"])

         def assert_no_user_data():
             templates = provider.templates()
@@ -6,26 +6,26 @@ import util
 class UtilTest(unittest.TestCase):

     def test_provider_config(self):
-        self.assertEquals("d", util.ProviderConfig({"a": {"b": {"c": "d"}}}, {}).get("a.b.c"))
-        self.assertEquals({"c": "d"}, util.ProviderConfig({"a": {"b": {"c": "d"}}}, {}).get("a.b"))
+        self.assertEqual("d", util.ProviderConfig({"a": {"b": {"c": "d"}}}, {}).get("a.b.c"))
+        self.assertEqual({"c": "d"}, util.ProviderConfig({"a": {"b": {"c": "d"}}}, {}).get("a.b"))

         # missing from user config, look in jetpack
-        self.assertEquals("y", util.ProviderConfig({"a": {"b": {"c": "d"}}}, {"x": "y"}).get("x"))
+        self.assertEqual("y", util.ProviderConfig({"a": {"b": {"c": "d"}}}, {"x": "y"}).get("x"))
         # fall back on default_value, if all else fails
-        self.assertEquals("0", util.ProviderConfig({}, {"x": "y"}).get("z.a.b", "0"))
+        self.assertEqual("0", util.ProviderConfig({}, {"x": "y"}).get("z.a.b", "0"))
         # user config overrides jetpack
-        self.assertEquals("d", util.ProviderConfig({"a": {"b": {"c": "d"}}}, {"a": {"b": {"c": "e"}}}).get("a.b.c"))
+        self.assertEqual("d", util.ProviderConfig({"a": {"b": {"c": "d"}}}, {"a": {"b": {"c": "e"}}}).get("a.b.c"))

         pc = util.ProviderConfig({}, {})
         pc.set("a", "b")
-        self.assertEquals("b", pc.get("a"))
+        self.assertEqual("b", pc.get("a"))

         pc.set("x.y.z", "123")
-        self.assertEquals("123", pc.get("x.y.z"))
-        self.assertEquals({"z": "123"}, pc.get("x.y"))
-        self.assertEquals({"y": {"z": "123"}}, pc.get("x"))
-        self.assertEquals({"x": {"y": {"z": "123"}}, "a": "b"}, pc.get(""))
-        self.assertEquals({"x": {"y": {"z": "123"}}, "a": "b"}, pc.get(None))
+        self.assertEqual("123", pc.get("x.y.z"))
+        self.assertEqual({"z": "123"}, pc.get("x.y"))
+        self.assertEqual({"y": {"z": "123"}}, pc.get("x"))
+        self.assertEqual({"x": {"y": {"z": "123"}}, "a": "b"}, pc.get(""))
+        self.assertEqual({"x": {"y": {"z": "123"}}, "a": "b"}, pc.get(None))


 if __name__ == "__main__":