зеркало из https://github.com/Azure/iotedgedev.git
314 строки
15 KiB
Python
314 строки
15 KiB
Python
"""
|
|
This module provides interfaces to manipulate IoT Edge deployment manifest (deployment.json)
|
|
and deployment manifest template (deployment.template.json)
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import functools
|
|
|
|
import jsonschema
|
|
from urllib.request import urlopen
|
|
|
|
from .utility import Utility
|
|
from .constants import Constants
|
|
|
|
|
|
TWIN_VALUE_MAX_SIZE = 512
|
|
TWIN_VALUE_MAX_CHUNKS = 8
|
|
|
|
|
|
class DeploymentManifest:
|
|
def __init__(self, envvars, output, utility, path, is_template, expand_vars=True):
|
|
self.envvars = envvars
|
|
self.utility = utility
|
|
self.output = output
|
|
try:
|
|
self.path = path
|
|
self.is_template = is_template
|
|
self.json = json.loads(Utility.get_file_contents(path, expandvars=expand_vars))
|
|
except FileNotFoundError:
|
|
if is_template:
|
|
deployment_manifest_path = self.envvars.DEPLOYMENT_CONFIG_FILE_PATH
|
|
if os.path.exists(deployment_manifest_path):
|
|
self.output.error('Deployment manifest template file "{0}" not found'.format(path))
|
|
if output.confirm('Would you like to make a copy of the deployment manifest file "{0}" as the deployment template file?'.format(deployment_manifest_path), default=True):
|
|
shutil.copyfile(deployment_manifest_path, path)
|
|
self.json = json.load(open(self.envvars.DEPLOYMENT_CONFIG_FILE_PATH))
|
|
self.envvars.save_envvar("DEPLOYMENT_CONFIG_TEMPLATE_FILE", path)
|
|
else:
|
|
raise FileNotFoundError('Deployment manifest file "{0}" not found'.format(path))
|
|
else:
|
|
raise FileNotFoundError('Deployment manifest file "{0}" not found'.format(path))
|
|
|
|
def add_module_template(self, module_name, create_options={}, is_debug=False):
|
|
"""Add a module template to the deployment manifest"""
|
|
new_module = {
|
|
"version": "1.0",
|
|
"type": "docker",
|
|
"status": "running",
|
|
"restartPolicy": "always",
|
|
"settings": {
|
|
"image": DeploymentManifest.get_image_placeholder(module_name, is_debug),
|
|
"createOptions": create_options
|
|
}
|
|
}
|
|
|
|
try:
|
|
self.utility.nested_set(self._get_module_content(), ["$edgeAgent", "properties.desired", "modules", module_name], new_module)
|
|
except KeyError as err:
|
|
raise KeyError("Missing key {0} in file {1}".format(err, self.path))
|
|
|
|
self.add_default_route(module_name)
|
|
|
|
def add_default_route(self, module_name):
|
|
"""Add a default route to send messages to IoT Hub"""
|
|
new_route_name = "{0}ToIoTHub".format(module_name)
|
|
new_route = "FROM /messages/modules/{0}/outputs/* INTO $upstream".format(module_name)
|
|
|
|
try:
|
|
self.utility.nested_set(self._get_module_content(), ["$edgeHub", "properties.desired", "routes", new_route_name], new_route)
|
|
except KeyError as err:
|
|
raise KeyError("Missing key {0} in file {1}".format(err, self.path))
|
|
|
|
def get_user_modules(self) -> dict:
|
|
"""Get user modules from deployment manifest"""
|
|
try:
|
|
if not self.has_user_modules() and self.is_layered_deployment_schema():
|
|
return {}
|
|
return self.get_desired_property("$edgeAgent", "modules")
|
|
except KeyError as err:
|
|
raise KeyError("Missing key {0} in file {1}".format(err, self.path))
|
|
|
|
def get_system_modules(self):
|
|
"""Get system modules from deployment manifest"""
|
|
try:
|
|
if not self.has_system_modules() and self.is_layered_deployment_schema():
|
|
return {}
|
|
return self.get_desired_property("$edgeAgent", "systemModules")
|
|
except KeyError as err:
|
|
raise KeyError("Missing key {0} in file {1}".format(err, self.path))
|
|
|
|
def get_all_modules(self):
|
|
all_modules = {}
|
|
all_modules.update(self.get_user_modules())
|
|
all_modules.update(self.get_system_modules())
|
|
|
|
return all_modules
|
|
|
|
def get_desired_property(self, module, prop):
|
|
return self._get_module_content_split()[module]["properties"]["desired"][prop]
|
|
|
|
def get_template_schema_ver(self):
|
|
return self.json.get("$schema-template", "")
|
|
|
|
def has_system_modules(self):
|
|
"""Check if system modules exist in the deployment manifest"""
|
|
return self.has_desired_property("$edgeAgent", "systemModules")
|
|
|
|
def has_user_modules(self):
|
|
"""Check if user modules exist in the deployment manifest"""
|
|
return self.has_desired_property("$edgeAgent", "modules")
|
|
|
|
def has_desired_property(self, module, prop) -> bool:
|
|
return prop in self._get_module_content_split().get(module, {}).get('properties', {}).get('desired', {})
|
|
|
|
def convert_create_options(self):
|
|
modules = self.get_all_modules()
|
|
for module_name, module_info in modules.items():
|
|
if "settings" in module_info and "createOptions" in module_info["settings"]:
|
|
create_options = module_info["settings"]["createOptions"]
|
|
if not isinstance(create_options, str):
|
|
# Stringify and minify the createOptions from dict format
|
|
create_options = json.dumps(create_options, separators=(',', ':'))
|
|
|
|
options = [m for m in re.finditer("(.|[\r\n]){{1,{0}}}".format(TWIN_VALUE_MAX_SIZE), create_options)]
|
|
if len(options) > TWIN_VALUE_MAX_CHUNKS:
|
|
raise ValueError("Size of createOptions of {0} is too big. The maximum size of createOptions is 4K".format(module_name))
|
|
|
|
for i, option in enumerate(options):
|
|
if i == 0:
|
|
module_info["settings"]["createOptions"] = option.group()
|
|
else:
|
|
module_info["settings"]["createOptions{0:0=2d}".format(i)] = option.group()
|
|
|
|
def expand_image_placeholders(self, replacements):
|
|
modules = self.get_all_modules()
|
|
for module_name, module_info in modules.items():
|
|
if module_name in replacements:
|
|
self.utility.nested_set(module_info, ["settings", "image"], replacements[module_name])
|
|
|
|
def del_key(self, keys):
|
|
self.utility.del_key(self.json, keys)
|
|
|
|
def dump(self, path=None):
|
|
"""Dump the JSON to the disk"""
|
|
if path is None:
|
|
path = self.path
|
|
|
|
with open(path, "w") as deployment_manifest:
|
|
json.dump(self.json, deployment_manifest, indent=2)
|
|
|
|
def validate_deployment_template(self):
|
|
validation_success = True
|
|
try:
|
|
template_schema = json.loads(urlopen(Constants.deployment_template_schema_url).read().decode("utf-8"))
|
|
validation_success = self._validate_json_schema(template_schema, self.json, "Deployment template")
|
|
except Exception as ex: # Ignore other non shcema validation errors
|
|
self.output.info("Unexpected error during deployment template schema validation, skip schema validation. Error:%s" % ex)
|
|
|
|
return validation_success
|
|
|
|
def validate_deployment_manifest(self):
|
|
validation_success = True
|
|
try:
|
|
if not self.is_layered_deployment_schema():
|
|
validation_success = self._validate_deployment_manifest_schema()
|
|
validation_success &= self._validate_create_options()
|
|
except Exception as err:
|
|
self.output.info("Unexpected error during deployment manifest validation, skip the validation. Error:%s" % err)
|
|
|
|
return validation_success
|
|
|
|
def is_layered_deployment_schema(self):
|
|
return "content" in self.json
|
|
|
|
@staticmethod
|
|
def get_image_placeholder(module_name, is_debug=False):
|
|
return "${{MODULES.{0}}}".format(module_name + ".debug" if is_debug else module_name)
|
|
|
|
def _get_module_content_split(self):
|
|
return DeploymentManifest.dot_to_json(self._get_module_content())
|
|
|
|
def _get_module_content(self):
|
|
if "modulesContent" in self.json:
|
|
return self.json["modulesContent"]
|
|
elif "moduleContent" in self.json:
|
|
return self.json["moduleContent"]
|
|
elif self.is_layered_deployment_schema() and "modulesContent" in self.json["content"]:
|
|
return self.json["content"]["modulesContent"]
|
|
else:
|
|
raise KeyError("modulesContent")
|
|
|
|
@staticmethod
|
|
def dot_to_json(a):
|
|
output = {}
|
|
for key, value in a.items():
|
|
if '.' not in key:
|
|
output[key] = DeploymentManifest.dot_to_json(value)
|
|
continue
|
|
path = key.split('.')
|
|
target = functools.reduce(lambda d, k: d.setdefault(k, {}), path[:-1], output)
|
|
target[path[-1]] = value
|
|
return output
|
|
|
|
# Carefully check upper/lower case of the output when using this function
|
|
def _validate_json_schema(self, schema_object, json_object, schema_type):
|
|
validation_success = True
|
|
try:
|
|
self.output.info("Validating schema of %s." % schema_type.lower())
|
|
validator_class = jsonschema.validators.validator_for(schema_object)
|
|
validator = validator_class(schema_object)
|
|
validation_errors = validator.iter_errors(self.json)
|
|
for error in validation_errors:
|
|
validation_success = False
|
|
self.output.warning("%s schema error: %s. Property path:%s" % (schema_type, error.message, "->".join(error.path)))
|
|
if validation_success:
|
|
self.output.info("%s schema validation passed." % schema_type)
|
|
else:
|
|
self.output.warning("%s schema validation failed. Please see previous logs for more details" % schema_type)
|
|
|
|
except jsonschema.exceptions.SchemaError as schemaErr:
|
|
self.output.info("Errors found in %s schema, skip schema validation. Error:%s" % (schema_type, schemaErr.message))
|
|
except Exception as ex: # Ignore other non schema validation errors
|
|
self.output.info("Unexpected error during %s schema validation, skip schema validation. Error:%s" % (schema_type, ex))
|
|
|
|
return validation_success
|
|
|
|
def _validate_deployment_manifest_schema(self):
|
|
validation_success = True
|
|
try:
|
|
deployment_schema = json.loads(urlopen(Constants.deployment_manifest_schema_url).read())
|
|
validation_success = self._validate_json_schema(deployment_schema, self.json, "Deployment manifest")
|
|
except Exception as ex: # Ignore other non schema validation errors
|
|
self.output.info("Unexpected error during deployment manifest schema validation, skip schema validation. Error:%s" % ex)
|
|
|
|
return validation_success
|
|
|
|
# Call _validate_deployment_manifest_schema first. This function assumes createOptions are strings.
|
|
def _validate_create_options(self):
|
|
self.output.info("Start validating createOptions for all modules.")
|
|
modules = self.get_all_modules()
|
|
validation_success = True
|
|
for module_name, module_info in modules.items():
|
|
current_module_validation_success = True
|
|
try:
|
|
self.output.info("Validating createOptions for module %s" % module_name)
|
|
if "settings" in module_info and "createOptions" in module_info["settings"]:
|
|
current_module_validation_success = self._validate_create_options_for_module(module_name, module_info)
|
|
if current_module_validation_success:
|
|
self.output.info("createOptions of module %s validation passed" % module_name)
|
|
else:
|
|
self.output.info("No settings or createOptions property found in module %s. Skip createOptions validation." % module_name)
|
|
except Exception as ex:
|
|
self.output.info("Unexpected error occurs when validating createOptions for module %s: %s" % (module_name, ex))
|
|
finally:
|
|
validation_success &= current_module_validation_success
|
|
if (validation_success):
|
|
self.output.info("Validation for all createOptions passed.")
|
|
else:
|
|
self.output.warning("Errors found during createOptions validation. Please check the logs for details.")
|
|
return validation_success
|
|
|
|
def _validate_create_options_for_module(self, module_name, module_info):
|
|
validation_success = True
|
|
validation_success &= self._validate_create_options_lengh(module_name, module_info)
|
|
validation_success &= self._validate_create_options_format(module_name, module_info)
|
|
return validation_success
|
|
|
|
def _validate_create_options_lengh(self, module_name, module_info):
|
|
validation_success = True
|
|
create_options_value = module_info["settings"]["createOptions"]
|
|
if len(str(create_options_value)) > TWIN_VALUE_MAX_SIZE:
|
|
validation_success = False
|
|
self.output.warning("Length of createOptions in module %s exceeds %d" % (module_name, TWIN_VALUE_MAX_SIZE))
|
|
# Merge additional create options
|
|
for i in range(1, TWIN_VALUE_MAX_CHUNKS):
|
|
property_name = "createOptions0%d" % i
|
|
if property_name in module_info["settings"]:
|
|
create_options_value = module_info["settings"][property_name]
|
|
if len(str(create_options_value)) > TWIN_VALUE_MAX_SIZE:
|
|
validation_success = False
|
|
self.output.warning("Length of %s in module %s exceeds %d" % (property_name, module_name, TWIN_VALUE_MAX_SIZE))
|
|
else:
|
|
break
|
|
return validation_success
|
|
|
|
def _validate_create_options_format(self, module_name, module_info):
|
|
validation_success = True
|
|
create_options_string = self._merge_create_options(module_name, module_info)
|
|
if not create_options_string.startswith('{'):
|
|
validation_success = False
|
|
self.output.warning("createOptions of module %s should be an object" % module_name)
|
|
else:
|
|
try:
|
|
json.loads(create_options_string)
|
|
except ValueError as err:
|
|
validation_success = False
|
|
self.output.warning("createOptions of module %s is not a valid JSON string. Error: %s" % (module_name, err))
|
|
return validation_success
|
|
|
|
def _merge_create_options(self, module_name, module_info):
|
|
create_options = []
|
|
create_options.append(str(module_info["settings"]["createOptions"]))
|
|
# Merge additional create options
|
|
for i in range(1, TWIN_VALUE_MAX_CHUNKS):
|
|
property_name = "createOptions0%d" % i
|
|
if property_name in module_info["settings"]:
|
|
create_options.append(str(module_info["settings"][property_name]))
|
|
else:
|
|
break
|
|
return "".join(create_options).strip()
|