2020-11-17 05:06:54 +03:00
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
2019-04-11 19:40:07 +03:00
import argparse
import json
import logging
import os
2019-08-07 16:38:31 +03:00
import re
2020-10-26 16:12:15 +03:00
from urllib import parse as urlparse
2019-04-11 19:40:07 +03:00
from collections import defaultdict
import manifestupdate
from wptrunner import expected
from wptrunner.wptmanifest.serializer import serialize
from wptrunner.wptmanifest.backends import base
# Directory containing this module.
here = os.path.dirname(__file__)
# Module-level logger; run() falls back to it when no logger is passed in.
logger = logging.getLogger(__name__)
2019-08-07 16:38:31 +03:00
# Placeholder for the yaml module; update_wpt_meta() imports it lazily and
# rebinds this global, so the dependency is only needed when META.yml is used.
yaml = None
2019-04-11 19:40:07 +03:00
class Compiler(base.Compiler):
    """Compiler that records metadata values without evaluating conditions.

    Conditional values are kept as ``(condition_subtree, value)`` pairs; the
    expression and operator visitors raise ``NotImplementedError``.
    """

    def visit_KeyValueNode(self, node):
        """Store every visited child value of ``node`` under its key.

        :param node: AST node whose ``data`` is the key name and whose
                     children are the value nodes.
        """
        key_name = node.data
        # Each child must be visited; otherwise the key is stored with an
        # empty list and all metadata values are lost.
        values = [self.visit(child) for child in node.children]
        self.output_node.set(key_name, values)

    def visit_ConditionalNode(self, node):
        """Return ``(condition_subtree, visited_value)`` for a conditional.

        :param node: AST node with exactly two children: condition and value.
        """
        assert len(node.children) == 2
        # For conditional nodes, just return the subtree
        return node.children[0], self.visit(node.children[1])

    def visit_UnaryExpressionNode(self, node):
        raise NotImplementedError

    def visit_BinaryExpressionNode(self, node):
        raise NotImplementedError

    def visit_UnaryOperatorNode(self, node):
        raise NotImplementedError

    def visit_BinaryOperatorNode(self, node):
        raise NotImplementedError
class ExpectedManifest(base.ManifestItem):
    def __init__(self, node, test_path, url_base):
        """Object representing all the tests in a particular manifest

        :param node: AST Node associated with this object.
        :param test_path: Path of the test file associated with this manifest.
        :param url_base: Base url for serving the tests in this manifest
        :raises ValueError: If ``test_path`` or ``url_base`` is None.
        """
        if test_path is None:
            raise ValueError("ExpectedManifest requires a test path")
        if url_base is None:
            raise ValueError("ExpectedManifest requires a base url")
        base.ManifestItem.__init__(self, node)
        # Maps each appended child's ``id`` to the child item.
        self.child_map = {}
        self.test_path = test_path
        self.url_base = url_base

    def append(self, child):
        """Add a test to the manifest"""
        base.ManifestItem.append(self, child)
        self.child_map[child.id] = child

    @property
    def url(self):
        """URL of the test file: ``test_path`` (with OS separators turned into
        ``/``) joined onto ``url_base``.

        Exposed as a property because child items read it as an attribute
        (``self.parent.url``).
        """
        return urlparse.urljoin(
            self.url_base, "/".join(self.test_path.split(os.path.sep))
        )
2020-10-26 21:34:53 +03:00
2019-04-11 19:40:07 +03:00
class DirectoryManifest(base.ManifestItem):
    """Manifest item used as the data class when compiling directory-level
    metadata files (see ``get_dir_manifest``). Adds no behaviour of its own."""
class TestManifestItem(base.ManifestItem):
    def __init__(self, node, **kwargs):
        """Tree node associated with a particular test in a manifest

        :param node: AST Node associated with the test
        :param kwargs: Accepted but not used by this class.
        """
        base.ManifestItem.__init__(self, node)
        # Maps subtest name to the subtest item.
        self.subtests = {}

    @property
    def id(self):
        """Test id: this item's ``name`` resolved against the parent's URL.

        Exposed as a property because the parent manifest reads it as an
        attribute (``child.id``) when indexing its children.
        """
        return urlparse.urljoin(self.parent.url, self.name)

    def append(self, node):
        """Add a subtest to the current test

        :param node: AST Node associated with the subtest"""
        child = base.ManifestItem.append(self, node)
        self.subtests[child.name] = child

    def get_subtest(self, name):
        """Get the SubtestNode corresponding to a particular subtest, by name

        :param name: Name of the node to return
        :returns: The subtest item, or None if there is no subtest of that name.
        """
        return self.subtests.get(name)
class SubtestManifestItem(TestManifestItem):
    """Manifest item for a subtest; ``data_cls_getter`` selects it for nodes
    whose parent is a ``TestManifestItem``. Adds no behaviour of its own."""
def data_cls_getter(output_node, visited_node):
    """Pick the manifest item class for a new node from its parent's type.

    :param output_node: Parent item already built, or None for the root.
    :param visited_node: AST node being visited (not consulted).
    :returns: ``ExpectedManifest`` for the root, ``TestManifestItem`` below an
              ``ExpectedManifest``, ``SubtestManifestItem`` below a
              ``TestManifestItem``.
    :raises ValueError: If the parent is of any other type.
    """
    # visited_node is intentionally unused
    if output_node is None:
        return ExpectedManifest
    # Checked in order: parent type -> class to use for its children.
    nesting = (
        (ExpectedManifest, TestManifestItem),
        (TestManifestItem, SubtestManifestItem),
    )
    for parent_cls, child_cls in nesting:
        if isinstance(output_node, parent_cls):
            return child_cls
    raise ValueError
def get_manifest(metadata_root, test_path, url_base):
    """Get the ExpectedManifest for a particular test path, or None if there is no
    metadata stored for that test path.

    :param metadata_root: Absolute path to the root of the metadata directory
    :param test_path: Path to the test(s) relative to the test root
    :param url_base: Base url for serving the tests in this manifest
    :returns: The compiled manifest, or None if the metadata file cannot be
              read.
    """
    manifest_path = expected.expected_path(metadata_root, test_path)
    try:
        with open(manifest_path, "rb") as f:
            # test_path and url_base are forwarded as keyword arguments for
            # the root ExpectedManifest, which requires both.
            return compile(
                f,
                data_cls_getter=data_cls_getter,
                test_path=test_path,
                url_base=url_base,
            )
    except IOError:
        # A missing or unreadable file means there is no metadata for this test.
        return None
def get_dir_manifest(path):
    """Get the DirectoryManifest stored in a directory-level ini file, or None
    if that file cannot be read.

    :param path: Full path to the ini file
    :returns: The compiled manifest, or None if the file cannot be opened.
    """
    try:
        with open(path, "rb") as f:
            # Every node in a directory manifest uses the same item class.
            return compile(f, data_cls_getter=lambda x, y: DirectoryManifest)
    except IOError:
        return None
def compile(stream, data_cls_getter=None, **kwargs):
    """Compile a metadata stream with this module's ``Compiler``.

    :param stream: File-like object containing the metadata source.
    :param data_cls_getter: Callable choosing the item class for each node.
    :param kwargs: Extra keyword arguments forwarded to ``base.compile``.
    :returns: Whatever ``base.compile`` returns.
    """
    forwarded = dict(kwargs, data_cls_getter=data_cls_getter)
    return base.compile(Compiler, stream, **forwarded)
def create_parser():
    """Build the command-line parser for this tool.

    :returns: An ``argparse.ArgumentParser`` accepting ``--out-dir`` and
              ``--meta-dir``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--out-dir", help="Directory to store output files")
    parser.add_argument(
        "--meta-dir",
        help="Directory containing wpt-metadata checkout to update.",
    )
    return parser
def run(src_root, obj_root, logger_=None, **kwargs):
    """Summarise the interesting metadata of every test and emit it.

    :param src_root: Forwarded to ``manifestupdate.run``.
    :param obj_root: Forwarded to ``manifestupdate.run``.
    :param logger_: Logger to use; the module logger is used when None.
    :param kwargs: Forwarded to ``manifestupdate.run``. ``out_dir`` selects a
                   directory to receive ``summary.json`` (the summary is
                   printed when it is unset); ``meta_dir`` selects a
                   wpt-metadata checkout to update.
    """
    logger_obj = logger_ if logger_ is not None else logger

    manifests = manifestupdate.run(src_root, obj_root, logger_obj, **kwargs)

    rv = {}
    dirs_seen = set()

    for meta_root, test_path, test_metadata in iter_tests(manifests):
        for dir_path in get_dir_paths(meta_root, test_path):
            if dir_path in dirs_seen:
                continue
            # Remember the directory so its manifest is only read once.
            dirs_seen.add(dir_path)
            dir_manifest = get_dir_manifest(dir_path)
            rel_path = os.path.relpath(dir_path, meta_root)
            if dir_manifest:
                add_manifest(rv, rel_path, dir_manifest)
        add_manifest(rv, test_path, test_metadata)

    out_dir = kwargs.get("out_dir")
    if out_dir:
        # Create the output directory on demand before writing into it.
        os.makedirs(out_dir, exist_ok=True)
        out_path = os.path.join(out_dir, "summary.json")
        with open(out_path, "w") as f:
            json.dump(rv, f)
    else:
        print(json.dumps(rv, indent=2))

    meta_dir = kwargs.get("meta_dir")
    if meta_dir:
        update_wpt_meta(logger_obj, meta_dir, rv)
2019-04-11 19:40:07 +03:00
def get_dir_paths(test_root, test_path):
    """Yield the ``__dir__.ini`` path of each directory between a test and the root.

    Starts at the directory containing ``test_path`` and walks upwards,
    stopping before ``test_root`` itself (the root is not yielded).

    :param test_root: Root directory; compared to each ancestor by equality.
    :param test_path: Path of the test; joined onto ``test_root`` if relative.
    """
    absolute = test_path if os.path.isabs(test_path) else os.path.join(test_root, test_path)
    current = os.path.dirname(absolute)
    while current != test_root:
        yield os.path.join(current, "__dir__.ini")
        current = os.path.dirname(current)
        # Guards against walking above the root and looping forever.
        assert len(current) >= len(test_root)
def iter_tests(manifests):
    """Yield ``(metadata_path, test_path, expected_manifest)`` for every test
    that has a metadata file.

    :param manifests: Mapping from an iterable manifest (yielding
                      ``(test_type, test_path, tests)`` triples) to a dict
                      holding ``"url_base"`` and ``"metadata_path"``.
    """
    for manifest, settings in manifests.items():
        for _test_type, test_path, _tests in manifest:
            url_base = settings["url_base"]
            metadata_base = settings["metadata_path"]
            found = get_manifest(metadata_base, test_path, url_base)
            if not found:
                # No metadata stored for this test.
                continue
            yield metadata_base, test_path, found
def add_manifest(target, path, metadata):
    """Add a manifest, its tests and their subtests to the summary.

    The summary is keyed by directory name. Tests are nested under
    ``"_tests"`` by name and subtests under ``"_subtests"`` by name, which is
    the layout ``update_wpt_meta`` reads back.

    :param target: Summary dictionary to update in place.
    :param path: Path of the metadata file; only its directory name is used.
    :param metadata: Manifest item whose ``children`` are tests, each with
                     subtests as ``children``.
    """
    dir_name = os.path.dirname(path)
    add_metadata(target, [dir_name], metadata)

    for test_metadata in metadata.children:
        # A separate key per test stops tests overwriting the directory entry
        # or each other.
        test_key = [dir_name, "_tests", test_metadata.name]
        add_metadata(target, test_key, test_metadata)
        for subtest_metadata in test_metadata.children:
            subtest_key = test_key + ["_subtests", subtest_metadata.name]
            add_metadata(target, subtest_key, subtest_metadata)
simple_props = [
# Expected statuses that make a test worth reporting in the summary.
statuses = {"CRASH"}
def add_metadata(target, key, metadata):
    """Record the interesting properties of one manifest item in the summary.

    Nothing is written (not even the nested dictionaries) when the item is
    not interesting according to ``is_interesting``.

    :param target: Summary dictionary to update in place.
    :param key: Sequence of nested dictionary keys locating the entry.
    :param metadata: Manifest item providing ``has_key`` and ``get``.
    """
    if not is_interesting(metadata):
        return

    # Walk (creating as needed) down to the dictionary for this item.
    for part in key:
        target = target.setdefault(part, {})

    for prop in simple_props:
        if metadata.has_key(prop):  # noqa W601
            target[prop] = get_condition_value_list(metadata, prop)

    if not metadata.has_key("expected"):  # noqa W601
        return

    intermittent = []
    by_status = defaultdict(list)
    for item in metadata.get("expected"):
        # Conditional values are (condition, status) tuples.
        if isinstance(item, tuple):
            condition, status = item
        else:
            condition, status = None, item
        # A list of statuses marks a known intermittent; its first entry is
        # treated as the expected status.
        if isinstance(status, list):
            intermittent.append((condition, status))
            expected_status = status[0]
        else:
            expected_status = status
        by_status[expected_status].append(condition)

    for status in statuses:
        if status in by_status:
            target["expected_%s" % status] = [
                serialize(cond) if cond else None for cond in by_status[status]
            ]

    if intermittent:
        target["intermittent"] = [
            [serialize(cond) if cond else None, intermittent_statuses]
            for cond, intermittent_statuses in intermittent
        ]
2019-04-11 19:40:07 +03:00
def get_condition_value_list(metadata, key):
    """Return the values stored under ``key`` as ``(condition, value)`` pairs.

    :param metadata: Object whose ``get(key)`` returns an iterable of values;
                     conditional values are ``(condition, value)`` tuples.
    :param key: Name of the property to read.
    :returns: List of pairs where the condition is serialized, or None for an
              unconditional value.
    """
    conditions = []
    for item in metadata.get(key):
        if isinstance(item, tuple):
            assert len(item) == 2
            conditions.append((serialize(item[0]), item[1]))
        else:
            # Unconditional value: recorded once, with no condition.
            conditions.append((None, item))
    return conditions
def is_interesting(metadata):
    """Report whether a manifest item carries anything worth summarising.

    :param metadata: Manifest item providing ``has_key`` and ``get``.
    :returns: True if any of ``simple_props`` is set or the item has at least
              one ``expected`` value; False otherwise.
    """
    for prop in simple_props:
        if metadata.has_key(prop):  # noqa W601
            return True

    if not metadata.has_key("expected"):  # noqa W601
        return False

    for entry in metadata.get("expected"):
        # Include both expected and known intermittent values
        outcome = entry[1] if isinstance(entry, tuple) else entry
        if isinstance(outcome, list) or outcome in statuses:
            return True
        # NOTE(review): this unconditional return makes the checks above
        # redundant -- any first ``expected`` value counts as interesting.
        # Confirm whether that is intended before narrowing it.
        return True
    return False
2019-08-07 16:38:31 +03:00
def update_wpt_meta(logger, meta_root, data):
    """Record the bug links found in the summary in a wpt-metadata checkout.

    :param logger: Logger passed through to ``add_test_data``.
    :param meta_root: Path of the wpt-metadata checkout; must be a directory.
    :param data: Summary mapping directory path to a dict whose ``"_tests"``
                 entry maps test name to test data, which may in turn hold a
                 ``"_subtests"`` mapping.
    :raises ValueError: If ``meta_root`` is not an existing directory.
    """
    global yaml

    # Validate the argument before importing the optional yaml dependency.
    if not os.path.isdir(meta_root):
        raise ValueError("%s is not a directory" % (meta_root,))

    import yaml

    # Leaving the collection's context writes out every loaded META file.
    with WptMetaCollection(meta_root) as wpt_meta:
        for dir_path, dir_data in sorted(data.items()):
            for test, test_data in dir_data.get("_tests", {}).items():
                add_test_data(logger, wpt_meta, dir_path, test, None, test_data)
                for subtest, subtest_data in test_data.get("_subtests", {}).items():
                    add_test_data(
                        logger, wpt_meta, dir_path, test, subtest, subtest_data
                    )
2020-09-29 17:22:49 +03:00
2019-08-07 16:38:31 +03:00
def add_test_data(logger, wpt_meta, dir_path, test, subtest, test_data):
    """Add the bug links of one test or subtest to the wpt-metadata collection.

    :param logger: Logger used to report skipped values.
    :param wpt_meta: Collection whose ``get(dir_path)`` returns the META object
                     to update.
    :param dir_path: Directory of the test, relative to the metadata root.
    :param test: Name of the test.
    :param subtest: Name of the subtest, or None for the test itself.
    :param test_data: Summary data; its ``"bug"`` entry, when present, is a
                      list of ``(condition, value)`` pairs.
    """
    triage_keys = ["bug"]

    for key in triage_keys:
        if key not in test_data:
            continue
        value = test_data[key]
        for condition, bug_values in value:
            if condition is not None:
                # Conditional values are skipped rather than recorded.
                logger.info("Skipping conditional metadata")
                continue
            if not isinstance(bug_values, list):
                bug_values = [bug_values]
            for bug_value in bug_values:
                bug_link = get_bug_link(bug_value)
                if bug_link is None:
                    # Nothing to record without a usable link.
                    logger.info("Could not extract bug: %s", value)
                    continue
                meta = wpt_meta.get(dir_path)
                meta.set(test, subtest, product="firefox", bug_url=bug_link)
# Raw strings keep the regex escapes (\., \?, \d, \s) out of Python's own
# string-escape processing.
bugzilla_re = re.compile(r"https://bugzilla\.mozilla\.org/show_bug\.cgi\?id=\d+")
bug_re = re.compile(r"(?:[Bb][Uu][Gg])?\s*(\d+)")


def get_bug_link(value):
    """Turn a bug reference into a Bugzilla URL.

    :param value: Either a full ``show_bug.cgi`` URL or a bug number that may
                  be prefixed with "bug" (any case); surrounding whitespace is
                  ignored.
    :returns: The Bugzilla URL, or None if ``value`` starts with neither form.
    """
    value = value.strip()
    m = bugzilla_re.match(value)
    if m:
        return m.group(0)
    m = bug_re.match(value)
    if m:
        return "https://bugzilla.mozilla.org/show_bug.cgi?id=%s" % m.group(1)
    return None
class WptMetaCollection(object):
    """Context manager caching one META object per directory and writing all
    of them out when the context exits."""

    def __init__(self, root):
        """:param root: Root directory of the wpt-metadata checkout."""
        self.root = root
        # Maps directory path to the META object loaded for it.
        self.loaded = {}

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        # Write every loaded META object, then drop the cache.
        for item in self.loaded.values():
            item.write(self.root)
        self.loaded = {}

    def get(self, dir_path):
        """Return the META object for ``dir_path``, loading or creating it on
        first use.

        :param dir_path: Directory path relative to the root.
        """
        if dir_path not in self.loaded:
            meta = WptMeta.get_or_create(self.root, dir_path)
            self.loaded[dir_path] = meta
        return self.loaded[dir_path]
class WptMeta(object):
    """In-memory form of one directory's ``META.yml`` file."""

    def __init__(self, dir_path, data):
        """
        :param dir_path: Directory of the file, relative to the metadata root.
        :param data: Parsed file contents; must hold a ``"links"`` list.
        """
        assert "links" in data and isinstance(data["links"], list)
        self.dir_path = dir_path
        self.data = data

    @staticmethod
    def meta_path(meta_root, dir_path):
        """Return the path of the ``META.yml`` file for ``dir_path``."""
        return os.path.join(meta_root, dir_path, "META.yml")

    def path(self, meta_root):
        """Return the path of this object's file under ``meta_root``."""
        return self.meta_path(meta_root, self.dir_path)

    @classmethod
    def get_or_create(cls, meta_root, dir_path):
        """Load the file for ``dir_path`` if it exists, else start an empty one."""
        if os.path.exists(cls.meta_path(meta_root, dir_path)):
            return cls.load(meta_root, dir_path)
        return cls(dir_path, {"links": []})

    @classmethod
    def load(cls, meta_root, dir_path):
        """Parse the existing file for ``dir_path`` with ``yaml.safe_load``."""
        with open(cls.meta_path(meta_root, dir_path), "r") as f:
            data = yaml.safe_load(f)
        return cls(dir_path, data)

    def set(self, test, subtest, product, bug_url):
        """Record that ``test`` (and optionally ``subtest``) is linked to a bug.

        An existing link is reused when its URL equals ``bug_url`` and its
        product either is absent or, ignoring anything after the first ``-``,
        equals ``product``. Results already present are not duplicated.

        :param test: Name of the test.
        :param subtest: Name of the subtest, or None.
        :param product: Product name for a newly created link.
        :param bug_url: URL of the bug.
        """
        target_link = None
        for link in self.data["links"]:
            link_product = link.get("product")
            if link_product:
                link_product = link_product.split("-", 1)[0]
            if link_product is None or link_product == product:
                if link["url"] == bug_url:
                    target_link = link
                    break

        if target_link is None:
            # Values stay as text: encoded bytes would never compare equal to
            # the incoming strings on later calls.
            target_link = {
                "product": product,
                "url": bug_url,
                "results": [],
            }
            self.data["links"].append(target_link)

        results = target_link.setdefault("results", [])

        has_result = any(
            (result["test"] == test and result.get("subtest") == subtest)
            for result in results
        )
        if not has_result:
            data = {"test": test}
            if subtest:
                data["subtest"] = subtest
            results.append(data)

    def write(self, meta_root):
        """Write the data to this object's file, creating its directory first.

        :param meta_root: Root directory of the wpt-metadata checkout.
        """
        path = self.path(meta_root)
        dirname = os.path.dirname(path)
        os.makedirs(dirname, exist_ok=True)
        # Text mode with an explicit encoding: safe_dump emits str when no
        # encoding argument is given.
        with open(path, "w", encoding="utf-8") as f:
            yaml.safe_dump(self.data, f, default_flow_style=False, allow_unicode=True)