Include VM extension code
This commit is contained in:
Sunbin Zhu 2019-07-19 15:03:46 +08:00
Родитель a2ff32a012
Коммит c7c071ffa0
5 изменённых файлов: 1068 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,14 @@
[{
"name": "HPCNodeManager",
"version": 1.0,
"handlerManifest": {
"installCommand": "hpcnodemanager.py -install",
"uninstallCommand": "hpcnodemanager.py -uninstall",
"updateCommand": "hpcnodemanager.py -update",
"enableCommand": "hpcnodemanager.py -enable",
"disableCommand": "hpcnodemanager.py -disable",
"rebootAfterInstall": false,
"reportHeartbeat": false,
"updateMode": "UpdateWithInstall"
}
}]

Просмотреть файл

@ -0,0 +1,326 @@
#
# Handler library for Linux IaaS
#
# Copyright 2014 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.7+
"""
JSON def:
HandlerEnvironment.json
[{
"name": "ExampleHandlerLinux",
"seqNo": "seqNo",
"version": "1.0",
"handlerEnvironment": {
"logFolder": "<your log folder location>",
"configFolder": "<your config folder location>",
"statusFolder": "<your status folder location>",
"heartbeatFile": "<your heartbeat file location>",
}
}]
Example ./config/1.settings
"{"runtimeSettings":[{"handlerSettings":{"protectedSettingsCertThumbprint":"1BE9A13AA1321C7C515EF109746998BAB6D86FD1","protectedSettings":
"MIIByAYJKoZIhvcNAQcDoIIBuTCCAbUCAQAxggFxMIIBbQIBADBVMEExPzA9BgoJkiaJk/IsZAEZFi9XaW5kb3dzIEF6dXJlIFNlcnZpY2UgTWFuYWdlbWVudCBmb3IgR+nhc6VHQTQpCiiV2zANBgkqhkiG9w0BAQEFAASCAQCKr09QKMGhwYe+O4/a8td+vpB4eTR+BQso84cV5KCAnD6iUIMcSYTrn9aveY6v6ykRLEw8GRKfri2d6tvVDggUrBqDwIgzejGTlCstcMJItWa8Je8gHZVSDfoN80AEOTws9Fp+wNXAbSuMJNb8EnpkpvigAWU2v6pGLEFvSKC0MCjDTkjpjqciGMcbe/r85RG3Zo21HLl0xNOpjDs/qqikc/ri43Y76E/Xv1vBSHEGMFprPy/Hwo3PqZCnulcbVzNnaXN3qi/kxV897xGMPPC3IrO7Nc++AT9qRLFI0841JLcLTlnoVG1okPzK9w6ttksDQmKBSHt3mfYV+skqs+EOMDsGCSqGSIb3DQEHATAUBggqhkiG9w0DBwQITgu0Nu3iFPuAGD6/QzKdtrnCI5425fIUy7LtpXJGmpWDUA==","publicSettings":{"port":"3000"}}}]}"
Example HeartBeat
{
"version": 1.0,
"heartbeat" : {
"status": "ready",
"code": 0,
"Message": "Sample Handler running. Waiting for a new configuration from user."
}
}
Example Status Report:
[{"version":"1.0","timestampUTC":"2014-05-29T04:20:13Z","status":{"name":"Chef Extension Handler","operation":"chef-client-run","status":"success","code":0,"formattedMessage":{"lang":"en-US","message":"Chef-client run success"}}}]
"""
import os
import os.path
import sys
import imp
import base64
import json
import time
from os.path import join
from Utils.WAAgentUtil import waagent
from waagent import LoggerInit
DateTimeFormat = "%Y-%m-%dT%H:%M:%SZ"
class HandlerContext:
def __init__(self,name):
self._name = name
self._version = '0.0'
self._config_dir = None
self._log_dir = None
self._log_file = None
self._status_dir = None
self._heartbeat_file = None
self._seq_no = -1
self._status_file = None
self._settings_file = None
self._config = None
return
class HandlerUtility:
def __init__(self, log, error, short_name):
self._log = log
self._error = error
self._short_name = short_name
def _get_log_prefix(self):
return '[%s-%s]' %(self._context._name, self._context._version)
def _get_current_seq_no(self, config_folder):
seq_no = -1
cur_seq_no = -1
freshest_time = None
for subdir, dirs, files in os.walk(config_folder):
for file in files:
try:
cur_seq_no = int(os.path.basename(file).split('.')[0])
if(freshest_time == None):
freshest_time = os.path.getmtime(join(config_folder,file))
seq_no = cur_seq_no
else:
current_file_m_time = os.path.getmtime(join(config_folder,file))
if(current_file_m_time > freshest_time):
freshest_time=current_file_m_time
seq_no = cur_seq_no
except ValueError:
continue
return seq_no
def log(self, message):
self._log(self._get_log_prefix() + message)
def error(self, message):
self._error(self._get_log_prefix() + message)
def _parse_config(self, ctxt):
config = None
try:
config=json.loads(ctxt)
except:
self.error('JSON exception decoding ' + ctxt)
if config == None:
self.error("JSON error processing settings file:" + ctxt)
else:
handlerSettings = config['runtimeSettings'][0]['handlerSettings']
if handlerSettings.has_key('protectedSettings') and \
handlerSettings.has_key("protectedSettingsCertThumbprint") and \
handlerSettings['protectedSettings'] is not None and \
handlerSettings["protectedSettingsCertThumbprint"] is not None:
protectedSettings = handlerSettings['protectedSettings']
thumb=handlerSettings['protectedSettingsCertThumbprint']
cert=waagent.LibDir+'/'+thumb+'.crt'
pkey=waagent.LibDir+'/'+thumb+'.prv'
unencodedSettings = base64.standard_b64decode(protectedSettings)
openSSLcmd = "openssl smime -inform DER -decrypt -recip {0} -inkey {1}"
cleartxt = waagent.RunSendStdin(openSSLcmd.format(cert, pkey), unencodedSettings)[1]
if cleartxt == None:
self.error("OpenSSh decode error using thumbprint " + thumb )
self.do_exit(1,operation,'error','1', 'Failed decrypting protectedSettings')
jctxt=''
try:
jctxt=json.loads(cleartxt)
except:
self.error('JSON exception decoding ' + cleartxt)
handlerSettings['protectedSettings']=jctxt
self.log('Config decoded correctly.')
return config
def do_parse_context(self,operation):
_context = self.try_parse_context()
if not _context:
self.do_exit(1,operation,'error','1', operation + ' Failed')
return _context
def try_parse_context(self):
self._context = HandlerContext(self._short_name)
handler_env=None
config=None
ctxt=None
code=0
# get the HandlerEnvironment.json. According to the extension handler spec, it is always in the ./ directory
self.log('cwd is ' + os.path.realpath(os.path.curdir))
handler_env_file='./HandlerEnvironment.json'
if not os.path.isfile(handler_env_file):
self.error("Unable to locate " + handler_env_file)
return None
ctxt = waagent.GetFileContents(handler_env_file)
if ctxt == None :
self.error("Unable to read " + handler_env_file)
try:
handler_env=json.loads(ctxt)
except:
pass
if handler_env == None :
self.log("JSON error processing " + handler_env_file)
return None
if type(handler_env) == list:
handler_env = handler_env[0]
self._context._name = handler_env['name']
self._context._version = str(handler_env['version'])
self._context._config_dir=handler_env['handlerEnvironment']['configFolder']
self._context._log_dir= handler_env['handlerEnvironment']['logFolder']
self._context._log_file= os.path.join(handler_env['handlerEnvironment']['logFolder'],'extension.log')
self._change_log_file()
self._context._status_dir=handler_env['handlerEnvironment']['statusFolder']
self._context._heartbeat_file=handler_env['handlerEnvironment']['heartbeatFile']
self._context._seq_no = self._get_current_seq_no(self._context._config_dir)
if self._context._seq_no < 0:
self.error("Unable to locate a .settings file!")
return None
self._context._seq_no = str(self._context._seq_no)
self.log('sequence number is ' + self._context._seq_no)
self._context._status_file= os.path.join(self._context._status_dir, self._context._seq_no +'.status')
self._context._settings_file = os.path.join(self._context._config_dir, self._context._seq_no + '.settings')
self.log("setting file path is" + self._context._settings_file)
ctxt=None
ctxt=waagent.GetFileContents(self._context._settings_file)
if ctxt == None :
error_msg = 'Unable to read ' + self._context._settings_file + '. '
self.error(error_msg)
return None
self.log("JSON config: " + ctxt)
self._context._config = self._parse_config(ctxt)
return self._context
def _change_log_file(self):
self.log("Change log file to " + self._context._log_file)
LoggerInit(self._context._log_file,'/dev/stdout')
self._log = waagent.Log
self._error = waagent.Error
def set_verbose_log(self, verbose):
if(verbose == "1" or verbose == 1):
self.log("Enable verbose log")
LoggerInit(self._context._log_file, '/dev/stdout', verbose=True)
else:
self.log("Disable verbose log")
LoggerInit(self._context._log_file, '/dev/stdout', verbose=False)
def is_seq_smaller(self):
return int(self._context._seq_no) <= self._get_most_recent_seq()
def save_seq(self):
self._set_most_recent_seq(self._context._seq_no)
self.log("set most recent sequence number to " + self._context._seq_no)
def exit_if_enabled(self):
self.exit_if_seq_smaller()
def exit_if_seq_smaller(self):
if(self.is_seq_smaller()):
self.log("Current sequence number, " + self._context._seq_no + ", is not greater than the sequnce number of the most recent executed configuration. Exiting...")
sys.exit(0)
self.save_seq()
def _get_most_recent_seq(self):
if(os.path.isfile('mrseq')):
seq = waagent.GetFileContents('mrseq')
if(seq):
return int(seq)
return -1
def is_current_config_seq_greater_inused(self):
return int(self._context._seq_no) > self._get_most_recent_seq()
def get_inused_config_seq(self):
return self._get_most_recent_seq()
def set_inused_config_seq(self,seq):
self._set_most_recent_seq(seq)
def _set_most_recent_seq(self,seq):
waagent.SetFileContents('mrseq', str(seq))
def do_status_report(self, operation, status, status_code, message):
self.log("{0},{1},{2},{3}".format(operation, status, status_code, message))
tstamp=time.strftime(DateTimeFormat, time.gmtime())
stat = [{
"version" : self._context._version,
"timestampUTC" : tstamp,
"status" : {
"name" : self._context._name,
"operation" : operation,
"status" : status,
"code" : status_code,
"formattedMessage" : {
"lang" : "en-US",
"message" : message
}
}
}]
stat_rept = json.dumps(stat)
if self._context._status_file:
tmp = "%s.tmp" %(self._context._status_file)
with open(tmp,'w+') as f:
f.write(stat_rept)
os.rename(tmp, self._context._status_file)
def do_heartbeat_report(self, heartbeat_file,status,code,message):
# heartbeat
health_report='[{"version":"1.0","heartbeat":{"status":"' + status+ '","code":"'+ code + '","Message":"' + message + '"}}]'
if waagent.SetFileContents(heartbeat_file,health_report) == None :
self.error('Unable to wite heartbeat info to ' + heartbeat_file)
def do_exit(self,exit_code,operation,status,code,message):
try:
self.do_status_report(operation, status,code,message)
except Exception as e:
self.log("Can't update status: "+str(e))
sys.exit(exit_code)
def get_name(self):
return self._context._name
def get_seq_no(self):
return self._context._seq_no
def get_log_dir(self):
return self._context._log_dir
def get_handler_settings(self):
if (self._context._config != None):
return self._context._config['runtimeSettings'][0]['handlerSettings']
return None
def get_protected_settings(self):
if (self._context._config != None):
return self.get_handler_settings().get('protectedSettings')
return None
def get_public_settings(self):
handlerSettings = self.get_handler_settings()
if (handlerSettings != None):
return self.get_handler_settings().get('publicSettings')
return None

Просмотреть файл

@ -0,0 +1,90 @@
# Wrapper module for waagent
#
# waagent is not written as a module. This wrapper module is created
# to use the waagent code as a module.
#
# Copyright 2014 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.7+
#
import imp
import os
import os.path
#
# The following code will search and load waagent code and expose
# it as a submodule of current module
#
def searchWAAgent():
# if the extension ships waagent in its package to default to this version first
pkg_agent_path = os.path.join(os.getcwd(), 'waagent')
if os.path.isfile(pkg_agent_path):
return pkg_agent_path
agentPath = '/usr/sbin/waagent'
if os.path.isfile(agentPath):
return agentPath
user_paths = os.environ['PYTHONPATH'].split(os.pathsep)
for user_path in user_paths:
agentPath = os.path.join(user_path, 'waagent')
if os.path.isfile(agentPath):
return agentPath
return None
waagent = None
agentPath = searchWAAgent()
if agentPath:
waagent = imp.load_source('waagent', agentPath)
else:
raise Exception("Can't load waagent.")
if not hasattr(waagent, "AddExtensionEvent"):
"""
If AddExtensionEvent is not defined, provide a dummy impl.
"""
def _AddExtensionEvent(*args, **kwargs):
pass
waagent.AddExtensionEvent = _AddExtensionEvent
if not hasattr(waagent, "WALAEventOperation"):
class _WALAEventOperation:
HeartBeat="HeartBeat"
Provision = "Provision"
Install = "Install"
UnIsntall = "UnInstall"
Disable = "Disable"
Enable = "Enable"
Download = "Download"
Upgrade = "Upgrade"
Update = "Update"
waagent.WALAEventOperation = _WALAEventOperation
__ExtensionName__=None
def InitExtensionEventLog(name):
global __ExtensionName__
__ExtensionName__ = name
def AddExtensionEvent(name=__ExtensionName__,
op=waagent.WALAEventOperation.Enable,
isSuccess=False,
message=None):
if name is not None:
waagent.AddExtensionEvent(name=name,
op=op,
isSuccess=isSuccess,
message=message)

Просмотреть файл

@ -0,0 +1,19 @@
#
# Copyright 2014 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.7+
#

Просмотреть файл

@ -0,0 +1,619 @@
#!/usr/bin/env python
#
# HPCNodeManager extension
#
# Copyright 2015 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.7+
import os
import sys
import json
import subprocess
import re
import time
import traceback
import socket
import shutil
import platform
import struct
import array
import fcntl
import hashlib
from Utils.WAAgentUtil import waagent
import Utils.HandlerUtil as Util
#Define global variables
ExtensionShortName = 'HPCNodeManager'
DaemonPidFilePath = '/var/run/hpcnmdaemon.pid'
InstallRoot = '/opt/hpcnodemanager'
DistroName = None
DistroVersion = None
RestartIntervalInSeconds = 60
def main():
waagent.LoggerInit('/var/log/waagent.log','/dev/stdout')
waagent.Log("%s started to handle." %(ExtensionShortName))
waagent.MyDistro = waagent.GetMyDistro()
global DistroName, DistroVersion
distro = platform.dist()
DistroName = distro[0].lower()
DistroVersion = distro[1]
for a in sys.argv[1:]:
if re.match("^([-/]*)(disable)", a):
disable()
elif re.match("^([-/]*)(uninstall)", a):
uninstall()
elif re.match("^([-/]*)(install)", a):
install()
elif re.match("^([-/]*)(enable)", a):
enable()
elif re.match("^([-/]*)(daemon)", a):
daemon()
elif re.match("^([-/]*)(update)", a):
update()
def _is_nodemanager_daemon(pid):
retcode, output = waagent.RunGetOutput("ps -p {0} -o cmd=".format(pid))
if retcode == 0:
waagent.Log("The cmd for process {0} is {1}".format(pid, output))
pattern = r'(.*[/\s])?{0}\s+[-/]*daemon$'.format(os.path.basename(__file__))
if re.match(pattern, output):
return True
waagent.Log("The process {0} is not HPC Linux node manager daemon".format(pid))
return False
def install_package(package_name):
if DistroName == "centos" or DistroName == "redhat":
cmd = "yum -y install " + package_name
elif DistroName == "ubuntu":
waagent.Log("Updating apt package lists with command: apt-get -y update")
exitcode = waagent.Run("apt-get -y update", chk_err=False)
if exitcode != 0:
waagent.Log("Update apt package lists failed with exitcode: {0}".format(exitcode))
cmd = "apt-get -y install " + package_name
elif DistroName == "suse":
if not os.listdir('/etc/zypp/repos.d'):
waagent.Run("zypper ar http://download.opensuse.org/distribution/13.2/repo/oss/suse/ opensuse")
cmd = "zypper -n --gpg-auto-import-keys install --force-resolution -l " + package_name
else:
cmd = "zypper -n install --force-resolution -l " + package_name
else:
raise Exception("Unsupported Linux Distro.")
waagent.Log("The command to install {0}: {1}".format(package_name, cmd))
attempt = 1
while(True):
waagent.Log("Installing package {0} (Attempt {1})".format(package_name, attempt))
retcode, retoutput = waagent.RunGetOutput(cmd)
if retcode == 0:
waagent.Log("package {0} installation succeeded".format(package_name))
break
else:
waagent.Log("package {0} installation failed {1}:\n {2}".format(package_name, retcode, retoutput))
if attempt < 10:
time.sleep(min(30, pow(2, attempt)))
attempt += 1
if DistroName == 'suse' and retcode == 104:
waagent.Run("zypper ar http://download.opensuse.org/distribution/13.2/repo/oss/suse/ opensuse")
cmd = "zypper -n --gpg-auto-import-keys install --force-resolution -l " + package_name
elif DistroName == "ubuntu":
waagent.Run("apt-get -y update", chk_err=False)
continue
else:
raise Exception("failed to install package {0}:{1}".format(package_name, retcode))
def _uninstall_nodemanager_files():
if os.path.isdir(InstallRoot):
for tmpname in os.listdir(InstallRoot):
if tmpname == 'logs':
continue
if tmpname == 'certs':
continue
if tmpname == 'filters':
continue
tmppath = os.path.join(InstallRoot, tmpname)
if os.path.isdir(tmppath):
shutil.rmtree(tmppath)
elif os.path.isfile(tmppath):
os.remove(tmppath)
def _install_cgroup_tool():
if waagent.Run("command -v cgexec", chk_err=False) == 0:
waagent.Log("cgroup tools was already installed")
else:
waagent.Log("Start to install cgroup tools")
if DistroName == "ubuntu":
cg_pkgname = 'cgroup-bin'
elif (DistroName == "centos" or DistroName == "redhat") and re.match("^6", DistroVersion):
cg_pkgname = 'libcgroup'
else:
cg_pkgname = 'libcgroup-tools'
install_package(cg_pkgname)
waagent.Log("cgroup tool was successfully installed")
def _install_sysstat():
if waagent.Run("command -v iostat", chk_err=False) == 0:
waagent.Log("sysstat was already installed")
else:
waagent.Log("Start to install sysstat")
install_package('sysstat')
waagent.Log("sysstat was successfully installed")
def _install_pstree():
if waagent.Run("command -v pstree", chk_err=False) == 0:
waagent.Log("pstree was already installed")
else:
waagent.Log("Start to install pstree")
install_package('psmisc')
waagent.Log("pstree was successfully installed")
def get_networkinterfaces():
"""
Return the interface name, and ip addr of the
all non loopback interfaces.
"""
expected=16 # how many devices should I expect...
is_64bits = sys.maxsize > 2**32
struct_size=40 if is_64bits else 32 # for 64bit the size is 40 bytes, for 32bits it is 32 bytes.
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
buff=array.array('B', b'\0' * (expected*struct_size))
retsize=(struct.unpack('iL', fcntl.ioctl(s.fileno(), 0x8912, struct.pack('iL',expected*struct_size,buff.buffer_info()[0]))))[0]
if retsize == (expected*struct_size) :
waagent.Log('SIOCGIFCONF returned more than ' + str(expected) + ' up network interfaces.')
nics = []
s=buff.tostring()
for i in range(0,retsize,struct_size):
iface=s[i:i+16].split(b'\0', 1)[0]
if iface == b'lo':
continue
else:
nics.append((iface.decode('latin-1'), socket.inet_ntoa(s[i+20:i+24])))
return nics
def cleanup_host_entries():
hostsfile = '/etc/hosts'
if not os.path.isfile(hostsfile):
return
try:
hpcentryexists = False
newcontent=''
with open(hostsfile, 'r') as F:
for line in F.readlines():
if re.match(r"^[0-9\.]+\s+[^\s#]+\s+#HPCD?\s*$", line):
hpcentryexists = True
else:
newcontent += line
if hpcentryexists:
waagent.Log("Clean all HPC related host entries from hosts file")
waagent.ReplaceFileContentsAtomic(hostsfile,newcontent)
os.chmod(hostsfile, 0o644)
except :
raise
def init_suse_hostsfile(host_name, ipaddrs):
hostsfile = '/etc/hosts'
if not os.path.isfile(hostsfile):
return
try:
newhpcd_entries = ''
for ipaddr in ipaddrs:
newhpcd_entries += '{0:24}{1:30}#HPCD\n'.format(ipaddr, host_name)
curhpcd_entries = ''
newcontent = ''
hpcentryexists = False
with open(hostsfile, 'r') as F:
for line in F.readlines():
if re.match(r"^[0-9\.]+\s+[^\s#]+\s+#HPCD\s*$", line):
curhpcd_entries += line
hpcentryexists = True
elif re.match(r"^[0-9\.]+\s+[^\s#]+\s+#HPC\s*$", line):
hpcentryexists = True
else:
newcontent += line
if newhpcd_entries != curhpcd_entries:
if hpcentryexists:
waagent.Log("Clean the HPC related host entries from hosts file")
waagent.Log("Add the following HPCD host entries:\n{0}".format(newhpcd_entries))
if newcontent and newcontent[-1] != '\n':
newcontent += '\n'
newcontent += newhpcd_entries
waagent.ReplaceFileContentsAtomic(hostsfile,newcontent)
os.chmod(hostsfile, 0o644)
except :
raise
def gethostname_from_configfile(configfile):
config_hostname = None
if os.path.isfile(configfile):
with open(configfile, 'r') as F:
configjson = json.load(F)
if 'RegisterUri' in configjson:
reguri = configjson['RegisterUri']
reguri = reguri[0:reguri.rindex('/')]
config_hostname = reguri[reguri.rindex('/')+1:]
return config_hostname
def _add_dns_search(domain_fqdn):
need_update = False
new_content = ''
for line in (open('/etc/resolv.conf','r')).readlines():
if re.match('^search.* {0}'.format(domain_fqdn), line):
waagent.Log('{0} was already added in /etc/resolv.conf'.format(domain_fqdn))
return
if re.match('^search', line):
need_update = True
new_content += line.replace('search', 'search {0}'.format(domain_fqdn))
else:
new_content += line
if need_update:
waagent.Log('Adding {0} to /etc/resolv.conf'.format(domain_fqdn))
waagent.SetFileContents('/etc/resolv.conf', new_content)
def _update_dns_record(domain_fqdn):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
try:
s.connect((domain_fqdn, 53))
break
except Exception, e:
waagent.Log('Failed to connect to {0}:53: {1}'.format(domain_fqdn, e))
ipaddr = s.getsockname()[0]
host_fqdn = "{0}.{1}".format(socket.gethostname().split('.')[0], domain_fqdn)
dns_cmd = 'echo -e "server {0}\nzone {0}\nupdate delete {1}\nupdate add {1} 864000 A {2}\nsend\n" | nsupdate -v'.format(domain_fqdn, host_fqdn, ipaddr)
waagent.Log('The command to update ip to dns server is: {0}'.format(dns_cmd))
retry = 0
while retry < 60:
dns_ret, dns_msg = waagent.RunGetOutput(dns_cmd)
if dns_ret == 0:
waagent.Log("Succeeded to update ip to dns server.")
return
else:
retry = retry + 1
waagent.Log("Failed to update ip to dns server: {0}, {1}".format(dns_ret, dns_msg))
time.sleep(10)
def _mount_cgroup():
if not os.path.isdir('/cgroup'):
os.mkdir('/cgroup')
if not os.listdir('/cgroup'):
retcode, mount_msg = waagent.RunGetOutput('mount -t cgroup cgroup /cgroup')
waagent.Log("mount /cgroup directory {0}:{1}".format(retcode, mount_msg))
if retcode == 0:
waagent.Log("/cgroup directory is successfully mounted.")
else:
raise Exception("failed to mount /cgroup directory")
else:
waagent.Log("/cgroup directory was already mounted.")
def config_firewall_rules():
if DistroName == 'redhat':
waagent.Log('Configuring the firewall rules')
major_version = int(DistroVersion.split('.')[0])
if major_version < 7:
waagent.Run('lokkit --port=40000:tcp --update', chk_err=False)
elif waagent.Run("firewall-cmd --state", chk_err=False) == 0:
waagent.Run("firewall-cmd --permanent --zone=public --add-port=40000/tcp")
waagent.Run("firewall-cmd --reload")
def parse_context(operation):
hutil = Util.HandlerUtility(waagent.Log, waagent.Error, ExtensionShortName)
hutil.do_parse_context(operation)
return hutil
def cmpFileHash(file1, file2):
if not (os.path.isfile(file1) and os.path.isfile(file2)):
return False
digests = []
for filename in [file1, file2]:
md5hash = hashlib.md5()
with open(filename, 'rb') as f:
buf = f.read()
md5hash.update(buf)
digest = md5hash.hexdigest()
digests.append(digest)
return digests[0] == digests[1]
def install():
hutil = parse_context('Install')
try:
cleanup_host_entries()
_uninstall_nodemanager_files()
if DistroName == "centos" or DistroName == "redhat":
waagent.Run("yum-config-manager --setopt=\\*.skip_if_unavailable=1 --save", chk_err=False)
_install_cgroup_tool()
_install_sysstat()
_install_pstree()
logDir = os.path.join(InstallRoot, "logs")
if not os.path.isdir(logDir):
os.makedirs(logDir)
srcDir = os.path.join(os.getcwd(), "bin")
waagent.RunGetOutput("chmod +x {0}/*".format(srcDir))
waagent.RunGetOutput("chmod +x {0}/lib/*".format(srcDir))
for filename in os.listdir(srcDir):
srcname = os.path.join(srcDir, filename)
destname = os.path.join(InstallRoot, filename)
if os.path.isfile(srcname):
shutil.copy2(srcname, destname)
elif os.path.isdir(srcname):
shutil.copytree(srcname, destname)
libdir = os.path.join(InstallRoot, 'lib')
for tmpname in os.listdir(libdir):
tmppath = os.path.join(libdir, tmpname)
if tmpname.endswith(".tar.gz") and os.path.isfile(tmppath):
waagent.Run("tar xzvf {0} -C {1}".format(tmppath, libdir))
os.remove(tmppath)
waagent.Run("chmod -R 755 {0}".format(libdir))
host_name = None
public_settings = hutil._context._config['runtimeSettings'][0]['handlerSettings'].get('publicSettings')
if public_settings:
host_name = public_settings.get('HostName')
backup_configfile = os.path.join(os.getcwd(), 'nodemanager.json')
if not host_name:
# if there is backup nodemanager.json, means it is an update install, if 'HostName' not defined in the extension
# settings, we shall get from the backup nodemanager.json
if os.path.isfile(backup_configfile):
waagent.Log("Backup nodemanager configuration file found")
host_name = gethostname_from_configfile(backup_configfile)
curhostname = socket.gethostname().split('.')[0]
if host_name:
if host_name.lower() != curhostname.lower():
waagent.Log("HostName was set: hostname from {0} to {1}".format(curhostname, host_name))
waagent.MyDistro.setHostname(host_name)
waagent.MyDistro.publishHostname(host_name)
else:
host_name = curhostname
public_settings = hutil._context._config['runtimeSettings'][0]['handlerSettings'].get('publicSettings')
cluster_connstring = public_settings.get('ClusterConnectionString')
if not cluster_connstring:
waagent.Log("ClusterConnectionString is not specified")
protect_settings = hutil._context._config['runtimeSettings'][0]['handlerSettings'].get('protectedSettings')
cluster_connstring = protect_settings.get('ClusterName')
if not cluster_connstring:
error_msg = "neither ClusterConnectionString nor ClusterName is specified."
hutil.error(error_msg)
raise ValueError(error_msg)
ssl_thumbprint = public_settings.get('SSLThumbprint')
certsdir = os.path.join(InstallRoot, "certs")
if not ssl_thumbprint:
api_prefix = "http://{0}:80/HpcLinux/api/"
listen_uri = "http://0.0.0.0:40000"
else:
api_prefix = "https://{0}:443/HpcLinux/api/"
listen_uri = "https://0.0.0.0:40002"
# import the ssl certificate for hpc nodemanager
if not os.path.isdir(certsdir):
os.makedirs(certsdir, 0o750)
else:
os.chmod(certsdir, 0o750)
ssl_thumbprint = ssl_thumbprint.upper()
prvfile = os.path.join("/var/lib/waagent", ssl_thumbprint + ".prv")
srccrtfile = os.path.join("/var/lib/waagent", ssl_thumbprint + ".crt")
rsakeyfile = os.path.join(certsdir, "nodemanager_rsa.key")
dstcrtfile = os.path.join(certsdir, "nodemanager.crt")
if os.path.isfile(prvfile) and not cmpFileHash(prvfile, rsakeyfile):
waagent.Run("rm -rf {0}/nodemanager.crt {0}/nodemanager.key {0}/nodemanager.pem {0}/nodemanager_rsa.key".format(certsdir), chk_err=False)
shutil.copy2(prvfile, rsakeyfile)
shutil.copy2(srccrtfile, dstcrtfile)
shutil.copy2(dstcrtfile, os.path.join(certsdir, "nodemanager.pem"))
waagent.Run("openssl rsa -in {0}/nodemanager_rsa.key -out {0}/nodemanager.key".format(certsdir))
waagent.Run("chmod 640 {0}/nodemanager.crt {0}/nodemanager.key {0}/nodemanager.pem {0}/nodemanager_rsa.key".format(certsdir))
node_uri = api_prefix + host_name + "/computenodereported"
reg_uri = api_prefix + host_name + "/registerrequested"
hostsfile_uri = api_prefix + "hostsfile"
metric_ids_uri = api_prefix + host_name + "/getinstanceids"
namingSvcUris = ['https://{0}:443/HpcNaming/api/fabric/resolve/singleton/'.format(h.split('.')[0].strip()) for h in cluster_connstring.split(',')]
if os.path.isfile(backup_configfile):
with open(backup_configfile, 'r') as F:
configjson = json.load(F)
configjson["NamingServiceUri"] = namingSvcUris
configjson["HeartbeatUri"] = node_uri
configjson["RegisterUri"] = reg_uri
configjson["HostsFileUri"] = hostsfile_uri
configjson["MetricInstanceIdsUri"] = metric_ids_uri
configjson["MetricUri"] = ""
configjson["ListeningUri"] = listen_uri
else:
configjson = {
"ConfigVersion": "1.0",
"NamingServiceUri": namingSvcUris,
"HeartbeatUri": node_uri,
"RegisterUri": reg_uri,
"MetricUri": "",
"MetricInstanceIdsUri": metric_ids_uri,
"HostsFileUri": hostsfile_uri,
"HostsFetchInterval": 120,
"ListeningUri": listen_uri,
"DefaultServiceName": "SchedulerStatefulService",
"UdpMetricServiceName": "MonitoringStatefulService"
}
if ssl_thumbprint:
configjson["TrustedCAFile"] = os.path.join(certsdir, "nodemanager.pem")
configjson["CertificateChainFile"] = os.path.join(certsdir, "nodemanager.crt")
configjson["PrivateKeyFile"] = os.path.join(certsdir, "nodemanager.key")
configfile = os.path.join(InstallRoot, 'nodemanager.json')
waagent.SetFileContents(configfile, json.dumps(configjson))
shutil.copy2(configfile, backup_configfile)
config_firewall_rules()
hutil.do_exit(0, 'Install', 'success', '0', 'Install Succeeded.')
except Exception, e:
hutil.do_exit(1, 'Install','error','1', '{0}'.format(e))
def enable():
#Check whether monitor process is running.
#If it does, return. Otherwise clear pid file
hutil = parse_context('Enable')
if os.path.isfile(DaemonPidFilePath):
pid = waagent.GetFileContents(DaemonPidFilePath)
if os.path.isdir(os.path.join("/proc", pid)) and _is_nodemanager_daemon(pid):
if hutil.is_seq_smaller():
hutil.do_exit(0, 'Enable', 'success', '0',
'HPC Linux node manager daemon is already running')
else:
waagent.Log("Stop old daemon: {0}".format(pid))
os.killpg(int(pid), 9)
os.remove(DaemonPidFilePath)
args = [os.path.join(os.getcwd(), __file__), "daemon"]
devnull = open(os.devnull, 'w')
child = subprocess.Popen(args, stdout=devnull, stderr=devnull, preexec_fn=os.setsid)
if child.pid is None or child.pid < 1:
hutil.do_exit(1, 'Enable', 'error', '1',
'Failed to launch HPC Linux node manager daemon')
else:
hutil.save_seq()
waagent.SetFileContents(DaemonPidFilePath, str(child.pid))
#Sleep 3 seconds to check if the process is still running
time.sleep(3)
if child.poll() is None:
waagent.Log("Daemon pid: {0}".format(child.pid))
hutil.do_exit(0, 'Enable', 'success', '0',
'HPC Linux node manager daemon is enabled')
else:
hutil.do_exit(1, 'Enable', 'error', '2',
'Failed to launch HPC Linux node manager daemon')
def daemon():
hutil = parse_context('Enable')
try:
public_settings = hutil._context._config['runtimeSettings'][0]['handlerSettings'].get('publicSettings')
domain_fqdn = public_settings.get('DomainName')
if not domain_fqdn:
cluster_connstring = public_settings.get('ClusterConnectionString')
if not cluster_connstring:
waagent.Log("ClusterConnectionString is not specified, use ClusterName instead")
protect_settings = hutil._context._config['runtimeSettings'][0]['handlerSettings'].get('protectedSettings')
cluster_connstring = protect_settings.get('ClusterName')
headnode_name = cluster_connstring.split(',')[0].strip()
if headnode_name.find('.') > 0:
# The head node name is FQDN, extract the domain FQDN
domain_fqdn = headnode_name.split(".", 1)[1]
if domain_fqdn:
waagent.Log("The domain FQDN is " + domain_fqdn)
_add_dns_search(domain_fqdn)
#thread.start_new_thread(_update_dns_record, (domain_fqdn,))
# A fix only for SUSE Linux that sometimes the hostname got changed because out-of-date host/IP entry in /etc/hosts
# It may happen when the node was assigned a different IP after deallocation
# We shall clean the current HPC related host/IP entries and add the actual IPs before fetching the hosts file from head node.
if DistroName == 'suse':
configfile = os.path.join(InstallRoot, 'nodemanager.json')
confighostname = gethostname_from_configfile(configfile)
curhostname = socket.gethostname().split('.')[0]
if confighostname.lower() != curhostname.lower():
cleanup_host_entries()
waagent.Log("Correct the hostname from {0} to {1}".format(curhostname, confighostname))
waagent.MyDistro.setHostname(confighostname)
waagent.MyDistro.publishHostname(confighostname)
retry = 0
while True:
nics = get_networkinterfaces()
if len(nics) > 0:
init_suse_hostsfile(confighostname, [nic[1] for nic in nics])
break
elif retry < 30:
waagent.Log("Failed to get network interfaces information, retry later ...")
time.sleep(2)
retry = retry + 1
else:
waagent.Log("Failed to get network interfaces information, just clean")
break
# Mount the directory /cgroup for centos 6.*
major_version = int(DistroVersion.split('.')[0])
if (DistroName == 'centos' or DistroName == 'redhat') and major_version < 7:
_mount_cgroup()
while True:
exe_path = os.path.join(InstallRoot, "nodemanager")
devnull = open(os.devnull, 'w')
child_process = subprocess.Popen(exe_path, stdout=devnull, stderr=devnull, cwd=InstallRoot)
if child_process.pid is None or child_process.pid < 1:
exit_msg = 'Failed to start HPC node manager process'
hutil.do_status_report('Enable', 'error', 1, exit_msg)
else:
#Sleep 1 second to check if the process is still running
time.sleep(1)
if child_process.poll() is None:
hutil.do_status_report('Enable', 'success', 0, "")
waagent.Log('HPC node manager process started')
exit_code = child_process.wait()
exit_msg = "HPC node manager process exits: {0}".format(exit_code)
hutil.do_status_report('Enable', 'warning', exit_code, exit_msg)
else:
exit_msg = "HPC node manager process crashes: {0}".format(child_process.returncode)
hutil.do_status_report('Enable', 'error', child_process.returncode, exit_msg)
waagent.Log(exit_msg)
waagent.Log("Restart HPC node manager process after {0} seconds".format(RestartIntervalInSeconds))
time.sleep(RestartIntervalInSeconds)
except Exception, e:
hutil.error("Failed to enable the extension with error: %s, stack trace: %s" %(str(e), traceback.format_exc()))
hutil.do_exit(1, 'Enable','error','1', 'Enable failed.')
def uninstall():
hutil = parse_context('Uninstall')
_uninstall_nodemanager_files()
cleanup_host_entries()
hutil.do_exit(0,'Uninstall','success','0', 'Uninstall succeeded')
def disable():
hutil = parse_context('Disable')
#Check whether monitor process is running.
#If it does, kill it. Otherwise clear pid file
if os.path.isfile(DaemonPidFilePath):
pid = waagent.GetFileContents(DaemonPidFilePath)
if os.path.isdir(os.path.join("/proc", pid)) and _is_nodemanager_daemon(pid):
waagent.Log(("Stop HPC node manager daemon: {0}").format(pid))
os.killpg(int(pid), 9)
os.remove(DaemonPidFilePath)
cleanup_host_entries()
hutil.do_exit(0, 'Disable', 'success', '0',
'HPC node manager daemon is disabled')
os.remove(DaemonPidFilePath)
hutil.do_exit(0, 'Disable', 'success', '0',
'HPC node manager daemon is not running')
def update():
hutil = parse_context('Update')
cleanup_host_entries()
configfile = os.path.join(InstallRoot, 'nodemanager.json')
if os.path.isfile(configfile):
waagent.Log("Update extension: backup the nodemanager configuration file.")
shutil.copy2(configfile, os.getcwd())
# A fix only for SUSE Linux that sometimes the hostname got changed because out-of-date host/IP entry in /etc/hosts
# It may happen when the node was assigned a different IP after deallocation
if DistroName == 'suse':
confighostname = gethostname_from_configfile(configfile)
if confighostname:
curhostname = socket.gethostname().split('.')[0]
if confighostname.lower() != curhostname.lower():
waagent.Log("Update: Set the hostname from {0} to {1}".format(curhostname, confighostname))
waagent.MyDistro.setHostname(confighostname)
waagent.MyDistro.publishHostname(confighostname)
hutil.do_exit(0,'Update','success','0', 'Update Succeeded')
if __name__ == '__main__' :
main()