зеркало из https://github.com/mozilla/tls-canary.git
537 строки
17 KiB
Python
537 строки
17 KiB
Python
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
# You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
import io
|
|
import datetime
|
|
import glob
|
|
import bz2
|
|
import hashfs
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CertDB(object):
|
|
"""
|
|
Class to efficiently store SSL certificates
|
|
"""
|
|
# TODO: CertDB needs testing
|
|
|
|
def __init__(self, args):
|
|
self.args = args
|
|
self.cert_dir = self.log_dir = os.path.abspath(os.path.join(args.workdir, "certs"))
|
|
if not os.path.isdir(self.cert_dir):
|
|
os.makedirs(self.cert_dir)
|
|
self.hash_fs = hashfs.HashFS(self.cert_dir, depth=4, width=1, algorithm='sha256')
|
|
|
|
def put(self, der_data):
|
|
if type(der_data) is str:
|
|
hash_address = self.hash_fs.put(io.StringIO(str(der_data)), "der")
|
|
logger.debug("Wrote certificate data to `%s`" % hash_address.abspath)
|
|
return hash_address.id
|
|
elif type(der_data) is list:
|
|
return list(map(self.put, der_data))
|
|
else:
|
|
raise Exception("Unsupported argument type")
|
|
|
|
def exists(self, hash_id):
|
|
return self.hash_fs.exists(hash_id)
|
|
|
|
def get_abspath(self, hash_id):
|
|
hash_address = self.hash_fs.get(hash_id)
|
|
if hash_address is None:
|
|
return None
|
|
else:
|
|
return hash_address.abspath
|
|
|
|
def get_relpath(self, hash_id):
|
|
hash_address = self.hash_fs.get(hash_id)
|
|
if hash_address is None:
|
|
return None
|
|
else:
|
|
return hash_address.relpath
|
|
|
|
|
|
class RunLogDB(object):
|
|
"""
|
|
Class to manage run log files
|
|
"""
|
|
|
|
def __init__(self, args):
|
|
self.args = args
|
|
self.log_dir = os.path.abspath(os.path.join(args.workdir, "log"))
|
|
if not os.path.isdir(self.log_dir):
|
|
os.makedirs(self.log_dir)
|
|
self.cert_db = CertDB(self.args)
|
|
|
|
def handle_to_dir_name(self, handle):
|
|
"""
|
|
Converts a log handle to its directory name
|
|
:param handle: str with handle
|
|
:return: str with file name
|
|
"""
|
|
# handle format is .strftime("%Y-%m-%dZ%H-%M-%S")
|
|
year, month, _, _, _ = handle.split("-")
|
|
return os.path.join(self.log_dir, year, month, handle)
|
|
|
|
@staticmethod
|
|
def dir_name_to_handle(dir_name):
|
|
"""
|
|
Converts run log dir name to its handle
|
|
:param dir_name: str
|
|
:return: str with handle
|
|
"""
|
|
return os.path.basename(dir_name)
|
|
|
|
def exists(self, handle):
|
|
"""
|
|
Check whether run log handle is valid
|
|
:param handle: str with handle
|
|
:return: bool
|
|
"""
|
|
matches = glob.glob(os.path.join(self.log_dir, "2???", "??", "%s" % handle))
|
|
return len(matches) > 0
|
|
|
|
def list_logs(self):
|
|
"""
|
|
Returns a list of available log directory names
|
|
:return: list of str of available log directories
|
|
"""
|
|
return glob.glob(os.path.join(self.log_dir, "2???", "??", "2???-??-??*"))
|
|
|
|
def list(self):
|
|
"""
|
|
Returns a list of log handles
|
|
:return: list of str of log handles
|
|
"""
|
|
return [self.dir_name_to_handle(dir_name) for dir_name in self.list_logs()]
|
|
|
|
def delete(self, handle):
|
|
"""
|
|
Delete a log specified by its handle. All associated
|
|
files are purged from the database. There is no undo.
|
|
:param handle: str with log handle
|
|
:return: None
|
|
"""
|
|
global logger
|
|
# TODO: method needs testing
|
|
dir_name = self.handle_to_dir_name(handle)
|
|
logger.debug("Purging `%s` from run log database" % dir_name)
|
|
shutil.rmtree(dir_name)
|
|
|
|
def list_parts(self, handle):
|
|
"""
|
|
Returns a list of parts for a log handles
|
|
:return: list of str of log parts
|
|
"""
|
|
# TODO: method needs testing
|
|
return os.listdir(self.handle_to_dir_name(handle))
|
|
|
|
def part_path(self, handle, part):
|
|
"""
|
|
Return absolute path of a part inside a log's directory.
|
|
:param handle: str with log handle
|
|
:param part: str with part handle
|
|
:return: str with absolute path
|
|
"""
|
|
# TODO: method needs testing
|
|
log_dir_path = self.handle_to_dir_name(handle)
|
|
return os.path.join(log_dir_path, part)
|
|
|
|
def open(self, handle, part, mode="r", compress=True):
|
|
"""
|
|
Open a log file by handle and part name
|
|
:param handle: str log handle
|
|
:param part: str part name
|
|
:param mode: str file mode
|
|
:param compress: bool
|
|
:return: file object
|
|
"""
|
|
global logger
|
|
|
|
part_name = self.part_path(handle, part)
|
|
if "w" in mode and not os.path.isdir(os.path.dirname(part_name)):
|
|
os.makedirs(os.path.dirname(part_name))
|
|
|
|
if "r" in mode:
|
|
if os.path.exists(part_name + ".bz2"):
|
|
part_name = part_name + ".bz2"
|
|
else:
|
|
if compress:
|
|
part_name += ".bz2"
|
|
|
|
logger.debug("Opening run log file `%s` in mode `%s`" % (part_name, mode))
|
|
|
|
try:
|
|
if part_name.endswith(".bz2"):
|
|
return bz2.BZ2File(part_name, mode)
|
|
else:
|
|
return open(part_name, mode)
|
|
except IOError as err:
|
|
raise Exception("Error opening run log file for handle `%s` part `%s`: %s" % (handle, part, err))
|
|
|
|
def read(self, handle, part):
|
|
"""
|
|
Return the string content of a log file referenced by its handle
|
|
:param handle: str with handle
|
|
:param part: str with part name
|
|
:return: str with content of logfile
|
|
"""
|
|
global logger
|
|
logger.debug("Reading run log `%s` part `%s`" % (handle, part))
|
|
with self.open(handle, part, "r") as f:
|
|
return f.read().decode("utf-8")
|
|
|
|
def write(self, handle, part, data):
|
|
"""
|
|
Write to log file referenced by handle and part.
|
|
The data object will be passed through str().encode("utf-8").
|
|
:param handle: str with handle
|
|
:param part: str with part name
|
|
:param data: object to write
|
|
:return: None
|
|
"""
|
|
global logger
|
|
logger.debug("Writing run log `%s`" % handle)
|
|
with self.open(handle, part, "w") as f:
|
|
f.write(str(data).encode("utf-8"))
|
|
|
|
def put_cert(self, cert_data):
|
|
"""
|
|
Add a certificate to the certificate database. Returns handle
|
|
:param cert_data: str with DER data
|
|
:return: str handle
|
|
"""
|
|
# TODO: method needs testing
|
|
return self.cert_db.put(cert_data)
|
|
|
|
def new_log(self):
|
|
"""
|
|
Return a new RunLog object that supports incremental logging
|
|
:return:
|
|
"""
|
|
handle = datetime.datetime.utcnow().strftime("%Y-%m-%dZ%H-%M-%S")
|
|
return RunLog(handle, "w", self)
|
|
|
|
def read_log(self, handle):
|
|
"""
|
|
Return a RunLog object that works on the existing log
|
|
refered to by handle.
|
|
:param handle: str with handle
|
|
:return: RunLog object
|
|
"""
|
|
return RunLog(handle, "r", self)
|
|
|
|
|
|
class RunLog(object):
|
|
"""
|
|
Class to keep state about runs in a generic format
|
|
suitable for later processing. Calling the .log() method is
|
|
designed to work incrementally such that we don't need to keep
|
|
aggregating all results in memory.
|
|
"""
|
|
|
|
format_revision = 2
|
|
|
|
def __init__(self, handle, mode, db):
|
|
"""
|
|
Constructor for RunLog
|
|
|
|
The `mode` argument determines solely how the run log object
|
|
behaves when called in a `with` statement. If the mode is "w",
|
|
then .start() is called.
|
|
|
|
Calling .start() switches the log to running mode. Iterating the
|
|
log or calling len() while running throws an exception.
|
|
|
|
:param handle: str with log handle
|
|
:param mode: str with mode ("r" or "w")
|
|
:param db: RunLogDB instance
|
|
"""
|
|
self.db = db
|
|
self.handle = handle
|
|
self.mode = mode
|
|
self.parts = []
|
|
self.log_fh = None
|
|
self.meta_fh = None
|
|
self.filter = None
|
|
self.meta = None
|
|
self.is_running = False
|
|
|
|
def part(self, part_handle):
|
|
"""
|
|
Returns absolute path for an additional log part, referenced
|
|
by its handle, inside the log directory.
|
|
:param part_handle: str with handle
|
|
:return: str with absolute path
|
|
"""
|
|
# TODO: method needs testing
|
|
return self.db.part_path(self.handle, part_handle)
|
|
|
|
def open_part(self, part_handle, mode="r", compress=True):
|
|
"""
|
|
Returns file object of given log part, openend in specified mode
|
|
and compression. The class keeps internally track of file objects
|
|
generated by this method, so they can all be closed at once.
|
|
|
|
:param part_handle: str with handle
|
|
:param mode: str with file mode
|
|
:param compress: bool
|
|
:return: file object
|
|
"""
|
|
part_fh = self.db.open(self.handle, part_handle, mode, compress)
|
|
self.parts.append(part_fh)
|
|
return part_fh
|
|
|
|
def close(self):
|
|
"""
|
|
Close all files that were opened through .open_part().
|
|
"""
|
|
if self.is_running:
|
|
self.stop()
|
|
|
|
if self.log_fh is not None:
|
|
self.log_fh.close()
|
|
self.log_fh = None
|
|
if self.meta_fh is not None:
|
|
self.meta_fh.close()
|
|
self.meta_fh = None
|
|
for f in self.parts:
|
|
if not f.closed:
|
|
f.close()
|
|
self.parts = []
|
|
|
|
def start(self, meta=None, log_filter=lambda x: x):
|
|
"""
|
|
Start logging. Writes a metadata header to the file.
|
|
|
|
Calling this method puts the log in running mode.
|
|
Starting an existing log will overwrite log and meta data.
|
|
|
|
:param meta: optional dict with metadata to write
|
|
:param log_filter: filter lambda to map on every log line
|
|
:return: None
|
|
"""
|
|
if self.is_running:
|
|
raise Exception("Can't restart already running log `%s`", self.handle)
|
|
|
|
self.meta = meta if meta is not None else {}
|
|
self.meta["format_revision"] = self.format_revision
|
|
self.meta["run_completed"] = False
|
|
self.meta["log_lines"] = 0
|
|
self.filter = log_filter
|
|
|
|
self.log_fh = self.open_part("log", self.mode, compress=True)
|
|
self.meta_fh = self.open_part("meta", self.mode, compress=False)
|
|
|
|
self.meta_fh.seek(0)
|
|
self.meta_fh.write(json.dumps(self.meta, indent=4, sort_keys=True))
|
|
self.meta_fh.truncate()
|
|
|
|
self.is_running = True
|
|
|
|
def log(self, result_batch):
|
|
"""
|
|
Write an individual or multiple lines to the log file.
|
|
Each line is passed through the filter function set by .start().
|
|
A line is not logged if the filter returns None.
|
|
:param result_batch: log item or list of log items
|
|
:return: None
|
|
"""
|
|
if not self.is_running:
|
|
raise Exception("Unable to write to stopped log `%s`" % self.handle)
|
|
|
|
if type(result_batch) is not list:
|
|
result_batch = [result_batch]
|
|
for result in map(self.filter, result_batch):
|
|
if result is not None:
|
|
self.log_fh.write(("%s\n" % json.dumps(result)).encode("utf-8"))
|
|
self.meta["log_lines"] += 1
|
|
|
|
def stop(self, meta=None):
|
|
"""
|
|
Wrap up the log file. The given metadata is joined with previously
|
|
provided metadata and appended at the end of the file. Finally, an
|
|
indicator is written to the log file that is used for distinguishing
|
|
between complete and partial log files.
|
|
|
|
Metadata given to .stop() has precedence over metadata given to .start().
|
|
:param meta: dict with additional metadata
|
|
:return: None
|
|
"""
|
|
if not self.is_running:
|
|
raise Exception("Unable to stop stopped log `%s`", self.handle)
|
|
|
|
if meta is None:
|
|
meta = {}
|
|
self.meta.update(meta)
|
|
self.meta["run_completed"] = True
|
|
self.meta_fh.seek(0)
|
|
self.meta_fh.write(json.dumps(self.meta, indent=4, sort_keys=True))
|
|
self.meta_fh.truncate()
|
|
|
|
self.meta_fh.close()
|
|
self.meta_fh = None
|
|
self.log_fh.close()
|
|
self.log_fh = None
|
|
|
|
self.is_running = False
|
|
|
|
def has_finished(self):
|
|
"""
|
|
Checks whether the run finished. If the log file is in write mode,
|
|
it checks whether the .stop() method has been called, yet. If the log is in
|
|
read mode, it checks for the presence of the `run_completed` metadata.
|
|
:return: bool
|
|
"""
|
|
if self.is_running:
|
|
return False
|
|
meta = self.get_meta()
|
|
if "run_completed" not in meta:
|
|
return False
|
|
else:
|
|
return meta["run_completed"]
|
|
|
|
def is_compatible(self):
|
|
"""
|
|
Returns whether the log has the same revision as RunLog.format_revision.
|
|
:return: bool
|
|
"""
|
|
if self.is_running:
|
|
return True
|
|
meta = self.get_meta()
|
|
if "format_revision" not in meta:
|
|
return False
|
|
else:
|
|
return meta["format_revision"] == self.format_revision
|
|
|
|
def update_meta(self, meta):
|
|
"""
|
|
Update the metadata on a log. If the log is running, it will not be
|
|
written until .stop() is called.
|
|
|
|
Keep in mind that metadata is only updated with the data provided.
|
|
Keys can't be deleted.
|
|
:param meta: dict
|
|
:return: None
|
|
"""
|
|
if self.is_running:
|
|
self.meta.update(meta)
|
|
else:
|
|
self.get_meta()
|
|
self.meta.update(meta)
|
|
with self.open_part("meta", "w") as f:
|
|
f.write(json.dumps(self.meta, indent=4, sort_keys=True))
|
|
|
|
def get_meta(self):
|
|
"""
|
|
Get metadata from log. If the log is running, the metadata collected
|
|
by calls to .start() and .stop() is returned. If the log is not running,
|
|
metadata is taken from the `meta` property of this object.
|
|
:return: dict with metadata
|
|
"""
|
|
if self.is_running:
|
|
return self.meta
|
|
if self.meta is None:
|
|
with self.open_part("meta") as f:
|
|
meta = f.read().strip()
|
|
try:
|
|
self.meta = json.loads(meta)
|
|
except ValueError:
|
|
self.meta = {"CORRUPTED": True}
|
|
return self.meta
|
|
|
|
def __iter__(self):
|
|
"""
|
|
If the log is in read mode, iterate over all log lines.
|
|
:return: iterator
|
|
"""
|
|
if self.is_running:
|
|
raise Exception("Unable to iterate running log `%s`", self.handle)
|
|
|
|
with self.open_part("log") as f:
|
|
line_number = 0
|
|
while True:
|
|
try:
|
|
line_number += 1
|
|
line = f.readline().decode("utf-8")
|
|
except EOFError:
|
|
logger.debug("EOFError on log `%s`. Log is truncated." % self.handle)
|
|
break
|
|
if line == "":
|
|
break
|
|
if not line.startswith("#"):
|
|
try:
|
|
json_line = json.loads(line.strip())
|
|
except ValueError as err:
|
|
raise Exception("JSON error in run log `%s` line %d: %s"
|
|
% (self.handle, line_number, str(err)))
|
|
yield json_line
|
|
|
|
def __len__(self):
|
|
"""
|
|
Return the number of lines in the log.
|
|
|
|
If the log is running, returns the number of lines written so far.
|
|
If the log is not running and was completed, it returns the value of
|
|
`log_lines` from the metadata.
|
|
If the log is not running and was not completed, it iterates all the
|
|
log lines to count determine the number (slow!).
|
|
"""
|
|
# TODO: method needs testing
|
|
if self.is_running:
|
|
return self.meta["log_lines"]
|
|
|
|
if self.has_finished():
|
|
return self.get_meta()["log_lines"]
|
|
|
|
logger.debug("Counting lines in incomplete log `%s`" % self.handle)
|
|
incomplete_log_lines = 0
|
|
for _ in self:
|
|
incomplete_log_lines += 1
|
|
return incomplete_log_lines
|
|
|
|
def __enter__(self):
|
|
"""Called when object enters a `with` block"""
|
|
if "w" in self.mode:
|
|
self.start()
|
|
return self
|
|
|
|
def __exit__(self, exception_type, exception_value, traceback):
|
|
"""Called when object exits a `with` block"""
|
|
if self.is_running:
|
|
self.stop()
|
|
self.close()
|
|
if exception_type is not None:
|
|
logger.error("Exception while handling run log %s: %s %s %s"
|
|
% (self.handle, exception_type, exception_value, traceback))
|
|
return self
|
|
|
|
def delete(self):
|
|
"""
|
|
Delete this log from the log database. All associated files are
|
|
purged from disk. Be careful! There is no undo.
|
|
|
|
After calling this method, the log object becomes dysfunctional.
|
|
No other methods should be called hereafter.
|
|
:return: None
|
|
"""
|
|
# TODO: method needs testing
|
|
if self.is_running:
|
|
self.stop()
|
|
self.close()
|
|
self.db.delete(self.handle)
|
|
|
|
def put_cert(self, cert_data):
|
|
"""
|
|
Add a certificate to the certificate database. Return handle
|
|
:param cert_data: str with DER data
|
|
:return: str handle
|
|
"""
|
|
# TODO: method needs testing
|
|
return self.db.put_cert(cert_data)
|