This commit is contained in:
Zihao Chen 2018-03-13 17:24:55 +08:00
Родитель 6e53b5263f
Коммит a8a82fac47
5 изменённых файлов: 57 добавлений и 67 удалений

Просмотреть файл

@ -9,8 +9,7 @@ class HeartBeatTable(object):
def __init__(self, provisioning_timeout=timedelta(minutes=15), heartbeat_timeout=timedelta(minutes=3)):
self._table = {}
self.logger = logging_aux.init_logger_aux(
"hpcframework.heartbeat", "hpcframework.heartbeat.log")
self.logger = logging_aux.init_logger_aux("hpcframework.heartbeat", "hpcframework.heartbeat.log")
self.on_host_running = []
self._table_lock = threading.Lock()
self._provisioning_timeout = provisioning_timeout
@ -20,8 +19,7 @@ class HeartBeatTable(object):
if hostname in self._table and self._table[hostname].state != HpcState.Closed:
self.logger.warn("Heart beat entry of {} existed. old value: {}.".format(
hostname, str(self._table[hostname])))
slaveinfo = SlaveInfo(hostname, agent_id, task_id,
cpus, last_heartbeat, HpcState.Provisioning)
slaveinfo = SlaveInfo(hostname, agent_id, task_id, cpus, last_heartbeat, HpcState.Provisioning)
self._table[hostname] = slaveinfo
self.logger.info("Heart beat entry added: {}".format(str(slaveinfo)))
@ -34,39 +32,31 @@ class HeartBeatTable(object):
if self._table[hostname].state == HpcState.Provisioning:
self._table[hostname].state = HpcState.Running
self.__exec_callback(self.on_host_running)
self.logger.info(
"Host {} start running".format(hostname))
self.logger.info("Host {} start running".format(hostname))
else:
self.logger.error(
"Host {} is not recognized. Heartbeat ignored.".format(hostname))
self.logger.error("Host {} is not recognized. Heartbeat ignored.".format(hostname))
def on_slave_close(self, hostname):
if hostname in self._table:
self._table[hostname].state = HpcState.Closed
self.logger.info("Host {} closed".format(hostname))
else:
self.logger.error(
"Host {} is not recognized. Close event ignored.".format(hostname))
self.logger.error("Host {} is not recognized. Close event ignored.".format(hostname))
def get_task_info(self, hostname):
if hostname in self._table:
entry = self._table[hostname]
return (entry.task_id, entry.agent_id)
else:
self.logger.error(
"Host {} is not recognized. Failed to get task info.".format(hostname))
self.logger.error("Host {} is not recognized. Failed to get task info.".format(hostname))
def __exec_callback(self, callbacks):
for callback in callbacks:
try:
self.logger.debug(
'Callback %s on %s' % (callback.__name__)
)
self.logger.debug('Callback %s on %s' % (callback.__name__))
callback()
except Exception as e:
self.logger.exception(
'Error in %s callback: %s' % (callback.__name__, str(e))
)
self.logger.exception('Error in %s callback: %s' % (callback.__name__, str(e)))
def check_timeout(self, now=datetime.utcnow()):
provision_timeout_list = []

Просмотреть файл

@ -14,11 +14,11 @@ from datetime import datetime
from mesoshttp.client import MesosClient
from mesoshttp.offers import Offer
import heartbeat_table
import logging_aux
import restclient
import restserver
from restclient import AutoScaleRestClient
import heartbeat_table
class Test(object):
@ -36,8 +36,7 @@ class Test(object):
def __init__(self):
logging.basicConfig()
self.logger = logging_aux.init_logger_aux(
"hpcframework", "hpcframework.log")
self.logger = logging_aux.init_logger_aux("hpcframework", "hpcframework.log")
# signal.signal(signal.SIGINT, signal.SIG_IGN)
logging.getLogger('mesoshttp').setLevel(logging.DEBUG)
@ -49,8 +48,7 @@ class Test(object):
hpc_setup_ps1 = scriptfile.read()
self.logger.info("Loaded HPC setup script:\n{}".format(hpc_setup_ps1))
hpc_setup_ps1_utf16 = hpc_setup_ps1.encode('utf-16')
hpc_setup_ps1_utf16_nobom = hpc_setup_ps1_utf16[2:] if hpc_setup_ps1_utf16[
0:2] == codecs.BOM_UTF16 else hpc_setup_ps1_utf16
hpc_setup_ps1_utf16_nobom = hpc_setup_ps1_utf16[2:] if hpc_setup_ps1_utf16[0:2] == codecs.BOM_UTF16 else hpc_setup_ps1_utf16
self.hpc_setup_ps1_b64 = base64.b64encode(hpc_setup_ps1_utf16_nobom)
self.driver = None # type: MesosClient.SchedulerDriver
@ -62,7 +60,7 @@ class Test(object):
self.th = Test.MesosFramework(self.mesos_client)
self.th.start()
self.heartbeat_server = restserver.RestServer(self.heartbeat_table ,8088)
self.heartbeat_server = restserver.RestServer(self.heartbeat_table, 8088)
self.heartbeat_server.start()
while True and self.th.isAlive():
@ -92,21 +90,19 @@ class Test(object):
def offer_received(self, offers):
# self.logger.info('OFFER: %s' % (str(offers)))
grow_decision = self.hpc_client.get_grow_decision()
cores_to_grow = grow_decision.cores_to_grow - self.heartbeat_table.get_cores_in_provisioning()
if grow_decision.cores_to_grow - self.core_provisioning > 0:
if cores_to_grow > 0:
for offer in offers: # type: Offer
mesos_offer = offer.get_offer()
self.logger.info("offer_received: {}".format(
(str(mesos_offer))))
self.logger.info("offer_received: {}".format((str(mesos_offer))))
if 'attributes' in mesos_offer:
attributes = mesos_offer['attributes']
if self.get_text(attributes, 'os') != 'windows_server':
offer.decline()
else:
cores = self.get_scalar(attributes, 'cores')
cpus = self.get_scalar(
mesos_offer['resources'], 'cpus')
cpus = self.get_scalar(mesos_offer['resources'], 'cpus')
if cores == cpus:
self.accept_offer(offer)
else:
@ -118,10 +114,9 @@ class Test(object):
offer.decline()
def accept_offer(self, offer):
self.logger.info("Offer %s meets HPC's requirement" %
offer.get_offer()['id']['value'])
self.logger.info("Offer %s meets HPC's requirement" % offer.get_offer()['id']['value'])
self.run_job(offer)
def get_scalar(self, collection, name):
for i in collection:
if i['name'] == name:
@ -137,7 +132,6 @@ class Test(object):
def run_job(self, mesos_offer):
offer = mesos_offer.get_offer()
self.logger.info("Accepting offer: {}".format(str(offer)))
agent_id = offer['agent_id']['value']
hostname = offer['hostname']
task_id = uuid.uuid4().hex
@ -162,10 +156,10 @@ class Test(object):
],
'command': {'value': 'powershell -EncodedCommand ' + self.hpc_setup_ps1_b64}
}
self.logger.debug(
"Sending command:\n{}".format(task['command']['value']))
self.logger.debug("Sending command:\n{}".format(task['command']['value']))
mesos_offer.accept([task])
self.heartbeat_table.add_slaveinfo(hostname, agent_id, task, cpus)
self.heartbeat_table.add_slaveinfo(hostname, agent_id, task, cpus)
if __name__ == "__main__":
test_mesos = Test()

Просмотреть файл

@ -1,7 +1,8 @@
import logging
def init_logger_aux(logger_name, filelog_name, console_level=logging.WARNING, file_level=logging.DEBUG):
logger = logging.getLogger(logger_name) # type: logging.Logger
def init_logger_aux(logger_name, filelog_name, console_level=logging.WARNING, file_level=logging.DEBUG):
logger = logging.getLogger(logger_name) # type: logging.Logger
logger.setLevel(file_level)
fh = logging.FileHandler(filelog_name)
fh.setLevel(file_level)
@ -12,4 +13,4 @@ def init_logger_aux(logger_name, filelog_name, console_level=logging.WARNING, fi
ch.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)
return logger
return logger

Просмотреть файл

@ -1,12 +1,15 @@
import requests
import json
import logging
from collections import namedtuple
import requests
import logging_aux
GrowDecision = namedtuple("GrowDecision", "cores_to_grow nodes_to_grow sockets_to_grow")
IdleNode = namedtuple("IdleNode", "node_name idle_since")
class AutoScaleRestClient(object):
def __init__(self, hostname="localhost"):
self.hostname = hostname
@ -16,27 +19,28 @@ class AutoScaleRestClient(object):
def get_grow_decision(self):
url = self.grow_decision_api_route.format(self.hostname)
res = requests.post(url, verify = False)
res = requests.post(url, verify=False)
if res.ok:
self.logger.info(res.content)
jobj = json.loads(res.content)
return GrowDecision(jobj['CoresToGrow'], jobj['NodesToGrow'], jobj['SocketsToGrow'])
else:
self.logger.error("status_code:{} content:{}".format(res.status_code, res.content))
def check_nodes_idle(self, nodes):
headers = {"Content-Type": "application/json"}
url = self.check_nodes_idle_route.format(self.hostname)
res = requests.post(url, data = nodes, headers = headers, verify = False)
res = requests.post(url, data=nodes, headers=headers, verify=False)
if res.ok:
self.logger.info(res.content)
jobjs = json.loads(res.content)
jobjs = json.loads(res.content)
return [IdleNode(idle_info['NodeName'], idle_info['IdleSince']) for idle_info in jobjs]
else:
self.logger.error("status_code:{} content:{}".format(res.status_code, res.content))
if __name__ == '__main__':
client = AutoScaleRestClient()
ans = client.get_grow_decision()
print ans.cores_to_grow
print client.check_nodes_idle(json.dumps(['mesoswinagent', 'mesoswinagent2']))
print client.check_nodes_idle(json.dumps(['mesoswinagent', 'mesoswinagent2']))

Просмотреть файл

@ -5,34 +5,35 @@ from heartbeat_table import HeartBeatTable
import logging_aux
import logging
class RestServer(object): # TODO: replace this implementation with twisted based implementation
def __init__(self, heartbeat_table, port = 80):
self.logger = logging_aux.init_logger_aux(
"hpcframework.heatbeat_server", "hpcframework.heatbeat_server.log")
self._heartbeat_table = heartbeat_table # type: HeartBeatTable
class RestServer(object): # TODO: replace this implementation with twisted based implementation
def __init__(self, heartbeat_table, port=80):
self.logger = logging_aux.init_logger_aux("hpcframework.heatbeat_server", "hpcframework.heatbeat_server.log")
self._heartbeat_table = heartbeat_table # type: HeartBeatTable
self._server_address = ('', port)
self._server_class = HTTPServer
self._server_class = HTTPServer
self._handler_class = HeartBeatHandler
self._port = port
self._httpd = self._server_class(self._server_address, self._handler_class)
self._server_thread = threading.Thread(target=self.run)
self._server_thread = threading.Thread(target=self.run)
HeartBeatHandler.logger = self.logger
HeartBeatHandler.heartbeat_table = self._heartbeat_table
def run(self):
def run(self):
self.logger.debug('Starting httpd...')
self._httpd.serve_forever()
self._httpd.serve_forever()
def stop(self):
self._httpd.shutdown()
self._httpd.shutdown()
self._server_thread.join()
def start(self):
self._server_thread.start()
class HeartBeatHandler(BaseHTTPRequestHandler):
logger = None # type: logging.Logger
heartbeat_table = None # type: HeartBeatTable
logger = None # type: logging.Logger
heartbeat_table = None # type: HeartBeatTable
def _set_headers(self):
self.send_response(200)
@ -45,25 +46,25 @@ class HeartBeatHandler(BaseHTTPRequestHandler):
def do_HEAD(self):
self._set_headers()
def do_POST(self):
# Doesn't do anything with posted data
content_length = int(self.headers['Content-Length']) # <--- Gets the size of data
post_data = self.rfile.read(content_length) # <--- Gets the data itself
content_length = int(self.headers['Content-Length']) # <--- Gets the size of data
post_data = self.rfile.read(content_length) # <--- Gets the data itself
self._set_headers()
json_obj = json.loads(post_data)
json_obj = json.loads(post_data)
# self.wfile.write("<html><body><h1>POST!</h1><pre>" + str(json_obj) + "</pre></body></html>")
self.logger.debug("Received heartbeat object {}".format(str(json_obj)))
try:
self.heartbeat_table.on_slave_heartbeat(json_obj['hostname'])
except Exception as ex:
self.logger.exception(ex)
self.logger.exception(ex)
# if __name__ == "__main__":
# from sys import argv
#
#
# if len(argv) == 2:
# run(port=int(argv[1]))
# else:
# run()
# run()