diff --git a/VMExtension/HandlerManifest.json b/VMExtension/HandlerManifest.json new file mode 100644 index 0000000..8ae321e --- /dev/null +++ b/VMExtension/HandlerManifest.json @@ -0,0 +1,14 @@ +[{ + "name": "HPCNodeManager", + "version": 1.0, + "handlerManifest": { + "installCommand": "hpcnodemanager.py -install", + "uninstallCommand": "hpcnodemanager.py -uninstall", + "updateCommand": "hpcnodemanager.py -update", + "enableCommand": "hpcnodemanager.py -enable", + "disableCommand": "hpcnodemanager.py -disable", + "rebootAfterInstall": false, + "reportHeartbeat": false, + "updateMode": "UpdateWithInstall" + } +}] diff --git a/VMExtension/Utils/HandlerUtil.py b/VMExtension/Utils/HandlerUtil.py new file mode 100644 index 0000000..7d05990 --- /dev/null +++ b/VMExtension/Utils/HandlerUtil.py @@ -0,0 +1,326 @@ +# +# Handler library for Linux IaaS +# +# Copyright 2014 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.7+ + + +""" +JSON def: +HandlerEnvironment.json +[{ + "name": "ExampleHandlerLinux", + "seqNo": "seqNo", + "version": "1.0", + "handlerEnvironment": { + "logFolder": "", + "configFolder": "", + "statusFolder": "", + "heartbeatFile": "", + + } +}] + +Example ./config/1.settings +"{"runtimeSettings":[{"handlerSettings":{"protectedSettingsCertThumbprint":"1BE9A13AA1321C7C515EF109746998BAB6D86FD1","protectedSettings": +"MIIByAYJKoZIhvcNAQcDoIIBuTCCAbUCAQAxggFxMIIBbQIBADBVMEExPzA9BgoJkiaJk/IsZAEZFi9XaW5kb3dzIEF6dXJlIFNlcnZpY2UgTWFuYWdlbWVudCBmb3IgR+nhc6VHQTQpCiiV2zANBgkqhkiG9w0BAQEFAASCAQCKr09QKMGhwYe+O4/a8td+vpB4eTR+BQso84cV5KCAnD6iUIMcSYTrn9aveY6v6ykRLEw8GRKfri2d6tvVDggUrBqDwIgzejGTlCstcMJItWa8Je8gHZVSDfoN80AEOTws9Fp+wNXAbSuMJNb8EnpkpvigAWU2v6pGLEFvSKC0MCjDTkjpjqciGMcbe/r85RG3Zo21HLl0xNOpjDs/qqikc/ri43Y76E/Xv1vBSHEGMFprPy/Hwo3PqZCnulcbVzNnaXN3qi/kxV897xGMPPC3IrO7Nc++AT9qRLFI0841JLcLTlnoVG1okPzK9w6ttksDQmKBSHt3mfYV+skqs+EOMDsGCSqGSIb3DQEHATAUBggqhkiG9w0DBwQITgu0Nu3iFPuAGD6/QzKdtrnCI5425fIUy7LtpXJGmpWDUA==","publicSettings":{"port":"3000"}}}]}" + + +Example HeartBeat +{ +"version": 1.0, + "heartbeat" : { + "status": "ready", + "code": 0, + "Message": "Sample Handler running. Waiting for a new configuration from user." + } +} +Example Status Report: +[{"version":"1.0","timestampUTC":"2014-05-29T04:20:13Z","status":{"name":"Chef Extension Handler","operation":"chef-client-run","status":"success","code":0,"formattedMessage":{"lang":"en-US","message":"Chef-client run success"}}}] + +""" + + +import os +import os.path +import sys +import imp +import base64 +import json +import time + +from os.path import join +from Utils.WAAgentUtil import waagent +from waagent import LoggerInit + +DateTimeFormat = "%Y-%m-%dT%H:%M:%SZ" + +class HandlerContext: + def __init__(self,name): + self._name = name + self._version = '0.0' + self._config_dir = None + self._log_dir = None + self._log_file = None + self._status_dir = None + self._heartbeat_file = None + self._seq_no = -1 + self._status_file = None + self._settings_file = None + self._config = None + return + +class HandlerUtility: + def __init__(self, log, error, short_name): + self._log = log + self._error = error + self._short_name = short_name + + def _get_log_prefix(self): + return '[%s-%s]' %(self._context._name, self._context._version) + + def _get_current_seq_no(self, config_folder): + seq_no = -1 + cur_seq_no = -1 + freshest_time = None + for subdir, dirs, files in os.walk(config_folder): + for file in files: + try: + cur_seq_no = int(os.path.basename(file).split('.')[0]) + if(freshest_time == None): + freshest_time = os.path.getmtime(join(config_folder,file)) + seq_no = cur_seq_no + else: + current_file_m_time = os.path.getmtime(join(config_folder,file)) + if(current_file_m_time > freshest_time): + freshest_time=current_file_m_time + seq_no = cur_seq_no + except ValueError: + continue + return seq_no + + def log(self, message): + self._log(self._get_log_prefix() + message) + + def error(self, message): + self._error(self._get_log_prefix() + message) + + def _parse_config(self, ctxt): + config = None + try: + config=json.loads(ctxt) + except: + self.error('JSON exception decoding ' + ctxt) + + if config == None: + self.error("JSON error processing settings file:" + ctxt) + else: + handlerSettings = config['runtimeSettings'][0]['handlerSettings'] + if handlerSettings.has_key('protectedSettings') and \ + handlerSettings.has_key("protectedSettingsCertThumbprint") and \ + handlerSettings['protectedSettings'] is not None and \ + handlerSettings["protectedSettingsCertThumbprint"] is not None: + protectedSettings = handlerSettings['protectedSettings'] + thumb=handlerSettings['protectedSettingsCertThumbprint'] + cert=waagent.LibDir+'/'+thumb+'.crt' + pkey=waagent.LibDir+'/'+thumb+'.prv' + unencodedSettings = base64.standard_b64decode(protectedSettings) + openSSLcmd = "openssl smime -inform DER -decrypt -recip {0} -inkey {1}" + cleartxt = waagent.RunSendStdin(openSSLcmd.format(cert, pkey), unencodedSettings)[1] + if cleartxt == None: + self.error("OpenSSh decode error using thumbprint " + thumb ) + self.do_exit(1,operation,'error','1', 'Failed decrypting protectedSettings') + jctxt='' + try: + jctxt=json.loads(cleartxt) + except: + self.error('JSON exception decoding ' + cleartxt) + handlerSettings['protectedSettings']=jctxt + self.log('Config decoded correctly.') + return config + + def do_parse_context(self,operation): + _context = self.try_parse_context() + if not _context: + self.do_exit(1,operation,'error','1', operation + ' Failed') + return _context + + def try_parse_context(self): + self._context = HandlerContext(self._short_name) + handler_env=None + config=None + ctxt=None + code=0 + # get the HandlerEnvironment.json. According to the extension handler spec, it is always in the ./ directory + self.log('cwd is ' + os.path.realpath(os.path.curdir)) + handler_env_file='./HandlerEnvironment.json' + if not os.path.isfile(handler_env_file): + self.error("Unable to locate " + handler_env_file) + return None + ctxt = waagent.GetFileContents(handler_env_file) + if ctxt == None : + self.error("Unable to read " + handler_env_file) + try: + handler_env=json.loads(ctxt) + except: + pass + if handler_env == None : + self.log("JSON error processing " + handler_env_file) + return None + if type(handler_env) == list: + handler_env = handler_env[0] + + self._context._name = handler_env['name'] + self._context._version = str(handler_env['version']) + self._context._config_dir=handler_env['handlerEnvironment']['configFolder'] + self._context._log_dir= handler_env['handlerEnvironment']['logFolder'] + self._context._log_file= os.path.join(handler_env['handlerEnvironment']['logFolder'],'extension.log') + self._change_log_file() + self._context._status_dir=handler_env['handlerEnvironment']['statusFolder'] + self._context._heartbeat_file=handler_env['handlerEnvironment']['heartbeatFile'] + self._context._seq_no = self._get_current_seq_no(self._context._config_dir) + if self._context._seq_no < 0: + self.error("Unable to locate a .settings file!") + return None + self._context._seq_no = str(self._context._seq_no) + self.log('sequence number is ' + self._context._seq_no) + self._context._status_file= os.path.join(self._context._status_dir, self._context._seq_no +'.status') + self._context._settings_file = os.path.join(self._context._config_dir, self._context._seq_no + '.settings') + self.log("setting file path is" + self._context._settings_file) + ctxt=None + ctxt=waagent.GetFileContents(self._context._settings_file) + if ctxt == None : + error_msg = 'Unable to read ' + self._context._settings_file + '. ' + self.error(error_msg) + return None + + self.log("JSON config: " + ctxt) + self._context._config = self._parse_config(ctxt) + return self._context + + + def _change_log_file(self): + self.log("Change log file to " + self._context._log_file) + LoggerInit(self._context._log_file,'/dev/stdout') + self._log = waagent.Log + self._error = waagent.Error + + def set_verbose_log(self, verbose): + if(verbose == "1" or verbose == 1): + self.log("Enable verbose log") + LoggerInit(self._context._log_file, '/dev/stdout', verbose=True) + else: + self.log("Disable verbose log") + LoggerInit(self._context._log_file, '/dev/stdout', verbose=False) + + def is_seq_smaller(self): + return int(self._context._seq_no) <= self._get_most_recent_seq() + + def save_seq(self): + self._set_most_recent_seq(self._context._seq_no) + self.log("set most recent sequence number to " + self._context._seq_no) + + def exit_if_enabled(self): + self.exit_if_seq_smaller() + + def exit_if_seq_smaller(self): + if(self.is_seq_smaller()): + self.log("Current sequence number, " + self._context._seq_no + ", is not greater than the sequnce number of the most recent executed configuration. Exiting...") + sys.exit(0) + self.save_seq() + + def _get_most_recent_seq(self): + if(os.path.isfile('mrseq')): + seq = waagent.GetFileContents('mrseq') + if(seq): + return int(seq) + + return -1 + + def is_current_config_seq_greater_inused(self): + return int(self._context._seq_no) > self._get_most_recent_seq() + + def get_inused_config_seq(self): + return self._get_most_recent_seq() + + def set_inused_config_seq(self,seq): + self._set_most_recent_seq(seq) + + def _set_most_recent_seq(self,seq): + waagent.SetFileContents('mrseq', str(seq)) + + def do_status_report(self, operation, status, status_code, message): + self.log("{0},{1},{2},{3}".format(operation, status, status_code, message)) + tstamp=time.strftime(DateTimeFormat, time.gmtime()) + stat = [{ + "version" : self._context._version, + "timestampUTC" : tstamp, + "status" : { + "name" : self._context._name, + "operation" : operation, + "status" : status, + "code" : status_code, + "formattedMessage" : { + "lang" : "en-US", + "message" : message + } + } + }] + stat_rept = json.dumps(stat) + if self._context._status_file: + tmp = "%s.tmp" %(self._context._status_file) + with open(tmp,'w+') as f: + f.write(stat_rept) + os.rename(tmp, self._context._status_file) + + def do_heartbeat_report(self, heartbeat_file,status,code,message): + # heartbeat + health_report='[{"version":"1.0","heartbeat":{"status":"' + status+ '","code":"'+ code + '","Message":"' + message + '"}}]' + if waagent.SetFileContents(heartbeat_file,health_report) == None : + self.error('Unable to wite heartbeat info to ' + heartbeat_file) + + def do_exit(self,exit_code,operation,status,code,message): + try: + self.do_status_report(operation, status,code,message) + except Exception as e: + self.log("Can't update status: "+str(e)) + sys.exit(exit_code) + + def get_name(self): + return self._context._name + + def get_seq_no(self): + return self._context._seq_no + + def get_log_dir(self): + return self._context._log_dir + + def get_handler_settings(self): + if (self._context._config != None): + return self._context._config['runtimeSettings'][0]['handlerSettings'] + return None + + def get_protected_settings(self): + if (self._context._config != None): + return self.get_handler_settings().get('protectedSettings') + return None + + def get_public_settings(self): + handlerSettings = self.get_handler_settings() + if (handlerSettings != None): + return self.get_handler_settings().get('publicSettings') + return None + diff --git a/VMExtension/Utils/WAAgentUtil.py b/VMExtension/Utils/WAAgentUtil.py new file mode 100644 index 0000000..b3441e6 --- /dev/null +++ b/VMExtension/Utils/WAAgentUtil.py @@ -0,0 +1,90 @@ +# Wrapper module for waagent +# +# waagent is not written as a module. This wrapper module is created +# to use the waagent code as a module. +# +# Copyright 2014 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.7+ +# + + +import imp +import os +import os.path + +# +# The following code will search and load waagent code and expose +# it as a submodule of current module +# +def searchWAAgent(): + # if the extension ships waagent in its package to default to this version first + pkg_agent_path = os.path.join(os.getcwd(), 'waagent') + if os.path.isfile(pkg_agent_path): + return pkg_agent_path + + agentPath = '/usr/sbin/waagent' + if os.path.isfile(agentPath): + return agentPath + + user_paths = os.environ['PYTHONPATH'].split(os.pathsep) + for user_path in user_paths: + agentPath = os.path.join(user_path, 'waagent') + if os.path.isfile(agentPath): + return agentPath + return None + +waagent = None +agentPath = searchWAAgent() +if agentPath: + waagent = imp.load_source('waagent', agentPath) +else: + raise Exception("Can't load waagent.") + +if not hasattr(waagent, "AddExtensionEvent"): + """ + If AddExtensionEvent is not defined, provide a dummy impl. + """ + def _AddExtensionEvent(*args, **kwargs): + pass + waagent.AddExtensionEvent = _AddExtensionEvent + +if not hasattr(waagent, "WALAEventOperation"): + class _WALAEventOperation: + HeartBeat="HeartBeat" + Provision = "Provision" + Install = "Install" + UnIsntall = "UnInstall" + Disable = "Disable" + Enable = "Enable" + Download = "Download" + Upgrade = "Upgrade" + Update = "Update" + waagent.WALAEventOperation = _WALAEventOperation + +__ExtensionName__=None +def InitExtensionEventLog(name): + global __ExtensionName__ + __ExtensionName__ = name + +def AddExtensionEvent(name=__ExtensionName__, + op=waagent.WALAEventOperation.Enable, + isSuccess=False, + message=None): + if name is not None: + waagent.AddExtensionEvent(name=name, + op=op, + isSuccess=isSuccess, + message=message) diff --git a/VMExtension/Utils/__init__.py b/VMExtension/Utils/__init__.py new file mode 100644 index 0000000..32a9170 --- /dev/null +++ b/VMExtension/Utils/__init__.py @@ -0,0 +1,19 @@ +# +# Copyright 2014 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.7+ +# + + diff --git a/VMExtension/hpcnodemanager.py b/VMExtension/hpcnodemanager.py new file mode 100644 index 0000000..4bd1b8d --- /dev/null +++ b/VMExtension/hpcnodemanager.py @@ -0,0 +1,619 @@ +#!/usr/bin/env python +# +# HPCNodeManager extension +# +# Copyright 2015 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.7+ + + +import os +import sys +import json +import subprocess +import re +import time +import traceback +import socket +import shutil +import platform +import struct +import array +import fcntl +import hashlib + +from Utils.WAAgentUtil import waagent +import Utils.HandlerUtil as Util + +#Define global variables +ExtensionShortName = 'HPCNodeManager' +DaemonPidFilePath = '/var/run/hpcnmdaemon.pid' +InstallRoot = '/opt/hpcnodemanager' +DistroName = None +DistroVersion = None +RestartIntervalInSeconds = 60 + +def main(): + waagent.LoggerInit('/var/log/waagent.log','/dev/stdout') + waagent.Log("%s started to handle." %(ExtensionShortName)) + waagent.MyDistro = waagent.GetMyDistro() + global DistroName, DistroVersion + distro = platform.dist() + DistroName = distro[0].lower() + DistroVersion = distro[1] + for a in sys.argv[1:]: + if re.match("^([-/]*)(disable)", a): + disable() + elif re.match("^([-/]*)(uninstall)", a): + uninstall() + elif re.match("^([-/]*)(install)", a): + install() + elif re.match("^([-/]*)(enable)", a): + enable() + elif re.match("^([-/]*)(daemon)", a): + daemon() + elif re.match("^([-/]*)(update)", a): + update() + +def _is_nodemanager_daemon(pid): + retcode, output = waagent.RunGetOutput("ps -p {0} -o cmd=".format(pid)) + if retcode == 0: + waagent.Log("The cmd for process {0} is {1}".format(pid, output)) + pattern = r'(.*[/\s])?{0}\s+[-/]*daemon$'.format(os.path.basename(__file__)) + if re.match(pattern, output): + return True + waagent.Log("The process {0} is not HPC Linux node manager daemon".format(pid)) + return False + +def install_package(package_name): + if DistroName == "centos" or DistroName == "redhat": + cmd = "yum -y install " + package_name + elif DistroName == "ubuntu": + waagent.Log("Updating apt package lists with command: apt-get -y update") + exitcode = waagent.Run("apt-get -y update", chk_err=False) + if exitcode != 0: + waagent.Log("Update apt package lists failed with exitcode: {0}".format(exitcode)) + cmd = "apt-get -y install " + package_name + elif DistroName == "suse": + if not os.listdir('/etc/zypp/repos.d'): + waagent.Run("zypper ar http://download.opensuse.org/distribution/13.2/repo/oss/suse/ opensuse") + cmd = "zypper -n --gpg-auto-import-keys install --force-resolution -l " + package_name + else: + cmd = "zypper -n install --force-resolution -l " + package_name + else: + raise Exception("Unsupported Linux Distro.") + waagent.Log("The command to install {0}: {1}".format(package_name, cmd)) + attempt = 1 + while(True): + waagent.Log("Installing package {0} (Attempt {1})".format(package_name, attempt)) + retcode, retoutput = waagent.RunGetOutput(cmd) + if retcode == 0: + waagent.Log("package {0} installation succeeded".format(package_name)) + break + else: + waagent.Log("package {0} installation failed {1}:\n {2}".format(package_name, retcode, retoutput)) + if attempt < 10: + time.sleep(min(30, pow(2, attempt))) + attempt += 1 + if DistroName == 'suse' and retcode == 104: + waagent.Run("zypper ar http://download.opensuse.org/distribution/13.2/repo/oss/suse/ opensuse") + cmd = "zypper -n --gpg-auto-import-keys install --force-resolution -l " + package_name + elif DistroName == "ubuntu": + waagent.Run("apt-get -y update", chk_err=False) + continue + else: + raise Exception("failed to install package {0}:{1}".format(package_name, retcode)) + +def _uninstall_nodemanager_files(): + if os.path.isdir(InstallRoot): + for tmpname in os.listdir(InstallRoot): + if tmpname == 'logs': + continue + if tmpname == 'certs': + continue + if tmpname == 'filters': + continue + tmppath = os.path.join(InstallRoot, tmpname) + if os.path.isdir(tmppath): + shutil.rmtree(tmppath) + elif os.path.isfile(tmppath): + os.remove(tmppath) + +def _install_cgroup_tool(): + if waagent.Run("command -v cgexec", chk_err=False) == 0: + waagent.Log("cgroup tools was already installed") + else: + waagent.Log("Start to install cgroup tools") + if DistroName == "ubuntu": + cg_pkgname = 'cgroup-bin' + elif (DistroName == "centos" or DistroName == "redhat") and re.match("^6", DistroVersion): + cg_pkgname = 'libcgroup' + else: + cg_pkgname = 'libcgroup-tools' + install_package(cg_pkgname) + waagent.Log("cgroup tool was successfully installed") + +def _install_sysstat(): + if waagent.Run("command -v iostat", chk_err=False) == 0: + waagent.Log("sysstat was already installed") + else: + waagent.Log("Start to install sysstat") + install_package('sysstat') + waagent.Log("sysstat was successfully installed") + +def _install_pstree(): + if waagent.Run("command -v pstree", chk_err=False) == 0: + waagent.Log("pstree was already installed") + else: + waagent.Log("Start to install pstree") + install_package('psmisc') + waagent.Log("pstree was successfully installed") + +def get_networkinterfaces(): + """ + Return the interface name, and ip addr of the + all non loopback interfaces. + """ + expected=16 # how many devices should I expect... + is_64bits = sys.maxsize > 2**32 + struct_size=40 if is_64bits else 32 # for 64bit the size is 40 bytes, for 32bits it is 32 bytes. + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + buff=array.array('B', b'\0' * (expected*struct_size)) + retsize=(struct.unpack('iL', fcntl.ioctl(s.fileno(), 0x8912, struct.pack('iL',expected*struct_size,buff.buffer_info()[0]))))[0] + if retsize == (expected*struct_size) : + waagent.Log('SIOCGIFCONF returned more than ' + str(expected) + ' up network interfaces.') + nics = [] + s=buff.tostring() + for i in range(0,retsize,struct_size): + iface=s[i:i+16].split(b'\0', 1)[0] + if iface == b'lo': + continue + else: + nics.append((iface.decode('latin-1'), socket.inet_ntoa(s[i+20:i+24]))) + return nics + +def cleanup_host_entries(): + hostsfile = '/etc/hosts' + if not os.path.isfile(hostsfile): + return + try: + hpcentryexists = False + newcontent='' + with open(hostsfile, 'r') as F: + for line in F.readlines(): + if re.match(r"^[0-9\.]+\s+[^\s#]+\s+#HPCD?\s*$", line): + hpcentryexists = True + else: + newcontent += line + if hpcentryexists: + waagent.Log("Clean all HPC related host entries from hosts file") + waagent.ReplaceFileContentsAtomic(hostsfile,newcontent) + os.chmod(hostsfile, 0o644) + except : + raise + +def init_suse_hostsfile(host_name, ipaddrs): + hostsfile = '/etc/hosts' + if not os.path.isfile(hostsfile): + return + try: + newhpcd_entries = '' + for ipaddr in ipaddrs: + newhpcd_entries += '{0:24}{1:30}#HPCD\n'.format(ipaddr, host_name) + + curhpcd_entries = '' + newcontent = '' + hpcentryexists = False + with open(hostsfile, 'r') as F: + for line in F.readlines(): + if re.match(r"^[0-9\.]+\s+[^\s#]+\s+#HPCD\s*$", line): + curhpcd_entries += line + hpcentryexists = True + elif re.match(r"^[0-9\.]+\s+[^\s#]+\s+#HPC\s*$", line): + hpcentryexists = True + else: + newcontent += line + + if newhpcd_entries != curhpcd_entries: + if hpcentryexists: + waagent.Log("Clean the HPC related host entries from hosts file") + waagent.Log("Add the following HPCD host entries:\n{0}".format(newhpcd_entries)) + if newcontent and newcontent[-1] != '\n': + newcontent += '\n' + newcontent += newhpcd_entries + waagent.ReplaceFileContentsAtomic(hostsfile,newcontent) + os.chmod(hostsfile, 0o644) + except : + raise + +def gethostname_from_configfile(configfile): + config_hostname = None + if os.path.isfile(configfile): + with open(configfile, 'r') as F: + configjson = json.load(F) + if 'RegisterUri' in configjson: + reguri = configjson['RegisterUri'] + reguri = reguri[0:reguri.rindex('/')] + config_hostname = reguri[reguri.rindex('/')+1:] + return config_hostname + +def _add_dns_search(domain_fqdn): + need_update = False + new_content = '' + for line in (open('/etc/resolv.conf','r')).readlines(): + if re.match('^search.* {0}'.format(domain_fqdn), line): + waagent.Log('{0} was already added in /etc/resolv.conf'.format(domain_fqdn)) + return + if re.match('^search', line): + need_update = True + new_content += line.replace('search', 'search {0}'.format(domain_fqdn)) + else: + new_content += line + if need_update: + waagent.Log('Adding {0} to /etc/resolv.conf'.format(domain_fqdn)) + waagent.SetFileContents('/etc/resolv.conf', new_content) + +def _update_dns_record(domain_fqdn): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + while True: + try: + s.connect((domain_fqdn, 53)) + break + except Exception, e: + waagent.Log('Failed to connect to {0}:53: {1}'.format(domain_fqdn, e)) + ipaddr = s.getsockname()[0] + host_fqdn = "{0}.{1}".format(socket.gethostname().split('.')[0], domain_fqdn) + dns_cmd = 'echo -e "server {0}\nzone {0}\nupdate delete {1}\nupdate add {1} 864000 A {2}\nsend\n" | nsupdate -v'.format(domain_fqdn, host_fqdn, ipaddr) + waagent.Log('The command to update ip to dns server is: {0}'.format(dns_cmd)) + retry = 0 + while retry < 60: + dns_ret, dns_msg = waagent.RunGetOutput(dns_cmd) + if dns_ret == 0: + waagent.Log("Succeeded to update ip to dns server.") + return + else: + retry = retry + 1 + waagent.Log("Failed to update ip to dns server: {0}, {1}".format(dns_ret, dns_msg)) + time.sleep(10) + +def _mount_cgroup(): + if not os.path.isdir('/cgroup'): + os.mkdir('/cgroup') + if not os.listdir('/cgroup'): + retcode, mount_msg = waagent.RunGetOutput('mount -t cgroup cgroup /cgroup') + waagent.Log("mount /cgroup directory {0}:{1}".format(retcode, mount_msg)) + if retcode == 0: + waagent.Log("/cgroup directory is successfully mounted.") + else: + raise Exception("failed to mount /cgroup directory") + else: + waagent.Log("/cgroup directory was already mounted.") + +def config_firewall_rules(): + if DistroName == 'redhat': + waagent.Log('Configuring the firewall rules') + major_version = int(DistroVersion.split('.')[0]) + if major_version < 7: + waagent.Run('lokkit --port=40000:tcp --update', chk_err=False) + elif waagent.Run("firewall-cmd --state", chk_err=False) == 0: + waagent.Run("firewall-cmd --permanent --zone=public --add-port=40000/tcp") + waagent.Run("firewall-cmd --reload") + +def parse_context(operation): + hutil = Util.HandlerUtility(waagent.Log, waagent.Error, ExtensionShortName) + hutil.do_parse_context(operation) + return hutil + + +def cmpFileHash(file1, file2): + if not (os.path.isfile(file1) and os.path.isfile(file2)): + return False + digests = [] + for filename in [file1, file2]: + md5hash = hashlib.md5() + with open(filename, 'rb') as f: + buf = f.read() + md5hash.update(buf) + digest = md5hash.hexdigest() + digests.append(digest) + return digests[0] == digests[1] + + +def install(): + hutil = parse_context('Install') + try: + cleanup_host_entries() + _uninstall_nodemanager_files() + if DistroName == "centos" or DistroName == "redhat": + waagent.Run("yum-config-manager --setopt=\\*.skip_if_unavailable=1 --save", chk_err=False) + _install_cgroup_tool() + _install_sysstat() + _install_pstree() + + logDir = os.path.join(InstallRoot, "logs") + if not os.path.isdir(logDir): + os.makedirs(logDir) + srcDir = os.path.join(os.getcwd(), "bin") + waagent.RunGetOutput("chmod +x {0}/*".format(srcDir)) + waagent.RunGetOutput("chmod +x {0}/lib/*".format(srcDir)) + for filename in os.listdir(srcDir): + srcname = os.path.join(srcDir, filename) + destname = os.path.join(InstallRoot, filename) + if os.path.isfile(srcname): + shutil.copy2(srcname, destname) + elif os.path.isdir(srcname): + shutil.copytree(srcname, destname) + libdir = os.path.join(InstallRoot, 'lib') + for tmpname in os.listdir(libdir): + tmppath = os.path.join(libdir, tmpname) + if tmpname.endswith(".tar.gz") and os.path.isfile(tmppath): + waagent.Run("tar xzvf {0} -C {1}".format(tmppath, libdir)) + os.remove(tmppath) + waagent.Run("chmod -R 755 {0}".format(libdir)) + + host_name = None + public_settings = hutil._context._config['runtimeSettings'][0]['handlerSettings'].get('publicSettings') + if public_settings: + host_name = public_settings.get('HostName') + backup_configfile = os.path.join(os.getcwd(), 'nodemanager.json') + if not host_name: + # if there is backup nodemanager.json, means it is an update install, if 'HostName' not defined in the extension + # settings, we shall get from the backup nodemanager.json + if os.path.isfile(backup_configfile): + waagent.Log("Backup nodemanager configuration file found") + host_name = gethostname_from_configfile(backup_configfile) + + curhostname = socket.gethostname().split('.')[0] + if host_name: + if host_name.lower() != curhostname.lower(): + waagent.Log("HostName was set: hostname from {0} to {1}".format(curhostname, host_name)) + waagent.MyDistro.setHostname(host_name) + waagent.MyDistro.publishHostname(host_name) + else: + host_name = curhostname + public_settings = hutil._context._config['runtimeSettings'][0]['handlerSettings'].get('publicSettings') + cluster_connstring = public_settings.get('ClusterConnectionString') + if not cluster_connstring: + waagent.Log("ClusterConnectionString is not specified") + protect_settings = hutil._context._config['runtimeSettings'][0]['handlerSettings'].get('protectedSettings') + cluster_connstring = protect_settings.get('ClusterName') + if not cluster_connstring: + error_msg = "neither ClusterConnectionString nor ClusterName is specified." + hutil.error(error_msg) + raise ValueError(error_msg) + ssl_thumbprint = public_settings.get('SSLThumbprint') + certsdir = os.path.join(InstallRoot, "certs") + if not ssl_thumbprint: + api_prefix = "http://{0}:80/HpcLinux/api/" + listen_uri = "http://0.0.0.0:40000" + else: + api_prefix = "https://{0}:443/HpcLinux/api/" + listen_uri = "https://0.0.0.0:40002" + # import the ssl certificate for hpc nodemanager + if not os.path.isdir(certsdir): + os.makedirs(certsdir, 0o750) + else: + os.chmod(certsdir, 0o750) + ssl_thumbprint = ssl_thumbprint.upper() + prvfile = os.path.join("/var/lib/waagent", ssl_thumbprint + ".prv") + srccrtfile = os.path.join("/var/lib/waagent", ssl_thumbprint + ".crt") + rsakeyfile = os.path.join(certsdir, "nodemanager_rsa.key") + dstcrtfile = os.path.join(certsdir, "nodemanager.crt") + if os.path.isfile(prvfile) and not cmpFileHash(prvfile, rsakeyfile): + waagent.Run("rm -rf {0}/nodemanager.crt {0}/nodemanager.key {0}/nodemanager.pem {0}/nodemanager_rsa.key".format(certsdir), chk_err=False) + shutil.copy2(prvfile, rsakeyfile) + shutil.copy2(srccrtfile, dstcrtfile) + shutil.copy2(dstcrtfile, os.path.join(certsdir, "nodemanager.pem")) + waagent.Run("openssl rsa -in {0}/nodemanager_rsa.key -out {0}/nodemanager.key".format(certsdir)) + waagent.Run("chmod 640 {0}/nodemanager.crt {0}/nodemanager.key {0}/nodemanager.pem {0}/nodemanager_rsa.key".format(certsdir)) + + node_uri = api_prefix + host_name + "/computenodereported" + reg_uri = api_prefix + host_name + "/registerrequested" + hostsfile_uri = api_prefix + "hostsfile" + metric_ids_uri = api_prefix + host_name + "/getinstanceids" + namingSvcUris = ['https://{0}:443/HpcNaming/api/fabric/resolve/singleton/'.format(h.split('.')[0].strip()) for h in cluster_connstring.split(',')] + if os.path.isfile(backup_configfile): + with open(backup_configfile, 'r') as F: + configjson = json.load(F) + configjson["NamingServiceUri"] = namingSvcUris + configjson["HeartbeatUri"] = node_uri + configjson["RegisterUri"] = reg_uri + configjson["HostsFileUri"] = hostsfile_uri + configjson["MetricInstanceIdsUri"] = metric_ids_uri + configjson["MetricUri"] = "" + configjson["ListeningUri"] = listen_uri + else: + configjson = { + "ConfigVersion": "1.0", + "NamingServiceUri": namingSvcUris, + "HeartbeatUri": node_uri, + "RegisterUri": reg_uri, + "MetricUri": "", + "MetricInstanceIdsUri": metric_ids_uri, + "HostsFileUri": hostsfile_uri, + "HostsFetchInterval": 120, + "ListeningUri": listen_uri, + "DefaultServiceName": "SchedulerStatefulService", + "UdpMetricServiceName": "MonitoringStatefulService" + } + if ssl_thumbprint: + configjson["TrustedCAFile"] = os.path.join(certsdir, "nodemanager.pem") + configjson["CertificateChainFile"] = os.path.join(certsdir, "nodemanager.crt") + configjson["PrivateKeyFile"] = os.path.join(certsdir, "nodemanager.key") + configfile = os.path.join(InstallRoot, 'nodemanager.json') + waagent.SetFileContents(configfile, json.dumps(configjson)) + shutil.copy2(configfile, backup_configfile) + config_firewall_rules() + hutil.do_exit(0, 'Install', 'success', '0', 'Install Succeeded.') + except Exception, e: + hutil.do_exit(1, 'Install','error','1', '{0}'.format(e)) + +def enable(): + #Check whether monitor process is running. + #If it does, return. Otherwise clear pid file + hutil = parse_context('Enable') + if os.path.isfile(DaemonPidFilePath): + pid = waagent.GetFileContents(DaemonPidFilePath) + if os.path.isdir(os.path.join("/proc", pid)) and _is_nodemanager_daemon(pid): + if hutil.is_seq_smaller(): + hutil.do_exit(0, 'Enable', 'success', '0', + 'HPC Linux node manager daemon is already running') + else: + waagent.Log("Stop old daemon: {0}".format(pid)) + os.killpg(int(pid), 9) + os.remove(DaemonPidFilePath) + + args = [os.path.join(os.getcwd(), __file__), "daemon"] + devnull = open(os.devnull, 'w') + child = subprocess.Popen(args, stdout=devnull, stderr=devnull, preexec_fn=os.setsid) + if child.pid is None or child.pid < 1: + hutil.do_exit(1, 'Enable', 'error', '1', + 'Failed to launch HPC Linux node manager daemon') + else: + hutil.save_seq() + waagent.SetFileContents(DaemonPidFilePath, str(child.pid)) + #Sleep 3 seconds to check if the process is still running + time.sleep(3) + if child.poll() is None: + waagent.Log("Daemon pid: {0}".format(child.pid)) + hutil.do_exit(0, 'Enable', 'success', '0', + 'HPC Linux node manager daemon is enabled') + else: + hutil.do_exit(1, 'Enable', 'error', '2', + 'Failed to launch HPC Linux node manager daemon') + +def daemon(): + hutil = parse_context('Enable') + try: + public_settings = hutil._context._config['runtimeSettings'][0]['handlerSettings'].get('publicSettings') + domain_fqdn = public_settings.get('DomainName') + if not domain_fqdn: + cluster_connstring = public_settings.get('ClusterConnectionString') + if not cluster_connstring: + waagent.Log("ClusterConnectionString is not specified, use ClusterName instead") + protect_settings = hutil._context._config['runtimeSettings'][0]['handlerSettings'].get('protectedSettings') + cluster_connstring = protect_settings.get('ClusterName') + headnode_name = cluster_connstring.split(',')[0].strip() + if headnode_name.find('.') > 0: + # The head node name is FQDN, extract the domain FQDN + domain_fqdn = headnode_name.split(".", 1)[1] + + if domain_fqdn: + waagent.Log("The domain FQDN is " + domain_fqdn) + _add_dns_search(domain_fqdn) + #thread.start_new_thread(_update_dns_record, (domain_fqdn,)) + + # A fix only for SUSE Linux that sometimes the hostname got changed because out-of-date host/IP entry in /etc/hosts + # It may happen when the node was assigned a different IP after deallocation + # We shall clean the current HPC related host/IP entries and add the actual IPs before fetching the hosts file from head node. + if DistroName == 'suse': + configfile = os.path.join(InstallRoot, 'nodemanager.json') + confighostname = gethostname_from_configfile(configfile) + curhostname = socket.gethostname().split('.')[0] + if confighostname.lower() != curhostname.lower(): + cleanup_host_entries() + waagent.Log("Correct the hostname from {0} to {1}".format(curhostname, confighostname)) + waagent.MyDistro.setHostname(confighostname) + waagent.MyDistro.publishHostname(confighostname) + retry = 0 + while True: + nics = get_networkinterfaces() + if len(nics) > 0: + init_suse_hostsfile(confighostname, [nic[1] for nic in nics]) + break + elif retry < 30: + waagent.Log("Failed to get network interfaces information, retry later ...") + time.sleep(2) + retry = retry + 1 + else: + waagent.Log("Failed to get network interfaces information, just clean") + break + # Mount the directory /cgroup for centos 6.* + major_version = int(DistroVersion.split('.')[0]) + if (DistroName == 'centos' or DistroName == 'redhat') and major_version < 7: + _mount_cgroup() + while True: + exe_path = os.path.join(InstallRoot, "nodemanager") + devnull = open(os.devnull, 'w') + child_process = subprocess.Popen(exe_path, stdout=devnull, stderr=devnull, cwd=InstallRoot) + if child_process.pid is None or child_process.pid < 1: + exit_msg = 'Failed to start HPC node manager process' + hutil.do_status_report('Enable', 'error', 1, exit_msg) + else: + #Sleep 1 second to check if the process is still running + time.sleep(1) + if child_process.poll() is None: + hutil.do_status_report('Enable', 'success', 0, "") + waagent.Log('HPC node manager process started') + exit_code = child_process.wait() + exit_msg = "HPC node manager process exits: {0}".format(exit_code) + hutil.do_status_report('Enable', 'warning', exit_code, exit_msg) + else: + exit_msg = "HPC node manager process crashes: {0}".format(child_process.returncode) + hutil.do_status_report('Enable', 'error', child_process.returncode, exit_msg) + waagent.Log(exit_msg) + waagent.Log("Restart HPC node manager process after {0} seconds".format(RestartIntervalInSeconds)) + time.sleep(RestartIntervalInSeconds) + + except Exception, e: + hutil.error("Failed to enable the extension with error: %s, stack trace: %s" %(str(e), traceback.format_exc())) + hutil.do_exit(1, 'Enable','error','1', 'Enable failed.') + +def uninstall(): + hutil = parse_context('Uninstall') + _uninstall_nodemanager_files() + cleanup_host_entries() + hutil.do_exit(0,'Uninstall','success','0', 'Uninstall succeeded') + +def disable(): + hutil = parse_context('Disable') + #Check whether monitor process is running. + #If it does, kill it. Otherwise clear pid file + if os.path.isfile(DaemonPidFilePath): + pid = waagent.GetFileContents(DaemonPidFilePath) + if os.path.isdir(os.path.join("/proc", pid)) and _is_nodemanager_daemon(pid): + waagent.Log(("Stop HPC node manager daemon: {0}").format(pid)) + os.killpg(int(pid), 9) + os.remove(DaemonPidFilePath) + cleanup_host_entries() + hutil.do_exit(0, 'Disable', 'success', '0', + 'HPC node manager daemon is disabled') + os.remove(DaemonPidFilePath) + + hutil.do_exit(0, 'Disable', 'success', '0', + 'HPC node manager daemon is not running') + +def update(): + hutil = parse_context('Update') + cleanup_host_entries() + configfile = os.path.join(InstallRoot, 'nodemanager.json') + if os.path.isfile(configfile): + waagent.Log("Update extension: backup the nodemanager configuration file.") + shutil.copy2(configfile, os.getcwd()) + # A fix only for SUSE Linux that sometimes the hostname got changed because out-of-date host/IP entry in /etc/hosts + # It may happen when the node was assigned a different IP after deallocation + if DistroName == 'suse': + confighostname = gethostname_from_configfile(configfile) + if confighostname: + curhostname = socket.gethostname().split('.')[0] + if confighostname.lower() != curhostname.lower(): + waagent.Log("Update: Set the hostname from {0} to {1}".format(curhostname, confighostname)) + waagent.MyDistro.setHostname(confighostname) + waagent.MyDistro.publishHostname(confighostname) + hutil.do_exit(0,'Update','success','0', 'Update Succeeded') + +if __name__ == '__main__' : + main() +