tls-canary/tlscanary/report.py

282 строки
10 KiB
Python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import dateutil.parser
from distutils import dir_util
import json
import logging
import os
import shutil
from tlscanary.tools import cert
logger = logging.getLogger(__name__)
module_dir = os.path.split(__file__)[0]
def generate(mode, logs, output_dir):
global logger
logger.debug("Generating `%s` report for %d logs in `%s`" % (mode, len(logs), output_dir))
if mode == "web":
for log_name in sorted(logs.keys()):
log = logs[log_name]
meta = log.get_meta()
if meta["mode"] != "regression":
logger.warning("Skipping report generation for non-regression log `%s`" % log_name)
continue
if not log.has_finished():
logger.warning("Skipping report generation for incomplete log `%s`" % log_name)
continue
if not log.is_compatible():
logger.warning("Skipping report generation for incompatible log `%s`" % log_name)
continue
web_report(log, output_dir)
else:
logger.critical("Report generator mode `%s` not implemented" % mode)
def web_report(log, report_dir):
global logger
# Create report directory if necessary.
if not os.path.exists(report_dir):
logger.debug('Creating report directory %s' % report_dir)
os.makedirs(report_dir)
# Fetch log metadata
meta = log.get_meta()
run_start_time = dateutil.parser.parse(meta["run_start_time"])
timestamp = run_start_time.strftime("%Y-%m-%d-%H-%M-%S")
# Read the complete runs log to see if this log was already reported
runs_log_file = os.path.join(report_dir, "runs", "runs.json")
if os.path.exists(runs_log_file):
with open(runs_log_file) as f:
runs_log = json.load(f)
for line in runs_log[0]["data"]:
logger.debug("Line read from runs.json: `%s`" % line)
else:
# File does not exist, create an empty log
runs_log = json.loads('[{"data":[]}]')
if timestamp in json.dumps(runs_log):
logger.warning("Skipping log `%s` which was already reported before" % log.handle)
return
# Write log file
run_dir = os.path.join(report_dir, "runs", timestamp)
logger.info("Writing HTML report to `%s`" % run_dir)
uri_data = []
for line in log:
if meta["args"]["filter"] == 1:
# Filter out stray timeout errors
connection_speed = line["response"]["response_time"]-line["response"]["command_time"]
timeout = line["response"]["original_cmd"]["args"]["timeout"] * 1000
try:
error_message = line["response"]["result"]["info"]["short_error_message"]
except KeyError:
error_message = "unknown"
if error_message == "NS_BINDING_ABORTED" and connection_speed > timeout:
continue
uri_data.append(line)
log_data = [{"meta": log.get_meta(), "data": uri_data}]
# Install static template files in report directory
template_dir = os.path.join(module_dir, "template")
dir_util.copy_tree(os.path.join(template_dir, "js"),
os.path.join(report_dir, "js"))
dir_util.copy_tree(os.path.join(template_dir, "css"),
os.path.join(report_dir, "css"))
dir_util.copy_tree(os.path.join(template_dir, "img"),
os.path.join(report_dir, "img"))
shutil.copyfile(os.path.join(template_dir, "index.htm"),
os.path.join(report_dir, "index.htm"))
# Create per-run directory for report output
if not os.path.isdir(run_dir):
os.makedirs(run_dir)
# Copy profiles
if "profiles" in meta:
for profile in meta["profiles"]:
log_zip = log.part(profile["log_part"])
run_dir_zip = os.path.join(run_dir, profile["log_part"])
logger.debug("Copying `%s` profile archive from `%s` to `%s`" % (profile["name"], log_zip, run_dir_zip))
shutil.copyfile(log_zip, run_dir_zip)
cert_dir = os.path.join(run_dir, "certs")
__extract_certificates(log, cert_dir)
shutil.copyfile(os.path.join(template_dir, "report_template.htm"),
os.path.join(run_dir, "index.htm"))
# Write the final log file
with open(os.path.join(run_dir, "log.json"), "w") as log_file:
log_file.write(json.dumps(log_data, indent=4, sort_keys=True))
# Append to runs log
new_run_log = {
"run": timestamp,
"branch": meta["test_metadata"]["branch"].capitalize(),
"errors": len(log),
"description": "Fx%s %s vs Fx%s %s" % (meta["test_metadata"]["app_version"],
meta["test_metadata"]["branch"],
meta["base_metadata"]["app_version"],
meta["base_metadata"]["branch"])
}
runs_log[0]["data"].append(new_run_log)
logger.debug("Writing back runs log to `%s`" % runs_log_file)
with open(runs_log_file, "w") as f:
f.write(json.dumps(runs_log, indent=4, sort_keys=True))
def __extract_certificates(log, cert_dir):
global logger
if not os.path.exists(cert_dir):
os.makedirs(cert_dir)
for log_line in log:
result = {
"host": log_line["host"],
"rank": log_line["rank"],
"response": log_line["response"]
}
cert_file = os.path.join(cert_dir, "%s.der" % result["host"])
if "certificate_chain" in result["response"]["result"]["info"] \
and result["response"]["result"]["info"]["certificate_chain"] is not None:
server_cert_string = "".join(map(chr, result["response"]["result"]["info"]["certificate_chain"][0]))
logger.debug("Writing certificate data for `%s` to `%s`" % (result["host"], cert_file))
with open(cert_file, "w") as f:
f.write(server_cert_string)
else:
logger.debug("No certificate data available for `%s`" % result["host"])
NSErrorMap = {
# For network error messages that are not obtainable otherwise
# https://developer.mozilla.org/en-US/docs/Mozilla/Errors
0X00000000: "NS_OK",
0X80004004: "NS_ERROR_ABORT",
0X8000FFFF: "UNEXPECTED_ERROR",
0X804B0002: "NS_BINDING_ABORTED",
0X804B000A: "ERROR_MALFORMED_URI",
0X804B000D: "CONNECTION_REFUSED_ERROR",
0X804B0014: "NET_RESET_ERROR",
0X804B001E: "DOMAIN_NOT_FOUND_ERROR",
}
def decode_ns_status(scan_result):
status = scan_result["response"]["result"]["info"]["status"]
try:
return NSErrorMap[status]
except KeyError:
return "UNKNOWN_STATUS"
def decode_error_type(scan_result):
status = scan_result["response"]["result"]["info"]["status"]
if status & 0xff0000 == 0x5a0000: # security module
error_class = scan_result["response"]["result"]["info"]["error_class"]
if error_class == 2: # nsINSSErrorsService::ERROR_CLASS_BAD_CERT
return "certificate"
else:
return "protocol"
else:
return "network"
def decode_raw_error(scan_result):
if "raw_error" in scan_result["response"]["result"]["info"]:
raw_error = scan_result["response"]["result"]["info"]["raw_error"]
if "Error code:" in raw_error:
return raw_error.split("Error code:")[1].split(">")[1].split("<")[0]
return decode_ns_status(scan_result)
def collect_error_info(scan_result):
error_info = {
"message": decode_raw_error(scan_result),
"code": "%s" % hex(scan_result["response"]["result"]["info"]["status"]),
"type": decode_error_type(scan_result)
}
return error_info
def collect_site_info(scan_result):
site_info = {
"timestamp": scan_result["response"]["response_time"],
"connectionSpeed": scan_result["response"]["response_time"] - scan_result["response"]["command_time"],
"uri": scan_result["host"],
"rank": scan_result["rank"]
}
return site_info
def collect_certificate_info(scan_result):
result = scan_result["response"]["result"]
if not result["info"]["ssl_status_status"]:
return {}
status = result["info"]["ssl_status"]
server_cert = status["serverCert"]
parsed_server_cert = cert.Cert(result["info"]["certificate_chain"][0])
root_cert = server_cert
chain_length = 1
while root_cert["issuer"] is not None:
root_cert = root_cert["issuer"]
chain_length += 1
cert_info = {
"nickname": server_cert["nickname"] if "nickname" in server_cert else "(no nickname)",
"emailAddress": server_cert["emailAddress"],
"subjectName": server_cert["subjectName"],
"commonName": server_cert["commonName"],
"organization": server_cert["organization"],
"organizationalUnit": server_cert["organizationalUnit"],
"issuerCommonName": server_cert["issuerCommonName"],
"issuerOrganization": server_cert["issuerOrganization"],
"sha1Fingerprint": server_cert["sha1Fingerprint"],
"sha256Fingerprint": server_cert["sha256Fingerprint"],
"chainLength": chain_length,
"certifiedUsages": result["info"]["certified_usages"],
"validityNotBefore": server_cert["validity"]["notBeforeGMT"],
"validityNotAfter": server_cert["validity"]["notAfterGMT"],
"isEV": str(status["isExtendedValidation"]),
"subjectAltName": parsed_server_cert.subject_alt_name(),
"signatureAlgorithm": parsed_server_cert.signature_hash_algorithm(),
"keyUsage": server_cert["keyUsages"],
"extKeyUsage": parsed_server_cert.ext_key_usage(),
"rootCertificateSubjectName": root_cert["subjectName"],
"rootCertificateOrganization": root_cert["organization"],
"rootCertificateOrganizationalUnit": root_cert["organizationalUnit"],
"rootCertificateSHA1Fingerprint": root_cert["sha1Fingerprint"],
}
return cert_info
def collect_scan_info(scan_result):
return {
"site_info": collect_site_info(scan_result),
"error": collect_error_info(scan_result),
"cert_info": collect_certificate_info(scan_result)
}
def add_performance_info(log_data, scan_result):
log_data["site_info"]["connectionSpeedChange"] = scan_result["response"]["connection_speed_change"]
log_data["site_info"]["connectionSpeedSamples"] = scan_result["response"]["connection_speed_samples"]