Merged PR 1223: Add tools for creating manifests.

Also add tools for testing/debugging.

Related work items: #503
This commit is contained in:
Christopher Weimer 2020-03-16 20:05:16 +00:00
Родитель a591d3e3b3
Коммит 99d64e3016
13 изменённых файлов: 2961 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,6 @@
ID=2
C:\Cerberus\tools\manifest_tools\cfm_manifest.xml
C:\Cerberus\tools\manifest_tools\cfm_manifest2.xml
Output=C:\Cerberus\tools\manifest_tools\cfm.bin
Key=C:\Cerberus\core\testing\keys\rsapriv.pem
KeySize=256

Просмотреть файл

@ -0,0 +1,289 @@
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT license.
"""
from __future__ import print_function
from __future__ import unicode_literals
import os
import sys
import ctypes
import binascii
import manifest_types
import manifest_common
import manifest_parser
from Crypto.PublicKey import RSA
CFM_CONFIG_FILENAME = "cfm_generator.config"
class cfm_components_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('components_count', ctypes.c_ubyte),
('reserved', ctypes.c_ubyte)]
class cfm_component_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('fw_count', ctypes.c_ubyte),
('reserved', ctypes.c_ubyte),
('component_id', ctypes.c_uint)]
class cfm_fw_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('img_count', ctypes.c_ubyte),
('reserved', ctypes.c_ubyte),
('version_length', ctypes.c_ushort),
('reserved2', ctypes.c_ushort)]
class cfm_img_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('digest_length', ctypes.c_ushort),
('flags', ctypes.c_ushort),
('reserved', ctypes.c_ushort)]
def generate_cfm(cfm_header_instance, components_header_instance, components_list):
"""
Create a CFM object from all the different CFM components
:param cfm_header_instance: Instance of a manifest header
:param components_header_instance: Instance of a CFM components header
:param components_list: List of components to be included in CFM
:return Instance of a CFM object
"""
components_size = components_header_instance.length - ctypes.sizeof(components_header_instance)
class cfm(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('manifest_header', manifest_common.manifest_header),
('components_header', cfm_components_header),
('components', ctypes.c_ubyte * components_size)]
offset = 0
components_buf = (ctypes.c_ubyte * components_size)()
for component in components_list:
ctypes.memmove(ctypes.addressof(components_buf) + offset, ctypes.addressof(component),
component.header.length)
offset += component.header.length
return cfm(cfm_header_instance, components_header_instance, components_buf)
def generate_img_instance(filename, device_id, version, img):
"""
Create a signed image instance
:param filename: XML filename
:param device_id: ID of component currently being processed
:param version: Firmware version
:param img: Signed image
:return Signed image instance
"""
if "failure_action" not in img:
raise ValueError("Failed to generate CFM: Component {0}, FW {1} has signed image with no failure action - {2}".format(device_id, version, filename))
if "digest" not in img:
raise ValueError("Failed to generate CFM: Component {0}, FW {1} has signed image with no digest - {2}".format(device_id, version, filename))
flags = int(img["failure_action"])
digest = img["digest"]
header = cfm_img_header(0, len(digest), flags, 0)
digest_arr = (ctypes.c_ubyte * len(digest)).from_buffer_copy(digest)
class cfm_signed_img(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('header', cfm_img_header),
('digest', ctypes.c_ubyte * len(digest))]
return cfm_signed_img(header, digest_arr)
def generate_signed_images(filename, device_id, version, img_list):
"""
Create a list of signed image struct instances from image list
:param filename: XML filename
:param device_id: ID of component currently being processed
:param version: Firmware version
:param img_list: List of signed images
:return List of signed image struct instances
"""
signed_list = []
for img in img_list:
img_instance = generate_img_instance(filename, device_id, version, img)
img_instance.header.length = ctypes.sizeof(img_instance)
signed_list.append(img_instance)
return signed_list
def generate_fw_instance(filename, device_id, version, imgs):
"""
Create a firmware struct instance
:param filename: XML filename
:param device_id: ID of component currently being processed
:param version: Firmware version
:param imgs: List of signed image struct instances
:return A firmware struct instance
"""
imgs_size = 0
for img in imgs:
imgs_size = imgs_size + ctypes.sizeof(img)
offset = 0
imgs_buf = (ctypes.c_ubyte * imgs_size)()
for img in imgs:
ctypes.memmove(ctypes.addressof(imgs_buf) + offset, ctypes.addressof(img),
img.header.length)
offset += img.header.length
header = cfm_fw_header(0, len(imgs), 0, len(version), 0)
num_alignment = len(version) % 4
num_alignment = 0 if (num_alignment == 0) else (4 - num_alignment)
alignment_buf = (ctypes.c_ubyte * num_alignment)()
ctypes.memset(alignment_buf, 0, num_alignment)
class cfm_component_fw(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('header', cfm_fw_header),
('version_id', ctypes.c_char * len(version)),
('alignment', ctypes.c_ubyte * num_alignment),
('signed_imgs', ctypes.c_ubyte * imgs_size)]
fw = cfm_component_fw(header, version.encode('utf-8'), alignment_buf, imgs_buf)
return fw
def generate_firmware_list(filename, device_id, fw_list):
"""
Create a list of firmware struct instances from firmware list
:param filename: XML filename
:param device_id: ID of component currently being processed
:param fw_list: List of supported component firmware
:return List of firmware struct instances
"""
firmware_list = []
for fw in fw_list:
if "version" not in fw:
raise KeyError("Failed to generate CFM: Component {0} firmware has no version - {1}".format(device_id, filename))
if "signed_imgs" not in fw:
raise KeyError("Failed to generate CFM: Component {0} firmware has no signed images - {1}".format(device_id, filename))
version = fw["version"]
imgs = generate_signed_images(filename, device_id, version, fw["signed_imgs"])
fw_instance = generate_fw_instance(filename, device_id, version, imgs)
fw_instance.header.length = ctypes.sizeof(fw_instance)
firmware_list.append(fw_instance)
return firmware_list
def generate_components_list(xml_list):
"""
Create a list of components from parsed XML list
:param xml_list: List of parsed XML of components to be included in CFM
:return list of component struct instances
"""
component_list = []
for filename, xml in xml_list.items():
if "device_id" not in xml:
raise KeyError("Failed to generate CFM: XML has no device id - {0}".format(filename))
if "fw_list" not in xml:
raise KeyError("Failed to generate CFM: XML has no firmware defined - {0}".format(filename))
device_id = int(xml["device_id"])
firmware_list = generate_firmware_list(filename, device_id, xml["fw_list"])
fw_size = 0
for fw in firmware_list:
fw_size += ctypes.sizeof(fw)
offset = 0
fw_buf = (ctypes.c_ubyte * fw_size)()
for fw in firmware_list:
ctypes.memmove(ctypes.addressof(fw_buf) + offset, ctypes.addressof(fw),
fw.header.length)
offset += fw.header.length
header = cfm_component_header(0, len(firmware_list), 0, device_id)
class cfm_component(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('header', cfm_component_header),
('firmware', ctypes.c_ubyte * fw_size)]
component = cfm_component(header, fw_buf)
component.header.length = ctypes.sizeof(cfm_component)
component_list.append(component)
return component_list
def generate_components_header(components_list):
"""
Create a components header from a components list
:param components_list: List of components to be included in CFM
:return Components header instance
"""
size = ctypes.sizeof(cfm_components_header)
for component in components_list:
size = size + ctypes.sizeof(component)
return cfm_components_header(size, len(components_list), 0)
#*************************************** Start of Script ***************************************
if len(sys.argv) < 2:
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), CFM_CONFIG_FILENAME)
else:
path = os.path.abspath(sys.argv[1])
processed_xml, sign, key_size, key, id, output = manifest_common.load_xmls (path, None,
manifest_types.CFM)
components_list = generate_components_list(processed_xml)
components_header = generate_components_header(components_list)
cfm_header_instance = manifest_common.generate_manifest_header(id, key_size, manifest_types.CFM)
cfm_header_instance.length = ctypes.sizeof(cfm_header_instance) + components_header.length + \
cfm_header_instance.sig_length
cfm = generate_cfm(cfm_header_instance, components_header, components_list)
manifest_common.write_manifest(sign, cfm, key, output,
cfm_header_instance.length - cfm_header_instance.sig_length,
cfm_header_instance.sig_length)
print ("Completed CFM generation: {0}".format(output))

Просмотреть файл

@ -0,0 +1,202 @@
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT license.
"""
from __future__ import print_function
from __future__ import unicode_literals
import ctypes
import sys
import os
import traceback
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA256
import manifest_types
import manifest_parser
PFM_MAGIC_NUM = int("0x504D", 16)
CFM_MAGIC_NUM = int("0xA592", 16)
PCD_MAGIC_NUM = int("0x8EBC", 16)
class manifest_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('magic', ctypes.c_ushort),
('id', ctypes.c_uint),
('sig_length', ctypes.c_ushort),
('reserved', ctypes.c_ushort)]
def load_config (config_file):
"""
Load configuration options from file
:param config_file: Path for a text file containing config options
:return Parsed configuration
"""
config = {}
config["id"] = 0
config["xml_list"] = []
config["prv_key_path"] = ""
config["output"] = ""
config["key_size"] = ""
with open (config_file, 'r') as fh:
data = fh.readlines ()
if not data:
print("Failed to load configuration")
sys.exit(1)
for string in data:
string = string.replace("\n", "")
string = string.replace("\r", "")
if string.startswith("ID"):
config["id"] = string.split("=")[-1].strip()
elif string.startswith("Output"):
config["output"] = string.split("=")[-1].strip()
elif string.startswith("KeySize"):
config["key_size"] = string.split("=")[-1].strip()
elif string.startswith("Key"):
config["prv_key_path"] = string.split("=")[-1].strip()
else:
config["xml_list"].append(string)
return config
def load_key(key_size, prv_key_path):
"""
Load private RSA key to sign manifest from provided path. If no valid key can be imported,
key size will be what is provided. Otherwise, key size will be size of key imported
:param key_size: Provided key_size
:param prv_key_path: Provided private key path
:return <Sign manifest or not> <key_size> <Key to use for signing>
"""
if prv_key_path:
try:
key = RSA.importKey(open(prv_key_path).read())
except Exception:
print("Unsigned PFM will be generated, provided RSA key could not be imported: {0}".format(prv_key_path))
traceback.print_exc ()
return False, key_size, None
return True, int((key.size() + 1)/8), key
else:
print("No RSA private key provided in config, unsigned PFM will be generated.")
return False, key_size, None
def generate_manifest_header(manifest_id, key_size, manifest_type):
"""
Create a manifest header
:param manifest_id: Manifest id
:param key_size: Size of RSA key
:param manifest_type: Manifest type
:return Instance of a manifest header
"""
if manifest_type == manifest_types.PFM:
magic_num = PFM_MAGIC_NUM
elif manifest_type == manifest_types.CFM:
magic_num = CFM_MAGIC_NUM
elif manifest_type == manifest_types.PCD:
magic_num = PCD_MAGIC_NUM
else:
raise ValueError ("Unknown manifest type: {0}".format (manifest_type))
sig_len = 0 if key_size is None else key_size
return manifest_header(0, magic_num, int(manifest_id), sig_len, 0)
def load_xmls (config_filename, max_num_xmls, xml_type):
"""
Load XMLs listed in config file
:param config_filename: Path to config file
:param max_num_xmls: Maximum number of XMLs that can be loaded, set to NULL if no limit
:param xml_type: Type of XML
:return list of XML elements, boolean indicating whether to sign output or not, key size,
key to use for signing, output ID, output filename
"""
config = load_config (config_filename)
key_size = None
prv_key_path = None
sign = False
if "key_size" in config and config["key_size"]:
key_size = int (config["key_size"])
if "prv_key_path" in config and config["prv_key_path"]:
prv_key_path = config["prv_key_path"]
if max_num_xmls and (len (config["xml_list"]) > max_num_xmls):
raise RuntimeError ("Too many XML files provided: {0}".format (len (config["xml_list"])))
sign, key_size, key = load_key (key_size, prv_key_path)
processed_xml = {}
for xml in config["xml_list"]:
parsed_xml = manifest_parser.load_and_process_xml (xml, xml_type)
if parsed_xml is None:
raise RuntimeError ("Failed to parse XML: {0}".format(xml))
processed_xml.update({xml:parsed_xml})
return processed_xml, sign, key_size, key, config["id"], config["output"]
def write_manifest(sign, manifest, key, output_filename, manifest_length, sig_length):
"""
Write manifest generated to provided path.
:param sign: Boolean indicating whether to sign manifest or not
:param manifest: Generated manifest to write
:param key: Key to use for signing
:param output_filename: Name to use for output file
:param manifest_length: The manifest length
:param sig_length: Signature length
"""
if ctypes.sizeof(manifest) > (65535 - sig_length):
raise ValueError("Manifest is too large - {0}".format(ctypes.sizeof(manifest)))
if ctypes.sizeof(manifest) != manifest_length:
raise ValueError("Manifest doesn't match output size")
if manifest.manifest_header.length != manifest_length + sig_length:
raise ValueError("Manifest length is not set correctly")
if sign:
h = SHA256.new(manifest)
signer = PKCS1_v1_5.new(key)
signature = signer.sign(h)
signature = (ctypes.c_ubyte * sig_length).from_buffer_copy(signature)
manifest_buf = (ctypes.c_char * (manifest_length + sig_length))()
ctypes.memmove(ctypes.byref(manifest_buf, manifest_length), ctypes.addressof(signature),
sig_length)
else:
manifest_buf = (ctypes.c_char * (manifest_length))()
out_dir = os.path.dirname(os.path.abspath(output_filename))
if not os.path.exists(out_dir):
os.makedirs(out_dir)
with open(output_filename, 'wb') as fh:
ctypes.memmove(ctypes.byref(manifest_buf), ctypes.addressof(manifest), manifest_length)
fh.write(manifest_buf)

Просмотреть файл

@ -0,0 +1,412 @@
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT license.
"""
from __future__ import print_function
from __future__ import unicode_literals
import re
import binascii
import traceback
import xml.etree.ElementTree as et
import manifest_types
XML_ID_ATTRIB = "id"
XML_VERSION_ATTRIB = "version"
XML_PLATFORM_ATTRIB = "platform"
XML_LEVEL_ATTRIB = "level"
XML_FW_TAG = "Firmware"
XML_DIGEST_TAG = "Digest"
XML_FAILURE_ACTION_TAG = "FailureAction"
XML_RW_TAG = "ReadWrite"
XML_REGION_TAG = "Region"
XML_IMG_TAG = "SignedImage"
XML_START_ADDR_TAG = "StartAddr"
XML_END_ADDR_TAG = "EndAddr"
XML_PB_KEY_TAG = "PublicKey"
XML_SIG_TAG = "Signature"
XML_VALIDATE_TAG = "ValidateOnBoot"
XML_VERSION_ADDR_TAG = "VersionAddr"
XML_UNUSED_BYTE_TAG = "UnusedByte"
XML_VERSION_TAG = "Version"
XML_ROT_TAG = "RoT"
XML_PORTS_TAG = "Ports"
XML_PORT_TAG = "Port"
XML_INTERFACE_TAG = "Interface"
XML_ADDRESS_TAG = "Address"
XML_BMC_ADDRESS_TAG = "BMCAddress"
XML_EID_TAG = "EID"
XML_CPLD_TAG = "CPLD"
XML_CHANNEL_TAG = "Channel"
XML_COMPONENTS_TAG = "Components"
XML_COMPONENT_TAG = "Component"
XML_DEVICETYPE_TAG = "DeviceType"
XML_BUS_TAG = "Bus"
XML_I2CMODE_TAG = "I2CMode"
XML_PWRCTRL_TAG = "PwrCtrl"
XML_REGISTER_TAG = "Register"
XML_MASK_TAG = "Mask"
XML_MUXES_TAG = "Muxes"
XML_MUX_TAG = "Mux"
XML_POLICY_TAG = "Policy"
XML_ACTIVE_TAG = "Active"
XML_DEFAULT_FAILURE_ACTION_TAG = "DefaultFailureAction"
XML_SPIFREQ_TAG = "SPIFreq"
XML_IS_PA_ROT_TAG = "IsPARoT"
def xml_extract_attrib (root, attrib_name, string, required=True):
attrib = root.attrib.get(attrib_name)
if not attrib:
if required:
print ("Missing {0} attribute in PCD".format (attrib_name))
return None
if string:
attrib.encode("utf8")
return attrib.strip ()
def xml_find_single_tag (root, tag_name, required=True):
tag = root.findall (tag_name)
if not tag:
if required:
print ("Missing {0} tag in PCD".format (tag_name))
return None
elif len (tag) > 1:
print ("Too many {0} tags in PCD".format (tag_name))
return None
return tag[0]
def xml_extract_single_value (root, requests):
result = {}
for name, tag_name in requests.items ():
tag = root.findall (tag_name)
if not tag:
print ("Missing {0} tag in PCD".format (tag_name))
return None
elif len (tag) > 1:
print ("Too many {0} tags in PCD".format (tag_name))
return None
result.update ({name:tag[0].text.strip ()})
return result
def process_pfm(root):
xml = {}
def process_region(root, version_id):
region = {}
addr = root.findall(XML_START_ADDR_TAG)
if not addr or len(addr) > 1:
print("Invalid number of StartAddr tags in Firmware: {0}".format(version_id))
return None
region["start"] = addr[0].text.strip()
addr = root.findall(XML_END_ADDR_TAG)
if not addr or len(addr) > 1:
print("Invalid number of EndAddr tags in Firmware: {0}".format(version_id))
return None
region["end"] = addr[0].text.strip()
return region
version_id = root.attrib.get(XML_VERSION_ATTRIB)
if version_id is None:
print("No Firmware version ID provided")
return None
platform_id = root.attrib.get(XML_PLATFORM_ATTRIB)
if platform_id is None:
print("No Platform ID provided")
return None
xml["version_id"] = version_id.strip().encode("utf8")
xml["platform_id"] = platform_id.strip().encode("utf8")
version = root.findall(XML_VERSION_ADDR_TAG)
if not version or len(version) > 1:
print("Invalid number of VersionAddr tags in Firmware: {0}".format(xml["version_id"]))
return None
xml["version_addr"] = version[0].text.strip()
unused_byte = root.findall(XML_UNUSED_BYTE_TAG)
if len(unused_byte) > 1:
print("Invalid number of UnusedByte tags in Firmware: {0}".format(xml["version_id"]))
return None
if unused_byte:
xml["unused_byte"] = unused_byte[0].text.strip()
else:
xml["unused_byte"] = "0xff"
xml["rw_regions"] = []
for rw in root.findall(XML_RW_TAG):
for region in rw.findall(XML_REGION_TAG):
processed_region = process_region(region, xml["version_id"])
if processed_region is None:
return None
xml["rw_regions"].append(processed_region)
xml["signed_imgs"] = []
for img in root.findall(XML_IMG_TAG):
image = {}
image["regions"] = []
pbkey = img.findall(XML_PB_KEY_TAG)
if not pbkey or len (pbkey) > 1:
print("Invalid number of PublicKey tags in SignedImage, Firmware {0}".format (xml["version_id"]))
return None
image["pbkey"] = pbkey[0].text.strip()
sig = img.findall(XML_SIG_TAG)
if not sig or len (sig) > 1:
print("Invalid number of Signature tags in SignedImage, Firmware {0}".format (xml["version_id"]))
return None
image["signature"] = binascii.a2b_hex(re.sub("\s", "", sig[0].text.strip()))
for region in img.findall(XML_REGION_TAG):
processed_region = process_region(region, xml["version_id"])
if processed_region is None:
return None
image["regions"].append(processed_region)
if not image["regions"]:
print("No regions found for SignedImage, Firmware: {0}".format(xml["version_id"]))
return None
prop = img.findall(XML_VALIDATE_TAG)
if not prop or len (prop) > 1:
print("Invalid number of ValidateOnBoot tags in SignedImage, Firmware {0}".format (xml["version_id"]))
return None
image["validate"] = prop[0].text.strip()
xml["signed_imgs"].append(image)
if not xml["signed_imgs"]:
print("No signed images found for Firmware: {0}".format(xml["version_id"]))
return None
return xml
def process_cfm(root):
xml = {}
xml["fw_list"] = []
device_id = root.attrib.get(XML_ID_ATTRIB)
if device_id is None:
print("No Device ID provided")
return None
xml["device_id"] = device_id.strip()
for fw in root.findall(XML_FW_TAG):
firmware = {}
firmware["signed_imgs"] = []
version = fw.attrib.get(XML_VERSION_ATTRIB)
if version is None:
print("No Firmware version provided for Device: {0}".format(xml["device_id"]))
return None
firmware["version"] = version.strip()
for img in fw.findall(XML_IMG_TAG):
image = {}
digest = img.findall(XML_DIGEST_TAG)
if not digest or len(digest) > 1:
print("Invalid number of Digest tags in Device: {0}, Firmware: {1}".format(xml["device_id"], firmware["version"]))
return None
image["digest"] = binascii.a2b_hex(re.sub("\s", "", digest[0].text.strip()))
action = img.findall(XML_FAILURE_ACTION_TAG)
if not action or len(action) > 1:
print("Invalid number of FailureAction tags in Device: {0}, Firmware: {1}".format(xml["device_id"], firmware["version"]))
return None
image["failure_action"] = action[0].text.strip()
firmware["signed_imgs"].append(image)
if not firmware["signed_imgs"]:
print("No signed images found for Device: {0}, Firmware: {1}".format(xml["device_id"], firmware["version"]))
return None
xml["fw_list"].append(firmware)
if not xml["fw_list"]:
print("No firmware found for Device: {0}".format(xml["device_id"]))
return None
return xml
def process_pcd (root):
xml = {}
result = xml_extract_attrib (root, XML_PLATFORM_ATTRIB, True)
if result is None:
return None
xml.update ({"platform_id":result})
result = xml_extract_single_value (root, {"version": XML_VERSION_TAG})
if result is None:
return None
xml.update (result)
rot = xml_find_single_tag (root, XML_ROT_TAG)
if rot is None:
return None
xml["rot"] = {}
ports = xml_find_single_tag (rot, XML_PORTS_TAG, False)
if ports is not None:
xml["rot"]["ports"] = {}
for port in ports.findall (XML_PORT_TAG):
port_id = xml_extract_attrib (port, XML_ID_ATTRIB, False)
if port_id is None:
return None
result = xml_extract_single_value (port, {"spifreq": XML_SPIFREQ_TAG})
if result is None:
return None
xml["rot"]["ports"].update({port_id:result})
interface = xml_find_single_tag (rot, XML_INTERFACE_TAG)
if interface is None:
return None
result = xml_extract_single_value (rot, {"is_pa_rot": XML_IS_PA_ROT_TAG})
if result is None:
return None
xml["rot"].update (result)
result = xml_extract_single_value (interface, {"address": XML_ADDRESS_TAG,
"bmc_address": XML_BMC_ADDRESS_TAG})
if result is None:
return None
xml["rot"]["interface"] = result
cpld = xml_find_single_tag (root, XML_CPLD_TAG)
if cpld is None:
return None
result = xml_extract_single_value (cpld, {"address": XML_ADDRESS_TAG,
"channel": XML_CHANNEL_TAG})
if result is None:
return None
xml["cpld"] = result
components = xml_find_single_tag (root, XML_COMPONENTS_TAG, False)
if components is not None:
xml["components"] = []
for component in components.findall(XML_COMPONENT_TAG):
result = xml_extract_single_value (component, {"devicetype": XML_DEVICETYPE_TAG,
"bus": XML_BUS_TAG, "address": XML_ADDRESS_TAG, "i2cmode": XML_I2CMODE_TAG,
"eid": XML_EID_TAG})
if result is None:
return None
pwrctl = xml_find_single_tag (component, XML_PWRCTRL_TAG)
if pwrctl is None:
return None
pwrctl_result = xml_extract_single_value (pwrctl, {"register": XML_REGISTER_TAG,
"mask": XML_MASK_TAG})
if pwrctl_result is None:
return None
result.update ({"powerctrl": pwrctl_result})
muxes = xml_find_single_tag (component, XML_MUXES_TAG, False)
if muxes is not None:
muxes_list = {}
for mux in muxes.findall (XML_MUX_TAG):
mux_result = xml_extract_single_value (mux, {"address": XML_ADDRESS_TAG,
"channel": XML_CHANNEL_TAG})
if mux_result is None:
return None
mux_level = xml_extract_attrib (mux, XML_LEVEL_ATTRIB, False)
if mux_level is None:
return None
muxes_list.update ({mux_level: mux_result})
result.update ({"muxes": muxes_list})
xml["components"].append(result)
policy = xml_find_single_tag (root, XML_POLICY_TAG)
if policy is None:
return None
result = xml_extract_single_value (policy, {"active": XML_ACTIVE_TAG,
"defaultfailureaction": XML_DEFAULT_FAILURE_ACTION_TAG})
if result is None:
return None
xml["policy"] = result
return xml
def load_and_process_xml (xml_file, xml_type):
try:
root = et.parse(xml_file).getroot()
if xml_type is manifest_types.PFM:
return process_pfm(root)
elif xml_type is manifest_types.CFM:
return process_cfm(root)
elif xml_type is manifest_types.PCD:
return process_pcd(root)
else:
print("Unknown XML type: {0}".format(xml_type))
return None
except Exception:
print ("load_and_process_xml Exception")
traceback.print_exc ()
return None

Просмотреть файл

@ -0,0 +1,8 @@
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT license.
"""
PFM = 0
CFM = 1
PCD = 2

Просмотреть файл

@ -0,0 +1,5 @@
ID=1
C:\Cerberus\tools\manifest_tools\pcd.xml
Output=C:\Cerberus\tools\manifest_tools\pcd.bin
Key=C:\Cerberus\core\testing\keys\rsapriv.pem
KeySize=256

Просмотреть файл

@ -0,0 +1,375 @@
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT license.
"""
from __future__ import print_function
from __future__ import unicode_literals
import os
import sys
import ctypes
import binascii
import manifest_types
import manifest_common
import manifest_parser
from Crypto.PublicKey import RSA
PCD_CONFIG_FILENAME = "pcd_generator.config"
class pcd_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('header_len', ctypes.c_ushort),
('format_id', ctypes.c_ubyte),
('reserved1', ctypes.c_ubyte),
('reserved2', ctypes.c_ubyte),
('reserved3', ctypes.c_ubyte)]
class pcd_rot_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('header_len', ctypes.c_ushort),
('format_id', ctypes.c_ubyte),
('num_ports', ctypes.c_ubyte),
('addr', ctypes.c_ubyte),
('bmc_i2c_addr', ctypes.c_ubyte),
('cpld_addr', ctypes.c_ubyte),
('cpld_channel', ctypes.c_ubyte),
('active', ctypes.c_ubyte),
('default_failure_action', ctypes.c_ubyte),
('flags', ctypes.c_ubyte),
('reserved1', ctypes.c_ubyte),
('reserved2', ctypes.c_ubyte),
('reserved3', ctypes.c_ubyte)]
class pcd_port_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('header_len', ctypes.c_ushort),
('format_id', ctypes.c_ubyte),
('id', ctypes.c_ubyte),
('reserved1', ctypes.c_ubyte),
('reserved2', ctypes.c_ubyte),
('frequency', ctypes.c_ulong)]
class pcd_components_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('header_len', ctypes.c_ushort),
('format_id', ctypes.c_ubyte),
('num_components', ctypes.c_ubyte),
('reserved1', ctypes.c_ubyte),
('reserved2', ctypes.c_ubyte)]
class pcd_component_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('header_len', ctypes.c_ushort),
('format_id', ctypes.c_ubyte),
('num_muxes', ctypes.c_ubyte),
('addr', ctypes.c_ubyte),
('channel', ctypes.c_ubyte),
('flags', ctypes.c_ubyte),
('eid', ctypes.c_ubyte),
('power_ctrl_reg', ctypes.c_ubyte),
('power_ctrl_mask', ctypes.c_ubyte),
('id', ctypes.c_ulong)]
class pcd_mux_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('header_len', ctypes.c_ushort),
('format_id', ctypes.c_ubyte),
('addr', ctypes.c_ubyte),
('channel', ctypes.c_ubyte),
('mux_level', ctypes.c_ubyte),]
class pcd_platform_id_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('header_len', ctypes.c_ushort),
('format_id', ctypes.c_ubyte),
('id_len', ctypes.c_ubyte),
('reserved1', ctypes.c_ubyte),
('reserved2', ctypes.c_ubyte)]
def get_key_from_dict (dictionary, key, group, required=True):
if key not in dictionary:
if required:
raise KeyError ("Failed to generate PCD: {0} missing {1}".format (group, key))
else:
return None
else:
return dictionary[key]
def generate_ports_buf (xml_ports):
"""
Create a buffer of pcd_port struct instances from parsed XML list
:param xml_ports: List of parsed XML of ports to be included in PCD
:return Ports buffer, number of ports
"""
if xml_ports is None or len (xml_ports) < 1:
return None, 0
ports_buf = (ctypes.c_ubyte * (ctypes.sizeof (pcd_port_header) * len (xml_ports))) ()
ports_len = 0
for id, port in xml_ports.items ():
freq = int (get_key_from_dict (port, "spifreq", "Port"))
port_body = pcd_port_header (ctypes.sizeof (pcd_port_header),
ctypes.sizeof (pcd_port_header), 0, int (id), 0, 0, freq)
ctypes.memmove (ctypes.addressof (ports_buf) + ports_len, ctypes.addressof (port_body),
ctypes.sizeof (pcd_port_header))
ports_len += ctypes.sizeof (pcd_port_header)
return ports_buf, len (xml_ports)
def generate_rot_header (xml_rot, xml_rot_interface, xml_cpld, xml_policy, num_ports):
"""
Create a PCD RoT header
:param xml_rot: List of parsed XML of RoT to be included in PCD
:param xml_rot_interface: List of parsed XML of RoT interface to be included in PCD
:param xml_cpld: List of parsed XML of CPLD to be included in PCD
:param xml_policy: List of parsed XML of policy to be included in PCD
:param num_ports: Number of RoT ports
:return PCD RoT header instance
"""
is_pa_rot = bool (get_key_from_dict (xml_rot, "is_pa_rot", "PA RoT Flag"))
addr = int (get_key_from_dict (xml_rot_interface, "address", "RoT interface"), base=16)
bmc_i2c_addr = int (get_key_from_dict (xml_rot_interface, "bmc_address", "RoT interface"),
base=16)
cpld_addr = int (get_key_from_dict (xml_cpld, "address", "CPLD"), base=16)
cpld_channel = int (get_key_from_dict (xml_cpld, "channel", "CPLD"))
active = bool (get_key_from_dict (xml_policy, "active", "Policy"))
failure_action = int (get_key_from_dict (xml_policy, "defaultfailureaction", "Policy"))
ports_buf_len = ctypes.sizeof (pcd_port_header) * num_ports
flags = 0
flags |= (is_pa_rot << 0)
return pcd_rot_header (ports_buf_len + ctypes.sizeof (pcd_rot_header),
ctypes.sizeof (pcd_rot_header), 0, num_ports, addr, bmc_i2c_addr, cpld_addr, cpld_channel,
active, failure_action, flags)
def generate_muxes_buf (xml_muxes):
"""
Create a buffer of pcd_mux struct instances from parsed XML list
:param xml_muxes: List of parsed XML of muxes to be included in PCD
:return Muxes buffer, number of muxes
"""
if xml_muxes is None or len (xml_muxes) < 1:
return None, 0
muxes_buf = (ctypes.c_ubyte * (ctypes.sizeof (pcd_mux_header) * len (xml_muxes))) ()
muxes_len = 0
for level, mux in xml_muxes.items ():
addr = int (get_key_from_dict (mux, "address", "Mux"), base=16)
channel = int (get_key_from_dict (mux, "channel", "Mux"))
mux_body = pcd_mux_header (ctypes.sizeof (pcd_mux_header), ctypes.sizeof (pcd_mux_header),
0, addr, channel, int (level))
ctypes.memmove (ctypes.addressof (muxes_buf) + muxes_len, ctypes.addressof (mux_body),
ctypes.sizeof (pcd_mux_header))
muxes_len += ctypes.sizeof (pcd_mux_header)
return muxes_buf, len (xml_muxes)
def generate_components_buf (xml_components):
"""
Create a buffer of component section struct instances from parsed XML list
:param xml_components: List of parsed XML of components to be included in PCD
:return Components buffer, number of components
"""
if xml_components is None or len (xml_components) < 1:
return None, 0, 0
components_list = []
components_len = 0
for component in xml_components:
device_type = int (get_key_from_dict (component, "devicetype", "Component"), base=16)
bus = int (get_key_from_dict (component, "bus", "Component"))
address = int (get_key_from_dict (component, "address", "Component"), base=16)
i2c_mode = int (get_key_from_dict (component, "i2cmode", "Component"), base=10)
eid = int (get_key_from_dict (component, "eid", "Component"), base=16)
powerctrl = get_key_from_dict (component, "powerctrl", "Component")
powerctrl_reg = int (get_key_from_dict (powerctrl, "register", "Component PowerCtrl"),
base=16)
powerctrl_mask = int (get_key_from_dict (powerctrl, "mask", "Component PowerCtrl"), base=16)
flags = 0
flags |= (i2c_mode << 0)
muxes = get_key_from_dict (component, "muxes", "Component", required=False)
muxes_buf, num_muxes = generate_muxes_buf (muxes)
if muxes_buf is None:
muxes_buf = (ctypes.c_ubyte * 0)()
num_muxes = 0
class pcd_component (ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('component_header', pcd_component_header),
('muxes', ctypes.c_ubyte * ctypes.sizeof (muxes_buf))]
component_header = pcd_component_header (ctypes.sizeof (pcd_component),
ctypes.sizeof (pcd_component_header), 0, num_muxes, address, bus, flags, eid,
powerctrl_reg, powerctrl_mask, device_type)
component_section = pcd_component (component_header, muxes_buf)
components_list.append (component_section)
components_len += ctypes.sizeof (component_section)
components_buf = (ctypes.c_ubyte * components_len) ()
offset = 0
for component in components_list:
ctypes.memmove (ctypes.addressof (components_buf) + offset, ctypes.addressof (component),
ctypes.sizeof (component))
offset += ctypes.sizeof (component)
return components_buf, len (xml_components)
def generate_components_header (components_len, num_components):
"""
Create a PCD components header
:param components_len: Length of components buffer
:param num_components: Number of components in PCD
:return PCD components header instance
"""
return pcd_components_header (components_len + ctypes.sizeof (pcd_components_header),
ctypes.sizeof (pcd_components_header), 0, num_components, 0, 0)
def generate_platform_id_header (id_len):
"""
Create a PCD platform ID header
:param id_len: Length of platform ID buffer
:return PCD platform ID header instance
"""
return pcd_platform_id_header (id_len + ctypes.sizeof (pcd_platform_id_header),
ctypes.sizeof (pcd_platform_id_header), 0, id_len, 0, 0)
def generate_pcd_header (pcd_length):
"""
Create a PCD header
:param pcd_length: Length of PCD
:return PCD header instance
"""
return pcd_header (pcd_length + ctypes.sizeof (pcd_header), ctypes.sizeof (pcd_header),
0, 0, 0, 0)
def generate_pcd (manifest_header, header, rot_header, ports, components_header,
components, platform_id_header, platform_id):
"""
Create a PCD object from all the different PCD sections
:param manifest_header: Instance of a manifest header
:param header: Instance of a PCD header
:param rot_header: Instance of a PCD RoT header
:param ports: Ports section buffer
:param components_header: Instance of a PCD components header
:param components: Components section buffer
:param platform_id_header: Instance of a PCD platform ID header
:param platform_id: PCD platform ID
:return Instance of a PCD object
"""
ports_len = ctypes.sizeof (pcd_port_header) * rot_header.num_ports
components_len = components_header.length - components_header.header_len
components_buf = (ctypes.c_ubyte * components_len) ()
ctypes.memmove (ctypes.addressof (components_buf), ctypes.addressof (components),
components_len)
class pcd (ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('manifest_header', manifest_common.manifest_header),
('header', pcd_header),
('rot_header', pcd_rot_header),
('ports', ctypes.c_ubyte * ports_len),
('components_header', pcd_components_header),
('components', ctypes.c_ubyte * components_len),
('platform_id_header', pcd_platform_id_header),
('platform_id', ctypes.c_char * len (platform_id))]
return pcd (manifest_header, header, rot_header, ports, components_header, components_buf,
platform_id_header, platform_id.encode('utf-8'))
#*************************************** Start of Script ***************************************
if len (sys.argv) < 2:
path = os.path.join (os.path.dirname (os.path.abspath (__file__)), PCD_CONFIG_FILENAME)
else:
path = os.path.abspath (sys.argv[1])
processed_xml, sign, key_size, key, id, output = manifest_common.load_xmls (path, 1,
manifest_types.PCD)
processed_xml = list(processed_xml.items())[0][1]
ports = (ctypes.c_ubyte * 0)()
num_ports = 0
if "ports" in processed_xml["rot"]:
ports, num_ports = generate_ports_buf (processed_xml["rot"]["ports"])
components = (ctypes.c_ubyte * 0)()
num_components = 0
if "components" in processed_xml:
components, num_components = generate_components_buf (
processed_xml["components"])
components_header = generate_components_header (ctypes.sizeof (components), num_components)
rot_header = generate_rot_header (processed_xml["rot"], processed_xml["rot"]["interface"],
processed_xml["cpld"], processed_xml["policy"], num_ports)
platform_id_header = generate_platform_id_header (len (processed_xml["platform_id"]))
header = generate_pcd_header (rot_header.length + components_header.length + \
platform_id_header.length)
manifest_header = manifest_common.generate_manifest_header (id, key_size,
manifest_types.PCD)
manifest_header.length = ctypes.sizeof (manifest_header) + header.length + \
manifest_header.sig_length
pcd = generate_pcd (manifest_header, header, rot_header, ports, components_header,
components, platform_id_header, processed_xml["platform_id"])
manifest_common.write_manifest (sign, pcd, key, output, manifest_header.length - \
manifest_header.sig_length, manifest_header.sig_length)
print ("Completed PCD generation: {0}".format(output))

Просмотреть файл

@ -0,0 +1,5 @@
ID=1
C:\Cerberus\tools\manifest_tools\pfm_manifest.xml
Output=C:\Cerberus\tools\manifest_tools\pfm.bin
Key=C:\Cerberus\core\testing\keys\rsapriv.pem
KeySize=256

Просмотреть файл

@ -0,0 +1,515 @@
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT license.
"""
from __future__ import print_function
from __future__ import unicode_literals
import os
import sys
import ctypes
import binascii
import argparse
import manifest_types
import manifest_common
from Crypto.PublicKey import RSA
PFM_CONFIG_FILENAME = "pfm_generator.config"
VALIDATE_ON_BOOT_FLAG = 1
# Table which indexes public keys used in this PFM file
pbkey_table = []
# List of all FW version strings
version_list = []
# These ctype structs resemble the format the PFM consumer utilizes
class pfm_fw_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('version_length', ctypes.c_ubyte),
('blank_byte', ctypes.c_ubyte),
('version_addr', ctypes.c_uint),
('img_count', ctypes.c_ubyte),
('rw_count', ctypes.c_ubyte),
('reserved', ctypes.c_ushort)]
class pfm_allowable_fw_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('fw_count', ctypes.c_ubyte),
('reserved', ctypes.c_ubyte)]
class pfm_public_key_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('key_length', ctypes.c_ushort),
('key_exponent', ctypes.c_uint),
('id', ctypes.c_ubyte),
('reserved', ctypes.c_ubyte * 3)]
class pfm_key_manifest_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('key_count', ctypes.c_ubyte),
('reserved', ctypes.c_ubyte)]
class pfm_flash_region(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('start_addr', ctypes.c_uint),
('end_addr', ctypes.c_uint)]
class pfm_image_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('flags', ctypes.c_ushort),
('key_id', ctypes.c_ubyte),
('region_count', ctypes.c_ubyte),
('sig_length', ctypes.c_ushort)]
class pfm_platform_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('length', ctypes.c_ushort),
('id_length', ctypes.c_ubyte),
('reserved', ctypes.c_ubyte)]
def process_pbkey(xml_list):
"""
Iterate through all public keys used in this PFM and create a public key table.
:param xml_list: List of parsed XMLs for different FW to be included in PFM
"""
for _, xml in xml_list.items():
if "signed_imgs" in xml:
for img in xml["signed_imgs"]:
if "pbkey" in img:
pbkey = RSA.importKey(img["pbkey"])
img.pop("pbkey")
if pbkey not in pbkey_table:
pbkey_table.append(pbkey)
img["pbkey_index"] = len(pbkey_table) - 1
else:
img["pbkey_index"] = pbkey_table.index(pbkey)
def generate_pfm(pfm_header_instance, allowable_fw_header_instance, allowable_fw_list,
keys_header_instance, keys_list, platform_header_instance):
"""
Create a PFM object from all the different PFM components
:param pfm_header_instance: Instance of a PFM header
:param allowable_fw_header_instance: Instance of a PFM allowable FW header
:param allowable_fw_list: List of allowable FWs to be included in PFM
:param keys_header_instance: Instance of a PFM key manifest header
:param keys_list: List of public keys to be included in PFM
:param platform_header_instance: Instance of a PFM platform header
:return Instance of a PFM object
"""
fw_size = allowable_fw_header_instance.length - ctypes.sizeof(allowable_fw_header_instance)
keys_size = keys_header_instance.length - ctypes.sizeof(keys_header_instance)
platform_size = ctypes.sizeof(platform_header_instance)
class pfm(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('manifest_header', manifest_common.manifest_header),
('allowable_fw_header', pfm_allowable_fw_header),
('allowable_fw', ctypes.c_ubyte * fw_size),
('key_manifest_header', pfm_key_manifest_header),
('pb_keys', ctypes.c_ubyte * keys_size),
('platform', ctypes.c_ubyte * platform_size)]
offset = 0
fw_buf = (ctypes.c_ubyte * fw_size)()
for fw in allowable_fw_list:
ctypes.memmove(ctypes.addressof(fw_buf) + offset, ctypes.addressof(fw), fw.header.length)
offset += fw.header.length
offset = 0
keys_buf = (ctypes.c_ubyte * keys_size)()
for key in keys_list:
ctypes.memmove(ctypes.addressof(keys_buf) + offset, ctypes.addressof(key),
key.header.length)
offset += key.header.length
platform_buf = (ctypes.c_ubyte * platform_size)()
ctypes.memmove (ctypes.addressof(platform_buf), ctypes.addressof(platform_header_instance),
platform_size)
return pfm(pfm_header_instance, allowable_fw_header_instance, fw_buf, keys_header_instance,
keys_buf, platform_buf)
def generate_flash_region(filename, region_list):
"""
Create a list of flash region struct instances from region list
:param region_list: List of flash regions
:return List of flash region struct instances
"""
flash_list = []
for region in region_list:
if "start" in region and "end" in region:
start_addr = int(region["start"], 16)
end_addr = int(region["end"], 16)
if end_addr <= start_addr:
raise ValueError("Failed to generate PFM: Image has an invalid flash region - {0}"
.format(filename))
flash_list.append(pfm_flash_region(start_addr, end_addr))
return flash_list
def generate_img_instance(filename, img, regions, signature):
"""
Create a list of signed image instances
:param region_list: List of flash regions
:return List of signed image instances
"""
if "validate" not in img:
raise KeyError("Failed to generate PFM: Image has no validate flag - {0}".format(filename))
if "pbkey_index" not in img:
raise KeyError("Failed to generate PFM: Image has no public key index - {0}".format(
filename))
flags = 1 if img["validate"] == "true" else 0
header = pfm_image_header(0, flags, img["pbkey_index"], len(regions), len(signature))
sig_arr = (ctypes.c_ubyte * len(signature)).from_buffer_copy(signature)
region_arr = (pfm_flash_region * len(regions))(*regions)
class pfm_signed_img(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('header', pfm_image_header),
('img_signature', ctypes.c_ubyte * len(signature)),
('flash_regions', pfm_flash_region * len(regions))]
return pfm_signed_img(header, sig_arr, region_arr)
def generate_signed_image(filename, img_list):
"""
Create a list of signed image struct instances from image list
:param img_list: List of allowable firmware images
:return List of signed image struct instances
"""
signed_list = []
for img in img_list:
if "regions" not in img:
raise KeyError("Failed to generate PFM: Image has no regions list - {0}".format(
filename))
if "signature" not in img:
raise KeyError("Failed to generate PFM: Image has no signature - {0}".format(filename))
regions = generate_flash_region(filename, img["regions"])
img_instance = generate_img_instance(filename, img, regions, img["signature"])
img_instance.header.length = ctypes.sizeof(img_instance)
signed_list.append(img_instance)
return signed_list
def generate_allowable_fw_list(xml_list):
"""
Create a list of allowable firmware from parsed XML list
:param xml_list: List of parsed XML of firmware to be included in PFM
:return list of allowable firmware struct instances
"""
fw_list = []
for filename, xml in xml_list.items():
if "version_id" not in xml:
raise KeyError("Failed to generate PFM: XML has no version id - {0}".format(filename))
if "version_addr" not in xml:
raise KeyError("Failed to generate PFM: XML has no version address - {0}".format(
filename))
if "signed_imgs" not in xml:
raise KeyError("Failed to generate PFM: XML has no signed images list - {0}".format(
filename))
if "unused_byte" not in xml:
raise KeyError("Failed to generate PFM: Unused byte value not known - {0}".format(
filename))
if "rw_regions" not in xml:
xml["rw_regions"]= []
all_regions = []
flags = 0
version_addr = int(xml["version_addr"], 16)
unused_byte = int(xml["unused_byte"], 16)
version_addr_valid = False
for version in version_list:
if version == xml["version_id"]:
raise KeyError("Failed to generate PFM: Duplicate version ID - {0}".format (
xml["version_id"]))
elif version.startswith(xml["version_id"]) or xml["version_id"].startswith(version):
raise ValueError("Failed to generate PFM: Ambiguous version ID - {0}, {1}".format (
xml["version_id"], version))
version_list.append (xml["version_id"])
if unused_byte > 255:
raise ValueError("Unused byte value ({0}) is not valid - {1}".format(
format(unused_byte, '02x'), filename))
header = pfm_fw_header(0, len(xml["version_id"]), unused_byte, version_addr,
len(xml["signed_imgs"]), len(xml["rw_regions"]), 0)
rw_regions_list = generate_flash_region(filename, xml["rw_regions"])
rw_regions_arr = (pfm_flash_region * len(rw_regions_list))(*rw_regions_list)
signed_imgs_list = generate_signed_image(filename, xml["signed_imgs"])
for region in rw_regions_list:
if region.start_addr & 0xFFFF:
raise ValueError("Failed to generate PFM: RW Start address (0x{0}) is not 64kB aligned - {1}"
.format(format(region.start_addr, '08x'), filename))
if (region.end_addr & 0xFFFF) != 0xFFFF:
raise ValueError("Failed to generate PFM: RW End address (0x{0}) is not 64kB aligned - {1}"
.format(format(region.end_addr, '08x'), filename))
all_regions.append([region.start_addr, region.end_addr])
for img in signed_imgs_list:
flags |= (img.header.flags & VALIDATE_ON_BOOT_FLAG)
for region in img.flash_regions:
all_regions.append([region.start_addr, region.end_addr])
if (img.header.flags & VALIDATE_ON_BOOT_FLAG) == VALIDATE_ON_BOOT_FLAG:
if ((version_addr + header.version_length - 1) <= region.end_addr and
version_addr >= region.start_addr):
version_addr_valid = True
if not version_addr_valid:
raise ValueError("Failed to generate PFM: Version address not in a signed image with validate on boot flag set - {0}"
.format(filename))
if flags == 0:
raise ValueError("Failed to generate PFM: XML has no signed images with validate on boot flag set - {0}"
.format(filename))
all_regions.sort()
for i_region, region in enumerate(all_regions):
for i_comp in range(i_region + 1, len(all_regions)):
if region[1] >= all_regions[i_comp][0]:
raise ValueError("Failed to generate PFM: XML has overlapping regions - {0}"
.format(filename))
signed_imgs_size = 0
for img in signed_imgs_list:
signed_imgs_size = signed_imgs_size + ctypes.sizeof(img)
offset = 0
signed_imgs_buf = (ctypes.c_ubyte * signed_imgs_size)()
for img in signed_imgs_list:
ctypes.memmove(ctypes.addressof(signed_imgs_buf) + offset, ctypes.addressof(img),
img.header.length)
offset += img.header.length
num_alignment = len(xml["version_id"]) % 4
num_alignment = 0 if (num_alignment == 0) else (4 - num_alignment)
alignment_buf = (ctypes.c_ubyte * num_alignment)()
ctypes.memset(alignment_buf, 0, num_alignment)
class pfm_allowable_fw(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('header', pfm_fw_header),
('version_id', ctypes.c_char * len(xml["version_id"])),
('alignment', ctypes.c_ubyte * num_alignment),
('rw_regions', pfm_flash_region * len(rw_regions_list)),
('signed_imgs', ctypes.c_ubyte * signed_imgs_size)]
fw = pfm_allowable_fw(header, xml["version_id"], alignment_buf, rw_regions_arr,
signed_imgs_buf)
fw.header.length = ctypes.sizeof(pfm_allowable_fw)
fw_list.append(fw)
return fw_list
def generate_allowable_fw_header(fw_list):
"""
Create an allowable FW header from an allowable FW list
:param fw_list: List of allowable FW to be included in PFM
:return Allowable FW header instance
"""
size = ctypes.sizeof(pfm_allowable_fw_header)
for fw in fw_list:
size = size + ctypes.sizeof(fw)
return pfm_allowable_fw_header(size, len(fw_list), 0)
def generate_key_instance(header, modulus):
"""
Create a key instance from header and modulus
:param header: PFM public key header
:param modulus: List of public key modulus digits
:return Public key instance
"""
arr = (ctypes.c_ubyte * len(modulus)).from_buffer_copy(modulus)
class pfm_public_key(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('header', pfm_public_key_header),
('public_key_modulus', ctypes.c_ubyte * len(modulus))]
return pfm_public_key(header, arr)
def generate_pbkey_list():
"""
Create a public key list from the public key table
:return List of public key instances
"""
keys_list = []
for pub_key in pbkey_table:
reserved_buf = (ctypes.c_ubyte * 3)()
ctypes.memset(reserved_buf, 0, 3)
mod_fmt = "%%0%dx" % (pub_key.n.bit_length() // 4)
modulus = binascii.a2b_hex(mod_fmt % pub_key.n)
header = pfm_public_key_header(0, len(modulus), pub_key.e, pbkey_table.index(pub_key),
reserved_buf)
key = generate_key_instance(header, modulus)
key.header.length = ctypes.sizeof(key)
keys_list.append(key)
return keys_list
def generate_pbkey_header(keys_list):
"""
Create a public key manifest header from list of public key objects
:param keys_list: List of public key objects
:return Public key manifest header
"""
size = ctypes.sizeof(pfm_key_manifest_header)
for key in keys_list:
size = size + ctypes.sizeof(key)
return pfm_key_manifest_header(size, len(keys_list), 0)
def get_platform_id(xml_list):
"""
Determine the platform ID for the manifest
:param xml_list: List of parse XML files with version information.
:return The platform ID
"""
platform_id = None
for filename, xml in xml_list.items():
if "platform_id" not in xml:
raise KeyError("Failed to generate PFM: XML has no platform id - {0}".format(filename))
if platform_id:
if platform_id != xml["platform_id"]:
raise ValueError("Failed to generate PFM: Version platform ids don't match - ({0}, {1})"
.format(platform_id, xml["platform_id"]))
else:
platform_id = xml["platform_id"]
return platform_id
def generate_platform_info(platform_id):
"""
Create the platform information section of the PFM.
:param platform_id: ID for the platform
:return Platform manifest section
"""
header = pfm_platform_header(0, len(platform_id), 0)
num_alignment = len(platform_id) % 4
num_alignment = 0 if (num_alignment == 0) else (4 - num_alignment)
alignment_buf = (ctypes.c_ubyte * num_alignment)()
ctypes.memset(alignment_buf, 0, num_alignment)
class pfm_platform_id(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('header', pfm_platform_header),
('platform_id', ctypes.c_char * len(platform_id)),
('alignment', ctypes.c_ubyte * num_alignment)]
platform = pfm_platform_id(header, platform_id, alignment_buf)
platform.header.length = ctypes.sizeof(pfm_platform_id)
return platform
#*************************************** Start of Script ***************************************
default_config = os.path.join(os.path.dirname(os.path.abspath(__file__)), PFM_CONFIG_FILENAME)
parser = argparse.ArgumentParser(description = 'Create a PFM')
parser.add_argument('config', nargs = '?', default = default_config,
help = 'Path to configurtaion file')
parser.add_argument('--bypass', action = 'store_true', help = 'Create a bypass mode PFM')
args = parser.parse_args()
processed_xml, sign, key_size, key, pfm_id, output = manifest_common.load_xmls (args.config, None,
manifest_types.PFM)
process_pbkey(processed_xml)
if (args.bypass):
allowable_fw_list = []
else:
allowable_fw_list = generate_allowable_fw_list(processed_xml)
allowable_fw_header = generate_allowable_fw_header(allowable_fw_list)
if (args.bypass):
keys_list = []
else:
keys_list = generate_pbkey_list()
keys_header = generate_pbkey_header(keys_list)
platform_id = get_platform_id(processed_xml)
platform_header = generate_platform_info(platform_id)
pfm_header_instance = manifest_common.generate_manifest_header(pfm_id, key_size, manifest_types.PFM)
pfm_header_instance.length = ctypes.sizeof(pfm_header_instance) + keys_header.length + \
allowable_fw_header.length + pfm_header_instance.sig_length + platform_header.header.length
pfm = generate_pfm(pfm_header_instance, allowable_fw_header, allowable_fw_list, keys_header,
keys_list, platform_header)
manifest_common.write_manifest(sign, pfm, key, output,
pfm_header_instance.length - pfm_header_instance.sig_length,
pfm_header_instance.sig_length)
print("Completed PFM generation: {0}".format(output))

Просмотреть файл

@ -0,0 +1,592 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "..\..\core\manifest\manifest_format.h"
#include "..\..\core\manifest\pfm\pfm_format.h"
#include "..\..\core\manifest\cfm\cfm_format.h"
#include "..\..\core\manifest\pcd\pcd_format.h"
#include "..\..\core\manifest\pcd\pcd.h"
enum {
MANIFEST_TYPE_PFM = 0,
MANIFEST_TYPE_CFM,
MANIFEST_TYPE_PCD
};
size_t visualize_pfm (uint8_t *pfm)
{
struct pfm_allowable_firmware_header *allowable_fw_header =
(struct pfm_allowable_firmware_header*) pfm;
uint8_t* pointer = ((uint8_t*)allowable_fw_header) +
sizeof(struct pfm_allowable_firmware_header);
printf ("pfm_allowable_firmware_header\n");
printf ("{");
printf ("\tlength: %i\n", allowable_fw_header->length);
printf ("\tfw_count: %i\n", allowable_fw_header->fw_count);
printf ("\treserved: %i\n", allowable_fw_header->reserved);
printf ("}\n");
printf ("Allowable Firmware\n");
printf ("[\n");
for (int i = 0; i < allowable_fw_header->fw_count; ++i)
{
struct pfm_firmware_header *fw_header = (struct pfm_firmware_header*)pointer;
char *version_id = (char*)malloc (fw_header->version_length + 1);
if(version_id == NULL)
{
printf ("Failed to allocate version id buffer.\n");
return -1;
}
strncpy(version_id, (char*)((uint8_t*)fw_header + sizeof(struct pfm_firmware_header)),
fw_header->version_length);
version_id[fw_header->version_length] = '\0';
int alignment = fw_header->version_length % 4;
alignment = (alignment == 0) ? 0 : (4 - alignment);
pointer = (uint8_t*)fw_header + sizeof(struct pfm_firmware_header) +
fw_header->version_length + alignment;
printf ("\t{\n");
printf ("\t\tpfm_firmware_header\n");
printf ("\t\t{\n");
printf ("\t\t\tlength: %i\n", fw_header->length);
printf ("\t\t\tversion_length: %i\n", fw_header->version_length);
printf ("\t\t\tblank_byte: 0x%02x\n", fw_header->blank_byte);
printf ("\t\t\tversion_addr: 0x%x\n", fw_header->version_addr);
printf ("\t\t\timg_count: %i\n", fw_header->img_count);
printf ("\t\t\trw_count: %i\n", fw_header->rw_count);
printf ("\t\t\treserved: %i\n", fw_header->reserved);
printf ("\t\t}\n");
printf ("\t\tversion_id: %s\n", version_id);
free (version_id);
printf ("\t\tRW Regions\n");
printf ("\t\t[\n");
for (int j = 0; j < fw_header->rw_count; ++j)
{
uint32_t start_addr = *((uint32_t*)pointer);
uint32_t end_addr = *((uint32_t*)pointer + 1);
pointer = (uint8_t*)((uint32_t*)pointer + 2);
printf ("\t\t\t{\n");
printf ("\t\t\t\tpfm_flash_region\n");
printf ("\t\t\t\t{\n");
printf ("\t\t\t\t\tstart_addr: 0x%x\n", start_addr);
printf ("\t\t\t\t\tend_addr: 0x%x\n", end_addr);
printf ("\t\t\t\t}\n");
printf ("\t\t\t}\n");
}
printf ("\t\t]\n");
for (int j = 0; j < fw_header->img_count; ++j)
{
struct pfm_image_header *img_header = (struct pfm_image_header*) pointer;
uint8_t *sig = malloc (img_header->sig_length);
if(sig == NULL)
{
printf ("Failed to allocate signature buffer.\n");
return -1;
}
memcpy(sig, img_header + 1, img_header->sig_length);
pointer = (uint8_t*)img_header + sizeof(struct pfm_image_header) +
img_header->sig_length;
printf ("\t\tpfm_image_header\n");
printf ("\t\t{\n");
printf ("\t\t\tlength: %i\n", img_header->length);
printf ("\t\t\tflags: %i\n", img_header->flags);
printf ("\t\t\tkey_id: %i\n", img_header->key_id);
printf ("\t\t\tregion_count: %i\n", img_header->region_count);
printf ("\t\t\tsig_length: %i\n", img_header->sig_length);
printf ("\t\t}\n");
printf ("\t\tSignature:");
for (int k = 0; k < img_header->sig_length; ++k)
{
if ((k % 32) == 0)
{
printf ("\n\t\t\t");
}
printf ("%02x", sig[k]);
}
printf ("\n");
free (sig);
printf ("\t\tRO Regions\n");
printf ("\t\t[\n");
for (int k = 0; k < img_header->region_count; ++k)
{
printf ("\t\t\t{\n");
uint32_t start_addr = *((uint32_t*)pointer);
uint32_t end_addr = *((uint32_t*)pointer + 1);
pointer = (uint8_t*)((uint32_t*)pointer + 2);
printf ("\t\t\t\tpfm_flash_region\n");
printf ("\t\t\t\t{\n");
printf ("\t\t\t\t\tstart_addr: 0x%x\n", start_addr);
printf ("\t\t\t\t\tend_addr: 0x%x\n", end_addr);
printf ("\t\t\t\t}\n");
printf ("\t\t\t}\n");
}
printf ("\t\t]\n");
}
printf ("\t}\n");
}
printf ("]\n");
struct pfm_key_manifest_header *key_manifest_header = (struct pfm_key_manifest_header *)pointer;
pointer = (uint8_t*)pointer + sizeof(struct pfm_key_manifest_header);
printf ("pfm_key_manifest_header\n");
printf ("{\n");
printf ("\tlength: %i\n", key_manifest_header->length);
printf ("\tkey_count: %i\n", key_manifest_header->key_count);
printf ("\treserved: %i\n", key_manifest_header->reserved);
printf ("}\n");
printf ("Keys\n");
printf ("[\n");
for (int i = 0; i < key_manifest_header->key_count; ++i)
{
struct pfm_public_key_header *key_header = (struct pfm_public_key_header*)pointer;
printf ("\t{\n");
printf ("\t\tpfm_key_manifest_header\n");
printf ("\t\tlength: %i\n", key_header->length);
printf ("\t\tkey_length: %i\n", key_header->key_length);
printf ("\t\tkey_exponent: 0x%x\n", key_header->key_exponent);
printf ("\t\tid: %i\n", key_header->id);
printf ("\t\treserved[0]: %i\n", key_header->reserved[0]);
printf ("\t\treserved[1]: %i\n", key_header->reserved[1]);
printf ("\t\treserved[2]: %i\n", key_header->reserved[2]);
uint8_t *modulus = (uint8_t*)malloc (key_header->key_length);
if(modulus == NULL)
{
printf ("Failed to allocate modulus buffer.\n");
return -1;
}
memcpy(modulus, key_header + 1, key_header->key_length);
pointer = (uint8_t*)key_header + sizeof(struct pfm_public_key_header) +
key_header->key_length;
printf ("\t\tmodulus:");
for (int j = 0; j < key_header->key_length; ++j)
{
if ((j % 32) == 0)
{
printf ("\n\t\t\t");
}
printf ("%02x", modulus[j]);
}
printf ("\n");
free (modulus);
printf ("\n");
printf ("\t}\n");
}
printf ("]\n");
struct pfm_platform_header *platform_header = (struct pfm_platform_header *)pointer;
pointer = (uint8_t*)pointer + sizeof(struct pfm_platform_header);
printf ("pfm_platform_header\n");
printf ("{\n");
printf ("\tlength: %i\n", platform_header->length);
printf ("\tid_length: %i\n", platform_header->id_length);
printf ("\treserved: %i\n", platform_header->reserved);
printf ("}\n");
char *platform_id = malloc (platform_header->id_length + 1);
if(platform_id == NULL)
{
printf ("Failed to allocate platform id buffer.\n");
return -1;
}
memcpy(platform_id, pointer, platform_header->id_length);
platform_id[platform_header->id_length] = '\0';
int alignment = platform_header->id_length % 4;
alignment = (alignment == 0) ? 0 : (4 - alignment);
pointer += platform_header->id_length + alignment;
printf ("platform_id: %s\n", platform_id);
free (platform_id);
return (pointer - pfm);
}
size_t visualize_cfm (uint8_t *cfm)
{
struct cfm_components_header *components_header = (struct cfm_components_header*)cfm;
uint8_t* pointer = ((uint8_t*)components_header) + sizeof(struct cfm_components_header);
printf ("cfm_components_header\n");
printf ("{");
printf ("\tlength: %i\n", components_header->length);
printf ("\tcomponents_count: %i\n", components_header->components_count);
printf ("\treserved: %i\n", components_header->reserved);
printf ("}\n");
printf ("Components\n");
printf ("[\n");
for (int i = 0; i < components_header->components_count; ++i)
{
struct cfm_component_header *component_header = (struct cfm_component_header*)pointer;
printf ("\t{\n");
printf ("\t\tcfm_component_header\n");
printf ("\t\t{\n");
printf ("\t\t\tlength: %i\n", component_header->length);
printf ("\t\t\tfw_count: %i\n", component_header->fw_count);
printf ("\t\t\treserved: %i\n", component_header->reserved);
printf ("\t\t\tcomponent_id: 0x%x\n", component_header->component_id);
printf ("\t\t}\n");
pointer += sizeof(struct cfm_component_header);
printf ("\t\tFirmware\n");
printf ("\t\t[\n");
for (int j = 0; j < component_header->fw_count; ++j)
{
struct cfm_fw_header *fw_header = (struct cfm_fw_header*) pointer;
char *version_id = (char*)malloc (fw_header->version_length + 1);
if(version_id == NULL)
{
printf ("Failed to allocate version ID buffer.\n");
return -1;
}
strncpy (version_id, (char*)((uint8_t*)fw_header + sizeof(struct cfm_fw_header)),
fw_header->version_length);
version_id[fw_header->version_length] = '\0';
int alignment = fw_header->version_length % 4;
alignment = (alignment == 0) ? 0 : (4 - alignment);
pointer += sizeof(struct cfm_fw_header) + fw_header->version_length + alignment;
printf ("\t\t\t{\n");
printf ("\t\t\t\tcfm_fw_header\n");
printf ("\t\t\t\t{\n");
printf ("\t\t\t\t\tlength: %i\n", fw_header->length);
printf ("\t\t\t\t\timg_count: %i\n", fw_header->img_count);
printf ("\t\t\t\t\treserved: %i\n", fw_header->reserved);
printf ("\t\t\t\t\tversion_length: %i\n", fw_header->version_length);
printf ("\t\t\t\t\treserved2: %i\n", fw_header->reserved2);
printf ("\t\t\t\t}\n");
printf ("\t\t\t\tversion_id: %s\n", version_id);
printf ("\t\t\t\tSigned Images\n");
printf ("\t\t\t\t[\n");
for (int k = 0; k < fw_header->img_count; ++k)
{
struct cfm_img_header *img_header = (struct cfm_img_header*) pointer;
uint8_t *digest = malloc (img_header->digest_length);
if(digest == NULL)
{
printf ("Failed to allocate digest buffer.\n");
return -1;
}
memcpy(digest, (char*)((uint8_t*)img_header + sizeof(struct cfm_img_header)),
img_header->digest_length);
pointer += sizeof(struct cfm_img_header) + img_header->digest_length;
printf ("\t\t\t\t\t{\n");
printf ("\t\t\t\t\t\tcfm_img_header\n");
printf ("\t\t\t\t\t\t{\n");
printf ("\t\t\t\t\t\t\tlength: %i\n", img_header->length);
printf ("\t\t\t\t\t\t\tdigest_length: %i\n", img_header->digest_length);
printf ("\t\t\t\t\t\t\tflags: %i\n", img_header->flags);
printf ("\t\t\t\t\t\t\treserved: %i\n", img_header->reserved);
printf ("\t\t\t\t\t\t}\n");
printf ("\t\t\t\t\t\tDigest:");
for (int l = 0; l < img_header->digest_length; ++l)
{
if ((l % 32) == 0)
{
printf ("\n\t\t\t\t\t\t\t");
}
printf ("%02x", digest[l]);
}
printf ("\n");
printf ("\t\t\t\t\t}\n");
free (digest);
}
printf ("\t\t\t\t]\n");
printf ("\t\t\t}\n");
}
printf ("\t\t]\n");
printf ("\t}\n");
}
printf ("]\n");
return (pointer - cfm);
}
size_t visualize_pcd (uint8_t *pcd)
{
struct pcd_header *pcd_header = (struct pcd_header*) pcd;
uint8_t* pointer = pcd + sizeof(struct pcd_header);
printf ("PCD\n");
printf ("{\n");
printf ("\tpcd_header\n");
printf ("\t{\n");
printf ("\t\tlength: %i\n", pcd_header->length);
printf ("\t\theader_len: %i\n", pcd_header->header_len);
printf ("\t\tformat_id: %i\n", pcd_header->format_id);
printf ("\t\treserved1: %i\n", pcd_header->reserved1);
printf ("\t\treserved2: %i\n", pcd_header->reserved2);
printf ("\t\treserved3: %i\n", pcd_header->reserved3);
printf ("\t}\n");
struct pcd_rot_header *rot_header = (struct pcd_rot_header*) pointer;
pointer += sizeof(struct pcd_rot_header);
printf ("\tpcd_rot_header\n");
printf ("\t{\n");
printf ("\t\tlength: %i\n", rot_header->length);
printf ("\t\theader_len: %i\n", rot_header->header_len);
printf ("\t\tformat_id: %i\n", rot_header->format_id);
printf ("\t\tnum_ports: %i\n", rot_header->num_ports);
printf ("\t\taddr: 0x%x\n", rot_header->addr);
printf ("\t\tbmc_i2c_addr: 0x%x\n", rot_header->bmc_i2c_addr);
printf ("\t\tcpld_addr: 0x%x\n", rot_header->cpld_addr);
printf ("\t\tcpld_channel: %i\n", rot_header->cpld_channel);
printf ("\t\tactive: %i\n", rot_header->active);
printf ("\t\tdefault_failure_action: %i\n", rot_header->default_failure_action);
printf ("\t\tflags: 0x%x\n", rot_header->flags);
printf ("\t\treserved1: %i\n", rot_header->reserved1);
printf ("\t\treserved2: %i\n", rot_header->reserved2);
printf ("\t\treserved3: %i\n", rot_header->reserved3);
printf ("\t}\n");
printf ("\tPorts\n");
printf ("\t[\n");
for (int i = 0; i < rot_header->num_ports; ++i)
{
struct pcd_port_header *port = (struct pcd_port_header*)pointer;
pointer += sizeof(struct pcd_port_header);
printf ("\t\tpcd_port_header\n");
printf ("\t\t{\n");
printf ("\t\t\tlength: %i\n", port->length);
printf ("\t\t\theader_len: %i\n", port->header_len);
printf ("\t\t\tformat_id: %i\n", port->format_id);
printf ("\t\t\tid: %i\n", port->id);
printf ("\t\t\treserved1: %i\n", port->reserverd1);
printf ("\t\t\treserved2: %i\n", port->reserverd2);
printf ("\t\t\tfrequency: %i\n", port->frequency);
printf ("\t\t}\n");
}
printf ("\t]\n");
struct pcd_components_header *components_header = (struct pcd_components_header*) pointer;
pointer += sizeof(struct pcd_components_header);
printf ("\tpcd_components_header\n");
printf ("\t{\n");
printf ("\t\tlength: %i\n", components_header->length);
printf ("\t\theader_len: %i\n", components_header->header_len);
printf ("\t\tformat_id: %i\n", components_header->format_id);
printf ("\t\tnum_components: %i\n", components_header->num_components);
printf ("\t\treserved1: %i\n", components_header->reserved1);
printf ("\t\treserved2: %i\n", components_header->reserved2);
printf ("\t}\n");
printf ("\tComponents\n");
printf ("\t[\n");
for (int i = 0; i < components_header->num_components; ++i)
{
struct pcd_component_header *component_header = (struct pcd_component_header*)pointer;
pointer += sizeof(struct pcd_component_header);
printf ("\t\t{\n");
printf ("\t\t\tpcd_component_header\n");
printf ("\t\t\t{\n");
printf ("\t\t\t\tlength: %i\n", component_header->length);
printf ("\t\t\t\theader_len: %i\n", component_header->header_len);
printf ("\t\t\t\tformat_id: %i\n", component_header->format_id);
printf ("\t\t\t\tnum_muxes: %i\n", component_header->num_muxes);
printf ("\t\t\t\taddr: 0x%x\n", component_header->addr);
printf ("\t\t\t\tchannel: %i\n", component_header->channel);
printf ("\t\t\t\tflags: %i\n", component_header->flags);
printf ("\t\t\t\teid: 0x%x\n", component_header->eid);
printf ("\t\t\t\tpower_ctrl_reg: 0x%x\n", component_header->power_ctrl_reg);
printf ("\t\t\t\tpower_ctrl_mask: 0x%x\n", component_header->power_ctrl_mask);
printf ("\t\t\t\tid: %i\n", component_header->id);
printf ("\t\t\t}\n");
printf ("\t\t\tMuxes\n");
printf ("\t\t\t[\n");
for (int i = 0; i < component_header->num_muxes; ++i)
{
struct pcd_mux_header *mux = (struct pcd_mux_header*)pointer;
pointer += sizeof(struct pcd_mux_header);
printf ("\t\t\t\t\tpcd_mux_header\n");
printf ("\t\t\t\t\t{\n");
printf ("\t\t\t\t\t\tlength: %i\n", mux->length);
printf ("\t\t\t\t\t\theader_len: %i\n", mux->header_len);
printf ("\t\t\t\t\t\tformat_id: %i\n", mux->format_id);
printf ("\t\t\t\t\t\taddr: 0x%x\n", mux->addr);
printf ("\t\t\t\t\t\tchannel: %i\n", mux->channel);
printf ("\t\t\t\t\t\tmux_level: %i\n", mux->mux_level);
printf ("\t\t\t\t\t}\n");
}
printf ("\t\t\t]\n");
printf ("\t\t}\n");
}
printf ("\t]\n");
struct pcd_platform_header *platform_header = (struct pcd_platform_header*) pointer;
pointer += sizeof(struct pcd_platform_header);
printf ("\tpcd_platform_header\n");
printf ("\t{\n");
printf ("\t\tlength: %i\n", platform_header->length);
printf ("\t\theader_len: %i\n", platform_header->header_len);
printf ("\t\tformat_id: %i\n", platform_header->format_id);
printf ("\t\tid_len: %i\n", platform_header->id_len);
printf ("\t\treserved1: %i\n", platform_header->reserved1);
printf ("\t\treserved2: %i\n", platform_header->reserved2);
printf ("\t}\n");
char *platform_id = malloc (platform_header->id_len + 1);
if (platform_id == NULL) {
printf ("Failed to allocate platform id buffer.\n");
return -1;
}
memcpy (platform_id, pointer, platform_header->id_len);
platform_id[platform_header->id_len] = '\0';
pointer += platform_header->id_len;
printf ("\tPlatform ID: %s\n", platform_id);
free (platform_id);
printf ("}\n");
return (pointer - pcd);
}
int main (int argc, char** argv)
{
FILE *fp;
size_t offset;
uint8_t manifest_type;
uint8_t *pointer;
uint8_t *manifest;
unsigned long fileLen;
if (argc < 3 || argv == NULL) {
printf ("No PCD file passed in.\n");
return -1;
}
if (strncmp (argv[2], "pfm", 3) == 0) {
manifest_type = MANIFEST_TYPE_PFM;
}
else if (strncmp (argv[2], "cfm", 3) == 0) {
manifest_type = MANIFEST_TYPE_CFM;
}
else if (strncmp (argv[2], "pcd", 3) == 0) {
manifest_type = MANIFEST_TYPE_PCD;
}
else {
printf ("Manifest type unknown: %s", argv[2]);
return -1;
}
fp = fopen (argv[1], "rb");
if (fp == NULL) {
printf ("Failed to open PCD file.\n");
return -1;
}
fseek (fp, 0, SEEK_END);
fileLen = ftell (fp);
fseek (fp, 0, SEEK_SET);
manifest = (uint8_t*) malloc (fileLen + 1);
if (manifest == NULL) {
printf ("Failed to allocate PCD buffer.\n");
return -1;
}
fread ((void*) manifest, sizeof (uint8_t), fileLen, fp);
fclose (fp);
struct manifest_header *header = (struct manifest_header *) manifest;
printf ("manifest_header\n");
printf ("{");
printf ("\tlength: %i\n", header->length);
printf ("\tmagic: 0x%x\n", header->magic);
printf ("\tid: %i\n", header->id);
printf ("\tsig_length: %i\n", header->sig_length);
printf ("\treserved: %i\n", header->reserved);
printf ("}\n");
switch (manifest_type)
{
case MANIFEST_TYPE_PFM:
offset = visualize_pfm (manifest + sizeof (struct manifest_header));
break;
case MANIFEST_TYPE_CFM:
offset = visualize_cfm (manifest + sizeof (struct manifest_header));
break;
case MANIFEST_TYPE_PCD:
offset = visualize_pcd (manifest + sizeof (struct manifest_header));
break;
default:
goto exit;
}
pointer = manifest + offset + sizeof (struct manifest_header);
printf ("Signature:");
for (int k = 0; k < header->sig_length; ++k)
{
if ((k % 32) == 0)
{
printf ("\n\t");
}
printf ("%02x", pointer[k]);
}
printf ("\n");
exit:
free (manifest);
}

Просмотреть файл

@ -0,0 +1,4 @@
C:\Cerberus\tools\recovery_tools\recovery_image.xml
Output=C:\Cerberus\tools\recovery_tools\recovery_image.bin
Key=C:\Cerberus\core\testing\keys\rsapriv.pem
KeySize=256

Просмотреть файл

@ -0,0 +1,405 @@
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT license.
"""
from __future__ import print_function
from __future__ import unicode_literals
import os
import sys
import traceback
import xml.etree.ElementTree as et
import binascii
import re
import ctypes
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA256
RECOVERY_IMAGE_CONFIG_FILENAME = "recovery_image_generator.config"
XML_VERSION_ATTRIB = "version"
XML_PLATFORM_ATTRIB = "platform"
XML_RECOVERY_SECTION_TAG = "RecoverySection"
XML_WRITE_ADDRESS_TAG = "WriteAddress"
XML_ENCODED_IMAGE_TAG = "EncodedImage"
RECOVERY_IMAGE_MAGIC_NUM = int("0x8a147c29", 16)
RECOVERY_IMAGE_SECTION_MAGIC_NUM = int("0x4b172f31", 16)
RECOVERY_IMAGE_FORMAT_NUM = 0
RECOVERY_IMAGE_SECTION_FORMAT_NUM = 0
RECOVERY_IMAGE_SECTION_HEADER_LENGTH = 16
RECOVERY_IMAGE_MAX_SIZE = 524288
RECOVERY_IMAGE_MAX_VERSION_ID_SIZE = 32
class recovery_image_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('header_length', ctypes.c_ushort),
('format', ctypes.c_ushort),
('marker', ctypes.c_uint),
('image_length', ctypes.c_uint),
('sig_length', ctypes.c_uint),
('platform_id_length', ctypes.c_ubyte)]
class recovery_image_section_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('header_length', ctypes.c_ushort),
('format', ctypes.c_ushort),
('marker', ctypes.c_uint),
('addr', ctypes.c_uint),
('image_length', ctypes.c_uint)]
def process_recovery_image(root):
"""
Process the tree storing the recovery image data starting with the root element
:param root: The root element for the tree storing the XML recovery image data
:return dictionary of the processed recovery image data
"""
xml = {}
version_id = root.attrib.get(XML_VERSION_ATTRIB)
if (version_id in (None, "") or (len(version_id) > (RECOVERY_IMAGE_MAX_VERSION_ID_SIZE - 1))):
raise ValueError("Invalid or no recovery image version ID provided")
platform_id = root.attrib.get(XML_PLATFORM_ATTRIB)
if platform_id in (None, ""):
raise ValueError("No Platform ID provided")
xml["version_id"] = version_id.strip().encode("utf8")
xml["platform_id"] = platform_id.strip().encode("utf8")
sections = root.findall(XML_RECOVERY_SECTION_TAG)
if not sections:
raise ValueError("Invalid number of RecoverySections tags in recovery image: {0}".format(
xml["version_id"]))
xml["sections"] = []
for s in sections:
recovery_section = {}
write_addr = s.findall(XML_WRITE_ADDRESS_TAG)
if not write_addr or len(write_addr) > 1:
raise ValueError("Invalid number of WriteAddress tags in recovery image: {0}".format(
xml["version_id"]))
recovery_section["addr"] = write_addr[0].text.strip()
encoded_image = s.findall(XML_ENCODED_IMAGE_TAG)
if not encoded_image or len(encoded_image) > 1:
raise ValueError("Invalid number of EncodedImage tags in recovery image: {0}".format(
xml["version_id"]))
recovery_section["image"] = binascii.a2b_base64(re.sub("\s", "",
encoded_image[0].text.strip()))
if len(xml["sections"]) == 0:
xml["sections"].append(recovery_section)
else:
ind = 1
for sec in xml["sections"]:
if sec["addr"] > recovery_section["addr"]:
xml["sections"].insert(ind - 1, recovery_section)
break
elif sec["addr"] == recovery_section["addr"]:
raise ValueError("Invalid WriteAddress in recovery image section: {0}".format(
recovery_section["addr"]))
elif ind == len(xml["sections"]):
xml["sections"].append(recovery_section)
break
ind += 1
if not xml["sections"]:
raise ValueError("No recovery sections found for recovery image: {0}".format(xml["version_id"]))
return xml
def load_config(config_file):
"""
Load configuration options from file
:param config_file: Path for a text file containing config options
:return parsed configuration
"""
config = {}
config["xml"] = ""
config["output"] = ""
config["prv_key_path"] = ""
config["key_size"] = ""
with open(config_file, 'r') as fh:
data = fh.readlines()
if not data:
print("Failed to load configuration")
sys.exit(1)
for string in data:
string = string.replace("\n", "")
string = string.replace("\r", "")
if string.startswith("Output"):
config["output"] = string.split("=")[-1].strip()
elif string.startswith("KeySize"):
config["key_size"] = string.split("=")[-1].strip()
elif string.startswith("Key"):
config["prv_key_path"] = string.split("=")[-1].strip()
else:
config["xml"] = string
return config
def load_and_process_xml(xml_file):
"""
Process the XML file storing the recovery image data
:param xml_file: Name of XML file storing the recovery image data
:return dictionary of the processed recovery image data
"""
root = et.parse(xml_file).getroot()
return process_recovery_image(root)
def get_recovery_image_len(xml, sig_len):
"""
Calculate the recovery image length from the processed recovery image data. The total includes
the headers, image(s), and signature.
:param xml: The processed recovery image data
:param sig_len: The recovery image signature length
:return the total length of the recovery image
"""
header_len = 49 + len(xml["platform_id"]) + 1
image_lens = 0
for section in xml["sections"]:
image_lens += len(section["image"]) + 16
return header_len + image_lens + sig_len
def generate_recovery_image_section_instance(section):
"""
Create a recovery image section
:param section: The recovery image section data
:return instance of a recovery image section
"""
addr = int(section["addr"], 16)
section_header = recovery_image_section_header(RECOVERY_IMAGE_SECTION_HEADER_LENGTH,
RECOVERY_IMAGE_SECTION_FORMAT_NUM, RECOVERY_IMAGE_SECTION_MAGIC_NUM, addr,
len(section["image"]))
img_array = (ctypes.c_ubyte * len(section["image"])).from_buffer_copy(section["image"])
class recovery_image_section(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('header', recovery_image_section_header),
('img', ctypes.c_ubyte * len(section["image"]))]
return recovery_image_section(section_header, img_array)
def generate_recovery_image_section_list(xml):
"""
Create a list of recovery image sections from the parsed XML list
:param xml: List of parsed XML recovery image data
:return list of recovery image section instances
"""
section_list = []
min_addr = -1
for s in xml["sections"]:
sec_addr = int(s["addr"], 16)
if sec_addr <= min_addr:
raise ValueError("Invalid WriteAddress in recovery image section: {0}".format(
s["addr"]))
section = generate_recovery_image_section_instance(s)
section_list.append(section)
min_addr = sec_addr + len(s["image"])
return section_list
def generate_recovery_image(xml, recovery_image_header_instance, recovery_image_sections_list):
"""
Create a recovery image object from all the different recovery image components
:param xml: List of parsed XML recovery image data
:param recovery_image_header_instance: Instance of a recovery image header
:param recovery_image_sections_list: List of recovery image sections to be included in the
recovery image
:return Instance of a recovery image object
"""
sections_size = 0
for section in recovery_image_sections_list:
sections_size += section.header.header_length + section.header.image_length
version_len = len(xml["version_id"])
xml["version_id"] = xml["version_id"].decode() + ''.join('\x00' for i in range(version_len, 32))
version_id_str_buf = ctypes.create_string_buffer(xml["version_id"].encode('utf-8'), 32)
version_id_buf = (ctypes.c_ubyte * 32)()
ctypes.memmove(ctypes.addressof(version_id_buf), ctypes.addressof(version_id_str_buf), 32)
xml["platform_id"] = xml["platform_id"].decode() + '\x00'
platform_id_str_buf = ctypes.create_string_buffer(xml["platform_id"].encode('utf-8'), len(xml["platform_id"]))
platform_id_buf = (ctypes.c_ubyte * len(xml["platform_id"]))()
ctypes.memmove(ctypes.addressof(platform_id_buf), ctypes.addressof(platform_id_str_buf),
len(xml["platform_id"]))
class complete_header(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('header_length', ctypes.c_ushort),
('format', ctypes.c_ushort),
('marker', ctypes.c_uint),
('version_id', ctypes.c_ubyte * 32),
('image_length', ctypes.c_uint),
('sig_length', ctypes.c_uint),
('platform_id_length', ctypes.c_ubyte),
('platform_id', ctypes.c_ubyte * len(xml["platform_id"]))]
class recovery_image(ctypes.LittleEndianStructure):
_pack_ = 1
_fields_ = [('recovery_image_header', ctypes.c_ubyte *
recovery_image_header_instance.header_length),
('recovery_sections', ctypes.c_ubyte * sections_size)]
complete_header_instance = complete_header(recovery_image_header_instance.header_length,
recovery_image_header_instance.format, recovery_image_header_instance.marker,
version_id_buf, recovery_image_header_instance.image_length,
recovery_image_header_instance.sig_length,
recovery_image_header_instance.platform_id_length, platform_id_buf)
header_buf = (ctypes.c_ubyte * recovery_image_header_instance.header_length)()
ctypes.memmove(ctypes.addressof(header_buf), ctypes.addressof(complete_header_instance),
recovery_image_header_instance.header_length)
offset = 0
sections_buf = (ctypes.c_ubyte * sections_size)()
for section in recovery_image_sections_list:
ctypes.memmove(ctypes.addressof(sections_buf) + offset, ctypes.addressof(section),
section.header.header_length + section.header.image_length)
offset += section.header.header_length + section.header.image_length
return recovery_image(header_buf, sections_buf)
def write_recovery_image(sign, recovery_image, key, output_file_name, recovery_image_header):
"""
Write recovery image generated to provided path.
:param sign: Boolean indicating whether to sign the recovery image or not
:param recovery_image: Generated recovery image to write
:param key: Key to use for signing
:param output_filename: Name to use for output file
:param recovery_image_header: The recovery image header instance
"""
recovery_image_length = recovery_image_header.image_length - recovery_image_header.sig_length
if ctypes.sizeof(recovery_image) > (RECOVERY_IMAGE_MAX_SIZE - recovery_image_header.sig_length):
raise ValueError("Recovery image is too large - {0}".format(ctypes.sizeof(recovery_image)))
if ctypes.sizeof(recovery_image) != recovery_image_length:
raise ValueError("Recovery image doesn't match output size")
if sign:
h = SHA256.new(recovery_image)
signer = PKCS1_v1_5.new(key)
signature = signer.sign(h)
signature = (ctypes.c_ubyte * recovery_image_header.sig_length).from_buffer_copy(signature)
recovery_image_buf = (ctypes.c_char * (recovery_image_header.image_length))()
ctypes.memmove(ctypes.byref(recovery_image_buf, recovery_image_length), ctypes.addressof(signature),
recovery_image_header.sig_length)
else:
recovery_image_buf = (ctypes.c_char * (recovery_image_length))()
out_dir = os.path.dirname(os.path.abspath(output_file_name))
if not os.path.exists(out_dir):
os.makedirs(out_dir)
with open(output_file_name, 'wb') as fh:
ctypes.memmove(ctypes.byref(recovery_image_buf), ctypes.addressof(recovery_image),
recovery_image_length)
fh.write(recovery_image_buf)
def load_key(key_size, prv_key_path):
"""
Load private RSA key to sign the recovery image from the provided path. If no valid key can be
imported, key size will be what is provided. Otherwise, key size will be size of key imported.
:param key_size: Provided key_size
:param prv_key_path: Provided private key path
:return <Sign image or not> <key_size> <Key to use for signing>
"""
if prv_key_path:
try:
key = RSA.importKey(open(prv_key_path).read())
except Exception:
print("Unsigned recovery image will be generated, provided RSA key could not be imported: {0}".format(prv_key_path))
traceback.print_exc ()
return False, key_size, None
return True, int((key.size() + 1)/8), key
else:
print("No RSA private key provided in config, unsigned recovery image will be generated.")
return False, key_size, None
#*************************************** Start of Script ***************************************
if len(sys.argv) < 2:
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), RECOVERY_IMAGE_CONFIG_FILENAME)
else:
path = os.path.abspath(sys.argv[1])
config = load_config(path)
key_size = None
prv_key_path = None
if "key_size" in config and config["key_size"]:
key_size = int(config["key_size"])
if "prv_key_path" in config and config["prv_key_path"]:
prv_key_path = config["prv_key_path"]
sign, key_size, key = load_key(key_size, prv_key_path)
sig_len = 0 if key_size is None else key_size
processed_xml = load_and_process_xml(config["xml"])
platform_id_len = len(processed_xml["platform_id"]) + 1
header_len = 49 + platform_id_len
image_len = get_recovery_image_len(processed_xml, sig_len)
recovery_image_header_instance = recovery_image_header(header_len, RECOVERY_IMAGE_FORMAT_NUM,
RECOVERY_IMAGE_MAGIC_NUM, image_len, sig_len, platform_id_len)
recovery_image_sections_list = generate_recovery_image_section_list(processed_xml)
recovery_image = generate_recovery_image(processed_xml, recovery_image_header_instance,
recovery_image_sections_list)
write_recovery_image(sign, recovery_image, key, config["output"], recovery_image_header_instance)
print("Completed recovery image generation: {0}".format(config["output"]))

143
tools/testing/sign_csr.py Normal file
Просмотреть файл

@ -0,0 +1,143 @@
#!/usr/bin/env python3
"""
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the MIT license.
"""
import os
import sys
import datetime
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.serialization import Encoding
def print_usage():
print ("Usage: {0} <CSR> <CA key> <root CA key>".format (sys.argv[0]))
sys.exit (1)
if (len (sys.argv) < 4):
print_usage ()
if (not os.path.exists (sys.argv[1])):
print ("Can't find CSR file to sign: {0}".format (sys.argv[1]))
sys.exit (1)
with (open (sys.argv[1], "rb")) as csr_file:
csr_data = csr_file.read ()
csr = x509.load_der_x509_csr (csr_data, backend = default_backend ())
if (not os.path.exists (sys.argv[2])):
print ("Can't find CA key file: {0}".format (sys.argv[2]))
sys.exit (1)
with (open (sys.argv[2], "rb")) as key_file:
key_data = key_file.read ()
ca_key_priv = serialization.load_pem_private_key (key_data, password = None,
backend = default_backend ())
ca_key = ca_key_priv.public_key ()
if ((ca_key_priv.key_size == 384) or (ca_key_priv.key_size > 2048)):
ca_hash = hashes.SHA384 ()
else:
ca_hash = hashes.SHA256 ()
if (not os.path.exists (sys.argv[3])):
print ("Can't find root CA key file: {0}".format (sys.argv[3]))
sys.exit (1)
with (open (sys.argv[3], "rb")) as key_file:
key_data = key_file.read ()
root_key_priv = serialization.load_pem_private_key (key_data, password = None,
backend = default_backend ())
root_key = root_key_priv.public_key ()
if ((root_key_priv.key_size == 384) or (root_key_priv.key_size > 2048)):
root_hash = hashes.SHA384 ()
else:
root_hash = hashes.SHA256 ()
# Generate the root CA certificate
root = x509.CertificateBuilder ()
root = root.subject_name (x509.Name ([x509.NameAttribute (NameOID.COMMON_NAME, "Root")]))
root = root.issuer_name (x509.Name ([x509.NameAttribute (NameOID.COMMON_NAME, "Root")]))
root = root.serial_number (0x12345678)
root = root.not_valid_before (datetime.datetime (2019, 1, 1, 0, 0, 0))
root = root.not_valid_after (datetime.datetime (9999, 12, 31, 23, 59, 59))
root = root.add_extension (x509.SubjectKeyIdentifier.from_public_key (root_key), critical = False)
root = root.add_extension (x509.AuthorityKeyIdentifier.from_issuer_public_key (root_key),
critical = False)
root = root.add_extension (x509.BasicConstraints (ca = True, path_length = None), critical = True)
root = root.add_extension (x509.KeyUsage (digital_signature = False, content_commitment = False,
key_encipherment = False, data_encipherment = False, key_agreement = False,
key_cert_sign = True, crl_sign = False, encipher_only = False, decipher_only = False),
critical = True)
root = root.public_key (root_key)
root_cert = root.sign (private_key = root_key_priv, algorithm = root_hash,
backend = default_backend ())
with (open ("root.crt", "wb")) as cert_file:
cert_file.write (root_cert.public_bytes (Encoding.DER))
# Generate an intermediate CA certificate
ca = x509.CertificateBuilder ()
ca = ca.subject_name (x509.Name ([x509.NameAttribute (NameOID.COMMON_NAME, "IntrCA")]))
ca = ca.issuer_name (x509.Name ([x509.NameAttribute (NameOID.COMMON_NAME, "Root")]))
ca = ca.serial_number (0x76543210)
ca = ca.not_valid_before (datetime.datetime (2019, 1, 1, 0, 0, 0))
ca = ca.not_valid_after (datetime.datetime (9999, 12, 31, 23, 59, 59))
ca = ca.add_extension (x509.SubjectKeyIdentifier.from_public_key (ca_key), critical = False)
ca = ca.add_extension (x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier (
root_cert.extensions.get_extension_for_class (x509.SubjectKeyIdentifier)), critical = False)
ca = ca.add_extension (x509.BasicConstraints (ca = True, path_length = None), critical = True)
ca = ca.add_extension (x509.KeyUsage (digital_signature = False, content_commitment = False,
key_encipherment = False, data_encipherment = False, key_agreement = False,
key_cert_sign = True, crl_sign = False, encipher_only = False, decipher_only = False),
critical = True)
ca = ca.public_key (ca_key)
ca_cert = ca.sign (private_key = root_key_priv, algorithm = root_hash,
backend = default_backend ())
with (open ("ca.crt", "wb")) as cert_file:
cert_file.write (ca_cert.public_bytes (Encoding.DER))
# Sign the CSR
cert = x509.CertificateBuilder ()
cert = cert.subject_name (csr.subject)
cert = cert.issuer_name (x509.Name ([x509.NameAttribute (NameOID.COMMON_NAME, "IntrCA")]))
cert = cert.serial_number (0x11223344)
cert = cert.not_valid_before (datetime.datetime (2019, 1, 1, 0, 0, 0))
cert = cert.not_valid_after (datetime.datetime (9999, 12, 31, 23, 59, 59))
cert = cert.add_extension (x509.SubjectKeyIdentifier.from_public_key (csr.public_key ()),
critical = False)
cert = cert.add_extension (x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier (
ca_cert.extensions.get_extension_for_class (x509.SubjectKeyIdentifier)), critical = False)
for ext in csr.extensions:
cert = cert.add_extension (ext.value, critical = ext.critical)
cert = cert.public_key (csr.public_key ())
image_cert = cert.sign (private_key = ca_key_priv, algorithm = ca_hash,
backend = default_backend ())
with (open ("cert.crt", "wb")) as cert_file:
cert_file.write (image_cert.public_bytes (Encoding.DER))